Flink原理与实现:Flink中的状态管理,keygroup,namespace

namespace维护每个subtask的状态

 

上面Flink原理与实现的文章中,有引用word count的例子,但是都没有包含状态管理。也就是说,如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义上(at least once, exactly once),Flink引入了state和checkpoint。

首先区分一下两个概念,state一般指一个具体的task/operator的状态。而checkpoint则表示了一个Flink Job,在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态。

Flink通过定期地做checkpoint来实现容错和恢复。

State

Keyed State和Operator State

Flink中包含两种基础的状态:Keyed State和Operator State。

Keyed State

顾名思义,就是基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,可能都对应一个state。

Operator State

与Keyed State不同,Operator State跟一个特定operator的一个并发实例绑定,整个operator只对应一个state。相比较而言,在一个operator上,可能会有很多个key,从而对应多个keyed state。

举例来说,Flink中的Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。

原始状态和Flink托管状态 (Raw and Managed State)

Keyed State和Operator State,可以以两种形式存在:原始状态和托管状态。

托管状态是由Flink框架管理的状态,如ValueState, ListState, MapState等。

下面是Flink整个状态框架的类图,还是比较复杂的,可以先扫一眼,看到后面再回过来看:

image.png

通过框架提供的接口,我们来更新和管理状态的值。

而raw state即原始状态,由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。

通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。

下文中所提到的状态,如果没有特殊说明,均为托管状态。

使用Keyed State

首先看一下Keyed State下,我们可以用哪些原子状态:

  • ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。
  • ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable<T>来遍历状态值。
  • ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。
  • FoldingState:跟ReducingState有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同(这种状态将会在Flink未来版本中被删除)。
  • MapState:即状态值为一个map。用户通过putputAll方法添加元素。

以上所有的状态类型,都有一个clear方法,可以清除当前key对应的状态。

需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄(state handle)。

接下来看下,我们如何得到这个状态句柄。Flink通过StateDescriptor来定义一个状态。这是一个抽象类,内部定义了状态名称、类型、序列化器等基础信息。与上面的状态对应,从StateDescriptor派生了ValueStateDescriptorListStateDescriptor等descriptor。

具体如下:

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptor)

接下来我们看一下创建和使用ValueState的例子:


 
  1. public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

  2.  
  3. /**

  4. * ValueState状态句柄. 第一个值为count,第二个值为sum。

  5. */

  6. private transient ValueState<Tuple2<Long, Long>> sum;

  7.  
  8. @Override

  9. public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {

  10. // 获取当前状态值

  11. Tuple2<Long, Long> currentSum = sum.value();

  12.  
  13. // 更新

  14. currentSum.f0 += 1;

  15. currentSum.f1 += input.f1;

  16.  
  17. // 更新状态值

  18. sum.update(currentSum);

  19.  
  20. // 如果count >=2 清空状态值,重新计算

  21. if (currentSum.f0 >= 2) {

  22. out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));

  23. sum.clear();

  24. }

  25. }

  26.  
  27. @Override

  28. public void open(Configuration config) {

  29. ValueStateDescriptor<Tuple2<Long, Long>> descriptor =

  30. new ValueStateDescriptor<>(

  31. "average", // 状态名称

  32. TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // 状态类型

  33. Tuple2.of(0L, 0L)); // 状态默认值

  34. sum = getRuntimeContext().getState(descriptor);

  35. }

  36. }

  37.  
  38. // ...

  39. env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))

  40. .keyBy(0)

  41. .flatMap(new CountWindowAverage())

  42. .print();

  43.  
  44. // the printed output will be (1,4) and (1,5)

由于状态需要从RuntimeContext中创建和获取,因此如果要使用状态,必须使用RichFunction。普通的Function是无状态的。

