敞开生长之旅!这是我参与「日新计划 12 月更文挑战」的第4天 /post/716729…

四种Api形式

Flink核心API

  • 初级API(Stateful Stream Processing):供给了对时间和状况的细粒度操控,简洁性和易用性较差, 首要运用在一些复杂事情处理逻辑上。
  • 中心API(DataStream/DataSet API):首要供给了针对流数据和批数据的处理,是对初级API进行了一 些封装,供给了filter、sum、max、min等高档函数,简略易用。
  • Table API:一般与DataSet或许DataStream严密关联,能够经过一个DataSet或DataStream创立出 一个Table,然后再运用相似于filter, join,或许 select这种操作。最后还能够将一个Table目标转成 DataSet或DataStream。
  • SQL:Flink的SQL底层是根据Apache Calcite,Apache Calcite完成了规范的SQL,运用起来比其他 API更加灵敏,由于能够直接运用SQL句子。Table API和SQL能够很容易地结合在一块运用,由于它 们都回来Table目标。

Api图示

Flink核心API

DataStream Api

DataSource

DataSource是程序输入的数据源,Flink自身内置了非常多的数据源,也支撑自定义数据源,现在供给的内置数据源,在企业中开发肯定是足够的

  • 根据socket

  • 根据Collection

  • 第三方数据源(不局限于以下三个)

    • Kafka
    • RabbitMQ
    • NiFi

针对source的这些Connector中,工作中最常用的便是kafka

恢复机制

程序过错,机器故障、网络故障时,Flink有容错机制能够恢复并持续运行。

针对Flink供给的接口,假如敞开了checkpoint,Flink能够供给容错性保证

  • Socket:at most once
  • Collection:exactly once
  • Kafka0.10以上:exactly once
根据collection的source
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecuti
 //运用collection调集生成DataStream
 DataStreamSource<Integer> text = env.fromCollection(Arrays.asList(1, 2
 text.print().setParallelism(1);
 env.execute("StreamCollectionSourceJava"); 

Transformation

transformation是Flink程序的核算算子,担任对数据进行处理,Flink供给了大量的算子,这点上和Spark相似

算子 阐明
map 输入一个元素进行处理,回来一个元素
flatMap 输入一个元素进行处理,回来多个元素
filter 对数据进行过滤,契合条件留下
keyBy 根据key分组,相同key数据放入同一个分区
reduce 对当时元素和上一次的成果进行聚合操作
aggregations sum(),min(),max()等
union 兼并多个流,多个流数据类型必须共同
connect 只能衔接两个流,两个流的数据类型能够不同
split 把一个流分为多个流
shuffle 对数据进行随机分区
rebalance 对数据进行再平衡,从头分区,消除数据歪斜
rescale 重分区
partationCustom 自定义分区

剖析几个要点的算子:

  • union

    • 多个流兼并,可是数据类型必须共同
    • 处理规矩也必须共同
  • connect

    • 两个流被connect后,指示放入同一个流,内部依然坚持各自的数据和方法不改变,彼此独立。
    • connect办法会回来connectedStream,在此流中需求适用CoMap,CoFlatMap,相似正常流的map和flatMap
  • split

    • 把一个流切分红多个流
    • 切分后的流不能再分
    • 场景:将一份数据切分红多份,便于对每一份数据进行不同的逻辑处理
    • 该办法现已过时,官方不引荐运用,官方推进运用side output办法
  • side output

    • 进行切分后的流还能够二次切分
union与connect

Flink核心API

union能够衔接多个流,最后汇总成一个流,流里边的数据运用相同的核算规矩

connect值能够衔接2个流,最后汇总成一个流,可是流里边的两份数据彼此还是独立的,每一份数据使

用一个核算规矩

流切分

假如是只需求切分一次的话运用split或许side output都能够

假如想要切分屡次,就不能运用split了,需求运用side output

Flink核心API

分区算子
  • Random:随机分区
  • rebalance:对数据集进行再平衡,从头分区,消除数据歪斜
  • rescale:重分区
  • custom partition:自定义分区

random:随机分区,它表明将上游数据随机发送到下流算子实例的每个分区,代码层面是调用shuffle办法,shuffle底层关于的是shufflePartitioner类,这个类有selectChannel函数,这个函数会进行核算数据会发送到哪个分区,里边调用的是random.nextInt办法,所以该算子是随机分区的。

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
 return random.nextInt(numberOfChannels);
}

rebalance:从头平衡分区(循环分区),它会对数据进行再平衡,未每个分区创立相同的负载,实际上便是经过循环的方法给下流算子的每个分区分配数据,在代码层面调用的是rebalance办法,查看源码,该办法调用的是RebalancePartitioner这个类,里边有一个setup函数和selectChannel函数,setup会根据分区数初始化一个随机值nextChannelToSentTo,然后selectChannel函数会运用该随机值+1和分区数取模,把核算的值赋给该变量,后面以此类推,完成向多个下流算子实例的多个分区循环发送数据,这样每个分区获取的数据根本共同。

public void setup(int numberOfChannels) {
 super.setup(numberOfChannels);
 nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels)
}
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
 nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
 return nextChannelToSendTo;
}

