欢迎大家关注github.com/hsfxuebao,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
《Netty 权威指南》书上使用的源码是 Netty5 的,但是 Netty5 已经被废弃了,所以本文是参照 Net监控ty4.1 的源码解读的。
JDK 的 NIO 类库中,提供了 SocketChannel 和 ServerSocketChannel 用于非阻塞 I/O 操作。类似于 NIO 的 Channel,Netty监控家用远程手机 提供了自己的 Channel 和其教程画画子类实现。
1. Channel 功能说明
io.netty.channel.Cha监控安装nnel 是 Netty 的网络操作https认证抽象类,聚合了一组功github开放私库能,包括但不限于网络读写、客户端发起连接、主动关闭连接,https和http的区别同时也包含了 Netty 框架相关的一些功能,包括获取 Channel 的 EventLoop,获取缓冲区分配器 ByteBufAllocator 和 pipeline 等。
为了 Netty 不使用 NIO 的原生 Channel,而是要另起炉灶呢?主要原因如下:
- JDK 的 SocketChannel 和 ServerSocketChannel 没有统一的 Channel 接口供业务开发监控安装流程者使用。对用户而言,
没有统一的操作视图,使用起来不方便https安全问题
。 - JDK 的 SocketChannel 和 ServerSocketChannel 是
SPI 类接口,通过继承来扩展很不方便
,不如开发一个新的。 - Netty 的 Channel 需要https域名能跟 Nethttps域名ty 架构融合在一起。
- 自定义 C监控眼hannel 功能实现会更灵活。
基于以上原因,Netty 重新设计了 Chgithub官网annel,其主要设计理念如下:
- 在 Channel 接口层,github开放私库
采用 Facade 模式统一封装
,将网络 I/O 操作、网络 I/O 相关联的其他操作封装起来,统一对外提供。 -
Channel 接口定义尽量大而全
,为 Socket源码1688Channel 和 ServerSocketChannel 提供统一的视监控安装图,由不同子https安全问题类实现不同的功能,公共功能在抽象父类中实现,最大程度上实现功能和接口的重用。 -
具体实现采用聚合而非包含的方式,Channel 负责统一分配和调度,更加灵活
。
Netty 的 Channel 都有哪些功能呢?
- 常见的网络 IO源码编辑器 操作:读、写、绑定端口、连接、关闭连接等。
- 获取 EventLoop。
- 获取 parent Channelgithub官网登陆入口,对于服务端 SocketChannel 来说,parent 就是创建它的 ServerSocketChannel。
- 唯一标志 id。
- 元数据 metadata,获取 TCP 参数配置等。
1.1 常用接口
-
eventLoop(),Channel需要注册到EventLoop的多路复用器上,用于处理IO事件,通过e源码ventLoop方法可以获取到Channel注册的EventLoop。EventLoop本github官网登陆入口质上就是处理网络读写事件的Reactor线程。在Netty中,它不仅仅用来处理网络事件,也源码时代可以用来执行定时任务和用户自定义NioTask等任务。
-
megithub永久回家地址tadata(),熟悉TCP协议的同学可能知道,当创建Socket的时候需要指定TCP参数,例如接收和发送的TC源码之家P缓冲区大小,TCP的超时时间,是否重用地址等等。在Netty中,每个Channel对应一个物理连接,每个连接都有自己的TCP参数配置。所以,Cgithub官网hannel会聚合一个ChannelMetadata用来对TCP参数提供元数据描教程拼音述信息,通过metadata方法就可以获取当前Channel的TCP参数配置。
-
parent(),对于服务端Channel而言,它的父Chann源码1688elgithub官网为空,对于客户端源码Channel,它的父Channel就是创建它的ServerSocketChannel。github开放私库
-
用户获取Chttps协议hannel标识的id,它返回ChannelId对象,ChannelId是Channel的唯一标识,它的可能生成策略如下:
- 机器源码编辑器的MAC地址(EUI-48或者EUhttps协议I-64)等可以代表全局唯一的信息。
- 当前进程的ID。
- 当前系统时间的毫秒——System.currentTimeMillis
- 当前系统时间的纳秒——System.nanoTime
- 32位的随机整型源码精灵永久兑换码数
- 32位自增的序教程英文翻译列数
2. Channel 源码分析
2.1 继承关系类图

