Intellj IDEA +SBT + Scala + Spark Sql读取HDFS数据

2024-03-18 20:48

本文主要是介绍Intellj IDEA +SBT + Scala + Spark Sql读取HDFS数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前提Spark集群已经搭建完毕,如果不知道怎么搭建,请参考这个链接:
http://qindongliang.iteye.com/blog/2224797

注意提交作业,需要使用sbt打包成一个jar,然后在主任务里面添加jar包的路径远程提交即可,无须到远程集群上执行测试,本次测试使用的是Spark的Standalone方式

sbt依赖如下:


Java代码 复制代码  收藏代码
  1. name := "spark-hello"  
  2.   
  3. version := "1.0"  
  4.   
  5. scalaVersion := "2.11.7"  
  6. //使用公司的私服  
  7. resolvers += "Local Maven Repository" at "http://dev.bizbook-inc.com:8083/nexus/content/groups/public/"  
  8. //使用内部仓储  
  9. externalResolvers := Resolver.withDefaultResolvers(resolvers.value, mavenCentral = false)  
  10. //Hadoop的依赖  
  11. libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.1"  
  12. //Spark的依赖  
  13. libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.4.1"  
  14. //Spark SQL 依赖  
  15. libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.4.1"  
  16. //java servlet 依赖  
  17. libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.0.1"  
  18.       
name := "spark-hello"version := "1.0"scalaVersion := "2.11.7"
//使用公司的私服
resolvers += "Local Maven Repository" at "http://dev.bizbook-inc.com:8083/nexus/content/groups/public/"
//使用内部仓储
externalResolvers := Resolver.withDefaultResolvers(resolvers.value, mavenCentral = false)
//Hadoop的依赖
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.7.1"
//Spark的依赖
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "1.4.1"
//Spark SQL 依赖
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "1.4.1"
//java servlet 依赖
libraryDependencies += "javax.servlet" % "javax.servlet-api" % "3.0.1"


demo1:使用Scala读取HDFS的数据:

Java代码 复制代码  收藏代码
  1. /** * 
  2.    * Spark读取来自HDFS的数据 
  3.    */  
  4. ef readDataFromHDFS(): Unit ={  
  5.    //以standalone方式运行,提交到远程的spark集群上面  
  6.    val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("load hdfs data")  
  7.    conf.setJars(Seq(jarPaths));  
  8.    //得到一个Sprak上下文  
  9.    val sc = new SparkContext(conf)  
  10.    val textFile=sc.textFile("hdfs://h1:8020/user/webmaster/crawldb/etl_monitor/part-m-00000")  
  11.    //获取第一条数据  
  12.    //val data=textFile.first()  
  13.   // println(data)  
  14.    //遍历打印  
  15.      /** 
  16.       * collect() 方法 游标方式迭代收集每行数据 
  17.       * take(5)   取前topN条数据 
  18.       * foreach() 迭代打印 
  19.       * stop()    关闭链接 
  20.       */  
  21.   textFile.collect().take(5).foreach( line => println(line) )  
  22.    //关闭资源  
  23.    sc.stop()  
 /** ** Spark读取来自HDFS的数据*/
def readDataFromHDFS(): Unit ={//以standalone方式运行,提交到远程的spark集群上面val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("load hdfs data")conf.setJars(Seq(jarPaths));//得到一个Sprak上下文val sc = new SparkContext(conf)val textFile=sc.textFile("hdfs://h1:8020/user/webmaster/crawldb/etl_monitor/part-m-00000")//获取第一条数据//val data=textFile.first()// println(data)//遍历打印/*** collect() 方法 游标方式迭代收集每行数据* take(5)   取前topN条数据* foreach() 迭代打印* stop()    关闭链接*/textFile.collect().take(5).foreach( line => println(line) )//关闭资源sc.stop()
}


demo2:使用Scala 在客户端造数据,测试Spark Sql:

Java代码 复制代码  收藏代码
  1. def mappingLocalSQL1() {  
  2.    val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("hdfs data count")  
  3.    conf.setJars(Seq(jarPaths));  
  4.    val sc = new SparkContext(conf)  
  5.    val sqlContext=new SQLContext(sc);  
  6.    //导入隐式sql的schema转换  
  7.    import sqlContext.implicits._  
  8.    val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF()  
  9.    df.registerTempTable("records")  
  10.    println("Result of SELECT *:")  
  11.    sqlContext.sql("SELECT * FROM records").collect().foreach(println)  
  12.    //聚合查询  
  13.    val count = sqlContext.sql("SELECT COUNT(*) FROM records").collect().head.getLong(0)  
  14.    println(s"COUNT(*): $count")  
  15.    sc.stop()  
  16.  }  
 def mappingLocalSQL1() {val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("hdfs data count")conf.setJars(Seq(jarPaths));val sc = new SparkContext(conf)val sqlContext=new SQLContext(sc);//导入隐式sql的schema转换import sqlContext.implicits._val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF()df.registerTempTable("records")println("Result of SELECT *:")sqlContext.sql("SELECT * FROM records").collect().foreach(println)//聚合查询val count = sqlContext.sql("SELECT COUNT(*) FROM records").collect().head.getLong(0)println(s"COUNT(*): $count")sc.stop()}




Spark SQL 映射实体类的方式读取HDFS方式和字段,注意在Scala的Objcet最上面有个case 类定义,一定要放在
这里,不然会出问题:





demo2:使用Scala 远程读取HDFS文件,并映射成Spark表,以Spark Sql方式,读取top10:

Java代码 复制代码  收藏代码
  1.  val jarPaths="target/scala-2.11/spark-hello_2.11-1.0.jar"  
  2.   /**Spark SQL映射的到实体类的方式**/  
  3.   def mapSQL2(): Unit ={  
  4.     //使用一个类,参数都是可选类型,如果没有值,就默认为NULL  
  5.     //SparkConf指定master和任务名  
  6.     val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("spark sql query hdfs file")  
  7.     //设置上传需要jar包  
  8.     conf.setJars(Seq(jarPaths));  
  9.     //获取Spark上下文  
  10.     val sc = new SparkContext(conf)  
  11.     //得到SQL上下文  
  12.     val sqlContext=new SQLContext(sc);  
  13.     //必须导入此行代码,才能隐式转换成表格  
  14.     import sqlContext.implicits._  
  15.     //读取一个hdfs上的文件,并根据某个分隔符split成数组  
  16.     //然后根据长度映射成对应字段值,并处理数组越界问题  
  17.     val model=sc.textFile("hdfs://h1:8020/user/webmaster/crawldb/etl_monitor/part-m-00000").map(_.split("\1"))  
  18.       .map( p =>  ( if (p.length==4) Model(Some(p(0)), Some(p(1)), Some(p(2)), Some(p(3).toLong))  
  19.     else if (p.length==3) Model(Some(p(0)), Some(p(1)), Some(p(2)),None)  
  20.     else if (p.length==2) Model(Some(p(0)), Some(p(1)),None,None)  
  21.     else   Model( Some(p(0)),None,None,None )  
  22.       )).toDF()//转换成DF  
  23.     //注册临时表  
  24.     model.registerTempTable("monitor")  
  25.     //执行sql查询  
  26.     val it = sqlContext.sql("SELECT rowkey,title,dtime FROM monitor  limit 10 ")  
  27. //    val it = sqlContext.sql("SELECT rowkey,title,dtime FROM monitor WHERE title IS  NULL AND dtime IS NOT NULL      ")  
  28.       println("开始")  
  29.       it.collect().take(8).foreach(line => println(line))  
  30.       println("结束")  
  31.     sc.stop();  
  32.   }  
 val jarPaths="target/scala-2.11/spark-hello_2.11-1.0.jar"/**Spark SQL映射的到实体类的方式**/def mapSQL2(): Unit ={//使用一个类,参数都是可选类型,如果没有值,就默认为NULL//SparkConf指定master和任务名val conf = new SparkConf().setMaster("spark://h1:7077").setAppName("spark sql query hdfs file")//设置上传需要jar包conf.setJars(Seq(jarPaths));//获取Spark上下文val sc = new SparkContext(conf)//得到SQL上下文val sqlContext=new SQLContext(sc);//必须导入此行代码,才能隐式转换成表格import sqlContext.implicits._//读取一个hdfs上的文件,并根据某个分隔符split成数组//然后根据长度映射成对应字段值,并处理数组越界问题val model=sc.textFile("hdfs://h1:8020/user/webmaster/crawldb/etl_monitor/part-m-00000").map(_.split("\1")).map( p =>  ( if (p.length==4) Model(Some(p(0)), Some(p(1)), Some(p(2)), Some(p(3).toLong))else if (p.length==3) Model(Some(p(0)), Some(p(1)), Some(p(2)),None)else if (p.length==2) Model(Some(p(0)), Some(p(1)),None,None)else   Model( Some(p(0)),None,None,None ))).toDF()//转换成DF//注册临时表model.registerTempTable("monitor")//执行sql查询val it = sqlContext.sql("SELECT rowkey,title,dtime FROM monitor  limit 10 ")
//    val it = sqlContext.sql("SELECT rowkey,title,dtime FROM monitor WHERE title IS  NULL AND dtime IS NOT NULL      ")println("开始")it.collect().take(8).foreach(line => println(line))println("结束")sc.stop();}


在IDEA的控制台,可以输出如下结果:

 

这篇关于Intellj IDEA +SBT + Scala + Spark Sql读取HDFS数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/823597

相关文章

MySQL中EXISTS与IN用法使用与对比分析

《MySQL中EXISTS与IN用法使用与对比分析》在MySQL中,EXISTS和IN都用于子查询中根据另一个查询的结果来过滤主查询的记录,本文将基于工作原理、效率和应用场景进行全面对比... 目录一、基本用法详解1. IN 运算符2. EXISTS 运算符二、EXISTS 与 IN 的选择策略三、性能对比

MySQL常用字符串函数示例和场景介绍

《MySQL常用字符串函数示例和场景介绍》MySQL提供了丰富的字符串函数帮助我们高效地对字符串进行处理、转换和分析,本文我将全面且深入地介绍MySQL常用的字符串函数,并结合具体示例和场景,帮你熟练... 目录一、字符串函数概述1.1 字符串函数的作用1.2 字符串函数分类二、字符串长度与统计函数2.1

SpringBoot多环境配置数据读取方式

《SpringBoot多环境配置数据读取方式》SpringBoot通过环境隔离机制,支持properties/yaml/yml多格式配置,结合@Value、Environment和@Configura... 目录一、多环境配置的核心思路二、3种配置文件格式详解2.1 properties格式(传统格式)1.

SQL Server跟踪自动统计信息更新实战指南

《SQLServer跟踪自动统计信息更新实战指南》本文详解SQLServer自动统计信息更新的跟踪方法,推荐使用扩展事件实时捕获更新操作及详细信息,同时结合系统视图快速检查统计信息状态,重点强调修... 目录SQL Server 如何跟踪自动统计信息更新:深入解析与实战指南 核心跟踪方法1️⃣ 利用系统目录

使用IDEA部署Docker应用指南分享

《使用IDEA部署Docker应用指南分享》本文介绍了使用IDEA部署Docker应用的四步流程:创建Dockerfile、配置IDEADocker连接、设置运行调试环境、构建运行镜像,并强调需准备本... 目录一、创建 dockerfile 配置文件二、配置 IDEA 的 Docker 连接三、配置 Do

MySQL 内存使用率常用分析语句

《MySQL内存使用率常用分析语句》用户整理了MySQL内存占用过高的分析方法,涵盖操作系统层确认及数据库层bufferpool、内存模块差值、线程状态、performance_schema性能数据... 目录一、 OS层二、 DB层1. 全局情况2. 内存占js用详情最近连续遇到mysql内存占用过高导致

解决pandas无法读取csv文件数据的问题

《解决pandas无法读取csv文件数据的问题》本文讲述作者用Pandas读取CSV文件时因参数设置不当导致数据错位,通过调整delimiter和on_bad_lines参数最终解决问题,并强调正确参... 目录一、前言二、问题复现1. 问题2. 通过 on_bad_lines=‘warn’ 跳过异常数据3

idea的终端(Terminal)cmd的命令换成linux的命令详解

《idea的终端(Terminal)cmd的命令换成linux的命令详解》本文介绍IDEA配置Git的步骤:安装Git、修改终端设置并重启IDEA,强调顺序,作为个人经验分享,希望提供参考并支持脚本之... 目录一编程、设置前二、前置条件三、android设置四、设置后总结一、php设置前二、前置条件

Mysql中设计数据表的过程解析

《Mysql中设计数据表的过程解析》数据库约束通过NOTNULL、UNIQUE、DEFAULT、主键和外键等规则保障数据完整性,自动校验数据,减少人工错误,提升数据一致性和业务逻辑严谨性,本文介绍My... 目录1.引言2.NOT NULL——制定某列不可以存储NULL值2.UNIQUE——保证某一列的每一

解密SQL查询语句执行的过程

《解密SQL查询语句执行的过程》文章讲解了SQL语句的执行流程,涵盖解析、优化、执行三个核心阶段,并介绍执行计划查看方法EXPLAIN,同时提出性能优化技巧如合理使用索引、避免SELECT*、JOIN... 目录1. SQL语句的基本结构2. SQL语句的执行过程3. SQL语句的执行计划4. 常见的性能优