Published November 26, 2023 (author:)
Spark: Common RDD Exercises
I still recommend this site; it is very well written.
I. Transformation
1 map
Applies a transformation function on each item of the RDD and returns the result as a new RDD. (Returns a new RDD formed by passing each element of the source RDD through func.)
def map[U: ClassTag](f: T => U): RDD[U]
val a = sc.parallelize(List("dog","salmon","salmon","rat","elephant"), 3)
a.collect
res2: Array[String] = Array(dog, salmon, salmon, rat, elephant)
val b = a.map(_.length)
2 mapPartitions
This is a specialized map that is called only once for each partition. The entire content of the respective partition is available as a sequential stream of values via the input argument (Iterator[T]). The custom function must return yet another Iterator[U]. The combined result iterators are automatically converted into a new RDD. (Similar to map, but runs independently on each partition of the RDD; when applied to an RDD of type T, func must have type Iterator[T] => Iterator[U].)
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
Parameters:
Iterator[T] the input iterator of type T
Iterator[U] the returned iterator of type U
preservesPartitioning whether to preserve the parent RDD's partitioner
//example
val a = sc.parallelize(1 to 9, 3)
def myfunc[T](iter: Iterator[T]): Iterator[(T, T)] = {
  var res = List[(T, T)]()
  var pre = iter.next
  while (iter.hasNext) {
    val cur = iter.next
    res .::= (pre, cur)
    pre = cur
  }
  res.iterator
}
a.mapPartitions(myfunc).collect
union, ++
Returns the union of this RDD and another one (duplicates are kept).
def ++(other: RDD[T]): RDD[T]
def union(other: RDD[T]): RDD[T]
// example
val a = sc.parallelize(1 to 3)
val b = sc.parallelize(5 to 7)
(a ++ b).collect
res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)
combineByKey [Pair]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
// createCombiner: V => C takes the current V and returns a C (the initialization step)
// mergeValue: (C, V) => C merges a value V into a C produced by createCombiner (applied within each partition)
// mergeCombiners: (C, C) => C merges two Cs (applied across partitions)
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
// numPartitions: Int number of partitions
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializerClass: String = null): RDD[(K, C)]
// partitioner: Partitioner custom partitioner
// mapSideCombine: Boolean = true whether to combine on the map side (default true)
// Example
val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)
c.collect
res0: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
val d = c.combineByKey(List(_), (x: List[String], y: String) => y :: x, (m: List[String], n: List[String]) => m ::: n)
d.collect
res1: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
10 groupByKey [Pair]
Causes a shuffle.
// groupByKey() is implemented on top of combineByKey
def groupByKey(): RDD[(K, Iterable[V])]
K the key
Iterable[V] an iterable over the values for that key
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
numPartitions: Int number of partitions
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
partitioner: Partitioner custom partitioner
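As a local sketch (plain Scala collections, no SparkContext; the data here is made up for illustration), the (K, V) => (K, Iterable[V]) shape of groupByKey looks like this:

```scala
// Group values by key, mirroring RDD[(K, V)] => RDD[(K, Iterable[V])].
// Spark does the same per key, but distributed and with a shuffle.
val pairs = Seq(("cat", 2), ("cat", 5), ("mouse", 4), ("dog", 12))

val grouped: Map[String, Iterable[Int]] =
  pairs.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }

println(grouped("cat").toList)  // List(2, 5)
```

Note that, unlike reduceByKey, nothing is combined: every value survives in the per-key iterable, which is why groupByKey moves more data across the network.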
aggregate
First aggregates the elements of each partition, then aggregates the results of all partitions, using the given functions and an initial value. The result type U may differ from the RDD's element type T (one function merges a T into a U, another merges two Us; both functions may modify and return their first argument).
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
// zeroValue: U the initial value, applied in each partition's local aggregation and again in the global aggregation
// seqOp: (U, T) => U the per-partition aggregation function
// combOp: (U, U) => U the global aggregation function
//example
val z = sc.parallelize(List(1,2,3,4,5,6), 2)
def myfunc(index: Int, iter: Iterator[Int]): Iterator[String] = { iter.map(x => "[partID: " + index + ",val: " + x + "]") }
myfunc: (index: Int, iter: Iterator[Int])Iterator[String]
z.mapPartitionsWithIndex(myfunc).collect
res24: Array[String] = Array([partID: 0,val: 1], [partID: 0,val: 2], [partID: 0,val: 3], [partID: 1,val: 4], [partID: 1,val: 5], [partID: 1,val: 6])
z.aggregate(5)(math.max(_, _), _ + _)
res26: Int = 16
//Analysis:
The initial value is 5.
Partition 0: max(5, 1, 2, 3) = 5
Partition 1: max(5, 4, 5, 6) = 6
Global aggregation: 5 + 5 + 6 = 16
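The two-phase computation above can be checked with plain Scala collections, modeling the two partitions explicitly (a local sketch, no SparkContext needed):

```scala
// Model the RDD sc.parallelize(List(1,2,3,4,5,6), 2) as two partitions.
val partitions = Seq(Seq(1, 2, 3), Seq(4, 5, 6))
val zero = 5

// Phase 1: fold each partition with seqOp (max), seeded with zeroValue.
val perPartition = partitions.map(_.foldLeft(zero)(math.max(_, _)))

// Phase 2: combine the partition results with combOp (+),
// again seeded with zeroValue.
val result = perPartition.foldLeft(zero)(_ + _)

println(result)  // 5 (zero) + 5 (partition 0) + 6 (partition 1) = 16
```

This makes visible why zeroValue contributes once per partition in phase 1 and once more in phase 2.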
13 aggregateByKey [Pair]
Applied to the values of each key; the initial value is used only in the local (per-partition) aggregation, not in the global aggregation.
Causes a shuffle.
// implemented on top of combineByKey
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
// zeroValue: U user-supplied initial value
// seqOp: (U, V) ⇒ U local (per-partition) aggregation function, returning U
// combOp: (U, U) ⇒ U global aggregation function, returning U
def aggregateByKey[U](zeroValue: U, numPartitions: Int)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
// numPartitions: Int number of partitions
def aggregateByKey[U](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
// partitioner: Partitioner custom partitioner
// example
val pairRDD = sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)), 2)
def myfunc(index: Int, iter: Iterator[(String, Int)]): Iterator[String] = { iter.map(x => "[partID: " + index + ",val: " + x + "]") }
myfunc: (index: Int, iter: Iterator[(String, Int)])Iterator[String]
pairRDD.mapPartitionsWithIndex(myfunc).collect
res1: Array[String] = Array([partID: 0,val: (cat,2)], [partID: 0,val: (cat,5)], [partID: 0,val: (mouse,4)], [partID: 1,val: (cat,12)], [partID: 1,val: (dog,12)], [partID: 1,val: (mouse,2)])
pairRDD.aggregateByKey(0)(math.max(_, _), _ + _).collect
res2: Array[(String, Int)] = Array((dog,12), (cat,17), (mouse,6))
pairRDD.aggregateByKey(100)(math.max(_, _), _ + _).collect
res3: Array[(String, Int)] = Array((dog,100), (cat,200), (mouse,200))
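To see why a large initial value such as 100 inflates the results per partition (it is applied once per key in each partition, never globally), here is a local simulation in plain Scala. The function aggByKey is a stand-in written for this sketch, not Spark's implementation:

```scala
// Model the pair RDD as two partitions of (key, value) pairs.
val parts = Seq(
  Seq(("cat", 2), ("cat", 5), ("mouse", 4)),
  Seq(("cat", 12), ("dog", 12), ("mouse", 2)))

// A local stand-in for aggregateByKey.
def aggByKey(zero: Int)(seqOp: (Int, Int) => Int,
                        combOp: (Int, Int) => Int): Map[String, Int] = {
  // Local phase: within each partition, fold each key's values,
  // seeded with zero (so zero applies once per key per partition).
  val local = parts.flatMap { p =>
    p.groupBy(_._1).map { case (k, kvs) =>
      (k, kvs.map(_._2).foldLeft(zero)(seqOp))
    }
  }
  // Global phase: merge per-partition results with combOp; no zero here.
  local.groupBy(_._1).map { case (k, kvs) =>
    (k, kvs.map(_._2).reduce(combOp))
  }
}

println(aggByKey(0)(math.max(_, _), _ + _))
println(aggByKey(100)(math.max(_, _), _ + _))
```

With zero = 100, "cat" appears in both partitions, so max(100, …) = 100 is produced twice and summed to 200, matching the transcript above.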
14 reduceByKey [Pair]
Merges the values of each key using the given reduce function. Causes a shuffle.
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
// func: (V, V) => V the merge function
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
// numPartitions: Int number of partitions
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
// partitioner: Partitioner custom partitioner
val a = sc.parallelize(List("dog","cat","owl","gnu","ant"), 2)
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res8: Array[(Int, String)] = Array((3,dogcatowlgnuant))
val a = sc.parallelize(List("dog","tiger","lion","cat","panther","eagle"))
val b = a.map(x => (x.length, x))
b.reduceByKey(_ + _).collect
res10: Array[(Int, String)] = Array((4,lion), (3,dogcat), (7,panther), (5,tigereagle))
15 sortBy
This function sorts the input RDD's data and stores it in a new RDD. The first parameter is a function that maps each input item to the key to sort by; the optional second parameter selects ascending (the default) or descending order.
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
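A local sketch of the two parameters (key extractor and sort direction), using Scala's own sortBy on a Seq with illustrative data; Spark additionally range-partitions the output, but the ordering semantics match:

```scala
val y = Seq(5, 7, 1, 3, 2, 1)

// Ascending (the default), sorting by the identity key.
val asc = y.sortBy(identity)

// Descending, mirroring rdd.sortBy(x => x, ascending = false).
val desc = y.sortBy(x => -x)

println(asc)   // List(1, 1, 2, 3, 5, 7)
println(desc)  // List(7, 5, 3, 2, 1, 1)
```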
join [Pair]
Performs an inner join using two key-value RDDs. Please note that the keys must be generally comparable to make this work. (Called on RDDs of types (K, V) and (K, W), it returns an RDD of (K, (V, W)) that pairs up all elements sharing a key.)
Causes a shuffle.
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
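The (K, (V, W)) shape of an inner join can be sketched with a for-comprehension over local pairs (illustrative data, no SparkContext):

```scala
// A local sketch of RDD[(K, V)].join(RDD[(K, W)]): keep every
// (key, (v, w)) combination whose keys match (inner join).
val left  = Seq((3, "dog"), (4, "lion"), (3, "cat"))
val right = Seq((3, "bark"), (4, "roar"))

val joined: Seq[(Int, (String, String))] =
  for {
    (k1, v) <- left
    (k2, w) <- right
    if k1 == k2
  } yield (k1, (v, w))

println(joined.length)  // 3
```

Keys present on only one side are dropped, which is exactly what distinguishes join from cogroup below.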
cogroup [Pair]
Groups the values of this RDD and one or more other key-value RDDs by key.
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
// other: RDD[(K, W)] the other RDD
// RDD[(K, (Iterable[V], Iterable[W]))] the result: for each key K, iterables over the values from each RDD
def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
// numPartitions: Int number of partitions
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))]
// partitioner: Partitioner custom partitioner
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]
//example
val a = sc.parallelize(List(1, 2, 1, 3), 1)
val b = a.map((_, "b"))
val c = a.map((_, "c"))
b.cogroup(c).collect
coalesce, repartition
coalesce merges partitions into fewer partitions (a narrow dependency when shuffle = false); repartition can change to any number of partitions and always shuffles.
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
// numPartitions: Int target number of partitions
// shuffle: Boolean whether to perform a shuffle (default false)
def repartition(numPartitions: Int): RDD[T]
// numPartitions: Int target number of partitions; always shuffles
val y = sc.parallelize(1 to 10, 5)
y.partitions.length
res16: Int = 5
val z = y.coalesce(2, false)
z.partitions.length
res17: Int = 2
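The narrow-dependency behavior of coalesce(2, false), merging whole partitions without redistributing individual records, can be modeled locally. Partitions here are just a List of element groups, an assumption made for illustration:

```scala
// Model 5 partitions of 2 elements each.
val partitions = (1 to 10).grouped(2).toList
println(partitions.length)  // 5

// Merge 5 partitions into 2 by assigning consecutive partitions
// to 2 buckets, then flattening each bucket. No record ever moves
// between buckets independently of its partition.
val coalesced = partitions.zipWithIndex
  .groupBy { case (_, i) => i * 2 / partitions.length }  // bucket 0 or 1
  .toList.sortBy(_._1)
  .map { case (_, ps) => ps.flatMap(_._1) }

println(coalesced.length)  // 2
```

With shuffle = true (or repartition), Spark instead hashes individual records to new partitions, which this sketch deliberately does not do.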
groupBy
Groups the elements of the RDD according to the key returned by the given function.
def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])]
def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])]
def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])]
val a = sc.parallelize(1 to 9)
a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect
res10: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6, 8)), (odd,CompactBuffer(1, 3, 5, 7, 9)))
val a = sc.parallelize(1 to 9)
def myfunc(a:Int):Int={a%2}
myfunc: (a: Int)Int
a.groupBy(myfunc).collect
res11: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8)), (1,CompactBuffer(1, 3, 5, 7, 9)))
def myfunc(a:Int):Double={a%2}
a.groupBy(myfunc).collect
res12: Array[(Double, Iterable[Int])] = Array((0.0,CompactBuffer(2, 4, 6, 8)), (1.0,CompactBuffer(1, 3, 5, 7, 9)))
val a = sc.parallelize(1 to 9, 3)
def myfunc(a: Int) : Int =
| {
| a % 2
| }
myfunc: (a: Int)Int
a.groupBy(x => myfunc(x), 3).collect
res13: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8)), (1,CompactBuffer(1, 3, 5, 7, 9)))
a.groupBy(myfunc(_), 1).collect
res15: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4, 6, 8)), (1,CompactBuffer(1, 3, 5, 7, 9)))
22 combineByKey [Pair]
Transforms an RDD of type (K, V) into an RDD of type (K, C); V and C may be the same type or different.
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
// createCombiner: turns a V into a C (called for the first value of each key seen in a partition)
// mergeValue: merges a V into an existing C; input (C, V), output C
// mergeCombiners: merges two Cs into one; input (C, C), output C
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
// numPartitions: Int number of result partitions (defaults to the existing number of partitions)
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializerClass: String = null): RDD[(K, C)]
// partitioner: the partitioner to use (HashPartitioner by default)
// mapSideCombine: whether to combine on the map side (default true)
// example
val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)
c.collect
res0: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
val d = c.combineByKey(List(_), (x: List[String], y: String) => y :: x, (x: List[String], y: List[String]) => x ::: y)
d.collect
res22: Array[(Int, List[String])] = Array(
(1,List(cat, dog, turkey)),
(2,List(gnu, rabbit, salmon, bee, bear, wolf)))
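The full three-function flow of the example above can be replayed with plain Scala collections. combineLocally below is a stand-in written for this sketch, not Spark's implementation, and the partition layout mirrors the zip result:

```scala
// Model the three partitions of the zipped (Int, String) RDD.
val parts = Seq(
  Seq((1, "dog"), (1, "cat"), (2, "gnu")),
  Seq((2, "salmon"), (2, "rabbit"), (1, "turkey")),
  Seq((2, "wolf"), (2, "bear"), (2, "bee")))

def combineLocally[C](createCombiner: String => C,
                      mergeValue: (C, String) => C,
                      mergeCombiners: (C, C) => C): Map[Int, C] = {
  // Per-partition phase: the first value of a key goes through
  // createCombiner; later values are folded in with mergeValue.
  val local: Seq[(Int, C)] = parts.flatMap { p =>
    p.foldLeft(Map.empty[Int, C]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k) match {
        case None    => createCombiner(v)
        case Some(c) => mergeValue(c, v)
      })
    }
  }
  // Cross-partition phase: combiners for the same key are merged.
  local.groupBy(_._1).map { case (k, kcs) =>
    (k, kcs.map(_._2).reduce(mergeCombiners))
  }
}

val d = combineLocally[List[String]](List(_), (xs, y) => y :: xs, _ ::: _)
println(d(1).sorted)  // List(cat, dog, turkey)
```

Passing the same three functions as the transcript reproduces the same per-key lists (up to ordering, which Spark also does not guarantee).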
II. Action
1 countByKey [Pair]
Counts the values of each key.
def countByKey(): Map[K, Long]
// returns a Map[K, Long]: K is a key in the RDD, and the Long is the number of values for that key
//example
val c = sc.parallelize(List((3,"gun"),(3,"yak"),(5,"mouse"),(3,"dog")), 2)
c.countByKey
res3: scala.collection.Map[Int,Long] = Map(3 -> 3, 5 -> 1)
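Locally, countByKey is just a group-and-count; here is a plain-Scala sketch of the Map[K, Long] result (no SparkContext):

```scala
// Count how many (key, value) pairs share each key; the values
// themselves are ignored, only their number per key matters.
val c = Seq((3, "gnu"), (3, "yak"), (5, "mouse"), (3, "dog"))

val counts: Map[Int, Long] =
  c.groupBy(_._1).map { case (k, kvs) => (k, kvs.size.toLong) }

println(counts(3))  // 3
```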
2 reduce
def reduce(f: (T, T) => T): T
// f: (T, T) => T the reduce function: takes two Ts, returns a T
//example
val a = sc.parallelize(1 to 10, 3)
a.reduce(_ + _)
res6: Int = 55
3 collect
Converts the RDD into a Scala array and returns it.
def collect(): Array[T]
// Array[T] the returned result
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U]
// f: PartialFunction[T, U] a partial function applied to each element; the result is an RDD[U]
def toArray(): Array[T]
take
Returns the first num elements of the RDD.
def take(num: Int): Array[T]
// num: Int how many leading elements to fetch
// Array[T] the returned result
val b = sc.parallelize(List("dog", "cat", "ape", "salmon", "gnu"), 2)
b.take(2)
res11: Array[String] = Array(dog, cat)
val b = sc.parallelize(1 to 10000, 5000)
b.take(10)
res13: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
7 takeSample
Returns a fixed-size random sample of the RDD as an array.
def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T]
8 takeOrdered
Returns the first num elements of the RDD under their natural or a given ordering.
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
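Local sketches of both actions, where a Seq stands in for the RDD and the seed value is arbitrary:

```scala
// takeOrdered(num) returns the num smallest elements under the
// implicit ordering; on a local Seq that is just sorted.take(num).
val b = Seq(6, 9, 4, 7, 5, 8)
val smallest = b.sorted.take(2)
println(smallest)  // List(4, 5)

// A local stand-in for takeSample(withReplacement = false, num = 3,
// seed = 1): shuffle deterministically with the seed, then take num.
val rng = new scala.util.Random(1)
val sample = rng.shuffle(b).take(3)
println(sample.length)  // 3
```

Unlike take, takeOrdered looks at the whole dataset, and takeSample always returns exactly num elements (sampling with replacement if asked to).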