Commit 8359e6c9 authored by 杨林's avatar 杨林

Merge branch 'yang' into 'master'

Yang

See merge request !1
parents f840e443 1be54eee
# goodsinfo
# goodsinfo
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>groupId</groupId>
<artifactId>goodsinfo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.2</version>
</dependency>
<dependency>
<groupId>org.scalaj</groupId>
<artifactId>scalaj-http_2.11</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.6.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>7.6.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-high-level-client -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.6.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-client-sniffer -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client-sniffer</artifactId>
<version>7.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.24</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.12.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.12.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.12.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
<exclusions>
<exclusion>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.lettuce/lettuce-core -->
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>6.0.1.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scalatest/scalatest -->
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.11</artifactId>
<version>3.3.0-SNAP3</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.12.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.74</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.12.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.12.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.12.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-scala -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.11</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.12.2</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.xml</include>
<include>**/*.dic</include>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.6</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 主类 -->
<mainClass>qm.entry.Entry</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
# kafka主题
kafka.topic = goods
#kafka.topic = jdgoods
# kafka集群
kafka.cluster = qmbigdata01:9092,qmbigdata02:9092,qmbigdata03:9092
# kafka消费者组
kafka.consumer.group = goods-group
# kafka消费者key反序列化
kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
# kafka消费者value反序列化
kafka.consumer.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
# kafka消费偏移量
kafka.consumer.offset.commit.auto.interval = 10000
kafka.consumer.offset.commit.auto = true
# 分词接口
nlp.interface.url = http://bigdata.xiaomanxiong.com/nlp
# 分词验证
nlp.interface.auth = QapsTeh8kHYdXTOfeLx05BfmIiQ=
# es集群
es.cluster = qmbigdata01:9200,qmbigdata02:9200,qmbigdata03:9200,qmbigdata04:9200,qmbigdata05:9200
# es索引
es.indices = goods
\ No newline at end of file
package qm.entry
/**
* @ClassName: qm.entry
* @Description: 程序处理入口
* @Author: LinYoung
* @Date: 2021/5/26
* @Time: 11:20
*/
object Entry {
def main(args: Array[String]): Unit = {
FlinkEntry.entry()
}
}
package qm.entry
/**
* @ClassName: qm.entry
* @Description:
* @Author:LinYoung
* @Date: 2021/5/26
* @Time: 11:21
*/
object FlinkEntry {
def entry(): Unit = {
}
}
package qm.tools.base64
import java.util.Base64
/**
* @ClassName: qm.tools
* @Description: base64工具
* @Author: LinYoung
* @Date: 2021/4/15
* @Time: 20:21
*/
object Base64Tools {
//Base64编码
private[this] val encoder: Base64.Encoder = Base64.getEncoder
def encode(str: String): String = {
encoder.encodeToString(str.getBytes)
}
}
package qm.tools.es
/**
* @ClassName: qm.tools.es
* @Description:
* @Author:LinYoung
* @Date: 2021/5/26
* @Time: 11:26
*/
class ESTools {
}
package qm.tools.http
import java.util.Properties
import com.alibaba.fastjson.JSON
import org.apache.http.client.methods.{HttpGet, HttpPost, HttpRequestBase}
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.{CloseableHttpClient, HttpClients}
import org.apache.http.util.EntityUtils
import qm.tools.json.Jackson
import qm.tools.properties.PropertiesTools
import qm.tools.unicode.UnicodeUtils
import scala.collection.mutable
/**
* @ClassName: qm.tools.http
* @Description:
* @Author:LinYoung
* @Date: 2021/5/26
* @Time: 11:25
*/
object HttpTools {
private[this] val properties: Properties = PropertiesTools.getProperties
def apply(method: String, url: String, header: String): HttpTools = new HttpTools(method, url, header)
}
class HttpTools(method: String, url: String, header: String =
"""
|{"Content-Type":"application/json"}
|""".stripMargin) {
private[this] val httpClient: CloseableHttpClient = HttpClients.createDefault()
var httpRequest: HttpRequestBase = _
if (method == "GET") {
httpRequest = new HttpGet(url)
}
else if (method == "POST") {
httpRequest = new HttpPost(url)
}
if (header != null) {
val json = JSON.parseObject(header)
json.keySet().toArray.map(_.toString).foreach(key => httpRequest.setHeader(key, json.getString(key)))
}
def nlp_request(params: String): String = {
if (params != null) {
val body: mutable.Map[String, String] = mutable.Map()
body += ("text" -> params)
val str = Jackson.parseBeanToString(body)
httpRequest.asInstanceOf[HttpPost].setEntity(new StringEntity(str, "utf-8"))
}
val response = httpClient.execute(httpRequest)
val str = EntityUtils.toString(response.getEntity, "utf-8")
val map = Jackson.parseFirstKey(UnicodeUtils.decode(str))
val code = map.getOrElse("code", "0").toInt
val msg = map.getOrElse("msg", "")
if (code == 200 && msg.equals("success")) {
map.getOrElse("data", "")
}
else ""
}
}
package qm.tools.json
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
/**
* @ClassName: qm.tools.json
* @Description: json转换类
* @Author: LinYoung
* @Date: 2021/5/26
* @Time: 13:24
*/
object Jackson {
private val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
/**
* 解析一级key
*
* @param str 要解析的字符串
* @return 解析结果
*/
def parseFirstKey(str: String): mutable.Map[String, String] = {
val map: mutable.Map[String, String] = mutable.Map()
Try {
// 获取所有节点
val node = mapper.readTree(str)
// 获取所有key
val keys = node.fieldNames()
// 循环所有节点
while (keys.hasNext) {
val next = keys.next()
// 解析一级key
map += (next -> node.get(next).toString)
}
} match {
case Success(value) => map
case Failure(exception) => map
}
}
/**
* 解析所有key
* 递归操作
*
* @param str 要解析的字符串
* @param res 解析结果map
* @return 返回res
*/
def parseAllKey(str: String, res: mutable.Map[String, String]): mutable.Map[String, String] = {
Try {
// 获取所有节点
val node = mapper.readTree(str)
// 获取节点所有key
val keys = node.fieldNames()
// 循环读取所有key
while (keys.hasNext) {
val key = keys.next()
val chNode = node.get(key)
// 如果子节点是json对象,递归解析子节点
if (chNode.isObject) {
parseAllKey(chNode.toString, res)
}
else {
res += (key -> chNode.toString)
}
}
} match {
// 解析成功
case Success(value) => res
// 解析失败
case Failure(exception) => res
}
}
/**
* 对象转json
*
* @param bean 要转的对象
* @return json结果
*/
def parseBeanToString(bean: Any): String = {
mapper.writeValueAsString(bean)
}
def main(args: Array[String]): Unit = {
val str =
"""
|{
| "name": "qmbigdata05",
| "cluster_name": "ES-Cluster",
| "cluster_uuid": "QdDDUfHFRdyse9OmjPsHxA",
| "version": {
| "number": "7.10.0",
| "build_flavor": "default",
| "build_type": "tar",
| "build_hash": "51e9d6f22758d0374a0f3f5c6e8f3a7997850f96",
| "build_date": "2020-11-09T21:30:33.964949Z",
| "build_snapshot": false,
| "lucene_version": "8.7.0",
| "minimum_wire_compatibility_version": "6.8.0",
| "minimum_index_compatibility_version": "6.0.0-beta1"
| },
| "tagline": "You Know, for Search",
| "list":[{
| "add":123456
| }]
|}
|""".stripMargin
val map: mutable.Map[String, String] = mutable.Map()
parseAllKey(str, map)
println(parseFirstKey(str))
println(map)
}
}
package qm.tools.properties
import java.util.Properties
/**
* @ClassName: qm.tools
* @Description: 获取属性工具
* @Author: LinYoung
* @Date: 2021/5/26
* @Time: 11:28
*/
object PropertiesTools {
private[this] val properties = new Properties()
properties.load(this.getClass.getResourceAsStream("/app.properties"))
def getProperties: Properties = properties
}
package qm.tools.unicode
import java.util.regex.Pattern
/**
* @ClassName: qm.qiaomeng.jackson
* @Description:
* @Author:LinYoung
* @Date: 2021/3/26
* @Time: 13:09
*/
object UnicodeUtils {
/**
* 解码
* @param str 要解码的字符串
* @return 解码后的字符串
*/
def decode(str: String): String = {
val pattern = Pattern.compile("(\\\\u(\\p{XDigit}{4}))")
val matcher = pattern.matcher(str)
var unicode = str
while (matcher.find()) {
val ch: Char = Integer.parseInt(matcher.group(2), 16).asInstanceOf[Char]
unicode = unicode.replace(matcher.group(1), ch + "")
}
unicode
}
}
# kafka主题
kafka.topic = goods
#kafka.topic = jdgoods
# kafka集群
kafka.cluster = qmbigdata01:9092,qmbigdata02:9092,qmbigdata03:9092
# kafka消费者组
kafka.consumer.group = goods-group
# kafka消费者key反序列化
kafka.consumer.key.deserializer = org.apache.kafka.common.serialization.StringDeserializer
# kafka消费者value反序列化
kafka.consumer.value.deserializer = org.apache.kafka.common.serialization.StringDeserializer
# kafka消费偏移量
kafka.consumer.offset.commit.auto.interval = 10000
kafka.consumer.offset.commit.auto = true
# 分词接口
nlp.interface.url = http://bigdata.xiaomanxiong.com/nlp
# 分词验证
nlp.interface.auth = QapsTeh8kHYdXTOfeLx05BfmIiQ=
# es集群
es.cluster = qmbigdata01:9200,qmbigdata02:9200,qmbigdata03:9200,qmbigdata04:9200,qmbigdata05:9200
# es索引
es.indices = goods
\ No newline at end of file
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment