作者简介: 大数据专业硕士在读,CSDN人工智能范畴博客专家,阿里云专家博主,百度飞桨校领航团团长、飞桨发明营导师,粉丝4w+,➡️个人主页。专注大数据与人工智能常识同享。
大众号: GoAI的学习小屋,免费同享书籍、简历、导图等材料,更有沟通群同享AI和大数据,加群办法大众号回复“加群”或➡️点击链接。
专栏引荐: 现在在写一个CV方向专栏,后期会更新不限于深度学习、OCR、方针检测、图画分类、图画切开等方向,现在活动仅19.9,尽管付费但会长时刻更新且价格便宜,感兴趣的小伙伴能够关注下,有擅长CV的大佬能够联络我协作一同写。➡️专栏地址
学习者福利: 强烈引荐一个优秀AI学习网站,包含机器学习、深度学习等理论与实战教程,十分适宜AI学习者。➡️网站链接。
大数据开发笔记系列:
1️⃣大数据开发笔记(一):HDFS介绍 |
---|
2️⃣大数据开发笔记(二):Yarn散布式集群操作体系 |
3️⃣大数据开发笔记(三):Mapreduce |
4️⃣大数据开发笔记(四):Hive数据库房 |
➡️大数据开发笔记(四):Hive数据库房 |
➡️大数据开发笔记(四):Hive数仓调优 |
5️⃣大数据开发笔记(五):Zookeeper |
6️⃣大数据开发笔记(六):Flume根底学习 |
7️⃣大数据开发笔记(七):Kafka散布式流式处理 |
8️⃣大数据开发笔记(八):Spark归纳总结及Sparksql |
9️⃣大数据开发笔记(九):Flink根底 |
➡️大数据开发笔记(九):Flink归纳学习 |
大数据开发笔记(十):Hbase列存储数据库总结 |
➡️大数据开发笔记(十):Hbase实践 |
一、HDFS
1.根本操作:
查看一切指令 hadoop fs
1、查看一切目录及其文件 hadoop fs -ls /
2、hdfs文件体系创立目录 hadoop fs -mkdir /input(用于测验代码)
2.1、hdfs文件体系创立目录(批量)
hadoop fs -mkdir -p /inout/tmp 在input文件夹内创立tmp文件夹
3、hdfs文件体系创立文件 hadoop fs -touchz /a.txt
4、hdfs文件体系删去文件
hadoop fs -rmr /a.txt
hadoop fs -rmr -skipTrash /a.txt(越过收回站彻底删去)
5.hdfs上传本地文件 (留意:有必要先建好hdfs上目录再put)
hadoop fs -put t.txt /test 将本地文件t.txt上传至hdfs上test文件夹内;
hadoop fs -put /a.txt
6.查看hdfs文件内容
hadoop fs -cat /a.txt
hadoop fs -tail /a.txt (从尾部开端看)
hadoop fs -text /a.txt (查看二进制数据)
7、hdfs下载文件
hadoop fs -get /a.txt .
留意终究有一点,这个.代表下载到本地指令行地点目录;
8、递归删去目录
hadoop fs -rmr /input/tmp
9、查看hdfs文件的巨细
hadoop fs -du -h /b.txt
-du -s或许-du -h
10、查看hdfs文件行数
hadoop fs -cat /b.txt | wc -l
终究是字母l
cat或许text 都能够
实战:
查看集群ip状况 cat /etc/hosts
查看hadoop版别
echo $HADOOP_HOME/
which $HADOOP_HOME/
运转集群脚本
sh -x run.sh
运转run脚本,最好是 -X调试办法
2.HDFS的优缺点有哪些?
(1)HDFS的长处
高容错性
①:数据自动保存多个副本。它经过增加副本的办法,进步容错性
②:某一个副本丢掉今后,它能够自动康复
适宜批处理即就近准则
①:移动核算而非非数据,数据方位暴露给核算机结构
②:本地化,数据不移动,代码(使命)移动。
适宜处理大数据
①:数据规划:能够处理数据规划到达GB、TB、乃至PB等级的数据
②:文件规划:能够处理百万规划以上的文件数量,数量相当之大
可构建在廉价机器上,经过多副本机制,进步可靠性
(2)HDFS的缺点
不适宜低延时数据拜访,比方毫秒级的存储数据,是做不到的。寻址时刻长,适宜读取大文件,低推迟与高吞吐率。
不适宜小文件存储
占用NameNode很多内存,寻找时刻超越读取时刻
不支撑并发写入,文件随机修正
①:一个文件只能有一个写,不答应多个线程一同写
②:仅支撑数据append(追加),不支撑文件的修正
3.HDFS全体架构介绍

1)Client:便是客户端。
(1)文件切分。文件上传HDFS的时分,Client将文件切分红一个一个的Block,然后进行存储;
(2)与NameNode交互,获取文件的方位信息;
(3)与DataNode交互,读取或许写入数据;
(4)Client供给一些指令来办理HDFS,比方发动或许封闭HDFS;
(5)Client能够经过一些指令来拜访HDFS;
2)NameNode:便是Master,它是一个主管、办理者。
(1)办理HDFS的称号空间;namespace
(2)办理数据块(Block)映射信息;
(3)装备副本战略(默许);3
(4)处理客户端读写恳求。
3) DataNode:便是Slave。NameNode下达指令,DataNode履行实践的操作。
(1)存储实践的数据块;
(2)履行数据块的读/写操作。
4) SecondaryNameNode:并非NameNode的热备。当NameNode挂掉的时分,它并不能马上替换NameNode并供给服务。
(1)辅佐NameNode,分管其作业量;
(2)定时兼并Fsimage和Edits,并推送给NameNode;
(3)在紧急状况下,可辅佐康复NameNode。
DataNode的作业机制?
- 一个数据块在DataNode上以文件办法存储在磁盘上,包含两个文件,一个是数据自身,一个是元数据包含数据块的长度,块数据的校验和,以及时刻戳。
- DataNode发动后向NameNode注册,经往后,周期性(1小时)的向NameNode上报一切的块信息。
- 心跳是每3秒一次,心跳回来成果带有NameNode给该DataNode的指令如仿制块数据到另一台机器,或删去某个数据块。假定超越10分钟没有收到某个DataNode的心跳,则以为该节点不行用。
什么是机架感知?什么时分会运用机架感知?
浅显的来说便是NN(NameNode)经过读取咱们的装备来装备各个节点地点的机架信息,数据的流水线仿制和HDFS仿制副本时分。
4.HDFS数据写入(上传)流程是怎样的?

一 HDFS读流程归纳:
- client跟namenode通讯查询元数据,namenode经过查询元数据,找到文件块地点的datanode服务器
- 挑选一台datanode(就近准则,然后随机)服务器,恳求树立socket流
- datanode开端发送数据(从磁盘里边读取数据放入流,以packet为单位来做校验,巨细为64k)
- 客户端以packet为单位接纳,现在本地缓存,然后写入方针文件
详细流程
- Client 主张文件上传恳求,经过 RPC 与 NameNode 树立通讯,NameNode查看方针文件是否已存在,父目录是否存在,回来是否能够上传;
- Client 恳求第一个 block 该传输到哪些 DataNode 服务器上;
- NameNode 依据装备文件中指定的备份数量及副本放置战略进行文件分配,回来可用的 DataNode 的地址,如A,B,C
- Client 恳求 3 台 DataNode 中的一台 A 上传数据(实质上是一个 RPC 调用,树立 pipeline),A 收到恳求会继续调用 B,然后 B 调用 C,将整个pipeline 树立完结,后逐级回来 client;
- Client 开端往 A 上传第一个 block(先从磁盘读取数据放到一个本地内存缓存),以 packet 为单位(默许 64K),A 收到一个 packet 就会传给 B,B 传给 C;A 每传一个 packet 会放入一个应对行列等候应对
- 数据被切开成一个个 packet 数据包在 pipeline 上顺次传输,在pipeline 反方向上,逐一发送 ack(指令正确应对),终究由 pipeline中第一个 DataNode 节点 A 将 pipeline ack 发送给 client;
- 当一个 block 传输完结之后,client 再次恳求 NameNode 上传第二个block 到服务器
5.HDFS数据**读取(下载)**流程是怎样的?

HDFS写流程归纳
- 客户端跟namenode通讯恳求上传文件,namenode查看方针文件是否已存在,父目录是否存在,用户是否有权限等
- namenode回来是否能够上传
- client恳求第一个 block该传输到哪些datanode服务器上
- namenode回来3个datanode服务器ABC
- client恳求3台dn中的一台A上传数据(实质上是一个RPC调用,树立pipeline),A收到恳求会继续调用B,然后B调用C,将整个pipeline树立完结,逐级回来客户端
- client开端往A上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,A收到一个packet就会传给B,B传给C;A每传一个packet会放入一个应对行列等候应对
- 当一个block传输完结之后,client再次恳求namenode上传第二个block的服务器。
写流程详细:
- Client 主张文件读取恳求经过RPC与NameNode树立通讯,nameNode查看文件方位,来承认恳求文件 block 地点的方位
- NameNode会视状况回来文件的部分或许悉数block列表,关于每个block,NameNode 都会回来含有该 block 副本的 DataNode 地址;
- 这些回来的 DN 地址,会依照集群拓扑结构得出 DataNode 与客户端的间隔,然后进行排序,排序两个规则:网络拓扑结构中心隔 Client 近的排靠前;心跳机制中超时报告的 DN 状况为 STALE,这样的排靠后;
- Client 选取排序靠前的 DataNode 来读取 block,假定客户端自身便是DataNode,那么将从本地直接获取数据
- 底层上实质是树立 Socket Stream(FSDataInputStream),重复的调用父类 DataInputStream 的 read 办法,直到这个块上的数据读取完毕;
- 当读完列表的 block 后,若文件读取还没有完毕,客户端会继续向NameNode 获取下一批的 block 列表;
- 读取完一个 block 都会进行 checksum 验证,假定读取 DataNode 时呈现错误,客户端会告诉 NameNode,然后再从下一个具有该 block 副本的DataNode 继续读。
- Read 办法是并行的读取 block 信息,不是一块一块的读取;NameNode 只是回来Client恳求包含块的DataNode地址,并不是回来恳求块的数据;终究读取来一切的 block 会兼并成一个完整的终究文件。
hdfs-site.xml的3个首要特色是?
- dfs.name.dir→决议的是元数据存储的路径和DFS的存储办法(磁盘或远端)
- dfs.data.dir→决议的是数据存储的路径
- fs.checkpoint.dir→用于 SecondaryNameNode
NN和2NN作业机制

\1. 第一阶段:NameNode发动
- 第一次发动NameNode格局化后,创立Fsimage和Edits文件。假定不是第一次发动,直接加载修正日志和镜像文件到内存。
- 客户端对元数据进行增删改的恳求。
- NameNode记载操作日志,更新翻滚日志。
- NameNode在内存中对数据进行增删改。
\2. 第二阶段:Secondary NameNode作业
- Secondary NameNode问询NameNode是否需求CheckPoint。直接带回NameNode是否查看成果。
- Secondary NameNode恳求履行CheckPoint。
- NameNode翻滚正在写的Edits日志。
- 将翻滚前的修正日志和镜像文件复制到Secondary NameNode。
- Secondary NameNode加载修正日志和镜像文件到内存,并兼并。
- 生成新的镜像文件fsimage.chkpoint。
- 复制fsimage.chkpoint到NameNode。
- NameNode将fsimage.chkpoint从头命名成fsimage。
NN和2NN作业机制详解:
Fsimage:NameNode内存中元数据序列化后构成的文件。
Edits:记载客户端更新元数据信息的每一步操作(可经过Edits运算出元数据)。
NameNode发动时,先翻滚Edits并生成一个空的edits.inprogress,然后加载Edits和Fsimage到内存中,此刻NameNode内存就持有最新的元数据信息。Client开端对NameNode发送元数据的增删改的恳求,这些恳求的操作首要会被记载到edits.inprogress中(查询元数据的操作不会被记载在Edits中,由于查询操作不会更改元数据信息),假定此刻NameNode挂掉,重启后会从Edits中读取元数据的信息。然后,NameNode会在内存中履行元数据的增删改的操作。
由于Edits中记载的操作会越来越多,Edits文件会越来越大,导致NameNode在发动加载Edits时会很慢,所以需求对Edits和Fsimage进行兼并(所谓兼并,便是将Edits和Fsimage加载到内存中,照着Edits中的操作一步步履行,终究构成新的Fsimage)。SecondaryNameNode的效果便是帮助NameNode进行Edits和Fsimage的兼并作业。
SecondaryNameNode首要会问询NameNode是否需求CheckPoint(触发CheckPoint需求满足两个条件中的恣意一个,定时时刻到和Edits中数据写满了)。直接带回NameNode是否查看成果。SecondaryNameNode履行CheckPoint操作,首要会让NameNode翻滚Edits并生成一个空的edits.inprogress,翻滚Edits的目的是给Edits打个符号,今后一切新的操作都写入edits.inprogress,其他未兼并的Edits和Fsimage会复制到SecondaryNameNode的本地,然后将复制的Edits和Fsimage加载到内存中进行兼并,生成fsimage.chkpoint,然后将fsimage.chkpoint复制给NameNode,重命名为Fsimage后替换掉原来的Fsimage。NameNode在发动时就只需求加载之前未兼并的Edits和Fsimage即可,由于兼并过的Edits中的元数据信息现已被记载在Fsimage中。
二、Mapreduce
1.MapReduce作业流程





Application运转流程:
mr程序最早发动MRAppMaster(AM),AM发动后依据本次application的信息,核算出需求的maptask实例数量,然后向RM恳求机器发动相应数量的maptask进程,RM经过心跳感知现在集群container(容器)的作业繁忙状况,分配相应的container资源,相应containers的nodemanagr在各自节点上发动container。
2.Mapreduce详细流程总结:
1、inputformat:MR结构根底类之一,包含**数据切开(Data Splits)和记载读取器(Record Reader)**两部分。每个split包含后一个Block中开端部分的数据能够处理记载跨Block问题,每读取一条记载,调用一次map函数。
2、Map:每一个切片对应一个map,map输出的数据,放入环形溢写缓冲区,缓冲区默许100M,到达80M进行溢写,写入到本地文件。
3、Shuffle:shuffle是MapReduce核算结构的中心,包含了Partion, Sort, Spill, Meger, Combiner, Copy, Memery, Disk等分组动作;
3.1、partition对map的内容依据kv对进行分区
3.2、sort(快速排序),溢写到磁盘
3.3、数据兼并combiner(①削减数据写入磁盘的数据量 ② 削减网络传输的数据量 , 数据压缩)
AM监控到一切maptask进程使命完结之后,会依据客户指定的参数发动相应数量的reducetask进程,并告知reducetask进程要处理的数据规划(数据分区)。Reducetask进程发动之后,依据MRAppMaster告知的待处理数据地点方位,从若干台maptask运转地点机器上获取到若干个maptask输出成果文件,并在本地进行从头归并排序,然后依照相同key的KV为一个组,调用客户界说的reduce()办法进行逻辑运算,并搜集运算输出的成果KV,然后调用客户指定的outputformat将成果数据输出到外部存储。
3.4、fetch (经过RM,reduce找到指定的map自动fetch数据)
3.5、溢写,排序(归并排序)
3.6、merger(数据兼并①削减数据量 ② 进步履行功率)
4、reduce(汇总,聚合的进程)
5、output(hdfs)
3.Shuffle进程中触及两次排序:
1.快速排序:
sort阶段,环形缓冲区到达80%时,对数据进行快速排序,排序依照key的索引进行字典次序排序,然后开端进行溢写,从内存缓冲区不断溢出本地磁盘文件,或许会溢出多个文件 。
(1)算法进程
1.从数列中挑出一个元素,称为 “基准”(pivot);
2.从头排序数列,一切元素比基准值小的摆放在基准前面,一切元素比基准值大的摆在基准的后边(相同的数能够到任一边)。
在这个分区退出之后,该基准就处于数列的中心方位。这个称为分区(partition)操作;
\3. 递归地(recursive)把小于基准值元素的子数列和大于基准值元素的子数列排序;
2.归并排序
在小的文件merge成大文件时选用,归并排序在map端和reduce端都或许呈现。
算法进程:
1.恳求空间,使其巨细为两个现已排序序列之和,该空间用来寄存兼并后的序列;
2.设定两个指针,开端方位别离为两个现已排序序列的开端方位;
3.比较两个指针所指向的元素,挑选相对小的元素放入到兼并空间,并移动指针到下一方位;
4.重复进程 3 直到某一指针到达序列尾;
5.将另一序列剩下的一切元素直接仿制到兼并序列尾。
MR流程中触及的快排、归并排序,可参阅以下文章
图文详解—十大经典排序算法_珞沫的博客-CSDN博客_排序算法
三、海量数据处理实例剖析
1、海量日志数据,提取出某日拜访百度次数最多的那个IP。
处理方案:首要是将这一天,而且是拜访百度的日志中的IP取出来,逐一写入到一个大文件中。留意到IP是32位的,最多有个2^32个IP。同样能够选用映射的办法,比方模1000,把整个大文件映射为1000个小文件,再找出每个小文中呈现频率最大的IP(能够选用hash_map进行频率核算,然后再找出频率最大的几个)及相应的频率。然后再在这1000个最大的IP中,找出那个频率最大的IP,即为所求。
2、有一个1G巨细的一个文件,里边每一行是一个词,词的巨细不超越16字节,内存约束巨细是1M。回来频数最高的100个词。
处理方案:次序读文件中,关于每个词x,取hash(x)%5000,然后依照该值存到5000个小文件(记为x0,x1,…x4999)中。这样每个文件大概是200k左右。假定其间的有的文件超越了1M巨细,还能够依照相似的办法继续往下分,直到分解得到的小文件的巨细都不超越1M。 对每个小文件,核算每个文件中呈现的词以及相应的频率(能够选用trie树/hash_map等),并取出呈现频率最大的100个词(能够用含100个结点的最小堆),并把100个词及相应的频率存入文件,这样又得到了5000个文件。下一步便是把这5000个文件进行归并(相似与归并排序)的进程了。
3、有10个文件,每个文件1G,每个文件的每一行寄存的都是用户的query,每个文件的query都或许重复。要求你依照query的频度排序。
方案1: 次序读取10个文件,依照hash(query)%10的成果将query写入到别的10个文件(记为)中。这样新生成的文件每个的巨细大约也1G(假定hash函数是随机的)。 找一台内存在2G左右的机器,顺次对用hash_map(query, query_count)来核算每个query呈现的次数。运用快速/堆/归并排序依照呈现次数进行排序。将排序好的query和对应的query_cout输出到文件中。这样得到了10个排好序的文件(记为)。
对这10个文件进行归并排序(内排序与外排序相结合)。
方案2: 一般query的总量是有限的,只是重复的次数比较多而已,或许关于一切的query,一次性就能够加入到内存了。这样,咱们就能够选用trie树/hash_map等直接来核算每个query呈现的次数,然后按呈现次数做快速/堆/归并排序就能够了。
方案3: 与方案1相似,但在做完hash,分红多个文件后,能够交给多个文件来处理,选用散布式的架构来处理(比方MapReduce),终究再进行兼并。
4、 给定a、b两个文件,各寄存50亿个url,每个url各占64字节,内存约束是4G,让你找出a、b文件一同的url。
方案1:能够估计每个文件安的巨细为5G64=320G,远远大于内存约束的4G。所以不或许将其彻底加载到内存中处理。考虑采纳分而治之的办法。通读文件a,对每个url求取hash(url)%1000,然后依据所获得的值将url别离存储到1000个小文件(记为a0,a1,…,a999)中。这样每个小文件的大约为300M。通读文件b,采纳和a相同的办法将url别离存储到1000小文件(记为b0,b1,…,b999)。这样处理后,一切或许相同的url都在对应的小文件(a0vsb0,a1vsb1,…,a999vsb999)中,不对应的小文件不或许有相同的url。然后咱们只需求出1000对小文件中相同的url即可。求每对小文件中相同的url时,能够把其间一个小文件的url存储到hash_set中。然后遍历另一个小文件的每个url,看其是否在刚才构建的hash_set中,假定是,那么便是一同的url,存到文件里边就能够了。
方案2:假定答应有必定的错误率,能够运用Bloom filter,4G内存大概能够表明340亿bit。将其间一个文件中的url运用Bloom filter映射为这340亿bit,然后挨个读取别的一个文件的url,查看是否与Bloom filter,假定是,那么该url应该是一同的url(留意会有必定的错误率)。
5、腾讯面试题:给40亿个不重复的unsigned int的整数,没排过序的,然后再给一个数,怎么快速判别这个数是否在那40亿个数傍边?
方案1:恳求512M的内存,一个bit位代表一个unsigned int值。读入40亿个数,设置相应的bit位,读入要查询的数,查看相应bit位是否为1,为1表明存在,为0表明不存在。
方案2:由于2^32为40亿多,所以给定一个数或许在,也或许不在其间;这儿咱们把40亿个数中的每一个用32位的二进制来表明假定这40亿个数开端放在一个文件中。然后将这40亿个数分红两类: 1.最高位为0 2.最高位为1 并将这两类别离写入到两个文件中,其间一个文件中数的个数<=20亿,而另一个>=20亿(这相当于减半了);与要查找的数的最高位比较并接着进入相应的文件再查找再然后把这个文件为又分红两类: 1.次最高位为0 2.次最高位为1,并将这两类别离写入到两个文件中,其间一个文件中数的个数<=10亿,而另一个>=10亿(这相当于减半了); 与要查找的数的次最高位比较并接着进入相应的文件再查找。 ……. 以此类推,就能够找到了,而且时刻复杂度为O(logn),方案2完。
附:这儿,再简略介绍下,位图办法: 运用位图法判别整形数组是否存在重复 判别调集中存在重复是常见编程使命之一,当调集中数据量比较大时咱们一般希望少进行几次扫描,这时两层循环法就不行取了。位图法比较适宜于这种状况,它的做法是依照调集中最大元素max创立一个长度为max+1的新数组,然后再次扫描原数组,遇到几就给新数组的第几方位上1,如遇到5就给新数组的第六个元素置1,这样下次再遇到5想置位时发现新数组的第六个元素现已是1了,这阐明这次的数据必定和以前的数据存在着重复。这种给新数组初始化时置零其后置一的做法相似于位图的处理办法故称位图法。它的运算次数最坏的状况为2N。假定已知数组的最大值即能事先给新数组定长的话功率还能进步一倍。
四、YARN
1.YARN中心组件

1.RM(ResourceManager)资源办理器
RM是一个全局的资源办理器,担任整个体系的资源办理和分配。
它首要由两个组件构成:调度器(Scheduler)和运用程序办理器(Applications Manager,ASM),浅显讲是用于办理NodeManager节点的资源,包含cup、内存等。
2.NodeManager(NM)+ (DataNode 硬盘 CPU 内存)
NM是每个节点上的资源和使命办理器,一方面,它会定时地向RM报告本节点上的资源运用状况和各个Container的运转状况;另一方面,它接纳并处理来自AM的Container发动/中止等各种恳求。
3.Applications Manager(运用程序办理器)new ApplicationMaster 监控一切job的使命运转成果的监控 –> 客户端
担任办理整个体系中一切运用程序,包含运用程序提交、与调度器协商资源以发动ApplicationMaster、监控ApplicationMaster运转状况并在失利时从头发动它等。是AM的AM。
4.ApplicationMaster(AM)job进程监控
ApplicationMaster 办理在YARN内运转的每个运用程序实例。每个运用程序对应一个ApplicationMaster。ApplicationMaster 担任和谐来自 ResourceManager 的资源,并经过 NodeManager 监督容器的履行和资源运用(CPU、内存等的资源分配),浅显讲是办理主张的使命,跟着使命创立而创立,使命的完结而完毕。
5.Container(重要) –> 资源接口 –> map reduce mapContainer reduceContainer spark –> sparkContainer
Container是YARN中的资源笼统,它封装了NodeManager节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM恳求资源时,RM为AM回来的资源便是用Container表明的。YARN会为每个使命分配一个Container,且该使命只能运用该Container中描绘的资源。(Container的具有优先等级,包含:行列 一般用户 VIP会员, 权重高的必定是先履行)
2.YARN作业流程

