目录
- 一、简介
- 二、maven依赖
- 三、编码实现
- 3.1、配置文件
- 3.2、配置类
- 3.3、监听器
- 3.4、消费服务
- 3.5、实体
- 四、验证
- 五、优化
- 5.1、注册任务执行器
- 5.2、配置任务执行器
- 5.3、启用异步执行器
一、简介
本篇文章主要来讲Spring Boot 整合Redis实现消息队列,实现redis用作消息队列有多种方式,比如:
- 基于List 的 rpush+lpop 或 lpush+rpop
- 基于List 的 rpush+blpop 或 lpush+brpop (阻塞式获取消息)
- 基于Sorted Set 的优先级队列
- Redis Stream (Redis5.0版本开始)
- Pub/Sub 机制
不过这里讲的是Pub/Sub 机制的,这种方式优缺点大致如下:
优点:
- 一个消息可以发布到多个消费者
- 消费者可以同时订阅多个信道,因此可以接收多种消息(处理时先根据信道判断)
- 消息即时发送,消费者会自动接收到信道发布的消息
缺点:
- 消息发布时,如果客户端不在线,则消息丢失
- 消费者处理消息时出现了大量消息积压,则可能会断开通道,导致消息丢失
- 消费者接收消息的时间不一定是一致的,可能会有差异(业务处理需要判重)
二、maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.0</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.alian</groupId><artifactId>redis-message-queue</artifactId><version>0.0.1-SNAPSHOT</version><name>redis-message-queue</name><description>redis-message-queue</description><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><project.package.directory>target</project.package.directory><java.version>1.8</java.version><!--com.fasterxml.jackson 版本--><jackson.version>2.9.10</jackson.version><!--lombok 版本--><lombok.version>1.16.14</lombok.version><!--阿里巴巴fastjson 版本--><fastjson.version>1.2.68</fastjson.version><!--junit 版本--><junit.version>4.12</junit.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--redis依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><!--用于序列化--><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>${jackson.version}</version></dependency><!--java 8时间序列化--><dependency><groupId>com.fasterxml.jackson.datatype</groupId><artifactId>jackson-datatype-jsr310</artifactId><version>${jackson.version}</version></dependency><!--JSON--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!--日志输出--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
三、编码实现
3.1、配置文件
application.properties
# 端口
server.port=8090
# 上下文路径
server.servlet.context-path=/redisQueue# Redis数据库索引(默认为0)
spring.redis.database=0
# Redis服务器地址
#spring.redis.host=192.168.0.193
spring.redis.host=127.0.0.1
# Redis服务器连接端口
spring.redis.port=6379
# Redis服务器连接密码(默认为空)
spring.redis.password=
# 连接池最大连接数(使用负值表示没有限制)
spring.redis.jedis.pool.max-active=20
# 连接池中的最小空闲连接
spring.redis.jedis.pool.min-idle=10
# 连接池中的最大空闲连接
spring.redis.jedis.pool.max-idle=10
# 连接池最大阻塞等待时间(使用负值表示没有限制)
spring.redis.jedis.pool.max-wait=20000
# 读时间(毫秒)
spring.redis.timeout=10000
# 连接超时时间(毫秒)
spring.redis.connect-timeout=10000
3.2、配置类
RedisConfiguration.java
package com.alian.queue.config;import com.alian.queue.listener.RedisMessageListenerListener;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;@Slf4j
@Configuration
@EnableCaching
public class RedisConfiguration {/*** redis配置** @param redisConnectionFactory* @return*/@Beanpublic RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {// 实例化redisTemplateRedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();//设置连接工厂redisTemplate.setConnectionFactory(redisConnectionFactory);// key采用String的序列化redisTemplate.setKeySerializer(keySerializer());// value采用jackson序列化redisTemplate.setValueSerializer(valueSerializer());// Hash key采用String的序列化redisTemplate.setHashKeySerializer(keySerializer());// Hash value采用jackson序列化redisTemplate.setHashValueSerializer(valueSerializer());//执行函数,初始化RedisTemplateredisTemplate.afterPropertiesSet();return redisTemplate;}@Beanpublic ChannelTopic channelTopic() {return new ChannelTopic("TOPIC_USER");}@Beanpublic RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, RedisMessageListenerListener redisMessageListenerListener) {RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);redisMessageListenerContainer.addMessageListener(redisMessageListenerListener, channelTopic());return redisMessageListenerContainer;}/*** key类型采用String序列化** @return*/private RedisSerializer<String> keySerializer() {return new StringRedisSerializer();}/*** value采用JSON序列化** @return*/private RedisSerializer<Object> valueSerializer() {//设置jackson序列化Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);//设置序列化对象jackson2JsonRedisSerializer.setObjectMapper(getMapper());return jackson2JsonRedisSerializer;}/*** 使用com.fasterxml.jackson.databind.ObjectMapper* 对数据进行处理包括java8里的时间** @return*/private ObjectMapper getMapper() {ObjectMapper mapper = new ObjectMapper();//设置可见性mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);//默认键入对象mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);//设置Java 8 时间序列化JavaTimeModule timeModule = new JavaTimeModule();timeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));timeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));timeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(DateTimeFormatter.ofPattern("HH:mm:ss")));timeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));timeModule.addDeserializer(LocalDate.class, new LocalDateDeserializer(DateTimeFormatter.ofPattern("yyyy-MM-dd")));timeModule.addDeserializer(LocalTime.class, new LocalTimeDeserializer(DateTimeFormatter.ofPattern("HH:mm:ss")));//禁用把时间转为时间戳mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);mapper.registerModule(timeModule);return mapper;}}
这里就是在整合redis的前提下(如果不懂可以参考:SpringBoot整合redis(redis支持单节点和集群)),然后新增了如下配置:
@Beanpublic ChannelTopic channelTopic() {return new ChannelTopic("TOPIC_USER");}@Beanpublic RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, RedisMessageListenerListener redisMessageListenerListener) {RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);redisMessageListenerContainer.addMessageListener(redisMessageListenerListener, channelTopic());return redisMessageListenerContainer;}
RedisMessageListenerContainer 是为Redis消息侦听器 MessageListener 提供异步行为的容器。处理侦听、转换和消息分派的低级别详细信息。它与低级别Redis(每个订阅一个连接)相反,容器只使用一个连接,该连接对所有注册的侦听器都是“多路复用”的,消息调度是通过任务执行器完成的。容器以惰性方式使用连接(仅当至少配置了一个侦听器时才使用连接),同时添加和删除侦听器具有未定义的结果,强烈建议对这些方法进行相应的同步/排序。
- 创建一个Redis消息监听器容器
- 设置 Redis 连接工厂
- 将监听器和管道名想绑定
- 配置管道名和推送消息时的管道名要一致,不然监听器监听不到消息
我这里使用的是主题订阅:ChannelTopic,你也可以使用模式匹配:PatternTopic,从而匹配多个信道。
3.3、监听器
package com.alian.queue.listener;import com.alian.queue.service.ConsumeService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
import org.springframework.util.DigestUtils;import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;@Slf4j
@Component
public class RedisMessageListenerListener implements MessageListener {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Autowiredprivate ConsumeService consumeService;/*** 消息处理** @param message* @param pattern*/@Overridepublic void onMessage(Message message, byte[] pattern) {String channel = new String(pattern);log.info("onMessage --> 消息通道是:{}", channel);try {RedisSerializer<?> valueSerializer = redisTemplate.getValueSerializer();Object deserialize = valueSerializer.deserialize(message.getBody());log.info("反序列化的结果:{}", deserialize);if (deserialize == null) {return;}String md5DigestAsHex = DigestUtils.md5DigestAsHex(deserialize.toString().getBytes(StandardCharsets.UTF_8));log.info("计算得到的key: {}", md5DigestAsHex);Boolean result = redisTemplate.opsForValue().setIfAbsent(md5DigestAsHex, "1", 20, TimeUnit.SECONDS);if (Boolean.TRUE.equals(result)) {// redis消息进行处理consumeService.processMessage(channel, deserialize.toString());log.info("处理redis消息完成");} else {log.info("其他服务处理中");}} catch (Exception e) {e.printStackTrace();log.error("处理redis消息异常:", e);}}}
我们实现MessageListener 接口,就可以通过onMessage()方法接收到消息了,该方法有两个参数:
- 参数 message 的 getBody() 方法以二进制形式获取消息体, getChannel() 以二进制形式获取消息通道
- 参数 pattern 二进制形式的消息通道(实际和 message.getChannel() 返回值相同)
3.4、消费服务
ConsumeService.java
package com.alian.queue.service;import com.alian.queue.dto.UserDto;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;@Slf4j
@Service
public class ConsumeService {public void processMessage(String channel,String message) {// 可以根据channel再继续映射到不同的实现UserDto userDto = JSONObject.parseObject(message, UserDto.class);log.info("接收的结果:{}", userDto);// 做业务...// 还可以分布式锁幂等处理}
}
这个就是消息处理了,可以根据channel再继续映射到不同的实现,然后业务也可以继续使用分布式锁进行逻辑判断处理,这里就不具体去操作了。
3.5、实体
UserDto.java
package com.alian.queue.dto;import lombok.Data;import java.io.Serializable;
import java.time.LocalDateTime;@Data
public class UserDto implements Serializable {private String id;//员工IDprivate String name;//员工姓名private int age;//员工年龄private String department;//部门private double salary;//工资private LocalDateTime hireDate;//入职时间public UserDto() {}/** 简单的构造方法用于测试*/public UserDto(String id, String name, int age, String department, double salary, LocalDateTime hireDate) {this.id = id;this.name = name;this.age = age;this.department = department;this.salary = salary;this.hireDate = hireDate;}
}
四、验证
我们把上面的服务启动多个实例,这里就用端口区别,分别是 8090 和 8091,然后发送消息到 Redis,下面使我们的测试类:
@Slf4j
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class RedisMessageQueueTest {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@Testpublic void sendMessage() {UserDto userDto = new UserDto("BAT002", "包雅馨", 25, "财务部", 8800.0, LocalDateTime.of(2016, 11, 10, 8, 30, 0));// 注意这里的通道名【TOPIC_USER】要和RedisMessageListenerContainer里面配置的一致redisTemplate.convertAndSend("TOPIC_USER", JSON.toJSONString(userDto));log.info("发送成功");}}
注意这里的通道名要和 RedisMessageListenerContainer 里配置的一致,不然消息发送不出去的。
8090的日志
onMessage --> 消息通道是:TOPIC_USER
反序列化的结果:{"age":25,"department":"财务部","hireDate":"2016-11-10T08:30:00","id":"BAT002","name":"包雅馨","salary":8800.0}
计算得到的key: c69dc1563cc892718bf3ee0c5b90320b
接收的结果:UserDto(id=BAT002, name=包雅馨, age=25, department=财务部, salary=8800.0, hireDate=2016-11-10T08:30)
处理redis消息完成
8091的日志
onMessage --> 消息通道是:TOPIC_USER
反序列化的结果:{"age":25,"department":"财务部","hireDate":"2016-11-10T08:30:00","id":"BAT002","name":"包雅馨","salary":8800.0}
计算得到的key: c69dc1563cc892718bf3ee0c5b90320b
其他服务处理中
从结果上可以看到,分布式服务都可以接收到消息,但是最终只有一台服务会真正进行业务的处理,因为我这里使用了最简单的分布式锁来控制了,实际上 redisTemplate.opsForValue().setIfAbsent() 并不是最优解,尤其是在集群模式,我这里只是为了演示要有这么一个操作,推荐还是使用 Redisson 去做分布式锁更可靠。如果有不懂的可以参考我之前的文章:SpringBoot基于Redisson实现分布式锁并分析其原理
五、优化
其实我们还可以继续去优化我们的配置,消息的接收都是在频繁的创建线程,从而占用系统资源,我们可以通过线程池的方式去优化,RedisMessageListenerContainer 类中有一个方法setTaskExecutor(Executor taskExecutor)可以为监听容器配置线程池。配置线程池以后,所有的线程都会由该线程池产生,因此我们可以通过调节线程池来控制队列监听的速率。修改步骤大致如下:
5.1、注册任务执行器
RedisConfiguration 新注册Bean:Executor
@Beanpublic Executor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//设置核心线程数executor.setCorePoolSize(5);//设置最大线程数executor.setMaxPoolSize(10);//设置任务队列容量executor.setQueueCapacity(10);//设置线程活跃时间(秒)executor.setKeepAliveSeconds(60);//设置默认线程名称(线程前缀名称,有助于区分不同线程池之间的线程比如:taskExecutor-)executor.setThreadNamePrefix("taskExecutor-");//设置拒绝策略executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//设置允许核心线程超时executor.setAllowCoreThreadTimeOut(true);return executor;}
5.2、配置任务执行器
redisMessageListenerContainer.setTaskExecutor(taskExecutor())
@Beanpublic RedisMessageListenerContainer getRedisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, RedisMessageListenerListener redisMessageListenerListener) {RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();// 设置连接工厂redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);// 绑定监听器和信道redisMessageListenerContainer.addMessageListener(redisMessageListenerListener, channelTopic());// 配置任务执行器redisMessageListenerContainer.setTaskExecutor(taskExecutor());return redisMessageListenerContainer;}
5.3、启用异步执行器
RedisConfiguration 增加注解@EnableAsync,为了其他操作也能用到异步操作处理。
@Slf4j
@EnableAsync
@Configuration
@EnableCaching
public class RedisConfiguration {//...
}