/**
 * Handles GET requests by publishing a freshly generated random UUID string
 * to {@code EXCHANGE} with an empty routing key.
 */
@GetMapping
public void sendMessage() {
    String payload = UUID.randomUUID().toString();
    rabbitTemplate.convertAndSend(EXCHANGE, "", payload);
}
//使用匿名队列作为消息队列 @Bean public Queue queue() { return new AnonymousQueue(); } //声明DirectExchange交换器,绑定队列到交换器 @Bean public Declarables declarables() { DirectExchange exchange = new DirectExchange(EXCHANGE); return new Declarables(queue(), exchange, BindingBuilder.bind(queue()).to(exchange).with("")); }
//监听队列,队列名称直接通过SpEL表达式引用Bean @RabbitListener(queues = "#{queue.name}") public void memberService(String userName) { log.info("memberService: welcome message sent to new user {} from {}", userName, System.getProperty("server.port"));
@Slf4j @Configuration @RestController @RequestMapping("fanoutwrong") public class FanoutQueueWrong { private static final String QUEUE = "newuser"; private static final String EXCHANGE = "newuser"; @Autowired private RabbitTemplate rabbitTemplate;
@GetMapping public void sendMessage() { rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString()); } //声明FanoutExchange,然后绑定到队列,FanoutExchange绑定队列的时候不需要routingKey @Bean public Declarables declarables() { Queue queue = new Queue(QUEUE); FanoutExchange exchange = new FanoutExchange(EXCHANGE); return new Declarables(queue, exchange, BindingBuilder.bind(queue).to(exchange)); } //会员服务实例1 @RabbitListener(queues = QUEUE) public void memberService1(String userName) { log.info("memberService1: welcome message sent to new user {}", userName);
} //会员服务实例2 @RabbitListener(queues = QUEUE) public void memberService2(String userName) { log.info("memberService2: welcome message sent to new user {}", userName);
} //营销服务实例1 @RabbitListener(queues = QUEUE) public void promotionService1(String userName) { log.info("promotionService1: gift sent to new user {}", userName); } //营销服务实例2 @RabbitListener(queues = QUEUE) public void promotionService2(String userName) { log.info("promotionService2: gift sent to new user {}", userName); } }
@Slf4j @Configuration @RestController @RequestMapping("fanoutright") public class FanoutQueueRight { private static final String MEMBER_QUEUE = "newusermember"; private static final String PROMOTION_QUEUE = "newuserpromotion"; private static final String EXCHANGE = "newuser"; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping public void sendMessage() { rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString()); } @Bean public Declarables declarables() { //会员服务队列 Queue memberQueue = new Queue(MEMBER_QUEUE); //营销服务队列 Queue promotionQueue = new Queue(PROMOTION_QUEUE); //广播交换器 FanoutExchange exchange = new FanoutExchange(EXCHANGE); //两个队列绑定到同一个交换器 return new Declarables(memberQueue, promotionQueue, exchange, BindingBuilder.bind(memberQueue).to(exchange), BindingBuilder.bind(promotionQueue).to(exchange)); } @RabbitListener(queues = MEMBER_QUEUE) public void memberService1(String userName) { log.info("memberService1: welcome message sent to new user {}", userName); } @RabbitListener(queues = MEMBER_QUEUE) public void memberService2(String userName) { log.info("memberService2: welcome message sent to new user {}", userName); } @RabbitListener(queues = PROMOTION_QUEUE) public void promotionService1(String userName) { log.info("promotionService1: gift sent to new user {}", userName); } @RabbitListener(queues = PROMOTION_QUEUE) public void promotionService2(String userName) { log.info("promotionService2: gift sent to new user {}", userName); } }
[20:02:31.533] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [WARN ] [o.s.a.r.l.ConditionalRejectingErrorHandler:129 ] - Execution of Rabbit message listener failed. org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void org.geekbang.time.commonmistakes.asyncprocess.deadletter.MQListener.handler(java.lang.String)' threw exception at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:219) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:143) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:132) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1569) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1488) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1476) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1467) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1411) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:958) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:908) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:81) at 
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1279) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1185) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.NullPointerException: error at org.geekbang.time.commonmistakes.asyncprocess.deadletter.MQListener.handler(MQListener.java:14) at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:50) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:211) ... 13 common frames omitted