RabbitMQ简介和交换机入门使用

  • 时间:
  • 来源:互联网
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_42402854/article/details/103081576

一、RabbitMQ简介

1、什么是MQ

      消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。其主要用途:不同进程Process/线程Thread之间通信。

1)为什么会产生消息队列?有几个原因:

      不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;

      不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

     MQ框架非常之多,比较流行的有RabbitMQ、ActiveMQ、ZeroMQ、kafka,以及阿里开源的RocketMQ等。本文介绍RabbitMQ。 了解更多消息中间件推荐文章:https://www.cnblogs.com/huojg-21442/p/7601380.html

2、什么是RabbitMQ

     RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。 -- 百度百科

      简单来说,RabbitMQ就是一个开源的消息代理和队列服务器,并且是基于AMQP协议的,是AMQP的一个实现,支持多种客户端。

1)为什么使用RabbitMQ?

开源的消息中间件

可以跨平台,跨语言。数据的生成和消费可以是不同的语言。

提供可靠性消息投递模式,返回模式,在易用性、扩展性、高可用性等方面表现不俗。

与springAMQP完美的整合 集群模式丰富,表达式配置,HA模式,镜像队列模型。

2)RabbitMQ一些基础概念

Server:又称Broker,接受客户端的连接,实现AMQP实体服务。简单来说就是消息队列服务器实体。

Connection:连接,应用程序跟Broker的网络连接。

Channel:消息通道/网络信道,几乎所有的操作都是在channel中进行。数据的流转都要在channel上进行。channel是进行消息读写的通道。在客户端的每个连接里,可以建立多个channel,每个channel代表一个会话任务。

Message:消息,服务器与应用程序之间传送的数据,由Properties和body组成。Properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性。body则就是消息体的内容。

Virtual host:虚拟主机/虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个broker里可以开设多个vhost,用作不同用户的权限分离。一个虚拟地址里面可以有多个交换机 exchange和消息队列message queue。

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列中。接收消息,根据路由机转发消息到绑定的队列。

Binding:绑定,交换机和队列之间的虚拟链接,绑定中可以包含routing key。它的作用就是把exchange和queue按照路由规则绑定起来。

Routing Key:路由关键字,可以用它来确定如何路由一个特定消息,exchange根据这个关键字进行消息投递。

Queue:消息队列载体,保存消息并将它们转发给消费者,每个消息都会被投入到一个或多个队列。

Producer:消息生产者,就是投递消息的程序。

Consumer:消息消费者,就是接受消息的程序。

3)消息队列的使用过程大概如下:

      生产者把消息交给服务器,服务器里面有虚拟主机,主机里面有AMQP的核心exchange交换机。生产者需要有服务器的ip和端口号,找到服务器,服务器需要把消息投递到哪个虚拟主机上。

      接下来,虚拟主机把消息交给交换机,交换机接收到消息后会根据路由规则找到指定的消息队列 ,所以生产者生产消息时需要指定消息的routing key。交换机会把消息交给消息队列。到此,消息的生产者的任务就做完了。

      消费者可以监听消息队列,由于交换机和消息队列会进行绑定,消费者会监听消息队列message queue,RabbitMQ会把消息队列交给消费者。

    

3、开发语言:Erlang – 面向并发的编程语言。

     

4、什么是AMQP协议

      AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,

      AMQP是具有现代特征的二进制协议。是一个提供统一消息服务的的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计,是一个规范。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

      消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

      在 AMQP 模型中,消息的 producer 将 Message 发送给 Exchange,Exchange 负责交换 / 路由,将消息正确地转发给相应的 Queue。消息的 Consumer 从 Queue 中读取消息。

     

 

二、Java入门使用

    QueueingConsumer在Rabbitmq客户端4.x版本就被标记为@Deprecated:

    参考文章:为什么 QueueingConsumer 会被 Deprecated ?

    RabbitMQ Java客户端使用 com.rabbitmq.client 作为其顶级软件包。关键的类和接口是:

    

com.rabbitmq.client.Connection接口:

      

    com.rabbitmq.client.Channel接口挺重要,包含了很多消息读写的重载方法,具体看API

    对于类和接口中的方法参考官方API:RabbitMQ Java Client 5.7.3 API

 1、使用默认的交换机

   

1)消息生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws Exception {
        //1.创建工厂类
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        //默认情况下为“ guest” /“ guest”,仅限本地主机连接
        factory.setUsername("guest");
        factory.setPassword("guest");

        //2.通过工厂创建connection
        Connection connection = factory.newConnection();
        //3.创建channel对象
        Channel channel = connection.createChannel();

        //4.发布消息
        String routingKey = "test1";
        String msg="hello rabbitmq consumer message";
        for (int i = 0; i < 2; i++) {
            /**
             参数:
                 exchange -将消息发布到的交换机, 若为空字符串时,使用默认的交换机
                 routingKey -路由键
                 mandatory -如果要设置“强制性”标志,则为true
                 props -消息的其他属性-路由标头等
                 body -消息正文
             */
            channel.basicPublish("", routingKey, false, null, msg.getBytes());
        }
        //5.释放资源
        channel.close();
        connection.close();
    }

}

2)消息消费者

import com.rabbitmq.client.*;
import java.io.IOException;

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建工厂类
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        //默认情况下为“ guest” /“ guest”,仅限本地主机连接
        factory.setUsername("guest");
        factory.setPassword("guest");

        //2.通过工厂创建connection
        Connection connection = factory.newConnection();
        //3.创建channel对象
        Channel channel = connection.createChannel();

        //4. 创建消息队列
        String queueName = "test1";
        channel.queueDeclare(queueName, true, false, false, null);


        //5.通过channel把消费者和消息队列进行关联,获取消息进行处理
        /**
         参数:
             queue -队列名称
             autoAck-如果服务器应考虑一旦传递已确认的消息,则为true;如果服务器应该期望显式确认,则返回false
             callback -消费者对象的接口
         */
        boolean autoAck = false;
        channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {
            /**
             参数:
                 consumerTag-与消费者相关联的消费者标签
                 envelope -消息的打包数据
                 properties -消息的内容头数据
                 body -消息正文(客户端特定的不透明字节数组)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, 
                                       byte[] body) throws IOException {
                System.out.println("------------consumer message-----------");
                System.out.println("sonsumerTag:" + consumerTag);
                System.out.println("envelope:" + envelope);
                System.out.println("properties:" + properties);
                System.out.println("msg:" + new String(body));
            }
        });
    }
}

三、exchange交换机机制

1、什么是交换机

      rabbitmq的message model实际上消息不直接发送到queue中,中间有一个exchange是做消息分发,producer甚至不知道消息发送到那个队列中去。因此,当exchange收到message时,必须准确知道该如何分发。是append到一定规则的queue,还是append到多个queue中,还是被丢弃?这些规则都是通过exchagne的4种type去定义的。

      exchange是一个消息的agent,每一个虚拟的host中都有定义。它的职责是把message路由到不同的queue中。

2、交换器分类

     RabbitMQ的Exchange(交换器)分为四类:direct(默认)、headers、fanout、topic

     其中headers交换器允许你匹配AMQP消息的header而非路由键,除此之外headers交换器和direct交换器完全一致,但性能却很差,几乎用不到,忽略。

3、Direct Exchange交换机

      直连型交换机,也非常的简单,所有发送到Direct Exchange交换机的消息被转发到 RouteKey中指定的Queue,消息携带的路由键与队列名要完全匹配。

      

1)消息生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws Exception {
        //1.创建工厂类
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        //默认情况下为“ guest” /“ guest”,仅限本地主机连接
        factory.setUsername("guest");
        factory.setPassword("guest");

        //2.通过工厂创建connection
        Connection connection = factory.newConnection();
        //3.创建channel对象
        Channel channel = connection.createChannel();

        //4.发布消息
        String exchangeName="test_direct_exchange";
        String routingKey = "test.direct";
        String msg="hello rabbitmq consumer message - test_direct";
        for (int i = 0; i < 2; i++) {
            /**
             参数:
                 exchange -将消息发布到的交换机, 若为空字符串时,使用默认的交换机
                 routingKey -路由键
                 mandatory -如果要设置“强制性”标志,则为true
                 props -消息的其他属性-路由标头等
                 body -消息正文
             */
            channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes());
        }
        //5.释放资源
        channel.close();
        connection.close();
    }

}

2)消息消费者

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建工厂类
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        //默认情况下为“ guest” /“ guest”,仅限本地主机连接
        factory.setUsername("guest");
        factory.setPassword("guest");

        //2.通过工厂创建connection
        Connection connection = factory.newConnection();
        //3.创建channel对象
        Channel channel = connection.createChannel();


        //4. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定
        String queueName = "test_queue";
        String exchangeName="test_direct_exchange";
        String exchangeType="direct";
        String routingKey="test.direct";
        /**
         参数:
             queue -队列名称
             durable -如果我们声明一个持久队列,则为true(该队列将在服务器重启后保留下来)
             exclusive -如果我们声明一个排他队列,则为true(仅限此连接)
             autoDelete -如果我们声明一个自动删除队列,则为true(服务器将在不再使用它时将其删除)
             arguments -队列的其他属性(构造参数)
         */
        channel.queueDeclare(queueName, true, false, false, null);
        /**
         参数:
             exchange -交易所名称
             type -交易所类型
             durable -如果我们声明持久交换,则为true(该交换将在服务器重启后保留下来)
             autoDelete -如果服务器在不再使用交换机时应删除该交换机,则为true
             arguments -用于交换的其他属性(构造参数)
         */
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        /**
         参数:
             queue -队列名称
             exchange -交易所名称
             routingKey -用于绑定的路由键
         */
        channel.queueBind(queueName, exchangeName, routingKey);

        //5.通过channel把消费者和消息队列进行关联,获取消息进行处理
        /**
         参数:
             queue -队列名称
             autoAck-如果服务器应考虑一旦传递已确认的消息,则为true;如果服务器应该期望显式确认,则返回false
             callback -消费者对象的接口
         */
        boolean autoAck = true;
        channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {
            /**
             参数:
                 consumerTag-与消费者相关联的消费者标签
                 envelope -消息的打包数据
                 properties -消息的内容头数据
                 body -消息正文(客户端特定的不透明字节数组)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("------------consumer message-----------");
                System.out.println("sonsumerTag:" + consumerTag);
                System.out.println("envelope:" + envelope);
                System.out.println("properties:" + properties);
                System.out.println("msg:" + new String(body));
            }
        });
    }
}

     

4、Topic Exchange交换机

      主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。消息携带的路由键与队列名属于模糊匹配。

       

简单地介绍下规则:

    *  (星号) 用来表示一个单词 (必须出现的)

    #  (井号) 用来表示任意数量(零个或多个)单词

1)消息生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws Exception {
        //1.创建工厂类
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        //默认情况下为“ guest” /“ guest”,仅限本地主机连接
        factory.setUsername("guest");
        factory.setPassword("guest");

        //2.通过工厂创建connection
        Connection connection = factory.newConnection();
        //3.创建channel对象
        Channel channel = connection.createChannel();

        //4.发布消息
        String exchangeName="test_topic_exchange";
        String routingKey = "test.topic";
        String msg="hello rabbitmq consumer message - test_topic_exchange";
        for (int i = 0; i < 2; i++) {
            /**
             参数:
                 exchange -将消息发布到的交换机, 若为空字符串时,使用默认的交换机
                 routingKey -路由键
                 mandatory -如果要设置“强制性”标志,则为true
                 props -消息的其他属性-路由标头等
                 body -消息正文
             */
            channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes());
        }
        //5.释放资源
        channel.close();
        connection.close();
    }

}

2)消息消费者

import com.rabbitmq.client.*;
import java.io.IOException;

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建工厂类
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        //默认情况下为“ guest” /“ guest”,仅限本地主机连接
        factory.setUsername("guest");
        factory.setPassword("guest");

        //2.通过工厂创建connection
        Connection connection = factory.newConnection();
        //3.创建channel对象
        Channel channel = connection.createChannel();


        //4. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定
        String queueName = "test_queue";
        String queueName2 = "test_queue2";
        String exchangeName ="test_topic_exchange";
        String exchangeType = "topic";
        String routingKey = "test.#";
        String routingKey2 = "*.topic";
        /**
         参数:
             queue -队列名称
             durable -如果我们声明一个持久队列,则为true(该队列将在服务器重启后保留下来)
             exclusive -如果我们声明一个排他队列,则为true(仅限此连接)
             autoDelete -如果我们声明一个自动删除队列,则为true(服务器将在不再使用它时将其删除)
             arguments -队列的其他属性(构造参数)
         */
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueDeclare(queueName2, true, false, false, null);
        /**
         参数:
             exchange -交易所名称
             type -交易所类型
             durable -如果我们声明持久交换,则为true(该交换将在服务器重启后保留下来)
             autoDelete -如果服务器在不再使用交换机时应删除该交换机,则为true
             arguments -用于交换的其他属性(构造参数)
         */
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        /**
         参数:
             queue -队列名称
             exchange -交易所名称
             routingKey -用于绑定的路由键
         */
        channel.queueBind(queueName, exchangeName, routingKey);
        channel.queueBind(queueName2, exchangeName, routingKey2);

        //5.通过channel把消费者和消息队列进行关联,获取消息进行处理
        /**
         参数:
             queue -队列名称
             autoAck-如果服务器应考虑一旦传递已确认的消息,则为true;如果服务器应该期望显式确认,则返回false
             callback -消费者对象的接口
         */
        boolean autoAck = false;
        channel.basicConsume(queueName, autoAck, new DefaultConsumer(channel) {
            /**
             参数:
                 consumerTag-与消费者相关联的消费者标签
                 envelope -消息的打包数据
                 properties -消息的内容头数据
                 body -消息正文(客户端特定的不透明字节数组)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("------------consumer message-----------");
                System.out.println("sonsumerTag:" + consumerTag);
                System.out.println("envelope:" + envelope);
                System.out.println("properties:" + properties);
                System.out.println("msg:" + new String(body));
            }
        });

    }
}

   创建了两个队列,这是只对一个队列的消息进行了消费

    

  

5、Fanout Exchange交换机

     扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的,不处理的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

     

1)消息生产者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    public static void main(String[] args) throws Exception {
        //1.创建工厂类
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        //默认情况下为“ guest” /“ guest”,仅限本地主机连接
        factory.setUsername("guest");
        factory.setPassword("guest");

        //2.通过工厂创建connection
        Connection connection = factory.newConnection();
        //3.创建channel对象
        Channel channel = connection.createChannel();

        //4.发布消息
        String exchangeName="test_fanout_exchange";
        String routingKey = "";
        String msg="hello rabbitmq consumer message - test_fanout_exchange";
        for (int i = 0; i < 2; i++) {
            /**
             参数:
                 exchange -将消息发布到的交换机, 若为空字符串时,使用默认的交换机
                 routingKey -路由键
                 mandatory -如果要设置“强制性”标志,则为true
                 props -消息的其他属性-路由标头等
                 body -消息正文
             */
            channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes());
        }
        //5.释放资源
        channel.close();
        connection.close();
    }

}

2)消息消费者

import com.rabbitmq.client.*;
import java.io.IOException;

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建工厂类
        ConnectionFactory factory = new ConnectionFactory();
        factory.setPort(5672);
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        //默认情况下为“ guest” /“ guest”,仅限本地主机连接
        factory.setUsername("guest");
        factory.setPassword("guest");

        //2.通过工厂创建connection
        Connection connection = factory.newConnection();
        //3.创建channel对象
        Channel channel = connection.createChannel();


        //4. 创建消息队列和direct交换机,并通过channel让交换机跟消息队列进行绑定
        String queueName = "test_queue";
        String queueName2 = "test_queue2";
        String exchangeName ="test_fanout_exchange";
        String exchangeType = "fanout";
        String routingKey = "test.#"; // 随便写,扇形交换机不处理路右键
        String routingKey2 = "1*";
        /**
         参数:
             queue -队列名称
             durable -如果我们声明一个持久队列,则为true(该队列将在服务器重启后保留下来)
             exclusive -如果我们声明一个排他队列,则为true(仅限此连接)
             autoDelete -如果我们声明一个自动删除队列,则为true(服务器将在不再使用它时将其删除)
             arguments -队列的其他属性(构造参数)
         */
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueDeclare(queueName2, true, false, false, null);
        /**
         参数:
             exchange -交易所名称
             type -交易所类型
             durable -如果我们声明持久交换,则为true(该交换将在服务器重启后保留下来)
             autoDelete -如果服务器在不再使用交换机时应删除该交换机,则为true
             arguments -用于交换的其他属性(构造参数)
         */
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        /**
         参数:
             queue -队列名称
             exchange -交易所名称
             routingKey -用于绑定的路由键
         */
        channel.queueBind(queueName, exchangeName, routingKey);
        channel.queueBind(queueName2, exchangeName, routingKey2);

        //5.通过channel把消费者和消息队列进行关联,获取消息进行处理
        /**
         参数:
             queue -队列名称
             autoAck-如果服务器应考虑一旦传递已确认的消息,则为true;如果服务器应该期望显式确认,则返回false
             callback -消费者对象的接口
         */
        boolean autoAck = false;
        channel.basicConsume(queueName2, autoAck, new DefaultConsumer(channel) {
            /**
             参数:
                 consumerTag-与消费者相关联的消费者标签
                 envelope -消息的打包数据
                 properties -消息的内容头数据
                 body -消息正文(客户端特定的不透明字节数组)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                System.out.println("------------consumer message-----------");
                System.out.println("sonsumerTag:" + consumerTag);
                System.out.println("envelope:" + envelope);
                System.out.println("properties:" + properties);
                System.out.println("msg:" + new String(body));
            }
        });

    }
}

   

   

          

      一开始,我对RabbitMQ一点都不了解,但是项目中需要用到RabbitMQ,查资料算是springboot使用RabbitMQ实现了项目中的业务,但是,知其然不知其所以然,所以,从头扎实的了解下RabbitMQ

     RabbitMQ 不同交换机的处理机制,也算入门了,交换机的创建不同,其他基本类似,刚开始先会使用即可。

   

     站在前辈的肩膀上,每天进步一点点

ends~

本文链接http://element-ui.cn/news/show-280.aspx