Files
msa-django-auth/users/serializers.py
icurfer 6b4d38ad5f
Some checks failed
Build And Test / build-and-push (push) Failing after 2m49s
비밀번호 정책 기능 구현
- RegisterSerializer에 비밀번호 정책 검증 추가
- ExtendPasswordExpiryView: 비밀번호 유효기간 연장 API
- CustomTokenObtainPairSerializer: 로그인 시 만료/잠금 검증
- password_utils.py: 정책 검증, 계정 잠금, 만료 체크 유틸리티
- SiteSettings 모델에 비밀번호 정책 필드 추가

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-19 21:35:46 +09:00

219 lines
8.0 KiB
Python

from rest_framework import serializers
from .models import CustomUser, KVMServer
from rest_framework_simplejwt.serializers import TokenObtainPairSerializer
from rest_framework.exceptions import ValidationError
class RegisterSerializer(serializers.ModelSerializer):
password = serializers.CharField(write_only=True, required=False)
has_password = serializers.SerializerMethodField()
class Meta:
model = CustomUser
fields = ("email", "name", "password", "grade", "desc",
"phone", "address", "gender", "birth_date", "education",
"social_provider", "profile_image", "has_password")
read_only_fields = ("social_provider", "profile_image", "has_password")
def get_has_password(self, obj):
"""사용자가 비밀번호를 가지고 있는지 여부"""
return obj.has_usable_password()
def validate_email(self, value):
# 업데이트 시에는 자기 자신 제외
instance = getattr(self, 'instance', None)
queryset = CustomUser.objects.filter(email=value)
if instance:
queryset = queryset.exclude(pk=instance.pk)
if queryset.exists():
raise ValidationError("이미 사용 중인 이메일입니다.")
return value
def validate_name(self, value):
# 업데이트 시에는 자기 자신 제외
instance = getattr(self, 'instance', None)
queryset = CustomUser.objects.filter(name=value)
if instance:
queryset = queryset.exclude(pk=instance.pk)
if queryset.exists():
raise ValidationError("이미 사용 중인 이름입니다.")
return value
def validate_password(self, value):
"""비밀번호 정책 검증 (회원가입 시)"""
from .models import SiteSettings
from .password_utils import validate_password_policy
# 비밀번호가 제공된 경우에만 검증 (업데이트 시 비밀번호 변경 안 할 수 있음)
if value:
site_settings = SiteSettings.get_settings()
is_valid, errors = validate_password_policy(value, site_settings)
if not is_valid:
raise ValidationError(errors)
return value
def create(self, validated_data):
from .models import SiteSettings
from .password_utils import validate_password_policy
from django.utils import timezone
password = validated_data.pop("password", None)
# 회원가입 시 비밀번호 필수
if not password:
raise ValidationError({"password": "비밀번호는 필수입니다."})
user = CustomUser(**validated_data)
user.set_password(password)
user.password_changed_at = timezone.now() # 비밀번호 변경일 설정
user.save()
return user
class UserListSerializer(serializers.ModelSerializer):
"""관리자용 사용자 목록 시리얼라이저"""
class Meta:
model = CustomUser
fields = [
'id', 'email', 'name', 'grade', 'is_active', 'is_staff',
'created_at', 'phone', 'address', 'gender', 'birth_date', 'education'
]
read_only_fields = [
'id', 'email', 'name', 'grade', 'is_staff',
'created_at', 'phone', 'address', 'gender', 'birth_date', 'education'
]
class CustomTokenObtainPairSerializer(TokenObtainPairSerializer):
# email 필드를 identifier로 재정의 (이메일 또는 이름 허용)
email = serializers.CharField()
@classmethod
def get_token(cls, user):
token = super().get_token(user)
# ✅ JWT payload에 커스텀 정보 추가
token["name"] = user.name
token["grade"] = user.grade
token["email"] = user.email # 선택적으로 추가 가능
token["sub"] = user.email # 선택적으로 추가 가능
# Kong JWT 플러그인용 issuer 정보 추가
token["iss"] = "msa-user"
return token
def validate(self, attrs):
from .models import SiteSettings
from .password_utils import (
check_account_lockout, record_login_failure,
reset_login_failures, check_password_expiry
)
identifier = attrs.get("email") # 이메일 또는 이름
password = attrs.get("password")
# 이메일 또는 이름으로 사용자 찾기
user = CustomUser.objects.filter(email=identifier).first()
if user is None:
user = CustomUser.objects.filter(name=identifier).first()
if user is None:
raise ValidationError("계정 또는 비밀번호가 올바르지 않습니다.")
# 사이트 설정 가져오기
site_settings = SiteSettings.get_settings()
# 계정 잠금 상태 확인
lockout_status = check_account_lockout(user, site_settings)
if lockout_status['is_locked']:
raise ValidationError(lockout_status['message'])
if not user.is_active:
raise ValidationError("계정이 비활성화되어 있습니다. 관리자에게 문의하세요.")
# 비밀번호 검증
if not user.check_password(password):
# 로그인 실패 기록
record_login_failure(user, site_settings)
raise ValidationError("계정 또는 비밀번호가 올바르지 않습니다.")
# 로그인 성공 - 실패 횟수 초기화
reset_login_failures(user)
# 부모 클래스의 validate를 위해 attrs에 실제 email 설정
attrs["email"] = user.email
self.user = user # ✅ 수동 설정 필요
data = super().validate(attrs)
data["email"] = user.email
data["grade"] = user.grade
# 비밀번호 만료 상태 확인
expiry_status = check_password_expiry(user, site_settings)
if expiry_status['is_expired']:
data["password_expired"] = True
data["password_message"] = expiry_status['message']
elif expiry_status['is_warning']:
data["password_warning"] = True
data["password_message"] = expiry_status['message']
data["password_days_until_expiry"] = expiry_status['days_until_expiry']
return data
class KVMServerSerializer(serializers.ModelSerializer):
"""KVM 서버 시리얼라이저"""
tags_list = serializers.ListField(
child=serializers.CharField(),
required=False,
write_only=True
)
private_key = serializers.CharField(write_only=True, required=False)
class Meta:
model = KVMServer
fields = [
'id', 'name', 'host', 'port', 'username',
'encrypted_private_key_name', 'libvirt_uri',
'description', 'tags', 'tags_list', 'is_active',
'last_used_at', 'created_at', 'updated_at', 'private_key'
]
read_only_fields = ['id', 'last_used_at', 'created_at', 'updated_at']
extra_kwargs = {
'tags': {'required': False},
}
def to_representation(self, instance):
data = super().to_representation(instance)
data['has_ssh_key'] = bool(instance.encrypted_private_key)
data['tags_list'] = instance.get_tags_list()
return data
def create(self, validated_data):
tags_list = validated_data.pop('tags_list', None)
private_key = validated_data.pop('private_key', None)
instance = super().create(validated_data)
if tags_list is not None:
instance.set_tags_list(tags_list)
if private_key:
instance.save_ssh_key(private_key, validated_data.get('encrypted_private_key_name'))
instance.save()
return instance
def update(self, instance, validated_data):
tags_list = validated_data.pop('tags_list', None)
private_key = validated_data.pop('private_key', None)
instance = super().update(instance, validated_data)
if tags_list is not None:
instance.set_tags_list(tags_list)
instance.save()
if private_key:
instance.save_ssh_key(private_key, validated_data.get('encrypted_private_key_name'))
return instance