Java BIO.md

2022-08-22
0 689

I/O模型说明

I/O模型:用什么样的通信模式和架构进行数据的传输和接收,很大程度上决定了程序通信的性能。java共支持3种网络编程的I/O模型:BIONIOAIO

I/O模型

java BIO

同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务端就需要启动一个线程进行处理,如果这个连接不做任何事情就会造成不必要的线程开销。客户端越多,服务端就需要启动更多的线程来满足每个客户端的通信需求,同时,每个客户端在发送或者接收数据的时候都是同步的,意思就是即使当前线程处于等待状态,该线程也需要等待且不能做其他事情。由此,BIO模式下需要创建很多线程,势必会带来很大的系统开销。
简单示意图:
image-20210531174637527

java NIO

同步非阻塞,服务器实现模式为一个线程处理多个请求,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。
简单示意图:
image-20210531180009505

java AIO

异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理,一般适用于连接数较多且连接时间较长的应用。

适用场景

  • BIO方式适用于连接数比较小且固定的架构,这种方式对服务器资源要求高,并发局限于应用中,程序简单易理解。
  • NIO方式适用于连接数多且连接比较短的架构,比如聊天服务器、弹幕系统、服务器间通讯等。编程较为复杂。
  • AIO方式适用于连接数多且连接比较长的架构,比如相册服务器,充分调用OS参与并发操作,编程较为复杂。

BIO

BIO 基本介绍

  • BIO就是传统的java io编程,其相关的类和接口在java.io包下
  • BIO(blocking I/O):同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务端就需要启动一个线程进行处理,如果这个连接不做任何事情就会造成不必要的线程开销,可以通过线程池机制来改善。

BIO工作机制

image-20210601140429108

BIO编程实例

  • 服务端代码
/**
 * 服务端
 */
public class Server {

    public static void main(String[] args) {
        try {
            //
            ServerSocket ss = new ServerSocket(2323);
            System.out.println("=== 服务端启动成功 ===");
            // 等待客户端连接,程序会在这行代码阻塞
            Socket socket = ss.accept();
            // 获取输入流
            InputStream is = socket.getInputStream();
            // 获取缓冲输入字符流
            BufferedReader bis = new BufferedReader(new InputStreamReader(is));
            // 读取数据
            String msg;
            while ((msg = bis.readLine()) != null) {
                System.out.println("接收到客户端发送消息:" + msg);
            }
        }catch (IOException e) {
            e.printStackTrace();
        }
    }

}
  • 客户端代码
/**
 * 客户端
 */
public class Client {

    public static void main(String[] args) {
        //
        try {
            // 创建socket对象
            Socket s = new Socket("127.0.0.1", 2323);
            // 获取输出流
            OutputStream os = s.getOutputStream();
            // 将字节输出流包装成打印流
            PrintWriter pw = new PrintWriter(os);
            pw.println("hello world!");
            pw.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
  • 执行结果
=== 服务端启动成功 ===
接收到客户端发送消息:hello world!
java.net.SocketException: Connection reset
	at java.net.SocketInputStream.read(SocketInputStream.java:210)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.io.InputStreamReader.read(InputStreamReader.java:184)
	at java.io.BufferedReader.fill(BufferedReader.java:161)
	at java.io.BufferedReader.readLine(BufferedReader.java:324)
	at java.io.BufferedReader.readLine(BufferedReader.java:389)
	at com.fzkj.bio.Server.main(Server.java:25)

Process finished with exit code 0

在上面的示例中,实现了一个简单的客户端向服务端发送数据的例子。从结果来看,服务端虽然接收到了客户端发送的消息,但是却抛出了一个Connnection reset的异常。前面我们说过,BIO模式下的通信,服务端会为每一个客户端分配一个线程去处理请求,在这个示例中使用的是main线程,我们在服务端读取数据的时候使用的是一个while循环获取,而客户端的数据却只发送了一次就退出了,所以服务端在进入第二次等待数据的时候,等来的却是客户端的断开,就抛出了异常。

小结

  • 在以上通信中,服务端会一直等待客户端的消息,如果客户端没有进行消息的发送,服务端将一直进入阻塞状态。
  • 同时服务端是按照行获取数据的(本例中),这意味着客户端也必须按照行发送数据,否则服务端将进入等待消息的阻塞状态。

改进

在上面的案例中,客户端只是发送了一条消息,下面就改进一下让客户端和服务端可以一直保持通信。

服务端不需要改,因为本来就是在一直接收客户端消息。只需要将客户端进行小小的改动即可。

  • 客户端改进
Scanner sc = new Scanner(System.in);
while (true) {
    System.out.println("输入要发送的消息:");
    String msg = sc.nextLine();
    pw.println(msg);
    pw.flush();
}

BIO模式下连接多个客户端

上述案例中,一个服务端只能接收一个客户端的通信请求,究其原因是我们在服务端只接收了一个连接请求,就是那个accept方法。那么如何让服务端可以接收多个客户端请求呢?这时就需要引入多线程了,为每个连接请求分配一个线程去处理。

  • 服务端代码示例
/**
 * 实现服务端可以接收多个客户端的连接请求
 * 思路:
 *  1、应该一直调用accept方法监听
 *  2、当有连接请求就新开一个线程去处理
 */
public class Server {

    public static void main(String[] args) {
        startServer();
    }

    public static void startServer() {
        try {
            // 1、监听端口
            ServerSocket ss = new ServerSocket(2323);
            System.out.println("==== 服务端启动成功 ====");
            while(true) {
                // 等待客户端连接
                Socket socket = ss.accept();
                // 交由一个新的线程去处理
                new Thread(new ServerReaderThread(socket)).start();
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
    }

}

/**
 * 处理客户端请求
 */
class ServerReaderThread implements Runnable {

    private Socket socket;

    public ServerReaderThread(Socket socket){
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            // 获取输入流
            InputStream is = socket.getInputStream();
            // 包装成缓冲字符输入流
            BufferedReader br = new BufferedReader(new InputStreamReader(is));
            String msg;
            // 接收消息
            while((msg = br.readLine()) != null) {
                System.out.println("接收到客户端 " + socket.getLocalPort() + " 的消息:" + msg);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
  • 客户端代码示例
public class Client {
    public static void main(String[] args) {
        try {
            // 1、创建socket连接对象,请求与服务端连接
            Socket socket = new Socket("127.0.0.1", 2323);
            // 2、 获取一个打印流
            PrintStream ps = new PrintStream(socket.getOutputStream());
            Scanner sc = new Scanner(System.in);
            while(true) {
                System.out.print("输入要发送的内容:");
                String msg = sc.nextLine();
                ps.println(msg);
                ps.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

小结

  • 每个socket接收到都会创建一个线程,线程的竞争和切换上下文会影响性能。
  • 每个线程都会占用栈空间和CPU资源。
  • 并不是每个socket都进行IO操作,无意义的线程处理
  • 客户端的并发访问增加时,服务端的线程数量也会增加 ,访问量越大,系统将会发生线程栈溢出,线程创建失败等问题,从而导致进程假死,不能提供服务。

伪异步I/O编程

接着我们采用一个伪异步I/O的通信框架,采用线程池和任务队列实现,当客户端接入时,将客户端的Socket封装成一个Task交给后端的线程池进行处理。

  • 服务端
/**
 * 实现伪异步通信架构
 */
public class Server {
    public static void main(String[] args) {
        try {
            // 1、注册端口
            ServerSocket ss = new ServerSocket(2323);
            // 2、创建线程池对象
            ServerSocketPoolHandler ssp = new ServerSocketPoolHandler(3, 10);
            while(true) {
                Socket s = ss.accept();
                // 把socket封装成可执行的任务对象,传递给线程池处理
                ssp.execute(new ServerRunnbleTarget(s));
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}
  • 客户端
/**
 * 客户端
 */
public class Client {
    public static void main(String[] args) {
        Socket socket = null;
        try {
            // 1、创建socket连接对象,请求与服务端连接
            socket = new Socket("127.0.0.1", 2323);
            // 2、 获取一个打印流
            PrintStream ps = new PrintStream(socket.getOutputStream());
            Scanner sc = new Scanner(System.in);
            while(true) {
                System.out.print("输入要发送的内容:");
                String msg = sc.nextLine();
                ps.println(msg);
                ps.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 任务对象封装类
/**
 * 任务对象
 */
public class ServerRunnbleTarget implements Runnable{

    private Socket socket;

    public ServerRunnbleTarget(Socket socket){
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            // 创建输入流
            InputStream is = socket.getInputStream();
            // 包装为字符输入流
            BufferedReader reader = new BufferedReader(new InputStreamReader(is));
            String msg;
            while((msg = reader.readLine()) != null) {
                System.out.println("接收到客户端消息:" + msg);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 线程池
/**
 * 服务端socket处理线程池
 */
public class ServerSocketPoolHandler {

    private ExecutorService executorService;

    /**
 *     public ThreadPoolExecutor(int corePoolSize, // 核心数
 *                               int maximumPoolSize, // 最大线程数量
 *                               long keepAliveTime,
 *                               TimeUnit unit,
 *                               BlockingQueue<Runnable> workQueue)
     * @param maxThread
     * @param queueSize
     */
    public ServerSocketPoolHandler(int maxThread, int queueSize){
        executorService = new ThreadPoolExecutor(2, maxThread, 120, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueSize));
    }

    /**
     * 交由线程池
     * @param target
     */
    public void execute(Runnable target) {
        executorService.execute(target);
    }

}

小结

  • 伪异步io采用了线程池实现,因此避免了为每个请求创建一个独立线程造成线程资源耗尽的问题,但由于底层依然是采用同步阻塞模型,因此无法从根本上解决问题。
  • 如果单个消息处理得缓慢,获取服务器线程池中的全部线程都被阻塞,那么后续socket的io消息都将在队列中排队。

基于BIO模式下的文件上传

  • 服务端代码
/**
 * 接收客户端任意类型文件,并保存到磁盘
 */
public class Server {

    public static void main(String[] args) {
        try{
            ServerSocket ss = new ServerSocket(2323);
            while(true){
                Socket s = ss.accept();
                new Thread(new ServerReadThread(s)).start();
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }

}
  • 客户端代码
/**
 * 实现客户端上传任意类型的文件数据给服务端
 */
public class Client {

    public static void main(String[] args) {
        Socket socket = null;
        try (
                InputStream is = new FileInputStream("E:\\img.py");
                ){
            // 请求连接服务端
            socket = new Socket("127.0.0.1",2323);
            // 把字节输出流包装成一个数据输出流
            DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
            // 先发送文件后缀
            dos.writeUTF(".py");
            // 再发送文件数据
            byte[] buffer = new byte[1024];
            int len;
            while((len = is.read(buffer)) > 0) {
                dos.write(buffer, 0, len);
            }
            dos.flush();
            socket.shutdownInput(); // 告诉服务端发送完毕
        }catch(Exception e) {
            e.printStackTrace();
        } finally {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}
  • 处理线程
/**
 * 处理客户端请求的线程
 */
public class ServerReadThread implements Runnable {
    private Socket socket;

    public ServerReadThread(Socket socket){
        this.socket = socket;
    }

    @Override
    public void run() {
        OutputStream os = null;
        try {
            // 得到一个数据输入流
            DataInputStream dis = new DataInputStream(socket.getInputStream());
            // 读取文件后缀
            String suffix = dis.readUTF();
            // 定义一个文件输出流把客户端传过来的文件保存到磁盘
            os = new FileOutputStream("E:\\upload\\"
            + UUID.randomUUID().toString() + suffix);
            byte[] buffer = new byte[1024];
            int len;
            while((len = dis.read(buffer)) > 0){
                os.write(buffer, 0, len);
            }
            os.flush();
            System.out.println("文件保存成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                os.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

BIO模式下端口转发思想

需要实现一个客户端的消息可以发送给所有客户端接收。

实现思路:

  • 需要接收消息
  • 在服务端定义一个在线的socket集合
  • 分配不同线程处理
  • 客户端需要接收消息

这个被称为端口转发思想。