SpringBoot整合

pom依赖配置

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

yml配置

# RabbitMQ connection, publisher-confirm and listener-container settings.
spring:
  rabbitmq:
    host: 192.168.1.171
    username: admin
    password: admin
    virtual-host: /text01
    port: 30672
    publisher-confirm-type: correlated # required, otherwise the publisher confirm callback is never invoked
    publisher-returns: true # enable the publisher return callback
    listener:
      type: simple
      simple:
        acknowledge-mode: manual # listeners acknowledge deliveries themselves (manual ack)
        prefetch: 10 # max unacknowledged messages outstanding per consumer (10 here, not 1)
        concurrency: 3 # number of consumers started for each listener
        max-concurrency: 3 # upper bound on the number of consumers
        default-requeue-rejected: true # rejected deliveries are requeued by default

生产者

package com.example.demo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionManager;
import java.util.UUID;
/**
 * Thin publishing component built on top of {@link RabbitTemplate}.
 */
@Component
public class Producer {

    /** Template that performs every publish; package-visible so same-package code can reach it. */
    final RabbitTemplate rabbitTemplate;

    /**
     * Creates the producer.
     *
     * @param rabbitTemplate template used to convert and send messages
     */
    public Producer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * Converts {@code meg} and sends it to {@code exchange} with routing key {@code key}.
     * Each call attaches a {@link CorrelationData} whose id is a newly generated random UUID string.
     *
     * @param exchange name of the target exchange
     * @param key      routing key
     * @param meg      message payload
     */
    public void send(String exchange, String key, String meg) {
        String correlationId = UUID.randomUUID().toString();
        CorrelationData correlation = new CorrelationData(correlationId);
        rabbitTemplate.convertAndSend(exchange, key, meg, correlation);
    }
}

DirectConsumer

package com.example.demo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
 * Two manually-acknowledging listeners, both bound to {@code directQueue} on the direct
 * exchange {@code directEx} with routing keys {@code 1.2} and {@code 1.3}.
 */
@Component
public class DirectConsumer {

    /**
     * Prints the payload, then acknowledges the delivery.
     *
     * @param body    message payload
     * @param headers message headers; the delivery tag is read from {@link AmqpHeaders#DELIVERY_TAG}
     * @param channel channel the delivery arrived on, used for the manual ack
     * @throws IOException if the acknowledgement fails
     */
    @RabbitListener(
            containerFactory = "rabbitListenerContainerFactory",
            bindings = @QueueBinding(
                    value = @Queue(value = "directQueue", autoDelete = "false"),
                    exchange = @Exchange(value = "directEx", type = ExchangeTypes.DIRECT),
                    key = {"1.2", "1.3"}),
            ackMode = "MANUAL")
    public void handler(@Payload String body, @Headers Map<String, Object> headers, Channel channel) throws IOException {
        System.out.println("handler:   " + body);
        long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // Acknowledge only this delivery (multiple = false).
        // To reject without requeue instead: channel.basicNack(deliveryTag, false, false).
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Prints the payload, pauses for one second, then acknowledges the delivery.
     *
     * @param body    message payload
     * @param headers message headers; the delivery tag is read from {@link AmqpHeaders#DELIVERY_TAG}
     * @param channel channel the delivery arrived on, used for the manual ack
     * @throws IOException          if the acknowledgement fails
     * @throws InterruptedException if the thread is interrupted during the pause
     */
    @RabbitListener(
            containerFactory = "rabbitListenerContainerFactory",
            bindings = @QueueBinding(
                    value = @Queue(value = "directQueue", autoDelete = "false"),
                    exchange = @Exchange(value = "directEx", type = ExchangeTypes.DIRECT),
                    key = {"1.2", "1.3"}),
            ackMode = "MANUAL")
    public void handler1(@Payload String body, @Headers Map<String, Object> headers, Channel channel) throws IOException, InterruptedException {
        System.out.println("handler1:   " + body);
        // The pause happens before the ack, so the delivery stays unacknowledged for that second.
        Thread.sleep(1000);
        long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }
}

FanoutConsumer

package com.example.demo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
 * Two manually-acknowledging listeners, each with its own auto-delete queue
 * ({@code fanoutQueue01} / {@code fanoutQueue02}) bound to the fanout exchange {@code fanout}.
 */
@Component
public class FanoutConsumer {

    /**
     * Listener for {@code fanoutQueue01}: prints the payload, pauses for one second, then acks.
     *
     * @param body    message payload
     * @param headers message headers; the delivery tag is read from {@link AmqpHeaders#DELIVERY_TAG}
     * @param channel channel the delivery arrived on, used for the manual ack
     * @throws IOException          if the acknowledgement fails
     * @throws InterruptedException if the thread is interrupted during the pause
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "fanoutQueue01", autoDelete = "true"),
                    exchange = @Exchange(value = "fanout", type = ExchangeTypes.FANOUT)),
            ackMode = "MANUAL")
    public void handler1(@Payload String body, @Headers Map<String, Object> headers, Channel channel) throws IOException, InterruptedException {
        System.out.println("handler1:   " + body);
        Thread.sleep(1000);
        long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // Acknowledge only this delivery (multiple = false).
        // To reject without requeue instead: channel.basicNack(deliveryTag, false, false).
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Listener for {@code fanoutQueue02}: prints the payload and acks immediately.
     *
     * @param body    message payload
     * @param headers message headers; the delivery tag is read from {@link AmqpHeaders#DELIVERY_TAG}
     * @param channel channel the delivery arrived on, used for the manual ack
     * @throws IOException          if the acknowledgement fails
     * @throws InterruptedException declared by the signature; nothing in this body blocks
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "fanoutQueue02", autoDelete = "true"),
                    exchange = @Exchange(value = "fanout", type = ExchangeTypes.FANOUT)),
            ackMode = "MANUAL")
    public void handler(@Payload String body, @Headers Map<String, Object> headers, Channel channel) throws IOException, InterruptedException {
        System.out.println("handler:   " + body);
        long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }
}

TopicConsumer

package com.example.demo;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
 * Two manually-acknowledging listeners on the topic exchange {@code topic}, each with its own
 * auto-delete queue and its own set of binding patterns.
 */
@Component
public class TopicConsumer {

    /**
     * Listener for {@code topicQueue01} (patterns {@code #.1} and {@code 1.#}): prints the
     * received routing key followed by the payload, then acks.
     *
     * @param body    message payload
     * @param headers message headers; routing key and delivery tag are read from here
     * @param channel channel the delivery arrived on, used for the manual ack
     * @throws IOException          if the acknowledgement fails
     * @throws InterruptedException declared by the signature; nothing in this body blocks
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "topicQueue01", autoDelete = "true"),
                    exchange = @Exchange(value = "topic", type = ExchangeTypes.TOPIC),
                    key = {"#.1", "1.#"}),
            ackMode = "MANUAL")
    public void handler1(@Payload String body, @Headers Map<String, Object> headers, Channel channel) throws IOException, InterruptedException {
        Object routingKey = headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY);
        System.out.println(routingKey + "topicQueue01:   " + body);
        long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // Acknowledge only this delivery (multiple = false).
        // To reject without requeue instead: channel.basicNack(deliveryTag, false, false).
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Listener for {@code topicQueue02} (patterns {@code #.2}, {@code 2.#} and {@code 3.*}):
     * prints the received routing key followed by the payload, then acks.
     *
     * @param body    message payload
     * @param headers message headers; routing key and delivery tag are read from here
     * @param channel channel the delivery arrived on, used for the manual ack
     * @throws IOException          if the acknowledgement fails
     * @throws InterruptedException declared by the signature; nothing in this body blocks
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "topicQueue02", autoDelete = "true"),
                    exchange = @Exchange(value = "topic", type = ExchangeTypes.TOPIC),
                    key = {"#.2", "2.#", "3.*"}),
            ackMode = "MANUAL")
    public void handler(@Payload String body, @Headers Map<String, Object> headers, Channel channel) throws IOException, InterruptedException {
        Object routingKey = headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY);
        System.out.println(routingKey + "topicQueue02:                          " + body);
        long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);
    }
}

Test

package com.example.demo;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
 * Spring Boot test that registers return/confirm callbacks on the producer's template and
 * publishes ten messages to the {@code topic} exchange.
 */
@SpringBootTest
class DemoApplicationTests {

    @Autowired
    Producer producer;

    /**
     * Installs a return callback that prints {@code Return} and a confirm callback that prints
     * the ack flag, then sends ten messages with routing keys {@code 0.1} through {@code 9.1}.
     */
    @Test
    void Text() {
        RabbitTemplate template = producer.rabbitTemplate;

        template.setReturnCallback(
                (message, replyCode, replyText, exchange, routingKey) -> System.out.println("Return"));
        template.setConfirmCallback(
                (correlationData, ack, cause) -> System.out.println("Ack:" + ack));

        for (int index = 0; index < 10; index++) {
            String routingKey = index + ".1";
            String payload = "Hello world:" + index;
            producer.send("topic", routingKey, payload);
        }
    }
}

本博客所有文章除特别声明外，均采用 CC BY-SA 4.0 协议，转载请注明出处！

语法基础 上一篇
Rabbitmq/提取工具类 下一篇