butler_ddochi/nhncloud/packages/ApiVpc.py
icurfer ea11832a53
Some checks failed
Build And Test / build-and-push (push) Failing after 53s
init
2024-12-13 17:12:03 +09:00

230 lines
8.2 KiB
Python

import json
import requests
# type : network
# Region :
# 판교: https://kr1-api-network-infrastructure.nhncloudservice.com
# 평촌: https://kr2-api-network-infrastructure.nhncloudservice.com
endpoint_vpc_kr1 = "https://kr1-api-network-infrastructure.nhncloudservice.com"
endpoint_vpc_kr2 = "https://kr2-api-network-infrastructure.nhncloudservice.com"
endpoint_floating_kr1 = "https://kr1-api-network-infrastructure.nhncloudservice.com"
endpoint_floating_kr2 = "https://kr2-api-network-infrastructure.nhncloudservice.com"
class ApiVpc:
def __init__(self, region, token):
if region == "kr1":
self.vpcUrl = endpoint_vpc_kr1
self.floatingUrl = endpoint_floating_kr1
elif region == "kr2":
self.vpcUrl = endpoint_vpc_kr2
self.floatingUrl = endpoint_floating_kr2
else:
self.vpcUrl = endpoint_vpc_kr2
self.floatingUrl = endpoint_floating_kr2
self.token = token
def vpcList(self):
url = f"{self.vpcUrl}/v2.0/vpcs"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers)
datas = json.loads(response.text)
return datas
def vpcInfo(self, vpc_id):
url = f"{self.vpcUrl}/v2.0/vpcs/{vpc_id}"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers)
datas = json.loads(response.text)
return datas
def getVpcId(self, vpc_name):
vpcName = vpc_name
url = f"{self.vpcUrl}/v2.0/vpcs"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers)
datas = json.loads(response.text)["vpcs"]
print(f"ss:{datas}")
i = len(datas) - 1
for var in reversed(datas):
foo = str(var["name"])
if foo.startswith(vpcName):
break
else:
if i == 0:
break
else:
i -= 1
return datas[i]["id"]
def vpcCreate(self, name, cidrv4):
url = f"{self.vpcUrl}/v2.0/vpcs"
parms = {"vpc": {"name": name, "cidrv4": cidrv4}}
headers = {"X-Auth-Token": self.token}
response = requests.post(url, json=parms, headers=headers)
datas = json.loads(response.text)
return datas["vpc"]["id"]
def vpcDelete(self, vpc_id):
url = f"{self.vpcUrl}/v2.0/vpcs/{vpc_id}"
headers = {"X-Auth-Token": self.token}
response = requests.delete(url, headers=headers)
return
def subnetList(self):
url = f"{self.vpcUrl}/v2.0/vpcsubnets"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers)
datas = json.loads(response.text)
return datas
def getSubnetId(self, subnetName):
url = f"{self.vpcUrl}/v2.0/vpcsubnets"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers)
datas = json.loads(response.text)["vpcsubnets"]
for var in datas:
foo = str(var["name"])
if foo.startswith(subnetName) and foo.count("rt") == 0:
return var["id"]
def subnetCreate(self, vpc_id, cidr, name):
url = f"{self.vpcUrl}/v2.0/vpcsubnets"
parms = {"vpcsubnet": {"vpc_id": vpc_id, "cidr": cidr, "name": name}}
headers = {"X-Auth-Token": self.token}
response = requests.post(url, json=parms, headers=headers)
datas = json.loads(response.text)
return datas["vpcsubnet"]["id"]
def subnetDelete(self, subnet_id):
url = f"{self.vpcUrl}/v2.0/vpcsubnets/{subnet_id}"
headers = {"X-Auth-Token": self.token}
response = requests.delete(url, headers=headers)
return
def routingList(self):
# url = f"{self.vpcUrl}/v2.0/routingtables"
url = f"{self.vpcUrl}/v2.0/routingtables?detail=true"
# parms = {
# "default_table": "true"
# }
headers = {
"X-Auth-Token": self.token,
#
}
response = requests.get(url, headers=headers)
# response = requests.post(url, query={"detail":"true"}, headers=headers)
datas = json.loads(response.text)
return datas
def getRoutingDefault(self):
url = f"{self.vpcUrl}/v2.0/routingtables"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers)
datas = json.loads(response.text)
return datas["routingtables"][0]["id"]
def routingCreate(self, name, vpc_id, distributed="true"):
url = f"{self.vpcUrl}/v2.0/routingtables"
parms = {
"routingtable": {"name": name, "vpc_id": vpc_id, "distributed": distributed}
}
headers = {"X-Auth-Token": self.token}
response = requests.post(url, json=parms, headers=headers)
datas = json.loads(response.text)
return datas["routingtable"]["id"]
def routingDelete(self, routingtable_Id):
url = f"{self.vpcUrl}/v2.0/routingtables/{routingtable_Id}"
headers = {"X-Auth-Token": self.token}
response = requests.delete(url, headers=headers)
return
def setDefaultRouting(self, routingtable_Id):
url = f"{self.vpcUrl}/v2.0/routingtables/{routingtable_Id}/set_as_default"
headers = {"X-Auth-Token": self.token}
response = requests.put(url, headers=headers)
datas = json.loads(response.text)
return datas
def setIgwToRoutingTable(self, routingtable_Id, igw_id):
url = f"{self.vpcUrl}/v2.0/routingtables/{routingtable_Id}/attach_gateway"
parms = {"gateway_id": igw_id}
headers = {"X-Auth-Token": self.token}
response = requests.put(url, json=parms, headers=headers)
datas = json.loads(response.text)
return datas
def detachIgwFromRoutingTable(self, routingtable_Id):
url = f"{self.vpcUrl}/v2.0/routingtables/{routingtable_Id}/detach_gateway"
headers = {"X-Auth-Token": self.token}
response = requests.put(url, headers=headers)
datas = json.loads(response.text)
return datas
def attachRoutingToSubnet(self, subnet_Id, routing_id):
url = f"{self.vpcUrl}/v2.0/vpcsubnets/{subnet_Id}/attach_routingtable"
parms = {"routingtable_id": routing_id}
headers = {"X-Auth-Token": self.token}
response = requests.put(url, json=parms, headers=headers)
datas = json.loads(response.text)
return datas
def getExternalNetworkId(self):
url = f"{self.vpcUrl}/v2.0/networks?router:external=True"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers)
datas = json.loads(response.text)
return datas
def createFloatingIp(self, external_id):
url = f"{self.floatingUrl}/v2.0/floatingips"
headers = {"X-Auth-Token": self.token}
parms = {
"floatingip": {
"floating_network_id": external_id,
# "port_id": ""
}
}
response = requests.post(url, json=parms, headers=headers)
# datas = json.loads(response.text)["floatingip"]["floating_ip_address"]
datas = json.loads(response.text)["floatingip"]["id"]
return datas
def attachFloatingIpToDeviceNic(self, floating_ip_id, device_nic_id):
url = f"{self.floatingUrl}/v2.0/floatingips/{floating_ip_id}"
parms = {"floatingip": {"port_id": device_nic_id}}
headers = {"X-Auth-Token": self.token}
response = requests.put(url, json=parms, headers=headers)
print(response)
datas = json.loads(response.text)
return datas
def nicList(self):
url = f"{self.vpcUrl}/v2.0/ports"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers)
datas = json.loads(response.text)
return datas
def nicId(self, network_id):
url = f"{self.vpcUrl}/v2.0/ports"
headers = {"X-Auth-Token": self.token}
response = requests.get(url, headers=headers)
datas = json.loads(response.text)["ports"]
for var in datas:
foo = str(var["device_id"])
if foo.startswith(network_id):
return var["id"]