分析 Netty 死锁异常 BlockingOperationException
您目前处于:笔记  2017-09-06

最近,我发现一些BlockOperationException异常出现在我的Netty4项目中,为什么会出现这个异常?有人说,在Netty的ServerBootstrap启动服务器的时候,使用sync()或await()方法会造成死锁,可我发现异常是出现在ChannelRead过程中,而且Bootstrap用的是bossGroup,而ChannelRead用的是workerGroup,两者使用的EventLoop应该是不用的,我认为是不会互相影响的,那究竟是什么原因产生思索异常呢?

我将这个问题发布到了StackOverflow进行提问(https://stackoverflow.com/questions/46020266/what-causes-blockingoperationexception-in-netty-4),非常幸运的得到了Norman Maurer(Netty的核心贡献者之一)的解答。

下面我将整个问题的分析思路整理出来,与大家分享。

正文

在使用Netty的ServerBootstrap启动服务器的时候,使用sync()方法会导致阻塞。

public void init() throws Exception {
    logger.info("start tcp server ...");
    Class clazz = NioServerSocketChannel.class;
    // Server 服务启动
    ServerBootstrap bootstrap = new ServerBootstrap();
    bootstrap.group(boosGroup, workerGroup);
    bootstrap.channel(clazz);
    bootstrap.childHandler(new ServerChannelInitializer(serverConfig));
    // 可选参数
    bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
    // 绑定接口,同步等待成功
    logger.info("start tcp server at port[" + port + "].");
    ChannelFuture future = bootstrap.bind(port).sync();
    future.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                logger.info("Server have success bind to " + port);
            } else {
                logger.error("Server fail bind to " + port);
                throw new InitErrorException("Server start fail !", future.cause());
            }
        }
    });
}

在这一行

ChannelFuture future = bootstrap.bind(port).sync();

bootstrap.bind()返回一个ChannelFuture,查看源代码DefaultChannelGroupFuture,sync()方法会调用await(),在await()中对ChannelFuture对象进行了加锁。

public Promise<V> sync() throws InterruptedException {
    await();
    rethrowIfFailed(); // 异步操作失败抛出异常
    return this;
}

sync()和await()很类似。

public Promise<V> await() throws InterruptedException {
    // 异步操作已经完成,直接返回
    if (isDone()) {
        return this;    
    }
    if (Thread.interrupted()) {
        throw new InterruptedException(toString());
    }
    // 同步使修改waiters的线程只有一个
    synchronized (this) {
        // 未完成则一直循环  
        while (!isDone()) { // 等待直到异步操作完成
            // 死锁检测
            checkDeadLock();
            incWaiters();   // ++waiters;
            try {
                wait(); // JDK方法
            } finally {
                decWaiters(); // --waiters
            }
        }
    }
    return this;
}

注意其中的checkDeadLock()方法用来进行死锁检测:

protected void checkDeadLock() {
    EventExecutor e = executor();
    if (e != null && e.inEventLoop()) {
        throw new BlockingOperationException(toString());
    }
}

e.inEventLoop()表示当前线程和executor的执行线程是同一个,即该线程上的一个任务等待该线程上的其他任务唤醒自己。我们知道线程的执行是线性,即前面的代码执行完毕才能执行后面的代码,因此这里产生了一个死锁。

在ChannelHandler方法中调用sync()或await()方法,会有可能引起死锁,而在实践中也偶发出现BlockingOperationException死锁异常:

io.netty.util.concurrent.BlockingOperationException:AbstractChannel$CloseFuture(incomplete)
    at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:391)
    at io.netty.channel.DefaultChannelPromise.checkDeadLock(DefaultChannelPromise.java:157)
    at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:252)
    at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:129)
    at io.netty.channel.DefaultChannelPromise.await(DefaultChannelPromise.java:28)
    at io.netty.util.concurrent.DefaultPromise.sync(DefaultPromise.java:219)
    at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:117)
    at io.netty.channel.DefaultChannelPromise.sync(DefaultChannelPromise.java:28)

那么什么样的代码会产生这种情况呢,下面给出项目中出现死锁的代码:

