In a previous post, I went through the setup for running Airflow in Docker on Windows. That example only used a few simple Python operators, so this time I wanted to walk through one that interacts with a database. In this case, the database is a Postgres DB running on my laptop, and the Airflow instance in Docker connects to it.
Connection:
We’ll need to edit the default Postgres connection in Airflow. In the Web GUI, we can go to Admin, then Connections. For the postgres_default record, we’ll edit:
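Set host = host.docker.internal (the hostname Docker Desktop provides so that a container can reach services running on the host machine)
Set database, port, login, and password to the correct values for your Postgres instance.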
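As an alternative to editing the record in the Web GUI, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID> that holds a connection URI. The sketch below only builds that URI with the standard library. The login, password, and database values are made-up placeholders, and build_postgres_conn_uri is a hypothetical helper name, not part of Airflow.

```python
from urllib.parse import quote


def build_postgres_conn_uri(login, password, host, port, database):
    """Return a postgres:// URI with login, password and database percent-encoded."""
    return (
        f"postgres://{quote(login, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(database, safe='')}"
    )


# Placeholder credentials (assumptions); replace with your own values.
uri = build_postgres_conn_uri(
    login="airflow_user",
    password="p@ss/word",          # special characters must be percent-encoded
    host="host.docker.internal",   # host machine as seen from the container
    port=5432,
    database="postgres",
)

# The line to add to the container's environment, e.g. in docker-compose.
print(f"AIRFLOW_CONN_POSTGRES_DEFAULT={uri}")
```

A connection defined this way does not appear in the Connections list in the Web GUI, so editing the postgres_default record as described above is the easier option to inspect.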
Database:
We’ll set up a simple table for the database operations:
CREATE TABLE insert_test (
    id int GENERATED ALWAYS AS IDENTITY NOT NULL PRIMARY KEY,
    name text NOT NULL,
    created_date timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
);
DAG:
We’ll create a DAG and use the PostgresOperator to insert some data into our table. It works much like the Python example from the previous post. The postgres_conn_id value points to the connection we edited earlier, and the sql argument holds the statement that we want to execute. Instead of an inline statement, sql can also point to a script file.
step_1 = PostgresOperator(
    task_id='step_1',
    postgres_conn_id='postgres_default',
    dag=test_dag,
    sql="INSERT INTO insert_test (name) VALUES ('Name1');"
)
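For the script-file variant, here is a minimal sketch. It relies on the operator treating a sql value that ends in .sql as a template file to load rather than as a statement. The file name sql/insert_name.sql and the dag_id script_dag are made up for illustration, and the sketch assumes the file sits in a sql folder next to the DAG file and contains the same INSERT statement.

```python
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

# Hypothetical DAG used only to illustrate the script-file variant.
script_dag = DAG(
    dag_id='script_dag',
    default_args={"start_date": "2024-05-06"},
)

# Because the value ends in ".sql", it is resolved as a template file
# relative to the DAG file's folder instead of being run as literal SQL.
# Assumed file content: INSERT INTO insert_test (name) VALUES ('Name1');
step_1 = PostgresOperator(
    task_id='step_1',
    postgres_conn_id='postgres_default',
    dag=script_dag,
    sql='sql/insert_name.sql',
)
```

Keeping longer statements in their own files keeps the DAG file readable and lets you test the SQL on its own.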
Here’s the complete DAG:
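# Note: on Airflow 2.x this import path is deprecated; the Postgres provider
# package exposes the same operator as airflow.providers.postgres.operators.postgres.
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

# The DAG itself; tasks inherit start_date from default_args.
test_dag = DAG(
    dag_id='test_dag',
    default_args={"start_date": "2024-05-06"}
)

# First insert, run against the connection edited earlier.
step_1 = PostgresOperator(
    task_id='step_1',
    postgres_conn_id='postgres_default',
    dag=test_dag,
    sql="INSERT INTO insert_test (name) VALUES ('Name1');"
)

# Second insert, same connection.
step_2 = PostgresOperator(
    task_id='step_2',
    postgres_conn_id='postgres_default',
    dag=test_dag,
    sql="INSERT INTO insert_test (name) VALUES ('Name2');"
)

# step_2 runs only after step_1 succeeds.
step_1 >> step_2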
Once we run the workflow, we’ll check back in our insert_test table and see that two records have been inserted.
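To see what those two rows should look like without opening a Postgres client, here is a small local stand-in. It uses Python’s built-in sqlite3 module instead of Postgres, so the table definition is only an approximation: INTEGER PRIMARY KEY stands in for the identity column, and the timestamp is stored as text. The point is just that supplying name alone is enough, because id and created_date are filled in by the database.

```python
import sqlite3

# In-memory SQLite database as a stand-in for the Postgres table (an assumption
# for illustration; the real DAG writes to Postgres).
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE insert_test (
        id INTEGER PRIMARY KEY,
        name TEXT NOT NULL,
        created_date TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
    )
    """
)

# Mirror the two tasks: each insert supplies only the name column.
for name in ("Name1", "Name2"):
    conn.execute("INSERT INTO insert_test (name) VALUES (?)", (name,))
conn.commit()

# The equivalent check against Postgres would be the same SELECT.
rows = conn.execute(
    "SELECT id, name, created_date FROM insert_test ORDER BY id"
).fetchall()
for row in rows:
    print(row)

conn.close()
```

Against the real table, the same SELECT should return Name1 and Name2 with consecutive ids and the timestamps of each task run.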
Links:
Airflow – How-to Guide for PostgresOperator
Stack Overflow – Allow docker container to connect to a local/host postgres database