文章目录

  • 一.基本程序结构
  • 二.创建表环境
  • 三.在 Catalog 中注册表
    • 3.1 表(Table)的概念
    • 3.2 连接到文件系统(Csv 格式)
    • 3.3 连接到 Kafka
  • 四. 表的查询
    • 4.1 Table API 的调用
    • 4.2 SQL 查询
  • 五. 将DataStream 转换成表
    • 5.1 代码表达
    • 5.2 数据类型与 Table schema 的对应
  • 六. 创建临时视图(Temporary View)
  • 七. 输出表
    • 7.1 输出到文件
    • 7.2 更新模式(Update Mode)
    • 7.3 输出到 Kafka
    • 7.4 输出到 ElasticSearch
    • 7.5 输出到 MySql
  • 八. 将表转换成 DataStream
  • 九. Query 的解释和执行
  • 十. 案例
    • 10.1 输入案例
    • 10.2 输出案例
    • 10.3 Kafka输入和输出
  • 参考:

一.基本程序结构

Table API和SQL的程序结构,与流式处理的程序结构十分类似;

也可以近似的认为有这么几步:
首先创建执行环境,然后定义source、transform、sink。

具体操作流程如下:

StreamTableEnvironment tableEnv = ... // 创建表的执行环境// 创建一张表,用于读取数据
tableEnv.connect(...).createTemporaryTable("inputTable");// 注册一张表,用于把计算结果输出
tableEnv.connect(...).createTemporaryTable("outputTable");// 通过 Table API 查询算子,得到一张结果表
Table result = tableEnv.from("inputTable").select(...);// 通过SQL查询语句,得到一张结果表
Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...");// 将结果表写入输出表中
result.insertInto("outputTable");

二.创建表环境

创建表环境最简单的方式,就是基于流处理执行环境调 create方法直接创建:

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

表环境(TableEnvironment)是 flink 中集成 Table API & SQL 的核心概念。它负责:

  1. 注册 catalog
  2. 在内部 catalog 中注册表
  3. 执行 SQL 查询
  4. 注册用户自定义函数
  5. 将 DataStream 或 DataSet 转换为表
  6. 保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

在创建 TableEnv 的时候,可以多传入一个 EnvironmentSettings 或者 TableConfig 参数, 可以用来配置 TableEnvironment 的一些特性。
比如,配置老版本的流式查询(Flink-Streaming-Query):

EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useOldPlanner() // 使用老版本planner 
.inStreamingMode() // 流处理模式 
.build(); 
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

基于老版本的批处理环境(Flink-Batch-Query):

ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment;BatchTableEnvironment batchTableEnv = BatchTableEnvironment.create(batchEnv);

基于 blink 版本的流处理环境(Blink-Streaming-Query):

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode().build();StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

基于 blink 版本的批处理环境(Blink-Batch-Query):

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode().build();TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

三.在 Catalog 中注册表

3.1 表(Table)的概念

TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。它会维护一个Catalog-Table 表之间的 map。

表(Table)是由一个“标识符”来指定的,由 3 部分组成:Catalog 名、数据库(database) 名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。

表可以是常规的(Table,表),或者虚拟的(View,视图)。常规表(Table)一般可以 用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从 DataStream 转 换而来。视图可以从现有的表中创建,通常是 table API 或者 SQL 查询的一个结果。

3.2 连接到文件系统(Csv 格式)

连接外部系统在 Catalog 中注册表,直接调用 tableEnv.connect()就可以,里面参数要传 入一个 ConnectorDescriptor,也就是 connector 描述器。对于文件系统的 connector 而言,flink 内部已经提供了,就叫做 FileSystem()。

代码如下:

tableEnv 
.connect( new FileSystem().path("sensor.txt")) // 定义表数据来源,外部连接 
.withFormat(new OldCsv()) // 定义从外部系统读取数据之后的格式化方法
.withSchema( new Schema() 
.field("id", DataTypes.STRING()) 
.field("timestamp", DataTypes.BIGINT()) 
.field("temperature", DataTypes.DOUBLE()) ) // 定义表结构 
.createTemporaryTable("inputTable"); // 创建临时表

这是旧版本的 csv 格式描述器。由于它是非标的,跟外部系统对接并不通用,所以将被 弃用,以后会被一个符合 RFC-4180 标准的新 format 描述器取代。新的描述器就叫 Csv(),但 flink 没有直接提供,需要引入依赖 flink-csv:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.9.0</version>
</dependency>

代码非常类似,只需要把 withFormat 里的 OldCsv 改成 Csv 就可以了。

3.3 连接到 Kafka

kafka 的连接器 flink-kafka-connector 中,1.9 版本的已经提供了 Table API 的支持。我们 可以在 connect 方法中直接传入一个叫做 Kafka 的类,这就是 kafka 连接器的描述器 ConnectorDescriptor。

tableEnv.connect(
new Kafka()
.version("0.11") // 定义 kafka 的版本
.topic("sensor") // 定义主题
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
.withFormat(new Csv())
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("timestamp", DataTypes.BIGINT())
.field("temperature", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaInputTable");

当然也可以连接到 ElasticSearch、MySql、HBase、Hive 等外部系统,实现方式基本上是 类似的。

四. 表的查询

利用外部系统的连接器 connector,我们可以读写数据,并在环境的 Catalog 中注册表。 接下来就可以对表做查询转换了。

Flink 给我们提供了两种查询方式:Table API 和 SQL。

4.1 Table API 的调用

Table API 是集成在 Scala 和 Java 语言内的查询 API。与 SQL 不同,Table API 的查询不会 用字符串表示,而是在宿主语言中一步一步调用完成的。

Table API 基于代表一张“表”的 Table 类,并提供一整套操作处理的方法 API。这些方 法会返回一个新的 Table 对象,这个对象就表示对输入表应用转换操作的结果。有些关系型 转换操作,可以由多个方法调用组成,构成链式调用结构。例如 table.select(…).filter(…),其 中 select(…)表示选择表中指定的字段,filter(…)表示筛选条件。

代码中的实现如下:

Table sensorTable = tableEnv.from("inputTable");Table resultTable = senorTable
.select("id, temperature")
.filter("id ='sensor_1'");

4.2 SQL 查询

Flink 的 SQL 集成,基于的是 ApacheCalcite,它实现了 SQL 标准。在 Flink 中,用常规字 符串来定义 SQL 查询语句。SQL 查询的结果,是一个新的 Table。

代码实现如下:

Table resultSqlTable = tableEnv.sqlQuery("select id, temperature from
inputTable where id ='sensor_1'");

当然,也可以加上聚合操作,比如我们统计每个 sensor 温度数据出现的个数,做个 count统计:

Table aggResultTable = sensorTable
.groupBy("id")
.select("id, id.count as count");

SQL 的实现:

Table aggResultSqlTable = tableEnv.sqlQuery("select id, count(id) as cnt from
inputTable group by id");

这里 Table API 里指定的字段,前面加了一个单引号’,这是 Table API 中定义的 Expression类型的写法,可以很方便地表示一个表中的字段。 字段可以直接全部用双引号引起来,也可以用半边单引号+字段名的方式。以后的代码
中,一般都用后一种形式。

五. 将DataStream 转换成表

Flink 允许我们把 Table 和 DataStream 做转换:我们可以基于一个 DataStream,先流式 地读取数据源,然后 map 成 POJO,再把它转成 Table。Table 的列字段(column fields),就是 POJO 里的字段,这样就不用再麻烦地定义 schema 了。

5.1 代码表达

代码中实现非常简单,直接用 tableEnv.fromDataStream()就可以了。默认转换后的 Table schema 和 DataStream 中的字段定义一一对应,也可以单独指定出来。
这就允许我们更换字段的顺序、重命名,或者只选取某些字段出来,相当于做了一次

map 操作(或者 Table API 的 select 操作)。

代码具体如下:

DataStream<String> inputStream = env.readTextFile("sensor.txt");DataStream<SensorReading> dataStream = inputStream
.map( line -> {
String[] fields = line.split(",");return new SensorReading(fields[0], new Long(fields[1]), new
Double(fields[2]));} );Table sensorTable = tableEnv.fromDataStream(dataStream, "id, timestamp.rowtime as ts, temperature");

5.2 数据类型与 Table schema 的对应

在上节的例子中,DataStream 中的数据类型,与表的 Schema 之间的对应关系,是按 照类中的字段名来对应的(name-based mapping),所以还可以用 as 做重命名。

基于名称的对应:

Table sensorTable = tableEnv.fromDataStream(dataStream, "timestamp as ts, id as myId, temperature");

Flink 的 DataStream 和 DataSet API 支持多种类型。

组合类型,比如元组(内置 Scala 和 Java 元组)、POJO、Scala case 类和 Flink 的 Row 类 型等,允许具有多个字段的嵌套数据结构,这些字段可以在 Table 的表达式中访问。其他类 型,则被视为原子类型。

六. 创建临时视图(Temporary View)

1.10及之后的版本才支持
创建临时视图的第一种方式,就是直接从 DataStream 转换而来。同样,可以直接对应 字段转换;也可以在转换的时候,指定相应的字段。

代码如下:

tableEnv.createTemporaryView("sensorView", dataStream);
tableEnv.createTemporaryView("sensorView", dataStream, "id, temperature, timestamp as ts");

另外,当然还可以基于 Table 创建视图:

tableEnv.createTemporaryView("sensorView", sensorTable);

View 和 Table 的 Schema 完全相同。事实上,在 Table API 中,可以认为 View 和 Table是等价的。

七. 输出表

表的输出,是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口,可以 支持不同的文件格式、存储数据库和消息队列。

具体实现,输出表最直接的方法,就是通过 Table.insertInto() 方法将一个 Table 写入 注册过的 TableSink 中。

7.1 输出到文件

代码如下:

// 注册输出表
tableEnv.connect(new FileSystem().path("…\\resources\\out.txt")
) // 定义到文件系统的连接
.withFormat(new Csv()) // 定义格式化方法,Csv 格式
.withSchema(new Schema()
.field("id", DataTypes.STRING())
.field("temp", DataTypes.DOUBLE())
) // 定义表结构
.createTemporaryTable("outputTable"); // 创建临时表resultSqlTable.insertInto("outputTable");

7.2 更新模式(Update Mode)

在流处理过程中,表的处理并不像传统定义的那样简单。

对于流式查询(Streaming Queries),需要声明如何在(动态)表和外部连接器之间执行 转换。与外部系统交换的消息类型,由更新模式(update mode)指定。

Flink Table API 中的更新模式有以下三种:

  1. 追加模式(Append Mode)
    在追加模式下,表(动态表)和外部连接器只交换插入(Insert)消息。

  2. 撤回模式(Retract Mode)
    在撤回模式下,表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。
    ⚫ 插入(Insert)会被编码为添加消息;
    ⚫ 删除(Delete)则编码为撤回消息;
    ⚫ 更新(Update)则会编码为,已更新行(上一行)的撤回消息,和更新行(新行) 的添加消息。
    在此模式下,不能定义 key,这一点跟 upsert 模式完全不同。

  3. Upsert(更新插入)模式
    在 Upsert 模式下,动态表和外部连接器交换 Upsert 和 Delete 消息。 这个模式需要一个唯一的 key,通过这个 key 可以传递更新消息。为了正确应用消息,
    外部连接器需要知道这个唯一 key 的属性。
    • 插入(Insert)和更新(Update)都被编码为 Upsert 消息;
    • 删除(Delete)编码为 Delete 信息。
    这种模式和 Retract 模式的主要区别在于,Update 操作是用单个消息编码的,所以效率 会更高。

7.3 输出到 Kafka

除了输出到文件,也可以输出到 Kafka。我们可以结合前面 Kafka 作为输入数据,构建 数据管道,kafka 进,kafka 出。

代码如下:

// 输出到 kafkatableEnv.connect(new Kafka()
.version("0.11")
.topic("sinkTest")
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092")
)
.withFormat( new Csv() )
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("temp", DataTypes.DOUBLE())
)
.createTemporaryTable("kafkaOutputTable");resultTable.insertInto("kafkaOutputTable");

7.4 输出到 ElasticSearch

ElasticSearch 的 connector 可以在 upsert(update+insert,更新插入)模式下操作,这样 就可以使用 Query 定义的键(key)与外部系统交换 UPSERT/DELETE 消息。

另外,对于“仅追加”(append-only)的查询,connector 还可以在 append 模式下操作, 这样就可以与外部系统只交换 insert 消息。

es 目前支持的数据格式,只有 Json,而 flink 本身并没有对应的支持,所以还需要引入 依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.9.0</version>
</dependency>

代码实现如下:

// 输出到 estableEnv.connect(new Elasticsearch()
.version("6")
.host("localhost", 9200, "http")
.index("sensor")
.documentType("temp")
.inUpsertMode()   // 指定是 Upsert 模式
.withFormat(new Json())
.withSchema( new Schema()
.field("id", DataTypes.STRING())
.field("count", DataTypes.BIGINT())
)
.createTemporaryTable("esOutputTable");aggResultTable.insertInto("esOutputTable");

7.5 输出到 MySql

Flink 专门为 Table API 的 jdbc 连接提供了 flink-jdbc 连接器,我们需要先引入依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.12</artifactId><version>1.10.1</version>
</dependency>

jdbc 连接的代码实现比较特殊,因为没有对应的 java/scala 类实现 ConnectorDescriptor, 所以不能直接 tableEnv.connect()。不过 Flink SQL 留下了执行 DDL 的接口:tableEnv.sqlUpdate()。

对于 jdbc 的创建表操作,天生就适合直接写 DDL 来实现,所以我们的代码可以这样写:

// 输出到 MysqlString sinkDDL= "create table jdbcOutputTable (" +
"	id varchar(20) not null, " +
"	cnt bigint not null " +
") with (" +
"	'connector.type' = 'jdbc', " +
"	'connector.url' = 'jdbc:mysql://localhost:3306/test', " +
"	'connector.table' = 'sensor_count', " +
"	'connector.driver' = 'com.mysql.jdbc.Driver', " +
"	'connector.username' = 'root', " +
"	'connector.password' = '123456' )";tableEnv.sqlUpdate(sinkDDL);	// 执行 DDL 创建表aggResultSqlTable.insertInto("jdbcOutputTable");

八. 将表转换成 DataStream

表可以转换为 DataStream 或 DataSet。这样,自定义流处理或批处理程序就可以继续在 Table API 或 SQL 查询的结果上运行了。
将表转换为 DataStream 或 DataSet 时,需要指定生成的数据类型,即要将表的每一行转 换成的数据类型。通常,最方便的转换类型就是 Row。当然,因为结果的所有字段类型都是 明确的,我们也经常会用元组类型来表示。

表作为流式查询的结果,是动态更新的。所以,将这种动态查询转换成的数据流,同样 需要对表的更新操作进行编码,进而有不同的转换模式。
Table API 中表到 DataStream 有两种模式:
• 追加模式(Append Mode) 用于表只会被插入(Insert)操作更改的场景。
• 撤回模式(Retract Mode)
用于任何场景。有些类似于更新模式中 Retract 模式,它只有 Insert 和 Delete 两类操作。 得到的数据会增加一个 Boolean 类型的标识位(返回的第一个字段),用它来表示到底
是新增的数据(Insert),还是被删除的数据(老数据, Delete)。

代码实现如下:

DataStream<Row> resultStream = tableEnv.toAppendStream(resultTable, Row.class);
DataStream<Tuple2<Boolean, Row>> aggResultStream = tableEnv.toRetractStream(aggResultTable, Row.class);resultStream.print("result"); aggResultStream.print("aggResult");

所以,没有经过 groupby 之类聚合操作,可以直接用 toAppendStream 来转换;而如果 经过了聚合,有更新操作,一般就必须用 toRetractDstream。

九. Query 的解释和执行

Table API 提供了一种机制来解释(Explain)计算表的逻辑和优化查询计划。这是通过
TableEnvironment.explain(table)方法或 TableEnvironment.explain()方法完成的。 explain 方法会返回一个字符串,描述三个计划:
⚫ 未优化的逻辑查询计划
⚫ 优化后的逻辑查询计划
⚫ 实际执行计划

我们可以在代码中查看执行计划:

String explaination = tableEnv.explain(resultTable);System.out.println(explaination);

Query 的解释和执行过程,老 planner 和 blink planner 大体是一致的,又有所不同。整 体来讲,Query 都会表示成一个逻辑查询计划,然后分两步解释:

  1. 优化查询计划
  2. 解释成 DataStream 或者 DataSet 程序

而 Blink 版本是批流统一的,所以所有的 Query,只会被解释成 DataStream 程序;另外 在批处理环境 TableEnvironment 下,Blink 版本要到 tableEnv.execute()执行调用才开始解释。

十. 案例

因为我本地Flink 1.9.0版本,与课程上的代码会存在一定的差异

pom文件配置

<!-- csv --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>1.9.0</version>
</dependency>

10.1 输入案例

代码:

package org.flink.tableapi;import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;/*** @author 只是甲* @date    2021-09-26*/public class TableTest2_CommonApi {public static void main(String[] args) throws Exception{// 1. 创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 1.1 基于老版本planner的流处理EnvironmentSettings oldStreamSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();StreamTableEnvironment oldStreamTableEnv = StreamTableEnvironment.create(env, oldStreamSettings);// 1.2 基于老版本planner的批处理ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();BatchTableEnvironment oldBatchTableEnv = BatchTableEnvironment.create(batchEnv);// 1.3 基于Blink的流处理EnvironmentSettings blinkStreamSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings);// 1.4 基于Blink的批处理EnvironmentSettings blinkBatchSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();TableEnvironment blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings);// 2. 表的创建:连接外部系统,读取数据// 2.1 读取文件String filePath = "C:\\Users\\Administrator\\IdeaProjects\\FlinkStudy\\src\\main\\resources\\sensor.txt";tableEnv.connect( new FileSystem().path(filePath)).withFormat( new OldCsv().field("id", Types.STRING()).field("timestamp", Types.INT()).field("temp", Types.DOUBLE())).withSchema(new Schema().field("id", Types.STRING()).field("timestamp", Types.INT()).field("temp", Types.DOUBLE()))//.inAppendMode().registerTableSource("inputTable");Table inputTable = tableEnv.scan("inputTable");
//        inputTable.printSchema();
//        tableEnv.toAppendStream(inputTable, Row.class).print();// 3. 查询转换// 3.1 Table API// 简单转换Table resultTable = inputTable.select("id, temp").filter("id === 'sensor_6'");// 聚合统计Table aggTable = inputTable.groupBy("id").select("id, id.count as count, temp.avg as avgTemp");// 3.2 SQLtableEnv.sqlQuery("select id, temp from inputTable where id = 'senosr_6'");Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");// 打印输出tableEnv.toAppendStream(resultTable, Row.class).print("result");tableEnv.toRetractStream(aggTable, Row.class).print("agg");tableEnv.toRetractStream(sqlAggTable, Row.class).print("sqlagg");env.execute();}
}

测试记录:
image.png

10.2 输出案例

从文件读取,处理后,输出到文件

代码:

package org.flink.tableapi;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;/*** @author 只是甲* @date    2021-09-26*/public class TableTest3_FileOutput {public static void main(String[] args) throws Exception {// 1. 创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 2. 表的创建:连接外部系统,读取数据// 读取文件String filePath = "C:\\Users\\Administrator\\IdeaProjects\\FlinkStudy\\src\\main\\resources\\sensor.txt";tableEnv.connect( new FileSystem().path(filePath)).withFormat( new OldCsv().field("id", Types.STRING()).field("timestamp", Types.INT()).field("temp", Types.DOUBLE())).withSchema(new Schema().field("id", Types.STRING()).field("timestamp", Types.INT()).field("temp", Types.DOUBLE())).registerTableSource("inputTable");Table inputTable = tableEnv.scan("inputTable");
//        inputTable.printSchema();
//        tableEnv.toAppendStream(inputTable, Row.class).print();// 3. 查询转换// 3.1 Table API// 简单转换Table resultTable = inputTable.select("id, temp").filter("id === 'sensor_6'");// 聚合统计Table aggTable = inputTable.groupBy("id").select("id, id.count as count, temp.avg as avgTemp");// 3.2 SQLtableEnv.sqlQuery("select id, temp from inputTable where id = 'senosr_6'");Table sqlAggTable = tableEnv.sqlQuery("select id, count(id) as cnt, avg(temp) as avgTemp from inputTable group by id");// 4. 输出到文件// 连接外部文件注册输出表String outputPath = "C:\\Users\\Administrator\\IdeaProjects\\FlinkStudy\\src\\main\\resources\\sensor_out.txt";tableEnv.connect( new FileSystem().path(outputPath)).withFormat( new OldCsv().field("id", Types.STRING()).field("temp", Types.DOUBLE())).withSchema(new Schema().field("id", Types.STRING()).field("temp", Types.DOUBLE())).registerTableSink("outputTable");resultTable.insertInto("outputTable");
//        aggTable.insertInto("outputTable");env.execute();}
}

10.3 Kafka输入和输出

本案例测试Kafka输入和输出

代码:

package org.flink.tableapi;import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.OldCsv;
import org.apache.flink.table.descriptors.Schema;/*** @author 只是甲* @date    2021-09-26*/public class TableTest5_KafkaPipeLine {public static void main(String[] args) throws Exception {// 1. 创建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);// 2. 连接Kafka,读取数据tableEnv.connect(new Kafka().version("universal").topic("flink_source").startFromLatest().property("zookeeper.connect", "10.31.1.124:2181").property("bootstrap.servers", "10.31.1.124:9092").property("group.id", "testGroup")).withFormat(new Csv().fieldDelimiter(',').deriveSchema()).withSchema(new Schema().field("id", Types.STRING()).field("timestamp", Types.LONG()).field("temp", Types.DOUBLE())).inAppendMode().registerTableSource("inputTable");// 3. 查询转换// 简单转换Table sensorTable = tableEnv.scan("inputTable");Table resultTable = sensorTable.select("id, temp").filter("id === 'sensor_6'");// 聚合统计//Table aggTable = sensorTable.groupBy("id")//        .select("id, id.count as count, temp.avg as avgTemp");// 4. 建立kafka连接,输出到不同的topic下tableEnv.connect(new Kafka().version("universal").topic("flink_sink").property("zookeeper.connect", "10.31.1.124:2181").property("bootstrap.servers", "10.31.1.124:9092").property("group.id", "testGroup")).withFormat(new Csv().fieldDelimiter(',').deriveSchema()).withSchema(new Schema().field("id", Types.STRING())//.field("timestamp", Types.LONG()).field("temp", Types.DOUBLE())).inAppendMode().registerTableSink("outputTable");resultTable.insertInto("outputTable");env.execute();}
}

开启Kafka生产者:

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/kafka-console-producer.sh --broker-list 10.31.1.124:9092 --topic flink_source

输入信息:

sensor_1,1547718199,35.8
sensor_6,1547718201,15.4
sensor_7,1547718202,6.7
sensor_10,1547718205,38.1
sensor_1,1547718207,36.3
sensor_1,1547718209,32.8
sensor_1,1547718212,37.1

开启Kafka消费者:

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/kafka/bin/kafka-console-consumer.sh --from-beginning --bootstrap-server 10.31.1.124:9092 --topic flink_sink

参考:

  1. https://www.bilibili.com/video/BV1qy4y1q728
  2. https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_11-table-api%e5%92%8cflink-sql
查看全文
如若内容造成侵权/违法违规/事实不符,请联系编程学习网邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!

相关文章

  1. springboot+Mybatis+thymeleaf+pageHelper实现分页查询

    maven依赖 <!-- 分页插件 --><dependency><groupId>com.github.pagehelper</groupId><artifactId>pagehelper-spring-boot-starter</artifactId><version>1.2.12</version></dependency><!--测试类--><depend…...

    2024/4/28 20:45:25
  2. GreenHand爬虫系列06——爬取高清壁纸

    鄙人的第5篇拙作被屏蔽了&#xff0c;为了不再惹沈河大大生气&#xff0c;我会写得更加简略&#xff0c;敬请原宥&#xff01; 目标网址&#xff1a;https://ss.netnr.com/wallpaper#22 1.导入模块 #json爬取壁纸 import json import os import re import requests2.初始化函数…...

    2024/4/28 21:50:08
  3. 极客头条:网易回应旗下游戏集体崩溃:用干冰修好了服务器;GitHub更换CEO,由产品主管接手;Nginx 1.21.4发布

    一分钟速览新闻点&#xff01; 网易回应旗下游戏集体崩溃&#xff1a;用干冰修好了服务器腾讯披露三款自研芯片58同城&#xff1a;网店客服平均月薪达7230元前华为高管杨柘从小米离职&#xff0c;加入中田体育任首席战略官腾讯将投入200亿培育超1000家中小企业小米获MI商标转让…...

    2024/4/27 9:09:31
  4. 自相关系数在脑电中的应用

    关注“心仪脑”查看更多脑科学知识的分 关键词&#xff1a;自相关系数、脑电 在介绍自相关系数之前先介绍一下相关系数。 01什么是相关系数 最常用的相关系数全称是皮尔逊相关系数&#xff0c;两个变量X和Y之间的相关程度&#xff0c;准确来说是线性相关程度。 。分子是X&am…...

    2024/4/28 3:01:21
  5. 聊聊spring事务失效的12种场景

    前言 对于从事java开发工作的同学来说&#xff0c;spring的事务肯定再熟悉不过了。 在某些业务场景下&#xff0c;如果一个请求中&#xff0c;需要同时写入多张表的数据。为了保证操作的原子性&#xff08;要么同时成功&#xff0c;要么同时失败&#xff09;&#xff0c;避免…...

    2024/4/28 23:35:08
  6. 打印九九口诀表

    下面是一个完整的下三角九九口诀表&#xff1a; 1*11 2*12 2*24 3*13 3*26 3*39 4*14 4*28 4*312 4*416 5*15 5*210 5*315 5*420 5*525 6*16 6*212 6*318 6*424 6*530 6*636 7*17 7*214 7*321 7*428 7*535 7*642 7*749 8*18 8*216 8*32…...

    2024/4/28 16:28:27
  7. 每日一题:367. 有效的完全平方数

    解题思路 可以用sqrt函数解决 但是如果是进阶方法 可以用减奇数的方式 例如11 413 9135 161357 代码 class Solution {public boolean isPerfectSquare(int num) {int n 1;while(num>0){num num - n;n 2;}return num0;} }...

    2024/4/28 21:59:01
  8. 计算机网络 之 常见网络协议相关总结

    1 TCP/IP,TCP的相关编程。IP地址分为几类,如何用二进制编码区分? TCP与UDP相比存在可靠的传输保证,三次握手协议等,但UDP速度快,TCP要保证数据的接受要有序,每个数据包都有一个信号。而且TCP要求ACK。 TCP/IP由网络层的IP协议和传输层的TCP协议组成,协议采用了四层的层级…...

    2024/4/28 3:05:45
  9. HTML5画布和javascript烟花js特效

    下载地址 学会做一个烟花HTML5画布和JavaScript。在这个演示,像素大小的火箭发射自动从底部的屏幕,通过鼠标的点击。您还可以单击并拖动来启动多个焰火。    本教程介绍了跟踪和对象包含在数组循环,跟踪鼠标点击和坐标,动画烟花特定目标坐标,使美丽的粒子爆炸通过使用随机…...

    2024/4/28 8:27:51
  10. 软件开发算法中的一些数学问题分享,ICG游戏

    软件开发算法中的一些数学问题分享&#xff0c;ICG游戏 1&#xff0c;石子游戏 题目出处力扣 亚历克斯和李用几堆石子在做游戏。偶数堆石子排成一行&#xff0c;每堆都有正整数颗石子 piles[i] 。 游戏以谁手中的石子最多来决出胜负。石子的总数是奇数&#xff0c;所以没有平…...

    2024/4/28 3:54:40
  11. 2021电赛D 题《基于互联网的摄像测量系统》

    2021电赛D 题《基于互联网的摄像测量系统》任务要求说明方案选择实现过程任务 设计并制作一个基于互联网的摄像测量系统。系统构成如图 1 所示。图中边长为 1 米的正方形区域三个顶点分别为 A、B 和 O。系统有两个独立的摄像节点&#xff0c;分别放置在 A 和 B。两个摄像节点拍…...

    2024/4/28 12:45:57
  12. FastThreadLocal线程为啥比ThreadLocal线程快

    1、FastThreadLocal的引入背景和原理简介 既然jdk已经有ThreadLocal&#xff0c;为何netty还要自己造个FastThreadLocal&#xff1f;FastThreadLocal快在哪里&#xff1f; 这需要从jdk ThreadLocal的本身说起。如下图&#xff1a; 在java线程中&#xff0c;每个线程都有一个T…...

    2024/4/27 21:58:50
  13. grep的时候Binary file matches **.log 怎么解决

    操作 grep "xxx" a.log 结果 Binary file a.log matches 原因&#xff1a;grep认为a.log是二进制文件 解决方法&#xff1a;grep -a "xxx" a.log 可以看看grep -a参数的功能 [appadmintest3 ~/tmp]$ grep --help |grep /-a -a, --text equivalent to --bin…...

    2024/4/28 12:30:36
  14. 综述(五)无人驾驶中决策系统的工作流程及原理

    汽车在通过感知系统获取到周围环境的信息后&#xff0c;需要进行最为核心的大脑决策工作&#xff0c;通过采用最优的决策方法&#xff0c;达到安全驾驶汽车的目的。决策结果最终输出给执行模块&#xff0c;实现汽车的加减速、转向、变换车道、超越前车等功能。同时需要监视周围…...

    2024/4/28 11:19:04
  15. c语言学习(初识数据类型)

    数据类型: 数据类型指的是用于声明不同类型的变量或函数的一个广泛的系统。变量的类型决定了变量存储占用的空间&#xff0c;以及如何解释存储的位模式&#xff08;位模式&#xff1a;计算机中所有二进制的0、1代码所组成的数字串。计算机的储存模式规定&#xff0c;储存单位以…...

    2024/4/15 5:03:48
  16. 文件存储技术-数据持久化-Android

    学习笔记持久化技术简介第一种方式:文件存储将数据存储到文件中从文件中读取数据案例-文件存储技术MainActivity.ktactivity_main.xml持久化技术简介 数据持久化就是指将那些内存中的瞬时数据保存到存储设备中&#xff0c;保证即使在手机或计算机关机的情况下&#xff0c;这些数…...

    2024/4/27 23:33:41
  17. nginx重定向方法

    # 将/login页面重定向到/login.html&#xff0c;并且带参数rewrite ^/login$ https://xx.cn/login.html$1;#通过$request_uri变量匹配所有的URI。rewrite ^ https://www.xxxx.com$request_uri? permanent;# 使用return指令&#xff0c;通过301状态码和$request_uri参数(推荐)r…...

    2024/4/28 3:12:07
  18. Windows下MySQL安装

    在Windows下安装MySQL 压缩包安装与安装包安装的区别&#xff1a; &#xff08;1&#xff09;压缩包&#xff1a; 可以自由选择路径安装&#xff0c;可以不用来回切换屏幕。当不需要时&#xff0c;停止服务&#xff0c;删除文件即可。 &#xff08;2&#xff09;安装包&#x…...

    2024/4/29 0:03:20
  19. springboot 集成cas5.3 实现自定义认证策略

    如果CAS框架提供的方案还是不能满足我们的需要&#xff0c;比如我们不仅需要用户名和密码&#xff0c;还要验证其他信息&#xff0c;比如邮箱&#xff0c;手机号&#xff0c;但是邮箱&#xff0c;手机信息在另一个数据库&#xff0c;还有在一段时间内同一IP输入错误次数限制等。…...

    2024/4/28 1:57:43
  20. BCJC 60 《图灵的秘密》读书笔记6希尔伯特的思想

    数学家希尔伯特有一思想.那就是"人定胜天" 他说世界上根本沒有不可解的问题. “我们必须知道” “我们必将知道” 这一思想为我们后人开拓创新.探索大自然增加了动力....

    2024/4/28 0:29:08

最新文章

  1. BTCOIN的革命之路:通过SocialFi重塑全球金融生态系统

    BTCOIN的革命之路&#xff1a;通过SocialFi重塑全球金融生态系统 今日&#xff0c;BTCOIN宣布发布WEB3.0论坛引发业内现象级关注&#xff1a;作为一个倡导WEB3.0理念的数字金融平台&#xff0c;在数字货币的波澜壮阔中&#xff0c;BTCOIN以其独特的生态定位和战略愿景&#xff…...

    2024/4/29 0:23:41
  2. 梯度消失和梯度爆炸的一些处理方法

    在这里是记录一下梯度消失或梯度爆炸的一些处理技巧。全当学习总结了如有错误还请留言&#xff0c;在此感激不尽。 权重和梯度的更新公式如下&#xff1a; w w − η ⋅ ∇ w w w - \eta \cdot \nabla w ww−η⋅∇w 个人通俗的理解梯度消失就是网络模型在反向求导的时候出…...

    2024/3/20 10:50:27
  3. Linux nsenter命令全面解析

    Linux nsenter命令是一个强大的工具&#x1f6e0;️&#xff0c;用于进入到已存在的命名空间&#xff08;Namespace&#xff09;中执行命令。由于Linux的命名空间技术是构建容器技术的基础&#xff0c;nsenter因此成为了容器管理和调试中不可或缺的工具&#x1f433;。本文将从…...

    2024/4/25 15:31:15
  4. Unity核心学习

    目录 认识模型的制作流程模型的制作过程 2D相关图片导入设置图片导入概述纹理类型设置纹理形状设置纹理高级设置纹理平铺拉伸设置纹理平台打包相关设置 SpriteSprite Editor——Single图片编辑Sprite Editor——Multiple图片编辑Sprite Editor——Polygon图片编辑SpriteRendere…...

    2024/4/24 7:49:17
  5. ASP.NET Core 标识(Identity)框架系列(一):如何使用 ASP.NET Core 标识(Identity)框架创建用户和角色?

    前言 ASP.NET Core 内置的标识&#xff08;identity&#xff09;框架&#xff0c;采用的是 RBAC&#xff08;role-based access control&#xff0c;基于角色的访问控制&#xff09;策略&#xff0c;是一个用于管理用户身份验证、授权和安全性的框架。 它提供了一套工具和库&…...

    2024/4/26 14:55:59
  6. 【外汇早评】美通胀数据走低,美元调整

    原标题:【外汇早评】美通胀数据走低,美元调整昨日美国方面公布了新一期的核心PCE物价指数数据,同比增长1.6%,低于前值和预期值的1.7%,距离美联储的通胀目标2%继续走低,通胀压力较低,且此前美国一季度GDP初值中的消费部分下滑明显,因此市场对美联储后续更可能降息的政策…...

    2024/4/28 13:52:11
  7. 【原油贵金属周评】原油多头拥挤,价格调整

    原标题:【原油贵金属周评】原油多头拥挤,价格调整本周国际劳动节,我们喜迎四天假期,但是整个金融市场确实流动性充沛,大事频发,各个商品波动剧烈。美国方面,在本周四凌晨公布5月份的利率决议和新闻发布会,维持联邦基金利率在2.25%-2.50%不变,符合市场预期。同时美联储…...

    2024/4/28 3:28:32
  8. 【外汇周评】靓丽非农不及疲软通胀影响

    原标题:【外汇周评】靓丽非农不及疲软通胀影响在刚结束的周五,美国方面公布了新一期的非农就业数据,大幅好于前值和预期,新增就业重新回到20万以上。具体数据: 美国4月非农就业人口变动 26.3万人,预期 19万人,前值 19.6万人。 美国4月失业率 3.6%,预期 3.8%,前值 3…...

    2024/4/26 23:05:52
  9. 【原油贵金属早评】库存继续增加,油价收跌

    原标题:【原油贵金属早评】库存继续增加,油价收跌周三清晨公布美国当周API原油库存数据,上周原油库存增加281万桶至4.692亿桶,增幅超过预期的74.4万桶。且有消息人士称,沙特阿美据悉将于6月向亚洲炼油厂额外出售更多原油,印度炼油商预计将每日获得至多20万桶的额外原油供…...

    2024/4/28 13:51:37
  10. 【外汇早评】日本央行会议纪要不改日元强势

    原标题:【外汇早评】日本央行会议纪要不改日元强势近两日日元大幅走强与近期市场风险情绪上升,避险资金回流日元有关,也与前一段时间的美日贸易谈判给日本缓冲期,日本方面对汇率问题也避免继续贬值有关。虽然今日早间日本央行公布的利率会议纪要仍然是支持宽松政策,但这符…...

    2024/4/27 17:58:04
  11. 【原油贵金属早评】欧佩克稳定市场,填补伊朗问题的影响

    原标题:【原油贵金属早评】欧佩克稳定市场,填补伊朗问题的影响近日伊朗局势升温,导致市场担忧影响原油供给,油价试图反弹。此时OPEC表态稳定市场。据消息人士透露,沙特6月石油出口料将低于700万桶/日,沙特已经收到石油消费国提出的6月份扩大出口的“适度要求”,沙特将满…...

    2024/4/27 14:22:49
  12. 【外汇早评】美欲与伊朗重谈协议

    原标题:【外汇早评】美欲与伊朗重谈协议美国对伊朗的制裁遭到伊朗的抗议,昨日伊朗方面提出将部分退出伊核协议。而此行为又遭到欧洲方面对伊朗的谴责和警告,伊朗外长昨日回应称,欧洲国家履行它们的义务,伊核协议就能保证存续。据传闻伊朗的导弹已经对准了以色列和美国的航…...

    2024/4/28 1:28:33
  13. 【原油贵金属早评】波动率飙升,市场情绪动荡

    原标题:【原油贵金属早评】波动率飙升,市场情绪动荡因中美贸易谈判不安情绪影响,金融市场各资产品种出现明显的波动。随着美国与中方开启第十一轮谈判之际,美国按照既定计划向中国2000亿商品征收25%的关税,市场情绪有所平复,已经开始接受这一事实。虽然波动率-恐慌指数VI…...

    2024/4/28 15:57:13
  14. 【原油贵金属周评】伊朗局势升温,黄金多头跃跃欲试

    原标题:【原油贵金属周评】伊朗局势升温,黄金多头跃跃欲试美国和伊朗的局势继续升温,市场风险情绪上升,避险黄金有向上突破阻力的迹象。原油方面稍显平稳,近期美国和OPEC加大供给及市场需求回落的影响,伊朗局势并未推升油价走强。近期中美贸易谈判摩擦再度升级,美国对中…...

    2024/4/27 17:59:30
  15. 【原油贵金属早评】市场情绪继续恶化,黄金上破

    原标题:【原油贵金属早评】市场情绪继续恶化,黄金上破周初中国针对于美国加征关税的进行的反制措施引发市场情绪的大幅波动,人民币汇率出现大幅的贬值动能,金融市场受到非常明显的冲击。尤其是波动率起来之后,对于股市的表现尤其不安。隔夜美国股市出现明显的下行走势,这…...

    2024/4/25 18:39:16
  16. 【外汇早评】美伊僵持,风险情绪继续升温

    原标题:【外汇早评】美伊僵持,风险情绪继续升温昨日沙特两艘油轮再次发生爆炸事件,导致波斯湾局势进一步恶化,市场担忧美伊可能会出现摩擦生火,避险品种获得支撑,黄金和日元大幅走强。美指受中美贸易问题影响而在低位震荡。继5月12日,四艘商船在阿联酋领海附近的阿曼湾、…...

    2024/4/28 1:34:08
  17. 【原油贵金属早评】贸易冲突导致需求低迷,油价弱势

    原标题:【原油贵金属早评】贸易冲突导致需求低迷,油价弱势近日虽然伊朗局势升温,中东地区几起油船被袭击事件影响,但油价并未走高,而是出于调整结构中。由于市场预期局势失控的可能性较低,而中美贸易问题导致的全球经济衰退风险更大,需求会持续低迷,因此油价调整压力较…...

    2024/4/26 19:03:37
  18. 氧生福地 玩美北湖(上)——为时光守候两千年

    原标题:氧生福地 玩美北湖(上)——为时光守候两千年一次说走就走的旅行,只有一张高铁票的距离~ 所以,湖南郴州,我来了~ 从广州南站出发,一个半小时就到达郴州西站了。在动车上,同时改票的南风兄和我居然被分到了一个车厢,所以一路非常愉快地聊了过来。 挺好,最起…...

    2024/4/28 1:22:35
  19. 氧生福地 玩美北湖(中)——永春梯田里的美与鲜

    原标题:氧生福地 玩美北湖(中)——永春梯田里的美与鲜一觉醒来,因为大家太爱“美”照,在柳毅山庄去寻找龙女而错过了早餐时间。近十点,向导坏坏还是带着饥肠辘辘的我们去吃郴州最富有盛名的“鱼头粉”。说这是“十二分推荐”,到郴州必吃的美食之一。 哇塞!那个味美香甜…...

    2024/4/25 18:39:14
  20. 氧生福地 玩美北湖(下)——奔跑吧骚年!

    原标题:氧生福地 玩美北湖(下)——奔跑吧骚年!让我们红尘做伴 活得潇潇洒洒 策马奔腾共享人世繁华 对酒当歌唱出心中喜悦 轰轰烈烈把握青春年华 让我们红尘做伴 活得潇潇洒洒 策马奔腾共享人世繁华 对酒当歌唱出心中喜悦 轰轰烈烈把握青春年华 啊……啊……啊 两…...

    2024/4/26 23:04:58
  21. 扒开伪装医用面膜,翻六倍价格宰客,小姐姐注意了!

    原标题:扒开伪装医用面膜,翻六倍价格宰客,小姐姐注意了!扒开伪装医用面膜,翻六倍价格宰客!当行业里的某一品项火爆了,就会有很多商家蹭热度,装逼忽悠,最近火爆朋友圈的医用面膜,被沾上了污点,到底怎么回事呢? “比普通面膜安全、效果好!痘痘、痘印、敏感肌都能用…...

    2024/4/27 23:24:42
  22. 「发现」铁皮石斛仙草之神奇功效用于医用面膜

    原标题:「发现」铁皮石斛仙草之神奇功效用于医用面膜丽彦妆铁皮石斛医用面膜|石斛多糖无菌修护补水贴19大优势: 1、铁皮石斛:自唐宋以来,一直被列为皇室贡品,铁皮石斛生于海拔1600米的悬崖峭壁之上,繁殖力差,产量极低,所以古代仅供皇室、贵族享用 2、铁皮石斛自古民间…...

    2024/4/28 5:48:52
  23. 丽彦妆\医用面膜\冷敷贴轻奢医学护肤引导者

    原标题:丽彦妆\医用面膜\冷敷贴轻奢医学护肤引导者【公司简介】 广州华彬企业隶属香港华彬集团有限公司,专注美业21年,其旗下品牌: 「圣茵美」私密荷尔蒙抗衰,产后修复 「圣仪轩」私密荷尔蒙抗衰,产后修复 「花茵莳」私密荷尔蒙抗衰,产后修复 「丽彦妆」专注医学护…...

    2024/4/26 19:46:12
  24. 广州械字号面膜生产厂家OEM/ODM4项须知!

    原标题:广州械字号面膜生产厂家OEM/ODM4项须知!广州械字号面膜生产厂家OEM/ODM流程及注意事项解读: 械字号医用面膜,其实在我国并没有严格的定义,通常我们说的医美面膜指的应该是一种「医用敷料」,也就是说,医用面膜其实算作「医疗器械」的一种,又称「医用冷敷贴」。 …...

    2024/4/27 11:43:08
  25. 械字号医用眼膜缓解用眼过度到底有无作用?

    原标题:械字号医用眼膜缓解用眼过度到底有无作用?医用眼膜/械字号眼膜/医用冷敷眼贴 凝胶层为亲水高分子材料,含70%以上的水分。体表皮肤温度传导到本产品的凝胶层,热量被凝胶内水分子吸收,通过水分的蒸发带走大量的热量,可迅速地降低体表皮肤局部温度,减轻局部皮肤的灼…...

    2024/4/27 8:32:30
  26. 配置失败还原请勿关闭计算机,电脑开机屏幕上面显示,配置失败还原更改 请勿关闭计算机 开不了机 这个问题怎么办...

    解析如下&#xff1a;1、长按电脑电源键直至关机&#xff0c;然后再按一次电源健重启电脑&#xff0c;按F8健进入安全模式2、安全模式下进入Windows系统桌面后&#xff0c;按住“winR”打开运行窗口&#xff0c;输入“services.msc”打开服务设置3、在服务界面&#xff0c;选中…...

    2022/11/19 21:17:18
  27. 错误使用 reshape要执行 RESHAPE,请勿更改元素数目。

    %读入6幅图像&#xff08;每一幅图像的大小是564*564&#xff09; f1 imread(WashingtonDC_Band1_564.tif); subplot(3,2,1),imshow(f1); f2 imread(WashingtonDC_Band2_564.tif); subplot(3,2,2),imshow(f2); f3 imread(WashingtonDC_Band3_564.tif); subplot(3,2,3),imsho…...

    2022/11/19 21:17:16
  28. 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机...

    win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”问题的解决方法在win7系统关机时如果有升级系统的或者其他需要会直接进入一个 等待界面&#xff0c;在等待界面中我们需要等待操作结束才能关机&#xff0c;虽然这比较麻烦&#xff0c;但是对系统进行配置和升级…...

    2022/11/19 21:17:15
  29. 台式电脑显示配置100%请勿关闭计算机,“准备配置windows 请勿关闭计算机”的解决方法...

    有不少用户在重装Win7系统或更新系统后会遇到“准备配置windows&#xff0c;请勿关闭计算机”的提示&#xff0c;要过很久才能进入系统&#xff0c;有的用户甚至几个小时也无法进入&#xff0c;下面就教大家这个问题的解决方法。第一种方法&#xff1a;我们首先在左下角的“开始…...

    2022/11/19 21:17:14
  30. win7 正在配置 请勿关闭计算机,怎么办Win7开机显示正在配置Windows Update请勿关机...

    置信有很多用户都跟小编一样遇到过这样的问题&#xff0c;电脑时发现开机屏幕显现“正在配置Windows Update&#xff0c;请勿关机”(如下图所示)&#xff0c;而且还需求等大约5分钟才干进入系统。这是怎样回事呢&#xff1f;一切都是正常操作的&#xff0c;为什么开时机呈现“正…...

    2022/11/19 21:17:13
  31. 准备配置windows 请勿关闭计算机 蓝屏,Win7开机总是出现提示“配置Windows请勿关机”...

    Win7系统开机启动时总是出现“配置Windows请勿关机”的提示&#xff0c;没过几秒后电脑自动重启&#xff0c;每次开机都这样无法进入系统&#xff0c;此时碰到这种现象的用户就可以使用以下5种方法解决问题。方法一&#xff1a;开机按下F8&#xff0c;在出现的Windows高级启动选…...

    2022/11/19 21:17:12
  32. 准备windows请勿关闭计算机要多久,windows10系统提示正在准备windows请勿关闭计算机怎么办...

    有不少windows10系统用户反映说碰到这样一个情况&#xff0c;就是电脑提示正在准备windows请勿关闭计算机&#xff0c;碰到这样的问题该怎么解决呢&#xff0c;现在小编就给大家分享一下windows10系统提示正在准备windows请勿关闭计算机的具体第一种方法&#xff1a;1、2、依次…...

    2022/11/19 21:17:11
  33. 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”的解决方法...

    今天和大家分享一下win7系统重装了Win7旗舰版系统后&#xff0c;每次关机的时候桌面上都会显示一个“配置Windows Update的界面&#xff0c;提示请勿关闭计算机”&#xff0c;每次停留好几分钟才能正常关机&#xff0c;导致什么情况引起的呢&#xff1f;出现配置Windows Update…...

    2022/11/19 21:17:10
  34. 电脑桌面一直是清理请关闭计算机,windows7一直卡在清理 请勿关闭计算机-win7清理请勿关机,win7配置更新35%不动...

    只能是等着&#xff0c;别无他法。说是卡着如果你看硬盘灯应该在读写。如果从 Win 10 无法正常回滚&#xff0c;只能是考虑备份数据后重装系统了。解决来方案一&#xff1a;管理员运行cmd&#xff1a;net stop WuAuServcd %windir%ren SoftwareDistribution SDoldnet start WuA…...

    2022/11/19 21:17:09
  35. 计算机配置更新不起,电脑提示“配置Windows Update请勿关闭计算机”怎么办?

    原标题&#xff1a;电脑提示“配置Windows Update请勿关闭计算机”怎么办&#xff1f;win7系统中在开机与关闭的时候总是显示“配置windows update请勿关闭计算机”相信有不少朋友都曾遇到过一次两次还能忍但经常遇到就叫人感到心烦了遇到这种问题怎么办呢&#xff1f;一般的方…...

    2022/11/19 21:17:08
  36. 计算机正在配置无法关机,关机提示 windows7 正在配置windows 请勿关闭计算机 ,然后等了一晚上也没有关掉。现在电脑无法正常关机...

    关机提示 windows7 正在配置windows 请勿关闭计算机 &#xff0c;然后等了一晚上也没有关掉。现在电脑无法正常关机以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容&#xff0c;让我们赶快一起来看一下吧&#xff01;关机提示 windows7 正在配…...

    2022/11/19 21:17:05
  37. 钉钉提示请勿通过开发者调试模式_钉钉请勿通过开发者调试模式是真的吗好不好用...

    钉钉请勿通过开发者调试模式是真的吗好不好用 更新时间:2020-04-20 22:24:19 浏览次数:729次 区域: 南阳 > 卧龙 列举网提醒您:为保障您的权益,请不要提前支付任何费用! 虚拟位置外设器!!轨迹模拟&虚拟位置外设神器 专业用于:钉钉,外勤365,红圈通,企业微信和…...

    2022/11/19 21:17:05
  38. 配置失败还原请勿关闭计算机怎么办,win7系统出现“配置windows update失败 还原更改 请勿关闭计算机”,长时间没反应,无法进入系统的解决方案...

    前几天班里有位学生电脑(windows 7系统)出问题了&#xff0c;具体表现是开机时一直停留在“配置windows update失败 还原更改 请勿关闭计算机”这个界面&#xff0c;长时间没反应&#xff0c;无法进入系统。这个问题原来帮其他同学也解决过&#xff0c;网上搜了不少资料&#x…...

    2022/11/19 21:17:04
  39. 一个电脑无法关闭计算机你应该怎么办,电脑显示“清理请勿关闭计算机”怎么办?...

    本文为你提供了3个有效解决电脑显示“清理请勿关闭计算机”问题的方法&#xff0c;并在最后教给你1种保护系统安全的好方法&#xff0c;一起来看看&#xff01;电脑出现“清理请勿关闭计算机”在Windows 7(SP1)和Windows Server 2008 R2 SP1中&#xff0c;添加了1个新功能在“磁…...

    2022/11/19 21:17:03
  40. 请勿关闭计算机还原更改要多久,电脑显示:配置windows更新失败,正在还原更改,请勿关闭计算机怎么办...

    许多用户在长期不使用电脑的时候&#xff0c;开启电脑发现电脑显示&#xff1a;配置windows更新失败&#xff0c;正在还原更改&#xff0c;请勿关闭计算机。。.这要怎么办呢&#xff1f;下面小编就带着大家一起看看吧&#xff01;如果能够正常进入系统&#xff0c;建议您暂时移…...

    2022/11/19 21:17:02
  41. 还原更改请勿关闭计算机 要多久,配置windows update失败 还原更改 请勿关闭计算机,电脑开机后一直显示以...

    配置windows update失败 还原更改 请勿关闭计算机&#xff0c;电脑开机后一直显示以以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容&#xff0c;让我们赶快一起来看一下吧&#xff01;配置windows update失败 还原更改 请勿关闭计算机&#x…...

    2022/11/19 21:17:01
  42. 电脑配置中请勿关闭计算机怎么办,准备配置windows请勿关闭计算机一直显示怎么办【图解】...

    不知道大家有没有遇到过这样的一个问题&#xff0c;就是我们的win7系统在关机的时候&#xff0c;总是喜欢显示“准备配置windows&#xff0c;请勿关机”这样的一个页面&#xff0c;没有什么大碍&#xff0c;但是如果一直等着的话就要两个小时甚至更久都关不了机&#xff0c;非常…...

    2022/11/19 21:17:00
  43. 正在准备配置请勿关闭计算机,正在准备配置windows请勿关闭计算机时间长了解决教程...

    当电脑出现正在准备配置windows请勿关闭计算机时&#xff0c;一般是您正对windows进行升级&#xff0c;但是这个要是长时间没有反应&#xff0c;我们不能再傻等下去了。可能是电脑出了别的问题了&#xff0c;来看看教程的说法。正在准备配置windows请勿关闭计算机时间长了方法一…...

    2022/11/19 21:16:59
  44. 配置失败还原请勿关闭计算机,配置Windows Update失败,还原更改请勿关闭计算机...

    我们使用电脑的过程中有时会遇到这种情况&#xff0c;当我们打开电脑之后&#xff0c;发现一直停留在一个界面&#xff1a;“配置Windows Update失败&#xff0c;还原更改请勿关闭计算机”&#xff0c;等了许久还是无法进入系统。如果我们遇到此类问题应该如何解决呢&#xff0…...

    2022/11/19 21:16:58
  45. 如何在iPhone上关闭“请勿打扰”

    Apple’s “Do Not Disturb While Driving” is a potentially lifesaving iPhone feature, but it doesn’t always turn on automatically at the appropriate time. For example, you might be a passenger in a moving car, but your iPhone may think you’re the one dri…...

    2022/11/19 21:16:57