2023年11月26日发

使用sparksql实现hive表小文件合并

在使用spark streaming将数据灌入hive时,或者spark批处理时如果分区设得很大,会导致hive表生成很多hdfs小文件,这个问题到目前为止

spark都没有比较好的解决方法(不知道spark3.0情况是什么样的)

折中解决:hive表按小时分区,sparkstreaming灌入hive后,在每个小时节点设置一个合并任务,将上一个小时分区数据的小文件进行合并,其实

就是将分区数据读出来使用spark重新分区减少分区量,再灌入原分区,则合并后该分区的文件数就是spark计算中的分区数,下面是代码

package com.ops

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object MergeHiveFile {

  // Silence Spark/Hadoop internal logging; keep this job's own logger at INFO.
  Logger.getLogger("org").setLevel(Level.ERROR)

  val logger: Logger = Logger.getLogger(MergeHiveFile.getClass)
  logger.setLevel(Level.INFO)

  /**
   * Compacts the small HDFS files of the previous hour's partition of
   * `front_logs.t_pp_nrt`.
   *
   * Strategy: read the partition written by the streaming job, repartition
   * the data down to 6 partitions, and `insert overwrite` the same partition,
   * so the partition ends up with exactly 6 files.
   *
   * Intended to be scheduled once per hour, shortly after the hour boundary.
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): several .config() keys below were corrupted in the
    // original source ("", "izer", "rializer", "tringFields"). They are
    // reconstructed from the surviving values — confirm against the actual
    // deployment configuration before running.
    val spark = SparkSession.builder()
      .master("yarn")
      .appName("MergeHiveFile")
      // NOTE(review): the namenode host was missing from the original URI
      // ("hdfs://:8020/...") — fill in the real host.
      .config("spark.sql.warehouse.dir", "hdfs://:8020/user/hive/warehouse")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.debug.maxToStringFields", "100")
      // NOTE(review): key lost in the original ("", "128") — presumably
      // spark.sql.shuffle.partitions; verify.
      .config("spark.sql.shuffle.partitions", "128")
      .enableHiveSupport()
      .getOrCreate()

    // Partition to merge = one hour before "now", formatted as yyyyMMddHH.
    val df = new SimpleDateFormat("yyyyMMddHH")
    val calendar = Calendar.getInstance()
    calendar.setTime(new Date())
    calendar.add(Calendar.HOUR, -1)
    val shouldMergeDT = df.format(new Date(calendar.getTimeInMillis))
    println(shouldMergeDT)

    val shouldMergeDayPartition = shouldMergeDT.substring(0, 8)   // yyyyMMdd -> pday
    val shouldMergeHourPartition = shouldMergeDT.substring(8, 10) // HH       -> phour

    // Read the whole hour partition, then shrink to 6 partitions so the
    // overwrite below rewrites it as 6 files.
    // (registerTempTable is deprecated since Spark 2.0 — use
    // createOrReplaceTempView instead.)
    spark.sql(
      s"""select
         |userid,
         |lal,
         |ts,
         |fpr,
         |rts,
         |wx,
         |url,
         |min_time,
         |avg_time,
         |max_time,
         |open_cnt,
         |stime,
         |file_time,
         |host,
         |etl_time,
         |diff_time
         |from front_logs.t_pp_nrt
         |where pday = '$shouldMergeDayPartition'
         |and phour = '$shouldMergeHourPartition'""".stripMargin)
      .repartition(6)
      .createOrReplaceTempView("tmp_t_pp_nrt")

    // Overwrite the same partition with the compacted data.
    spark.sql(
      s"insert overwrite table front_logs.t_pp_nrt partition(pday = '$shouldMergeDayPartition',phour = '$shouldMergeHourPartition')" +
        s"select * from tmp_t_pp_nrt")

    // Fixed table-name typo in the original message (t_po_nrt -> t_pp_nrt).
    println(s"合并front_logs.t_pp_nrt分区$shouldMergeDayPartition,$shouldMergeHourPartition 完成")

    // Release YARN resources when the one-shot job is done.
    spark.stop()
  }

}