Python 通过thrift接口连接Hbase读取存储数据

2024-03-26 10:38

本文主要是介绍Python 通过thrift接口连接Hbase读取存储数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Python 通过thrift接口连接Hbase读取存储数据

原创 2013年03月14日 23:23:41

介绍:

Hbase:开源的分布式数据库

资料介绍:http://www.oschina.net/p/hbase

Thrift:一个软件框架,用来进行可扩展且跨语言的服务的开发。最初由Facebook开发,作为Hadoop的一个工具,提供跨语言服务开发;

资料介绍:http://dongxicheng.org/search-engine/thrift-guide/

官方使用手册:http://download.csdn.net/detail/wyjzt999/5141006从安装到使用都很全面

我们项目里客户端是用python开发,因此需要Thrift提供server端,经过thrift对Hbase进行数据读写操作,性能非常不错,并且可以在Hadoop集群上做并行拓展,稳定性高,Facebook内部通信也是采用thrift来做;

 

首先学习一下Hbase的表结构:

Row Key

Row key行键 (Row key)可以是任意字符串(最大长度是 64KB,实际应用中长度一般为 10-100bytes),在hbase内部,row key保存为字节数组。

列族 (column family)

hbase表中的每个列,都归属与某个列族。列族是表的chema的一部分(而列不是),必须在使用表之前定义。列名都以列族作为前缀。例如courses:history , courses:math 都属于 courses 这个列族。

时间戳

HBase中通过row和columns确定的为一个存贮单元称为cell。每个 cell都保存着同一份数据的多个版本。版本通过时间戳来索引。时间戳的类型是 64位整型。时间戳可以由hbase(在数据写入时自动 )赋值,此时时间戳是精确到毫秒的当前系统时间。时间戳也可以由客户显式赋值。如果应用程序要避免数据版本冲突,就必须自己生成具有唯一性的时间戳。每个 cell中,不同版本的数据按照时间倒序排序,即最新的数据排在最前面。

为了避免数据存在过多版本造成的的管理 (包括存贮和索引)负担,hbase提供了两种数据版本回收方式。一是保存数据的最后n个版本,二是保存最近一段时间内的版本(比如最近七天)。用户可以针对每个列族进行设置。

对Hbase而言,表结构设计会对系统的性能以及开销上造成很大的区别;


 

 1.首先建立与thriftserver端的连接

[python] view plain copy
  1. from thrift import Thrift  
  2. from thrift.transport import TSocket, TTransport  
  3. from thrift.protocol import TBinaryProtocol  
  4. from hbase import Hbase  
  5.   
  6. #server端地址和端口  
  7. transport = TSocket.TSocket(host, port)  
  8. #可以设置超时  
  9. transport.setTimeout(5000)  
  10. #设置传输方式(TFramedTransport或TBufferedTransport)  
  11. trans = TTransport.TBufferedTransport(transport)  
  12. #设置传输协议  
  13. protocol = TBinaryProtocol.TBinaryProtocol(trans)  
  14. #确定客户端  
  15. client = Hbase.Client(protocol)  
  16. #打开连接  
  17. transport.open()  

2.然后就可以做具体的操作,比如查表,删表,插入Row Key等等

[python] view plain copy
  1. from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation, TRegionInfo  
  2. from hbase.ttypes import IOError, AlreadyExists  
  3.   
  4. #获取表名  
  5. client.getTableNames()  
  6. #创建新表  
  7. _TABLE = "keyword"  
[python] view plain copy
  1. demo = ColumnDescriptor(name='data:',maxVersions = 10)#列族data能保留最近的10个数据,每个列名后面要跟:号  
  2. createTable(_TABLE, [demo])  

[python] view plain copy
  1. #创建列名2个data:url data:word    
  2. tmp1= [Mutation(column="data:url", value="www.baidu.com")]  
  3. tmp2= [Mutation(column="data:word", value="YaGer")]  
  4. #新建2个列 (表名,行键, 列名)  
  5. client.mutateRow(_TABLE, row, tmp1)  
  6. client.mutateRow(_TABLE, row, tmp1)  


