Commit 0e703196 authored by 杨林's avatar 杨林

创建flink表运行环境

parent 8637cc17
package flink
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
* Created with IntelliJ IDEA.
* Class: GetEnvironment
* Description:
* User: lin
* Date: 2021-07-16
* Time: 9:35
*/
object FlinkEnv {
// 创建流环境
private[this] val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 设置流表
private[this] val settings: EnvironmentSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
// private val config: TableConfig = TableConfig.getDefault
// 设置检查点
environment
.enableCheckpointing(1000)
.getCheckpointConfig
.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
// 设置检查点在一分钟完成
// environment
// .getCheckpointConfig
// .setCheckpointTimeout(60000)
// 确保检查点之间有500毫秒的进程
environment
.getCheckpointConfig
.setMinPauseBetweenCheckpoints(500)
//只允许同时有一个检查点
environment
.getCheckpointConfig
.setMaxConcurrentCheckpoints(1)
// 启用保留的外部检查点
// environment
// .getCheckpointConfig
// .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//
environment.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
// environment.setParallelism(3)
/**
* 获取flink运行环境
*
* @return 运行环境
*/
def env: StreamExecutionEnvironment = environment
/**
* 获取flink表环境
*
* @return 运行环境
*/
def tEnv: StreamTableEnvironment = StreamTableEnvironment.create(environment, settings)
}
package flink
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
* Created with IntelliJ IDEA.
* Class: GetEnvironment
* Description:
* User: lin
* Date: 2021-07-16
* Time: 9:35
*/
object GetEnvironment {
def env: StreamExecutionEnvironment = {
val environment = StreamExecutionEnvironment.getExecutionEnvironment
// 设置检查点
environment.enableCheckpointing(5000)
.getCheckpointConfig
.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 设置检查点在一分钟完成
environment.getCheckpointConfig.setCheckpointTimeout(60000)
// 确保检查点之间有500毫秒的进程
environment.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
//只允许同时有一个检查点
environment.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
// 启用保留的外部检查点
environment.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
environment.setParallelism(3)
environment
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment