皮皮网
皮皮网

【livy 源码分析】【广州旅行世界源码】【git上的源码】kafka源码 consume

来源:winmtr源码 发表时间:2025-01-19 10:39:30

1.golang中使用kafka的综合指南
2.9、NIFI综合应用场景-通过NIFI配置kafka的数据同步
3.kafka简介
4.sarama 源码解析--Kafka的重平衡
5.kafka原理 消费者偏移量__consumer_offsets_相关解析

kafka源码 consume

golang中使用kafka的综合指南

       kafka是一个被广泛使用的分布式、可扩展、高性能、可靠的流处理平台。在处理kafka数据时,livy 源码分析确保处理效率和可靠性需要采取多种最佳实践,本文将通过介绍这些实践以及使用sarama库实现,来提高kafka消费的效率。

       选择合适的提交策略是关键步骤之一。自动提交是sarama默认选择,它会定期提交已成功消费的消息偏移量,确保消费者在重新启动或消费失败时可以从断点继续。手动提交则允许用户更灵活地控制何时提交消息偏移量,提供更高的控制度。

       减少kafka的传输次数可以优化数据读取与写入。批量发送消息的效果优于逐个发送,较大的批次能提高kafka数据发送效率。同时,长轮询策略减少无数据时的请求次数,进一步降低传输频率。广州旅行世界源码

       消费者组的使用是另一个关键点。它允许在多个消费者之间分配消息,提供横向扩展能力。在sarama中,通过消费者组接口的三个方法:Setup、Cleanup、ConsumeClaim来实现消费组的创建、清理和消费分区任务。确保处理rebalance事件,可以避免消息处理的混乱和重复。

       调整消费者缓冲区大小可以优化内存使用和消息处理速度。增加缓冲区大小可以提高吞吐量,但同时也会消耗更多内存。正确设置缓冲区大小有助于平衡性能与资源使用。

       处理rebalance事件是维护消费者组稳定性的关键。在sarama中,正确处理Setup和Cleanup函数,确保在消费者组发生变化时,应用程序能够正常处理消费者离开或加入,避免消息处理的git上的源码混乱。

       监控消费者状态对确保系统健康和性能至关重要。虽然Golang没有内置对Kafka监控的支持,但可以利用外部库和工具追踪延迟、处理时间和错误率。设置警报可以及时发现和处理问题,确保应用健壮、可靠且高效。

       遵循上述实践,结合sarama库,可以在使用kafka时提高处理效率和可靠性,确保应用程序在分布式环境中稳定运行。

9、NIFI综合应用场景-通过NIFI配置kafka的数据同步

       本文旨在介绍nifi与kafka的交互过程,即生产数据到kafka中,然后通过nifi消费kafka中的数据。

       本文前提是nifi、kafka环境正常。

       本文分为三个部分,即处理器说明、生产数据到kafka中以及消费kafka中的dnsmap源码怎么用数据。

       一、处理器说明

       1、处理器说明

       1.1、PublishKafka_0_

       描述:使用Kafka 0..x Producer API将FlowFile的内容作为消息发送到Apache Kafka。要发送的消息可以是单独的FlowFiles,也可以使用用户指定的定界符(例如换行符)进行定界。用于获取消息的辅助NiFi处理器是ConsumeKafka_0_。

       2、属性配置

       2.1、ConsumeKafka_0_

       描述:消耗来自专门针对Kafka 0..x Consumer API构建的Apache Kafka的消息。用于发送消息的辅助NiFi处理器是PublishKafka_0_。

       在下面的列表中,列出属性及其默认值,以及属性是否支持NiFi表达式语言。

       二、Producer生产

       1、创建并配置处理器GenerateFlowFile

       创建处理器组kafka,进入组后创建GenerateFlowFile处理器。每1秒生产一次数据。

       - 文件大小b - 每次生成个相同文件 - 每次生成的android源码格式文本流文件内容唯一

       2、创建并配置处理器PublishKafka_0_

       创建处理器组kafka,进入组后创建PublishKafka_0_处理器。

       - Brokers设置为...:,...:,...:。图为示例。

       - topic设置为nifi-topic,如果topic不存在,会自动创建

       - Delivery Guarantee,对应kafka的acks机制,选择最为安全的Guarantee Replicated Delivery,相当于acks=all

       3、配置GenerateFlowFile和PublishKafka_0_连接

       连接GenerateFlowFile和PublishKafka_0_

       4、负载均衡并发

       5、验证

       启动并查看监听kafka消费数据,也可以通过 server1:/topic/meta...工具查看生产的数据 在kafka所在服务器执行监听命令:

       三、Consumer消费

       1、创建并配置ConsumeKafka_0_处理器并连接

       2、验证

       启动生产者、消费者,验证nifi是否将数据写入kafka、并且kafka的数据是否被消费。以下为模板界面。

       以上完成了nifi读取kafka中的数据(消费)。类似的也可以通过nifi将数据写入到nifi中,此处不再赘述。

