SpringBoot整合
pom依赖配置
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml配置
spring:
  rabbitmq:
    host: 192.168.1.171
    username: admin
    password: admin
    virtual-host: /text01
    port: 30672
    publisher-confirm-type: correlated #必须配置这个才会确认回调 confirm
    publisher-returns: true #return回调
    listener:
      type: simple
      simple:
        acknowledge-mode: manual #手动确认ack
        prefetch: 10 #限制每个消费者最多同时持有10条未确认的消息
        concurrency: 3 #同一个队列启动几个消费者
        max-concurrency: 3 #启动消费者最大数量
        default-requeue-rejected: true
生产者
package com.example.demo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionManager;
import java.util.UUID;
@Component
public class Producer {
/**
 * Template every message is published through. Left package-private (not
 * {@code private}) because code in the same package reads this field directly.
 */
final RabbitTemplate rabbitTemplate;

/**
 * Builds a producer around the supplied template.
 *
 * @param rabbitTemplate template that performs the actual publishing
 */
public Producer(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
}
/**
 * Converts {@code meg} and publishes it to {@code exchange} under routing key
 * {@code key}, attaching correlation data whose id is a freshly generated
 * random UUID.
 *
 * @param exchange name of the target exchange
 * @param key      routing key for the message
 * @param meg      message payload
 */
public void send(String exchange, String key, String meg) {
    String correlationId = UUID.randomUUID().toString();
    CorrelationData correlation = new CorrelationData(correlationId);
    this.rabbitTemplate.convertAndSend(exchange, key, meg, correlation);
}
}
DirectConsumer
package com.example.demo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
public class DirectConsumer {
/**
 * Listener on {@code directQueue}, which is bound to the direct exchange
 * {@code directEx} under routing keys {@code 1.2} and {@code 1.3}.
 * Prints the payload and then manually acknowledges that one delivery.
 *
 * @param body    message payload
 * @param headers message headers; the delivery tag is read from
 *                {@link AmqpHeaders#DELIVERY_TAG}
 * @param channel channel used to send the acknowledgement
 * @throws IOException if sending the acknowledgement fails
 */
@RabbitListener(
    containerFactory = "rabbitListenerContainerFactory",
    ackMode = "MANUAL",
    bindings = @QueueBinding(
        value = @Queue(value = "directQueue", autoDelete = "false"),
        exchange = @Exchange(value = "directEx", type = ExchangeTypes.DIRECT),
        key = {"1.2", "1.3"}))
public void handler(@Payload String body, @Headers Map<String,Object> headers, Channel channel) throws IOException {
    System.out.println("handler: " + body);
    // Acknowledge only this delivery (second argument false = not "multiple").
    // A rejection without requeue would be channel.basicNack(deliveryTag, false, false).
    final long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    channel.basicAck(deliveryTag, false);
}
/**
 * Second listener on the same {@code directQueue} / {@code directEx} binding
 * (routing keys {@code 1.2} and {@code 1.3}). Prints the payload, sleeps for
 * one second (apparently to imitate a slow consumer), then manually
 * acknowledges that one delivery.
 *
 * @param body    message payload
 * @param headers message headers; the delivery tag is read from
 *                {@link AmqpHeaders#DELIVERY_TAG}
 * @param channel channel used to send the acknowledgement
 * @throws IOException          if sending the acknowledgement fails
 * @throws InterruptedException if the thread is interrupted during the sleep
 */
@RabbitListener(
    containerFactory = "rabbitListenerContainerFactory",
    ackMode = "MANUAL",
    bindings = @QueueBinding(
        value = @Queue(value = "directQueue", autoDelete = "false"),
        exchange = @Exchange(value = "directEx", type = ExchangeTypes.DIRECT),
        key = {"1.2", "1.3"}))
public void handler1(@Payload String body, @Headers Map<String,Object> headers, Channel channel) throws IOException, InterruptedException {
    System.out.println("handler1: " + body);
    Thread.sleep(1000);
    // Acknowledge only this delivery (second argument false = not "multiple").
    // A rejection without requeue would be channel.basicNack(deliveryTag, false, false).
    final long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    channel.basicAck(deliveryTag, false);
}
}
FanoutConsumer
package com.example.demo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
public class FanoutConsumer {
/**
 * Listener on the auto-delete queue {@code fanoutQueue01}, bound to the fanout
 * exchange {@code fanout}. Prints the payload, sleeps for one second
 * (apparently to imitate a slow consumer), then manually acknowledges that
 * one delivery.
 *
 * @param body    message payload
 * @param headers message headers; the delivery tag is read from
 *                {@link AmqpHeaders#DELIVERY_TAG}
 * @param channel channel used to send the acknowledgement
 * @throws IOException          if sending the acknowledgement fails
 * @throws InterruptedException if the thread is interrupted during the sleep
 */
@RabbitListener(
    ackMode = "MANUAL",
    bindings = @QueueBinding(
        value = @Queue(value = "fanoutQueue01", autoDelete = "true"),
        exchange = @Exchange(value = "fanout", type = ExchangeTypes.FANOUT)))
public void handler1(@Payload String body, @Headers Map<String,Object> headers, Channel channel) throws IOException, InterruptedException {
    System.out.println("handler1: " + body);
    Thread.sleep(1000);
    // Acknowledge only this delivery (second argument false = not "multiple").
    // A rejection without requeue would be channel.basicNack(deliveryTag, false, false).
    final long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    channel.basicAck(deliveryTag, false);
}
/**
 * Listener on the auto-delete queue {@code fanoutQueue02}, bound to the fanout
 * exchange {@code fanout}. Prints the payload and then manually acknowledges
 * that one delivery.
 *
 * @param body    message payload
 * @param headers message headers; the delivery tag is read from
 *                {@link AmqpHeaders#DELIVERY_TAG}
 * @param channel channel used to send the acknowledgement
 * @throws IOException          if sending the acknowledgement fails
 * @throws InterruptedException declared in the signature; nothing in the
 *                              current body throws it
 */
@RabbitListener(
    ackMode = "MANUAL",
    bindings = @QueueBinding(
        value = @Queue(value = "fanoutQueue02", autoDelete = "true"),
        exchange = @Exchange(value = "fanout", type = ExchangeTypes.FANOUT)))
public void handler(@Payload String body, @Headers Map<String,Object> headers, Channel channel) throws IOException, InterruptedException {
    System.out.println("handler: " + body);
    // Acknowledge only this delivery (second argument false = not "multiple").
    // A rejection without requeue would be channel.basicNack(deliveryTag, false, false).
    final long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    channel.basicAck(deliveryTag, false);
}
}
TopicConsumer
package com.example.demo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
public class TopicConsumer {
/**
 * Listener on the auto-delete queue {@code topicQueue01}, bound to the topic
 * exchange {@code topic} with binding patterns {@code #.1} and {@code 1.#}.
 * Prints the received routing key followed by the payload, then manually
 * acknowledges that one delivery.
 *
 * @param body    message payload
 * @param headers message headers; the routing key and delivery tag are read
 *                from {@link AmqpHeaders#RECEIVED_ROUTING_KEY} and
 *                {@link AmqpHeaders#DELIVERY_TAG}
 * @param channel channel used to send the acknowledgement
 * @throws IOException          if sending the acknowledgement fails
 * @throws InterruptedException declared in the signature; nothing in the
 *                              current body throws it
 */
@RabbitListener(
    ackMode = "MANUAL",
    bindings = @QueueBinding(
        value = @Queue(value = "topicQueue01", autoDelete = "true"),
        exchange = @Exchange(value = "topic", type = ExchangeTypes.TOPIC),
        key = {"#.1", "1.#"}))
public void handler1(@Payload String body, @Headers Map<String,Object> headers, Channel channel) throws IOException, InterruptedException {
    Object routingKey = headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY);
    System.out.println(routingKey + "topicQueue01: " + body);
    // Acknowledge only this delivery (second argument false = not "multiple").
    // A rejection without requeue would be channel.basicNack(deliveryTag, false, false).
    final long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    channel.basicAck(deliveryTag, false);
}
/**
 * Listener on the auto-delete queue {@code topicQueue02}, bound to the topic
 * exchange {@code topic} with binding patterns {@code #.2}, {@code 2.#} and
 * {@code 3.*}. Prints the received routing key followed by the payload, then
 * manually acknowledges that one delivery.
 *
 * @param body    message payload
 * @param headers message headers; the routing key and delivery tag are read
 *                from {@link AmqpHeaders#RECEIVED_ROUTING_KEY} and
 *                {@link AmqpHeaders#DELIVERY_TAG}
 * @param channel channel used to send the acknowledgement
 * @throws IOException          if sending the acknowledgement fails
 * @throws InterruptedException declared in the signature; nothing in the
 *                              current body throws it
 */
