作者:vivo 互联网查找团队- Deng Jie

Kafka中的实时数据是以Topic的概念进行分类存储,而Topic的数据是有必定时效性的,比方保存24小时、36小时、48小时等。而在定位一些实时数据的Case时,假如没有对实时数据进行前史归档,在排查问题时,没有日志追述,会很难定位是哪个环节的问题。

一、背景

Kafka中的实时数据是以Topic的概念进行分类存储,而Topic的数据是有必定时效性的,比方保存24小时、36小时、48小时等。而在定位一些实时数据的Case时,假如没有对实时数据进行前史归档,在排查问题时,没有日志追述,会很难定位是哪个环节的问题。因此,咱们需求对处理的这些实时数据进行记载归档并存储。

二、内容

2.1 案例剖析

这儿以i视频和vivo短视频实时数据为例,之前存在这样的协作问题:

数据上游内容方供给实时Topic(存放i视频和vivo短视频相关实时数据),数据侧对实时数据进行逻辑处理后,发送给下流工程去建库实时索引,当使命履行一段时刻后,工程侧建索引偶然会提出数据没有发送过去的Case,前期由于没有对数据做存储,在定位问题的时分会比较麻烦,常常需求查看实时日志,需求花费很长的时刻来剖析这些Case是出现在哪个环节。

为了处理这个问题,咱们能够将实时Topic中的数据,在发送给其他Topic的时分,增加盯梢机制,进行数据分流,Sink到存储介质(比方HDFS、Hive等)。这儿,咱们挑选运用Hive来进行存储,主要是查询便利,支撑SQL来快速查询。如下图所示:

Kafka实时数据即席查询应用与实践

在完结优化后的计划时,有两种办法能够完结盯梢机制,它们分别是Flink SQL写Hive、Flink DataStream写Hive。接下来,分别对这两种完结计划进行介绍和实践。

2.2 计划一:Flink SQL写Hive

这种办法比较直接,能够在Flink使命里边直接操作实时Topic数据后,将消费后的数据进行分流盯梢,作为日志记载写入到Hive表中,详细完结过程如下:

  • 结构Hive Catalog;

  • 创立Hive表;

  • 写入实时数据到Hive表。

2.2.1 结构Hive Catalog

在结构Hive Catalog时,需求初始化Hive的相关信息,部分代码片段如下所示:

// 设置履行环境
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
 StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,settings);
 // 结构 Hive Catalog 称号
 String name = "video-hive-catalog";
 // 初始化数据库名
 String defaultDatabase = "comsearch";
 // Hive 装备文件途径地址
 String hiveConfDir = "/appcom/hive/conf";
 // Hive 版别号
 String version = "3.1.2";
 // 实例化一个 HiveCatalog 目标
 HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
 // 注册HiveCatalog
 tEnv.registerCatalog(name, hive);
 // 设定当时 HiveCatalog
 tEnv.useCatalog(name);
 // 设置履行SQL为Hive
 tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
 // 运用数据库
 tEnv.useDatabase("db1");

在以上代码中,咱们首先设置了 Flink 的履行环境和表环境,然后创立了一个 HiveCatalog,并将其注册到表环境中。

2.2.2 创立Hive表

假如Hive表不存在,能够经过在程序中履行建表句子,详细SQL见表句子代码如下所示:

-- 创立表句子
tEnv.executeSql("CREATE TABLE IF NOT EXISTS TABLE `xxx_table`(
  `content_id` string,
  `status` int)
PARTITIONED BY (
  `dt` string,
  `h` string,
  `m` string)
stored as ORC
TBLPROPERTIES (
  'auto-compaction'='true',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00'
)")

在创立Hive表时咱们运用了IF NOT EXISTS关键字,假如Hive中该表不存在会主动在Hive上创立,也能够提前在Hive中创立好该表,Flink SQL中就无需再履行建表SQL,由于用了Hive的Catalog,Flink SQL运行时会找到表。这儿,咱们设置了auto-compaction特点为true,用来使小文件主动兼并,1.12版的新特性,处理了实时写Hive产生的小文件问题。一起,指定metastore值是专门用于写入Hive的,也需求指定success-file值,这样CheckPoint触发完数据写入磁盘后会创立_SUCCESS文件以及Hive metastore上创立元数据,这样Hive才能够对这些写入的数据可查。

2.2.3 写入实时数据到Hive表

