我们好,我是老兵。

Flink依据流编程模型,内置了许多强壮功用的算子,能够协助咱们快速开发使用程序。

作为Flink开发内行,大多算子的写法和场景想来已是了然于胸,但是运用进程常常会有一些小小的问题:

  1. 部分算子长时刻未用,忘了用法。。
  2. 某些场景挑选什么算子?怎么挑选?含糊不清。。

工欲善其事,必先利其器!快速高效的运用适宜的算子开发程序,往往能够达到事半功倍的效果。

想着好记忆不如烂笔头这个道理,特此收拾一份常见的Flink算子开发手册!!也作为自己的作业笔记。欢迎我们保藏~

1 DataStream API

Flink DataStream API让用户灵敏且高效编写Flink流式程序。首要分为DataSource模块、Transformation模块以及DataSink模块。

1万字Flink算子大全手册:实战 + 原理干货

  • Source模块界说数据接入功用,包含内置数据源和外部数据源。
  • Transformation模块界说DataStream数据流各种转化操作。
  • Sink模块界说数据输出功用,存储成果到外部存储介质中。

履行环境: StreamExecutionEnvironment
体系模块 :
DataSouce、Transformation和DataSink

2 DataSource 输入

DataSource输入模块界说了DataStream API中的数据输入操作,Flink输入数据源分为内置数据源第三方数据源两种类型。

  • 内置数据源包含文件Socket网络端口以及调集类型数据,不需要引进其他依靠库,在Flink体系内部现已完结。
  • 第三方数据源界说了Flink和外部体系数据交互逻辑,例如Apache Kafka ConnectorElastic Search Connector等。
  • 一起用户能够自界说数据源。

1万字Flink算子大全手册:实战 + 原理干货

2.1 readTextFile、readFile算子

支撑读取文本文件到Flink体系,转化成DataStream数据集。

  • readTextFile算子直接读取体系文本文件(.log|.txt …)
  • readFile算子能够指定InputFormat读取特定数据类型的文件(包含CSV、JSON或许自界说InputFormat)
// 读取文本文件
val textInputStream = env.readTextFile(
   "/data/example.log")
// 指定InputFormat,读取CSV文件
val csvInputStream = env.readFile(
   // 能够自界说类型(InputFormat)
   new CsvInputFormat[String] (   
      new Path("/data/example.csv")
   ) {
     override def fillRecord(out: String,
       onbjects: Array[AnyRef]: String) = {
         return null
       }
   }, "/data/example.csv"
)

2.2 Socket算子

支撑从Socket端口读取数据,转化成DataStream算子。

  • 算子参数:Ip地址、端口、delimiter字符串切割符、最大重试次数maxRetry
  • maxRetry首要供给使命失利重连机制。当设定为0时,Flink使命直接停止。
  • Unix环境下,履行nc -lk [:port] 启动网络服务
// Flink程序读取Socket端口(9999)数据
val socketDataStream = 
   env.socketTextStream("localhost", 9999)

2.3 调集算子

支撑操作Flink内置调集类(Collection),转化成DataStream。

  • 支撑JavaScala算子常见调集类
  • 实质是将本地调集数据分发到长途履行;适用于本地测验,留意数据结构类型的共同性
// fromElements元素调集转化
val elementDataStream = 
 env.fromElements(
   Tuple2('aa', 1L),Tuple2('bb', 2L)
 )
 // fromCollection数组转化(Java)
 String[] collections = new String[] {
   "aa", "bb"
 };
 DataStream<String> collectionDatastream =
  env.fromCollection(
   Arrays.asList(collections)
  );
  // List列表转化(Java)
  List<String> arrays = new ArrayList<>();
  arrays.add("aa")
  arrays.add("bb")
  DataStream<String> arrayDataStream = 
   env.fromCollection(arrays)

2.4 外部数据源算子

支撑从第三方数据源体系读取数据,转化成DataStreams算子。

  • 常见外部数据源算子: Hadoop FileSystem、ElasticSearch、 Apache Kafka、RabbitMQ等
  • 运用时需要在Maven环境中增加jar包依靠(pom)
// Maven配置
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-1.2_2.12</artifactId>
  <version>1.9.1</version>
</dependency>
// 读取Kafka数据源(Java)
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "localhost:9092");
...
DataStream<String> kafkaStream = 
  env.addSource(
    new FlinkKafkaStream010<> (
      "topic-1",
      new SimpleStringSchema(),
      properties
    )
  )

2.5 自界说数据源算子

支撑完结内置的Function相关接口,自界说数据源。

详细的内置办法包含但不限于:

  • SourceFunction接口
  • ParallelSourceFunction接口
  • RichParallelSourceFunction类

后续再经过env的addSource()办法增加,详细完结不展开。

3 DataStream转化

Flink对若干个DataStream操作生成新的DataStream,该进程被称为Transformation

1万字Flink算子大全手册:实战 + 原理干货

Flink程序中大大都逻辑均在Transformation进程中完结,包含转化过滤排序衔接关联挑选聚合等操作。

留意和Spark中transformation的区别。

1万字Flink算子大全手册:实战 + 原理干货

Flink中DataStream转化能够分为几种类型:

  • Single DataStream: 单个DataStream数据集元素处理逻辑
  • Multi DataStream: 多个DataStream数据集元素处理逻辑
  • 物理分区:数据集并行度和数据分区处理

3.1 Map算子(#Single)

对数据会集每个元素进行转化操作,生成新DataStream。

1万字Flink算子大全手册:实战 + 原理干货

  • 底层为MapFunction算子。经过调用map函数,对每个元素履行操作。
  • 常用于数据清洗、核算和转化等。
val inputStream = env.fromElements(
  ("aa", 1), ("bb", 2), ("cc", 3)
)
// 第一种写法: map操作,完结每个元素 + 1
val mapStream1 = inputStream.map(
  t => (t._1, t.2 + 1)
)
// 第二种写法: 指定MapFunction
val mapStream2 = inputStream.map(
 new MapFunction[(String, Int), (String, Int)] {
   override def map(t: (String, Int)): 
     (String, Int) = {
       (t._1, t._2 + 1)}
 }
)

3.2 FlatMap算子(#Single)

支撑对数据会集一切元素转化成多个元素,生成新DataStream。

1万字Flink算子大全手册:实战 + 原理干货

val flatDataStream = env.fromCollections()
val resultStream = flatDataStream.flatMap{
   line => line.split(",")
}

3.3 filter算子(#Single)

支撑对数据集进行过滤挑选,生成新的DataStream

1万字Flink算子大全手册:实战 + 原理干货

// 通配符写法
val filterDataStream = dataStream.fliter {
 _ % 2 == 0
}
// 指定运算符表达式
val filterDS = dataStream.filter(
    x => x % 2 == 0
)

3.4 keyBy算子(#Single)

依据指定Key对DataStream数据集分区,生成新的KeyedStream

1万字Flink算子大全手册:实战 + 原理干货

  • 相同Key值的数据归并到同一分区
  • 类似于Spark中的groupByKey
val inputStream = env.fromElements(
 ("aa", 11), ("aa", 22), ("bb", 33)
)
// 依据第一个字段作为key分区
// 转化为KeyedStream[(String, String), Tuple]
val keyedStream: inputStream.keyBy(0)

3.5 reduce算子(#Single)

支撑对输入KeyedSteam依据reduce()聚合,生成新的DataStream

1万字Flink算子大全手册:实战 + 原理干货

  • 依据key分区聚合构成KeyedStream
  • 支撑运算符和自界说reduceFunc函数
val inputStream = env.fromElements(
 ("aa", 11), ("bb", 33), ("cc", 22), ("aa", 21)
)
// 指定第一个字段分区key
val keyedStream = inputStream.keyBy(0)
// 对第二个字段进行累加求和
val reduceDataStream = keyedStream.reduce {
  (t1, t2) => (t1._1, t1._2 + t2._2)
}

自界说Reduce函数,需要完结匿名类。

val reduceDataStream = keyedStream.reeduce(
  new ReduceFunction[(String, Int)] {
     override def reduce(t1: (String,Int),
        t2: (String, Int)): (String, Int) = {
           (t1._1, t1._2 + t2._2)
        }
  }
)

3.6 aggregations算子(#Single)

DataStream基础聚合算子,经过输入KeyedStream进行聚合生成新的DataStream

1万字Flink算子大全手册:实战 + 原理干货

  • 依据指定字段聚合,可自界说聚合逻辑
  • 底层封装了sum、min、max等函数
val inputStream = env.fromElements(
  (1, 7), (2, 8), (3, 11), (2, 3)
)
// 指定第一个字段分区key
val keyedStream:
[(Int, Int), Tuple] = inputStream.keyBy(0)
// 第二个字段sum核算
val sumStream = keyedStream.sum(1)
// 终究输出成果
sumStream.print()

3.7 Connect兼并算子(#Multi)

兼并多种类型数据集,并保存原数据集的数据类型,生成ConnectedStream

1万字Flink算子大全手册:实战 + 原理干货

  • 同享状况数据,可相互获取数据集状况
  • 某些场景下可代替join算子,变相完结flink双流join功用
// 创立不同数据类型数据集
val stream1 = env.fromElements(
  ("aa", 3), ("bb", 4), ("cc", 11), ("dd", 22)
)
val stream2 = env.fromElements(
 (1, 2, 11, 8)
)
// 衔接数据集
// 回来[(String, Int), Int]
// 类似: [("aa", 3),1]
val connectedStream = stream1.connect(stream2)

3.8 Connect算子—CoMap(#Multi)

ConnectedStream数据流的Map功用算子,操作兼并数据集一切元素

  • 界说CoMapFunction对象,参数为输入数据类型、输出数据类型和mapFunc
  • 子map函数多线程交替履行,生成终究的兼并目标数据集
// 上文Connected操作后构成的数据流
// 参数: 第1个为stream1类型;第2个为stream2类型;第3个为stream3类型
val resultStream = connnectedStream.map(
  new CoMapFunction[(String, Int), Int, (Int, String)] {
    // 界说第一个数据集处理逻辑,输入值为stream1
    override def map1(in1: (String, Int)): (Int, String) = {
       (in1._2, in1._1)
    }
    // 界说第二个数据集处理逻辑,输入值为stream2
    override def map2(in2: Int): (Int,String)={
       (in2, "default")
    }
)

3.9 Connect算子—CoFlatMap(#Multi)

ConnectedStream数据流的flatmap功用算子

在flatmap()办法中指定CoFlatMapFunction,并别离完结flatmap1()和flatmap2()函数。

val resultStream2 = connectedStream.flatMap(
  new CoFlatMapFunction[(String, Int), Int, (String, Int, Int)] {
     // 举例: 函数中同享变量,完结两个数据调集并
     var value = 0
     // 界说第1个数据集处理函数
     override def flatMap1(in1: (String, Int),
      collect: Collector[(String, Int, Int)]): Unit = {
        collect.collect((in1._1, in1._2, value))
      }
  }
  // 界说第2个数据集处理函数
  override def flatMap2(in2: Int, collect: Collector[(String, Int, Int)]): Unit = {
    value = in2
  }
)

3.10 Union算子(#Multi)

将两个或许多个数据调集并,生成与输入数据集类型共同的DataStream

1万字Flink算子大全手册:实战 + 原理干货

  • 输入数据集的数据类型要求共同
  • 输出数据集的数据类型和输入数据共同
  • 留意和connect算子的区别
val stream1 = env.fromElements(
  ("aa", 3), ("bb", 22), ("cc", 45)
)
val stream2 = env.fromElements(
 ("dd", 23), ("ff", 21), ("gg", 89)
)
val stream3 = ....
// 兼并数据集
val unionStream = stream1.union(stream2)
val unionStream2 = stream1.union(
  stream2, stream3
)

3.11 Split算子(#Multi)

将DataStream数据集按照条件拆分,转化成两个数据集的DataStream算子

1万字Flink算子大全手册:实战 + 原理干货

  • 将接入的数据路由到多个输出数据集,在split函数中界说拆分逻辑
  • 能够被看作是union的逆向完结
val stream1 = env.fromElements(
 ("aa", 3), ("bb", 33), 
 ("cc", 56),("aa", 23), ("cc", 67)
)
// 依据第二个字段的奇偶性标记数据(切分)
val splitStream = stream1.split(
 v => if (v._2 % 2 == 1 Seq("even") 
          else Seq("odd"))
)

3.12 Select算子(#Multi)

Select挑选算子,经过条件挑选数据会集元素,生成新的DataStream

1万字Flink算子大全手册:实战 + 原理干货

// 挑选偶数数据
val evenStream = splitedStream.select("even")
//挑选一切数据
val allStream = splitedStream.select("even", "odd")

3.13 window窗口算子(时刻机制)

Flink的窗口算子是实时核算的中心算子,常用于某固定时刻内指标核算

1万字Flink算子大全手册:实战 + 原理干货

1)窗口API

Flink供给了高级窗口API算子,封装底层窗口操作,包含窗口类型、触发器、侧输出等。一起依据上游输入Stream流分为Non-Keyed和Keyed两种类型。

  • Non-keyed(上游为Non-KeyedStream) 直接调用windowAll(),获取全局核算
val inputStream: DataStream = ...
// 当传入为KeyedStream时,调用window()函数
inputStream.keyBy(0).window(new WindowFunc(...))
// 当传入为不做处理的Non-Keyed输入Stream流
// 直接运用windowAll()全局核算
inputStream.windowAll(new WindowFunc(...))
  • keyed(上游为KeyedStream类型)
    调用DataStream的内置window()
stream.keyBy(..//keyed输入流.)
   .window(..//窗口类型.)
   .trigger(.//触发器<可选>..)
   .evictor(.//剔除器<可选>.)
   .allowdedLateness(.//推迟处理机制.)
   .sideOutputLateDate(.//侧输出.)
   .reduce/fold.aggregate/apply(.//核算函数.)

2)窗口类型

依据窗口的分配方法分为: 翻滚滑动会话全局等,别离支撑不同窗口流动方法和规模。

一起支撑事件时刻和处理时刻数据流。

  • Tumbling Window Join (翻滚窗口)

    1万字Flink算子大全手册:实战 + 原理干货

  • Sliding Window Join (滑动窗口)

    1万字Flink算子大全手册:实战 + 原理干货

  • Session Widnow Join(会话窗口)

    1万字Flink算子大全手册:实战 + 原理干货

以十分钟时刻滑动窗口核算案例说明:

val tumblingStream = inputStream
  .keyBy(0)
  .window(
    TumblingEventTimeWindows.of(
    Time.seconds(10))
  ).process(...)

4 DataSink输出

Flink读取数据源,经过系列Transform操作后,成果一般转存至外部存储介质或许下游,即Flink的DataSink进程。

1万字Flink算子大全手册:实战 + 原理干货

Flink将外部存储的衔接逻辑封装在Connector衔接器中,常见的有:

  • Apache Kafka
  • ElasticSearch
  • Hadoop FileSystem
  • Redis
  • 文件体系、端口

4.1 文件|端口

支撑文件、客户端、Socket网络输出,为Flink内置算子,不需要依靠三方库

常见有writeAsCSV(本地文件)、writeToSocket(Socket网络)

 // 本地csv
inputStream.writeAsCsv(
 "file://path/xx.csv", WriteMode.OVERWRITE
)
// Socket网络
inputStream.writeToSocket(
 host, post, new SimpleStringSchema()
)

4.2 外部第三方

依据SinkFunction界说,需要引进外部三方依靠库,设置三方体系参数

val dataStream = ...
// 界说FlinkKafkaProducer
val kafkaProducer = new FlinkKafkaProducer011[Sting] (
  "localhost:9092", //kafka broker list衔接
  "xxx-topic", // kafka topic
  new SimpleStringSchema() //序列化
)
// 增加SinkFunc
dataStream.addSink(kafkaProducer())

5 总结

Flink内置的算子库种类全、功用强壮,熟练掌握算子的运用方法和场景使用,是实时核算的必备技术。

后面还会持续更新此系列,欢迎增加我的个人微信: youlong525,一起学习交流~

未完待续。。

》》》更多好文,欢迎重视公众号: 大数据兵工厂