9、Flink 用户自定义 Functions 及 累加器详解

2024-05-01 18:36

本文主要是介绍9、Flink 用户自定义 Functions 及 累加器详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1)用户自定义函数
1.实现接口

最基本的方法是实现提供的接口。

# 根据提供的接口创建自定义函数
class MyMapFunction implements MapFunction<String, Integer> {public Integer map(String value) { return Integer.parseInt(value); }
}# 调用创建的自定义函数
data.map(new MyMapFunction());
2.匿名类

可以将 function 当做匿名类传递。

data.map(new MapFunction<String, Integer> () {public Integer map(String value) { return Integer.parseInt(value); }
});
3.Java 8 Lambdas

Flink 在 Java API 中还支持 Java 8 Lambdas 表达式。

data.filter(s -> s.startsWith("http://"));data.reduce((i1,i2) -> i1 + i2);
4.Rich functions

所有需要用户自定义 function 的转化操作都可以将 rich function 作为参数。

class MyMapFunction implements MapFunction<String, Integer> {public Integer map(String value) { return Integer.parseInt(value); }
}

替换成

class MyMapFunction extends RichMapFunction<String, Integer> {public Integer map(String value) { return Integer.parseInt(value); }
}

并将 function 照常传递给 map transformation。

data.map(new MyMapFunction());

Rich functions 也可以定义成匿名类:

data.map (new RichMapFunction<String, Integer>() {public Integer map(String value) { return Integer.parseInt(value); }
});
2)累加器 和 计数器
1.概述

累加器是具有加法运算最终累加结果的一种简单结构,可在作业结束后使用

最简单的累加器是计数器: 可以使用 Accumulator.add(V value) 方法将其递增。

在作业结束时,Flink 会汇总(合并)所有部分的结果并将其发送给客户端,Flink 目前有如下内置累加器,每个都实现了累加器接口。

  • IntCounter , LongCounterDoubleCounter
  • Histogram(直方图): 离散数量的柱状直方图实现;在内部,它只是整形到整形的映射,可以使用它来计算值的分布,例如,单词计数程序的每行单词的分布情况【详见 Metrics】。
2.使用累加器

首先,在需要使用累加器的用户自定义的转换 function 中创建一个累加器对象(此处是计数器)。

private IntCounter numLines = new IntCounter();

其次,必须在 rich function 的 open() 方法中注册累加器对象,也可以在此处定义累加器的名称。

getRuntimeContext().addAccumulator("num-lines", this.numLines);

在操作 function 中的任何位置(包括 open()close() 方法中)使用累加器。

this.numLines.add(1);

最终整体结果会存储在由执行环境的 execute() 方法返回的 JobExecutionResult 对象中(当前只有等待作业完成后执行才起作用)。

myJobExecutionResult.getAccumulatorResult("num-lines");

单个作业的所有累加器共享一个命名空间,因此可以在不同的操作 function 里面使用同一个累加器;Flink 会在内部将所有具有相同名称的累加器合并起来。

关于累加器和迭代的注意事项:当前累加器的结果只有在整个作业结束后才可用;Flink 计划在下一次迭代中提供上一次的迭代结果;可以使用 聚合器 来计算每次迭代的统计信息,并基于此类统计信息来终止迭代。

3.定制累加器

自定义累加器只需要实现累加器接口,可以选择实现 Accumulator 或 SimpleAccumulator。

Accumulator 的实现十分灵活: 它定义了将要添加的值类型 V,并定义了最终的结果类型 R;例如,对于直方图,V 是一个数字且 R 是一个直方图。

SimpleAccumulator 适用于两种类型都相同的情况,例如计数器。

4.总结
1.通过调用 execute() 方法返回的 JobExecutionResult 对象获得累加器结果(只有等待作业完成后执行才起作用)。2.单个作业的所有累加器共享一个命名空间,可以在不同的操作 function 里面使用同一个累加器;Flink 会在内部将所有具有相同名称的累加器合并起来。

