Netty 概述

1、什么是 Netty

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

Netty 是一个异步的、依据事情驱动的网络运用框架,用于快速开发可保护、高功用的网络服务器和客户端

留意:netty的异步还是依据多路复用的,并没有完成真实意义上的异步IO

2、Netty 的优势

假如运用传统 NIO,其作业量大,bug 多

  • 需求自己构建协议
  • 处理 TCP 传输问题,如粘包、半包
  • 由于 bug 的存在,epoll 空轮询导致 CPU 100%

Netty 对 API 进行增强,使之更易用,如

  • FastThreadLocal => ThreadLocal
  • ByteBuf => ByteBuffer

3、入门事例

1、服务器端代码

public class HelloServer {
    public static void main(String[] args) {
        // 1、发动器,担任装配netty组件,发动服务器
        new ServerBootstrap()
                // 2、创立 NioEventLoopGroup,能够简单理解为 线程池 + Selector
                .group(new NioEventLoopGroup())
                // 3、挑选服务器的 ServerSocketChannel 完成
                .channel(NioServerSocketChannel.class)
                // 4、child 担任处理读写,该办法决定了 child 履行哪些操作
            	// ChannelInitializer 处理器(仅履行一次)
            	// 它的作用是待客户端 SocketChannel 树立衔接后,履行 initChannel 以便增加更多的处理器
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 5、SocketChannel的处理器,运用StringDecoder解码,ByteBuf=>String
                        nioSocketChannel.pipeline().addLast(new StringDecoder());
                        // 6、SocketChannel的事务处理,运用上一个处理器的处理成果
                        nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
                                System.out.println(s);
                            }
                        });
                    }
                    // 7、ServerSocketChannel绑定8080端口
                }).bind(8080);
    }
}

2、客户端代码

