Kafka 是一个高吞吐量的分布式音讯中间件,可用于海量音讯的发布和订阅。

当面对大量的数据写入时,以音讯中间件接收数据,然后再批量写入届时序数据库中,这样能够将音讯中间件的高并发能力和时序数据库的高吞吐量联合起来,更好地处理海量数据的实时处理和存储问题。

本篇教程,咱们会向我们详细介绍 DolphinDB Kafka 插件的运用方法,并以一个“DolphinDB + Kafka 实时核算k线”的事例,向我们展现 DolphinDB Kafka 插件的最佳实践指南。

1. DolphinDB Kafka 插件介绍

DolphinDB Kafka 插件支撑把 DolphinDB 中出产的数据推送到 Kafka,也支撑从 Kafka订阅数据,并在DolphinDB中消费。用户能够在 DolphinDB 中实例化 Producer 目标,把 DolphinDB 中的数据同步到 Kafka 中指定的 Topic。用户也能够在 DolphinDB 中实例化 Consumer 目标,将 Kafka 中指定 Topic 的数据同步到 DolphinDB。DolphinDB Kafka 插件现在支撑以下数据类型的序列化和反序列化:

  • DolphinDB 标量
  • Kafka Java API 的内置类型:String(UTF-8) , Short , Integer , Long , Float , Double , Bytes , byte[] 以及 ByteBuffer
  • 以上数据类型所组成的向量

用 Kafka + DolphinDB 实时计算K线
用 Kafka + DolphinDB 实时计算K线

Kafka 插件现在支撑版别:relsease200, release130。本教程依据 Kafka Plugin release200 开发,请运用 DolphinDB 2.00.X 版别 server 进行相关测验。若需求测验其它版别 server,请切换至相应插件分支下载插件包进行测验。

2. 基本运用介绍

2.1 装置 DolphinDB Kafka 插件

用户可依据 DolphinDB server 版别和操作体系下载对应的已经编译好的插件文件,官方下载链接。手动编译装置能够参阅官网文档教程:DolphinDB Kafka 插件官方教程。

以 Linux 为例,下载好插件文件后需求增加动态库地址到环境变量中,留意插件装置的途径<PluginDir>,需求依据实践环境修正,本例中插件的装置途径为 /DolphinDB/server/plugins/kafka,履行命令如下:

export LD_LIBRARY_PATH="LD_LIBRARY_PATH:/DolphinDB/server/plugins/kafka"

用 Kafka + DolphinDB 实时计算K线

2.2 运用 DolphinDB Kafka Producer

语法

kafka::producer(config)
  • config:字典类型,表明DolphinDB Kafka Producer 的装备。字典的键是一个字符串,值是一个字符串或布尔值。有关 Kafka 装备的更多信息,请参阅 装备参数列表。

该函数调用后,会依据指定装备创立一个 Kafka Producer 实例,并回来句柄。

kafka::produce(producer, topic, key, value, json, [partition])
  • producer:Kafka 出产者的句柄
  • topic:Kafka 的主题
  • key:Kafka 出产者装备字典的键
  • value:Kafka 出产者装备字典的值
  • json:表明是否以 json 格式传递数据
  • partition:可选参数,整数,表明 Kafka 的 broker 分区

该函数调用后会,能够把 DolphinDB 中的数据同步到 Kafka 中指定的 Topic。

下面经过比如,展现怎么实时同步 DolphinDB 流数据表 KafkaTest 中的增量数据到 Kafka 的 dolphindb-producer-test Topic 中。

用 Kafka + DolphinDB 实时计算K线
用 Kafka + DolphinDB 实时计算K线​修正

DolphinDB 中创立 Producer 实例

DolphinDB GUI 衔接 DolphinDB 节点后履行以下脚本,加载 DolphinDB Kafka 插件:

try{loadPlugin("/DolphinDB/server/plugins/kafka/PluginKafka.txt")} catch(ex){print(ex)}

用 Kafka + DolphinDB 实时计算K线

留意:
本例中插件的装置途径为 /DolphinDB/server/plugins/kafka,用户需求依据自己实践环境进行修正。

每次发动 DolphinDB 服务后,只需手动加载一次即可。也能够设置为主动加载,参阅教程:主动加载插件教程。

DolphinDB GUI 中履行以下脚本, 创立 Producer 实例,留意需求依据实践环境装备 metadata.broker.list 参数:

producerCfg = dict(STRING, ANY)
producerCfg["metadata.broker.list"] = "192.193.168.4:9092"
producer = kafka::producer(producerCfg)

用 Kafka + DolphinDB 实时计算K线

模仿测验数据生成

DolphinDB GUI 中履行以下脚本,模仿测验数据生成:

share streamTable(take(1, 86400) as id, 2020.01.01T00:00:00 + 0..86399 as datetime, rand(1..100, 86400) as val) as `kafkaTest

用 Kafka + DolphinDB 实时计算K线

测验数据共有 86400 行,包含三列:id (INT 类型), datetime(DATETIME 类型)和 val(INT 类型),如下表所示

用 Kafka + DolphinDB 实时计算K线

Kafka 创立 Topic : dolphindb-producer-test

运用 Kafka 集群自带的 kafka-topics.sh 终端命令创立 Topic:

bin/kafka-topics.sh --create --topic dolphindb-producer-test --bootstrap-server 192.193.168.4:9092

用 Kafka + DolphinDB 实时计算K线

控制台输出成果:

Created topic dolphindb-producer-test.

用 Kafka + DolphinDB 实时计算K线

DolphinDB 流数据表中的数据同步至 Kafka

DolphinDB GUI 中履行以下脚本,声明自界说流数据表订阅的处理函数:

def sendMsgToKafkaFunc(producer, msg){
	try {
		kafka::produce(producer, "dolphindb-producer-test", 1, msg, true)
	}
	catch(ex) {
		writeLog("[Kafka Plugin] Failed to send msg to kafka with error:" +ex)
	}
}

用 Kafka + DolphinDB 实时计算K线

DolphinDB GUI 中履行以下脚本,订阅 DolphinDB 的流数据表 kafkaTest,处理函数是 sendMsgToKafkaFunc,将流数据表内的增量数据实时推送到 Kafka 的 dolphindb-producer-test Topic 中:

subscribeTable(tableName="kafkaTest", actionName="sendMsgToKafka", offset=0, handler=sendMsgToKafkaFunc{producer}, msgAsTable=true, reconnect=true)

用 Kafka + DolphinDB 实时计算K线

验证数据

运用 kafka-console-consumer 命令行东西消费 Topic 为 dolphindb-producer-test 中的数据。

履行下述句子,首先会输出流数据表中的历史数据,往流数据表中刺进新的数据后,kafka-console-consumer 会当即输出新增的数据:

./bin/kafka-console-consumer.sh --bootstrap-server 192.193.168.4:9092 --from-beginning --topic dolphindb-producer-test

用 Kafka + DolphinDB 实时计算K线

控制台会打印已消费的数据,输出成果如下:

... {"id":[1,1,...],"datetime":["2020.01.01T10:55:12","2020.01.01T10:55:13",...],"val":[73,74,...]} {"id":[1,1,...],"datetime":["2020.01.01T23:55:12","2020.01.01T23:55:13",...],"val":[88,1,...]} ...

用 Kafka + DolphinDB 实时计算K线

接下来在 DolphinDB GUI 中履行以下脚本,往流数据表 kafkaTest 中新刺进两条数据:

insert into kafkaTest values(2,now(),rand(1..100)) insert into kafkaTest values(2,now(),rand(1..100))

用 Kafka + DolphinDB 实时计算K线

控制台输出成果:

{"id":[2],"datetime":["2022.08.16T11:08:27"],"val":[23]}
{"id":[2],"datetime":["2022.08.16T11:10:42"],"val":[11]}

用 Kafka + DolphinDB 实时计算K线

由此验证 DolphinDB Kafka Producer 出产数据的完整性和正确性。

2.3 运用 DolphinDB Kafka Consumer

语法

kafka::consumer(config)

用 Kafka + DolphinDB 实时计算K线

  • config:字典类型,表明 Kafka 顾客的装备。字典的键是一个字符串,值是一个元组。有关 Kafka 装备的更多信息, 请参阅 Kafka 运用手册。

该函数调用后,会依据指定装备创立一个 Kafka Consumer 实例,并回来句柄。

下面经过比如,展现怎么在 DolphinDB 中订阅消费 kafka 中 Topic 为 dolphindb-consumer-test 的数据,将其实时同步到流数据表 KafkaTest 中。

用 Kafka + DolphinDB 实时计算K线

DolphinDB 中创立 Consumer 实例

DolphinDB GUI 中履行以下脚本, 创立 Consumer 实例:

consumerCfg = dict(string, any)
consumerCfg["metadata.broker.list"] = "192.193.168.4:9092"
consumerCfg["group.id"] = "test"
consumer = kafka::consumer(consumerCfg)

用 Kafka + DolphinDB 实时计算K线

DolphinDB 中消费数据

DolphinDB GUI 中履行以下脚本,创立一张同享内存表 kafkaTest

share table(1:0,`id`timestamp`val,[INT,TIMESTAMP,INT]) as `kafkaTest

用 Kafka + DolphinDB 实时计算K线

DolphinDB GUI 中履行以下脚本,订阅 Kafka 中的 dolphindb-consumer-test 主题的数据:

topics = ["dolphindb-consumer-test"]
kafka::subscribe(consumer, topics)

用 Kafka + DolphinDB 实时计算K线

留意:订阅函数支撑传入一个string类型的向量,完成一起订阅多个topic

DolphinDB GUI 中履行以下脚本,界说多线程轮询处理消费行列:

def parse(x) {
        dict = parseExpr(x).eval()
        return table(dict[`id] as `id, dict[`timestamp] as `datetime, dict[`val] as `val)
}
conn = kafka::createSubJob(consumer, kafkaTest, parse, "kafka consumer")

用 Kafka + DolphinDB 实时计算K线

DolphinDB GUI 中履行以下脚本,检查订阅状态:

kafka::getJobStat()

用 Kafka + DolphinDB 实时计算K线

回来:

用 Kafka + DolphinDB 实时计算K线

验证数据

运用 kafka-console-producer 终端东西,在控制台输入音讯出产到 kafka:

./bin/kafka-console-producer.sh --bootstrap-server 192.193.168.4:9092 --topic dolphindb-consumer-test

用 Kafka + DolphinDB 实时计算K线

控制台输入音讯:

{"id":1001,"timestamp":1660920813123,"val":1000}
{"id":1001,"timestamp":1660920814123,"val":2000}
{"id":1001,"timestamp":1660920815123,"val":3000}

用 Kafka + DolphinDB 实时计算K线

用 Kafka + DolphinDB 实时计算K线

经过 DolphinDB GUI 检查流数据表中的成果,如下图所示:

用 Kafka + DolphinDB 实时计算K线

用 Kafka + DolphinDB 实时计算K线

由此验证 DolphinDB Kafka Consumer 消费数据的完整性和正确性,消费吞吐量相关的信息见第四章。

DolphinDB GUI 中履行以下脚本,撤销订阅:

kafka::cancelSubJob(conn)

用 Kafka + DolphinDB 实时计算K线

3. 经过 Kafka 插件实时核算K线

3.1 环境预备

  • 布置 DolphinDB 集群,版别为v2.00.7

    • 快速体验,参阅 单节点布置 发动 DolphinDB 单节点。
    • 出产环境,参阅 高可用集群布置 完成 DolphinDB 高可用集群搭建。
  • 布置 Kafka 集群,版别为 2.13-3.1.0

    • 快速体验,参阅 Apache Kakfa Quickstart 发动 Kafka 节点。
    • 出产环境,参阅 Running Kafka in Production 完成 Kafka 集群搭建。

3.2 出产数据

本节经过 DolphinDB 的 replay 历史数据回放东西和 Kafka 插件,把逐笔成交数据实时发送到 Kafka 中。

Kafka 创立 Topic :topic-message

运用 Kafka 集群自带的 kafka-topics.sh 终端命令创立 Topic:

./bin/kafka-topics.sh --create --topic topic-message --bootstrap-server 192.193.168.4:9092

用 Kafka + DolphinDB 实时计算K线

控制台输出成果:

Created topic topic-message.

用 Kafka + DolphinDB 实时计算K线

加载 Kafka 插件并创立 Kafka Producer

DolphinDB GUI 衔接 DolphinDB 节点后履行以下脚本:

// 加载插件
path = "/DolphinDB/server/plugins/kafka"
loadPlugin(path + "/PluginKafka.txt")
loadPlugin("/DolphinDB/server/plugins/kafka/PluginEncoderDecoder.txt");
// 界说创立 Kafka Producer 的函数
def initKafkaProducerFunc(metadataBrokerList){
	producerCfg = dict(STRING, ANY)
	producerCfg["metadata.broker.list"] = metadataBrokerList
	return kafka::producer(producerCfg)
}
// 创立 Kafka Producer 并回来句柄
producer = initKafkaProducerFunc("192.193.168.5:8992")

用 Kafka + DolphinDB 实时计算K线

  • 本例中插件的装置途径为 /DolphinDB/server/plugins/kafka,用户需求依据自己实践环境进行修正。
  • 推荐 Kafka server 和 DolphinDB Server 在同一网段中。

推送数据到 Kafka Topic

DolphinDB GUI 中履行以下脚本:

// 界说推送数据到 KafKa "topic-message" Topic 的函数
def sendMsgToKafkaFunc(dataType, producer, msg){
	startTime = now()
	try {
		for(i in msg){
			kafka::produce(producer, "topic-message", 1, i, true) 
		}		
		cost = now() - startTime
		writeLog("[Kafka Plugin] Successed to send " + dataType + " : " + msg.size() + " rows, " + cost + " ms.")
	} 
	catch(ex) {writeLog("[Kafka Plugin] Failed to send msg to kafka with error: " +ex)}
}
// 创立 DolphinDB 流数据表 tickStream
colName = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum`TradeIndex`ChannelNo`TradeBSFlag`BizIndex
colType = [SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT, INT, INT, SYMBOL, INT]
share(streamTable(35000000:0, colName, colType), `tickStream)
// 订阅 tickStream,处理函数是 sendMsgToKafkaFunc
subscribeTable(tableName="tickStream", actionName="sendMsgToKafka", offset=-1, handler=sendMsgToKafkaFunc{`tick, producer}, msgAsTable=true, reconnect=true, batchSize=10000,throttle=1)
getHomeDir()
// 控速回放 DolphinDB 分布式中的历史数据至 tickStream
dbName = "dfs://SH_TSDB_tick"
tbName = "tick"
replayDay =  2021.12.08
testData = select * from loadTable(dbName, tbName) where date(TradeTime)=replayDay, time(TradeTime)>=09:30:00.000, time(TradeTime)<=10:00:00.000 order by TradeTime, SecurityID
submitJob("replay", "replay", replay, testData, objByName("tickStream"), `TradeTime, `TradeTime, 2000, true, 4)

用 Kafka + DolphinDB 实时计算K线

kafka::produce 函数会将任意表结构的 msg 以 json 格式发送至指定的 Kafka topic。此处的 writeLog 函数会在 DolphinDB 节点的运行日志中打印每批推送的状况,便利代码调试和运维调查。

能够运用 kafka-console-consumer 命令行东西消费 Topic 为 topic-message 中的数据,验证数据是否成功写入:

./bin/kafka-console-producer.sh --bootstrap-server 192.193.168.4:9092 --from-beginning --topic topic-message

用 Kafka + DolphinDB 实时计算K线

3.3 消费数据

创立顾客,主题并进行订阅

DolphinDB GUI 中履行以下脚本:

// 创立 Kafka Consumer 并回来句柄
consumerCfg = dict(STRING, ANY)
consumerCfg["metadata.broker.list"] = "192.193.168.5:8992"
consumerCfg["group.id"] = "topic-message"
consumer = kafka::consumer(consumerCfg)
// 订阅 Kafka 中的 "topic-message" 主题的数据
topics = ["topic-message"]
kafka::subscribe(consumer, topics);

用 Kafka + DolphinDB 实时计算K线

DolphinDB 订阅 Kafka音讯行列中数据

DolphinDB GUI 中履行以下脚本:

// 订阅 Kafka 发布音讯,写入流表 tickStream_kafka
colName = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum`TradeIndex`ChannelNo`TradeBSFlag`BizIndex
colType = [SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT, INT, INT, SYMBOL, INT]
share(streamTable(35000000:0, colName, colType), `tickStreamkafka)
go
// Kafka 音讯解析函数
def parse(mutable dictVar, mutable tickStreamkafka){
	try{
		t = dictVar
		t.replaceColumn!(`TradeTime, temporalParse(dictVar[`TradeTime],"yyyy.MM.ddTHH:mm:ss.SSS"))
		tickStreamkafka.append!(t);
	}catch(ex){
		print("kafka errors : " + ex)
	}
}
colType[1] = STRING;
decoder = EncoderDecoder::jsonDecoder(colName, colType, parse{, tickStreamkafka}, 15, 100000, 0.5)
// 创立 subjob 函数
conn =  kafka::createSubJob(consumer, , decoder, "topic-message")

用 Kafka + DolphinDB 实时计算K线

3.4 流核算引擎实时核算 K 线

运用 DolphinDB 内置流核算引擎核算分钟 K 线,并将成果输出到名为 OHLCVwap的成果表中。

DolphinDB GUI 中履行以下脚本:

// 创立接收实时核算成果的流数据表
colName = `TradeTime`SecurityID`OpenPrice`HighPrice`LowPrice`ClosePrice`Vwap
colType = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
share(streamTable(2000000:0, colName, colType), `OHLCStream)
// K 线指标核算元表达式
aggrMetrics = <[ first(TradePrice), max(TradePrice), min(TradePrice), last(TradePrice), wavg(TradePrice, TradeQty) ]>
// 创立引擎并将 kafka 中订阅的数据注入流核算引擎
createTimeSeriesEngine(name="OHLCVwap", windowSize=60000, step=60000, metrics=aggrMetrics, dummyTable=objByName("tickStreamkafka"), outputTable=objByName("OHLCStream"), useSystemTime=true, keyColumn=`SecurityID, useWindowStartTime=false)
subscribeTable(tableName="tickStreamkafka", actionName="OHLCVwap", offset=-1, handler=getStreamEngine("OHLCVwap"), msgAsTable=true, batchSize=1000, throttle=1, hash=0)

用 Kafka + DolphinDB 实时计算K线

  • 设置参数 offset 为 – 1,订阅将会从提交订阅时流数据表的当前行开始。
  • 设置 useSystemTime=true,表明时间序列引擎会按照数据注入时间序列引擎的时间(毫秒精度的本地体系时间,与数据中的时间列无关),每隔固定时间截取固定长度窗口的流数据进行核算。

4. 功能测验

4.1 硬件环境

用 Kafka + DolphinDB 实时计算K线

4.2 软件环境

  • DolphinDB:2.00.7
  • 内核版别: Linux 3.10.0-1160.el7.x86_64
  • 操作体系版别:CentOS Linux 7 (Core)
  • Kafka版别:2.13-3.1.0
  • JDK:1.8

4.3 测验成果

测验数据表结构如下:

用 Kafka + DolphinDB 实时计算K线

测验成果如下:

用 Kafka + DolphinDB 实时计算K线

测验成果阐明

  • 测验环境为出产等级的常用装备,意图是下降用户选型评估成本
  • 测验成果为履行10次取平均值
  • 指标 RPS 是指每秒消费的记录数

4.4 测验流程

相关阐明

  • 发动测验前清空一切数据。
  • 每次测验先把一切数据写入 Kafka,再加载 Kafka 插件同步数据到 DolphinDB中。意图是将同步数据的压力全部会集到 Kafka 插件。
  • 以Kafka插件从收到第一批数据到收到最终一批数据的时间差作为同步数据的总耗时。

测验流程

  • 加载 Kafka 插件并创立 Kafka Producer 发送数据到 Kafka 中(以发送100万条数据为例)

DolphinDB GUI 衔接 DolphinDB 履行以下脚本,本例中插件的装置途径为 /DolphinDB/server/plugins/kafka,用户需求依据自己实践环境进行修正:

// 加载插件
try{
	loadPlugin("/DolphinDB/server/plugins/kafka/PluginKafka.txt")
	loadPlugin("/DolphinDB/server/plugins/kafka/PluginEncoderDecoder.txt")
} catch(ex){print(ex)}
// 创立 Producer
producerCfg = dict(STRING, ANY);
producerCfg["metadata.broker.list"] = "192.193.168.5:8992";
producer = kafka::producer(producerCfg);
kafka::producerFlush(producer);
//向kafka传100万数据
tbl = table("R5L1B3T1N03D01" as deviceId, "2022-02-22 13:55:47.377" as timestamps, "voltage" as deviceType , 1.5 as value )
// 创立 Consume
consumerCfg = dict(STRING, ANY)
consumerCfg["group.id"] = "test10"
consumerCfg["metadata.broker.list"] = "192.193.168.5:8992";
for(i in 1..1000000) {
	aclMsg = select *, string(now()) as pluginSendTime from tbl;
	for(i in aclMsg) {
		kafka::produce(producer, "test3", "1", i, true);
	}
}
consumer = kafka::consumer(consumerCfg)
topics=["test10"];
kafka::subscribe(consumer, topics);
for(i in 1..1000000) {
	aclMsg = select *, string(now()) as pluginSendTime from tbl;
	for(i in aclMsg) {
		kafka::produce(producer, "test10", "1", i, true);
	}
}
  • 订阅 Kafka 中数据进行消费

    // 创立存储解析完数据的表 colDefName = [“deviceId”,”timestamps”,”deviceType”,”value”, “pluginSendTime”, “pluginReceived”]

    colDefType = [SYMBOL,TIMESTAMP,SYMBOL,DOUBLE,TIMESTAMP,TIMESTAMP] dest = table(1:0, colDefName, colDefType); share dest as `streamZd

    // 解析函数 def temporalHandle(mutable dictVar, mutable dest){ try{ t = dictVar t.replaceColumn!(timestamps, temporalParse(dictVar[timestamps],”yyyy-MM-dd HH:mm:ss.SSS”)) t.replaceColumn!(pluginSendTime, timestamp(dictVar[pluginSendTime])) t.update!(`received, now()); dest.append!(t); }catch(ex){ print(“kafka errors : ” + ex) } }

    // 创立 decoder name = colDefName[0:5]; type = colDefType[0:5]; type[1] = STRING; type[4] = STRING; decoder = EncoderDecoder::jsonDecoder(name, type, temporalHandle{, dest}, 15, 100000, 0.5)

    // 创立subjob函数 kafka::createSubJob(consumer, , decoder, `DecoderKafka)

此刻咱们调查同享流表的数据量,当到达 100 万条时阐明消费完成,测验完毕。

5. Kerberos 认证

5.1 什么是 Kerberos ?

Kerberos 是一种依据加密 Ticket 的身份认证协议,支撑双向认证且功能较高。主要有三个组成部分:Kdc, Client 和 Service。

出产环境的 Kafka 一般需求开启 Kerberos 认证,为 Kafka 提供权限办理,进步安全性。

用 Kafka + DolphinDB 实时计算K线

5.2 前置条件

  • Java 8+
  • kerberos:包含 Kdc 和 Client
  • keytab 证书

5.3 认证相关装备阐明

环境相关装备阐明

以下是 Kerberos 认证涉及的要害装备信息,详细装备文件的途径依据实践状况调整

  1. 装置 kdc

    yum install -y krb5-server krb5-libs krb5-workstation krb5-devel krb5-auth-dialog

用 Kafka + DolphinDB 实时计算K线

2. 装备 /etc/krb5.conf

[realms]
 HADOOP.COM = {
  kdc = cnserver9:88
  admin_server = cnserver9:749
  default_domain = HADOOP.COM
 }

用 Kafka + DolphinDB 实时计算K线

3. 装备 /var/kerberos/krb5kdc/kadm5.acl

*/admin@HADOOP.COM	*

用 Kafka + DolphinDB 实时计算K线

4. 创立生成 kdc 数据库文件

sudo kdb5_util create -r HADOOP.COM –s

用 Kafka + DolphinDB 实时计算K线

5. 发动 kerberos 服务

sudo systemctl start krb5kdc
sudo systemctl status krb5kdc

用 Kafka + DolphinDB 实时计算K线

6. 装置 kerberos 客户端

yum install -y krb5-devel krb5-workstation krb5-client

用 Kafka + DolphinDB 实时计算K线

7. 发动 kerberos 客户端

sudo kadmin.local -q "addprinc hadoop/admin"

用 Kafka + DolphinDB 实时计算K线

DolphinDB Kafka Plugin 装备阐明

  • 要害装备参数阐明

    • security.protocol:指定通信协议
    • sasl.kerberos.service.name:指定 service 名称
    • sasl.mechanism:SASL机制,包含 GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER
    • sasl.kerberos.keytab:keytab 文件的途径
    • sasl.kerberos.principal:指定 principal
  • 详细代码完成

    // 加载插件 try{loadPlugin(“/path/to/DolphinDBPlugin/kafka/bin/linux/PluginKafka.txt”)} catch(ex){print(ex)}

    // 出产者装备 producerCfg = dict(STRING, ANY); producerCfg[“bootstrap.servers”] = “cnserver9:9992”; producerCfg[“security.protocol”] = “SASL_PLAINTEXT”; producerCfg[“sasl.kerberos.service.name”] = “kafka”; producerCfg[“sasl.mechanism”] = “GSSAPI”; producerCfg[“sasl.kerberos.keytab”] = “/home/test/hadoop.keytab”; producerCfg[“sasl.kerberos.principal”] = “kafka/cnserver9@HADOOP.COM”; producer = kafka::producer(producerCfg);

    // 顾客装备 consumerCfg = dict(STRING, ANY) consumerCfg[“group.id”] = “test” consumerCfg[“bootstrap.servers”] = “cnserver9:9992”; consumerCfg[“security.protocol”] = “SASL_PLAINTEXT”; consumerCfg[“sasl.kerberos.service.name”] = “kafka”; consumerCfg[“sasl.mechanism”] = “GSSAPI”; consumerCfg[“sasl.kerberos.keytab”] = “/home/test/hadoop.keytab”; consumerCfg[“sasl.kerberos.principal”] = “kafka/cnserver9@HADOOP.COM”; consumer = kafka::consumer(consumerCfg)

用 Kafka + DolphinDB 实时计算K线

留意:适配 Kerberos 认证只需修正 Kafka 插件有关出产者和顾客的装备即可,其余脚本无需改动。

6. 其他阐明

本教程展现了 DolphinDB Kafka Plugin 中常用的接口函数,完整的函数支撑请参阅官网文档:DolphinDB Kafka 插件官方教程

运用过程中假如遇到任何问题,欢迎我们在项目仓库反应

  • Github 仓库:DolphinDB Kafka Plugin
  • Gitee 仓库:DolphinDB Kafka Plugin

7. 参阅链接

  • github.com/edenhill/li…
  • github.com/edenhill/li…
  • help.ubuntu.com/community/K…