Ansible ssh키 암복호화기능추가
All checks were successful
Build And Test / build-and-push (push) Successful in 2m49s

This commit is contained in:
2025-05-21 01:04:45 +09:00
parent 91d3b77c23
commit e98090f031
4 changed files with 144 additions and 14 deletions

View File

@ -1,9 +1,9 @@
from django.db import models from django.db import models
from django.contrib.auth.models import AbstractBaseUser, PermissionsMixin, BaseUserManager from django.contrib.auth.models import AbstractBaseUser, PermissionsMixin, BaseUserManager
import base64
from cryptography.fernet import Fernet
from django.utils import timezone from django.utils import timezone
import base64, hashlib from django.conf import settings # ✅ 추가
from cryptography.fernet import Fernet
import base64, hashlib # ✅ SECRET_KEY 암호화 키 생성용
class CustomUserManager(BaseUserManager): class CustomUserManager(BaseUserManager):
def create_user(self, email, password=None, **extra_fields): def create_user(self, email, password=None, **extra_fields):
@ -57,12 +57,25 @@ class CustomUser(AbstractBaseUser, PermissionsMixin):
def __str__(self): def __str__(self):
return self.email return self.email
# 🔐 SSH Private Key 암복호화 관련 메서드 # ✅ 2025-05-20 SECRET_KEY 기반 암복호화 메서드
def get_encryption_key(self) -> bytes:
"""
SECRET_KEY 기반으로 Fernet 키 생성 (SHA-256 -> base64)
"""
hashed = hashlib.sha256(settings.SECRET_KEY.encode()).digest()
return base64.urlsafe_b64encode(hashed[:32])
def encrypt_private_key(self, private_key: str) -> bytes: def encrypt_private_key(self, private_key: str) -> bytes:
"""
개인 키를 암호화하여 바이트 문자열로 반환
"""
cipher = Fernet(self.get_encryption_key()) cipher = Fernet(self.get_encryption_key())
return cipher.encrypt(private_key.encode()) return cipher.encrypt(private_key.encode())
def decrypt_private_key(self) -> str: def decrypt_private_key(self) -> str:
"""
암호화된 SSH 키를 복호화하여 문자열로 반환
"""
if self.encrypted_private_key: if self.encrypted_private_key:
cipher = Fernet(self.get_encryption_key()) cipher = Fernet(self.get_encryption_key())
decrypted = cipher.decrypt(self.encrypted_private_key).decode() decrypted = cipher.decrypt(self.encrypted_private_key).decode()
@ -72,10 +85,8 @@ class CustomUser(AbstractBaseUser, PermissionsMixin):
return "" return ""
def save_private_key(self, private_key: str): def save_private_key(self, private_key: str):
"""
암호화된 SSH 키를 저장
"""
self.encrypted_private_key = self.encrypt_private_key(private_key) self.encrypted_private_key = self.encrypt_private_key(private_key)
self.save() self.save()
def get_encryption_key(self) -> bytes:
email_encoded = self.email.encode()
base64_key = base64.urlsafe_b64encode(email_encoded.ljust(32)[:32])
return base64_key

View File

@ -1,3 +1,4 @@
# views.py
from rest_framework.views import APIView from rest_framework.views import APIView
from rest_framework.response import Response from rest_framework.response import Response
from rest_framework import status from rest_framework import status
@ -52,12 +53,13 @@ class SSHKeyUploadView(APIView):
user = request.user user = request.user
try: try:
# ✅ 모델 메서드로 암호화 저장 처리
user.save_private_key(private_key) user.save_private_key(private_key)
user.encrypted_private_key_name = key_name user.encrypted_private_key_name = key_name
user.save(update_fields=["encrypted_private_key", "encrypted_private_key_name"]) user.save(update_fields=["encrypted_private_key", "encrypted_private_key_name"])
return Response({"message": "SSH key 저장 완료."}) return Response({"message": "SSH key 저장 완료."}, status=201)
except Exception as e: except Exception as e:
return Response({"error": str(e)}, status=500) return Response({"error": f"암호화 또는 저장 실패: {str(e)}"}, status=500)
def delete(self, request): def delete(self, request):
user = request.user user = request.user
@ -80,7 +82,6 @@ class SSHKeyInfoView(APIView):
}) })
# ✅ 실제 암호화된 키를 반환하는 API
class SSHKeyRetrieveView(APIView): class SSHKeyRetrieveView(APIView):
permission_classes = [IsAuthenticated] permission_classes = [IsAuthenticated]
@ -88,4 +89,10 @@ class SSHKeyRetrieveView(APIView):
user = request.user user = request.user
if not user.encrypted_private_key: if not user.encrypted_private_key:
return Response({"error": "SSH 키가 등록되어 있지 않습니다."}, status=404) return Response({"error": "SSH 키가 등록되어 있지 않습니다."}, status=404)
return Response({"ssh_key": user.encrypted_private_key})
try:
# ✅ 모델 메서드로 복호화 처리
decrypted_key = user.decrypt_private_key()
return Response({"ssh_key": decrypted_key})
except Exception as e:
return Response({"error": f"복호화 실패: {str(e)}"}, status=500)

112
users/views_after.py Normal file
View File

@ -0,0 +1,112 @@
# views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from rest_framework_simplejwt.views import TokenObtainPairView
from .serializers import RegisterSerializer, CustomTokenObtainPairSerializer
# ✅ 2025-05-20 Fernet 암호화 적용 (SECRET_KEY 사용)
from cryptography.fernet import Fernet
from django.conf import settings
import base64, hashlib
# SECRET_KEY 기반으로 Fernet 키 생성
hashed = hashlib.sha256(settings.SECRET_KEY.encode()).digest()
fernet_key = base64.urlsafe_b64encode(hashed[:32])
fernet = Fernet(fernet_key)
class RegisterView(APIView):
def post(self, request):
serializer = RegisterSerializer(data=request.data)
if serializer.is_valid():
user = serializer.save()
return Response({"message": "User registered successfully."}, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class MeView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request):
user = request.user
serializer = RegisterSerializer(user)
return Response(serializer.data)
def put(self, request):
user = request.user
serializer = RegisterSerializer(user, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
return Response(serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class CustomTokenObtainPairView(TokenObtainPairView):
serializer_class = CustomTokenObtainPairSerializer
class SSHKeyUploadView(APIView):
permission_classes = [IsAuthenticated]
def post(self, request):
private_key = request.data.get("private_key")
key_name = request.data.get("key_name")
if not private_key or not key_name:
return Response(
{"error": "private_key와 key_name 모두 필요합니다."},
status=status.HTTP_400_BAD_REQUEST
)
user = request.user
try:
# ✅ 암호화 처리
encrypted_key = fernet.encrypt(private_key.encode()).decode()
# ✅ 저장
user.encrypted_private_key = encrypted_key
user.encrypted_private_key_name = key_name
user.save(update_fields=["encrypted_private_key", "encrypted_private_key_name"])
return Response({"message": "SSH key 저장 완료."}, status=201)
except Exception as e:
return Response({"error": f"암호화 또는 저장 실패: {str(e)}"}, status=500)
def delete(self, request):
user = request.user
user.encrypted_private_key = None
user.encrypted_private_key_name = None
user.last_used_at = None
user.save(update_fields=["encrypted_private_key", "encrypted_private_key_name", "last_used_at"])
return Response({"message": "SSH key deleted."}, status=200)
class SSHKeyInfoView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request):
user = request.user
return Response({
"has_key": bool(user.encrypted_private_key),
"encrypted_private_key_name": user.encrypted_private_key_name,
"last_used_at": user.last_used_at
})
class SSHKeyRetrieveView(APIView):
permission_classes = [IsAuthenticated]
def get(self, request):
user = request.user
if not user.encrypted_private_key:
return Response({"error": "SSH 키가 등록되어 있지 않습니다."}, status=404)
try:
# ✅ 복호화
decrypted_key = fernet.decrypt(user.encrypted_private_key.encode()).decode()
return Response({"ssh_key": decrypted_key})
except Exception as e:
return Response({"error": f"복호화 실패: {str(e)}"}, status=500)

View File

@ -1 +1 @@
0.0.10-rc1 0.0.10