In this post I’ll walk through the process of reading in various plain text database files using Pandas, and then joining together the different DataFrames. All my work was done through an IPython notebook.

I decided to mess around with the labor statistics database that’s up on Amazon. My end goal was to save all the relevant information into Cassandra for future analysis with PySpark. If the files were bigger, I’d do all the initial loading with PySpark, but they’re pretty small and Pandas has a lot of functionality that’s still missing on the Spark side.

I launched an EC2 image in us-east, mounted the snapshot and rsynced the files to my desktop. It’s about 15GB.

The file formats here are a bit inconsistent. It’s mostly fixed width but for some reason one of them is tab delimited. Fortunately Pandas is pretty flexible and can load pretty much anything you’ve got.

Obviously in order to use Pandas, we have to import it:

import pandas
path = "/Users/jhaddad/datasets/labor/data/time.series/{}"

First we read in the series data. You can see how we only have an item_code, not the actual item. For some reason the names at the top of the file are tab separated and are completely inconsistent with the rest of the data. I’ll just provide the field names and skip the header row. Since this is a CSV-like file we can use read_csv and just tell it we’re using tabs. We get back a DataFrame, which is one of the Pandas core data types.

names = ["series_id", "area_code", "item_code", 
         "footnote_codes", "begin_year",
         "begin_period", "end_year", "end_period"]

series = pandas.read_csv(path.format("ap/ap.series"),
                         sep="\t", names=names,
                         skiprows=1)

       series_id  area_code  item_code  footnote_codes  begin_year  begin_period  end_year  end_period
0  APU0000701111       0000     701111             NaN        1980           M01      2008         M09
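If you want to see how the `sep`, `names`, and `skiprows` arguments interact without downloading the dataset, here's a self-contained sketch. The rows and column names are made up; `StringIO` stands in for the file on disk:

```python
import pandas
from io import StringIO

# made-up tab-delimited sample with a throwaway header row,
# mirroring the layout of ap.series
raw = StringIO("id\tcode\n"
               "S1\t701111\n"
               "S2\t701312\n")

# skiprows=1 drops the inconsistent header; names supplies our own;
# dtype=str keeps codes like "701111" from being parsed as integers
sample = pandas.read_csv(raw, sep="\t",
                         names=["series_id", "item_code"],
                         skiprows=1, dtype=str)
```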
The file has a few oddities, one of them being a spacing issue: for some reason there was an extra space in the item_code. Rather than adjust my widths to work around it, I ended up .map()ing a strip call to all the elements. We’ll also set series_id as the index, since it uniquely identifies each record.

# not sure why i'm getting extra spaces, cleaning that up
series["item_code"] = series["item_code"].\
                        map(lambda x: str(x).strip())
series.set_index("series_id", inplace=True)
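As an aside, Pandas also has a vectorized alternative to map() for string cleanup like this: the .str accessor. A minimal sketch, using a made-up frame standing in for the series data:

```python
import pandas

# hypothetical one-row frame with the same extra-space problem
cleaned = pandas.DataFrame({"series_id": ["APU0000701111"],
                            "item_code": [" 701111 "]})

# .str.strip() is the vectorized equivalent of
# map(lambda x: str(x).strip()) for a string column
cleaned["item_code"] = cleaned["item_code"].str.strip()
cleaned.set_index("series_id", inplace=True)
```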

Next we read in areas and set the index just like before. The index needs to be on the primary key for the join we’ll do later; without identifying the key, the join won’t work.

area = pandas.read_fwf(path.format("ap/ap.area"),
                       names=["area_code", "area_name"])
area.set_index("area_code", inplace=True)

area_code  area_name
0000       U.S. city average
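Here's a self-contained sketch of how read_fwf carves a line into fixed-width fields, with a couple of sample rows in place of the real file (StringIO stands in for the path):

```python
import pandas
from io import StringIO

# sample fixed-width rows: a code in the first 5 columns,
# then a name in the rest of the line
raw = StringIO("0000 U.S. city average\n"
               "0100 Northeast urban\n")

# widths gives the column boundaries explicitly;
# dtype=str stops "0000" from being read as the integer 0
toy_area = pandas.read_fwf(raw, widths=[5, 100],
                           names=["area_code", "area_name"],
                           dtype=str)
toy_area.set_index("area_code", inplace=True)
```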
Reading in items is similar; we’ll set the index here on its key, item_code. The same rules apply as above regarding setting the index on the primary key.

items = pandas.read_fwf(path.format("ap/ap.item"), 
                        widths=[7, 100], 
                        names=["item_code", "item_name"])
items.set_index("item_code", inplace=True)

item_code  item_name
701111     Flour, white, all purpose, per lb. (453.6 gm)
Time to join our tables. Below is the final result. You can see how we joined series against two tables. Quickly inspecting the top five results shows us the series, where in the US the data came from, the date range, and what was measured.

Remember: Make sure you’ve set indexes properly on all your tables.

result = series.join(area, on="area_code").\
                join(items, on="item_code")
result.head(5)[["area_name", "begin_year", 
                "end_year", "item_name"]]

                       area_name  begin_year  end_year  item_name
series_id
APU0000701111  U.S. city average        1980      2008  Flour, white, all purpose, per lb. (453.6 gm)
APU0000701311  U.S. city average        1980      1981  Rice, white, long grain, precooked (cost per p…
APU0000701312  U.S. city average        1980      2008  Rice, white, long grain, uncooked, per lb. (45…
APU0000701321  U.S. city average        1980      1983  Spaghetti (cost per pound/453.6 grams)
APU0000701322  U.S. city average        1984      2008  Spaghetti and macaroni, per lb. (453.6 gm)
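To make the on= behavior concrete: DataFrame.join matches a column in the left frame against the index of the right frame, which is why the set_index calls above were mandatory. A toy sketch with made-up values:

```python
import pandas

# minimal stand-ins for the series and items frames
toy_series = pandas.DataFrame(
    {"series_id": ["S1", "S2"],
     "item_code": ["701111", "701312"]}).set_index("series_id")

toy_items = pandas.DataFrame(
    {"item_code": ["701111", "701312"],
     "item_name": ["Flour", "Rice"]}).set_index("item_code")

# the item_code *column* on the left is matched against
# the item_code *index* on the right
joined = toy_series.join(toy_items, on="item_code")
```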
Just because it’s fun, let’s write out our dataset into Cassandra to use later on. I’ll use the object mapper provided by the DataStax native driver, cqlengine. All the imports we need:

from cassandra.cqlengine.connection import setup
from cassandra.cqlengine.models import Model
from cassandra.cqlengine.columns import Text, Integer
from cassandra.cqlengine.management import sync_table

setup([""], default_keyspace="labor")

We’ll define a model and sync it to the database. Syncing a table is convenient: it will create or alter the table as required to match your definition:

class AveragePriceData(Model):
    series_id = Text(primary_key=True)
    footnote_codes = Text()
    item_name = Text()
    begin_year = Integer()
    end_year = Integer()
    area_name = Text()
    area_code = Text()
    item_code = Text()
    begin_period = Text()
    end_period = Text()

sync_table(AveragePriceData)

Now we just iterate over all the data and save it to Cassandra. I’m not doing much in the way of catching errors, just printing them out when they happen. You can see from the output that I still have to do a little cleanup.

for k, v in result.iterrows():
    vals = v.to_dict()
    vals["series_id"] = k
    try:
        AveragePriceData.create(**vals)
        print "Created {}".format(k)
    except Exception as e:
        print e
        print vals

area_name is not a string {'footnote_codes': ' ', 'item_name': 'Fuel oil #2 per gallon (3.785 liters)', 'end_year': 1986, 'area_name': nan, 'begin_year': 1978, 'area_code': 'A105', 'item_code': '72511', 'series_id': 'APUA10572511 ', 'begin_period': 'M01', 'end_period': 'M13'}
Created APUA10572511

Alright - you should now be able to read data out of various text files, normalize it, fix basic errors, and join the data together. Once it’s joined, it’s easy to iterate over the DataFrame and persist it wherever you need.

Questions? Hit me up on Twitter, I’m rustyrazorblade.