Commit 14f67d11 authored by 杨林

Merge branch 'lin' into 'master'

Lin

See merge request !1
parents dc799ae3 3af73789
# Configuration for consuming tracking-event (buried point) data
# Kafka topic
kafka.event.topic=queue-buried-point-v1-to-bigdata
#kafka.event.topic=event_tracker
# Kafka cluster (bootstrap servers)
kafka.cluster=qmbigdata01:9092,qmbigdata02:9092,qmbigdata03:9092,qmbigdata04:9092,qmbigdata05:9092
#kafka.cluster=8.135.22.177:9092,8.135.53.75:9092,8.135.100.29:9092,8.135.58.206:9092,47.119.179.1:9092
# Kafka consumer group
kafka.consumer.group=test-point-group
# Kafka key deserializer
kafka.consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Kafka value deserializer
kafka.consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Kafka offset auto-commit settings
kafka.consumer.offset.commit.auto.interval=10000
kafka.consumer.offset.commit.auto=true
# Hadoop authentication method
hadoop.security.auth=Kerberos
# Kerberos keytab and principal used for Hive
kerberos.hive.path=/opt/software/kerberos/hive.keytab
kerberos.hive.user=hive@QIAOMENG.COM
# Path to the Kerberos configuration file
kerberos.config.path=/etc/krb5.conf
kerberos.debug=true
kerberos.javax.auth.useSubjectCredsOnly=false
# Hive catalog name
hive.catalog=event_tracking
# Path to the Hive configuration directory
hive.config.path=/etc/alternatives/hive-conf
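For reference, a minimal sketch of how these keys are read at runtime; it mirrors the PropertiesTools and KafkaConsumer classes further down and assumes event.properties is on the classpath.
import java.util.Properties

val props = new Properties()
// Load the configuration shown above from the classpath
props.load(getClass.getResourceAsStream("/event.properties"))
val topic = props.getProperty("kafka.event.topic")      // queue-buried-point-v1-to-bigdata
val brokers = props.getProperty("kafka.cluster")
val group = props.getProperty("kafka.consumer.group")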
package entry
import flink.FlinkEnv
import model.TrackItem
import org.apache.flink.streaming.api.scala.{DataStream, createTypeInformation}
import org.apache.flink.table.api.SqlDialect
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog
import tools.auth.KerberosAuth
import tools.dataprocess.DataFormat
import tools.json.JsonUtils
import tools.kafka.KafkaConsumer
import tools.properties.PropertiesTools
import java.util.Properties
import scala.language.postfixOps
/**
* Created with IntelliJ IDEA.
* Class: Entry
* Description:
* User: lin
* Date: 2021-07-16
* Time: 9:34
*/
object Entry extends App {
// Kerberos authentication
KerberosAuth.auth()
// Create the Flink streaming environment
val env = FlinkEnv.env
// Add the Kafka source
val kafka = KafkaConsumer.getConsumer
// Consume from the earliest offset
kafka.setStartFromEarliest()
// Data processing pipeline
private val dsa: DataStream[TrackItem] = env
.addSource(kafka)
// Data cleansing: keep only the "data" field of each record
.map(elem => {
// val res = mutable.HashMap[String, String]()
val map = JsonUtils parseFirstKey elem
map.getOrElse("data", "")
})
.filter(_.nonEmpty)
.map(x =>
DataFormat.formatTrack(x)
)
private val tEnv: StreamTableEnvironment = FlinkEnv.tEnv
private val properties: Properties = PropertiesTools.getProperties
private val catalog: String = properties.getProperty("hive.catalog")
private val hiveDir: String = properties.getProperty("hive.config.path")
private val hive = new HiveCatalog(catalog, "default", hiveDir)
tEnv.registerCatalog(catalog, hive)
tEnv.useCatalog(catalog)
tEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
tEnv.useDatabase("qm_tmp")
tEnv.createTemporaryView("event", dsa)
private val sql: String =
"""
|insert into qm_tmp.event_track partition(dt,hr,mm)
|select
| track_id,
| distinct_id,
| lib_detail,
| lib_version,
| lib,
| app_version,
| lib_method,
| event_name,
| type,
| properties,
| HOST,
| user_agent,
| ua_platform,
| ua_version,
| ua_language,
| connection,
| Pragma,
| cache_control,
| accept,
| accept_encoding,
| ip,
| ip_info,
| url,
| referrer,
| remark,
| user_id,
| created_at,
| from_unixtime(unix_timestamp(created_at),'yyyy-MM-dd') as dt,
| from_unixtime(unix_timestamp(created_at),'HH') as hr,
| from_unixtime(unix_timestamp(created_at),'mm') as mm
| from event
|""".stripMargin
// executeSql submits the INSERT job; print() reports the submission result
tEnv.executeSql(sql).print()
// Note: executeSql already triggers execution, so env.execute may be redundant here
env.execute("event_track")
}
package flink
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
* Created with IntelliJ IDEA.
* Class: GetEnvironment
* Description:
* User: lin
* Date: 2021-07-16
* Time: 9:35
*/
object FlinkEnv {
// Create the streaming execution environment
private[this] val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// Table environment settings: Blink planner, streaming mode
private[this] val settings: EnvironmentSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
// private val config: TableConfig = TableConfig.getDefault
// Enable checkpointing every 1000 ms (at-least-once)
environment
.enableCheckpointing(1000)
.getCheckpointConfig
.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
// Require checkpoints to complete within one minute (currently disabled)
// environment
// .getCheckpointConfig
// .setCheckpointTimeout(60000)
// Ensure at least 500 ms pause between checkpoints
environment
.getCheckpointConfig
.setMinPauseBetweenCheckpoints(500)
// Allow only one checkpoint at a time
environment
.getCheckpointConfig
.setMaxConcurrentCheckpoints(1)
// Retain externalized checkpoints on cancellation (currently disabled)
// environment
// .getCheckpointConfig
// .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// Tolerate up to two checkpoint failures before failing the job
environment.getCheckpointConfig.setTolerableCheckpointFailureNumber(2)
// environment.setParallelism(3)
/**
* Get the Flink execution environment
*
* @return the execution environment
*/
def env: StreamExecutionEnvironment = environment
/**
* Get the Flink table environment
*
* @return the table environment
*/
def tEnv: StreamTableEnvironment = StreamTableEnvironment.create(environment, settings)
}
package model
/**
* Created with IntelliJ IDEA.
* Class: IpItem
* Description: IP information case class
* User: lin
* Date: 2021-07-21
* Time: 16:52
*/
case class IpItem(
ip: String,
countryName: String,
countryNameEN: String,
isoCode: String,
subName: String,
subNameEN: String,
subCode: String,
cityName: String,
cityNameEN: String,
organization: String
)
package model
/**
* Created with IntelliJ IDEA.
* Class: TrackItem
* Description:
* User: lin
* Date: 2021-07-22
* Time: 16:58
*/
case class TrackItem(
track_id: Long,
distinct_id: String,
lib_detail: String,
lib_version: String,
lib: String,
app_version: String,
lib_method: String,
event_name: String,
`type`: String,
properties: String,
HOST: String,
user_agent: String,
ua_platform: String,
ua_version: String,
ua_language: String,
`connection`: String,
Pragma: String,
cache_control: String,
accept: String,
accept_encoding: String,
ip: String,
ip_info: String,
url: String,
referrer: String,
remark: String,
user_id: String,
created_at: String
)
package tools.auth
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation
import tools.properties.PropertiesTools
/**
* Created with IntelliJ IDEA.
* Class: KerberosAuth
* Description: Kerberos authentication
* User: lin
* Date: 2021-07-19
* Time: 16:17
*/
object KerberosAuth {
private[this] val properties = PropertiesTools.getProperties
def auth(): Unit = {
System.setProperty("java.security.krb5.conf", properties.getProperty("kerberos.config.path"))
System.setProperty("javax.security.auth.useSubjectCredsOnly", properties.getProperty("kerberos.javax.auth.useSubjectCredsOnly"))
System.setProperty("sun.security.krb5.debug", "false")
val conf: Configuration = new Configuration()
conf.set("hadoop.security.authentication", properties.getProperty("hadoop.security.auth"))
UserGroupInformation.setConfiguration(conf)
UserGroupInformation.loginUserFromKeytab(properties.getProperty("kerberos.hive.user"), properties.getProperty("kerberos.hive.path"))
}
}
package tools.dataprocess
import model.TrackItem
import tools.geoip.GeoIp
import tools.json.JsonUtils
import tools.timestamp.DateHelper
/**
* Created with IntelliJ IDEA.
* Class: FormatTrack
* Description:
* User: lin
* Date: 2021-07-22
* Time: 17:03
*/
object DataFormat {
def formatTrack(x: String): TrackItem = {
val data = JsonUtils parseFirstKey x
val data_decode = data getOrElse("data_decode", "")
val dataMap = JsonUtils parseFirstKey data_decode
val lib = dataMap.getOrElse("lib", "")
val libMap = JsonUtils parseFirstKey lib
val ip = data.getOrElse("ip", "").replaceAll("\"", "")
TrackItem(
track_id = dataMap.getOrElse("_track_id", "0").replaceAll("\"", "").toLong, // default "0" avoids NumberFormatException when the key is missing
distinct_id = dataMap.getOrElse("distinct_id", "").replaceAll("\"", ""),
lib_detail = libMap.getOrElse("$lib_detail", "").replaceAll("\"", ""),
lib_version = libMap.getOrElse("$lib_version", "").replaceAll("\"", ""),
lib = libMap.getOrElse("$lib", "").replaceAll("\"", ""),
app_version = libMap.getOrElse("$app_version", "").replaceAll("\"", ""),
lib_method = libMap.getOrElse("$lib_method", "").replaceAll("\"", ""),
event_name = dataMap.getOrElse("event", "").replaceAll("\"", ""),
`type` = dataMap.getOrElse("type", "").replaceAll("\"", ""),
properties = dataMap.getOrElse("properties", ""),
HOST = data.getOrElse("Host", "").replaceAll("\"", ""),
user_agent = data.getOrElse("User_Agent", "").replaceAll("\"", ""),
ua_platform = data.getOrElse("ua_platform", "").replaceAll("\"", ""),
ua_version = data.getOrElse("ua_version", "").replaceAll("\"", ""),
ua_language = data.getOrElse("ua_language", "").replaceAll("\"", ""),
`connection` = data.getOrElse("Connection", "").replaceAll("\"", ""),
Pragma = data.getOrElse("Pragma", "").replaceAll("\"", ""),
cache_control = data.getOrElse("Cache_Control", "").replaceAll("\"", ""),
accept = data.getOrElse("Accept", "").replaceAll("\"", ""),
accept_encoding = data.getOrElse("Accept_Encoding", "").replaceAll("\"", ""),
ip = ip,
ip_info = JsonUtils.parseBeanToString(GeoIp(ip).getCity),
url = data.getOrElse("url", "").replaceAll("\"", ""),
referrer = data.getOrElse("referrer", "").replaceAll("\"", ""),
remark = data.getOrElse("remark", "").replaceAll("\"", ""),
user_id = data.getOrElse("user_id", "").replaceAll("\"", ""),
created_at = DateHelper.parseTimestampToDataTime(data.getOrElse("created_at", "0").replaceAll("\"", "").toLong)
)
}
}
package tools.geoip
import com.maxmind.geoip2.DatabaseReader
import com.maxmind.geoip2.model.{AsnResponse, CityResponse}
import com.maxmind.geoip2.record.{City, Country, Subdivision}
import model.IpItem
import java.io.File
import java.net.InetAddress
import scala.io.Source
import scala.language.postfixOps
/**
* Created with IntelliJ IDEA.
* Class: GeoIp
* Description: IP geolocation lookup
* User: lin
* Date: 2021-07-19
* Time: 18:31
*/
class GeoIp(ip: String) {
private[this] val database: File = new File("src/main/resources/GeoLite2-City.mmdb")
private[this] val asnDatabase = new File("src/main/resources/GeoLite2-ASN.mmdb")
private[this] val reader: DatabaseReader = new DatabaseReader.Builder(database).build
private[this] val asnReader = new DatabaseReader.Builder(asnDatabase).build
private[this] val address: InetAddress = InetAddress.getByName(ip)
private[this] val response: CityResponse = reader.city(address)
private[this] val asnResponse: AsnResponse = asnReader.asn(address)
private[this] val country: Country = response.getCountry
private[this] val subdivision: Subdivision = response.getMostSpecificSubdivision
private[this] val city: City = response.getCity
/**
* Look up geographic and ASN information for the given IP
*
* @return the resolved IpItem
*/
def getCity: IpItem =
IpItem(
ip = ip,
countryName = country.getNames.get("zh-CN"),
countryNameEN = country.getName, isoCode = country.getIsoCode,
subName = subdivision.getNames.get("zh-CN"),
subNameEN = subdivision.getName,
subCode = subdivision.getIsoCode,
cityName = city.getNames.get("zh-CN"),
cityNameEN = city.getName,
organization = asnResponse.getAutonomousSystemOrganization
)
}
object GeoIp extends App {
def apply(ip: String): GeoIp = new GeoIp(ip)
// Ad-hoc smoke test: resolve a sample IP and the IPs listed in ip.txt
println(apply("39.149.36.225").getCity)
val source = Source.fromFile(new File("src/main/resources/ip.txt"))
source.getLines().map(x => apply(x).getCity).foreach(println)
source.close()
}
package tools.gziptools
import java.io.ByteArrayOutputStream
import java.nio.charset.{Charset, StandardCharsets}
import java.util.zip.GZIPOutputStream
/**
* Created with IntelliJ IDEA.
* Class: GzipUtils
* Description: gzip compression utility
* User: lin
* Date: 2021-07-20
* Time: 13:30
*/
object GzipUtils {
/**
* Compress a string with gzip.
* Note: the gzipped bytes are decoded back into a string with the given text charset,
* which is not byte-safe; a byte-preserving encoding (e.g. Base64) is safer for round-trips.
*
* @param str the string to compress
* @param charset the charset of the input string
* @return the compressed data rendered as a string
*/
def compress(str: String, charset: Charset): String = {
if (str.isEmpty) {
return ""
}
val out = new ByteArrayOutputStream()
var gzip: GZIPOutputStream = null
try {
gzip = new GZIPOutputStream(out)
gzip.write(str.getBytes(charset))
gzip.close()
} catch {
case exception: Exception => exception.printStackTrace()
}
out.toString(charset.name())
}
/**
* Compress using UTF-8 as the default charset
*
* @param str the string to compress
* @return the compressed data rendered as a string
*/
def compress(str: String): String = {
compress(str, StandardCharsets.UTF_8)
}
}
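A hedged sketch of a decompress counterpart, which is not part of this commit (the object name is illustrative). It assumes the compressed bytes are carried in a byte-preserving encoding such as ISO-8859-1; as noted above, compress decodes the raw gzip bytes with the text charset, which may not round-trip.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.GZIPInputStream

object GzipDecompressSketch {
  // Inflate a gzip payload carried in an ISO-8859-1 string back to UTF-8 text
  def decompress(str: String): String = {
    if (str.isEmpty) return ""
    val in = new GZIPInputStream(new ByteArrayInputStream(str.getBytes(StandardCharsets.ISO_8859_1)))
    val out = new ByteArrayOutputStream()
    val buffer = new Array[Byte](1024)
    var len = in.read(buffer)
    while (len != -1) {
      out.write(buffer, 0, len)
      len = in.read(buffer)
    }
    in.close()
    out.toString(StandardCharsets.UTF_8.name())
  }
}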
package tools.json
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
/**
* Created with IntelliJ IDEA.
* Class: Jackson
* Description: JSON parsing utilities
* User: lin
* Date: 2021-07-19
* Time: 16:25
*/
object JsonUtils {
private val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
/**
* Parse the top-level keys of a JSON string
*
* @param str the string to parse
* @return a map from top-level key to the raw JSON text of its value
*/
def parseFirstKey(str: String): mutable.HashMap[String, String] = {
val map: mutable.HashMap[String, String] = mutable.HashMap()
Try {
// Parse the JSON tree
val node = mapper.readTree(str)
// Collect the top-level field names
val keys = node.fieldNames()
// Iterate over the top-level fields
while (keys.hasNext) {
val next = keys.next()
// Store the raw JSON text of each top-level value
map += (next -> node.get(next).toString)
}
} match {
case Success(_) => map
case Failure(_) => map
}
}
/**
* Parse all keys recursively,
* descending into nested JSON objects
*
* @param str the string to parse
* @param res the map that accumulates the results
* @return res
*/
def parseAllKey(str: String, res: mutable.HashMap[String, String]): mutable.HashMap[String, String] = {
Try {
// Parse the JSON tree
val node = mapper.readTree(str)
// Get the field names of this node
val keys = node.fieldNames()
// Iterate over the fields
while (keys.hasNext) {
val key = keys.next()
val chNode = node.get(key)
// If the child node is a JSON object, recurse into it
if (chNode.isObject) {
parseAllKey(chNode.toString, res)
}
else {
res += (key -> chNode.toString)
}
}
} match {
// Parsing succeeded
case Success(_) => res
// Parsing failed: return whatever has been collected so far
case Failure(_) => res
}
}
/**
* Serialize an object to JSON
*
* @param bean the object to serialize
* @return the JSON string
*/
def parseBeanToString(bean: Any): String = mapper.writeValueAsString(bean)
/**
* Parse a JSON array into a Scala array
*
* @param str the JSON array string
* @return the Scala array
*/
def parseStringToArray(str: String): Array[Any] = mapper.readValue(str, classOf[Array[Any]])
}
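A short usage sketch (the input record and value names below are hypothetical): parseFirstKey keeps each value as the raw JSON node text, so string values retain their surrounding quotes, which is why DataFormat strips them with replaceAll("\"", "").
val raw = """{"data": {"ip": "1.2.3.4"}, "created_at": 1626940680}"""
val firstLevel = JsonUtils.parseFirstKey(raw)
// firstLevel("data")       == """{"ip":"1.2.3.4"}"""   (nested object kept as compact JSON text)
// firstLevel("created_at") == "1626940680"             (number rendered as text, no quotes)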
package tools.kafka
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import tools.properties.PropertiesTools
/**
* Created with IntelliJ IDEA.
* Class: KafkaConsumer
* Description: creates the Flink Kafka consumer
* User: lin
* Date: 2021-07-19
* Time: 16:19
*/
object KafkaConsumer {
private[this] val properties: Properties = PropertiesTools.getProperties
private[this] val bootstrapServer: String = properties.getProperty("kafka.cluster")
private[this] val keyDeserializer: String = properties.getProperty("kafka.consumer.key.deserializer")
private[this] val valueDeserializer: String = properties.getProperty("kafka.consumer.value.deserializer")
private[this] val commitInterval: String = properties.getProperty("kafka.consumer.offset.commit.auto.interval")
private[this] val AutoCommit: String = properties.getProperty("kafka.consumer.offset.commit.auto")
private[this] val group: String = properties.getProperty("kafka.consumer.group")
private[this] val topic: String = properties.getProperty("kafka.event.topic")
def getConsumer: FlinkKafkaConsumer[String] = {
val property = new Properties()
// Kafka connection and consumer settings
property.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)
property.put(ConsumerConfig.GROUP_ID_CONFIG, group)
property.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer)
property.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer)
property.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, commitInterval)
property.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, AutoCommit)
new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), property)
}
}
package tools.properties
import java.util.Properties
/**
* Created with IntelliJ IDEA.
* Class: PropertiesTools
* Description: loads the application properties
* User: lin
* Date: 2021-07-19
* Time: 16:19
*/
object PropertiesTools {
private[this] val properties = new Properties()
properties.load(this.getClass.getResourceAsStream("/event.properties"))
def getProperties: Properties = properties
}
package tools.timestamp
import java.text.SimpleDateFormat
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import java.time.{OffsetDateTime, ZoneId, ZonedDateTime}
import java.util.Date
import scala.language.postfixOps
/**
* Created with IntelliJ IDEA.
* Class: DateHelper
* Description: date and time helpers
* User: lin
* Date: 2021-07-19
* Time: 16:17
*/
object DateHelper {
private[this] val DATE_TIME_FORMATTER: DateTimeFormatter = new DateTimeFormatterBuilder()
.appendOptional(DateTimeFormatter.ISO_DATE_TIME)
.appendOptional(DateTimeFormatter.ISO_OFFSET_DATE_TIME)
.appendOptional(DateTimeFormatter.ISO_INSTANT)
.appendOptional(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SX"))
.appendOptional(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ssX"))
.appendOptional(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
.toFormatter
.withZone(ZoneId.systemDefault())
/**
* Convert a date-time string into an OffsetDateTime
*
* @param str the date-time string
* @return the parsed OffsetDateTime
*/
def parseDateTimeString(str: String): OffsetDateTime = ZonedDateTime.from(DATE_TIME_FORMATTER.parse(str)).toOffsetDateTime
/**
* Convert a timestamp into a formatted date-time string
*
* @param timestamp the timestamp (seconds or milliseconds; normalised to milliseconds)
* @param pattern the date-time pattern
* @return the formatted date-time
*/
def parseTimestampToDataTime(timestamp: Long, pattern: String): String = new SimpleDateFormat(pattern).format(new Date(formatTimestamp(timestamp)))
/**
* Convert a timestamp into a date-time string using the default pattern
*
* @param timestamp the timestamp (seconds or milliseconds)
* @return the date-time in the default format "yyyy-MM-dd HH:mm:ss"
*/
def parseTimestampToDataTime(timestamp: Long): String = parseTimestampToDataTime(timestamp, "yyyy-MM-dd HH:mm:ss")
/**
* Normalise a timestamp to milliseconds: values shorter than 13 digits are treated as seconds and multiplied by 1000
*
* @param timestamp the timestamp
* @return the timestamp in milliseconds
*/
def formatTimestamp(timestamp: Long): Long = if (timestamp.toString.length < 13) timestamp * 1000 else timestamp
/**
* Convert a date-time string into a timestamp
*
* @param time the date-time string
* @param pattern the date-time pattern
* @return the timestamp in milliseconds
*/
def parseDateTimeToTimestamp(time: String, pattern: String): Long = new SimpleDateFormat(pattern).parse(time).getTime
/**
* Convert a date-time string into a timestamp using the default pattern
*
* @param time the date-time string
* @return the timestamp in milliseconds (default pattern "yyyy-MM-dd HH:mm:ss")
*/
def parseDateTimeToTimestamp(time: String): Long = parseDateTimeToTimestamp(time, "yyyy-MM-dd HH:mm:ss")
}
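A brief usage sketch: formatTimestamp treats values shorter than 13 digits as seconds and scales them to milliseconds, so second- and millisecond-precision inputs format the same way (the formatted output depends on the JVM's default time zone).
DateHelper.formatTimestamp(1626940680L)            // 1626940680000L (seconds -> milliseconds)
DateHelper.formatTimestamp(1626940680000L)         // 1626940680000L (already milliseconds)
DateHelper.parseTimestampToDataTime(1626940680L)   // e.g. "2021-07-22 15:58:00" when the default zone is Asia/Shanghai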
DROP TABLE IF EXISTS qm_tmp.event_track;
CREATE TABLE IF NOT EXISTS qm_tmp.event_track (
track_id BIGINT COMMENT 'event ID',
distinct_id STRING COMMENT 'device ID',
lib_detail STRING COMMENT 'library details',
lib_version STRING COMMENT 'library version',
lib STRING COMMENT 'library operating system',
app_version STRING COMMENT 'app version',
lib_method STRING COMMENT 'library method',
event_name STRING COMMENT 'event name',
`type` STRING COMMENT 'record type: distinguishes tracking events from other data',
properties STRING COMMENT 'tracking event properties, JSON array',
HOST STRING COMMENT 'tracking server host',
user_agent STRING COMMENT 'user agent',
ua_platform STRING COMMENT 'device platform / operating system type',
ua_version STRING COMMENT 'operating system version',
ua_language STRING COMMENT 'operating system language',
`connection` STRING COMMENT 'connection status',
Pragma STRING COMMENT 'HTTP Pragma header',
cache_control STRING COMMENT 'cache control',
accept STRING COMMENT '',
accept_encoding STRING COMMENT 'accepted encodings',
ip STRING COMMENT 'IP address',
ip_info STRING COMMENT 'IP info, JSON',
url STRING COMMENT 'server URL',
referrer STRING COMMENT 'referrer page',
remark STRING COMMENT 'environment tag',
user_id STRING COMMENT 'user ID',
created_at STRING COMMENT 'creation time'
) PARTITIONED BY ( dt string, hr string, mm string )
TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern' = '$dt $hr:$mm:00',
'sink.partition-commit.delay' = '0s',
'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai',
'sink.partition-commit.policy.kind' = 'metastore'
);
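A small sketch of the dt/hr/mm partition derivation performed by the insert statement in Entry (the sample created_at value is hypothetical): a row with created_at = '2021-07-22 16:58:09' lands in partition dt=2021-07-22/hr=16/mm=58.
import java.text.SimpleDateFormat
val createdAt = "2021-07-22 16:58:09"
val parsed = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(createdAt)
val dt = new SimpleDateFormat("yyyy-MM-dd").format(parsed) // "2021-07-22"
val hr = new SimpleDateFormat("HH").format(parsed)         // "16"
val mm = new SimpleDateFormat("mm").format(parsed)         // "58"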