×

spark分类数据的样例文件

spark分类数据的样例文件(spark分类数据的样例文件有哪些)

admin admin 发表于2023-03-24 07:04:08 浏览50 评论0

抢沙发发表评论

本文目录一览:

Spark 处理小文件

不论是Hive还是Spark SQL在使用过程中都可能会遇到小文件过多的问题。小文件过多最直接的表现是任务执行时间长,查看Spark log会发现大量的数据移动的日志。我们可以查看log中展现的日志信息,去对应的路径下查看文件的大小和个数。

通过上述命令可以查看文件的个数以及大小。count查看出的文件大小单位是B,需要转换为MB。

在spark官方的推荐文档中,parquet格式的文件推荐大小是128MB,小于该大小的均可以称之为小文件,在实际的工作,往往小文件的大小仅仅为几KB,表现为,可能文件大小为几百MB,但是文件个数可能到达了几十万个。一般来说,我们可以通过简单相除获得文件的平均大小,如果文件数目不多,我们也可以通过下述命令获得每个文件的大小。-spark分类数据的样例文件

1.任务执行时间长

2.真实的文件大小独占一个数据存储块,存放到DataNode节点中。同时 DataNode一般默认存三份副本,以保障数据安全。同时该文件所存放的位置也写入到NameNode的内存中,如果有Secondary NameNode高可用节点,也可同时复制一份过去。NameNode的内存数据将会存放到硬盘中,如果HDFS发生重启,将产生较长时间的元数据从硬盘读到内存的过程。-spark分类数据的样例文件

3.不论在Hive还是在Spark中,每一个存储块都对应一个Map程序,一个Map呈现就需要一个JVM,启动一个JVM去读取或者写小文件是吃力不讨好的行为。在实际的生产中,为了更好的管理集群资源,一般会要求程序执行时限制Executor数量和每个Executor的核心数量,需要频繁创建Executor来读取写入。-spark分类数据的样例文件

5.影响磁盘寻址时间

小文件合并,本质上就是通过某种操作,将一系列小文件合并成大文件。我们知道,以MapReduce为代表的大数据系统,都习惯用K-V键值对的形式来处理文件,最后文件落盘,也是一个reduce对应一个输出文件。所以直观上,我们可以减少reduce数量,达到减少文件数量的目的。-spark分类数据的样例文件

从Map到Reduce需要一个Shuffle过程,所以我们将小文件合并理解为通过一个Shuffle,合并小文件成一个大文件。基于这样的思想,我们的策略可以分为两类:一类是原来的计算已经有Shuffle了,那么我们可以认为控制输出文件的数量;二类是强制触发Shuffle,进行小文件合并。-spark分类数据的样例文件

1-设置参数 (一般用于Hive)

2-distribute by rand()

往动态分区插入数据时,在已经写好的SQL末尾加上distribute by rand()

该算子只是起到打散的效果,但是我们还要设置文件的大小,以免打散后仍然有小文件。

表示每个reduce的大小,Hive可以数据总量,得到reduce个数,假设hive认为会有10个reduce,那么,这里rand()则会为 x % 10

3-group by

我们知道,group by算子会触发Shuffle,因此只要我们设置好Shuffle时的文件个数就好,在Spark SQL中,我们可以设置partition个数,因为一个partition会对应一个文件。-spark分类数据的样例文件

上述的操作,会触发shuffle,因此我们再设置partition个数。

则表示,shuffle后,只会产生10个partition.

4-repartition()

5-coalesce()

需要注意的是,4和5都是spark 2.4以及以后才会支持的。

spark之RDD详解----五大特性

spark Github :

RDD: 让开发者大大降低开发分布式应用程序的门槛以及执行效率。

RDD源码:

弹性:代表着spark在分布式计算的时候,可以容错 ---计算层面

分布式:把一份数据拆分成多份,在各个节点上并行的运行,他们之间没有任何的依赖关系

数据集:一个文件就是一个数据集

partitioned collection of elements :数据可以拆分成分区

that can be operated on in parallel.:每个分区的内容可以并行的被操作

解释:

RDD(1,2,3,4,5,6,7,8,9) 假如需要 + 1

那么数据被分成三个分区,只要每个分区上的内容都执行+1的操作就可以

Hadoop001: (1,2,3) +1

Hadoop002: (4,5,6) +1

Hadoop003: (7,8,9) +1

@transient private var sc: SparkContext,

@transient private var deps: Seq[Dependency[ ]]

) extends Serializable with Logging {

(1)抽象类:RDD必然是由子类实现的,我们使用的直接使用其子类即可

(2)Serializable:可以序列化

(3)Logging:spark1.6可以使用,spark2.0之后不可以使用

(4)T:存储各种数据类型

(5)SparkContext

(6)@transient

大数据里面一般是移动数据不是移动计算,所以数据本地化计算这样性能更高。

def compute(split: Partition, context: TaskContext): Iterator[T]

RDD计算是对RDD里面的分区做计算,所以传入split: Partition 对应的RDD特点第二点

protected def getPartitions: Array[Partition]:

拿到分区,RDD是由一系列的分区构成,所以得到的一定是分区 Array[Partition] 对应着第一大特点

......

【spark系列8】spark delta读数据实现分析

本文基于delta 0.7.0

spark 3.0.1

我们之前的 spark delta写操作ACID事务前传--写文件基础类FileFormat/FileCommitProtocol分析 , spark delta写操作ACID事务实现分析 分析了delta写数据的流程,这次我们分析一下delta是怎么读取数据的。-spark分类数据的样例文件

spark 的delta datasource的构建要从 DataSource.lookupDataSourceV2 开始,之后会流向到loadV1Source,这里会进行dataSource.createRelation进行构建datasource的Relation的构建,直接转到deltaDataSource 的createRelation:-spark分类数据的样例文件

通过之前文章的分析,我们直到deltalog记录了AddFile和Remove记录,那现在读数据怎么读取呢?全部在allFiles方法。

重点看一下:allFiles方法:

这里调用了state方法,而它又调用了stateReconstruction方法,

stateReconstruction方法在checkpoint的时用到了,在这里也用到了,主要是重新构造文件状态,合并AddFile和RemoveFile:

而关键在于InMemoryLogReplay的append方法和checkpoint方法,这里做到了文件状态的合并:

重点就在case add: AddFile和 case remove: RemoveFile处理以及checkpoint方法,能够很好的合并文件状态。

再调用collect方法,返回DeltaScan,之后获取文件路径作为要处理的文件路径。

注意:spark读取delta格式整个流程和spark读取其他数据格式流程一致,主要区别在于读取数据之前,会把文件状态在内存中进行一次合并,这样只需要读取文件状态为Addfile的就行了

Spark RDD 分布式弹性数据集

rdd是一种弹性分布式的数据集,它代表着不可变的数据元素,可以被分区并行处理。

rdd是一个粗粒度的数据生成方式和流转迭代计算方式的描述。它可以通过稳定的存储器或者从其他RDD生成,它并不需要急着进行转换,只需要在特定的rdd进行一次性的数据的迭代流转。rdd记录着自己的依赖关系,以防在数据丢失时可以通过“血缘”关系再次生成数据。用户也可以自己选择在经常重用的rdd进行数据落地,放置丢失后重做。-spark分类数据的样例文件

rdd的特性总结:

分布式的共享内存是一种细粒度的读写,可以对每个存储单元进行读写,其一致性需要程序进行维护,其容错性需要设置检查点和程序回滚。但是RDD由于是不可变的粗粒度的读写,更适合于批量读写的任务,其可以使用“血缘”机制恢复数据,减少了设置检查点的开销。如果出现失败时,也只用重新计算分区中丢失的那一部分。另一方面,RDD的不可变性可以让系统可以像mapreduce一样采用后备任务的方式来代替运行缓慢的任务,不会出现相互影响的情况。-spark分类数据的样例文件

另外rdd也吸取了分布式共享内存的特性,rdd的批量操作可以根据数据所处的位置进行优化,提高性能。加载数据时,当内存不足时,rdd的性能下降是平稳的,不能载入内存的分区可以存储在磁盘上。

上面的5点是rdd都会实现的接口,这也是rdd都具有的特性。

如上源码所示,RDD提供了分区的抽象函数,即protected def getPartitions: Array[Partition],每个继承RDD抽象类的RDD都会有自己的getPartitions的实现。RDD分区的多少代表着计算时的并发粒度。-spark分类数据的样例文件

用户可以自己指定执行的分区数,如果用户不自己指定,则使用默认的分区数。

从源码中可以看出,如果不传入分区数,则默认分区数为defaultParallelism,而defaultParallelism=math.max(totalCoreCount.get(), 2)所以最小是2,最大是主机核数。-spark分类数据的样例文件

HadoopRDD是读取hdfs文件的rdd。HadoopRDD使用的是MapReduce API。

spark.sparkContext.textFile(" hdfs://user/local/admin.text ") 中textFile是读取hdfs文件的方法。其中会调用HadoopRDD。-spark分类数据的样例文件

textFile 是从HDFS分布式文件系统的所有节点上读取数据,返回Strings的RDD。

总结下HadoopRDD分区规则:

1.如果textFile指定分区数量为0或者1的话,defaultMinPartitions值为1,则有多少个文件,就会有多少个分区。

2.如果不指定默认分区数量,则默认分区数量为2,则会根据所有文件字节大小totalSize除以分区数量,得到的值goalSize,然后比较goalSize和hdfs指定分块大小(这里是128M)作比较,以较小的最为goalSize作为切分大小,对每个文件进行切分,若文件大于大于goalSize,则会生成该(文件大小/goalSize)个分区,如果文件内的数据不能除尽则分区数会+1,则为(fileSize/goalSize)+1。-spark分类数据的样例文件

3.如果指定分区数量大于等于2,则默认分区数量为指定值,生成实际分区数量规则任然同2中的规则一致。

总之:文件总大小除以分区数,大于分块大小,则与分块大小相关,否则以得到的商相关。

rdd优先位置返回的是每一个分区的位置信息,按照移动计算的思路,将计算尽量分配到数据所在的机器上。

RDD的操作是粗粒度的操作,RDD进行转换会形成新的RDD。形成的RDD和原RDD形成依赖关系,RDD通过这种“血缘”关系来维护数据的容错性。RDD的依赖关系可以分为宽依赖和窄依赖两种。

从中可以看出mapRDD是OneToOneDependency依赖,其父RDD为ParallelCollectionRDD。

从中可以看出groupRDD的依赖是ShuffleDependency依赖,其父依赖是MapPartitionsRDD。而groupbykey是需要进行shuffle的算子,属于宽依赖。

Spark通过创建的类来表明,RDD间的依赖关系的类型,NarrowDependency属于窄依赖,ShuffleDenpendency属于宽依赖。之后会通过一节来具体介绍其中的细节。

从上面的RDD源码可以发现,每个RDD中都存在一个compute()的函数,这个函数的作用就是为实现RDD具体的分区计算。

def compute(split: Partition, context: TaskContext): Iterator[T]

compute的返回值是分区的迭代器,每一个分区都会调用这个函数。只有到action算子才会真正的执行计算。

partitioner指的是Spark的分区函数,目前最常用的有两种,HashPartitioner和RangePartitioner, 其次还有缩减分区数的分区函数CoalescedPartitioner。分区这个概念,只存在于(K,V)键值对的RDD中,非键值对的RDD中partitioner为None。-spark分类数据的样例文件

分区函数即决定了RDD本身分区的数量,也决定了Shuffle中MapOut输出中每个分区进行切割的依据。

HashPartitioner会对数据的key进行 key.hascode%numpartitions 计算,得到的数值会放到对应的分区中,这样能较为平衡的分配数据到partition。

RangePartitioner:它是在排序算子中会用到的分区器,比如sortbykey、sortby、orderby等。该分区器先对输入的数据的key做采样,来估算Key的分布,然后按照指定的排序切分range,尽量让每个partition对应的range里的key分布均匀。-spark分类数据的样例文件

rdd中的算子可以分为两种,一个是transformation, 一个是action算子。

1. Transformation:转换算子,这类转换并不触发提交作业,完成作业中间过程处理。

2. Action:行动算子,这类算子会触发SparkContext提交Job作业。

Spark读取HDFS数据分区参考

refer:

hive metastore 和 parquet 转化的方式通过 spark.sql.hive.convertMetastoreParquet 控制,默认为 true。

如果设置为 true ,会使用 org.apache.spark.sql.execution.FileSourceScanExec ,否则会使用 org.apache.spark.sql.hive.execution.HiveTableScanExec。-spark分类数据的样例文件

前者对分区规则做了一些优化,如果 文件是:

HiveTableScanExec

通过文件数量,大小进行分区。

例如:读入一份 2048M 大小的数据,hdfs 块大小设置为 128M

该目录有1000个小文件,则会生成1000个partition。

如果只有1个文件,则会生成 16 个分区。

如果有一个大文件1024M,其余 999 个文件共 1024M,则会生成 1007个分区。

什么是Spark,如何使用Spark进行数据分析

Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法-spark分类数据的样例文件

数据科学家为了回答一个问题或进行深入研究,会使用相关的技术分析数据。通常,他们的工作包含特殊的分析,所以他们使用交互式shell,以使得他们能在最短的时间内看到查询结果和代码片段。Spark的速度和简单的API接口很好地符合这个目标,它的内建库意味着很多算法可以随时使用。-spark分类数据的样例文件

Spark通过若干组件支持不同的数据科学任务。Spark shell使得用Python或Scala进行交互式数据分析变得简单。Spark SQL也有一个独立的SQL shell,可以用SQL进行数据分析,也可以在Spark程序中或Spark shell中使用Spark SQL。MLlib库支持机器学习和数据分析。而且,支持调用外部的MATLAB或R语言编写的程序。Spark使得数据科学家可以用R或Pandas等工具处理包含大量数据的问题。-spark分类数据的样例文件