0%

kafka 生产者设置

本文是「kafka权威指南」的第三章笔记。

在设置生产者之前我们先看看整个生产者到kafka broker的流程是怎么样的。

这个图是原文里的。

可以看到一个标准的ProducerRecord是必须要包含Topic和Value,然后Partition和Key是可选的。

从图中也可以看到,生产者需要把Key,Value序列化成数组,然后再去网络上传输。

如果record里有包含了Partition了,那就不需要分区器干活了,如果没有包含,那分区器就会根据record对象的key来选择一个分区。

选好分区之后,那生产者就知道往哪个topic和分区里发送记录了。然后这个记录就被添加到一个记录批次里,这个批次里所有消息都会被发送到相同的topic和分区上,这是由一个独立线程来处理的。

broker收到这些消息事会返回一个响应,如果成功,那就返回一个RecordMetaData对象,包含了topic和分区信息,以及record在分区里偏移量。

从这个流程里看到,这里一部分是生产者内部要处理的,一部分是生产者跟broker之间协调的。

生产者内部

key.serializer

这个是必须要设置的,就算你只发送value。
因为broker希望接收到的消息都是字节数组,默认提供了ByteArraySerializer,StringSerializer和IntegerSerializer这几种。
这个就看你的key的内容是什么了。一般可能就用ByteArraySerializer了

value.serializer

跟上面的key.serializer一样,如果你的key和value都是一样类型,那就用一样的序列化器,如果不一样 就需要使用不同的。

生产者配置

acks

  • acks=0
  • acks=1
  • acks=all
    这个很容易理解,0表示不等待broker响应,1表示leader响应,all表示所有副本都接收到才会响应成功。
    这种的就看整个系统的需求了。这里少了个类似most这样的参数。

buffer.memory

设置生产者的缓冲区大小,超过大小要么被阻塞,要么抛异常。

compression.type

默认情况下是不压缩的,但是该参数可以设置为snappy,gzip,lz4这3种。从CPU占用来看,snappy效果最好。gzip比较耗cpu,但是压缩率更高。在CPU资源宝贵的情况下还是用snappy吧。

retries

重试次数
这个很容易理解,0表示不等待broker响应,1表示leader响应,all表示所有副本都接收到才会响应成功。
这种的就看整个系统的需求了。这里少了个类似most这样的参数。

buffer.memory

设置生产者的缓冲区大小,超过大小要么被阻塞,要么抛异常。

compression.type

默认情况下是不压缩的,但是该参数可以设置为snappy,gzip,lz4这3种。从CPU占用来看,snappy效果最好。gzip比较耗cpu,但是压缩率更高。在CPU资源宝贵的情况下还是用snappy吧。

retries

重试次数。 默认情况下是每次之间等待100ms,但是可以通过retry.backoff.ms来修改时间间隔。这个时间设置最好提前测试好恢复一个崩溃节点需要多久。

batch.size

指定一个批次可以使用的内存大小,按字节数来计算,而不是个数。
但是当满足下面这个参数的时候,就算一个批次没有满也会发送。

linger.ms

指定了发送批次之前等待更多消息加入批次的时间。这个最好设置大于0的数字,这样可以提升吞吐量,但是也别太大了。

max.in.flight.requests.per.connection

指定了生产者在收到服务器响应之前可以发送多少个消息。设置为1就可以保证消息是按照发送顺序写入broker的。

timeout.ms

指定了broker等待同步副本返回消息确认的时间,与acks的配置相匹配(1和all的确认时间可完全不同)

request.timeout.ms

指定了生产者在发送数据时等待broker返回响应的时间。

metadata.fetch.timeout.ms

指定了生产者获取元数据(比如目标分区的leader)时等待服务器返回响应的时间。

max.block.ms

指定了调用了send()或者使用partitionsFor()获取元数据时生产者的阻塞时间。当生产者的发送缓冲区满了,或者没有可用的元数据时,这些方法就会抛出异常。

max.request.size

控制生产者发送的请求大小。这个参数要跟broker的message.max.bytes相匹配

receive.buffer.bytes和send.buffer.bytes

就是指定tcp socket接收和发送缓冲区的大小,如果设置为-1就是使用操作系统的值。

上面这些就基本是发送者的参数了。感觉kafka在这些命名上最好都统一跟broker的命名靠拢。不过问题不大。
性能和安全性肯定不能同时满足的,看自己取舍了。