背景:kafka 客户端之producer API发送消息以及简单源码分析

从Kafka 0.11开始,KafkaProducer又支持两种模式:幂等生产者和事务生产者。幂等生产者加强了Kafka的交付语义,从至少一次交付到精确一次交付。特别是生产者的重试将不再引入重复。事务性生产者允许应用程序原子地将消息发送到多个分区(和主题)。

幂等性

Kafka在 0.11 版本引入了一项重大特性,幂等性。所谓的幂等性就是指 Producer 不论向 Server 发送多少次重复数据,Server 端都只会持久化一条。
拿 http 举例来说,一次或多次请求,得到的响应是一致的(网络超时等问题除外),换句话说,就是执行多次操作与执行一次操作的影响是一样的。

如果,某个系统是不具备幂等性的,如果用户重复提交了某个表格,就可能会造成不良影响。例如:用户在浏览器上点击了多次提交订单按钮,会在后台生成多个一模一样的订单。

幂等性基本原理

在这里插入图片描述
对于Kafka来说,要解决的是生产者发送消息的幂等问题。在生产者生产消息时,如果出现 retry 时,有可能会一条消息被发送了多次,如果Kafka不具备幂等性的,就有可能会在partition中保存多条一模一样的消息。

为了实现生产者的幂等性,Kafka 引入了 Producer ID(PID)和 Sequence Number 的概念。

  • PID:每个 Producer 在初始化时,都会分配一个唯一的 PID,这个 PID 对用户来说,是透明的
  • Sequence Number:针对每个生产者(对应 PID )发送到指定主题分区的消息都对应一个从 0 开始递增的 Sequence Number,Server 端就是根据这个值来判断数据是否重复

在这里插入图片描述
producer初始化会由server端生成一个PID,然后发送每条信息都包含该PID和sequence number,在server端,是按照partition同样存放一个sequence numbers 信息,通过判断客户端发送过来的sequence number与server端number+1差值来决定数据是否重复或者漏掉。

当 Producer 发送消息给 Broker 时,Broker 接收到消息并将其追加到消息流中。此时,Broker 返回 Ack 信号给 Producer 时,发生异常导致 Producer 接收 Ack 信号失败。对于 Producer 来说,会触发重试机制,将消息再次发送,但是,由于引入了幂等性,在每条消息中附带了 PID(Producer ID)和Sequence Number。相同的 PID 和 Sequence Number 发送给 Broker,而之前 Broker 缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条,不会出现重复发送的情况。

生成PID的流程

//在执行创建事务时
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//会创建一个Sender,并启动线程,执行如下run方法
Sender{void run(long now) {if (transactionManager != null) {try {........if (!transactionManager.isTransactional()) {// 为idempotent producer生成一个producer idmaybeWaitForProducerId();} else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {

为什么要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于5

通常情况下为了保证数据顺序性,我们可以通过MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1来保证,这个也只是针对单实例。在kafka2.0+版本上,只要开启幂等性,不用设置这个参数也能保证发送数据的顺序性。

其实这里,要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于 5 的主要原因是:Server 端的 ProducerStateManager 实例会缓存每个 PID 在每个 Topic-Partition 上发送的最近 5 个batch 数据(这个 5 是写死的,至于为什么是 5,可能跟经验有关,当不设置幂等性时,当这个设置为 5 时,性能相对来说较高,社区是有一个相关测试文档),如果超过 5,ProducerStateManager 就会将最旧的 batch 数据清除。

假设应用将 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 设置为 6,假设发送的请求顺序是 1、2、3、4、5、6,这时候 server 端只能缓存 2、3、4、5、6 请求对应的 batch 数据,这时候假设请求 1 发送失败,需要重试,当重试的请求发送过来后,首先先检查是否为重复的 batch,这时候检查的结果是否,之后会开始 check 其 sequence number 值,这时候只会返回一个 OutOfOrderSequenceException 异常,client 在收到这个异常后,会再次进行重试,直到超过最大重试次数或者超时,这样不但会影响 Producer 性能,还可能给 Server 带来压力(相当于client 狂发错误请求)。

幂等性的注意事项

  • 幂等性 Producer 只能保证单分区上的幂等性:即只能保证某个主题上的一个分区上不出现重复消息,无法实现多个分区的幂等性
  • 幂等性 Producer 只能实现单会话上的幂等性,不能实现跨会话的幂等性

会话:Producer 进程的一次运行,如果重启 Producer 进程,将丢失幂等性保证

配置幂等性

properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);

要启用幂等(idempotence),必须将enable.idempotence配置设置为true。 如果设置,则retries(重试)配置将默认为Integer.MAX_VALUE,acks配置将默认为all,如果显性的将acks设置为0,-1,那么将会报错。

此外,如果send(ProducerRecord)即使在无限次重试的情况下也会返回错误(例如消息在发送前在缓冲区中过期),那么建议关闭生产者,并检查最后产生的消息的内容,以确保它不重复。最后,生产者只能保证单个会话内发送的消息的幂等性。

Exactly Once语义

将服务器的 ACK 级别设置为-1,可以保证 Producer 到 Server 之间不会丢失数据,即 At Least Once(至少一次) 语义。相对的,将服务器 ACK 级别设置为 0,可以保证生产者每条消息只会被发送一次,即At Most Once(最多一次) 语义。
At Least Once 可以保证数据不丢失,但是不能保证数据不重复;相对的,At Most Once 可以保证数据不重复,但是不能保证数据不丢失。但是,对于一些非常重要的信息,比如说交易数据,下游数据消费者要求数据既不重复也不丢失,即 Exactly Once(刚好一次) 语义。在 0.11 版本以前的 Kafka,对此是无能为力的,只能保证数据不丢失,再在下游消费者对数据做全局去重。对于多个下游应用的情况,每个都需要单独做全局去重,这就对性能造成了很大影响。

0.11 版本的 Kafka,引入了一项重大特性:幂等性。幂等性结合 At Least Once 语义,就构成了 Kafka 的Exactly Once 语义。即:
At Least Once + 幂等性 = Exactly Once
Kafka 的幂等性实现其实就是将原来下游需要做的去重放在了数据上游。

事务

幂等性并不能跨多个分区运作,而事务可以弥补这个缺陷。Kafka 事务是 2017 年 Kafka 0.11 引入的新特性。类似于数据库的事务。Kafka 事务指的是在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,生产者生产消息以及消费者提交 offset 的操作可以在一个原子操作中,要么都成功,要么都失败。尤其是在生产者、消费者并存时,事务的保障尤其重要。(consumer-transform-producer模式)

消费者提交偏移量导致重复消费消息的场景:消费者在消费消息完成提交偏移量o2之前挂掉了(假设它最近提交的偏移量是o1),此时执行再均衡时,其它消费者会重复消费消息(o1到o2之间的消息)。

事务的应用情况

在一个原子操作中,根据包含的操作类型,可以分为三种情况:

  • 只有Producer生产消息;
  • 消费消息和生产消息并存,这个是事务场景中最常用的情况,就是我们常说的“consume-transform-produce ”模式
  • 只有consumer消费消息

前两种情况是事务引入的场景,最后一种情况没有使用价值(跟使用手动提交效果一样)。

相关属性配置

使用kafka的事务API 时的一些注意事项:

  • 需要消费者的自动模式设置为false,并且不能子再手动的进行执行consumer#commitSync或者consumer#commitAsyc
  • 生产者配置transaction.id属性
  • 生产者不需要再配置enable.idempotence,因为如果配置了transaction.id,则此时enable.idempotence会被设置为true
  • 消费者需要配置Isolation.level。在consume-trnasform-produce模式下使用事务时,必须设置为READ_COMMITTED。

Producer事务

为了实现跨分区跨会话的事务,需要引入一个全局唯一的 Transaction ID,并将 Producer 获得的 PID 和 Transaction ID 绑定。这样当 Producer 重启后就可以通过正在进行的Transaction ID 获得原来的PID。
为了管理 Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和 Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。Transaction Coordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

要使用事务生产者和attendant API,必须设置transactional.id。如果设置了transactional.id,幂等性会和幂等所依赖的生产者配置一起自动启用。此外,应该对包含在事务中的topic进行耐久性配置。特别是,replication.factor应该至少是3,而且这些topic的min.insync.replicas应该设置为2。最后,为了实现从端到端的事务性保证,消费者也必须配置为只读取已提交的消息。

transactional.id的目的是实现单个生产者实例的多个会话之间的事务恢复。它通常是由分区、有状态的应用程序中的分片标识符派生的。因此,它对于在分区应用程序中运行的每个生产者实例来说应该是唯一的。

所有新的事务性API都是阻塞的,并且会在失败时抛出异常。

Producer接口中有关事务的方法定义

//producer提供的事务方法/*** 初始化事务。需要注意的有:* 1、前提* 需要保证transation.id属性被配置。* 2、这个方法执行逻辑是:*   (1)Ensures any transactions initiated by previous instances of the producer with the same*      transactional.id are completed. If the previous instance had failed with a transaction in*      progress, it will be aborted. If the last transaction had begun completion,*      but not yet finished, this method awaits its completion.*    (2)Gets the internal producer id and epoch, used in all future transactional*      messages issued by the producer.**/
public void initTransactions();/*** 开启事务*/
public void beginTransaction() throws ProducerFencedException ;/*** 为消费者提供的在事务内提交偏移量的操作*/
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId) throws ProducerFencedException ;/*** 提交事务*/
public void commitTransaction() throws ProducerFencedException;/*** 放弃事务,类似回滚事务的操作*/
public void abortTransaction() throws ProducerFencedException ;

创建生产者

配置transactional.id属性

public static Producer<String, String> createProducer() {Properties properties = new Properties();//配置文件里面的变量都是静态final类型的,并且都有默认的值//用于建立与 kafka 集群连接的 host/port//继承的hashtable,保证了线程安全properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP:9092");/*** producer 将试图批处理消息记录,以减少请求次数。这将改善 client 与 server 之间的性能。这项配置控制默认的批量处理消息字节数。* 不会试图处理大于这个字节数的消息字节数。发送到 brokers 的请求将包含多个批量处理,其中会包含对每个 partition 的一个请求。* 较小的批量处理数值比较少用,并且可能降低吞吐量(0 则会仅用批量处理)。较大的批量处理数值将会浪费更多内存空间,这样就需要分配特* 定批量处理数值的内存大小**/properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");/*** producer 组将会汇总任何在请求与发送之间到达的消息记录一个单独批量的请求。通常来说,这只有在记录产生速度大于发送速度的时候才* 能发生。然而,在某些条件下,客户端将希望降低请求的数量,甚至降低到中等负载一下。这项设置将通过增加小的延迟来完成--即,不是立即* 发送一条记录,producer 将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理。这可以认为是 TCP 种 Nagle 的算* 法类似。这项设置设定了批量处理的更高的延迟边界:一旦我们获得某个 partition 的batch.size,他将会立即发送而不顾这项设置,* 然而如果我们获得消息字节数比这项设置要小的多,我们需要“linger”特定的时间以获取更多的消息。 这个设置默认为 0,即没有延迟。设* 定 linger.ms=5,例如,将会减少请求数目,但是同时会增加 5ms 的延迟**/properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");/*** producer 可以用来缓存数据的内存大小。如果数据产生速度大于向 broker 发送的速度,将会耗尽这个缓存空间,producer* 会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和 producer 能够使用的总内存相关,但并不是一个* 硬性的限制,因为不是producer 使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些* 用于维护请求当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。**/properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");/*** 该配置控制 KafkaProducer's send(),partitionsFor(),inittransaction (),sendOffsetsToTransaction(),commitTransaction() "* 和abortTransaction()方法将阻塞。对于send(),此超时限制了获取元数据和分配缓冲区的总等待时间"**/properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,"5000");//将消息发送到kafka server, 所以肯定需要用到序列化的操作  我们这里发送的消息是string类型的,所以使用string的序列化类properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//设置事务ID  如果配置了transactional.id属性,则enable.idempotence 会被设置为true.properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"my-transactional-id");return new KafkaProducer<>(properties);}

创建消费者

  • 将配置中的自动提交属性(auto.commit)进行关闭
  • 而且在代码里面也不能使用手动提交commitSync( )或者commitAsync( )
  • 设置isolation.level
/*** 需要:* 1、关闭自动提交 enable.auto.commit* 2、isolation.level为 read_committed* @return*/
public static Consumer createConsumer() {Properties properties = new Properties();// bootstrap.servers是Kafka集群的IP地址。多个时,使用逗号隔开properties.put("bootstrap.servers", "IP:9092");// 消费者群组properties.put("group.id", "groupxt");// 设置隔离级别properties.put("isolation.level","read_committed");// 关闭自动提交properties.put("enable.auto.commit", "false");properties.put("session.timeout.ms", "30000");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");return new KafkaConsumer<String, String>(properties);
}

只有Producer生产消息

/*Producer异步发送带回调函数(事务)  在一个事务只有生产消息操作
*/
public static void onlyProduceInTransaction(){Producer<String,String> producer = ProducerTransaction.createProducer();// 1.初始化事务producer.initTransactions();try {// 2.开启事务producer.beginTransaction();// 3.kafka写操作集合// 3.1 do业务逻辑// 3.2 发送消息// 消息对象 - ProducerRecoderfor(int i=0;i<10;i++){ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);//就是多传入一个回调实例/*** 用户可以实现的回调接口,以允许代码在请求完成时执行。 这个回调通常会在后台 I/O 线程中执行,所以它应该很快。**/producer.send(record, new Callback() {/*** 用户可以实现的回调方法,以提供对请求完成的异步处理。 当发送到服务器的记录被确认时,将调用此方法。 当回调中的异常不为空时,元数据将包含除 topicPartition 之外的所有字段的特殊 -1 值,这将是有效的。* 参形:* metadata – 已发送记录的元数据(即分区和偏移量)。 如果发生错误,将返回除 topicPartition 之外所有其他字段都为-1的 的元数据,* exception – 在处理此记录期间引发的异常。 如果没有发生错误,则为 Null。* 可能抛出的异常包括: 不可重试异常(致命,永远不会发送消息):* InvalidTopicException OffsetMetadataTooLargeException RecordBatchTooLargeException RecordTooLargeException UnknownServerException UnknownProducerIdException InvalidProducerEpochException* 可重试异常(可以通过增加retries来覆盖): CorruptRecordException InvalidMetadataException NotEnoughReplicasAfterAppendException NotEnoughOutReplicasException Offset**/@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("partition : "+recordMetadata.partition()+" , offset : "+recordMetadata.offset());}});}// 3.3 do其他业务逻辑,还可以发送其他topic的消息。// 4.事务提交producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// We can't recover from these exceptions, so our only option is to close the producer and exit.producer.close();} catch (KafkaException e) {// For all other exceptions, just abort the transaction and try again.// 5.放弃事务producer.abortTransaction();}finally{// 所有的通道打开都需要关闭  close方法会会将缓存队列状态置为关闭,唤醒io线程将内存中的数据发往broker,避免这个程序的进程突然挂掉,然后内存里面的消息丢失,所以这个方法结束的时候,将消息数据都发送出去producer.close();}}

只有consumer消费消息

/*** 在一个事务只有消息操作*/
public static void onlyConsumeInTransaction() {// 1.构建上产者Producer<String,String> producer = ProducerTransaction.createProducer();// 2.初始化事务(生成productId),对于一个生产者,只能执行一次初始化事务操作producer.initTransactions();// 3.构建消费者和订阅主题Consumer consumer = createConsumer();consumer.subscribe(Arrays.asList("xt"));while (true) {// 4.开启事务producer.beginTransaction();// 5.1 接受消息Duration duration = Duration.ofMillis(500);ConsumerRecords<String, String> records = consumer.poll(duration);try {// 5.2 do业务逻辑;System.out.println("customer Message---");Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();for (ConsumerRecord<String, String> record : records) {// 5.2.1 处理消息 print the offset,key and value for the consumer records.System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());// 5.2.2 记录提交偏移量commits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset()));}// 6.提交偏移量producer.sendOffsetsToTransaction(commits, "groupxt");// 7.事务提交producer.commitTransaction();}catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// We can't recover from these exceptions, so our only option is to close the producer and exit.producer.close();} catch (KafkaException e) {// For all other exceptions, just abort the transaction and try again.// 8.放弃事务System.out.println(e.getMessage());producer.abortTransaction();}finally{producer.flush();}}}

消费消息和生产消息并存(consume-transform-produce)

在一个事务中,既有生产消息操作又有消费消息操作,即常说的Consume-tansform-produce模式。如下实例代码

/*** 在一个事务内,即有生产消息又有消费消息*/
public static void consumeTransferProduce() {// 1.构建上产者Producer<String,String> producer = ProducerTransaction.createProducer();// 2.初始化事务(生成productId),对于一个生产者,只能执行一次初始化事务操作producer.initTransactions();// 3.构建消费者和订阅主题Consumer consumer = createConsumer();consumer.subscribe(Arrays.asList("xt"));while (true) {// 4.开启事务producer.beginTransaction();// 5.1 接受消息Duration duration = Duration.ofMillis(5000);ConsumerRecords<String, String> records = consumer.poll(duration);System.out.println(records.count());try {// 5.2 do业务逻辑;System.out.println("customer Message---");Map<TopicPartition, OffsetAndMetadata> commits = new HashMap<>();for (ConsumerRecord<String, String> record : records) {// 5.2.1 读取消息,并处理消息。print the offset,key and value for the consumer records.System.out.printf("offset = %d, key = %s, value = %s\n",record.offset(), record.key(), record.value());// 5.2.2 记录提交的偏移量commits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset()));// 6.生产新的消息。比如外卖订单状态的消息,如果订单成功,则需要发送跟商家结转消息或者派送员的提成消息producer.send(new ProducerRecord<String, String>("xt", "data"));}// 7.提交偏移量producer.sendOffsetsToTransaction(commits, "groupxt");// 8.事务提交producer.commitTransaction();} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {// We can't recover from these exceptions, so our only option is to close the producer and exit.producer.close();} catch (KafkaException e) {// For all other exceptions, just abort the transaction and try again.// 7.放弃事务producer.abortTransaction();}finally{producer.flush();}}
}

Consumer事务

上述事务机制主要是从 Producer 方面考虑,对于 Consumer 而言,事务的保证就会相对较弱,尤其是无法保证 Commit 的信息被精确消费。这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

相关配置文件字段

Broker configs

配置项描述
transactional.id.timeout.mstransaction coordinator没有从生产者哪里接收到任何事务状态更新的等待时间, 过了时间之后会主动地使生产者。
Transactional Id过期默认为604800000(7天)。这允许制作人每周定期工作来维护他们的id
max.transaction.timeout.ms事务允许的最大超时时间。
如果客户端请求的事务时间超过这个值,那么broker将在InitPidRequest中返回一个InvalidTransactionTimeout错误。这可以防止客户端超时时间过长,否则会导致客户读取事务中包含的Topic 时出现停顿。
默认为900000(15分钟)。这是需要发送事务消息的时间的保守上限。
transaction.state.log.replication.factor事务状态主题的副本数量。默认为3
transaction.state.log.num.partitions事务状态主题的分区数。默认为50
transaction.state.log.min.isr事务状态主题的每个分区需要考虑在线的insync副本的最小数量。默认为2
transaction.state.log.segment.bytes事务状态主题的段大小。Default: 104857600 bytes.

Producer configs

配置项描述
enable.idempotence是否启用幂等(默认为false)。
如果禁用,生产者将不设置PID字段在生产请求和当前生产者交付语义中。注意,为了使用事务,必须启用幂等性。当幂等性启用时,我们强制acks=all,retries > 1,并且max. flight.requests.per.connection=1。 如果这些配置没有这些值,我们就不能保证幂等性。
如果这些设置没有被应用程序显式覆盖,当幂等功能启用时,生产者将设置acks=all, retries=Integer.MAX_VALUE, max.inflight.requests.per.connection=1
transaction.timeout.ms事务协调器在主动中止正在进行的事务之前,等待事务状态更新的最长时间,这个配置值将与InitPidRequest一起发送到事务协调器。
如果该值大于代理中设置的max.transaction.timeout.ms,则请求将失败,并出现InvalidTransactionTimeout错误。
默认是60000ms。这使得事务不会阻塞下游消费超过一分钟,这在实时应用中通常是允许的。
transactional.id用于事务交付的TransactionalId。
这支持跨多个生产者会话的可靠性语义,因为它允许客户端保证使用相同TransactionalId的事务在启动任何新事务之前已经完成。
如果没有提供TransactionalId,那么生产者将被限制为幂等交付。
如果配置了transactional.id属性,则enable.idempotence 会被设置为true.

Consumer configs

配置项描述
isolation.level(default is read_uncommitted)
read_uncommitted:按照偏移量顺序使用已提交和未提交的消息。
read_committed:只使用非事务性消息或按偏移顺序提交的事务性消息。为了保持偏移顺序,这个设置意味着我们必须在消费者中缓冲消息,直到我们看到给定事务中的所有消息。

幂等性和事务性的关系

事务属性实现前提是幂等性,即在配置事务属性transaction id时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。

  • 幂等性引入了Porducer ID
  • 事务属性引入了Transaction Id属性

transactionalId 、producerId 和 producerEpoch
在这里插入图片描述
一个app有一个tid,同一个应用的不同实例PID是一样的,只是epoch的值不同。
在这里插入图片描述

对于同一个事务ID,先保证epoch小的producer执行init-transaction和committransaction,然后epoch较大的procuder才能开始执行init-transaction和commit-transaction,如下顺序:
在这里插入图片描述
有了transactionId后,Kafka可保证:

跨Session的数据幂等发送。当具有相同Transaction ID的新的Producer实例被创建且工作时,旧的且拥有相同Transaction ID的Producer将不再工作。kafka保证了关联同一个事务的所有producer(一个应用有多个实例)必须按照顺序初始化事务、和提交事务,否则就会有问题,这保证了同一事务ID中消息是有序的(不同实例得按顺序创建事务和提交事务)。

spring-kafka的事务设置

kafka是跨Session的数据幂等发送,即如果应用部署多个实例时常会遇到“org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer’s transaction has been expired by the broker.”,必须保证这些实例生成者的提交事务顺序和创建顺序保持一致才可以,否则就无法成功。其实,在实践中,我们更多的是如何实现对应用单实例的事务性。可以通过spring-kafaka实现思路来学习,即每次创建生成者都设置一个不同的transactionId的值,如下代码:

====================================
类名:ProducerFactoryUtils
====================================
/*** Obtain a Producer that is synchronized with the current transaction, if any.* @param producerFactory the ConnectionFactory to obtain a Channel for* @param <K> the key type.* @param <V> the value type.* @return the resource holder.*/public static <K, V> KafkaResourceHolder<K, V> getTransactionalResourceHolder(final ProducerFactory<K, V> producerFactory) {Assert.notNull(producerFactory, "ProducerFactory must not be null");// 1.对于每一个线程会生成一个唯一key,然后根据key去查找resourceHolder@SuppressWarnings("unchecked")KafkaResourceHolder<K, V> resourceHolder = (KafkaResourceHolder<K, V>) TransactionSynchronizationManager.getResource(producerFactory);if (resourceHolder == null) {// 2.创建一个消费者Producer<K, V> producer = producerFactory.createProducer();// 3.开启事务producer.beginTransaction();resourceHolder = new KafkaResourceHolder<K, V>(producer);bindResourceToTransaction(resourceHolder, producerFactory);}return resourceHolder;}

在spring-kafka中,对于一个线程创建一个producer,事务提交之后,还会关闭这个producer并清除,后续同一个线程或者新的线程重新执行事务时,此时就会重新创建producer。

创建消费者代码

====================================
类名:DefaultKafkaProducerFactory
====================================
protected Producer<K, V> createTransactionalProducer() {Producer<K, V> producer = this.cache.poll();if (producer == null) {Map<String, Object> configs = new HashMap<>(this.configs);// 对于每一次生成producer时,都设置一个不同的transactionIdconfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());producer = new KafkaProducer<K, V>(configs, this.keySerializer, this.valueSerializer);// 1.初始化话事务。producer.initTransactions();return new CloseSafeProducer<K, V>(producer, this.cache);}else {return producer;}}

Consume-transform-Produce 的流程图

在这里插入图片描述

流程1 :查找Tranaction Corordinator。

Producer向任意一个brokers发送 FindCoordinatorRequest请求来获取Transaction Coordinator的地址。

流程2:初始化事务 initTransaction

Producer发送InitpidRequest给事务协调器,获取一个Pid。InitpidRequest的处理过程是同步阻塞的,一旦该调用正确返回,Producer就可以开始新的事务。TranactionalId通过InitpidRequest发送给Tranciton Corordinator,然后在Tranaciton Log中记录这<TranacionalId,pid>的映射关系。除了返回PID之外,还具有如下功能:

对PID对应的epoch进行递增,这样可以保证同一个app的不同实例对应的PID是一样的,但是epoch是不同的。
回滚之前的Producer未完成的事务(如果有)。
流程3: 开始事务beginTransaction

执行Producer的beginTransacion(),它的作用是Producer在本地记录下这个transaction的状态为开始状态。

注意:这个操作并没有通知Transaction Coordinator。

流程4: Consume-transform-produce loop

流程4.0: 通过Consumtor消费消息,处理业务逻辑

流程4.1: producer向TransactionCordinantro发送AddPartitionsToTxnRequest

在producer执行send操作时,如果是第一次给<topic,partion>发送数据,此时会向Trasaction Corrdinator发送一个AddPartitionsToTxnRequest请求,Transaction Corrdinator会在transaction log中记录下tranasactionId和<topic,partion>一个映射关系,并将状态改为begin。AddPartionsToTxnRequest的数据结构如下:

AddPartitionsToTxnRequest => TransactionalId PID Epoch [Topic [Partition]]TransactionalId => stringPID => int64Epoch => int16Topic => stringPartition => int32

流程4.2: producer#send发送 ProduceRequst

生产者发送数据,虽然没有还没有执行commit或者absrot,但是此时消息已经保存到kafka上,而且即使后面执行abort,消息也不会删除,只是更改状态字段标识消息为abort状态。

流程4.3: AddOffsetCommitsToTxnRequest

Producer通过KafkaProducer.sendOffsetsToTransaction 向事务协调器器发送一个AddOffesetCommitsToTxnRequests:

AddOffsetsToTxnRequest => TransactionalId PID Epoch ConsumerGroupIDTransactionalId => stringPID => int64Epoch => int16ConsumerGroupID => string

在执行事务提交时,可以根据ConsumerGroupID来推断_customer_offsets主题中相应的TopicPartions信息。这样在

流程4.4: TxnOffsetCommitRequest

Producer通过KafkaProducer.sendOffsetsToTransaction还会向消费者协调器Cosumer Corrdinator发送一个TxnOffsetCommitRequest,在主题_consumer_offsets中保存消费者的偏移量信息。

TxnOffsetCommitRequest   => ConsumerGroupID PIDEpochRetentionTimeOffsetAndMetadata ConsumerGroupID => stringPID => int64Epoch => int32RetentionTime => int64OffsetAndMetadata => [TopicName [Partition Offset Metadata]]TopicName => stringPartition => int32Offset => int64Metadata => string

流程5: 事务提交和事务终结(放弃事务)

通过生产者的commitTransaction或abortTransaction方法来提交事务和终结事务,这两个操作都会发送一个EndTxnRequest给Transaction Coordinator。

流程5.1:EndTxnRequest。Producer发送一个EndTxnRequest给Transaction Coordinator,然后执行如下操作:

  • Transaction Coordinator会把PREPARE_COMMIT or PREPARE_ABORT
    消息写入到transaction log中记录
  • 执行流程5.2
  • 执行流程5.3

流程5.2:WriteTxnMarkerRequest

WriteTxnMarkersRequest => [CoorinadorEpoch PID Epoch Marker [Topic [Partition]]]CoordinatorEpoch => int32PID => int64Epoch => int16Marker => boolean (false(0) means ABORT, true(1) means COMMIT)Topic => stringPartition => int32
  • 对于Producer生产的消息。Tranaction Coordinator会发送WriteTxnMarkerRequest给当前事务涉及到每个<topic,partion>的leader,leader收到请求后,会写入一个COMMIT(PID) 或者 ABORT(PID)的控制信息到data log中
  • 对于消费者偏移量信息,如果在这个事务里面包含_consumer-offsets主题。Tranaction
    Coordinator会发送WriteTxnMarkerRequest给Transaction Coordinartor,Transaction Coordinartor收到请求后,会写入一个COMMIT(PID) 或者ABORT(PID)的控制信息到 data log中。

流程5.3:Transaction Coordinator会将最终COMPLETE_COMMIT或COMPLETE_ABORT消息写入Transaction Log中以标明该事务结束。

  • 只会保留这个事务对应的PID和timstamp。然后把当前事务其他相关消息删除掉,包括PID和tranactionId的映射关系。

References:

  • https://blog.csdn.net/weixin_44758876/article/details/120195566
  • https://www.cnblogs.com/fnlingnzb-learner/p/13646390.html
  • https://www.orchome.com/303
  • https://blog.csdn.net/looo000ngname/article/details/107183144
  • http://www.heartthinkdo.com/?p=2040#4

(写博客主要是对自己学习的归纳整理,资料大部分来源于书籍、网络资料和自己的实践,整理不易,但是难免有不足之处,如有错误,请大家评论区批评指正。同时感谢广大博主和广大作者辛苦整理出来的资源和分享的知识。)

查看全文
如若内容造成侵权/违法违规/事实不符,请联系编程学习网邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!

相关文章

  1. 图腾的由来

    如今的世界&#xff0c;还是科技领导着的时候&#xff0c;大家都用心来发展科技生产力。这些特性便是大家用共同的经历铸就而来的&#xff0c;遇强则强遇难而上永不退缩。...

    2024/4/19 18:24:40
  2. 出圈-java

    题目描述&#xff1a; 设有n个人围坐一圈并按顺时针方向从1到n编号&#xff0c;从第1个人开始进行1到m的报数&#xff0c;报数到第个m人&#xff0c;此人出圈&#xff0c;再从他的下一个人重新开始1到m的报数&#xff0c;如此进行下去直到所剩下一人为止。 输入&#xff1a; 输…...

    2024/4/16 21:31:22
  3. Unity 如何延时运行或者重复运行函数?

    1.Invoke(函数名称字符串,几秒种后执行); 这种调用方法只会执行一次。 2.InvokeRepeating(函数名称字符串,几秒钟后开始,间隔秒数);这种调用方法就像一个计时器&#xff0c;是执行多次的。 具体参考 https://jingyan.baidu.com/article/0eb457e5ccbf8f03f0a9057a.html...

    2024/4/13 13:52:34
  4. build_process-webpack的构建流程

    webpack的构建流程? 一、运行流程 webpack 的运行流程是一个串行的过程&#xff0c;它的工作流程就是将各个插件串联起来 在运行过程中会广播事件&#xff0c;插件只需要监听它所关心的事件&#xff0c;就能加入到这条webpack机制中&#xff0c;去改变webpack的运作&#xf…...

    2024/4/14 1:30:40
  5. pytorch本地图片数据集加载成字典

    from torchvision.transforms import transforms import pandas as pdfile_pathos.path.join(rE:\python存储\leaves\images) Image_list[] labels_list[] a{}for i in range(0,18352):image_pathos.path.join(rE:\python存储\leaves\images,{}.jpg.format(i))imageImage.open(…...

    2024/4/13 13:52:54
  6. 如何改变晶振频率?

    简 介&#xff1a; 本文就拆开的一个普通的晶体进行观察&#xff0c;构建了 Colpittz振荡电路&#xff0c;并对晶体表面留下记号笔痕迹后谐振频率的改变进行了测量。这种方法可以对晶体的谐振频率进行修改&#xff0c;反过来也可以用于测量微小质量的改变。 关键词&#xff1a;…...

    2024/4/13 13:52:29
  7. java 重载、重写 构造函数详解

    1、重写只能出现在继承关系之中。当一个类继承它的父类方法时&#xff0c;都有机会重写该父类的方法。一个特例是父类的方法被标识为final。重写的主要优点是能够定义某个子类型特有的行为。 复制代码 class Animal { public void eat(){ System.out.println ("Animal is …...

    2024/4/19 22:18:10
  8. AcWing 1875. 贝茜的报复(数学+暴力枚举)

    题目连接 https://www.acwing.com/problem/content/1877/ 思路 我们顺着不太好计算&#xff0c;所以我们反着计算&#xff0c;计算出所有满足条件的奇数个数&#xff0c;然后相乘就好了&#xff0c;复杂度为O(N4)O(N^4)O(N4), 代码 #include<bits/stdc.h> using nam…...

    2024/4/13 13:52:34
  9. AcWing 1904. 奶牛慢跑(单调栈)

    题目连接 https://www.acwing.com/problem/content/1906/ 思路 如果一个牛比前面的牛跑的还慢那么就可以把这两个牛合并&#xff0c;如果相同的话就不要合并了&#xff0c;所以我们最后构造的就是一个速度非严格单调递增的序列 #include<bits/stdc.h> using namespac…...

    2024/4/7 21:28:43
  10. JavaScript 基础笔记(pink老师)-- 对象

    JavaScript 基础笔记&#xff08;pink老师&#xff09;-- 对象...

    2024/4/15 5:17:59
  11. 数学建模竞赛时间表(全年11场数模竞赛)

    大家好&#xff0c;我是北海。今天整理了下常见的数模竞赛时间&#xff0c;按竞赛正常举办时间顺序排序。 1、美国大学生数学建模竞赛 主办方&#xff1a;美国数学及其应用联合会 竞赛时间&#xff1a;寒假期间&#xff0c;春节前后 2、MathorCup高校数学建模挑战赛 主办方…...

    2024/4/13 13:53:29
  12. sqli-labs(Less62-65)布尔类型脚本

    Less-62Less-63Less-64Less-65Less-62 import requests from lxml import etree""" Less-62布尔类型爆破脚本 改源码$times 13000,重置一下challenges数据库,然后启动程序&#xff0c;包没下先pip下载 原理是按照payload循环字典&#xff0c;根据响应的长度&am…...

    2024/4/16 8:40:42
  13. (未学习)从零开始学习C++ Day 016

    本科郑州大学应用化学系&#xff0c;研究生福州大学物理化学&#xff0c;后从事营地教育两年&#xff0c;现在跨专业考研至计算机。 写这个文章记录的初衷是希望通过这样的方式来监督自己每日学习一定量的编程保持练习&#xff0c;虽然初试的成绩还未出。但只要有一线希望自然…...

    2024/4/20 12:12:31
  14. redis夺命连环问11--集群模式工作原理能说一下么?集群模式下的key是如何寻址的?分布式寻址都有哪些算法?了解一致性hash算法吗?

    目录Redis集群模式的工作原理能说一下么&#xff1f;首先谈数据分区规则再谈分区具体方案最后谈实例通信机制盲目扩大集群规模&#xff0c;实例间的通信量也会增加&#xff0c;集群性能变慢&#xff0c;如何降低实例间的通信开销&#xff1f;在集群模式下&#xff0c;redis 的 …...

    2024/4/30 16:52:10
  15. 02.25 Chat Servers Outgoing Traffic聊天服务流量输出

    聊天服务器传出流量|断续器 (jxnu.edu.cn)https://acs.jxnu.edu.cn/problem/CF5A 描述&#xff1a; Polycarp is working on a new project called "Polychat". Following modern tendencies in IT, he decided, that this project should contain chat as well. To…...

    2024/4/18 12:53:46
  16. JAVA入门-面向对象

    类&#xff1a;是对一切相似事物的统称.是一个泛泛的概念。可以用来描述事物的特征和行为 对象&#xff1a;就是一个具体的实体。 类和对象的关系&#xff1a; 对象是类的具体的实现&#xff0c;在类下面延伸出来具体对象 在java中先有类 再有对象 一、声明一个类 语法格…...

    2024/4/13 15:56:10
  17. Redis安装和启动所遇到的问题

    Redis安装和启动所遇到的问题 1、到redis官网https://redis.io/下载所需版本的压缩包&#xff08;源码包&#xff09; 2、将linux版本的redis上传到指定服务器目录&#xff08;我的目录是&#xff1a; /usr/java&#xff09; 3、将压缩包进行解压&#xff0c;解压到指定目录&a…...

    2024/4/13 13:53:49
  18. AcWing 1929. 镜子田地(DFS)

    题面连接 https://www.acwing.com/problem/content/description/1931/ 思路 因为光路可逆&#xff0c;那么对于每一个点的方向就是固定的&#xff0c;也就是入射角和反射角&#xff0c;那么对于每一个点的走向我们都是可以提前预知的&#xff0c;所以我们直接对每个外圈点进…...

    2024/4/13 13:53:34
  19. javaWeb学习笔记3—Tomcat

    五.Tomcat 学习视频地址 Tomcat五.Tomcat1.概念2.web资源分类3.常用的web服务器4.Tomcat服务器和Servlet版本的对应关系5.tomcat安装和使用6.修改端口号7.部署web工程到Tomcat8.idea配置tomcat9.使用maven配置1.概念 2.web资源分类 3.常用的web服务器 4.Tomcat服务器和Servlet…...

    2024/4/13 13:53:44
  20. STM32学习笔记1

    软件安装 STM32 CubeMx安装 STM32CubeMX 是 ST 意法半导体近几年来大力推荐的STM32 芯片图形化配置工具&#xff0c;目的就是为了方便开发者&#xff0c; 允许用户使用图形化向导生成C 初始化代码&#xff0c;可以大大减轻开发工作&#xff0c;时间和费用&#xff0c;提高开发…...

    2024/5/2 20:21:21

最新文章

  1. Maven 在项目的 pom.xml 文件中 指定 阿里云的景象仓库

    配置 在 项目的 pom.xml 文件中添加如下配置即可 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation&…...

    2024/5/6 6:14:51
  2. 梯度消失和梯度爆炸的一些处理方法

    在这里是记录一下梯度消失或梯度爆炸的一些处理技巧。全当学习总结了如有错误还请留言&#xff0c;在此感激不尽。 权重和梯度的更新公式如下&#xff1a; w w − η ⋅ ∇ w w w - \eta \cdot \nabla w ww−η⋅∇w 个人通俗的理解梯度消失就是网络模型在反向求导的时候出…...

    2024/3/20 10:50:27
  3. [C++/Linux] UDP编程

    一. UDP函数 UDP&#xff08;用户数据报协议&#xff0c;User Datagram Protocol&#xff09;是一种无连接的网络协议&#xff0c;用于在互联网上交换数据。它允许应用程序发送数据报给另一端的应用程序&#xff0c;但不保证数据报能成功到达&#xff0c;也就是说&#xff0c;它…...

    2024/5/3 8:59:07
  4. 设计模式:组合模式

    定义 组合模式(Composite Pattern)是一种结构型设计模式,它允许你将对象组合成树形结构来表示“部分-整体”的层次结构。组合模式使得客户端可以统一对待单个对象和组合对象。 应用场景 组合模式适用于以下场景: 表达对象的部分-整体层次结构:当你想要表示对象的部分-整…...

    2024/5/3 22:02:39
  5. 【外汇早评】美通胀数据走低,美元调整

    原标题:【外汇早评】美通胀数据走低,美元调整昨日美国方面公布了新一期的核心PCE物价指数数据,同比增长1.6%,低于前值和预期值的1.7%,距离美联储的通胀目标2%继续走低,通胀压力较低,且此前美国一季度GDP初值中的消费部分下滑明显,因此市场对美联储后续更可能降息的政策…...

    2024/5/4 23:54:56
  6. 【原油贵金属周评】原油多头拥挤,价格调整

    原标题:【原油贵金属周评】原油多头拥挤,价格调整本周国际劳动节,我们喜迎四天假期,但是整个金融市场确实流动性充沛,大事频发,各个商品波动剧烈。美国方面,在本周四凌晨公布5月份的利率决议和新闻发布会,维持联邦基金利率在2.25%-2.50%不变,符合市场预期。同时美联储…...

    2024/5/4 23:54:56
  7. 【外汇周评】靓丽非农不及疲软通胀影响

    原标题:【外汇周评】靓丽非农不及疲软通胀影响在刚结束的周五,美国方面公布了新一期的非农就业数据,大幅好于前值和预期,新增就业重新回到20万以上。具体数据: 美国4月非农就业人口变动 26.3万人,预期 19万人,前值 19.6万人。 美国4月失业率 3.6%,预期 3.8%,前值 3…...

    2024/5/4 23:54:56
  8. 【原油贵金属早评】库存继续增加,油价收跌

    原标题:【原油贵金属早评】库存继续增加,油价收跌周三清晨公布美国当周API原油库存数据,上周原油库存增加281万桶至4.692亿桶,增幅超过预期的74.4万桶。且有消息人士称,沙特阿美据悉将于6月向亚洲炼油厂额外出售更多原油,印度炼油商预计将每日获得至多20万桶的额外原油供…...

    2024/5/4 23:55:17
  9. 【外汇早评】日本央行会议纪要不改日元强势

    原标题:【外汇早评】日本央行会议纪要不改日元强势近两日日元大幅走强与近期市场风险情绪上升,避险资金回流日元有关,也与前一段时间的美日贸易谈判给日本缓冲期,日本方面对汇率问题也避免继续贬值有关。虽然今日早间日本央行公布的利率会议纪要仍然是支持宽松政策,但这符…...

    2024/5/4 23:54:56
  10. 【原油贵金属早评】欧佩克稳定市场,填补伊朗问题的影响

    原标题:【原油贵金属早评】欧佩克稳定市场,填补伊朗问题的影响近日伊朗局势升温,导致市场担忧影响原油供给,油价试图反弹。此时OPEC表态稳定市场。据消息人士透露,沙特6月石油出口料将低于700万桶/日,沙特已经收到石油消费国提出的6月份扩大出口的“适度要求”,沙特将满…...

    2024/5/4 23:55:05
  11. 【外汇早评】美欲与伊朗重谈协议

    原标题:【外汇早评】美欲与伊朗重谈协议美国对伊朗的制裁遭到伊朗的抗议,昨日伊朗方面提出将部分退出伊核协议。而此行为又遭到欧洲方面对伊朗的谴责和警告,伊朗外长昨日回应称,欧洲国家履行它们的义务,伊核协议就能保证存续。据传闻伊朗的导弹已经对准了以色列和美国的航…...

    2024/5/4 23:54:56
  12. 【原油贵金属早评】波动率飙升,市场情绪动荡

    原标题:【原油贵金属早评】波动率飙升,市场情绪动荡因中美贸易谈判不安情绪影响,金融市场各资产品种出现明显的波动。随着美国与中方开启第十一轮谈判之际,美国按照既定计划向中国2000亿商品征收25%的关税,市场情绪有所平复,已经开始接受这一事实。虽然波动率-恐慌指数VI…...

    2024/5/4 23:55:16
  13. 【原油贵金属周评】伊朗局势升温,黄金多头跃跃欲试

    原标题:【原油贵金属周评】伊朗局势升温,黄金多头跃跃欲试美国和伊朗的局势继续升温,市场风险情绪上升,避险黄金有向上突破阻力的迹象。原油方面稍显平稳,近期美国和OPEC加大供给及市场需求回落的影响,伊朗局势并未推升油价走强。近期中美贸易谈判摩擦再度升级,美国对中…...

    2024/5/4 23:54:56
  14. 【原油贵金属早评】市场情绪继续恶化,黄金上破

    原标题:【原油贵金属早评】市场情绪继续恶化,黄金上破周初中国针对于美国加征关税的进行的反制措施引发市场情绪的大幅波动,人民币汇率出现大幅的贬值动能,金融市场受到非常明显的冲击。尤其是波动率起来之后,对于股市的表现尤其不安。隔夜美国股市出现明显的下行走势,这…...

    2024/5/6 1:40:42
  15. 【外汇早评】美伊僵持,风险情绪继续升温

    原标题:【外汇早评】美伊僵持,风险情绪继续升温昨日沙特两艘油轮再次发生爆炸事件,导致波斯湾局势进一步恶化,市场担忧美伊可能会出现摩擦生火,避险品种获得支撑,黄金和日元大幅走强。美指受中美贸易问题影响而在低位震荡。继5月12日,四艘商船在阿联酋领海附近的阿曼湾、…...

    2024/5/4 23:54:56
  16. 【原油贵金属早评】贸易冲突导致需求低迷,油价弱势

    原标题:【原油贵金属早评】贸易冲突导致需求低迷,油价弱势近日虽然伊朗局势升温,中东地区几起油船被袭击事件影响,但油价并未走高,而是出于调整结构中。由于市场预期局势失控的可能性较低,而中美贸易问题导致的全球经济衰退风险更大,需求会持续低迷,因此油价调整压力较…...

    2024/5/4 23:55:17
  17. 氧生福地 玩美北湖(上)——为时光守候两千年

    原标题:氧生福地 玩美北湖(上)——为时光守候两千年一次说走就走的旅行,只有一张高铁票的距离~ 所以,湖南郴州,我来了~ 从广州南站出发,一个半小时就到达郴州西站了。在动车上,同时改票的南风兄和我居然被分到了一个车厢,所以一路非常愉快地聊了过来。 挺好,最起…...

    2024/5/4 23:55:06
  18. 氧生福地 玩美北湖(中)——永春梯田里的美与鲜

    原标题:氧生福地 玩美北湖(中)——永春梯田里的美与鲜一觉醒来,因为大家太爱“美”照,在柳毅山庄去寻找龙女而错过了早餐时间。近十点,向导坏坏还是带着饥肠辘辘的我们去吃郴州最富有盛名的“鱼头粉”。说这是“十二分推荐”,到郴州必吃的美食之一。 哇塞!那个味美香甜…...

    2024/5/4 23:54:56
  19. 氧生福地 玩美北湖(下)——奔跑吧骚年!

    原标题:氧生福地 玩美北湖(下)——奔跑吧骚年!让我们红尘做伴 活得潇潇洒洒 策马奔腾共享人世繁华 对酒当歌唱出心中喜悦 轰轰烈烈把握青春年华 让我们红尘做伴 活得潇潇洒洒 策马奔腾共享人世繁华 对酒当歌唱出心中喜悦 轰轰烈烈把握青春年华 啊……啊……啊 两…...

    2024/5/4 23:55:06
  20. 扒开伪装医用面膜,翻六倍价格宰客,小姐姐注意了!

    原标题:扒开伪装医用面膜,翻六倍价格宰客,小姐姐注意了!扒开伪装医用面膜,翻六倍价格宰客!当行业里的某一品项火爆了,就会有很多商家蹭热度,装逼忽悠,最近火爆朋友圈的医用面膜,被沾上了污点,到底怎么回事呢? “比普通面膜安全、效果好!痘痘、痘印、敏感肌都能用…...

    2024/5/5 8:13:33
  21. 「发现」铁皮石斛仙草之神奇功效用于医用面膜

    原标题:「发现」铁皮石斛仙草之神奇功效用于医用面膜丽彦妆铁皮石斛医用面膜|石斛多糖无菌修护补水贴19大优势: 1、铁皮石斛:自唐宋以来,一直被列为皇室贡品,铁皮石斛生于海拔1600米的悬崖峭壁之上,繁殖力差,产量极低,所以古代仅供皇室、贵族享用 2、铁皮石斛自古民间…...

    2024/5/4 23:55:16
  22. 丽彦妆\医用面膜\冷敷贴轻奢医学护肤引导者

    原标题:丽彦妆\医用面膜\冷敷贴轻奢医学护肤引导者【公司简介】 广州华彬企业隶属香港华彬集团有限公司,专注美业21年,其旗下品牌: 「圣茵美」私密荷尔蒙抗衰,产后修复 「圣仪轩」私密荷尔蒙抗衰,产后修复 「花茵莳」私密荷尔蒙抗衰,产后修复 「丽彦妆」专注医学护…...

    2024/5/4 23:54:58
  23. 广州械字号面膜生产厂家OEM/ODM4项须知!

    原标题:广州械字号面膜生产厂家OEM/ODM4项须知!广州械字号面膜生产厂家OEM/ODM流程及注意事项解读: 械字号医用面膜,其实在我国并没有严格的定义,通常我们说的医美面膜指的应该是一种「医用敷料」,也就是说,医用面膜其实算作「医疗器械」的一种,又称「医用冷敷贴」。 …...

    2024/5/4 23:55:01
  24. 械字号医用眼膜缓解用眼过度到底有无作用?

    原标题:械字号医用眼膜缓解用眼过度到底有无作用?医用眼膜/械字号眼膜/医用冷敷眼贴 凝胶层为亲水高分子材料,含70%以上的水分。体表皮肤温度传导到本产品的凝胶层,热量被凝胶内水分子吸收,通过水分的蒸发带走大量的热量,可迅速地降低体表皮肤局部温度,减轻局部皮肤的灼…...

    2024/5/4 23:54:56
  25. 配置失败还原请勿关闭计算机,电脑开机屏幕上面显示,配置失败还原更改 请勿关闭计算机 开不了机 这个问题怎么办...

    解析如下&#xff1a;1、长按电脑电源键直至关机&#xff0c;然后再按一次电源健重启电脑&#xff0c;按F8健进入安全模式2、安全模式下进入Windows系统桌面后&#xff0c;按住“winR”打开运行窗口&#xff0c;输入“services.msc”打开服务设置3、在服务界面&#xff0c;选中…...

    2022/11/19 21:17:18
  26. 错误使用 reshape要执行 RESHAPE,请勿更改元素数目。

    %读入6幅图像&#xff08;每一幅图像的大小是564*564&#xff09; f1 imread(WashingtonDC_Band1_564.tif); subplot(3,2,1),imshow(f1); f2 imread(WashingtonDC_Band2_564.tif); subplot(3,2,2),imshow(f2); f3 imread(WashingtonDC_Band3_564.tif); subplot(3,2,3),imsho…...

    2022/11/19 21:17:16
  27. 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机...

    win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”问题的解决方法在win7系统关机时如果有升级系统的或者其他需要会直接进入一个 等待界面&#xff0c;在等待界面中我们需要等待操作结束才能关机&#xff0c;虽然这比较麻烦&#xff0c;但是对系统进行配置和升级…...

    2022/11/19 21:17:15
  28. 台式电脑显示配置100%请勿关闭计算机,“准备配置windows 请勿关闭计算机”的解决方法...

    有不少用户在重装Win7系统或更新系统后会遇到“准备配置windows&#xff0c;请勿关闭计算机”的提示&#xff0c;要过很久才能进入系统&#xff0c;有的用户甚至几个小时也无法进入&#xff0c;下面就教大家这个问题的解决方法。第一种方法&#xff1a;我们首先在左下角的“开始…...

    2022/11/19 21:17:14
  29. win7 正在配置 请勿关闭计算机,怎么办Win7开机显示正在配置Windows Update请勿关机...

    置信有很多用户都跟小编一样遇到过这样的问题&#xff0c;电脑时发现开机屏幕显现“正在配置Windows Update&#xff0c;请勿关机”(如下图所示)&#xff0c;而且还需求等大约5分钟才干进入系统。这是怎样回事呢&#xff1f;一切都是正常操作的&#xff0c;为什么开时机呈现“正…...

    2022/11/19 21:17:13
  30. 准备配置windows 请勿关闭计算机 蓝屏,Win7开机总是出现提示“配置Windows请勿关机”...

    Win7系统开机启动时总是出现“配置Windows请勿关机”的提示&#xff0c;没过几秒后电脑自动重启&#xff0c;每次开机都这样无法进入系统&#xff0c;此时碰到这种现象的用户就可以使用以下5种方法解决问题。方法一&#xff1a;开机按下F8&#xff0c;在出现的Windows高级启动选…...

    2022/11/19 21:17:12
  31. 准备windows请勿关闭计算机要多久,windows10系统提示正在准备windows请勿关闭计算机怎么办...

    有不少windows10系统用户反映说碰到这样一个情况&#xff0c;就是电脑提示正在准备windows请勿关闭计算机&#xff0c;碰到这样的问题该怎么解决呢&#xff0c;现在小编就给大家分享一下windows10系统提示正在准备windows请勿关闭计算机的具体第一种方法&#xff1a;1、2、依次…...

    2022/11/19 21:17:11
  32. 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”的解决方法...

    今天和大家分享一下win7系统重装了Win7旗舰版系统后&#xff0c;每次关机的时候桌面上都会显示一个“配置Windows Update的界面&#xff0c;提示请勿关闭计算机”&#xff0c;每次停留好几分钟才能正常关机&#xff0c;导致什么情况引起的呢&#xff1f;出现配置Windows Update…...

    2022/11/19 21:17:10
  33. 电脑桌面一直是清理请关闭计算机,windows7一直卡在清理 请勿关闭计算机-win7清理请勿关机,win7配置更新35%不动...

    只能是等着&#xff0c;别无他法。说是卡着如果你看硬盘灯应该在读写。如果从 Win 10 无法正常回滚&#xff0c;只能是考虑备份数据后重装系统了。解决来方案一&#xff1a;管理员运行cmd&#xff1a;net stop WuAuServcd %windir%ren SoftwareDistribution SDoldnet start WuA…...

    2022/11/19 21:17:09
  34. 计算机配置更新不起,电脑提示“配置Windows Update请勿关闭计算机”怎么办?

    原标题&#xff1a;电脑提示“配置Windows Update请勿关闭计算机”怎么办&#xff1f;win7系统中在开机与关闭的时候总是显示“配置windows update请勿关闭计算机”相信有不少朋友都曾遇到过一次两次还能忍但经常遇到就叫人感到心烦了遇到这种问题怎么办呢&#xff1f;一般的方…...

    2022/11/19 21:17:08
  35. 计算机正在配置无法关机,关机提示 windows7 正在配置windows 请勿关闭计算机 ,然后等了一晚上也没有关掉。现在电脑无法正常关机...

    关机提示 windows7 正在配置windows 请勿关闭计算机 &#xff0c;然后等了一晚上也没有关掉。现在电脑无法正常关机以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容&#xff0c;让我们赶快一起来看一下吧&#xff01;关机提示 windows7 正在配…...

    2022/11/19 21:17:05
  36. 钉钉提示请勿通过开发者调试模式_钉钉请勿通过开发者调试模式是真的吗好不好用...

    钉钉请勿通过开发者调试模式是真的吗好不好用 更新时间:2020-04-20 22:24:19 浏览次数:729次 区域: 南阳 > 卧龙 列举网提醒您:为保障您的权益,请不要提前支付任何费用! 虚拟位置外设器!!轨迹模拟&虚拟位置外设神器 专业用于:钉钉,外勤365,红圈通,企业微信和…...

    2022/11/19 21:17:05
  37. 配置失败还原请勿关闭计算机怎么办,win7系统出现“配置windows update失败 还原更改 请勿关闭计算机”,长时间没反应,无法进入系统的解决方案...

    前几天班里有位学生电脑(windows 7系统)出问题了&#xff0c;具体表现是开机时一直停留在“配置windows update失败 还原更改 请勿关闭计算机”这个界面&#xff0c;长时间没反应&#xff0c;无法进入系统。这个问题原来帮其他同学也解决过&#xff0c;网上搜了不少资料&#x…...

    2022/11/19 21:17:04
  38. 一个电脑无法关闭计算机你应该怎么办,电脑显示“清理请勿关闭计算机”怎么办?...

    本文为你提供了3个有效解决电脑显示“清理请勿关闭计算机”问题的方法&#xff0c;并在最后教给你1种保护系统安全的好方法&#xff0c;一起来看看&#xff01;电脑出现“清理请勿关闭计算机”在Windows 7(SP1)和Windows Server 2008 R2 SP1中&#xff0c;添加了1个新功能在“磁…...

    2022/11/19 21:17:03
  39. 请勿关闭计算机还原更改要多久,电脑显示:配置windows更新失败,正在还原更改,请勿关闭计算机怎么办...

    许多用户在长期不使用电脑的时候&#xff0c;开启电脑发现电脑显示&#xff1a;配置windows更新失败&#xff0c;正在还原更改&#xff0c;请勿关闭计算机。。.这要怎么办呢&#xff1f;下面小编就带着大家一起看看吧&#xff01;如果能够正常进入系统&#xff0c;建议您暂时移…...

    2022/11/19 21:17:02
  40. 还原更改请勿关闭计算机 要多久,配置windows update失败 还原更改 请勿关闭计算机,电脑开机后一直显示以...

    配置windows update失败 还原更改 请勿关闭计算机&#xff0c;电脑开机后一直显示以以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容&#xff0c;让我们赶快一起来看一下吧&#xff01;配置windows update失败 还原更改 请勿关闭计算机&#x…...

    2022/11/19 21:17:01
  41. 电脑配置中请勿关闭计算机怎么办,准备配置windows请勿关闭计算机一直显示怎么办【图解】...

    不知道大家有没有遇到过这样的一个问题&#xff0c;就是我们的win7系统在关机的时候&#xff0c;总是喜欢显示“准备配置windows&#xff0c;请勿关机”这样的一个页面&#xff0c;没有什么大碍&#xff0c;但是如果一直等着的话就要两个小时甚至更久都关不了机&#xff0c;非常…...

    2022/11/19 21:17:00
  42. 正在准备配置请勿关闭计算机,正在准备配置windows请勿关闭计算机时间长了解决教程...

    当电脑出现正在准备配置windows请勿关闭计算机时&#xff0c;一般是您正对windows进行升级&#xff0c;但是这个要是长时间没有反应&#xff0c;我们不能再傻等下去了。可能是电脑出了别的问题了&#xff0c;来看看教程的说法。正在准备配置windows请勿关闭计算机时间长了方法一…...

    2022/11/19 21:16:59
  43. 配置失败还原请勿关闭计算机,配置Windows Update失败,还原更改请勿关闭计算机...

    我们使用电脑的过程中有时会遇到这种情况&#xff0c;当我们打开电脑之后&#xff0c;发现一直停留在一个界面&#xff1a;“配置Windows Update失败&#xff0c;还原更改请勿关闭计算机”&#xff0c;等了许久还是无法进入系统。如果我们遇到此类问题应该如何解决呢&#xff0…...

    2022/11/19 21:16:58
  44. 如何在iPhone上关闭“请勿打扰”

    Apple’s “Do Not Disturb While Driving” is a potentially lifesaving iPhone feature, but it doesn’t always turn on automatically at the appropriate time. For example, you might be a passenger in a moving car, but your iPhone may think you’re the one dri…...

    2022/11/19 21:16:57