Netty

Table of Contents

1. Netty

Netty 是优秀的 Java 网络编程框架。它封装了 Java API,并提供一些更易于使用的 API;它使开发者可以把精力放在业务逻辑上,而不是底层网络编程细节。

参考:本文主要摘自《Netty 实战》

1.1. Netty 核心组件

下面介绍 Netty 中的一些概念。

1.1.1. Channel

可以把 Netty 中的 Channel 看作是传入或者付出数据的载体。因此,它可以被打开或者被关闭,连接或者断开连接。

注:Java NIO 中也有 Channel,它们表达的概念类似,但所支持的方法很不一样。

1.1.2. 回调

Netty 在内部使用回调来处理事件。下面代码展示了回调的例子:当一个新的连接已经被建立时,ConnectHandler 中的 channelActive()回调方法将会被调用。

public class ConnectHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx)   // channelActive是回调函数
            throws Exception {
        System.out.println(
                "Client " + ctx.channel().remoteAddress() + " connected");
    }
}

1.1.3. ChannelFuture

Future 是一个异步操作的结果的占位符,它将在未来的某个时刻完成,并提供对其结果的访问。

JDK 预置了 java.util.concurrent.Future,但是其所提供的实现,只允许手动检查对应的操作是否已经完成(这是非常繁琐的),或者一直阻塞直到它完成(这违背了异步执行的本意)。所以 Netty 提供了它自己的实现—— ChannelFuture,它支持在操作完成时自动执行注册的 Listener。

注:Java 8 中新增了 CompletableFuture,可实现类似的功能(操作完成时自动执行其它代码),如果它早点发布,可能 Netty 都不用自己实现 ChannelFuture 了。

下面是 ChannelFuture 的用法演示:

    public static void connect() {
        Channel channel = new NioSocketChannel();
        // Does not block
        ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1", 25));
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    ByteBuf buffer = Unpooled.copiedBuffer(
                            "Hello", Charset.defaultCharset());
                    ChannelFuture wf = future.channel()
                            .writeAndFlush(buffer);
                    // ...
                } else {
                    Throwable cause = future.cause();
                    cause.printStackTrace();
                }
            }
        });
    }

1.1.4. 事件和 ChannelHandler

Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经发生的事件来触发适当的动作。而每个事件都被分发给 ChannelHandler 接口中的某个方法(往往由用户定制其实现)。 我们可以认为接口 ChannelHandler 中的方法是“为了响应特定事件而被执行的回调函数”。

Netty 提供了大量预定义的可以开箱即用的 ChannelHandler 实现,包括用于各种协议(如 HTTP 和 SSL/TLS)的 ChannelHandler。

1.1.4.1. 编码器和解码器

网络只将数据看作是原始的字节序列。然而,我们的应用程序则会把这些字节组织成有意义的信息。在数据和网络字节流之间做相互转换是最常见的编程任务之一。将应用程序的数据转换为网络格式,以及将网络格式转换为应用程序的数据的组件分别叫作编码器和解码器。

编码器和解码器都是 ChannelHandler 接口的实现。

1.1.4.2. EventLoop

Netty 在底层使用了 java.nio.channels.Selector,通过触发事件将 java.nio.channels.Selector 从应用程序中抽象出来,消除了所有本来将需要手动编写的派发代码。在内部,将会为每个 Channel 分配一个 EventLoop(它实现了接口 java.util.concurrent.ExecutorService,可理解为它是一个线程池),用以处理所有事件。

EventLoopGroup 是一组 EventLoop 的抽象,为更好地利用多核 CPU 资源,Netty 实例中一般会有多个 EventLoop(EventLoopGroup 中有个 next 方法可返回下一个 EventLoop)同时工作,每个 EventLoop 维护着一个 java.nio.channels.Selector 实例。

1.1.5. ChannelPipeline

A ChannelPipeline provides a container for a chain of ChannelHandlers and defines an API for propagating the flow of inbound and outbound events along the chain.

Channel 中的数据可以被多个 ChannelHandler 依次处理。这种有序的多个 ChannelHandler 组在一起就构造了 ChannelPipeline,如图 1 所示。

netty_channelpipeline.png

Figure 1: 数据流经 ChannelPipeline 中的多个 ChannelHandler

每一个新创建的 Channel 都将会被分配一个新的 ChannelPipeline。这项关联是永久性的;Channel 既不能附加另外一个 ChannelPipeline,也不能分离其当前的。在 Netty 组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。通过表 1 所示的方法,可以修改 ChannelPipeline 中的 ChannelHandler。

Table 1: Modifying ChannelPipeline
ChannelPipeline 接口中的方法 说明
addFirst/addBefore/addAfter/addLast 将一个 ChannelHandler 添加到 ChannelPipeline 中
remove 将一个 ChannelHandler 从 ChannelPipeline 中移除
replace 将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 ChannelHandler

2. 实例:Echo 服务器和客户端

2.1. Echo 服务器

所有的 Netty 服务器都需要以下两部分:
一:至少一个 ChannelHandler。该组件实现了服务器对从客户端接收的数据的处理,即它的业务逻辑。
二:引导代码。这是配置服务器的启动代码。比如,它会将服务器绑定到它要监听连接请求的端口上。

2.1.1. 服务器 ChannelHandler(业务逻辑)

Echo 服务器会响应传入的消息,所以它需要实现 ChannelInboundHandler 接口(它是 ChannelHandler 的子接口),用来定义响应入站事件的方法。这个简单的应用程序只需要用到少量的这些方法,所以继承 ChannelInboundHandlerAdapter 类也就足够了,它提供了 ChannelInboundHandler 的默认实现。

我们感兴趣的方法是:

  • channelRead(): 对于每个传入的消息都会调用;
  • channelReadComplete(): 读取结束时会调用;
  • exceptionCaught(): 在读取操作期间,有异常抛出时会调用。

下面是 Echo 服务器业务逻辑的主要代码:

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
        ctx.write(in);    // 将接收到的消息写给发送者,而不冲刷出站消息
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)   // 将未决消息冲刷到远程节点,并且关闭该Channel
           .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
        Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

2.1.2. 服务器引导代码

下面是服务器引导相关代码片断:

public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public static void main(String[] args)
        throws Exception {
        if (args.length != 1) {
            System.err.println("Usage: " + EchoServer.class.getSimpleName() + " <port>");
            return;
        }
        int port = Integer.parseInt(args[0]);
        new EchoServer(port).start();
    }

    public void start() throws Exception {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                .channel(NioServerSocketChannel.class)     // 指定所使用的NIO传输Channel
                .localAddress(new InetSocketAddress(port)) // 使用指定的端口设置套接字地址
                .childHandler(new ChannelInitializer<SocketChannel>() { // 添加Handler到Channel的ChannelPipeline
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(serverHandler);
                    }
                });

            ChannelFuture f = b.bind().sync();   // 异步地绑定服务器;调用sync()方法阻塞等待直到绑定完成
            System.out.println(EchoServer.class.getName() +
                " started and listening for connections on " + f.channel().localAddress());
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

2.2. Echo 客户端

和编写服务器类似,编写客户端也包含两个部分:业务逻辑和引导代码。

2.2.1. 客户端 ChannelHandler(业务逻辑)

如同服务器,客户端将拥有一个用来处理数据的 ChannelInboundHandler。在这个场景下,可以扩展 SimpleChannelInboundHandler 类以处理所有必须的任务:

@Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {  // 在当服务器的连接已经建立之后将被调用
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!",
                CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { // 当从服务器接收消息时被调用
        System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,  // 在处理过程中引发异常时被调用
        Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

注 1:每当接收数据时,都会调用 channelRead0()方法。需要注意的是,由服务器发送的消息可能会被分块接收。也就是说,如果服务器发送了 5 字节,那么不能保证这 5 字节会被一次性接收。即使是对于这么少量的数据,channelRead0()方法也可能会被调用两次,第一次使用一个持有 3 字节的 ByteBuf(Netty 的字节容器),第二次使用一个持有 2 字节的 ByteBuf。
注 2:channelRead0()这个方法名称很奇怪。在官方文档中提到,它在 Netty 5.0 中会被重命名为 messageReceived(),这显然比 channelRead0()更加容易理解。

2.2.2. 客户端引导代码

下面是客户端引导的相关代码片断:

public class EchoClient {
    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(new InetSocketAddress(host, port))  // 设置服务器的InetSocketAddress
                .handler(new ChannelInitializer<SocketChannel>() { // 向ChannelPipeline中添加一个EchoClientHandler实例
                    @Override
                    public void initChannel(SocketChannel ch)
                        throws Exception {
                        ch.pipeline().addLast(
                             new EchoClientHandler());
                    }
                });
            ChannelFuture f = b.connect().sync(); // 连接到远程节点,阻塞等待直到连接完成
            f.channel().closeFuture().sync();     // 阻塞,直到Channel关闭
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: " + EchoClient.class.getSimpleName() + " <host> <port>" );
            return;
        }

        final String host = args[0];
        final int port = Integer.parseInt(args[1]);
        new EchoClient(host, port).start();
    }
}

Author: cig01

Created: <2018-02-28 Wed>

Last updated: <2018-05-17 Thu>

Creator: Emacs 27.1 (Org mode 9.4)