Airflow task to refresh PostgreSQL Materialized Views 06 Jul 2018

The problem

You need more performance from a PostgreSQL query. This could be an OLAP-oriented query that doesn't need to return the most up-to-date data, which is a common scenario for analytic workloads.

A solution

A solution could be to create a Materialized View, which stores both the query itself and the data it generates. You can then refresh the view at any time: PostgreSQL re-runs the stored query and replaces the old content with the new results.
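For example, it could look like this (a sketch: the orders table and its columns are hypothetical, only the sales view name matches the DAG below):

```sql
-- Materialize an aggregate query once, instead of recomputing it on every request
CREATE MATERIALIZED VIEW sales AS
SELECT product_id, date_trunc('day', sold_at) AS day, sum(amount) AS total
FROM orders
GROUP BY product_id, day;

-- Later, re-run the stored query and swap in the fresh results
REFRESH MATERIALIZED VIEW sales;
```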

Now, after you create your materialized view, you are in charge of scheduling its refreshes. You could use a simple cron job, but maybe you want something a bit more sophisticated, like Airflow. This can be achieved with a simple PostgresOperator:

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2018, 7, 3),
    "email": ["pabloacuna88@gmail.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG("refresh_sales_matviews", default_args=default_args, schedule_interval='@daily')

t1 = PostgresOperator(
    task_id='refresh_sales_table',
    postgres_conn_id="my_conn_id",
    sql="refresh materialized view sales",
    database="my_db",
    dag=dag)

This simple DAG runs the refresh command on a daily basis. To use the PostgresOperator, you first need to create a connection in the Airflow connections tab:

[Screenshot: the Airflow connections tab]

Then use that connection ID as the postgres_conn_id parameter of the PostgresOperator. Note that this operator can execute any SQL command, not only refreshes.
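If you have several materialized views to refresh, one option is to build the REFRESH statement per view and create one PostgresOperator per view. The helper below is a sketch of my own (refresh_sql and the view names are not part of Airflow); it quotes the identifier so mixed-case names keep working if the view list ever comes from configuration:

```python
def refresh_sql(matview, concurrently=False):
    """Build a REFRESH MATERIALIZED VIEW statement for a single view.

    Double-quoting the identifier (and doubling any embedded quotes)
    preserves mixed-case view names.
    """
    quoted = '"' + matview.replace('"', '""') + '"'
    keyword = "CONCURRENTLY " if concurrently else ""
    return "REFRESH MATERIALIZED VIEW {}{}".format(keyword, quoted)


# Hypothetical list of views to refresh; each statement would become
# the sql parameter of its own PostgresOperator task.
MATVIEWS = ["sales", "sales_by_region"]

for name in MATVIEWS:
    print(refresh_sql(name))
```

As a side note, REFRESH MATERIALIZED VIEW CONCURRENTLY keeps the view readable while it refreshes, but it requires a unique index on the view.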

Thanks for reading!