direct模式

  • 默认模式
  • 通过routingKey发送给对应的queue

准备步骤

  • 在对应的Virtual Hosts中创建Exchange

生产者

import RabbitUtils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.LinkedHashMap;
import java.util.concurrent.TimeoutException;
public class MyProducer {
    public static void main(String[] args) throws Exception, TimeoutException {
        LinkedHashMap<String, String> strMap = new LinkedHashMap<>();
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        for (int i=0;i<10;i++){
               /**
     * Publish a message.
     *
     * Publishing to a non-existent exchange will result in a channel-level
     * protocol exception, which closes the channel.
     * @param exchange the exchange to publish the message to
     * @param routingKey the routing key
     * @param props other properties for the message - routing headers etc
     * @param body the message body
     * @throws java.io.IOException if an error is encountered
     */
            channel.basicPublish("routing01",String.valueOf(i),null,("Hello Fan Out"+i).getBytes());
        }
        channel.close();
        connection.close();
    }
}

消费者

package Routing;

import RabbitUtils.RabbitUtils;
import com.rabbitmq.client.*;

import java.io.IOException;

public class MyConsumer {
    public static void main(String[] args) throws Exception{
        Connection connection = RabbitUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("baidu",false,false,false,null);
        channel.basicQos(1);//每次只会取一条数据处理
         /**
     * Bind a queue to an exchange, with no extra arguments.
     * @see com.rabbitmq.client.AMQP.Queue.Bind
     * @see com.rabbitmq.client.AMQP.Queue.BindOk
     * @param queue the name of the queue
     * @param exchange the name of the exchange
     * @param routingKey the routing key to use for the binding
     * @return a binding-confirm method if the binding was successfully created
     * @throws java.io.IOException if an error is encountered
     */
        channel.queueBind("baidu","routing01",String.valueOf(1));
        channel.basicConsume("baidu",false,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //super.handleDelivery(consumerTag, envelope, properties, body);
                String jsonMeg = new String(body);
                System.out.println("  百度:                           "+jsonMeg);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}

本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!

Rabbitmq/queue与消费者 上一篇
Rabbitmq/fanout模式 下一篇