Netty 中的 Future & Promise
您目前处于:编程  2017年01月01日

Netty 源码中大量使用了异步编程,从代码实现角度看就是大量使用了线程池和 Future。

熟悉 Java 5 的同学一定对 Future 不陌生。简单来说就是其代表了一个异步任务,任务将在未来某个时刻完成,而 Future 这个接口就是用来提供例如获取接口、查看任务状态等功能。

Netty 扩展了 Java 5 引入的 Future 机制。从下面的类图我们可以看到相关类的关系:

Netty 的 Future 接口

需要注意的是,上面类图中有两个 Future,最上面的是 java.util.concurrent.Future,而其下面的则是 io.netty.util.concurrent.Future。

JDK 的 Future 对象,该接口的方法如下:

// 取消异步操作
boolean cancel(boolean mayInterruptIfRunning);
// 异步操作是否取消
boolean isCancelled();
// 异步操作是否完成,正常终止、异常、取消都是完成
boolean isDone();
// 阻塞直到取得异步操作结果
V get() throws InterruptedException, ExecutionException;
// 同上,但最长阻塞时间为timeout
V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;

接口中只有 isDone() 方法判断一个异步操作是否完成,但是对于完成的定义过于模糊,JDK 文档指出正常终止、抛出异常、用户取消都会使 isDone() 方法返回真。在我们的使用中,我们极有可能是对这三种情况分别处理,而 JDK 这样的设计不能满足我们的需求。

Netty 扩展了 JDK 的 Future 接口,扩展的方法如下:

// 异步操作完成且正常终止
boolean isSuccess();
// 异步操作是否可以取消
boolean isCancellable();
// 异步操作失败的原因
Throwable cause();
// 添加一个监听者,异步操作完成时回调,类比javascript的回调函数
Future<V> addListener(
    GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListener(
    GenericFutureListener<? extends Future<? super V>> listener);
// 阻塞直到异步操作完成
Future<V> await() throws InterruptedException;
// 同上,但异步操作失败时抛出异常
Future<V> sync() throws InterruptedException;
// 非阻塞地返回异步结果,如果尚未完成返回null
V getNow();

可知,Future 对象有两种状态尚未完成和已完成,其中已完成又有三种状态:成功、失败、用户取消。

Future 接口中的方法都是 getter 方法而没有 setter 方法,也就是说这样实现的 Future 子类的状态是不可变的,如果我们想要变化,Netty 提供的解决方法是:使用可写的 Future 即 Promise。

Netty 的 Promise

Promise 是特殊的可写 Future。Promise 在继承 Future 的基础之上进行了扩展,用来设置 IO 操作的结果。

当 Netty 进行 IO 操作的时候,会创建一个 Promise 对象,当操作完成或者失败的时候就会对 Promise 进行结果设置。

Promise<V> setSuccess(V result);
boolean trySuccess(V result);
Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);
boolean setUncancellable();

Netty 提供了一个 Promise 的默认实现 DefaultPromise。主要是 setSuccess 方法和 await 方法的实现

// 标记异步操作结果为成功,如果已被设置(不管成功还是失败)则抛出异常IllegalStateException
Promise<V> setSuccess(V result);
// 同上,只是结果已被设置时返回False
boolean trySuccess(V result);

Promise<V> setFailure(Throwable cause);
boolean tryFailure(Throwable cause);

// 设置结果为不可取消,结果已被取消返回False
boolean setUncancellable();

Promise 接口继承自 Future 接口,它提供的 setter 方法与常见的 setter 方法大为不同。Promise 从 Uncompleted 到 Completed 的状态转变有且只能有一次,也就是说 setSuccess 和 setFailure 方法最多只会成功一个,此外,在 setSuccess 和 setFailure 方法中会通知注册到其上的监听者。

一个简单实现

class Person extends Thread {
    BlockingQueue<Runnable> taskQueue; //任务队列
    public Person(String name) {
        super(name);
        taskQueue = new LinkedBlockingQueue<>();
    }

    @Override
    public void run() {
        while(true) { //无限循环, 不断从任务队列取任务
            try {
                Runnable task = taskQueue.take();
                task.run();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void submit(Runnable task) { //将任务提交到任务队列中去
        taskQueue.offer(task);
    }
}

做数学题的例子

void main() {

    final Person wang = new Person("wang");
    final Person li = new Person("li");
    li.start(); //启动小王
    wang.start(); //启动小李

    wang.submit(new Runnable() { //提交一个简单的题
        @Override
        public void run() {
            System.out.println(
                Thread.currentThread().getName() + "1. 这是一道简单的题");
        }
    });

    wang.submit(new Runnable() { //提交一个复杂的题
        @Override
        public void run() {
            li.submit(new Runnable() { //将复杂的题交给li来做
                @Override
                public void run() {
                    System.out.println(
                        Thread.currentThread().getName() + " 2. 这是一道复杂的题");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    wang.submit(new Runnable() { //做完之后将结果作为Task返回给wang
                        @Override
                        public void run() {
                            System.out.println(
                                Thread.currentThread().getName() + "复杂题执行结果");
                        }
                    });
                }
            });
        }
    });

    wang.submit(new Runnable() { //提交一个简单的题
        @Override
        public void run() {
            System.out.println(
            `Thread.currentThread().getName() + " 3. 这是一道简单的题");
        }
    });
}

执行结果是

wang 1. 这是一道简单的题
wang 3. 这是一道简单的题
li 2. 这是一道复杂的题
wang 复杂题执行完毕

Netty 中的实现

final DefaultEventExecutor wang = new DefaultEventExecutor();
final DefaultEventExecutor li = new DefaultEventExecutor();

wang.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println(
            Thread.currentThread().getName() + " 1. 这是一道简单的题");
    }
});

wang.execute(new Runnable() {
    @Override
    public void run() {
        final Promise<Integer> promise = wang.newPromise();
        promise.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) 
                throws Exception {
                System.out.println(Thread.currentThread().getName() + "复杂题执行结果");
            }
        });
        li.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(
                    Thread.currentThread().getName() + " 2. 这是一道复杂的题");
                promise.setSuccess(10);
            }
        });
    }
});

wang.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println(
            Thread.currentThread().getName() + " 3. 这是一道简单的题");
    }
});

执行结果是

defaultEventExecutor-1-1 1. 这是一道简单的题
defaultEventExecutor-1-1 3. 这是一道简单的题
defaultEventExecutor-3-1 2. 这是一道复杂的题
defaultEventExecutor-1-1 复杂题执行结果

看起来和简单实现中的代码差不多, DefaultEventExecutor可以简单的看做拥有一个队列的线程。与简单实现不同的是, 小李执行完任务后通知小王的方式。

在 Netty 中 Promise 代码一个可写的异步任务结果,以上代码的含义是:

生成一个 promise,为该 promise 注册一个 listener,当任务执行完后回调该 listener。

在另一个线程中执行一个异步任务,执行完后,将 promise 设置为成功,回调 listener,该 listener 在异步任务提交者线程中执行。

一种误用

为了实现以上问题,还可以像下面这样写。

wang.submit(new Runnable() {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " 1. 这是一道简单的题");
    }
});

wang.submit(new Runnable() {
    @Override
    public void run() {
        Future<String> result = li.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                for(int i = 0; i <= 10000000; i++){
                    for(int j = 0; j <= 1000000; j++) {
                        ;
                    }
                }
                System.out.println(
                    Thread.currentThread().getName() + " 2. 这是一道复杂的题");
                return null;
            }
        });
        result.addListener(new GenericFutureListener<Future<? super String>>() {
            @Override
            public void operationComplete(
                Future<? super String> future) throws Exception {
                System.out.println(
                    Thread.currentThread().getName() + "3. 复杂题执行结果");
            }
        });
    }
});

wang.submit(new Runnable() {
    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " 3. 这是一道简单的题");
    }
});

执行结果是:

defaultEventExecutor-1-1 1. 这是一道简单的题
defaultEventExecutor-3-1 2. 这是一道复杂的题
defaultEventExecutor-1-1 3. 这是一道简单的题
defaultEventExecutor-3-1 3. 复杂题执行结果

这样写似乎更简单,但运行一下会发现,listener 的执行却是由小李来处理,按理说,小王交给小李一个任务,小李做完之后将结果返回给小王,应该是小王处理才对啊,可为什么是小李来处理呢?

查看源码可知,DefaultEventExecutor.submit 方法将一个 Callable 包装成一个 DefaultPromise,并且将执行者作为 DefaultPromise 的 exectutor,为什么要这样做呢?

Netty 的异步回调机制需要提交者必须有一个 TaskQueue 才行,而这里 wang 并不一定含有一个 TaskQueue,为了防止因为提交者没有 TaskQueue 而出错,所以只能赋值为执行者,而使用 newPromise 就没有问题,因为 newPromise 是 DefaultEventExecutor 的接口,而 DefaultEventExecutor 肯定有一个 TaskQueue。

Netty 源码中对异步回调的使用

在 Netty 中,ChannelHandlerContext 的 write(msg, promise) 和 bind(address, promise) 等操作都是一个异步操作,为了使该操作不阻塞当前 executor 的执行,一般这样使用:

Promise result = ctx.newPromise(ctx.executor());
ctx.write(msg, result);
result.addListener(listener);


Reference:

http://www.jianshu.com/p/a06da3256f0c

http://lingnanlu.github.io/2016/08/16/netty-asyc-callback


转载请并标注: “本文转载自 linkedkeeper.com (文/张松然)”