Hadoop IO and file formats

In this post on Big Data I would like to summarize Hadoop file formats and give a brief introduction to the topic. Things evolve constantly, especially in the big data area, so I will be glad for comments in case I missed something important. Big data frameworks change, but InputFormat and OutputFormat stay the same. It doesn't matter which big data technology is in use; it can be Hadoop, Spark or …

Let's start with some basic terminology and general principles. The key term in the MapReduce paradigm is the split, which defines a chunk of the data processed by a single map task. A split is further divided into records, where every record is represented as a key-value pair. That is what you actually know from the mapper API as your input. The number of splits essentially gives you the number of map tasks necessary to process the data. This does not conflict with the number of map slots configured on your cluster: if there are more map tasks than slots, some map tasks simply wait until a slot becomes available. This abstraction is hidden in the IO layer, in particular in the InputFormat and OutputFormat classes, which supply the RecordReader and RecordWriter classes responsible for the further division into records. Hadoop comes with a bunch of predefined file format classes, e.g. TextInputFormat, DBInputFormat, CombineFileInputFormat and many others. Needless to say, nothing prevents you from coming up with your own custom file formats.
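To make the relation between splits, map tasks and slots concrete, here is a minimal back-of-the-envelope sketch. The function name and the example numbers are made up for illustration, and the calculation ignores details of how real input formats round the last split.

```python
def plan_map_tasks(file_size_bytes, split_size_bytes, map_slots):
    """Return (number of splits, number of waves) for one input file."""
    # One map task per split; the last split may be smaller than the others.
    num_splits = -(-file_size_bytes // split_size_bytes)  # integer ceiling
    # Only `map_slots` tasks run at the same time; the rest wait for a free slot.
    num_waves = -(-num_splits // map_slots)
    return num_splits, num_waves


MB = 1024 * 1024
splits, waves = plan_map_tasks(1000 * MB, 64 * MB, map_slots=4)
print(splits, waves)  # 16 map tasks, executed in 4 waves
```

With more slots the number of map tasks stays the same, only the waiting goes away.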

The described abstraction model is closely related to the MapReduce paradigm, but what is its relation to the underlying storage such as HDFS? First of all, MapReduce and the distributed file system (DFS) are two core Hadoop concepts which are "independent"; the relation is defined just through the API between those components. The well-known DFS implementation is HDFS, but there are several other possibilities (S3, Azure Blob, …). A DFS is built for large datasets. The core concept in a DFS is the block, which represents the basic unit of the original dataset for manipulation and processing, e.g. replication. This fact also puts an additional requirement on the dataset file format: it has to be splittable, which means that you can process a given block independently from the rest of the dataset. If the file format is not splittable and you run a MapReduce job, you won't get any level of parallelism and the dataset will be processed by a single mapper. The splittability requirement applies to compressed data as well.
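Why compression matters for splittability can be shown with a small experiment. The sketch below uses gzip as an example of a codec whose stream cannot be decoded from an arbitrary position; the helper name and the sample data are invented for the illustration.

```python
import gzip
import zlib


def can_decode_from(data: bytes, offset: int) -> bool:
    """Try to decompress `data` starting at `offset`, as a mapper handed
    only a later block of the file would have to do."""
    try:
        gzip.decompress(data[offset:])
        return True
    except (OSError, EOFError, zlib.error):
        return False


original = b"".join(b"record %d\n" % i for i in range(10000))
packed = gzip.compress(original)

print(can_decode_from(packed, 0))                 # whole stream from the start
print(can_decode_from(packed, len(packed) // 2))  # starting in the middle
```

Reading from the start works, reading from the middle does not, so a single gzip file ends up with one mapper no matter how many blocks it occupies.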

What is the relation between a block from the DFS and a split from MapReduce? Both of them are essentially the key abstractions for parallelization, just in different frameworks, and in the ideal case they are aligned. If they are perfectly aligned, then Hadoop can take full advantage of the so-called data locality feature, which runs a map task on a cluster node where its data resides and so minimizes additional network traffic. In case of imprecise alignment, remote reads will happen for the records that are missing from a given split. For that reason file formats include sync markers (sync points), which let a reader find the next record boundary when it starts in the middle of a file.
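The boundary problem is easiest to see with plain lines of text. The following sketch shows one way a reader can assign every line to exactly one split even when a line straddles the boundary: a split that does not start at offset 0 skips through the first newline, and every split keeps reading lines that start at or before its end. It is written in the spirit of line-oriented record readers, not copied from Hadoop; the function name and data are illustrative.

```python
def read_line_split(data: bytes, start: int, length: int):
    """Return the lines that belong to the split [start, start + length)."""
    end = start + length
    pos = start
    if start != 0:
        # The line that covers `start` is read by the previous split, skip it.
        newline = data.find(b"\n", start)
        pos = len(data) if newline == -1 else newline + 1
    lines = []
    # A line belongs to this split when it begins at or before `end`;
    # its tail may live in the next block, which means a remote read.
    while pos <= end and pos < len(data):
        newline = data.find(b"\n", pos)
        stop = len(data) if newline == -1 else newline
        lines.append(data[pos:stop].decode("utf-8"))
        pos = stop + 1
    return lines


data = b"alpha\nbravo\ncharlie\ndelta\n"
split_size = 10
for start in range(0, len(data), split_size):
    print(start, read_line_split(data, start, split_size))
```

Every line comes out exactly once, although "bravo" and "charlie" both cross a split boundary.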

To take full advantage of the power of Hadoop you design your system for big files. Typically the DFS block size is 64 MB (the default in older Hadoop releases; newer ones default to 128 MB) and it can be configured to be bigger. That means that the biggest enemy of Hadoop is the small file. The number of files which can live in the DFS is limited by the size of the name node memory, because all the dataset metadata is kept in memory. Hadoop offers several strategies to avoid this bad scenario. Let's go through the file formats.
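A rough calculation shows why small files hurt. The sketch below counts the namespace objects (one per file plus one per block) the name node has to track for the same amount of data stored in two ways. The figure of about 150 bytes per object is an often-quoted rule of thumb and is used here purely as an assumption; the function name and numbers are illustrative.

```python
BYTES_PER_OBJECT = 150  # assumption: commonly quoted rough estimate, not a measured value


def namenode_objects(num_files, file_size_bytes, block_size_bytes):
    """Count file entries plus block entries for equally sized files."""
    blocks_per_file = max(1, -(-file_size_bytes // block_size_bytes))
    return num_files * (1 + blocks_per_file)


MB = 1024 * 1024
one_big = namenode_objects(1, 1024 * MB, 128 * MB)          # 1 GB in one file
many_small = namenode_objects(10_000, 100 * 1024, 128 * MB)  # about 1 GB in 100 KB files

print(one_big, one_big * BYTES_PER_OBJECT)
print(many_small, many_small * BYTES_PER_OBJECT)
```

The amount of data is about the same, but the small-file layout needs thousands of times more entries in name node memory.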

HAR file (stands for Hadoop archive) is a specific file format which essentially packs a bunch of files into a single logical unit, so that the name node has far fewer entries to keep track of. HAR files don't support additional compression and, as far as I know, are transparent to MapReduce. They can help if name nodes are running out of memory.

Sequence file is a kind of file-based data structure. This file format is splittable, as it contains a sync point after every few records. The file consists of metadata and key-value records, where key and value are serialized via classes whose names are kept in the metadata. The classes used for serialization need to be on the CLASSPATH.
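How sync points make such a file splittable can be sketched with a toy format. This is not the real sequence file layout: the marker value, the length-prefixed record encoding and the function names are all invented for the illustration. The idea is only that a reader dropped at an arbitrary offset scans forward to the next marker and starts decoding from there.

```python
import struct

SYNC = b"\x00SYNC-MARKER-16\x00"  # toy marker; hypothetical, chosen for readability


def write_records(records, sync_every=3):
    """Serialize (key, value) string pairs, with a sync marker before every
    `sync_every`-th record."""
    out = bytearray()
    for i, (key, value) in enumerate(records):
        if i % sync_every == 0:
            out += SYNC
        k, v = key.encode("utf-8"), value.encode("utf-8")
        out += struct.pack(">II", len(k), len(v)) + k + v
    return bytes(out)


def read_sync_split(data, start, end):
    """Read the records of every chunk whose sync marker begins in [start, end)."""
    records = []
    pos = data.find(SYNC, start)
    while pos != -1 and pos < end:
        pos += len(SYNC)
        next_sync = data.find(SYNC, pos)
        chunk_end = len(data) if next_sync == -1 else next_sync
        while pos < chunk_end:
            klen, vlen = struct.unpack_from(">II", data, pos)
            pos += 8
            key = data[pos:pos + klen].decode("utf-8")
            pos += klen
            value = data[pos:pos + vlen].decode("utf-8")
            pos += vlen
            records.append((key, value))
        pos = next_sync
    return records


records = [(f"key{i}", f"value{i}") for i in range(10)]
data = write_records(records)
middle = len(data) // 2
print(read_sync_split(data, 0, middle))
print(read_sync_split(data, middle, len(data)))
```

Wherever the file is cut, the two readers together return every record exactly once, without either of them decoding the file from the beginning.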

Map file is again a kind of Hadoop file-based data structure, and it differs from a sequence file in the matter of order. A map file is sorted by key and you can perform a lookup on it. The behavior is pretty similar to the java.util.Map interface.
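The lookup idea can be sketched as sorted data plus a sparse index: only every n-th key is kept in the index, a binary search finds the right neighborhood, and a short scan finds the key. The class below is a toy in-memory illustration with invented names, not the actual map file implementation.

```python
import bisect


class ToyMapFile:
    """Sorted key-value pairs with a sparse index over every n-th key."""

    def __init__(self, items, index_interval=4):
        self.data = sorted(items)  # list of (key, value), ordered by key
        self.interval = index_interval
        self.index_keys = [key for key, _ in self.data[::index_interval]]

    def get(self, key):
        # Find the last indexed key that is <= the wanted key.
        i = bisect.bisect_right(self.index_keys, key) - 1
        if i < 0:
            return None  # smaller than the first key in the file
        start = i * self.interval
        # Scan only the entries between two neighboring index points.
        for k, v in self.data[start:start + self.interval]:
            if k == key:
                return v
            if k > key:
                break
        return None


mf = ToyMapFile([(f"key{i:03d}", i) for i in range(20)])
print(mf.get("key007"))
print(mf.get("missing"))
```

The sparse index is what keeps the in-memory part small while still avoiding a scan of the whole file.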

Avro data file is based on the Avro serialization framework, which was primarily created for Hadoop. It is a splittable file format with a metadata section at the beginning followed by a sequence of Avro-serialized objects. The metadata section contains the schema used for the Avro serialization. This format allows a comparison of data without deserialization.

Google Protocol Buffers are not natively supported by Hadoop, but you can plug in the support via libraries such as Elephant Bird from Twitter.

So what about file formats such as XML and JSON? They are not natively splittable and so are "hard" to deal with. A common practice is to store them in a text file, a single message per line.
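The one-message-per-line practice is easy to sketch for JSON. The helper names are made up; the important property is that the serializer escapes newlines inside string values, so every message stays on a single physical line and a line-oriented reader can cut the file anywhere.

```python
import json


def to_json_lines(messages):
    """Serialize each message as compact JSON on its own line."""
    return "".join(json.dumps(m, separators=(",", ":")) + "\n" for m in messages)


def from_json_lines(text):
    """Parse one JSON message per non-empty line."""
    return [json.loads(line) for line in text.split("\n") if line.strip()]


messages = [
    {"id": 1, "text": "hello"},
    {"id": 2, "text": "two\nlines inside one value"},
]
encoded = to_json_lines(messages)
print(encoded)
print(from_json_lines(encoded) == messages)
```

Pretty-printed JSON or a single top-level array would break this property, which is why the per-line layout is preferred for processing.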

Needless to say, textual files are first-class citizens in Hadoop. TextInputFormat and TextOutputFormat deal with those. The byte offset of a line is used as the key and the line content as the value.
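The key-value shape of textual input can be sketched in a few lines. The generator below is only an illustration of the (byte offset, line) pairing described above, with an invented name; it does not use any Hadoop API.

```python
def text_records(data: bytes):
    """Yield (byte offset of the line start, line content without the newline)."""
    offset = 0
    for raw in data.splitlines(keepends=True):
        yield offset, raw.rstrip(b"\r\n").decode("utf-8")
        offset += len(raw)


data = b"first\nsecond line\nthird\n"
for key, value in text_records(data):
    print(key, value)
# 0 first
# 6 second line
# 18 third
```

Note that the key is a position in bytes, not a line number, so it stays meaningful when a reader starts in the middle of a file.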

This blog post just scratches the surface of Hadoop file formats, but I hope that it provides a good introduction and explains the connection between two essential concepts: MapReduce and the DFS. For further reference, the book Hadoop: The Definitive Guide goes into great detail.
