Airflow ETL for moving data from Postgres to Postgres
29 Jul 2018
You need to move data from one PostgreSQL table to another. The tables could live in the same database or in different ones.
The easiest way would be to use the psycopg2 library to grab the data from the source using a normal cursor and then
insert it into the destination table:
This works fine for small amounts of data, but it doesn't scale when the table is large. The Airflow worker
would load the entire result set into memory before writing it to the destination table, which becomes impractical when you are
copying gigabytes or even terabytes of data. In that case, we can use a server-side cursor instead.
This type of cursor doesn't fetch all the rows at once; instead, it fetches them in batches.
For example, let’s say you want to grab all the users created on a certain day and copy them to another table on a daily basis.
You could use this simple PythonOperator:
As you can see, we grab the users that were created on the execution date, which gives us more control when running backfills.
Also, notice that because we're using a server-side cursor, we have to iterate until the cursor returns no more rows. I'm
using a batch size of 2000 as an example.
Finally, psycopg2 provides a nice method called execute_values that allows us to insert the whole batch at once.
Keep in mind that if you have several DAGs and tasks doing the same thing, it's probably more convenient to encapsulate the logic in a custom Operator. On the other hand,
the transformations between the loads can vary a lot, which makes the code trickier to reuse. It's up to you to decide.