HIVE 处理日志,自定义inputformat 完整版

2023-12-09 04:09

本文主要是介绍HIVE 处理日志,自定义inputformat 完整版,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

网上找了很多材料都是写了部份代码的,今天在峰哥的帮助下实现了此功能。


为何要设置此功能是由于 hive fields terminated by '||||' 不支持 字符串导致


将你的inputformat类打成jar包,如MyInputFormat.jar
将MyInputFormat.jar放到 hive/lib里,然后就可以建表了
假设你的inputFormat类路径是com.hive.myinput
则建表语句为:create table tbname(name stirng,id int, ...) stored as INPUTFORMAT 'com.hive.myinput' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'

HiveIgnoreKeyTextOutputFormat是系统自带的outputformat类,你也可以自定义

由于hive是基于hadoop集群运行的,所以hadoop/lib里面也必须放入MyInputFormat.jar,


此功能需要二个CLASS 类:ClickstreamInputFormat ClickstreamRecordReader


package com.jd.cloud.clickstore;import java.io.IOException; import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.FileSplit; 
import org.apache.hadoop.mapred.InputSplit; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.JobConfigurable; 
import org.apache.hadoop.mapred.RecordReader; 
import org.apache.hadoop.mapred.Reporter; 
import org.apache.hadoop.mapred.TextInputFormat;/** 
* 自定义hadoop的 org.apache.hadoop.mapred.InputFormat 
* 
* @author winston 
* 
*/ 
public class ClickstreamInputFormat extends TextInputFormat implements JobConfigurable { public RecordReader<LongWritable, Text> getRecordReader( InputSplit genericSplit, JobConf job, Reporter reporter) throws IOException { reporter.setStatus(genericSplit.toString()); return new ClickstreamRecordReader((FileSplit) genericSplit,job); } 
} 



