Files
msa-django-auth/users/views.py
icurfer e318855b14
All checks were successful
Build And Test / build-and-push (push) Successful in 2m17s
Add NHN Cloud credentials management and bump version to v0.0.19
- Add NHN Cloud credential fields to User model (tenant_id, username, encrypted password, storage_account)
- Add API endpoints for credentials CRUD operations
- Implement Fernet encryption for password storage

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-14 01:25:43 +09:00

433 lines
18 KiB
Python

# views.py
import logging

from opentelemetry import trace  # OpenTelemetry tracer API
from rest_framework import generics
from rest_framework import status
from rest_framework.exceptions import APIException
from rest_framework.permissions import IsAuthenticated, BasePermission
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework_simplejwt.views import TokenObtainPairView

from .models import CustomUser
from .serializers import RegisterSerializer, CustomTokenObtainPairSerializer, UserListSerializer
# Module-level logger used by every view for audit-style request logs.
logger = logging.getLogger(__name__)
# Module-level OpenTelemetry tracer; each handler opens its span from it.
tracer = trace.get_tracer(__name__)
def get_request_info(request):
    """Collect the caller's identity and client details for log lines.

    Returns:
        A ``(email, ip, ua)`` tuple. ``email`` is ``request.user.email`` or
        ``"anonymous"`` when the user object has no such attribute; ``ip`` and
        ``ua`` come from ``REMOTE_ADDR`` / ``HTTP_USER_AGENT`` in
        ``request.META`` and default to ``"unknown"`` when absent.
    """
    meta = request.META
    return (
        getattr(request.user, "email", "anonymous"),
        meta.get("REMOTE_ADDR", "unknown"),
        meta.get("HTTP_USER_AGENT", "unknown"),
    )
class RegisterView(APIView):
    """Create a new user account from the submitted registration payload."""

    def post(self, request):
        """Validate the payload with ``RegisterSerializer`` and save the user.

        Returns 201 with a confirmation message on success, or 400 with the
        serializer's validation errors.
        """
        with tracer.start_as_current_span("RegisterView POST") as span:
            email, ip, ua = get_request_info(request)
            serializer = RegisterSerializer(data=request.data)

            # Guard clause: reject invalid payloads first.
            if not serializer.is_valid():
                logger.warning(
                    f"[REGISTER] user={email} | status=fail | IP={ip} | UA={ua} | detail={serializer.errors}"
                )
                return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

            user = serializer.save()
            logger.info(
                f"[REGISTER] user={user.email} | status=success | IP={ip} | UA={ua}"
            )
            # Record the registration on the active span.
            span.add_event("User registered", attributes={"email": user.email})
            body = {"message": "User registered successfully."}
            return Response(body, status=status.HTTP_201_CREATED)
class MeView(APIView):
    """Read or partially update the authenticated user's own record."""

    permission_classes = [IsAuthenticated]

    def get(self, request):
        """Return the current user serialized with ``RegisterSerializer``."""
        with tracer.start_as_current_span("MeView GET") as span:
            email, ip, ua = get_request_info(request)
            logger.debug(f"[ME GET] user={email} | IP={ip} | UA={ua}")

            serializer = RegisterSerializer(request.user)
            span.add_event("Me info retrieved", attributes={"email": email})
            return Response(serializer.data)

    def put(self, request):
        """Apply a partial update to the current user.

        Returns the updated representation on success, or 400 with the
        serializer's validation errors.
        """
        with tracer.start_as_current_span("MeView PUT") as span:
            email, ip, ua = get_request_info(request)
            serializer = RegisterSerializer(
                request.user, data=request.data, partial=True
            )

            # Guard clause: bail out on validation failure.
            if not serializer.is_valid():
                logger.warning(
                    f"[ME UPDATE] user={email} | status=fail | IP={ip} | UA={ua} | detail={serializer.errors}"
                )
                return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

            serializer.save()
            logger.info(
                f"[ME UPDATE] user={email} | status=success | IP={ip} | UA={ua}"
            )
            span.add_event("Me info updated", attributes={"email": email})
            return Response(serializer.data)
