1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package qm.flink
import qm.jackson.Jackson
import qm.model.ActionItem
import java.util
import scala.collection.mutable.ArrayBuffer
/**
 * Turns raw tracking records into flat [[ActionItem]] rows.
 *
 * A raw record is two JSON documents joined by the `#QM#MQ#` separator: a header
 * (read for `ctime` and `ip`) followed by a payload (read for `device_id` / `idfa_id`
 * and `projects`).
 *
 * Created: 2021/2/11 12:33
 */
object Analysis {

  /**
   * Separator between the header JSON and the payload JSON.
   * It is handed to `String.split`, which interprets it as a regex; it contains no metacharacters.
   */
  private val RecordSeparator = "#QM#MQ#"

  /** `ctime` placeholder emitted for records that do not contain the separator. */
  private val MalformedCtime = "0000"

  /**
   * Parses one raw record into zero or more [[ActionItem]]s, one per event-data entry.
   *
   * Fallbacks:
   *  - `x` is null or has no separator: a single placeholder item with ctime `"0000"`.
   *  - the payload has no (or an empty) `projects` value: a single item carrying only
   *    ip, device id and ctime.
   *
   * @param x raw record, `<header json>#QM#MQ#<payload json>`
   * @return the items extracted from the record (may be empty when `projects` holds no event data)
   */
  def analysis(x: String): Array[ActionItem] = {
    // A null record is handled like one without a separator instead of throwing.
    val parts: Array[String] =
      Option(x).map(_.split(RecordSeparator)).getOrElse(Array.empty[String])

    if (parts.length < 2) {
      Array(blankItem("", "", MalformedCtime))
    } else {
      // Only the first two segments are used; anything after a second separator is ignored.
      val headMap = new util.HashMap[String, Any]()
      val tailMap = new util.HashMap[String, Any]()
      Jackson.autoParseJson(parts(0), headMap)
      Jackson.autoParseJson(parts(1), tailMap)

      val ctime = epochSeconds(headMap.get("ctime"))
      val ip = stringField(headMap, "ip")
      val project = stringField(tailMap, "projects")

      // Prefer device_id; fall back to idfa_id when it is missing or empty.
      val primaryId = stringField(tailMap, "device_id")
      val deviceId = if (primaryId.isEmpty) stringField(tailMap, "idfa_id") else primaryId

      if (project.isEmpty) {
        Array(blankItem(ip, deviceId, ctime))
      } else {
        parseProjects(project).map { case (eventName, eventJson) =>
          toActionItem(ip, deviceId, ctime, eventName, eventJson)
        }
      }
    }
  }

  /**
   * Flattens the `projects` JSON array into `(event_name, event_data entry)` pairs.
   *
   * Each project is parsed for `event_name` and `event_data`; `event_data` is itself
   * parsed as an array and every entry is paired with the project's event name.
   *
   * @param projectStr JSON array of project objects
   * @return one pair per event-data entry, in input order; empty when there are no projects
   */
  def parseProjects(projectStr: String): Array[(String, String)] = {
    val projects: Array[AnyRef] = Jackson.string2Array(projectStr)
    projects.flatMap { project =>
      // A fresh map per project: with a shared map, a project lacking event_name or
      // event_data could pick up the values left behind by the previous project.
      val projectMap = new util.HashMap[String, Any]()
      Jackson.autoParseJson(project.toString, projectMap)
      val eventName = stringField(projectMap, "event_name")
      val eventData = Jackson.string2Array(stringField(projectMap, "event_data"))
      eventData.map(entry => (eventName, entry.toString))
    }
  }

  /** Builds the [[ActionItem]] for a single event-data entry (a JSON object string). */
  private def toActionItem(
      ip: String,
      deviceId: String,
      ctime: String,
      eventName: String,
      eventJson: String): ActionItem = {
    val eventMap = new util.HashMap[String, Any]()
    Jackson.autoParseJson(eventJson, eventMap)

    // `extras` is kept verbatim on the item and additionally parsed for a few nested fields.
    // NOTE(review): when `extras` is absent this parses an empty string, as the original
    // code did - confirm Jackson.autoParseJson tolerates that.
    val extras = stringField(eventMap, "extras")
    val extrasMap = new util.HashMap[String, Any]()
    Jackson.autoParseJson(extras, extrasMap)

    ActionItem(
      ip,
      deviceId,
      eventName,
      ctime,
      stringField(eventMap, "page_id"),
      stringField(eventMap, "last_page_id"),
      stringField(eventMap, "appear_time"),
      stringField(eventMap, "disappear_time"),
      extras,
      stringField(eventMap, "click_id"),
      stringField(eventMap, "click_time"),
      stringField(eventMap, "exposure_id"),
      stringField(eventMap, "exposure_time"),
      stringField(eventMap, "inter"),
      stringField(eventMap, "app_channel"),
      stringField(eventMap, "app_version"),
      stringField(extrasMap, "user_id"),
      stringField(extrasMap, "source"),
      stringField(extrasMap, "itemId"),
      stringField(extrasMap, "platform")
    )
  }

  /** Placeholder item: only ip, device id and ctime are set; the other 17 fields are empty. */
  private def blankItem(ip: String, deviceId: String, ctime: String): ActionItem =
    ActionItem(
      ip, deviceId, "", ctime,
      "", "", "", "",
      "", "", "", "",
      "", "", "", "",
      "", "", "", "")

  /**
   * Returns the string form of `map(key)`, or `""` when the key is absent or mapped to null.
   * (`getOrDefault(key, "")` alone returns null for a key explicitly mapped to null.)
   */
  private def stringField(map: util.Map[String, Any], key: String): String =
    Option(map.get(key)).fold("")(_.toString)

  /**
   * Converts the header's `ctime` to a string after integer-dividing it by 1000
   * (presumably epoch milliseconds to seconds - confirm against the producer).
   * A missing, null or unparsable value is treated as 0, matching the default for an absent key.
   */
  private def epochSeconds(raw: Any): String = {
    val value: Long = raw match {
      case null                => 0L
      case n: java.lang.Number => n.longValue
      case other               => scala.util.Try(other.toString.trim.toLong).getOrElse(0L)
    }
    (value / 1000).toString
  }
}