RabbitMQ消息队列

RabbitMQ消息队列

  1. 消息队列的概念
  2. windows下面安装RabbitMQ
  3. RabbitMQ的初步使用
  4. RabbitMQ的几种模式
  5. springboot整合RabbitMQ
  6. 关于持久化操作,消息确认(自动和手动),事务管理

消息队列的概念

首先消息队列是应用程序之间的一种通信办法,其次,在应用程序中可以将一些无需及时返回的以及耗时的业务提取出来,通过异步的方式,当然异步的方式主要就是节省了服务器的响应时间,从而提高系统的吞吐量

消息队列的应用场景一般在应用解耦、异步处理(提高系统响应速度)、流量削峰(高峰堆积消息,峰后继续处理消息)、日志处理、通讯上面。

消息队列模型(AMQP和JMS)

AMQP高级消息队列协议,是一个进程间传递异步消息的网络协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式.

JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

注意:JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式,JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。JMS规定了两种消息模式;而AMQP的消息模式更加丰富,简单的说就好比一个是已经开发好的应用,一个定义了一系列的标准。

关于消息队列的产品

kafka

Apache下的一个子项目,使用scala实现的一个高性能分布式Publish/Subscribe消息队列系统。

  1. 快速持久化:通过磁盘顺序读写与零拷贝机制,可以在O(1)的系统开销下进行消息持久化
  2. 高吞吐:在一台普通的服务器上既可以达到10W/s的吞吐速率
  3. 高堆积:支持topic下消费者较长时间离线,消息堆积量大;
  4. 完全的分布式系统:Broker、Producer、Consumer都原生自动支持分布式,依赖zookeeper自动实现复杂均衡

rabbitmq

RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点 :

  1. 能够保证严格的消息顺序,提供丰富的消息拉取模式
  2. 高效的订阅者水平扩展能力.实时的消息订阅机制
  3. 支持事务消息,亿级消息堆积能力.
  4. 生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持

支持的模式有以下(来源于官网):

image-20210125214506194

image-20210125214611902

image-20210125214630439

windows下面安装RabbitMQ

因为RabbitMQ基于erlang语言,所以首先先安装erlang语言,安装过程主要为

  1. 下载并安装erlang语言,配置相应的环境变量。CMD执行

    1
    erl # 截图如下

    image-20210125225309280

2.下载并安装rabbitmq,然后在sbin目录下执行

1
2
rabbitmq-plugins enable rabbitmq_management # 截图如下
rabbitmq-plugins disable rabbitmq_management # 该命令为关闭

image-20210125230927166

3.在此目录下查看rabbitmq状态

1
rabbitmqctl status # 截图如下

image-20210125231404437

4.输入地址http://localhost:15672/ 默认账户密码guest

image-20210125231613526

关于用户

image-20210125231702280

角色说明:

  1. 超级管理员(administrator) 可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。

  2. 监控者(monitoring) 可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

  3. 策略制定者(policymaker)可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。

  4. 普通管理者(management)仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。

关于 Virtual Hosts配置

Virtual Hosts就是虚拟主机的意思.相当于一个独立的数据库,数据库之间不受影响.一般以/开头

image-20210125232301322

当然同理可以不同的用户能访问的虚拟注意不一样

RabbitMQ的初步使用

1.创建一个maven项目,pom.xml文件如下

1
2
3
4
5
6
7
8
9
10
11
12
13
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wwj</groupId>
<artifactId>rabbitmq1</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>
</project>

2.创建生产者和消费者都需要提供一个连接对象。先封装一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Util {
public static Connection getConnection() throws Exception{
//1、创建链接工厂对象
ConnectionFactory factory = new ConnectionFactory();
//2、设置RabbitMQ服务主机地址
factory.setHost("localhost");
//3、设置RabbitMQ服务端口
factory.setPort(5672);
//4、设置虚拟主机名字,默认/-factory.setVirtualHost("szitheima")
factory.setVirtualHost("/test");
//5、设置用户连接名
factory.setUsername("guest");
//6、设置链接密码
factory.setPassword("guest");
//7、创建链接
Connection connection = factory.newConnection();
return connection;
}

public static void main(String[] args) {
try {
Connection con = Util.getConnection();
System.out.println("连接成功");
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}

结果如下:

image-20210125235252704

RabbitMQ的几种模式

模式一简单发送消息

生产者:也就是要发送消息的程序.

消费者:消息的接受者,会一直等待消息到来.

消息队列:生产者向其中投递消息,消费者从其中取出消息.

生产者代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class P {
public static void main(String[] args) throws Exception {
Connection con = Util.getConnection();
Channel channel = con.createChannel();
//定义队列.分别对应名字,持久化,独占连接,自动删除,附加参数
channel.queueDeclare("hello1",true,false,false,null);
//构建消息
String message = "第一条消息";
//分别对应交换机,路由值,其他参数,消息主题
channel.basicPublish("","hello1",null,message.getBytes("utf-8"));
channel.close();
con.close();
}
}

可以看到有一条消息待读

image-20210126001101869

消费者代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.wwj.util.Util;

public class C {
public static void main(String[] args) throws Exception {
Connection con = Util.getConnection();
Channel channel = con.createChannel();
channel.queueDeclare("hello1",true,false,false,null);
Consumer c = new DefaultConsumer(channel) {
//重写处理传递的方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body,"utf-8");
System.out.println(
"routingKey:" + routingKey +
",exchange:" + exchange +
",deliveryTag:" + deliveryTag +
",message:" + message);
};
};
//创建消费者 分别对应主题,应答策略,消费者回调方法
channel.basicConsume("hello1",true,c);
//如果不关闭.消费者一直存在
};
}

image-20210126002157514

以下可以看到消息在被读取后就删除了,消费者一直挂起中

image-20210126002323551

模式二多个消费者(有竞争关系的)

生产者代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wwj.util.Util;

public class P {
public static void main(String[] args) throws Exception {
Connection con = Util.getConnection();
Channel channel = con.createChannel();
//定义队列.分别对应名字,持久化,独占连接,自动删除,附加参数
channel.queueDeclare("hello1",true,false,false,null);
//构建10条消息
for (int i = 1; i < 11; i++) {
String message = "第"+i+"条消息";
//分别对应交换机,路由值,其他参数,消息主题
channel.basicPublish("","hello1",null,message.getBytes("utf-8"));
}
channel.close();
con.close();
}
}

构建多个消费者,代码一样,参照模式一

先启动消费者,然后启动生产者,观察

image-20210126003314899

image-20210126003413596

相当于10个相同的任务,被雇佣的2个员工分担了

模式三(发布订阅模式)

注意:生产者是通过发布消息是通过交换机传递到队列中

关于交换机的说明

交换机,。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:

​ Fanout:广播,将消息交给所有绑定到交换机的队列

​ Direct:定向,把消息交给符合指定routing key 的队列

​ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

生产者代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wwj.util.Util;

public class P {
public static void main(String[] args) throws Exception {
Connection con = Util.getConnection();
Channel channel = con.createChannel();
//改为定义交换机
channel.exchangeDeclare("exchange1", BuiltinExchangeType.FANOUT);
//构建10条消息
for (int i = 1; i < 11; i++) {
String message = "第"+i+"条消息";
//分别对应交换机,路由值,其他参数,消息主题
channel.basicPublish("exchange1","",null,message.getBytes("utf-8"));
}
channel.close();
con.close();
}
}

消费者代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.wwj.util.Util;

public class C1 {
public static void main(String[] args) throws Exception {
Connection con = Util.getConnection();
Channel channel = con.createChannel();
channel.queueDeclare("q1",true,false,false,null);
//通过绑定的方式将主题绑定到交换机中
channel.queueBind("q1", "exchange1", "");
Consumer c = new DefaultConsumer(channel) {
//重写处理传递的方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body,"utf-8");
System.out.println(
"routingKey:" + routingKey +
",exchange:" + exchange +
",deliveryTag:" + deliveryTag +
",message:" + message);
};
};
//创建消费者 分别对应主题,应答策略,消费者回调方法
channel.basicConsume("q1",true,c);
//如果不关闭.消费者一直存在
};
}

