Files
msa-django-nhn/nhn/tasks.py
icurfer 7136e76d63
All checks were successful
Build And Test / build-and-push (push) Successful in 53s
v0.0.9 | Add Load Balancer API
- 로드밸런서 CRUD API 추가
- 리스너, 풀, 멤버, 헬스 모니터 API 추가
- L7 정책/룰, IP ACL 그룹/타깃 API 추가
- 쿼타 조회 API 추가

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-16 11:14:03 +09:00

460 lines
12 KiB
Python

"""
NHN Cloud Async Task Runner
Threading 기반 비동기 작업 실행
"""
import logging
import threading
from functools import wraps
import django
from django.db import close_old_connections
logger = logging.getLogger(__name__)
def run_async(func):
    """
    Decorator that runs the wrapped callable on a background daemon thread.

    Calling the decorated function returns immediately with the started
    ``threading.Thread``; the callable's own return value is discarded.
    Inside the thread ``django.setup()`` is invoked, and
    ``close_old_connections()`` is called both before and after the
    callable runs. Any ``Exception`` raised in the thread is logged with its
    traceback and not propagated.
    """

    @wraps(func)
    def launch(*args, **kwargs):
        # The thread target is deliberately still called ``run``: default
        # thread names embed the target's ``__name__``.
        def run():
            try:
                try:
                    django.setup()
                    close_old_connections()
                    func(*args, **kwargs)
                except Exception as exc:
                    logger.exception(f"Async task error: {exc}")
            finally:
                # Always release this thread's DB connections on the way out.
                close_old_connections()

        worker = threading.Thread(target=run, daemon=True)
        worker.start()
        return worker

    return launch
def _extract_resource_id(result):
    """Return the resource ID carried by an API result, or ``None``.

    Looks at ``result["server"]["id"]`` first (instance creation) and then
    at ``result["uuid"]`` (NKS cluster creation). Anything that is not a
    dict, or whose ``"server"`` entry is not a dict, yields ``None`` instead
    of raising.
    """
    if not isinstance(result, dict):
        return None
    if "server" in result:
        server = result["server"]
        return server.get("id") if isinstance(server, dict) else None
    if "uuid" in result:
        return result.get("uuid")
    return None


def execute_async_task(task_id, task_func, *args, **kwargs):
    """
    Run ``task_func`` on a background daemon thread, tracking it in AsyncTask.

    The thread loads the AsyncTask row, calls ``mark_running()``, invokes
    ``task_func(*args, **kwargs)`` and then ``mark_success(...)`` with the
    result and any resource ID found in it. If anything raises, the row is
    re-loaded and ``mark_failed(str(error))`` is called; a failure while
    recording that state is logged rather than silently dropped.

    Args:
        task_id: ID of the AsyncTask row that tracks this job.
        task_func: Callable to execute (e.g. ``ApiCompute.create_instance``).
        *args, **kwargs: Arguments forwarded to ``task_func``.

    Returns:
        The started ``threading.Thread``.
    """
    from .models import AsyncTask

    def run():
        try:
            django.setup()
            close_old_connections()

            # Mark the job as started.
            task = AsyncTask.objects.get(id=task_id)
            task.mark_running()
            logger.info(f"[AsyncTask] 작업 시작: {task_id} ({task.task_type})")

            # Do the actual work.
            result = task_func(*args, **kwargs)

            # A malformed result must not turn a successful call into a
            # failed task, so the extraction never raises.
            resource_id = _extract_resource_id(result)

            task.mark_success(result_data=result, resource_id=resource_id)
            logger.info(f"[AsyncTask] 작업 완료: {task_id}, resource_id={resource_id}")
        except Exception as e:
            logger.exception(f"[AsyncTask] 작업 실패: {task_id}")
            try:
                task = AsyncTask.objects.get(id=task_id)
                task.mark_failed(str(e))
            except Exception:
                # Best effort only, but leave a trace: otherwise the task
                # stays in its previous state with no hint as to why.
                logger.exception(f"[AsyncTask] 실패 상태 저장 중 오류: {task_id}")
        finally:
            close_old_connections()

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    logger.info(f"[AsyncTask] 백그라운드 스레드 시작: {task_id}")
    return thread
def create_instance_async(region, tenant_id, token, instance_data):
    """
    Create a compute instance asynchronously.

    Args:
        region: Region passed to ``ApiCompute``.
        tenant_id: Tenant ID passed to ``ApiCompute``.
        token: API token passed to ``ApiCompute``.
        instance_data: Instance creation data (dict). ``name``, ``image_id``,
            ``flavor_id`` and ``subnet_id`` are required; ``keypair_name``,
            ``volume_size``, ``volume_type``, ``security_groups`` and
            ``availability_zone`` are optional.

    Returns:
        AsyncTask: The task record created for this request.

    Raises:
        KeyError: If a required key is missing from ``instance_data``. This
            is raised before any AsyncTask row is written.
    """
    # Build the call arguments before touching the database: a missing
    # required key must not leave behind a task row that never runs.
    create_kwargs = {
        "name": instance_data["name"],
        "image_id": instance_data["image_id"],
        "flavor_id": instance_data["flavor_id"],
        "subnet_id": instance_data["subnet_id"],
        "keypair_name": instance_data.get("keypair_name", ""),
        "volume_size": instance_data.get("volume_size", 50),
        "volume_type": instance_data.get("volume_type", "General SSD"),
        "security_groups": instance_data.get("security_groups"),
        "availability_zone": instance_data.get("availability_zone"),
    }

    from .models import AsyncTask
    from .packages.compute import ApiCompute

    # Create the tracking record.
    task = AsyncTask.objects.create(
        task_type=AsyncTask.TaskType.INSTANCE_CREATE,
        request_data=instance_data,
        resource_name=create_kwargs["name"],
    )

    # Build the API client and hand the call to the background runner.
    api = ApiCompute(region, tenant_id, token)
    execute_async_task(
        task_id=task.id,
        task_func=api.create_instance,
        **create_kwargs,
    )
    return task
def create_nks_cluster_async(region, token, cluster_data):
    """
    Queue an asynchronous NKS cluster creation.

    Records an ``NKS_CREATE`` AsyncTask (named after
    ``cluster_data["cluster_name"]`` when present) and hands
    ``ApiNks.create_cluster`` to ``execute_async_task`` with every entry of
    ``cluster_data`` forwarded as a keyword argument.

    Args:
        region: Region passed to ``ApiNks``.
        token: API token passed to ``ApiNks``.
        cluster_data: Cluster creation data (dict).

    Returns:
        AsyncTask: The task record created for this request.
    """
    from .models import AsyncTask
    from .packages.nks import ApiNks

    record = AsyncTask.objects.create(
        task_type=AsyncTask.TaskType.NKS_CREATE,
        request_data=cluster_data,
        resource_name=cluster_data.get("cluster_name", ""),
    )

    client = ApiNks(region, token)
    execute_async_task(record.id, client.create_cluster, **cluster_data)
    return record
# ==================== Instance 비동기 작업 ====================
def delete_instance_async(region, tenant_id, token, server_id, server_name=""):
    """
    Queue an asynchronous instance deletion.

    Records an ``INSTANCE_DELETE`` AsyncTask for ``server_id`` and hands
    ``ApiCompute.delete_instance`` to ``execute_async_task``.

    Args:
        region: Region passed to ``ApiCompute``.
        tenant_id: Tenant ID passed to ``ApiCompute``.
        token: API token passed to ``ApiCompute``.
        server_id: ID of the instance to delete.
        server_name: Name stored on the task record.

    Returns:
        AsyncTask: The task record created for this request.
    """
    from .models import AsyncTask
    from .packages.compute import ApiCompute

    payload = {"server_id": server_id}
    record = AsyncTask.objects.create(
        task_type=AsyncTask.TaskType.INSTANCE_DELETE,
        request_data=payload,
        resource_name=server_name,
        resource_id=server_id,
    )

    client = ApiCompute(region, tenant_id, token)
    execute_async_task(record.id, client.delete_instance, server_id=server_id)
    return record
def instance_action_async(region, tenant_id, token, server_id, action, server_name="", hard=False):
    """
    Run an instance action (start / stop / reboot) asynchronously.

    Args:
        region: Region passed to ``ApiCompute``.
        tenant_id: Tenant ID passed to ``ApiCompute``.
        token: API token passed to ``ApiCompute``.
        server_id: ID of the target instance.
        action: One of ``"start"``, ``"stop"`` or ``"reboot"``.
        server_name: Name stored on the task record.
        hard: Forwarded to ``reboot_instance`` as ``hard=``; only used when
            ``action`` is ``"reboot"``.

    Returns:
        AsyncTask: The task record created for this request.

    Raises:
        ValueError: If ``action`` is not supported. This is raised before
            any AsyncTask row is written.
    """
    # Validate before touching the database: an unsupported action must not
    # leave behind a task row that never runs.
    if action not in ("start", "stop", "reboot"):
        raise ValueError(f"Invalid action: {action}")

    from .models import AsyncTask
    from .packages.compute import ApiCompute

    task = AsyncTask.objects.create(
        task_type=AsyncTask.TaskType.INSTANCE_ACTION,
        request_data={"server_id": server_id, "action": action, "hard": hard},
        resource_name=server_name,
        resource_id=server_id,
    )

    api = ApiCompute(region, tenant_id, token)

    if action == "start":
        task_func = api.start_instance
    elif action == "stop":
        task_func = api.stop_instance
    else:
        # "reboot": wrap the call so the ``hard`` flag travels with it.
        def task_func(server_id):
            return api.reboot_instance(server_id, hard=hard)

    execute_async_task(
        task_id=task.id,
        task_func=task_func,
        server_id=server_id,
    )
    return task
# ==================== VPC 비동기 작업 ====================
def create_vpc_async(region, token, name, cidr):
    """
    Queue an asynchronous VPC creation.

    Records a ``VPC_CREATE`` AsyncTask and hands ``ApiVpc.create_vpc`` to
    ``execute_async_task`` with ``name`` and ``cidr``.

    Args:
        region: Region passed to ``ApiVpc``.
        token: API token passed to ``ApiVpc``.
        name: VPC name; also stored as the task's resource name.
        cidr: CIDR value forwarded to ``create_vpc``.

    Returns:
        AsyncTask: The task record created for this request.
    """
    from .models import AsyncTask
    from .packages.vpc import ApiVpc

    payload = {"name": name, "cidr": cidr}
    record = AsyncTask.objects.create(
        task_type=AsyncTask.TaskType.VPC_CREATE,
        request_data=payload,
        resource_name=name,
    )

    client = ApiVpc(region, token)
    execute_async_task(record.id, client.create_vpc, name=name, cidr=cidr)
    return record
def delete_vpc_async(region, token, vpc_id, vpc_name=""):
    """
    Queue an asynchronous VPC deletion.

    Records a ``VPC_DELETE`` AsyncTask for ``vpc_id`` and hands
    ``ApiVpc.delete_vpc`` to ``execute_async_task``.

    Args:
        region: Region passed to ``ApiVpc``.
        token: API token passed to ``ApiVpc``.
        vpc_id: ID of the VPC to delete.
        vpc_name: Name stored on the task record.

    Returns:
        AsyncTask: The task record created for this request.
    """
    from .models import AsyncTask
    from .packages.vpc import ApiVpc

    payload = {"vpc_id": vpc_id}
    record = AsyncTask.objects.create(
        task_type=AsyncTask.TaskType.VPC_DELETE,
        request_data=payload,
        resource_name=vpc_name,
        resource_id=vpc_id,
    )

    client = ApiVpc(region, token)
    execute_async_task(record.id, client.delete_vpc, vpc_id=vpc_id)
    return record
# ==================== Subnet 비동기 작업 ====================
def create_subnet_async(region, token, vpc_id, cidr, name):
    """
    Queue an asynchronous subnet creation.

    Records a ``SUBNET_CREATE`` AsyncTask and hands
    ``ApiVpc.create_subnet`` to ``execute_async_task`` with ``vpc_id``,
    ``cidr`` and ``name``.

    Args:
        region: Region passed to ``ApiVpc``.
        token: API token passed to ``ApiVpc``.
        vpc_id: VPC ID forwarded to ``create_subnet``.
        cidr: CIDR value forwarded to ``create_subnet``.
        name: Subnet name; also stored as the task's resource name.

    Returns:
        AsyncTask: The task record created for this request.
    """
    from .models import AsyncTask
    from .packages.vpc import ApiVpc

    payload = {"vpc_id": vpc_id, "cidr": cidr, "name": name}
    record = AsyncTask.objects.create(
        task_type=AsyncTask.TaskType.SUBNET_CREATE,
        request_data=payload,
        resource_name=name,
    )

    client = ApiVpc(region, token)
    execute_async_task(
        record.id,
        client.create_subnet,
        vpc_id=vpc_id,
        cidr=cidr,
        name=name,
    )
    return record
def delete_subnet_async(region, token, subnet_id, subnet_name=""):
    """
    Queue an asynchronous subnet deletion.

    Records a ``SUBNET_DELETE`` AsyncTask for ``subnet_id`` and hands
    ``ApiVpc.delete_subnet`` to ``execute_async_task``.

    Args:
        region: Region passed to ``ApiVpc``.
        token: API token passed to ``ApiVpc``.
        subnet_id: ID of the subnet to delete.
        subnet_name: Name stored on the task record.

    Returns:
        AsyncTask: The task record created for this request.
    """
    from .models import AsyncTask
    from .packages.vpc import ApiVpc

    payload = {"subnet_id": subnet_id}
    record = AsyncTask.objects.create(
        task_type=AsyncTask.TaskType.SUBNET_DELETE,
        request_data=payload,
        resource_name=subnet_name,
        resource_id=subnet_id,
    )

    client = ApiVpc(region, token)
    execute_async_task(record.id, client.delete_subnet, subnet_id=subnet_id)
    return record
# ==================== NKS 비동기 작업 ====================
def delete_nks_cluster_async(region, token, cluster_name):
    """
    Queue an asynchronous NKS cluster deletion.

    Records an ``NKS_DELETE`` AsyncTask named after ``cluster_name`` and
    hands ``ApiNks.delete_cluster`` to ``execute_async_task``.

    Args:
        region: Region passed to ``ApiNks``.
        token: API token passed to ``ApiNks``.
        cluster_name: Name of the cluster to delete.

    Returns:
        AsyncTask: The task record created for this request.
    """
    from .models import AsyncTask
    from .packages.nks import ApiNks

    payload = {"cluster_name": cluster_name}
    record = AsyncTask.objects.create(
        task_type=AsyncTask.TaskType.NKS_DELETE,
        request_data=payload,
        resource_name=cluster_name,
    )

    client = ApiNks(region, token)
    execute_async_task(record.id, client.delete_cluster, cluster_name=cluster_name)
    return record
# ==================== Storage 비동기 작업 ====================
def create_storage_container_async(region, token, storage_account, container_name):
    """
    Queue an asynchronous storage container creation.

    Records a ``STORAGE_CREATE`` AsyncTask named after ``container_name``
    and hands ``ApiStorageObject.create_container`` to
    ``execute_async_task``. Only the container name is stored in the task's
    request data.

    Args:
        region: Region passed to ``ApiStorageObject``.
        token: API token passed to ``ApiStorageObject``.
        storage_account: Storage account passed to ``ApiStorageObject``.
        container_name: Name of the container to create.

    Returns:
        AsyncTask: The task record created for this request.
    """
    from .models import AsyncTask
    from .packages.storage import ApiStorageObject

    payload = {"container_name": container_name}
    record = AsyncTask.objects.create(
        task_type=AsyncTask.TaskType.STORAGE_CREATE,
        request_data=payload,
        resource_name=container_name,
    )

    client = ApiStorageObject(region, token, storage_account)
    execute_async_task(
        record.id,
        client.create_container,
        container_name=container_name,
    )
    return record
