import requests import os endpoint_url_kr1 = "https://kr1-api-object-storage.nhncloudservice.com/v1" endpoint_url_kr2 = "https://kr2-api-object-storage.nhncloudservice.com/v1" class ApiStorageObject: def __init__(self, region, token, storageAccount): if region == "kr1": self.storage_url = endpoint_url_kr1 elif region == "kr2": self.storage_url = endpoint_url_kr2 else: self.storage_url = endpoint_url_kr2 self.token = token self.scAccount = storageAccount def _get_url(self, container=None): if container == None: container = "" else: container = container return self.storage_url + "/" + container def _get_obj_url(self, container=None, object=None): return '/'.join([self.storage_url, container, object]) def _get_request_header(self): return {"X-Auth-Token": self.token} def get_container_list(self): req_url = str(f"{self._get_url()}{self.scAccount}") print(f"url: {req_url}") headers = self._get_request_header() resp = requests.get(req_url, headers=headers) # print(resp) return resp.text.split("\n") def create(self, container): req_url = str(f"{self._get_url()}{self.scAccount}/{container}") print(req_url) req_header = self._get_request_header() return requests.put(req_url, headers=req_header) def remove(self, container): req_url = str(f"{self._get_url()}{self.scAccount}/{container}") print(req_url) req_header = self._get_request_header() return requests.delete(req_url, headers=req_header) # public 으로 변경 def set_read_acl(self, container, is_public): req_url = str(f"{self._get_url()}{self.scAccount}/{container}") req_header = self._get_request_header() req_header["X-Container-Read"] = ".r:*" if is_public else "" return requests.post(req_url, headers=req_header) def upload(self, container, object, object_path): req_url = str(f"{self._get_url()}{self.scAccount}/{container}/{object}") req_header = self._get_request_header() path = '/'.join([object_path, object]) with open(path, 'rb') as f: return requests.put(req_url, headers=req_header, data=f.read()) def update(self, container, object): req_url = str(f"{self._get_url()}{self.scAccount}/{container}/{object}") print(req_url) headers={ "X-Auth-Token": self.token, "Content" : "multipart/form-data", "X-Delete-At" : "1705985040" } return requests.put(req_url, headers=headers) # 컨테이너 디렉토리 리스트 def _get_list(self, container): req_url = str(f"{self._get_url()}{self.scAccount}/{container}") req_header = self._get_request_header() response = requests.get(req_url, headers=req_header) return response.text.split('\n') def get_object_list(self, container): req_url = str(f"{self._get_url()}{self.scAccount}/{container}") print(req_url) return self._get_list(req_url) def delete(self, containerName, objectPath): req_url = f"{self._get_url()}{self.scAccount}/{containerName}/{objectPath}" headers={ "X-Auth-Token": self.token, } return requests.delete(req_url, headers=headers)