Spark SQL 有两种方法将RDD 转为DataFrame。
1.使用反射机制,推导包含指定类型对象RDD 的schema。这种基于反射机制的方法使代码更简洁,如果事先知道数据schema,推荐使用这种方式。
2.编程方式构建一个schema,然后应用到指定RDD 上。这种方式会让代码比较冗长,但如果事先不知道数据有哪些字段,或者数据schema 是运行时读取进来的,需要用这种方式。
一、利用反射推导schema
Spark SQL 的Scala 接口支持自动将包含case class 对象的RDD 转为DataFrame。对应的case class 定义了表的schema。case class 的参数名通过反射,映射为表的字段名。case class 还可以嵌套一些复杂类型,如Seq 和Array。RDD 隐式转换成DataFrame 后,可以进一步注册成表,就可以对表中数据使用SQL 语句查询了。
引入spark.implicits._包,用于从RDD 到DataFrame 的隐式转换。
import spark.implicits._
从一个文本文件中创建一个Person 对象的RDD,将其转换成一个Dataframe。
将DataFrame 注册成一个临时视图,SQL 语句可以通过Spark 的sql 方法来运行。
peopleDF.createOrReplaceTempView("people")(www.daowen.com)
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
查询结果中某一行的列可以通过字段索引来访问,或者通过字段名称访问。
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
运行结果为:
二、编程方式定义Schema
如果能事先通过case class 定义schema(例如,记录的字段结构是保存在一个字符串,或者其他文本数据集中,又或者字段对不同用户有所不同),那么需要按以下三个步骤,以编程方式的创建一个DataFrame:
(1)从已有的RDD 创建一个包含Row 对象的RDD。
(2)用StructType 创建一个schema,和步骤1 中创建的RDD 的结构相匹配。
(3) 把得到的schema 应用于包含Row 对象的RDD, 调用SparkSession.createDataFrame 方法来实现这一步。
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。