1.rddçç¹ç¹
2.Apache 两个开源项目比较:Flink vs Spark
3.请问各位大神,源码spark的ml和mllib两个包区别和联系?!?
4.SPARK-38864 - Spark支持unpivot源码分析
5.RDDï¼DataFrameåDataSetçåºå«
6.è°è°RDDï¼DataFrameï¼Datasetçåºå«ååèªçä¼å¿
rddçç¹ç¹
rddçç¹ç¹å¦ä¸ï¼
1ãRDDæ¯Sparkæä¾çæ ¸å¿æ½è±¡ï¼å ¨ç§°ä¸ºResillientDistributedDatasetï¼å³å¼¹æ§åå¸å¼æ°æ®éã
2ãRDDå¨æ½è±¡ä¸æ¥è¯´æ¯ä¸ç§å ç´ éåï¼å å«äºæ°æ®ãå®æ¯è¢«ååºçï¼å为å¤ä¸ªååºï¼æ¯ä¸ªååºåå¸å¨é群ä¸çä¸åèç¹ä¸ï¼ä»è让RDDä¸çæ°æ®å¯ä»¥è¢«å¹¶è¡æä½ã
3ãRDDé常éè¿Hadoopä¸çæ件ï¼å³HDFSæ件æè Hive表ï¼æ¥è¿è¡å建ï¼ææ¶ä¹å¯ä»¥éè¿åºç¨ç¨åºä¸çéåæ¥å建ã
4ãRDDæéè¦çç¹æ§å°±æ¯ï¼æä¾äºå®¹éæ§ï¼å¯ä»¥èªå¨ä»èç¹å¤±è´¥ä¸æ¢å¤è¿æ¥ãå³å¦ææ个èç¹ä¸çRDDpartitionï¼å 为èç¹æ éï¼å¯¼è´æ°æ®ä¸¢äºï¼é£ä¹RDDä¼èªå¨éè¿èªå·±çæ°æ®æ¥æºéæ°è®¡ç®è¯¥partitionãè¿ä¸å对使ç¨è æ¯éæçã
5ãRDDçæ°æ®é»è®¤æ åµä¸åæ¾å¨å åä¸çï¼ä½æ¯å¨å åèµæºä¸è¶³æ¶ï¼Sparkä¼èªå¨å°RDDæ°æ®åå ¥ç£çã
Apache 两个开源项目比较:Flink vs Spark
时间久远,我对云计算与大数据已感生疏,分析尤其是源码Flink的崛起。自动驾驶平台需云计算支撑,分析包括机器学习、源码深度学习训练、分析100000000源码高清地图、源码模拟仿真模块,分析以及车联网。源码近日看到一篇Spark与Flink的分析比较文章,遂转发分享,源码以便日后重新学习该领域新知识。分析
Apache Flink作为新一代通用大数据处理引擎,源码致力于整合各类数据负载。分析它似乎与Apache Spark有着相似目标。源码两者都旨在构建一个单一平台,用于批处理、流媒体、交互式、图形处理、机器学习等。因此,Flink与Spark在理念上并无太大差异。但在实施细节上,它们却存在显著区别。
以下比较Spark与Flink的不同之处。尽管两者在某些方面存在相似之处,但也有许多不同之处。ag家网源码
1. 抽象
在Spark中,批处理采用RDD抽象,而流式传输使用DStream。Flink为批处理数据集提供数据集抽象,为流应用程序提供DataStream。尽管它们听起来与RDD和DStreams相似,但实际上并非如此。
以下是差异点:
在Spark中,RDD在运行时表示为Java对象。随着project Tungsten的推出,它略有变化。但在Apache Flink中,数据集被表示为一个逻辑计划。这与Spark中的Dataframe相似,因此在Flink中可以像使用优化器优化的一等公民那样使用API。然而,Spark RDD之间并不进行任何优化。
Flink的数据集类似Spark的Dataframe API,在执行前进行了优化。
在Spark 1.6中,数据集API被添加到spark中,可能最终取代RDD抽象。
在Spark中,所有不同的抽象,如DStream、Dataframe都建立在RDD抽象之上。但在Flink中,红绿指标源码推荐Dataset和DataStream是基于顶级通用引擎构建的两个独立抽象。尽管它们模仿了类似的API,但在DStream和RDD的情况下,无法将它们组合在一起。尽管在这方面有一些努力,但最终结果还不够明确。
无法将DataSet和DataStream组合在一起,如RDD和DStreams。
因此,尽管Flink和Spark都有类似的抽象,但它们的实现方式不同。
2. 内存管理
直到Spark 1.5,Spark使用Java堆来缓存数据。虽然项目开始时更容易,但它导致了内存不足(OOM)问题和垃圾收集(gc)暂停。因此,从1.5开始,Spark进入定制内存管理,称为project tungsten。
Flink从第一天起就开始定制内存管理。实际上,这是Spark向这个方向发展的灵感之一。不仅Flink将数据存储在它的自定义二进制布局中,它确实直接对二进制数据进行操作。在Spark中,所有数据帧操作都直接在Spark 1.5的project tungsten二进制数据上运行。
在JVM上执行自定义内存管理可以提高性能并提高资源利用率。源码测试运营
3. 实施语言
Spark在Scala中实现。它提供其他语言的API,如Java、Python和R。
Flink是用Java实现的。它确实提供了Scala API。
因此,与Flink相比,Spark中的选择语言更好。在Flink的一些scala API中,java抽象也是API的。这会有所改进,因为已经使scala API获得了更多用户。
4. API
Spark和Flink都模仿scala集合API。所以从表面来看,两者的API看起来非常相似。
5. 流
Apache Spark将流式处理视为快速批处理。Apache Flink将批处理视为流处理的特殊情况。这两种方法都具有令人着迷的含义。
以下是两种不同方法的差异或含义:
Apache Flink提供事件级处理,也称为实时流。它与Storm模型非常相似。
Spark只有不提供事件级粒度的最小批处理(mini-batch)。这种方法被称为近实时。
Spark流式处理是更快的批处理,Flink批处理是有限的流处理。
虽然大多数应用程序都可以近乎实时地使用,exe格式源码查询但很少有应用程序需要事件级实时处理。这些应用程序通常是Storm流而不是Spark流。对于他们来说,Flink将成为一个非常有趣的选择。
运行流处理作为更快批处理的优点之一是,我们可以在两种情况下使用相同的抽象。Spark非常支持组合批处理和流数据,因为它们都使用RDD抽象。
在Flink的情况下,批处理和流式传输不共享相同的API抽象。因此,尽管有一些方法可以将基于历史文件的数据与流相结合,但它并不像Spark那样干净。
在许多应用中,这种能力非常重要。在这些应用程序中,Spark代替Flink流式传输。
由于最小批处理的性质,Spark现在对窗口的支持非常有限。允许根据处理时间窗口批量处理。
与其他任何系统相比,Flink提供了非常灵活的窗口系统。Window是Flink流API的主要焦点之一。它允许基于处理时间、数据时间和无记录等的窗口。这种灵活性使Flink流API与Spark相比非常强大。
6. SQL界面
截至目前,最活跃的Spark库之一是spark-sql。Spark提供了像Hive一样的查询语言和像DSL这样的Dataframe来查询结构化数据。它是成熟的API并且在批处理中广泛使用,并且很快将在流媒体世界中使用。
截至目前,Flink Table API仅支持DSL等数据帧,并且仍处于测试阶段。有计划添加sql接口,但不确定何时会落在框架中。
目前为止,Spark与Flink相比有着不错的SQL故事。
7. 数据源集成
Spark数据源API是框架中最好的API之一。数据源API使得所有智能资源如NoSQL数据库、镶嵌木地板、优化行列(Optimized Row Columnar,ORC)成为Spark上的头等公民。此API还提供了在源级执行谓词下推(predicate push down)等高级操作的功能。
Flink仍然在很大程度上依赖于map / reduce InputFormat来进行数据源集成。虽然它是足够好的提取数据API,但它不能巧妙地利用源能力。因此Flink目前落后于目前的数据源集成技术。
8. 迭代处理
Spark最受关注的功能之一就是能够有效地进行机器学习。在内存缓存和其他实现细节中,它是实现机器学习算法的真正强大的平台。
虽然ML算法是循环数据流,但它表示为Spark内部的直接非循环图。通常,没有分布式处理系统鼓励循环数据流,因为它们变得难以理解。
但是Flink对其他人采取了一些不同的方法。它们在运行时支持受控循环依赖图(cyclic dependence graph)。这使得它们与DAG表示相比以非常有效的方式表示ML算法。因此,Flink支持本机平台中的迭代,与DAG方法相比,可实现卓越的可扩展性和性能。
9. 流作为平台与批处理作为平台
Apache Spark来自Map / Reduce时代,它将整个计算表示为数据作为文件集合的移动。这些文件可能作为磁盘上的阵列或物理文件驻留在内存中。这具有非常好的属性,如容错等。
但是Flink是一种新型系统,它将整个计算表示为流处理,其中数据有争议地移动而没有任何障碍。这个想法与像akka-streams这样的新的反应流系统非常相似。
. 成熟
Flink像批处理这样的部分已经投入生产,但其他部分如流媒体、Table API仍在不断发展。这并不是说在生产中就没人使用Flink流。
请问各位大神,spark的ml和mllib两个包区别和联系?!?
在技术角度上,Spark的ML和Mllib包处理数据集的方式不同。ML包面向的是Dataset,具体来说是Dataframe,而Mllib则直接面对RDD。Dataset和RDD之间的区别在于,Dataset是在RDD基础上进行深度优化的版本。
Dataset优化了性能和静态类型分析,提供了类似于SQL语言的功能,能够在编译时捕获错误。相比于RDD,Dataset的combinators(如map和foreach等)性能表现更优。
在编程过程中,构建机器学习算法的方式也有所不同。ML包提倡使用pipelines进行数据处理。想象数据如同水流,从管道的一端流入,另一端流出。具体实现为:DataFrame --> Pipeline --> 新DataFrame。Pipeline是通过连接Transformer和Estimator实现的数据处理流程。
Transformer的输入是DataFrame,输出同样是DataFrame。而Estimator的输入是DataFrame,输出则是一个Transformer。这种流程使得数据处理逻辑清晰,易于理解和维护。
SPARK- - Spark支持unpivot源码分析
unpivot是数据库系统中用于列转行的内置函数,如SQL SERVER, Oracle等。以数据集tb1为例,每个数字代表某个人在某个学科的成绩。若要将此表扩展为三元组,可使用union实现。但随列数增加,SQL语句变长。许多SQL引擎提供内置函数unpivot简化此过程。unpivot使用时需指定保留列、进行转行的列、新列名及值列名。
SPARK从SPARK-版本开始支持DataSet的unpivot函数,逐步扩展至pyspark与SQL。在Dataset API中,ids为要保留的Column数组,Column类提供了从String构造Column的隐式转换,方便使用。利用此API,可通过unpivot函数将数据集转换为所需的三元组。values表示转行列,variableColumnName为新列名,valueColumnName为值列名。
Analyser阶段解析unpivot算子,将逻辑执行计划转化为物理执行计划。当用户开启hive catalog,SPARK SQL根据表名和metastore URL查找表元数据,转化为Hive相关逻辑执行计划。物理执行计划如BroadcastHashJoinExec,表示具体的执行策略。规则ResolveUnpivot将包含unpivot的算子转换为Expand算子,在物理执行计划阶段执行。此转换由开发者自定义规则完成,通过遍历逻辑执行计划树,根据节点类型及状态进行不同处理。
unpivot函数实现过程中,首先将原始数据集投影为包含ids、variableColumnName、valueColumnName的列,实现语义转换。随后,通过map函数处理values列,构建新的行数据,最终返回Expand算子。在物理执行计划阶段,Expand算子将数据转换为所需形式,实现unpivot功能。
综上所述,SPARK内置函数unpivot的实现通过解析列参数,组装Expand算子完成,为用户提供简便的列转行功能。通过理解此过程,可深入掌握SPARK SQL的开发原理与内在机制。
RDDï¼DataFrameåDataSetçåºå«
RDDãDataFrameåDataSetæ¯å®¹æ产çæ··æ·çæ¦å¿µï¼å¿ é¡»å¯¹å ¶ç¸äºä¹é´å¯¹æ¯ï¼æå¯ä»¥ç¥éå ¶ä¸å¼åã
RDDåDataFrame
RDD-DataFrame
ä¸å¾ç´è§å°ä½ç°äºDataFrameåRDDçåºå«ã左侧çRDD[Person]è½ç¶ä»¥Person为类ååæ°ï¼ä½Sparkæ¡æ¶æ¬èº«ä¸äºè§£
Personç±»çå é¨ç»æãèå³ä¾§çDataFrameå´æä¾äºè¯¦ç»çç»æä¿¡æ¯ï¼ä½¿å¾Spark
SQLå¯ä»¥æ¸ æ¥å°ç¥é该æ°æ®éä¸å å«åªäºåï¼æ¯åçå称åç±»ååæ¯ä»ä¹ãDataFrameå¤äºæ°æ®çç»æä¿¡æ¯ï¼å³schemaãRDDæ¯åå¸å¼ç
Java对象çéåãDataFrameæ¯åå¸å¼çRow对象çéåãDataFrameé¤äºæä¾äºæ¯RDDæ´ä¸°å¯çç®å以å¤ï¼æ´éè¦çç¹ç¹æ¯æåæ§è¡æ
çãåå°æ°æ®è¯»å以åæ§è¡è®¡åçä¼åï¼æ¯å¦filterä¸æ¨ãè£åªçã
æåæ§è¡æç
RDD
APIæ¯å½æ°å¼çï¼å¼ºè°ä¸åæ§ï¼å¨å¤§é¨ååºæ¯ä¸å¾åäºå建æ°å¯¹è±¡èä¸æ¯ä¿®æ¹è对象ãè¿ä¸ç¹ç¹è½ç¶å¸¦æ¥äºå¹²åæ´æ´çAPIï¼å´ä¹ä½¿å¾Sparkåºç¨ç¨åºå¨è¿
è¡æå¾åäºå建大é临æ¶å¯¹è±¡ï¼å¯¹GCé æååãå¨ç°æRDD
APIçåºç¡ä¹ä¸ï¼æ们åºç¶å¯ä»¥å©ç¨mapPartitionsæ¹æ³æ¥éè½½RDDå个åçå çæ°æ®å建æ¹å¼ï¼ç¨å¤ç¨å¯å对象çæ¹å¼æ¥åå°å¯¹è±¡åé åGCç
å¼éï¼ä½è¿çºç²äºä»£ç çå¯è¯»æ§ï¼èä¸è¦æ±å¼åè 对Sparkè¿è¡æ¶æºå¶æä¸å®çäºè§£ï¼é¨æ§è¾é«ãå¦ä¸æ¹é¢ï¼Spark
SQLå¨æ¡æ¶å é¨å·²ç»å¨åç§å¯è½çæ åµä¸å°½ééç¨å¯¹è±¡ï¼è¿æ ·åè½ç¶å¨å é¨ä¼æç ´äºä¸åæ§ï¼ä½å¨å°æ°æ®è¿åç»ç¨æ·æ¶ï¼è¿ä¼éæ°è½¬ä¸ºä¸å¯åæ°æ®ãå©ç¨
DataFrame APIè¿è¡å¼åï¼å¯ä»¥å è´¹å°äº«åå°è¿äºä¼åææã
åå°æ°æ®è¯»å
åæ大æ°æ®ï¼æå¿«çæ¹æ³å°±æ¯ ââ忽ç¥å®ãè¿éçâ忽ç¥â并ä¸æ¯çè§æ ç¹ï¼èæ¯æ ¹æ®æ¥è¯¢æ¡ä»¶è¿è¡æ°å½çåªæã
ä¸æ讨论ååºè¡¨æ¶æå°çååºåª æ便æ¯å ¶ä¸ä¸ç§ââå½æ¥è¯¢çè¿æ»¤æ¡ä»¶ä¸æ¶åå°ååºåæ¶ï¼æ们å¯ä»¥æ ¹æ®æ¥è¯¢æ¡ä»¶åªæè¯å®ä¸å å«ç®æ æ°æ®çååºç®å½ï¼ä»èåå°IOã
对äºä¸äºâæºè½âæ°æ®æ ¼ å¼ï¼Spark
SQLè¿å¯ä»¥æ ¹æ®æ°æ®æ件ä¸é带çç»è®¡ä¿¡æ¯æ¥è¿è¡åªæãç®åæ¥è¯´ï¼å¨è¿ç±»æ°æ®æ ¼å¼ä¸ï¼æ°æ®æ¯å段ä¿åçï¼æ¯æ®µæ°æ®é½å¸¦ææ大å¼ãæå°å¼ãnullå¼æ°éç
ä¸äºåºæ¬çç»è®¡ä¿¡æ¯ãå½ç»è®¡ä¿¡æ¯è¡¨åæä¸æ°æ®æ®µè¯å®ä¸å æ¬ç¬¦åæ¥è¯¢æ¡ä»¶çç®æ æ°æ®æ¶ï¼è¯¥æ°æ®æ®µå°±å¯ä»¥ç´æ¥è·³è¿(ä¾å¦ææ´æ°åaæ段çæ大å¼ä¸ºï¼èæ¥
询æ¡ä»¶è¦æ±a > )ã
æ¤å¤ï¼Spark SQLä¹å¯ä»¥å åå©ç¨RCFileãORCãParquetçåå¼åå¨æ ¼å¼çä¼å¿ï¼ä» æ«ææ¥è¯¢çæ£æ¶åçåï¼å¿½ç¥å ¶ä½åçæ°æ®ã
æ§è¡ä¼å
人å£æ°æ®åæ示ä¾
为äºè¯´ææ¥è¯¢ä¼åï¼æ们æ¥çä¸å¾å±ç¤ºç人å£æ°æ®åæç示ä¾ãå¾ä¸æé äºä¸¤ä¸ªDataFrameï¼å°å®ä»¬joinä¹åååäºä¸æ¬¡filteræä½ãå¦
æåå°ä¸å¨å°æ§è¡è¿ä¸ªæ§è¡è®¡åï¼æç»çæ§è¡æçæ¯ä¸é«çãå 为joinæ¯ä¸ä¸ªä»£ä»·è¾å¤§çæä½ï¼ä¹å¯è½ä¼äº§çä¸ä¸ªè¾å¤§çæ°æ®éãå¦ææ们è½å°filter
ä¸æ¨å° joinä¸æ¹ï¼å 对DataFrameè¿è¡è¿æ»¤ï¼åjoinè¿æ»¤åçè¾å°çç»æéï¼ä¾¿å¯ä»¥ææ缩çæ§è¡æ¶é´ãèSpark
SQLçæ¥è¯¢ä¼åå¨æ£æ¯è¿æ ·åçãç®èè¨ä¹ï¼é»è¾æ¥è¯¢è®¡åä¼åå°±æ¯ä¸ä¸ªå©ç¨åºäºå ³ç³»ä»£æ°ççä»·åæ¢ï¼å°é«ææ¬çæä½æ¿æ¢ä¸ºä½ææ¬æä½çè¿ç¨ã
å¾å°çä¼åæ§è¡è®¡åå¨è½¬æ¢æç© çæ§è¡è®¡åçè¿ç¨ä¸ï¼è¿å¯ä»¥æ ¹æ®å ·ä½çæ°æ®æºçç¹æ§å°è¿æ»¤æ¡ä»¶ä¸æ¨è³æ°æ®æºå ãæå³ä¾§çç©çæ§è¡è®¡åä¸Filterä¹æ以æ¶å¤±ä¸è§ï¼å°±æ¯å ä¸ºæº¶å ¥äºç¨äºæ§è¡æç»ç读åæä½ç表æ«æèç¹å ã
对äºæ®éå¼åè èè¨ï¼æ¥è¯¢ä¼å å¨çæä¹å¨äºï¼å³ä¾¿æ¯ç»éªå¹¶ä¸ä¸°å¯çç¨åºåååºç次ä¼çæ¥è¯¢ï¼ä¹å¯ä»¥è¢«å°½é转æ¢ä¸ºé«æçå½¢å¼äºä»¥æ§è¡ã
RDDåDataSet
DataSet以Catalysté»è¾æ§è¡è®¡å表示ï¼å¹¶ä¸æ°æ®ä»¥ç¼ç çäºè¿å¶å½¢å¼è¢«åå¨ï¼ä¸éè¦ååºååå°±å¯ä»¥æ§è¡sortingãshuffleçæä½ã
DataSetåç«éè¦ä¸ä¸ªæ¾å¼çEncoderï¼æ对象åºåå为äºè¿å¶ï¼å¯ä»¥æ对象çschemeæ å°ä¸ºSparkSQlç±»åï¼ç¶èRDDä¾èµäºè¿è¡æ¶åå°æºå¶ã
éè¿ä¸é¢ä¸¤ç¹ï¼DataSetçæ§è½æ¯RDDçè¦å¥½å¾å¤ã
DataFrameåDataSet
Datasetå¯ä»¥è®¤ä¸ºæ¯DataFrameçä¸ä¸ªç¹ä¾ï¼ä¸»è¦åºå«æ¯Datasetæ¯ä¸ä¸ªrecordåå¨çæ¯ä¸ä¸ªå¼ºç±»åå¼èä¸æ¯ä¸ä¸ªRowãå æ¤å ·æå¦ä¸ä¸ä¸ªç¹ç¹ï¼
DataSetå¯ä»¥å¨ç¼è¯æ¶æ£æ¥ç±»å
并ä¸æ¯é¢å对象çç¼ç¨æ¥å£ãç¨wordcount举ä¾ï¼
//DataFrame
// Load a text file and interpret each line as a java.lang.String
val ds = sqlContext.read.text("/home/spark/1.6/lines").as[String]
val result = ds
.flatMap(_.split(" ")) // Split on whitespace
.filter(_ != "") // Filter empty words
.toDF() // Convert to DataFrame to perform aggregation / sorting
.groupBy($"value") // Count number of occurences of each word
.agg(count("*") as "numOccurances")
.orderBy($"numOccurances" desc) // Show most common words first
åé¢çæ¬DataFrameä¼ç»§æ¿DataSetï¼DataFrameæ¯é¢åSpark SQLçæ¥å£ã
//DataSet,å®å ¨ä½¿ç¨scalaç¼ç¨ï¼ä¸è¦åæ¢å°DataFrame
val wordCount =
ds.flatMap(_.split(" "))
.filter(_ != "")
.groupBy(_.toLowerCase()) // Instead of grouping on a column expression (i.e. $"value") we pass a lambda function
.count()
DataFrameåDataSetå¯ä»¥ç¸äºè½¬åï¼ df.as[ElementType] è¿æ ·å¯ä»¥æDataFrame转å为DataSetï¼ ds.toDF() è¿æ ·å¯ä»¥æDataSet转å为DataFrameã
è°è°RDDï¼DataFrameï¼Datasetçåºå«ååèªçä¼å¿
RDDãDataFrameãDatasetå ¨é½æ¯sparkå¹³å°ä¸çåå¸å¼å¼¹æ§æ°æ®éï¼ä¸ºå¤çè¶ å¤§åæ°æ®æä¾ä¾¿å©
2ãä¸è é½ææ°æ§æºå¶ï¼å¨è¿è¡å建ã转æ¢ï¼å¦mapæ¹æ³æ¶ï¼ä¸ä¼ç«å³æ§è¡ï¼åªæå¨éå°Actionå¦foreachæ¶ï¼ä¸è æä¼å¼å§éåè¿ç®ï¼æ端æ åµä¸ï¼å¦æ代ç éé¢æå建ã转æ¢ï¼ä½æ¯åé¢æ²¡æå¨Actionä¸ä½¿ç¨å¯¹åºçç»æï¼å¨æ§è¡æ¶ä¼è¢«ç´æ¥è·³è¿ï¼å¦