public class HelloClient {
    public static void main(String[] args) throws InterruptedException {
        new Bootstrap()
                .group(new NioEventLoopGroup())
                // 挑选客户 Socket 完成类,NioSocketChannel 表示依据 NIO 的客户端完成
                .channel(NioSocketChannel.class)
                // ChannelInitializer 处理器(仅履行一次)
                // 它的作用是待客户端SocketChannel树立衔接后,履行initChannel以便增加更多的处理器
                .handler(new ChannelInitializer<Channel>() {
                    @Override
                    protected void initChannel(Channel channel) throws Exception {
                        // 音讯会经过通道 handler 处理,这里是将 String => ByteBuf 编码发出
                        channel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 指定要衔接的服务器和端口
                .connect(new InetSocketAddress("localhost", 8080))
                // Netty 中许多办法都是异步的,如 connect
                // 这时需求运用 sync 办法等候 connect 树立衔接完毕
                .sync()
                // 获取 channel 目标,它即为通道笼统,能够进行数据读写操作
                .channel()
                // 写入音讯并清空缓冲区
                .writeAndFlush("hello world");
    }
}

3、运转流程

左:客户端 右:服务器端

拆开Netty,我发现了这个8个从来没见过的东西?

组件解释

  • channel 能够理解为数据的通道
  • msg 理解为流动的数据,最开端输入是 ByteBuf,但经过 pipeline 中的各个 handler 加工,会变成其它类型目标,最后输出又变成 ByteBuf
  • handler 能够理解为数据的处理工序
    • 工序有多道,合在一起便是 pipeline(传递途径),pipeline 担任发布事情(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事情进行处理(重写了相应事情处理办法)

      • pipeline 中有多个 handler,处理时会顺次调用其间的 handler
    • handler 分 Inbound 和 Outbound 两类

      • Inbound 入站
    • Outbound 出站

  • eventLoop 能够理解为处理数据的工人
    • eventLoop 能够办理多个 channel 的 io 操作,而且一旦 eventLoop 担任了某个 channel,就会将其与 channel 进行绑定,今后该 channel 中的 io 操作都由该 eventLoop 担任
    • eventLoop 既能够履行 io 操作,也能够进行任务处理,每个 eventLoop 有自己的任务行列,行列里能够堆积多个 channel 的待处理任务,任务分为一般任务、定时任务
    • eventLoop 按照 pipeline 次序,顺次按照 handler 的规划(代码)处理数据,能够为每个 handler 指定不同的 eventLoop

1、EventLoop

事情循环目标 EventLoop

EventLoop 实质是一个单线程履行器(一起保护了一个 Selector),里边有 run 办法处理一个或多个 Channel 上连绵不断的 io 事情

它的承继联系如下

  • 承继自 j.u.c.ScheduledExecutorService 因而包含了线程池中一切的办法

  • 承继自 netty 自己的 OrderedEventExecutor

    • 供给了 boolean inEventLoop (Thread thread) 办法判断一个线程是否归于此 EventLoop

    • 供给了 EventLoopGroup parent () 办法来看看自己归于哪个 EventLoopGroup

事情循环组 EventLoopGroup

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 办法来绑定其间一个 EventLoop,后续这个 Channel 上的 io 事情都由此 EventLoop 来处理(确保了 io 事情处理时的线程安全)

  • 承继自 netty 自己的 EventExecutorGroup
    • 完成了 Iterable 接口供给遍历 EventLoop 的才干
    • 另有 next 办法获取集合中下一个 EventLoop

1.1 处理一般与定时任务

public class TestEventLoop {
    public static void main(String[] args) {
        // 创立拥有两个EventLoop的NioEventLoopGroup,对应两个线程
        EventLoopGroup group = new NioEventLoopGroup(2);
        // 经过next办法能够取得下一个 EventLoop
        System.out.println(group.next());
        System.out.println(group.next());
        // 经过EventLoop履行一般任务
        group.next().execute(()->{
            System.out.println(Thread.currentThread().getName() + " hello");
        });
        // 经过EventLoop履行定时任务
        group.next().scheduleAtFixedRate(()->{
            System.out.println(Thread.currentThread().getName() + " hello2");
        }, 0, 1, TimeUnit.SECONDS);
        // 优雅地封闭
        group.shutdownGracefully();
    }
}

输出成果如下

io.netty.channel.nio.NioEventLoop@7bb11784
io.netty.channel.nio.NioEventLoop@33a10788
nioEventLoopGroup-2-1 hello
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2
nioEventLoopGroup-2-2 hello2

封闭 EventLoopGroup

优雅封闭 shutdownGracefully 办法。该办法会首要切换 EventLoopGroup 到封闭状态然后拒绝新的任务的参加,然后在任务行列的任务都处理完成后,停止线程的运转。然后确保整体运用是在正常有序的状态下退出的

1.2 处理 IO 任务

服务器代码

public class MyServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

客户端代码

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        Channel channel = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()
                .channel();
        System.out.println(channel);
        // 此处打断点调试,调用 channel.writeAndFlush(...);
        System.in.read();
    }
}

1.3 分工

Bootstrap 的 group () 办法能够传入两个 EventLoopGroup 参数,别离担任处理不同的事情

public class MyServer {
    public static void main(String[] args) {
        new ServerBootstrap()
            	// 两个Group,别离为Boss 担任Accept事情,Worker 担任读写事情
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
				...
    }
}

多个客户端别离发送 hello 成果

nioEventLoopGroup-3-1 hello1
nioEventLoopGroup-3-2 hello2
nioEventLoopGroup-3-1 hello3
nioEventLoopGroup-3-2 hello4
nioEventLoopGroup-3-2 hello4

能够看出,一个 EventLoop 能够担任多个 Channel,且 EventLoop 一旦与 Channel 绑定,则一向担任处理该 Channel 中的事情

拆开Netty,我发现了这个8个从来没见过的东西?

增加自定义 EventLoopGroup

当有的任务需求较长的时间处理时,能够运用非 NioEventLoopGroup,防止同一个 NioEventLoop 中的其他 Channel 在较长的时间内都无法得到处理

   public class MyServer {
    public static void main(String[] args) {
        // 增加自定义的非NioEventLoopGroup
        EventLoopGroup group = new DefaultEventLoopGroup();
        new ServerBootstrap()
                .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 增加两个handler,第一个运用NioEventLoopGroup处理,第二个运用自定义EventLoopGroup处理
                        socketChannel.pipeline().addLast("nioHandler",new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                                // 调用下一个handler
                                ctx.fireChannelRead(msg);
                            }
                        })
                        // 该handler绑定自定义的Group
                        .addLast(group, "myHandler", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

发动四个客户端发送数据

nioEventLoopGroup-4-1 hello1
defaultEventLoopGroup-2-1 hello1
nioEventLoopGroup-4-2 hello2
defaultEventLoopGroup-2-2 hello2
nioEventLoopGroup-4-1 hello3
defaultEventLoopGroup-2-3 hello3
nioEventLoopGroup-4-2 hello4
defaultEventLoopGroup-2-4 hello4

能够看出,客户端与服务器之间的事情,被 nioEventLoopGroup 和 defaultEventLoopGroup 别离处理

拆开Netty,我发现了这个8个从来没见过的东西?

切换的完成

不同的 EventLoopGroup 切换的完成原理如下

由上面的图能够看出,当 handler 中绑定的 Group 不一起,需求切换 Group 来履行不同的任务

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 取得下一个EventLoop, excutor 即为 EventLoopGroup
    EventExecutor executor = next.executor();
    // 假如下一个EventLoop 在当时的 EventLoopGroup中
    if (executor.inEventLoop()) {
        // 运用当时 EventLoopGroup 中的 EventLoop 来处理任务
        next.invokeChannelRead(m);
    } else {
        // 不然让另一个 EventLoopGroup 中的 EventLoop 来创立任务并履行
        executor.execute(new Runnable() {
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
  • 假如两个 handler 绑定的是同一个 EventLoopGroup,那么就直接调用
  • 不然,把要调用的代码封装为一个任务目标,由下一个 handler 的 EventLoopGroup 来调用

2、Channel

Channel 的常用办法

  • close () 能够用来封闭 Channel
  • closeFuture () 用来处理 Channel 的封闭
    • sync 办法作用是同步等候 Channel 封闭
    • 而 addListener 办法是异步等候 Channel 封闭
  • pipeline () 办法用于增加处理器
  • write () 办法将数据写入
    • 由于缓冲机制,数据被写入到 Channel 中今后,不会当即被发送
    • 只有当缓冲满了或者调用了 flush () 办法后,才会将数据经过 Channel 发送出去
  • writeAndFlush () 办法将数据写入并当即发送(刷出)

2.1 ChannelFuture

衔接问题

拆分客户端代码

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 该办法为异步非堵塞办法,主线程调用后不会被堵塞,真实去履行衔接操作的是NIO线程
            	// NIO线程:NioEventLoop 中的线程
                .connect(new InetSocketAddress("localhost", 8080));
        // 该办法用于等候衔接真实树立
        channelFuture.sync();
        // 获取客户端-服务器之间的Channel目标
        Channel channel = channelFuture.channel();
        channel.writeAndFlush("hello world");
        System.in.read();
    }
}

假如咱们去掉 channelFuture.sync() 办法,会服务器无法收到 hello world

这是由于树立衔接 (connect) 的进程是 异步非堵塞 的,若不经过 sync() 办法堵塞主线程,等候衔接真实树立,这时经过 channelFuture.channel () 拿到的 Channel 目标,并不是真实与服务器树立好衔接的 Channel,也就没法将信息正确的传输给服务器端

所以需求经过 channelFuture.sync() 办法,堵塞主线程,同步处理成果,等候衔接真实树立好今后,再去取得 Channel 传递数据。运用该办法,获取 Channel 和发送数据的线程 都是主线程

下面还有一种办法,用于 异步 获取树立衔接后的 Channel 和发送数据,使得履行这些操作的线程是 NIO 线程(去履行 connect 操作的线程)

addListener 办法

经过这种办法能够在 NIO 线程中获取 Channel 并发送数据,而不是在主线程中履行这些操作

public class MyClient {
    public static void main(String[] args) throws IOException, InterruptedException {
        ChannelFuture channelFuture = new Bootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                // 该办法为异步非堵塞办法,主线程调用后不会被堵塞,真实去履行衔接操作的是NIO线程
                // NIO线程:NioEventLoop 中的线程
                .connect(new InetSocketAddress("localhost", 8080));
		// 当connect办法履行完毕后,也便是衔接真实树立后
        // 会在NIO线程中调用operationComplete办法
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                Channel channel = channelFuture.channel();
                channel.writeAndFlush("hello world");
            }
        });
        System.in.read();
    }
}

处理封闭

public class ReadClient {
    public static void main(String[] args) throws InterruptedException {
        // 创立EventLoopGroup,运用完毕后封闭
        NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new StringEncoder());
                    }
                })
                .connect(new InetSocketAddress("localhost", 8080));
        channelFuture.sync();
        Channel channel = channelFuture.channel();
        Scanner scanner = new Scanner(System.in);
        // 创立一个线程用于输入并向服务器发送
        new Thread(()->{
            while (true) {
                String msg = scanner.next();
                if ("q".equals(msg)) {
                    // 封闭操作是异步的,在NIO线程中履行
                    channel.close();
                    break;
                }
                channel.writeAndFlush(msg);
            }
        }, "inputThread").start();
        // 取得closeFuture目标
        ChannelFuture closeFuture = channel.closeFuture();
        System.out.println("waiting close...");
        // 同步等候NIO线程履行完close操作
        closeFuture.sync();
        // 封闭之后履行一些操作,能够确保履行的操作一定是在channel封闭今后履行的
        System.out.println("封闭之后履行一些额定操作...");
        // 封闭EventLoopGroup
        group.shutdownGracefully();
    }
}

