前语

生产者担任向Kakfa发送消息。扮演的是一个消息投递的人物。为了保证消息可以顺利安全地发送到Kafka broker里面,Kafka对生产者精心规划许多机制。

本篇介绍Kafka中生产者的一些根底配备和一些生产者的机制。

看完这篇文章你将会收成:

Kafka:生产者全解

生产者

消息发送的流程

Kafka:生产者全解

消息的构成:

  • Topic:主题
  • Partition:分区
  • key:假设key有值,分区器会根据它进行分区
  • valuea:消息内容

ProducerRecord方针包含方针主题(Topic) 和要发送的内容。生产者需求把键和值方针序列化成字节数组

然后数据会序列化器解析,得到解析后的数据,再扔给分区器。

假设在ProducerRecord方针里面指清楚分区,那么分区器就不多做处理。假设没有指明,那就靠分区器根据方针的键来选择一个分区。

选择好分区之后,生产者就知道往哪个主题和分区发送这条记载。这条记载会被追加到一个记载批次里面,一个批次里面的全部消息会被发送到相同的主题和分区上。这部分作业是有一个独立的线程担任把这些记载批次发送到相应的broker上。

服务器收到这些消息时,就会回来一个照应。

假设消息写入Kafka成功,就回来一个RecordMetaData元数据方针,它包含了主题分区信息,以及记载在分区的偏移量

假设消息写入Kafka失利,就会回来一个差错,生产者收到差错之后会测验从头发送消息,几次之后假设仍是实不行,就回来差错信息。 (最大尽力交给)

生产者配备

要想往Kafka写入消息,第一步就是要创立一个生产者方针,并且设置一些配备特点

必选配备

下面罗列几个Kafka生产者有必选特点

bootstrap.servers

指定broker的地址清单,地址的格式为host:sport。清单里面不需求包含全部broker地址,生产者会从给定的broker里面找到其他broker的信息。(建议是供应两个broker,一旦其间一个宕机,别的一个顶上去衔接到集群)

key.serializer

broker期望接收到的键值都是字节数组。生产者接口容许运用参数化类型(Java里面叫泛型) ,因此可以把java方针作为键或值发送给broker。默许供应了ByteArraySerializerStringSerializerIntegerSerializer

value.serializer

跟前者相同,假设key、value都是同一个类型,那么就跟key.serializer运用同一个序列化器即可。

可选配备

acks

满意acks的设置,才代表生产者成功写入消息

Kafka:生产者全解

  • acks=0:代表生产者不需求任何服务器的照应。这种情况下,消息丢了就丢了,生产者自己也不知道。合适高吞吐量,但是消息并不重要的场景。
  • acks=1:只需首领副本收到消息,生产者就会收到来自服务器的成功照应。假设消息无法抵达首领节点(比如首领节点溃散,新的首领还没有被推举出来等),生产者就会收到一个差错照应。生产者会测验重发消息。此时的吞吐量取决于是同步发送方式,仍是异步发送方式
  • acks=all:只有当全部副本都收到消息,生产者才会收到来自服务器的一个成功照应。

buffer.memory

生产者内存缓冲池大小

用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息,

compression.type

紧缩算法,指的是消息被发送到broker之前运用哪一种紧缩算法进行紧缩

消息在发送到broker之前,要走网络,那么就意味着需求消耗带宽。假设我们这个消息大小太大,功用上就很欠好。所以我们一般会采用某种紧缩算法来紧缩消息。

  • snappysnappy是谷歌研制的,它占用较少的CPU,却可以得到较好的功用和紧缩比,假设是比较重视功用和互联网带宽(比如直播体系) 可以运用这种算法。
  • gzipgzip一般会占用较多的CPU,但是会提高更高的紧缩比,假设带宽有限,可以选用这种算法。特别的gzip对文本类型紧缩有特别好的作用。
  • lz4lz4则是寻求紧缩解压速度,他的紧缩比并不是很好。假设网络带宽条件比较好,可以选用这种紧缩算法。

retries

生产者重发消息的次数

字面意思,生产者重发消息的次数,超过了之后就会抛出一个重试失常。

batch.size

批次的大小

Kafka:生产者全解

