分布式开放消息系统(RocketMQ)的原理与实践
分布式开放消息系统(RocketMQ)的原理与实践
分布式消息系统作为实现分布式系统可扩展、可伸缩性的关键组件,需要具有高吞吐量、高可用等特点。而谈到消息系统的设计,就回避不了两个问题:
- 消息的顺序问题
- 消息的重复问题
RocketMQ作为阿里开源的一款高性能、高吞吐量的消息中间件,它是怎样来解决这两个问题的?RocketMQ 有哪些关键特性?其实现原理是怎样的?
关键特性以及其实现原理
一、顺序消息
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单创建、订单付款、订单完成。消费时,要按照这个顺序消费才有意义。但同时订单之间又是可以并行消费的。
假如生产者产生了2条消息:M1、M2,要保证这两条消息的顺序,应该怎样做?你脑中想到的可能是这样:
你可能会采用这种方式保证消息顺序
M1发送到S1后,M2发送到S2,如果要保证M1先于M2被消费,那么需要M1到达消费端后,通知S2,然后S2再将M2发送到消费端。
这个模型存在的问题是,如果M1和M2分别发送到两台Server上,就不能保证M1先达到,也就不能保证M1被先消费,那么就需要在MQ Server集群维护消息的顺序。那么如何解决?一种简单的方式就是将M1、M2发送到同一个Server上:
保证消息顺序,你改进后的方法
这样可以保证M1先于M2到达MQServer(客户端等待M1成功后再发送M2),根据先达到先被消费的原则,M1会先于M2被消费,这样就保证了消息的顺序。
这个模型,理论上可以保证消息的顺序,但在实际运用中你应该会遇到下面的问题:
网络延迟问题
只要将消息从一台服务器发往另一台服务器,就会存在网络延迟问题。如上图所示,如果发送M1耗时大于发送M2的耗时,那么M2就先被消费,仍然不能保证消息的顺序。即使M1和M2同时到达消费端,由于不清楚消费端1和消费端2的负载情况,仍然有可能出现M2先于M1被消费。如何解决这个问题?将M1和M2发往同一个消费者即可,且发送M1后,需要消费端响应成功后才能发送M2。
但又会引入另外一个问题,如果发送M1后,消费端1没有响应,那是继续发送M2呢,还是重新发送M1?一般为了保证消息一定被消费,肯定会选择重发M1到另外一个消费端2,就如下图所示。
保证消息顺序的正确姿势
这样的模型就严格保证消息的顺序,细心的你仍然会发现问题,消费端1没有响应Server时有两种情况,一种是M1确实没有到达,另外一种情况是消费端1已经响应,但是Server端没有收到。如果是第二种情况,重发M1,就会造成M1被重复消费。也就是我们后面要说的第二个问题,消息重复问题。
回过头来看消息顺序问题,严格的顺序消息非常容易理解,而且处理问题也比较容易,要实现严格的顺序消息,简单且可行的办法就是:
保证
生产者 - MQServer - 消费者
是一对一对一的关系
但是这样设计,并行度就成为了消息系统的瓶颈(吞吐量不够),也会导致更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。
但我们的最终目标是要集群的高容错性和高吞吐量。这似乎是一对不可调和的矛盾,那么阿里是如何解决的?
世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!
有些问题,看起来很重要,但实际上我们可以通过合理的设计或者将问题分解来规避。如果硬要把时间花在解决它们身上,实际上是浪费的,效率低下的。从这个角度来看消息的顺序问题,我们可以得出两个结论:
1、不关注乱序的应用实际大量存在
2、队列无序并不意味着消息无序
最后我们从源码角度分析RocketMQ怎么实现发送顺序消息。
一般消息是通过轮询所有队列来发送的(负载均衡策略),顺序消息可以根据业务,比如说订单号相同的消息发送到同一个队列。下面的示例中,OrderId相同的消息,会发送到同一个队列:
// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);} }, orderId);
在获取到路由信息以后,会根据MessageQueueSelector
实现的算法来选择一个队列,同一个OrderId获取到的队列是同一个队列。
private SendResult send() {// 获取topic路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;// 根据我们的算法,选择一个发送队列// 这里的arg = orderIdmq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);if (mq != null) {return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);}} }
二、消息重复
上面在解决消息顺序问题时,引入了一个新的问题,就是消息重复。那么RocketMQ是怎样解决消息重复的问题呢?还是“恰好”不解决。
造成消息的重复的根本原因是:网络不可达。只要通过网络交换数据,就无法避免这个问题。所以解决这个问题的办法就是不解决,转而绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?
1、消费端处理消息的业务逻辑保持幂等性
2、保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现
第1条很好理解,只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,如果新到的消息ID已经在日志表中,那么就不再处理这条消息。
我们可以看到第1条的解决方式,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条可以消息系统实现,也可以业务端实现。正常情况下出现重复消息的概率不一定大,且由消息系统实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务端自己处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的原因。
RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。
三、事务消息
RocketMQ除了支持普通消息,顺序消息,另外还支持事务消息。首先讨论一下什么是事务消息以及支持事务消息的必要性。我们以一个转帐的场景为例来说明这个问题:Bob向Smith转账100块。
在单机环境下,执行事务的情况,大概是下面这个样子:
单机环境下转账事务示意图
当用户增长到一定程度,Bob和Smith的账户及余额信息已经不在同一台服务器上了,那么上面的流程就变成了这样:
集群环境下转账事务示意图
这时候你会发现,同样是一个转账的业务,在集群环境下,耗时居然成倍的增长,这显然是不能够接受的。那我们如何来规避这个问题?
大事务 = 小事务 + 异步
将大事务拆分成多个小事务异步执行。这样基本上能够将跨机事务的执行效率优化到与单机一致。转账的事务就可以分解成如下两个小事务:
小事务+异步消息
图中执行本地事务(Bob账户扣款)和发送异步消息应该保持同时成功或者失败中,也就是扣款成功了,发送消息一定要成功,如果扣款失败了,就不能再发送消息。那问题是:我们是先扣款还是先发送消息呢?
首先我们看下,先发送消息,大致的示意图如下:
事务消息:先发送消息
存在的问题是:如果消息发送成功,但是扣款失败,消费端就会消费此消息,进而向Smith账户加钱。
先发消息不行,那我们就先扣款呗,大致的示意图如下:
事务消息-先扣款
存在的问题跟上面类似:如果扣款成功,发送消息失败,就会出现Bob扣钱了,但是Smith账户未加钱。
可能大家会有很多的方法来解决这个问题,比如:直接将发消息放到Bob扣款的事务中去,如果发送失败,抛出异常,事务回滚。这样的处理方式也符合“恰好”不需要解决的原则。RocketMQ支持事务消息,下面我们来看看RocketMQ是怎样来实现的。
RocketMQ实现发送事务消息
RocketMQ第一阶段发送Prepared消息
时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,这时候发现了Prepared消息
,它会向消息发送者确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
那我们来看下RocketMQ源码,是不是这样来处理事务消息的。客户端发送事务消息的部分(完整代码请查看:rocketmq-example
工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer
):
// 未决事务,MQ服务器回查客户端 // 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务 TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); // 构造事务消息的生产者 TransactionMQProducer producer = new TransactionMQProducer("groupName"); // 设置事务决断处理类 producer.setTransactionCheckListener(transactionCheckListener); // 本地事务的处理逻辑,相当于示例中检查Bob账户并扣钱的逻辑 TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); producer.start() // 构造MSG,省略构造参数 Message msg = new Message(......); // 发送消息 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); producer.shutdown();
接着查看sendMessageInTransaction
方法的源码,总共分为3个阶段:发送Prepared消息
、执行本地事务、发送确认消息。
public TransactionSendResult sendMessageInTransaction(.....) {// 逻辑代码,非实际代码// 1.发送消息sendResult = this.send(msg);// sendResult.getSendStatus() == SEND_OK// 2.如果消息发送成功,处理与消息关联的本地事务单元LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);// 3.结束事务this.endTransaction(sendResult, localTransactionState, localException); }
endTransaction
方法会将请求发往broker(mq server)
去更新事物消息的最终状态:
- 根据
sendResult
找到Prepared消息
- 根据
localTransaction
更新消息的最终状态
如果endTransaction
方法执行失败,导致数据没有发送到broker
,broker
会有回查线程定时(默认1分钟)扫描每个存储事务状态的表格文件,如果是已经提交或者回滚的消息直接跳过,如果是prepared状态
则会向Producer
发起CheckTransaction
请求,Producer
会调用DefaultMQProducerImpl.checkTransactionState()
方法来处理broker
的定时回调请求,而checkTransactionState
会调用我们的事务设置的决断方法,最后调用endTransactionOneway
让broker
来更新消息的最终状态。
再回到转账的例子,如果Bob的账户的余额已经减少,且消息已经发送成功,Smith端开始消费这条消息,这个时候就会出现消费失败和消费超时两个问题?解决超时问题的思路就是一直重试,直到消费端消费消息成功,整个过程中有可能会出现消息重复的问题,按照前面的思路解决即可。
消费事务消息
这样基本上可以解决超时问题,但是如果消费失败怎么办?阿里提供给我们的解决方法是:人工解决。大家可以考虑一下,按照事务的流程,因为某种原因Smith加款失败,需要回滚整个流程。如果消息系统要实现这个回滚流程的话,系统复杂度将大大提升,且很容易出现Bug,估计出现Bug的概率会比消费失败的概率大很多。我们需要衡量是否值得花这么大的代价来解决这样一个出现概率非常小的问题,这也是大家在解决疑难问题时需要多多思考的地方。
20160321补充:在3.2.6版本中移除了事务消息的实现,所以此版本不支持事务消息,具体情况请参考rocketmq的issues:
https://github.com/alibaba/RocketMQ/issues/65
https://github.com/alibaba/RocketMQ/issues/138
https://github.com/alibaba/RocketMQ/issues/156
四、Producer如何发送消息
Producer
轮询某topic下的所有队列的方式来实现发送方的负载均衡,如下图所示:
producer发送消息负载均衡
首先分析一下RocketMQ的客户端发送消息的源码:
// 构造Producer DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 初始化Producer,整个应用生命周期内,只需要初始化1次 producer.start(); // 构造Message Message msg = new Message("TopicTest1",// topic"TagA",// tag:给消息打标签,用于区分一类消息,可为null"OrderID188",// key:自定义Key,可以用于去重,可为null("Hello MetaQ").getBytes());// body:消息内容 // 发送消息并返回结果 SendResult sendResult = producer.send(msg); // 清理资源,关闭网络连接,注销自己 producer.shutdown();
在整个应用生命周期内,生产者需要调用一次start方法来初始化,初始化主要完成的任务有:
- 如果没有指定
namesrv
地址,将会自动寻址- 启动定时任务:更新namesrv地址、从namsrv更新topic路由信息、清理已经挂掉的broker、向所有broker发送心跳...
- 启动负载均衡的服务
初始化完成后,开始发送消息,发送消息的主要代码如下:
private SendResult sendDefaultImpl(Message msg,......) {// 检查Producer的状态是否是RUNNINGthis.makeSureStateOK();// 检查msg是否合法:是否为null、topic,body是否为空、body是否超长Validators.checkMessage(msg, this.defaultMQProducer);// 获取topic路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());// 从路由信息中选择一个消息队列MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);// 将消息发送到该队列上去sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout); }
代码中需要关注的两个方法tryToFindTopicPublishInfo
和selectOneMessageQueue
。前面说过在producer初始化时,会启动定时任务获取路由信息并更新到本地缓存,所以tryToFindTopicPublishInfo
会首先从缓存中获取topic路由信息,如果没有获取到,则会自己去namesrv
获取路由信息。selectOneMessageQueue
方法通过轮询的方式,返回一个队列,以达到负载均衡的目的。
如果Producer发送消息失败,会自动重试,重试的策略:
- 重试次数 < retryTimesWhenSendFailed(可配置)
- 总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)
- 同时满足上面两个条件后,Producer会选择另外一个队列发送消息
五、消息存储
RocketMQ的消息存储是由consume queue
和commit log
配合完成的。
1、Consume Queue
consume queue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件commit log
上的位置。
我们可以在配置中指定consume queue与commitlog存储的目录
每个topic
下的每个queue
都有一个对应的consumequeue
文件,比如:
${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}
Consume Queue文件组织,如图所示:
Consume Queue文件组织示意图
- 根据
topic
和queueId
来组织文件,图中TopicA有两个队列0,1,那么TopicA和QueueId=0组成一个ConsumeQueue,TopicA和QueueId=1组成另一个ConsumeQueue。 - 按照消费端的
GroupName
来分组重试队列,如果消费端消费失败,消息将被发往重试队列中,比如图中的%RETRY%ConsumerGroupA
。 - 按照消费端的
GroupName
来分组死信队列,如果消费端消费失败,并重试指定次数后,仍然失败,则发往死信队列,比如图中的%DLQ%ConsumerGroupA
。
死信队列(Dead Letter Queue)一般用于存放由于某种原因无法传递的消息,比如处理失败或者已经过期的消息。
Consume Queue中存储单元是一个20字节定长的二进制数据,顺序写顺序读,如下图所示:
consumequeue文件存储单元格式
- CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量
- Size存储中消息的大小
- Message Tag HashCode存储消息的Tag的哈希值:主要用于订阅时消息过滤(订阅时如果指定了Tag,会根据HashCode来快速查找到订阅的消息)
2、Commit Log
CommitLog:消息存放的物理文件,每台broker
上的commitlog
被本机所有的queue
共享,不做任何区分。
文件的默认位置如下,仍然可通过配置文件修改:
${user.home} \store\${commitlog}\${fileName}
CommitLog的消息存储单元长度不固定,文件顺序写,随机读。消息的存储结构如下表所示,按照编号顺序以及编号对应的内容依次存储。
Commit Log存储单元结构图
3、消息存储实现
消息存储实现,比较复杂,也值得大家深入了解,后面会单独成文来分析,这小节只以代码说明一下具体的流程。
// Set the storage time msg.setStoreTimestamp(System.currentTimeMillis()); // Set the message body BODY CRC (consider the most appropriate setting msg.setBodyCRC(UtilAll.crc32(msg.getBody())); StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); synchronized (this) {long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();// Here settings are stored timestamp, in order to ensure an orderly globalmsg.setStoreTimestamp(beginLockTimestamp);// MapedFile:操作物理文件在内存中的映射以及将内存数据持久化到物理文件中MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();// 将Message追加到文件commitlogresult = mapedFile.appendMessage(msg, this.appendMessageCallback);switch (result.getStatus()) {case PUT_OK:break;case END_OF_FILE:// Create a new file, re-write the messagemapedFile = this.mapedFileQueue.getLastMapedFile();result = mapedFile.appendMessage(msg, this.appendMessageCallback);break;DispatchRequest dispatchRequest = new DispatchRequest(topic,// 1queueId,// 2result.getWroteOffset(),// 3result.getWroteBytes(),// 4tagsCode,// 5msg.getStoreTimestamp(),// 6result.getLogicsOffset(),// 7msg.getKeys(),// 8/*** Transaction*/msg.getSysFlag(),// 9msg.getPreparedTransactionOffset());// 10// 1.分发消息位置到ConsumeQueue// 2.分发到IndexService建立索引this.defaultMessageStore.putDispatchRequest(dispatchRequest); }
4、消息的索引文件
如果一个消息包含key值的话,会使用IndexFile存储消息索引,文件的内容结构如图:
消息索引
索引文件主要用于根据key来查询消息的,流程主要是:
- 根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目,例如图中所示 slotNum=5000000)
- 根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 总是指向最新的一个索引项)
- 遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)
六、消息订阅
RocketMQ消息订阅有两种模式,一种是Push模式,即MQServer主动向消费端推送;另外一种是Pull模式,即消费端在需要时,主动到MQServer拉取。但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。
首先看下消费端的负载均衡:
消费端负载均衡
消费端会通过RebalanceService线程,10秒钟做一次基于topic下的所有队列负载:
- 遍历Consumer下的所有topic,然后根据topic订阅所有的消息
- 获取同一topic和Consumer Group下的所有Consumer
- 然后根据具体的分配策略来分配消费队列,分配的策略包含:平均分配、消费端配置等
如同上图所示:如果有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二 consumer 消费 2 个队列。这里采用的就是平均分配策略,它类似于我们的分页,TOPIC下面的所有queue就是记录,Consumer的个数就相当于总的页数,那么每页有多少条记录,就类似于某个Consumer会消费哪些队列。
通过这样的策略来达到大体上的平均消费,这样的设计也可以很方面的水平扩展Consumer来提高消费能力。
消费端的Push模式是通过长轮询的模式来实现的,就如同下图:
Push模式示意图
Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,如果有消息就立即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。如果broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。
当然,Consumer端是通过一个线程将阻塞队列LinkedBlockingQueue<PullRequest>
中的PullRequest
发送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest
时,如果发现没有消息,就会把PullRequest
扔到ConcurrentHashMap中缓存起来。broker在启动时,会启动一个线程不停的从ConcurrentHashMap取出PullRequest
检查,直到有数据返回。
七、RocketMQ的其他特性
前面的6个特性都是基本上都是点到为止,想要深入了解,还需要大家多多查看源码,多多在实际中运用。当然除了已经提到的特性外,RocketMQ还支持:
- 定时消息
- 消息的刷盘策略
- 主动同步策略:同步双写、异步复制
- 海量消息堆积能力
- 高效通信
- .......
其中涉及到的很多设计思路和解决方法都值得我们深入研究:
- 消息的存储设计:既要满足海量消息的堆积能力,又要满足极快的查询效率,还要保证写入的效率。
- 高效的通信组件设计:高吞吐量,毫秒级的消息投递能力都离不开高效的通信。
- .......
RocketMQ最佳实践
一、Producer最佳实践
1、一个应用尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤。
2、每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
3、消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。
4、对于消息不可丢失应用,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。
5、某些应用如果不关注消息是否发送成功,请直接使用sendOneWay
方法发送消息。
二、Consumer最佳实践
1、消费过程要做到幂等(即消费端去重)
2、尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量。
3、优化每条消息消费过程
三、其他配置
线上应该关闭autoCreateTopicEnable
,即在配置文件中将其设置为false
。
RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic
,这个时候,如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker
上面创建名为TBW102
的TOPIC)路由信息,然后Producer
会选择一台Broker
发送消息,选中的broker
在存储消息时,发现消息的topic
还没有创建,就会自动创建topic
。后果就是:以后所有该TOPIC的消息,都将发送到这台broker
上,达不到负载均衡的目的。
所以基于目前RocketMQ的设计,建议关闭自动创建TOPIC的功能,然后根据消息量的大小,手动创建TOPIC。
RocketMQ设计相关
RocketMQ的设计假定:
每台PC机器都可能宕机不可服务
任意集群都有可能处理能力不足
最坏的情况一定会发生
内网环境需要低延迟来提供最佳用户体验
RocketMQ的关键设计:
分布式集群化
强数据安全
海量数据堆积
毫秒级投递延迟(推拉模式)
这是RocketMQ在设计时的假定前提以及需要到达的效果。我想这些假定适用于所有的系统设计。随着我们系统的服务的增多,每位开发者都要注意自己的程序是否存在单点故障,如果挂了应该怎么恢复、能不能很好的水平扩展、对外的接口是否足够高效、自己管理的数据是否足够安全...... 多多规范自己的设计,才能开发出高效健壮的程序。
附录:RocketMQ涉及到的几个专业术语和整体架构介绍
一、RocketMQ中的专业术语
Topictopic
表示消息的第一级类型,比如一个电商系统的消息可以分为:交易消息、物流消息...... 一条消息必须有一个Topic
。
TagTag
表示消息的第二级类型,比如交易消息又可以分为:交易创建消息,交易完成消息..... 一条消息可以没有Tag
。RocketMQ提供2级消息分类,方便大家灵活控制。
Queue
一个topic
下,我们可以设置多个queue(消息队列)
。当我们发送消息时,需要要指定该消息的topic
。RocketMQ会轮询该topic
下的所有队列,将消息发送出去。
Producer 与 Producer GroupProducer
表示消息队列的生产者。消息队列的本质就是实现了publish-subscribe模式,生产者生产消息,消费者消费消息。所以这里的Producer
就是用来生产和发送消息的,一般指业务系统。
Producer Group
是一类Producer
的集合名称,这类Producer
通常发送一类消息,且发送逻辑一致。
Consumer 与 Consumer Group
消息消费者,一般由后台系统异步消费消息。
Push Consumer
Consumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。Pull Consumer
Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。
Consumer Group
是一类Consumer
的集合名称,这类Consumer
通常消费一类消息,且消费逻辑一致。
Broker
消息的中转者,负责存储和转发消息。可以理解为消息队列服务器,提供了消息的接收、存储、拉取和转发服务。broker
是RocketMQ的核心,它不不能挂的,所以需要保证broker
的高可用。
广播消费
一条消息被多个Consumer
消费,即使这些Consumer
属于同一个Consumer Group
,消息也会被Consumer Group
中的每个Consumer
都消费一次。在广播消费中的Consumer Group
概念可以认为在消息划分方面无意义。
集群消费
一个Consumer Group
中的Consumer
实例平均分摊消费消息。例如某个Topic
有 9 条消息,其中一个Consumer Group
有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息。
NameServerNameServer
即名称服务,两个功能:
- 接收
broker
的请求,注册broker
的路由信息 - 接口
client
的请求,根据某个topic
获取其到broker
的路由信息NameServer
没有状态,可以横向扩展。每个broker
在启动的时候会到NameServer
注册;Producer
在发送消息前会根据topic
到NameServer
获取路由(到broker
)信息;Consumer
也会定时获取topic
路由信息。
二、RocketMQ Overview
rocketmq overview
Producer
向一些队列轮流发送消息,队列集合称为Topic
,Consumer
如果做广播消费,则一个consumer
实例消费这个Topic
对应的所有队列;如果做集群消费,则多个Consumer
实例平均消费这个Topic
对应的队列集合。
再看下RocketMQ物理部署结构图:
RocketMQ网络部署图
RocketMQ网络部署特点:
Name Server
是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。- Broker部署相对复杂,Broker分为
Master
与Slave
,一个Master
可以对应多个Slave
,但是一个Slave
只能对应一个Master
,Master
与Slave
的对应关系通过指定相同的BrokerName
,不同的BrokerId
来定义,BrokerId=0
表示Master
,非0表示Slave
。Master
也可以部署多个。每个Broker
与Name Server
集群中的所有节点建立长连接,定时注册Topic
信息到所有Name Server
。 Producer
与Name Server
集群中的其中一个节点(随机选择)建立长连接,定期从Name Server
取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer 完全无状态,可集群部署。Consumer
与Name Server
集群中的其中一个节点(随机选择)建立长连接,定期从Name Server
取Topic 路由信息,并向提供Topic
服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer
既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。
RocketMQ事务消费和顺序消费详解
一、RocketMq有3中消息类型
1.普通消费
2. 顺序消费
3.事务消费
- 顺序消费场景
在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一、创建订单 ,第二:订单付款,第三:订单完成。也就是这个三个环节要有顺序,这个订单才有意义。RocketMQ可以保证顺序消费。
- rocketMq实现顺序消费的原理
produce在发送消息的时候,把消息发到同一个队列(queue)中,消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息
注意:是把把消息发到同一个队列(queue),不是同一个topic,默认情况下一个topic包括4个queue
单个节点(Producer端1个、Consumer端1个)
1、Producer.java
package order; import java.util.List; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.MessageQueueSelector; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * Producer,发送顺序消息 */ public class Producer { public static void main(String[] args) { try { DefaultMQProducer producer = new DefaultMQProducer("order_Producer"); producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); producer.start(); // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", // "TagE" }; for (int i = 1; i <= 5; i++) { Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 0); System.out.println(sendResult); } producer.shutdown(); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
2、Consumer.java
package order; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; /** * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交) */ public class Consumer1 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicOrderTest", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 设置自动提交 context.setAutoCommit(true); for (MessageExt msg : msgs) { System.out.println(msg + ",内容:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { e.printStackTrace(); } ; return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer1 Started."); } }
结果如下图所示:
这个五条数据被顺序消费了
- 多个节点(Producer端1个、Consumer端2个)
Producer.java
package order; import java.util.List; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.MessageQueueSelector; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * Producer,发送顺序消息 */ public class Producer { public static void main(String[] args) { try { DefaultMQProducer producer = new DefaultMQProducer("order_Producer"); producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); producer.start(); // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", // "TagE" }; for (int i = 1; i <= 5; i++) { Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 0); System.out.println(sendResult); } for (int i = 1; i <= 5; i++) { Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 1); System.out.println(sendResult); } for (int i = 1; i <= 5; i++) { Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 2); System.out.println(sendResult); } producer.shutdown(); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
Consumer1.java
/** * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交) */ public class Consumer1 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicOrderTest", "*"); /** * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到 *,第二个线程无法访问这个队列 */ consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 设置自动提交 context.setAutoCommit(true); for (MessageExt msg : msgs) { System.out.println(msg + ",内容:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { e.printStackTrace(); } ; return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer1 Started."); } }
Consumer2.java
/** * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交) */ public class Consumer2 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicOrderTest", "*"); /** * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到 *,第二个线程无法访问这个队列 */ consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 设置自动提交 context.setAutoCommit(true); for (MessageExt msg : msgs) { System.out.println(msg + ",内容:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { e.printStackTrace(); } ; return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer2 Started."); } }
先启动Consumer1和Consumer2,然后启动Producer,Producer会发送15条消息
Consumer1消费情况如图,都按照顺序执行了
Consumer2消费情况如图,都按照顺序执行了
二、事务消费
这里说的主要是分布式事物。下面的例子的数据库分别安装在不同的节点上。
事物消费需要先说说什么是事务。比如说:我们跨行转账,从工商银行转到建设银行,也就是我从工商银行扣除1000元之后,我的建设银行也必须加1000元。这样才能保证数据的一致性。假如工商银行转1000元之后,建设银行的服务器突然宕机,那么我扣除了1000,但是并没有在建设银行给我加1000,就出现了数据的不一致。因此加1000和减1000才行,减1000和减1000必须一起成功,一起失败。
再比如,我们进行网购的时候,我们下单之后,订单提交成功,仓库商品的数量必须减一。但是订单可能是一个数据库,仓库数量可能又是在另个数据库里面。有可能订单提交成功之后,仓库数量服务器突然宕机。这样也出现了数据不一致的问题。
使用消息队列来解决分布式事物:
现在我们去外面饭店吃饭,很多时候都不会直接给了钱之后直接在付款的窗口递饭菜,而是付款之后他会给你一张小票,你拿着这个小票去出饭的窗口取饭。这里和我们的系统类似,提高了吞吐量。即使你到第二个窗口,师傅告诉你已经没饭了,你可以拿着这个凭证去退款,即使中途由于出了意外你无法到达窗口进行取饭,但是只要凭证还在,可以将钱退给你。这样就保证了数据的一致性。
如何保证凭证(消息)有2种方法:
1、在工商银行扣款的时候,余额表扣除1000,同时记录日志,而且这2个表是在同一个数据库实例中,可以使用本地事物解决。然后我们通知建设银行需要加1000给该用户,建设银行收到之后给我返回已经加了1000给用户的确认信息之后,我再标记日志表里面的日志为已经完成。
2、通过消息中间件
原文地址:http://www.jianshu.com/p/453c6e7ff81c
RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改消息的状态。
细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,如果发现了Prepared消息,它会向消息发送端(生产者)确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
例子:
Consumer.java
package transaction; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; /** * Consumer,订阅消息 */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); consumer.setConsumeMessageBatchMaxSize(10); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br> * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTransactionTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { System.out.println(msg + ",内容:" + new String(msg.getBody())); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功 } }); consumer.start(); System.out.println("transaction_Consumer Started."); } }
Producer.java
package transaction; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.client.producer.TransactionMQProducer; import com.alibaba.rocketmq.common.message.Message; /** * 发送事务消息例子 * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("transaction_Producer"); producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); // 事务回查最小并发数 producer.setCheckThreadPoolMinSize(2); // 事务回查最大并发数 producer.setCheckThreadPoolMaxSize(2); // 队列数 producer.setCheckRequestHoldMax(2000); producer.setTransactionCheckListener(transactionCheckListener); producer.start(); // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" // }; TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); for (int i = 1; i <= 2; i++) { try { Message msg = new Message("TopicTransactionTest", "transaction" + i, "KEY" + i, ("Hello RocketMQ " + i).getBytes()); SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); System.out.println(sendResult); Thread.sleep(10); } catch (MQClientException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } }
TransactionExecuterImpl .java --执行本地事务
package transaction; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.common.message.Message; /** * 执行本地事务 */ public class TransactionExecuterImpl implements LocalTransactionExecuter { // private AtomicInteger transactionIndex = new AtomicInteger(1); public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { System.out.println("执行本地事务msg = " + new String(msg.getBody())); System.out.println("执行本地事务arg = " + arg); String tags = msg.getTags(); if (tags.equals("transaction2")) { System.out.println("======我的操作============,失败了 -进行ROLLBACK"); return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; // return LocalTransactionState.UNKNOW; } }
TransactionCheckListenerImpl--未决事务,服务器回查客户端(目前已经被阉割啦)
package transaction; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.common.message.MessageExt; /** * 未决事务,服务器回查客户端 */ public class TransactionCheckListenerImpl implements TransactionCheckListener { // private AtomicInteger transactionIndex = new AtomicInteger(0); //在这里,我们可以根据由MQ回传的key去数据库查询,这条数据到底是成功了还是失败了。 public LocalTransactionState checkLocalTransactionState(MessageExt msg) { System.out.println("未决事务,服务器回查客户端msg =" + new String(msg.getBody().toString())); // return LocalTransactionState.ROLLBACK_MESSAGE; return LocalTransactionState.COMMIT_MESSAGE; // return LocalTransactionState.UNKNOW; } }
producer端:发送数据到MQ,并且处理本地事物。这里模拟了一个成功一个失败。Consumer只会接收到本地事物成功的数据。第二个数据失败了,不会被消费。
Consumer只会接收到一个,第二个数据不会被接收到
分布式消息队列RocketMQ&Kafka -- 消息的“顺序消费”
在说到消息中间件的时候,我们通常都会谈到一个特性:消息的顺序消费问题。这个问题看起来很简单:Producer发送消息1, 2, 3。。。 Consumer按1, 2, 3。。。顺序消费。
但实际情况却是:无论RocketMQ,还是Kafka,缺省都不保证消息的严格有序消费!
这个特性看起来很简单,但为什么缺省他们都不保证呢?
“严格的顺序消费”有多么困难
下面就从3个方面来分析一下,对于一个消息中间件来说,”严格的顺序消费”有多么困难,或者说不可能。
发送端
发送端不能异步发送,异步发送在发送失败的情况下,就没办法保证消息顺序。
比如你连续发了1,2,3。 过了一会,返回结果1失败,2, 3成功。你把1再重新发送1遍,这个时候顺序就乱掉了。
存储端
对于存储端,要保证消息顺序,会有以下几个问题:
(1)消息不能分区。也就是1个topic,只能有1个队列。在Kafka中,它叫做partition;在RocketMQ中,它叫做queue。 如果你有多个队列,那同1个topic的消息,会分散到多个分区里面,自然不能保证顺序。
(2)即使只有1个队列的情况下,会有第2个问题。该机器挂了之后,能否切换到其他机器?也就是高可用问题。
比如你当前的机器挂了,上面还有消息没有消费完。此时切换到其他机器,可用性保证了。但消息顺序就乱掉了。
要想保证,一方面要同步复制,不能异步复制;另1方面得保证,切机器之前,挂掉的机器上面,所有消息必须消费完了,不能有残留。很明显,这个很难!!!
接收端
对于接收端,不能并行消费,也即不能开多线程或者多个客户端消费同1个队列。
总结
从上面的分析可以看出,要保证消息的严格有序,有多么困难!
发送端和接收端的问题,还好解决一点,限制异步发送,限制并行消费。但对于存储端,机器挂了之后,切换的问题,就很难解决了。
你切换了,可能消息就会乱;你不切换,那就暂时不可用。这2者之间,就需要权衡了。
业务需要全局有序吗?
通过上面分析可以看出,要保证一个topic内部,消息严格的有序,是很困难的,或者说条件是很苛刻的。
那怎么办呢?我们一定要使出所有力气、用尽所有办法,来保证消息的严格有序吗?
这里就需要从另外一个角度去考虑这个问题:业务角度。正如在下面这篇博客中所说的:
http://www.jianshu.com/p/453c6e7ff81c
实际情况中:
(1)不关注顺序的业务大量存在;
(2) 队列无序不代表消息无序。
第(2)条的意思是说:我们不保证队列的全局有序,但可以保证消息的局部有序。
举个例子:保证来自同1个order id的消息,是有序的!
下面就看一下在Kafka和RocketMQ中,分别是如何对待这个问题的:
Kafka中:发送1条消息的时候,可以指定(topic, partition, key) 3个参数。partiton和key是可选的。
如果你指定了partition,那就是所有消息发往同1个partition,就是有序的。并且在消费端,Kafka保证,1个partition只能被1个consumer消费。
或者你指定key(比如order id),具有同1个key的所有消息,会发往同1个partition。也是有序的。
RocketMQ: RocketMQ在Kafka的基础上,把这个限制更放宽了一步。只指定(topic, key),不指定具体发往哪个队列。也就是说,它更加不希望业务方,非要去要一个全局的严格有序。
消费模式
一、集群消费
之前的博客中,启动的都是单个Consumer,如果启动多个呢?
RocketMQ-集群消费
其实,对于RocketMQ而言,通过ConsumeGroup的机制,实现了天然的消息负载均衡!通俗点来说,RocketMQ中的消息通过ConsumeGroup实现了将消息分发到C1/C2/C3/……的机制,这意味着我们将非常方便的通过加机器来实现水平扩展!
我们考虑一下这种情况:比如C2发生了重启,一条消息发往C3进行消费,但是这条消息的处理需要0.1S,而此时C2刚好完成重启,那么C2是否可能会收到这条消息呢?答案是肯定的,也就是consume broker的重启,或者水平扩容,或者不遵守先订阅后生产消息,都可能导致消息的重复消费!关于去重的话题会在后续中予以介绍!
至于消息分发到C1/C2/C3,其实也是可以设置策略的:
RocketMQ-消息负载策略
使用哪种策略,只需要实例化对应的对象即可,如:
AllocateMessageQueueStrategy aqs = new AllocateMessageQueueAveragelyByCircle();
consumer.setAllocateMessageQueueStrategy(aqs);
上面内容,其实是一种消费模式——集群消费。
RocketMQ的消费模式有2种,查看一下源码:
public enum MessageModel {/*** broadcast*/BROADCASTING,/*** clustering*/CLUSTERING;
}
在默认情况下,就是集群消费(CLUSTERING),也就是上面提及的消息的负载均衡消费。另一种消费模式,是广播消费(BROADCASTING)。
二、广播消费
广播消费,类似于ActiveMQ中的发布订阅模式,消息会发给Consume Group中的每一个消费者进行消费。
RocketMQ-广播消费模式设置
/*** Consumer,订阅消息*/
public class Consumer2 {public static void main(String[] args) throws InterruptedException, MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");consumer.setConsumeMessageBatchMaxSize(10);// 设置为广播消费模式consumer.setMessageModel(MessageModel.BROADCASTING);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "*");consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {for (MessageExt msg : msgs) {System.out.println(" Receive New Messages: " + msg);}} catch (Exception e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 重试}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 成功}});consumer.start();System.out.println("Consumer Started.");}
}
内容补充
《RocketMQ(三)——HelloWorld》那篇博客的最后提到了单批次消息消费数量 ,本文既然提到了集群消费,那就针对这两个内容再进行一次补充吧。
如果我们有2台节点,Producerw往MQ上写入20条数据 其中Consumer1中拉取了12条 。Consumer2中拉取了8 条,这种情况下,假如Consumer1宕机,那么我们消费数据的时候,只能消费到Consumer2中的8条,Consumer1中的12条已经持久化了。需要Consumer1恢复之后这12条数据才能继续被消费。其实这种先启动producer往MQ上写数据,然后再启动Consumer的情况本来就是违规操作,正确的情况应该是先启动Consumer后再启动producer。
集群消费和广播消费
基本概念
MQ 是基于发布订阅模型的消息系统。在 MQ 消息系统中消息的订阅方订阅关注的 Topic,以获取并消费消息。由于订阅方应用一般是分布式系统,以集群方式部署有多台机器。因此 MQ 约定以下概念。
集群:MQ 约定使用相同 Consumer ID 的订阅者属于同一个集群,同一个集群下的订阅者消费逻辑必须完全一致(包括 Tag 的使用),这些订阅者在逻辑上可以认为是一个消费节点。
集群消费:当使用集群消费模式时,MQ 认为任意一条消息只需要被集群内的任意一个消费者处理即可。
广播消费:当使用广播消费模式时,MQ 会将每条消息推送给集群内所有注册过的客户端,保证消息至少被每台机器消费一次。
场景对比
集群消费模式:
适用场景&注意事项
- 消费端集群化部署,每条消息只需要被处理一次。
- 由于消费进度在服务端维护,可靠性更高。
- 集群消费模式下,每一条消息都只会被分发到一台机器上处理,如果需要被集群下的每一台机器都处理,请使用广播模式。
- 集群消费模式下,不保证消息的每一次失败重投等逻辑都能路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
广播消费模式:
适用场景&注意事项
- 每条消息都需要被相同逻辑的多台机器处理。
- 消费进度在客户端维护,出现重复的概率稍大于集群模式。
- 广播模式下,MQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
- 广播模式下,第一次启动时默认从最新消息消费,客户端的消费进度是被持久化在客户端本地的隐藏文件中,因此不建议删除该隐藏文件,否则会丢失部分消息。
- 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
- 目前仅 Java 客户端支持广播模式。
- 广播模式下服务端不维护消费进度,所以服务端不提供堆积查询和报警功能。
使用集群模式模拟广播:
适用场景&注意事项
- 每条消息都需要被多台机器处理,每台机器的逻辑可以相同也可以不一样。
- 消费进度在服务端维护,可靠性高于广播模式。
如若内容造成侵权/违法违规/事实不符,请联系编程学习网邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
相关文章
- 1.1.6 session 服务器端会话技术
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/weixin_43209097/article/details/103270498Session:主菜 1. 概念:服务器端会话技术,在一次会话的多次请求间共享数据,…...
2024/3/28 23:40:34 - Docker安装Redis
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/weixin_42119415/article/details/103270972前期准备: 安装好Docker查询redis列表 docker search redis拉取最新版本 dock…...
2024/3/27 15:16:34 - 用了这个方法,两周没变过的模型精度居然提升了(附资源)
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/dQCFKyQDXYm3F8rB0/article/details/103271166作者 | Derrick Mwiti译者 | 刘畅编辑 | Jane出品 | AI科技大本营(ID:rgzn…...
2024/3/28 23:08:15 - SpringBoot 导出 Excel
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/qq_29323645/article/details/103202565添加 maven 依赖<dependency><groupId>commons-fileupload</groupI…...
2024/3/18 13:42:23 - java 下载文件
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/qq_29323645/article/details/103202799下载的工具类完成package com.geotmt.billingcenter.common.utils;import javax.se…...
2024/3/18 17:14:11 - 安卓调用系统分享
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/weixin_44554456/article/details/1032705141.调用系统的分享功能 2.通过第三方SDK,如ShareSDK,友盟等 3.自行使用各自平…...
2024/3/19 12:58:23 - 地理宣讲
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/RuiKe1400360107/article/details/103270212湖南师范大学——2019年11月28日 10:00——广东广雅中学(地理)湖南师范大学—…...
2024/3/19 12:57:33 - 排序算法2
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/qq_42025376/article/details/103270502声明添加要改颜色的字体 添加要改颜色的字体 添加要改颜色的字体 添加要改颜色的字…...
2024/3/19 12:57:02 - netty搭建TCP服务器过程中一些入门常见问题记录
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/hero_lxz/article/details/1032702541.客户端第一次连接能进入到handler的方法中,但是一旦这个客户端关闭了连接,那么重新…...
2024/3/19 12:57:15 - 鼠标悬停下划线显示特效,html鼠标悬停显示下划线
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/allen_csdns/article/details/103270368源码Demo: <!DOCTYPE html> <html lang="en" > <head>…...
2024/3/19 12:55:46 - html+jQuery实现拖动滑块图片拼图验证码插件,移动端适用
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/allen_csdns/article/details/103270404一、index.html <!DOCTYPE html> <html lang="zh"> <hea…...
2024/3/27 20:51:04 - 使用 chroot 监狱限制 SSH 用户访问指定目录
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/netgc/article/details/103270160使用 chroot 监狱限制 SSH 用户访问指定目录 将 SSH 用户会话限制访问到特定的目录内,特…...
2024/3/19 12:53:31 - 并查集算法
原文链接:https://blog.csdn.net/dm_vincent/article/details/7655764出处链接:https://blog.csdn.net/dm_vincent/article/details/7655764本文主要介绍解决动态连通性一类问题的一种算法,使用到了一种叫做并查集的数据结构,称为Union-Find。更多的信息可以参考Algorithms…...
2024/3/21 11:22:42 - XSS攻击Filter
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/qq_35830057/article/details/103263465private String stripXSS(String value) {if (value != null) {// NOTE: It‘s hig…...
2024/3/29 3:09:35 - BJFU_数据结构习题_295基于双向链表的双向冒泡排序法
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/weixin_43722827/article/details/103257814欢迎登录北京林业大学OJ系统 http://www.bjfuacm.com 295基于双向链表的双向冒…...
2024/3/28 10:28:46 - HDU 1000.A + B Problem:~scanf("%d%d",&a,&b)表示什么意思 输入字母字符为什么会进入死循环
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/YY51930460/article/details/103258893最近在刷杭电的题目 在此记录下自己踩过的坑 希望即将到来的比赛能取得好成绩话不…...
2024/3/28 16:39:51 - python基础——数据类型
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/qq_43593912/article/details/103250248一、变量 什么是变量?当你把值赋给一个名字时,它会存储在内存中,把这块内存称为…...
2024/3/19 9:54:58 - 抽象类,接口的区别与使用
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/qq_43117793/article/details/103262854抽象类 1.is a 关系。(是不是) 对事物的抽象。 抽象类=普通类+抽象方法 2.继承…...
2024/3/25 20:17:28 - 刷牙
这是发生在我小时候的一件事。那时,不知道为什么,我特别不爱刷牙,于是就引出了这个故事。 那是个炎热的夏天。早上,温暖的阳光透过薄薄的窗帘,照在我的脸上,把我叫醒了。老让爸爸、妈妈催着刷牙多烦呀。算啦,还是主动一些吧。趁着爸爸、妈妈没起,我就已经把牙刷完了,并…...
2024/3/19 9:03:08 - Java基础之代码块
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/weixin_43171186/article/details/103263104在java中一对{}包裹的代码就叫做代码块; 块{} : 是一个作用域 1、 {}写在方法…...
2024/3/19 9:02:52
最新文章
- 理解JVM:从字节码到程序运行
大家好,我是程序员大猩猩。 今天我们来讲一下JVM,好多面试者在面试的时候,都会被问及JVM相关知识。那么JVM到底是什么,要理解它到底是出于什么原因? JVM俗称Java虚拟机,它是一个抽象的计算机,…...
2024/3/29 5:27:57 - 梯度消失和梯度爆炸的一些处理方法
在这里是记录一下梯度消失或梯度爆炸的一些处理技巧。全当学习总结了如有错误还请留言,在此感激不尽。 权重和梯度的更新公式如下: w w − η ⋅ ∇ w w w - \eta \cdot \nabla w ww−η⋅∇w 个人通俗的理解梯度消失就是网络模型在反向求导的时候出…...
2024/3/20 10:50:27 - 论文导读 | 漫谈编辑问题
摘要 本文着眼于深度学习模型在各个领域中的编辑问题,从通用的分类器编辑算法切入,展开介绍针对扩散模型的图像编辑问题和针对大语言模型的知识编辑问题,希望能为读者关于“修改模型的行为”这一话题提供一些启发。 引言 当我们训练好一个…...
2024/3/29 0:55:13 - 通过Appium和Xcode Accessibility Inspector获取iOS应用元素定位的方法
在 iOS 移动应用程序上使用选择器查找元素定位是我们在移动端 UI 自动化测试的先决条件。 但是,由于应用程序内容在原生 iOS 应用程序中的呈现方式,我们可以用来定位应用程序元素的选择器与 Web 浏览器元素有很大不同。 在本文中,我们将了解 …...
2024/3/28 8:59:16 - Java学习笔记(17)
集合进阶 单列集合 Collection List set Add clear remove contains isempty size Add方法可能也会添加失败 同理,可能删除失败 Contains细节 为什么要重写equals? 因为contains底层用的是object类中的equals方法,比较的是地址值…...
2024/3/28 15:19:38 - 【外汇早评】美通胀数据走低,美元调整
原标题:【外汇早评】美通胀数据走低,美元调整昨日美国方面公布了新一期的核心PCE物价指数数据,同比增长1.6%,低于前值和预期值的1.7%,距离美联储的通胀目标2%继续走低,通胀压力较低,且此前美国一季度GDP初值中的消费部分下滑明显,因此市场对美联储后续更可能降息的政策…...
2024/3/27 10:21:24 - 【原油贵金属周评】原油多头拥挤,价格调整
原标题:【原油贵金属周评】原油多头拥挤,价格调整本周国际劳动节,我们喜迎四天假期,但是整个金融市场确实流动性充沛,大事频发,各个商品波动剧烈。美国方面,在本周四凌晨公布5月份的利率决议和新闻发布会,维持联邦基金利率在2.25%-2.50%不变,符合市场预期。同时美联储…...
2024/3/24 20:11:25 - 【外汇周评】靓丽非农不及疲软通胀影响
原标题:【外汇周评】靓丽非农不及疲软通胀影响在刚结束的周五,美国方面公布了新一期的非农就业数据,大幅好于前值和预期,新增就业重新回到20万以上。具体数据: 美国4月非农就业人口变动 26.3万人,预期 19万人,前值 19.6万人。 美国4月失业率 3.6%,预期 3.8%,前值 3…...
2024/3/29 2:45:46 - 【原油贵金属早评】库存继续增加,油价收跌
原标题:【原油贵金属早评】库存继续增加,油价收跌周三清晨公布美国当周API原油库存数据,上周原油库存增加281万桶至4.692亿桶,增幅超过预期的74.4万桶。且有消息人士称,沙特阿美据悉将于6月向亚洲炼油厂额外出售更多原油,印度炼油商预计将每日获得至多20万桶的额外原油供…...
2024/3/24 20:11:23 - 【外汇早评】日本央行会议纪要不改日元强势
原标题:【外汇早评】日本央行会议纪要不改日元强势近两日日元大幅走强与近期市场风险情绪上升,避险资金回流日元有关,也与前一段时间的美日贸易谈判给日本缓冲期,日本方面对汇率问题也避免继续贬值有关。虽然今日早间日本央行公布的利率会议纪要仍然是支持宽松政策,但这符…...
2024/3/29 5:19:52 - 【原油贵金属早评】欧佩克稳定市场,填补伊朗问题的影响
原标题:【原油贵金属早评】欧佩克稳定市场,填补伊朗问题的影响近日伊朗局势升温,导致市场担忧影响原油供给,油价试图反弹。此时OPEC表态稳定市场。据消息人士透露,沙特6月石油出口料将低于700万桶/日,沙特已经收到石油消费国提出的6月份扩大出口的“适度要求”,沙特将满…...
2024/3/28 17:01:12 - 【外汇早评】美欲与伊朗重谈协议
原标题:【外汇早评】美欲与伊朗重谈协议美国对伊朗的制裁遭到伊朗的抗议,昨日伊朗方面提出将部分退出伊核协议。而此行为又遭到欧洲方面对伊朗的谴责和警告,伊朗外长昨日回应称,欧洲国家履行它们的义务,伊核协议就能保证存续。据传闻伊朗的导弹已经对准了以色列和美国的航…...
2024/3/24 5:55:47 - 【原油贵金属早评】波动率飙升,市场情绪动荡
原标题:【原油贵金属早评】波动率飙升,市场情绪动荡因中美贸易谈判不安情绪影响,金融市场各资产品种出现明显的波动。随着美国与中方开启第十一轮谈判之际,美国按照既定计划向中国2000亿商品征收25%的关税,市场情绪有所平复,已经开始接受这一事实。虽然波动率-恐慌指数VI…...
2024/3/29 1:13:26 - 【原油贵金属周评】伊朗局势升温,黄金多头跃跃欲试
原标题:【原油贵金属周评】伊朗局势升温,黄金多头跃跃欲试美国和伊朗的局势继续升温,市场风险情绪上升,避险黄金有向上突破阻力的迹象。原油方面稍显平稳,近期美国和OPEC加大供给及市场需求回落的影响,伊朗局势并未推升油价走强。近期中美贸易谈判摩擦再度升级,美国对中…...
2024/3/26 23:04:51 - 【原油贵金属早评】市场情绪继续恶化,黄金上破
原标题:【原油贵金属早评】市场情绪继续恶化,黄金上破周初中国针对于美国加征关税的进行的反制措施引发市场情绪的大幅波动,人民币汇率出现大幅的贬值动能,金融市场受到非常明显的冲击。尤其是波动率起来之后,对于股市的表现尤其不安。隔夜美国股市出现明显的下行走势,这…...
2024/3/26 11:20:25 - 【外汇早评】美伊僵持,风险情绪继续升温
原标题:【外汇早评】美伊僵持,风险情绪继续升温昨日沙特两艘油轮再次发生爆炸事件,导致波斯湾局势进一步恶化,市场担忧美伊可能会出现摩擦生火,避险品种获得支撑,黄金和日元大幅走强。美指受中美贸易问题影响而在低位震荡。继5月12日,四艘商船在阿联酋领海附近的阿曼湾、…...
2024/3/24 20:11:18 - 【原油贵金属早评】贸易冲突导致需求低迷,油价弱势
原标题:【原油贵金属早评】贸易冲突导致需求低迷,油价弱势近日虽然伊朗局势升温,中东地区几起油船被袭击事件影响,但油价并未走高,而是出于调整结构中。由于市场预期局势失控的可能性较低,而中美贸易问题导致的全球经济衰退风险更大,需求会持续低迷,因此油价调整压力较…...
2024/3/28 9:10:53 - 氧生福地 玩美北湖(上)——为时光守候两千年
原标题:氧生福地 玩美北湖(上)——为时光守候两千年一次说走就走的旅行,只有一张高铁票的距离~ 所以,湖南郴州,我来了~ 从广州南站出发,一个半小时就到达郴州西站了。在动车上,同时改票的南风兄和我居然被分到了一个车厢,所以一路非常愉快地聊了过来。 挺好,最起…...
2024/3/29 0:49:46 - 氧生福地 玩美北湖(中)——永春梯田里的美与鲜
原标题:氧生福地 玩美北湖(中)——永春梯田里的美与鲜一觉醒来,因为大家太爱“美”照,在柳毅山庄去寻找龙女而错过了早餐时间。近十点,向导坏坏还是带着饥肠辘辘的我们去吃郴州最富有盛名的“鱼头粉”。说这是“十二分推荐”,到郴州必吃的美食之一。 哇塞!那个味美香甜…...
2024/3/24 20:11:15 - 氧生福地 玩美北湖(下)——奔跑吧骚年!
原标题:氧生福地 玩美北湖(下)——奔跑吧骚年!让我们红尘做伴 活得潇潇洒洒 策马奔腾共享人世繁华 对酒当歌唱出心中喜悦 轰轰烈烈把握青春年华 让我们红尘做伴 活得潇潇洒洒 策马奔腾共享人世繁华 对酒当歌唱出心中喜悦 轰轰烈烈把握青春年华 啊……啊……啊 两…...
2024/3/27 7:12:50 - 扒开伪装医用面膜,翻六倍价格宰客,小姐姐注意了!
原标题:扒开伪装医用面膜,翻六倍价格宰客,小姐姐注意了!扒开伪装医用面膜,翻六倍价格宰客!当行业里的某一品项火爆了,就会有很多商家蹭热度,装逼忽悠,最近火爆朋友圈的医用面膜,被沾上了污点,到底怎么回事呢? “比普通面膜安全、效果好!痘痘、痘印、敏感肌都能用…...
2024/3/24 20:11:13 - 「发现」铁皮石斛仙草之神奇功效用于医用面膜
原标题:「发现」铁皮石斛仙草之神奇功效用于医用面膜丽彦妆铁皮石斛医用面膜|石斛多糖无菌修护补水贴19大优势: 1、铁皮石斛:自唐宋以来,一直被列为皇室贡品,铁皮石斛生于海拔1600米的悬崖峭壁之上,繁殖力差,产量极低,所以古代仅供皇室、贵族享用 2、铁皮石斛自古民间…...
2024/3/26 11:21:23 - 丽彦妆\医用面膜\冷敷贴轻奢医学护肤引导者
原标题:丽彦妆\医用面膜\冷敷贴轻奢医学护肤引导者【公司简介】 广州华彬企业隶属香港华彬集团有限公司,专注美业21年,其旗下品牌: 「圣茵美」私密荷尔蒙抗衰,产后修复 「圣仪轩」私密荷尔蒙抗衰,产后修复 「花茵莳」私密荷尔蒙抗衰,产后修复 「丽彦妆」专注医学护…...
2024/3/28 18:26:34 - 广州械字号面膜生产厂家OEM/ODM4项须知!
原标题:广州械字号面膜生产厂家OEM/ODM4项须知!广州械字号面膜生产厂家OEM/ODM流程及注意事项解读: 械字号医用面膜,其实在我国并没有严格的定义,通常我们说的医美面膜指的应该是一种「医用敷料」,也就是说,医用面膜其实算作「医疗器械」的一种,又称「医用冷敷贴」。 …...
2024/3/28 12:42:28 - 械字号医用眼膜缓解用眼过度到底有无作用?
原标题:械字号医用眼膜缓解用眼过度到底有无作用?医用眼膜/械字号眼膜/医用冷敷眼贴 凝胶层为亲水高分子材料,含70%以上的水分。体表皮肤温度传导到本产品的凝胶层,热量被凝胶内水分子吸收,通过水分的蒸发带走大量的热量,可迅速地降低体表皮肤局部温度,减轻局部皮肤的灼…...
2024/3/28 20:09:10 - 配置失败还原请勿关闭计算机,电脑开机屏幕上面显示,配置失败还原更改 请勿关闭计算机 开不了机 这个问题怎么办...
解析如下:1、长按电脑电源键直至关机,然后再按一次电源健重启电脑,按F8健进入安全模式2、安全模式下进入Windows系统桌面后,按住“winR”打开运行窗口,输入“services.msc”打开服务设置3、在服务界面,选中…...
2022/11/19 21:17:18 - 错误使用 reshape要执行 RESHAPE,请勿更改元素数目。
%读入6幅图像(每一幅图像的大小是564*564) f1 imread(WashingtonDC_Band1_564.tif); subplot(3,2,1),imshow(f1); f2 imread(WashingtonDC_Band2_564.tif); subplot(3,2,2),imshow(f2); f3 imread(WashingtonDC_Band3_564.tif); subplot(3,2,3),imsho…...
2022/11/19 21:17:16 - 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机...
win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”问题的解决方法在win7系统关机时如果有升级系统的或者其他需要会直接进入一个 等待界面,在等待界面中我们需要等待操作结束才能关机,虽然这比较麻烦,但是对系统进行配置和升级…...
2022/11/19 21:17:15 - 台式电脑显示配置100%请勿关闭计算机,“准备配置windows 请勿关闭计算机”的解决方法...
有不少用户在重装Win7系统或更新系统后会遇到“准备配置windows,请勿关闭计算机”的提示,要过很久才能进入系统,有的用户甚至几个小时也无法进入,下面就教大家这个问题的解决方法。第一种方法:我们首先在左下角的“开始…...
2022/11/19 21:17:14 - win7 正在配置 请勿关闭计算机,怎么办Win7开机显示正在配置Windows Update请勿关机...
置信有很多用户都跟小编一样遇到过这样的问题,电脑时发现开机屏幕显现“正在配置Windows Update,请勿关机”(如下图所示),而且还需求等大约5分钟才干进入系统。这是怎样回事呢?一切都是正常操作的,为什么开时机呈现“正…...
2022/11/19 21:17:13 - 准备配置windows 请勿关闭计算机 蓝屏,Win7开机总是出现提示“配置Windows请勿关机”...
Win7系统开机启动时总是出现“配置Windows请勿关机”的提示,没过几秒后电脑自动重启,每次开机都这样无法进入系统,此时碰到这种现象的用户就可以使用以下5种方法解决问题。方法一:开机按下F8,在出现的Windows高级启动选…...
2022/11/19 21:17:12 - 准备windows请勿关闭计算机要多久,windows10系统提示正在准备windows请勿关闭计算机怎么办...
有不少windows10系统用户反映说碰到这样一个情况,就是电脑提示正在准备windows请勿关闭计算机,碰到这样的问题该怎么解决呢,现在小编就给大家分享一下windows10系统提示正在准备windows请勿关闭计算机的具体第一种方法:1、2、依次…...
2022/11/19 21:17:11 - 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”的解决方法...
今天和大家分享一下win7系统重装了Win7旗舰版系统后,每次关机的时候桌面上都会显示一个“配置Windows Update的界面,提示请勿关闭计算机”,每次停留好几分钟才能正常关机,导致什么情况引起的呢?出现配置Windows Update…...
2022/11/19 21:17:10 - 电脑桌面一直是清理请关闭计算机,windows7一直卡在清理 请勿关闭计算机-win7清理请勿关机,win7配置更新35%不动...
只能是等着,别无他法。说是卡着如果你看硬盘灯应该在读写。如果从 Win 10 无法正常回滚,只能是考虑备份数据后重装系统了。解决来方案一:管理员运行cmd:net stop WuAuServcd %windir%ren SoftwareDistribution SDoldnet start WuA…...
2022/11/19 21:17:09 - 计算机配置更新不起,电脑提示“配置Windows Update请勿关闭计算机”怎么办?
原标题:电脑提示“配置Windows Update请勿关闭计算机”怎么办?win7系统中在开机与关闭的时候总是显示“配置windows update请勿关闭计算机”相信有不少朋友都曾遇到过一次两次还能忍但经常遇到就叫人感到心烦了遇到这种问题怎么办呢?一般的方…...
2022/11/19 21:17:08 - 计算机正在配置无法关机,关机提示 windows7 正在配置windows 请勿关闭计算机 ,然后等了一晚上也没有关掉。现在电脑无法正常关机...
关机提示 windows7 正在配置windows 请勿关闭计算机 ,然后等了一晚上也没有关掉。现在电脑无法正常关机以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧!关机提示 windows7 正在配…...
2022/11/19 21:17:05 - 钉钉提示请勿通过开发者调试模式_钉钉请勿通过开发者调试模式是真的吗好不好用...
钉钉请勿通过开发者调试模式是真的吗好不好用 更新时间:2020-04-20 22:24:19 浏览次数:729次 区域: 南阳 > 卧龙 列举网提醒您:为保障您的权益,请不要提前支付任何费用! 虚拟位置外设器!!轨迹模拟&虚拟位置外设神器 专业用于:钉钉,外勤365,红圈通,企业微信和…...
2022/11/19 21:17:05 - 配置失败还原请勿关闭计算机怎么办,win7系统出现“配置windows update失败 还原更改 请勿关闭计算机”,长时间没反应,无法进入系统的解决方案...
前几天班里有位学生电脑(windows 7系统)出问题了,具体表现是开机时一直停留在“配置windows update失败 还原更改 请勿关闭计算机”这个界面,长时间没反应,无法进入系统。这个问题原来帮其他同学也解决过,网上搜了不少资料&#x…...
2022/11/19 21:17:04 - 一个电脑无法关闭计算机你应该怎么办,电脑显示“清理请勿关闭计算机”怎么办?...
本文为你提供了3个有效解决电脑显示“清理请勿关闭计算机”问题的方法,并在最后教给你1种保护系统安全的好方法,一起来看看!电脑出现“清理请勿关闭计算机”在Windows 7(SP1)和Windows Server 2008 R2 SP1中,添加了1个新功能在“磁…...
2022/11/19 21:17:03 - 请勿关闭计算机还原更改要多久,电脑显示:配置windows更新失败,正在还原更改,请勿关闭计算机怎么办...
许多用户在长期不使用电脑的时候,开启电脑发现电脑显示:配置windows更新失败,正在还原更改,请勿关闭计算机。。.这要怎么办呢?下面小编就带着大家一起看看吧!如果能够正常进入系统,建议您暂时移…...
2022/11/19 21:17:02 - 还原更改请勿关闭计算机 要多久,配置windows update失败 还原更改 请勿关闭计算机,电脑开机后一直显示以...
配置windows update失败 还原更改 请勿关闭计算机,电脑开机后一直显示以以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧!配置windows update失败 还原更改 请勿关闭计算机&#x…...
2022/11/19 21:17:01 - 电脑配置中请勿关闭计算机怎么办,准备配置windows请勿关闭计算机一直显示怎么办【图解】...
不知道大家有没有遇到过这样的一个问题,就是我们的win7系统在关机的时候,总是喜欢显示“准备配置windows,请勿关机”这样的一个页面,没有什么大碍,但是如果一直等着的话就要两个小时甚至更久都关不了机,非常…...
2022/11/19 21:17:00 - 正在准备配置请勿关闭计算机,正在准备配置windows请勿关闭计算机时间长了解决教程...
当电脑出现正在准备配置windows请勿关闭计算机时,一般是您正对windows进行升级,但是这个要是长时间没有反应,我们不能再傻等下去了。可能是电脑出了别的问题了,来看看教程的说法。正在准备配置windows请勿关闭计算机时间长了方法一…...
2022/11/19 21:16:59 - 配置失败还原请勿关闭计算机,配置Windows Update失败,还原更改请勿关闭计算机...
我们使用电脑的过程中有时会遇到这种情况,当我们打开电脑之后,发现一直停留在一个界面:“配置Windows Update失败,还原更改请勿关闭计算机”,等了许久还是无法进入系统。如果我们遇到此类问题应该如何解决呢࿰…...
2022/11/19 21:16:58 - 如何在iPhone上关闭“请勿打扰”
Apple’s “Do Not Disturb While Driving” is a potentially lifesaving iPhone feature, but it doesn’t always turn on automatically at the appropriate time. For example, you might be a passenger in a moving car, but your iPhone may think you’re the one dri…...
2022/11/19 21:16:57