探索利用 LineageLogger 获取hive的字段级血缘关系

2024-05-09 11:28

本文主要是介绍探索利用 LineageLogger 获取hive的字段级血缘关系,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

apache hive 源码中有 org.apache.hadoop.hive.ql.hooks.LineageLogger 类可以获取 insert hql 的字段之间的关系。但是又由于 org.apache.hadoop.hive.ql.optimizer.Optimizer的原因,使我们重写 hook 类无法实现字段级血缘。

  if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_LINEAGE_INFO) // 版本 4.0+加入|| postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.PostExecutePrinter")|| postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.LineageLogger")// 版本 2.3 加入|| postExecHooks.contains("org.apache.atlas.hive.hook.HiveHook")) {transformations.add(new Generator(postExecHooks));}

现在考虑通过LineageLogger 搭配日志监测服务来实现字段级血缘

  1. 加入插件 conf/hive-site.xml
  <property><name>hive.exec.post.hooks</name><value>org.apache.hadoop.hive.ql.hooks.LineageLogger</value></property>
  1. 打开日志 conf/log4j.properties
log4j.logger.org.apache.hadoop.hive.ql.hooks.LineageLogger=INFO
  1. hive任务日志目录
>set system:hive.log.dir; # 服务日志>set hive.querylog.location; #查询日志/tmp/hive-{用户名}/

4.写脚本监测

# -*- coding: utf-8 -*-
import hashlib
import json
import os.path
from json import JSONDecodeErrorimport requestslog_path_list = ["/tmp/root/hive.log"
]def read_hive_log(file_path):"""读取Hive日志文件并返回包含关键词的行内容列表参数:file_path (str):Hive日志文件的路径返回:content (list):包含关键词的行内容json列表"""save_dict = {}if os.path.exists('./hash_index.log'):try:with open("./hash_index.log", 'r') as f:file_content = f.read()if file_content != '':save_dict = json.loads(file_content)except json.JSONDecodeError as e:print(f"无法将文件内容转换为JSON:{e}")new_file = log_path.split("/")[-1]if new_file in save_dict.keys():old_size = save_dict.get(new_file).get('size', 0)line_index = save_dict.get('index', 0)else:# print("此为新文件,从头开始读取")old_size = 0line_index = 0is_new_file = Falsetry:new_size: int = os.path.getsize(file_path)except Exception as e:print("读取文件大小失败:", e)new_size = 0if (new_file not in save_dict.keys()) or (new_file in save_dict.keys() and (new_size < old_size or old_size == 0)):is_new_file = Truecontent = []is_new_file_only_one = is_old_file_only_one = is_just_info_only_one = Falsetry:with open(file_path, 'r', encoding='utf-8', errors='replace') as log_file:for line_number, line in enumerate(log_file, 1):if search_keyword in line:if is_new_file:if not is_new_file_only_one:print("是新文件,从头开始读取")is_new_file_only_one = Truecontent.append((line_number, line.split(search_keyword)[-1]))line_index = line_numberelse:if line_number >= line_index:if not is_old_file_only_one:print("是旧文件,从上次读取位置继续读取: {}".format(line_index))is_old_file_only_one = Truecontent.append((line_number, line.split(search_keyword)[-1]))line_index = line_numberexcept Exception as e:print(f"读取Hive日志文件失败:{e}")return content, new_size, line_index, new_filedef parse_vertice(vertices):"""解析顶点数据并返回顶点字典参数:vertices(list): 顶点数据列表返回值:vertex_dict(dict): 顶点字典,键为顶点ID,值为元组,包含数据库名、表名和列名(如果顶点类型为列)"""vertex_dict = {}for vertex in vertices:vertex_id = vertex.get("id", "")vertex_type = vertex.get("vertexType", "")vertex_names = vertex.get("vertexId", "").split(".")if len(vertex_names) >= 3:db_name = vertex_names[0]tb_name = vertex_names[1]col_name = vertex_names[-1] if vertex_type == "COLUMN" else ""if col_name not in partition_field:vertex_dict.setdefault(vertex_id, {"db": db_name, "tb": tb_name, "col": col_name})return vertex_dictdef parse_edge(edges):"""解析边的函数参数:edges (list): 边的列表返回值:list: 边元素的列表,每个元素为一个元组,包含源节点列表、目标节点列表和表达式"""edge_elem_list = []for edge in edges:source_arr = edge.get("sources", [])target_arr = edge.get("targets", [])expression = edge.get("expression", "")edge_type = edge.get("edgeType", "")edge_elem_list.append({"source": source_arr, "target": target_arr, "exp": expression, "type": edge_type})return edge_elem_listdef parse_lineage_log(content: list):column_info_dict = {}# 去重数据for (line_number, line) in content:try:lineage_dict = json.loads(line)vertex_dict = parse_vertice(lineage_dict.get('vertices', []))edge_list = parse_edge(lineage_dict.get('edges', []))tb, column_info = get_column_depend(vertex_dict, edge_list)column_info_dict[tb] = column_infoexcept JSONDecodeError as e:print("json解析错误: {}".format(line))print("该行错误位置: {}".format(line_number))return column_info_dictif __name__ == '__main__':print("开始启动....")log_dict = {}for log_path in log_path_list:contents, file_size, index, new_file_name = read_hive_log(log_path)column_info_dicts = parse_lineage_log(contents)print("{} 文件执行完".format(log_path))log_dict.setdefault(log_path.split('/')[-1], dict(size=file_size, index=index, file=new_file_name))with open("./hash_index.log", 'w') as f:f.write(json.dumps(log_dict))print("执行结束...")

这篇关于探索利用 LineageLogger 获取hive的字段级血缘关系的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/973283

相关文章

Python版本信息获取方法详解与实战

《Python版本信息获取方法详解与实战》在Python开发中,获取Python版本号是调试、兼容性检查和版本控制的重要基础操作,本文详细介绍了如何使用sys和platform模块获取Python的主... 目录1. python版本号获取基础2. 使用sys模块获取版本信息2.1 sys模块概述2.1.1

Java发送SNMP至交换机获取交换机状态实现方式

《Java发送SNMP至交换机获取交换机状态实现方式》文章介绍使用SNMP4J库(2.7.0)通过RCF1213-MIB协议获取交换机单/多路状态,需开启SNMP支持,重点对比SNMPv1、v2c、v... 目录交换机协议SNMP库获取交换机单路状态获取交换机多路状态总结交换机协议这里使用的交换机协议为常

MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决

《MyBatis/MyBatis-Plus同事务循环调用存储过程获取主键重复问题分析及解决》MyBatis默认开启一级缓存,同一事务中循环调用查询方法时会重复使用缓存数据,导致获取的序列主键值均为1,... 目录问题原因解决办法如果是存储过程总结问题myBATis有如下代码获取序列作为主键IdMappe

C#使用iText获取PDF的trailer数据的代码示例

《C#使用iText获取PDF的trailer数据的代码示例》开发程序debug的时候,看到了PDF有个trailer数据,挺有意思,于是考虑用代码把它读出来,那么就用到我们常用的iText框架了,所... 目录引言iText 核心概念C# 代码示例步骤 1: 确保已安装 iText步骤 2: C# 代码程

Spring Boot中获取IOC容器的多种方式

《SpringBoot中获取IOC容器的多种方式》本文主要介绍了SpringBoot中获取IOC容器的多种方式,包括直接注入、实现ApplicationContextAware接口、通过Spring... 目录1. 直接注入ApplicationContext2. 实现ApplicationContextA

python获取指定名字的程序的文件路径的两种方法

《python获取指定名字的程序的文件路径的两种方法》本文主要介绍了python获取指定名字的程序的文件路径的两种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要... 最近在做项目,需要用到给定一个程序名字就可以自动获取到这个程序在Windows系统下的绝对路径,以下

SpringBoot 获取请求参数的常用注解及用法

《SpringBoot获取请求参数的常用注解及用法》SpringBoot通过@RequestParam、@PathVariable等注解支持从HTTP请求中获取参数,涵盖查询、路径、请求体、头、C... 目录SpringBoot 提供了多种注解来方便地从 HTTP 请求中获取参数以下是主要的注解及其用法:1

解决hive启动时java.net.ConnectException:拒绝连接的问题

《解决hive启动时java.net.ConnectException:拒绝连接的问题》Hadoop集群连接被拒,需检查集群是否启动、关闭防火墙/SELinux、确认安全模式退出,若问题仍存,查看日志... 目录错误发生原因解决方式1.关闭防火墙2.关闭selinux3.启动集群4.检查集群是否正常启动5.

Python获取浏览器Cookies的四种方式小结

《Python获取浏览器Cookies的四种方式小结》在进行Web应用程序测试和开发时,获取浏览器Cookies是一项重要任务,本文我们介绍四种用Python获取浏览器Cookies的方式,具有一定的... 目录什么是 Cookie?1.使用Selenium库获取浏览器Cookies2.使用浏览器开发者工具

Java获取当前时间String类型和Date类型方式

《Java获取当前时间String类型和Date类型方式》:本文主要介绍Java获取当前时间String类型和Date类型方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,... 目录Java获取当前时间String和Date类型String类型和Date类型输出结果总结Java获取