spark将数据写入hbase以及从hbase读取数据

2024-08-27 10:32

本文主要是介绍spark将数据写入hbase以及从hbase读取数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文:http://blog.csdn.net/u013468917/article/details/52822074

本文将介绍

1、spark如何利用saveAsHadoopDataset和saveAsNewAPIHadoopDataset将RDD写入hbase

2、spark从hbase中读取数据并转化为RDD

操作方式为在eclipse本地运行spark连接到远程的hbase。

java版本:1.7.0

scala版本:2.10.4

zookeeper版本:3.4.5(禁用了hbase自带zookeeper,选择自己部署的)

hadoop版本:2.4.1

spark版本:1.6.1

hbase版本:1.2.3

集群:centos6.5_x64

将RDD写入hbase

注意点:

依赖:

将lib目录下的hadoop开头jar包、hbase开头jar包添加至classpath

此外还有lib目录下的:zookeeper-3.4.6.jar、metrics-core-2.2.0.jar(缺少会提示hbase RpcRetryingCaller: Call exception不断尝试重连hbase,不报错)、htrace-core-3.1.0-incubating.jar、guava-12.0.1.jar

$SPARK_HOME/lib目录下的 spark-assembly-1.6.1-hadoop2.4.0.jar

不同的package中可能会有相同名称的类,不要导错

连接集群:

spark应用需要连接到zookeeper集群,然后借助zookeeper访问hbase。一般可以通过两种方式连接到zookeeper:

第一种是将hbase-site.xml文件加入classpath

第二种是在HBaseConfiguration实例中设置

如果不设置,默认连接的是localhost:2181会报错:connection refused 

本文使用的是第二种方式。

hbase创建表:

虽然可以在spark应用中创建hbase表,但是不建议这样做,最好在hbase shell中创建表,spark写或读数据

使用saveAsHadoopDataset写入数据

[plain] view plain copy
  1. package com.test  
  2.   
  3. import org.apache.hadoop.hbase.HBaseConfiguration  
  4. import org.apache.hadoop.hbase.client.Put  
  5. import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
  6. import org.apache.hadoop.hbase.mapred.TableOutputFormat  
  7. import org.apache.hadoop.hbase.util.Bytes  
  8. import org.apache.hadoop.mapred.JobConf  
  9. import org.apache.spark.SparkConf  
  10. import org.apache.spark.SparkContext  
  11. import org.apache.spark.rdd.RDD.rddToPairRDDFunctions  
  12.   
  13. object TestHBase {  
  14.   
  15.   def main(args: Array[String]): Unit = {  
  16.     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")  
  17.     val sc = new SparkContext(sparkConf)  
  18.   
  19.     val conf = HBaseConfiguration.create()  
  20.     //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置  
  21.     conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")  
  22.     //设置zookeeper连接端口,默认2181  
  23.     conf.set("hbase.zookeeper.property.clientPort", "2181")  
  24.   
  25.     val tablename = "account"  
  26.       
  27.     //初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!  
  28.     val jobConf = new JobConf(conf)  
  29.     jobConf.setOutputFormat(classOf[TableOutputFormat])  
  30.     jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)  
  31.       
  32.     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))  
  33.   
  34.   
  35.     val rdd = indataRDD.map(_.split(',')).map{arr=>{  
  36.       /*一个Put对象就是一行记录,在构造方法中指定主键  
  37.        * 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换  
  38.        * Put.add方法接收三个参数:列族,列名,数据  
  39.        */  
  40.       val put = new Put(Bytes.toBytes(arr(0).toInt))  
  41.       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))  
  42.       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))  
  43.       //转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset  
  44.       (new ImmutableBytesWritable, put)   
  45.     }}  
  46.       
  47.     rdd.saveAsHadoopDataset(jobConf)  
  48.       
  49.     sc.stop()  
  50.   }  
  51.   
  52. }  

使用saveAsNewAPIHadoopDataset写入数据