kafka简介

       ä¸€ã€kafka定义

        二、kafka的优势

        三、kafka的原理

        四、kafka起源

        一、Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于年贡献给了Apache基金会并成为顶级开源项目。

        二、kafka的优势

        高吞吐量、低延迟:kafka美妙之处是可以处理几十万条信息,它的延迟最低只有几毫秒,每个topic可以分多个partition,consumer

        group对partition进行consume操作。

        可扩展性:kafka集群支持热扩展

        持久化、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

        容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)

        高并发:支持数千个客户端同时读写

        三、kafka的原理

        kafka是如何实现以上所述这几点,我们逐一说明:

        1.高吞吐量、低延迟

        kafka在设计之初就是为了针对大数据量的传输处理,高吞吐量、低延迟最主要看的就是单位时间内所能读写的数据总量,我们先来看生产端。

        kafka采取了一定量的批处理机制,即当生产数据达到一定数量或者达到时间窗口后,将所收集到的数据一批次的提交到服务器,我们假设处理一次数据的时间为1ms,那每秒钟能处理条,延时为1ms,如果此时将处理间隔变成9ms,即每ms处理一批数据,假设这段时间接收到条处理,那每秒则能处理条,但是延时变成了ms。为了获得最大的吞吐量,需要牺牲一定的延迟,但是这样的牺牲是值得的。当确定了这种小批量方式之后,高速的写则取决于kafka自身写磁盘的速度了。而由于kafka本身对数据不做任何的处理,只管写入数据,保管数据,分发数据,因此会是一种批量顺序写入数据的情况,而磁盘的读写速度大量消耗在寻址上,也就是随机读写,但是对于顺序写入的速度是非常快的,甚至能媲美内存的随机写入速度。有人做过一个对比,普通磁盘顺序写入每秒能达到.2M/s,SSD的顺序写入速度为.2M/s,内存的顺序写入速度为.2M/s。kafka正是利用了这个特性,顺序写入,速度相对较快。而kafka本身虽然也是写入磁盘持久化数据,但实际上kafka是将数据顺序写入页缓存中(page cache),然后由操作系统自行决定何时写到磁盘上,因此kafka的写操作能在每秒轻轻松松达到写入数十万条记录。并且基于kafka的动态扩展,这个数字还能不断增大。

        kafka在消费端也有着高吞吐量,由于kafka是将数据写入到页缓存中,同时由于读写相间的间隔并不大,很大可能性会在缓存中命中,从而保证高吞吐量。另外kafka由于本身不对数据做任何的修改,完全使用零拷贝技术,大大提升数据的读取能力。

        2.kafka每个节点叫做broker,而每一个broker都是独立运行的,可以随时加入kafka集群,集群的心跳管理是由zookeeper负责,新加入的broker只要broker id不与原有的冲突就能顺利的加入集群中,实现动态扩展。

        3.kafka的持久化在上面已经提到,kafka绕过了java的堆处理数据,直接将数据写入页缓存,然后由操作系统来管理页缓存写入磁盘,实现持久化。kafka每一个主题topic是一个业务数据,他可由多个partition组成,而每个partition可以有多个replica副本,用于保证数据的可靠性。replica分为两个角色,一个是leader,一个是追随者,同一时间,每一个partition只能有一个leader,其他都是追问随者,laeder负责接收数据并写入log,而追随者不能被用户写入数据,只是从leader角色的replica副本中同步log写入自己的log,保持数据同步。kafka中有一个概念,ISR,全称是in-sync

        replica,即所有可用的replica副本,这里的ISR数量只要大于1,这个partition就能正常运作,因此容错性非常好,假设n个replica,那最多可以坏n-1个replica的情况下,还能保持系统正常运行。当replica迟滞到一定时间后,会被kafka从ISR中剔除,当再次同步后,可以再次加入ISR,如果这时候leader出现问题,会从ISR中重新选举一个leader,原先的leader再次同步成功后会重新加入ISR,成为一个flower。

        4.上面提到了kafka的ISR机制,kafka的容错性就是由ISR的机制来保证的。

        5.kafka集群可以动态扩展broker,多个partition同时写入消费数据,实现真正的高并发。

        四、kafka的起源

        kafka起源于LinkedIn公司,当时领英公司需要收集两大类数据,一是业务系统和应用程序的性能监控指标数据,而是用户的操作行为数据。当时为了收集这两类数据,领英自研了两套相应的数据收集系统,但是这两套系统都存在一些弊端,无法实现实时交互、实时性差、维护成本高。因此领英的工程师希望找到一个统一的组件来收集分发消费这些大批量的数据,ActiveMQ由于扩展性不足,不能支撑大数据量而被抛弃,从而决定自研一套满足需求的系统组件,也就是kafka。

        kafka的设计之初主要有三个目标:

        1.为生产者和消费者提供一套简单的API

        2.降低网络传输和磁盘存储开销

        3.具有高伸缩性架构

        目前kafka可以算是超额完成了目标。

        kafka的名称由来也很有意思,因为kafka系统的写操作性能特别强,因此想使用一个作家的名字来命名kafka,而Jay Kreps,kafka的三位作者之一,在上大学的时候很喜欢Franz Kafka,因此起来这样一个名字。

        kafka在年开源,年7月正式进入Apache进行孵化,年月顺利毕业,后成为Apache的顶级项目。

