Commit c052db5f authored by 杨林's avatar 杨林

Merge branch 'yang' into 'master'

淘宝商品解析完成

See merge request !1
parents d2ea242a a4f13bf7
This diff is collapsed.
# kafka 主题
kafka.topic=tb_item_list
#kafka.topic = jdgoods
# kafka集群
kafka.cluster=qmbigdata01:9092,qmbigdata02:9092,qmbigdata03:9092,qmbigdata04:9092,qmbigdata05:9092
#kafka.cluster=8.135.22.177:9092,8.135.53.75:9092,8.135.100.29:9092,8.135.58.206:9092,47.119.179.1:9092
# kafka消费者组
kafka.consumer.group=tb-goods-group
# kafka key 反序列化
kafka.consumer.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# kafka value 反序列化
kafka.consumer.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# kafka偏移量
kafka.consumer.offset.commit.auto.interval=10000
kafka.consumer.offset.commit.auto=true
# 算法分词接口
nlp.interface.url=http://bigdata.xiaomanxiong.com/nlp
# es机器
es.cluster=qmbigdata01:9200,qmbigdata02:9200,qmbigdata03:9200,qmbigdata04:9200,qmbigdata05:9200
# 本地测试
#es.cluster=localhost:9200
# es索引
es.indices=goods_v2
\ No newline at end of file
package entry
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import tools.kafka.KafkaConsumer
import org.apache.flink.api.scala._
import tools.dataformat.TB
import tools.es.SaveData
import tools.json.Jackson
import scala.collection.mutable
/**
 * Program entry point.
 *
 * Builds a Flink streaming job that reads raw strings from Kafka, flattens
 * each JSON message into a key/value map, converts it into an `Item` and
 * writes the item to Elasticsearch.
 */
object Entry {
  def main(args: Array[String]): Unit = {
    // Streaming environment with checkpointing enabled (interval argument: 1000).
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(1000L)

    // Kafka source, configured to start from the earliest offset.
    val source = KafkaConsumer.getConsumer
    source.setStartFromEarliest()

    // raw JSON -> flat map -> Item -> Elasticsearch
    env
      .addSource(source)
      .map(raw => Jackson.parseAllKey(raw, mutable.Map[String, String]()))
      .map(fields => TB.formatTBData(fields))
      .map(item => SaveData.saveToES(item))

    env.execute("tb")
  }
}
package model
import com.fasterxml.jackson.annotation.JsonIgnoreProperties
/**
 * Flat record describing one item (product) together with its pricing,
 * coupon and shop attributes.
 *
 * The `@JsonIgnoreProperties(ignoreUnknown = true)` annotation tells Jackson
 * to skip JSON properties that have no matching field when deserializing.
 */
@JsonIgnoreProperties(ignoreUnknown = true)
case class Item(
platform: Int, // item platform
item_id: String, // item id
item_name: String, // item name
item_desc: String, // item description
brand_name: String, // brand name
item_url: String, // item URL
item_pic_url: String, // item picture URL
item_volume: String, // item sales volume
origin_category: String, // original category of the item
origin_category_id: String, // original category id of the item
new_category: String, // item category
new_category_id: String, // item category id
price: Double, // original price
coupon_price: Double, // price after coupon
commission_rate: Double, // commission rate
coupon_commission: Double, // commission after coupon
commission_start_time: Long, // plan start time
commission_end_time: Long, // plan end time
coupon_amount: Double, // coupon amount
coupon_tips: String, // coupon threshold / discount information
coupon_count: Int, // number of coupons
coupon_remain: Int, // number of coupons remaining
coupon_start_time: Long, // coupon start time
coupon_end_time: Long, // coupon end time
coupon_link: String, // coupon link
item_label: String, // item labels
shop_id: String, // shop id
shop_name: String, // shop name
shop_level: Double, // shop level
description_level: Double, // description score
express_level: Double, // logistics score
server_level: Double, // service score
shop_type: Int, // shop type
createTime: Long // creation time
)
package tools.base64
import java.util.Base64
/**
 * Base64 helper.
 */
object Base64Tools {

  import java.nio.charset.StandardCharsets

  // Shared basic (RFC 4648) Base64 encoder.
  private[this] val encoder: Base64.Encoder = Base64.getEncoder

  /**
   * Encodes a string as Base64.
   *
   * The text is always converted to bytes as UTF-8, so the result does not
   * depend on the JVM's default charset (the previous `str.getBytes` did,
   * which made encoded ids of non-ASCII text differ between machines).
   *
   * @param str text to encode
   * @return Base64 representation of the UTF-8 bytes of `str`
   */
  def encode(str: String): String =
    encoder.encodeToString(str.getBytes(StandardCharsets.UTF_8))
}
package tools.dataformat
import model.Item
import tools.http.HttpTools
import tools.timestamp.TimestampTools
import scala.collection.mutable
/**
 * Formatter for Taobao item messages.
 */
