Netty 入门

概述

什么是 Netty?

  • 异步:用的多线程,不是异步 IO
  • 依据事情驱动:表明用的 Selector

Netty 是一个异步的、依据事情驱动的网络应用结构,用于快速开发可保护、高功能的网络服务器和客户端

Netty 的地位

Netty 在 Java 网络应用结构中的地位就比方:Spring 结构在 JavaEE 开发中的地位

以下的结构都运用了 Netty,由于它们有网络通讯需求!

  • Cassandra – nosql 数据库
  • Spark – 大数据分布式核算结构
  • Hadoop – 大数据分布式存储结构
  • RocketMQ – ali 开源的音讯行列
  • ElasticSearch – 搜索引擎
  • gRPC – rpc 结构
  • Dubbo – rpc 结构
  • Spring 5.x – flux api 完全扔掉了 tomcat ,运用 netty 作为服务器端
  • Zookeeper – 分布式和谐结构

Netty 的优势

Netty vs NIO,工作量大,bug 多

  • 需求自己构建协议
  • 处理 TCP 传输问题,如粘包、半包
  • epoll 空轮询导致 CPU 100%
  • 对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer

Netty vs 其它网络应用结构

  • Mina 由 apache 保护,将来 3.x 版别或许会有较大重构,损坏 API 向下兼容性,Netty 的开发迭代更敏捷,API 更简洁、文档更优秀

创立服务器 / 客户端

  1. 首要创立启动器
  2. 创立NioEventLoopGroup依据NIO服务端完成
  3. childHandler表明增加的处理器都是给SocketChannel用的
  4. ChannelInitializer 仅仅履行一次
  5. 客户端 SocketChannel 树立衔接后,履行 initChannel 以便增加更多的处理器
  6. new ServerBootstrap().bind 绑定监听端口

服务端

// 1. 启动器,担任拼装 netty组件,启动服务器
new ServerBootstrap()
        // BossEventLoop,WorkerEventLoop(selector, thread)
        // 创立一个事情循环线程组,适用于 NIO 完成,能够简略理解为 `线程池 + Selector`
        .group(new NioEventLoopGroup())
        // 指定要运用的服务器 Channel 完成,适用于 NIO 完成
        .channel(NioServerSocketChannel.class) // OIO BIO
        // BOSS 担任处理衔接Work(child)担任处理读写,决议了work(child)能履行哪些(handler)
        // 装备服务器 Channel 处理器,即 ChannelPipeline 中的一组 ChannelHandler
        .childHandler(
                // 5. channel代表客户端进行数据读写的通道Initializer初始化,担任增加到其他 handler
                new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 增加详细的 handler
                        // 将 bytebuf 转换为字符串
                        nioSocketChannel.pipeline().addLast(new StringDecoder());
                        // 自界说handler
                        nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override// 读事情
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                // 打印转好的字符串
                                System.out.println(msg);
                            }
                        });
                    }
                })
        // 监听端口
        .bind(8080);

客户端

// 启动类
new Bootstrap()
        // 增加 EventLoop
        .group(new NioEventLoopGroup())
        // 挑选客户端 channel 完成
        .channel(NioSocketChannel.class)
        // 增加处理器
        .handler(new ChannelInitializer<NioSocketChannel>() {
            @Override // 在树立衔接后被调用
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                // 把字符串编码成字节
                nioSocketChannel.pipeline().addLast(new StringEncoder());
            }
        })
        // 衔接服务器
        .connect(new InetSocketAddress("localhost", 8080))
        .sync()
        .channel()
        // 向服务器发送数据
        .writeAndFlush("hello, world");

流程剖析

  1. 先创立启动器类
  2. 增加组件,eventloop(内部就有线程和挑选器不断循环,查找事情)
  3. 挑选NIOServerSocket 完成
  4. 增加处理器,只要衔接事情发生之后才会履行 initChannel初始化办法
  5. 绑定监听端口(服务端就到这儿)
  6. (客户端)创立启动器和eventloop
  7. 客户端挑选socket事情
  8. 增加处理器,也是等衔接树立才会履行初始化办法
  9. 终究衔接服务器
  10. 服务器监听到accept事情之后
  11. 终究找处理器处理这个事情
  12. (咱们看不明理线),衔接树立后调用初始化办法
  13. 客户端 sync 只要衔接之后才会持续履行
  14. channel() 这个是衔接目标
  15. 终究就能够读写
  16. 发数据就会走到处理器内部
  17. 进行转为字节数组进行 bytebuf进行发送
  18. 服务端的eventloop就会监听到读事情
  19. 走到了服务器的处理器,进行处理

channel数据传输通道,可读出来可写进。和nio概念一致

handel中的message活动数据,handel 是一个工序,对原始数据进行一道道工序进行处理

pipeline便是流水线,一道工序,进行增加工序。

handel 分为 inbound和outbound
入站和出站 读入就走入站,写出就走出站

eventloop便是线程,相当于工人 一旦某一个工人担任一个事情,那就会担任究竟。一个工人是能够办理多个事情的

eventloop既能够履行io操作,也能够进行一般使命,每个工人都有自己的使命行列,顺次处理。底层用的单线程的线程池。

使命能够是守时使命也能够是一般使命

组件

EventLoop

EventLoop是事情循环目标

实质是一个单线程履行器,一起保护了Selector,里边有run办法处理 channel 上源源不断的 io 事情。

承继关系:

  • 一条线是承继自 java.util.concurrent.ScheduledExecutorService
  • 因而包含了线程池的一切办法
  • 另一条是承继自 netty 自己的 OrderedEventExecutor
  • 供给了 Boolean inEventLoop(Thread thread)办法判别一个线程是否归于此EventLoop
  • 供给了 parent办法来看看自己归于哪个 EventLoopGroup
// eventloop
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    EventLoopGroup parent();
}
// 承继线程池接口
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {

EventLoopGroup 事情循环组,咱们一般运用这个

EventLoopGroup 是一组 EventLoop,channel 一般会调用 EventLoopGroup 的 register 办法俩绑定其间一个 EventLoop,后续这个 channel 上的 io 事情都由同一个 EventLoop 来处理(确保了 io 事情处理时的线程安全)

  • 承继自 netty 自己的 EventExecutorGroup
  • 完成了 Iterable 接口供给遍历 EventLoop 的能力
  • 另有 next 办法获取调集中下一个 EventLoop

咱们平常运用的是下面两个循环组完成类

// 能够完成一般使命,io事情,守时使命
EventLoopGroup nioGroup = new NioEventLoopGroup();
// 一般使命和守时使命
EventLoopGroup defaultGroup = new DefaultEventLoopGroup();

那么咱们空参创立目标,线程数是多少呢

下面这段代码,是用来初始化 EventLoopGroup目标的。

假如没有指定线程数,会选用默许的,也便是其时体系的CPU核心数 * 2

/**
 * 结构函数,创立MultithreadEventLoopGroup目标
 * @param nThreads 表明EventLoopGroup中EventLoop的数量,假如为0则运用默许的线程数
 * @param executor 用于履行使命的Executor目标
 * @param args 可选参数列表
 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    // 调用父类的结构函数,初始化EventLoopGroup目标
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

获取其时CPU核数–我的是 16核

NettyRuntime.availableProcessors();

指定线程数量

EventLoopGroup nioGroup = new NioEventLoopGroup(2);

获取下一个线程

// 创立两个线程的事情循环组
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
// 获取下一个线程
System.out.println(nioGroup.next()); // 第一次打印第一个
System.out.println(nioGroup.next()); // 打印第二个
System.out.println(nioGroup.next()); // 打印第一个,由于总共就两个
System.out.println(nioGroup.next()); // 打印第二个
// io.netty.channel.nio.NioEventLoop@737996a0
// io.netty.channel.nio.NioEventLoop@61dc03ce
// io.netty.channel.nio.NioEventLoop@737996a0
// io.netty.channel.nio.NioEventLoop@61dc03ce

履行一般使命

  • 这儿的execute和submit都是一样的效果
// 创立两个线程的事情循环组
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
// 履行一般使命
nioGroup.next().execute(() -> {
   log.debug("ok");
});
log.debug("main");

守时使命

// 创立两个线程的事情循环组
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
// 履行守时使命 以必定的频率履行
// 参数1:履行的办法
// 参数2:表明第一次启动多久开端
// 参数3:表明距离多久再次触发
// 参数4:单位
nioGroup.next().scheduleAtFixedRate(() -> {
   log.debug("ok");
}, 3, 5, TimeUnit.SECONDS);
log.debug("main");

IO 事情

下面是服务端,客户端同上面。

一个客户端绑定一个服务的 EventLoop线程。

下面再编写客户端的时候,假如要打上断点完成堵塞,需求将idea的断点卡成单线程的

Netty 网络编程

new ServerBootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    // 假如咱们没有用 nio的处理字节办法,这个msg是 bytebuf类型
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        ByteBuf buf = (ByteBuf) msg;
                        // 这个实践开发是要指定字符类型的,不要默许
                        log.debug(buf.toString(Charset.defaultCharset()));
                    }
                });
            }
        })
        .bind(8080);

咱们来分工细化一下

  1. 咱们能够把 eventloop 划分为 boss 和 work
  2. 将accept 和 read 分隔处理

那 第一个事情循环组 线程是否能够设置为 1 呢

由于服务器这一有个,它也只会和里边一个 eventloop 进行绑定

new ServerBootstrap()
        // 将 参数1:只处理accept 参数2:处理read
        .group(new NioEventLoopGroup(), new NioEventLoopGroup())

持续分工细化–假如其间一个Nio线程履行中
在读操作时履行太久,会影响其他 channel读操作
最好不要让它占用 work nio线程,所以咱们持续细分

  1. 创立独立的事情循环目标,由于不需求进行io所所以一般的
  2. 将下个处理器绑定上。
// 细分2:创立独立的 EventLoopGroup
DefaultEventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
        .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline()
                        .addLast("handler1", new ChannelInboundHandlerAdapter() {
                            @Override
                            // 假如咱们没有用 nio的处理字节办法,这个msg是 bytebuf类型
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                // 这个实践开发是要指定字符类型的,不要默许
                                log.debug(buf.toString(Charset.defaultCharset()));
                                // 让音讯传递给下一个handler
                                ctx.fireChannelRead(msg);
                            }
                        })
                        // 指定事情循环组和名称
                        .addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
                            @Override
                            // 假如咱们没有用 nio的处理字节办法,这个msg是 bytebuf类型
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                // 这个实践开发是要指定字符类型的,不要默许
                                log.debug(buf.toString(Charset.defaultCharset()));
                            }
                        });
            }
        })
        .bind(8080);

Netty 网络编程

那么是怎样完成切换事情循环组,也便是换人处理的呢?

假如两个 handler 绑定的是同一个线程,那么直接调用,不然调用的代码封装为一个使命目标。由一下一个 handler的线程调用

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 下一个 handler 的事情循环是否与其时的事情循环是同一个线程
    /**
    *    EventExecutor 是事情循环组
    */
    EventExecutor executor = next.executor();
    // 其时 handler 中的线程是否和 eventloop 是同一个线程
    // 是,直接调用
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } 
    // 不是,将要履行的代码作为使命提交给下一个事情循环处理(换人)
    else {
    // 运用runnable 履行一个事情
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

Channel & 衔接 & 封闭问题 & 异步

channel 的首要效果

  • close() 能够用来封闭 channel
  • closeFuture() 用来处理 channel 的封闭
    • sync 办法效果是同步等候 channel 封闭
    • 而 addListener 办法是异步等候 channel 封闭
  • pipeline() 办法增加处理器
  • write() 办法将数据写入,仅仅写进入缓冲区
  • writeAndFlush() 办法将数据写入并刷出

connect 衔接问题

connect:是异步堵塞,main办法主张,履行是Nio的线程

所以假如在发送数据前,必定要确保现已衔接好了,不然数据是发送不出去的

ChannelFuture:带有Future或者 promise都是和异步配套运用,用来处理成果

ChannelFuture channelFuture = new Bootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline().addLast(new StringEncoder());
            }
        })
        // connect 是异步堵塞,main办法主张调用,真正履行衔接的是Nio线程
        .connect(new InetSocketAddress("localhost", 8080));
//        channelFuture.sync();
// 会无堵塞向下履行获取channel,终究发送数据
Channel channel = channelFuture.channel();
//        log.debug("{}", channel); // 假如没有获取到打印的便是没有衔接的 channel,所以需求履行 sync
channel.writeAndFlush("123");

处理衔接问题,确保发送数据前必定是正确衔接的

  1. sync
// 堵塞其时线程,直到衔接树立完毕
channelFuture.sync();
Channel channel = channelFuture.channel();
log.debug("{}", channel);
channel.writeAndFlush("123");
  1. 将履行发送的代码,交给nio线程。
  • addListener(回调目标)办法异步处理成果
channelFuture.addListener(new ChannelFutureListener() {
    @Override
    // 在nio 线程衔接树立好之后,会调用operationComplete
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        Channel channel = channelFuture.channel();
        log.debug("{}", channel);
        channel.writeAndFlush("123");
    }
});

封闭问题

咱们看下面这段代码

咱们能够输入内容,发送给服务端,按 q 封闭传输通道

但是咱们封闭的时候,想做一些操作,怎样整 直接在close下面写吗,那是不对的,由于close办法是异步的

channelFuture.sync();
Channel channel = channelFuture.channel();
new Thread(new Runnable() {
    @Override
    public void run() {
        Scanner sc = new Scanner(System.in);
        while (true) {
            String line = sc.nextLine();
            if ("q".equals(line)) {
                channel.close();
                break;
            }
            channel.writeAndFlush(line);
        }
    }
}, "input").start();

处理异步问题

增加日志调试

nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
  1. 同步处理—由主线程处理封闭操作
new Thread(() -> {
    Scanner sc = new Scanner(System.in);
    while (true) {
        String line = sc.nextLine();
        if ("q".equals(line)) {
            channel.close();
            break;
        }
        channel.writeAndFlush(line);
    }
}, "input").start();
// 增加封闭处理器
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();// 设置同步
log.debug("处理封闭之后....");
  1. 异步处理–由 nio 线程处理
new Thread(() -> {
    Scanner sc = new Scanner(System.in);
    while (true) {
        String line = sc.nextLine();
        if ("q".equals(line)) {
            channel.close();
            break;
        }
        channel.writeAndFlush(line);
    }
}, "input").start();
ChannelFuture closeFuture = channel.closeFuture();
// 设置封闭回调函数
closeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        log.debug("处理封闭之后....");
    }
});

高雅的封闭

  • 它会等候一切正在处理的使命完成后,再进行封闭。
NioEventLoopGroup group = new NioEventLoopGroup();
        ChannelFuture channelFuture = new Bootstrap()
                .group(group)
closeFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        log.debug("处理封闭之后的操作");
        group.shutdownGracefully(); // 高雅的封闭
    }
});

异步

异步:便是一个线程主张衔接一个线程去树立衔接

考虑下面的场景,4 个医师给人治病,每个患者花费 20 分钟,而且医师治病的过程中是以患者为单位的,一个患者看完了,才干看下一个患者。假定患者源源不断地来,能够核算一下 4 个医师一天工作 8 小时,处理的患者总数是:4 * 8 * 3 = 96个患者

Netty 网络编程

那咱们就能够细分一下,四个医师别离处理四个事情

只要一开端,医师 2、3、4 别离要等候 5、10、15 分钟才干履行工作,但只要后续患者源源不断地来,他们就能够满负荷工作,而且处理患者的能力提高到了 4 * 8 * 12 功率几乎是原来的四倍

Netty 网络编程

要点:

  1. 单线程无法异步提高功率,有必要配合多线程、多核CPU才干发挥异步的优势
  2. 异步并没有缩短呼应时刻,反而有所增加
  3. 合理进行使命拆分,也是运用异步的要害

Future & Promise

处理异步时:经常用到的两个接口, futer 核 promise

首要要阐明 netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 承继自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

  • jdk Future:只能同步等候使命完毕(或成功、或失利)才干得到成果
  • netty Future:能够同步等候使命完毕得到成果,也能够异步办法得到成果,但都是要等使命完毕
  • netty Promise;不只要 netty Future 的功用,而且脱离了使命独立存在,只作为两个线程间传递成果的容器
功用/名称 jdk Future netty Future Promise
cancel 取消使命
isCanceled 使命是否取消
isDone 使命是否完成,不能区别成功失利
get 获取使命成果,堵塞等候
getNow 获取使命成果,非堵塞,还未发生成果时回来 null
await 等候使命完毕,假如使命失利,不会抛反常,而是经过 isSuccess 判别
sync 等候使命完毕,假如使命失利,抛出反常
isSuccess 判别使命是否成功
cause 获取失利信息,非堵塞,假如没有失利,回来null
addLinstener 增加回调,异步接纳成果
setSuccess 设置成功成果
setFailure 设置失利成果

jdk future

// 线程池
ExecutorService pool = Executors.newFixedThreadPool(2);
// 提交使命
Future<Integer> future = pool.submit(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        log.debug("履行核算"); // poll 线程
        Thread.sleep(1000);
        return 50;
    }
});
// 主线程经过 future 获取成果
log.debug("等候成果"); // 主线程
log.debug("成果是 {}", future.get()); // 主线程

nio future

// 创立事情循环组
NioEventLoopGroup eventGroup = new NioEventLoopGroup();
// 获取履行事情
EventLoop eventLoop = eventGroup.next();
// 履行办法
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
    @Override
    public Integer call() throws Exception {
        log.debug("履行核算"); // nio 线程
        Thread.sleep(1000);
        return 50;
    }
});

同步获取线程回来成果

// 主线程经过 future 获取成果
log.debug("等候成果"); // 主线程
log.debug("成果是 {}", future.get()); // 主线程

异步

future.addListener(new GenericFutureListener<Future<? super Integer>>() {
    @Override
    public void operationComplete(Future<? super Integer> future) throws Exception {
        log.debug("等候成果"); // nio 线程
        log.debug("成果是 {}", future.getNow()); // nio 线程
    }
});

promise 获取成果

// 1. 准备 EventLoop 目标
EventLoop loop = new NioEventLoopGroup().next();
// 2. 主动创立 promise 成果容器
DefaultPromise<Integer> promise = new DefaultPromise<>(loop);
new Thread(new Runnable() {
    @Override
    public void run() {
        // 3. 恣意一个线程履行核算,核算完毕运用 promise 填充成果
        log.debug("开端核算...");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // 填写失利的数据
            promise.setFailure(e);
        }
        // 填写成功的数据
        promise.setSuccess(90);
    }
}).start();
log.debug("等候成果...");
log.debug("成果是:{}", promise.get());

handler & pipeline

