参考引用

flink 体系化总结 https://www.yuque.com/wangzhiwuimportbigdata/ihf2lk/ka2x0h#Aq72c

大数据成神之路 https://github.com/wangzhiwubigdata/God-Of-BigData

大数据方向学习指南 https://blog.csdn.net/u013411339/article/details/118643213

五分钟学大数据 https://www.cnblogs.com/itlz/p/14356708.html

背景知识

实时计算的三个特征:

无限数据:无限数据指的是一种不断增长的,基本上无限的数据集。这些通常被称为 流数据,而与之相对的是有限的数据集。
无界数据处理:一种持续的数据处理模式,能够通过处理引擎重复的去处理上面的无限数据,是能够突破有限数据处理引擎的瓶颈的。
低延迟:延迟是多少并没有明确的定义。但我们都知道数据的价值将随着时间的流逝降低,时效性将是需要持续解决的问题。

分布式事务-两阶段提交协议(2PC)

两阶段提交协议(Two-Phase Commit,2PC)是很常用的解决分布式事务问题的方式,它可以保证在分布式事务中,要么所有参与进程都提交事务,要么都取消,即实现 ACID 中的 A (原子性)

在数据一致性的环境下,其代表的含义是:要么所有备份数据同时更改某个数值,要么都不改,以此来达到数据的强一致性

两阶段提交协议中有两个重要角色,协调者(Coordinator)和参与者(Participant),其中协调者只有一个,起到分布式事务的协调管理作用,参与者有多个

顾名思义,两阶段提交将提交过程划分为连续的两个阶段:表决阶段(Voting)和提交阶段(Commit)

两阶段提交协议过程如下图所示:

第一阶段:表决阶段

  1. 协调者向所有参与者发送一个 VOTE_REQUEST 消息。
  2. 当参与者接收到 VOTE_REQUEST 消息,向协调者发送VOTE_COMMIT 消息作为回应,告诉协调者自己已经做好准备提交准备,如果参与者没有准备好或遇到其他故障,就返回一个VOTE_ABORT 消息,告诉协调者目前无法提交事务。

第二阶段:提交阶段

  1. 协调者收集来自各个参与者的表决消息。如果**所有参与者一致认为可以提交事务,那么协调者决定事务的最终提交,在此情形下协调者向所有参与者发送一个 GLOBAL_COMMIT 消息,通知参与者进行本地提交;如果所有参与者中有任意一个返回消息是 VOTE_ABORT,协调者就会取消事务**,向所有参与者广播一条 GLOBAL_ABORT 消息通知所有的参与者取消事务。
  2. 每个提交了表决信息的参与者等候协调者返回消息,如果参与者接收到一个 GLOBAL_COMMIT 消息,那么参与者提交本地事务,否则如果接收到 GLOBAL_ABORT 消息,则参与者取消本地事务。
  3. Flink 核心介绍

核心理念:Apache Flink 是为分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理**

容错机制 Checkpoint

Checkpoint 机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。Flink的Checkpoint机制原理来自 Chandy-Lamport algorithm算法。

在这里插入图片描述

Flink Checkpoint 的两阶段实现

  1. beginTransaction,开启一个事务,在临时目录中创建一个临时文件,之后写入数据到该文件中。此过程为不同的事务创建隔离,避免数据混淆。
  2. preCommit。预提交阶段。将缓存数据块写出到创建的临时文件,然后关闭该文件,确保不再写入新数据到该文件,同时开启一个新事务,执行属于下一个检查点的写入操作。
  3. commit。在提交阶段,以原子操作的方式将上一阶段的文件写入真正的文件目录下。如果提交失败,Flink应用会重启,并调用TwoPhaseCommitSinkFunction#recoverAndCommit方法尝试恢复并重新提交事务。
  4. abort。一旦终止事务,删除临时文件。

状态管理 State

进行有状态的计算是 Flink 最重要的特性之一。所谓的状态,其实指的是 Flink 程序的中间计算结果。Flink 支持了不同类型的状态,并且针对状态的持久化还提供了专门的机制和状态管理器。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uIppEhsz-1642490867743)(大数据实时计算.assets/1638433332739.png)]

Keyed States:记录每个Key对应的状态值一个Task上可能包含多个Key不同Task上不会出现相同的Key ,常用的 MapState, ValueState

Operator States:记录每个Task对应的状态值数据类型

State-Keyed State

基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state,比如:stream.keyBy(…)。KeyBy之后的Operator State,可以理解为分区过的Operator State。

保存state的数据结构:

ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。

ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。

ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。

MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素。

需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄

State-Operator State

与Key无关的State,与Operator绑定的state,整个operator只对应一个state

举例来说,Flink中的 Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。

Broadcast State

Broadcast State 是 Flink 1.5 引入的新特性。在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游所有 task 时,就可以使用 Broadcast State 特性。下游的 task 接收这些配置、规则并保存为 BroadcastState, 将这些配置应用到另一个数据流的计算中 。

class MyBroadcastProcessImpl[KS, IN1, IN2, OUT] extends                                                                 BroadcastProcessFunction[IN1, IN2, OUT] {// 数据流处理部分override def processElement(in1: IN1,ctx: BroadcastProcessFunction[IN1, IN2, OUT]#ReadOnlyContext,collector: Collector[OUT]): Unit = ???// 广播流处理部分override def processBroadcastElement(in2: IN2,ctx: BroadcastProcessFunction[IN1, IN2, OUT]#Context,collector: Collector[OUT]): Unit = ???
}

过期时间 TTL 功能的用法

val ttlConfig: StateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(60)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build
val stateDescriptor = new ValueStateDescriptor[String]("testState", 															    classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)

时间概念 Time

Flink 支持了流处理程序在时间上的三个定义:事件时间 EventTime 、摄入时间 IngestionTime 、处理时间 ProcessingTime

  • EventTime :是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。
  • IngestionTime :是数据进入Flink的时间。
  • ProcessingTime:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。

使用方式如下:

 val env = StreamExecutionEnvironment.getExecutionEnvironrnent()// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // 使用摄入时间env.setStrearnTimeCharacteristic(TimeCharacteristic.IngestionTime)// 使用事件时间env.setStrearnTimeCharacteristic(TimeCharacteristic.EventTime)

延迟的数据Flink也有自己的解决办法,主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据:

  • 设置允许延迟的时间是通过 allowedLateness(lateness: Time)设置
  • 保存延迟数据则是通过 sideOutputLateData(outputTag: OutputTag[T])保存
  • 获取延迟数据是通过 DataStream.getSideOutput(tag: OutputTag[X])获取

Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 Window 来实现

数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,Window 的执行也是由 Watermark 触发的。

WaterMark + EventTimeWindow + Allowed Lateness方案

在这里插入图片描述

窗口机制 Windows

流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段

划分窗口就两种方式:

  1. 根据时间进行截取(time-driven-window),比如每1分钟统计一次或每10分钟统计一次。
  2. 根据数据进行截取(data-driven-window),比如每5个数据统计一次或每50个数据统计一次。
    在这里插入图片描述
时间窗口类型
  • 滚动窗口(Tumbling Windows)
    将数据依据固定的窗口长度对数据进行切片。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YlkMFD0F-1642490867744)(大数据实时计算.assets/1638414446274.png)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ubVvmbxy-1642490867745)(大数据实时计算.assets/1638414489259.png)]

适用场景: 基于时间段、时间片的各种统计指标

  • 滑动窗口(Sliding Windows)
    滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。

在这里插入图片描述

适用场景: 对最近一个时间段内的统计(求某接口最近5min的失败率)来决定是否要报警

  • 会话窗口(Session Windows)
    由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。
    在这里插入图片描述

    适用场景: 基于session 的概念,适合一些类似UEBA 类行为分析

编程与案例

Flink 标准编程模型

Flink 应用程序主要由三部分组成,**源 Source ** 、转换 transformation 、**目的地 sink ** 。这些流式 dataflows 形成了有向图,以一个或多个源(source)开始,并以一个或多个目的地(sink)结束。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-v6fJfZi7-1642491346272)(大数据实时计算.assets/1638408640697.png)]

Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、RabbitMQ 等,当然你也可以定义自己的 source。

Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select 等,操作很多,可以将数据转换计算成你想要的数据。

Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能需要存储下来,Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HzbZwH1Q-1642491346273)(大数据实时计算.assets/1638433754930.png)]

统计分析

  val conf: Configuration = new Configuration()conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
//    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)val env = StreamExecutionEnvironment.getExecutionEnvironmentval kafkaStream: DataStream[JSONObject] = getKafkaStream(env, jobParams)...flowDataStream.keyBy(_.upstreamAddr).window(TumblingProcessingTimeWindows.of(Time.seconds(60))).process(new FlowBeanWindowsCalc()).flatMap(_.toList).addSink(sensitiveEsSinkBuilder.build()).setParallelism(1)...class FlowBeanWindowsCalc extends ProcessWindowFunction[FlowBean, Array[String],String, TimeWindow] {override def process(upStreamAddr: String,context: Context, elements: Iterable[FlowBean],out: Collector[Array[String]]) = {val flowRelationArray = ArrayBuffer[String]()val timeSlot = context.window.getEndval keyByRemoteAddr = elements.toList.groupBy(_.remoteAddr)keyByRemoteAddr.foreach {case (remoteAddr, flowBeanLists) => {val upFlowJson = new mutable.HashMap[String,Any]()val timeList = flowBeanLists.map(_.timestamp)val startTime = timeList.minval endTime = timeList.maxval visit = flowBeanLists.sizeval statusCount = flowBeanLists.map(f=>(f.status,1)).groupBy(_._1).mapValues(_.size)val uriStatistics =  ArrayBuffer[mutable.HashMap[String,Any]]()flowBeanLists.groupBy(_.uri).foreach {case (uri, byUriFlowInfo) => {val hasSensitive = byUriFlowInfo.exists(_.hasSenstive)val uriStatusCode = byUriFlowInfo.map(f=>(f.status,1)).groupBy(_._1).mapValues(_.size)val requestTime = byUriFlowInfo.map(_.requestTime)val slideSize = scala.math.ceil(byUriFlowInfo.size / 10.0).toIntval lag =  Map("max"->requestTime.max,"min"->requestTime.min,"avg"->requestTime.sum / requestTime.size)val lagTrend = requestTime.grouped(slideSize).map(_.sum).toArrayval visit = byUriFlowInfo.sizeval visitTrend = byUriFlowInfo.map(_.visit).grouped(slideSize).map(_.sum).toArrayval requestTraffic = byUriFlowInfo.map(_.requestTrafficSize)val requestTrafficTotal = requestTraffic.sumval requestTrafficTrend = requestTraffic.grouped(slideSize).map(r=>r.sum/r.size).toArrayval responseTraffic = byUriFlowInfo.map(_.responseTrafficSize)val responseTrafficTotal = responseTraffic.sumval responseTrafficTrend = responseTraffic.grouped(slideSize).map(r=>r.sum/r.size).toArrayval uriType = byUriFlowInfo.head.uriType}}...upFlowJson.put("uri_statistics",uriStatistics.toArray)upFlowJson.put("src_node",remoteAddr)upFlowJson.put("dst_node",upStreamAddr)flowRelationArray.append(Json(DefaultFormats).write(upFlowJson))}}out.collect(flowRelationArray.toArray)}
}

标准 CEP

val cepPattern = Pattern//第一步: 外网连接 135 端口.begin[JSONObject]("inbound135", AfterMatchSkipStrategy.skipPastLastEvent).where { (value, ctx) => {val networkCondition =                             JsonPathUtils.readValue(value,"network.direction") == "inbound"val portCondition = JsonPathUtils.readValue(value,"destination.port") == 135newworkCondition && portCondition}// 第二步: Administrator 账号登录成功,同时登录IP是上一步的ip .followedBy("loginSuccess").where((value, ctx) => {val eventCondition = JsonPathUtils.readValue(value,"winlog.eventId") == 4624val sourceIp = JsonPathUtils.readValue(value,"source.ip")val inbound135 = ctx.getEventsForPattern("inbound135")var sourceIpCondition = inbound135.exist(cahedData => {JsonPathUtils.readValue(cahedData,"source.ip") == sourceIp})val userCondition = value.getJSONObject("user").getString("name") == "Administrator"eventCondition && sourceIpCondition && userCondition})//第三步 有进程创建行为,且logonId 为 第二步 操作所产生.followedBy("traceProcess").where((value, ctx) => {val eventCondition = JsonPathUtils.readValue(value,"winlog.eventId") == 1val logonId = JsonPathUtils.readValue(value,"winlog.event_data.logonId")val loginSuccess = ctx.getEventsForPattern("loginSuccess")var logonCondition = loginSuccess.exist(cachedData => {JsonPathUtils.readValue(cahedData,"winlog.event_data.TargeLogonId") == logonId})eventCondition && logonCondition})//第四步,上一步创建的进程 创建了 .dll  or .ps1 类型文件.followedBy("traceFile").where((value, ctx) => {val eventCondition = JsonPathUtils.readValue(value,"winlog.eventId") == 11val pid = JsonPathUtils.readValue(value,"process.pid")val traceProcess = ctx.getEventsForPattern("traceProcess")val pidCondiditon = traceProcess.exist(cacheData => {JsonPathUtils.readValue(cacheData,"process.pid") == pid)})val filePathCondititon = JsonPathUtils.readValue(value,"file.path").endsWith(".dll") ||JsonPathUtils.readValue(value,"file.path").endsWith("ps1")eventCondition && pidCondiditon && filePathCondititon}).followedBy("traceNetwork").where((value, ctx) => {val eventCondition = JsonPathUtils.readValue(value,"winlog.eventId") == 3val networkCondition = value.getJSONObject("network").getString("direction") == "outbound"val pid = JsonPathUtils.readValue(value,"process.pid")val traceProcess = ctx.getEventsForPattern("traceProcess")var pidCondiditon = traceProcess.exist(cacheData => {JsonPathUtils.readValue(cacheData,"process.pid") == pid})eventCondition && networkCondition && pidCondiditon}).oneOrMore.optional.within(Time.minutes(30L))val patternStream = CEP.pattern(kafkaStream, cepPattern)val result = patternStream.select(patternMap => {val inbound135 = patternMap("inbound135")val loginSuccess = patternMap("loginSuccess")val traceProcess = patternMap("traceProcess")val traceFile = patternMap("traceFile")val traceNetwork = patternMap("traceNetwork")var ruleInfoTmp =s"""|主机 ${inbound135.head.getString("computer_name")}发现 $ruleName事件:|${inbound135.head.getString("@timestamp")} 遭遇 外部ip   ${inbound135.map(_.getString("ip")).mkString} 连接 本地  端口|${loginSuccess.head.getString("@timestamp")} ip:${loginSuccess.map(_.getJSONObject("source").getString("ip")).mkString} 账号: ${loginSuccess.map(_.getJSONObject("user").getString("name")).mkString} 登录成功|${traceProcess.head.getString("@timestamp")} 执行命令:${traceProcess.map(_.getJSONObject("process").getString("args")).mkString}, 创建进程:${traceProcess.map(_.getJSONObject("process").getString("pid")).mkString}|${traceFile.head.getString("@timestamp")} 创建疑似恶意文件 ${traceFile.map(_.getJSONObject("file").getString("path")).mkString}|${traceNetwork.head.getString("@timestamp")} 网络外连 ${traceNetwork.map(_.getJSONObject("destination").toJSONString).mkString}""".stripMarginruleInfoTmp += s"\n详细日志如下:\n ${patternMap.values.map(_.map(_.toJSONString).mkString("\n")).mkString("\n")}"ruleInfoTmp})