object TB {

  import scala.util.Try

  /** Reads `key` from the map (empty string when absent) with every double quote removed. */
  private[this] def text(x: mutable.Map[String, String], key: String): String =
    x.getOrElse(key, "").replaceAll("\"", "")

  /** Reads `key` as a Double; absent or non-numeric values (e.g. the text `null`) become 0.0. */
  private[this] def doubleOf(x: mutable.Map[String, String], key: String): Double =
    Try(text(x, key).trim.toDouble).getOrElse(0.0)

  /** Reads `key` as an Int; absent or non-numeric values become 0. */
  private[this] def intOf(x: mutable.Map[String, String], key: String): Int =
    Try(text(x, key).trim.toInt).getOrElse(0)

  /** Reads `key` as a date and converts it to a timestamp; 0 when it cannot be converted. */
  private[this] def timestampOf(x: mutable.Map[String, String], key: String): Long =
    Try(TimestampTools.date2Timestamp(text(x, key))).getOrElse(0L)

  /**
   * Builds an [[model.Item]] from the flattened fields of a Taobao message.
   *
   * Numeric fields that are missing or malformed fall back to zero instead of
   * throwing, so a single bad record cannot fail the whole stream.
   *
   * @param x flattened field map of one Taobao message
   * @return the formatted item
   */
  def formatTBData(x: mutable.Map[String, String]): Item = {
    val price = doubleOf(x, "zk_final_price")
    val couponAmount = doubleOf(x, "coupon_amount")
    val couponPrice = price - couponAmount
    // The raw rate is divided by 10000 to obtain a fraction.
    val commissionRate = doubleOf(x, "commission_rate") / 10000.0
    val title = text(x, "title")
    val whiteImage = text(x, "white_image")

    Item(
      platform = 1,
      item_id = text(x, "item_id"),
      item_name = title,
      item_desc = text(x, "item_description"),
      brand_name = "",
      item_url = text(x, "item_url"),
      // Prefer the white-background image; fall back to the regular picture.
      item_pic_url = if (whiteImage.isEmpty) text(x, "pict_url") else whiteImage,
      item_volume = text(x, "volume"),
      origin_category = text(x, "level_one_category_name") + "/" + text(x, "category_name"),
      origin_category_id = text(x, "level_one_category_id") + "/" + text(x, "category_id"),
      new_category = "",
      new_category_id = "",
      price = price,
      coupon_price = couponPrice,
      commission_rate = commissionRate,
      // Commission earned on the after-coupon price (previously the after-coupon
      // price itself was stored here, which is not a commission).
      coupon_commission = couponPrice * commissionRate,
      commission_start_time = 0,
      commission_end_time = 0,
      coupon_amount = couponAmount,
      coupon_tips = text(x, "coupon_info"),
      coupon_count = intOf(x, "coupon_total_count"),
      coupon_remain = intOf(x, "coupon_remain_count"),
      coupon_start_time = timestampOf(x, "coupon_start_time"),
      coupon_end_time = timestampOf(x, "coupon_end_time"),
      coupon_link = text(x, "coupon_share_url"),
      // Labels come from the NLP segmentation service, queried with the title.
      item_label = HttpTools("POST").nlp_request(title),
      shop_id = text(x, "seller_id"),
      shop_name = text(x, "shop_title"),
      shop_level = 0,
      description_level = 0,
      express_level = 0,
      server_level = 0,
      shop_type = 0,
      // Creation time in seconds since the epoch.
      createTime = System.currentTimeMillis() / 1000
    )
  }
}
package tools.es
import org.apache.http.HttpHost
import org.elasticsearch.client.sniff.Sniffer
import org.elasticsearch.client.{RestClient, RestClientBuilder, RestHighLevelClient}
import tools.properties.PropertiesTools
import java.io.IOException
import java.util.Properties
import scala.collection.mutable.ArrayBuffer
/**
 * Holder of the shared Elasticsearch client.
 *
 * Hosts are read from the `es.cluster` property as a comma-separated list of
 * `host:port` entries.
 */
object ESClient {

  import org.elasticsearch.client.sniff.SniffOnFailureListener

  private[this] val properties: Properties = PropertiesTools.getProperties

  // Port used when an entry of `es.cluster` has no explicit port.
  private[this] val DefaultPort: Int = 9200

  // Non-empty, trimmed host entries.
  private[this] val hostsAndPorts: Array[String] =
    properties.getProperty("es.cluster").split(",").map(_.trim).filter(_.nonEmpty)

  private[this] var highLevelClient: RestHighLevelClient = _
  private[this] var sniffer: Sniffer = _

