Running Hbase Testing Utility On Windows

Written on March 8, 2018

The HBase Testing Utility is a vital tool for anyone writing HBase applications. It sets up (and tears down) a lightweight HBase instance locally to allow local integration tests. I’ve previously discussed this and how to use it with BDD Cucumber tests in this blog post, complete with working repo. However, it is not trivially easy to get working on Windows machines, nor is it documented anywhere.

In this blog post, I’ll show how to get the HBase Testing Utility running on Windows machines, no admin access required. There’s no accompanying GitHub project for this post, as it’s fairly short and generic. I’ll assume you already have a working HBase Testing Utility test that runs on Unix, and you want to port it to Windows.

  1. Download/clone Winutils. This contains several Hadoop versions compiled for Windows.
  2. Go to Control Panel, and find Edit environment variables for your account in System.
  3. Add the following user variables:
    • hadoop.home.dir=<PATH_TO_DESIRED_HADOOP_VERSION> (in my case, this was C:\Users\bwatson\apps\hadoop-2.8.3)
    • append %HADOOP_HOME%/bin to Path
  4. Before calling new HBaseTestingUtility(); the temporary HBase data directory needs to be set. Add System.setProperty("", "C:/Temp/hbase"); to the code. This path can be changed, but it’s important to keep it short. Using the JUnit TemporaryFolder or the default path results in paths too long, and shows an error similar to Failed to move meta file for ReplicaBeingWritten, blk_1073741825_1001, RBW.
  5. You may get the error:
     Caused by: java.lang.NoSuchMethodError:;)Lcom/google/common/hash/HashCode;*

    This is caused by different versions of Guava being pulled in by various dependencies. I resolved this by excluding Guava from my Hadoop dependencies, and forcing version 15.0 to be used. My Gradle dependencies sections looks like:

     dependencies {
         compile group: '', name: 'guava', version: '15.0'
         compile(group: 'org.apache.hbase', name: 'hbase-client', version: '1.4.2') {
             exclude group: '', module: 'guava'
         compile(group: 'org.apache.hbase', name: 'hbase-testing-util', version: '1.4.2') {
             exclude group: '', module: 'guava'
         testCompile group: 'junit', name: 'junit', version: '4.1.2'
         //Needed for JUnit
         testCompile group: 'org.hamcrest', name: 'hamcrest-all', version: '1.3'

Following these steps, the HBase Testing Utility should work on Windows machines.

Testing HBase Applications with BDD Integration Tests

Written on June 30, 2017

In this blog post, we’ll look at how it’s possible to easily test HBase applications using business-driven development (BDD) and integration tests. The accompanying code for this tutorial can be found on my GitHub.


Applications that rely on big data technologies are often challenging to test. They can require lots of confusing mocking and complex test frameworks, making it difficult to write useful tests. Far too often I’ve seen tests that, when painfully unravelled, are just mocks testing other mocks, while the business logic (the thing you actually want to test!) goes untested.

Wouldn’t it be great if you were able to easily test business logic against an locally-running instance of the big data technology you’re running, with minimal configuration or boilerplate? That’s where this tutorial comes in. We’ll combine the popular BDD tool Cucumber with HBase’s own integration testing tools to enable an HBase application to be easily and fully tested.

Cucumber in <100 words

Tools like Cucumber make it easy to test business logic - business analysts can write requirements in a Given, When, Then format (known as Gherkin) that developers can then turn into tests. For example, a Gherkin Scenario (a single test) may take the form:

Scenario: Add a new car to the system
  Given a car to be added with registration "AB56 XYZ", model "Vauxhall Vectra" and production year 2016
  When the car is added
  Then the car can be retrieved

If the test passes, the business analyst and client can sleep happily knowing that the developer has implemented exactly what they want. If the test breaks six months from now, any member of the team can quickly identify what has broken, because it’s written in plain English.

If we can write Cucumber tests that run as integration tests - over an actual instance of the technology we’re using - we can have high confidence that our application will work in production.

Integration Testing HBase

HBase has an easy-to-use integration framework called the HBaseTestingUtility. It quickly (~10s) spawns up a single-node HBase instance within the JVM in two lines of code:

HBaseTestingUtility hbaseUtility = new HBaseTestingUtility();

HBASE_UTILITY.getConnection() then provides the Connection we use to interact with HBase via the standard Java API.

Putting it All Together and Analysing the Application

Let’s now analyse the application I created to demonstrate how HBase applications can be BDD integration tested. Clone the application if you haven’t already:

git clone

Once inside the folder, run mvn clean test. In about 15s you should see BUILD SUCCESS - in that time, an actual HBase instance has been set up on your machine, data has been inserted and retrieved from it via our application, and it has all been wiped down. No need for Docker, VMs, or any manual effort!

The application itself is fairly trivial - users can pass a Car into CarWriter, and it is written to HBase, with the registration as the rowkey. As a result, the tests are straightforward - they just need to pass Cars in, and confirm that they look right when they’re retrieved. The CarWriter class that we’re testing looks like:

public class CarWriter {
    public CarWriter(Connection connection) {
        this.connection = connection;

    public void addNewCar(Car car) throws IOException {
        LOGGER.debug("Adding new car with registration {}", car.getRegistration());
        Table table = this.connection.getTable(TABLE_NAME);
        Put p = new Put(Bytes.toBytes(car.getRegistration()));
        p.addColumn(COLUMN_FAMILY, MODEL_COLUMN_QUALIFIER, Bytes.toBytes(car.getModel()));
        p.addColumn(COLUMN_FAMILY, PRODUCTION_YEAR_COLUMN_QUALIFIER, Bytes.toBytes(car.getProductionYear()));
        LOGGER.debug("Car successfully added");

Now moving onto our test classes, we use two features of JUnit’s Rules to get HBase working:

  • ExternalResource sets up an external resource (in this case the HBase test cluster) before any tests run , and then tears it down once all tests have finished.
  • ClassRule then tells the current test class that we want to use a given ExternalResource.

So we define our HBaseTestServer class as an ExternalResource:

public class HBaseTestServer extends ExternalResource {
    private HBaseTestingUtility hbaseUtility;

    protected void before() throws Exception {
        this.hbaseUtility = new HBaseTestingUtility();
        this.hbaseUtility.createTable(TableName.valueOf("cars"), "c");

    public Connection getConnection() {
        try {
            return this.hbaseUtility.getConnection();
        } catch (IOException e) {
            throw new RuntimeException("Unable to get HBase connection, has the test been initialised correctly?", e);

    protected void after() {
        if (this.hbaseUtility != null) {
            try {
            } catch (Exception e) {
                throw new RuntimeException("Unable to close HBase connection", e);

and then call it from the Cucumber runner TestRunner using the @ClassRule annotation:

public class TestRunner {
    public static HBaseTestServer HBASE_TEST_SERVER = new HBaseTestServer();

The @RunWith(Cucumber.class) annotation tells JUnit to run the test as a Cucumber test, which makes it look for .feature files and their corresponding Step definitions (in this case in AddCarScenarios) within the same package.

The Scenarios themselves are in add-cars.feature:

Feature: Adding cars to HBase

  Scenario: Add a new car to HBase
    Given a car to be added with registration "AB56 XYZ", model "Vauxhall Vectra" and productionYear 2016
    When the car is added
    Then the car is available in HBase

  Scenario: Add an old car to HBase
    Given a car to be added with registration "OLD 1", model "Rover" and productionYear 1970
    When the car is added
    Then the car is available in HBase

Notice how easy it would be to add a new test - even the business analyst could add one.

Finally, AddCarScenarios contains the code that performs each Step of the tests:

public class AddCarScenarios extends TestRunner {
    private Car inputCar;
    private Connection connection;

    public void setup() {
        this.connection = HBASE_TEST_SERVER.getConnection();

    @Given("^a car to be added with registration \"([^\"]*)\", model \"([^\"]*)\" and productionYear (\\d+)$")
    public void aCarToBeAdded(String registration, String model, int productionYear) {
        this.inputCar = new Car(registration, model, productionYear);

    @When("^the car is added$")
    public void theCarIsAdded() throws IOException {
        CarWriter carWriter = new CarWriter(this.connection);

    @Then("^the car is available in HBase$")
    public void theCarIsAvailableInHBase() throws IOException {
        Table table = this.connection.getTable(TableName.valueOf("cars"));
        Get get = new Get(Bytes.toBytes(this.inputCar.getRegistration()));
        Result result = table.get(get);
        Car returnedCar = convertResultToCar(result);
        assertEquals(inputCar, returnedCar);

    private Car convertResultToCar(Result result) {
        String registration = Bytes.toString(result.getRow());
        String model = Bytes.toString(result.getValue(Bytes.toBytes("c"), Bytes.toBytes("model")));
        int productionYear = Bytes.toInt(result.getValue(Bytes.toBytes("c"), Bytes.toBytes("productionYear")));
        return new Car(registration, model, productionYear);

To access HBase, we get the open Connection from the running HBase instance. The Car is added into HBase via the CarWriter, and the HBase Java API is then used to retrieve it and ensure it has been stored correctly.

Notice there are no mocks at all in this code! We’re running easy-to-understand test cases on an actual instance of HBase, and all in under 20 seconds.


By now you should have a good idea of how to easily BDD integration test HBase. I’ve intentionally kept the code as minimal as possible to allow it to be used as a base for future projects, so feel free to fork (or just copy and paste!). As an aside, this isn’t just limited to HBase - I’ve got Cucumber-based integration tests running in production on a system that uses HBase, Kafka, Solr and Zookeeper.

Kafka Streams and Drools - a lightweight real-time rules engine

Written on September 18, 2016


About a year ago I delivered a POC for a client, combining Spark Streaming and Drools to create a (near) real-time rules engine. Around the same time, Cloudera posted a blog in which they did the same thing. In both implementations, messages are delivered from an incoming Kafka topic to a Spark Streaming job which applies the Drools rules, and output is then placed onto an output Kafka topic.

However, this approach has downsides. Applying Drools rules to data isn’t something that requires an advanced streaming engine, and so Spark is arguably overkill unless it’s being used elsewhere in the solution. This is true of many of the ETL implementations of Spark that I see. Given that Cloudera’s solution uses Kafka to move data to and from the Spark Streaming pipeline, it would be useful if Kafka itself could apply the Drools, cutting out Spark entirely.

Kafka Streams

This is now possible thanks to the recent release of Kafka Streams. This new lightweight API (< 9000 lines of code) within Kafka allows data to be processed in real-time in the Kafka brokers themselves - no additional services need to be installed on the cluster, and no additional management or coordinator job is required. A full overview of Kafka Streams can be found on Confluent’s website. For me the key sentence in that post is

The gap we see Kafka Streams filling is less the analytics-focused domain these frameworks focus on and more building core applications and microservices that process data streams.

Clearly then Kafka Streams can provide a solution for allowing Drools to be applied in real-time, without the overhead and resource requirements of installing and maintaining Spark, especially in situations where Kafka is already being used as the messaging system.

The Application

The application I have built uses a Kafka Streams pipeline to read data from a Kafka topic, apply a simple rule (if the input String contains an e it prepends the message with 0), and write it to an output topic. The code for this application can be found on my GitHub. It contains an integration test which sets up a local Kafka and Zookeeper environment (this won’t work on Windows), which is probably the best starting point for investigating.

The central method of the code is:

public static KafkaStreams runKafkaStream(PropertiesConfiguration properties) {
    String droolsRuleName = properties.getString("droolsRuleName");
    DroolsRulesApplier rulesApplier = new DroolsRulesApplier(droolsRuleName);
    KStreamBuilder builder = new KStreamBuilder();

    String inputTopic = properties.getString("inputTopic");
    String outputTopic = properties.getString("outputTopic");
    KStream<byte[], String> inputData =;
    KStream<byte[], String> outputData = inputData.mapValues(rulesApplier::applyRule);;

    Properties streamsConfig = createStreamConfig(properties);
    KafkaStreams streams = new KafkaStreams(builder, streamsConfig);

    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

    return streams;

The application itself can be run by following the Confluent Quickstart guide, with the following amendments:

  1. Create the topics inputTopic and outputTopic (or change within the project before building it).
  2. Build the kafka-streams-drools project with mvn clean install, move the resulting fat JAR onto a node within the cluster, and then run it with java -cp kafka-streams-drools-0.0.1-SNAPSHOT-jar-with-dependencies.jar Do this step instead of executing the ./bin/kafka-run-class command.

If a Kafka environment is already present, only follow step 2.

Apache Flume Interceptors: Modifying Records

Written on August 5, 2015


Apache Flume offers interceptors as a way of modifying records (known in Flume as events) as they pass through a Flume channel. This is fairly well-documented in the official Flume documentation and in a handful of blog posts, but most centre on implementations that only change the event header. In this blog post I’ll be looking briefly at what Flume events are, and will then be showing an example of how to create a custom Flume interceptor that can modify an event body.

Flume Events

Flume events consist of two components: the header and the body. The event header is useful for storing key-value information that can define how to deal with the data - for example the in-built Host interceptor adds the hostname of the Flume agent’s host to the header. Flume configuration can then be used to send events to different channels based on which hostname is present.

A Flume event

The event body contains the event information itself - in most cases this will be a line of text converted into a byte array. In the event of an Avro source being used, the event header will contain the schema and the body will contain the Avro record itself.

Modifying the Event Body

Interceptor tutorials and those bundled with Flume mostly concentrate on how to modify or use the header. These aren’t much use if you want to modify the raw data records themselves as they flow through the channel. Uses of this include:

  • Adding additional information,
  • Data cleansing,
  • Filtering data based on information within the body.

Our example will look at the first situation. Let’s assume we want to add the time in nanoseconds at which each record is processed to the file. It’s worth noting here that Flume currently cannot guarantee order, so this can’t be used to work out an ordering. Code for this is included in my GitHub repository - see that if you want the full code, including unit tests and Maven POM.


This code is an adapted version of [org.apache.flume.interceptor.TimestampInterceptor]. The key differences are:

  • Enabling a configurable separator to be added through configuration,
  • Changing the timestamp from seconds to nanoseconds,
  • Appending the timestamp to the event body rather than adding it to the header.

The key method here is:

public Event intercept(Event event) {
   byte[] eventBody = event.getBody();
   event.setBody(appendTimestampToBody(eventBody, System.nanoTime()));
   return event;

in which the event body (a byte[]) is retrieved and modified.

The configurable separator is added through modifying the Builder class, which is responsible for instantiating the interceptor. The configure(Context context) method provides access to properties from the Flume .conf. We use this to retrieve the separator and pass it into the main intercept(Event event) method:

public Interceptor build() {
    return new TimestampBodyInterceptor(this.separator);

public void configure(Context context) {
    this.separator = context.getString(Constants.SEPARATOR);


To install this interceptor (assuming you have Flume installed and running):

  1. Check out this code: git clone
  2. Modify the <flume.version> property in pom.xml to correspond to your version of Flume
  3. Build it: mvn clean package
  4. Copy the jar generated in the target directory into <flume-home>/lib
  5. vim <flume-home>/conf/flume.conf (or whatever the config file is):
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =$Builder
a1.sources.r1.interceptors.i1.separator = ,

Restart Flume and your data should now be appearing in the sink location with the timestamp appended to each record.

Apache Crunch Toolkit #3: Secondary Sort in Apache Crunch

Written on June 29, 2015

Secondary sorting is one of the most common requirements in data processing, and a staple of MapReduce. Let’s look at Apache Crunch’s support for it. The Apache Crunch User Guide does include some useful information on this, and an example is provided in the Crunch source code, but there isn’t really a good walkthrough anywhere.

What is Secondary Sort?

Secondary sorting means that a group operation is performed on one field in the key but another component of the key is sorted on. This enables reducers to receive sorted values. Let’s take a look at an example. Say we have some (very simple) HTTP logs showing who within a network has visited which domains. Each time someone visits a domain, it is appended to this list.

internalIp  externalDomain

Now we’ve been assigned the problem of working out how many unique domains each person has visited. Maybe the most obvious solution would be to group by internal IP and then count how many distinct values exist. However that is very risky and inefficient as it requires us to store in memory the complete set of domains visited by an internal IP. For each new internal IP we iterate over, we have to then see if it’s in that set, and add it if it’s not. Once we’ve exhausted all values for that key we can sum the length of the list. Not ideal!

Let’s introduce secondary sorting to solve this problem. If we group by internal IP but sort by external domain, we will get domains in alphabetical order following the grouping operation (custom sorting algorithms can also be implemented). For example, our value iterator for the key in the data above would now look like:

To calculate the number of distinct domains visited we now just need to simply store the last domain we saw, and only increase the distinct count if the next domain we see is different. We avoid having to maintain a set of domains in memory.

Implementing Secondary Sort in Apache Crunch

My complete code for secondary sort can be found on my GitHub page. Let’s start by discussing the main SecondarySortRunner class and the outline of the code.

Pipeline pipeline = new MRPipeline(SecondarySortRunner.class, getConf());
PCollection<String> lines = pipeline.readTextFile(inputPath);
// Parse the log data into the correct format to be accepted by
// secondary sort
PTable<String, Pair<String, String>> parsedLogTable = lines.parallelDo(
		new HttpLogProcessor(),Writables.tableOf(Writables.strings(),
// Performs the secondary sort
PCollection<String> output = SecondarySort.sortAndApply(parsedLogTable,
		new CountUniqueDomains(), Writables.strings());
pipeline.writeTextFile(output, outputPath);

There are two key parts to the secondary sort operation: that which creates parsedLogTable and that which creates output. Basically, parsedLogTable is created by the HttpLogProcessor method, which parses our raw input data into the format which is accepted by Crunch’s secondary sort calculator. We then call the Crunch library method SecondarySort.sortAndApply() to perform the secondary sort.

public void process(String line,
		Emitter<Pair<String, Pair<String, String>>> emitter) {
	splitString = line.split(TAB_SEPARATOR);
			Pair.of(splitString[1], NULL_STRING)));

This class is responsible for parsing our raw input data. Crunch requires that data be a Pair<String,Pair<String,String>> if it is to be processed by SecondarySort. The key of the Pair is the primary group key - in this case the internal IP address. The first element of the value Pair is the secondary sort field - the field to be sorted on - in this case the domain. The second element is the value to be pulled through as normal. We leave this null as we don’t use it.

As a disclaimer, this code has no error handling and should not be used in any kind of production environment. I just prefer to keep it short to aid in understanding.

public void process(Pair<String, Iterable<Pair<String, String>>> input,
		Emitter<String> emitter) {
	this.domainsVisited = 0L;
	this.currentDomain = "";
	this.previousDomain = "";

	// Looping through each external domain. Domains are sorted so a count
	// can be maintained rather than a set
	for (Pair<String, String> pair : input.second()) {
		this.currentDomain = pair.first();
		if (!this.currentDomain.equals(this.previousDomain)) {
			this.previousDomain = this.currentDomain;
	emitter.emit(input.first() + StringUtils.COMMA_STR
			+ this.domainsVisited);

This method demonstrates where we get the advantage of secondary sort. We don’t have to maintain a set of domains here, and can instead rapidly iterate over our values, increasing a counter when the domain changes. We finish by emitting the internal IP and a count of the number of domains visited by it.