spark学习(5)--之spark计算结果保存到oracle中

2024-06-08 14:58

本文主要是介绍spark学习(5)--之spark计算结果保存到oracle中,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在spark把计算结果保存到oracle中的操作和前边的学习到的spark计算步骤基本一样,都是
第一步创建SparkContext对象来连接spark
第二步读取文件
第三步执行计算
第四步就就开始往hadoop中保存或者oracle中保存
在创建工程的时候我们要导入spark中lib的包还需要把oracle中的驱动导入到程序当中,oracle的驱动在安装oracle的路径C:\oracle\product\10.2.0\db_1\jdbc\lib\ojdbc14.jar
这里我们主要是使用jdbc来往oracle中保存数据,需要注意保存到数据中的操作可能有个错误就是序列化问题,代码如下:

package demoimport org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import java.sql.Connection
import java.sql.DriverManagerobject MyCountToOracle {def main(args: Array[String]): Unit = {//创建sparkcontextval conf= new SparkConf().setAppName("MyWebCount").setMaster("local");val sc=new SparkContext(conf)//读入数据val rdd1=sc.textFile("G:/msdownld.tmp/localhost_access_log.2017-07-30.txt").map((line:String)=>{//[30/Jul/2017:12:54:56 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240 192.168.88.1 - - val  line1=line.substring(line.indexOf("\"")+1, line.lastIndexOf("\""))val line2=line1.substring(line1.indexOf(" ")+1, line1.lastIndexOf(" "))val pageName=line2.substring(line2.lastIndexOf("/")+1);(pageName,1)})val rdd2=rdd1.reduceByKey(_+_)//通过网页名称进行排序val rdd3=rdd2.sortBy(_._2, true);//创建oracle链接Class.forName("oracle.jdbc.OracleDriver") //注册Oracle的驱动val conn:Connection=DriverManager.getConnection("jdbc:oracle:thin:@192.168.112.130:1521/orcl", "scott", "tiger")val statement=conn.prepareStatement("insert into pageview values(?,?)") //循环遍历写入数据库rdd3.foreach(f=>{statement.setString(1, f._1)statement.setInt(2, f._2)statement.executeUpdate();})statement.close()conn.close()//讲sparkcontext对象关闭掉sc.stop()}
}

它会报一个如下的错误:

Exception in thread "main" org.apache.spark.SparkException: Task not serializableat org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:916)at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:915)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)at org.apache.spark.rdd.RDD.foreach(RDD.scala:915)at demo.MyCountToOracle$.main(MyCountToOracle.scala:33)at demo.MyCountToOracle.main(MyCountToOracle.scala)
Caused by: java.io.NotSerializableException: oracle.jdbc.driver.T4CPreparedStatement
Serialization stack:- object not serializable (class: oracle.jdbc.driver.T4CPreparedStatement, value: oracle.jdbc.driver.T4CPreparedStatement@43d38654)- field (class: demo.MyCountToOracle$$anonfun$main$1, name: statement$1, type: interface java.sql.PreparedStatement)- object (class demo.MyCountToOracle$$anonfun$main$1, <function1>)at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)

这个是因为RDD是由分区组成,而T4CPreparedStatement没有实现序列化,所以不过在分区之间进行操作导致的解决这种问题,就需要用到一个算子foreachPartion。

package demoimport org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import java.sql.Connection
import java.sql.DriverManagerobject MyCountToOracle1 {def main(args: Array[String]): Unit = {//创建sparkcontextval conf= new SparkConf().setAppName("MyWebCount").setMaster("local");val sc=new SparkContext(conf)//读入数据val rdd1=sc.textFile("G:/msdownld.tmp/localhost_access_log.2017-07-30.txt").map((line:String)=>{//[30/Jul/2017:12:54:56 +0800] "GET /MyDemoWeb/java.jsp HTTP/1.1" 200 240 192.168.88.1 - - val  line1=line.substring(line.indexOf("\"")+1, line.lastIndexOf("\""))val line2=line1.substring(line1.indexOf(" ")+1, line1.lastIndexOf(" "))val pageName=line2.substring(line2.lastIndexOf("/")+1);(pageName,1)})val rdd2=rdd1.reduceByKey(_+_)//通过网页名称进行排序val rdd3=rdd2.sortBy(_._2, true);rdd3.foreachPartition(saveAsOracle)//讲sparkcontext对象关闭掉sc.stop()}def saveAsOracle(iter:Iterator[(String,Int)]):Unit={//创建oracle链接Class.forName("oracle.jdbc.OracleDriver") //注册Oracle的驱动val conn:Connection=DriverManager.getConnection("jdbc:oracle:thin:@192.168.112.130:1521/orcl", "scott", "tiger")val statement=conn.prepareStatement("insert into pageview values(?,?)") //循环遍历写入数据库iter.foreach(f=>{statement.setString(1, f._1)statement.setInt(2, f._2)statement.executeUpdate();})statement.close()conn.close()}
}

这篇关于spark学习(5)--之spark计算结果保存到oracle中的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1042515

相关文章

Java使用HttpClient实现图片下载与本地保存功能

《Java使用HttpClient实现图片下载与本地保存功能》在当今数字化时代,网络资源的获取与处理已成为软件开发中的常见需求,其中,图片作为网络上最常见的资源之一,其下载与保存功能在许多应用场景中都... 目录引言一、Apache HttpClient简介二、技术栈与环境准备三、实现图片下载与保存功能1.

Go学习记录之runtime包深入解析

《Go学习记录之runtime包深入解析》Go语言runtime包管理运行时环境,涵盖goroutine调度、内存分配、垃圾回收、类型信息等核心功能,:本文主要介绍Go学习记录之runtime包的... 目录前言:一、runtime包内容学习1、作用:① Goroutine和并发控制:② 垃圾回收:③ 栈和

Oracle修改端口号之后无法启动的解决方案

《Oracle修改端口号之后无法启动的解决方案》Oracle数据库更改端口后出现监听器无法启动的问题确实较为常见,但并非必然发生,这一问题通常源于​​配置错误或环境冲突​​,而非端口修改本身,以下是系... 目录一、问题根源分析​​​二、保姆级解决方案​​​​步骤1:修正监听器配置文件 (listener.

Android学习总结之Java和kotlin区别超详细分析

《Android学习总结之Java和kotlin区别超详细分析》Java和Kotlin都是用于Android开发的编程语言,它们各自具有独特的特点和优势,:本文主要介绍Android学习总结之Ja... 目录一、空安全机制真题 1:Kotlin 如何解决 Java 的 NullPointerExceptio

重新对Java的类加载器的学习方式

《重新对Java的类加载器的学习方式》:本文主要介绍重新对Java的类加载器的学习方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、介绍1.1、简介1.2、符号引用和直接引用1、符号引用2、直接引用3、符号转直接的过程2、加载流程3、类加载的分类3.1、显示

Oracle 通过 ROWID 批量更新表的方法

《Oracle通过ROWID批量更新表的方法》在Oracle数据库中,使用ROWID进行批量更新是一种高效的更新方法,因为它直接定位到物理行位置,避免了通过索引查找的开销,下面给大家介绍Orac... 目录oracle 通过 ROWID 批量更新表ROWID 基本概念性能优化建议性能UoTrFPH优化建议注

PostgreSQL 序列(Sequence) 与 Oracle 序列对比差异分析

《PostgreSQL序列(Sequence)与Oracle序列对比差异分析》PostgreSQL和Oracle都提供了序列(Sequence)功能,但在实现细节和使用方式上存在一些重要差异,... 目录PostgreSQL 序列(Sequence) 与 oracle 序列对比一 基本语法对比1.1 创建序

Java学习手册之Filter和Listener使用方法

《Java学习手册之Filter和Listener使用方法》:本文主要介绍Java学习手册之Filter和Listener使用方法的相关资料,Filter是一种拦截器,可以在请求到达Servl... 目录一、Filter(过滤器)1. Filter 的工作原理2. Filter 的配置与使用二、Listen

Oracle数据库常见字段类型大全以及超详细解析

《Oracle数据库常见字段类型大全以及超详细解析》在Oracle数据库中查询特定表的字段个数通常需要使用SQL语句来完成,:本文主要介绍Oracle数据库常见字段类型大全以及超详细解析,文中通过... 目录前言一、字符类型(Character)1、CHAR:定长字符数据类型2、VARCHAR2:变长字符数

Oracle存储过程里操作BLOB的字节数据的办法

《Oracle存储过程里操作BLOB的字节数据的办法》该篇文章介绍了如何在Oracle存储过程中操作BLOB的字节数据,作者研究了如何获取BLOB的字节长度、如何使用DBMS_LOB包进行BLOB操作... 目录一、缘由二、办法2.1 基本操作2.2 DBMS_LOB包2.3 字节级操作与RAW数据类型2.