Netty 入门
概述
什么是 Netty?
- 异步:用的多线程,不是异步 IO
- 依据事情驱动:表明用的 Selector
Netty 是一个异步的、依据事情驱动的网络应用结构,用于快速开发可保护、高功能的网络服务器和客户端
Netty 的地位
Netty 在 Java 网络应用结构中的地位就比方:Spring 结构在 JavaEE 开发中的地位
以下的结构都运用了 Netty,由于它们有网络通讯需求!
- Cassandra – nosql 数据库
- Spark – 大数据分布式核算结构
- Hadoop – 大数据分布式存储结构
- RocketMQ – ali 开源的音讯行列
- ElasticSearch – 搜索引擎
- gRPC – rpc 结构
- Dubbo – rpc 结构
- Spring 5.x – flux api 完全扔掉了 tomcat ,运用 netty 作为服务器端
- Zookeeper – 分布式和谐结构
Netty 的优势
Netty vs NIO,工作量大,bug 多
- 需求自己构建协议
- 处理 TCP 传输问题,如粘包、半包
- epoll 空轮询导致 CPU 100%
- 对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer
Netty vs 其它网络应用结构
- Mina 由 apache 保护,将来 3.x 版别或许会有较大重构,损坏 API 向下兼容性,Netty 的开发迭代更敏捷,API 更简洁、文档更优秀
创立服务器 / 客户端
- 首要创立启动器
- 创立NioEventLoopGroup依据NIO服务端完成
- childHandler表明增加的处理器都是给SocketChannel用的
- ChannelInitializer 仅仅履行一次
- 客户端 SocketChannel 树立衔接后,履行 initChannel 以便增加更多的处理器
- new ServerBootstrap().bind 绑定监听端口
服务端
// 1. 启动器,担任拼装 netty组件,启动服务器
new ServerBootstrap()
// BossEventLoop,WorkerEventLoop(selector, thread)
// 创立一个事情循环线程组,适用于 NIO 完成,能够简略理解为 `线程池 + Selector`
.group(new NioEventLoopGroup())
// 指定要运用的服务器 Channel 完成,适用于 NIO 完成
.channel(NioServerSocketChannel.class) // OIO BIO
// BOSS 担任处理衔接Work(child)担任处理读写,决议了work(child)能履行哪些(handler)
// 装备服务器 Channel 处理器,即 ChannelPipeline 中的一组 ChannelHandler
.childHandler(
// 5. channel代表客户端进行数据读写的通道Initializer初始化,担任增加到其他 handler
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 增加详细的 handler
// 将 bytebuf 转换为字符串
nioSocketChannel.pipeline().addLast(new StringDecoder());
// 自界说handler
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override// 读事情
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 打印转好的字符串
System.out.println(msg);
}
});
}
})
// 监听端口
.bind(8080);
客户端
// 启动类
new Bootstrap()
// 增加 EventLoop
.group(new NioEventLoopGroup())
// 挑选客户端 channel 完成
.channel(NioSocketChannel.class)
// 增加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override // 在树立衔接后被调用
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 把字符串编码成字节
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
// 衔接服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel()
// 向服务器发送数据
.writeAndFlush("hello, world");
流程剖析
- 先创立启动器类
- 增加组件,eventloop(内部就有线程和挑选器不断循环,查找事情)
- 挑选NIOServerSocket 完成
- 增加处理器,只要衔接事情发生之后才会履行 initChannel初始化办法
- 绑定监听端口(服务端就到这儿)
- (客户端)创立启动器和eventloop
- 客户端挑选socket事情
- 增加处理器,也是等衔接树立才会履行初始化办法
- 终究衔接服务器
- 服务器监听到accept事情之后
- 终究找处理器处理这个事情
- (咱们看不明理线),衔接树立后调用初始化办法
- 客户端 sync 只要衔接之后才会持续履行
- channel() 这个是衔接目标
- 终究就能够读写
- 发数据就会走到处理器内部
- 进行转为字节数组进行 bytebuf进行发送
- 服务端的eventloop就会监听到读事情
- 走到了服务器的处理器,进行处理
channel数据传输通道,可读出来可写进。和nio概念一致
handel中的message活动数据,handel 是一个工序,对原始数据进行一道道工序进行处理
pipeline便是流水线,一道工序,进行增加工序。
handel 分为 inbound和outbound
入站和出站 读入就走入站,写出就走出站
eventloop便是线程,相当于工人 一旦某一个工人担任一个事情,那就会担任究竟。一个工人是能够办理多个事情的
eventloop既能够履行io操作,也能够进行一般使命,每个工人都有自己的使命行列,顺次处理。底层用的单线程的线程池。
使命能够是守时使命也能够是一般使命
组件
EventLoop
EventLoop是事情循环目标
实质是一个单线程履行器,一起保护了Selector,里边有run办法处理 channel 上源源不断的 io 事情。
承继关系:
- 一条线是承继自 java.util.concurrent.ScheduledExecutorService
- 因而包含了线程池的一切办法
- 另一条是承继自 netty 自己的 OrderedEventExecutor
- 供给了 Boolean inEventLoop(Thread thread)办法判别一个线程是否归于此EventLoop
- 供给了 parent办法来看看自己归于哪个 EventLoopGroup
// eventloop
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
EventLoopGroup parent();
}
// 承继线程池接口
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {
EventLoopGroup 事情循环组,咱们一般运用这个
EventLoopGroup 是一组 EventLoop,channel 一般会调用 EventLoopGroup 的 register 办法俩绑定其间一个 EventLoop,后续这个 channel 上的 io 事情都由同一个 EventLoop 来处理(确保了 io 事情处理时的线程安全)
- 承继自 netty 自己的 EventExecutorGroup
- 完成了 Iterable 接口供给遍历 EventLoop 的能力
- 另有 next 办法获取调集中下一个 EventLoop
咱们平常运用的是下面两个循环组完成类
// 能够完成一般使命,io事情,守时使命
EventLoopGroup nioGroup = new NioEventLoopGroup();
// 一般使命和守时使命
EventLoopGroup defaultGroup = new DefaultEventLoopGroup();
那么咱们空参创立目标,线程数是多少呢
下面这段代码,是用来初始化 EventLoopGroup目标的。
假如没有指定线程数,会选用默许的,也便是其时体系的CPU核心数 * 2
/**
* 结构函数,创立MultithreadEventLoopGroup目标
* @param nThreads 表明EventLoopGroup中EventLoop的数量,假如为0则运用默许的线程数
* @param executor 用于履行使命的Executor目标
* @param args 可选参数列表
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
// 调用父类的结构函数,初始化EventLoopGroup目标
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
获取其时CPU核数–我的是 16核
NettyRuntime.availableProcessors();
指定线程数量
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
获取下一个线程
// 创立两个线程的事情循环组
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
// 获取下一个线程
System.out.println(nioGroup.next()); // 第一次打印第一个
System.out.println(nioGroup.next()); // 打印第二个
System.out.println(nioGroup.next()); // 打印第一个,由于总共就两个
System.out.println(nioGroup.next()); // 打印第二个
// io.netty.channel.nio.NioEventLoop@737996a0
// io.netty.channel.nio.NioEventLoop@61dc03ce
// io.netty.channel.nio.NioEventLoop@737996a0
// io.netty.channel.nio.NioEventLoop@61dc03ce
履行一般使命
- 这儿的execute和submit都是一样的效果
// 创立两个线程的事情循环组
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
// 履行一般使命
nioGroup.next().execute(() -> {
log.debug("ok");
});
log.debug("main");
守时使命
// 创立两个线程的事情循环组
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
// 履行守时使命 以必定的频率履行
// 参数1:履行的办法
// 参数2:表明第一次启动多久开端
// 参数3:表明距离多久再次触发
// 参数4:单位
nioGroup.next().scheduleAtFixedRate(() -> {
log.debug("ok");
}, 3, 5, TimeUnit.SECONDS);
log.debug("main");
IO 事情
下面是服务端,客户端同上面。
一个客户端绑定一个服务的 EventLoop线程。
下面再编写客户端的时候,假如要打上断点完成堵塞,需求将idea的断点卡成单线程的
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
// 假如咱们没有用 nio的处理字节办法,这个msg是 bytebuf类型
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
// 这个实践开发是要指定字符类型的,不要默许
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
咱们来分工细化一下
- 咱们能够把 eventloop 划分为 boss 和 work
- 将accept 和 read 分隔处理
那 第一个事情循环组 线程是否能够设置为 1 呢
由于服务器这一有个,它也只会和里边一个 eventloop 进行绑定
new ServerBootstrap()
// 将 参数1:只处理accept 参数2:处理read
.group(new NioEventLoopGroup(), new NioEventLoopGroup())
持续分工细化–假如其间一个Nio线程履行中
在读操作时履行太久,会影响其他 channel读操作
最好不要让它占用 work nio线程,所以咱们持续细分
- 创立独立的事情循环目标,由于不需求进行io所所以一般的
- 将下个处理器绑定上。
// 细分2:创立独立的 EventLoopGroup
DefaultEventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
.addLast("handler1", new ChannelInboundHandlerAdapter() {
@Override
// 假如咱们没有用 nio的处理字节办法,这个msg是 bytebuf类型
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
// 这个实践开发是要指定字符类型的,不要默许
log.debug(buf.toString(Charset.defaultCharset()));
// 让音讯传递给下一个handler
ctx.fireChannelRead(msg);
}
})
// 指定事情循环组和名称
.addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
@Override
// 假如咱们没有用 nio的处理字节办法,这个msg是 bytebuf类型
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
// 这个实践开发是要指定字符类型的,不要默许
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
那么是怎样完成切换事情循环组,也便是换人处理的呢?
假如两个 handler 绑定的是同一个线程,那么直接调用,不然调用的代码封装为一个使命目标。由一下一个 handler的线程调用
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一个 handler 的事情循环是否与其时的事情循环是同一个线程
/**
* EventExecutor 是事情循环组
*/
EventExecutor executor = next.executor();
// 其时 handler 中的线程是否和 eventloop 是同一个线程
// 是,直接调用
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
}
// 不是,将要履行的代码作为使命提交给下一个事情循环处理(换人)
else {
// 运用runnable 履行一个事情
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
Channel & 衔接 & 封闭问题 & 异步
channel 的首要效果
- close() 能够用来封闭 channel
- closeFuture() 用来处理 channel 的封闭
- sync 办法效果是同步等候 channel 封闭
- 而 addListener 办法是异步等候 channel 封闭
- pipeline() 办法增加处理器
- write() 办法将数据写入,仅仅写进入缓冲区
- writeAndFlush() 办法将数据写入并刷出
connect 衔接问题
connect:是异步堵塞,main办法主张,履行是Nio的线程
所以假如在发送数据前,必定要确保现已衔接好了,不然数据是发送不出去的
ChannelFuture:带有Future或者 promise都是和异步配套运用,用来处理成果
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
// connect 是异步堵塞,main办法主张调用,真正履行衔接的是Nio线程
.connect(new InetSocketAddress("localhost", 8080));
// channelFuture.sync();
// 会无堵塞向下履行获取channel,终究发送数据
Channel channel = channelFuture.channel();
// log.debug("{}", channel); // 假如没有获取到打印的便是没有衔接的 channel,所以需求履行 sync
channel.writeAndFlush("123");
处理衔接问题,确保发送数据前必定是正确衔接的
- sync
// 堵塞其时线程,直到衔接树立完毕
channelFuture.sync();
Channel channel = channelFuture.channel();
log.debug("{}", channel);
channel.writeAndFlush("123");
- 将履行发送的代码,交给nio线程。
- addListener(回调目标)办法异步处理成果
channelFuture.addListener(new ChannelFutureListener() {
@Override
// 在nio 线程衔接树立好之后,会调用operationComplete
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Channel channel = channelFuture.channel();
log.debug("{}", channel);
channel.writeAndFlush("123");
}
});
封闭问题
咱们看下面这段代码
咱们能够输入内容,发送给服务端,按 q 封闭传输通道
但是咱们封闭的时候,想做一些操作,怎样整 直接在close下面写吗,那是不对的,由于close办法是异步的
channelFuture.sync();
Channel channel = channelFuture.channel();
new Thread(new Runnable() {
@Override
public void run() {
Scanner sc = new Scanner(System.in);
while (true) {
String line = sc.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}
}, "input").start();
处理异步问题
增加日志调试
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
- 同步处理—由主线程处理封闭操作
new Thread(() -> {
Scanner sc = new Scanner(System.in);
while (true) {
String line = sc.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
// 增加封闭处理器
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();// 设置同步
log.debug("处理封闭之后....");
- 异步处理–由 nio 线程处理
new Thread(() -> {
Scanner sc = new Scanner(System.in);
while (true) {
String line = sc.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
ChannelFuture closeFuture = channel.closeFuture();
// 设置封闭回调函数
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
log.debug("处理封闭之后....");
}
});
高雅的封闭
- 它会等候一切正在处理的使命完成后,再进行封闭。
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
log.debug("处理封闭之后的操作");
group.shutdownGracefully(); // 高雅的封闭
}
});
异步
异步:便是一个线程主张衔接一个线程去树立衔接
考虑下面的场景,4 个医师给人治病,每个患者花费 20 分钟,而且医师治病的过程中是以患者为单位的,一个患者看完了,才干看下一个患者。假定患者源源不断地来,能够核算一下 4 个医师一天工作 8 小时,处理的患者总数是:4 * 8 * 3 = 96
个患者
那咱们就能够细分一下,四个医师别离处理四个事情
只要一开端,医师 2、3、4 别离要等候 5、10、15 分钟才干履行工作,但只要后续患者源源不断地来,他们就能够满负荷工作,而且处理患者的能力提高到了 4 * 8 * 12
功率几乎是原来的四倍
要点:
- 单线程无法异步提高功率,有必要配合多线程、多核CPU才干发挥异步的优势
- 异步并没有缩短呼应时刻,反而有所增加
- 合理进行使命拆分,也是运用异步的要害
Future & Promise
处理异步时:经常用到的两个接口, futer 核 promise
首要要阐明 netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 承继自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
- jdk Future:只能同步等候使命完毕(或成功、或失利)才干得到成果
- netty Future:能够同步等候使命完毕得到成果,也能够异步办法得到成果,但都是要等使命完毕
- netty Promise;不只要 netty Future 的功用,而且脱离了使命独立存在,只作为两个线程间传递成果的容器
功用/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消使命 | – | – |
isCanceled | 使命是否取消 | – | – |
isDone | 使命是否完成,不能区别成功失利 | – | – |
get | 获取使命成果,堵塞等候 | – | – |
getNow | – | 获取使命成果,非堵塞,还未发生成果时回来 null | – |
await | – | 等候使命完毕,假如使命失利,不会抛反常,而是经过 isSuccess 判别 | – |
sync | – | 等候使命完毕,假如使命失利,抛出反常 | – |
isSuccess | – | 判别使命是否成功 | – |
cause | – | 获取失利信息,非堵塞,假如没有失利,回来null | – |
addLinstener | – | 增加回调,异步接纳成果 | – |
setSuccess | – | – | 设置成功成果 |
setFailure | – | – | 设置失利成果 |
jdk future
// 线程池
ExecutorService pool = Executors.newFixedThreadPool(2);
// 提交使命
Future<Integer> future = pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("履行核算"); // poll 线程
Thread.sleep(1000);
return 50;
}
});
// 主线程经过 future 获取成果
log.debug("等候成果"); // 主线程
log.debug("成果是 {}", future.get()); // 主线程
nio future
// 创立事情循环组
NioEventLoopGroup eventGroup = new NioEventLoopGroup();
// 获取履行事情
EventLoop eventLoop = eventGroup.next();
// 履行办法
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("履行核算"); // nio 线程
Thread.sleep(1000);
return 50;
}
});
同步获取线程回来成果
// 主线程经过 future 获取成果
log.debug("等候成果"); // 主线程
log.debug("成果是 {}", future.get()); // 主线程
异步
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.debug("等候成果"); // nio 线程
log.debug("成果是 {}", future.getNow()); // nio 线程
}
});
promise 获取成果
// 1. 准备 EventLoop 目标
EventLoop loop = new NioEventLoopGroup().next();
// 2. 主动创立 promise 成果容器
DefaultPromise<Integer> promise = new DefaultPromise<>(loop);
new Thread(new Runnable() {
@Override
public void run() {
// 3. 恣意一个线程履行核算,核算完毕运用 promise 填充成果
log.debug("开端核算...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
// 填写失利的数据
promise.setFailure(e);
}
// 填写成功的数据
promise.setSuccess(90);
}
}).start();
log.debug("等候成果...");
log.debug("成果是:{}", promise.get());
handler & pipeline
ChannelHandle 用来处理 Channel 上的各种事情,分为入站和出站两种。
当一切的ChannelHandle 连在一起便是 pipeline
- 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,首要用来读取客户端数据,写回成果
- 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,首要对写回成果进行加工
入站是依照次序履行的也便是, 会打印1,2,3
出站是依照逆序的办法,会打印6,5,4
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 增参加站处理器
nioSocketChannel.pipeline().addLast("h1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
// 运用通道写入数据并改写。ctx通道信息,获取上下文的分配器,创立bufer输出数据
nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("server..".getBytes()));
}
});
// 同上
log.debug("2");
// 同上
log.debug("3");
//
// 增加出站处理器
nioSocketChannel.pipeline().addLast("h4", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
});
// 同上
log.debug("5");
//
// 同上
log.debug("6");
//
}
})
.bind(8080);
出站处理器
当咱们用 channel 的办法发送数据,会从尾部开端顺次履行出站过滤器
当用办法内部的 ctx channel的办法发送,会从其时方位开端往前找
调试出站入站,handler
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("4");
super.write(ctx, msg, promise);
}
};
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
// 模仿入站操作
// channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
// 模仿出站操作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
}
bytebuf
创立了一个默许的 ByteBuf(池化依据直接内存的 ByteBuf),初始容量是 10
假如不指定,默许256,扩容 * 2
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
public static void main(String[] args) {
// 创立默许空间的 bytebuf,空间容量默许256,超越空间容量,会主动扩容
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// read index:0 write index:0 capacity:256
log(buffer);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 300; i++) {
sb.append("a");
}
buffer.writeBytes(sb.toString().getBytes());
log(buffer);
// read index:0 write index:300 capacity:512
}
// netty 调试 bytebuf 办法
private static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
直接内存 & 堆内存
默许是创立的直接内存
创立堆内存
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10);
创立直接内存
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10);
直接内存创立和毁掉的价值贵重,但读写功率高(少一次内存仿制),合适配合池化功用一重用
直接内存对 GC 压力小,由于这部分内存不受 JVM 废物收回的办理,但也要留意及时主动开释
池化 vs 非池化
池化的最大意义在于能够重用 ByteBuf,优点有
- 没有池化,则每次都得创立新的 ByteBuf 实例,这个操刁难直接内存价值贵重,就算是堆内存,也会增加 GC 压力
- 有了池化,则能够重用池中 ByteBuf 实例,而且选用了与 jemalloc 类似的内存分配算法提高分配功率
- 高并发时,池化功用更节省内存,削减内存溢出的或许
池化功用是否敞开,能够经过下面的体系环境变量来设置
下面的代码是在 vm虚拟机参数里设置的
-Dio.netty.allocator.type={unpooled|pooled}
NIO 的缓冲区池化能够有效地削减创立和毁掉缓冲区目标的成本,提高程序的功能和吞吐量。
- 4.1 今后,非 Android 渠道默许启用池化完成,Android 渠道启用非池化完成
- 4.1 之前,池化功用还不老练,默许是非池化完成
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer();
System.out.println(buffer.getClass());
// class io.netty.buffer.PooledUnsafeDirectByteBuf
/*
* PooledUnsafeDirectByteBuf
* Unpooled:非池化
* Pooled:池化
* Direct:直接内存
* Heap:堆内存
*/
}
Bytebuf的组成部分
- 抛弃字节:表明现已读过了
- 可读字节:未读的数据
- 可写字节:未写入的数据
- 可扩容:最大的容量,悉数的
butebuf 是读写两个指针,不需求切换读写形式,最开端读写指针都在 0 方位
写读办法
办法签名 | 意义 | 补白 |
---|---|---|
writeBoolean(boolean value) | 写入 boolean 值 | 用一字节 01|00 代表 true|false |
writeByte(int value) | 写入 byte 值 | |
writeShort(int value) | 写入 short 值 | |
writeInt(int value) | 写入 int 值 | Big Endian,即 0x250,写入后 00 00 02 50 |
writeIntLE(int value) | 写入 int 值 | Little Endian,即 0x250,写入后 50 02 00 00 |
writeLong(long value) | 写入 long 值 | |
writeChar(int value) | 写入 char 值 | |
writeFloat(float value) | 写入 float 值 | |
writeDouble(double value) | 写入 double 值 | |
writeBytes(ByteBuf src) | 写入 netty 的 ByteBuf | |
writeBytes(byte[] src) | 写入 byte[] | |
writeBytes(ByteBuffer src) | 写入 nio 的 ByteBuffer | |
int writeCharSequence(CharSequence sequence, Charset charset) | 写入字符串 |
留意
- 这些办法的未指明回来值的,其回来值都是 ByteBuf,意味着能够链式调用
- 网络传输,默许习惯是 Big Endian
扩容规矩是
- 写入后数据巨细未超越 512,则挑选下一个 16 的整数倍,例如写入后巨细为 12 ,则扩容后 capacity 是 16
- 写入后数据巨细超越 512,则挑选下一个 2^n,例如写入后巨细为 513,则扩容后 capacity 是 2^10=1024(2^9=512 现已不够了)
- 扩容不能超越 max capacity 会报错
读入
// 只读入一个字节
buffer.readByte()
假如需求重复读取 int 整数 5,怎样办?
能够在 read 前先做个符号 mark
buffer.markReaderIndex();
System.out.println(buffer.readInt());//读入
log(buffer);
这时要重复读取的话,重置到符号方位 reset
buffer.resetReaderIndex();
retain & release(废物收回)
由于 Netty 中有堆外内存的 ByteBuf 完成,堆外内存最好是手动来开释,而不是等 GC 废物收回。
- UnpooledHeapByteBuf 运用的是 JVM 内存,只需等 GC 收回内存即可
- UnpooledDirectByteBuf 运用的便是直接内存了,需求特别的办法来收回内存
- PooledByteBuf 和它的子类运用了池化机制,需求更杂乱的规矩来收回内存
Netty 这儿选用了引证计数法来控制收回内存,每个 ByteBuf 都完成了 ReferenceCounted 接口
- 每个 ByteBuf 目标的初始计数为 1
- 调用 release 办法计数减 1,假如计数为 0,ByteBuf 内存被收回
- 调用 retain 办法计数加 1,表明调用者没用完之前,其它 handler 即使调用了 release 也不会造成收回
- 当计数为 0 时,底层内存会被收回,这时即使 ByteBuf 目标还在,其各个办法均无法正常运用
留意:由于 pipeline 的存在,一般需求将 ByteBuf 传递给下一个 ChannelHandler,假如在 try finally里 relase,那么就失去了传递性,有必要等 bytebuf 完成了它的使命,才干够。
基本规矩是,谁是终究运用者,谁担任 release
入站 bytebuf 处理准则 | |
---|---|
对原始 ByteBuf 不做处理 | 无须 release |
将原始 ByteBuf 转换为其它类型的 Java 目标 | ByteBuf 就没用了,有必要 release |
不调用 ctx.fireChannelRead(msg) 向后传递 | 有必要 release |
ByteBuf 没有成功传递到下一个 | 有必要 release |
出站 ByteBuf 处理准则:由 HeadContext flush 后 release
反常处理准则
- 有时候不清楚 ByteBuf 被引证了多少次,但又有必要完全开释,能够循环调用 release 直到回来 true
看看开释 bytebuf 的源码
尾部开释
在 io.netty.channel 的类中的内部类 TailContext 下面办法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
DefaultChannelPipeline.this.onUnhandledInboundMessage(ctx, msg);
}
持续跟进
protected void onUnhandledInboundMessage(ChannelHandlerContext ctx, Object msg) {
this.onUnhandledInboundMessage(msg);
if (logger.isDebugEnabled()) {
logger.debug("Discarded message pipeline : {}. Channel : {}.", ctx.pipeline().names(), ctx.channel());
}
}
这儿就看到了 ReferenceCountUtil.release(msg); 用来开释
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
用来判别,假如是这个类型的,代表能够开释,不然回来false
public static boolean release(Object msg) {
return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}
看看头部开释的源码
io.netty.channel下的HeadContext内部类的write办法
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
this.unsafe.write(msg, promise);
}
咱们跟进 AbstractChannel完成类的 AbstractUnsafe内部类的 write办法
假如是通道是null 表明用计数办法封闭,并处理反常
this.assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
try {
ReferenceCountUtil.release(msg);
} finally {
this.safeSetFailure(promise, this.newClosedChannelException(AbstractChannel.this.initialCloseCause, "write(Object, ChannelPromise)"));
}
} else {
int size;
try {
msg = AbstractChannel.this.filterOutboundMessage(msg);
size = AbstractChannel.this.pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable var15) {
try {
ReferenceCountUtil.release(msg);
} finally {
this.safeSetFailure(promise, var15);
}
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
slice 零仿制表现之一
效果:削减内存仿制
【零仿制】的表现之一,对原始 ByteBuf 进行切片成多个 ByteBuf,切片后的 ByteBuf 并没有发生内存仿制,仍是运用原始 ByteBuf 的内存,切片后的 ByteBuf 保护独立的 read,write 指针
切片之后,会对容量进行约束,切多少便是多少。不能再次增加
public class Server {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
// 赋值是个数据
buffer.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});
// 打印
log(buffer);
// 进行切片,这儿是没有发生数据仿制的
// 从0 切到 5个,a-e
ByteBuf f1 = buffer.slice(0, 5);
// 从5 切到 5个,f-j
ByteBuf f2 = buffer.slice(5, 5);
log(f1);
log(f2);
System.out.println("===============");
// 修该第0位也便是 a,发现原bytebuf也修改了
f1.setByte(0, 'b');
log(f1);
log(buffer);
}
// netty 调试 bytebuf 办法
private static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
}
原 bytebuf 开释之后,切片的bytebuf是不能用了
处理上面的问题
咱们用完那个,主动开释它自己的 bytebuf
东西类仍是上面的
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10);
buffer.writeBytes(new byte[]{'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'});
log(buffer);
ByteBuf f1 = buffer.slice(0, 5);
// 让引证数据加1
f1.retain();
ByteBuf f2 = buffer.slice(5, 5);
log(f1);
log(f2);
System.out.println("开释原 bytebuf");
// 让原bytebuf 减1,开释不了真正的内存,没有减到0,不会开释内存
buffer.release();
log(f1);
}
duplicate
【零仿制】的表现之一,就比方截取了原始 ByteBuf 一切内容,而且没有 max capacity 的约束,也是与原始 ByteBuf 运用同一块底层内存,仅仅读写指针是独立的
copy
会将底层内存数据进行深仿制,因而无论读写,都与原始 ByteBuf 无关
零仿制,合并小bytebuf
从头核算了 指针方位,也是比较麻烦, 仍是要留意 relase问题哦
public static void main(String[] args) throws Exception {
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(10);
buf1.writeBytes(new byte[]{6, 7, 8, 9, 10});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(10);
buf2.writeBytes(new byte[]{1, 2, 3, 4, 5});
// 合并上面两个 buf,不发生仿制
// 默许不会主动调整写入的方位,有必要带上true
CompositeByteBuf bufs = ByteBufAllocator.DEFAULT.compositeBuffer();
bufs.addComponents(true, buf1, buf2);
log(bufs);
}
Unpooled
Unpooled 是一个东西类,类如其名,供给了非池化的 ByteBuf 创立、组合、仿制等操作
这儿仅介绍其跟【零仿制】相关的 wrappedBuffer 办法,能够用来包装 ByteBuf
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
// 当包装 ByteBuf 个数超越一个时, 底层运用了 CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));
ByteBuf 优势
- 池化 – 能够重用池中 ByteBuf 实例,更节省内存,削减内存溢出的或许
- 读写指针别离,不需求像 ByteBuffer 一样切换读写形式
- 能够主动扩容
- 支撑链式调用,运用更流通
- 很多当地表现零仿制,例如 slice、duplicate、CompositeByteBuf
双向通讯
服务端
public static void main(String[] args) throws Exception {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg;
System.out.println(buffer.toString(Charset.defaultCharset()));
// 主张运用 ctx.alloc() 创立 ByteBuf
ByteBuf response = ctx.alloc().buffer();
response.writeBytes(buffer);
ctx.writeAndFlush(response);
// 考虑:需求开释 buffer 吗
// 从 Socket 读取到字节数据之后,将数据转换成 ByteBuf 目标。
// 在 Netty 的特别完成下,当该音讯被服务端处理完成
// 即当音讯被写回客户端之后,Netty 会主动开释其占用的内存。
// 考虑:需求开释 response 吗
// response 是咱们手动创立的 ByteBuf 目标,而不是从音讯中经过解码发生的,因而咱们需求手动开释该目标
// 又由于咱们向客户端写入数据时运用了 ctx.writeAndFlush(response)
// 该办法会主动在发送完成后将 response 进行开释,因而咱们不需求手动开释该目标。
// 但假如是经过 write() 办法写入数据,那么咱们需求在写入完成后手动调用
// response.release() 办法对 response 目标进行开释。
}
});
}
}).bind(8080);
}
客户端
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
Channel channel = new Bootstrap().group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder());
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buffer = (ByteBuf) msg;
System.out.println(buffer.toString(Charset.defaultCharset()));
} finally {
// 开释
ReferenceCountUtil.release(msg);
}
// 考虑:需求开释 buffer 吗
// 接纳到的 butebuf需求显示开释,防止内存泄露
// 假如不,则会保留到程序退出
}
});
}
}).connect("localhost", 8080).sync().channel();
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}).start();
}
读和写误解
我最初在认识上有这样的误区,以为只要在 netty,nio 这样的多路复用 IO 模型时,读写才不会相互堵塞,才干够完成高效的双向通讯,但实践上,Java Socket 是全双工的:在恣意时刻,线路上存在A 到 B
和 B 到 A
的双向信号传输。即使是堵塞 IO,读和写是能够一起进行的,只要别离选用读线程和写线程即可,读不会堵塞写、写也不会堵塞读
服务端
public class TestServer {
public static void main(String[] args) throws IOException {
ServerSocket ss = new ServerSocket(8888);
Socket s = ss.accept();
new Thread(() -> {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
while (true) {
System.out.println(reader.readLine());
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
// 例如在这个方位参加 thread 级别断点,能够发现即使不写入数据,也不妨碍前面线程读取客户端数据
for (int i = 0; i < 100; i++) {
writer.write(String.valueOf(i));
writer.newLine();
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
客户端
public class TestClient {
public static void main(String[] args) throws IOException {
Socket s = new Socket("localhost", 8888);
new Thread(() -> {
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(s.getInputStream()));
while (true) {
System.out.println(reader.readLine());
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(s.getOutputStream()));
for (int i = 0; i < 100; i++) {
writer.write(String.valueOf(i));
writer.newLine();
writer.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
netty 进阶
在Client中:在入站处理器中,channelActive办法会在客户端衔接成功后触发。
- option(ChannelOption.SO_RCVBUF, 10):设置缓冲区巨细 = 10
ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class)
// 设置缓冲区巨细,就不会一次悉数接纳了,但是发生了半包问题
.option(ChannelOption.SO_RCVBUF, 10);
看一组黏包现象,咱们下面当客户端衔接后会发送数据到服务端,但是咱们期望循环一次发一次,服务端却一次悉数接纳了
READ: 160B,一次悉数接纳了
服务端
@Slf4j
public class Server {
public void start() {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
nioSocketChannel.pipeline().addLast(new LoggingHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8254).sync();
// 封闭时,同步封闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.debug("server error...", e);
} finally {
boss.shutdownGracefully(); // 安全封闭
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
new Server().start();
}
}
客户端
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(worker);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// channelActive:在衔接树立好之后触发
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 循环写入
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
}
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 8254).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error...", e);
} finally {
worker.shutdownGracefully();
}
}
半包问题,便是设置了缓冲区巨细,然后客户端发送的数据是
留意:TCP 存在黏包半包,UDP 是不存在的
TCP 应答机制(滑动窗口)
TCP 可靠:发信息到对端,对端有必要应答,假如长时刻没有应答,会重发信息
,影响吞吐量
滑动窗口处理上面问题
粘包:发送方发送一个完好报文,由于接纳方处理不及时,且窗口比较大,就构成粘包
半包:接纳方只剩下了一部分,发送方的数据不能悉数放下,只能先放一部分,等候窗口移动持续放,构成半包
处理办法:引入窗口
窗口巨细决议了无需等候应答而能够持续发送的数据最大值
窗口实践就起到一个缓冲区的效果,一起也能起到流量控制的效果
- 图中深色的部分即要发送的数据,高亮的部分即窗口
- 窗口内的数据才答应被发送,当应答未到达前,窗口有必要中止滑动
- 假如 1001~2000 这个段的数据 ack 回来了,窗口就能够向前滑动
- 接纳方也会保护一个窗口,只要落在窗口内的数据才干答应接纳
短衔接
处理粘包:客户端每发送一次,就封闭衔接,也叫做短衔接
下面是客户端,服务端仍是上面的
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
send();
}
}
private static void send() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(worker);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// channelActive:在衔接树立好之后触发
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 循环写入
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
ctx.close();
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 8254).sync();
channelFuture.channel().closeFuture().sync();
} catch (
InterruptedException e) {
log.error("client error...", e);
} finally {
worker.shutdownGracefully();
}
}
但是会发生半包,假如服务端设置了 衔接的 bytebuf 巨细,发送方发的数据过大,每次都接纳不全
这儿说下,修改容量的办法–ServerBootstrap的类
// 是针对每个衔接的装备,完成默许,会1024的容量
// 现在改成最小值,初始值,最大值。最低是十六,内部会取16的整数倍
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16));
固定长度
客户端和服务端约定好。服务端每次解码10个字节,多的就等下次够十个再解码。
留意次序:处理器,必定要先解码再打印日志信息。解码处理器在前面
服务端先参加。
// 增加定长解码器,长度和客户端约定好
nioSocketChannel.pipeline().addLast(new FixedLengthFrameDecoder(10));
客户端—每次都是十个字节一个数据,不足用_补齐
public static void main(String[] args) {
send();
}
/**
* 填充数组为指定长度,并将数组中的元素替换为指定字符对应的字节值。
*
* @param c 要替换的字符
* @param len 数组长度
* @return 填充后的字节数组
*/
public static byte[] fill10Bytes(char c, int len) {
byte[] bytes = new byte[10];
Arrays.fill(bytes, (byte) 95);
for (int i = 0; i < len; ++i) {
bytes[i] = (byte) c;
}
System.out.println(new String(bytes));
return bytes;
}
private static void send() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(worker);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// channelActive:在衔接树立好之后触发
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 循环写入
ByteBuf buffer = ctx.alloc().buffer();
char c = '0';
Random r = new Random();
for (int i = 0; i < 10; i++) {
buffer.writeBytes(fill10Bytes(c++, r.nextInt(10) + 1));
}
ctx.writeAndFlush(buffer);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 8254).sync();
channelFuture.channel().closeFuture().sync();
} catch (
InterruptedException e) {
log.error("client error...", e);
} finally {
worker.shutdownGracefully();
}
}
服务端
public void start() {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16));
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
// 增加定长解码器,长度和客户端约定好
nioSocketChannel.pipeline().addLast(new FixedLengthFrameDecoder(10));
nioSocketChannel.pipeline().addLast(new LoggingHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8254).sync();
// 封闭时,同步封闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.debug("server error...", e);
} finally {
boss.shutdownGracefully(); // 安全封闭
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
new Server().start();
}
缺陷:会浪费字节,长度需求决议计划好
行解码器
换行符解码器,不能指定,默许 \n \r
// 还有一种DelimiterBasedFrameDecoder
// 参数1是最大长度,参数2:是指定分隔符
addLast(new LoggingHandler(LogLevel.DEBUG));
客户端
public static void main(String[] args) {
send();
}
/**
* 运用指定的字符生成一个指定长度的字符串,并在字符串结尾增加换行符。
*
* @param c 要生成的字符
* @param len 字符串长度
* @return 生成的字符串
*/
public static StringBuilder makeString(char c, int len) {
StringBuilder sb = new StringBuilder(len + 2);
for (int i = 0; i < len; ++i) {
sb.append(c);
}
sb.append("\n");
return sb;
}
private static void send() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap().channel(NioSocketChannel.class).group(worker);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// channelActive:在衔接树立好之后触发
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 循环写入
ByteBuf buffer = ctx.alloc().buffer();
char c = '0';
Random r = new Random();
for (int i = 0; i < 10; i++) {
String s = makeString(c++, r.nextInt(256) + 1).toString();
buffer.writeBytes(s.getBytes());
}
ctx.writeAndFlush(buffer);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost", 8254).sync();
channelFuture.channel().closeFuture().sync();
} catch (
InterruptedException e) {
log.error("client error...", e);
} finally {
worker.shutdownGracefully();
}
}
服务端
public void start() {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class)
.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16));
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
// 增加依据换行符的数据,遇到 \n \r等就会将这条数据截取。每一行最大数据1024
// 超出则丢掉
nioSocketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024));
nioSocketChannel.pipeline().addLast(new LoggingHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8254).sync();
// 封闭时,同步封闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.debug("server error...", e);
} finally {
boss.shutdownGracefully(); // 安全封闭
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
new Server().start();
}
LTC 解码器
lengthFieldOffset
长度字段偏移量,也便是长度字节开端方位,图中等于0,表明长度字节从 0 开端
lengthFieldLength
长度字段自身是多长,图中等于 2,表明000C,C = 12
lengthAdjustment
长度字段为基准,还有几个字节是内容
initialBytesToStrip
从头剥离几个字节,也便是字段长度记载了内容长度,总共两个字节,解析完想删除这两个字节
就能够依据上面信息,从0开端读,读12个字节拿到 hello world
下面这个有点不一样,它多带了一个头,从第 2 个字节开端读,总共三个长度字段
表明从0开端读,长度字段为 3 个,不需求剥离,越过两个字节,开端读取12个内容字节
下面办法测验
public static void main(String[] args) throws Exception {
// netty 的测验办法,省去很多
EmbeddedChannel channel = new EmbeddedChannel(
// 参数1:最大长度,参数2:长度字段开端方位,参数3:长度字段自身字节数,参数4:是否越过字节,参数5:剥离长度
new LengthFieldBasedFrameDecoder(
1024, 0, 4, 1, 4),
new LoggingHandler(LogLevel.DEBUG)
);
// 四个字节的内容长度,实践内容
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
send(buffer, "Hello, world");
send(buffer, "Hi!");
// 运用 Em的channel写入 bufer
channel.writeInbound(buffer);
// 打印
// .Hello, world,
// 前面的 . 代表咱们的附加内容,假如不剥离字段长度,会更多
}
private static void send(ByteBuf buffer, String content) {
byte[] bytes = content.getBytes();
buffer.writeInt(bytes.length); // 先写入内容的长度
buffer.writeByte(1); // 咱们写入一个附加内容,前面现已越过的字节,在参数4方位
buffer.writeBytes(bytes);
}
协议设计与解析
TCP/IP 中音讯传输依据流的办法,没有鸿沟。
协议的意图便是划定音讯的鸿沟,制定通讯两边要共同恪守的通讯规矩
- redis 协议
- 要求咱们先发送
- *3:表明三个元素
- 每个指令的长度
- $3:表明 set 三个长度
- set
- $4
- name
- $8
- zhangsan
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
final byte[] LINE = new byte[]{13, 10};
try {
Bootstrap bootstrap = new Bootstrap().group(worker).channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
nioSocketChannel.pipeline().addLast(new LoggingHandler());
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes("*3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("set".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$4".getBytes()); // name 四个长度
buffer.writeBytes(LINE);
buffer.writeBytes("name".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$8".getBytes()); // zhangsan 8个长度
buffer.writeBytes(LINE);
buffer.writeBytes("zhangsan".getBytes());
buffer.writeBytes(LINE);
ctx.writeAndFlush(buffer);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(buf.toString(Charset.defaultCharset()));
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("localhost",6379).sync();
// 封闭时,同步封闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.debug("server error...", e);
} finally {
worker.shutdownGracefully();
}
}
http 协议
new HttpServerCodec()
Netty 供给的一个方便的处理器,它将实践上是将 HttpRequestDecoder 和 HttpResponseEncoder 组合在一起,前者用于解码 HTTP 恳求,后者用于编码 HTTP 呼应。
下面是简略运用
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
NioEventLoopGroup boss = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
nioSocketChannel.pipeline().addLast(new LoggingHandler());
nioSocketChannel.pipeline().addLast(new HttpServerCodec());
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("{}", msg.getClass());
if (msg instanceof HttpRequest) { // 恳求行,恳求头(是get恳求)
} else if (msg instanceof HttpContent) { // 恳求体(不是get)
}
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8352).sync();
// 封闭时,同步封闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.debug("server error...", e);
} finally {
worker.shutdownGracefully();
}
}
咱们期望只关注某一种类型的处理
SimpleChannelInboundHandler:只关怀特定的音讯,request或者httpconent,不是则越过
- DefaultFullHttpResponse:获取呼应目标,进行写回数据
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
NioEventLoopGroup boss = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap().group(boss, worker).channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) {
nioSocketChannel.pipeline().addLast(new LoggingHandler());
nioSocketChannel.pipeline().addLast(new HttpServerCodec()); // 即使入栈处理器也是出站处理器
nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) throws Exception {
// 获取恳求
log.debug(httpRequest.uri()); // 恳求行
log.debug("{}", httpRequest.headers()); // 恳求头
// 获取呼应
/*
* version 版别
* status 呼应状况
*/
DefaultFullHttpResponse response = new DefaultFullHttpResponse(httpRequest.protocolVersion(), HttpResponseStatus.OK);
// 向浏览器写入数据
byte[] bytes = "<h1>Hello World</h1>".getBytes();
response.content().writeBytes(bytes);
// 咱们应该在呼应头加 呼应体长度,不然浏览器会一向在等候呼应内容
response.headers().setInt(CONTENT_LENGTH, bytes.length); // netty 结构 HttpHeaderNames
// 写回呼应
channelHandlerContext.writeAndFlush(response);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8352).sync();
// 封闭时,同步封闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.debug("server error...", e);
} finally {
worker.shutdownGracefully();
}
}
自界说协议要素
- 魔数,用来在第一时刻断定是否是无效数据包
- 版别号,能够支撑协议的晋级
- 序列化算法,音讯正文究竟选用哪种序列化反序列化办法,能够由此扩展,例如:json、protobuf、hessian、jdk
- 指令类型,是登录、注册、单聊、群聊… 跟事务相关
- 恳求序号,为了双工通讯,供给异步能力
- 正文长度
- 音讯正文
先写编解码器
这儿写一个 message 的发送的信息,仅仅一个类型状况
会有多个子类承继这个 message 类,表明多个状况
public abstract class Message implements Serializable {
/**
* 依据音讯类型字节,取得对应的音讯 class
*
* @param messageType 音讯类型字节
* @return 音讯 class
*/
public static Class<? extends Message> getMessageClass(int messageType) {
return messageClasses.get(messageType);
}
// 存储序列号
private int sequenceId;
// 存储音讯的类型
private int messageType;
public abstract int getMessageType();
// 登录恳求音讯
public static final int LoginRequestMessage = 0;
// 登录呼应音讯
public static final int LoginResponseMessage = 1;
// 谈天恳求音讯
public static final int ChatRequestMessage = 2;
// 谈天呼应音讯
public static final int ChatResponseMessage = 3;
// 创立群组恳求音讯
public static final int GroupCreateRequestMessage = 4;
// 创立群组呼应音讯
public static final int GroupCreateResponseMessage = 5;
// 参加群组恳求音讯
public static final int GroupJoinRequestMessage = 6;
// 参加群组呼应音讯
public static final int GroupJoinResponseMessage = 7;
// 退出群组恳求音讯
public static final int GroupQuitRequestMessage = 8;
// 退出群组呼应音讯
public static final int GroupQuitResponseMessage = 9;
// 群组谈天恳求音讯
public static final int GroupChatRequestMessage = 10;
// 群组谈天呼应音讯
public static final int GroupChatResponseMessage = 11;
// 获取群组成员恳求音讯
public static final int GroupMembersRequestMessage = 12;
// 获取群组成员呼应音讯
public static final int GroupMembersResponseMessage = 13;
// 心跳恳求音讯
public static final int PingMessage = 14;
// 心跳呼应音讯
public static final int PongMessage = 15;
/**
* 恳求类型 byte 值
*/
public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
/**
* 呼应类型 byte 值
*/
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();
static {
messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}
自界说编码解码
readInt 是Netty中ByteBuf类的一个办法,用于从ByteBuf中读取一个int类型的数据。
它回来一个int类型的值,并将读取索引主动增加4(int的字节数)
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {
// 编码
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 1. 4个字节的魔数,恣意
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版别
out.writeByte(1);
// 3. 字节的序列化办法 0,jdk 1 JSON
out.writeByte(0);
// 4. 字节指令类型---.getMessageType 拿到其时多态的,类型,每个类都有指定的类型
out.writeByte(msg.getMessageType());
// 5. 4个字节--恳求序号
out.writeInt(msg.getSequenceId());
out.writeByte(0xff); // 为了凑数的,由于固定字节数 要 2 的倍数
// 6. 获取内容的字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容--用jdk序列化
out.writeBytes(bytes);
}
// 解码
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 先读取戏法
int magicNum = in.readInt();
// 读取版别
byte version = in.readByte();
// 读取序列号办法
byte serializerType = in.readByte();
// 读取音讯类型 messageType
byte messageType = in.readByte();
// 读取恳求序号
int sequenceId = in.readInt();
// 读取无意义的字节,其时为了规范加的
in.readByte();
// 读取长度
int length = in.readInt();
// 获取数据字节
byte[] bytes = new byte[length];
ByteBuf byteBuf = in.readBytes(bytes, 0, length);
// 转为目标
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
ObjectInputStream ois = new ObjectInputStream(byteArrayInputStream);
Message message = (Message) ois.readObject();
log.debug("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
// 终究增加到调集中
out.add(message);
}
}
然后咱们测验这个处理器
- 1024:最大容量
- 12:偏移量,由于序列号,魔数,类型无效字符等,协议。
- 4:长度字节
- 0:不需求越过
- 0:由于 decode自己解析,所以不需求
留意要加LengthFieldBasedFrameDecoder
(1024, 12, 4, 0, 0),处理黏包半包
敞开了解码器:假如数据不完好,将不会往下走
writeInbound:履行完会调用 release开释 bytebuf
public static void main(String[] args) throws Exception {
// 增加刚刚写的编码解码
EmbeddedChannel channel = new EmbeddedChannel(
new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),
new LoggingHandler(),
new MessageCodec());
// encode 编码
LoginRequestMessage loginRequestMessage = new LoginRequestMessage("zhangsan", "123");
channel.writeOutbound(loginRequestMessage);
// decode 解码
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// 写入数据
new MessageCodec().encode(null, loginRequestMessage, buffer);
// 入站
channel.writeInbound(buffer);
}
打印的字节数,能够依据自己界说的协议来,对比,是正确的
注解(安全状况)
sharable 线程安全
咱们能够翻开处理类的源码查看
@Sharable
表明线程安全,这个处理器能够被同享运用
像 LTC解码器是不能够被同享的,存在线程危险
@Sharable
public class LoggingHandler extends ChannelDuplexHandler {
那咱们自己界说的 handle 能够加同享注解吗
ByteToMessageCodec
;子类文档明确规定,不能加同享注解,不然抛出反常
public class MessageCodec extends ByteToMessageCodec<Message> {
那需求增加怎样处理
运用这个父类即可–MessageToMessageCodec
有一些小变动,需求自行改一下,bytebuf方面
public class MessageCodec extends MessageToMessageCodec<ByteBuf, Message> {
开发谈天室
首要,咱们协议在 LengthFieldBasedFrameDecoder
直接写,会简单出错咱们制作成类
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
// 协议
public ProcotolFrameDecoder() {
this(1024, 12, 4, 0, 0);
}
public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
}
登录
客户端
客户端衔接成功之后进行衔接树立事情,控制台输入
需求拓荒线程,运用 nio 线程会影响其他 io 操作
- 咱们构建完目标,进行
writeAndFlush
发送 - 往上持续找上一个
handel
-
MESSAGE_CODEC
编码,LOGGING_HANDLER
记载日志 - ProcotolFrameDecoder是入站的所以不会触发
先看一下下面这个 CountDownLatch
用于同步东西类,和谐线程之间的等候时刻
-
countDown()
:每次调用减 1,为零开释一切线程 -
await()
:让调用的线程等候计数器为 0 ,立即回来,不然会堵塞
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
用于在多线程下判别的 boolean 目标
-
get()
:获取其时布尔值。 -
set(boolean newValue)
:设置布尔值为指定的新值。
AtomicBoolean LOGIN = new AtomicBoolean();
会话办理—登录成功后保存是那个channel
那个用户
将用户和 channel
绑定起来
SessionFactory.getSession().bind(ctx.channel(), username);
留意:handler 没有加 @ChannelHandler.Sharable 会导致其他客户端衔接不上
衔接假死
原因
- 网络设备呈现毛病,例如网卡,机房等,底层的 TCP 衔接现已断开了,但应用程序没有感知到,依然占用着资源。
- 公网网络不稳定,呈现丢包。假如接连呈现丢包,这时现象便是客户端数据发不出去,服务端也一向收不到数据,就这么一向耗着
- 应用程序线程堵塞,无法进行数据读写
问题
- 假死的衔接占用的资源不能主动开释
- 向假死的衔接发送数据,得到的反馈是发送超时
IdleStateHandler 处理器处理以上问题
- 第一个参数: 表明读闲暇时刻,即在指定的时刻距离内假如没有读取到数据,则会触发userEventTriggered办法。
- 第二个参数: 表明写闲暇时刻,即在指定的时刻距离内假如没有写入数据,则会触发userEventTriggered办法。
- 第三个参数: 表明读写闲暇时刻,即在指定的时刻距离内假如既没有读取到数据也没有写入数据,则会触发userEventTriggered办法。
用心跳机制,确保客户端正常链接,处理闲暇问题
优化
序列化算法
序列化,反序列化首要用在音讯正文的转换上
- 序列化时,需求将 Java 目标变为要传输的数据(能够是 byte[],或 json 等,终究都需求变成 byte[])
- 反序列化时,需求将传入的正文数据还原成 Java 目标,便于处理
/**
* 扩展序列化
*/
public interface Serializer {
// 反序列化
<T> T deserialize(Class<T> tClass, byte[] bytes);
// 序列化
<T> byte[] serialize(T object);
enum Algorithm implements Serializer {
Java {
@Override
public <T> T deserialize(Class<T> tClass, byte[] bytes) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
return (T) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("反序列化失利", e);
}
}
@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(object);
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException("序列化失利", e);
}
}
},
Json {
@Override
public <T> T deserialize(Class<T> tClass, byte[] bytes) {
String s = new String(bytes, StandardCharsets.UTF_8);
return new Gson().fromJson(s, tClass);
}
@Override
public <T> byte[] serialize(T object) {
String toJson = new Gson().toJson(object);
return toJson.getBytes(StandardCharsets.UTF_8);
}
}
}
}
留意反序列化,取得的目标
参数调优
// 客户端经过 option 装备参数,给 SocketChannel 装备参数
new Bootstrap().option()
// 服务端经过 option 装备参数,给 ServerSocketChannel 装备参数
new ServerBootstrap().option()
// 服务端经过 childOptio 装备参数,给 SocketChannel 装备参数
new ServerBootstrap().childOption()
客户端超时衔接
客户端不必定比及 5秒后抛出反常,它有或许判别服务器没有敞开,根本衔接不上,直接抛出网络编程反常
bootstrap.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500);
SO_TIMEOUT 首要用在堵塞 IO,堵塞 IO 中 accept,read 等都是无限等候的,假如不期望永远堵塞,运用它调整超时时刻,netty 不会用的。
悉数代码
client
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
// 线程计数器
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
// 登录状况--默许未登录
AtomicBoolean LOGIN = new AtomicBoolean(false);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
// bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 500);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
// 初始化操作
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
// ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
// 用来判别是不是,读闲暇时刻太长或者 写时刻闲暇时刻太长
// 5s 没用收到 channel 的数据会触发一个 IdleState#WRITER_IDLE 事情
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
// ChannelDuplexHandler 能够一起作为入站和出站处理器
ch.pipeline().addLast(new ChannelDuplexHandler() {
// 触发特别事情 IdleState
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
// 写闲暇
if (event.state() == IdleState.WRITER_IDLE) {
ctx.writeAndFlush(new PingMessage());
}
}
});
// 树立衔接之后触发 active
ch.pipeline().addLast("client_handel", new ChannelInboundHandlerAdapter() {
// 承受呼应信息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// log.debug("{}", msg);
if (msg instanceof LoginResponseMessage) {
LoginResponseMessage message = (LoginResponseMessage) msg;
// 假如登录成功设置为 true
if (message.isSuccess()) {
LOGIN.set(true);
}
// 让计数器 - 1,让线程持续履行,唤醒
WAIT_FOR_LOGIN.countDown();
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 在这儿要创立线程去完成,控制台输入,担任向服务器发送信息
new Thread(() -> {
Scanner sc = new Scanner(System.in);
System.out.println("请输入用户名:");
String username = sc.nextLine();
System.out.println("请输入暗码:");
String password = sc.nextLine();
// 构建音讯目标
LoginRequestMessage message = new LoginRequestMessage(username, password);
// 发送信息
ctx.writeAndFlush(message);
System.out.println("等候后续操作....");
try {
// 呼应回来,持续履行
WAIT_FOR_LOGIN.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 假如登录失利,封闭 channel,回来
log.debug("{}", LOGIN.get());
if (!LOGIN.get()) {
ctx.channel().close();
return;
}
// 打印菜单,登录成功
while (true) {
System.out.println("==================================");
System.out.println("send [username] [content]"); // 发送
System.out.println("gsend [group name] [content]"); // 向群聊发送
System.out.println("gcreate [group name] [m1,m2,m3...]"); // 创立群,而且拉人
System.out.println("gmembers [group name]"); // 查看群里成员
System.out.println("gjoin [group name]"); // 参加谈天群
System.out.println("gquit [group name]"); // 退出谈天群
System.out.println("quit");
System.out.println("==================================");
Scanner command = new Scanner(System.in);
// 用户输入指令--由于指令都是以空格分割的
String[] s = command.nextLine().split(" ");
switch (s[0]) { // 依据指令格局,0 表明指令类型
case "send":
ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
break;
case "gsend":
ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));
break;
case "gcreate":
HashSet<String> strings = new HashSet<>(Arrays.asList(s[2].split(",")));
strings.add(username);
ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], strings));
break;
case "gmembers":
ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
break;
case "gjoin":
ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
break;
case "gquit":
ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
break;
case "quit":
ctx.channel().close();
return;
}
}
}, "system in").start();
}
});
}
});
log.debug("登录");
Channel channel = bootstrap.connect("localhost", 8081).sync().channel();
channel.closeFuture().sync();
} catch (
Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
config 包
public abstract class Config {
static Properties properties;
static {
try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(in);
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
}
public static int getServerPort() {
String value = properties.getProperty("server.port");
if(value == null) {
return 8080;
} else {
return Integer.parseInt(value);
}
}
public static Serializer.Algorithm getSerializerAlgorithm() {
String value = properties.getProperty("serializer.algorithm");
if(value == null) {
return Serializer.Algorithm.Java;
} else {
return Serializer.Algorithm.valueOf(value);
}
}
}
protocol
@ChannelHandler.Sharable
public class MessageCodec extends ByteToMessageCodec<Message> {
@Override
public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 1. 4 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版别,
out.writeByte(1);
// 3. 1 字节的序列化办法 jdk 0 , json 1
out.writeByte(0);
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
out.add(message);
}
}
@ChannelHandler.Sharable
/**
* 有必要和 LengthFieldBasedFrameDecoder 一起运用,确保接到的 ByteBuf 音讯是完好的
*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ctx.alloc().buffer();
// 1. 4 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版别,
out.writeByte(1);
// 3. 1 字节的序列化办法 jdk 0 , json 1
/* ordinal 参数会依据排序进行对比,比方 java是一个那么就会转换 0 以此类推 */
out.writeByte(Config.getSerializerAlgorithm().ordinal());
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组,序列化, 运用
byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
outList.add(out);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
// 反序列化
// 依据个数拿到对应的序列化算法,找到反序列化的算法
Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerType];
// 确定详细音讯类型
Class<?> aClass = Message.getMessageClass(messageType);
Object message = algorithm.deserialize(aClass, bytes);
log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
out.add(message);
}
}
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
public ProcotolFrameDecoder() {
this(1024, 12, 4, 0, 0);
}
public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
}
/**
* 扩展序列化
*/
public interface Serializer {
// 反序列化
<T> T deserialize(Class<T> tClass, byte[] bytes);
// 序列化
<T> byte[] serialize(T object);
enum Algorithm implements Serializer {
Java {
@Override
public <T> T deserialize(Class<T> tClass, byte[] bytes) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
return (T) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("反序列化失利", e);
}
}
@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(object);
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException("序列化失利", e);
}
}
},
Json {
@Override
public <T> T deserialize(Class<T> tClass, byte[] bytes) {
String s = new String(bytes, StandardCharsets.UTF_8);
return new Gson().fromJson(s, tClass);
}
@Override
public <T> byte[] serialize(T object) {
String toJson = new Gson().toJson(object);
return toJson.getBytes(StandardCharsets.UTF_8);
}
}
}
}
server.handler
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage chatRequestMessage) throws Exception {
// 获取接纳者者
String to = chatRequestMessage.getTo();
// 依据用户名获取 channel
Channel channel = SessionFactory.getSession().getChannel(to);
// 为空,表明不在线
if (channel != null) {
// 发送信息
channel.writeAndFlush(new ChatResponseMessage(chatRequestMessage.getFrom(), chatRequestMessage.getContent()));
} else {
channelHandlerContext.writeAndFlush(new ChatResponseMessage(false, "对方用户不在线"));
}
}
}
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupChatRequestMessage groupChatRequestMessage) throws Exception {
// 依据群名,获取一切通道
List<Channel> channel = GroupSessionFactory.getGroupSession()
.getMembersChannel(groupChatRequestMessage.getGroupName());
// 循环发送信息
for (Channel channel1 : channel) {
channel1.writeAndFlush(new GroupChatResponseMessage(groupChatRequestMessage.getFrom(),
groupChatRequestMessage.getContent()));
}
}
}
/**
* 处理创立群聊
*/
@ChannelHandler.Sharable
public class GroupCreateRequestHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupCreateRequestMessage groupCreateRequestMessage) throws Exception {
// 将要创立的群聊名称
String groupName = groupCreateRequestMessage.getGroupName();
// 创立群而且要拉入的群成员
Set<String> members = groupCreateRequestMessage.getMembers();
// 获取群办理器
GroupSession groupSession = GroupSessionFactory.getGroupSession();
// 创立群
Group group = groupSession.createGroup(groupName, members);
if (group == null) {
// 发送拉群音讯
List<Channel> channelList = groupSession.getMembersChannel(groupName);
for (Channel channel : channelList) {
channel.writeAndFlush(new GroupCreateResponseMessage(true, "您已被拉入" + groupName));
}
channelHandlerContext.writeAndFlush(new GroupCreateResponseMessage(true, groupName + "创立成功"));
} else {
channelHandlerContext.writeAndFlush(new GroupCreateResponseMessage(false, groupName + "群现已存在"));
}
}
}
@ChannelHandler.Sharable // 由于没有状况信息,多线程下是安全的
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
String username = msg.getUsername();
String password = msg.getPassword();
// 判别登录状况
boolean login = UserServiceFactory.getUserService().login(username, password);
LoginResponseMessage message;
if (login) {
SessionFactory.getSession().bind(ctx.channel(), username);
message = new LoginResponseMessage(true, "登录成功");
} else {
message = new LoginResponseMessage(false, "用户名或暗码不正确");
}
ctx.writeAndFlush(message);
}
}
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
// 当衔接断开触发 inactive 事情
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.debug("{} 正常断开", ctx.channel());
}
// 用户反常断开
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
SessionFactory.getSession().unbind(ctx.channel());
log.debug("{} 反常断开", ctx.channel());
}
}
backlog 衔接
服务器在三次衔接树立之后,将半衔接行列转移到全衔接行列
由于服务器 accept 衔接量特别大,所以将树立成功的信息放到全衔接行列。以此处理。
accept 是发生在衔接之前的。
-
在 linux 2.2 之前,backlog 巨细包含了两个行列的巨细,在 2.2 之后,别离用下面两个参数来控制
-
sync queue – 半衔接行列
- 巨细经过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在
syncookies
启用的状况下,逻辑上没有最大值约束,这个设置便被忽略
- 巨细经过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在
-
accept queue – 全衔接行列
- 其巨细经过 /proc/sys/net/core/somaxconn 指定,在运用 listen 函数时,内核会依据传入的 backlog 参数与体系参数,取二者的较小值,假定linux装备 100,程序装备 200 ,取100为准
- 假如 accpet queue 行列满了,server 将发送一个拒绝衔接的错误信息到 client
-
ChannelOption.SO_BACKLOG, 2
,设置全衔接行列容量巨细
new ServerBootstrap()
.group(new NioEventLoopGroup())
// 装备让全链接行列最大存放 2个
.option(ChannelOption.SO_BACKLOG, 2)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LoggingHandler());
}
}).bind(8424);
断点在 nioeventloop 下面代码中,即可调试
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
超越行列巨细,将会抛出反常
体系参数
- ulimit -n:一个进程能够翻开的文件操作数
- TCP_NODELAY:设置 true 不需求延迟,不需求算法,主张 true
- 归于 SocketChannal 参数
- 归于操作体系参数
- SO_SNDBUF & SO_RCVBUF
- SO_SNDBUF 归于 SocketChannal 参数
- SO_RCVBUF 既可用于 SocketChannal 参数,也能够用于 ServerSocketChannal 参数(主张设置到 ServerSocketChannal 上)
内存缓冲区
io 只能是直接内存,ctx获取的 bytebuf
三种线程模型
单线程模型
异步非堵塞io,一切的操作都是由一个nio线程处理,合适特别小的程序
一个单线程负荷过度,客户端向服务端超时,服务端的超时处理睬处理,终究卡死就会宕机
多线程模型
分红reactor 单线程和reactor 多线程 单线程担任承受新来的恳求 处理逻辑交给线程池处理
由一组nio线程组成
主从线程模型
由一组线程池承受恳求,一组线程池处理io