This commit is contained in:
Zack Meier
2026-04-15 15:45:50 -05:00
commit 1d304511b8
613 changed files with 140998 additions and 0 deletions
@@ -0,0 +1,7 @@
import requests
def send_automation(_data: dict):
_url = 'http://itdnettools.nd.gov/services/automation-tracking.py'
_r = requests.post(_url, json=_data)
return _r.text
@@ -0,0 +1,792 @@
import json
import random
import requests
import urllib3
import certifi
import os
import pytz
from datetime import datetime, timedelta, time
class API(object):
def __init__(self, server):
if not ((os.environ.get('COHESITY_USER')) and (os.environ.get('COHESITY_PASS'))):
print("\n*** Environment Variables are not set for authentication! ***")
exit()
# Get the authorization bearer token
self.user = os.environ['COHESITY_USER']
self.pwd = os.environ['COHESITY_PASS']
if self.user == "svcitdchstyauto":
self.domain = "local"
else:
self.domain = "nd.gov"
self.server = server
self.base_url = ('https://{}/irisservices/api/v1').format(self.server)
self.headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
def GetClusterName(self):
return self.server
###################################################### Depricated Methods #################################################
def GetAuthToken(self):
apiCall = "/public/accessTokens"
payload = {
"username": self.user,
"password": self.pwd,
"domain": self.domain
}
encoded_payload = json.dumps(payload)
token = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload, verify=True)
if 'locked' in token.text:
print("Account {} is locked".format(self.user))
exit(1)
if 'expired' in token.text:
print("Password for local account {} has expired".format(self.user))
exit(1)
return(self.FormatData(token))
###########################################################################################################################
# Methods under maintenance
def FormatData(self, response):
jsonData = json.loads(response.text)
return (jsonData)
def GetClusterName(self):
return(self.server)
def Authenticate(self):
apiCall = "/public/accessTokens"
payload = {
"username": self.user,
"password": self.pwd,
"domain": self.domain
}
encoded_payload = json.dumps(payload)
authResponse = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload, verify=True)
token = self.FormatData(authResponse)
if (token['accessToken']):
self.headers.update({'Authorization': 'Bearer ' + token['accessToken']})
return()
else:
print("ERROR: Unable to authenticate to " + self.server)
def UpdateHeaders(self, token):
self.headers.update({'Authorization': 'Bearer ' + token})
def GetRelativeTimestamp(self, d, h, m, s):
# Timezone
tz = pytz.timezone("America/Chicago")
# Current date
date = datetime.now(tz).date()
# Convert to yesterday
start_date = date + timedelta(days=d)
# Set time to 1700
start_time = datetime(
year = start_date.year,
month = start_date.month,
day = start_date.day,
hour = h,
minute = m,
second = s
)
# Add timezone info to the start time
start_time = tz.localize(start_time)
# Convert to UTC
start_utc_time = start_time.astimezone(pytz.utc)
# Convert to unix epoch microseconds
start_epoch_ms = int(start_utc_time.timestamp() * 1000000)
return(start_epoch_ms)
def GetRandomStartTime(self):
hours = [17, 18, 19, 20, 21, 22, 23, 00, 1, 2, 3, 4, 5, 6]
min = [0, 15, 30, 45]
rHour = random.choice(hours)
rMin = random.choice(min)
sTime=[rHour,rMin]
return sTime
#################### DELETE Methods ####################
def DeleteJob(self, url):
apiCall = url
payload = {
"deleteSnapshots": True
}
encoded_payload = json.dumps(payload)
data = requests.delete(self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return(data)
#################### SET Methods #######################
def ResolveAlert(self, alert_id, snow_id):
apiCall = "/public/alertResolutions"
payload = {
"alertIdList": [
alert_id
],
"resolutionDetails": {
"resolutionDetails":"See ServiceNow",
"resolutionSummary": snow_id
}
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return(self.GetRequest(apiCall))
#################### GET Methods #######################
def GetRequest(self, url):
apiCall = url
payload = {}
encoded_payload = json.dumps(payload)
data = requests.request("GET", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return(self.FormatData(data))
def GetRequestV2(self, url):
base_url = ('https://{}/v2').format(self.server)
apiCall = url
payload = {}
encoded_payload = json.dumps(payload)
data = requests.request("GET", base_url + apiCall, headers=self.headers, data=encoded_payload)
return(self.FormatData(data))
def GetFilteredRequest(self, url, filters):
apiCall = url + filters
payload = {}
encoded_payload = json.dumps(payload)
data = requests.request("GET", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return(self.FormatData(data))
def GetPhysicalsources(self):
apiCall = "/public/protectionSources/registrationInfo?isDeleted=False&environments=kPhysical"
return(self.GetRequest(apiCall))
def GetSQLsources(self):
apiCall = "/public/protectionSources/registrationInfo?isDeleted=False&environments=kSQL"
return(self.GetRequest(apiCall))
def GetSQLjobs(self):
apiCall = "/public/protectionJobs/?isDeleted=False&environments=kSQL"
return(self.FormatData(self.GetRequest(apiCall)))
def GetVMsources(self):
apiCall = "/public/protectionSources?environments=kVMware"
return(self.GetRequest(apiCall))
def GetVMjobs(self):
apiCall = "/public/protectionJobs?environments=kVMware"
return(self.GetRequest(apiCall))
def GetDiskAlerts(self, startTime):
apiCall = "/public/alerts?alertStateList=kOpen&alertCategoryList=kDisk&startDateUsecs=" + str(startTime)
return(self.GetRequest(apiCall))
def GetProtectedVMobjects(self, hypervisorID):
apiCall = "/public/protectionSources/protectedObjects?environment=kVMware&id=" + str(hypervisorID)
return(self.GetRequest(apiCall))
def GetProtectionPolicyByName(self, policyName):
apiCall = "/public/protectionPolicies/?name=" + policyName
return(self.GetRequest(apiCall))
def GetVMObjects(self, hypervisorID):
apiCall = "/public/protectionSources/virtualMachines?vCenterId=" + str(hypervisorID)
return(self.GetRequest(apiCall))
def GetAZsources(self):
apiCall = "/public/protectionSources?environments=kAzure"
return(self.GetRequest(apiCall))
def GetAZNativeJobs(self):
apiCall = "/public/protectionJobs?environments=kAzureNative"
return(self.GetRequest(apiCall))
def GetAZObjects(self, hypervisorID):
apiCall = "/public/protectionSources?environments=kAzure&id=" + str(hypervisorID)
data = self.GetRequest(apiCall)
azureVMs = []
for node in data[0]['nodes']:
if 'nodes' in node:
for subNode in node['nodes']:
if 'protectionSource' in subNode:
if 'azureProtectionSource' in subNode['protectionSource']:
if "type" in subNode['protectionSource']['azureProtectionSource']:
if subNode['protectionSource']['azureProtectionSource']['type'] == 'kVirtualMachine':
azureVMs.append(subNode['protectionSource'])
return(azureVMs)
def GetProtectedAZobjects(self, hypervisorID):
apiCall = "/public/protectionSources/protectedObjects?environment=kAzure&id=" + str(hypervisorID)
return(self.GetRequest(apiCall))
def GetPhysicalJobs(self):
apiCalll = "/public/protectionJobs?isDeleted=False&environments=kPhysicalFiles"
return(self.FormatData(self.GetRequest(apiCall)))
def GetProtectionSourceId(self, hostname):
sourceRegistered = False
hostname = hostname.lower()
sources = self.GetFilteredRequest("/public/protectionSources/registrationInfo", "?environments=kPhysical")
for source in sources['rootNodes']:
if (source['rootNode']['name']).lower() == hostname:
sourceId = source['rootNode']['id']
sourceRegistered = True
break
if sourceRegistered:
return(sourceId)
else:
sys.exit("Source not registered")
def GetVcenterTagId(self, vCenterId, tagName):
tagFound=0
for record in vCenterId[0]['nodes'][0]['nodes']:
if "protectionSource" in record:
if "name" in record['protectionSource']:
# Check if tagName matches AppName
if record['protectionSource']['name'] == "AppName":
for tagRecord in record['nodes']:
if tagRecord['protectionSource']['name'] == tagName and tagRecord['protectionSource']['vmWareProtectionSource']['type'] == "kTag":
tagId = tagRecord['protectionSource']['id']
tagFound = 1
break
# Check if tagName matches DTAP (Development, Test, Acceptable, Production)
if record['protectionSource']['name'] == "DTAP":
for tagRecord in record['nodes']:
if tagRecord['protectionSource']['name'] == tagName and tagRecord['protectionSource']['vmWareProtectionSource']['type'] == "kTag":
tagId = tagRecord['protectionSource']['id']
tagFound = 1
break
if tagFound == 1:
break
if tagFound == 1:
return tagId
else:
return None
def GetAzureTagId(self, azSubId, tagName):
tagFound = 0
for record in azSubId[0]['nodes']:
if "protectionSource" in record:
if "name" in record['protectionSource']:
if record['protectionSource']['name'] == "ApplicationName#~#" + tagName:
if record['protectionSource']['azureProtectionSource']['type'] == "kTag":
tagId = record['protectionSource']['id']
tagFound = 1
break
if tagFound == 1:
break
if tagFound == 1:
return tagId
else:
return None
def GetProtectionJobByHost(self, hostname):
jobFound = False
jobs = self.GetFilteredRequest("/public/protectionJobs", "?isDeleted=False&environments=kPhysicalFiles,kSQL")
sourceId = self.GetProtectionSourceId(hostname)
for job in jobs:
for source in job['sourceIds']:
if source == sourceId:
jobName = job['name']
jobFound = True
break
# Job was found break out of the jobs loop
if jobFound:
break
if jobFound:
return(job)
else:
return(-1)
################### Registration Methods ##################
def RemoveProtectionSource(self, sourceId):
apiCall = "/public/protectionSources/" + str(sourceId)
response = requests.request("DELETE", self.base_url + apiCall, headers=self.headers)
return(response)
def RegisterPhysical(self, hostName):
apiCall = "/public/protectionSources/register"
payload = {
"environment": "kPhysical",
"physicalType": "kHost",
"forceRegister" : True,
# The API will error if hostType is not specified; however during the registration process Cohesity will register
# the correct type even if the incorrect type was given in the payload.
"hostType": "kWindows",
"endpoint": hostName
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return(self.FormatData(response))
def RegisterSQL(self, sourceID):
apiCall = "/public/protectionSources/applicationServers"
payload = {
"applications": [
"kSQL"
],
"hasPersistentAgent": bool("true"),
"protectionSourceId": sourceID
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (self.FormatData(response))
def RefreshSource(self, sourceID):
apiCall = "/public/protectionSources/refresh/" + str(sourceID)
payload = {}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers)
return
################ Protection Job Methods #######################
def PauseJob(self, jobId):
apiCall = "/public/protectionJobState/" + str(jobId)
payload = {
"pause": True,
"pauseReason": 0
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def ResumeJob(self, jobId):
apiCall = "/public/protectionJobState/" + str(jobId)
payload = {
"pause": False,
"pauseReason": 0
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def CancelJob(self, jobId, runId):
apiCall = "/public/protectionRuns/cancel/" + str(jobId)
payload = {
"jobRunId": runId
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def UpdateVMProtectionJob(self, job):
apiCall = "/public/protectionJobs/" + str(job['id'])
payload = job
encoded_payload = json.dumps(payload)
response = requests.request("PUT", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def UpdateProtectionJob(self, job):
apiCall = "/public/protectionJobs/" + str(job['id'])
payload = job
encoded_payload = json.dumps(payload)
response = requests.request("PUT", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def UpdateAZprotectionJob(self, job):
base_url = ('https://{}/v2').format(self.server)
apiCall = "/data-protect/protection-groups/7780085755317378:1694019083558:" + str(job['id'])
payload = job
encoded_payload = json.dumps(payload)
response = requests.request("PUT", base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def CreateVMProtectionJob(self, pgName, parentId, tagId, excludeTag, polId):
apiCall = "/public/protectionJobs/"
sTime=self.GetRandomStartTime()
payload = {
"name": pgName,
"description": "",
"environment": "kVMware",
# NDIT - Bronze Policy
"policyId": polId,
#"policyId": "4847477517838800:1610060943809:6402",
# Default Storage Domain
"viewBoxId": 198,
"parentSourceId": parentId,
"vmTagIds": [
[
tagId
]
],
"excludeVmTagIds": [
[
excludeTag
]
],
"startTime": {
"hour": sTime[0],
"minute": sTime[1]
},
"timezone": "America/Chicago",
"incrementalProtectionSlaTimeMins": 480,
"fullProtectionSlaTimeMins": 480,
"priority": "kLow",
"indexingPolicy":{
"disableIndexing": False,
"allowPrefixes": [
"/"
],
"denyPrefixes": [
"/$Recycle.Bin",
"/Windows",
"/ProgramData",
"/System Volume Information",
"/Users/*/AppData",
"/Recovery",
"/usr",
"/sys",
"/proc",
"/lib",
"/grub",
"/grub2",
"/opt/splunk",
"/splunk",
]
},
"abortInBlackoutPeriod": False,
"quiesce": False,
"qosType": "kBackupHDD",
"environmentParameters":{
"vmwareParameters":{
"fallbackToCrashConsistent": False,
"skipPhysicalRdmDisks": False
}
},
"cloudParameters": {
"failoverToCloud": False
},
"leverageStorageSnapshots": False,
"leverageStorageSnapshotsForHyperFlex": False,
"isPaused": False
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def CreateAZSQLProtectionJob(self, srcId, pgName):
apiCall = "/public/protectionJobs/"
payload = {
"name": pgName,
# NDIT - Bronze Policy
"policyId": "7780085755317378:1694019083558:289",
# Default Storage Domain
"viewBoxId": 36,
# SQL Server kRootContainer
"parentSourceId": 6,
"sourceIds": [
srcId
],
"startTime": {
"hour": 22,
"minute": 00
},
"timezone": "America/Chicago",
"fullProtectionSlaTimeMins": 480,
"incrementalProtectionSlaTimeMins": 480,
"priority": "kLow",
"LeverageSanTransport": False,
# Default Indexing Policy
"indexingPolicy":{
"disableIndexing": False,
"allowPrefixes": [
"/"
],
"denyPrefixes": [
"/$Recycle.Bin",
"/Windows",
"/ProgramData",
"/System Volume Information",
"/Users/*/AppData",
"/Recovery",
"/usr",
"/sys",
"/proc",
"/lib",
"/grub",
"/grub2",
"/opt/splunk",
"/splunk",
]
},
"environment": "kSQL",
"environmentParameters": {
"sqlParameters": {
"userDatabasePreference": "kBackupAllDatabases",
"backupSystemDatabases": bool('true'),
"aagPreferenceFromSqlServer": bool('true'),
"backupType": "kSqlVSSVolume",
"backupVolumesOnly": bool('true')
}
},
"abortInBlackoutPeriod": False,
"qosType": "kBackupHDD",
"description": "",
"isPaused": False
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def CreateAZProtectionJob(self, pgName, parentId, tagId):
base_url = ('https://{}/v2').format(self.server)
apiCall = "/data-protect/protection-groups"
payload = {
"name": pgName,
"description": "",
# NDIT - Bronze Policy
#"policyId": "5033496487614715:1619543500905:271778",
"policyId": "7780085755317378:1694019083558:2291",
# Default Storage Domain
"storageDomainId": 36,
"startTime": {
"hour": 19,
"minute": 00,
"timezone": "America/Chicago"
},
"priority": "kLow",
"sla": [
{
"backupRunType": "kIncremental",
"slaMinutes": 480
},
{
"backupRunType": "kFull",
"slaMinutes": 480
}
],
"isPaused": True,
"environment": "kAzure",
"azureParams": {
"protectionType": "kNative",
"nativeProtectionTypeParams":{
"objects": [],
"vmTagIds": [
[
tagId
]
],
"indexingPolicy":{
"enableIndexing": True,
"includePaths": [
"/"
],
"excludePaths": [
"/$Recycle.Bin",
"/Windows",
"/ProgramData",
"/System Volume Information",
"/Users/*/AppData",
"/Recovery",
"/usr",
"/sys",
"/proc",
"/lib",
"/grub",
"/grub2",
"/splunk",
]
},
"sourceId": parentId
},
},
"abortInBlackoutPeriod": False
}
encoded_payload = json.dumps(payload)
print(base_url + apiCall)
response = requests.request("POST", base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def CreateSQLProtectionJob(self, srcId, pgName):
apiCall = "/public/protectionJobs/"
payload = {
"name": pgName,
# NDIT - Bronze Policy
"policyId": "4847477517838800:1610060943809:6402",
# Default Storage Domain
"viewBoxId": 198,
# SQL Server kRootContainer
"parentSourceId": 8702,
"sourceIds": [
srcId
],
"startTime": {
"hour": 22,
"minute": 00
},
"timezone": "America/Chicago",
"fullProtectionSlaTimeMins": 480,
"incrementalProtectionSlaTimeMins": 480,
"priority": "kLow",
"LeverageSanTransport": False,
# Default Indexing Policy
"indexingPolicy":{
"disableIndexing": False,
"allowPrefixes": [
"/"
],
"denyPrefixes": [
"/$Recycle.Bin",
"/Windows",
"/ProgramData",
"/System Volume Information",
"/Users/*/AppData",
"/Recovery",
"/usr",
"/sys",
"/proc",
"/lib",
"/grub",
"/grub2",
"/opt/splunk",
"/splunk",
]
},
"environment": "kSQL",
"environmentParameters": {
"sqlParameters": {
"userDatabasePreference": "kBackupAllDatabases",
"backupSystemDatabases": bool('true'),
"aagPreferenceFromSqlServer": bool('true'),
"backupType": "kSqlVSSVolume",
"backupVolumesOnly": bool('true')
}
},
"abortInBlackoutPeriod": False,
"qosType": "kBackupHDD",
"description": "",
"isPaused": False
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
def CreatePhysicalProtectionJob(self, srcId, pgName):
apiCall = "/public/protectionJobs/"
payload = {
"name": pgName,
"environment": "kPhysical",
# NDIT - Bronze Policy
"policyId": "4847477517838800:1610060943809:6402",
# Default Storage Domain
"viewBoxId": 198,
# Physical Servers
"parentSourceId": 3149,
"sourceIds": [
srcId
],
"startTime": {
"hour": 18,
"minute": 00
},
"timezone": "America/Chicago",
"fullProtectionSlaTimeMins": 480,
"incrementalProtectionSlaTimeMins": 480,
"priority": "kLow",
"LeverageSanTransport": False,
# Default Indexing Policy
"indexingPolicy":{
"disableIndexing": False,
"allowPrefixes": [
"/"
],
"denyPrefixes": [
"/$Recycle.Bin",
"/Windows",
"/ProgramData",
"/System Volume Information",
"/Users/*/AppData",
"/Recovery",
"/usr",
"/sys",
"/proc",
"/lib",
"/grub",
"/grub2",
"/opt/splunk",
"/splunk",
]
},
"abortInBlackoutPeriod": False,
"performSourceSideDedup": True,
"qosType": "kBackupHDD",
"description": "",
"isPaused": False
}
encoded_payload = json.dumps(payload)
response = requests.request("POST", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
################ Security Methods #######################
def UpdatePermissions(self, sourceIds, cohesitySid):
apiCall = "/public/principals/protectionSources"
payload = {
"sourcesForPrincipals": [
{
"protectionSourceIds": sourceIds,
"sid": cohesitySid
}
]
}
encoded_payload = json.dumps(payload)
response = requests.request("PUT", self.base_url + apiCall, headers=self.headers, data=encoded_payload)
return (response)
+502
View File
@@ -0,0 +1,502 @@
import requests
import json
import os
import pytz
from datetime import datetime
from datetime import timedelta
from datetime import time
# API Documentation
# https://docs.servicenow.com/bundle/sandiego-application-development/page/build/applications/concept/api-rest.html
class SnowAPI:
def __init__(self, snInstance):
if not ((os.environ.get('SN_USER')) and (os.environ.get('SN_PASS'))):
print("\n*** Environment variables are not set for ServiceNow Authentication ***")
exit()
self.snInstance = snInstance
self.headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
self.user = os.environ['SN_USER']
self.pwd = os.environ['SN_PASS']
def getGroupID(self, grpName):
grpName = grpName.replace(" ","%20")
url = 'https://{}/api/now/table/sys_user_group?sysparm_query=name={}'.format(self.snInstance, grpName)
apiResponse = requests.get(url, auth=(self.user, self.pwd), headers=self.headers)
apiData = apiResponse.json()
return apiData['result'][0]['sys_id']
def getUserID(self, userName):
userName = userName.replace(" ","%20")
apiCall = 'https://{}/api/now/table/sys_user?sysparm_query=user_name={}'.format(self.snInstance, userName)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
apiData = apiResponse.json()
return apiData['result'][0]['sys_id']
def getCMDBItemByIP(self, pattern):
apiCall = "https://{0}/api/now/table/cmdb_ci_server?sysparm_query=ip_address={1}&sysparm_limit=10".format(self.snInstance, pattern)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
apiData = apiResponse.json()
try:
appNameRec = self.getCMDBAppById(apiData['result'][0]['u_nd_application_svc']['value'])
appName = appNameRec['name']
apiData['result'][0].update({
"u_nd_application_svc": {
"link": apiData['result'][0]['u_nd_application_svc']['link'],
"value": apiData['result'][0]['u_nd_application_svc']['value'],
"name": appName
}
})
except:
apiData['result'][0].update({
"u_nd_application_svc": {
"name": "UNDEFINED"
},
"environment": "UNDEFINED"
})
return apiData['result']
def getCMDBItemByHostName(self, pattern):
apiCall = "https://{0}/api/now/table/cmdb_ci_server?sysparm_query=host_name={1}&sysparm_limit=10".format(self.snInstance, pattern)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
apiData = apiResponse.json()
try:
appNameRec = self.getCMDBAppById(apiData['result'][0]['u_nd_application_svc']['value'])
appName = appNameRec['name']
apiData['result'][0].update({
"u_nd_application_svc": {
"link": apiData['result'][0]['u_nd_application_svc']['link'],
"value": apiData['result'][0]['u_nd_application_svc']['value'],
"name": appName
}
})
except:
apiData['result'][0].update({
"u_nd_application_svc": {
"name": "UNDEFINED"
},
"environment": "UNDEFINED"
})
return apiData['result']
def getCMDBItemByFQDN(self, pattern):
apiCall = "https://{0}/api/now/table/cmdb_ci_server?sysparm_query=fqdn={1}&sysparm_limit=10".format(self.snInstance, pattern)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
apiData = apiResponse.json()
if len(apiData['result']) > 0:
try:
appNameRec = self.getCMDBAppById(apiData['result'][0]['u_nd_application_svc']['value'])
appName = appNameRec['name']
apiData['result'][0].update({
"u_nd_application_svc": {
"link": apiData['result'][0]['u_nd_application_svc']['link'],
"value": apiData['result'][0]['u_nd_application_svc']['value'],
"name": appName
}
})
except:
print("Result 2: {0}".format(apiData))
apiData['result'][0].update({
"u_nd_application_svc": {
"name": "UNDEFINED"
},
"environment": "UNDEFINED"
})
return apiData['result']
def getCMDBAppById(self, sysId):
apiCall = "https://{0}/api/now/table/cmdb_ci_service_auto?sys_id={1}".format(self.snInstance, sysId)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
apiData = apiResponse.json()
return apiData['result'][0]
def getStandardChangeTemplateID(self, templateName):
# Could be improved, might throw unexpected results if the group isn't found
# Needs error handling
allData = []
apiCall = "https://{0}/api/sn_chg_rest/change/standard/template?sysparm_query=active=true".format(self.snInstance)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
data = apiResponse.json()
for i in data['result']:
allData.append(i)
for record in allData:
if 'sys_name' in record:
if record['sys_name']['display_value'] == templateName:
sysID = record['sys_id']['value']
break
return sysID
def getRequestItemFromReqNum(self, reqNum):
apiCall = "https://{0}/api/now/table/sc_req_item?sysparm_query=request.number={1}&sysparm_limit=1".format(self.snInstance, reqNum)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
data = apiResponse.json()
item = data['result'][0]['number']
return item
def getTaskNumFromReqNum(self, reqNum):
apiCall = "https://{0}/api/now/table/sc_task?sysparm_query=request.number={1}&sysparm_limit=1".format(self.snInstance, reqNum)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
data = apiResponse.json()
item = data['result'][0]['number']
return item
def getTaskSysIdFromReqNum(self, reqNum):
apiCall = "https://{0}/api/now/table/sc_task?sysparm_query=request.number={1}&sysparm_limit=1".format(self.snInstance, reqNum)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
data = apiResponse.json()
item = data['result'][0]['sys_id']
return item
def getServiceCatalogs(self):
apiCall = "https://{0}/api/sn_sc/servicecatalog/catalogs".format(self.snInstance)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
items = apiResponse.json()
return items
def getServiceCatalogCategories(self, sys_id):
apiCall = "https://{0}/api/sn_sc/servicecatalog/catalogs/{1}/categories".format(self.snInstance,sys_id)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
items = apiResponse.json()
return items
def getServiceCatalogItems(self):
apiCall = "https://{0}/api/sn_sc/servicecatalog/items?sysparm_limit=10000&sysparm_offset=0".format(self.snInstance)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
items = apiResponse.json()
return items
def getServiceCatalogItemByName(self, itemName):
apiCall = "https://{0}/api/sn_sc/servicecatalog/items?sysparm_limit=10000&sysparm_offset=0".format(self.snInstance)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
items = apiResponse.json()
for item in items['result']:
if item['name'] == itemName:
return item
def getSpecificCatalogItem(self, sysId):
apiCall = "https://{0}/api/sn_sc/servicecatalog/items/{1}".format(self.snInstance,sysId)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
items = apiResponse.json()
return items
def getDataFromTaskByReqNum(self, sctask, reqNum):
# This is only written to handle one task per REQ number!!!
apiCall = "https://{0}/api/now/table/sc_task?sysparm_query=request.number={1}&sysparm_fields=state%2Cnumber%2Csys_id%2Cdescription%2Cshort_description%2Cvariables.application_name%2Cvariables.additional_comments%2Cclose_notes%2Cclosed_by&sysparm_limit=1&sysparm_display_value=true".format(self.snInstance, reqNum)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
data = apiResponse.json()
if data['result'][0]['number'] == sctask:
items = data['result'][0]
else:
items = ""
return items
def updateTaskDescriptions(self, sysId, short_desc, desc):
apiCall = "https://{0}/api/now/table/sc_task/{1}".format(self.snInstance,sysId)
payload = {
'short_description': short_desc,
'description': desc
}
encoded_payload = json.dumps(payload)
apiResponse = requests.put(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
print('Failed to update task:')
print('\tStatus:', apiResponse.status_code)
print('\tError Response:',apiResponse.json())
exit(1)
else:
return apiResponse.json()
def assignTaskToUser(self, sysId, userName):
userID = self.getUserID(userName)
apiCall = "https://{0}/api/now/table/sc_task/{1}".format(self.snInstance,sysId)
payload = {
"assigned_to": userID
}
encoded_payload = json.dumps(payload)
apiResponse = requests.put(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
print('Failed to update task:')
print('\tStatus:', apiResponse.status_code)
print('\tError Response:',apiResponse.json())
exit(1)
else:
return apiResponse.json()
def postAppServerRequest(self,itemID,requestFor,requestBy,approver,requestType,comments,appName,env):
requestForID = self.getUserID(requestFor)
requestByID = self.getUserID(requestBy)
approvalID = self.getUserID(approver)
# Statically set these so we always force approvals to go to Computer Systems
departmentID = "f3c65cef1bfed050bba0113fad4bcb1d"
departmentCode = "112"
divisionID = "f40758231b321450bba0113fad4bcb2d"
divisionCode = "32"
payload = {
"get_portal_messages" : "true",
"sysparm_quantity" : "1",
"sysparm_no_validation" : "true",
"variables" : {
"v_approval_department" : departmentID,
"v_approval_department_code" : departmentCode,
"v_approval_division" : divisionID,
"v_approval_division_code" : divisionCode,
"v_manager" : approvalID,
"v_requested_by" : requestByID,
"v_requested_for" : requestForID,
"request_type" : requestType,
"additional_comments" : comments,
"application_name" : appName,
"environment" : env,
"require_hosting_quote" : "No",
"add_change_disaster_recovery" : "No"
}
}
apiCall = ('https://{0}/api/sn_sc/v1/servicecatalog/items/{1}/order_now').format(self.snInstance,itemID)
encoded_payload = json.dumps(payload)
apiResponse = requests.post(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
print('Failed to create ticket:')
print('\tStatus:', apiResponse.status_code)
print('\tError Response:',apiResponse.json())
exit(1)
else:
return apiResponse.json()
return apiResponse
def postGenericSerivceRequest(self,itemID,requestFor,requestBy,approver,requestType,comments):
requestForID = self.getUserID(requestFor)
requestByID = self.getUserID(requestBy)
approvalID = self.getUserID(approver)
departmentID = "f3c65cef1bfed050bba0113fad4bcb1d"
divisionID = "f40758231b321450bba0113fad4bcb2d"
payload = {
"get_portal_messages" : "true",
"sysparm_quantity" : "1",
"sysparm_no_validation" : "true",
"variables" : {
"v_approval_department" : departmentID,
"v_approval_division" : divisionID,
"v_manager" : approvalID,
"v_requested_by" : requestByID,
"v_requested_for" : requestForID,
"v_type" : requestType,
"additional_comments" : comments
}
}
apiCall = ('https://{0}/api/sn_sc/v1/servicecatalog/items/{1}/order_now').format(self.snInstance,itemID)
encoded_payload = json.dumps(payload)
apiResponse = requests.post(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
print('Failed to create ticket:')
print('\tStatus:', apiResponse.status_code)
print('\tError Response:',apiResponse.json())
exit(1)
else:
return apiResponse.json()
return apiResponse
def getGenericData(self, url):
apiCall = "https://{0}/".format(self.snInstance,url)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
items = apiResponse.json()
return items
def submitTicket(self, user, subject, content):
apiCall = ('https://{}/api/now/table/incident').format(self.snInstance)
userID=self.getUserID(user)
payload = {
'caller_id': userID,
'short_description': subject,
'description': content
}
encoded_payload = json.dumps(payload)
apiResponse = requests.post(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
print('Failed to create ticket:')
print('\tStatus:', apiResponse.status_code)
print('\tError Response:',apiResponse.json())
exit(1)
else:
return apiResponse.json()
def assignTicketToGroup(self, sysId, grpName):
grpId = self.getGroupID(grpName)
url = 'https://{}/api/now/table/incident/{}'.format(self.snInstance,sysId)
#Statically set the group to storage, this value was derived by decoding the web URL on storage tickets
payload = {
'assignment_group': grpId
}
encoded_payload = json.dumps(payload)
apiResponse = requests.put(url, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
print('Failed to create ticket:')
print('\tStatus:', apiResponse.status_code)
print('\tError Response:',apiResponse.json())
exit(1)
else:
return apiResponse.json()
def assessNormalChange(self, sysID):
apiCall = "https://{0}/api/sn_chg_rest/change/normal/{1}".format(self.snInstance,sysID)
payload = {
"state": "assess"
}
encoded_payload=json.dumps(payload)
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
return apiResponse.json()
def addStandardChangeNotes(self, sysID, txtWorkNotes):
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance,sysID)
payload = {
"work_notes": txtWorkNotes
}
encoded_payload=json.dumps(payload)
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
return apiResponse.json()
def scheduleStandardChange(self, sysID):
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance,sysID)
payload = {
"state": "Scheduled"
}
encoded_payload=json.dumps(payload)
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
return apiResponse.json()
def implementStandardChange(self, sysID):
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance, sysID)
payload = {
"state": "Implement"
}
encoded_payload=json.dumps(payload)
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
return apiResponse.json()
def reviewStandardChange(self, sysID):
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance, sysID)
payload = {
"state": "Review"
}
encoded_payload=json.dumps(payload)
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
return apiResponse.json()
def closeStandardChange(self, sysID, code, notes):
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance, sysID)
payload = {
"state": "Closed",
"close_code": code,
"close_notes": notes
}
encoded_payload=json.dumps(payload)
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
return apiResponse.json()
def CreateNormalChange(self, grpName, assignee, coordinator, approver, category, subCategory, shortDesc, desc, justDesc, implmntPlan, riskImpact, backoutPlan, testPlan, sTime, eTime):
# Creates a normal change request that will require approval.
grpID = self.getGroupID(grpName)
userID = self.getUserID(assignee)
chngCoordID = self.getUserID(coordinator)
chngMngrID = self.getUserID(approver)
apiCall = "https://{0}/api/sn_chg_rest/change/normal".format(self.snInstance)
payload = {
"category": category,
"u_subcategory": subCategory,
"u_change_manager": chngMngrID,
"assigned_to": userID,
"u_change_coordinator": chngCoordID,
"assignment_group": grpID,
"short_description": shortDesc,
"description": desc,
"justification": justDesc,
"implementation_plan": implmntPlan,
"risk_impact_analysis": riskImpact,
"backout_plan": backoutPlan,
"test_plan": testPlan,
"start_date": sTime,
"end_date": eTime,
"cab_required": False,
}
encoded_payload=json.dumps(payload)
changeResponse = requests.post(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
changeData = changeResponse.json()
return changeData
def CreateStandardChange(self, templateName, grpName, assignee, coordinator, approver, category, subcategory, txtShortDesc, txtJustification):
templateID = self.getStandardChangeTemplateID(templateName)
grpID = self.getGroupID(grpName)
userID = self.getUserID(assignee)
chngCoordID = self.getUserID(coordinator)
chngMngrID = self.getUserID(approver)
sTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
eTime = (datetime.now()+timedelta(minutes=+1)).strftime("%Y-%m-%d %H:%M:%S")
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance,templateID)
payload={
"category": category,
"u_subcategory": subcategory,
"u_change_manager": chngMngrID,
"assigned_to": userID,
"u_change_coordinator": chngCoordID,
"assignment_group": grpID,
"short_description": txtShortDesc,
"justification": txtJustification,
"start_date": sTime,
"end_date": eTime
}
encoded_payload=json.dumps(payload)
apiResponse = requests.post(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
return apiResponse.json()
######################################### END CLASS ################################
@@ -0,0 +1,502 @@
import requests
import json
import os
import pytz
from datetime import datetime
from datetime import timedelta
from datetime import time
# API Documentation
# https://docs.servicenow.com/bundle/sandiego-application-development/page/build/applications/concept/api-rest.html
class SnowAPI:
def __init__(self, snInstance):
if not ((os.environ.get('SN_USER')) and (os.environ.get('SN_PASS'))):
print("\n*** Environment variables are not set for ServiceNow Authentication ***")
exit()
self.snInstance = snInstance
self.headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
self.user = os.environ['SN_USER']
self.pwd = os.environ['SN_PASS']
def getGroupID(self, grpName):
grpName = grpName.replace(" ","%20")
url = 'https://{}/api/now/table/sys_user_group?sysparm_query=name={}'.format(self.snInstance, grpName)
apiResponse = requests.get(url, auth=(self.user, self.pwd), headers=self.headers)
apiData = apiResponse.json()
return apiData['result'][0]['sys_id']
def getUserID(self, userName):
userName = userName.replace(" ","%20")
apiCall = 'https://{}/api/now/table/sys_user?sysparm_query=user_name={}'.format(self.snInstance, userName)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
apiData = apiResponse.json()
return apiData['result'][0]['sys_id']
def getCMDBItemByIP(self, pattern):
apiCall = "https://{0}/api/now/table/cmdb_ci_server?sysparm_query=ip_address={1}&sysparm_limit=10".format(self.snInstance, pattern)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
apiData = apiResponse.json()
try:
appNameRec = self.getCMDBAppById(apiData['result'][0]['u_nd_application_svc']['value'])
appName = appNameRec['name']
apiData['result'][0].update({
"u_nd_application_svc": {
"link": apiData['result'][0]['u_nd_application_svc']['link'],
"value": apiData['result'][0]['u_nd_application_svc']['value'],
"name": appName
}
})
except:
apiData['result'][0].update({
"u_nd_application_svc": {
"name": "UNDEFINED"
},
"environment": "UNDEFINED"
})
return apiData['result']
def getCMDBItemByHostName(self, pattern):
apiCall = "https://{0}/api/now/table/cmdb_ci_server?sysparm_query=host_name={1}&sysparm_limit=10".format(self.snInstance, pattern)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
apiData = apiResponse.json()
try:
appNameRec = self.getCMDBAppById(apiData['result'][0]['u_nd_application_svc']['value'])
appName = appNameRec['name']
apiData['result'][0].update({
"u_nd_application_svc": {
"link": apiData['result'][0]['u_nd_application_svc']['link'],
"value": apiData['result'][0]['u_nd_application_svc']['value'],
"name": appName
}
})
except:
apiData['result'][0].update({
"u_nd_application_svc": {
"name": "UNDEFINED"
},
"environment": "UNDEFINED"
})
return apiData['result']
def getCMDBItemByFQDN(self, pattern):
apiCall = "https://{0}/api/now/table/cmdb_ci_server?sysparm_query=fqdn={1}&sysparm_limit=10".format(self.snInstance, pattern)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
apiData = apiResponse.json()
if len(apiData['result']) > 0:
try:
appNameRec = self.getCMDBAppById(apiData['result'][0]['u_nd_application_svc']['value'])
appName = appNameRec['name']
apiData['result'][0].update({
"u_nd_application_svc": {
"link": apiData['result'][0]['u_nd_application_svc']['link'],
"value": apiData['result'][0]['u_nd_application_svc']['value'],
"name": appName
}
})
except:
print("Result 2: {0}".format(apiData))
apiData['result'][0].update({
"u_nd_application_svc": {
"name": "UNDEFINED"
},
"environment": "UNDEFINED"
})
return apiData['result']
def getCMDBAppById(self, sysId):
apiCall = "https://{0}/api/now/table/cmdb_ci_service_auto?sys_id={1}".format(self.snInstance, sysId)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
apiData = apiResponse.json()
return apiData['result'][0]
def getStandardChangeTemplateID(self, templateName):
# Could be improved, might throw unexpected results if the group isn't found
# Needs error handling
allData = []
apiCall = "https://{0}/api/sn_chg_rest/change/standard/template?sysparm_query=active=true".format(self.snInstance)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
data = apiResponse.json()
for i in data['result']:
allData.append(i)
for record in allData:
if 'sys_name' in record:
if record['sys_name']['display_value'] == templateName:
sysID = record['sys_id']['value']
break
return sysID
def getRequestItemFromReqNum(self, reqNum):
apiCall = "https://{0}/api/now/table/sc_req_item?sysparm_query=request.number={1}&sysparm_limit=1".format(self.snInstance, reqNum)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
data = apiResponse.json()
item = data['result'][0]['number']
return item
def getTaskNumFromReqNum(self, reqNum):
apiCall = "https://{0}/api/now/table/sc_task?sysparm_query=request.number={1}&sysparm_limit=1".format(self.snInstance, reqNum)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
data = apiResponse.json()
item = data['result'][0]['number']
return item
def getTaskSysIdFromReqNum(self, reqNum):
apiCall = "https://{0}/api/now/table/sc_task?sysparm_query=request.number={1}&sysparm_limit=1".format(self.snInstance, reqNum)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
data = apiResponse.json()
item = data['result'][0]['sys_id']
return item
def getServiceCatalogs(self):
apiCall = "https://{0}/api/sn_sc/servicecatalog/catalogs".format(self.snInstance)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
items = apiResponse.json()
return items
def getServiceCatalogCategories(self, sys_id):
apiCall = "https://{0}/api/sn_sc/servicecatalog/catalogs/{1}/categories".format(self.snInstance,sys_id)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
items = apiResponse.json()
return items
def getServiceCatalogItems(self):
apiCall = "https://{0}/api/sn_sc/servicecatalog/items?sysparm_limit=10000&sysparm_offset=0".format(self.snInstance)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
items = apiResponse.json()
return items
def getServiceCatalogItemByName(self, itemName):
apiCall = "https://{0}/api/sn_sc/servicecatalog/items?sysparm_limit=10000&sysparm_offset=0".format(self.snInstance)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
items = apiResponse.json()
for item in items['result']:
if item['name'] == itemName:
return item
def getSpecificCatalogItem(self, sysId):
apiCall = "https://{0}/api/sn_sc/servicecatalog/items/{1}".format(self.snInstance,sysId)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
items = apiResponse.json()
return items
def getDataFromTaskByReqNum(self, sctask, reqNum):
# This is only written to handle one task per REQ number!!!
apiCall = "https://{0}/api/now/table/sc_task?sysparm_query=request.number={1}&sysparm_fields=state%2Cnumber%2Csys_id%2Cdescription%2Cshort_description%2Cvariables.application_name%2Cvariables.additional_comments%2Cclose_notes%2Cclosed_by&sysparm_limit=1&sysparm_display_value=true".format(self.snInstance, reqNum)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
data = apiResponse.json()
if data['result'][0]['number'] == sctask:
items = data['result'][0]
else:
items = ""
return items
def updateTaskDescriptions(self, sysId, short_desc, desc):
apiCall = "https://{0}/api/now/table/sc_task/{1}".format(self.snInstance,sysId)
payload = {
'short_description': short_desc,
'description': desc
}
encoded_payload = json.dumps(payload)
apiResponse = requests.put(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
print('Failed to update task:')
print('\tStatus:', apiResponse.status_code)
print('\tError Response:',apiResponse.json())
exit(1)
else:
return apiResponse.json()
def assignTaskToUser(self, sysId, userName):
userID = self.getUserID(userName)
apiCall = "https://{0}/api/now/table/sc_task/{1}".format(self.snInstance,sysId)
payload = {
"assigned_to": userID
}
encoded_payload = json.dumps(payload)
apiResponse = requests.put(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
print('Failed to update task:')
print('\tStatus:', apiResponse.status_code)
print('\tError Response:',apiResponse.json())
exit(1)
else:
return apiResponse.json()
def postAppServerRequest(self,itemID,requestFor,requestBy,approver,requestType,comments,appName,env):
requestForID = self.getUserID(requestFor)
requestByID = self.getUserID(requestBy)
approvalID = self.getUserID(approver)
# Statically set these so we always force approvals to go to Computer Systems
departmentID = "f3c65cef1bfed050bba0113fad4bcb1d"
departmentCode = "112"
divisionID = "f40758231b321450bba0113fad4bcb2d"
divisionCode = "32"
payload = {
"get_portal_messages" : "true",
"sysparm_quantity" : "1",
"sysparm_no_validation" : "true",
"variables" : {
"v_approval_department" : departmentID,
"v_approval_department_code" : departmentCode,
"v_approval_division" : divisionID,
"v_approval_division_code" : divisionCode,
"v_manager" : approvalID,
"v_requested_by" : requestByID,
"v_requested_for" : requestForID,
"request_type" : requestType,
"additional_comments" : comments,
"application_name" : appName,
"environment" : env,
"require_hosting_quote" : "No",
"add_change_disaster_recovery" : "No"
}
}
apiCall = ('https://{0}/api/sn_sc/v1/servicecatalog/items/{1}/order_now').format(self.snInstance,itemID)
encoded_payload = json.dumps(payload)
apiResponse = requests.post(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
print('Failed to create ticket:')
print('\tStatus:', apiResponse.status_code)
print('\tError Response:',apiResponse.json())
exit(1)
else:
return apiResponse.json()
return apiResponse
def postGenericSerivceRequest(self,itemID,requestFor,requestBy,approver,requestType,comments):
requestForID = self.getUserID(requestFor)
requestByID = self.getUserID(requestBy)
approvalID = self.getUserID(approver)
departmentID = "f3c65cef1bfed050bba0113fad4bcb1d"
divisionID = "f40758231b321450bba0113fad4bcb2d"
payload = {
"get_portal_messages" : "true",
"sysparm_quantity" : "1",
"sysparm_no_validation" : "true",
"variables" : {
"v_approval_department" : departmentID,
"v_approval_division" : divisionID,
"v_manager" : approvalID,
"v_requested_by" : requestByID,
"v_requested_for" : requestForID,
"v_type" : requestType,
"additional_comments" : comments
}
}
apiCall = ('https://{0}/api/sn_sc/v1/servicecatalog/items/{1}/order_now').format(self.snInstance,itemID)
encoded_payload = json.dumps(payload)
apiResponse = requests.post(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
print('Failed to create ticket:')
print('\tStatus:', apiResponse.status_code)
print('\tError Response:',apiResponse.json())
exit(1)
else:
return apiResponse.json()
return apiResponse
def getGenericData(self, url):
apiCall = "https://{0}/".format(self.snInstance,url)
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
items = apiResponse.json()
return items
def submitTicket(self, user, subject, content):
apiCall = ('https://{}/api/now/table/incident').format(self.snInstance)
userID=self.getUserID(user)
payload = {
'caller_id': userID,
'short_description': subject,
'description': content
}
encoded_payload = json.dumps(payload)
apiResponse = requests.post(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
print('Failed to create ticket:')
print('\tStatus:', apiResponse.status_code)
print('\tError Response:',apiResponse.json())
exit(1)
else:
return apiResponse.json()
def assignTicketToGroup(self, sysId, grpName):
grpId = self.getGroupID(grpName)
url = 'https://{}/api/now/table/incident/{}'.format(self.snInstance,sysId)
#Statically set the group to storage, this value was derived by decoding the web URL on storage tickets
payload = {
'assignment_group': grpId
}
encoded_payload = json.dumps(payload)
apiResponse = requests.put(url, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
print('Failed to create ticket:')
print('\tStatus:', apiResponse.status_code)
print('\tError Response:',apiResponse.json())
exit(1)
else:
return apiResponse.json()
def assessNormalChange(self, sysID):
apiCall = "https://{0}/api/sn_chg_rest/change/normal/{1}".format(self.snInstance,sysID)
payload = {
"state": "assess"
}
encoded_payload=json.dumps(payload)
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
return apiResponse.json()
def addStandardChangeNotes(self, sysID, txtWorkNotes):
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance,sysID)
payload = {
"work_notes": txtWorkNotes
}
encoded_payload=json.dumps(payload)
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
return apiResponse.json()
def scheduleStandardChange(self, sysID):
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance,sysID)
payload = {
"state": "Scheduled"
}
encoded_payload=json.dumps(payload)
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
return apiResponse.json()
def implementStandardChange(self, sysID):
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance, sysID)
payload = {
"state": "Implement"
}
encoded_payload=json.dumps(payload)
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
return apiResponse.json()
def reviewStandardChange(self, sysID):
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance, sysID)
payload = {
"state": "Review"
}
encoded_payload=json.dumps(payload)
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
return apiResponse.json()
def closeStandardChange(self, sysID, code, notes):
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance, sysID)
payload = {
"state": "Closed",
"close_code": code,
"close_notes": notes
}
encoded_payload=json.dumps(payload)
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
return apiResponse.json()
def CreateNormalChange(self, grpName, assignee, coordinator, approver, category, subCategory, shortDesc, desc, justDesc, implmntPlan, riskImpact, backoutPlan, testPlan, sTime, eTime):
# Creates a normal change request that will require approval.
grpID = self.getGroupID(grpName)
userID = self.getUserID(assignee)
chngCoordID = self.getUserID(coordinator)
chngMngrID = self.getUserID(approver)
apiCall = "https://{0}/api/sn_chg_rest/change/normal".format(self.snInstance)
payload = {
"category": category,
"u_subcategory": subCategory,
"u_change_manager": chngMngrID,
"assigned_to": userID,
"u_change_coordinator": chngCoordID,
"assignment_group": grpID,
"short_description": shortDesc,
"description": desc,
"justification": justDesc,
"implementation_plan": implmntPlan,
"risk_impact_analysis": riskImpact,
"backout_plan": backoutPlan,
"test_plan": testPlan,
"start_date": sTime,
"end_date": eTime,
"cab_required": False,
}
encoded_payload=json.dumps(payload)
changeResponse = requests.post(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
changeData = changeResponse.json()
return changeData
def CreateStandardChange(self, templateName, grpName, assignee, coordinator, approver, category, subcategory, txtShortDesc, txtJustification):
templateID = self.getStandardChangeTemplateID(templateName)
grpID = self.getGroupID(grpName)
userID = self.getUserID(assignee)
chngCoordID = self.getUserID(coordinator)
chngMngrID = self.getUserID(approver)
sTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
eTime = (datetime.now()+timedelta(minutes=+1)).strftime("%Y-%m-%d %H:%M:%S")
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance,templateID)
payload={
"category": category,
"u_subcategory": subcategory,
"u_change_manager": chngMngrID,
"assigned_to": userID,
"u_change_coordinator": chngCoordID,
"assignment_group": grpID,
"short_description": txtShortDesc,
"justification": txtJustification,
"start_date": sTime,
"end_date": eTime
}
encoded_payload=json.dumps(payload)
apiResponse = requests.post(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
return apiResponse.json()
######################################### END CLASS ################################
@@ -0,0 +1,175 @@
import os
import argparse
import json
import urllib3
import requests
from requests_ntlm import HttpNtlmAuth
# Use pip install requests_ntlm to install the necessary modules
class API(object):
def __init__(self):
if not ((os.environ.get('ITD_SHAREPOINT_PASS')) and (os.environ.get('ITD_SHAREPOINT_USER'))):
print("\n*** ERROR: Environment Variables are not set for SharePoint ***")
print("Set OS environment variables ITD_SHAREPOINT_PASS and ITD_SHAREPOINT_USER")
exit(1)
else:
# Username is in the format of 'NDGOV\\<userName>'
sp_user = os.environ['ITD_SHAREPOINT_USER']
sp_pass = os.environ['ITD_SHAREPOINT_PASS']
self.server_url = "https://share.nd.gov/"
self.site_url = self.server_url + "itd/Computer-Systems/Distributed-Systems/VMWare/_api/web/"
self.context_url = self.server_url + "itd/Computer-Systems/Distributed-Systems/VMWare/_api/contextinfo/"
self.env = "All"
self.headers = {
"Accept":"application/json; odata=verbose",
"Content-Type":"application/json; odata=verbose",
"odata":"verbose",
"X-RequestForceAuthentication":"true"
}
self.auth = HttpNtlmAuth(sp_user, sp_pass)
def get_AllNodes(self):
list_name = 'VM Guests'
# Get list information
requests.packages.urllib3.disable_warnings()
r = requests.get(self.site_url + "lists/GetByTitle('%s')" % list_name, auth=self.auth, headers=self.headers, verify=False)
list_guid = r.json()['d']['Id']
list_itemcount = r.json()['d']['ItemCount']
# Query list items
api_items_url = self.site_url + "Lists(guid'%s')/Items" % list_guid
concat_items = []
# Parse the list
requests.packages.urllib3.disable_warnings()
cur_page = requests.get(api_items_url, auth=self.auth, headers=self.headers, verify=False)
concat_items += cur_page.json()['d']['results']
# Sharepoint uses pagination with '__next' in the JSON results to indicate additional pages of information
# are present. Loop through the pages and concatenate the information into one dataset
while '__next' in cur_page.json()['d']:
requests.packages.urllib3.disable_warnings()
cur_page = requests.get(cur_page.json()['d']['__next'], auth=self.auth, headers=self.headers, verify=False)
concat_items += cur_page.json()['d']['results']
return concat_items
def get_ActiveNodes(self, e):
self.env = e
list_name = 'VM Guests'
# Get list information
requests.packages.urllib3.disable_warnings()
r = requests.get(self.site_url + "lists/GetByTitle('%s')" % list_name, auth=self.auth, headers=self.headers, verify=False)
list_guid = r.json()['d']['Id']
list_itemcount = r.json()['d']['ItemCount']
# Query list items
api_items_url = self.site_url + "Lists(guid'%s')/Items" % list_guid
if self.env == 'All':
active_items_url = api_items_url + "?$filter=(Status eq 'Current') or (Status eq 'Change')"
else:
active_items_url = api_items_url + "?$filter=(Environment eq '%s') and ((Status eq 'Current') or (Status eq 'Change'))" % self.env
concat_items = []
# Parse the list
requests.packages.urllib3.disable_warnings()
cur_page = requests.get(active_items_url, auth=self.auth, headers=self.headers, verify=False)
concat_items += cur_page.json()['d']['results']
# Sharepoint uses pagination with '__next' in the JSON results to indicate additional pages of information
# are present. Loop through the pages and concatenate the information into one dataset
while '__next' in cur_page.json()['d']:
requests.packages.urllib3.disable_warnings()
cur_page = requests.get(cur_page.json()['d']['__next'], auth=self.auth, headers=self.headers, verify=False)
concat_items += cur_page.json()['d']['results']
return concat_items
def find_VM(self, hostname):
domains = [".nd.gov", ".ndtestdev.nd.dev", ".ndcloud.gov", ".testnd.gov", ".itd.nd.gov", ".ns.nd.gov", ".cloudapp.azure.com", ".uat.mmis.nd.gov", ".test.ndcloud.gov", ".k12tst.nd.us", ".legend.nd.gov", ".tst.mmis.dev", ".trn.mmis.nd.gov", ".sit.mmis.nd.gov", ".nd.gov-mag", ".uat.mmis.dev", ".mo.legend.nd.gov", ".video.nd.gov", ".mmis.nd.gov", ".sit.legend.nd.gov", ".trn.mmis.dev", ".prm.mmis.nd.gov", ".state.nd.us", ".k12.nd.us", ".stg.k12.nd.us", ".sit.mmis.dev", "ndnic.com"]
for domain in domains:
try:
data = self.get_Node(hostname + domain)
if 'Title' in data[0]:
hostname = data[0]['Title']
break
except:
continue
return(hostname)
def get_Node(self, server):
myNode = server
list_name = 'VM Guests'
# Get list information
requests.packages.urllib3.disable_warnings()
r = requests.get(self.site_url + "lists/GetByTitle('%s')" % list_name, auth=self.auth, headers=self.headers, verify=False)
list_guid = r.json()['d']['Id']
# Query list items
api_items_url = self.site_url + "Lists(guid'%s')/Items" % list_guid
singleServer_url = api_items_url + "?$filter=Title eq '%s'" % myNode
cur_page = requests.get(singleServer_url, auth=self.auth, headers=self.headers, verify=False)
return cur_page.json()['d']['results']
def get_Agency(self, aID):
agencyID = aID
list_name = 'Agency'
# Get list information
requests.packages.urllib3.disable_warnings()
r = requests.get(self.site_url + "lists/GetByTitle('%s')" % list_name, auth=self.auth, headers=self.headers, verify=False)
list_guid = r.json()['d']['Id']
# Query list items
api_items_url = self.site_url + "Lists(guid'%s')/Items" % list_guid
singleServer_url = api_items_url + "?$filter=ID eq '%s'" % agencyID
cur_page = requests.get(singleServer_url, auth=self.auth, headers=self.headers, verify=False)
return cur_page.json()['d']['results'][0]['Title']
def get_AppName(self, appID):
applicationID = appID
list_name = 'VM App Name'
# Get list information
requests.packages.urllib3.disable_warnings()
r = requests.get(self.site_url + "lists/GetByTitle('%s')" % list_name, auth=self.auth, headers=self.headers, verify=False)
list_guid = r.json()['d']['Id']
# Query list items
api_items_url = self.site_url + "Lists(guid'%s')/Items" % list_guid
singleServer_url = api_items_url + "?$filter=ID eq '%s'" % applicationID
cur_page = requests.get(singleServer_url, auth=self.auth, headers=self.headers, verify=False)
return cur_page.json()['d']['results'][0]['Title']
def get_AppNodes(self, appID):
applicationID = appID
list_name = 'VM Guests'
# Get list information
requests.packages.urllib3.disable_warnings()
r = requests.get(self.site_url + "lists/GetByTitle('%s')" % list_name, auth=self.auth, headers=self.headers, verify=False)
list_guid = r.json()['d']['Id']
# Query list items
api_items_url = self.site_url + "Lists(guid'%s')/Items" % list_guid
singleServer_url = api_items_url + "?$filter=AppName eq '%s'" % appID
cur_page = requests.get(singleServer_url, auth=self.auth, headers=self.headers, verify=False)
return cur_page.json()['d']['results']