At present, long running queries are implemented by writing a python script that runs locally and talks to a single Cassandra node. This is problematic. It requires that the system executing the script stay running for the duration. The query itself is slowed down by Cassandra having to communicate over a slower network back to the system running the query. While we have a fast connection to the datacenter in Blue Fin, the same cannot be said for engineers working from other locations. Finally, a Cassandra cluster is composed of several nodes. Any one node might be responsible for data needed in the long running query we're trying to perform. By only talking to one node, we potentially create a lot of excess network chatter, and put excessive load on that node as it holds a large number of rows in memory.
There are a number of possible solutions to this problem, all of which revolve around Cassandra's Hadoop support. Some of these solutions were covered in depth at the March meetup of Cassandra London.
DataStax Enterprise
DataStax Enterprise has out of the box Hadoop integration. Enterprise replaces HDFS, the underlying database in Hadoop, with its own CassandraFS. This removes the need for a Hadoop Name Node and makes operations much simpler. DataStax OpsCenter also provides a nice web UI for managing Hadoop jobs.
Unfortunately, while DataStax provides a Community Edition, the Hadoop functionality is not present in it.
Hadoop
You can overlay a Hadoop cluster over the Cassandra nodes. The ideal configuration for this would include a separate server for the Name Node / Job Tracker, with a Task Tracker on each Cassandra node. A Task Tracker is what does the actual work, so running it on the Cassandra node gives you data locality. At least one node needs to be a data node for housekeeping purposes (HDFS?).
Tuning
Jairam suggests in his talk that you tune cassandra.range.batch.size down to avoid timeouts, and to tune rpc_timeout_in_ms up to avoid further timeouts.
Virtual analytics datacenter
You can run the Hadoop Task Trackers on a separate Cassandra cluster. You then tell Cassandra to use the NetworkTopologyStrategy to replicate data to the Cassandra/Hadoop cluster. This reduces load on the primary cluster, preventing runaway Hadoop jobs from having a detrimental impact on its performance.
Setup
Some documentation [ 1, 2 ] and the Cassandra High Performance Cookbook suggest putting a Task Tracker and Data Node on each Cassandra node. The Job Tracker and Name Node should be a single, separate machine.
This thread does a good job of explaining data locality in Hadoop and Cassandra.
Several talks advise using the multi-datacenter functionality of Cassandra to put Hadoop analytics in its own ring that's replicated from the main Cassandra ring. Tom Haddon has suggested that we will likely want to create a secondary Cassandra cluster anyway, for backup purposes, and that we could run the Hadoop analytics on top of that.
Task Tracker / Data Node (on Cassandra node)
sudo add-apt-repository "deb http://archive.cloudera.com/debian maverick-cdh3 contrib"
wget -O - http://archive.cloudera.com/debian/archive.key | sudo apt-key add -
sudo apt-get install hadoop-0.20-{datanode,tasktracker} openjdk-6-jre hadoop-pig
Edit /etc/default/hadoop-0.20, adding:
export JAVA_HOME="/usr/lib/jvm/java-6-openjdk-amd64/" export HADOOP_CLASSPATH=/usr/share/cassandra/*:/usr/share/cassandra/lib/*:$HADOOP_CLASSPATH
Edit /etc/hadoop-0.20/conf/core-site.xml (10.55.60.140 is the name node):
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Put site-specific property overrides in this file. --> <configuration> <property> <name>fs.default.name</name> <value>hdfs://10.55.60.140:8020</value> </property> </configuration>
Edit /etc/hadoop-0.20/conf/hdfs-site.xml:
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Put site-specific property overrides in this file. --> <configuration> <property> <name>dfs.replication</name> <value>1</value> </property> </configuration>
Edit /etc/hadoop-0.20/conf/mapred-site.xml:
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Put site-specific property overrides in this file. --> <configuration> <property> <name>mapred.job.tracker</name> <value>10.55.60.140:8021</value> </property> <!-- From Cassandra High Performance Cookbook. --> <!-- "Co-locating Hadoop Task Trackers on Cassandra nodes" --> <!-- Limit the resources available to the Task Tracker --> <property> <name>mapred.tasktracer.reduce.tasks.maximum</name> <value>1</value> </property> <property> <name>mapred.tasktracer.map.tasks.maximum</name> <value>3</value> </property> <property> <name>mapred.child.java.opts</name> <value>-Xmx150m</value> </property> </configuration>
sudo service hadoop-0.20-datanode start
sudo service hadoop-0.20-tasktracker start
Make sure you can access HDFS: sudo -u hdfs JAVA_HOME=/usr/lib/jvm/java-6-openjdk-amd64/ hadoop fs -ls /
- Locally, set up an SSH tunnel to the task tracker web interface:
ssh -N -L 50060:10.55.60.139:50060 10.55.60.139 # hadoop task tracker tunnel
Pig (on Task Tracker node)
export PIG_CONF_DIR=/etc/hadoop-0.20/conf
export PIG_HOME=/usr/lib/pig/
export PIG_INITIAL_ADDRESS=10.55.60.139
export PIG_RPC_PORT=9160
export PIG_PARTITIONER=org.apache.cassandra.dht.RandomPartitioner
cd; wget http://apache.mirrors.timporter.net/cassandra/1.1.6/apache-cassandra-1.1.6-src.tar.gz
tar zxvf apache-cassandra-1.1.6-src.tar.gz; cd apache-cassandra-1.1.6-src; ant
chmod +x examples/pig/bin/pig_cassandra
- As a test (once the Name Node / Job Tracker is set up):
export JAVA_HOME=/usr/lib/jvm/java-6-openjdk-amd64/ ./examples/pig/bin/pig_cassandra -x mapreduce grunt> rows = LOAD 'cassandra://crashdb/OOPS' USING CassandraStorage(); grunt> counted = foreach (group rows all) generate COUNT($1); grunt> dump counted;
Name Node / Job Tracker
sudo add-apt-repository "deb http://archive.cloudera.com/debian maverick-cdh3 contrib"
wget -O - http://archive.cloudera.com/debian/archive.key | sudo apt-key add -
sudo apt-get install hadoop-0.20-{namenode,jobtracker} openjdk-6-jre
Add export JAVA_HOME="/usr/lib/jvm/java-6-openjdk-amd64/" to /etc/default/hadoop-0.20
Edit /etc/hadoop-0.20/conf/core-site.xml:
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Put site-specific property overrides in this file. --> <configuration> <property> <name>fs.default.name</name> <value>hdfs://10.55.60.140:8020</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/hadoop/hadoop-${user.name}</value> </property> </configuration>
Edit /etc/hadoop-0.20/conf/hdfs-site.xml:
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Put site-specific property overrides in this file. --> <configuration> <property> <name>dfs.replication</name> <value>1</value> </property> </configuration>
Edit /etc/hadoop-0.20/conf/mapred-site.xml:
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <!-- Put site-specific property overrides in this file. --> <configuration> <property> <name>mapred.job.tracker</name> <value>10.55.60.140:8021</value> </property> </configuration>
sudo -u hdfs JAVA_HOME=/usr/lib/jvm/java-6-openjdk-amd64/ hadoop namenode -format
sudo service hadoop-0.20-namenode start
sudo -u hdfs JAVA_HOME=/usr/lib/jvm/java-6-openjdk-amd64/ hadoop fs -chmod 777 /
sudo service hadoop-0.20-jobtracker start
Check to make sure it's running: w3m http://localhost:50070
- Locally, set up an SSH tunnel to the job tracker web interface:
ssh -N -L 50030:10.55.60.140:50030 10.55.60.140 # hadoop job tracker tunnel
- Locally, set up an SSH tunnel to the name node web interface:
ssh -N -L 50070:10.55.60.140:50070 10.55.60.140 # hadoop name node tunnel
Pig natively
Cassandra 1.1 adds native support for Pig. This means that Pig can directly read and write to Cassandra without involving Hadoop. Instructions for setting this up can be found here.
However, running Pig directly on Cassandra doesn't seem to give us jobs that run independent of our local machines.
From #cassandra on Freenode (20121126):
[14:38:13] <ev> Is adding Hadoop job trackers on Cassandra nodes the only way to get M/R jobs running independent of a local machine? Is this goal not possible with just Pig talking straight to Cassandra natively using CassandraStorage? [14:39:15] <nickmbailey> no, you need a jobtracker [14:39:20] <nickmbailey> but only one [14:39:50] <nickmbailey> http://wiki.apache.org/cassandra/HadoopSupport [14:39:53] <ev> nickmbailey: excellent. Thank you. Does that also require HDFS? [14:39:59] <ev> for the job tracker state [14:41:28] <ev> err task tracker state [14:42:52] <nickmbailey> you need at least one data node somewhere as well yes [14:43:08] <ev> nickmbailey: thank you! That clarifies things quite a bit. [14:43:20] <nickmbailey> no problem