59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理

本文主要是介绍59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 三、检测模式
    • 1、将模式应用到流上
    • 2、从模式中选取
      • 1)、匹配事件的选择提取(PatternSelectFunction)
      • 2)、匹配事件的选择提取(PatternFlatSelectFunction)
      • 3)、匹配事件的通用处理(PatternProcessFunction)
    • 3、处理超时的部分匹配
      • 1)、使用 PatternProcessFunction 的侧输出流
      • 2)、使用 PatternTimeoutFunction的侧输出流


本文介绍了Flink 的类库CEP的模式检测,主要介绍数据的三种选取方式以及延迟数据的处理。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本专题分为以下几篇介绍:
59、Flink CEP - Flink的复杂事件处理介绍及示例(1)-入门
59、Flink CEP - Flink的复杂事件处理介绍及示例(2)- 模式API
59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理
59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例
59、Flink CEP - Flink的复杂事件处理介绍及示例(完整版)

三、检测模式

1、将模式应用到流上

将模式应用到事件流上只要调用 CEP 类的静态方法.pattern()即可,将数据流(DataStream)和模式(Pattern)作为两个参数传入就可以了。最终得到的是一个 PatternStream:

// 代码片段,完整内容可以参考本文中的其他完整示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组
DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式
Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin( ... );// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

输入流根据你的使用场景可以是keyed或者non-keyed。

在 non-keyed 流上使用模式将会使你的作业并发度被设为1。

这里的 DataStream,也可以通过 keyBy 进行按键分区得到 KeyedStream,接下来对复杂事件的检测就会针对不同的 key 单独进行了。

模式中定义的复杂事件,发生是有先后顺序的,这里“先后”的判断标准取决于具体的时间语义。

默认情况下采用事件时间语义,那么事件会以各自的时间戳进行排序;

如果是处理时间语义,那么所谓先后就是数据到达的顺序。

对于时间戳相同或是同时到达的事件,我们还可以在 CEP.pattern()中传入一个比较器作为第三个参数,用来进行更精确的排序:

// 代码片段
DataStream<Event> input = ...;
Pattern<Event, ?> pattern = ...;
EventComparator<Event> comparator = ...; // 可选的PatternStream<Event> patternStream = CEP.pattern(loginEventDS, loginEventPattern, comparator);

得到 PatternStream 后,接下来要做的就是对匹配事件的检测处理了。

2、从模式中选取

PatternStream 的转换操作分成两种:选择提取(select)操作和处理(process)操作。与 DataStream 的转换类似,在调用API 时传入一个函数类,即选择操作传入的是一个 PatternSelectFunction,处理操作传入PatternProcessFunction。

1)、匹配事件的选择提取(PatternSelectFunction)

处理匹配事件最简单的方式,就是从 PatternStream 中直接把匹配的复杂事件提取出来,包装成想要的信息输出,这个操作就是“选择”(select)。

PatternSelectFunction:代码中基于 PatternStream 直接调用.select()方法,传入一个 PatternSelectFunction 作为参数。

  • 示例
static void test1() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}).next("second").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}).next("third").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}});// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出patternStream.select(new PatternSelectFunction<LoginEvent, String>() {@Overridepublic String select(Map<String, List<LoginEvent>> map) throws Exception {return map.get("first").toString() + " \n" + map.get("second").toString() + " \n"+ map.get("third").toString();}}).print("输出信息:\n");// 控制台输出:env.execute();}

PatternSelectFunction 是 Flink CEP 提供的一个函数类接口,需要实现一个 select()方法,这个方法每当检测到一组匹配的复杂事件时都会调用一次。它会将检测到的匹配事件保存在一个 Map 里,对应的 key 就是这些事件的名称。这里的“事件名称”就对应着在模式中定义的每个个体模式的名称;而个体模式可以是循环模式,一个名称会对应多个事件,所以最终保存在 Map 里的 value 就是一个事件的列表(List)。

如果个体模式是单例的,那么 List 中只有一个元素,直接调用.get(0)就可以把它取出。

如果个体模式是循环的,List 中就有可能有多个元素了。

可以将匹配到的事件包装成 String 类型输出,代码如下:


