2023年11月26日发(作者:)
spark⼩⽂件合并解决多级分区
package
import DateFormat
import
import .{FileSystem, Path}
import
import ntext
import .{SparkConf, SparkContext, sql}
import .{DataFrame, SQLContext, SaveMode}
/**
* Created by hadoop on 下午10:01.
*
*/
object AccesslogETL {
return result
}
(s" $inputLocation move to $inputDoingLocation success")
}
val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val endTime = (dataTime)
val beginTime = (e() - 2*60*60*1000)
val curHourTime = (e() - 1*60*60*1000)
var resultDF:DataFrame = null
// 遍历⽬录下的所有house⼦⽬录
atus(new Path(inputDoingLocation+"/*")).foreach(p=>{
val hLoc = ng
// 调⽤计算分区⼤⼩的代码
val getPartitionNum = (houseLoc:String)=>{
//计算逻辑略
1
}
val partitionNum = getPartitionNum(hLoc)
("hLoc:" + hLoc + ", partitionNum:" + partitionNum)
// 根据每个house⽬录⽣成DataFrame, 过程略
val hDF:DataFrame = null // 调⽤⽣成DataFrame的逻辑,这⾥使⽤null占位
//最近1个⼩时的log
val curHourDF = (s"acctime>='$curHourTime'")
// 1个⼩时之前的log
val preHourDF = (s"acctime>'$beginTime' and acctime<'$curHourTime' ")
// 1个⼩时之前的数据, 数据量⼩, 分区数设置为1/3分区⼤⼩
val preHourPartNum = if(partitionNum/3 == 0) 1 else partitionNum/3
val newDF = ce(partitionNum).unionAll(ce(preHourPartNum))
if(resultDF != null){
resultDF = ll(newDF)
}else{
resultDF = newDF


发布评论