sarama 源码解析--Kafka的重平衡

       重平衡操作

       重平衡是动态调整Consumer Group下的Consumer订阅Topic的分区的一个关键操作。Sarama中的BalanceStrategyRange和BalanceStrategySticky策略具体实施这一操作。

       重平衡触发条件之一是成员数变更。这一过程包括以下步骤:

       1. 启动一个新的消费者实例。

       2. 调用Consume方法。

       3. Consume方法初始化连接信息,并启动一个goroutine。程序会阻塞在sess.ctx.Done()上。

       4. 在newSession方法中找到协调者信息,并发起join请求和syncgroup请求。Consumer Leader执行一次重平衡。

       5. 创建consumer group session,并初始化offset manager和开启心跳goroutine。

       6. 当心跳超时或收到coordinator的重平衡通知时,调用cancel()方法取消操作,退出Consume逻辑。

       7. 此时,Consume函数优雅退出。由于外层循环的存在,会重新执行Consume,实现一次重平衡。

       另一个触发重平衡的条件是订阅主题分区数发生变更。这一过程如下:

       1. 在Consume方法中开启心跳goroutine,并将consumer group session传递给它。

       2. 分区数发生变化时,调用sess.cancel(),Consume优雅退出并重新执行,实现重平衡。

kafka原理 消费者偏移量__consumer_offsets_相关解析

       知流Stream是滴滴开源的Kafka运维管控平台,对于参与开发有兴趣但担心能力的同学,可以联系我,我将作为你的导师带你一起参与开源项目。

       Kafka的日志文件中存有以`__consumer_offsets_`为前缀的文件夹,总共有个。新版Kafka推荐将消费者的位移信息保存在内部的`__consumer_offsets`topic中,此topic默认提供了`kafka_consumer_groups.sh`脚本供用户查看消费者信息。

       `__consumer_offsets`类似于普通的topic,其主要功能是保存消费者的位移信息。每一项消息采用KV格式,其结构包括group.id、topic、分区号作为键,位移值作为值。

       在高并发的消费环境中,多个消费者或消费组同时提交位移信息会增加`__consumer_offsets`的写入负载。因此,Kafka默认为该topic创建个分区,并通过`Math.abs(groupID.hashCode()) % numPartitions`的哈希求模运算将负载均匀分布在不同的分区上。

       通常,当集群中首次有消费者消费消息时,会自动创建`__consumer_offsets`topic,其副本因子默认为3,分区数默认为。消费者可以使用消费者命令指定消费组、topic和partition进行消费。

       为了演示,我们打开一个session,执行消费者命令,指定消费组`szz1-group`,topic`szz1-test-topic`,然后执行生产消息命令,发送几条消息,可以看到session中的消费者消费了这些消息。通过查看指定消费组的消费位置offset,可以了解到每个partition对应的消费者id。因为只开一个消费者,它同时消费了3个partition。CURRENT-OFFSET表示当前消费组消费到的偏移量,LOG-END-OFFSET表示日志最后的偏移量。当CURRENT-OFFSET等于LOG-END-OFFSET时,说明当前消费组已经全部消费完毕。

       关闭session后,发送新的消息,观察CURRENT-OFFSET保持不变,因为此时没有消费者消费这些消息。重新打开一个消费组继续消费,可以看到控制台输出了新发送的消息,并且偏移量更新了。

       对于新的消费组,如果不指定`--from-beginning`参数,默认会从最新的偏移量开始消费。如果需要从头开始消费,需要加上此参数。通过`Math.abs(groupID.hashCode()) % numPartitions`确定消费组在哪个`__consumer_offsets-`中。

       为了查询特定消费组的偏移量,可以先通过`consume_group`确定分区数,例如对于`szz1-group`,通过哈希求模运算确定消费组的偏移量信息存于哪个分区。可以通过命令查询,其结构为键和值,键由消费组+Topic+分区数确定,值包含了消费组的偏移量信息。

       日常运维与问题排查方面,滴滴开源的LogiKM一站式Kafka监控与管控平台能够提供高效的支持。

相关栏目:探索