  /** Converts one `host[:port]` entry into an `HttpHost`. */
  private[this] def toHttpHost(entry: String): HttpHost = entry.split(":") match {
    case Array(host) => new HttpHost(host.trim, DefaultPort)
    case parts       => new HttpHost(parts(0).trim, parts(1).trim.toInt)
  }

  /**
   * Builds the low-level client builder for the configured hosts.
   *
   * @param listener failure listener to install on the client
   */
  private[this] def esClient(listener: RestClient.FailureListener): RestClientBuilder =
    RestClient
      .builder(hostsAndPorts.map(toHttpHost): _*)
      .setFailureListener(listener)

  /**
   * Returns the shared high-level client, creating it on first use.
   *
   * The client is created once and reused; earlier every call built a new
   * client and sniffer and dropped the previous ones without closing them.
   * A `SniffOnFailureListener` is attached so that the configured
   * sniff-after-failure delay actually takes effect.
   *
   * @return the high-level REST client
   */
  def getClient: RestHighLevelClient = {
    if (null == highLevelClient) {
      val failureListener = new SniffOnFailureListener()
      val created = new RestHighLevelClient(esClient(failureListener))
      val createdSniffer = Sniffer.builder(created.getLowLevelClient)
        .setSniffAfterFailureDelayMillis(10000)
        .build()
      failureListener.setSniffer(createdSniffer)
      sniffer = createdSniffer
      highLevelClient = created
    }
    highLevelClient
  }

  /**
   * Closes the sniffer and the client if they exist, then forgets them so a
   * later `getClient` builds a fresh pair. Safe to call when nothing was opened.
   */
  def closeClient(): Unit = {
    try {
      // The sniffer is closed before the client it belongs to.
      if (null != sniffer) sniffer.close()
    } finally {
      try {
        if (null != highLevelClient) highLevelClient.close()
      } catch {
        case e: IOException => e.printStackTrace()
      } finally {
        sniffer = null
        highLevelClient = null
      }
    }
  }
}
package tools.es
import org.elasticsearch.action.delete.{DeleteRequest, DeleteResponse}
import org.elasticsearch.action.get.{GetRequest, GetResponse}
import org.elasticsearch.action.index.{IndexRequest, IndexResponse}
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.action.update.{UpdateRequest, UpdateResponse}
import org.elasticsearch.client.indices.GetIndexRequest
import org.elasticsearch.client.{RequestOptions, RestHighLevelClient}
import org.elasticsearch.common.unit.{Fuzziness, TimeValue}
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder
import org.elasticsearch.search.sort.SortOrder
import org.elasticsearch.search.{SearchHit, SearchHits}
import tools.properties.PropertiesTools
import java.io.IOException
import java.util.Properties
import java.util.concurrent.TimeUnit
/**
 * Create / delete / update / query helpers for the index named by the
 * `es.indices` property.
 *
 * Every query builds its own `SearchSourceBuilder`. Previously one builder was
 * shared by all methods, so the size, timeout and highlighter set by one call
 * silently carried over into the next.
 */
object ESUtils {

  import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest

  private[this] val client: RestHighLevelClient = ESClient.getClient
  private[this] val properties: Properties = PropertiesTools.getProperties
  private[this] val indexName: String = properties.getProperty("es.indices")

  /** Runs `source` against the configured index and returns the hits. */
  private[this] def runSearch(source: SearchSourceBuilder): SearchHits = {
    val request = new SearchRequest(indexName)
    request.source(source)
    client.search(request, RequestOptions.DEFAULT).getHits
  }

  // ---------------------------------------------------------------- create

  /**
   * Indexes one JSON document under the given id.
   *
   * @param id      document id
   * @param content document body as JSON
   * @return the index result name, or an empty string when the request failed
   *         with an `IOException` (the stack trace is printed)
   */
  def addOne(id: String,
             content: String): String = {
    val indexRequest = new IndexRequest(indexName)
    indexRequest.id(id)
    indexRequest.source(content, XContentType.JSON)
    try {
      client.index(indexRequest, RequestOptions.DEFAULT).getResult.toString
    } catch {
      // Previously the null response was dereferenced after this catch.
      case e: IOException =>
        e.printStackTrace()
        ""
    }
  }

  // ---------------------------------------------------------------- delete

  /**
   * Deletes the whole index.
   *
   * Uses the indices API; the earlier document-level `DeleteRequest` without
   * an id could never succeed.
   *
   * @return `"true"` when the deletion was acknowledged, `"false"` otherwise
   */
  def deleteIndex(): String = {
    val request = new DeleteIndexRequest(indexName)
    client.indices.delete(request, RequestOptions.DEFAULT).isAcknowledged.toString
  }

  /**
   * Deletes one document.
   *
   * @param id document id
   * @return the delete result name
   */
  def deleteDoc(id: String): String = {
    val request = new DeleteRequest(indexName, id)
    val response: DeleteResponse = client.delete(request, RequestOptions.DEFAULT)
    response.getResult.toString
  }