在预备完结2.2.1和2.2.2中的过程后,接下来就能够在Flink使命中经过SQL来对实时数据进行操作了,详细完结代码片段如下所示:

// 编写事务SQL
 String insertSql = "insert into  xxx_table SELECT content_id, status, " +
                    " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM xxx_rt";
 // 履行 Hive SQL
 tEnv.executeSql(insertSql);
 // 履行使命
 env.execute();

将消费后的数据进行分类,编写事务SQL句子,将消费的数据作为日志记载,发送到Hive表进行存储,这样Kafka中的实时数据就存储到Hive了,便利运用Hive来对Kafka数据进行即席剖析。

2.2.4 避坑技巧

运用这种办法在处理的过程中,假如装备运用的是EventTime,在程序中装备’sink.partition-commit.trigger’=’partition-time’,最终会出现无法提交分区的情况。经过对源代码PartitionTimeCommitTigger的剖析,找到了出现这种异常情况的原因。

咱们能够经过看

org.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitionsorg.apache.flink.table.filesystem.stream.PartitionTimeCommitTigger#committablePartitions

中的一个函数,来说明详细的问题,部分源代码片段如下:

// PartitionTimeCommitTigger源代码函数代码片段
@Override
public List<String> committablePartitions(long checkpointId) {
 if (!watermarks.containsKey(checkpointId)) {
  throw new IllegalArgumentException(String.format(
    "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
    checkpointId, watermarks));
 }
 long watermark = watermarks.get(checkpointId);
 watermarks.headMap(checkpointId, true).clear();
 List<String> needCommit = new ArrayList<>();
 Iterator<String> iter = pendingPartitions.iterator();
 while (iter.hasNext()) {
  String partition = iter.next();
  // 经过分区的值来获取分区的时刻
  LocalDateTime partTime = extractor.extract(
    partitionKeys, extractPartitionValues(new Path(partition)));
  // 判别水印是否大于分区创立时刻+延迟时刻
  if (watermark > toMills(partTime) + commitDelay) {
   needCommit.add(partition);
   iter.remove();
  }
 }
 return needCommit;
}

经过剖析上述代码片段,咱们能够知道体系经过分区值来抽取相应的分区来创立时刻,然后进行比对,比方咱们设置的时刻 pattern 是 ‘dtdt h:$m:00′ , 某一时刻咱们正在往 /2022-02-26/18/20/ 这个分区下写数据,那么程序依据分区值,得到的 pattern 将会是2022-02-26 18:20:00,这个值在SQL中是依据 DATA_FORMAT 函数获取的。

而这个值是带有时区的,比方咱们的时区设置为东八区,2022-02-26 18:20:00这个时刻是东八区的时刻,换成规范 UTC 时刻是减去8个小时,也便是2022-02-26 10:20:00,而在源代码中的 toMills 函数在处理这个东八区的时刻时,并没有对时区进行处理,把这个其实应该是东八区的时刻当做了 UTC 时刻来处理,这样核算出来的值就比实践值大8小时,导致一直没有触发分区的提交。

假如咱们在数据源中结构的分区是 UTC 时刻,也便是不带分区的时刻,那么这个逻辑便是没有问题的,但是这样又不契合咱们的实践情况,比方对于分区2022-02-26 18:20:00,我期望我的分区肯定是东八区的时刻,而不是比东八区小8个小时的UTC时刻2022-02-26 10:20:00。

在理解了原因之后,咱们就能够针对上述异常情况进行优化咱们的完结计划,比方自界说一个分区类、或许修正缺省的时刻分区类。比方,咱们运用TimeZoneTableFunction类来完结一个自界说时区,部分参阅代码片段如下:

public class CustomTimeZoneTableFunction implements TimeZoneTableFunction {
  private transient DateTimeFormatter formatter;
  private String timeZoneId;
  public CustomTimeZoneTableFunction(String timeZoneId) {
    this.timeZoneId = timeZoneId;
  }
  @Override
  public void open(FunctionContext context) throws Exception {
    // 初始化 DateTimeFormatter 目标
    formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:00");
    formatter = formatter.withZone(ZoneId.of(timeZoneId));
  }
  @Override
  public void eval(Long timestamp, Collector<TimestampWithTimeZone> out) {
    // 将时刻戳转换为 LocalDateTime 目标
    LocalDateTime localDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC);
    // 将 LocalDateTime 目标转换为指定时区下的 LocalDateTime 目标
    LocalDateTime targetDateTime = localDateTime.atZone(ZoneId.of(timeZoneId)).toLocalDateTime();
    // 将 LocalDateTime 目标转换为 TimestampWithTimeZone 目标,并输出到下流
    out.collect(TimestampWithTimeZone.fromLocalDateTime(targetDateTime, ZoneId.of(timeZoneId)));
  }
}

