Merging Small Files on Hadoop

Causes and consequences of the small-file problem on HDFS

Causes: small files usually come from frequent, fine-grained writes, such as streaming or micro-batch ingestion (Flume, Spark Streaming, Flink) that rolls a new file per batch, MapReduce/Hive jobs whose many map or reduce tasks each emit one small output file, and over-partitioned tables where each partition receives only a little data.

Consequences: every file, directory and block is an object in the NameNode's heap (roughly 150 bytes each), so tens of millions of small files can exhaust NameNode memory and slow down metadata operations; on the read side, MapReduce and Spark schedule at least one split or task per file, so jobs over many small files spend most of their time on task startup and random I/O instead of useful work.

Code notes for several ways of merging files

Abstract class:
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

public abstract class CombineSmallFile {

    private static final Logger log = Logger.getLogger(CombineSmallFile.class);

    public String tablePathStr;
    public String beginPartDate;
    public String endPartDate;
    public FileSystem fs;
    public String filePrefix;
    public SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public CombineSmallFile() {
    }

    public void execute() throws IOException {

        // get the list of partitions that need combining
        List<Path> needCombinePartitions = getNeedCombinePartition(tablePathStr);
        log.info("got partitions that need combining!");

        // combine each partition; getFileList returns at most 100 files per pass,
        // so keep looping until fewer than 100 candidate files remain
        for (Path partition : needCombinePartitions) {
            log.info("combine partition: " + partition.getName());
            ArrayList<Path> fileList;
            boolean isGoOn = true;
            while (isGoOn) {
                fileList = getFileList(partition, filePrefix);
                if (fileList.size() < 100) {
                    isGoOn = false;
                }
                if (fileList.size() > 1) {
                    combineFiles(partition, fileList);
                } else {
                    log.info("fewer than 2 small files, nothing to combine");
                }
            }
        }
    }

    public abstract void combineFiles(Path partition, ArrayList<Path> fileList) throws IOException;

    private List<Path> getNeedCombinePartition(String tablePathStr) throws IOException {

        Path tablePath = new Path(HdfsUtil.getBaseURI() + tablePathStr);
        FileStatus[] partitionFileStatus = fs.listStatus(tablePath);
        ArrayList<Path> partitionPathList = new ArrayList<>();
        for (FileStatus fileStatus : partitionFileStatus) {
            Path partitionPath = fileStatus.getPath();
            String partitionName = partitionPath.getName();
            // keep partition directories whose name lies in [beginPartDate, endPartDate]
            if (fileStatus.isDirectory() && partitionName.compareTo(beginPartDate) >= 0 && partitionName.compareTo(endPartDate) <= 0) {
                partitionPathList.add(partitionPath);
                log.info(partitionName);
            }
        }
        return partitionPathList;
    }

    private ArrayList<Path> getFileList(Path partitionPath, String filePrefix) throws IOException {
        ArrayList<Path> pathArrayList = new ArrayList<>();
        // all files under the partition
        FileStatus[] fileStatuses = fs.listStatus(partitionPath);
        for (FileStatus fileStatus : fileStatuses) {
            Path filePath = fileStatus.getPath();
            String filePathName = filePath.getName();
            long modificationTime = fileStatus.getModificationTime();
            long expiredTime = System.currentTimeMillis() - modificationTime;
            // delete leftover .complete marker files older than 12 hours
            if (filePathName.startsWith(".complete") && expiredTime >= 12 * 3600 * 1000) {
                log.info("found an expired .complete file, deleting it!\t\t" + filePathName + "\t" + sdf.format(modificationTime));
                fs.delete(filePath, true);
            }
            // a ..combining directory younger than 3 hours means another combine run is in progress, so skip this partition
            if (filePathName.equals("..combining") && expiredTime <= 3 * 3600 * 1000) {
                log.info("this partition is being combined, skipping it.");
                pathArrayList.clear();
                return pathArrayList;
            }
            // candidates: files starting with the given prefix and no larger than 100 MB,
            // or already-combined files no larger than 50 MB
            if ((filePathName.startsWith(filePrefix) && fileStatus.getLen() <= 100 * 1024 * 1024) || (filePathName.startsWith("combined") && fileStatus.getLen() <= 50 * 1024 * 1024)) {
                pathArrayList.add(filePath);
            }

            // combine at most 100 files per pass
            if (pathArrayList.size() >= 100) {
                return pathArrayList;
            }
        }
        return pathArrayList;
    }
}
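The abstract class above depends on a small HdfsUtil helper (getBaseURI, getFileSystem, getConf) that the post does not show. The following is only a minimal sketch of what such a helper might look like, assuming a plain client Configuration; the lazy initialization and the use of fs.defaultFS for the base URI are my assumptions, not the original code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import java.io.IOException;

// Hypothetical sketch of the HdfsUtil helper used by CombineSmallFile.
// Assumes the cluster's core-site.xml / hdfs-site.xml are on the classpath.
public class HdfsUtil {

    private static final Configuration conf = new Configuration();
    private static FileSystem fs;

    public static Configuration getConf() {
        return conf;
    }

    public static synchronized FileSystem getFileSystem() {
        if (fs == null) {
            try {
                fs = FileSystem.get(conf);
            } catch (IOException e) {
                throw new RuntimeException("failed to get FileSystem", e);
            }
        }
        return fs;
    }

    // base URI such as hdfs://nameservice1, prepended to the table path (assumption)
    public static String getBaseURI() {
        return conf.get("fs.defaultFS");
    }
}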
Merging Parquet files
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;
import java.util.ArrayList;

public class CombineParquetSmallFile extends CombineSmallFile {

    private static final Logger log = Logger.getLogger(CombineParquetSmallFile.class);

    /**
     * @param tablePathStr  String for table path, e.g. /apps/hive/warehouse/oracledb.db/t_ext
     * @param beginPartDate begin partDate, e.g. partdate=date20200912
     * @param endPartDate   end partDate, e.g. partdate=date20200921
     * @param filePrefix    prefix of the small files to combine
     */
    public CombineParquetSmallFile(String tablePathStr, String beginPartDate, String endPartDate, String filePrefix) {
        this.tablePathStr = tablePathStr;
        this.beginPartDate = beginPartDate;
        this.endPartDate = endPartDate;
        this.fs = HdfsUtil.getFileSystem();
        this.filePrefix = filePrefix;
    }

    @Override
    public void combineFiles(Path partition, ArrayList<Path> fileList) throws IOException {
        // read the MessageType (schema) from the first "complete" file; all source files must share this schema
        MessageType messageType = null;
        for (Path file : fileList) {
            if (file.getName().startsWith("complete")) {
                ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(HdfsUtil.getConf(), file);
                messageType = parquetMetadata.getFileMetaData().getSchema();
                log.info("got MessageType from: " + file.getName());
                break;
            }
        }
        if (messageType == null) {
            log.warn("can't get MessageType, skip this partition");
            return;
        }

        // the combined file is written into a staging directory first
        Path combiningDir = new Path(partition.toString() + "/..combining");
        if (fs.exists(combiningDir)) {
            log.info("stale combining directory exists, deleting it!\t" + sdf.format(fs.getFileStatus(combiningDir).getAccessTime()));
            fs.delete(combiningDir, true);
        }
        fs.mkdirs(combiningDir);
        String combineFileName = "/combined-" + System.currentTimeMillis() + ".parquet";
        // partition/..combining/combined-<timestamp>.parquet
        Path combineFilePath = new Path(combiningDir.toString() + combineFileName);
        ParquetWriter<Group> writer = newParquetWriter(combineFilePath, messageType);

        // copy every record of every small file into the combined file
        for (Path file : fileList) {
            log.info("combining file: " + file.getName());
            GroupReadSupport readSupport = new GroupReadSupport();
            ParquetReader<Group> reader = ParquetReader.builder(readSupport, file).build();
            Group group;
            while ((group = reader.read()) != null) {
                writer.write(group);
            }
            reader.close();
        }
        writer.close();
        try {
            // move partition/..combining/combined-<timestamp>.parquet to partition/combined-<timestamp>.parquet
            fs.rename(combineFilePath, new Path(partition.toString() + combineFileName));
            log.info("partition: " + partition.toString() + " combine completed! begin to delete the source small files..");
            for (Path path : fileList) {
                fs.delete(path, true);
            }
            fs.delete(combiningDir, true);
            log.info("deleting small files completed!");
        } catch (Exception e) {
            log.error("", e);
        }
    }

    private ParquetWriter<Group> newParquetWriter(Path path, MessageType initSchema) throws IOException {
        ParquetWriter<Group> parquetWriter = ExampleParquetWriter
                .builder(path)
                .withWriteMode(ParquetFileWriter.Mode.CREATE)
                .withCompressionCodec(CompressionCodecName.SNAPPY)
                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                .withConf(HdfsUtil.getConf())
                .withPageSize(1024 * 8)
                .withType(initSchema)
                .build();
        return parquetWriter;
    }
}
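The post does not show how the combiner is invoked. Using the example values from the javadoc above, a minimal driver for the Parquet case could look like the sketch below; the class name CombineParquetMain and the "complete" prefix are illustrative assumptions.

// Hypothetical entry point for the Parquet combiner.
public class CombineParquetMain {
    public static void main(String[] args) throws Exception {
        // table path and partition range taken from the javadoc example values;
        // "complete" is an assumed prefix for the small files written by the loader
        CombineSmallFile combiner = new CombineParquetSmallFile(
                "/apps/hive/warehouse/oracledb.db/t_ext",
                "partdate=date20200912",
                "partdate=date20200921",
                "complete");
        combiner.execute();
    }
}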
Merging small TextFile files
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;

public class CombineTextSmallFile extends CombineSmallFile {

    private static final Logger log = Logger.getLogger(CombineTextSmallFile.class);

    public CombineTextSmallFile(String tablePathStr, String beginPartDate, String endPartDate, String filePrefix) {
        this.tablePathStr = tablePathStr;
        this.beginPartDate = beginPartDate;
        this.endPartDate = endPartDate;
        this.fs = HdfsUtil.getFileSystem();
        this.filePrefix = filePrefix;
    }

    @Override
    public void combineFiles(Path partition, ArrayList<Path> fileList) throws IOException {
        // the combined file is written into a staging directory first
        Path combiningDir = new Path(partition.toString() + "/..combining");
        if (fs.exists(combiningDir)) {
            log.info("stale combining directory exists, deleting it!\t" + sdf.format(fs.getFileStatus(combiningDir).getModificationTime()));
            fs.delete(combiningDir, true);
        }
        fs.mkdirs(combiningDir);
        String combineFileName = "/combined-" + System.currentTimeMillis() + ".txt";
        // partition/..combining/combined-<timestamp>.txt
        Path combineFilePath = new Path(combiningDir.toString() + combineFileName);

        FSDataOutputStream fsDataOutputStream = fs.create(combineFilePath);

        // plain byte concatenation; assumes every source file ends with a newline
        for (Path file : fileList) {
            log.info("combining file: " + file.getName());
            FSDataInputStream fsDataInputStream = fs.open(file);
            IOUtils.copyBytes(fsDataInputStream, fsDataOutputStream, 4096, false);
            fsDataInputStream.close();
        }
        fsDataOutputStream.close();

        try {
            // move partition/..combining/combined-<timestamp>.txt to partition/combined-<timestamp>.txt
            fs.rename(combineFilePath, new Path(partition.toString() + combineFileName));
            log.info("partition: " + partition.toString() + " combine completed! begin to delete the source small files..");
            for (Path path : fileList) {
                fs.delete(path, true);
            }
            fs.delete(combiningDir, true);
            log.info("deleting small files completed!");
        } catch (Exception e) {
            log.error("", e);
        }

    }

}
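In the same spirit, the text combiner could be wired into a daily job that always targets yesterday's partition. Everything below is a hypothetical sketch: the table path and file prefix are placeholders, and only the partdate=dateYYYYMMDD naming follows the examples given earlier.

import java.text.SimpleDateFormat;
import java.util.Calendar;

// Hypothetical daily driver for the text combiner.
public class CombineTextDaily {
    public static void main(String[] args) throws Exception {
        // build yesterday's partition name in the partdate=dateYYYYMMDD format
        Calendar cal = Calendar.getInstance();
        cal.add(Calendar.DAY_OF_MONTH, -1);
        String partName = "partdate=date" + new SimpleDateFormat("yyyyMMdd").format(cal.getTime());

        CombineSmallFile combiner = new CombineTextSmallFile(
                "/apps/hive/warehouse/oracledb.db/t_text_ext", // placeholder table path
                partName,    // begin partition = yesterday
                partName,    // end partition = yesterday (single-partition run)
                "complete"); // placeholder small-file prefix
        combiner.execute();
    }
}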
Merging ORC files
To be determined.