• 附.项目实战
    • 1. 项目整体介绍
      • 1.1 电商用户行为分析
      • 1.2 项目模块设计
    • 2. 实时热门商品统计
    • 3. 实时流量统计
      • 3.1 PVTopN - 热点网站
      • 3.2 PV - 网站总浏览量
      • 3.3 UV - 网站独立访客数
    • 4. 市场营销商业指标统计分析
      • 4.1 APP 市场推广统计
      • 4.2 页面广告分析
    • 5. 恶意登陆监控
      • 5.1 Demo
      • 5.2 CEP
    • 6. 订单支付实时监控
      • 6.1 付款超时 - Cep Code
      • 6.2 付款超时 - Without Cep
      • 6.3 实时对账 - connect
      • 6.4 实时对账 - intervalJoin
    • 7. 电商常见指标汇总
  • 附.Q&A

注:次文档参考 【尚硅谷】大数据高级 flink技术精讲(2020年6月) 编写。

1.由于视频中并未涉及到具体搭建流程,Flink 环境搭建部分并未编写。
2.视频教程 Flink 版本为 1.10.0,此文档根据 Flink v1.11.1 进行部分修改。
3.文档中大部分程序在 Windows 端运行会有超时异常,需要打包后在 Linux 端运行。
4.程序运行需要的部分 Jar 包,请很具情况去掉 pom 中的 “scope” 标签的再进行打包,才能在集群上运行。
5.原始文档在 Markdown 中编写,此处目录无法直接跳转。且因字数限制,分多篇发布

此文档仅用作个人学习,请勿用于商业获利。

附.项目实战

此处大部分为代码部分,具体操作略

1. 项目整体介绍

1.1 电商用户行为分析

日志分类:

  • 用户
    • 登陆方式
    • 上线时间点和时长
    • 页面停留和跳转
  • 用户对商品的行为数据
    • 收藏/喜欢/评分/评价/打标签
    • 点击/浏览/购买/支付

日志分析有哪几类

  • 统计分析
    • 点击、浏览
    • 热门商品,近期热门商品、分类热门商品、流量统计
  • 偏好统计
    • 收藏、喜欢、评分、打标签
    • 用户画像,推荐列表
  • 风险控制
    • 下订单、支付、登陆
    • 刷单监控、订单失效监控、恶意登陆(短时间内频繁登陆失败)

1.2 项目模块设计

  • 实时统计分析

    • 热门商品
    • 流量统计
      • 热门页面统计
      • PV
      • UV
    • 市场营销指标
      • APP 市场推广
      • 页面广告分析
    • 实时访问流量统计
    • 页面广告点击量统计
  • 业务流程及风险控制

    • 页面广告黑名单过滤
    • 恶意登陆监控
    • 订单支付
      • 超时失效
      • 实时对账

2. 实时热门商品统计

需求

求一段时间内商品访问量的 Top N

需求分析

一段时间 -> 需要使用时间窗进行分组求和

Top N -> 需要对这个窗口内的数据进行统计排序。也就是接收上一步的数据,按窗口分组,然后排序

数据处理流程

source == map、assignAscendingTimestamps ==> 样例类 == filter、keyBy、timeWindow ==> 开窗
== aggregate(new CountAgg(), new ItemCountWindowResult()) ==> 窗口内进行分组求和,并获取窗口信息
== keyBy(windowEnd) ==> 取当前分组内的TopN,通过定时器触发 Sink

Pom

    <properties><flink.version>1.11.1</flink.version><scala.binary.version>2.11</scala.binary.version><kafka.version>2.4.1</kafka.version></properties><dependencies><!--  ======== Flink Core ========  --><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- 由于集群上已经有该 jar 包,若要上传到集群上执行,则去掉以下注释 --><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><!-- 由于集群上已经有该 jar 包,若要上传到集群上执行,则去掉以下注释 --><scope>provided</scope></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!--  ======== Flink Sink Connector ========  --><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version>
<!--            <scope>provided</scope>--></dependency><!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_${scala.binary.version}</artifactId><version>${kafka.version}</version><scope>provided</scope></dependency></dependencies><build><resources><resource><directory>src/main/resources</directory><includes><include>*.properties</include><include>*.txt</include>
<!--                    <include>*.csv</include>--></includes><excludes><exclude>*.xml</exclude><exclude>*.yaml</exclude></excludes></resource></resources><plugins><!-- 编译 Scala 需要用到的插件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.4.6</version><configuration><addScalacArgs>-target:jvm-1.8</addScalacArgs></configuration><executions><execution><!-- 声明绑定到 maven 的 compile 阶段 --><goals><goal>compile</goal></goals></execution></executions></plugin><!-- 项目打包需要用到的插件 --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>

Code

package com.mso.hotitems_analysisimport java.sql.Timestamp
import java.util.Propertiesimport org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBuffer// 定义输入数据的样例类
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)// 定义窗口聚合结果样例类
case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)/*** 热门商品统计。** 设置滑动事件窗口。窗口大小 1h,滑动步长 5min* 窗口聚合:定义窗口聚合规则 和 输出数据结构 - .aggregate( new CountAgg(), new WindowResultFunction())* 进行统计整理:按照关窗的时间分组,使用状态编程,并定义定时器定时输出 - keyBy("windowEnd")** 最终排序输出 - keyedProcessFunction :* - 针对有状态流的 API* - KeyedProcessFunction 会对分区后的每一条子流进行处理* - 以 windowEnd 作为 key,保证分流以后每一条流的数据都在一个时间窗口内* - 从 ListState 中读取当前流的状态,存储数据进行排序输出** 用 ProcessFunction 来定义 KeyedStream 的处理逻辑* 分区之后,每个 KeyedStream 都有其自己的生命周期* - open : 初始化,在这里可以获取当前流的状态* - processElement : 处理流中每一个元素时调用* - onTimer : 定时调用,注册定时器 Timer 并触发之后的回调操作*/
object HotItems {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 1. Source - data file : UserBehavior.csv//    val parameterTool: ParameterTool = ParameterTool.fromArgs(args)//    val inputStream: DataStream[String] = environment.readTextFile(parameterTool.get("input-path"))val properties = new Properties()properties.setProperty("bootstrap.servers", "test01:9092")properties.setProperty("group.id", "consumer-group")properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")properties.setProperty("auto.offset.reset", "latest")val inputStream: DataStream[String] = environment.addSource(new FlinkKafkaConsumer[String]("HotItems", new SimpleStringSchema(), properties))// 2. 将数据转换为样例类,并提取 timestamp 定义为 watermarkval dataStream: DataStream[UserBehavior] = inputStream.map((data: String) => {val dataArray: Array[String] = data.split(",")UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)}).assignAscendingTimestamps((_: UserBehavior).timestamp * 1000L) // 此处的测试数据为升序排序,因此不定义延迟时间// 3. Transformval processedStream: DataStream[String] = dataStream.filter((_: UserBehavior).behavior == "pv").keyBy((data: UserBehavior) => data.itemId).timeWindow(Time.hours(1), Time.minutes(5)).aggregate(new CountAgg(), new ItemCountWindowResult()).keyBy((data: ItemViewCount) => data.windowEnd).process(new TopNHotItems(5))// 4、sink,控制台输出processedStream.print()environment.execute("HotItems Job")}
}// 自定义聚合函数,来一条数据就 +1,
// * @param <IN>  The type of the values that are aggregated (input values)
// * @param <ACC> The type of the accumulator (intermediate aggregate state).
// * @param <OUT> The type of the aggregated result
class CountAgg() extends AggregateFunction[UserBehavior, Long, Long] {override def createAccumulator(): Long = 0Loverride def add(value: UserBehavior, accumulator: Long): Long = accumulator + 1override def getResult(accumulator: Long): Long = accumulatoroverride def merge(a: Long, b: Long): Long = a + b
}// 自定义窗口函数,结合 window 信息,包装成样例类
class ItemCountWindowResult extends WindowFunction[Long, ItemViewCount, Long, TimeWindow] {override def apply(key: Long, window: TimeWindow, input: Iterable[Long], out: Collector[ItemViewCount]): Unit = {out.collect(ItemViewCount(key, window.getEnd, input.iterator.next()))}
}// 自定义 KeyedProcessFunction。 对窗口聚合结果进行分组,并做排序取 TopN 输出
class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Long, ItemViewCount, String] {// 定义一个 ListState,用来保存当前窗口所有的 count 结果private var itemCountListState: ListState[ItemViewCount] = _override def open(parameters: Configuration): Unit = {itemCountListState = getRuntimeContext.getListState(new ListStateDescriptor[ItemViewCount]("itemCount-ListState", classOf[ItemViewCount]))}override def processElement(value: ItemViewCount, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#Context, out: Collector[String]): Unit = {// 每来一条数据就添加到状态中itemCountListState.add(value)// 注册定时器,在 windowEnd + 1 触发ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)}// 定时器触发时,从状态中取数据,然后排序输出override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {// 将所有state中的数据取出,放到一个List Buffer中val allItemCountList: ListBuffer[ItemViewCount] = new ListBuffer()import scala.collection.JavaConversions._for (itemCount <- itemCountListState.get()) {allItemCountList += itemCount}// 按照 count 大小排序,并取前 n 个val sortedItemsCountList: ListBuffer[ItemViewCount] = allItemCountList.sortBy((_: ItemViewCount).count)(Ordering.Long.reverse).take(topSize)// 清空状态itemCountListState.clear()// 将排名结果格式化输出val result: StringBuilder = new StringBuilder()result.append("Time : ").append(new Timestamp(timestamp - 1)).append("\n")// 遍历 sorted 列表,输出每一个商品的信息for (i <- sortedItemsCountList.indices) {val currentItemCount: ItemViewCount = sortedItemsCountList(i)result.append("No").append(i + 1).append(":").append(" 商品ID=").append(currentItemCount.itemId).append(" 浏览量=").append(currentItemCount.count).append("\n")}result.append("================================")out.collect(result.toString())// 由于测试是历史数据,所有数据都会一次性输出,此处需要控制输出频率Thread.sleep(500)}
}// 自定义预聚合函数计算平均数,状态为 (sum, count)
class AverageAgg() extends AggregateFunction[UserBehavior, (Long, Int), Double] {override def createAccumulator(): (Long, Int) = (0L, 0)override def add(value: UserBehavior, accumulator: (Long, Int)): (Long, Int) =(accumulator._1 + value.timestamp, accumulator._2 + 1)override def getResult(accumulator: (Long, Int)): Double = accumulator._1 / accumulator._2.toDoubleoverride def merge(a: (Long, Int), b: (Long, Int)): (Long, Int) =(a._1 + b._1, a._2 + b._2)
}

KafkaUtil Code

package com.mso.hotitems_analysisimport java.io.InputStream
import java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}object KafkaProducerUtil {def main(args: Array[String]): Unit = {writeToKafka("HotItems")}def writeToKafka(topic: String): Unit = {val properties = new Properties()properties.put("bootstrap.servers", "test01:9092")properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")// 定义一个 kafka producerval producer = new KafkaProducer[String, String](properties)val stream: InputStream = getClass.getResourceAsStream("/UserBehavior.csv")val lines: Iterator[String] = scala.io.Source.fromInputStream(stream).getLinesfor (line <- lines) {val record = new ProducerRecord[String, String](topic, line)producer.send(record)}// 发送完毕,关闭生产者producer.close()}
}

Code - HotItemsWithTableApi

package com.mso.hotitems_analysisimport java.net.URL
import java.sql.Timestampimport org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._object HotItemsWithTableApi {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)environment.setParallelism(1)// 1. Source - data file : UserBehavior.csv//    val parameterTool: ParameterTool = ParameterTool.fromArgs(args)//    val inputStream: DataStream[String] = environment.readTextFile(parameterTool.get("input-path"))val resource: URL = getClass.getResource("/UserBehavior.csv")val inputStream: DataStream[String] = environment.readTextFile(resource.getPath)// 2. 将数据转换为样例类,并提取 timestamp 定义为 watermarkval dataStream: DataStream[UserBehavior] = inputStream.map((data: String) => {val dataArray: Array[String] = data.split(",")UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)}).assignAscendingTimestamps((_: UserBehavior).timestamp * 1000L) // 此处的测试数据为升序排序,因此不定义延迟时间// 创建 Table 执行环境val environmentSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()val tableEnvironment: StreamTableEnvironment = StreamTableEnvironment.create(environment, environmentSettings)// **********************// Option 1 : Use Table APi & SQL// **********************val tableStream: Table = tableEnvironment.fromDataStream(dataStream, 'itemId, 'behavior, 'timestamp.rowtime() as 'ts)// 分组开窗增量聚合val aggTable: Table = tableStream.filter('behavior === "pv").window(Slide over 1.hours() every 5.minutes() on 'ts as 'sw).groupBy('itemId, 'sw).select('itemId, 'itemId.count() as 'cnt, 'sw.end() as 'windowEnd)// 用 SQL 实现分组选取 Top N 的功能tableEnvironment.createTemporaryView("aggTable", aggTable, 'itemId, 'cnt, 'windowEnd)val resultTable: Table = tableEnvironment.sqlQuery("""|select *|from (|select * , ROW_NUMBER() over (partition by windowEnd order by cnt desc) as row_num|from aggTable|) where row_num<=5|""".stripMargin)//     打印输出tableEnvironment.toRetractStream[(Long, Long, Timestamp, Long)](resultTable).print("resultTable")// **********************// Option 2 : Use SQL// **********************// 将 dataStream 注册成表tableEnvironment.createTemporaryView("dataStream_table", dataStream, 'itemId, 'behavior, 'timestamp.rowtime() as 'ts)// 用 SQL 实现分组选取 Top N 的功能val resultTable2: Table = tableEnvironment.sqlQuery("""|select *|from (|   select * , ROW_NUMBER() over (partition by windowEnd order by cnt desc) as row_num|   from (|     select itemId, count(itemId) as cnt, hop_end(ts, interval '5' minutes, interval '1' hour) as windowEnd|     from dataStream_table|     where behavior = 'pv'|     group by hop(ts, interval '5' minutes, interval '1' hour), itemId|   )|) where row_num<=5|""".stripMargin)// 打印输出tableEnvironment.toRetractStream[(Long, Long, Timestamp, Long)](resultTable2).print("resultTable2")environment.execute("HotItemsWithTableApi")}
}

