Netty

Table of Contents

1 Netty

Netty 是优秀的Java网络编程框架。它封装了Java API,并提供一些更易于使用的API;它使开发者可以把精力放在业务逻辑上,而不是底层网络编程细节。

参考:本文主要摘自《Netty实战》

1.1 Netty核心组件

下面介绍Netty中的一些概念。

1.1.1 Channel

可以把Netty中的 Channel 看作是传入或者付出数据的载体。因此,它可以被打开或者被关闭,连接或者断开连接。

注:Java NIO中也有Channel,它们表达的概念类似,但所支持的方法很不一样。

1.1.2 回调

Netty在内部使用回调来处理事件。下面代码展示了回调的例子:当一个新的连接已经被建立时,ConnectHandler中的channelActive()回调方法将会被调用。

public class ConnectHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx)   // channelActive是回调函数
            throws Exception {
        System.out.println(
                "Client " + ctx.channel().remoteAddress() + " connected");
    }
}

1.1.3 ChannelFuture

Future是一个异步操作的结果的占位符,它将在未来的某个时刻完成,并提供对其结果的访问。

JDK预置了java.util.concurrent.Future,但是其所提供的实现,只允许手动检查对应的操作是否已经完成(这是非常繁琐的),或者一直阻塞直到它完成(这违背了异步执行的本意)。所以Netty提供了它自己的实现—— ChannelFuture,它支持在操作完成时自动执行注册的Listener。

注:Java 8中新增了CompletableFuture,可实现类似的功能(操作完成时自动执行其它代码),如果它早点发布,可能Netty都不用自己实现ChannelFuture了。

下面是ChannelFuture的用法演示:

    public static void connect() {
        Channel channel = new NioSocketChannel();
        // Does not block
        ChannelFuture future = channel.connect(new InetSocketAddress("192.168.0.1", 25));
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    ByteBuf buffer = Unpooled.copiedBuffer(
                            "Hello", Charset.defaultCharset());
                    ChannelFuture wf = future.channel()
                            .writeAndFlush(buffer);
                    // ...
                } else {
                    Throwable cause = future.cause();
                    cause.printStackTrace();
                }
            }
        });
    }

1.1.4 事件和ChannelHandler

Netty使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经发生的事件来触发适当的动作。而每个事件都被分发给 ChannelHandler 接口中的某个方法(往往由用户定制其实现)。 我们可以认为接口ChannelHandler中的方法是“为了响应特定事件而被执行的回调函数”。

Netty提供了大量预定义的可以开箱即用的ChannelHandler实现,包括用于各种协议(如HTTP 和SSL/TLS)的ChannelHandler。

1.1.4.1 编码器和解码器

网络只将数据看作是原始的字节序列。然而,我们的应用程序则会把这些字节组织成有意义的信息。在数据和网络字节流之间做相互转换是最常见的编程任务之一。将应用程序的数据转换为网络格式,以及将网络格式转换为应用程序的数据的组件分别叫作编码器和解码器。

编码器和解码器都是 ChannelHandler 接口的实现。

1.1.4.2 EventLoop

Netty在底层使用了java.nio.channels.Selector,通过触发事件将java.nio.channels.Selector从应用程序中抽象出来,消除了所有本来将需要手动编写的派发代码。在内部,将会为每个Channel分配一个 EventLoop(它实现了接口java.util.concurrent.ExecutorService,可理解为它是一个线程池),用以处理所有事件。

EventLoopGroup 是一组EventLoop的抽象,为更好地利用多核CPU资源,Netty实例中一般会有多个EventLoop(EventLoopGroup中有个next方法可返回下一个EventLoop)同时工作,每个EventLoop维护着一个java.nio.channels.Selector实例。

1.1.5 ChannelPipeline

A ChannelPipeline provides a container for a chain of ChannelHandlers and defines an API for propagating the flow of inbound and outbound events along the chain.

Channel中的数据可以被多个ChannelHandler依次处理。这种有序的多个ChannelHandler组在一起就构造了 ChannelPipeline,如图 1 所示。

netty_channelpipeline.png

Figure 1: 数据流经ChannelPipeline中的多个ChannelHandler

每一个新创建的Channel都将会被分配一个新的ChannelPipeline。这项关联是永久性的;Channel既不能附加另外一个ChannelPipeline,也不能分离其当前的。在Netty组件的生命周期中,这是一项固定的操作,不需要开发人员的任何干预。通过表 1 所示的方法,可以修改ChannelPipeline中的ChannelHandler。