class CustomTokenObtainPairView(TokenObtainPairView):
    """JWT login endpoint that logs and traces every attempt and its outcome."""

    serializer_class = CustomTokenObtainPairSerializer

    def post(self, request, *args, **kwargs):
        """Issue a token pair, logging the attempt and whether it succeeded.

        Raises:
            APIException: re-raised unchanged after the failure has been
                logged, so DRF's exception handler still builds the response.
        """
        with tracer.start_as_current_span("TokenObtainPairView POST") as span:
            ip = request.META.get("REMOTE_ADDR", "unknown")
            ua = request.META.get("HTTP_USER_AGENT", "unknown")
            email = request.data.get("email", "unknown")
            logger.info(f"[LOGIN] user={email} | status=attempt | IP={ip} | UA={ua}")

            try:
                response = super().post(request, *args, **kwargs)
            except APIException as exc:
                # The parent view reports bad credentials and invalid payloads
                # by raising, not by returning a non-200 response. Without
                # this handler failed logins would never be logged or traced.
                logger.warning(
                    f"[LOGIN] user={email} | status=fail | IP={ip} | UA={ua} | detail={exc.detail}"
                )
                span.add_event(
                    "Login failed",
                    attributes={"email": email, "reason": str(exc.detail)},
                )
                raise

            if response.status_code == 200:
                logger.info(
                    f"[LOGIN] user={email} | status=success | IP={ip} | UA={ua}"
                )
                span.add_event("Login success", attributes={"email": email})
            else:
                # Kept for any serializer/view customization that returns an
                # error response instead of raising.
                logger.warning(
                    f"[LOGIN] user={email} | status=fail | IP={ip} | UA={ua} | detail={response.data}"
                )
                span.add_event(
                    "Login failed",
                    attributes={"email": email, "reason": str(response.data)},
                )
            return response