封闭channel

当咱们要封闭 channel 时,能够调用 channel.close () 办法进行封闭。但是该办法也是一个异步办法。真实的封闭操作并不是在调用该办法的线程中履行的,而是在 NIO 线程中履行真实的封闭操作

假如咱们想在 channel 真实封闭今后,履行一些额定的操作,能够挑选以下两种办法来完成

  • 经过 channel.closeFuture () 办法取得对应的 ChannelFuture 目标,然后调用 sync () 办法堵塞履行操作的线程,等候 channel 真实封闭后,再履行其他操作
// 取得closeFuture目标
ChannelFuture closeFuture = channel.closeFuture();
// 同步等候NIO线程履行完close操作
closeFuture.sync();
  • 调用 closeFuture.addListener 办法,增加 close 的后续操作
closeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        // 等候channel封闭后才履行的操作
        System.out.println("封闭之后履行一些额定操作...");
        // 封闭EventLoopGroup
        group.shutdownGracefully();
    }
});

3、Future 与 Promise

3.1 概念

netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口

netty 的 Future 承继自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

  • jdk Future 只能同步等候任务完毕(或成功、或失败)才干得到成果

  • netty Future 能够同步等候任务完毕得到成果,也能够异步办法得到成果,但都是要等任务完毕

  • netty Promise 不仅有 netty Future 的功用,而且脱离了任务独立存在,只作为两个线程间传递成果的容器

拆开Netty,我发现了这个8个从来没见过的东西?

3.2 JDK Future

public class JdkFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ThreadFactory factory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "JdkFuture");
            }
        };
        // 创立线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), factory);
        // 取得Future目标
        Future<Integer> future = executor.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                TimeUnit.SECONDS.sleep(1);
                return 50;
            }
        });
        // 经过堵塞的办法,取得运转成果
        System.out.println(future.get());
    }
}

3.3 Netty Future

public class NettyFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        NioEventLoopGroup group = new NioEventLoopGroup();
        // 取得 EventLoop 目标
        EventLoop eventLoop = group.next();
        Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return 50;
            }
        });
        // 主线程中获取成果
        System.out.println(Thread.currentThread().getName() + " 获取成果");
        System.out.println("getNow " + future.getNow());
        System.out.println("get " + future.get());
        // NIO线程中异步获取成果
        future.addListener(new GenericFutureListener<Future<? super Integer>>() {
            @Override
            public void operationComplete(Future<? super Integer> future) throws Exception {
                System.out.println(Thread.currentThread().getName() + " 获取成果");
                System.out.println("getNow " + future.getNow());
            }
        });
    }
}

运转成果

main 获取成果
getNow null
get 50
nioEventLoopGroup-2-1 获取成果
getNow 50

Netty 中的 Future 目标,能够经过 EventLoop 的 sumbit () 办法得到

  • 能够经过 Future 目标的 get 办法,堵塞地获取回来成果

  • 也能够经过 getNow 办法,获取成果,若还没有成果,则回来 null,该办法对错堵塞的

  • 还能够经过 future.addListener 办法,在 Callable 办法履行的线程中,异步获取回来成果

3.4 Netty Promise

Promise 相当于一个容器,能够用于寄存各个线程中的成果,然后让其他线程去获取该成果

public class NettyPromise {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 创立EventLoop
        NioEventLoopGroup group = new NioEventLoopGroup();
        EventLoop eventLoop = group.next();
        // 创立Promise目标,用于寄存成果
        DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
        new Thread(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 自定义线程向Promise中寄存成果
            promise.setSuccess(50);
        }).start();
        // 主线程从Promise中获取成果
        System.out.println(Thread.currentThread().getName() + " " + promise.get());
    }
}

4、Handler 与 Pipeline

4.1 Pipeline

public class PipeLineServer {
    public static void main(String[] args) {
        new ServerBootstrap()
                .group(new NioEventLoopGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 在socketChannel的pipeline中增加handler
                        // pipeline中handler是带有head与tail节点的双向链表,的实践结构为
    				 	// head <-> handler1 <-> ... <-> handler4 <->tail
                        // Inbound首要处理入站操作,一般为读操作,产生入站操作时会触发Inbound办法
                        // 入站时,handler是从head向后调用的
                        socketChannel.pipeline().addLast("handler1" ,new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Inbound handler 1");
                                // 父类该办法内部会调用fireChannelRead
                                // 将数据传递给下一个handler
                                super.channelRead(ctx, msg);
                            }
                        });
                        socketChannel.pipeline().addLast("handler2", new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Inbound handler 2");
                                // 履行write操作,使得Outbound的办法能够得到调用
          socketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes(StandardCharsets.UTF_8)));
                                super.channelRead(ctx, msg);
                            }
                        });
                        // Outbound首要处理出站操作,一般为写操作,产生出站操作时会触发Outbound办法
                        // 出站时,handler的调用是从tail向前调用的
                        socketChannel.pipeline().addLast("handler3" ,new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Outbound handler 1");
                                super.write(ctx, msg, promise);
                            }
                        });
                        socketChannel.pipeline().addLast("handler4" ,new ChannelOutboundHandlerAdapter(){
                            @Override
                            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                System.out.println(Thread.currentThread().getName() + " Outbound handler 2");
                                super.write(ctx, msg, promise);
                            }
                        });
                    }
                })
                .bind(8080);
    }
}

运转成果如下

nioEventLoopGroup-2-2 Inbound handler 1
nioEventLoopGroup-2-2 Inbound handler 2
nioEventLoopGroup-2-2 Outbound handler 2
nioEventLoopGroup-2-2 Outbound handler 1

经过 channel.pipeline ().addLast (name, handler) 增加 handler 时,记得给 handler 取名字。这样能够调用 pipeline 的 addAfter、addBefore 等办法更灵敏地向 pipeline 中增加 handler

handler 需求放入通道的 pipeline 中,才干依据放入次序来运用 handler

  • pipeline 是结构是一个带有 head 与 tail 指针的双向链表,其间的节点为 handler
    • 要经过 ctx.fireChannelRead (msg) 等办法,将当时 handler 的处理成果传递给下一个 handler
  • 当有 **入站(Inbound)**操作时,会从 head 开端向后 调用 handler,直到 handler 不是处理 Inbound 操作停止
  • 当有 **出站(Outbound)**操作时,会从 tail 开端向前 调用 handler,直到 handler 不是处理 Outbound 操作停止

具体结构如下

拆开Netty,我发现了这个8个从来没见过的东西?

调用次序如下

拆开Netty,我发现了这个8个从来没见过的东西?

4.2 OutboundHandler

socketChannel.writeAndFlush()

当 handler 中调用该办法进行写操作时,会触发 Outbound 操作,此刻是从 tail 向前寻找 OutboundHandler

拆开Netty,我发现了这个8个从来没见过的东西?

ctx.writeAndFlush()

当 handler 中调用该办法进行写操作时,会触发 Outbound 操作,此刻是从当时 handler 向前寻找 OutboundHandler

拆开Netty,我发现了这个8个从来没见过的东西?

4.3 EmbeddedChannel

EmbeddedChannel 能够用于测验各个 handler,经过其构造函数按次序传入需求测验 handler,然后调用对应的 Inbound 和 Outbound 办法即可

public class TestEmbeddedChannel {
    public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("1");
                super.channelRead(ctx, msg);
            }
        };
        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("2");
                super.channelRead(ctx, msg);
            }
        };
        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("3");
                super.write(ctx, msg, promise);
            }
        };
        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                System.out.println("4");
                super.write(ctx, msg, promise);
            }
        };
        // 用于测验Handler的Channel
        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
        // 履行Inbound操作 
        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
        // 履行Outbound操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
    }
}

5、ByteBuf

调试工具办法

private static void log(ByteBuf buffer) {
    int length = buffer.readableBytes();
    int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
    StringBuilder buf = new StringBuilder(rows * 80 * 2)
        .append("read index:").append(buffer.readerIndex())
        .append(" write index:").append(buffer.writerIndex())
        .append(" capacity:").append(buffer.capacity())
        .append(NEWLINE);
    appendPrettyHexDump(buf, buffer);
    System.out.println(buf.toString());
}

该办法能够协助咱们更为具体地检查 ByteBuf 中的内容

5.1 创立

public class ByteBufStudy {
    public static void main(String[] args) {
        // 创立ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
        ByteBufUtil.log(buffer);
        // 向buffer中写入数据
        StringBuilder sb = new StringBuilder();
        for(int i = 0; i < 20; i++) {
            sb.append("a");
        }
        buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
        // 检查写入成果
        ByteBufUtil.log(buffer);
    }
}

运转成果

read index:0 write index:0 capacity:16
read index:0 write index:20 capacity:64
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000010| 61 61 61 61                                     |aaaa            |
+--------+-------------------------------------------------+----------------+
  • ByteBuf 经过 ByteBufAllocator 挑选 allocator 并调用对应的 buffer () 办法来创立的 ,默许运用 直接内存 作为 ByteBuf,容量为 256 个字节,能够指定初始容量的巨细
  • 当 ByteBuf 的容量无法包容一切数据时,ByteBuf 会进行扩容操作
  • 假如在 handler 中创立 ByteBuf,主张运用 ChannelHandlerContext ctx.alloc ().buffer () 来创立

5.2 直接内存与堆内存

经过该办法创立的 ByteBuf,运用的是依据直接内存的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);

能够运用下面的代码来创立池化 依据堆 的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);

也能够运用下面的代码来创立池化依据直接内存的 ByteBuf

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
  • 直接内存创立和毁掉的价值贵重,但读写功用高(少一次内存仿制),适合合作池化功用一重用
  • 直接内存对 GC 压力小,由于这部分内存不受 JVM 废物收回的办理,但也要留意及时主动开释

验证

public class ByteBufStudy {
    public static void main(String[] args) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
        System.out.println(buffer.getClass());
        buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);
        System.out.println(buffer.getClass());
        buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
        System.out.println(buffer.getClass());
    }
}
// 运用池化的直接内存
class io.netty.buffer.PooledUnsafeDirectByteBuf
// 运用池化的堆内存    
class io.netty.buffer.PooledUnsafeHeapByteBuf
// 运用池化的直接内存    
class io.netty.buffer.PooledUnsafeDirectByteBuf

5.3 池化与非池化

池化的最大意义在于能够重用 ByteBuf,优点有

  • 没有池化,则每次都得创立新的 ByteBuf 实例,这个操刁难直接内存价值贵重,就算是堆内存,也会增加 GC 压力
  • 有了池化,则能够重用池中 ByteBuf 实例,而且采用了与 jemalloc 相似的内存分配算法提高分配功率
  • 高并发时,池化功用更节省内存,减少内存溢出的或许

池化功用是否开启,能够经过下面的体系环境变量来设置

-Dio.netty.allocator.type={unpooled|pooled}
  • 4.1 今后,非 Android 渠道默许启用池化完成,Android 渠道启用非池化完成
  • 4.1 之前,池化功用还不成熟,默许对错池化完成

5.4 组成

ByteBuf 首要有以下几个组成部分

  • 最大容量与当时容量
    • 在构造 ByteBuf 时,可传入两个参数,别离代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默许为 Integer.MAX_VALUE
    • 当 ByteBuf 容量无法包容一切数据时,会进行扩容操作,若超出最大容量,会抛出 java.lang.IndexOutOfBoundsException 反常
  • 读写操作不同于 ByteBuffer 只用 position 进行控制,ByteBuf 别离由读指针和写指针两个指针控制。进行读写操作时,无需进行形式的切换
    • 读指针前的部分被称为抛弃部分,是现已读过的内容
  • 读指针与写指针之间的空间称为可读部分
    • 写指针与当时容量之间的空间称为可写部分

拆开Netty,我发现了这个8个从来没见过的东西?

5.5 写入

常用办法如下

拆开Netty,我发现了这个8个从来没见过的东西?

留意

  • 这些办法的未指明回来值的,其回来值都是 ByteBuf,意味着能够链式调用来写入不同的数据
  • 网络传输中,默许习惯是 Big Endian,运用 writeInt (int value)

运用办法

public class ByteBufStudy {
    public static void main(String[] args) {
        // 创立ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
        ByteBufUtil.log(buffer);
        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        ByteBufUtil.log(buffer);
        buffer.writeInt(5);
        ByteBufUtil.log(buffer);
        buffer.writeIntLE(6);
        ByteBufUtil.log(buffer);
        buffer.writeLong(7);
        ByteBufUtil.log(buffer);
    }
}

运转成果

read index:0 write index:0 capacity:16
read index:0 write index:4 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04                                     |....            |
+--------+-------------------------------------------------+----------------+
read index:0 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05                         |........        |
+--------+-------------------------------------------------+----------------+
read index:0 write index:12 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00             |............    |
+--------+-------------------------------------------------+----------------+
read index:0 write index:20 capacity:20
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07                                     |....            |
+--------+-------------------------------------------------+----------------+

还有一类办法是 set 开头的一系列办法,也能够写入数据,但不会改变写指针方位

5.6 扩容

