多条告白如次剧本只需引入一次
正文重要领会RocketMQ中怎样保护动静无序的。
RocketMQ的本子为:4.2.0release。
一.时序图
仍旧老规则,先把领会进程的时序图摆出来:
1.Producer发送程序动静
2.Consumer接受程序动静(一)
3.Consumer接受程序动静(二)
二.源码领会–Producer发送程序动静
1DefaultMQProducer#send:发送动静,入参中有自设置的动静部队采用器。
//DefaultMQProducer#sendpublicSendResultsend(Messagemsg,MessageQueueSelectorselector,Objectarg)throwsMQClientException,RemotingException,MQBrokerException,InterruptedException{returnthis.defaultMQProducerImpl.send(msg,selector,arg);}1.1DefaultMQProducerImpl#makeSureStateOK:保证Producer的状况是运奇迹态-ServiceState.RUNNING。
//DefaultMQProducerImpl#makeSureStateOKprivatevoidmakeSureStateOK()throwsMQClientException{if(this.serviceState!=ServiceState.RUNNING){thrownewMQClientException("TheproducerservicestatenotOK,"+this.serviceState+FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);}}1.2DefaultMQProducerImpl#tryToFindTopicPublishInfo:按照Topic获得颁布Topic用到的路由消息。
//DefaultMQProducerImpl#tryToFindTopicPublishInfoprivateTopicPublishInfotryToFindTopicPublishInfo(finalStringtopic){TopicPublishInfotopicPublishInfo=this.topicPublishInfoTable.get(topic);if(null==topicPublishInfo||!topicPublishInfo.ok()){this.topicPublishInfoTable.putIfAbsent(topic,newTopicPublishInfo());this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);//为空则从NameServer革新获得,false,不传入defaultMQProducertopicPublishInfo=this.topicPublishInfoTable.get(topic);}if(topicPublishInfo.isHaveTopicRouterInfo()||topicPublishInfo.ok()){//有了路由消息并且状况OK,则归来returntopicPublishInfo;}else{this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic,true,this.defaultMQProducer);topicPublishInfo=this.topicPublishInfoTable.get(topic);returntopicPublishInfo;}}1.3挪用自设置动静部队采用器的select本领。
//DefaultMQProducerImpl#sendSelectImplMessageQueuemq=null;try{mq=selector.select(topicPublishInfo.getMessageQueueList(),msg,arg);}catch(Throwablee){thrownewMQClientException("selectmessagequeuethrowedexception.",e);}//Producer#mainSendResultsendResult=producer.send(msg,newMessageQueueSelector(){@OverridepublicMessageQueueselect(List<MessageQueue>mqs,Messagemsg,Objectarg){Integerid=(Integer)arg;intindex=id%mqs.size();returnmqs.get(index);}},orderId);1.4DefaultMQProducerImpl#sendKernelImpl:发送动静的中心实行本领。
//DefaultMQProducerImpl#sendKernelImpl......switch(communicationMode){caseSYNC:longcostTimeSync=System.currentTimeMillis()-beginStartTime;if(timeout<costTimeSync){thrownewRemotingTooMuchRequestException("sendKernelImplcalltimeout");}sendResult=this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout-costTimeSync,communicationMode,context,this);break;......1.4.1MQClientAPIImpl#sendMessage:发送动静。
//MQClientAPIImpl#sendMessage......switch(communicationMode){//按照发送动静的形式(同步/异步)采用各别的办法,默许是同步caseSYNC:longcostTimeSync=System.currentTimeMillis()-beginStartTime;if(timeoutMillis<costTimeSync){thrownewRemotingTooMuchRequestException("sendMessagecalltimeout");}returnthis.sendMessageSync(addr,brokerName,msg,timeoutMillis-costTimeSync,request);......1.4.1.1MQClientAPIImpl#sendMessageSync:发送同步动静。
//MQClientAPIImpl#sendMessageSyncprivateSendResultsendMessageSync(finalStringaddr,finalStringbrokerName,finalMessagemsg,finallongtimeoutMillis,finalRemotingCommandrequest)throwsRemotingException,MQBrokerException,InterruptedException{RemotingCommandresponse=this.remotingClient.invokeSync(addr,request,timeoutMillis);assertresponse!=null;returnthis.processSendResponse(brokerName,msg,response);}1.4.1.1.1NettyRemotingClient#invokeSync:结构RemotingCommand,挪用的办法是同步。
//NettyRemotingClient#invokeSyncRemotingCommandresponse=this.invokeSyncImpl(channel,request,timeoutMillis-costTime);if(this.rpcHook!=null){this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel),request,response);}returnresponse;三.源码领会–Consumer接受程序动静(一)
1DefaultMQPushConsumer#registerMessageListener:把Consumer传入的动静监听器介入到messageListener中。
//DefaultMQPushConsumer#registerMessageListenerpublicvoidregisterMessageListener(MessageListenerOrderlymessageListener){this.messageListener=messageListener;this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);}1.1DefaultMQPushConsumerImpl#registerMessageListener:把Consumer传入的动静监听器介入到messageListenerInner中。
//DefaultMQPushConsumerImpl#registerMessageListenerpublicvoidregisterMessageListener(MessageListenermessageListener){this.messageListenerInner=messageListener;}2DefaultMQPushConsumer#start:启用Consumer。
//DefaultMQPushConsumer#startpublicvoidstart()throwsMQClientException{this.defaultMQPushConsumerImpl.start();}2.1DefaultMQPushConsumerImpl#start:启用ConsumerImpl。
//DefaultMQPushConsumerImpl#startswitch(this.serviceState){caseCREATE_JUST://方才创造......if(this.getMessageListenerInner()instanceofMessageListenerOrderly){//无序动静效劳this.consumeOrderly=true;this.consumeMessageService=newConsumeMessageOrderlyService(this,(MessageListenerOrderly)this.getMessageListenerInner());}elseif(this.getMessageListenerInner()instanceofMessageListenerConcurrently){//并发无序动静效劳this.consumeOrderly=false;this.consumeMessageService=newConsumeMessageConcurrentlyService(this,(MessageListenerConcurrently)this.getMessageListenerInner());}......this.consumeMessageService.start();//启用动静效劳......mQClientFactory.start();//启用MQClientInstance......2.1.1newConsumeMessageOrderlyService():结构程序动静效劳。
//ConsumeMessageOrderlyService#ConsumeMessageOrderlyServicepublicConsumeMessageOrderlyService(DefaultMQPushConsumerImpldefaultMQPushConsumerImpl,MessageListenerOrderlymessageListener){this.defaultMQPushConsumerImpl=defaultMQPushConsumerImpl;this.messageListener=messageListener;this.defaultMQPushConsumer=this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();this.consumerGroup=this.defaultMQPushConsumer.getConsumerGroup();this.consumeRequestQueue=newLinkedBlockingQueue<Runnable>();this.consumeExecutor=newThreadPoolExecutor(//主动静耗费线程池,平常实行收到的ConsumeRequest。多线程this.defaultMQPushConsumer.getConsumeThreadMin(),this.defaultMQPushConsumer.getConsumeThreadMax(),1000*60,TimeUnit.MILLISECONDS,this.consumeRequestQueue,newThreadFactoryImpl("ConsumeMessageThread_"));this.scheduledExecutorService=Executors.newSingleThreadScheduledExecutor(newThreadFactoryImpl("ConsumeMessageScheduledThread_"));}2.1.2ConsumeMessageOrderlyService#start:启用动静部队存户端范例。
//DefaultMQPushConsumerImpl#startthis.consumeMessageService.start();//ConsumeMessageOrderlyService#startpublicvoidstart(){if(MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())){this.scheduledExecutorService.scheduleAtFixedRate(newRunnable(){@Overridepublicvoidrun(){ConsumeMessageOrderlyService.this.lockMQPeriodically();//准时向broker发送批量锁住暂时正在耗费的部队汇合的动静}},1000*1,ProcessQueue.REBALANCE_LOCK_INTERVAL,TimeUnit.MILLISECONDS);}}2.1.2.1ConsumeMessageOrderlyService#lockMQPeriodically:准时向broker发送批量锁住暂时正在耗费的部队汇合的动静。
2.1.2.1.1RebalanceImpl#lockAll:锁住一切正在动静的部队。
//ConsumeMessageOrderlyService#lockMQPeriodicallyif(!this.stopped){this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();}//RebalanceImpl#lockAllHashMap<String,Set<MessageQueue>>brokerMqs=this.buildProcessQueueTableByBrokerName();//按照brokerName从processQueueTable获得正在耗费的部队汇合......Set<MessageQueue>lockOKMQSet=this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(),requestBody,1000);//向Broker发送锁住动静部队的训令for(MessageQueuemq:lockOKMQSet){ProcessQueueprocessQueue=this.processQueueTable.get(mq);if(processQueue!=null){if(!processQueue.isLocked()){log.info("themessagequeuelockedOK,Group:{}{}",this.consumerGroup,mq);}processQueue.setLocked(true);processQueue.setLastLockTimestamp(System.currentTimeMillis());}}......2.1.3MQClientInstance#start:启用MQClientInstance。进程较搀杂,放到大题目四中领会。
//DefaultMQPushConsumerImpl#startmQClientFactory.start();四.源码领会–Consumer接受程序动静(二)
1MQClientInstance#start:启用存户端范例MQClientInstance。
//MQClientInstance#startsynchronized(this){switch(this.serviceState){caseCREATE_JUST:......//Startpullservice启用拉废除息效劳this.pullMessageService.start();//Startrebalanceservice启用耗费端负载平衡效劳this.rebalanceService.start();......1.1PullMessageService#run:启用拉废除息效劳。本质挪用的是DefaultMQPushConsumerImpl的pullMessage本领。
//PullMessageService#runpublicvoidrun(){log.info(this.getServiceName()+"servicestarted");while(!this.isStopped()){try{PullRequestpullRequest=this.pullRequestQueue.take();this.pullMessage(pullRequest);}catch(InterruptedExceptionignored){}catch(Exceptione){log.error("PullMessageServiceRunMethodexception",e);}}log.info(this.getServiceName()+"serviceend");}//PullMessageService#pullMessageprivatevoidpullMessage(finalPullRequestpullRequest){finalMQConsumerInnerconsumer=this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());if(consumer!=null){DefaultMQPushConsumerImplimpl=(DefaultMQPushConsumerImpl)consumer;impl.pullMessage(pullRequest);//挪用DefaultMQPushConsumerImpl的pullMessage}else{log.warn("NomatchedconsumerforthePullRequest{},dropit",pullRequest);}}1.1.1.1DefaultMQPushConsumerImpl#pullMessage:拉废除息。提交到ConsumeMessageOrderlyService的线程池consumeExecutor中实行。
//DefaultMQPushConsumerImpl#pullMessage......PullCallbackpullCallback=newPullCallback(){@OverridepublicvoidonSuccess(PullResultpullResult){switch(pullResult.getPullStatus()){caseFOUND:longprevRequestOffset=pullRequest.getNextOffset();pullRequest.setNextOffset(pullResult.getNextBeginOffset());longpullRT=System.currentTimeMillis()-beginTimestamp;......DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);......1.1.1.1.1.1.1