3. 实时流量统计

3.1 PVTopN - 热点网站

package com.mso.networkflow_analysisimport java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.Duration
import java.utilimport org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBuffer
import scala.util.matching.Regex// 输入数据样例类
case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)// 窗口聚合结果样例类
case class PageViewCount(url: String, windowEnd: Long, count: Long)/*** 需求 : 每 10min 的 PageView TopN ,每 10s 打印一次** 思路:* 日志解析成样例类后,按照 url 分组,开滑动窗口,每 10s 滑动一次。* 将窗口内的数据按照访问量进行排序,并通过定时器打印输出。** 注意:* 此处由于要考虑乱序数据,所以有两个定时器。* 一个用于窗口关闭后的数据输出,一个用于达到 allowedLateness 后进行状态清空。* 且也是因为要处理迟到数据,所以使用 MapState,防止数据重复输出**/
object NetworkFlowTopNPage {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val parameterTool: ParameterTool = ParameterTool.fromArgs(args)val inputStream: DataStream[String] = environment.readTextFile(parameterTool.get("input-path"))//    val inputStream: DataStream[String] = environment.socketTextStream("test01", 7777)val lateOutputTag = new OutputTag[ApacheLogEvent]("ApacheLogEvent-LateData")val dataStream: DataStream[PageViewCount] = inputStream.map((data: String) => {val strings: Array[String] = data.split(" ")val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")val eventTime: Long = simpleDateFormat.parse(strings(3).trim).getTimeApacheLogEvent(strings(0).trim, strings(1).trim, eventTime, strings(5).trim, strings(6).trim)}).filter((data: ApacheLogEvent) => {val pattern: Regex = "^((?!\\.(css|js)$).)*$".r(pattern findFirstIn data.url).nonEmpty}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner {new SerializableTimestampAssigner[ApacheLogEvent] {override def extractTimestamp(element: ApacheLogEvent, recordTimestamp: Long): Long = element.eventTime}}).keyBy((data: ApacheLogEvent) => data.url).timeWindow(Time.minutes(10), Time.seconds(10)).allowedLateness(Time.minutes(1)).sideOutputLateData(lateOutputTag).aggregate(new PageCountAgg(), new PageCountWindowResult())val resultStream: DataStream[String] = dataStream.keyBy((data: PageViewCount) => data.windowEnd).process(new TopNHotPage(5))val lateDataStream: DataStream[ApacheLogEvent] = dataStream.getSideOutput(lateOutputTag)lateDataStream.print("lateData")resultStream.print("result")environment.execute("NetworkFlowTopNPage")}
}class PageCountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {override def createAccumulator(): Long = 0Loverride def add(value: ApacheLogEvent, accumulator: Long): Long = accumulator + 1override def getResult(accumulator: Long): Long = accumulatoroverride def merge(a: Long, b: Long): Long = a
}class PageCountWindowResult extends WindowFunction[Long, PageViewCount, String, TimeWindow] {override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PageViewCount]): Unit = {out.collect(PageViewCount(key, window.getEnd, input.head))}
}class TopNHotPage(topSize: Int) extends KeyedProcessFunction[Long, PageViewCount, String] {//  lazy val pageCountListState: ListState[PageViewCount] = getRuntimeContext.getListState(new ListStateDescriptor[PageViewCount]("pageCountListState", classOf[PageViewCount]))// 此处使用 MapState。// 一方面可以节省状态空间,少存储了 windowEnd 字段;// 另一方面,在处理迟到数据时,可以更新状态数据,而不是 List 的追加状态数据lazy val pageCountMapState: MapState[String, Long] =getRuntimeContext.getMapState(new MapStateDescriptor[String, Long]("pageCountMapState", classOf[String], classOf[Long]))override def processElement(value: PageViewCount, ctx: KeyedProcessFunction[Long, PageViewCount, String]#Context, out: Collector[String]): Unit = {//    pageCountListState.add(value)pageCountMapState.put(value.url, value.count)// 用于定时输出数据ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)// 用于等待迟到数据ctx.timerService().registerEventTimeTimer(value.windowEnd + 60 * 1000L)}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PageViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {// 当 Watermark 达到最大延迟时间后(allowedLateness),再执行清空状态操作if (timestamp == ctx.getCurrentKey + 60 * 1000L) {//      pageCountListState.clear()pageCountMapState.clear()return}// Option 1//    val allPageCountList: ListBuffer[PageViewCount] = new ListBuffer()//    val iter1: util.Iterator[PageViewCount] = pageCountListState.get().iterator()//    while (iter1.hasNext) {//      allPageCountList += iter1.next()//    }//    val sortedPageCountList: ListBuffer[PageViewCount] = allPageCountList.sortWith((_: PageViewCount).count > (_: PageViewCount).count).take(topSize)//    val result1: StringBuilder = new StringBuilder()//    result1.append("时间(List):").append(new Timestamp(timestamp - 1)).append("\n")//    for (i <- sortedPageCountList.indices) {//      val currentPageCount: PageViewCount = sortedPageCountList(i)//      result1.append("NO").append(i + 1).append(":")//        .append(" URL=").append(currentPageCount.url)//        .append(" 访问量=").append(currentPageCount.count).append("\n")//    }//    result1.append("================================")//    out.collect(result1.toString())// Option 2val allPageCountMap: ListBuffer[(String, Long)] = new ListBuffer[(String, Long)]()val iter2: util.Iterator[util.Map.Entry[String, Long]] = pageCountMapState.entries().iterator()while (iter2.hasNext) {val entry: util.Map.Entry[String, Long] = iter2.next()allPageCountMap += ((entry.getKey, entry.getValue))}val sortedPageCountMap: ListBuffer[(String, Long)] = allPageCountMap.sortWith((_: (String, Long))._2 > (_: (String, Long))._2).take(topSize)val result2: StringBuilder = new StringBuilder()result2.append("时间(Map):").append(new Timestamp(timestamp - 1)).append("\n")for (i <- sortedPageCountMap.indices) {val currentPageCount: (String, Long) = sortedPageCountMap(i)result2.append("NO").append(i + 1).append(":").append(" URL=").append(currentPageCount._1).append(" 访问量=").append(currentPageCount._2).append("\n")}result2.append("================================")out.collect(result2.toString())// 由于测试是历史数据,所有数据都会一次性输出,此处需要控制输出频率Thread.sleep(500)}
}

3.2 PV - 网站总浏览量

package com.mso.networkflow_analysisimport org.apache.flink.api.common.functions.{AggregateFunction, MapFunction, RichMapFunction}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collectorimport scala.util.Random// 定义输入输出的样例类
case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)case class PVCount(windowEnd: Long, count: Long)/*** 需求 : 每 1H 的 PageView Count** 思路:* 日志解析成样例类后,按照 url 分组,开滚动窗口,并统计窗口内的数据量。** 注:* 要处理窗口内的数据量,有两种思路。* 一种是使用 map("pv", 1),将所有数据分到同一组进行统计求和;* 一种是自定义 Mapper,生成随机的 key,再使用 aggregate 进行预聚合,并封装当前分组的关闭时间窗,按照时间窗分组求和,并输出* 将窗口内的数据按照访问量进行排序,并通过定时器打印输出。*/
object PageView {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val parameterTool: ParameterTool = ParameterTool.fromArgs(args)val inputStream: DataStream[String] = environment.readTextFile(parameterTool.get("input-path"))val dataStream: DataStream[UserBehavior] = inputStream.map((data: String) => {val dataArray: Array[String] = data.split(",")UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)}).assignAscendingTimestamps((_: UserBehavior).timestamp * 1000L)val resultStream: DataStream[PVCount] = dataStream.filter((_: UserBehavior).behavior == "pv").map((data: UserBehavior) => ("pv", 1L)) // map 成二元组 ("pv", count).keyBy((data: (String, Long)) => data._1) // 所有数据分到一组进行统计.timeWindow(Time.hours(1)).aggregate(new PVCountAgg(), new PVCountWindowFunction())resultStream.print("resultStream")val resultStream2: DataStream[PVCount] = dataStream.filter((_: UserBehavior).behavior == "pv")//      .map((data: UserBehavior) => ("pv", 1L)) // map 成二元组 ("pv", count).map(new RandomMapper) // 自定义 Mapper,负载均衡.keyBy((data: (String, Long)) => data._1).timeWindow(Time.hours(1)).aggregate(new PVCountAgg(), new PVCountWindowFunction()).keyBy((data: PVCount) => data.windowEnd).process(new TotalPvCountResult)resultStream2.print("resultStream2")environment.execute("PageView")}
}class PVCountAgg extends AggregateFunction[(String, Long), Long, Long] {override def createAccumulator(): Long = 0Loverride def add(value: (String, Long), accumulator: Long): Long = accumulator + 1override def getResult(accumulator: Long): Long = accumulatoroverride def merge(a: Long, b: Long): Long = a
}class PVCountWindowFunction extends WindowFunction[Long, PVCount, String, TimeWindow] {override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[PVCount]): Unit = {out.collect(PVCount(window.getEnd, input.head))}
}// 自定义 Map 类,生成随机key,防止热点问题
class RandomMapper extends MapFunction[UserBehavior, (String, Long)] {override def map(value: UserBehavior): (String, Long) = ("pv" + Random.nextInt(10), 1L)
}// 自定义 RichMap 类,使用当前任务的索引生成 key,防止热点问题
class RandomRichMapper extends RichMapFunction[UserBehavior, (String, Long)] {lazy private val indexOfThisSubtask: Int = getRuntimeContext.getIndexOfThisSubtaskoverride def map(value: UserBehavior): (String, Long) = ("pv" + indexOfThisSubtask, 1L)
}// 自定义 KeyedProcessFunction, 将聚合结果合并
class TotalPvCountResult extends KeyedProcessFunction[Long, PVCount, PVCount] {// 定义一个状态,保存当前所有结果的和lazy val totalCountState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("totalCountState", classOf[Long]))override def processElement(value: PVCount, ctx: KeyedProcessFunction[Long, PVCount, PVCount]#Context, out: Collector[PVCount]): Unit = {totalCountState.update(totalCountState.value() + value.count)// 注册定时器,windowEnd+1 之后触发ctx.timerService().registerEventTimeTimer(value.windowEnd + 1)}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, PVCount, PVCount]#OnTimerContext, out: Collector[PVCount]): Unit = {// 定时器触发,所有分区 count 值都已经到达,输出总和//    out.collect(PVCount(timestamp - 1, totalCountState.value()))    // windowEnd + 1 时触发out.collect(PVCount(ctx.getCurrentKey, totalCountState.value()))totalCountState.clear()}
}

3.3 UV - 网站独立访客数

Code 1