  // ---------------------------------------------------------------- update

  /**
   * Partially updates one document.
   *
   * @param id      document id
   * @param content fields to change, as JSON
   * @param routing routing value of the document
   * @return the update result name, or an empty string when the update failed
   *         (the stack trace is printed)
   */
  def updateIndex(id: String, content: String, routing: String): String = {
    val request = new UpdateRequest(indexName, id).doc(content, XContentType.JSON).routing(routing)
    try {
      val response: UpdateResponse = client.update(request, RequestOptions.DEFAULT)
      response.getResult.toString
    } catch {
      case exception: Exception =>
        exception.printStackTrace()
        ""
    }
  }

  // ---------------------------------------------------------------- query

  /**
   * Checks whether the configured index exists.
   *
   * @return true when it exists; false when it does not or the check failed
   */
  def indexExists(): Boolean = {
    try {
      val request: GetIndexRequest = new GetIndexRequest(indexName)
      client.indices.exists(request, RequestOptions.DEFAULT)
    } catch {
      case e: Exception =>
        e.printStackTrace()
        false
    }
  }

  /**
   * Fetches one document by id.
   *
   * @param id document id
   * @return the get response
   */
  def getById(id: String): GetResponse = {
    val getRequest = new GetRequest(indexName, id)
    client.get(getRequest, RequestOptions.DEFAULT)
  }

  /**
   * Match-all query.
   *
   * @param size maximum number of hits to return
   * @return the hits, or null when the index does not exist
   */
  def searchAll(size: Int): Array[SearchHit] = {
    if (!indexExists()) null
    else {
      val source = new SearchSourceBuilder()
        .query(QueryBuilders.matchAllQuery())
        .timeout(new TimeValue(60, TimeUnit.SECONDS))
        .size(size)
      runSearch(source).getHits
    }
  }

  /**
   * Exact (term) match on one field.
   *
   * @param fieldName field name
   * @param text      value to match
   * @return the hits, or null when the index does not exist
   */
  def termMatching(fieldName: String, text: AnyRef): SearchHits = {
    if (!indexExists()) null
    else runSearch(new SearchSourceBuilder().query(QueryBuilders.termQuery(fieldName, text)))
  }

  /**
   * Fuzzy match on one field.
   *
   * @param fieldName field name
   * @param text      text to match
   * @param size      maximum number of hits to return
   * @param sortField field to sort by; ignored when null or empty
   * @param sortOrder sort direction (`SortOrder.ASC` / `SortOrder.DESC`);
   *                  ignored when null
   * @return the hits, or null when the index does not exist
   */
  def fuzzyMatching(fieldName: String, text: AnyRef, size: Int, sortField: String, sortOrder: SortOrder): Array[SearchHit] = {
    if (!indexExists()) null
    else {
      val matchQueryBuilder = QueryBuilders.matchQuery(fieldName, text)
        .fuzziness(Fuzziness.AUTO)
        .prefixLength(3)
        .maxExpansions(10)
      val source = new SearchSourceBuilder()
        .query(matchQueryBuilder)
        .timeout(new TimeValue(60, TimeUnit.SECONDS))
        .size(size)
      // The sort arguments were accepted but never applied before.
      if (sortField != null && sortField.nonEmpty && sortOrder != null) {
        source.sort(sortField, sortOrder)
      }
      runSearch(source).getHits
    }
  }

  /**
   * Fuzzy match over several fields.
   *
   * @param size       maximum number of hits to return
   * @param text       text to match
   * @param fieldNames fields to search
   * @return the hits, or null when the index does not exist
   */
  def multiFuzzyMatching(size: Int, text: Any, fieldNames: String*): SearchHits = {
    if (!indexExists()) null
    else {
      val multiMatchQueryBuilder = QueryBuilders.multiMatchQuery(text, fieldNames: _*)
        .fuzziness(Fuzziness.AUTO)
        .maxExpansions(10)
        .prefixLength(3)
      runSearch(new SearchSourceBuilder().query(multiMatchQueryBuilder).size(size))
    }
  }

