Learning Spark——Spark连接Mysql、mapPartitions高效连接HBase

2024-04-22 05:32

本文主要是介绍Learning Spark——Spark连接Mysql、mapPartitions高效连接HBase,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

执行Spark任务免不了从多个数据源拿数据,除了从HDFS获取数据以外,我们还经常从Mysql和HBase中拿数据,今天讲一下如何使用Spark查询Mysql和HBase

1. Spark查询Mysql

首先,Spark连接Mysql当然需要有Mysql的驱动包,你可以在启动时加上如下命令:

bin/spark-shell --driver-class-path /home/hadoop/jars/mysql-connector-java-5.1.34.jar --jars /home/hadoop/jars/mysql-connector-java-5.1.34.jar

还有一种更方便的方法就是直接将这个jar包放到Spark放jar包的目录下面,我的目录是/data/install/spark-2.0.0-bin-hadoop2.7/jars,这样Spark就可以直接找到Mysql驱动包了

然后给出Spark读Mysql时的标准代码:

    val imeis = spark.read.format("jdbc").options(Map("url" -> DbUtil.IMEI_DB_URL,
//        "dbtable" -> "(SELECT id,imei,imeiid FROM t_imei_all) a","dbtable" -> DbUtil.IMEI_ALL_TABLE,"user" -> DbUtil.IMEI_DB_USERNAME,"password" -> DbUtil.IMEI_DB_PASSWORD,"driver" -> "com.mysql.jdbc.Driver",
//        "fetchSize" -> "1000","partitionColumn" -> "id","lowerBound" -> "1","upperBound" -> "15509195","numPartitions" -> "20")).load()

解释一下这段代码:

1、其中spark就是SparkSession

2、如果是读操作,就是spark.read,如果是写操作,就是spark.write

3、options里面就是我们需要查询的一些具体信息,有关配置如下:

配置项含义
url数据库连接地址,如:jdbc:mysql://localhost:3306/IMEI?useUnicode=true&characterEncoding=UTF8
dbtable要查询的表,这里有两种书写方式,一种可以直接写一个表名,如:t_test;另一种是写一条查询语句,但是注意要给一个别名,如:(SELECT id,imei,imeiid FROM t_imei_all) a。建议使用第一种,第二种查询会很慢
driver数据库驱动,如Mysql的是:com.mysql.jdbc.Driver
user数据库用户名
password数据库密码
partitionColumn, lowerBound, upperBound, numPartitions这几个参数用来指定用哪个列来分区,当我们查询的量很大时,例如超过千万的数据量,如果Spark不分区查询的话很快就会报OOM异常了。而且这几个参数只要指定其中一个,其他的就也要指定,partitionColumn是要分区的列,必须是整数类型;lowerBound和upperBound是分区的上下限;numPartitions是分区数
fetchsize用于读操作,每次读取多少条记录
batchsize用于写操作,每次写入多少条记录
isolationLevel用于写操作,数据库的隔离级别
truncate用于写操作,当Spark要执行覆盖表操作时,即启用了SaveMode.Overwrite,使用truncate比使用drop或者recreate操作更高效,默认是false
createTableOptions用于写操作,可以指定建表的语句,如: CREATE TABLE t (name string) ENGINE=InnoDB

4、最后的结果是一个DataFrame,可以很方便地使用SparkSql继续其他操作

2. 如何在RDD中高效连接HBase

连接HBase直接用HBase的API就好了,我们这里重点讲的是在RDD中连接HBase,大家都知道Spark处理的都是很大的数据量,而RDD连接HBase的时候势必会产生很多与HBase的连接,这样很快就会用光连接数,这里我们使用一个算子mapPartitions来解决这个问题

mapPartitions函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代器,返回的结果也是每一个分区的迭代器。如果在映射的过程中需要频繁创建额外的对象,使用mapPartitions要比map高效的多

比如,将RDD中的所有数据通过JDBC连接写入数据库,如果使用map函数,可能要为每一个元素都创建一个connection,这样开销很大,如果使用mapPartitions,那么只需要针对每一个分区建立一个connection

下面上栗子:RDD中的内容是字符串,这段字符串是HBase主键rowKey的一部分,现在要根据这条字符串查出HBase中的一条信息。例如HBase的主键是aaabbb111222,RDD中存的内容是aaa,我们要查出HBase主键前缀是aaa的所有记录的主键、包名列表和时间

代码如下:

package com.trigl.spark.mainimport com.trigl.spark.util.HbaseUtil
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}/*** 高效连接HBase示例* created by Trigl at 2017-05-20 15:34*/
object HBaseDemo {def main(args: Array[String]) {Logger.getLogger("org.apache.spark").setLevel(Level.WARN)System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")val sparkConf = new SparkConf().setAppName("HBaseDemo")val sc = new SparkContext(sparkConf)val data = sc.textFile("/test/imei.txt").mapPartitions(getHBaseInfo)// 结果以 主键|包列表|时间 的格式存入HDFSdata.map(l => {val rowKey = l._2._1val pkgList = l._2._2val time = l._1rowKey + "|" + pkgList + "|" + time // 用"|"分隔}).repartition(1).saveAsTextFile("/test/fenxi/cpz")sc.stop()}/*** 从HBase查询** @param iter mapPartion算子的参数是Iterator* @return 返回的也是Iterator*/def getHBaseInfo(iter: Iterator[String]): Iterator[(String, (String, String))] = {var pkgList = List[(String, (String, String))]() // 结果格式为(日期,(主键,包名集合))// 建立连接查询表val conn = HbaseUtil.getConnection(HbaseUtil.TABLE_NAME_CPZ_APP)val table = conn.getTable(TableName.valueOf(HbaseUtil.TABLE_NAME_CPZ_APP))// 新建Scan用于指定查询内容val scan = new Scan()scan.setCaching(10000)scan.setCacheBlocks(false)// 要查询的列scan.addColumn(HbaseUtil.COLUMN_FAMILY.getBytes, "packagelist".getBytes)scan.addColumn(HbaseUtil.COLUMN_FAMILY.getBytes, "cdate".getBytes)while (iter.hasNext) {// 要查询的前缀val imei = iter.next()// HBase前缀查询scan.setRowPrefixFilter(imei.getBytes)// 查询结果val resultScanner = table.getScanner(scan)val it = resultScanner.iterator()if (it.hasNext) {val result: Result = it.next()// 主键val key = Bytes.toString(result.getRow)// 日期val cdate = Bytes.toString(result.getValue(HbaseUtil.COLUMN_FAMILY.getBytes, "cdate".getBytes))// 包列表val packagelist = Bytes.toString(result.getValue(HbaseUtil.COLUMN_FAMILY.getBytes, "packagelist".getBytes))// 添加到集合中pkgList.::=(cdate, (key, packagelist))}}// 关闭HBase连接table.close()conn.close()// 结果返回iteratorpkgList.iterator}
}

最后存的结果是:

8643960350683910864396035068383020170421134920XHt3IGTdqIKtV5Y|2627,com.sogou.activity.src,fc8dbdce14c111859fd0111b03e80cd7,0;2856,com.qihoo.appstore,aa90bca1ab548eadd44a0c1d8c34cbda,0|2017-04-21 13:49:18

上面的完整代码见我的github:

https://github.com/Trigl/SparkLearning/blob/master/src/main/scala/com/trigl/spark/main/JDBC2Mysql.scala

https://github.com/Trigl/SparkLearning/blob/master/src/main/scala/com/trigl/spark/main/HBaseDemo.scala

Refer:

http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

http://lxw1234.com/archives/2015/07/348.htm

http://blog.csdn.net/lsshlsw/article/details/48627737

这篇关于Learning Spark——Spark连接Mysql、mapPartitions高效连接HBase的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/924927

相关文章

MySQL 中的 JSON 查询案例详解

《MySQL中的JSON查询案例详解》:本文主要介绍MySQL的JSON查询的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录mysql 的 jsON 路径格式基本结构路径组件详解特殊语法元素实际示例简单路径复杂路径简写操作符注意MySQL 的 J

Windows 上如果忘记了 MySQL 密码 重置密码的两种方法

《Windows上如果忘记了MySQL密码重置密码的两种方法》:本文主要介绍Windows上如果忘记了MySQL密码重置密码的两种方法,本文通过两种方法结合实例代码给大家介绍的非常详细,感... 目录方法 1:以跳过权限验证模式启动 mysql 并重置密码方法 2:使用 my.ini 文件的临时配置在 Wi

MySQL重复数据处理的七种高效方法

《MySQL重复数据处理的七种高效方法》你是不是也曾遇到过这样的烦恼:明明系统测试时一切正常,上线后却频频出现重复数据,大批量导数据时,总有那么几条不听话的记录导致整个事务莫名回滚,今天,我就跟大家分... 目录1. 重复数据插入问题分析1.1 问题本质1.2 常见场景图2. 基础解决方案:使用异常捕获3.

SQL中redo log 刷⼊磁盘的常见方法

《SQL中redolog刷⼊磁盘的常见方法》本文主要介绍了SQL中redolog刷⼊磁盘的常见方法,将redolog刷入磁盘的方法确保了数据的持久性和一致性,下面就来具体介绍一下,感兴趣的可以了解... 目录Redo Log 刷入磁盘的方法Redo Log 刷入磁盘的过程代码示例(伪代码)在数据库系统中,r

mysql中的group by高级用法

《mysql中的groupby高级用法》MySQL中的GROUPBY是数据聚合分析的核心功能,主要用于将结果集按指定列分组,并结合聚合函数进行统计计算,下面给大家介绍mysql中的groupby用法... 目录一、基本语法与核心功能二、基础用法示例1. 单列分组统计2. 多列组合分组3. 与WHERE结合使

Mysql用户授权(GRANT)语法及示例解读

《Mysql用户授权(GRANT)语法及示例解读》:本文主要介绍Mysql用户授权(GRANT)语法及示例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql用户授权(GRANT)语法授予用户权限语法GRANT语句中的<权限类型>的使用WITH GRANT

Mysql如何解决死锁问题

《Mysql如何解决死锁问题》:本文主要介绍Mysql如何解决死锁问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录【一】mysql中锁分类和加锁情况【1】按锁的粒度分类全局锁表级锁行级锁【2】按锁的模式分类【二】加锁方式的影响因素【三】Mysql的死锁情况【1

SQL BETWEEN 的常见用法小结

《SQLBETWEEN的常见用法小结》BETWEEN操作符是SQL中非常有用的工具,它允许你快速选取某个范围内的值,本文给大家介绍SQLBETWEEN的常见用法,感兴趣的朋友一起看看吧... 在SQL中,BETWEEN是一个操作符,用于选取介于两个值之间的数据。它包含这两个边界值。BETWEEN操作符常用

MySQL索引的优化之LIKE模糊查询功能实现

《MySQL索引的优化之LIKE模糊查询功能实现》:本文主要介绍MySQL索引的优化之LIKE模糊查询功能实现,本文通过示例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 目录一、前缀匹配优化二、后缀匹配优化三、中间匹配优化四、覆盖索引优化五、减少查询范围六、避免通配符开头七、使用外部搜索引擎八、分

MySql match against工具详细用法

《MySqlmatchagainst工具详细用法》在MySQL中,MATCH……AGAINST是全文索引(Full-Textindex)的查询语法,它允许你对文本进行高效的全文搜素,支持自然语言搜... 目录一、全文索引的基本概念二、创建全文索引三、自然语言搜索四、布尔搜索五、相关性排序六、全文索引的限制七