[python] view plain copy
  1. #从表中取数据  
  2. #通过最大版本数取数据  
  3. client.getByMaxver(_TABLE,'00001','data:word'10)#一次取10个版本  
  4. #取列族内数据  
  5. client.getColumns(_TABLE, '00001')  



3.支持thrift并行拓展和失效转移Failover机制

[python] view plain copy
  1. #file name:hbaseconn.py  
  2. #! /usr/bin/env python    
  3. # -*- coding: utf-8 -*-  
  4. #提供建立连接的方法和取数据操作的方法  
  5. import logging  
  6. import traceback  
  7. import time,sys  
  8. from unittest import TestCase, main  
  9. import socket  
  10.   
  11. from thrift import Thrift    
  12. from thrift.transport import TSocket    
  13. from thrift.transport import TTransport    
  14. from thrift.protocol import TBinaryProtocol    
  15. from hbase import Hbase    
  16.   
  17. from hbase.ttypes import IOError as HbaseIOError, AlreadyExists  
  18. from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation  
  19.   
  20.   
  21. _TABLE = 'thrift_check'  
  22. _ROW = 'test_for_thrift'  
  23. _DATA = 'data:word'  
  24. _VALUE = 'Flag'  
  25.   
  26. class DbConnError(Exception):  
  27.     """Conn Db exception.  
  28.      
  29.     Timeout or any except for connection from low layer api 
  30.     """  
  31.     pass  
  32.   
  33. class Connection:  
  34.     def __init__(self, trans, client, addr, port):  
  35.         self.trans = trans  
  36.         self.client = client  
  37.         self.hp = addr  
  38.         self.port = port  
  39.         pass  
  40.   
  41. class CenterDb:  
  42.     @classmethod  
  43.     def open(cls, host_port):  
  44.   
  45.         cls.tc_list = []  
  46.         for hp in host_port:  
  47.             trans, client = cls._open(*hp)  
  48.             cls.tc_list.append(Connection(trans, client, hp[0], hp[1]))  
  49.   
  50.         return cls.tc_list  
  51.  
  52.     @classmethod  
  53.     def _open(cls, host, port):  
  54.         transport = TSocket.TSocket(host, port)  
  55.         transport.setTimeout(5000)  
  56.         trans = TTransport.TBufferedTransport(transport)  
  57.         protocol = TBinaryProtocol.TBinaryProtocol(trans)  
  58.         client = Hbase.Client(protocol)  
  59.         ok = False  
  60.         try:  
  61.             trans.open()  
  62.             ok = True  
  63.         except TSocket.TTransportException, e:  
  64.             logerr('CenterDb(Hbase) Open error(%s, %d)' % (host, port))  
  65.             ok = False  
  66.         else:  
  67.             pass  
  68. #            dbg('CenterDb connected (%s, %d)' % (host, port))  
  69.         return trans, client  
  70.  
  71.     @classmethod  
  72.     def _initTable(cls, client):  
  73.         dat = ColumnDescriptor(name = 'data', maxVersions = 1)  
  74.         tmp = [Mutation(column = _DATA, value = _VALUE)]  
  75.         try:  
  76.            client.createTable(_TABLE, [dat])  
  77.            client.mutateRow(_TABLE, _ROW, tmp)  
  78.            dbg("Create Table For Thrift Test Success!")  
  79.            return  True  
  80.         except AlreadyExists:  
  81.             return True  
  82.         return False  
  83.  
  84.     @classmethod  
  85.     def _get(cls, client):  
  86.         client.getVer(_TABLE,_ROW,_DATA, 1)  
  87.         return True  
  88.  
  89.     @classmethod  
  90.     def _reconnect(cls, trans):  
  91.         trans.close()  
  92.         trans.open()  
  93.   
  94.   
  95.     def __init__(self, transport, client):  
  96.         self.t = transport  
  97.         self.c = client  
  98.   
  99.     def __str__(self):  
  100.         return 'CenterDb.open(%s, %d)' % (self.t, self.c)  
  101.   
  102.     def __del__(self):  
  103.         self.t.close()  
  104.       
  105.   
  106.     def getColumns(self, table, row):  
  107.   
  108.         tr_list = []  
  109.         tr_list = self._failover('getRow', table, row)  
  110.         if (not tr_list):  
  111.             return {}  
  112.         return tr_list[0].columns  
  113.    



