본문 바로가기

Study/Bigdata

python 3.7.5 오픈소스 워크플로우 엔진 apache airflow(celery,rabbitmq,postgresql) 구축하기

 

기본적으로, rabbitmq, postgres는 이미 설정하셨다는 가정하에 설명되는 글입니다. 

 

pip install --upgrade pip

pip install  apache-airflow[postgres,celery,rabbitmq,ssh]  psycopg2-binary

pip install --upgrade apache-airflow[postgres,celery,rabbitmq,ssh]  psycopg2-binary

echo 'export AIRFLOW_HOME=~/airflow' >> ~/.bash_profile

 

다음과 같이 하면 airflow는 매우 쉽게 설치가 됩니다. 일단 저는 python 3.7.5 환경에서 하였습니다 

 

저도 이번에 airflow 를 하면서 외부 다른 외국 블로거나 글을 봤는데, 설명이 틀린것이 있더군요 

 

airflow를 설치하고

 

airflow initdb 를 하면 기본적으로

airflow.cfg 라는 환경 파일이 만들어집니다

 

이때 제가 한 설정은 다음과 같습니다 

 

executor = CeleryExecutor

sql_alchemy_conn =  postgresql+psycopg2://airflow:airflow@192.168.0.210/airflow

broker_url = pyamqp://airflow:airflow@devcloud/airflow_vhost

--> 해당설정에서 amqp 프로토콜로 rabbitmq 를 설정하라고 블로그에 많이 나옵니다. 이거 버전이 바뀐지는 모르겠으네 pyamqp 로 하셔야 합니다 설치방법은 pip install pyamqp로 하시면 됩니다. 만약 amqp로 할경우 오류 1(블로그 하단) 번과 같은 메세지를 만나게 됩니다. 

 

result_backend=  db+postgresql://airflow:airflow@192.168.0.210/airflow

--> 어떤글에서는 postgresql+psycopg2://airflow:airflow@192.168.0.210/airflow 과 동일하게 한다면 된다고 하는데 스케쥴러 실행할때 backend 오류나면서 postgres를 찾을수 없다는 오류가 나옵니다. 이부분은 celery document(airflow에 옵션을 넣으면 celery가 자동으로 설치가 됩니다.) 다음과 같이 하라고 나와 잇습니다. 

--> 그리고 어떤 글을 보면 celery_result_backend 옵션으로 소개되어잇는데, airflow 문서를 보면 버전업 되면서 result_backend 로 됩니다. 

 

default_timezone = Asia/Seoul

 

apache airflow

정상적으로 실행하면, running에서 sucess로 변합니다. 만약 실행했는데, queue에는 생성되었는데 celery가 동작하지 않으면 어디선가 잘못된것입니다.

 

apache airflow celery

실제 데몬을 띄우면 볼일은 없겠지만 튜토리얼을 돌리게 될겨우, rabbitmq에 queue에 쌓이면 celery에 의해서 처리가 됩니다 

 

 

apache airflow flower

 

airflow flower를 통해서 보시면, celery가 제대로 떠있는것을 대쉬보드로 보실수 있습니다 

 

 

[오류1]

[2019-11-26 09:06:19,812: CRITICAL/MainProcess] Unrecoverable error: SystemError("<method '_basic_recv' of '_librabbitmq.Connection' objects> returned a result with an error set")
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/kombu/messaging.py", line 624, in _receive_callback
    return on_m(message) if on_m else self.receive(decoded, message)
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 568, in on_task_received
    callbacks,
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/strategy.py", line 203, in task_message_handler
    handle(req)
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/worker.py", line 223, in _process_task_sem
    return self._quick_acquire(self._process_task, req)
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/kombu/asynchronous/semaphore.py", line 62, in acquire
    callback(*partial_args, **partial_kwargs)
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/worker.py", line 228, in _process_task
    req.execute_using_pool(self.pool)
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/request.py", line 551, in execute_using_pool
    correlation_id=task_id,
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/concurrency/base.py", line 155, in apply_async
    **options)
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/billiard/pool.py", line 1530, in apply_async
    self._quick_put((TASK, (result._job, None, func, args, kwds)))
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/concurrency/asynpool.py", line 819, in send_job
    body = dumps(tup, protocol=protocol)
TypeError: can't pickle memoryview objects

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/worker.py", line 205, in start
    self.blueprint.start(self)
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/bootsteps.py", line 369, in start
    return self.obj.start()
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 596, in start
    c.loop(*c.loop_args())
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/kombu/asynchronous/hub.py", line 362, in create_loop
    cb(*cbargs)
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/kombu/transport/base.py", line 238, in on_readable
    reader(loop)
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/kombu/transport/base.py", line 220, in _read
    drain_events(timeout=0)
  File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/librabbitmq/__init__.py", line 227, in drain_events
    self._basic_recv(timeout)
SystemError: <method '_basic_recv' of '_librabbitmq.Connection' objects> returned a result with an error set