2023年11月26日发(作者:)
使用sparksql实现hive表小文件合并
在使用spark streaming将数据灌入hive时,或者spark批处理时如果分区设得很大,会导致hive表生成很多hdfs小文件,这个问题到目前为止
spark都没有比较好的解决方法(不知道spark3.0情况是什么样的)
折中解决:hive表按小时分区,spark streaming灌入hive后,在每个小时节点设置一个合并任务,将上一个小时分区数据的小文件进行合并,其实
就是将分区数据读出来使用spark重新分区减少分区量,再灌入原分区,则合并后该分区的文件数就是spark计算中的分区数,下面是代码
package com.ops
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
/**
 * Hourly compaction job for the Hive table `front_logs.t_pp_nrt`.
 *
 * Spark Streaming ingestion leaves many small HDFS files per hour partition.
 * This job re-reads the PREVIOUS hour's partition, repartitions it down to a
 * small fixed number of partitions, and overwrites the same partition in place,
 * so the partition ends up with exactly that many files.
 */
object MergeHiveFile {
  // Silence Spark's own chatter; keep this job's logger at INFO.
  Logger.getLogger("org").setLevel(Level.ERROR)
  val logger: Logger = Logger.getLogger(MergeHiveFile.getClass)
  logger.setLevel(Level.INFO)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("yarn")
      .appName("MergeHiveFile")
      // NOTE(review): the config keys in the original source were garbled by
      // the blog scraper ("", "izer", "tringFields"). Reconstructed to the
      // standard Spark settings they most plausibly were — confirm against the
      // cluster's actual configuration before deploying.
      .config("spark.sql.warehouse.dir", "hdfs://:8020/user/hive/warehouse") // TODO: fill in namenode host
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.debug.maxToStringFields", "100") // original set this twice; once is enough
      .config("spark.sql.shuffle.partitions", "128")  // original key was lost; "128" suggests shuffle partitions
      .enableHiveSupport()
      .getOrCreate()

    // Target partition = one hour before "now", as yyyyMMddHH.
    val fmt = new SimpleDateFormat("yyyyMMddHH")
    val calendar = Calendar.getInstance()
    calendar.setTime(new Date())
    calendar.add(Calendar.HOUR, -1)
    val shouldMergeDT = fmt.format(calendar.getTime)
    println(shouldMergeDT)
    val shouldMergeDayPartition  = shouldMergeDT.substring(0, 8)  // pday  = yyyyMMdd
    val shouldMergeHourPartition = shouldMergeDT.substring(8, 10) // phour = HH

    // Read the whole partition and collapse it to 6 output files.
    val merged = spark.sql(
      s"""select
         |userid,
         |lal,
         |ts,
         |fpr,
         |rts,
         |wx,
         |url,
         |min_time,
         |avg_time,
         |max_time,
         |open_cnt,
         |stime,
         |file_time,
         |host,
         |etl_time,
         |diff_time
         |from front_logs.t_pp_nrt
         |where pday = '$shouldMergeDayPartition'
         |and phour = '$shouldMergeHourPartition'""".stripMargin)
      .repartition(6)

    // Materialize BEFORE overwriting: the INSERT OVERWRITE below targets the
    // same partition this DataFrame lazily reads from; without caching, Spark
    // can fail with "Cannot overwrite a path that is also being read from".
    merged.cache()
    merged.count()
    // registerTempTable was deprecated in Spark 2.0 and removed later.
    merged.createOrReplaceTempView("tmp_t_pp_nrt")

    // Overwrite the source partition with the compacted data.
    // The trailing space before the concatenated "select" is required: the
    // original glued ")select" together, producing invalid SQL.
    spark.sql(
      s"insert overwrite table front_logs.t_pp_nrt partition(pday = '$shouldMergeDayPartition',phour = '$shouldMergeHourPartition') " +
        s"select * from tmp_t_pp_nrt")
    // Original message said t_po_nrt — typo for the table actually merged.
    println(s"合并front_logs.t_pp_nrt分区$shouldMergeDayPartition,$shouldMergeHourPartition 完成")

    spark.stop()
  }
}


发布评论