[python] view plain copy
  1. #file name: thriftmanage.py  
  2. #! /usr/bin/python  
  3. # -*- coding: utf-8 -*-  
  4. #提供管理thrift连接的方法  
  5. #建一个线程循环检测thrift的可使用性,非连接池  
  6. import logging  
  7. import time,sys,random  
  8. import threading  
  9. from hbase import Hbase    
  10. from hbase.ttypes import IOError as HbaseIOError  
  11. from hbase.ttypes import ColumnDescriptor, Mutation, BatchMutation  
  12. import hbaseconn  
  13. from hbaseconn import CenterDb  
  14.   
  15.   
  16. class Failover(threading.Thread):  
  17.     def __init__(self,serverid, serverlist):  
  18.         threading.Thread.__init__(self)  
  19.   
  20.         self.serverlist = serverlist  
  21.         self.serverid = serverid  
  22.           
  23.         self.lenlist = len(serverlist)  
  24.           
  25.         self.conn_list = []  
  26.         self.invalid_con_list = []  
  27.         ''''' 
  28.             cur_conn : now oper is using  
  29.             my_conn : this schd server should use 
  30.             [(trans1,client1),(trans2,client2),...]  
  31.         '''  
  32.         self.cur_conn = None  
  33.         self.my_conn = None  
  34.           
  35.         self._makeConn()  
  36.         self._getConn()  
  37.         self._init_for_check()  
  38.   
  39.         self.switched_flag = False  
  40.   
  41.   
  42.     def _makeConn(self):  
  43.         '''''make all the conntion to thrift serverlist 
  44.         '''  
  45.         self.conn_list = CenterDb.open(self.serverlist)  
  46.   
  47.     def _getConn(self):  
  48.         '''''get only one connection from the conn_list,confirm cur_conn and my_conn 
  49.         '''  
  50.         self.cur_conn = self.my_conn = self.conn_list[(int(self.serverid) % self.lenlist)]   
  51.   
  52.         self.other_conn = self.conn_list[:]  
  53.         self.other_conn.remove(self.my_conn)  
  54.       
  55.         return True  
  56.   
  57.     def _init_for_check(self):  
  58.       
  59.         '''''init _TABLLE test_for_flag  
  60.         '''  
  61.         try:  
  62.             if not CenterDb._initTable( self.my_conn.client ):  
  63.                 dbg("Error In Create Table For Thrift Test!")  
  64.         except Exception, e:  
  65.             dbg("init_for_check thrift:%s" % e)  
  66.   
  67.           
  68.         '''''make the only conn for check proc 
  69.         '''  
  70.         self.check_conn = CenterDb.open( ((self.my_conn.hp, self.my_conn.port),))[0]    
  71.   
  72.     def _switch(self):  
  73.         '''''when my_conn failed, choose the other client randomly; 
  74.            when my_conn is reset OK, cur_conn will use my_conn again 
  75.         '''  
  76.         #print 'Schd%s come in _switch' % self.serverid  
  77.         if 0 == len(self.other_conn):  
  78.             return False  
  79.         trycount = 0  
  80.         while True:  
  81.             try:  
  82.                 if trycount == 3*self.lenlist :#try 3*length times  
  83.                     return False  
  84.                 tmp_conn = random.choice(self.other_conn)  
  85.                 #CenterDb._reconnect(tmp_conn.trans)  
  86.                 #DEBUG  
  87.                 if self._checker(tmp_conn):  
  88.                     self.cur_conn = tmp_conn  
  89.                     dbg('Schd%s _switch cur_conn: %s' % (self.serverid, self.cur_conn.hp))  
  90.                     return True  
  91.                 else:  
  92.                     trycount += 1  
  93.                     logerr('Schd%s _switch for %d times' % (self.serverid, trycount))  
  94.                     CenterDb._reconnect(tmp_conn.trans)#close this trans and try again;breakdown early  
  95.                     if self._checker(tmp_conn):  
  96.                         self.cur_conn = tmp_conn  
  97.                         return True  
  98.                     else:  
  99.                         continue#can't be used  
  100.             except Exception, e:  
  101.                 continue  
  102.   
  103.     def _failover(self,oper, *args, **kwargs):  
  104.         result = []  
  105.         try :  
  106.             result = getattr(self.cur_conn.client, oper)(*args, **kwargs)  
  107.             return result  
  108.   
  109.         except HbaseIOError, e:  
  110.              logerr("_failover : %s " % e)  
  111.   
  112.         except Exception, e:  
  113.             logerr( 'Schd%s _failover : Connect to %s thrift Server closed! Choose another......Reason:%s ' % \  
  114.                 (self.serverid, self.cur_conn.hp, e))  
  115.             self.cur_conn.trans.close()  
  116.             self.switched_flag = True  
  117.             if self._switch():  
  118.                 logerr('Schd%s _failover : Now using %s server' % (self.serverid, self.cur_conn.hp))  
  119.                 result = getattr(self.cur_conn.client, oper)(*args, **kwargs)  
  120.             else:   
  121.                 logerr( 'Schd%s _failover : Switch 3 rotate Find No Healthy Thrift server !' % self.serverid)  
  122.         return result  
  123.   
  124.     def getByMaxver(self, table, row, column, max_ver):  
  125.         """Get cell list by ver 
  126.         Get cell list no more than max versions 
  127.         Args: 
  128.             table: table name 
  129.             row: row key 
  130.             column: column name 
  131.             max_ver: max version to retrieve cell 
  132.         """  
  133.         cell_list = []  
  134.         start_time = time.time()  
  135.         cell_list = self._failover('getVer', table, row, column, max_ver)  
  136.         if (not cell_list):  
  137.             return []  
  138.           
  139.         take = time.time() - start_time  
  140.         if take > 0.015:  
  141.             logerr('Hbase over 15ms:take %s ms' % str("%.3f" % (take*1000)))  
  142.   
  143.         return map(lambda x: (x.value, x.timestamp), cell_list)  
  144.       
  145.     def getColumns(self, table, row):  
  146.         tr_list = []  
  147.         tr_list = self._failover('getRow', table, row)  
  148.         if (not tr_list):  
  149.             return {}  
  150.         return tr_list[0].columns  
  151.   
  152.                   
  153.     def _checker(self,conn):  
  154.         '''''check my_conn , be sure it is connected and get data OK 
  155.         '''  
  156.         try:  
  157.             if conn.trans.isOpen():  
  158.                 CenterDb._get(conn.client)  
  159.                 return True      
  160.             else:   
  161.                 return False   
  162.               
  163.         except Exception, e:  
  164.             logerr( 'Schd%s  _checker : Connect to %s closed! Please Restart now..... Reason: %s ' % \  
  165.                 (self.serverid, conn.hp, e))  
  166.             return False  
  167.           
  168.     def _restart(self):  
  169.         '''''if my_conn failed, restart it's trans  
  170.         '''  
  171.         while True:  
  172.             #print 'come in _restart %s ' % self.my_conn.hp  
  173.             self.my_conn.trans.close()  
  174.             self.check_conn.trans.close()  
  175.             try:  
  176.                 time.sleep(2)  
  177.                 self.my_conn.trans.open()  
  178.                 self.check_conn.trans.open()  
  179.                 if self.my_conn.trans.isOpen() and self.check_conn.trans.isOpen():  
  180.                     CenterDb._get(self.check_conn.client)  
  181.                     #self.check_conn.client.getVer('keywordurl','test_for_thrift','data:word', 1)  
  182.                     return True  
  183.                 elsecontinue      
  184.             except Exception, e:  
  185.                 logerr('Schd%s _restart : Connect to %s is not restart yet ... Reason:%s ' % \  
  186.                     (self.serverid, self.my_conn.hp, e))  
  187.                 self.my_conn.trans.close()  
  188.                 self.check_conn.trans.close()  
  189.                 continue  
  190.   
  191.     def run(self):  
  192.         while True:  
  193.             time.sleep(1)  
  194.             if self._checker(self.check_conn) and (not self.switched_flag):  
  195.                 continue  
  196.             else:  
  197.                 if self._restart():  
  198.                     logerr( 'Schd%s Connection from  %s to my:%s Recovered ! ' % \  
  199.                         (self.serverid,self.cur_conn.hp,self.my_conn.hp))  
  200.                     self.cur_conn = self.my_conn  
  201.                     self.switched_flag = False  
  202.                 continue  


使用thrift生成的代码中提供的方法有:
void enableTable(Bytes tableName)
void disableTable(Bytes tableName)
bool isTableEnabled(Bytes tableName)
void compact(Bytes tableNameOrRegionName)
void majorCompact(Bytes tableNameOrRegionName)
getTableNames()
getColumnDescriptors(Text tableName)
getTableRegions(Text tableName)
void createTable(Text tableName, columnFamilies)
void deleteTable(Text tableName)
get(Text tableName, Text row, Text column)
getVer(Text tableName, Text row, Text column, i32 numVersions)
getVerTs(Text tableName, Text row, Text column, i64 timestamp, i32 numVersions)
getRow(Text tableName, Text row)
getRowWithColumns(Text tableName, Text row,  columns)
getRowTs(Text tableName, Text row, i64 timestamp)
getRowWithColumnsTs(Text tableName, Text row,  columns, i64 timestamp)
getRows(Text tableName,  rows)
getRowsWithColumns(Text tableName,  rows,  columns)
getRowsTs(Text tableName,  rows, i64 timestamp)
getRowsWithColumnsTs(Text tableName,  rows,  columns, i64 timestamp)
void mutateRow(Text tableName, Text row,  mutations)
void mutateRowTs(Text tableName, Text row,  mutations, i64 timestamp)
void mutateRows(Text tableName,  rowBatches)
void mutateRowsTs(Text tableName,  rowBatches, i64 timestamp)
i64 atomicIncrement(Text tableName, Text row, Text column, i64 value)
void deleteAll(Text tableName, Text row, Text column)
void deleteAllTs(Text tableName, Text row, Text column, i64 timestamp)
void deleteAllRow(Text tableName, Text row)
void deleteAllRowTs(Text tableName, Text row, i64 timestamp)
ScannerID scannerOpenWithScan(Text tableName, TScan scan)
ScannerID scannerOpen(Text tableName, Text startRow,  columns)
ScannerID scannerOpenWithStop(Text tableName, Text startRow, Text stopRow,  columns)
ScannerID scannerOpenWithPrefix(Text tableName, Text startAndPrefix,  columns)
ScannerID scannerOpenTs(Text tableName, Text startRow,  columns, i64 timestamp)
ScannerID scannerOpenWithStopTs(Text tableName, Text startRow, Text stopRow,  columns, i64 timestamp)
scannerGet(ScannerID id)
scannerGetList(ScannerID id, i32 nbRows)
void scannerClose(ScannerID id)


 
 参考:http://yannramin.com/2008/07/19/using-facebook-thrift-with-python-and-hbase/

[python] view plain copy
  1. <pre name="code" class="python"><pre></pre>  
  2. <pre></pre>  
  3. <pre></pre>  
  4. <pre></pre>  
  5. <pre></pre>  
  6. <pre></pre>  
  7. <pre></pre>  
  8. <pre></pre>  
  9. <pre></pre>  
  10. <pre></pre>  
  11. <pre></pre>  
  12. <pre></pre>  
  13.                     </pre> 

这篇关于Python 通过thrift接口连接Hbase读取存储数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/848229

相关文章

Java注解之超越Javadoc的元数据利器详解

《Java注解之超越Javadoc的元数据利器详解》本文将深入探讨Java注解的定义、类型、内置注解、自定义注解、保留策略、实际应用场景及最佳实践,无论是初学者还是资深开发者,都能通过本文了解如何利用... 目录什么是注解?注解的类型内置注编程解自定义注解注解的保留策略实际用例最佳实践总结在 Java 编程

Python中模块graphviz使用入门

《Python中模块graphviz使用入门》graphviz是一个用于创建和操作图形的Python库,本文主要介绍了Python中模块graphviz使用入门,具有一定的参考价值,感兴趣的可以了解一... 目录1.安装2. 基本用法2.1 输出图像格式2.2 图像style设置2.3 属性2.4 子图和聚

