作者:姚辉(千习)
布景
先来谈下什么是分布式批处理,从字面来了解就是有大批量的事务数据需求使用程序去批量核算处理,而经过单机形式去履行会耗费很长的处理时刻,也不能充沛发挥事务集群中每个使用节点处理才能。经过一些常见的分布式批处理方案,能够有效地让事务集群中一切事务使用节点协同完结一个大批量数据处理的使命,然后提升全体的处理功率和处理牢靠性。
批处理模型
在简略单机场景下能够敞开多线程来一起处理一个大使命,在多个机器下能够由多台机器一起并行处理同一个使命。因而,分布式批处理方案需求为开发者在代码开发层面屏蔽上述使命切分后分发、并行履行、成果会聚、失败容错、动态扩容等事务使用集群间的分布式和谐逻辑,让使用者仅聚焦于上述红框描述的事务逻辑分片规则和事务逻辑处理即可。
大数据批处理比较
在大数据处理场景中咱们也会用到 MapReduce 模型,其处理逻辑本质与咱们要评论的事务批处理逻辑是一致的。在大数据场景下的批处理首要是面向数据自身的处理,并需求布置相应大数据渠道集群来支撑数据存储和数据批处理程序处理,因而该场景下首要目的是用于构建一个完好的数据渠道。与大数据批处理场景相比较,本次更首要聚焦评论分布式事务批处理场景,根据现有事务使用服务集群构建分布式批处理逻辑。经过分布式批处理方案能够处理以下需求
- 对耗时事务逻辑解耦,保证中心链路事务处理快速响应
- 充沛调度事务集群一切使用节点协作批量完结事务处理
- 有别于大数据处理,子使命处理进程中还会有调用其他在线事务服务参与批处理进程
开源批处理方案
ElasticJob
ElasticJob 是一款分布式使命调度结构,其首要特点是在 Quartz 基础上完结守时调度并供给在事务集群中对使命进行分片和谐处理才能。在整个架构上根据 Zookeeper 来完结使命分片履行、使用集群动态弹性调度、子使命履行高可用。分片调度模型可支撑大批量事务数据处理均衡的分发至事务集群中的每一个节点进行处理,有效地提高了使命处理功率。
- SimpleJob
Spring Boot 工程可经过 YAML 装备使命界说,指定以下内容:使命完结类、守时调度周期、分片信息。
elasticjob:
regCenter:
serverLists: 127.0.0.1:2181
namespace: elasticjob-lite-springboot
jobs:
simpleJob:
elasticJobClass: org.example.job.SpringBootSimpleJob
cron: 0/5 * * * * ?
overwrite: true
shardingTotalCount: 3
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
装备的 org.example.job.SpringBootSimpleJob 类需求完结 SimpleJob 接口的 execute 办法,而且经过 ShardingContext 参数获取对应事务分片数据进行事务逻辑处理。
@Component
public class SpringBootSimpleJob implements SimpleJob {
@Override
public void execute(final ShardingContext shardingContext) {
String value = shardingContext.getShardingParameter();
System.out.println("simple.process->"+value);
}
}
咱们布置 3 个使用服务作为调度处理集群处理上述使命,当使命触发运转时,ElasticJob 就会将对应 3 个分片使命分别给 3 个使用服务进行处理来完结整个使命数据处理。
- DataflowJob
DataflowJob 目前来看本质上跟 SimpleJob 在全体的结构上并无本质差异。参阅如下接口,相比 SimpleJob 其增加了 fetchData 办法供事务方自行完结加载要处理的数据,实践就是将 SimpleJob 的 execute 办法在逻辑界说上拆解成两个进程。唯一差异在于 DataflowJob 供给一种常驻的数据处理使命(可称为:streaming process),支撑使命常驻运转直至 fetchData 为空。
public interface DataflowJob<T> extends ElasticJob {
/**
* Fetch to be processed data.
*
* @param shardingContext sharding context
* @return to be processed data
*/
List<T> fetchData(ShardingContext shardingContext);
/**
* Process data.
*
* @param shardingContext sharding context
* @param data to be processed data
*/
void processData(ShardingContext shardingContext, List<T> data);
}
在 DataflowJob 使命的 yaml 装备上增加 props: streaming.process=true,即可完结该使命 streaming process 的作用。当使命被触发履行后,每个分片使命将按对应流程:fetchData->processData->fetchData 循环履行直到 fetchData 为空。该形式场景分析:
- 单个分片使命待数据量大,fetchData 时读取该分片部分分页数据进行处理直至一切数据处理完毕
- 分片待数据继续发生,使使命经过 fetchData 一向获取数据,完结长时间驻留继续地进行事务数据处理
elasticjob:
regCenter:
serverLists: 127.0.0.1:2181
namespace: elasticjob-lite-springboot
jobs:
dataflowJob:
elasticJobClass: org.example.job.SpringBootDataflowJob
cron: 0/5 * * * * ?
overwrite: true
shardingTotalCount: 3
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
props:
# 敞开streaming process
streaming.process: true
- 特性分析
ElasticJob 的分布式分片调度模型,对常见简略的批处理场景供给了很大的便利支撑,处理了一个大批量事务数据处理分布式切分履行的整个和谐进程。另外在以下一些方面可能还存在些缺乏:
-
整个架构的中心取决于 ZK 稳定性
-
- 需求额定运维布置而且要保证其高可用
- 很多使命存储触发运转进程都依靠 ZK,当使命量大时 ZK 集群简单成为调度性能瓶颈
-
分片装备数量固定,不支撑动态分片
-
- 如每个分片待处理数据量差异大时,简单打破集群处理才能平衡
- 如分片界说不合理,当集群规划远大于分片数量时集群弹性失去作用
- 分片界说与事务逻辑较为割裂,人为维持两者之间联络比较麻烦
-
管控台才能弱
Spring Batch 批处理结构
Spring Batch 批处理结构,其供给轻量且完善批处理才能。Spring Batch 使命批处理框首要供给:单进程多线程处理、分布式多进程处理两种办法。在单进程多线程处理形式下,用户可自行定一个 Job 作为一个批处理使命单元,Job 是由一个或多个 Step 进程进行串联或并行组成,每一个 Step 又分别由 reader、process、writer 构成来完结每一步使命的读取、处理、输出。后续首要评论一个 Job 只包含一个 Step 的场景进行分析。
Spring Batch 结构个人觉得单进程下多线程实践含义并不是太大,首要是在较小批量数据使命处理选用该结构来完结有点费功夫,完全能够自行开线程池来处理问题。本次评论首要聚焦于必定规划的事务集群下分布式协同完结事务数据批处理使命的场景。在 Spring Batch 中供给了长途分片/分区处理才能,在 Job 的 Step 中可根据特定规则将使命拆分成多个子使命并分发给集群中其他的 worker 来处理,以完结分布式并行批处理处理才能。其长途交互才能常见是借助第三方音讯中间件来完结子使命的分发和履行成果会聚。
- 长途分块(Remote Chunking)
长途分块是 Spring Batch 在处理大批量数据使命时供给的一种分布式批处了处理方案,它能够做到在一个 Step 进程中经过 ItemReader 加载数据构建成多个 Chunk 块,并由 ItemWriter 将这多个分块经过音讯中间件或其他形式分发至集群节点,由集群使用节点对每一个 Chunk 块进行事务处理。
Remote Chunking 示例
在上述主节点 ItemReader 和 ItemWriter 能够映射为本次评论的批处理模型中的“使命拆分-split”阶段,主节点对 ItemWriter 可选用 Spring Batch Integration 供给的 ChunkMessageChannelItemWriter,该组件经过集成 Spring Integration 供给的其他通道(如:AMQP、JMS)完结批处理使命数据加载和分块分发。
@Bean
public Job remoteChunkingJob() {
return jobBuilderFactory.get("remoteChunkingJob")
.start(stepBuilderFactory.get("step2")
.<Integer, Integer>chunk(2) // 每Chunk块包含reader加载的记载数
.reader(itemReader())
// 选用ChunkMessageChannelItemWriter分发Chunk块
.writer(itemWriter())
.build())
.build();
}
@Bean
public ItemReader<Integer> itemReader() {
return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
}
@Bean
public ItemWriter<Integer> itemWriter() {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(requests());
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
}
// 省掉了相关音讯中间件对接通道装备
Slave 节点首要是对分发过来的 Chunk 块数据(可了解为子使命)进行对应事务逻辑处理和数据成果输出。因而,在子使命处理端需求经过装备 Spring Batch Integration 供给的 ChunkProcessorChunkHandler 来完结子使命接纳、实践事务处理、反应处理成果等相关动作。
// 省掉了相关音讯中间件对接通道装备
// 接纳分块使命晋级及反应履行成果
@Bean
@ServiceActivator(inputChannel = "slaveRequests", outputChannel = "slaveReplies")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
ChunkProcessor<Integer> chunkProcessor = new SimpleChunkProcessor(slaveItemProcessor(), slaveItemWriter());
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>();
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
return chunkProcessorChunkHandler;
}
// 实践事务需求开发的使命处理逻辑processor
@Bean
public SlaveItemProcessor slaveItemProcessor(){ return new SlaveItemProcessor();}
// 实践事务需求开发的使命处理逻辑writer
@Bean
public SlaveItemWriter slaveItemWriter(){ return new SlaveItemWriter();}
- 长途分区(Remote Partitioning)
长途分区与长途分块首要差异在于 master 节点不担任数据加载,可了解为将当时 Step 经过 Partitioner 拆分出多个子 Step(也能够了解为子使命),然后经过 PartitionHandler 将对应的子使命分发给各个 Slave 节点处理,为此,Spring Batch Integration 供给了 MessageChannelPartitionHandler 来完结对应的子使命分发,其底层也是需求依靠音讯中间件等进行适配对接。在每个 Slave 节点需求读取子使命 Step 的上下文信息,根据该信息进行完好的 ItemReader、ItemProcess、ItemWrite 处理。
- 特性分析
Spring Batch 结构,归纳特性分析:
- 具有齐备批处理才能:支撑单机多线程、分布式多进程协同批处理处理,支撑自界说的分片模型。
- 缺守时调度支撑:原生无守时调度才能需集成三方守时结构(如:Spring Task 需自行处理集群重复触发)。
- 可视化管控才能弱:Spring Batch 常见选用程序或文件装备使命,管控台需额定搭建且管控才能较弱。
- 集成难度高:其分布式批处理才能需额定第三方中间件集成搭建,或根据其接口自行扩展开发;根据官方供给的办法完结企业级使用需求相对杂乱规划集成。
企业级批处理方案-SchedulerX 可视化 MapReduce 使命
SchedulerX 使命调度渠道针对企业级批处理需求供给了完善的全体处理方案,用户可直接选用公有云渠道的服务即可轻松完结事务使用集群的分布式批处理才能(用户非阿里云事务使用布置也可支撑对接),无需额定布置其他中间件集成保护。
原理分析
在整个处理方案中,使命调度渠道为用户注册的使命供给全方位的可视化管控、高牢靠守时调度以及可视化查询才能。另外,在用户事务使用侧经过集成 SchedulerX SDK,即可完结分布式批处理才能的快速接入。此刻用户仅需关心批处理模型中子使命事务切分规则、每个子使命处理逻辑即可。这个分布式批处理进程中具备以下特性:
- 子使命高可用:当集群履行节点宕机时,支撑主动 failover 将掉线机器上对子使命从头分发给其他节点
- 主动弹性扩容:当集群中有新对使用节点布置上来后,能主动参与到后续使命的履行进程中
- 可视化才能:为使命和子使命的履行进程供给各类监控运维及事务日志查询才能
下面描述下大致的原理进程:
- 在渠道创建 MapReduce 使命后,守时调度服务会为它敞开高牢靠的守时触发履行
- 当 MapReduce 使命触发履行时,调度服务会在接入上来的事务 Worker 节点中选择一个节点作为本次使命运转的主节点
- 主节点运转履行用户自界说开发的子使命切分加载逻辑,并经过 map 办法调用给集群中其他 worker 节点均衡地分发子使命处理恳求
- 主节点会监控整个分布式批处理使命的处理进程,以及每个 Worker 节点健康监控,保证全体运转高可用
- 其他各个 worker 节点在接纳子使命处理恳求后,开端回调履行用户自界说的事务逻辑,终究完结每个子使命的处理需求;而且能够装备单个使用节点一起处理子使命的并行线程数。
- 一切子使命完结后,主节点将会聚一切子使命履行成果回调 reduce 办法,并反应调度渠道记载本次履行成果
开发者只需在事务使用中完结一个 MapReduceJobProcessor 抽象类,在 isRootTask 中加载本次需求处理的事务子使命数据目标列表;在非 root 恳求中经过 jobContext.getTask()获取单个子使命目标信息,根据该信息履行事务处理逻辑。在事务使用布置发布至集群节点后,当使命触发运转时集群一切节点会参与和谐整个分布式批处理使命履行直至完结。
public class MapReduceHelloProcessor extends MapReduceJobProcessor {
@Override
public ProcessResult reduce(JobContext jobContext) throws Exception {
// 一切子使命完结的会聚逻辑处理回调,可选完结
jobContext.getTaskResults();
return new ProcessResult(true, "处理成果数量集:" + jobContext.getTaskResults().size());
}
@Override
public ProcessResult process(JobContext jobContext) throws Exception {
if (isRootTask(jobContext)) {
List<String> list = // 加载事务待处理的子使命列表
// 回调sdk供给的map办法,主动完结子使命分发
ProcessResult result = map(list, "SecondDataProcess");
return result;
} else {
// 获得单个子使命数据信息,进行单个子使命事务处理
String data = (String) jobContext.getTask();
// ... 事务逻辑处理补充 ...
return new ProcessResult(true, "数据处理成功!");
}
}
}
功用优势
- 子使命可视化才能
用户大盘:供给了一切使命的触发运转可视化记载信息。
可视化子使命概况:经过查询使命履行记载概况,可获得每一个子使命履行状况及地点节点。
- 子使命事务日志
在子使命列表中点击“日志”,能够获得当时子使命处理进程中的日志记载信息。
- 履行仓库查看
履行仓库查看功用,可用于在子使命处理进程中出现卡住一向运转未完毕的场景下,便利排查对应履行线程栈信息。
- 自界说事务标签
子使命事务标签才能,为用户供给了快速可视化的子使命事务信息查看和查询才能。鄙人图中“账户称号”是本次子使命切分出来的事务标签信息,用户可根据该信息快速了解对应事务子使命的处理状况,并支撑查询指定事务标签信息的子使命处理状况。
怎么为子使命装备自界说标签,只需对本次 map 分发的子使命目标完结 BizSubTask 接口,并完结其 labelMap 办法即可为每个子使命增加其专属的事务特征标签用于可视化查询。
public class AccountTransferProcessor extends MapReduceJobProcessor {
private static final Logger logger = LoggerFactory.getLogger("schedulerxLog");
@Override
public ProcessResult reduce(JobContext context) throws Exception {
return new ProcessResult(true);
}
@Override
public ProcessResult process(JobContext context) throws Exception {
if(isRootTask(context)){
logger.info("split task list size:20");
List<AccountInfo> list = new LinkedList();
for(int i=0; i < 20; i++){
list.add(new AccountInfo(i, "CUS"+StringUtils.leftPad(i+"", 4, "0"),
"AC"+StringUtils.leftPad(i+"", 12, "0")));
}
return map(list, "transfer");
}else {
logger.info("start biz process...");
logger.info("task info:"+context.getTask().toString());
TimeUnit.SECONDS.sleep(30L);
logger.info("start biz process end.");
return new ProcessResult(true);
}
}
}
public class AccountInfo implements BizSubTask {
private long id;
private String name;
private String accountId;
public AccountInfo(long id, String name, String accountId) {
this.id = id;
this.name = name;
this.accountId = accountId;
}
// 子使命标签信息设置
@Override
public Map<String, String> labelMap() {
Map<String, String> labelMap = new HashMap();
labelMap.put("账户称号", name);
return labelMap;
}
}
- 兼容开源
SchedulerX 支撑根据常见开源结构编写的履行器,包含:XXL-Job、ElasticJob,后续调度渠道还将方案支撑调度 Spring Batch 使命。
事例场景
分布式批处理模型(可视化 MapReduce 模型),在实践企业级使用中是有很多的需求场景存在。一些常见的使用场景如:
-
针对分库分表数据批量并行处理,将分库或分表信息作为子使命目标在集群节点间分发完结并行处理
-
按城市区域的物流订单数据处理,将城市和区域作为子使命目标在集群节点间分发完结并行处理
-
鉴于可视化 MapReduce 子使命可视化才能,可将要点客户/订单信息作为子使命处理目标,来进行相应数据报表处理或信息推送,以完结重要子使命的可视化盯梢处理
-
基金出售事务事例
以下供给一个基金出售事务事例以供参阅如果使用分布式批处理模型,以便使用者在自己的事务场景下自由发挥。事例阐明:在基金公司与基金出售公司(如:蚂蚁财富)之间每天会有投资者的账户/买卖请求数据同步处理,其往往选用的是文件数据交互,一个基金公司对接着 N 多家出售商(反之亦然),每家出售商供给的数据文件完全独立;每一个出售商的数据文件都需求经过文件校验、接口文件解析、数据校验、数据导入这么几个固定进程。基金公司在处理上述固定进程就十分合适选用分布式批处理办法以加速数据文件处理,以每个出售商为子使命目标分发至集群中,一切使用节点参与解析各自分配到的不同出售商数据文件处理。
@Component
public class FileImportJob extends MapReduceJobProcessor {
private static final Logger logger = LoggerFactory.getLogger("schedulerx");
@Override
public ProcessResult reduce(JobContext context) throws Exception {
return new ProcessResult(true);
}
@Override
public ProcessResult process(JobContext context) throws Exception {
if(isRootTask(context)){
// ---------------------------------------------------------
// Step1. 读取对接的出售商列表Code
// ---------------------------------------------------------
logger.info("以出售商为维度构建子使命列表...");
// 伪代码从数据库读取出售商列表,Agency类需求完结BizSubTask接口并可将
// 出售商称号/编码作为子使命标签,以便控制台可视化盯梢
List<Agency> agencylist = getAgencyListFromDb();
return map(agencylist, "fileImport");
}else {
// ---------------------------------------------------------
// Step2. 针对单个出售商进行对应文件数据的处理
// ---------------------------------------------------------
Agency agency = (Agency)context.getTask();
File file = loadFile(agency);
logger.info("文件加载完结.");
validFile(file);
logger.info("文件校验经过.");
List<Request> request = resolveRequest(file);
logger.info("文件数据解析完结.");
List<Request> request = checkRequest(request);
logger.info("请求数据查看经过.");
importRequest(request);
logger.info("请求数据导入完结.");
return new ProcessResult(true);
}
}
}
事例首要是将基金买卖清算中的一个事务环节,选用并行批处理办法来进行处理,其后续每一个处理环节也能够选用类似办法处理。另外,每一个可视化 MapReduce 使命节点经过 DAG 依靠编列可构建一个完好的主动事务清算流程。
总结
分布式使命调度渠道 SchedulerX 为企业级分布式批处理供给来完善的处理方案,为用户供给了快速易用的接入形式,并支撑守时调度、可视化运转盯梢、可管控简运维、高可用的调度服务,一起配套企业级监控大盘、日志服务、监控报警等才能。
参阅文献:
Spring Batch Integration:
docs.spring.io/spring-batc…
ElasticJob:
shardingsphere.apache.org/elasticjob/…
分布式使命调度 SchedulerX 使用手册:
help.aliyun.com/document_de…
SchedulerX 怎么帮助用户处理分布式使命调度:
https://mp.weixin.qq.com/s/EgyfS1Vuv4itnuxbiT7KwA