探索利用 LineageLogger 获取hive的字段级血缘关系

2024-05-09 11:28

本文主要是介绍探索利用 LineageLogger 获取hive的字段级血缘关系,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

apache hive 源码中有 org.apache.hadoop.hive.ql.hooks.LineageLogger 类可以获取 insert hql 的字段之间的关系。但是又由于 org.apache.hadoop.hive.ql.optimizer.Optimizer的原因,使我们重写 hook 类无法实现字段级血缘。

  if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_LINEAGE_INFO) // 版本 4.0+加入|| postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.PostExecutePrinter")|| postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.LineageLogger")// 版本 2.3 加入|| postExecHooks.contains("org.apache.atlas.hive.hook.HiveHook")) {transformations.add(new Generator(postExecHooks));}

现在考虑通过LineageLogger 搭配日志监测服务来实现字段级血缘

  1. 加入插件 conf/hive-site.xml
  <property><name>hive.exec.post.hooks</name><value>org.apache.hadoop.hive.ql.hooks.LineageLogger</value></property>
  1. 打开日志 conf/log4j.properties
log4j.logger.org.apache.hadoop.hive.ql.hooks.LineageLogger=INFO
  1. hive任务日志目录
>set system:hive.log.dir; # 服务日志>set hive.querylog.location; #查询日志/tmp/hive-{用户名}/

4.写脚本监测

# -*- coding: utf-8 -*-
import hashlib
import json
import os.path
from json import JSONDecodeErrorimport requestslog_path_list = ["/tmp/root/hive.log"
]def read_hive_log(file_path):"""读取Hive日志文件并返回包含关键词的行内容列表参数:file_path (str):Hive日志文件的路径返回:content (list):包含关键词的行内容json列表"""save_dict = {}if os.path.exists('./hash_index.log'):try:with open("./hash_index.log", 'r') as f:file_content = f.read()if file_content != '':save_dict = json.loads(file_content)except json.JSONDecodeError as e:print(f"无法将文件内容转换为JSON:{e}")new_file = log_path.split("/")[-1]if new_file in save_dict.keys():old_size = save_dict.get(new_file).get('size', 0)line_index = save_dict.get('index', 0)else:# print("此为新文件,从头开始读取")old_size = 0line_index = 0is_new_file = Falsetry:new_size: int = os.path.getsize(file_path)except Exception as e:print("读取文件大小失败:", e)new_size = 0if (new_file not in save_dict.keys()) or (new_file in save_dict.keys() and (new_size < old_size or old_size == 0)):is_new_file = Truecontent = []is_new_file_only_one = is_old_file_only_one = is_just_info_only_one = Falsetry:with open(file_path, 'r', encoding='utf-8', errors='replace') as log_file:for line_number, line in enumerate(log_file, 1):if search_keyword in line:if is_new_file:if not is_new_file_only_one:print("是新文件,从头开始读取")is_new_file_only_one = Truecontent.append((line_number, line.split(search_keyword)[-1]))line_index = line_numberelse:if line_number >= line_index:if not is_old_file_only_one:print("是旧文件,从上次读取位置继续读取: {}".format(line_index))is_old_file_only_one = Truecontent.append((line_number, line.split(search_keyword)[-1]))line_index = line_numberexcept Exception as e:print(f"读取Hive日志文件失败:{e}")return content, new_size, line_index, new_filedef parse_vertice(vertices):"""解析顶点数据并返回顶点字典参数:vertices(list): 顶点数据列表返回值:vertex_dict(dict): 顶点字典,键为顶点ID,值为元组,包含数据库名、表名和列名(如果顶点类型为列)"""vertex_dict = {}for vertex in vertices:vertex_id = vertex.get("id", "")vertex_type = vertex.get("vertexType", "")vertex_names = vertex.get("vertexId", "").split(".")if len(vertex_names) >= 3:db_name = vertex_names[0]tb_name = vertex_names[1]col_name = vertex_names[-1] if vertex_type == "COLUMN" else ""if col_name not in partition_field:vertex_dict.setdefault(vertex_id, {"db": db_name, "tb": tb_name, "col": col_name})return vertex_dictdef parse_edge(edges):"""解析边的函数参数:edges (list): 边的列表返回值:list: 边元素的列表,每个元素为一个元组,包含源节点列表、目标节点列表和表达式"""edge_elem_list = []for edge in edges:source_arr = edge.get("sources", [])target_arr = edge.get("targets", [])expression = edge.get("expression", "")edge_type = edge.get("edgeType", "")edge_elem_list.append({"source": source_arr, "target": target_arr, "exp": expression, "type": edge_type})return edge_elem_listdef parse_lineage_log(content: list):column_info_dict = {}# 去重数据for (line_number, line) in content:try:lineage_dict = json.loads(line)vertex_dict = parse_vertice(lineage_dict.get('vertices', []))edge_list = parse_edge(lineage_dict.get('edges', []))tb, column_info = get_column_depend(vertex_dict, edge_list)column_info_dict[tb] = column_infoexcept JSONDecodeError as e:print("json解析错误: {}".format(line))print("该行错误位置: {}".format(line_number))return column_info_dictif __name__ == '__main__':print("开始启动....")log_dict = {}for log_path in log_path_list:contents, file_size, index, new_file_name = read_hive_log(log_path)column_info_dicts = parse_lineage_log(contents)print("{} 文件执行完".format(log_path))log_dict.setdefault(log_path.split('/')[-1], dict(size=file_size, index=index, file=new_file_name))with open("./hash_index.log", 'w') as f:f.write(json.dumps(log_dict))print("执行结束...")

这篇关于探索利用 LineageLogger 获取hive的字段级血缘关系的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/973283

相关文章

SpringBoot服务获取Pod当前IP的两种方案

《SpringBoot服务获取Pod当前IP的两种方案》在Kubernetes集群中,SpringBoot服务获取Pod当前IP的方案主要有两种,通过环境变量注入或通过Java代码动态获取网络接口IP... 目录方案一:通过 Kubernetes Downward API 注入环境变量原理步骤方案二:通过

使用Python实现获取屏幕像素颜色值

《使用Python实现获取屏幕像素颜色值》这篇文章主要为大家详细介绍了如何使用Python实现获取屏幕像素颜色值,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 一、一个小工具,按住F10键,颜色值会跟着显示。完整代码import tkinter as tkimport pyau

python获取cmd环境变量值的实现代码

《python获取cmd环境变量值的实现代码》:本文主要介绍在Python中获取命令行(cmd)环境变量的值,可以使用标准库中的os模块,需要的朋友可以参考下... 前言全局说明在执行py过程中,总要使用到系统环境变量一、说明1.1 环境:Windows 11 家庭版 24H2 26100.4061

使用Python获取JS加载的数据的多种实现方法

《使用Python获取JS加载的数据的多种实现方法》在当今的互联网时代,网页数据的动态加载已经成为一种常见的技术手段,许多现代网站通过JavaScript(JS)动态加载内容,这使得传统的静态网页爬取... 目录引言一、动态 网页与js加载数据的原理二、python爬取JS加载数据的方法(一)分析网络请求1

通过cmd获取网卡速率的代码

《通过cmd获取网卡速率的代码》今天从群里看到通过bat获取网卡速率两段代码,感觉还不错,学习bat的朋友可以参考一下... 1、本机有线网卡支持的最高速度:%v%@echo off & setlocal enabledelayedexpansionecho 代码开始echo 65001编码获取: >

使用Python实现调用API获取图片存储到本地的方法

《使用Python实现调用API获取图片存储到本地的方法》开发一个自动化工具,用于从JSON数据源中提取图像ID,通过调用指定API获取未经压缩的原始图像文件,并确保下载结果与Postman等工具直接... 目录使用python实现调用API获取图片存储到本地1、项目概述2、核心功能3、环境准备4、代码实现

Python实现获取带合并单元格的表格数据

《Python实现获取带合并单元格的表格数据》由于在日常运维中经常出现一些合并单元格的表格,如果要获取数据比较麻烦,所以本文我们就来聊聊如何使用Python实现获取带合并单元格的表格数据吧... 由于在日常运维中经常出现一些合并单元格的表格,如果要获取数据比较麻烦,现将将封装成类,并通过调用list_exc

通过C#获取Excel单元格的数据类型的方法详解

《通过C#获取Excel单元格的数据类型的方法详解》在处理Excel文件时,了解单元格的数据类型有助于我们正确地解析和处理数据,本文将详细介绍如何使用FreeSpire.XLS来获取Excel单元格的... 目录引言环境配置6种常见数据类型C# 读取单元格数据类型引言在处理 Excel 文件时,了解单元格

Java根据IP地址实现归属地获取

《Java根据IP地址实现归属地获取》Ip2region是一个离线IP地址定位库和IP定位数据管理框架,这篇文章主要为大家详细介绍了Java如何使用Ip2region实现根据IP地址获取归属地,感兴趣... 目录一、使用Ip2region离线获取1、Ip2region简介2、导包3、下编程载xdb文件4、J

SpringBoot整合mybatisPlus实现批量插入并获取ID详解

《SpringBoot整合mybatisPlus实现批量插入并获取ID详解》这篇文章主要为大家详细介绍了SpringBoot如何整合mybatisPlus实现批量插入并获取ID,文中的示例代码讲解详细... 目录【1】saveBATch(一万条数据总耗时:2478ms)【2】集合方式foreach(一万条数