package qm.flink

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

import qm.auth.KerberosAuth
import qm.kafka.KafkaConsumer
import qm.model.ActionItem

/**
 * Reads tracking events from Kafka, parses and filters them, and writes the
 * result into the partitioned Hive table qm_tmp.point through the Flink SQL
 * Hive connector.
 *
 * @Date: 2021/2/10 9:54
 */
object FlinkEntry {

  def entry(): Unit = {
    // Checkpoint every 100 s (timeout 50 s); exactly-once mode is required
    // for end-to-end exactly-once semantics with the Kafka source.
    val env = StreamExecutionEnvironment
      .getExecutionEnvironment
      .enableCheckpointing(100000)
    env.getCheckpointConfig.setCheckpointTimeout(50000)
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(env, settings)

    // Connect to Kafka.
    val consumer: FlinkKafkaConsumer[String] = KafkaConsumer.getConsumer
    // Consume from the earliest offset.
    consumer.setStartFromEarliest()
    // Add the source and set its parallelism.
    val ds: DataStream[String] = env.addSource(consumer).setParallelism(4)

    // Processing entry point: parse each raw record into ActionItems,
    // flatten the per-record arrays, and drop events with missing fields.
    val dsa: DataStream[ActionItem] = ds
      .map(x => Analysis.analysis(x))
      .flatMap(x => x)
      .filter(x => x.eventName.nonEmpty)
      .filter(x => x.ctime.nonEmpty)

    KerberosAuth.auth(false)

    // Build the Hive catalog.
    val name = "point_hive"                          // catalog name
    val defaultDatabase = "default"                  // default database
    val hiveConfDir = "/etc/alternatives/hive-conf"  // Hive configuration directory
    val hive = new HiveCatalog(name, defaultDatabase, hiveConfDir)
    tEnv.registerCatalog("point_hive", hive)
    tEnv.useCatalog("point_hive")
    tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tEnv.useDatabase("qm_tmp")
    tEnv.createTemporaryView("users", dsa)

    // Note: 'sink.partition-commit.watermark-time-zone' only takes effect
    // with the 'partition-time' commit trigger; the default trigger is
    // 'process-time'.
    val sql =
      """
        |CREATE TABLE IF NOT EXISTS qm_tmp.point(
        |  ip string,
        |  deviceId string,
        |  eventName string,
        |  ctime string,
        |  page_id string,
        |  last_page_id string,
        |  appear_time string,
        |  disappear_time string,
        |  extras string,
        |  click_id string,
        |  click_time string,
        |  exposure_id string,
        |  exposure_time string,
        |  inter string,
        |  app_channel string,
        |  app_version string,
        |  user_id string,
        |  source string,
        |  itemId string,
        |  platform string
        |)
        |PARTITIONED BY (dt string, hr string, mm string)
        |TBLPROPERTIES(
        |  'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mm:00',
        |  'sink.partition-commit.delay' = '0s',
        |  'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai',
        |  'sink.partition-commit.policy.kind' = 'metastore')
        |""".stripMargin
    tEnv.executeSql(sql)

    val insertSql =
      """
        |INSERT INTO qm_tmp.point PARTITION(dt, hr, mm)
        |SELECT
        |  ip,
        |  deviceId,
        |  eventName,
        |  ctime,
        |  page_id,
        |  last_page_id,
        |  appear_time,
        |  disappear_time,
        |  extras,
        |  click_id,
        |  click_time,
        |  exposure_id,
        |  exposure_time,
        |  inter,
        |  app_channel,
        |  app_version,
        |  user_id,
        |  source,
        |  itemId,
        |  platform,
        |  from_unixtime(cast(ctime AS bigint), 'yyyy-MM-dd') AS dt,
        |  from_unixtime(cast(ctime AS bigint), 'HH') AS hr,
        |  from_unixtime(cast(ctime AS bigint), 'mm') AS mm
        |FROM users
        |""".stripMargin

    // executeSql submits the INSERT pipeline itself; a trailing
    // env.execute() would fail with "no operators defined" because the
    // only sink lives in the Table API job.
    tEnv.executeSql(insertSql).print()
  }
}
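
// ---------------------------------------------------------------------------
// Sketch of the helpers FlinkEntry assumes. The sources of qm.model.ActionItem,
// qm.kafka.KafkaConsumer, and Analysis are not part of this excerpt, so the
// shapes below are hypothetical reconstructions: ActionItem's fields are
// inferred from the Hive DDL and the SELECT list, and the Kafka topic,
// brokers, and group id are placeholders, not the real configuration.
// ---------------------------------------------------------------------------
package qm.sketch

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Hypothetical event model: one all-string field per column of qm_tmp.point,
// declared in the same order as the SELECT list so that the columns of the
// temporary view "users" line up with the INSERT statement.
case class ActionItem(ip: String, deviceId: String, eventName: String, ctime: String,
                      page_id: String, last_page_id: String, appear_time: String,
                      disappear_time: String, extras: String, click_id: String,
                      click_time: String, exposure_id: String, exposure_time: String,
                      inter: String, app_channel: String, app_version: String,
                      user_id: String, source: String, itemId: String, platform: String)

// Hypothetical consumer factory: the real qm.kafka.KafkaConsumer presumably
// builds a FlinkKafkaConsumer[String] from configuration along these lines.
object KafkaConsumer {
  def getConsumer: FlinkKafkaConsumer[String] = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", "kafka-1:9092") // placeholder
    props.setProperty("group.id", "qm-point")              // placeholder
    new FlinkKafkaConsumer[String]("point-topic", new SimpleStringSchema(), props)
  }
}

// Hypothetical parser: the job flatMaps the result of Analysis.analysis, so
// its signature is presumably String => Seq[ActionItem] (or an Array).
object Analysis {
  def analysis(raw: String): Seq[ActionItem] = {
    // The real implementation would JSON-decode `raw`; elided here.
    Seq.empty
  }
}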