안녕하세요 서후아빠입니다. ^_^
이번 세션은 Amazon Managed Workflows for Apache (MWAA = Airflow)을 사용하는 방법에 대해서 실습해 보도록 하겠습니다. Airflow를 구성하기 위해서는 3가지 역량이 필요합니다. 1단계 WorkFlow(DAG) 작성을 위한 Python 작성 능력, 2단계 Airflow 아키텍처 구성 능력, 3단계 데이터 정합성 확인 능력입니다. 1단계 및 3단계는 WorkFlow를 잘 알고 있는 직원이 진행하는 것이 적합하므로 가급적 고객이 처리하도록 협의하시기 바랍니다.
AWS MWAA
구분 | 내용 |
설명 | 스크립트로 데이터 파이프라인 구성 및 관리하는 도구로서 DAG(Directed Acyclic Graph)로 워크플로우 정의 사용자 인터페이스로 데이터 파이프라인 모니터링 커뮤니티에서 개발한 커스텀 플러그인 세트를 통해 기능 확장 AWS 서비스 및 Apache Spark 등과 통합 |
분야 | ETL(압도적), AL/ML, DevOps |
구성요소 | Web server : DAG 목록, Task 상태, 로그 등을 UI로 제공 Scheduler : DAG 분석 및 모니터링, Worker 스케줄링, Task 핸들링 Worker : Task 실행 Metadata Database : DAG, Task 등의 실행기록 및 메타 데이터 저장, 관리 ![]() |
비용 산정 어려움 | DAG당 필요한 Worker 수 산정의 어려움 - 가벼운 DAG 경우 1개의 Worker에서 여러 개의 DAG 처리 - 무거운 DAG 경우 여러 개의 Worker에서 1개의 DAG 처리 DAG당 처리 시간 산정의 어려움 - 가벼운 DAG 경우 몇초 필요 - 무거운 DAG 경우 몇시간 필요 |
1단계 : S3 bucket 생성, DAG(And PlugIn)을 S3 bucket에 업로드
S3 > Buckets > Create bucket > Bucket name(airflow-bucket), AWS Region(ap-northeast-2), Bucket Versioning (Enable)
S3 > Buckets > airflow-bucket > Create folder > Folder name(dags)
버킷은 DAG or PlugIn용도로써 버킷명은 'airflow-'로 시작해야 합니다. 버킷 버전 관리 활성화하고 'requirements.txt' 파일을 통하여 버전 지정 가능합니다. |
S3 > Buckets > airflow-bucket > dags > Upload > random_sleep.py
# DAG 예시 (파일명 : random_sleep.py)
import random
import time
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
args = {
"owner": "airflow",
"start_date": airflow.utils.dates.days_ago(2),
"provide_context": True,
}
def task_proc(**context):
s = random.random() * 10 + 10
print(f"sleep {s} times.")
time.sleep(s)
with DAG(
dag_id="random_sleep",
default_args=args,
schedule_interval=None,
) as dag:
t1 = PythonOperator(task_id="random_sleep1", python_callable=task_proc)
t2 = PythonOperator(task_id="random_sleep2", python_callable=task_proc)
t3 = PythonOperator(task_id="random_sleep3", python_callable=task_proc)
t4 = PythonOperator(task_id="random_sleep4", python_callable=task_proc)
t5 = PythonOperator(task_id="random_sleep5", python_callable=task_proc)
t1 >> [t2, t3, t4] >> t5
2단계 : MWAA Environment 생성
Amazon MWAA > Environments > Create environment
Specify details | Configure advanced settings |
Environment details - Name : myairflow - Airflow version : 2.4.3 (Latest) - Weekly maintenance window start (UTC) : Thursday, 15:00 DAG code in Amazon S3 - S3 Bucket : s3://airflow-bucket - DAGs folder : s3://airflow-bucket/dags - Plugins file (옵션) : - - Requirements file (옵션) : - |
Networking - Virtual private cloud (VPC) : vpcA - Subnets : priA-sn-a, priA-sn-c - Web server access : Public network (or Private) - Security group(s) : 인바운드 443 허용된 보안그룹 선택 Environment class - type : mw1.small - Maximum worker count(1~25) : 1 - Minimum worker count(1~) : 1 - Scheduler count(2~5) : 2 Encryption(KMS 연동) : Disabled Monitoring - Airflow task logs : INFO - Airflow web server logs : WARNING - Airflow scheduler logs : WARNING - Airflow worker logs : WARNING - Airflow DAG processing logs : WARNING Airflow configuration options (옵션) : - Permissions - Execution role(자동 생성 가능) : AmazonMWAA-myairflow |
Requirements file 옵션을 사용하려면 airflow-bucket 버킷에 파일(ex : requirements.txt)을 사전에 업로드 해놔야 합니다. type(ex :mw1.small) 선택 : DAG capacity, Scheduler CPU, Worker CPU, Web server CPU Monitoring의 Log level 옵션 : CRITICAL, ERROR, WARNING, INFO Airflow configuration options : Configuration option/Custom value 추가 기능 생성은 20~30분 소요되며, Security group(s)에서 포트가 허용되지 않으면 생성 실패됩니다. 또한 Web server에 기본적으로 EIP가 하나 연결되는데, 등록할 수 있는 EIP 갯수가 부족하면 실패됩니다. ※ 서브넷이 Private이지만 Web server access를 Public으로 선택하면 Public에서 Web server 접속이 됩니다. |
# AmazonMWAA-myairflow 예시
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "airflow:PublishMetrics",
"Resource": "arn:aws:airflow:ap-northeast-2:111111111111:environment/myairflow"
},
{
"Effect": "Deny",
"Action": "s3:ListAllMyBuckets",
"Resource": [
"arn:aws:s3:::airflow-bucket",
"arn:aws:s3:::airflow-bucket/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:GetObject*",
"s3:GetBucket*",
"s3:List*"
],
"Resource": [
"arn:aws:s3:::airflow-bucket",
"arn:aws:s3:::airflow-bucket/*"
]
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogStream",
"logs:CreateLogGroup",
"logs:PutLogEvents",
"logs:GetLogEvents",
"logs:GetLogRecord",
"logs:GetLogGroupFields",
"logs:GetQueryResults"
],
"Resource": [
"arn:aws:logs:ap-northeast-2:111111111111:log-group:airflow-myairflow-*"
]
},
{
"Effect": "Allow",
"Action": [
"logs:DescribeLogGroups"
],
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": "cloudwatch:PutMetricData",
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"sqs:ChangeMessageVisibility",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:GetQueueUrl",
"sqs:ReceiveMessage",
"sqs:SendMessage"
],
"Resource": "arn:aws:sqs:ap-northeast-2:*:airflow-celery-*"
},
{
"Effect": "Allow",
"Action": [
"kms:Decrypt",
"kms:DescribeKey",
"kms:GenerateDataKey*",
"kms:Encrypt"
],
"NotResource": "arn:aws:kms:*:111111111111:key/*",
"Condition": {
"StringLike": {
"kms:ViaService": [
"sqs.ap-northeast-2.amazonaws.com"
]
}
}
}
]
}
3단계 : Web Server 접속하여 수동으로 DAG를 Trigging
Amazon MWAA > Environments > myairflow에서 Open Airflow UI링크 클릭 (ex : https://ap-northeast-2.console.aws.amazon.com/mwaa/home?region=ap-northeast-2#environments/myairflowEnvironment/sso)
Airflow Web > DAGs > DAG 확인(random_sleep.py)
Airflow Web > DAGs > random_sleep.py 선택 > Graph(tab) > ▶ > Trigger DAG
CloudWatch > Log group > Airflow 관련 그룹 확인
'Database' 카테고리의 다른 글
[이른/실습] Amazon OpenSearch Service-1편(생성) (0) | 2022.12.16 |
---|---|
[실습] RDS 생성하기 (MySQL) (0) | 2022.08.08 |
[이론] AWS Database 기본 개념 (0) | 2022.07.06 |
[Tip] AWS DMS endpoint 실패 조치하기 (0) | 2022.07.06 |
[Tip] Amazon RDS 서브넷 그룹 (Subnet groups) 생성 Error 조치 방법 알아보기 (0) | 2022.07.01 |