static void test2() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("pattern").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}).times(3) // 匹配三次.consecutive();// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出patternStream.select(new PatternSelectFunction<LoginEvent, String>() {@Overridepublic String select(Map<String, List<LoginEvent>> map) throws Exception {// list中放了一个匹配了3个事件的模式return map.get("pattern").get(0).toString() + " \n" + map.get("pattern").get(1).toString()+ " \n" + map.get("pattern").get(2).toString();}}).print("输出信息:\n");// 控制台输出:env.execute();}

2)、匹配事件的选择提取(PatternFlatSelectFunction)

要实现一个flatSelect()方法,与 select()的不同就在于没有返回值,b并且多了一个收集器(Collector)参数 out,通过调用 out.collet()方法就可以实现多次发送输出数据了。

static void test3() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("pattern").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}).times(3) // 匹配三次.consecutive();// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出patternStream.flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() {@Overridepublic void flatSelect(Map<String, List<LoginEvent>> map, Collector<String> out)throws Exception {out.collect(// list中放了一个匹配了3个事件的模式map.get("pattern").get(0).toString() + " \n" + map.get("pattern").get(1).toString()+ " \n" + map.get("pattern").get(2).toString());}}).print("输出信息:\n");// 控制台输出:env.execute();}

3)、匹配事件的通用处理(PatternProcessFunction)

在获得到一个PatternStream之后,你可以应用各种转换来发现事件序列。

推荐使用PatternProcessFunction。

PatternProcessFunction有一个processMatch的方法在每找到一个匹配的事件序列时都会被调用。 它按照Map<String, List>的格式接收一个匹配,映射的键是你的模式序列中的每个模式的名称,值是被接受的事件列表(IN是输入事件的类型)。 模式的输入事件按照时间戳进行排序。为每个模式返回一个接受的事件列表的原因是当使用循环模式(比如oneToMany()和times())时, 对一个模式会有不止一个事件被接受。

PatternProcessFunction 中必须实现一个 processMatch()方法;这个方法与之前的 flatSelect()类似,只是多了一个上下文 Context 参数。利用这个上下文可以获取当前的时间信息,比如事件的时间戳(timestamp)或者处理时间(processing time),还可以调用.output()方法将数据输出到侧输出流。在 CEP 中,侧输出流一般被用来处理超时事件。

  • 官方示例

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {@Overridepublic void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;IN startEvent = match.get("start").get(0);IN endEvent = match.get("end").get(0);out.collect(OUT(startEvent, endEvent));}
}
  • 完整示例

package org.cep;import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/** @Author: alanchan* @LastEditors: alanchan* @Description: */
public class TestCEPDemo {@Data@NoArgsConstructor@AllArgsConstructorstatic class LoginEvent {private Integer userId;private String ip;private String status;private Long timestamp;@Overridepublic boolean equals(Object obj) {if (obj instanceof LoginEvent) {LoginEvent loginEvent = (LoginEvent) obj;return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)&& this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();} else {return false;}}@Overridepublic int hashCode() {return super.hashCode() + Long.hashCode(timestamp);}}final static List<LoginEvent> loginEventList = Arrays.asList(new LoginEvent(1001, "192.168.10.1", "F", 2L),new LoginEvent(1001, "192.168.10.2", "F", 3L),new LoginEvent(1002, "192.168.10.8", "F", 4L),new LoginEvent(1001, "192.168.10.6", "F", 5L),new LoginEvent(1002, "192.168.10.8", "F", 7L),new LoginEvent(1002, "192.168.10.8", "F", 8L),new LoginEvent(1002, "192.168.10.8", "S", 6L),new LoginEvent(1003, "192.168.10.8", "F", 6L),new LoginEvent(1004, "192.168.10.3", "S", 4L));static void testProcess() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin(Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}));// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出patternStream.flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() {@Overridepublic void flatSelect(Map<String, List<LoginEvent>> pattern, Collector<String> out)throws Exception {out.collect(pattern.get("first").toString());}}).print("flatSelect输出信息:\n");patternStream.process(new PatternProcessFunction<LoginEvent, String>() {@Overridepublic void processMatch(Map<String, List<LoginEvent>> match, Context ctx, Collector<String> out)throws Exception {out.collect(match.get("first").toString());}}).print("process输出信息:\n");// 控制台输出:env.execute();}public static void main(String[] args) throws Exception {testProcess();}
}

