目录
- 前言
- BIO版本
- NIO版本
- mini-netty版本v1:Reactor线程模型
- mini-netty版本v2:任务处理
- 总结
前言
Netty如今是使用最广泛的网络通信框架,许多人对此有强烈的学习需求。但是Netty本身代码量大、概念众多,单纯看代码学习容易一头雾水,看了后面忘了前面。这个问题的破解之道,就是以自己的思考重新写一遍,这是掌握一个复杂设计的最好方法。本文会带着你从零开始设计一个极简版的Netty,为你演示如何从最原始的BIO进化到NIO,再一步步添加Netty的Reactor线程模型和任务处理的。本文的源代码放在Github上(mini-netty),有兴趣的读者可以自己取阅。
BIO版本
先看最原始的BIO。BIO的特点是阻塞式的,它的API使用简单,但是缺点也很明显:一个连接对应一个线程,如果连接数量过多,那么线程数量也会很多,而CPU核数有限,造成大量不必要的线程切换开销。代码示例如下:
/*** 基于BIO的客户端*/
public class BioClient {public static void main(String[] args) throws IOException {try (Socket socket = new Socket("127.0.0.1", 13)) {OutputStream os = socket.getOutputStream();PrintStream ps = new PrintStream(os);try (Scanner sc = new Scanner(System.in)) {while(true){// 循环从系统输入获取消息并发送System.out.print("type in your msg:");String msg = sc.nextLine();ps.println(msg);ps.flush();}}}}
}
/*** 基于BIO的服务端*/
public class BioServer {public static void main(String[] args) throws IOException{try (ServerSocket ss = new ServerSocket(13)) {while(true){// 循环接收新的连接,并启动新的线程处理对应连接的IO消息Socket socket = ss.accept();new Thread(){@Overridepublic void run() {try{InputStream is = socket.getInputStream();BufferedReader br = new BufferedReader(new InputStreamReader(is));String msg;while((msg = br.readLine())!=null){System.out.println("receive msg:" + msg);}}catch (Exception e){e.printStackTrace();}}}.start();}}}
}
NIO版本
由于BIO的上述缺点,Java在后来的版本中推出了NIO,以获得非阻塞、更加可扩展的IO。NIO三大核心:Selector、Buffer、Channel。其中又以Selector为首,它仅需一个主线程循环,便可以通过收集和处理事件的方式,管理成千上万个连接,整个过程不会阻塞。示例代码如下:
/*** 基于NIO的客户端*/
public class NioClient {public static void main(String[] args) throws Exception{// 启动16个socket分别发送消息for (int i = 1; i <= 16; i++) {InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 13);SocketChannel socketChannel = SocketChannel.open(inetSocketAddress);socketChannel.configureBlocking(false);String msg = "msg" + i;ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());System.out.println("socket" + i + " send msg:" + msg);socketChannel.write(byteBuffer);socketChannel.close();Thread.sleep(100);}}
}
/*** 基于BIO的服务端*/
public class NioServer {public static void main(String[] args) throws Exception{ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();InetSocketAddress inetSocketAddress = new InetSocketAddress(13);serverSocketChannel.socket().bind(inetSocketAddress);//设置成非阻塞serverSocketChannel.configureBlocking(false); Selector selector = Selector.open();// 对serverSocket仅监听ACCEPT事件serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while(true) {selector.select();//遍历selectionKeysSet<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();//处理连接事件if(key.isAcceptable()) { SocketChannel socketChannel = serverSocketChannel.accept();//设置为非阻塞socketChannel.configureBlocking(false); System.out.println("client:" + socketChannel.getRemoteAddress() + " connected");//注册socket的READ事件到selectorsocketChannel.register(selector, SelectionKey.OP_READ); //处理读取事件} else if (key.isReadable()) { ByteBuffer byteBuffer = ByteBuffer.allocate(1024);SocketChannel channel = (SocketChannel) key.channel();int len = channel.read(byteBuffer);if (len >= 0) {byte[] array = byteBuffer.array();String msg = new String(array);System.out.println("receive " + msg + " from client:" + channel.getRemoteAddress());}// <0说明客户端断开(一定要处理,否则会一直收到READ事件)else {System.out.println("client:" + channel.getRemoteAddress() + " disconnected");channel.close();}}//事件处理完毕,要记得清除iterator.remove(); }}}
}
从上面代码我们可以看出NIO有两个缺点:一是代码编写远比BIO复杂,所以现在很难再看到裸用NIO编写代码的人,一般都会在封装好的框架上做二次开发,比如Netty就封装成了基于事件的开发模式,只需要在channelActive、channelRead等方法中编写业务逻辑即可;二是上面代码仅使用单线程,并未充分利用多核能力,完全可以让主线程专门处理连接接收,多个子线程处理各个channel的读写事件。这个思路实际上就是Netty的Reactor模型:使用bossGroup线程池处理连接接收,等channel建立好后,再绑定到workerGroup线程池中的一个线程,这个线程负责处理绑定在它上面的channel的读写事件。
mini-netty版本v1:Reactor线程模型
基于以上思路,我们改进之前的代码。首先我们创建NettyClient和NettyServer类,将已编写好的NioClient和NioServer类分别拷贝到这两个新类中。我们打算用NettyServer处理连接接收,那么就需要单独编写一个类,处理channel的读写事件。这个新类需要继承Thread类,并且把原NettyServer类中处理读写事件的逻辑移植过来。按照这个思路编写NioEventLoop类如下:
/*** worker线程池中的线程* 功能类似于Netty中的同名类*/
public class NioEventLoop extends Thread {public NioEventLoop() {try {//构造方法中初始化自己的selectorthis.selector = Selector.open();} catch (IOException e) {e.printStackTrace();}}private Selector selector;public Selector getSelector() {return this.selector;}@Overridepublic void run() {while (true) {int selectNum = 0;try {//select超时时间设为10ms,避免一直阻塞selectNum = selector.select();} catch (IOException e) {e.printStackTrace();}if (selectNum > 0) {//监听所有通道//遍历selectionKeysSet<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();//处理读取事件if (key.isReadable()) { ByteBuffer byteBuffer = ByteBuffer.allocate(1024);SocketChannel channel = (SocketChannel) key.channel();try {int len = channel.read(byteBuffer);if (len >= 0) {byte[] array = byteBuffer.array();String msg = new String(array);System.out.println("receive " + msg + " from client:" + channel.getRemoteAddress());}// <0说明客户端断开(一定要处理,否则会一直收到READ事件)else {System.out.println("client:" + channel.getRemoteAddress() + " disconnected");channel.close();}} catch (IOException e) {e.printStackTrace();}}//事件处理完毕,要记得清除iterator.remove(); }}}}
}
在原来的NettyServer类中,我们需要移除处理读写事件的代码,但是需要在新连接建立后,将新channel绑定到其中一个NioEventLoop上。绑定的逻辑是:在worker线程池中挑选一个NioEventLoop,再将新channel上的读写事件注册到这个NioEventLoop内部的selector上。然后我们需要在NettyServer初始化时创建一个worker线程池,线程池中挑选NioEventLoop的逻辑可以是按顺序遍历,这样我们需要一个数据结构来保存所有NioEventLoop的引用,以及一个index来保存当前遍历到的下标。按照这些思路修改NettyServer类如下:
/*** mini-netty的服务端*/
public class NettyServer {/*** worker线程池*/private static List<NioEventLoop> eventLoops = new ArrayList<>();/*** 下一次轮询到的下标*/private static int nextIndex = 0;/*** worker线程池中线程数量*/private static final int WORKER_THREAD_NUM = 8;static {// 做一些worker线程池的初始化工作for (int i = 1; i <= WORKER_THREAD_NUM; i++) {NioEventLoop el = new NioEventLoop();el.start();eventLoops.add(el);}}/*** 获取下一个event loop*/private static NioEventLoop getNextEventLoop() {NioEventLoop nextEventLoop = eventLoops.get(nextIndex);nextIndex++;if (nextIndex > eventLoops.size() - 1) {nextIndex = 0;}return nextEventLoop;}public static void main(String[] args) throws Exception{ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();InetSocketAddress inetSocketAddress = new InetSocketAddress(13);serverSocketChannel.socket().bind(inetSocketAddress);//设置成非阻塞serverSocketChannel.configureBlocking(false);Selector selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);int num = 0;while(true) {//监听所有通道selector.select();//遍历selectionKeysSet<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();//处理连接事件if(key.isAcceptable()) { SocketChannel socketChannel = serverSocketChannel.accept();System.out.println("client:" + socketChannel.getRemoteAddress() + " connected");//设置为非阻塞socketChannel.configureBlocking(false); System.out.println("bind new channel to event loop" + nextIndex);NioEventLoop nextEventLoop = getNextEventLoop();//注册socket的READ事件到所选eventLoop的selectorsocketChannel.register(nextEventLoop.getSelector(), SelectionKey.OP_READ);num++;System.out.println(num + " clients connected");}//事件处理完毕,要记得清除iterator.remove(); }}}
}
看完上面代码的读者可能会有疑问:在NioEventLoop类的run方法中,为啥要写selector.select(10)?如果改成selector.select()行不行?之所以写成带超时时间而非一直阻塞的,是因为READ事件是在channel建立后再注册到这个selector上面的,在这之前selector没有任何channel事件注册,所以会一直阻塞。那你可能会说,我在注册完READ后再加一句selector.wakeup()把selector唤醒行不行?如果尝试一下,就会知道也不可行,因为当selector在select()方法上阻塞时,注册READ事件的register方法也会一直阻塞,无法继续向下执行。
所以selector.select(10)虽然性能不好,但是至少能保证程序正常运行。为了进一步优化性能,Netty的思路是将select的超时时间设为下一个即将执行任务的过期时间,而不是固定的值。为此Netty引入了优雅的任务系统来解决这个问题,同时进一步提升与event loop交互的灵活性。我们将在下节为mini-netty引入类似的系统。
mini-netty版本v2:任务处理
任务系统通常可把任务分为两类,一类是需要立即执行的,称为task(普通任务);另一类是指定时间在未来执行的,称为futureTask(定时任务)。这两类任务都需要在event loop的循环中得到执行。
先来考虑futureTask的实现。futureTask本身的结构比较简单,只需要用一个long变量存储执行时间,一个Runnable变量存储执行命令,再加以包装即可。重点需要考虑的是用什么数据结构来保存futureTask的集合。我们看下实际需求:需要频繁地获取或删除头节点,头节点是执行时间距今最近的futureTask;需要按时间顺序遍历所有节点。如果使用ArrayDeque来实现,需要保证存储时有序,虽然获取或删除头节点是O(1)时间复杂度,但是插入或删除任意节点是O(n)时间复杂度。更好的选择是PriorityQueue,它能保证插入或删除任意节点是O(logn)的时间复杂度。
现在我们有了底层是优先级队列的futureTaskQueue,再来考虑它是否需要同步。在futureTaskQueue上确实可能存在多线程竞争的问题,但是如何加锁却是个难题。我们知道ConcurrentHashMap通过分段锁大大提升了并发能力,1.8版本后甚至将加锁粒度细化到了哈希表的每个头节点上,但是同样的做法却难以对PriorityQueue实施,因为优先级队列的数据结构不像哈希表那样好做分段。可如果我们对futureTaskQueue整体加锁,并发性又不够好,聪明的Netty设计者肯定不会这么做。Netty的做法是:将taskQueue设计成线程安全的,futureTaskQueue设计成非线程安全的;保证只有一个线程(即event loop)能直接与futureTaskQueue交互;当需要多线程添加futureTask时,将添加futureTask本身包装成一个task添加到taskQueue中。我们用这种思路自己实现添加futureTask的方法:
/*** 添加定时任务*/public void scheduleFutureTask(FutureTask task) {if (inEventLoop()) {futureTaskQueue.offer(task);} else {final long execTime = task.getExecTime();if (execTime < nextWakeupMillis.get()) {this.addTask(() -> {scheduleFutureTask(task);});} else {this.addTask(() -> {scheduleFutureTask(task);}, false);}}} /*** 判断当前线程是否就是本event loop*/public boolean inEventLoop() {return Thread.currentThread() == this;}
在上面的代码中,先通过inEventLoop()方法判断当前线程是否就是本event loop,如果是的,那么无需考虑多线程问题,直接对futureTaskQueue添加即可;否则,将scheduleFutureTask方法本身包装成task,加入到taskQueue中,待这个task被event loop执行到时,就会将其包装的futureTask添加到futureTaskQueue中。
接下来再考虑taskQueue用何种线程安全的Queue来实现。JDK中常用的线程安全Queue有BlockingQueue和ConcurrentLinkedQueue,前者是阻塞的不适合当前场景,故排除掉;后者适合当前场景,基于链表和CAS实现,非阻塞无界,可以用。但是Netty使用了一种在高并发下性能更好的线程安全Queue,这就是JCTools提供的MpscQueue,它基于数组和CAS实现,Mpsc意思是多生产者单消费者,完美适合当前场景(多线程添加task,单线程——event loop消费task)。Netty在许多内部实现中使用了这种高效的数据结构,关于它的实现细节在此不再赘述。
现在两种任务的实现思路已定。接下来我们再回到上一节末尾提出的问题:如何使用任务系统来优化selector.select?上节提到,思路是select的超时时间取距离将来最近一个定时任务的时间间隔。但是这里有个问题,没有考虑到插任务的情况。就是说如果添加的futureTask执行时间晚于select超时时间,那么肯定没问题;但是如果执行时间早于select超时时间,或添加的是需要立即执行的task,那么我们必须用wakeup及时唤醒selector,让其不再阻塞。下面的代码包含唤醒方法的实现,以及带和不带唤醒两种情况下添加task方法的实现:
/*** 添加任务(默认立即执行)*/public void addTask(Runnable task) {this.addTask(task, true);}/*** 添加任务* * 可指定是否立即执行,若立即执行,则尝试唤醒selector*/private void addTask(Runnable task, boolean immediate) {taskQueue.offer(task);if (immediate) {wakeup(inEventLoop());}}/*** 唤醒selector*/protected void wakeup(boolean inEventLoop) {if (!inEventLoop && nextWakeupMillis.getAndSet(AWAKE) != AWAKE) {selector.wakeup();}}
当你看到这里,可能会提出疑问:唤醒selector是否会存在多线程竞争的问题,如何避免重复唤醒的问题?非常好!对于这种轻量级的状态竞争,我们通常会使用CAS的方案。可以设计一个原子类型的变量,当它已经被设置成AWAKE状态时,表明已有其他线程唤醒了selector,这时不再重复唤醒,这也就是我们在之前的wakeup方法中所展示的代码。进一步,我们可以考虑扩展这个原子类型的变量,当selector不处于唤醒状态时,那么必然处于被阻塞状态,那么这个原子变量还可以用来存储当前等待的最近futureTask的超时时间间隔。因此我们设计一个AtomicLong类型的变量:
private static final long AWAKE = -1L;private static final long NONE = Long.MAX_VALUE;/*** 下一个唤醒时间点(毫秒)* * 可能的值有:* AWAKE 当EL处于非阻塞状态* NONE 当EL处于阻塞状态,且没有已安排的唤醒时间点* 其他值 T 当EL处于阻塞状态,且已安排好唤醒时间点T*/private final AtomicLong nextWakeupMillis = new AtomicLong(AWAKE);
为了维护这个变量,我们修改NioEventLoop类中selector.select(10)这行代码为如下代码:
//根据无定时任务、定时任务已过期、未过期三种情况做不同处理FutureTask ft = futureTaskQueue.peek();if (ft != null) {nextWakeupMillis.set(ft.getExecTime());long leftTime = ft.getExecTime() - System.currentTimeMillis();if (leftTime > 0) {selectNum = selector.select(leftTime);} else {selectNum = selector.selectNow();}} else {nextWakeupMillis.set(NONE);selectNum = selector.select();}
读者还可以再回顾下之前wakeup和scheduleFutureTask两个方法中对nextWakeupMillis的处理:
/*** 唤醒selector*/protected void wakeup(boolean inEventLoop) {if (!inEventLoop && nextWakeupMillis.getAndSet(AWAKE) != AWAKE) {selector.wakeup();}}/*** 添加定时任务*/public void scheduleFutureTask(FutureTask task) {if (inEventLoop()) {futureTaskQueue.offer(task);} else {final long execTime = task.getExecTime();if (execTime < nextWakeupMillis.get()) {this.addTask(() -> {scheduleFutureTask(task);});} else {this.addTask(() -> {scheduleFutureTask(task);}, false);}}}
至此我们为mini-netty完成了任务系统的添加。
总结
这篇文章带着大家一步步从最简单最原始的BIO,进化到一个Netty的雏形(包括Reactor线程模型和任务系统)。主要想带给大家这样一个逐步深入的过程和设计思路的取舍。再复杂的系统,只要自己实际设计过一遍,就能懂得作者的核心思路所在,再看源码就容易多了。当然Netty的内容远不止本文提到的这些,如果以后有时间,笔者会撰写进一步的后续文章,敬请期待。
本文链接:https://www.ngui.cc/article/show-841445.html