Files
Zack Meier 1d304511b8 update
2026-04-15 15:45:50 -05:00

793 lines
27 KiB
Python

import json
import random
import requests
import urllib3
import certifi
import os
import pytz
from datetime import datetime, timedelta, time
class API(object):
def __init__(self, server):
if not ((os.environ.get('COHESITY_USER')) and (os.environ.get('COHESITY_PASS'))):
print("\n*** Environment Variables are not set for authentication! ***")
exit()
# Get the authorization bearer token
self.user = os.environ['COHESITY_USER']
self.pwd = os.environ['COHESITY_PASS']
if self.user == "svcitdchstyauto":
self.domain = "local"
else:
self.domain = "nd.gov"
self.server = server
self.base_url = ('https://{}/irisservices/api/v1').format(self.server)
self.headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
def GetClusterName(self):
return self.server
###################################################### Depricated Methods #################################################
def GetAuthToken(self):
apiCall = "/public/accessTokens"
payload = {
"username": self.user,
"password": self.pwd,
"domain": self.domain
}
encoded_payload = json.dumps(payload)
token = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload, verify=True)
if 'locked' in token.text:
print("Account {} is locked".format(self.user))
exit(1)
if 'expired' in token.text:
print("Password for local account {} has expired".format(self.user))
exit(1)
return(self.FormatData(token))
###########################################################################################################################
# Methods under maintenance
def FormatData(self, response):
jsonData = json.loads(response.text)
return (jsonData)
def GetClusterName(self):
return(self.server)
def Authenticate(self):
apiCall = "/public/accessTokens"
payload = {
"username": self.user,
"password": self.pwd,
"domain": self.domain
}
encoded_payload = json.dumps(payload)
authResponse = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload, verify=True)
token = self.FormatData(authResponse)
if (token['accessToken']):
self.headers.update({'Authorization': 'Bearer ' + token['accessToken']})
return()
else:
print("ERROR: Unable to authenticate to " + self.server)
def UpdateHeaders(self, token):
self.headers.update({'Authorization': 'Bearer ' + token})
def GetRelativeTimestamp(self, d, h, m, s):
# Timezone
tz = pytz.timezone("America/Chicago")
# Current date
date = datetime.now(tz).date()
# Convert to yesterday
start_date = date + timedelta(days=d)
# Set time to 1700
start_time = datetime(
year = start_date.year,
month = start_date.month,
day = start_date.day,
hour = h,
minute = m,
second = s
)
# Add timezone info to the start time
start_time = tz.localize(start_time)
# Convert to UTC
start_utc_time = start_time.astimezone(pytz.utc)
# Convert to unix epoch microseconds
start_epoch_ms = int(start_utc_time.timestamp() * 1000000)
return(start_epoch_ms)
def GetRandomStartTime(self):
hours = [17, 18, 19, 20, 21, 22, 23, 00, 1, 2, 3, 4, 5, 6]
min = [0, 15, 30, 45]
rHour = random.choice(hours)
rMin = random.choice(min)
sTime=[rHour,rMin]
return sTime
#################### DELETE Methods ####################
def DeleteJob(self, url):
apiCall = url
payload = {
"deleteSnapshots": True
}
encoded_payload = json.dumps(payload)
data = requests.delete(self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return(data)
#################### SET Methods #######################
def ResolveAlert(self, alert_id, snow_id):
apiCall = "/public/alertResolutions"
payload = {
"alertIdList": [
alert_id
],
"resolutionDetails": {
"resolutionDetails":"See ServiceNow",
"resolutionSummary": snow_id
}
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return(self.GetRequest(apiCall))
#################### GET Methods #######################
def GetRequest(self, url):
apiCall = url
payload = {}
encoded_payload = json.dumps(payload)
data = requests.request("GET", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return(self.FormatData(data))
def GetRequestV2(self, url):
base_url = ('https://{}/v2').format(self.server)
apiCall = url
payload = {}
encoded_payload = json.dumps(payload)
data = requests.request("GET", base_url + apiCall, headers=self.headers, data=encoded_payload)
return(self.FormatData(data))
def GetFilteredRequest(self, url, filters):
apiCall = url + filters
payload = {}
encoded_payload = json.dumps(payload)
data = requests.request("GET", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return(self.FormatData(data))
def GetPhysicalsources(self):
apiCall = "/public/protectionSources/registrationInfo?isDeleted=False&environments=kPhysical"
return(self.GetRequest(apiCall))
def GetSQLsources(self):
apiCall = "/public/protectionSources/registrationInfo?isDeleted=False&environments=kSQL"
return(self.GetRequest(apiCall))
def GetSQLjobs(self):
apiCall = "/public/protectionJobs/?isDeleted=False&environments=kSQL"
return(self.FormatData(self.GetRequest(apiCall)))
def GetVMsources(self):
apiCall = "/public/protectionSources?environments=kVMware"
return(self.GetRequest(apiCall))
def GetVMjobs(self):
apiCall = "/public/protectionJobs?environments=kVMware"
return(self.GetRequest(apiCall))
def GetDiskAlerts(self, startTime):
apiCall = "/public/alerts?alertStateList=kOpen&alertCategoryList=kDisk&startDateUsecs=" + str(startTime)
return(self.GetRequest(apiCall))
def GetProtectedVMobjects(self, hypervisorID):
apiCall = "/public/protectionSources/protectedObjects?environment=kVMware&id=" + str(hypervisorID)
return(self.GetRequest(apiCall))
def GetProtectionPolicyByName(self, policyName):
apiCall = "/public/protectionPolicies/?name=" + policyName
return(self.GetRequest(apiCall))
def GetVMObjects(self, hypervisorID):
apiCall = "/public/protectionSources/virtualMachines?vCenterId=" + str(hypervisorID)
return(self.GetRequest(apiCall))
def GetAZsources(self):
apiCall = "/public/protectionSources?environments=kAzure"
return(self.GetRequest(apiCall))
def GetAZNativeJobs(self):
apiCall = "/public/protectionJobs?environments=kAzureNative"
return(self.GetRequest(apiCall))
def GetAZObjects(self, hypervisorID):
apiCall = "/public/protectionSources?environments=kAzure&id=" + str(hypervisorID)
data = self.GetRequest(apiCall)
azureVMs = []
for node in data[0]['nodes']:
if 'nodes' in node:
for subNode in node['nodes']:
if 'protectionSource' in subNode:
if 'azureProtectionSource' in subNode['protectionSource']:
if "type" in subNode['protectionSource']['azureProtectionSource']:
if subNode['protectionSource']['azureProtectionSource']['type'] == 'kVirtualMachine':
azureVMs.append(subNode['protectionSource'])
return(azureVMs)
def GetProtectedAZobjects(self, hypervisorID):
apiCall = "/public/protectionSources/protectedObjects?environment=kAzure&id=" + str(hypervisorID)
return(self.GetRequest(apiCall))
def GetPhysicalJobs(self):
apiCalll = "/public/protectionJobs?isDeleted=False&environments=kPhysicalFiles"
return(self.FormatData(self.GetRequest(apiCall)))
def GetProtectionSourceId(self, hostname):
sourceRegistered = False
hostname = hostname.lower()
sources = self.GetFilteredRequest("/public/protectionSources/registrationInfo", "?environments=kPhysical")
for source in sources['rootNodes']:
if (source['rootNode']['name']).lower() == hostname:
sourceId = source['rootNode']['id']
sourceRegistered = True
break
if sourceRegistered:
return(sourceId)
else:
sys.exit("Source not registered")
def GetVcenterTagId(self, vCenterId, tagName):
tagFound=0
for record in vCenterId[0]['nodes'][0]['nodes']:
if "protectionSource" in record:
if "name" in record['protectionSource']:
# Check if tagName matches AppName
if record['protectionSource']['name'] == "AppName":
for tagRecord in record['nodes']:
if tagRecord['protectionSource']['name'] == tagName and tagRecord['protectionSource']['vmWareProtectionSource']['type'] == "kTag":
tagId = tagRecord['protectionSource']['id']
tagFound = 1
break
# Check if tagName matches DTAP (Development, Test, Acceptable, Production)
if record['protectionSource']['name'] == "DTAP":
for tagRecord in record['nodes']:
if tagRecord['protectionSource']['name'] == tagName and tagRecord['protectionSource']['vmWareProtectionSource']['type'] == "kTag":
tagId = tagRecord['protectionSource']['id']
tagFound = 1
break
if tagFound == 1:
break
if tagFound == 1:
return tagId
else:
return None
def GetAzureTagId(self, azSubId, tagName):
tagFound = 0
for record in azSubId[0]['nodes']:
if "protectionSource" in record:
if "name" in record['protectionSource']:
if record['protectionSource']['name'] == "ApplicationName#~#" + tagName:
if record['protectionSource']['azureProtectionSource']['type'] == "kTag":
tagId = record['protectionSource']['id']
tagFound = 1
break
if tagFound == 1:
break
if tagFound == 1:
return tagId
else:
return None
def GetProtectionJobByHost(self, hostname):
jobFound = False
jobs = self.GetFilteredRequest("/public/protectionJobs", "?isDeleted=False&environments=kPhysicalFiles,kSQL")
sourceId = self.GetProtectionSourceId(hostname)
for job in jobs:
for source in job['sourceIds']:
if source == sourceId:
jobName = job['name']
jobFound = True
break
# Job was found break out of the jobs loop
if jobFound:
break
if jobFound:
return(job)
else:
return(-1)
################### Registration Methods ##################
def RemoveProtectionSource(self, sourceId):
apiCall = "/public/protectionSources/" + str(sourceId)
response = requests.request("DELETE", self.base_url + apiCall, headers=self.headers)
return(response)
def RegisterPhysical(self, hostName):
apiCall = "/public/protectionSources/register"
payload = {
"environment": "kPhysical",
"physicalType": "kHost",
"forceRegister" : True,
# The API will error if hostType is not specified; however during the registration process Cohesity will register
# the correct type even if the incorrect type was given in the payload.
"hostType": "kWindows",
"endpoint": hostName
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return(self.FormatData(response))
def RegisterSQL(self, sourceID):
apiCall = "/public/protectionSources/applicationServers"
payload = {
"applications": [
"kSQL"
],
"hasPersistentAgent": bool("true"),
"protectionSourceId": sourceID
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (self.FormatData(response))
def RefreshSource(self, sourceID):
apiCall = "/public/protectionSources/refresh/" + str(sourceID)
payload = {}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers)
return
################ Protection Job Methods #######################
def PauseJob(self, jobId):
apiCall = "/public/protectionJobState/" + str(jobId)
payload = {
"pause": True,
"pauseReason": 0
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def ResumeJob(self, jobId):
apiCall = "/public/protectionJobState/" + str(jobId)
payload = {
"pause": False,
"pauseReason": 0
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def CancelJob(self, jobId, runId):
apiCall = "/public/protectionRuns/cancel/" + str(jobId)
payload = {
"jobRunId": runId
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def UpdateVMProtectionJob(self, job):
apiCall = "/public/protectionJobs/" + str(job['id'])
payload = job
encoded_payload = json.dumps(payload)
response = requests.request("PUT", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def UpdateProtectionJob(self, job):
apiCall = "/public/protectionJobs/" + str(job['id'])
payload = job
encoded_payload = json.dumps(payload)
response = requests.request("PUT", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def UpdateAZprotectionJob(self, job):
base_url = ('https://{}/v2').format(self.server)
apiCall = "/data-protect/protection-groups/7780085755317378:1694019083558:" + str(job['id'])
payload = job
encoded_payload = json.dumps(payload)
response = requests.request("PUT", base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def CreateVMProtectionJob(self, pgName, parentId, tagId, excludeTag, polId):
apiCall = "/public/protectionJobs/"
sTime=self.GetRandomStartTime()
payload = {
"name": pgName,
"description": "",
"environment": "kVMware",
# NDIT - Bronze Policy
"policyId": polId,
#"policyId": "4847477517838800:1610060943809:6402",
# Default Storage Domain
"viewBoxId": 198,
"parentSourceId": parentId,
"vmTagIds": [
[
tagId
]
],
"excludeVmTagIds": [
[
excludeTag
]
],
"startTime": {
"hour": sTime[0],
"minute": sTime[1]
},
"timezone": "America/Chicago",
"incrementalProtectionSlaTimeMins": 480,
"fullProtectionSlaTimeMins": 480,
"priority": "kLow",
"indexingPolicy":{
"disableIndexing": False,
"allowPrefixes": [
"/"
],
"denyPrefixes": [
"/$Recycle.Bin",
"/Windows",
"/ProgramData",
"/System Volume Information",
"/Users/*/AppData",
"/Recovery",
"/usr",
"/sys",
"/proc",
"/lib",
"/grub",
"/grub2",
"/opt/splunk",
"/splunk",
]
},
"abortInBlackoutPeriod": False,
"quiesce": False,
"qosType": "kBackupHDD",
"environmentParameters":{
"vmwareParameters":{
"fallbackToCrashConsistent": False,
"skipPhysicalRdmDisks": False
}
},
"cloudParameters": {
"failoverToCloud": False
},
"leverageStorageSnapshots": False,
"leverageStorageSnapshotsForHyperFlex": False,
"isPaused": False
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def CreateAZSQLProtectionJob(self, srcId, pgName):
apiCall = "/public/protectionJobs/"
payload = {
"name": pgName,
# NDIT - Bronze Policy
"policyId": "7780085755317378:1694019083558:289",
# Default Storage Domain
"viewBoxId": 36,
# SQL Server kRootContainer
"parentSourceId": 6,
"sourceIds": [
srcId
],
"startTime": {
"hour": 22,
"minute": 00
},
"timezone": "America/Chicago",
"fullProtectionSlaTimeMins": 480,
"incrementalProtectionSlaTimeMins": 480,
"priority": "kLow",
"LeverageSanTransport": False,
# Default Indexing Policy
"indexingPolicy":{
"disableIndexing": False,
"allowPrefixes": [
"/"
],
"denyPrefixes": [
"/$Recycle.Bin",
"/Windows",
"/ProgramData",
"/System Volume Information",
"/Users/*/AppData",
"/Recovery",
"/usr",
"/sys",
"/proc",
"/lib",
"/grub",
"/grub2",
"/opt/splunk",
"/splunk",
]
},
"environment": "kSQL",
"environmentParameters": {
"sqlParameters": {
"userDatabasePreference": "kBackupAllDatabases",
"backupSystemDatabases": bool('true'),
"aagPreferenceFromSqlServer": bool('true'),
"backupType": "kSqlVSSVolume",
"backupVolumesOnly": bool('true')
}
},
"abortInBlackoutPeriod": False,
"qosType": "kBackupHDD",
"description": "",
"isPaused": False
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def CreateAZProtectionJob(self, pgName, parentId, tagId):
base_url = ('https://{}/v2').format(self.server)
apiCall = "/data-protect/protection-groups"
payload = {
"name": pgName,
"description": "",
# NDIT - Bronze Policy
#"policyId": "5033496487614715:1619543500905:271778",
"policyId": "7780085755317378:1694019083558:2291",
# Default Storage Domain
"storageDomainId": 36,
"startTime": {
"hour": 19,
"minute": 00,
"timezone": "America/Chicago"
},
"priority": "kLow",
"sla": [
{
"backupRunType": "kIncremental",
"slaMinutes": 480
},
{
"backupRunType": "kFull",
"slaMinutes": 480
}
],
"isPaused": True,
"environment": "kAzure",
"azureParams": {
"protectionType": "kNative",
"nativeProtectionTypeParams":{
"objects": [],
"vmTagIds": [
[
tagId
]
],
"indexingPolicy":{
"enableIndexing": True,
"includePaths": [
"/"
],
"excludePaths": [
"/$Recycle.Bin",
"/Windows",
"/ProgramData",
"/System Volume Information",
"/Users/*/AppData",
"/Recovery",
"/usr",
"/sys",
"/proc",
"/lib",
"/grub",
"/grub2",
"/splunk",
]
},
"sourceId": parentId
},
},
"abortInBlackoutPeriod": False
}
encoded_payload = json.dumps(payload)
print(base_url + apiCall)
response = requests.request("POST", base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def CreateSQLProtectionJob(self, srcId, pgName):
apiCall = "/public/protectionJobs/"
payload = {
"name": pgName,
# NDIT - Bronze Policy
"policyId": "4847477517838800:1610060943809:6402",
# Default Storage Domain
"viewBoxId": 198,
# SQL Server kRootContainer
"parentSourceId": 8702,
"sourceIds": [
srcId
],
"startTime": {
"hour": 22,
"minute": 00
},
"timezone": "America/Chicago",
"fullProtectionSlaTimeMins": 480,
"incrementalProtectionSlaTimeMins": 480,
"priority": "kLow",
"LeverageSanTransport": False,
# Default Indexing Policy
"indexingPolicy":{
"disableIndexing": False,
"allowPrefixes": [
"/"
],
"denyPrefixes": [
"/$Recycle.Bin",
"/Windows",
"/ProgramData",
"/System Volume Information",
"/Users/*/AppData",
"/Recovery",
"/usr",
"/sys",
"/proc",
"/lib",
"/grub",
"/grub2",
"/opt/splunk",
"/splunk",
]
},
"environment": "kSQL",
"environmentParameters": {
"sqlParameters": {
"userDatabasePreference": "kBackupAllDatabases",
"backupSystemDatabases": bool('true'),
"aagPreferenceFromSqlServer": bool('true'),
"backupType": "kSqlVSSVolume",
"backupVolumesOnly": bool('true')
}
},
"abortInBlackoutPeriod": False,
"qosType": "kBackupHDD",
"description": "",
"isPaused": False
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def CreatePhysicalProtectionJob(self, srcId, pgName):
apiCall = "/public/protectionJobs/"
payload = {
"name": pgName,
"environment": "kPhysical",
# NDIT - Bronze Policy
"policyId": "4847477517838800:1610060943809:6402",
# Default Storage Domain
"viewBoxId": 198,
# Physical Servers
"parentSourceId": 3149,
"sourceIds": [
srcId
],
"startTime": {
"hour": 18,
"minute": 00
},
"timezone": "America/Chicago",
"fullProtectionSlaTimeMins": 480,
"incrementalProtectionSlaTimeMins": 480,
"priority": "kLow",
"LeverageSanTransport": False,
# Default Indexing Policy
"indexingPolicy":{
"disableIndexing": False,
"allowPrefixes": [
"/"
],
"denyPrefixes": [
"/$Recycle.Bin",
"/Windows",
"/ProgramData",
"/System Volume Information",
"/Users/*/AppData",
"/Recovery",
"/usr",
"/sys",
"/proc",
"/lib",
"/grub",
"/grub2",
"/opt/splunk",
"/splunk",
]
},
"abortInBlackoutPeriod": False,
"performSourceSideDedup": True,
"qosType": "kBackupHDD",
"description": "",
"isPaused": False
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
################ Security Methods #######################
def UpdatePermissions(self, sourceIds, cohesitySid):
apiCall = "/public/principals/protectionSources"
payload = {
"sourcesForPrincipals": [
{
"protectionSourceIds": sourceIds,
"sid": cohesitySid
}
]
}
encoded_payload = json.dumps(payload)
response = requests.request("PUT", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)