spark将数据写入hbase以及从hbase读取数据

2024-08-27 10:32

本文主要是介绍spark将数据写入hbase以及从hbase读取数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文:http://blog.csdn.net/u013468917/article/details/52822074

本文将介绍

1、spark如何利用saveAsHadoopDataset和saveAsNewAPIHadoopDataset将RDD写入hbase

2、spark从hbase中读取数据并转化为RDD

操作方式为在eclipse本地运行spark连接到远程的hbase。

java版本:1.7.0

scala版本:2.10.4

zookeeper版本:3.4.5(禁用了hbase自带zookeeper,选择自己部署的)

hadoop版本:2.4.1

spark版本:1.6.1

hbase版本:1.2.3

集群:centos6.5_x64

将RDD写入hbase

注意点:

依赖:

将lib目录下的hadoop开头jar包、hbase开头jar包添加至classpath

此外还有lib目录下的:zookeeper-3.4.6.jar、metrics-core-2.2.0.jar(缺少会提示hbase RpcRetryingCaller: Call exception不断尝试重连hbase,不报错)、htrace-core-3.1.0-incubating.jar、guava-12.0.1.jar

$SPARK_HOME/lib目录下的 spark-assembly-1.6.1-hadoop2.4.0.jar

不同的package中可能会有相同名称的类,不要导错

连接集群:

spark应用需要连接到zookeeper集群,然后借助zookeeper访问hbase。一般可以通过两种方式连接到zookeeper:

第一种是将hbase-site.xml文件加入classpath

第二种是在HBaseConfiguration实例中设置

如果不设置,默认连接的是localhost:2181会报错:connection refused 

本文使用的是第二种方式。

hbase创建表:

虽然可以在spark应用中创建hbase表,但是不建议这样做,最好在hbase shell中创建表,spark写或读数据

使用saveAsHadoopDataset写入数据

[plain] view plain copy
  1. package com.test  
  2.   
  3. import org.apache.hadoop.hbase.HBaseConfiguration  
  4. import org.apache.hadoop.hbase.client.Put  
  5. import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
  6. import org.apache.hadoop.hbase.mapred.TableOutputFormat  
  7. import org.apache.hadoop.hbase.util.Bytes  
  8. import org.apache.hadoop.mapred.JobConf  
  9. import org.apache.spark.SparkConf  
  10. import org.apache.spark.SparkContext  
  11. import org.apache.spark.rdd.RDD.rddToPairRDDFunctions  
  12.   
  13. object TestHBase {  
  14.   
  15.   def main(args: Array[String]): Unit = {  
  16.     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")  
  17.     val sc = new SparkContext(sparkConf)  
  18.   
  19.     val conf = HBaseConfiguration.create()  
  20.     //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置  
  21.     conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")  
  22.     //设置zookeeper连接端口,默认2181  
  23.     conf.set("hbase.zookeeper.property.clientPort", "2181")  
  24.   
  25.     val tablename = "account"  
  26.       
  27.     //初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!  
  28.     val jobConf = new JobConf(conf)  
  29.     jobConf.setOutputFormat(classOf[TableOutputFormat])  
  30.     jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)  
  31.       
  32.     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))  
  33.   
  34.   
  35.     val rdd = indataRDD.map(_.split(',')).map{arr=>{  
  36.       /*一个Put对象就是一行记录,在构造方法中指定主键  
  37.        * 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换  
  38.        * Put.add方法接收三个参数:列族,列名,数据  
  39.        */  
  40.       val put = new Put(Bytes.toBytes(arr(0).toInt))  
  41.       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))  
  42.       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))  
  43.       //转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset  
  44.       (new ImmutableBytesWritable, put)   
  45.     }}  
  46.       
  47.     rdd.saveAsHadoopDataset(jobConf)  
  48.       
  49.     sc.stop()  
  50.   }  
  51.   
  52. }  

使用saveAsNewAPIHadoopDataset写入数据


[plain] view plain copy
  1. package com.test  
  2.   
  3. import org.apache.hadoop.hbase.HBaseConfiguration  
  4. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  
  5. import org.apache.spark._  
  6. import org.apache.hadoop.mapreduce.Job  
  7. import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
  8. import org.apache.hadoop.hbase.client.Result  
  9. import org.apache.hadoop.hbase.client.Put  
  10. import org.apache.hadoop.hbase.util.Bytes  
  11.   
  12. object TestHBase3 {  
  13.   
  14.   def main(args: Array[String]): Unit = {  
  15.     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")  
  16.     val sc = new SparkContext(sparkConf)  
  17.       
  18.     val tablename = "account"  
  19.       
  20.     sc.hadoopConfiguration.set("hbase.zookeeper.quorum","slave1,slave2,slave3")  
  21.     sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")  
  22.     sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)  
  23.       
  24.     val job = new Job(sc.hadoopConfiguration)  
  25.     job.setOutputKeyClass(classOf[ImmutableBytesWritable])  
  26.     job.setOutputValueClass(classOf[Result])    
  27.     job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])    
  28.   
  29.     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))  
  30.     val rdd = indataRDD.map(_.split(',')).map{arr=>{  
  31.       val put = new Put(Bytes.toBytes(arr(0)))  
  32.       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))  
  33.       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))  
  34.       (new ImmutableBytesWritable, put)   
  35.     }}  
  36.       
  37.     rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())  
  38.   }  
  39.   
  40. }  


从hbase读取数据转化成RDD

本例基于官方提供的例子

[plain] view plain copy
  1. package com.test  
  2.   
  3. import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}  
  4. import org.apache.hadoop.hbase.client.HBaseAdmin  
  5. import org.apache.hadoop.hbase.mapreduce.TableInputFormat  
  6. import org.apache.spark._  
  7. import org.apache.hadoop.hbase.client.HTable  
  8. import org.apache.hadoop.hbase.client.Put  
  9. import org.apache.hadoop.hbase.util.Bytes  
  10. import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
  11. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  
  12. import org.apache.hadoop.mapred.JobConf  
  13. import org.apache.hadoop.io._  
  14.   
  15. object TestHBase2 {  
  16.   
  17.   def main(args: Array[String]): Unit = {  
  18.     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")  
  19.     val sc = new SparkContext(sparkConf)  
  20.       
  21.     val tablename = "account"  
  22.     val conf = HBaseConfiguration.create()  
  23.     //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置  
  24.     conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")  
  25.     //设置zookeeper连接端口,默认2181  
  26.     conf.set("hbase.zookeeper.property.clientPort", "2181")  
  27.     conf.set(TableInputFormat.INPUT_TABLE, tablename)  
  28.   
  29.     // 如果表不存在则创建表  
  30.     val admin = new HBaseAdmin(conf)  
  31.     if (!admin.isTableAvailable(tablename)) {  
  32.       val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))  
  33.       admin.createTable(tableDesc)  
  34.     }  
  35.   
  36.     //读取数据并转化成rdd  
  37.     val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],  
  38.       classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],  
  39.       classOf[org.apache.hadoop.hbase.client.Result])  
  40.   
  41.     val count = hBaseRDD.count()  
  42.     println(count)  
  43.     hBaseRDD.foreach{case (_,result) =>{  
  44.       //获取行键  
  45.       val key = Bytes.toString(result.getRow)  
  46.       //通过列族和列名获取列  
  47.       val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))  
  48.       val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))  
  49.       println("Row key:"+key+" Name:"+name+" Age:"+age)  
  50.     }}  
  51.   
  52.     sc.stop()  
  53.     admin.close()  
  54.   }  
  55. }  


这篇关于spark将数据写入hbase以及从hbase读取数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1111416

相关文章

在MySQL中实现冷热数据分离的方法及使用场景底层原理解析

《在MySQL中实现冷热数据分离的方法及使用场景底层原理解析》MySQL冷热数据分离通过分表/分区策略、数据归档和索引优化,将频繁访问的热数据与冷数据分开存储,提升查询效率并降低存储成本,适用于高并发... 目录实现冷热数据分离1. 分表策略2. 使用分区表3. 数据归档与迁移在mysql中实现冷热数据分

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

MyBatis-Plus通用中等、大量数据分批查询和处理方法

《MyBatis-Plus通用中等、大量数据分批查询和处理方法》文章介绍MyBatis-Plus分页查询处理,通过函数式接口与Lambda表达式实现通用逻辑,方法抽象但功能强大,建议扩展分批处理及流式... 目录函数式接口获取分页数据接口数据处理接口通用逻辑工具类使用方法简单查询自定义查询方法总结函数式接口

SQL中如何添加数据(常见方法及示例)

《SQL中如何添加数据(常见方法及示例)》SQL全称为StructuredQueryLanguage,是一种用于管理关系数据库的标准编程语言,下面给大家介绍SQL中如何添加数据,感兴趣的朋友一起看看吧... 目录在mysql中,有多种方法可以添加数据。以下是一些常见的方法及其示例。1. 使用INSERT I

Python使用vllm处理多模态数据的预处理技巧

《Python使用vllm处理多模态数据的预处理技巧》本文深入探讨了在Python环境下使用vLLM处理多模态数据的预处理技巧,我们将从基础概念出发,详细讲解文本、图像、音频等多模态数据的预处理方法,... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

mysql中的数据目录用法及说明

《mysql中的数据目录用法及说明》:本文主要介绍mysql中的数据目录用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、版本3、数据目录4、总结1、背景安装mysql之后,在安装目录下会有一个data目录,我们创建的数据库、创建的表、插入的

Navicat数据表的数据添加,删除及使用sql完成数据的添加过程

《Navicat数据表的数据添加,删除及使用sql完成数据的添加过程》:本文主要介绍Navicat数据表的数据添加,删除及使用sql完成数据的添加过程,具有很好的参考价值,希望对大家有所帮助,如有... 目录Navicat数据表数据添加,删除及使用sql完成数据添加选中操作的表则出现如下界面,查看左下角从左