博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flink State和容错机制
阅读量:5969 次
发布时间:2019-06-19

本文共 2745 字,大约阅读时间需要 9 分钟。

  hot3.png

1. Flink Barriers

     Flink分布式快照的核心元素是流barriers。 这些barriers被注入数据流并与记录一起作为数据流的一部分流动。 barriers永远不会超过记录,流量严格符合要求。 barriers将数据流中的记录分为进入当前快照的记录集和进入下一个快照的记录。 每个barriers都携带快照的ID,该快照的记录barriers前面推送的数据。 barriers不会中断流的流动,因此非常轻量级。 来自不同快照的多个障碍可以同时在流中,这意味着可以同时发生各种快照。

Checkpoint barriers in data streams

2. Flink Checkpoint的过程

    Aligning data streams at operators with multiple inputs

2.1 Barries 对齐过程

(1).   一旦operator从输入流接收到快照barrier n,它就不能处理来自该流的任何其他记录,直到它从其他输入接收到barrier n为止。 否则,它会混合属于快照n的记录和属于快照n + 1的记录。

(2).  包含barrier n的流数据暂时被Operator搁置。 从这些流接收的记录不会被处理,而是放入输入缓冲区。

(3).  一旦最后一个流接收到屏障n,Operator就会向下一个Operator发出所有挂起的流数据,然后自己发出快照n个屏障。

(4).  之后,它将继续处理来自所有输入流的记录,在处理来自流的记录之前,会优先处理来自输入缓冲区的记录。

2.2 Checkpoint 过程

2.2.1 State数据

  • 用户定义的状态:这是由转换函数(如map()或filter())直接创建和修改的状态。   

  • 系统状态:此状态是指作为运算符计算一部分的数据缓冲区。 此状态的典型示例是窗口缓冲区,系统在其中收集(和聚合)窗口记录,直到窗口被评估和逐出。

2.2.2 快照数据

  • 对于每个并行流数据源,启动快照时流中的偏移/位置

  • 对于每个运算符,state数据会被作为快照的一部分

 

Illustration of the Checkpointing Mechanism

2.2.3 State存储配置

    

   Flink使用state: 分为 Keyed State和 Operator State

    

   Keyed State和 Operator State的区别如图

   Flinkç¶æ管çå容éæºå¶ä»ç»

2.2.4 Checkpoint代码设置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// start a checkpoint every 1000 msenv.enableCheckpointing(1000);// advanced options:// set mode to exactly-once (this is the default)env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// make sure 500 ms of progress happen between checkpointsenv.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);// checkpoints have to complete within one minute, or are discardedenv.getCheckpointConfig().setCheckpointTimeout(60000);// allow only one checkpoint to be in progress at the same timeenv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// enable externalized checkpoints which are retained after job cancellationenv.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

2.2.5 SavePoint

Savepoint:是一种特殊的checkpoint,只不过不像checkpoint定期的从系统中去触发的,它是用户通过命令触发,存储格式和checkpoint也是不相同的,会将数据按照一个标准的格式存储,不管配置什么样,Flink都会从这个checkpoint恢复,是用来做版本升级一个非常好的工具;

 a. 停止flink任务,并且savepoint

bin/flink cancel -s [:targetDirectory] :jobId

b. 从savepoint中恢复任务

bin/flink run -s :savepointPath [:runArgs]

 

2.3 Exactly Once 和 At Least Once的区别

      由于存在Barriers 对齐的步骤,所以会存在毫秒级别的延迟。如果对实时性要求很高的程序,可以在checkpoint 期间跳过barriers对齐,一旦operator看到每个输入的barrier,就会绘制检查点快照。当跳过对齐时,即使在检查点n的某些检查点barriers到达之后,operator也会继续处理所有输入。这样,在获取检查点n的状态快照之前,operator还处理属于检查点n + 1的元素。在还原时,这些记录将作为重复记录出现,因为它们都包含在检查点n的状态快照中,并将在检查点n之后作为数据的一部分进行重放。

       barriers 对齐仅适用于具有多个前驱(连接)的运算符以及具有多个发送方的运算符(在流repartitioning/shuffle)。因此,数据流在并行流操作(map(),flatMap(),filter(),...)实际上即使在At Least Once模式下也能提供Exactly Once。

// exactly_once 模式env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// at least once模式env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

 

转载于:https://my.oschina.net/manmao/blog/3023722

你可能感兴趣的文章
CSU Double Shortest Paths 湖南省第十届省赛
查看>>
webgl像机世界
查看>>
php正则怎么使用(最全最细致)
查看>>
javascript数学运算符
查看>>
LC.155. Min Stack(非优化,两个stack 同步 + -)
查看>>
交互设计[3]--点石成金
查看>>
SCCM TP4部署Office2013
查看>>
Android创建启动画面
查看>>
Linux中date命令的各种实用方法--转载
查看>>
mysqld -install命令时出现install/remove of the service denied错误的原因和解决办法
查看>>
苹果企业版帐号申请记录
查看>>
C++ Error: error LNK2019: unresolved external symbol
查看>>
Bitmap 和Drawable 的区别
查看>>
Java操作mongoDB2.6的常见API使用方法
查看>>
如何给服务器设置邮件警报。
查看>>
CEF js调用C#封装类含注释
查看>>
麦克劳林
查看>>
Eclipse SVN修改用户名和密码
查看>>
架构师的职责都有哪些?
查看>>
SVN: bdb: BDB1538 Program version 5.3 doesn't match environment version 4.7
查看>>