Running Hadoop

From info319

Hadoop Setup - Installation & Configuration

Compile a test program

Save the following program into a file called WordCount.java straight into the Hadoop installation folder (e.g., /opt/hadoop-2.8.1 or C:\Programs\hadoop-2.8.1). (This is not pretty, but we want to keep things simple in this first exercise. If you have added HADOOP_HOME/bin to your PATH you can instead save the file, and run hadoop and hdfs, from anywhere on your computer.)

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Translate the program into bytecode:

  • Linux:
   bin/hadoop com.sun.tools.javac.Main WordCount.java
  • Windows:
   bin\hadoop com.sun.tools.javac.Main WordCount.java

(From now on, we mostly give Linux-command lines. Replace / with \ and so forth for Windows.)

Assemble the bytecode-classes into a jar-file like this:

   jar cf wc.jar WordCount*.class

Compiling via command line steps: [1] or [2] or [3] Solving Javac errors: [4]

Run the test program

Save these lines into a file called lines.txt in your Hadoop installation folder:

Deer Bear River
Car Car River
Deer Car Bear

Put the file into Hadoop Distributed File System (HDFS), in a new HDFS-folder called myin:

   bin/hdfs dfs -mkdir myin
   bin/hdfs dfs -ls
   bin/hdfs dfs -ls myin
   bin/hdfs dfs -put lines.txt myin
   bin/hdfs dfs -ls myin
   bin/hdfs dfs -cat myin/lines.txt

(If you were running Hadoop on a cluster and lines.txt was a large file, it would now be sharded across the cluster nodes.)

Run the WordCount program in Hadoop:

   bin/hadoop jar wc.jar WordCount myin myout

Look at the results in the new HDFS-folder called myout:

   bin/hdfs dfs -ls
   bin/hdfs dfs -ls myout
   bin/hdfs dfs -cat myout/*

This last command may output the contents of several files, some of them binary. A prettier output is therefore, e.g.:

   bin/hdfs dfs -cat myout/part-r-00000

You can also check the file contents out into a regular (non-HDFS) file on your operating system, e.g.:

   bin/hdfs dfs -get myout/part-r-00000 results.txt
   ls
   cat results.txt

Creating your own programs

Save these lines into a file called triples.txt in your Hadoop installation folder:

AbrahamAbel born 1992
AbrahamAbel livesIn duluth
AbrahamAbel phone 789456123
BetsyBertram born 1987
BetsyBertram livesIn berlin
BetsyBertram phone _anon001
BetsyBertram phone _anon002
_anon001 area 56
_anon001 local 9874321
_anon001 ext 123
_anon002 area 56
_anon002 local 1234789
CharleneCharade born 1963
CharleneCharade bornIn bristol
CharleneCharade address _anon003
CharleneCharade knows BetsyBertram
_anon003 street brislingtonGrove
_anon003 number 13
_anon003 postCode bs4
_anon003 postName bristol
DirkDental born 1996
DirkDental bornIn bergen
DirkDental knows BetsyBertram
DirkDental knows CharleneCharade

Upload them into a new HDFS folder:

   bin/hdfs dfs -mkdir myin2
   bin/hdfs dfs -put triples.txt myin2

Task 1: TripleSorter

Copy WordCount.java into a new file called TripleSorter.java, and change the class name accordingly. Using the old class as a starting point, write a program that maps triples according to their subject (the first word in each triple) and reduces them so that all triples with the same subject are output on the same line, i.e., the output should look something like this:

AbrahamAbel	<AbrahamAbel livesIn duluth> <AbrahamAbel phone 789456123> <AbrahamAbel born 1992>  
BetsyBertram	<BetsyBertram born 1987> <BetsyBertram livesIn berlin> <BetsyBertram phone _anon001> <BetsyBertram phone _anon002>  
CharleneCharade	<CharleneCharade knows BetsyBertram> <CharleneCharade address _anon003> <CharleneCharade born 1963> <CharleneCharade bornIn bristol>  
DirkDental	<DirkDental born 1996> <DirkDental knows CharleneCharade> <DirkDental knows BetsyBertram> <DirkDental bornIn bergen>  
_anon001	<_anon001 ext 123> <_anon001 local 9874321> <_anon001 area 56>  
_anon002	<_anon002 local 1234789> <_anon002 area 56>  
_anon003	<_anon003 postZone _anon004> <_anon003 street brislingtonGrove> <_anon003 number 13>  
_anon004	<_anon004 postCode bs4>  
_anon004	<_area004 postName bristol>

To compile and run, use these commands:

   bin/hadoop com.sun.tools.javac.Main TripleSorter.java
   jar cf ts.jar TripleSorter*.class
   bin/hadoop jar ts.jar TripleSorter myin2 myout2
   bin/hdfs dfs -cat trout/*
   bin/hdfs dfs -cat trout/part-r-00000

But before each rerun, remember to delete the output folder:

   bin/hdfs dfs -rm myout2/*
   bin/hdfs dfs -rmdir myout2

or more compactly (this deletes a folder and its contents recursively - be careful!):

   bin/hdfs dfs -rm -R myout2

Task 2: Refined TripleSorter

Extend your program so that each line contains not only triples with a resource name as a subject, but also the triples where it is the object (the last word in each triple). For example, because of the triples

   CharleneCharade knows BetsyBertram
   DirkDental knows BetsyBertram
   DirkDental knows CharleneCharade

the output should now include these lines:

BetsyBertram	<CharleneCharade knows BetsyBertram> <DirkDental knows BetsyBertram> <BetsyBertram born 1987> ...
CharleneCharade	<DirkDental knows CharleneCharade> <CharleneCharade knows BetsyBertram> <CharleneCharade address _anon003> ... 

Note that everything that counts as a resource name is capitalised in triples.txt: AbrahamAbel, BetsyBertram, CharleneCharade, and DirkDental. This Java regex-test may be useful:

   if (some_string.matches("[A-Z].*")) {
       ....
   }

Bonus Task: TripleSorter for blank nodes

This task is harder: the final output should not contain any lines for anonymous resources (blank nodes) such as _anon1, which start with an underscore. Instead, triples that involve an anonymous resource should be listed along with the appropriate named resource. For example, the triple

   _anon001 area 56

should occur on this line in the output

BetsyBertram	... <BetsyBertram phone _anon001> <_anon001 area 56> ...

To keep things simple, you can assume that anonymous resources are not nested (so that _anon001 does not contain further anonymous resources). (Solving the latter more general, hierarchical problem may require a chain of Hadoop MapReduce jobs, so that the output folder of one job is the input to the next.)

Now, you can go on to refine your TripleSorter to work with RDF using Jena Elephas.

Solutions to the Tasks

Task 1: TripleSorter

Here is a simple triple sorter:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TripleSorter {

  public static class TripleMapper
       extends Mapper<Object, Text, Text, Text>{

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      String s = itr.nextToken();
      String p = itr.nextToken();
      String o = itr.nextToken();
      String triple = "<" + s + " " + p + " " + o + "> ";
      context.write(new Text(s), new Text(triple));
    }
  }

  public static class ResourceReducer
       extends Reducer<Text,Text,Text,Text> {

    public void reduce(Text key, Iterable<Text> values,
                       Context context
                       ) throws IOException, InterruptedException {
      String triples = new String();
      for (Text val : values) {
	triples = triples.concat(val.toString());
      }
      context.write(key, new Text(triples));
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "triple sorter");
    job.setJarByClass(TripleSorter.class);
    job.setMapperClass(TripleMapper.class);
    job.setCombinerClass(ResourceReducer.class);
    job.setReducerClass(ResourceReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Task 2: Refined TripleSorter

To create the refined triple sorter you only need to add a single linke to the end of the map method (after the line context.write(new Text(s), new Text(triple));'):

      if (o.matches("[A-Z].*")) {
        context.write(new Text(o), new Text(triple));
      }

Bonus task: TripleSorter for blank nodes

This task is mainly to motivate Spark, which makes it much easier to remove the blank nodes.