package com.jd.cloud.clickstore;import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.mapred.RecordReader;public class ClickstreamRecordReader implementsRecordReader<LongWritable, Text> {private CompressionCodecFactory compressionCodecs = null;private long start;private long pos;private long end;private LineReader lineReader;int maxLineLength;public ClickstreamRecordReader(FileSplit inputSplit, Configuration job)throws IOException {maxLineLength = job.getInt("mapred.ClickstreamRecordReader.maxlength",Integer.MAX_VALUE);start = inputSplit.getStart();end = start + inputSplit.getLength();final Path file = inputSplit.getPath();compressionCodecs = new CompressionCodecFactory(job);final CompressionCodec codec = compressionCodecs.getCodec(file);// Open file and seek to the start of the splitFileSystem fs = file.getFileSystem(job);FSDataInputStream fileIn = fs.open(file);boolean skipFirstLine = false;if (codec != null) {lineReader = new LineReader(codec.createInputStream(fileIn), job);end = Long.MAX_VALUE;} else {if (start != 0) {skipFirstLine = true;--start;fileIn.seek(start);}lineReader = new LineReader(fileIn, job);}if (skipFirstLine) {start += lineReader.readLine(new Text(), 0,(int) Math.min((long) Integer.MAX_VALUE, end - start));}this.pos = start;}public ClickstreamRecordReader(InputStream in, long offset, long endOffset,int maxLineLength) {this.maxLineLength = maxLineLength;this.lineReader = new LineReader(in);this.start = offset;this.pos = offset;this.end = endOffset;}public ClickstreamRecordReader(InputStream in, long offset, long endOffset,Configuration job) throws IOException {this.maxLineLength = job.getInt("mapred.ClickstreamRecordReader.maxlength", Integer.MAX_VALUE);this.lineReader = new LineReader(in, job);this.start = offset;this.pos = offset;this.end = endOffset;}public LongWritable createKey() {return new LongWritable();}public Text createValue() {return new Text();}/*** Reads the next record in the split. get usefull fields from the raw nginx* log.* * @param key* key of the record which will map to the byte offset of the* record's line* @param value* the record in text format* @return true if a record existed, false otherwise* @throws IOException*/public synchronized boolean next(LongWritable key, Text value)throws IOException {// Stay within the splitwhile (pos < end) {key.set(pos);int newSize = lineReader.readLine(value, maxLineLength,Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),maxLineLength));if (newSize == 0)return false;String str = value.toString().toLowerCase().replaceAll("\\@\\_\\@", "\001");value.set(str);pos += newSize;if (newSize < maxLineLength)return true;}return false;}public float getProgress() {if (start == end) {return 0.0f;} else {return Math.min(1.0f, (pos - start) / (float) (end - start));}}public synchronized long getPos() throws IOException {return pos;}public synchronized void close() throws IOException {if (lineReader != null)lineReader.close();}// 测试 输出//public static void main(String ags[]){// String str1 ="123@_@abcd@_@fk".replaceAll("\\@\\_\\@", "\001");// System.out.println(str1);//}
}




1.上传到 HIVE 服务器上 JAVAC 编译

javac -cp ./:/usr/lib/hadoop/hadoop-common.jar:/home/op1/hadoop/hadoop-core-1.0.3.jar:/usr/lib/hadoop/lib/commons-logging-1.1.1.jar */**/*/*/*


2.JAR 打包 类文件

jar -cf ClickstreamInputFormat.jar /home/op1/uerdwdb/src/

3.复制 Hive/lib Hadoop/lib 文件夹内


4.Hive 创建表命令

create table hive_text(num int,name string,`add` string)
stored as INPUTFORMAT 'com.jd.cloud.clickstore.ClickstreamInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' 
location '/home/op1/uerdwdb/text.txt';

这篇关于HIVE 处理日志,自定义inputformat 完整版的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/472421

相关文章

Java 中的 @SneakyThrows 注解使用方法(简化异常处理的利与弊)

《Java中的@SneakyThrows注解使用方法(简化异常处理的利与弊)》为了简化异常处理,Lombok提供了一个强大的注解@SneakyThrows,本文将详细介绍@SneakyThro... 目录1. @SneakyThrows 简介 1.1 什么是 Lombok?2. @SneakyThrows

在 Spring Boot 中实现异常处理最佳实践

《在SpringBoot中实现异常处理最佳实践》本文介绍如何在SpringBoot中实现异常处理,涵盖核心概念、实现方法、与先前查询的集成、性能分析、常见问题和最佳实践,感兴趣的朋友一起看看吧... 目录一、Spring Boot 异常处理的背景与核心概念1.1 为什么需要异常处理?1.2 Spring B

python处理带有时区的日期和时间数据

《python处理带有时区的日期和时间数据》这篇文章主要为大家详细介绍了如何在Python中使用pytz库处理时区信息,包括获取当前UTC时间,转换为特定时区等,有需要的小伙伴可以参考一下... 目录时区基本信息python datetime使用timezonepandas处理时区数据知识延展时区基本信息

Spring Security自定义身份认证的实现方法

《SpringSecurity自定义身份认证的实现方法》:本文主要介绍SpringSecurity自定义身份认证的实现方法,下面对SpringSecurity的这三种自定义身份认证进行详细讲解,... 目录1.内存身份认证(1)创建配置类(2)验证内存身份认证2.JDBC身份认证(1)数据准备 (2)配置依

Python Transformers库(NLP处理库)案例代码讲解

《PythonTransformers库(NLP处理库)案例代码讲解》本文介绍transformers库的全面讲解,包含基础知识、高级用法、案例代码及学习路径,内容经过组织,适合不同阶段的学习者,对... 目录一、基础知识1. Transformers 库简介2. 安装与环境配置3. 快速上手示例二、核心模

一文详解Java异常处理你都了解哪些知识

《一文详解Java异常处理你都了解哪些知识》:本文主要介绍Java异常处理的相关资料,包括异常的分类、捕获和处理异常的语法、常见的异常类型以及自定义异常的实现,文中通过代码介绍的非常详细,需要的朋... 目录前言一、什么是异常二、异常的分类2.1 受检异常2.2 非受检异常三、异常处理的语法3.1 try-

Python使用getopt处理命令行参数示例解析(最佳实践)

《Python使用getopt处理命令行参数示例解析(最佳实践)》getopt模块是Python标准库中一个简单但强大的命令行参数处理工具,它特别适合那些需要快速实现基本命令行参数解析的场景,或者需要... 目录为什么需要处理命令行参数?getopt模块基础实际应用示例与其他参数处理方式的比较常见问http

Java Response返回值的最佳处理方案

《JavaResponse返回值的最佳处理方案》在开发Web应用程序时,我们经常需要通过HTTP请求从服务器获取响应数据,这些数据可以是JSON、XML、甚至是文件,本篇文章将详细解析Java中处理... 目录摘要概述核心问题:关键技术点:源码解析示例 1:使用HttpURLConnection获取Resp

Java中Switch Case多个条件处理方法举例

《Java中SwitchCase多个条件处理方法举例》Java中switch语句用于根据变量值执行不同代码块,适用于多个条件的处理,:本文主要介绍Java中SwitchCase多个条件处理的相... 目录前言基本语法处理多个条件示例1:合并相同代码的多个case示例2:通过字符串合并多个case进阶用法使用

Java实现优雅日期处理的方案详解

《Java实现优雅日期处理的方案详解》在我们的日常工作中,需要经常处理各种格式,各种类似的的日期或者时间,下面我们就来看看如何使用java处理这样的日期问题吧,感兴趣的小伙伴可以跟随小编一起学习一下... 目录前言一、日期的坑1.1 日期格式化陷阱1.2 时区转换二、优雅方案的进阶之路2.1 线程安全重构2