flink Transformation算子(更新中)

2024-06-03 02:28

本文主要是介绍flink Transformation算子(更新中),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

flink Transformation算子部分

Transformation算子

map

该方法是将一个DataStream调用map方法返回一个新的DataStream。本质是将该DataStream中对应的每一条数据依次迭代出来,应用map方法传入的计算逻辑,返回一个新的DataStream。原来的DataStream中对应的每一条数据,与新生成的DataStream中数据是一一对应的,也可以说是存在着映射关系的。
package com.lyj.sx.flink.day03;import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class MapDemo  {public static void main(String[] args) throws  Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> dataSource = env.socketTextStream("192.168.25.62", 8899);dataSource.map(new QueryCategoryNameFromMySQLFunction()).print();env.execute();}
}
 package com.lyj.sx.flink.day03;import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;public class QueryCategoryNameFromMySQLFunction extends RichMapFunction<String, Tuple4<String, String, String, Double>> {private Connection connection;private PreparedStatement preparedStatement;@Overridepublic void open(Configuration parameters) throws Exception {connection=DriverManager.getConnection("jdbc:mysql://localhost:3306/dy_flink?characterEncoding=utf-8","root","");preparedStatement=connection.prepareStatement("select name from t_category where id = ?");super.open(parameters);}@Overridepublic Tuple4<String, String, String, Double> map(String s) throws Exception {String[] fields = s.split(",");String cid = fields[0];preparedStatement.setInt(1,Integer.parseInt(cid));ResultSet resultSet = preparedStatement.executeQuery();String name = "未知";while (resultSet.next()){name = resultSet.getString(1);}resultSet.close();return  Tuple4.of(fields[0], fields[1], name, Double.parseDouble(fields[3]));}@Overridepublic void close() throws Exception {if(connection!=null){connection.close();}if(preparedStatement!=null){preparedStatement.close();}}
}

flatMap扁平化映射(DataStream → DataStream)

- 该方法是将一个DataStream调用flatMap方法返回一个新的DataStream,本质上是将该DataStream中的对应的每一条数据依次迭代出来,应用flatMap方法传入的计算逻辑,返回一个新的DataStream。原来的DataStream中输入的一条数据经过flatMap方法传入的计算逻辑后,会返回零到多条数据。所谓的扁平化即将原来的数据压平,返回多条数据。
package com.lyj.sx.flink.day03;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlatMapDemo1 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8889);SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapData = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {@Overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {String[] strings = value.split(" ");for (String string : strings) {out.collect(Tuple2.of(string, 1));}}});flatMapData.print();env.execute("pxj");}
}
package com.lyj.sx.flink.day03;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;public class FlatMapDemo2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8887);SingleOutputStreamOperator<Tuple2<String, Integer>> myflatMap = source.transform("MyflatMap", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}), new MyflatMap());myflatMap.print();env.execute("pxj");}
}
package com.lyj.sx.flink.day03;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;public  class MyflatMap extends AbstractStreamOperator<Tuple2<String, Integer>> implements OneInputStreamOperator<String, Tuple2<String, Integer>> {@Overridepublic void processElement(StreamRecord<String> element) throws Exception {String[] split = element.getValue().split(",");for (String s : split) {output.collect(element.replace(Tuple2.of(s,1)));}}@Overridepublic void setKeyContextElement(StreamRecord<String> record) throws Exception {System.out.println("StreamRecord..............");OneInputStreamOperator.super.setKeyContextElement(record);}
}

keyBy按key分区(DataStream → KeyedStream)

package com.lyj.sx.flink.day03;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KeyByDemo1 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8888);MapFunction<String, Tuple2<String, Integer>> mapFunction = new MapFunction<String, Tuple2<String, Integer>>() {Tuple2<String, Integer> t;@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {String[] strings = s.split(" ");for (String string : strings) {t = Tuple2.of(string, 1);}return t;}};source.map(mapFunction).print();env.execute("pxj");}
}
 package com.lyj.sx.flink.day03;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KeyByDemo2 {public static void main(String[] args) throws  Exception {StreamExecutionEnvironment env= StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);SingleOutputStreamOperator<Tuple3<String, String, Double>> tpStream = source.map(new MapFunction<String, Tuple3<String, String, Double>>() {@Overridepublic Tuple3<String, String, Double> map(String s) throws Exception {String[] fields = s.split(",");return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));}});KeyedStream<Tuple3<String, String, Double>, Tuple> tuple3TupleKeyedStream = tpStream.keyBy("f0", "f1");tuple3TupleKeyedStream.print();env.execute("pxj");}
}
package com.lyj.sx.flink.day03;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KeyByDemo3 {public static void main(String[] args)throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);SingleOutputStreamOperator<Tuple2<String, Integer>> map = source.map(new MapFunction<String, Tuple2<String, Integer>>() {Tuple2<String,Integer> t;@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {for (String string : s.split(",")) {t=Tuple2.of(string,1);}return t;}});map.print();KeyedStream<Tuple2<String, Integer>, String> keyedStream = map.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}});keyedStream.print();env.execute("pxj");}
}
package com.lyj.sx.flink.day03;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KeyByDemo4 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);SingleOutputStreamOperator<Tuple3<String, String, Double>> mapped = source.map(new MapFunction<String, Tuple3<String, String, Double>>() {@Overridepublic Tuple3<String, String, Double> map(String s) throws Exception {String[] fields = s.split(",");return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));}});mapped.keyBy(new KeySelector<Tuple3<String, String, Double>, String>() {@Overridepublic String getKey(Tuple3<String, String, Double> key) throws Exception {return key.f0+key.f1;}}).print();env.execute("pxj");}
}
 package com.lyj.sx.flink.day03;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class KeyByDemo5 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);//山东省,济南市,5000SingleOutputStreamOperator<Tuple3<String, String, Double>> map = source.map(new MapFunction<String, Tuple3<String, String, Double>>() {@Overridepublic Tuple3<String, String, Double> map(String s) throws Exception {String[] fields = s.split(",");return Tuple3.of(fields[0], fields[1], Double.parseDouble(fields[2]));}});KeyedStream<Tuple3<String, String, Double>, Tuple2<String, String>> tuple3Tuple2KeyedStream = map.keyBy(t -> Tuple2.of(t.f0, t.f1), TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));tuple3Tuple2KeyedStream.print();env.execute("pxj");}
}
 package com.lyj.sx.flink.day03;import com.lyj.sx.flink.bean.DataBean;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.beans.beancontext.BeanContext;public class KeyByDemo6 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);SingleOutputStreamOperator<DataBean> beanStream = source.map(new MapFunction<String, DataBean>() {@Overridepublic DataBean map(String s) throws Exception {String[] fields = s.split(",");return DataBean.of(fields[0], fields[1], Double.parseDouble(fields[2]));}});KeyedStream<DataBean, DataBean> keyedStream = beanStream.keyBy(t -> t);keyedStream.print();env.execute("pxj");}
}

filter过滤(DataStream → DataStream)

- 该方法是将一个DataStream调用filter方法返回一个新的DataStream,本质上是将该DataStream中的对应的每一条输入数据依次迭代出来,应用filter方法传入的过滤逻辑,返回一个新的DataStream。原来的DataStream中输入的一条数据经过fliter方法传入的过滤逻辑后,返回true就会保留这条数据,返回false就会过滤掉该数据。
 package com.lyj.sx.flink.day03;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FilterDemo1 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);SingleOutputStreamOperator<String> filter = source.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String s) throws Exception {return s.startsWith("h");}}).setParallelism(2);filter.print();env.execute("pxj");}
}
 package com.lyj.sx.flink.day03;import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamFilter;public class FilterDemo2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env= StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);SingleOutputStreamOperator<String> transform = source.transform("myfilter", TypeInformation.of(String.class), new StreamFilter<>(w -> w.startsWith("h")));transform.print();env.execute("pxj");}
}
package com.lyj.sx.flink.day03;import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FilterDemo3 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env= StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());DataStreamSource<String> source = env.socketTextStream("192.168.25.62", 8886);SingleOutputStreamOperator<String> transform = source.transform("MyFilterFunction", TypeInformation.of(String.class), new MyFilterFunction());transform.print();env.execute("pxj");}
}
 package com.lyj.sx.flink.day03;import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;public class MyFilterFunction extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {@Overridepublic void processElement(StreamRecord<String> element) throws Exception {String elementValue = element.getValue();if(elementValue.startsWith("h")){output.collect(element);}}@Overridepublic void setKeyContextElement(StreamRecord<String> record) throws Exception {OneInputStreamOperator.super.setKeyContextElement(record);System.out.println("setKeyContextElement.........");}
}

整理人:pxj_sx(潘陈)
日 期:2024-06-02 16:06:42

这篇关于flink Transformation算子(更新中)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1025777

相关文章

MySQL 数据库表操作完全指南:创建、读取、更新与删除实战

《MySQL数据库表操作完全指南:创建、读取、更新与删除实战》本文系统讲解MySQL表的增删查改(CURD)操作,涵盖创建、更新、查询、删除及插入查询结果,也是贯穿各类项目开发全流程的基础数据交互原... 目录mysql系列前言一、Create(创建)并插入数据1.1 单行数据 + 全列插入1.2 多行数据

linux安装、更新、卸载anaconda实践

《linux安装、更新、卸载anaconda实践》Anaconda是基于conda的科学计算环境,集成1400+包及依赖,安装需下载脚本、接受协议、设置路径、配置环境变量,更新与卸载通过conda命令... 目录随意找一个目录下载安装脚本检查许可证协议,ENTER就可以安装完毕之后激活anaconda安装更

Nginx进行平滑升级的实战指南(不中断服务版本更新)

《Nginx进行平滑升级的实战指南(不中断服务版本更新)》Nginx的平滑升级(也称为热升级)是一种在不停止服务的情况下更新Nginx版本或添加模块的方法,这种升级方式确保了服务的高可用性,避免了因升... 目录一.下载并编译新版Nginx1.下载解压2.编译二.替换可执行文件,并平滑升级1.替换可执行文件

SQL Server跟踪自动统计信息更新实战指南

《SQLServer跟踪自动统计信息更新实战指南》本文详解SQLServer自动统计信息更新的跟踪方法,推荐使用扩展事件实时捕获更新操作及详细信息,同时结合系统视图快速检查统计信息状态,重点强调修... 目录SQL Server 如何跟踪自动统计信息更新:深入解析与实战指南 核心跟踪方法1️⃣ 利用系统目录

SpringBoot中六种批量更新Mysql的方式效率对比分析

《SpringBoot中六种批量更新Mysql的方式效率对比分析》文章比较了MySQL大数据量批量更新的多种方法,指出REPLACEINTO和ONDUPLICATEKEY效率最高但存在数据风险,MyB... 目录效率比较测试结构数据库初始化测试数据批量修改方案第一种 for第二种 case when第三种

MySQL追踪数据库表更新操作来源的全面指南

《MySQL追踪数据库表更新操作来源的全面指南》本文将以一个具体问题为例,如何监测哪个IP来源对数据库表statistics_test进行了UPDATE操作,文内探讨了多种方法,并提供了详细的代码... 目录引言1. 为什么需要监控数据库更新操作2. 方法1:启用数据库审计日志(1)mysql/mariad

SpringBoot整合Apache Flink的详细指南

《SpringBoot整合ApacheFlink的详细指南》这篇文章主要为大家详细介绍了SpringBoot整合ApacheFlink的详细过程,涵盖环境准备,依赖配置,代码实现及运行步骤,感兴趣的... 目录1. 背景与目标2. 环境准备2.1 开发工具2.2 技术版本3. 创建 Spring Boot

Spring Boot 整合 Apache Flink 的详细过程

《SpringBoot整合ApacheFlink的详细过程》ApacheFlink是一个高性能的分布式流处理框架,而SpringBoot提供了快速构建企业级应用的能力,下面给大家介绍Spri... 目录Spring Boot 整合 Apache Flink 教程一、背景与目标二、环境准备三、创建项目 & 添

Oracle 通过 ROWID 批量更新表的方法

《Oracle通过ROWID批量更新表的方法》在Oracle数据库中,使用ROWID进行批量更新是一种高效的更新方法,因为它直接定位到物理行位置,避免了通过索引查找的开销,下面给大家介绍Orac... 目录oracle 通过 ROWID 批量更新表ROWID 基本概念性能优化建议性能UoTrFPH优化建议注

Redis中6种缓存更新策略详解

《Redis中6种缓存更新策略详解》Redis作为一款高性能的内存数据库,已经成为缓存层的首选解决方案,然而,使用缓存时最大的挑战在于保证缓存数据与底层数据源的一致性,本文将介绍Redis中6种缓存更... 目录引言策略一:Cache-Aside(旁路缓存)策略工作原理代码示例优缺点分析适用场景策略二:Re