Using SparkSession to work with a Hive database
1. Place core-site.xml, hdfs-site.xml, and hive-site.xml in the project's resources directory.
2. When creating the SparkSession, enable Hive support by adding
.enableHiveSupport()
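For reference, a minimal builder sketch; the appName and master("local[*]") values are assumptions for a local run, not part of the original notes:

import org.apache.spark.sql.SparkSession

// Minimal sketch; appName and master are placeholder values for a local run.
val spark = SparkSession.builder()
  .appName("hive-demo")
  .master("local[*]")
  .enableHiveSupport()  // exposes the Hive metastore to spark.sql
  .getOrCreate()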
3. Run SQL against the Hive database:
spark.sql("use hisms_sl")spark.sql("show tables").show(10)val data:DataFrame=spark.sql("select * from sl_data")data.createOrReplaceTempView("sl_data")data.select(data("id"),data("zfy")).filter(data("id").<=(100)).show()val table1:DataFrame=spark.sql("select\n`id` as id,\n`BAH` as akc190,\nconcat(`XM`,`CSRQ`) as aac001 \nfrom sl_data") //保存数据到hive中 table2.write.format("orc").saveAsTable("sn_20180108")
Using SparkSession to read a text file and save it to a MySQL database
import org.apache.spark.sql.{DataFrame, SaveMode}

val text: DataFrame = spark.read.format("csv")
  .option("header", true)      // use the first row as column names
  .option("delimiter", ",")    // field delimiter
  .load("file:///C:\\Users\\91BGJK2\\Desktop\\sl_hive.txt")
text.createOrReplaceTempView("hisms")
val table1: DataFrame = spark.sql(
  """select
    |`id` as id,
    |`BAH` as akc190,
    |concat(`XM`,`CSRQ`) as aac001
    |from hisms""".stripMargin)
// save via JDBC; set the encoding in the URL to avoid garbled characters in the database
table1.write.mode(SaveMode.Append).format("jdbc")
  .option("url", "jdbc:mysql://10.111.121.111:3306/hisms_sn?useUnicode=true&characterEncoding=UTF-8")
  .option("dbtable", "sn_test2")
  .option("user", "root")
  .option("password", "root")
  .save()
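Note that the MySQL JDBC driver (mysql-connector-java) must be on the classpath for the write to succeed. The same write can also be expressed with the jdbc() shortcut; a sketch reusing the placeholder credentials above:

import java.util.Properties

// Sketch: equivalent JDBC write via DataFrameWriter.jdbc(), with the driver named explicitly.
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "root")
props.setProperty("driver", "com.mysql.jdbc.Driver")
table1.write.mode(SaveMode.Append)
  .jdbc("jdbc:mysql://10.111.121.111:3306/hisms_sn?useUnicode=true&characterEncoding=UTF-8", "sn_test2", props)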
// read from MySQL with the connection settings collected in an options map
val properties = Map(
  "url" -> "jdbc:mysql://10.111.121.111:3306/hisms_sn",
  "driver" -> "com.mysql.jdbc.Driver",
  "user" -> "root",
  "dbtable" -> "t_drg_fz",
  "password" -> "root")
val t_drg_fz = spark.read.format("jdbc").options(properties).load()
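Once loaded, the DataFrame behaves like any other; a small usage sketch:

// Sketch: register the loaded table as a temp view and query it with SQL.
t_drg_fz.createOrReplaceTempView("t_drg_fz")
spark.sql("select count(*) from t_drg_fz").show()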