Table 1: Modifying ChannelPipeline
ChannelPipeline接口中的方法 说明
addFirst/addBefore/addAfter/addLast 将一个ChannelHandler添加到ChannelPipeline中
remove 将一个ChannelHandler从ChannelPipeline中移除
replace 将ChannelPipeline中的一个ChannelHandler替换为另一个ChannelHandler

2 实例:Echo服务器和客户端

2.1 Echo服务器

所有的Netty服务器都需要以下两部分:
一:至少一个ChannelHandler。该组件实现了服务器对从客户端接收的数据的处理,即它的业务逻辑。
二:引导代码。这是配置服务器的启动代码。比如,它会将服务器绑定到它要监听连接请求的端口上。

2.1.1 服务器ChannelHandler(业务逻辑)

Echo服务器会响应传入的消息,所以它需要实现 ChannelInboundHandler 接口(它是 ChannelHandler 的子接口),用来定义响应入站事件的方法。这个简单的应用程序只需要用到少量的这些方法,所以继承 ChannelInboundHandlerAdapter 类也就足够了,它提供了ChannelInboundHandler的默认实现。

我们感兴趣的方法是:

  • channelRead(): 对于每个传入的消息都会调用;
  • channelReadComplete(): 读取结束时会调用;
  • exceptionCaught(): 在读取操作期间,有异常抛出时会调用。

下面是Echo服务器业务逻辑的主要代码:

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
        ctx.write(in);    // 将接收到的消息写给发送者,而不冲刷出站消息
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)
            throws Exception {
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)   // 将未决消息冲刷到远程节点,并且关闭该Channel
           .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
        Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

2.1.2 服务器引导代码

下面是服务器引导相关代码片断:

public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public static void main(String[] args)
        throws Exception {
        if (args.length != 1) {
            System.err.println("Usage: " + EchoServer.class.getSimpleName() + " <port>");
            return;
        }
        int port = Integer.parseInt(args[0]);
        new EchoServer(port).start();
    }

    public void start() throws Exception {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                .channel(NioServerSocketChannel.class)     // 指定所使用的NIO传输Channel
                .localAddress(new InetSocketAddress(port)) // 使用指定的端口设置套接字地址
                .childHandler(new ChannelInitializer<SocketChannel>() { // 添加Handler到Channel的ChannelPipeline
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(serverHandler);
                    }
                });

            ChannelFuture f = b.bind().sync();   // 异步地绑定服务器;调用sync()方法阻塞等待直到绑定完成
            System.out.println(EchoServer.class.getName() +
                " started and listening for connections on " + f.channel().localAddress());
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

2.2 Echo客户端

和编写服务器类似,编写客户端也包含两个部分:业务逻辑和引导代码。

2.2.1 客户端ChannelHandler(业务逻辑)

如同服务器,客户端将拥有一个用来处理数据的ChannelInboundHandler。在这个场景下,可以扩展 SimpleChannelInboundHandler 类以处理所有必须的任务:

@Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {  // 在当服务器的连接已经建立之后将被调用
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!",
                CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { // 当从服务器接收消息时被调用
        System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,  // 在处理过程中引发异常时被调用
        Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

注1:每当接收数据时,都会调用channelRead0()方法。需要注意的是,由服务器发送的消息可能会被分块接收。也就是说,如果服务器发送了5字节,那么不能保证这5字节会被一次性接收。即使是对于这么少量的数据,channelRead0()方法也可能会被调用两次,第一次使用一个持有3字节的ByteBuf(Netty的字节容器),第二次使用一个持有2字节的ByteBuf。
注2:channelRead0()这个方法名称很奇怪。在官方文档中提到,它在Netty 5.0中会被重命名为messageReceived(),这显然比channelRead0()更加容易理解。

2.2.2 客户端引导代码

下面是客户端引导的相关代码片断:

public class EchoClient {
    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(new InetSocketAddress(host, port))  // 设置服务器的InetSocketAddress
                .handler(new ChannelInitializer<SocketChannel>() { // 向ChannelPipeline中添加一个EchoClientHandler实例
                    @Override
                    public void initChannel(SocketChannel ch)
                        throws Exception {
                        ch.pipeline().addLast(
                             new EchoClientHandler());
                    }
                });
            ChannelFuture f = b.connect().sync(); // 连接到远程节点,阻塞等待直到连接完成
            f.channel().closeFuture().sync();     // 阻塞,直到Channel关闭
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: " + EchoClient.class.getSimpleName() + " <host> <port>" );
            return;
        }

        final String host = args[0];
        final int port = Integer.parseInt(args[1]);
        new EchoClient(host, port).start();
    }
}

Author: cig01

Created: <2018-02-28 Wed 00:00>

Last updated: <2018-05-17 Thu 21:43>

Creator: Emacs 25.3.1 (Org mode 9.1.4)