RocketMQ源码解析之负载均衡

RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡。

Producer 的负载均衡

Producer端在发送消息的时候,会先根据Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。具体的容错策略均在MQFaultStrategy这个类中定义。这里有一个sendLatencyFaultEnable开关变量,如果开启,在随机递增取模的基础上,再过滤掉not available的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。

RocketMQ 中 所有在服务端创建的 topic 都会被发布到 NameServer 供客户端查询使用,同时一个 topic 可以存在在多个 broker 上,TopicPublishInfo 不仅包含 topic 在哪个 broker 中,以及 topic 的存在broker路由信息,还包含了如果客户端不使用负载均衡发送消息时,默认使用的数据获取随机 MessageQueue 的服务。

public class TopicPublishInfo {private boolean orderTopic = false; //顺序 Topicprivate boolean haveTopicRouterInfo = false; //是否有路由信息//当前 topic 的 MessageQueue    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();//提供一个线程安全的Integer随机数获取服务    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private TopicRouteData topicRouteData; //有 topic 的broker路由信息public boolean ok() {return null != this.messageQueueList && !this.messageQueueList.isEmpty();}//通过BrokerName指定消息要发送的 Broker 服务器,即获取 BrokerName 指定的服务器上的随机一个 MessageQueuepublic MessageQueue selectOneMessageQueue(final String lastBrokerName) {if (lastBrokerName == null) { //没有指定BrokerName,随机获取一个 MessageQueuereturn selectOneMessageQueue();} else {int index = this.sendWhichQueue.getAndIncrement();//获取数据数for (int i = 0; i < this.messageQueueList.size(); i++) {int pos = Math.abs(index++) % this.messageQueueList.size(); //获取不越界的随机数if (pos < 0)pos = 0;MessageQueue mq = this.messageQueueList.get(pos);if (!mq.getBrokerName().equals(lastBrokerName)) { //通过 BrokerName 指定要返回的 MessageQueuereturn mq;}}return selectOneMessageQueue();}}public MessageQueue selectOneMessageQueue() {int index = this.sendWhichQueue.getAndIncrement(); //获取随机数int pos = Math.abs(index) % this.messageQueueList.size(); //获取不越界的随机数if (pos < 0)pos = 0;return this.messageQueueList.get(pos); //随机获取一个 MessageQueue}
}

当RocketMQ在发送消息时,Producer 会获取 Message 中的 topic 的信息,找到跟 topic 相关的发布路由信息TopicPublishInfo。

    private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;//获取TopicPublishInfo 信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { // topic发布信息可使用boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();//寻找发送消息的 MessageQueueMessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);//更新 latencyFaultTolerance 的 latency 时间//……}

如果在本地的 topicPublishInfoTable 已存在并且是可用的 ,则直接使用,如果不存在,则先在本地加入一个空的topic发布信息,再向 NameServer 获取 topic 的路由信息更新本地 topicPublishInfoTable 信息。

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);//从本地topic发布信息表中获取if (null == topicPublishInfo || !topicPublishInfo.ok()) { //如果本地不存在或者本地的 topicPublishInfo 是不可用的this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); //在本地增加一个空this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); //通过远程 NameServer 更新本地 topic 相关信息topicPublishInfo = this.topicPublishInfoTable.get(topic);}if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo; //返回本地可用} else {//如果指定的 topic 不存在,则通过 defaultMQProducer 默认的“TBW102” 进行再次获取 topic 发布信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}}

获取可使用的 topic 发布信息后, 则通过 selectOneMessageQueue() 负载均衡获取一个发送消息的队列 MessageQueue

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);}

selectOneMessageQueue() 通过 MQFaultStrategy 通过 latencyFaultTolerance 进行负载均衡,latencyFaultTolerance是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L;
整个 MQFaultStrategy 的 selectOneMessageQueue 过程如下:
在这里插入图片描述

public class MQFaultStrategy {private final static InternalLogger log = ClientLogger.getLog();private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); //负载均很策略private boolean sendLatencyFaultEnable = false; //是否开始,默认不开启private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; //latency 时间的标准时间private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};//退避时间//选出 指定BrokerName下的一个 MessageQueue public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) { //如果开启 latencyFaultTolerance 策略try {int index = tpInfo.getSendWhichQueue().getAndIncrement(); //随机获取一个数据数for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;MessageQueue mq = tpInfo.getMessageQueueList().get(pos);//先通过随机算法获取一个 MessageQueue //通过latencyFaultTolerance 判断当前 broker 是否已过退避时间//如果已过退避时间,则表示当前 MessageQueue 处于可用状态if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { //如果选出的 MessageQueue 所处的 broker 就是指定的 broker,则此 MessageQueue 就是最终选出的 MessageQueueif (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))return mq;}}//如果 TopicPublishInfo 中所有的 MessageQueue 都不可用,则获取一个响应最好的 broker 创建一个信息 MessageQueuefinal String notBestBroker = latencyFaultTolerance.pickOneAtLeast();//选出最优的Broker——即最小的回避时间,如果回避时间一样则选择最大的开始时间(回避到期时间最小)int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);//获取最优 broker 的写消息的队列的格式 if (writeQueueNums > 0) {//如果最优 broker 下存在可以入的队列final MessageQueue mq = tpInfo.selectOneMessageQueue(); //随机获取一个 MessageQueue 对象if (notBestBroker != null) { //如果 最优 broker 存在则新建一个 MessageQueuemq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);}return mq; //返回新的MessageQueue} else {latencyFaultTolerance.remove(notBestBroker);//如果没有可写的队列,则从策略中移除当前 broker}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue();}return tpInfo.selectOneMessageQueue(lastBrokerName);//如果没有开启 使用随机获取 BrokerName 下的一个MessageQueue}
}

RocketMQ中 latencyFaultTolerance 中主要是保存回避时间信息和使用回避时间,开始时间判断MessageQueue的可用以及回避时间的更新操作

public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);//记录故障项private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();//更新public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {FaultItem old = this.faultItemTable.get(name); //获取 name(brokerName) 的故障项if (null == old) {//不存在则创建 broker 的故障项final FaultItem faultItem = new FaultItem(name); //根据 broker 的name 创建faultItem.setCurrentLatency(currentLatency); //退避时间faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); //退避结束的开始时间old = this.faultItemTable.putIfAbsent(name, faultItem); //存入tableif (old != null) { //如果已经存在old.setCurrentLatency(currentLatency); //更新退避时间old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); //退避结束的开始时间}} else { //存在 broker 的故障项old.setCurrentLatency(currentLatency); //更新退避时间old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); //退避结束的开始时间}}//判断 broker 是否可用public boolean isAvailable(final String name) {final FaultItem faultItem = this.faultItemTable.get(name); //获取 broker 的故障项if (faultItem != null) {return faultItem.isAvailable(); //判断当前时间是否大于等于退避结束时间}return true;}//移除 name 的broker 的故障项public void remove(final String name) { this.faultItemTable.remove(name);}@Overridepublic String pickOneAtLeast() {final Enumeration<FaultItem> elements = this.faultItemTable.elements();List<FaultItem> tmpList = new LinkedList<FaultItem>();while (elements.hasMoreElements()) {final FaultItem faultItem = elements.nextElement();tmpList.add(faultItem);}if (!tmpList.isEmpty()) {Collections.shuffle(tmpList);Collections.sort(tmpList);final int half = tmpList.size() / 2;if (half <= 0) {return tmpList.get(0).getName();} else {final int i = this.whichItemWorst.getAndIncrement() % half;return tmpList.get(i).getName();}}return null;}//故障项对象,主要是保存broker的需要退避的时间和退避后的开始时间class FaultItem implements Comparable<FaultItem> {private final String name;//broker 名称private volatile long currentLatency; //退避时间private volatile long startTimestamp; //过了退避期的开始时间public FaultItem(final String name) {this.name = name;}//判断最优,先比较是否可用,如果当前对象可用,则最优;可用性相同则比较退避时间,退避时间越短则最优;//退避时间相等则比较退出退避的开始时间,谁先退出——即开始可用谁最优public int compareTo(final FaultItem other) {if (this.isAvailable() != other.isAvailable()) {if (this.isAvailable())return -1;if (other.isAvailable())return 1;}if (this.currentLatency < other.currentLatency)return -1;else if (this.currentLatency > other.currentLatency) {return 1;}if (this.startTimestamp < other.startTimestamp)return -1;else if (this.startTimestamp > other.startTimestamp) {return 1;}return 0;}//当前时间是否大于回避时间public boolean isAvailable() {return (System.currentTimeMillis() - startTimestamp) >= 0;}}
}

Consumer 的负载均衡

在RocketMQ中,Consumer端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端在知道从Broker端的哪一个消息队列—队列中去获取消息。因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费。

Broker 端保存Consumer的订阅信息:Broker 收到 Consumer 发送的心跳消息后,会将订阅元数据保存到 ConsumerManager 的 topicConfigTable 中,在 topicConfigTable 保存客户端的通信信息和客户端订阅MQ的元数据

    public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {RemotingCommand response = RemotingCommand.createResponseCommand(null);HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);//解码心跳数据ClientChannelInfo clientChannelInfo = new ClientChannelInfo(ctx.channel(),heartbeatData.getClientID(),request.getLanguage(),request.getVersion());//获取客户端信息//解析客户端的 Consumer 的订阅元数据for (ConsumerData data : heartbeatData.getConsumerDataSet()) {//获取 broker 中 Consumer 的订阅元数据SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());boolean isNotifyConsumerIdsChangedEnable = true;if (null != subscriptionGroupConfig) { //如果已存在订阅消息//是否需要更新 Consumer 的idisNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable(); int topicSysFlag = 0;if (data.isUnitMode()) {topicSysFlag = TopicSysFlag.buildSysFlag(false, true);}//创建 retry 的 topic,如果已存在直接返回String newTopic = MixAll.getRetryTopic(data.getGroupName());this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,subscriptionGroupConfig.getRetryQueueNums(),PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);}//注册保存 Consumer 的订阅元数据boolean changed = this.brokerController.getConsumerManager().registerConsumer(data.getGroupName(), //消费组名clientChannelInfo, //消费的客户端信息data.getConsumeType(), //消费方式 pull/pushdata.getMessageModel(), //消费模式 广播/集群data.getConsumeFromWhere(), //消费指针 接着上次消费/重头消费/指定时间消费等data.getSubscriptionDataSet(),//订阅元数据 包含订阅的topic ,tag等信息isNotifyConsumerIdsChangedEnable );if (changed) {log.info("registerConsumer info changed {} {}",data.toString(),RemotingHelper.parseChannelRemoteAddr(ctx.channel()));}}for (ProducerData data : heartbeatData.getProducerDataSet()) {this.brokerController.getProducerManager().registerProducer(data.getGroupName(),clientChannelInfo);}response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}//ConsumerManager
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); //是否消费组的消费信息if (null == consumerGroupInfo) { //不存在则保存新的  ConsumerGroupInfoConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); //保存新的组的消费信息consumerGroupInfo = prev != null ? prev : tmp;}//更新 consumerGroupInfo 信息boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);boolean r2 = consumerGroupInfo.updateSubscription(subList); //更新订阅的元数据信息if (r1 || r2) {if (isNotifyConsumerIdsChangedEnable) {this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());}}this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);return r1 || r2;}/*** 更新客户端通信信息*/       public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType,MessageModel messageModel, ConsumeFromWhere consumeFromWhere) {boolean updated = false;this.consumeType = consumeType;this.messageModel = messageModel;this.consumeFromWhere = consumeFromWhere;//找到 客户端的通信 channel 的客户端信息    ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel()); if (null == infoOld) {//没有的话直接存入新的 channel 的客户端信息ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew); if (null == prev) {log.info("new consumer connected, group: {} {} {} channel: {}", this.groupName, consumeType,messageModel, infoNew.toString());updated = true;}infoOld = infoNew;} else {//如果已经存在了 channel 判断 channel 是否已经发生变化,变化了则更新客户端信息   if (!infoOld.getClientId().equals(infoNew.getClientId())) { log.error("[BUG] consumer channel exist in broker, but clientId not equal. GROUP: {} OLD: {} NEW: {} ",this.groupName,infoOld.toString(),infoNew.toString());this.channelInfoTable.put(infoNew.getChannel(), infoNew);}}this.lastUpdateTimestamp = System.currentTimeMillis();infoOld.setLastUpdateTimestamp(this.lastUpdateTimestamp);return updated;}/*** 更新Consumer的订阅元数据*/public boolean updateSubscription(final Set<SubscriptionData> subList) {boolean updated = false;for (SubscriptionData sub : subList) {SubscriptionData old = this.subscriptionTable.get(sub.getTopic()); //获取 topic的订阅数据if (old == null) {//不存在topic的订阅数据则更新SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub); if (null == prev) {updated = true;log.info("subscription changed, add new topic, group: {} {}",this.groupName,sub.toString());}//存在的话则比较版本号,如果是新的版本号,则更新为新的版本号} else if (sub.getSubVersion() > old.getSubVersion()) { if (this.consumeType == ConsumeType.CONSUME_PASSIVELY) {log.info("subscription changed, group: {} OLD: {} NEW: {}",this.groupName,old.toString(),sub.toString());}this.subscriptionTable.put(sub.getTopic(), sub);}}//移除已经过时的订阅元数据Iterator<Entry<String, SubscriptionData>> it = this.subscriptionTable.entrySet().iterator();while (it.hasNext()) {Entry<String, SubscriptionData> next = it.next();String oldTopic = next.getKey();boolean exist = false;for (SubscriptionData sub : subList) {if (sub.getTopic().equals(oldTopic)) {exist = true;break;}}if (!exist) {log.warn("subscription changed, group: {} remove topic {} {}",this.groupName,oldTopic,next.getValue().toString());it.remove();updated = true;}}this.lastUpdateTimestamp = System.currentTimeMillis();return updated;}

这样客户端的 Consumer 信息就保存到了 broker 中,broker 保存的消息提供给客户端进行负载均衡使用,当 Consumer 实例在启动的时候,会完成负载均衡服务线程—RebalanceService的启动(每隔20s执行一次)。

//MQClientInstance.javapublic void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// If not specified,looking address from name serverif (null == this.clientConfig.getNamesrvAddr()) {this.mQClientAPIImpl.fetchNameServerAddr();}// Start request-response channelthis.mQClientAPIImpl.start();// Start various schedule tasksthis.startScheduledTask();// Start pull servicethis.pullMessageService.start();// Start rebalance servicethis.rebalanceService.start(); //负载均衡启动// Start push servicethis.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}}//RebalanceService.javapublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {this.waitForRunning(waitInterval);this.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");}

RebalanceService 线程的run()方法最终调用的是 RebalanceImpl 类的rebalanceByTopic()方法,该方法是实现Consumer端负载均衡的核心。ebalanceByTopic()方法会根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。这里主要来看下集群模式下的主要处理流程:

  1. 从rebalanceImpl实例的获取本地的本地的订阅信息subscriptionInner,在后通过本地的订阅 topic 在本地缓存变量—topicSubscribeInfoTable中,获取该Topic主题下的消息消费队列集合(mqSet)。
  2. 集群模式下根据topic和consumerGroup为参数调用mQClientFactory.findConsumerIdList()方法向Broker端获取该消费组下消费者Id列表;广播模式不需要知道具体的该消费组下消费者,因为所有的消费者都要参与消费。
  3. 先对Topic下的消息消费队列、消费者Id排序,然后用消息队列分配策略算法(默认为:消息队列的平均分配算法),计算出待拉取的消息队列。
  4. 然后,调用updateProcessQueueTableInRebalance()方法,具体的做法是,先将分配到的消息队列集合(mqSet)与processQueueTable做一个过滤比对。
    在这里插入图片描述
    红色部分,表示与分配到的消息队列集合mqSet互不包含的部分,设置ProcessQueue 的 Dropped属性为true,然后进行移除操作;绿色部分,表示与分配到的消息队列集合mqSet的交集但ProcessQueue已经过期,在Push模式的,设置ProcessQueue 的 Dropped属性为true,然后进行移除操作。
  5. 最后,为过滤后的消息队列集合(mqSet)中的每个MessageQueue创建一个ProcessQueue对象并存入RebalanceImpl的processQueueTable队列中(其中调用RebalanceImpl实例的computePullFromWhere(MessageQueue mq)方法获取该MessageQueue对象的下一个进度消费值offset,随后填充至接下来要创建的pullRequest对象属性中),并创建拉取请求对象—pullRequest添加到拉取列表—pullRequestList中,最后执行dispatchPullRequest()方法,将Pull消息的请求对象PullRequest依次放入PullMessageService服务线程的阻塞队列pullRequestQueue中,待该服务线程取出后向Broker端发起Pull消息的请求。
RebalanceImpl.javapublic void doRebalance(final boolean isOrder) {Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); //获取本地的订阅信息if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {this.rebalanceByTopic(topic, isOrder); //根据 topic 进行负载均衡信息} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}this.truncateMessageQueueNotMyTopic();//移除不是客户端订阅的topic 信息}private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: { //广播模式Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); //获取订阅的topic的MessageQueueif (mqSet != null) {//更新 ProcessQueueTable 中的信息boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder); if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",consumerGroup,topic,mqSet,mqSet);}} else {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}case CLUSTERING: {//集群模式//获取订阅的topic的MessageQueue    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); //通过broker根据订阅组consumerGroup获取订阅了该topic 的 ConsumerIdList<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll); //将所有MessageQueue排序Collections.sort(cidAll);//将所有消费端Consumer排序/*** AllocateMessageQueueStrategy 消息队列分配策略算法  *机房下分配算法(机房内可以使用其他分配算法) AllocateMachineRoomNearby  *一致的哈希队列算法 AllocateMessageQueueConsistentHash*平均分配算法(默认使用) AllocateMessageQueueAveragely*机房哈希队列算法 AllocateMessageQueueByMachineRoom*指定分配 AllocateMessageQueueByConfig*循环平均分配算法 AllocateMessageQueueAveragelyByCircle*/   AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null;try {//通过分配算法获取本ClientId 下分配到的 MessageQueueallocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}//通过负载均衡到的 MessageQueue 更新 ProcessQueueTable 中的信息boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);//如果 ProcessQueueTable 已经改动,则通知 push/pull 进行新的  ProcessQueueTable 处理操作//push 下 更新拉取线程的 每次拉取Message个数(控制阈值),pull下不做处理this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;}}

AllocateMessageQueueStrategy 消息队列分配策略算法有很多的实现方式,我们这里看一下默认使用的策略——平均分配算法,这里的平均分配算法,类似于分页的算法,将所有MessageQueue排好序类似于记录,将所有消费端Consumer排好序类似页数,并求出每一页需要包含的平均size和每个页面记录的范围range,最后遍历整个range而计算出当前Consumer端应该分配到的记录(这里即为:MessageQueue)。

public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {private final InternalLogger log = ClientLogger.getLog();@Overridepublic List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,List<String> cidAll) {if (currentCID == null || currentCID.length() < 1) {//判断当前的client id 是否合法throw new IllegalArgumentException("currentCID is empty");}if (mqAll == null || mqAll.isEmpty()) {//判断topic下是否存在 MessageQueuethrow new IllegalArgumentException("mqAll is null or mqAll empty");}if (cidAll == null || cidAll.isEmpty()) { //判断topic下是否存在消费端 consumer idthrow new IllegalArgumentException("cidAll is null or cidAll empty");}List<MessageQueue> result = new ArrayList<MessageQueue>();//如果 topic 的所有消费者中不包含当前消费者,则当前消费者分配到空的消费,即不参与消费if (!cidAll.contains(currentCID)) {log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",consumerGroup,currentCID,cidAll);return result;}int index = cidAll.indexOf(currentCID); //当前消费者的消费排位——当前页//整页分配后余下的个数,即 mqAll是否是cidAll整数倍,如果是整数倍的话则意味着所有页都是一样的,// 如果不能平均分配的话,则当前页多分配一条,当前页后用除数,则当前页前面的页当好多分配完余数int mod = mqAll.size() % cidAll.size(); //如果 MessageQueue 个数小于等于消费者数量,则每页数设置为1//如果 MessageQueue 个数等于消费者数量的倍数,则每页数设置为倍数//如果 MessageQueue 个数不能为消费者数量的整除,则当前页前面的页中个数为除数+1个,后边页数为整除数int averageSize =mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()+ 1 : mqAll.size() / cidAll.size());int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; //当前页的起始游标int range = Math.min(averageSize, mqAll.size() - startIndex);//当前页的个数for (int i = 0; i < range; i++) {result.add(mqAll.get((startIndex + i) % mqAll.size()));}return result;}@Overridepublic String getName() {return "AVG";}
}

通过负载均衡到的 MessageQueue 列表需要跟本地正在进行消费的 ProcessQueueTable 中执行消费的 ProcessQueue 列表进行对比筛选过滤, ProcessQueue 为MessageQueue的消费信息,如果 ProcessQueueTable 与分配到的消息队列集合 mqSet 做对比,将不包含在 mqSet 的 MessageQueue 的 ProcessQueue 移除 ProcessQueueTable ,如果包容部分的 ProcessQueue 有过期的,也需要进行移除

    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,final boolean isOrder) {boolean changed = false;Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();while (it.hasNext()) {Entry<MessageQueue, ProcessQueue> next = it.next();MessageQueue mq = next.getKey();ProcessQueue pq = next.getValue();if (mq.getTopic().equals(topic)) { // 是否是订阅 topic 下的 MessageQueue//当前  ProcessQueue 消费的 MessageQueue 是否存在分配到的 mqSet 中if (!mqSet.contains(mq)) { //不存在则设置 ProcessQueue 的 Dropped 为true pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) { //判断是否需要移除消费it.remove(); //移除消费changed = true;log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);}} else if (pq.isPullExpired()) { //判断 ProcessQueue 是否已过期switch (this.consumeType()) { //不同模式下的过期操作case CONSUME_ACTIVELY: //pull 下不用处理break;case CONSUME_PASSIVELY: //push 下需要移除处理pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",consumerGroup, mq);}break;default:break;}}}}//初始化 PullRequest 请求List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); for (MessageQueue mq : mqSet) {if (!this.processQueueTable.containsKey(mq)) { //不再 processQueueTable 的不用处理if (isOrder && !this.lock(mq)) {log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);continue;}this.removeDirtyOffset(mq); //移除 MessageQueue 的消费指针ProcessQueue pq = new ProcessQueue();long nextOffset = this.computePullFromWhere(mq); //设置消费起始位置if (nextOffset >= 0) {//重新放入 MessageQueue 的执行 ProcessQueueProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { //已经存在的不做处理log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);} else {//新入 processQueueTable 的需要初始化 PullRequest 请求(供 push 消费模式使用)log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);PullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);pullRequest.setNextOffset(nextOffset);pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(pq);pullRequestList.add(pullRequest);changed = true;}} else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);}}}//分发 PullRequest 请求,pull不做处理, push 新开批量拉取 Message 线程 (pull 包装成 push)this.dispatchPullRequest(pullRequestList); return changed;}

消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列。

查看全文
如若内容造成侵权/违法违规/事实不符,请联系编程学习网邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!

相关文章

  1. Cheat Engine 新手教程

    Cheat EngineCheat Engine 新手教程Step - 1 - 怎样通关【简介】Step - 2 - 数值搜索Step - 3 - 未知数值Step - 4 - 浮点类型Step - 5 - 代码查找器Step - 6 - 指针Step - 7 - 代码注入Step - 8 - 多重指针第0重指针第1重指针第2重指针第3重指针第4重指针Step - 9 - 共享代码…...

    2024/5/8 11:32:49
  2. shell编程--正则表达式3(awk 工具使用方法、sort 工具、uniq 工具、tr 工具)

    文章目录一、 awk工具1.1 awk常见用法1.2 awk对文件的处理过程1.3 awk常见的内置变量二、 用法示例2.1 按行输出文本2.2 按字段输出文本2.3 通过管道、双引号调用 Shell 命令三、工具3.1 sort工具3.2 uniq工具3.3 tr工具 一、 awk工具 在 Linux/UNIX 系统中,awk 是一个功能强大…...

    2024/4/25 17:49:14
  3. 美团面试绝命7问答案以及解析Object 0 = new Object();

    问题:关于 Object o = new Object(); 1、解释一下对象的创建过程; 2、DCL(double check lock)写法中需不需要添加volatile; 3、对象在内存中的存储布局; 4、对象头具体包括什么; 5、对象怎么定位; 6、对象怎么分配; 7、Object o = new Object();在内存中占用多少字节;…...

    2024/4/24 21:25:53
  4. vue+elementUI日期选择器实现选择日期,自动对应周几,并且是包含所选择日期的一周的日期

    vue+elementUI日期选择器实现选择日期,自动对应周几之前在公司的项目里面遇到这个问题,如今想起来在这里总结一下,如果有更好的方法,请各位大神多多指教。在template里面给选择日期的el-date-picker一个监听事件change,我定义的是toWeek事件,v-show根据自己项目的需求可以…...

    2024/4/23 8:40:01
  5. 腾讯云TDSQL支撑富途核心系统,单日查询峰值超过50亿次

    受港股美股行情刺激,港股美股互联网券商领导者富途证券旗下富途牛牛App注册用户数近千万,开户客户数破百万。高速发展的背后,富途证券用创新技术打造的金融科技支撑系统发挥着重要作用。日前,富途技术团队透露,支撑富途证券核心系统的腾讯云分布式数据库TDSQL,单日查询峰…...

    2024/4/29 18:48:56
  6. Vue学习笔记——4.事件处理

    内联处理器中的方法 有时也需要在内联语句处理器中访问原始的 DOM 事件。可以用特殊变量 $event 把它传入方法: <!DOCTYPE html> <html lang="en"> <head><meta charset="UTF-8"><title>Title</title><script src…...

    2024/4/18 14:10:02
  7. 数据库审计系统基本原理与部署方式

    数据库审计系统简介什么是数据库审计?数据库审计是记录数据库被访问行为的日志系统。访问数据库的一般有两种行为,一种是应用服务区的访问,一种是数据库运维人员的访问。数据库审计(简称DBAudit)能够实时记录网络上的数据库活动,对数据库操作进行细粒度审计的合规性管理,…...

    2024/4/15 16:46:14
  8. 最新Landsat数据简介及批量下载介绍2020.07.29测试成功

    一、Landsat简介 美国NASA的陆地卫星(Landsat)计划,从1972年7月23日以来, 已发射8颗(第6颗发射失败)。目前Landsat1—4均相继失效,Landsat 5仍在超期运行(从1984年3月1日发射至今)。 Landsat 7 于1999年4月15日发射升空。 Landsat 8 于2013年2月11日发射升空, 经过100天…...

    2024/4/15 16:46:17
  9. iOS平台基于ffmpeg的视频直播系统源码技术揭秘

    iOS平台基于ffmpeg的视频直播技术揭秘 现在非常流行直播,相信很多人都跟我一样十分好奇这个技术是如何实现的,正好最近在做一个ffmpeg的项目,发现这个工具很容易就可以做直播,下面来给大家分享下技术要点: 首先你得编译出ffmpeg运行所需的静态库,这个百度一下有很多内容,这里我…...

    2024/4/15 16:46:12
  10. 感染危险的.C1H .C2H .C3H .C4H勒索病毒(GlobeImposter家族病毒)如何应对清除?

    GlobeImposter勒索病毒会对计算机数据文件进行加密,并要求为解密密钥/软件付费。在加密过程中,所有文件都以“ .C4H ”为扩展名;例如最初名为“ 1.jpg ” 加密后为“ 1.jpg.C4H ”。 该勒索软件的其他变体有“ .C1H ”和“ .C3H ”等。网络最近又发布了另外两个GlobeImposte…...

    2024/4/15 16:46:12
  11. Amazentis推出含突破性细胞营养Mitopure™的Timeline™

    Mitopure™是同类首创细胞营养素,经临床证明可增强细胞功能、增强肌肉力量,追求健康的消费者现在可通过Timeline Nutrition产品获得*。瑞士洛桑和旧金山--(美国商业资讯)--先锋生命科学公司Amazentis今天发布临床新发现,并在美国发售Timeline Cellular Nutrition。经过领先科…...

    2024/4/15 16:46:10
  12. Android的.dex、.odex与.oat文件扫盲

    文章目录一、前言,发现新玩意二、.dex、.odex与.oat文件介绍1、dex文件2、vdex文件3、odex文件4、oat文件 一、前言,发现新玩意 最近Unity项目打出的Android包在红米Note4真机上运行闪退了,查看日志如下: 07-28 17:56:49.623 7368 7368 D YSDK d.OnSupport: Device OAID…...

    2024/4/15 16:46:09
  13. netty简单入门案例

    Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。我们在网络编程的时候一般都会使用该框架作为基础框架,下面使用netty来实现简单的群聊系统来认识netty的基本使用方法。服务端:public class GroupChatServer {p…...

    2024/4/15 16:46:08
  14. Nuke与Natron的区别是什么?

    以前的教程都是说Natron这个开源的视频后期合成软件的优势和用处的,虽然也拿它和AE或者Nuke对比过,不过还是说的不多,说的不够。这里说的不够多的就是Natron的缺点和不足。当然我们依旧相信,Natron是免费正版市场中很优秀(当然不能说最优秀)的视频后期特效软件。 和商业软…...

    2024/4/15 16:46:07
  15. javaweb-Servlet&HTTP&Request-033

    文章目录Servlet:HTTP:Request:案例 Servlet: 1. 概念 2. 步骤 3. 执行原理 4. 生命周期 5. Servlet3.0 注解配置 6. Servlet的体系结构 每次继承抽象方法需要重写五个,但是实际使用的就一个,简便? Servlet -- 接口|GenericServlet -- 抽象类如果继承他,只需要写一个se…...

    2024/4/25 12:50:23
  16. sql中旅特殊符号转意

    特殊字符 --------------------------- 替代符号 & & < <>" " ’ ’ 1 2 3 4 5 6 7 8 9 小于等于 a<=b a <= b a <![CDATA[<= ]]>b 大于等于 a>=b a >= b a <![CDATA[…...

    2024/4/28 16:56:09
  17. 数据结构复习(更新ing.......)---基于浙大MOOC

    数据结构复习第一讲 基本概念什么是数据结构什么是算法算法的基本特征算法复杂度抽象数据类型应用实例:最大子列和问题第二讲 线性结构第三讲 树(上)第四讲 树(下)第五讲 图(上)第六讲 图(下)第七讲 排序(上)第八讲 排序(下)第九讲 散列查找第十讲 习题 如有错误,…...

    2024/4/29 9:14:34
  18. Java反射异常java.lang.IllegalAccessException

    抽取CRUD做父类 测试过程中调用出现如下异常: Java反射非法访问异常 java.lang.IllegalAccessException: Class com.hope.dao.BaseDao can not access a member of class com.hope.entity.User with modifiers “private”错误信息: java.lang.IllegalAccessException: Class…...

    2024/4/15 16:46:06
  19. print()和其关键字参数

    1)end关键字参数 print()函数自动在传入的字符串末尾添加了换行符。但是,可以设置 end 关键字参数,将它变成另一个字符串。 未添加之前: print(Jack Ma) print(Pony Ma) print(Lei Jun)打印结果会自动换行: Jack Ma Pony Ma Lei Jun添加end关键字后将换行符替换掉: print…...

    2024/4/15 16:46:02
  20. Python面向对象练习题汇总

    1. 什么是类,什么是对象?类:对一类事物的描述,是抽象的、概念上的定义。 对象:实际存在的该类事物的每个个体,因而也称实例(instance)。类是对象的抽象,对象是类的实例。2. python中如何定义一个类的格式 class 类名(object):成员(方法)3. 类(class)由哪三个部分构成 类…...

    2024/4/24 3:30:53

最新文章

  1. delphi6直连redis服务(用lua脚本redis模块)

    一、创建一个exe程序 创建一个exe程序,引用LuaRedis.pas单元(此单元自己封装的代码,目前主要封装了获取key和设置key/value功能),代码如下: unit Unit1;interfaceusesWindows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,Dialogs, StdCtrls;type…...

    2024/5/8 14:40:40
  2. 梯度消失和梯度爆炸的一些处理方法

    在这里是记录一下梯度消失或梯度爆炸的一些处理技巧。全当学习总结了如有错误还请留言&#xff0c;在此感激不尽。 权重和梯度的更新公式如下&#xff1a; w w − η ⋅ ∇ w w w - \eta \cdot \nabla w ww−η⋅∇w 个人通俗的理解梯度消失就是网络模型在反向求导的时候出…...

    2024/5/7 10:36:02
  3. 【springboot开发】PO、DTO等对象的基本概念

    前言&#xff1a;java是面向对象的&#xff0c;java开发中由于多层开发结构&#xff0c;涉及到各种类别的实体类。 POJO 无规则简单java对象 PO 持久化对象数据库结构到java对象的映射PO平常不一定需要实现序列化&#xff0c;只是当采用分布式存储或者需要作为前端输出及远程调…...

    2024/5/4 6:52:47
  4. Go语言中如何实现继承

    完整课程请点击以下链接 Go 语言项目开发实战_Go_实战_项目开发_孔令飞_Commit 规范_最佳实践_企业应用代码-极客时间 Go语言中没有传统意义上的类和继承的概念&#xff0c;但可以通过嵌入类型&#xff08;embedded types&#xff09;来实现类似的功能。嵌入类型允许一个结构…...

    2024/5/5 8:37:47
  5. Web大并发集群部署之集群介绍

    一、传统web访问模型 传统web访问模型完成一次请求的步骤 1&#xff09;用户发起请求 2&#xff09;服务器接受请求 3&#xff09;服务器处理请求&#xff08;压力最大&#xff09; 4&#xff09;服务器响应请求 传统模型缺点 单点故障&#xff1b; 单台服务器资源有限&…...

    2024/5/7 1:33:15
  6. 416. 分割等和子集问题(动态规划)

    题目 题解 class Solution:def canPartition(self, nums: List[int]) -> bool:# badcaseif not nums:return True# 不能被2整除if sum(nums) % 2 ! 0:return False# 状态定义&#xff1a;dp[i][j]表示当背包容量为j&#xff0c;用前i个物品是否正好可以将背包填满&#xff…...

    2024/5/7 19:05:20
  7. 【Java】ExcelWriter自适应宽度工具类(支持中文)

    工具类 import org.apache.poi.ss.usermodel.Cell; import org.apache.poi.ss.usermodel.CellType; import org.apache.poi.ss.usermodel.Row; import org.apache.poi.ss.usermodel.Sheet;/*** Excel工具类** author xiaoming* date 2023/11/17 10:40*/ public class ExcelUti…...

    2024/5/7 22:31:36
  8. Spring cloud负载均衡@LoadBalanced LoadBalancerClient

    LoadBalance vs Ribbon 由于Spring cloud2020之后移除了Ribbon&#xff0c;直接使用Spring Cloud LoadBalancer作为客户端负载均衡组件&#xff0c;我们讨论Spring负载均衡以Spring Cloud2020之后版本为主&#xff0c;学习Spring Cloud LoadBalance&#xff0c;暂不讨论Ribbon…...

    2024/5/8 1:37:40
  9. TSINGSEE青犀AI智能分析+视频监控工业园区周界安全防范方案

    一、背景需求分析 在工业产业园、化工园或生产制造园区中&#xff0c;周界防范意义重大&#xff0c;对园区的安全起到重要的作用。常规的安防方式是采用人员巡查&#xff0c;人力投入成本大而且效率低。周界一旦被破坏或入侵&#xff0c;会影响园区人员和资产安全&#xff0c;…...

    2024/5/7 14:19:30
  10. VB.net WebBrowser网页元素抓取分析方法

    在用WebBrowser编程实现网页操作自动化时&#xff0c;常要分析网页Html&#xff0c;例如网页在加载数据时&#xff0c;常会显示“系统处理中&#xff0c;请稍候..”&#xff0c;我们需要在数据加载完成后才能继续下一步操作&#xff0c;如何抓取这个信息的网页html元素变化&…...

    2024/5/8 1:37:39
  11. 【Objective-C】Objective-C汇总

    方法定义 参考&#xff1a;https://www.yiibai.com/objective_c/objective_c_functions.html Objective-C编程语言中方法定义的一般形式如下 - (return_type) method_name:( argumentType1 )argumentName1 joiningArgument2:( argumentType2 )argumentName2 ... joiningArgu…...

    2024/5/7 16:57:02
  12. 【洛谷算法题】P5713-洛谷团队系统【入门2分支结构】

    &#x1f468;‍&#x1f4bb;博客主页&#xff1a;花无缺 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 花无缺 原创 收录于专栏 【洛谷算法题】 文章目录 【洛谷算法题】P5713-洛谷团队系统【入门2分支结构】&#x1f30f;题目描述&#x1f30f;输入格…...

    2024/5/7 14:58:59
  13. 【ES6.0】- 扩展运算符(...)

    【ES6.0】- 扩展运算符... 文章目录 【ES6.0】- 扩展运算符...一、概述二、拷贝数组对象三、合并操作四、参数传递五、数组去重六、字符串转字符数组七、NodeList转数组八、解构变量九、打印日志十、总结 一、概述 **扩展运算符(...)**允许一个表达式在期望多个参数&#xff0…...

    2024/5/7 1:54:46
  14. 摩根看好的前智能硬件头部品牌双11交易数据极度异常!——是模式创新还是饮鸩止渴?

    文 | 螳螂观察 作者 | 李燃 双11狂欢已落下帷幕&#xff0c;各大品牌纷纷晒出优异的成绩单&#xff0c;摩根士丹利投资的智能硬件头部品牌凯迪仕也不例外。然而有爆料称&#xff0c;在自媒体平台发布霸榜各大榜单喜讯的凯迪仕智能锁&#xff0c;多个平台数据都表现出极度异常…...

    2024/5/7 21:15:55
  15. Go语言常用命令详解(二)

    文章目录 前言常用命令go bug示例参数说明 go doc示例参数说明 go env示例 go fix示例 go fmt示例 go generate示例 总结写在最后 前言 接着上一篇继续介绍Go语言的常用命令 常用命令 以下是一些常用的Go命令&#xff0c;这些命令可以帮助您在Go开发中进行编译、测试、运行和…...

    2024/5/8 1:37:35
  16. 用欧拉路径判断图同构推出reverse合法性:1116T4

    http://cplusoj.com/d/senior/p/SS231116D 假设我们要把 a a a 变成 b b b&#xff0c;我们在 a i a_i ai​ 和 a i 1 a_{i1} ai1​ 之间连边&#xff0c; b b b 同理&#xff0c;则 a a a 能变成 b b b 的充要条件是两图 A , B A,B A,B 同构。 必要性显然&#xff0…...

    2024/5/7 16:05:05
  17. 【NGINX--1】基础知识

    1、在 Debian/Ubuntu 上安装 NGINX 在 Debian 或 Ubuntu 机器上安装 NGINX 开源版。 更新已配置源的软件包信息&#xff0c;并安装一些有助于配置官方 NGINX 软件包仓库的软件包&#xff1a; apt-get update apt install -y curl gnupg2 ca-certificates lsb-release debian-…...

    2024/5/7 16:04:58
  18. Hive默认分割符、存储格式与数据压缩

    目录 1、Hive默认分割符2、Hive存储格式3、Hive数据压缩 1、Hive默认分割符 Hive创建表时指定的行受限&#xff08;ROW FORMAT&#xff09;配置标准HQL为&#xff1a; ... ROW FORMAT DELIMITED FIELDS TERMINATED BY \u0001 COLLECTION ITEMS TERMINATED BY , MAP KEYS TERMI…...

    2024/5/8 1:37:32
  19. 【论文阅读】MAG:一种用于航天器遥测数据中有效异常检测的新方法

    文章目录 摘要1 引言2 问题描述3 拟议框架4 所提出方法的细节A.数据预处理B.变量相关分析C.MAG模型D.异常分数 5 实验A.数据集和性能指标B.实验设置与平台C.结果和比较 6 结论 摘要 异常检测是保证航天器稳定性的关键。在航天器运行过程中&#xff0c;传感器和控制器产生大量周…...

    2024/5/7 16:05:05
  20. --max-old-space-size=8192报错

    vue项目运行时&#xff0c;如果经常运行慢&#xff0c;崩溃停止服务&#xff0c;报如下错误 FATAL ERROR: CALL_AND_RETRY_LAST Allocation failed - JavaScript heap out of memory 因为在 Node 中&#xff0c;通过JavaScript使用内存时只能使用部分内存&#xff08;64位系统&…...

    2024/5/8 1:37:31
  21. 基于深度学习的恶意软件检测

    恶意软件是指恶意软件犯罪者用来感染个人计算机或整个组织的网络的软件。 它利用目标系统漏洞&#xff0c;例如可以被劫持的合法软件&#xff08;例如浏览器或 Web 应用程序插件&#xff09;中的错误。 恶意软件渗透可能会造成灾难性的后果&#xff0c;包括数据被盗、勒索或网…...

    2024/5/8 1:37:31
  22. JS原型对象prototype

    让我简单的为大家介绍一下原型对象prototype吧&#xff01; 使用原型实现方法共享 1.构造函数通过原型分配的函数是所有对象所 共享的。 2.JavaScript 规定&#xff0c;每一个构造函数都有一个 prototype 属性&#xff0c;指向另一个对象&#xff0c;所以我们也称为原型对象…...

    2024/5/8 12:44:41
  23. C++中只能有一个实例的单例类

    C中只能有一个实例的单例类 前面讨论的 President 类很不错&#xff0c;但存在一个缺陷&#xff1a;无法禁止通过实例化多个对象来创建多名总统&#xff1a; President One, Two, Three; 由于复制构造函数是私有的&#xff0c;其中每个对象都是不可复制的&#xff0c;但您的目…...

    2024/5/8 9:51:44
  24. python django 小程序图书借阅源码

    开发工具&#xff1a; PyCharm&#xff0c;mysql5.7&#xff0c;微信开发者工具 技术说明&#xff1a; python django html 小程序 功能介绍&#xff1a; 用户端&#xff1a; 登录注册&#xff08;含授权登录&#xff09; 首页显示搜索图书&#xff0c;轮播图&#xff0…...

    2024/5/8 1:37:29
  25. 电子学会C/C++编程等级考试2022年03月(一级)真题解析

    C/C++等级考试(1~8级)全部真题・点这里 第1题:双精度浮点数的输入输出 输入一个双精度浮点数,保留8位小数,输出这个浮点数。 时间限制:1000 内存限制:65536输入 只有一行,一个双精度浮点数。输出 一行,保留8位小数的浮点数。样例输入 3.1415926535798932样例输出 3.1…...

    2024/5/7 17:09:45
  26. 配置失败还原请勿关闭计算机,电脑开机屏幕上面显示,配置失败还原更改 请勿关闭计算机 开不了机 这个问题怎么办...

    解析如下&#xff1a;1、长按电脑电源键直至关机&#xff0c;然后再按一次电源健重启电脑&#xff0c;按F8健进入安全模式2、安全模式下进入Windows系统桌面后&#xff0c;按住“winR”打开运行窗口&#xff0c;输入“services.msc”打开服务设置3、在服务界面&#xff0c;选中…...

    2022/11/19 21:17:18
  27. 错误使用 reshape要执行 RESHAPE,请勿更改元素数目。

    %读入6幅图像&#xff08;每一幅图像的大小是564*564&#xff09; f1 imread(WashingtonDC_Band1_564.tif); subplot(3,2,1),imshow(f1); f2 imread(WashingtonDC_Band2_564.tif); subplot(3,2,2),imshow(f2); f3 imread(WashingtonDC_Band3_564.tif); subplot(3,2,3),imsho…...

    2022/11/19 21:17:16
  28. 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机...

    win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”问题的解决方法在win7系统关机时如果有升级系统的或者其他需要会直接进入一个 等待界面&#xff0c;在等待界面中我们需要等待操作结束才能关机&#xff0c;虽然这比较麻烦&#xff0c;但是对系统进行配置和升级…...

    2022/11/19 21:17:15
  29. 台式电脑显示配置100%请勿关闭计算机,“准备配置windows 请勿关闭计算机”的解决方法...

    有不少用户在重装Win7系统或更新系统后会遇到“准备配置windows&#xff0c;请勿关闭计算机”的提示&#xff0c;要过很久才能进入系统&#xff0c;有的用户甚至几个小时也无法进入&#xff0c;下面就教大家这个问题的解决方法。第一种方法&#xff1a;我们首先在左下角的“开始…...

    2022/11/19 21:17:14
  30. win7 正在配置 请勿关闭计算机,怎么办Win7开机显示正在配置Windows Update请勿关机...

    置信有很多用户都跟小编一样遇到过这样的问题&#xff0c;电脑时发现开机屏幕显现“正在配置Windows Update&#xff0c;请勿关机”(如下图所示)&#xff0c;而且还需求等大约5分钟才干进入系统。这是怎样回事呢&#xff1f;一切都是正常操作的&#xff0c;为什么开时机呈现“正…...

    2022/11/19 21:17:13
  31. 准备配置windows 请勿关闭计算机 蓝屏,Win7开机总是出现提示“配置Windows请勿关机”...

    Win7系统开机启动时总是出现“配置Windows请勿关机”的提示&#xff0c;没过几秒后电脑自动重启&#xff0c;每次开机都这样无法进入系统&#xff0c;此时碰到这种现象的用户就可以使用以下5种方法解决问题。方法一&#xff1a;开机按下F8&#xff0c;在出现的Windows高级启动选…...

    2022/11/19 21:17:12
  32. 准备windows请勿关闭计算机要多久,windows10系统提示正在准备windows请勿关闭计算机怎么办...

    有不少windows10系统用户反映说碰到这样一个情况&#xff0c;就是电脑提示正在准备windows请勿关闭计算机&#xff0c;碰到这样的问题该怎么解决呢&#xff0c;现在小编就给大家分享一下windows10系统提示正在准备windows请勿关闭计算机的具体第一种方法&#xff1a;1、2、依次…...

    2022/11/19 21:17:11
  33. 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”的解决方法...

    今天和大家分享一下win7系统重装了Win7旗舰版系统后&#xff0c;每次关机的时候桌面上都会显示一个“配置Windows Update的界面&#xff0c;提示请勿关闭计算机”&#xff0c;每次停留好几分钟才能正常关机&#xff0c;导致什么情况引起的呢&#xff1f;出现配置Windows Update…...

    2022/11/19 21:17:10
  34. 电脑桌面一直是清理请关闭计算机,windows7一直卡在清理 请勿关闭计算机-win7清理请勿关机,win7配置更新35%不动...

    只能是等着&#xff0c;别无他法。说是卡着如果你看硬盘灯应该在读写。如果从 Win 10 无法正常回滚&#xff0c;只能是考虑备份数据后重装系统了。解决来方案一&#xff1a;管理员运行cmd&#xff1a;net stop WuAuServcd %windir%ren SoftwareDistribution SDoldnet start WuA…...

    2022/11/19 21:17:09
  35. 计算机配置更新不起,电脑提示“配置Windows Update请勿关闭计算机”怎么办?

    原标题&#xff1a;电脑提示“配置Windows Update请勿关闭计算机”怎么办&#xff1f;win7系统中在开机与关闭的时候总是显示“配置windows update请勿关闭计算机”相信有不少朋友都曾遇到过一次两次还能忍但经常遇到就叫人感到心烦了遇到这种问题怎么办呢&#xff1f;一般的方…...

    2022/11/19 21:17:08
  36. 计算机正在配置无法关机,关机提示 windows7 正在配置windows 请勿关闭计算机 ,然后等了一晚上也没有关掉。现在电脑无法正常关机...

    关机提示 windows7 正在配置windows 请勿关闭计算机 &#xff0c;然后等了一晚上也没有关掉。现在电脑无法正常关机以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容&#xff0c;让我们赶快一起来看一下吧&#xff01;关机提示 windows7 正在配…...

    2022/11/19 21:17:05
  37. 钉钉提示请勿通过开发者调试模式_钉钉请勿通过开发者调试模式是真的吗好不好用...

    钉钉请勿通过开发者调试模式是真的吗好不好用 更新时间:2020-04-20 22:24:19 浏览次数:729次 区域: 南阳 > 卧龙 列举网提醒您:为保障您的权益,请不要提前支付任何费用! 虚拟位置外设器!!轨迹模拟&虚拟位置外设神器 专业用于:钉钉,外勤365,红圈通,企业微信和…...

    2022/11/19 21:17:05
  38. 配置失败还原请勿关闭计算机怎么办,win7系统出现“配置windows update失败 还原更改 请勿关闭计算机”,长时间没反应,无法进入系统的解决方案...

    前几天班里有位学生电脑(windows 7系统)出问题了&#xff0c;具体表现是开机时一直停留在“配置windows update失败 还原更改 请勿关闭计算机”这个界面&#xff0c;长时间没反应&#xff0c;无法进入系统。这个问题原来帮其他同学也解决过&#xff0c;网上搜了不少资料&#x…...

    2022/11/19 21:17:04
  39. 一个电脑无法关闭计算机你应该怎么办,电脑显示“清理请勿关闭计算机”怎么办?...

    本文为你提供了3个有效解决电脑显示“清理请勿关闭计算机”问题的方法&#xff0c;并在最后教给你1种保护系统安全的好方法&#xff0c;一起来看看&#xff01;电脑出现“清理请勿关闭计算机”在Windows 7(SP1)和Windows Server 2008 R2 SP1中&#xff0c;添加了1个新功能在“磁…...

    2022/11/19 21:17:03
  40. 请勿关闭计算机还原更改要多久,电脑显示:配置windows更新失败,正在还原更改,请勿关闭计算机怎么办...

    许多用户在长期不使用电脑的时候&#xff0c;开启电脑发现电脑显示&#xff1a;配置windows更新失败&#xff0c;正在还原更改&#xff0c;请勿关闭计算机。。.这要怎么办呢&#xff1f;下面小编就带着大家一起看看吧&#xff01;如果能够正常进入系统&#xff0c;建议您暂时移…...

    2022/11/19 21:17:02
  41. 还原更改请勿关闭计算机 要多久,配置windows update失败 还原更改 请勿关闭计算机,电脑开机后一直显示以...

    配置windows update失败 还原更改 请勿关闭计算机&#xff0c;电脑开机后一直显示以以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容&#xff0c;让我们赶快一起来看一下吧&#xff01;配置windows update失败 还原更改 请勿关闭计算机&#x…...

    2022/11/19 21:17:01
  42. 电脑配置中请勿关闭计算机怎么办,准备配置windows请勿关闭计算机一直显示怎么办【图解】...

    不知道大家有没有遇到过这样的一个问题&#xff0c;就是我们的win7系统在关机的时候&#xff0c;总是喜欢显示“准备配置windows&#xff0c;请勿关机”这样的一个页面&#xff0c;没有什么大碍&#xff0c;但是如果一直等着的话就要两个小时甚至更久都关不了机&#xff0c;非常…...

    2022/11/19 21:17:00
  43. 正在准备配置请勿关闭计算机,正在准备配置windows请勿关闭计算机时间长了解决教程...

    当电脑出现正在准备配置windows请勿关闭计算机时&#xff0c;一般是您正对windows进行升级&#xff0c;但是这个要是长时间没有反应&#xff0c;我们不能再傻等下去了。可能是电脑出了别的问题了&#xff0c;来看看教程的说法。正在准备配置windows请勿关闭计算机时间长了方法一…...

    2022/11/19 21:16:59
  44. 配置失败还原请勿关闭计算机,配置Windows Update失败,还原更改请勿关闭计算机...

    我们使用电脑的过程中有时会遇到这种情况&#xff0c;当我们打开电脑之后&#xff0c;发现一直停留在一个界面&#xff1a;“配置Windows Update失败&#xff0c;还原更改请勿关闭计算机”&#xff0c;等了许久还是无法进入系统。如果我们遇到此类问题应该如何解决呢&#xff0…...

    2022/11/19 21:16:58
  45. 如何在iPhone上关闭“请勿打扰”

    Apple’s “Do Not Disturb While Driving” is a potentially lifesaving iPhone feature, but it doesn’t always turn on automatically at the appropriate time. For example, you might be a passenger in a moving car, but your iPhone may think you’re the one dri…...

    2022/11/19 21:16:57