当 ByteBuf 中的容量无法包容写入的数据时,会进行扩容操作

buffer.writeLong(7);
ByteBufUtil.log(buffer);
// 扩容前
read index:0 write index:12 capacity:16
...
// 扩容后
read index:0 write index:20 capacity:20
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07                                     |....            |
+--------+-------------------------------------------------+----------------+

扩容规矩

  • 怎么写入后数据巨细未超越 512 字节,则挑选下一个 16 的整数倍进行扩容

    • 例如写入后巨细为 12 字节,则扩容后 capacity 是 16 字节
  • 假如写入后数据巨细超越 512 字节,则挑选下一个 2^n

  • 例如写入后巨细为 513 字节,则扩容后 capacity 是 210=1024 字节(29=512 现已不够了)

  • 扩容不能超越 maxCapacity,不然会抛出 java.lang.IndexOutOfBoundsException 反常

Exception in thread "main" java.lang.IndexOutOfBoundsException: writerIndex(20) + minWritableBytes(8) exceeds maxCapacity(20): PooledUnsafeDirectByteBuf(ridx: 0, widx: 20, cap: 20/20)

5.7 读取

读取首要是经过一系列 read 办法进行读取,读取时会依据读取数据的字节数移动读指针

假如需求 重复读取 ,需求调用 buffer.markReaderIndex() 对读指针进行符号,并经过 buffer.resetReaderIndex() 将读指针康复到 mark 符号的方位

public class ByteBufStudy {
    public static void main(String[] args) {
        // 创立ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4});
        buffer.writeInt(5);
        // 读取4个字节
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        System.out.println(buffer.readByte());
        ByteBufUtil.log(buffer);
        // 经过mark与reset完成重复读取
        buffer.markReaderIndex();
        System.out.println(buffer.readInt());
        ByteBufUtil.log(buffer);
        // 康复到mark符号处
        buffer.resetReaderIndex();
        ByteBufUtil.log(buffer);
    }
}
read index:4 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05                                     |....            |
+--------+-------------------------------------------------+----------------+
5
read index:8 write index:8 capacity:16
read index:4 write index:8 capacity:16
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05                                     |....            |
+--------+-------------------------------------------------+----------------+

还有以 get 开头的一系列办法,这些办法不会改变读指针的方位

5.8 开释

由于 Netty 中有堆外内存(直接内存)的 ByteBuf 完成,堆外内存最好是手动来开释,而不是等 GC 废物收回。

  • UnpooledHeapByteBuf 运用的是 JVM 内存,只需等 GC 收回内存即可
  • UnpooledDirectByteBuf 运用的便是直接内存了,需求特殊的办法来收回内存
  • PooledByteBuf 和它的子类运用了池化机制,需求更杂乱的规矩来收回内存

Netty 这里采用了引证计数法来控制收回内存,每个 ByteBuf 都完成了 ReferenceCounted 接口

  • 每个 ByteBuf 目标的初始计数为 1
  • 调用 release 办法计数减 1,假如计数为 0,ByteBuf 内存被收回
  • 调用 retain 办法计数加 1,表示调用者没用完之前,其它 handler 即便调用了 release 也不会形成收回
  • 当计数为 0 时,底层内存会被收回,这时即便 ByteBuf 目标还在,其各个办法均无法正常运用

开释规矩

由于 pipeline 的存在,一般需求将 ByteBuf 传递给下一个 ChannelHandler,假如在每个 ChannelHandler 中都去调用 release ,就失去了传递性(假如在这个 ChannelHandler 内这个 ByteBuf 已完成了它的任务,那么便无须再传递)

基本规矩是,谁是最后运用者,谁担任 release

  • 起点,对于 NIO 完成来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read 办法中首次创立 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead (byteBuf))

  • 入站 ByteBuf 处理准则

    • 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead (msg) 向后传递,这时无须 release
    • 将原始 ByteBuf 转换为其它类型的 Java 目标,这时 ByteBuf 就没用了,有必要 release
    • 假如不调用 ctx.fireChannelRead (msg) 向后传递,那么也有必要 release
    • 留意各种反常,假如 ByteBuf 没有成功传递到下一个 ChannelHandler,有必要 release
    • 假定音讯一向向后传,那么 TailContext 会担任开释未处理音讯(原始的 ByteBuf)
  • 出站 ByteBuf 处理准则

    • 出站音讯终究都会转为 ByteBuf 输出,一向向前传,由 HeadContext flush 后 release
  • 反常处理准则

    • 有时候不清楚 ByteBuf 被引证了多少次,但又有必要完全开释,能够循环调用 release 直到回来 true
while (!buffer.release()) {}

当 ByteBuf 被传到了 pipeline 的 head 与 tail 时,ByteBuf 会被其间的办法完全开释,但前提是 ByteBuf 被传递到了 head 与 tail 中

TailConext 中开释 ByteBuf 的源码

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
    } finally {
        // 具体的开释办法
        ReferenceCountUtil.release(msg);
    }
}

判断传过来的是否为 ByteBuf,是的话才需求开释

public static boolean release(Object msg) {
	return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}

5.9 切片

ByteBuf 切片是【零仿制】的表现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有产生内存仿制,还是运用原始 ByteBuf 的内存,切片后的 ByteBuf 保护独立的 read,write 指针

得到分片后的 buffer 后,要调用其 retain 办法,使其内部的引证计数加一。防止原 ByteBuf 开释,导致切片 buffer 无法运用修正原 ByteBuf 中的值,也会影响切片后得到的 ByteBuf

拆开Netty,我发现了这个8个从来没见过的东西?

public class TestSlice {
    public static void main(String[] args) {
        // 创立ByteBuf
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
        // 向buffer中写入数据
        buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
        // 将buffer分红两部分
        ByteBuf slice1 = buffer.slice(0, 5);
        ByteBuf slice2 = buffer.slice(5, 5);
        // 需求让分片的buffer引证计数加一
        // 防止原Buffer开释导致分片buffer无法运用
        slice1.retain();
        slice2.retain();
        ByteBufUtil.log(slice1);
        ByteBufUtil.log(slice2);
        // 更改原始buffer中的值
        System.out.println("===========修正原buffer中的值===========");
        buffer.setByte(0,5);
        System.out.println("===========打印slice1===========");
        ByteBufUtil.log(slice1);
    }
}

运转成果

read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 05                                  |.....           |
+--------+-------------------------------------------------+----------------+
read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 06 07 08 09 0a                                  |.....           |
+--------+-------------------------------------------------+----------------+
===========修正原buffer中的值===========
===========打印slice1===========
read index:0 write index:5 capacity:5
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 05 02 03 04 05                                  |.....           |
+--------+-------------------------------------------------+----------------+

5.10 优势

  • 池化思想 – 能够重用池中 ByteBuf 实例,更节省内存,减少内存溢出的或许
  • 读写指针别离,不需求像 ByteBuffer 相同切换读写形式
  • 能够主动扩容
  • 支撑链式调用,运用更流通
  • 许多地方表现零仿制,例如
  • slice、duplicate、CompositeByteBuf

往期优异:

  • Apache Druid数据查询套件详解计数、排名和分位数计算(送JSON-over-HTTP和SQL两种查询详解)

  • 假如你还没玩过Docker Stack办理服务,你现已out了,(送Portainer集群办理教程)

  • 怎样才干快速成为一名架构师?

  • 怎么从Java工程师生长为架构师?

  • 六种常用事务处理方案,你方唱罢,我登场(没有最好只有更好)

  • 超具体教程,一文入门Istio架构原理及实战运用

  • 【图解源码】Zookeeper3.7源码分析,Session的办理机制,Leader选举投票规矩,集群数据同步流程

  • 【图解源码】Zookeeper3.7源码分析,包含服务发动流程源、网络通信、RequestProcessor处理请求

  • 【知其然,知其所以然】配置中心 Apollo源码分析

  • 【推荐】我以为这是最完好的Apollo教程从入门到通晓

  • 探针技能-JavaAgent 和字节码增强技能-Byte Buddy

  • 100003字,带你解密 双11、618电商大促场景下的体系架构体系

  • 【开悟篇】Java多线程之JUC从入门到通晓

  • 12437字,带你深化探求RPC通讯原理

  • JVM调优实战演练,妈妈再也不同担心我的功用优化了

  • 13651个字,给你解释清楚 JVM目标毁掉

  • 搞不懂JVM的类加载机制,JVM功用优化从何谈起?

  • 4859字,609行,一次讲清楚JVM运转数据区

  • 让实习生搭个Redis集群,差点把我”搭“进去~~~

  • 我用Redis分布式锁,抢了瓶茅台,然后GG了~~

  • 新来的,你说下Redis的耐久化机制,哪一种能处理咱们遇到的这个事务问题?

本文由传智教育博学谷教研团队发布。

假如本文对您有协助,欢迎重视点赞;假如您有任何主张也可留言评论私信,您的支撑是我坚持创作的动力。

转载请注明出处!