kafka学习笔记(三、消费者Consumer使用教程——消费性能多线程提升思考)
1.简介
KafkaConsumer是非线程安全的,它定义了一个acquire()方法来检测当前是否只有一个线程在操作,如不是则会抛出ConcurrentModifcationException异常。
acquire()可以看做是一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()方法和release()方法成对出现,表示相应的加锁操作和解锁操作。
KafkaConsumer虽然是单线程的执行方式,但是在某些情况下如:生产者发送消息的速度远大于消费者消费的速度,这样长时间可能会造成消息的丢失,此时我们就需要消费者采用多线程消费的方式增加消费速度。
2.多线程实现的方式
2.1.线程封闭多线程
即为每个线程实例化一个KafkaConsumer,如图所示,一个线程对应一个KafkaConsumer实例,所有的消费线程都属于同一个消费者组。
这种方式的并发度受限于分区的实际个数。
实现代码示例:
public class kafkaConsumer1 { public void ConsuermMultithread1() { Properties props = initConsifg(); // 此处初始化消费者配置参数省略 int consumerThreadNum = 5; for (int i = 0; i2.1.消息处理模块多线程
此方法是对上面方法的进一步优化,在消息处理模块增加多线程来处理消息,进一步提升消息消费的速度。
public class kafkaConsumer1 { public void ConsuermMultithread1() { Properties props = initConsifg(); // 此处初始化消费者配置参数省略 int consumerThreadNum = 5; for (int i = 0; i此方法需要引入一个共享的offsets来参与提交。
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。