Airflow ETL for Google Sheets and PostgreSQL
01 Jul 2018
Imagine you have a sales table in your database, with one row per sale.
Now imagine that each of those sales still has to be closed by a salesperson after it enters this table.
One typical problem that arises from this process is that the status of each sale may be managed in a different source: for example, an Excel file or, as in this tutorial, a Google spreadsheet.
The spreadsheet could be something like:
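(The rows and the salesperson column below are just an illustration; the only columns this tutorial actually relies on are sale_id and status.)

| sale_id | salesperson | status    |
|---------|-------------|-----------|
| 1       | Alice       | contacted |
| 2       | Bob         | closed    |
| 3       | Alice       | pending   |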
This way, every time a sale enters the database, the sales team sees the new sale on some internal dashboard and adds it to this spreadsheet. Then they reach out to the client and update the status accordingly.
This type of data integration is a very common problem inside companies. Luckily, we have several tools we could use to tackle this issue in an efficient and clean manner, meaning easy to monitor, easy to maintain, and readable.
There are several technologies out there that can help you solve this problem; in my case I chose Airflow. I'll show you how to write a simple Airflow DAG that extracts the data from the spreadsheet into a csv file, filters the columns we need, and sends the result to a Postgres database.
Google Drive access
In order to access the Google Drive API, we need to download a credentials file. You can follow the steps mentioned in the official documentation. The process is pretty straightforward, and you should end up with a client_secret.json file, which is used to authorize the first request. After that, a credentials.json file will be created automatically, and all subsequent requests will use this file instead.
Let’s create a basic script to download our sheet as a csv file and to authorize the application. We can do that with the following snippet:
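Here is a minimal sketch of that script, following the usual oauth2client quickstart pattern and using the Drive API's csv export. The spreadsheet id is a placeholder you'll need to replace with your own:

```python
# A minimal sketch of the download/authorization script.
# SPREADSHEET_ID is a placeholder: replace it with the id of your own sheet.
import io

from httplib2 import Http
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
from oauth2client import client, file, tools

SCOPES = 'https://www.googleapis.com/auth/drive.readonly'
SPREADSHEET_ID = 'your-spreadsheet-id'

# Look for stored credentials; if there are none (or they are invalid),
# run the browser flow with client_secret.json and save credentials.json.
store = file.Storage('credentials.json')
creds = store.get()
if not creds or creds.invalid:
    flow = client.flow_from_clientsecrets('client_secret.json', SCOPES)
    creds = tools.run_flow(flow, store)

# Export the sheet as csv through the Drive API and write it to disk.
service = build('drive', 'v3', http=creds.authorize(Http()))
request = service.files().export_media(fileId=SPREADSHEET_ID, mimeType='text/csv')
with io.FileIO('statuses.csv', 'wb') as out:
    downloader = MediaIoBaseDownload(out, request)
    done = False
    while not done:
        _, done = downloader.next_chunk()
```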
Two dependencies you'll need to run this script are google-api-python-client and oauth2client.
Don’t lose the credentials.json file; you’ll need it later so Airflow can access the API without having to go through the browser authorization process.
Let’s create a table to hold the data we’ll extract from the spreadsheet:
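The exact schema is up to you. As a minimal sketch, assuming integer ids and free-text statuses (the psycopg2 connection parameters below are placeholders for your own database):

```python
# A minimal sketch: create the target table with psycopg2.
# The connection parameters are placeholders; point them at your own database.
import psycopg2

conn = psycopg2.connect(host='localhost', dbname='sales',
                        user='postgres', password='postgres')
with conn, conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS sale_statuses (
            sale_id INTEGER,
            status  TEXT
        );
    """)
conn.close()
```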
Later you can join this table with your original sales table in order to integrate the data.
We are ready to write our Airflow DAG. If you are not familiar with Airflow's concepts, check out the official tutorial for a quick pipeline definition.
First, we need to import some dependencies:
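A sketch of what the top of the DAG file could import, assuming Airflow 1.x operator paths, plus the Google client pieces from the download script and the modules the later functions will use:

```python
import csv
import io
import os
from datetime import datetime, timedelta

import psycopg2
from httplib2 import Http
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
from oauth2client import client, file, tools

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator
```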
We can keep pretty much all of the default configuration from the tutorial example, but let's adjust the start date to yesterday so the DAG runs only once while we are testing it:
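Something along these lines; the DAG id sales_etl is my choice to match the file name, and the start_date below is simply "yesterday" relative to when this post was written:

```python
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # "Yesterday" at the time of writing; set it to your own yesterday when testing.
    'start_date': datetime(2018, 6, 30),
}

dag = DAG('sales_etl', default_args=default_args,
          schedule_interval=timedelta(days=1))
```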
Make sure you adjust start_date and schedule_interval to suit your needs. In this case, if I turn the DAG on, it will run once for every day between the start date and now.
The first operator will be a PythonOperator. It will use a function to download the spreadsheet as a csv file. We'll use the same code we used to authorize the application the first time, but now we have the credentials.json file, so we don't need to authorize again via the web browser.
I created a sales folder in the dags directory and moved both the client_secret and credentials json files into it. The DAG script has to be in that same folder.
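The download function is essentially the earlier script wrapped in a callable, with paths resolved relative to the DAG file so the worker always finds the credentials and writes the csv into the sales folder. A sketch:

```python
# Same download logic as before, wrapped in a function for the PythonOperator.
# Paths are resolved relative to this DAG file (the dags/sales folder).
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
SCOPES = 'https://www.googleapis.com/auth/drive.readonly'
SPREADSHEET_ID = 'your-spreadsheet-id'  # placeholder


def download_sheet():
    store = file.Storage(os.path.join(BASE_DIR, 'credentials.json'))
    creds = store.get()
    if not creds or creds.invalid:
        flow = client.flow_from_clientsecrets(
            os.path.join(BASE_DIR, 'client_secret.json'), SCOPES)
        creds = tools.run_flow(flow, store)
    service = build('drive', 'v3', http=creds.authorize(Http()))
    request = service.files().export_media(fileId=SPREADSHEET_ID,
                                           mimeType='text/csv')
    with io.FileIO(os.path.join(BASE_DIR, 'statuses.csv'), 'wb') as out:
        downloader = MediaIoBaseDownload(out, request)
        done = False
        while not done:
            _, done = downloader.next_chunk()
```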
This method will download the spreadsheet as a statuses.csv file and will put it in the same folder.
We are going to need two more python functions for other operators. One to select the columns we need, which in this case are sale_id and status and another one to load the resulting csv file into the target table:
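Here is one way to sketch them, using the csv module for the filtering and psycopg2's COPY for the load. The column names assume the spreadsheet headers are literally sale_id and status, and the connection parameters are again placeholders:

```python
def filter_columns():
    """Keep only the sale_id and status columns and write them to final.csv."""
    with open(os.path.join(BASE_DIR, 'statuses.csv')) as source, \
            open(os.path.join(BASE_DIR, 'final.csv'), 'w') as target:
        reader = csv.DictReader(source)
        writer = csv.DictWriter(target, fieldnames=['sale_id', 'status'])
        writer.writeheader()
        for row in reader:
            writer.writerow({'sale_id': row['sale_id'], 'status': row['status']})


def load_to_db():
    """COPY final.csv into the sale_statuses table."""
    # Placeholder connection parameters; point them at your own database,
    # or swap this for Airflow's PostgresHook if you prefer.
    conn = psycopg2.connect(host='localhost', dbname='sales',
                            user='postgres', password='postgres')
    with conn, conn.cursor() as cur, \
            open(os.path.join(BASE_DIR, 'final.csv')) as source:
        cur.copy_expert(
            'COPY sale_statuses (sale_id, status) FROM STDIN WITH CSV HEADER',
            source)
    conn.close()
```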
Finally, let’s create another task to truncate the table. Since this spreadsheet could be randomly updated, it’s easier to just truncate the table and reload the entire dataset. For that, let’s do something different and take advantage of the PostgresOperator.
First, add a connection for the Postgres database using the Airflow connections menu (Admin > Connections).
Remember the connection id for later use.
Now we can define the tasks using the Airflow operators:
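A sketch of the four tasks; the task ids are arbitrary names I picked, and my_postgres_conn stands for whatever connection id you registered in the previous step:

```python
download_task = PythonOperator(
    task_id='download_sheet',
    python_callable=download_sheet,
    dag=dag)

filter_task = PythonOperator(
    task_id='filter_columns',
    python_callable=filter_columns,
    dag=dag)

truncate_task = PostgresOperator(
    task_id='truncate_table',
    postgres_conn_id='my_postgres_conn',  # the connection id you just created
    sql='TRUNCATE TABLE sale_statuses;',
    dag=dag)

load_task = PythonOperator(
    task_id='load_to_db',
    python_callable=load_to_db,
    dag=dag)
```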
Since these tasks depend on each other, we need to declare the order in which they’ll run:
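For example, a straight chain: download, filter, truncate, and finally load.

```python
# The table is truncated only after the new csv is ready,
# and the load always runs last.
download_task >> filter_task >> truncate_task >> load_task
```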
The complete sales_etl.py script is simply all of the pieces above combined into a single file inside the sales folder.
Now, if you go to the Airflow dashboard, you should see your DAG in the list.
You can flip the DAG's on/off toggle to start it, and hit refresh a couple of times until it finishes.
All four tasks should run successfully, and since my start date was yesterday, the DAG only ran once and will keep running once a day.
If you go to the sales folder inside your dags directory, you should see both the statuses and the final csv files:
client_secret.json credentials.json final.csv __pycache__ sales_etl.py statuses.csv
These files will be overwritten every time the DAG runs.
And if you check your sale_statuses table, you should see the data that has been loaded.
And that’s it! I’m still learning a lot about Airflow and ETLs in general, so if you have any comments or suggestions, you can leave a message below.