Cassandra + PySpark DataFrames Revisited

A little while back I wrote a post on working with DataFrames from PySpark, using Cassandra as a data source. DataFrames are, in my opinion, a fantastic, flexible API that makes Spark roughly 14 orders of magnitude nicer to work with than RDDs. When I wrote the original post, the only way to work with DataFrames from PySpark was to get an RDD back and call toDF() on it.
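
For reference, here's roughly what that old pattern looked like - a toy sketch using a throwaway local RDD and made-up rows rather than an actual Cassandra read, just to show the shape of it:

from pyspark.sql import Row, SQLContext

# Toy illustration of the old toDF() pattern (no Cassandra involved).
# With a real Cassandra RDD, every row would first be serialized across
# the Py4J gateway before the DataFrame ever existed.
sql = SQLContext(sc)   # sc is the SparkContext provided by the pyspark shell
rows = sc.parallelize([Row(user_id=1, name="Jon", age=34),
                       Row(user_id=2, name="Dani", age=22)])
old_style = rows.toDF()
old_style.show()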

Sounds freaking amazing - what’s the problem?

Well, there's a lot of overhead here. Getting an RDD back and transforming it into a DataFrame means doing the query in the JVM, serializing about a gazillion objects to send to the Python VM over the Java gateway server, deserializing them with Py4J, then re-encoding the entire thing and sending it back to the JVM. Yikes.

The connector now lets us work with DataFrames natively, which is awesome because they expose a very sensible API that's also extremely performant. Instead of doing the filters and calculations on the Python side, predicates and commands are pushed into the JVM. We avoid all the Python overhead and still get to use the libraries we're familiar with, like matplotlib, Pandas, and Seaborn, later on.

Let’s get our hands dirty

First, you'll need to get the connector. You can build it from source or grab a prebuilt version I put up on S3:

wget https://s3.amazonaws.com/haddad.public/spark-cassandra-connector-assembly-1.4.0-M1-SNAPSHOT.jar

For my examples, I’ve set up a keyspace with some sample data.

CREATE KEYSPACE training WITH replication =
   {'class': 'SimpleStrategy', 'replication_factor': 1};

USE training;

CREATE TABLE user (
    user_id int PRIMARY KEY,
    name text,
    age int,
    favorite_foods set<text>
);

INSERT INTO user (user_id, name, age, favorite_foods)
    VALUES (1, 'Jon', 34, {'Bacon', 'Cheese'});
INSERT INTO user (user_id, name, age, favorite_foods)
    VALUES (2, 'Dani', 22, {'Wine', 'Kale', 'Pizza'});
INSERT INTO user (user_id, name, age, favorite_foods)
    VALUES (3, 'Patrick', 108, {'Pie', 'Steak', 'Muffins'});
INSERT INTO user (user_id, name, age, favorite_foods)
    VALUES (4, 'Baby Luke', 1, {'Candy', 'Fear'});
INSERT INTO user (user_id, name, age, favorite_foods)
    VALUES (5, 'Larry', 10, {'Anger'});

I set up a zsh function to launch PySpark with the connector on the classpath. You'll need to set SPARK_CONNECTOR_JAR to point to wherever you put the connector JAR. I also prefer to use IPython notebooks instead of a plain Python shell.

export PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook"

pys () {
    # --jars ships the connector JAR to the executors;
    # --driver-class-path makes it visible to the driver as well.
    pyspark --jars $SPARK_CONNECTOR_JAR \
            --driver-class-path $SPARK_CONNECTOR_JAR \
            --conf spark.cassandra.connection.host=127.0.0.1 \
            --master spark://127.0.0.1:7077
}

Start your notebook up by running pys in a shell. You'll need a handful of Python packages installed: start with IPython, tornado, pyzmq, jinja2, and jsonschema. I recommend using a virtualenv.

The first thing we'll want to do is import and create an SQLContext. The SQLContext is the entry point that lets us load data sources as DataFrames and register them as tables.

from pyspark.sql import SQLContext
sql = SQLContext(sc)

When we want to read a DataFrame out of Cassandra, we use sql.read.format(), passing the connector's data source class, and then call .load() with our keyspace and table.

user = sql.read.format("org.apache.spark.sql.cassandra").\
               load(keyspace="training", table="user")
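
A quick sanity check I like to do (my own habit, not part of the original flow): printSchema() shows the columns and types the connector mapped from the Cassandra table.

user.printSchema()   # columns and types inferred from the Cassandra schema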

We can now perform our filters, aggregations, joins, etc., on our DataFrames. It's important to reiterate that .where() pushes the predicate into the JVM for filtering.

adults = user.where(user.age > 21)

We can make PySpark execute the query and display the results by calling .show():

adults.show()
+-------+---+--------------------+-------+
|user_id|age|      favorite_foods|   name|
+-------+---+--------------------+-------+
|      1| 34|ArrayBuffer(Bacon...|    Jon|
|      2| 22|ArrayBuffer(Kale,...|   Dani|
|      3|108|ArrayBuffer(Muffi...|Patrick|
+-------+---+--------------------+-------+
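
Filters aren't the only thing we get; aggregations and ordering work on the same DataFrame too. Here's a small sketch (my own addition, not from the original post) - the work happens in the JVM and Python only sees the final results:

from pyspark.sql import functions as F

# Average age across all users, then the users ordered oldest-first.
user.agg(F.avg("age").alias("avg_age")).show()
user.orderBy(user.age.desc()).select("name", "age").show()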

We can register a DataFrame as a temporary table, which allows us to perform SQL on it.

user.registerTempTable("user")
sql.sql("SELECT * FROM user WHERE age > 21").show()
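
Earlier I mentioned matplotlib, Pandas, and Seaborn. Once a DataFrame has been filtered down to something small, toPandas() collects it to the driver as a regular Pandas DataFrame - a rough sketch, and keep in mind everything gets pulled back to one machine:

# Only do this with small, already-filtered results.
adults_pdf = adults.toPandas()
adults_pdf.plot(kind="bar", x="name", y="age")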

Let's say we want to create a new table and populate it from existing data. We'll create a lookup table that lets us find users by their age.

CREATE TABLE users_by_age (
    age int,
    user_id int,
    name text,
    PRIMARY KEY (age, user_id));

We can write a DataFrame out to a table using .write, with the Cassandra connector as the argument to .format().

user.select("age", "user_id", "name").write.\
    format("org.apache.spark.sql.cassandra").\
    options(table="users_by_age", keyspace="training").\
    save(mode="append")
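
To double check that the write landed (a quick sanity check, not part of the original flow), we can read the new table back the same way we read user:

users_by_age = sql.read.format("org.apache.spark.sql.cassandra").\
                   load(keyspace="training", table="users_by_age")
users_by_age.show()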

Next steps

If you’ve got a mostly OLTP workload, Cassandra is a great database with a lot of momentum. Take the time to get familiar with CQL and Cassandra’s data modeling. Leverage the free DataStax Academy to learn from the experts. Once you understand Cassandra, adding on Spark lets you solve analytical challenges.

I've set up a training repo containing an IPython notebook you can use to get familiar with the various commands and features of Spark. Clone it and work through the examples; by the end you'll be a PySpark pro.

If you found this post helpful, please consider sharing it with your network. I'm also available to help you be successful with your distributed systems! Please reach out if you're interested in working with me, and I'll be happy to schedule a free one-hour consultation.