butler_ddochi/nhncloud/packages/ApiStorageObject.py
icurfer ea11832a53
Some checks failed
Build And Test / build-and-push (push) Failing after 53s
init
2024-12-13 17:12:03 +09:00

95 lines
3.4 KiB
Python

import requests
import os
endpoint_url_kr1 = "https://kr1-api-object-storage.nhncloudservice.com/v1"
endpoint_url_kr2 = "https://kr2-api-object-storage.nhncloudservice.com/v1"
class ApiStorageObject:
def __init__(self, region, token, storageAccount):
if region == "kr1":
self.storage_url = endpoint_url_kr1
elif region == "kr2":
self.storage_url = endpoint_url_kr2
else:
self.storage_url = endpoint_url_kr2
self.token = token
self.scAccount = storageAccount
def _get_url(self, container=None):
if container == None:
container = ""
else:
container = container
return self.storage_url + "/" + container
def _get_obj_url(self, container=None, object=None):
return '/'.join([self.storage_url, container, object])
def _get_request_header(self):
return {"X-Auth-Token": self.token}
def get_container_list(self):
req_url = str(f"{self._get_url()}{self.scAccount}")
print(f"url: {req_url}")
headers = self._get_request_header()
resp = requests.get(req_url, headers=headers)
# print(resp)
return resp.text.split("\n")
def create(self, container):
req_url = str(f"{self._get_url()}{self.scAccount}/{container}")
print(req_url)
req_header = self._get_request_header()
return requests.put(req_url, headers=req_header)
def remove(self, container):
req_url = str(f"{self._get_url()}{self.scAccount}/{container}")
print(req_url)
req_header = self._get_request_header()
return requests.delete(req_url, headers=req_header)
# public 으로 변경
def set_read_acl(self, container, is_public):
req_url = str(f"{self._get_url()}{self.scAccount}/{container}")
req_header = self._get_request_header()
req_header["X-Container-Read"] = ".r:*" if is_public else ""
return requests.post(req_url, headers=req_header)
def upload(self, container, object, object_path):
req_url = str(f"{self._get_url()}{self.scAccount}/{container}/{object}")
req_header = self._get_request_header()
path = '/'.join([object_path, object])
with open(path, 'rb') as f:
return requests.put(req_url, headers=req_header, data=f.read())
def update(self, container, object):
req_url = str(f"{self._get_url()}{self.scAccount}/{container}/{object}")
print(req_url)
headers={
"X-Auth-Token": self.token,
"Content" : "multipart/form-data",
"X-Delete-At" : "1705985040"
}
return requests.put(req_url, headers=headers)
# 컨테이너 디렉토리 리스트
def _get_list(self, container):
req_url = str(f"{self._get_url()}{self.scAccount}/{container}")
req_header = self._get_request_header()
response = requests.get(req_url, headers=req_header)
return response.text.split('\n')
def get_object_list(self, container):
req_url = str(f"{self._get_url()}{self.scAccount}/{container}")
print(req_url)
return self._get_list(req_url)
def delete(self, containerName, objectPath):
req_url = f"{self._get_url()}{self.scAccount}/{containerName}/{objectPath}"
headers={
"X-Auth-Token": self.token,
}
return requests.delete(req_url, headers=headers)