Working Relationally With Cassandra
I’ve spent the last 4 years working in the big data world with Cassandra because it’s the only practical solution if you have a requirement to scale out, uptime is a priority, and you need predictable performance. I’ve heard different ways of describing where Cassandra fits in your architecture, but I think the best way to think of it is close to your customer. Think of the servers your mobile apps communicate with or what holds your product inventory.
Part of the reality of having a linearly scalable system is that we acknowledge that a thing calls physics actually exists. Networks are not infinitely fast, hardware does fail, and even light must obey the speed limit. These are non perfect conditions that we must design our distributed systems around and there’s very little to suggest some cloud provider is going to find a way around it.
One of the design decisions of Cassandra was to favor denormalization (copying data to multiple places) over expensive network calls to perform joins. Logically it makes sense - a single request is going to be a lot faster than thousands of requests for data across potentially hundreds of machines with thousands of network round trips.
We usually tell people to model their data to satisfy the queries they want to perform. If you’re coming from a relational background, this may seem awkward at first. It requires that when you’re doing your initial data model you understand your queries up front.
Tip: To learn more about data modeling with Cassandra I recommend watching DS201: DataStax Enterprise Foundations of Apache Cassandra on DataStax Academy.
Of course, knowing all your queries up front isn’t always possible. While I’ve always found it to be reasonably straightforward to model tables for OLTP purposes, there’s usually a need to do data exploration or intense analytics across large datasets. In the past I’ve done things like ETL my Cassandra data out to another system which is built to handle these situations. This worked fine at the time but it came at the cost of maintaining a separate tool and process to shove the data in there. There’s also the time between ETL processes, where your analytics queries are done on stale data.
Fortunately in the last couple of years Spark has risen in popularity, and for a really good reason. Fast in-memory processing with support for functional programming, R-like dataframes, SQL, and machine learning all make Spark a great choice for distributed processing of big data.
In the last 2 years there’s been a lot of work done on the spark-cassandra-connector, available on GitHub as an open source project from DataStax (disclosure, I work at DataStax) to make Spark work really well with Cassandra.
In this post we’ll take a look at how to use the spark-sql with the connector to issue queries to our Cassandra cluster.
I’m using a Spark 1.6.1 cluster and Cassandra 3.6, but the techniques in this post will work with Cassandra 2.1.5 and greater. The dataset I’m referencing in this post is Cassandra 3.0 or greater as it uses the date
type only available in Cassandra 3.0 or greater.
I have loaded the movielens-small
dataset via Cassandra Dataset Manager (cdm). You can read up on cdm in my previous blog post.
If you’d like to follow along, simply do the following to install the movielens_small dataset to your local cassandra dev environment.
pip install cassandra-dataset-manager
cdm install movielens-small
For this post, I’m going to connect to my cluster using spark-sql. This runs the Hive metastore service in local mode and lets us execute queries input from the command line. In a later post I’ll explore other alternatives to connect from applications.
From the command line, let’s start spark-sql. I’ve built my connector from source, so I’m using the –jars flag. If you didn’t go this route you’d probably use --packages spark-cassandra-connector
instead:
spark-sql --jars spark-cassandra-connector-assembly-1.6.0-M2-25-gb476e45.jar --conf spark.cassandra.connection.host=127.0.0.1 --master spark://127.0.0.1:7077
First we register temporary tables. We’ll use the same names as the tables in our movielens_small schema. For reference, here’s our movies table, as per cqlsh:
cqlsh:movielens_small> desc movies;
CREATE TABLE movies (
id uuid PRIMARY KEY,
avg_rating float,
genres set<text>,
name text,
release_date date,
url text,
video_release_date date
)
Within spark-sql, you can register this table, referencing the Cassandra connector, keyspace and table as follows. I’ll go ahead and register our ratings_by_movie table as well. I’ve cleaned up the output a little to try to minimize confusion.
spark-sql> CREATE TEMPORARY TABLE movies
USING org.apache.spark.sql.cassandra
OPTIONS (
table "movies",
keyspace "movielens_small",
cluster "Test Cluster",
pushdown "true"
);
Time taken: 1.823 seconds
spark-sql> CREATE TEMPORARY TABLE ratings_by_movie
USING org.apache.spark.sql.cassandra
OPTIONS (
table "ratings_by_movie",
keyspace "movielens_small",
cluster "Test Cluster",
pushdown "true"
);
Time taken: 0.15 seconds
We can now execute SQL queries against these tables. For instance, let’s calculate the average rating per movie and return the top 5 movies in our database:
spark-sql> SELECT name, avg(rating) as ar
FROM ratings_by_movie
JOIN movies ON ratings_by_movie.movie_id = movies.id
GROUP BY name
ORDER BY ar DESC LIMIT 5;
Santa with Muscles (1996) 5.0
Aiqing wansui (1994) 5.0
Saint of Fort Washington, The (1993) 5.0
Someone Else's America (1995) 5.0
Entertaining Angels: The Dorothy Day Story (1996) 5.0
Time taken: 1.553 seconds, Fetched 5 row(s)
Cool, we can JOIN, aggregate, and sort our Cassandra data. Sadly this small dataset doesn’t accurately reflect the real world, as we all know Santa with Muscles was total crap and is obviously not one of the best movies of all time.
Fortunately our Cassandra data model has the avg_rating
handy so we don’t have to calculate this on every query. We’ll use it in the next example.
Let’s do something a little more interesting. Let’s pull back the top 3 movies in each genre. For that we’ll need to use the explode()
function to produce a row per genre a movie is in, then rank()
, an aggregation function that’s built for this exact use case.
SELECT * from (
SELECT name, avg_rating, explode(genres) as genre, rank()
OVER (PARTITION BY genre ORDER BY avg_rating DESC) as rank
FROM movies
) tmp
WHERE rank <= 3
ORDER BY genre, rank;
The results come back immediately:
Star Wars (1977) 4.3584905 Action 1
Godfather, The (1972) 4.283293 Action 2
Raiders of the Lost Ark (1981) 4.252381 Action 3
Star Kid (1997) 5.0 Adventure 1
Star Wars (1977) 4.3584905 Adventure 2
Raiders of the Lost Ark (1981) 4.252381 Adventure 3
...
High Noon (1952) 4.1022725 Western 1
Wild Bunch, The (1969) 4.023256 Western 2
Butch Cassidy and the Sundance Kid (1969) 3.949074 Western 3
unknown 3.4444444 unknown 1
Good Morning (1971) 1.0 unknown 2
Time taken: 1.608 seconds, Fetched 60 row(s)
Hopefully at this point you feel comfortable with the idea of firing up the spark-sql shell, registering temporary tables, and performing SQL queries against them.
If you’re more comfortable working with a vendor, check out DataStax Enterprise, where all this works out of the box without any extra configuration.
If you found this post helpful, please consider sharing to your network. I'm also available to help you be successful with your distributed systems! Please reach out if you're interested in working with me, and I'll be happy to schedule a free one-hour consultation.