direct模式
- 默认模式
- 通过routingKey发送给对应的queue
准备步骤
- 在对应的Virtual Hosts中创建Exchange
生产者
import RabbitUtils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.LinkedHashMap;
import java.util.concurrent.TimeoutException;
public class MyProducer {
/**
 * Publishes ten demo messages to the exchange {@code routing01}.
 *
 * <p>Message {@code i} (0 through 9) is sent with routing key {@code String.valueOf(i)}.
 * The exchange is not declared here; it is expected to exist already (the surrounding
 * notes create it by hand in the virtual host). Publishing to a non-existent exchange
 * results in a channel-level protocol exception, which closes the channel.
 *
 * @param args unused
 * @throws Exception if the connection or channel cannot be opened or closed, or a publish fails
 */
public static void main(String[] args) throws Exception, TimeoutException {
    // try-with-resources closes the channel first and then the connection,
    // even when a publish throws part-way through the loop.
    try (Connection connection = RabbitUtils.getConnection();
         Channel channel = connection.createChannel()) {
        for (int i = 0; i < 10; i++) {
            // Encode explicitly as UTF-8 so the bytes do not depend on the platform default charset.
            byte[] body = ("Hello Fan Out" + i).getBytes(java.nio.charset.StandardCharsets.UTF_8);
            // Arguments: exchange, routing key, message properties (none), message body.
            channel.basicPublish("routing01", String.valueOf(i), null, body);
        }
    }
}
}
消费者
package Routing;
import RabbitUtils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class MyConsumer {
/**
 * Consumes messages from the queue {@code baidu}, which is bound to the exchange
 * {@code routing01} with routing key {@code "1"}.
 *
 * <p>Each delivery is printed to standard output and then acknowledged manually.
 * The exchange is not declared here; it is expected to exist already.
 * NOTE(review): the connection and channel are never closed in this method —
 * presumably so the consumer keeps receiving; confirm this is intended.
 *
 * @param args unused
 * @throws Exception if the connection, channel, queue, binding or consumer cannot be set up
 */
public static void main(String[] args) throws Exception{
    Connection connection = RabbitUtils.getConnection();
    Channel channel = connection.createChannel();

    // Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
    channel.queueDeclare("baidu", false, false, false, null);

    // Take only one message at a time for processing.
    channel.basicQos(1);

    // Arguments: queue name, exchange name, routing key used for the binding.
    channel.queueBind("baidu", "routing01", String.valueOf(1));

    // autoAck is false, so every delivery is acknowledged explicitly in the handler below.
    channel.basicConsume("baidu", false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            // Decode explicitly as UTF-8 so the text does not depend on the platform default charset.
            String jsonMeg = new String(body, java.nio.charset.StandardCharsets.UTF_8);
            System.out.println(" 百度: " + jsonMeg);
            // Acknowledge this delivery only (multiple = false).
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    });
}
}
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议,转载请注明出处!