引言

在前面关于《Netty入门篇》的文章中,我们现已开端对Netty这个闻名的网络结构有了认知,本章的意图则是接受上文,再对Netty中的一些进阶知识进行论述,究竟前面的内容中,仅论述了一些Netty的核心组件,想要真正把握Netty结构,关于它我们应该具有更为全面的认知。

一、Netty中的粘包半包问题

实践上粘包、半包问题,并不仅仅只在Netty中存在,凡是依据TCP协议构建的网络组件,根本都需求面临这两个问题,关于粘包问题,在之前关于《计算机网络与协议簇-TCP沾包》中也曾讲到过:

(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战

但其时我写成了沾包,但实践上专业的术语解释为:粘包,这儿我纠正一下,接着再简略说清楚粘包和半包的问题:

粘包:这种现象就好像其名,指通信双方中的一端发送了多个数据包,但在另一端则被读取成了一个数据包,比方客户端发送123、ABC两个数据包,但服务端却收成的却是123ABC这一个数据包。形成这个问题的实质原因,在前面TCP的章节中讲过,这首要是由于TPC为了优化传输功率,将多个小包兼并成一个大包发送,一同多个小包之间没有鸿沟切开形成的。

半包:指通信双方中的一端发送一个大的数据包,但在另一端被读取成了多个数据包,例如客户端向服务端发送了一个数据包:ABCDEFGXYZ,而服务端则读取成了ABCEFG、XYZ两个包,这两个包实践上都是一个数据包中的一部分,这个现象则被称之为半包问题(发生这种现象的原因在于:接纳方的数据接纳缓冲区过小导致的)。

上述说到的这两种网络通信的问题具体该怎么处理,这点我们放到后面再细说,先来看看Netty中的沾包和半包问题。

1.1、Netty的粘包、半包问题演示

这儿也就不多说废话了,结合《Netty入门篇》的知识,快速搭建出一个服务端、客户端的通信事例,如下:

// 演示数据粘包问题的服务端
public class AdhesivePackageServer {
    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        ServerBootstrap server = new ServerBootstrap();
        server.group(group);
        server.channel(NioServerSocketChannel.class);
        server.childHandler(new ServerInitializer());
        server.bind("127.0.0.1",8888);
    }
}
// 演示粘包、半包问题的通用初始化器
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
        socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
            // 数据安排稳当事情:当收到客户端数据时会读取通道内的数据
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx)
                    throws Exception {
                // 在这儿直接输出通道内的数据信息
                System.out.println(ctx.channel());
                super.channelReadComplete(ctx);
            }
        });
    }
}
// 演示数据粘包问题的客户端
public class AdhesivePackageClient {
    public static void main(String[] args) {
        EventLoopGroup worker = new NioEventLoopGroup();
        Bootstrap client = new Bootstrap();
        try {
            client.group(worker);
            client.channel(NioSocketChannel.class);
            client.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        // 在通道准备安排稳当后会触发的事情
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) 
                                                            throws Exception {
                            // 向服务端发送十次数据,每次发送一个字节!
                            for (int i = 0; i < 10; i++) {
                                System.out.println("正在向服务端发送第"+ 
                                                        i +"次数据......");
                                ByteBuf buffer = ctx.alloc().buffer(1);
                                buffer.writeBytes(new byte[]{(byte) i});
                                ctx.writeAndFlush(buffer);
                            }
                        }
                    });
                }
            });
            client.connect("127.0.0.1", 8888).sync();
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
        }
    }
}

这个事例中的代码也并不难了解,客户端的代码中,会向服务端发送十次数据,而服务端仅仅只做了数据读取的动作而已,接着来看看运转成果:

(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战

从运转成果中可显着观测到,客户端发送的十个1Bytes的数据包,在服务端直接被兼并成了一个10Bytes的数据包,这显着便是粘包的现象,接着再来看看半包的问题,代码如下:

// 演示半包问题的服务端
public class HalfPackageServer {
    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        ServerBootstrap server = new ServerBootstrap();
        server.group(group);
        server.channel(NioServerSocketChannel.class);
        // 调整服务端的接纳窗口巨细为四字节
        server.option(ChannelOption.SO_RCVBUF,4);
        server.childHandler(new ServerInitializer());
        server.bind("127.0.0.1",8888);
    }
}
// 演示半包问题的客户端
public class HalfPackageClient {
    public static void main(String[] args) {
        EventLoopGroup worker = new NioEventLoopGroup();
        Bootstrap client = new Bootstrap();
        try {
            client.group(worker);
            client.channel(NioSocketChannel.class);
            client.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        // 在通道准备安排稳当后会触发的事情
                        @Override
                        public void channelActive(ChannelHandlerContext ctx)
                                throws Exception {
                            // 向服务端发送十次数据,每次发送十个字节!
                            for (int i = 0; i < 10; i++) {
                                ByteBuf buffer = ctx.alloc().buffer();
                                buffer.writeBytes(new byte[]
                                        {'a','b','c','d','e','f','g','x','y','z'});
                                ctx.writeAndFlush(buffer);
                            }
                        }
                    });
                }
            });
            client.connect("127.0.0.1", 8888).sync();
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
        }
    }
}-

上面的代码中,客户端向服务端发送了十次数据,每次数据会发送10个字节,而在服务端多加了下述这行代码:

server.option(ChannelOption.SO_RCVBUF,4);

这行代码的作用是调整服务端的接纳窗口巨细为四字节,由于默许的接纳窗口较大,客户端需求一次性发送很多数据才能演示出半包现象,这儿为了便于演示,因而将接纳窗口调小,运转成果如下:

(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战

从上述运转成果中,也可以显着观察到半包现象,客户端发送的十个数据包,每个包中的数据都为10字节,但服务端中,接纳到的数据显着并不契合预期,尤其是第三个数据包,是一个不折不扣的半包现象。

1.2、粘包、半包问题的发生原因

前面简略聊了一下粘包、半包问题,但这些问题究竟是什么原因导致的呢?关于这点前面并未深入探讨,这儿来做一致解说,想要弄了解粘包、半包问题的发生原因,这还得说回TCP协议,我们还记得之前说过的TCP-滑动窗口嘛?

(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战

1.2.1、TCP协议的滑动窗口

由于TCP是一种可靠性传输协议,所以在网络通信进程中,会选用一问一答的形式,也便是一端发送数据后,必须得到另一端回来ACK呼应后,才会持续发送后续的数据。但这种一问一答的同步办法,显着会非常影响数据的传输功率。

TCP协议为了处理传输功率的问题,引入了一种名为滑动窗口的技能,也便是在发送方和接纳方上各有一个缓冲区,这个缓冲区被称为“窗口”,假定发送方的窗口巨细为100KB,那么发送端的前100KB数据,无需等候接纳端回来ACK,可以一向发送,直到发满100KB数据中止。

假如发送端在发送前100KB数据时,接纳端回来了某个数据包的ACK,那此刻发送端的窗口会一向向下滑动,比方开端窗口规模是0~100KB,收到ACK后会滑动到20~120KB、120~220KB....(实践上窗口的巨细、规模,TCP会依据网络拥塞程度、ACK呼应时刻等状况来自动调整)。

一同,除开发送方有窗口外,接纳方也会有一个窗口,接纳方只会读取窗口规模之内的数据,假如超出窗口规模的数据并不会读取,这也就意味着不会对窗口之外的数据包回来ACK,所以发送方在未收到ACK时,对应的窗口会中止向后滑动,并在必定时刻后对未回来ACK的数据进行重发。

关于TCP的滑动窗口,发送方的窗口起到优化传输功率的作用,而接纳端的窗口起到流量操控的作用。

1.2.2、传输层的MSS与链路层的MTU

了解了滑动窗口的概念后,接着来说说MSS、MTU这两个概念,MSS是传输层的最大报文长度约束,而MTU则是链路层的最大数据包巨细约束,一般MTU会约束MSS,比方MTU=1500,那么MSS最大只能为1500减去报文头长度,以TCP协议为例,MSS最大为1500-40=1460

为什么需求这个约束呢?这是由于网络设备硬件导致的,比方恣意类型的网卡,不或许让一个数据包无限增长,由于网卡会有带宽约束,比方一次性传输一个1GB的数据包,假如不约束巨细直接发送,这会导致网络呈现阻塞,并且超出网络硬件设备单次传输的最大约束。

所以当一个数据包,超出MSS巨细时,TCP协议会自动切开这个数据包,将该数据包拆分红一个个的小包,然后分批次进行传输,然后完成大文件的传输。

1.2.3、TCP协议的Nagle算法

依据MSS最大报文约束,可以完成大文件的切开并分批发送,但在网络通信中,还有另一种特别状况,便是极小的数据包传输,由于TCP的报文头默许会有40个字节,假如数据只要1字节,那加上报文头仍旧会发生一个41字节的数据包。

假如这种体积较小的数据包在传输中经常呈现,这定然会导致网络资源的糟蹋,究竟数据包中只要1字节是数据,另外40个字节是报文头,假如呈现1W个这样的数据包,也就意味着会发生400MB的报文头,但实践数据只占10MB,这显着是不稳当的。

正是由于上述原因,因而TCP协议中引入了一种名为Nagle的算法,如若接连几次发送的数据都很小,TCP会依据算法把多个数据兼并成一个包宣布,然后优化网络传输的功率,并且削减对资源的占用。

1.2.4、使用层的接纳缓冲区和发送缓冲区

关于操作系统的IO函数而言,网络数据不管是发送也好,仍是接纳也罢,并不会选用“复制”的办法作业,比方现在想要传输一个10MB的数据,不或许直接将这个数据一次性拷贝到缓冲区内,而是一个一个字节进行传输,举个比方:

假定现在要发送ABCDEFGXYZ....这组数据,IO函数会挨个将每个字节放到发送缓冲区中,会呈现A、B、C、D、E、F....这个次序挨个写入,而接纳方仍旧如此,读取数据时也会一个个字节读取,以A、B、C、D、E、F....这个次序读取一个数据包中的数据(实践状况会杂乱一些,或许会按必定单位操作数据,而并不是以单个字节作为单位)。

而使用程序为了发送/接纳数据,通常都需求具有两个缓冲区,即所说的接纳缓冲区和发送缓冲区,一个用来暂存要发送的数据,另一个则用来暂存接纳到的数据,一同这两个缓冲区的巨细,可自行调整其巨细(Netty默许的接纳/发送缓冲区巨细为1024KB)。

1.2.5、粘包、半包问题的发生原因

了解了上述几个概念后,接着再来看看粘包和半包就简略很多了,粘包和半包问题,或许会由多方面因素导致,如下:

  • 粘包:发送12345、ABCDE两个数据包,被接纳成12345ABCDE一个数据包,多个包粘在一同。
    • 使用层:接纳方的接纳缓冲区太大,导致读取多个数据包一同输出。
    • TCP滑动窗口:接纳方窗口较大,导致发送方宣布多个数据包,处理不及时形成粘包。
    • Nagle算法:由于发送方的数据包体积过小,导致多个数据包兼并成一个包发送。
  • 半包:发送12345ABCDE一个数据包,被接纳成12345、ABCDE两个数据包,一个包拆成多个。
    • 使用层:接纳方缓冲区太小,无法存方发送方的单个数据包,因而拆开读取。
    • 滑动窗口:接纳方的窗口太小,无法一次性放下完好数据包,只能读取其间一部分。
    • MSS约束:发送方的数据包超过MSS约束,被拆分为多个数据包发送。

上述便是呈现粘包、半包问题的根本原因,更多的是由于TCP协议形成的,所以想要处理这两个问题,就得自己重写底层的TCP协议,这关于我们而言并不现实,究竟TCP/IP协议栈,根本包括各式各样的网络设备,想要从根源上处理粘包、半包问题,重写协议后还得替换掉一切网络设备内部的TCP完成,现在世界上没有任何一个安排、企业、个人具有这样的影响力。

1.3、粘包、半包问题的处理计划

既然无法在底层从根源上处理问题,那此刻可以换个思路,也便是从使用层动身,粘包、半包问题都是由于数据包与包之间,没有鸿沟切开导致的,那想要处理这样的问题,发送方可以在每个数据包的尾部,自己拼接一个特别分隔符,接纳方读取到数据时,再依据对应的分隔符读取数据即可。

关于其他的一些网络编程的技能栈,我们不做过多延伸,要点来聊一聊Netty中的粘包、半包问题该怎么处理呢?其实这也并不需求自己动手处理,由于Netty内部早已内置了相关完成,究竟我们能想到的问题,结构的设计者也早已料到,接着一同来看看Netty的处理计划吧。

1.3.1、运用短衔接处理粘包问题

关于短衔接我们应该都不生疏,HTTP/1.0版别中,默许运用的便是TCP短衔接,这是指客户端在发送一次数据后,就会立马断开与服务端的网络衔接,在客户端断开衔接后,服务端会收到一个-1的状况码,而我们可以用这个作为音讯(数据)的鸿沟,以此区分不同的数据包,如下:

// 演示通过短衔接处理粘包问题的服务端
public class AdhesivePackageServer {
    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        ServerBootstrap server = new ServerBootstrap();
        server.group(group);
        server.channel(NioServerSocketChannel.class);
        server.childHandler(new ServerInitializer());
        server.bind("127.0.0.1",8888);
    }
}
// 演示通过短衔接处理粘包问题的客户端
public class Client {
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            sendData();
        }
    }
    private static void sendData(){
        EventLoopGroup worker = new NioEventLoopGroup();
        Bootstrap client = new Bootstrap();
        try {
            client.group(worker);
            client.channel(NioSocketChannel.class);
            client.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        // 在通道准备安排稳当后会触发的事情
                        @Override
                        public void channelActive(ChannelHandlerContext ctx)
                                throws Exception {
                            // 向服务端发送一个20字节的数据包,然后断开衔接
                            ByteBuf buffer = ctx.alloc().buffer(1);
                            buffer.writeBytes(new byte[]
                                        {'0','1','2','3','4',
                                        '5','6','7','8','9',
                                        'A','B','C','D','E',
                                        'M','N','X','Y','Z'});
                            ctx.writeAndFlush(buffer);
                            ctx.channel().close();
                        }
                    });
                }
            });
            client.connect("127.0.0.1", 8888).sync();
        } catch (Exception e){
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
        }
    }
}

