2023年11月26日发(作者:)
orc⼩⽂件合并趣谈
前⾔
这周做了个事情趁热沉淀⼀下。问题很明确。问题由来,要追溯到去年,了。之前做到和的管理,后续做了简
治理⼩⽂件集群治理存储计算
单(其中,就有⼩⽂件趋势监控)。最近,集群中namenode压⼒有所显现。于是,针对⼩⽂件多的⽬录进⾏了排查和治理。进⽽,有
hdfs画像
了今天的这个主题ORC⼩⽂件合并趣谈。
核⼼问题
这⾥,⾸先治理的是实时导⼊数据的⽬录。这⾥增量数据采⽤以动态分区增量写⼊的⽅式。众所周知,spark在处理时,每个都会
SparkSQLtask
写⼊⼀个⽂件(如果task处理的数据,包含n个分区的数据,就会产⽣n个⽂件)。进⽽,在并⾏度⾼的情况下,导致对应增量分区⽂件很多(存
储并不⼤)。
在中,平台统⼀要求将hive表的格式向格式靠拢。orc的表在存储和查询上都有很好的提升。所以,这个问题就间接的转化为解决
存储治理orc
orc⼩⽂件问题。
解决问题
处理上,采⽤程序升级和定时合并的⽅式。本⽂,主要介绍如何定时合并orc⽂件。
措施
⽅案对⽐
经过分析,总结了两种⽅式。
使⽤ORC原⽣DDL⽅式合并⼩⽂件功能。
ALTER TABLE table_name [PARTITION partition_spec] CONCATENATE;
附:hive-ddl
优点:
1. 原⽣⽀持,开发量⼩。
2. 避免了数据的解压、解码过程。
缺点:
1. 不够优雅,⽆法指定最终合并的⽂件数,需要多次执⾏。
2. 可以控制最终⽂件数和⼤⼩。
缺点:
1. 需要⼀定的开发量。
2. 合并后,hive元数据需要主动去刷新处理(直接操作hdfs⽂件,⽆法同步到hive元数据)这点很重要。
实现
流程图
1. 主线程从元数据库获取需要合并处理(⽂件数⼤于1)的分区信息。
mainMetaStore
2. 根据分⼯不同,使⽤两个线程池完成异步处理。
3. 管理合并orc格式的hdfs线程。
mergeHdfsThreadPool
4. 管理统计合并分区的元数据信息线程,回馈到元数据库中。
flushMetastoreThreadPoolMetaStore
核⼼代码实现
ORC合并
这⾥,参照官⽹Using Core Java。⽅式,实现简单的⽂件合并处理。
/**
* 合并orc⽂件
* @param fileDir 需要合并的分区⽬录
* @throws Exception
*/
public static void orcFileRollUp(String fileDir) throws Exception {
Path srcPath = new Path(fileDir);
if (!(srcPath)) {
throw new Exception("fileDir is not exists");
}
if (!ctory(srcPath)) {
throw new Exception("fileDir is not directory");
}
FileStatus[] files = atus(srcPath);
try {
TypeDescription schema = getSchema(files);
if (schema != null) {
//删除g临时⽬录
String outFile = fileDir + tor + MARGE_FILE_NAME;
Path outMergeFilePath=null;
try {
outMergeFilePath = new Path(outFile);
throw new Exception(lStackTrace(e));
}
}
⽂件合并会先写到⽂件中,合并完成后,再将重命名为正式⽂件结尾。最后,将之前的⼩⽂件删除。
.g.g.merge
重新统计分区元数据
采⽤hive原⽣的统计⽅式。StatsDev。
其他注意点
1. 要在内的线程结束后执⾏。如何知道⼀个线程池内所有线程执⾏完毕?
flushMetastoreThreadPoolmergeHdfsThreadPool
线程池的,当所有线程都关闭时,会返回。
isTerminatedtrue
while(true){
if(inated()){
//转交flushMetastoreThreadPool执⾏
break;
}
}
结束
寥寥数笔,欢迎交流。


发布评论