Flink Sql

在这里插入图片描述

  • 统计、pattern recognition、ETL

    val conf: Configuration = new Configuration()conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    val bsEnv = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    val tbEnv = StreamTableEnvironment.create(bsEnv)val createTable:String ="""|CREATE TABLE  hadoop.iceberg_db.iceberg_001 (|    id BIGINT COMMENT 'unique id',|    data STRING,|    `@timestamp` STRING|) WITH ('connector'='iceberg','write.format.default'='parquet')|""".stripMarginval sourceTable:String ="""|CREATE TABLE sourceTable (| userid int,| f_random_str STRING|) WITH (| 'connector' = 'datagen',| 'rows-per-second'='1000',| 'fields.userid.kind'='random',| 'fields.userid.min'='1',| 'fields.userid.max'='100',| 'fields.f_random_str.length'='10'|)|""".stripMargintbEnv.toRetractStream[Row](tab)tbEnv.executeSql("select * from sourceTable").print()
    tbEnv.executeSql("insert into hadoop.iceberg_db.iceberg_002 select * from sourceTable")
    

状态计算

class MyMergeStateImpl extends KeyedProcessFunction[String, ProcessNode, String]{val ttlConfig: StateTtlConfig = StateTtlConfig.newBuilder(Time.seconds(60)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).buildvar procTreeNodesState: MapState[String, ArrayBuffer[ProcessNode]] = _
val procTreeNodesStateDesc = new MapStateDescriptor[String, ArrayBuffer[ProcessNode]]("ProcessTreeNode",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint[ArrayBuffer[ProcessNode]]() {})).enableTimeToLive(ttlConfig)var procTreeIndexState: MapState[String, String] = _val procTreeIndexStateDesc = new MapStateDescriptor[String, String]("ProcessTreeIndex",BasicTypeInfo.STRING_TYPE_INFO,BasicTypeInfo.STRING_TYPE_INFO).enableTimeToLive(ttlConfig)override def open(parameters: Configuration): Unit = {super.open(parameters)procTreeNodesState = getRuntimeContext.getMapState(procTreeNodesStateDesc)procTreeIndexState = getRuntimeContext.getMapState(procTreeIndexStateDesc)}override def processElement(processNode: ProcessNode,context: KeyedProcessFunction[String, ProcessNode,String]#Context,collector: Collector[String]): Unit = {mergeProcessTreeNode(processNode)}private def mergeProcessTreeNode(curTreeNode:ProcessNode):Unit = {... nodeFlag match {// 同一棵树 : 直接 把当前节点添加进去case "sameTree" =>val upTreeId = if(curTreeId != "") curTreeId else parentTreeIdval updateNode = procTreeNodesState.get(upTreeId)ProcessTree.mergeNode(curTreeNode, updateNode)procTreeNodesState.put(curTreeId, updateNode)procTreeIndexState.put(curTreeNode.selfCode,upTreeId)procTreeIndexState.put(curTreeNode.parentCode,upTreeId)// 不同树: 分别把两个棵树得所有节点 合并在一起,产生一棵新树case "differentTree" =>val mergeTreeId = UUID.randomUUID().toStringval mergeNodeBuffer = new ArrayBuffer[ProcessNode]()procTreeNodesState.get(curTreeId).foreach( ProcessTree.mergeNode(_, mergeNodeBuffer))procTreeNodesState.get(parentTreeId).foreach( ProcessTree.mergeNode(_, mergeNodeBuffer))ProcessTree.mergeNode(curTreeNode, mergeNodeBuffer)procTreeIndexState.entries().asScala.foreach(indexEntry => {if (indexEntry.getValue == curTreeId || indexEntry.getValue == parentTreeId){indexEntry.setValue(mergeTreeId)}})procTreeNodesState.remove(curTreeId)procTreeNodesState.remove(parentTreeId)procTreeNodesState.put(mergeTreeId, mergeNodeBuffer.sortBy(_.timestamp))case "notTree" =>val newTreeId = UUID.randomUUID().toStringprocTreeNodesState.put(newTreeId, ArrayBuffer.apply(curTreeNode))procTreeIndexState.put(curTreeNode.selfCode, newTreeId)procTreeIndexState.put(curTreeNode.parentCode, newTreeId)}procTreeIndexState.remove("")// 清理历史数据procTreeNodesState.entries().asScala.foreach(treeEntry => {val nodeList = treeEntry.getValueval treeMaxTime = nodeList.maxBy(_.timestamp).timestampval timeFilterNode = nodeList.filter(n=>{System.currentTimeMillis() - 5 * 60 * 1000 < n.timestamp })treeEntry.setValue(timeFilterNode)procTreeNodesState.put(treeEntry.getKey, treeEntry.getValue)if(timeFilterNode.isEmpty) noneTreeId.append(treeEntry.getKey)})}
}

connector

