Commit 6ae2daeb authored by 杨林's avatar 杨林

创建flink运行环境

parent dc799ae3
package flink
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* Created with IntelliJ IDEA.
* Class: GetEnvironment
* Description:
* User: lin
* Date: 2021-07-16
* Time: 9:35
*/
object GetEnvironment {
def env: StreamExecutionEnvironment = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
// 设置检查点
environment.enableCheckpointing(5000)
.getCheckpointConfig
.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 设置检查点在一分钟完成
environment.getCheckpointConfig.setCheckpointTimeout(60000)
// 确保检查点之间有500毫秒的进程
environment.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
//只允许同时有一个检查点
environment.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 启用保留的外部检查点
environment.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
environment.setParallelism(3)
environment
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment