理论教育 DataFrame和RDD的无缝衔接操作

DataFrame和RDD的无缝衔接操作

时间:2023-06-14 理论教育 版权反馈
【摘要】:Spark SQL 有两种方法将RDD 转为DataFrame。RDD 隐式转换成DataFrame 后,可以进一步注册成表,就可以对表中数据使用SQL 语句查询了。引入spark.implicits._包,用于从RDD 到DataFrame 的隐式转换。teenagersDF.map.show()运行结果为:二、编程方式定义Schema如果能事先通过case class 定义schema,那么需要按以下三个步骤,以编程方式的创建一个DataFrame:从已有的RDD 创建一个包含Row 对象的RDD。用StructType 创建一个schema,和步骤1 中创建的RDD 的结构相匹配。

DataFrame和RDD的无缝衔接操作

Spark SQL 有两种方法将RDD 转为DataFrame。

1.使用反射机制,推导包含指定类型对象RDD 的schema。这种基于反射机制的方法使代码更简洁,如果事先知道数据schema,推荐使用这种方式。

2.编程方式构建一个schema,然后应用到指定RDD 上。这种方式会让代码比较冗长,但如果事先不知道数据有哪些字段,或者数据schema 是运行时读取进来的,需要用这种方式。

一、利用反射推导schema

Spark SQL 的Scala 接口支持自动将包含case class 对象的RDD 转为DataFrame。对应的case class 定义了表的schema。case class 的参数名通过反射,映射为表的字段名。case class 还可以嵌套一些复杂类型,如Seq 和Array。RDD 隐式转换成DataFrame 后,可以进一步注册成表,就可以对表中数据使用SQL 语句查询了。

引入spark.implicits._包,用于从RDD 到DataFrame 的隐式转换。

import spark.implicits._

从一个文本文件中创建一个Person 对象的RDD,将其转换成一个Dataframe。

将DataFrame 注册成一个临时视图,SQL 语句可以通过Spark 的sql 方法来运行。

peopleDF.createOrReplaceTempView("people")(www.daowen.com)

val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

查询结果中某一行的列可以通过字段索引来访问,或者通过字段名称访问。

teenagersDF.map(teenager => "Name: " + teenager(0)).show()

运行结果为:

二、编程方式定义Schema

如果能事先通过case class 定义schema(例如,记录的字段结构是保存在一个字符串,或者其他文本数据集中,又或者字段对不同用户有所不同),那么需要按以下三个步骤,以编程方式的创建一个DataFrame:

(1)从已有的RDD 创建一个包含Row 对象的RDD。

(2)用StructType 创建一个schema,和步骤1 中创建的RDD 的结构相匹配。

(3) 把得到的schema 应用于包含Row 对象的RDD, 调用SparkSession.createDataFrame 方法来实现这一步。

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