Migrating From MySQL to Cassandra Using Spark
MySQL is a popular choice for new projects. It’s a flexible database that’s easy to set up and start querying. There’s loads of documentation, examples, and frameworks it works with, such as WordPress, Pandas, Ruby on Rails, and Django.
From the above paragraph it reads like a pretty fantastic database, and at small scale it can be great. The problem arises when you need to scale past a single server or have high availability needs. MySQL’s solution to both of these needs is replication. Replication is OK at handling read-heavy workloads in a single datacenter, but it falls on its face under heavy writes or if you need multiple datacenters. Fortunately, Cassandra excels at scalability and high availability. It’s a common story for people to migrate from a relational database to Cassandra for one or both of these reasons. (For further reading on choosing Cassandra even with small datasets, read Matt Kennedy’s Little Big Data article.)
Let’s pretend we run a site which lists movies and lets our users rate them. We’ve got some interesting metrics we let users dig into. We’re starting to get popular and we want to have multiple data centers around the world. This just isn’t feasible in MySQL without a dedicated, specialized team, so we turn to Cassandra, which supports this out of the box and is easy to set up.
We’ll create a schema based on the MovieLens dataset. It’s pretty straightforward. We’ve got movies and ratings. Movies have multiple genres, which we have to split out into a separate table since it’s a one-to-many relationship. There’s a many-to-many relationship as well: users to movies, via ratings, and we need to be able to query that table multiple ways. I’ve imported the latest dataset into MySQL, which is composed of 27K movies and 22 million ratings. Mmmmm… delicious data.
In an ideal world, we’d do our migration with minimal risk and zero downtime. I wrote a post on how this is possible in June 2014. I’ll assume going forward that we’ll be migrating our data this way and won’t explicitly discuss it at each step.
Initial Schema
Here’s our movies table. It’s our source-of-reference table for our movie data, and it’s pretty simple: just a movie_id and a title. We’ve got about 27K movies in here.
mysql> desc movies;
+----------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+----------+--------------+------+-----+---------+-------+
| movie_id | int(11) | NO | PRI | NULL | |
| title | varchar(255) | YES | | NULL | |
+----------+--------------+------+-----+---------+-------+
2 rows in set (0.00 sec)
mysql> select count(*) from movies;
+----------+
| count(*) |
+----------+
| 27303 |
+----------+
1 row in set (0.01 sec)
Our genre table is simple. I’ve created an index so we can easily query for movies in a particular genre or get all the genres a movie is in.
mysql> desc genre;
+----------+--------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+----------+--------------+------+-----+---------+-------+
| movie_id | int(11) | NO | PRI | 0 | |
| genre | varchar(255) | NO | PRI | | |
+----------+--------------+------+-----+---------+-------+
2 rows in set (0.01 sec)
It’s pretty easy to select all the movies in a genre:
mysql> select * from movies join genre using (movie_id)
where genre = 'Adventure' limit 5;
+----------+---------------------+-----------+
| movie_id | title | genre |
+----------+---------------------+-----------+
| 1 | Toy Story (1995) | Adventure |
| 2 | Jumanji (1995) | Adventure |
| 8 | Tom and Huck (1995) | Adventure |
| 10 | GoldenEye (1995) | Adventure |
| 13 | Balto (1995) | Adventure |
+----------+---------------------+-----------+
5 rows in set (0.00 sec)
If we want to get a list of the genres for a movie we can do something like this:
mysql> select * from movies join genre using (movie_id)
where movie_id = 1;
+----------+------------------+-----------+
| movie_id | title | genre |
+----------+------------------+-----------+
| 1 | Toy Story (1995) | Adventure |
| 1 | Toy Story (1995) | Animation |
| 1 | Toy Story (1995) | Children |
| 1 | Toy Story (1995) | Comedy |
| 1 | Toy Story (1995) | Fantasy |
+----------+------------------+-----------+
5 rows in set (0.00 sec)
We’d like to know how many movies are in each genre. Using GROUP BY in SQL is straightforward:
mysql> select genre, count(genre) as c from genre
group by (genre)
order by c desc limit 5;
+----------+-------+
| genre | c |
+----------+-------+
| Drama | 14287 |
| Comedy | 9115 |
| Thriller | 4654 |
| Romance | 4386 |
| Action | 3924 |
+----------+-------+
5 rows in set (0.05 sec)
I’ll note that the above query can be very expensive when there’s a lot of data. The solution to this sort of thing is either to use a movies_per_genre table or to cache our data somewhere else. Either way, in a popular system, live aggregations are typically a bad idea and should be avoided.
Prerequisites
Before we get started, there are a few things you’re going to need if you want to follow along or perform a migration of your own. I strongly recommend reading my post on DataFrames with Spark, as this post is a little more advanced and I won’t be covering all the basics again. I also recommend at least a basic understanding of Cassandra data modeling, as I won’t be covering it in depth in this post.
First, download the MySQL JDBC connector from MySQL.com. Put it somewhere and make a note of the exact location.
Next, you’ll need the DataStax Cassandra Connector for Spark. I’ve cloned the repo and built it using sbt assembly. You’ll want the full path to spark-cassandra-connector-assembly-1.5.0-M1-SNAPSHOT.jar.
You’ll also want to have a Spark cluster running. It’s possible to do this on a local cluster but unless you’ve got a ton of RAM or a tiny dataset, you’re going to hit some performance issues, or just flat out run out of memory. Don’t do that.
It should go without saying that you also need a MySQL server you’re migrating off of, and a Cassandra cluster you’re migrating to.
Starting PySpark
You’ll need to ensure that the Cassandra and MySQL libraries are included when you launch your Spark job. Unfortunately there’s a bug in PySpark where it doesn’t correctly pick up the libraries from --jars, so you’ll have to list them in --driver-class-path as well, as I have below. Here’s how I start the PySpark CLI in a Jupyter notebook (note the comma separating the JARs in --jars and the colon separating them in --driver-class-path). The MySQL JAR and the Cassandra JAR are in my current directory.
export PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook"
pyspark --jars spark-cassandra-connector-assembly-1.5.0-M1-SNAPSHOT.jar,mysql-connector-java-5.1.36-bin.jar \
--driver-class-path "spark-cassandra-connector-assembly-1.5.0-M1-SNAPSHOT.jar:mysql-connector-java-5.1.36-bin.jar" \
--conf spark.cassandra.connection.host=127.0.0.1 \
--master spark://127.0.0.1:7077
If you want to use Jupyter notebooks, you’ll need to install all the requirements for iPython notebooks.
Substitute your own JAR paths and Spark master IP address for mine. I realize it’s an annoying wall of text; just throw it in a shell script and don’t bother trying to memorize it at this point.
Cassandra Schema
In the Cassandra world, we model our tables to be optimized to answer specific queries. If you haven’t looked at Cassandra in a while, you may be surprised to see a SQL-like language, called CQL, used to define our tables. We get the benefits of a structured schema and a scalable distributed database. Awesome. As I said before, I won’t get too deep into the details of data modeling here; if you want to learn, you should watch Cassandra Core Concepts on DataStax Academy.
Cassandra has a much more flexible type system than MySQL, supporting maps, sets, and lists. We know that when we fetch a movie we’ll want to see its genres. It’s more efficient to store the list of a movie’s genres with the movie itself, so we’ll make use of Cassandra’s sets. We’ll also store the average rating and total number of ratings for the movie. When someone rates a movie, we can go ahead and update the average as well as the total.
CREATE TABLE movies (
movie_id int primary key,
title text,
avg_rating decimal,
total_ratings int,
genres set<text>
);
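As an aside, here’s a minimal sketch of that bookkeeping in plain Python. The apply_new_rating helper is hypothetical, just to show the arithmetic the application would do on each new rating:
# a sketch of folding one new rating into the stored aggregate;
# apply_new_rating is a made-up helper, not part of the migration
def apply_new_rating(avg_rating, total_ratings, new_rating):
    total = total_ratings + 1
    avg = (avg_rating * total_ratings + new_rating) / total
    return avg, total

# a movie averaging 4.0 over 3 ratings that receives a 2.0 moves to 3.5 over 4
print(apply_new_rating(4.0, 3, 2.0))  # (3.5, 4)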
I mentioned above we wanted to be able to get a list of all the movies in a genre. That’s easy to define in CQL and also extremely efficient. We’ll use what’s called a clustering key to make sure that all our movies in a particular genre are stored together.
create table genre_to_movies (
genre text,
movie_id int,
title text,
primary key (genre, movie_id)
);
A feature of our current movie site is the ability to see how many movies are in each genre. We’ll store that in Cassandra as a table. When we add new movies, we’ll have to update the count in this table.
CREATE TABLE movies_per_genre (
genre text primary key,
movies int
);
We need to store ratings per movie, but we also need to be able to query for all the movies a user has rated. We’ll create two tables and store the data both ways. This might make you nervous - isn’t it wasteful to store the same data twice? You don’t have to do this with a relational database, right?
WRONG. That’s exactly what indexes do. People always forget that indexes come at a pretty serious cost. Fortunately in Cassandra, writes are really cheap and doing 2 writes whenever someone rates a movie is still going to be a lot faster than how a relational database would do things.
create table ratings_by_movie (
movie_id int,
user_id int,
rating decimal,
ts int,
primary key(movie_id, user_id)
);
create table ratings_by_user (
user_id int,
movie_id int,
rating decimal,
ts int,
primary key(user_id, movie_id)
);
Using this data model we can easily answer both “who rated this movie” and “which movies has this user rated” and do so efficiently.
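To make the dual write concrete, here’s a minimal sketch using the DataStax Python driver. The contact point and the rate_movie helper are assumptions for illustration; this is application code, not part of the migration itself:
from decimal import Decimal
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement

# placeholder contact point; substitute your own cluster
session = Cluster(["127.0.0.1"]).connect("lens")

by_movie = session.prepare(
    "INSERT INTO ratings_by_movie (movie_id, user_id, rating, ts) VALUES (?, ?, ?, ?)")
by_user = session.prepare(
    "INSERT INTO ratings_by_user (user_id, movie_id, rating, ts) VALUES (?, ?, ?, ?)")

def rate_movie(user_id, movie_id, rating, ts):
    # rating is a Decimal to match the CQL decimal column;
    # both inserts go out together, and each write is cheap
    batch = BatchStatement()
    batch.add(by_movie, (movie_id, user_id, rating, ts))
    batch.add(by_user, (user_id, movie_id, rating, ts))
    session.execute(batch)

rate_movie(2, 1, Decimal("5.0"), 1084988984)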
Preparing
Here we’re going to set up our environment. That means getting ahold of the DataFrames we’re going to need throughout this migration, and massaging our data to make it easier to store when we’re ready. I’ve fired up an iPython notebook using that terrifying pyspark command listed above.
Like good Python citizens, we put all our imports at the top. We’ll need these later on.
from pyspark.sql import SQLContext
from pyspark.sql import DataFrame
from pyspark.sql import Row
from pyspark.sql import functions as F
Our trusty SQLContext. We need it to use DataFrames. You can read more about DataFrames in my previous blog post.
sql = SQLContext(sc)
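One thing I glossed over: getting the MySQL tables into DataFrames in the first place. Here’s a sketch of the JDBC reads; the URL, database name (movielens), credentials, and the ratings table name are placeholders, so swap in your own:
# a sketch of loading the source tables over JDBC; connection
# details here are placeholders for your own MySQL server
def load_table(table):
    return sql.read.format("jdbc").options(
        url="jdbc:mysql://127.0.0.1:3306/movielens?user=root&password=",
        dbtable=table,
        driver="com.mysql.jdbc.Driver").load()

movies = load_table("movies")
genres = load_table("genre")
ratings = load_table("ratings")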
We’re going to need to get a list of genres for each movie. This is known as collect_set and collect_list in Hive, but isn’t available in Spark DataFrames yet. There’s an open issue in the Spark Jira to add this in 1.6. For now, we’ll need to work with RDDs. There’s significant overhead here, but since this is a migration where we’re going to backfill data (a one-time process) we can let it slide.
# we need to work with regular RDDs here because there's not an operation to
# aggregate values into a list
tuples = genres.rdd.map(lambda x: (x[0], x[1]))

def seq(x, y):
    # seq folds the original type (a genre string) into the aggregate (a list)
    x.append(y)
    return x

def comb(x, y):
    # comb takes two of the new list type and combines them
    x.extend(y)
    return x

genres_sets_rdd = tuples.aggregateByKey(list(), seq, comb).\
    map(lambda x: Row(movie_id=x[0], genres=x[1]))
genre_sets = sql.createDataFrame(genres_sets_rdd)
The above code is a little tricky. What it does is allow us to aggregate a bunch of values in separate rows into a list of values in a single row. We temporarily use the RDD API, then convert our data back into a DataFrame to be used optimally.
The original data had one row for each genre entry in a movie. It looked like this:
genres.show(5)
+--------+---------+
|movie_id|    genre|
+--------+---------+
|       1|Adventure|
|       1|Animation|
|       1| Children|
|       1|   Comedy|
|       1|  Fantasy|
+--------+---------+
We’ve collected all the genres and put them in a list, and the result looks like this:
genre_sets.head(5)
[Row(genres=[u'Comedy'], movie_id=131072),
 Row(genres=[u'Adventure', u'Animation', u'Children', u'Comedy', u'Fantasy'], movie_id=1),
 Row(genres=[u'Adventure', u'Children', u'Fantasy'], movie_id=2),
 Row(genres=[u'Comedy', u'Romance'], movie_id=3),
 Row(genres=[u'Comedy', u'Drama', u'Romance'], movie_id=4)]
When we start writing to Cassandra we’ll join this dataset against our original movies set, and store the genres with the movie data for performance.
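As an aside, once collect_set lands in Spark DataFrames, the whole aggregateByKey dance above should collapse into a single groupBy, something like this (hypothetical, since 1.6 isn’t out as I write this):
# hypothetical Spark 1.6+ version, once collect_set is exposed on DataFrames
genre_sets = genres.groupBy("movie_id").agg(
    F.collect_set(genres.genre).alias("genres"))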
Next we’ll calculate the statistics for each movie - specifically the average rating and the total number of ratings. We’re going to make use of the DataFrame groupBy to aggregate our stats by movie_id. We’ll also use the pyspark.sql.functions module, specifically the functions that let us average and count. We’ll set aliases on those columns to match our Cassandra schema.
avg_ratings = ratings.groupBy("movie_id").\
agg(F.avg(ratings.rating).alias("avg_rating"),
F.count(ratings.rating).alias("total_ratings"))
avg_ratings.show(10)
+--------+------------------+-------------+
|movie_id|        avg_rating|total_ratings|
+--------+------------------+-------------+
|    5031|3.2794117647058822|           34|
|   81831|3.0833333333333335|            6|
|  113831|               3.5|            1|
|     431|3.6857084979236303|         9873|
|    6831| 2.227272727272727|           11|
|   70831|               3.5|            9|
|   77231|               2.5|            3|
|  115631|               0.5|            1|
|    2231| 3.724941564691324|         7273|
|    8631|           3.33125|           80|
+--------+------------------+-------------+
Migration
Now that we’ve got all our data set up as DataFrames, it should be easy to denormalize it and save it out in whatever schema we’re interested in. We’ll use the Cassandra connector to write out the data to the tables I created above.
Movies
First up, we’ll write out our new movies table. We’ll make use of the join() operation on DataFrames to pull in the list of genres and rating statistics. join() works like it does in SQL, except here it’s expressed programmatically. You could write actual SQL here as well; I just opted to use this. The select() at the end is exactly like the SELECT fields clause in a SQL statement, only pulling out the fields I’m interested in saving to Cassandra.
new_movies = movies.join(avg_ratings, "movie_id").\
join(genre_sets, "movie_id").\
select(movies.movie_id,
movies.title,
avg_ratings.avg_rating,
genre_sets.genres,
avg_ratings.total_ratings)
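Since I mentioned you could write actual SQL instead, here’s roughly what that would look like with temp tables. This is a sketch, not how I actually ran it:
# register the DataFrames so they're visible to the SQL engine (Spark 1.x API)
movies.registerTempTable("movies")
avg_ratings.registerTempTable("avg_ratings")
genre_sets.registerTempTable("genre_sets")

new_movies = sql.sql("""
    SELECT m.movie_id, m.title, r.avg_rating, g.genres, r.total_ratings
    FROM movies m
    JOIN avg_ratings r ON m.movie_id = r.movie_id
    JOIN genre_sets g ON m.movie_id = g.movie_id""")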
Let’s also give a quick spot check to see what we’ve got for data:
new_movies[new_movies.title.startswith("Back to the Future")].\
show()
+--------+--------------------+------------------+--------------------+-------------+
|movie_id|               title|        avg_rating|              genres|total_ratings|
+--------+--------------------+------------------+--------------------+-------------+
|    1270|Back to the Futur...|3.9144839785019387|ArrayBuffer(Adven...|        44097|
|    2011|Back to the Futur...|3.4367573406658156|ArrayBuffer(Adven...|        20366|
|    2012|Back to the Futur...| 3.240788975184694|ArrayBuffer(Adven...|        21116|
+--------+--------------------+------------------+--------------------+-------------+
The output looks like crap here unfortunately, but I think it’s good enough to recognize that it’s working. We’ll write out our DataFrame using the .write call, making sure to pass our Cassandra class to the format() method. As long as the field names and types in the DataFrame match the fields and types of the table specified in options(), it’ll just write everything out for you and you’re done with this part.
new_movies.write.format("org.apache.spark.sql.cassandra").\
options(keyspace="lens", table="movies").\
save(mode="append")
On the Cassandra side, in a single query, we can retrieve all the movie information we’re typically interested in, eliminating the need for a JOIN or aggregation:
cqlsh:lens> select * from movies where movie_id = 1270;
movie_id | avg_rating | genres | title | total_ratings
----------+--------------------+-----------------------------------+---------------------------+---------------
1270 | 3.9144839785019387 | {'Adventure', 'Comedy', 'Sci-Fi'} | Back to the Future (1985) | 44097
(1 rows)
Genres
We want two views into our genres, aside from the genres per movie, which we’ve covered above. First, all the movies in a particular genre. We can match the original genres table against our movies table and write that out to our genre_to_movies table, which we can then query quickly.
genres.join(movies, "movie_id").\
write.format("org.apache.spark.sql.cassandra").\
options(keyspace="lens", table="genre_to_movies").\
save(mode="append")
Checking the table in cqlsh, we can relax. Jumanji made it in OK.
cqlsh:lens> select * from genre_to_movies
where genre = 'Adventure' limit 5;
genre | movie_id | title
-----------+----------+---------------------
Adventure | 1 | Toy Story (1995)
Adventure | 2 | Jumanji (1995)
Adventure | 8 | Tom and Huck (1995)
Adventure | 10 | GoldenEye (1995)
Adventure | 13 | Balto (1995)
(5 rows)
Our other view, the total number of movies in each genre, is trivial to write out:
# movies per genre
genres.groupBy("genre").\
agg(F.count(genres.movie_id).alias("movies")).\
write.format("org.apache.spark.sql.cassandra").\
options(keyspace="lens", table="movies_per_genre").save(mode="append")
Remember, most movies are in more than one genre, so don’t get freaked out that the numbers don’t add up to the total movie count.
cqlsh:lens> select * from movies_per_genre ;
genre | movies
--------------------+--------
Musical | 1036
Comedy | 8378
Action | 3522
War | 1195
Horror | 2612
IMAX | 196
Mystery | 1516
Drama | 13355
Romance | 4131
Documentary | 2471
(no genres listed) | 250
Children | 1140
Thriller | 4183
Crime | 2941
Animation | 1027
Film-Noir | 330
Sci-Fi | 1743
Fantasy | 1412
Adventure | 2330
Western | 676
(20 rows)
User & Movie Ratings
Fortunately our ratings are already stored in a table that’s easy to migrate over. In fact, we can keep almost the same schema. We’ll write out two versions of the table, one optimized for each of the queries we’ll do.
ratings.write.format("org.apache.spark.sql.cassandra").\
options(keyspace="lens", table="ratings_by_movie").\
save(mode="append")
ratings.write.format("org.apache.spark.sql.cassandra").\
options(keyspace="lens", table="ratings_by_user").\
save(mode="append")
cqlsh:lens> select * from ratings_by_movie where movie_id = 1 limit 5;
movie_id | user_id | rating | ts
----------+---------+--------+------------
1 | 2 | 5.0 | 1084988984
1 | 4 | 5.0 | 938694800
1 | 9 | 4.0 | 864902809
1 | 12 | 2.0 | 858624872
1 | 16 | 3.0 | 863706156
(5 rows)
We can see that the user with user_id = 2 has rated Toy Story. If we query the opposite view, by user, we’ll see Toy Story shows up here:
cqlsh:lens> select * from ratings_by_user where user_id = 2 limit 5;
user_id | movie_id | rating | ts
---------+----------+--------+------------
2 | 1 | 5.0 | 1084988984
2 | 32 | 4.5 | 1084986679
2 | 48 | 2.0 | 1084986403
2 | 50 | 5.0 | 1084990478
2 | 110 | 4.5 | 1084990717
(5 rows)
If we wanted, we could have easily included the movie name in the ratings_by_user table, so that in one query we could get a list of all the movies a person rated along with the movie information.
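Here’s a sketch of what that write would look like, assuming we had added a title column to ratings_by_user:
# assumes ratings_by_user has an extra "title text" column
ratings.join(movies, "movie_id").\
    select("user_id", "movie_id", "rating", "ts", "title").\
    write.format("org.apache.spark.sql.cassandra").\
    options(keyspace="lens", table="ratings_by_user").\
    save(mode="append")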
A note on data modeling
I’ve used int as the primary key type above. This is fine if your data is coming from an external source, but if you’re currently generating auto-incrementing IDs you’ll want to move to UUIDs when you migrate to Cassandra. This is outside the scope of this tutorial, but it’s something to be aware of when you’re doing your data model for Cassandra.
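For example, the column would be declared as uuid in CQL (or timeuuid if you want time-ordered IDs), and the application would generate them client-side. A quick sketch:
# client-side id generation; no coordination between nodes required
import uuid

new_user_id = uuid.uuid4()   # random UUID, for a uuid column
time_based = uuid.uuid1()    # time-based UUID, usable for timeuuid columns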
Conclusion
Well, at this point we’ve migrated a few tens of millions of rows out of MySQL. We can now set up a second data center and be ridiculously highly available. We’ll continue to rely on Spark for any analytic queries we need. Maybe next we’ll add a recommendation engine to suggest movies to people based on what others like. We could mix in Spark Streaming to aggregate ratings on the fly or build interesting windowed dashboards. Hit me up on Twitter if you’ve got questions, I’m @rustyrazorblade.
If you found this post helpful, please consider sharing to your network. I'm also available to help you be successful with your distributed systems! Please reach out if you're interested in working with me, and I'll be happy to schedule a free one-hour consultation.