HIVE 处理日志,自定义inputformat 完整版

2023-12-09 04:09

本文主要是介绍HIVE 处理日志,自定义inputformat 完整版,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

网上找了很多材料都是写了部份代码的,今天在峰哥的帮助下实现了此功能。


为何要设置此功能是由于 hive fields terminated by '||||' 不支持 字符串导致


将你的inputformat类打成jar包,如MyInputFormat.jar
将MyInputFormat.jar放到 hive/lib里,然后就可以建表了
假设你的inputFormat类路径是com.hive.myinput
则建表语句为:create table tbname(name stirng,id int, ...) stored as INPUTFORMAT 'com.hive.myinput' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

HiveIgnoreKeyTextOutputFormat是系统自带的outputformat类,你也可以自定义

由于hive是基于hadoop集群运行的,所以hadoop/lib里面也必须放入MyInputFormat.jar,


此功能需要二个CLASS 类:ClickstreamInputFormat ClickstreamRecordReader


package com.jd.cloud.clickstore;import java.io.IOException; import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileSplit; 
import org.apache.hadoop.mapred.InputSplit; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.JobConfigurable; 
import org.apache.hadoop.mapred.RecordReader; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.mapred.TextInputFormat;/** 
* 自定义hadoop的 org.apache.hadoop.mapred.InputFormat 
* 
* @author winston 
* 
*/ 
public class ClickstreamInputFormat extends TextInputFormat implements JobConfigurable { public RecordReader<LongWritable, Text> getRecordReader( InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(genericSplit.toString()); return new ClickstreamRecordReader((FileSplit) genericSplit,job); } 
} 



