MapReduce

At present, long running queries are implemented by writing a python script that runs locally and talks to a single Cassandra node. This is problematic. It requires that the system executing the script stay running for the duration. The query itself is slowed down by Cassandra having to communicate over a slower network back to the system running the query. While we have a fast connection to the datacenter in Blue Fin, the same cannot be said for engineers working from other locations. Finally, a Cassandra cluster is composed of several nodes. Any one node might be responsible for data needed in the long running query we're trying to perform. By only talking to one node, we potentially create a lot of excess network chatter, and put excessive load on that node as it holds a large number of rows in memory.

There are a number of possible solutions to this problem, all of which revolve around Cassandra's Hadoop support. Some of these solutions were covered in depth at the March meetup of Cassandra London.

DataStax Enterprise

DataStax Enterprise has out of the box Hadoop integration. Enterprise replaces HDFS, the underlying database in Hadoop, with its own CassandraFS. This removes the need for a Hadoop Name Node and makes operations much simpler. DataStax OpsCenter also provides a nice web UI for managing Hadoop jobs.

Unfortunately, while DataStax provides a Community Edition, the Hadoop functionality is not present in it.

Hadoop

You can overlay a Hadoop cluster over the Cassandra nodes. The ideal configuration for this would include a separate server for the Name Node / Job Tracker, with a Task Tracker on each Cassandra node. A Task Tracker is what does the actual work, so running it on the Cassandra node gives you data locality. At least one node needs to be a data node for housekeeping purposes (HDFS?).

Tuning

Jairam suggests in his talk that you tune cassandra.range.batch.size down to avoid timeouts, and to tune rpc_timeout_in_ms up to avoid further timeouts.

Virtual analytics datacenter

You can run the Hadoop Task Trackers on a separate Cassandra cluster. You then tell Cassandra to use the NetworkTopologyStrategy to replicate data to the Cassandra/Hadoop cluster. This reduces load on the primary cluster, preventing runaway Hadoop jobs from having a detrimental impact on its performance.

Setup

Some documentation [ 1, 2 ] and the Cassandra High Performance Cookbook suggest putting a Task Tracker and Data Node on each Cassandra node. The Job Tracker and Name Node should be a single, separate machine.

This thread does a good job of explaining data locality in Hadoop and Cassandra.

Several talks advise using the multi-datacenter functionality of Cassandra to put Hadoop analytics in its own ring that's replicated from the main Cassandra ring. Tom Haddon has suggested that we will likely want to create a secondary Cassandra cluster anyway, for backup purposes, and that we could run the Hadoop analytics on top of that.