package com.mso.networkflow_analysisimport java.net.URLimport org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collectorcase class UvCount(windowEnd: Long, UvCount: Long)/*** 需求 : 每 1H 的 UniqueVisitor Count** 思路:* 日志解析成样例类后,开 1H 的全窗口,创建 SetState 状态,对每一条数据调用状态并添加当前 userId,窗口关闭时获取 SetState 的 size。** 注:* 此处可以使用批量聚合:.apply(new UvCountByAllWindowFunction)* 也可以使用增量聚合:.aggregate(new UvCountByAgg, new UvCountByAllWindowFunction2)* 但是当数据量很大时,会造成 OOM,因此需要借助 布隆过滤器 和 Redis*/
object UniqueVisitor {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)environment.setParallelism(1)//    val parameterTool: ParameterTool = ParameterTool.fromArgs(args)//    val inputStream: DataStream[String] = environment.readTextFile(parameterTool.get("input-path"))val resource: URL = getClass.getResource("/UserBehavior.csv")val inputStream: DataStream[String] = environment.readTextFile(resource.getPath)val processStream: AllWindowedStream[UserBehavior, TimeWindow] = inputStream.map((data: String) => {val dataArray: Array[String] = data.split(",")UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)}).assignAscendingTimestamps((_: UserBehavior).timestamp * 1000L).filter((_: UserBehavior).behavior == "pv").timeWindowAll(Time.hours(1))// 使用全窗口函数,窗口关闭后进行聚合val uvStream1: DataStream[UvCount] = processStream.apply(new UvCountByAllWindowFunction)// 使用增量聚合函数,来一条聚合一次val uvStream2: DataStream[UvCount] = processStream.aggregate(new UvCountByAgg, new UvCountByAllWindowFunction2)uvStream1.print("uvStream1")uvStream2.print("uvStream2")environment.execute("UniqueVisitor")}
}// 全窗口函数,当窗口关闭时批处理窗口内的数据
class UvCountByAllWindowFunction extends AllWindowFunction[UserBehavior, UvCount, TimeWindow] {override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {// 定义一个 set 来保存所有的 userId,自动去重var userIdSet: Set[Long] = Set[Long]()// 把当前窗口所有数据的ID收集到set中,最后输出set的大小for (userBehavior <- input) {userIdSet += userBehavior.userId}out.collect(UvCount(window.getEnd, userIdSet.size))}
}// 自定义增量函数
class UvCountByAgg extends AggregateFunction[UserBehavior, Set[Long], Long] {override def createAccumulator(): Set[Long] = Set[Long]()override def add(value: UserBehavior, accumulator: Set[Long]): Set[Long] = accumulator + value.userIdoverride def getResult(accumulator: Set[Long]): Long = accumulator.sizeoverride def merge(a: Set[Long], b: Set[Long]): Set[Long] = a ++ b
}// 自定义窗口函数,包装成样例类
class UvCountByAllWindowFunction2 extends AllWindowFunction[Long, UvCount, TimeWindow] {override def apply(window: TimeWindow, input: Iterable[Long], out: Collector[UvCount]): Unit = {out.collect(UvCount(window.getEnd, input.head))}
}

Code 2

package com.mso.networkflow_analysisimport java.langimport org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import redis.clients.jedis.Jedis/*** 需求 : 每 1H 的 UniqueVisitor Count** 思路:* 日志解析成样例类后,获取 userId : map("uv", data.userId),将数据分到 1组内,开一小时滚动窗口。* 使用触发器,处理数据将 userId 使用布隆过滤器处理后存放到 redis 中,并清空 Flink 中数据的状态。** 注:*/
object UniqueVisitorWithBloomFilter {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)environment.setParallelism(1)val parameterTool: ParameterTool = ParameterTool.fromArgs(args)val inputStream: DataStream[String] = environment.readTextFile(parameterTool.get("input-path"))//    val inputStream: DataStream[String] = environment.readTextFile("E:\\Personal\\PersonalPractice\\UserBehaviorAnalysis\\NetworkFlowAnalysis\\src\\main\\resources\\UserBehavior.csv")val resultStream = inputStream.map((data: String) => {val dataArray: Array[String] = data.split(",")UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)}).assignAscendingTimestamps((_: UserBehavior).timestamp * 1000L).filter((_: UserBehavior).behavior == "pv").map((data: UserBehavior) => ("uv", data.userId)).keyBy((data: (String, Long)) => data._1).timeWindow(Time.hours(1)).trigger(new MyTrigger()).process(new MyUVCountWithBloomFilter())resultStream.print("resultStream")environment.execute("UniqueVisitorWithBloomFilter")// redis-cli -p 7000 -a myPassWd// 127.0.0.1:7000> keys 15*// 127.0.0.1:7000> hgetall countMap}
}// 自定义触发器,每来一条数据就触发一次窗口操作
class MyTrigger extends Trigger[(String, Long), TimeWindow] {// 数据来了之后触发计算并清空状态,不保存数据override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.FIRE_AND_PURGE// 仅根据数据进行计算,所以对于时间不做操作override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUE// 仅根据数据进行计算,所以对于时间不做操作override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = TriggerResult.CONTINUEoverride def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
}// 自定义 ProcessFunction,把当前数据进行处理,位图保存在 redis 中
class MyUVCountWithBloomFilter extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow] {var jedis: Jedis = _var bloom: Bloom = _override def open(parameters: Configuration): Unit = {//    jedis = new Jedis("test01", 6379)jedis = new Jedis("10.10.7.25", 7000)jedis.auth("myPassWd")// 假定有 1 亿用户 10^8 条数据,每个用户占 100 Byte => 10^10 Byte => 10GB// 压缩率 1000 => 每个用户占 1 bit => 10^8 bit => 10M => 扩充 10 倍 => 100M (128M)bloom = new Bloom(1 << 29) // 128M}// 每来一个数据,主要是要用布隆过滤器判断 redis 位图中对应位置是否为 1override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[UvCount]): Unit = {// bitmap 用当前窗口的 end 作为 key,保存到 redis 里, (windowEnd, bitmap)val storeKey: String = context.window.getEnd.toString// 把每个窗口的 uv count 值,作为状态也存入 redis 中,存成一张叫做 countMap 的表val countMap = "countMap"// 先获取当前的 Count 值var count = 0Lif (jedis.hget(countMap, storeKey) != null) {count = jedis.hget(countMap, storeKey).toLong}// 用 userId,计算 hash 值,判断数据是否在位图中val userId: String = elements.head._2.toStringval offset: Long = bloom.hash(userId, 61)// 定义一个标识位,判断 reids 位图中有没有这一位val isExist: lang.Boolean = jedis.getbit(storeKey, offset)// 如果不存在,那么就将对应位置置为 1,count + 1;如果存在,不做操作if (!isExist) {jedis.setbit(storeKey, offset, true)jedis.hset(countMap, storeKey, (count + 1).toString)//      out.collect(UvCount(storeKey.toLong, count + 1))} else {//      out.collect(UvCount(storeKey.toLong, count))}}
}// 自定义一个布隆过滤器
class Bloom(size: Long) extends Serializable {// 定义位图的总大小,应该时 2 的整次幂,默认 134217728 bit (16 M)private val cap: Long = if (size > 0) size else 1 << 27// 定义 hash 函数def hash(value: String, seed: Int): Long = {var result = 0Lfor (i <- 0 until value.length) {result = result * seed + value.charAt(i)}// 返回一个在 cap 范围内的一个值result & (cap - 1)}
}

4. 市场营销商业指标统计分析

4.1 APP 市场推广统计

AppMarketingByChannel

package com.mso.market_analysisimport java.sql.Timestamp
import java.util.UUID
import java.util.concurrent.TimeUnitimport org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collectorimport scala.util.Random// 输入数据样例类
case class MarketUserBehavior(userId: String, behavior: String, channel: String, timestamp: Long)// 输出结果样例类
case class MarketCount(windowStart: String, windowEnd: String, channel: String, behavior: String, count: Long)/*** 需求:* 统计一小时内 各个推广渠道下各个行为 的总量,每 5s 输出一次** 思路:* 日志解析成样例类后,按照渠道和行为类型分组,开滑动窗口,窗口大小为 1H,滑动步长为 5s,使用全窗口函数统计求和。** 注:* window function 定义了要对窗口中收集的数据做的计算操作,分为以下两类:* - 增量聚合函数 - incremental aggregation functions : 每条数据到来就进行计算,保持一个简单的状态*   - reduce(reduceFunction) - 输入输出中间状态的类型相同*   - aggregate(aggregateFunction) - 输入输出中间状态的类型不同*   - sum(), min(), max()* - 全窗口函数 - full window functions : 先把窗口所有数据收集起来,等到计算的时候会遍历所有的数据*   - apply(windowFunction)*   - process(processWindowFunction) - processWindowFunction 比 windowFunction 提供了更多的上下文信息*   - aggregate(preAggregator, windowFunction)*   - aggregate(preAggregator, ProcessWindowFunction)*   - reduce(preAggregator, windowFunction)*   - reduce(preAggregator, ProcessWindowFunction)**/
object AppMarketingByChannel {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val inputStream: DataStream[MarketUserBehavior] = environment.addSource(new SimulatedMarketEventSource()).assignAscendingTimestamps((_: MarketUserBehavior).timestamp)val resultStream: DataStream[MarketCount] = inputStream.filter((_: MarketUserBehavior).behavior != "UNISTALL") // 过滤卸载行为.keyBy((data: MarketUserBehavior) => (data.channel, data.behavior)) // 按照渠道和行为类型分组.timeWindow(Time.hours(1), Time.seconds(5)).process(new MarketCountByChannelPWF())resultStream.print("resultStream")environment.execute("AppMarketingByChannel")}
}class MarketCountByChannelPWF extends ProcessWindowFunction[MarketUserBehavior, MarketCount, (String, String), TimeWindow] {override def process(key: (String, String), context: Context, elements: Iterable[MarketUserBehavior], out: Collector[MarketCount]): Unit = {val windowStart: String = new Timestamp(context.window.getStart).toStringval windowEnd: String = new Timestamp(context.window.getEnd).toStringval chanel: String = key._1val behavior: String = key._2val count: Int = elements.sizeout.collect(MarketCount(windowStart, windowEnd, chanel, behavior, count))}
}// 自定义数据源
class SimulatedMarketEventSource() extends RichSourceFunction[MarketUserBehavior] {// 定义是否运行的标志位var running = true// 定义用户行为的集合val behaviorTypes: Seq[String] = Seq("CLICK", "DOWNLOAD", "INSTALL", "UNISTALL")// 定义渠道的集合val channelSets: Seq[String] = Seq("wechat", "weibo", "appstore", "huaweistore")// 定义一个随机数发生器val rand: Random = new Random()override def run(ctx: SourceFunction.SourceContext[MarketUserBehavior]): Unit = {// 定义一个生成数据的上限val maxElements: Long = Long.MaxValuevar count = 0L// 随机生成所有数据while (running && count < maxElements) {val id: String = UUID.randomUUID().toStringval behavior: String = behaviorTypes(rand.nextInt(behaviorTypes.size))val channel: String = channelSets(rand.nextInt(channelSets.size))val ts: Long = System.currentTimeMillis()ctx.collect(MarketUserBehavior(id, behavior, channel, ts))count += 1TimeUnit.MILLISECONDS.sleep(10L)}}override def cancel(): Unit = {running = false}
}

AppMarketing

package com.mso.market_analysisimport java.sql.Timestampimport org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector/*** 需求:* 统计一小时内 所有推广渠道所有行为 的总量,每 5s 输出一次** 思路:* 日志解析成样例类后,开滑动窗口,窗口大小为 1H,滑动步长为 5s,统计时间窗内的数据量。* 可以使用增量聚合和全量聚合等多种方法。也可以生成随机 key,防止 OOM**/
object AppMarketing {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val inputStream: DataStream[MarketUserBehavior] = environment.addSource(new SimulatedMarketEventSource).assignAscendingTimestamps((_: MarketUserBehavior).timestamp)val processStream: WindowedStream[String, String, TimeWindow] = inputStream.filter((_: MarketUserBehavior).behavior != "UNISTALL") // 过滤卸载行为.map((data: MarketUserBehavior) => "total").keyBy((data: String) => data).timeWindow(Time.hours(1), Time.seconds(5))// Option 1 : 全窗口函数批量执行聚合val resultStream1: DataStream[MarketCount] = processStream.process(new MarketCountPWF())// Option 2 : 增量聚合后开窗合并val resultStream2: DataStream[MarketCount] = processStream.aggregate(new MarketCountAgg, new MarketCountWF)// Option 3 : 自定义 Mapper 生成随机 key,每个分区聚合完成后,再按 window,getEnd 分组求和,定时器触发结果输出,防止 OOM// 略resultStream1.print("resultStream1")resultStream2.print("resultStream2")environment.execute("AppMarketing")}
}class MarketCountPWF extends ProcessWindowFunction[String, MarketCount, String, TimeWindow] {override def process(key: String, context: Context, elements: Iterable[String], out: Collector[MarketCount]): Unit = {val windowStart: String = new Timestamp(context.window.getStart).toStringval windowEnd: String = new Timestamp(context.window.getEnd).toStringval count: Int = elements.sizeout.collect(MarketCount(windowStart, windowEnd, "app marketing", "total", count))}
}class MarketCountAgg extends AggregateFunction[String, Long, Long] {override def createAccumulator(): Long = 0Loverride def add(value: String, accumulator: Long): Long = accumulator + 1override def getResult(accumulator: Long): Long = accumulatoroverride def merge(a: Long, b: Long): Long = a + b
}class MarketCountWF extends WindowFunction[Long, MarketCount, String, TimeWindow] {override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[MarketCount]): Unit = {val windowStart: String = new Timestamp(window.getStart).toStringval windowEnd: String = new Timestamp(window.getEnd).toStringval count: Long = input.headout.collect(MarketCount(windowStart, windowEnd, "app marketing", "total", count))}
}

4.2 页面广告分析

页面广告点击量统计 & 黑名单过滤

package com.mso.market_analysisimport java.net.URL
import java.sql.Timestampimport org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector// 输入的广告点击事件样例类
case class AdClickEvent(userId: Long, adId: Long, province: String, city: String, timestamp: Long)// 按照省份统计的输出结果样例类
case class AdCountByProvince(province: String, windowEnd: String, count: Long)// 输出的黑名单报警信息
case class BlackListWarning(userId: Long, adId: Long, msg: String)/*** 需求:* 统计每个小时各个省份的广告点击量,每 5s 输出一次结果。* 判断用户对广告的点击次数,过滤恶意点击行为。** 思路:* 使用 userId、adId 分组,自定义 KeyedProcessFunction 过滤掉恶意点击行为,并发送到侧输出流。* 需要定义的状态有 countState(点击次数) 和 isSentState(是否标记为黑名单,且仅标记并输出一次)**/
object AdAnalysisByProvince {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)environment.setParallelism(1)val resource: URL = getClass.getResource("/AdClickLog.csv")val inputStream: DataStream[AdClickEvent] = environment.readTextFile(resource.getPath).map((data: String) => {val strings: Array[String] = data.split(",")AdClickEvent(strings(0).toLong, strings(1).toLong, strings(2), strings(3), strings(4).toLong)}).assignAscendingTimestamps((_: AdClickEvent).timestamp * 1000L)val dataStream: DataStream[AdClickEvent] = inputStream.keyBy((data: AdClickEvent) => (data.userId, data.adId)).process(new FilterBlackListUserKPF(50))val resultStream: DataStream[AdCountByProvince] = dataStream.keyBy((data: AdClickEvent) => data.province).timeWindow(Time.hours(1), Time.seconds(5)).aggregate(new AdCountByProvinceAgg, new AdCountByProvinceWF)resultStream.print("resultStream")dataStream.getSideOutput(new OutputTag[BlackListWarning]("blackList")).print("blackList")environment.execute("AdAnalysisByProvince")}
}class AdCountByProvinceAgg extends AggregateFunction[AdClickEvent, Long, Long] {override def createAccumulator(): Long = 0Loverride def add(value: AdClickEvent, accumulator: Long): Long = accumulator + 1override def getResult(accumulator: Long): Long = accumulatoroverride def merge(a: Long, b: Long): Long = a + b
}class AdCountByProvinceWF extends WindowFunction[Long, AdCountByProvince, String, TimeWindow] {override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[AdCountByProvince]): Unit = {out.collect(AdCountByProvince(key, new Timestamp(window.getEnd).toString, input.head))}
}class FilterBlackListUserKPF(maxCount: Int) extends KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent] {// 定义状态,保存当前用户对当前广告的点击量lazy private val countState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("countState", classOf[Long]))// 保存是否发送过黑名单的状态lazy private val isSentState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("isSentState", classOf[Boolean]))//  // 保存定时器触发的时间戳//  lazy val resetTimerState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("resetTimerState", classOf[Long]))override def processElement(value: AdClickEvent, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#Context, out: Collector[AdClickEvent]): Unit = {// 取出count状态val curCount: Long = countState.value()// 如果是第一次处理,注册定时器,每天00:00触发if (curCount == 0) {val ts: Long = (ctx.timerService().currentProcessingTime() / (1000 * 60 * 60 * 24) + 1) * (1000 * 60 * 60 * 24)// 此处使用 ProcessingTime 注册定时器,也可用 EventTime。// 当凌晨数据稀疏,没有数据产生时,watermark 没有更新到第二天,那么就不会触发定时器。当然两种时间都是可以的ctx.timerService().registerProcessingTimeTimer(ts)}// 判断计数是否达到上限,如果到达则加入黑名单if (curCount >= maxCount) {// 判断是否发送过黑名单,只发送一次if (!isSentState.value()) {isSentState.update(true)// 输出到侧输出流ctx.output(new OutputTag[BlackListWarning]("blackList"), BlackListWarning(value.userId, value.adId, "Click over " + maxCount + " times today."))}return}// 计数状态加1,输出数据到主流countState.update(curCount + 1)out.collect(value)}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent]#OnTimerContext, out: Collector[AdClickEvent]): Unit = {countState.clear()isSentState.clear()}
}

5. 恶意登陆监控

5.1 Demo

package com.mso.loginfail_detectimport java.net.URL
import java.time.Duration
import java.{lang, util}import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collectorimport scala.collection.mutable.ListBuffer// 输入的登陆事件样例类
case class LoginEvent(userId: Long, ip: String, eventType: String, loginEventTime: Long)// 输出的异常报警信息样例类
case class LoginFailWarning(userId: Long, firstFailTime: Long, lastFailTime: Long, warningMsg: String)/*** 需求:* 若用户在 2s 内连续登陆失败次数大于 2,那么输出告警** 思路:* 按用户分组,使用 KeyProcessFunction,定义登陆失败状态,进行统计并告警* Option 1 : 使用定时器,2s 内若登陆失败次数大于 2,则输出告警。定义两个状态,loginFailListState、timerTsState。* Option 2 : 定义状态 loginFailListState, 根据状态内的数据量和时间来判断登陆失败次数和登陆失败间距,并输出告警** 注:* 相比于 Option 1, Option 2 可以在未到达 2s 内就输出告警。* 但是这两种方法无法处理乱序数据,当数据乱序时,会影响告警结果。* Flink 为我们提供了更方便的处理方法:CEP(复杂事件处理 - Complex Event Processing)* CEP 允许在流式数据中检测事件模式,让我们有机会掌握数据中重要的部分*/
object LoginFail {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)environment.setParallelism(1)val resource: URL = getClass.getResource("/LoginLog.csv")val inputStream: DataStream[LoginEvent] = environment.readTextFile(resource.getPath).map((data: String) => {val strings: Array[String] = data.split(",")LoginEvent(strings(0).toLong, strings(1), strings(2), strings(3).toLong)}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner[LoginEvent] {override def extractTimestamp(element: LoginEvent, recordTimestamp: Long): Long = element.loginEventTime * 1000L}))val resultStream: DataStream[LoginFailWarning] = inputStream.keyBy((data: LoginEvent) => data.userId)//      .process(new LoginFailKPF(2)).process(new LoginFailKPF2(2))resultStream.print("resultStream")environment.execute("LoginFail")}
}// Option 1 : 定义两个状态,一个用于保存登陆失败的时间,一个用于保存定时器状态(用于删除定时器)
// 注:这种方法仅能等到 2s 内才会触发定时器告警
class LoginFailKPF(maxFailTimes: Int) extends KeyedProcessFunction[Long, LoginEvent, LoginFailWarning] {// 保存所有登陆失败的时间,用于获取登陆次数var loginFailListState: ListState[Long] = _// 保存定时器状态,用于清除定时器var timerTsState: ValueState[Long] = _override def open(parameters: Configuration): Unit = {loginFailListState = getRuntimeContext.getListState(new ListStateDescriptor[Long]("loginFailState", classOf[Long]))timerTsState = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timerTsState", classOf[Long]))}override def processElement(value: LoginEvent, ctx: KeyedProcessFunction[Long, LoginEvent, LoginFailWarning]#Context, out: Collector[LoginFailWarning]): Unit = {val loginFailList: lang.Iterable[Long] = loginFailListState.get()// 判断类型是否是fail,只添加fail的事件到状态if (value.eventType == "fail") {loginFailListState.add(value.loginEventTime)if (!loginFailList.iterator().hasNext) {ctx.timerService().registerEventTimeTimer(value.loginEventTime * 1000L + 2000L)}if (timerTsState.value() == 0) {ctx.timerService().registerEventTimeTimer(value.loginEventTime * 1000L + 2000L)}} else {// 如果是成功,清空状态ctx.timerService().deleteEventTimeTimer(timerTsState.value())loginFailListState.clear()timerTsState.clear()}}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, LoginEvent, LoginFailWarning]#OnTimerContext, out: Collector[LoginFailWarning]): Unit = {// 触发定时器的时候,根据状态里的失败个数决定是否输出报警val allLoginFails: ListBuffer[Long] = new ListBuffer[Long]()val iter: util.Iterator[Long] = loginFailListState.get().iterator()while (iter.hasNext) {allLoginFails += iter.next()}// 判断个数if (allLoginFails.length >= maxFailTimes) {out.collect(LoginFailWarning(ctx.getCurrentKey, allLoginFails.head, allLoginFails.last, "login fail in 2 seconds for " + allLoginFails.length + " times."))}// 清空状态loginFailListState.clear()timerTsState.clear()}
}// Option 2 : 定义一个状态,保存登陆失败的时间,可通过状态的数量大小判断失败次数,可通过状态内的时间间距判断是否在2s
class LoginFailKPF2(maxFailTimes: Int) extends KeyedProcessFunction[Long, LoginEvent, LoginFailWarning] {// 保存所有登陆失败的时间,用于获取登陆次数 和 几次登陆失败的时间间距var loginFailListState: ListState[Long] = _override def open(parameters: Configuration): Unit = {loginFailListState = getRuntimeContext.getListState(new ListStateDescriptor[Long]("loginFailState", classOf[Long]))}override def processElement(value: LoginEvent, ctx: KeyedProcessFunction[Long, LoginEvent, LoginFailWarning]#Context, out: Collector[LoginFailWarning]): Unit = {if (value.eventType == "fail") {// 如果是失败,判断之前是否有登录失败事件val iter: util.Iterator[Long] = loginFailListState.get().iterator()if (iter.hasNext) {// 如果已经有登录失败事件,就比较事件时间val firstFail: Long = iter.next()if (value.loginEventTime < firstFail + 2) {// 如果两次间隔小于2秒,输出报警out.collect(LoginFailWarning(value.userId, firstFail, value.loginEventTime, "login fail in 2 seconds."))}// 更新最近一次的登录失败事件,保存在状态里loginFailListState.clear()loginFailListState.add(value.loginEventTime)} else {// 如果是第一次登录失败,直接添加到状态loginFailListState.add(value.loginEventTime)}} else {// 如果是成功,清空状态loginFailListState.clear()}}
}

5.2 CEP

CEP 的特点

  • 目标:从有序的简单事件流中发现一些高阶tezheng
  • 输入:一个或多个由简单事件构成的事件流
  • 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件
  • 输出:满足规则的复杂事件

Pom

        <!--  ======== Flink Libraries ========  --><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-cep --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>

Demo

