Compressing Flink output to HDFS into gzip format

2024-06-08 21:58

This article shows how to compress the data a Flink job writes to HDFS into gzip format by plugging a custom Writer (the org.apache.flink.streaming.connectors.fs.Writer interface used by the legacy BucketingSink) into the sink. Four small classes are involved: a BaseRow record type, a wrapper that tracks the write position of a Hadoop CompressionOutputStream, an abstract writer base class, and a concrete string writer.

BaseRow.class

import java.io.Serializable;

/**
 * Holds the target partition directory and the data to be written.
 */
public class BaseRow implements Serializable {

    /** Partition directory. */
    private String partPath;

    /** Output data. */
    private String result;

    public BaseRow() {
    }

    public BaseRow(String partPath, String result) {
        this.partPath = partPath;
        this.result = result;
    }

    public String getPartPath() {
        return partPath;
    }

    public void setPartPath(String partPath) {
        this.partPath = partPath;
    }

    public String getResult() {
        return result;
    }

    public void setResult(String result) {
        this.result = result;
    }

    @Override
    public String toString() {
        return "BaseRow{" +
                "partPath='" + partPath + '\'' +
                ", result='" + result + '\'' +
                '}';
    }
}
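The partPath field is meant to decide which partition directory each record lands in. The article never shows the bucketer side of that, so here is a minimal sketch against the legacy BucketingSink API (BaseRowBucketer is a hypothetical name, not from the original):

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

public class BaseRowBucketer implements Bucketer<BaseRow> {

    private static final long serialVersionUID = 1L;

    @Override
    public Path getBucketPath(Clock clock, Path basePath, BaseRow element) {
        // e.g. basePath = /data/output, partPath = "dt=2024-06-08"
        //   -> /data/output/dt=2024-06-08
        return new Path(basePath, element.getPartPath());
    }
}

Note that the usage example near the end of the article passes null as partPath, in which case the sink's default bucketer presumably applies; the sketch only shows how partPath could be used.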

CompressionOutputStreamWrapper.class

This wrapper pairs the CompressionOutputStream with a manually tracked byte position. It exists because CompressionOutputStream exposes no usable position, while the sink calls getPos() on the writer to decide when to roll part files, so the writer has to count the uncompressed bytes itself.

import org.apache.hadoop.io.compress.CompressionOutputStream;

import java.io.Serializable;

public class CompressionOutputStreamWrapper implements Serializable {

    private CompressionOutputStream compressionOutputStream;

    private long pos;

    public CompressionOutputStreamWrapper() {
    }

    public CompressionOutputStreamWrapper(CompressionOutputStream compressionOutputStream, long pos) {
        this.compressionOutputStream = compressionOutputStream;
        this.pos = pos;
    }

    public CompressionOutputStream getCompressionOutputStream() {
        return compressionOutputStream;
    }

    public void setCompressionOutputStream(CompressionOutputStream compressionOutputStream) {
        this.compressionOutputStream = compressionOutputStream;
    }

    public long getPos() {
        return pos;
    }

    public void setPos(long pos) {
        this.pos = pos;
    }

    @Override
    public String toString() {
        return "CompressionOutputStreamWrapper{" +
                "compressionOutputStream=" + compressionOutputStream +
                ", pos=" + pos +
                '}';
    }
}

MyStreamWriterBase.class

import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;

public abstract class MyStreamWriterBase<T> implements Writer<T> {

    private static final long serialVersionUID = 2L;

    /** The {@code FSDataOutputStream} for the current part file. */
    private transient FSDataOutputStream outStream;

    /** The compression stream wrapped around {@code outStream}. */
    private transient CompressionOutputStream compressionOutputStream;

    private transient CompressionOutputStreamWrapper compressionOutputStreamWrapper;

    private boolean syncOnFlush;

    private String compressionCodec;

    public MyStreamWriterBase() {
    }

    public MyStreamWriterBase(String compressionCodec) {
        this.compressionCodec = compressionCodec;
    }

    protected MyStreamWriterBase(MyStreamWriterBase<T> other) {
        this.syncOnFlush = other.syncOnFlush;
        this.compressionCodec = other.compressionCodec;
    }

    /**
     * Controls whether to sync the {@link FSDataOutputStream} on flush.
     */
    public void setSyncOnFlush(boolean syncOnFlush) {
        this.syncOnFlush = syncOnFlush;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        if (outStream != null) {
            throw new IllegalStateException("Writer has already been opened");
        }
        outStream = fs.create(path, false);

        // Instantiate the configured Hadoop codec (e.g. GzipCodec) by class name
        // and wrap the raw HDFS stream with it.
        Class<?> codecClass;
        try {
            codecClass = Class.forName(compressionCodec);
        } catch (ClassNotFoundException e) {
            throw new IOException("Compression codec class not found: " + compressionCodec, e);
        }
        Configuration conf = fs.getConf();
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        compressionOutputStream = codec.createOutputStream(outStream);
        compressionOutputStreamWrapper = new CompressionOutputStreamWrapper(compressionOutputStream, 0);
    }

    @Override
    public long flush() throws IOException {
        if (outStream == null) {
            throw new IllegalStateException("Writer is not open");
        }
        if (!syncOnFlush) {
            compressionOutputStream.flush();
        }
        return compressionOutputStreamWrapper.getPos();
    }

    @Override
    public long getPos() throws IOException {
        if (outStream == null) {
            throw new IllegalStateException("Writer is not open");
        }
        // Return the manually tracked (uncompressed) position, not the position
        // of the underlying HDFS stream.
        return compressionOutputStreamWrapper.getPos();
    }

    @Override
    public void close() throws IOException {
        if (compressionOutputStream != null) {
            flush();
            compressionOutputStream.close();
            compressionOutputStream = null;
        }
        if (outStream != null) {
            outStream.close();
            outStream = null;
        }
    }

    public boolean isSyncOnFlush() {
        return syncOnFlush;
    }

    protected CompressionOutputStream getCompressionStream() {
        if (compressionOutputStream == null) {
            throw new IllegalStateException("Output stream has not been opened");
        }
        return compressionOutputStream;
    }

    public CompressionOutputStreamWrapper getCompressionOutputStreamWrapper() {
        return compressionOutputStreamWrapper;
    }

    public void setCompressionOutputStreamWrapper(CompressionOutputStreamWrapper compressionOutputStreamWrapper) {
        this.compressionOutputStreamWrapper = compressionOutputStreamWrapper;
    }
}
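The key mechanism in open() is instantiating a Hadoop codec by class name and wrapping the raw stream with it. Here is a self-contained sketch of that mechanism, writing to a local file instead of HDFS (the file path and demo class name are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.FileOutputStream;
import java.nio.charset.StandardCharsets;

public class CodecDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Same reflective instantiation as in MyStreamWriterBase.open()
        Class<?> codecClass = Class.forName("org.apache.hadoop.io.compress.GzipCodec");
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        try (CompressionOutputStream out =
                codec.createOutputStream(new FileOutputStream("/tmp/demo.gz"))) {
            out.write("hello gzip\n".getBytes(StandardCharsets.UTF_8));
        }
        // The result can be checked with: zcat /tmp/demo.gz
    }
}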

MyStringWriter.class

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionOutputStream;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.UnsupportedCharsetException;

public class MyStringWriter<T> extends MyStreamWriterBase<T> {

    private static final long serialVersionUID = 1L;

    private String charsetName;

    private transient Charset charset;

    /**
     * Creates a new {@code MyStringWriter} that uses the {@code "UTF-8"} charset to
     * convert strings to bytes. No compression codec is set on this path; use the
     * two-argument constructor for compressed output.
     */
    public MyStringWriter() {
        this("UTF-8");
    }

    public MyStringWriter(String compressionCodec, String charsetName) {
        super(compressionCodec);
        if (StringUtils.isBlank(charsetName)) {
            this.charsetName = "UTF-8";
        } else {
            this.charsetName = charsetName;
        }
    }

    /**
     * Creates a new {@code MyStringWriter} that uses the given charset to convert
     * strings to bytes.
     *
     * @param charsetName Name of the charset to be used, must be valid input for
     *                    {@code Charset.forName(charsetName)}
     */
    public MyStringWriter(String charsetName) {
        this.charsetName = charsetName;
    }

    protected MyStringWriter(MyStringWriter<T> other) {
        super(other);
        this.charsetName = other.charsetName;
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        try {
            this.charset = Charset.forName(charsetName);
        } catch (IllegalCharsetNameException e) {
            throw new IOException("The charset " + charsetName + " is not valid.", e);
        } catch (UnsupportedCharsetException e) {
            throw new IOException("The charset " + charsetName + " is not supported.", e);
        }
    }

    @Override
    public void write(T element) throws IOException {
        BaseRow baseRow = (BaseRow) element;
        CompressionOutputStreamWrapper wrapper = getCompressionOutputStreamWrapper();
        CompressionOutputStream outputStream = wrapper.getCompressionOutputStream();
        byte[] bytes = baseRow.getResult().getBytes(charset);
        outputStream.write(bytes);
        outputStream.write('\n');
        // Advance the manually tracked position by the uncompressed byte count,
        // including the trailing newline.
        wrapper.setPos(wrapper.getPos() + bytes.length + 1);
    }

    @Override
    public MyStringWriter<T> duplicate() {
        return new MyStringWriter<>(this);
    }

    String getCharsetName() {
        return charsetName;
    }
}

Usage

// Map the source rows into BaseRow records. Returning null from map() to drop
// bad records is fragile in Flink; a flatMap that simply skips them is safer.
DataStream<BaseRow> dataStream = rowDataStream.map((MapFunction<Row, BaseRow>) row -> {
    try {
        return new BaseRow(null, (String) row.getField(0));
    } catch (Exception ex) {
        return null;
    }
});
sink.setWriter(new MyStringWriter<>("org.apache.hadoop.io.compress.GzipCodec", null));
dataStream.addSink(sink);
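The snippet above assumes sink was created elsewhere. A fuller wiring sketch, assuming the legacy BucketingSink from flink-connector-filesystem and the hypothetical BaseRowBucketer sketched earlier (the base path and batch size are illustrative):

BucketingSink<BaseRow> sink = new BucketingSink<>("hdfs:///data/output");
sink.setBucketer(new BaseRowBucketer());
sink.setWriter(new MyStringWriter<>("org.apache.hadoop.io.compress.GzipCodec", null));
// getPos() reports uncompressed bytes, so files roll by uncompressed size.
sink.setBatchSize(128 * 1024 * 1024L);
dataStream.addSink(sink);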

That concludes this walkthrough of compressing Flink output to HDFS into gzip format; hopefully it is a useful reference.


