2023年11月26日发(作者:)

orc⼩⽂件合并趣谈

前⾔

这周做了个事情趁热沉淀⼀下。问题很明确。问题由来,要追溯到去年,了。之前做到的管理,后续做了简

治理⼩⽂件集群治理存储计算

(其中,就有⼩⽂件趋势监控)。最近,集群中namenode压⼒有所显现。于是,针对⼩⽂件多的⽬录进⾏了排查和治理。进⽽,有

hdfs画像

了今天的这个主题ORC⼩⽂件合并趣谈。

核⼼问题

这⾥,⾸先治理的是实时导⼊数据的⽬录。这⾥增量数据采⽤以动态分区增量写⼊的⽅式。众所周知,spark在处理时,每个都会

SparkSQLtask

写⼊⼀个⽂件(如果task处理的数据,包含n个分区的数据,就会产⽣n个⽂件)。进⽽,在并⾏度⾼的情况下,导致对应增量分区⽂件很多(存

储并不⼤)。

中,平台统⼀要求将hive表的格式向格式靠拢。orc的表在存储和查询上都有很好的提升。所以,这个问题就间接的转化为解决

存储治理orc

orc⼩⽂件问题。

解决问题

处理上,采⽤程序升级和定时合并的⽅式。本⽂,主要介绍如何定时合并orc⽂件。

措施

⽅案对⽐

经过分析,总结了两种⽅式。

使⽤ORC原⽣DDL⽅式合并⼩⽂件功能。

ALTER TABLE table_name [PARTITION partition_spec] CONCATENATE;

附:hive-ddl

优点:

1. 原⽣⽀持,开发量⼩。

2. 避免了数据的解压、解码过程。

缺点:

1. 不够优雅,⽆法指定最终合并的⽂件数,需要多次执⾏。

2. 可以控制最终⽂件数和⼤⼩。

缺点:

1. 需要⼀定的开发量。

2. 合并后,hive元数据需要主动去刷新处理(直接操作hdfs⽂件,⽆法同步到hive元数据)这点很重要。

实现

流程图

1. 主线程从元数据库获取需要合并处理(⽂件数⼤于1)的分区信息。

mainMetaStore

2. 根据分⼯不同,使⽤两个线程池完成异步处理。

3. 管理合并orc格式的hdfs线程。

mergeHdfsThreadPool

4. 管理统计合并分区的元数据信息线程,回馈到元数据库中。

flushMetastoreThreadPoolMetaStore

核⼼代码实现

ORC合并

这⾥,参照官⽹Using Core Java。⽅式,实现简单的⽂件合并处理。

/**

* 合并orc⽂件

* @param fileDir 需要合并的分区⽬录

* @throws Exception

*/

public static void orcFileRollUp(String fileDir) throws Exception {

Path srcPath = new Path(fileDir);

if (!(srcPath)) {

throw new Exception("fileDir is not exists");

}

if (!ctory(srcPath)) {

throw new Exception("fileDir is not directory");

}

FileStatus[] files = atus(srcPath);

try {

TypeDescription schema = getSchema(files);

if (schema != null) {

//删除g临时⽬录

String outFile = fileDir + tor + MARGE_FILE_NAME;

Path outMergeFilePath=null;

try {

outMergeFilePath = new Path(outFile);

throw new Exception(lStackTrace(e));

}

}

⽂件合并会先写到⽂件中,合并完成后,再将重命名为正式⽂件结尾。最后,将之前的⼩⽂件删除。

.g.g.merge

重新统计分区元数据

采⽤hive原⽣的统计⽅式。StatsDev

其他注意点

1. 要在内的线程结束后执⾏。如何知道⼀个线程池内所有线程执⾏完毕?

flushMetastoreThreadPoolmergeHdfsThreadPool

线程池的,当所有线程都关闭时,会返回

isTerminatedtrue

while(true){

if(inated()){

//转交flushMetastoreThreadPool执⾏

break;

}

}

结束

寥寥数笔,欢迎交流。