This post assumes you already have RabbitMQ and PostgreSQL set up.
pip install --upgrade pip
pip install apache-airflow[postgres,celery,rabbitmq,ssh] psycopg2-binary
pip install --upgrade apache-airflow[postgres,celery,rabbitmq,ssh] psycopg2-binary
echo 'export AIRFLOW_HOME=~/airflow' >> ~/.bash_profile
Run the commands above and Airflow installs very easily. I did this in a Python 3.7.5 environment.
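To check that the install and the AIRFLOW_HOME export actually took effect, something like this should be enough (a minimal sketch using the 1.10-era CLI):

source ~/.bash_profile
airflow version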
While working on Airflow this time I also read a number of foreign blogs and articles, and some of their explanations were simply wrong.
After installing Airflow, running airflow initdb creates a configuration file called airflow.cfg by default.
The settings I changed in it are as follows:
executor = CeleryExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@192.168.0.210/airflow
broker_url = pyamqp://airflow:airflow@devcloud/airflow_vhost
--> Many blogs tell you to configure RabbitMQ with the amqp protocol here. I don't know whether this changed between versions, but you have to use pyamqp; install it with pip install pyamqp. If you use amqp instead, you will run into the message shown in Error 1 (at the bottom of this post).
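For what it's worth, the traceback in Error 1 below dies inside librabbitmq, so I'd expect that uninstalling that C extension (an untested assumption on my part, not something from the Airflow docs) would also force kombu onto the pure-Python transport:

pip uninstall librabbitmq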
result_backend = db+postgresql://airflow:airflow@192.168.0.210/airflow
--> Some articles say you can just reuse the sql_alchemy_conn value (postgresql+psycopg2://airflow:airflow@192.168.0.210/airflow) here, but if you do, the scheduler fails with a backend error saying it cannot find postgres. The Celery documentation (Celery is installed automatically when you add the celery extra to Airflow) says to write it as above, with the db+ prefix.
--> Also, some articles introduce this as the celery_result_backend option, but according to the Airflow docs it was renamed to result_backend in a later version.
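One way to sanity-check the result backend is to look for the tables Celery's database backend creates once a worker has stored a result (a hedged sketch; celery_taskmeta is Celery's default table name, and the connection values are the ones from my config above):

psql -h 192.168.0.210 -U airflow -d airflow -c "\dt celery_*"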
default_timezone = Asia/Seoul
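With the config done, each component gets started separately. A minimal sketch of what I mean, using the 1.10-era subcommands (add -D if you want them daemonized):

airflow webserver -p 8080
airflow scheduler
airflow worker
airflow flower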
If everything is wired up correctly, a task moves from running to success when you execute it. If the task shows up in the queue but Celery never picks it up, something is wrong somewhere.
You won't watch this often once the daemons are up, but when you run the tutorial, messages pile up in the RabbitMQ queue and get processed by Celery.
If you open airflow flower, the dashboard shows whether the Celery workers are up properly.
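If you want some traffic to watch, one way (a sketch on my side; airflow_vhost is the vhost from the broker_url above, and rabbitmqctl has to run on the RabbitMQ host) is to trigger the bundled tutorial DAG and list the queues:

airflow unpause tutorial
airflow trigger_dag tutorial
rabbitmqctl list_queues -p airflow_vhost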
[Error 1]
[2019-11-26 09:06:19,812: CRITICAL/MainProcess] Unrecoverable error: SystemError("<method '_basic_recv' of '_librabbitmq.Connection' objects> returned a result with an error set")
Traceback (most recent call last):
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/kombu/messaging.py", line 624, in _receive_callback
return on_m(message) if on_m else self.receive(decoded, message)
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 568, in on_task_received
callbacks,
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/strategy.py", line 203, in task_message_handler
handle(req)
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/worker.py", line 223, in _process_task_sem
return self._quick_acquire(self._process_task, req)
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/kombu/asynchronous/semaphore.py", line 62, in acquire
callback(*partial_args, **partial_kwargs)
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/worker.py", line 228, in _process_task
req.execute_using_pool(self.pool)
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/request.py", line 551, in execute_using_pool
correlation_id=task_id,
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/concurrency/base.py", line 155, in apply_async
**options)
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/billiard/pool.py", line 1530, in apply_async
self._quick_put((TASK, (result._job, None, func, args, kwds)))
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/concurrency/asynpool.py", line 819, in send_job
body = dumps(tup, protocol=protocol)
TypeError: can't pickle memoryview objects
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/worker.py", line 205, in start
self.blueprint.start(self)
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/bootsteps.py", line 369, in start
return self.obj.start()
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
blueprint.start(self)
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 596, in start
c.loop(*c.loop_args())
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/celery/worker/loops.py", line 91, in asynloop
next(loop)
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/kombu/asynchronous/hub.py", line 362, in create_loop
cb(*cbargs)
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/kombu/transport/base.py", line 238, in on_readable
reader(loop)
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/kombu/transport/base.py", line 220, in _read
drain_events(timeout=0)
File "/home/airflow/.pyenv/versions/3.7.5/envs/dev/lib/python3.7/site-packages/librabbitmq/__init__.py", line 227, in drain_events
self._basic_recv(timeout)
SystemError: <method '_basic_recv' of '_librabbitmq.Connection' objects> returned a result with an error set