
Spark: Common RDD Exercises

I still recommend this site; it is very well written.

I. Transformation

1 map

Applies a transformation function on each item of the RDD and returns the result as a new RDD. (Returns a new RDD formed by passing each element of the source RDD through the function func.)

def map[U: ClassTag](f: T => U): RDD[U]

val a = sc.parallelize(List("dog", "salmon", "salmon", "rat", "elephant"), 3)
a.collect
res2: Array[String] = Array(dog, salmon, salmon, rat, elephant)
val b = a.map(_.length)

2 mapPartitions

This is a specialized map that is called only once for each partition. The entire content of the respective partition is available as a sequential stream of values via the input argument (Iterator[T]). The custom function must return yet another Iterator[U]. The combined result iterators are automatically converted into a new RDD. (Similar to map, but runs independently on each partition of the RDD; when used on an RDD of type T, func must have type Iterator[T] => Iterator[U].)

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

参数说明:

Iterator[T]: the input, an iterator over the partition's elements of type T
Iterator[U]: the output, an iterator of type U
preservesPartitioning: whether to preserve the parent RDD's partitioner

// example
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
  var res = List[(T, T)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res .::= (pre, cur)
    pre = cur
  }
  res.iterator
}
a.mapPartitions(myfunc).collect

union, ++

def ++(other: RDD[T]): RDD[T]
def union(other: RDD[T]): RDD[T]

// example
val a = sc.parallelize(1 to 3)
val b = sc.parallelize(5 to 7)
(a union b).collect
res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)


10 groupByKey [Pair]

Triggers a shuffle.

// groupByKey() is implemented on top of combineByKey

def groupByKey(): RDD[(K, Iterable[V])]

// K: the key
// Iterable[V]: an iterable over that key's values

def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

// numPartitions: the number of partitions

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

// partitioner: a custom Partitioner

// example
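The example that belonged here is missing from the source; a minimal spark-shell-style sketch (keyBy builds the (length, word) pair RDD; the grouping order in the output may vary):

```scala
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length)   // pair RDD of (word length, word)
b.groupByKey.collect        // one (length, Iterable[word]) entry per distinct length
```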

12 aggregate

First aggregates the elements within each partition, then aggregates the per-partition results, using the given functions and an initial value. The result type U may differ from the RDD's element type T (one function merges a T into a U, another merges two Us; both functions may modify and return their first argument).

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

// zeroValue: U  the initial value, applied within each partition and once more in the global merge
// seqOp: (U, T) => U  the per-partition aggregation function
// combOp: (U, U) => U  the global merge of per-partition results

// example
val z = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
def myfunc(index: Int, iter: Iterator[Int]): Iterator[String] = { iter.map(x => "[partID: " + index + ",val: " + x + "]") }

myfunc: (index: Int, iter: Iterator[Int])Iterator[String]

z.mapPartitionsWithIndex(myfunc).collect

res24: Array[String] = Array([partID: 0,val: 1], [partID: 0,val: 2], [partID: 0,val: 3], [partID: 1,val: 4], [partID: 1,val: 5], [partID: 1,val: 6])

z.aggregate(5)(math.max(_, _), _ + _)

res26: Int = 16

// analysis:
The initial value is 5.
Partition 0: max(5, 1, 2, 3) = 5
Partition 1: max(5, 4, 5, 6) = 6
Global merge: 5 + 5 + 6 = 16 (the initial value participates once more)

13 aggregateByKey [Pair]

Aggregates the values of each key; the initial value is applied only in the per-partition aggregation, not in the global merge.

Triggers a shuffle.

// implemented on top of combineByKey

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
// zeroValue: U  the user-supplied initial value
// seqOp: (U, V) => U  the per-partition aggregation function, returning U
// combOp: (U, U) => U  the global merge, returning U
def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) => U, combOp: (U, U) => U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
// numPartitions: the number of partitions
def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
// partitioner: a custom Partitioner

// example

val pairRDD = sc.parallelize(List(("cat", 2), ("cat", 5), ("mouse", 4), ("cat", 12), ("dog", 12), ("mouse", 2)), 2)
def myfunc(index: Int, iter: Iterator[(String, Int)]): Iterator[String] = { iter.map(x => "[partID: " + index + ",val: " + x + "]") }

myfunc: (index: Int, iter: Iterator[(String, Int)])Iterator[String]

pairRDD.mapPartitionsWithIndex(myfunc).collect
res1: Array[String] = Array([partID: 0,val: (cat,2)], [partID: 0,val: (cat,5)], [partID: 0,val: (mouse,4)], [partID: 1,val: (cat,12)], [partID: 1,val: (dog,12)], [partID: 1,val: (mouse,2)])
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect

res2: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))

pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res3: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))

14 reduceByKey [Pair]

Merges the values of each key with an associative reduce function; values are combined locally within each partition before the shuffle.

def reduceByKey(func: (V, V) => V): RDD[(K, V)]
// func: (V, V) => V  the reduce function applied to the values of each key
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
// numPartitions: the number of partitions
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
// partitioner: a custom Partitioner

// example
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res8: Array[(Int, String)] = Array((3,dogcatowlgnuant))

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"))
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res10: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))

15 sortBy

This function sorts the input RDD's data and stores it in a new RDD. The first parameter requires you to specify a function which maps the input data into the key to sort by; an optional second parameter selects ascending (default) or descending order.

def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
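The sortBy example is missing from the source; a minimal spark-shell-style sketch (sortBy's output order is deterministic):

```scala
val y = sc.parallelize(Array(5, 7, 1, 3, 2, 1))
y.sortBy(c => c, true).collect    // ascending: Array(1, 1, 2, 3, 5, 7)
y.sortBy(c => c, false).collect   // descending: Array(7, 5, 3, 2, 1, 1)
```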

16 join [Pair]

Performs an inner join using two key-value RDDs. Please note that the keys must be generally comparable to make this work. (Called on RDDs of types (K, V) and (K, W); returns an RDD of (K, (V, W)) containing every pair of values that share a key.)

Triggers a shuffle.
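No join example survived in the source; a sketch in the same spark-shell style (keyBy keys each word by its length; output ordering may vary):

```scala
val a = sc.parallelize(List("dog", "salmon", "rat", "elephant"), 3).keyBy(_.length)
val b = sc.parallelize(List("dog", "cat", "gnu", "salmon", "rabbit"), 3).keyBy(_.length)
a.join(b).collect   // (K, (V, W)) pairs for keys present in both RDDs
```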

17 cogroup [Pair]

def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
// other: RDD[(K, W)]  the RDD to group with
// RDD[(K, (Iterable[V], Iterable[W]))]  the result: for each key K, iterables over the values from each RDD

def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]

// numPartitions: the number of partitions

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]

// partitioner: a custom Partitioner

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

// example
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
b.cogroup(c).collect

18 coalesce, repartition

Merges partitions (coalesce: many to fewer, a narrow dependency).

def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
// numPartitions: the target number of partitions
// shuffle: whether to shuffle (default false)
def repartition(numPartitions: Int): RDD[T]
// numPartitions: the target number of partitions; repartition always shuffles

val y = sc.parallelize(1 to 10, 5)
y.partitions.length
res16: Int = 5
val z = y.coalesce(2, false)
z.partitions.length
res17: Int = 2

21 groupBy

```scala
def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])]

def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])]

def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])]

val a = sc.parallelize(1 to 9)
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect

res10: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6, 8)), (odd,CompactBuffer(1, 3, 5, 7, 9)))

val a = sc.parallelize(1 to 9)

def myfunc(a:Int):Int={a%2}

myfunc: (a: Int)Int

a.groupBy(myfunc).collect

res11: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8)), (1,CompactBuffer(1, 3, 5, 7, 9)))

def myfunc(a:Int):Double={a%2}

a.groupBy(myfunc).collect

res12: Array[(Double, Iterable[Int])] = Array((0.0,CompactBuffer(2, 4, 6, 8)), (1.0,CompactBuffer(1, 3, 5, 7, 9)))

val a = sc.parallelize(1 to 9, 3)
def myfunc(a: Int): Int = {
  a % 2
}

myfunc: (a: Int)Int

a.groupBy(x => myfunc(x), 3).collect

res13: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8)), (1,CompactBuffer(1, 3, 5, 7, 9)))

a.groupBy(myfunc(_), 1).collect

res15: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8)), (1,CompactBuffer(1, 3, 5, 7, 9)))
```

##### 22 combineByKey [Pair]

Converts an RDD of type [K, V] into an RDD of type [K, C]; V and C may be the same type or different types.

```scala

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
// createCombiner: turns a V into a C (initialization; takes a V from the RDD[K, V] and produces a C)
// mergeValue: merges a V into an existing C, within a partition; input (C, V), output C
// mergeCombiners: merges two Cs produced by different partitions; input (C, C), output C
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
// numPartitions: the number of result partitions (defaults to the parent's partition count)
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializerClass: String = null): RDD[(K, C)]
// partitioner: the partitioner (HashPartitioner by default)
// mapSideCombine: whether to combine on the map side (default true)

// example
val a = sc.parallelize(List("dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee"), 3)
val b = sc.parallelize(List(1, 1, 2, 2, 2, 1, 2, 2, 2), 3)
val c = b.zip(a)
c.collect
res0: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
val d = c.combineByKey(List(_), (x: List[String], y: String) => y :: x, (x: List[String], y: List[String]) => x ::: y)
d.collect
res22: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
```

II. Action

1 countByKey [Pair]

Counts the number of values for each key.

def countByKey(): Map[K, Long]

// returns a Map[K, Long]: K is a key of the RDD, the value is the count of elements with that key

//example

val c = sc.parallelize(List((3, "gun"), (3, "yak"), (5, "mouse"), (3, "dog")), 2)
c.countByKey
res3: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)

2 reduce

def reduce(f: (T, T) => T): T

// f: (T, T) => T  takes two elements of type T and returns a single T

//example

val a = sc.parallelize(1 to 10, 3)
a.reduce(_ + _)

res6: Int = 55

3 collect

Converts the RDD into a Scala array and returns it.

def collect(): Array[T]

// Array[T]: the returned array

def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]

// f: a partial function from T to U; elements it matches are kept, yielding an RDD[U]

def toArray(): Array[T]

// example
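The collect example is missing here; a minimal spark-shell-style sketch (collect preserves element order):

```scala
val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog"), 2)
c.collect   // Array(Gnu, Cat, Rat, Dog)
```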

6 take

Extracts the first num items of the RDD and returns them as an array.

def take(num: Int): Array[T]
// num: how many leading elements to return
// Array[T]: the returned array

val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.take(2)
res11: Array[String] = Array(dog, cat)

val b = sc.parallelize(1 to 10000, 5000)
b.take(10)

res13: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

7 takeSample
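The body of this section is missing from the source. The signature, plus a sketch in the same style (the sample itself is random, so no output is shown):

```scala
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
// withReplacement: whether to sample with replacement
// num: the number of elements to return
// seed: the random seed

// example
val x = sc.parallelize(1 to 1000, 3)
x.takeSample(true, 10, 1)   // an Array[Int] of 10 sampled elements
```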

8 takeOrdered
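This section's body is also missing. The signature, plus a sketch:

```scala
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
// returns the num smallest elements, according to the implicit ordering

// example
val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.takeOrdered(2)   // Array(ape, cat)
```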