KeyedStream上的scala api则提供了一些语法糖,让创建和使用状态更加方便:


 
  1. val stream: DataStream[(String, Int)] = ...

  2.  
  3. val counts: DataStream[(String, Int)] = stream

  4. .keyBy(_._1)

  5. .mapWithState((in: (String, Int), count: Option[Int]) =>

  6. count match {

  7. case Some(c) => ( (in._1, c), Some(c + in._2) )

  8. case None => ( (in._1, 0), Some(in._2) )

  9. })

Inside Keyed State

上面以Keyed State为例讲了如何使用状态,接下来我们从代码层面分析一下,框架在内部做了什么事情。

先看下上面例子中open方法中获取状态句柄的代码:

    sum = getRuntimeContext().getState(descriptor);

它调用了RichFlatMapFunction.getRuntimeContext().getState方法,最终会调用StreamingRuntimeContext.getState方法:


 
  1. public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {

  2. KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);

  3. stateProperties.initializeSerializerUnlessSet(getExecutionConfig());

  4. return keyedStateStore.getState(stateProperties);

  5. }

checkPreconditionsAndGetKeyedStateStore方法中:


 
  1. KeyedStateStore keyedStateStore = operator.getKeyedStateStore();

  2. return keyedStateStore;

即返回了AbstractStreamOperator.keyedStateStore变量。这个变量的初始化在AbstractStreamOperator.initState方法中:


 
  1. private void initKeyedState() {

  2. try {

  3. TypeSerializer<Object> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());

  4. // create a keyed state backend if there is keyed state, as indicated by the presence of a key serializer

  5. if (null != keySerializer) {

  6. KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(

  7. container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),

  8. container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),

  9. container.getEnvironment().getTaskInfo().getIndexOfThisSubtask());

  10.  
  11. long estimatedStateSizeInMB = config.getStateSize();

  12.  
  13. this.keyedStateBackend = container.createKeyedStateBackend(

  14. keySerializer,

  15. // The maximum parallelism == number of key group

  16. container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),

  17. subTaskKeyGroupRange,

  18. estimatedStateSizeInMB);

  19.  
  20. this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());

  21. }

  22.  
  23. // ...

  24. }

它先调用StreamTask.createKeyedStateBackend方法创建stateBackend,然后将stateBackend传入DefaultKeyedStateStore。

StreamTask.createKeyedStateBackend方法通过它内部的stateBackend来创建keyed statebackend:


 
  1. backend = stateBackend.createKeyedStateBackend(

  2. getEnvironment(),

  3. getEnvironment().getJobID(),

  4. operatorIdentifier,

  5. keySerializer,

  6. numberOfKeyGroups,

  7. keyGroupRange,

  8. estimatedStateSizeInMB,

  9. getEnvironment().getTaskKvStateRegistry());

看一下statebackend的初始化,在StreamTask.createStateBackend方法中,这个方法会根据配置项state.backend的值创建backend,其中内置的backend有jobmanagerfilesystemrocksdb

jobmanager的state backend会把状态存储在job manager的内存中。
filesystem会把状态存在文件系统中,有可能是本地文件系统,也有可能是HDFS、S3等分布式文件系统。
rocksdb会把状态存在rocksdb中。

所以可以看到,创建了state backend之后,创建keyed stated backend,实际上就是调用具体的state backend来创建。我们以filesystem为例,实际就是FsStateBackend.createKeyedStateBackend方法,这个方法也很简单,直接返回了HeapKeyedStateBackend对象。

先不展开说HeapKeyedStateBackend类,我们返回去看创建keyed state,最终返回的是DefaultKeyedStateStore对象,它的getStategetListStategetReducingState等方法,都是对底层keyed state backend的一层封装,keyedStateBackend.getPartitionedState来返回具体的state handle(DefaultKeyedStateStore.getPartitionedState方法)。

这个方法实际调用了AbstractKeyedStateBackend.getPartitionedState方法,HeapKeyedStateBackendRocksDBKeyedStateBackend都从这个基类派生。

这个类有一个成员变量:

    private final HashMap<String, InternalKvState<?>> keyValueStatesByName;

它保存了的一个映射。map value中的InternalKvState,实际为创建的HeapValueStateHeapListStateRocksDBValueStateRocksDBListStat等实现。

回到上面AbstractKeyedStateBackend.getPartitionedState,正常的代码路径下,它会调用AbstractKeyedStateBackend.getOrCreateKeyedState方法来创建这个InternalKvState,其方法如下:


 
  1. S state = stateDescriptor.bind(new StateBackend() {

  2. @Override

  3. public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {

  4. return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);

  5. }

  6.  
  7. @Override

  8. public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {

  9. return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);

  10. }

  11. // ...

AbstractKeyedStateBackend.createValueStateAbstractKeyedStateBackend.createListState等方法是AbstractKeyedStateBackend的抽象方法,具体还是在HeapKeyedStateBackend、RocksDBKeyedStateBackend等类中实现的,所以这里创建的state只是一个代理,它proxy了具体的上层实现。在我们的例子中,最后绕了一个圈,调用的仍然是HeapKeyedStateBackend.createValueState方法,并将state name对应的state handle放入到keyValueStatesByName这个map中,保证在一个task中只有一个同名的state handle。

回来看HeapKeyedStateBackend,这个类有一个成员变量:

    private final Map<String, StateTable<K, ?, ?>> stateTables = new HashMap<>();

它的key为state name, value为StateTable,用来存储这个state name下的状态值。它会将所有的状态值存储在内存中。

它的createValueState方法:


 
  1. StateTable<K, N, V> stateTable = tryRegisterStateTable(namespaceSerializer, stateDesc);

  2. return new HeapValueState<>(this, stateDesc, stateTable, keySerializer, namespaceSerializer);

即先注册StateTable,然后返回一个HeapValueState。

这里整理一下从应用层面创建一个ValueState的state handle的过程:


 
  1. sum = getRuntimeContext().getState(descriptor) (app code)

  2. --> RichFlatMapFunction.getRuntimeContext().getState

  3. --> StreamingRuntimeContext.getState

  4. --> KeyedStateStore.getState(stateProperties)

  5. --> AbstractStreamOperator.keyedStateStore.getState

  6. --> DefaultKeyedStateStore.getState

  7. --> DefaultKeyedStateStore.getPartitionedState

  8. --> AbstractKeyedStateBackend.getPartitionedState

  9. --> AbstractKeyedStateBackend.getOrCreateKeyedState

  10. --> HeapKeyedStateBackend.createValueState

  11. --> HeapKeyedStateBackend.tryRegisterStateTable

  12. --> return new HeapValueState

而从框架层面看,整个调用流程如下:


 
  1. Task.run

  2. --> StreamTask.invoke

  3. --> StreamTask.initializeState

  4. --> StreamTask.initializeOperators

  5. --> AbstractStreamOperator.initializeState

  6. --> AbstractStreamOperator.initKeyedState

  7. --> StreamTask.createKeyedStateBackend

  8. --> MemoryStateBackend.createKeyedStateBackend

  9. --> HeapKeyedStateBackend.<init>

整体来看,创建一个state handle还是挺绕的,中间经过了多层封装和代理。


创建完了state handle,接下来看看如何获取和更新状态值。

首先需要讲一下HeapState在内存中是如何组织的,还是以最简单的HeapValueState为例,
具体的数据结构,是在其基类AbstractHeapState中,以StateTable<K, N, SV> stateTable的形式存在的,其中K代表Key的类型,N代表state的namespace(这样属于不同namespace的state可以重名),SV代表state value的类型。

StateTable类内部数据结构如下:


 
  1. protected final KeyGroupRange keyGroupRange;

  2. /** Map for holding the actual state objects. */

  3. private final List<Map<N, Map<K, ST>>> state;

  4. /** Combined meta information such as name and serializers for this state */

  5. protected RegisteredBackendStateMetaInfo<N, ST> metaInfo;

