import json
import os
from datetime import datetime
from datetime import time
from datetime import timedelta
from urllib.parse import quote

import pytz
import requests


# API Documentation
# https://docs.servicenow.com/bundle/sandiego-application-development/page/build/applications/concept/api-rest.html
class SnowAPI:
    """Small client for the ServiceNow REST APIs used by this project.

    Credentials are read from the ``SN_USER`` and ``SN_PASS`` environment
    variables and sent as HTTP basic auth on every request.  All request
    and response bodies are JSON.
    """

    # Class-level fallback so subclasses that override __init__ without
    # calling it still get a bounded wait on every HTTP call.
    timeout = 60

    def __init__(self, snInstance, timeout=60):
        """Store the instance host name and load credentials.

        Args:
            snInstance: Host name of the ServiceNow instance; it is placed
                directly after ``https://`` when URLs are built.
            timeout: Seconds passed to ``requests`` as the per-request
                timeout.  ``None`` disables the timeout (the behavior
                before this parameter existed).

        Raises:
            SystemExit: With status 1 when ``SN_USER`` or ``SN_PASS`` is
                missing or empty.
        """
        user = os.environ.get('SN_USER')
        pwd = os.environ.get('SN_PASS')
        if not (user and pwd):
            print("\n*** Environment variables are not set for ServiceNow Authentication ***")
            # Raised directly: the ``exit`` builtin comes from ``site`` and
            # is not guaranteed to exist; a failure must not exit with 0.
            raise SystemExit(1)

        self.snInstance = snInstance
        self.headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }
        self.user = user
        self.pwd = pwd
        self.timeout = timeout

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _quote(value):
        """Percent-encode ``value`` so it is safe inside a query string."""
        return quote(str(value), safe='')

    @staticmethod
    def _firstResult(data, description):
        """Return ``data['result'][0]`` or raise a descriptive IndexError."""
        results = data['result']
        if not results:
            raise IndexError("ServiceNow returned no {0}".format(description))
        return results[0]

    @staticmethod
    def _errorBody(response):
        """Return the decoded JSON error body, or the raw text if not JSON."""
        try:
            return response.json()
        except ValueError:
            return response.text

    def _url(self, path):
        """Build an absolute https URL on this instance from a relative path."""
        return "https://{0}/{1}".format(self.snInstance, path.lstrip("/"))

    def _request(self, method, path, payload=None):
        """Send one authenticated request and return the raw response."""
        body = None if payload is None else json.dumps(payload)
        return requests.request(
            method,
            self._url(path),
            auth=(self.user, self.pwd),
            headers=self.headers,
            data=body,
            timeout=self.timeout,
        )

    def _getJson(self, path):
        """GET ``path`` and return the decoded JSON body."""
        return self._request("GET", path).json()

    def _sendOrExit(self, method, path, payload, failureMessage):
        """Send ``payload``; print diagnostics and exit(1) unless 200/201."""
        response = self._request(method, path, payload)
        if response.status_code not in (200, 201):
            print(failureMessage)
            print('\tStatus:', response.status_code)
            print('\tError Response:', self._errorBody(response))
            raise SystemExit(1)
        return response.json()

    def _patchChange(self, changeType, sysID, payload):
        """PATCH a change request of ``changeType`` and return the JSON reply."""
        path = "api/sn_chg_rest/change/{0}/{1}".format(changeType, sysID)
        return self._request("PATCH", path, payload).json()

    def _attachAppName(self, record, rawData=None):
        """Add the application service name to a CMDB server record in place.

        On any lookup failure the record is marked ``UNDEFINED`` instead.
        When ``rawData`` is given it is printed on failure for debugging.
        """
        try:
            service = record['u_nd_application_svc']
            appName = self.getCMDBAppById(service['value'])['name']
            enriched = {
                "link": service['link'],
                "value": service['value'],
                "name": appName
            }
        except (KeyError, IndexError, TypeError, ValueError,
                requests.RequestException):
            # Best effort: a missing/blank service reference or a failed
            # secondary lookup must not fail the whole server query.
            if rawData is not None:
                print("Result 2: {0}".format(rawData))
            record.update({
                "u_nd_application_svc": {
                    "name": "UNDEFINED"
                },
                "environment": "UNDEFINED"
            })
        else:
            record.update({"u_nd_application_svc": enriched})

    def _getServersBy(self, field, pattern, debug=False):
        """Query ``cmdb_ci_server`` where ``field`` equals ``pattern``."""
        path = "api/now/table/cmdb_ci_server?sysparm_query={0}={1}&sysparm_limit=10".format(
            field, self._quote(pattern))
        data = self._getJson(path)
        records = data['result']
        if records:
            self._attachAppName(records[0], data if debug else None)
        return records

    def _firstTaskForRequest(self, reqNum):
        """Return the first ``sc_task`` record belonging to request ``reqNum``."""
        path = "api/now/table/sc_task?sysparm_query=request.number={0}&sysparm_limit=1".format(
            self._quote(reqNum))
        return self._firstResult(
            self._getJson(path), "task for request {0!r}".format(reqNum))

    # ------------------------------------------------------------------
    # Lookups
    # ------------------------------------------------------------------

    def getGroupID(self, grpName):
        """Return the sys_id of the user group named ``grpName``.

        Raises:
            IndexError: If no group with that name is returned.
        """
        path = "api/now/table/sys_user_group?sysparm_query=name={0}".format(
            self._quote(grpName))
        group = self._firstResult(
            self._getJson(path), "group named {0!r}".format(grpName))
        return group['sys_id']

    def getUserID(self, userName):
        """Return the sys_id of the user whose ``user_name`` is ``userName``.

        Raises:
            IndexError: If no such user is returned.
        """
        path = "api/now/table/sys_user?sysparm_query=user_name={0}".format(
            self._quote(userName))
        user = self._firstResult(
            self._getJson(path), "user named {0!r}".format(userName))
        return user['sys_id']

    def getCMDBItemByIP(self, pattern):
        """Return up to 10 CMDB server records whose ip_address is ``pattern``.

        The first record is enriched with its application service name
        (or marked ``UNDEFINED``).  Returns ``[]`` when nothing matches.
        """
        return self._getServersBy("ip_address", pattern)

    def getCMDBItemByHostName(self, pattern):
        """Return up to 10 CMDB server records whose host_name is ``pattern``.

        The first record is enriched with its application service name
        (or marked ``UNDEFINED``).  Returns ``[]`` when nothing matches.
        """
        return self._getServersBy("host_name", pattern)

    def getCMDBItemByFQDN(self, pattern):
        """Return up to 10 CMDB server records whose fqdn is ``pattern``.

        The first record is enriched with its application service name
        (or marked ``UNDEFINED``, printing the raw response for debugging).
        Returns ``[]`` when nothing matches.
        """
        return self._getServersBy("fqdn", pattern, debug=True)

    def getCMDBAppById(self, sysId):
        """Return the ``cmdb_ci_service_auto`` record with sys_id ``sysId``.

        Raises:
            IndexError: If no record is returned.
        """
        path = "api/now/table/cmdb_ci_service_auto?sys_id={0}".format(
            self._quote(sysId))
        return self._firstResult(
            self._getJson(path), "application service {0!r}".format(sysId))

    def getStandardChangeTemplateID(self, templateName):
        """Return the sys_id of the active standard change template by name.

        Raises:
            LookupError: If no active template has that display name.
        """
        data = self._getJson(
            "api/sn_chg_rest/change/standard/template?sysparm_query=active=true")
        for record in data['result']:
            if 'sys_name' not in record:
                continue
            if record['sys_name']['display_value'] == templateName:
                return record['sys_id']['value']
        raise LookupError(
            "No active standard change template named {0!r}".format(templateName))

    def getRequestItemFromReqNum(self, reqNum):
        """Return the number of the first request item of request ``reqNum``.

        Raises:
            IndexError: If the request has no items.
        """
        path = "api/now/table/sc_req_item?sysparm_query=request.number={0}&sysparm_limit=1".format(
            self._quote(reqNum))
        item = self._firstResult(
            self._getJson(path), "request item for request {0!r}".format(reqNum))
        return item['number']

    def getTaskNumFromReqNum(self, reqNum):
        """Return the number of the first catalog task of request ``reqNum``.

        Raises:
            IndexError: If the request has no tasks.
        """
        return self._firstTaskForRequest(reqNum)['number']

    def getTaskSysIdFromReqNum(self, reqNum):
        """Return the sys_id of the first catalog task of request ``reqNum``.

        Raises:
            IndexError: If the request has no tasks.
        """
        return self._firstTaskForRequest(reqNum)['sys_id']

    # ------------------------------------------------------------------
    # Service catalog
    # ------------------------------------------------------------------

    def getServiceCatalogs(self):
        """Return the decoded response listing the service catalogs."""
        return self._getJson("api/sn_sc/servicecatalog/catalogs")

    def getServiceCatalogCategories(self, sys_id):
        """Return the decoded category listing of catalog ``sys_id``."""
        return self._getJson(
            "api/sn_sc/servicecatalog/catalogs/{0}/categories".format(sys_id))

    def getServiceCatalogItems(self):
        """Return the decoded listing of catalog items (limit 10000)."""
        return self._getJson(
            "api/sn_sc/servicecatalog/items?sysparm_limit=10000&sysparm_offset=0")

    def getServiceCatalogItemByName(self, itemName):
        """Return the catalog item whose name is ``itemName``, else ``None``."""
        for item in self.getServiceCatalogItems()['result']:
            if item['name'] == itemName:
                return item
        return None

    def getSpecificCatalogItem(self, sysId):
        """Return the decoded response for catalog item ``sysId``."""
        return self._getJson("api/sn_sc/servicecatalog/items/{0}".format(sysId))

    def getDataFromTaskByReqNum(self, sctask, reqNum):
        """Return selected fields of task ``sctask`` belonging to ``reqNum``.

        Only the first task of the request is inspected, so this handles
        one task per REQ number.  Returns ``""`` when that task's number
        is not ``sctask``.

        Raises:
            IndexError: If the request has no tasks.
        """
        path = "api/now/table/sc_task?sysparm_query=request.number={0}&sysparm_fields=state%2Cnumber%2Csys_id%2Cdescription%2Cshort_description%2Cvariables.application_name%2Cvariables.additional_comments%2Cclose_notes%2Cclosed_by&sysparm_limit=1&sysparm_display_value=true".format(
            self._quote(reqNum))
        task = self._firstResult(
            self._getJson(path), "task for request {0!r}".format(reqNum))
        return task if task['number'] == sctask else ""

    def updateTaskDescriptions(self, sysId, short_desc, desc):
        """Set the short and long description of catalog task ``sysId``.

        Returns the decoded response; prints diagnostics and exits with
        status 1 when the update is rejected.
        """
        payload = {
            'short_description': short_desc,
            'description': desc
        }
        return self._sendOrExit(
            "PUT", "api/now/table/sc_task/{0}".format(sysId), payload,
            'Failed to update task:')

    def assignTaskToUser(self, sysId, userName):
        """Assign catalog task ``sysId`` to the user named ``userName``.

        Returns the decoded response; prints diagnostics and exits with
        status 1 when the update is rejected.
        """
        payload = {
            "assigned_to": self.getUserID(userName)
        }
        return self._sendOrExit(
            "PUT", "api/now/table/sc_task/{0}".format(sysId), payload,
            'Failed to update task:')

    def postAppServerRequest(self, itemID, requestFor, requestBy, approver, requestType, comments, appName, env):
        """Order catalog item ``itemID`` as an application server request.

        ``requestFor``, ``requestBy`` and ``approver`` are user names that
        are resolved to sys_ids.  Returns the decoded response; prints
        diagnostics and exits with status 1 when the order is rejected.
        """
        requestForID = self.getUserID(requestFor)
        requestByID = self.getUserID(requestBy)
        approvalID = self.getUserID(approver)

        # Statically set these so we always force approvals to go to Computer Systems
        departmentID = "f3c65cef1bfed050bba0113fad4bcb1d"
        departmentCode = "112"
        divisionID = "f40758231b321450bba0113fad4bcb2d"
        divisionCode = "32"

        payload = {
            "get_portal_messages": "true",
            "sysparm_quantity": "1",
            "sysparm_no_validation": "true",
            "variables": {
                "v_approval_department": departmentID,
                "v_approval_department_code": departmentCode,
                "v_approval_division": divisionID,
                "v_approval_division_code": divisionCode,
                "v_manager": approvalID,
                "v_requested_by": requestByID,
                "v_requested_for": requestForID,
                "request_type": requestType,
                "additional_comments": comments,
                "application_name": appName,
                "environment": env,
                "require_hosting_quote": "No",
                "add_change_disaster_recovery": "No"
            }
        }
        return self._sendOrExit(
            "POST",
            "api/sn_sc/v1/servicecatalog/items/{0}/order_now".format(itemID),
            payload, 'Failed to create ticket:')

    def postGenericSerivceRequest(self, itemID, requestFor, requestBy, approver, requestType, comments):
        """Order catalog item ``itemID`` as a generic service request.

        ``requestFor``, ``requestBy`` and ``approver`` are user names that
        are resolved to sys_ids.  Returns the decoded response; prints
        diagnostics and exits with status 1 when the order is rejected.
        """
        requestForID = self.getUserID(requestFor)
        requestByID = self.getUserID(requestBy)
        approvalID = self.getUserID(approver)

        departmentID = "f3c65cef1bfed050bba0113fad4bcb1d"
        divisionID = "f40758231b321450bba0113fad4bcb2d"

        payload = {
            "get_portal_messages": "true",
            "sysparm_quantity": "1",
            "sysparm_no_validation": "true",
            "variables": {
                "v_approval_department": departmentID,
                "v_approval_division": divisionID,
                "v_manager": approvalID,
                "v_requested_by": requestByID,
                "v_requested_for": requestForID,
                "v_type": requestType,
                "additional_comments": comments
            }
        }
        return self._sendOrExit(
            "POST",
            "api/sn_sc/v1/servicecatalog/items/{0}/order_now".format(itemID),
            payload, 'Failed to create ticket:')

    # Correctly spelled alias; the original (misspelled) name is kept so
    # existing callers continue to work.
    postGenericServiceRequest = postGenericSerivceRequest

    def getGenericData(self, url):
        """GET an arbitrary path on the instance and return the decoded JSON.

        Args:
            url: Path (and optional query string) relative to the instance
                root; a leading ``/`` is optional.
        """
        return self._getJson(url)

    # ------------------------------------------------------------------
    # Incidents
    # ------------------------------------------------------------------

    def submitTicket(self, user, subject, content):
        """Create an incident with ``user`` (a user name) as the caller.

        Returns the decoded response; prints diagnostics and exits with
        status 1 when creation is rejected.
        """
        payload = {
            'caller_id': self.getUserID(user),
            'short_description': subject,
            'description': content
        }
        return self._sendOrExit(
            "POST", "api/now/table/incident", payload,
            'Failed to create ticket:')

    def assignTicketToGroup(self, sysId, grpName):
        """Set the assignment group of incident ``sysId`` to ``grpName``.

        Returns the decoded response; prints diagnostics and exits with
        status 1 when the update is rejected.
        """
        # The group sys_id is resolved by name on every call.
        payload = {
            'assignment_group': self.getGroupID(grpName)
        }
        return self._sendOrExit(
            "PUT", "api/now/table/incident/{0}".format(sysId), payload,
            'Failed to update ticket:')

    # ------------------------------------------------------------------
    # Change requests
    # ------------------------------------------------------------------

    def assessNormalChange(self, sysID):
        """Set normal change ``sysID`` to state ``assess``; return the reply."""
        return self._patchChange("normal", sysID, {"state": "assess"})

    def addStandardChangeNotes(self, sysID, txtWorkNotes):
        """Add work notes to standard change ``sysID``; return the reply."""
        return self._patchChange("standard", sysID, {"work_notes": txtWorkNotes})

    def scheduleStandardChange(self, sysID):
        """Set standard change ``sysID`` to ``Scheduled``; return the reply."""
        return self._patchChange("standard", sysID, {"state": "Scheduled"})

    def implementStandardChange(self, sysID):
        """Set standard change ``sysID`` to ``Implement``; return the reply."""
        return self._patchChange("standard", sysID, {"state": "Implement"})

    def reviewStandardChange(self, sysID):
        """Set standard change ``sysID`` to ``Review``; return the reply."""
        return self._patchChange("standard", sysID, {"state": "Review"})

    def closeStandardChange(self, sysID, code, notes):
        """Close standard change ``sysID`` with a close code and notes."""
        payload = {
            "state": "Closed",
            "close_code": code,
            "close_notes": notes
        }
        return self._patchChange("standard", sysID, payload)

    def CreateNormalChange(self, grpName, assignee, coordinator, approver, category, subCategory, shortDesc, desc, justDesc, implmntPlan, riskImpact, backoutPlan, testPlan, sTime, eTime):
        """Create a normal change request that will require approval.

        ``grpName`` is a group name and ``assignee``, ``coordinator`` and
        ``approver`` are user names; all are resolved to sys_ids.
        ``sTime`` and ``eTime`` are sent unchanged as start/end dates.
        Returns the decoded response (the HTTP status is not checked).
        """
        grpID = self.getGroupID(grpName)
        userID = self.getUserID(assignee)
        chngCoordID = self.getUserID(coordinator)
        chngMngrID = self.getUserID(approver)

        payload = {
            "category": category,
            "u_subcategory": subCategory,
            "u_change_manager": chngMngrID,
            "assigned_to": userID,
            "u_change_coordinator": chngCoordID,
            "assignment_group": grpID,
            "short_description": shortDesc,
            "description": desc,
            "justification": justDesc,
            "implementation_plan": implmntPlan,
            "risk_impact_analysis": riskImpact,
            "backout_plan": backoutPlan,
            "test_plan": testPlan,
            "start_date": sTime,
            "end_date": eTime,
            "cab_required": False,
        }
        return self._request(
            "POST", "api/sn_chg_rest/change/normal", payload).json()

    def CreateStandardChange(self, templateName, grpName, assignee, coordinator, approver, category, subcategory, txtShortDesc, txtJustification):
        """Create a standard change from the template named ``templateName``.

        The change window starts at the current local time and ends one
        minute later.  Returns the decoded response (the HTTP status is
        not checked).

        Raises:
            LookupError: If the template, group or any user is not found.
        """
        templateID = self.getStandardChangeTemplateID(templateName)
        grpID = self.getGroupID(grpName)
        userID = self.getUserID(assignee)
        chngCoordID = self.getUserID(coordinator)
        chngMngrID = self.getUserID(approver)

        # Read the clock once so the window is exactly one minute long.
        start = datetime.now()
        sTime = start.strftime("%Y-%m-%d %H:%M:%S")
        eTime = (start + timedelta(minutes=1)).strftime("%Y-%m-%d %H:%M:%S")

        payload = {
            "category": category,
            "u_subcategory": subcategory,
            "u_change_manager": chngMngrID,
            "assigned_to": userID,
            "u_change_coordinator": chngCoordID,
            "assignment_group": grpID,
            "short_description": txtShortDesc,
            "justification": txtJustification,
            "start_date": sTime,
            "end_date": eTime
        }
        return self._request(
            "POST",
            "api/sn_chg_rest/change/standard/{0}".format(templateID),
            payload).json()

######################################### END CLASS ################################