Matrix42

不饱食以终日,不弃功于寸阴

[译]Flink Broadcast State实用指南

从1.5.0开始,Flink提供了一种新的State类型,称为Broadcast State。在这篇文章中,我们将解释什么是Broadcast State,并展示如何将其应用于评估事件流上的动态模式的应用的示例。我们将向您介绍处理步骤和源代码,以实现此应用。

什么是Broadcast State?

Broadcast State可用于以特定方式组合和联合处理两个事件流。第一个流的事件被广播到一个算子的所有并行实例,该算子将它们保存为状态。另一个流的事件不广播,而是发送给同一个算子的单个实例,并与广播流的事件一起处理。对于需要连接低吞吐量和高吞吐量流或需要动态更新处理逻辑的应用来说,新的broadcast state非常适合。我们将使用一个具体示例来解释broadcast state,并在本文的其余部分更详细地展示其API。

Broadcast State下的动态模式评估

想象一下,一个电子商务网站捕获所有用户的交互作为用户行为流。运营网站的公司有兴趣分析交互,以增加收入,改善用户体验,并检测和防止恶意行为。该网站实现了一个流应用,该应用检测用户事件流上的模式。但是,公司希望避免每次模式改变时修改和重新部署应用。相反,当应用接收到来自模式流的新模式时,它会摄取第二个模式流并更新其活动模式。在下面,我们将逐步讨论这个应用,并展示它如何利用Flink中的broadcast state特性。

《[译]Flink Broadcast State实用指南》

我们的示例应用包含两个数据流。第一个流提供用户在网站上的行为,如上图的左上方所示。用户交互事件包括行为的类型(用户登录、用户注销、添加到购物车或完成支付)和用户的id,该id由颜色编码(不同颜色代表不同用户)。我们插图中的用户行为事件流包含用户1001的注销行为,用户1003的支付行为和用户1002的加入购物车行为。

第二个流提供应用将评估的行为模式。模式由两个连续的行为组成。在上面的图中,模式流包含以下两个:

  • 模式1:用户登录并立即退出而不浏览电子商务网站上的其他页面。
  • 模式2:用户向购物车中添加一个物品,然后在没有完成购买的情况下注销。

这些模式有助于企业更好地分析用户行为、检测恶意行为和改善网站体验。例如,如果项目被添加到购物车而没有后续购买,网站团队可以采取适当的行动,以更好地理解用户不完成购买的原因,并启动特定的程序来提高网站转化(例如提供折扣码、限时免费送货优惠等)。

在右侧,该图显示了算子的三个并行任务,它们摄取模式和用户行为流,评估行为流上的模式,并向下游发出模式匹配。为了简单起见,在我们的示例中,算子只计算一个模式,只包含两个后续行为。当从模式流接收到新模式时,替换当前活动的模式。原则上,还可以实现一个算子来同时评估更复杂的模式或多个模式,这些模式可以单独添加或删除。

我们将描述模式匹配应用如何处理用户行为流和模式流。

《[译]Flink Broadcast State实用指南》

首先,将模式发送给算子。该模式被广播到算子的所有三个并行任务。任务将模式存储在其broadcast state中。由于broadcast state只应该使用广播数据更新,所以所有任务的状态都是相同的。

《[译]Flink Broadcast State实用指南》

接下来,在用户id上对第一个用户行为进行分区,并将其发送给下游算子。分区确保同一个用户的所有行为都由同一个任务处理。上图显示了应用在第一个模式之后的状态,以及算子消耗了前三个行为事件。

当任务接收到新的用户行为时,它会通过查看用户的最新和先前行为来评估当前活动模式。对于每个用户,运算符将前面的操为存储在keyed state。由于图中的任务到目前为止只接收到每个用户的一个行为(我们刚刚启动应用),因此不需要对模式进行评估。最后,处于用户keyed state的前一个行为被更新为最新的行为,以便能够在同一用户的下一个行为到达时查找它。

《[译]Flink Broadcast State实用指南》

在处理前三个行为之后,下一个事件(用户1001的注销行为)被发送到处理用户1001的事件的任务。当任务接收到行为时,它从broadcast state和用户1001的前一个行为中查找当前模式。由于模式与两个行为匹配,任务将发出模式匹配事件。最后,该任务通过使用最新行为覆盖前一个事件来更新其keyed state。

《[译]Flink Broadcast State实用指南》

当一个新模式到达模式流时,它将被广播到所有任务,每个任务通过用新模式替换当前模式来更新其broadcast state。

《[译]Flink Broadcast State实用指南》

一旦用新模式更新broadcast state,匹配逻辑就会像以前一样继续,即用户行为事件按key进行分区,并由负责的任务进行评估。

如何用Broadcast State实现一个应用?

到目前为止,我们从概念上讨论了这个应用,并解释了它如何使用broadcast state来评估事件流上的动态模式。接下来,我们将展示如何使用Flink的Datastream API和broadcast state特性来实现示例应用。

让我们从应用的输入数据开始。我们有两个数据流,行为流和模式流。在这一点上,我们并不关流从何而来。这些流可能是从Kafka、Kinesis或任何其他系统中摄取的。行为和模式是Pojos,每个字段有两个:

DataStream<Action> actions = ???
DataStream<Pattern> patterns = ???

ActionPatternPojos有两个字段:

  • Action: Long userId, String action
  • Pattern: String firstAction, String secondAction

第一步,我们在流上使用userId属性进行keyBy操作。

KeyedStream<Action, Long> actionsByUser = actions
  .keyBy((KeySelector<Action, Long>) action -> action.userId);

接下来,我们准备broadcast state。broadcast state始终表示为MapState,这是Flink提供的最通用的状态原语。

MapStateDescriptor<Void, Pattern> bcStateDescriptor = 
  new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));

由于我们一次仅评估和存储单个模式,我们将broadcast state配置为具有键类型Void和值类型Pattern的MapState。模式始终存储在MapState中,并将null作为键。

BroadcastStream<Pattern> bcedPatterns = patterns.broadcast(bcStateDescriptor);

对于broadcast state应该使用MapStateDescriptor,我们在patterns流上调用broadcast()方法将它转换为BroadcastStream流bcedPatterns.

DataStream<Tuple2<Long, Pattern>> matches = actionsByUser
 .connect(bcedPatterns)
 .process(new PatternEvaluator());

我们得到了keyed之后的actionsByUser流与广播流bcedPatterns,我们调用connect()方法将他们连接在一起然后在流上应用PatternEvaluatorPatternEvaluator实现了KeyedBroadcastProcessFunction接口。它应用我们前面讨论过的模式匹配逻辑,并发送包含用户id和匹配模式的记录的Tuple2<Long, Pattern>

public static class PatternEvaluator
    extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {

  // handle for keyed state (per user)
  ValueState<String> prevActionState;
  // broadcast state descriptor
  MapStateDescriptor<Void, Pattern> patternDesc;

  @Override
  public void open(Configuration conf) {
    // initialize keyed state
    prevActionState = getRuntimeContext().getState(
      new ValueStateDescriptor<>("lastAction", Types.STRING));
    patternDesc = 
      new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
  }

  /**
   * Called for each user action.
   * Evaluates the current pattern against the previous and
   * current action of the user.
   */
  @Override
  public void processElement(
     Action action, 
     ReadOnlyContext ctx, 
     Collector<Tuple2<Long, Pattern>> out) throws Exception {
   // get current pattern from broadcast state
   Pattern pattern = ctx
     .getBroadcastState(this.patternDesc)
     // access MapState with null as VOID default value
     .get(null);
   // get previous action of current user from keyed state
   String prevAction = prevActionState.value();
   if (pattern != null && prevAction != null) {
     // user had an action before, check if pattern matches
     if (pattern.firstAction.equals(prevAction) && 
         pattern.secondAction.equals(action.action)) {
       // MATCH
       out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
     }
   }
   // update keyed state and remember action for next pattern evaluation
   prevActionState.update(action.action);
 }

 /**
  * Called for each new pattern.
  * Overwrites the current pattern with the new pattern.
  */
 @Override
 public void processBroadcastElement(
     Pattern pattern, 
     Context ctx, 
     Collector<Tuple2<Long, Pattern>> out) throws Exception {
   // store the new pattern by updating the broadcast state
   BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
   // storing in MapState with null as VOID default value
   bcState.put(null, pattern);
 }
}

这个KeyedBroadcastProcessFunction接口提供了处理记录和发出结果的三种方法。

  • processBroadcastElement(): 在广播流的每个记录调进来的时候用。在PatternEvaluator函数,我们简单地将接收到的Pattern使用null键(记住,我们只在MapState).
  • processElement(): 在keyed stream的每个记录进来的时候调用。它提供对Broadcast State的只读访问,以防止对跨函数并行实例的不同broadcast state的修改。这PatternEvaluator的processElement()方法从broadcast state检索当前模式,从keyed state检索用户的先前行为。如果两者都存在,它将检查前面和当前的行为是否与模式匹配,如果匹配话,它会发出模式匹配记录。最后,它将keyed state更新为当前用户行为。
  • onTimer(): 在之前注册过的计时器触发时调用。计时器可以在processElement方法中注册,用于执行计算或清除将来的状态。为了保持代码的简洁性我们没有在我们的示例中实现这个方法。但是,当用户在一段时间内没有活动时,可以使用它来删除用户的最后一个行为,以避免由于不活动的用户而导致state的增长。

你可能已经注意到KeyedBroadcastProcessFunction的process方法。context 对象允许使用其他功能,如:

  • broadcast state(读写或只读,取决于方法)
  • TimerService,它允许访问记录的时间戳、当前watermark,并且可以注册计时器
  • 当前的key(仅在processElement()方法中可用),以及一种将函数应用于每个注册key的keyed state的方法(仅在processBroadcastElement()方法中可用)

这个KeyedBroadcastProcessFunction就像其他ProcessFunction一样完全可以访问Flink中的state和时间特性,因此可以用来实现复杂的逻辑。broadcast state被设计成一个通用的特性,可以适应不同的场景和用例。虽然我们只讨论了一个相当简单和受限的应用,但您可以通过多种方式使用broadcast state来实现应用的需求。

结语

在这篇文章中,我们向您介绍了一个示例应用,以解释Flink的broadcast state是什么,以及如何使用它来评估事件流上的动态模式。我们还讨论了API,并展示了示例应用的源码。

原文

《[译]Flink Broadcast State实用指南》

扫描二维码关注微信公众号

点赞

发表评论

电子邮件地址不会被公开。 必填项已用*标注

20 − 15 =