系列文章:
• 源码解读 Channel与Pipeline
• 源码解析之ServerSocketChannel
• 源码解读 ServerBootstrap
• 源码解读 EventLoop
阅读须知
Netty版本:4.1.14.Final
文章中使用/**/注释的方法会做深入分析
正文
我们在分析Bootstrap引导启动的源码时看到了Channel的注册,但是并没有做深入分析,本篇文章我们就来看一下Channel的注册流程。Channel的注册委托给了Channel内部的Unsafe来完成,我们来看实现:
AbstractChannel.AbstractUnsafe:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
// 已注册设置异常
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
// EventLoop与此Channel类型不相容设置异常
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise); /* 注册 */
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise); /* 注册 */
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
// 强制关闭Channel,关闭操作我们在分析bind流程时分析过
closeForcibly();
closeFuture.setClosed();
// 异常将指定的promise标记为失败
safeSetFailure(promise, t);
}
}
}
AbstractChannel.AbstractUnsafe:
private void register0(ChannelPromise promise) {
try {
// 检查Channel是否仍处于打开状态,因为它可能在注册调用超出eventLoop的同时关闭
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister(); /* 子类实现注册逻辑 */
neverRegistered = false;
registered = true;
// 确保在我们实际通知promise之前调用handlerAdded()
// 这是必需的,因为用户可能已经通过ChannelFutureListener中的pipeline触发事件
pipeline.invokeHandlerAddedIfNeeded();
// 将指定的promise标记为成功
safeSetSuccess(promise);
// 注册后调用ChannelInboundHandler的channelRegistered方法
pipeline.fireChannelRegistered();
// 如果Channel从未注册过,触发channelActive
// 如果Channel被注销并重新注册,这可以防止触发多次channelActive
if (isActive()) {
if (firstRegistration) {
// 第一次注册触发ChannelInboundHandler的channelActive方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// 此channel在之前注册并设置了autoRead()
// 这意味着我们需要再次开始读操作,以便我们处理入站数据
/*
* 计划一个读操作,用于填充ChannelPipeline中第一个ChannelInboundHandler的入站缓冲区
* 如果已经有待处理的读操作,则此方法不执行任何操作
*/
beginRead();
}
}
} catch (Throwable t) {
// 直接关闭Channel以避免FD泄漏
closeForcibly();
closeFuture.setClosed();
// 异常将指定的promise标记为失败
safeSetFailure(promise, t);
}
}
AbstractNioChannel:
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 用给定的Selector注册此Channel,返回selectionKey
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// 强制Selector现在select,因为“canceled”的SelectionKey可能仍然被缓存并且不会被移除
// 因为尚未调用Select.select()操作
eventLoop().selectNow();
selected = true;
} else {
// 我们在选择器之前强制了一个选择操作
// 但SelectionKey仍然因任何原因被缓存。JDK的bug?
throw e;
}
}
}
}
这里我们看到了JDK的Channel注册过程。
AbstractChannel.AbstractUnsafe:
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead(); /* 计划一个读取操作 */
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
// 异常调用ChannelHandler的exceptionCaught方法
pipeline.fireExceptionCaught(e);
}
});
// 关闭Channel,在分析bind流程时已经分析过
close(voidPromise());
}
}
AbstractNioMessageChannel:
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
super.doBeginRead();
}
准备处理读操作之前需要设置网络操作位为读:
AbstractNioChannel:
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// 将SelectionKey当前的操作位与读操作位进行按位与操作,如果等于0
// 说明目前并没有设置读操作位
if ((interestOps & readInterestOp) == 0) {
// 设置读操作位
selectionKey.interestOps(interestOps | readInterestOp);
}
}
这里对于读操作位的判断和修改与JDK NIO SelectionKey的相关方法实现是等价的。
到这里,Channel的register流程就完成了。
本文受原创保护,未经作者授权,禁止转载。
linkedkeeper.com (文/张强) ©著作权归作者所有
|