@RabbitListener(
    ackMode = "MANUAL",
    bindings = @QueueBinding(
        value = @Queue(value = "topicQueue02", autoDelete = "true"),
        exchange = @Exchange(value = "topic", type = ExchangeTypes.TOPIC),
        key = {"#.2", "2.#", "3.*"}))
public void handler(@Payload String body, @Headers Map<String,Object> headers, Channel channel) throws IOException, InterruptedException {
    Object routingKey = headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY);
    System.out.println(routingKey + "topicQueue02: " + body);
    // Acknowledge only this delivery (second argument false = not "multiple").
    // A rejection without requeue would be channel.basicNack(deliveryTag, false, false).
    final long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
    channel.basicAck(deliveryTag, false);
}
}
Test
package com.example.demo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@SpringBootTest
class DemoApplicationTests {
// Producer under test, injected by Spring. The test method in this class reaches
// through its rabbitTemplate field to register callbacks and calls send(...) on it.
@Autowired
Producer producer;
/**
 * Publishes ten messages to the {@code topic} exchange with routing keys
 * {@code 0.1} through {@code 9.1} and waits until a publisher confirm has been
 * received for every one of them.
 *
 * <p>A return callback printing {@code Return} and a confirm callback printing
 * {@code Ack:<result>} are registered on the producer's template before
 * publishing. Confirms are delivered asynchronously, so the test blocks on a
 * latch after the publish loop; without that wait the method could return
 * before any callback had run.
 *
 * @throws InterruptedException if the thread is interrupted while waiting for
 *                              the confirms
 */
@Test
void Text() throws InterruptedException {
    final int messageCount = 10;
    // One count per published message; released by the confirm callback.
    final CountDownLatch pendingConfirms = new CountDownLatch(messageCount);

    producer.rabbitTemplate.setReturnCallback(
        (message, replyCode, replyText, exchange, routingKey) -> System.out.println("Return"));
    producer.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        System.out.println("Ack:" + ack);
        // Count acks and nacks alike: either one means the broker has answered.
        pendingConfirms.countDown();
    });

    for (int i = 0; i < messageCount; i++) {
        producer.send("topic", i + ".1", "Hello world:" + i);
    }

    // Bounded wait so a missing confirm fails the test instead of hanging it.
    if (!pendingConfirms.await(10, TimeUnit.SECONDS)) {
        throw new AssertionError(
            "Timed out waiting for publisher confirms; still pending: " + pendingConfirms.getCount());
    }
}
}
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议,转载请注明出处!