理论教育 SparkStreaming实例:从入门到精通

SparkStreaming实例:从入门到精通

更新时间:2025-01-03 理论教育 版权反馈
【摘要】:Spark Streaming 的编程和Spark 的编程很相似,对于Spark 来说是对于RDD 的操作,而对于Spark Streaming 来说是对DStream 的操作。同Spark 初始化需要创建SparkContext 对象一样,使用Spark Streaming 就需要创建StreamingContext 对象。指定处理数据的时间间隔,如Seconds表示Spark Streaming 会以1s 为时间窗口进行数据处理。需要指明数据源,Spark Streaming 支持多种不同的数据源,包括Kafka,Flume,HDFS 等数据源。wordCounts.printwordCounts.saveAsHadoopFiles5.启动调用Start 操作,Spark Streaming 才开始启动监听相应的端口,然后收取数据,并进行统计。

Spark Streaming 的编程和Spark 的编程很相似,对于Spark 来说是对于RDD 的操作,而对于Spark Streaming 来说是对DStream 的操作。

1.创建StreamingContext 对象。同Spark 初始化需要创建SparkContext 对象一样,使用Spark Streaming 就需要创建StreamingContext 对象。创建StreamingContext 对象所需的参数与SparkContext 基本一致,包括指明Master,设定名称(如WordCount)。指定处理数据的时间间隔,如Seconds(1)表示Spark Streaming 会以1s 为时间窗口进行数据处理

2.创建InputDStream。需要指明数据源,Spark Streaming 支持多种不同的数据源,包括Kafka,Flume,HDFS 等数据源。

3.操作Dstream。对于从数据源得到的DStream,用户可以在其基础上进行各种操作,对Dstream 的操作就如同对RDD 操作一样,可以使用flatMap,split,map,reduceByKey 等操作。

4.启动Spark Streaming。通过start 启动流程,会按照步骤三的逻辑对数据源进行处理。

下面通过一个WordCount 的例子来说明Spark Streaming 中的具体执行流程:

1. Spark Streaming 初始化

在开始进行DStream 操作之前,需要对Spark Streaming 进行初始化生成StreamingContext。参数中比较重要的是第一个和第三个,第一个参数是指定Spark Streaming 运行的集群地址,而第三个参数是指定Spark Streaming 运行时的时间窗口大小。

val ssc = new StreamingContext(“Spark://…”, “WordCount”, Seconds(1), [Homes],[Jars])

在WordCount 中就是将1 秒钟的输入数据进行一次Spark Job 处理:

val ssc = new StreamingContext(sc, Seconds(1))

2. Spark Streaming 的输入操作

目前Spark Streaming 已支持了丰富的输入接口,大致分为两类:一类是磁盘输入,如以1s 作为时间间隔监控HDFS 文件系统的某个目录,将目录中内容的变化作为Spark Streaming 的输入;另一类就是网络流的方式,目前支持Kafka、Flume、Twitter 和TCP socket。

在WordCount 例子中,假定通过网络Socket 作为输入流,监听某个特定的端口,最后得出输入Dstream 对象。

val lines = ssc.socketTextStream("localhost",9999)

3. Spark Streaming 的转换操作

Spark Streaming 的转换操作与Spark RDD 的操作极为类似,Spark Streaming 通过转换操作将一个或多个DStream 转换成新的DStream。常用的操作包括map、filter、flatmap 和join,以及需要进行shuffle 操作的groupByKey,reduceByKey 等。

在WordCount 例子中,首先需要将DStream(lines)切分成单词,然后将相同单词的数量进行叠加,最终得到的wordCounts 就是每1s 的中间结果。

val words = lines.flatMap(_.split(“ ”))(www.daowen.com)

val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)

4. Spark Streaming 的输出操作

对于Spark Streaming 的输出操作,Spark 提供了将数据打印到屏幕及输入到文件中。

在WordCount 中将wordCounts首先打印到屏幕上,然后将其输入到HDFS 文件中。

wordCounts.print

wordCounts.saveAsHadoopFiles(“WordCount”)

5.启动

调用Start 操作,Spark Streaming 才开始启动监听相应的端口,然后收取数据,并进行统计。

在WordCount 中首先启动Netcat,Netcat 是个简易的数据服务器。再打开一个终端,输入命令"nc -lk 9999"。

可以在Spark 终端中看到有新内容输出:

如果系统没有安装nc,如master(具有界面的CentOS7)节点可以通过如下方式安装:

make && make install

如果系统是CentOS7 最小化安装,如slave1 和slave2 节点,可以通过yum 安装,命令如下:yum install nmap-ncat.x86_64。

另外,Spark Streaming 有特定的窗口操作,窗口操作涉及两个参数:一个是滑动窗口的宽度;另一个是窗口滑动的频率,这两个参数必须是1s 的倍数。例如以过去5 秒钟为一个输入窗口,每1 秒统计一下WordCount,那么会将过去5 秒钟的每一秒钟的WordCount 都进行统计,然后进行叠加,得出这个窗口中的单词统计。

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(5s),seconds(1))

但是以增量的方式来计算会更加高效,例如,计算t+4 秒这个时刻过去5 秒窗口的WordCount,可以将t+3 时刻过去5 秒的统计量加上[t+3,t+4]的统计量,再减去[t-2,t-1]的统计量,这种方法可以复用中间三秒的统计量,提高统计的效率

val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _,Seconds(5s),seconds(1))

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