  /**
   * Compound query: the `source` field must match `source`, `price` must be at
   * least 80% of the given price, and `name` should match `value`. Matches in
   * `item_name` are highlighted between `<` and `>`.
   *
   * @param source value the `source` field must match
   * @param name   field used in the optional (should) clause
   * @param price  reference price; hits must cost at least `price * 0.8`
   * @param value  value for the optional clause
   * @param size   maximum number of hits to return
   * @return the hits
   */
  def boolQuery(source: String,
                name: String,
                price: Double,
                value: Any,
                size: Int): SearchHits = {
    val boolQueryBuilder = QueryBuilders.boolQuery
      .must(QueryBuilders.matchQuery("source", source))
      .must(QueryBuilders.rangeQuery("price").gte(price * 0.8))
      .should(QueryBuilders.matchQuery(name, value))
    val highlightBuilder = new HighlightBuilder
    highlightBuilder.field("item_name", 10)
      .preTags("<")
      .postTags(">")
    val searchSource = new SearchSourceBuilder()
      .query(boolQueryBuilder)
      .size(size)
      .highlighter(highlightBuilder)
    runSearch(searchSource)
  }
}
package tools.es
import model.Item
import tools.base64.Base64Tools
import tools.json.Jackson
/**
 * Writes formatted items to Elasticsearch.
 */
object SaveData {

  /**
   * Stores an item unless a document with the same id already exists.
   *
   * The document id is the Base64 encoding of `"tb" + item_id`. Previously the
   * "already exists" branch produced a value that was discarded, so existing
   * documents were always overwritten.
   *
   * @param x item to store
   * @return `"商品已存在"` when the document already exists, otherwise the
   *         result returned by `ESUtils.addOne`
   */
  def saveToES(x: Item): String = {
    val id = Base64Tools.encode("tb" + x.item_id)
    if (ESUtils.getById(id).isExists) {
      "商品已存在"
    } else {
      ESUtils.addOne(id, Jackson.parseBeanToString(x))
    }
  }
}
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"analysis": {
"analyzer": {
"label_analyzer": {
"type": "pattern",
"pattern": ","
},
"category_analyzer": {
"type": "pattern",
"pattern": "/"
}
}
}
},
"mappings": {
"properties": {
"platform": {
"type": "integer"
},
"item_id": {
"type": "keyword"
},
"item_name": {
"search_analyzer": "ik_smart",
"similarity": "BM25",
"analyzer": "ik_max_word",
"type": "text"
},
"item_desc": {
"search_analyzer": "ik_smart",
"similarity": "BM25",
"analyzer": "ik_max_word",
"type": "text"
},
"brand_name": {
"type": "keyword"
},
"item_url": {
"type": "keyword"
},
"item_pic_url": {
"type": "keyword"
},
"item_volume": {
"type": "integer"
},
"origin_category": {
"type": "text",
"analyzer": "category_analyzer",
"search_analyzer": "category_analyzer",
"similarity": "BM25"
},
"new_category": {
"type": "text",
"analyzer": "category_analyzer",
"search_analyzer": "category_analyzer",
"similarity": "BM25"
},
"price": {
"type": "double"
},
"coupon_price": {
"type": "double"
},
"commission_rate": {
"type": "double"
},
"coupon_commission": {
"type": "double"
},
"commission_start_time": {
"type": "long"
},
"commission_end_time": {
"type": "long"
},
"coupon_amount": {
"type": "double"
},
"coupon_tips": {
"type": "keyword"
},
"coupon_count": {
"type": "integer"
},
"coupon_remain": {
"type": "integer"
},
"coupon_start_time": {
"type": "long"
},
"coupon_end_time": {
"type": "long"
},
"coupon_link": {
"type": "keyword"
},
"item_label": {
"type": "text",
"analyzer": "label_analyzer",
"search_analyzer": "label_analyzer",
"similarity": "BM25"
},
"shop_id": {
"type": "keyword"
},
"shop_name": {
"type": "keyword"
},
"shop_level": {
"type": "integer"
},
"description_level": {
"type": "double"
},
"express_level": {
"type": "double"
},
"server_level": {
"type": "double"
},
"shop_type": {
"type": "integer"
},
"createTime": {
"type": "long"
}
}
}
}
\ No newline at end of file
## 建库语句
~~~shell
put /goods_v2
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1,
"analysis": {
"analyzer": {
"label_analyzer": {
"type": "pattern",
"pattern": ","
},
"category_analyzer": {
"type": "pattern",
"pattern": "/"
}
}
}
},
"mappings": {
"properties": {
"platform": {
"type": "integer"
},
"item_id": {
"type": "keyword"
},
"item_name": {
"search_analyzer": "ik_smart",
"similarity": "BM25",
"analyzer": "ik_max_word",
"type": "text"
},
"item_desc": {
"search_analyzer": "ik_smart",
"similarity": "BM25",
"analyzer": "ik_max_word",
"type": "text"
},
"brand_name": {
"type": "keyword"
},
"item_url": {
"type": "keyword"
},
"item_pic_url": {
"type": "keyword"
},
"item_volume": {
"type": "integer"
},
"origin_category": {
"type": "text",
"analyzer": "category_analyzer",
"search_analyzer": "category_analyzer",
"similarity": "BM25"
},
"new_category": {
"type": "text",
"analyzer": "category_analyzer",
"search_analyzer": "category_analyzer",
"similarity": "BM25"
},
"price": {
"type": "double"
},
"coupon_price": {
"type": "double"
},
"commission_rate": {
"type": "double"
},
"coupon_commission": {
"type": "double"
},
"commission_start_time": {
"type": "long"
},
"commission_end_time": {
"type": "long"
},
"coupon_amount": {
"type": "double"
},
"coupon_tips": {
"type": "keyword"
},
"coupon_count": {
"type": "integer"
},
"coupon_remain": {
"type": "integer"
},
"coupon_start_time": {
"type": "long"
},
"coupon_end_time": {
"type": "long"
},
"coupon_link": {
"type": "keyword"
},
"item_label": {
"type": "text",
"analyzer": "label_analyzer",
"search_analyzer": "label_analyzer",
"similarity": "BM25"
},
"shop_id": {
"type": "keyword"
},
"shop_name": {
"type": "keyword"
},
"shop_level": {
"type": "integer"
},
"description_level": {
"type": "double"
},
"express_level": {
"type": "double"
},
"server_level": {
"type": "double"
},
"shop_type": {
"type": "integer"
},
"createTime": {
"type": "long"
}
}
}
}
~~~
~~~shell
curl 'qmbigdata01:9200/goods_v2?pretty' \
-X PUT \
-H "Content-Type:application/json" \
-d '{"settings":{"number_of_shards":1,"number_of_replicas":1,"analysis":{"analyzer":{"label_analyzer":{"type":"pattern","pattern":","},"category_analyzer":{"type":"pattern","pattern":"/"}}}},"mappings":{"properties":{"platform":{"type":"integer"},"item_id":{"type":"keyword"},"item_name":{"search_analyzer":"ik_smart","similarity":"BM25","analyzer":"ik_max_word","type":"text"},"item_desc":{"search_analyzer":"ik_smart","similarity":"BM25","analyzer":"ik_max_word","type":"text"},"brand_name":{"type":"keyword"},"item_url":{"type":"keyword"},"item_pic_url":{"type":"keyword"},"item_volume":{"type":"integer"},"origin_category":{"type":"text","analyzer":"category_analyzer","search_analyzer":"category_analyzer","similarity":"BM25"},"new_category":{"type":"text","analyzer":"category_analyzer","search_analyzer":"category_analyzer","similarity":"BM25"},"price":{"type":"double"},"coupon_price":{"type":"double"},"commission_rate":{"type":"double"},"coupon_commission":{"type":"double"},"commission_start_time":{"type":"long"},"commission_end_time":{"type":"long"},"coupon_amount":{"type":"double"},"coupon_tips":{"type":"keyword"},"coupon_count":{"type":"integer"},"coupon_remain":{"type":"integer"},"coupon_start_time":{"type":"long"},"coupon_end_time":{"type":"long"},"coupon_link":{"type":"keyword"},"item_label":{"type":"text","analyzer":"label_analyzer","search_analyzer":"label_analyzer","similarity":"BM25"},"shop_id":{"type":"keyword"},"shop_name":{"type":"keyword"},"shop_level":{"type":"integer"},"description_level":{"type":"double"},"express_level":{"type":"double"},"server_level":{"type":"double"},"shop_type":{"type":"integer"},"createTime":{"type":"long"}}}}'
~~~
\ No newline at end of file
package tools.http
import java.util.Properties
import com.alibaba.fastjson.JSON
import org.apache.http.client.methods.{HttpGet, HttpPost, HttpRequestBase}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
import org.apache.http.util.EntityUtils
import tools.json.Jackson
import tools.properties.PropertiesTools
import tools.unicode.UnicodeUtils
import scala.collection.mutable
/**
 * Factory for [[HttpTools]] instances that target the NLP segmentation
 * endpoint configured under `nlp.interface.url`.
 */
