常见的交换机类型:

  • Direct Exchange:直连交换机。消息会被路由到所有路由键绑定队列
  • Fanout Exchange:广播交换机。消息会被路由到所有绑定队列,忽略路由键。
  • Topic Exchange:主题交换机。消息会被路由到所有绑定队列,支持通配符。

最简单的消息队列只需要生产者类消费者类,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@SpringBootTest
public class RabbitProducer {

@Test
public void testSimpleQueue() {
// 创建连接工厂
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.20.80");
connectionFactory.setUsername("leke");
connectionFactory.setPassword("leke@@@");
// 创建 RabbitTemplate
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 发送消息
rabbitTemplate.convertAndSend("e.test", "q.test", "hello, spring amqp!");

}
}


@Component
public class RabbitConsumer {
// 表明监听的队列名称
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "q.test", durable = "true"),
exchange = @Exchange(value = "e.test")))
public void listenSimpleQueueMessage(String msg) {
//3、打印消息,模拟处理消息
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}

}

但从项目设计角度出发,我们需要这样使用消息队列,如下:

RabbitMqConstant常量类:存放交换机名称和队列名称

1
2
3
4
5
6
public class RabbitMqConstant {

public static final String TEST_EXCHANGE = "e.test";
public static final String TEST_QUEUE = "q.test";

}

RabbitConfig配置类:注册交换机类和队列类,绑定交换机和队列(如果采用注解绑定,可以不写)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class RabbitConfig {
@Bean
public DirectExchange testExchange() {
return new DirectExchange(RabbitMqConstant.TEST_EXCHANGE);
}

@Bean
public Queue testQueue() {
return new Queue(RabbitMqConstant.TEST_QUEUE);
}

@Bean
public Binding testBinding() {
return BindingBuilder.bind(testQueue())
.to(testExchange())
.with(RabbitMqConstant.TEST_QUEUE);
}

// 其他的配置,例如配置消息监听容器工厂。以下代码定义了如何处理消息消费过程中的异常情况,特别是通过自定义的异常策略来控制消息的重试逻辑,增强了消息处理的健壮性和灵活性。
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
@SuppressWarnings("unchecked")
FatalExceptionStrategy strategy = LekeFatalExceptionStrategy.retryOn(DataAccessException.class,
RemoteAccessException.class, Exception.class);
factory.setErrorHandler(new ConditionalRejectingErrorHandler(strategy));
return factory;
}
}
class FatalExceptionStrategy extends DefaultExceptionStrategy {

// 构建策略
@SuppressWarnings("unchecked")
public static FatalExceptionStrategy retryOn(Class<? extends Throwable>... exClasses) {
Set<Class<? extends Throwable>> set = new HashSet<>();
for (Class<? extends Throwable> clazz : exClasses) {
set.add(clazz);
}
return new FatalExceptionStrategy(set);
}

final Set<Class<? extends Throwable>> retryables;

public FatalExceptionStrategy(Set<Class<? extends Throwable>> exceptions) {
this.retryables = exceptions == null ? Collections.emptySet() : Collections.unmodifiableSet(exceptions);
}

@Override
protected boolean isUserCauseFatal(Throwable e) {
boolean needRetry = retryables.stream().anyMatch(clazz -> clazz.isInstance(e));
return !needRetry;
}
}

RabbitConsumer消费者类

1
2
3
4
5
6
7
8
9
10
11
@Component
public class RabbitConsumer {
// @RabbitListener(queues = RabbitMqConstant.TEST_QUEUE)
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = RabbitMqConstant.TEST_QUEUE, durable = "true"),
exchange = @Exchange(value = RabbitMqConstant.TEST_EXCHANGE)))
public void listenSimpleQueueMessage(String msg) {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}

}

RabbitProducer生产者类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@SpringBootTest
public class RabbitProducer {

@Test
public void testSimpleQueue() {
// 创建连接工厂
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.20.80");
connectionFactory.setUsername("leke");
connectionFactory.setPassword("leke@@@");
// 创建 RabbitTemplate
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 配置交换机和队列
// DirectExchange testExchange = new DirectExchange(RabbitMqConstant.TEST_EXCHANGE);
// Queue testQueue = new Queue(RabbitMqConstant.SUBMIT_TASK_QUEUE);
// Binding binding = BindingBuilder.bind(testQueue).to(testExchange).with(RabbitMqConstant.TEST_QUEUE);
// 发送消息
rabbitTemplate.convertAndSend(RabbitMqConstant.TEST_EXCHANGE, RabbitMqConstant.TEST_QUEUE, "hello, spring amqp!");

}
}