rescale:重分区

查看源码,rescale底层对应的是RescalePartitioner这个类里边有一个selectChannel函数,这里边的numberOfChannels是分区数量,其实也能够认为是咱们所说的算子的并行度,由于一个分区是由一个线程担任处理的,它们两个是一一对应的。

public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
 if (++nextChannelToSendTo >= numberOfChannels) {
 nextChannelToSendTo = 0;
 }
 return nextChannelToSendTo;
}
* The subset of downstream operations to which the upstream operation sends
* elements depends on the degree of parallelism of both the upstream and downs
* For example, if the upstream operation has parallelism 2 and the downstream 
* has parallelism 4, then one upstream operation would distribute elements to 
* downstream operations while the other upstream operation would distribute to
* two downstream operations. If, on the other hand, the downstream operation h
* 2 while the upstream operation has parallelism 4 then two upstream operation
* distribute to one downstream operation while the other two upstream operatio
* distribute to the other downstream operations.

假如上游操作有2个并发,而下流操作有4个并发,那么上游的1个并发成果循环分配给下流的2个并发操 作,上游的别的1个并发成果循环分配给下流的别的2个并发操作。另一种状况,假如上游有4个并发操作而下流有2个并发操作,那么上游的其中2个并发操作的成果会分配给下流的一个并发操作,而上游的别的2个并发操作的成果则分配给下流的别的1个并发操作。

注意:rescale与rebalance的区别是rebalance会发生全量重分区,而rescale不会。

broadcast:播送分区,将上游算子实例中的数据输出到下流算子实例的每个分区中,合适用于大数据集

查看源码,broadcast底层对应的是BroadcastPartitioner这个类.看这个类中selectChannel函数代码的注释,提示播送分区不支撑挑选Channel,由于会输出数据到下流的每个Channel中,便是发送到下流算子实例的每个分区中

custom partition:自定义分区,能够按照自定义规矩完成自定义分区需求完成Partitioner接口

图解几个分区算子

Flink核心API

Flink核心API

Flink核心API

具体的Api参考官网案例

DataSink

DataSink是 输出组件,担任把核算好的数据输出到其它存储介质中。

一般会输出到音讯队列或许数据库,print办法一般适用于测试

Flink内置Connectors

容错保障

Redis:at least once

Kafka:at least once/exactly once

DataSet Api

DataSet也可分为三块来进行剖析

  • DataSource
  • Transformation
  • Sink

DataSource

针对DataSet批处理而言,运用最多的是读取HDFS数据。

  • 根据调集

    • fromCollection
  • 根据文件

    • readTextFile(path),读取hdfs中的数据文件

Transformation

常见算子如下:

算子 阐明
map 输入一个元素进行处理,回来一个元素
mapPartation 相似map,一次处理一个分区的数据
flatMap 输入一个元素进行处理,能够回来多个元素
filter 对数据进行过滤,契合条件的数据会被留下
reduce 对当时元素和上一次的成果进行聚合操作
aggregation sum(),min(),max()等

用法与DataStream相关的算子相似

DataSet一些常用的算子:

算子 解释
distinct 回来数据集中去重之后的元素
join 内衔接
outerjoin 外衔接
cross 获取两个数据集的笛卡尔积
union 回来多个数据集的总和,数据类型需求共同
first-n 获取调集中的前N个元素
  • distinct:去重算子

  • join:内衔接,衔接两份数据,相似sql join

  • outer join:相似SQL左右衔接

  • cross:获取笛卡尔积

  • union:回来两个数据集的总和,数据类型需求共同

  • first-n:获取调集中前N个元素

    • 这个比较常用,实际工作中咱们经常会核算TopN的产品信息,或许商户信息等

DataSink

Flink针对DataSet供给了一些现已完成好的数据目的地

