首页 > 编程学习 > Java BIO

Java BIO

发布时间:2022/1/17 12:25:47

Java BIO

Java BIO指的是blocking I/O,同步并阻塞式的IO

同步vs异步

同步和异步关注的是消息通信机制 (synchronous communication/ asynchronous communication)
所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到返回值了。换句话说,就是由调用者主动等待这个调用的结果。
而异步则是相反,调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态通知来通知调用者,或通过回调函数处理这个调用。

阻塞vs非阻塞

阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态.
阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销
流程图
BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,但程序简单比较容易理解

BIO编程简单流程
1.服务器启动一个ServerSocket
2.客户端启动Socket,对服务器进行通信。默认情况下服务器端需要对每一个客户端简建立一个线程与之通信
3.客户端发出请求后,先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝
4.如果有响应,客户端则会等待请求结束后,再继续执行

通过如下的例子来说明,创建一个服务端,使用telnet来连接服务端

在mac上,通过brew install telnet来安装telnet

public class Main {
    public static void main(String[] args) {

        try {
            /**
             * 1.创建一个线程池
             * 2.如果有客户端连接,就创建一个线程,与之通讯
             */
            ExecutorService executorService = Executors.newCachedThreadPool();
            //创建ServerSocket
            ServerSocket serverSocket = new ServerSocket(10000);

            System.out.println("服务器启动了...");

            while (true) {
                //监听,等待客户端连接
                System.out.println("等待连接...");
                final Socket socket = serverSocket.accept();
                System.out.println("连接到了一个客户端...");

                //创建一个线程,与之通讯
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        handler(socket);
                    }
                });
            }

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    //编写一个handler,与客户端通讯
    public static void handler(Socket socket) {
        try {
            System.out.println("线程信息 id = " + Thread.currentThread().getId() + ", 线程名称 = " + Thread.currentThread().getName());
            byte[] bytes = new byte[1024];
            //获取输入流
            InputStream inputStream = socket.getInputStream();
            //循环读取客户端发过来的数据
            while (true) {
                System.out.println("线程信息 id = " + Thread.currentThread().getId() + ", 线程名称 = " + Thread.currentThread().getName());

                System.out.println("等待read...");
                int read = inputStream.read(bytes);
                if (read != -1) {
                    String s = new String(bytes, 0, read);
                    System.out.println("获取client的数据:" + s);
                } else {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            System.out.println("关闭和client的连接...");
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

使用telnet 127.0.0.1 10000来连接服务端,输出结果如下:

服务器启动了...
等待连接...
连接到了一个客户端...
等待连接...
线程信息 id = 15, 线程名称 = pool-1-thread-1
线程信息 id = 15, 线程名称 = pool-1-thread-1
等待read...
获取client的数据:123456789

线程信息 id = 15, 线程名称 = pool-1-thread-1
等待read...
获取client的数据:98654321

线程信息 id = 15, 线程名称 = pool-1-thread-1
等待read...

可以看到,存在2处阻塞

  • final Socket socket = serverSocket.accept(); - 阻塞
  • int read = inputStream.read(bytes); - 阻塞

阻塞会造成线程资源的浪费

总结
1.每个Socket接收到,都会创建一个线程,线程的竞争、切换上下文影响性能;
2.每个线程都会占用栈空间和CPU资源;
3.并不是每个socket都进行IO操作,无意义的线程处理;
4.客户端的并发访问增加时。服务端将呈现1:1的线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务。

实现文件上传

下面的例子,通过socket实现一个文件的服务器,步骤见注释
1.客户端

/**
 * 文件上传client
 */
public class Client {
    public static void main(String[] args) {
        try {
            Socket socket = new Socket("127.0.0.1",8888);
            //把字节输出流包装成一个数据输出流
            DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
            //1.先发送文件后缀
            dos.writeUTF(".png");
            //2.把文件数据发送给服务端进行接收
            InputStream is = new FileInputStream("/Users/xxxxxx/Pictures/avatar.png");
            byte[] buffer = new byte[1024];
            int len;
            while ((len = is.read(buffer)) > 0) {
                dos.write(buffer, 0, len);
            }
            dos.flush();
            //通知服务端这边的数据已发送完毕
            socket.shutdownOutput();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2.服务端

/**
 * 文件上传服务端
 */
public class Server {
    public static void main(String[] args) {
        try {
            ServerSocket serverSocket = new ServerSocket(8888);
            while (true) {
                Socket socket = serverSocket.accept();
                new ServerReaderThread(socket).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class ServerReaderThread extends Thread {
    private Socket socket;

    public ServerReaderThread(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            //数据输入流
            DataInputStream dis = new DataInputStream(socket.getInputStream());
            //读取客户端发送的文件类型
            String suffix = dis.readUTF();
            System.out.println("服务端已接收到文件后缀 = " + suffix);
            //定义一个字节输出管道,把客户端发过来的数据写出去
            OutputStream os = new FileOutputStream("/Users/wangzhen/Documents/JaveDemo/BIO/server/" +
                    UUID.randomUUID().toString() + suffix);
            //从数据输入流,读取输入数据,写出到字节输出流中
            byte[] bytes = new byte[1024];
            int len;
            while ((len = dis.read(bytes)) > 0) {
                os.write(bytes, 0, len);
            }
            os.close();
            System.out.println("服务端已接收到文件保存成功");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

运行后上传一张图片,如下:
上传图片

实现端口转发

即,一个客户端的消息可以发送给所有的客户端去接收,如下图所示:

端口转发
服务端代码如下,详情可见注释:

/**
 * 服务端
 * 1.注册端口
 * 2.接收客户端socket连接,实现一个独立的线程来处理
 * 3.把当前连接的客户端socket放入一个所谓的在线socket集合汇总保存
 * 4.接收客户端的消息后,把消息推送给所有的在线的socket
 */
public class Server {
    public static List<Socket> allOnlineSockets = new ArrayList<>();

    public static void main(String[] args) {
        try {
            ServerSocket serverSocket = new ServerSocket(10000);
            while (true) {
                Socket socket = serverSocket.accept();
                allOnlineSockets.add(socket);
                //为当前登录成功的socket,分配一个独立的线程来处理,与之通讯
                new ServerReaderThread(socket).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class ServerReaderThread extends Thread {
    private Socket socket;

    public ServerReaderThread(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            //从socket中取获取当前客户端的输入流
            BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String msg;
            while ((msg = br.readLine()) != null) {
                sendMsgToAllClient(msg);
            }
        } catch (Exception e) {
            System.err.println("有人下线了!");
            // 如果出异常,可以理解为有人下线了,更新在线客户端
            Server.allOnlineSockets.remove(socket);
        }
    }

    /**
     * 把客户端发送的消息,推送给所有的socket客户端
     * @param msg
     */
    private void sendMsgToAllClient(String msg)  {
        try {
            for (Socket allOnlineSocket : Server.allOnlineSockets) {
                PrintStream ps = new PrintStream(socket.getOutputStream());
                ps.println(msg);
                ps.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

简单即时通讯

可参考:

  • 从前慢-BIO、NIO、AIO
Copyright © 2010-2022 ngui.cc 版权所有 |关于我们| 联系方式| 豫B2-20100000