2023年11月26日发(作者:)

spark⼩⽂件合并解决多级分区

package

import DateFormat

import

import .{FileSystem, Path}

import

import ntext

import .{SparkConf, SparkContext, sql}

import .{DataFrame, SQLContext, SaveMode}

/**

* Created by hadoop on 下午10:01.

*

*/

object AccesslogETL {

return result

}

(s" $inputLocation move to $inputDoingLocation success")

}

val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

val endTime = (dataTime)

val beginTime = (e() - 2*60*60*1000)

val curHourTime = (e() - 1*60*60*1000)

var resultDF:DataFrame = null

// 遍历⽬录下的所有house⼦⽬录

atus(new Path(inputDoingLocation+"/*")).foreach(p=>{

val hLoc = ng

// 调⽤计算分区⼤⼩的代码

val getPartitionNum = (houseLoc:String)=>{

//计算逻辑略

1

}

val partitionNum = getPartitionNum(hLoc)

("hLoc:" + hLoc + ", partitionNum:" + partitionNum)

// 根据每个house⽬录⽣成DataFrame, 过程略

val hDF:DataFrame = null // 调⽤⽣成DataFrame的逻辑,这⾥使⽤null占位

//最近1个⼩时的log

val curHourDF = (s"acctime>='$curHourTime'")

// 1个⼩时之前的log

val preHourDF = (s"acctime>'$beginTime' and acctime<'$curHourTime' ")

// 1个⼩时之前的数据, 数据量⼩, 分区数设置为1/3分区⼤⼩

val preHourPartNum = if(partitionNum/3 == 0) 1 else partitionNum/3

val newDF = ce(partitionNum).unionAll(ce(preHourPartNum))

if(resultDF != null){

resultDF = ll(newDF)

}else{

resultDF = newDF