package com.jd.cloud.clickstore;import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.mapred.RecordReader;public class ClickstreamRecordReader implementsRecordReader<LongWritable, Text> {private CompressionCodecFactory compressionCodecs = null;private long start;private long pos;private long end;private LineReader lineReader;int maxLineLength;public ClickstreamRecordReader(FileSplit inputSplit, Configuration job)throws IOException {maxLineLength = job.getInt("mapred.ClickstreamRecordReader.maxlength",Integer.MAX_VALUE);start = inputSplit.getStart();end = start + inputSplit.getLength();final Path file = inputSplit.getPath();compressionCodecs = new CompressionCodecFactory(job);final CompressionCodec codec = compressionCodecs.getCodec(file);// Open file and seek to the start of the splitFileSystem fs = file.getFileSystem(job);FSDataInputStream fileIn = fs.open(file);boolean skipFirstLine = false;if (codec != null) {lineReader = new LineReader(codec.createInputStream(fileIn), job);end = Long.MAX_VALUE;} else {if (start != 0) {skipFirstLine = true;--start;fileIn.seek(start);}lineReader = new LineReader(fileIn, job);}if (skipFirstLine) {start += lineReader.readLine(new Text(), 0,(int) Math.min((long) Integer.MAX_VALUE, end - start));}this.pos = start;}public ClickstreamRecordReader(InputStream in, long offset, long endOffset,int maxLineLength) {this.maxLineLength = maxLineLength;this.lineReader = new LineReader(in);this.start = offset;this.pos = offset;this.end = endOffset;}public ClickstreamRecordReader(InputStream in, long offset, long endOffset,Configuration job) throws IOException {this.maxLineLength = job.getInt("mapred.ClickstreamRecordReader.maxlength", Integer.MAX_VALUE);this.lineReader = new LineReader(in, job);this.start = offset;this.pos = offset;this.end = endOffset;}public LongWritable createKey() {return new LongWritable();}public Text createValue() {return new Text();}/*** Reads the next record in the split. get usefull fields from the raw nginx* log.* * @param key* key of the record which will map to the byte offset of the* record's line* @param value* the record in text format* @return true if a record existed, false otherwise* @throws IOException*/public synchronized boolean next(LongWritable key, Text value)throws IOException {// Stay within the splitwhile (pos < end) {key.set(pos);int newSize = lineReader.readLine(value, maxLineLength,Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),maxLineLength));if (newSize == 0)return false;String str = value.toString().toLowerCase().replaceAll("\\@\\_\\@", "\001");value.set(str);pos += newSize;if (newSize < maxLineLength)return true;}return false;}public float getProgress() {if (start == end) {return 0.0f;} else {return Math.min(1.0f, (pos - start) / (float) (end - start));}}public synchronized long getPos() throws IOException {return pos;}public synchronized void close() throws IOException {if (lineReader != null)lineReader.close();}// 测试 输出//public static void main(String ags[]){// String str1 ="123@_@abcd@_@fk".replaceAll("\\@\\_\\@", "\001");// System.out.println(str1);//}
}




1.上传到 HIVE 服务器上 JAVAC 编译

javac -cp ./:/usr/lib/hadoop/hadoop-common.jar:/home/op1/hadoop/hadoop-core-1.0.3.jar:/usr/lib/hadoop/lib/commons-logging-1.1.1.jar */**/*/*/*


2.JAR 打包 类文件

jar -cf ClickstreamInputFormat.jar /home/op1/uerdwdb/src/

3.复制 Hive/lib Hadoop/lib 文件夹内


4.Hive 创建表命令

create table hive_text(num int,name string,`add` string)
stored as INPUTFORMAT 'com.jd.cloud.clickstore.ClickstreamInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' 
location '/home/op1/uerdwdb/text.txt';

这篇关于HIVE 处理日志,自定义inputformat 完整版的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/472421

相关文章

Spring Boot @RestControllerAdvice全局异常处理最佳实践

《SpringBoot@RestControllerAdvice全局异常处理最佳实践》本文详解SpringBoot中通过@RestControllerAdvice实现全局异常处理,强调代码复用、统... 目录前言一、为什么要使用全局异常处理?二、核心注解解析1. @RestControllerAdvice2

怎样通过分析GC日志来定位Java进程的内存问题

《怎样通过分析GC日志来定位Java进程的内存问题》:本文主要介绍怎样通过分析GC日志来定位Java进程的内存问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、GC 日志基础配置1. 启用详细 GC 日志2. 不同收集器的日志格式二、关键指标与分析维度1.

解读GC日志中的各项指标用法

《解读GC日志中的各项指标用法》:本文主要介绍GC日志中的各项指标用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、基础 GC 日志格式(以 G1 为例)1. Minor GC 日志2. Full GC 日志二、关键指标解析1. GC 类型与触发原因2. 堆

Java实现自定义table宽高的示例代码

《Java实现自定义table宽高的示例代码》在桌面应用、管理系统乃至报表工具中,表格(JTable)作为最常用的数据展示组件,不仅承载对数据的增删改查,还需要配合布局与视觉需求,而JavaSwing... 目录一、项目背景详细介绍二、项目需求详细介绍三、相关技术详细介绍四、实现思路详细介绍五、完整实现代码

一文详解Java Stream的sorted自定义排序

《一文详解JavaStream的sorted自定义排序》Javastream中的sorted方法是用于对流中的元素进行排序的方法,它可以接受一个comparator参数,用于指定排序规则,sorte... 目录一、sorted 操作的基础原理二、自定义排序的实现方式1. Comparator 接口的 Lam

MySQL 打开binlog日志的方法及注意事项

《MySQL打开binlog日志的方法及注意事项》本文给大家介绍MySQL打开binlog日志的方法及注意事项,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要... 目录一、默认状态二、如何检查 binlog 状态三、如何开启 binlog3.1 临时开启(重启后失效)

电脑提示xlstat4.dll丢失怎么修复? xlstat4.dll文件丢失处理办法

《电脑提示xlstat4.dll丢失怎么修复?xlstat4.dll文件丢失处理办法》长时间使用电脑,大家多少都会遇到类似dll文件丢失的情况,不过,解决这一问题其实并不复杂,下面我们就来看看xls... 在Windows操作系统中,xlstat4.dll是一个重要的动态链接库文件,通常用于支持各种应用程序

SQL Server数据库死锁处理超详细攻略

《SQLServer数据库死锁处理超详细攻略》SQLServer作为主流数据库管理系统,在高并发场景下可能面临死锁问题,影响系统性能和稳定性,这篇文章主要给大家介绍了关于SQLServer数据库死... 目录一、引言二、查询 Sqlserver 中造成死锁的 SPID三、用内置函数查询执行信息1. sp_w

Java对异常的认识与异常的处理小结

《Java对异常的认识与异常的处理小结》Java程序在运行时可能出现的错误或非正常情况称为异常,下面给大家介绍Java对异常的认识与异常的处理,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参... 目录一、认识异常与异常类型。二、异常的处理三、总结 一、认识异常与异常类型。(1)简单定义-什么是

SpringBoot项目配置logback-spring.xml屏蔽特定路径的日志

《SpringBoot项目配置logback-spring.xml屏蔽特定路径的日志》在SpringBoot项目中,使用logback-spring.xml配置屏蔽特定路径的日志有两种常用方式,文中的... 目录方案一:基础配置(直接关闭目标路径日志)方案二:结合 Spring Profile 按环境屏蔽关