rabbitmq简单入门
这里写自定义目录标题
- 一、rabbitMq的概念
- 二、为什么使用rabbitMq
- 三、使用rabbitMq的缺点
- 四、MQ的选择
- 五、安装rabbitmq
- 六、RabbitMQ工作原理图
- 七、代码实现-简单工作
- 八、代码实现-多个消费者
- 八、代码实现-自动/手动应答
- 九、代码实现-消息发布确认的三种方式
- 十、代码实现-交换机
- 十一、代码实现-优先级队列
- 十二、代码实现-死信队列
- 十三、代码实现-延迟队列
- 十四、代码实现-RabbitMQ插件实现延迟队列
- 十五、代码实现-发布确认高级
- 十六、RabbitMq的界面Features属性介绍
一、rabbitMq的概念
rabbitMq
是一个消息中间件,他接收并且供第三方使用。就像快递一样,商家为生产者,快递站为MQ,而用户为消费者。
二、为什么使用rabbitMq
- 解耦
比如公司的系统A,需要将数据推送到其他不同的系统中去。这样子系统间的耦合度变高,系统A需要考虑一些列乱七八糟的因素,如其他系统挂了,需不需要重新推送、、等待问题。这时候如果使用rabbitMq
,则系统A只需要考虑将数据推送到rabbitMq
,其他系统需要消费的,自己去系统A获取即可,可以有效的做到系统A与其他系统的解耦。 - 削峰
比如公司的业务存在高峰期,如上午数据写入存在w/s
的数据量,而数据库的承受量加入只有k/s
,系统就有可能导致崩溃卡死。但是如果将消息写入rabbitmq
,由rabbitmq
控制消息读取速度小于系统承受量,这样即使在高峰时期也不会挂掉。 - 异步
比如系统A存在需求,如果接受某一份数据,则需要写入A系统和其他系统,时间是累加的。但是如果将数据发送到其他rabbitmq
,由各个系统自己实现写入,则写入的时间将会大幅度缩减。
三、使用rabbitMq的缺点
- 系统可用性
由于加入了rabbitmq
,导致系统的组件增加,部署与运维难度增加。而且如果rabbitmq
挂掉, 也会影响系统的使用。
2.数据一致性
因为加入了rabbitmq
,需要考虑的因为也会随之增加。比如消息丢失、消息重复消费、消息发送成功却是否被消费者成功消费等等、、、
四、MQ的选择
名称 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
activeMQ | 单机吞吐量万级,时效性ms级,可用性高,较低概率出现丢失数据。 | 官方社区现在对于 ActiveMQ 5.x的版本维护越来越少,高吞吐量场景较少使用。 | 早期使用的 ActiveMQ,随着其他MQ的出现使用量渐渐减少,而且没经过大规模吞吐量场景的验证,社区也不是很活跃,所以不推荐。 |
Kafka | 单机吞吐量 百万级,时效性 ms级,可用性非常高,消息可靠性可配置 0 丢失。而且是分布式的,一个数据有多个副本,少数机器宕机也不会丢失数据。 | 单机超过64个队列/分区,CPU会明显变高,队列越多越高,发送消息响应时间变长。消费失败不支持重试。 | 主要用于日志的采集与传输,一般大公司使用 |
RocketMQ | 单机吞吐量十万级,可用性高,支持分布式,消息可靠性可以做到0丢失,扩展性好,支持十亿级别消息堆积。 | 支持的客户端不多,目前只支持java和c++。 | 主要用于金融领域,被阿里广泛的用于订单、交易、重置、消息推送等场景。 |
RabbitMQ | 用erlang语言编写,性能较好,单机吞吐量万级,时效性μs级,可用性高,消息可靠性基本不丢失,而且支持大量的其他语言,社区活跃度高,更新频率高。 | 商业版需要收费,学习成本较高。 | 界面管理方便,功能完备。如果数据量较小可以推荐使用,适用于中小型公司。 |
五、安装rabbitmq
ps:由于需要的包下载较慢,可以直接从这里下载(网络较好的可以忽略):https://download.csdn.net/download/weixin_40496191/80386784
- 由于
rabbitmq
需要erlang
的支持,所以下载包之前需要确定erlang
和rabbitmq
的对应关系:https://www.rabbitmq.com/which-erlang.html - 安装ErLang
1)下载:wget https://github.com/rabbitmq/erlang-rpm/releases/tag/v21.3.1/erlang-21.3.1-1.el7.x86_64.rpm
2)安装:rpm -ivh erlang-21.3.1-1.el7.x86_64.rpm
ps1: 如果安装错误:rabbitmq-server-3.8.8-1.el7.noarch.rpm: 不是 rpm 软件包 (或者没有manifest)
。这是因为github网络访问较慢,有可能下载不全,所以如果wget
实在有问题,可以选择直接登录网站包下载下来,再传到linux
即可!
ps2:下载的版本el7
根据linux
来,可通过uname -a
查看: - 安装 RabbitMQ
1)下载依赖:yum install -y socat
2)下载包:wget https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.8.8/rabbitmq-server-3.8.8-1.el7.noarch.rpm
3)安装:rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
ps:如果安装报错,下载不全,解决办法如上! - 启动RabbitMQ:
systemctl start rabbitmq-server
- 设置RabbitMQ开机自启动:
systemctl enable rabbitmq-server
ps1:相关命令-关闭RabbitMQ:rabbitmqctl stop
ps2:相关命令-重启RabbitMQ:systemctl restart rabbitmq-server
ps3:相关命令-插件列表:rabbitmq-plugins list
ps4:相关命令-启动插件:rabbitmq-plugins enable XXX
(XXX为插件名)
ps5:相关命令-停用插件:rabbitmq-plugins disable XXX
- 查看启动后的情况 :
rabbitmqctl status
、systemctl status rabbitmq-server
- 新建一个用户
1)新建用户:rabbitmqctl add_user admin admin
2)授予管理员权限:rabbitmqctl set_user_tags admin administrator
3)设置admin可以使用的虚机权限:rabbitmqctl add_vhost admin
–>rabbitmqctl set_permissions -p admin admin ".*" ".*" ".*"
- 查看用户:
rabbitmqctl list_users
- 重启RabbitMQ:
systemctl restart rabbitmq-server
- 查看页面:
http://192.168.248.10:15672
输入账号密码:admin
/admin
进入该界面说明安装成功(这里因为做了一些代码操作,所以界面不是原始的,新安装的跟我的有些不同)
六、RabbitMQ工作原理图
producer
(生产者),即发消息的系统。connection
(连接通道):与RabbitMQ
服务器连接的TCP
通道exchange
(交换机)和queue
(队列):消息先发给交换机,再由交换机根据规则转发给队列。一个交换机可以对应多个队列。如果工作时,不省明交换机,则会使用默认交换机。consumer
(消费者):即最后消费队列消息的系统。
七、代码实现-简单工作
工作流程
由于我们后续测试都会连接rabbitmq
服务器,所以这里创建一个公共连接类。
- 创建公共连接类
package com.rabbitmqUtils;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* 创建信道* @create 2022-02-08 16:26* @desc**/
public class RabbitmqUtils {//队列名称public static final String RABBITMQ_QUEUE = "RABBITMQ_QUEUE";//交换机名称public static final String RABBITMQ_EXCHANGE = "RABBITMQ_EXCHANGE";public static Channel getChannel() throws IOException, TimeoutException {//创建一个连接工厂ConnectionFactory factory = new ConnectionFactory();//工厂ipfactory.setHost("192.168.248.10");//用户名factory.setUsername("admin");//密码factory.setPassword("admin");//创建链接Connection connection = factory.newConnection();//获取信道Channel channel = connection.createChannel();return channel;}
}
- 创建生产者
package com.rabbitmq1;import com.rabbitmq.client.Channel;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Product {//发消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//生成队列,不创建交换机,走默认的交换机//1.名称//2.队列消息是否持久化(否:存内存,是:存磁盘。默认否)//3.队列是否只供一个消费者消费,默认否//4.最后一个消费者断开连接后,是否自动删除。//5.其他参数channel.queueDeclare(RabbitmqUtils.RABBITMQ_QUEUE, true, false, false, null);//发消息String message = "this is QUEUE_P1";//持续发送消息for (int i = 0; i < 10; i++) {Thread.sleep(1000);//1.交换机,简单版本不考虑,直接空字符串,即默认交换机//2.路由key,直接写队列名即可//3.参数,忽略//4.消息体channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, null, (message+i).getBytes());}System.out.println("消息发送成功");}
}
- 创建消费者
package com.rabbitmq1;import com.rabbitmq.client.*;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 15:34* @desc**/
public class Consume {//发消息public static void main(String[] args) throws IOException, TimeoutException {//获取信道Channel channel = RabbitmqUtils.getChannel();//消费者未成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答//3.消费者成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(RabbitmqUtils.RABBITMQ_QUEUE, true, deliverCallback, cancelCallback);}
}
启动生产者类,查看rabbitmq
可视化界面,可以发现已经写入了10条
启动消费者,效果如下,可以看到队列消息已经被消费,则成功!
ps:如果报错ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected connection driver error occured
,则需要admin
授权
八、代码实现-多个消费者
工作原理图
生产者
package com.rabbitmq2;import com.rabbitmq.client.Channel;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Product {//发消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//0是轮询、1是不公平分发,大于1则是预取值,默认为0。预取后,再进行不公平分发。channel.basicQos(0);//生成队列,//1.名称//2.队列消息是否持久化(否:存内存,是:存磁盘。默认否)//3.队列是否只供一个消费者消费,默认否//4.最后一个消费者断开连接后,是否自动删除。//5.其他参数channel.queueDeclare(RabbitmqUtils.RABBITMQ_QUEUE, true, false, false, null);//持续发送消息for (int i = 0; i < 10; i++) {String message="this is Product"+i;//1.交换机,简单版本不考虑,直接空字符串即可(默认/无名交换机)//2.路由key,直接写队列名即可//3.参数,忽略//4.消息体channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, null, message.getBytes());}System.out.println("消息发送成功");}
}
消费者1
package com.rabbitmq2;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 这是一个工作线程** @author 天真热* @create 2022-02-08 16:36* @desc**/
public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException {System.out.println("这是工作线程1....");Channel channel = RabbitmqUtils.getChannel();//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(RabbitmqUtils.RABBITMQ_QUEUE, true, deliverCallback, cancelCallback);}
}
消费者2
package com.rabbitmq2;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 这是一个工作线程** @author 天真热* @create 2022-02-08 16:36* @desc**/
public class Consume02 {//接收消息public static void main(String[] args) throws IOException, TimeoutException {System.out.println("这是工作线程2....");Channel channel = RabbitmqUtils.getChannel();//消费者未成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答//3.消费者成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(RabbitmqUtils.RABBITMQ_QUEUE, true, deliverCallback, cancelCallback);}
}
启动消费者1,再启动消费者2.然后启动生产者,可以看到,消息是轮询发送给两个消费者。
八、代码实现-自动/手动应答
- 概念
1)自动应答:消息从队列发送给消费者时,就已经默认消费成功。
优点:效率高。
缺点:一方面如果消费者在消费消息时候如果断开了,则消费者没有成功处理消息,而队列默认消费成功,就会造成数据丢失。另一方面,如果消费者系统性能交叉,没法及时处理消息,就会造成消息积压,内存耗尽而崩溃。
2)手动应答:消息从队列发送给消费者时,消费者需要手动确认消息,队列才会认为消费成功。
优点:数据传输较为安全,而且可操作性较高。
缺点:效率低 - 手动应答环境下的可操作性
1)根据不同消费者应答消息的效率,队列可以动态分配消息给消费者
2)对于一些特殊的队列信息,可以选择拒收,重新放回队列 - 代码实现
生产者
package com.rabbitmq3;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Product {//发消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//生成队列,//1.名称//2.队列消息是否持久化(否:存内存,是:存磁盘。默认否)//3.队列是否只供一个消费者消费,默认否//4.最后一个消费者断开连接后,是否自动删除。//5.其他参数channel.queueDeclare(RabbitmqUtils.RABBITMQ_QUEUE, true, false, false, null);//持续发送消息for (int i = 0; i < 10; i++) {//发消息String message = "this is Product"+i;//1.交换机,简单版本不考虑,直接空字符串即可(默认/无名交换机)//2.路由key,直接写队列名即可//3.参数,(消息持久化,需要队列开启持久化才有效)//4.消息体channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");}
}
消费者1(接受,每秒接受1条,初始取5条)
package com.rabbitmq3;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 这是一个工作线程** @author 天真热* @create 2022-02-08 16:36* @desc**/
public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException {System.out.println("这是工作线程1....");Channel channel = RabbitmqUtils.getChannel();//0是轮询,默认值//1是不公平分发,即哪个消费者效率高,哪边分配的多//大于1则是预取值,即消费者一定会消费的消息数量。预取后,再进行不公平分发。channel.basicQos(5);//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {//睡眠try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));//手动应答//1.消息确认标记,2.是否批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(RabbitmqUtils.RABBITMQ_QUEUE, false, deliverCallback, cancelCallback);}
}
消费者2(拒收,每10s拒收一条,初始取2条)
package com.rabbitmq3;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 这是一个工作线程** @author 天真热* @create 2022-02-08 16:36* @desc**/
public class Consume02 {//接收消息public static void main(String[] args) throws IOException, TimeoutException {System.out.println("这是工作线程2....");Channel channel = RabbitmqUtils.getChannel();//0是轮询、1是不公平分发,大于1则是预取值,默认为0。预取后,再进行不公平分发。channel.basicQos(2);//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {//睡眠try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));//1.消息否定确认标记,2.消息是否重新被放回队列channel.basicReject(message.getEnvelope().getDeliveryTag(), true);};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(RabbitmqUtils.RABBITMQ_QUEUE, false, deliverCallback, cancelCallback);}
}
可以去rabbitmq界面查看
结果:
九、代码实现-消息发布确认的三种方式
消息发布确认就是指生产者将消息发送到Broker 后,如果 Broker 收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到 Broker。对于消息发布确认一共有三种方式。
- 单个确认为每发送一次消息就进行一次确认,优点是准确无误,缺点是资源占用较大,速度较慢。1000条数据测试时间为1311ms
/*** 单个确认*/public static void publishDg() throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//发送消息for (int i = 0; i < 1000; i++) {channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, ("message" + i).getBytes());//发布确认boolean flag = channel.waitForConfirms();if (flag) {System.out.println("发送成功");}}//结束时间long end = System.currentTimeMillis();System.out.println("单独确认花费时间=" + (end - begin));}
- 批量确认为每发送一批消息再进行一次确认,优点是比单个确认更快,但是无法精确定位到发送失败消息。1000条数据测试时间为170ms
public static void publishPl() throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//批量确认大小int batchSize = 100;//批量发送消息、确认for (int i = 0; i < 1000; i++) {channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, ("message" + i).getBytes());if (i % batchSize == 0) {channel.waitForConfirms();}}//结束时间long end = System.currentTimeMillis();System.out.println("单独确认花费时间=" + (end - begin));}
- 异步确认为在发送前创建一个支持高并发的Map,key存储消息tag,values存储message,并调用监听器进行监听消息发送。一般在每次发送后用Map记录下发送消息,监听器根据结果回调相关函数,若发送成功,回调成功函数,在Map中删去该消息。发送失败,回调失败函数,在失败函数中通过Map显示该消息。异步确认在所有确认中综合性能最佳。1000条数据测试时间为43ms
public static void publishYb() throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//线程安全有序的一个哈希表,适合高并发的情况//1.将序号和消息进行关联//2.轻松批量删除条目 只要给到序号//3.支持高并发ConcurrentSkipListMap<Long, String> concurrentSkipListMap = new ConcurrentSkipListMap<>();//--------------------监听器--------------------------//消息确认成功回调函数//1.消息的标记,2.是否为批量操作ConfirmCallback ackCallback = (deliveryTag, multiple) -> {//消息接受处理if (multiple) {//批量ConcurrentNavigableMap<Long, String> confirmd = concurrentSkipListMap.headMap(deliveryTag);confirmd.clear();} else {//非批量concurrentSkipListMap.remove(deliveryTag);}System.out.println("确认的消息:" + deliveryTag);};//消息确认失败回调函数//1.消息的标记,2.是否为批量操作ConfirmCallback nackCallback = (deliveryTag, multiple) -> {//消息未接受处理String message = concurrentSkipListMap.get(deliveryTag);System.out.println("未确认的消息:" + deliveryTag + ":" + message);};//消息监听器,监听失败和成功的消息channel.addConfirmListener(ackCallback, nackCallback);//批量发送消息、确认for (int i = 0; i < 1000; i++) {String message = ("message" + i);channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//记录发送的消息concurrentSkipListMap.put(channel.getNextPublishSeqNo(), message);}//结束时间long end = System.currentTimeMillis();System.out.println("单独确认花费时间=" + (end - begin));System.out.println(concurrentSkipListMap.size());}
十、代码实现-交换机
前面有说到,实际上生产者发送消息,消息是直接发给交换机,然后再由交换机根据相关规则分配给队列。而根据分配规则,常见的基类交换机分别有:直接交换机(direct),主题交换机(topic),标题交换机(topic),首部交换机(Headers)等。
ps:因为我这里用的是同一个交换机,如果同一个交换机,并且配置修改了,则需要删除原先的交换机,否则会报错。
- 交换机模式一fanout
这种模式跟广播一样,即发送到交换机的所有消息,都会发到交换机的所有队列中,代码如下:
生产者
package com.rabbitmq5;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Product {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "fanout");for (int i = 0; i < 10; i++) {String message = "发送的消息" + i;//1.交换机,简单版本不考虑,直接空字符串即可(默认/无名交换机)//2.路由key,直接写队列名即可//3.参数,(消息持久化)//4.消息体channel.basicPublish(RabbitmqUtils.RABBITMQ_EXCHANGE, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());}}
}
消费者1
package com.rabbitmq5;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "fanout");//声明一个临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列的channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(queueName, true, deliverCallback, cancelCallback);}
}
消费者2:
package com.rabbitmq5;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Consume02 {//发消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "fanout");//声明一个临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列的channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(queueName, true, deliverCallback, cancelCallback);}
}
结果
2. 交换机模式之direct
这种方式相对于fanout来说增加了一定的限制,即消息只能够发送到交换机的固定rountKey的队列中去。代码如下:
生产者:
package com.rabbitmq6;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Product {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机
// channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "direct");for (int i = 0; i < 10; i++) {String message = "发送的消息" + i;//1.交换机,简单版本不考虑,直接空字符串即可(默认/无名交换机)//2.路由key,直接写队列名即可//3.参数,(消息持久化)//4.消息体channel.basicPublish(RabbitmqUtils.RABBITMQ_EXCHANGE, "error", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());}}
}
消费者1:
package com.rabbitmq6;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "direct");//声明一个临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列的channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "info");channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "warning");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(queueName, true, deliverCallback, cancelCallback);System.out.println("队列info/warning等待...");}
}
消费者2
package com.rabbitmq6;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "direct");//声明一个临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列的channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "info");channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "warning");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(queueName, true, deliverCallback, cancelCallback);System.out.println("队列error等待...");}
}
3. 交换机模式之topic
前面说的direct模式实际上就是rountkey绝对匹配。而topic是rountkey的模糊匹配。
*(星号)代表一个单词
#(井号)可以替代零个或多个单词
代码如下:
生产者
package com.rabbitmq7;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Product {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "topic");for (int i = 0; i < 10; i++) {String message = "发送的消息" + i;//1.交换机,简单版本不考虑,直接空字符串即可(默认/无名交换机)//2.路由key,直接写队列名即可//3.参数,(消息持久化)//4.消息体channel.basicPublish(RabbitmqUtils.RABBITMQ_EXCHANGE, "queue.queue.queue11", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());}}
}
消费者1
package com.rabbitmq7;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "topic");//声明一个临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列的channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "*.queue.*");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(queueName, true, deliverCallback, cancelCallback);System.out.println("消费者|*.queue.*|等待中....");}
}
消费者2
package com.rabbitmq7;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Consume02 {//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机channel.exchangeDeclare(RabbitmqUtils.RABBITMQ_EXCHANGE, "topic");//声明一个临时队列String queueName = channel.queueDeclare().getQueue();//绑定交换机和队列的channel.queueBind(queueName, RabbitmqUtils.RABBITMQ_EXCHANGE, "queue.*.*");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功,否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(queueName, true, deliverCallback, cancelCallback);System.out.println("消费者|queue.*.*|等待中....");}
}
效果
十一、代码实现-优先级队列
队列的消费顺序一般是先进先出。但是在某些订单中业务中,我们需要给vip用户后下单,先出货的特殊权限,这时候就需要用到优先级队列。
原理,在原来先进先出的逻辑上,给队列备注优先级,最后的顺序如下:
优先级高–>优先级低–>没有备注优先级
ps:优先级的范围为0-255
生产者
package com.rabbitmq12;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Product {//队列名称public static final String ORDER_QUEUE = "ORDER_QUEUE";//发消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//参数Map<String, Object> argument = new HashMap<>();argument.put("x-max-priority", 10);//设置优先级范围0-10,官方允许值是0-255。设置过大会浪费内存//生成队列,//不创建交换机,走默认的交换机//1.名称//2.队列消息是否持久化(否:存内存,是:存磁盘。默认否)//3.队列是否只供一个消费者消费,默认否//4.最后一个消费者断开连接后,是否自动删除。//5.其他参数channel.queueDeclare(ORDER_QUEUE, true, false, false, argument);//发消息String message = "this is QUEUE_P";//持续发送消息for (int i = 0; i < 10; i++) {//1.交换机,简单版本不考虑,直接空字符串即可(默认/无名交换机)//2.路由key,直接写队列名即可//3.参数,忽略//4.消息体if (i == 5) {AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();channel.basicPublish("", ORDER_QUEUE, properties, (message + i).getBytes());} else {channel.basicPublish("", ORDER_QUEUE, null, (message + i).getBytes());}}System.out.println("消息发送成功");}
}
消费者
package com.rabbitmq12;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 15:34* @desc**/
public class Consume {//队列名称public static final String ORDER_QUEUE = "ORDER_QUEUE";//发消息public static void main(String[] args) throws IOException, TimeoutException {//获取信道Channel channel = RabbitmqUtils.getChannel();//消费者未成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答//3.消费者成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(ORDER_QUEUE, true, deliverCallback, cancelCallback);}
}
测试,启动生产者,再启动消费者
十二、代码实现-死信队列
死信指的是无法被消费的消息。这些消息因为一些如网络超时等原因,导致无法被消费,就成了死信消息。所以为了保证这些数据不丢失,就有了死信队列,专门对死信消息进行处理。
工作图:
代码如下:
生成者:
package com.rabbitmq8;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Product {//正常交换机public static String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明一个交换机(不需要重复声明)//channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");//设置ttl时间为10s,过期则进入死信队列AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();//发送死信消息for (int i = 0; i < 10; i++) {String message = "发送的消息" + i;//1.交换机,简单版本不考虑,直接空字符串即可(默认/无名交换机)//2.路由key,直接写队列名即可//3.参数,(消息持久化)//4.消息体channel.basicPublish(NORMAL_EXCHANGE, "normalQueue", properties, message.getBytes());}}
}
消费者(正常队列)
package com.rabbitmq8;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Consume01 {//正常交换机public static String NORMAL_EXCHANGE = "NORMAL_EXCHANGE";//死信交换机public static String DEAD_EXCHANGE = "DEAD_EXCHANGE";//正常队列public static String NORMAL_QUEUE = "NORMAL_QUEUE";//死信队列public static String DEAD_QUEUE = "DEAD_QUEUE";//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//声明普通和死信交换机channel.exchangeDeclare(NORMAL_EXCHANGE, "direct");channel.exchangeDeclare(DEAD_EXCHANGE, "direct");//声明死信队列channel.queueDeclare(DEAD_QUEUE, false, false, false, null);//设置参数Map<String, Object> arguments = new HashMap<>();//设置死信交换机arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key", "deadQueue");//设置正常队列长度arguments.put("x-max-length", 6);//设置过期时间,10s(一般不在这里设置,而是在生产者端配置,这样子过期时间可以由生产者随意改动)//arguments.put("x-message-ttl", "10000");//声明普通队列channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);//绑定普通交换机和队列的channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normalQueue");//绑定死信交换机和队列的channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "deadQueue");//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("拒绝");//拒绝,并且不放回队列channel.basicReject(message.getEnvelope().getDeliveryTag(), false);};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, cancelCallback);System.out.println("正常队列准备消费消息......");}
}
消费者2(死信队列)
package com.rabbitmq8;import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/
public class Consume02 {//死信队列public static String DEAD_QUEUE = "DEAD_QUEUE";//接收消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息channel.basicConsume(DEAD_QUEUE, true, deliverCallback, cancelCallback);System.out.println("死信队列准备消费消息......");}
}
测试1:首先先运行死信队列,然后运行生产者。由于生产的消息没有被消费掉,消息超时自动进入死信队列
测试二:运行死信队列和正常队列,然后运行生产者。由于生产的消息被拒绝,消息超时自动进入死信队列
十三、代码实现-延迟队列
延迟队列:延迟队列里面的元素,是到达指定时候后,就对这些元素进行处理。如果订单功能,如果指定时间内不进行支付,则会取消订单。
原理图:
因为我们后续rabbitmq肯定是要在springboot框架运行,所以这里需要整合springboot。
- 在yml配置文件配置
spring:rabbitmq:host: 192.168.248.10port: 5672username: adminpassword: admin#交换机确认接口publisher-confirms: true
- 添加配置类
package com.rabbitmq9;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @author 天真热* @create 2022-02-10 10:09* @desc**/
@Configuration
public class Config {//普通交换机public static final String X_EXCHANGE = "X";//死信交换机public static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String QUEUE_C = "QC";//死信队列名称public static final String DEAD_LETTER_QUEUE = "QD";//声明xExchange 别名@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}//声明yExchange 别名@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明普通队列A ttl 为10s@Bean("queueA")public Queue queuA() {Map<String, Object> argument = new HashMap<>();//设置死信交换机argument.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyargument.put("x-dead-letter-routing-key", "YD");//设置ttl,10sargument.put("x-message-ttl", 10000);//创建队列return QueueBuilder.durable(QUEUE_A).withArguments(argument).build();}//声明普通队列B ttl 为10s@Bean("queueB")public Queue queuB() {Map<String, Object> argument = new HashMap<>();//设置死信交换机argument.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyargument.put("x-dead-letter-routing-key", "YD");//设置ttl,40sargument.put("x-message-ttl", 40000);//创建队列return QueueBuilder.durable(QUEUE_B).withArguments(argument).build();}//声明普通队列C ttl 为生产者确定@Bean("queueC")public Queue queuC() {Map<String, Object> argument = new HashMap<>();//设置死信交换机argument.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyargument.put("x-dead-letter-routing-key", "YD");//创建队列return QueueBuilder.durable(QUEUE_C).withArguments(argument).build();}//死信队列@Bean("queueD")public Queue queuD() {//创建队列return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}//绑定队列A@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定队列B@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定队列C@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with("XC");}//绑定队列D@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) {return BindingBuilder.bind(queueD).to(yExchange).with("YD");}}
- 添加生产消息类
package com.rabbitmq9;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 天真热* @create 2022-02-10 15:03* @desc**/
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")public void sendMessage(@PathVariable String message) {log.info("发送消息");rabbitTemplate.convertAndSend("X", "XA", "消息来自10s的ttl:" + message);rabbitTemplate.convertAndSend("X", "XB", "消息来自40s的ttl:" + message);}@GetMapping("/sendExpireMsg/{message}/{ttlTime}")public void sendExpireMsg(@PathVariable String message, @PathVariable String ttlTime) {log.info("发送定时消息");rabbitTemplate.convertAndSend("X", "XC", "消息来自定时消息:" + message, msg -> {//发消息的时候,延迟延长msg.getMessageProperties().setExpiration(ttlTime);return msg;});}
}
- 添加消费者类
package com.rabbitmq9;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 队列ttl消费者** @author 天真热* @create 2022-02-10 16:00* @desc**/
@Slf4j
@Component
public class Consume {//接收消息@RabbitListener(queues = Config.DEAD_LETTER_QUEUE)public void receiveD(Message msg, Channel channel) {String message = new String(msg.getBody());log.info("接收到了延迟队列消息:" + message);}
}
测试1:http://localhost:8090/ttl/sendMsg/hahaha
测试2:http://localhost:8090/ttl/sendExpireMsg/5555/100
测试3:连续发送两个地址。可以看到延迟消息需要排队,没法优先发送延时时间短的,这是个弊端,可以使用插件克服这个问题。
http://localhost:8090/ttl/sendExpireMsg/5555/10000
http://localhost:8090/ttl/sendExpireMsg/558885/1000
十四、代码实现-RabbitMQ插件实现延迟队列
安装教程
- 下载:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0
- 将插件放入:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins下
- 进入目录:cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
- 安装:rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 重启:systemctl restart rabbitmq-server
工作原理:
代码如下
配置类
package com.rabbitmq10;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.CustomAutowireConfigurer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @author 天真热* @create 2022-02-10 10:09* @desc**/
@Configuration
public class DelayConfig {//队列public static final String DELAY_QUEUE = "DELAY_QUEUE";//交换机public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";//routingKeypublic static final String DELAY_ROUNTING_KEY = "DELAY_ROUNTING_KEY";//声明交换机@Beanpublic CustomExchange delayEchange() {Map<String, Object> arguments = new HashMap<>();arguments.put("x-delayed-type", "direct");//1.交换机名称//2.交换机类型//3.是否需要持久化//4.是否需要自动删除//5.其他参数return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, arguments);}//声明队列@Beanpublic Queue delayQueue() {//创建队列return new Queue(DELAY_QUEUE);}//绑定队列@Beanpublic Binding delayBindingQueue(@Qualifier("delayQueue") Queue delayQueue, @Qualifier("delayEchange") CustomExchange delayEchange) {return BindingBuilder.bind(delayQueue).to(delayEchange).with("DELAY_ROUNTING_KEY").noargs();}}
消费者
package com.rabbitmq10;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 队列ttl消费者** @author 天真热* @create 2022-02-10 16:00* @desc**/
@Slf4j
@Component
public class DelayConsume {//接收消息@RabbitListener(queues = DelayConfig.DELAY_QUEUE)public void receiveDelay(Message msg) {String message = new String(msg.getBody());log.info("接收到了插件延迟队列消息:" + message);}
}
发送消息类
package com.rabbitmq10;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 天真热* @create 2022-02-10 15:03* @desc**/
@Slf4j
@RestController
@RequestMapping("/delayed")
public class SendDelayMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendExpireMsg/{message}/{ttlTime}")public void sendExpireMsg(@PathVariable String message, @PathVariable Integer ttlTime) {log.info("发送定时消息");rabbitTemplate.convertAndSend(DelayConfig.DELAY_EXCHANGE, DelayConfig.DELAY_ROUNTING_KEY, "消息来自定时消息:" + message, msg -> {//发消息的时候,延迟时长msg.getMessageProperties().setDelay(ttlTime);return msg;});}
}
测试:依次执行以下两个地址,可以发现延迟时间短的会先执行,不需要排队
http://localhost:8090/delayed/sendExpireMsg/10000/10000
http://localhost:8090/delayed/sendExpireMsg/500/500
十五、代码实现-发布确认高级
原理图
配置文件
spring:rabbitmq:host: 192.168.248.10port: 5672username: adminpassword: admin#交换机确认接口publisher-confirms: true#新版本:spring.rabbitmq.publisher-confirm-type=correlated#路由回退消息给生产者publisher-returns: true
发送消息类
package com.rabbitmq11;import com.rabbitmq10.DelayConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.CorrelationDataPostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** @author 天真热* @create 2022-02-10 15:03* @desc**/
@Slf4j
@RestController
@RequestMapping("/confirm")
public class SendConfirmMessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsgToBadExchange/{message}")public void sendMsgToBadExchange(@PathVariable String message) {//类,可以在队列接收消息的时候接受CorrelationData correlationData = new CorrelationData("1");//发送消息rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE + "bad", ConfirmConfig.CONFIRM_ROUNTING_KEY, "消息来自定时消息:" + message, correlationData);log.info("发送消息");}@GetMapping("/sendMsgToBadRounting/{message}")public void sendMsgToBadRounting(@PathVariable String message) {//类,可以在队列接收消息的时候接受CorrelationData correlationData = new CorrelationData("1");//发送消息rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE, ConfirmConfig.CONFIRM_ROUNTING_KEY + "bad", "消息来自定时消息:" + message, correlationData);log.info("发送消息");}@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message) {//类,可以在队列接收消息的时候接受CorrelationData correlationData = new CorrelationData("1");//发送消息rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE, ConfirmConfig.CONFIRM_ROUNTING_KEY , "消息来自定时消息:" + message, correlationData);log.info("发送消息");}}
配置类
package com.rabbitmq11;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 发布确认** @author 天真热* @create 2022-02-10 10:09* @desc**/
@Configuration
public class ConfirmConfig {//队列public static final String CONFIRM_QUEUE = "CONFIRM_QUEUE";//交换机public static final String CONFIRM_EXCHANGE = "CONFIRM_EXCHANGE";//routingKeypublic static final String CONFIRM_ROUNTING_KEY = "CONFIRM_ROUNTING_KEY";//备份交换机public static final String BACKUP_EXCHANGE = "BACKUP_EXCHANGE";//备份队列public static final String BACKUP_QUEUE = "BACKUP_QUEUE";//告警队列public static final String WARNING_QUEUE = "WARNING_QUEUE";//声明交换机//这里因为用的是之前的交换机,所以需要删除原先的交换机才能生效@Beanpublic DirectExchange confirmEchange() {return (DirectExchange) ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE).build();}//声明备份交换机@Beanpublic FanoutExchange backupEchange() {return new FanoutExchange(BACKUP_EXCHANGE);}//声明队列@Beanpublic Queue confirmQueue() {//创建队列return new Queue(CONFIRM_QUEUE);}//声明备份队列@Beanpublic Queue backupQueue() {//创建队列return new Queue(BACKUP_QUEUE);}//声明报警队列@Beanpublic Queue warningQueue() {//创建队列return new Queue(WARNING_QUEUE);}//绑定队列@Beanpublic Binding confirmBindingQueue(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmEchange") DirectExchange confirmEchange) {return BindingBuilder.bind(confirmQueue).to(confirmEchange).with(CONFIRM_ROUNTING_KEY);}//绑定备份队列@Beanpublic Binding backupBindingQueue(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupEchange") FanoutExchange backupEchange) {return BindingBuilder.bind(backupQueue).to(backupEchange);}//绑定报警队列@Beanpublic Binding warningBindingQueue(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupEchange") FanoutExchange backupEchange) {return BindingBuilder.bind(warningQueue).to(backupEchange);}}
告警配置类
package com.rabbitmq11;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;/*** 回调接口** @author 天真热* @create 2022-02-11 15:25* @desc**/
@Component
@Slf4j
public class CallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;//注入@PostConstructprivate void init() {//注入rabbitTemplate.setConfirmCallback(this);rabbitTemplate.setReturnCallback(this);}/*** 交换机回调方法(针对于交换机是否成功接收消息)* 1.发消息 交换机接收到了 回调* 1.1 correlationData 保存回调消息的id及相关信息* 1.2 交换机收到消息 ack=true* 1.3 call null* 2. 发消息 交换机失败了 回调* 2.1 correlationData 保存回调消息的id及相关信息* 2.2 交换机收到消息 ack=false*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {String id = correlationData != null ? correlationData.getId() : "";if (ack) {System.out.println("交换机接受成功了");} else {System.out.println("交换机接受失败了");}}/*** 消息不可达到目的地时,返回给生产者** @param message 消息* @param replayCode 失败码* @param replayText 失败原因* @param exchanges 交换机* @param routingKey 路由*/@Overridepublic void returnedMessage(Message message, int replayCode, String replayText, String exchanges, String routingKey) {System.out.println("队列接受失败了");System.out.println("消息:" + message + ";消息码:" + replayCode + ";原因:" + replayText + ";交换机:" + exchanges + ";路由:" + routingKey);}
}
正常消费者
package com.rabbitmq11;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 发布确认** @author 天真热* @create 2022-02-10 10:09* @desc**/
@Configuration
public class ConfirmConfig {//队列public static final String CONFIRM_QUEUE = "CONFIRM_QUEUE";//交换机public static final String CONFIRM_EXCHANGE = "CONFIRM_EXCHANGE";//routingKeypublic static final String CONFIRM_ROUNTING_KEY = "CONFIRM_ROUNTING_KEY";//备份交换机public static final String BACKUP_EXCHANGE = "BACKUP_EXCHANGE";//备份队列public static final String BACKUP_QUEUE = "BACKUP_QUEUE";//告警队列public static final String WARNING_QUEUE = "WARNING_QUEUE";//声明交换机//这里因为用的是之前的交换机,所以需要删除原先的交换机才能生效@Beanpublic DirectExchange confirmEchange() {return (DirectExchange) ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE).build();}//声明备份交换机@Beanpublic FanoutExchange backupEchange() {return new FanoutExchange(BACKUP_EXCHANGE);}//声明队列@Beanpublic Queue confirmQueue() {//创建队列return new Queue(CONFIRM_QUEUE);}//声明备份队列@Beanpublic Queue backupQueue() {//创建队列return new Queue(BACKUP_QUEUE);}//声明报警队列@Beanpublic Queue warningQueue() {//创建队列return new Queue(WARNING_QUEUE);}//绑定队列@Beanpublic Binding confirmBindingQueue(@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmEchange") DirectExchange confirmEchange) {return BindingBuilder.bind(confirmQueue).to(confirmEchange).with(CONFIRM_ROUNTING_KEY);}//绑定备份队列@Beanpublic Binding backupBindingQueue(@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupEchange") FanoutExchange backupEchange) {return BindingBuilder.bind(backupQueue).to(backupEchange);}//绑定报警队列@Beanpublic Binding warningBindingQueue(@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupEchange") FanoutExchange backupEchange) {return BindingBuilder.bind(warningQueue).to(backupEchange);}}
告警消费者
package com.rabbitmq11;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 队列ttl消费者** @author 天真热* @create 2022-02-10 16:00* @desc**/
@Slf4j
@Component
public class WarningConsume {//接收消息@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE)public void receiveDelay(Message msg) {String message = new String(msg.getBody());log.info("报警发现不可路由的消息:" + message);}
}
备份消费者
package com.rabbitmq11;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 队列ttl消费者** @author 天真热* @create 2022-02-10 16:00* @desc**/
@Slf4j
@Component
public class BackupConsume {//接收消息@RabbitListener(queues = ConfirmConfig.BACKUP_QUEUE)public void receiveDelay(Message msg) {String message = new String(msg.getBody());log.info("走的是备份交换机消息:" + message);}
}
测试1-正常访问:http://localhost:8090/confirm/sendMsg/100
测试2-访问错误的路由:http://localhost:8090/confirm/sendMsgToBadExchange/100
测试3-访问错误的rountingKey:http://localhost:8090/confirm/sendMsgToBadRounting/100
十六、RabbitMq的界面Features属性介绍
- D:d 是 durable 的缩写,代表这个队列中的消息支持持久化。
- AD:ad 是 autoDelete 的缩写。代表当前队列的最后一个消费者退订时被自动删除。注意:此时不管队列中是否还存在消息,队列都会删除。
- excl:是 exclusive 的缩写。代表这是一个排他队列。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
- Args:是 arguments 的缩写。代表该队列配置了 arguments 参数。
- TTL:是 x-message-ttl 的缩写。设置队列中的所有消息的生存周期(统一为整个队列的所有消息设置生命周期), 也可以在发布消息的时候单独为某个消息指定剩余生存时间,单位毫秒。
- Exp:Auto Expire,是 x-expires 配置的缩写。当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp。注意这里是删除队列,不是队列中的消息。
- Lim:说明该队列配置了 x-max-length。限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉。
- Lim B:说明队列配置了 x-max-length-bytes。限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小。
- DLX:说明该队列配置了 x-dead-letter-exchange。当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉。
- DLK:x-dead-letter-routing-key 的缩写,将删除的消息推送到指定交换机的指定路由键的队列中去。
- Pri:x-max-priority 的缩写,优先级队列。表明该队列支持优先级,先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费。
- Ovfl:x-overflow 的缩写。队列中的消息溢出时,如何处理这些消息。要么丢弃队列头部的消息,要么拒绝接收后面生产者发送过来的所有消息。有两个配置项:drop-head,代表丢弃队列头部的消息,默认行为;reject-publish 设置队列中的消息溢出后,该队列的行为:”拒绝接收”(所有消息)。
- ha-all:镜像队列。all 表示镜像到集群上的所有节点,ha-params 参数忽略。
如若内容造成侵权/违法违规/事实不符,请联系编程学习网邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
相关文章
- HTML,css实现导航栏下拉菜单
<!DOCTYPE html> <html lang"en"> <head> <meta charset"UTF-8"/> <title></title> <style> * { margin: 0; padding: 0; } body, html { font-size: 14px; font-family: Microsoft Yahei, 微软雅黑, Simsun,…...
2024/4/16 19:40:58 - Kubernetes——集群扩展原理
摘要 Kubernetes 是一个高度开放可扩展的架构,可以通过自定义资源类型(CRD)来定义自己的类型,还可以自己来扩展 API 服务,用户的使用方式跟 Kubernetes 的原生对象无异。 一、使用自定义资源扩展 API 二、使用CRD扩展…...
2024/4/13 4:09:20 - Bubble Sort Graph
题目意思让你找一个最长的升序子序列 #include"bits/stdc.h" #define ll long long #define pi pair<int,int> #define inf 0x3f3f3f3f #define _for(i,a,b) for(int ia;i<b;i) #define for_(i,a,b) for(int ia;i<b;i) #define _fr(i,a,b) for(int ia;…...
2024/4/16 18:16:58 - NLP-生成模型-2017:Transformer中的那些为什么
1、简单描述一下Transformer中的前馈神经网络?使用了什么激活函数?相关优缺点? 前馈神经网络采用了两个线性变换,激活函数为Relu,公式如下: FFN(x)max(0,xW1b1)W2b2FFN(x) max(0, xW_1 b_1) W_2 b_2 FF…...
2024/4/14 3:18:17 - cc.的日常
2022.2.12 reconcile 调和 imminent 即将发生的 rebellion 叛乱 wedge 楔 reinforce 加强 diffuse 扩散 appointment 任命 irony 反讽 approximate 大约的 venue 场地 constituent 组成部分 robust 不妥协的 amplify 放大 eminent 卓越的 coalition 结合体 sal…...
2024/4/13 4:09:20 - Master Fur
1.模型(没毛的有在ArtStation中获取 作者名字Felipe Gomez) 2.刷毛前找图片参考 网站:链接: link. pureref.com 在这里插入图片描述 1.导入模型 tip: 2.分开模型的各个部位 3.转vdb 开始刷毛啦!! 一…...
2024/4/13 4:09:20 - Leetcode160.相交链表(哈希集合/双指针,C++)
*此题借鉴Leetcode官方题解 题目描述: 给你两个单链表的头节点 headA 和 headB ,请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点,返回 null 。 图示两个链表在节点 c1 开始相交: 思路1:哈希集…...
2024/4/13 4:09:35 - MySQL-JOIN关键字 整理总结
目录 1、inner join、left join、reght join、full join(full outer join) 2、union、union all 3、cross join 交叉连接 1、inner join、left join、reght join、full join(full outer join) 注意:oracle里面有full join,但是在mysql中没有full join。可以使…...
2024/4/13 4:09:30 - 1094 谷歌的招聘 (20 分)
2004 年 7 月,谷歌在硅谷的 101 号公路边竖立了一块巨大的广告牌(如下图)用于招聘。内容超级简单,就是一个以 .com 结尾的网址,而前面的网址是一个 10 位素数,这个素数是自然常数 e 中最早出现的 10 位连续…...
2024/4/6 7:48:21 - AI工程部署(三):K8S【Kubernetes;Docker集群管理工具】【作用:容器管理、服务编排、资源调度、集群管理】
手动docker run创建容器只是个toy,在企业里,往往会使用k8s平台管理容器; k8s是一个容器管理、服务编排、资源调度、集群管理的工具,由谷歌开源,在国内已成为主流; 我们开发的服务,最终都将运行…...
2024/4/17 14:39:40 - JavaScript变量和数据类型总结
📜个人简介 ⭐️个人主页:微风洋洋🙋♂️ 🍑博客领域:编程基础💡,后端💡,大数据,信息安全 🍅写作风格:干货,干货,还是tmd的干货 🌸精选专栏:【J…...
2024/4/13 4:09:05 - CSS基础知识一
1. CSS语法规范 CSS规则由两个主要的部分构成:选择器以及一条或多条声明。 • 选择器是用于指定CSS样式的HTML标签,花括号内是对该对象设置的具体样式 • 属性和属性值以"键值对"的形式出现 • 属性是对指定的对象设置的样式属性ÿ…...
2024/4/8 1:43:51 - 【集合】集合和数组的不同点, Collection,集合的遍历,利用数组遍历,迭代器遍历,list接口(继承自Collection接口),List相关集合特有的功能
集合 一.集合和数组的不同点: 1、数组的长度是不可变的,集合的长度是可变的 2、数组可以存放同一种基本数据类型或者引用数据类型的元素 而集合只能存放引用数据类型,并且集合中可以存放法不同数据类型的元素 (注意:这…...
2024/4/14 1:33:18 - Games101笔记——Lecture 14: Ray Tracing 2(Acceleration Radiometry)
Lecture 14: Ray Tracing 2(Acceleration & Radiometry) 本讲内容: Using AABBs to accelerate ray tracing uniform gridspatial partitions Basic radiometry(辐射度量学) Uniform Spatial Partitions (Grids) Preprocess - Buil…...
2024/4/18 6:15:55 - clickhouse部署
从clickhouse官网下载二进制安装包 解压安装 tar -xzvf clickhouse-common-static-dbg-21.9.4.35.tgz tar -xzvf ... sudo clickhouse-common-static-21.9.4.35/install/doinst.sh sudo clickhouse-client-21.9.4.35/install/doinst.sh sudo clickhouse-common-static-dbg-21…...
2024/4/18 22:18:22 - 八股文面经之Mybatis笔记
最近出去面试,在简历中写了些关于Mybatis的技术点,于是面试官就开始对我不断询问,本文特意记录下面试中遇到的一些问题。 说说什么是Mybatis Mybatis是一款对于Sql进行了一定封装的持久化sql框架,将常用的crud接口进行了一定的封…...
2024/4/14 18:01:06 - 13、概述篇
文章目录第1章、大厂面试题第2章、背景说明1、生产环境中遇到的问题2、为什么要调优3、不同阶段的考虑第3章、调优概述1、监控的依据2、调优的大方向第4章、性能优化的步骤1、第1步:性能监控【发现问题】2、第2步:性能分析【排查问题】3、第3步ÿ…...
2024/4/13 4:09:30 - 了解数组(#)
数组 (以下内容部分为原创,另加入狂神说、黑马程序员等素材仅供个人学习使用) 定义 一组相同类型数据的集合称为数组,数组中的每个数据成为元素 声明创建数组 模板: public static void main(String[] args) {in…...
2024/4/17 1:12:25 - 基于Redis6.0 部署迷你版本消息队列实战
技术研究背景 由于目前的研发团队处于公司初创阶段,尚未有能成熟的运维体系,对于市面上常见的成熟MQ搭建维护能力不足,但是又希望能有一款轻量级的消息系统供研发团队的成员使用,因此开展了对该方面相关的技术调研工作。 通过相…...
2024/4/13 4:09:05 - 赛联区块链培训:Web3的核心要素——区块链、加密资产、智能合约和预言机
在2008年,中本聪发布了比特币白皮书,彻底颠覆了我们对数字化交易的概念,并首次提出了一种无需可信中间方的安全在线交易模式。中本聪写道:“需要基于加密证明,而非信任,来建立电子支付系统。” 智能合约被…...
2024/4/18 15:40:20
最新文章
- 大型网站系统架构演化实例_7.使用NoSQL和搜索引擎
1.使用NoSQL和搜索引擎 随着网站业务越来越复杂,对数据存储和检索的需求也越来越复杂,网站需要采用一些非关系数据库技术如NoSQL和非数据库查询技术如搜索引擎。NoSQL和搜索引擎都是源自互联网的技术手段,对可伸缩的分布式特性具有更好的支持…...
2024/4/20 10:18:51 - 梯度消失和梯度爆炸的一些处理方法
在这里是记录一下梯度消失或梯度爆炸的一些处理技巧。全当学习总结了如有错误还请留言,在此感激不尽。 权重和梯度的更新公式如下: w w − η ⋅ ∇ w w w - \eta \cdot \nabla w ww−η⋅∇w 个人通俗的理解梯度消失就是网络模型在反向求导的时候出…...
2024/3/20 10:50:27 - 【嵌入式开发 Linux 常用命令系列 4.3 -- git add 不 add untracked file】
请阅读【嵌入式开发学习必备专栏 】 文章目录 git add 不add untracked file git add 不add untracked file 如果你想要Git在执行git add .时不添加未跟踪的文件(untracked files),你可以使用以下命令: git add -u这个命令只会加…...
2024/4/19 7:49:41 - 整理的微信小程序日历(单选/多选/筛选)
一、日历横向多选,支持单日、双日、三日、工作日等选择 效果图 wxml文件 <view class"calendar"><view class"section"><view class"title flex-box"><button bindtap"past">上一页</button&…...
2024/4/17 10:06:33 - 【外汇早评】美通胀数据走低,美元调整
原标题:【外汇早评】美通胀数据走低,美元调整昨日美国方面公布了新一期的核心PCE物价指数数据,同比增长1.6%,低于前值和预期值的1.7%,距离美联储的通胀目标2%继续走低,通胀压力较低,且此前美国一季度GDP初值中的消费部分下滑明显,因此市场对美联储后续更可能降息的政策…...
2024/4/19 14:24:02 - 【原油贵金属周评】原油多头拥挤,价格调整
原标题:【原油贵金属周评】原油多头拥挤,价格调整本周国际劳动节,我们喜迎四天假期,但是整个金融市场确实流动性充沛,大事频发,各个商品波动剧烈。美国方面,在本周四凌晨公布5月份的利率决议和新闻发布会,维持联邦基金利率在2.25%-2.50%不变,符合市场预期。同时美联储…...
2024/4/19 18:20:22 - 【外汇周评】靓丽非农不及疲软通胀影响
原标题:【外汇周评】靓丽非农不及疲软通胀影响在刚结束的周五,美国方面公布了新一期的非农就业数据,大幅好于前值和预期,新增就业重新回到20万以上。具体数据: 美国4月非农就业人口变动 26.3万人,预期 19万人,前值 19.6万人。 美国4月失业率 3.6%,预期 3.8%,前值 3…...
2024/4/19 11:57:31 - 【原油贵金属早评】库存继续增加,油价收跌
原标题:【原油贵金属早评】库存继续增加,油价收跌周三清晨公布美国当周API原油库存数据,上周原油库存增加281万桶至4.692亿桶,增幅超过预期的74.4万桶。且有消息人士称,沙特阿美据悉将于6月向亚洲炼油厂额外出售更多原油,印度炼油商预计将每日获得至多20万桶的额外原油供…...
2024/4/19 11:57:31 - 【外汇早评】日本央行会议纪要不改日元强势
原标题:【外汇早评】日本央行会议纪要不改日元强势近两日日元大幅走强与近期市场风险情绪上升,避险资金回流日元有关,也与前一段时间的美日贸易谈判给日本缓冲期,日本方面对汇率问题也避免继续贬值有关。虽然今日早间日本央行公布的利率会议纪要仍然是支持宽松政策,但这符…...
2024/4/19 11:57:52 - 【原油贵金属早评】欧佩克稳定市场,填补伊朗问题的影响
原标题:【原油贵金属早评】欧佩克稳定市场,填补伊朗问题的影响近日伊朗局势升温,导致市场担忧影响原油供给,油价试图反弹。此时OPEC表态稳定市场。据消息人士透露,沙特6月石油出口料将低于700万桶/日,沙特已经收到石油消费国提出的6月份扩大出口的“适度要求”,沙特将满…...
2024/4/19 11:57:53 - 【外汇早评】美欲与伊朗重谈协议
原标题:【外汇早评】美欲与伊朗重谈协议美国对伊朗的制裁遭到伊朗的抗议,昨日伊朗方面提出将部分退出伊核协议。而此行为又遭到欧洲方面对伊朗的谴责和警告,伊朗外长昨日回应称,欧洲国家履行它们的义务,伊核协议就能保证存续。据传闻伊朗的导弹已经对准了以色列和美国的航…...
2024/4/19 11:58:14 - 【原油贵金属早评】波动率飙升,市场情绪动荡
原标题:【原油贵金属早评】波动率飙升,市场情绪动荡因中美贸易谈判不安情绪影响,金融市场各资产品种出现明显的波动。随着美国与中方开启第十一轮谈判之际,美国按照既定计划向中国2000亿商品征收25%的关税,市场情绪有所平复,已经开始接受这一事实。虽然波动率-恐慌指数VI…...
2024/4/19 11:58:20 - 【原油贵金属周评】伊朗局势升温,黄金多头跃跃欲试
原标题:【原油贵金属周评】伊朗局势升温,黄金多头跃跃欲试美国和伊朗的局势继续升温,市场风险情绪上升,避险黄金有向上突破阻力的迹象。原油方面稍显平稳,近期美国和OPEC加大供给及市场需求回落的影响,伊朗局势并未推升油价走强。近期中美贸易谈判摩擦再度升级,美国对中…...
2024/4/20 7:40:48 - 【原油贵金属早评】市场情绪继续恶化,黄金上破
原标题:【原油贵金属早评】市场情绪继续恶化,黄金上破周初中国针对于美国加征关税的进行的反制措施引发市场情绪的大幅波动,人民币汇率出现大幅的贬值动能,金融市场受到非常明显的冲击。尤其是波动率起来之后,对于股市的表现尤其不安。隔夜美国股市出现明显的下行走势,这…...
2024/4/19 11:58:39 - 【外汇早评】美伊僵持,风险情绪继续升温
原标题:【外汇早评】美伊僵持,风险情绪继续升温昨日沙特两艘油轮再次发生爆炸事件,导致波斯湾局势进一步恶化,市场担忧美伊可能会出现摩擦生火,避险品种获得支撑,黄金和日元大幅走强。美指受中美贸易问题影响而在低位震荡。继5月12日,四艘商船在阿联酋领海附近的阿曼湾、…...
2024/4/19 11:58:51 - 【原油贵金属早评】贸易冲突导致需求低迷,油价弱势
原标题:【原油贵金属早评】贸易冲突导致需求低迷,油价弱势近日虽然伊朗局势升温,中东地区几起油船被袭击事件影响,但油价并未走高,而是出于调整结构中。由于市场预期局势失控的可能性较低,而中美贸易问题导致的全球经济衰退风险更大,需求会持续低迷,因此油价调整压力较…...
2024/4/20 3:12:02 - 氧生福地 玩美北湖(上)——为时光守候两千年
原标题:氧生福地 玩美北湖(上)——为时光守候两千年一次说走就走的旅行,只有一张高铁票的距离~ 所以,湖南郴州,我来了~ 从广州南站出发,一个半小时就到达郴州西站了。在动车上,同时改票的南风兄和我居然被分到了一个车厢,所以一路非常愉快地聊了过来。 挺好,最起…...
2024/4/19 11:59:15 - 氧生福地 玩美北湖(中)——永春梯田里的美与鲜
原标题:氧生福地 玩美北湖(中)——永春梯田里的美与鲜一觉醒来,因为大家太爱“美”照,在柳毅山庄去寻找龙女而错过了早餐时间。近十点,向导坏坏还是带着饥肠辘辘的我们去吃郴州最富有盛名的“鱼头粉”。说这是“十二分推荐”,到郴州必吃的美食之一。 哇塞!那个味美香甜…...
2024/4/19 11:59:23 - 氧生福地 玩美北湖(下)——奔跑吧骚年!
原标题:氧生福地 玩美北湖(下)——奔跑吧骚年!让我们红尘做伴 活得潇潇洒洒 策马奔腾共享人世繁华 对酒当歌唱出心中喜悦 轰轰烈烈把握青春年华 让我们红尘做伴 活得潇潇洒洒 策马奔腾共享人世繁华 对酒当歌唱出心中喜悦 轰轰烈烈把握青春年华 啊……啊……啊 两…...
2024/4/19 11:59:44 - 扒开伪装医用面膜,翻六倍价格宰客,小姐姐注意了!
原标题:扒开伪装医用面膜,翻六倍价格宰客,小姐姐注意了!扒开伪装医用面膜,翻六倍价格宰客!当行业里的某一品项火爆了,就会有很多商家蹭热度,装逼忽悠,最近火爆朋友圈的医用面膜,被沾上了污点,到底怎么回事呢? “比普通面膜安全、效果好!痘痘、痘印、敏感肌都能用…...
2024/4/19 11:59:48 - 「发现」铁皮石斛仙草之神奇功效用于医用面膜
原标题:「发现」铁皮石斛仙草之神奇功效用于医用面膜丽彦妆铁皮石斛医用面膜|石斛多糖无菌修护补水贴19大优势: 1、铁皮石斛:自唐宋以来,一直被列为皇室贡品,铁皮石斛生于海拔1600米的悬崖峭壁之上,繁殖力差,产量极低,所以古代仅供皇室、贵族享用 2、铁皮石斛自古民间…...
2024/4/19 12:00:06 - 丽彦妆\医用面膜\冷敷贴轻奢医学护肤引导者
原标题:丽彦妆\医用面膜\冷敷贴轻奢医学护肤引导者【公司简介】 广州华彬企业隶属香港华彬集团有限公司,专注美业21年,其旗下品牌: 「圣茵美」私密荷尔蒙抗衰,产后修复 「圣仪轩」私密荷尔蒙抗衰,产后修复 「花茵莳」私密荷尔蒙抗衰,产后修复 「丽彦妆」专注医学护…...
2024/4/19 16:57:22 - 广州械字号面膜生产厂家OEM/ODM4项须知!
原标题:广州械字号面膜生产厂家OEM/ODM4项须知!广州械字号面膜生产厂家OEM/ODM流程及注意事项解读: 械字号医用面膜,其实在我国并没有严格的定义,通常我们说的医美面膜指的应该是一种「医用敷料」,也就是说,医用面膜其实算作「医疗器械」的一种,又称「医用冷敷贴」。 …...
2024/4/19 12:00:25 - 械字号医用眼膜缓解用眼过度到底有无作用?
原标题:械字号医用眼膜缓解用眼过度到底有无作用?医用眼膜/械字号眼膜/医用冷敷眼贴 凝胶层为亲水高分子材料,含70%以上的水分。体表皮肤温度传导到本产品的凝胶层,热量被凝胶内水分子吸收,通过水分的蒸发带走大量的热量,可迅速地降低体表皮肤局部温度,减轻局部皮肤的灼…...
2024/4/19 12:00:40 - 配置失败还原请勿关闭计算机,电脑开机屏幕上面显示,配置失败还原更改 请勿关闭计算机 开不了机 这个问题怎么办...
解析如下:1、长按电脑电源键直至关机,然后再按一次电源健重启电脑,按F8健进入安全模式2、安全模式下进入Windows系统桌面后,按住“winR”打开运行窗口,输入“services.msc”打开服务设置3、在服务界面,选中…...
2022/11/19 21:17:18 - 错误使用 reshape要执行 RESHAPE,请勿更改元素数目。
%读入6幅图像(每一幅图像的大小是564*564) f1 imread(WashingtonDC_Band1_564.tif); subplot(3,2,1),imshow(f1); f2 imread(WashingtonDC_Band2_564.tif); subplot(3,2,2),imshow(f2); f3 imread(WashingtonDC_Band3_564.tif); subplot(3,2,3),imsho…...
2022/11/19 21:17:16 - 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机...
win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”问题的解决方法在win7系统关机时如果有升级系统的或者其他需要会直接进入一个 等待界面,在等待界面中我们需要等待操作结束才能关机,虽然这比较麻烦,但是对系统进行配置和升级…...
2022/11/19 21:17:15 - 台式电脑显示配置100%请勿关闭计算机,“准备配置windows 请勿关闭计算机”的解决方法...
有不少用户在重装Win7系统或更新系统后会遇到“准备配置windows,请勿关闭计算机”的提示,要过很久才能进入系统,有的用户甚至几个小时也无法进入,下面就教大家这个问题的解决方法。第一种方法:我们首先在左下角的“开始…...
2022/11/19 21:17:14 - win7 正在配置 请勿关闭计算机,怎么办Win7开机显示正在配置Windows Update请勿关机...
置信有很多用户都跟小编一样遇到过这样的问题,电脑时发现开机屏幕显现“正在配置Windows Update,请勿关机”(如下图所示),而且还需求等大约5分钟才干进入系统。这是怎样回事呢?一切都是正常操作的,为什么开时机呈现“正…...
2022/11/19 21:17:13 - 准备配置windows 请勿关闭计算机 蓝屏,Win7开机总是出现提示“配置Windows请勿关机”...
Win7系统开机启动时总是出现“配置Windows请勿关机”的提示,没过几秒后电脑自动重启,每次开机都这样无法进入系统,此时碰到这种现象的用户就可以使用以下5种方法解决问题。方法一:开机按下F8,在出现的Windows高级启动选…...
2022/11/19 21:17:12 - 准备windows请勿关闭计算机要多久,windows10系统提示正在准备windows请勿关闭计算机怎么办...
有不少windows10系统用户反映说碰到这样一个情况,就是电脑提示正在准备windows请勿关闭计算机,碰到这样的问题该怎么解决呢,现在小编就给大家分享一下windows10系统提示正在准备windows请勿关闭计算机的具体第一种方法:1、2、依次…...
2022/11/19 21:17:11 - 配置 已完成 请勿关闭计算机,win7系统关机提示“配置Windows Update已完成30%请勿关闭计算机”的解决方法...
今天和大家分享一下win7系统重装了Win7旗舰版系统后,每次关机的时候桌面上都会显示一个“配置Windows Update的界面,提示请勿关闭计算机”,每次停留好几分钟才能正常关机,导致什么情况引起的呢?出现配置Windows Update…...
2022/11/19 21:17:10 - 电脑桌面一直是清理请关闭计算机,windows7一直卡在清理 请勿关闭计算机-win7清理请勿关机,win7配置更新35%不动...
只能是等着,别无他法。说是卡着如果你看硬盘灯应该在读写。如果从 Win 10 无法正常回滚,只能是考虑备份数据后重装系统了。解决来方案一:管理员运行cmd:net stop WuAuServcd %windir%ren SoftwareDistribution SDoldnet start WuA…...
2022/11/19 21:17:09 - 计算机配置更新不起,电脑提示“配置Windows Update请勿关闭计算机”怎么办?
原标题:电脑提示“配置Windows Update请勿关闭计算机”怎么办?win7系统中在开机与关闭的时候总是显示“配置windows update请勿关闭计算机”相信有不少朋友都曾遇到过一次两次还能忍但经常遇到就叫人感到心烦了遇到这种问题怎么办呢?一般的方…...
2022/11/19 21:17:08 - 计算机正在配置无法关机,关机提示 windows7 正在配置windows 请勿关闭计算机 ,然后等了一晚上也没有关掉。现在电脑无法正常关机...
关机提示 windows7 正在配置windows 请勿关闭计算机 ,然后等了一晚上也没有关掉。现在电脑无法正常关机以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧!关机提示 windows7 正在配…...
2022/11/19 21:17:05 - 钉钉提示请勿通过开发者调试模式_钉钉请勿通过开发者调试模式是真的吗好不好用...
钉钉请勿通过开发者调试模式是真的吗好不好用 更新时间:2020-04-20 22:24:19 浏览次数:729次 区域: 南阳 > 卧龙 列举网提醒您:为保障您的权益,请不要提前支付任何费用! 虚拟位置外设器!!轨迹模拟&虚拟位置外设神器 专业用于:钉钉,外勤365,红圈通,企业微信和…...
2022/11/19 21:17:05 - 配置失败还原请勿关闭计算机怎么办,win7系统出现“配置windows update失败 还原更改 请勿关闭计算机”,长时间没反应,无法进入系统的解决方案...
前几天班里有位学生电脑(windows 7系统)出问题了,具体表现是开机时一直停留在“配置windows update失败 还原更改 请勿关闭计算机”这个界面,长时间没反应,无法进入系统。这个问题原来帮其他同学也解决过,网上搜了不少资料&#x…...
2022/11/19 21:17:04 - 一个电脑无法关闭计算机你应该怎么办,电脑显示“清理请勿关闭计算机”怎么办?...
本文为你提供了3个有效解决电脑显示“清理请勿关闭计算机”问题的方法,并在最后教给你1种保护系统安全的好方法,一起来看看!电脑出现“清理请勿关闭计算机”在Windows 7(SP1)和Windows Server 2008 R2 SP1中,添加了1个新功能在“磁…...
2022/11/19 21:17:03 - 请勿关闭计算机还原更改要多久,电脑显示:配置windows更新失败,正在还原更改,请勿关闭计算机怎么办...
许多用户在长期不使用电脑的时候,开启电脑发现电脑显示:配置windows更新失败,正在还原更改,请勿关闭计算机。。.这要怎么办呢?下面小编就带着大家一起看看吧!如果能够正常进入系统,建议您暂时移…...
2022/11/19 21:17:02 - 还原更改请勿关闭计算机 要多久,配置windows update失败 还原更改 请勿关闭计算机,电脑开机后一直显示以...
配置windows update失败 还原更改 请勿关闭计算机,电脑开机后一直显示以以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧!配置windows update失败 还原更改 请勿关闭计算机&#x…...
2022/11/19 21:17:01 - 电脑配置中请勿关闭计算机怎么办,准备配置windows请勿关闭计算机一直显示怎么办【图解】...
不知道大家有没有遇到过这样的一个问题,就是我们的win7系统在关机的时候,总是喜欢显示“准备配置windows,请勿关机”这样的一个页面,没有什么大碍,但是如果一直等着的话就要两个小时甚至更久都关不了机,非常…...
2022/11/19 21:17:00 - 正在准备配置请勿关闭计算机,正在准备配置windows请勿关闭计算机时间长了解决教程...
当电脑出现正在准备配置windows请勿关闭计算机时,一般是您正对windows进行升级,但是这个要是长时间没有反应,我们不能再傻等下去了。可能是电脑出了别的问题了,来看看教程的说法。正在准备配置windows请勿关闭计算机时间长了方法一…...
2022/11/19 21:16:59 - 配置失败还原请勿关闭计算机,配置Windows Update失败,还原更改请勿关闭计算机...
我们使用电脑的过程中有时会遇到这种情况,当我们打开电脑之后,发现一直停留在一个界面:“配置Windows Update失败,还原更改请勿关闭计算机”,等了许久还是无法进入系统。如果我们遇到此类问题应该如何解决呢࿰…...
2022/11/19 21:16:58 - 如何在iPhone上关闭“请勿打扰”
Apple’s “Do Not Disturb While Driving” is a potentially lifesaving iPhone feature, but it doesn’t always turn on automatically at the appropriate time. For example, you might be a passenger in a moving car, but your iPhone may think you’re the one dri…...
2022/11/19 21:16:57