""" NHN Cloud Async Task Runner Threading 기반 비동기 작업 실행 """ import logging import threading from functools import wraps import django from django.db import close_old_connections logger = logging.getLogger(__name__) def run_async(func): """ 함수를 비동기로 실행하는 데코레이터 Django DB 연결 관리 포함 """ @wraps(func) def wrapper(*args, **kwargs): def run(): try: # Django 설정 확인 django.setup() # 기존 DB 연결 정리 close_old_connections() # 실제 함수 실행 func(*args, **kwargs) except Exception as e: logger.exception(f"Async task error: {e}") finally: # DB 연결 정리 close_old_connections() thread = threading.Thread(target=run, daemon=True) thread.start() return thread return wrapper def execute_async_task(task_id, task_func, *args, **kwargs): """ AsyncTask 모델과 연동하여 비동기 작업 실행 Args: task_id: AsyncTask 모델의 ID task_func: 실행할 함수 (ApiCompute.create_instance 등) *args, **kwargs: 함수에 전달할 인자 """ from .models import AsyncTask def run(): try: django.setup() close_old_connections() # 작업 시작 task = AsyncTask.objects.get(id=task_id) task.mark_running() logger.info(f"[AsyncTask] 작업 시작: {task_id} ({task.task_type})") # 실제 작업 실행 result = task_func(*args, **kwargs) # 결과에서 리소스 ID 추출 resource_id = None if isinstance(result, dict): # 인스턴스 생성 결과 if "server" in result: resource_id = result["server"].get("id") # NKS 클러스터 생성 결과 elif "uuid" in result: resource_id = result.get("uuid") # 성공 처리 task.mark_success(result_data=result, resource_id=resource_id) logger.info(f"[AsyncTask] 작업 완료: {task_id}, resource_id={resource_id}") except Exception as e: logger.exception(f"[AsyncTask] 작업 실패: {task_id}") try: task = AsyncTask.objects.get(id=task_id) task.mark_failed(str(e)) except Exception: pass finally: close_old_connections() thread = threading.Thread(target=run, daemon=True) thread.start() logger.info(f"[AsyncTask] 백그라운드 스레드 시작: {task_id}") return thread def create_instance_async(region, tenant_id, token, instance_data): """ 인스턴스 비동기 생성 Args: region: 리전 tenant_id: 테넌트 ID token: API 토큰 instance_data: 인스턴스 생성 데이터 (dict) Returns: AsyncTask: 생성된 작업 객체 """ from .models import AsyncTask from .packages.compute import ApiCompute # 작업 레코드 생성 task = AsyncTask.objects.create( task_type=AsyncTask.TaskType.INSTANCE_CREATE, request_data=instance_data, resource_name=instance_data.get("name", ""), ) # API 객체 생성 api = ApiCompute(region, tenant_id, token) # 비동기 실행 execute_async_task( task_id=task.id, task_func=api.create_instance, name=instance_data["name"], image_id=instance_data["image_id"], flavor_id=instance_data["flavor_id"], subnet_id=instance_data["subnet_id"], keypair_name=instance_data.get("keypair_name", ""), volume_size=instance_data.get("volume_size", 50), volume_type=instance_data.get("volume_type", "General SSD"), security_groups=instance_data.get("security_groups"), availability_zone=instance_data.get("availability_zone"), ) return task def create_nks_cluster_async(region, token, cluster_data): """ NKS 클러스터 비동기 생성 Args: region: 리전 token: API 토큰 cluster_data: 클러스터 생성 데이터 (dict) Returns: AsyncTask: 생성된 작업 객체 """ from .models import AsyncTask from .packages.nks import ApiNks # 작업 레코드 생성 task = AsyncTask.objects.create( task_type=AsyncTask.TaskType.NKS_CREATE, request_data=cluster_data, resource_name=cluster_data.get("cluster_name", ""), ) # API 객체 생성 api = ApiNks(region, token) # 비동기 실행 execute_async_task( task_id=task.id, task_func=api.create_cluster, **cluster_data, ) return task # ==================== Instance 비동기 작업 ==================== def delete_instance_async(region, tenant_id, token, server_id, server_name=""): """인스턴스 비동기 삭제""" from .models import AsyncTask from .packages.compute import ApiCompute task = AsyncTask.objects.create( task_type=AsyncTask.TaskType.INSTANCE_DELETE, request_data={"server_id": server_id}, resource_name=server_name, resource_id=server_id, ) api = ApiCompute(region, tenant_id, token) execute_async_task( task_id=task.id, task_func=api.delete_instance, server_id=server_id, ) return task def instance_action_async(region, tenant_id, token, server_id, action, server_name="", hard=False): """인스턴스 액션 비동기 실행 (start/stop/reboot)""" from .models import AsyncTask from .packages.compute import ApiCompute task = AsyncTask.objects.create( task_type=AsyncTask.TaskType.INSTANCE_ACTION, request_data={"server_id": server_id, "action": action, "hard": hard}, resource_name=server_name, resource_id=server_id, ) api = ApiCompute(region, tenant_id, token) if action == "start": task_func = api.start_instance elif action == "stop": task_func = api.stop_instance elif action == "reboot": def task_func(server_id): return api.reboot_instance(server_id, hard=hard) else: raise ValueError(f"Invalid action: {action}") execute_async_task( task_id=task.id, task_func=task_func, server_id=server_id, ) return task # ==================== VPC 비동기 작업 ==================== def create_vpc_async(region, token, name, cidr): """VPC 비동기 생성""" from .models import AsyncTask from .packages.vpc import ApiVpc task = AsyncTask.objects.create( task_type=AsyncTask.TaskType.VPC_CREATE, request_data={"name": name, "cidr": cidr}, resource_name=name, ) api = ApiVpc(region, token) execute_async_task( task_id=task.id, task_func=api.create_vpc, name=name, cidr=cidr, ) return task def delete_vpc_async(region, token, vpc_id, vpc_name=""): """VPC 비동기 삭제""" from .models import AsyncTask from .packages.vpc import ApiVpc task = AsyncTask.objects.create( task_type=AsyncTask.TaskType.VPC_DELETE, request_data={"vpc_id": vpc_id}, resource_name=vpc_name, resource_id=vpc_id, ) api = ApiVpc(region, token) execute_async_task( task_id=task.id, task_func=api.delete_vpc, vpc_id=vpc_id, ) return task # ==================== Subnet 비동기 작업 ==================== def create_subnet_async(region, token, vpc_id, cidr, name): """서브넷 비동기 생성""" from .models import AsyncTask from .packages.vpc import ApiVpc task = AsyncTask.objects.create( task_type=AsyncTask.TaskType.SUBNET_CREATE, request_data={"vpc_id": vpc_id, "cidr": cidr, "name": name}, resource_name=name, ) api = ApiVpc(region, token) execute_async_task( task_id=task.id, task_func=api.create_subnet, vpc_id=vpc_id, cidr=cidr, name=name, ) return task def delete_subnet_async(region, token, subnet_id, subnet_name=""): """서브넷 비동기 삭제""" from .models import AsyncTask from .packages.vpc import ApiVpc task = AsyncTask.objects.create( task_type=AsyncTask.TaskType.SUBNET_DELETE, request_data={"subnet_id": subnet_id}, resource_name=subnet_name, resource_id=subnet_id, ) api = ApiVpc(region, token) execute_async_task( task_id=task.id, task_func=api.delete_subnet, subnet_id=subnet_id, ) return task # ==================== NKS 비동기 작업 ==================== def delete_nks_cluster_async(region, token, cluster_name): """NKS 클러스터 비동기 삭제""" from .models import AsyncTask from .packages.nks import ApiNks task = AsyncTask.objects.create( task_type=AsyncTask.TaskType.NKS_DELETE, request_data={"cluster_name": cluster_name}, resource_name=cluster_name, ) api = ApiNks(region, token) execute_async_task( task_id=task.id, task_func=api.delete_cluster, cluster_name=cluster_name, ) return task # ==================== Storage 비동기 작업 ==================== def create_storage_container_async(region, token, storage_account, container_name): """스토리지 컨테이너 비동기 생성""" from .models import AsyncTask from .packages.storage import ApiStorageObject task = AsyncTask.objects.create( task_type=AsyncTask.TaskType.STORAGE_CREATE, request_data={"container_name": container_name}, resource_name=container_name, ) api = ApiStorageObject(region, token, storage_account) execute_async_task( task_id=task.id, task_func=api.create_container, container_name=container_name, ) return task def delete_storage_container_async(region, token, storage_account, container_name): """스토리지 컨테이너 비동기 삭제""" from .models import AsyncTask from .packages.storage import ApiStorageObject task = AsyncTask.objects.create( task_type=AsyncTask.TaskType.STORAGE_DELETE, request_data={"container_name": container_name}, resource_name=container_name, ) api = ApiStorageObject(region, token, storage_account) execute_async_task( task_id=task.id, task_func=api.delete_container, container_name=container_name, ) return task # ==================== Load Balancer 비동기 작업 ==================== def create_loadbalancer_async(region, token, lb_data): """ 로드밸런서 비동기 생성 Args: region: 리전 token: API 토큰 lb_data: 로드밸런서 생성 데이터 (dict) Returns: AsyncTask: 생성된 작업 객체 """ from .models import AsyncTask from .packages.loadbalancer import ApiLoadBalancer task = AsyncTask.objects.create( task_type=AsyncTask.TaskType.LB_CREATE, request_data=lb_data, resource_name=lb_data.get("name", ""), ) api = ApiLoadBalancer(region, token) execute_async_task( task_id=task.id, task_func=api.create_loadbalancer, vip_subnet_id=lb_data["vip_subnet_id"], name=lb_data.get("name"), description=lb_data.get("description"), vip_address=lb_data.get("vip_address"), admin_state_up=lb_data.get("admin_state_up", True), loadbalancer_type=lb_data.get("loadbalancer_type", "shared"), ) return task def delete_loadbalancer_async(region, token, loadbalancer_id, loadbalancer_name=""): """로드밸런서 비동기 삭제""" from .models import AsyncTask from .packages.loadbalancer import ApiLoadBalancer task = AsyncTask.objects.create( task_type=AsyncTask.TaskType.LB_DELETE, request_data={"loadbalancer_id": loadbalancer_id}, resource_name=loadbalancer_name, resource_id=loadbalancer_id, ) api = ApiLoadBalancer(region, token) execute_async_task( task_id=task.id, task_func=api.delete_loadbalancer, loadbalancer_id=loadbalancer_id, ) return task