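Topic mode
- Much like direct mode: messages are still delivered to the matching queue according to the routingKey
- The difference is that topic mode matches the routingKey against a pattern (fuzzy matching) rather than requiring an exact match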
Preparation
- Create an Exchange in the corresponding Virtual Host
- Set the Exchange type to topic
Producer
package topic;

import RabbitUtils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.nio.charset.StandardCharsets;

public class MyProducer {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        // Publish 10 messages to the exchange "topic01" with routing keys 1.0 through 1.9
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("topic01", "1." + i, null,
                    ("Hello Fan Out" + i).getBytes(StandardCharsets.UTF_8));
        }
        channel.close();
        connection.close();
    }
}
Consumer
package topic;

import RabbitUtils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class MyConsumer {
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("baidu", false, false, false, null);
        channel.basicQos(1); // take only one unacknowledged message at a time
        // Bind the queue to the exchange; "1.#" matches routing keys whose first word is "1"
        channel.queueBind("baidu", "topic01", "1.#");
        // autoAck is false, so each message is acknowledged manually in handleDelivery
        channel.basicConsume("baidu", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String jsonMeg = new String(body, StandardCharsets.UTF_8);
                System.out.println(" Baidu: " + jsonMeg);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });
    }
}
PS
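- Key point: channel.queueBind()
- When binding, use the wildcards # and * in the binding key for fuzzy matching: * matches exactly one word, # matches zero or more words, and words are separated by dots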
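
To make the wildcard rules concrete, here is a minimal sketch of the matching logic in plain Java. It assumes the usual topic rules (dot-separated words, * for exactly one word, # for zero or more words). The class name TopicMatch and the method matches are hypothetical names made up for illustration; they are not part of the RabbitMQ client, and the broker performs the real matching itself. Edge cases such as empty routing keys are not handled the way a broker would necessarily handle them.

```java
public class TopicMatch {

    /** Returns true if routingKey matches the binding pattern (illustrative only). */
    public static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.split("\\.", -1);
        String[] k = routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    // Recursively compare pattern words p[i..] against key words k[j..].
    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible number of words
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1); // '*' or a literal word consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("1.#", "1.0"));   // true
        System.out.println(matches("1.#", "1.0.a")); // true
        System.out.println(matches("1.*", "1.0"));   // true
        System.out.println(matches("1.*", "1.0.a")); // false
        System.out.println(matches("1.#", "2.0"));   // false
    }
}
```

With the binding key "1.#" from the consumer above, every routing key the producer sends (1.0 through 1.9) matches, so all ten messages reach the baidu queue. A binding of "1.*" would also match these keys, but it would reject a longer key such as 1.0.a.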