airflow 4

[Airflow] RuntimeError: Cannot re-initialize CUDA in forked subprocess 에러 해결

실행 환경 OS: MacOS apache-airflow: 2.2.3 버전 문제 상황 Airflow로 개발중인 기능은 다음과 같았다. HuggingFace에 업로드 된 크롤링 데이터를 서버로 가져와 학습시킬 수 있는 형태로 전처리한다. 전처리된 데이터를 Lora 활용하여 kullm(구름)의 fine-tuning을 진행한다. 2번 기능을 테스트하던 중 아래와 같은 에러가 발생했다. RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method airflow 없이 독립적으로 실행할 때는 정상적으로 동작하던 코드였는데, Airflow로 ..

[Airflow] Airflow 아키텍처

DAG 디렉토리 DAG 파일들을 저장한다. 이 디렉토리 내부의 python 스크립트 파일은 Scheduler에 의해 탐색되고 DAG을 파싱한다. Scheduler 각종 메타 정보의 기록을 담당한다. DAG 디렉토리 내부의 python 스크립트 파일에서 DAG를 파싱하여 DB에 저장한다. 실행 진행 상황과 결과를 DB에 저장한다. Excutor를 통해 실제로 스케줄링된 DAG를 실행한다. Executor 스케줄링된 DAG를 Workers에게 작업을 실행시키는 역할을 한다. 크게 Local Executor와 Remote Executor로 나눌 수 있다. Local Executor는 DAG Run을 프로세스 단위로 실행한다. Remote Executor는 DAG Run을 외부 프로세스로 실행한다. Worker..

[Airflow] DAG 작성하기

DAG Airflow에서는 DAG이라는 단위로 스케줄링을 관리한다. DAG는 Directed Acyclic Graph의 약자로, 순환하지 않고 방향이 존재하는 그래프를 의미한다. 위와 그림과 같은 DAG는 a, b, c, d, e라고 하는 task의 조합으로 이루어져 있다. task는 파이프라인에서 실행되는 단위 작업이다. DAG 내에서 task는 이전에 수행되어야 하는 task가 모두 완료되면 실행되고, 여러 task를 동시에 실행시킬 수 있다. 예를 들어 DAG 구조가 위 그림과 같다면, a task가 완료된 후 b, c task가 동시에 실행되고, b와 c task가 모두 완료되어야 d task가 실행된다. DAG 작성하기 먼저 DAG를 담을 디렉토리를 Airflow 기본 디렉토리 경로에서 생성한다..

[Airflow] Airflow 설치 및 실행하기

Airflow 등장 전 Batch Process 구축 방법 Batch Process란? 예약된 시간에 일괄적으로 처리하는 프로세스를 말한다. Linux Crontab OS에 의해 지정된 시간마다 명령어가 수행된다. crontab -e를 입력하면 아래와 같은 에디터가 나온다. 이 에디터에 0 * * * * predict.py를 입력하면 매 시 0분에 predict.py가 실행된다. Linux Crontab의 문제점 명령어를 수행하다가 오류가 발생한 경우, 재실행을 하거나 오류가 발생했다는 알람을 받을 수 없다. 과거 실행 이력 및 실행 로그를 확인하기 어렵다. 여러 파일을 실행하거나, 복잡한 파이프라인을 만들기 힘들다. 따라서 간단한 Batch Process를 처리하는데 Linux Crontab이 적합할 ..