按照字节数核算,不是按照消息个数来核算。当批次被填满的时分,批次里面的全部消息被发送出去。有时分也不需求等到被填满才发送,半满,乃至只包含1个消息的批次也有或许被发送。

  • 批次太大:占内存。
  • 批次太小:频频发送消息,带来额定的开支。

linger.ms

发送批次前等候下一个消息参加的时间,也就是批次的泊车等候时间

KafkaProducer会在批次被填满或许linger.ms抵达上限时把批次发送出去。

默许情况下,只需有可用的线程,生产者就会把消息发送出去。

max.in.flight.requests.per.connection

服务器照应之前能接受的消息数

生产者在收到服务器照应之前,可以发送多少个消息,它的值越高,约占内存,不过吞吐量也越高。

设置为1,则可以保证消息是按照发送次第写入服务器的,即就是发送了重试

但是其实没必要阿,Kafka是可以保证同一个分区里的消息是有序得。只需生产者按照必定次第发送消息,broker就会按照这个次第将他们写入同一个分区。顾客消费的时分也是按次第消费的。

消息发送方法

实例化生产者方针之后,就可以向Kafka发送消息了,发送消息有3种方法。消息先是被放进缓冲区,然后运用独自的线程发送到服务端。

发送并忘掉(fire-and-forget)

发送给服务器之后,不关心他是否成功抵达,大多情况下,消息会正常抵达,因为Kafka是高可用,并且要害是生产者会自动测验重发,有必定几率会丢掉消息。

ProducerRecord<String, String> record =
  new ProducerRecord<>("CustomerCountry", "Precision Products",
             "France"); 
try {
  producer.send(record); 
} catch (Exception e) {
  e.printStackTrace();
}

同步发送

运用send()方法发送消息,他会回来一个Future方针,通过调用get()方法进行等候,就可以知道消息是否发送成功。

同步,指的是发送给Kafka之后,我这边还需求有一个发送服务端等候服务请照应进程,通过调用那个Future方针的get方法来进行处理后续逻辑。

ProducerRecord<String, String> record =
  new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
  producer.send(record).get(); 
} catch (Exception e) {
  e.printStackTrace(); 
}

❌KafKaProducer一般会发生两种类型差错

  • 一类是可重试差错,这类差错可以通过重发消息来处理。比如关于衔接差错,可以通过再次建立衔接来处理。“无主”差错(no-leader)则是可以通过从头为分区从头推举首领来处理。假设多次都无法处理问题,则会抛出一个重试失常
  • 别的一类为无法通过重试处理,比如消息太大的失常,这类失常KafkaProducer不会进行任何重试,直接抛出失常。

异步发送

调用send()方法,指定一个回调函数,服务器在回来照应时调用该函数。

异步,指的是我不需求去处理消息等候进程,我们通过指定一个回调函数,让Kafka那儿收到消息之后调用这个回调函数即可。

大多数情况,我们不需求等候照应,虽然说,Kafka会把方针主题、分区信息和消息的偏移量发送过来,但是关于发送端的应用程序来说,并不是有必要的。不过我们遇到消息发送失利的时分,需求抛出失常,记载差错日志,或许把消息写入“差错消息”文件以便日后剖析

private class DemoProducerCallback implements Callback {
  @Override
  public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (e != null) {
      e.printStackTrace(); 
     }
   }
}
​
ProducerRecord<String, String> record =
  new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA"); 
producer.send(record, new DemoProducerCallback());

优点就是,我们可以对失常情况进行处理。

序列化器

之所以为什么需求运用序列化器,是因为我们有许多客户端,每个客户端传给Kafka的键值类型都是可以不同的,所以需求客户端指明运用的序列化器,让Kafka知道怎样去解析数据

举荐的运用计划是:Avro。(Hadoop也是运用这个)

Kafka有自己供应的默许序列化器(byteArray、String、Integer等) ,也可以我们用户自己去自定义序列化器

自定义序列化器

我们可以用业内常见的序列化结构,如AvroThrift或是Protobuf不建议运用我们自定义序列化器

因为当我们需求更新我们这个方针,新增或删去某一个字段,那么这个自定义序列化器就要发生改动。或许不止一个客户端用这个方针,那么全部的客户端序列化器都要跟着改。非常不利于扩展