其中最常见的是向HDFS中写入数据:

  • writeAsText():将元素以字符串方法逐行写入,这些字符串经过调用每个元素的toString()办法来获取

  • writeAsCsv():将元组以逗号分隔写入文件中,行及字段之间的分隔是可配置的,每个字段的值来自目标

    的toString()办法

  • print:打印每个元素的toString()办法的值,测试时用

Table Api & Flink SQL

Table API和SQL的由来:

Flink针对规范的流处理和批处理供给了两种联系型API,Table API和SQL。

  • Table API答应用户以一种很直观的方法进行select 、fifilter和join操作。
  • Flink SQL根据 Apache Calcite完成规范SQL。
  • Flink Table API、SQL和Flink的DataStream API、DataSet API是严密联系在一起的。
  • Table API和SQL是一种联系型 API,用户能够像操作 Mysql 数据库表相同的操作数据,而不需求写代码,更不需求手工的对代码进行调优

依靠信息:

flink-table-api-java-bridge_2.12
版别:1.11.0
flink-table-planner-blink_2.12
版别:1.11.0

Table API和SQL经过join API集成在一起,这个join API的中心概念是Table,Table能够作为查询的输入和输出。

运用案例

// 获取TableEnvironment
 EnvironmentSettings sSettings = EnvironmentSettings.newInstance().use
 TableEnvironment sTableEnv = TableEnvironment.create(sSettings);
// 创立输入表
 sTableEnv.executeSql("" +
 "create table myTable(\n" +
 "id int,\n" +
 "name string\n" +
 ") with (\n" +
 "'connector.type' = 'filesystem',\n" +
 "'connector.path' = 'D:\data\source',\n" +
 "'format.type' = 'csv'\n" +
 ")");
// 书写sql句子
Table result = sTableEnv.sqlQuery("select id,name from myTable")
// 执行并打印
result.execute().print()

DataStream、DataSet与Table互转

Table API和SQL能够很容易的和DataStream和DataSet程序集成到一块。

经过TableEnvironment ,能够把 DataStream 或 者 DataSet注册为Table。

这样就能够运用Table API 和 SQL查询了。

1.DataStream创立表

  • 创立view视图
  • 创立table目标
 //获取StreamTableEnvironment
 StreamExecutionEnvironment ssEnv = StreamExecutionEnvironment.getExec
 EnvironmentSettings ssSettings = EnvironmentSettings.newInstance().us
 StreamTableEnvironment ssTableEnv = StreamTableEnvironment.create(ssE
 //获取DataStream
 ArrayList<Tuple2<Integer, String>> data = new ArrayList<>();
 data.add(new Tuple2<Integer,String>(1,"jack"));
 data.add(new Tuple2<Integer,String>(2,"tom"));
 data.add(new Tuple2<Integer,String>(3,"mick"));
 DataStreamSource<Tuple2<Integer, String>> stream = ssEnv.fromCollecti
 //第一种:将DataStream转换为view视图
 ssTableEnv.createTemporaryView("myTable",stream,$("id"),$("name"));
 ssTableEnv.sqlQuery("select * from myTable where id > 1").execute().print
 //第二种:将DataStream转换为table目标
 Table table = ssTableEnv.fromDataStream(stream, $("id"), $("name"));
 table.select($("id"), $("name"))
 .filter($("id").isGreater(1))
 .execute()
 .print();

2.DataSet创立表

新的Blink引擎不支撑这种操作,老版别支撑,现已不常用了。

将 Table 转换为 DataStream 或许 DataSet 时,你需求指定生成的 DataStream 或许 DataSet 的数据类型

通常最方便的挑选是转换成 Row

  • Row: 经过角标映射字段,支撑恣意数量的字段,支撑 null 值,无类型安全(type-safe)查看
  • POJO: Java中的实体类,这个实体类中的字段称号需求和Table中的字段称号坚持共同,支撑恣意数量的字段,支撑null值,有类型安全查看。
  • Case Class: 经过角标映射字段,不支撑null值,有类型安全查看。
  • Tuple: 经过角标映射字段,Scala中约束22个字段,Java中约束25个字段,不支撑null值,有类型安全查看。
  • Atomic Type: Table 必须有一个字段,不支撑 null 值,有类型安全查看。

3.将表转换成 DataStream

两种形式

  • Append Mode:仅附加,不更新
  • Retract Mode:能够始终运用此形式,它运用一个Boolean标识来编码INSERT和DELETE更改