private void channelWrite(T message) {
    boolean success = true;
    boolean sent = true;
    int timeout = 60;
    try {
        ChannelFuture cf = cxt.write(message);
        cxt.flush();
        if (sent) {
            success = cf.await(timeout);
        }
        if (cf.isSuccess()) {
            logger.debug("send success.");
        }
        Throwable cause = cf.cause();
        if (cause != null) {
            this.fireError(new PushException(cause));
        }
    } catch (LostConnectException e) {
        this.fireError(new PushException(e));
    } catch (Exception e) {
        this.fireError(new PushException(e));
    } catch (Throwable e) {
        this.fireError(new PushException("Failed to send message“, e));
    }
    if (!success) {
        this.fireError(new PushException("Failed to send message"));
    }
}

在这一行

ChannelFuture cf = cxt.write(message);
cxt.flush();

write方法只是把发送数据放入一个缓存,而不会真实的发送,而flush则是将放入缓存的数据发送数据,如果不flush会发生什么情况呢?当前线程会进入wait(),而分发送数据的代码没有被执行,因为发送数据的方法也是在当前线程中执行,这样死锁就产生了。

实践中,使用了writeAndFlush方法,仍会小概率的出现死锁异常,这又是为何?同时存在几个疑惑和猜测:

  • ServerBootstrap启动使用EventLoopGroup是bossGroup,而ChannelHandler进行IO读写使用EventLoopGroup是workerGroup,两者的EventExecutor应该是互不影响的。

    而且,实践中,死锁异常也并不出现在Netty启动过程中,多发生在Channel的writeAndFlush后await,所以推断sync方法虽然会调用await方法,但并不是引起死锁的主因。

  • 源码分析checkDeadLock,BlockingOperationException异常抛出,是当前线程和executor的执行线程是同一个。在GIthub/Netty查看了BlockingOperationException的问题,同样说不能在同一个IO线程里调用sync()或await()。

  • e.inEventLoop()为true则会抛BlockingOperationException异常,checkDeadLock方法之后就会进入await方法,即current thread进入休眠等待notify或interrupt,如果notify或interrupt的thread是current thread,那么current thread将出现死锁,无法被唤醒。

  • 谁会notify或interrupt唤起current thread?

    current thread调用writeAndFlush方法发送数据并返回一个ChannelFuture,并调用ChannelFuture的await方法进入休眠等待,当发送isDone()时,executor线程池fetch一个线程去唤醒notify event)current thread,但如果fetch到current thread就会出现死锁,因为当前线程已经休眠

    我们回忆下wait和notify,调用某个对象的wait()方法能让当前线程阻塞,调用该对象的notify()方法能够唤醒这个个正在等待的线程。wait()、notify()实现的是线程间的协作。

  • 可checkDeadLock是在wait之前进行检测的,那么Netty在current thread进入休眠之前,就应该已经fetch出唤醒thread。可猜测BlockingOperationException异常,可能是executor在fetch线程时将current thread作为notify thread而进行的自我容错检测。


Netty 的线程模型:

EventLoopGroup负责分配一个EventLoop到每个新创建的Channel。每个EventLoop处理绑定Channel的所有event和task。每个EventLoop和一个线程关联。同一个EventLoop可能会被分配到多个Channel。

在Netty4,所有的I/O操作和event都是由分配给EventLoop的那一个Thread来处理的。

让我们再来分析一下cxt.write(message)的源代码:

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeWrite(msg, promise);
        if (flush) {
            next.invokeFlush();
        }
    } else {
        int size = channel.estimatorHandle().size(msg);
        if (size > 0) {
            ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
            if (buffer != null) {
      	        buffer.incrementPendingOutboundBytes(size);
            }
        }
        Runnable task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, msg, size, promise);
        }  else {
            task = WriteTask.newInstance(next, msg, size, promise);
        }
        safeExecute(executor, task, promise, msg);
    }
}

public EventExecutor executor() {
    if (executor == null) {
        return channel().eventLoop();
    } else {
        return executor;
    }
}

private static void safeExecute(EventExecutor executor, Runnable runnable, 
                                            ChannelPromise promise, Object msg) {
    try {
        executor.execute(runnable);
    } catch (Throwable cause) {
        try {
            promise.setFailure(cause);
        } finally {
            if (msg != null) {
                ReferenceCountUtil.release(msg);
            }
        }
    }
}

注意这行:

EventExecutor executor = next.executor();

,获取当前channel所绑定的eventLoop。如果当前调用线程就是分配给该Channel的EventLoop,代码被执行。否则,EventLoop将task放入一个内部的队列延后执行。

EventLoop和EventExecutor什么关系?

public interface EventLoop extends EventExecutor, EventLoopGroup {
    @Override
    EventLoopGroup parent();
}

所以,我们大致分析出,在执行write方法时,Netty会判断current thread是否就是分给该Channe的EventLoop,如果是则行线程执行IO操作,否则提交executor等待分配。当执行await方法时,会从executor里fetch出执行线程,这里就需要checkDeadLock,判断执行线程和current threads是否时同一个线程,如果是就检测为死锁抛出异常BlockingOperationException。

那如何解决?官方建议优先使用addListener(GenericFutureListener),而非await()。

// BAD - NEVER DO THIS
@Override
public void channelRead(ChannelHandlerContext ctx, GoodByeMessage msg) {
    ChannelFuture future = ctx.channel().close();
    future.awaitUninterruptibly();
    // Perform post-closure operation
    // ...
}
// GOOD
@Override
public void channelRead(ChannelHandlerContext ctx,  GoodByeMessage msg) {
    ChannelFuture future = ctx.channel().close();
    future.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            // Perform post-closure operation
            // ...
        }
    });
}

项目代码改造为:

private void pushMessage(T message) {
    try {
        ChannelFuture cf = cxt.writeAndFlush(message);
        cf.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws PushException {
                if (future.isSuccess()) {
                    logger.debug("send success.");
                } else {
                    throw new PushException("Failed to send message.");
                }
                Throwable cause = future.cause();
                if (cause != null) {
                    throw new PushException(cause);
                }
            }
        });
    } catch (LostConnectException e) {
        this.fireError(new PushException(e));
    } catch (Exception e) {
        this.fireError(new PushException(e));
    } catch (Throwable e) {
        this.fireError(new PushException(e));
    }
}


Reference

https://stackoverflow.com/questions/44390660/how-netty-channelfuture-notify-does-not-cause-dead-lock

https://stackoverflow.com/questions/29161129/netty-blockingoperationexception

https://www.oschina.net/question/3524591_2244242

http://www.jianshu.com/p/a06da3256f0c

http://www.jianshu.com/p/4835eb4e91ab

http://blog.csdn.net/gaolong/article/details/12805581

http://blog.csdn.net/youaremoon/article/details/50279965

http://blog.csdn.net/youaremoon/article/details/50282353

http://www.linkedkeeper.com/detail/blog.action?bid=137


转载请并标注: “本文转载自 linkedkeeper.com (文/张松然)”  ©著作权归作者所有