import json import random import requests import urllib3 import certifi import os import pytz from datetime import datetime, timedelta, time class API(object): def __init__(self, server): if not ((os.environ.get('COHESITY_USER')) and (os.environ.get('COHESITY_PASS'))): print("\n*** Environment Variables are not set for authentication! ***") exit() # Get the authorization bearer token self.user = os.environ['COHESITY_USER'] self.pwd = os.environ['COHESITY_PASS'] if self.user == "svcitdchstyauto": self.domain = "local" else: self.domain = "nd.gov" self.server = server self.base_url = ('https://{}/irisservices/api/v1').format(self.server) self.headers = { 'Content-Type': 'application/json', 'Accept': 'application/json' } def GetClusterName(self): return self.server ###################################################### Depricated Methods ################################################# def GetAuthToken(self): apiCall = "/public/accessTokens" payload = { "username": self.user, "password": self.pwd, "domain": self.domain } encoded_payload = json.dumps(payload) token = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload, verify=True) if 'locked' in token.text: print("Account {} is locked".format(self.user)) exit(1) if 'expired' in token.text: print("Password for local account {} has expired".format(self.user)) exit(1) return(self.FormatData(token)) ########################################################################################################################### # Methods under maintenance def FormatData(self, response): jsonData = json.loads(response.text) return (jsonData) def GetClusterName(self): return(self.server) def Authenticate(self): apiCall = "/public/accessTokens" payload = { "username": self.user, "password": self.pwd, "domain": self.domain } encoded_payload = json.dumps(payload) authResponse = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload, verify=True) token = self.FormatData(authResponse) if (token['accessToken']): self.headers.update({'Authorization': 'Bearer ' + token['accessToken']}) return() else: print("ERROR: Unable to authenticate to " + self.server) def UpdateHeaders(self, token): self.headers.update({'Authorization': 'Bearer ' + token}) def GetRelativeTimestamp(self, d, h, m, s): # Timezone tz = pytz.timezone("America/Chicago") # Current date date = datetime.now(tz).date() # Convert to yesterday start_date = date + timedelta(days=d) # Set time to 1700 start_time = datetime( year = start_date.year, month = start_date.month, day = start_date.day, hour = h, minute = m, second = s ) # Add timezone info to the start time start_time = tz.localize(start_time) # Convert to UTC start_utc_time = start_time.astimezone(pytz.utc) # Convert to unix epoch microseconds start_epoch_ms = int(start_utc_time.timestamp() * 1000000) return(start_epoch_ms) def GetRandomStartTime(self): hours = [17, 18, 19, 20, 21, 22, 23, 00, 1, 2, 3, 4, 5, 6] min = [0, 15, 30, 45] rHour = random.choice(hours) rMin = random.choice(min) sTime=[rHour,rMin] return sTime #################### DELETE Methods #################### def DeleteJob(self, url): apiCall = url payload = { "deleteSnapshots": True } encoded_payload = json.dumps(payload) data = requests.delete(self.base_url + apiCall, headers=self.headers, data=encoded_payload) return(data) #################### SET Methods ####################### def ResolveAlert(self, alert_id, snow_id): apiCall = "/public/alertResolutions" payload = { "alertIdList": [ alert_id ], "resolutionDetails": { "resolutionDetails":"See ServiceNow", "resolutionSummary": snow_id } } encoded_payload = json.dumps(payload) response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload) return(self.GetRequest(apiCall)) #################### GET Methods ####################### def GetRequest(self, url): apiCall = url payload = {} encoded_payload = json.dumps(payload) data = requests.request("GET", self.base_url + apiCall, headers=self.headers, data=encoded_payload) return(self.FormatData(data)) def GetRequestV2(self, url): base_url = ('https://{}/v2').format(self.server) apiCall = url payload = {} encoded_payload = json.dumps(payload) data = requests.request("GET", base_url + apiCall, headers=self.headers, data=encoded_payload) return(self.FormatData(data)) def GetFilteredRequest(self, url, filters): apiCall = url + filters payload = {} encoded_payload = json.dumps(payload) data = requests.request("GET", self.base_url + apiCall, headers=self.headers, data=encoded_payload) return(self.FormatData(data)) def GetPhysicalsources(self): apiCall = "/public/protectionSources/registrationInfo?isDeleted=False&environments=kPhysical" return(self.GetRequest(apiCall)) def GetSQLsources(self): apiCall = "/public/protectionSources/registrationInfo?isDeleted=False&environments=kSQL" return(self.GetRequest(apiCall)) def GetSQLjobs(self): apiCall = "/public/protectionJobs/?isDeleted=False&environments=kSQL" return(self.FormatData(self.GetRequest(apiCall))) def GetVMsources(self): apiCall = "/public/protectionSources?environments=kVMware" return(self.GetRequest(apiCall)) def GetVMjobs(self): apiCall = "/public/protectionJobs?environments=kVMware" return(self.GetRequest(apiCall)) def GetDiskAlerts(self, startTime): apiCall = "/public/alerts?alertStateList=kOpen&alertCategoryList=kDisk&startDateUsecs=" + str(startTime) return(self.GetRequest(apiCall)) def GetProtectedVMobjects(self, hypervisorID): apiCall = "/public/protectionSources/protectedObjects?environment=kVMware&id=" + str(hypervisorID) return(self.GetRequest(apiCall)) def GetProtectionPolicyByName(self, policyName): apiCall = "/public/protectionPolicies/?name=" + policyName return(self.GetRequest(apiCall)) def GetVMObjects(self, hypervisorID): apiCall = "/public/protectionSources/virtualMachines?vCenterId=" + str(hypervisorID) return(self.GetRequest(apiCall)) def GetAZsources(self): apiCall = "/public/protectionSources?environments=kAzure" return(self.GetRequest(apiCall)) def GetAZNativeJobs(self): apiCall = "/public/protectionJobs?environments=kAzureNative" return(self.GetRequest(apiCall)) def GetAZObjects(self, hypervisorID): apiCall = "/public/protectionSources?environments=kAzure&id=" + str(hypervisorID) data = self.GetRequest(apiCall) azureVMs = [] for node in data[0]['nodes']: if 'nodes' in node: for subNode in node['nodes']: if 'protectionSource' in subNode: if 'azureProtectionSource' in subNode['protectionSource']: if "type" in subNode['protectionSource']['azureProtectionSource']: if subNode['protectionSource']['azureProtectionSource']['type'] == 'kVirtualMachine': azureVMs.append(subNode['protectionSource']) return(azureVMs) def GetProtectedAZobjects(self, hypervisorID): apiCall = "/public/protectionSources/protectedObjects?environment=kAzure&id=" + str(hypervisorID) return(self.GetRequest(apiCall)) def GetPhysicalJobs(self): apiCalll = "/public/protectionJobs?isDeleted=False&environments=kPhysicalFiles" return(self.FormatData(self.GetRequest(apiCall))) def GetProtectionSourceId(self, hostname): sourceRegistered = False hostname = hostname.lower() sources = self.GetFilteredRequest("/public/protectionSources/registrationInfo", "?environments=kPhysical") for source in sources['rootNodes']: if (source['rootNode']['name']).lower() == hostname: sourceId = source['rootNode']['id'] sourceRegistered = True break if sourceRegistered: return(sourceId) else: sys.exit("Source not registered") def GetVcenterTagId(self, vCenterId, tagName): tagFound=0 for record in vCenterId[0]['nodes'][0]['nodes']: if "protectionSource" in record: if "name" in record['protectionSource']: # Check if tagName matches AppName if record['protectionSource']['name'] == "AppName": for tagRecord in record['nodes']: if tagRecord['protectionSource']['name'] == tagName and tagRecord['protectionSource']['vmWareProtectionSource']['type'] == "kTag": tagId = tagRecord['protectionSource']['id'] tagFound = 1 break # Check if tagName matches DTAP (Development, Test, Acceptable, Production) if record['protectionSource']['name'] == "DTAP": for tagRecord in record['nodes']: if tagRecord['protectionSource']['name'] == tagName and tagRecord['protectionSource']['vmWareProtectionSource']['type'] == "kTag": tagId = tagRecord['protectionSource']['id'] tagFound = 1 break if tagFound == 1: break if tagFound == 1: return tagId else: return None def GetAzureTagId(self, azSubId, tagName): tagFound = 0 for record in azSubId[0]['nodes']: if "protectionSource" in record: if "name" in record['protectionSource']: if record['protectionSource']['name'] == "ApplicationName#~#" + tagName: if record['protectionSource']['azureProtectionSource']['type'] == "kTag": tagId = record['protectionSource']['id'] tagFound = 1 break if tagFound == 1: break if tagFound == 1: return tagId else: return None def GetProtectionJobByHost(self, hostname): jobFound = False jobs = self.GetFilteredRequest("/public/protectionJobs", "?isDeleted=False&environments=kPhysicalFiles,kSQL") sourceId = self.GetProtectionSourceId(hostname) for job in jobs: for source in job['sourceIds']: if source == sourceId: jobName = job['name'] jobFound = True break # Job was found break out of the jobs loop if jobFound: break if jobFound: return(job) else: return(-1) ################### Registration Methods ################## def RemoveProtectionSource(self, sourceId): apiCall = "/public/protectionSources/" + str(sourceId) response = requests.request("DELETE", self.base_url + apiCall, headers=self.headers) return(response) def RegisterPhysical(self, hostName): apiCall = "/public/protectionSources/register" payload = { "environment": "kPhysical", "physicalType": "kHost", "forceRegister" : True, # The API will error if hostType is not specified; however during the registration process Cohesity will register # the correct type even if the incorrect type was given in the payload. "hostType": "kWindows", "endpoint": hostName } encoded_payload = json.dumps(payload) response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload) return(self.FormatData(response)) def RegisterSQL(self, sourceID): apiCall = "/public/protectionSources/applicationServers" payload = { "applications": [ "kSQL" ], "hasPersistentAgent": bool("true"), "protectionSourceId": sourceID } encoded_payload = json.dumps(payload) response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload) return (self.FormatData(response)) def RefreshSource(self, sourceID): apiCall = "/public/protectionSources/refresh/" + str(sourceID) payload = {} encoded_payload = json.dumps(payload) response = requests.request("POST", self.base_url + apiCall, headers=self.headers) return ################ Protection Job Methods ####################### def PauseJob(self, jobId): apiCall = "/public/protectionJobState/" + str(jobId) payload = { "pause": True, "pauseReason": 0 } encoded_payload = json.dumps(payload) response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload) return (response) def ResumeJob(self, jobId): apiCall = "/public/protectionJobState/" + str(jobId) payload = { "pause": False, "pauseReason": 0 } encoded_payload = json.dumps(payload) response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload) return (response) def CancelJob(self, jobId, runId): apiCall = "/public/protectionRuns/cancel/" + str(jobId) payload = { "jobRunId": runId } encoded_payload = json.dumps(payload) response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload) return (response) def UpdateVMProtectionJob(self, job): apiCall = "/public/protectionJobs/" + str(job['id']) payload = job encoded_payload = json.dumps(payload) response = requests.request("PUT", self.base_url + apiCall, headers=self.headers, data=encoded_payload) return (response) def UpdateProtectionJob(self, job): apiCall = "/public/protectionJobs/" + str(job['id']) payload = job encoded_payload = json.dumps(payload) response = requests.request("PUT", self.base_url + apiCall, headers=self.headers, data=encoded_payload) return (response) def UpdateAZprotectionJob(self, job): base_url = ('https://{}/v2').format(self.server) apiCall = "/data-protect/protection-groups/7780085755317378:1694019083558:" + str(job['id']) payload = job encoded_payload = json.dumps(payload) response = requests.request("PUT", base_url + apiCall, headers=self.headers, data=encoded_payload) return (response) def CreateVMProtectionJob(self, pgName, parentId, tagId, excludeTag, polId): apiCall = "/public/protectionJobs/" sTime=self.GetRandomStartTime() payload = { "name": pgName, "description": "", "environment": "kVMware", # NDIT - Bronze Policy "policyId": polId, #"policyId": "4847477517838800:1610060943809:6402", # Default Storage Domain "viewBoxId": 198, "parentSourceId": parentId, "vmTagIds": [ [ tagId ] ], "excludeVmTagIds": [ [ excludeTag ] ], "startTime": { "hour": sTime[0], "minute": sTime[1] }, "timezone": "America/Chicago", "incrementalProtectionSlaTimeMins": 480, "fullProtectionSlaTimeMins": 480, "priority": "kLow", "indexingPolicy":{ "disableIndexing": False, "allowPrefixes": [ "/" ], "denyPrefixes": [ "/$Recycle.Bin", "/Windows", "/ProgramData", "/System Volume Information", "/Users/*/AppData", "/Recovery", "/usr", "/sys", "/proc", "/lib", "/grub", "/grub2", "/opt/splunk", "/splunk", ] }, "abortInBlackoutPeriod": False, "quiesce": False, "qosType": "kBackupHDD", "environmentParameters":{ "vmwareParameters":{ "fallbackToCrashConsistent": False, "skipPhysicalRdmDisks": False } }, "cloudParameters": { "failoverToCloud": False }, "leverageStorageSnapshots": False, "leverageStorageSnapshotsForHyperFlex": False, "isPaused": False } encoded_payload = json.dumps(payload) response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload) return (response) def CreateAZSQLProtectionJob(self, srcId, pgName): apiCall = "/public/protectionJobs/" payload = { "name": pgName, # NDIT - Bronze Policy "policyId": "7780085755317378:1694019083558:289", # Default Storage Domain "viewBoxId": 36, # SQL Server kRootContainer "parentSourceId": 6, "sourceIds": [ srcId ], "startTime": { "hour": 22, "minute": 00 }, "timezone": "America/Chicago", "fullProtectionSlaTimeMins": 480, "incrementalProtectionSlaTimeMins": 480, "priority": "kLow", "LeverageSanTransport": False, # Default Indexing Policy "indexingPolicy":{ "disableIndexing": False, "allowPrefixes": [ "/" ], "denyPrefixes": [ "/$Recycle.Bin", "/Windows", "/ProgramData", "/System Volume Information", "/Users/*/AppData", "/Recovery", "/usr", "/sys", "/proc", "/lib", "/grub", "/grub2", "/opt/splunk", "/splunk", ] }, "environment": "kSQL", "environmentParameters": { "sqlParameters": { "userDatabasePreference": "kBackupAllDatabases", "backupSystemDatabases": bool('true'), "aagPreferenceFromSqlServer": bool('true'), "backupType": "kSqlVSSVolume", "backupVolumesOnly": bool('true') } }, "abortInBlackoutPeriod": False, "qosType": "kBackupHDD", "description": "", "isPaused": False } encoded_payload = json.dumps(payload) response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload) return (response) def CreateAZProtectionJob(self, pgName, parentId, tagId): base_url = ('https://{}/v2').format(self.server) apiCall = "/data-protect/protection-groups" payload = { "name": pgName, "description": "", # NDIT - Bronze Policy #"policyId": "5033496487614715:1619543500905:271778", "policyId": "7780085755317378:1694019083558:2291", # Default Storage Domain "storageDomainId": 36, "startTime": { "hour": 19, "minute": 00, "timezone": "America/Chicago" }, "priority": "kLow", "sla": [ { "backupRunType": "kIncremental", "slaMinutes": 480 }, { "backupRunType": "kFull", "slaMinutes": 480 } ], "isPaused": True, "environment": "kAzure", "azureParams": { "protectionType": "kNative", "nativeProtectionTypeParams":{ "objects": [], "vmTagIds": [ [ tagId ] ], "indexingPolicy":{ "enableIndexing": True, "includePaths": [ "/" ], "excludePaths": [ "/$Recycle.Bin", "/Windows", "/ProgramData", "/System Volume Information", "/Users/*/AppData", "/Recovery", "/usr", "/sys", "/proc", "/lib", "/grub", "/grub2", "/splunk", ] }, "sourceId": parentId }, }, "abortInBlackoutPeriod": False } encoded_payload = json.dumps(payload) print(base_url + apiCall) response = requests.request("POST", base_url + apiCall, headers=self.headers, data=encoded_payload) return (response) def CreateSQLProtectionJob(self, srcId, pgName): apiCall = "/public/protectionJobs/" payload = { "name": pgName, # NDIT - Bronze Policy "policyId": "4847477517838800:1610060943809:6402", # Default Storage Domain "viewBoxId": 198, # SQL Server kRootContainer "parentSourceId": 8702, "sourceIds": [ srcId ], "startTime": { "hour": 22, "minute": 00 }, "timezone": "America/Chicago", "fullProtectionSlaTimeMins": 480, "incrementalProtectionSlaTimeMins": 480, "priority": "kLow", "LeverageSanTransport": False, # Default Indexing Policy "indexingPolicy":{ "disableIndexing": False, "allowPrefixes": [ "/" ], "denyPrefixes": [ "/$Recycle.Bin", "/Windows", "/ProgramData", "/System Volume Information", "/Users/*/AppData", "/Recovery", "/usr", "/sys", "/proc", "/lib", "/grub", "/grub2", "/opt/splunk", "/splunk", ] }, "environment": "kSQL", "environmentParameters": { "sqlParameters": { "userDatabasePreference": "kBackupAllDatabases", "backupSystemDatabases": bool('true'), "aagPreferenceFromSqlServer": bool('true'), "backupType": "kSqlVSSVolume", "backupVolumesOnly": bool('true') } }, "abortInBlackoutPeriod": False, "qosType": "kBackupHDD", "description": "", "isPaused": False } encoded_payload = json.dumps(payload) response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload) return (response) def CreatePhysicalProtectionJob(self, srcId, pgName): apiCall = "/public/protectionJobs/" payload = { "name": pgName, "environment": "kPhysical", # NDIT - Bronze Policy "policyId": "4847477517838800:1610060943809:6402", # Default Storage Domain "viewBoxId": 198, # Physical Servers "parentSourceId": 3149, "sourceIds": [ srcId ], "startTime": { "hour": 18, "minute": 00 }, "timezone": "America/Chicago", "fullProtectionSlaTimeMins": 480, "incrementalProtectionSlaTimeMins": 480, "priority": "kLow", "LeverageSanTransport": False, # Default Indexing Policy "indexingPolicy":{ "disableIndexing": False, "allowPrefixes": [ "/" ], "denyPrefixes": [ "/$Recycle.Bin", "/Windows", "/ProgramData", "/System Volume Information", "/Users/*/AppData", "/Recovery", "/usr", "/sys", "/proc", "/lib", "/grub", "/grub2", "/opt/splunk", "/splunk", ] }, "abortInBlackoutPeriod": False, "performSourceSideDedup": True, "qosType": "kBackupHDD", "description": "", "isPaused": False } encoded_payload = json.dumps(payload) response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload) return (response) ################ Security Methods ####################### def UpdatePermissions(self, sourceIds, cohesitySid): apiCall = "/public/principals/protectionSources" payload = { "sourcesForPrincipals": [ { "protectionSourceIds": sourceIds, "sid": cohesitySid } ] } encoded_payload = json.dumps(payload) response = requests.request("PUT", self.base_url + apiCall, headers=self.headers, data=encoded_payload) return (response)