From bb36836f90aceb250fa506fdaa9b31761b312b83 Mon Sep 17 00:00:00 2001 From: RingEric <younglin495@gmail.com> Date: Thu, 28 Jan 2021 10:23:25 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86flink?= =?UTF-8?q?=E5=85=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/scala/qm/flink/FlinkEntry.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/scala/qm/flink/FlinkEntry.scala b/src/main/scala/qm/flink/FlinkEntry.scala index 8b989af..5d11c55 100644 --- a/src/main/scala/qm/flink/FlinkEntry.scala +++ b/src/main/scala/qm/flink/FlinkEntry.scala @@ -6,6 +6,7 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import qm.kafka.KafkaConsumer + /** * @ClassName: Flink * @Description: TODO @@ -30,9 +31,11 @@ object FlinkEntry { // æ•°æ®å¤„ç†æµç¨‹ Analysis.analysis(x) }) - .map(x => { - x.mkString("{", ",", "}") - }) + //将数组压平 + .flatMap(x => x) + //过滤掉ä¸åˆç†çš„æ•°æ® + .filter(x => x.eventName.nonEmpty) + .executeAndCollect() .foreach(println) -- 2.26.2