object HttpTools {
  // Endpoint read once from the application properties.
  private[this] val url = PropertiesTools.getProperties.getProperty("nlp.interface.url")

  /**
   * Creates a requester for the configured endpoint.
   *
   * @param method HTTP method name
   * @return a new requester
   */
  def apply(method: String): HttpTools = new HttpTools(method, url)

  /** Manual check: sends one sample title and prints the reply. */
  def main(args: Array[String]): Unit = {
    val requester = HttpTools("POST")
    println(requester.nlp_request("男钱包男钱夹男短款票夹驾驶证钱夹手拿多用票夹横款钱包"))
  }
}
class HttpTools(method: String, url: String, header: String =
"""
|{"Content-Type":"application/json"}
|""".stripMargin) {

  import scala.util.Try

  // Request object for the chosen method. An unsupported method is rejected
  // here; before, it left the request null and failed later with a
  // NullPointerException.
  private[this] val httpRequest: HttpRequestBase = method match {
    case "GET"  => new HttpGet(url)
    case "POST" => new HttpPost(url)
    case other =>
      throw new IllegalArgumentException(s"Unsupported HTTP method: $other (expected GET or POST)")
  }

  // HTTP client used for every call made through this instance.
  private[this] val httpClient: CloseableHttpClient = HttpClients.createDefault()

  // Request headers, given as a JSON object of name -> value.
  if (header != null) {
    Option(JSON.parseObject(header)).foreach { json =>
      json.keySet().toArray.map(_.toString).foreach(key => httpRequest.setHeader(key, json.getString(key)))
    }
  }

  /**
   * Calls the segmentation endpoint.
   *
   * For POST requests a non-null `params` is sent as the JSON body
   * `{"text": params}`; for GET requests no body is attached. The response is
   * always closed, also when reading it fails.
   *
   * @param params title to segment
   * @return the `data` field of the reply (double quotes removed) when the
   *         reply has code 200 and message `success`; otherwise an empty string
   */
  def nlp_request(params: String): String = {
    if (params != null) {
      httpRequest match {
        case post: HttpPost =>
          val body: mutable.Map[String, String] = mutable.Map("text" -> params)
          post.setEntity(new StringEntity(Jackson.parseBeanToString(body), "utf-8"))
        case _ => // a GET request carries no body
      }
    }
    val response = httpClient.execute(httpRequest)
    try {
      // A reply without an entity is treated as an empty body.
      val raw = Option(response.getEntity).map(EntityUtils.toString(_, "utf-8")).getOrElse("")
      val map = Jackson.parseFirstKey(UnicodeUtils.decode(raw))
      // A missing, quoted or non-numeric code counts as 0 instead of throwing.
      val code = Try(map.getOrElse("code", "0").replaceAll("\"", "").trim.toInt).getOrElse(0)
      val msg = map.getOrElse("msg", "").replaceAll("\"", "")
      if (code == 200 && msg.equals("success")) {
        map.getOrElse("data", "").replaceAll("\"", "")
      }
      else ""
    } finally {
      response.close()
    }
  }
}
package tools.json
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
/**
 * JSON helpers built on a single Jackson `ObjectMapper` with the Scala module
 * registered.
 */
object Jackson {
  private val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)

  /**
   * Collects the first-level fields of a JSON document.
   *
   * Each value is stored as the JSON text of its node. When parsing fails,
   * whatever was collected so far is returned.
   *
   * @param str JSON text to parse
   * @return first-level field name -> JSON text of the value
   */
  def parseFirstKey(str: String): mutable.Map[String, String] = {
    val collected: mutable.Map[String, String] = mutable.Map()
    Try {
      val root = mapper.readTree(str)
      val names = root.fieldNames()
      while (names.hasNext) {
        val name = names.next()
        collected(name) = root.get(name).toString
      }
      collected
    }.getOrElse(collected)
  }

  /**
   * Collects every field of a JSON document, descending recursively into
   * nested objects.
   *
   * Non-object values are stored as the JSON text of their node under their
   * own field name. When parsing fails, `res` is returned as filled so far.
   *
   * @param str JSON text to parse
   * @param res map that receives the fields
   * @return `res`
   */
  def parseAllKey(str: String, res: mutable.Map[String, String]): mutable.Map[String, String] = {
    Try {
      val root = mapper.readTree(str)
      val names = root.fieldNames()
      while (names.hasNext) {
        val name = names.next()
        val child = root.get(name)
        if (!child.isObject) {
          res(name) = child.toString
        } else {
          // Nested object: flatten its fields into the same map.
          parseAllKey(child.toString, res)
        }
      }
      res
    }.getOrElse(res)
  }

  /**
   * Serializes a value to JSON.
   *
   * @param bean value to serialize
   * @return JSON text
   */
  def parseBeanToString(bean: Any): String = mapper.writeValueAsString(bean)

  /**
   * Reads a JSON array into a Scala array.
   *
   * @param str JSON array text
   * @return the elements
   */
  def parseStringToArray(str: String): Array[Any] = mapper.readValue(str, classOf[Array[Any]])

  /** Manual check: prints the first-level and the flattened view of a sample document. */
  def main(args: Array[String]): Unit = {
    val sample =
"""
|{
| "name": "qmbigdata05",
| "cluster_name": "ES-Cluster",
| "cluster_uuid": "QdDDUfHFRdyse9OmjPsHxA",
| "version": {
| "number": "7.10.0",
| "build_flavor": "default",
| "build_type": "tar",
| "build_hash": "51e9d6f22758d0374a0f3f5c6e8f3a7997850f96",
| "build_date": "2020-11-09T21:30:33.964949Z",
| "build_snapshot": false,
| "lucene_version": "8.7.0",
| "minimum_wire_compatibility_version": "6.8.0",
| "minimum_index_compatibility_version": "6.0.0-beta1"
| },
| "tagline": "You Know, for Search",
| "list":[{
| "add":123456
| }]
|}
|""".stripMargin
    val flattened: mutable.Map[String, String] = mutable.Map()
    parseAllKey(sample, flattened)
    println(parseFirstKey(sample))
    println(flattened)
  }
}
package tools.kafka
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import tools.properties.PropertiesTools
/**
 * Factory for the Flink Kafka source, configured from the application
 * properties.
 */
object KafkaConsumer {
  private[this] val settings: Properties = PropertiesTools.getProperties

  // Values read once from the application properties.
  private[this] val topicName: String = settings.getProperty("kafka.topic")
  private[this] val brokers: String = settings.getProperty("kafka.cluster")
  private[this] val groupId: String = settings.getProperty("kafka.consumer.group")
  private[this] val keyDeserializerClass: String = settings.getProperty("kafka.consumer.key.deserializer")
  private[this] val valueDeserializerClass: String = settings.getProperty("kafka.consumer.value.deserializer")
  private[this] val autoCommitInterval: String = settings.getProperty("kafka.consumer.offset.commit.auto.interval")
  private[this] val autoCommitEnabled: String = settings.getProperty("kafka.consumer.offset.commit.auto")

  /**
   * Creates a consumer of string records for the configured topic.
   *
   * @return a new Flink Kafka consumer
   */
  def getConsumer: FlinkKafkaConsumer[String] = {
    val kafkaProps = new Properties()
    Seq(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> keyDeserializerClass,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> valueDeserializerClass,
      ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG -> autoCommitInterval,
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> autoCommitEnabled
    ).foreach { case (name, value) => kafkaProps.setProperty(name, value) }
    new FlinkKafkaConsumer[String](topicName, new SimpleStringSchema(), kafkaProps)
  }
}
package tools.properties
import java.util.Properties
/**
 * Loads the application properties from the classpath.
 */
object PropertiesTools {
  // Classpath location of the configuration file.
  private[this] val ResourcePath: String = "/app.properties"

  // Loaded once when the object is initialised.
  private[this] val properties: Properties = {
    val loaded = new Properties()
    val stream = this.getClass.getResourceAsStream(ResourcePath)
    // A missing resource used to surface as a bare NullPointerException.
    if (stream == null) {
      throw new IllegalStateException(s"Configuration resource $ResourcePath was not found on the classpath")
    }
    // Close the stream whether or not loading succeeds (it was left open before).
    try loaded.load(stream) finally stream.close()
    loaded
  }

  /**
   * @return the loaded application properties
   */
  def getProperties: Properties = properties
}
package tools.timestamp
import java.text.{ParsePosition, SimpleDateFormat}
/**
 * Date-to-timestamp conversion helper.
 */
object TimestampTools {

  /**
   * Converts a date that starts with the `yyyy-MM-dd` pattern into epoch
   * milliseconds (the value of `java.util.Date#getTime`).
   *
   * Null, blank and unparseable input (for example the text `null`) yields 0.
   * Previously unparseable input caused a NullPointerException because the
   * parser returns null in that case.
   *
   * @param date date text
   * @return epoch milliseconds of the parsed date, or 0 when there is no valid date
   */
  def date2Timestamp(date: String): Long = {
    Option(date)
      .map(_.trim)
      .filter(_.nonEmpty)
      // parse(String, ParsePosition) returns null instead of throwing on failure.
      .flatMap(text => Option(new SimpleDateFormat("yyyy-MM-dd").parse(text, new ParsePosition(0))))
      .map(_.getTime)
      .getOrElse(0L)
  }
}
package tools.unicode
import java.util.regex.Pattern
/**
 * Decoder for `\\uXXXX` escape sequences embedded in text.
 */
object UnicodeUtils {
  // A backslash, the letter u and four hex digits; group 2 holds the digits.
  private[this] val escapePattern: Pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))")

  /**
   * Replaces every escape sequence of the form backslash-u-four-hex-digits
   * with the character it denotes.
   *
   * The text is rewritten in a single left-to-right pass, so each escape is
   * decoded exactly once. The earlier version ran a global replace on the
   * partly decoded text for every match, which rescanned the text repeatedly
   * and could decode a sequence that only came into existence through an
   * earlier replacement.
   *
   * @param str text to decode
   * @return decoded text
   */
  def decode(str: String): String = {
    val matcher = escapePattern.matcher(str)
    val decoded = new StringBuffer(str.length)
    while (matcher.find()) {
      val ch: Char = Integer.parseInt(matcher.group(2), 16).toChar
      // quoteReplacement keeps a decoded '$' or '\' from being read as a group reference.
      matcher.appendReplacement(decoded, java.util.regex.Matcher.quoteReplacement(ch.toString))
    }
    matcher.appendTail(decoded)
    decoded.toString
  }
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment