Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
J
jd_goods
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
bigdata
jd_goods
Commits
dac2e6a5
Commit
dac2e6a5
authored
Jun 11, 2021
by
杨林
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'yang' into 'master'
京东商品解析完成 See merge request
!1
parents
c55b0d08
b78253c1
Changes
18
Expand all
Hide whitespace changes
Inline
Side-by-side
Showing
18 changed files
with
1501 additions
and
0 deletions
+1501
-0
pom.xml
pom.xml
+277
-0
src/main/resources/app.properties
src/main/resources/app.properties
+23
-0
src/main/scala/entry/Entry.scala
src/main/scala/entry/Entry.scala
+43
-0
src/main/scala/model/CouponItem.scala
src/main/scala/model/CouponItem.scala
+17
-0
src/main/scala/model/Item.scala
src/main/scala/model/Item.scala
+52
-0
src/main/scala/tools/base64/Base64Tools.scala
src/main/scala/tools/base64/Base64Tools.scala
+19
-0
src/main/scala/tools/dataformat/JD.scala
src/main/scala/tools/dataformat/JD.scala
+94
-0
src/main/scala/tools/es/ESClient.scala
src/main/scala/tools/es/ESClient.scala
+81
-0
src/main/scala/tools/es/ESUtils.scala
src/main/scala/tools/es/ESUtils.scala
+267
-0
src/main/scala/tools/es/SaveData.scala
src/main/scala/tools/es/SaveData.scala
+28
-0
src/main/scala/tools/es/create_es.json
src/main/scala/tools/es/create_es.json
+133
-0
src/main/scala/tools/es/es.md
src/main/scala/tools/es/es.md
+143
-0
src/main/scala/tools/http/HttpTools.scala
src/main/scala/tools/http/HttpTools.scala
+82
-0
src/main/scala/tools/json/Jackson.scala
src/main/scala/tools/json/Jackson.scala
+138
-0
src/main/scala/tools/kafka/KafkaConsumer.scala
src/main/scala/tools/kafka/KafkaConsumer.scala
+37
-0
src/main/scala/tools/properties/PropertiesTools.scala
src/main/scala/tools/properties/PropertiesTools.scala
+18
-0
src/main/scala/tools/timestamp/TimestampTools.scala
src/main/scala/tools/timestamp/TimestampTools.scala
+20
-0
src/main/scala/tools/unicode/UnicodeUtils.scala
src/main/scala/tools/unicode/UnicodeUtils.scala
+29
-0
No files found.
pom.xml
0 → 100644
View file @
dac2e6a5
This diff is collapsed.
Click to expand it.
src/main/resources/app.properties
0 → 100644
View file @
dac2e6a5
# kafka 主题
kafka.topic
=
jd_item_list
#kafka.topic = jdgoods
# kafka集群
kafka.cluster
=
qmbigdata01:9092,qmbigdata02:9092,qmbigdata03:9092,qmbigdata04:9092,qmbigdata05:9092
#kafka.cluster=8.135.22.177:9092,8.135.53.75:9092,8.135.100.29:9092,8.135.58.206:9092,47.119.179.1:9092
# kafka消费者组
kafka.consumer.group
=
jd-goods-group
# kafka key 序列化
kafka.consumer.key.deserializer
=
org.apache.kafka.common.serialization.StringDeserializer
# kafka values 序列化
kafka.consumer.value.deserializer
=
org.apache.kafka.common.serialization.StringDeserializer
# kafka偏移量
kafka.consumer.offset.commit.auto.interval
=
10000
kafka.consumer.offset.commit.auto
=
true
# 算法分词接口
nlp.interface.url
=
http://bigdata.xiaomanxiong.com/nlp
# es机器
es.cluster
=
qmbigdata01:9200,qmbigdata02:9200,qmbigdata03:9200,qmbigdata04:9200,qmbigdata05:9200
# 本地测试
#es.cluster=localhost:9200
# es索引
es.indices
=
goods_v2
\ No newline at end of file
src/main/scala/entry/Entry.scala
0 → 100644
View file @
dac2e6a5
package
entry
import
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import
tools.kafka.KafkaConsumer
import
org.apache.flink.api.scala._
import
tools.dataformat.JD
import
tools.es.SaveData
import
tools.json.Jackson
import
scala.collection.mutable
/**
* Created with IntelliJ IDEA.
* Class: Entry
* Description: 程序入口
* User: lin
* Date: 2021-06-04
* Time: 13:46
*/
object
Entry
{
def
main
(
args
:
Array
[
String
])
:
Unit
=
{
// 配置环境
val
environment
=
StreamExecutionEnvironment
.
getExecutionEnvironment
environment
.
enableCheckpointing
(
1000L
)
// kafka源
val
consumer
=
KafkaConsumer
.
getConsumer
consumer
.
setStartFromEarliest
()
// 添加源
environment
.
addSource
(
consumer
)
.
map
(
x
=>
{
val
res
=
mutable
.
Map
[
String
,
String
]()
val
map
=
Jackson
.
parseAllKey
(
x
,
res
)
map
})
.
map
(
x
=>
JD
.
formatJDData
(
x
))
.
map
(
x
=>
SaveData
.
saveToES
(
x
))
// .executeAndCollect()
// .foreach(println)
environment
.
execute
(
"jd"
)
}
}
src/main/scala/model/CouponItem.scala
0 → 100644
View file @
dac2e6a5
package
model
/**
* Created with IntelliJ IDEA.
* Class: CouponItem
* Description:
* User: lin
* Date: 2021-06-04
* Time: 17:56
*/
case
class
CouponItem
(
discount
:
Double
,
link
:
String
,
quota
:
Double
,
useEndTime
:
Long
,
useStartTime
:
Long
)
src/main/scala/model/Item.scala
0 → 100644
View file @
dac2e6a5
package
model
import
com.fasterxml.jackson.annotation.JsonIgnoreProperties
/**
* @ClassName: Item
* @Description: TODO
* @Create by: LinYoung
* @Date: 2021/1/22 14:32
*/
@JsonIgnoreProperties
(
ignoreUnknown
=
true
)
case
class
Item
(
platform
:
Int
,
//商品平台
item_id
:
String
,
//商品id
item_name
:
String
,
//商品名称
item_desc
:
String
,
//商品描述
brand_name
:
String
,
//品牌名称
item_url
:
String
,
//商品链接
item_pic_url
:
String
,
//商品图片链接
item_volume
:
String
,
//商品销量
origin_category
:
String
,
//商品原始类目
origin_category_id
:
String
,
//商品原始类目id
new_category
:
String
,
//商品类目
new_category_id
:
String
,
//商品类目id
price
:
Double
,
//商品原价
coupon_price
:
Double
,
//商品券后价
commission_rate
:
Double
,
//佣金比例
coupon_commission
:
Double
,
//券后佣金
commission_start_time
:
Long
,
//计划开始时间
commission_end_time
:
Long
,
//计划结束时间
coupon_amount
:
Double
,
//优惠券额度
coupon_tips
:
String
,
//优惠券满减信息
coupon_count
:
Int
,
//优惠券数量
coupon_remain
:
Int
,
//优惠券剩余量
coupon_start_time
:
Long
,
//优惠券开始时间
coupon_end_time
:
Long
,
//优惠券结束时间
coupon_link
:
String
,
//优惠券链接
item_label
:
String
,
//商品标签
shop_id
:
String
,
//店铺id
shop_name
:
String
,
//店铺名称
shop_level
:
Double
,
//店铺等级
description_level
:
Double
,
//描述分
express_level
:
Double
,
//物流分
server_level
:
Double
,
//服务分
shop_type
:
Int
,
//店铺类型
createTime
:
Long
//创建时间
)
src/main/scala/tools/base64/Base64Tools.scala
0 → 100644
View file @
dac2e6a5
package
tools.base64
import
java.util.Base64
/**
* @ClassName: qm.tools
* @Description: base64工具
* @Author: LinYoung
* @Date: 2021/4/15
* @Time: 20:21
*/
object
Base64Tools
{
//Base64编码
private
[
this
]
val
encoder
:
Base64.Encoder
=
Base64
.
getEncoder
def
encode
(
str
:
String
)
:
String
=
{
encoder
.
encodeToString
(
str
.
getBytes
)
}
}
src/main/scala/tools/dataformat/JD.scala
0 → 100644
View file @
dac2e6a5
package
tools.dataformat
import
model.
{
CouponItem
,
Item
}
import
tools.http.HttpTools
import
tools.json.Jackson
import
scala.collection.mutable
import
scala.collection.immutable
/**
* Created with IntelliJ IDEA.
* Class: TB
* Description:
* User: lin
* Date: 2021-06-04
* Time: 14:18
*/
object
JD
{
/**
* 解析淘宝字段
*
* @param x 淘宝字段映射
* @return 返回格式化结果
*/
def
formatJDData
(
x
:
mutable.Map
[
String
,
String
])
:
Item
=
{
val
title
=
x
.
getOrElse
(
"skuName"
,
""
).
replaceAll
(
"\""
,
""
)
val
list
=
Jackson
.
parseStringToArray
(
x
.
getOrElse
(
"couponList"
,
"[]"
))
val
image
=
Jackson
.
parseStringToArray
(
x
.
getOrElse
(
"imageList"
,
"[]"
))
// 取第一张券
var
couponItem
:
CouponItem
=
null
if
(
list
.
isEmpty
)
{
couponItem
=
CouponItem
(
0.0
,
""
,
0.0
,
0L
,
0L
)
}
else
{
val
coupon
=
list
(
0
).
asInstanceOf
[
immutable.HashMap
[
String
,
Any
]]
couponItem
=
CouponItem
(
discount
=
coupon
.
getOrElse
(
"discount"
,
"0"
).
toString
.
replaceAll
(
"\""
,
""
).
toDouble
,
link
=
coupon
.
getOrElse
(
"link"
,
""
).
toString
.
replaceAll
(
"\""
,
""
),
quota
=
coupon
.
getOrElse
(
"quota"
,
"0"
).
toString
.
replaceAll
(
"\""
,
""
).
toDouble
,
useEndTime
=
coupon
.
getOrElse
(
"useEndTime"
,
"0"
).
toString
.
replaceAll
(
"\""
,
""
).
toLong
,
useStartTime
=
coupon
.
getOrElse
(
"useStartTime"
,
"0"
).
toString
.
replaceAll
(
"\""
,
""
).
toLong
)
}
var
imageUrl
:
String
=
null
if
(
image
.
isEmpty
)
{
imageUrl
=
""
}
else
{
val
map
=
image
(
0
).
asInstanceOf
[
immutable.Map
[
String
,
Any
]]
imageUrl
=
map
.
getOrElse
(
"url"
,
""
).
toString
}
val
price
=
x
.
getOrElse
(
"price"
,
""
).
replaceAll
(
"\""
,
""
).
toDouble
Item
(
platform
=
2
,
item_id
=
x
.
getOrElse
(
"skuId"
,
""
).
replaceAll
(
"\""
,
""
),
item_name
=
title
,
item_desc
=
x
.
getOrElse
(
"document"
,
""
).
replaceAll
(
"\""
,
""
),
brand_name
=
x
.
getOrElse
(
"brandName"
,
""
).
replaceAll
(
"\""
,
""
),
item_url
=
x
.
getOrElse
(
"materialUrl"
,
""
).
replaceAll
(
"\""
,
""
),
item_pic_url
=
imageUrl
,
item_volume
=
x
.
getOrElse
(
"inOrderCount30Days"
,
""
).
replaceAll
(
"\""
,
""
),
origin_category
=
x
.
getOrElse
(
"cid1Name"
,
""
).
replaceAll
(
"\""
,
""
)
+
"/"
+
x
.
getOrElse
(
"cid2Name"
,
""
).
replaceAll
(
"\""
,
""
)
+
"/"
+
x
.
getOrElse
(
"cid3Name"
,
""
).
replaceAll
(
"\""
,
""
),
origin_category_id
=
x
.
getOrElse
(
"cid1"
,
""
).
replaceAll
(
"\""
,
""
)
+
"/"
+
x
.
getOrElse
(
"cid2"
,
""
).
replaceAll
(
"\""
,
""
)
+
"/"
+
x
.
getOrElse
(
"cid3"
,
""
).
replaceAll
(
"\""
,
""
),
new_category
=
""
,
new_category_id
=
""
,
price
=
price
,
coupon_price
=
price
-
couponItem
.
discount
,
commission_rate
=
x
.
getOrElse
(
"commissionShare"
,
"0"
).
replaceAll
(
"\""
,
""
).
toDouble
/
100
,
coupon_commission
=
x
.
getOrElse
(
"couponCommission"
,
"0"
).
toDouble
,
commission_start_time
=
0L
,
commission_end_time
=
0L
,
coupon_amount
=
couponItem
.
discount
,
coupon_tips
=
"满"
+
couponItem
.
quota
+
"减"
+
couponItem
.
discount
,
coupon_count
=
9999
,
coupon_remain
=
9999
,
coupon_start_time
=
couponItem
.
useStartTime
,
coupon_end_time
=
couponItem
.
useEndTime
,
coupon_link
=
couponItem
.
link
,
item_label
=
HttpTools
(
"POST"
).
nlp_request
(
title
),
shop_id
=
x
.
getOrElse
(
"shopId"
,
""
).
replaceAll
(
"\""
,
""
),
shop_name
=
x
.
getOrElse
(
"shopName"
,
""
).
replaceAll
(
"\""
,
""
),
shop_level
=
x
.
getOrElse
(
"shopLevel"
,
"0"
).
replaceAll
(
"\""
,
""
).
toDouble
,
description_level
=
x
.
getOrElse
(
"userEvaluateScore"
,
"0"
).
replaceAll
(
"\""
,
""
).
toDouble
,
express_level
=
x
.
getOrElse
(
"logisticsLvyueScore"
,
"0"
).
replaceAll
(
"\""
,
""
).
toDouble
,
server_level
=
x
.
getOrElse
(
"afterServiceScore"
,
"0"
).
replaceAll
(
"\""
,
""
).
toDouble
,
shop_type
=
x
.
getOrElse
(
"shopLabel"
,
"0"
).
replaceAll
(
"\""
,
""
).
toInt
,
createTime
=
System
.
currentTimeMillis
()
/
1000
)
}
}
src/main/scala/tools/es/ESClient.scala
0 → 100644
View file @
dac2e6a5
package
tools.es
import
org.apache.http.HttpHost
import
org.elasticsearch.client.sniff.Sniffer
import
org.elasticsearch.client.
{
RestClient
,
RestClientBuilder
,
RestHighLevelClient
}
import
tools.properties.PropertiesTools
import
java.io.IOException
import
java.util.Properties
import
scala.collection.mutable.ArrayBuffer
/**
* @ClassName: ESClient
* @Description: 连接ES客户端
* @Create by: LinYang
* @Date: 2020/12/17 10:34
*/
object
ESClient
{
private
[
this
]
val
properties
:
Properties
=
PropertiesTools
.
getProperties
//主机和端口
private
[
this
]
val
hostsAndPorts
:
Array
[
String
]
=
properties
.
getProperty
(
"es.cluster"
).
split
(
","
)
private
[
this
]
var
highLevelClient
:
RestHighLevelClient
=
_
private
[
this
]
var
sniffer
:
Sniffer
=
_
// 构建客户端
private
[
this
]
def
esClient
:
RestClientBuilder
=
{
val
httpHosts
:
ArrayBuffer
[
HttpHost
]
=
new
ArrayBuffer
[
HttpHost
]()
if
(
0
!=
hostsAndPorts
.
length
)
{
for
(
elem
<-
hostsAndPorts
)
{
val
hostAndPort
:
Array
[
String
]
=
elem
.
split
(
":"
)
val
host
=
hostAndPort
(
0
)
val
port
=
hostAndPort
(
1
).
trim
.
toInt
val
httpHost
=
new
HttpHost
(
host
,
port
)
httpHosts
.+=(
httpHost
)
}
}
val
restClientBuilder
=
RestClient
.
builder
(
httpHosts
.
toArray
:
_
*
)
.
setFailureListener
(
new
RestClient
.
FailureListener
()
{
def
onFailure
(
node
:
Nothing
)
:
Unit
=
{
super
.
onFailure
(
node
)
}
})
// .setRequestConfigCallback((requestConfigBuilder: RequestConfig.Builder) => requestConfigBuilder.setSocketTimeout(10000))
restClientBuilder
}
/**
* 获取客户端
*
* @return 返回ES官方推荐的客户端
*/
def
getClient
:
RestHighLevelClient
=
{
highLevelClient
=
new
RestHighLevelClient
(
esClient
)
val
lowLevelClient
=
highLevelClient
.
getLowLevelClient
sniffer
=
Sniffer
.
builder
(
lowLevelClient
)
.
setSniffAfterFailureDelayMillis
(
10000
)
.
build
()
highLevelClient
}
/**
* 关闭客户端连接
*/
def
closeClient
()
:
Unit
=
{
if
(
null
!=
highLevelClient
)
{
try
{
sniffer
.
close
()
highLevelClient
.
close
()
}
catch
{
case
e
:
IOException
=>
e
.
printStackTrace
()
}
}
}
}
src/main/scala/tools/es/ESUtils.scala
0 → 100644
View file @
dac2e6a5
package
tools.es
import
org.elasticsearch.action.delete.
{
DeleteRequest
,
DeleteResponse
}
import
org.elasticsearch.action.get.
{
GetRequest
,
GetResponse
}
import
org.elasticsearch.action.index.
{
IndexRequest
,
IndexResponse
}
import
org.elasticsearch.action.search.SearchRequest
import
org.elasticsearch.action.update.
{
UpdateRequest
,
UpdateResponse
}
import
org.elasticsearch.client.indices.GetIndexRequest
import
org.elasticsearch.client.
{
RequestOptions
,
RestHighLevelClient
}
import
org.elasticsearch.common.unit.
{
Fuzziness
,
TimeValue
}
import
org.elasticsearch.common.xcontent.XContentType
import
org.elasticsearch.index.query.QueryBuilders
import
org.elasticsearch.search.builder.SearchSourceBuilder
import
org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder
import
org.elasticsearch.search.sort.SortOrder
import
org.elasticsearch.search.
{
SearchHit
,
SearchHits
}
import
tools.properties.PropertiesTools
import
java.io.IOException
import
java.util.Properties
import
java.util.concurrent.TimeUnit
/**
* @ClassName: ESUtil
* @Description: ES的工具类,实现ES的增删查改
* @Create by: LinYang
* @Date: 2020/12/17 15:27
*/
object
ESUtils
{
private
[
this
]
val
sourceBuilder
=
new
SearchSourceBuilder
private
[
this
]
var
searchRequest
:
SearchRequest
=
_
private
[
this
]
val
client
:
RestHighLevelClient
=
ESClient
.
getClient
private
[
this
]
val
properties
:
Properties
=
PropertiesTools
.
getProperties
private
[
this
]
val
indexName
:
String
=
properties
.
getProperty
(
"es.indices"
)
//增
/**
* 添加数据到ES,指定索引名和id
*
* @param id id
* @param content 要添加的内容,格式为json
*/
def
addOne
(
id
:
String
,
content
:
String
)
:
String
=
{
val
indexRequest
=
new
IndexRequest
(
indexName
)
indexRequest
.
id
(
id
)
indexRequest
.
source
(
content
,
XContentType
.
JSON
)
var
response
:
IndexResponse
=
null
try
{
response
=
client
.
index
(
indexRequest
,
RequestOptions
.
DEFAULT
)
}
catch
{
case
e
:
IOException
=>
e
.
printStackTrace
()
}
response
.
getResult
.
toString
}
//删
/**
* 删除索引
*
* @return 是否删除成功
*/
def
deleteIndex
()
:
String
=
{
val
deleteRequest
=
new
DeleteRequest
(
indexName
)
val
response
=
client
.
delete
(
deleteRequest
,
RequestOptions
.
DEFAULT
)
response
.
getResult
.
toString
}
/**
* 删除指定的数据
*
* @param id 数据id
* @return 删除结果
*/
def
deleteDoc
(
id
:
String
)
:
String
=
{
val
request
=
new
DeleteRequest
(
indexName
,
id
)
val
response
:
DeleteResponse
=
client
.
delete
(
request
,
RequestOptions
.
DEFAULT
)
response
.
getResult
.
toString
}
//改
/**
* 修改指定文件的数据
*
* @param id 指定id
* @param content 修改的内容
* @return 返回修改结果
*/
def
updateIndex
(
id
:
String
,
content
:
String
,
routing
:
String
)
:
String
=
{
val
request
=
new
UpdateRequest
(
indexName
,
id
).
doc
(
content
,
XContentType
.
JSON
).
routing
(
routing
)
var
response
:
UpdateResponse
=
null
try
{
response
=
client
.
update
(
request
,
RequestOptions
.
DEFAULT
)
response
.
getResult
.
toString
}
catch
{
case
exception
:
Exception
=>
""
}
}
//查
/**
* 查询指定的索引是否存在
*
* @return 返回是否存在
*/
def
indexExists
()
:
Boolean
=
{
var
exits
:
Boolean
=
false
try
{
val
request
:
GetIndexRequest
=
new
GetIndexRequest
(
indexName
)
exits
=
client
.
indices
.
exists
(
request
,
RequestOptions
.
DEFAULT
)
}
catch
{
case
e
:
Exception
=>
e
.
printStackTrace
()
}
exits
}
/**
* 通过id获取一个文档
*
* @param id id
* @return 搜索结果
*/
def
getById
(
id
:
String
)
:
GetResponse
=
{
val
getRequest
=
new
GetRequest
(
indexName
,
id
)
client
.
get
(
getRequest
,
RequestOptions
.
DEFAULT
)
}
/**
* 无条件查询指定索引
*
* @param size 取出多少条数据
* @return 返回长度为size的搜索结果数组
*/
def
searchAll
(
size
:
Int
)
:
Array
[
SearchHit
]
=
{
if
(!
indexExists
)
{
return
null
}
searchRequest
=
new
SearchRequest
(
indexName
)
//构建查询
val
queryBuilder
=
QueryBuilders
.
matchAllQuery
()
//查询
sourceBuilder
.
query
(
queryBuilder
)
.
timeout
(
new
TimeValue
(
60
,
TimeUnit
.
SECONDS
))
.
size
(
size
)
searchRequest
.
source
(
sourceBuilder
)
//搜索
val
response
=
client
.
search
(
searchRequest
,
RequestOptions
.
DEFAULT
)
response
.
getHits
.
getHits
}
/**
* 根据字段精准匹配 term
*
* @param fieldName 字段名
* @param text 匹配的值
* @param size 取出的条数
* @return 返回搜索结果,长度为size的结果数组
*/
def
termMatching
(
fieldName
:
String
,
text
:
AnyRef
)
:
SearchHits
=
{
if
(!
indexExists
)
{
return
null
}
searchRequest
=
new
SearchRequest
(
indexName
)
val
termQueryBuilder
=
QueryBuilders
.
termQuery
(
fieldName
,
text
)
sourceBuilder
.
query
(
termQueryBuilder
)
searchRequest
.
source
(
sourceBuilder
)
val
response
=
client
.
search
(
searchRequest
,
RequestOptions
.
DEFAULT
)
response
.
getHits
}
/**
* 根据指定索引字段模糊匹配
*
* @param fieldName 字段名字
* @param text 匹配文本
* @param size 取出的长度
* @param sortField 排序字段
* @param sortOrder 排序方式
* 升序:SortOrder.ASC,
* 降序:SortOrder.DESC
* @return 返回size长度的结果数组
*/
def
fuzzyMatching
(
fieldName
:
String
,
text
:
AnyRef
,
size
:
Int
,
sortField
:
String
,
sortOrder
:
SortOrder
)
:
Array
[
SearchHit
]
=
{
if
(!
indexExists
)
{
return
null
}
searchRequest
=
new
SearchRequest
(
indexName
)
val
matchQueryBuilder
=
QueryBuilders
.
matchQuery
(
fieldName
,
text
)
.
fuzziness
(
Fuzziness
.
AUTO
)
.
prefixLength
(
3
)
.
maxExpansions
(
10
)
sourceBuilder
.
query
(
matchQueryBuilder
)
.
timeout
(
new
TimeValue
(
60
,
TimeUnit
.
SECONDS
))
.
size
(
size
)
searchRequest
.
source
(
sourceBuilder
)
val
response
=
client
.
search
(
searchRequest
,
RequestOptions
.
DEFAULT
)
response
.
getHits
.
getHits
}
/**
* 根据指定的多字段进行模糊匹配
*
* @param size 取出的长度
* @param text 匹配的文本
* @param fieldNames 字段名
* @return 返回结果的数组
*/
def
multiFuzzyMatching
(
size
:
Int
,
text
:
Any
,
fieldNames
:
String*
)
:
SearchHits
=
{
// 多字段匹配
if
(!
indexExists
)
{
return
null
}
searchRequest
=
new
SearchRequest
(
indexName
)
val
multiMatchQueryBuilder
=
QueryBuilders
.
multiMatchQuery
(
text
,
fieldNames
:
_
*
)
.
fuzziness
(
Fuzziness
.
AUTO
)
.
maxExpansions
(
10
)
.
prefixLength
(
3
)
sourceBuilder
.
query
(
multiMatchQueryBuilder
).
size
(
size
)
searchRequest
.
source
(
sourceBuilder
)
val
response
=
client
.
search
(
searchRequest
,
RequestOptions
.
DEFAULT
)
response
.
getHits
}
//复合查询
def
boolQuery
(
source
:
String
,
name
:
String
,
price
:
Double
,
value
:
Any
,
size
:
Int
)
:
SearchHits
=
{
searchRequest
=
new
SearchRequest
(
indexName
)
val
boolQueryBuilder
=
QueryBuilders
.
boolQuery
//精准查询source字段的值,filedName写‘source’,source写‘淘宝’、‘拼多多’……
.
must
(
QueryBuilders
.
matchQuery
(
"source"
,
source
))
//
.
must
(
QueryBuilders
.
rangeQuery
(
"price"
).
gte
(
price
*
0.8
))
//匹配item_name字段的值
.
should
(
QueryBuilders
.
matchQuery
(
name
,
value
))
sourceBuilder
.
query
(
boolQueryBuilder
).
size
(
size
)
searchRequest
.
source
(
sourceBuilder
)
val
highlightBuilder
=
new
HighlightBuilder
highlightBuilder
.
field
(
"item_name"
,
10
)
.
preTags
(
"<"
)
.
postTags
(
">"
)
sourceBuilder
.
highlighter
(
highlightBuilder
)
val
response
=
client
.
search
(
searchRequest
,
RequestOptions
.
DEFAULT
)
response
.
getHits
}
}
src/main/scala/tools/es/SaveData.scala
0 → 100644
View file @
dac2e6a5
package
tools.es
import
model.Item
import
tools.base64.Base64Tools
import
tools.json.Jackson
/**
* Created with IntelliJ IDEA.
* Class: SaveData
* Description:
* User: lin
* Date: 2021-06-04
* Time: 15:04
*/
object
SaveData
{
def
saveToES
(
x
:
Item
)
:
String
=
{
// 判断ES是否重复
val
item_id
=
x
.
item_id
val
id
=
Base64Tools
.
encode
(
"jd"
+
item_id
)
val
exists
=
ESUtils
.
getById
(
id
).
isExists
if
(
exists
)
{
"商品已存在"
}
// 存入ES
ESUtils
.
addOne
(
id
,
Jackson
.
parseBeanToString
(
x
))
}
}
src/main/scala/tools/es/create_es.json
0 → 100644
View file @
dac2e6a5
{
"settings"
:
{
"number_of_shards"
:
1
,
"number_of_replicas"
:
1
,
"analysis"
:
{
"analyzer"
:
{
"label_analyzer"
:
{
"type"
:
"pattern"
,
"pattern"
:
","
},
"category_analyzer"
:
{
"type"
:
"pattern"
,
"pattern"
:
"/"
}
}
}
},
"mappings"
:
{
"properties"
:
{
"platform"
:
{
"type"
:
"integer"
},
"item_id"
:
{
"type"
:
"keyword"
},
"item_name"
:
{
"search_analyzer"
:
"ik_smart"
,
"similarity"
:
"BM25"
,
"analyzer"
:
"ik_max_word"
,
"type"
:
"text"
},
"item_desc"
:
{
"search_analyzer"
:
"ik_smart"
,
"similarity"
:
"BM25"
,
"analyzer"
:
"ik_max_word"
,
"type"
:
"text"
},
"brand_name"
:
{
"type"
:
"keyword"
},
"item_url"
:
{
"type"
:
"keyword"
},
"item_pic_url"
:
{
"type"
:
"keyword"
},
"item_volume"
:
{
"type"
:
"integer"
},
"origin_category"
:
{
"type"
:
"text"
,
"analyzer"
:
"category_analyzer"
,
"search_analyzer"
:
"category_analyzer"
,
"similarity"
:
"BM25"
},
"new_category"
:
{
"type"
:
"text"
,
"analyzer"
:
"category_analyzer"
,
"search_analyzer"
:
"category_analyzer"
,
"similarity"
:
"BM25"
},
"price"
:
{
"type"
:
"double"
},
"coupon_price"
:
{
"type"
:
"double"
},
"commission_rate"
:
{
"type"
:
"double"
},
"coupon_commission"
:
{
"type"
:
"double"
},
"commission_start_time"
:
{
"type"
:
"long"
},
"commission_end_time"
:
{
"type"
:
"long"
},
"coupon_amount"
:
{
"type"
:
"double"
},
"coupon_tips"
:
{
"type"
:
"keyword"
},
"coupon_count"
:
{
"type"
:
"integer"
},
"coupon_remain"
:
{
"type"
:
"integer"
},
"coupon_start_time"
:
{
"type"
:
"long"
},
"coupon_end_time"
:
{
"type"
:
"long"
},
"coupon_link"
:
{
"type"
:
"keyword"
},
"item_label"
:
{
"type"
:
"text"
,
"analyzer"
:
"label_analyzer"
,
"search_analyzer"
:
"label_analyzer"
,
"similarity"
:
"BM25"
},
"shop_id"
:
{
"type"
:
"keyword"
},
"shop_name"
:
{
"type"
:
"keyword"
},
"shop_level"
:
{
"type"
:
"integer"
},
"description_level"
:
{
"type"
:
"double"
},
"express_level"
:
{
"type"
:
"double"
},
"server_level"
:
{
"type"
:
"double"
},
"shop_type"
:
{
"type"
:
"integer"
},
"createTime"
:
{
"type"
:
"long"
}
}
}
}
\ No newline at end of file
src/main/scala/tools/es/es.md
0 → 100644
View file @
dac2e6a5
## 建库语句
~~~
shell
put /goods_v2
{
"settings"
:
{
"number_of_shards"
: 1,
"number_of_replicas"
: 1,
"analysis"
:
{
"analyzer"
:
{
"label_analyzer"
:
{
"type"
:
"pattern"
,
"pattern"
:
","
}
,
"category_analyzer"
:
{
"type"
:
"pattern"
,
"pattern"
:
"
\\
"
}
}
}
}
,
"mappings"
:
{
"properties"
:
{
"platform"
:
{
"type"
:
"integer"
}
,
"item_id"
:
{
"type"
:
"keyword"
}
,
"item_name"
:
{
"search_analyzer"
:
"ik_smart"
,
"similarity"
:
"BM25"
,
"analyzer"
:
"ik_max_word"
,
"type"
:
"text"
}
,
"item_desc"
:
{
"search_analyzer"
:
"ik_smart"
,
"similarity"
:
"BM25"
,
"analyzer"
:
"ik_max_word"
,
"type"
:
"text"
}
,
"brand_name"
:
{
"type"
:
"keyword"
}
,
"item_url"
:
{
"type"
:
"keyword"
}
,
"item_pic_url"
:
{
"type"
:
"keyword"
}
,
"item_volume"
:
{
"type"
:
"integer"
}
,
"origin_category"
:
{
"type"
:
"text"
,
"analyzer"
:
"category_analyzer"
,
"search_analyzer"
:
"category_analyzer"
,
"similarity"
:
"BM25"
}
,
"new_category"
:
{
"type"
:
"text"
,
"analyzer"
:
"category_analyzer"
,
"search_analyzer"
:
"category_analyzer"
,
"similarity"
:
"BM25"
}
,
"price"
:
{
"type"
:
"double"
}
,
"coupon_price"
:
{
"type"
:
"double"
}
,
"commission_rate"
:
{
"type"
:
"double"
}
,
"coupon_commission"
:
{
"type"
:
"double"
}
,
"commission_start_time"
:
{
"type"
:
"long"
}
,
"commission_end_time"
:
{
"type"
:
"long"
}
,
"coupon_amount"
:
{
"type"
:
"double"
}
,
"coupon_tips"
:
{
"type"
:
"keyword"
}
,
"coupon_count"
:
{
"type"
:
"integer"
}
,
"coupon_remain"
:
{
"type"
:
"integer"
}
,
"coupon_start_time"
:
{
"type"
:
"long"
}
,
"coupon_end_time"
:
{
"type"
:
"long"
}
,
"coupon_link"
:
{
"type"
:
"keyword"
}
,
"item_label"
:
{
"type"
:
"text"
,
"analyzer"
:
"label_analyzer"
,
"search_analyzer"
:
"label_analyzer"
,
"similarity"
:
"BM25"
}
,
"shop_id"
:
{
"type"
:
"keyword"
}
,
"shop_name"
:
{
"type"
:
"keyword"
}
,
"shop_level"
:
{
"type"
:
"integer"
}
,
"description_level"
:
{
"type"
:
"double"
}
,
"express_level"
:
{
"type"
:
"double"
}
,
"server_level"
:
{
"type"
:
"double"
}
,
"shop_type"
:
{
"type"
:
"integer"
}
,
"createTime"
:
{
"type"
:
"long"
}
}
}
}
~~~
~~~
shell
curl qmbigdata01:9200/goods?pretty
\
-X PUT
\
-H "Content-Type:application/json"
\
-d '{"settings":{"number_of_shards":1,"number_of_replicas":1,"analysis":{"analyzer":{"label_analyzer":{"type":"pattern","pattern":","},"category_analyzer":{"type":"pattern","pattern":"/"}}}},"mappings":{"properties":{"platform":{"type":"integer"},"item_id":{"type":"keyword"},"item_name":{"search_analyzer":"ik_smart","similarity":"BM25","analyzer":"ik_max_word","type":"text"},"item_desc":{"search_analyzer":"ik_smart","similarity":"BM25","analyzer":"ik_max_word","type":"text"},"brand_name":{"type":"keyword"},"item_url":{"type":"keyword"},"item_pic_url":{"type":"keyword"},"item_volume":{"type":"integer"},"origin_category":{"type":"text","analyzer":"category_analyzer","search_analyzer":"category_analyzer","similarity":"BM25"},"new_category":{"type":"text","analyzer":"category_analyzer","search_analyzer":"category_analyzer","similarity":"BM25"},"price":{"type":"double"},"coupon_price":{"type":"double"},"commission_rate":{"type":"double"},"coupon_commission":{"type":"double"},"commission_start_time":{"type":"long"},"commission_end_time":{"type":"long"},"coupon_amount":{"type":"double"},"coupon_tips":{"type":"keyword"},"coupon_count":{"type":"integer"},"coupon_remain":{"type":"integer"},"coupon_start_time":{"type":"long"},"coupon_end_time":{"type":"long"},"coupon_link":{"type":"keyword"},"item_label":{"type":"text","analyzer":"label_analyzer","search_analyzer":"label_analyzer","similarity":"BM25"},"shop_id":{"type":"keyword"},"shop_name":{"type":"keyword"},"shop_level":{"type":"integer"},"description_level":{"type":"double"},"express_level":{"type":"double"},"server_level":{"type":"double"},"shop_type":{"type":"integer"},"createTime":{"type":"long"}}}}'
~~~
\ No newline at end of file
src/main/scala/tools/http/HttpTools.scala
0 → 100644
View file @
dac2e6a5
package
tools.http
import
java.util.Properties
import
com.alibaba.fastjson.JSON
import
org.apache.http.client.methods.
{
HttpGet
,
HttpPost
,
HttpRequestBase
}
import
org.apache.http.entity.StringEntity
import
org.apache.http.impl.client.
{
CloseableHttpClient
,
HttpClients
}
import
org.apache.http.util.EntityUtils
import
tools.json.Jackson
import
tools.properties.PropertiesTools
import
tools.unicode.UnicodeUtils
import
scala.collection.mutable
/**
* @ClassName: qm.tools.http
* @Description: 请求网络工具
* @Author: LinYoung
* @Date: 2021/5/26
* @Time: 11:25
*/
object
HttpTools
{
private
[
this
]
val
properties
:
Properties
=
PropertiesTools
.
getProperties
private
[
this
]
val
url
=
properties
.
getProperty
(
"nlp.interface.url"
)
def
apply
(
method
:
String
)
:
HttpTools
=
new
HttpTools
(
method
,
this
.
url
)
def
main
(
args
:
Array
[
String
])
:
Unit
=
{
val
str
=
apply
(
"POST"
).
nlp_request
(
"男钱包男钱夹男短款票夹驾驶证钱夹手拿多用票夹横款钱包"
)
println
(
str
)
}
}
class
HttpTools
(
method
:
String
,
url
:
String
,
header
:
String
=
"""
|{"Content-Type":"application/json"}
|"""
.
stripMargin
)
{
// 创建请求客户端
private
[
this
]
val
httpClient
:
CloseableHttpClient
=
HttpClients
.
createDefault
()
// 创建基本请求
private
[
this
]
var
httpRequest
:
HttpRequestBase
=
_
// 判断请求的方式
if
(
method
==
"GET"
)
{
httpRequest
=
new
HttpGet
(
url
)
}
else
if
(
method
==
"POST"
)
{
httpRequest
=
new
HttpPost
(
url
)
}
// 定义请求头
if
(
header
!=
null
)
{
val
json
=
JSON
.
parseObject
(
header
)
json
.
keySet
().
toArray
.
map
(
_
.
toString
).
foreach
(
key
=>
httpRequest
.
setHeader
(
key
,
json
.
getString
(
key
)))
}
/**
* 请求分词工具方法
*
* @param params 要分词的标题
* @return 分词结果
*/
def
nlp_request
(
params
:
String
)
:
String
=
{
if
(
params
!=
null
)
{
val
body
:
mutable.Map
[
String
,
String
]
=
mutable
.
Map
()
body
+=
(
"text"
->
params
)
val
str
=
Jackson
.
parseBeanToString
(
body
)
httpRequest
.
asInstanceOf
[
HttpPost
].
setEntity
(
new
StringEntity
(
str
,
"utf-8"
))
}
val
response
=
httpClient
.
execute
(
httpRequest
)
val
str
=
EntityUtils
.
toString
(
response
.
getEntity
,
"utf-8"
)
val
map
=
Jackson
.
parseFirstKey
(
UnicodeUtils
.
decode
(
str
))
val
code
=
map
.
getOrElse
(
"code"
,
"0"
).
toInt
val
msg
=
map
.
getOrElse
(
"msg"
,
""
).
replaceAll
(
"\""
,
""
)
if
(
code
==
200
&&
msg
.
equals
(
"success"
))
{
map
.
getOrElse
(
"data"
,
""
).
replaceAll
(
"\""
,
""
)
}
else
""
}
}
src/main/scala/tools/json/Jackson.scala
0 → 100644
View file @
dac2e6a5
package
tools.json
import
com.fasterxml.jackson.databind.ObjectMapper
import
com.fasterxml.jackson.module.scala.DefaultScalaModule
import
scala.collection.mutable
import
scala.util.
{
Failure
,
Success
,
Try
}
/**
* @ClassName: qm.tools.json
* @Description: json转换类
* @Author: LinYoung
* @Date: 2021/5/26
* @Time: 13:24
*/
object
Jackson
{
private
val
mapper
=
new
ObjectMapper
()
mapper
.
registerModule
(
DefaultScalaModule
)
/**
* 解析一级key
*
* @param str 要解析的字符串
* @return 解析结果
*/
def
parseFirstKey
(
str
:
String
)
:
mutable.Map
[
String
,
String
]
=
{
val
map
:
mutable.Map
[
String
,
String
]
=
mutable
.
Map
()
Try
{
// 获取所有节点
val
node
=
mapper
.
readTree
(
str
)
// 获取所有key
val
keys
=
node
.
fieldNames
()
// 循环所有节点
while
(
keys
.
hasNext
)
{
val
next
=
keys
.
next
()
// 解析一级key
map
+=
(
next
->
node
.
get
(
next
).
toString
)
}
}
match
{
case
Success
(
value
)
=>
map
case
Failure
(
exception
)
=>
map
}
}
/**
* 解析所有key
* 递归操作
*
* @param str 要解析的字符串
* @param res 解析结果map
* @return 返回res
*/
def
parseAllKey
(
str
:
String
,
res
:
mutable.Map
[
String
,
String
])
:
mutable.Map
[
String
,
String
]
=
{
Try
{
// 获取所有节点
val
node
=
mapper
.
readTree
(
str
)
// 获取节点所有key
val
keys
=
node
.
fieldNames
()
// 循环读取所有key
while
(
keys
.
hasNext
)
{
val
key
=
keys
.
next
()
val
chNode
=
node
.
get
(
key
)
// 如果子节点是json对象,递归解析子节点
if
(
chNode
.
isObject
)
{
parseAllKey
(
chNode
.
toString
,
res
)
}
else
{
res
+=
(
key
->
chNode
.
toString
)
}
}
}
match
{
// 解析成功
case
Success
(
value
)
=>
res
// 解析失败
case
Failure
(
exception
)
=>
res
}
}
/**
* 对象转json
*
* @param bean 要转的对象
* @return json结果
*/
def
parseBeanToString
(
bean
:
Any
)
:
String
=
{
mapper
.
writeValueAsString
(
bean
)
}
/**
* json转数组
*
* @param str json数组
* @return scala数组
*/
def
parseStringToArray
(
str
:
String
)
:
Array
[
Any
]
=
{
var
array
=
Array
[
Any
]()
Try
{
array
=
mapper
.
readValue
(
str
,
classOf
[
Array
[
Any
]])
}
match
{
case
Success
(
value
)
=>
array
case
Failure
(
exception
)
=>
array
}
}
def
main
(
args
:
Array
[
String
])
:
Unit
=
{
val
str
=
"""
|{
| "name": "qmbigdata05",
| "cluster_name": "ES-Cluster",
| "cluster_uuid": "QdDDUfHFRdyse9OmjPsHxA",
| "version": {
| "number": "7.10.0",
| "build_flavor": "default",
| "build_type": "tar",
| "build_hash": "51e9d6f22758d0374a0f3f5c6e8f3a7997850f96",
| "build_date": "2020-11-09T21:30:33.964949Z",
| "build_snapshot": false,
| "lucene_version": "8.7.0",
| "minimum_wire_compatibility_version": "6.8.0",
| "minimum_index_compatibility_version": "6.0.0-beta1"
| },
| "tagline": "You Know, for Search",
| "list":[{
| "add":123456
| }]
|}
|"""
.
stripMargin
val
map
:
mutable.Map
[
String
,
String
]
=
mutable
.
Map
()
parseAllKey
(
str
,
map
)
println
(
parseFirstKey
(
str
))
println
(
map
)
}
}
src/main/scala/tools/kafka/KafkaConsumer.scala
0 → 100644
View file @
dac2e6a5
package
tools.kafka
import
java.util.Properties
import
org.apache.flink.api.common.serialization.SimpleStringSchema
import
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import
org.apache.kafka.clients.consumer.ConsumerConfig
import
tools.properties.PropertiesTools
/**
* @ClassName: KafkaConsumer
* @Description: TODO
* @Create by: LinYoung
* @Date: 2021/1/13 14:54
*/
object
KafkaConsumer
{
private
[
this
]
val
properties
:
Properties
=
PropertiesTools
.
getProperties
private
[
this
]
val
topic
:
String
=
properties
.
getProperty
(
"kafka.topic"
)
private
[
this
]
val
bootstrapServer
:
String
=
properties
.
getProperty
(
"kafka.cluster"
)
private
[
this
]
val
group
:
String
=
properties
.
getProperty
(
"kafka.consumer.group"
)
private
[
this
]
val
keyDeserializer
:
String
=
properties
.
getProperty
(
"kafka.consumer.key.deserializer"
)
private
[
this
]
val
valueDeserializer
:
String
=
properties
.
getProperty
(
"kafka.consumer.value.deserializer"
)
private
[
this
]
val
commitInterval
:
String
=
properties
.
getProperty
(
"kafka.consumer.offset.commit.auto.interval"
)
private
[
this
]
val
AutoCommit
:
String
=
properties
.
getProperty
(
"kafka.consumer.offset.commit.auto"
)
def
getConsumer
:
FlinkKafkaConsumer
[
String
]
=
{
val
property
=
new
Properties
()
// 对接kafka
property
.
put
(
ConsumerConfig
.
BOOTSTRAP_SERVERS_CONFIG
,
bootstrapServer
)
property
.
put
(
ConsumerConfig
.
GROUP_ID_CONFIG
,
group
)
property
.
put
(
ConsumerConfig
.
KEY_DESERIALIZER_CLASS_CONFIG
,
keyDeserializer
)
property
.
put
(
ConsumerConfig
.
VALUE_DESERIALIZER_CLASS_CONFIG
,
valueDeserializer
)
property
.
put
(
ConsumerConfig
.
AUTO_COMMIT_INTERVAL_MS_CONFIG
,
commitInterval
)
property
.
put
(
ConsumerConfig
.
ENABLE_AUTO_COMMIT_CONFIG
,
AutoCommit
)
new
FlinkKafkaConsumer
[
String
](
topic
,
new
SimpleStringSchema
(),
property
)
}
}
src/main/scala/tools/properties/PropertiesTools.scala
0 → 100644
View file @
dac2e6a5
package
tools.properties
import
java.util.Properties
/**
* @ClassName: qm.tools
* @Description: 获取属性工具
* @Author: LinYoung
* @Date: 2021/5/26
* @Time: 11:28
*/
object
PropertiesTools
{
private
[
this
]
val
properties
=
new
Properties
()
properties
.
load
(
this
.
getClass
.
getResourceAsStream
(
"/app.properties"
))
def
getProperties
:
Properties
=
properties
}
src/main/scala/tools/timestamp/TimestampTools.scala
0 → 100644
View file @
dac2e6a5
package
tools.timestamp
import
java.text.
{
ParsePosition
,
SimpleDateFormat
}
/**
* @ClassName: qm.qiaomeng.tools
* @Description: 时间戳转换工具
* @Author: LinYoung
* @Date: 2021/4/15
* @Time: 14:46
*/
object
TimestampTools
{
def
date2Timestamp
(
date
:
String
)
:
Long
=
{
if
(
date
.
isEmpty
)
{
0
}
else
{
new
SimpleDateFormat
(
"yyyy-MM-dd"
).
parse
(
date
,
new
ParsePosition
(
0
)).
getTime
}
}
}
src/main/scala/tools/unicode/UnicodeUtils.scala
0 → 100644
View file @
dac2e6a5
package
tools.unicode
import
java.util.regex.Pattern
/**
* @ClassName: qm.qiaomeng.jackson
* @Description:
* @Author:LinYoung
* @Date: 2021/3/26
* @Time: 13:09
*/
object
UnicodeUtils
{
/**
* 解码
* @param str 要解码的字符串
* @return 解码后的字符串
*/
def
decode
(
str
:
String
)
:
String
=
{
val
pattern
=
Pattern
.
compile
(
"(\\\\u(\\p{XDigit}{4}))"
)
val
matcher
=
pattern
.
matcher
(
str
)
var
unicode
=
str
while
(
matcher
.
find
())
{
val
ch
:
Char
=
Integer
.
parseInt
(
matcher
.
group
(
2
),
16
).
asInstanceOf
[
Char
]
unicode
=
unicode
.
replace
(
matcher
.
group
(
1
),
ch
+
""
)
}
unicode
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment