已复制
全屏展示
复制代码

Flink 之 env 配置总结


· 1 min read

一. 检查点配置


// 设置 EXACTLY_ONCE 语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);


// Checkpoint 的处理超时时间,超时后不会继续保存这个检查点
env.getCheckpointConfig().setCheckpointTimeout(60000L);


// 最大允许同时处理几个 Checkpoint,指的是同一个任务的 Checkpoint
// 比如: 上一个Checkpoint处理到一半,这里又收到一个待处理的Checkpoint
env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);


// 与上面 setMaxConcurrentCheckpoints(2) 冲突
// 指当前checkpoint 的处理完成时间与接收最新一个 checkpoint 之间的时间间隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);


// 如果同时开启了 savepoint 且有更新的备份,是否倾向于使用更老的备份 checkpoint 来恢复,默认false
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

// 最多能容忍几次 checkpoint 处理失败,默认0,即 checkpoint 处理失败,就当作程序执行异常
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);

二. 重启策略配置

# 设置方法
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 5000));


# 重启超过 10 次,程序停止,每次失败后延迟 5 秒后重启
RestartStrategies.fixedDelayRestart(10, Time.seconds(5))

# 30 秒内重启超过 3 次,程序停止,每次失败后延迟 2 秒重启
RestartStrategies.failureRateRestart(3, Time.seconds(30), Time.seconds(2));

# 从不重启
RestartStrategies.noRestart()
🔗

文章推荐