Reactor / Dispatcher 设计模式
您目前处于:编程  2015年09月01日

了解 Reactor 模式,就要先从事件驱动的开发方式说起。

我们知道,服务器开发,CPU 的处理速度远高于 IO 速度,为了避免 CPU 因为 IO 为阻塞,好一点的方法是多进程或线程处理,但这会带来一些进程切换的开销。

这时先驱者找到了事件驱动,或者叫回调的方法。这种方式就是,应用向一个中间人注册一个回调(Event handler),当 IO 就绪后,这个中间人产生一个时间,并通知此 handler 进行处理。这种回调的方式,也闲了"好莱坞原则" - "Don't call us, we'll call you."

那在 IO 就绪这个事件后,谁来充当这个中间人?Reactor 模式的答案是:有一个不断等待和循环的单独进程(线程)来做这件事,它接受所有 handler 的注册,并负责先操作系统个查询 IO 是否就绪,在就绪后用指定的 handler 进行处理,这个角色的名称就叫做 Reactor。

Reactor 与 NIO


NIO 中 Reactor 的核心是 selector,一个简单的 Reactor 示例,一个核心的 Reactor 的循环,这种循环结构又叫做 EventLoop。


结合 NIO 服务端创建时序图 & 实际代码进行解说:

public class Reactor implements Runnable {
    public final Selector selector;
    public final ServerSocketChannel server;

    /**
     * 创建了 ServerSocketChannel 对象,并调用 configureBlocking() 方法,配置为非阻塞模式
     * 把通道绑定到制定端口,向 Selector 注册事件,并指定参数 OP_ACCEPT,即监听 accept 事件
     */
    public Reactor(int port) throws IOException {
        // 创建Selector对象  
        selector = Selector.open();  
       
        // 创建可选择通道,并配置为非阻塞模式  
        server = ServerSocketChannel.open();  
        server.configureBlocking(false);  
       
        // 绑定通道到指定端口  
        ServerSocket socket = server.socket();  
        InetSocketAddress address = new InetSocketAddress(port);  
        socket.bind(address); 
 
        /**
          * 为了将Channel和Selector配合使用,必须将channel注册到selector上。
          * 通过SelectableChannel.register()方法来实现
          */
        // 向 Selector 注册该 channel
        SelectionKey selectionKey = server.register(selector, Selection.OP_ACCEPT);
        /**
         * selectionKey.attach(theObject); 可以将一个对象或更多信息附着到 SelectionKey上,
         * Object attachedObj = selectionKey.attachment();  可以从 SelectionKey 获取附着的信息。
         */
        // 利用 selectionKey 的 attach 功能绑定 Acceptor,如果有事件,触发 Acceptor
        selectionKey.attach(new Acceptor(this));
    }

    /**
     * Selector 开始监听 ,进入内部循环。在非阻塞 IO 中,内部循环模式都是遵循这种方式。
     * 首先调用 select() 方法,该方法会阻塞,直到至少有一个事件发生,
     * 然后使用 selectedKeys() 方法获取发生事件的 SelectionKey,然后使用迭代器进行循环
     */
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // 该调用会阻塞,直到至少有一个事件发生  
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext()) {
                    SelectionKey key = (SelectionKey) it.next(); 
                    dispatch(key);
                }
                selected.clear();
            }
        } catch (IOException ex) {
            /* ... */
        }
    }

    /** 
     * 运行 Acceptor
     */  
    void dispatch(SelectionKey key) {  
        Acceptor acceptor = (Acceptor) key.attachment();
        Runnable r = (Runnable)(acceptor );    
        if (r != null) {    
            r.run();  
        }    
    } 
}
public class Acceptor implements Runnable {  
    private Reactor reactor;  
    public Acceptor(Reactor reactor) {  
        this.reactor=reactor;  
    }  

    /**
     * 接收请求
     */
    @Override  
    public void run() {  
        try {  
            ServerSocketChannel server = reactor.server;  
            SocketChannel channel = server.accept(); 
            if(channel != null) { 
                // 调用 Handler 来处理 channel  
                new SocketReadHandler(reactor.selector, channel);  
            }
        } catch (IOException e) {  
            /* ... */
        }  
    }  
}
public class SocketReadHandler implements Runnable {  
    private Selector selector;
    private SocketChannel channel;  

    public SocketReadHandler(Selector selector, SocketChannel channel) throws IOException {  
        this.selector = selector;
        this.channel = channel;  

        channel.configureBlocking(false);         
   
        /**
          * 将新接入的客户端连接注册到 Reactor 线程的多路复用器上
          * 监听读操作位,用来读取客户端发送的网络消息
          */
        SelectionKey selectionKey = channel.register(selector, SelectionKey.OP_READ);            
        // 将 SelectionKey 绑定为本 Handler 有事件触发时,将调用本类的 run 方法。    
        selectionKey.attach(this); 
    }  
      
    /** 
     * 处理读取客户端发来的信息的事件
     */  
    @Override  
    public void run() {
        // 创建读取的缓冲区  
        ByteBuffer buffer = ByteBuffer.allocate(1024);  
        try {  
            int count = channel.read(buffer);  
            if (count > 0) {
                buffer.flip():
                CharBuffer charBuffer = decoder.decode(buffer);
                String msg = charBuffer.toString();
                
                // ...             

                SelectionKey selectionKey = channel.register(selector, SelectionKey.OP_WRITE);
                selectionKey.attach(name);
            }   
        } catch (IOException e) {  
            /* ... */
        }  
        buffer.clear();  
    }  
}

