Flink 之 env 配置总结
一. 检查点配置
// 设置 EXACTLY_ONCE 语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Checkpoint 的处理超时时间,超时后不会继续保存这个检查点
env.getCheckpointConfig().setCheckpointTimeout(60000L);
// 最大允许同时处理几个 Checkpoint,指的是同一个任务的 Checkpoint
// 比如: 上一个Checkpoint处理到一半,这里又收到一个待处理的Checkpoint
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
// 与上面 setMaxConcurrentCheckpoints(2) 冲突
// 指当前checkpoint 的处理完成时间与接收最新一个 checkpoint 之间的时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
// 如果同时开启了 savepoint 且有更新的备份,是否倾向于使用更老的备份 checkpoint 来恢复,默认false
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
// 最多能容忍几次 checkpoint 处理失败,默认0,即 checkpoint 处理失败,就当作程序执行异常
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
二. 重启策略配置
# 设置方法
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 5000));
# 重启超过 10 次,程序停止,每次失败后延迟 5 秒后重启
RestartStrategies.fixedDelayRestart(10, Time.seconds(5))
# 30 秒内重启超过 3 次,程序停止,每次失败后延迟 2 秒重启
RestartStrategies.failureRateRestart(3, Time.seconds(30), Time.seconds(2));
# 从不重启
RestartStrategies.noRestart()