本文目录一览:
- 1、Spark 处理小文件
- 2、spark之RDD详解----五大特性
- 3、【spark系列8】spark delta读数据实现分析
- 4、Spark RDD 分布式弹性数据集
- 5、Spark读取HDFS数据分区参考
- 6、什么是Spark,如何使用Spark进行数据分析
Spark 处理小文件
不论是Hive还是Spark SQL在使用过程中都可能会遇到小文件过多的问题。小文件过多最直接的表现是任务执行时间长,查看Spark log会发现大量的数据移动的日志。我们可以查看log中展现的日志信息,去对应的路径下查看文件的大小和个数。
通过上述命令可以查看文件的个数以及大小。count查看出的文件大小单位是B,需要转换为MB。
在spark官方的推荐文档中,parquet格式的文件推荐大小是128MB,小于该大小的均可以称之为小文件,在实际的工作,往往小文件的大小仅仅为几KB,表现为,可能文件大小为几百MB,但是文件个数可能到达了几十万个。一般来说,我们可以通过简单相除获得文件的平均大小,如果文件数目不多,我们也可以通过下述命令获得每个文件的大小。-spark分类数据的样例文件
1.任务执行时间长
2.真实的文件大小独占一个数据存储块,存放到DataNode节点中。同时 DataNode一般默认存三份副本,以保障数据安全。同时该文件所存放的位置也写入到NameNode的内存中,如果有Secondary NameNode高可用节点,也可同时复制一份过去。NameNode的内存数据将会存放到硬盘中,如果HDFS发生重启,将产生较长时间的元数据从硬盘读到内存的过程。-spark分类数据的样例文件
3.不论在Hive还是在Spark中,每一个存储块都对应一个Map程序,一个Map呈现就需要一个JVM,启动一个JVM去读取或者写小文件是吃力不讨好的行为。在实际的生产中,为了更好的管理集群资源,一般会要求程序执行时限制Executor数量和每个Executor的核心数量,需要频繁创建Executor来读取写入。-spark分类数据的样例文件
5.影响磁盘寻址时间
小文件合并,本质上就是通过某种操作,将一系列小文件合并成大文件。我们知道,以MapReduce为代表的大数据系统,都习惯用K-V键值对的形式来处理文件,最后文件落盘,也是一个reduce对应一个输出文件。所以直观上,我们可以减少reduce数量,达到减少文件数量的目的。-spark分类数据的样例文件
从Map到Reduce需要一个Shuffle过程,所以我们将小文件合并理解为通过一个Shuffle,合并小文件成一个大文件。基于这样的思想,我们的策略可以分为两类:一类是原来的计算已经有Shuffle了,那么我们可以认为控制输出文件的数量;二类是强制触发Shuffle,进行小文件合并。-spark分类数据的样例文件
1-设置参数 (一般用于Hive)
2-distribute by rand()
往动态分区插入数据时,在已经写好的SQL末尾加上distribute by rand()
该算子只是起到打散的效果,但是我们还要设置文件的大小,以免打散后仍然有小文件。
表示每个reduce的大小,Hive可以数据总量,得到reduce个数,假设hive认为会有10个reduce,那么,这里rand()则会为 x % 10
3-group by
我们知道,group by算子会触发Shuffle,因此只要我们设置好Shuffle时的文件个数就好,在Spark SQL中,我们可以设置partition个数,因为一个partition会对应一个文件。-spark分类数据的样例文件
上述的操作,会触发shuffle,因此我们再设置partition个数。
则表示,shuffle后,只会产生10个partition.
4-repartition()
5-coalesce()
需要注意的是,4和5都是spark 2.4以及以后才会支持的。
spark之RDD详解----五大特性
spark Github :
RDD: 让开发者大大降低开发分布式应用程序的门槛以及执行效率。
RDD源码:
弹性:代表着spark在分布式计算的时候,可以容错 ---计算层面
分布式:把一份数据拆分成多份,在各个节点上并行的运行,他们之间没有任何的依赖关系
数据集:一个文件就是一个数据集
partitioned collection of elements :数据可以拆分成分区
that can be operated on in parallel.:每个分区的内容可以并行的被操作
解释:
RDD(1,2,3,4,5,6,7,8,9) 假如需要 + 1
那么数据被分成三个分区,只要每个分区上的内容都执行+1的操作就可以
Hadoop001: (1,2,3) +1
Hadoop002: (4,5,6) +1
Hadoop003: (7,8,9) +1
@transient private var sc: SparkContext,
@transient private var deps: Seq[Dependency[ ]]
) extends Serializable with Logging {
(1)抽象类:RDD必然是由子类实现的,我们使用的直接使用其子类即可
(2)Serializable:可以序列化
(3)Logging:spark1.6可以使用,spark2.0之后不可以使用
(4)T:存储各种数据类型
(5)SparkContext
(6)@transient
大数据里面一般是移动数据不是移动计算,所以数据本地化计算这样性能更高。
def compute(split: Partition, context: TaskContext): Iterator[T]
RDD计算是对RDD里面的分区做计算,所以传入split: Partition 对应的RDD特点第二点
protected def getPartitions: Array[Partition]:
拿到分区,RDD是由一系列的分区构成,所以得到的一定是分区 Array[Partition] 对应着第一大特点
......
【spark系列8】spark delta读数据实现分析
本文基于delta 0.7.0
spark 3.0.1
我们之前的 spark delta写操作ACID事务前传--写文件基础类FileFormat/FileCommitProtocol分析 , spark delta写操作ACID事务实现分析 分析了delta写数据的流程,这次我们分析一下delta是怎么读取数据的。-spark分类数据的样例文件
spark 的delta datasource的构建要从 DataSource.lookupDataSourceV2 开始,之后会流向到loadV1Source,这里会进行dataSource.createRelation进行构建datasource的Relation的构建,直接转到deltaDataSource 的createRelation:-spark分类数据的样例文件
通过之前文章的分析,我们直到deltalog记录了AddFile和Remove记录,那现在读数据怎么读取呢?全部在allFiles方法。
重点看一下:allFiles方法:
这里调用了state方法,而它又调用了stateReconstruction方法,
stateReconstruction方法在checkpoint的时用到了,在这里也用到了,主要是重新构造文件状态,合并AddFile和RemoveFile:
而关键在于InMemoryLogReplay的append方法和checkpoint方法,这里做到了文件状态的合并:
重点就在case add: AddFile和 case remove: RemoveFile处理以及checkpoint方法,能够很好的合并文件状态。
再调用collect方法,返回DeltaScan,之后获取文件路径作为要处理的文件路径。
注意:spark读取delta格式整个流程和spark读取其他数据格式流程一致,主要区别在于读取数据之前,会把文件状态在内存中进行一次合并,这样只需要读取文件状态为Addfile的就行了
Spark RDD 分布式弹性数据集
rdd是一种弹性分布式的数据集,它代表着不可变的数据元素,可以被分区并行处理。
rdd是一个粗粒度的数据生成方式和流转迭代计算方式的描述。它可以通过稳定的存储器或者从其他RDD生成,它并不需要急着进行转换,只需要在特定的rdd进行一次性的数据的迭代流转。rdd记录着自己的依赖关系,以防在数据丢失时可以通过“血缘”关系再次生成数据。用户也可以自己选择在经常重用的rdd进行数据落地,放置丢失后重做。-spark分类数据的样例文件
rdd的特性总结:
分布式的共享内存是一种细粒度的读写,可以对每个存储单元进行读写,其一致性需要程序进行维护,其容错性需要设置检查点和程序回滚。但是RDD由于是不可变的粗粒度的读写,更适合于批量读写的任务,其可以使用“血缘”机制恢复数据,减少了设置检查点的开销。如果出现失败时,也只用重新计算分区中丢失的那一部分。另一方面,RDD的不可变性可以让系统可以像mapreduce一样采用后备任务的方式来代替运行缓慢的任务,不会出现相互影响的情况。-spark分类数据的样例文件
另外rdd也吸取了分布式共享内存的特性,rdd的批量操作可以根据数据所处的位置进行优化,提高性能。加载数据时,当内存不足时,rdd的性能下降是平稳的,不能载入内存的分区可以存储在磁盘上。
上面的5点是rdd都会实现的接口,这也是rdd都具有的特性。
如上源码所示,RDD提供了分区的抽象函数,即protected def getPartitions: Array[Partition],每个继承RDD抽象类的RDD都会有自己的getPartitions的实现。RDD分区的多少代表着计算时的并发粒度。-spark分类数据的样例文件
用户可以自己指定执行的分区数,如果用户不自己指定,则使用默认的分区数。
从源码中可以看出,如果不传入分区数,则默认分区数为defaultParallelism,而defaultParallelism=math.max(totalCoreCount.get(), 2)所以最小是2,最大是主机核数。-spark分类数据的样例文件
HadoopRDD是读取hdfs文件的rdd。HadoopRDD使用的是MapReduce API。
spark.sparkContext.textFile(" hdfs://user/local/admin.text ") 中textFile是读取hdfs文件的方法。其中会调用HadoopRDD。-spark分类数据的样例文件
textFile 是从HDFS分布式文件系统的所有节点上读取数据,返回Strings的RDD。
总结下HadoopRDD分区规则:
1.如果textFile指定分区数量为0或者1的话,defaultMinPartitions值为1,则有多少个文件,就会有多少个分区。
2.如果不指定默认分区数量,则默认分区数量为2,则会根据所有文件字节大小totalSize除以分区数量,得到的值goalSize,然后比较goalSize和hdfs指定分块大小(这里是128M)作比较,以较小的最为goalSize作为切分大小,对每个文件进行切分,若文件大于大于goalSize,则会生成该(文件大小/goalSize)个分区,如果文件内的数据不能除尽则分区数会+1,则为(fileSize/goalSize)+1。-spark分类数据的样例文件
3.如果指定分区数量大于等于2,则默认分区数量为指定值,生成实际分区数量规则任然同2中的规则一致。
总之:文件总大小除以分区数,大于分块大小,则与分块大小相关,否则以得到的商相关。
rdd优先位置返回的是每一个分区的位置信息,按照移动计算的思路,将计算尽量分配到数据所在的机器上。
RDD的操作是粗粒度的操作,RDD进行转换会形成新的RDD。形成的RDD和原RDD形成依赖关系,RDD通过这种“血缘”关系来维护数据的容错性。RDD的依赖关系可以分为宽依赖和窄依赖两种。
从中可以看出mapRDD是OneToOneDependency依赖,其父RDD为ParallelCollectionRDD。
从中可以看出groupRDD的依赖是ShuffleDependency依赖,其父依赖是MapPartitionsRDD。而groupbykey是需要进行shuffle的算子,属于宽依赖。
Spark通过创建的类来表明,RDD间的依赖关系的类型,NarrowDependency属于窄依赖,ShuffleDenpendency属于宽依赖。之后会通过一节来具体介绍其中的细节。
从上面的RDD源码可以发现,每个RDD中都存在一个compute()的函数,这个函数的作用就是为实现RDD具体的分区计算。
def compute(split: Partition, context: TaskContext): Iterator[T]
compute的返回值是分区的迭代器,每一个分区都会调用这个函数。只有到action算子才会真正的执行计算。
partitioner指的是Spark的分区函数,目前最常用的有两种,HashPartitioner和RangePartitioner, 其次还有缩减分区数的分区函数CoalescedPartitioner。分区这个概念,只存在于(K,V)键值对的RDD中,非键值对的RDD中partitioner为None。-spark分类数据的样例文件
分区函数即决定了RDD本身分区的数量,也决定了Shuffle中MapOut输出中每个分区进行切割的依据。
HashPartitioner会对数据的key进行 key.hascode%numpartitions 计算,得到的数值会放到对应的分区中,这样能较为平衡的分配数据到partition。
RangePartitioner:它是在排序算子中会用到的分区器,比如sortbykey、sortby、orderby等。该分区器先对输入的数据的key做采样,来估算Key的分布,然后按照指定的排序切分range,尽量让每个partition对应的range里的key分布均匀。-spark分类数据的样例文件
rdd中的算子可以分为两种,一个是transformation, 一个是action算子。
1. Transformation:转换算子,这类转换并不触发提交作业,完成作业中间过程处理。
2. Action:行动算子,这类算子会触发SparkContext提交Job作业。
Spark读取HDFS数据分区参考
refer:
hive metastore 和 parquet 转化的方式通过 spark.sql.hive.convertMetastoreParquet 控制,默认为 true。
如果设置为 true ,会使用 org.apache.spark.sql.execution.FileSourceScanExec ,否则会使用 org.apache.spark.sql.hive.execution.HiveTableScanExec。-spark分类数据的样例文件
前者对分区规则做了一些优化,如果 文件是:
HiveTableScanExec
通过文件数量,大小进行分区。
例如:读入一份 2048M 大小的数据,hdfs 块大小设置为 128M
该目录有1000个小文件,则会生成1000个partition。
如果只有1个文件,则会生成 16 个分区。
如果有一个大文件1024M,其余 999 个文件共 1024M,则会生成 1007个分区。
什么是Spark,如何使用Spark进行数据分析
Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法-spark分类数据的样例文件
数据科学家为了回答一个问题或进行深入研究,会使用相关的技术分析数据。通常,他们的工作包含特殊的分析,所以他们使用交互式shell,以使得他们能在最短的时间内看到查询结果和代码片段。Spark的速度和简单的API接口很好地符合这个目标,它的内建库意味着很多算法可以随时使用。-spark分类数据的样例文件
Spark通过若干组件支持不同的数据科学任务。Spark shell使得用Python或Scala进行交互式数据分析变得简单。Spark SQL也有一个独立的SQL shell,可以用SQL进行数据分析,也可以在Spark程序中或Spark shell中使用Spark SQL。MLlib库支持机器学习和数据分析。而且,支持调用外部的MATLAB或R语言编写的程序。Spark使得数据科学家可以用R或Pandas等工具处理包含大量数据的问题。-spark分类数据的样例文件