系列文章:了解 Reactor 模式,就要先从事件驱动的开发方式说起。 我们知道,服务器开发,CPU 的处理速度远高于 IO 速度,为了避免 CPU 因为 IO 为阻塞,好一点的方法是多进程或线程处理,但这会带来一些进程切换的开销。 这时先驱者找到了事件驱动,或者叫回调的方法。这种方式就是,应用向一个中间人注册一个回调(Event handler),当 IO 就绪后,这个中间人产生一个时间,并通知此 handler 进行处理。这种回调的方式,也闲了"好莱坞原则" - "Don't call us, we'll call you." 那在 IO 就绪这个事件后,谁来充当这个中间人?Reactor 模式的答案是:有一个不断等待和循环的单独进程(线程)来做这件事,它接受所有 handler 的注册,并负责先操作系统个查询 IO 是否就绪,在就绪后用指定的 handler 进行处理,这个角色的名称就叫做 Reactor。 Reactor 与 NIO NIO 中 Reactor 的核心是 selector,一个简单的 Reactor 示例,一个核心的 Reactor 的循环,这种循环结构又叫做 EventLoop。 结合 NIO 服务端创建时序图 & 实际代码进行解说: public class Reactor implements Runnable { public final Selector selector; public final ServerSocketChannel server; /** * 创建了 ServerSocketChannel 对象,并调用 configureBlocking() 方法,配置为非阻塞模式 * 把通道绑定到制定端口,向 Selector 注册事件,并指定参数 OP_ACCEPT,即监听 accept 事件 */ public Reactor(int port) throws IOException { // 创建Selector对象 selector = Selector.open(); // 创建可选择通道,并配置为非阻塞模式 server = ServerSocketChannel.open(); server.configureBlocking(false); // 绑定通道到指定端口 ServerSocket socket = server.socket(); InetSocketAddress address = new InetSocketAddress(port); socket.bind(address); /** * 为了将Channel和Selector配合使用,必须将channel注册到selector上。 * 通过SelectableChannel.register()方法来实现 */ // 向 Selector 注册该 channel SelectionKey selectionKey = server.register(selector, Selection.OP_ACCEPT); /** * selectionKey.attach(theObject); 可以将一个对象或更多信息附着到 SelectionKey上, * Object attachedObj = selectionKey.attachment(); 可以从 SelectionKey 获取附着的信息。 */ // 利用 selectionKey 的 attach 功能绑定 Acceptor,如果有事件,触发 Acceptor selectionKey.attach(new Acceptor(this)); } /** * Selector 开始监听 ,进入内部循环。在非阻塞 IO 中,内部循环模式都是遵循这种方式。 * 首先调用 select() 方法,该方法会阻塞,直到至少有一个事件发生, * 然后使用 selectedKeys() 方法获取发生事件的 SelectionKey,然后使用迭代器进行循环 */ @Override public void run() { try { while (!Thread.interrupted()) { // 该调用会阻塞,直到至少有一个事件发生 selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); dispatch(key); } selected.clear(); } } catch (IOException ex) { /* ... */ } } /** * 运行 Acceptor */ void dispatch(SelectionKey key) { Acceptor acceptor = (Acceptor) key.attachment(); Runnable r = (Runnable)(acceptor ); if (r != null) { r.run(); } } } public class Acceptor implements Runnable { private Reactor reactor; public Acceptor(Reactor reactor) { this.reactor=reactor; } /** * 接收请求 */ @Override public void run() { try { ServerSocketChannel server = reactor.server; SocketChannel channel = server.accept(); if(channel != null) { // 调用 Handler 来处理 channel new SocketReadHandler(reactor.selector, channel); } } catch (IOException e) { /* ... */ } } } public class SocketReadHandler implements Runnable { private Selector selector; private SocketChannel channel; public SocketReadHandler(Selector selector, SocketChannel channel) throws IOException { this.selector = selector; this.channel = channel; channel.configureBlocking(false); /** * 将新接入的客户端连接注册到 Reactor 线程的多路复用器上 * 监听读操作位,用来读取客户端发送的网络消息 */ SelectionKey selectionKey = channel.register(selector, SelectionKey.OP_READ); // 将 SelectionKey 绑定为本 Handler 有事件触发时,将调用本类的 run 方法。 selectionKey.attach(this); } /** * 处理读取客户端发来的信息的事件 */ @Override public void run() { // 创建读取的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); try { int count = channel.read(buffer); if (count > 0) { buffer.flip(): CharBuffer charBuffer = decoder.decode(buffer); String msg = charBuffer.toString(); // ... SelectionKey selectionKey = channel.register(selector, SelectionKey.OP_WRITE); selectionKey.attach(name); } } catch (IOException e) { /* ... */ } buffer.clear(); } } 从一个通道里读数据,直到所有的数据都读到缓冲区里。 Reactor 与 Netty Reactor 模式有多个变种,Netty 基于 Multiple Reactors 模式做了一定的修改,Mutilple Reactors 模式有多个 reactor:mainReactor 和 subReactor,其中 mainReactor 只有一个,负责响应 client 的连接请求,并建立连接,它使用 NIO Selector;subReactor 可以有一个或多个,每个 subReactor 都会在一个独立线程中执行,并且维护一个独立的 NIO Selector。 这是因为 subReactor 会执行一个比较耗时的 IO 操作,例如消息的读写,使用个多个线程去执行,则更加有利于发挥 CPU 的运算能力,减少 IO 等待时间。 Netty 的线程模型基于 Multiple Reactors 模式,借用了 mainReactor 和 subReactor 结构,从代码来看,它并没有 Thread Pool。Netty 的 subReactor 与 worker thread 是用一个线程,采用 IO 多路复用机制,可以使一个 subReactor 监听并处理多个 channel 的 IO 请求。 其中 parentGroup 和 childGroup 是 Bootstrap 构建方法中传入的两个对象,这两个 group 均是线程池,childGroup 线程池会被各个 subReactor 充分利用,parentGroup 线程池则只是在 bind 某个端口后,获得其中一个线程作为 mainReactor。 Netty 里对应 mainReactor 的角色叫做 "Boss",而对应 subReactor 的角色叫做 "Worker"。Boss 负责分配请求,Worker 负责执行。在 Netty 4.0 之后,NioEventLoop 是 Netty NIO 部分的核心。 Reactor 与 Kafka /** * An NIO socket server. The threading model is * 1 Acceptor thread that handles new connections * Acceptor has N Processor threads that each have their own selector and read requests from sockets * M Handler threads that handle requests and produce responses back to the processor threads for writing. */ class SocketServer(val host: String, val port: Int, val processorBeginIndex: Int, val numProcessorThreads: Int, val totalProcessorThreads: Int, val time: Time, val metrics: Metrics) extends Logging { private val processors = new Array[Processor](totalProcessorThreads) /** * Start the socket server */ def startup() { this.synchronized { new Acceptor(host, port, processorBeginIndex, numProcessorThreads, processors, time, metrics) } } } /** * Thread that accepts and configures new connections. There is only need for one of these */ private class Acceptor(val host: String, private val port: Int, val processorBeginIndex: Int, numProcessorThreads: Int, processors: Array[Processor], val time: Time, val metrics: Metrics) extends Runnable { val nioSelector = java.nio.channels.Selector.open() val serverChannel = openServerSocket(host, port) val processorEndIndex = processorBeginIndex + numProcessorThreads this.synchronized { for (i <- processorBeginIndex until processorEndIndex) { processors(i) = new Processor(time, metrics) } } /* * Create a server socket to listen for connections on. */ def openServerSocket(host: String, port: Int): ServerSocketChannel = { val serverChannel = ServerSocketChannel.open() serverChannel.configureBlocking(false) val socketAddress = if (host == null || host.trim.isEmpty) new InetSocketAddress(port) else new InetSocketAddress(host, port) try { serverChannel.socket.bind(socketAddress) } catch { case e: SocketException => throw new Exception("Socket server failed to bind.") } serverChannel } /** * Accept loop that checks for new connection attempts */ def run() { serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) val currentProcessor = processorBeginIndex val ready = nioSelector.select() if (ready > 0) { val keys = nioSelector.selectedKeys() val iterator = keys.iterator() while (iterator.hasNext) { var key: SelectionKey = null try { key = iterator.next() iterator.remove() if (key.isAcceptable) accept(key, processors(currentProcessor)) else throw new IllegalStateException("Unrecognized key state for acceptor thread.") } catch { case e: Throwable => error("Error while accepting connection") } } } } /* * Accept a new connection */ def accept(key: SelectionKey, processor: Processor): Unit = { val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel] val socketChannel = serverSocketChannel.accept() socketChannel.configureBlocking(false) socketChannel.socket().setTcpNoDelay(true) socketChannel.socket().setKeepAlive(true) processor.accept(socketChannel) } } /** * Thread that processes all requests from a single connection. There are N of these running in parallel * each of which has its own selectors */ private class Processor(val time: Time, val metrics: Metrics) extends Runnable { private val metricTags = new util.HashMap[String, String]() private val selector = new org.apache.kafka.common.network.Selector( metrics, time, "socket-server", metricTags) def run() { while (!Thread.interrupted()) { try { selector.poll(300) } catch { case e@(_: IllegalStateException | _: IOException) => { throw e } } } } /** * Queue up a new connection for reading */ def accept(socketChannel: SocketChannel) { selector.wakeup() } } 转载请并标注: “本文转载自 linkedkeeper.com ” ©著作权归作者所有 |