package com.mso.loginfail_detectimport java.time.Duration
import java.utilimport org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Timeobject LoginFailWithCep {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)environment.setParallelism(1)val parameterTool: ParameterTool = ParameterTool.fromArgs(args)val inputStream: DataStream[LoginEvent] = environment.readTextFile(parameterTool.get("inputPath"))//    val resource: URL = getClass.getResource("/LoginLog.csv")//    val inputStream: DataStream[LoginEvent] = environment//      .readTextFile(resource.getPath).map((data: String) => {val strings: Array[String] = data.split(",")LoginEvent(strings(0).toLong, strings(1), strings(2), strings(3).toLong)}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner[LoginEvent] {override def extractTimestamp(element: LoginEvent, recordTimestamp: Long): Long = element.loginEventTime * 1000L}))// 定义匹配模式val loginFailPattern: Pattern[LoginEvent, LoginEvent] = Pattern.begin[LoginEvent]("firstFail").where((_: LoginEvent).eventType == "fail") // 第一次登陆失败.next("nextFail").where((_: LoginEvent).eventType == "fail") // 第二次登陆失败.within(Time.seconds(2)) // 在 2s 内检测// 在事件流上应用模式,得到一个 PatternStreamval patternStream: PatternStream[LoginEvent] = CEP.pattern(inputStream.keyBy((data: LoginEvent) => data.userId), loginFailPattern)// 从 PatternStream 上应用 select function,检出匹配事件序列val loginFailDataStream: DataStream[LoginFailWarning] = patternStream.select(new LoginFailMatchPSF())loginFailDataStream.print("loginFailDataStream")environment.execute("LoginFailWithCep")}
}class LoginFailMatchPSF extends PatternSelectFunction[LoginEvent, LoginFailWarning] {/*** 从 pattern 中按照名称取出对应的事件* Map(String, List[Event]) :* 其中 String 为 Pattern 中定义的 事件名称("firstFail", "nextFail")* List[Event] 为各个 key 匹配到的数据集。由于此处仅匹配一次,所以数据集中只有一条数据** @param pattern 在事件流上匹配搭配的数据集* @return*/override def select(pattern: util.Map[String, util.List[LoginEvent]]): LoginFailWarning = {val firstFail: LoginEvent = pattern.get("firstFail").iterator().next()val lastFail: LoginEvent = pattern.get("nextFail").get(0)LoginFailWarning(firstFail.userId, firstFail.loginEventTime, lastFail.loginEventTime, "login fail!")}
}

6. 订单支付实时监控

6.1 付款超时 - Cep Code

package com.mso.orderpay_detectimport java.net.URL
import java.time.Duration
import java.utilimport org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time// 定义输入订单事件的样例类
case class OrderEvent(orderId: Long, eventType: String, txId: String, orderEventTime: Long)// 定义输出结果样例类
case class OrderResult(orderId: Long, resultMsg: String)/*** 需求:* 检测订单支付状态,若创建订单后 15min 内支付则支付成功,否则支付失败*/
object OrderTimeout {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)environment.setParallelism(1)// 1. 读取订单数据val resource: URL = getClass.getResource("/OrderLog.csv")val inputStream: DataStream[OrderEvent] = environment.readTextFile(resource.getPath).map((data: String) => {val dataArray: Array[String] = data.split(",")OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner[OrderEvent] {override def extractTimestamp(element: OrderEvent, recordTimestamp: Long): Long = element.orderEventTime * 1000L}))// 2. 定义一个匹配模式val orderPayPattern: Pattern[OrderEvent, OrderEvent] = Pattern.begin[OrderEvent]("create").where((_: OrderEvent).eventType == "create").followedBy("pay").where((_: OrderEvent).eventType == "pay").within(Time.minutes(15))// 3. 把模式应用到 stream 上,得到一个 PatternStreamval patternStream: PatternStream[OrderEvent] = CEP.pattern(inputStream.keyBy((_: OrderEvent).orderId), orderPayPattern)val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeoutOutputTag")// 4. 调用 select 方法,提取事件序列,超时的事件要做报警提示val resultStream: DataStream[OrderResult] = patternStream.select(orderTimeoutOutputTag, new OrderTimeoutFunctionPTF, new OrderPayedSelectPSF)resultStream.print("PayedSuccessfully")resultStream.getSideOutput(orderTimeoutOutputTag).print("OrderTimeout")environment.execute("OrderTimeout")}
}class OrderTimeoutFunctionPTF extends PatternTimeoutFunction[OrderEvent, OrderResult] {override def timeout(pattern: util.Map[String, util.List[OrderEvent]], timeoutTimestamp: Long): OrderResult = {OrderResult(pattern.get("create").iterator().next().orderId, "OrderTimeout. Time : " + timeoutTimestamp)}
}class OrderPayedSelectPSF extends PatternSelectFunction[OrderEvent, OrderResult] {override def select(pattern: util.Map[String, util.List[OrderEvent]]): OrderResult = {OrderResult(pattern.get("pay").get(0).orderId, "PayedSuccessfully")}
}

6.2 付款超时 - Without Cep

package com.mso.orderpay_detectimport java.net.URL
import java.time.Durationimport org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector/*** 需求:* 检测订单支付状态,若创建订单后 15min 内支付则支付成功,否则支付失败*/
object OrderTimeoutWithoutCep {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)environment.setParallelism(1)// 1、读取订单数据val resource: URL = getClass.getResource("/OrderLog.csv")val inputStream: DataStream[OrderEvent] = environment.readTextFile(resource.getPath).map((data: String) => {val dataArray: Array[String] = data.split(",")OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner[OrderEvent] {override def extractTimestamp(element: OrderEvent, recordTimestamp: Long): Long = element.orderEventTime * 1000L}))val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeoutOutputTag")val resultStream: DataStream[OrderResult] = inputStream.keyBy((_: OrderEvent).orderId).process(new OrderPayMatchDetectKPF())resultStream.print("payedSuccessfully")resultStream.getSideOutput(orderTimeoutOutputTag).print("OrderTimeout")environment.execute("OrderTimeoutWithoutCep")}
}class OrderPayMatchDetectKPF extends KeyedProcessFunction[Long, OrderEvent, OrderResult] {lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("isPayedState", classOf[Boolean]))lazy val isCreatedState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("isCreatedState", classOf[Boolean]))lazy val timerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timerTsState", classOf[Long]))lazy val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeoutOutputTag")override def processElement(value: OrderEvent, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, out: Collector[OrderResult]): Unit = {var isPayed: Boolean = isPayedState.value()var isCreated: Boolean = isCreatedState.value()var timerTs: Long = timerTsState.value()// Case 1 : eventType == "create"if (value.eventType == "create") {// Case 1.1 pay came first.  out.collect And clear stateif (isPayed) {out.collect(OrderResult(value.orderId, "PayedSuccessfully."))isPayedState.clear()timerTsState.clear()ctx.timerService().deleteEventTimeTimer(timerTs)}// Case 1.2 pay didn't come. register 15min EventTimeTimer And update state, then waitelse {val ts: Long = value.orderEventTime * 1000L + 15 * 60 * 1000Lctx.timerService().registerEventTimeTimer(ts)timerTsState.update(ts)isCreatedState.update(true)}}// Case 2 : eventType == "pay"else if (value.eventType == "pay") {// Case 2.1 create came firstif (isCreated) {// Case 2.1.1 payed didn't timeoutif (value.orderEventTime * 1000L < timerTs) {out.collect(OrderResult(value.orderId, "PayedSuccessfully."))}// Case 2.1.2 payed timeoutelse {ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "OrderTimeout. Payed came but timeout 15min."))}isCreatedState.clear()timerTsState.clear()ctx.timerService().deleteEventTimeTimer(timerTs)}// Case 2.2 create didn't first. register current time EventTimeTimer And update state, then waitelse {val ts: Long = value.orderEventTime * 1000Lctx.timerService().registerEventTimeTimer(ts)timerTsState.update(ts)isPayedState.update(true)}}}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {if (isPayedState.value()) {ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "OrderTimeout. Already payed but not found create log."))} else {ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "OrderTimeout. Still unpaid after 15 minutes."))}isPayedState.clear()isCreatedState.clear()timerTsState.clear()}}

6.3 实时对账 - connect

Code

package com.mso.orderpay_detectimport java.net.URL
import java.time.Durationimport org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector// 定义接收流事件的样例类
case class ReceiptEvent(txId: String, payChannel: String, receiptEventTime: Long)/*** 需求:* 实时对账** 思路:* 使用 connect 连接两条流,自定义 CoProcessFunction 对 ConnectedStreams 中的数据分别处理。* 对 ConnectedStreams 中的两种流分别注册状态。* 若当前数据查询到另一条流的状态不为空时,那么匹配成功。若为空,那么自己更新状态并等待另一条流中的数据。* 若 watermark 达到定时器时间时,触发定时器,将未匹配的数据输出到不同的侧输出流*/
object OrderPayTxMatch {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)environment.setParallelism(1)// 读取订单事件流val orderLogResource: URL = getClass.getResource("/OrderLog.csv")val orderEventStream: KeyedStream[OrderEvent, String] = environment.readTextFile(orderLogResource.getPath).map((data: String) => {val dataArray: Array[String] = data.split(",")OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner[OrderEvent] {override def extractTimestamp(element: OrderEvent, recordTimestamp: Long): Long = element.orderEventTime * 1000L})).filter((_: OrderEvent).txId != "").keyBy((_: OrderEvent).txId)// 读取支付到账事件流val receiptLogResource: URL = getClass.getResource("/ReceiptLog.csv")val receiptEventStream: KeyedStream[ReceiptEvent, String] = environment.readTextFile(receiptLogResource.getPath).map((data: String) => {val dataArray: Array[String] = data.split(",")ReceiptEvent(dataArray(0).trim, dataArray(1).trim, dataArray(2).toLong)}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner[ReceiptEvent] {override def extractTimestamp(element: ReceiptEvent, recordTimestamp: Long): Long = element.receiptEventTime * 1000L})).keyBy((_: ReceiptEvent).txId)// 定义测输出流val unmatchedPays = new OutputTag[OrderEvent]("unmatchedPays")val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatchedReceipts")// 将两条流连接起来,共同处理val resultStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream.connect(receiptEventStream).process(new OrderPayTxMatchCPF())resultStream.print("matched")resultStream.getSideOutput(unmatchedPays).print("unmatchedPays")resultStream.getSideOutput(unmatchedReceipts).print("unmatchedReceipts")environment.execute("OrderPayTxMatch")}
}class OrderPayTxMatchCPF extends CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {// 定义状态来保存已经到达的订单支付事件和到账事件lazy val payState: ValueState[OrderEvent] = getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("payState", classOf[OrderEvent]))lazy val receiptState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("receiptState", classOf[ReceiptEvent]))// 定义测输出流val unmatchedPays = new OutputTag[OrderEvent]("unmatchedPays")val unmatchedReceipts = new OutputTag[ReceiptEvent]("unmatchedReceipts")// 订单支付事件数据的处理override def processElement1(value: OrderEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {// 判断有没有对应的到账事件val receipt: ReceiptEvent = receiptState.value()if (receipt != null) {// 如果已经有 receipt,在主流输出匹配信息out.collect((value, receipt))receiptState.clear()} else {// 如果还没到,那么把 pay 存入状态,并且注册一个定时器等待payState.update(value)ctx.timerService().registerEventTimeTimer(value.orderEventTime * 1000L + 5000L)}}// 到账事件的处理override def processElement2(value: ReceiptEvent, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {// 同样的处理流程val pay: OrderEvent = payState.value()if (pay != null) {out.collect((pay, value))payState.clear()} else {receiptState.update(value)ctx.timerService().registerEventTimeTimer(value.receiptEventTime * 1000L + 5000L)}}override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {// 到时间了,如果还没有收到某个事件,那么输出报警信息if (payState.value() != null) {// Case 1 : receipt 没来,输出 pay 到侧输出流ctx.output(unmatchedPays, payState.value())}if (receiptState.value() != null) {// Case 2 : pay 没来,输出 receipt 到侧输出流ctx.output(unmatchedReceipts, receiptState.value())}// Case 3 : 若都为空,那么说明匹配成功,且数据已经输出到 out.collect((value, receipt))// clear statepayState.clear()receiptState.clear()}
}

6.4 实时对账 - intervalJoin

Code

package com.mso.orderpay_detectimport java.net.URL
import java.time.Durationimport org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector/*** 需求:* 实时对账** 思路:* 使用 intervalJoin 进行统计。局限性是仅能输出匹配到的数据。*/object OrderPayTxMatchByJoin {def main(args: Array[String]): Unit = {val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)environment.setParallelism(1)// 读取订单事件流val orderLogResource: URL = getClass.getResource("/OrderLog.csv")val orderEventStream: KeyedStream[OrderEvent, String] = environment.readTextFile(orderLogResource.getPath).map((data: String) => {val dataArray: Array[String] = data.split(",")OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong)}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner[OrderEvent] {override def extractTimestamp(element: OrderEvent, recordTimestamp: Long): Long = element.orderEventTime * 1000L})).filter((_: OrderEvent).txId != "").keyBy((_: OrderEvent).txId)// 读取支付到账事件流val receiptLogResource: URL = getClass.getResource("/ReceiptLog.csv")val receiptEventStream: KeyedStream[ReceiptEvent, String] = environment.readTextFile(receiptLogResource.getPath).map((data: String) => {val dataArray: Array[String] = data.split(",")ReceiptEvent(dataArray(0).trim, dataArray(1).trim, dataArray(2).toLong)}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner[ReceiptEvent] {override def extractTimestamp(element: ReceiptEvent, recordTimestamp: Long): Long = element.receiptEventTime * 1000L})).keyBy((_: ReceiptEvent).txId)// 将两条流连接起来,共同处理val resultStream: DataStream[(OrderEvent, ReceiptEvent)] = orderEventStream.intervalJoin(receiptEventStream).between(Time.seconds(-5), Time.seconds(5)).process(new OrderPayTxMatchByJoinPJF())resultStream.print("matched")environment.execute("OrderPayTxMatchByJoin")}
}class OrderPayTxMatchByJoinPJF extends ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)] {override def processElement(left: OrderEvent, right: ReceiptEvent, ctx: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {out.collect(left, right)}
}

7. 电商常见指标汇总

附.Q&A

  1. 线上常用的提交 Flink 任务的方式
每次提交都会创建一个新的 Flink 集群,为每个 job 提供一个 yarn-session,任务之间互相独立,互不影响,方便管理。
任务执行完成之后创建的集群也会消失。(作业模式)# shell demo
$ bin/yarn-session.sh -n 7 -s 8 -jm 3072 -tm 32768 -qu root.*.*-nm *-* -d其中申请 7 个 taskManager,每个 8 核,每个 taskmanager 有 32768M 内存。集群默认只有一个 Job Manager。但为了防止单点故障,需要配置了高可用。结合 ZooKeeper 的使用,来达到高可用。
  1. 怎么做压力测试和监控
一,产生数据流的速度如果过快,而下游的算子消费不过来的话,会产生背压问题。背压的监控可以使用 Flink Web UI(localhost:8081) 来可视化监控,一旦报警就能知道。一般情况下背压问题的产生可能是由于 sink 这个操作符没有优化好,做一下优化就可以了。比如如果是写入ElasticSearch,那么可以改成批量写入,可以调大 ElasticSearch 队列的大小等等策略。二,设置水位线的最大延迟时间这个参数,如果设置的过大,可能会造成内存的压力。可以设置的最大延迟时间小一些,然后把迟到元素发送到侧输出流中去。晚一点更新结果。或者使用类似于 RocksDB 这样的状态后端,RocksDB 会开辟堆外存储空间,但 IO 速度会变慢,需要权衡。三,还有就是滑动窗口的长度如果过长,而滑动距离很短的话,Flink的性能会下降的很厉害。
参见链接: https://www.infoq.cn/article/sIhs_qY6HCpMQNblTI9M四,状态后端使用 RocksDB,还没有碰到被撑爆的问题。五,尽量使用滚动窗口,这样会大大减轻存储的压力。六,如果想要达到极限的低延迟,可以考虑使用处理时间(Processing Time)。七,业务逻辑的编写是最重要的。在算子中(例如 processElement, onTimer )业务逻辑一定要尽可能的简单,而不是特别复杂的业务逻辑(举个极端的例子,机器学习这种 CPU 密集型的程序,十几张表的 Join)。如果业务逻辑很复杂的话,将会阻塞数据流的向下传递。性能会急剧下降。
  1. 为什么使用 Flink 替代 Spark
一,Flink是真正的流处理,延迟在毫秒级,Spark Streaming是微批,延迟在秒级。二,Flink可以处理事件时间,而Spark Streaming只能处理机器时间,无法保证时间语义的正确性。三,Flink的检查点算法比Spark Streaming更加灵活,性能更高。Spark Streaming的检查点算法是在每个stage结束以后,才会保存检查点。四,Flink易于实现端到端一致性。
  1. Flink 的 CheckPoint 存在哪里
状态后端。内存,文件系统,或者 RocksDB。
  1. 如果下级存储不支持事务,Flink 怎么保证 exactly-once
幂等性写入(Redis, ElasticSearch)
  1. 说一下 Flink 状态机制
流处理程序内部的一致性,端到端一致性。
  1. 怎么去重。考虑一个实时场景:双十一场景,滑动窗口长度为 1 小时,滑动距离为 10 秒钟,亿级用户,计算 UV
使用类似于 scala 的 set 数据结构或者 redis 的 set 数据结构显然是不行的,因为可能有上亿个 Key,内存放不下。
所以可以考虑使用布隆过滤器(Bloom Filter)来去重。
大数据一旦需要去重,就是需要考虑布隆过滤器、位图。
  1. Flink 的 checkpoint 机制对比 spark 有什么不同和优势
park streaming 的 Checkpoint 仅仅是针对 driver 的故障恢复做了数据和元数据的 Checkpoint。
而 flink 的 checkpoint 机制要复杂了很多,它采用的是轻量级的分布式快照,实现了每个操作符的快照,及循环流的在循环的数据的快照。参见教材内容和链接:
https://cloud.tencent.com/developer/article/1189624
  1. Flink 的 Watermark 详细说明
  1. Flink ExactlyOnce 语义是如何实现的,状态是如何存储的
一,Flink应用程序内部的exactly-once二,端到端一致性
  1. Flink CEP 编程中当状态没有到达的时候会将数据保存在哪里
CEP 当然在流式处理中是要支持 EventTime 的,那么相对应的要支持数据的晚到现象,也就是 watermark 的处理逻辑。
在 Flink 的处理逻辑中,将晚到数据明细存储在了 Map<Long, List> 的结构中,也就是说,如果 watermark 设置为当前时间减去 5 分钟,那么内存中就会存储5分钟的数据,
这也是对内存的极大损伤之一。
  1. Flink 三种时间概念分别说出应用场景
Event Time
Processing Time:没有事件时间的情况下,或者对实时性要求超高的情况下。
Ingestion Time:存在多个Source Operator的情况下,每个Source Operator会使用自己本地系统时钟指派Ingestion Time。后续基于时间相关的各种操作,都会使用数据记录中的Ingestion Time。
  1. Flink 程序在面对数据高峰期时如何处理
1,使用大容量的Kafka把数据先放到消息队列里面。再使用Flink进行消费,不过这样会影响到一点实时性。2,使用滚动窗口3,使用处理时间4,下游使用消费速度快的外围设备(如Kafka)
  1. flink 消费 kakfa 保证数据不丢失(flink消费kafka数据不丢不重,flink消费kafka的时候挂了怎么恢复数据)
端到端一致性(exactly-once),flink 会维护消费 kafka 的偏移量,checkpoint 操作。
  1. Flink 过来的数据量太大怎么处理
加机器,考虑使用处理时间(ProcessingTime),前面使用Kafka来做蓄水池,降低消费数据的速度。
尽量使用滚动窗口,窗口没有重合,数据不会复制到不同的窗口中去。
  1. Flink的资源是如何设置的,设置资源的时候依据是什么
提前 mock 比较大的数据量,做一下压力测试,然后决定使用多少资源。以下 6 个方面是确定 Flink 集群大小时最先要考虑的一些因素:1. 记录数和每条记录的大小
确定集群大小的首要事情就是估算预期进入流计算系统的每秒记录数(也就是我们常说的吞吐量),以及每条记录的大小。不同的记录类型会有不同的大小,这将最终影响 Flink 应用程序平稳运行所需的资源。2. 不同 key 的数量和每个 key 存储的 state 大小
应用程序中不同 key 的数量和每个 key 所需要存储的 state 大小,都将影响到 Flink 应用程序所需的资源,从而能高效地运行,避免任何反压。3. 状态的更新频率和状态后端的访问模式
第三个考虑因素是状态的更新频率,因为状态的更新通常是一个高消耗的动作。而不同的状态后端(如 RocksDB,Java Heap)的访问模式差异很大,RocksDB 的每次读取和更新都会涉及序列化和反序列化以及 JNI 操作,而 Java Heap 的状态后端不支持增量 checkpoint,导致大状态场景需要每次持久化的数据量较大。这些因素都会显著地影响集群的大小和 Flink 作业所需的资源。4. 网络容量
网络容量不仅仅会受到 Flink 应用程序本身的影响,也会受到可能正在交互的 Kafka、HDFS 等外部服务的影响。这些外部服务可能会导致额外的网络流量。例如,启用 replication 可能会在网络的消息 broker 之间产生额外的流量。5. 磁盘带宽
如果你的应用程序依赖了基于磁盘的状态后端,如 RocksDB,或者考虑使用 Kafka 或 HDFS,那么磁盘的带宽也需要纳入考虑。6. 机器数量及其可用 CPU 和内存
最后但并非最不重要的,在开始应用部署前,你需要考虑集群中可用机器的数量及其可用的 CPU 和内存。这最终确保了在将应用程序投入生产之后,集群有充足的处理能力。
  1. flink 用jdbc并发的写 mysql 的性能很差,怎么处理
一般不直接写入 mysql,一般先写入消息队列(redis,kafka,rabbitmq,...),用消息队列来保护 mysql。
查看全文
如若内容造成侵权/违法违规/事实不符,请联系编程学习网邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!

相关文章

  1. 计算机原理--进制运算的基础

    进制运算的基础进制概述二进制运算的基础 进制概述进制的定义进位制是一种计数方式,亦称进位计数法或位值计数法 有限种数字符号来表示无限的数值 使用的数字符号的数目称为这种进位制的基数或底数常见的进制二十进制玛雅文明的玛雅数学 因努伊特的因努伊特数字六十进制时间、…...

    2024/4/23 7:14:41
  2. Python基础之一:Python3 简介

    Python基础之一:Python3 简介 一、简介 1.定义 Python是一种跨平台的计算机程序设计语言。 是一个高层次的结合了解释性、编译性、互动性和面向对象的脚本语言。最初被设计用于编写自动化脚本(shell),随着版本的不断更新和语言新功能的添加,越多被用于独立的、大型项目的开发…...

    2024/4/11 21:08:31
  3. 【redis】redis消息队列与订阅模式

    个人学习笔记分享,当前能力有限,请勿贬低,菜鸟互学,大佬绕道 如有勘误,欢迎指出和讨论,本文后期也会进行修正和补充前言 设想一个场景: 你和你女朋友(假设有)打算出门,你问你女朋友打扮好了没,她说还没。 于是过了五分钟,你再去问,她说还没。 再过五分钟,你又问,…...

    2024/4/11 21:08:29
  4. LostDungeon迷失地牢-壹

    背景 作为一个地牢RPG游戏狂热爱好者,我闲暇时间总是会搜罗一些地牢之类的游戏。 但是! 这个题材里,优秀的作品太少了。我接触过良心作品的有《贪婪洞窟》,《不思议地下城》,还有一个用Java写的开源地牢游戏《像素地牢》(这个游戏最吸引人的地方就是丰富的mod生态,只要你…...

    2024/4/24 22:34:12
  5. 【转】什么是公网安备

    网站备案是根据国家法律法规需要网站的所有者向国家有关部门申请的备案,公安局备案是其中一种。公安局备案一般按照各地公安机关指定的地点和方式进行。网站备案的目的就是为了防止在网上从事非法的网站经营活动,打击不良互联网信息的传播,如果网站不备案的话,很有可能被查…...

    2024/4/15 14:59:34
  6. 第八课

    函数一 1. 函数简介 • 函数也是一个对象 • 函数用来保存一些可执行的代码,并且可以在需要时,对这些语句进行多次调用1、语法 2、 def 函数名([形参1,形参2,形参3…]): 3、   代码块注意: 函数名必须符合标识符的规范(可以包含字母、数字、下划线但是不能以数字开头…...

    2024/4/17 19:00:11
  7. Jenkins勾建新项目模块

    1、在对应linux服务器上创建项目模块文件夹:mkdir -p 文件夹名2、Jenkins操作页面构建新的服务模块流程:输入服务模块名称也可以根据已有配置修改创建修改服务模块配置:#!/bin/bash #copy file and restart tomcatsshpass -p zjrc@161 scp -P 22 /datafile/jenkins/.jenki…...

    2024/4/27 15:01:05
  8. 温州银行科技岗笔试题

    第一场笔试是那种通用金融题、英语题、还有找规律的题、以及考你对温州银行的了解程度第二场科技岗的笔试题有些题不记得具体的内容了,就考点有点印象。1.给了3个SQL语句,问哪个最慢2.分布式事务处理的特性 事务四大特性(简称ACID) ①原子性(Atomicity):事务中的全部操作在数…...

    2024/4/15 7:58:53
  9. Oracle查询表创建时间

    SELECT * FROM USER_TABLES 查看当前用户下的表; SELECT * FROM DBA_TABLES 查看数据库中所有的表;SELECT CREATED,LAST_DDL_TIME FROM USER_OBJECTSWHERE OBJECT_NAME=(表名);CREATED 为创建时间 ,LAST_DDL_TIME为最后修改时间...

    2024/4/21 2:33:18
  10. Kettle构建Hadoop ETL实践(二):安装与配置

    目录一、安装1. 安装环境(1)选择操作系统(2)安装规划2. 安装前准备(1)安装Java环境(2)安装GNOME Desktop图形界面(3)安装配置VNC远程控制(4)在客户端安装vncviewer3. 安装运行Kettle(1)下载和解压(2)运行Kettle程序(3)创建Spoon快捷启动方式二、配置1. 配置文…...

    2024/4/26 8:11:47
  11. 从《三体》到“中美科技战”,3分钟理解“网络”D丝为什么要迎娶“算力”白富美

    从三体到中美科技战,理解网络与算力深度融合助力高维度竞争1:对抗封锁,需要云天明、面壁人和执剑人早在奥巴马时代,白宫就对《三体》表现出了极大的兴趣:“奥巴马读了《三体1》和《三体2》,然后就痴迷于刘慈欣的作品,一心等着《三体3》的出版。《三体3》的中文版出了之后…...

    2024/4/20 20:08:07
  12. 多维数据的存储和写入(基于numpy库)

    多维数据的存储和写入(基于numpy库) 写入CSV文件(多维数据) a.tofile(frame,sep=’’,format=’%s’) frame:文件、字符串 sep:数据分隔字符串,如果是空串,写入文件是二进制。 format:写入文件的格式。 读出CSV文件(多维数据) np.fromfile(frame,dtype=float,count=-…...

    2024/4/26 21:10:04
  13. 数据结构 树的遍历算法(递归与非递归的实现)

    采用Swift 5 递归的本质就是采用的内存栈 非递归方式,就采用栈的方式(后序的非递归较为复杂,需要记录上次输出节点与当前遍历到的节点进行判断,如果是左子树回退,去遍历当前的节点的右子树,如果是右子树回退栈就直接输出节点) 节点定义 class TreeNode: Equatable {var …...

    2024/4/10 11:28:32
  14. 游戏行业被 DDoS “围攻”,微软带它来“解救”

    2020年7月31日,游戏圈一年一度的盛会——第十八届 ChinaJoy 在上海开幕。纵观今年的展会,各大参展厂商可谓是使出了浑身解数,除了展示各类精彩游戏大作,还分享了大量数字娱乐产业最前沿的科技成果和未来趋势,例如 VR 游戏、游戏中的 AI 技术,以及此次的重中之重:云游戏!…...

    2024/4/10 11:28:31
  15. 动吧旅游 生态系统项目 part 1

    产品功能的实现: 1.首先实现软件的功能; 2.学会控制; 3.运维(项目运行时日志的分析,项目的布署,项目的拓展) 1. 项目简介 1.1概述 动吧旅游生态系统,应市场高端用户需求,公司决定开发这样的一套旅游系统,此系统包含旅游电商系统(广告子系统,推荐子系统,评价子系统,…...

    2024/4/11 21:08:25
  16. 【Linux】crontab定时任务配置全过程

    因为测试工作中需要在服务器上配置定时任务执行脚本,使用到了linux 的crontab。特此记录一下配置的整个流程。 crontab命令用于设置周期性被执行的指令。该命令从标准输入设备读取指令,并将其存放于“crontab”文件中,以供之后读取和执行。1.检查是否安装了crontab,如果提示…...

    2024/4/27 22:09:04
  17. 小程序配置模板消息

    1.登录微信公众号平台,需要有权限的工作人员进行扫码登录。 2.登录进去点击订阅消息。3.点击添加按钮,可以在搜索框进行搜索想要的模板类型,然后点击选用即可。4.选择提醒模板消息的参数,需要跟客户沟通需要什么参数就配什么参数,场景说明随意写明内容。5.选完模板消息之后…...

    2024/4/26 3:01:24
  18. 优质视频剪辑软件推荐,有你需要的那款吗?

    哪些视频剪辑软件好用?今天macw小编为大家总结了几款好用的视频剪辑软件推荐给大家下载,绝对给力,让视频剪辑更简单! 1、DaVinci Resolve Studio 16 for Mac(达芬奇调色视频剪辑软件) DaVinci Resolve 为迄今最先进的调色工具和专业多轨道剪辑功能合而为一,如今您只需要一个…...

    2024/4/11 21:08:22
  19. 深入了解服务网格数据平面性能和调优

    在腾讯,已经有很多产品已使用或者正在尝试使用istio来作为其微服务治理的基础平台。不过在使用istio时,也有一些对通信性能要求较高的业务会对istio的性能有一些担忧。由于envoy sidecar的引入,使两个微服务之间的通信路径变长,导致服务延时受到了一些影响,istio社区一直以…...

    2024/4/15 4:36:29
  20. springboot日志切面通用类

    文章目录1.pom.xml文件导入AOP依赖2.pom.xml导入Lombok依赖3.关于Lombok的注意点 1.pom.xml文件导入AOP依赖 <!--aop依赖--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId>&…...

    2024/4/24 1:39:09

最新文章

  1. C++下跨平台websocket库及使用示例

    websocketpp库使用非常方便&#xff0c;开源&#xff0c;可跨平台&#xff0c;内部实现全部是head文件&#xff0c;无cpp&#xff0c;接口简单易用。源码路径比如 https://github.com/zaphoyd/websocketpp.git 使用方法是&#xff0c;先下载源码包并复制道工程的include目录下…...

    2024/4/28 6:57:50
  2. 梯度消失和梯度爆炸的一些处理方法

    在这里是记录一下梯度消失或梯度爆炸的一些处理技巧。全当学习总结了如有错误还请留言&#xff0c;在此感激不尽。 权重和梯度的更新公式如下&#xff1a; w w − η ⋅ ∇ w w w - \eta \cdot \nabla w ww−η⋅∇w 个人通俗的理解梯度消失就是网络模型在反向求导的时候出…...

    2024/3/20 10:50:27
  3. Redis Stack十部曲之三:理解Redis Stack中的数据类型

    文章目录 前言String字符串作为计数器限制 List限制列表阻塞列表自动创建和删除聚合类型键限制 Set限制 Hash限制 Sorted Set范围操作字典操作更新分数 JSON路径限制 BitMapBitfieldProbabilisticHyperLogLogBloom filterCuckoo filtert-digestTop-KCount-min sketchConfigurat…...

    2024/4/19 15:45:16
  4. 三防笔记本丨工业笔记本电脑丨车辆检修的应用以及优势

    伴随着汽车技术的不断更新迭代以及车辆复杂性的增加&#xff0c;现代车辆检修工作需要更高效、更精确的方法来确保车辆的安全和性能。在这过程中&#xff0c;工业笔记本电脑作为一种强大的工具&#xff0c;为车辆检修提供了诊断、记录、分析和解决问题的核心功能 故障诊断与维修…...

    2024/4/27 2:08:45
  5. 416. 分割等和子集问题(动态规划)

    题目 题解 class Solution:def canPartition(self, nums: List[int]) -> bool:# badcaseif not nums:return True# 不能被2整除if sum(nums) % 2 ! 0:return False# 状态定义&#xff1a;dp[i][j]表示当背包容量为j&#xff0c;用前i个物品是否正好可以将背包填满&#xff…...

    2024/4/28 4:04:40
  6. 【Java】ExcelWriter自适应宽度工具类(支持中文)

    工具类 import org.apache.poi.ss.usermodel.Cell; import org.apache.poi.ss.usermodel.CellType; import org.apache.poi.ss.usermodel.Row; import org.apache.poi.ss.usermodel.Sheet;/*** Excel工具类** author xiaoming* date 2023/11/17 10:40*/ public class ExcelUti…...

    2024/4/27 3:39:11
  7. Spring cloud负载均衡@LoadBalanced LoadBalancerClient

    LoadBalance vs Ribbon 由于Spring cloud2020之后移除了Ribbon&#xff0c;直接使用Spring Cloud LoadBalancer作为客户端负载均衡组件&#xff0c;我们讨论Spring负载均衡以Spring Cloud2020之后版本为主&#xff0c;学习Spring Cloud LoadBalance&#xff0c;暂不讨论Ribbon…...

    2024/4/27 12:24:35
  8. TSINGSEE青犀AI智能分析+视频监控工业园区周界安全防范方案

    一、背景需求分析 在工业产业园、化工园或生产制造园区中&#xff0c;周界防范意义重大&#xff0c;对园区的安全起到重要的作用。常规的安防方式是采用人员巡查&#xff0c;人力投入成本大而且效率低。周界一旦被破坏或入侵&#xff0c;会影响园区人员和资产安全&#xff0c;…...

    2024/4/27 12:24:46
  9. VB.net WebBrowser网页元素抓取分析方法

    在用WebBrowser编程实现网页操作自动化时&#xff0c;常要分析网页Html&#xff0c;例如网页在加载数据时&#xff0c;常会显示“系统处理中&#xff0c;请稍候..”&#xff0c;我们需要在数据加载完成后才能继续下一步操作&#xff0c;如何抓取这个信息的网页html元素变化&…...

    2024/4/27 3:39:08
  10. 【Objective-C】Objective-C汇总

    方法定义 参考&#xff1a;https://www.yiibai.com/objective_c/objective_c_functions.html Objective-C编程语言中方法定义的一般形式如下 - (return_type) method_name:( argumentType1 )argumentName1 joiningArgument2:( argumentType2 )argumentName2 ... joiningArgu…...

    2024/4/27 3:39:07
  11. 【洛谷算法题】P5713-洛谷团队系统【入门2分支结构】

    &#x1f468;‍&#x1f4bb;博客主页&#xff1a;花无缺 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 花无缺 原创 收录于专栏 【洛谷算法题】 文章目录 【洛谷算法题】P5713-洛谷团队系统【入门2分支结构】&#x1f30f;题目描述&#x1f30f;输入格…...

    2024/4/27 3:39:07
  12. 【ES6.0】- 扩展运算符(...)

    【ES6.0】- 扩展运算符... 文章目录 【ES6.0】- 扩展运算符...一、概述二、拷贝数组对象三、合并操作四、参数传递五、数组去重六、字符串转字符数组七、NodeList转数组八、解构变量九、打印日志十、总结 一、概述 **扩展运算符(...)**允许一个表达式在期望多个参数&#xff0…...

    2024/4/27 12:44:49
  13. 摩根看好的前智能硬件头部品牌双11交易数据极度异常!——是模式创新还是饮鸩止渴?

    文 | 螳螂观察 作者 | 李燃 双11狂欢已落下帷幕&#xff0c;各大品牌纷纷晒出优异的成绩单&#xff0c;摩根士丹利投资的智能硬件头部品牌凯迪仕也不例外。然而有爆料称&#xff0c;在自媒体平台发布霸榜各大榜单喜讯的凯迪仕智能锁&#xff0c;多个平台数据都表现出极度异常…...

    2024/4/27 21:08:20
  14. Go语言常用命令详解(二)

    文章目录 前言常用命令go bug示例参数说明 go doc示例参数说明 go env示例 go fix示例 go fmt示例 go generate示例 总结写在最后 前言 接着上一篇继续介绍Go语言的常用命令 常用命令 以下是一些常用的Go命令&#xff0c;这些命令可以帮助您在Go开发中进行编译、测试、运行和…...

    2024/4/26 22:35:59
  15. 用欧拉路径判断图同构推出reverse合法性:1116T4

    http://cplusoj.com/d/senior/p/SS231116D 假设我们要把 a a a 变成 b b b&#xff0c;我们在 a i a_i ai​ 和 a i 1 a_{i1} ai1​ 之间连边&#xff0c; b b b 同理&#xff0c;则 a a a 能变成 b b b 的充要条件是两图 A , B A,B A,B 同构。 必要性显然&#xff0…...

    2024/4/27 18:40:35
  16. 【NGINX--1】基础知识

    1、在 Debian/Ubuntu 上安装 NGINX 在 Debian 或 Ubuntu 机器上安装 NGINX 开源版。 更新已配置源的软件包信息&#xff0c;并安装一些有助于配置官方 NGINX 软件包仓库的软件包&#xff1a; apt-get update apt install -y curl gnupg2 ca-certificates lsb-release debian-…...

    2024/4/28 4:14:21
  17. Hive默认分割符、存储格式与数据压缩

    目录 1、Hive默认分割符2、Hive存储格式3、Hive数据压缩 1、Hive默认分割符 Hive创建表时指定的行受限&#xff08;ROW FORMAT&#xff09;配置标准HQL为&#xff1a; ... ROW FORMAT DELIMITED FIELDS TERMINATED BY \u0001 COLLECTION ITEMS TERMINATED BY , MAP KEYS TERMI…...

    2024/4/27 13:52:15
  18. 【论文阅读】MAG:一种用于航天器遥测数据中有效异常检测的新方法

    文章目录 摘要1 引言2 问题描述3 拟议框架4 所提出方法的细节A.数据预处理B.变量相关分析C.MAG模型D.异常分数 5 实验A.数据集和性能指标B.实验设置与平台C.结果和比较 6 结论 摘要 异常检测是保证航天器稳定性的关键。在航天器运行过程中&#xff0c;传感器和控制器产生大量周…...

    2024/4/27 13:38:13
  19. --max-old-space-size=8192报错

    vue项目运行时&#xff0c;如果经常运行慢&#xff0c;崩溃停止服务&#xff0c;报如下错误 FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory 因为在 Node 中&#xff0c;通过JavaScript使用内存时只能使用部分内存&#xff08;64位系统&…...

    2024/4/27 1:03:20
  20. 基于深度学习的恶意软件检测

    恶意软件是指恶意软件犯罪者用来感染个人计算机或整个组织的网络的软件。 它利用目标系统漏洞&#xff0c;例如可以被劫持的合法软件&#xff08;例如浏览器或 Web 应用程序插件&#xff09;中的错误。 恶意软件渗透可能会造成灾难性的后果&#xff0c;包括数据被盗、勒索或网…...

    2024/4/27 3:22:12
  21. JS原型对象prototype

    让我简单的为大家介绍一下原型对象prototype吧&#xff01; 使用原型实现方法共享 1.构造函数通过原型分配的函数是所有对象所 共享的。 2.JavaScript 规定&#xff0c;每一个构造函数都有一个 prototype 属性&#xff0c;指向另一个对象&#xff0c;所以我们也称为原型对象…...

    2024/4/27 22:51:49
  22. C++中只能有一个实例的单例类

    C中只能有一个实例的单例类 前面讨论的 President 类很不错&#xff0c;但存在一个缺陷&#xff1a;无法禁止通过实例化多个对象来创建多名总统&#xff1a; President One, Two, Three; 由于复制构造函数是私有的&#xff0c;其中每个对象都是不可复制的&#xff0c;但您的目…...

    2024/4/27 3:39:00
  23. python django 小程序图书借阅源码

    开发工具&#xff1a; PyCharm&#xff0c;mysql5.7&#xff0c;微信开发者工具 技术说明&#xff1a; python django html 小程序 功能介绍&#xff1a; 用户端&#xff1a; 登录注册&#xff08;含授权登录&#xff09; 首页显示搜索图书&#xff0c;轮播图&#xff0…...

    2024/4/26 23:53:24
  24. 电子学会C/C++编程等级考试2022年03月(一级)真题解析

    C/C++等级考试(1~8级)全部真题・点这里 第1题:双精度浮点数的输入输出 输入一个双精度浮点数,保留8位小数,输出这个浮点数。 时间限制:1000 内存限制:65536输入 只有一行,一个双精度浮点数。输出 一行,保留8位小数的浮点数。样例输入 3.1415926535798932样例输出 3.1…...

    2024/4/27 20:28:35
  25. 配置失败还原请勿关闭计算机,电脑开机屏幕上面显示,配置失败还原更改 请勿关闭计算机 开不了机 这个问题怎么办...

    解析如下&#xff1a;1、长按电脑电源键直至关机&#xff0c;然后再按一次电源健重启电脑&#xff0c;按F8健进入安全模式2、安全模式下进入Windows系统桌面后&#xff0c;按住“winR”打开运行窗口&#xff0c;输入“services.msc”打开服务设置3、在服务界面&#xff0c;选中…...

    2022/11/19 21:17:18
  26. 错误使用 reshape要执行 RESHAPE,请勿更改元素数目。

    %读入6幅图像&#xff08;每一幅图像的大小是564*564&#xff09; f1 imread(WashingtonDC_Band1_564.tif); subplot(3,2,1),imshow(f1); f2 imread(WashingtonDC_Band2_564.tif); subplot(3,2,2),imshow(f2); f3 imread(WashingtonDC_Band3_564.tif); subplot(3,2,3),imsho…...

    2022/11/19 21:17:16
  27. 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机...

    win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”问题的解决方法在win7系统关机时如果有升级系统的或者其他需要会直接进入一个 等待界面&#xff0c;在等待界面中我们需要等待操作结束才能关机&#xff0c;虽然这比较麻烦&#xff0c;但是对系统进行配置和升级…...

    2022/11/19 21:17:15
  28. 台式电脑显示配置100%请勿关闭计算机,“准备配置windows 请勿关闭计算机”的解决方法...

    有不少用户在重装Win7系统或更新系统后会遇到“准备配置windows&#xff0c;请勿关闭计算机”的提示&#xff0c;要过很久才能进入系统&#xff0c;有的用户甚至几个小时也无法进入&#xff0c;下面就教大家这个问题的解决方法。第一种方法&#xff1a;我们首先在左下角的“开始…...

    2022/11/19 21:17:14
  29. win7 正在配置 请勿关闭计算机,怎么办Win7开机显示正在配置Windows Update请勿关机...

    置信有很多用户都跟小编一样遇到过这样的问题&#xff0c;电脑时发现开机屏幕显现“正在配置Windows Update&#xff0c;请勿关机”(如下图所示)&#xff0c;而且还需求等大约5分钟才干进入系统。这是怎样回事呢&#xff1f;一切都是正常操作的&#xff0c;为什么开时机呈现“正…...

    2022/11/19 21:17:13
  30. 准备配置windows 请勿关闭计算机 蓝屏,Win7开机总是出现提示“配置Windows请勿关机”...

    Win7系统开机启动时总是出现“配置Windows请勿关机”的提示&#xff0c;没过几秒后电脑自动重启&#xff0c;每次开机都这样无法进入系统&#xff0c;此时碰到这种现象的用户就可以使用以下5种方法解决问题。方法一&#xff1a;开机按下F8&#xff0c;在出现的Windows高级启动选…...

    2022/11/19 21:17:12
  31. 准备windows请勿关闭计算机要多久,windows10系统提示正在准备windows请勿关闭计算机怎么办...

    有不少windows10系统用户反映说碰到这样一个情况&#xff0c;就是电脑提示正在准备windows请勿关闭计算机&#xff0c;碰到这样的问题该怎么解决呢&#xff0c;现在小编就给大家分享一下windows10系统提示正在准备windows请勿关闭计算机的具体第一种方法&#xff1a;1、2、依次…...

    2022/11/19 21:17:11
  32. 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”的解决方法...

    今天和大家分享一下win7系统重装了Win7旗舰版系统后&#xff0c;每次关机的时候桌面上都会显示一个“配置Windows Update的界面&#xff0c;提示请勿关闭计算机”&#xff0c;每次停留好几分钟才能正常关机&#xff0c;导致什么情况引起的呢&#xff1f;出现配置Windows Update…...

    2022/11/19 21:17:10
  33. 电脑桌面一直是清理请关闭计算机,windows7一直卡在清理 请勿关闭计算机-win7清理请勿关机,win7配置更新35%不动...

    只能是等着&#xff0c;别无他法。说是卡着如果你看硬盘灯应该在读写。如果从 Win 10 无法正常回滚&#xff0c;只能是考虑备份数据后重装系统了。解决来方案一&#xff1a;管理员运行cmd&#xff1a;net stop WuAuServcd %windir%ren SoftwareDistribution SDoldnet start WuA…...

    2022/11/19 21:17:09
  34. 计算机配置更新不起,电脑提示“配置Windows Update请勿关闭计算机”怎么办?

    原标题&#xff1a;电脑提示“配置Windows Update请勿关闭计算机”怎么办&#xff1f;win7系统中在开机与关闭的时候总是显示“配置windows update请勿关闭计算机”相信有不少朋友都曾遇到过一次两次还能忍但经常遇到就叫人感到心烦了遇到这种问题怎么办呢&#xff1f;一般的方…...

    2022/11/19 21:17:08
  35. 计算机正在配置无法关机,关机提示 windows7 正在配置windows 请勿关闭计算机 ,然后等了一晚上也没有关掉。现在电脑无法正常关机...

    关机提示 windows7 正在配置windows 请勿关闭计算机 &#xff0c;然后等了一晚上也没有关掉。现在电脑无法正常关机以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容&#xff0c;让我们赶快一起来看一下吧&#xff01;关机提示 windows7 正在配…...

    2022/11/19 21:17:05
  36. 钉钉提示请勿通过开发者调试模式_钉钉请勿通过开发者调试模式是真的吗好不好用...

    钉钉请勿通过开发者调试模式是真的吗好不好用 更新时间:2020-04-20 22:24:19 浏览次数:729次 区域: 南阳 > 卧龙 列举网提醒您:为保障您的权益,请不要提前支付任何费用! 虚拟位置外设器!!轨迹模拟&虚拟位置外设神器 专业用于:钉钉,外勤365,红圈通,企业微信和…...

    2022/11/19 21:17:05
  37. 配置失败还原请勿关闭计算机怎么办,win7系统出现“配置windows update失败 还原更改 请勿关闭计算机”,长时间没反应,无法进入系统的解决方案...

    前几天班里有位学生电脑(windows 7系统)出问题了&#xff0c;具体表现是开机时一直停留在“配置windows update失败 还原更改 请勿关闭计算机”这个界面&#xff0c;长时间没反应&#xff0c;无法进入系统。这个问题原来帮其他同学也解决过&#xff0c;网上搜了不少资料&#x…...

    2022/11/19 21:17:04
  38. 一个电脑无法关闭计算机你应该怎么办,电脑显示“清理请勿关闭计算机”怎么办?...

    本文为你提供了3个有效解决电脑显示“清理请勿关闭计算机”问题的方法&#xff0c;并在最后教给你1种保护系统安全的好方法&#xff0c;一起来看看&#xff01;电脑出现“清理请勿关闭计算机”在Windows 7(SP1)和Windows Server 2008 R2 SP1中&#xff0c;添加了1个新功能在“磁…...

    2022/11/19 21:17:03
  39. 请勿关闭计算机还原更改要多久,电脑显示:配置windows更新失败,正在还原更改,请勿关闭计算机怎么办...

    许多用户在长期不使用电脑的时候&#xff0c;开启电脑发现电脑显示&#xff1a;配置windows更新失败&#xff0c;正在还原更改&#xff0c;请勿关闭计算机。。.这要怎么办呢&#xff1f;下面小编就带着大家一起看看吧&#xff01;如果能够正常进入系统&#xff0c;建议您暂时移…...

    2022/11/19 21:17:02
  40. 还原更改请勿关闭计算机 要多久,配置windows update失败 还原更改 请勿关闭计算机,电脑开机后一直显示以...

    配置windows update失败 还原更改 请勿关闭计算机&#xff0c;电脑开机后一直显示以以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容&#xff0c;让我们赶快一起来看一下吧&#xff01;配置windows update失败 还原更改 请勿关闭计算机&#x…...

    2022/11/19 21:17:01
  41. 电脑配置中请勿关闭计算机怎么办,准备配置windows请勿关闭计算机一直显示怎么办【图解】...

    不知道大家有没有遇到过这样的一个问题&#xff0c;就是我们的win7系统在关机的时候&#xff0c;总是喜欢显示“准备配置windows&#xff0c;请勿关机”这样的一个页面&#xff0c;没有什么大碍&#xff0c;但是如果一直等着的话就要两个小时甚至更久都关不了机&#xff0c;非常…...

    2022/11/19 21:17:00
  42. 正在准备配置请勿关闭计算机,正在准备配置windows请勿关闭计算机时间长了解决教程...

    当电脑出现正在准备配置windows请勿关闭计算机时&#xff0c;一般是您正对windows进行升级&#xff0c;但是这个要是长时间没有反应&#xff0c;我们不能再傻等下去了。可能是电脑出了别的问题了&#xff0c;来看看教程的说法。正在准备配置windows请勿关闭计算机时间长了方法一…...

    2022/11/19 21:16:59
  43. 配置失败还原请勿关闭计算机,配置Windows Update失败,还原更改请勿关闭计算机...

    我们使用电脑的过程中有时会遇到这种情况&#xff0c;当我们打开电脑之后&#xff0c;发现一直停留在一个界面&#xff1a;“配置Windows Update失败&#xff0c;还原更改请勿关闭计算机”&#xff0c;等了许久还是无法进入系统。如果我们遇到此类问题应该如何解决呢&#xff0…...

    2022/11/19 21:16:58
  44. 如何在iPhone上关闭“请勿打扰”

    Apple’s “Do Not Disturb While Driving” is a potentially lifesaving iPhone feature, but it doesn’t always turn on automatically at the appropriate time. For example, you might be a passenger in a moving car, but your iPhone may think you’re the one dri…...

    2022/11/19 21:16:57