本章通过实际例子,讲解了如何使用java进行kafka开发。
添加依赖:
1 | <dependency> |
下面是创建主题的代码:
1 | public class TopicProcessor { |
首先定义了zookeeper相关连接信息。然后在createTopic中,先初始化ZkUtils,和zookeeper交互依赖于它。然后通过AdminUtils先判断是否存在你要创建的主题,如果不存在,则通过createTopic方法进行创建。传入参数包括主题名称,分区数量,副本数量等。
生产者生产消息
生产者生产消息代码如下:
1 | public class MessageProducer { |
1、首先初始化KafkaProducer对象。
1 | producer = new KafkaProducer<String, String>(configs); |
2、创建要发送的消息对象。
1 | ProducerRecord<String,String> record = new ProducerRecord<String,String>(TOPIC,message); |
3、通过producer的send方法,发送消息
4、发送消息时,可以通过回调函数,取得消息发送的结果。异常发生时,对异常进行处理。
初始化producer时候,需要注意下面属性设置:
1 | properties.put(ProducerConfig.ACKS_CONFIG,"all"); |
这里有三种值可供选择:
- 0,不等服务器响应,直接返回发送成功。速度最快,但是丢了消息是无法知道的
- 1,leader副本收到消息后返回成功
- all,所有参与的副本都复制完成后返回成功。这样最安全,但是延迟最高。
消费者消费消息
我们直接看代码
1 | public class MessageConsumer { |
代码逻辑如下:
1、初始化消费者KafkaConsumer,并订阅主题。
1 | kafkaConsumer = new KafkaConsumer<String, String>(properties); |
2、循环拉取消息
1 | ConsumerRecords<String,String> records = kafkaConsumer.poll(100); |
poll方法传入的参数100,是等待broker返回数据的时间,如果超过100ms没有响应,则不再等待。
3、拉取回消息后,循环处理。
1 | for(ConsumerRecord record:records){ |
消费相关代码比较简单,不过这个版本没有处理偏移量提交。学习过第四章-协调器相关的同学应该还记得偏移量提交的问题。我曾说过最佳实践是同步和异步提交相结合,同时在特定的时间点,比如再均衡前进行手动提交。
加入偏移量提交,需要做如下修改:
1、enable.auto.commit设置为false
2、消费代码如下:
1 | public static void main(String[] args){ |
3、订阅消息时,实现再均衡的回调方法,在此方法中手动提交偏移量
1 | kafkaConsumer.subscribe(Arrays.asList(TOPIC), new ConsumerRebalanceListener() { |
通过以上三步,我们把自动提交偏移量改为了手动提交。正常消费时,异步提交kafkaConsumer.commitAsync()。即使偶尔失败,也会被后续成功的提交覆盖掉。而在发生异常的时候,手动提交 kafkaConsumer.commitSync()。此外在步骤3中,我们通过实现再均衡时的回调方法,手动同步提交偏移量,确保了再均衡前偏移量提交成功。
以上面的最佳实践提交偏移量,既能保证消费时较高的效率,又能够尽量避免重复消费。不过由于重复消费无法100%避免,消费逻辑需要自己处理重复消费的判断。
你真的不关注一下嘛~