ChannelHandle 用来处理 Channel 上的各种事情,分为入站和出站两种。

当一切的ChannelHandle 连在一起便是 pipeline

  • 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,首要用来读取客户端数据,写回成果
  • 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,首要对写回成果进行加工

入站是依照次序履行的也便是, 会打印1,2,3

出站是依照逆序的办法,会打印6,5,4

new ServerBootstrap()
        .group(new NioEventLoopGroup())
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                // 增参加站处理器
                nioSocketChannel.pipeline().addLast("h1", new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        log.debug("1");
                        super.channelRead(ctx, msg);
                        // 运用通道写入数据并改写。ctx通道信息,获取上下文的分配器,创立bufer输出数据
                        nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("server..".getBytes()));
                    }
                });
                // 同上
                        log.debug("2");
                // 同上
                        log.debug("3");
                //
                // 增加出站处理器
                nioSocketChannel.pipeline().addLast("h4", new ChannelOutboundHandlerAdapter() {
                    @Override
                    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                        log.debug("4");
                        super.write(ctx, msg, promise);
                    }
                });
                // 同上
                        log.debug("5");
                //
                // 同上
                        log.debug("6");
                //
            }
        })
        .bind(8080);

出站处理器
当咱们用 channel 的办法发送数据,会从尾部开端顺次履行出站过滤器
当用办法内部的 ctx channel的办法发送,会从其时方位开端往前找

调试出站入站,handler

public static void main(String[] args) {
        ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("1");
                super.channelRead(ctx, msg);
            }
        };
        ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.debug("2");
                super.channelRead(ctx, msg);
            }
        };
        ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("3");
                super.write(ctx, msg, promise);
            }
        };
        ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                log.debug("4");
                super.write(ctx, msg, promise);
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
        // 模仿入站操作
//        channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
        // 模仿出站操作
        channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
    }

bytebuf

创立了一个默许的 ByteBuf(池化依据直接内存的 ByteBuf),初始容量是 10

假如不指定,默许256,扩容 * 2

ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
public static void main(String[] args) {
    // 创立默许空间的 bytebuf,空间容量默许256,超越空间容量,会主动扩容
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // read index:0 write index:0 capacity:256
    log(buffer);
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < 300; i++) {
        sb.append("a");
    }
    buffer.writeBytes(sb.toString().getBytes());
    log(buffer);
    // read index:0 write index:300 capacity:512
}
// netty 调试 bytebuf 办法
private static void log(ByteBuf buffer) {
    int length = buffer.readableBytes();
    int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
    StringBuilder buf = new StringBuilder(rows * 80 * 2)
            .append("read index:").append(buffer.readerIndex())
            .append(" write index:").append(buffer.writerIndex())
            .append(" capacity:").append(buffer.capacity())
            .append(NEWLINE);
    appendPrettyHexDump(buf, buffer);
    System.out.println(buf.toString());
}

直接内存 & 堆内存

默许是创立的直接内存

创立堆内存

ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);

创立直接内存

ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);

直接内存创立和毁掉的价值贵重,但读写功率高(少一次内存仿制),合适配合池化功用一重用

直接内存对 GC 压力小,由于这部分内存不受 JVM 废物收回的办理,但也要留意及时主动开释

池化 vs 非池化

池化的最大意义在于能够重用 ByteBuf,优点有

  • 没有池化,则每次都得创立新的 ByteBuf 实例,这个操刁难直接内存价值贵重,就算是堆内存,也会增加 GC 压力
  • 有了池化,则能够重用池中 ByteBuf 实例,而且选用了与 jemalloc 类似的内存分配算法提高分配功率
  • 高并发时,池化功用更节省内存,削减内存溢出的或许

池化功用是否敞开,能够经过下面的体系环境变量来设置

下面的代码是在 vm虚拟机参数里设置的

-Dio.netty.allocator.type={unpooled|pooled}

NIO 的缓冲区池化能够有效地削减创立和毁掉缓冲区目标的成本,提高程序的功能和吞吐量。

  • 4.1 今后,非 Android 渠道默许启用池化完成,Android 渠道启用非池化完成
  • 4.1 之前,池化功用还不老练,默许是非池化完成
public static void main(String[] args) {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer();
    System.out.println(buffer.getClass());
    // class io.netty.buffer.PooledUnsafeDirectByteBuf
    /*
     * PooledUnsafeDirectByteBuf
     * Unpooled:非池化
     * Pooled:池化
     * Direct:直接内存
     * Heap:堆内存
     */
}

Bytebuf的组成部分

  • 抛弃字节:表明现已读过了
  • 可读字节:未读的数据
  • 可写字节:未写入的数据
  • 可扩容:最大的容量,悉数的

Netty 网络编程

butebuf 是读写两个指针,不需求切换读写形式,最开端读写指针都在 0 方位

写读办法

办法签名 意义 补白
writeBoolean(boolean value) 写入 boolean 值 用一字节 01|00 代表 true|false
writeByte(int value) 写入 byte 值
writeShort(int value) 写入 short 值
writeInt(int value) 写入 int 值 Big Endian,即 0x250,写入后 00 00 02 50
writeIntLE(int value) 写入 int 值 Little Endian,即 0x250,写入后 50 02 00 00
writeLong(long value) 写入 long 值
writeChar(int value) 写入 char 值
writeFloat(float value) 写入 float 值
writeDouble(double value) 写入 double 值
writeBytes(ByteBuf src) 写入 netty 的 ByteBuf
writeBytes(byte[] src) 写入 byte[]
writeBytes(ByteBuffer src) 写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset) 写入字符串

留意

  • 这些办法的未指明回来值的,其回来值都是 ByteBuf,意味着能够链式调用
  • 网络传输,默许习惯是 Big Endian

扩容规矩是

  • 写入后数据巨细未超越 512,则挑选下一个 16 的整数倍,例如写入后巨细为 12 ,则扩容后 capacity 是 16
  • 写入后数据巨细超越 512,则挑选下一个 2^n,例如写入后巨细为 513,则扩容后 capacity 是 2^10=1024(2^9=512 现已不够了)
  • 扩容不能超越 max capacity 会报错

读入

// 只读入一个字节
buffer.readByte()

假如需求重复读取 int 整数 5,怎样办?

能够在 read 前先做个符号 mark

buffer.markReaderIndex();
System.out.println(buffer.readInt());//读入
log(buffer);

这时要重复读取的话,重置到符号方位 reset

buffer.resetReaderIndex();

retain & release(废物收回)

由于 Netty 中有堆外内存的 ByteBuf 完成,堆外内存最好是手动来开释,而不是等 GC 废物收回。

  • UnpooledHeapByteBuf 运用的是 JVM 内存,只需等 GC 收回内存即可
  • UnpooledDirectByteBuf 运用的便是直接内存了,需求特别的办法来收回内存
  • PooledByteBuf 和它的子类运用了池化机制,需求更杂乱的规矩来收回内存

Netty 这儿选用了引证计数法来控制收回内存,每个 ByteBuf 都完成了 ReferenceCounted 接口

  • 每个 ByteBuf 目标的初始计数为 1
  • 调用 release 办法计数减 1,假如计数为 0,ByteBuf 内存被收回
  • 调用 retain 办法计数加 1,表明调用者没用完之前,其它 handler 即使调用了 release 也不会造成收回
  • 当计数为 0 时,底层内存会被收回,这时即使 ByteBuf 目标还在,其各个办法均无法正常运用

留意:由于 pipeline 的存在,一般需求将 ByteBuf 传递给下一个 ChannelHandler,假如在 try finally里 relase,那么就失去了传递性,有必要等 bytebuf 完成了它的使命,才干够。

基本规矩是,谁是终究运用者,谁担任 release

入站 bytebuf 处理准则
对原始 ByteBuf 不做处理 无须 release
将原始 ByteBuf 转换为其它类型的 Java 目标 ByteBuf 就没用了,有必要 release
不调用 ctx.fireChannelRead(msg) 向后传递 有必要 release
ByteBuf 没有成功传递到下一个 有必要 release

出站 ByteBuf 处理准则:由 HeadContext flush 后 release

反常处理准则

  • 有时候不清楚 ByteBuf 被引证了多少次,但又有必要完全开释,能够循环调用 release 直到回来 true

看看开释 bytebuf 的源码

尾部开释

在 io.netty.channel 的类中的内部类 TailContext 下面办法

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    DefaultChannelPipeline.this.onUnhandledInboundMessage(ctx, msg);
}

持续跟进

protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
    this.onUnhandledInboundMessage(msg);
    if (logger.isDebugEnabled()) {
        logger.debug("Discarded message pipeline : {}. Channel : {}.", ctx.pipeline().names(), ctx.channel());
    }
}

这儿就看到了 ReferenceCountUtil.release(msg); 用来开释

protected void onUnhandledInboundMessage(Object msg) {
    try {
        logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

用来判别,假如是这个类型的,代表能够开释,不然回来false

public static boolean release(Object msg) {
    return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}

看看头部开释的源码

io.netty.channel下的HeadContext内部类的write办法

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
    this.unsafe.write(msg, promise);
}

咱们跟进 AbstractChannel完成类的 AbstractUnsafe内部类的 write办法

假如是通道是null 表明用计数办法封闭,并处理反常

this.assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
    try {
        ReferenceCountUtil.release(msg);
    } finally {
        this.safeSetFailure(promise, this.newClosedChannelException(AbstractChannel.this.initialCloseCause, "write(Object, ChannelPromise)"));
    }
} else {
    int size;
    try {
        msg = AbstractChannel.this.filterOutboundMessage(msg);
        size = AbstractChannel.this.pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable var15) {
        try {
            ReferenceCountUtil.release(msg);
        } finally {
            this.safeSetFailure(promise, var15);
        }
        return;
    }
    outboundBuffer.addMessage(msg, size, promise);
}

slice 零仿制表现之一

效果:削减内存仿制

【零仿制】的表现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存仿制,仍是运用原始 ByteBuf 的内存,切片后的 ByteBuf 保护独立的 read,write 指针

切片之后,会对容量进行约束,切多少便是多少。不能再次增加

public class Server {
    public static void main(String[] args) {
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
        // 赋值是个数据
        buffer.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});
        // 打印
        log(buffer);
        // 进行切片,这儿是没有发生数据仿制的
        // 从0 切到 5个,a-e
        ByteBuf f1 = buffer.slice(0, 5);
        // 从5 切到 5个,f-j
        ByteBuf f2 = buffer.slice(5, 5);
        log(f1);
        log(f2);
        System.out.println("===============");
        // 修该第0位也便是 a,发现原bytebuf也修改了
        f1.setByte(0, 'b');
        log(f1);
        log(buffer);
    }
    // netty 调试 bytebuf 办法
    private static void log(ByteBuf buffer) {
        int length = buffer.readableBytes();
        int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
        StringBuilder buf = new StringBuilder(rows * 80 * 2)
                .append("read index:").append(buffer.readerIndex())
                .append(" write index:").append(buffer.writerIndex())
                .append(" capacity:").append(buffer.capacity())
                .append(NEWLINE);
        appendPrettyHexDump(buf, buffer);
        System.out.println(buf.toString());
    }
}

原 bytebuf 开释之后,切片的bytebuf是不能用了

处理上面的问题

咱们用完那个,主动开释它自己的 bytebuf

东西类仍是上面的

public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
    buffer.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});
    log(buffer);
    ByteBuf f1 = buffer.slice(0, 5);
    // 让引证数据加1
    f1.retain();
    ByteBuf f2 = buffer.slice(5, 5);
    log(f1);
    log(f2);
    System.out.println("开释原 bytebuf");
    // 让原bytebuf 减1,开释不了真正的内存,没有减到0,不会开释内存
    buffer.release();
    log(f1);
}

duplicate

【零仿制】的表现之一,就比方截取了原始 ByteBuf 一切内容,而且没有 max capacity 的约束,也是与原始 ByteBuf 运用同一块底层内存,仅仅读写指针是独立的

copy

会将底层内存数据进行深仿制,因而无论读写,都与原始 ByteBuf 无关

零仿制,合并小bytebuf

从头核算了 指针方位,也是比较麻烦, 仍是要留意 relase问题哦

public static void main(String[] args) throws Exception {
    ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(10);
    buf1.writeBytes(new byte[]{6, 7, 8, 9, 10});
    ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(10);
    buf2.writeBytes(new byte[]{1, 2, 3, 4, 5});
    // 合并上面两个 buf,不发生仿制
    // 默许不会主动调整写入的方位,有必要带上true
    CompositeByteBuf bufs = ByteBufAllocator.DEFAULT.compositeBuffer();
    bufs.addComponents(true, buf1, buf2);
    log(bufs);
}

Unpooled

Unpooled 是一个东西类,类如其名,供给了非池化的 ByteBuf 创立、组合、仿制等操作

这儿仅介绍其跟【零仿制】相关的 wrappedBuffer 办法,能够用来包装 ByteBuf

ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
// 当包装 ByteBuf 个数超越一个时, 底层运用了 CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));

ByteBuf 优势

  • 池化 – 能够重用池中 ByteBuf 实例,更节省内存,削减内存溢出的或许
  • 读写指针别离,不需求像 ByteBuffer 一样切换读写形式
  • 能够主动扩容
  • 支撑链式调用,运用更流通
  • 很多当地表现零仿制,例如 slice、duplicate、CompositeByteBuf

双向通讯

服务端

public static void main(String[] args) throws Exception {
    new ServerBootstrap()
            .group(new NioEventLoopGroup())
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ByteBuf buffer = (ByteBuf) msg;
                            System.out.println(buffer.toString(Charset.defaultCharset()));
                            // 主张运用 ctx.alloc() 创立 ByteBuf
                            ByteBuf response = ctx.alloc().buffer();
                            response.writeBytes(buffer);
                            ctx.writeAndFlush(response);
                            // 考虑:需求开释 buffer 吗
                            // 从 Socket 读取到字节数据之后,将数据转换成 ByteBuf 目标。
                            // 在 Netty 的特别完成下,当该音讯被服务端处理完成
                            // 即当音讯被写回客户端之后,Netty 会主动开释其占用的内存。
                            // 考虑:需求开释 response 吗
                            // response 是咱们手动创立的 ByteBuf 目标,而不是从音讯中经过解码发生的,因而咱们需求手动开释该目标
                            // 又由于咱们向客户端写入数据时运用了 ctx.writeAndFlush(response)
                            // 该办法会主动在发送完成后将 response 进行开释,因而咱们不需求手动开释该目标。
                            // 但假如是经过 write() 办法写入数据,那么咱们需求在写入完成后手动调用 
                            // response.release() 办法对 response 目标进行开释。
                        }
                    });
                }
            }).bind(8080);
}

客户端

public static void main(String[] args) throws InterruptedException {
    NioEventLoopGroup group = new NioEventLoopGroup();
    Channel channel = new Bootstrap().group(group)
            .channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                    nioSocketChannel.pipeline().addLast(new StringEncoder());
                    nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            try {
                                ByteBuf buffer = (ByteBuf) msg;
                                System.out.println(buffer.toString(Charset.defaultCharset()));
                            } finally {
                                // 开释
                                ReferenceCountUtil.release(msg);
                            }
                            // 考虑:需求开释 buffer 吗
                            // 接纳到的 butebuf需求显示开释,防止内存泄露
                            // 假如不,则会保留到程序退出
                        }
                    });
                }
            }).connect("localhost", 8080).sync().channel();
    channel.closeFuture().addListener(future -> {
        group.shutdownGracefully();
    });
    new Thread(() -> {
        Scanner scanner = new Scanner(System.in);
        while (true) {
            String line = scanner.nextLine();
            if ("q".equals(line)) {
                channel.close();
                break;
            }
            channel.writeAndFlush(line);
        }
    }).start();
}

读和写误解

我最初在认识上有这样的误区,以为只要在 netty,nio 这样的多路复用 IO 模型时,读写才不会相互堵塞,才干够完成高效的双向通讯,但实践上,Java Socket 是全双工的:在恣意时刻,线路上存在A 到 BB 到 A 的双向信号传输。即使是堵塞 IO,读和写是能够一起进行的,只要别离选用读线程和写线程即可,读不会堵塞写、写也不会堵塞读

服务端

public class TestServer {
    public static void main(String[] args) throws IOException {
        ServerSocket ss = new ServerSocket(8888);
        Socket s = ss.accept();
        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
                while (true) {
                    System.out.println(reader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                // 例如在这个方位参加 thread 级别断点,能够发现即使不写入数据,也不妨碍前面线程读取客户端数据
                for (int i = 0; i < 100; i++) {
                    writer.write(String.valueOf(i));
                    writer.newLine();
                    writer.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

客户端

public class TestClient {
    public static void main(String[] args) throws IOException {
        Socket s = new Socket("localhost", 8888);
        new Thread(() -> {
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
                while (true) {
                    System.out.println(reader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
        new Thread(() -> {
            try {
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
                for (int i = 0; i < 100; i++) {
                    writer.write(String.valueOf(i));
                    writer.newLine();
                    writer.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

netty 进阶

在Client中:在入站处理器中,channelActive办法会在客户端衔接成功后触发。

  • option(ChannelOption.SO_RCVBUF, 10):设置缓冲区巨细 = 10
ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class)
                    // 设置缓冲区巨细,就不会一次悉数接纳了,但是发生了半包问题
                    .option(ChannelOption.SO_RCVBUF, 10);

看一组黏包现象,咱们下面当客户端衔接后会发送数据到服务端,但是咱们期望循环一次发一次,服务端却一次悉数接纳了

READ: 160B,一次悉数接纳了

服务端

@Slf4j
public class Server {
    public void start() {
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class);
            serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) {
                    nioSocketChannel.pipeline().addLast(new LoggingHandler());
                }
            });
            ChannelFuture channelFuture = serverBootstrap.bind(8254).sync();
            // 封闭时,同步封闭
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.debug("server error...", e);
        } finally {
            boss.shutdownGracefully(); // 安全封闭
            worker.shutdownGracefully();
        }
    }
    public static void main(String[] args) {
        new Server().start();
    }
}

客户端

public static void main(String[] args) {
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(worker);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    // channelActive:在衔接树立好之后触发
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        // 循环写入
                        for (int i = 0; i < 10; i++) {
                            ByteBuf buffer = ctx.alloc().buffer(16);
                            buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
                            ctx.writeAndFlush(buffer);
                        }
                    }
                });
            }
        });
        ChannelFuture channelFuture = bootstrap.connect("localhost", 8254).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.error("client error...", e);
    } finally {
        worker.shutdownGracefully();
    }
}

半包问题,便是设置了缓冲区巨细,然后客户端发送的数据是

Netty 网络编程

留意:TCP 存在黏包半包,UDP 是不存在的

TCP 应答机制(滑动窗口)

TCP 可靠:发信息到对端,对端有必要应答,假如长时刻没有应答,会重发信息,影响吞吐量

滑动窗口处理上面问题

粘包:发送方发送一个完好报文,由于接纳方处理不及时,且窗口比较大,就构成粘包

半包:接纳方只剩下了一部分,发送方的数据不能悉数放下,只能先放一部分,等候窗口移动持续放,构成半包

处理办法:引入窗口

窗口巨细决议了无需等候应答而能够持续发送的数据最大值

Netty 网络编程

窗口实践就起到一个缓冲区的效果,一起也能起到流量控制的效果

  • 图中深色的部分即要发送的数据,高亮的部分即窗口
  • 窗口内的数据才答应被发送,当应答未到达前,窗口有必要中止滑动
  • 假如 1001~2000 这个段的数据 ack 回来了,窗口就能够向前滑动
  • 接纳方也会保护一个窗口,只要落在窗口内的数据才干答应接纳

短衔接

处理粘包:客户端每发送一次,就封闭衔接,也叫做短衔接

下面是客户端,服务端仍是上面的

public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            send();
        }
    }
private static void send() {
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(worker);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    // channelActive:在衔接树立好之后触发
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        // 循环写入
                        ByteBuf buffer = ctx.alloc().buffer(16);
                        buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
                        ctx.writeAndFlush(buffer);
                        ctx.close();
                    }
                });
            }
        });
        ChannelFuture channelFuture = bootstrap.connect("localhost", 8254).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (
            InterruptedException e) {
        log.error("client error...", e);
    } finally {
        worker.shutdownGracefully();
    }
}

但是会发生半包,假如服务端设置了 衔接的 bytebuf 巨细,发送方发的数据过大,每次都接纳不全

这儿说下,修改容量的办法–ServerBootstrap的类

// 是针对每个衔接的装备,完成默许,会1024的容量
// 现在改成最小值,初始值,最大值。最低是十六,内部会取16的整数倍
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16));

固定长度

客户端和服务端约定好。服务端每次解码10个字节,多的就等下次够十个再解码。

留意次序:处理器,必定要先解码再打印日志信息。解码处理器在前面

服务端先参加。

// 增加定长解码器,长度和客户端约定好
nioSocketChannel.pipeline().addLast(new FixedLengthFrameDecoder(10));

客户端—每次都是十个字节一个数据,不足用_补齐

public static void main(String[] args) {
    send();
}
    /**
     * 填充数组为指定长度,并将数组中的元素替换为指定字符对应的字节值。
     *
     * @param c   要替换的字符
     * @param len 数组长度
     * @return 填充后的字节数组
     */
    public static byte[] fill10Bytes(char c, int len) {
        byte[] bytes = new byte[10];
        Arrays.fill(bytes, (byte) 95);
        for (int i = 0; i < len; ++i) {
            bytes[i] = (byte) c;
        }
        System.out.println(new String(bytes));
        return bytes;
    }
private static void send() {
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(worker);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) {
                nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    // channelActive:在衔接树立好之后触发
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) {
                        // 循环写入
                        ByteBuf buffer = ctx.alloc().buffer();
                        char c = '0';
                        Random r = new Random();
                        for (int i = 0; i < 10; i++) {
                            buffer.writeBytes(fill10Bytes(c++, r.nextInt(10) + 1));
                        }
                        ctx.writeAndFlush(buffer);
                    }
                });
            }
        });
        ChannelFuture channelFuture = bootstrap.connect("localhost", 8254).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (
            InterruptedException e) {
        log.error("client error...", e);
    } finally {
        worker.shutdownGracefully();
    }
}

服务端

public void start() {
    NioEventLoopGroup boss = new NioEventLoopGroup();
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class)
    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16));
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) {
                // 增加定长解码器,长度和客户端约定好
                nioSocketChannel.pipeline().addLast(new FixedLengthFrameDecoder(10));
                nioSocketChannel.pipeline().addLast(new LoggingHandler());
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8254).sync();
        // 封闭时,同步封闭
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("server error...", e);
    } finally {
        boss.shutdownGracefully(); // 安全封闭
        worker.shutdownGracefully();
    }
}
public static void main(String[] args) {
    new Server().start();
}

缺陷:会浪费字节,长度需求决议计划好

行解码器

换行符解码器,不能指定,默许 \n \r

// 还有一种DelimiterBasedFrameDecoder
// 参数1是最大长度,参数2:是指定分隔符
addLast(new LoggingHandler(LogLevel.DEBUG));

客户端

public static void main(String[] args) {
    send();
}
/**
 * 运用指定的字符生成一个指定长度的字符串,并在字符串结尾增加换行符。
 *
 * @param c   要生成的字符
 * @param len 字符串长度
 * @return 生成的字符串
 */
public static StringBuilder makeString(char c, int len) {
    StringBuilder sb = new StringBuilder(len + 2);
    for (int i = 0; i < len; ++i) {
        sb.append(c);
    }
    sb.append("\n");
    return sb;
}
private static void send() {
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(worker);
        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) {
                nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    // channelActive:在衔接树立好之后触发
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) {
                        // 循环写入
                        ByteBuf buffer = ctx.alloc().buffer();
                        char c = '0';
                        Random r = new Random();
                        for (int i = 0; i < 10; i++) {
                            String s = makeString(c++, r.nextInt(256) + 1).toString();
                            buffer.writeBytes(s.getBytes());
                        }
                        ctx.writeAndFlush(buffer);
                    }
                });
            }
        });
        ChannelFuture channelFuture = bootstrap.connect("localhost", 8254).sync();
        channelFuture.channel().closeFuture().sync();
    } catch (
            InterruptedException e) {
        log.error("client error...", e);
    } finally {
        worker.shutdownGracefully();
    }
}

服务端

public void start() {
    NioEventLoopGroup boss = new NioEventLoopGroup();
    NioEventLoopGroup worker = new NioEventLoopGroup();
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class)
    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16));
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) {
                // 增加依据换行符的数据,遇到 \n \r等就会将这条数据截取。每一行最大数据1024
                // 超出则丢掉
                nioSocketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024));
                nioSocketChannel.pipeline().addLast(new LoggingHandler());
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8254).sync();
        // 封闭时,同步封闭
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("server error...", e);
    } finally {
        boss.shutdownGracefully(); // 安全封闭
        worker.shutdownGracefully();
    }
}
public static void main(String[] args) {
    new Server().start();
}

LTC 解码器

lengthFieldOffset 长度字段偏移量,也便是长度字节开端方位,图中等于0,表明长度字节从 0 开端

lengthFieldLength长度字段自身是多长,图中等于 2,表明000C,C = 12

lengthAdjustment长度字段为基准,还有几个字节是内容

initialBytesToStrip 从头剥离几个字节,也便是字段长度记载了内容长度,总共两个字节,解析完想删除这两个字节

就能够依据上面信息,从0开端读,读12个字节拿到 hello world

Netty 网络编程

下面这个有点不一样,它多带了一个头,从第 2 个字节开端读,总共三个长度字段

Netty 网络编程

表明从0开端读,长度字段为 3 个,不需求剥离,越过两个字节,开端读取12个内容字节

Netty 网络编程

下面办法测验

public static void main(String[] args) throws Exception {
    // netty 的测验办法,省去很多
    EmbeddedChannel channel = new EmbeddedChannel(
            // 参数1:最大长度,参数2:长度字段开端方位,参数3:长度字段自身字节数,参数4:是否越过字节,参数5:剥离长度
            new LengthFieldBasedFrameDecoder(
                    1024, 0, 4, 1, 4),
            new LoggingHandler(LogLevel.DEBUG)
    );
    // 四个字节的内容长度,实践内容
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    send(buffer, "Hello, world");
    send(buffer, "Hi!");
    // 运用 Em的channel写入 bufer
    channel.writeInbound(buffer);
    // 打印
    // .Hello, world,
    // 前面的 . 代表咱们的附加内容,假如不剥离字段长度,会更多
}
private static void send(ByteBuf buffer, String content) {
    byte[] bytes = content.getBytes();
    buffer.writeInt(bytes.length); // 先写入内容的长度
    buffer.writeByte(1); // 咱们写入一个附加内容,前面现已越过的字节,在参数4方位
    buffer.writeBytes(bytes);
}

协议设计与解析

TCP/IP 中音讯传输依据流的办法,没有鸿沟。

协议的意图便是划定音讯的鸿沟,制定通讯两边要共同恪守的通讯规矩

  • redis 协议
  • 要求咱们先发送
  • *3:表明三个元素
  • 每个指令的长度
  • $3:表明 set 三个长度
  • set
  • $4
  • name
  • $8
  • zhangsan
public static void main(String[] args) {
    NioEventLoopGroup worker = new NioEventLoopGroup();
        final byte[] LINE = new byte[]{13, 10};
        try {
            Bootstrap bootstrap = new Bootstrap().group(worker).channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel nioSocketChannel) {
                    nioSocketChannel.pipeline().addLast(new LoggingHandler());
                    nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            ByteBuf buffer = ctx.alloc().buffer();
                            buffer.writeBytes("*3".getBytes());
                            buffer.writeBytes(LINE);
                            buffer.writeBytes("$3".getBytes());
                            buffer.writeBytes(LINE);
                            buffer.writeBytes("set".getBytes());
                            buffer.writeBytes(LINE);
                            buffer.writeBytes("$4".getBytes()); // name 四个长度
                            buffer.writeBytes(LINE);
                            buffer.writeBytes("name".getBytes());
                            buffer.writeBytes(LINE);
                            buffer.writeBytes("$8".getBytes()); // zhangsan 8个长度
                            buffer.writeBytes(LINE);
                            buffer.writeBytes("zhangsan".getBytes());
                            buffer.writeBytes(LINE);
                            ctx.writeAndFlush(buffer);
                        }
                        @Override
                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                            ByteBuf buf = (ByteBuf) msg;
                            System.out.println(buf.toString(Charset.defaultCharset()));
                        }
                    });
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("localhost",6379).sync();
            // 封闭时,同步封闭
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.debug("server error...", e);
        } finally {
            worker.shutdownGracefully();
        }
}

http 协议

new HttpServerCodec()

Netty 供给的一个方便的处理器,它将实践上是将 HttpRequestDecoder 和 HttpResponseEncoder 组合在一起,前者用于解码 HTTP 恳求,后者用于编码 HTTP 呼应。

下面是简略运用

public static void main(String[] args) {
    NioEventLoopGroup worker = new NioEventLoopGroup();
    NioEventLoopGroup boss = new NioEventLoopGroup();
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) {
                nioSocketChannel.pipeline().addLast(new LoggingHandler());
                nioSocketChannel.pipeline().addLast(new HttpServerCodec());
                nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                        log.debug("{}", msg.getClass());
                        if (msg instanceof HttpRequest) { // 恳求行,恳求头(是get恳求)
                        } else if (msg instanceof HttpContent) { // 恳求体(不是get) 
                        }
                    }
                });
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8352).sync();
        // 封闭时,同步封闭
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("server error...", e);
    } finally {
        worker.shutdownGracefully();
    }
}

咱们期望只关注某一种类型的处理

SimpleChannelInboundHandler:只关怀特定的音讯,request或者httpconent,不是则越过

  • DefaultFullHttpResponse:获取呼应目标,进行写回数据
public static void main(String[] args) {
    NioEventLoopGroup worker = new NioEventLoopGroup();
    NioEventLoopGroup boss = new NioEventLoopGroup();
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class);
        serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel nioSocketChannel) {
                nioSocketChannel.pipeline().addLast(new LoggingHandler());
                nioSocketChannel.pipeline().addLast(new HttpServerCodec()); // 即使入栈处理器也是出站处理器
                nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                    @Override
                    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) throws Exception {
                        // 获取恳求
                        log.debug(httpRequest.uri()); // 恳求行
                        log.debug("{}", httpRequest.headers()); // 恳求头
                        // 获取呼应
                        /*
                         * version 版别
                         * status 呼应状况
                         */
                        DefaultFullHttpResponse response = new DefaultFullHttpResponse(httpRequest.protocolVersion(), HttpResponseStatus.OK);
                        // 向浏览器写入数据
                        byte[] bytes = "<h1>Hello World</h1>".getBytes();
                        response.content().writeBytes(bytes);
                        // 咱们应该在呼应头加 呼应体长度,不然浏览器会一向在等候呼应内容
                        response.headers().setInt(CONTENT_LENGTH, bytes.length); // netty 结构 HttpHeaderNames
                        // 写回呼应
                        channelHandlerContext.writeAndFlush(response);
                    }
                });
            }
        });
        ChannelFuture channelFuture = serverBootstrap.bind(8352).sync();
        // 封闭时,同步封闭
        channelFuture.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        log.debug("server error...", e);
    } finally {
        worker.shutdownGracefully();
    }
}

自界说协议要素

  • 魔数,用来在第一时刻断定是否是无效数据包
  • 版别号,能够支撑协议的晋级
  • 序列化算法,音讯正文究竟选用哪种序列化反序列化办法,能够由此扩展,例如:json、protobuf、hessian、jdk
  • 指令类型,是登录、注册、单聊、群聊… 跟事务相关
  • 恳求序号,为了双工通讯,供给异步能力
  • 正文长度
  • 音讯正文

先写编解码器

这儿写一个 message 的发送的信息,仅仅一个类型状况

会有多个子类承继这个 message 类,表明多个状况

public abstract class Message implements Serializable {
    /**
     * 依据音讯类型字节,取得对应的音讯 class
     *
     * @param messageType 音讯类型字节
     * @return 音讯 class
     */
    public static Class<? extends Message> getMessageClass(int messageType) {
        return messageClasses.get(messageType);
    }
    // 存储序列号
    private int sequenceId;
    // 存储音讯的类型
    private int messageType;
    public abstract int getMessageType();
    // 登录恳求音讯
    public static final int LoginRequestMessage = 0;
    // 登录呼应音讯
    public static final int LoginResponseMessage = 1;
    // 谈天恳求音讯
    public static final int ChatRequestMessage = 2;
    // 谈天呼应音讯
    public static final int ChatResponseMessage = 3;
    // 创立群组恳求音讯
    public static final int GroupCreateRequestMessage = 4;
    // 创立群组呼应音讯
    public static final int GroupCreateResponseMessage = 5;
    // 参加群组恳求音讯
    public static final int GroupJoinRequestMessage = 6;
    // 参加群组呼应音讯
    public static final int GroupJoinResponseMessage = 7;
    // 退出群组恳求音讯
    public static final int GroupQuitRequestMessage = 8;
    // 退出群组呼应音讯
    public static final int GroupQuitResponseMessage = 9;
    // 群组谈天恳求音讯
    public static final int GroupChatRequestMessage = 10;
    // 群组谈天呼应音讯
    public static final int GroupChatResponseMessage = 11;
    // 获取群组成员恳求音讯
    public static final int GroupMembersRequestMessage = 12;
    // 获取群组成员呼应音讯
    public static final int GroupMembersResponseMessage = 13;
    // 心跳恳求音讯
    public static final int PingMessage = 14;
    // 心跳呼应音讯
    public static final int PongMessage = 15;
    /**
     * 恳求类型 byte 值
     */
    public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    /**
     * 呼应类型 byte 值
     */
    public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
    private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();
    static {
        messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
        messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
        messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
        messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
        messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
        messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
        messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
        messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
        messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
        messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
        messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
        messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
        messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
        messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
        messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    }

自界说编码解码

readInt 是Netty中ByteBuf类的一个办法,用于从ByteBuf中读取一个int类型的数据。

它回来一个int类型的值,并将读取索引主动增加4(int的字节数)

@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {
    // 编码
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 1. 4个字节的魔数,恣意
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版别
        out.writeByte(1);
        // 3. 字节的序列化办法 0,jdk 1 JSON
        out.writeByte(0);
        // 4. 字节指令类型---.getMessageType 拿到其时多态的,类型,每个类都有指定的类型
        out.writeByte(msg.getMessageType());
        // 5. 4个字节--恳求序号
        out.writeInt(msg.getSequenceId());
        out.writeByte(0xff); // 为了凑数的,由于固定字节数 要 2 的倍数
        // 6. 获取内容的字节数组
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7. 长度
        out.writeInt(bytes.length);
        // 8. 写入内容--用jdk序列化
        out.writeBytes(bytes);
    }
    // 解码
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 先读取戏法
        int magicNum = in.readInt();
        // 读取版别
        byte version = in.readByte();
        // 读取序列号办法
        byte serializerType = in.readByte();
        // 读取音讯类型 messageType
        byte messageType = in.readByte();
        // 读取恳求序号
        int sequenceId = in.readInt();
        // 读取无意义的字节,其时为了规范加的
        in.readByte();
        // 读取长度
        int length = in.readInt();
        // 获取数据字节
        byte[] bytes = new byte[length];
        ByteBuf byteBuf = in.readBytes(bytes, 0, length);
        // 转为目标
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        ObjectInputStream ois = new ObjectInputStream(byteArrayInputStream);
        Message message = (Message) ois.readObject();
        log.debug("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        // 终究增加到调集中
        out.add(message);
    }
}

然后咱们测验这个处理器

  • 1024:最大容量
  • 12:偏移量,由于序列号,魔数,类型无效字符等,协议。
  • 4:长度字节
  • 0:不需求越过
  • 0:由于 decode自己解析,所以不需求

留意要加LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),处理黏包半包

敞开了解码器:假如数据不完好,将不会往下走

writeInbound:履行完会调用 release开释 bytebuf

public static void main(String[] args) throws Exception {
    // 增加刚刚写的编码解码
    EmbeddedChannel channel = new EmbeddedChannel(
            new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
            new LoggingHandler(),
            new MessageCodec());
    // encode 编码
    LoginRequestMessage loginRequestMessage = new LoginRequestMessage("zhangsan", "123");
    channel.writeOutbound(loginRequestMessage);
    // decode 解码
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // 写入数据
    new MessageCodec().encode(null, loginRequestMessage, buffer);
    // 入站
    channel.writeInbound(buffer);
}

打印的字节数,能够依据自己界说的协议来,对比,是正确的

注解(安全状况)

sharable 线程安全

咱们能够翻开处理类的源码查看

@Sharable 表明线程安全,这个处理器能够被同享运用

像 LTC解码器是不能够被同享的,存在线程危险