同样启动消费者,然后观察结果

image-20210126005606249

image-20210126005650320

可以看到消费者都消费了10条消息。q1和q2主题绑定在了同一个交换机上面

模式四(有选择性的接收)

主要是在exchange交换上加上了路由值。可以将路由值理解为分类值

生产者代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wwj.util.Util;

public class P {
public static void main(String[] args) throws Exception {
Connection con = Util.getConnection();
Channel channel = con.createChannel();
//需要将交换机类型改为定向
channel.exchangeDeclare("exchange1", BuiltinExchangeType.DIRECT);
//构建10条消息
for (int i = 1; i < 4; i++) {
String message = "第"+i+"条消息";
//定义分类值
String rk = "";
//代表rk为不同的类型
switch (i){
case 1:
rk = "apple";
break;
case 2:
rk = "huawei";
break;
case 3:
rk = "xiaomi";
break;
}
//分别对应交换机,路由值,其他参数,消息主题
channel.basicPublish("exchange1",rk,null,message.getBytes("utf-8"));
}
channel.close();
con.close();
}
}

消费者代码如下:消费者1消费apple 消费者2消费huawei和xiaomi

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.wwj.util.Util;

public class C2 {
public static void main(String[] args) throws Exception {
Connection con = Util.getConnection();
Channel channel = con.createChannel();
channel.queueDeclare("q2",true,false,false,null);
//通过绑定的方式将主题绑定到交换机中
channel.queueBind("q2", "exchange1", "huawei");
channel.queueBind("q2", "exchange1", "xiaomi");
Consumer c = new DefaultConsumer(channel) {
//重写处理传递的方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body,"utf-8");
System.out.println(
"routingKey:" + routingKey +
",exchange:" + exchange +
",deliveryTag:" + deliveryTag +
",message:" + message);
};
};
//创建消费者 分别对应主题,应答策略,消费者回调方法
channel.basicConsume("q2",true,c);
//如果不关闭.消费者一直存在
};
}

同样观察结果

image-20210126012017799

image-20210126012551797

可以不同的类别分发到了不同主题中

模式伍(基于特定规则的接收)

在模式四的基础上,增加了通配符,也就是类别可以无限细化.因为routingkey 一般都是有一个或多个单词组成,多个单词之间以“ . ”分割

  1. #:匹配一个或多个词
  2. *:匹配不多不少恰好1个词

比如item.#:能够匹配item.insert.abc 或者 item.insert,item.*:只能匹配item.insert

生产者代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.wwj.util.Util;

public class P {
public static void main(String[] args) throws Exception {
Connection con = Util.getConnection();
Channel channel = con.createChannel();
//需要将交换机类型改为主题
channel.exchangeDeclare("exchange1", BuiltinExchangeType.TOPIC);
//构建10条消息
for (int i = 1; i < 5; i++) {
String message = "第"+i+"条消息";
//定义分类值
String rk = "";
//代表rk为不同的类型
switch (i){
case 1:
rk = "apple";
break;
case 2:
rk = "huawei";
break;
case 3:
rk = "xiaomi";
break;
case 4:
rk = "huawei.rongyao";
break;
}
//分别对应交换机,路由值,其他参数,消息主题
channel.basicPublish("exchange1",rk,null,message.getBytes("utf-8"));
}
channel.close();
con.close();
}
}

消费者如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class C2 {
public static void main(String[] args) throws Exception {
Connection con = Util.getConnection();
Channel channel = con.createChannel();
channel.queueDeclare("q2",true,false,false,null);
//通过绑定的方式将主题绑定到交换机中
channel.queueBind("q2", "exchange1", "huawei.#");
channel.queueBind("q2", "exchange1", "xiaomi");
Consumer c = new DefaultConsumer(channel) {
//重写处理传递的方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String exchange = envelope.getExchange();
long deliveryTag = envelope.getDeliveryTag();
String message = new String(body,"utf-8");
System.out.println(
"routingKey:" + routingKey +
",exchange:" + exchange +
",deliveryTag:" + deliveryTag +
",message:" + message);
};
};
//创建消费者 分别对应主题,应答策略,消费者回调方法
channel.basicConsume("q2",true,c);
//如果不关闭.消费者一直存在
};
}

