Commit 9bed5cc3 authored by 杨林

Modify the fields written into the table

parent 3af73789
@@ -6,7 +6,7 @@
     <groupId>groupId</groupId>
     <artifactId>eventtracking</artifactId>
-    <version>0.5</version>
+    <version>0.8</version>
     <properties>
         <maven.compiler.source>11</maven.compiler.source>
...
-# Configuration file for consuming tracking (buried-point) data
 # Kafka topic
 kafka.event.topic=queue-buried-point-v1-to-bigdata
-#kafka.event.topic=event_tracker
 # Kafka cluster
 kafka.cluster=qmbigdata01:9092,qmbigdata02:9092,qmbigdata03:9092,qmbigdata04:9092,qmbigdata05:9092
-#kafka.cluster=8.135.22.177:9092,8.135.53.75:9092,8.135.100.29:9092,8.135.58.206:9092,47.119.179.1:9092
 # Kafka consumer group
-kafka.consumer.group=test-point-group
+kafka.consumer.group=point-group
 # Kafka key deserializer
 kafka.consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
 # Kafka value deserializer
@@ -21,7 +18,7 @@ kerberos.hive.path=/opt/software/kerberos/hive.keytab
 kerberos.hive.user=hive@QIAOMENG.COM
 # Kerberos configuration file path
 kerberos.config.path=/etc/krb5.conf
-kerberos.debug=true
+kerberos.debug=false
 kerberos.javax.auth.useSubjectCredsOnly=false
 # Hive catalog
 hive.catalog=event_tracking
...
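Note: the Kafka settings above feed the source that Entry.scala adds with .addSource(kafka). A minimal sketch of how such a source could be built from these properties; the repo wires this through PropertiesTools and its own helpers, so the property mapping and consumer construction below are assumptions, not the actual code:

    import java.util.Properties
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    // Hypothetical wiring: map the config keys above onto Kafka client properties.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "qmbigdata01:9092,qmbigdata02:9092,qmbigdata03:9092") // kafka.cluster (abridged)
    props.setProperty("group.id", "point-group")                                                 // kafka.consumer.group

    // A FlinkKafkaConsumer reading the tracking topic as raw JSON strings.
    val kafka = new FlinkKafkaConsumer[String](
      "queue-buried-point-v1-to-bigdata", // kafka.event.topic
      new SimpleStringSchema(),
      props
    )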
@@ -25,9 +25,6 @@ import scala.language.postfixOps
  */
 object Entry extends App {
-  // Kerberos authentication
-  KerberosAuth.auth()
   // create the Flink environment
   val env = FlinkEnv.env
   // add the Kafka source
@@ -42,7 +39,6 @@ object Entry extends App {
     .addSource(kafka)
     // data cleansing
     .map(elem => {
-      // val res = mutable.HashMap[String, String]()
       val map = JsonUtils parseFirstKey elem
       map.getOrElse("data", "")
     })
@@ -51,23 +47,24 @@
       DataFormat.formatTrack(x)
     )
+  // Kerberos authentication
+  KerberosAuth.auth()
   private val tEnv: StreamTableEnvironment = FlinkEnv.tEnv
   private val properties: Properties = PropertiesTools.getProperties
   private val catalog: String = properties.getProperty("hive.catalog")
   private val hiveDir: String = properties.getProperty("hive.config.path")
   private val hive = new HiveCatalog(catalog, "default", hiveDir)
   tEnv.registerCatalog(catalog, hive)
   tEnv.useCatalog(catalog)
   tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
   tEnv.useDatabase("qm_tmp")
   tEnv.createTemporaryView("event", dsa)
-  // INSERT INTO qm_tmp.event_track_v1 PARTITION(dt,hr,mm)
   private val sql: String =
     """
-      |insert into qm_tmp.event_track partition(dt,hr,mm)
-      |select
+      |INSERT INTO qm_tmp.event_track_v1 PARTITION(dt,hr,mm)
+      |SELECT
      | track_id,
      | distinct_id,
      | lib_detail,
@@ -89,18 +86,24 @@ object Entry extends App {
      | accept,
      | accept_encoding,
      | ip,
+     | ip_info,
      | url,
      | referrer,
      | remark,
      | user_id,
-     | created_at,
-     | from_unixtime(unix_timestamp(created_at),'yyyy-MM-dd') as dt,
-     | from_unixtime(unix_timestamp(created_at),'HH') as hr,
-     | from_unixtime(unix_timestamp(created_at),'mm') as mm
-     | from event
+     | CAST(created_time AS STRING) AS created_time,
+     | CAST(updated_time AS STRING) AS updated_time,
+     | anonymous_id,
+     | CAST(`time` AS STRING) AS `time`,
+     | CAST(flush_time AS STRING) AS flush_time,
+     | FROM_UNIXTIME(created_time,'yyyy-MM-dd') AS dt,
+     | FROM_UNIXTIME(created_time,'HH') AS hr,
+     | FROM_UNIXTIME(created_time,'mm') AS mm
+     | FROM event
      |""".stripMargin
-  // dsa.executeAndCollect()
-  //   .foreach(println)
   tEnv.executeSql(sql).print()
   env.execute("event_track")
...
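Note: the new SQL derives the dt/hr/mm partitions directly from created_time with FROM_UNIXTIME, so created_time is assumed to hold epoch seconds. A small sketch (not repo code) of the equivalent conversion, using a hypothetical sample value and assuming the cluster runs in Asia/Shanghai:

    import java.time.{Instant, ZoneId}
    import java.time.format.DateTimeFormatter

    val createdTime = 1626688800L  // hypothetical sample: 2021-07-19 10:00:00 UTC
    val zoned = Instant.ofEpochSecond(createdTime).atZone(ZoneId.of("Asia/Shanghai"))
    val dt = zoned.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) // "2021-07-19"
    val hr = zoned.format(DateTimeFormatter.ofPattern("HH"))         // "18"
    val mm = zoned.format(DateTimeFormatter.ofPattern("mm"))         // "00"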
 package flink
 import org.apache.flink.streaming.api.CheckpointingMode
+import org.apache.flink.streaming.api.environment.CheckpointConfig
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.EnvironmentSettings
+import org.apache.flink.table.api.{EnvironmentSettings, TableConfig}
 import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
+import org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl
 /**
  * Created with IntelliJ IDEA.
@@ -16,22 +18,25 @@ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
 object FlinkEnv {
   // create the stream execution environment
-  private[this] val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+  private[this] val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(1000)
   // stream table environment settings
-  private[this] val settings: EnvironmentSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
+  private[this] val settings: EnvironmentSettings = EnvironmentSettings
+    .newInstance
+    .useBlinkPlanner
+    .inStreamingMode
+    .build
-  // private val config: TableConfig = TableConfig.getDefault
+  private val config: TableConfig = TableConfig.getDefault
   // configure checkpointing
   environment
-    .enableCheckpointing(1000)
     .getCheckpointConfig
     .setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
   // checkpoints must complete within one minute
-  // environment
-  //   .getCheckpointConfig
-  //   .setCheckpointTimeout(60000)
+  environment
+    .getCheckpointConfig
+    .setCheckpointTimeout(60000)
   // ensure at least 500 ms between checkpoints
   environment
     .getCheckpointConfig
@@ -41,12 +46,12 @@ object FlinkEnv {
     .getCheckpointConfig
     .setMaxConcurrentCheckpoints(1)
   // enable retained externalized checkpoints
-  // environment
-  //   .getCheckpointConfig
-  //   .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
+  environment
+    .getCheckpointConfig
+    .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
-  //
   environment.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
   // environment.setParallelism(3)
   /**
    * get the Flink execution environment
@@ -60,6 +65,6 @@ object FlinkEnv {
    *
    * @return the execution environment
    */
-  def tEnv: StreamTableEnvironment = StreamTableEnvironment.create(environment, settings)
+  def tEnv: StreamTableEnvironmentImpl = StreamTableEnvironmentImpl.create(environment, settings, config)
 }
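Note on the changed return type: StreamTableEnvironmentImpl implements the public StreamTableEnvironment trait, so call sites can still declare the value against the trait, which is what Entry.scala does; a minimal usage sketch:

    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    // Declared against the trait even though FlinkEnv.tEnv now returns the Impl class.
    val tEnv: StreamTableEnvironment = FlinkEnv.tEnv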
@@ -9,7 +9,7 @@ package model
  * Time: 16:58
  */
 case class TrackItem(
-  track_id: Long,
+  track_id: String,
   distinct_id: String,
   lib_detail: String,
   lib_version: String,
@@ -30,10 +30,13 @@ case class TrackItem(
   accept: String,
   accept_encoding: String,
   ip: String,
+  ip_info: String,
   url: String,
   referrer: String,
   remark: String,
   user_id: String,
-  created_at: String
+  created_time: Long,
+  updated_time: Long,
+  anonymous_id: String,
+  `time`: Long,
+  flush_time: Long
 )
@@ -19,7 +19,8 @@ object KerberosAuth {
   def auth(): Unit = {
     System.setProperty("java.security.krb5.conf", properties.getProperty("kerberos.config.path"))
     System.setProperty("javax.security.auth.useSubjectCredsOnly", properties.getProperty("kerberos.javax.auth.useSubjectCredsOnly"))
-    System.setProperty("sun.security.krb5.debug", "false")
+    System.setProperty("sun.security.krb5.debug", properties.getProperty("kerberos.debug"))
     val conf: Configuration = new Configuration()
     conf.set("hadoop.security.authentication", properties.getProperty("hadoop.security.auth"))
     UserGroupInformation.setConfiguration(conf)
...
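Note: the diff cuts off before the actual login call. For illustration only (not shown in this commit), a keytab login with Hadoop's UserGroupInformation, reusing the kerberos.hive.user / kerberos.hive.path properties from the config file above, would typically look like:

    import org.apache.hadoop.security.UserGroupInformation

    // Hypothetical continuation, assuming the keytab properties shown earlier.
    UserGroupInformation.loginUserFromKeytab(
      properties.getProperty("kerberos.hive.user"), // hive@QIAOMENG.COM
      properties.getProperty("kerberos.hive.path")  // /opt/software/kerberos/hive.keytab
    )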
@@ -7,7 +7,6 @@ import model.IpItem
 import java.io.File
 import java.net.InetAddress
-import scala.io.Source
 import scala.language.postfixOps
 /**
@@ -18,9 +17,7 @@ import scala.language.postfixOps
  * Date: 2021-07-19
  * Time: 18:31
  */
-class GeoIp(ip: String) {
-  private[this] val database: File = new File("src/main/resources/GeoLite2-City.mmdb")
-  private[this] val asnDatabase = new File("src/main/resources/GeoLite2-ASN.mmdb")
+class GeoIp(ip: String, database: File = new File("src/main/resources/GeoLite2-City.mmdb"), asnDatabase: File = new File("src/main/resources/GeoLite2-ASN.mmdb")) {
   private[this] val reader: DatabaseReader = new DatabaseReader.Builder(database).build
   private[this] val asnReader = new DatabaseReader.Builder(asnDatabase).build
   private[this] val address: InetAddress = InetAddress.getByName(ip)
@@ -51,10 +48,9 @@ class GeoIp(ip: String) {
 }
 object GeoIp extends App {
-  def apply(ip: String): GeoIp = new GeoIp(ip)
-  println(apply("39.149.36.225").getCity)
-  val source = Source.fromFile(new File("src/main/resources/ip.txt"))
-  source.getLines().map(x => apply(x).getCity).foreach(println)
+  def apply(ip: String, city: File, asn: File): GeoIp = new GeoIp(ip, city, asn)
+
+  def apply(ip: String) = new GeoIp(ip)
 }
...
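Note: with the GeoLite2 databases now constructor parameters, a caller can point GeoIp at mmdb files outside the project tree. A usage sketch (the explicit paths below are assumptions, not paths from the repo):

    import java.io.File

    // Default databases bundled under src/main/resources:
    println(GeoIp("39.149.36.225").getCity)

    // Explicit databases, e.g. when the mmdb files are deployed elsewhere:
    val cityDb = new File("/opt/geoip/GeoLite2-City.mmdb") // assumed path
    val asnDb  = new File("/opt/geoip/GeoLite2-ASN.mmdb")  // assumed path
    println(GeoIp("39.149.36.225", cityDb, asnDb).getCity)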
 DROP TABLE IF EXISTS qm_tmp.event_track;
-CREATE TABLE IF NOT EXISTS qm_tmp.event_track (
-    track_id BIGINT COMMENT 'event ID',
+CREATE EXTERNAL TABLE IF NOT EXISTS qm_tmp.event_track (
+    track_id STRING COMMENT 'event ID',
     distinct_id STRING COMMENT 'device ID',
     lib_detail STRING COMMENT 'library detail',
     lib_version STRING COMMENT 'library version',
@@ -22,14 +22,19 @@ CREATE TABLE IF NOT EXISTS qm_tmp.event_track (
     accept STRING COMMENT '',
     accept_encoding STRING COMMENT 'accept encoding',
     ip STRING COMMENT 'ip',
+    ip_info STRING COMMENT 'IP info (JSON)',
     url STRING COMMENT 'server URL',
     referrer STRING COMMENT 'referrer page',
     remark STRING COMMENT 'environment tag',
     user_id STRING COMMENT 'user ID',
-    created_at STRING COMMENT 'creation time'
+    created_time STRING COMMENT 'creation time',
+    updated_time STRING COMMENT '',
+    anonymous_id STRING COMMENT '',
+    `time` STRING COMMENT '',
+    _flush_time STRING COMMENT ''
 ) PARTITIONED BY ( dt string, hr string, mm string )
+STORED AS ORC
 TBLPROPERTIES (
-    'is_generic'='false',
     'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mm:00',
     'sink.partition-commit.delay' = '0s',
     'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai',
...
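Note: the partition.time-extractor pattern substitutes the partition values into '$dt $hr:$mm:00', so a record written to partition dt=2021-07-19/hr=18/mm=00 is read by the Flink sink as having partition time 2021-07-19 18:00:00; combined with 'sink.partition-commit.delay' = '0s', the partition becomes eligible for commit as soon as the sink's commit trigger fires.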