消息存储流程
入口: org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand
org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest
消息到达 broker 后,会先经过 Netty 解码,再交给对应的消息处理器;处理器根据消息类型(普通消息、事务消息)走不同的存储分支。处理普通消息时,还会对延时消息做额外处理:消费者回发的 ACK(重试)消息延时级别大于 0,因此同样按延时消息处理。
将消息追加到 commitlog 时需要先加锁。写入前先按固定格式对消息编码,然后判断文件剩余空间是否不小于“消息长度 + 8 字节”(这 8 字节用于文件结尾标记:4 字节剩余空间大小 + 4 字节文件尾魔数)。若空间不足,则在当前文件末尾写入这 8 字节的结束标记,再新建一个文件并把消息写入新文件。
若空间足够, 则向末尾写入消息.
写入完消息后, 执行刷盘操作和主从复制操作
消息处理器
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;final int opaque = cmd.getOpaque();if (pair != null) {// 创建处理请求的runnableRunnable run = new Runnable() {@Overridepublic void run() {try {// 前置钩子doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);// 创建响应回调函数final RemotingResponseCallback callback = new RemotingResponseCallback() {@Overridepublic void callback(RemotingCommand response) {// 执行后置钩子方法doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);if (!cmd.isOnewayRPC()) {if (response != null) {response.setOpaque(opaque);response.markResponseType();try {ctx.writeAndFlush(response);} catch (Throwable e) {// 省略。。。 }} else {// resp为空,不做任何处理,也不返回}}}};/*** 1. 接收消息的处理器为 SendMessageProcessor* 2. 处理消息拉取请求的 PullMessageProcessor*//*** 如果是异步处理器,即AsyncNettyRequestProcessor* 则执行asyncProcessRequest方法,实际上,目前为止rocketmq的处理器都是AsyncNettyRequestProcessor类型的* 但是asyncProcessRequest只有SendMessageProcessor进行了重写,而默认的方法实际上还是同步的方法processRequest*/if (pair.getObject1() instanceof AsyncNettyRequestProcessor) {// 异步处理AsyncNettyRequestProcessor processor = (AsyncNettyRequestProcessor)pair.getObject1();processor.asyncProcessRequest(ctx, cmd, callback);} else {// 同步处理NettyRequestProcessor processor = pair.getObject1();RemotingCommand response = processor.processRequest(ctx, cmd);callback.callback(response);}} catch (Throwable e) {// 省略。。。 }}};// 省略。。。 try {// 提交处理请求的runnablefinal RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);pair.getObject2().submit(requestTask);} catch (RejectedExecutionException e) {// 省略。。。 }}// 省略。。。
}
/*** 消息处理器入口,接收消息经过此处理器* @param ctx* @param request* @return* @throws RemotingCommandException*/
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {// 消费者发送的ACK消息case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default:SendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}mqtraceContext = buildMsgContext(ctx, requestHeader);this.executeSendMessageHookBefore(ctx, request, mqtraceContext);if (requestHeader.isBatch()) {// 批量处理消息return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {// 单条消息return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}
}
/**
 * Handles a single (non-batch) send request: builds the broker-internal message and hands it to
 * either the transactional message service or the message store.
 *
 * <p>The section marked "omitted" was elided by the author of these notes; {@code origProps} is
 * defined there.
 *
 * @param ctx channel context the request arrived on
 * @param request the original request command (its body is the message body)
 * @param mqtraceContext trace context passed through to result handling
 * @param requestHeader parsed send-message header
 * @return a future holding the response; already completed when the request is rejected early
 */
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
    SendMessageContext mqtraceContext,
    SendMessageRequestHeader requestHeader) {
    // Build the response object (per the original notes this also covers automatic topic creation).
    final RemotingCommand response = preSend(ctx, request, requestHeader);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
    // Any code other than -1 means preSend already decided the outcome.
    if (response.getCode() != -1) {
        return CompletableFuture.completedFuture(response);
    }

    final byte[] payload = request.getBody();

    int queueIdInt = requestHeader.getQueueId();
    final TopicConfig topicConfig =
        this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    // A negative queue id is replaced with a randomly selected write queue.
    if (queueIdInt < 0) {
        queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
    }

    final MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(requestHeader.getTopic());
    msgInner.setQueueId(queueIdInt);

    // Retry / dead-letter handling; dead-letter messages get their topic replaced with the DLQ topic.
    final boolean accepted = handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig);
    if (!accepted) {
        return CompletableFuture.completedFuture(response);
    }

    msgInner.setBody(payload);
    msgInner.setFlag(requestHeader.getFlag());
    // .... omitted in this excerpt

    final String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    // Boolean.parseBoolean returns false for null, so no separate null check is required.
    final boolean transactional = Boolean.parseBoolean(transFlag);

    final CompletableFuture<PutMessageResult> putMessageResult;
    if (!transactional) {
        // Ordinary message.
        putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    } else {
        // Transactional (prepared) message.
        if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending transaction message is forbidden");
            return CompletableFuture.completedFuture(response);
        }
        putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
    }

    return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader,
        mqtraceContext, ctx, queueIdInt);
}
消息存储
org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage
org.apache.rocketmq.store.CommitLog#asyncPutMessage
/**
 * Validates the store state and the message, then delegates the write to the commit log and
 * records timing / failure statistics once the write completes.
 *
 * @param msg the broker-internal message to store
 * @return the commit log's result future, or an already-completed future carrying the failing
 *         status when the store check fails or the message is illegal
 */
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    final PutMessageStatus storeStatus = this.checkStoreStatus();
    if (storeStatus != PutMessageStatus.PUT_OK) {
        return CompletableFuture.completedFuture(new PutMessageResult(storeStatus, null));
    }

    final PutMessageStatus messageStatus = this.checkMessage(msg);
    if (messageStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return CompletableFuture.completedFuture(new PutMessageResult(messageStatus, null));
    }

    final long beginTime = this.getSystemClock().now();
    // Hand the message over to the commit log for storage.
    final CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

    // Statistics are attached as a side stage; the caller receives the commit log's own future.
    putResultFuture.thenAccept(result -> {
        final long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
        }
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        final boolean failed = result == null || !result.isOk();
        if (failed) {
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        }
    });

    return putResultFuture;
}
org.apache.rocketmq.store.CommitLog#asyncPutMessage
/**
 * Appends a message to the commit log and then submits the flush and replication requests.
 *
 * <p>Steps visible in this method: stamp store time and body CRC; redirect delayed messages to the
 * schedule topic (backing up the real topic / queue id as properties); pre-encode the message;
 * under {@code putMessageLock}, append to the last mapped file (rolling to a new file on
 * END_OF_FILE); update statistics; combine the flush and replica futures into the final result.
 *
 * @param msg the broker-internal message to store; its topic, queue id, properties, flags,
 *            store timestamp and encoded buffer are modified by this method
 * @return a future holding the put result; already completed on encode failure, mapped-file
 *         creation failure, or a non-recoverable append status
 */
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    String topic = msg.getTopic();
    int queueId = msg.getQueueId();
    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    // Only non-transactional and commit-type messages are eligible for delay redirection.
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        // A delay level greater than 0 marks the message as a delayed message.
        if (msg.getDelayTimeLevel() > 0) {
            // Clamp the delay level to the maximum level the schedule service reports.
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // Switch the topic to the schedule topic (SCHEDULE_TOPIC_XXXX per the original notes),
            // i.e. the topic used for delay-queue messages.
            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            // Backup real topic, queueId
            // Per the original notes: for a retry message the backed-up topic is the %RETRY% topic,
            // because the ACK handling has already replaced the topic before reaching this point.
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
    // Flag IPv6 born/store hosts on the message.
    InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
    if (bornSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setBornHostV6Flag();
    }
    InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
    if (storeSocketAddress.getAddress() instanceof Inet6Address) {
        msg.setStoreHostAddressV6Flag();
    }
    PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
    // Encode the message into the encoder's buffer; a non-null return value is treated as an
    // early failure result and returned as-is.
    PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
    if (encodeResult != null) {
        return CompletableFuture.completedFuture(encodeResult);
    }
    msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
    PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    // Acquire the put-message lock; it is released in the finally block below, which also covers
    // every early return inside the try.
    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
    try {
        // Fetch the last mapped file (the one currently being written, per the original notes).
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;
        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);
        // No file yet, or the file is full: request one via getLastMappedFile(0)
        // (creates a new file, per the original notes).
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
        }
        // Append the message at the end of the file.
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                // The current file did not have enough room; remember it so it can be unlocked
                // after the lock is released, then write into a new file.
                // NOTE(review): the status of the second append below is not re-checked in the
                // code shown here.
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            default:
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
        }
        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        putMessageLock.unlock();
    }
    // Warn when more than 500 ms were spent holding the lock.
    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
    }
    // Unlock the file that hit END_OF_FILE, but only when mapped-file warm-up is enabled.
    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }
    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
    // Submit the flush-to-disk request.
    CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
    // Submit the master/slave replication request.
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
    // Combine both outcomes; a non-OK replica status is applied last, so it overrides a non-OK
    // flush status.
    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(flushStatus);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
            if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) {
                log.error("do sync transfer other node, wait return, but failed, topic: {} tags: {} client address: {}",
                    msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
            }
        }
        return putMessageResult;
    });
}
追加消息:org.apache.rocketmq.store.MappedFile#appendMessagesInner
org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend
/**
 * Appends a message (single or batch) to this mapped file through the supplied callback and
 * advances the write position by the number of bytes the callback reports.
 *
 * @param messageExt the message to append; must be a {@code MessageExtBrokerInner} or a
 *                   {@code MessageExtBatch}, otherwise UNKNOWN_ERROR is returned
 * @param cb callback that performs the actual write into the buffer
 * @param putMessageContext context forwarded to the callback
 * @return the callback's result, or an UNKNOWN_ERROR result when the file is already full or the
 *         message type is not supported
 */
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
    PutMessageContext putMessageContext) {
    assert messageExt != null;
    assert cb != null;

    // Current write pointer within this file.
    final int writePos = this.wrotePosition.get();

    // Nothing can be appended once the write pointer has reached the file size.
    if (writePos >= this.fileSize) {
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", writePos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

    // Take a slice of the write buffer when one is set, otherwise of the mapped byte buffer.
    final ByteBuffer target;
    if (writeBuffer != null) {
        target = writeBuffer.slice();
    } else {
        target = this.mappedByteBuffer.slice();
    }
    target.position(writePos);

    final boolean single = messageExt instanceof MessageExtBrokerInner;
    if (!single && !(messageExt instanceof MessageExtBatch)) {
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }

    final AppendMessageResult appendResult;
    if (single) {
        // Single message.
        appendResult = cb.doAppend(this.getFileFromOffset(), target, this.fileSize - writePos,
            (MessageExtBrokerInner) messageExt, putMessageContext);
    } else {
        // Batch of messages.
        appendResult = cb.doAppend(this.getFileFromOffset(), target, this.fileSize - writePos,
            (MessageExtBatch) messageExt, putMessageContext);
    }

    this.wrotePosition.addAndGet(appendResult.getWroteBytes());
    this.storeTimestamp = appendResult.getStoreTimestamp();
    return appendResult;
}
/**
 * Writes one pre-encoded message into the target buffer, or writes an end-of-file marker when
 * the remaining space is too small.
 *
 * <p>Parts marked "omitted" were elided by the author of these notes; {@code wroteOffset},
 * {@code queueOffset} and {@code msgIdSupplier} are defined there.
 *
 * @param fileFromOffset starting offset of the file being written
 * @param byteBuffer buffer positioned at the current write position of the file
 * @param maxBlank remaining bytes in the file (the caller passes fileSize - currentPos)
 * @param msgInner message whose encoded buffer is written; the buffer reference is cleared
 *                 on the message after a successful write
 * @param putMessageContext put context (not referenced in the code shown)
 * @return PUT_OK with the message length as written bytes, or END_OF_FILE with
 *         {@code maxBlank} declared as written bytes
 */
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
    // Transactional-message handling omitted in this excerpt.
    ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
    // Total message length: the first four bytes of the encoded buffer.
    final int msgLen = preEncodeBuffer.getInt(0);
    // Determines whether there is sufficient free space
    // The message needs msgLen + END_FILE_MIN_BLANK_LENGTH bytes; per the original notes the extra
    // 8 bytes hold the end-of-file marker: 4 bytes of remaining size + 4 bytes of magic code.
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        // Not enough room: write the end-of-file marker instead of the message.
        this.msgStoreItemMemory.clear();
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        // 3 The remaining space may be any value
        // Here the length of the specially set maxBlank
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        // Write the 8-byte marker (4-byte remaining size + 4-byte end-of-file magic code).
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE,
            wroteOffset,
            maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
            msgIdSupplier,
            msgInner.getStoreTimestamp(),
            queueOffset,
            CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }
    // Skip the first five 4-byte fields to reach QUEUEOFFSET
    // (presumably TOTALSIZE, MAGICCODE, BODYCRC, QUEUEID, FLAG — confirm against the encoder).
    int pos = 4 + 4 + 4 + 4 + 4;
    // 6 QUEUEOFFSET
    preEncodeBuffer.putLong(pos, queueOffset);
    pos += 8;
    // 7 PHYSICALOFFSET
    preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
    // Born-host length: 4-byte address + 4-byte port when the V6 flag is clear, else 16 + 4.
    int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
    // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
    // Advance past PHYSICALOFFSET (8), SYSFLAG (4), BORNTIMESTAMP (8) and BORNHOST (ipLen).
    pos += 8 + 4 + 8 + ipLen;
    // refresh store time stamp in lock
    preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write messages to the queue buffer
    byteBuffer.put(preEncodeBuffer);
    // Drop the message's reference to the encoded buffer once it has been written.
    msgInner.setEncodedBuff(null);
    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    // omitted in this excerpt
    return result;
}
消息编码
org.apache.rocketmq.store.CommitLog.MessageExtEncoder#encode(org.apache.rocketmq.store.MessageExtBrokerInner)
// Excerpt of the encoder body: serializes one message into encoderBuffer in a fixed field order.
// Initialization of storage space: reset the buffer for a message of msgLen bytes
this.resetByteBuffer(encoderBuffer, msgLen);
// 1 TOTALSIZE: total message length, 4 bytes
this.encoderBuffer.putInt(msgLen);
// 2 MAGICCODE: magic code, 4 bytes
this.encoderBuffer.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC: CRC of the message body, 4 bytes
this.encoderBuffer.putInt(msgInner.getBodyCRC());
// 4 QUEUEID: queue id, 4 bytes
this.encoderBuffer.putInt(msgInner.getQueueId());
// 5 FLAG: message flag, 4 bytes
this.encoderBuffer.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET, need update later: offset of the message in the consume queue, 8 bytes (placeholder 0)
this.encoderBuffer.putLong(0);
// 7 PHYSICALOFFSET, need update later: offset of the message in the commit log, 8 bytes (placeholder 0)
this.encoderBuffer.putLong(0);
// 8 SYSFLAG: system flag (e.g. compressed or transactional, per the original notes), 4 bytes
this.encoderBuffer.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP: timestamp at which the producer called the send API (per the original notes), 8 bytes
this.encoderBuffer.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST: producer IP and port; 8 bytes for IPv4 (4 + 4), 20 bytes for IPv6 (16 + 4)
socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);
// 11 STORETIMESTAMP: message store timestamp, 8 bytes
this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS: broker IP and port; 8 bytes for IPv4, presumably 20 bytes for IPv6 — TODO confirm
socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);
// 13 RECONSUMETIMES: number of times the message has been retried, 4 bytes
this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset: physical offset of the transactional message, 8 bytes
this.encoderBuffer.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY: body length, 4 bytes
this.encoderBuffer.putInt(bodyLength);
// Body bytes, written only when the body is non-empty
if (bodyLength > 0)this.encoderBuffer.put(msgInner.getBody());
// 16 TOPIC: topic name length, 1 byte
this.encoderBuffer.put((byte) topicLength);
// Topic name bytes
this.encoderBuffer.put(topicData);
// 17 PROPERTIES: properties length, 2 bytes
this.encoderBuffer.putShort((short) propertiesLength);
// Properties bytes, written only when there are any
if (propertiesLength > 0)this.encoderBuffer.put(propertiesData);
// Flip the buffer so it can be read from the start
encoderBuffer.flip();