topic模式

  • 与direct模式差不多,都要通过routingKey发送给对应的queue
  • 区别就是topic是模糊匹配routingKey

准备步骤

  • 在对应的Virtual Hosts中创建Exchange
  • Exchange的模式改为topic模式

生产者

package topic;
import RabbitUtils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.LinkedHashMap;
import java.util.concurrent.TimeoutException;
public class MyProducer {
    public static void main(String[] args) throws Exception, TimeoutException {
        LinkedHashMap<String, String> strMap = new LinkedHashMap<>();
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        for (int i=0;i<10;i++){
            channel.basicPublish("topic01","1."+String.valueOf(i),null,("Hello Fan Out"+i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者

package topic;
import RabbitUtils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class MyConsumer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("baidu",false,false,false,null);
        channel.basicQos(1);//每次只会取一条数据处理
        channel.queueBind("baidu","topic01","1.#");
        channel.basicConsume("baidu",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                String jsonMeg = new String(body);
                System.out.println("  百度:                           "+jsonMeg);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

ps

  • 要点:channel.queueBind()
  • 在绑定的时候使用 # * 进行模糊处理

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

Rabbitmq/安装RabbitMq 上一篇
Rabbitmq/发送者与交换机 下一篇