消费 kafka 数据 (source)
val kafkaProperties = new Properties()
kafkaProperties.setProperty("bootstrap.servers", jobParams.bootstrapServers)
kafkaProperties.setProperty("auto.offset.reset", jobParams.autoOffsetReset)
kafkaProperties.setProperty("group.id", jobParams.groupId)
kafkaProperties.setProperty("default.api.timeout.ms", "600000")val kafkaStream = env.addSource(new FlinkKafkaConsumer(jobParams.kafkaInputTopic,new SimpleStringSchema(),                                                     kafkaProperties).setCommitOffsetsOnCheckpoints(true).setStartFromGroupOffsets()).setParallelism(jobParams.kafkaParallelism).name("kafka 数据流")// 过滤掉非json 格式的脏数据.map(recode => {try {JSON.parseObject(recode)}catch {case e: Exception => log.error(recode + ">>>" + e.getMessage)}}).filter(_ != null).name("format data from kafka")
广播消息源(广播规则)
val broadcastDescriptor = new MapStateDescriptor[String, Map[String, Any]]                               ("broadcastRules",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(classOf[Map[String, Any]]))env.addSource(new RichSourceFunction[Map[String, Any]]() {private var isRunning = trueprivate var lastModifyTime: Long = _private var ruleParser: RulParser = _override def open(parameters: Configuration) {lastModifyTime = 0ruleParser = new FalcoRuleParser()ruleParser.parsedRule(jobParams.ruleIndex)}@throws[Exception]override def run(ctx: SourceFunction.SourceContext[Map[String, Any]]) = {while (isRunning) {val currentMaxTime = ruleParser.getRuleUpdateTime(jobParams)val isChanged = if (lastModifyTime == currentMaxTime) false else trueif (isChanged) {ctx.collect(ruleParser.getRuleCondition)lastModifyTime = currentMaxTime}TimeUnit.SECONDS.sleep(60)}}override def cancel(): Unit = {isRunning = false}}).setParallelism(1).name("规则监控流").broadcast(broadcastDescriptor)
自定义消息源(SocketServer)
class SyslogUDPServerSource(serverHost:String, serverPort:Int) extends RichSourceFunction[JSONObject] {private lazy val log = LoggerFactory.getLogger(this.getClass.getName)private val splitPackageBuffer = new ArrayBuffer[String]private var channel: DatagramChannel = _private var selector: Selector = _private var isRunning: Boolean = falseprivate var grokCompiler: GrokCompiler = _private var grok: Grok = _override def open(parameters: Configuration): Unit = {channel = DatagramChannel.openchannel.configureBlocking(false)channel.bind(new InetSocketAddress(serverHost, serverPort))selector = Selector.openchannel.register(selector, SelectionKey.OP_READ)isRunning = truegrokCompiler = GrokCompiler.newInstance()grokCompiler.registerDefaultPatterns()// 进行注册, registerDefaultPatterns()方法注册的是Grok内置的patternsgrok = grokCompiler.compile(Constant.GROK_PATTERN)log.info(s"UDP服务器启动:${serverHost}:${serverPort}")}override def run(ctx: SourceFunction.SourceContext[JSONObject]): Unit = {//轮询服务while (isRunning) {// 通过选择器,查询IO事件while (selector.select > 0) {val iterator = selector.selectedKeys.iteratorval buffer = ByteBuffer.allocate(1024 * 1000)while (iterator.hasNext) {val selectionKey = iterator.nextif (selectionKey.isReadable) {channel.receive(buffer)buffer.flipval receiveStr = new String(buffer.array, 0, buffer.limit)val parserData = SyslogGrokExtract.getMessage(receiveStr)ctx.collect(parserData)} buffer.compact()iterator.remove()}}TimeUnit.MILLISECONDS.sleep(500)}}override def cancel(): Unit = {try {isRunning = falseselector.close()channel.close()} catch {case e: IOException =>e.printStackTrace()}}
}
Sink 结果到 Kafka
val kafkaProducer = new FlinkKafkaProducer[String](jobParams.bootstrapServers,                                                  jobParams.kafkaOutputTopic,new SimpleStringSchema)
kafkaProducer.setWriteTimestampToKafka(true)
rtnDataStream.addSink(kafkaProducer)
Sink 结果到 ElasticSearch
def getElasticSearchSinkBuilder(esHosts: String, esPort: Int,esUser: String, esPassword: String) = {val httpHosts = new java.util.ArrayList[HttpHost]esHosts.split(",").foreach(esaddr => {httpHosts.add(new HttpHost(esaddr, esPort, "http"))})new ElasticsearchSink.Builder[ElasticSearchSinkBean](httpHosts,new ElasticsearchSinkFunction[ElasticSearchSinkBean] {def createIndexRequest(esBean: ElasticSearchSinkBean): IndexRequest = {Requests.indexRequest().index(esBean.indexName).`type`("doc").id(esBean.docId).source(esBean.message, XContentType.JSON)}override def process(esBean: ElasticSearchSinkBean,runtimeContext: RuntimeContext,requestIndexer: RequestIndexer): Unit = {requestIndexer.add(createIndexRequest(esBean))}})
}
...
// 对 DataStream 使用 addSink 绑定输出
val esSinkBuilder = getElasticSearchSinkBuilder(xxxx)
esSinkBuilder.setBulkFlushMaxActions(1000)
formatESData.addSink(esSinkBuilder.build()).setParallelism(5)
Sink 结果到 Mysql
class JDBCSink() extends RichSinkFunction[SensorReading]{ // 定义sql连接、预编译器var conn: Connection = _var insertStmt: PreparedStatement = _var updateStmt: PreparedStatement = _// 初始化,创建连接和预编译语句override def open(parameters: Configuration): Unit = {super.open(parameters)conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink?serverTimezone=UTC", "root", "root")insertStmt = conn.prepareStatement("INSERT INTO salary_table (name, salary) VALUES (?,?)")updateStmt = conn.prepareStatement("UPDATE salary_table SET salary = ? WHERE name = ?")}// 调用连接,执行sqloverride def invoke(value: SensorReading,context: SinkFunction.Context[_]): Unit = {// 执行更新语句updateStmt.setString(2, value.id)updateStmt.setDouble(1, value.temperature)updateStmt.execute()// 如果update没有查到数据,那么执行插入语句if( updateStmt.getUpdateCount == 0 ){insertStmt.setString(1, value.id)insertStmt.setDouble(2, value.temperature)insertStmt.execute()}}// 关闭时做清理工作override def close(): Unit = {insertStmt.close()updateStmt.close()conn.close()}
}
...
// 在主程序里面通过addSink绑定
stream.addSink(new JDBCSink())

其他

  • 大数据方向技能图谱

在这里插入图片描述

  • 大数据安全分析体系

在这里插入图片描述

查看全文
如若内容造成侵权/违法违规/事实不符,请联系编程学习网邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!

相关文章

  1. Docker部署Mysql

    目录 1.拉取Mysql镜像 2.创建Mysql容器 3.进入Mysql容器&#xff0c;登录mysql 4. 查看Mysql信息日志 5.在自己的电脑上连接数据库 1.拉取Mysql镜像 [rootVM-0-10-centos ~]# docker pull mysql:5.7 5.7: Pulling from library/mysql 72a69066d2fe: Pull complete 93619…...

    2024/5/2 15:33:56
  2. 基于Spingboot+maven+Quartz实现定时任务动态管理

    基于SpingbootmavenQuartz实现定时任务动态管理的demo 目录基于SpingbootmavenQuartz实现定时任务动态管理的demo描述实施样例中只写了运行定时的用例&#xff0c;其余用例调用manager中的方法即可描述 该demo为动态创建定时任务demo。 demo内容&#xff1a;动态管理定时任务&…...

    2024/4/14 15:59:13
  3. 集群的日志监控系统基础配置及nginx负载均衡配置

    集群的日志监控系统基础配置及nginx负载均衡配置前文环境及准备工作开始搭建Java配置Java应用部署nginx配置负载均衡避坑点总结前文 上次给大家讲解单台应用服务器的plg日志系统搭建细节&#xff0c;下面给大家讲解应用集群的配置方式。 Promtail Loki Grafana 日志监控系统…...

    2024/5/8 12:58:34
  4. Cherno C++ P10 C++头文件

    C头文件 YouTube视频链接 本文是ChernoP10视频的学习笔记&#xff0c;P8,P9分别介绍了变量和函数的基本知识&#xff0c;由于比较简单&#xff0c;所以没有制作笔记。 C头文件实际上负责什么 头文件通常用于声明某些类型的函数&#xff0c;以便能够用在我们的程序中。如果我们…...

    2024/5/3 20:57:39
  5. vue 添加埋点,记录访问次数,数据请求情况

    首先要store创建状态 store.js 文件 import Vue from vue import Vuex from vuex Vue.use(Vuex)//创建VueX对象 const state {routerName:, // 当前路由routerUrl:, // 当前urlasyncMethods:[], // 页面访问的数据接口startTime:new Date().getTime() // 开始进入页面时…...

    2024/4/7 4:39:23
  6. Android的性能优化

    启动优化的理由&#xff1a; 所谓的8秒定理&#xff0c;在移动和PC端&#xff0c;当用户对于响应的页面超过8秒&#xff0c;基本已经给当前应用否定的态度&#xff0c;已经将产品的竞争能力降到了最低。 接起来了解下优化的方向&#xff1a; 一、分析优化方向 app启动的三种…...

    2024/5/8 18:20:30
  7. 花2W去培训机构学的Python,现在已经入职了,之前的资料课件用不到了,分享给大家,请叫我资料侠。

    导读 前年毕业&#xff0c;实习发现工资才不到2k&#xff0c;又不想做着劳累的工作&#xff0c;拿着最低的工资。于是向家里要了一笔2W的巨款&#xff0c;报了一个IT的培训机构。 机构名称就不提了&#xff0c;当时推荐我学习Python&#xff0c;说它非常容易入门&#xff0c;…...

    2024/4/18 16:52:55
  8. 对联盟链而言,跨链协议为什么重要?

    区块链&#xff0c;本来是一种实时同步的分布式账本技术&#xff0c;它具有多方参与、共建共享特点&#xff0c;打破了企业原来彼此孤立的信息系统。随着区块链的越来越多&#xff0c;各条区块链自身反而又形成了信息孤岛。 “如果说共识机制是区块链的灵魂核心,那么对于区块链…...

    2024/4/20 0:45:09
  9. 【剑指 Offer 63】股票的最大利润 c++

    题目描述&#xff1a; 假设把某股票的价格按照时间先后顺序存储在数组中&#xff0c;请问买卖该股票一次可能获得的最大利润是多少&#xff1f; 示例 1: 输入: [7,1,5,3,6,4] 输出: 5 解释: 在第 2 天&#xff08;股票价格 1&#xff09;的时候买入&#xff0c;在第 5 天&am…...

    2024/4/14 15:59:23
  10. 基于AI深度学习的缺陷检测系统

    1. 基于AI深度学习的工业缺陷检测现状 在工业生产中&#xff0c;由于生产和运输环境中的不可控因素&#xff0c;很容易产生划痕、压伤、擦挂等缺陷。而其中的缺陷大部分都极其微小&#xff0c;甚至是肉眼难以识别&#xff0c;这些缺陷所造成的坏品率极大的制约了工业界的发展。…...

    2024/4/20 3:14:31
  11. 基于RTT Nano的多任务嵌入式程序设计

    文章目录一、RT-Thread 操作系统的特点和优势二、准备工作三、代码修改一、RT-Thread 操作系统的特点和优势 RT-Thread可选优先级抢占式调度&#xff0c;256/32/8个优先级&#xff0c;线程数不限。相同优先级线程时间片轮转调度。支持动态创建/销毁线程.RT-Thread固定分区内存…...

    2024/4/19 19:03:55
  12. 树莓派4B 32位系统初体验csi mipi接口摄像头

    目录 一、32位系统树莓派关机情况下接csi摄像头 二、开机开启camera 三、测试拍照和录像 拍照 录像 四、实时显示(类似监控) 通过motion工具借助浏览器http协议访问监控 安装motion&#xff1a; 修改配置文件&#xff1a; 通过VLC工具 一、32位系统树莓派关机情况下接…...

    2024/4/5 5:07:28
  13. 多卡聚合路由器在列车轨道交通的应用

    轨道交通的列车在行进过程中&#xff0c;车载设备会产生大量的运行数据和告警数据&#xff0c;以往是存储在车载设备内部&#xff0c;到检修站后由检修员导出。此模式会导致列车设备状态上报延时&#xff0c;运营中心无法实时获取列车重要故障告警&#xff0c;可能导致发生运营…...

    2024/4/26 9:24:02
  14. SSM-Mybatis之一:入门

    以下内容是基于MyBatis官网文档进行扩展的 1、什么是 MyBatis&#xff1f; MyBatis 是一款优秀的持久层框架&#xff0c;它支持自定义 SQL、存储过程以及高级映射。MyBatis免除了几乎所有的 JDBC 代码以及设置参数和获取结果集的工作。MyBatis可以通过简单的 XML 或注解来配置…...

    2024/4/19 12:43:11
  15. hcip网络类型及二层封装技术详解

    网络类型 点到点&#xff1a;在一个网段内只能存在两个节点 MA多路访问--在同一个网段内&#xff0c;节点的数量不限制 正常需要存在二层地址&#xff0c;否则无法单播BMA -- 广播型多路访问NBMA -- 非广播型多路访问网络类型基于数据链路层选用的技术进行区分&#xff1a;部…...

    2024/4/14 16:00:04
  16. 基于spm12对fmri数据的分析

    任务描述 这批数据来自于spm12&#xff08;https://tinyurl.com/scelanz&#xff09;官网&#xff0c;被试需要做2个runs&#xff0c;如下图所示&#xff0c;需要对箭头的方向做出反应&#xff0c;箭头朝右就按右键&#xff0c;朝左按左键&#xff0c;这批数据中有被试的反应时…...

    2024/5/4 9:50:44
  17. 2022四川最新建筑施工架子工(建筑特种作业)模拟考试试题及答案

    百分百题库提供特种工&#xff08;架子工&#xff09;考试试题、特种工&#xff08;架子工&#xff09;考试预测题、特种工&#xff08;架子工&#xff09;考试真题、特种工&#xff08;架子工&#xff09;证考试题库等,提供在线做题刷题&#xff0c;在线模拟考试&#xff0c;助…...

    2024/4/14 16:00:34
  18. BUUCTF|PWN-[HarekazeCTF2019]baby_rop2-WP

    1.检查保护机制 开启了堆栈不可执行 2.用64位IDA打开该文件 &#xff08;1&#xff09;shiftf12,查看关键字符串 &#xff08;2&#xff09;双击&#xff0c;ctrlx&#xff0c;进入反汇编代码 &#xff08;3&#xff09;ROPgadget查看返回地址 &#xff08;4&#xff09;paylo…...

    2024/4/19 19:45:20
  19. C primer plus 第四章课后复习题答案笔记解释整理

    复习题 再次运行程序清单4.1&#xff0c;但是在要求输入名时&#xff0c;请输入名和姓&#xff08;根据英文书写习惯&#xff0c;名和姓中间有一个空格&#xff09;&#xff0c;看看程序会发生什么情况。为什么&#xff1f; C语言在使用scanf&#xff08;&#xff09;函数读取用…...

    2024/5/4 21:45:57
  20. Netty中的基本概念和初步学习

    重要概念 Channel channel是Netty的核心概念之一&#xff0c;它是Netty网络通信的主体&#xff0c;由它负责同对端进行网络通信、注册和数据操作等功能。 每当Netty建立了一个连接之后&#xff0c;都会有一个对应的channel实例&#xff0c;并且还有父子channel的概念&#…...

    2024/5/2 2:22:38

最新文章

  1. 部署kafka后启动报错(坑):无法指定被请求的地址

    启动kafka后报错&#xff1a;org.apache.kafka.common.KafkaException: Socket server failed to bind to 127.0.0.1:9092: 无法指定被请求的地址 1、编辑配置文件 vim config/server.properties 2、在listeners PLAINTEXT://your.host.name:9092下方添加服务器内外网地址配…...

    2024/5/8 19:52:55
  2. 梯度消失和梯度爆炸的一些处理方法

    在这里是记录一下梯度消失或梯度爆炸的一些处理技巧。全当学习总结了如有错误还请留言&#xff0c;在此感激不尽。 权重和梯度的更新公式如下&#xff1a; w w − η ⋅ ∇ w w w - \eta \cdot \nabla w ww−η⋅∇w 个人通俗的理解梯度消失就是网络模型在反向求导的时候出…...

    2024/5/7 10:36:02
  3. SpringBoot和Vue2项目配置https协议

    1、SpringBoot项目 ① 去你自己的云申请并下载好相关文件&#xff0c;SpringBoot下载的是Tomcat&#xff08;默认&#xff09;&#xff0c;Vue2下载的是Nginx ② 将下载的压缩包里面的.pfx后缀文件拷贝到项目的resources目录下 ③ 编辑配置文件 &#xff08;主要是框里面的内…...

    2024/5/6 21:59:47
  4. 蓝桥杯加训

    1.两只塔姆沃斯牛&#xff08;模拟&#xff09; 思路&#xff1a;人和牛都记录三个数据&#xff0c;当前坐标和走的方向&#xff0c;如果人和牛的坐标和方向走重复了&#xff0c;那就说明一直在绕圈圈&#xff0c;无解 #include<iostream> using namespace std; const i…...

    2024/5/8 15:01:39
  5. linuxday05

    1、makedile原理&#xff08;增量编译生成代码&#xff09; # &#xff08;注释符&#xff09; 目标------依赖 目标不存在//目标比依赖旧才会执行命令&#xff1b; makefile的实现 1、命名要求&#xff08;Makefile/makefile&#xff09; 2、规则的集合 目标文件&#…...

    2024/5/7 13:05:29
  6. 【外汇早评】美通胀数据走低,美元调整

    原标题:【外汇早评】美通胀数据走低,美元调整昨日美国方面公布了新一期的核心PCE物价指数数据,同比增长1.6%,低于前值和预期值的1.7%,距离美联储的通胀目标2%继续走低,通胀压力较低,且此前美国一季度GDP初值中的消费部分下滑明显,因此市场对美联储后续更可能降息的政策…...

    2024/5/8 6:01:22
  7. 【原油贵金属周评】原油多头拥挤,价格调整

    原标题:【原油贵金属周评】原油多头拥挤,价格调整本周国际劳动节,我们喜迎四天假期,但是整个金融市场确实流动性充沛,大事频发,各个商品波动剧烈。美国方面,在本周四凌晨公布5月份的利率决议和新闻发布会,维持联邦基金利率在2.25%-2.50%不变,符合市场预期。同时美联储…...

    2024/5/7 9:45:25
  8. 【外汇周评】靓丽非农不及疲软通胀影响

    原标题:【外汇周评】靓丽非农不及疲软通胀影响在刚结束的周五,美国方面公布了新一期的非农就业数据,大幅好于前值和预期,新增就业重新回到20万以上。具体数据: 美国4月非农就业人口变动 26.3万人,预期 19万人,前值 19.6万人。 美国4月失业率 3.6%,预期 3.8%,前值 3…...

    2024/5/4 23:54:56
  9. 【原油贵金属早评】库存继续增加,油价收跌

    原标题:【原油贵金属早评】库存继续增加,油价收跌周三清晨公布美国当周API原油库存数据,上周原油库存增加281万桶至4.692亿桶,增幅超过预期的74.4万桶。且有消息人士称,沙特阿美据悉将于6月向亚洲炼油厂额外出售更多原油,印度炼油商预计将每日获得至多20万桶的额外原油供…...

    2024/5/7 14:25:14
  10. 【外汇早评】日本央行会议纪要不改日元强势

    原标题:【外汇早评】日本央行会议纪要不改日元强势近两日日元大幅走强与近期市场风险情绪上升,避险资金回流日元有关,也与前一段时间的美日贸易谈判给日本缓冲期,日本方面对汇率问题也避免继续贬值有关。虽然今日早间日本央行公布的利率会议纪要仍然是支持宽松政策,但这符…...

    2024/5/4 23:54:56
  11. 【原油贵金属早评】欧佩克稳定市场,填补伊朗问题的影响

    原标题:【原油贵金属早评】欧佩克稳定市场,填补伊朗问题的影响近日伊朗局势升温,导致市场担忧影响原油供给,油价试图反弹。此时OPEC表态稳定市场。据消息人士透露,沙特6月石油出口料将低于700万桶/日,沙特已经收到石油消费国提出的6月份扩大出口的“适度要求”,沙特将满…...

    2024/5/4 23:55:05
  12. 【外汇早评】美欲与伊朗重谈协议

    原标题:【外汇早评】美欲与伊朗重谈协议美国对伊朗的制裁遭到伊朗的抗议,昨日伊朗方面提出将部分退出伊核协议。而此行为又遭到欧洲方面对伊朗的谴责和警告,伊朗外长昨日回应称,欧洲国家履行它们的义务,伊核协议就能保证存续。据传闻伊朗的导弹已经对准了以色列和美国的航…...

    2024/5/4 23:54:56
  13. 【原油贵金属早评】波动率飙升,市场情绪动荡

    原标题:【原油贵金属早评】波动率飙升,市场情绪动荡因中美贸易谈判不安情绪影响,金融市场各资产品种出现明显的波动。随着美国与中方开启第十一轮谈判之际,美国按照既定计划向中国2000亿商品征收25%的关税,市场情绪有所平复,已经开始接受这一事实。虽然波动率-恐慌指数VI…...

    2024/5/7 11:36:39
  14. 【原油贵金属周评】伊朗局势升温,黄金多头跃跃欲试

    原标题:【原油贵金属周评】伊朗局势升温,黄金多头跃跃欲试美国和伊朗的局势继续升温,市场风险情绪上升,避险黄金有向上突破阻力的迹象。原油方面稍显平稳,近期美国和OPEC加大供给及市场需求回落的影响,伊朗局势并未推升油价走强。近期中美贸易谈判摩擦再度升级,美国对中…...

    2024/5/4 23:54:56
  15. 【原油贵金属早评】市场情绪继续恶化,黄金上破

    原标题:【原油贵金属早评】市场情绪继续恶化,黄金上破周初中国针对于美国加征关税的进行的反制措施引发市场情绪的大幅波动,人民币汇率出现大幅的贬值动能,金融市场受到非常明显的冲击。尤其是波动率起来之后,对于股市的表现尤其不安。隔夜美国股市出现明显的下行走势,这…...

    2024/5/6 1:40:42
  16. 【外汇早评】美伊僵持,风险情绪继续升温

    原标题:【外汇早评】美伊僵持,风险情绪继续升温昨日沙特两艘油轮再次发生爆炸事件,导致波斯湾局势进一步恶化,市场担忧美伊可能会出现摩擦生火,避险品种获得支撑,黄金和日元大幅走强。美指受中美贸易问题影响而在低位震荡。继5月12日,四艘商船在阿联酋领海附近的阿曼湾、…...

    2024/5/4 23:54:56
  17. 【原油贵金属早评】贸易冲突导致需求低迷,油价弱势

    原标题:【原油贵金属早评】贸易冲突导致需求低迷,油价弱势近日虽然伊朗局势升温,中东地区几起油船被袭击事件影响,但油价并未走高,而是出于调整结构中。由于市场预期局势失控的可能性较低,而中美贸易问题导致的全球经济衰退风险更大,需求会持续低迷,因此油价调整压力较…...

    2024/5/4 23:55:17
  18. 氧生福地 玩美北湖(上)——为时光守候两千年

    原标题:氧生福地 玩美北湖(上)——为时光守候两千年一次说走就走的旅行,只有一张高铁票的距离~ 所以,湖南郴州,我来了~ 从广州南站出发,一个半小时就到达郴州西站了。在动车上,同时改票的南风兄和我居然被分到了一个车厢,所以一路非常愉快地聊了过来。 挺好,最起…...

    2024/5/7 9:26:26
  19. 氧生福地 玩美北湖(中)——永春梯田里的美与鲜

    原标题:氧生福地 玩美北湖(中)——永春梯田里的美与鲜一觉醒来,因为大家太爱“美”照,在柳毅山庄去寻找龙女而错过了早餐时间。近十点,向导坏坏还是带着饥肠辘辘的我们去吃郴州最富有盛名的“鱼头粉”。说这是“十二分推荐”,到郴州必吃的美食之一。 哇塞!那个味美香甜…...

    2024/5/4 23:54:56
  20. 氧生福地 玩美北湖(下)——奔跑吧骚年!

    原标题:氧生福地 玩美北湖(下)——奔跑吧骚年!让我们红尘做伴 活得潇潇洒洒 策马奔腾共享人世繁华 对酒当歌唱出心中喜悦 轰轰烈烈把握青春年华 让我们红尘做伴 活得潇潇洒洒 策马奔腾共享人世繁华 对酒当歌唱出心中喜悦 轰轰烈烈把握青春年华 啊……啊……啊 两…...

    2024/5/8 19:33:07
  21. 扒开伪装医用面膜,翻六倍价格宰客,小姐姐注意了!

    原标题:扒开伪装医用面膜,翻六倍价格宰客,小姐姐注意了!扒开伪装医用面膜,翻六倍价格宰客!当行业里的某一品项火爆了,就会有很多商家蹭热度,装逼忽悠,最近火爆朋友圈的医用面膜,被沾上了污点,到底怎么回事呢? “比普通面膜安全、效果好!痘痘、痘印、敏感肌都能用…...

    2024/5/5 8:13:33
  22. 「发现」铁皮石斛仙草之神奇功效用于医用面膜

    原标题:「发现」铁皮石斛仙草之神奇功效用于医用面膜丽彦妆铁皮石斛医用面膜|石斛多糖无菌修护补水贴19大优势: 1、铁皮石斛:自唐宋以来,一直被列为皇室贡品,铁皮石斛生于海拔1600米的悬崖峭壁之上,繁殖力差,产量极低,所以古代仅供皇室、贵族享用 2、铁皮石斛自古民间…...

    2024/5/4 23:55:16
  23. 丽彦妆\医用面膜\冷敷贴轻奢医学护肤引导者

    原标题:丽彦妆\医用面膜\冷敷贴轻奢医学护肤引导者【公司简介】 广州华彬企业隶属香港华彬集团有限公司,专注美业21年,其旗下品牌: 「圣茵美」私密荷尔蒙抗衰,产后修复 「圣仪轩」私密荷尔蒙抗衰,产后修复 「花茵莳」私密荷尔蒙抗衰,产后修复 「丽彦妆」专注医学护…...

    2024/5/4 23:54:58
  24. 广州械字号面膜生产厂家OEM/ODM4项须知!

    原标题:广州械字号面膜生产厂家OEM/ODM4项须知!广州械字号面膜生产厂家OEM/ODM流程及注意事项解读: 械字号医用面膜,其实在我国并没有严格的定义,通常我们说的医美面膜指的应该是一种「医用敷料」,也就是说,医用面膜其实算作「医疗器械」的一种,又称「医用冷敷贴」。 …...

    2024/5/6 21:42:42
  25. 械字号医用眼膜缓解用眼过度到底有无作用?

    原标题:械字号医用眼膜缓解用眼过度到底有无作用?医用眼膜/械字号眼膜/医用冷敷眼贴 凝胶层为亲水高分子材料,含70%以上的水分。体表皮肤温度传导到本产品的凝胶层,热量被凝胶内水分子吸收,通过水分的蒸发带走大量的热量,可迅速地降低体表皮肤局部温度,减轻局部皮肤的灼…...

    2024/5/4 23:54:56
  26. 配置失败还原请勿关闭计算机,电脑开机屏幕上面显示,配置失败还原更改 请勿关闭计算机 开不了机 这个问题怎么办...

    解析如下&#xff1a;1、长按电脑电源键直至关机&#xff0c;然后再按一次电源健重启电脑&#xff0c;按F8健进入安全模式2、安全模式下进入Windows系统桌面后&#xff0c;按住“winR”打开运行窗口&#xff0c;输入“services.msc”打开服务设置3、在服务界面&#xff0c;选中…...

    2022/11/19 21:17:18
  27. 错误使用 reshape要执行 RESHAPE,请勿更改元素数目。

    %读入6幅图像&#xff08;每一幅图像的大小是564*564&#xff09; f1 imread(WashingtonDC_Band1_564.tif); subplot(3,2,1),imshow(f1); f2 imread(WashingtonDC_Band2_564.tif); subplot(3,2,2),imshow(f2); f3 imread(WashingtonDC_Band3_564.tif); subplot(3,2,3),imsho…...

    2022/11/19 21:17:16
  28. 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机...

    win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”问题的解决方法在win7系统关机时如果有升级系统的或者其他需要会直接进入一个 等待界面&#xff0c;在等待界面中我们需要等待操作结束才能关机&#xff0c;虽然这比较麻烦&#xff0c;但是对系统进行配置和升级…...

    2022/11/19 21:17:15
  29. 台式电脑显示配置100%请勿关闭计算机,“准备配置windows 请勿关闭计算机”的解决方法...

    有不少用户在重装Win7系统或更新系统后会遇到“准备配置windows&#xff0c;请勿关闭计算机”的提示&#xff0c;要过很久才能进入系统&#xff0c;有的用户甚至几个小时也无法进入&#xff0c;下面就教大家这个问题的解决方法。第一种方法&#xff1a;我们首先在左下角的“开始…...

    2022/11/19 21:17:14
  30. win7 正在配置 请勿关闭计算机,怎么办Win7开机显示正在配置Windows Update请勿关机...

    置信有很多用户都跟小编一样遇到过这样的问题&#xff0c;电脑时发现开机屏幕显现“正在配置Windows Update&#xff0c;请勿关机”(如下图所示)&#xff0c;而且还需求等大约5分钟才干进入系统。这是怎样回事呢&#xff1f;一切都是正常操作的&#xff0c;为什么开时机呈现“正…...

    2022/11/19 21:17:13
  31. 准备配置windows 请勿关闭计算机 蓝屏,Win7开机总是出现提示“配置Windows请勿关机”...

    Win7系统开机启动时总是出现“配置Windows请勿关机”的提示&#xff0c;没过几秒后电脑自动重启&#xff0c;每次开机都这样无法进入系统&#xff0c;此时碰到这种现象的用户就可以使用以下5种方法解决问题。方法一&#xff1a;开机按下F8&#xff0c;在出现的Windows高级启动选…...

    2022/11/19 21:17:12
  32. 准备windows请勿关闭计算机要多久,windows10系统提示正在准备windows请勿关闭计算机怎么办...

    有不少windows10系统用户反映说碰到这样一个情况&#xff0c;就是电脑提示正在准备windows请勿关闭计算机&#xff0c;碰到这样的问题该怎么解决呢&#xff0c;现在小编就给大家分享一下windows10系统提示正在准备windows请勿关闭计算机的具体第一种方法&#xff1a;1、2、依次…...

    2022/11/19 21:17:11
  33. 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”的解决方法...

    今天和大家分享一下win7系统重装了Win7旗舰版系统后&#xff0c;每次关机的时候桌面上都会显示一个“配置Windows Update的界面&#xff0c;提示请勿关闭计算机”&#xff0c;每次停留好几分钟才能正常关机&#xff0c;导致什么情况引起的呢&#xff1f;出现配置Windows Update…...

    2022/11/19 21:17:10
  34. 电脑桌面一直是清理请关闭计算机,windows7一直卡在清理 请勿关闭计算机-win7清理请勿关机,win7配置更新35%不动...

    只能是等着&#xff0c;别无他法。说是卡着如果你看硬盘灯应该在读写。如果从 Win 10 无法正常回滚&#xff0c;只能是考虑备份数据后重装系统了。解决来方案一&#xff1a;管理员运行cmd&#xff1a;net stop WuAuServcd %windir%ren SoftwareDistribution SDoldnet start WuA…...

    2022/11/19 21:17:09
  35. 计算机配置更新不起,电脑提示“配置Windows Update请勿关闭计算机”怎么办?

    原标题&#xff1a;电脑提示“配置Windows Update请勿关闭计算机”怎么办&#xff1f;win7系统中在开机与关闭的时候总是显示“配置windows update请勿关闭计算机”相信有不少朋友都曾遇到过一次两次还能忍但经常遇到就叫人感到心烦了遇到这种问题怎么办呢&#xff1f;一般的方…...

    2022/11/19 21:17:08
  36. 计算机正在配置无法关机,关机提示 windows7 正在配置windows 请勿关闭计算机 ,然后等了一晚上也没有关掉。现在电脑无法正常关机...

    关机提示 windows7 正在配置windows 请勿关闭计算机 &#xff0c;然后等了一晚上也没有关掉。现在电脑无法正常关机以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容&#xff0c;让我们赶快一起来看一下吧&#xff01;关机提示 windows7 正在配…...

    2022/11/19 21:17:05
  37. 钉钉提示请勿通过开发者调试模式_钉钉请勿通过开发者调试模式是真的吗好不好用...

    钉钉请勿通过开发者调试模式是真的吗好不好用 更新时间:2020-04-20 22:24:19 浏览次数:729次 区域: 南阳 > 卧龙 列举网提醒您:为保障您的权益,请不要提前支付任何费用! 虚拟位置外设器!!轨迹模拟&虚拟位置外设神器 专业用于:钉钉,外勤365,红圈通,企业微信和…...

    2022/11/19 21:17:05
  38. 配置失败还原请勿关闭计算机怎么办,win7系统出现“配置windows update失败 还原更改 请勿关闭计算机”,长时间没反应,无法进入系统的解决方案...

    前几天班里有位学生电脑(windows 7系统)出问题了&#xff0c;具体表现是开机时一直停留在“配置windows update失败 还原更改 请勿关闭计算机”这个界面&#xff0c;长时间没反应&#xff0c;无法进入系统。这个问题原来帮其他同学也解决过&#xff0c;网上搜了不少资料&#x…...

    2022/11/19 21:17:04
  39. 一个电脑无法关闭计算机你应该怎么办,电脑显示“清理请勿关闭计算机”怎么办?...

    本文为你提供了3个有效解决电脑显示“清理请勿关闭计算机”问题的方法&#xff0c;并在最后教给你1种保护系统安全的好方法&#xff0c;一起来看看&#xff01;电脑出现“清理请勿关闭计算机”在Windows 7(SP1)和Windows Server 2008 R2 SP1中&#xff0c;添加了1个新功能在“磁…...

    2022/11/19 21:17:03
  40. 请勿关闭计算机还原更改要多久,电脑显示:配置windows更新失败,正在还原更改,请勿关闭计算机怎么办...

    许多用户在长期不使用电脑的时候&#xff0c;开启电脑发现电脑显示&#xff1a;配置windows更新失败&#xff0c;正在还原更改&#xff0c;请勿关闭计算机。。.这要怎么办呢&#xff1f;下面小编就带着大家一起看看吧&#xff01;如果能够正常进入系统&#xff0c;建议您暂时移…...

    2022/11/19 21:17:02
  41. 还原更改请勿关闭计算机 要多久,配置windows update失败 还原更改 请勿关闭计算机,电脑开机后一直显示以...

    配置windows update失败 还原更改 请勿关闭计算机&#xff0c;电脑开机后一直显示以以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容&#xff0c;让我们赶快一起来看一下吧&#xff01;配置windows update失败 还原更改 请勿关闭计算机&#x…...

    2022/11/19 21:17:01
  42. 电脑配置中请勿关闭计算机怎么办,准备配置windows请勿关闭计算机一直显示怎么办【图解】...

    不知道大家有没有遇到过这样的一个问题&#xff0c;就是我们的win7系统在关机的时候&#xff0c;总是喜欢显示“准备配置windows&#xff0c;请勿关机”这样的一个页面&#xff0c;没有什么大碍&#xff0c;但是如果一直等着的话就要两个小时甚至更久都关不了机&#xff0c;非常…...

    2022/11/19 21:17:00
  43. 正在准备配置请勿关闭计算机,正在准备配置windows请勿关闭计算机时间长了解决教程...

    当电脑出现正在准备配置windows请勿关闭计算机时&#xff0c;一般是您正对windows进行升级&#xff0c;但是这个要是长时间没有反应&#xff0c;我们不能再傻等下去了。可能是电脑出了别的问题了&#xff0c;来看看教程的说法。正在准备配置windows请勿关闭计算机时间长了方法一…...

    2022/11/19 21:16:59
  44. 配置失败还原请勿关闭计算机,配置Windows Update失败,还原更改请勿关闭计算机...

    我们使用电脑的过程中有时会遇到这种情况&#xff0c;当我们打开电脑之后&#xff0c;发现一直停留在一个界面&#xff1a;“配置Windows Update失败&#xff0c;还原更改请勿关闭计算机”&#xff0c;等了许久还是无法进入系统。如果我们遇到此类问题应该如何解决呢&#xff0…...

    2022/11/19 21:16:58
  45. 如何在iPhone上关闭“请勿打扰”

    Apple’s “Do Not Disturb While Driving” is a potentially lifesaving iPhone feature, but it doesn’t always turn on automatically at the appropriate time. For example, you might be a passenger in a moving car, but your iPhone may think you’re the one dri…...

    2022/11/19 21:16:57