首页 > 编程学习 > 【Flink】复杂事件处理CEP底层实现(有限状态机)和应用

文章目录

  • 一 Flink CEP简介
    • 1 什么是复杂事件处理CEP
    • 2 Flink CEP
      • (1)导入依赖
      • (2)代码编写
      • (3)优化模板
    • 3 实现CEP底层 -- 有限状态机
    • 4 使用CEP处理超时事件

一 Flink CEP简介

1 什么是复杂事件处理CEP

一个或多个由简单事件构成的事件流通过一定的规则匹配,然后输出用户想得到的数据,满足规则的复杂事件。

特征有如下几点:

  • 目标:从有序的简单事件流中发现一些高阶特征。
  • 输入:一个或多个由简单事件构成的事件流。
  • 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件。
  • 输出:满足规则的复杂事件。

如下图中,将输入流中的元素,按照连续两个事件,且第一个元素为正方形,第二个元素为三角形进行过滤:

在这里插入图片描述

CEP用于分析低延迟、频繁产生的不同来源的事件流。CEP可以帮助在复杂的、不相关的事件流中找出有意义的模式和复杂的关系,以接近实时或准实时的获得通知并阻止一些行为。

CEP支持在流上进行模式匹配,根据模式的条件不同,分为连续的条件或不连续的条件;模式的条件允许有时间的限制,当在条件范围内没有达到满足的条件时,会导致模式匹配超时。

看起来很简单,但是它有很多不同的功能:

  • 输入的流数据,尽快产生结果
  • 在2个event流上,基于时间进行聚合类的计算
  • 提供实时/准实时的警告和通知
  • 在多样的数据源中产生关联并分析模式
  • 高吞吐、低延迟的处理

市场上有多种CEP的解决方案,例如Spark、Samza、Beam等,但他们都没有提供专门的library支持。但是Flink提供了专门的CEP library。

2 Flink CEP

Flink为CEP提供了专门的Flink CEP library,它包含如下组件:

  • Event Stream
  • pattern定义
  • pattern检测
  • 生成Alert

在这里插入图片描述

首先,开发人员要在DataStream流上定义出模式条件,之后Flink CEP引擎进行模式检测,必要时生成告警。

(1)导入依赖

为了使用Flink CEP,需要导入依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>

(2)代码编写

使用API完成检测用户连续三次登录失败的需求

public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<day06.Example3.Event> stream = env.fromElements(new day06.Example3.Event("user-1", "log-fail", 1000L),new day06.Example3.Event("user-1", "log-fail", 2000L),new day06.Example3.Event("user-2", "log-succ", 3000L),new day06.Example3.Event("user-1", "log-fail", 4000L),new day06.Example3.Event("user-1", "log-fail", 5000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<day06.Example3.Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<day06.Example3.Event>() {@Overridepublic long extractTimestamp(day06.Example3.Event element, long recordTimestamp) {return element.timestamp;}}));// 定义模板(org.apache.flink.cep.pattern.Pattern)Pattern<day06.Example3.Event, day06.Example3.Event> pattern = Pattern.<day06.Example3.Event>begin("first")   // 给第一个匹配事件起名.where(new SimpleCondition<day06.Example3.Event>() {@Overridepublic boolean filter(day06.Example3.Event value) throws Exception {return value.eventType.equals("log-fail");}}).next("second")  //next表示严格紧邻.where(new SimpleCondition<day06.Example3.Event>() {@Overridepublic boolean filter(day06.Example3.Event value) throws Exception {return value.eventType.equals("log-fail");}}).next("third").where(new SimpleCondition<day06.Example3.Event>() {@Overridepublic boolean filter(day06.Example3.Event value) throws Exception {return value.eventType.equals("log-fail");}});// 在流上使用模板,参数为输入的流和要匹配的模板PatternStream<day06.Example3.Event> patternStream = CEP.pattern(stream.keyBy(r -> r.orderId), pattern);// 使用select方法将匹配到的事件取出patternStream.select(new PatternSelectFunction<day06.Example3.Event, String>() {@Overridepublic String select(Map<String, List<day06.Example3.Event>> map) throws Exception {// map的key是给时间起的名字,v为名字对应的事件列表// 上例中事件只有一个,列表中只有一个元素day06.Example3.Event first = map.get("first").get(0);day06.Example3.Event second = map.get("second").get(0);day06.Example3.Event third = map.get("third").get(0);String result = "用户:" + first.orderId + "分别在以下三个时间:" + first.timestamp+ "、" + second.timestamp + "、" + third.timestamp + "登录失败了";return result;}}).print();env.execute();
}

(3)优化模板

Pattern<Example3.Event, Example3.Event> pattern = Pattern.<day06.Example3.Event>begin("log-fail")   // 给第一个匹配事件起名.where(new SimpleCondition<Example3.Event>() {@Overridepublic boolean filter(day06.Example3.Event value) throws Exception {return value.eventType.equals("log-fail");}}).times(3);
patternStream.select(new PatternSelectFunction<Example3.Event, String>() {@Overridepublic String select(Map<String, List<Example3.Event>> map) throws Exception {// map的key是给时间起的名字,v为名字对应的事件列表// 上例中事件只有一个,列表中只有一个元素day06.Example3.Event first = map.get("log-fail").get(0);day06.Example3.Event second = map.get("log-fail").get(1);day06.Example3.Event third = map.get("log-fail").get(2);String result = "用户:" + first.orderId + "分别在以下三个时间:" + first.timestamp+ "、" + second.timestamp + "、" + third.timestamp + "登录失败了";return result;}}).print();

3 实现CEP底层 – 有限状态机

使用状态机实现检测连续三次登录失败,实现原理如下图:

在这里插入图片描述

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Example3.Event> stream = env.fromElements(new day06.Example3.Event("user-1", "log-fail", 1000L),new day06.Example3.Event("user-1", "log-fail", 2000L),new day06.Example3.Event("user-2", "log-succ", 3000L),new day06.Example3.Event("user-1", "log-fail", 4000L),new day06.Example3.Event("user-1", "log-fail", 5000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<day06.Example3.Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Example3.Event>() {@Overridepublic long extractTimestamp(day06.Example3.Event element, long recordTimestamp) {return element.timestamp;}}));stream.keyBy(r -> r.orderId).process(new KeyedProcessFunction<String, Example3.Event, String>() {private HashMap<Tuple2<String,String>,String> stateMachine = new HashMap<>();private ValueState<String> currentState;@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// 如果接收到初始登录状态为成功,返回SUCCESS// 状态转移矩阵// key:(状态,接收到事件的类型)// value:(将要跳转到的状态)stateMachine.put(Tuple2.of("INITIAL","log-succ"),"SUCCESS");stateMachine.put(Tuple2.of("INITIAL","log-fail"),"s1");stateMachine.put(Tuple2.of("s1","log-succ"),"SUCCESS");stateMachine.put(Tuple2.of("s1","log-fail"),"s2");stateMachine.put(Tuple2.of("s2","log-succ"),"SUCCESS");stateMachine.put(Tuple2.of("s2","log-fail"),"FAIL");currentState = getRuntimeContext().getState(new ValueStateDescriptor<String>("current-state", Types.STRING));}@Overridepublic void processElement(Example3.Event value, Context ctx, Collector<String> out) throws Exception {if(currentState.value() == null){currentState.update("INITIAL");}// 记录将要跳转到的状态// 如initial状态,到来事件值为log-succ,那么nextState为SUCCESS// 取状态机的value部分String nextState = stateMachine.get(Tuple2.of(currentState.value(), value.eventType));if(nextState.equals("FAIL")){out.collect("用户" + value.orderId + "连续三次登录失败了");currentState.update("s2");} else if(nextState.equals("SUCCESS")){currentState.clear();} else {currentState.update(nextState);}}}).print();env.execute();
}

4 使用CEP处理超时事件

现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15 分钟),如果下单后一段时间仍未支付,订单就会被取消。

先将事件流按照订单号 orderId 分流,然后定义这样的一个事件模式:在 15 分钟内,事件“create”与“pay”严格紧邻:

public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Example3.Event> stream = env.fromElements(new day06.Example3.Event("order-1", "create", 1000L),new day06.Example3.Event("order-2", "create", 2000L),new day06.Example3.Event("order-1", "pay", 19000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<day06.Example3.Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Example3.Event>() {@Overridepublic long extractTimestamp(day06.Example3.Event element, long recordTimestamp) {return element.timestamp;}}));Pattern<Example3.Event, Example3.Event> pattren = Pattern.<Example3.Event>begin("create").where(new SimpleCondition<Example3.Event>() {@Overridepublic boolean filter(Example3.Event value) throws Exception {return value.eventType.equals("create");}}).next("pay").where(new SimpleCondition<Example3.Event>() {@Overridepublic boolean filter(Example3.Event value) throws Exception {return value.eventType.equals("pay");}})// 要求两事件的间隔时间不能超过15分钟.within(Time.minutes(15));PatternStream<Example3.Event> patternStream = CEP.pattern(stream.keyBy(r -> r.orderId), pattren);SingleOutputStreamOperator<String> result = patternStream// 第一个参数是侧输出标签// 第二个参数用于将超时事件发送到侧输出流// 第三个参数用于处理正常事件.flatSelect(new OutputTag<String>("timeout") {},new PatternFlatTimeoutFunction<Example3.Event, String>() {@Overridepublic void timeout(Map<String, List<Example3.Event>> map, long l, Collector<String> collector) throws Exception {Example3.Event create = map.get("create").get(0);collector.collect("订单:" + create.orderId + "超时了");}},new PatternFlatSelectFunction<Example3.Event, String>() {@Overridepublic void flatSelect(Map<String, List<Example3.Event>> map, Collector<String> collector) throws Exception {Example3.Event pay = map.get("pay").get(0);collector.collect("订单:" + pay.orderId + "已支付");}});result.print();result.getSideOutput(new OutputTag<String>("timeout"){}).print();env.execute();
}

本文链接:https://www.ngui.cc/article/show-738623.html
Copyright © 2010-2022 ngui.cc 版权所有 |关于我们| 联系方式| 豫B2-20100000