观察结果

image-20210126013353769

image-20210126013433116

Springboot整合RabbitMQ

创建springboot项目

pom.xml如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wwj</groupId>
<artifactId>srb</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

分别在src/main/java下构建启动程序

image-20210126023802653

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@SpringBootApplication
public class App {
public static void main(String[] args){

SpringApplication.run(App.class, args);

}
}


@RestController
public class TController {

@RequestMapping("t")
public String t() {
return "ok";
}

}

server.port=8032

整合RabbitMQ

添加依赖

1
2
3
4
   <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

修改springboot的配置文件

1
2
3
4
5
6
7
8
9
10
11
12
#rabbitmq连接参数
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 开启发送确认
spring.rabbitmq.publisher-confirms=true
# 开启发送失败退回
spring.rabbitmq.publisher-returns=true
# 开启ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

初始化交换机和主题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

/**
* 队列可以设置以下参数
* durable: 是否持久化;exclusive: 是否独享、排外的;autoDelete: 是否自动删除
* @return
*/
@Bean
public Queue firstQueue() {
return new Queue("q1");
}

@Bean
public Queue secondQueue() {
return new Queue("q2");
}


//具体用什么模式在这里定义不同的交换机类型 FanoutExchange DirectExchange TopicExchange
@Bean
TopicExchange exchange() {
return new TopicExchange("exchage1");
}

//配置不同队列到交换机上(选择不同的策略)
@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with("q1");
}


@Bean
Binding bindingExchangeMessage2() {
return BindingBuilder.bind(secondQueue()).to(exchange()).with("q2.#");
}

}

构建生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
public class Produce {

@Autowired
private RabbitTemplate rabbitTemplate;

public void sen1() {
for (int i = 0; i < 5; i++) {
System.out.println("生产者发送消息,序号为: " + i);
rabbitTemplate.convertAndSend("exchage1", "q1", String.valueOf(i));
}
}

public void send2() {
for (int i = 0; i < 5; i++) {
System.out.println("生产者发送消息,序号为: " + i);
rabbitTemplate.convertAndSend("exchage1", "q2.#", String.valueOf(i));
}
}

}

构建消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class Consumer {

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "q1", autoDelete = "false"), exchange = @Exchange(value = "exchange1", type = ExchangeTypes.TOPIC), key = "q1.#"))
public void process1(Message message, Channel channel) throws IOException {
System.out.println(new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}


@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "q1", autoDelete = "false"), exchange = @Exchange(value = "exchange1", type = ExchangeTypes.TOPIC), key = "q1.#"))
public void process2(Message message, Channel channel) throws IOException {
System.out.println("拒绝消息"+new String(message.getBody()));
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "q2", autoDelete = "false"), exchange = @Exchange(value = "exchange1", type = ExchangeTypes.TOPIC), key = "q2.#"))
public void process3(Message message, Channel channel) throws IOException {
System.out.println(new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}

}

最后在启动文件中加入@EnableRabbit

观察如下:

image-20210126043328416

调用send2发送消息

image-20210126043956076

接下来调用send1

image-20210126044505203

image-20210126044756877

关于持久化操作,消息确认(自动和手动),事务管理

  1. 持久化操作。如果队列与消息不持久化,那么服务在崩溃了之后,消息会丢失。所以可以在构建队列的时候设置durable,以及auto-delete设置为false
  2. 消息确认自动和手动:rabbitmq默认确认方式为自动,也就是消息一旦发出立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递,那么有可能的情况就是消费者崩掉以后,消息丢失的情况会产生
  3. 选择手动,通过channel.basicAck 确认 和channel.basicReject 拒绝。一般出现异常的时候,catch异常再拒绝入列
  4. 事务管理:。生产中不建议使用事务模式,性能比较低,尽量使用手动确认模式