2023年11月26日发(作者:)
Spark的Transform算⼦和Action算⼦列举和⽰例
Spark 算⼦概念
Transformation 变换/转换:这种变换并不触发提交作业,完成作业中间过程处理。Transformation 操作是延迟计算的,也就是说
从⼀个RDD 转换⽣成另⼀个 RDD 的转换操作不是马上执⾏,需要等到有 Action 操作的时候才会真正触发运算。
提交不执⾏
rdd.map{}
.filter{}
Action ⾏动算⼦:这类算⼦会触发 SparkContext 提交 Job 作业。
Action 算⼦会触发 Spark 提交作业(Job)。
提交执⾏
rdd.count
算⼦再细分
Value数据类型的Transformation算⼦,对value进⾏操作
如:map,filter等
Key-Value数据类型的Transfromation算⼦,只有key-value形式的数据才能⽤。
如:reduceByKey,groupByKey等
Transformations 算⼦
Transform⾥⾯的Value算⼦
- map
将原来 RDD 的每个数据项通过 map 中的⽤户⾃定义函数 f 映射转变为⼀个新的元素,返回类型:MappedRDD
。图 1中每个⽅框表⽰⼀个 RDD 分区,左侧的分区经过⽤户⾃定义函数 f:T->U 映射为右侧的新 RDD 分区。
val rdd = elize(Array(1,2,3,4,5), 3)
{x=>x*100}.collect
结果为 Array(100,200,300,400,500)
flatMap
将原来 RDD 中的每个元素通过函数 f 转换为新的元素,并将⽣成的 RDD 的每个集合中的元素合并为⼀个集合。返回类型:
FlatMappedRDD
图 表 ⽰ RDD 的 ⼀ 个 分 区 ,进 ⾏ flatMap函 数 操 作, flatMap 中 传 ⼊ 的 函 数 为 f:T->U, T和 U 可以是任意的数据类型。将
分区中的数据通过⽤户⾃定义函数 f 转换为新的数据。外部⼤⽅框可以认为是⼀个 RDD 分区,⼩⽅框代表⼀个集合。 V1、 V2、 V3
在⼀个集合作为 RDD 的⼀个数据项,可能存储为数组或其他容器,转换为V’1、 V’2、 V’3 后,将原来的数组或容器结合拆散,
拆散的数据形成为 RDD 中的数据项
val rdd=elize(Array("ab","cd","ef"), 3)
p(x=>x+"@").collect
结果为:Array(a,b,@,c,d,@,e,f,@)
mapPartitions
mapPartitions 函 数 获 取 到 每 个 分 区 的 迭 代器,在 函 数 中 通 过 这 个 分 区 整 体 的 迭 代 器 对整 个 分 区 的 元 素 进 ⾏
操 作。返回类型:MapPartitionsRDD
效果和map⼀样,只是操作的对象是⼀个迭代器。
val rdd=sc.parallelize(Array(1,2,3,4,5,6), 3)
rdd.mapPartitions{Iter=>iter.filter(_>=3)}.collect
返回值:Array(3,4,5,6)
glom
glom函数将每个分区形成⼀个数组,内部实现是返回的GlommedRDD。 图4中的每个⽅框代表⼀个RDD分区。图4中的⽅框代表⼀
个分区。 该图表⽰含有V1、 V2、 V3的分区通过函数glom形成⼀数组Array[(V1),(V2),(V3)]
val rdd=elize(Array(1,2,3,4,5,6), 2)
().collect
返回值:Array(Array(1,2,3),Array(4,5,6))
union
使⽤ union 函数时需要保证两个 RDD 元素的数据类型相同,返回的 RDD 数据类型和被合并的 RDD 元素数据类型相同,并不进⾏去
重操作,保存所有元素
val rdd= sc.parallelize(Array(1,2,3,4), 3)
val r=rdd.union(rdd).union(rdd)
println(r.)//输出分区数
结果为:9
cartesian
对 两 个 RDD 内 的 所 有 元 素 进 ⾏ 笛 卡 尔 积 操 作。 操 作 后, 内 部 实 现 返 回CartesianRDD。图中左侧⼤⽅框代表两个
RDD,⼤⽅框内的⼩⽅框代表 RDD 的分区。右侧⼤⽅框代表合并后的 RDD,⼤⽅框内的⼩⽅框代表分区。图6中的⼤⽅框代表
RDD,⼤⽅框中的⼩⽅框代表RDD分区。
val rdd= elize(Array(1,2), 3)
println(rdd.cartesian(rdd).)
输出:(1,1)(1,2)(2,1)(2,2)
结果为:9 (分区数也⼀次乘上去)
groupBy
将元素通过函数⽣成相应的 Key,数据就转化为 Key-Value 格式,之后将 Key 相同的元素分为⼀组。
val rdd = elize(1 to 9, 3)
y(x =>{
if (x % 2 == 0) "even" else "odd"
})
.collect
结果:Array((even,ArrayBuffer(2, 4, 6, 8)),(odd,ArrayBuffer(1, 3, 5, 7, 9)))
filter
filter 函数功能是对元素进⾏过滤,对每个 元 素 应 ⽤ f 函 数, 返 回 值 为 true 的 元 素 在RDD 中保留,返回值为 false 的元素将
被过滤掉
val rdd= elize(Array(1,2,3,4), 3)
{_>=3}.collect
返回:Array(3,4)
distinct
distinct将RDD中的元素进⾏去重操作。图9中的每个⽅框代表⼀个RDD分区,通过distinct函数,将数据去重
val rdd= sc.parallelize(Array(1,1,2,2,3,3,4,4), 3)
rdd.distinct().collect
结果:1,2,3,4
subtract
subtract相当于进⾏集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素。图10中左侧的⼤⽅框代表两个RDD,⼤⽅框内
的⼩⽅框代表RDD的分区。 右侧⼤⽅框
代表合并后的RDD,⼤⽅框内的⼩⽅框代表分区。 V1在两个RDD中均有,根据差集运算规则,新RDD不保留,V2在第⼀个RDD有,
第⼆个RDD没有,则在新RDD元素中包含V2
val rdd1 = elize(1 to 9, 3)
val rdd2 = elize(1 to 3, 3)
ct(b).collect
返回: Array(6, 9, 4, 7, 5, 8)
sample
sample 将 RDD 这个集合内的元素进⾏采样,获取所有元素的⼦集。⽤户可以设定是否有放回的抽样、百分⽐、随机种⼦,进⽽决定
采样⽅式。内部实现是⽣成 SampledRDD(withReplacement, fraction, seed)。
函数参数设置:
withReplacement=true,表⽰有放回的抽样。
withReplacement=false,表⽰⽆放回的抽样
Fraction=>,⼀个⼤于0,⼩于或等于1的⼩数值,⽤于控制要读取的数据所占整个数据集的概率.
Seed=>,这个值如果没有传⼊,默认值是⼀个0~ue之间的整数
val rdd = elize(1 to 10000, 3)
(false, 0.1, 0)
takeSample
takeSample()函数和上⾯的sample函数是⼀个原理,但是不使⽤相对⽐例采样,⽽是按设定的采样个数进⾏采样,同时返回结果不
再是RDD,⽽是相当于对采样后的数据进⾏Collect()
cache
cache 将 RDD 元素从磁盘缓存到内存。 相当于 persist(MEMORY_ONLY) 函数的功能
persist
persist 函数对 RDD 进⾏缓存操作。数据缓存在哪⾥依据 StorageLevel 这个枚举类型进⾏确定。 有以下⼏种类型的组合(见
10), DISK 代表磁盘,MEMORY 代表内存, SER 代表数据是否进⾏序列化存储。
下⾯为函数定义, StorageLevel 是枚举类型,代表存储模式,⽤户可以通过按需进⾏选择。persist(newLevel:StorageLevel).图 中
列出persist 函数可以进⾏缓存的模式。例如,MEMORY_AND_DISK_SER 代表数据可以存储在内存和磁盘,并且以序列化的⽅式存
储,其他同理
val rdd= sc.parallelize(1 to 100, 3)
rdd.persist(StorageLevel.MEMORY_AND_DISK)
repartition和coalesce
repartition(numPartitions:Int):RDD[T]
coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]
他们两个都是RDD的分区进⾏重新划分,repartition只是coalesce接⼝中shuffle为true的简易实现,(假设RDD有N个分区,需要重新划
分成M个分区)
1。 如果 N⼩于M ⼀般情况下N个分区有数据分布不均匀的状况,利⽤HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设
置为true。
2。如果N>M并且N和M相差不多,(假如N是1000,M是100)那么就可以将N个分区中的若⼲个分区合并成⼀个新的分区,最终合并为M
个分区,这时可以将shuff设置为false,在shuffl为false的情况下,如果M>N时,coalesce为⽆效的,不进⾏shuffle过程,⽗RDD和⼦
RDD之间是窄依赖关系。
3。如果N>M并且两者相差悬殊,这时如果将shuffle设置为false,⽗⼦RDD是窄依赖关系,他们同处在⼀个Stage中,就可能造成
Spark程序的并⾏度不够,从⽽影响性能,如果在M为1的时候,为了使coalesce之前的操作有更好的并⾏度,可以讲shuffle设置为true。
总之:如果shuff为false时,如果传⼊的参数⼤于现有的分区数⽬,RDD的分区数不变,也就是说不经过shuffle,是⽆法将RDDde分区数
变多的。
Transform⾥⾯的Key-Value形式的RDD
mapValues
mapValues :针对(Key, Value)型数据中的 Value 进⾏ Map 操作,⽽不对 Key 进⾏处理
val rdd= sc.parallelize(Array(("a",1),("b",2),("c",3)), 3)
rdd.mapValues {_ * 10 }.foreach(println)
结果:(a,100),(b,200),(c,300)
combineByKey
combineByKey[C](
createCombiner:(V) C,
mergeValue:(C, V) C,
mergeCombiners:(C, C) C,
partitioner:Partitioner,
mapSideCombine:Boolean=true,
serializer:Serializer=null):RDD[(K,C)]
说明:数据为RDD[K,V]
createCombiner:是将V值类型转为C类型.
mergeValue:将C和V进⾏操作(理解为单分区⾥⾯的操作)
mergeCombiners:将C和C进⾏操作//理解为分区与分区直接的操作
partitioner:分区⽅式
mapSideCombine:是否在map端先进⾏操作
serializer:序列号⽅式
图中的⽅框代表 RDD 分区。如图,通过 combineByKey, 将 (V1,2), (V1,1)数据合并为( V1,Seq(2,1))
val rdd= elize(Array((0,5),(0,4),(2,3),(2,2),(0,1)), 3)
val createCombiner=(a:Int)=>List(a)//将5转成List(5)
val mergeValue=(a:List[Int],b:Int)=>a.::(b)//将List(5)和4操作成 List(5,4)
val mergeCombiners=(a:List[Int],b:List[Int])=>a.:::(b)//将List(4,5)和List(1)操作得List(1,4,5)
eByKey(createCombiner, mergeValue, mergeCombiners)
得出结果((0,List(1, 4, 5)),(2,List(2, 3)))
reduceByKey
做wordcount的时候⽤到的,就是按照key来做⼀个聚合操作。底层⽤的是上⾯的combineByKey。只是那个C跟V是⼀个类型。
函数:reduceByKey(partitioner: Partitioner, func: (V, V) => V)
val rdd= elize(Array((0,5),(0,4),(2,3),(2,2),(0,1)), 3)
ByKey((a,b)=>a+b)
返回:((0,10),(2,5))
partitionBy
重分区partitionBy(Partitioner)
val rdd= elize(Array((0,5),(0,4),(2,3),(2,2),(0,1)), 3)
ionBy(new HashPartitioner(10))//重新分为10个区
Cogroup
返回两个rdd相同key的各⾃数据集合
cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
val rdd= sc.parallelize(Array((0,5),(0,4),(2,3),(2,2),(0,1)), 3)
rdd.cogroup(rdd).foreach(println)
返回
(0,(CompactBuffer(5, 4, 1),CompactBuffer(5, 4, 1)))
(2,(CompactBuffer(3, 2),CompactBuffer(3, 2)))
join
两个rdd做join操作。
RDD[(K,V)].join(other: RDD[(K, W)]): RDD[(K, (V, W))]
如果join不到的就丢弃
val rdd= sc.parallelize(Array((0,1),(0,2),(2,3)), 3)
val rdd2= sc.parallelize(Array((0,11),(0,22),(3,33)), 3)
rdd.join(rdd2).foreach(println)
返回:
(0,(1,11))
(0,(1,22))
(0,(2,11))
(0,(2,22))
leftOutJoin和rightOutJoin
leftOutJoin:join过程中,如果找不到相同的key。则还是保留左边RDD中的数据;RDD[(K,V)].join(other: RDD[(K, W)]): RDD[(K,
(V, Option[W])]
val rdd= sc.parallelize(Array((0,1),(0,2),(2,3)), 3)
val rdd2= sc.parallelize(Array((0,11),(0,22),(3,33)), 3)
rdd.leftOuterJoin(rdd2).foreach(println)
返回:
(0,(1,Some(11)))
(0,(1,Some(22)))
(0,(2,Some(11)))
(0,(2,Some(22)))
(2,(3,None))
rightOutJoin:join过程中,如果找不到相同的key。则还是保留右边RDD中的数据;RDD[(K,V)].join(other: RDD[(K, W)]): RDD[(K,
(Option[V], W)]
val rdd= sc.parallelize(Array((0,1),(0,2),(2,3)), 3)
val rdd2= sc.parallelize(Array((0,11),(0,22),(3,33)), 3)
rdd.rightOutJoin:join(rdd2).foreach(println)
返回:
(0,(Some(1),11))
(0,(Some(1),22))
(0,(Some(2),11))
(0,(Some(2),22))
(3,(None,33))
Actions 算⼦
我们以count为例:点进count的源码可以看到,程序提交了⼀个job
def count(): Long = (this, ratorSize _).sum
count
返回rdd的元素个数
foreach
对rdd的每个元素⼀次做操作,但没有返回值,最常⽤的就是
foreach(println)
saveAsTextFile
函数将数据输出,存储到 HDFS 的指定⽬录。将 RDD 中的每个元素映射转变为 (null, ng),然后再将其写⼊ HDFS。图 中
左侧⽅框代表 RDD 分区,右侧⽅框代表 HDFS 的 Block。通过函数将RDD 的每个分区存储为 HDFS 中的⼀个 Block
saveAsObjectFile
·top返回最⼤的k个元素。
·take返回前k个元素。
reduce
将数据聚合成⼀个元素
fold
old和reduce的原理相同,但是与reduce不同,相当于每个reduce时,迭代器取的第⼀个元素是zeroValue
val a = elize(List(1,2,3), 3)
println(a.fold(10)(_ + _))
输出:46
- aggregate
aggregate先对每个分区的所有元素进⾏aggregate操作,再对分区的结果进⾏fold操作。
aggreagate与fold和reduce的不同之处在于,aggregate相当于采⽤归并的⽅式进⾏数据聚集,这种聚集是并⾏化的。 ⽽在fold和
reduce函数的运算过程中,每个分区中需要进⾏串⾏处理,每个分区串⾏计算完结果,结果再按之前的⽅式进⾏聚集,并返回最终聚集结
果。
函数的定义如下。
aggregate[B](z: B)(seqop: (B,A) => B,combop: (B,B) => B): B
图33通过⽤户⾃定义函数对RDD 进⾏aggregate的聚集操作,图中的每个⽅框代表⼀个RDD分区。
ate(”V0@”,2)((A,B)=>(A.
1+”@”+B._1,A._2+B._2)),
(A,B)=>(A._1+”@”+B_1,A.
@+B_.2))
最后,介绍两个计算模型中的两个特殊变量。
⼴播(broadcast)变量:其⼴泛⽤于⼴播Map Side Join中的⼩表,以及⼴播⼤变量等场景。 这些数据集合在单节点内存能够容纳,
不需要像RDD那样在节点之间打散存储。
Spark运⾏时把⼴播变量数据发到各个节点,并保存下来,后续计算可以复⽤。 相⽐Hadoo的distributed cache,⼴播的内容可以跨作业
共享。 Broadcast的底层实现采⽤了BT机制。
val z = elize(List(1,2,3,4,5,6), 2)
// lets first print out the contents of the RDD with partition labels
def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {
(x => "[partID:" + index + ", val: " + x + "]").iterator
}
titionsWithIndex(myfunc).collect
res28: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6])
ate(0)((_, _), _ + _)
res40: Int = 9


发布评论