503 lines
21 KiB
Python
503 lines
21 KiB
Python
import requests
|
|
import json
|
|
import os
|
|
import pytz
|
|
from datetime import datetime
|
|
from datetime import timedelta
|
|
from datetime import time
|
|
|
|
# API Documentation
|
|
# https://docs.servicenow.com/bundle/sandiego-application-development/page/build/applications/concept/api-rest.html
|
|
|
|
class SnowAPI:
|
|
def __init__(self, snInstance):
|
|
if not ((os.environ.get('SN_USER')) and (os.environ.get('SN_PASS'))):
|
|
print("\n*** Environment variables are not set for ServiceNow Authentication ***")
|
|
exit()
|
|
|
|
self.snInstance = snInstance
|
|
self.headers = {
|
|
'Content-Type': 'application/json',
|
|
'Accept': 'application/json'
|
|
}
|
|
|
|
self.user = os.environ['SN_USER']
|
|
self.pwd = os.environ['SN_PASS']
|
|
|
|
def getGroupID(self, grpName):
|
|
grpName = grpName.replace(" ","%20")
|
|
url = 'https://{}/api/now/table/sys_user_group?sysparm_query=name={}'.format(self.snInstance, grpName)
|
|
apiResponse = requests.get(url, auth=(self.user, self.pwd), headers=self.headers)
|
|
apiData = apiResponse.json()
|
|
|
|
return apiData['result'][0]['sys_id']
|
|
|
|
def getUserID(self, userName):
|
|
userName = userName.replace(" ","%20")
|
|
apiCall = 'https://{}/api/now/table/sys_user?sysparm_query=user_name={}'.format(self.snInstance, userName)
|
|
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
|
|
apiData = apiResponse.json()
|
|
|
|
return apiData['result'][0]['sys_id']
|
|
|
|
def getCMDBItemByIP(self, pattern):
|
|
apiCall = "https://{0}/api/now/table/cmdb_ci_server?sysparm_query=ip_address={1}&sysparm_limit=10".format(self.snInstance, pattern)
|
|
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
|
|
apiData = apiResponse.json()
|
|
try:
|
|
appNameRec = self.getCMDBAppById(apiData['result'][0]['u_nd_application_svc']['value'])
|
|
appName = appNameRec['name']
|
|
apiData['result'][0].update({
|
|
"u_nd_application_svc": {
|
|
"link": apiData['result'][0]['u_nd_application_svc']['link'],
|
|
"value": apiData['result'][0]['u_nd_application_svc']['value'],
|
|
"name": appName
|
|
}
|
|
})
|
|
except:
|
|
apiData['result'][0].update({
|
|
"u_nd_application_svc": {
|
|
"name": "UNDEFINED"
|
|
},
|
|
"environment": "UNDEFINED"
|
|
})
|
|
|
|
return apiData['result']
|
|
|
|
def getCMDBItemByHostName(self, pattern):
|
|
apiCall = "https://{0}/api/now/table/cmdb_ci_server?sysparm_query=host_name={1}&sysparm_limit=10".format(self.snInstance, pattern)
|
|
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
|
|
apiData = apiResponse.json()
|
|
try:
|
|
appNameRec = self.getCMDBAppById(apiData['result'][0]['u_nd_application_svc']['value'])
|
|
appName = appNameRec['name']
|
|
apiData['result'][0].update({
|
|
"u_nd_application_svc": {
|
|
"link": apiData['result'][0]['u_nd_application_svc']['link'],
|
|
"value": apiData['result'][0]['u_nd_application_svc']['value'],
|
|
"name": appName
|
|
}
|
|
})
|
|
except:
|
|
apiData['result'][0].update({
|
|
"u_nd_application_svc": {
|
|
"name": "UNDEFINED"
|
|
},
|
|
"environment": "UNDEFINED"
|
|
})
|
|
|
|
return apiData['result']
|
|
|
|
def getCMDBItemByFQDN(self, pattern):
|
|
apiCall = "https://{0}/api/now/table/cmdb_ci_server?sysparm_query=fqdn={1}&sysparm_limit=10".format(self.snInstance, pattern)
|
|
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
|
|
apiData = apiResponse.json()
|
|
if len(apiData['result']) > 0:
|
|
try:
|
|
appNameRec = self.getCMDBAppById(apiData['result'][0]['u_nd_application_svc']['value'])
|
|
appName = appNameRec['name']
|
|
apiData['result'][0].update({
|
|
"u_nd_application_svc": {
|
|
"link": apiData['result'][0]['u_nd_application_svc']['link'],
|
|
"value": apiData['result'][0]['u_nd_application_svc']['value'],
|
|
"name": appName
|
|
}
|
|
})
|
|
except:
|
|
print("Result 2: {0}".format(apiData))
|
|
apiData['result'][0].update({
|
|
"u_nd_application_svc": {
|
|
"name": "UNDEFINED"
|
|
},
|
|
"environment": "UNDEFINED"
|
|
})
|
|
return apiData['result']
|
|
|
|
def getCMDBAppById(self, sysId):
|
|
apiCall = "https://{0}/api/now/table/cmdb_ci_service_auto?sys_id={1}".format(self.snInstance, sysId)
|
|
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
|
|
apiData = apiResponse.json()
|
|
|
|
return apiData['result'][0]
|
|
|
|
def getStandardChangeTemplateID(self, templateName):
|
|
# Could be improved, might throw unexpected results if the group isn't found
|
|
# Needs error handling
|
|
|
|
allData = []
|
|
|
|
apiCall = "https://{0}/api/sn_chg_rest/change/standard/template?sysparm_query=active=true".format(self.snInstance)
|
|
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
|
|
data = apiResponse.json()
|
|
|
|
for i in data['result']:
|
|
allData.append(i)
|
|
|
|
for record in allData:
|
|
if 'sys_name' in record:
|
|
if record['sys_name']['display_value'] == templateName:
|
|
sysID = record['sys_id']['value']
|
|
break
|
|
|
|
return sysID
|
|
|
|
def getRequestItemFromReqNum(self, reqNum):
|
|
apiCall = "https://{0}/api/now/table/sc_req_item?sysparm_query=request.number={1}&sysparm_limit=1".format(self.snInstance, reqNum)
|
|
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
|
|
data = apiResponse.json()
|
|
item = data['result'][0]['number']
|
|
|
|
return item
|
|
|
|
def getTaskNumFromReqNum(self, reqNum):
|
|
apiCall = "https://{0}/api/now/table/sc_task?sysparm_query=request.number={1}&sysparm_limit=1".format(self.snInstance, reqNum)
|
|
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
|
|
data = apiResponse.json()
|
|
item = data['result'][0]['number']
|
|
|
|
return item
|
|
|
|
def getTaskSysIdFromReqNum(self, reqNum):
|
|
apiCall = "https://{0}/api/now/table/sc_task?sysparm_query=request.number={1}&sysparm_limit=1".format(self.snInstance, reqNum)
|
|
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
|
|
data = apiResponse.json()
|
|
item = data['result'][0]['sys_id']
|
|
|
|
return item
|
|
|
|
def getServiceCatalogs(self):
|
|
apiCall = "https://{0}/api/sn_sc/servicecatalog/catalogs".format(self.snInstance)
|
|
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
|
|
items = apiResponse.json()
|
|
|
|
return items
|
|
|
|
def getServiceCatalogCategories(self, sys_id):
|
|
apiCall = "https://{0}/api/sn_sc/servicecatalog/catalogs/{1}/categories".format(self.snInstance,sys_id)
|
|
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
|
|
items = apiResponse.json()
|
|
|
|
return items
|
|
|
|
def getServiceCatalogItems(self):
|
|
apiCall = "https://{0}/api/sn_sc/servicecatalog/items?sysparm_limit=10000&sysparm_offset=0".format(self.snInstance)
|
|
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
|
|
items = apiResponse.json()
|
|
|
|
return items
|
|
|
|
def getServiceCatalogItemByName(self, itemName):
|
|
apiCall = "https://{0}/api/sn_sc/servicecatalog/items?sysparm_limit=10000&sysparm_offset=0".format(self.snInstance)
|
|
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
|
|
items = apiResponse.json()
|
|
|
|
for item in items['result']:
|
|
if item['name'] == itemName:
|
|
return item
|
|
|
|
def getSpecificCatalogItem(self, sysId):
|
|
apiCall = "https://{0}/api/sn_sc/servicecatalog/items/{1}".format(self.snInstance,sysId)
|
|
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
|
|
items = apiResponse.json()
|
|
|
|
return items
|
|
|
|
def getDataFromTaskByReqNum(self, sctask, reqNum):
|
|
# This is only written to handle one task per REQ number!!!
|
|
apiCall = "https://{0}/api/now/table/sc_task?sysparm_query=request.number={1}&sysparm_fields=state%2Cnumber%2Csys_id%2Cdescription%2Cshort_description%2Cvariables.application_name%2Cvariables.additional_comments%2Cclose_notes%2Cclosed_by&sysparm_limit=1&sysparm_display_value=true".format(self.snInstance, reqNum)
|
|
|
|
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
|
|
data = apiResponse.json()
|
|
|
|
if data['result'][0]['number'] == sctask:
|
|
items = data['result'][0]
|
|
else:
|
|
items = ""
|
|
|
|
return items
|
|
|
|
def updateTaskDescriptions(self, sysId, short_desc, desc):
|
|
apiCall = "https://{0}/api/now/table/sc_task/{1}".format(self.snInstance,sysId)
|
|
|
|
payload = {
|
|
'short_description': short_desc,
|
|
'description': desc
|
|
}
|
|
encoded_payload = json.dumps(payload)
|
|
apiResponse = requests.put(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
|
|
|
|
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
|
|
print('Failed to update task:')
|
|
print('\tStatus:', apiResponse.status_code)
|
|
print('\tError Response:',apiResponse.json())
|
|
exit(1)
|
|
else:
|
|
return apiResponse.json()
|
|
|
|
def assignTaskToUser(self, sysId, userName):
|
|
userID = self.getUserID(userName)
|
|
apiCall = "https://{0}/api/now/table/sc_task/{1}".format(self.snInstance,sysId)
|
|
payload = {
|
|
"assigned_to": userID
|
|
}
|
|
|
|
encoded_payload = json.dumps(payload)
|
|
apiResponse = requests.put(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
|
|
|
|
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
|
|
print('Failed to update task:')
|
|
print('\tStatus:', apiResponse.status_code)
|
|
print('\tError Response:',apiResponse.json())
|
|
exit(1)
|
|
else:
|
|
return apiResponse.json()
|
|
|
|
def postAppServerRequest(self,itemID,requestFor,requestBy,approver,requestType,comments,appName,env):
|
|
requestForID = self.getUserID(requestFor)
|
|
requestByID = self.getUserID(requestBy)
|
|
approvalID = self.getUserID(approver)
|
|
|
|
# Statically set these so we always force approvals to go to Computer Systems
|
|
departmentID = "f3c65cef1bfed050bba0113fad4bcb1d"
|
|
departmentCode = "112"
|
|
divisionID = "f40758231b321450bba0113fad4bcb2d"
|
|
divisionCode = "32"
|
|
|
|
payload = {
|
|
"get_portal_messages" : "true",
|
|
"sysparm_quantity" : "1",
|
|
"sysparm_no_validation" : "true",
|
|
"variables" : {
|
|
"v_approval_department" : departmentID,
|
|
"v_approval_department_code" : departmentCode,
|
|
"v_approval_division" : divisionID,
|
|
"v_approval_division_code" : divisionCode,
|
|
"v_manager" : approvalID,
|
|
"v_requested_by" : requestByID,
|
|
"v_requested_for" : requestForID,
|
|
"request_type" : requestType,
|
|
"additional_comments" : comments,
|
|
"application_name" : appName,
|
|
"environment" : env,
|
|
"require_hosting_quote" : "No",
|
|
"add_change_disaster_recovery" : "No"
|
|
}
|
|
}
|
|
|
|
apiCall = ('https://{0}/api/sn_sc/v1/servicecatalog/items/{1}/order_now').format(self.snInstance,itemID)
|
|
|
|
encoded_payload = json.dumps(payload)
|
|
apiResponse = requests.post(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
|
|
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
|
|
print('Failed to create ticket:')
|
|
print('\tStatus:', apiResponse.status_code)
|
|
print('\tError Response:',apiResponse.json())
|
|
exit(1)
|
|
else:
|
|
return apiResponse.json()
|
|
|
|
return apiResponse
|
|
|
|
def postGenericSerivceRequest(self,itemID,requestFor,requestBy,approver,requestType,comments):
|
|
requestForID = self.getUserID(requestFor)
|
|
requestByID = self.getUserID(requestBy)
|
|
approvalID = self.getUserID(approver)
|
|
departmentID = "f3c65cef1bfed050bba0113fad4bcb1d"
|
|
divisionID = "f40758231b321450bba0113fad4bcb2d"
|
|
|
|
payload = {
|
|
"get_portal_messages" : "true",
|
|
"sysparm_quantity" : "1",
|
|
"sysparm_no_validation" : "true",
|
|
"variables" : {
|
|
"v_approval_department" : departmentID,
|
|
"v_approval_division" : divisionID,
|
|
"v_manager" : approvalID,
|
|
"v_requested_by" : requestByID,
|
|
"v_requested_for" : requestForID,
|
|
"v_type" : requestType,
|
|
"additional_comments" : comments
|
|
}
|
|
}
|
|
|
|
apiCall = ('https://{0}/api/sn_sc/v1/servicecatalog/items/{1}/order_now').format(self.snInstance,itemID)
|
|
encoded_payload = json.dumps(payload)
|
|
apiResponse = requests.post(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
|
|
|
|
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
|
|
print('Failed to create ticket:')
|
|
print('\tStatus:', apiResponse.status_code)
|
|
print('\tError Response:',apiResponse.json())
|
|
exit(1)
|
|
else:
|
|
return apiResponse.json()
|
|
|
|
return apiResponse
|
|
|
|
def getGenericData(self, url):
|
|
apiCall = "https://{0}/".format(self.snInstance,url)
|
|
apiResponse = requests.get(apiCall, auth=(self.user, self.pwd), headers=self.headers)
|
|
items = apiResponse.json()
|
|
|
|
return items
|
|
|
|
def submitTicket(self, user, subject, content):
|
|
apiCall = ('https://{}/api/now/table/incident').format(self.snInstance)
|
|
userID=self.getUserID(user)
|
|
|
|
payload = {
|
|
'caller_id': userID,
|
|
'short_description': subject,
|
|
'description': content
|
|
}
|
|
|
|
encoded_payload = json.dumps(payload)
|
|
apiResponse = requests.post(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
|
|
|
|
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
|
|
print('Failed to create ticket:')
|
|
print('\tStatus:', apiResponse.status_code)
|
|
print('\tError Response:',apiResponse.json())
|
|
exit(1)
|
|
else:
|
|
return apiResponse.json()
|
|
|
|
def assignTicketToGroup(self, sysId, grpName):
|
|
grpId = self.getGroupID(grpName)
|
|
url = 'https://{}/api/now/table/incident/{}'.format(self.snInstance,sysId)
|
|
|
|
#Statically set the group to storage, this value was derived by decoding the web URL on storage tickets
|
|
payload = {
|
|
'assignment_group': grpId
|
|
}
|
|
encoded_payload = json.dumps(payload)
|
|
apiResponse = requests.put(url, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
|
|
|
|
if apiResponse.status_code != 201 and apiResponse.status_code != 200:
|
|
print('Failed to create ticket:')
|
|
print('\tStatus:', apiResponse.status_code)
|
|
print('\tError Response:',apiResponse.json())
|
|
exit(1)
|
|
else:
|
|
return apiResponse.json()
|
|
|
|
def assessNormalChange(self, sysID):
|
|
apiCall = "https://{0}/api/sn_chg_rest/change/normal/{1}".format(self.snInstance,sysID)
|
|
payload = {
|
|
"state": "assess"
|
|
}
|
|
encoded_payload=json.dumps(payload)
|
|
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
|
|
return apiResponse.json()
|
|
|
|
def addStandardChangeNotes(self, sysID, txtWorkNotes):
|
|
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance,sysID)
|
|
payload = {
|
|
"work_notes": txtWorkNotes
|
|
}
|
|
encoded_payload=json.dumps(payload)
|
|
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
|
|
return apiResponse.json()
|
|
|
|
def scheduleStandardChange(self, sysID):
|
|
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance,sysID)
|
|
payload = {
|
|
"state": "Scheduled"
|
|
}
|
|
encoded_payload=json.dumps(payload)
|
|
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
|
|
return apiResponse.json()
|
|
|
|
def implementStandardChange(self, sysID):
|
|
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance, sysID)
|
|
payload = {
|
|
"state": "Implement"
|
|
}
|
|
encoded_payload=json.dumps(payload)
|
|
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
|
|
return apiResponse.json()
|
|
|
|
def reviewStandardChange(self, sysID):
|
|
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance, sysID)
|
|
payload = {
|
|
"state": "Review"
|
|
}
|
|
encoded_payload=json.dumps(payload)
|
|
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
|
|
return apiResponse.json()
|
|
|
|
def closeStandardChange(self, sysID, code, notes):
|
|
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance, sysID)
|
|
payload = {
|
|
"state": "Closed",
|
|
"close_code": code,
|
|
"close_notes": notes
|
|
}
|
|
encoded_payload=json.dumps(payload)
|
|
apiResponse = requests.patch(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
|
|
return apiResponse.json()
|
|
|
|
def CreateNormalChange(self, grpName, assignee, coordinator, approver, category, subCategory, shortDesc, desc, justDesc, implmntPlan, riskImpact, backoutPlan, testPlan, sTime, eTime):
|
|
|
|
# Creates a normal change request that will require approval.
|
|
|
|
grpID = self.getGroupID(grpName)
|
|
userID = self.getUserID(assignee)
|
|
chngCoordID = self.getUserID(coordinator)
|
|
chngMngrID = self.getUserID(approver)
|
|
|
|
apiCall = "https://{0}/api/sn_chg_rest/change/normal".format(self.snInstance)
|
|
|
|
payload = {
|
|
"category": category,
|
|
"u_subcategory": subCategory,
|
|
"u_change_manager": chngMngrID,
|
|
"assigned_to": userID,
|
|
"u_change_coordinator": chngCoordID,
|
|
"assignment_group": grpID,
|
|
"short_description": shortDesc,
|
|
"description": desc,
|
|
"justification": justDesc,
|
|
"implementation_plan": implmntPlan,
|
|
"risk_impact_analysis": riskImpact,
|
|
"backout_plan": backoutPlan,
|
|
"test_plan": testPlan,
|
|
"start_date": sTime,
|
|
"end_date": eTime,
|
|
"cab_required": False,
|
|
}
|
|
encoded_payload=json.dumps(payload)
|
|
changeResponse = requests.post(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
|
|
changeData = changeResponse.json()
|
|
|
|
return changeData
|
|
|
|
def CreateStandardChange(self, templateName, grpName, assignee, coordinator, approver, category, subcategory, txtShortDesc, txtJustification):
|
|
templateID = self.getStandardChangeTemplateID(templateName)
|
|
grpID = self.getGroupID(grpName)
|
|
userID = self.getUserID(assignee)
|
|
chngCoordID = self.getUserID(coordinator)
|
|
chngMngrID = self.getUserID(approver)
|
|
|
|
sTime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
eTime = (datetime.now()+timedelta(minutes=+1)).strftime("%Y-%m-%d %H:%M:%S")
|
|
apiCall = "https://{0}/api/sn_chg_rest/change/standard/{1}".format(self.snInstance,templateID)
|
|
|
|
payload={
|
|
"category": category,
|
|
"u_subcategory": subcategory,
|
|
"u_change_manager": chngMngrID,
|
|
"assigned_to": userID,
|
|
"u_change_coordinator": chngCoordID,
|
|
"assignment_group": grpID,
|
|
"short_description": txtShortDesc,
|
|
"justification": txtJustification,
|
|
"start_date": sTime,
|
|
"end_date": eTime
|
|
}
|
|
encoded_payload=json.dumps(payload)
|
|
apiResponse = requests.post(apiCall, auth=(self.user, self.pwd), headers=self.headers, data=encoded_payload)
|
|
return apiResponse.json()
|
|
|
|
######################################### END CLASS ################################
|