This post describes the small files problem I ran into with my Hadoop MapReduce program and how I solved it.
Problem:
The small files problem is well known in Hadoop: if the input consists of many files that are each smaller than the default HDFS block size of 64 MB, MapReduce (MR) is not very efficient. I wrote an MR program and ran it against an input of 1000 such small files. It simply created 1000 mappers, one for each file, and the job ran much longer than I expected it to.
I made some changes to improve this, and the result was a drastic improvement in performance: my MR job ran in about a quarter of the original time.
Explanation:
Let me explain the solution before I get to the code. The default input format in MR is FileInputFormat, an implementation of InputFormat. Every InputFormat implementation must return a List<InputSplit>; Hadoop then calls size() on that list and creates that many mappers, i.e. one mapper per split. In my case the default FileInputFormat simply created 1000 splits (since all 1000 files were smaller than 64 MB), resulting in 1000 mappers being run, which was inefficient.
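If you want to see where the mapper count comes from, you can ask an input format directly for its splits. Below is a minimal sketch (not from my program), assuming the Hadoop 2.x mapreduce API; the SplitCounter class and its input-path argument are made up for illustration.
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class SplitCounter {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        //hypothetical input directory full of small files
        FileInputFormat.addInputPath(job, new Path(args[0]));
        //with the default text input format, each small file becomes its own split (and hence its own mapper)
        List<InputSplit> splits = new TextInputFormat().getSplits(job);
        System.out.println("Number of splits: " + splits.size());
    }
}
With 1000 small files this prints 1000, which is exactly the mapper count I saw.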
The solution is to use CombineFileInputFormat. This class is quite efficient, since it also takes rack and node locality into account. However, it is an abstract class: although it does most of the work, part of the implementation is left to us (and that part is not easy to write).
I read one superb implementation in this blog post. When I used that approach, my MR job ran in less time, but it created only 1 mapper - yes, only 1 for the entire run. That is not ideal either. Say we have a total of 600 MB of data spread across 100 files: we would ideally want about 10 splits (600 MB / 64 MB ≈ 9.4, rounded up), each dealing with roughly 64 MB of data. Note that the data for one split may span multiple files.
Key Idea:
So, I wondered why this happened and browsed through the CombineFileInputFormat.java source (wow, one of the perks of using open source software). I found that, in the getSplits() implementation, if the maxSplitSize is zero, it creates only one split! And the default is 0, as you can see in the variable declaration.
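As an aside, depending on the Hadoop version the same limit can apparently also be supplied through configuration instead of code; as far as I can tell, the setMaxSplitSize() call used below takes precedence over the configured value. This is just a hedged fragment to drop into the job setup, and the property name is an assumption based on Hadoop 2.x:
Configuration conf = new Configuration();
//assumption: Hadoop 2.x property name; older releases reportedly used mapred.max.split.size
conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 64L * 1024 * 1024);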
Implementation:
So, the trick lies in setting this to a different value in your implementation, like:
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
public class MyCombineFileInputFormat extends CombineFileInputFormat<LongWritable, Text> {
    public MyCombineFileInputFormat() {
        //this is the most important line!
        //setting the max split size to 64 MB
        setMaxSplitSize(67108864);
    }
    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit is,
            TaskAttemptContext tac) throws IOException {
        return new CombineFileRecordReader<LongWritable, Text>((CombineFileSplit) is,
                tac, MyRecordReader.class);
    }
}
And the record reader can simply reuse LineRecordReader, like this:
import java.io.IOException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
public class MyRecordReader extends RecordReader<LongWritable, Text> {
    private final LineRecordReader lineRecordReader;
    public MyRecordReader(CombineFileSplit split, TaskAttemptContext context,
            Integer index) throws IOException {
        //carve the file at 'index' out of the combined split and hand it to a plain LineRecordReader
        FileSplit filesplit = new FileSplit(split.getPath(index), split.getOffset(index),
                split.getLength(index), split.getLocations());
        lineRecordReader = new LineRecordReader();
        lineRecordReader.initialize(filesplit, context);
    }
    @Override
    public void initialize(InputSplit split, TaskAttemptContext tac)
            throws IOException, InterruptedException {
        //nothing to do here - the per-file initialization already happened in the constructor
    }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        return lineRecordReader.nextKeyValue();
    }
    @Override
    public LongWritable getCurrentKey() throws IOException, InterruptedException {
        return lineRecordReader.getCurrentKey();
    }
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return lineRecordReader.getCurrentValue();
    }
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return lineRecordReader.getProgress();
    }
    @Override
    public void close() throws IOException {
        lineRecordReader.close();
    }
}
Once this was done, the number of mappers created was ideal and this drastically reduced the overall time taken for my MR program.
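For completeness, here is roughly how the input format can be wired into a job driver. This is only a minimal sketch, not my actual driver: it assumes the Hadoop 2.x mapreduce API, and MyMapper, the output key/value classes, and the argument handling are hypothetical placeholders.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SmallFilesDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "small-files-combine");
        job.setJarByClass(SmallFilesDriver.class);
        //use the combining input format instead of the default one
        job.setInputFormatClass(MyCombineFileInputFormat.class);
        job.setMapperClass(MyMapper.class); //hypothetical mapper class
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}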