2017-09-06
Preface

Recently I noticed BlockingOperationException being thrown in my Netty 4 project. Why does this exception occur? Some people say that calling sync() or await() while starting a server with Netty's ServerBootstrap causes a deadlock. In my case, however, the exception was thrown during channelRead. The bootstrap uses the bossGroup, while channelRead runs on the workerGroup, so the two should be using different EventLoops, and I did not expect them to affect each other. So what actually causes this deadlock exception?

I posted the question on StackOverflow (https://stackoverflow.com/questions/46020266/what-causes-blockingoperationexception-in-netty-4) and was lucky enough to get an answer from Norman Maurer, one of Netty's core contributors. Below I walk through the whole line of analysis.

Main Text

When a server is started with Netty's ServerBootstrap, calling sync() blocks the calling thread until the bind operation completes.

    public void init() throws Exception {
        logger.info("start tcp server ...");
        Class<NioServerSocketChannel> clazz = NioServerSocketChannel.class;
        // Set up the server
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boosGroup, workerGroup);
        bootstrap.channel(clazz);
        bootstrap.childHandler(new ServerChannelInitializer(serverConfig));
        // Optional settings
        bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
        // Bind the port and block until binding has finished
        logger.info("start tcp server at port[" + port + "].");
        ChannelFuture future = bootstrap.bind(port).sync();
        // sync() has already waited and rethrown any failure, so this listener
        // only ever sees a completed, successful future
        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    logger.info("Server have success bind to " + port);
                } else {
                    logger.error("Server fail bind to " + port);
                    // Note: Netty logs exceptions thrown by listeners; they do not propagate to init()
                    throw new InitErrorException("Server start fail !", future.cause());
                }
            }
        });
    }

Look at this line:

    ChannelFuture future = bootstrap.bind(port).sync();

bootstrap.bind() returns a ChannelFuture. In the source of DefaultPromise (the base class of DefaultChannelPromise), sync() calls await(), and await() synchronizes on the future object itself.

    public Promise<V> sync() throws InterruptedException {
        await();
        rethrowIfFailed(); // rethrow the cause if the async operation failed
        return this;
    }

So sync() is essentially await() plus rethrowing the failure. Here is await():

    public Promise<V> await() throws InterruptedException {
        // The async operation has already completed: return immediately
        if (isDone()) {
            return this;
        }
        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }
        // Synchronize so that only one thread at a time modifies waiters
        synchronized (this) {
            // Keep looping until the operation has completed
            while (!isDone()) {
                // Deadlock detection
                checkDeadLock();
                incWaiters(); // ++waiters
                try {
                    wait(); // JDK Object.wait(): wait until the async operation completes
                } finally {
                    decWaiters(); // --waiters
                }
            }
        }
        return this;
    }

Note the checkDeadLock() call, which performs the deadlock detection:

    protected void checkDeadLock() {
        EventExecutor e = executor();
        if (e != null && e.inEventLoop()) {
            throw new BlockingOperationException(toString());
        }
    }

e.inEventLoop() returns true when the current thread is the executor's own thread. In that case a task running on the thread would be waiting for another task on the same thread to wake it up. A thread runs its tasks one after another, so the later task can never start while the current one is blocked. That is a deadlock, and Netty reports it by throwing instead of hanging.

Calling sync() or await() inside a ChannelHandler method can therefore deadlock, and in practice we did occasionally see this BlockingOperationException:

    io.netty.util.concurrent.BlockingOperationException:AbstractChannel$CloseFuture(incomplete)
        at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:391)
        at io.netty.channel.DefaultChannelPromise.checkDeadLock(DefaultChannelPromise.java:157)
        at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:252)
        at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:129)
        at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:28)
        at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:219)
        at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:117)
        at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:28)

So what kind of code leads to this? Here is the code from the project that triggered the deadlock:

    private void channelWrite(T message) {
        boolean success = true;
        boolean sent = true;
        int timeout = 60;
        try {
            ChannelFuture cf = cxt.write(message);
            cxt.flush();
            if (sent) {
                // Blocks until done or timed out; on the channel's own EventLoop,
                // an incomplete future makes checkDeadLock() throw BlockingOperationException
                success = cf.await(timeout);
            }
            if (cf.isSuccess()) {
                logger.debug("send success.");
            }
            Throwable cause = cf.cause();
            if (cause != null) {
                this.fireError(new PushException(cause));
            }
        } catch (LostConnectException e) {
            this.fireError(new PushException(e));
        } catch (Exception e) {
            this.fireError(new PushException(e));
        } catch (Throwable e) {
            this.fireError(new PushException("Failed to send message", e));
        }
        if (!success) {
            this.fireError(new PushException("Failed to send message"));
        }
    }

The key lines are:

    ChannelFuture cf = cxt.write(message);
    cxt.flush();
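write only puts the outgoing data into a buffer without actually sending it; flush is what sends the buffered data. What happens if nothing is flushed before waiting? The current thread would enter wait() while the code that sends the data never runs, because sending also happens on the current thread, and that is the deadlock. (In practice checkDeadLock() detects this and throws BlockingOperationException before wait() is reached.)

Yet even after switching to writeAndFlush, the deadlock exception still occurred occasionally. Why? This raised several questions and hypotheses.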
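The same-thread deadlock described above can be reproduced without Netty. The following is a minimal JDK-only sketch, not Netty's code. MiniEventLoop and MiniPromise are hypothetical stand-ins for an EventLoop and a ChannelPromise. The sketch assumes the "write" completes in a later task queued on the same thread. An IllegalStateException plays the role of BlockingOperationException (which in Netty is itself an IllegalStateException subclass). The sketch shows the checkDeadLock()-style guard in await() and, for contrast, a listener that simply waits to be called back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class DeadlockCheckSketch {

    /** Hypothetical single-thread "event loop" that knows its own thread, like inEventLoop(). */
    static final class MiniEventLoop {
        private volatile Thread thread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            thread = new Thread(r, "mini-event-loop");
            return thread;
        });
        boolean inEventLoop() { return Thread.currentThread() == thread; }
        void execute(Runnable task) { executor.execute(task); }
        void shutdown() { executor.shutdown(); }
    }

    /** Hypothetical promise whose await() refuses to block its own event loop. */
    static final class MiniPromise {
        private final MiniEventLoop loop;
        private final List<Runnable> listeners = new ArrayList<>();
        private boolean done;

        MiniPromise(MiniEventLoop loop) { this.loop = loop; }

        void setSuccess() {
            List<Runnable> toNotify;
            synchronized (this) {
                done = true;
                notifyAll();                             // wake any thread blocked in await()
                toNotify = new ArrayList<>(listeners);
                listeners.clear();
            }
            toNotify.forEach(Runnable::run);             // run listeners outside the lock
        }

        void addListener(Runnable listener) {
            synchronized (this) {
                if (!done) { listeners.add(listener); return; } // notified later by setSuccess()
            }
            listener.run();                              // already complete: notify right away
        }

        synchronized void await() throws InterruptedException {
            while (!done) {
                // Same idea as checkDeadLock(): if we block the loop's own thread,
                // the queued task that would complete this promise can never run.
                if (loop.inEventLoop()) {
                    throw new IllegalStateException("await() called from the event loop thread");
                }
                wait();
            }
        }
    }

    /** Hypothetical write whose completion runs as a later task on the same loop. */
    static MiniPromise write(MiniEventLoop loop) {
        MiniPromise promise = new MiniPromise(loop);
        loop.execute(promise::setSuccess);
        return promise;
    }

    static String[] runDemo() {
        MiniEventLoop loop = new MiniEventLoop();
        CompletableFuture<String> blocking = new CompletableFuture<>();
        CompletableFuture<String> listening = new CompletableFuture<>();
        try {
            // BAD: a task on the loop waits for a promise only a later task on the same loop completes.
            loop.execute(() -> {
                try {
                    write(loop).await();
                    blocking.complete("completed");
                } catch (IllegalStateException e) {
                    blocking.complete("deadlock detected");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    blocking.complete("interrupted");
                }
            });
            // GOOD: register a listener and return, leaving the loop free to run the completing task.
            loop.execute(() -> write(loop).addListener(
                    () -> listening.complete("listener ran on " + Thread.currentThread().getName())));
            return new String[] {
                blocking.get(1, TimeUnit.SECONDS),
                listening.get(1, TimeUnit.SECONDS)
            };
        } catch (Exception e) {
            throw new IllegalStateException("demo did not finish", e);
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        for (String line : runDemo()) {
            System.out.println(line);
        }
    }
}
```

Running main prints `deadlock detected` followed by `listener ran on mini-event-loop`. The guard turns a silent hang into an immediate exception, while the listener version never blocks and is notified on the loop thread once the queued task completes the promise.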
Netty's thread model: an EventLoopGroup assigns an EventLoop to each newly created Channel. Each EventLoop handles all events and tasks of the Channels bound to it, and each EventLoop is tied to a single Thread. The same EventLoop may be assigned to multiple Channels. In Netty 4, all I/O operations and events of a Channel are handled by the one Thread assigned to its EventLoop.

Now let's analyze the source behind cxt.write(message) (from AbstractChannelHandlerContext):

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // Already on the channel's EventLoop: perform the write (and flush) right away
            next.invokeWrite(msg, promise);
            if (flush) {
                next.invokeFlush();
            }
        } else {
            // Called from another thread: count the pending bytes, then hand the work to the EventLoop
            int size = channel.estimatorHandle().size(msg);
            if (size > 0) {
                ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
                if (buffer != null) {
                    buffer.incrementPendingOutboundBytes(size);
                }
            }
            Runnable task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, msg, size, promise);
            } else {
                task = WriteTask.newInstance(next, msg, size, promise);
            }
            safeExecute(executor, task, promise, msg);
        }
    }

    public EventExecutor executor() {
        if (executor == null) {
            return channel().eventLoop();
        } else {
            return executor;
        }
    }

    private static void safeExecute(EventExecutor executor, Runnable runnable,
            ChannelPromise promise, Object msg) {
        try {
            executor.execute(runnable);
        } catch (Throwable cause) {
            try {
                promise.setFailure(cause);
            } finally {
                if (msg != null) {
                    ReferenceCountUtil.release(msg);
                }
            }
        }
    }

Note the line EventExecutor executor = next.executor();, which obtains the EventLoop the channel is bound to (unless the handler was added with its own executor). If the calling thread is the EventLoop assigned to this Channel, the write is performed right away. Otherwise the work is wrapped in a task that the EventLoop puts into its internal queue and runs later.

What is the relationship between EventLoop and EventExecutor?
    public interface EventLoop extends EventExecutor, EventLoopGroup {
        @Override
        EventLoopGroup parent();
    }

In other words, an EventLoop is an EventExecutor, so the executor of a handler context is normally just the channel's EventLoop.

So, roughly: when write is called, Netty checks whether the current thread is the EventLoop assigned to the Channel. If it is, that thread performs the I/O directly; otherwise the work is submitted to the executor. When await is called, the promise fetches its executor, and checkDeadLock() checks whether the executor's thread and the current thread are the same. If they are, it reports a deadlock by throwing BlockingOperationException. This also resolves the puzzle from the preface: channelRead and the writes it triggers run on the same worker EventLoop, so the boss/worker split does not help. And because await() returns immediately when the future is already done, the exception only appears when writeAndFlush has not yet completed at the moment await() is called, which is why it occurred only occasionally.

How can it be fixed? The official documentation (the ChannelFuture Javadoc) recommends preferring addListener(GenericFutureListener) over await().

    // BAD - NEVER DO THIS
    @Override
    public void channelRead(ChannelHandlerContext ctx, GoodByeMessage msg) {
        ChannelFuture future = ctx.channel().close();
        future.awaitUninterruptibly();
        // Perform post-closure operation
        // ...
    }

    // GOOD
    @Override
    public void channelRead(ChannelHandlerContext ctx, GoodByeMessage msg) {
        ChannelFuture future = ctx.channel().close();
        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) {
                // Perform post-closure operation
                // ...
            }
        });
    }

The project code was rewritten as:

    private void pushMessage(T message) {
        try {
            ChannelFuture cf = cxt.writeAndFlush(message);
            // Never block here: react to the result once the write has completed
            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    if (future.isSuccess()) {
                        logger.debug("send success.");
                    } else {
                        // Netty only logs exceptions thrown from a listener, and they never reach
                        // the catch blocks below, so report the failure explicitly instead
                        fireError(new PushException("Failed to send message.", future.cause()));
                    }
                }
            });
        } catch (LostConnectException e) {
            this.fireError(new PushException(e));
        } catch (Exception e) {
            this.fireError(new PushException(e));
        } catch (Throwable e) {
            this.fireError(new PushException(e));
        }
    }

When reposting, please credit: "This article is reposted from linkedkeeper.com (by 张松然)"

© Copyright belongs to the author.