官网说:在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式称为“发布/订阅”。
这句话什么意思?
就是生产者 直接将消息 发送至X 交换机,交换机路由分发给不同的队列。
常用交换机类型:
Fanout:广播,将消息交给所有绑定到交换机上的队列
Direct:定向,将消息发送到指定的 RoutingKey 的 队列
Topic:通配符,将消息交给符合routing pattern 的队列
Headers:参数匹配 (不讲解)
交换机:只负责发送消息,并不存储消息,因此,如果没有队列与交换机绑定。或者没有符合的路由规则的队列,那么消息就会消失。
一旦交换机写死,那么就只能按照次交换机的类型来发送
要学会一个英语单词
Declare : 声明
下面代码最终实现效果如:一个生产者,生产的2条数据,2个消费者都可以接收
生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PubSubprovider {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建连接工程对象 记得是RabbitMQ包下的
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("118.31.127.248");
connectionFactory.setUsername("zanglikun");
connectionFactory.setPassword("zanglikun");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/govbuy");
//2 用工厂对象创建连接
Connection connection = connectionFactory.newConnection();
//3 创建队列
Channel channel = connection.createChannel();
//4 创建交换机
/** 参数说明
* String exchange, 交换机名称
* BuiltinExchangeType type, 交换机的类型 枚举类型
* DIRECT("direct") 定向,
* FANOUT("fanout") 扇形广播,发送消息到每个与其绑定的队列,
* TOPIC("topic") 通配符,
* HEADERS("headers") 参数匹配;
* boolean durable, 是否持久化
* boolean autoDelete, 是否自动删除
* boolean internal, 内部使用 一般为false
* Map<String, Object> arguments 参数列表 设为null
*/
String exchangename = "test_fanout";
channel.exchangeDeclare(exchangename, BuiltinExchangeType.FANOUT,true,false,false,null);
//5 创建队列
//创建队列名称
String queque1 = "test_fanout_queue1";
String queque2 = "test_fanout_queue2";
channel.queueDeclare(queque1,true,false,false,null);
channel.queueDeclare(queque2,true,false,false,null);
//6 绑定队列、交换机
/**
* 相关参数
* String queue, 队列名称
* String exchange, 交换机名称
* String routingKey, 路由key,绑定规则 如果交换机类型为fanout 那么路由key为"" ,为什么呢?因为广播,需要发送到与之绑定的所有队列
* Map<String, Object> arguments
*
*/
channel.queueBind(queque1,exchangename,"");
channel.queueBind(queque2,exchangename,""); // 绑定完成后,没有发送消息钱,就可以登录 15672 在交换机看到这2个队列了
//声明一下数据
String body = "张三,调用了findAll方法";
//7 发送消息
channel.basicPublish(exchangename,"",null,body.getBytes());
//8 释放资源
channel.close();
connection.close();
}
}
消费者代码
消费者1 和 2 的区别时 绑定的队列名称不一样,看清楚,其他地方都一样)
消费者1 代码
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PubSubconsumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建连接工厂对象 并设置相应参数
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("118.31.127.248");
connectionFactory.setPort(5672);
connectionFactory.setUsername("zanglikun");
connectionFactory.setPassword("zanglikun");
connectionFactory.setVirtualHost("/govbuy");
//2 创建连接
Connection connection = connectionFactory.newConnection();
//3 创建队列
Channel channel = connection.createChannel();
//4 设置队列相关信息
//创建队列名称
String queque1 = "test_fanout_queue1";
String queque2 = "test_fanout_queue2";
//5 创建 Counsumer
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
System.out.println("拿到数据,执行相关业务逻辑");
}
};
channel.basicConsume(queque1,true,consumer);
}
}
消费者2 代码
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PubSubconsumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建连接工厂对象 并设置相应参数
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("118.31.127.248");
connectionFactory.setPort(5672);
connectionFactory.setUsername("zanglikun");
connectionFactory.setPassword("zanglikun");
connectionFactory.setVirtualHost("/govbuy");
//2 创建连接
Connection connection = connectionFactory.newConnection();
//3 创建队列
Channel channel = connection.createChannel();
//4 设置队列相关信息
//创建队列名称
String queque1 = "test_fanout_queue1";
String queque2 = "test_fanout_queue2";
//5 创建 Counsumer
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
System.out.println("拿到数据,执行相关业务逻辑");
}
};
channel.basicConsume(queque2,true,consumer);
}
}
最终效果
先开启2个消费者,然后 开启生产者,发现2个消费者都接收到数据了。
启动生产者 消费者 1 2 显示相应的结果
特殊说明:
上述文章均是作者实际操作后产出。烦请各位,请勿直接盗用!转载记得标注原文链接:www.zanglikun.com
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取最新全部资料 ❤
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取最新全部资料 ❤
评论(0)