2.3 计划二:Flink DataStream写Hive

在一些特殊的场景下,Flink SQL假如无法完结咱们杂乱的事务需求,那么咱们能够考虑运用Flink DataStream写Hive这种完结计划。比方如下事务场景,现在需求完结这样一个事务需求,内容方将实时数据写入到Kafka消息行列中,然后由数据侧经过Flink使命消费内容方供给的数据源,接着对消费的数据进行分流处理(这儿的过程和Flink SQL写Hive的过程类似),每分钟进行存储到HDFS(MapReduce使命需求核算和重跑HDFS数据),然后经过MapReduce使命将HDFS上的这些日志数据生成Hive所需求格局,最终将这些Hive格局数据文件加载到Hive表中。完结Kafka数据到Hive的即席剖析功能,详细完结流程细节如下图所示:

Kafka实时数据即席查询应用与实践

详细中心完结过程如下:

  • 消费内容方Topic实时数据;

  • 生成数据预处理战略;

  • 加载数据;

  • 运用Hive SQL对Kafka数据进行即席剖析。

2.3.1 消费内容方Topic实时数据

编写消费Topic的Flink代码,这儿不对Topic中的数据做逻辑处理,在后边统一交给MapReduce来做数据预处理,直接消费并存储到HDFS上。详细完结代码如下所示:


public class Kafka2Hdfs {
    public static void main(String[] args) {
        // 判别参数是否有用
        if (args.length != 3) {
            LOG.error("kafka(server01:9092), hdfs(hdfs://cluster01/data/), flink(parallelism=2) must be exist.");
            return;
        }
        // 初始化Kafka连接地址和HDFS存储地址以及Flink并行度
        String bootStrapServer = args[0];
        String hdfsPath = args[1];
        int parallelism = Integer.parseInt(args[2]);
        // 实例化一个Flink使命目标
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);
        env.setParallelism(parallelism);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Flink消费Topic中的数据
        DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<>("test_bll_topic", new SimpleStringSchema(), configByKafkaServer(bootStrapServer)));
        // 实例化一个HDFS存储目标
        BucketingSink<String> sink = new BucketingSink<>(hdfsPath);
        // 自界说存储到HDFS上的文件名,用小时和分钟来命名,便利后边算战略
        sink.setBucketer(new DateTimeBucketer<String>("HH-mm"));
        // 设置存储HDFS的文件大小和存储文件时刻频率
        sink.setBatchSize(1024 * 1024 * 4);
        sink.setBatchRolloverInterval(1000 * 30);
        transction.addSink(sink);
        env.execute("Kafka2Hdfs");
    }
    // 初始化Kafka目标连接信息
    private static Object configByKafkaServer(String bootStrapServer) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootStrapServer);
        props.setProperty("group.id", "test_bll_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}

留意事项:

  • 这儿咱们把时刻窗口设置小一些,每30s做一次Checkpoint,假如该批次的时刻窗口没有数据过来,就生成一个文件落地到HDFS上;

  • 另外,咱们重写了Bucketer为DateTimeBucketer,逻辑并不杂乱,在原有的办法上加一个年-月-日/时-分的文件生成途径,例如在HDFS上的生成途径:xxxx/2022-02-26/00-00。

详细DateTimeBucketer完结代码如下所示:

public class DateMinuteBucketer implements Bucketer<String> {
    private SimpleDateFormat baseFormatDay = new SimpleDateFormat("yyyy-MM-dd");
    private SimpleDateFormat baseFormatMin = new SimpleDateFormat("HH-mm");
    @Override
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        return new Path(basePath + "/" + baseFormatDay.format(new Date()) + "/" + baseFormatMin.format(new Date()));
    }
}

2.3.2 生成数据预处理战略

