2023年11月26日发(作者:)

SparkTransform算⼦和Action算⼦列举和⽰例

Spark 算⼦概念

Transformation 变换/转换:这种变换并不触发提交作业,完成作业中间过程处理。Transformation 操作是延迟计算的,也就是说

从⼀个RDD 转换⽣成另⼀个 RDD 的转换操作不是马上执⾏,需要等到有 Action 操作的时候才会真正触发运算。

提交不执⾏

rdd.map{}

.filter{}

Action ⾏动算⼦:这类算⼦会触发 SparkContext 提交 Job 作业。

Action 算⼦会触发 Spark 提交作业(Job)。

提交执⾏

rdd.count

算⼦再细分

Value数据类型的Transformation算⼦,对value进⾏操作

如:map,filter等

Key-Value数据类型的Transfromation算⼦,只有key-value形式的数据才能⽤。

如:reduceByKey,groupByKey等

Transformations 算⼦

Transform⾥⾯的Value算⼦

- map

将原来 RDD 的每个数据项通过 map 中的⽤户⾃定义函数 f 映射转变为⼀个新的元素,返回类型:MappedRDD

。图 1中每个⽅框表⽰⼀个 RDD 分区,左侧的分区经过⽤户⾃定义函数 f:T->U 映射为右侧的新 RDD 分区。

val rdd = elize(Array(1,2,3,4,5), 3)

{x=>x*100}.collect

结果为 Array(100,200,300,400,500)

flatMap

将原来 RDD 中的每个元素通过函数 f 转换为新的元素,并将⽣成的 RDD 的每个集合中的元素合并为⼀个集合。返回类型:

FlatMappedRDD

图 表 ⽰ RDD 的 ⼀ 个 分 区 ,进 ⾏ flatMap函 数 操 作, flatMap 中 传 ⼊ 的 函 数 为 f:T->U, T和 U 可以是任意的数据类型。将

分区中的数据通过⽤户⾃定义函数 f 转换为新的数据。外部⼤⽅框可以认为是⼀个 RDD 分区,⼩⽅框代表⼀个集合。 V1、 V2、 V3

在⼀个集合作为 RDD 的⼀个数据项,可能存储为数组或其他容器,转换为V’1、 V’2、 V’3 后,将原来的数组或容器结合拆散,

拆散的数据形成为 RDD 中的数据项

val rdd=elize(Array("ab","cd","ef"), 3)

p(x=>x+"@").collect

结果为:Array(a,b,@,c,d,@,e,f,@)

mapPartitions

mapPartitions 函 数 获 取 到 每 个 分 区 的 迭 代器,在 函 数 中 通 过 这 个 分 区 整 体 的 迭 代 器 对整 个 分 区 的 元 素 进 ⾏

操 作。返回类型:MapPartitionsRDD

效果和map⼀样,只是操作的对象是⼀个迭代器。

val rdd=sc.parallelize(Array(1,2,3,4,5,6), 3)

rdd.mapPartitions{Iter=>iter.filter(_>=3)}.collect

返回值:Array(3,4,5,6)

glom

glom函数将每个分区形成⼀个数组,内部实现是返回的GlommedRDD。 图4中的每个⽅框代表⼀个RDD分区。图4中的⽅框代表⼀

个分区。 该图表⽰含有V1、 V2、 V3的分区通过函数glom形成⼀数组Array[(V1),(V2),(V3)]

val rdd=elize(Array(1,2,3,4,5,6), 2)

().collect

返回值:Array(Array(1,2,3),Array(4,5,6))

union

使⽤ union 函数时需要保证两个 RDD 元素的数据类型相同,返回的 RDD 数据类型和被合并的 RDD 元素数据类型相同,并不进⾏去

重操作,保存所有元素

val rdd= sc.parallelize(Array(1,2,3,4), 3)

val r=rdd.union(rdd).union(rdd)

println(r.)//输出分区数

结果为:9

cartesian

对 两 个 RDD 内 的 所 有 元 素 进 ⾏ 笛 卡 尔 积 操 作。 操 作 后, 内 部 实 现 返 回CartesianRDD。图中左侧⼤⽅框代表两个

RDD,⼤⽅框内的⼩⽅框代表 RDD 的分区。右侧⼤⽅框代表合并后的 RDD,⼤⽅框内的⼩⽅框代表分区。图6中的⼤⽅框代表

RDD,⼤⽅框中的⼩⽅框代表RDD分区。

val rdd= elize(Array(1,2), 3)

println(rdd.cartesian(rdd).)

输出:(1,1)(1,2)(2,1)(2,2)

结果为:9 (分区数也⼀次乘上去)

groupBy

将元素通过函数⽣成相应的 Key,数据就转化为 Key-Value 格式,之后将 Key 相同的元素分为⼀组。

val rdd = elize(1 to 9, 3)

y(x =>{

if (x % 2 == 0) "even" else "odd"

})

.collect

结果:Array((even,ArrayBuffer(2, 4, 6, 8)),(odd,ArrayBuffer(1, 3, 5, 7, 9)))

filter

filter 函数功能是对元素进⾏过滤,对每个 元 素 应 ⽤ f 函 数, 返 回 值 为 true 的 元 素 在RDD 中保留,返回值为 false 的元素将

被过滤掉

val rdd= elize(Array(1,2,3,4), 3)

{_>=3}.collect

返回:Array(3,4)

distinct

distinct将RDD中的元素进⾏去重操作。图9中的每个⽅框代表⼀个RDD分区,通过distinct函数,将数据去重

val rdd= sc.parallelize(Array(1,1,2,2,3,3,4,4), 3)

rdd.distinct().collect

结果:1234

subtract

subtract相当于进⾏集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素。图10中左侧的⼤⽅框代表两个RDD,⼤⽅框内

的⼩⽅框代表RDD的分区。 右侧⼤⽅框

代表合并后的RDD,⼤⽅框内的⼩⽅框代表分区。 V1在两个RDD中均有,根据差集运算规则,新RDD不保留,V2在第⼀个RDD有,

第⼆个RDD没有,则在新RDD元素中包含V2

val rdd1 = elize(1 to 9, 3)

val rdd2 = elize(1 to 3, 3)

ct(b).collect

返回: Array(6, 9, 4, 7, 5, 8)

sample

sample 将 RDD 这个集合内的元素进⾏采样,获取所有元素的⼦集。⽤户可以设定是否有放回的抽样、百分⽐、随机种⼦,进⽽决定

采样⽅式。内部实现是⽣成 SampledRDD(withReplacement, fraction, seed)。

函数参数设置:

withReplacement=true,表⽰有放回的抽样。

withReplacement=false,表⽰⽆放回的抽样

Fraction=>,⼀个⼤于0,⼩于或等于1的⼩数值,⽤于控制要读取的数据所占整个数据集的概率.

Seed=>,这个值如果没有传⼊,默认值是⼀个0~ue之间的整数

val rdd = elize(1 to 10000, 3)

(false, 0.1, 0)

takeSample

takeSample()函数和上⾯的sample函数是⼀个原理,但是不使⽤相对⽐例采样,⽽是按设定的采样个数进⾏采样,同时返回结果不

再是RDD,⽽是相当于对采样后的数据进⾏Collect()

cache

cache 将 RDD 元素从磁盘缓存到内存。 相当于 persist(MEMORY_ONLY) 函数的功能

persist

persist 函数对 RDD 进⾏缓存操作。数据缓存在哪⾥依据 StorageLevel 这个枚举类型进⾏确定。 有以下⼏种类型的组合(见

10), DISK 代表磁盘,MEMORY 代表内存, SER 代表数据是否进⾏序列化存储。

下⾯为函数定义, StorageLevel 是枚举类型,代表存储模式,⽤户可以通过按需进⾏选择。persist(newLevel:StorageLevel).图 中

列出persist 函数可以进⾏缓存的模式。例如,MEMORY_AND_DISK_SER 代表数据可以存储在内存和磁盘,并且以序列化的⽅式存

储,其他同理

val rdd= sc.parallelize(1 to 100, 3)

rdd.persist(StorageLevel.MEMORY_AND_DISK)

repartition和coalesce

repartition(numPartitions:Int):RDD[T]

coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]

他们两个都是RDD的分区进⾏重新划分,repartition只是coalesce接⼝中shuffle为true的简易实现,(假设RDD有N个分区,需要重新划

分成M个分区)

