Commit 9737e9b2 authored by 杨林's avatar 杨林

增加对京东数据的支持

parent 83b66d06
...@@ -3,10 +3,10 @@ package entry ...@@ -3,10 +3,10 @@ package entry
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import tools.kafka.KafkaConsumer import tools.kafka.KafkaConsumer
import org.apache.flink.api.scala._ import org.apache.flink.api.scala._
import tools.dataformat.TB import tools.dataformat.Data
import tools.es.SaveData
import tools.json.Jackson import tools.json.Jackson
import tools.properties.PropertiesTools
import java.util.Properties
import scala.collection.mutable import scala.collection.mutable
/** /**
...@@ -19,25 +19,53 @@ import scala.collection.mutable ...@@ -19,25 +19,53 @@ import scala.collection.mutable
*/ */
object Entry { object Entry {
def main(args: Array[String]): Unit = { def main(args: Array[String]): Unit = {
val properties: Properties = PropertiesTools.getProperties
// 消费京东和淘宝的数据
val tb_topic: String = properties.getProperty("kafka.tb.topic")
val tb_group: String = properties.getProperty("kafka.tb.consumer.group")
val jd_topic: String = properties.getProperty("kafka.jd.topic")
val jd_group: String = properties.getProperty("kafka.jd.consumer.group")
// 配置环境 // 配置环境
val environment = StreamExecutionEnvironment.getExecutionEnvironment val environment = StreamExecutionEnvironment.getExecutionEnvironment
environment.enableCheckpointing(1000L) // .enableCheckpointing(1000L)
// kafka源 // kafka源
val consumer = KafkaConsumer.getConsumer val tb_consumer = KafkaConsumer.getConsumer(tb_topic, tb_group)
consumer.setStartFromEarliest() val jd_consumer = KafkaConsumer.getConsumer(jd_topic, jd_group)
tb_consumer.setStartFromEarliest()
jd_consumer.setStartFromEarliest()
// 添加源 // 清洗淘宝数据
environment.addSource(consumer) val tbSource = environment
.addSource(tb_consumer)
.map(tb => {
val res = mutable.Map[String, String]()
Jackson.parseAllKey(tb, res)
})
.map(x => Data.formatTBData(x))
// 清洗京东数据
val jdSource = environment
.addSource(jd_consumer)
.map(x => { .map(x => {
val res = mutable.Map[String, String]() val res = mutable.Map[String, String]()
val map = Jackson.parseAllKey(x, res) Jackson.parseAllKey(x, res)
map
}) })
.map(x => TB.formatTBData(x)) .map(x => Data.formatJDData(x))
.map(x => SaveData.saveToES(x)) // 合并流
// .executeAndCollect() val ds = tbSource.union(jdSource)
// .foreach(println)
//存入ES
ds
// .map(x => SaveData.saveToES(x))
.executeAndCollect()
.foreach(println)
environment.execute("tb") // environment.execute("jingdong")
} }
} }
...@@ -15,15 +15,13 @@ import tools.properties.PropertiesTools ...@@ -15,15 +15,13 @@ import tools.properties.PropertiesTools
*/ */
object KafkaConsumer { object KafkaConsumer {
private[this] val properties: Properties = PropertiesTools.getProperties private[this] val properties: Properties = PropertiesTools.getProperties
private[this] val topic: String = properties.getProperty("kafka.topic")
private[this] val bootstrapServer: String = properties.getProperty("kafka.cluster") private[this] val bootstrapServer: String = properties.getProperty("kafka.cluster")
private[this] val group: String = properties.getProperty("kafka.consumer.group")
private[this] val keyDeserializer: String = properties.getProperty("kafka.consumer.key.deserializer") private[this] val keyDeserializer: String = properties.getProperty("kafka.consumer.key.deserializer")
private[this] val valueDeserializer: String = properties.getProperty("kafka.consumer.value.deserializer") private[this] val valueDeserializer: String = properties.getProperty("kafka.consumer.value.deserializer")
private[this] val commitInterval: String = properties.getProperty("kafka.consumer.offset.commit.auto.interval") private[this] val commitInterval: String = properties.getProperty("kafka.consumer.offset.commit.auto.interval")
private[this] val AutoCommit: String = properties.getProperty("kafka.consumer.offset.commit.auto") private[this] val AutoCommit: String = properties.getProperty("kafka.consumer.offset.commit.auto")
def getConsumer: FlinkKafkaConsumer[String] = { def getConsumer(topic: String, group: String): FlinkKafkaConsumer[String] = {
val property = new Properties() val property = new Properties()
// 对接kafka // 对接kafka
property.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer) property.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment