看下官方说明:https://www.rabbitmq.com/tutorials/tutorial-four-java.html
这是一个什么模式呢?
Direct交换机中接收到的数据,会以 路由 的形式进行绑定,发送。不再给全部消费者发送。这样设计的系统,会更加的合理。
生产者发送数据的时候需要绑定 路由key 了,以后 谁绑定了路由key ,就发送给谁。
这里 我不写图片中的Demo
小结:本来是放在最下面,但是放在上面方便查看,更易于理解
Routing模式要求队列在绑定交换机时 要制定routing key,消息会发给符合的队列中,记得交换机必须时direct 才可以。
代码实现的效果
一个Direct 交换机中 2个队列 分别绑定了不同的 路由key
队列1 绑定的路由key 有black、green、orange
队列2 绑定的路由key 有black
生产者 分别再 black、green、orange 的路由上 发送了一条消息,一共3条
队列1 全部接收到消息
队列2 只能接收到black的消息
生产者代码
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Routingprovider {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建 连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername("zanglikun");
connectionFactory.setPassword("zanglikun");
connectionFactory.setVirtualHost("/govbuy");
connectionFactory.setHost("118.31.127.248");
connectionFactory.setPort(5672);
//2 连接 连接
Connection connection = connectionFactory.newConnection();
//3 通过连接 创建信道
Channel channel = connection.createChannel();
// 设置 队列名称
String quotoName1 = "test_direct_queue1";
String quotoName2 = "test_direct_queue2";
// 让信道与队列进行声明(绑定)
channel.queueDeclare(quotoName1,true,false,false,null);
channel.queueDeclare(quotoName2,true,false,false,null);
//4 设置交换机名称
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//5 队列1的绑定 有 orange black green
channel.queueBind(quotoName1,exchangeName,"orange");
channel.queueBind(quotoName1,exchangeName,"black");
channel.queueBind(quotoName1,exchangeName,"green");
channel.queueBind(quotoName2,exchangeName,"black");
String orange = "Orange:消息 XXX";
String black = "Black: 消息 XXX";
String green = "Green: 消息 XXX";
//6 发送数据 一共发送3条数据
channel.basicPublish(exchangeName,"orange",null,orange.getBytes());
channel.basicPublish(exchangeName,"black",null,black.getBytes());
channel.basicPublish(exchangeName,"green",null,green.getBytes());
//7 释放资源
channel.close();
connection.close();
}
}
自己手写代码的时候 ,记得去看下交换机绑定的路由对不对,如果多绑定、少绑定,一定时你代码写错了,记得解除所有的routing key
消费者代码
消费者1:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Routingconsumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置相关参数
connectionFactory.setHost("118.31.127.248");
connectionFactory.setPort(5672);
connectionFactory.setUsername("zanglikun");
connectionFactory.setPassword("zanglikun");
connectionFactory.setVirtualHost("/govbuy");
//2 建立连接
Connection connection = connectionFactory.newConnection();
//3 创建信道
Channel channel = connection.createChannel();
// 声明队列名称
String queue1 = "test_direct_queue1";
//6 创建接收回调
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
};
//接收消息
channel.basicConsume(queue1,true,consumer);
}
}
消费者2
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Routingconsumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置相关参数
connectionFactory.setHost("118.31.127.248");
connectionFactory.setPort(5672);
connectionFactory.setUsername("zanglikun");
connectionFactory.setPassword("zanglikun");
connectionFactory.setVirtualHost("/govbuy");
//2 建立连接
Connection connection = connectionFactory.newConnection();
//3 创建信道
Channel channel = connection.createChannel();
// 声明队列名称
String queue2 = "test_direct_queue2";
//6 创建接收回调
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(new String(body));
}
};
//接收消息
channel.basicConsume(queue2,true,consumer);
}
}
启动2个消费者,在启动生产者
此时 2个消费者 都能接收到消息了。
路由模式完成
特殊说明:
上述文章均是作者实际操作后产出。烦请各位,请勿直接盗用!转载记得标注原文链接:www.zanglikun.com
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取最新全部资料 ❤
第三方平台不会及时更新本文最新内容。如果发现本文资料不全,可访问本人的Java博客搜索:标题关键字。以获取最新全部资料 ❤
评论(0)