1.Spark原理详解
2.Sparkå
å管ç详解ï¼ä¸ï¼ââå
å管ç
3.spark sql源码系列 | with as 语句真的内内存会把查询的数据存内存嘛?
4.源码解析Spark中的Parquet高性能向量化读
5.spark原理系列 broadcast广播原理优缺点示例源码权威讲解
6.大数据面试题-Spark的内存模型
Spark原理详解
Spark原理详解: Spark是一个专为大规模数据处理设计的内存计算框架,其高效得益于其核心组件——弹性数据分布集RDD。存源RDD是管理Spark的数据结构,它将数据存储在分布式内存中,内内存通过逻辑上的存源集中管理和物理上的分布式存储,提供了高效并行计算的管理newlvq源码能力。 RDD的内内存五个关键特性如下:每个RDD由多个partition组成,用户可以指定分区数量,存源默认为CPU核心数。管理每个partition独立处理,内内存便于并行计算。存源
Spark的管理计算基于partition,算子作用于partition上,内内存无需保存中间结果,存源提高效率。管理
RDD之间有依赖性,数据丢失时仅重新计算丢失分区,避免全量重算。
对于key-value格式的RDD,有Partitioner决定分片和数据分布,优化数据处理的本地化。
Spark根据数据位置调度任务,实现“移动计算”而非数据。
Spark区分窄依赖(一对一)和宽依赖(一对多),前者不涉及shuffle,后者则会根据key进行数据切分。 Spark的执行流程包括用户提交任务、生成DAG、划分stage和task、在worker节点执行计算等步骤。创建RDD的方式多样,包括程序中的集合、本地文件、HDFS、数据库、NoSQL和数据流等。 技术栈方面,Spark与HDFS、YARN、MR、Hive等紧密集成,提供SparkCore、图书管理系统java源码SparkSQL、SparkStreaming等扩展功能。 在编写Spark代码时,首先创建SparkConf和SparkContext,然后操作RDD进行转换和应用Action,最后关闭SparkContext。理解底层机制有助于优化资源使用,如HDFS文件的split与partition关系。 搭建Spark集群涉及上传、配置worker和master信息,以及启动和访问。内存管理则需注意Executor的off-heap和heap,以及Spark内存的分配和使用。Sparkå å管ç详解ï¼ä¸ï¼ââå å管ç
å¼¹æ§åå¸å¼æ°æ®éï¼RDDï¼ä½ä¸ºSparkææ ¹æ¬çæ°æ®æ½è±¡ï¼æ¯åªè¯»çååºè®°å½ï¼Partitionï¼çéåï¼åªè½åºäºå¨ç¨³å®ç©çåå¨ä¸çæ°æ®éä¸å建ï¼æè å¨å ¶ä»å·²æçRDDä¸æ§è¡è½¬æ¢ï¼Transformationï¼æä½äº§çä¸ä¸ªæ°çRDDã转æ¢åçRDDä¸åå§çRDDä¹é´äº§ççä¾èµå ³ç³»ï¼ææäºè¡ç»ï¼Lineageï¼ãååè¡ç»ï¼Sparkä¿è¯äºæ¯ä¸ä¸ªRDDé½å¯ä»¥è¢«éæ°æ¢å¤ãä½RDDçææ转æ¢é½æ¯æ°æ§çï¼å³åªæå½ä¸ä¸ªè¿åç»æç»Driverçè¡å¨ï¼Actionï¼åçæ¶ï¼Sparkæä¼å建任å¡è¯»åRDDï¼ç¶åçæ£è§¦å转æ¢çæ§è¡ãTaskå¨å¯å¨ä¹å读åä¸ä¸ªååºæ¶ï¼ä¼å å¤æè¿ä¸ªååºæ¯å¦å·²ç»è¢«æä¹ åï¼å¦æ没æåéè¦æ£æ¥Checkpointææç §è¡ç»éæ°è®¡ç®ãæ以å¦æä¸ä¸ªRDDä¸è¦æ§è¡å¤æ¬¡è¡å¨ï¼å¯ä»¥å¨ç¬¬ä¸æ¬¡è¡å¨ä¸ä½¿ç¨persistæcacheæ¹æ³ï¼å¨å åæç£çä¸æä¹ åæç¼åè¿ä¸ªRDDï¼ä»èå¨åé¢çè¡å¨æ¶æå计ç®é度ãäºå®ä¸ï¼cacheæ¹æ³æ¯ä½¿ç¨é»è®¤çMEMORY_ONLYçåå¨çº§å«å°RDDæä¹ åå°å åï¼æ ç¼åæ¯ä¸ç§ç¹æ®çæä¹ åãå å åå å¤åå¨å åç设计ï¼ä¾¿å¯ä»¥å¯¹ç¼åRDDæ¶ä½¿ç¨çå ååç»ä¸çè§åå管çï¼åå¨å åçå ¶ä»åºç¨åºæ¯ï¼å¦ç¼åbroadcastæ°æ®ï¼ææ¶ä¸å¨æ¬æç讨论èå´ä¹å ï¼ã
RDDçæä¹ åç±SparkçStorage模å [1] è´è´£ï¼å®ç°äºRDDä¸ç©çåå¨ç解è¦åãStorage模åè´è´£ç®¡çSparkå¨è®¡ç®è¿ç¨ä¸äº§ççæ°æ®ï¼å°é£äºå¨å åæç£çãå¨æ¬å°æè¿ç¨ååæ°æ®çåè½å°è£ äºèµ·æ¥ãå¨å ·ä½å®ç°æ¶Driver端åExecutor端çStorage模åææäºä¸»ä»å¼çæ¶æï¼å³Driver端çBlockManager为Masterï¼Executor端çBlockManager为SlaveãStorage模åå¨é»è¾ä¸ä»¥Block为åºæ¬åå¨åä½ï¼RDDçæ¯ä¸ªPartitionç»è¿å¤çåå¯ä¸å¯¹åºä¸ä¸ªBlockï¼BlockIdçæ ¼å¼ä¸º rdd_RDD-ID_PARTITION-ID ï¼ãMasterè´è´£æ´ä¸ªSparkåºç¨ç¨åºçBlockçå æ°æ®ä¿¡æ¯ç管çåç»´æ¤ï¼èSlaveéè¦å°Blockçæ´æ°çç¶æä¸æ¥å°Masterï¼åæ¶æ¥æ¶Masterçå½ä»¤ï¼ä¾å¦æ°å¢æå é¤ä¸ä¸ªRDDã
å¨å¯¹RDDæä¹ åæ¶ï¼Sparkè§å®äºMEMORY_ONLYãMEMORY_AND_DISKç7ç§ä¸åç åå¨çº§å« ï¼èåå¨çº§å«æ¯ä»¥ä¸5个åéçç»å [2] ï¼
éè¿å¯¹æ°æ®ç»æçåæï¼å¯ä»¥çåºåå¨çº§å«ä»ä¸ä¸ªç»´åº¦å®ä¹äºRDDçPartitionï¼åæ¶ä¹å°±æ¯Blockï¼çåå¨æ¹å¼ï¼
RDDå¨ç¼åå°åå¨å åä¹åï¼Partitionä¸çæ°æ®ä¸è¬ä»¥è¿ä»£å¨ï¼ Iterator ï¼çæ°æ®ç»ææ¥è®¿é®ï¼è¿æ¯Scalaè¯è¨ä¸ä¸ç§éåæ°æ®éåçæ¹æ³ãéè¿Iteratorå¯ä»¥è·åååºä¸æ¯ä¸æ¡åºååæè éåºååçæ°æ®é¡¹(Record)ï¼è¿äºRecordç对象å®ä¾å¨é»è¾ä¸å ç¨äºJVMå å å åçotheré¨åç空é´ï¼åä¸Partitionçä¸åRecordç空é´å¹¶ä¸è¿ç»ã
RDDå¨ç¼åå°åå¨å åä¹åï¼Partition被转æ¢æBlockï¼Recordå¨å å æå å¤åå¨å åä¸å ç¨ä¸åè¿ç»ç空é´ãå°Partitionç±ä¸è¿ç»çåå¨ç©ºé´è½¬æ¢ä¸ºè¿ç»åå¨ç©ºé´çè¿ç¨ï¼Spark称ä¹ä¸ºâå±å¼âï¼Unrollï¼ãBlockæåºåååéåºåå两ç§åå¨æ ¼å¼ï¼å ·ä½ä»¥åªç§æ¹å¼åå³äºè¯¥RDDçåå¨çº§å«ãéåºååçBlock以ä¸ç§DeserializedMemoryEntryçæ°æ®ç»æå®ä¹ï¼ç¨ä¸ä¸ªæ°ç»åå¨ææçJava对象ï¼åºååçBlockå以SerializedMemoryEntryçæ°æ®ç»æå®ä¹ï¼ç¨åèç¼å²åºï¼ByteBufferï¼æ¥åå¨äºè¿å¶æ°æ®ãæ¯ä¸ªExecutorçStorage模åç¨ä¸ä¸ªé¾å¼Mapç»æï¼LinkedHashMapï¼æ¥ç®¡çå å åå å¤åå¨å åä¸ææçBlock对象çå®ä¾ [6] ï¼å¯¹è¿ä¸ªLinkedHashMapæ°å¢åå é¤é´æ¥è®°å½äºå åçç³è¯·åéæ¾ã
å 为ä¸è½ä¿è¯åå¨ç©ºé´å¯ä»¥ä¸æ¬¡å®¹çº³Iteratorä¸çæææ°æ®ï¼å½åç计ç®ä»»å¡å¨Unrollæ¶è¦åMemoryManagerç³è¯·è¶³å¤çUnroll空é´æ¥ä¸´æ¶å ä½ï¼ç©ºé´ä¸è¶³åUnroll失败ï¼ç©ºé´è¶³å¤æ¶å¯ä»¥ç»§ç»è¿è¡ã对äºåºååçPartitionï¼å ¶æéçUnroll空é´å¯ä»¥ç´æ¥ç´¯å 计ç®ï¼ä¸æ¬¡ç³è¯·ãèéåºååçPartitionåè¦å¨éåRecordçè¿ç¨ä¸ä¾æ¬¡ç³è¯·ï¼å³æ¯è¯»åä¸æ¡Recordï¼éæ ·ä¼°ç®å ¶æéçUnroll空é´å¹¶è¿è¡ç³è¯·ï¼ç©ºé´ä¸è¶³æ¶å¯ä»¥ä¸æï¼éæ¾å·²å ç¨çUnroll空é´ãå¦ææç»Unrollæåï¼å½åPartitionæå ç¨çUnroll空é´è¢«è½¬æ¢ä¸ºæ£å¸¸çç¼åRDDçåå¨ç©ºé´ï¼å¦ä¸å¾2æ示ã
å¨ ãSparkå å管ç详解ï¼ä¸ï¼ââå ååé ã çå¾3åå¾5ä¸å¯ä»¥çå°ï¼å¨éæå å管çæ¶ï¼Sparkå¨åå¨å åä¸ä¸é¨ååäºä¸åUnroll空é´ï¼å ¶å¤§å°æ¯åºå®çï¼ç»ä¸å å管çæ¶å没æ对Unroll空é´è¿è¡ç¹å«åºåï¼å½åå¨ç©ºé´ä¸è¶³æ¯ä¼æ ¹æ®å¨æå ç¨æºå¶è¿è¡å¤çã
ç±äºåä¸ä¸ªExecutorçææç计ç®ä»»å¡å ±äº«æéçåå¨å å空é´ï¼å½ææ°çBlockéè¦ç¼åä½æ¯å©ä½ç©ºé´ä¸è¶³ä¸æ æ³å¨æå ç¨æ¶ï¼å°±è¦å¯¹LinkedHashMapä¸çæ§Blockè¿è¡æ·æ±°ï¼Eviction)ï¼è被æ·æ±°çBlockå¦æå ¶åå¨çº§å«ä¸åæ¶å å«åå¨å°ç£ççè¦æ±ï¼åè¦å¯¹å ¶è¿è¡è½çï¼Dropï¼ï¼å¦åç´æ¥å é¤è¯¥Blockã
åå¨å åçæ·æ±°è§å为ï¼
è½ççæµç¨åæ¯è¾ç®åï¼å¦æå ¶åå¨çº§å«ç¬¦å _useDisk 为trueçæ¡ä»¶ï¼åæ ¹æ®å ¶ _deserialized å¤ææ¯å¦æ¯éåºååçå½¢å¼ï¼è¥æ¯åå¯¹å ¶è¿è¡åºååï¼æåå°æ°æ®åå¨å°ç£çï¼å¨Storage模åä¸æ´æ°å ¶ä¿¡æ¯ã
Executorå è¿è¡çä»»å¡åæ ·å ±äº«æ§è¡å åï¼Sparkç¨ä¸ä¸ªHashMapç»æä¿åäºä»»å¡å°å åèè´¹çæ å°ãæ¯ä¸ªä»»å¡å¯å ç¨çæ§è¡å å大å°çèå´ä¸º 1/2N ~ 1/N ï¼å ¶ä¸N为å½åExecutorå æ£å¨è¿è¡çä»»å¡ç个æ°ãæ¯ä¸ªä»»å¡å¨å¯å¨ä¹æ¶ï¼è¦åMemoryManager请æ±ç³è¯·æå°ä¸º1/2Nçæ§è¡å åï¼å¦æä¸è½è¢«æ»¡è¶³è¦æ±å该任å¡è¢«é»å¡ï¼ç´å°æå ¶ä»ä»»å¡éæ¾äºè¶³å¤çæ§è¡å åï¼è¯¥ä»»å¡æå¯ä»¥è¢«å¤éã
æ§è¡å å主è¦ç¨æ¥åå¨ä»»å¡å¨æ§è¡Shuffleæ¶å ç¨çå åï¼Shuffleæ¯æç §ä¸å®è§å对RDDæ°æ®éæ°ååºçè¿ç¨ï¼æ们æ¥çShuffleçWriteåRead两é¶æ®µå¯¹æ§è¡å åç使ç¨ï¼
å¨ExternalSorteråAggregatorä¸ï¼Sparkä¼ä½¿ç¨ä¸ç§å«AppendOnlyMapçåå¸è¡¨å¨å å æ§è¡å åä¸åå¨æ°æ®ï¼ä½å¨Shuffleè¿ç¨ä¸æææ°æ®å¹¶ä¸è½é½ä¿åå°è¯¥åå¸è¡¨ä¸ï¼å½è¿ä¸ªåå¸è¡¨å ç¨çå åä¼è¿è¡å¨ææ§å°éæ ·ä¼°ç®ï¼å½å ¶å¤§å°ä¸å®ç¨åº¦ï¼æ æ³åä»MemoryManagerç³è¯·å°æ°çæ§è¡å åæ¶ï¼Sparkå°±ä¼å°å ¶å ¨é¨å 容åå¨å°ç£çæ件ä¸ï¼è¿ä¸ªè¿ç¨è¢«ç§°ä¸ºæº¢å(Spill)ï¼æº¢åå°ç£ççæ件æåä¼è¢«å½å¹¶(Merge)ã
Shuffle Writeé¶æ®µä¸ç¨å°çTungstenæ¯Databrickså ¬å¸æåºç对Sparkä¼åå ååCPU使ç¨ç计å [4] ï¼è§£å³äºä¸äºJVMå¨æ§è½ä¸çéå¶åå¼ç«¯ãSparkä¼æ ¹æ®Shuffleçæ åµæ¥èªå¨éæ©æ¯å¦éç¨TungstenæåºãTungstenéç¨ç页å¼å å管çæºå¶å»ºç«å¨MemoryManagerä¹ä¸ï¼å³Tungsten对æ§è¡å åç使ç¨è¿è¡äºä¸æ¥çæ½è±¡ï¼è¿æ ·å¨Shuffleè¿ç¨ä¸æ éå ³å¿æ°æ®å ·ä½åå¨å¨å å è¿æ¯å å¤ãæ¯ä¸ªå å页ç¨ä¸ä¸ªMemoryBlockæ¥å®ä¹ï¼å¹¶ç¨ Object obj å long offset è¿ä¸¤ä¸ªåéç»ä¸æ è¯ä¸ä¸ªå å页å¨ç³»ç»å åä¸çå°åãå å çMemoryBlockæ¯ä»¥longåæ°ç»çå½¢å¼åé çå åï¼å ¶ obj çå¼ä¸ºæ¯è¿ä¸ªæ°ç»ç对象å¼ç¨ï¼ offset æ¯longåæ°ç»çå¨JVMä¸çåå§å移å°åï¼ä¸¤è é å使ç¨å¯ä»¥å®ä½è¿ä¸ªæ°ç»å¨å å çç»å¯¹å°åï¼å å¤çMemoryBlockæ¯ç´æ¥ç³è¯·å°çå ååï¼å ¶ obj 为nullï¼ offset æ¯è¿ä¸ªå ååå¨ç³»ç»å åä¸çä½ç»å¯¹å°åãSparkç¨MemoryBlockå·§å¦å°å°å å åå å¤å å页ç»ä¸æ½è±¡å°è£ ï¼å¹¶ç¨é¡µè¡¨(pageTable)管çæ¯ä¸ªTaskç³è¯·å°çå å页ã
Tungsten页å¼ç®¡çä¸çææå åç¨ä½çé»è¾å°å表示ï¼ç±é¡µå·å页å å移éç»æï¼
æäºç»ä¸ç寻åæ¹å¼ï¼Sparkå¯ä»¥ç¨ä½é»è¾å°åçæéå®ä½å°å å æå å¤çå åï¼æ´ä¸ªShuffle Writeæåºçè¿ç¨åªéè¦å¯¹æéè¿è¡æåºï¼å¹¶ä¸æ éååºååï¼æ´ä¸ªè¿ç¨é常é«æï¼å¯¹äºå å访é®æçåCPU使ç¨æç带æ¥äºææ¾çæå [5] ã
Sparkçåå¨å ååæ§è¡å åæçæªç¶ä¸åç管çæ¹å¼ï¼å¯¹äºåå¨å åæ¥è¯´ï¼Sparkç¨ä¸ä¸ªLinkedHashMapæ¥éä¸ç®¡çææçBlockï¼Blockç±éè¦ç¼åçRDDçPartition转åèæï¼è对äºæ§è¡å åï¼Sparkç¨AppendOnlyMapæ¥åå¨Shuffleè¿ç¨ä¸çæ°æ®ï¼å¨Tungstenæåºä¸çè³æ½è±¡æ为页å¼å å管çï¼å¼è¾äºå ¨æ°çJVMå å管çæºå¶ã
Sparkçå å管çæ¯ä¸å¥å¤æçæºå¶ï¼ä¸Sparkççæ¬æ´æ°æ¯è¾å¿«ï¼ç¬è æ°´å¹³æéï¼é¾å æåè¿°ä¸æ¸ ãé误çå°æ¹ï¼è¥è¯»è æ好ç建议åæ´æ·±çç解ï¼è¿æä¸åèµæã
spark sql源码系列 | with as 语句真的会把查询的数据存内存嘛?
在探讨 Spark SQL 中 with...as 语句是否真的会把查询的数据存入内存之前,我们需要理清几个关键点。首先,网上诸多博客常常提及 with...as 语句会将数据存放于内存中,来提升性能。那么,实际情况究竟如何呢?
让我们以 hive-sql 的视角来解答这一问题。在 hive 中,有一个名为 `hive.optimize.cte.materialize.threshold` 的参数。默认情况下,其值为 -1,代表关闭。当值大于 0 时(如设置为 2),with...as 语句生成的表将在被引用次数达到设定值后物化,从而确保 with...as 语句仅执行一次,进而提高效率。
接下来,我们通过具体测试来验证上述结论。在不调整该参数的情况下,执行计划显示 test 表被读取了两次。此时,我们将参数调整为 `set hive.optimize.cte.materialize.threshold=1`,执行计划显示了 test 表被物化的情况,表明查询结果已被缓存。
转而观察 Spark SQL 端,我们并未发现相关优化参数。Spark 对 with...as 的操作相对较少,在源码层面,咖啡源码通过获取元数据时所做的参数判断(如阈值与 cte 引用次数),我们可以发现 Spark 在这个逻辑上并未提供明确的优化机制,来专门针对 with...as 语句进行高效管理。
综上所述,通过与 hive-sql 的对比以及深入源码分析,我们得出了 with...as 语句在 Spark SQL 中是否把数据存入内存的结论,答案并不是绝对的。关键在于是否通过参数调整来物化结果,以及 Spark 在自身框架层面并未提供特定优化策略来针对 with...as 语句进行内存管理。因此,正确使用 with...as 语句并结合具体业务场景,灵活调整优化参数策略,是实现性能提升的关键。
源码解析Spark中的Parquet高性能向量化读
在Spark中,Parquet的高性能向量化读取是自2.0版本开始引入的特性。它与传统的逐行读取和解码不同,采用列式批处理方式,显著提升了列解码的速度,据Databricks测试,速度比非向量化版本快了9倍。本文将深入解析Spark的源码,揭示其如何支持向量化Parquet文件读取。
Spark的向量化读取主要依赖于ColumnBatch和ColumnVector数据结构。ColumnBatch是每次读取返回的批量数据容器,其中包含一个ColumnVectors数组,每个ColumnVector负责存储一批数据中某一列的所有值。这种设计使得数据可以按列进行高效访问,同时也提供按行的视图,通过InternalRow对象逐行处理。
在读取过程中,Spark通过VectorizedParquetRecordReader、VectorizedColumnReader和VectorizedValuesReader三个组件协同工作。VectorizedParquetRecordReader负责启动批量读取,它根据指定的批次大小和内存模式创建实例。VectorizedColumnReader和VectorizedValuesReader则负责实际的列值读取,根据列的类型和编码进行相应的解码处理。
值得注意的是,Spark在数据加载时会重复使用ColumnBatch和ColumnVector实例,以减少内存占用,优化计算效率。ColumnVector支持堆内存和堆外内存,yum源码以适应不同的存储需求。通过这些优化,向量化读取在处理大型数据集时表现出色,尤其是在性能上。
然而,尽管Spark的向量化读取已经非常高效,Iceberg中的Parquet向量化读取可能更快,这可能涉及到Iceberg对Parquet文件的特定优化,或者其在数据处理流程中的其他改进,但具体原因需要进一步深入分析才能揭示。
spark原理系列 broadcast广播原理优缺点示例源码权威讲解
Spark广播(broadcast)的原理是通过将一个只读变量从驱动程序发送到集群上的所有工作节点,以便在运行任务时能够高效地访问这个变量。广播变量只会被发送一次,并且在工作节点上缓存,以供后续任务重用。
这种方式可以避免在任务执行期间多次传输相同的数据,从而提高性能和效率。
在Spark中,广播变量的实现主要依赖于DriverEndpoint和ExecutorEndpoint之间的通信机制。
具体来说,当驱动程序将广播变量发送给工作节点时,它会使用BlockManager将序列化的块存储在内存中,并将块的元数据注册到BlockManagerMaster。
然后,当工作节点执行任务时,它会向BlockManagerMaster请求获取广播变量的块,并从本地BlockManager中获取这些块的数据。这样,每个工作节点都可以在本地快速访问广播变量的数据。
总结起来,Spark广播的实现涉及驱动程序对广播变量进行序列化和发送,以及工作节点接收、反序列化和缓存广播变量的块。这种机制有效地将只读数据分发到集群上的所有工作节点,提高了任务执行的性能和效率。
广播变量在以下场景中非常有用:
总之,广播变量适用于需要在多个任务之间共享只读数据,并且能够提供更高效的数据访问和减少网络传输开销的情况。通过使用广播变量,可以提高Spark应用程序的性能和效率。
虽然广播在分布式计算中有很多优点,lstm源码但它也存在一些缺点:
因此,在使用广播变量时需要考虑其局限性和适用场景。如果数据集较大,实时性要求高,或者需要频繁修改数据,可能需要考虑其他替代方案来避免广播的缺点。
示例源码broadcast方法
功能:将只读变量广播到集群,返回一个Broadcast对象以在分布式函数中进行读取变量将仅发送一次到每个执行器,同时调用了内部的方法broadcastInternal
基础类Broadcast抽象类
Broadcast 是 Spark 中的一个广播变量类。广播变量允许程序员在每台机器上缓存一个只读的变量,而不是将它与任务一起传输。通过使用广播变量,可以以高效的方式为每个节点提供大型输入数据集的副本。
Broadcast 类的构造函数接收一个唯一标识符 id,用于标识广播变量。
Broadcast 类是一个抽象类,有以下几个主要方法:
Broadcast 类还定义了一些受保护的方法,用于实际获取广播变量的值、取消持久化广播变量的值以及销毁广播变量的状态。
Broadcast 类还具有 _isValid 和 _destroySite 两个私有变量,分别表示广播变量是否有效(即尚未销毁)以及销毁广播变量的位置信息。
总体来说,Broadcast 类提供了管理广播变量的功能,并确保广播变量的正确使用和销毁。
实现类TorrentBroadcast
TorrentBroadcast 是使用类似 BitTorrent 协议实现的 Broadcast 的具体实现(目前spark中只有一种实现)。它继承自 Broadcast 类,并提供以下功能:
TorrentBroadcast 包含以下主要成员变量和方法:
TorrentBroadcast 通过将广播数据分成小块并使用类似 BitTorrent 的协议进行分布式传输,以提高广播性能和可靠性。它允许在集群中高效地广播大量数据,并减少了驱动程序的负载。
内部版本广播方法broadcastInternal
该方法是spark内部版本的广播 - 将只读变量广播到集群,变量将仅发送一次到每个执行器。该方法中使用了broadcastManager对象中的newBroadcast创建广播变量
broadcastManager初始化和创建广播对象初始化
BroadcastManager构造函数会调用自身的initialize方法,创建一个TorrentBroadcastFactory实例.对象在实例化时,会自动调用自身的writeBlocks,把数据写入blockManager:
使用了实现了BroadcastFactory接口的TorrentBroadcastFactory工厂方法。TorrentBroadcastFactory 是一个使用类似 BitTorrent 的协议来进行广播数据分布式传输的广播工厂。
创建广播变量
TorrentBroadcastFactory实例通过调用newBroadcast() 方法创建新的 TorrentBroadcast对象即广播变量。 可以参考上文实现类
源码拓展BroadcastManager对象
BroadcastManager 是 Spark 中负责管理广播变量的类。它包含以下主要功能:
此外,BroadcastManager 还包含了一些内部变量,如下:
总而言之,BroadcastManager 提供了广播变量的管理和操作功能,确保广播变量能够在集群中高效地分发和访问。
BroadcastFactory接口
BroadcastFactory 是 Spark 中所有广播实现的接口,用于允许多个广播实现。它定义了以下方法:
通过实现BroadcastFactory 接口,可以自定义广播实现,并在 SparkContext 中使用相应的广播工厂来实例化广播变量。
TorrentBroadcastFactory
TorrentBroadcastFactory 是一个使用类似 BitTorrent 的协议来进行广播数据分布式传输的广播工厂。它实现了 BroadcastFactory 接口,并提供以下功能:
TorrentBroadcastFactory 主要用于支持使用 BitTorrent-like 协议进行分布式传输的广播操作,以提高广播数据在集群中的传输效率和可靠性。
BitTorrent 协议
BitTorrent 是一种流行的文件分享协议,它使用了一种名为 "块链" 的技术。块链技术通常用于比特币等加密货币,但在 BitTorrent 中,它用于分发大型文件。
BitTorrent 的工作原理
初始化: 当一个用户想要下载一个文件时,他首先创建一个 "种子" 文件,这个文件包含该文件的所有块的哈希列表。 查找: 下载者使用 BitTorrent 客户端软件查找其他下载者,并请求他们分享文件块。 交换: 下载者与其他下载者交换文件块。每个下载者不仅下载文件,还同时通过上传已下载的块来帮助其他下载者。 完整性: 每个块都有一个哈希值,用于验证块的完整性。如果某个块的哈希值不匹配,则该块被认为是无效的,需要重新下载。
块链技术
BitTorrent 使用块链来确保每个块的完整性。每个块都包含前一个块的哈希值,这使得整个文件的所有块形成了一个链。如果某个块被修改或损坏,它的哈希值将不再匹配,BitTorrent 客户端将自动从其他下载者那里请求一个新的块。
安全性
BitTorrent 协议不使用加密,这意味着在交换文件块时,你的数据可能被第三方监听。为了提高安全性,你可以使用一个加密的 BitTorrent 客户端,如 BitTorrent Secure。
总结
BitTorrent 协议是一种高效的文件分享协议,它使用块链技术来保证文件块的完整性和安全性。然而,由于其不加密的特点,它可能不适合传输敏感信息。
大数据面试题-Spark的内存模型
面试题来源:可回答:1)Spark内存管理的结构;2)Spark的Executor内存分布(参考“内存空间分配”)
1、堆内和堆外内存规划
作为一个JVM 进程,Executor 的内存管理建立在JVM的内存管理之上,Spark对JVM的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。
堆内内存受到JVM统一管理,堆外内存是直接向操作系统进行内存的申请和释放。
默认情况下,Spark 仅仅使用了堆内内存。Executor 端的堆内内存区域大致可以分为以下四大块:堆内内存的大小,由Spark应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置。这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划。
Spark对堆内内存的管理是一种逻辑上的”规划式”的管理。不同管理模式下,这三部分占用的空间大小各不相同。
堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
利用JDK Unsafe API(从Spark 2.0开始),Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。
在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。
2、内存空间分配
静态内存管理与统一内存管理的区别在于存储内存和执行内存共享同一块空间,可以动态占用对方的空闲区域。
统一内存管理的堆内内存结构如图所示:其中最重要的优化在于动态占用机制。统一内存管理的堆外内存结构如下图所示。
凭借统一内存管理机制,Spark 在一定程度上提高了堆内和堆外内存资源的利用率,降低了开发者维护 Spark 内存的难度,但并不意味着开发者可以高枕无忧。如果存储内存的空间太大或者说缓存的数据过多,反而会导致频繁的全量垃圾回收,降低任务执行时的性能。
3、存储内存管理
RDD的持久化机制
弹性分布式数据集(RDD)作为 Spark 最根本的数据抽象,是只读的分区记录(Partition)的集合。RDD的持久化由 Spark的Storage模块负责,实现了RDD与物理存储的解耦合。Storage模块负责管理Spark在计算过程中产生的数据,将那些在内存或磁盘、在本地或远程存取数据的功能封装了起来。在具体实现时Driver端和 Executor 端的Storage模块构成了主从式的架构。
在对RDD持久化时,Spark规定了MEMORY_ONLY、MEMORY_AND_DISK 等7种不同的存储级别,而存储级别是以下5个变量的组合。
通过对数据结构的分析,可以看出存储级别从三个维度定义了RDD的 Partition(同时也就是Block)的存储方式。
4、执行内存管理
执行内存主要用来存储任务在执行Shuffle时占用的内存。
若在map端选择普通的排序方式,会采用ExternalSorter进行外排,在内存中存储数据时主要占用堆内执行空间。
若在map端选择 Tungsten 的排序方式,则采用ShuffleExternalSorter直接对以序列化形式存储的数据排序,在内存中存储数据时可以占用堆外或堆内执行空间,取决于用户是否开启了堆外内存以及堆外执行内存是否足够。
在Shuffle Write 阶段中用到的Tungsten是Databricks公司提出的对Spark优化内存和CPU使用的计划。在Shuffle过程中,Spark会根据Shuffle的情况来自动选择是否采用Tungsten排序。
Tungsten 采用的页式内存管理机制建立在MemoryManager之上,即 Tungsten 对执行内存的使用进行了一步的抽象,这样在 Shuffle 过程中无需关心数据具体存储在堆内还是堆外。每个内存页用一个MemoryBlock来定义,并用 Object obj 和 long offset 这两个变量统一标识一个内存页在系统内存中的地址。
Sparkåç | å å管ç
Sparkä½ä¸ºä¸ä¸ªåºäºå åçåå¸å¼è®¡ç®å¼æï¼å ¶å å管ç模åå¨æ´ä¸ªç³»ç»ä¸æ®æ¼çé常éè¦çè§è²ã
å¨æ§è¡Sparkçåºç¨ç¨åºæ¶ï¼Sparké群ä¼å¯å¨DriveråExecutor两ç§JVMè¿ç¨ï¼
Spark管ççå å主è¦åå为4个åºåï¼
Executorä½ä¸ºä¸ä¸ªJVMè¿ç¨ï¼å®çå å管ç建ç«å¨JVMçå å管çä¹ä¸ï¼Spark对JVMçå å ï¼On-heapï¼ç©ºé´è¿è¡äºæ´ä¸ºè¯¦ç»çåé ï¼ä»¥å åå©ç¨å åãåæ¶ï¼Sparkå¼å ¥äºå å¤ï¼Off-heapï¼å åï¼ä½¿ä¹å¯ä»¥ç´æ¥å¨å·¥ä½èç¹çç³»ç»å åä¸å¼è¾ç©ºé´ï¼è¿ä¸æ¥ä¼åäºå åç使ç¨ã
å å å åç大å°ï¼ç± Spark åºç¨ç¨åºå¯å¨æ¶ç executor-memory æ spark.executor.memory åæ°é ç½®ãExecutor å è¿è¡ç并åä»»å¡å ±äº« JVM å å å åï¼è¿äºä»»å¡å¨ç¼å RDD æ°æ®å广æï¼Broadcastï¼æ°æ®æ¶å ç¨çå å被è§å为åå¨ï¼Storageï¼å åï¼èè¿äºä»»å¡å¨æ§è¡ Shuffle æ¶å ç¨çå å被è§å为æ§è¡ï¼Executionï¼å åï¼å©ä½çé¨åä¸åç¹æ®è§åï¼é£äº Spark å é¨ç对象å®ä¾ï¼æè ç¨æ·å®ä¹ç Spark åºç¨ç¨åºä¸ç对象å®ä¾ï¼åå ç¨å©ä½ç空é´ãä¸åç管ç模å¼ä¸ï¼è¿ä¸é¨åå ç¨ç空é´å¤§å°åä¸ç¸åã
Spark 对å å å åç管çæ¯ä¸ç§é»è¾ä¸ç"è§åå¼"ç管çï¼å 为对象å®ä¾å ç¨å åçç³è¯·åéæ¾é½ç± JVM å®æï¼Spark åªè½å¨ç³è¯·ååéæ¾åè®°å½è¿äºå åï¼æ们æ¥çå ¶å ·ä½æµç¨ï¼
为äºè¿ä¸æ¥ä¼åå åç使ç¨ä»¥åæé« Shuffle æ¶æåºçæçï¼Spark å¼å ¥äºå å¤ï¼Off-heapï¼å åï¼ä½¿ä¹å¯ä»¥ç´æ¥å¨å·¥ä½èç¹çç³»ç»å åä¸å¼è¾ç©ºé´ï¼åå¨ç»è¿åºååçäºè¿å¶æ°æ®ãå©ç¨ JDK Unsafe APIï¼ä» Spark 2.0 å¼å§ï¼ï¼å¨ç®¡çå å¤çåå¨å åæ¶ä¸ååºäº Tachyonï¼èæ¯ä¸å å¤çæ§è¡å åä¸æ ·ï¼åºäº JDK Unsafe API å®ç°ï¼Spark å¯ä»¥ç´æ¥æä½ç³»ç»å å¤å åï¼åå°äºä¸å¿ è¦çå åå¼éï¼ä»¥åé¢ç¹ç GC æ«æååæ¶ï¼æåäºå¤çæ§è½ãå å¤å åå¯ä»¥è¢«ç²¾ç¡®å°ç³è¯·åéæ¾ï¼èä¸åºååçæ°æ®å ç¨ç空é´å¯ä»¥è¢«ç²¾ç¡®è®¡ç®ï¼æ以ç¸æ¯å å å åæ¥è¯´éä½äºç®¡ççé¾åº¦ï¼ä¹éä½äºè¯¯å·®ã
å¨é»è®¤æ åµä¸å å¤å å并ä¸å¯ç¨ï¼å¯éè¿é ç½® spark.memory.offHeap.enabled åæ°å¯ç¨ï¼å¹¶ç± spark.memory.offHeap.size åæ°è®¾å®å å¤ç©ºé´ç大å°ãé¤äºæ²¡æ other 空é´ï¼å å¤å åä¸å å å åçååæ¹å¼ç¸åï¼ææè¿è¡ä¸ç并åä»»å¡å ±äº«åå¨å ååæ§è¡å åã
Spark 1.6 ä¹åé»è®¤ä¸ºç»ä¸ç®¡çï¼UnifiedMemoryManagerï¼æ¹å¼ï¼1.6 ä¹åéç¨çéæ管çï¼StaticMemoryManagerï¼æ¹å¼ä»è¢«ä¿çï¼å¯éè¿é ç½® spark.memory.useLegacyMode=true åæ°å¯ç¨éæå å管çæ¹å¼ãä¸é¢æ们ä»ç»ä¸ä¸¤ç§å å管ç模åçè¿åã
å¨ Spark æåéç¨çéæå å管çæºå¶ä¸ï¼åå¨å åãæ§è¡å ååå ¶ä»å åç大å°å¨ Spark åºç¨ç¨åºè¿è¡æé´å为åºå®çï¼ä½ç¨æ·å¯ä»¥åºç¨ç¨åºå¯å¨åè¿è¡é ç½®ï¼å å å åçåé å¦ä¸æ示ï¼
Spark 1.6 ä¹åå¼å ¥çç»ä¸å å管çæºå¶ï¼ä¸éæå å管ççåºå«å¨äºåå¨å ååæ§è¡å åå ±äº«åä¸å空é´ï¼å¯ä»¥å¨æå ç¨å¯¹æ¹ç空é²åºåãå¦ä¸å¾æ示ï¼
å ¶ä¸æéè¦çä¼åå¨äºå¨æå ç¨æºå¶ï¼å ¶è§åå¦ä¸ï¼
æ°ççæ¬å¼å ¥äºæ°çé 置项ï¼
ååç»ä¸å å管çæºå¶ï¼Spark å¨ä¸å®ç¨åº¦ä¸æé«äºå å åå å¤å åèµæºçå©ç¨çï¼éä½äºå¼åè ç»´æ¤ Spark å åçé¾åº¦ï¼ä½å¹¶ä¸æå³çå¼åè å¯ä»¥é«ææ 忧ãè¬å¦ï¼æ以å¦æåå¨å åç空é´å¤ªå¤§æè 说ç¼åçæ°æ®è¿å¤ï¼åèä¼å¯¼è´é¢ç¹çå ¨éåå¾åæ¶ï¼éä½ä»»å¡æ§è¡æ¶çæ§è½ï¼å 为ç¼åç RDD æ°æ®é常é½æ¯é¿æé©»çå åçãæ以è¦æ³å ååæ¥ Spark çæ§è½ï¼éè¦å¼åè è¿ä¸æ¥äºè§£åå¨å ååæ§è¡å ååèªç管çæ¹å¼åå®ç°åçã
Sparkä¸cacheåpersistçåºå«
cache
ããé»è®¤æ¯å°æ°æ®åæ¾å°å åä¸ï¼ææ§è¡
ããdef cache(): this.type = persist()
ããpersist
ããå¯ä»¥æå®æä¹ åç级å«ã
ããæ常ç¨çæ¯MEMORY_ONLYåMEMORY_AND_DISKã
ããâ_2â表示æå¯æ¬æ°ãå°½éé¿å 使ç¨_2åDISK_ONLY级å«
ããcacheåpersistç注æç¹
ãã1.é½æ¯ææ§è¡(æçå«å»¶è¿æ§è¡)ï¼éè¦action触åæ§è¡ï¼æå°åä½æ¯partition
ãã2.对ä¸ä¸ªRDDè¿è¡cacheæè persistä¹åï¼ä¸æ¬¡ç´æ¥ä½¿ç¨è¿ä¸ªåéï¼å°±æ¯ä½¿ç¨æä¹ åçæ°æ®
ãã3.å¦æ使ç¨ç¬¬äºç§æ¹å¼ï¼ä¸è½ç´§è·actionç®å
[SPARK][SQL] Tungsten Codegen优势与表达式生成
今天,我们将深入探讨Spark中的CodeGen,特别是在Tungsten框架下的优势与表达式生成。
Tungsten,启动于年,旨在优化CPU和内存使用。在对数据处理瓶颈进行分析后,发现主要瓶颈并非来自I/O或网络问题,而是CPU和内存限制。Tungsten通过代码生成,利用现代编译器和CPU的性能,优化了数据处理效率。
Tungsten的CodeGen分为两个主要部分:基本表达式代码生成与全阶段代码生成(WSCG)。基本表达式代码生成针对的是最简单的运算任务,而WSCG则进一步整合同一Stage内的操作符,生成统一的代码块,将所有计算融合为一个函数。
在Spark中,优化主要分为静态优化和动态优化。静态优化包括基于规则和基于代价的优化,目标是在执行前确定最优执行计划。动态优化,则通过自适应执行优化(AQE)等技术,在执行过程中调整计划,消除数据倾斜、降低IO,提高资源利用率。
Spark Codegen的优势主要体现在两个方面:基本表达式的代码生成与全阶段代码生成。基本代码生成将表达式转化为独立的处理模块;全阶段代码生成则将多个RDD的计算整合,生成一次性执行的代码,大幅提升计算效率。
与传统SQL执行的火山模型相比,Codegen的优势主要体现在:无虚函数调用,内存中的中间数据与CPU寄存器操作,以及循环展开与SIMD优化。在火山模型中,每次操作符处理数据需要保存在内存中,而Codegen则将数据直接放入寄存器,减少内存访问延迟。同时,编译器和CPU能够高效处理简单循环,避免了复杂函数调用带来的开销。
Codegen的实现依赖于Janino动态编译器,该工具可以快速编译Java代码。在Codegen中,表达式被转换为特定的Java代码,省去了对象创建与函数调用的开销,提高了计算效率。
在使用Janino实现表达式生成时,Spark定义了CodegenContext来管理生成代码所需的信息,如变量、函数和对象定义。表达式代码生成的过程涉及到将表达式绑定到输入模式、规范表达式、并生成实际的Java代码。
对于Project(投影)操作的代码生成,Spark在关闭全阶段代码生成时,会调用特定的子类来生成Java代码,实现高效的数据投影逻辑。Project操作涉及到将输出列与输入模式绑定,生成Java代码以实现数据的投影与输出。
总结而言,Tungsten Codegen通过静态和动态优化策略,显著提升了Spark的计算效率。它通过基本表达式代码生成与全阶段代码生成,有效减少了虚函数调用,优化了内存访问,以及利用SIMD特性,实现了高性能的数据处理。