Airflow is an Apache project that allows you to build, schedule, and manage workflows. Pipelines and their activities are defined as Python code, rather than with a graphical tool like SSIS. Airflow was created at Airbnb, but it’s now open source. Airflow has a web interface that you can use to view workflows, as well as schedule and troubleshoot them.
Install and Setup:
If you’re running Windows (as I am), it’s possible to install Airflow. You’ll need Python 3.8 (or higher) and the Windows Subsystem for Linux. Here are some instructions for installing it.
However, I’m going to run Airflow in Docker, which seems like a more straightforward approach.
Open Docker Desktop to start the Docker daemon. Search for the apache/airflow Docker image and pull it. You can do this in Docker Desktop, or at the command line with this command:
docker pull apache/airflow
Once you have the image, you can start it with this command:
docker run -d -p 8080:8080 apache/airflow standalone
-d is for detach, which runs the container in the background. 8080 is Airflow’s default port, so the -p flag maps port 8080 in the container to port 8080 on your host PC. standalone is an Airflow command that runs an “all-in-one” copy of Airflow (webserver, scheduler, and a local metadata database) in a single container.
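If you want a predictable container name for the commands that come later (the name brave_newton used below was auto-generated by Docker), you can pass the --name flag, for example:
docker run -d -p 8080:8080 --name airflow apache/airflow standalone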
In your browser, you can go to localhost:8080 and log in to get to the Airflow web interface. The admin account is set up automatically when Airflow starts in standalone mode.
There should be one container running, so we can run
docker ps
to see all of the running containers, and get the name. Running
docker logs container_name
will display the log records for that container (substituting your container’s actual name for container_name).
You can search the container logs for a line ‘Airflow is ready’. Underneath that will be a line that starts with ‘Login with username’ that will give you the username (admin) and the password for that user.
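For example, to pull that line straight out of the logs (a quick sketch; 2>&1 merges stderr into stdout so grep sees everything):
docker logs container_name 2>&1 | grep "Login with username"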
Concepts and Terminology:
The workflows are described as DAGs, which stands for Directed Acyclic Graph. A workflow is a graph, a set of nodes or activities. The graph is directed, so the flow moves from one activity to the next, and it is acyclic, meaning it has a beginning and an end and never loops back on itself.
We build workflows from Operators, which are units that perform a specific action, like running Python code or sending an email. When an operator is used in a workflow, it gets instantiated as a Task.
Sensors are operators that will wait until a set condition is true, like a file being copied to a given location.
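As a quick sketch (not part of the example DAG below, and the task_id and filepath here are just placeholders), a FileSensor that waits for a file to show up could look like this:
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/opt/airflow/data/input.csv',  # hypothetical path to watch
    poke_interval=30  # check for the file every 30 seconds
)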
DAG:
We can create a simple DAG just to get a grasp of how a flow works. We’ll have some Python print commands and string them together.
We’ll start with some import statements. We’ll need to import DAG, the Python operator, and the Python datetime library for one of the print statements.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
Next we’ll define our DAG, naming it test_dag.
test_dag = DAG(
    dag_id='test_dag',
    default_args={"start_date": datetime(2024, 5, 3)}
)
We’ll define some Python functions, just some simple print commands.
def step_start():
    print('Start')

def step_date():
    print(datetime.now())

def step_end():
    print('End')
We’ll take these functions and use them to define three tasks. python_callable points to the function to call, and dag refers to the test_dag we set up earlier.
step_start = PythonOperator(
    task_id='step_start',
    python_callable=step_start,
    dag=test_dag
)

step_date = PythonOperator(
    task_id='step_date',
    python_callable=step_date,
    dag=test_dag
)

step_end = PythonOperator(
    task_id='step_end',
    python_callable=step_end,
    dag=test_dag
)
As the last step, we’ll specify the order that the tasks run in.
step_start >> step_date >> step_end
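The >> operator is shorthand for setting dependencies. If you prefer method calls, the same ordering can be written with set_downstream:
step_start.set_downstream(step_date)
step_date.set_downstream(step_end)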
We can save everything into one Python file. Here’s the code for the entire DAG.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

# Define the DAG
test_dag = DAG(
    dag_id='test_dag',
    default_args={"start_date": datetime(2024, 5, 3)}
)

# Simple Python functions for each task to call
def step_start():
    print('Start')

def step_date():
    print(datetime.now())

def step_end():
    print('End')

# Wrap each function in a PythonOperator task
step_start = PythonOperator(
    task_id='step_start',
    python_callable=step_start,
    dag=test_dag
)

step_date = PythonOperator(
    task_id='step_date',
    python_callable=step_date,
    dag=test_dag
)

step_end = PythonOperator(
    task_id='step_end',
    python_callable=step_end,
    dag=test_dag
)

# Set the order the tasks run in
step_start >> step_date >> step_end
Running:
We’ll need to get our code file into the container, into the dags directory under the Airflow home directory (/opt/airflow/dags). We can use the docker cp command.
docker cp test.py brave_newton:/opt/airflow/dags
test.py is the DAG file, and brave_newton is the container name.
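Before heading back to the browser, you can optionally check that Airflow picked up the DAG from inside the container (substitute your own container name for brave_newton). airflow dags list shows the DAGs Airflow has parsed, and airflow tasks test runs a single task without recording any state:
docker exec -it brave_newton airflow dags list
docker exec -it brave_newton airflow tasks test test_dag step_date 2024-05-03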
We can then go back to the Web Admin page and refresh. Our DAG should come up momentarily. We can start the job manually by clicking the play button under Actions. Once it has completed, we’ll see the status of the job on this same screen.
Next Steps:
This was a very simple example, just to get familiar with Airflow. I’ll come back in a later post and have Airflow interact with a database.
Links: