欢迎来到皮皮网官网

【凉山盐源码头】【atch源码】【类型源码】producer源码设置

时间:2024-11-28 18:35:11 来源:登录界面输入密码错误源码

1.搭建源码调试环境—RocketMQ源码分析(一)
2.详解rocketMQ顺序消息
3.RocketMQ第五讲

producer源码设置

搭建源码调试环境—RocketMQ源码分析(一)

       搭建源码调试环境,源码深入分析 RocketMQ 的设置内部运行机制。理解 RocketMQ 的源码目录结构是搭建调试环境的第一步,有助于我们快速定位代码功能和问题。设置

       目录结构主要包括:

       acl:权限控制模块,源码用于指定话题权限,设置凉山盐源码头确保只有拥有权限的源码消费者可以进行消费。

       broker:RocketMQ 的设置核心组件,负责接收客户端发送的源码消息、存储消息并传递给消费端。设置

       client:包含 Producer、源码Consumer 的设置代码,用于消息的源码生产和消费。

       common:公共模块,设置提供基础功能和服务。源码

       distribution:部署 RocketMQ 的工具,包含 bin、conf 等目录。

       example:提供 RocketMQ 的示例代码。

       filter:消息过滤器。

       namesvr:NameServer,所有 Broker 的注册中心。

       remoting:远程网络通信模块。

       srvutil:工具类。

       store:消息的atch源码存储机制。

       style:代码检查工具。

       tools:命令行监控工具。

       获取 RocketMQ 源码:从 Github 下载最新版本或选择其他版本。遇到下载困难时,可留言或私信寻求帮助。

       导入源码到 IDE 中,确保 Maven 目录正确,刷新并等待依赖下载完成。

       启动 RocketMQ 的 NameServer 和 Broker,配置相关参数,如环境变量、配置文件等。确保正确启动后,通过查看启动日志检查运行状态。

       进行消息生产与消费测试,使用源码自带的示例代码进行操作。设置 NameServer 地址后,启动 Producer 和 Consumer,验证消息成功发送与消费。

       使用 RocketMQ Dashboard 监控 RocketMQ 运行情况,持续优化和调试。

详解rocketMQ顺序消息

       RocketMQ是一个高效的消息中间件,具备高可用性和顺序消息处理能力。本文将深入解析RocketMQ顺序消息的类型源码场景应用、示例操作、原理以及源码实现。

       场景

       在有严格顺序要求的业务场景,如订单创建、支付和发货等,RocketMQ的顺序消息特性至关重要。它确保这些操作按特定顺序执行,避免潜在的错误结果。

       示例

       例如,在电商订单系统中,用户下单后,操作流程需要按以下顺序:下单、扣减库存、创建订单。不按顺序执行可能导致库存减少但订单未创建成功。RocketMQ通过确保相同业务操作发送至同一队列,实现消息的有序处理。

       发送和消费

       Producer发送顺序消息时,创建一个MessageQueueSelector来选择队列,如使用order.getId()。Consumer消费时,通过MessageListenerOrderly或ConsumeOrderlyEnable确保按发送顺序读取消息。以下为简单示例:

       Producer: DefaultMQProducer send(Message msg, MessageQueueSelector selector)

       Consumer: DefaultMQPushConsumer consumeMessage(Message msg, MessageListener listener)

       原理与源码

       RocketMQ利用消息队列实现顺序,同一队列内的koei源码消息按序,不同队列无序。生产者发送时会根据选择策略选择队列,消费者则按顺序消费。源码中,send方法(如DefaultMQProducerImpl.send())和consumeMessage方法(如ConsumeMessageOrderlyService.consumeMessageDirectly())具体操作了顺序消息的发送和消费。

RocketMQ第五讲

       broker是RocketMQ的核心,核心工作就是接收生成这的消息,进行存储。同时,收到消费者的请求后,从磁盘读取内容,把结果返回给消费者。

        消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为位,左边补零,剩余为起始偏移量,比如代表了第一个文件,起始偏移量为0,文件大小为1G=;当第一个文件写满了,第二个文件为,起始偏移量为,以此类推。libva源码消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

        CommitLog文件中保存了消息的全量内容。不同的Topic的消息,在CommitLog都是顺序存放的。就是来一个消息,不管Topic是什么,直接追加的CommitLog中。

        broker启动了一个专门的线程来构建索引,把CommitLog中的消息,构建了两种类型的索引。ConsumerQueue和Index。正常消费的时候,是根据Topic来消费,会用到ConsumerQueue索引。

        也可根据返回的offsetMsgId,解析出ip,端口和CommitLog中的物理消息偏移量,直接去CommitLog中取数据。

        引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。

        其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{ topic}/{ queueId}/{ fileName}。同样consumequeue文件采取定长设计,每一个条目共个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.M。

        IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是: { fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为M,一个IndexFile可以保存 W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

        按照Message Key查询消息的时候,会用到这个索引文件。

        IndexFile索引文件为用户提供通过“按照Message Key查询消息”的消息索引查询服务,IndexFile文件的存储位置是: { fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于+W 4+W = 个字节大小。如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。

        其中的索引数据包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共 Byte。NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图, Byte 的Header用于保存一些总的统计信息,4 W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。 W 是真正的索引数据,即一个 Index File 可以保存 W个索引。

        “按照Message Key查询消息”的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。

        RocketMQ中有两个核心模块,remoting模块和store模块。remoting模块在NameServer,Produce,Consumer和Broker都用到。store只在Broker中用到,包含了存储文件操作的API,对消息实体的操作是通过DefaultMessageStore进行操作。

        属性和方法很多,就不往这里放了。

        文件存储实现类,包括多个内部类

        · 对于文件夹下的一个文件

        上面介绍了broker的核心业务流程和架构,关键接口和类,启动流程。最后介绍一下broker的线程模型,只有知道了线程模型,才能大概知道前面介绍的那些事如何协同工作的,对broker才能有一个立体的认识。

        RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。关于Reactor线程模型,可以看看我之前写的这篇文档: Reactor线程模型

        上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码code去processorTable这个本地缓存变量中找到对应的 processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。

        上面的图和这段画是从官方文档抄过来的,但是文字和图对应的不是很好,画的也不够详细,但是主要流程是这个样子。以后有时间了,我重新安装自己的理解,画一张更详细的图。

        AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,应该是守护线程

        FileWatchService:

        NettyEventExecutor:

        NettyNIOBoss_:一个

        NettyServerNIOSelector_:默认为三个

        NSScheduledThread:定时任务线程

        ServerHouseKeepingService:守护线程

        ThreadDeathWatch-2-1:守护线程,Netty用,已经废弃

        RemotingExecutorThread(1-8):工作线程池,没有共用NettyServerNIOSelector_,直接初始化8个线程

        AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,共九个:

        RocketmqBrokerAppender_inner

        RocketmqFilterAppender_inner

        RocketmqProtectionAppender_inner

        RocketmqRemotingAppender_inner

        RocketmqRebalanceLockAppender_inner

        RocketmqStoreAppender_inner

        RocketmqStoreErrorAppender_inner

        RocketmqWaterMarkAppender_inner

        RocketmqTransactionAppender_inner

        SendMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_MESSAGE

        PullMessageThread_:remotingServer.registerProcessor(RequestCode.PULL_MESSAGE

        ProcessReplyMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE

        QueryMessageThread_:remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE

        AdminBrokerThread_:remotingServer.registerDefaultProcessor

        ClientManageThread_:remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT

        HeartbeatThread_:remotingServer.registerProcessor(RequestCode.HEART_BEAT

        EndTransactionThread_:remotingServer.registerProcessor(RequestCode.END_TRANSACTION

        ConsumerManageThread_:remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,RequestCode.UPDATE_CONSUMER_OFFSET,RequestCode.QUERY_CONSUMER_OFFSET

        brokerOutApi_thread_:BrokerController.registerBrokerAll(true, false, true);

        ==================================================================

        BrokerControllerScheduledThread:=>

        BrokerController.this.getBrokerStats().record();

        BrokerController.this.consumerOffsetManager.persist();

        BrokerController.this.consumerFilterManager.persist();

        BrokerController.this.protectBroker();

        BrokerController.this.printWaterMark();

        log.info("dispatch behind commit log { } bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());

        BrokerController.this.brokerOuterAPI.fetchNameServerAddr();

        BrokerController.this.printMasterAndSlaveDiff();

        BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());

        BrokerFastFailureScheduledThread:=>

        FilterServerManagerScheduledThread:=>

        FilterServerManager.this.createFilterServer();

        ClientHousekeepingScheduledThread:=>

        ClientHousekeepingService.this.scanExceptionChannel();

        PullRequestHoldService

        FileWatchService

        AllocateMappedFileService

        AcceptSocketService

        BrokerStatsThread1

copyright © 2016 powered by 皮皮网   sitemap