@Sharable
public class LoggingHandler extends ChannelDuplexHandler {

那咱们自己界说的 handle 能够加同享注解吗

ByteToMessageCodec;子类文档明确规定,不能加同享注解,不然抛出反常

public class MessageCodec extends ByteToMessageCodec<Message> {

那需求增加怎样处理

运用这个父类即可–MessageToMessageCodec

有一些小变动,需求自行改一下,bytebuf方面

public class MessageCodec extends MessageToMessageCodec<ByteBuf, Message> {

开发谈天室

首要,咱们协议在 LengthFieldBasedFrameDecoder直接写,会简单出错咱们制作成类

public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
    // 协议
    public ProcotolFrameDecoder() {
        this(1024, 12, 4, 0, 0);
    }
    public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }
}

登录

客户端

客户端衔接成功之后进行衔接树立事情,控制台输入

需求拓荒线程,运用 nio 线程会影响其他 io 操作

  • 咱们构建完目标,进行writeAndFlush发送
  • 往上持续找上一个handel
  • MESSAGE_CODEC 编码,LOGGING_HANDLER 记载日志
  • ProcotolFrameDecoder是入站的所以不会触发

先看一下下面这个 CountDownLatch

用于同步东西类,和谐线程之间的等候时刻

  • countDown():每次调用减 1,为零开释一切线程
  • await():让调用的线程等候计数器为 0 ,立即回来,不然会堵塞
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);

用于在多线程下判别的 boolean 目标

  • get():获取其时布尔值。
  • set(boolean newValue):设置布尔值为指定的新值。
AtomicBoolean LOGIN = new AtomicBoolean();

会话办理—登录成功后保存是那个channel那个用户

将用户和 channel 绑定起来

SessionFactory.getSession().bind(ctx.channel(), username);

留意:handler 没有加 @ChannelHandler.Sharable 会导致其他客户端衔接不上

衔接假死

原因

  • 网络设备呈现毛病,例如网卡,机房等,底层的 TCP 衔接现已断开了,但应用程序没有感知到,依然占用着资源。
  • 公网网络不稳定,呈现丢包。假如接连呈现丢包,这时现象便是客户端数据发不出去,服务端也一向收不到数据,就这么一向耗着
  • 应用程序线程堵塞,无法进行数据读写

问题

  • 假死的衔接占用的资源不能主动开释
  • 向假死的衔接发送数据,得到的反馈是发送超时

IdleStateHandler 处理器处理以上问题

  • 第一个参数: 表明读闲暇时刻,即在指定的时刻距离内假如没有读取到数据,则会触发userEventTriggered办法。
  • 第二个参数: 表明写闲暇时刻,即在指定的时刻距离内假如没有写入数据,则会触发userEventTriggered办法。
  • 第三个参数: 表明读写闲暇时刻,即在指定的时刻距离内假如既没有读取到数据也没有写入数据,则会触发userEventTriggered办法。

用心跳机制,确保客户端正常链接,处理闲暇问题

优化

序列化算法

序列化,反序列化首要用在音讯正文的转换上

  • 序列化时,需求将 Java 目标变为要传输的数据(能够是 byte[],或 json 等,终究都需求变成 byte[])
  • 反序列化时,需求将传入的正文数据还原成 Java 目标,便于处理
/**
 * 扩展序列化
 */
public interface Serializer {
    // 反序列化
    <T> T deserialize(Class<T> tClass, byte[] bytes);
    // 序列化
    <T> byte[] serialize(T object);
    enum Algorithm implements Serializer {
        Java {
            @Override
            public <T> T deserialize(Class<T> tClass, byte[] bytes) {
                try {
                    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
                    return (T) ois.readObject();
                } catch (IOException | ClassNotFoundException e) {
                    throw new RuntimeException("反序列化失利", e);
                }
            }
            @Override
            public <T> byte[] serialize(T object) {
                try {
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    ObjectOutputStream oos = new ObjectOutputStream(bos);
                    oos.writeObject(object);
                    return bos.toByteArray();
                } catch (IOException e) {
                    throw new RuntimeException("序列化失利", e);
                }
            }
        },
        Json {
            @Override
            public <T> T deserialize(Class<T> tClass, byte[] bytes) {
                String s = new String(bytes, StandardCharsets.UTF_8);
                return new Gson().fromJson(s, tClass);
            }
            @Override
            public <T> byte[] serialize(T object) {
                String toJson = new Gson().toJson(object);
                return toJson.getBytes(StandardCharsets.UTF_8);
            }
        }
    }
}

留意反序列化,取得的目标

参数调优

// 客户端经过 option 装备参数,给 SocketChannel 装备参数
new Bootstrap().option() 
// 服务端经过 option 装备参数,给 ServerSocketChannel 装备参数
new ServerBootstrap().option()
// 服务端经过 childOptio 装备参数,给 SocketChannel 装备参数
new ServerBootstrap().childOption()

客户端超时衔接

客户端不必定比及 5秒后抛出反常,它有或许判别服务器没有敞开,根本衔接不上,直接抛出网络编程反常

bootstrap.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500);

SO_TIMEOUT 首要用在堵塞 IO,堵塞 IO 中 accept,read 等都是无限等候的,假如不期望永远堵塞,运用它调整超时时刻,netty 不会用的。

悉数代码

Netty 网络编程

client

NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
// 线程计数器
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
// 登录状况--默许未登录
AtomicBoolean LOGIN = new AtomicBoolean(false);
try {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.channel(NioSocketChannel.class);
//            bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500);
    bootstrap.group(group);
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        // 初始化操作
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new ProcotolFrameDecoder());
//                    ch.pipeline().addLast(LOGGING_HANDLER);
            ch.pipeline().addLast(MESSAGE_CODEC);
            // 用来判别是不是,读闲暇时刻太长或者 写时刻闲暇时刻太长
            // 5s 没用收到 channel 的数据会触发一个 IdleState#WRITER_IDLE 事情
            ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
            // ChannelDuplexHandler 能够一起作为入站和出站处理器
            ch.pipeline().addLast(new ChannelDuplexHandler() {
                // 触发特别事情 IdleState
                @Override
                public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                    IdleStateEvent event = (IdleStateEvent) evt;
                    // 写闲暇
                    if (event.state() == IdleState.WRITER_IDLE) {
                        ctx.writeAndFlush(new PingMessage());
                    }
                }
            });
            // 树立衔接之后触发 active
            ch.pipeline().addLast("client_handel", new ChannelInboundHandlerAdapter() {
                // 承受呼应信息
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//                            log.debug("{}", msg);
                    if (msg instanceof LoginResponseMessage) {
                        LoginResponseMessage message = (LoginResponseMessage) msg;
                        // 假如登录成功设置为 true
                        if (message.isSuccess()) {
                            LOGIN.set(true);
                        }
                        // 让计数器 - 1,让线程持续履行,唤醒
                        WAIT_FOR_LOGIN.countDown();
                    }
                }
                @Override
                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    // 在这儿要创立线程去完成,控制台输入,担任向服务器发送信息
                    new Thread(() -> {
                        Scanner sc = new Scanner(System.in);
                        System.out.println("请输入用户名:");
                        String username = sc.nextLine();
                        System.out.println("请输入暗码:");
                        String password = sc.nextLine();
                        // 构建音讯目标
                        LoginRequestMessage message = new LoginRequestMessage(username, password);
                        // 发送信息
                        ctx.writeAndFlush(message);
                        System.out.println("等候后续操作....");
                        try {
                            // 呼应回来,持续履行
                            WAIT_FOR_LOGIN.await();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        // 假如登录失利,封闭 channel,回来
                        log.debug("{}", LOGIN.get());
                        if (!LOGIN.get()) {
                            ctx.channel().close();
                            return;
                        }
                        // 打印菜单,登录成功
                        while (true) {
                            System.out.println("==================================");
                            System.out.println("send [username] [content]"); // 发送
                            System.out.println("gsend [group name] [content]"); // 向群聊发送
                            System.out.println("gcreate [group name] [m1,m2,m3...]"); // 创立群,而且拉人
                            System.out.println("gmembers [group name]"); // 查看群里成员
                            System.out.println("gjoin [group name]"); // 参加谈天群
                            System.out.println("gquit [group name]"); // 退出谈天群
                            System.out.println("quit");
                            System.out.println("==================================");
                            Scanner command = new Scanner(System.in);
                            // 用户输入指令--由于指令都是以空格分割的
                            String[] s = command.nextLine().split(" ");
                            switch (s[0]) { // 依据指令格局,0 表明指令类型
                                case "send":
                                    ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
                                    break;
                                case "gsend":
                                    ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));
                                    break;
                                case "gcreate":
                                    HashSet<String> strings = new HashSet<>(Arrays.asList(s[2].split(",")));
                                    strings.add(username);
                                    ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], strings));
                                    break;
                                case "gmembers":
                                    ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
                                    break;
                                case "gjoin":
                                    ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
                                    break;
                                case "gquit":
                                    ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
                                    break;
                                case "quit":
                                    ctx.channel().close();
                                    return;
                            }
                        }
                    }, "system in").start();
                }
            });
        }
    });
    log.debug("登录");
    Channel channel = bootstrap.connect("localhost", 8081).sync().channel();
    channel.closeFuture().sync();
} catch (
        Exception e) {
    log.error("client error", e);
} finally {
    group.shutdownGracefully();
}

config 包

