1.Flume+Kafka+Flink+Redis构建大数据实时处理系统(PV、异异步UV)
2.Flink遇到Kafka - FlinkKafkaConsumer运行机制
3.如何高效接入 Flink:Connecter / Catalog API 核心设计与社区进展
4.FlinkââExactly-Once
5.Apache Flink 1.13.0 发布公告
6.Flink Sink的步请反压优化(Sink异步化)
Flume+Kafka+Flink+Redis构建大数据实时处理系统(PV、UV)
大数据处理的求源常用方法目前流行两种:离线处理和在线实时流处理。在互联网应用中,异异步无论是步请哪一种处理方式,基本数据来源都是求源tinyplay 源码日志数据,例如web应用的异异步用户访问日志、点击日志等。步请
大数据处理目前流行的求源是离线处理和在线处理,基本处理架构如下:
对于数据分析结果时间要求严格的异异步,可以采用在线实时处理方式。步请例如使用Flink、求源SparkStreaming进行处理。异异步例如天猫双十一的步请成交额实时动态更新,就需要采用在线处理。求源接下来介绍实时数据处理方式,即基于Flink的在线处理,在以下完整案例中,我们将完成以下几项工作:
需要注意的是,本案例核心在于如何构建实时处理系统,让我们对大数据实时处理系统有一个基本的、清晰的了解与认识。
实时处理系统整体架构如下:
从以上架构可以看出,其由以下三个重要组成部分:
从构建实时处理系统的角度出发,我们需要做的是,如何让数据在各个不同集群系统之间打通,即需要做各个系统之前的整合,包括Flume与Kafka的整合,Kafka与Flink的整合。当然,各个环境是否使用集群,依个人实际需要而定,在我们的环境中,Flume、Kafka、Flink都使用集群。
对于Flume而言,关键在于如何采集数据,并且将其发送到Kafka上,由于我们这里使用Flume集群的方式,Flume集群的优雅的骨架源码配置也是十分关键的。对于Kafka,关键就是如何接收来自Flume的数据。从整体上讲,逻辑应该是比较简单的,即可以在Kafka中创建一个用于我们实时处理系统的topic,然后Flume将其采集到的数据发送到该topic上即可。
在我们的场景中,两个Flume Agent分别部署在两台Web服务器上,用来采集Web服务器上的日志数据,然后其数据的下沉方式都为发送到另外一个Flume Agent上,所以这里我们需要配置三个Flume Agent。
对于Kafka而言,关键在于如何接收来自Flume的数据。从整体上讲,逻辑应该是比较简单的,即可以在Kafka中创建一个用于我们实时处理系统的topic,然后Flume将其采集到的数据发送到该topic上即可。
在我们的场景中,两个Flume Agent分别部署在两台Web服务器上,用来采集Web服务器上的日志数据,然后其数据的下沉方式都为发送到另外一个Flume Agent上,所以这里我们需要配置三个Flume Agent。
在Kafka中,先创建一个topic,用于后面接收Flume采集过来的数据:
Flink提供了特殊的Kafka Connectors来从Kafka topic中读取数据或者将数据写入到Kafka topic中,Flink的Kafka Consumer与Flink的检查点机制相结合,提供exactly-once处理语义。为了做到这一点,Flink并不完全依赖于Kafka的consumer组的offset跟踪,而是在自己的内部去跟踪和检查。
Flink的kafka consumer叫做FlinkKafkaConsumer(对于Kafka 0.9.0.X来说是 等),它提供了对一个或多个Kafka topic的访问。
Flink Kafka Consumer、等的构造函数接收以下参数:
1、topic名称或者名称列表
2、反序列化来自kafka的数据的DeserializationSchema/Keyed Deserialization Schema
3、Kafka consumer的一些配置,下面的配置是必需的: "bootstrap.servers"(以逗号分隔的Kafka brokers列表) "zookeeper.connect"(以逗号分隔的Zookeeper 服务器列表) "group.id"(consumer组的id)
例如:
Java 代码:
Scala 代码:
当前FlinkKafkaConsumer的实现会建立一个到Kafka客户端的连接来查询topic的列表和分区。
为此,consumer需要能够访问到从提交Job任务的react源码解析掘金服务器到Flink服务器的consumer,如果你在客户端遇到任何Kafka Consumer的问题,你都可以在客户端日志中看到关于请求失败的日志。
Flink Kafka Consumer将会从一个topic中消费记录并以一致性的方式周期性地检查所有Kafka偏移量以及其他操作的状态。Flink将保存流程序到状态的最新的checkpoint中,并重新从Kafka中读取记录,记录从保存在checkpoint中的偏移位置开始读取。
checkpoint的时间间隔定义了程序在发生故障时可以恢复多少。
同时需要注意的是Flink只能在有足够的slots时才会去重启topology,所以如果topology由于TaskManager丢失而失败时,任然需要有足够的slot可用。Flink on YARN支持YARN container丢失自动重启。
所谓Flink和Redis的整合,指的是在我们的实时处理系统中的数据的落地方式,即在Flink中包含了我们处理数据的逻辑,而数据处理完毕后,产生的数据处理结果该保存到什么地方呢?显然就有很多种方式了,关系型数据库、NoSQL、HDFS、HBase等,这应该取决于具体的业务和数据量,在这里,我们使用Redis来进行最后分析数据的存储。
所以实际上做这一步的整合,其实就是开始写我们的业务处理代码了,因为通过前面Flume-Kafka-FLink的整合,已经打通了整个数据的流通路径,接下来关键要做的是,在Flink中,如何处理我们的数据并保存到Redis中。
Flink自带的connector提供了一种简洁的写入Redis的方式,只需要在项目中加入下面的依赖即可实现。
兼容版本:Redis 2.8.5 注意:Flink的connector并不是Flink的安装版本,需要写入用户的jar包并上传才能使用。
数据可视化处理目前我们需要完成两部分的工作:
对于Web项目的开发,因个人技术栈能力而异,选择的语言和技术也有所不同,只要能够达到我们最终数据可视化的目的,其实都行的。这个项目中我们要展示的画线指标源码公式是pv和uv数据,难度不大,因此可以选择Java Web,如Servlet、SpringMVC等,或者Python Web,如Flask、Django等,Flask我个人非常喜欢,因为开发非常快,但因为前面一直用的是Java,因此这里我还是选择使用SpringMVC来完成。
至于UI这一块,我前端能力一般,普通的开发没有问题,但是要做出像上面这种地图类型的UI界面来展示数据的话,确实有点无能为力。好在现在第三方的UI框架比较多,对于图表类展示的,比如就有highcharts和echarts,其中echarts是百度开源的,有丰富的中文文档,非常容易上手,所以这里我选择使用echarts来作为UI,并且其刚好就有能够满足我们需求的地图类的UI组件。
对于页面数据的动态刷新有两种方案,一种是定时刷新页面,另外一种则是定时向后端异步请求数据。
目前我采用的是第一种,页面定时刷新,有兴趣的同学也可以尝试使用第二种方法,只需要在后端开发相关的返回JSON数据的API即可。
至此,从整个大数据实时处理系统的构建到最后的数据可视化处理工作,我们都已经完成了,可以看到整个过程下来涉及到的知识层面还是比较多的,不过我个人觉得,只要把核心的原理牢牢掌握了,对于大部分情况而言,环境的搭建以及基于业务的开发都能够很好地解决。
Flink遇到Kafka - FlinkKafkaConsumer运行机制
在使用Flink处理Kafka数据流时,时隙调整源码Flink在job开启checkpoint时,一边消费Kafka topic数据,一边定时将offset和其他operator状态记录到checkpoint中。在job失败后,Flink重启job并从最后一个checkpoint恢复所有状态,从checkpoint记录的offset开始重新消费Kafka topic。记录offset的间隔决定失败后恢复程度。配置Flink Kafka consumer容错机制时,需要确保task slot数量足够,否则job重启失败,Flink on YARN支持自动重启丢失的YARN containers。
配置向Kafka brokers或Zookeeper提交offset行为,Flink Kafka Consumer不依赖这些提交确保容错,而是将其作为状态监控的标记。在Checkpointing disabled时,使用Kafka properties中的enable.auto.commit配置自动提交offset行为。在Checkpointing enabled时,Flink将offset存入checkpoint,保证与Kafka brokers中committed offset一致性,通过setCommitOffsetOnCheckpoints调整自动提交行为。
Flink Kafka Consumer运行机制中,Flink异步记录checkpoint,只有在enableCheckpointing()时,checkpoint完成时记录offset并提交,确保exactly-once。在错误处理上,Flink依赖带barrier的checkpointing机制,barrier作为分隔符,确保在发生错误时,job恢复到正确状态。在Job失败后,FlinkKafkaConsumer从最后一个成功committed的checkpoint中恢复相应partition offset,避免数据丢失和重复写入,保证exactly-once。
此外,Flink 1.3引入了更多监控功能,如总消息数、瞬时读取速度、latency等,可在Flink Web UI的Task Metric中查看监控指标,帮助优化和监控Flink Kafka应用性能。
如何高效接入 Flink:Connecter / Catalog API 核心设计与社区进展
为了高效接入 Flink,理解其核心设计与社区进展至关重要。本文将分层次介绍 Flink API 的核心内容,从 DataStream API 到 Table 和 SQL API,再到 Connector 和 Catalog API。
首先,DataStream API 是基础,针对 Java 开发者,通过实现 Source 和 Sink API 来直接构建 Stream 算法。为了支持 Table 和 Catalog,需要在 Source 和 Sink 的基础上,针对 Connector 提供的 API 进行二次开发。
在 Source API 部分,从 Flink 1. 开始引入,逐渐稳定并标记为 Public,取代了之前的 InputFormat 和 SourceFunction。设计上采用主从结构,类似 Flink 集群,分为 SplitEnumerator 和 SourceReader。SplitEnumerator 负责枚举分片,而 SourceReader 以 Subtask 级别并发执行,通过 RPC 与 Enumerator 通信,封装 SourceEvent 协调工作。为了简化开发,引入了 SourceReaderBase 和 SplitReader,将外部系统交互与 Flink 协作分离,减轻开发者负担。
在开发 Source 时,需注意与外部系统和 Flink 之间交互的分离,避免影响主线程和 Checkpoint 运行。利用 SourceReaderBase 和 SplitReader,可以复用现有逻辑,降低开发复杂度。
为了满足更多需求,Flink 在最近版本中引入了 Hybrid Source,它允许在不同 Source 之间切换,如从文件系统切换到消息队列。通过封装现有 Enumerator 和 Reader,提供切换能力,确保数据流平滑过渡。
Watermark Alignment 机制解决了不同 Source 间进度差异导致的下游算子等待问题,通过 CoordinatorStore 交换 Watermark 信息,确保进度同步。
SinkAPI 设计相对简单,Sink 作为工厂类构建拓扑,核心组件是 SinkWriter 负责数据序列化和外部系统输出。对于 Exactly-once 和第二阶段提交的需求,引入 SinkCommitter 组件。Async Sink 基类提供通用的异步输出逻辑,内置异常重试,实现 at-least-once 语义,简化开发。
集成至 Table/SQL API 时,Source 和 Sink 需实现 DynamicTableSource 和 DynamicTableSink 接口,提供对 Planner 的兼容性和配置构建。LookupTableSource 支持外部系统查询,通过 LookupFunction 实现点查逻辑。Sink 的实现同样依赖于 DynamicTableSink 接口。
为了简化用户配置和管理,Catalog API 提供外部系统信息的统一抽象,包括 Database、Table 等概念的映射。它能够简化配置,降低使用门槛,并支持血缘信息管理,通过 Catalog Modification Listener 监听表的增删操作。
实现自己的 Connector 时,首先接触的是 Source 和 Sink API,这是构建 Stream 算法的基础。为了支持 SQL 和 Table 生态,需实现 DynamicTableSource 和 DynamicTableSink。通过对接 Catalog API,可以利用外部系统的概念,简化用户操作,降低使用成本,并获得血缘管理能力。
FlinkââExactly-Once
Apache Flinkæ¯ç®åå¸åºæåå ³æ³¨çæµè®¡ç®å¤çå¼æï¼ç¸è¾äºSpark Streamingçä¾æSpark Coreå®ç°çå¾®æ¹å¤ç模åï¼Flinkæ¯ä¸ä¸ªçº¯ç²¹çæµå¤çå¼æï¼å ¶åºäºæä½ç¬¦çè¿ç»æµæ¨¡åï¼å¯ä»¥è¾¾å°å¾®ç§çº§å«ç延è¿ãFlinkå®ç°äºæµæ¹ä¸ä½å模å¼ï¼å®ç°æç §äºä»¶å¤çåæ åºå¤ç两ç§å½¢å¼ï¼åºäºå å计ç®ã强大é«æçååæºå¶åå å管çï¼åºäºè½»é级åå¸å¼å¿«ç §checkpointæºå¶ï¼ä»èèªå¨å®ç°äºExactly-Onceä¸è´æ§è¯ä¹ã
1. æ°æ®æºç«¯
æ¯æå¯é çæ°æ®æº(å¦kafka), æ°æ®å¯é读
Apache Flinkå ç½®FlinkKafkaConsumerç±»ï¼ä¸ä¾èµäº kafka å ç½®çæ¶è´¹ç»offset管çï¼å¨å é¨èªè¡è®°å½åç»´æ¤ consumer çoffsetã
2. Flinkæ¶è´¹ç«¯
è½»éçº§å¿«ç §æºå¶: ä¸è´æ§checkpointæ£æ¥ç¹
Flinkéç¨äºä¸ç§è½»éçº§å¿«ç §æºå¶(æ£æ¥ç¹checkpoint)æ¥ä¿éExactly-Onceçä¸è´æ§è¯ä¹ãæè°çä¸è´æ£æ¥ç¹ï¼å³å¨æ个æ¶é´ç¹ä¸ææä»»å¡ç¶æçä¸ä»½æ·è´(å¿«ç §)ã该æ¶é´ç¹æ¯ææä»»å¡å好å¤çå®ä¸ä¸ªç¸åæ°æ®çæ¶é´ã
é´éæ¶é´èªå¨æ§è¡åå¸å¼ä¸è´æ§æ£æ¥ç¹(Checkpoints)ç¨åºï¼å¼æ¥æå ¥barrieræ£æ¥ç¹åç线ï¼å åç¶æèªå¨åå¨ä¸ºcpè¿ç¨æ件ãä¿è¯æ°æ®Exactly Oncey精确ä¸æ¬¡å¤çã
(1) ä»source(Input)端å¼å§ï¼JobManagerä¼åæ¯ä¸ªsource(Input)åéæ£æ¥ç¹barrieræ¶æ¯ï¼å¯å¨æ£æ¥ç¹ãå¨ä¿è¯ææçsource(Input)æ°æ®é½å¤çå®æåï¼Flinkå¼å§ä¿åå ·ä½çä¸è´æ§æ£æ¥ç¹checkpointsï¼å¹¶å¨è¿ç¨ä¸å¯ç¨barrieræ£æ¥ç¹åç线ã
(2) æ¥æ¶æ°æ®åbarrieræ¶æ¯ï¼ä¸¤ä¸ªè¿ç¨å¼æ¥è¿è¡ãå¨ææçsource(Input)æ°æ®é½å¤çå®æåï¼å¼å§å°èªå·±çæ£æ¥ç¹(checkpoints)ä¿åå°ç¶æå(StateBackend)ä¸ï¼å¹¶éç¥JobManagerå°Barrierååå°ä¸æ¸¸
(3) barrieråä¸æ¸¸ä¼ éæ¶ï¼ä¼è¿è¡barrier对é½ç¡®è®¤ãå¾ barrieré½å°é½åæè¿è¡checkpointsæ£æ¥ç¹ä¿åã
(4) éå¤ä»¥ä¸æä½ï¼ç´å°æ´ä¸ªæµç¨å®æã
3. è¾åºç«¯
ä¸ä¸æSparkçè¾åºç«¯Exactly-Onceä¸è´æ§ä¸å®ç°ç±»ä¼¼ï¼é¤äºç®æ æºéè¦æ»¡è¶³ä¸å®æ¡ä»¶ä»¥å¤ï¼Flinkå ç½®çäºé¶æ®µæ交æºå¶ä¹åç¸å®ç°äºäºå¡ä¸è´æ§ã**æ¯æå¹çåå ¥ãäºå¡åå ¥æºå¶(äºé¶æ®µæ交)
**è¿ä¸ååä¸æSparkçå¹åå ¥ç¹æ§å 容ä¸è´ï¼å³ç¸åKey/ID æ´æ°åå ¥ï¼æ°æ®ä¸åãåå©æ¯æ主é®å¯ä¸æ§çº¦æçåå¨ç³»ç»ï¼å®ç°å¹çæ§åå ¥æ°æ®ï¼æ¤å¤å°ä¸å继ç»èµè¿°ã
Flinkå¨å¤çå®source端æ°æ®æ¥æ¶åoperatorç®å计ç®è¿ç¨ï¼å¾ è¿ç¨ä¸ææçcheckpointé½å®æåï¼åå¤åéæ°æ®å°sink端ï¼æ¤æ¶å¯å¨äºå¡ãå ¶ä¸åå¨ä¸¤ç§æ¹å¼ï¼ (1) WALé¢åæ¥å¿: å°è®¡ç®ç»æå åå ¥å°æ¥å¿ç¼å(ç¶æå端/WAL)ä¸ï¼çcheckpoint确认å®æåä¸æ¬¡æ§åå ¥å°sinkã(2) äºé¶æ®µæ交: 对äºæ¯ä¸ªcheckpointå建äºå¡ï¼å é¢æ交æ°æ®å°sinkä¸ï¼ç¶åçææçcheckpointå ¨é¨å®æååçæ£æ交请æ±å°sink, 并æç¶ææ¹ä¸ºå·²ç¡®è®¤ã
æ´ä½ææ³: 为checkpointå建äºå¡ï¼çå°ææçcheckpointå ¨é¨çæ£çå®æåï¼ææ计ç®ç»æåå ¥å°sinkä¸ã
Apache Flink 1..0 发布公告
Apache Flink 社区宣布发布 Flink 1..0 版本,此次发布得到了多位贡献者的支持,解决了超过个问题。Flink 1..0 版本的一大亮点是将流处理应用程序的使用变得更加自然和易于管理,通过引入新的反应式扩展模式,只需调整并行进程数,就可以扩展流应用程序,就如同管理任何其他应用程序一样方便。此外,还对系统进行了多项改进,旨在帮助用户更好地了解应用程序的性能。
该版本中,Flink 引入了响应式缩放,这是一种旨在使流处理应用程序在资源管理和部署方面具有双重性质的机制。在 Flink 应用程序的部署模式中,Flink 可以主动管理资源并根据需要分配和释放工作人员,这对于快速调整资源需求的作业和应用程序特别有用。对于长时间运行的流应用程序,将它们部署为可以像其他任何长时间运行的应用程序一样,即不需要知道它在Kubernetes、EKS、Yarn等上运行,无需管理资源的数量和并行性设置,仅使用分配给它的工人数即可。这种部署模式被称为反应式缩放。
为了帮助用户分析和理解应用程序的性能,Flink 1..0 版本提供了新工具,包括瓶颈检测、背压监控、CPU 火焰图以及状态访问延迟指标等,以识别数据速率不足或资源超出预期的原因。新版本还改进了背压度量系统,提供了工作数据流的图形表示,并引入了CPU 火焰图,以可视化分析哪些方法正在消耗 CPU 资源,以及这些方法与其他方法的比较。此外,还增加了对状态后端延迟指标的访问,以了解状态后端是否响应良好。
另一个值得注意的改进是支持从保存点恢复时更改 Flink 应用程序的状态后端,这意味着应用程序的状态不再被锁定在最初启动应用程序时所使用的状态后端中。这样,用户可以在状态变得太大时从 HashMap 状态后端(JVM Heap 中的纯内存)切换到 RocksDB 状态后端,以提高性能和效率。
Kubernetes 本地部署现在支持自定义 Pod 模板,用户可以以 Kubernetes-y 的方式设置和配置 JobManagers 和 TaskManagers 窗格,并具有 Flink 的 Kubernetes 集成中直接内置的配置选项之外的灵活性。未对齐的检查点已成熟到可以鼓励所有用户尝试的程度,这使从保留的检查点扩展应用程序变得方便,如果应用程序存在背压问题。此外,SQL 和 Table API 的功能也在不断改进,包括通过表值函数定义时间窗口,增强 SQL 与 Table API/SQL 之间的互操作性,以及改进 SQL 客户端和初始化脚本的功能。
在 PyFlink 中,此版本的总体主题是使 Python DataStream API 和 Table API 更接近于与 Java / Scala API 的功能对等,引入了有状态操作在 Python DataStream API 中的使用,支持用户定义窗口和基于行的操作在 Python Table API 中,以及批处理执行模式对 PyFlink DataStream 程序的支持。此外,Flink 文档已从 Jekyll 迁移到 Hugo,以提供更好的用户体验,而 Flink Web UI 现在显示导致作业失败的最后异常的历史记录,有助于调试。
为了更轻松地配置和控制执行,SQL 客户端现在接受初始化脚本来配置会话,以定义和控制执行。此外,此版本还改进了 SQL 与时间相关的函数的行为,支持在 TIMESTAMP_LTZ 列上定义事件时间属性,以在夏时制的支持下优雅地执行窗口处理。Flink 还引入了 JDBC 接收器的事务性提交结果,以确保一次准确地交付符合 XA 的数据库结果,以及支持常规的 Python 用户定义的集合函数和 Pandas UDAF 在 PyFlink 的 Table API 中的 Group Windows。
在批处理执行程序中,改进了内存稳定性和排序合并阻塞改组的性能,并支持异步查找模式和查找缓存的 HBase 查找表源。最后,用户在升级到 Flink 1..0 版本时需要考虑的更改包括阅读发行说明以了解与以前 1.x 版本的 API 兼容性,以及查看完整的发行版变更日志和更新的文档以获取详细信息。
Flink Sink的反压优化(Sink异步化)
在Flink项目中,我们面临一个场景,即从阿里SLS接收监控指标并进行清洗,然后写入TSDB。起初运行平稳,但在指标数量增加后,发现SLS消费存在延迟问题。因此,我们着手优化Sink的异步处理。
问题的起因和定位涉及到了Sink的同步写入策略。原设计中,每接收到一条数据,Sink就立即同步调用TSDB接口,导致性能受限。为提升效率,我们需要将Sink的处理逻辑转变为异步模式。
异步优化的关键在于引入一个比喻,就像组织会议:首先确定参会者,只有当所有人都到位(即await()方法调用完成)时,会议才能开始。在Flink中,我们通过设置一个栅栏计数器来模拟这个过程,当处理任务(SinkTaskProcessor)完成一个数据写入请求,计数器减一,直到所有任务完成,数据才会被真正写入TSDB。
SinkTaskProcessor是用户必须实现的接口,负责处理数据写入。而AbstractAsyncRichSinkFunction作为抽象类,继承了RichSinkFunction并实现了CheckpointedFunction。AsyncSinkTaskRunnable则是提交到线程池的任务,它负责从数据缓存队列中取出数据,并交给SinkTaskProcessor处理,同时设置了ms的超时防止阻塞。
源代码位于cn.sh.flink.learning.sink.async包下的SlowlyRickSinkTestFunction,这是一个模拟处理耗时任务的类,真正的数据处理工作由SinkTaskProcessor负责。我们鼓励大家试用并提供反馈,如果发现任何问题或有改进意见,欢迎通过私信或issue进行交流。