归纳:
进程1:用户将运用程序提交到 ResourceManager 上;
进程2:ResourceManager 为运用程序 ApplicationMaster 恳求资源,并与某个 NodeManager 通讯发动第一个 Container,以发动ApplicationMaster;
进程3:ApplicationMaster 与 ResourceManager 注册进行通讯,为内部要履行的使命恳求资源,一旦得到资源后,将于 NodeManager 通讯,以发动对应的 Task;
进程4:一切使命运转完结后,ApplicationMaster 向 ResourceManager 刊出,整个运用程序运转完毕。
详细流程:
– client向RM提交运用程序,其间包含发动该运用的ApplicationMaster的有必要信息,例如ApplicationMaster程序、发动ApplicationMaster的指令、用户程序等
– ResourceManager发动一个NodeManager的一个container用于运转ApplicationMaster
– 发动中的ApplicationMaster向ResourceManager注册自己,发动成功后与RM坚持心跳
– ApplicationMaster向ResourceManager发送恳求,恳求相应数目的container
– 恳求成功的container,由ApplicationMaster进行初始化。container的发动信息初始化后,AM与对应的NodeManager通讯,要求NM发动container
– NM发动container
– container运转期间,ApplicationMaster对container进行监控。container经过RPC协议向对应的AM报告自己的进展和状况等信息
– 运用运转完毕后,ApplicationMaster向ResourceManager刊出自己,并答应归于它的container被收回
3.YARN默许的调度器别离是什么,及他们差异?
YARN调度器首要分为三类:
– 1、FIFO :先进先出,同一个行列中现先提交的先履行,后边等候
– 2、Capacity Scheduler(容量调度器): 答应创立多个使命行列,每个行列运用一切资源的一部分。多个使命行列可 以一同履行。可是一个行列内部仍是先进先出。
– 3、Fair Scheduler(公正调度): 第一个程序在发动时能够占用其他行列资源(100%占用),当其他行列有 使命提交时,占用资源的行列需求将资源还给该使命。还资源的时分,功率 比较慢。
五、Zookeeper
参阅我的这篇文章:Zookeeper_GoAl的博客-CSDN博客
六、Hive
Hive:依据Hadoop一个数据库房东西,能够将结构化的数据文件映射为一张数据库表,并供给简略的 sql 查询功用,能够将 sql 句子转化为 MapReduce 使命进行运转。适宜数据库房的核算剖析。
1、Hive架构

2、三大组件
1、用户接口:包含 CLI、JDBC/ODBC、WebGUI。
- CLI(command line interface)为 shell 指令行,进行交互式履行SQL:直接与Driver进行交互。CLI发动的时分,会一同发动一个 Hive 副本
- JDBC/ODBC 驱动是 Hive 的 JAVA 完结,作为JAVA的API:JDBC是经过Thift Server来接入,然后发送给Driver
- WebGUI 是经过浏览器拜访 Hive。
- HiveServer2依据Thrift, 答应长途客户端运用多种编程言语如Java、Python向Hive提交恳求
2、Metastore:存储元数据
- Hive 将元数据存储在数据库中,如MySQL、derby
- Hive 中的元数据包含表的姓名、表的列、分区及其特色、表的特色(是否为外部表等)、表的数据地点目录等。
3、Driver(驱动模块):包含解说器、编译器、优化器、履行器
- 经过该模块对输入进行解析编译,对需求的核算进行优化,然后依照指定的进程进行(一般发动多个MR使命来履行)
- 解说器、编译器、优化器、履行器完结 HQL 查询句子从词法剖析、语法剖析、编译、优化以及查询方案的生成。生成的查询方案存储在 HDFS 中,并在随后由 MapReduce 调用履行
3.Hive 内部表和外部表差异?:
(1)是否直接经过external
(2)删去外部表,元数据得到删去,可是数据不会真实删去,针对内部表,元数据和数据都被删去
(3)在导入数据到外部表,数据并没有移动到自己的数据库房目录下,也便是说外部表中的数据并不是由它自己来办理的!而内部表则不相同
留意:内部表和外部表场景:
内部表:逻辑处理的中心进程生成的中心表,或许一些暂时表,直接删去即可
外部表:能够用户存储一些日志信息,数据不会被删去
4.Hive与联络型数据库差异:
Hive表明纯逻辑,只要表界说,不存数据。读多写少,不支撑数据的改写和删去

5.order by ,sort by , distribute by , cluster by 的差异?
1.Order by会对所给的悉数数据进行全局排序,只发动一个reduce来处理。
Sort by是部分排序,它能够依据数据量的巨细发动一到多个reducer来作业,并在每个reduce中独自排序。
3.Distribute by 相似于mr中的partition,选用hash算法,在map端将查询成果中hash值相同的成果分发到对应的reduce中,结合sort by运用。
4.Cluster by 能够看作是distribute by 和sort by的结合,当两者后边所跟的字段列名相一同,效果就等同于运用cluster by,可是cluster by终究的成果只能是降序,无法指定升序和降序。
注:不带count,sum这些聚合函数的,都不会走mapreduce。
6.Hive最全函数介绍:
UDF:一般函数:1对1的联络,select句子,例如:数据格局
UDAF:聚合函数:多对1的联络,结合group by联合运用
UDTF:生成函数:1对多
用 UDF 函数解析公共字段;用 UDTF 函数解析事情字段。 自界说 UDF:承继 UDF,重写 evaluate 办法 自界说 UDTF:承继自 GenericUDTF,重写 3 个办法:initialize(自界说输出的列名和类型), process(将成果回来 forward(result),close 。 为什么要自界说 UDF/UDTF,由于自界说函数,能够自己埋点 Log 打印日志,出错或许数据异常,便利调试。
Hive窗口函数:
Hive指令行窗口查看函数界说 :desc function 函数名;
HIve窗口函数实战可参阅:Hive开窗函数总结_Abysscarry的博客-CSDN博客_hive的开窗函数
Hive常用日期函数
unix_timestamp:回来当时或指定时刻的时刻戳
from_unixtime:将时刻戳转为日期格局
current_date:当时日期
current_timestamp:当时的日期加时刻
to_date:抽取日期部分
year:获取年
month:获取月
day:获取日
hour:获取时
minute:获取分
second:获取秒
weekofyear:当时时刻是一年中的第几周
dayofmonth:当时时刻是一个月中的第几天
months_between: 两个日期间的月份
add_months:日期加减月
datediff:两个日期相差的天数
date_add:日期加天数
date_sub:日期减天数
last_day:日期的当月的终究一天
日期处理函数实例
1)date_format函数(依据格局整理日期)
hive (gmall)> select date_format(‘2019-02-10′,’yyyy-MM’);
2019-02
2)date_add函数(加减日期)
hive (gmall)> select date_add(‘2019-02-10’,1);
2019-02-11
hive (gmall)> select date_add(‘2019-02-10’,-1);
2019-02-09
3)next_day函数
(1)取当时天的下一个周一
hive (gmall)> select next_day(‘2019-02-12′,’MO’);
2019-02-18
阐明:星期一到星期日的英文(Monday,Tuesday、Wednesday、Thursday、Friday、Saturday、Sunday)
(2)取当时周的周一
hive (gmall)> select date_add(next_day(‘2019-02-12′,’MO’),-7);
2019-02-11
4)last_day函数(求当月终究一天日期)
hive (gmall)> select last_day(‘2019-02-10’);
2019-02-28
常用取整函数
round: 四舍五入 ceil: 向上取整 floor: 向下取整
常用字符串操作函数 upper: 转大写 lower: 转小写 length: 长度 trim: 前后去空格 lpad: 向左补齐,到指定长度 rpad: 向右补齐,到指定长度 regexp_replace: SELECT regexp_replace(‘100-200’, ‘(\d+)’, ‘num’) ; 运用正则表达式匹配方针字符串,匹配成功后替换!
调集操作 size: 调集中元素的个数 map_keys: 回来map中的key map_values: 回来map中的value array_contains: 判别array中是否包含某个元素 sort_array: 将array中的元素排序
7.Hive的数据办理:
1)内表外表(2)Partition辅佐查询,缩小查询规划,加速数据检索速度(3)Bucket 操控reduce数量
hive分区
参阅我的这篇文章:CSDN
hive分桶:
分桶是将整个数据内容依照某列特色值去hash值进行差异,对获得的hash再做模运算(columnValue.hashCode % 桶数),具有相同成果的数据进入同一个文件中。
8.Hive实战数据剖析
参阅我的这两篇文章:Hive实战_GoAl的博客-CSDN博客
七、Sqoop实战
1 Mysql数据导入HDFS上.
1. 全量导入:
将mysql表中悉数数据都导入HDFS,假定HDFS中存在这个目录的话就会报错,默许存储的HDFS目录是 /user/root/XXX.
bin/sqoop import (在sqoop的装置目录内,import表名是导入)
–connect jdbc:mysql://192.168.52.130:3306/userdb (衔接:协议:数据库类型://ip地址:端口号/数据库)
–username root (用户名 root)
–password 123456 (暗码 123456)
–table emp (表 emp)
–m 1 (–num-mappers:运用几个mapper,写1就能够)
若要导入到HDFS指定目录下,并指定字段之间的分隔符:
运用参数 –target-dir 来指定导出目的地,
运用参数 –delete-target-dir 来判别导出目录是否存在,假定存在就删掉.
运用参数 –fields-terminated-by ‘\t’ 运用”\t”来切开字段,sqoop默许是运用’,’逗号进行切开的.
bin/sqoop import (在sqoop的装置目录内,import表名是导入)
–connect jdbc:mysql://192.168.52.130:3306/userdb (衔接:协议:数据库类型://ip地址:端口号/数据库)
–username root (用户名 root)
–password 123456 (暗码 123456)
–table emp (表 emp)
–delete-target-dir (假定指定目录存在就删去它)
–target-dir /sqoop/emp (导入到指定目录)
–fields-terminated-by ‘\t’ (指定字段切开符为\t)
–m 1 (–num-mappers:运用几个mapper,写1就能够)
2.增量导入:
将数据库中某一字段,增加的导入,在HDFS上独自构成一个文件.
留意:增量导入的时分,必定不能加参数–delete-target-dir不然会报错
bin/sqoop import
–connect jdbc:mysql://192.168.52.130:3306/myhive
–username root
–password 123456
–table emp
–incremental append (表明增量导入)
–check-column id (查看哪个字段,这儿查看的是mysql数据库表中的id字段)
–last-value 4 (id字段终究一个id是4,那增量导入的话便是从id=5开端往后导入)
–m 1
3.减量导入:
设置where条件,经过条件能够判别削减的数据或增加的数据,操控愈加灵敏一些,例如能够经过表创立时刻来判别数据是哪一天生成的等,每个表均设置3个字段,create_time(表创立时刻),update_time(表更新时刻),is_delete(是否删去)
留意:where条件的当地需求运用“”双引号引起来,不然where条件失效
bin/sqoop import \
–connect jdbc:mysql://192.168.52.130:3306/userdb \
–username root \
–password admin \
–table emp \
–incremental append \
–where “create_time > ‘2019-02-14 00:00:00′ and is_delete=’1’ and create_time < ‘2019-02-14 23:59:59′” \
–target-dir /sqoop/incement \
–check-column id \
–m 1
**4.SQL*****句子查找导入HDFS***
咱们还能够经过 –query参数来指定咱们的sql句子,经过sql句子来过滤咱们的数据进行导入
bin/sqoop import
–connect jdbc:mysql://192.168.52.130:3306/userdb
–username root
–password 123456
–delete-target-dir
–query ‘select phno from emp_conn where 1=1 and $CONDITIONS’
–target-dir /sqoop/emp_conn
–m 1
留意事项:
运用sql句子来进行查找是不能加参数–table
而且有必要要增加where条件,
而且where条件后边有必要带一个$CONDITIONS 这个字符串,
而且这个sql句子有必要用单引号,不能用双引号.
2. Mysql数据导入Hive上.
1.将咱们mysql表傍边的数据直接导入到hive表中的话,需求将hive的一个叫做hive-exec-1.1.0-cdh5.14.0.jar的jar包复制到sqoop的lib目录下
cp /export/servers/hive-1.1.0-cdh5.14.0/lib/hive-exec-1.1.0-cdh5.14.0.jar /export/servers/sqoop-1.4.6-cdh5.14.0/lib/
2.将咱们mysql傍边的数据导入到hive表傍边来,需求准备hive数据库与表
hive (default)> create database sqooptohive;
hive (default)> use sqooptohive;
hive (sqooptohive)> create external table emp_hive(id int,name string,deg string,salary int ,dept string) row format delimited fields terminated by ‘\t’;
3.导入句子
bin/sqoop import
–connect jdbc:mysql://192.168.52.130:3306/userdb
–username root
–password 123456
–table emp
–fields-terminated-by ‘\t’ (字段之间的分隔符)
–hive-import (将数据从mysql数据库中导入到hive表中)
–hive-table qooptohive.emp_hive (后边接要创立的hive表,数据库中的某个表,运用”.”衔接)
–hive-overwrite (覆盖掉在hive表中现已存在的数据)
–delete-target-dir
–m 1
留意:咱们还能够导入联络表到hive并自动创立hive表,导入
bin/sqoop import
–connect jdbc:mysql://192.168.52.130:3306/userdb
–username root
–password 123456
–table emp_conn
–hive-import
–hive-database sqooptohive (–hive-database 后边直接接数据库名)
–m 1
3.Sqoop的数据导出
*将数据从HDFS**把文件导出到mysql数据库,*导出前,方针表有必要存在于方针数据库中。
数据是在HDFS傍边的如下目录/sqoop/emp,数据内容如下
1201,gopal,manager,50000,TP,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1
1202,manisha,Proof reader,50000,TP,2018-06-15 18:54:32.0,2018-06-17 20:26:08.0,1
1203,khalil,php dev,30000,AC,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1
1204,prasanth,php dev,30000,AC,2018-06-17 18:54:32.0,2018-06-17 21:05:52.0,0
1205,kranthi,admin,20000,TP,2018-06-17 18:54:32.0,2018-06-17 18:54:32.0,1
1.创立mysql表
CREATE TABLE
emp_out
(
id
INT(11) DEFAULT NULL,
name
VARCHAR(100) DEFAULT NULL,
deg
VARCHAR(100) DEFAULT NULL,
salary
INT(11) DEFAULT NULL,
dept
VARCHAR(10) DEFAULT NULL,
create_time
TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
update_time
TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_delete
BIGINT(20) DEFAULT ‘1’) ENGINE=INNODB DEFAULT CHARSET=utf8;
2.履行导出指令:经过export来完结数据的导出,将hdfs的数据导出到mysql傍边去
bin/sqoop export
–connect jdbc:mysql://192.168.52.130:3306/userdb
–username root
–password 123456
–table emp_out
–export-dir /sqoop/emp
–input-fields-terminated-by “,”
3.验证mysql表数据

八、Hbase–散布式列存储NOSQL数据库
1、Hbase数据存储在hdfs,少量存内存
2、hbase适宜海量稀少数据存储
hbase归于nosql数据库,列存储
3、与传统联络型数据库对比:
行存储:传统联络型数据mysql、oracle
长处:确保数据完整性,写入查看
缺点:读的进程会产生冗余信息
列存储:Nosql数据库
长处:读的进程不会产生冗余
缺点:写入功率差,不确保完整性
4、Hbase长处:
(1)存储海量数据
(2)快速随机拜访
(3)进行很多的改写操作
Hbase的长处及运用场景:
- 半结构化或非结构化数据:
关于数据结构字段不够承认或乱七八糟十分难按一个概念去进行抽取的数据适宜用HBase,由于HBase支撑动态增加列。
- 记载很稀少:
RDBMS的行有多少列是固定的。为null的列浪费了存储空间。HBase为null的Column不会被存储,这样既节约了空间又进步了读功用。
- 多版别号数据:
依据Row key和Column key定位到的Value能够有随意数量的版别号值,因此关于需求存储变动历史记载的数据,用HBase是很便利的。比方某个用户的Address变更,用户的Address变更记载也许也是具有研讨含义的。
- 仅要求终究一致性:
关于数据存储事务的要求不像金融行业和财务体系这么高,只需确保终究一致性就行。(比方HBase+elasticsearch时,或许呈现数据不一致)
- 高可用和海量数据以及很大的瞬间写入量:
WAL处理高可用,支撑PB级数据,put功用高
- 适用于刺进比查询操作更频频的状况。比方,关于历史记载表和日志文件。(HBase的写操作愈加高效)
- 事务场景简略:
不需求太多的联络型数据库特性,列入穿插列,穿插表,事务,衔接等。
Hbase的缺点:
- 单一RowKey固有的局限性决议了它不或许有效地支撑多条件查询[2]
- 不适宜于大规划扫描查询
- 不直接支撑 SQL 的句子查询
5、Hbase结构:rowkey -> Column Family -> Column Qualifer列族详细列
rowkey行键
table的主键,table中的记载依照rowkey 的字典序进行排序
Column Family列族
hbase表中的每个列,都归属与某个列族。列族是表的schema的一部分(而列不是),有必要在运用表之前界说。
Timestamp时刻戳
每次数据操作对应的时刻戳,能够看作是数据的version number版别号
Column列
列族下面的详细列
归于某一个ColumnFamily,相似于咱们mysql傍边创立的详细的列
cell单元格
由{row key, column( = + ), version} 唯一承认的单元
cell中的数据是没有类型的,悉数是以字节数组进行存储
6、Hbase逻辑模型:三维有序
Rowkey -> Column Family -> Column Qualifier -> Timestamp
rowkey行(正序, 从小到大)、column列(正序从小到大)、timestamp时刻(倒叙从大到小)
面试点:为什么说hbase表的列族不宜超越3个?
a、列族数量决议store, 一个store至少有一个memstore,而memstore占内存
b、假定列族越多的话,构成更多的flush会产生更多IO
flush的最小单位是region, 一个region中的某个列族做flush , 其他的列族也会做flush
频频的flush产生更多的storeFile,storeFile增多就会产生更多compaction操作
compaction操作和flush都是重IO操作
c、列族过多,split操作会呈现数据不均匀的状况
散列准则:
条件:服务器的装备不是很好而且对查询速度要求不是很高
rowkey规划为:random+时刻
目的:防止某一个或某几个regionserver成为热门
有序准则:
条件:服务器自身的装备要高一些, 会呈现一个或是多个region热门效应
rowkey规划为:时刻+random
Hbase shell 根底
list_namespace 查看一切数据,相似于show database;
scan ‘hbase:meta’ 查看元数据信息
–创立表 ‘cf1′,’cf2’ 表明列族
create ‘badou_20_a’,’cf1′,’cf2′
— 查看表的结构
describe ‘badou_20_a’
— 删去cf1列族
alter ‘badou_20_a’,{NAME=>’cf1′,METHOD=>’delete’}
— 查看存在哪些表
list
exists ‘badou_20_a’
— 保存两个版别的数据, IN_MEMORY数据保存到内存中
alter ‘badou_20_a’,{NAME=>’cf2′,VERSIONS=>2,IN_MEMORY=>true}
— 删去表
disable ‘badou_20_a’ : 将表转化为去激活的状况
drop ‘badou_20_a’ : 删去表
— 激活表
enable ‘badou_20_a’
— 刺进记载
put ‘badou_20′,’1003′,’cf2:name’,’root’
put ‘badou_20′,’1004′,’cf2:name’,’scott’
— 获取记载
scan ‘badou_20’ 留意 hbase表的数据量特别大的时分, scan 慎用
— 依据rowkey 查询
get ‘badou_20′,’1001’
— 依据列族获取
get ‘badou_20′,’1001′,{COLUMN=>’cf2:name’}
— 依据列族和指定的时刻戳进行获取
get ‘badou_20′,’1001′,{COLUMN=>’cf2:name’,TIMESTAMP=>1615465406738}
— 查询表的记载
count ‘badou_20’
— 强制刷出内存的数据到HDFS
flush ‘badou_20’
— 清除表的数据,保存表的结构
truncate ‘order’
9、hbase shell 进阶
— 修正 badou_20 版别为2
put ‘badou_20′,’1001′,’cf2:name’,’max’
put ‘badou_20′,’1001′,’cf2:name’,’avg’
alter ‘badou_20′,{NAME=>’cf2’,VERSIONS=>2}
怎么显示两个版别?
scan ‘badou_20’,{VERSIONS=>2}
get ‘badou_20′,’1001′,{COLUMN=>’cf2:name’,VERSIONS=>2}
get ‘badou_20′,’1001′,{COLUMN=>’cf2’,VERSIONS=>2}
— 修正表的版别
alter ‘badou_20′,{NAME=>’cf2’,VERSIONS=>3}
alter ‘badou_20′,{NAME=>’cf2’,VERSIONS=>4}
— TTL 依照规则的时刻对数据进行超时刻设置
TTL => ‘FOREVER’ 表明数据永不过期
TTL => ’60 SECONDS 表明一分钟之前的数据会过期
create ‘tt_table’,{NAME=>’cf1′,TTL=>60}
1616311193758
put ‘tt_table’,’rowkey001′,’cf1:age’,’30’,1616311993900
九、Flume实战
*1、搜集目录到HDFS*
搜集需求:某服务器的某特定目录下,会不断产生新的文件,每逢有新文件呈现,就需求把文件搜集到HDFS中去
依据需求,首要界说以下3大要素
搜集源,即source——监控文件目录 : spooldir
下沉方针,即sink——HDFS文件体系 : hdfs sink
source和sink之间的传递通道——channel,可用file channel 也能够用内存channel
装备文件编写:
#界说三大组件的称号
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# 装备source组件
agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /home/hadoop/logs/
agent1.sources.source1.fileHeader = false
#装备拦截器
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
# 装备sink组件
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
#agent1.sinks.sink1.hdfs.round = true
#agent1.sinks.sink1.hdfs.roundValue = 10
#agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
Channel参数解说:
capacity:默许该通道中最大的能够存储的event数量
trasactionCapacity:每次最大能够从source中拿到或许送到sink中的event数量
keep-alive:event增加到通道中或许移出的答应时刻
*2、搜集文件到HDFS*
搜集需求:比方事务体系运用log4j生成的日志,日志内容不断增加,需求把追加到日志文件中的数据实时搜集到hdfs
依据需求,首要界说以下3大要素
搜集源,即source——监控文件内容更新 : exec ‘tail -F file’
下沉方针,即sink——HDFS文件体系 : hdfs sink
Source和sink之间的传递通道——channel,可用file channel 也能够用 内存channel
装备文件编写:
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log
agent1.sources.source1.channels = channel1
#configure host for source
agent1.sources.source1.interceptors = i1
agent1.sources.source1.interceptors.i1.type = host
agent1.sources.source1.interceptors.i1.hostHeader = hostname
# Describe sink1
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.rollSize = 102400
agent1.sinks.sink1.hdfs.rollCount = 1000000
agent1.sinks.sink1.hdfs.rollInterval = 60
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
更多source和sink组件:
Flume支撑很多的source和sink类型,详细手册可参阅官方文档
Flume 1.9.0 User Guide — Apache Flume
十、Kafka介绍
1.Apache Kafka简介
Kakfa开端由Linkedin公司开发,运用 Scala 编写,具有高吞吐、可耐久化、可水平扩展的依据发布/订阅办法的散布式音讯行列,支撑分区战略、多副本战略,依据zookeeper和谐的散布式音讯体系,首要运用于大数据的实时或离线数据处理、日志搜集以及实时指标监控等范畴。

2.Kafka常用术语
**音讯:**message,音讯是kafka的根本数据单元,代表着一条一条的数据,为了进步网络和存储的运用率,出产者会批量发送音讯到Kafka,并在发送之前对音讯进行压缩。
主题:topic,主题是kafka对音讯的分类,是一个逻辑概念,能够看作音讯调集,用于接纳不同事务的音讯。
**分区:**partition,相似数据库的分区表,一般topic下会多个分区,每个分区内的数据是有序的,同一个topic的多个分区kafka不确保音讯的次序性,一个分区在逻辑上对应一个Log,对应磁盘上的一个文件夹。
**偏移量:**offset,偏移量是表明音讯在分区中的方位,kafka存储的文件是依照offset.log的格局来命名的,便于快速查找。

**副本:**replicas,副本是针对分区而言的,kafka对音讯做了冗余备份,目的便是为了容灾,每个分区能够指定多个副原本冗余数据,分为leader partition和follower partition,只要leader partition对外供给服务,follower partition单纯是从leader partition同步数据,因此会存在多份相同的数据。

出产者:producer,出产者是kafka集群的上游,望文生义便是往kafka输入数据的客户端。
顾客:comsumer,顾客是kafka集群的下流,与出产者相辅相成,kafka相似一个库房,出产者担任出产音讯往库房放,自然得有顾客从库房里拿音讯,不然库房简略爆满。
顾客组:Comsumer Group,简称CG,这个比较简略了解,便是将多个顾客绑缚起来,组团消费音讯,一个Consumer只能归于一个Consumer Group,Kafka还经过Consumer Group完结了顾客的水平扩展和毛病搬运。

节点:broker,一个broker便是一个kafka server实例,多个broker组成kafka集群,首要用于接纳出产者发送过来的音讯,写入磁盘,一同能够接纳顾客和其他broker的恳求。
从头负载均衡:rebalance,当顾客组的顾客实例呈现改动时,例如新增顾客或许削减顾客,都会触发kafka的Rebalance机制,这个进程比较耗功用,要尽量防止这个进程被触发。
Kafka架构
咱们把架构分主从架构和对等架构,主从架构便是分为办理节点和作业节点,责任不同,如HDFS 、spark、flink;对等架构则不差异节点特色,一切的实例责任都是相同的,kafka的架构有点相似于对等架构,但又不彻底是。Kafka的规划理念之一便是一同供给离线处理和实时处理。

Kafka ACK音讯承认机制有三个值,别离为1,0和-1,默许是1,对应不同的状况:
- ack=1,意味着producer要等候leader成功收到数据并得到承认,才发送下一条message,安全性较高可是功用相对较低。
- ack=0,意思便是说,我只管发送音讯,不必你给我回复,成就成,不成我也不论,这种战略的功用是最高的,可是简略丢掉数据。
- ack=-1,这种状况下,出产者只要收到一切副本写入成功的告诉后,才会以为音讯写入成功,安全性最高,可是功用是三者里边最差的。
kafka分区和副本机制
kafka分区机制:
完结kafka高吞吐量的重要手法,完结流量分发和负载均衡,试想一下,假定一切的音讯都往同一个数据写,关于服务器来说会构成极高的负载,特别是呈现热门数据的时分简略崩溃,关于多个出产者和多个顾客来说,只要一个分区能够用于出产和消费,这显然是十分受限的。
kafka供给了三种分区战略:
- 轮询战略:Round-robin,轮询战略是Kafka默许的分区战略,依据主题分区数量自始至终进行轮询,目的便是为了将音讯均匀地散布在分区中,确保音讯最大极限地被平均分配到一切分区上。
- 随机战略:Range Strategy,所谓随机便是咱们随意地将音讯放置到恣意一个分区上,随机分发,这样有或许会构成音讯分发不均匀,比较之下,轮询战略显得愈加合理,旧版别默许是用随机战略,新版别默许用的是轮询战略。
- 按key分发战略:望文生义便是依据音讯的key指定分区写入,这种办法主观性比较强,相对比较灵敏。
除此之外,kafka支撑自界说分区器,完结更多复杂的逻辑处理音讯。
kafka副本机制:
为了供给数据冗余、数据备份的安全战略,等同于备份,实践上,根本一切的散布式音讯行列都会存在副本机制,不光是音讯行列,HDFS也是如此。
前面在kafka常用术语中说到,Kafka 是有主题概念的,主题下区分红若干个分区,副本是分区的逻辑概念,分区能够指定多个副本。实质上副本便是一个只能不断追加的日志文件,在实践的出产中,为了保障数据安全,一般会装备多个副本,依据算法涣散在不同的broker上,一份数据(leader和副本)不会一同呈现在一台服务器上,这样当服务器呈现毛病时,能够最大程度确保数据不丢掉,如下图。

其间leader partition和 follower partition的作业原理如下,正常状况下,只要leader partition对外供给服务,follower partition担任从leader partition拉取数据,当leader发送毛病时,follower具有被选举为新leader的权力。

3.Kafka的优势
- 支撑数据离线和实时处理
- 能确保音讯的可靠性传递
- 支撑音讯的耐久化存储,并经过多副本散布式的存储方案来确保音讯的容错
- 高吞吐率,每秒处理百万级的音讯量
- 高并发,支撑数千个客户端一同读写
- 支撑在线水平扩展
kafka为什么能完结读写都这么快呢?
答:离不开kafka次序读写机制和零复制数据传输,削减了寻址的时刻耗费,降低了读写推迟,一同有利于快到定位音讯偏移量,零复制机制能够进步数据传输的功率,削减IO资源的占用。
十一、Spark
1.Spark介绍
1.1 什么是spark
- 依据内存的散布式核算结构
- 只担任算 不担任存
- spark 在离线核算 功用上 相似于mapreduce的效果
1.2 为什么用spark
- MapReduce的缺点
- 运转速度慢 (没有充分运用内存)
- 接口比较简略,仅支撑Map Reduce
- 功用比较单一 只能做离线核算
- 不适宜迭代核算(如机器学习、图核算等等),交互式处理(数据挖掘)
- 不适宜流式处理(点击日志剖析)
- 需求一种灵敏的结构可一同进行批处理、流式核算、交互式核算
- 内存核算引擎,供给cache机制来支撑需求重复迭代核算或许屡次数据同享,削减数据读取的IO开支
- DAG引擎,削减屡次核算之间中心成果写到HDFS的开支
- 运用多线程模型来削减task发动开支,shuffle进程中防止不必要的sort操作以及削减磁盘IO
- spark的缺点是:吃内存,不太稳定
- Spark优势
- 速度快(比mapreduce在内存中快100倍,在磁盘中快10倍)spark中的job中心成果能够不落地,能够寄存在内存中。 mapreduce中map和reduce使命都是以进程的办法运转着,而spark中的job是以线程办法运转在进程中。
- 易用性(能够经过java/scala/python/R开发spark运用程序)
- 通用性(能够运用spark sql/spark streaming/mlib/Graphx)
- 兼容性(spark程序能够运转在standalone/yarn/mesos)
2. RDD 的概念
RDD(Resilient Distributed Dataset)*叫做*弹性散布式数据集,是Spark中最根本的数据笼统,它代表一个不行变、可分区、里边的元素可并行核算的调集.
-
Dataset:一个数据集,简略的了解为调集,用于寄存数据的
-
Distributed:它的数据是散布式存储,而且能够做散布式的核算
-
Resilient:弹性的
-
它表明的是数据能够保存在磁盘,也能够保存在内存中
-
数据散布式也是弹性的
-
弹性:并不是指他能够动态扩展,而是
容错机制
。
- RDD会在多个节点上存储,就和hdfs的散布式道理是相同的。hdfs文件被切分为多个block存储在各个节点上,而RDD是被切分为多个partition。不同的partition或许在不同的节点上
- spark读取hdfs的场景下,spark把hdfs的block读到内存就会笼统为spark的partition。
- spark核算完毕,一般会把数据做耐久化到hive,hbase,hdfs等等。咱们就拿hdfs举例,将RDD耐久化到hdfs上,RDD的每个partition就会存成一个文件,假定文件小于128M,就能够了解为一个partition对应hdfs的一个block。反之,假定大于128M,就会被且分为多个block,这样,一个partition就会对应多个block。
-
-
一切spark中对数据的操作终究都会转化成RDD的操作
- spark sql
- spark streaming
- spark ml 、spark mllib
-
RDD是不行变的
- 父RDD 生成一个子 RDD 父RDD的状况不会改动
- 从容错的视点去做这样的规划
2.1 RDD的创立
-
创立RDD之前先要有spark context
conf = SparkConf().setAppName(appName).setMaster(master) sc = SparkContext(conf=conf)
-
经过内存中的数据创立RDD
- data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
-
创立RDD时能够指定 partition的数量(RDD会分红几份)一个partition会对应一个task,依据CPU的内核数来指定partition (1核对应2~4个partition)
-
从文件创立RDD 能够是HDFS支撑的任何一种存储介质
- 能够从 hdfs、 数据库(mysql) 、本地文件体系、 hbase 这些当地加载数据创立RDD
- rdd = sc.textFile(‘file:///root/tmp/test.txt’)
2.2 RDD的三类算子
-
transformation
- 一切的transformation 都是推迟履行的,只需不调用action 不会履行,只是记载进程
- transformation 这一类算子回来值仍是 rdd
- rdd.transformation 还会得到新的rdd
- map(func) 将func函数效果到数据集的每一个元素上,生成一个新的RDD回来
- filter(func) 选出一切func回来值为true的元素,生成一个新的RDD回来
- flatMap(func) 会先履行map的操作,再将一切目标兼并为一个目标
- union() 取并集
- intersection() 交集
- groupByKey() 以元组中的第0个元素作为key,进行分组,回来一个新的RDD 成果中 value是一个Iterable
- reducebykey(func) 将key相同的键值对,依照Function进行核算
- sortbykey(ascending=True, numPartitions=None, keyfunc=)Sorts this RDD, which is assumed to consist of (key, value) pairs.依照关键词排序,排完后函数操作
-
action
- 会触发之前的rdd一切的transformation
- 获取终究的成果
- collect 一切的成果都会加载到内存中
- reduce将RDD中元素两两传递给输入函数,一同产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到终究只要一个值中止。
- fitst 第一个
- take(num) 前n个
- count()
-
persist
- 数据存储,能够存到内存,也能够是磁盘
3. spark-core 实战
概况见spark-core 实战 经过spark完结ip地址查询
4. spark集群架构

-
Application
用户自己写的Spark运用程序,批处理作业的调集。Application的main办法为运用程序的入口,用户经过Spark的API,界说了RDD和对RDD的操作。
-
Client:客户端进程,担任提交作业到Master。
-
Master(类比与ResourceManager)
- Standalone办法中主控节点,担任接纳Client提交的作业,办理Worker,并指令Worker发动Driver和Executor。
-
Worker(类比于NodeManager)
- Standalone办法中slave节点上的看护进程,担任办理本节点的资源,定时向Master报告心跳,接纳Master的指令,发动Driver和Executor。
-
Driver(类比于ApplicationMaster)
- 一个Spark作业运转时包含一个Driver进程,也是作业的主进程,担任作业的解析、生成Stage并调度Task到Executor上。包含DAGScheduler,TaskScheduler。
- DAGScheduler: 完结将Spark作业分解成一到多个Stage,每个Stage依据RDD的Partition个数决议Task的个数,然后生成相应的Task set放到TaskScheduler中。
- TaskScheduler:完结Task分配到Executor上履行。
- Stage:一个Spark作业一般包含一到多个Stage。
- Task:一个Stage包含一到多个Task,经过多个Task完结并行运转的功用。
-
Executor(类比于Container):即真实履行作业的当地,一个集群一般包含多个Executor,每个Executor接纳Driver的指令Launch Task,一个Executor能够履行一到多个Task。
Spark考点总结
参阅:大数据面试杀招——Spark高频考点,必知必会!_Alice菌的博客-CSDN博客
**一、你是怎么了解Spark,它的特色是什么?****
Spark是一个依据内存的,用于大规划数据处理(离线核算、实时核算、快速查询(交互式查询))的一致剖析引擎。
它内部的组成模块,包含SparkCore,SparkSQL,SparkStreaming,SparkMLlib,SparkGraghx等…

- 快
Spark核算速度是MapReduce核算速度的10-100倍
- 易用
MR支撑1种核算模型,Spsark支撑更多的核算模型(算法多)
- 通用
Spark 能够进行离线核算、交互式查询(快速查询)、实时核算、机器学习、图核算
- 兼容性
Spark支撑大数据中的Yarn调度,支撑mesos。能够处理hadoop核算的数据。
**二、Spark有几种布置办法,请别离简要论说****
1) Local:运转在一台机器上,一般是练手或许测验环境。
2)Standalone:构建一个依据Mster+Slaves的资源调度集群,Spark使命提交给Master运转。是Spark自身的一个调度体系。
3)Yarn: Spark客户端直接衔接Yarn,不需求额定构建Spark集群。有yarn-client和yarn-cluster两种办法,首要差异在于:Driver程序的运转节点。
4)Mesos:国内大环境比较少用。
三**、Spark提交作业的参数**
由于咱们Spark使命是选用的Shell脚本进行提交,所以必定会触及到几个重要的参数,而这个也是在面试的时分简略被调查到的“细节”。
<span><span><code class="language-shell">executor-cores —— 每个executor运用的内核数,默许为1,官方主张2-5个,咱们企业是4个
num-executors —— 发动executors的数量,默许为2
executor-memory —— executor内存巨细,默许1G
driver-cores —— driver运用内核数,默许为1
driver-memory —— driver内存巨细,默许512M
</code></span></span>
**四、简述Spark的作业提沟通程****
Spark的使命提交办法实践上有两种,别离是YarnClient办法和YarnCluster办法。咱们在答复这个问题的时分,也需求分类去介绍。千万不要被冗长的进程吓到,必定要学会总结差异,发现规律,经过图形去增强记忆。
- YarnClient 运转办法介绍

