
Merging and Splitting Columns in PySpark

1. Reading in the data

>>> from pyspark.sql import SparkSession

>>> spark = SparkSession.builder.master("local").appName("dataframe_split").config("spark.some.config.option", "some-value").getOrCreate()

>>> sc = spark.sparkContext

>>> df = spark.read.csv("./", inferSchema=True, header=True)

>>> df.show()

+---+-----------+

|gid| score|

+---+-----------+

| a1|90 80 79 80|

| a2|79 89 45 60|

| a3|57 56 89 75|

+---+-----------+
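
If the CSV file is not at hand, the same DataFrame can be built in memory to follow along; a minimal sketch matching the gid/score layout shown above:

>>> df = spark.createDataFrame(
...     [("a1", "90 80 79 80"), ("a2", "79 89 45 60"), ("a3", "57 56 89 75")],
...     ["gid", "score"])  # same columns and values as the CSV output above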

2. Splitting a column into an array

>>> from pyspark.sql.functions import split, explode, concat, concat_ws

>>> df_split = df.withColumn("s", split(df['score'], " "))

>>> df_split.show()

+---+-----------+----------------+

|gid| score| s|

+---+-----------+----------------+

| a1|90 80 79 80|[90, 80, 79, 80]|

| a2|79 89 45 60|[79, 89, 45, 60]|

| a3|57 56 89 75|[57, 56, 89, 75]|

+---+-----------+----------------+
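
Note that split() always produces an array of strings, even when the pieces look numeric. If integer scores are needed downstream, the whole array can be cast in one step; a sketch, assuming Spark 2.2+ where casting to array<int> is supported:

>>> from pyspark.sql.functions import col

>>> df_int = df_split.withColumn("s_int", col("s").cast("array<int>"))  # "90" -> 90, etc.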

3. Splitting the array into separate columns

zipWithIndex: assigns an index to each element.

The ordering is based first on the partition index, and then on the order of items within each partition: the first item in the first partition gets index 0, and the last item in the last partition gets the largest index. This method triggers a Spark job when the RDD contains more than one partition.

>>> numAttrs = len(df.first()["score"].split(" "))  # number of score fields per row (4 here)

>>> attrs = sc.parallelize(["score_" + str(i) for i in range(numAttrs)]).zipWithIndex().collect()

>>> attrs

[('score_0', 0), ('score_1', 1), ('score_2', 2), ('score_3', 3)]

>>> for name, index in attrs:

...     df_split = df_split.withColumn(name, df_split['s'].getItem(index))

>>> df_split.show()

+---+-----------+----------------+-------+-------+-------+-------+

|gid| score| s|score_0|score_1|score_2|score_3|

+---+-----------+----------------+-------+-------+-------+-------+

| a1|90 80 79 80|[90, 80, 79, 80]| 90| 80| 79| 80|

| a2|79 89 45 60|[79, 89, 45, 60]| 79| 89| 45| 60|

| a3|57 56 89 75|[57, 56, 89, 75]| 57| 56| 89| 75|

+---+-----------+----------------+-------+-------+-------+-------+
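
The RDD round trip above is mainly a demonstration of zipWithIndex; for a small local list of column names, the same (name, index) pairs can be built without triggering any Spark job. A sketch:

>>> attrs = [("score_" + str(i), i) for i in range(numAttrs)]  # same pairs, computed locally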

4. Splitting one row into multiple rows

>>> df.show()

+---+-----------+

|gid| score|

+---+-----------+

| a1|90 80 79 80|

| a2|79 89 45 60|

| a3|57 56 89 75|

+---+-----------+

>>> df_explode = df.withColumn("e", explode(split(df['score'], " ")))

>>> df_explode.show()

+---+-----------+---+

|gid| score| e|

+---+-----------+---+

| a1|90 80 79 80| 90|

| a1|90 80 79 80| 80|

| a1|90 80 79 80| 79|

| a1|90 80 79 80| 80|

| a2|79 89 45 60| 79|

| a2|79 89 45 60| 89|

| a2|79 89 45 60| 45|

| a2|79 89 45 60| 60|

| a3|57 56 89 75| 57|

| a3|57 56 89 75| 56|

| a3|57 56 89 75| 89|

| a3|57 56 89 75| 75|

+---+-----------+---+
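
If the original position of each value matters after the explosion, posexplode (available since Spark 2.1) emits an index column alongside each element; a minimal sketch, with "pos" and "val" as illustrative aliases:

>>> from pyspark.sql.functions import posexplode

>>> df_pos = df.select("gid", posexplode(split(df["score"], " ")).alias("pos", "val"))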

5. Merging columns

There are two functions for merging columns: concat(), which joins values without a separator, and concat_ws(), which joins them with a separator.

concat

>>> df_concat = df_split.withColumn("score_concat", concat(df_split['score_0'], df_split['score_1'], df_split['score_2'], df_split['score_3']))

>>> df_concat.show()

+---+-----------+----------------+-------+-------+-------+-------+------------+

|gid| score| s|score_0|score_1|score_2|score_3|score_concat|

+---+-----------+----------------+-------+-------+-------+-------+------------+

| a1|90 80 79 80|[90, 80, 79, 80]| 90| 80| 79| 80| 90807980|

| a2|79 89 45 60|[79, 89, 45, 60]| 79| 89| 45| 60| 79894560|

| a3|57 56 89 75|[57, 56, 89, 75]| 57| 56| 89| 75| 57568975|

+---+-----------+----------------+-------+-------+-------+-------+------------+

concat_ws

>>> df_ws = df_split.withColumn("score_concat", concat_ws('-',df_split['score_0'], df_split['score_1'], df_split['score_2'], df_split['score_3']))

>>> df_ws.show()

+---+-----------+----------------+-------+-------+-------+-------+------------+

|gid| score| s|score_0|score_1|score_2|score_3|score_concat|

+---+-----------+----------------+-------+-------+-------+-------+------------+

| a1|90 80 79 80|[90, 80, 79, 80]| 90| 80| 79| 80| 90-80-79-80|

| a2|79 89 45 60|[79, 89, 45, 60]| 79| 89| 45| 60| 79-89-45-60|

| a3|57 56 89 75|[57, 56, 89, 75]| 57| 56| 89| 75| 57-56-89-75|

+---+-----------+----------------+-------+-------+-------+-------+------------+
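
concat_ws can also take the array column produced in step 2 directly, which avoids listing every score_i column by hand; a sketch using the s column from df_split:

>>> df_arr = df_split.withColumn("score_joined", concat_ws("-", df_split["s"]))  # same "90-80-79-80" result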

6. Converting multiple rows into multiple columns

pivot: pivots a column of the current DataFrame and performs the specified aggregation.

>>> df = spark.sparkContext.parallelize([[15,399,2],[15,1401,5],[15,1608,4],[15,20,4],[18,100,3],[18,1401,3], [18,399,1]]).toDF(["userID","movieID","rating"])

>>> resultDF = df.groupBy("userID").pivot("movieID").sum("rating").na.fill(-1)

>>> resultDF.show()

+------+---+---+---+----+----+

|userID| 20|100|399|1401|1608|

+------+---+---+---+----+----+

| 18| -1| 3| 1| 3| -1|

| 15| 4| -1| 2| 5| 4|

+------+---+---+---+----+----+
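
When the distinct pivot values are known ahead of time, passing them explicitly to pivot() saves Spark an extra pass over the data to discover them; a sketch with the movieID values from the example above:

>>> movie_ids = [20, 100, 399, 1401, 1608]

>>> resultDF = df.groupBy("userID").pivot("movieID", movie_ids).sum("rating").na.fill(-1)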