Python使用Matplotlib绘制3D曲面图详解

《Python使用Matplotlib绘制3D曲面图详解》:本文主要介绍Python使用Matplotlib绘制3D曲面图,在Python中,使用Matplotlib库绘制3D曲面图可以通过mpl... 目录准备工作绘制简单的 3D 曲面图绘制 3D 曲面图添加线框和透明度控制图形视角Matplotlib

一文教你Python如何快速精准抓取网页数据

《一文教你Python如何快速精准抓取网页数据》这篇文章主要为大家详细介绍了如何利用Python实现快速精准抓取网页数据,文中的示例代码简洁易懂,具有一定的借鉴价值,有需要的小伙伴可以了解下... 目录1. 准备工作2. 基础爬虫实现3. 高级功能扩展3.1 抓取文章详情3.2 保存数据到文件4. 完整示例

MySQL 多表连接操作方法(INNER JOIN、LEFT JOIN、RIGHT JOIN、FULL OUTER JOIN)

《MySQL多表连接操作方法(INNERJOIN、LEFTJOIN、RIGHTJOIN、FULLOUTERJOIN)》多表连接是一种将两个或多个表中的数据组合在一起的SQL操作,通过连接,... 目录一、 什么是多表连接?二、 mysql 支持的连接类型三、 多表连接的语法四、实战示例 数据准备五、连接的性

使用Python实现IP地址和端口状态检测与监控

《使用Python实现IP地址和端口状态检测与监控》在网络运维和服务器管理中,IP地址和端口的可用性监控是保障业务连续性的基础需求,本文将带你用Python从零打造一个高可用IP监控系统,感兴趣的小伙... 目录概述:为什么需要IP监控系统使用步骤说明1. 环境准备2. 系统部署3. 核心功能配置系统效果展

MySQL中的分组和多表连接详解

《MySQL中的分组和多表连接详解》:本文主要介绍MySQL中的分组和多表连接的相关操作,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友一起看看吧... 目录mysql中的分组和多表连接一、MySQL的分组(group javascriptby )二、多表连接(表连接会产生大量的数据垃圾)MySQL中的

基于Python打造一个智能单词管理神器

《基于Python打造一个智能单词管理神器》这篇文章主要为大家详细介绍了如何使用Python打造一个智能单词管理神器,从查询到导出的一站式解决,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 项目概述:为什么需要这个工具2. 环境搭建与快速入门2.1 环境要求2.2 首次运行配置3. 核心功能使用指

Java controller接口出入参时间序列化转换操作方法(两种)

《Javacontroller接口出入参时间序列化转换操作方法(两种)》:本文主要介绍Javacontroller接口出入参时间序列化转换操作方法,本文给大家列举两种简单方法,感兴趣的朋友一起看... 目录方式一、使用注解方式二、统一配置场景:在controller编写的接口,在前后端交互过程中一般都会涉及

Python实现微信自动锁定工具

《Python实现微信自动锁定工具》在数字化办公时代,微信已成为职场沟通的重要工具,但临时离开时忘记锁屏可能导致敏感信息泄露,下面我们就来看看如何使用Python打造一个微信自动锁定工具吧... 目录引言:当微信隐私遇到自动化守护效果展示核心功能全景图技术亮点深度解析1. 无操作检测引擎2. 微信路径智能获