
Wednesday 30 April 2014

XML Processing in MapReduce

In this post, I will describe how to process XML files using Hadoop. XML files can be processed with Hadoop Streaming, but here we will use another approach that is more efficient than Hadoop Streaming. The program consists of three parts:

  1. Driver class to run the program
  2. Mapper class
  3. XmlInputFormat class

To keep the example simple, I am not using any reducers. Now let's work through each of these classes.
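
Before that, here is the kind of input file this example assumes. The element names (<property>, <tag>, <tag1>, <tag2>) are placeholders chosen only to match the tags used in the driver and mapper below; your real files will have different names:

<properties>
  <property>
    <tag>
      <tag1>
        value1
        <tag2>value2</tag2>
      </tag1>
    </tag>
  </property>
  <property>
    <tag>
      <tag1>
        value3
        <tag2>value4</tag2>
      </tag1>
    </tag>
  </property>
</properties>

Each <property>...</property> block becomes one record and is handed to one call of the mapper's map() method.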

Driver Class:

Here is the code for the driver class, which is explained below the listing. The code is straightforward; the start tag and end tag define what constitutes one record.

import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver class: configures and submits the map-only XML processing job.
 *
 * @author kiran Bhakre
 */
public class XmlReader {

    public static void main(String[] args) {
        try {
            runJob(args[0], args[1]);
        } catch (IOException ex) {
            Logger.getLogger(XmlReader.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public static void runJob(String input, String output) throws IOException {

        Configuration conf = new Configuration();

        // One record starts at <property> and ends at </property>
        conf.set("xmlinput.start", "<property>");
        conf.set("xmlinput.end", "</property>");
        conf.set("io.serializations",
                "org.apache.hadoop.io.serializer.JavaSerialization,"
                + "org.apache.hadoop.io.serializer.WritableSerialization");

        Job job = new Job(conf, "jobName");

        FileInputFormat.setInputPaths(job, input);
        job.setJarByClass(XmlReader.class);
        job.setMapperClass(XmlReadMapper.class);
        // Map-only job: no reducers
        job.setNumReduceTasks(0);
        job.setInputFormatClass(XmlInputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // Delete the output directory if it already exists
        Path outPath = new Path(output);
        FileOutputFormat.setOutputPath(job, outPath);
        FileSystem dfs = FileSystem.get(outPath.toUri(), conf);
        if (dfs.exists(outPath)) {
            dfs.delete(outPath, true);
        }

        try {
            job.waitForCompletion(true);
        } catch (InterruptedException ex) {
            Logger.getLogger(XmlReader.class.getName()).log(Level.SEVERE, null, ex);
        } catch (ClassNotFoundException ex) {
            Logger.getLogger(XmlReader.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}


conf.set("xmlinput.start", "<startingTag>");
conf.set("xmlinput.end", "</endingTag>");

Then you need to set input path, output path which i am taking as command line arguments, need to set mapper class.

Next, we will define our mapper.

Mapper:

To parse the XML records, you need a parser library. There are several ways to parse XML in Java, such as SAX and DOM parsers; I have used the JDOM library. Here is the code for the mapper class, which is explained below.

import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.input.SAXBuilder;

/**
 * Mapper: receives one XML record per call, parses it with JDOM and emits
 * a comma-delimited line. The element names "tag", "tag1" and "tag2" are
 * placeholders; replace them with the elements of your own XML.
 *
 * @author kiran Bhakre
 */
public class XmlReadMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    @Override
    public void map(LongWritable key, Text value1, Context context)
            throws IOException, InterruptedException {

        // The value holds one complete record, i.e. everything from the
        // start tag to the end tag configured in the driver
        String xmlString = value1.toString();

        SAXBuilder builder = new SAXBuilder();
        Reader in = new StringReader(xmlString);
        String value = "";
        try {
            Document doc = builder.build(in);
            Element root = doc.getRootElement();

            String tag1 = root.getChild("tag").getChild("tag1").getTextTrim();
            String tag2 = root.getChild("tag").getChild("tag1").getChild("tag2").getTextTrim();

            value = tag1 + "," + tag2;
            context.write(NullWritable.get(), new Text(value));
        } catch (JDOMException ex) {
            Logger.getLogger(XmlReadMapper.class.getName()).log(Level.SEVERE, null, ex);
        } catch (IOException ex) {
            Logger.getLogger(XmlReadMapper.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

The code is very simple: each call to map() receives one complete record in value1, parses it, and emits the result using
context.write(NullWritable.get(), new Text(value));

I do not need a key, so I use NullWritable; the value is a comma-delimited record built from the parsed fields.
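
For the hypothetical input shown at the beginning of the post, the job would therefore write one line per <property> record, something like:

value1,value2
value3,value4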

Next, I am also providing the Mahout XmlInputFormat class, which is compatible with the new Hadoop API.

Mahout XmlInputFormat (compatible with the new Hadoop API):

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

/**
 * Reads records that are delimited by a specific start tag and end tag.
 *
 * @author kiran Bhakre
 */
public class XmlInputFormat extends TextInputFormat {

    public static final String START_TAG_KEY = "xmlinput.start";
    public static final String END_TAG_KEY = "xmlinput.end";

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit is, TaskAttemptContext tac) {
        return new XmlRecordReader();
    }

    public static class XmlRecordReader extends RecordReader<LongWritable, Text> {

        private byte[] startTag;
        private byte[] endTag;
        private long start;
        private long end;
        private FSDataInputStream fsdatainstream;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable key = new LongWritable();
        private Text value = new Text();

        @Override
        public void initialize(InputSplit is, TaskAttemptContext tac) throws IOException, InterruptedException {
            FileSplit fileSplit = (FileSplit) is;
            // The start and end tags are read from the job configuration
            startTag = tac.getConfiguration().get(START_TAG_KEY).getBytes("utf-8");
            endTag = tac.getConfiguration().get(END_TAG_KEY).getBytes("utf-8");

            // Open the file and seek to the beginning of this split
            start = fileSplit.getStart();
            end = start + fileSplit.getLength();
            Path file = fileSplit.getPath();

            FileSystem fs = file.getFileSystem(tac.getConfiguration());
            fsdatainstream = fs.open(fileSplit.getPath());
            fsdatainstream.seek(start);
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (fsdatainstream.getPos() < end) {
                // Skip ahead to the next start tag, then copy everything up
                // to and including the matching end tag into the buffer
                if (readUntilMatch(startTag, false)) {
                    try {
                        buffer.write(startTag);
                        if (readUntilMatch(endTag, true)) {
                            value.set(buffer.getData(), 0, buffer.getLength());
                            key.set(fsdatainstream.getPos());
                            return true;
                        }
                    } finally {
                        buffer.reset();
                    }
                }
            }
            return false;
        }

        @Override
        public LongWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return (fsdatainstream.getPos() - start) / (float) (end - start);
        }

        @Override
        public void close() throws IOException {
            fsdatainstream.close();
        }

        private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
            int i = 0;
            while (true) {
                int b = fsdatainstream.read();
                // end of file:
                if (b == -1) return false;
                // save to buffer:
                if (withinBlock) buffer.write(b);

                // check if we're matching:
                if (b == match[i]) {
                    i++;
                    if (i >= match.length) return true;
                } else i = 0;
                // see if we've passed the stop point:
                if (!withinBlock && i == 0 && fsdatainstream.getPos() >= end) return false;
            }
        }
    }
}


To run this code, include the required jar files (jdom.jar, hadoop-core.jar) on the classpath and package everything into a single jar file. You can find instructions for building a single jar at the following link:

http://java.sun.com/developer/technicalArticles/java_warehouse/single_jar/ 
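
As a rough sketch (assuming hadoop-core.jar and jdom.jar sit in the current directory, and keeping in mind that the JDOM classes must also end up inside, or otherwise be available to, the final jar), the build could look like this:

mkdir classes
javac -classpath hadoop-core.jar:jdom.jar -d classes XmlReader.java XmlReadMapper.java XmlInputFormat.java
jar cvfe xmlreader.jar XmlReader -C classes .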

Next, run the job with the following command in the terminal:
hadoop jar xmlreader.jar /user/root/Data/file.xml output/
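
Once the job finishes, you can inspect the result; assuming the default output file name for a map-only job, something like:

hadoop fs -cat output/part-m-00000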

Conclusion:

In this way, we can process large amounts of XML data with Hadoop using the Mahout XML input format.

Reference:
Some points are taken from the xmlandhadoop blog.