class SSHKeyUploadView(APIView):
    """Store or remove the authenticated user's SSH private key."""

    permission_classes = [IsAuthenticated]

    def post(self, request):
        """Save the submitted private key and its display name on the user.

        Expects ``private_key`` and ``key_name`` in the request body. The key
        is handed to ``user.save_private_key`` (presumably the encryption
        step; defined on the model, not visible here).

        Returns 201 on success, 400 when either field is missing, and 500
        when saving raises. The 500 body is deliberately generic: exception
        details go to the server log and trace only, never to the client.
        """
        with tracer.start_as_current_span("SSHKeyUploadView POST") as span:
            email, ip, ua = get_request_info(request)
            private_key = request.data.get("private_key")
            key_name = request.data.get("key_name")

            if not private_key or not key_name:
                logger.warning(
                    f"[SSH UPLOAD] user={email} | status=fail | reason=missing_key_or_name | IP={ip} | UA={ua}"
                )
                span.add_event(
                    "SSH key save failed",
                    attributes={"email": email, "reason": "missing_key_or_name"},
                )
                return Response(
                    {"error": "private_key와 key_name 모두 필요합니다."},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            try:
                user = request.user
                user.save_private_key(private_key)
                user.encrypted_private_key_name = key_name
                user.save(
                    update_fields=[
                        "encrypted_private_key",
                        "encrypted_private_key_name",
                    ]
                )
            except Exception as e:
                # logger.exception keeps the full traceback server-side.
                logger.exception(
                    f"[SSH UPLOAD] user={email} | status=fail | reason=exception | IP={ip} | UA={ua}"
                )
                span.add_event(
                    "SSH key save failed",
                    attributes={"email": email, "reason": str(e)},
                )
                # Do not echo exception text to the client: it can expose
                # internals of the crypto/storage layer.
                return Response({"error": "암호화 또는 저장 실패"}, status=500)

            logger.info(
                f"[SSH UPLOAD] user={email} | status=success | key_name={key_name} | IP={ip} | UA={ua}"
            )
            span.add_event(
                "SSH key saved", attributes={"email": email, "key_name": key_name}
            )
            return Response({"message": "SSH key 저장 완료."}, status=201)

    def delete(self, request):
        """Clear the stored key, its name and ``last_used_at`` for the user."""
        with tracer.start_as_current_span("SSHKeyUploadView DELETE") as span:
            email, ip, ua = get_request_info(request)
            user = request.user
            user.encrypted_private_key = None
            user.encrypted_private_key_name = None
            user.last_used_at = None
            user.save(
                update_fields=[
                    "encrypted_private_key",
                    "encrypted_private_key_name",
                    "last_used_at",
                ]
            )
            logger.info(
                f"[SSH DELETE] user={email} | status=success | IP={ip} | UA={ua}"
            )
            span.add_event("SSH key deleted", attributes={"email": email})
            return Response({"message": "SSH key deleted."}, status=200)
class SSHKeyInfoView(APIView):
    """Report metadata about the user's stored SSH key (never the key itself)."""

    permission_classes = [IsAuthenticated]

    def get(self, request):
        """Return whether a key is stored, its name, and ``last_used_at``."""
        with tracer.start_as_current_span("SSHKeyInfoView GET") as span:
            email, ip, ua = get_request_info(request)
            logger.debug(f"[SSH INFO] user={email} | IP={ip} | UA={ua}")
            span.add_event("SSH key info retrieved", attributes={"email": email})

            user = request.user
            payload = {
                "has_key": bool(user.encrypted_private_key),
                "encrypted_private_key_name": user.encrypted_private_key_name,
                "last_used_at": user.last_used_at,
            }
            return Response(payload)
class SSHKeyRetrieveView(APIView):
    """Return the authenticated user's decrypted SSH private key."""

    permission_classes = [IsAuthenticated]

    def get(self, request):
        """Decrypt and return the stored key.

        Returns 200 with ``{"ssh_key": ...}``, 404 when no key is stored, and
        500 when ``user.decrypt_private_key()`` raises. The 500 body is
        deliberately generic: exception details go to the server log and
        trace only, never to the client.
        """
        with tracer.start_as_current_span("SSHKeyRetrieveView GET") as span:
            email, ip, ua = get_request_info(request)
            user = request.user

            if not user.encrypted_private_key:
                logger.warning(
                    f"[SSH RETRIEVE] user={email} | status=fail | reason=not_found | IP={ip} | UA={ua}"
                )
                span.add_event(
                    "SSH key retrieve failed",
                    attributes={"email": email, "reason": "not_found"},
                )
                return Response(
                    {"error": "SSH 키가 등록되어 있지 않습니다."}, status=404
                )

            try:
                decrypted_key = user.decrypt_private_key()
            except Exception as e:
                # logger.exception keeps the full traceback server-side.
                logger.exception(
                    f"[SSH RETRIEVE] user={email} | status=fail | reason=exception | IP={ip} | UA={ua}"
                )
                span.add_event(
                    "SSH key retrieve failed",
                    attributes={"email": email, "reason": str(e)},
                )
                # Do not echo decryption error text to the client.
                return Response({"error": "복호화 실패"}, status=500)

            logger.info(
                f"[SSH RETRIEVE] user={email} | status=success | IP={ip} | UA={ua}"
            )
            span.add_event("SSH key retrieved", attributes={"email": email})
            return Response({"ssh_key": decrypted_key})
# ============================================
# 관리자용 사용자 관리 API
# ============================================
class IsAdminOrManager(BasePermission):
    """Allow access only to authenticated users graded ``admin`` or ``manager``."""

    def has_permission(self, request, view):
        """Return True when the requester is authenticated with an allowed grade."""
        user = request.user
        if user and user.is_authenticated:
            return user.grade in ['admin', 'manager']
        return False
class UserListView(generics.ListAPIView):
    """List all users, newest first (admin/manager only)."""

    permission_classes = [IsAuthenticated, IsAdminOrManager]
    serializer_class = UserListSerializer
    queryset = CustomUser.objects.all().order_by('-created_at')

    def list(self, request, *args, **kwargs):
        """Log and trace the access, then delegate to the stock list handler."""
        with tracer.start_as_current_span("UserListView GET") as span:
            email, ip, ua = get_request_info(request)
            logger.info(f"[USER LIST] admin={email} | IP={ip} | UA={ua}")
            span.add_event("User list retrieved", attributes={"admin": email})
            return super().list(request, *args, **kwargs)
class UserUpdateView(generics.RetrieveUpdateDestroyAPIView):
    """Retrieve, activate/deactivate, or delete a user (admin/manager only)."""

    queryset = CustomUser.objects.all()
    serializer_class = UserListSerializer
    permission_classes = [IsAuthenticated, IsAdminOrManager]

    # NOTE(review): only PATCH is restricted to ``is_active`` below. PUT is
    # still served by the inherited ``update()`` and will accept whatever
    # fields UserListSerializer marks writable -- confirm that is intended.

    @staticmethod
    def _parse_is_active(value):
        """Coerce a request value to a strict bool; return None if not boolean-like."""
        if isinstance(value, bool):
            return value
        if isinstance(value, (int, float)) and value in (0, 1):
            return bool(value)
        if isinstance(value, str):
            lowered = value.strip().lower()
            if lowered in ("true", "t", "1"):
                return True
            if lowered in ("false", "f", "0"):
                return False
        return None

    def partial_update(self, request, *args, **kwargs):
        """Set ``is_active`` on the target user; every other field is ignored.

        Returns the serialized user. When ``is_active`` is absent the user is
        returned unchanged; when it is present but not boolean-like the
        request is rejected with 400.
        """
        with tracer.start_as_current_span("UserUpdateView PATCH") as span:
            admin_email, ip, ua = get_request_info(request)
            instance = self.get_object()
            target_email = instance.email

            # Only is_active may be modified through this endpoint.
            raw_is_active = request.data.get('is_active')
            if raw_is_active is not None:
                # Validate before touching the model: a truthy string such as
                # "False" would otherwise be logged as "activated" while the
                # database stores False, and junk values would fail at save.
                is_active = self._parse_is_active(raw_is_active)
                if is_active is None:
                    logger.warning(
                        f"[USER UPDATE] admin={admin_email} | target={target_email} | status=fail | reason=invalid_is_active | IP={ip} | UA={ua}"
                    )
                    return Response(
                        {"error": "is_active는 true 또는 false여야 합니다."},
                        status=status.HTTP_400_BAD_REQUEST,
                    )

                instance.is_active = is_active
                instance.save(update_fields=['is_active'])
                action = "activated" if is_active else "deactivated"
                logger.info(
                    f"[USER UPDATE] admin={admin_email} | target={target_email} | action={action} | IP={ip} | UA={ua}"
                )
                span.add_event(
                    f"User {action}",
                    attributes={"admin": admin_email, "target": target_email}
                )

            serializer = self.get_serializer(instance)
            return Response(serializer.data)

    def destroy(self, request, *args, **kwargs):
        """Delete the target user; deleting one's own account is refused with 400."""
        with tracer.start_as_current_span("UserUpdateView DELETE") as span:
            admin_email, ip, ua = get_request_info(request)
            instance = self.get_object()
            target_email = instance.email

            # Requesters may not delete their own account.
            if request.user.id == instance.id:
                return Response(
                    {"error": "자기 자신의 계정은 삭제할 수 없습니다."},
                    status=status.HTTP_400_BAD_REQUEST
                )

            # Delete first so the log and trace never report a deletion that
            # did not actually happen.
            instance.delete()
            logger.info(
                f"[USER DELETE] admin={admin_email} | target={target_email} | IP={ip} | UA={ua}"
            )
            span.add_event(
                "User deleted",
                attributes={"admin": admin_email, "target": target_email}
            )
            return Response(
                {"message": f"사용자 {target_email}이(가) 삭제되었습니다."},
                status=status.HTTP_200_OK
            )
# ============================================
# NHN Cloud 자격증명 관리 API
# ============================================
class NHNCloudCredentialsView(APIView):
    """Save, inspect, or delete the user's NHN Cloud credentials."""

    permission_classes = [IsAuthenticated]

    def get(self, request):
        """Return the stored credentials without the password.

        ``has_credentials`` is true only when both the tenant id and the
        encrypted password are set; unset text fields are returned as ``""``.
        """
        with tracer.start_as_current_span("NHNCloudCredentialsView GET") as span:
            email, ip, ua = get_request_info(request)
            user = request.user
            logger.debug(f"[NHN CREDENTIALS GET] user={email} | IP={ip} | UA={ua}")
            span.add_event("NHN credentials retrieved", attributes={"email": email})
            return Response({
                "has_credentials": bool(user.nhn_tenant_id and user.encrypted_nhn_api_password),
                "tenant_id": user.nhn_tenant_id or "",
                "username": user.nhn_username or "",
                "storage_account": user.nhn_storage_account or "",
            })

    def post(self, request):
        """Store the submitted credentials via ``user.save_nhn_credentials``.

        ``tenant_id``, ``username`` and ``password`` are required;
        ``storage_account`` defaults to ``""``. Returns 201 on success, 400
        when a required field is missing, and 500 when saving raises. The
        500 body is deliberately generic: exception details go to the server
        log and trace only, never to the client.
        """
        with tracer.start_as_current_span("NHNCloudCredentialsView POST") as span:
            email, ip, ua = get_request_info(request)
            user = request.user
            tenant_id = request.data.get("tenant_id")
            username = request.data.get("username")
            password = request.data.get("password")
            storage_account = request.data.get("storage_account", "")

            if not tenant_id or not username or not password:
                logger.warning(
                    f"[NHN CREDENTIALS SAVE] user={email} | status=fail | reason=missing_fields | IP={ip} | UA={ua}"
                )
                span.add_event(
                    "NHN credentials save failed",
                    attributes={"email": email, "reason": "missing_fields"},
                )
                return Response(
                    {"error": "tenant_id, username, password는 필수입니다."},
                    status=status.HTTP_400_BAD_REQUEST,
                )

            try:
                user.save_nhn_credentials(tenant_id, username, password, storage_account)
            except Exception as e:
                # logger.exception keeps the full traceback server-side.
                logger.exception(
                    f"[NHN CREDENTIALS SAVE] user={email} | status=fail | reason=exception | IP={ip} | UA={ua}"
                )
                span.add_event(
                    "NHN credentials save failed",
                    attributes={"email": email, "reason": str(e)},
                )
                # Do not echo exception text to the client.
                return Response({"error": "저장 실패"}, status=500)

            logger.info(
                f"[NHN CREDENTIALS SAVE] user={email} | status=success | IP={ip} | UA={ua}"
            )
            span.add_event("NHN credentials saved", attributes={"email": email})
            return Response({"message": "NHN Cloud 자격증명이 저장되었습니다."}, status=201)

    def delete(self, request):
        """Clear all four NHN Cloud credential fields on the user."""
        with tracer.start_as_current_span("NHNCloudCredentialsView DELETE") as span:
            email, ip, ua = get_request_info(request)
            user = request.user
            user.nhn_tenant_id = None
            user.nhn_username = None
            user.encrypted_nhn_api_password = None
            user.nhn_storage_account = None
            user.save(update_fields=[
                'nhn_tenant_id', 'nhn_username', 'encrypted_nhn_api_password', 'nhn_storage_account'
            ])
            logger.info(f"[NHN CREDENTIALS DELETE] user={email} | status=success | IP={ip} | UA={ua}")
            span.add_event("NHN credentials deleted", attributes={"email": email})
            return Response({"message": "NHN Cloud 자격증명이 삭제되었습니다."})
class NHNCloudPasswordView(APIView):
    """Return the user's NHN Cloud credentials with the password decrypted.

    Per the original note, this endpoint is consumed by msa-django-nhn.
    """

    permission_classes = [IsAuthenticated]

    def get(self, request):
        """Decrypt and return the stored credentials.

        Returns 200 with tenant id, username, password and storage account;
        404 when no encrypted password is stored; 500 when
        ``user.decrypt_nhn_password()`` raises. The 500 body is deliberately
        generic: exception details go to the server log and trace only,
        never to the client.
        """
        with tracer.start_as_current_span("NHNCloudPasswordView GET") as span:
            email, ip, ua = get_request_info(request)
            user = request.user

            if not user.encrypted_nhn_api_password:
                logger.warning(
                    f"[NHN PASSWORD GET] user={email} | status=fail | reason=not_found | IP={ip} | UA={ua}"
                )
                span.add_event(
                    "NHN password retrieve failed",
                    attributes={"email": email, "reason": "not_found"},
                )
                return Response(
                    {"error": "NHN Cloud 자격증명이 등록되어 있지 않습니다."},
                    status=404
                )

            try:
                decrypted_password = user.decrypt_nhn_password()
            except Exception as e:
                # logger.exception keeps the full traceback server-side.
                logger.exception(
                    f"[NHN PASSWORD GET] user={email} | status=fail | reason=exception | IP={ip} | UA={ua}"
                )
                span.add_event(
                    "NHN password retrieve failed",
                    attributes={"email": email, "reason": str(e)},
                )
                # Do not echo decryption error text to the client.
                return Response({"error": "복호화 실패"}, status=500)

            logger.info(f"[NHN PASSWORD GET] user={email} | status=success | IP={ip} | UA={ua}")
            span.add_event("NHN password retrieved", attributes={"email": email})
            return Response({
                "tenant_id": user.nhn_tenant_id,
                "username": user.nhn_username,
                "password": decrypted_password,
                "storage_account": user.nhn_storage_account or "",
            })