본문 바로가기

Database

[실습] Amazon Managed Workflows for Apache (MWAA) 구성하기

반응형

안녕하세요 서후아빠입니다. ^_^
이번 세션은 Amazon Managed Workflows for Apache (MWAA = Airflow)을 사용하는 방법에 대해서 실습해 보도록 하겠습니다. Airflow를 구성하기 위해서는 3가지 역량이 필요합니다. 1단계 WorkFlow(DAG) 작성을 위한 Python 작성 능력, 2단계 Airflow 아키텍처 구성 능력, 3단계 데이터 정합성 확인 능력입니다. 1단계 및 3단계는 WorkFlow를 잘 알고 있는 직원이 진행하는 것이 적합하므로 가급적 고객이 처리하도록 협의하시기 바랍니다. 


AWS MWAA

구분 내용
설명 스크립트로 데이터 파이프라인 구성 및 관리하는 도구로서 DAG(Directed Acyclic Graph)로 워크플로우 정의
사용자 인터페이스로 데이터 파이프라인 모니터링
커뮤니티에서 개발한 커스텀 플러그인 세트를 통해 기능 확장
AWS 서비스 및 Apache Spark 등과 통합
분야 ETL(압도적), AL/ML, DevOps
구성요소 Web server : DAG 목록, Task 상태, 로그 등을 UI로 제공
Scheduler : DAG 분석 및 모니터링, Worker 스케줄링, Task 핸들링 
Worker : Task 실행
Metadata Database : DAG, Task 등의 실행기록 및 메타 데이터 저장, 관리

비용 산정 어려움 DAG당 필요한 Worker 수 산정의 어려움
 - 가벼운 DAG 경우 1개의 Worker에서 여러 개의 DAG 처리
 - 무거운 DAG 경우 여러 개의 Worker에서 1개의 DAG 처리
DAG당 처리 시간 산정의 어려움
 - 가벼운 DAG 경우 몇초 필요
 - 무거운 DAG 경우 몇시간 필요

1단계 : S3 bucket 생성, DAG(And PlugIn)을 S3 bucket에 업로드

S3 > Buckets > Create bucket > Bucket name(airflow-bucket), AWS Region(ap-northeast-2), Bucket Versioning (Enable)
S3 > Buckets > airflow-bucket > Create folder >  Folder name(dags)

버킷은 DAG or PlugIn용도로써 버킷명은 'airflow-'로 시작해야 합니다.
버킷 버전 관리 활성화하고 'requirements.txt' 파일을 통하여 버전 지정 가능합니다.

S3 > Buckets > airflow-bucket > dags > Upload > random_sleep.py

# DAG 예시 (파일명 : random_sleep.py)
import random
import time

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

args = {
    "owner": "airflow",
    "start_date": airflow.utils.dates.days_ago(2),
    "provide_context": True,
}


def task_proc(**context):
    s = random.random() * 10 + 10
    print(f"sleep {s} times.")
    time.sleep(s)


with DAG(
    dag_id="random_sleep",
    default_args=args,
    schedule_interval=None,
) as dag:
    t1 = PythonOperator(task_id="random_sleep1", python_callable=task_proc)
    t2 = PythonOperator(task_id="random_sleep2", python_callable=task_proc)
    t3 = PythonOperator(task_id="random_sleep3", python_callable=task_proc)
    t4 = PythonOperator(task_id="random_sleep4", python_callable=task_proc)
    t5 = PythonOperator(task_id="random_sleep5", python_callable=task_proc)

    t1 >> [t2, t3, t4] >> t5

2단계 : MWAA Environment 생성 

Amazon MWAA > Environments > Create environment

Specify details Configure advanced settings
Environment details
- Name : myairflow
- Airflow version : 2.4.3 (Latest)
- Weekly maintenance window start (UTC) : Thursday, 15:00
DAG code in Amazon S3
- S3 Bucket : s3://airflow-bucket
- DAGs folder : s3://airflow-bucket/dags
- Plugins file (옵션) : -
- Requirements file (옵션) : -
Networking
- Virtual private cloud (VPC) : vpcA
- Subnets : priA-sn-a, priA-sn-c
- Web server access : Public network (or Private)
- Security group(s) : 인바운드 443 허용된 보안그룹 선택
Environment class
- type : mw1.small
- Maximum worker count(1~25) : 1
- Minimum worker count(1~) : 1
- Scheduler count(2~5) : 2
Encryption(KMS 연동) : Disabled
Monitoring
- Airflow task logs : INFO
- Airflow web server logs : WARNING
- Airflow scheduler logs : WARNING
- Airflow worker logs : WARNING
- Airflow DAG processing logs : WARNING
Airflow configuration options (옵션) : -
Permissions 
- Execution role(자동 생성 가능) : AmazonMWAA-myairflow
Requirements file 옵션을 사용하려면 airflow-bucket 버킷에 파일(ex : requirements.txt)을 사전에 업로드 해놔야 합니다.
type(ex :mw1.small) 선택 : DAG capacity, Scheduler CPU, Worker CPU, Web server CPU 
Monitoring의 Log level 옵션 : CRITICAL, ERROR, WARNING, INFO
Airflow configuration options : Configuration option/Custom value 추가 기능

생성은 20~30분 소요되며, Security group(s)에서 포트가 허용되지 않으면 생성 실패됩니다.
또한 Web server에 기본적으로 EIP가 하나 연결되는데, 등록할 수 있는 EIP 갯수가 부족하면 실패됩니다.
  ※ 서브넷이 Private이지만 Web server access를 Public으로 선택하면 Public에서 Web server 접속이 됩니다.      
# AmazonMWAA-myairflow 예시
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "airflow:PublishMetrics",
            "Resource": "arn:aws:airflow:ap-northeast-2:111111111111:environment/myairflow"
        },
        {
            "Effect": "Deny",
            "Action": "s3:ListAllMyBuckets",
            "Resource": [
                "arn:aws:s3:::airflow-bucket",
                "arn:aws:s3:::airflow-bucket/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::airflow-bucket",
                "arn:aws:s3:::airflow-bucket/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:CreateLogGroup",
                "logs:PutLogEvents",
                "logs:GetLogEvents",
                "logs:GetLogRecord",
                "logs:GetLogGroupFields",
                "logs:GetQueryResults"
            ],
            "Resource": [
                "arn:aws:logs:ap-northeast-2:111111111111:log-group:airflow-myairflow-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:DescribeLogGroups"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "cloudwatch:PutMetricData",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl",
                "sqs:ReceiveMessage",
                "sqs:SendMessage"
            ],
            "Resource": "arn:aws:sqs:ap-northeast-2:*:airflow-celery-*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:DescribeKey",
                "kms:GenerateDataKey*",
                "kms:Encrypt"
            ],
            "NotResource": "arn:aws:kms:*:111111111111:key/*",
            "Condition": {
                "StringLike": {
                    "kms:ViaService": [
                        "sqs.ap-northeast-2.amazonaws.com"
                    ]
                }
            }
        }
    ]
}

3단계 : Web Server 접속하여 수동으로 DAG를 Trigging

Amazon MWAA > Environments > myairflow에서 Open Airflow UI링크 클릭 (ex :  https://ap-northeast-2.console.aws.amazon.com/mwaa/home?region=ap-northeast-2#environments/myairflowEnvironment/sso)

Airflow Web > DAGs > DAG 확인(random_sleep.py)

[DAG 확인 (ramdom_sleep)]

Airflow Web > DAGs > random_sleep.py 선택 > Graph(tab) > ▶ > Trigger DAG

[수동으로 DAG를 Trigging]

CloudWatch > Log group > Airflow 관련 그룹 확인

[CloudWatch 생성된 Log group 목록]

반응형