这儿,咱们需求对落地到HDFS上的文件进行预处理,处理的逻辑是这样的。比方,现在是2022-02-26 14:00,那么咱们需求将当天的13:55,13:56,13:57,13:58,13:59这最近5分钟的数据处理到一起,并加载到Hive的最近5分钟的一个分区里边去。那么,咱们需求生成这样一个逻辑战略集合,用HH-mm作为key,与之最近的5个文件作为value,进行数据预处理兼并。详细完结代码过程如下:

  • 过程一:获取小时循环战略;

  • 过程二:获取分钟循环战略;

  • 过程三:判别是否为5分钟的倍数;

  • 过程四:对分钟级别小于10的数字做0补齐(比方9补齐后变成09);

  • 过程五:对小时级别小于10的数字做0补齐(比方1补齐后变成01);

  • 过程六:生成时刻规模;

  • 过程七:输出成果。

其间,主要的逻辑是在生成时刻规模的过程中,依据小时和分钟数的不同情况,生成不同的时刻规模,并输出成果。在生成时刻规模时,需求留意前导0的处理,以及特殊情况(如小时为0、分钟为0等)的处理。最终,将生成的时刻规模输出即可。

依据上述过程编写对应的完结代码,生成当天所有日期命名规则,预览部分成果如下:

Kafka实时数据即席查询应用与实践

需求留意的是,假如发生了第二天00:00,那么咱们需求用到前一天的00-00=>23-59,23-58,23-57,23-56,23-55这5个文件中的数据来做预处理。

2.3.3 加载数据

在完结2.3.1和2.3.2里边的内容后,接下来,咱们能够运用Hive的load指令直接加载HDFS上预处理后的文件,把数据加载到对应的Hive表中,详细完结指令如下:

-- 加载数据到Hive表
load data inpath '<hdfs_path_hfile>' overwrite into table xxx.table partition(day='2022-02-26',hour='14',min='05')

2.3.4 即席剖析

之后,咱们运用Hive SQL来对Kafka数据进行即席剖析,示例SQL如下所示:

-- 查询某5分钟分区数据
select * from xxx.table where day='2022-02-26' and hour='14' and min='05'

2.4 Flink SQL与 Flink DataStream怎么挑选

Flink SQL 和 Flink DataStream 都是 Flink 中用于处理数据的中心组件,咱们能够依据自己实践的事务场景来挑选运用哪一种组件。

Flink SQL 是一种依据 SQL 言语的数据处理引擎,它能够将 SQL 查询句子转换为 Flink 的数据流处理程序。相比于 Flink DataStream,Flink SQL 愈加易于运用和保护,一起具有更快的开发速度和更高的代码复用性。Flink SQL 适用于需求快速开发和部署数据处理使命的场景,比方数据仓库、实时报表、数据清洗等。

Flink DataStream API是Flink数据流处理规范API,SQL是Flink后期版别供给的新的数据处理操作接口。SQL的引入为提高了Flink运用的灵活性。能够以为Flink SQL是一种经过字符串来界说数据流处理逻辑的描绘言语。

因此,在挑选 Flink SQL 和 Flink DataStream 时,需求依据详细的事务需求和数据处理使命的特点来进行挑选。假如需求快速开发和部署使命,能够挑选运用 Flink SQL;假如需求进行更为深入和定制化的数据处理操作,能够挑选运用 Flink DataStream。一起,也能够依据实践情况,结合运用 Flink SQL 和 Flink DataStream 来完结杂乱的数据处理使命。

三、 总结

在实践使用中,Kafka实时数据即席查询能够用于多种场景,如实时监控、实时报警、实时计算、实时剖析等。详细使用和实践中,需求留意以下几点:

  • 数据质量:Kafka实时数据即席查询需求保证数据质量,防止数据重复、丢失或过错等问题,需求进行数据质量监控和调优。

  • 体系杂乱性:Kafka实时数据即席查询需求涉及到多个体系和组件,包含Kafka、数据处理引擎(比方Flink)、查询引擎(比方Hive)等,需求对体系进行装备和管理,增加了体系的杂乱性。

  • 安全性:Kafka实时数据即席查询需求加强数据安全性保障,防止数据走漏或数据篡改等安全问题,做好Hive的权限管控。

  • 性能优化:Kafka实时数据即席查询需求对体系进行性能优化,包含优化数据处理引擎、查询引擎等,提高体系的性能和功率。

参阅:

  1. github.com/apache/flin…

  2. flink.apache.org/