Commit d2a6c5f7 authored by 杨林

Merge branch 'lin' into 'master'

Modify the fields written to storage

See merge request !2
parents 14f67d11 9bed5cc3
@@ -6,7 +6,7 @@
<groupId>groupId</groupId>
<artifactId>eventtracking</artifactId>
<version>0.5</version>
<version>0.8</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
......
# Configuration file for consuming event-tracking data
# Kafka topic
kafka.event.topic=queue-buried-point-v1-to-bigdata
#kafka.event.topic=event_tracker
# Kafka cluster
kafka.cluster=qmbigdata01:9092,qmbigdata02:9092,qmbigdata03:9092,qmbigdata04:9092,qmbigdata05:9092
#kafka.cluster=8.135.22.177:9092,8.135.53.75:9092,8.135.100.29:9092,8.135.58.206:9092,47.119.179.1:9092
# Kafka consumer group
kafka.consumer.group=test-point-group
kafka.consumer.group=point-group
# Kafka key deserializer
kafka.consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Kafka value deserializer
@@ -21,7 +18,7 @@ kerberos.hive.path=/opt/software/kerberos/hive.keytab
kerberos.hive.user=hive@QIAOMENG.COM
# Kerberos config file path
kerberos.config.path=/etc/krb5.conf
kerberos.debug=true
kerberos.debug=false
kerberos.javax.auth.useSubjectCredsOnly=false
# hive catalog
hive.catalog=event_tracking
......
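These keys are the project's own property names, not Kafka's. A minimal sketch of how they might feed the Kafka source used in Entry below, assuming a Flink 1.13-era FlinkKafkaConsumer and the PropertiesTools helper that appears later in this diff (its package is omitted here):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaSourceSketch {
  def build(): FlinkKafkaConsumer[String] = {
    val conf: Properties = PropertiesTools.getProperties
    val props = new Properties()
    // Map the project's custom keys onto standard Kafka consumer settings.
    props.setProperty("bootstrap.servers", conf.getProperty("kafka.cluster"))
    props.setProperty("group.id", conf.getProperty("kafka.consumer.group"))
    // FlinkKafkaConsumer handles byte-level deserialization itself, so the
    // *.deserializer keys matter only if a raw KafkaConsumer uses this file.
    new FlinkKafkaConsumer[String](
      conf.getProperty("kafka.event.topic"), new SimpleStringSchema(), props)
  }
}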
@@ -25,9 +25,6 @@ import scala.language.postfixOps
*/
object Entry extends App {
//Authentication
KerberosAuth.auth()
// Create the Flink environment
val env = FlinkEnv.env
// Add the Kafka source
@@ -42,7 +39,6 @@ object Entry extends App {
.addSource(kafka)
// Data cleansing
.map(elem => {
// val res = mutable.HashMap[String, String]()
val map = JsonUtils parseFirstKey elem
map.getOrElse("data", "")
})
@@ -51,23 +47,24 @@ object Entry extends App {
DataFormat.formatTrack(x)
)
//Authentication
KerberosAuth.auth()
private val tEnv: StreamTableEnvironment = FlinkEnv.tEnv
private val properties: Properties = PropertiesTools.getProperties
private val catalog: String = properties.getProperty("hive.catalog")
private val hiveDir: String = properties.getProperty("hive.config.path")
private val hive = new HiveCatalog(catalog, "default", hiveDir)
tEnv.registerCatalog(catalog, hive)
tEnv.useCatalog(catalog)
tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tEnv.useDatabase("qm_tmp")
tEnv.createTemporaryView("event", dsa)
// INSERT INTO qm_tmp.event_track_v1 PARTITION(dt,hr,mm)
private val sql: String =
"""
|insert into qm_tmp.event_track partition(dt,hr,mm)
|select
|INSERT INTO qm_tmp.event_track_v1 PARTITION(dt,hr,mm)
|SELECT
| track_id,
| distinct_id,
| lib_detail,
@@ -89,18 +86,24 @@ object Entry extends App {
| accept,
| accept_encoding,
| ip,
| ip_info,
| url,
| referrer,
| remark,
| user_id,
| created_at,
| from_unixtime(unix_timestamp(created_at),'yyyy-MM-dd') as dt,
| from_unixtime(unix_timestamp(created_at),'HH') as hr,
| from_unixtime(unix_timestamp(created_at),'mm') as mm
| from event
| CAST(created_time AS STRING) AS created_time,
| CAST(updated_time AS STRING) AS updated_time,
| anonymous_id,
| CAST(`time` AS STRING) AS `time`,
| CAST(flush_time AS STRING) AS flush_time,
| FROM_UNIXTIME(created_time,'yyyy-MM-dd') AS dt,
| FROM_UNIXTIME(created_time,'HH') AS hr,
| FROM_UNIXTIME(created_time,'mm') AS mm
| FROM event
|""".stripMargin
// dsa.executeAndCollect()
// .foreach(println)
tEnv.executeSql(sql).print()
env.execute("event_track")
......
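One caution on the rewritten partition columns: FROM_UNIXTIME expects epoch seconds, so if created_time ever arrives in milliseconds the dt/hr/mm values will be wrong. A hedged guard, where the 13-digit threshold is an assumption rather than the project's own logic:

// Normalize a timestamp to epoch seconds before deriving dt/hr/mm.
def toEpochSeconds(ts: Long): Long =
  if (ts > 99999999999L) ts / 1000 // 13 digits or more: treat as milliseconds
  else ts                          // otherwise assume it is already in seconds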
package flink
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.{EnvironmentSettings, TableConfig}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl
/**
* Created with IntelliJ IDEA.
@@ -16,22 +18,25 @@ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object FlinkEnv {
// Create the streaming environment
private[this] val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
private[this] val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(1000)
// Configure the stream table settings
private[this] val settings: EnvironmentSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
private[this] val settings: EnvironmentSettings = EnvironmentSettings
.newInstance
.useBlinkPlanner
.inStreamingMode
.build
// private val config: TableConfig = TableConfig.getDefault
private val config: TableConfig = TableConfig.getDefault
// Configure checkpointing
environment
.enableCheckpointing(1000)
.getCheckpointConfig
.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
// Require checkpoints to complete within one minute
// environment
// .getCheckpointConfig
// .setCheckpointTimeout(60000)
environment
.getCheckpointConfig
.setCheckpointTimeout(60000)
// Ensure at least 500 ms of pause between checkpoints
environment
.getCheckpointConfig
@@ -41,12 +46,12 @@ object FlinkEnv {
.getCheckpointConfig
.setMaxConcurrentCheckpoints(1)
// Enable externalized checkpoints that are retained on cancellation
// environment
// .getCheckpointConfig
// .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
//
environment
.getCheckpointConfig
.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
environment.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
// environment.setParallelism(3)
// environment.setParallelism(3)
/**
* Get the Flink execution environment
@@ -60,6 +65,6 @@ object FlinkEnv {
*
* @return the execution environment
*/
def tEnv: StreamTableEnvironment = StreamTableEnvironment.create(environment, settings)
def tEnv: StreamTableEnvironmentImpl = StreamTableEnvironmentImpl.create(environment, settings, config)
}
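For reference, the checkpoint settings now scattered across FlinkEnv collapse into the following equivalent (Flink 1.13 API; a sketch for tests or review, not a drop-in replacement):

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

def configureCheckpoints(env: StreamExecutionEnvironment): StreamExecutionEnvironment = {
  env.enableCheckpointing(1000) // checkpoint every second
  val cc = env.getCheckpointConfig
  cc.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
  cc.setCheckpointTimeout(60000)        // abort checkpoints that run over a minute
  cc.setMinPauseBetweenCheckpoints(500) // at least 500 ms between checkpoints
  cc.setMaxConcurrentCheckpoints(1)
  cc.enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  cc.setTolerableCheckpointFailureNumber(2)
  env
}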
@@ -9,7 +9,7 @@ package model
* Time: 16:58
*/
case class TrackItem(
track_id: Long,
track_id: String,
distinct_id: String,
lib_detail: String,
lib_version: String,
@@ -30,10 +30,13 @@ case class TrackItem(
accept: String,
accept_encoding: String,
ip: String,
ip_info: String,
url: String,
referrer: String,
remark: String,
user_id: String,
created_at: String
created_time: Long,
updated_time: Long,
anonymous_id: String,
`time`: Long,
flush_time: Long
)
@@ -19,7 +19,8 @@ object KerberosAuth {
def auth(): Unit = {
System.setProperty("java.security.krb5.conf", properties.getProperty("kerberos.config.path"))
System.setProperty("javax.security.auth.useSubjectCredsOnly", properties.getProperty("kerberos.javax.auth.useSubjectCredsOnly"))
System.setProperty("sun.security.krb5.debug", "false")
System.setProperty("sun.security.krb5.debug", properties.getProperty("kerberos.debug"))
val conf: Configuration = new Configuration()
conf.set("hadoop.security.authentication", properties.getProperty("hadoop.security.auth"))
UserGroupInformation.setConfiguration(conf)
......
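The fold above hides the actual login call. Given the kerberos.hive.user and kerberos.hive.path keys shown earlier in this diff, the standard Hadoop keytab login would be (a sketch, assuming UserGroupInformation.setConfiguration has already run as above):

import org.apache.hadoop.security.UserGroupInformation

// Log in from the keytab configured in the properties file.
UserGroupInformation.loginUserFromKeytab(
  properties.getProperty("kerberos.hive.user"), // hive@QIAOMENG.COM
  properties.getProperty("kerberos.hive.path")) // /opt/software/kerberos/hive.keytab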
@@ -7,7 +7,6 @@ import model.IpItem
import java.io.File
import java.net.InetAddress
import scala.io.Source
import scala.language.postfixOps
/**
@@ -18,9 +17,7 @@ import scala.language.postfixOps
* Date: 2021-07-19
* Time: 18:31
*/
class GeoIp(ip: String) {
private[this] val database: File = new File("src/main/resources/GeoLite2-City.mmdb")
private[this] val asnDatabase = new File("src/main/resources/GeoLite2-ASN.mmdb")
class GeoIp(
    ip: String,
    database: File = new File("src/main/resources/GeoLite2-City.mmdb"),
    asnDatabase: File = new File("src/main/resources/GeoLite2-ASN.mmdb")) {
private[this] val reader: DatabaseReader = new DatabaseReader.Builder(database).build
private[this] val asnReader = new DatabaseReader.Builder(asnDatabase).build
private[this] val address: InetAddress = InetAddress.getByName(ip)
@@ -51,10 +48,9 @@ class GeoIp(ip: String) {
}
object GeoIp extends App {
def apply(ip: String): GeoIp = new GeoIp(ip)
println(apply("39.149.36.225").getCity)
val source = Source.fromFile(new File("src/main/resources/ip.txt"))
source.getLines().map(x => apply(x).getCity).foreach(println)
def apply(ip: String, city: File, asn: File): GeoIp = new GeoIp(ip, city, asn)
def apply(ip: String) = new GeoIp(ip)
}
......
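A usage sketch for the new overloads: the database files can now be injected, which makes lookups testable outside the packaged jar (the paths are illustrative; getCity is defined in the folded body of the class):

import java.io.File

val cityDb = new File("src/main/resources/GeoLite2-City.mmdb")
val asnDb  = new File("src/main/resources/GeoLite2-ASN.mmdb")
val geo    = GeoIp("39.149.36.225", cityDb, asnDb)
println(geo.getCity)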
DROP TABLE IF EXISTS qm_tmp.event_track;
CREATE TABLE IF NOT EXISTS qm_tmp.event_track (
track_id BIGINT COMMENT 'event ID',
CREATE EXTERNAL TABLE IF NOT EXISTS qm_tmp.event_track (
track_id STRING COMMENT 'event ID',
distinct_id STRING COMMENT 'device ID',
lib_detail STRING COMMENT 'library details',
lib_version STRING COMMENT 'library version',
@@ -22,14 +22,19 @@ CREATE TABLE IF NOT EXISTS qm_tmp.event_track (
accept STRING COMMENT '',
accept_encoding STRING COMMENT 'accept encoding',
ip STRING COMMENT 'IP',
ip_info STRING COMMENT 'IP info (JSON)',
url STRING COMMENT 'server URL',
referrer STRING COMMENT 'referrer page',
remark STRING COMMENT 'environment tag',
user_id STRING COMMENT 'user ID',
created_at STRING COMMENT 'creation time'
created_time STRING COMMENT 'creation time',
updated_time STRING COMMENT '',
anonymous_id STRING COMMENT '',
`time` STRING COMMENT '',
flush_time STRING COMMENT ''
) PARTITIONED BY ( dt string, hr string, mm string )
STORED AS ORC
TBLPROPERTIES (
'is_generic'='false',
'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mm:00',
'sink.partition-commit.delay' = '0s',
'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai',
......
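The folded TBLPROPERTIES presumably continue with a partition-commit policy; without one, the Flink Hive sink writes files but never publishes partitions to the metastore. The options below are standard Flink settings, though whether this project uses these exact values is an assumption:

// Sketch: attach a commit policy after table creation, reusing Entry's tEnv.
tEnv.executeSql(
  """
    |ALTER TABLE qm_tmp.event_track SET TBLPROPERTIES (
    |  'sink.partition-commit.trigger' = 'partition-time',
    |  'sink.partition-commit.policy.kind' = 'metastore,success-file'
    |)
    |""".stripMargin)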