最核心的数据结构是state成员变量,它保存了一个list,其值类型为Map<N, Map<K, ST>>,即按namespace和key分组的两级map。那么它为什么是一个list呢,这里就要提到keyGroupRange成员变量了,它代表了当前state所包含的key的一个范围,这个范围根据当前的sub task id以及最大并发进行计算,在AbstractStreamOperator.initKeyedState方法中:


 
  1. KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(

  2. container.getEnvironment().getTaskInfo().getMaxNumberOfParallelSubtasks(),

  3. container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(),

  4. container.getEnvironment().getTaskInfo().getIndexOfThisSubtask());

举例来说,如果当前task的并发是2,最大并发是128,那么task-1所属的state backend的keyGroupRange为[0,63],而task-2所属的state backend的keyGroupRange为[64,127]。

这样,task-1中的StateTable.state这个list,最大size即为64。获取特定key的state value时,会先计算key的hash值,然后用hash值 % 最大并发,这样会得到一个[0,127]之间的keyGroup,到这个list中get到这个下标的Map<N, Map<K,V>>值,然后根据 namespace + key二级获取到真正的state value。

看到这里,有人可能会问,对于一个key,如何保证在task-1中,它计算出来的keyGroup一定是在[0,63]之间,在task-2中一定是在[64,127]之间呢?

原因是,在KeyedStream中,使用了KeyGroupStreamPartitioner这种partitioner来向下游task分发keys,而这个类重载的selectChannels方法如下:


 
  1. K key;

  2. try {

  3. key = keySelector.getKey(record.getInstance().getValue());

  4. } catch (Exception e) {

  5. throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);

  6. }

  7. returnArray[0] = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfOutputChannels);

  8. return returnArray;

这里关键是KeyGroupRangeAssignment.assignKeyToParallelOperator方法,它中间调用了KeyGroupRangeAssignment.assignToKeyGroup方法来确定一个key所属的keyGroup,这个跟state backend计算keyGroup是同一个方法。然后根据这个keyGroup,它会计算出拥有这个keyGroup的task,并将这个key发送到此task。所以能够保证,从KeyedStream上emit到下游task的数据,它的state所属的keyGroup一定是在当前task的keyGroupRange中的。

上面已经提到了获取ValueState的值,这里贴一下代码,结合一下就很容易理解了:


 
  1. Map<N, Map<K, V>> namespaceMap = stateTable.get(backend.getCurrentKeyGroupIndex());

  2. if (namespaceMap == null) {

  3. return stateDesc.getDefaultValue();

  4. }

  5.  
  6. Map<K, V> keyedMap = namespaceMap.get(currentNamespace);

  7. if (keyedMap == null) {

  8. return stateDesc.getDefaultValue();

  9. }

  10.  
  11. V result = keyedMap.get(backend.getCurrentKey());

  12. if (result == null) {

  13. return stateDesc.getDefaultValue();

  14. }

  15.  
  16. return result;

而更新值则通过ValueState.update方法进行更新,这里就不贴代码了。

上面讲了最简单的ValueState,其他类型的state,其实也是基本一样的,只不过stateTable中状态值的类型不同而已。如HeapListState,它的状态值类型为ArrayList;HeapMapState,它的状态值类型为HashMap。而值类型的不同,导致了在State上的接口也有所不同,如ListState会有add方法,MapState有putget方法。在这里就不展开说了。


Checkpoint

到上面为止,都是简单的关于状态的读写,而且状态都还是只在Task本地,接下来就会涉及到checkpoint。
所谓checkpoint,就是在某一时刻,将所有task的状态做一个快照(snapshot),然后存储到memory/file system/rocksdb等。

关于Flink的分布式快照,请参考 分布式Snapshot和Flink Checkpointing简介 及相关论文,这里不详述了。

Flink的checkpoint,是由CheckpointCoordinator来协调的,它位于JobMaster中。但是其实在ExecutionGraph中已经创建了,见ExecutionGraph.enableSnapshotCheckpointing方法。

当Job状态切换到RUNNING时,CheckpointCoordinatorDeActivator(从JobStatusListener派生)会触发回调coordinator.startCheckpointScheduler();,根据配置的checkpoint interval来定期触发checkpoint。

每个checkpoint由checkpoint ID和timestamp来唯一标识,其中checkpoint ID可以是standalone(基于内存)的,也可能是基于ZK的。
已经完成的checkpoint,保存在CompletedCheckpointStore中,可以是StandaloneCompletedCheckpointStore(保存在JobMaster内存中),也可以是ZooKeeperCompletedCheckpointStore(保存在ZK中),甚至是自己实现的store,比如基于HDFS的。

触发checkpoint的方法在CheckpointCoordinator.ScheduledTrigger中,只有一行:

    triggerCheckpoint(System.currentTimeMillis(), true);

这个方法比较长,它会先做一系列检查,如检查coordinator自身的状态(是否被shutdown),还会检查与上次checkpoint的时间间隔、当前的并发checkpoint数是否超过限制,如果都没问题,再检查所有task的状态是否都为RUNNING,都没问题之后,触发每个Execution的checkpoint:


 
  1. for (Execution execution: executions) {

  2. execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);

  3. }

看下Execution.triggerCheckpoint方法:


 
  1. public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {

  2. final SimpleSlot slot = assignedResource;

  3.  
  4. if (slot != null) {

  5. final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

  6.  
  7. taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);

  8. } else {

  9. LOG.debug("The execution has no slot assigned. This indicates that the execution is " +

  10. "no longer running.");

  11. }

  12. }

很简单,通过RPC调用向TaskManager触发当前JOB的checkpoint,然后一路调用下去:


 
  1. RpcTaskManagerGateway.triggerCheckpoint

  2. --> TaskExecutorGateway.triggerCheckpoint

  3. --> TaskExecutor.triggerCheckpoint

  4. --> task.triggerCheckpointBarrier

  5. --> StatefulTask.triggerCheckpoint

  6. --> StreamTask.triggerCheckpoint

  7. --> StreamTask.performCheckpoint

具体做checkpoint的时候,会先向下游广播checkpoint barrier,然后调用StreamTask.checkpointState方法做具体的checkpoint,实际会调用到StreamTask.executeCheckpointing方法。

checkpoint里,具体操作为,遍历每个StreamTask中的所有operator:

  1. 调用operator的snapshotState(FSDataOutputStream out, long checkpointId, long timestamp)方法,存储operator state,这个结果会返回operator state handle,存储于nonPartitionedStates中。这里实际处理的时候,只有当user function实现了Checkpointed接口,才会做snapshot。需要注意的是,此接口已经deprecated,被CheckpointedFunction代替,而对CheckpointedFunction的snapshot会在下面的第2步中来做,因此这两个接口一般来说是2选1的。
  2. 调用operator的snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions)方法,返回OperatorSnapshotResult对象。注意虽然每个snapshot方法返回的都是一个RunnableFuture,不过目前实际上还是同步做的checkpoint(可以比较容易改成异步)。

    1. 这里会先调用AbstractStreamOperator.snapshotState方法,为rich function做state snapshot
    2. 调用operatorStateBackend.snapshot方法,对operator state做snapshot。
    3. 调用keyedStateBackend.snapshot方法,对keyed state做snapshot。
    4. 调用timerServiceBackend.snapshot方法,对processing time/event time window中注册的timer回调做snapshot(恢复状态的时候必须也要恢复timer回调)
  3. 调用StreamTask.runAsyncCheckpointingAndAcknowledge方法确认上面的snapshot是否都成功,如果成功,则会向CheckpointCoordinator发送ack消息。
  4. CheckpointCoordinator收到ack消息后,会检查本地是否存在这个pending的checkpoint,并且这个checkpoint是否超时,如果都OK,则判断是否收到所有task的ack消息,如果是,则表示已经完成checkpoint,会得到一个CompletedCheckpoint并加入到completedCheckpointStore中。

在上面的checkpoint过程中,如果state backend选择的是jobmanager,那么最终返回的state handle为ByteStreamStateHandle,这个state handle中包含了snapshot后的所有状态数据。而如果是filesystem,则state handle只会包含数据的文件句柄,数据则在filesystem中,这个下面会再细说。


Filesystem State Backend

上面提到的都是比较简单的基于内存的state backend,在实际生产中是不太可行的。因此一般会使用filesystem或者rocksdb的state backend。我们先讲一下基于filesystem的state backend。

基于内存的state backend实现为MemoryStateBackend,基于文件系统的state backend的实现为FsStateBackend。FsStateBackend有一个策略,当状态的大小小于1MB(可配置,最大1MB)时,会把状态数据直接存储在meta data file中,避免出现很小的状态文件。

FsStateBackend另外一个成员变量就是basePath,即checkpoint的路径。实际做checkpoint时,生成的路径为:<base-path>/<job-id>/chk-<checkpoint-id>/

而且filesystem推荐使用分布式文件系统,如HDFS等,这样在fail over时可以恢复,如果是本地的filesystem,那恢复的时候是会有问题的。

回到StreamTask,在做checkpoint的时候,是通过CheckpointStateOutputStream写状态的,FsStateBack会使用FsCheckpointStreamFactory,然后通过FsCheckpointStateOutputStream去写具体的状态,这个实现也比较简单,就是一个带buffer的写文件系统操作。最后向上层返回的StreamStateHandle,视状态的大小,如果状态特别小,则会直接返回带状态数据的ByteStreamStateHandle,否则会返回FileStateHandle,这个state handle包含了状态文件名和大小。

需要注意的是,虽然checkpoint是写入到文件系统中,但是基于FsStateBackend创建的keyed state backend,仍然是HeapKeyedStateBackend,也就是说,keyed state的读写仍然是会在内存中的,只有在做checkpoint的时候才会持久化到文件系统中。

RocksDB State Backend

RocksDB跟上面的都略有不同,它会在本地文件系统中维护状态,KeyedStateBackend等会直接写入本地rocksdb中。同时它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到filesystem中。fail over的时候从filesystem中恢复到本地。

从RocksDBStateBackend创建出来的RocksDBKeyedStateBackend,更新的时候会直接以key + namespace作为key,然后把具体的值更新到rocksdb中。

如果是ReducingState,则在add的时候,会先从rocksdb中读取已有的值,然后根据用户的reduce function进行reduce,再把新值写入rocksdb。

做checkpoint的时候,会首先在本地对rockdb做checkpoint(rocksdb自带的checkpoint功能),这一步是同步的。然后将checkpoint异步复制到远程文件系统中。最后返回RocksDBStateHandle

RocksDB克服了HeapKeyedStateBackend受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用。


Queryable State

Queryable State,顾名思义,就是可查询的状态,表示这个状态,在流计算的过程中就可以被查询,而不像其他流计算框架,需要存储到外部系统中才能被查询。目前可查询的state主要针对partitionable state,如keyed state等。

简单来说,当用户在job中定义了queryable state之后,就可以在外部,通过QueryableStateClient,通过job id, state name, key来查询所对应的状态的实时的值。

queryable state目前支持两种方法来定义:

  • 通过KeyedStream.asQueryableState方法,生成一个QueryableStream,需要注意的是,这个stream类似于一个sink,是不能再做transform的。 实现上,生成QueryableStream就是为当前stream加上一个operator:QueryableAppendingStateOperator,它的processElement方法,每来一个元素,就会调用state.add去更新状态。因此这种方式有一个限制,只能使用ValueDescriptor, FoldingStateDescriptor或者ReducingStateDescriptor,而不能是ListStateDescriptor,因为它可能会无限增长导致OOM。此外,由于不能在stream后面再做transform,也是有一些限制。
  • 通过managed keyed state。

    
     
    1. ValueStateDescriptor<Tuple2<Long, Long>> descriptor =

    2. new ValueStateDescriptor<>(

    3. "average", // the state name

    4. TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),

    5. Tuple2.of(0L, 0L));

    6. descriptor.setQueryable("query-name"); // queryable state name

这个只需要将具体的state descriptor标识为queryable即可,这意味着可以将一个pipeline中间的operator的state标识为可查询的。

首先根据state descriptor的配置,会在具体的TaskManager中创建一个KvStateServer,用于state查询,它就是一个简单的netty server,通过KvStateServerHandler来处理请求,查询state value并返回。

但是一个partitionable state,可能存在于多个TaskManager中,因此需要有一个路由机制,当QueryableStateClient给定一个query name和key时,要能够知道具体去哪个TaskManager中查询。

为了做到这点,在Job的ExecutionGraph(JobMaster)上会有一个用于定位KvStateServer的KvStateLocationRegistry,当在TaskManager中注册了一个queryable KvStateServer时,就会调用JobMaster.notifyKvStateRegistered,通知JobMaster。

具体流程如下图:

image.png

这个设计看起来很美好,通过向流计算实时查询状态数据,免去了传统的存储等的开销。但实际上,除了上面提到的状态类型的限制之外,也会受netty server以及state backend本身的性能限制,因此并不适用于高并发的查询。


参考资料:

  1. Dynamic Scaling: Key Groups
  2. Stateful Stream Processing
  3. Working with State
  4. Scaling to large state
  5. Queryable state design doc

热门文章

暂无图片
编程学习 ·

使用pip离线安装python扩展包依赖模块

简答来说就是从一台有网的主机下载好,放到离线主机上,用pip实现1.查看安装了哪些pip3 freeze网上一般都是pip3 freeze >requirements.txt 这就是查看安装了那些,然后存到文件里面2.就是把安装好的打包了,上面那个文件存的就是要打包的,我们完全可以直接,在里面写好想…
暂无图片
编程学习 ·

Linux centos7 乱码设置中文字符集

1.locale 查看现在使用的字符集locale -a 查看有哪些字符集utf8的就可以显示中文yum -y install kde-l10n-Chinese 安装后选个uft8的 ,设置一下全局变量vi /etc/profileexport LANG=en_CA.utf8=号后面是字符集,这个大家随意最后让这个配置文件生效就可以了. /etc/profile 可能…
暂无图片
编程学习 ·

yum本地云搭建

yum Yum(全称为 Yellow dog Updater, Modified)是一个在Fedora和RedHat以及CentOS中的Shell前端软件包管理器。基于RPM包管理,能够从指定的服务器自动下载RPM包并且安装,可以自动处理依赖性关系,并且一次安装所有依赖的软件包,无须繁琐地一次次下载、安装。 yum仓库配置文…
暂无图片
编程学习 ·

Mybatis多数据源配置

pom.xml <!-- Spring Boot Mybatis 依赖 --> <dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>1.2.0</version> </dependency><!-- MySQL 连…
暂无图片
编程学习 ·

Kotlin - 变量 val 和 var

什么是变量 变量是一个值的存储空间,这个值可以是一个字符串、一个数字或者其他东西。 每个变量都有一个名称(或标识符)来区别于其他变量。 可以通过变量的名称访问值。变量是程序中最常用的元素之一,因此理解如何使用它们非常重要。 声明变量 在开始使用变量之前,必须先声明…
暂无图片
编程学习 ·

linux 修改时间并永久生效

Centos系统,必须同时修改系统时间和硬件时间,才可以保证修改有效,单纯的使用date命令修改系统时间,是立即生效,重启后系统还原。具体操作如下:1.date {查看目前本地的时间}2.hwclock --show {查看硬件的时间}3.如果硬件时间和系统时间不同,那就对硬件的时间进行修改4.hw…
暂无图片
编程学习 ·

Python代码

