flink常见的实时去重计划:
状况后端、布隆过滤器、bitmap都完成过了,本文完成外部数据库的示例。 外部数据库这儿选用redis,因为是key-value数据库,MySQL的话选用唯一键完成也是一样的效果。
完成思路
从数据源读取数据需求去重,去重选用redis进行,flink的app以及redis的精确性问题能够参阅flink的端到端精确一次语义,此处不是要点。 数据流通从source到redis,redis的数据类型选用调集(set),具有主动去重的特性。
maven依赖
需求增加redisSink的maven依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
redisSink
官方文档:bahir.apache.org/docs/flink/…
官方示例:
public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>>{
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
}
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
}
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
DataStream<String> stream = ...;
stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
能够看到需求完成RedisMapper
接口,而且重写三个办法。
示例代码运用的是redis的hash数据类型,而咱们需求运用set数据类型。从官方给出的信息(如下图)可知,咱们需求运用SADD的redis command。
RedisCommandDescription
第一个结构参数需求供给redis command
(上图所示右半部分);第二个结构参数为additionalKey
参数主要是针对SORTED_SET和HASH结构的,在HASH结构里,这个additionalKey对应hash的key,getKeyFromData
办法得到的数据对应hash的field,getValueFromData
获取的数据对应hash的value。
而set数据类型是不需求供给additionalKey
的,但是需求注意,从source的数据需求插入到同一个set中,那个该set的key需求固定,所以咱们此处需求重写的getKeyFromData
办法应该回来一个固定的字符串作为set的key。
终究完成的接口:
package others;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
/**
* @projectName: wc
* @package: others
* @className: RedisSetMapper
* @author: NelsonWu
* @description: TODO
* @date: 2024/2/25 22:53
* @version: 1.0
*/
public class RedisSetMapper implements RedisMapper<Tuple2<String, Integer>> {
@Override
public RedisCommandDescription getCommandDescription() {
// redis中set需求运用sadd命令增加值
return new RedisCommandDescription(RedisCommand.SADD);
}
@Override
public String getKeyFromData(Tuple2<String, Integer> stringIntegerTuple2) {
return "FlinkDeduplication"; // set的key
}
@Override
public String getValueFromData(Tuple2<String, Integer> stringIntegerTuple2) {
return stringIntegerTuple2.f1.toString(); // 需求注意,这儿只能回来string数据类型
}
}
redis的相关装备这儿就不说了,如果非同一台机器访问redis记得设置密码或许关闭默认的模式,否则flink会抛出异常,以及bind相关的IP。能够参阅下这儿:zhuanlan.zhihu.com/p/28101275
flink application示例
flink的主程序这边比较简单,把处理完的数据发送给redis即可,redis会主动进行去重。 source此处运用的话调集,即有界流,输入1, 2, 33, 1, 3, 33这几个数据。终究在redis中应该只看到4个数据就是对的。
package sink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.util.Collector;
import others.RedisSetMapper;
/**
* @projectName: wc
* @package: sink
* @className: sinkRedisDemo
* @author: NelsonWu
* @description: TODO
* @date: 2024/2/25 22:30
* @version: 1.0
*/
public class sinkRedisDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> source = env.fromElements(1, 2, 33, 1, 3, 33);
SingleOutputStreamOperator<Tuple2<String, Integer>> dataStream = source.flatMap(new FlatMapFunction<Integer, Tuple2<String, Integer>>() {
@Override
public void flatMap(Integer value, Collector<Tuple2<String, Integer>> out) throws Exception {
String key = String.valueOf(value);
Tuple2<String, Integer> tupleValue = Tuple2.of(key, value);
out.collect(tupleValue);
}
});
FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
.setHost("172.20.132.197")
.setPort(6379)
.setPassword("redis")
.build();
dataStream.addSink(new RedisSink<Tuple2<String, Integer>>(jedisPoolConfig, new RedisSetMapper()));
env.execute("flink_redis_Deduplication");
}
}
终究执行结果:
参阅
redis装置与装备:zhuanlan.zhihu.com/p/28101275
redis的大数据去重计划:
flink的redisSink以及示例demo: