fanout模式

  • 广播模式
  • Exchange会把消息发送给所有绑定在当前交换机上的队列

准备步骤

  • 在对应的Virtual Hosts中创建Exchange
  • Exchange的模式改为fanout模式

生产者

package fanout;
import RabbitUtils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.concurrent.TimeoutException;
public class MyProducer {
    public static void main(String[] args) throws Exception, TimeoutException {
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        for (int i=0;i<10000;i++){
            channel.basicPublish("exchange01","",null,("Hello Fan Out"+i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者1号

package fanout;
import RabbitUtils.RabbitUtils;
import com.google.gson.Gson;
import com.rabbitmq.client.*;
import work.queue.Meg;
import java.io.IOException;
public class MyConsumer01 {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("baidu",false,false,false,null);
        channel.basicQos(1);//每次只会取一条数据处理
        channel.queueBind("baidu","exchange01","");
        channel.basicConsume("baidu",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                String jsonMeg = new String(body);
                System.out.println("  百度:                           "+jsonMeg);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

消费者2号

package fanout;
import RabbitUtils.RabbitUtils;
import com.google.gson.Gson;
import com.rabbitmq.client.*;
import work.queue.Meg;
import java.io.IOException;
public class MyConsumer01 {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("google",false,false,false,null);
        channel.basicQos(1);//每次只会取一条数据处理
        channel.queueBind("google","exchange01","");//绑定交换机
        channel.basicConsume("google",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                String jsonMeg = new String(body);
                System.out.println("MyConsumer01  :  "+jsonMeg);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

Rabbitmq/direct模式 上一篇
Rabbitmq/hello-world 下一篇