Spark SQL 支持基于DataFrame 操作一系列不同的数据源。DataFrame 既可以当成一个普通RDD 来操作,也可以将其注册成一个临时表来查询。把DataFrame 注册为表之后,就可以基于这个表执行SQL 语句。
一、加载保存函数
所有操作都会以默认Parquet 数据源来加载数据,也可以手动指定数据源。数据源可由其全名指定,如org.apache.spark.sql.parquet,而对于内建支持的数据源可以使用简写名(JSON,Parquet,JDBC)。任意类型数据源创建的DataFrame 都可以用下面这种语法转成其他类型数据格式。
val df = sqlContext.read.format("json").load("file:///hadoop/spark-2.1.0-binhadoop2.6/
examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
也可以直接直接对文件使用SQL 查询:
val df = sqlContext.sql("SELECT * FROM parquet.`file:///hadoop/spark-2.1.0-bin
hadoop2.6/examples/src/main/resources/users.parquet`")
save 操作有四种保存模式,SaveMode.ErrorIfExists(默认),如果数据已存在抛出异常;SaveMode.Append,如果数据或表已经存在,将DataFrame 的数据追加到已有数据的尾部;SaveMode.Overwrite,如果数据或表已经存在,会被DataFrame 数据覆盖;SaveMode.Ignore,如果数据已经存在,放弃保存DataFrame 数据。
二、读写Parquet 文件
Parquet 是一种流行的列式存储格式,可以高效地存储具有嵌套字段的记录。Parquet 可以兼容Hadoop 生态圈中大多数计算框架(Hadoop、Spark 等),被多种查询引擎支持(Hive、Impala、Drill 等)。Spark SQL 提供对Parquet 文件的读写支持,而且Parquet 文件能够自动保存原始数据的schema。写Parquet 文件的时候,所有的字段都会自动转成nullable,以便向后兼容。
会在/soft/data 下生成文件夹people.parquet,里面有两个文件:part-00000-b7110dce-a8f1-4f20-8bf2-d936839aec8d.snappy.parquet,_SUCCESS。执行结果为:
三、连接Hive 操作数据
为了让Spark 能够连接到Hive 的原有数据仓库,需要将Hive 中的hive-site.xml文件拷贝到Spark 的conf 目录下,这样就可以通过这个配置文件找到Hive 的元数据以及元数据存放位置(MySQL 中),还需要准备将MySQL 相关驱动mysql-connectorjava-5.1.40-bin.jar 拷贝到spark 的jars 目录下。
由于Hive 是单节点,而Spark 是集群方式运行,在进行Hive 数据操作的时候,需要将代码运行在Hive 的部署节点上,本书中的Hive 是部署在slave2 节点,因此需要在slave2 上启动spark-shell 运行。
四、连接JDBC 读写数据
Spark SQL 支持使用JDBC 访问其他数据库,通过使用JdbcRDD 可以方便的操作Spark SQL 的DataFrame。需要将MySQL 相关驱动拷贝到spark 的jars 目录下,如果前面已经操作过,可以忽略此步。
1.读取MySQL 数据库中的数据
在MySQL 中执行如下操作:
salve2 节点运行spark-shell 执行如下命令:
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("dbtable", "student").option("user", "root").option("password","Aa123456@").load()
jdbcDF.show
运行结果为:
| 2| John| 36|
| 3|Xiaoming| 26|
+---+--------+---+(www.daowen.com)
也可以通过如下方式创建JdbcRDD:
val prop = new Properties()
prop.put(“user”, “root”) //表示用户名是root
prop.put(“password”, “Aa123456”) //表示密码是hadoop
val jdbcDF2 = spark.read.jdbc("jdbc:mysql://localhost:3306/spark", "student",connectionProperties).load
2.向MySQL 插入数据
import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
//两条数据表示两个学生信息
val studentRDD = spark.sparkContext.parallelize(Array("4 Lili 25","5 Mike 63")).map(_.split(" "))
//设置模式信息
val schema = StructType(List(StructField("id", IntegerType, true),StructField("name",StringType, true),StructField("age", IntegerType, true)))
//创建Row 对象,每个Row 对象都是rowRDD 中的一行
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
//建立起Row 对象和模式之间的对应关系,也就是把数据和模式对应起来
val studentDF = spark.createDataFrame(rowRDD, schema)
//创建一个prop 变量用来保存JDBC 连接参数
val prop = new Properties()
prop.put(“user”, “root”) //表示用户名是root
prop.put(“password”, “Aa123456@”) //表示密码是Aa123456@
//连接数据库,采用append 模式,表示追加记录到数据库spark 的student 表中
studentDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark", "student",prop)
//也可以通过如下执行保存操作
//studentDF.write.mode("append")format("jdbc").option("url", "jdbc:mysql://localhost:3306/spark").option("dbtable", "student").option("user", "root").option("password", "Aa123456@").save
在MySQL 执行select * from student;运行结果为:
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。