1. Kafka介绍

1.1.1. Kafka是什么

    kafka运用scala开发,支撑多语言客户端(c++、java、python、go等)
    Kafka最先由LinkedIn公司开发,之后成为Apache的尖端项目。
    Kafka是一个分布式的、分区化、可复制提交的日志服务
    LinkedIn运用Kafka完结了公司不同运用程序之间的松耦和,那么作为一个可扩展、高可靠的音讯体系
    支撑高Throughput的运用
    scale out:无需停机即可扩展机器
    耐久化:经过将数据耐久化到硬盘以及replication防止数据丢掉
    支撑online和offline的场景

1.1.2. Kafka的特点

Kafka是分布式的,其一切的构件borker(服务端集群)、producer(音讯出产)、consumer(音讯顾客)都能够是分布式的。

在音讯的出产时能够运用一个标识topic来区别,且能够进行分区;每一个分区都是一个次序的、不可变的音讯行列, 而且能够持续的添加。

同时为发布和订阅供给高吞吐量。据了解,Kafka每秒能够出产约25万音讯(50 MB),每秒处理55万音讯(110 MB)。

音讯被处理的状况是在consumer端保护,而不是由server端保护。当失利时能自动平衡

1.1.3. 常用的场景

监控:主机经过Kafka发送与体系和运用程序健康相关的指标,然后这些信息会被搜集和处理然后创立监控仪表盘并发送警告。

音讯行列: 运用程度运用Kafka作为传统的音讯体系完结标准的行列和音讯的发布—订阅,例如查找和内容提要(Content Feed)。比起大多数的音讯体系来说,Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模音讯处理运用的解决方案。音讯体系 一般吞吐量相对较低,可是需求更小的端到端延时,并尝尝依赖于Kafka供给的强大的耐久性确保。在这个范畴,Kafka足以比美传统音讯体系,如ActiveMR或RabbitMQ

站点的用户活动追踪: 为了更好地理解用户行为,改进用户体验,将用户查看了哪个页面、点击了哪些内容等信息发送到每个数据中心的Kafka集群上,并经过Hadoop进行剖析、生成日常报告。

流处理:保存搜集流数据,以供给之后对接的Storm或其他流式计算框架进行处理。许多用户会将那些从原始topic来的数据进行 阶段性处理,汇总,扩大或许以其他的方式转换到新的topic下再持续后边的处理。例如一个文章推荐的处理流程,可能是先从RSS数据源中抓取文章的内 容,然后将其丢入一个叫做“文章”的topic中;后续操作可能是需求对这个内容进行清理,比如回复正常数据或许删除重复数据,最终再将内容匹配的结果返 还给用户。这就在一个独立的topic之外,产生了一系列的实时数据处理的流程。

日志聚合:运用Kafka替代日志聚合(log aggregation)。日志聚合一般来说是从服务器上搜集日志文件,然后放到一个集中的位置(文件服务器或HDFS)进行处理。然而Kafka忽略掉 文件的细节,将其更明晰地抽象成一个个日志或事情的音讯流。这就让Kafka处理进程推迟更低,更简单支撑多数据源和分布式数据处理。比起以日志为中心的 体系比如Scribe或许Flume来说,Kafka供给同样高效的功能和因为复制导致的更高的耐用性确保,以及更低的端到端推迟

耐久性日志:Kafka能够为一种外部的耐久性日志的分布式体系供给服务。这种日志能够在节点间备份数据,并为毛病节点数据回复供给一种重新同步的机制。Kafka中日志压缩功能为这种用法供给了条件。在这种用法中,Kafka类似于Apache BookKeeper项目。

1.1.4. Kafka中包含以下基础概念

    1.Topic(论题):Kafka中用于区别不同类别信息的类别称号。由producer指定
    2.Producer(出产者):将音讯发布到Kafka特定的Topic的目标(进程)
    3.Consumers(顾客):订阅并处理特定的Topic中的音讯的目标(进程)
    4.Broker(Kafka服务集群):已发布的音讯保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个署理(Broker). 顾客能够订阅一个或多个论题,并从Broker拉数据,然后消费这些已发布的音讯。
    5.Partition(分区):Topic物理上的分组,一个topic能够分为多个partition,每个partition是一个有序的行列。partition中的每条音讯都会被分配一个有序的id(offset)
    Message:音讯,是通信的基本单位,每个producer能够向一个topic(主题)发布一些音讯。

1.1.5. 音讯

音讯由一个固定巨细的报头和可变长度但不透明的字节阵列负载。报头包含格式版别和CRC32效验和以检测损坏或切断

1.1.6. 音讯格式

    1. 4 byte CRC32 of the message
    2. 1 byte "magic" identifier to allow format changes, value is 0 or 1
    3. 1 byte "attributes" identifier to allow annotations on the message independent of the version
       bit 0 ~ 2 : Compression codec
           0 : no compression
           1 : gzip
           2 : snappy
           3 : lz4
       bit 3 : Timestamp type
           0 : create time
           1 : log append time
       bit 4 ~ 7 : reserved
    4. (可选) 8 byte timestamp only if "magic" identifier is greater than 0
    5. 4 byte key length, containing length K
    6. K byte key
    7. 4 byte payload length, containing length V
    8. V byte payload

2. Kafka深层介绍

2.1.1. 架构介绍

go操作Kafka

  • Producer:Producer即出产者,音讯的产生者,是音讯的⼊口。

  • kafka cluster:kafka集群,一台或多台服务器组成

    • Broker:Broker是指部署了Kafka实例的服务器节点。每个服务器上有一个或多个kafka的实 例,咱们权且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的 编号,如图中的broker-0、broker-1等……
    • Topic:音讯的主题,能够理解为音讯的分类,kafka的数据就保存在topic。在每个broker上 都能够创立多个topic。实践运用中通常是一个事务线建一个topic。
    • Partition:Topic的分区,每个topic能够有多个分区,分区的作用是做负载,进步kafka的吞 吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式便是一个一个的⽂件夹!
    • Replication:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)毛病的 时分会选择一个备胎(Follower)上位,成为Leader。在kafka中默许副本的最大数量是10 个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机 器对同一个分区也只可能寄存一个副本(包括自己)。
  • Consumer:顾客,即音讯的消费方,是音讯的出口。

    • Consumer Group:咱们能够将多个消费组组成一个顾客组,在kafka的规划中同一个分 区的数据只能被顾客组中的某一个顾客消费。同一个顾客组的顾客能够消费同一个 topic的不同分区的数据,这也是为了进步kafka的吞吐量!

2.1.2. ⼯作流程

咱们看上⾯的架构图中,producer便是出产者,是数据的进口。Producer在写入数据的时分会把数据 写入到leader中,不会直接将数据写入follower!那leader怎样找呢?写入的流程又是什么样的呢?我 们看下图:

go操作Kafka

    1.⽣产者从Kafka集群获取分区leader信息
    2.⽣产者将音讯发送给leader
    3.leader将音讯写入本地磁盘
    4.follower从leader拉取音讯数据
    5.follower将音讯写入本地磁盘后向leader发送ACK
    6.leader收到一切的follower的ACK之后向出产者发送ACK

2.1.3. 选择partition的原则

那在kafka中,假如某个topic有多个partition,producer⼜怎样知道该将数据发往哪个partition呢? kafka中有几个原则:

    1.partition在写入的时分能够指定需求写入的partition,假如有指定,则写入对应的partition2.假如没有指定partition,可是设置了数据的key,则会依据key的值hash出一个partition3.假如既没指定partition,又没有设置key,则会采用轮询⽅式,即每次取一小段时刻的数据写入某
    个partition,下一小段的时刻写入下一个partition

2.1.4. ACK应答机制

producer在向kafka写入音讯的时分,能够设置参数来承认是否承认kafka接收到数据,这个参数可设置 的值为 0,1,all

  • 0代表producer往集群发送数据不需求等到集群的返回,不确保音讯发送成功。安全性最低可是效 率最高。
  • 1代表producer往集群发送数据只要leader应答就能够发送下一条,只确保leader发送成功。
  • all代表producer往集群发送数据需求一切的follower都完结从leader的同步才会发送下一条,确保 leader发送成功和一切的副本都完结备份。安全性最⾼高,可是效率最低。

最终要留意的是,假如往不存在的topic写数据,kafka会⾃动创立topic,partition和replication的数量 默许配置都是1。

2.1.5. Topic和数据⽇志

topic 是同⼀类别的音讯记载(record)的调集。在Kafka中,⼀个主题通常有多个订阅者。关于每个 主题,Kafka集群保护了⼀个分区数据⽇志⽂件结构如下:

go操作Kafka

每个partition都是⼀个有序而且不可变的音讯记载调集。当新的数据写⼊时,就被追加到partition的末 尾。在每个partition中,每条音讯都会被分配⼀个次序的唯⼀标识,这个标识被称为offset,即偏移 量。留意,Kafka只确保在同⼀个partition内部音讯是有序的,在不同partition之间,并不能确保音讯 有序。

Kafka能够配置⼀个保存期限,⽤来标识⽇志会在Kafka集群内保存多⻓时刻。Kafka集群会保存在保存 期限内一切被发布的音讯,不论这些音讯是否被消费过。⽐如保存期限设置为两天,那么数据被发布到 Kafka集群的两天以内,一切的这些数据都能够被消费。当超越两天,这些数据将会被清空,以便为后 续的数据腾出空间。因为Kafka会将数据进⾏耐久化存储(即写⼊到硬盘上),所以保存的数据⼤⼩可 以设置为⼀个⽐较⼤的值。

2.1.6. Partition结构

Partition在服务器上的表现形式便是⼀个⼀个的⽂件夹,每个partition的⽂件夹下⾯会有多组segment ⽂件,每组segment⽂件⼜包含 .index ⽂件、 .log ⽂件、 .timeindex ⽂件三个⽂件,其中 .log ⽂ 件便是实践存储message的地⽅,⽽ .index 和 .timeindex ⽂件为索引⽂件,⽤于检索音讯。

2.1.7. 消费数据

多个顾客实例能够组成⼀个顾客组,并⽤⼀个标签来标识这个顾客组。⼀个顾客组中的不同消 费者实例能够运⾏在不同的进程甚⾄不同的服务器上。

假如一切的顾客实例都在同⼀个顾客组中,那么音讯记载会被很好的均衡的发送到每个顾客实 例。

假如一切的顾客实例都在不同的顾客组,那么每⼀条音讯记载会被⼴播到每⼀个顾客实例。

go操作Kafka

举个例⼦,如上图所示⼀个两个节点的Kafka集群上拥有⼀个四个partition(P0-P3)的topic。有两个 顾客组都在消费这个topic中的数据,顾客组A有两个顾客实例,顾客组B有四个顾客实例。 从图中咱们能够看到,在同⼀个顾客组中,每个顾客实例能够消费多个分区,可是每个分区最多只 能被顾客组中的⼀个实例消费。也便是说,假如有⼀个4个分区的主题,那么顾客组中最多只能有4 个顾客实例去消费,多出来的都不会被分配到分区。其实这也很好理解,假如允许两个顾客实例同 时消费同⼀个分区,那么就⽆法记载这个分区被这个顾客组消费的offset了。假如在顾客组中动态 的上线或下线顾客,那么Kafka集群会⾃动调整分区与顾客实例间的对应关系。

3. 操作Kafka

3.1.1. sarama

Go语言中连接kafka运用第三方库: github.com/Shopify/sarama。

3.1.2. 下载及装置

    go get github.com/Shopify/sarama

留意事项: sarama v1.20之后的版别加入了zstd压缩算法,需求用到cgo,在Windows渠道编译时会提示类似如下错误: github.com/DataDog/zstd exec: “gcc”:executable file not found in %PATH% 所以在Windows渠道请运用v1.19版别的sarama。(假如不会版别操控请查看博客里边的go module章节)

3.1.3. 连接kafka发送音讯

package main
import (
    "fmt"
    "github.com/Shopify/sarama"
)
// 基于sarama第三方库开发的kafka client
func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需求leader和follow都承认
    config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
    config.Producer.Return.Successes = true                   // 成功交付的音讯将在success channel返回
    // 结构一个音讯
    msg := &sarama.ProducerMessage{}
    msg.Topic = "web_log"
    msg.Value = sarama.StringEncoder("this is a test log")
    // 连接kafka
    client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        fmt.Println("producer closed, err:", err)
        return
    }
    defer client.Close()
    // 发送音讯
    pid, offset, err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send msg failed, err:", err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

3.1.4. 连接kafka消费音讯

package main
import (
    "fmt"
    "github.com/Shopify/sarama"
)
// kafka consumer
func main() {
    consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
    if err != nil {
        fmt.Printf("fail to start consumer, err:%v\n", err)
        return
    }
    partitionList, err := consumer.Partitions("web_log") // 依据topic取到一切的分区
    if err != nil {
        fmt.Printf("fail to get list of partition:err%v\n", err)
        return
    }
    fmt.Println(partitionList)
    for partition := range partitionList { // 遍历一切的分区
        // 针对每个分区创立一个对应的分区顾客
        pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d,err:%v\n", partition, err)
            return
        }
        defer pc.AsyncClose()
        // 异步从每个分区消费信息
        go func(sarama.PartitionConsumer) {
            for msg := range pc.Messages() {
                fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
            }
        }(pc)
    }
}

本文正在参加「金石方案 . 瓜分6万现金大奖」