ResourceManager接到ApplicationMaster的资源恳求后会分配container,然后ApplicationMaster在资源分配指定的NodeManager上发动Executor进程,Executor进程发动后会向Driver反向注册,Executor悉数注册完结后Driver开端履行main函数,之后履行到Action算子时,触发一个job,并依据宽依靠开端区分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上履行。
- YarnCluster 办法介绍

Driver发动后向ResourceManager恳求Executor内存,ResourceManager接到ApplicationMaster的资源恳求后会分配container,然后在适宜的NodeManager上发动Executor进程,Executor进程发动后会向Driver反向注册,Executor悉数注册完结后Driver开端履行main函数,之后履行到Action算子时,触发一个job,并依据宽依靠开端区分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上履行。
**五、你是怎么了解Spark中血缘(RDD)的概念?它的效果是什么?****
- 概念
RDD是弹性散布式数据集,是Spark中最根本的数据笼统,代表一个不行变、可分区、里边的元素可并行核算 的调集。
- 效果
供给了一个笼统的数据模型,将详细的运用逻辑表达为一系列转化操作(函数)。别的不同RDD之间的转化操作之间还能够构成依靠联络,然后完结管道化,然后防止了中心成果的存储,大大降低了数据仿制、磁盘IO和序列化开支,而且还供给了更多的API(map/reduec/filter/groupBy…)
假定还想锦上添花,能够添上这一句:
RDD在Lineage依靠方面分为两种Narrow Dependencies与Wide Dependencies,用来处理数据容错时的高效性以及区分使命时分起到重要效果
**六、简述Spark的宽窄依靠,以及Spark怎么区分stage,每个stage又依据什么决议task个数?****
Spark的宽窄依靠问题是SparkCore部分的要点调查内容,大都呈现在书面考试中,咱们需求留意。
窄依靠:父RDD的一个分区只会被子RDD的一个分区依靠
宽依靠:父RDD的一个分区会被子RDD的多个分区依靠(触及到shuffle)
那Stage是怎么区分的呢?
依据RDD之间的依靠联络的不同将Job区分红不同的Stage,遇到一个宽依靠则区分一个Stage。
每个stage又依据什么决议task个数?
Stage是一个TaskSet,将Stage依据分区数区分红一个个的Task。
这儿为了便利咱们了解,贴上一张进程图

**七、列举Spark常用的transformation和action算子,有哪些算子会导致Shuffle?****
咱们在Spark开发进程中,避不开与各种算子打交道,其间Spark 算子分为transformation 和 action 算子,下面列出一些常用的算子,详细的功用还需求小伙伴们自行去了解。
- transformation
- map
- mapRartition
- flatMap
- filter
- …
- action
- reduce
- collect
- first
- take
假定面试官问你,那小伙叽,有哪些会引起Shuffle进程的Spark算子呢?
你只管自傲的答复:
- reduceByKey
- groupByKey
- …ByKey
**八、reduceByKey与groupByKey的差异,哪一种更具优势?****
既然你上面都说到 reduceByKey 和groupByKey ,那哪一种更具优势,你能简略剖析一下吗?
能问这样的问题,现已暗示面试官的水平不低了,那么咱们该怎么答复呢:
reduceByKey:依照key进行聚合,在shuffle之前有combine(预聚合)操作,回来成果是RDD[k,v]。
groupByKey:依照key进行分组,直接进行shuffle
所以,在实践开发进程中,reduceByKey比groupByKey,更主张运用。可是需求留意是否会影响事务逻辑。
**九、Repartition和Coalesce 的联络与差异,能简略说说吗?****
这道题就现已开端参和有“源码”的滋味了,为什么呢?
1)联络:
两者都是用来改动RDD的partition数量的,repartition底层调用的便是coalesce办法:coalesce(numPartitions, shuffle = true)
2)差异:
repartition必定会产生shuffle,coalesce 依据传入的参数来判别是否产生shuffle。
一般状况下增大rdd的partition数量运用repartition,削减partition数量时运用coalesce。
**十、简述下Spark中的缓存(cache和persist)与checkpoint机制,并指出两者的差异和联络****
关于Spark缓存和查看点的差异,大致能够从这3个视点去答复:
- 方位
Persist 和 Cache将数据保存在内存,Checkpoint将数据保存在HDFS
- 生命周期
Persist 和 Cache 程序完毕后会被清除或手动调用unpersist办法,Checkpoint永久存储不会被删去。
- RDD依靠联络
Persist 和 Cache,不会丢掉RDD间的依靠链/依靠联络,CheckPoint会斩断依靠链。
**十一、简述Spark中同享变量(播送变量和累加器)的根本原理与用处****
关于Spark中的播送变量和累加器的根本原理和用处,答案较为固定,咱们无需故意去记忆。
累加器(accumulator)是Spark中供给的一种散布式的变量机制,其原理相似于mapreduce,即散布式的改动,然后聚合这些改动。累加器的一个常见用处是在调试时对作业履行进程中的事情进行计数。
播送变量是在每个机器上缓存一份,不行变,只读的,相同的变量,该节点每个使命都能拜访,起到节约资源和优化的效果。它一般用来高效分发较大的目标。
**十二、当Spark触及到数据库的操作时,怎么削减Spark运转中的数据库衔接数?****
嗯,有点“调优”的滋味,感觉真实的“风暴”行将到来,这道题仍是很好答复的,咱们只需求削减衔接数据库的次数即可。
运用foreachPartition代替foreach,在foreachPartition内获取数据库的衔接。
**十三、能介绍下你所知道和运用过的Spark调优吗?****
资源参数调优
- num-executors:设置Spark作业总共要用多少个Executor进程来履行
- executor-memory:设置每个Executor进程的内存
- executor-cores:设置每个Executor进程的CPU core数量
- driver-memory:设置Driver进程的内存
- spark.default.parallelism:设置每个stage的默许task数量
- …
开发调优
- 防止创立重复的RDD
- 尽或许复用同一个RDD
- 对屡次运用的RDD进行耐久化
- 尽量防止运用shuffle类算子
- 运用map-side预聚合的shuffle操作
- 运用高功用的算子
①运用reduceByKey/aggregateByKey代替groupByKey ②运用mapPartitions代替一般map ③运用foreachPartitions代替foreach ④运用filter之后进行coalesce操作 ⑤运用repartitionAndSortWithinPartitions代替repartition与sort类操作
- 播送大变量
在算子函数中运用到外部变量时,默许状况下,Spark会将该变量仿制多个副本,经过网络传输到task中,此刻每个task都有一个变量副本。假定变量自身比较大的话(比方100M,乃至1G),那么很多的变量副本在网络中传输的功用开支,以及在各个节点的Executor中占用过多内存导致的频频GC(垃圾收回),都会极大地影响功用。
- 运用Kryo优化序列化功用
- 优化数据结构
在或许以及适宜的状况下,运用占用内存较少的数据结构,可是条件是要确保代码的可维护性。
十四、怎么运用Spark完结TopN的获取(描绘思路或运用伪代码)?
能让你运用伪代码来描绘这现已十分“苛刻”了,可是不慌,这儿供给3种思路供咱们参阅:
- 办法1:
(1)依照key对数据进行聚合(groupByKey)
(2)将value转化为数组,运用scala的sortBy或许sortWith进行排序(mapValues)
**留意:**当数据量太大时,会导致OOM
- 办法2:
(1)取出一切的key
(2)对key进行迭代,每次取出一个key运用spark的排序算子进行排序
- 办法3:
(1)自界说分区器,依照key进行分区,使不同的key进到不同的分区
(2)对每个分区运用spark的排序算子进行排序
Spark面试干货
Spark面试干货总结!(8千字长文、27个常识点、21张图)
Spark实战
参阅:Spark实践_GoAl的博客-CSDN博客_外国spark实践
十二、Flink
1、简略介绍一下 Flink
Flink 是一个结构和散布式处理引擎,用于对无界和有界数据流进行有状况核算。而且 Flink 供给了数据散布、容错机制以及资源办理等中心功用。Flink供给了诸多高笼统层的API以便用户编写散布式使命:
DataSet API, 对静态数据进行批处理操作,将静态数据笼统成散布式的数据集,用户能够便利地运用Flink供给的各种操作符对散布式数据集进行处理,支撑Java、Scala和Python。
DataStream API,对数据流进行流处理操作,将流式的数据笼统成散布式的数据流,用户能够便利地对散布式数据流进行各种操作,支撑Java和Scala。
Table API,对结构化数据进行查询操作,将结构化数据笼统成联络表,并经过类SQL的DSL对联络表进行各种查询操作,支撑Java和Scala。
此外,Flink 还针对特定的运用范畴供给了范畴库,例如: Flink ML,Flink 的机器学习库,供给了机器学习Pipelines API并完结了多种机器学习算法, Gelly,Flink 的图核算库,供给了图核算的相关API及多种图核算算法完结。
2、Flink比较传统的Spark Streaming差异?
这个问题是一个十分宏观的问题,由于两个结构的不同点十分之多。可是在面试时有十分重要的一点必定要答复出来:Flink 是规范的实时处理引擎,依据事情驱动。而 Spark Streaming 是微批(Micro-Batch)的模型 。
下面咱们就分几个方面介绍两个结构的首要差异:
- 架构模型Spark Streaming 在运转时的首要人物包含:Master、Worker、Driver、Executor,Flink 在运转时首要包含:Jobmanager、Taskmanager和Slot。
- 使命调度Spark Streaming 连续不断的生成细小的数据批次,构建有向无环图DAG,Spark Streaming 会顺次创立 DStreamGraph、JobGenerator、JobScheduler。Flink 依据用户提交的代码生成 StreamGraph,经过优化生成 JobGraph,然后提交给 JobManager进行处理,JobManager 会依据 JobGraph 生成 ExecutionGraph,ExecutionGraph 是 Flink 调度最中心的数据结构,JobManager 依据 ExecutionGraph 对 Job 进行调度。
- 时刻机制Spark Streaming 支撑的时刻机制有限,只支撑处理时刻。 Flink 支撑了流处理程序在时刻上的三个界说:处理时刻、事情时刻、注入时刻。一同也支撑 watermark 机制来处理滞后数据。
- 容错机制关于 Spark Streaming 使命,咱们能够设置 checkpoint,然后假定产生毛病并重启,咱们能够从前次 checkpoint 之处康复,可是这个行为只能使得数据不丢掉,或许会重复处理,不能做到刚好一次处理语义。Flink 则运用两阶段提交协议来处理这个问题。
3、Flink的组件栈有哪些?
依据 Flink 官网描绘,Flink 是一个分层架构的体系,每一层所包含的组件都供给了特定的笼统,用来服务于上层组件。

