网络协议之:基于UDP的高速数据传输
887 2023-04-03 03:57:28
整体交互发送时序图如下:
/*
 * Create the producer, identified by its producer group name.
 * The literal below is a placeholder name meant to be replaced by a unique group name.
 */
final DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
@Overridepublic SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { Validators.checkMessage(msg, this); //a msg.setTopic(withNamespace(msg.getTopic())); //b return this.defaultMQProducerImpl.send(msg); //c}
public static String wrapNamespace(String namespace, String resourceWithOutNamespace) { if (StringUtils.isEmpty(namespace) || StringUtils.isEmpty(resourceWithOutNamespace)) { return resourceWithOutNamespace; } if (isSystemResource(resourceWithOutNamespace) || isAlreadyWithNamespace(resourceWithOutNamespace, namespace)) { return resourceWithOutNamespace; } String resourceWithoutRetryAndDLQ = withOutRetryAndDLQ(resourceWithOutNamespace); StringBuffer strBuffer = new StringBuffer(); if (isRetryTopic(resourceWithOutNamespace)) { //判断是否为重试队列 strBuffer.append(MixAll.RETRY_GROUP_TOPIC_PREFIX); } if (isDLQTopic(resourceWithOutNamespace)) { //判断是否为死信队列 strBuffer.append(MixAll.DLQ_GROUP_TOPIC_PREFIX); } return strBuffer.append(namespace).append(NAMESPACE_SEPARATOR).append(resourceWithoutRetryAndDLQ).toString();}
以上源码:
a处验证消息和topic是否为空
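b处通过 withNamespace() 为 topic 拼接 namespace,内部调用 NamespaceUtil#wrapNamespace() 方法。当 namespace 与 topic 均不为空,且该 topic 既不是系统资源、也尚未带有该 namespace 时,会在原 topic 前拼接"namespace + 分隔符"。若原 topic 是重试类型或死信类型的 TOPIC(以 RETRY_GROUP_TOPIC_PREFIX / DLQ_GROUP_TOPIC_PREFIX 开头),这些前缀会被保留在拼接结果的最前面。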
c处发送同步消息,DefaultMQProducer#send(Message) 对 DefaultMQProducerImpl#send(Message) 进行封装。
/** * DEFAULT SYNC ------------------------------------------------------- */public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return send(msg, this.defaultMQProducer.getSendMsgTimeout());}public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);}private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 校验 Producer 处于运行状态 this.makeSureStateOK(); // 校验消息格式 Validators.checkMessage(msg, this.defaultMQProducer); // 调用编号;用于下面打印日志,标记为同一次发送消息 final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; // 获取 Topic路由信息 <a> TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; // 最后选择消息要发送到的队列实例 Exception exception = null; SendResult sendResult = null; // 最后一次发送结果 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 同步几次调用,同步和异步情况下默认为3次 int times = 0; //第几次发送 String[] brokersSent = new String[timesTotal]; // 存储每次发送消息选择的broker名 // 循环调用timesTotal次数发送消息,直到成功 for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? 
null : mq.getBrokerName(); // 选择消息要发送到的队列 <b> MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } // 调用发送消息核心方法 <c> sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); // 更新Broker可用性信息,在选择发送到的消息队列时,会参考Broker发送消息的延迟 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch (RemotingException e) {// 打印异常,更新Broker可用性信息,更新继续循环 //当抛出RemotingException时,如果进行消息发送失败重试,则可能导致消息发送重复。例如,发送消息超时(RemotingTimeoutException),实际Broker接收到该消息并处理成功。因此,Consumer在消费时,需要保证幂等性。 endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } catch (MQClientException e) {// 打印异常,更新Broker可用性信息,继续循环 endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; 
continue; } catch (MQBrokerException e) {// 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环 endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; switch (e.getResponseCode()) { //以下几种情况全部继续重试发送消息 case ResponseCode.TOPIC_NOT_EXIST: case ResponseCode.SERVICE_NOT_AVAILABLE: case ResponseCode.SYSTEM_ERROR: case ResponseCode.NO_PERMISSION: case ResponseCode.NO_BUYER_ID: case ResponseCode.NOT_IN_CURRENT_UNIT: continue; default:// 如果有发送结果,进行返回,否则,抛出异常 if (sendResult != null) { return sendResult; } throw e; } } catch (InterruptedException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); log.warn("sendKernelImpl exception", e); log.warn(msg.toString()); throw e; } } else { break; } } // 若发送结果不为空,返回发送结果 if (sendResult != null) { return sendResult; } // 根据不同情况,抛出不同的异常 String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst, msg.getTopic(), Arrays.toString(brokersSent)); info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED); MQClientException mqClientException = new MQClientException(info, exception); if (callTimeout) { throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); } if (exception instanceof MQBrokerException) { mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode()); } else if (exception instanceof RemotingConnectException) { mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION); } 
else if (exception instanceof RemotingTimeoutException) { mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT); } else if (exception instanceof MQClientException) { mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION); } throw mqClientException; } // Namesrv找不到异常 List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList(); if (null == nsList || nsList.isEmpty()) { throw new MQClientException( "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION); } // 消息路由找不到异常 throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);}
以上两个 send 方法都是对 sendDefaultImpl 的封装。
<b>处调用MQFaultStrategy#selectOneMessageQueue()方法。<c>处调用发送消息核心方法。
继续深入<a>处获取Topic路由信息方法 :
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { // 从缓存中获取 Topic发布信息 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); // 当无可用的 Topic发布信息时,从Namesrv获取一次 if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } // 若获取的 Topic发布信息时候可用,则返回 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { //当从 Namesrv 无法获取时,DefaultMQProducer#createTopicKey对应的Topic发布信息。目的是当 Broker 开启自动创建 Topic开关时,Broker 接收到消息后自动创建Topic this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; }}
获取 Topic 发布信息:优先从缓存 topicPublishInfoTable 中获取;若缓存中没有可用信息,则从 Namesrv 获取;若仍然没有路由信息,则再基于 DefaultMQProducer#createTopicKey 对应的默认 Topic 路由从 Namesrv 获取一次。
继续深入<b>处选择消息要发送到的队列,该逻辑由 MQFaultStrategy 类实现。MQFaultStrategy 的类图如下:
public class MQFaultStrategy { private final static InternalLogger log = ClientLogger.getLog(); // 延迟故障容错,维护每个Broker的发送消息的延迟 private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl(); // 发送消息延迟容错开关 private boolean sendLatencyFaultEnable = false; // 延迟级别数组 private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; // 不可用时长数组 private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; public long[] getNotAvailableDuration() { return notAvailableDuration; } public void setNotAvailableDuration(final long[] notAvailableDuration) { this.notAvailableDuration = notAvailableDuration; } public long[] getLatencyMax() { return latencyMax; } public void setLatencyMax(final long[] latencyMax) { this.latencyMax = latencyMax; } public boolean isSendLatencyFaultEnable() { return sendLatencyFaultEnable; } public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) { this.sendLatencyFaultEnable = sendLatencyFaultEnable; } /** * @Description 根据Topic路由信息 选择一个消息队列 * @param tpInfo * @param lastBrokerName * @return org.apache.rocketmq.common.message.MessageQueue **/ public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { try { // 获取 brokerName=lastBrokerName 并且 可用的一个消息队列 int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } // 选择一个相对好的broker,并获得其对应的一个消息队列,不考虑该队列的可用性 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = 
tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } // 选择一个消息队列,不考虑队列的可用性 return tpInfo.selectOneMessageQueue(); } // 获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性,未开启容错策略选择消息队列逻辑 return tpInfo.selectOneMessageQueue(lastBrokerName); } /** * @Description 更新延迟容错信息 * @param brokerName * @param currentLatency * @param isolation * @return void **/ public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } } /** * @Description 计算延迟对应的不可用时间 * @param currentLatency * @return long **/ private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0; }}
Producer消息发送容错策略。默认情况下容错策略关闭,即sendLatencyFaultEnable=false。
若开启容错策略:优先选择 Broker 可用的队列;其次通过 pickOneAtLeast() 选出一个相对较好的 Broker 并获取其队列;最差情况下返回任意 Broker 的一个队列。
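#updateFaultItem 方法用于更新延迟容错信息:当 Producer 发送消息耗时过长时,逻辑上认为该 Broker 在接下来的一段时间内不可用。按照 latencyMax、notAvailableDuration 的配置,对应关系如下(发送异常需要隔离时,按 30000ms 的延迟计算,即不可用 600s):
- 延迟 < 550ms:0(立即可用)
- 550ms ≤ 延迟 < 1000ms:30s
- 1000ms ≤ 延迟 < 2000ms:60s
- 2000ms ≤ 延迟 < 3000ms:120s
- 3000ms ≤ 延迟 < 15000ms:180s
- 延迟 ≥ 15000ms:600s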
继续看延迟故障容错的接口和实现:
/**
 * Latency-based fault tolerance: tracks, per item, the last observed latency and how long the
 * item should be avoided, and answers whether an item may currently be used.
 *
 * @param <T> type of the tracked items (MQFaultStrategy uses broker names)
 */
public interface LatencyFaultTolerance<T> {
    /**
     * Records a new latency sample for {@code item} and marks it unavailable for
     * {@code notAvailableDuration} from now.
     *
     * @param item                 the tracked item
     * @param currentLatency       observed latency
     * @param notAvailableDuration how long the item should be considered unavailable
     */
    void updateFaultItem(T item, long currentLatency, long notAvailableDuration);

    /**
     * Tells whether {@code item} may currently be used.
     *
     * @param item the tracked item
     * @return {@code true} if the item is available
     */
    boolean isAvailable(T item);

    /**
     * Stops tracking {@code item}.
     *
     * @param item the tracked item
     */
    void remove(T item);

    /**
     * Picks a relatively good item to fall back on when none is available.
     *
     * @return an item; implementations may return {@code null} when nothing is tracked
     */
    T pickOneAtLeast();
}
// 延迟故障容错实现。维护每个对象的信息public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> { // 对象故障信息Table private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16); // 对象选择Index private final ThreadLocalIndex whichItemWorst = new ThreadLocalIndex(); @Override public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { FaultItem old = this.faultItemTable.get(name); if (null == old) { final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); old = this.faultItemTable.putIfAbsent(name, faultItem); if (old != null) { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } @Override public boolean isAvailable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { return faultItem.isAvailable(); } return true; } @Override public void remove(final String name) { this.faultItemTable.remove(name); } /** * 选择一个相对优秀的对象 * @return */ @Override public String pickOneAtLeast() { // 创建数组 final Enumeration<FaultItem> elements = this.faultItemTable.elements(); List<FaultItem> tmpList = new LinkedList<FaultItem>(); while (elements.hasMoreElements()) { final FaultItem faultItem = elements.nextElement(); tmpList.add(faultItem); } if (!tmpList.isEmpty()) { //先打乱再排序 Collections.shuffle(tmpList); Collections.sort(tmpList); // 选择顺序在前一半的对象 final int half = tmpList.size() / 2; if (half <= 0) { return tmpList.get(0).getName(); } else { final int i = this.whichItemWorst.getAndIncrement() % half; return tmpList.get(i).getName(); } } return null; } @Override public String toString() { return "LatencyFaultToleranceImpl{" + 
"faultItemTable=" + faultItemTable + ", whichItemWorst=" + whichItemWorst + '}'; } // 对象故障信息。维护对象的名字、延迟、开始可用的时间 class FaultItem implements Comparable<FaultItem> { // 对象名 private final String name; // 延迟 private volatile long currentLatency; // 开始可用时间 private volatile long startTimestamp; public FaultItem(final String name) { this.name = name; } /** * 比较对象 * 可用性 > 延迟 > 开始可以时间 * @param other * @return */ @Override public int compareTo(final FaultItem other) { if (this.isAvailable() != other.isAvailable()) { if (this.isAvailable()) return -1; if (other.isAvailable()) return 1; } if (this.currentLatency < other.currentLatency) return -1; else if (this.currentLatency > other.currentLatency) { return 1; } if (this.startTimestamp < other.startTimestamp) return -1; else if (this.startTimestamp > other.startTimestamp) { return 1; } return 0; } /** * 是否可用:当开始可用时间大于当前时间 * @return */ public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; } @Override public int hashCode() { int result = getName() != null ? getName().hashCode() : 0; result = 31 * result + (int) (getCurrentLatency() ^ (getCurrentLatency() >>> 32)); result = 31 * result + (int) (getStartTimestamp() ^ (getStartTimestamp() >>> 32)); return result; } @Override public boolean equals(final Object o) { if (this == o) return true; if (!(o instanceof FaultItem)) return false; final FaultItem faultItem = (FaultItem) o; if (getCurrentLatency() != faultItem.getCurrentLatency()) return false; if (getStartTimestamp() != faultItem.getStartTimestamp()) return false; return getName() != null ? 
getName().equals(faultItem.getName()) : faultItem.getName() == null; } @Override public String toString() { return "FaultItem{" + "name='" + name + '\'' + ", currentLatency=" + currentLatency + ", startTimestamp=" + startTimestamp + '}'; } public String getName() { return name; } public long getCurrentLatency() { return currentLatency; } public void setCurrentLatency(final long currentLatency) { this.currentLatency = currentLatency; } public long getStartTimestamp() { return startTimestamp; } public void setStartTimestamp(final long startTimestamp) { this.startTimestamp = startTimestamp; } }}
继续看调用发送消息核心方法DefaultMQProducerImpl#sendKernelImpl()方法:
// 发送消息核心方法。该方法真正发起网络请求,发送消息给 Broker。private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); // 获取broker地址 String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } SendMessageContext context = null; if (brokerAddr != null) { // 是否使用broker vip通道,broker会开启两个端口对外服务 brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); byte[] prevBody = msg.getBody(); try { //对于MessageBatch,已在生成过程中设置了ID if (!(msg instanceof MessageBatch)) { // 若不是批量发送则设置唯一编号 MessageClientIDSetter.setUniqID(msg); } boolean topicWithNamespace = false; if (null != this.mQClientFactory.getClientConfig().getNamespace()) { msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace()); topicWithNamespace = true; } // 消息压缩 int sysFlag = 0; boolean msgBodyCompressed = false; if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true; } // 事务消息 final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } // 发送消息校验 if (hasCheckForbiddenHook()) { CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); 
checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); this.executeCheckForbiddenHook(checkForbiddenContext); } // 发送消息前逻辑 if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); context.setNamespace(this.defaultMQProducer.getNamespace()); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } this.executeSendMessageHookBefore(context); } // 构建发送消息请求 SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTopic(msg.getTopic()); requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); requestHeader.setQueueId(mq.getQueueId()); requestHeader.setSysFlag(sysFlag); requestHeader.setBornTimestamp(System.currentTimeMillis()); requestHeader.setFlag(msg.getFlag()); requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); requestHeader.setBatch(msg instanceof MessageBatch); if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); 
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } } // 跟据不同的消息模式发送消息 SendResult sendResult = null; switch (communicationMode) { case ASYNC: Message tmpMessage = msg; boolean messageCloned = false; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; msg.setBody(prevBody); } if (topicWithNamespace) { if (!messageCloned) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; } msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; case ONEWAY: case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; default: assert false; break; } // 发送消息后置逻辑 if (this.hasSendMessageHook()) { 
context.setSendResult(sendResult); this.executeSendMessageHookAfter(context); } // 返回发送结果 return sendResult; } catch (RemotingException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (MQBrokerException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } catch (InterruptedException e) { if (this.hasSendMessageHook()) { context.setException(e); this.executeSendMessageHookAfter(context); } throw e; } finally { msg.setBody(prevBody); msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } } // broker为空抛出异常 throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null); }
接收时序图:
对应的类图:
//处理接收的消息请求@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { SendMessageContext mqtraceContext; switch (request.getCode()) { case RequestCode.CONSUMER_SEND_MSG_BACK: return this.consumerSendMsgBack(ctx, request); default: //解析请求 SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { return null; } // 发送Context,在hook场景下使用 mqtraceContext = buildMsgContext(ctx, requestHeader); // hook:处理发送消息前逻辑 this.executeSendMessageHookBefore(ctx, request, mqtraceContext); RemotingCommand response; // 处理发送消息逻辑 if (requestHeader.isBatch()) { response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader); } else { response = this.sendMessage(ctx, request, mqtraceContext, requestHeader); } // hook:处理发送消息后逻辑 this.executeSendMessageHookAfter(response, mqtraceContext); return response; }}// 发送消息,并返回发送消息结果private RemotingCommand sendMessage(final ChannelHandlerContext ctx, final RemotingCommand request, final SendMessageContext sendMessageContext, final SendMessageRequestHeader requestHeader) throws RemotingCommandException { //初始化创建响应 final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class); final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader(); response.setOpaque(request.getOpaque()); response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId()); response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn())); log.debug("receive SendMessage request command, {}", request); // 如果未开始接收消息,抛出系统异常 final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp(); if (this.brokerController.getMessageStore().now() < startTimstamp) { response.setCode(ResponseCode.SYSTEM_ERROR); 
response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp))); return response; } // 消息配置(Topic配置)校验 <a> response.setCode(-1); super.msgCheck(ctx, requestHeader, response); if (response.getCode() != -1) { return response; } final byte[] body = request.getBody(); // 如果队列编号小于0,从可用队列随机选择 int queueIdInt = requestHeader.getQueueId(); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (queueIdInt < 0) { queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); } // 创建MessageExtBrokerInner MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(requestHeader.getTopic()); msgInner.setQueueId(queueIdInt); //处理是否重试和死信 if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) { return response; } msgInner.setBody(body); msgInner.setFlag(requestHeader.getFlag()); MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties())); msgInner.setPropertiesString(requestHeader.getProperties()); msgInner.setBornTimestamp(requestHeader.getBornTimestamp()); msgInner.setBornHost(ctx.channel().remoteAddress()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 
0 : requestHeader.getReconsumeTimes()); PutMessageResult putMessageResult = null; Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties()); // 校验是否不允许发送事务消息 String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (traFlag != null && Boolean.parseBoolean(traFlag)) { if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); // <b> } // 添加消息 return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);}private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response, RemotingCommand request, MessageExt msg, TopicConfig topicConfig) { // 对RETRY类型的消息处理。如果超过最大消费次数,则topic修改成"%DLQ%" + 分组名,即加入死信队列(Dead Letter Queue) String newTopic = requestHeader.getTopic(); if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // 获取订阅分组配置 String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName); if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark( "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); return false; } // 计算最大可消费次数 int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { maxReconsumeTimes = 
requestHeader.getMaxReconsumeTimes(); } int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); if (reconsumeTimes >= maxReconsumeTimes) { // 超过最大消费次数 newTopic = MixAll.getDLQTopic(groupName); int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0 ); msg.setTopic(newTopic); msg.setQueueId(queueIdInt); if (null == topicConfig) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("topic[" + newTopic + "] not exist"); return false; } } } int sysFlag = requestHeader.getSysFlag(); if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) { sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG; } msg.setSysFlag(sysFlag); return true;}private RemotingCommand handlePutMessageResult(PutMessageResult putMessageResult, RemotingCommand response, RemotingCommand request, MessageExt msg, SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext, ChannelHandlerContext ctx, int queueIdInt) { if (putMessageResult == null) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("store putMessage return null"); return response; } boolean sendOK = false; switch (putMessageResult.getPutMessageStatus()) { // Success case PUT_OK: sendOK = true; response.setCode(ResponseCode.SUCCESS); break; case FLUSH_DISK_TIMEOUT: response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT); sendOK = true; break; case FLUSH_SLAVE_TIMEOUT: response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT); sendOK = true; break; case SLAVE_NOT_AVAILABLE: response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE); sendOK = true; break; // Failed case CREATE_MAPEDFILE_FAILED: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("create mapped file failed, server is busy or broken."); break; case MESSAGE_ILLEGAL: case PROPERTIES_SIZE_EXCEEDED: 
response.setCode(ResponseCode.MESSAGE_ILLEGAL); response.setRemark( "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k."); break; case SERVICE_NOT_AVAILABLE: response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE); response.setRemark( "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small."); break; case OS_PAGECACHE_BUSY: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while"); break; case UNKNOWN_ERROR: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UNKNOWN_ERROR"); break; default: response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UNKNOWN_ERROR DEFAULT"); break; } String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER); if (sendOK) { // 统计 this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1); this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes()); this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum()); // 响应 response.setRemark(null); responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId()); responseHeader.setQueueId(queueIdInt); responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset()); doResponse(ctx, request, response); // hook:设置发送成功到context if (hasSendMessageHook()) { sendMessageContext.setMsgId(responseHeader.getMsgId()); sendMessageContext.setQueueId(responseHeader.getQueueId()); sendMessageContext.setQueueOffset(responseHeader.getQueueOffset()); int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes(); int incValue = 
(int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount; sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS); sendMessageContext.setCommercialSendTimes(incValue); sendMessageContext.setCommercialSendSize(wroteSize); sendMessageContext.setCommercialOwner(owner); } // 响应给 Producer 可能发生异常,#doResponse(ctx, request, response)已经进行返回。若发生异常直接打印日志方便排查 Broker 接收消息成功后响应是否存在异常 return null; } else { // hook:设置发送失败到context if (hasSendMessageHook()) { int wroteSize = request.getBody().length; int incValue = (int)Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT); sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE); sendMessageContext.setCommercialSendTimes(incValue); sendMessageContext.setCommercialSendSize(wroteSize); sendMessageContext.setCommercialOwner(owner); } } return response;}
sendMessage和sendBatchMessage处理逻辑基本一致,只是sendBatchMessage不支持对RETRY类型的消息的处理和事务消息处理。
继续深入<a>处的#msgCheck方法:
// 校验消息是否正确,主要是Topic配置方面,例如:Broker 是否有写入权限,topic配置是否存在,队列编号是否正确。protected RemotingCommand msgCheck(final ChannelHandlerContext ctx, final SendMessageRequestHeader requestHeader, final RemotingCommand response) { // 检查 broker 是否有写入权限 if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission()) && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden"); return response; } // 检查topic是否可以被发送。目前是{@link MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC}不被允许发送 if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) { String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words."; log.warn(errorMsg); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(errorMsg); return response; } TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (null == topicConfig) { // 不存在topicConfig,则进行创建 int topicSysFlag = 0; if (requestHeader.isUnitMode()) { if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { topicSysFlag = TopicSysFlag.buildSysFlag(false, true); } else { topicSysFlag = TopicSysFlag.buildSysFlag(true, false); } } log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress()); // 跟据requestHeader和ctx创建topic配置 topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod( requestHeader.getTopic(), requestHeader.getDefaultTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getDefaultTopicQueueNums(), topicSysFlag); if (null == topicConfig) { if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { topicConfig = 
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); } } // 若还是没配置,则报错提示进行手动创建topic配置 //创建会存在不成功的情况,例如说:defaultTopic的Topic配置不存在,又或者存在但是不允许继承 if (null == topicConfig) { response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!" + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; } } // 队列编号是否正确 int queueIdInt = requestHeader.getQueueId(); int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums()); if (queueIdInt >= idValid) { String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s", queueIdInt, topicConfig.toString(), RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(errorInfo); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(errorInfo); return response; } return response;}
校验消息是否合法,主要针对 Topic 配置,依次检查:Broker 是否有写入权限(仅当 Topic 为顺序 Topic 时才会因此拒绝)、Topic 是否与系统保留字冲突、Topic 配置是否存在(不存在时尝试自动创建,重试 Topic 还会再尝试一次)、队列编号是否合法。
继续深入<b>处DefaultMessageStore#putMessage方法:
/**
 * Stores one message, delegating the actual write to {@code CommitLog#putMessage}.
 *
 * <p>Before delegating, the request is rejected with:
 * <ul>
 *   <li>{@code SERVICE_NOT_AVAILABLE} if the store has been shut down, the broker role is
 *       {@code SLAVE}, or the running flags report the store as not writeable;</li>
 *   <li>{@code MESSAGE_ILLEGAL} if the topic is longer than {@link Byte#MAX_VALUE} chars;</li>
 *   <li>{@code PROPERTIES_SIZE_EXCEEDED} if the properties string is longer than
 *       {@link Short#MAX_VALUE} chars;</li>
 *   <li>{@code OS_PAGECACHE_BUSY} if {@code isOSPageCacheBusy()} returns true.</li>
 * </ul>
 * Every rejection result carries a {@code null} append result. A {@code null} or non-OK result
 * from the commit log increments the put-failure counter.
 *
 * @param msg the message to store
 * @return the commit log's result, or one of the rejection results above
 */
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    if (this.shutdown) {
        log.warn("message store has shutdown, so putMessage is forbidden");
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    // Slave brokers reject direct writes.
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        // Throttle the warning: log only when the shared counter is a multiple of 50000.
        if ((value % 50000) == 0) {
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
        }
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    } else {
        // The store is writeable: reset the warning-throttle counter.
        this.printTimes.set(0);
    }

    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }

    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }

    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    long beginTime = this.getSystemClock().now();
    PutMessageResult result = this.commitLog.putMessage(msg);
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        // Nothing in this method rejects a null body. Guard the length lookup so the slow-put
        // warning cannot throw after the commit-log call has already returned.
        byte[] body = msg.getBody();
        log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", elapsedTime,
            body == null ? 0 : body.length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

    return result;
}
对消息存储的封装:先校验存储状态(是否已关闭、是否为从节点、是否可写)、Topic 与属性长度以及 PageCache 是否繁忙,最终由 CommitLog 完成实际存储。