服务端的代码,仍旧用之前演示粘包问题的AdhesivePackageServer,上述只对客户端的代码进行了改造,首要是将创立客户端衔接、发送数据的代码抽象成了一个办法,然后在循环内部调用该办法,运转成果如下:

(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战

从运转成果中可以看出,发送的3个数据包,都未呈现粘包问题,每个数据包之间都是独立切开的。但这种办法处理粘包问题,实践上归于一种“投机取巧”的计划,究竟每个数据包都选用新的衔接发送,在操作系统级别来看,每个数据包都源自于不同的网络套接字,天然会分开读取。

但这种办法无法处理半包问题,例如这儿我们将服务端的接纳缓冲区调小:

// 演示半包问题的服务端
public class HalfPackageServer {
    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        ServerBootstrap server = new ServerBootstrap();
        server.group(group);
        server.channel(NioServerSocketChannel.class);
        // 调整服务端的接纳缓冲区巨细为16字节(最小为16,无法设置更小)
        server.childOption(ChannelOption.RCVBUF_ALLOCATOR,
                new AdaptiveRecvByteBufAllocator(16,16,16));
        server.childHandler(new ServerInitializer());
        server.bind("127.0.0.1",8888);
    }
}

然后再发动这个服务端,接着再发动前面的客户端,作用如下:

(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战

从成果中仍旧会发现,多个数据包之间仍是发生了半包问题,由于服务端的接纳缓冲区一次性最大只能存下16Bytes数据,所以客户端每次发送20Bytes数据,无法全部存入缓冲区,终究就呈现了一个数据包被拆成多个包读取。

正由于短衔接这种办法,无法很好的处理半包问题,所以一般线上除开特别场景外,不然不会运用短衔接这种形式来单独处理粘包问题,接着看看Netty中供给的一些处理计划。

1.3.2、定长帧解码器

前面聊到的短衔接办法,处理粘包问题的思路归于投机取巧行为,一同也需求频繁的树立/断开衔接,这无论是从资源利用率、仍是程序执行的功率上来说,都并不稳当,而Netty中供给了一系列处理粘包、半包问题的完成类,即Netty的帧解码器,先来看看定长帧解码器,事例如下:

// 通过定长帧解码器处理粘包、半包问题的演示类
public class FixedLengthFrameDecoderDemo {
    public static void main(String[] args) {
        // 通过Netty供给的测验通道来代替服务端、客户端
        EmbeddedChannel channel = new EmbeddedChannel(
                // 增加一个定长帧解码器(每条数据以8字节为单位拆包)
                new FixedLengthFrameDecoder(8),
                new LoggingHandler(LogLevel.DEBUG)
        );
        // 调用三次发送数据的办法(等价于向服务端发送三次数据)
        sendData(channel,"ABCDEGF",8);
        sendData(channel,"XYZ",8);
        sendData(channel,"12345678",8);
    }
    private static void sendData(EmbeddedChannel channel, String data, int len){
        //  获取发送数据的字节长度
        byte[] bytes = data.getBytes();
        int dataLength = bytes.length;
        // 依据固定长度补齐要发送的数据
        String alignString = "";
        if (dataLength < len){
            int alignLength = len - bytes.length;
            for (int i = 1; i <= alignLength; i++) {
                alignString = alignString + "*";
            }
        }
        // 拼接上补齐字符,得到终究要发送的音讯数据
        String msg = data + alignString;
        byte[] msgBytes = msg.getBytes();
        // 构建缓冲区,通过channel发送数据
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(8);
        buffer.writeBytes(msgBytes);
        channel.writeInbound(buffer);
    }
}

留意看上述这个事例,在其间就并未搭建服务端、客户端了,而是选用EmbeddedChannel目标来测验,这个通道是Netty供给的测验通道,可以依据它来快速搭建测验用例,上述中的:

new EmbeddedChannel(
    new FixedLengthFrameDecoder(8),
    new LoggingHandler(LogLevel.DEBUG)
);

这段代码,就类似于之前在服务端的pipeline增加处理器的进程,等价于下述这段代码:

socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(8));
socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));

了解了EmbeddedChannel后,接着先来看看运转成果,如下:

(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战

留意看上述成果,在该事例中,服务端会以8Bytes为单位,然后对数据进行分包处理,平均每读取8Bytes数据,就会将其当作一个数据包。假如客户端发送的一条数据,长度没有8个字节,在sendData()办法中则会以*号补齐。比方上图中,发送了一条XYZ数据,由于长度只要3字节,所以会再拼接五个*号补齐八字节的长度。

这种选用固定长度解析数据的办法,确实可以有用防止粘包、半包问题的呈现,由于每个数据包之间,会以八个字节的长度作为鸿沟,然后切开数据。但这种办法也存在三个致命缺陷:

  • ①只适用于传输固定长度规模内的数据场景,并且客户端在发送数据前,还需自己依据长度补齐数据。
  • ②假如发送的数据超出固定长度,服务端仍旧会按固定长度分包,所以仍然会存在半包问题。
  • ③关于未达到固定长度的数据,还需求额定传输补齐的*号字符,会占用不用要的网络资源。

1.3.3、行帧解码器

上面说到的定长帧解码器,由于运用时存在少许约束,运用它来解析数据就并不那么灵活,尤其是针关于一些数据长度可变的场景,显得就有少许乏力,因而Netty中还供给了行帧解码器,事例如下:

// 通过行帧解码器处理粘包、半包问题的演示类
public class LineFrameDecoderDemo {
    public static void main(String[] args) {
        // 通过Netty供给的测验通道来代替服务端、客户端
        EmbeddedChannel channel = new EmbeddedChannel(
            // 增加一个行帧解码器(在超出1024后还未检测到换行符,就会中止读取)
            new LineBasedFrameDecoder(1024),
            new LoggingHandler(LogLevel.DEBUG)
        );
        // 调用三次发送数据的办法(等价于向服务端发送三次数据)
        sendData(channel,"ABCDEGF");
        sendData(channel,"XYZ");
        sendData(channel,"12345678");
    }
    private static void sendData(EmbeddedChannel channel, String data){
        // 在要发送的数据完毕,拼接上一个\n换行符(\r\n也可以)
        String msg = data + "\n";
        //  获取发送数据的字节长度
        byte[] msgBytes = msg.getBytes();
        // 构建缓冲区,通过channel发送数据
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(8);
        buffer.writeBytes(msgBytes);
        channel.writeInbound(buffer);
    }
}

在上述事例中,我们给服务端增加了一个LineBasedFrameDecoder(1024)行解码器,其间有个1024的数字,这是啥意思呢?这个是数据的最大长度约束,究竟在网络接纳进程中,假如一向没有读取到换行符,总不能一向接纳下去,所以当数据的长度超出该值后,Netty会默许将前面读到的数据分红一个数据包。

一同在发送数据的sendData()办法中,这回就无需我们自己补齐数据了,只需在每个要发送的数据完毕,手动拼接上一个\n\r\n换行符即可,服务端在读取数据时,会按换行符来作为鸿沟切开,运转成果如下:

(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战

从成果中可以看出,每个数据包都是按客户端发送的格式做了解析,并未呈现粘包、半包现象。

1.3.4、分隔符帧解码器

上面聊了以换行符作为分隔符的解码器,但Netty中还供给了自定义分隔符的解码器,运用这种解码器,能让诸位随心所欲的定义自己的分隔符,事例如下:

public class DelimiterFrameDecoderDemo {
    public static void main(String[] args) {
        // 自定义一个分隔符(记得要用ByteBuf目标来包装)
        ByteBuf delimiter = ByteBufAllocator.DEFAULT.buffer(1);
        delimiter.writeByte('*');
        // 通过Netty供给的测验通道来代替服务端、客户端
        EmbeddedChannel channel = new EmbeddedChannel(
                // 增加一个分隔符帧解码器(传入自定义的分隔符)
                new DelimiterBasedFrameDecoder(1024,delimiter),
                new LoggingHandler(LogLevel.DEBUG)
        );
        // 调用三次发送数据的办法(等价于向服务端发送三次数据)
        sendData(channel,"ABCDEGF");
        sendData(channel,"XYZ");
        sendData(channel,"12345678");
    }
    private static void sendData(EmbeddedChannel channel, String data){
        // 在要发送的数据完毕,拼接上一个*号(由于前面自定义的分隔符为*号)
        String msg = data + "*";
        //  获取发送数据的字节长度
        byte[] msgBytes = msg.getBytes();
        // 构建缓冲区,通过channel发送数据
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(8);
        buffer.writeBytes(msgBytes);
        channel.writeInbound(buffer);
    }
}

这个事例的运转成果与上一个完全相同,不同点则在于换了一个解码器,换成了:

new DelimiterBasedFrameDecoder(1024,delimiter)

而后发送数据的时分,对每个数据的完毕,手动拼接一个*号作为分隔符即可。

相较于本来的定长解码器,行解码器、自定义分隔符解码器显着更加灵活,由于支撑可变长度的数据,但这两种解码器,仍旧存在少许缺陷:

  • ①关于每一个读取到的字节都需求判别一下:是否为完毕的分隔符,这会影响整体功用。
  • ②仍旧存在最大长度约束,当数据超出最大长度后,会自动将其分包,在数据传输量较大的状况下,仍旧会导致半包现象呈现。

1.3.5、LTC帧解码器

前面聊过的多个解码器中,无论是哪个,都多多少少会存在少许不完美,因而Netty终究供给了一款LTC解码器,这个解码器也归于实践Netty开发中,使用最为广泛的一种,但了解起来稍微有些杂乱,先来看看它的结构办法:

public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
    public LengthFieldBasedFrameDecoder(
            int maxFrameLength, 
            int lengthFieldOffset, 
            int lengthFieldLength, 
            int lengthAdjustment, 
            int initialBytesToStrip) {
        this(maxFrameLength, 
        lengthFieldOffset, 
        lengthFieldLength, 
        lengthAdjustment, 
        initialBytesToStrip, true);
    }
    // 暂时省略其他参数的结构办法......
}

从上述结构器中可显着看出,LTC中存在五个参数,看起来都比较长,接着简略解释一下:

  • maxFrameLength:数据最大长度,答应单个数据包的最大长度,超出长度后会自动分包。
  • lengthFieldOffset:长度字段偏移量,表明描绘数据长度的信息从第几个字段开端。
  • lengthFieldLength:长度字段的占位巨细,表明数据中的运用了几个字节描绘正文长度。
  • lengthAdjustment:长度调整数,表明在长度字段的N个字节后才是正文数据的开端。
  • initialBytesToStrip:头部剥离字节数,表明先将数据去掉N个字节后,再开端读取数据。

上述这种办法描绘五个参数,我们估计了解起来有些困难,那么下面结合Netty源码中的注释,先把这几个参数完全搞了解再说,先来看个事例:

(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战

比方上述这组数据,对应的参数如下:

lengthFieldOffset = 0
lengthFieldLength = 4
lengthAdjustment = 0
initialBytesToStrip = 0

这组参数表明啥意思呢?表明现在这条数据,长度字段从第0个字节开端,运用4个字节来描绘数据长度,这时服务端会读取数据的前4个字节,得到正文数据的长度,然后得知:在第四个字节之后,再往后读十个字节,是一条完好的数据,终究向后读取10个字节,终究就会读到Hi, ZhuZi.这条数据。

但上述这种办法对数据解码之后,读取时仍旧会显现长度字段,也便是前四个用来描绘长度的字节也会被读到,因而终究会显现出10Hi, ZhuZi.这样的格式,那假如想要去掉前面的长度字段怎么办呢?这需求用到initialBytesToStrip参数,如下:

lengthFieldOffset = 0
lengthFieldLength = 4
lengthAdjustment = 0
initialBytesToStrip = 4

(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战

这组参数又是啥意思呢?其实和前面那一组数据没太大的改变,仅仅用initialBytesToStrip声明要剥离掉前4个字节,所以数据通过解码后,终究会去掉前面描绘长度的四个字节,仅显现Hi, ZhuZi.这十个字节的数据。

上述这种形式,其实便是预设了一个长度字段,服务端、客户端之间约定运用N个字节来描绘数据长度,接着在读取数据时,读取指定个字节,得到本次数据的长度,终究可以正常解码数据。但这种办法只能满意最根本的数据传输,假如在数据中还需求增加一些正文信息,比方附加数据头信息、版别号的状况,又该怎么处理呢?如下:

lengthFieldOffset = 8
lengthFieldLength = 4
lengthAdjustment = 0
initialBytesToStrip = 0

(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战

上述这个示例中,假定附加信息占8Bytes,这儿就需求用到lengthFieldOffset参数,以此来表明长度字段偏移量是8,这意味着读取数据时,要从第九个字节开端,往后读四个字节的数据,才可以得到描绘数据长度的字段,然后解析得到10,终究再往后读取十个字节的数据,读到一条完好的数据。

当然,假如只想要读到正文数据怎么办?如下:

lengthFieldOffset = 8
lengthFieldLength = 4
lengthAdjustment = 0
initialBytesToStrip = 12

(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战

仍旧只需求通过initialBytesToStrip参数,从头部剥离掉前12个字节即可,这儿的12个字节,由八字节的附加信息、四字节的长度描绘组成,去掉这两部分,天然就得到了正文数据。

OK,再来看另一种状况,假如长度字段在最前面,附加信息在中心,但我只想要读取正文数据怎么办呢?

lengthFieldOffset = 0
lengthFieldLength = 4
lengthAdjustment = 8
initialBytesToStrip = 12

(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战

在这儿我们又用到了lengthAdjustment这个参数,这个参数是长度调整数的意思,上面的示例中赋值为8,即表明从长度字段后开端,跳过8个字节后,才是正文数据的开端。接纳方在解码数据时,首要会从0开端读取四个字节,得到正文数据的长度为10,接着会依据lengthAdjustment参数,跳过中心8个的字节,终究再往后读10个字节数据,然后得到终究的正文数据。

OK~,通过上述几个示例的解说后,信任我们对给出的几个参数都有所了解,如若觉得有些晕乎,可回头再多仔细阅读几遍,这样有助于加深对各个参数的印象。但实质上来说,LTC解码器,便是依据这些参数,来承认一条数据的长度、位置,然后读取到精确的数据,防止粘包、半包的现象发生,接下来上个Demo了解:

// 通过LTC帧解码器处理粘包、半包问题的演示类
public class LTCDecoderDemo {
public static void main(String[] args) {
    // 通过Netty供给的测验通道来代替服务端、客户端
    EmbeddedChannel channel = new EmbeddedChannel(
            // 增加一个行帧解码器(在超出1024后还未检测到换行符,就会中止读取)
            new LengthFieldBasedFrameDecoder(1024,0,4,0,0),
            new LoggingHandler(LogLevel.DEBUG)
    );
    // 调用三次发送数据的办法(等价于向服务端发送三次数据)
    sendData(channel,"Hi, ZhuZi.");
}
    private static void sendData(EmbeddedChannel channel, String data){
        // 获取要发送的数据字节以及长度
        byte[] dataBytes = data.getBytes();
        int dataLength = dataBytes.length;
        // 先将数据长度写入到缓冲区、再将正文数据写入到缓冲区
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
        buffer.writeInt(dataLength);
        buffer.writeBytes(dataBytes);
        // 发送终究组装好的数据
        channel.writeInbound(buffer);
    }
}

上述事例中创立了一个LTC解码器,对应的参数值为1024,0,4,0,0,这别离对应前面的五个参数,如下:

maxFrameLength = 1024
lengthFieldOffset = 0
lengthFieldLength = 4
lengthAdjustment = 0
initialBytesToStrip = 0

这组值意思为:数据的第0~4个字节是长度字段,用来描绘正文数据的长度,运转成果如下:

(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战

作用非常显着,既没有发生粘包、半包问题,并且无需逐一字节判别是否为切开符,这比照之前的几种解码器而言,这种办法的功率显着好上特别特别多。当然,上述成果中,假如想要去掉前面的四个.,就只需求将initialBytesToStrip = 4即可,从头部剥离掉四个字节再读取。

1.3.6、粘包、半包处理计划小结

前面介绍了短衔接、定长解码器、行解码器、分隔符解码器以及LTC解码器这五种计划,其间我们需求紧记的是终究一种,由于其他的计划多少存在一些功用问题,而通过LTC解码器这种办法处理粘包、半包问题的功率最好,由于无需逐一字节判别音讯鸿沟。

但实践Netty开发中,假如其他解码器更契合事务需求,也不用死死追求运用LTC解码器,究竟技能为事务供给服务,适合自己事务的,才是最好的!

二、Netty的长衔接与心跳机制

关于长衔接、短衔接,这个概念在前面稍有提及,所谓的短衔接便是每次读写数据完成后,立马断开客户端与服务端的网络衔接。而长衔接则是相反的意思,一次数据交互完成后,服务端和客户端之间持续坚持衔接,当后续需再次收/发数据时,可直接复用原有的网络衔接。

长衔接这种模式,在并发较高的状况下可以带来额定的功用收益,由于Netty服务端、客户端绑定IP端口,搭建Channel通道的进程,放到底层实践上便是TCP三次握手的进程,同理,客户端、服务端断开衔接的进程,即对应着TCP的四次挥手。

我们都知道,TCP三次握手/四次挥手,这个进程无疑是比较“重量级”的,并发状况下,频繁创立、毁掉网络衔接,其资源开支、功用开支会比较大,所以运用长衔接的计划,可以有用削减创立和毁掉网络衔接的动作。

那怎么让Netty敞开长衔接支撑呢?这需求涉及到之前用过的ChannelOption这个类,接着来具体讲讲它。

2.1、Netty调整网络参数(ChannelOption)

ChannelOptionNetty供给的参数调整类,该类中供给了很多常量,别离对应着底层TCP、UDP、计算机网络的一些参数,在创立服务端、客户端时,我们可以通过ChannelOption类来调整网络参数,以此满意不同的事务需求,该类中供给的常量列表如下:

  • ALLOCATORByteBuf缓冲区的分配器,默许值为ByteBufAllocator.DEFAULT
  • RCVBUF_ALLOCATOR:通道接纳数据的ByteBuf分配器,默许为AdaptiveRecvByteBufAllocator.DEFAULT
  • MESSAGE_SIZE_ESTIMATOR:音讯巨细估算器,默许为DefaultMessageSizeEstimator.DEFAULT
  • CONNECT_TIMEOUT_MILLIS:设置客户端的衔接超时时刻,默许为3000ms,超出会断开衔接。
  • MAX_MESSAGES_PER_READ:一次Loop最大读取的音讯数。
    • ServerChannel/NioChannel默许16,其他类型的Channel默许为1
  • WRITE_SPIN_COUNT:一次Loop最大写入的音讯数,默许为16
    • 一个数据16次还未写完,需求提交一个新的任务给EventLoop,防止数据量较大的场景阻塞系统。
  • WRITE_BUFFER_HIGH_WATER_MARK:写高水位符号,默许为64K,超出时Channel.isWritable()回来Flase
  • WRITE_BUFFER_LOW_WATER_MARK:写低水位符号,默许为32K,超出高水位又下降到低水位时,isWritable()回来True
  • WRITE_BUFFER_WATER_MARK:写水位符号,假如写的数据量也超出该值,仍旧回来Flase
  • ALLOW_HALF_CLOSURE:一个远程衔接封闭时,是否半关本地衔接,默许为Flase
    • Flase表明自动封闭本地衔接,为True会触发入站处理器的userEventTriggered()办法。
  • AUTO_READ:自动读取机制,默许为True,通道上有数据时,自动调用channel.read()读取数据。
  • AUTO_CLOSE:自动封闭机制,默许为Flase,发生错误时不会断开与某个通道的衔接。
  • SO_BROADCAST:设置播送机制,默许为Flase,为True时会敞开Socket的播送音讯。
  • SO_KEEPALIVE:敞开长衔接机制,一次数据交互完后不会立马断开衔接。
  • SO_SNDBUF:发送缓冲区,用于保存要发送的数据,未收到接纳数据的ACK之前,数据会存在这儿。
  • SO_RCVBUF:接受缓冲区,用户保存要接受的数据。
  • SO_REUSEADDR:是否复用IP地址与端口号,敞开后可重复绑定同一个地址。
  • SO_LINGER:设置延迟封闭,默许为-1
    • -1:表明禁用该功用,当调用close()办法后会当即回来,底层会先处理完数据。
    • 0:表明禁用该功用,调用后当即回来,底层会直接放弃正在处理的数据。
    • 大于0的正整数:封闭时等候n秒,或数据处理完成才正式封闭。
  • SO_BACKLOG:指定服务端的衔接行列长度,当衔接数达到该值时,会拒绝新的衔接恳求。
  • SO_TIMEOUT:设置接受数据时等候的超时时刻,默许为0,表明无限等候。
  • IP_TOS
  • IP_MULTICAST_ADDR:设置IP头的Type-of-Service字段,描绘IP包的优先级和QoS选项。
  • IP_MULTICAST_IF:对应IP参数IP_MULTICAST_IF,设置对应地址的网卡为多播模式。
  • IP_MULTICAST_TTL:对应IP参数IP_MULTICAST_IF2,同上但支撑IPv6
  • IP_MULTICAST_LOOP_DISABLED:对应IP参数IP_MULTICAST_LOOP,设置本地回环地址的多播模式。
  • TCP_NODELAY:敞开TCPNagle算法,会将多个小包兼并成一个大包发送。
  • DATAGRAM_CHANNEL_ACTIVE_ON_REGISTRATIONDatagramChannel注册的EventLoop即表明已激活。
  • SINGLE_EVENTEXECUTOR_PER_GROUPPipeline是否由单线程执行,默许为True,一切处理器由一条线程执行,无需通过线程上下文切换。

上面列出了ChannelOption类中供给的参数,其间包括了网络通用的参数、TCP协议、UDP协议以及IP协议的参数,其他的我们无需过多关怀,这儿要点留意TCP协议的两个参数:

  • TCP_NODELAY:敞开TCPNagle算法,会将多个小包兼并成一个大包发送。
  • SO_KEEPALIVE:敞开长衔接机制,一次数据交互完后不会立马断开衔接。

第一个参数便是之前聊到的Nagle算法,而关于现在要聊的长衔接,便是SO_KEEPALIVE这个参数,想要让这些参数收效,需求将其装载到对应的服务端/客户端上,Netty中供给了两个装载参数的办法:

  • option():发生在衔接初始化阶段,也便是程序初始化时,就会装载该办法装备的参数。
  • childOption():发生在衔接树立之后,这些参数只要等衔接树立后才会被装载。

其实也可以这样了解,option()办法装备的参数是对大局收效的,而childOption()装备的参数,是针关于衔接收效的,而想要敞开长衔接装备,只需稍微改造一下服务端/客户端代码即可:

// 服务端代码
server.childOption(ChannelOption.SO_KEEPALIVE, true);
// 客户端代码
client.option(ChannelOption.SO_KEEPALIVE, true);

通过上述的办法敞开长衔接之后,TCP默许每两小时会发送一次心跳检测,查看对端是否还存活,假如对端由于网络故障导致下线,TCP会自动断开与对方的衔接。

2.2、Netty的心跳机制

前面聊到了Netty的长衔接,其实实质上并不是Netty供给的长衔接完成,而是通过调整参数,凭借传输层TCP协议供给的长衔接机制,然后完成服务端与客户端的长衔接支撑。不过TCP尽管供给了长衔接支撑,但其心跳机制并不够完善,Why?其实答案很简略,由于心跳检测的距离时刻太长了,每隔两小时才检测一次!

或许有人会说:两小时就两小时,这有什么问题吗?其实问题有些大,由于两小时太长了,无法有用检测到机房断电、机器重启、网线拔出、防火墙更新等状况,假定一次心跳完毕后,对端就呈现了这些故障,依托TCP自身的心跳频率,需求等到两小时之后才能检测到问题。而这些现已失效的衔接应当及时除掉,不然会长时刻占用服务端资源,究竟服务端的可用衔接数是有限的。

所以,光依托TCP的心跳机制,这无法保障我们的使用稳健性,因而一般开发中心件也好、通信程序也罢、亦或是RPC结构等,都会在使用层再自完成一次心跳机制,而所谓的心跳机制,也并不是特别巨大上的东西,完成的思路有两种:

  • 服务端自动勘探:每距离必定时刻后,向一切客户端发送一个检测信号,进程如下:
    • 假定现在有三个节点,A为服务端,B、C都为客户端。
      • A:你们还活着吗?
      • B:我还活着!
      • C:…..(假定挂掉了,无呼应)
    • A收到了B的呼应,但C却未给出呼应,很有或许挂了,A中止与C的衔接。
  • 客户端自动奉告:每距离必定时刻后,客户端向服务端发送一个心跳包,进程如下:
    • 仍旧是上述那三个节点。
    • B:我还活着,不要开除我!
    • C:….(假定挂掉了,不发送心跳包)
    • A:收到B的心跳包,但未收到C的心跳包,将C的网络衔接断开。

一般来说,一套健全的心跳机制,都会结合上述两种计划一同完成,也便是客户端定时向服务端发送心跳包,当服务端未收到某个客户端心跳包的状况下,再自意向客户端建议勘探包,这一步首要是做二次承认,防止由于网络拥塞或其他问题,导致本来客户端宣布的心跳包丢失。

2.2.1、心跳机制的完成思路分析

前面叨叨絮絮说了很多,那么在Netty中该怎么完成呢?其实在Netty中供给了一个名为IdleStateHandler的类,它可以对一个通道上的读、写、读/写操作设置定时器,其间首要供给了三种类型的心跳检测:

// 当一个Channel(Socket)在指定时刻后未触发读事情,会触发这个事情
public static final IdleStateEvent READER_IDLE_STATE_EVENT;
// 当一个Channel(Socket)在指定时刻后未触发写事情,会触发这个事情
public static final IdleStateEvent WRITER_IDLE_STATE_EVENT;
// 上述读、写等候事情的结合体
public static final IdleStateEvent ALL_IDLE_STATE_EVENT;

Netty中,当一个已树立衔接的通道,超出指定时刻后还没有呈现数据交互,对应的Channel就会进入搁置Idle状况,依据不同的Socket/Channel事情,会进入不同的搁置状况,而不同的搁置状况又会触发不同的搁置事情,也便是上述说到的三种搁置事情,在Netty顶用IdleStateEvent事情类来表明。

OK,正是由于Netty供给了IdleStateEvent搁置事情类,所以我们可以依据它来完成心跳机制,但这儿还需求用到《Netty入门篇-入站处理器》中聊到的一个办法:userEventTriggered(),这个钩子办法,会在通道触发恣意事情后被调用,这也就意味着:只要通道上触发了事情,都会触发该办法执行,搁置事情也不破例

有了IdleState、userEventTriggered()这两个根底后,我们就可依据这两个玩意儿,去完成一个简略的心跳机制,最根本的功用完成如下:

  • 客户端:在搁置必定时刻后,可以自动给服务端发送心跳包。
  • 服务端:可以自动检测到未发送数据包的搁置衔接,并中止衔接。

2.2.2、带有心跳机制的客户端完成

上述这两点功用完成起来并不难,我们首要写一下客户端的完成,如下:

// 心跳机制的客户端处理器
public class HeartbeatClientHandler extends ChannelInboundHandlerAdapter {
    // 通用的心跳包数据
    private static final ByteBuf HEARTBEAT_DATA =
            Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("I am Alive", CharsetUtil.UTF_8));
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
        // 假如当时触发的事情是搁置事情
        if (event instanceof IdleStateEvent) {
            IdleStateEvent idleEvent = (IdleStateEvent) event;
            // 假如当时通道触发了写搁置事情
            if (idleEvent.state() == IdleState.WRITER_IDLE){
                // 表明当时客户端有一段时刻未向服务端发送数据了,
                // 为了防止服务端封闭当时衔接,手动发送一个心跳包
                ctx.channel().writeAndFlush(HEARTBEAT_DATA.duplicate());
                System.out.println("成功向服务端发送心跳包....");
            } else {
                super.userEventTriggered(ctx, event);
            }
        }
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("正在与服务端树立衔接....");
        // 树立衔接成功之后,先向服务端发送一条数据
        ctx.channel().writeAndFlush("我是会发心跳包的客户端-A!");
        super.channelActive(ctx);
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("服务端自动封闭了衔接....");
        super.channelInactive(ctx);
    }
}

由于要凭借userEventTriggered()办法来完成事情监听,所以我们需求定义一个类承继入站处理器,接着在其间做了一个判别,假如当时触发了IdleStateEvent搁置事情,这也就意味着现在没有向服务端发送数据了,因而需求发送一个心跳包,奉告服务端自己还活着,接着需求将这个处理器加在客户端上面,如下:

// 演示心跳机制的客户端(会发送心跳包)
public class ClientA {
    public static void main(String[] args) {
        EventLoopGroup worker = new NioEventLoopGroup();
        Bootstrap client = new Bootstrap();
        try {
            client.group(worker);
            client.channel(NioSocketChannel.class);
            // 打开长衔接装备
            client.option(ChannelOption.SO_KEEPALIVE, true);
            // 指定一个自定义的初始化器
            client.handler(new ClientInitializer());
            client.connect("127.0.0.1", 8888).sync();
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}
// 客户端的初始化器
public class ClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 装备假如3s内未触发写事情,就会触发写搁置事情
        pipeline.addLast("IdleStateHandler", 
                new IdleStateHandler(0,3,0,TimeUnit.SECONDS));
        pipeline.addLast("Encoder",new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast("Decoder",new StringDecoder(CharsetUtil.UTF_8));
        // 装载自定义的客户端心跳处理器
        pipeline.addLast("HeartbeatHandler",new HeartbeatClientHandler());
    }
}

客户端的代码根本上和之前的事例差异不大,要点看ClientInitializer这个初始化器,里边首要加入了一个IdleStateHandler,参数为0、3、0,单位是秒,这是啥意思呢?点进源码看看结构函数,如下:

public IdleStateHandler(long readerIdleTime,
                        long writerIdleTime, 
                        long allIdleTime, 
                        TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

没错,其实赋值的三个参数,也就别离对应着读操作的搁置事情、写操作的搁置事情、读写操作的搁置事情,假如赋值为0,表明这些搁置事情不需求关怀,在前面的赋值中,第二个参数writerIdleTime被我们赋值成了3,这表明假如客户端通道在三秒内,未触发写事情,就会触发写搁置事情,而后会调用HeartbeatClientHandler.userEventTriggered()办法,然后向服务端发送一个心跳包。

2.2.3、带有心跳机制的服务端完成

接着再来看看服务端的代码完成,相同需求有一个心跳处理器,如下:

// 心跳机制的服务端处理器
public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
        // 假如当时触发的事情是搁置事情
        if (event instanceof IdleStateEvent) {
            IdleStateEvent idleEvent = (IdleStateEvent) event;
            // 假如对应的Channel通道触发了读搁置事情
            if (idleEvent.state() == IdleState.READER_IDLE){
                // 表明对应的客户端没有发送心跳包,则封闭对应的网络衔接
                // (心跳包也是一种特别的数据,会触发读事情,有心跳就不会进这步)
                ctx.channel().close();
                System.out.println("封闭了未发送心跳包的衔接....");
            } else {
                super.userEventTriggered(ctx, event);
            }
        }
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 假如收到的是心跳包,则给客户端做出一个回复
        if ("I am Alive".equals(msg)){
            ctx.channel().writeAndFlush("I know");
        }
        System.out.println("收到客户端音讯:" + msg);
        super.channelRead(ctx, msg);
    }
}

Server端的心跳处理器中,相同监听了搁置事情,但这儿监听的是读搁置事情,由于一个通道假如长时刻没有触发读事情,这表明对应的客户端现已很长事情没有发数据了,所以需求封闭对应的客户端衔接。

有小伙伴或许会疑惑:为什么一个客户端通道长时刻未发送数据就需求封闭衔接呀?这不是违背了长衔接的初衷吗?答案并非如此,由于前面在我们的客户端中,在通道长时刻未触发写事情的状况下,会自意向服务端发送心跳包,而心跳包也是一种特别的数据包,仍旧会触发服务端上的读事情,所以凡是正常发送心跳包的衔接,都不会被服务端自动封闭。

OK,接着来看看服务端的完成,其实和前面的客户端差不多:

// 演示心跳机制的服务端
public class Server {
    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup();
        ServerBootstrap server = new ServerBootstrap();
        server.group(group);
        server.channel(NioServerSocketChannel.class);
        // 在这儿敞开了长衔接装备,以及装备了自定义的初始化器
        server.childOption(ChannelOption.SO_KEEPALIVE, true);
        server.childHandler(new ServerInitializer());
        server.bind("127.0.0.1",8888);
    }
}
// 服务端的初始化器
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // 装备假如5s内未触发读事情,就会触发读搁置事情
        pipeline.addLast("IdleStateHandler", 
                new IdleStateHandler(5,0,0,TimeUnit.SECONDS));
        pipeline.addLast("Encoder",new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast("Decoder",new StringDecoder(CharsetUtil.UTF_8));
        // 装载自定义的服务端心跳处理器
        pipeline.addLast("HeartbeatHandler",new HeartbeatServerHandler());
    }
}

要点留意看:在服务端装备的是读搁置事情,假如在5s内未触发读事情,就会触发对应通道的读搁置事情,但这儿是5s,为何不装备成客户端的3s呢?由于假如两头的搁置超时时刻装备成相同,就会形成客户端正在发心跳包、服务端正在封闭衔接的这种状况呈现,终究导致心跳机制无法正常作业,关于这点我们也可以自行演示。

2.2.4、普通的客户端完成

终究,为了方便观看作用,这儿我们再创立一个不会发送心跳包的客户端B,相同打开它的长衔接选项,然后来比照测验作用,如下:

// 演示心跳机制的客户端(不会发送心跳包)
public class ClientB {
    public static void main(String[] args) {
        EventLoopGroup worker = new NioEventLoopGroup();
        Bootstrap client = new Bootstrap();
        try {
            client.group(worker);
            client.channel(NioSocketChannel.class);
            client.option(ChannelOption.SO_KEEPALIVE, true);
            client.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) 
                                                        throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    pipeline.addLast("Encoder",new StringEncoder(CharsetUtil.UTF_8));
                    pipeline.addLast("Decoder",new StringDecoder(CharsetUtil.UTF_8));
                    pipeline.addLast(new ChannelInboundHandlerAdapter(){
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) 
                                                            throws Exception {
                            // 树立衔接成功之后,先向服务端发送一条数据
                            ctx.channel().writeAndFlush("我是不会发心跳包的客户端-B!");
                        }
                        @Override
                        public void channelInactive(ChannelHandlerContext ctx) 
                                                            throws Exception {
                            System.out.println("由于没发送心跳包,俺被开除啦!");
                            // 当通道被封闭时,中止前面发动的线程池
                            worker.shutdownGracefully();
                        }
                    });
                }
            });
            client.connect("127.0.0.1", 8888).sync();
        } catch (Exception e){
            e.printStackTrace();
        }
    }
}

