Producer发送一条消息时,可以指定这条消息的key,Producer根据这个key和Partition机制来判断应该将这条消息发送到哪个Parition。因此用户在编写自己的类时,只要实现kafka.producer.Partitioner接口即可。假设这条消息的key可以解析成整数,那么就可以将该数与Partition总数取余数,之后就可以将消息发送到指定的Partition上去,其中Partition序列号从零开始。现在大家自定义一个分区类SelfDefiningPartitioner,相关的语句如下所示:
通过自定义的类来实现Partition接口之后,就可以在程序中使用这个自定义类。现在创建一个Producer类ProducerInstance,在ProducerInstance类中,具体代码的编写步骤如下所示:
1)添加Kafka集群中Broker列表配置信息(格式为IP:port,或者hostname:port),这里无须指定集群中的所有Boker,只要指定其中部分即可,它会自动取meta信息并连接到对应的Boker节点。
2)添加Kafka集群序列化处理配置信息(用户可以自定义,其中serializer.class,默认为kafka.serializer.DefaultEncoder,即byte[],其中key.serializer.class为单独序列化可以处理类,默认和serializer.class一致),即kafka通过哪种序列化方式将消息传输给Boker,大家也可以在发送消息的时候指定序列化类型,不指定则使用默认序列化类型。
3)添加Kafka集群Broker分区类配置,指定消息发送对应分区方式,若不指定,则随机发送到一个分区,也可以在发送消息的时候指定分区类型。这里使用自定义的分区方式。
4)添加Kafka的触发acknowledgement机制,其中的值可以是0、1、-1。其中值为0时,表示Producer不等待Broker的acknowledgement,特点是延迟最少,但数据持久性保证最差;其中的值设置为1时,表示Broker中的Leader副本接收到数据后,才向Kafka的Pro⁃ducer发送acknowledgement,特点是数据具有比较好的持久性保证;其中的值设置为-1,表示Broker中所有的同步副本接收到数据后才向Kafka的Producer发送acknowledgement,特点是数据具有特别好的持久性保证。(www.daowen.com)
5)将这些配置加到Producer配置类中。
6)创建Producer类实例对象。
7)实现相应的业务逻辑,即实现Producer向Broker发送消息数据。
8)关闭Producer实例。
通过上面的步骤,完全使用Java语言,基于Kafka提供的ProducerAPI实现了KafkaPro⁃ducer的设计,其中包括自定义分区的设计,对于分区设计,读者可以按照自己的实际场景进行优化设计,其他直接按照这个模板就能简单地实现自己的KafkaProducer应用。为了能让读者对整个步骤有个全局了认识,这里自定义一个Producer类ProducerInstance,完整的生产者ProducerInstance类程序代码如下所示。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。