def delete_storage_container_async(region, token, storage_account, container_name):
    """
    Queue an asynchronous storage container deletion.

    Records a ``STORAGE_DELETE`` AsyncTask named after ``container_name``
    and hands ``ApiStorageObject.delete_container`` to
    ``execute_async_task``. Only the container name is stored in the task's
    request data.

    Args:
        region: Region passed to ``ApiStorageObject``.
        token: API token passed to ``ApiStorageObject``.
        storage_account: Storage account passed to ``ApiStorageObject``.
        container_name: Name of the container to delete.

    Returns:
        AsyncTask: The task record created for this request.
    """
    from .models import AsyncTask
    from .packages.storage import ApiStorageObject

    payload = {"container_name": container_name}
    record = AsyncTask.objects.create(
        task_type=AsyncTask.TaskType.STORAGE_DELETE,
        request_data=payload,
        resource_name=container_name,
    )

    client = ApiStorageObject(region, token, storage_account)
    execute_async_task(
        record.id,
        client.delete_container,
        container_name=container_name,
    )
    return record
# ==================== Load Balancer 비동기 작업 ====================
def create_loadbalancer_async(region, token, lb_data):
    """
    Create a load balancer asynchronously.

    Args:
        region: Region passed to ``ApiLoadBalancer``.
        token: API token passed to ``ApiLoadBalancer``.
        lb_data: Load balancer creation data (dict). ``vip_subnet_id`` is
            required; ``name``, ``description``, ``vip_address``,
            ``admin_state_up`` (default ``True``) and ``loadbalancer_type``
            (default ``"shared"``) are optional.

    Returns:
        AsyncTask: The task record created for this request.

    Raises:
        KeyError: If ``vip_subnet_id`` is missing from ``lb_data``. This is
            raised before any AsyncTask row is written.
    """
    # Build the call arguments before touching the database: a missing
    # required key must not leave behind a task row that never runs.
    create_kwargs = {
        "vip_subnet_id": lb_data["vip_subnet_id"],
        "name": lb_data.get("name"),
        "description": lb_data.get("description"),
        "vip_address": lb_data.get("vip_address"),
        "admin_state_up": lb_data.get("admin_state_up", True),
        "loadbalancer_type": lb_data.get("loadbalancer_type", "shared"),
    }

    from .models import AsyncTask
    from .packages.loadbalancer import ApiLoadBalancer

    task = AsyncTask.objects.create(
        task_type=AsyncTask.TaskType.LB_CREATE,
        request_data=lb_data,
        resource_name=lb_data.get("name", ""),
    )

    api = ApiLoadBalancer(region, token)
    execute_async_task(
        task_id=task.id,
        task_func=api.create_loadbalancer,
        **create_kwargs,
    )
    return task
def delete_loadbalancer_async(region, token, loadbalancer_id, loadbalancer_name=""):
    """
    Queue an asynchronous load balancer deletion.

    Records an ``LB_DELETE`` AsyncTask for ``loadbalancer_id`` and hands
    ``ApiLoadBalancer.delete_loadbalancer`` to ``execute_async_task``.

    Args:
        region: Region passed to ``ApiLoadBalancer``.
        token: API token passed to ``ApiLoadBalancer``.
        loadbalancer_id: ID of the load balancer to delete.
        loadbalancer_name: Name stored on the task record.

    Returns:
        AsyncTask: The task record created for this request.
    """
    from .models import AsyncTask
    from .packages.loadbalancer import ApiLoadBalancer

    payload = {"loadbalancer_id": loadbalancer_id}
    record = AsyncTask.objects.create(
        task_type=AsyncTask.TaskType.LB_DELETE,
        request_data=payload,
        resource_name=loadbalancer_name,
        resource_id=loadbalancer_id,
    )

    client = ApiLoadBalancer(region, token)
    execute_async_task(
        record.id,
        client.delete_loadbalancer,
        loadbalancer_id=loadbalancer_id,
    )
    return record