hello world!!! import numpy as npfrom sklearn import linear_modelfrom mpl_toolkits.mplot3d import Axes3D import matplotlib.pyplot as pltxx, yy = np.meshgrid(np.linspace(0,10,10), np.linspace(0,100,10)) zz = 1.0 * xx + 3.5 * yy + np.random.randint(0,100,…
暂无图片
编程学习 ·

Exception 的意义

Exception 的意义 文章目录Exception 的意义引言Exception 的语义自底向上的观点自顶向下的观点结论 引言 为什么程序设计语言要加入 Exception 机制?这个问题的答案或许不是那么显然。 Exception 常见于 “操作过程可能出现意外” 的场景。比如,试图打开文件时发现文件不存在…
暂无图片
编程学习 ·

extern随笔

extern的用法总结函数声明是可有可无的,因为函数不加修饰符默认是extern的; 全局变量在其他文件中使用时,extern关键词是必须的,如果变量在其他文件中没有extern且没有显示的初始化,则会被当成变量的定义。局部变量是不需要extern关键字描述的,而且局部变量在程序运行时才…
暂无图片
编程学习 ·

LeetCode第47题 全排列 II

题目描述 给定一个可包含重复数字的序列,返回所有不重复的全排列。解题思路 1、题意理解,该序列有重复的数字,需要用一个boolean变量来标记该数字是否已经加入列表中 2、如果无法想象需要在哪里剪枝,画图。这里以[1,1,2]的全排列为例。圈蓝圈的表示需要剪枝的地方。代码:i…
暂无图片
编程学习 ·

【Docker】 Docker pull的时候指定仓库

1.概述 默认情况下docker pull会从docker hub拉取镜像文件,也可以手动指定一个仓库地址拉取镜像。假如你设置了一个本地仓库地址,那么你只要指定这个地址拉取镜像即可。仓库地址类似一个URL,但是没有协议头http://。 例如从一个镜像地址:myregistry.local:5000,拉取镜像文…
暂无图片
编程学习 ·

杰里之难点播歌曲概率出现卡顿现象篇

其实本身对于杰里的IC性能不是很稳定,客观的评价。 对于TWS经常性的会遇到播歌曲卡顿: 小编总结了以下几个方面: 1 硬件天线首先要调试OK,保证单耳捂住不卡,过8852测试仪器可以通过。 2 软件晶振在合理有效范围内 3 软件主频提升至最高主频 4 提升flash的供电电压 5 保证软…
暂无图片
编程学习 ·

Golang语言基础教程:函数的返回值

函数的返回值1.1 什么是函数的返回值一个函数被调用后,返回给调用处的执行结果,叫做函数的返回值。调用处需要使用变量接收该结果1.2 一个函数可以返回多个值一个函数可以没有返回值,也可以有一个返回值,也可以有返回多个值。package main ​ import "fmt" ​ fu…
暂无图片
编程学习 ·

网络安全技术及应用第3版 主编贾铁军等——教材习题 期末重点 复习题 知识提炼(第8章 防火墙应用技术)

参考教材:网络安全技术及应用 第3版 主编贾铁军等 第8章 防火墙应用技术填空题论述题1)==防火墙的 分类 及 主要技术 有哪些?==2)正确配置防火墙以后,是否能够必然保证网络安全?如果不是,试介绍防火墙的缺点。3) 防火墙阻止SYN Flood攻击,可使用SYN网关防护方式。说明S…
暂无图片
编程学习 ·

标准序列化机制

序列化就是将对象转化为字节流,反序列化就是将字节流转化为对象。1. 基本用法Serializable要让一个类支持序列化,只需要让这个类实现接口 java.io.Serializable,Serializable 没有定义任何方法,只是一个标记接口。比如,对于57节提到的Student类,为支持序列化,可改为:pu…
暂无图片
编程学习 ·

深度优先算法(DFS)的python实现及骑士周游问题解析

背景: 骑士周游问题 在棋盘格里,马走日,遍历所有网格点,找到每个网格都走过,且只有一次的路径。算法实现: 用于解决骑士周游问题的图搜索算法是深度优先搜索(DFS),该算法是逐层建立搜索树,沿着树的单支尽量深入的向下搜索。连接尽量多的顶点,必要时可以进行分支。 深…