[plain] view plain copy
  1. package com.test  
  2.   
  3. import org.apache.hadoop.hbase.HBaseConfiguration  
  4. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  
  5. import org.apache.spark._  
  6. import org.apache.hadoop.mapreduce.Job  
  7. import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
  8. import org.apache.hadoop.hbase.client.Result  
  9. import org.apache.hadoop.hbase.client.Put  
  10. import org.apache.hadoop.hbase.util.Bytes  
  11.   
  12. object TestHBase3 {  
  13.   
  14.   def main(args: Array[String]): Unit = {  
  15.     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")  
  16.     val sc = new SparkContext(sparkConf)  
  17.       
  18.     val tablename = "account"  
  19.       
  20.     sc.hadoopConfiguration.set("hbase.zookeeper.quorum","slave1,slave2,slave3")  
  21.     sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")  
  22.     sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)  
  23.       
  24.     val job = new Job(sc.hadoopConfiguration)  
  25.     job.setOutputKeyClass(classOf[ImmutableBytesWritable])  
  26.     job.setOutputValueClass(classOf[Result])    
  27.     job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])    
  28.   
  29.     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))  
  30.     val rdd = indataRDD.map(_.split(',')).map{arr=>{  
  31.       val put = new Put(Bytes.toBytes(arr(0)))  
  32.       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))  
  33.       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))  
  34.       (new ImmutableBytesWritable, put)   
  35.     }}  
  36.       
  37.     rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())  
  38.   }  
  39.   
  40. }  


从hbase读取数据转化成RDD

本例基于官方提供的例子

[plain] view plain copy
  1. package com.test  
  2.   
  3. import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}  
  4. import org.apache.hadoop.hbase.client.HBaseAdmin  
  5. import org.apache.hadoop.hbase.mapreduce.TableInputFormat  
  6. import org.apache.spark._  
  7. import org.apache.hadoop.hbase.client.HTable  
  8. import org.apache.hadoop.hbase.client.Put  
  9. import org.apache.hadoop.hbase.util.Bytes  
  10. import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
  11. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  
  12. import org.apache.hadoop.mapred.JobConf  
  13. import org.apache.hadoop.io._  
  14.   
  15. object TestHBase2 {  
  16.   
  17.   def main(args: Array[String]): Unit = {  
  18.     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")  
  19.     val sc = new SparkContext(sparkConf)  
  20.       
  21.     val tablename = "account"  
  22.     val conf = HBaseConfiguration.create()  
  23.     //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置  
  24.     conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")  
  25.     //设置zookeeper连接端口,默认2181  
  26.     conf.set("hbase.zookeeper.property.clientPort", "2181")  
  27.     conf.set(TableInputFormat.INPUT_TABLE, tablename)  
  28.   
  29.     // 如果表不存在则创建表  
  30.     val admin = new HBaseAdmin(conf)  
  31.     if (!admin.isTableAvailable(tablename)) {  
  32.       val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))  
  33.       admin.createTable(tableDesc)  
  34.     }  
  35.   
  36.     //读取数据并转化成rdd  
  37.     val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],  
  38.       classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],  
  39.       classOf[org.apache.hadoop.hbase.client.Result])  
  40.   
  41.     val count = hBaseRDD.count()  
  42.     println(count)  
  43.     hBaseRDD.foreach{case (_,result) =>{  
  44.       //获取行键  
  45.       val key = Bytes.toString(result.getRow)  
  46.       //通过列族和列名获取列  
  47.       val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))  
  48.       val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))  
  49.       println("Row key:"+key+" Name:"+name+" Age:"+age)  
  50.     }}  
  51.   
  52.     sc.stop()  
  53.     admin.close()  
  54.   }  
  55. }  


这篇关于spark将数据写入hbase以及从hbase读取数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1111416

相关文章

SQL Server修改数据库名及物理数据文件名操作步骤

