Redis Stream实现消息队列

Redis Stream实现消息队列

Redis Stream实现消息队列

一、stream简介

Redis Stream 是 Redis 5.0 版本新增加的数据结构。

Redis Stream 主要用于消息队列(MQ,Message Queue),Redis 本身是有一个 Redis 发布订阅 (pub/sub) 来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

简单来说发布订阅 (pub/sub) 可以分发消息,但无法记录历史消息。

而 Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。

Redis Stream 的结构如下所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容:

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建

上图解析:

  • Consumer Group :消费组,使用 XGROUP CREATE 命令创建,一个消费组有多个消费者(Consumer)。
  • last_delivered_id :游标,每个消费组会有个游标 last_delivered_id,任意一个消费者读取了消息都会使游标last_delivered_id 往前移动。
  • pending_ids :消费者(Consumer)的状态变量,作用维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack (Acknowledge character:确认字符)。

消息队列相关命令:

XADD - 添加消息到末尾
XTRIM - 对流进行修剪,限制长度
XDEL - 删除消息
XLEN - 获取流包含的元素数量,即消息长度
XRANGE - 获取消息列表,会自动过滤已经删除的消息
XREVRANGE - 反向获取消息列表,ID 从大到小
XREAD - 以阻塞或非阻塞方式获取消息列表

消费者组相关命令:

XGROUP CREATE - 创建消费者组
XREADGROUP GROUP - 读取消费者组中的消息
XACK - 将消息标记为"已处理"
XGROUP SETID - 为消费者组设置新的最后递送消息ID
XGROUP DELCONSUMER - 删除消费者
XGROUP DESTROY - 删除消费者组
XPENDING - 显示待处理消息的相关信息
XCLAIM - 转移消息的归属权
XINFO - 查看流和消费者组的相关信息;
XINFO GROUPS - 打印消费者组的信息;
XINFO STREAM - 打印流信息

二、问题

1.怎么避免消息丢失?

为了解决组内消息读取但处理期间消费者崩溃带来的消息丢失问题,Stream 设计了 Pending 列表,用于记录读取但并未处理完毕的消息。命令 XPENDIING 用来获消费组或消费内消费者的未处理完毕的消息

2.Streams 消息太多了怎么办?

  • stream本身有做自动清理操作,当消息接近100*100条时,stream会将之前的消息清除

  • 设置stream的上限,超过这个上限的时候会清除多余的

  • 设置定时任务,定时清理stream中的数据,XTRIM命令

3.死信问题

每个Pending的消息有4个属性

  • 消息ID

  • 所属消费者

  • IDLE,已读取时长

  • delivery counter,消息被读取次数

写一个定时任务,每5秒读取一次pending,获取没有被ACK的数据,

此时合一获取到此条消息的”已读取时间””消息被读取次数”

如果消息超过60 秒还没有被消费(可自定义)且消息被读取次数为1 ,我们就可以考虑转组,

如果消息被读数为2或者超过2,说明已经转过组,还没有被消费,我们就默认有问题,

另发送消息通知管理员,把改消息ACK或者从pending删除

三、消息发送流程

消息发送逻辑图

消息发送流程图

1.pms将原始数据发送到msg_parse_stream队列

2.pms监听msg_parse_stream队列,解析生成消息

3.pms将完整的消息发送到msg_data_stream队列

4.notice监听msg_parse_stream队列,将消息发送

5.notice将发送完成的信息放到msg_record_stream队列

6.pms监听msg_record_stream队列,记录消息发送记录

四、实现

1、基础配置

1.1、配置Redis

密码

配置文件中的参数:requirepass ,就是配置redis访问密码的参数;

#默认情况下,是注释的

requirepass xxxx;

远程连接

可利用搜索功能 找到 bind 127.0.0.1 -::1 ,把这一行注释掉

找到 protected-mode yes 把 yes 改为 no

监听

notify-keyspace-events AKEx ,设置监听全部

1.2、导入jar包

注意:spring boot和fastjson的版本,spring boot版本要2.3.0以上,fastjson版本要1.2.79

    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.3.12.RELEASE</version>    </parent>    <groupId>com.zm</groupId>    <artifactId>redis-stream</artifactId>    <version>1.0-SNAPSHOT</version>    <dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>org.projectlombok</groupId>            <artifactId>lombok</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-data-redis</artifactId>        </dependency>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>            <scope>test</scope>        </dependency>        <dependency>            <groupId>com.alibaba</groupId>            <artifactId>fastjson</artifactId>            <version>1.2.79</version>        </dependency>    </dependencies>
1.3、yaml文件
spring:  redis:    host: 127.0.0.1    port: 6379    password:    database: 0    lettuce:      pool:        max-active: 8        max-idle: 8        max-wait: -1ms        min-idle: 0    timeout: 5000ms     redisstream:  parse_stream: msg_parse_stream  parse_group_one: msg_parse_group_one  parse_consumer_one: msg_parse_consumer_one  parse_consumer_two: msg_parse_consumer_two  data_stream: msg_data_stream  data_group_one: msg_data_group_one  data_consumer_one: msg_data_consumer_one  data_consumer_two: msg_data_consumer_two  record_stream: msg_record_stream  record_group_one: msg_record_group_one  record_consumer_one: msg_record_consumer_one  record_consumer_two: msg_record_consumer_two   
1.4、设置Redis序列化与反序列化
@Configuration@AutoConfigureAfter(RedisAutoConfiguration.class)public class RedisConfig extends CachingConfigurerSupport {    /**     * 操作模板类     */    @Bean("redisTemplate")    public <T> RedisTemplate<String, T> getRedisTemplate(RedisConnectionFactory redisConnectionFactory) {        RedisTemplate<String, T> template = new RedisTemplate<>();        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();        GenericFastJsonRedisSerializer genericFastJsonRedisSerializer = new GenericFastJsonRedisSerializer();        //序列化        template.setKeySerializer(stringRedisSerializer);        template.setHashKeySerializer(stringRedisSerializer);        template.setValueSerializer(genericFastJsonRedisSerializer);        template.setHashValueSerializer(genericFastJsonRedisSerializer);        template.setDefaultSerializer(genericFastJsonRedisSerializer);        template.setConnectionFactory(redisConnectionFactory);        template.afterPropertiesSet();        return template;    }}
1.5、获取配置文件中的数据
@Data@Component@ConfigurationProperties(prefix = "redisstream")public class RedisStreamConfig {    /**     * 解析消息流     */    private String parseStream;    private String parseGroupOne;    private String parseConsumerOne;    private String parseConsumerTwo;    /**     * 消息数据流     */    private String dataStream;    private String dataGroupOne;    private String dataConsumerOne;    private String dataConsumerTwo;    /**     * 消息记录流     */    private String recordStream;    private String recordGroupOne;    private String recordConsumerOne;    private String recordConsumerTwo;    }

2、stream配置

下面以消息解析队列的创建为例:

2.1、创建相关的监听类

msg_parse_stream消息解析队列主要是用于消息原始数据的解析,生成消息

监听类是有几个要监听的stream流,就创建几个,一般生产者需要两个,一个监听消息解析队列,一个监听消息记录队列

例:ListenerMsgParseStream类

这里类需要实现StreamListener接口,该接口下只有一个要实现的方法——onMessage方法,代码:

/** * @ClassName ListenerMsgParseStream * @Description 监听消息类--监听 msg_parse_stream流 * @Author wk * @DATE 2022/4/21 10:35 * @Company 杭州震墨科技有限公司 **/@Slf4j@Componentpublic class ListenerMsgParseStream implements StreamListener<String, MapRecord<String, String, String>> {    @Autowired    private RedisStreamService<String> redisStreamService;    @Autowired    private RedisStreamConfig redisStreamConfig;    @Autowired    private MsgParseQueueService msgParseQueueService;    @SneakyThrows    @Override    public void onMessage(MapRecord<String, String, String> entries) {        log.info("接受到来自redis的消息,message_id = {},stream = {},body = {}",entries.getId(),entries.getStream(),entries.getValue());        //解析数据,推送到消息数据队列        Boolean parseStatus = msgParseQueueService.parseMsgData(entries.getValue());        if (parseStatus){            // 消费完成后手动确认消费ACK            redisStreamService.ack(entries.getStream(), redisStreamConfig.getParseGroupOne(),entries.getId().getValue());        }    }}
2.2、将消费者监听类绑定到相应的stream流上
/** * @ClassName RedisStreamConfig * @Description 将消费者监听类绑定到相应的stream流上 * 生产者绑定 msg_parse_stream流--未解析的消息 * msg_record_stream流--发送后的消息 * @Author wk * @DATE 2022/4/15 14:27 * @Company 杭州震墨科技有限公司 **/@Configurationpublic class ProducerParseConfig {    @Autowired    private RedisStreamConfig redisStreamConfig;    @Autowired    private RedisStreamService<String> redisStreamService;    @Autowired    private ListenerMsgParseStream listenerMsgParseStream;    @Autowired    private ListenerMsgParseStream2 listenerMsgParseStream2;    /**     * 描述: 构建流读取请求     *     * @param     * @return org.springframework.data.redis.stream.Subscription     * @author wangke     * @date 2022/4/15 22:27     */    private StreamMessageListenerContainer.ConsumerStreamReadRequest<String> Construct(String key, String group, String consumerName) {        //初始化stream和group        redisStreamService.initStream(key, group);        //指定消费最新消息        StreamOffset<String> offset = StreamOffset.create(key, ReadOffset.lastConsumed());        //创建消费者        Consumer consumer = Consumer.from(group, consumerName);        return StreamMessageListenerContainer.StreamReadRequest                .builder(offset)                .errorHandler((error) -> {})                .cancelOnError(e -> false)                .consumer(consumer)                .autoAcknowledge(false) //不自动ACK确认                .build();    }    /**     * 描述: 解析消息队列 的订阅者1     *     * @param     * @return org.springframework.data.redis.stream.Subscription     * @author wangke     * @date   2022/4/15 22:27     */    @Bean    public Subscription subscriptionWithParseMsg(RedisConnectionFactory factory){        //创建容器        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer                .StreamMessageListenerContainerOptions                .builder()                .pollTimeout(Duration.ofSeconds(5))                .build();        StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(factory, options);        //构建流读取请求        StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build = this.Construct(redisStreamConfig.getParseStream(),redisStreamConfig.getParseGroupOne(), redisStreamConfig.getParseConsumerOne());        //将监听类绑定到相应的stream流上        Subscription subscription = listenerContainer.register(build, listenerMsgParseStream);        //启动监听        listenerContainer.start();        return subscription;    }    /**     * 描述: 解析消息队列 的订阅者2     *     * @param     * @return org.springframework.data.redis.stream.Subscription     * @author wangke     * @date   2022/4/15 22:27     */    @Bean    public Subscription subscriptionWithParseMsg2(RedisConnectionFactory factory){        //创建容器        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer                .StreamMessageListenerContainerOptions                .builder()                .pollTimeout(Duration.ofSeconds(1))                .build();        StreamMessageListenerContainer<String, MapRecord<String, String, String>> listenerContainer = StreamMessageListenerContainer.create(factory, options);        //构建流读取请求        StreamMessageListenerContainer.ConsumerStreamReadRequest<String> build = this.Construct(redisStreamConfig.getParseStream(),redisStreamConfig.getParseGroupOne(), redisStreamConfig.getParseConsumerTwo());        //将监听类绑定到相应的stream流上        Subscription subscription = listenerContainer.register(build, listenerMsgParseStream2);        //启动监听        listenerContainer.start();        return subscription;    }}
2.3、死信问题
2.3.1、判断

思路:

每5秒获取一次某个消费者组的没有ACK的消息,若消息已读时长超过60 秒的且被读取次数==1,则进行转组操作,否则通知管理员并手动ACK或者删除(看需求自己选择)

    @Autowired    private RedisStreamService<String> redisStreamService;    @Autowired    private RedisStreamConfig redisStreamConfig;    @Autowired    private HandleDeadLetter handleDeadLetter;/** * 描述: 定时任务 * 每5秒获取一次msg_data_stream中msg_data_group_one组 pending中没有ACK的消息 * 若消息已读时长超过60 秒的且被读取次数==1,则进行转组操作 * 否则手动ACK并通知管理员 * * @param * @return void * @author wangke * @date 2022/4/19 16:08 */@Scheduled(cron = "0/5 * * * * ?")public void scanPendingMsg() {    //获取group中pending消息,本质上就是执行XPENDING命令    PendingMessagesSummary pendingMessagesSummary = redisStreamService            .readWithPending(redisStreamConfig.getParseStream(),            redisStreamConfig.getParseGroupOne());    //所有pending消息数量    long totalPendingMessages = pendingMessagesSummary.getTotalPendingMessages();    if (totalPendingMessages == 0) {        return;    }    //生成等待转组的数据    Map<String, List<RecordId>> consumerRecordIdMap = deadLetter            .waitChangeConsumerMap(pendingMessagesSummary,            redisStreamConfig.getParseStream());    //最后将待转组的消息进行转组    if (!consumerRecordIdMap.isEmpty()) {        deadLetter.changeConsumer(consumerRecordIdMap,redisStreamConfig.getParseStream(), redisStreamConfig.getParseGroupOne());    }}

DeadLetter类

/** * @ClassName DeadLetter * @Description 处理死信问题 * @Author wk * @DATE 2022/4/19 11:41 * @Company 杭州震墨科技有限公司 **/@Slf4j@Componentpublic class DeadLetter {    private DeadLetter() {    }    private static DeadLetter deadLetter;    static {        deadLetter = new DeadLetter();    }    public static DeadLetter getInstance() {        return deadLetter;    }    @Autowired    private RedisStreamService<String> redisStreamService;    @Autowired    private MsgRecordQueueService msgRecordQueueService;    /**     * 描述: 生成等待转组的数据     *     * @param pendingMessagesSummary     * @return java.util.Map<java.lang.String, java.util.List < org.springframework.data.redis.connection.stream.RecordId>>     * @author wangke     * @date 2022/4/19 16:37     */    public Map<String, List<RecordId>> waitChangeConsumerMap(PendingMessagesSummary pendingMessagesSummary, String key) {        //消费者组名称        String groupName = pendingMessagesSummary.getGroupName();        //pending队列中最小id        String minMessageId = pendingMessagesSummary.minMessageId();        //pending队列中最大id        String maxMessageId = pendingMessagesSummary.maxMessageId();        //获取每个消费者的pending消息数量        Map<String, Long> pendingMessagesPerConsumer = pendingMessagesSummary.getPendingMessagesPerConsumer();        //待转组的消息        Map<String, List<RecordId>> consumerRecordIdMap = new HashMap<>();        //遍历每个消费者pending消息        pendingMessagesPerConsumer.entrySet().forEach(entry -> {            //带转组的recordId            List<RecordId> list = new ArrayList<>();            //消费者            String consumer = entry.getKey();            Long consumerPendingMessages = entry.getValue();            log.info("消费者:{},一共有{}条pending消息", consumer, consumerPendingMessages);            if (consumerPendingMessages > 0) {                //读取消费者pending队列前10 条记录,从id = 0的记录开始,一直到最大值                PendingMessages pendingMessages = redisStreamService.readWithPending(key, Consumer.from(groupName, consumer));                //遍历pending详情                pendingMessages.forEach(message -> {                    //消息的id                    RecordId recordId = message.getId();                    //消息已读时长(消息从消费组中获取,到此刻的时间)                    Duration elapsedTimeSinceLastDelivery = message.getElapsedTimeSinceLastDelivery();                    //消息被读取次数(消息被获取的次数)                    long deliveryCount = message.getTotalDeliveryCount();                    //判断是否是超过60 秒没有消费                    if (elapsedTimeSinceLastDelivery.getSeconds() > 60) {                        //判断消息被读取次数是否为 1次                        if (1 == deliveryCount) {                            //进行转组                            list.add(recordId);                        } else {                            //手动确认并记录异常                            log.info("手动ACK消息,并记录异常,id={},elapsedTimeSinceLastDelivery={},deliveryCount{}", recordId, elapsedTimeSinceLastDelivery, deliveryCount);                            msgRecordQueueService.saveErrorMsgRecord(key,recordId);                            redisStreamService.ack(key,groupName,recordId.getValue());                        }                    }                });                if (list.size() > 0) {                    consumerRecordIdMap.put(consumer, list);                }            }        });        return consumerRecordIdMap;    }    /**     * 描述: 对消息进行转组     *     * @param consumerRecordIdMap     * @return void     * @author wangke     * @date 2022/4/19 16:12     */    public void changeConsumer(Map<String, List<RecordId>> consumerRecordIdMap, String key, String group) {        consumerRecordIdMap.entrySet().forEach(entry -> {            //根据当前consumer获取新的consumer  命令 XINFO CONSUMERS mystream mygroup            String oldConsumer = entry.getKey();            StreamInfo.XInfoConsumers consumers = redisStreamService.getConsumers(key, group);            if (consumers.size()<0){                log.info("转组失败:{}组没有消费者",group);                handleFailureMsg(key,group,entry.getValue());                return;            }            String[] newConsumer = {""};            for (int i = 0; i <consumers.size(); i++) {                if (!oldConsumer.equals(consumers.get(i).consumerName())){                    newConsumer[0] = consumers.get(i).consumerName();                    break;                }            }            if (newConsumer[0].equals("")){                log.info("转组失败:{}组没有其他消费者",group);                handleFailureMsg(key,group,entry.getValue());                return;            }            List<RecordId> recordIds = entry.getValue();            //转组            List<ByteRecord> retVal = (List<ByteRecord>) redisStreamService.getStringRedisTemplate().execute(new RedisCallback<List<ByteRecord>>() {                @Override                public List<ByteRecord> doInRedis(RedisConnection redisConnection) throws DataAccessException {                    // 相当于执行XCLAIM操作,批量将某一个consumer中的消息转到另外一个consumer中                    return redisConnection.streamCommands().xClaim(key.getBytes(),                            group, newConsumer[0], minIdle(Duration.ofSeconds(10)).ids(recordIds));                }            });            if (retVal.size()>0){                for (ByteRecord byteRecord : retVal) {                    log.info("改了消息的消费者:id={}, value={},newConsumer={}", byteRecord.getId(), byteRecord.getValue(), newConsumer[0]);                }            }        });    }    /**     * 描述:处理转组失败的消息,手动ack     *     * @param     * @return     * @author wangke     * @date   2022/4/22 17:36     */    private void handleFailureMsg(String key, String group, List<RecordId> recordIds){        for (RecordId recordId : recordIds) {            //记录并ACK            msgRecordQueueService.saveErrorMsgRecord(key,recordId);            redisStreamService.ack(key,group,recordId.getValue());        }    }}
2.3.2、处理

思路:

每5秒获取一次某个消费者组的没有ACK的消息,主要消费那些转组过来的消息,如果转组次数大于1,则进行尝试消费

    @Autowired    private RedisStreamService<String> redisStreamService;    @Autowired    private RedisStreamConfig redisStreamConfig;    @Autowired    private HandleDeadLetter handleDeadLetter;/** * 描述:  每隔5秒钟,扫描一下有没有等待自己消费的 *       主要消费那些转组过来的消息,如果转组次数大于1,则进行尝试消费 * * @param * @return void * @author wangke * @date   2022/4/20 14:07 */@Scheduled(cron = "0/5 * * * * ?")public void handleMsg() {    /*从消费者的pending队列中读取消息,能够进到这里面的,一定是非业务异常,例如接口超时、服务器宕机等。    对于业务异常,例如字段解析失败等,丢进异常表或者redis*/    PendingMessages pendingMessages = redisStreamService.readWithPending(redisStreamConfig.getParseStream(),            Consumer.from(redisStreamConfig.getParseGroupOne(),                    redisStreamConfig.getParseConsumerOne()));    //消费消息    handleDeadLetter.consumptionMsg(pendingMessages,redisStreamConfig.getParseStream());}

HandleDeadLetter类

/** * @ClassName hanld * @Description 处理 * @Author wk * @DATE 2022/4/20 13:56 * @Company 杭州震墨科技有限公司 **/@Slf4j@Componentpublic class HandleDeadLetter {    private HandleDeadLetter() {}    private static HandleDeadLetter handleDeadLetter;    static {        handleDeadLetter = new HandleDeadLetter();    }    public static HandleDeadLetter getInstance() {        return handleDeadLetter;    }    @Autowired    private RedisStreamService<String> redisStreamService;    @Autowired    private MsgParseQueueService msgParseQueueService;    @Autowired    private MsgDataQueueService msgDataQueueService;    @Autowired    private MsgRecordQueueService msgRecordQueueService;    /**     * 描述:  消费消息     *    主要消费那些转组过来的消息,如果转组次数大于1,则进行尝试消费     *     * @param pendingMessages     * @return void     * @author wangke     * @date 2022/4/20 14:06     */    public void consumptionMsg(PendingMessages pendingMessages, String key) {        if (pendingMessages.size() > 0) {            pendingMessages.forEach(pendingMessage -> {                // 最后一次消费到现在的间隔                Duration elapsedTimeSinceLastDelivery = pendingMessage.getElapsedTimeSinceLastDelivery();                String groupName = pendingMessage.getGroupName();                String consumerName = pendingMessage.getConsumerName();                // 转组次数                long totalDeliveryCount = pendingMessage.getTotalDeliveryCount();                // 只消费转组次数大于1次的                if (totalDeliveryCount > 1) {                    try {                        RecordId id = pendingMessage.getId();                        //获取消息列表,会自动过滤已经删除的消息                        List<MapRecord<String, String, String>> result = redisStreamService.getMsgList(key, Range.rightOpen(id.toString(), id.toString()));                        MapRecord<String, String, String> entries = result.get(0);                        // 消费消息                        log.info("获取到转组的消息,消费了该消息id={}, 消息value={}, 消费者={}", entries.getId(), entries.getValue(),consumerName);                        //处理业务                        this.handleBusiness(key,entries.getValue());                        // 手动ack消息                        redisStreamService.ack(groupName, entries);                    } catch (Exception e) {                        // 异常处理                        e.printStackTrace();                    }                }            });        }    }    /**     * 描述: 处理业务     *     * @param key     * @param value     * @return void     * @author wangke     * @date   2022/4/20 17:35     */    private void handleBusiness(String key, Map<String, String> value) {        //根据key的不同选择不同的业务进行处理,同监听类中的业务处理方法        switch (key){            case RedisStreamConstants.MSG_PARSE_STREAM:                msgParseQueueService.saveMsgData(value);                redisStreamService.insertStreamAll(RedisStreamConstants.MSG_DATA_STREAM, value);                break;            case RedisStreamConstants.MSG_DATA_STREAM:                msgDataQueueService.sendMsg(value);                redisStreamService.insertStreamAll(RedisStreamConstants.MSG_RECORD_STREAM, value);                break;            case RedisStreamConstants.MSG_RECORD_STREAM:                msgRecordQueueService.saveMsgRecord(value);                break;            default:                break;        }    }}

3、相关服务

3.1、BaseQueueService
/** * @InterfaceName BaseQueueService * @Description 队列基础服务类 * @Author wk * @DATE 2022/4/21 14:35 * @Company 杭州震墨科技有限公司 **/public interface BaseQueueService {    /**     * 描述: 从Redis获取消息配置信息     *     * @param key     * @return com.zm.msg.model.MsgConfig     * @author wangke     * @date 2022/4/21 14:38     */    MsgConfig getMsgConfigByKey(String key);    /**     * 描述: 替换模板中的动态字段,返回组装好的消息内容     *     * @param msg 消息模板     * @param obj 模板消息数据实体     * @return     * @throws Exception 反射获取get,set方法失败     * @author wangke     * @date 2022/4/21 14:55     */    String buildContent(String msg, Object obj) throws Exception;}
3.2、MsgParseQueueService
/** * @InterfaceName MsgParseQueueService * @Description 消息解析队列服务类--待解析的消息 * @Author wk * @DATE 2022/4/21 10:15 * @Company 杭州震墨科技有限公司 **/public interface MsgParseQueueService {    /**     * 描述: 添加消息到parse_stream     *     * @param configKey config的key     * @return void     * @author wangke     * @date   2022/4/21 16:19     */    void saveMsgData(String configKey);    /**     * 描述: 添加消息到parse_stream     *     * @param value     * @return void     * @author wangke     * @date   2022/4/21 16:19     */    void saveMsgData(Map<String, String> value);        /**     * 描述: 解析数据,根据模板生成消息     *      * @param value     * @return 解析成功返回true     * @throws Exception 反射获取方法失败     * @author wangke     * @date   2022/4/21 13:53     */    Boolean parseMsgData(Map<String, String> value) throws Exception;}
3.3、MsgDataQueueService
/** * @InterfaceName MsgDataQueueService * @Description 消息数据队列服务类--解析后待发送的消息 * @Author wk * @DATE 2022/4/21 10:13 * @Company 杭州震墨科技有限公司 **/public interface MsgDataQueueService {    /**     * 描述: 添加消息到date_stream     *     * @param value     * @return void     * @author wangke     * @date   2022/4/21 16:19     */    void sendMsg(Map<String, String> value);    /**     * 描述: 添加消息到date_stream     *     * @param model msgConfigKey,msgContent,msgCreateUser,msgSendUser,sendDingDing     * @return void     * @throws Exception 创建消息参数缺失,未配置消息模板     * @author wangke     * @date   2022/4/21 16:19     */    void sendMsg(ContentVO model);    }
3.4、MsgRecordQueueService
/** * @InterfaceName MsgRecordQueueService * @Description 消息记录队列服务类--发送后的消息 * @Author wk * @DATE 2022/4/21 10:15 * @Company 杭州震墨科技有限公司 **/public interface MsgRecordQueueService {    /**     * 描述: 保存消息记录     *     * @param value 消息     * @return 记录成功返回true     * @throws Exception 分离用户id出错     * @author wangke     * @date 2022/4/22 8:37     */    Boolean saveMsgRecord(Map<String, String> value);    /**     * 描述: 记录发送失败的消息     *     * @param key stream_key     * @param recordId stream_id     * @return void     * @author wangke     * @date 2022/4/24 14:38     */    void saveErrorMsgRecord(String key, RecordId recordId);}
免责声明:本网信息来自于互联网,目的在于传递更多信息,并不代表本网赞同其观点。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,并请自行核实相关内容。本站不承担此类作品侵权行为的直接责任及连带责任。如若本网有任何内容侵犯您的权益,请及时联系我们,本站将会在24小时内处理完毕。
相关文章
返回顶部