这篇关于9、Flink 用户自定义 Functions 及 累加器详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/952383

相关文章

MySQL数据库双机热备的配置方法详解

《MySQL数据库双机热备的配置方法详解》在企业级应用中,数据库的高可用性和数据的安全性是至关重要的,MySQL作为最流行的开源关系型数据库管理系统之一,提供了多种方式来实现高可用性,其中双机热备(M... 目录1. 环境准备1.1 安装mysql1.2 配置MySQL1.2.1 主服务器配置1.2.2 从

Linux kill正在执行的后台任务 kill进程组使用详解

《Linuxkill正在执行的后台任务kill进程组使用详解》文章介绍了两个脚本的功能和区别,以及执行这些脚本时遇到的进程管理问题,通过查看进程树、使用`kill`命令和`lsof`命令,分析了子... 目录零. 用到的命令一. 待执行的脚本二. 执行含子进程的脚本,并kill2.1 进程查看2.2 遇到的

MyBatis常用XML语法详解

《MyBatis常用XML语法详解》文章介绍了MyBatis常用XML语法,包括结果映射、查询语句、插入语句、更新语句、删除语句、动态SQL标签以及ehcache.xml文件的使用,感兴趣的朋友跟随小... 目录1、定义结果映射2、查询语句3、插入语句4、更新语句5、删除语句6、动态 SQL 标签7、ehc

详解SpringBoot+Ehcache使用示例

《详解SpringBoot+Ehcache使用示例》本文介绍了SpringBoot中配置Ehcache、自定义get/set方式,并实际使用缓存的过程,文中通过示例代码介绍的非常详细,对大家的学习或者... 目录摘要概念内存与磁盘持久化存储:配置灵活性:编码示例引入依赖:配置ehcache.XML文件:配置

从基础到高级详解Go语言中错误处理的实践指南

《从基础到高级详解Go语言中错误处理的实践指南》Go语言采用了一种独特而明确的错误处理哲学,与其他主流编程语言形成鲜明对比,本文将为大家详细介绍Go语言中错误处理详细方法,希望对大家有所帮助... 目录1 Go 错误处理哲学与核心机制1.1 错误接口设计1.2 错误与异常的区别2 错误创建与检查2.1 基础

k8s按需创建PV和使用PVC详解

《k8s按需创建PV和使用PVC详解》Kubernetes中,PV和PVC用于管理持久存储,StorageClass实现动态PV分配,PVC声明存储需求并绑定PV,通过kubectl验证状态,注意回收... 目录1.按需创建 PV(使用 StorageClass)创建 StorageClass2.创建 PV

Python版本信息获取方法详解与实战

《Python版本信息获取方法详解与实战》在Python开发中,获取Python版本号是调试、兼容性检查和版本控制的重要基础操作,本文详细介绍了如何使用sys和platform模块获取Python的主... 目录1. python版本号获取基础2. 使用sys模块获取版本信息2.1 sys模块概述2.1.1

一文详解Python如何开发游戏

《一文详解Python如何开发游戏》Python是一种非常流行的编程语言,也可以用来开发游戏模组,:本文主要介绍Python如何开发游戏的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录一、python简介二、Python 开发 2D 游戏的优劣势优势缺点三、Python 开发 3D

Redis 基本数据类型和使用详解

《Redis基本数据类型和使用详解》String是Redis最基本的数据类型,一个键对应一个值,它的功能十分强大,可以存储字符串、整数、浮点数等多种数据格式,本文给大家介绍Redis基本数据类型和... 目录一、Redis 入门介绍二、Redis 的五大基本数据类型2.1 String 类型2.2 Hash

Java中的.close()举例详解

《Java中的.close()举例详解》.close()方法只适用于通过window.open()打开的弹出窗口,对于浏览器的主窗口,如果没有得到用户允许是不能关闭的,:本文主要介绍Java中的.... 目录当你遇到以下三种情况时,一定要记得使用 .close():用法作用举例如何判断代码中的 input