Runtime 层:Runtime层供给了支撑 Flink 核算的中心完结,比方:支撑散布式 Stream 处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层供给根底服务 。
API层: API 层首要完结了面向流(Stream)处理和批(Batch)处理API,其间面向流处理对应DataStream API,面向批处理对应DataSet API,后续版别,Flink有方案将DataStream和DataSet API进行一致 。
Libraries层: 该层称为Flink运用结构层,依据API层的区分,在API层之上构建的满足特定运用的完结核算结构,也别离对应于面向流处理和面向批处理两类。面向流处理支撑:CEP(复杂事情处理)、依据SQL-like的操作(依据Table的联络操作);面向批处理支撑:FlinkML(机器学习库)、Gelly(图处理)。
4、Flink 的运转有必要依靠 Hadoop组件吗?
Flink能够彻底独立于Hadoop,在不依靠Hadoop组件下运转。可是做为大数据的根底设施,Hadoop体系是任何大数据结构都绕不过去的。Flink能够集成很多Hadooop 组件,例如Yarn、Hbase、HDFS等等。例如,Flink能够和Yarn集成做资源调度,也能够读写HDFS,或许运用HDFS做查看点。
5、你们的Flink集群规划多大?
咱们留意,这个问题看起来是问你实践运用中的Flink集群规划,其实还隐藏着另一个问题:Flink能够支撑多少节点的集群规划?在答复这个问题时分,能够将自己出产环节中的集群规划、节点、内存状况阐明,一同阐明布置办法(一般是Flink on Yarn),除此之外,用户也能够一同在小集群(少于5个节点)和具有 TB 等级状况的上千个节点上运转 Flink 使命。
6、Flink的根底编程模型了解吗?

7、Flink集群有哪些人物?各自有什么效果?

其间JobManager扮演着集群中的办理者Master的人物,它是整个集群的和谐者,担任接纳Flink Job,和谐查看点,Failover 毛病康复等,一同办理Flink集群中从节点TaskManager。
TaskManager是实践担任履行核算的Worker,在其上履行Flink Job的一组Task,每个TaskManager担任办理其地点节点上的资源信息,如内存、磁盘、网络,在发动的时分将资源的状况向JobManager报告。
Client是Flink程序提交的客户端,当用户提交一个Flink程序时,会首要创立一个Client,该Client首要会对用户提交的Flink程序进行预处理,并提交到Flink集群中处理,所以Client需求从用户提交的Flink程序装备中获取JobManager的地址,并树立到JobManager的衔接,将Flink Job提交给JobManager。
8、说说 Flink 资源办理中 Task Slot 的概念

9、说说 Flink 的常用算子?
Flink 最常用的常用算子包含:Map:DataStream → DataStream
,输入一个参数产生一个参数,map的功用是对输入的参数进行转化操作。Filter
:过滤掉指定条件的数据。KeyBy
:依照指定的key进行分组。Reduce
:用来进行成果汇总兼并。Window
:窗口函数,依据某些特性将每个key的数据进行分组(例如:在5s内到达的数据)
10、说说你知道的Flink分区战略?
要搞懂什么是分区战略,需求清楚分区战略是用来决议数据怎么发送至下流。现在 Flink 支撑了8种分区战略的完结。

11、Flink的并行度了解吗?Flink的并行度设置是怎样的?
Flink中的使命被分为多个并行使命来履行,其间每个并行的实例处理一部分数据。这些并行实例的数量被称为并行度。咱们在实践出产环境中能够从四个不同层面设置并行度:
操作算子层面(Operator Level)
履行环境层面(Execution Environment Level)
客户端层面(Client Level)
体系层面(System Level)
需求留意的优先级:算子层面>环境层面>客户端层面>体系层面。
12、 Flink的Slot和parallelism有什么差异?
官网上十分经典的图:


13、Flink有没有重启战略?说说有哪几种?
Flink 完结了多种重启战略。
- 固定推迟重启战略(Fixed Delay Restart Strategy)
- 毛病率重启战略(Failure Rate Restart Strategy)
- 没有重启战略(No Restart Strategy)
- Fallback重启战略(Fallback Restart Strategy)
14、用过Flink中的散布式缓存吗?怎么运用?
Flink完结的散布式缓存和Hadoop有异曲同工之妙。目的是在本地读取文件,并把他放在 taskmanager 节点中,防止task重复拉取。
<span><span><code class="language-java">val env <span>=</span> ExecutionEnvironment<span>.</span>getExecutionEnvironment
<span>// register a file from HDFS</span>
env<span>.</span><span>registerCachedFile</span><span>(</span><span>"hdfs:///path/to/your/file"</span><span>,</span> <span>"hdfsFile"</span><span>)</span>
<span>// register a local executable file (script, executable, ...)</span>
env<span>.</span><span>registerCachedFile</span><span>(</span><span>"file:///path/to/exec/file"</span><span>,</span> <span>"localExecFile"</span><span>,</span> <span>true</span><span>)</span>
<span>// define your program and execute</span>
<span>.</span><span>.</span><span>.</span>
val input<span>:</span> DataSet<span>[</span>String<span>]</span> <span>=</span> <span>.</span><span>.</span><span>.</span>
val result<span>:</span> DataSet<span>[</span>Integer<span>]</span> <span>=</span> input<span>.</span><span>map</span><span>(</span><span>new</span> MyMapper<span>(</span><span>)</span><span>)</span>
<span>.</span><span>.</span><span>.</span>
env<span>.</span><span>execute</span><span>(</span><span>)</span>
</code></span></span>
15、说说Flink中的播送变量,运用时需求留意什么?
咱们知道Flink是并行的,核算进程或许不在一个 Slot 中进行,那么有一种状况即:当咱们需求拜访同一份数据。那么Flink中的播送变量便是为了处理这种状况。咱们能够把播送变量了解为是一个公共的同享变量,咱们能够把一个dataset 数据集播送出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。
16、说说Flink中的窗口?
说说Flink中的窗口?

- time-tumbling-window 无堆叠数据的时刻窗口,设置办法举例:timeWindow(Time.seconds(5))
- time-sliding-window 有堆叠数据的时刻窗口,设置办法举例:timeWindow(Time.seconds(5), Time.seconds(3))
- count-tumbling-window无堆叠数据的数量窗口,设置办法举例:countWindow(5)
- count-sliding-window 有堆叠数据的数量窗口,设置办法举例:countWindow(5,3)
17、说说Flink中的状况存储?
Flink在做核算的进程中经常需求存储中心状况,来防止数据丢掉和状况康复。挑选的状况存储战略不同,会影响状况耐久化怎么和 checkpoint 交互。
Flink供给了三种状况存储办法:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。
18、Flink中的时刻有哪几类
Flink 中的时刻和其他流式核算体系的时刻相同分为三类:事情时刻,摄入时刻,处理时刻三种。假定以 EventTime
为基准来界说时刻窗口将构成EventTimeWindow,要求音讯自身就应该带着EventTime。假定以IngesingtTime
为基准来界说时刻窗口将构成 IngestingTimeWindow,以 source 的systemTime为准。假定以 ProcessingTime
基准来界说时刻窗口将构成 ProcessingTimeWindow,以 operator 的systemTime 为准。
19、Flink中的水印是什么概念,起到什么效果?
Watermark 是 Apache Flink 为了处理 EventTime 窗口核算提出的一种机制, 实质上是一种时刻戳。 一般来讲Watermark经常和Window一同被用来处理乱序事情。
20、Flink Table & SQL 了解吗?TableEnvironment这个类有什么效果
TableEnvironment是Table API和SQL集成的中心概念。这个类首要用来:
- 在内部 catalog 中注册表
- 注册外部 catalog
- 履行SQL查询
- 注册用户界说(标量,表或聚合)函数
- 将DataStream或DataSet转化为表
- 持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
21、Flink SQL的完结原理是什么?是怎么完结 SQL 解析的呢?
首要咱们要知道 Flink 的SQL解析是依据Apache Calcite这个开源结构。

- 用户运用对外供给Stream SQL的语法开发事务运用
- 用calcite对StreamSQL进行语法查验,语法查验经往后,转化成calcite的逻辑树节点;终究构成calcite的逻辑方案
- 选用Flink自界说的优化规则和calcite火山模型、启示式模型一同对逻辑树进行优化,生成最优的Flink物理方案
- 对物理方案选用janino codegen生成代码,生成用低阶API DataStream 描绘的流运用,提交到Flink渠道履行
Flink中级
22、Flink是怎么支撑批流一体的?

23、Flink是怎么做到高效的数据交流的?
在一个Flink Job中,数据需求在不同的task中进行交流,整个数据交流是由 TaskManager 担任的,TaskManager 的网络组件首要从缓冲buffer中搜集records,然后再发送。Records 并不是一个一个被发送的,二是积累一个批次再发送,batch 技术能够愈加高效的运用网络资源。
24、Flink是怎么做容错的?
Flink完结容错首要靠强壮的CheckPoint机制和State机制。Checkpoint 担任定时制造散布式快照、对程序中的状况进行备份;State 用来存储核算进程中的中心状况。
24、Flink 散布式快照的原理是什么?
Flink的散布式快照是依据Chandy-Lamport算法量身定做的。简略来说便是继续创立散布式数据流及其状况的一致快照。