public abstract class Config {
    static Properties properties;
    static {
        try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
            properties = new Properties();
            properties.load(in);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    public static int getServerPort() {
        String value = properties.getProperty("server.port");
        if(value == null) {
            return 8080;
        } else {
            return Integer.parseInt(value);
        }
    }
    public static Serializer.Algorithm getSerializerAlgorithm() {
        String value = properties.getProperty("serializer.algorithm");
        if(value == null) {
            return Serializer.Algorithm.Java;
        } else {
            return Serializer.Algorithm.valueOf(value);
        }
    }
}

protocol

@ChannelHandler.Sharable
public class MessageCodec extends ByteToMessageCodec<Message> {
    @Override
    public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // 1. 4 字节的魔数
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版别,
        out.writeByte(1);
        // 3. 1 字节的序列化办法 jdk 0 , json 1
        out.writeByte(0);
        // 4. 1 字节的指令类型
        out.writeByte(msg.getMessageType());
        // 5. 4 个字节
        out.writeInt(msg.getSequenceId());
        // 无意义,对齐填充
        out.writeByte(0xff);
        // 6. 获取内容的字节数组
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(msg);
        byte[] bytes = bos.toByteArray();
        // 7. 长度
        out.writeInt(bytes.length);
        // 8. 写入内容
        out.writeBytes(bytes);
    }
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
        Message message = (Message) ois.readObject();
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        out.add(message);
    }
}
@ChannelHandler.Sharable
/**
 * 有必要和 LengthFieldBasedFrameDecoder 一起运用,确保接到的 ByteBuf 音讯是完好的
 */
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
        ByteBuf out = ctx.alloc().buffer();
        // 1. 4 字节的魔数
        out.writeBytes(new byte[]{1, 2, 3, 4});
        // 2. 1 字节的版别,
        out.writeByte(1);
        // 3. 1 字节的序列化办法 jdk 0 , json 1
        /* ordinal 参数会依据排序进行对比,比方 java是一个那么就会转换 0 以此类推 */
        out.writeByte(Config.getSerializerAlgorithm().ordinal());
        // 4. 1 字节的指令类型
        out.writeByte(msg.getMessageType());
        // 5. 4 个字节
        out.writeInt(msg.getSequenceId());
        // 无意义,对齐填充
        out.writeByte(0xff);
        // 6. 获取内容的字节数组,序列化, 运用
        byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
        // 7. 长度
        out.writeInt(bytes.length);
        // 8. 写入内容
        out.writeBytes(bytes);
        outList.add(out);
    }
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int magicNum = in.readInt();
        byte version = in.readByte();
        byte serializerType = in.readByte();
        byte messageType = in.readByte();
        int sequenceId = in.readInt();
        in.readByte();
        int length = in.readInt();
        byte[] bytes = new byte[length];
        in.readBytes(bytes, 0, length);
        // 反序列化
        // 依据个数拿到对应的序列化算法,找到反序列化的算法
        Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerType];
        // 确定详细音讯类型
        Class<?> aClass = Message.getMessageClass(messageType);
        Object message = algorithm.deserialize(aClass, bytes);
        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
        log.debug("{}", message);
        out.add(message);
    }
}
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
    public ProcotolFrameDecoder() {
        this(1024, 12, 4, 0, 0);
    }
    public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
    }
}
/**
 * 扩展序列化
 */
public interface Serializer {
    // 反序列化
    <T> T deserialize(Class<T> tClass, byte[] bytes);
    // 序列化
    <T> byte[] serialize(T object);
    enum Algorithm implements Serializer {
        Java {
            @Override
            public <T> T deserialize(Class<T> tClass, byte[] bytes) {
                try {
                    ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
                    return (T) ois.readObject();
                } catch (IOException | ClassNotFoundException e) {
                    throw new RuntimeException("反序列化失利", e);
                }
            }
            @Override
            public <T> byte[] serialize(T object) {
                try {
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    ObjectOutputStream oos = new ObjectOutputStream(bos);
                    oos.writeObject(object);
                    return bos.toByteArray();
                } catch (IOException e) {
                    throw new RuntimeException("序列化失利", e);
                }
            }
        },
        Json {
            @Override
            public <T> T deserialize(Class<T> tClass, byte[] bytes) {
                String s = new String(bytes, StandardCharsets.UTF_8);
                return new Gson().fromJson(s, tClass);
            }
            @Override
            public <T> byte[] serialize(T object) {
                String toJson = new Gson().toJson(object);
                return toJson.getBytes(StandardCharsets.UTF_8);
            }
        }
    }
}

server.handler

@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage chatRequestMessage) throws Exception {
        // 获取接纳者者
        String to = chatRequestMessage.getTo();
        // 依据用户名获取 channel
        Channel channel = SessionFactory.getSession().getChannel(to);
        // 为空,表明不在线
        if (channel != null) {
            // 发送信息
            channel.writeAndFlush(new ChatResponseMessage(chatRequestMessage.getFrom(), chatRequestMessage.getContent()));
        } else {
            channelHandlerContext.writeAndFlush(new ChatResponseMessage(false, "对方用户不在线"));
        }
    }
}
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupChatRequestMessage groupChatRequestMessage) throws Exception {
        // 依据群名,获取一切通道
        List<Channel> channel = GroupSessionFactory.getGroupSession()
                .getMembersChannel(groupChatRequestMessage.getGroupName());
        // 循环发送信息
        for (Channel channel1 : channel) {
            channel1.writeAndFlush(new GroupChatResponseMessage(groupChatRequestMessage.getFrom(),
                    groupChatRequestMessage.getContent()));
        }
    }
}
/**
 * 处理创立群聊
 */
@ChannelHandler.Sharable
public class GroupCreateRequestHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupCreateRequestMessage groupCreateRequestMessage) throws Exception {
        // 将要创立的群聊名称
        String groupName = groupCreateRequestMessage.getGroupName();
        // 创立群而且要拉入的群成员
        Set<String> members = groupCreateRequestMessage.getMembers();
        // 获取群办理器
        GroupSession groupSession = GroupSessionFactory.getGroupSession();
        // 创立群
        Group group = groupSession.createGroup(groupName, members);
        if (group == null) {
            // 发送拉群音讯
            List<Channel> channelList = groupSession.getMembersChannel(groupName);
            for (Channel channel : channelList) {
                channel.writeAndFlush(new GroupCreateResponseMessage(true, "您已被拉入" + groupName));
            }
            channelHandlerContext.writeAndFlush(new GroupCreateResponseMessage(true, groupName + "创立成功"));
        } else {
            channelHandlerContext.writeAndFlush(new GroupCreateResponseMessage(false, groupName + "群现已存在"));
        }
    }
}
@ChannelHandler.Sharable // 由于没有状况信息,多线程下是安全的
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
        String username = msg.getUsername();
        String password = msg.getPassword();
        // 判别登录状况
        boolean login = UserServiceFactory.getUserService().login(username, password);
        LoginResponseMessage message;
        if (login) {
            SessionFactory.getSession().bind(ctx.channel(), username);
            message = new LoginResponseMessage(true, "登录成功");
        } else {
            message = new LoginResponseMessage(false, "用户名或暗码不正确");
        }
        ctx.writeAndFlush(message);
    }
}
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
    // 当衔接断开触发 inactive 事情
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.debug("{} 正常断开", ctx.channel());
    }
    // 用户反常断开
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        SessionFactory.getSession().unbind(ctx.channel());
        log.debug("{} 反常断开", ctx.channel());
    }
}

backlog 衔接

Netty 网络编程
服务器在三次衔接树立之后,将半衔接行列转移到全衔接行列

由于服务器 accept 衔接量特别大,所以将树立成功的信息放到全衔接行列。以此处理。

accept 是发生在衔接之前的。

  • 在 linux 2.2 之前,backlog 巨细包含了两个行列的巨细,在 2.2 之后,别离用下面两个参数来控制

  • sync queue – 半衔接行列

    • 巨细经过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的状况下,逻辑上没有最大值约束,这个设置便被忽略
  • accept queue – 全衔接行列

    • 其巨细经过 /proc/sys/net/core/somaxconn 指定,在运用 listen 函数时,内核会依据传入的 backlog 参数与体系参数,取二者的较小值,假定linux装备 100,程序装备 200 ,取100为准
    • 假如 accpet queue 行列满了,server 将发送一个拒绝衔接的错误信息到 client
  • ChannelOption.SO_BACKLOG, 2,设置全衔接行列容量巨细
new ServerBootstrap()
.group(new NioEventLoopGroup())
// 装备让全链接行列最大存放 2个
.option(ChannelOption.SO_BACKLOG, 2)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new LoggingHandler());
    }
}).bind(8424);

断点在 nioeventloop 下面代码中,即可调试

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    unsafe.read();
}

超越行列巨细,将会抛出反常

体系参数

  • ulimit -n:一个进程能够翻开的文件操作数
  • TCP_NODELAY:设置 true 不需求延迟,不需求算法,主张 true
  • 归于 SocketChannal 参数
  • 归于操作体系参数
  • SO_SNDBUF & SO_RCVBUF
  • SO_SNDBUF 归于 SocketChannal 参数
  • SO_RCVBUF 既可用于 SocketChannal 参数,也能够用于 ServerSocketChannal 参数(主张设置到 ServerSocketChannal 上)

内存缓冲区

io 只能是直接内存,ctx获取的 bytebuf

三种线程模型

单线程模型

异步非堵塞io,一切的操作都是由一个nio线程处理,合适特别小的程序

一个单线程负荷过度,客户端向服务端超时,服务端的超时处理睬处理,终究卡死就会宕机

多线程模型

分红reactor 单线程和reactor 多线程 单线程担任承受新来的恳求 处理逻辑交给线程池处理

由一组nio线程组成

主从线程模型

由一组线程池承受恳求,一组线程池处理io