Joining DataFrames With Pandas

In this post I’ll walk through the process of reading in various plain text database files using Pandas, and then joining together the different DataFrames. All my work was done through an IPython notebook.

I decided to mess around with the labor statistics database that’s up on Amazon. My end goal was to save all the relevant information into Cassandra for future analysis with PySpark. If the files were bigger, I’d do all the initial loading with PySpark, but they’re pretty small and Pandas has a lot of functionality that’s still missing on the Spark side.

I launched an EC2 instance in us-east, mounted the snapshot, and rsynced the files to my desktop. The dataset is about 15GB.

The file formats here are a bit inconsistent. It’s mostly fixed width but for some reason one of them is tab delimited. Fortunately Pandas is pretty flexible and can load pretty much anything you’ve got.

Obviously in order to use Pandas, we have to import it:

import pandas
path = "/Users/jhaddad/datasets/labor/data/time.series/{}"

First we read in the series data. Note that it only contains an item_code, not the actual item name. The column names in the header row are tab separated but inconsistent with the rest of the data, so I'll just provide the field names myself and skip the header row. Since this is a CSV-like file, we can use read_csv and tell it the delimiter is a tab. We get back a DataFrame, one of the core Pandas data types.

names = ["series_id", "area_code", "item_code", 
         "footnote_codes", "begin_year",
         "begin_period", "end_year", "end_period"]

series = pandas.read_csv(path.format("ap/ap.series"), 
                         sep='\t', 
                         skiprows=1,
                         names=names)
series.head(1)
# not sure why i'm getting extra spaces, cleaning that up
series["item_code"] = series["item_code"].\
                        map(lambda x: str(x).strip())
series.set_index("series_id", inplace=True)
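As an aside, pandas' vectorized string methods are a more idiomatic way to do that same cleanup. A small sketch with made-up codes:

```python
import pandas

# Toy item codes with stray whitespace (hypothetical values)
codes = pandas.Series(["72511 ", " 72512"])

# .str.strip() is the vectorized equivalent of map(lambda x: str(x).strip())
cleaned = codes.str.strip()
print(cleaned.tolist())  # ['72511', '72512']
```

One difference worth knowing: .str.strip() leaves missing values as NaN, while str(x).strip() would turn them into the literal string "nan".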

Next we read in areas and set the index just like before. DataFrame.join matches the on= column against the index of the DataFrame being joined, so without setting the index on the primary key the join won't work.

area = pandas.read_fwf(path.format("ap/ap.area"), 
                       widths=[4,100], 
                       names=["area_code", "area_name"], 
                       skiprows=2)
area.set_index("area_code", inplace=True)
area.head(1)
items = pandas.read_fwf(path.format("ap/ap.item"), 
                        widths=[7, 100], 
                        skiprows=2, 
                        names=["item_code", "item_name"])
items.set_index("item_code", inplace=True)
items.head(1)

Remember: Make sure you’ve set indexes properly on all your tables.
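If you'd rather not manage indexes at all, pandas.merge joins on columns directly. Here's a sketch with toy frames standing in for the series and area files (the data is made up, not from the BLS set):

```python
import pandas

# Hypothetical stand-ins for the series and area tables
series = pandas.DataFrame({"series_id": ["S1", "S2"],
                           "area_code": ["A101", "A102"]})
area = pandas.DataFrame({"area_code": ["A101", "A102"],
                         "area_name": ["Northeast", "Midwest"]})

# merge() matches on columns, so no set_index() is required
result = pandas.merge(series, area, on="area_code", how="left")
print(result["area_name"].tolist())  # ['Northeast', 'Midwest']
```

join() with indexes is faster for repeated lookups, but merge() is harder to get wrong when you're exploring.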

result = series.join(area, on="area_code").\
                join(items, on="item_code")
result.head(5)[["area_name", "begin_year", 
                "end_year", "item_name"]]
We'll use the cqlengine object mapper that ships with the DataStax Python driver to connect to Cassandra:

from cassandra.cqlengine.connection import setup
setup(["127.0.0.1"], default_keyspace="labor")
from cassandra.cqlengine.models import Model
from cassandra.cqlengine.columns import Text, Integer
from cassandra.cqlengine.management import sync_table

We’ll define a model and sync it to the database. Syncing a table is convenient: it will create or alter the table as required to match your definition:

class AveragePriceData(Model):
    series_id = Text(primary_key=True)
    footnote_codes = Text()
    item_name = Text()
    begin_year = Integer()
    end_year = Integer()
    area_name = Text()
    area_code = Text()
    item_code = Text()
    begin_period = Text()
    end_period = Text()
    
sync_table(AveragePriceData)        

Now we just iterate over all the rows and save them to Cassandra. I’m not doing much error handling, just printing problems when they happen. You can see I still have to do a little cleanup.

for k, v in result.iterrows():
    vals = v.to_dict()
    vals["series_id"] = k  # the series_id lives in the index
    try:
        AveragePriceData.create(**vals)
    except Exception as e:
        print(e)
        print(vals)
        break
print("Created {}".format(k))

area_name <type 'float'> is not a string {'footnote_codes': ' ', 'item_name': 'Fuel oil #2 per gallon (3.785 liters)', 'end_year': 1986, 'area_name': nan, 'begin_year': 1978, 'area_code': 'A105', 'item_code': '72511', 'series_id': 'APUA10572511 ', 'begin_period': 'M01', 'end_period': 'M13'} Created APUA10572511
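That nan in area_name is pandas' float NaN marker for missing data, which the Text column rejects. One hedged fix, assuming empty strings are acceptable for missing names in your analysis, is to clean the column with fillna before iterating:

```python
import pandas

# A row with a missing area_name, mimicking the failing record above
df = pandas.DataFrame({"series_id": ["APUA10572511"],
                       "area_name": [float("nan")]})

# NaN is a float; replace it with an empty string so every value
# passed to the Text column is a real str
df["area_name"] = df["area_name"].fillna("")
print(df["area_name"].tolist())  # ['']
```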

Alright - you should now be able to read data out of various text files, normalize it, fix basic errors, and join it together. Once it's joined, if you need to persist it somewhere, it's easy to iterate over the DataFrame and write it out however you need.

Questions? Hit me up on Twitter, I’m rustyrazorblade.

If you found this post helpful, please consider sharing to your network. I'm also available to help you be successful with your distributed systems! Please reach out if you're interested in working with me, and I'll be happy to schedule a free one-hour consultation.