怎样写一个自定义序列化器?

首要我们要知道Kafka接受的是字节数组,所以我们只需求把方针序列为字节数组回来出去即可

运用Avro序列化

Apache Avro是一种与编程言语无关的序列化格式。

Avro数据通过与言语无关的schema来定义。

schema通过JSON来描述,数据被序列化成二进制文件或许JSON文件。(一般都是用二进制文件)

Avro在读、写数据都需求用到schemaschema存在于Avro的数据文件中

Avro的特色在于,担任写消息的应用程序假设运用了新的schema(比如新增或删去某一个字段),担任读消息的应用程序可以持续处理消息而无需做任何改动。所以它非常合适Kafka。

留心:虽然担任读消息的应用程序不需求改schema,但是它仍是会读不到最新的字段,只不过他不会回来差错或许失常,读取到的是null。

在Kafka中运用Avro

Avro的数据文件包含了整个schema,虽然说这种开支对是可接受的,但是Kafka有许多消息,那么每个消息所带来的负担是不行忽视的。怎样处理呢?Kafka有他自己的一套东西。

Kafka采用的是运用schema注册表来抵达方针,这个不是Kafka自己完结的,需求凭借外部来完结。我们用的是Confluent Schema Registry

原理就是非常简略,原先Avro数据里面放schema,现在不放了,改为将全部的schema放在同一个当地存着(这个当地就是schema注册表) ,然后解析的时分根据Avro数据里的schema标识符去自己拉取schema下来。因此我们在发送消息的时分需求注册schema到schema注册表里面,然后塞入一个shcema标识符即可。

本质上:就是一个拉方式(poll) ,或许说是读扩散

分区

一个主题下,有一个或多个分区,在同一个分区内,消息是具有次第性的

ProducerRecord方针包含了方针主题、键、值。

:可以设置为默许的null,不过大多数应用程序会用到null。键主要有2个用途:

  • 可以作为消息的附加消息。
  • 也可以抉择消息该写到主题的哪个分区。具有相同键的消息可以被写到同一个分区

运用null作为键值,并且运用了默许的分区器,那么记载就会被随机地发送到主题内各个可用的分区上

假设键不为空,并且运用了默许的分区器。Kafka会对键进行散列,然后根据散列后的成果,把消息映射到特定的分区上。所以同一个键总是会被映射到同一个分区上。

要害一点是:这边散列的分区是全部的分区,并不是可用的分区。所以有或许映射到不行用的分区。但是这种情况很少发生,并且Kafka具有仿制功用可用性

一般我们是不会简单改动主题分区数量。因为一旦改动了,那么全部的映射联系都会发生变化。有或许同一个键的数据会被映射到不同的分区。所以假设想要通过键来映射分区,那么最好在创立主题的时分就把分区规划好。

原则上是:永久不要新增分区

分区器

默许分区器就是上面所谈论的情况,他是运用次数最频频、最常用的分区器。它选用的是散列分区这么一个战略

有些时分,我们的事务需求,需求我们对数据指定分区,并且支持独自性的分区,比如某个大客户的账号记载我们想要独自分配到某个分区。

黏性分区:StickyPartitioning Strategy

0.10版别之后的kafka完结了黏性分区战略,完结生产者发送数据分块优化。

我们知道,往Kafka发送消息,broker并不会立刻接收到消息。Kafka有按量准时进行一个Batch批次的消息发送。

从这个规划上来说,我们当然期望是消息尽或许填满一个批次,这样是最赚的。

实际上,抉择Batch怎样构成的一个因素是分区战略(Partition Strategy)。

Kafka2.4版别之前,选用的默许分区战略是轮询(Round-Robin),(既没有指定partition,又没有指定key的情况下,假设多条消息不是被发送到相同的分区,那么他们就不能被放到一个batch里)

所以这样就会形成一个大的Batch被拆分红多个小Batch。因此社区推出了一种新的分区战略黏性分区

黏性分区:会随机选择一个分区并尽或许地坚持运用该分区,代表黏住这个分区。

好处显著地下降给消息指定分区进程中地延时,有助于改进消息批处理,减少延迟,并减少broker的负载。