Preface

This post is the fifth in the NIO series of From 0 to 1: Learning Netty. It focuses on using multiple threads to optimize the program and make full use of the CPU. For earlier posts in the series, see the author's Netty column; all of the code in this post is collected in the author's GitHub repository.

Introduction

In the previous posts, everything was designed around a single thread. That works, but it does not make full use of the CPU, and if one event takes a long time to handle, it delays the handling of every other event.

For example, suppose a project is developed by a team with only one full-stack engineer. He has to finish the front end before he can start the back end, working through the tasks strictly one after another; if the front-end work runs into trouble and eats up a lot of time, the whole project schedule is stretched. With both a front-end engineer and a back-end engineer on the team, the two halves can be developed in parallel, which greatly improves efficiency.

By the same logic, we optimize the earlier code by splitting the selectors into two groups:

  • a single thread with its own selector acts as the 'Boss' and handles only accept events;
  • several threads (ideally as many as there are CPU cores), each with its own selector, act as 'Workers' and take turns handling read/write events.


Implementation

1. Create a Boss thread responsible for handling accept events:

Thread.currentThread().setName("Boss");
Selector boss = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// register with no interest first, then express interest in accept events
SelectionKey bossKey = ssc.register(boss, 0, null);
bossKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(7999));
while (true) {
    boss.select();
    Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
    while (iter.hasNext()) {
        SelectionKey key = iter.next();
        iter.remove();
        if (key.isAcceptable()) {
            ServerSocketChannel channel = (ServerSocketChannel) key.channel();
            SocketChannel sc = channel.accept();
            sc.configureBlocking(false);
            // the accepted channel is handed over to a worker in step 3
        }
    }
}

2. Create a Worker class that initializes a worker thread and its Selector and is responsible for handling read events:

class Worker implements Runnable {
    private Thread thread;
    private volatile Selector selector;
    private String name;

    public Worker(String name) {
        this.name = name;
    }

    // create the selector and start the worker thread
    public void register() throws IOException {
        this.thread = new Thread(this, this.name);
        this.selector = Selector.open();
        this.thread.start();
    }

    @Override
    public void run() {
        while (true) {
            try {
                this.selector.select();
                Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    if (key.isReadable()) {
                        ByteBuffer buffer = ByteBuffer.allocate(16);
                        SocketChannel channel = (SocketChannel) key.channel();
                        channel.read(buffer);
                        buffer.flip();
                        debugAll(buffer); // debug helper used in the earlier posts of this series
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

There is a problem here, though: every call to register() creates a new thread, but we want exactly one thread per Worker. So the code above needs a tweak: use a flag to record whether initialization has already been done:

private volatile boolean start = false;
public void register() throws IOException {
    if (!this.start) {
        this.thread = new Thread(this, this.name);
        this.selector = Selector.open();
        this.thread.start();
        this.start = true;
    }
}

Note that this.selector = Selector.open(); and this.thread.start(); must not be swapped, otherwise you will get a NullPointerException at runtime:

Exception in thread "worker-0" java.lang.NullPointerException
        at com.sidiot.netty.c3.MultiThreadServer$Worker.run(MultiThreadServer.java:75)
        at java.base/java.lang.Thread.run(Thread.java:832)

3. Hook the Worker up to the Boss. First, create a worker thread:

Worker worker0 = new Worker("worker-0");
worker0.register();
while (true) {
    ...
    while (iter.hasNext()) {
        ...
        if (key.isAcceptable()) {
            ...
            log.debug("connected... {}", sc.getRemoteAddress());
            log.debug("before register {}", sc.getRemoteAddress());
            sc.register(worker0.selector, SelectionKey.OP_READ, null);
            log.debug("after register {}", sc.getRemoteAddress());
        }
    }
}

4. Write the client:

public class MultiThreadClient {
    public static void main(String[] args) throws IOException {  
        SocketChannel sc = SocketChannel.open();  
        sc.connect(new InetSocketAddress("localhost", 7999));  
        sc.write(Charset.defaultCharset().encode("Hello, World! --sidiot."));  
        System.in.read();  
    }  
}
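
In the test output below there are several connections, presumably from launching the client more than once. If you prefer to drive multiple connections from a single process instead, a minimal sketch (hypothetical, not part of the original code) could look like this:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

// Hypothetical helper: opens several connections so that the Boss can
// hand them to different workers in turn (see step 7).
public class MultiClientDemo {
    public static void main(String[] args) throws IOException {
        for (int i = 0; i < 3; i++) {
            SocketChannel sc = SocketChannel.open();
            sc.connect(new InetSocketAddress("localhost", 7999));
            sc.write(Charset.defaultCharset().encode("Hello, World! --client-" + i));
            // channels are intentionally left open so the workers can read from them
        }
        System.in.read(); // keep the process alive
    }
}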

5. Run the server and the client; the result is as follows:

20:30:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:50612
20:30:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:50612
20:30:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:50612

We can see that the worker never does any work, or rather that the data sent by the client never shows up as a read event on the worker. This is because, by the time the worker's run() method starts, the SocketChannel has not yet been registered with the worker's selector, so the worker thread is already blocked at this.selector.select();

6. Because sc.register happens on the boss thread while select happens on the worker thread, the order in which the two threads execute cannot be guaranteed, so both operations need to be carried out by the same thread;

Pass the SocketChannel into the Worker's register() method:

public void register(SocketChannel sc) throws IOException {
    if (!this.start) {  
        this.thread = new Thread(this, this.name);  
        this.selector = Selector.open();  
        this.thread.start();  
        this.start = true;  
    }  
    sc.register(this.selector, SelectionKey.OP_READ, null);  
}

But this is still not enough, because register() is still executed on the boss thread, so we need a queue for inter-thread communication:

private ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
public void register(SocketChannel sc) throws IOException {  
    ...
    this.queue.add(() -> {  
        try {  
            sc.register(this.selector, SelectionKey.OP_READ, null);  
        } catch (ClosedChannelException e) {  
            throw new RuntimeException(e);  
        }  
    });  
    this.selector.wakeup();
}
@Override  
public void run() {  
    while (true) {  
        try {  
            this.selector.select();  
            Runnable task = this.queue.poll();  
            if (task != null) {  
                task.run();  
            }  
            Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
            ...
    }
}

Note that this.selector.wakeup(); is needed here to wake the selector up so that the worker can continue and run the queued task;

There is also another approach that avoids the queue; a sketch of it follows below.
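
The idea is to call wakeup() before register(): wakeup() makes the select() a worker is blocked in (or the next one it enters) return immediately, so the registration issued from the boss thread can proceed. The version below is only a sketch of that idea, not necessarily identical to the reference code:

public void register(SocketChannel sc) throws IOException {
    if (!this.start) {
        this.thread = new Thread(this, this.name);
        this.selector = Selector.open();
        this.thread.start();
        this.start = true;
    }
    // wakeup() makes an in-progress select() return immediately, and its effect
    // is remembered if no select() is currently in progress, so the register()
    // below is not starved by a worker blocked in select()
    this.selector.wakeup();
    sc.register(this.selector, SelectionKey.OP_READ, null);
}

The queue-based version remains the more robust choice, because it guarantees that the registration itself runs on the worker thread; the wakeup-first variant relies on register() and select() not blocking each other, and that behaviour can differ between JDK versions.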

7. Turn the single worker into a pool of workers:

Worker[] workers = new Worker[4];
for (int i = 0; i < workers.length; i++) {  
    workers[i] = new Worker("worker-" + i);  
}

and use a counter to hand connections to the workers in round-robin fashion:

AtomicInteger index = new AtomicInteger();
while (true) {  
    ...
    while (iter.hasNext()) {  
        ...
        if (key.isAcceptable()) {  
            ...
            workers[index.getAndIncrement() % workers.length].register(sc);  
        }  
    }  
}
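
Putting the fragments together, the Boss side now looks roughly like this. This is a consolidated sketch with the log statements omitted; it assumes the Worker class with the queue-based register(SocketChannel) from step 6:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiThreadServer {
    public static void main(String[] args) throws IOException {
        Thread.currentThread().setName("Boss");
        Selector boss = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        SelectionKey bossKey = ssc.register(boss, 0, null);
        bossKey.interestOps(SelectionKey.OP_ACCEPT);
        ssc.bind(new InetSocketAddress(7999));

        // worker pool: each worker starts its thread lazily on the first register(sc)
        Worker[] workers = new Worker[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Worker("worker-" + i);
        }
        AtomicInteger index = new AtomicInteger();

        while (true) {
            boss.select();
            Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false);
                    // hand the new connection to the next worker, round-robin
                    workers[index.getAndIncrement() % workers.length].register(sc);
                }
            }
        }
    }
}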

The result:

22:36:13 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:54668
22:36:13 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:54668
22:36:13 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:54668
22:36:13 [DEBUG] [worker-0] c.s.n.c.MultiThreadServer - read... /127.0.0.1:54668
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [7]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 69 64 69 6f 74 2e 00 00 00 00 00 00 00 00 00 |sidiot..........|
+--------+-------------------------------------------------+----------------+
22:36:20 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:54676
22:36:20 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:54676
22:36:20 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:54676
22:36:20 [DEBUG] [worker-1] c.s.n.c.MultiThreadServer - read... /127.0.0.1:54676
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [7]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 69 64 69 6f 74 2e 00 00 00 00 00 00 00 00 00 |sidiot..........|
+--------+-------------------------------------------------+----------------+
22:36:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - connected... /127.0.0.1:54687
22:36:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - before register /127.0.0.1:54687
22:36:30 [DEBUG] [Boss] c.s.n.c.MultiThreadServer - after register /127.0.0.1:54687
22:36:30 [DEBUG] [worker-0] c.s.n.c.MultiThreadServer - read... /127.0.0.1:54687
+--------+-------------------- all ------------------------+----------------+
position: [0], limit: [7]
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 73 69 64 69 6f 74 2e 00 00 00 00 00 00 00 00 00 |sidiot..........|
+--------+-------------------------------------------------+----------------+

Postscript

To sum up, multi-threaded optimization is a key way to improve throughput and responsiveness with Java NIO. By introducing concurrency, a sensible thread-management strategy, and proper synchronization and inter-thread communication, we can make the most of what NIO offers and improve the efficiency and scalability of the system.

That is all there is to multi-threaded optimization; I hope this post has been helpful!

References:

  • Netty API reference;
  • 黑马程序员's complete Netty course;

Previous post: 「NIO」(四) Message Boundaries and Writable Events

I'm sidiot, and I look forward to your follow; writing these posts is not easy, so please show your support;

WeChat official account: sidiot的技术驿站

Series column: 探索 Netty:源码解析与应用案例分享 (Exploring Netty: source code analysis and practical examples)