理论教育 Scala开发实战:生产者

Scala开发实战:生产者

时间:2023-11-24 理论教育 版权反馈
【摘要】:Producer通过Push的方式将消息发送到Broker时,Kafka会根据Partition机制选择将消息存储到哪一个Partition。因此用户在编写自己的类时,只要实现kafka.producer.Partitioner接口即可。Partition机制设置合理,Producer发送的所有消息可以均匀分布到不同的Partition里。该机制中生产者维护一个与Borker的弹性连接池,该连接池基于Zookeeper Watchers的回调函数来保持更新,以便与所有存活的Broker建立或者保持连接。当一个Producer对某一个Topic有请求时,该Topic的Partition信息被返回。

Scala开发实战:生产者

Producer通过Push的方式将消息发送到Broker时,Kafka会根据Partition机制选择将消息存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那么这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同Broker的不同Partition里,极大地提高了吞吐率。Topic的Partition数量可以通过配置文件$KAFKA_HOME/config/server.properties中的配置项num.partitions来指定,默认参数为1。用户也可以通过参数指定的方式来创建Topic的Partition数量,同时也可以在Topic创建之后,通过Kafka提供的工具来修改Partition数量。

Producer发送一条消息时,可以指定这条消息的key,Producer根据这个key和Partition机制来判断应该将这条消息发送到哪个Partition。因此用户在编写自己的类时,只要实现kafka.producer.Partitioner接口即可。假设这条消息的key可以解析成整数,那么就可以将该数与Partition总数取余数,之后将消息发送到指定的Partition上去,其中Partition序列号从零开始。参考代码如下所示(只摘录实现的partition方法):

通过自定义的类来实现Partition接口之后,就可以在程序中使用这个自定义类,为了更加理解Partition分区机制,这里用个实例来阐述,比如先指定一个主题TopicA,该TopicA主题有4个分区,之后可以模拟100条消息,其中消息的key值分别为0、1、2、3,最后通过程序调用Consumer并打印消息列表,大家可以发现,key相同的消息会被发送并存储到同一个Partition里,而且key的序号正好和Partition序号相同。(www.daowen.com)

Partition机制设置合理,Producer发送的所有消息可以均匀分布到不同的Partition里。先来分析一下Kafka支持的客户端的负载均衡机制,Kafka支持消息生产者在客户端的负载均衡,或者利用专有的负载均衡器来均衡TCP连接。一个专用的四层均衡器通过将TCP连接均衡到Kafka的Broker上来工作。在这种配置下,所有的来自同一个生产者的消息被发送到一个Borker上。这种做法的优点是,一个生产者只需要一个TCP连接,而不需要与Zoo⁃keeper的连接。缺点是负载均衡只能在TCP连接的层面上来做。因此,负载均衡的性能不是很好。

基于Zookeeper的客户端的负载均衡可以解决这个问题。它允许生产者动态地发现新的Broker,并且在每个请求上进行负载均衡。同样的,它允许生产者根据一些键将数据分开,而不是随机分,这可以增强与Consumer的黏性,如上面举的例子是根据用户ID来划分数据消费的情景。这种基于Zookeeper的负载均衡,主要通过Zookeeper Watchers注册机制来监控其动态。Zookeeper Watchers注册的事件有:新的Broker启动、Broker关闭、新注册的Top⁃ic、Borker注册一个已经存在的Topic。该机制中生产者维护一个与Borker的弹性连接池,该连接池基于Zookeeper Watchers的回调函数来保持更新,以便与所有存活的Broker建立或者保持连接。当一个Producer对某一个Topic有请求时,该Topic的Partition信息被返回。连接池中的一个连接就可以将数据发送到前面所选的那个Broker分区中。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