Daily Question
2023-04-03 04:46:46
Redis Stream is a data structure introduced in Redis 5.0.
Redis Stream is mainly used for message queues (MQ, Message Queue). Redis already offers publish/subscribe (pub/sub) for message-queue functionality, but it has a drawback: messages cannot be persisted, so if the network drops or Redis crashes, messages are simply lost.
In short, pub/sub can distribute messages, but it cannot keep a history of them.
Redis Stream, by contrast, provides message persistence and master-replica replication: any client can access the data at any point in time, the stream remembers each client's read position, and messages are guaranteed not to be lost.
The structure of a Redis Stream is as follows: it maintains a linked list of messages, chaining together every message that is appended, and each message has a unique ID and its own content.
Each Stream has a unique name, which is simply its Redis key; it is created automatically the first time we append a message with the XADD command.
Message queue commands:
XADD - append a message to the end of the stream
XTRIM - trim the stream, limiting its length
XDEL - delete a message
XLEN - get the number of entries in the stream, i.e. its length
XRANGE - get a list of messages, automatically filtering out deleted ones
XREVRANGE - get a list of messages in reverse order, IDs from high to low
XREAD - get a list of messages, in blocking or non-blocking mode
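The producer-side commands above can be tried directly in redis-cli; the stream name mystream, the field names, and the message ID below are illustrative only:

```
XADD mystream * sensor temperature value 25     # append; Redis returns an auto-generated ID such as 1681459200000-0
XLEN mystream                                   # number of entries in the stream
XRANGE mystream - +                             # all messages, oldest first
XREVRANGE mystream + -                          # all messages, newest first
XREAD COUNT 10 BLOCK 5000 STREAMS mystream $    # block up to 5 s waiting for new messages
XDEL mystream 1681459200000-0                   # delete a message by ID
```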
Consumer group commands:
XGROUP CREATE - create a consumer group
XREADGROUP GROUP - read messages as a member of a consumer group
XACK - mark a message as processed
XGROUP SETID - set a new last-delivered ID for a consumer group
XGROUP DELCONSUMER - delete a consumer
XGROUP DESTROY - delete a consumer group
XPENDING - show information about pending messages
XCLAIM - transfer ownership of a message
XINFO - inspect streams and consumer groups
XINFO GROUPS - print consumer group information
XINFO STREAM - print stream information
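A minimal consumer-group session, again with illustrative names (mystream, mygroup, consumer-1) and an illustrative message ID:

```
XGROUP CREATE mystream mygroup $ MKSTREAM       # create the group; MKSTREAM creates the stream if it does not exist
XREADGROUP GROUP mygroup consumer-1 COUNT 10 BLOCK 2000 STREAMS mystream >
                                                # read never-delivered messages as consumer-1
XACK mystream mygroup 1681459200000-0           # acknowledge a processed message
XPENDING mystream mygroup                       # summary of delivered-but-unacknowledged messages
XINFO GROUPS mystream                           # inspect the stream's groups
```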
To avoid losing messages when a consumer crashes after reading a message but before finishing processing it, Stream maintains a Pending list that records messages that have been read but not yet fully processed. The XPENDING command retrieves the unprocessed messages of a consumer group, or of a single consumer within it.
Set an upper bound on the stream's length, so that entries beyond the limit are evicted.
Or set up a scheduled job that periodically trims the stream's data with the XTRIM command.
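Both trimming strategies map onto plain commands; the sizes here are illustrative:

```
XADD mystream MAXLEN ~ 10000 * field1 value1    # cap at roughly 10000 entries on every insert ("~" trades precision for speed)
XTRIM mystream MAXLEN 10000                     # or trim explicitly, e.g. from a scheduled job
```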
Each pending message has four attributes:
message ID
owning consumer
IDLE, the time since the message was last delivered
delivery counter, the number of times the message has been delivered
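These four attributes are exactly the columns returned by the extended form of XPENDING (stream and group names are illustrative):

```
XPENDING mystream mygroup - + 10                # details of the first 10 pending entries:
                                                # message ID, owning consumer, idle time (ms), delivery count
XPENDING mystream mygroup IDLE 60000 - + 10     # Redis 6.2+: only entries idle for at least 60 s
```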
Write a scheduled job that reads the pending list every 5 seconds and fetches the messages that have not been ACKed; for each such message we can obtain its idle time and delivery count. If a message has gone unconsumed for more than 60 seconds (configurable) and its delivery count is 1, we can consider transferring it to another consumer. If the delivery count is 2 or more, the message has already been transferred once and still has not been consumed, so we assume something is wrong: send a notification to the administrator, then ACK the message or remove it from the pending list.
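Transferring a message to another consumer is done with XCLAIM; XAUTOCLAIM (Redis 6.2+) combines the scan and the claim in one command. Names and the message ID are illustrative:

```
XCLAIM mystream mygroup consumer-2 60000 1681459200000-0    # claim for consumer-2 if the message has been idle >= 60 s
XAUTOCLAIM mystream mygroup consumer-2 60000 0              # scan from ID 0 and claim all sufficiently idle entries
```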
Message sending flow:
1. pms sends the raw data to the msg_parse_stream queue.
2. pms listens on the msg_parse_stream queue, parses the data and generates the message.
3. pms sends the complete message to the msg_data_stream queue.
4. notice listens on the msg_data_stream queue and sends the message.
5. notice puts the result of the send into the msg_record_stream queue.
6. pms listens on the msg_record_stream queue and records the delivery result.
The requirepass parameter in the configuration file sets the Redis access password:
# commented out by default
requirepass xxxx
Remote connections
Use the search function to find the line
bind 127.0.0.1 -::1
and comment it out, then find
protected-mode yes
and change yes to no.
Notifications
notify-keyspace-events AKEx enables notifications for all events.
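The same setting can also be applied at runtime without restarting Redis (note that it is then not persisted to the configuration file):

```
CONFIG SET notify-keyspace-events KEA
CONFIG GET notify-keyspace-events
```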
Note the Spring Boot and fastjson versions: Spring Boot must be 2.3.0 or above, and fastjson must be 1.2.79.
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.12.RELEASE</version>
</parent>
<groupId>com.zm</groupId>
<artifactId>redis-stream</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.79</version>
    </dependency>
</dependencies>
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    password:
    database: 0
    timeout: 5000ms
    lettuce:
      pool:
        max-active: 8
        max-idle: 8
        max-wait: -1ms
        min-idle: 0
redisstream:
  parse_stream: msg_parse_stream
  parse_group_one: msg_parse_group_one
  parse_consumer_one: msg_parse_consumer_one
  parse_consumer_two: msg_parse_consumer_two
  data_stream: msg_data_stream
  data_group_one: msg_data_group_one
  data_consumer_one: msg_data_consumer_one
  data_consumer_two: msg_data_consumer_two
  record_stream: msg_record_stream
  record_group_one: msg_record_group_one
  record_consumer_one: msg_record_consumer_one
  record_consumer_two: msg_record_consumer_two
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
public class RedisConfig extends CachingConfigurerSupport {

    /**
     * Template for Redis operations
     */
    @Bean("redisTemplate")
    public <T> RedisTemplate<String, T> getRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<String, T> template = new RedisTemplate<>();
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        GenericFastJsonRedisSerializer genericFastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
        // serializers
        template.setKeySerializer(stringRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        template.setValueSerializer(genericFastJsonRedisSerializer);
        template.setHashValueSerializer(genericFastJsonRedisSerializer);
        template.setDefaultSerializer(genericFastJsonRedisSerializer);
        template.setConnectionFactory(redisConnectionFactory);
        template.afterPropertiesSet();
        return template;
    }
}
@Data
@Component
@ConfigurationProperties(prefix = "redisstream")
public class RedisStreamConfig {

    /**
     * message-parse stream
     */
    private String parseStream;
    private String parseGroupOne;
    private String parseConsumerOne;
    private String parseConsumerTwo;

    /**
     * message-data stream
     */
    private String dataStream;
    private String dataGroupOne;
    private String dataConsumerOne;
    private String dataConsumerTwo;

    /**
     * message-record stream
     */
    private String recordStream;
    private String recordGroupOne;
    private String recordConsumerOne;
    private String recordConsumerTwo;
}
The following takes the creation of the message-parsing queue as an example.
The message-parsing queue msg_parse_stream is used to parse the raw message data and generate the message.
Create one listener class per stream you need to listen to. A producer usually needs two: one listening on the message-parsing queue and one listening on the message-record queue.
Example: the ListenerMsgParseStream class.
The class must implement the StreamListener interface, which declares a single method to implement, onMessage:
/**
 * @ClassName ListenerMsgParseStream
 * @Description Message listener class -- listens on the msg_parse_stream stream
 * @Author wk
 * @DATE 2022/4/21 10:35
 * @Company 杭州震墨科技有限公司
 **/
@Slf4j
@Component
public class ListenerMsgParseStream implements StreamListener<String, MapRecord<String, String, String>> {

    @Autowired
    private RedisStreamService<String> redisStreamService;
    @Autowired
    private RedisStreamConfig redisStreamConfig;
    @Autowired
    private MsgParseQueueService msgParseQueueService;

    @SneakyThrows
    @Override
    public void onMessage(MapRecord<String, String, String> entries) {
        log.info("Received message from Redis, message_id = {}, stream = {}, body = {}",
                entries.getId(), entries.getStream(), entries.getValue());
        // parse the data and push the result to the message-data queue
        Boolean parseStatus = msgParseQueueService.parseMsgData(entries.getValue());
        if (parseStatus) {
            // manually ACK once consumption has completed
            redisStreamService.ack(entries.getStream(), redisStreamConfig.getParseGroupOne(), entries.getId().getValue());
        }
    }
}
/**
 * @ClassName ProducerParseConfig
 * @Description Binds the consumer listener classes to their streams.
 *              The producer binds to msg_parse_stream (unparsed messages)
 *              and msg_record_stream (sent messages).
 * @Author wk
 * @DATE 2022/4/15 14:27
 * @Company 杭州震墨科技有限公司
 **/
@Configuration
public class ProducerParseConfig {

    @Autowired
    private RedisStreamConfig redisStreamConfig;
    @Autowired
    private RedisStreamService<String> redisStreamService;
    @Autowired
    private ListenerMsgParseStream listenerMsgParseStream;
    @Autowired
    private ListenerMsgParseStream2 listenerMsgParseStream2;

    /**
     * Build the stream read request.
     *
     * @return org.springframework.data.redis.stream.Subscription
     * @author wangke
     * @date 2022/4/15 22:27
     */
    private StreamMessageListenerContainer.ConsumerStreamReadRequest<String> construct(String key, String group, String consumerName) {
        // initialize the stream and group
        redisStreamService.initStream(key, group);
        // consume from the last consumed position
        StreamOffset<String> offset = StreamOffset.create(key, ReadOffset.lastConsumed());
        // create the consumer
        Consumer consumer = Consumer.from(group, consumerName);
        return StreamMessageListenerContainer.StreamReadRequest
                .builder(offset)
                .errorHandler((error) -> {})
                .cancelOnError(e -> false)
                .consumer(consumer)
                .autoAcknowledge(false) // no automatic ACK
                .build();
    }

    /**
     * Subscriber 1 of the parse-message queue.
     *
     * @return org.springframework.data.redis.stream.Subscription
     * @author wangke
     * @date 2022/4/15 22:27
     */
    @Bean
    public Subscription subscriptionWithParseMsg(RedisConnectionFactory factory) {
        // create the container
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        .pollTimeout(Duration.ofSeconds(5))
                        .build();
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer =
                StreamMessageListenerContainer.create(factory, options);
        // build the read request
        StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build =
                this.construct(redisStreamConfig.getParseStream(), redisStreamConfig.getParseGroupOne(),
                        redisStreamConfig.getParseConsumerOne());
        // bind the listener class to the stream
        Subscription subscription = listenerContainer.register(build, listenerMsgParseStream);
        // start listening
        listenerContainer.start();
        return subscription;
    }

    /**
     * Subscriber 2 of the parse-message queue.
     *
     * @return org.springframework.data.redis.stream.Subscription
     * @author wangke
     * @date 2022/4/15 22:27
     */
    @Bean
    public Subscription subscriptionWithParseMsg2(RedisConnectionFactory factory) {
        // create the container
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        .pollTimeout(Duration.ofSeconds(1))
                        .build();
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer =
                StreamMessageListenerContainer.create(factory, options);
        // build the read request
        StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build =
                this.construct(redisStreamConfig.getParseStream(), redisStreamConfig.getParseGroupOne(),
                        redisStreamConfig.getParseConsumerTwo());
        // bind the listener class to the stream
        Subscription subscription = listenerContainer.register(build, listenerMsgParseStream2);
        // start listening
        listenerContainer.start();
        return subscription;
    }
}
Approach:
Every 5 seconds, fetch the messages of a consumer group that have not been ACKed. If a message's idle time exceeds 60 seconds and its delivery count == 1, transfer it to another consumer; otherwise notify the administrator and manually ACK or delete it (choose according to your requirements).
@Autowired
private RedisStreamService<String> redisStreamService;
@Autowired
private RedisStreamConfig redisStreamConfig;
@Autowired
private DeadLetter deadLetter;

/**
 * Scheduled task: every 5 seconds, read the un-ACKed pending messages
 * of the group on the parse stream.
 * If a message has been idle for more than 60 seconds and its delivery
 * count == 1, transfer it to another consumer;
 * otherwise manually ACK it and notify the administrator.
 *
 * @author wangke
 * @date 2022/4/19 16:08
 */
@Scheduled(cron = "0/5 * * * * ?")
public void scanPendingMsg() {
    // read the group's pending summary; essentially runs XPENDING
    PendingMessagesSummary pendingMessagesSummary = redisStreamService
            .readWithPending(redisStreamConfig.getParseStream(), redisStreamConfig.getParseGroupOne());
    // total number of pending messages
    long totalPendingMessages = pendingMessagesSummary.getTotalPendingMessages();
    if (totalPendingMessages == 0) {
        return;
    }
    // build the map of messages waiting to be transferred
    Map<String, List<RecordId>> consumerRecordIdMap = deadLetter
            .waitChangeConsumerMap(pendingMessagesSummary, redisStreamConfig.getParseStream());
    // finally transfer the waiting messages
    if (!consumerRecordIdMap.isEmpty()) {
        deadLetter.changeConsumer(consumerRecordIdMap, redisStreamConfig.getParseStream(),
                redisStreamConfig.getParseGroupOne());
    }
}
The DeadLetter class:
/**
 * @ClassName DeadLetter
 * @Description Handles dead-letter messages
 * @Author wk
 * @DATE 2022/4/19 11:41
 * @Company 杭州震墨科技有限公司
 **/
@Slf4j
@Component
public class DeadLetter {

    private DeadLetter() {
    }

    private static DeadLetter deadLetter;

    static {
        deadLetter = new DeadLetter();
    }

    public static DeadLetter getInstance() {
        return deadLetter;
    }

    @Autowired
    private RedisStreamService<String> redisStreamService;
    @Autowired
    private MsgRecordQueueService msgRecordQueueService;

    /**
     * Build the map of messages waiting to be transferred to another consumer.
     *
     * @param pendingMessagesSummary
     * @return java.util.Map<java.lang.String, java.util.List<org.springframework.data.redis.connection.stream.RecordId>>
     * @author wangke
     * @date 2022/4/19 16:37
     */
    public Map<String, List<RecordId>> waitChangeConsumerMap(PendingMessagesSummary pendingMessagesSummary, String key) {
        // consumer group name
        String groupName = pendingMessagesSummary.getGroupName();
        // smallest ID in the pending list
        String minMessageId = pendingMessagesSummary.minMessageId();
        // largest ID in the pending list
        String maxMessageId = pendingMessagesSummary.maxMessageId();
        // pending message count per consumer
        Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();
        // messages waiting to be transferred
        Map<String, List<RecordId>> consumerRecordIdMap = new HashMap<>();
        // iterate over each consumer's pending messages
        pendingMessagesPerConsumer.entrySet().forEach(entry -> {
            // record IDs waiting to be transferred
            List<RecordId> list = new ArrayList<>();
            // consumer
            String consumer = entry.getKey();
            Long consumerPendingMessages = entry.getValue();
            log.info("Consumer {} has {} pending messages in total", consumer, consumerPendingMessages);
            if (consumerPendingMessages > 0) {
                // read the consumer's pending entries
                PendingMessages pendingMessages = redisStreamService.readWithPending(key, Consumer.from(groupName, consumer));
                // iterate over the pending details
                pendingMessages.forEach(message -> {
                    // message ID
                    RecordId recordId = message.getId();
                    // idle time (from delivery out of the consumer group until now)
                    Duration elapsedTimeSinceLastDelivery = message.getElapsedTimeSinceLastDelivery();
                    // delivery count (number of times the message has been fetched)
                    long deliveryCount = message.getTotalDeliveryCount();
                    // unconsumed for more than 60 seconds?
                    if (elapsedTimeSinceLastDelivery.getSeconds() > 60) {
                        // delivered exactly once?
                        if (1 == deliveryCount) {
                            // transfer it to another consumer
                            list.add(recordId);
                        } else {
                            // manually ACK and record the failure
                            log.info("Manually ACK the message and record the failure, id={}, elapsedTimeSinceLastDelivery={}, deliveryCount={}",
                                    recordId, elapsedTimeSinceLastDelivery, deliveryCount);
                            msgRecordQueueService.saveErrorMsgRecord(key, recordId);
                            redisStreamService.ack(key, groupName, recordId.getValue());
                        }
                    }
                });
                if (list.size() > 0) {
                    consumerRecordIdMap.put(consumer, list);
                }
            }
        });
        return consumerRecordIdMap;
    }

    /**
     * Transfer messages to another consumer.
     *
     * @param consumerRecordIdMap
     * @author wangke
     * @date 2022/4/19 16:12
     */
    public void changeConsumer(Map<String, List<RecordId>> consumerRecordIdMap, String key, String group) {
        consumerRecordIdMap.entrySet().forEach(entry -> {
            // pick a new consumer to replace the current one; command: XINFO CONSUMERS mystream mygroup
            String oldConsumer = entry.getKey();
            StreamInfo.XInfoConsumers consumers = redisStreamService.getConsumers(key, group);
            if (consumers.size() == 0) {
                log.info("Transfer failed: group {} has no consumers", group);
                handleFailureMsg(key, group, entry.getValue());
                return;
            }
            String[] newConsumer = {""};
            for (int i = 0; i < consumers.size(); i++) {
                if (!oldConsumer.equals(consumers.get(i).consumerName())) {
                    newConsumer[0] = consumers.get(i).consumerName();
                    break;
                }
            }
            if (newConsumer[0].equals("")) {
                log.info("Transfer failed: group {} has no other consumer", group);
                handleFailureMsg(key, group, entry.getValue());
                return;
            }
            List<RecordId> recordIds = entry.getValue();
            // transfer
            List<ByteRecord> retVal = (List<ByteRecord>) redisStreamService.getStringRedisTemplate().execute(new RedisCallback<List<ByteRecord>>() {
                @Override
                public List<ByteRecord> doInRedis(RedisConnection redisConnection) throws DataAccessException {
                    // equivalent to XCLAIM: move a batch of messages from one consumer to another
                    return redisConnection.streamCommands().xClaim(key.getBytes(), group, newConsumer[0],
                            RedisStreamCommands.XClaimOptions.minIdle(Duration.ofSeconds(10)).ids(recordIds));
                }
            });
            if (retVal.size() > 0) {
                for (ByteRecord byteRecord : retVal) {
                    log.info("Changed the message's consumer: id={}, value={}, newConsumer={}",
                            byteRecord.getId(), byteRecord.getValue(), newConsumer[0]);
                }
            }
        });
    }

    /**
     * Handle messages that failed to transfer: record them and manually ACK.
     *
     * @author wangke
     * @date 2022/4/22 17:36
     */
    private void handleFailureMsg(String key, String group, List<RecordId> recordIds) {
        for (RecordId recordId : recordIds) {
            // record and ACK
            msgRecordQueueService.saveErrorMsgRecord(key, recordId);
            redisStreamService.ack(key, group, recordId.getValue());
        }
    }
}
Approach:
Every 5 seconds, fetch the consumer's un-ACKed messages, mainly to consume those transferred over from another consumer: if the delivery count is greater than 1, attempt to consume the message.
@Autowired
private RedisStreamService<String> redisStreamService;
@Autowired
private RedisStreamConfig redisStreamConfig;
@Autowired
private HandleDeadLetter handleDeadLetter;

/**
 * Every 5 seconds, check whether any messages are waiting for this consumer.
 * Mainly consumes messages transferred over from another consumer: if the
 * delivery count is greater than 1, attempt to consume the message.
 *
 * @author wangke
 * @date 2022/4/20 14:07
 */
@Scheduled(cron = "0/5 * * * * ?")
public void handleMsg() {
    /* Read messages from this consumer's pending list. Anything that ends up here is a
       non-business failure, e.g. an interface timeout or a server crash. Business failures,
       e.g. field parsing errors, go into an error table or into Redis instead. */
    PendingMessages pendingMessages = redisStreamService.readWithPending(redisStreamConfig.getParseStream(),
            Consumer.from(redisStreamConfig.getParseGroupOne(), redisStreamConfig.getParseConsumerOne()));
    // consume the messages
    handleDeadLetter.consumptionMsg(pendingMessages, redisStreamConfig.getParseStream());
}
The HandleDeadLetter class:
/**
 * @ClassName HandleDeadLetter
 * @Description Consumes transferred (dead-letter) messages
 * @Author wk
 * @DATE 2022/4/20 13:56
 * @Company 杭州震墨科技有限公司
 **/
@Slf4j
@Component
public class HandleDeadLetter {

    private HandleDeadLetter() {}

    private static HandleDeadLetter handleDeadLetter;

    static {
        handleDeadLetter = new HandleDeadLetter();
    }

    public static HandleDeadLetter getInstance() {
        return handleDeadLetter;
    }

    @Autowired
    private RedisStreamService<String> redisStreamService;
    @Autowired
    private MsgParseQueueService msgParseQueueService;
    @Autowired
    private MsgDataQueueService msgDataQueueService;
    @Autowired
    private MsgRecordQueueService msgRecordQueueService;

    /**
     * Consume messages, mainly those transferred over from another consumer:
     * if the delivery count is greater than 1, attempt to consume the message.
     *
     * @param pendingMessages
     * @author wangke
     * @date 2022/4/20 14:06
     */
    public void consumptionMsg(PendingMessages pendingMessages, String key) {
        if (pendingMessages.size() > 0) {
            pendingMessages.forEach(pendingMessage -> {
                // time since the last delivery
                Duration elapsedTimeSinceLastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery();
                String groupName = pendingMessage.getGroupName();
                String consumerName = pendingMessage.getConsumerName();
                // delivery count
                long totalDeliveryCount = pendingMessage.getTotalDeliveryCount();
                // only consume messages that have been delivered more than once (i.e. transferred)
                if (totalDeliveryCount > 1) {
                    try {
                        RecordId id = pendingMessage.getId();
                        // fetch the message; deleted messages are filtered out automatically
                        List<MapRecord<String, String, String>> result = redisStreamService.getMsgList(key,
                                Range.closed(id.toString(), id.toString())); // closed range so the single ID is included
                        MapRecord<String, String, String> entries = result.get(0);
                        // consume the message
                        log.info("Got a transferred message and consumed it, id={}, value={}, consumer={}",
                                entries.getId(), entries.getValue(), consumerName);
                        // handle the business logic
                        this.handleBusiness(key, entries.getValue());
                        // manually ACK
                        redisStreamService.ack(groupName, entries);
                    } catch (Exception e) {
                        // error handling
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    /**
     * Dispatch to the business handler matching the stream key,
     * mirroring the handlers in the listener classes.
     *
     * @param key
     * @param value
     * @author wangke
     * @date 2022/4/20 17:35
     */
    private void handleBusiness(String key, Map<String, String> value) {
        switch (key) {
            case RedisStreamConstants.MSG_PARSE_STREAM:
                msgParseQueueService.saveMsgData(value);
                redisStreamService.insertStreamAll(RedisStreamConstants.MSG_DATA_STREAM, value);
                break;
            case RedisStreamConstants.MSG_DATA_STREAM:
                msgDataQueueService.sendMsg(value);
                redisStreamService.insertStreamAll(RedisStreamConstants.MSG_RECORD_STREAM, value);
                break;
            case RedisStreamConstants.MSG_RECORD_STREAM:
                msgRecordQueueService.saveMsgRecord(value);
                break;
            default:
                break;
        }
    }
}
/**
 * @InterfaceName BaseQueueService
 * @Description Base queue service
 * @Author wk
 * @DATE 2022/4/21 14:35
 * @Company 杭州震墨科技有限公司
 **/
public interface BaseQueueService {

    /**
     * Fetch the message configuration from Redis.
     *
     * @param key
     * @return com.zm.msg.model.MsgConfig
     * @author wangke
     * @date 2022/4/21 14:38
     */
    MsgConfig getMsgConfigByKey(String key);

    /**
     * Replace the dynamic fields in the template and return the assembled message content.
     *
     * @param msg the message template
     * @param obj the entity holding the template data
     * @throws Exception when reflective access to getters/setters fails
     * @author wangke
     * @date 2022/4/21 14:55
     */
    String buildContent(String msg, Object obj) throws Exception;
}

/**
 * @InterfaceName MsgParseQueueService
 * @Description Message-parsing queue service -- messages awaiting parsing
 * @Author wk
 * @DATE 2022/4/21 10:15
 * @Company 杭州震墨科技有限公司
 **/
public interface MsgParseQueueService {

    /**
     * Append a message to parse_stream.
     *
     * @param configKey the config key
     * @author wangke
     * @date 2022/4/21 16:19
     */
    void saveMsgData(String configKey);

    /**
     * Append a message to parse_stream.
     *
     * @param value
     * @author wangke
     * @date 2022/4/21 16:19
     */
    void saveMsgData(Map<String, String> value);

    /**
     * Parse the data and generate the message from the template.
     *
     * @param value
     * @return true when parsing succeeds
     * @throws Exception when reflective method access fails
     * @author wangke
     * @date 2022/4/21 13:53
     */
    Boolean parseMsgData(Map<String, String> value) throws Exception;
}

/**
 * @InterfaceName MsgDataQueueService
 * @Description Message-data queue service -- parsed messages awaiting sending
 * @Author wk
 * @DATE 2022/4/21 10:13
 * @Company 杭州震墨科技有限公司
 **/
public interface MsgDataQueueService {

    /**
     * Append a message to data_stream.
     *
     * @param value
     * @author wangke
     * @date 2022/4/21 16:19
     */
    void sendMsg(Map<String, String> value);

    /**
     * Append a message to data_stream.
     *
     * @param model msgConfigKey, msgContent, msgCreateUser, msgSendUser, sendDingDing
     * @throws Exception when message parameters are missing or no template is configured
     * @author wangke
     * @date 2022/4/21 16:19
     */
    void sendMsg(ContentVO model);
}

/**
 * @InterfaceName MsgRecordQueueService
 * @Description Message-record queue service -- messages that have been sent
 * @Author wk
 * @DATE 2022/4/21 10:15
 * @Company 杭州震墨科技有限公司
 **/
public interface MsgRecordQueueService {

    /**
     * Save a message record.
     *
     * @param value the message
     * @return true when the record is saved
     * @throws Exception when splitting out the user ID fails
     * @author wangke
     * @date 2022/4/22 8:37
     */
    Boolean saveMsgRecord(Map<String, String> value);

    /**
     * Record a message that failed to send.
     *
     * @param key stream_key
     * @param recordId stream_id
     * @author wangke
     * @date 2022/4/24 14:38
     */
    void saveErrorMsgRecord(String key, RecordId recordId);
}