PatternProcessFunction可以访问Context对象。有了它之后,你可以访问时间属性,比如currentProcessingTime或者当前匹配的timestamp (最新分配到匹配上的事件的时间戳)。

3、处理超时的部分匹配

当一个模式上通过within加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。可以使用TimedOutPartialMatchHandler接口 来处理超时的部分匹配。这个接口可以和其它的混合使用。也就是说你可以在自己的PatternProcessFunction里另外实现这个接口。 TimedOutPartialMatchHandler提供了另外的processTimedOutMatch方法,这个方法对每个超时的部分匹配都会调用。

  • 官方示例

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {@Overridepublic void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;...}@Overridepublic void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception;IN startEvent = match.get("start").get(0);ctx.output(outputTag, T(startEvent));}
}

processTimedOutMatch不能访问主输出。 但你可以通过Context对象把结果输出到侧输出。

前面提到的PatternProcessFunction是在Flink 1.8之后引入的,从那之后推荐使用这个接口来处理匹配到的结果。 用户仍然可以使用像select/flatSelect这样旧格式的API,它们会在内部被转换为PatternProcessFunction。

PatternStream<Event> patternStream = CEP.pattern(input, pattern);OutputTag<String> outputTag = new OutputTag<String>("side-output"){};SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(outputTag,new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {public void timeout(Map<String, List<Event>> pattern,long timeoutTimestamp,Collector<TimeoutEvent> out) throws Exception {out.collect(new TimeoutEvent());}},new PatternFlatSelectFunction<Event, ComplexEvent>() {public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception {out.collect(new ComplexEvent());}}
);DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);

1)、使用 PatternProcessFunction 的侧输出流

在 Flink CEP 中 , 提供了一个专门捕捉超时的部分匹配事件的接口TimedOutPartialMatchHandler。这个接口需要实现一个 processTimedOutMatch()方法,可以将超时的已检测到的部分匹配事件放在一个 Map 中,作为方法的第一个参数;方法的第二个参数则是 PatternProcessFunction 的上下文 Context。这个接口必须与 PatternProcessFunction结合使用,对处理结果的输出则需要利用侧输出流来进行。

官方推荐做法

完整示例如下:


package org.cep;import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/** @Author: alanchan* @LastEditors: alanchan* @Description: */
public class TestCEPDemo {@Data@NoArgsConstructor@AllArgsConstructorstatic class LoginEvent {private Integer userId;private String ip;private String status;private Long timestamp;@Overridepublic boolean equals(Object obj) {if (obj instanceof LoginEvent) {LoginEvent loginEvent = (LoginEvent) obj;return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)&& this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();} else {return false;}}@Overridepublic int hashCode() {return super.hashCode() + Long.hashCode(timestamp);}}final static List<LoginEvent> loginEventList = Arrays.asList(new LoginEvent(1001, "192.168.10.1", "F", 2L),new LoginEvent(1001, "192.168.10.2", "F", 3L),new LoginEvent(1002, "192.168.10.8", "F", 4L),new LoginEvent(1001, "192.168.10.6", "F", 5L),new LoginEvent(1002, "192.168.10.8", "F", 7L),new LoginEvent(1002, "192.168.10.8", "F", 8L),new LoginEvent(1002, "192.168.10.8", "S", 6L),new LoginEvent(1003, "192.168.10.8", "F", 6L),new LoginEvent(1005, "192.168.10.8", "F", 26L),new LoginEvent(1004, "192.168.10.3", "S", 4L));// 推荐做法static void testProcessTimedOut() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin(Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}));// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出OutputTag<String> outputTag = new OutputTag<String>("alan_ProcessTimedOut", TypeInformation.of(String.class));DataStream<String> resultStream = patternStream.process(new AlanProcessTimedOut(outputTag));// 正常流输出resultStream.print("输出信息:\n");// 超时流输出,通过OutputTag((SingleOutputStreamOperator<String>) resultStream).getSideOutput(outputTag).print("timeout输出信息:\n");// 控制台输出:env.execute();}public static void main(String[] args) throws Exception {testProcessTimedOut();}static class AlanProcessTimedOut extends PatternProcessFunction<LoginEvent, String>implements TimedOutPartialMatchHandler<LoginEvent> {private OutputTag<String> outputTag;public AlanProcessTimedOut(OutputTag<String> outputTag) {this.outputTag = outputTag;}// 超时匹配处理@Overridepublic void processTimedOutMatch(Map<String, List<LoginEvent>> match, Context ctx) throws Exception {// OutputTag<LoginEvent> outputTag = new OutputTag<LoginEvent>("AlanProcessTimedOut");ctx.output(outputTag, match.get("first").toString());}// 正常匹配处理@Overridepublic void processMatch(Map<String, List<LoginEvent>> match, Context ctx, Collector<String> out)throws Exception {out.collect(match.get("first").toString());}}
}

2)、使用 PatternTimeoutFunction的侧输出流

PatternProcessFunction通过实现TimedOutPartialMatchHandler接口扩展出了处理超时事件的能力,这是官方推荐的做法。

Flink CEP 中也保留了简化版的PatternSelectFunction,它无法直接处理超时事件,不过可以通过调用 PatternStream的.select()方法时多传入一个 PatternTimeoutFunction 参数来实现这一点。

PatternTimeoutFunction 是早期版本中用于捕获超时事件的接口。它需要实现一个 timeout()方法,同样会将部分匹配的事件放在一个 Map 中作为参数传入,此外还有一个参数是当前的时间戳。提取部分匹配事件进行处理转换后,可以将通知或报警信息输出。

在调用 PatternStream 的.select()方法时需要传入三个参数:

  • 侧输出流标签(OutputTag)
  • 超时事件处理函数 PatternTimeoutFunction
  • 匹配事件提取函数PatternSelectFunction
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/** @Author: alanchan* @LastEditors: alanchan* @Description: */
public class TestCEPDemo {@Data@NoArgsConstructor@AllArgsConstructorstatic class LoginEvent {private Integer userId;private String ip;private String status;private Long timestamp;@Overridepublic boolean equals(Object obj) {if (obj instanceof LoginEvent) {LoginEvent loginEvent = (LoginEvent) obj;return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)&& this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();} else {return false;}}@Overridepublic int hashCode() {return super.hashCode() + Long.hashCode(timestamp);}}final static List<LoginEvent> loginEventList = Arrays.asList(new LoginEvent(1001, "192.168.10.1", "F", 2L),new LoginEvent(1001, "192.168.10.2", "F", 3L),new LoginEvent(1002, "192.168.10.8", "F", 4L),new LoginEvent(1001, "192.168.10.6", "F", 5L),new LoginEvent(1002, "192.168.10.8", "F", 7L),new LoginEvent(1002, "192.168.10.8", "F", 8L),new LoginEvent(1002, "192.168.10.8", "S", 6L),new LoginEvent(1003, "192.168.10.8", "F", 6L),new LoginEvent(1005, "192.168.10.8", "F", 26L),new LoginEvent(1004, "192.168.10.3", "S", 4L));static void testProcessTimedOut2() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 按用户id分组DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp())).keyBy(loginEvent -> loginEvent.getUserId());// 定义模式Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin(Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {@Overridepublic boolean filter(LoginEvent value) throws Exception {return value.getStatus().equals("F");}}));// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStreamPatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);// 将匹配到的流选择出来输出OutputTag<String> outputTag = new OutputTag<String>("alan_ProcessTimedOut", TypeInformation.of(String.class));SingleOutputStreamOperator<String> resultStream = patternStream.select(outputTag,new PatternTimeoutFunction<LoginEvent, String>() {// 处理超时流@Overridepublic String timeout(Map<String, List<LoginEvent>> pattern, long timeoutTimestamp)throws Exception {return pattern.get("first").toString() + "  timeoutTimestamp:" + timeoutTimestamp;}}, new PatternSelectFunction<LoginEvent, String>() {// 处理正常流@Overridepublic String select(Map<String, List<LoginEvent>> pattern) throws Exception {return pattern.get("first").toString();}});// 正常流输出resultStream.print("输出信息:\n");// 超时流输出,通过OutputTagresultStream.getSideOutput(outputTag).print("timeout输出信息:\n");// 控制台输出:env.execute();}public static void main(String[] args) throws Exception {testProcessTimedOut2();}}

以上,本文介绍了Flink 的类库CEP的模式检测,主要介绍数据的三种选取方式以及延迟数据的处理。

本专题分为以下几篇介绍:
59、Flink CEP - Flink的复杂事件处理介绍及示例(1)-入门
59、Flink CEP - Flink的复杂事件处理介绍及示例(2)- 模式API
59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理
59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例
59、Flink CEP - Flink的复杂事件处理介绍及示例(完整版)

这篇关于59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/636780

相关文章

Python实现批量CSV转Excel的高性能处理方案

《Python实现批量CSV转Excel的高性能处理方案》在日常办公中,我们经常需要将CSV格式的数据转换为Excel文件,本文将介绍一个基于Python的高性能解决方案,感兴趣的小伙伴可以跟随小编一... 目录一、场景需求二、技术方案三、核心代码四、批量处理方案五、性能优化六、使用示例完整代码七、小结一、

Python中 try / except / else / finally 异常处理方法详解

《Python中try/except/else/finally异常处理方法详解》:本文主要介绍Python中try/except/else/finally异常处理方法的相关资料,涵... 目录1. 基本结构2. 各部分的作用tryexceptelsefinally3. 执行流程总结4. 常见用法(1)多个e

PHP应用中处理限流和API节流的最佳实践

《PHP应用中处理限流和API节流的最佳实践》限流和API节流对于确保Web应用程序的可靠性、安全性和可扩展性至关重要,本文将详细介绍PHP应用中处理限流和API节流的最佳实践,下面就来和小编一起学习... 目录限流的重要性在 php 中实施限流的最佳实践使用集中式存储进行状态管理(如 Redis)采用滑动

Python中logging模块用法示例总结

《Python中logging模块用法示例总结》在Python中logging模块是一个强大的日志记录工具,它允许用户将程序运行期间产生的日志信息输出到控制台或者写入到文件中,:本文主要介绍Pyt... 目录前言一. 基本使用1. 五种日志等级2.  设置报告等级3. 自定义格式4. C语言风格的格式化方法

Spring 中的切面与事务结合使用完整示例

《Spring中的切面与事务结合使用完整示例》本文给大家介绍Spring中的切面与事务结合使用完整示例,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录 一、前置知识:Spring AOP 与 事务的关系 事务本质上就是一个“切面”二、核心组件三、完

MyBatis-plus处理存储json数据过程

《MyBatis-plus处理存储json数据过程》文章介绍MyBatis-Plus3.4.21处理对象与集合的差异:对象可用内置Handler配合autoResultMap,集合需自定义处理器继承F... 目录1、如果是对象2、如果需要转换的是List集合总结对象和集合分两种情况处理,目前我用的MP的版本

sky-take-out项目中Redis的使用示例详解

《sky-take-out项目中Redis的使用示例详解》SpringCache是Spring的缓存抽象层,通过注解简化缓存管理,支持Redis等提供者,适用于方法结果缓存、更新和删除操作,但无法实现... 目录Spring Cache主要特性核心注解1.@Cacheable2.@CachePut3.@Ca

QT Creator配置Kit的实现示例

《QTCreator配置Kit的实现示例》本文主要介绍了使用Qt5.12.12与VS2022时,因MSVC编译器版本不匹配及WindowsSDK缺失导致配置错误的问题解决,感兴趣的可以了解一下... 目录0、背景:qt5.12.12+vs2022一、症状:二、原因:(可以跳过,直奔后面的解决方法)三、解决方

MySQL中On duplicate key update的实现示例

《MySQL中Onduplicatekeyupdate的实现示例》ONDUPLICATEKEYUPDATE是一种MySQL的语法,它在插入新数据时,如果遇到唯一键冲突,则会执行更新操作,而不是抛... 目录1/ ON DUPLICATE KEY UPDATE的简介2/ ON DUPLICATE KEY UP

Python中Json和其他类型相互转换的实现示例

《Python中Json和其他类型相互转换的实现示例》本文介绍了在Python中使用json模块实现json数据与dict、object之间的高效转换,包括loads(),load(),dumps()... 项目中经常会用到json格式转为object对象、dict字典格式等。在此做个记录,方便后续用到该方