1。 如果 N⼩于M ⼀般情况下N个分区有数据分布不均匀的状况,利⽤HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设

置为true。

2。如果N>M并且N和M相差不多,(假如N是1000,M是100)那么就可以将N个分区中的若⼲个分区合并成⼀个新的分区,最终合并为M

个分区,这时可以将shuff设置为false,在shuffl为false的情况下,如果M>N时,coalesce为⽆效的,不进⾏shuffle过程,⽗RDD和⼦

RDD之间是窄依赖关系。

3。如果N>M并且两者相差悬殊,这时如果将shuffle设置为false,⽗⼦RDD是窄依赖关系,他们同处在⼀个Stage中,就可能造成

Spark程序的并⾏度不够,从⽽影响性能,如果在M为1的时候,为了使coalesce之前的操作有更好的并⾏度,可以讲shuffle设置为true。

总之:如果shuff为false时,如果传⼊的参数⼤于现有的分区数⽬,RDD的分区数不变,也就是说不经过shuffle,是⽆法将RDDde分区数

变多的。

Transform⾥⾯的Key-Value形式的RDD

mapValues

mapValues :针对(Key, Value)型数据中的 Value 进⾏ Map 操作,⽽不对 Key 进⾏处理

val rdd= sc.parallelize(Array(("a",1),("b",2),("c",3)), 3)

rdd.mapValues {_ * 10 }.foreach(println)

结果:(a,100),(b,200),(c,300)

combineByKey

combineByKey[C](

createCombiner:(V) C,

mergeValue:(C, V) C,

mergeCombiners:(C, C) C,

partitioner:Partitioner,

mapSideCombine:Boolean=true,

serializer:Serializer=null):RDD[(K,C)]

说明:数据为RDD[K,V]

createCombiner:是将V值类型转为C类型.

mergeValue:将C和V进⾏操作(理解为单分区⾥⾯的操作)

mergeCombiners:将C和C进⾏操作//理解为分区与分区直接的操作

partitioner:分区⽅式

mapSideCombine:是否在map端先进⾏操作

serializer:序列号⽅式

图中的⽅框代表 RDD 分区。如图,通过 combineByKey, 将 (V1,2), (V1,1)数据合并为( V1,Seq(2,1))

val rdd= elize(Array((0,5),(0,4),(2,3),(2,2),(0,1)), 3)

val createCombiner=(a:Int)=>List(a)//5转成List(5)

val mergeValue=(a:List[Int],b:Int)=>a.::(b)//List(5)4操作成 List(5,4)

val mergeCombiners=(a:List[Int],b:List[Int])=>a.:::(b)//List(4,5)List(1)操作得List(1,4,5)

eByKey(createCombiner, mergeValue, mergeCombiners)

得出结果((0,List(1, 4, 5)),(2,List(2, 3)))

reduceByKey

做wordcount的时候⽤到的,就是按照key来做⼀个聚合操作。底层⽤的是上⾯的combineByKey。只是那个C跟V是⼀个类型。

函数:reduceByKey(partitioner: Partitioner, func: (V, V) => V)

val rdd= elize(Array((0,5),(0,4),(2,3),(2,2),(0,1)), 3)

ByKey((a,b)=>a+b)

返回:((0,10),(2,5))

partitionBy

重分区partitionBy(Partitioner)

val rdd= elize(Array((0,5),(0,4),(2,3),(2,2),(0,1)), 3)

ionBy(new HashPartitioner(10))//重新分为10个区

Cogroup

返回两个rdd相同key的各⾃数据集合

cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]

val rdd= sc.parallelize(Array((0,5),(0,4),(2,3),(2,2),(0,1)), 3)

rdd.cogroup(rdd).foreach(println)

返回

(0,(CompactBuffer(5, 4, 1),CompactBuffer(5, 4, 1)))

(2,(CompactBuffer(3, 2),CompactBuffer(3, 2)))

join

两个rdd做join操作。

RDD[(K,V)].join(other: RDD[(K, W)]): RDD[(K, (V, W))]

如果join不到的就丢弃

val rdd= sc.parallelize(Array((0,1),(0,2),(2,3)), 3)

val rdd2= sc.parallelize(Array((0,11),(0,22),(3,33)), 3)

rdd.join(rdd2).foreach(println)

返回:

(0,(1,11))

(0,(1,22))

(0,(2,11))

(0,(2,22))

leftOutJoin和rightOutJoin

leftOutJoin:join过程中,如果找不到相同的key。则还是保留左边RDD中的数据;RDD[(K,V)].join(other: RDD[(K, W)]): RDD[(K,

(V, Option[W])]

val rdd= sc.parallelize(Array((0,1),(0,2),(2,3)), 3)

val rdd2= sc.parallelize(Array((0,11),(0,22),(3,33)), 3)

rdd.leftOuterJoin(rdd2).foreach(println)

返回:

(0,(1,Some(11)))

(0,(1,Some(22)))

(0,(2,Some(11)))

(0,(2,Some(22)))

(2,(3,None))

rightOutJoin:join过程中,如果找不到相同的key。则还是保留右边RDD中的数据;RDD[(K,V)].join(other: RDD[(K, W)]): RDD[(K,

(Option[V], W)]

val rdd= sc.parallelize(Array((0,1),(0,2),(2,3)), 3)

val rdd2= sc.parallelize(Array((0,11),(0,22),(3,33)), 3)

rdd.rightOutJoinjoin(rdd2).foreach(println)

返回:

(0,(Some(1),11))

(0,(Some(1),22))

(0,(Some(2),11))

(0,(Some(2),22))

(3,(None,33))

Actions 算⼦

我们以count为例:点进count的源码可以看到,程序提交了⼀个job

def count(): Long = (this, ratorSize _).sum

count

返回rdd的元素个数

foreach

对rdd的每个元素⼀次做操作,但没有返回值,最常⽤的就是

foreach(println)

saveAsTextFile

函数将数据输出,存储到 HDFS 的指定⽬录。将 RDD 中的每个元素映射转变为 (null, ng),然后再将其写⼊ HDFS。图 中

左侧⽅框代表 RDD 分区,右侧⽅框代表 HDFS 的 Block。通过函数将RDD 的每个分区存储为 HDFS 中的⼀个 Block

saveAsObjectFile

·top返回最⼤的k个元素。

·take返回前k个元素。

reduce

将数据聚合成⼀个元素

fold

old和reduce的原理相同,但是与reduce不同,相当于每个reduce时,迭代器取的第⼀个元素是zeroValue

val a = elize(List(1,2,3), 3)

println(a.fold(10)(_ + _))

输出:46

- aggregate

aggregate先对每个分区的所有元素进⾏aggregate操作,再对分区的结果进⾏fold操作。

aggreagate与fold和reduce的不同之处在于,aggregate相当于采⽤归并的⽅式进⾏数据聚集,这种聚集是并⾏化的。 ⽽在fold和

reduce函数的运算过程中,每个分区中需要进⾏串⾏处理,每个分区串⾏计算完结果,结果再按之前的⽅式进⾏聚集,并返回最终聚集结

果。

函数的定义如下。

aggregate[B](z: B)(seqop: (B,A) => B,combop: (B,B) => B): B

图33通过⽤户⾃定义函数对RDD 进⾏aggregate的聚集操作,图中的每个⽅框代表⼀个RDD分区。

ate(”V0@”,2)((A,B)=>(A.

1+”@”+B._1,A._2+B._2)),

(A,B)=>(A._1+”@”+B_1,A.

@+B_.2))

最后,介绍两个计算模型中的两个特殊变量。

⼴播(broadcast)变量:其⼴泛⽤于⼴播Map Side Join中的⼩表,以及⼴播⼤变量等场景。 这些数据集合在单节点内存能够容纳,

不需要像RDD那样在节点之间打散存储。

Spark运⾏时把⼴播变量数据发到各个节点,并保存下来,后续计算可以复⽤。 相⽐Hadoo的distributed cache,⼴播的内容可以跨作业

共享。 Broadcast的底层实现采⽤了BT机制。

val z = elize(List(1,2,3,4,5,6), 2)

// lets first print out the contents of the RDD with partition labels

def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {

(x => "[partID:" + index + ", val: " + x + "]").iterator

}

titionsWithIndex(myfunc).collect

res28: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6])

ate(0)((_, _), _ + _)

res40: Int = 9