Commit a7dfac64 authored by 杨林's avatar 杨林

程序入口

parent a5e93955
package entry
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import tools.kafka.KafkaConsumer
import org.apache.flink.api.scala._
import tools.dataformat.JD
import tools.es.SaveData
import tools.json.Jackson
import scala.collection.mutable
/**
* Created with IntelliJ IDEA.
* Class: Entry
* Description: 程序入口
* User: lin
* Date: 2021-06-04
* Time: 13:46
*/
object Entry {
def main(args: Array[String]): Unit = {
// 配置环境
val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.enableCheckpointing(1000L)
// kafka源
val consumer = KafkaConsumer.getConsumer
consumer.setStartFromEarliest()
// 添加源
environment.addSource(consumer)
.map(x => {
val res = mutable.Map[String, String]()
val map = Jackson.parseAllKey(x, res)
map
})
.map(x => JD.formatJDData(x))
.map(x => SaveData.saveToES(x))
// .executeAndCollect()
// .foreach(println)
environment.execute("jd")
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment