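In the previous post, "2. xxl-job Source Code Analysis: The Core, XxlJobExecutor", we introduced XxlJobExecutor. One important part was not analyzed in depth at the time: the executor-server (rpc provider). We only mentioned embedServer = new EmbedServer() and stopped there. There is actually quite a lot going on inside it, so this post walks through the executor-server in detail.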

Let's start with embedServer.start.

EmbedServer's start method

How exactly does EmbedServer start up? In other words, what does this start method actually do? Let's keep that question in mind as we read on.

public void start(final String address, final int port, final String appname, final String accessToken) {
    // Implements the business operations
    executorBiz = new ExecutorBizImpl();
    // Create a thread
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            // Use Netty for the network service
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                0,
                200,
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
                    }
                },
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                    }
                });
            try {
                // Start the network service
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline()
                                // Idle detection
                                .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                // HTTP protocol support
                                .addLast(new HttpServerCodec())
                                .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & response to FULL
                                // Business logic handling
                                .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                        }
                    })
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
                // bind
                ChannelFuture future = bootstrap.bind(port).sync();
                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
                // Start registration
                startRegistry(appname, address);
                // wait until stop
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
            } finally {
                // stop
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
    // Run as a daemon thread
    thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    // Start the thread
    thread.start();
}

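Once you have seen the startup code, things should click: it simply creates a Netty server that listens on a port, and lets Netty take care of encoding and decoding the HTTP protocol. All we need to focus on is the business logic, that is, what EmbedHttpServerHandler does.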
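One detail of start() is easy to overlook: bizThreadPool has corePoolSize 0 and a bounded LinkedBlockingQueue of 2000. ThreadPoolExecutor only adds threads beyond the core size when the queue refuses a task, so until 2000 requests are waiting the pool serves them with a single worker thread, and the remaining threads up to 200 only appear once the queue is full. The sketch below rebuilds a pool of the same shape (the class name BizPoolDemo and its helper method are made up for illustration) and counts its threads while tasks are blocked.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: a pool with the same shape as bizThreadPool
// (core 0, max 200, keep-alive 60 s, bounded queue of 2000, throw when exhausted).
class BizPoolDemo {

    static ThreadPoolExecutor newBizPool() {
        return new ThreadPoolExecutor(
                0, 200, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                r -> new Thread(r, "demo bizThreadPool-" + r.hashCode()),
                (r, executor) -> {
                    throw new RuntimeException("demo bizThreadPool is EXHAUSTED!");
                });
    }

    // Submits `tasks` tasks that all block on one latch, then reports how many
    // worker threads the pool had created while they were still pending.
    static int poolSizeWhileBlocked(int tasks) {
        ThreadPoolExecutor pool = newBizPool();
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int size = pool.getPoolSize();
        release.countDown(); // let every queued task finish
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return size;
    }

    public static void main(String[] args) {
        // While the queue has room, tasks are queued instead of getting new threads.
        System.out.println(poolSizeWhileBlocked(10)); // prints 1
    }
}
```

For xxl-job this is probably acceptable, since most calls behind the pool, such as run(), only push a trigger into a JobThread queue and return quickly. If real parallelism were needed before the queue fills up, a non-zero corePoolSize would be the usual adjustment.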

public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);
    private ExecutorBiz executorBiz;
    private String accessToken;
    private ThreadPoolExecutor bizThreadPool;
    public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
        this.executorBiz = executorBiz;
        this.accessToken = accessToken;
        this.bizThreadPool = bizThreadPool;
    }
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        // Read the request sent by the scheduling center
        String requestData = msg.content().toString(CharsetUtil.UTF_8);
        String uri = msg.uri();
        HttpMethod httpMethod = msg.method();
        boolean keepAlive = HttpUtil.isKeepAlive(msg);
        String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
        // Hand the work to the thread pool so the Netty I/O thread is not blocked
        bizThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                // Handle the request
                Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
                // Convert the result to JSON
                String responseJson = GsonTool.toJson(responseObj);
                // Write it back to the scheduling center
                writeResponse(ctx, keepAlive, responseJson);
            }
        });
    }
    // The actual processing logic
    private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
        // Validate the request: only POST is accepted
        if (HttpMethod.POST != httpMethod) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
        }
        if (uri == null || uri.trim().length() == 0) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
        }
        if (accessToken != null
            && accessToken.trim().length() > 0
            && !accessToken.equals(accessTokenReq)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }
        // Dispatch by uri; all of the actual handling is delegated to executorBiz
        try {
            switch (uri) {
                case "/beat":
                    return executorBiz.beat();
                case "/idleBeat":
                    IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                    return executorBiz.idleBeat(idleBeatParam);
                case "/run":
                    TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                    return executorBiz.run(triggerParam);
                case "/kill":
                    KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                    return executorBiz.kill(killParam);
                case "/log":
                    LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                    return executorBiz.log(logParam);
                default:
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
        }
    }
    /**
     * Write the HTTP response message back
     */
    private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
        // write response
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));   //  Unpooled.wrappedBuffer(responseJson)
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");       // HttpHeaderValues.TEXT_PLAIN.toString()
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        if (keepAlive) {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        ctx.writeAndFlush(response);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause);
        ctx.close();
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            ctx.channel().close();      // beat 3N, close if idle
            logger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel.");
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

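The logic of EmbedHttpServerHandler is not complicated either. It mainly defines the overall flow (validating the request, routing by uri, writing the response), while the actual message handling is still delegated to executorBiz.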
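To make the shape of process() concrete, here is a stripped-down model of it in plain Java, with no Netty involved. It is only a sketch: UriDispatcher, route() and the "FAIL: ..." strings are hypothetical stand-ins for the real ReturnT results, and a Map lookup replaces the switch. The order of checks (HTTP method, empty uri, access token, then routing to a delegate, with exceptions turned into a failure result) follows the code above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model of EmbedHttpServerHandler.process():
// validate method, uri and token, then route by uri to a delegate.
class UriDispatcher {
    private final String accessToken;
    private final Map<String, Function<String, String>> routes = new HashMap<>();

    UriDispatcher(String accessToken) {
        this.accessToken = accessToken;
    }

    // Registers a delegate for one uri (plays the role of one switch case).
    UriDispatcher route(String uri, Function<String, String> handler) {
        routes.put(uri, handler);
        return this;
    }

    String process(String httpMethod, String uri, String body, String tokenReq) {
        if (!"POST".equals(httpMethod)) {
            return "FAIL: invalid request, HttpMethod not support.";
        }
        if (uri == null || uri.trim().isEmpty()) {
            return "FAIL: invalid request, uri-mapping empty.";
        }
        // The token is only enforced when one is configured, as in the original.
        if (accessToken != null && !accessToken.trim().isEmpty() && !accessToken.equals(tokenReq)) {
            return "FAIL: The access token is wrong.";
        }
        Function<String, String> handler = routes.get(uri);
        if (handler == null) {
            return "FAIL: invalid request, uri-mapping(" + uri + ") not found.";
        }
        try {
            return handler.apply(body);
        } catch (RuntimeException e) {
            // Errors become a failure result instead of escaping to the network layer.
            return "FAIL: request error:" + e.getMessage();
        }
    }
}
```

For example, `new UriDispatcher("secret").route("/beat", body -> "SUCCESS").process("POST", "/beat", "", "secret")` returns "SUCCESS", while the same call with a wrong token returns the access-token failure.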

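For the executor, acting as a client, the most important job is to execute the commands sent by the scheduling center, so here we focus on executorBiz.run(triggerParam).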

ExecutorBizImpl's run method

executorBiz is an instance of the ExecutorBizImpl class. Let's go straight to the implementation of its run method.

public ReturnT<String> run(TriggerParam triggerParam) {
    // Look up the JobThread by jobId
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;
    // Determine the glue (execution) type
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {
        // Spring bean type
        // Look up the JobHandler by its handler name
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
        // Check whether the handler in jobThread matches the one in the request
        if (jobThread!=null && jobHandler != newJobHandler) {
            // If not, the handler has changed and the old thread must be killed
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
            // Reset
            jobThread = null;
            jobHandler = null;
        }
        // Reassign the handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }
    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
        // Groovy validation/assignment omitted; similar to the bean case
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
        // Script-type validation/assignment omitted; similar to the bean case
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }
    // Apply the block strategy
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }
    // Register a new jobThread if needed
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }
    // Push the trigger params into the blocking queue; the job thread executes them asynchronously
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

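Most of the code above is validation. At the end it calls jobThread.pushTriggerQueue(triggerParam), which puts the parameters into a blocking queue, where they wait to be executed by the JobThread.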
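The decision logic at the end of run() can be modeled without xxl-job at all. In the sketch below, TriggerRouter and its Worker class are hypothetical simplifications: Worker stands in for JobThread and only keeps a queue and a running flag, and the enum reuses the three strategy names from ExecutorBlockStrategyEnum. It shows how SERIAL_EXECUTION queues behind a busy worker, DISCARD_LATER rejects the new trigger, and COVER_EARLY replaces the worker (in the real code the old JobThread is stopped, with removeOldReason, when registJobThread registers the new one).

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of the tail of ExecutorBizImpl.run():
// one trigger queue per jobId, plus the three block strategies.
class TriggerRouter {
    enum BlockStrategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

    // Stand-in for JobThread: only the queue and a "busy" flag matter here.
    static class Worker {
        final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        volatile boolean running;

        boolean isRunningOrHasQueue() {
            return running || !queue.isEmpty();
        }
    }

    private final ConcurrentHashMap<Integer, Worker> workers = new ConcurrentHashMap<>();

    // Returns "OK" when the trigger was queued, or a failure message.
    String submit(int jobId, String trigger, BlockStrategy strategy) {
        Worker worker = workers.get(jobId);
        if (worker != null && worker.isRunningOrHasQueue()) {
            if (strategy == BlockStrategy.DISCARD_LATER) {
                return "FAIL: block strategy effect: discard later";
            }
            if (strategy == BlockStrategy.COVER_EARLY) {
                worker = null; // drop the busy worker; a fresh one is registered below
            }
            // SERIAL_EXECUTION: keep the existing worker and queue behind it
        }
        if (worker == null) {
            worker = new Worker();
            workers.put(jobId, worker);
        }
        worker.queue.offer(trigger);
        return "OK";
    }

    Worker worker(int jobId) {
        return workers.get(jobId);
    }
}
```

The key design point mirrored here is that the HTTP request only decides where the trigger goes; nothing is executed on the request path.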

JobThread

JobThread extends Thread and internally maintains an unbounded blocking queue, LinkedBlockingQueue<TriggerParam> triggerQueue. The key part is the thread's run method.

@Override
public void run() {
    // Call init, i.e. the init method configured in the @XxlJob annotation
    try {
        handler.init();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }
    // Loop until told to stop: the thread keeps waiting on a blocking queue, so a forced kill must be able to end the loop
    while(!toStop){
        // No task running by default
        running = false;
        // Increment the idle counter
        idleTimes++;
        TriggerParam triggerParam = null;
        try {
            // To check the toStop signal we have to keep looping, so queue.take() can't be used; use poll(timeout) instead
            triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
            if (triggerParam!=null) {
                // Non-null: there is a task to execute
                running = true;
                idleTimes = 0;
                triggerLogIdSet.remove(triggerParam.getLogId());
                // log filename, like "logPath/yyyy-MM-dd/9999.log"
                String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                XxlJobContext xxlJobContext = new XxlJobContext(
                    triggerParam.getJobId(),
                    triggerParam.getExecutorParams(),
                    logFileName,
                    triggerParam.getBroadcastIndex(),
                    triggerParam.getBroadcastTotal());
                // Initialize the context so the params are available during execution
                XxlJobContext.setXxlJobContext(xxlJobContext);
                // Log the start
                XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
                if (triggerParam.getExecutorTimeout() > 0) {
                    // A timeout is configured: the job must be interrupted once it expires
                    Thread futureThread = null;
                    try {
                        // Wrap the job in a FutureTask and run it on another thread so the timeout can be enforced
                        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                            @Override
                            public Boolean call() throws Exception {
                                // init job context
                                XxlJobContext.setXxlJobContext(xxlJobContext);
                                handler.execute();
                                return true;
                            }
                        });
                        futureThread = new Thread(futureTask);
                        futureThread.start();
                        Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                    } catch (TimeoutException e) {
                        XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                        XxlJobHelper.log(e);
                        // handle result
                        XxlJobHelper.handleTimeout("job execute timeout ");
                    } finally {
                        futureThread.interrupt();
                    }
                } else {
                    // No timeout: execute directly
                    handler.execute();
                }
                // Validate the execution result
                if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                    XxlJobHelper.handleFail("job handle result lost.");
                } else {
                    String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                    tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
                        ?tempHandleMsg.substring(0, 50000).concat("...")
                        :tempHandleMsg;
                    // Store the (possibly truncated) result message in the context
                    XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                }
                // Log the end
                XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                                 + XxlJobContext.getXxlJobContext().getHandleCode()
                                 + ", handleMsg = "
                                 + XxlJobContext.getXxlJobContext().getHandleMsg()
                                );
            } else {
                if (idleTimes > 30) {
                    if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lost
                        XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                    }
                }
            }
        } catch (Throwable e) {
            if (toStop) {
                XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
            }
            // handle result
            StringWriter stringWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(stringWriter));
            String errorMsg = stringWriter.toString();
            XxlJobHelper.handleFail(errorMsg);
            XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
        } finally {
            if(triggerParam != null) {
                // callback handler info
                if (!toStop) {
                    // common
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        triggerParam.getLogId(),
                        triggerParam.getLogDateTime(),
                        XxlJobContext.getXxlJobContext().getHandleCode(),
                        XxlJobContext.getXxlJobContext().getHandleMsg() )
                     );
                } else {
                    // is killed
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        triggerParam.getLogId(),
                        triggerParam.getLogDateTime(),
                        XxlJobContext.HANDLE_CODE_FAIL,
                        stopReason + " [job running, killed]" )
                    );
                }
            }
        }
    }
    // Stopped while the queue still holds unprocessed triggers
    while(triggerQueue !=null && triggerQueue.size()>0){
        // Send a failure callback for each trigger request left in the queue
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam!=null) {
            // is killed
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                triggerParam.getLogId(),
                triggerParam.getLogDateTime(),
                XxlJobContext.HANDLE_CODE_FAIL,
                stopReason + " [job not executed, in the job queue, killed.]")
            );
        }
    }
    // Call destroy, i.e. the destroy method configured in the @XxlJob annotation
    try {
        handler.destroy();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }
    logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}
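The timeout branch in run() boils down to a reusable pattern: run the work on a separate thread through a FutureTask, wait with get(timeout), and interrupt that thread in finally. The sketch below isolates it; TimeoutRunner and runWithTimeout are illustrative names, and it takes milliseconds instead of the seconds used with getExecutorTimeout() so that it is quick to try.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of the timeout pattern used in JobThread.run(): run the job on a helper
// thread, wait with FutureTask.get(timeout), and interrupt the helper in finally.
class TimeoutRunner {

    // Returns the job's result, or false if it did not finish within timeoutMillis.
    static boolean runWithTimeout(Callable<Boolean> job, long timeoutMillis) {
        FutureTask<Boolean> futureTask = new FutureTask<>(job);
        Thread futureThread = new Thread(futureTask);
        futureThread.start();
        try {
            return futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException("job failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // interrupt() is only a request: the job stops early only if it blocks
            // interruptibly (sleep, wait, join, ...) or checks its interrupt flag.
            futureThread.interrupt();
        }
    }
}
```

One consequence worth keeping in mind: interrupt() cannot force a job to stop. A handler that never blocks interruptibly and never checks its interrupt flag keeps running on futureThread after the timeout has already been reported.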

Having gone through the code above, I think several points are well done and worth learning from:

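  1. The toStop variable. It is declared as volatile boolean to guarantee visibility. That in itself is nothing special: a flag that one thread writes and another thread checks in a while loop has to be volatile (or otherwise synchronized). What is nice is what happens once toStop becomes true, meaning the thread may have been killed: instead of simply returning, the method drains the queue and sends a failure callback for every pending trigger. I think this is a good design, because even when something unexpected happens, there is still a record that can be looked up.
  2. When reading from triggerQueue, it uses poll(3L, TimeUnit.SECONDS) rather than take(). take() waits until an element arrives, so if the queue stays empty, merely setting toStop would never be noticed while the thread is stuck inside that call. poll leaves a gap: if nothing arrives within 3 seconds it returns null, which guarantees that toStop is checked at least every 3 seconds.
  3. The thread also keeps an idleTimes counter. Once it exceeds 30 (with the 3-second poll, roughly 90 seconds without a trigger), the thread is removed and destroyed, so useless threads do not sit waiting forever. As we saw in ExecutorBizImpl's run method, there is a reset-and-re-register step in which registJobThread creates a new thread; without a cleanup mechanism like this, the number of threads could keep growing.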
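These ideas (a volatile stop flag, poll(timeout) instead of take(), idle-based self-removal, and failure callbacks for triggers left in the queue) fit in a few dozen lines. MiniJobThread is a hypothetical, simplified model, not the xxl-job class: "callbacks" are just strings collected in a list, and when the idle limit is hit the thread simply leaves its loop, whereas JobThread asks XxlJobExecutor.removeJobThread to stop it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the JobThread loop.
class MiniJobThread extends Thread {
    private final LinkedBlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
    private final List<String> callbacks = Collections.synchronizedList(new ArrayList<>());
    private final long pollMillis;
    private final int maxIdleTimes;
    private volatile boolean toStop = false; // written by other threads, read in the loop
    private int idleTimes = 0;               // only touched by this thread

    MiniJobThread(long pollMillis, int maxIdleTimes) {
        this.pollMillis = pollMillis;
        this.maxIdleTimes = maxIdleTimes;
    }

    void push(String trigger) { triggerQueue.offer(trigger); }

    void requestStop() { toStop = true; }

    List<String> callbacks() { return callbacks; }

    @Override
    public void run() {
        while (!toStop) {
            idleTimes++;
            try {
                // Wakes up at least every pollMillis, so requestStop() is noticed
                // even when no trigger ever arrives.
                String trigger = triggerQueue.poll(pollMillis, TimeUnit.MILLISECONDS);
                if (trigger != null) {
                    idleTimes = 0;
                    callbacks.add("done:" + trigger); // stands in for executing + callback
                } else if (idleTimes > maxIdleTimes && triggerQueue.isEmpty()) {
                    break; // idle for too long: let the thread end
                }
            } catch (InterruptedException e) {
                break;
            }
        }
        // Triggers still queued get an explicit failure callback instead of vanishing.
        String pending;
        while ((pending = triggerQueue.poll()) != null) {
            callbacks.add("killed:" + pending);
        }
    }
}
```

With pollMillis = 3000 and maxIdleTimes = 30 this reproduces the roughly 90-second idle limit of the original.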

Summary

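From EmbedServer's start method all the way down to the execute method of the IJobHandler inside JobThread, which runs the business logic, we went layer by layer: from the network model to the thread model, including blocking queues, timeouts, and interruption. Much of this deserves careful thought. If we were writing a network program ourselves, how should we handle network requests? How should we use asynchronous processing to increase throughput and make the program run more efficiently? And how should we clean up threads and handle exceptions properly? I believe every reader will come away from the source code with their own answers.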