从一个通道里读数据,直到所有的数据都读到缓冲区里。

Reactor 与 Netty

Reactor 模式有多个变种,Netty 基于 Multiple Reactors 模式做了一定的修改,Mutilple Reactors 模式有多个 reactor:mainReactor 和 subReactor,其中 mainReactor 只有一个,负责响应 client 的连接请求,并建立连接,它使用 NIO Selector;subReactor 可以有一个或多个,每个 subReactor 都会在一个独立线程中执行,并且维护一个独立的 NIO Selector。

这是因为 subReactor 会执行一个比较耗时的 IO 操作,例如消息的读写,使用个多个线程去执行,则更加有利于发挥 CPU 的运算能力,减少 IO 等待时间。

Netty 的线程模型基于 Multiple Reactors 模式,借用了 mainReactor 和 subReactor 结构,从代码来看,它并没有 Thread Pool。Netty 的 subReactor 与 worker thread 是用一个线程,采用 IO 多路复用机制,可以使一个 subReactor 监听并处理多个 channel 的 IO 请求。

其中 parentGroup 和 childGroup 是 Bootstrap 构建方法中传入的两个对象,这两个 group 均是线程池,childGroup 线程池会被各个 subReactor 充分利用,parentGroup 线程池则只是在 bind 某个端口后,获得其中一个线程作为 mainReactor。

Netty 里对应 mainReactor 的角色叫做 "Boss",而对应 subReactor 的角色叫做 "Worker"。Boss 负责分配请求,Worker 负责执行。在 Netty 4.0 之后,NioEventLoop 是 Netty NIO 部分的核心。

Reactor 与 Kafka

/**
 * An NIO socket server. The threading model is
 * 1 Acceptor thread that handles new connections
 * Acceptor has N Processor threads that each have their own selector and read requests from sockets
 * M Handler threads that handle requests and produce responses back to the processor threads for writing.
 */
class SocketServer(val host: String,
                   val port: Int,
                   val processorBeginIndex: Int,
                   val numProcessorThreads: Int,
                   val totalProcessorThreads: Int,
                   val time: Time,
                   val metrics: Metrics) extends Logging {
 
  private val processors = new Array[Processor](totalProcessorThreads)
 
  /**
   * Start the socket server
   */
  def startup() {
    this.synchronized {
      new Acceptor(host, port, processorBeginIndex, numProcessorThreads, processors, time, metrics)
    }
  }
}
 
/**
 * Thread that accepts and configures new connections. There is only need for one of these
 */
private class Acceptor(val host: String,
                       private val port: Int,
                       val processorBeginIndex: Int,
                       numProcessorThreads: Int,
                       processors: Array[Processor],
                       val time: Time,
                       val metrics: Metrics) extends Runnable {
  val nioSelector = java.nio.channels.Selector.open()
  val serverChannel = openServerSocket(host, port)
 
  val processorEndIndex = processorBeginIndex + numProcessorThreads
  this.synchronized {
    for (i <- processorBeginIndex until processorEndIndex) {
      processors(i) = new Processor(time, metrics)
    }
  }
 
  /*
   * Create a server socket to listen for connections on.
   */
  def openServerSocket(host: String, port: Int): ServerSocketChannel = {
    val serverChannel = ServerSocketChannel.open()
    serverChannel.configureBlocking(false)
 
    val socketAddress =
      if (host == null || host.trim.isEmpty)
        new InetSocketAddress(port)
      else
        new InetSocketAddress(host, port)
 
    try {
      serverChannel.socket.bind(socketAddress)
    } catch {
      case e: SocketException =>
        throw new Exception("Socket server failed to bind.")
    }
    serverChannel
  }
 
  /**
   * Accept loop that checks for new connection attempts
   */
  def run() {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
 
    val currentProcessor = processorBeginIndex
    val ready = nioSelector.select()
 
    if (ready > 0) {
      val keys = nioSelector.selectedKeys()
      val iterator = keys.iterator()
      while (iterator.hasNext) {
        var key: SelectionKey = null
        try {
          key = iterator.next()
          iterator.remove()
          if (key.isAcceptable)
            accept(key, processors(currentProcessor))
          else
            throw new IllegalStateException("Unrecognized key state for acceptor thread.")
        } catch {
          case e: Throwable => error("Error while accepting connection")
        }
      }
    }
  }
 
  /*
   * Accept a new connection
   */
  def accept(key: SelectionKey, processor: Processor): Unit = {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
 
    socketChannel.configureBlocking(false)
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setKeepAlive(true)
 
    processor.accept(socketChannel)
  }
}
 
/**
 * Thread that processes all requests from a single connection. There are N of these running in parallel
 * each of which has its own selectors
 */
private class Processor(val time: Time,
                        val metrics: Metrics) extends Runnable {
 
  private val metricTags = new util.HashMap[String, String]()
 
  private val selector = new org.apache.kafka.common.network.Selector(
    metrics,
    time,
    "socket-server",
    metricTags)
 
  def run() {
    while (!Thread.interrupted()) {
      try {
        selector.poll(300)
      } catch {
        case e@(_: IllegalStateException | _: IOException) => {
          throw e
        }
      }
    }
  }
 
  /**
   * Queue up a new connection for reading
   */
  def accept(socketChannel: SocketChannel) {
    selector.wakeup()
  }
}

转载请并标注: “本文转载自 linkedkeeper.com ”