1.RocketMQ 源码分析 — Message 发送与接收

1.RocketMQ 源码分析 — Message 发送与接收

1.概述

  • Producer 发送消息。主要是同步发送消息源码,涉及到 异步/Oneway发送消息,事务消息会跳过。
  • Broker 接收消息。

整体交互发送时序图如下:

2.Producer 发送消息:

/* * Instantiate with a producer group name. */DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
DefaultMQProducer#send(Message)
@Overridepublic SendResult send(    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {    Validators.checkMessage(msg, this);             //a    msg.setTopic(withNamespace(msg.getTopic()));    //b    return this.defaultMQProducerImpl.send(msg);    //c}
public static String wrapNamespace(String namespace, String resourceWithOutNamespace) {        if (StringUtils.isEmpty(namespace) || StringUtils.isEmpty(resourceWithOutNamespace)) {            return resourceWithOutNamespace;        }        if (isSystemResource(resourceWithOutNamespace) || isAlreadyWithNamespace(resourceWithOutNamespace, namespace)) {            return resourceWithOutNamespace;        }        String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithOutNamespace);        StringBuffer strBuffer = new StringBuffer();        if (isRetryTopic(resourceWithOutNamespace)) {   //判断是否为重试队列            strBuffer.append(MixAll.RETRY_GROUP_TOPIC_PREFIX);        }        if (isDLQTopic(resourceWithOutNamespace)) {     //判断是否为死信队列            strBuffer.append(MixAll.DLQ_GROUP_TOPIC_PREFIX);        }        return strBuffer.append(namespace).append(NAMESPACE_SEPARATOR).append(resourceWithoutRetryAndDLQ).toString();}

以上源码:

a处验证消息和topic是否为空

b处通过nameSpace进行判断该topic是否为特定的消息类型(重试消息或者死信消息类型的TOPIC)开头,若是并且对应的NameSpace不为空,在原来的topic基础上拼接namespace。NamespaceUtil#wrapNamespace()方法

c处发送同步消息,DefaultMQProducer#send(Message) 对 DefaultMQProducerImpl#send(Message) 进行封装。

DefaultMQProducerImpl#send()
/** * DEFAULT SYNC ------------------------------------------------------- */public SendResult send(    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {    return send(msg, this.defaultMQProducer.getSendMsgTimeout());}public SendResult send(Message msg,    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);}private SendResult sendDefaultImpl(    Message msg,    final CommunicationMode communicationMode,    final SendCallback sendCallback,    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {    // 校验 Producer 处于运行状态    this.makeSureStateOK();    // 校验消息格式    Validators.checkMessage(msg, this.defaultMQProducer);    // 调用编号;用于下面打印日志,标记为同一次发送消息    final long invokeID = random.nextLong();    long beginTimestampFirst = System.currentTimeMillis();    long beginTimestampPrev = beginTimestampFirst;    long endTimestamp = beginTimestampFirst;    // 获取 Topic路由信息     <a>    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());    if (topicPublishInfo != null && topicPublishInfo.ok()) {        boolean callTimeout = false;        MessageQueue mq = null;         // 最后选择消息要发送到的队列实例        Exception exception = null;        SendResult sendResult = null;   // 最后一次发送结果        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;   // 同步几次调用,同步和异步情况默认为3次        int times = 0;                                  //第几次发送        String[] brokersSent = new String[timesTotal];  // 存储每次发送消息选择的broker名        // 循环调用timesTotal次数发送消息,直到成功        for (; times < timesTotal; times++) {            String lastBrokerName = null == mq ? null : mq.getBrokerName();            // 选择消息要发送到的队列   <b>            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);            if (mqSelected != null) {                mq = mqSelected;                brokersSent[times] = mq.getBrokerName();                try {                    beginTimestampPrev = System.currentTimeMillis();                    if (times > 0) {                        //Reset topic with namespace during resend.                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));                    }                    long costTime = beginTimestampPrev - beginTimestampFirst;                    if (timeout < costTime) {                        callTimeout = true;                        break;                    }                        // 调用发送消息核心方法     <c>                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);                    endTimestamp = System.currentTimeMillis();                    // 更新Broker可用性信息,在选择发送到的消息队列时,会参考Broker发送消息的延迟                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);                    switch (communicationMode) {                        case ASYNC:                            return null;                        case ONEWAY:                            return null;                        case SYNC:                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {                                    continue;                                }                            }                            return sendResult;                        default:                            break;                    }                } catch (RemotingException e) {// 打印异常,更新Broker可用性信息,更新继续循环                    //当抛出RemotingException时,如果进行消息发送失败重试,则可能导致消息发送重复。例如,发送消息超时(RemotingTimeoutException),实际Broker接收到该消息并处理成功。因此,Consumer在消费时,需要保证幂等性。                    endTimestamp = System.currentTimeMillis();                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                    log.warn(msg.toString());                    exception = e;                    continue;                } catch (MQClientException e) {// 打印异常,更新Broker可用性信息,继续循环                    endTimestamp = System.currentTimeMillis();                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                    log.warn(msg.toString());                    exception = e;                    continue;                } catch (MQBrokerException e) {// 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环                    endTimestamp = System.currentTimeMillis();                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);                    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                    log.warn(msg.toString());                    exception = e;                    switch (e.getResponseCode()) {                        //以下几种情况全部继续重试发送消息                        case ResponseCode.TOPIC_NOT_EXIST:                        case ResponseCode.SERVICE_NOT_AVAILABLE:                        case ResponseCode.SYSTEM_ERROR:                        case ResponseCode.NO_PERMISSION:                        case ResponseCode.NO_BUYER_ID:                        case ResponseCode.NOT_IN_CURRENT_UNIT:                            continue;                        default:// 如果有发送结果,进行返回,否则,抛出异常                            if (sendResult != null) {                                return sendResult;                            }                            throw e;                    }                } catch (InterruptedException e) {                    endTimestamp = System.currentTimeMillis();                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);                    log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);                    log.warn(msg.toString());                    log.warn("sendKernelImpl exception", e);                    log.warn(msg.toString());                    throw e;                }            } else {                break;            }        }        // 若发送结果不为空,返回发送结果        if (sendResult != null) {            return sendResult;        }        // 根据不同情况,抛出不同的异常        String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",            times,            System.currentTimeMillis() - beginTimestampFirst,            msg.getTopic(),            Arrays.toString(brokersSent));        info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);        MQClientException mqClientException = new MQClientException(info, exception);        if (callTimeout) {            throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");        }        if (exception instanceof MQBrokerException) {            mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());        } else if (exception instanceof RemotingConnectException) {            mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);        } else if (exception instanceof RemotingTimeoutException) {            mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);        } else if (exception instanceof MQClientException) {            mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);        }        throw mqClientException;    }    // Namesrv找不到异常    List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();    if (null == nsList || nsList.isEmpty()) {        throw new MQClientException(            "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);    }    // 消息路由找不到异常    throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);}

以上两个send方法对sendsendDefaultImpl封装。

<b>处调用MQFaultStrategy#selectOneMessageQueue()方法。<c>处调用发送消息核心方法。

继续深入<a>处获取Topic路由信息方法 :

DefaultMQProducerImpl#tryToFindTopicPublishInfo()
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {    // 从缓存中获取 Topic发布信息    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);    // 当无可用的 Topic发布信息时,从Namesrv获取一次    if (null == topicPublishInfo || !topicPublishInfo.ok()) {        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);        topicPublishInfo = this.topicPublishInfoTable.get(topic);    }    // 若获取的 Topic发布信息时候可用,则返回    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {        return topicPublishInfo;    } else {        //当从 Namesrv 无法获取时,DefaultMQProducer#createTopicKey对应的Topic发布信息。目的是当 Broker 开启自动创建 Topic开关时,Broker 接收到消息后自动创建Topic        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);        topicPublishInfo = this.topicPublishInfoTable.get(topic);        return topicPublishInfo;    }}

获得 Topic发布信息。优先从缓存topicPublishInfoTable,若获取不到则从Namesrv中获得。

继续深入<b>处选择消息要发送到的队列,MQFaultStrategy类MQFaultStrategy的类图:

MQFaultStrategy
public class MQFaultStrategy {    private final static InternalLogger log = ClientLogger.getLog();    // 延迟故障容错,维护每个Broker的发送消息的延迟    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();    // 发送消息延迟容错开关    private boolean sendLatencyFaultEnable = false;    // 延迟级别数组    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};    // 不可用时长数组    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};    public long[] getNotAvailableDuration() {        return notAvailableDuration;    }    public void setNotAvailableDuration(final long[] notAvailableDuration) {        this.notAvailableDuration = notAvailableDuration;    }    public long[] getLatencyMax() {        return latencyMax;    }    public void setLatencyMax(final long[] latencyMax) {        this.latencyMax = latencyMax;    }    public boolean isSendLatencyFaultEnable() {        return sendLatencyFaultEnable;    }    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {        this.sendLatencyFaultEnable = sendLatencyFaultEnable;    }    /**     * @Description 根据Topic路由信息 选择一个消息队列     * @param       tpInfo     * @param       lastBrokerName     * @return      org.apache.rocketmq.common.message.MessageQueue     **/    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {        if (this.sendLatencyFaultEnable) {            try {                // 获取 brokerName=lastBrokerName 并且 可用的一个消息队列                int index = tpInfo.getSendWhichQueue().getAndIncrement();                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();                    if (pos < 0)                        pos = 0;                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))                            return mq;                    }                }                // 选择一个相对好的broker,并获得其对应的一个消息队列,不考虑该队列的可用性                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);                if (writeQueueNums > 0) {                    final MessageQueue mq = tpInfo.selectOneMessageQueue();                    if (notBestBroker != null) {                        mq.setBrokerName(notBestBroker);                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);                    }                    return mq;                } else {                    latencyFaultTolerance.remove(notBestBroker);                }            } catch (Exception e) {                log.error("Error occurred when selecting message queue", e);            }            // 选择一个消息队列,不考虑队列的可用性            return tpInfo.selectOneMessageQueue();        }        // 获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性,未开启容错策略选择消息队列逻辑        return tpInfo.selectOneMessageQueue(lastBrokerName);    }    /**     * @Description 更新延迟容错信息     * @param       brokerName     * @param       currentLatency     * @param       isolation     * @return      void     **/    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {        if (this.sendLatencyFaultEnable) {            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);        }    }    /**     * @Description 计算延迟对应的不可用时间     * @param       currentLatency     * @return      long     **/    private long computeNotAvailableDuration(final long currentLatency) {        for (int i = latencyMax.length - 1; i >= 0; i--) {            if (currentLatency >= latencyMax[i])                return this.notAvailableDuration[i];        }        return 0;    }}

Producer消息发送容错策略。默认情况下容错策略关闭,即sendLatencyFaultEnable=false。

若开启容错的策略。优先获取可用队列,其次选择一个broker获取队列,最差返回任意broker的一个队列。

#updateFaultItem方法更新延迟容错信息。当 Producer发送消息时间过长,则逻辑认为N秒内不可用。按照latencyMax,notAvailableDuration的配置,对应如下:

producer发送消息消耗时长Broker不可用时长>= 15000 ms600000ms>= 3000 ms180000 ms>= 2000 ms120000 ms>= 1000 ms60000 ms>= 550 ms30000 ms>= 100 ms0 ms>= 50 ms0 ms

继续看延迟故障容错的接口和实现:

LatencyFaultTolerance
//延迟故障容错接口public interface LatencyFaultTolerance<T> {    /**     * 更新对应的延迟和不可用时长     * @param name  对象     * @param currentLatency 延迟     * @param notAvailableDuration  不可用时长     */    void updateFaultItem(final T name, final long currentLatency, final long notAvailableDuration);    /**     * 对象是否可用     * @param name  对象     * @return  对象     */    boolean isAvailable(final T name);    /**     * 移除对象     * @param name 对象     */    void remove(final T name);    /**     * 获取一个对象     * @return  对象     */    T pickOneAtLeast();}
LatencyFaultToleranceImpl
// 延迟故障容错实现。维护每个对象的信息public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {    // 对象故障信息Table    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);    // 对象选择Index    private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex();    @Override    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {        FaultItem old = this.faultItemTable.get(name);        if (null == old) {            final FaultItem faultItem = new FaultItem(name);            faultItem.setCurrentLatency(currentLatency);            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);            old = this.faultItemTable.putIfAbsent(name, faultItem);            if (old != null) {                old.setCurrentLatency(currentLatency);                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);            }        } else {            old.setCurrentLatency(currentLatency);            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);        }    }    @Override    public boolean isAvailable(final String name) {        final FaultItem faultItem = this.faultItemTable.get(name);        if (faultItem != null) {            return faultItem.isAvailable();        }        return true;    }    @Override    public void remove(final String name) {        this.faultItemTable.remove(name);    }    /**     * 选择一个相对优秀的对象     * @return     */    @Override    public String pickOneAtLeast() {        // 创建数组        final Enumeration<FaultItem> elements = this.faultItemTable.elements();        List<FaultItem> tmpList = new LinkedList<FaultItem>();        while (elements.hasMoreElements()) {            final FaultItem faultItem = elements.nextElement();            tmpList.add(faultItem);        }        if (!tmpList.isEmpty()) {            //先打乱再排序            Collections.shuffle(tmpList);            Collections.sort(tmpList);            // 选择顺序在前一半的对象            final int half = tmpList.size() / 2;            if (half <= 0) {                return tmpList.get(0).getName();            } else {                final int i = this.whichItemWorst.getAndIncrement() % half;                return tmpList.get(i).getName();            }        }        return null;    }    @Override    public String toString() {        return "LatencyFaultToleranceImpl{" +            "faultItemTable=" + faultItemTable +            ", whichItemWorst=" + whichItemWorst +            '}';    }    // 对象故障信息。维护对象的名字、延迟、开始可用的时间    class FaultItem implements Comparable<FaultItem> {        // 对象名        private final String name;        // 延迟        private volatile long currentLatency;        // 开始可用时间        private volatile long startTimestamp;        public FaultItem(final String name) {            this.name = name;        }        /**         * 比较对象         * 可用性 > 延迟 > 开始可以时间         * @param other         * @return         */        @Override        public int compareTo(final FaultItem other) {            if (this.isAvailable() != other.isAvailable()) {                if (this.isAvailable())                    return -1;                if (other.isAvailable())                    return 1;            }            if (this.currentLatency < other.currentLatency)                return -1;            else if (this.currentLatency > other.currentLatency) {                return 1;            }            if (this.startTimestamp < other.startTimestamp)                return -1;            else if (this.startTimestamp > other.startTimestamp) {                return 1;            }            return 0;        }        /**         * 是否可用:当开始可用时间大于当前时间         * @return         */        public boolean isAvailable() {            return (System.currentTimeMillis() - startTimestamp) >= 0;        }        @Override        public int hashCode() {            int result = getName() != null ? getName().hashCode() : 0;            result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32));            result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32));            return result;        }        @Override        public boolean equals(final Object o) {            if (this == o)                return true;            if (!(o instanceof FaultItem))                return false;            final FaultItem faultItem = (FaultItem) o;            if (getCurrentLatency() != faultItem.getCurrentLatency())                return false;            if (getStartTimestamp() != faultItem.getStartTimestamp())                return false;            return getName() != null ? getName().equals(faultItem.getName()) : faultItem.getName() == null;        }        @Override        public String toString() {            return "FaultItem{" +                "name='" + name + '\'' +                ", currentLatency=" + currentLatency +                ", startTimestamp=" + startTimestamp +                '}';        }        public String getName() {            return name;        }        public long getCurrentLatency() {            return currentLatency;        }        public void setCurrentLatency(final long currentLatency) {            this.currentLatency = currentLatency;        }        public long getStartTimestamp() {            return startTimestamp;        }        public void setStartTimestamp(final long startTimestamp) {            this.startTimestamp = startTimestamp;        }    }}

继续看调用发送消息核心方法DefaultMQProducerImpl#sendKernelImpl()方法:

DefaultMQProducerImpl#sendKernelImpl()
// 发送消息核心方法。该方法真正发起网络请求,发送消息给 Broker。private SendResult sendKernelImpl(final Message msg,                                      final MessageQueue mq,                                      final CommunicationMode communicationMode,                                      final SendCallback sendCallback,                                      final TopicPublishInfo topicPublishInfo,                                      final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        long beginStartTime = System.currentTimeMillis();        // 获取broker地址        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());        if (null == brokerAddr) {            tryToFindTopicPublishInfo(mq.getTopic());            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());        }        SendMessageContext context = null;        if (brokerAddr != null) {            // 是否使用broker vip通道,broker会开启两个端口对外服务            brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);            byte[] prevBody = msg.getBody();            try {                //对于MessageBatch,已在生成过程中设置了ID                if (!(msg instanceof MessageBatch)) {                    // 若不是批量发送则设置唯一编号                    MessageClientIDSetter.setUniqID(msg);                }                boolean topicWithNamespace = false;                if (null != this.mQClientFactory.getClientConfig().getNamespace()) {                    msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());                    topicWithNamespace = true;                }                // 消息压缩                int sysFlag = 0;                boolean msgBodyCompressed = false;                if (this.tryToCompressMessage(msg)) {                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;                    msgBodyCompressed = true;                }                // 事务消息                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;                }                // 发送消息校验                if (hasCheckForbiddenHook()) {                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());                    checkForbiddenContext.setCommunicationMode(communicationMode);                    checkForbiddenContext.setBrokerAddr(brokerAddr);                    checkForbiddenContext.setMessage(msg);                    checkForbiddenContext.setMq(mq);                    checkForbiddenContext.setUnitMode(this.isUnitMode());                    this.executeCheckForbiddenHook(checkForbiddenContext);                }                // 发送消息前逻辑                if (this.hasSendMessageHook()) {                    context = new SendMessageContext();                    context.setProducer(this);                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());                    context.setCommunicationMode(communicationMode);                    context.setBornHost(this.defaultMQProducer.getClientIP());                    context.setBrokerAddr(brokerAddr);                    context.setMessage(msg);                    context.setMq(mq);                    context.setNamespace(this.defaultMQProducer.getNamespace());                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);                    if (isTrans != null && isTrans.equals("true")) {                        context.setMsgType(MessageType.Trans_Msg_Half);                    }                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {                        context.setMsgType(MessageType.Delay_Msg);                    }                    this.executeSendMessageHookBefore(context);                }                // 构建发送消息请求                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());                requestHeader.setTopic(msg.getTopic());                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());                requestHeader.setQueueId(mq.getQueueId());                requestHeader.setSysFlag(sysFlag);                requestHeader.setBornTimestamp(System.currentTimeMillis());                requestHeader.setFlag(msg.getFlag());                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));                requestHeader.setReconsumeTimes(0);                requestHeader.setUnitMode(this.isUnitMode());                requestHeader.setBatch(msg instanceof MessageBatch);                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);                    if (reconsumeTimes != null) {                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);                    }                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);                    if (maxReconsumeTimes != null) {                        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);                    }                }                // 跟据不同的消息模式发送消息                SendResult sendResult = null;                switch (communicationMode) {                    case ASYNC:                        Message tmpMessage = msg;                        boolean messageCloned = false;                        if (msgBodyCompressed) {                            //If msg body was compressed, msgbody should be reset using prevBody.                            //Clone new message using commpressed message body and recover origin massage.                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66                            tmpMessage = MessageAccessor.cloneMessage(msg);                            messageCloned = true;                            msg.setBody(prevBody);                        }                        if (topicWithNamespace) {                            if (!messageCloned) {                                tmpMessage = MessageAccessor.cloneMessage(msg);                                messageCloned = true;                            }                            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));                        }                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;                        if (timeout < costTimeAsync) {                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");                        }                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(                            brokerAddr,                            mq.getBrokerName(),                            tmpMessage,                            requestHeader,                            timeout - costTimeAsync,                            communicationMode,                            sendCallback,                            topicPublishInfo,                            this.mQClientFactory,                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),                            context,                            this);                        break;                    case ONEWAY:                    case SYNC:                        long costTimeSync = System.currentTimeMillis() - beginStartTime;                        if (timeout < costTimeSync) {                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");                        }                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(                            brokerAddr,                            mq.getBrokerName(),                            msg,                            requestHeader,                            timeout - costTimeSync,                            communicationMode,                            context,                            this);                        break;                    default:                        assert false;                        break;                }                // 发送消息后置逻辑                if (this.hasSendMessageHook()) {                    context.setSendResult(sendResult);                    this.executeSendMessageHookAfter(context);                }                // 返回发送结果                return sendResult;            } catch (RemotingException e) {                if (this.hasSendMessageHook()) {                    context.setException(e);                    this.executeSendMessageHookAfter(context);                }                throw e;            } catch (MQBrokerException e) {                if (this.hasSendMessageHook()) {                    context.setException(e);                    this.executeSendMessageHookAfter(context);                }                throw e;            } catch (InterruptedException e) {                if (this.hasSendMessageHook()) {                    context.setException(e);                    this.executeSendMessageHookAfter(context);                }                throw e;            } finally {                msg.setBody(prevBody);                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));            }        }        // broker为空抛出异常        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);    }

3.Broker接收消息

接收时序图:

对应的类图:

SendMessageProcessor#sendMessage
//处理接收的消息请求@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx,                                      RemotingCommand request) throws RemotingCommandException {    SendMessageContext mqtraceContext;    switch (request.getCode()) {        case RequestCode.CONSUMER_SEND_MSG_BACK:            return this.consumerSendMsgBack(ctx, request);        default:            //解析请求            SendMessageRequestHeader requestHeader = parseRequestHeader(request);            if (requestHeader == null) {                return null;            }            // 发送Context,在hook场景下使用            mqtraceContext = buildMsgContext(ctx, requestHeader);            // hook:处理发送消息前逻辑            this.executeSendMessageHookBefore(ctx, request, mqtraceContext);            RemotingCommand response;            // 处理发送消息逻辑            if (requestHeader.isBatch()) {                response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);            } else {                response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);            }            // hook:处理发送消息后逻辑            this.executeSendMessageHookAfter(response, mqtraceContext);            return response;    }}// 发送消息,并返回发送消息结果private RemotingCommand sendMessage(final ChannelHandlerContext ctx,                                        final RemotingCommand request,                                        final SendMessageContext sendMessageContext,                                        final SendMessageRequestHeader requestHeader) throws RemotingCommandException {    //初始化创建响应    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();    response.setOpaque(request.getOpaque());    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());    response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));    log.debug("receive SendMessage request command, {}", request);    // 如果未开始接收消息,抛出系统异常    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();    if (this.brokerController.getMessageStore().now() < startTimstamp) {        response.setCode(ResponseCode.SYSTEM_ERROR);        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));        return response;    }    // 消息配置(Topic配置)校验         <a>    response.setCode(-1);    super.msgCheck(ctx, requestHeader, response);    if (response.getCode() != -1) {        return response;    }    final byte[] body = request.getBody();    // 如果队列编号小于0,从可用队列随机选择    int queueIdInt = requestHeader.getQueueId();    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());    if (queueIdInt < 0) {        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();    }    // 创建MessageExtBrokerInner    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();    msgInner.setTopic(requestHeader.getTopic());    msgInner.setQueueId(queueIdInt);    //处理是否重试和死信    if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {        return response;    }    msgInner.setBody(body);    msgInner.setFlag(requestHeader.getFlag());    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));    msgInner.setPropertiesString(requestHeader.getProperties());    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());    msgInner.setBornHost(ctx.channel().remoteAddress());    msgInner.setStoreHost(this.getStoreHost());    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());    PutMessageResult putMessageResult = null;    Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());    // 校验是否不允许发送事务消息    String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);    if (traFlag != null && Boolean.parseBoolean(traFlag)) {        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {            response.setCode(ResponseCode.NO_PERMISSION);            response.setRemark(                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()                    + "] sending transaction message is forbidden");            return response;        }        putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);    } else {        putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);      // <b>    }    // 添加消息    return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);}private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,                                      RemotingCommand request,                                      MessageExt msg, TopicConfig topicConfig) {    // 对RETRY类型的消息处理。如果超过最大消费次数,则topic修改"%DLQ%" + 分组名,即加入死信队列(Dead Letter Queue)    String newTopic = requestHeader.getTopic();    if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {        // 获取订阅分组配置        String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());        SubscriptionGroupConfig subscriptionGroupConfig =            this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);        if (null == subscriptionGroupConfig) {            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);            response.setRemark(                "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));            return false;        }        // 计算最大可消费次数        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();        }        int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();        if (reconsumeTimes >= maxReconsumeTimes) {      // 超过最大消费次数            newTopic = MixAll.getDLQTopic(groupName);            int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,                DLQ_NUMS_PER_GROUP,                PermName.PERM_WRITE, 0            );            msg.setTopic(newTopic);            msg.setQueueId(queueIdInt);            if (null == topicConfig) {                response.setCode(ResponseCode.SYSTEM_ERROR);                response.setRemark("topic[" + newTopic + "] not exist");                return false;            }        }    }    int sysFlag = requestHeader.getSysFlag();    if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {        sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;    }    msg.setSysFlag(sysFlag);    return true;}private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response,                                                   RemotingCommand request, MessageExt msg,                                                   SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx,                                                   int queueIdInt) {    if (putMessageResult == null) {        response.setCode(ResponseCode.SYSTEM_ERROR);        response.setRemark("store putMessage return null");        return response;    }    boolean sendOK = false;    switch (putMessageResult.getPutMessageStatus()) {        // Success        case PUT_OK:            sendOK = true;            response.setCode(ResponseCode.SUCCESS);            break;        case FLUSH_DISK_TIMEOUT:            response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);            sendOK = true;            break;        case FLUSH_SLAVE_TIMEOUT:            response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);            sendOK = true;            break;        case SLAVE_NOT_AVAILABLE:            response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);            sendOK = true;            break;        // Failed        case CREATE_MAPEDFILE_FAILED:            response.setCode(ResponseCode.SYSTEM_ERROR);            response.setRemark("create mapped file failed, server is busy or broken.");            break;        case MESSAGE_ILLEGAL:        case PROPERTIES_SIZE_EXCEEDED:            response.setCode(ResponseCode.MESSAGE_ILLEGAL);            response.setRemark(                "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");            break;        case SERVICE_NOT_AVAILABLE:            response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);            response.setRemark(                "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");            break;        case OS_PAGECACHE_BUSY:            response.setCode(ResponseCode.SYSTEM_ERROR);            response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");            break;        case UNKNOWN_ERROR:            response.setCode(ResponseCode.SYSTEM_ERROR);            response.setRemark("UNKNOWN_ERROR");            break;        default:            response.setCode(ResponseCode.SYSTEM_ERROR);            response.setRemark("UNKNOWN_ERROR DEFAULT");            break;    }    String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);    if (sendOK) {        // 统计        this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);        this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),            putMessageResult.getAppendMessageResult().getWroteBytes());        this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());        // 响应        response.setRemark(null);        responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());        responseHeader.setQueueId(queueIdInt);        responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());        doResponse(ctx, request, response);        // hook:设置发送成功到context        if (hasSendMessageHook()) {            sendMessageContext.setMsgId(responseHeader.getMsgId());            sendMessageContext.setQueueId(responseHeader.getQueueId());            sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());            int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();            int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();            int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);            sendMessageContext.setCommercialSendTimes(incValue);            sendMessageContext.setCommercialSendSize(wroteSize);            sendMessageContext.setCommercialOwner(owner);        }                // 响应给 Producer 可能发生异常,#doResponse(ctx, request, response)已经进行返回。若发生异常直接打印日志方便排查 Broker 接收消息成功后响应是否存在异常        return null;    } else {        // hook:设置发送失败到context        if (hasSendMessageHook()) {            int wroteSize = request.getBody().length;            int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);            sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);            sendMessageContext.setCommercialSendTimes(incValue);            sendMessageContext.setCommercialSendSize(wroteSize);            sendMessageContext.setCommercialOwner(owner);        }    }    return response;}

sendMessage和sendBatchMessage处理逻辑基本一致,只是sendBatchMessage不支持对RETRY类型的消息的处理和事务消息处理。

继续深入<a>处的#msgCheck方法:

AbstractSendMessageProcessor#msgCheck
// 校验消息是否正确,主要是Topic配置方面,例如:Broker 是否有写入权限,topic配置是否存在,队列编号是否正确。protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,    final SendMessageRequestHeader requestHeader, final RemotingCommand response) {    // 检查 broker 是否有写入权限    if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())        && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {        response.setCode(ResponseCode.NO_PERMISSION);        response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()            + "] sending message is forbidden");        return response;    }    // 检查topic是否可以被发送。目前是{@link MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC}不被允许发送    if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {        String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";        log.warn(errorMsg);        response.setCode(ResponseCode.SYSTEM_ERROR);        response.setRemark(errorMsg);        return response;    }    TopicConfig topicConfig =        this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());    if (null == topicConfig) { // 不存在topicConfig,则进行创建        int topicSysFlag = 0;        if (requestHeader.isUnitMode()) {            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);            } else {                topicSysFlag = TopicSysFlag.buildSysFlag(true, false);            }        }        log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());        // 跟据requestHeader和ctx创建topic配置        topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(            requestHeader.getTopic(),            requestHeader.getDefaultTopic(),            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),            requestHeader.getDefaultTopicQueueNums(), topicSysFlag);        if (null == topicConfig) {            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {                topicConfig =                    this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(                        requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,                        topicSysFlag);            }        }        // 若还是没配置,则报错提示进行手动创建topic配置        //创建会存在不成功的情况,例如说:defaultTopic的Topic配置不存在,又或者存在但是不允许继承        if (null == topicConfig) {            response.setCode(ResponseCode.TOPIC_NOT_EXIST);            response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"                + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));            return response;        }    }    // 队列编号是否正确    int queueIdInt = requestHeader.getQueueId();    int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());    if (queueIdInt >= idValid) {        String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",            queueIdInt,            topicConfig.toString(),            RemotingHelper.parseChannelRemoteAddr(ctx.channel()));        log.warn(errorInfo);        response.setCode(ResponseCode.SYSTEM_ERROR);        response.setRemark(errorInfo);        return response;    }    return response;}

校验消息是否正确,主要是Topic配置方面,例如:Broker 是否有写入权限,topic配置是否存在,队列编号是否正确。

继续深入<b>处DefaultMessageStore#putMessage方法:

DefaultMessageStore#putMessage
/** * @Description 存储消息封装,最终存储需要 CommitLog 实现。 * @param       msg * @return      org.apache.rocketmq.store.PutMessageResult **/public PutMessageResult putMessage(MessageExtBrokerInner msg) {    if (this.shutdown) {        log.warn("message store has shutdown, so putMessage is forbidden");        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);    }    // 从节点不允许写入    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {        long value = this.printTimes.getAndIncrement();        if ((value % 50000) == 0) {            log.warn("message store is slave mode, so putMessage is forbidden ");        }        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);    }    // store是否允许写入    if (!this.runningFlags.isWriteable()) {        long value = this.printTimes.getAndIncrement();        if ((value % 50000) == 0) {            log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());        }        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);    } else {        this.printTimes.set(0);    }    // 消息的topic过长    if (msg.getTopic().length() > Byte.MAX_VALUE) {        log.warn("putMessage message topic length too long " + msg.getTopic().length());        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);    }    // 消息的附加属性过长    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);    }    // 操作系统缓存页是否繁忙    if (this.isOSPageCacheBusy()) {        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);    }    long beginTime = this.getSystemClock().now();    // 添加消息到commitLog中    PutMessageResult result = this.commitLog.putMessage(msg);    long eclipseTime = this.getSystemClock().now() - beginTime;    if (eclipseTime > 500) {        log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);    }    this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);    if (null == result || !result.isOk()) {        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();    }    return result;}

存储消息的封装,最终存储需要从CommitLog 实现。

免责声明:本网信息来自于互联网,目的在于传递更多信息,并不代表本网赞同其观点。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,并请自行核实相关内容。本站不承担此类作品侵权行为的直接责任及连带责任。如若本网有任何内容侵犯您的权益,请及时联系我们,本站将会在24小时内处理完毕。
相关文章
返回顶部