In a previous post, I went through the setup for running Airflow in Docker on Windows. That example just used some simple Python operators, but I wanted to run through an example that interacted with a database. In this case, the database is a Postgres DB running on my laptop, and the Airflow instance in Docker can interact with it.

Connection:
We’ll need to edit the default Postgres connection in Airflow. In the Web GUI, we can go to Admin, then Connections. For the postgres_default record, we’ll edit:
Set host = host.docker.internal
Set database, port, login, password to correct values for your Postgres instance.

Database:
We’ll set up a simple table for the database operations:

create table insert_test (
    id int GENERATED ALWAYS AS IDENTITY NOT NULL PRIMARY KEY,
    name text NOT NULL,
    created_date timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);

DAG:
We’ll create a DAG and use the Postgres Operator to insert some data into our table. It’s much like the Python example. The postgres_conn_id value will point to the connection we edited earlier, and the SQL holds the statement that we want to execute. We could also use a script location here instead.

step_1 = PostgresOperator(
 task_id='step_1',
 postgres_conn_id='postgres_default',
 dag=test_dag,
 sql = "INSERT INTO insert_test (name) VALUES ('Name1');"
)

Here’s the complete DAG:

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

test_dag = DAG(
 dag_id='test_dag', 
 default_args={"start_date" : "2024-05-06"}
)

step_1 = PostgresOperator(
 task_id='step_1',
 postgres_conn_id='postgres_default',
 dag=test_dag,
 sql = "INSERT INTO insert_test (name) VALUES ('Name1');"
)

step_2 = PostgresOperator(
 task_id='step_2',
 postgres_conn_id='postgres_default',
 dag=test_dag,
 sql = "INSERT INTO insert_test (name) VALUES ('Name2');"
)

step_1 >> step_2

Once we run the workflow, we’ll check back in our insert_test table and see that two records have been inserted.

Links:
Airflow – How-to Guide for PostgresOperator

Stack Overflow – Allow docker container to connect to a local/host postgres database