理论教育 Spark SQL 支持多种数据源,包括Parquet文件

Spark SQL 支持多种数据源,包括Parquet文件

时间:2023-06-14 理论教育 版权反馈
【摘要】:Spark SQL 支持基于DataFrame 操作一系列不同的数据源。数据源可由其全名指定,如org.apache.spark.sql.parquet,而对于内建支持的数据源可以使用简写名。Spark SQL 提供对Parquet 文件的读写支持,而且Parquet 文件能够自动保存原始数据的schema。

Spark SQL 支持多种数据源,包括Parquet文件

Spark SQL 支持基于DataFrame 操作一系列不同的数据源。DataFrame 既可以当成一个普通RDD 来操作,也可以将其注册成一个临时表来查询。把DataFrame 注册为表之后,就可以基于这个表执行SQL 语句。

一、加载保存函数

所有操作都会以默认Parquet 数据源来加载数据,也可以手动指定数据源。数据源可由其全名指定,如org.apache.spark.sql.parquet,而对于内建支持的数据源可以使用简写名(JSON,Parquet,JDBC)。任意类型数据源创建的DataFrame 都可以用下面这种语法转成其他类型数据格式。

val df = sqlContext.read.format("json").load("file:///hadoop/spark-2.1.0-binhadoop2.6/

examples/src/main/resources/people.json")

df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

也可以直接直接对文件使用SQL 查询:

val df = sqlContext.sql("SELECT * FROM parquet.`file:///hadoop/spark-2.1.0-bin

hadoop2.6/examples/src/main/resources/users.parquet`")

save 操作有四种保存模式,SaveMode.ErrorIfExists(默认),如果数据已存在抛出异常;SaveMode.Append,如果数据或表已经存在,将DataFrame 的数据追加到已有数据的尾部;SaveMode.Overwrite,如果数据或表已经存在,会被DataFrame 数据覆盖;SaveMode.Ignore,如果数据已经存在,放弃保存DataFrame 数据。

二、读写Parquet 文件

Parquet 是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。Parquet 可以兼容Hadoop 生态圈中大多数计算框架(Hadoop、Spark 等),被多种查询引擎支持(Hive、Impala、Drill 等)。Spark SQL 提供对Parquet 文件的读写支持,而且Parquet 文件能够自动保存原始数据的schema。写Parquet 文件的时候,所有的字段都会自动转成nullable,以便向后兼容。

会在/soft/data 下生成文件夹people.parquet,里面有两个文件:part-00000-b7110dce-a8f1-4f20-8bf2-d936839aec8d.snappy.parquet,_SUCCESS。执行结果为:

三、连接Hive 操作数据

为了让Spark 能够连接到Hive 的原有数据仓库,需要将Hive 中的hive-site.xml文件拷贝到Spark 的conf 目录下,这样就可以通过这个配置文件找到Hive 的元数据以及元数据存放位置(MySQL 中),还需要准备将MySQL 相关驱动mysql-connectorjava-5.1.40-bin.jar 拷贝到spark 的jars 目录下。

由于Hive 是单节点,而Spark 是集群方式运行,在进行Hive 数据操作的时候,需要将代码运行在Hive 的部署节点上,本书中的Hive 是部署在slave2 节点,因此需要在slave2 上启动spark-shell 运行。

四、连接JDBC 读写数据

Spark SQL 支持使用JDBC 访问其他数据库,通过使用JdbcRDD 可以方便的操作Spark SQL 的DataFrame。需要将MySQL 相关驱动拷贝到spark 的jars 目录下,如果前面已经操作过,可以忽略此步。

1.读取MySQL 数据库中的数据

在MySQL 中执行如下操作:

salve2 节点运行spark-shell 执行如下命令:

val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("dbtable", "student").option("user", "root").option("password","Aa123456@").load()

jdbcDF.show

运行结果为:

| 2| John| 36|

| 3|Xiaoming| 26|

+---+--------+---+(www.daowen.com)

也可以通过如下方式创建JdbcRDD:

val prop = new Properties()

prop.put(“user”, “root”) //表示用户名是root

prop.put(“password”, “Aa123456”) //表示密码是hadoop

val jdbcDF2 = spark.read.jdbc("jdbc:mysql://localhost:3306/spark", "student",connectionProperties).load

2.向MySQL 插入数据

import java.util.Properties

import org.apache.spark.sql.types._

import org.apache.spark.sql.Row

//两条数据表示两个学生信息

val studentRDD = spark.sparkContext.parallelize(Array("4 Lili 25","5 Mike 63")).map(_.split(" "))

//设置模式信息

val schema = StructType(List(StructField("id", IntegerType, true),StructField("name",StringType, true),StructField("age", IntegerType, true)))

//创建Row 对象,每个Row 对象都是rowRDD 中的一行

val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))

//建立起Row 对象和模式之间的对应关系,也就是把数据和模式对应起来

val studentDF = spark.createDataFrame(rowRDD, schema)

//创建一个prop 变量用来保存JDBC 连接参数

val prop = new Properties()

prop.put(“user”, “root”) //表示用户名是root

prop.put(“password”, “Aa123456@”) //表示密码是Aa123456@

//连接数据库,采用append 模式,表示追加记录到数据库spark 的student 表中

studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "student",prop)

//也可以通过如下执行保存操作

//studentDF.write.mode("append")format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("dbtable", "student").option("user", "root").option("password", "Aa123456@").save

在MySQL 执行select * from student;运行结果为:

免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。

我要反馈