25、Flink是怎么确保Exactly-once语义的?
Flink经过完结两阶段提交和状况保存来完结端到端的一致性语义。 分为以下几个进程:
- 开端事务(beginTransaction)创立一个暂时文件夹,来写把数据写入到这个文件夹里边
- 预提交(preCommit)将内存中缓存的数据写入文件并封闭
- 预提交(preCommit)将内存中缓存的数据写入文件并封闭
- 预提交(preCommit)将内存中缓存的数据写入文件并封闭
- 若失利产生在预提交成功后,正式提交前。能够依据状况来提交预提交的数据,也可删去预提交的数据
26、Flink 的 kafka 衔接器有什么特别的当地?
Flink源码中有一个独立的connector模块,一切的其他connector都依靠于此模块,Flink 在1.9版别发布的全新kafka衔接器,摒弃了之前衔接不同版别的kafka集群需求依靠不同版别的connector这种做法,只需求依靠一个connector即可。
27、说说 Flink的内存办理是怎么做的?
Flink 并不是将很多目标存在堆上,而是将目标都序列化到一个预分配的内存块上。此外,Flink很多的运用了堆外内存。假定需求处理的数据超出了内存约束,则会将部分数据存储到硬盘上。Flink 为了直接操作二进制数据完结了自己的序列化结构。理论上Flink的内存办理分为三部分:
Network Buffers:这个是在TaskManager发动的时分分配的,这是一组用于缓存网络数据的内存,每个块是32K,默许分配2048个,能够经过“taskmanager.network.numberOfBuffers”修正。
Memory Manage pool:很多的Memory Segment块,用于运转时的算法(Sort/Join/Shuffle等),这部分发动的时分就会分配。下面这段代码,依据装备文件中的各种参数来核算内存的分配办法。(heap or off-heap,这个放到下节谈),内存的分配支撑预分配和lazy load,默许懒加载的办法。
User Code,这部分是除了Memory Manager之外的内存用于User code和TaskManager自身的数据结构。
28、说说 Flink的序列化怎么做的?
Java自身自带的序列化和反序列化的功用,可是辅佐信息占用空间比较大,在序列化目标时记载了过多的类信息。Apache Flink摒弃了Java原生的序列化办法,以独特的办法处理数据类型和序列化,包含自己的类型描绘符,泛型类型提取和类型序列化结构。TypeInformation 是一切类型描绘符的基类。它提醒了该类型的一些根本特色,而且能够生成序列化器。TypeInformation 支撑以下几种类型:
BasicTypeInfo: 恣意Java 根本类型或 String 类型
BasicArrayTypeInfo: 恣意Java根本类型数组或 String 数组
WritableTypeInfo: 恣意 Hadoop Writable 接口的完结类
WritableTypeInfo: 恣意 Hadoop Writable 接口的完结类
CaseClassTypeInfo: 恣意的 Scala CaseClass(包含 Scala tuples)
CaseClassTypeInfo: 恣意的 Scala CaseClass(包含 Scala tuples)
GenericTypeInfo: 恣意无法匹配之前几种类型的类
针对前六种类型数据集,Flink皆能够自动生成对应的TypeSerializer,能十分高效地对数据集进行序列化和反序列化。
29、Flink中的Window呈现了数据歪斜,你有什么处理办法?
window产生数据歪斜指的是数据在不同的窗口内堆积的数据量相差过多。实质上产生这种状况的原因是数据源头发送的数据量速度不同导致的。呈现这种状况一般经过两种办法来处理:
- 在数据进入窗口前做预聚合
- 从头规划窗口聚合的key
30、Flink中在运用聚合函数 GroupBy、Distinct、KeyBy 等函数时呈现数据热门该怎么处理?
数据歪斜和数据热门是一切大数据结构绕不过去的问题。处理这类问题首要从3个方面下手:
- 在事务上躲避这类问题
例如一个假定订单场景,北京和上海两个城市订单量增长几十倍,其他城市的数据量不变。这时分咱们在进行聚合的时分,北京和上海就会呈现数据堆积,咱们能够独自数据北京和上海的数据。
- Key的规划上
把热key进行拆分,比方上个例子中的北京和上海,能够把北京和上海依照地区进行拆分聚合。
- 参数设置
Flink 1.9.0 SQL(Blink Planner) 功用优化中一项重要的改善便是升级了微批模型,即 MiniBatch。原理是缓存必定的数据后再触发处理,以削减对State的拜访,然后进步吞吐和削减数据的输出量。
31、Flink使命推迟高,想处理这个问题,你会怎么下手?
在Flink的后台使命办理中,咱们能够看到Flink的哪个算子和task呈现了反压。最首要的手法是资源调优和算子调优。资源调优便是对作业中的Operator的并发数(parallelism)、CPU(core)、堆内存(heap_memory)等参数进行调优。作业参数调优包含:并行度的设置,State的设置,checkpoint的设置。
32、Flink是怎么处理反压的?
Flink 内部是依据 producer-consumer 模型来进行音讯传递的,Flink的反压规划也是依据这个模型。Flink 运用了高效有界的散布式堵塞行列,就像 Java 通用的堵塞行列(BlockingQueue)相同。下流顾客消费变慢,上游就会受到堵塞。
33、Flink的反压和Strom有哪些不同?
Storm 是经过监控 Bolt 中的接纳行列负载状况,假定超越高水位值就会将反压信息写到 Zookeeper ,Zookeeper 上的 watch 会告诉该拓扑的一切 Worker 都进入反压状况,终究 Spout 中止发送 tuple。Flink中的反压运用了高效有界的散布式堵塞行列,下流消费变慢会导致发送端堵塞。二者最大的差异是Flink是逐级反压,而Storm是直接从源头降速。
34、 Operator Chains(算子链)这个概念你了解吗?
为了更高效地散布式履行,Flink会尽或许地将operator的subtask链接(chain)在一同构成task。每个task在一个线程中履行。将operators链接成task是十分有效的优化:它能削减线程之间的切换,削减音讯的序列化/反序列化,削减数据在缓冲区的交流,削减了推迟的一同进步全体的吞吐量。这便是咱们所说的算子链。
35、说说Flink1.9的新特性?
- 支撑hive读写,支撑UDF
- Flink SQL TopN和GroupBy等优化
- Checkpoint跟savepoint针对实践事务场景做了优化
- Flink state查询
36、消费kafka数据的时分,怎么处理脏数据?
能够在处理前加一个fliter算子,将不符合规则的数据过滤出去。
Flink高级
37、Flink Job的提沟通程
用户提交的Flink Job会被转化成一个DAG使命运转,别离是:StreamGraph、JobGraph、ExecutionGraph,Flink中JobManager与TaskManager,JobManager与Client的交互是依据Akka东西包的,是经过音讯驱动。整个Flink Job的提交还包含着ActorSystem的创立,JobManager的发动,TaskManager的发动和注册。
38、Flink所谓”三层图”结构是哪几个”图”?
一个Flink使命的DAG生成核算图大致阅历以下三个进程:
StreamGraph 最接近代码所表达的逻辑层面的核算拓扑结构,依照用户代码的履行次序向StreamExecutionEnvironment增加StreamTransformation构成流式图。
JobGraph 从StreamGraph生成,将能够串联兼并的节点进行兼并,设置节点之间的边,安排资源同享slot槽位和放置相关联的节点,上传使命所需的文件,设置查看点装备等。相当于经过部分初始化和优化处理的使命图。
ExecutionGraph 由JobGraph转化而来,包含了使命详细履行所需的内容,是最贴近底层完结的履行图。
39、JobManger在集群中扮演了什么人物?
JobManager 担任整个 Flink 集群使命的调度以及资源的办理,从客户端中获取提交的运用,然后依据集群中 TaskManager 上 TaskSlot 的运用状况,为提交的运用分配相应的 TaskSlot 资源并指令 TaskManager 发动从客户端中获取的运用。JobManager 相当于整个集群的 Master 节点,且整个集群有且只要一个活跃的 JobManager ,担任整个集群的使命办理和资源办理。JobManager 和 TaskManager 之间经过 Actor System 进行通讯,获取使命履行的状况并经过 Actor System 将运用的使命履行状况发送给客户端。一同在使命履行的进程中,Flink JobManager 会触发 Checkpoint 操作,每个 TaskManager 节点 收到 Checkpoint 触发指令后,完结 Checkpoint 操作,一切的 Checkpoint 和谐进程都是在 Fink JobManager 中完结。当使命完结后,Flink 会将使命履行的信息反馈给客户端,而且释放掉 TaskManager 中的资源以供下一次提交使命运用。
40、JobManger在集群发动进程中起到什么效果?
JobManager的责任首要是接纳Flink作业,调度Task,搜集作业状况和办理TaskManager。它包含一个Actor,而且做如下操作:
RegisterTaskManager: 它由想要注册到JobManager的TaskManager发送。注册成功会经过AcknowledgeRegistration音讯进行Ack。
SubmitJob: 由提交作业到体系的Client发送。提交的信息是JobGraph办法的作业描绘信息。
CancelJob: 恳求撤销指定id的作业。成功会回来CancellationSuccess,不然回来CancellationFailure。
UpdateTaskExecutionState: 由TaskManager发送,用来更新履行节点(ExecutionVertex)的状况。成功则回来true,不然回来false。
RequestNextInputSplit: TaskManager上的Task恳求下一个输入split,成功则回来NextInputSplit,不然回来null。
JobStatusChanged: 它意味着作业的状况(RUNNING, CANCELING, FINISHED,等)产生改动。这个音讯由ExecutionGraph发送。
41、TaskManager在集群中扮演了什么人物?
TaskManager 相当于整个集群的 Slave 节点,担任详细的使命履行和对应使命在每个节点上的资源恳求和办理。客户端经过将编写好的 Flink 运用编译打包,提交到 JobManager,然后 JobManager 会依据已注册在 JobManager 中 TaskManager 的资源状况,将使命分配给有资源的 TaskManager节点,然后发动并运转使命。TaskManager 从 JobManager 接纳需求布置的使命,然后运用 Slot 资源发动 Task,树立数据接入的网络衔接,接纳数据并开端数据处理。一同 TaskManager 之间的数据交互都是经过数据流的办法进行的。能够看出,Flink 的使命运转其实是选用多线程的办法,这和 MapReduce 多 JVM 进行的办法有很大的差异,Flink 能够极大进步 CPU 运用功率,在多个使命和 Task 之间经过 TaskSlot 办法同享体系资源,每个 TaskManager 中经过办理多个 TaskSlot 资源池进行对资源进行有效办理。
42、TaskManager在集群发动进程中起到什么效果?
TaskManager的发动流程较为简略:
发动类:org.apache.flink.runtime.taskmanager.TaskManager
中心发动办法 : selectNetworkInterfaceAndRunTaskManager
发动后直接向JobManager
注册自己,注册完结后,进行部分模块的初始化。
43、Flink 核算资源的调度是怎么完结的?
TaskManager中最细粒度的资源是Task slot,代表了一个固定巨细的资源子集,每个TaskManager会将其所占有的资源平分给它的slot。
经过调整 task slot 的数量,用户能够界说task之间是怎么彼此阻隔的。每个 TaskManager 有一个slot,也就意味着每个task运转在独立的 JVM 中。每个 TaskManager 有多个slot的话,也便是说多个task运转在同一个JVM中。
经过调整 task slot 的数量,用户能够界说task之间是怎么彼此阻隔的。每个 TaskManager 有一个slot,也就意味着每个task运转在独立的 JVM 中。每个 TaskManager 有多个slot的话,也便是说多个task运转在同一个JVM中。

44、简述Flink的数据笼统及数据交流进程?
Flink 为了防止JVM的固有缺点例如java目标存储密度低,FGC影响吞吐和响应等,完结了自主办理内存。MemorySegment便是Flink的内存笼统。默许状况下,一个MemorySegment能够被看做是一个32kb大的内存块的笼统。这块内存既能够是JVM里的一个byte[],也能够是堆外内存(DirectByteBuffer)。在MemorySegment这个笼统之上,Flink在数据从operator内的数据目标在向TaskManager上搬运,准备被发给下个节点的进程中,运用的笼统或许说内存目标是Buffer。对接从Java目标转为Buffer的中心目标是另一个笼统StreamRecord。
45、Flink 中的散布式快照机制是怎么完结的?
Flink的容错机制的中心部分是制造散布式数据流和操作算子状况的一致性快照。 这些快照充任一致性checkpoint,体系能够在产生毛病时回滚。 Flink用于制造这些快照的机制在“散布式数据流的轻量级异步快照”中进行了描绘。 它受到散布式快照的规范Chandy-Lamport算法的启示,专门针对Flink的履行模型而定制。

46、简略说说FlinkSQL是怎么完结的?
Flink 将 SQL 校验、SQL 解析以及 SQL 优化交给了Apache Calcite。Calcite 在其他很多开源项目里也都运用到了,比如 Apache Hive, Apache Drill, Apache Kylin, Cascading。Calcite 在新的架构中处于中心的位置,如下图所示。

本文正在参与 人工智能创作者扶持方案