生产者
发送消息:
Properties props = new Properties();//kafka 集群,broker-listprops.put("bootstrap.servers", "172.24.211.140:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) { var record = new ProducerRecord<>("test", "Precision Products", "France"); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.println(metadata); } });}producer.close();
配置
- acks
- 定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的
- acks=0 ,生产者在成功写入消息之前不会等待任何来自服务器的响应 当 broker 故障时有可能 丢失数据
- acks=1 ,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应 如果在 follower同步成功之前 leader 故障,那么将会丢失数据
- acks=all ,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应 如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成 数据重复
- buffer.memory
- 设置生产者内存缓冲区的大小
- compression.type
- 设置消息压缩算法
- retries
- 决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误
- batch.size
- 指定了一个批次可以使用的内存大小
- linger.ms
- KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去
- client.id
- max.in.flight.requests.per.connection
- 指定了生产者在收到服务器响应之前可以发送多少个消息
- timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms
- max.block.ms
- 调用send时最长的阻塞时间
- max.request.siz
- receive.buffer.bytes 和 send.buffer.bytes
- 分别指定了 TCP socket 接收和发送数据包的缓冲区大小
顺序保证
- 将max.in.flight.requests.per.connection设置为1
保证顺序的方法就是:
- 每个主题只分为一个区
- 或者每次发送的消息发送到同一个分区
序列化器
- 自定义序列化器:实现`Serializer`接口
- 不推荐使用
- 其他序列化
- avro:一种将shcema嵌入在数据里的序列化方式
分区策略
分区的原因:
- 方便扩展
- 提高并发
分区原则:
- 指明 partition 的情况下,直接将指明的值直接作为 partiton 值
- 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition数进行取余得到 partition 值
- 否则就是随机取一个值 然后在这个值的基础上进行轮询
自定义分区器:
实现Partitioner
接口
数据可靠性保证
- Exactly Once
将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 AtLeast Once 语义
At Least Once + 幂等性 = Exactly Once
另外一个需要注意的是要注册发送回调,发生不成功,需要客户端进行重试
生产者幂等
生产者可以指定 enable.idempotence 来实现生产幂等,其原理是 broker 引入了ProducerID和SequenceNumber,每个新的Producer初始化时,会被分配一个唯一的ProducerID,同时生产者为每条消息生成SequenceNumber,broker 端通过这两个字段来判断数据之前是否发送过,从而实现幂等
这种幂等判断只能保证某个主题的一个分区上不出现重复消息
事务
能保证多条消息原子性地写入到目标分区,同时也能保证 Consumer 只能看到事务成功提交的消息
producer.beginTransaction(); producer.send(record1); producer.send(record2); producer.commitTransaction();
是一种类似 2PC 的实现