上述这段代码中,仅构建出了一个最根本的客户端,其间首要干了两件事情:

  • ①在衔接树立成功之后,先向服务端发送一条数据。
  • ②在衔接(通道)被封闭时,输出一句“俺被开除啦!”的信息,并高雅中止线程池。

除此之外,该客户端并未装载自己完成的客户端心跳处理器,这也就意味着:客户端B并不会自动给服务端发送心跳包。

2.2.5、Netty心跳机制测验

接着别离发动服务端、客户端A、客户端B,然后查看操控台的日志,如下:

(十)Netty进阶篇:漫谈网络粘包、半包问题、解码器与长连接、心跳机制实战

从上图的运转成果来看,在三方发动之后,整体进程如下:

  • ClientA:先与服务端树立衔接,并且在树立衔接之后发送一条数据,后续持续发送心跳包。
  • ClientB:先与服务端树立衔接,然后在树立衔接成功后发送一条数据,后续不会再发数据。
  • Server:与ClientA、B坚持衔接,然后定时检测搁置衔接,封闭未发送心跳包的衔接。

在上述这个进程中,由于ClientB树立衔接后,未自意向服务端发送心跳包,所以在一段时刻之后,服务端自动将ClientB的衔接(通道)封闭了,有人会问:明明ClientB还活着呀,这样做合理吗?

