2.31. Spring boot with RabbitMQ(AMQP)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
@Configuration
public class RabbitMQConfig {
public final static String QUEUE_NAME = "spring-boot-queue";
public final static String EXCHANGE_NAME = "spring-boot-exchange";
public final static String ROUTING_KEY = "spring-boot-key";
// 創建隊列
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME);
}
// 創建一個 topic 類型的交換器
@Bean
public TopicExchange exchange() {
return new TopicExchange(EXCHANGE_NAME);
}
// 使用路由鍵(ROUTING_KEY)把隊列(Queue)綁定到交換器(Exchange)
@Bean
public Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("127.0.0.1", 5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
return new RabbitTemplate(connectionFactory);
}
}
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage")
public String sendMessage() {
new Thread(() -> {
for (int i = 0; i < 100; i++) {
String value = new DateTime().toString("yyyy-MM-dd HH:mm:ss");
System.out.println("send message {}", value);
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, value);
}
}).start();
return "ok";
}
}
@Component
public class Consumer {
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void consumeMessage(String message) {
System.out.println("consume message {}", message);
}
}