理论教育 Scala开发实战:消息消费者设计与Receiver方法

Scala开发实战:消息消费者设计与Receiver方法

时间:2023-11-24 理论教育 版权反馈
【摘要】:基于Receiver实现的方法主要是通过Receiver来接收数据,其中Receiver是用Kafka的高级消费API实现的。Consumer Group,它是逻辑Consumer应用程序集群范围内的识别符,一般通过字符串进行识别。同一个Consumer Group中的所有Consumer将分担从一个指定Apache Kafka的Topic中的读取任务,同时,同一个Consumer Group中所有Consumer从Topic中读取的线程数量最大值,即是N,多余的线程将会闲置。多个不同的Apache Kafka Consumer Group可以并行运行,同一个Apache Kaf

Scala开发实战:消息消费者设计与Receiver方法

Apache Kafka是一个分布式、分区、可复制的提交日志服务的发布订阅消息架构,怎样通过配置Spark Streaming来接收Apache Kafka上的数据呢?上一小节已经对Apache Spark Streaming和Kafka整合的框架流程进行了详细阐述,下面对Apache Spark Streaming和Kafka整合方法进行详细分析和阐述。Spark Streaming已经实现并提供了两种方法来与Apache Kaf⁃ka集成,第一种方法是,基于Receiver和Kafka的高级API来实现;第二种方法是,不使用Receiver,直接用Kafka的低级API来实现。第二种方法在Apache Spark中的1.3版本中出现,并体现了比较好的性能。这两种方法有着不同的编程模型、性能特征及语义特征,将对这两种方法进行详细介绍。

基于Receiver实现的方法主要是通过Receiver来接收数据,其中Receiver是用Kafka的高级消费API实现的。对于所有的Receivers,先从Kafka接收数据,之后通过Receiver存储在Spark的executor中,最后Apache Spark Streaming启动任务来处理这些数据。

在Apache Spark Streaming的默认配置下,在任务执行失败的情况下,基于Receiver实现的方法会出现丢失数据的情况,为了确保零数据丢失,我们的在Apache Spark Streaming中,另外加一个WAL(Write Ahead Logs)功能,这种基于WAL容错的设计方案已经在Spark 1.2版本中实现。

从Kafka中接收到的数据,使用WAL功能,并同步日志信息到分布式文件系统(比如:HDFS)中,这样即使Apache Spark Streaming任务执行出错,也能从WAL中实现所有数据恢复。

在Apache Spark Streaming与Apache Kafka整合中,其中的核心类入口是Apache Spark Streaming实现的KafkaUtils类,KafkaUtils类中包含很多createStream()方法,这些方法主要是从Apache Kafka的Brokers获取消息,之后创建Apache Spark Streaming的输入流。KafkaU⁃tils类的createStream()方法又要用到核心类KafkaInputDStream和DirectKafkaInputDStream,其中KafkaInputDStream类是使用Apache Kafka的The High Level Consumer API实现的,DirectKafkaInputDStream类是使用Apache Kafka的Low Level Consumer(Simple Consumer)API实现的,这两个类实际上就是分别对应上面提及的Apache Spark Streaming的两种方法实现。

基于Receiver实现的Apache Kafka和Apache Spark Streaming整合方法,也是用KafkaU⁃tils.createStream()方法来创建Apache Spark Streaming的输入流的,其中核心逻辑实现可以参考KafkaInputDStream类,这里先概要总结相关注意点与实现细节。

●细节一:Apache Kafka中的Topic分区与Apache Spark Streaming生成的RDDs分区没有关联。因此使用KafkaUtils.createStream()方法来增加Apache Kafka中Topic的数量,仅仅是增加了线程数量来消费Apache Kafka的Topic,其中接收这些数据的Receiver还是一个。换句话说,在基于Receiver实现的Apache Kafka和Apache Spark Streaming整合方法中,使用KafkaUtils.createStream()方法增加Apache Kafka中Topic的数量,并不会增加Apache Spark Streaming并行处理数据的能力。

●细节二:可以通过Apache Kafka中的Groups和Topics来创建多个Kafka输入DStre⁃ams,同时这些Apache Kafka中的Groups和Topics数据,可以用多个Receiver来并行接收。

●细节三:如果你启用了WAL(Write Ahead Logs)功能,比如将数据的日志信息写入到HDFS文件系统中,这里要通过KafkaUtils.createStream()方法修改输入流的存储级别为StorageLevel.MEMORY AND DISK SER。

对于上述细节,如果读者对Apache Kafka中的Topic、partitions、parallelism几个概念不熟悉的话,可能比较难理解,下面对这几个概念再加以解读。

Apache Kafka将数据存储在Topic中,每个Topic都包含了一些可配置数量的partition。Topic的partition数量对于性能来说非常重要,而这个值一般是消费者parallelism的最大数量:如果一个Topic拥有N个partition,那么应用程序最大程度上只能进行N个线程的并行,最起码在使用Kafka内置Scala/Java消费者API时是这样的。

Consumer Group,它是逻辑Consumer应用程序集群范围内的识别符,一般通过字符串进行识别。同一个Consumer Group中的所有Consumer将分担从一个指定Apache Kafka的Topic中的读取任务,同时,同一个Consumer Group中所有Consumer从Topic中读取的线程数量最大值,即是N(等同于分区的数量),多余的线程将会闲置。

多个不同的Apache Kafka Consumer Group可以并行运行,同一个Apache Kafka的Topic,可以运行多个独立的逻辑Consumer应用程序。每个逻辑应用程序都会运行自己的Consumer线程,使用一个唯一的Consumer Group id进行区分。而每个应用程序通常可以使用不同的read parallelisms。

上面的逻辑有点深奥,这里用一些简单的例子来对上面的逻辑加深理解。

假设这里有一个应用程序使用一个Consumer对一个Apache Kafka Topic进行读取,这个Topic拥有10个分区。如果Consumer应用程序只配置一个线程对这个话题进行读取,那么这个线程将从10个分区中进行读取。

但是如果配置5个线程,那么每个线程都会从2个分区中进行读取。

如果配置10个线程,那么每个线程都会从1个分区的读取。

但是如果配置多达14个线程。那么这14个线程中的10个将平分10个分区的读取工作,剩下的4个将会被闲置。

通过上面的例子,读者应该比较清楚地了解到Consumer Group、Apache Kafka Topic、Read Parallelisms之间的关系。

但是上面这些例子在现实应用中,顺序执行时(不断配置线程数量),会触发Apache Kafka中的再平衡事件,在Apache Kafka中,再平衡是个生命周期事件(lifecycle event),在Consumer加入或者离开Consumer Group时都会触发再平衡事件。为了对Apache Kafka中的再平衡事件深入理解,这里继续进行解析一下。

假设应用程序使用Consumer Group id为“AQing”,并且从1个线程开始,这个线程将从10个分区中进行读取。在运行时,逐渐将线程从1个提升到14个。也就是说,在同一个Con⁃sumer群中,parallelism突然发生了变化。毫无疑问,这将造成Apache Kafka中的再平衡。一旦在平衡结束,14个线程中将有10个线程平分10个分区的读取工作,剩余的4个将会被闲置。因此初始线程以后只会读取一个分区中的内容,将不会再读取其他分区中的数据。

在基于Receiver实现的方法中,Apache Kafka的Topic分区与Apache Spark RDDs分区没有关联,而基于这种方法的Apache Spark Streaming中的KafkaInputDStream(又称为Kafka连接器)使用了Kafka的The High Level Consumer API实现,这意味着在Apache Spark Stream⁃ing中为Apache Kafka设置read parallelism将拥有两种策略。(www.daowen.com)

策略一:Input DStream的数量,因为Spark在每个Input DStream都会运行一个receiver(=task),这就意味着使用多个input DStream将跨多个节点并行进行读取操作,因此,这里寄希望于多主机和NIC。

策略二:Input DStream上的消费者线程数量,一个receiver(=task)将运行多个读取线程。这也就是说,读取操作在每个core/machine/NIC上将并行执行。

会发现这两种策略实际上是前面细节一和细节二描述的具体化。实际上,在生产中第一种策略更有效,因为从Apache Kafka中读取数据通常情况下会受到网络/NIC限制,也就是说,在同一个主机上运行多个线程不会增加读的吞吐量。但是有时候从Apache Kafka中读取也会遭遇CPU瓶颈。然而第二种策略,多个读取线程在将数据推送到Block时会出现锁竞争。

为了便于用代码的角度理解,先看策略一这种基于Input DStream的数量策略并行读取Apache Kafka上的消息实例,代码如下:

在上述代码中,建立了5个input DStream,因此从Apache Kafka中读取的工作将分担到5个核心上,即5个主机/NIC。所有Input Stream都是Consumer Group的一部分,而Apache Kafka将保证Topic的所有数据可以同时对这5个input DSream可用。换句话说,这种协同的input DStream设置及基于Consumer Group的行为,是由Apache Kafka API提供,通过Kaf⁃kaInputDStream完成。

再来看策略二,这种基于Input DStream上的消费者线程数量策略并行读取Kafka上的消息实例,代码如下:

在这段代码中,将建立一个单一的input DStream,它将在同一个receiver/task上运行3个消费者线程,因此可以理解为在同一个core/machine/NIC上对Kafka topic Consumer Group进行消息读取。其中KafkaUtils.createStream()方法被重载,因此这里有一些不同方法的特征。在这里,会选择Scala派生以获得最佳的策略。

为了更好地理解上面的代码,我们先看看input DStream和RDD的关系,input DStream创建RDD分区,并由KafkaInputDStream从Apache Kafka中读取相应的数据信息到Block。其中KafkaInputDStream建立的RDD分区数量由batchInterval/spark.streaming.blockInterval决定,而batchInterval则是数据流拆分成batche的时间间隔,它可以通过StreamingContext的一个构造函数参数设置。

基于第一种多输入流的策略,这些Consumer都是属于同一个Consumer Group,它们会给Consumer指定分区。这样一来则可能导致分区,再均衡的失败,系统中真正工作的消费者可能只会有几个。为了解决这个问题,可以把再均衡尝试设置的非常高,然后,将会碰到另一个问题———如果receiver宕机(OOM,或者硬件故障),这将停止从Kafka接收消息。

出现这种情况,最直接的办法就是在与上游数据源断开连接或者一个receiver失败时,重新启动流应用程序。但是,这种解决方案可能并不会产生实际效果,即使应用程序需要将Apache Kafka配置选项auto.offset.reset设置到最小,由于Spark Streaming中一些已知的bug,可能导致流应用程序发生一些你意想不到的问题,具体情况请参考相关文档。

对于Input DStream上的消费者线程数量策略,前面也进行了详细的分析,其中的线程数量可以通过KafkaUtils.createStream()方法来进行参数设置,同时,Apache Kafka中input topic的数量也可以通过这个方法的参数指定,具体情况还得根据实际应用场景进行分析,这些常见的Apache Spark Streaming问题,一些是由当下Apache Spark中存在的一些限制引起的,一些则是由于当下Kafka input DSream的一些设置造成的。

通过上面的分析发现,如果将策略一和策略二进行归并,将会展现更好的效果,参考代码如下所示:

从上面这部分代码可以看出,其中建立了5个input DStream,它们每个都会运行一个消费者线程。如果Consumer Group Topic拥有5个分区(或者更少),如果对系统的吞吐量有比较高的要求的话,那么这将是进行并行读取的最佳途径。

以上内容对基于Receiver实现的方法的核心原理、优缺点、性能分析进行了详细地分析及展现,下面用基于Receiver实现的Apache Kafka和Apache Spark Streaming整合方法来实现单词计数的Apache Spark Streaming应用程序。参考代码如下。

该应用程序创建了一个KafkaWordCount类,代码进行了详细注释,其中的代码已经在Apache Spark的发布包中,读者只需要搭建好Apache Spark的集群环境,运行如下命令,就可以了查看Apache Spark Streaming的实时流统计展现效果。参考命令如下:

这里对上面命令参数的含义稍微进行说明:

参数一:zoo01,zoo02,zoo03,建立初始化连接Apache Kafka集群的host/port列表,这些列表不需要包含所有的Apache Kafka集群服务器。因为一旦连接Apache Kafka集群,就可以通过Apache Kafka集群上的配置,获取所有的Apache Kafka的服务器。参数二:my-consumer-group,Apache Kafka消费组的名称。

参数三:topic1,topic2,Apache Kafka的Topic,可以是一个,也可以多个,多个之间用英文逗号隔开。

参数四:1,最后一个参数表示,Apache Kafka消费者可用的线程数量。

这里已经基于Receiver实现了Apache Kafka与Apache Spark Streaming的整合,并实现了实时流单词的统计。如果读者想更加深入了解基于Receiver实现的Apache Kafka和Apache Spark Streaming整合方法,也可以详细阅读Spark Streaming中的KafkaInputDStream类的源码

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