其实这个问题是合理的,由于这儿仅仅模仿线上环境测验,所以ClientB没有自动发送数据包,但在线上环境,每个客户端都会定时向服务端发送心跳包,都会为每个客户端装备心跳处理器。在都装备了心跳处理器的状况下,假如一个客户端长时刻没发送心跳包,这意味着这个客户端十有八九凉凉了,所以天然需求将其封闭,防止这类“废弃衔接”占用服务端资源。

不过上述的心跳机制仅完成了最根底的版别,还未完全将其完善,但我这儿就不持续往下完成了,究竟主干现已搭建好了,剩下的仅仅一些细枝末节,我这儿提几点完善思路:

  • ①在检测到某个客户端未发送心跳包的状况下,服务端应当自动再建议一个勘探包,二次承认客户端是否真的挂了,这样做的好处在于:可以有用防止网络抖动形成的“客户端假死”现象。
  • ②客户端、服务端之间交互的数据包,应当选用一致的格式进行封装,也便是都恪守同一规范包装数据,例如{msgType:"Heartbeat", msgContent:"...", ...}
  • ③在客户端被封闭的状况下,凡是不是由于物理因素,如机房断电、网线被拔、机器宕机等状况形成的客户端下线,客户端都必须具有断线重连功用。

将上述三条完善后,才可以被称为是一套相对健全的心跳检测机制,所以我们感兴趣的状况下,可依据前面给出的源码接着完成~

三、Netty进阶篇总结

在这章节中,我们一点一滴的将粘包、半包、解码器、长衔接、心跳机制等一些进阶技能做了具体论述,其实本来计划将这章内容也写在《Netty入门篇》中的,但由于篇幅过长,所以不得不将其拆出来写,看完这篇中止,也就意味着我们对Netty这个结构有了根本认知。

但实践网络使用、中心件、RPC、根底架构等开发进程中,想要用Netty也并非易事,所以在接下来的章节中,会依据Netty解说多个实战事例,然后让诸位真正可以将Netty这个技能栈纳入囊中!