我正在参加「启航计划」

没看过的建议先看上一篇,本来计划讲讲linux内核,也看了一些书本,可是c放了太久了,看代码真实头疼,就先抛弃了,写写事务也没必要卷这么深吧。就讲到调用底层api为止我觉得刚刚好。不太拿手将源码结合讲故事,所以整片略显枯燥,迁就看下吧~~

demo

    public static void main(String[] args) throws InterruptedException {
        Object key=new Object();
        ThreadGroup parentThreadGroup=new ThreadGroup("parent");
        ThreadGroup sonThreadGroup=new ThreadGroup(parentThreadGroup,"son");
        new Thread(parentThreadGroup,()->{
            synchronized (key){
                try {
                    System.out.println("------------");
                    System.out.println("parent start.....");
                    Thread.sleep(3000);
                    System.out.println("parent over.....");
                    System.out.println(Thread.currentThread().getThreadGroup().getName());
                    System.out.println(Thread.currentThread().getThreadGroup().getParent().getName());
                    System.out.println(Thread.currentThread().getThreadGroup().getParent().getParent().getName());
                    System.out.println(Thread.currentThread().getThreadGroup().getParent().getParent().getParent().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(sonThreadGroup,()->{
            synchronized (key){
                try {
                    System.out.println("------------");
                    System.out.println("son start ......");
                    Thread.sleep(3000);
                    System.out.println("son over ......");
                    System.out.println(Thread.currentThread().getThreadGroup().getName());
                    System.out.println(Thread.currentThread().getThreadGroup().getParent().getName());
                    System.out.println(Thread.currentThread().getThreadGroup().getParent().getParent().getName());
                    System.out.println(Thread.currentThread().getThreadGroup().getParent().getParent().getParent().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(()->{
            synchronized (key){
                try {
                    System.out.println("------------");
                    System.out.println("other start ......");
                    Thread.sleep(3000);
                    System.out.println("other over ......");
                    System.out.println(Thread.currentThread().getThreadGroup().getName());
                    System.out.println(Thread.currentThread().getThreadGroup().getParent().getName());
                    System.out.println(Thread.currentThread().getThreadGroup().getParent().getParent().getName());
                    System.out.println(Thread.currentThread().getThreadGroup().getParent().getParent().getParent().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        Thread.sleep(1000);
        try{
            //销毁此线程组及其一切子组。此线程组必须为空,表示该线程组中的一切线程都已中止。
//            parentThreadGroup.destroy();
//            System.out.println("isDestory?:"+parentThreadGroup.isDestroyed());
            //中止此线程组中的一切线程。
//            parentThreadGroup.stop();
            //中止此线程组中的一切线程。
//        parentThreadGroup.interrupt();
            System.out.println(            parentThreadGroup.activeCount());
        }catch (Exception e){
            System.out.println("isDestory?:"+parentThreadGroup.isDestroyed());
        }
        new Thread(sonThreadGroup,()->{
            synchronized (key){
                try {
                    System.out.println("------------");
                    System.out.println("later start ......");
                    Thread.sleep(3000);
                    System.out.println("later over ......");
                    System.out.println(Thread.currentThread().getThreadGroup().getName());
                    System.out.println(Thread.currentThread().getThreadGroup().getParent().getName());
                    System.out.println(Thread.currentThread().getThreadGroup().getParent().getParent().getName());
                    System.out.println(Thread.currentThread().getThreadGroup().getParent().getParent().getParent().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }

源码剖析

Selector.open()

此处会有SelectorProvider是个模板办法,不用体系的完成不同,这就很蛋疼,没有EPollSelectorProvider,莫的办法,只能去github找了一份源码迁移到gitee,jdk8感兴趣的能够自行下载,想看其他版本的同学就要自行寻找了。

「NIO源码」JavaNIO源码 & JNI分析二:Java NIO源码分析

EPollSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    long pipeFds = IOUtil.makePipe(false);
    fd0 = (int) (pipeFds >>> 32);
    fd1 = (int) pipeFds;
    pollWrapper = new EPollArrayWrapper();
    pollWrapper.initInterrupt(fd0, fd1);
    fdToKey = new HashMap<>();
}

IOUtil.makePipe(false)

IOUtil.makePipe(false);是一个JNI办法对应的路径为/jdk8u_jdk/src/solaris/native/sun/nio/ch/IOUtil.c

  • 底层会调用int pipe(int pipefd[2]);
  • 函数pipe()会建立管道,并将文件描述词由参数pipefd数组回来。
    • pipefd[0]为管道里的读取端
    • pipefd[1]则为管道的写入端
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)
{
    int fd[2];
		//调用pipe函数
    if (pipe(fd) < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");
        return 0;
    }
    if (blocking == JNI_FALSE) {
        if ((configureBlocking(fd[0], JNI_FALSE) < 0)
            || (configureBlocking(fd[1], JNI_FALSE) < 0)) {
            JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");
            close(fd[0]);
            close(fd[1]);
            return 0;
        }
    }
    //2个地址合并成long回来
    return ((jlong) fd[0] << 32) | (jlong) fd[1];
}

new EPollArrayWrapper()

这个类呢,类注释写的很清楚了,这儿剪短的概述下,这个类便是操作epoll相关的结构体的,所以啦,里边也基本上都是JNI办法

EPollArrayWrapper() throws IOException {
    // creates the epoll file descriptor
    epfd = epollCreate();
    // the epoll_event array passed to epoll_wait
    int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
    pollArray = new AllocatedNativeObject(allocationSize, true);
    pollArrayAddress = pollArray.address();
    // eventHigh needed when using file descriptors > 64k
    if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
        eventsHigh = new HashMap<>();
}
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
    /*
     * epoll_create expects a size as a hint to the kernel about how to
     * dimension internal structures. We can't predict the size in advance.
     */
    //创立epoll结构体
    int epfd = epoll_create(256);
    if (epfd < 0) {
       JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    }
    return epfd;
}

new AllocatedNativeObject(allocationSize, true);

下面pollArray = new AllocatedNativeObject(allocationSize, true);

pollArray注释为:来自 epoll_wait 的结果的 epoll_event 数组,但是看源码吧,也算不上数组,就拓荒了一个内存块。

protected NativeObject(int var1, boolean var2) {
    //略
    if (!var2) {
        this.allocationAddress = unsafe.allocateMemory((long)var1);
        this.address = this.allocationAddress;
    } else {
      	//获取内存页数
        int var3 = pageSize();
      	//拓荒内存空间,回来地址
        long var4 = unsafe.allocateMemory((long)(var1 + var3));
        this.allocationAddress = var4;
        this.address = var4 + (long)var3 - (var4 & (long)(var3 - 1));
    }
}

pollWrapper.initInterrupt(fd0, fd1)

fd0:前面pipe申请的读取端的文件描述符

上一篇我们介绍了epoll_ctl()函数, 这儿我们重温下。epollCtl (epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);epfd添加fd0fd0可读,over~~

void initInterrupt(int fd0, int fd1) {
    outgoingInterruptFD = fd1;
    incomingInterruptFD = fd0;
    epollCtl (epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
                                           jint opcode, jint fd, jint events)
{
    struct epoll_event event;
    int res;
    event.events = events;
    event.data.fd = fd;
    RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
    /*
     * A channel may be registered with several Selectors. When each Selector
     * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
     * list to remove the file descriptor from epoll. The "last" Selector will
     * close the file descriptor which automatically unregisters it from each
     * epoll descriptor. To avoid costly synchronization between Selectors we
     * allow pending updates to be processed, ignoring errors. The errors are
     * harmless as the last update for the file descriptor is guaranteed to
     * be EPOLL_CTL_DEL.
     */
    if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
    }
}


ServerSocketChannel.open()

下面就来到了channel,内部调用为sun.nio.ch.ServerSocketChannelImpl#ServerSocketChannelImpl(java.nio.channels.spi.SelectorProvider)

ServerSocketChannelImpl(SelectorProvider var1) throws IOException {
    super(var1);
    this.fd = Net.serverSocket(true);
    this.fdVal = IOUtil.fdVal(this.fd);
    this.state = 0;
}

Net.serverSocket(true)

这儿看个办法名就大约知道这是创立socket套接字的地方,各种协议规矩之类的就跳过了,感兴趣的能够研讨下。

static FileDescriptor serverSocket(boolean var0) {
    return IOUtil.newFD(socket0(isIPv6Available(), var0, true, fastLoopback));
}
JNIEXPORT int JNICALL
Java_sun_nio_ch_Net_socket0(JNIEnv *env, jclass cl, jboolean preferIPv6,
                            jboolean stream, jboolean reuse, jboolean ignored)
{
    int fd;
    int type = (stream ? SOCK_STREAM : SOCK_DGRAM);
#ifdef AF_INET6
    int domain = (ipv6_available() && preferIPv6) ? AF_INET6 : AF_INET;
#else
    int domain = AF_INET;
#endif
    //创立套接字
    fd = socket(domain, type, 0);
    if (fd < 0) {
        return handleSocketError(env, errno);
    }
#ifdef AF_INET6
    /* Disable IPV6_V6ONLY to ensure dual-socket support */
    if (domain == AF_INET6) {
        int arg = 0;
        if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char*)&arg,
                       sizeof(int)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IPV6_V6ONLY");
            close(fd);
            return -1;
        }
    }
#endif
    if (reuse) {
        int arg = 1;
        if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg,
                       sizeof(arg)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set SO_REUSEADDR");
            close(fd);
            return -1;
        }
    }
#if defined(__linux__)
    if (type == SOCK_DGRAM) {
        int arg = 0;
        int level = (domain == AF_INET6) ? IPPROTO_IPV6 : IPPROTO_IP;
        if ((setsockopt(fd, level, IP_MULTICAST_ALL, (char*)&arg, sizeof(arg)) < 0) &&
            (errno != ENOPROTOOPT)) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IP_MULTICAST_ALL");
            close(fd);
            return -1;
        }
    }
#endif
#if defined(__linux__) && defined(AF_INET6)
    /* By default, Linux uses the route default */
    if (domain == AF_INET6 && type == SOCK_DGRAM) {
        int arg = 1;
        if (setsockopt(fd, IPPROTO_IPV6, IPV6_MULTICAST_HOPS, &arg,
                       sizeof(arg)) < 0) {
            JNU_ThrowByNameWithLastError(env,
                                         JNU_JAVANETPKG "SocketException",
                                         "Unable to set IPV6_MULTICAST_HOPS");
            close(fd);
            return -1;
        }
    }
#endif
    return fd;
}

这儿newFD,便是创立一个对应的java目标,setfdVal()便是把文件描述符地址,放入到java目标中

public static FileDescriptor newFD(int var0) {
    FileDescriptor var1 = new FileDescriptor();
    setfdVal(var1, var0);
    return var1;
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_IOUtil_setfdVal(JNIEnv *env, jclass clazz, jobject fdo, jint val)
{
    (*env)->SetIntField(env, fdo, fd_fdID, val);
}

ssc.socket().bind()

无聊的省略了,终究会到sun.nio.ch.ServerSocketChannelImpl#bind

public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {
    synchronized(this.lock) {
        if (!this.isOpen()) {
            throw new ClosedChannelException();
        } else if (this.isBound()) {
            throw new AlreadyBoundException();
        } else {
            //端口
            InetSocketAddress var4 = var1 == null ? new InetSocketAddress(0) : Net.checkAddress(var1);
            SecurityManager var5 = System.getSecurityManager();
            if (var5 != null) {
                var5.checkListen(var4.getPort());
            }
            NetHooks.beforeTcpBind(this.fd, var4.getAddress(), var4.getPort());
            //终究会调用bind和listen
            Net.bind(this.fd, var4.getAddress(), var4.getPort());
            Net.listen(this.fd, var2 < 1 ? 50 : var2);
            synchronized(this.stateLock) {
                this.localAddress = Net.localAddress(this.fd);
            }
            return this;
        }
    }
}

bind()

能够看到,channel里边封装的fd(Socket文件描述符),后续也是把这个绑定端口。

public static void bind(FileDescriptor var0, InetAddress var1, int var2) throws IOException {
    bind(UNSPEC, var0, var1, var2);
}
static void bind(ProtocolFamily var0, FileDescriptor var1, InetAddress var2, int var3) throws IOException {
    boolean var4 = isIPv6Available() && var0 != StandardProtocolFamily.INET;
    bind0(var1, var4, exclusiveBind, var2, var3);
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_Net_bind0(JNIEnv *env, jclass clazz, jobject fdo, jboolean preferIPv6,
                          jboolean useExclBind, jobject iao, int port)
{
    SOCKADDR sa;
    int sa_len = SOCKADDR_LEN;
    int rv = 0;
    if (NET_InetAddressToSockaddr(env, iao, port, (struct sockaddr *)&sa, &sa_len, preferIPv6) != 0) {
      return;
    }
    //
    rv = NET_Bind(fdval(env, fdo), (struct sockaddr *)&sa, sa_len);
    if (rv != 0) {
        handleSocketError(env, errno);
    }
}
int
NET_Bind(int fd, struct sockaddr *him, int len)
{
    //略
    rv = bind(fd, him, len);
    //略
    return rv;
}

listen()

listen()就很直白了,就相当于直接调用本地办法。

static native void listen(FileDescriptor var0, int var1) throws IOException;
JNIEXPORT void JNICALL
Java_sun_nio_ch_Net_listen(JNIEnv *env, jclass cl, jobject fdo, jint backlog)
{
    if (listen(fdval(env, fdo), backlog) < 0)
        handleSocketError(env, errno);
}

ssc.configureBlocking(false)

将文件描述符设置为NIO,简略描述下略过啦

configureBlocking(int fd, jboolean blocking)
{
    int flags = fcntl(fd, F_GETFL);
    int newflags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);
    return (flags == newflags) ? 0 : fcntl(fd, F_SETFL, newflags);
}

ssc.register(selector, SelectionKey.OP_ACCEPT);

在此之前先梳理下,

  • selector:就相当于epoll结构体,添加了pipe()创立的读取端文件描述符
  • channel:封装了创立的socket,记录了套接字文件描述符,而且绑定了本地端口,而且开端监听恳求
public final SelectionKey register(Selector sel, int ops,
                                   Object att)
    throws ClosedChannelException
{
    synchronized (regLock) {
        //略
        if (k == null) {
            // New registration
            synchronized (keyLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                    //att :null 这儿先忽略
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
        }
        return k;
    }
}
protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
    if (!(var1 instanceof SelChImpl)) {
        throw new IllegalSelectorException();
    } else {
        SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
        var4.attach(var3);
        synchronized(this.publicKeys) {
            //对应不同体系的完成
            this.implRegister(var4);
        }
        var4.interestOps(var2);
        return var4;
    }
}

implRegister()

SelectionKey内部封装了channel,前面现已很明显的看到,channel创立的Socket(套接字)和epoll结构体之间并没有显示相关,在这儿,便是进行相关。下面是Epoll的完成

fdToKey是selector初始化的时分创立的类型为Map<Integer,SelectionKeyImpl>,便是将文件描述符和SelectionKey相关起来,终究会添加到EpollArrayWrapper::eventsLow,此刻events=0

protected void implRegister(SelectionKeyImpl ski) {
    if (closed)
        throw new ClosedSelectorException();
    SelChImpl ch = ski.channel;
    int fd = Integer.valueOf(ch.getFDVal());
    fdToKey.put(fd, ski);
    pollWrapper.add(fd);
    keys.add(ski);
}
//pollWrapper.add(fd);终究会调用
private void setUpdateEvents(int fd, byte events, boolean force) {
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        if ((eventsLow[fd] != KILLED) || force) {
            eventsLow[fd] = events;
        }
    } else {
        Integer key = Integer.valueOf(fd);
        if (!isEventsHighKilled(key) || force) {
            eventsHigh.put(key, Byte.valueOf(events));
        }
    }
}

在随后的var4.interestOps(var2);eventsLow数组中event从0改成POLLIN,此刻还没有调用epollCtl()添加进epoll结构体

selector.select(3000)

直接从sun.nio.ch.EPollSelectorImpl#doSelect开端

protected int doSelect(long timeout) throws IOException {
    if (closed)
        throw new ClosedSelectorException();
    processDeregisterQueue();
    try {
        begin();
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    processDeregisterQueue();
    int numKeysUpdated = updateSelectedKeys();
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}

poll()

这儿就能够很容易的看出来在每次poll()的时分会先把未注册的Socket套接字,通过调用epollCtl()添加进epoll结构体中

int poll(long timeout) throws IOException {
    updateRegistrations();
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    //这一段我猜想是中止操作,由于就算设置了true,sun.nio.ch.EPollSelectorImpl#doSelect中也会修改成false。如有dalao欢迎告诉我
    for (int i=0; i<updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
}
private void updateRegistrations() {
    synchronized (updateLock) {
        int j = 0;
        while (j < updateCount) {
            int fd = updateDescriptors[j];
            short events = getUpdateEvents(fd);
            boolean isRegistered = registered.get(fd);
            int opcode = 0;
            if (events != KILLED) {
                if (isRegistered) {
                    opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                } else {
                    opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                }
                if (opcode != 0) {
                    //这儿添加进结构体
                    epollCtl(epfd, opcode, fd, events);
                    if (opcode == EPOLL_CTL_ADD) {
                        registered.set(fd);
                    } else if (opcode == EPOLL_CTL_DEL) {
                        registered.clear(fd);
                    }
                }
            }
            j++;
        }
        updateCount = 0;
    }
}

epollWait也便是对应的调用底层办法了,pollArrayAddress前面拓荒的内存块,这儿也就知道干什么用了,也便是对应着epoll_event结构体指针

JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
                                            jlong address, jint numfds,
                                            jlong timeout, jint epfd)
{
    struct epoll_event *events = jlong_to_ptr(address);
    int res;
    if (timeout <= 0) {           /* Indefinite or no wait */
        RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
    } else {                      /* Bounded wait; bounded restarts */
        res = iepoll(epfd, events, numfds, timeout);
    }
    if (res < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
    }
    return res;
}
static int
iepoll(int epfd, struct epoll_event *events, int numfds, jlong timeout)
{
    jlong start, now;
    int remaining = timeout;
    struct timeval t;
    int diff;
    gettimeofday(&t, NULL);
    start = t.tv_sec * 1000 + t.tv_usec / 1000;
    for (;;) {
        int res = epoll_wait(epfd, events, numfds, remaining);
        if (res < 0 && errno == EINTR) {
            if (remaining >= 0) {
                gettimeofday(&t, NULL);
                now = t.tv_sec * 1000 + t.tv_usec / 1000;
                diff = now - start;
                remaining -= diff;
                if (diff < 0 || remaining <= 0) {
                    return 0;
                }
                start = now;
            }
        } else {
            return res;
        }
    }
}

//这一段我猜想是中止操作,由于就算设置了true,sun.nio.ch.EPollSelectorImpl#doSelect中也会修改成false,暂时存疑。如有dalao欢迎告诉我

//sun.nio.ch.EPollArrayWrapper#poll    
for (int i=0; i<updated; i++) {
    if (getDescriptor(i) == incomingInterruptFD) {
        interruptedIndex = i;
        interrupted = true;
        break;
    }
}
//sun.nio.ch.EPollSelectorImpl#doSelect
if (pollWrapper.interrupted()) {
    // Clear the wakeup pipe
    pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
    synchronized (interruptLock) {
        pollWrapper.clearInterrupted();
        IOUtil.drain(fd0);
        interruptTriggered = false;
    }
}

updateSelectedKeys()

private int updateSelectedKeys() {
    int entries = pollWrapper.updated;
    int numKeysUpdated = 0;
    for (int i=0; i<entries; i++) {
        //从pollArrayAddress找到epoll_event结构体
        int nextFD = pollWrapper.getDescriptor(i);
        //找到对应的SelectionKey
        SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
        // ski is null in the case of an interrupt
        if (ski != null) {
            int rOps = pollWrapper.getEventOps(i);
            //第一次没有
            if (selectedKeys.contains(ski)) {
                if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                    numKeysUpdated++;
                }
            } else {
                ski.channel.translateAndSetReadyOps(rOps, ski);
                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                    //添加进selectedKeys集合
                    selectedKeys.add(ski);
                    numKeysUpdated++;
                }
            }
        }
    }
    return numKeysUpdated;
}

其他

到这儿,整体脉络就很清楚了剩余的这些也没必要剖析了,基本和上一篇里边的对应了,各位读者姥爷这么聪明,就不浪费大家时刻了

if(key.isReadable()){
    handleRead(key);
}
if(key.isWritable() && key.isValid()){
    handleWrite(key);
}
if(key.isConnectable()){
    System.out.println("isConnectable = true");
}
//下面是c
// 如果是新的衔接,需要把新的socket添加到efd中
            if (ep[i].data.fd == listenfd )
            {
                connfd = accept(listenfd,(struct sockaddr*)&cliaddr,&clilen);
                tep.events = EPOLLIN;
                tep.data.fd = connfd;
                ret = epoll_ctl(efd,EPOLL_CTL_ADD,connfd,&tep);
            }
            // 不然,读取数据
            else
            {
                connfd = ep[i].data.fd;
                int bytes = read(connfd,buf,MAXLEN);
                // 客户端封闭衔接
                if (bytes == 0){
                    ret =epoll_ctl(efd,EPOLL_CTL_DEL,connfd,NULL);
                    close(connfd);
                    printf("client[%d] closed\n", i);
                }
                else
                {
                    for (int j = 0; j < bytes; ++j)
                    {
                        buf[j] = toupper(buf[j]);
                    }
                    // 向客户端发送数据
                    write(connfd,buf,bytes);
                }
            }

总结

  • Channel:对socket的封装
    • ServerSocketChannel.open();<—>socket()
    • ssc.socket().bind()<—>bind()+listen()
  • Selector:对epoll的封装
    • Selector.open()<—>epoll_create()+epoll_ctl()
    • selector.select(3000)<—>epoll_ctl()+epoll_wait()
  • SelectionKey:相关上面2个,有了文件描述符快速找到对应Socket

参阅文章

epoll:zh.wikipedia.org/wiki/Epoll

linux文档:man7.org/linux/man-p…