理论教育 Scala实战:设计消息消费者

Scala实战:设计消息消费者

时间:2023-11-24 理论教育 版权反馈
【摘要】:2)Consumer Group名是一个全局信息,要注意在启动新的消费者之前,要关闭旧的消费者。2)添加Consumer Group配置信息,如果不指定,Kafka会自动添加Consumer Group名称,比如上面用到的console-consumer-41521,就是Kafka自动生成的Consumer Group名称。5)添加Consumer多久更新offset到Zookeeper的配置信息,offset更新是基于时间的,而不是每次获得的消息。6)创建Consumer对象实例,将上面的配置信息添加到Consumer对象中。10)关闭Consumer对象实例。

Scala实战:设计消息消费者

Kafka中的Consumer设计是比较核心的部分,Kafka的Consumer API分为The Low Level Consumer API和The High Level Consumer API,先用The High Level Consumer API来开发Kaf⁃ka的Consumer程序,因为使用Kafka中的The High Level Consumer API,编写Consumer程序变得非常简单,不需要考虑过多的细节,只要能从Kafka中的Broker获取数据即可。

虽然在前面的Kafka的核心特征剖析部分,对Kafka的Consumer原理进行了详细地阐述和分析,这里还要注意如下细节:

1)The High Level Consumer API会在内部将消息进行持久化,并读到消息的offset,数据保存在Zookeeper中的Consumer Group名中(如/consumers/console-consumer-41521/off⁃sets/test/0。其中console-consumer-41521是消费组,test是topic,最后一个0表示第一个分区),每间隔一个很短的时间更新一次offset,那么可能在重启消费者时拿到重复的消息。此外,当分区Leader发生变更时,也可能拿到重复的消息。因此在关闭消费者时最好等待一定的时间(10s)然后再shutdown()。

2)Consumer Group名是一个全局信息,要注意在启动新的消费者之前,要关闭旧的消费者。如果启动新的进程并且Consumer Group名相同,Kafka会添加这个进程到可用消费线程组中,并用来消费Topic和触发重新分配负载均衡,那么同一个分区的消息就有可能发送到不同的进程中。

3)消费的线程多于分区数,一些线程可能永远无法看到一些消息。

4)分区数多于线程数,一些线程会收到多个分区的消息。

5)一个线程对应了多个分区,那么接收到的消息是不能保证顺序的。

注意:可用Zookeeper中的zkCli.sh命令查询Kafka中的信息。

1.用The High Level Consumer API编写Consumer程序 现在用Kafka的The High Level Consumer API来编写Consumer程序,先创建一个Con⁃sumerInstanceHighLevelAPI类,在类ConsumerInstanceHighLevelAPI中,编写步骤如下:

1)添加Zookeeper服务器地址的配置信息。

2)添加Consumer Group配置信息,如果不指定,Kafka会自动添加Consumer Group名称,比如上面用到的console-consumer-41521,就是Kafka自动生成的Consumer Group名称。

3)添加Kafka等待Zookeeper返回消息的超时时间配置信息。

4)添加Zookeeper同步最长延迟多久才产生异常配置信息。

5)添加Consumer多久更新offset到Zookeeper的配置信息,offset更新是基于时间的,而不是每次获得的消息。一旦在更新zookeeper时发生异常并重启,将可能获取已获取过的消息。

6)创建Consumer对象实例,将上面的配置信息添加到Consumer对象中。

7)创建线程数量,告诉Kafka该进程有多少线程来处理对应的Topic。

8)获取每个stream对应的Topic。

9)使用Executor来创建一个线程池,之后调用线程池来处理Topic。相关实现代码如下所示。(www.daowen.com)

10)关闭Consumer对象实例。

通过上面的步骤,我们用Java语言,基于Kafka提供的Consumer高级API实现了一个完整的KafkaConsumerDemo,很多代码多可以复用,读者只需要理解上面的步骤,就可以以其为模板,写出自己的KafkaConsumer。特别注意的是,上面是使用Apache Kafka提供的高级API实现的KafkaConsumerDemo,为了让读者有个全局的认知,这里我们提供了一个Con⁃sumer类ConsumerInstanceHighLevelAPI,完整的ConsumerInstanceHighLevelAPI类源代码如下所示。

2.用The Low Level Consumer API编写Consumer程序 使用Kafka的The Low Level Consumer API来编写Consumer的应用场景为,第一:针对一个消息读取多次;第二:在一个process中,仅仅处理一个topic中的一组partitions;第三:确保每个消息只被处理一次。因此,如果您的业务常用有这些需求,Kafka的The Low Level Consumer API就是最佳的选择了,现在用Kafka的The Low Level Consumer API来编写Consumer。

在用Kafka的The Low Level Consumer API来编写Consumer时,要注意几点,相对于Kafka中The High Level Consumer API,用Kafka的The Low Level Consumer API来编写Con⁃sumer需要程序员自己处理更多的细节,比如:

1)程序员必须实现,当消费停止时,如何持久化offset。

2)程序员必须实现,怎样处理Topic和分区,并确定哪个Broker上的Partition是Leader。

3)程序员必须实现,当Leader宕机时,怎样选择新的Leader。

基于Kafka的The Low Level Consumer API来编写Consume步骤如下:

1)创建一个Consumer类,这里为ConsumerInstanceLowLevelAPI类。

2)找到Broker中的Leader,以便读取Topic和Partition,这里主要是获得Metedata信息,注意,这里也不需要添加所有的Broker列表,只要连接其中的一个Broker,就可以通过集群中的配置信息(Kafka内部机制),获得所有的Broker列表。

3)通过获取的Topic和Partition信息,自己决定哪个副本作为Leader,并通过offset,来实现消息的持久化。在获取所有leader的同时,可以用metadata.replicas()更新最新的节点信息。

4)建立业务需要的数据。

5)获取数据。

6)确定当其中的Leader因为宕机而丢失时,怎样选举新的Leader。

上面就是使用Apache Kafka提供的低级ConsumerAPI来实现Kafka的Consumer的详细步骤,读者也可以用上面的模板编写自己的KafkaConsumer。但是用Kafka提供的低级API来实现Consumer比用Kafka提供的高级API来实现Consumer复杂很多,因为在使用低级API实现Kafka的Consumer,要对Consumeroffsets进行控制实现、对副本Leader实现选举机制,这对用户来说是一个极大的挑战。为了让读者从全局来理解基于Kafka低级API来实现Kaf⁃kaConsumer的过程,这里我们实现了一个Consumer类ConsumerInstanceLowLevelAPI,完整的ConsumerInstanceLowLevelAPI类源代码如下所示。读者在阅读如下代码时,不懂的地方可以参考代码中的注释和上面步骤分析,代码如下。

到目前为止,我们已经详细讲解了用Kafka的The Low Level Consumer API和The High Level Consumer API来编写Consumer应用程序,读者可以使用这些思路和方法来编写更加复杂的Consumer业务逻辑了。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