Airflow provides several types of Executors, such as SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor, and MesosExecutor. The most common ones are LocalExecutor and CeleryExecutor.
LocalExecutor is an efficient way to run several tasks on a single machine, while CeleryExecutor is the choice when you want to run tasks in a distributed way. If you have worked with big data frameworks such as Hadoop or Spark, LocalExecutor is comparable to the local deployment mode, and CeleryExecutor is comparable to the standalone (cluster) mode.
Here, I will give a short description of CeleryExecutor mode, but I will implement LocalExecutor mode in this article.
CeleryExecutor, which is one of the ways you can scale out the number of workers, requires you to set up a Celery backend. The backend options include PostgreSQL, Redis, RabbitMQ, etc.
Celery in the Airflow architecture consists of two components (a sketch of the corresponding airflow.cfg entries follows below):
Broker - stores commands for execution
Result backend - stores the status of completed commands
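As an illustration only, since I am not actually setting up Celery in this article, the corresponding airflow.cfg entries would look roughly like this, assuming Redis as the broker and the same PostgreSQL database as the result backend (the key names can differ slightly between Airflow versions):
[core]
executor = CeleryExecutor
[celery]
broker_url = redis://localhost:6379/0
result_backend = db+postgresql://airflow:airflow@localhost/airflow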
If you have just one server (machine), you'd better choose LocalExecutor mode. I will introduce the truly distributed (standalone) mode in a future article, based on the Hadoop ecosystem.
Preparations:
Install pip (Ubuntu):
sudo apt-get -y install python3-pip
Install virtualenv and create a virtual environment:
pip3 install virtualenv
virtualenv --no-site-packages venv
Configuration:
Setting up the virtual environment:
Basically, I install all the dependencies for my Python projects in a virtual environment, whether the environment lives somewhere local or inside the project folder itself.
After activating the venv, you can install any dependencies you need.
source venv/bin/activate
(venv)$ pip3 install libraries
1. Airflow:
pip install 'apache-airflow[postgres]'
Change the Airflow config file, airflow.cfg:
# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = LocalExecutor
The LocalExecutor can parallelize task instances locally.
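To see this in action, here is a minimal sketch of a test DAG (the file name parallel_test.py and the sleep durations are my own choices; it assumes Airflow 1.x, where BashOperator lives in airflow.operators.bash_operator). With LocalExecutor the two sleep tasks should run at the same time, while with SequentialExecutor they would run one after the other.
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id="parallel_test",
    start_date=datetime(2019, 1, 1),
    schedule_interval=None,  # trigger it manually from the UI or CLI
)

# Two independent tasks: LocalExecutor can run them in parallel.
t1 = BashOperator(task_id="sleep_a", bash_command="sleep 30", dag=dag)
t2 = BashOperator(task_id="sleep_b", bash_command="sleep 30", dag=dag)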
Change the connection string for your database in airflow.cfg:
# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines; more information
# on their website.
# The format is:
# sql_alchemy_conn = postgresql+psycopg2://$your_db_user:$your_db_password@$your_postgres_db_host:$postgres_port/$db_name
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
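Once the PostgreSQL user and database from the next step exist, you can sanity-check this connection string with a short script like the one below (a minimal sketch; it assumes SQLAlchemy and psycopg2 are already installed in the venv as Airflow dependencies):
from sqlalchemy import create_engine

# Same URI as sql_alchemy_conn in airflow.cfg.
engine = create_engine("postgresql+psycopg2://airflow:airflow@localhost/airflow")
with engine.connect() as conn:
    # Prints the PostgreSQL version if the user, password and database are correct.
    print(conn.execute("SELECT version();").scalar())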
2. PostgreSQL:
sudo apt-get install postgresql postgresql-contrib
Open a psql session with:
sudo -u postgres psql
Then create the user and database for Airflow (matching the configuration in airflow.cfg):
postgres=# CREATE USER airflow PASSWORD 'airflow';
CREATE ROLE
postgres=# CREATE DATABASE airflow;
CREATE DATABASE
postgres=# GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO airflow;
GRANT
Check the created user and database:
postgres=# \du
postgres=# \l
- Delete the created user if you need to (check the existing roles with \du first):
\du
DROP USER airflow;
Check PostgreSQL settings:
Change which IP addresses can access the database in pg_hba.conf:
sudo vim /etc/postgresql/10/main/pg_hba.conf
If you want to tighten security, you can set it like this:
# IPv4 local connections:
host all all 127.0.0.1/32 md5
# Reject connections from all other addresses:
host all all 0.0.0.0/0 reject
It works like an access list in a firewall: only 127.0.0.1/32 can access the database, and all other IP addresses are rejected.
Change the listening address in postgresql.conf:
sudo vim /etc/postgresql/10/main/postgresql.conf
If you want to tighten security, change the * to localhost in listen_addresses, which ensures PostgreSQL listens on localhost only.
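For example, the relevant line in postgresql.conf would then look like this (by default it is commented out):
listen_addresses = 'localhost'    # instead of '*'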
Restart the PostgreSQL service:
sudo service postgresql restart
Additional commands:
- Exit the listing shown after \du or \l:
Press q
- Exit the postgres=# prompt:
Ctrl+D
- Connect to PostgreSQL:
psql -U user_name -d database_name (both are airflow here)
Additional notes:
Check the path settings:
vim ~/.bashrc
add the AIRFLOW_HOME path:
export AIRFLOW_HOME=xxx/anomaly_detection/airflow_home
export PATH=$PATH:/home/newt/.local/bin
Activate the settings:
source ~/.bashrc
Then initialize the Airflow database with:
airflow initdb
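After the database is initialized, the usual next step, which I will not cover in detail here, is to start the webserver and the scheduler, each in its own terminal with the venv activated:
airflow webserver -p 8080
airflow scheduler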
Pay attention to the threads and parallelism settings:
After switching to PostgreSQL, max_threads is no longer limited. However, you'd better not test heavy tasks on your own laptop with max_threads set to 4 or above; it can easily crash.
parallelism can then be set to the maximum value your machine can handle.
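As an illustration, the relevant airflow.cfg entries look like this (the numbers are my own conservative guesses for a laptop, not values taken from this setup):
[core]
# Total number of task instances allowed to run at once in this installation.
parallelism = 8
# Number of task instances allowed to run concurrently within one DAG.
dag_concurrency = 4
[scheduler]
# Scheduler threads; keep this small on a laptop.
max_threads = 2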
Errors I have encountered:
I got an error like:
sqlalchemy.exc.OperationalError: (psycopg2.OperationalError) FATAL: role 'xxx' does not exist
This was because I had not created the user yet; the fix was to run the following commands:
sudo -u postgres psql
CREATE USER airflow PASSWORD 'xxx';
CREATE DATABASE airflow;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO airflow;
References:
https://stackoverflow.com/questions/38200666/airflow-parallelism