《SQLServer修改数据库名及物理数据文件名操作步骤》在SQLServer中重命名数据库是一个常见的操作,但需要确保用户具有足够的权限来执行此操作,:本文主要介绍SQLServer修改数据... 目录一、背景介绍二、操作步骤2.1 设置为单用户模式(断开连接)2.2 修改数据库名称2.3 查找逻辑文件名

canal实现mysql数据同步的详细过程

《canal实现mysql数据同步的详细过程》:本文主要介绍canal实现mysql数据同步的详细过程,本文通过实例图文相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的... 目录1、canal下载2、mysql同步用户创建和授权3、canal admin安装和启动4、canal

使用SpringBoot整合Sharding Sphere实现数据脱敏的示例

《使用SpringBoot整合ShardingSphere实现数据脱敏的示例》ApacheShardingSphere数据脱敏模块,通过SQL拦截与改写实现敏感信息加密存储,解决手动处理繁琐及系统改... 目录痛点一:痛点二:脱敏配置Quick Start——Spring 显示配置:1.引入依赖2.创建脱敏

详解如何使用Python构建从数据到文档的自动化工作流

《详解如何使用Python构建从数据到文档的自动化工作流》这篇文章将通过真实工作场景拆解,为大家展示如何用Python构建自动化工作流,让工具代替人力完成这些数字苦力活,感兴趣的小伙伴可以跟随小编一起... 目录一、Excel处理:从数据搬运工到智能分析师二、PDF处理:文档工厂的智能生产线三、邮件自动化:

Python数据分析与可视化的全面指南(从数据清洗到图表呈现)

《Python数据分析与可视化的全面指南(从数据清洗到图表呈现)》Python是数据分析与可视化领域中最受欢迎的编程语言之一,凭借其丰富的库和工具,Python能够帮助我们快速处理、分析数据并生成高质... 目录一、数据采集与初步探索二、数据清洗的七种武器1. 缺失值处理策略2. 异常值检测与修正3. 数据

pandas实现数据concat拼接的示例代码

《pandas实现数据concat拼接的示例代码》pandas.concat用于合并DataFrame或Series,本文主要介绍了pandas实现数据concat拼接的示例代码,具有一定的参考价值,... 目录语法示例:使用pandas.concat合并数据默认的concat:参数axis=0,join=

C#代码实现解析WTGPS和BD数据

《C#代码实现解析WTGPS和BD数据》在现代的导航与定位应用中,准确解析GPS和北斗(BD)等卫星定位数据至关重要,本文将使用C#语言实现解析WTGPS和BD数据,需要的可以了解下... 目录一、代码结构概览1. 核心解析方法2. 位置信息解析3. 经纬度转换方法4. 日期和时间戳解析5. 辅助方法二、L

使用Python和Matplotlib实现可视化字体轮廓(从路径数据到矢量图形)

《使用Python和Matplotlib实现可视化字体轮廓(从路径数据到矢量图形)》字体设计和矢量图形处理是编程中一个有趣且实用的领域,通过Python的matplotlib库,我们可以轻松将字体轮廓... 目录背景知识字体轮廓的表示实现步骤1. 安装依赖库2. 准备数据3. 解析路径指令4. 绘制图形关键

Java如何从Redis中批量读取数据

《Java如何从Redis中批量读取数据》:本文主要介绍Java如何从Redis中批量读取数据的情况,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一.背景概述二.分析与实现三.发现问题与屡次改进3.1.QPS过高而且波动很大3.2.程序中断,抛异常3.3.内存消

解决mysql插入数据锁等待超时报错:Lock wait timeout exceeded;try restarting transaction

《解决mysql插入数据锁等待超时报错:Lockwaittimeoutexceeded;tryrestartingtransaction》:本文主要介绍解决mysql插入数据锁等待超时报... 目录报错信息解决办法1、数据库中执行如下sql2、再到 INNODB_TRX 事务表中查看总结报错信息Lock