Apache Airflow

 

 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

https://www.youtube.com/watch?v=ZET50M20hkU

This one : https://www.youtube.com/watch?v=K9AnJ9_ZAXE


00:00 - Airflow Introduction
03:06 - Run Airflow in Python Env
10:44 - Run Airflow in Docker
17:55 - Airflow Basics and Core Concepts
21:55 - Airflow Task Lifecycle
26:19 - Airflow Basic Architecture
28:14 - Airflow DAG with Bash Operator
40:09 - Airflow DAG with Python Operator
45:04 - Data Sharing via Airflow XComs
52:53 - Airflow Task Flow API
57:56 - Airflow Catch-Up and Backfill
01:02:09 - Airflow Scheduler with Cron Expression
01:07:25 - Airflow Connection to Postgres
01:08:58 - Airflow Postgres Operator
01:19:30 - Airflow Docker Install Python Package 2 ways
01:29:34 - Airflow AWS S3 Sensor Operator
01:42:37 - Airflow Hooks S3 PostgreSQL
02:00:43 - Course Bonus

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

What is Apache Airflow?


Apache Airflow is an open-source platform designed for orchestrating complex workflows and data pipelines. It allows you to define, schedule, manage, and monitor workflows as directed acyclic graphs (DAGs). Airflow was originally developed by Airbnb and was later open-sourced and contributed to the Apache Software Foundation.

Key features of Apache Airflow include:

  1. Workflow Automation: Airflow enables you to automate and schedule complex workflows involving tasks that need to be executed in a specific sequence or based on certain conditions.

  2. Directed Acyclic Graphs (DAGs): Workflows in Airflow are represented as DAGs, which consist of tasks and dependencies. Tasks can be anything from running a script, executing a SQL query, transferring files, or triggering external processes.

  3. Task Dependencies: Airflow allows you to define dependencies between tasks, specifying the order in which tasks should be executed. Tasks can run in parallel or sequentially, based on their dependencies.

  4. Dynamic Workload Management: Airflow's dynamic configuration capabilities enable you to create reusable templates for tasks and dynamically generate task instances with different parameters.

  5. Extensibility: Airflow is highly extensible through custom operators and sensors. You can create your own operators to perform specific tasks or integrate with external services.

  6. Scheduler: Airflow has a built-in scheduler that manages the execution of tasks based on the defined schedule and task dependencies.

  7. Logging and Monitoring: Airflow provides a user interface (UI) that allows you to monitor the status of workflows, track task execution, view logs, and troubleshoot issues.

  8. Integration with External Systems: Airflow can integrate with various external systems and databases, such as Amazon S3, Google Cloud Storage, relational databases, and more.

  9. Scalability: Airflow can be configured to work in a distributed environment, allowing you to scale out the execution of tasks as your workflow demands grow.

  10. Extensive Community and Ecosystem: Being open-source, Airflow has a vibrant community contributing to its development. It has a wide range of third-party plugins, connectors, and integrations with other tools.




Amazon Managed Workflows for Apache Airflow 

-- Airflow is commonly used for tasks such as running ETL (Extract - Transform - Load) jobs

-- Managing Machine Learning Pipelines

-- Automating DevOps Tasks



03:06 - Run Airflow in Python Env

First of all let's create a Python project and then install Airflow.

Airflow requires a Python version above 3.6 (for Airflow 2.7, Python 3.8 or newer; the constraints file below is the 3.8 one)

Create a python environment : python3 -m venv py_env

The command python3 -m venv py_env is used to create a virtual environment for Python projects using the built-in venv module in Python 3. A virtual environment is a self-contained directory that houses its own Python interpreter and isolated libraries. This is particularly useful when working on multiple projects with different dependencies or when you want to avoid conflicts between system-wide Python installations and project-specific requirements.

source py_env/bin/activate

Next we are going to install Apache Airflow locally.

Official : Apache Github repo: https://github.com/apache/airflow

pip install 'apache-airflow==2.7.0' \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.0/constraints-3.8.txt"

The first attempt failed because GCC was missing; the error message includes an instruction to install the command line developer tools.

Once that is done, we run the pip install command again:

pip install 'apache-airflow==2.7.0' \
 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.0/constraints-3.8.txt"

This time there is no error and Airflow is installed successfully.


Next, initialize the database for Airflow.

Before that we need to set the Airflow home directory; by default Airflow creates an "airflow" folder in the user's home directory. To use the current directory instead:

export AIRFLOW_HOME=.

next we initialize the database :

> airflow db init

This command will create a SQLite database, a logs folder, and some configuration files.

Next, start the Airflow webserver:

airflow webserver -p 8080

You can also use other ports, for example 8088 or 8082.

Now let's open the link in the browser; it asks for a username and password.


Let's go back into the terminal, stop the webserver, and create a user:

airflow users create --username admin --firstname firstname --lastname lastname --role Admin --email admin@domain.com

This prompts for a password; set it and we are ready to go.

Let's start the webserver again:

airflow webserver -p 8080

We can see all the DAGs, but the UI warns that there is no scheduler running.

In order to run the DAGs we need to start the scheduler. Open up another terminal, set export AIRFLOW_HOME=. again, then run:

> airflow scheduler

Back in the browser, refresh the page and the warning is gone.

From the tree view we can see multiple runs.

The refresh button is on the right side. A task is marked dark green once it has run successfully.

Let's open VS Code and create a folder "airflow_docker".

We will open the Apache Airflow documentation page on running Airflow in Docker:

https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html

We need to install Docker and Docker Compose on our laptop, then fetch the compose file:

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.0/docker-compose.yaml'

We will use the LocalExecutor instead of Celery: in the yaml file, change the executor from CeleryExecutor to LocalExecutor and remove the Celery-related parameters.

Redis is only needed as a broker for Celery, so it is not needed here.

Remove the redis, celery worker, and flower services. We save the yaml file and we are ready to go.
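After the edits, the relevant part of the docker-compose.yaml looks roughly like this (a sketch from memory, not the full file; exact keys come from the official compose file you downloaded):

```yaml
# docker-compose.yaml (excerpt) -- switching from Celery to the LocalExecutor
x-airflow-common:
  environment:
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    # removed: AIRFLOW__CELERY__RESULT_BACKEND and AIRFLOW__CELERY__BROKER_URL
# services removed entirely: redis, airflow-worker, flower
```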

Setting up the directories and the environment file:

mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env

(If you are not on Linux, you can simply put AIRFLOW_UID=50000 in the .env file.)
docker compose up airflow-init

Use docker ps to see which containers are running.

What is a workflow ?

A workflow is a sequence of tasks. In Airflow, a workflow is defined as a DAG - a Directed Acyclic Graph.

For example, we have a workflow with task A, followed by tasks B and C.


A task represents a unit of work within a DAG; in the DAG graph each task is shown as a node. Tasks are written in Python, and each task can have dependencies: for example, task C is downstream of task A, and task C is also upstream of task E.

The goal of a task is to achieve a specific thing. The method it uses is called an operator.

  • While DAGs describe how to run a workflow,
  • Operators determine what actually gets done by a task.

In Airflow there are many kinds of operators, such as:

-- Bash Operator
-- Python Operator

-- And you can also write your own customized operator.

Each task is an implementation of an operator: for example, a PythonOperator to execute some Python code, or a BashOperator to run a Bash command.

To sum up: the operator determines what is going to be done; the task implements an operator.

A DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships.

Execution Date : the logical date that a DAG run and its task instances are running for
Task Instance : a run of a task at a specific point in time (execution date)
DAG Run : an instantiation of a DAG, containing task instances that run for a specific execution_date


Task Lifecycle : Basic Architecture :

Stages a task can go through: no_status -> scheduled -> queued -> running -> success or failed, plus states such as up_for_retry, up_for_reschedule, upstream_failed, and skipped.

 > docker-compose up -d

 > docker-compose down -v

 -v means that we are not only shutting down the Airflow containers, we are also removing the volumes we defined.

Change AIRFLOW__CORE__LOAD_EXAMPLES from 'true' to 'false' in the docker-compose.yaml so the example DAGs are not loaded.

> docker-compose up airflow-init

after that we launch 

> docker-compose up -d

We go to the dags folder and create a DAG file:

our_first_dag.py


     For the retry delay we need to import "timedelta" from the datetime module.
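A minimal sketch of the default_args dictionary this step builds (owner name and values are illustrative, not from the source):

```python
from datetime import timedelta

# Hypothetical default arguments applied to every task in the DAG
default_args = {
    "owner": "airflow",                   # illustrative owner name
    "retries": 5,                         # retry a failed task up to 5 times
    "retry_delay": timedelta(minutes=5),  # wait 5 minutes between retries
}
```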





We change the DAG id suffix, and this creates a new version of the DAG.





This means that task2 and task3 are downstream tasks of task1. Let's change the version suffix in the DAG id and refresh the page again.

Each time you change your DAG id, it changes the version.






Airflow DAG with Python Operator :






Import the Python operator:

from airflow.operators.python import PythonOperator


Let's make the PythonOperator pass some parameters to the callable.

The PythonOperator has a parameter called op_kwargs - a dictionary of keyword arguments that will get unpacked in the Python function.
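op_kwargs is just a dict that Airflow unpacks into the callable. The mechanism can be shown in plain Python (function and values below are illustrative, not from the source):

```python
def greet(name, age):
    # In a real DAG this would be the python_callable of a PythonOperator
    return f"Hello {name}, you are {age} years old"

# Equivalent of PythonOperator(..., op_kwargs={"name": "Tom", "age": 20})
op_kwargs = {"name": "Tom", "age": 20}
print(greet(**op_kwargs))  # the dict is unpacked into keyword arguments
```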


Change the DAG id version and refresh the page.

Can we share information between different tasks? We can achieve this with Airflow XComs.
Basically, we can push information to XCom in one task and pull it in another task.

By default, every function's return value is automatically pushed into XCom.

Let's go to Visual Studio Code and create a new Python function called

def get_name







Comment out task1 and update the DAG id.

Go to Admin > XComs; you will find the returned value there.
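The push/pull pattern can be illustrated without a running Airflow. The callables below use the ti (task instance) argument exactly as they would inside a PythonOperator; FakeTI is a stand-in purely for illustration, and the names/values are invented:

```python
# Stand-in for the TaskInstance object Airflow passes to callables as "ti"
class FakeTI:
    def __init__(self):
        self._xcoms = {}

    def xcom_push(self, key, value):
        self._xcoms[key] = value

    def xcom_pull(self, task_ids=None, key=None):
        # The real xcom_pull also filters by task_ids; ignored in this toy
        return self._xcoms.get(key)

def get_name(ti):
    # Instead of relying on the return value, push explicitly under a key
    ti.xcom_push(key="first_name", value="Jerry")

def greet(ti):
    first_name = ti.xcom_pull(task_ids="get_name", key="first_name")
    return f"Hello {first_name}"

ti = FakeTI()
get_name(ti)
print(greet(ti))  # Hello Jerry
```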




Let's see how many lines of code we can reduce by using the TaskFlow API.

dag_with_taskflow_api.py


We have to define our DAG





We can also see that the returned names have been pushed into XCom; the TaskFlow API takes care of pushing and pulling the XCom values for us.

What if we need to return the first name and last name instead of a single name? We return a dictionary and set multiple_outputs=True on the task.



Now we have rewritten the Python DAG using the TaskFlow API.

Airflow Catch-Up and Backfill

In VS Code we will create a new DAG, dag_with_catchup.py, and open it.

With catchup=True (the default), the scheduler automatically runs every missed schedule interval between the start_date and now. With catchup=False, you can still run past intervals manually with the backfill command:

> airflow dags backfill -s <start_date> -e <end_date> <dag_id>







