// Commit 7220e9e1 (parent c2767332), authored by 杨林 — "entry file" (入口文件).
package entry

import flink.FlinkEnv
import model.TrackItem
import org.apache.flink.streaming.api.scala.{DataStream, createTypeInformation}
import org.apache.flink.table.api.SqlDialect
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog
import tools.auth.KerberosAuth
import tools.dataprocess.DataFormat
import tools.json.JsonUtils
import tools.kafka.KafkaConsumer
import tools.properties.PropertiesTools

import java.util.Properties
import scala.language.postfixOps
/**
 * Created with IntelliJ IDEA.
 * Date: 2021-07-16
 * Time: 9:34
 */
/**
 * Job entry point: authenticates against Kerberos, consumes raw tracking
 * events from Kafka, cleans/parses them into [[TrackItem]] records, and
 * streams them into the partitioned Hive table `qm_tmp.event_track`.
 */
object Entry {

  def main(args: Array[String]): Unit = {
    // Kerberos authentication — must run before any Kafka/Hive access.
    KerberosAuth.auth()

    // Create the Flink streaming environment.
    val env = FlinkEnv.env

    // Kafka source; consume from the earliest available offset.
    val kafka = KafkaConsumer.getConsumer
    kafka.setStartFromEarliest()

    // Cleaning pipeline: parse the first JSON level, keep only the "data"
    // payload, drop empties, then convert each payload to a TrackItem.
    val dsa: DataStream[TrackItem] = env
      .addSource(kafka)
      .map(elem => JsonUtils.parseFirstKey(elem).getOrElse("data", ""))
      .filter(_.nonEmpty)
      .map(DataFormat.formatTrack)

    // Table environment bridged to the streaming environment, backed by a
    // Hive catalog whose name and config dir come from the properties file.
    val tEnv: StreamTableEnvironment = FlinkEnv.tEnv
    val properties: Properties = PropertiesTools.getProperties
    val catalog: String = properties.getProperty("hive.catalog")
    val hiveDir: String = properties.getProperty("hive.config.path")
    val hive = new HiveCatalog(catalog, "default", hiveDir)
    tEnv.registerCatalog(catalog, hive)
    tEnv.useCatalog(catalog)
    // Hive dialect is required for the partitioned INSERT syntax below.
    tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tEnv.useDatabase("qm_tmp")
    tEnv.createTemporaryView("event", dsa)

    // Partition columns dt/hr/mm are derived from the event's created_at.
    val sql: String =
      """
        |insert into qm_tmp.event_track partition(dt,hr,mm)
        |select
        | track_id,
        | distinct_id,
        | lib_detail,
        | lib_version,
        | lib,
        | app_version,
        | lib_method,
        | event_name,
        | type,
        | properties,
        | HOST,
        | user_agent,
        | ua_platform,
        | ua_version,
        | ua_language,
        | connection,
        | Pragma,
        | cache_control,
        | accept,
        | accept_encoding,
        | ip,
        | ip_info,
        | url,
        | referrer,
        | remark,
        | user_id,
        | created_at,
        | from_unixtime(unix_timestamp(created_at),'yyyy-MM-dd') as dt,
        | from_unixtime(unix_timestamp(created_at),'HH') as hr,
        | from_unixtime(unix_timestamp(created_at),'mm') as mm
        | from event
        |""".stripMargin
    tEnv.executeSql(sql).print()
    env.execute("event_track")
  }
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment