Apache Crunch Toolkit #2: Viewing Pipeline Execution Plan Visualisations

Written on June 20, 2015

What is a pipeline execution plan visualisation?

Apache Crunch has in-built support for creating .dot files which visually show how Crunch pipelines are executed under the hood. For example, a simple operation involving a groupByKey() and a count() may produce the following diagram:

A pipeline execution plan visualisation for Apache Crunch
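
The kind of pipeline that produces a plan like this is sketched below. It is illustrative rather than taken from a real job - the class name, paths and the tokenising DoFn are my own - but it reads some text, splits it into words and then calls count(), which performs a groupByKey() and an aggregation under the hood:

import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;

public class PlanDemo {
    public static void main(String[] args) throws Exception {
        Pipeline pipeline = new MRPipeline(PlanDemo.class, new Configuration());

        // Read lines of text and split them into individual words
        PCollection<String> lines = pipeline.readTextFile("/tmp/crunch-demo/input");
        PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
            @Override
            public void process(String line, Emitter<String> emitter) {
                for (String word : line.split("\\s+")) {
                    emitter.emit(word);
                }
            }
        }, Writables.strings());

        // count() groups by key and then aggregates - the two stages visible in the plan
        PTable<String, Long> counts = words.count();

        pipeline.writeTextFile(counts, "/tmp/crunch-demo/output");
        pipeline.done();
    }
}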

Why are they useful?

Crunch can sometimes provide several different ways to solve a problem, and it can be difficult to know which solution is best without delving into source code. Viewing a graph representation of the execution plan allows you to ensure that the job is being processed in the way you expect. In future blogs on Crunch I’ll be looking at different ways in which problems can be solved and using these graphs often.

How do I create a pipeline execution plan visualisation?

Simply add an output directory for the file to the Configuration object - this currently only works with the MRPipeline. I am using Crunch version 0.11.0-hadoop2:

...
public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    conf.set("crunch.planner.dotfile.outputdir", "/tmp/crunch-demo/dot/");
    ...

and run the job. A file will now be created in that location.

How do I view the visualisation?

There are several options for viewing .dot files. Graphviz appears to be the most popular tool, and there is also the browser-based Erdos. However, I have experienced some issues with Erdos being unable to display more complicated graphs.
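
For example, with Graphviz installed, a .dot file can be converted to an image from the command line (the file name here is just a placeholder - use whichever file appears in the output directory):

dot -Tpng /tmp/crunch-demo/dot/pipeline-plan.dot -o pipeline-plan.png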

Apache Crunch Toolkit #1: Easily Run Jobs in Apache Crunch

Written on June 4, 2015

Introduction to Apache Crunch

Apache Crunch is an incredibly useful Hadoop tool for abstracting away the boilerplate produced by Java MapReduce jobs. Instead of clunky map() and reduce() methods, jobs are created as pipelines, similar to Cascading. I’ve written a summary of Cascading vs Java MapReduce here, and the majority of the discussion also applies to Crunch. There’s also a great discussion of Cascading vs Crunch over at Quora - basically Cascading is good for jobs using basic data types with straightforward functionality, whereas Crunch is useful for more complex data types and algorithms.

Crunch has some pretty good credentials: it recently became a top-level Apache project and it’s used in production by Spotify. Its in-built support for Avro is fantastic, and it provides enough control that it’s possible to still write highly-efficient operations (with custom comparators) where required.

For the basics, read the official Apache Crunch Getting Started page. I found it to be very useful, but I think it misses one step that makes working with Crunch a breeze: the ability to easily run jobs. The Getting Started page requires you to have a Hadoop environment, and then requires you to spend time getting input data onto the HDFS, getting the jar onto a job submission node… For someone just wanting to try out Crunch, it’s unnecessary and could prove to be a turn-off. In fact, using Hadoop’s local mode (which runs the job in a single JVM), it’s possible to run a Crunch job incredibly easily and on a local machine. Here’s how.

This will only work with UNIX-based operating systems - sorry Windows users!

Using Local Mode with Apache Crunch

I’ve placed my modified fork of the official Crunch demo in my GitHub repository. I have modified one class (WordCount.java) and created another (WordCountTest.java). Simply fork as you would the Getting Started demo:

git clone https://github.com/benwatson528/crunch-demo.git

To then run the pipeline (assuming /tmp/crunch-demo/output/ is a directory that can be written to), execute:

mvn package

And you’ve run a Crunch job! Alternatively, you can import the project into your favourite IDE and run the WordCountTest.java unit test. It’s that simple! Let’s look at how this works.

public static void main(String[] args) throws Exception {
	Main(args, new Configuration());
}

/**
 * Having main() call this method means that we're able to grab exit codes
 * within unit tests.
 */
public static int Main(String[] args, Configuration conf) throws Exception {
	return ToolRunner.run(conf, new WordCount(), args);
}

Here my only changes are to modify the main() method and create a Main() method. Main() returns an integer indicating whether the job succeeded, so extracting it out of main() lets unit tests capture that exit code.

package com.example;

import static org.junit.Assert.assertEquals;
import java.io.File;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.junit.Before;
import org.junit.Test;

public class WordCountTest {
	Configuration conf;
	// An input file residing in /src/test/resources
	String sampleInput = Thread.currentThread().getContextClassLoader()
			.getResource("sample-text.txt").getPath();
	// The output is placed on my local file system
	String outputFolder = "/tmp/crunch-demo/output/";

	@Before
	public void setup() {
		FileUtils.deleteQuietly(new File(outputFolder));
		this.conf = new Configuration();
		// Runs any MapReduce jobs locally
		this.conf.set("mapreduce.framework.name", "local");
		// Uses the local file system instead of the HDFS
		this.conf.set("fs.defaultFS", "file:///");
	}

	@Test
	public void testWordCount() throws Exception {
		String[] args = new String[] { this.sampleInput, this.outputFolder };
		int result = WordCount.Main(args, this.conf);
		assertEquals(0, result);
	}
}

First we need to give the job input. sampleInput simply retrieves the path to a file in /src/test/resources/ - in this case it’s just a sample text file I found online. outputFolder refers to the machine’s local file system. Feel free to write the output to a location within the project directory instead; I’m just demonstrating that you can use any part of your local file system.

Next we use setup() to clear down the output folder and to tell the job to run locally against the local file system.

Finally the job itself is run from testWordCount(). args is constructed in the same way that it would be if the job was run using the standard hadoop jar syntax. WordCount.Main() kicks off the job and returns a 0 if the job has run successfully. That’s all there is to it!

I should point out that it’s unlikely a production-quality unit test would be written in this way. Here, our only verification that the job has run successfully is the return of an integer.
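
If you wanted a slightly stronger check without leaving this setup, you could also assert that output was actually written. Here is a rough sketch of an extra test method for WordCountTest - the method and assertion are mine rather than from the repository, and they assume the job writes standard Hadoop part files to outputFolder:

// Requires: import static org.junit.Assert.assertTrue;
@Test
public void testWordCountProducesOutput() throws Exception {
    String[] args = new String[] { this.sampleInput, this.outputFolder };
    assertEquals(0, WordCount.Main(args, this.conf));

    // The job should have written at least one non-empty part file
    boolean foundOutput = false;
    for (File file : new File(this.outputFolder).listFiles()) {
        if (file.getName().startsWith("part-") && file.length() > 0) {
            foundOutput = true;
        }
    }
    assertTrue(foundOutput);
}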

Summary

This technique has many advantages. It makes testing a pipeline a one-second affair rather than a painful “build->scp->clear output directory->run” process. Additionally, nothing here is being mocked or hidden. A real Crunch job is being run, albeit on one node. If it runs in this framework it will run on a cluster. That means there’s no need to hold your breath when running a new job on a cluster only to find that your unit tests have failed to test core Hadoop functionality.

It’s also worth mentioning MemPipeline here. It enables Crunch pipelines to be run in-memory, so is also a good candidate for unit tests. However it doesn’t have the full functionality of the MRPipeline, and I don’t see the point of having tests that run differently to how the real job would run, especially when it’s so easy to simulate the real thing.
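
For reference, this is roughly what MemPipeline usage looks like - a sketch with made-up data, and the class name is mine:

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;

public class MemPipelineDemo {
    public static void main(String[] args) {
        // Build an in-memory PCollection - no Hadoop processes are involved at all
        PCollection<String> words = MemPipeline.typedCollectionOf(
                Writables.strings(), "apple", "banana", "apple");

        PTable<String, Long> counts = words.count();
        for (Pair<String, Long> count : counts.materialize()) {
            System.out.println(count.first() + " -> " + count.second());
        }
    }
}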

Scalding vs Java MapReduce

Written on May 28, 2015

Introduction

A while ago I was fortunate enough to attend a great talk from Antonios Chalkiopoulos, one of the creators of Scalding. Scalding is basically a Scala interface on top of Cascading which purports to enable much faster development of MapReduce jobs by providing an easy abstraction. Additionally, it is much easier to use than Java MapReduce when it comes to chaining jobs - large workflows with multiple splits, merges and joins can be implemented, and the user doesn’t have to worry about writing code to deal with writing to and reading from disk between stages.

Its word count example (taken from Scalding - Getting Started) is achingly concise:

import com.twitter.scalding._

class WordCountJob(args : Args) extends Job(args) {
  TypedPipe.from(TextLine(args("input")))
  .flatMap { line => line.split("""\s+""") }
  .groupBy { word => word }
  .size
  .write(TypedTsv(args("output")))
}

That’s eight lines of code. The official Java MapReduce word count example is over fifty!
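
For comparison, here is that official Java example, reproduced roughly as it appears in the Hadoop MapReduce tutorial (so any small discrepancies are mine):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}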

So RIP Java MapReduce?

Let’s not burn our Java MapReduce textbooks just yet… As a developer, I tend to use Java MapReduce when no other solution will work. Java MR might be old, but it is still the only tool that can solve certain large-scale problems. Sometimes development might take a while, but in the end I’m left with a job that does exactly what I want, and about as efficiently as I can get it on a Hadoop environment. This is achieved through several things, such as re-use of variables (i.e. never creating a variable inside a map or reduce method) and custom byte comparators.

Whenever I see a new tool offering a replacement to MapReduce (OK maybe Scalding isn’t so new any more), there are always three questions I ask:

1. Will it still enable a sufficient level of configurability?

To cut a long story short, yes. Scalding offers configurable source and tap creation, so reading or writing any format shouldn’t be a problem.

2. Will it be as efficient as Java MapReduce?

To answer this question, we really need to know what is going on under the hood. Currently, the official Cascading docs indicate that a Scalding flow will be transformed into a set of Java MapReduce jobs before being executed. So whilst the developer doesn’t have to deal with writing to disk after each stage, the job itself still does. We can’t really complain here though: aside from the expected overhead of the abstraction layer, this can’t make it significantly less efficient than Java MapReduce.

There’s also good news on the horizon, as Hortonworks have enabled Tez support for Cascading and are currently in the process of enabling it for Scalding. This will enable much more efficient processing of jobs with multiple stages, as Tez was designed to run exactly the kinds of workflows that Scalding can create, and it removes the overheads of intermediate reducer writes, map reads and long job launch times.

Another efficiency issue is around Scala itself. With Java it’s possible to be very efficient by re-using variables, and being sure to never create a new variable within a mapper or reducer. Scala might not be able to offer that level of efficiency. Additionally, Benchmarks Game indicates that Scala is only about 80% as fast as Java.

The final main efficiency worry is around joining. In Java MapReduce it’s possible to implement byte comparators to ensure that joins are as efficient as needed. It doesn’t appear as though Scalding offers that level of configuration, which may be an issue for some jobs. However Scalding does come with several different types of joins built-in, so it should be possible to get decent performance in most circumstances.
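
To make concrete what that configuration looks like on the Java MapReduce side, below is a sketch of a raw comparator that compares serialised keys byte-for-byte without deserialising them. It mirrors the comparator Hadoop already ships for Text, and would be registered with job.setSortComparatorClass():

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;

// Compares serialised Text keys directly on their bytes, avoiding the cost of
// deserialising each key during the shuffle sort.
public class TextRawComparator extends WritableComparator {

    public TextRawComparator() {
        super(Text.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // Text is serialised as a vint length followed by UTF-8 bytes;
        // skip the length header and compare the raw bytes.
        int n1 = WritableUtils.decodeVIntSize(b1[s1]);
        int n2 = WritableUtils.decodeVIntSize(b2[s2]);
        return compareBytes(b1, s1 + n1, l1 - n1, b2, s2 + n2, l2 - n2);
    }
}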

3. Will developers be able to pick it up and understand it easily?

Scala isn’t a language that many people use on a regular basis. Combine that with the still relatively-uncommon knowledge of MapReduce, and it doesn’t look good for Scalding. It’s not much use converting 500 lines of Java MR into 15 beautiful lines of Scalding if you’re the only person in the development team who can understand it.

However, Scalding has some advantages. Scala is really accelerating in popularity, especially with its use in Apache Spark. This Typesafe survey indicates that 88% of Apache Spark users use Scala, and so it’s clearly a language worth learning for anyone dealing with big data. It’s also pretty well-supported, and there are lots of good tutorials online for anyone who’s interested in learning it.

Scalding’s main advantage comes into play here too - it’s very easy to use it to write MapReduce jobs after picking up the basics. Chaining six MapReduce jobs together would be a nightmare in Java and would take days to implement for even simple operations, but in Scalding it’s almost effortless. Additionally, whilst it isn’t possible to write jobs as efficiently as in Java MapReduce, it’s also much harder to write a Scalding job that is as inefficient as a badly-written MapReduce job. This will aid newer developers. It’s also much quicker to “sketch out” a job in Scalding than in Java, so it would be very useful for planning jobs before they’re productionised.

Summary

Clearly Java MapReduce isn’t dead. For use cases requiring very efficient jobs, it’s the winner. However, Scalding looks very interesting for jobs requiring multiple stages that maybe don’t need to squeeze every ounce of efficiency from a job.

Using MultipleOutputs with ORC in MapReduce

Written on April 18, 2015

Introduction to ORC

ORC (Optimised Row Columnar) is a relatively new format being heavily pushed by the Hadoop community. It offers many useful optimisations and features for storing columnar data. Some of its key features include:

  • Column pruning - only the columns you need are read into Hive queries, as opposed to the entire row,
  • MIN, MAX and SUM values of columns are stored within the data itself for rapid calculations, and predicate pushdown uses these statistics to skip sections of a file that cannot match a query,
  • Easy to use with Hive,
  • Small output files.

For more information on ORC, see Owen O’Malley’s Slideshare or Christian Prokopp’s article. For official documentation, see Apache ORC LanguageManual.

I’ve been using it with a client as a potential Apache Avro replacement, and I’ve been very impressed so far. However, it is still quite young - even by Hadoop terms - and so there isn’t a lot of content online for people wanting to use it. As such, I’ll be posting tutorials for new uses.

MapReduce MultipleOutputs with ORC Files

HadoopCraft has a great tutorial on using ORC as a MapReduce output. Outputting ORC files with more than one struct from a single MapReduce job is fairly straightforward, but requires some understanding of the ORC code.

Let’s assume that we want to output two different datasets - one representing a cat and the other representing a person. The relevant parts of the Reducer class would look something like:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class MultipleORCReducer extends
        Reducer<Text, NullWritable, NullWritable, Writable> {

    private static final String PERSON_OUTPUT_NAME = "personOut";
    private static final String CAT_OUTPUT_NAME = "catOut";

    private MultipleOutputs<NullWritable, Writable> mOutputs;

    private final OrcSerde orcSerde = new OrcSerde();
    private List<Object> orcRecord;
    private Writable row;

    // ORC variables for the person
    private final String personStruct = "struct<name:string,age:int>";
    private final TypeInfo personTypeInfo =
            TypeInfoUtils.getTypeInfoFromTypeString(personStruct);
    private final ObjectInspector personOip =
            TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(personTypeInfo);

    // ORC variables for the cat
    private final String catStruct = "struct<breed:string,colour:string>";
    private final TypeInfo catTypeInfo =
            TypeInfoUtils.getTypeInfoFromTypeString(catStruct);
    private final ObjectInspector catOip =
            TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(catTypeInfo);

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        // For a person
        this.orcRecord = new ArrayList<Object>();
        this.orcRecord.add("Ben");
        this.orcRecord.add(25);
        this.row = this.orcSerde.serialize(this.orcRecord, this.personOip);
        this.mOutputs.write(PERSON_OUTPUT_NAME, NullWritable.get(), this.row);

        // For a cat
        this.orcRecord = new ArrayList<Object>();
        this.orcRecord.add("Tabby");
        this.orcRecord.add("ginger");
        this.row = this.orcSerde.serialize(this.orcRecord, this.catOip);
        this.mOutputs.write(CAT_OUTPUT_NAME, NullWritable.get(), this.row);
    }

    @Override
    protected void setup(Context context) {
        this.mOutputs = new MultipleOutputs<NullWritable, Writable>(context);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        this.mOutputs.close();
    }
}

Notice that the only real difference between this and HadoopCraft’s tutorial for single outputs is that unique TypeInfo and ObjectInspector instances must be created for each different output.

The elements of the driver class related to outputs are:

private void prepareJob(Configuration conf, Job job) {
    //...
    conf.set("orc.create.index", "true");
    OrcNewOutputFormat.setCompressOutput(job, true);
    OrcNewOutputFormat.setOutputPath(job, new Path("<hdfs-output-location>"));
    // The named outputs must use the same names as the reducer
    MultipleOutputs.addNamedOutput(
            job, PERSON_OUTPUT_NAME, OrcNewOutputFormat.class, NullWritable.class, Writable.class);
    MultipleOutputs.addNamedOutput(
            job, CAT_OUTPUT_NAME, OrcNewOutputFormat.class, NullWritable.class, Writable.class);
}

Again there’s nothing crazy going on here: just the three ORC-specific lines (index creation, output compression and the output path) that should be included in any ORC MapReduce job, followed by the usual MultipleOutputs.addNamedOutput() calls - one per named output, matching the names used in the reducer. The named outputs then appear as separate files (e.g. personOut-r-00000 and catOut-r-00000) in the job’s output directory.