NioServerSocketChannel、NioSocketChannel 两者都继承了 Channel、Abs教程拼音trac监控安装tChannel、Agithub官网b监控家用远程手机stractNioChannel。
2.2 AbstractChannel
主要成员变量如下所示:
// 父 Channel private final Channel parent; // 全局唯一 id。 private final ChannelId id; // Unsafe 实例 private final Unsafe unsafe; // 当前 Channel 对应的 DefaultChannelPipeline。 private final DefaultChannelPipeline pipeline; private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false); private final CloseFuture closeFuture = new CloseFuture(this); private volatile SocketAddress localAddress; private volatile SocketAddress remoteAddress; // EventLoop private volatile EventLoop eventLoop; private volatile boolean registered; private boolean closeInitiated; private Throwable initialCloseCause;
Abstrgithub永久回家地址actChannel 中的网络 I/O 操作都是调用 p源码中的图片ipeline 中的对应方法源码编程器,继而由 pipeline 调用教程之家教学视频 ChannelHandler 进行处理。
@Override public ChannelFuture bind(SocketAddress localAddress) { return pipeline.bind(localAddress); } @Override public ChannelFuture connect(SocketAddress remoteAddress) { return pipeline.connect(remoteAddress); } @Override public ChannelFuture write(Object msg) { return pipeline.write(msg); }
2监控拍下东航客机坠落瞬间.3 AbstractNioChannel
主要成员变量有:
- SelectableChannel:这是一个 Java NIO SocketChannel 和 Servgithub官网erSocketChannel 的公共父类,放在这github开放私库里是因为 AbstractNioChannel 也是 NioSocketChannel 和 NioServerSocketChannel 的公共父类。
- readInterestOp:代教程之家表 JDK SelectionKey 的 OP_READ。
- Selection监控摄像头Key:Channel 注册到 EventLoop(Selector)时返回的 key,修改它可以改变感兴趣的事件。
- connectPromise:代表连接操作结果。
- connectTimeoutFuture:连接超时定时器。
- request教程画画edRemoteAddress:connec教程之家提取码t 时的远程地址。
private final SelectableChannel ch; protected final int readInterestOp; volatile SelectionKey selectionKey; boolean readPending; /** * The future of the current connection attempt. If not null, subsequent * connection attempts will fail. */ private ChannelPromise connectPromise; private ScheduledFuture<?> connectTimeoutFuture; private SocketAddress requestedRemoteAddress;
AbstractNioChannel 类里比较重要的方法是 doRegister,该方法负责将 Channel 注册到多路复用器 Selector。
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
在 doRe监控拍下东航客机坠落瞬间gister 方法中,对 ops 字段设置为 0,也就是对任何事件都不感兴趣。真正的设置读操作位是在 doBeginRead 方法中,那么写操作位在何时设置呢?当然是有数据要写,而缓冲区满(或其他不能立即写)HTTPS的情况。
@Override protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }
2.4 Abstract监控家用远程手机NioByteChannel
AbstractNioByteChannel 是 NioSocketChannel 的父类,只有一个成员变量 flushTask,负责写半包消息。
private Runnable flushTask;
最主要的方法是 doWrite:
@Override protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; boolean setOpWrite = false; for (;;) { Object msg = in.current(); // 如果没有要写的数据,就清除写标志位,并返回 if (msg == null) { // Wrote all messages. clearOpWrite(); // Directly return here so incompleteWrite(...) is not called. return; } // 对于 ByteBuf 类型、FileRegion 类型分开处理,其他未知类型抛异常 if (msg instanceof ByteBuf) { ByteBuf buf = (ByteBuf) msg; int readableBytes = buf.readableBytes(); if (readableBytes == 0) { in.remove(); continue; } boolean done = false; long flushedAmount = 0; if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } // 只循环写 writeSpinCount 次,为了避免写大块儿数据时,阻塞其他线程过长时间 for (int i = writeSpinCount - 1; i >= 0; i --) { int localFlushedAmount = doWriteBytes(buf); // 返回 0 表示写缓冲区满,setOpWrite 为 true 会设置 SelectionKey 的写标志位,在可写时会得到通知。 if (localFlushedAmount == 0) { setOpWrite = true; break; } flushedAmount += localFlushedAmount; if (!buf.isReadable()) { done = true; break; } } in.progress(flushedAmount); if (done) { in.remove(); } else { // Break the loop and so incompleteWrite(...) is called. break; } } else if (msg instanceof FileRegion) { FileRegion region = (FileRegion) msg; boolean done = region.transferred() >= region.count(); if (!done) { long flushedAmount = 0; if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } for (int i = writeSpinCount - 1; i >= 0; i--) { long localFlushedAmount = doWriteFileRegion(region); if (localFlushedAmount == 0) { setOpWrite = true; break; } flushedAmount += localFlushedAmount; if (region.transferred() >= region.count()) { done = true; break; } } in.progress(flushedAmount); } if (done) { in.remove(); } else { // Break the loop and so incompleteWrite(...) is called. break; } } else { // Should not reach here. throw new Error(); } } incompleteWrite(setOpWrite); } // 走到这里,说明还有数据没有发送完毕,需要进一步处理 protected final void incompleteWrite(boolean setOpWrite) { // setOpWrite 为 true,设置 SelectionKey 写标志位 if (setOpWrite) { setOpWrite(); } else { // 否则,启动 flushTask 继续写半包消息 Runnable flushTask = this.flushTask; if (flushTask == null) { flushTask = this.flushTask = new Runnable() { @Override public void run() { flush(); } }; } eventLoop().execute(flushTask); } }
2.5 AbstractNioMessageChannel
AbstractNioMessageChannel 是 NioServerSocketChannel、NioDatagramChannel 的父类。其主要方法也是 doWrite,功能和 AbstractNioByteChannel 的 doWrite 也类似,区别只是后者只处理 ByteBuf 和 Fi教程拼音leR监控安装流程egion,前者无此限制,处理所有 Object。
protected void doWrite(ChannelOutboundBuffer in) throws Exception { final SelectionKey key = selectionKey(); final int interestOps = key.interestOps(); for (;;) { Object msg = in.current(); if (msg == null) { // Wrote all messages. if ((interestOps & SelectionKey.OP_WRITE) != 0) { key.interestOps(interestOps & ~SelectionKey.OP_WRITE); } break; } try { boolean done = false; for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) { if (doWriteMessage(msg, in)) { done = true; break; } } if (done) { in.remove(); } else { // Did not write all messages. if ((interestOps & SelectionKey.OP_WRITE) == 0) { key.interestOps(interestOps | SelectionKey.OP_WRITE); } break; } } catch (Exception e) { if (continueOnWriteError()) { in.remove(e); } else { throw e; } } } } // 处理 msg,由子类实现 protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
doWriteMessage 方源码交易平台法在 NioServerSocketChannel 中实现如下所示,是因教程拼音为 NioServerSocketChannel 只是用来监听端口,接收客户端请求,不负责传输实际数据。
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); }
doWriteMessage 方法在 NioSctpChannel 中是由具体实现的,从代码中可以看出来,它处理的只是 SctpMessage 类型的数https域名据。
protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { SctpMessage packet = (SctpMessage) msg; ByteBuf data = packet.content(); int dataLen = data.readableBytes(); if (dataLen == 0) { return true; } ByteBufAllocator alloc = alloc(); boolean needsCopy = data.nioBufferCount() != 1; if (!needsCopy) { if (!data.isDirect() && alloc.isDirectBufferPooled()) { needsCopy = true; } } ByteBuffer nioData; if (!needsCopy) { nioData = data.nioBuffer(); } else { data = alloc.directBuffer(dataLen).writeBytes(data); nioData = data.nioBuffer(); } final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier()); mi.payloadProtocolID(packet.protocolIdentifier()); mi.streamNumber(packet.streamIdentifier()); mi.unordered(packet.isUnordered()); // 写数据 final int writtenBytes = javaChannel().send(nioData, mi); return writtenBytes > 0; }
2.6 NioServerSocketChannel
NioServerSocketChannel 是服务端 Channel 的实现监控系统类,有一个用于配置 TCP 参数的 ServerSocketChannelConfig。
private final ServerSocketChannelConfig config;
作为服务端 Channel,其核心方法是端口绑定 doBind 方法、创建 SocketChannel 的 doReadMessages教程视频怎么制作方法 方法。
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
对于和服务端 Chann源码编程器el 无关的方法,要果断抛出 UnsupportedOperationException 异常。
@Override protected void doDisconnect() throws Exception { throw new UnsupportedOperationException(); } @Override protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception { throw new UnsupportedOperationException(); } @Override protected final Object filterOutboundMessage(Object msg) throws Exception { throw new UnsupportedOperationException(); }
2.7 NioSgithub开放私库ocketgithub开放私库Channel
NioSocketChannel 是客户端 Channgithub下载el 的实现类,也是只有一个用于配置参数监控的变量 SocketChannelConfig。
private final SocketChannelConfig config;
客户端 Channel 的核心方法有连接 doConnect、写半包 doWrite、读操作 doReadBytes,下面我们挨个分析。
连接操作 doConnect 具体实现如下:
- 如果 localAddress 为空,则进行绑定操作。
- 调用 socketChannel.connect 进行连接。
- 如果连接尚未完成,则注册 OP_CONNECT 事件。
- 如果连接失败抛出异常,也要调用 doClose 关闭连接。
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (localAddress != null) { doBind0(localAddress); } boolean success = false; try { // 实际上是调用了 socketChannel.connect 方法。 boolean connected = SocketUtils.connect(javaChannel(), remoteAddress); if (!connected) { selectionKey().interestOps(SelectionKey.OP_CONNECT); } success = true; return connected; } finally { if (!success) { doClose(); } } }
写操作 doWrit监控安装流程e 具体实现如下教程画画:
- 判https和http的区别断待写数据大小,若为 0 则清除写标志位,并返回。
- 从 ChannelOutboundB监控摄像头uffer 里获取待写 ByteBuffer 数组,和待写 Bytgithub中文社区eBuffer 数量 nioBufferCnt。
- 针对 nioBufferCnt 的不同大小进行了区别处理。
- 如果 nioBufferCnt 为https和http的区别 0,则监控摄像头品牌排行调用父类的方法处理,以防有除了 ByteBuffer教程英文翻译 之外的数据需要写。
- nioBufferCnt 为 1 和大github下载于 1 的处理类似,都是循环写 getWriteSpinCoun监控眼t 次,若写完则结束,未写完则设置后续写半包的方式。这一点和父类 Abst教程魔方ractNioByteChannel 中的处理方法类似。
protected void doWrite(ChannelOutboundBuffer in) throws Exception { for (;;) { int size = in.size(); if (size == 0) { // All written so clear OP_WRITE clearOpWrite(); break; } long writtenBytes = 0; boolean done = false; boolean setOpWrite = false; // Ensure the pending writes are made of ByteBufs only. ByteBuffer[] nioBuffers = in.nioBuffers(); int nioBufferCnt = in.nioBufferCount(); long expectedWrittenBytes = in.nioBufferSize(); SocketChannel ch = javaChannel(); // Always us nioBuffers() to workaround data-corruption. switch (nioBufferCnt) { case 0: // We have something else beside ByteBuffers to write so fallback to normal writes. super.doWrite(in); return; case 1: // 和 default 的区别只是传给 ch.write 的是数组还是单个 ByteBuffer ByteBuffer nioBuffer = nioBuffers[0]; for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final int localWrittenBytes = ch.write(nioBuffer); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; if (expectedWrittenBytes == 0) { done = true; break; } } break; default: for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) { final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); if (localWrittenBytes == 0) { setOpWrite = true; break; } expectedWrittenBytes -= localWrittenBytes; writtenBytes += localWrittenBytes; // expectedWrittenBytes 为 0 表示数据发送完毕 if (expectedWrittenBytes == 0) { done = true; break; } } break; } // Release the fully written buffers, and update the indexes of the partially written buffer. in.removeBytes(writtenBytes); if (!done) { // 设置后续写半包方式 incompleteWrite(setOpWrite); break; } } }
读操作比较简单,主要是通过 ByteBuf 来从 Channel 中读取数据。
protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }
2.8 总结
Channel 类体系的设计与其实现功能密不可分,父类中实现的是子类共同的功能。在多层次的抽象类中,每一个层次的抽象类负责实现一种功能。
当父类提供大而全的接口时github中文社区,父类可以根据需要去实现,不需要的可以抛出 UnsupportedOperationExce源码时代ption 异常。
3. Unsafe
3.1 功github永久回家地址能说明
Unsafe接口实际上是Channel 接口的辅助接口,它不应被用户代码 直接调用,实际的I/O 读写操作者是由Un源码之家safe接口负责完成的。
在netty中一个很核心的组件,封装了java底层的socket操作,作为连接netty和java 底层nio的重要桥梁。
方法名 | 返回值 | 功能说明 |
---|---|---|
invoker() | ChannelHandlerInvoker | 返回默认使用的ChannelHandlerlnvoker |
localAddress() | SocketAddgithub官网登陆入口ress | 返回本地绑定的So源码时代cket地址 |
remoteAddress()教程之家提取码 | SocketAddress | 返回通教程英语信对端Socket地址 |
register(ChannelPromise promise) | void | 注册Channel 到多路复源码精灵永久兑换码用器上,一旦完成,通知channel监控眼Future |
bind(SocketAddress localAddress,ChannelPromise promise) | void | 绑定指定的本地地址 localAddress到当前的Channel 上,一旦完成,通知c源码精灵永久兑换码hannelFuture |
connegithub开放私库ct(SocketAddress remoteAddress,SocketAddress localAddress,Channe监控可以保存多少天lPromise promise) | void | 绑定本地的 l监控系统ocalAddress之后,连接服务端,源码编辑器下载一旦完成,通知c教程的意思hannelFuture |
disconnect(ChannelPromi源码之家se promise) | void | 断天Channel的连接。一旦完成,通知channelFuture |
close(ChannelPromise promise) | void | 关闭Channel的连接,一旦完成,通知channe教程英语lFuture |
clos教程拼音cForcibly() | void | 强github中文社区制立即关闭连接 |
be教程之家教学视频ginRead() | void | 设置网络操作位为读用于读取信息 |
w监控摄像头品牌排行rite(Object msg,ChannelPromise promise) | void | 发送消息,一但完成,通知ChannelFuture |
flush() | void | 将发送缓源码时代冲数组中的消息写入到Channel中 |
voidPromise() | ChannelPromise | 返回一个特殊的可重用教程视频怎么制作方法和传递的ChannelPromise,它不用于操作成功或者失败的通知器,仅仅作为一容器被使用 |
outboundBuffer() | ChannelOutboundBu教程之家提取码ffer | 返回消息发送缓冲区https安全问题 |
Unsafe监控安装继承关系类图:
3.2 AbstractUnsafe 源码分析
-
register方法:主要用于将当前Unsafe对应的Channel注册到EventLogithub永久回家地址op的多路复用器上,然后调用DefaultChannelPipeline的fireChannel监控安装流程Registed方法,如果Channel被激活,则调用fireChannelActive方法。
-
bind方法:主要用于绑定指定端口。对于服务端,用于绑定监听端口,并源码时代设置backlog参数;教程的意思对于客户端,用于教程魔方指定客户端Channel的本地绑定Socket地源码之家址
-
disconngithub下载ect方法: 该方法用于客户端或服务端主动关闭连接
-
close方法:确保在多线https和http的区别程环境下,多次调用clo源码时代se和一次调用的影响一致,并且可以通过promis得到同样的结果。
-
保证在执行close的过程中,不能向channel写数据。
-
调用doClose0执行执真正的closegithub下载操作。
-
调用deregister对channel做最后源码中的图片的清理工作,并触发channelInact教程的意思i监控眼ve, channelUnregistered事件。
-
-
write方法:实际上是将消息添加到环形发送数组上,并不真正的写Channel(真正的写Channel是flusgithub中文官网网页h方法)。如果Channel 没有处于激活状态,说明TCP链路还有真正建立成功,当前Channel存在以下两种状态
-
Channel 打开,但是TCP链路尚未建立成功
-
Channel 已经关闭
-
-
fgithub中文官网网页lush方法:前面提到,write方法负责将消息放进发送缓冲区,并没有真正的发送,而flush方源码时代法就负责将发送缓冲区中待发送的消息全教程之家提取码部写进Ch教程的意思anngithub直播平台永久回家el中并发送。
部分源码如下:
protected abstract class AbstractUnsafe implements Unsafe { //下面是绑定方法的逻辑 传入的是SocketAddress @Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { // 确认当前channel已经注册 assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } //验证传入的参数 // See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } boolean wasActive = isActive(); try { //具体的绑定操作在doBind方法里执行 这个方法是channel的方法,也就是我们在上面NioServerSocketChannel 里分析的逻辑 doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } //触发 active事件,在pipline链里传播 if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); } //往外写数据的操作(这里只往缓存里写数据) @Override public final void write(Object msg, ChannelPromise promise) { //验证是否已经注册并且react线程是否已经准备好 assertEventLoop(); //ChannelOutboundBuffer 表示要往外写数据的缓存 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { // If the outboundBuffer is null we know the channel was closed and so // need to fail the future right away. If it is not null the handling of the rest // will be done in flush0() // See https://github.com/netty/netty/issues/2362 safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION); // release message now to prevent resource-leak ReferenceCountUtil.release(msg); return; } int size; try { //对需要写的数据进行过滤 msg = filterOutboundMessage(msg); //对需要写的数据进行大小的预估 size = pipeline.estimatorHandle().size(msg); if (size < 0) { size = 0; } } catch (Throwable t) { safeSetFailure(promise, t); ReferenceCountUtil.release(msg); return; } //将数据增加到缓存中 outboundBuffer.addMessage(msg, size, promise); } // flush方法用于将数据写入到网络中 @Override public final void flush() { assertEventLoop(); //往外发送的缓存对象不能为空 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } outboundBuffer.addFlush(); flush0(); } //最终会调用到这个方法 @SuppressWarnings("deprecation") protected void flush0() { if (inFlush0) { // Avoid re-entrance return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; //对channel的状态进行验证 // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true); } else { // Do not trigger channelWritabilityChanged because the channel is closed already. outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } finally { inFlush0 = false; } return; } try { //会调用到Channel的doWrite方法,具体实现的源码可以看NioSocketChannel doWrite(outboundBuffer); } catch (Throwable t) { if (t instanceof IOException && config().isAutoClose()) { /** * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of * failing all flushed messages and also ensure the actual close of the underlying transport * will happen before the promises are notified. * * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()} * may still return {@code true} even if the channel should be closed as result of the exception. */ close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } else { try { shutdownOutput(voidPromise(), t); } catch (Throwable t2) { close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false); } } } finally { inFlush0 = false; } } //开始读方法的逻辑 @Override public final void beginRead() { assertEventLoop(); if (!isActive()) { return; } try { //会调用channel的doBeginRead方法 可以看上面AbstractNioChannel方法里的注释 doBeginRead(); } catch (final Exception e) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); } } //注册方法,channel会往selector里注册关注的事件 @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { //验证数据 if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { //调用下面的方法 register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; //会调用到channel里的方法 可以参考上面的AbstractNioChannel类 doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
4. Netty服务端接收客户端数据的调用流程
4.1 读数据的调用流程
NioE源码编辑器ventLoop的run方法会监控GitHubSelectionKey对象
,当有读事件时,会调用unsafe对象的read()方法
,在read方法的逻辑里会触发pipeline对象链的调用
,最终调用到设置的各种ChannelHandler
4.2 写数据的调教程用流程
通过Channel的writeAhttps安全问题ndFlush会调用到pipegithub直播平台永久回家line的writeAndFlush方法里
,在pipeline的调用链里会调用链中的各种ChannelHandler(各以对需要写入的数据进行格式转换)最终通过HeadContext的write方法调用到unsafe里的监控拍下东航客机坠落瞬间write逻辑
。这里只是把数据写入教程英文翻译到ByteBuffe监控r里。通过调用unsafe的flash方法才能最监控拍下东航客机坠落瞬间终将数据写入到网络中,也就是上面的分析过程。
参考文https域名档
Net教程之家教学视频ty学习和源码分析github地址
Netty从入门到精通视频教监控摄像头程(B站)
Netty权威指南 第二版
Netty教程视频怎么制作方法中Channel与Unsafe源码解读
评论(0)