Task Tracker / Data Node (on Cassandra node)
  1. sudo add-apt-repository "deb http://archive.cloudera.com/debian maverick-cdh3 contrib"

  2. wget -O - http://archive.cloudera.com/debian/archive.key | sudo apt-key add - 

  3. sudo apt-get install hadoop-0.20-{datanode,tasktracker} openjdk-6-jre hadoop-pig

  4. Edit /etc/default/hadoop-0.20, adding:

    export JAVA_HOME="/usr/lib/jvm/java-6-openjdk-amd64/"
    export HADOOP_CLASSPATH=/usr/share/cassandra/*:/usr/share/cassandra/lib/*:$HADOOP_CLASSPATH
  5. Edit /etc/hadoop-0.20/conf/core-site.xml (10.55.60.140 is the name node):

    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <!-- Put site-specific property overrides in this file. -->
    <configuration>
      <property>
        <name>fs.default.name</name>
        <value>hdfs://10.55.60.140:8020</value>
      </property>
    </configuration>
  6. Edit /etc/hadoop-0.20/conf/hdfs-site.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <!-- Put site-specific property overrides in this file. -->
    <configuration>
      <property>
        <name>dfs.replication</name>
        <value>1</value>
      </property>
    </configuration>
  7. Edit /etc/hadoop-0.20/conf/mapred-site.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <!-- Put site-specific property overrides in this file. -->
    <configuration>
      <property>
        <name>mapred.job.tracker</name>
        <value>10.55.60.140:8021</value>
      </property>
    <!-- From Cassandra High Performance Cookbook. -->
    <!-- "Co-locating Hadoop Task Trackers on Cassandra nodes" -->
    <!-- Limit the resources available to the Task Tracker -->
      <property>
        <name>mapred.tasktracer.reduce.tasks.maximum</name>
        <value>1</value>
      </property>
      <property>
        <name>mapred.tasktracer.map.tasks.maximum</name>
        <value>3</value>
      </property>
      <property>
        <name>mapred.child.java.opts</name>
        <value>-Xmx150m</value>
      </property>
    </configuration>
  8. sudo service hadoop-0.20-datanode start

  9. sudo service hadoop-0.20-tasktracker start

  10. Make sure you can access HDFS: sudo -u hdfs JAVA_HOME=/usr/lib/jvm/java-6-openjdk-amd64/ hadoop fs -ls /

  11. Locally, set up an SSH tunnel to the task tracker web interface:
    ssh -N -L 50060:10.55.60.139:50060 10.55.60.139 # hadoop task tracker tunnel

Pig (on Task Tracker node)
  1. export PIG_CONF_DIR=/etc/hadoop-0.20/conf

  2. export PIG_HOME=/usr/lib/pig/

  3. export PIG_INITIAL_ADDRESS=10.55.60.139

  4. export PIG_RPC_PORT=9160

  5. export PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner

  6. cd; wget http://apache.mirrors.timporter.net/cassandra/1.1.6/apache-cassandra-1.1.6-src.tar.gz

  7. tar zxvf apache-cassandra-1.1.6-src.tar.gz; cd apache-cassandra-1.1.6-src; ant

  8. chmod +x examples/pig/bin/pig_cassandra

  9. As a test (once the Name Node / Job Tracker is set up):
    export JAVA_HOME=/usr/lib/jvm/java-6-openjdk-amd64/
    ./examples/pig/bin/pig_cassandra -x mapreduce
    grunt> rows = LOAD 'cassandra://crashdb/OOPS' USING CassandraStorage();
    grunt> counted = foreach (group rows all) generate COUNT($1);
    grunt> dump counted;

Name Node / Job Tracker
  1. sudo add-apt-repository "deb http://archive.cloudera.com/debian maverick-cdh3 contrib"

  2. wget -O - http://archive.cloudera.com/debian/archive.key | sudo apt-key add - 

  3. sudo apt-get install hadoop-0.20-{namenode,jobtracker} openjdk-6-jre

  4. Add export JAVA_HOME="/usr/lib/jvm/java-6-openjdk-amd64/" to /etc/default/hadoop-0.20

  5. Edit /etc/hadoop-0.20/conf/core-site.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <!-- Put site-specific property overrides in this file. -->
    <configuration>
      <property>
        <name>fs.default.name</name>
        <value>hdfs://10.55.60.140:8020</value>
      </property>
      <property>
        <name>hadoop.tmp.dir</name>
        <value>/hadoop/hadoop-${user.name}</value>
      </property>
    </configuration>
  6. Edit /etc/hadoop-0.20/conf/hdfs-site.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <!-- Put site-specific property overrides in this file. -->
    <configuration>
      <property>
        <name>dfs.replication</name>
        <value>1</value>
      </property>
    </configuration>
  7. Edit /etc/hadoop-0.20/conf/mapred-site.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <!-- Put site-specific property overrides in this file. -->
    <configuration>
      <property>
        <name>mapred.job.tracker</name>
        <value>10.55.60.140:8021</value>
      </property>
    </configuration>
  8. sudo -u hdfs JAVA_HOME=/usr/lib/jvm/java-6-openjdk-amd64/ hadoop namenode -format

  9. sudo service hadoop-0.20-namenode start

  10. sudo -u hdfs JAVA_HOME=/usr/lib/jvm/java-6-openjdk-amd64/ hadoop fs -chmod 777 /

  11. sudo service hadoop-0.20-jobtracker start

  12. Check to make sure it's running: w3m http://localhost:50070

  13. Locally, set up an SSH tunnel to the job tracker web interface:
    ssh -N -L 50030:10.55.60.140:50030 10.55.60.140 # hadoop job tracker tunnel
  14. Locally, set up an SSH tunnel to the name node web interface:
    ssh -N -L 50070:10.55.60.140:50070 10.55.60.140 # hadoop name node tunnel

Pig natively

Cassandra 1.1 adds native support for Pig. This means that Pig can directly read and write to Cassandra without involving Hadoop. Instructions for setting this up can be found here.

However, running Pig directly on Cassandra doesn't seem to give us jobs that run independent of our local machines.

From #cassandra on Freenode (20121126):

[14:38:13] <ev>  Is adding Hadoop job trackers on Cassandra nodes the only way to get M/R jobs running independent of a local machine? Is this goal not possible with just Pig talking straight to Cassandra natively using CassandraStorage?
[14:39:15] <nickmbailey>         no, you need a jobtracker
[14:39:20] <nickmbailey>         but only one
[14:39:50] <nickmbailey>         http://wiki.apache.org/cassandra/HadoopSupport
[14:39:53] <ev>  nickmbailey: excellent. Thank you. Does that also require HDFS?
[14:39:59] <ev>  for the job tracker state
[14:41:28] <ev>  err task tracker state
[14:42:52] <nickmbailey>         you need at least one data node somewhere as well yes
[14:43:08] <ev>  nickmbailey: thank you! That clarifies things quite a bit.
[14:43:20] <nickmbailey>         no problem

ErrorTracker/MapReduce (last edited 2012-11-29 11:42:58 by ev)