Files
msa-django-auth/users/views.py
icurfer 1c7f241b37
All checks were successful
Build And Test / build-and-push (push) Successful in 3m38s
v0.0.31 | 비밀번호 변경 기능 추가
- ChangePasswordView API 추가 (사용자 본인 비밀번호 변경)
- 소셜 로그인 계정 비밀번호 설정 지원
- 관리자 비밀번호 초기화 기능 (UserUpdateView)
- RegisterSerializer에 has_password 필드 추가

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-19 00:26:53 +09:00

1611 lines
70 KiB
Python

# views.py
import logging
import secrets
from opentelemetry import trace # ✅ OpenTelemetry 트레이서
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from rest_framework.permissions import IsAuthenticated, BasePermission
from rest_framework_simplejwt.views import TokenObtainPairView
from rest_framework import generics
from django.conf import settings
from google.oauth2 import id_token
from google.auth.transport import requests as google_requests
from .serializers import RegisterSerializer, CustomTokenObtainPairSerializer, UserListSerializer, KVMServerSerializer
from .models import CustomUser, NHNCloudProject, KVMServer, SiteSettings
logger = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__) # ✅ 트레이서 생성
def get_request_info(request):
ip = request.META.get("REMOTE_ADDR", "unknown")
ua = request.META.get("HTTP_USER_AGENT", "unknown")
email = getattr(request.user, "email", "anonymous")
return email, ip, ua
class RegisterView(APIView):
def post(self, request):
with tracer.start_as_current_span("RegisterView POST") as span: # ✅ Span 생성
email, ip, ua = get_request_info(request)
serializer = RegisterSerializer(data=request.data)
if serializer.is_valid():
user = serializer.save()
logger.info(
f"[REGISTER] user={user.email} | status=success | IP={ip} | UA={ua}"
)
# ✅ Jaeger 이벤트 등록
span.add_event("User registered", attributes={"email": user.email})
return Response(
{"message": "User registered successfully."},
status=status.HTTP_201_CREATED,
)
logger.warning(
f"[REGISTER] user={email} | status=fail | IP={ip} | UA={ua} | detail={serializer.errors}"
)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class MeView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request):
with tracer.start_as_current_span("MeView GET") as span: # ✅ Span 생성
email, ip, ua = get_request_info(request)
logger.debug(f"[ME GET] user={email} | IP={ip} | UA={ua}")
serializer = RegisterSerializer(request.user)
span.add_event(
"Me info retrieved", attributes={"email": email}
) # ✅ Jaeger 이벤트 등록
return Response(serializer.data)
def put(self, request):
with tracer.start_as_current_span("MeView PUT") as span: # ✅ Span 생성
email, ip, ua = get_request_info(request)
serializer = RegisterSerializer(
request.user, data=request.data, partial=True
)
if serializer.is_valid():
serializer.save()
logger.info(
f"[ME UPDATE] user={email} | status=success | IP={ip} | UA={ua}"
)
span.add_event(
"Me info updated", attributes={"email": email}
) # ✅ Jaeger 이벤트 등록
return Response(serializer.data)
logger.warning(
f"[ME UPDATE] user={email} | status=fail | IP={ip} | UA={ua} | detail={serializer.errors}"
)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class ChangePasswordView(APIView):
"""사용자 본인 비밀번호 변경"""
permission_classes = [IsAuthenticated]
def post(self, request):
with tracer.start_as_current_span("ChangePasswordView POST") as span:
email, ip, ua = get_request_info(request)
user = request.user
current_password = request.data.get("current_password")
new_password = request.data.get("new_password")
confirm_password = request.data.get("confirm_password")
# 필수 필드 확인
if not current_password or not new_password or not confirm_password:
return Response(
{"error": "현재 비밀번호, 새 비밀번호, 비밀번호 확인은 필수입니다."},
status=status.HTTP_400_BAD_REQUEST
)
# 새 비밀번호 확인
if new_password != confirm_password:
return Response(
{"error": "새 비밀번호가 일치하지 않습니다."},
status=status.HTTP_400_BAD_REQUEST
)
# 비밀번호 길이 확인
if len(new_password) < 8:
return Response(
{"error": "비밀번호는 최소 8자 이상이어야 합니다."},
status=status.HTTP_400_BAD_REQUEST
)
# 소셜 로그인 전용 계정인 경우 (비밀번호 없음)
if not user.has_usable_password():
# 현재 비밀번호 없이 새 비밀번호 설정 가능
user.set_password(new_password)
user.save()
logger.info(f"[PASSWORD SET] user={email} | status=success | IP={ip} | UA={ua}")
span.add_event("Password set for social user", attributes={"email": email})
return Response({"message": "비밀번호가 설정되었습니다."})
# 현재 비밀번호 확인
if not user.check_password(current_password):
logger.warning(f"[PASSWORD CHANGE] user={email} | status=fail | reason=wrong_password | IP={ip} | UA={ua}")
return Response(
{"error": "현재 비밀번호가 일치하지 않습니다."},
status=status.HTTP_401_UNAUTHORIZED
)
# 새 비밀번호 설정
user.set_password(new_password)
user.save()
logger.info(f"[PASSWORD CHANGE] user={email} | status=success | IP={ip} | UA={ua}")
span.add_event("Password changed", attributes={"email": email})
return Response({"message": "비밀번호가 변경되었습니다."})
class CustomTokenObtainPairView(TokenObtainPairView):
serializer_class = CustomTokenObtainPairSerializer
def post(self, request, *args, **kwargs):
with tracer.start_as_current_span(
"TokenObtainPairView POST"
) as span: # ✅ Span 생성
ip = request.META.get("REMOTE_ADDR", "unknown")
ua = request.META.get("HTTP_USER_AGENT", "unknown")
email = request.data.get("email", "unknown")
logger.info(f"[LOGIN] user={email} | status=attempt | IP={ip} | UA={ua}")
response = super().post(request, *args, **kwargs)
if response.status_code == 200:
logger.info(
f"[LOGIN] user={email} | status=success | IP={ip} | UA={ua}"
)
span.add_event(
"Login success", attributes={"email": email}
) # ✅ Jaeger 이벤트 등록
else:
logger.warning(
f"[LOGIN] user={email} | status=fail | IP={ip} | UA={ua} | detail={response.data}"
)
span.add_event(
"Login failed",
attributes={"email": email, "reason": str(response.data)},
) # ✅
return response
class SSHKeyUploadView(APIView):
permission_classes = [IsAuthenticated]
def post(self, request):
with tracer.start_as_current_span(
"SSHKeyUploadView POST"
) as span: # ✅ Span 생성
email, ip, ua = get_request_info(request)
private_key = request.data.get("private_key")
key_name = request.data.get("key_name")
if not private_key or not key_name:
logger.warning(
f"[SSH UPLOAD] user={email} | status=fail | reason=missing_key_or_name | IP={ip} | UA={ua}"
)
return Response(
{"error": "private_key와 key_name 모두 필요합니다."},
status=status.HTTP_400_BAD_REQUEST,
)
try:
user = request.user
user.save_private_key(private_key)
user.encrypted_private_key_name = key_name
user.save(
update_fields=[
"encrypted_private_key",
"encrypted_private_key_name",
]
)
logger.info(
f"[SSH UPLOAD] user={email} | status=success | key_name={key_name} | IP={ip} | UA={ua}"
)
span.add_event(
"SSH key saved", attributes={"email": email, "key_name": key_name}
) # ✅
return Response({"message": "SSH key 저장 완료."}, status=201)
except Exception as e:
logger.exception(
f"[SSH UPLOAD] user={email} | status=fail | reason=exception | IP={ip} | UA={ua}"
)
return Response(
{"error": f"암호화 또는 저장 실패: {str(e)}"}, status=500
)
def delete(self, request):
with tracer.start_as_current_span(
"SSHKeyUploadView DELETE"
) as span: # ✅ Span 생성
email, ip, ua = get_request_info(request)
user = request.user
user.encrypted_private_key = None
user.encrypted_private_key_name = None
user.last_used_at = None
user.save(
update_fields=[
"encrypted_private_key",
"encrypted_private_key_name",
"last_used_at",
]
)
logger.info(
f"[SSH DELETE] user={email} | status=success | IP={ip} | UA={ua}"
)
span.add_event(
"SSH key deleted", attributes={"email": email}
) # ✅ Jaeger 이벤트 등록
return Response({"message": "SSH key deleted."}, status=200)
class SSHKeyInfoView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request):
with tracer.start_as_current_span("SSHKeyInfoView GET") as span: # ✅ Span 생성
email, ip, ua = get_request_info(request)
logger.debug(f"[SSH INFO] user={email} | IP={ip} | UA={ua}")
user = request.user
span.add_event(
"SSH key info retrieved", attributes={"email": email}
) # ✅ Jaeger 이벤트 등록
return Response(
{
"has_key": bool(user.encrypted_private_key),
"encrypted_private_key_name": user.encrypted_private_key_name,
"last_used_at": user.last_used_at,
}
)
class SSHKeyRetrieveView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request):
with tracer.start_as_current_span(
"SSHKeyRetrieveView GET"
) as span: # ✅ Span 생성
email, ip, ua = get_request_info(request)
user = request.user
if not user.encrypted_private_key:
logger.warning(
f"[SSH RETRIEVE] user={email} | status=fail | reason=not_found | IP={ip} | UA={ua}"
)
span.add_event(
"SSH key retrieve failed",
attributes={"email": email, "reason": "not_found"},
) # ✅
return Response(
{"error": "SSH 키가 등록되어 있지 않습니다."}, status=404
)
try:
decrypted_key = user.decrypt_private_key()
logger.info(
f"[SSH RETRIEVE] user={email} | status=success | IP={ip} | UA={ua}"
)
span.add_event("SSH key retrieved", attributes={"email": email}) # ✅
return Response({"ssh_key": decrypted_key})
except Exception as e:
logger.exception(
f"[SSH RETRIEVE] user={email} | status=fail | reason=exception | IP={ip} | UA={ua}"
)
span.add_event(
"SSH key retrieve failed",
attributes={"email": email, "reason": str(e)},
) # ✅
return Response({"error": f"복호화 실패: {str(e)}"}, status=500)
# ============================================
# 관리자용 사용자 관리 API
# ============================================
class IsAdminOrManager(BasePermission):
"""admin 또는 manager 등급만 접근 가능"""
def has_permission(self, request, view):
if not request.user or not request.user.is_authenticated:
return False
return request.user.grade in ['admin', 'manager']
class UserListView(generics.ListAPIView):
"""사용자 목록 조회 (관리자 전용)"""
queryset = CustomUser.objects.all().order_by('-created_at')
serializer_class = UserListSerializer
permission_classes = [IsAuthenticated, IsAdminOrManager]
def list(self, request, *args, **kwargs):
with tracer.start_as_current_span("UserListView GET") as span:
email, ip, ua = get_request_info(request)
logger.info(f"[USER LIST] admin={email} | IP={ip} | UA={ua}")
span.add_event("User list retrieved", attributes={"admin": email})
return super().list(request, *args, **kwargs)
class UserUpdateView(generics.RetrieveUpdateDestroyAPIView):
"""사용자 상태 수정/삭제 (관리자 전용)"""
queryset = CustomUser.objects.all()
serializer_class = UserListSerializer
permission_classes = [IsAuthenticated, IsAdminOrManager]
def partial_update(self, request, *args, **kwargs):
with tracer.start_as_current_span("UserUpdateView PATCH") as span:
admin_email, ip, ua = get_request_info(request)
instance = self.get_object()
target_email = instance.email
update_fields = []
actions = []
# is_active 수정
is_active = request.data.get('is_active')
if is_active is not None:
instance.is_active = is_active
update_fields.append('is_active')
actions.append("activated" if is_active else "deactivated")
# grade 수정 (관리자만 admin 등급 부여 가능)
grade = request.data.get('grade')
if grade is not None:
valid_grades = ['admin', 'manager', 'user']
if grade not in valid_grades:
return Response(
{"error": f"유효하지 않은 등급입니다. 가능한 값: {valid_grades}"},
status=status.HTTP_400_BAD_REQUEST
)
# admin 등급 부여는 admin만 가능 (manager는 불가)
if grade == 'admin' and request.user.grade != 'admin':
return Response(
{"error": "admin 등급 부여는 admin만 가능합니다."},
status=status.HTTP_403_FORBIDDEN
)
# 자기 자신의 등급 하향 방지 (실수로 관리자 권한 잃는 것 방지)
if request.user.id == instance.id and instance.grade == 'admin' and grade != 'admin':
return Response(
{"error": "자신의 admin 등급을 변경할 수 없습니다."},
status=status.HTTP_400_BAD_REQUEST
)
instance.grade = grade
update_fields.append('grade')
actions.append(f"grade_changed_to_{grade}")
# 비밀번호 초기화 (관리자 전용)
new_password = request.data.get('new_password')
if new_password is not None:
if len(new_password) < 8:
return Response(
{"error": "비밀번호는 최소 8자 이상이어야 합니다."},
status=status.HTTP_400_BAD_REQUEST
)
instance.set_password(new_password)
instance.save() # set_password 후 별도 save 필요
actions.append("password_reset")
logger.info(
f"[USER PASSWORD RESET] admin={admin_email} | target={target_email} | IP={ip} | UA={ua}"
)
if update_fields:
instance.save(update_fields=update_fields)
logger.info(
f"[USER UPDATE] admin={admin_email} | target={target_email} | actions={actions} | IP={ip} | UA={ua}"
)
span.add_event(
"User updated",
attributes={"admin": admin_email, "target": target_email, "actions": str(actions)}
)
serializer = self.get_serializer(instance)
return Response(serializer.data)
def destroy(self, request, *args, **kwargs):
with tracer.start_as_current_span("UserUpdateView DELETE") as span:
admin_email, ip, ua = get_request_info(request)
instance = self.get_object()
target_email = instance.email
# 자기 자신은 삭제 불가
if request.user.id == instance.id:
return Response(
{"error": "자기 자신의 계정은 삭제할 수 없습니다."},
status=status.HTTP_400_BAD_REQUEST
)
logger.info(
f"[USER DELETE] admin={admin_email} | target={target_email} | IP={ip} | UA={ua}"
)
span.add_event(
"User deleted",
attributes={"admin": admin_email, "target": target_email}
)
instance.delete()
return Response(
{"message": f"사용자 {target_email}이(가) 삭제되었습니다."},
status=status.HTTP_200_OK
)
# ============================================
# NHN Cloud 자격증명 관리 API
# ============================================
class NHNCloudCredentialsView(APIView):
"""NHN Cloud 자격증명 저장/조회/삭제"""
permission_classes = [IsAuthenticated]
def get(self, request):
"""자격증명 조회 (비밀번호 제외)"""
with tracer.start_as_current_span("NHNCloudCredentialsView GET") as span:
email, ip, ua = get_request_info(request)
user = request.user
logger.debug(f"[NHN CREDENTIALS GET] user={email} | IP={ip} | UA={ua}")
span.add_event("NHN credentials retrieved", attributes={"email": email})
return Response({
"has_credentials": bool(user.nhn_tenant_id and user.encrypted_nhn_api_password),
"tenant_id": user.nhn_tenant_id or "",
"username": user.nhn_username or "",
"storage_account": user.nhn_storage_account or "",
})
def post(self, request):
"""자격증명 저장"""
with tracer.start_as_current_span("NHNCloudCredentialsView POST") as span:
email, ip, ua = get_request_info(request)
user = request.user
tenant_id = request.data.get("tenant_id")
username = request.data.get("username")
password = request.data.get("password")
storage_account = request.data.get("storage_account", "")
if not tenant_id or not username or not password:
logger.warning(
f"[NHN CREDENTIALS SAVE] user={email} | status=fail | reason=missing_fields | IP={ip} | UA={ua}"
)
return Response(
{"error": "tenant_id, username, password는 필수입니다."},
status=status.HTTP_400_BAD_REQUEST,
)
try:
user.save_nhn_credentials(tenant_id, username, password, storage_account)
logger.info(
f"[NHN CREDENTIALS SAVE] user={email} | status=success | IP={ip} | UA={ua}"
)
span.add_event("NHN credentials saved", attributes={"email": email})
return Response({"message": "NHN Cloud 자격증명이 저장되었습니다."}, status=201)
except Exception as e:
logger.exception(
f"[NHN CREDENTIALS SAVE] user={email} | status=fail | reason=exception | IP={ip} | UA={ua}"
)
return Response({"error": f"저장 실패: {str(e)}"}, status=500)
def delete(self, request):
"""자격증명 삭제"""
with tracer.start_as_current_span("NHNCloudCredentialsView DELETE") as span:
email, ip, ua = get_request_info(request)
user = request.user
user.nhn_tenant_id = None
user.nhn_username = None
user.encrypted_nhn_api_password = None
user.nhn_storage_account = None
user.save(update_fields=[
'nhn_tenant_id', 'nhn_username', 'encrypted_nhn_api_password', 'nhn_storage_account'
])
logger.info(f"[NHN CREDENTIALS DELETE] user={email} | status=success | IP={ip} | UA={ua}")
span.add_event("NHN credentials deleted", attributes={"email": email})
return Response({"message": "NHN Cloud 자격증명이 삭제되었습니다."})
class NHNCloudPasswordView(APIView):
"""NHN Cloud API 비밀번호 조회 (복호화) - msa-django-nhn에서 사용"""
permission_classes = [IsAuthenticated]
def get(self, request):
"""복호화된 비밀번호 조회"""
with tracer.start_as_current_span("NHNCloudPasswordView GET") as span:
email, ip, ua = get_request_info(request)
user = request.user
if not user.encrypted_nhn_api_password:
logger.warning(
f"[NHN PASSWORD GET] user={email} | status=fail | reason=not_found | IP={ip} | UA={ua}"
)
return Response(
{"error": "NHN Cloud 자격증명이 등록되어 있지 않습니다."},
status=404
)
try:
decrypted_password = user.decrypt_nhn_password()
logger.info(f"[NHN PASSWORD GET] user={email} | status=success | IP={ip} | UA={ua}")
span.add_event("NHN password retrieved", attributes={"email": email})
return Response({
"tenant_id": user.nhn_tenant_id,
"username": user.nhn_username,
"password": decrypted_password,
"storage_account": user.nhn_storage_account or "",
})
except Exception as e:
logger.exception(
f"[NHN PASSWORD GET] user={email} | status=fail | reason=exception | IP={ip} | UA={ua}"
)
return Response({"error": f"복호화 실패: {str(e)}"}, status=500)
# ============================================
# NHN Cloud 프로젝트 관리 API (멀티 프로젝트 지원)
# ============================================
class NHNCloudProjectListView(APIView):
"""NHN Cloud 프로젝트 목록 조회 및 추가"""
permission_classes = [IsAuthenticated]
def get(self, request):
"""프로젝트 목록 조회"""
with tracer.start_as_current_span("NHNCloudProjectListView GET") as span:
email, ip, ua = get_request_info(request)
projects = NHNCloudProject.objects.filter(user=request.user)
logger.debug(f"[NHN PROJECT LIST] user={email} | count={projects.count()} | IP={ip} | UA={ua}")
span.add_event("NHN projects listed", attributes={"email": email, "count": projects.count()})
data = [{
"id": p.id,
"name": p.name,
"tenant_id": p.tenant_id,
"username": p.username,
"storage_account": p.storage_account or "",
"dns_appkey": p.dns_appkey or "",
"is_active": p.is_active,
"created_at": p.created_at.isoformat(),
} for p in projects]
return Response(data)
def post(self, request):
"""프로젝트 추가"""
with tracer.start_as_current_span("NHNCloudProjectListView POST") as span:
email, ip, ua = get_request_info(request)
name = request.data.get("name")
tenant_id = request.data.get("tenant_id")
username = request.data.get("username")
password = request.data.get("password")
storage_account = request.data.get("storage_account", "")
dns_appkey = request.data.get("dns_appkey", "")
if not name or not tenant_id or not username or not password:
logger.warning(
f"[NHN PROJECT CREATE] user={email} | status=fail | reason=missing_fields | IP={ip} | UA={ua}"
)
return Response(
{"error": "name, tenant_id, username, password는 필수입니다."},
status=status.HTTP_400_BAD_REQUEST,
)
# 중복 체크
if NHNCloudProject.objects.filter(user=request.user, tenant_id=tenant_id).exists():
logger.warning(
f"[NHN PROJECT CREATE] user={email} | status=fail | reason=duplicate | IP={ip} | UA={ua}"
)
return Response(
{"error": "이미 등록된 Tenant ID입니다."},
status=status.HTTP_400_BAD_REQUEST,
)
try:
# 첫 프로젝트면 자동 활성화
is_first = not NHNCloudProject.objects.filter(user=request.user).exists()
project = NHNCloudProject(
user=request.user,
name=name,
tenant_id=tenant_id,
username=username,
storage_account=storage_account,
dns_appkey=dns_appkey,
is_active=is_first,
)
# 암호화된 비밀번호 설정 후 저장
project.encrypted_password = project.encrypt_password(password)
project.save()
logger.info(
f"[NHN PROJECT CREATE] user={email} | project={name} | is_active={is_first} | IP={ip} | UA={ua}"
)
span.add_event("NHN project created", attributes={"email": email, "project": name})
return Response({
"id": project.id,
"name": project.name,
"tenant_id": project.tenant_id,
"username": project.username,
"storage_account": project.storage_account or "",
"dns_appkey": project.dns_appkey or "",
"is_active": project.is_active,
"created_at": project.created_at.isoformat(),
}, status=status.HTTP_201_CREATED)
except Exception as e:
logger.exception(
f"[NHN PROJECT CREATE] user={email} | status=fail | reason=exception | IP={ip} | UA={ua}"
)
return Response({"error": f"프로젝트 생성 실패: {str(e)}"}, status=500)
class NHNCloudProjectDetailView(APIView):
"""NHN Cloud 프로젝트 상세 (조회/수정/삭제)"""
permission_classes = [IsAuthenticated]
def get_project(self, request, project_id):
"""프로젝트 조회 (본인 것만)"""
try:
return NHNCloudProject.objects.get(id=project_id, user=request.user)
except NHNCloudProject.DoesNotExist:
return None
def get(self, request, project_id):
"""프로젝트 상세 조회"""
with tracer.start_as_current_span("NHNCloudProjectDetailView GET") as span:
email, ip, ua = get_request_info(request)
project = self.get_project(request, project_id)
if not project:
return Response({"error": "프로젝트를 찾을 수 없습니다."}, status=404)
span.add_event("NHN project retrieved", attributes={"email": email, "project": project.name})
return Response({
"id": project.id,
"name": project.name,
"tenant_id": project.tenant_id,
"username": project.username,
"storage_account": project.storage_account or "",
"dns_appkey": project.dns_appkey or "",
"is_active": project.is_active,
"created_at": project.created_at.isoformat(),
})
def put(self, request, project_id):
"""프로젝트 수정"""
with tracer.start_as_current_span("NHNCloudProjectDetailView PUT") as span:
email, ip, ua = get_request_info(request)
project = self.get_project(request, project_id)
if not project:
logger.warning(
f"[NHN PROJECT UPDATE] user={email} | status=fail | reason=not_found | IP={ip} | UA={ua}"
)
return Response({"error": "프로젝트를 찾을 수 없습니다."}, status=404)
# 수정 가능한 필드
name = request.data.get("name")
tenant_id = request.data.get("tenant_id")
username = request.data.get("username")
password = request.data.get("password") # 비어있으면 변경 안함
storage_account = request.data.get("storage_account")
dns_appkey = request.data.get("dns_appkey")
# 필수 필드 검증
if not name or not tenant_id or not username:
return Response(
{"error": "name, tenant_id, username은 필수입니다."},
status=status.HTTP_400_BAD_REQUEST,
)
# tenant_id 중복 체크 (자기 자신 제외)
if NHNCloudProject.objects.filter(
user=request.user, tenant_id=tenant_id
).exclude(id=project_id).exists():
return Response(
{"error": "이미 등록된 Tenant ID입니다."},
status=status.HTTP_400_BAD_REQUEST,
)
try:
project.name = name
project.tenant_id = tenant_id
project.username = username
if storage_account is not None:
project.storage_account = storage_account
if dns_appkey is not None:
project.dns_appkey = dns_appkey
# 비밀번호가 제공된 경우에만 변경
if password:
project.encrypted_password = project.encrypt_password(password)
project.save()
logger.info(
f"[NHN PROJECT UPDATE] user={email} | project={name} | IP={ip} | UA={ua}"
)
span.add_event("NHN project updated", attributes={"email": email, "project": name})
return Response({
"id": project.id,
"name": project.name,
"tenant_id": project.tenant_id,
"username": project.username,
"storage_account": project.storage_account or "",
"dns_appkey": project.dns_appkey or "",
"is_active": project.is_active,
"created_at": project.created_at.isoformat(),
})
except Exception as e:
logger.exception(
f"[NHN PROJECT UPDATE] user={email} | status=fail | reason=exception | IP={ip} | UA={ua}"
)
return Response({"error": f"프로젝트 수정 실패: {str(e)}"}, status=500)
def delete(self, request, project_id):
"""프로젝트 삭제"""
with tracer.start_as_current_span("NHNCloudProjectDetailView DELETE") as span:
email, ip, ua = get_request_info(request)
project = self.get_project(request, project_id)
if not project:
logger.warning(
f"[NHN PROJECT DELETE] user={email} | status=fail | reason=not_found | IP={ip} | UA={ua}"
)
return Response({"error": "프로젝트를 찾을 수 없습니다."}, status=404)
project_name = project.name
was_active = project.is_active
project.delete()
# 삭제된 프로젝트가 활성이었으면 다른 프로젝트 활성화
if was_active:
other_project = NHNCloudProject.objects.filter(user=request.user).first()
if other_project:
other_project.is_active = True
other_project.save(update_fields=['is_active'])
logger.info(
f"[NHN PROJECT DELETE] user={email} | project={project_name} | IP={ip} | UA={ua}"
)
span.add_event("NHN project deleted", attributes={"email": email, "project": project_name})
return Response(status=status.HTTP_204_NO_CONTENT)
class NHNCloudProjectActivateView(APIView):
"""NHN Cloud 프로젝트 활성화"""
permission_classes = [IsAuthenticated]
def patch(self, request, project_id):
"""프로젝트 활성화 (기존 활성 해제)"""
with tracer.start_as_current_span("NHNCloudProjectActivateView PATCH") as span:
email, ip, ua = get_request_info(request)
try:
project = NHNCloudProject.objects.get(id=project_id, user=request.user)
except NHNCloudProject.DoesNotExist:
logger.warning(
f"[NHN PROJECT ACTIVATE] user={email} | status=fail | reason=not_found | IP={ip} | UA={ua}"
)
return Response({"error": "프로젝트를 찾을 수 없습니다."}, status=404)
# 기존 활성 프로젝트 비활성화
NHNCloudProject.objects.filter(user=request.user, is_active=True).update(is_active=False)
# 새 프로젝트 활성화
project.is_active = True
project.save(update_fields=['is_active'])
logger.info(
f"[NHN PROJECT ACTIVATE] user={email} | project={project.name} | IP={ip} | UA={ua}"
)
span.add_event("NHN project activated", attributes={"email": email, "project": project.name})
return Response({
"message": "프로젝트가 활성화되었습니다.",
"id": project.id,
"name": project.name,
})
class NHNCloudProjectPasswordView(APIView):
"""NHN Cloud 프로젝트 비밀번호 조회 (복호화)"""
permission_classes = [IsAuthenticated]
def get(self, request, project_id):
"""복호화된 비밀번호 조회"""
with tracer.start_as_current_span("NHNCloudProjectPasswordView GET") as span:
email, ip, ua = get_request_info(request)
try:
project = NHNCloudProject.objects.get(id=project_id, user=request.user)
except NHNCloudProject.DoesNotExist:
logger.warning(
f"[NHN PROJECT PASSWORD] user={email} | status=fail | reason=not_found | IP={ip} | UA={ua}"
)
return Response({"error": "프로젝트를 찾을 수 없습니다."}, status=404)
try:
decrypted_password = project.decrypt_password()
logger.info(f"[NHN PROJECT PASSWORD] user={email} | project={project.name} | IP={ip} | UA={ua}")
span.add_event("NHN project password retrieved", attributes={"email": email, "project": project.name})
return Response({
"tenant_id": project.tenant_id,
"username": project.username,
"password": decrypted_password,
"storage_account": project.storage_account or "",
"dns_appkey": project.dns_appkey or "",
})
except Exception as e:
logger.exception(
f"[NHN PROJECT PASSWORD] user={email} | status=fail | reason=exception | IP={ip} | UA={ua}"
)
return Response({"error": f"복호화 실패: {str(e)}"}, status=500)
# ============================================
# KVM 서버 관리 API (멀티 서버 지원)
# ============================================
class KVMServerListView(APIView):
"""KVM 서버 목록 조회 및 추가"""
permission_classes = [IsAuthenticated]
def get(self, request):
"""서버 목록 조회"""
with tracer.start_as_current_span("KVMServerListView GET") as span:
email, ip, ua = get_request_info(request)
servers = KVMServer.objects.filter(user=request.user)
logger.debug(f"[KVM SERVER LIST] user={email} | count={servers.count()} | IP={ip} | UA={ua}")
span.add_event("KVM servers listed", attributes={"email": email, "count": servers.count()})
serializer = KVMServerSerializer(servers, many=True)
return Response(serializer.data)
def post(self, request):
"""서버 추가"""
with tracer.start_as_current_span("KVMServerListView POST") as span:
email, ip, ua = get_request_info(request)
# 중복 체크
host = request.data.get("host")
port = request.data.get("port", 22)
if KVMServer.objects.filter(user=request.user, host=host, port=port).exists():
logger.warning(
f"[KVM SERVER CREATE] user={email} | status=fail | reason=duplicate | IP={ip} | UA={ua}"
)
return Response(
{"error": "이미 등록된 서버입니다 (동일한 호스트:포트)."},
status=status.HTTP_400_BAD_REQUEST,
)
serializer = KVMServerSerializer(data=request.data)
if serializer.is_valid():
server = serializer.save(user=request.user)
logger.info(
f"[KVM SERVER CREATE] user={email} | server={server.name} | host={server.host} | IP={ip} | UA={ua}"
)
span.add_event("KVM server created", attributes={"email": email, "server": server.name})
return Response(KVMServerSerializer(server).data, status=status.HTTP_201_CREATED)
logger.warning(
f"[KVM SERVER CREATE] user={email} | status=fail | reason=validation | IP={ip} | UA={ua}"
)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class KVMServerDetailView(APIView):
"""KVM 서버 상세 (조회/수정/삭제)"""
permission_classes = [IsAuthenticated]
def get_server(self, request, server_id):
"""서버 조회 (본인 것만)"""
try:
return KVMServer.objects.get(id=server_id, user=request.user)
except KVMServer.DoesNotExist:
return None
def get(self, request, server_id):
"""서버 상세 조회"""
with tracer.start_as_current_span("KVMServerDetailView GET") as span:
email, ip, ua = get_request_info(request)
server = self.get_server(request, server_id)
if not server:
return Response({"error": "서버를 찾을 수 없습니다."}, status=404)
span.add_event("KVM server retrieved", attributes={"email": email, "server": server.name})
return Response(KVMServerSerializer(server).data)
def put(self, request, server_id):
"""서버 수정"""
with tracer.start_as_current_span("KVMServerDetailView PUT") as span:
email, ip, ua = get_request_info(request)
server = self.get_server(request, server_id)
if not server:
logger.warning(
f"[KVM SERVER UPDATE] user={email} | status=fail | reason=not_found | IP={ip} | UA={ua}"
)
return Response({"error": "서버를 찾을 수 없습니다."}, status=404)
# 호스트:포트 중복 체크 (자기 자신 제외)
host = request.data.get("host", server.host)
port = request.data.get("port", server.port)
if KVMServer.objects.filter(
user=request.user, host=host, port=port
).exclude(id=server_id).exists():
return Response(
{"error": "이미 등록된 서버입니다 (동일한 호스트:포트)."},
status=status.HTTP_400_BAD_REQUEST,
)
serializer = KVMServerSerializer(server, data=request.data, partial=True)
if serializer.is_valid():
server = serializer.save()
logger.info(
f"[KVM SERVER UPDATE] user={email} | server={server.name} | IP={ip} | UA={ua}"
)
span.add_event("KVM server updated", attributes={"email": email, "server": server.name})
return Response(KVMServerSerializer(server).data)
logger.warning(
f"[KVM SERVER UPDATE] user={email} | status=fail | reason=validation | IP={ip} | UA={ua}"
)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def delete(self, request, server_id):
"""서버 삭제"""
with tracer.start_as_current_span("KVMServerDetailView DELETE") as span:
email, ip, ua = get_request_info(request)
server = self.get_server(request, server_id)
if not server:
logger.warning(
f"[KVM SERVER DELETE] user={email} | status=fail | reason=not_found | IP={ip} | UA={ua}"
)
return Response({"error": "서버를 찾을 수 없습니다."}, status=404)
server_name = server.name
server.delete()
logger.info(
f"[KVM SERVER DELETE] user={email} | server={server_name} | IP={ip} | UA={ua}"
)
span.add_event("KVM server deleted", attributes={"email": email, "server": server_name})
return Response(status=status.HTTP_204_NO_CONTENT)
class KVMServerActivateView(APIView):
"""KVM 서버 활성화/비활성화"""
permission_classes = [IsAuthenticated]
def patch(self, request, server_id):
"""서버 활성화 상태 토글"""
with tracer.start_as_current_span("KVMServerActivateView PATCH") as span:
email, ip, ua = get_request_info(request)
try:
server = KVMServer.objects.get(id=server_id, user=request.user)
except KVMServer.DoesNotExist:
logger.warning(
f"[KVM SERVER ACTIVATE] user={email} | status=fail | reason=not_found | IP={ip} | UA={ua}"
)
return Response({"error": "서버를 찾을 수 없습니다."}, status=404)
# 요청에서 is_active 값 가져오기 (없으면 토글)
is_active = request.data.get('is_active')
if is_active is None:
server.is_active = not server.is_active
else:
server.is_active = is_active
server.save(update_fields=['is_active'])
action = "activated" if server.is_active else "deactivated"
logger.info(
f"[KVM SERVER ACTIVATE] user={email} | server={server.name} | action={action} | IP={ip} | UA={ua}"
)
span.add_event(f"KVM server {action}", attributes={"email": email, "server": server.name})
return Response({
"message": f"서버가 {'활성화' if server.is_active else '비활성화'}되었습니다.",
"id": server.id,
"name": server.name,
"is_active": server.is_active,
})
class KVMServerSSHKeyView(APIView):
"""KVM 서버 SSH 키 조회 (복호화) - msa-django-libvirt에서 사용"""
permission_classes = [IsAuthenticated]
def get(self, request, server_id):
"""복호화된 SSH 키와 접속 정보 조회"""
with tracer.start_as_current_span("KVMServerSSHKeyView GET") as span:
email, ip, ua = get_request_info(request)
try:
server = KVMServer.objects.get(id=server_id, user=request.user)
except KVMServer.DoesNotExist:
logger.warning(
f"[KVM SERVER SSH KEY] user={email} | status=fail | reason=not_found | IP={ip} | UA={ua}"
)
return Response({"error": "서버를 찾을 수 없습니다."}, status=404)
if not server.encrypted_private_key:
logger.warning(
f"[KVM SERVER SSH KEY] user={email} | server={server.name} | status=fail | reason=no_key | IP={ip} | UA={ua}"
)
return Response(
{"error": "SSH 키가 등록되어 있지 않습니다."},
status=404
)
try:
decrypted_key = server.decrypt_private_key()
logger.info(
f"[KVM SERVER SSH KEY] user={email} | server={server.name} | IP={ip} | UA={ua}"
)
span.add_event("KVM server SSH key retrieved", attributes={"email": email, "server": server.name})
return Response({
"id": server.id,
"name": server.name,
"host": server.host,
"port": server.port,
"username": server.username,
"private_key": decrypted_key,
"libvirt_uri": server.get_libvirt_uri(),
})
except Exception as e:
logger.exception(
f"[KVM SERVER SSH KEY] user={email} | server={server.name} | status=fail | reason=exception | IP={ip} | UA={ua}"
)
return Response({"error": f"복호화 실패: {str(e)}"}, status=500)
class KVMServerSSHKeyUploadView(APIView):
"""KVM 서버 SSH 키 업로드/삭제"""
permission_classes = [IsAuthenticated]
def get_server(self, request, server_id):
try:
return KVMServer.objects.get(id=server_id, user=request.user)
except KVMServer.DoesNotExist:
return None
def post(self, request, server_id):
"""SSH 키 업로드"""
with tracer.start_as_current_span("KVMServerSSHKeyUploadView POST") as span:
email, ip, ua = get_request_info(request)
server = self.get_server(request, server_id)
if not server:
return Response({"error": "서버를 찾을 수 없습니다."}, status=404)
private_key = request.data.get("private_key")
key_name = request.data.get("key_name")
if not private_key:
logger.warning(
f"[KVM SERVER SSH UPLOAD] user={email} | server={server.name} | status=fail | reason=missing_key | IP={ip} | UA={ua}"
)
return Response(
{"error": "private_key는 필수입니다."},
status=status.HTTP_400_BAD_REQUEST,
)
try:
server.save_ssh_key(private_key, key_name)
logger.info(
f"[KVM SERVER SSH UPLOAD] user={email} | server={server.name} | IP={ip} | UA={ua}"
)
span.add_event("KVM server SSH key uploaded", attributes={"email": email, "server": server.name})
return Response({"message": "SSH 키가 저장되었습니다."}, status=201)
except Exception as e:
logger.exception(
f"[KVM SERVER SSH UPLOAD] user={email} | server={server.name} | status=fail | reason=exception | IP={ip} | UA={ua}"
)
return Response({"error": f"저장 실패: {str(e)}"}, status=500)
def delete(self, request, server_id):
"""SSH 키 삭제"""
with tracer.start_as_current_span("KVMServerSSHKeyUploadView DELETE") as span:
email, ip, ua = get_request_info(request)
server = self.get_server(request, server_id)
if not server:
return Response({"error": "서버를 찾을 수 없습니다."}, status=404)
server.encrypted_private_key = None
server.encrypted_private_key_name = None
server.last_used_at = None
server.save(update_fields=['encrypted_private_key', 'encrypted_private_key_name', 'last_used_at'])
logger.info(
f"[KVM SERVER SSH DELETE] user={email} | server={server.name} | IP={ip} | UA={ua}"
)
span.add_event("KVM server SSH key deleted", attributes={"email": email, "server": server.name})
return Response({"message": "SSH 키가 삭제되었습니다."})
# ============================================
# Google 소셜 로그인 API
# ============================================
class GoogleLoginView(APIView):
"""
Google 소셜 로그인
- Google ID Token 검증
- 기존 사용자 조회 또는 자동 회원가입
- 동일 이메일 계정 연동
- JWT 토큰 발급
"""
def post(self, request):
with tracer.start_as_current_span("GoogleLoginView POST") as span:
ip = request.META.get("REMOTE_ADDR", "unknown")
ua = request.META.get("HTTP_USER_AGENT", "unknown")
credential = request.data.get("credential")
if not credential:
logger.warning(f"[GOOGLE LOGIN] status=fail | reason=missing_credential | IP={ip} | UA={ua}")
return Response(
{"error": "Google credential이 필요합니다."},
status=status.HTTP_400_BAD_REQUEST
)
# GOOGLE_CLIENT_ID 확인
if not settings.GOOGLE_CLIENT_ID:
logger.error(f"[GOOGLE LOGIN] status=fail | reason=missing_client_id | IP={ip} | UA={ua}")
return Response(
{"error": "서버에 Google Client ID가 설정되지 않았습니다."},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
try:
# Google ID Token 검증
idinfo = id_token.verify_oauth2_token(
credential,
google_requests.Request(),
settings.GOOGLE_CLIENT_ID
)
# 토큰 정보 추출
google_id = idinfo.get("sub") # Google 고유 ID
email = idinfo.get("email")
name = idinfo.get("name", "")
picture = idinfo.get("picture", "")
if not email:
logger.warning(f"[GOOGLE LOGIN] status=fail | reason=no_email | IP={ip} | UA={ua}")
return Response(
{"error": "Google 계정에서 이메일을 가져올 수 없습니다."},
status=status.HTTP_400_BAD_REQUEST
)
logger.info(f"[GOOGLE LOGIN] email={email} | google_id={google_id} | status=token_verified | IP={ip} | UA={ua}")
# 1. social_id로 기존 사용자 조회 (Google로 가입한 사용자)
user = CustomUser.objects.filter(social_provider="google", social_id=google_id).first()
if user:
# 기존 Google 계정 사용자 로그인
logger.info(f"[GOOGLE LOGIN] user={email} | status=existing_social_user | IP={ip} | UA={ua}")
# 프로필 이미지 업데이트
if picture and user.profile_image != picture:
user.profile_image = picture
user.save(update_fields=["profile_image"])
else:
# 2. 이메일로 기존 사용자 조회
existing_user = CustomUser.objects.filter(email=email).first()
if existing_user:
# 기존 계정이 일반 가입인 경우 (social_provider가 없음)
if not existing_user.social_provider:
logger.info(f"[GOOGLE LOGIN] user={email} | status=need_password | reason=email_exists_need_link | IP={ip} | UA={ua}")
return Response(
{
"error": "이미 해당 이메일로 가입된 계정이 있습니다.",
"code": "EMAIL_EXISTS_NEED_LINK",
"message": "기존 계정 비밀번호를 입력하면 Google 계정을 연동할 수 있습니다.",
"email": email,
"google_id": google_id,
"google_name": name,
"google_picture": picture,
},
status=status.HTTP_409_CONFLICT
)
# 다른 소셜 로그인으로 가입한 경우 (예: kakao)
elif existing_user.social_provider != "google":
logger.warning(f"[GOOGLE LOGIN] user={email} | status=fail | reason=other_social_provider | IP={ip} | UA={ua}")
return Response(
{
"error": f"해당 이메일은 {existing_user.social_provider} 계정으로 가입되어 있습니다.",
"code": "OTHER_SOCIAL_PROVIDER",
"message": f"{existing_user.social_provider} 로그인을 이용해주세요."
},
status=status.HTTP_409_CONFLICT
)
# Google social_id가 다른 경우 (동일 이메일, 다른 Google 계정)
else:
logger.warning(f"[GOOGLE LOGIN] user={email} | status=fail | reason=different_google_account | IP={ip} | UA={ua}")
return Response(
{
"error": "다른 Google 계정으로 이미 가입되어 있습니다.",
"code": "DIFFERENT_GOOGLE_ACCOUNT"
},
status=status.HTTP_409_CONFLICT
)
else:
# 3. 신규 사용자 자동 회원가입 (기존 계정이 없는 경우)
# 고유한 name 생성 (이메일 앞부분 + 랜덤 문자열)
base_name = name or email.split("@")[0]
unique_name = base_name
counter = 1
while CustomUser.objects.filter(name=unique_name).exists():
unique_name = f"{base_name}_{secrets.token_hex(3)}"
counter += 1
if counter > 10:
unique_name = f"{base_name}_{secrets.token_hex(6)}"
break
user = CustomUser.objects.create(
email=email,
name=unique_name,
social_provider="google",
social_id=google_id,
profile_image=picture,
is_active=False, # 관리자 승인 필요
)
# 비밀번호 없이 사용 불가하게 설정
user.set_unusable_password()
user.save()
logger.info(f"[GOOGLE LOGIN] user={email} | status=new_user_created_pending | IP={ip} | UA={ua}")
# 승인 대기 응답 반환
span.add_event("Google login - pending approval", attributes={"email": email})
return Response(
{
"error": "회원가입이 완료되었습니다. 관리자 승인 후 로그인할 수 있습니다.",
"code": "PENDING_APPROVAL",
"message": "관리자 승인 대기 중입니다.",
},
status=status.HTTP_403_FORBIDDEN
)
# 사용자 활성 상태 확인
if not user.is_active:
logger.warning(f"[GOOGLE LOGIN] user={email} | status=fail | reason=inactive_user | IP={ip} | UA={ua}")
return Response(
{"error": "비활성화된 계정입니다. 관리자에게 문의하세요."},
status=status.HTTP_403_FORBIDDEN
)
# JWT 토큰 발급 (커스텀 클레임 포함)
refresh = CustomTokenObtainPairSerializer.get_token(user)
access = refresh.access_token
# 기존 로그인 응답과 동일한 형식
span.add_event("Google login success", attributes={"email": email})
logger.info(f"[GOOGLE LOGIN] user={email} | status=success | IP={ip} | UA={ua}")
return Response({
"access": str(access),
"refresh": str(refresh),
"user": {
"id": user.id,
"email": user.email,
"name": user.name,
"grade": user.grade,
"profile_image": user.profile_image,
"social_provider": user.social_provider,
}
}, status=status.HTTP_200_OK)
except ValueError as e:
# 토큰 검증 실패
logger.warning(f"[GOOGLE LOGIN] status=fail | reason=invalid_token | error={str(e)} | IP={ip} | UA={ua}")
span.add_event("Google login failed", attributes={"reason": "invalid_token", "error": str(e)})
return Response(
{"error": "유효하지 않은 Google 토큰입니다."},
status=status.HTTP_401_UNAUTHORIZED
)
except Exception as e:
logger.exception(f"[GOOGLE LOGIN] status=fail | reason=exception | IP={ip} | UA={ua}")
span.add_event("Google login failed", attributes={"reason": "exception", "error": str(e)})
return Response(
{"error": f"로그인 처리 중 오류가 발생했습니다: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
class GoogleLinkWithPasswordView(APIView):
"""
비밀번호 확인 후 Google 계정 연동
- 일반 가입 사용자가 Google 로그인 시도 시 비밀번호 확인 후 연동
"""
def post(self, request):
with tracer.start_as_current_span("GoogleLinkWithPasswordView POST") as span:
ip = request.META.get("REMOTE_ADDR", "unknown")
ua = request.META.get("HTTP_USER_AGENT", "unknown")
email = request.data.get("email")
password = request.data.get("password")
google_id = request.data.get("google_id")
google_picture = request.data.get("google_picture", "")
if not email or not password or not google_id:
return Response(
{"error": "email, password, google_id는 필수입니다."},
status=status.HTTP_400_BAD_REQUEST
)
try:
user = CustomUser.objects.get(email=email)
except CustomUser.DoesNotExist:
logger.warning(f"[GOOGLE LINK] email={email} | status=fail | reason=user_not_found | IP={ip} | UA={ua}")
return Response(
{"error": "사용자를 찾을 수 없습니다."},
status=status.HTTP_404_NOT_FOUND
)
# 비밀번호 확인
if not user.check_password(password):
logger.warning(f"[GOOGLE LINK] user={email} | status=fail | reason=wrong_password | IP={ip} | UA={ua}")
return Response(
{"error": "비밀번호가 일치하지 않습니다."},
status=status.HTTP_401_UNAUTHORIZED
)
# 이미 다른 소셜 계정으로 연동된 경우
if user.social_provider and user.social_provider != "google":
return Response(
{"error": f"이미 {user.social_provider} 계정으로 연동되어 있습니다."},
status=status.HTTP_409_CONFLICT
)
# Google 연동
user.social_provider = "google"
user.social_id = google_id
if google_picture:
user.profile_image = google_picture
user.save(update_fields=["social_provider", "social_id", "profile_image"])
logger.info(f"[GOOGLE LINK] user={email} | status=success | IP={ip} | UA={ua}")
span.add_event("Google account linked", attributes={"email": email})
# JWT 토큰 발급 (커스텀 클레임 포함)
refresh = CustomTokenObtainPairSerializer.get_token(user)
access = refresh.access_token
return Response({
"message": "Google 계정이 연동되었습니다.",
"access": str(access),
"refresh": str(refresh),
"user": {
"id": user.id,
"email": user.email,
"name": user.name,
"grade": user.grade,
"profile_image": user.profile_image,
"social_provider": user.social_provider,
}
}, status=status.HTTP_200_OK)
class GoogleLinkView(APIView):
"""
로그인된 상태에서 Google 계정 연동
- 마이페이지에서 Google 연동 버튼 클릭 시
"""
permission_classes = [IsAuthenticated]
def post(self, request):
with tracer.start_as_current_span("GoogleLinkView POST") as span:
email, ip, ua = get_request_info(request)
user = request.user
credential = request.data.get("credential")
if not credential:
return Response(
{"error": "Google credential이 필요합니다."},
status=status.HTTP_400_BAD_REQUEST
)
# 이미 Google 연동된 경우
if user.social_provider == "google":
return Response(
{"error": "이미 Google 계정이 연동되어 있습니다."},
status=status.HTTP_409_CONFLICT
)
# 다른 소셜 계정으로 연동된 경우
if user.social_provider:
return Response(
{"error": f"이미 {user.social_provider} 계정으로 연동되어 있습니다."},
status=status.HTTP_409_CONFLICT
)
if not settings.GOOGLE_CLIENT_ID:
return Response(
{"error": "서버에 Google Client ID가 설정되지 않았습니다."},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
try:
# Google ID Token 검증
idinfo = id_token.verify_oauth2_token(
credential,
google_requests.Request(),
settings.GOOGLE_CLIENT_ID
)
google_id = idinfo.get("sub")
google_email = idinfo.get("email")
picture = idinfo.get("picture", "")
# 이메일 일치 확인
if google_email != user.email:
logger.warning(f"[GOOGLE LINK] user={email} | google_email={google_email} | status=fail | reason=email_mismatch | IP={ip} | UA={ua}")
return Response(
{"error": "로그인된 계정의 이메일과 Google 이메일이 일치하지 않습니다."},
status=status.HTTP_400_BAD_REQUEST
)
# 해당 Google 계정이 다른 사용자에게 이미 연동되어 있는지 확인
existing_google_user = CustomUser.objects.filter(social_provider="google", social_id=google_id).exclude(id=user.id).first()
if existing_google_user:
return Response(
{"error": "해당 Google 계정은 이미 다른 사용자에게 연동되어 있습니다."},
status=status.HTTP_409_CONFLICT
)
# Google 연동
user.social_provider = "google"
user.social_id = google_id
if picture:
user.profile_image = picture
user.save(update_fields=["social_provider", "social_id", "profile_image"])
logger.info(f"[GOOGLE LINK] user={email} | status=success | IP={ip} | UA={ua}")
span.add_event("Google account linked", attributes={"email": email})
return Response({
"message": "Google 계정이 연동되었습니다.",
"social_provider": user.social_provider,
"profile_image": user.profile_image,
}, status=status.HTTP_200_OK)
except ValueError as e:
logger.warning(f"[GOOGLE LINK] user={email} | status=fail | reason=invalid_token | error={str(e)} | IP={ip} | UA={ua}")
return Response(
{"error": "유효하지 않은 Google 토큰입니다."},
status=status.HTTP_401_UNAUTHORIZED
)
except Exception as e:
logger.exception(f"[GOOGLE LINK] user={email} | status=fail | reason=exception | IP={ip} | UA={ua}")
return Response(
{"error": f"연동 처리 중 오류가 발생했습니다: {str(e)}"},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
class GoogleUnlinkView(APIView):
"""
Google 계정 연동 해제
- 비밀번호가 설정되어 있어야 연동 해제 가능 (로그인 수단 확보)
"""
permission_classes = [IsAuthenticated]
def post(self, request):
with tracer.start_as_current_span("GoogleUnlinkView POST") as span:
email, ip, ua = get_request_info(request)
user = request.user
# Google 연동 확인
if user.social_provider != "google":
return Response(
{"error": "Google 계정이 연동되어 있지 않습니다."},
status=status.HTTP_400_BAD_REQUEST
)
# 비밀번호 설정 여부 확인 (소셜 전용 계정인 경우 연동 해제 불가)
if not user.has_usable_password():
return Response(
{"error": "비밀번호가 설정되어 있지 않아 연동 해제할 수 없습니다. 먼저 비밀번호를 설정해주세요."},
status=status.HTTP_400_BAD_REQUEST
)
# 연동 해제
user.social_provider = None
user.social_id = None
# profile_image는 유지 (사용자가 원하면 별도로 삭제)
user.save(update_fields=["social_provider", "social_id"])
logger.info(f"[GOOGLE UNLINK] user={email} | status=success | IP={ip} | UA={ua}")
span.add_event("Google account unlinked", attributes={"email": email})
return Response({
"message": "Google 계정 연동이 해제되었습니다.",
"social_provider": None,
}, status=status.HTTP_200_OK)
# ============================================
# 사이트 설정 API
# ============================================
class IsAdmin(BasePermission):
"""admin 등급만 접근 가능"""
def has_permission(self, request, view):
if not request.user or not request.user.is_authenticated:
return False
return request.user.grade == 'admin'
class SiteSettingsView(APIView):
"""
사이트 설정 조회/수정 (관리자 전용)
"""
def get_permissions(self):
# GET은 인증된 사용자면 누구나 가능 (설정 조회)
# PUT/PATCH는 관리자만 가능 (설정 변경)
if self.request.method in ['PUT', 'PATCH']:
return [IsAuthenticated(), IsAdmin()]
return [] # GET은 누구나 가능 (비로그인 포함)
def get(self, request):
"""사이트 설정 조회 (공개)"""
with tracer.start_as_current_span("SiteSettingsView GET") as span:
settings_obj = SiteSettings.get_settings()
span.add_event("Site settings retrieved")
return Response({
"google_login_enabled": settings_obj.google_login_enabled,
"updated_at": settings_obj.updated_at.isoformat() if settings_obj.updated_at else None,
})
def patch(self, request):
"""사이트 설정 수정 (관리자 전용)"""
with tracer.start_as_current_span("SiteSettingsView PATCH") as span:
email, ip, ua = get_request_info(request)
settings_obj = SiteSettings.get_settings()
# 업데이트할 필드 처리
updated_fields = []
if 'google_login_enabled' in request.data:
settings_obj.google_login_enabled = request.data['google_login_enabled']
updated_fields.append('google_login_enabled')
if updated_fields:
settings_obj.save()
logger.info(
f"[SITE SETTINGS UPDATE] admin={email} | fields={updated_fields} | IP={ip} | UA={ua}"
)
span.add_event("Site settings updated", attributes={
"admin": email,
"fields": str(updated_fields)
})
return Response({
"message": "설정이 저장되었습니다.",
"google_login_enabled": settings_obj.google_login_enabled,
"updated_at": settings_obj.updated_at.isoformat() if settings_obj.updated_at else None,
})