Apache-AIRFLOW
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
https://www.youtube.com/watch?v=ZET50M20hkU
This one : https://www.youtube.com/watch?v=K9AnJ9_ZAXE
00:00 - Airflow Introduction
03:06 - Run Airflow in Python Env
10:44 - Run Airflow in Docker
17:55 - Airflow Basics and Core Concepts
21:55 - Airflow Task Lifecycle
26:19 - Airflow Basic Architecture
28:14 - Airflow DAG with Bash Operator
40:09 - Airflow DAG with Python Operator
45:04 - Data Sharing via Airflow XComs
52:53 - Airflow Task Flow API
57:56 - Airflow Catch-Up and Backfill
01:02:09 - Airflow Scheduler with Cron Expression
01:07:25 - Airflow Connection to Postgres
01:08:58 - Airflow Postgres Operator
01:19:30 - Airflow Docker Install Python Package 2 ways
01:29:34 - Airflow AWS S3 Sensor Operator
01:42:37 - Airflow Hooks S3 PostgreSQL
02:00:43 - Course Bonus
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
What is Apache Airflow?
Apache Airflow is an open-source platform designed for orchestrating complex workflows and data pipelines. It allows you to define, schedule, manage, and monitor workflows as directed acyclic graphs (DAGs). Airflow was originally developed by Airbnb and was later open-sourced and contributed to the Apache Software Foundation.
Key features of Apache Airflow include:
Workflow Automation: Airflow enables you to automate and schedule complex workflows involving tasks that need to be executed in a specific sequence or based on certain conditions.
Directed Acyclic Graphs (DAGs): Workflows in Airflow are represented as DAGs, which consist of tasks and dependencies. Tasks can be anything from running a script, executing a SQL query, transferring files, or triggering external processes.
Task Dependencies: Airflow allows you to define dependencies between tasks, specifying the order in which tasks should be executed. Tasks can run in parallel or sequentially, based on their dependencies.
Dynamic Workload Management: Airflow's dynamic configuration capabilities enable you to create reusable templates for tasks and dynamically generate task instances with different parameters.
Extensibility: Airflow is highly extensible through custom operators and sensors. You can create your own operators to perform specific tasks or integrate with external services.
Scheduler: Airflow has a built-in scheduler that manages the execution of tasks based on the defined schedule and task dependencies.
Logging and Monitoring: Airflow provides a user interface (UI) that allows you to monitor the status of workflows, track task execution, view logs, and troubleshoot issues.
Integration with External Systems: Airflow can integrate with various external systems and databases, such as Amazon S3, Google Cloud Storage, relational databases, and more.
Scalability: Airflow can be configured to work in a distributed environment, allowing you to scale out the execution of tasks as your workflow demands grow.
Extensive Community and Ecosystem: Being open-source, Airflow has a vibrant community contributing to its development. It has a wide range of third-party plugins, connectors, and integrations with other tools.
Amazon Managed Workflows for Apache Airflow
Airflow is commonly used for tasks such as:
-- Running ETL jobs (Extract - Transform - Load)
-- Managing machine learning pipelines
-- Automating DevOps tasks
03:06 - Run Airflow in Python Env
First of all, let's create a Python project and then install Airflow.
Airflow 2.7 requires Python 3.8 or higher.
Create a python environment : python3 -m venv py_env
The command python3 -m venv py_env is used to create a virtual environment for Python projects using the built-in venv
module in Python 3. A virtual environment is a self-contained directory
that houses its own Python interpreter and isolated libraries. This is
particularly useful when working on multiple projects with different
dependencies or when you want to avoid conflicts between system-wide
Python installations and project-specific requirements.
source py_env/bin/activate
Next we are going to install Apache Airflow locally.
Official : Apache Github repo: https://github.com/apache/airflow
pip install 'apache-airflow==2.7.0' \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.0/constraints-3.8.txt"
The first attempt failed because GCC was missing; the error message asks us to install the command line tools. After installing them, we execute the pip install command again:
pip install 'apache-airflow==2.7.0' \
--constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.0/constraints-3.8.txt"
This time there is no error and Airflow is installed successfully.
......Next.... Initialize the database for Airflow.
Before that, we need to set the Airflow home directory; by default Airflow creates an "airflow" folder in the user's home directory. To use the current directory instead:
export AIRFLOW_HOME=.
next we initialize the database :
> airflow db init
This command will create a SQLite database, a logs folder, and some configuration files.
Next I will start Airflow webserver
start a webserver
airflow webserver -p 8080
You can also use other ports, for example 8088 or 8082.
Now let's open the link in the browser; it asks for a username and password.
Let's go back to the terminal, stop the webserver, and create a user:
airflow users create --username admin --firstname firstname --lastname lastname --role Admin --email admin@domain.com
This prompts for the password: set the password and we are ready to go.
lets start the webserver :
start a webserver
airflow webserver -p 8080
We can see all the DAGs, but the UI says that there is no scheduler running. In order to run the DAGs we need to start the scheduler.
Open up another terminal and set the export AIRFLOW_HOME=.
> airflow scheduler
Let's go back to the browser and refresh the page; now the message is gone.
From the tree view we can see multiple runs.
The refresh button is on the right side. We can see that the task has been executed and is marked dark green once it has run successfully.
10:44 - Run Airflow in Docker
Let's open VS Code and create a folder "airflow_docker".
We will open the Apache Airflow documentation page:
https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html
We need to install Docker and Docker Compose on our laptop.
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.0/docker-compose.yaml'
We will use the LocalExecutor, so in the yaml file we change the executor from CeleryExecutor to LocalExecutor and remove the Celery parameters.
Redis is only needed as a message broker for Celery, so it is not needed here.
The celery worker and flower services are removed. We save the yaml file and we are ready to go.
Setting up the folders and the .env file:
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env
(On systems where id -u is not available, use the default AIRFLOW_UID=50000.)
docker compose up airflow-init
Use docker ps to see what containers are running .
What is a workflow?
A workflow is a sequence of tasks. In Airflow, a workflow is defined as a DAG - a Directed Acyclic Graph.
For example, we can have a workflow with task A, followed by tasks B and C.
A task represents a unit of work within a DAG. In the DAG Graph view each task is represented as a node, and the edges are the dependencies between tasks: for example, task C is downstream of task A, and task C is also upstream of task E.
The goal of a task is to achieve a specific thing. The method it uses is called an operator.
- While DAGs describe how to run a workflow,
- Operators determine what actually gets done by a task.
In Airflow there are many kinds of operators, such as:
-- BashOperator
-- PythonOperator
-- and you can also write your own customized operator.
Each task is an implementation of an operator: for example a PythonOperator executes some Python code, and a BashOperator runs a Bash command.
To sum up: the operator determines what is going to be done; the task implements an operator.
A DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships.
Execution Date: the logical date that the DAG run and its task instances are running for.
Task Instance: a run of a task at a specific point in time (execution date).
DAG Run: an instantiation of a DAG, containing task instances that run for a specific execution_date.
Task Lifecycle / Basic Architecture:
A task instance moves through stages such as no_status -> scheduled -> queued -> running -> success (or failed / up_for_retry).
> docker-compose up -d
> docker-compose down -v
-v means that we are not only shutting down the airflow containers, we are also removing the volumes we defined.
In the yaml file, change AIRFLOW__CORE__LOAD_EXAMPLES from 'true' to 'false'.
> docker-compose up airflow-init
after that we launch
> docker-compose up -d
We go to the dags folder and create a DAG file:
our_first_dag.py
For retry_delay we need to import "timedelta" from datetime.
Each time we modify the DAG we change the DAG id (e.g. from v1 to v2), which changes the DAG version.
task1 >> [task2, task3] means that task2 and task3 are downstream tasks of task1. Let's change the DAG id version and refresh the page again.
Airflow DAG with Python Operator:
Import the Python operator:
from airflow.operators.python import PythonOperator
Let's make the PythonOperator take some parameters.
The PythonOperator has a parameter called op_kwargs - a dictionary of keyword arguments that will get unpacked in the Python function.
Change the DAG id and refresh.
Can we share information between different tasks? We can achieve this with Airflow XComs.
Basically, we can push information to XComs in one task and pull it in other tasks.
By default, every function's return value is automatically pushed into XComs.
Let's go to Visual Studio Code and create a new Python function called
def get_name
Comment out task1 and update the DAG id.
Go to Admin > XComs and you will find the returned value.
Let's see how many lines of code we can reduce by using the TaskFlow API.
dag_with_taskflow_api.py
We define our DAG with the @dag decorator and the tasks with the @task decorator.
We can also see that the returned names are pushed into XComs: the TaskFlow API takes care of pushing and pulling the XCom values for us.
What if we need to return a first name and a last name instead of a single name? Return a dictionary and set multiple_outputs=True on the task.
Now we have rewritten the Python DAG using the TaskFlow API.
Airflow Catch-Up and Backfill
In VS Code we will create a new DAG, dag_with_catchup.py, and open it.