# Directory Structure
```
├── .env.example
├── .github
│ └── workflows
│ └── python-package.yml
├── .gitignore
├── CONTRIBUTING.md
├── LICENSE
├── mcp_server_servicenow
│ ├── __init__.py
│ ├── cli.py
│ ├── nlp.py
│ └── server.py
├── pyproject.toml
├── README.md
├── requirements.txt
├── servicenow-mcp.py
└── tests
├── __init__.py
└── test_nlp.py
```
# Files
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
1 | # Python bytecode
2 | __pycache__/
3 | *.py[cod]
4 | *$py.class
5 |
6 | # Distribution / packaging
7 | dist/
8 | build/
9 | *.egg-info/
10 | *.egg
11 |
12 | # Virtual environments
13 | venv/
14 | env/
15 | ENV/
16 |
17 | # IDE files
18 | .idea/
19 | .vscode/
20 | *.swp
21 | *.swo
22 |
23 | # Environment variables
24 | .env
25 |
26 | # Logs
27 | *.log
28 |
29 | # Test files
30 | test_*.js
31 |
32 | # OS specific files
33 | .DS_Store
34 | Thumbs.db
35 |
```
--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------
```
1 | # ServiceNow MCP Server Environment Variables
2 |
3 | # ServiceNow Instance URL
4 | SERVICENOW_INSTANCE_URL=https://your-instance.service-now.com/
5 |
6 | # Authentication - Basic Auth
7 | SERVICENOW_USERNAME=your-username
8 | SERVICENOW_PASSWORD=your-password
9 |
10 | # Authentication - Token Auth (alternative to Basic Auth)
11 | # SERVICENOW_TOKEN=your-token
12 |
13 | # Authentication - OAuth (alternative to Basic Auth)
14 | # SERVICENOW_CLIENT_ID=your-client-id
15 | # SERVICENOW_CLIENT_SECRET=your-client-secret
16 | # SERVICENOW_USERNAME=your-username
17 | # SERVICENOW_PASSWORD=your-password
18 |
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
1 | # ServiceNow MCP Server
2 |
3 | [](https://opensource.org/licenses/MIT)
4 |
5 | A Model Context Protocol (MCP) server that interfaces with ServiceNow, allowing AI agents to access and manipulate ServiceNow data through a secure API. This server enables natural language interactions with ServiceNow, making it easier to search for records, update them, and manage scripts.
6 |
7 | ## Features
8 |
9 | ### Resources
10 |
11 | - `servicenow://incidents`: List recent incidents
12 | - `servicenow://incidents/{number}`: Get a specific incident by number
13 | - `servicenow://users`: List users
14 | - `servicenow://knowledge`: List knowledge articles
15 | - `servicenow://tables`: List available tables
16 | - `servicenow://tables/{table}`: Get records from a specific table
17 | - `servicenow://schema/{table}`: Get the schema for a table
18 |
19 | ### Tools
20 |
21 | #### Basic Tools
22 | - `create_incident`: Create a new incident
23 | - `update_incident`: Update an existing incident
24 | - `search_records`: Search for records using text query
25 | - `get_record`: Get a specific record by sys_id
26 | - `perform_query`: Perform a query against ServiceNow
27 | - `add_comment`: Add a comment to an incident (customer visible)
28 | - `add_work_notes`: Add work notes to an incident (internal)
29 |
30 | #### Natural Language Tools
31 | - `natural_language_search`: Search for records using natural language (e.g., "find all incidents about SAP")
32 | - `natural_language_update`: Update records using natural language (e.g., "Update incident INC0010001 saying I'm working on it")
33 | - `update_script`: Update ServiceNow script files (script includes, business rules, etc.)
34 |
35 | ## Installation
36 |
37 | ### From PyPI
38 |
39 | ```bash
40 | pip install mcp-server-servicenow
41 | ```
42 |
43 | ### From Source
44 |
45 | ```bash
46 | git clone https://github.com/michaelbuckner/servicenow-mcp.git
47 | cd servicenow-mcp
48 | pip install -e .
49 | ```
50 |
51 | ## Usage
52 |
53 | ### Command Line
54 |
55 | Run the server using the Python module:
56 |
57 | ```bash
58 | python -m mcp_server_servicenow.cli --url "https://your-instance.service-now.com/" --username "your-username" --password "your-password"
59 | ```
60 |
61 | Or use environment variables:
62 |
63 | ```bash
64 | export SERVICENOW_INSTANCE_URL="https://your-instance.service-now.com/"
65 | export SERVICENOW_USERNAME="your-username"
66 | export SERVICENOW_PASSWORD="your-password"
67 | python -m mcp_server_servicenow.cli
68 | ```
69 |
70 | ### Configuration in Cline
71 |
72 | To use this MCP server with Cline, add the following to your MCP settings file:
73 |
74 | ```json
75 | {
76 | "mcpServers": {
77 | "servicenow": {
78 | "command": "/path/to/your/python/executable",
79 | "args": [
80 | "-m",
81 | "mcp_server_servicenow.cli",
82 | "--url", "https://your-instance.service-now.com/",
83 | "--username", "your-username",
84 | "--password", "your-password"
85 | ],
86 | "disabled": false,
87 | "autoApprove": []
88 | }
89 | }
90 | }
91 | ```
92 |
93 | **Note:** Make sure to use the full path to the Python executable that has the `mcp-server-servicenow` package installed.
94 |
95 | ## Natural Language Examples
96 |
97 | ### Searching Records
98 |
99 | You can search for records using natural language queries:
100 |
101 | ```
102 | find all incidents about email
103 | search for incidents related to network issues
104 | show me all incidents with high priority
105 | ```
106 |
107 | ### Updating Records
108 |
109 | You can update records using natural language commands:
110 |
111 | ```
112 | Update incident INC0010001 saying I'm working on it
113 | Set incident INC0010002 to in progress
114 | Close incident INC0010003 with resolution: fixed the issue
115 | ```
116 |
117 | ### Managing Scripts
118 |
119 | You can update ServiceNow scripts from local files:
120 |
121 | ```
122 | Update the ServiceNow script include "HelloWorld" with the contents of hello_world.js
123 | Upload utils.js to ServiceNow as a script include named "UtilityFunctions"
124 | Update @form_validation.js, it's a client script called "FormValidation"
125 | ```
126 |
127 | ## Authentication Methods
128 |
129 | The server supports multiple authentication methods:
130 |
131 | 1. **Basic Authentication**: Username and password
132 | 2. **Token Authentication**: OAuth token
133 | 3. **OAuth Authentication**: Client ID, Client Secret, Username, and Password
134 |
135 | ## Development
136 |
137 | ### Prerequisites
138 |
139 | - Python 3.8+
140 | - ServiceNow instance with API access
141 |
142 | ### Setting Up Development Environment
143 |
144 | ```bash
145 | # Clone the repository
146 | git clone https://github.com/michaelbuckner/servicenow-mcp.git
147 | cd servicenow-mcp
148 |
149 | # Create a virtual environment
150 | python -m venv venv
151 | source venv/bin/activate # On Windows: venv\Scripts\activate
152 |
153 | # Install development dependencies
154 | pip install -e ".[dev]"
155 | ```
156 |
157 | ### Running Tests
158 |
159 | ```bash
160 | pytest
161 | ```
162 |
163 | ## Contributing
164 |
165 | Contributions are welcome! Please feel free to submit a Pull Request.
166 |
167 | 1. Fork the repository
168 | 2. Create your feature branch (`git checkout -b feature/amazing-feature`)
169 | 3. Commit your changes (`git commit -m 'Add some amazing feature'`)
170 | 4. Push to the branch (`git push origin feature/amazing-feature`)
171 | 5. Open a Pull Request
172 |
173 | ## License
174 |
175 | This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
176 |
```
--------------------------------------------------------------------------------
/CONTRIBUTING.md:
--------------------------------------------------------------------------------
```markdown
1 | # Contributing to ServiceNow MCP Server
2 |
3 | Thank you for considering contributing to the ServiceNow MCP Server! This document provides guidelines and instructions for contributing to this project.
4 |
5 | ## Code of Conduct
6 |
7 | By participating in this project, you agree to abide by our code of conduct. Please be respectful and considerate of others.
8 |
9 | ## How to Contribute
10 |
11 | ### Reporting Bugs
12 |
13 | If you find a bug, please create an issue on GitHub with the following information:
14 |
15 | - A clear, descriptive title
16 | - A detailed description of the issue
17 | - Steps to reproduce the bug
18 | - Expected behavior
19 | - Actual behavior
20 | - Screenshots (if applicable)
21 | - Environment information (OS, Python version, etc.)
22 |
23 | ### Suggesting Enhancements
24 |
25 | If you have an idea for an enhancement, please create an issue on GitHub with the following information:
26 |
27 | - A clear, descriptive title
28 | - A detailed description of the enhancement
29 | - Any relevant examples or mockups
30 | - Why this enhancement would be useful
31 |
32 | ### Pull Requests
33 |
34 | 1. Fork the repository
35 | 2. Create a new branch (`git checkout -b feature/amazing-feature`)
36 | 3. Make your changes
37 | 4. Run the tests (`pytest`)
38 | 5. Commit your changes (`git commit -m 'Add some amazing feature'`)
39 | 6. Push to the branch (`git push origin feature/amazing-feature`)
40 | 7. Open a Pull Request
41 |
42 | ## Development Setup
43 |
44 | 1. Clone the repository
45 | ```bash
46 | git clone https://github.com/michaelbuckner/servicenow-mcp.git
47 | cd servicenow-mcp
48 | ```
49 |
50 | 2. Create a virtual environment
51 | ```bash
52 | python -m venv venv
53 | source venv/bin/activate # On Windows: venv\Scripts\activate
54 | ```
55 |
56 | 3. Install development dependencies
57 | ```bash
58 | pip install -e ".[dev]"
59 | ```
60 |
61 | 4. Create a `.env` file with your ServiceNow credentials (see `.env.example`)
62 |
63 | 5. Run the tests
64 | ```bash
65 | pytest
66 | ```
67 |
68 | ## Coding Standards
69 |
70 | - Follow PEP 8 style guide
71 | - Use type hints
72 | - Write docstrings for all functions, classes, and methods
73 | - Write tests for all new features and bug fixes
74 |
75 | ## Testing
76 |
77 | - All tests should be written using pytest
78 | - Run tests with `pytest`
79 | - Ensure all tests pass before submitting a pull request
80 |
81 | ## Documentation
82 |
83 | - Update the README.md with any necessary changes
84 | - Document all new features and changes in the code
85 |
86 | ## License
87 |
88 | By contributing to this project, you agree that your contributions will be licensed under the project's MIT License.
89 |
```
--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------
```python
1 | # Test package initialization
2 |
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
1 | mcp>=1.0.0
2 | httpx>=0.27.0
3 | requests>=2.31.0
4 | pydantic>=2.0.0
5 | python-dotenv>=1.0.0
```
--------------------------------------------------------------------------------
/mcp_server_servicenow/__init__.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | ServiceNow MCP Server
3 |
4 | This module provides a Model Context Protocol (MCP) server that interfaces with ServiceNow.
5 | It allows AI agents to access and manipulate ServiceNow data through a secure API.
6 | """
7 |
8 | __version__ = "0.1.0"
9 |
```
--------------------------------------------------------------------------------
/.github/workflows/python-package.yml:
--------------------------------------------------------------------------------
```yaml
1 | name: Python Package
2 |
3 | on:
4 | push:
5 | branches: [ main ]
6 | pull_request:
7 | branches: [ main ]
8 |
9 | jobs:
10 | build:
11 | runs-on: ubuntu-latest
12 | strategy:
13 | fail-fast: false
14 | matrix:
15 | python-version: ["3.8", "3.9", "3.10", "3.11"]
16 |
17 | steps:
18 | - uses: actions/checkout@v3
19 | - name: Set up Python ${{ matrix.python-version }}
20 | uses: actions/setup-python@v4
21 | with:
22 | python-version: ${{ matrix.python-version }}
23 | - name: Install dependencies
24 | run: |
25 | python -m pip install --upgrade pip
26 | python -m pip install flake8 pytest
27 | if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
28 | pip install -e ".[dev]"
29 | - name: Lint with flake8
30 | run: |
31 | # stop the build if there are Python syntax errors or undefined names
32 | flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
33 | # exit-zero treats all errors as warnings
34 | flake8 . --count --exit-zero --max-complexity=10 --max-line-length=100 --statistics
35 | - name: Test with pytest
36 | run: |
37 | pytest
38 |
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
1 | [build-system]
2 | requires = ["setuptools>=42", "wheel"]
3 | build-backend = "setuptools.build_meta"
4 |
5 | [project]
6 | name = "mcp-server-servicenow"
7 | version = "0.1.0"
8 | description = "ServiceNow MCP Server for natural language interactions with ServiceNow"
9 | authors = [
10 | {name = "Michael Buckner", email = "[email protected]"}
11 | ]
12 | readme = "README.md"
13 | requires-python = ">=3.8"
14 | license = {text = "MIT"}
15 | keywords = ["servicenow", "mcp", "ai", "nlp", "automation"]
16 | classifiers = [
17 | "Development Status :: 4 - Beta",
18 | "Programming Language :: Python :: 3",
19 | "Programming Language :: Python :: 3.8",
20 | "Programming Language :: Python :: 3.9",
21 | "Programming Language :: Python :: 3.10",
22 | "Programming Language :: Python :: 3.11",
23 | "License :: OSI Approved :: MIT License",
24 | "Operating System :: OS Independent",
25 | "Intended Audience :: Developers",
26 | "Topic :: Software Development :: Libraries :: Python Modules",
27 | "Topic :: Internet :: WWW/HTTP",
28 | ]
29 | dependencies = [
30 | "mcp>=1.0.0",
31 | "httpx>=0.27.0",
32 | "requests>=2.31.0",
33 | "pydantic>=2.0.0",
34 | "python-dotenv>=1.0.0",
35 | ]
36 |
37 | [project.optional-dependencies]
38 | dev = [
39 | "pytest>=7.0.0",
40 | "pytest-cov>=4.0.0",
41 | "black>=23.0.0",
42 | "isort>=5.12.0",
43 | "mypy>=1.0.0",
44 | "flake8>=6.0.0",
45 | ]
46 |
47 | [project.urls]
48 | "Homepage" = "https://github.com/michaelbuckner/servicenow-mcp"
49 | "Bug Tracker" = "https://github.com/michaelbuckner/servicenow-mcp/issues"
50 | "Documentation" = "https://github.com/michaelbuckner/servicenow-mcp#readme"
51 |
52 | [project.scripts]
53 | mcp-server-servicenow = "mcp_server_servicenow.cli:main"
54 |
55 | [tool.black]
56 | line-length = 100
57 | target-version = ["py38"]
58 |
59 | [tool.isort]
60 | profile = "black"
61 | line_length = 100
62 |
63 | [tool.mypy]
64 | python_version = "3.8"
65 | warn_return_any = true
66 | warn_unused_configs = true
67 | disallow_untyped_defs = true
68 | disallow_incomplete_defs = true
69 |
70 | [tool.pytest.ini_options]
71 | testpaths = ["tests"]
72 | python_files = "test_*.py"
73 |
```
--------------------------------------------------------------------------------
/mcp_server_servicenow/cli.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | ServiceNow MCP Server CLI
3 |
4 | This module provides the command-line interface for the ServiceNow MCP server.
5 | """
6 |
7 | import argparse
8 | import os
9 | import sys
10 | from dotenv import load_dotenv
11 |
12 | from mcp_server_servicenow.server import ServiceNowMCP, create_basic_auth
13 |
14 | def main():
15 | """Run the ServiceNow MCP server from the command line"""
16 | # Load environment variables from .env file if it exists
17 | load_dotenv()
18 |
19 | parser = argparse.ArgumentParser(description="ServiceNow MCP Server")
20 | parser.add_argument("--url", help="ServiceNow instance URL", default=os.environ.get("SERVICENOW_INSTANCE_URL"))
21 | parser.add_argument("--transport", help="Transport protocol (stdio or sse)", default="stdio", choices=["stdio", "sse"])
22 |
23 | # Authentication options
24 | auth_group = parser.add_argument_group("Authentication")
25 | auth_group.add_argument("--username", help="ServiceNow username", default=os.environ.get("SERVICENOW_USERNAME"))
26 | auth_group.add_argument("--password", help="ServiceNow password", default=os.environ.get("SERVICENOW_PASSWORD"))
27 | auth_group.add_argument("--token", help="ServiceNow token", default=os.environ.get("SERVICENOW_TOKEN"))
28 | auth_group.add_argument("--client-id", help="OAuth client ID", default=os.environ.get("SERVICENOW_CLIENT_ID"))
29 | auth_group.add_argument("--client-secret", help="OAuth client secret", default=os.environ.get("SERVICENOW_CLIENT_SECRET"))
30 |
31 | args = parser.parse_args()
32 |
33 | # Check required parameters
34 | if not args.url:
35 | print("Error: ServiceNow instance URL is required")
36 | print("Set SERVICENOW_INSTANCE_URL environment variable or use --url")
37 | sys.exit(1)
38 |
39 | # Determine authentication method
40 | auth = None
41 | if args.token:
42 | from mcp_server_servicenow.server import create_token_auth
43 | auth = create_token_auth(args.token)
44 | elif args.client_id and args.client_secret and args.username and args.password:
45 | from mcp_server_servicenow.server import create_oauth_auth
46 | auth = create_oauth_auth(args.client_id, args.client_secret, args.username, args.password, args.url)
47 | elif args.username and args.password:
48 | auth = create_basic_auth(args.username, args.password)
49 | else:
50 | print("Error: Authentication credentials required")
51 | print("Either provide username/password, token, or OAuth credentials")
52 | sys.exit(1)
53 |
54 | # Create and run the server
55 | server = ServiceNowMCP(instance_url=args.url, auth=auth)
56 | server.run(transport=args.transport)
57 |
58 | if __name__ == "__main__":
59 | main()
60 |
```
--------------------------------------------------------------------------------
/tests/test_nlp.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the NLP processor module
3 | """
4 |
5 | import pytest
6 | from mcp_server_servicenow.nlp import NLPProcessor
7 |
8 |
9 | class TestNLPProcessor:
10 | """Test cases for the NLPProcessor class"""
11 |
12 | def test_parse_search_query(self):
13 | """Test parsing natural language search queries"""
14 | # Test basic search
15 | result = NLPProcessor.parse_search_query("find all incidents about email")
16 | assert result["table"] == "incident"
17 | assert "123TEXTQUERY321=email" in result["query"]
18 |
19 | # Test with different table
20 | result = NLPProcessor.parse_search_query("search for users related to admin")
21 | assert result["table"] == "sys_user"
22 | assert "123TEXTQUERY321=admin" in result["query"]
23 |
24 | # Test with priority
25 | result = NLPProcessor.parse_search_query("show me all incidents with high priority")
26 | assert result["table"] == "incident"
27 | assert "priority=1" in result["query"]
28 |
29 | # Test with state
30 | result = NLPProcessor.parse_search_query("find all incidents in progress")
31 | assert result["table"] == "incident"
32 | assert "state=2" in result["query"]
33 |
34 | def test_parse_update_command(self):
35 | """Test parsing natural language update commands"""
36 | # Test basic update
37 | record_number, updates = NLPProcessor.parse_update_command(
38 | "Update incident INC0010001 saying I'm working on it"
39 | )
40 | assert record_number == "INC0010001"
41 | assert updates.get("comments") == "I'm working on it"
42 | assert updates.get("state") == 2 # In Progress
43 |
44 | # Test with explicit state change
45 | record_number, updates = NLPProcessor.parse_update_command(
46 | "Close incident INC0010002 with resolution: fixed the issue"
47 | )
48 | assert record_number == "INC0010002"
49 | assert updates.get("state") == 7 # Closed
50 | assert updates.get("close_notes") == "fixed the issue"
51 | assert updates.get("close_code") == "Solved (Permanently)"
52 |
53 | # Test with work notes
54 | record_number, updates = NLPProcessor.parse_update_command(
55 | "Update incident INC0010003 with work note: internal troubleshooting steps"
56 | )
57 | assert record_number == "INC0010003"
58 | assert updates.get("work_notes") == "internal troubleshooting steps"
59 |
60 | def test_parse_script_update(self):
61 | """Test parsing script update commands"""
62 | # Test script include
63 | filename, script_type, _ = NLPProcessor.parse_script_update(
64 | "update @my_script.js, it's a script include"
65 | )
66 | assert filename == "my_script.js"
67 | assert script_type == "sys_script_include"
68 |
69 | # Test business rule
70 | filename, script_type, _ = NLPProcessor.parse_script_update(
71 | "update @validation.js, it's a business rule"
72 | )
73 | assert filename == "validation.js"
74 | assert script_type == "sys_script"
75 |
76 | # Test client script
77 | filename, script_type, _ = NLPProcessor.parse_script_update(
78 | "update @form_script.js, it's a client script"
79 | )
80 | assert filename == "form_script.js"
81 | assert script_type == "sys_script_client"
82 |
```
--------------------------------------------------------------------------------
/mcp_server_servicenow/nlp.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Natural Language Processing for ServiceNow MCP Server
3 |
4 | This module provides natural language processing capabilities for the ServiceNow MCP server.
5 | """
6 |
7 | import re
8 | from typing import Dict, Any, Tuple, List, Optional
9 |
10 | class NLPProcessor:
11 | """Natural Language Processing for ServiceNow queries and commands"""
12 |
13 | @staticmethod
14 | def parse_search_query(query: str) -> Dict[str, Any]:
15 | """
16 | Parse a natural language search query
17 |
18 | Examples:
19 | - "find all incidents about SAP"
20 | - "search for incidents related to email"
21 | - "show me all incidents with high priority"
22 |
23 | Returns:
24 | Dict with table, query, and other parameters
25 | """
26 | # Default to incident table
27 | table = "incident"
28 |
29 | # Extract table if specified
30 | table_match = re.search(r'(incidents?|problems?|changes?|tasks?|users?|groups?)', query, re.IGNORECASE)
31 | if table_match:
32 | table_type = table_match.group(1).lower()
33 | if table_type.startswith('incident'):
34 | table = "incident"
35 | elif table_type.startswith('problem'):
36 | table = "problem"
37 | elif table_type.startswith('change'):
38 | table = "change_request"
39 | elif table_type.startswith('task'):
40 | table = "task"
41 | elif table_type.startswith('user'):
42 | table = "sys_user"
43 | elif table_type.startswith('group'):
44 | table = "sys_user_group"
45 |
46 | # Extract search terms
47 | about_match = re.search(r'(?:about|related to|regarding|concerning|with|containing)\s+([^\.]+)', query, re.IGNORECASE)
48 | search_term = ""
49 | if about_match:
50 | search_term = about_match.group(1).strip()
51 | else:
52 | # Try to find any terms after common search phrases
53 | term_match = re.search(r'(?:find|search for|show|get|list|display)\s+(?:all|any|)(?:\s+\w+)?\s+(?:\w+\s+)?(.+)', query, re.IGNORECASE)
54 | if term_match:
55 | search_term = term_match.group(1).strip()
56 |
57 | # Extract priority if mentioned
58 | priority = None
59 | if re.search(r'\b(high|critical)\s+priority\b', query, re.IGNORECASE):
60 | priority = "1"
61 | elif re.search(r'\b(medium)\s+priority\b', query, re.IGNORECASE):
62 | priority = "2"
63 | elif re.search(r'\b(low)\s+priority\b', query, re.IGNORECASE):
64 | priority = "3"
65 |
66 | # Extract state if mentioned
67 | state = None
68 | if re.search(r'\b(new|open)\b', query, re.IGNORECASE):
69 | state = "1"
70 | elif re.search(r'\b(in progress|working)\b', query, re.IGNORECASE):
71 | state = "2"
72 | elif re.search(r'\b(closed|resolved)\b', query, re.IGNORECASE):
73 | state = "7"
74 |
75 | # Build the query string
76 | query_parts = []
77 | if search_term:
78 | query_parts.append(f"123TEXTQUERY321={search_term}")
79 | if priority:
80 | query_parts.append(f"priority={priority}")
81 | if state:
82 | query_parts.append(f"state={state}")
83 |
84 | query_string = "^".join(query_parts) if query_parts else ""
85 |
86 | return {
87 | "table": table,
88 | "query": query_string,
89 | "limit": 10
90 | }
91 |
92 | @staticmethod
93 | def parse_update_command(command: str) -> Tuple[str, Dict[str, Any]]:
94 | """
95 | Parse a natural language update command
96 |
97 | Examples:
98 | - "Update incident INC0010001 saying I'm working on it"
99 | - "Set incident INC0010002 to in progress"
100 | - "Close incident INC0010003 with resolution: fixed the issue"
101 |
102 | Returns:
103 | Tuple of (record_number, updates_dict)
104 | """
105 | # Extract record number
106 | number_match = re.search(r'(INC\d+|PRB\d+|CHG\d+|TASK\d+)', command, re.IGNORECASE)
107 | if not number_match:
108 | raise ValueError("No record number found in command")
109 |
110 | record_number = number_match.group(1).upper()
111 |
112 | # Initialize updates dictionary
113 | updates = {}
114 |
115 | # Check for state changes
116 | if re.search(r'\b(working on|in progress|assign)\b', command, re.IGNORECASE):
117 | updates["state"] = 2 # In Progress
118 | elif re.search(r'\b(resolve|resolved|fix|fixed)\b', command, re.IGNORECASE):
119 | updates["state"] = 6 # Resolved
120 | elif re.search(r'\b(close|closed)\b', command, re.IGNORECASE):
121 | updates["state"] = 7 # Closed
122 |
123 | # Extract comments or work notes
124 | comment_match = re.search(r'(?:saying|comment|note|with comment|with note)(?:s|)\s*:?\s*(.+?)(?:$|\.(?:\s|$))', command, re.IGNORECASE)
125 | if comment_match:
126 | comment_text = comment_match.group(1).strip()
127 | # Determine if this should be a comment or work note
128 | if re.search(r'\b(work note|internal|private)\b', command, re.IGNORECASE):
129 | updates["work_notes"] = comment_text
130 | else:
131 | updates["comments"] = comment_text
132 |
133 | # Extract close notes if closing
134 | if "state" in updates and updates["state"] in [6, 7]:
135 | close_match = re.search(r'(?:with resolution|resolution|close note|resolve with)(?:s|)\s*:?\s*(.+?)(?:$|\.(?:\s|$))', command, re.IGNORECASE)
136 | if close_match:
137 | updates["close_notes"] = close_match.group(1).strip()
138 | updates["close_code"] = "Solved (Permanently)"
139 |
140 | return record_number, updates
141 |
142 | @staticmethod
143 | def parse_script_update(command: str) -> Tuple[str, str, str]:
144 | """
145 | Parse a command to update a ServiceNow script file
146 |
147 | Examples:
148 | - "update @my_script.js, it's a script include"
149 | - "update @business_rule.js, it's a business rule"
150 |
151 | Returns:
152 | Tuple of (filename, script_type, script_content)
153 | """
154 | # Extract filename
155 | filename_match = re.search(r'@([^\s,]+)', command)
156 | if not filename_match:
157 | raise ValueError("No filename found in command")
158 |
159 | filename = filename_match.group(1)
160 |
161 | # Extract script type
162 | script_types = {
163 | "script include": "sys_script_include",
164 | "business rule": "sys_script",
165 | "client script": "sys_script_client",
166 | "ui script": "sys_ui_script",
167 | "ui action": "sys_ui_action",
168 | "ui page": "sys_ui_page",
169 | "ui macro": "sys_ui_macro",
170 | "scheduled job": "sysauto_script",
171 | "fix script": "sys_script_fix"
172 | }
173 |
174 | script_type = None
175 | for type_name, table_name in script_types.items():
176 | if re.search(rf"\b{type_name}\b", command, re.IGNORECASE):
177 | script_type = table_name
178 | break
179 |
180 | if not script_type:
181 | # Default to script include if not specified
182 | script_type = "sys_script_include"
183 |
184 | # The script content will be provided separately
185 | return filename, script_type, ""
186 |
```
--------------------------------------------------------------------------------
/servicenow-mcp.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | ServiceNow MCP Server
3 |
4 | This module provides a Model Context Protocol (MCP) server that interfaces with ServiceNow.
5 | It allows AI agents to access and manipulate ServiceNow data through a secure API.
6 | """
7 |
8 | import os
9 | import json
10 | import asyncio
11 | import logging
12 | from datetime import datetime
13 | from enum import Enum
14 | from typing import Dict, List, Optional, Any, Union, Literal
15 |
16 | import requests
17 | import httpx
18 | from pydantic import BaseModel, Field, field_validator
19 |
20 | from mcp.server.fastmcp import FastMCP, Context
21 | from mcp.server.fastmcp.utilities.logging import get_logger
22 |
23 | logger = get_logger(__name__)
24 |
25 | # ServiceNow API models
26 | class IncidentState(int, Enum):
27 | NEW = 1
28 | IN_PROGRESS = 2
29 | ON_HOLD = 3
30 | RESOLVED = 6
31 | CLOSED = 7
32 | CANCELED = 8
33 |
34 | class IncidentPriority(int, Enum):
35 | CRITICAL = 1
36 | HIGH = 2
37 | MODERATE = 3
38 | LOW = 4
39 | PLANNING = 5
40 |
41 | class IncidentUrgency(int, Enum):
42 | HIGH = 1
43 | MEDIUM = 2
44 | LOW = 3
45 |
46 | class IncidentImpact(int, Enum):
47 | HIGH = 1
48 | MEDIUM = 2
49 | LOW = 3
50 |
51 | class IncidentCreate(BaseModel):
52 | """Model for creating a new incident"""
53 | short_description: str = Field(..., description="A brief description of the incident")
54 | description: str = Field(..., description="A detailed description of the incident")
55 | caller_id: Optional[str] = Field(None, description="The sys_id or name of the caller")
56 | category: Optional[str] = Field(None, description="The incident category")
57 | subcategory: Optional[str] = Field(None, description="The incident subcategory")
58 | urgency: Optional[IncidentUrgency] = Field(IncidentUrgency.MEDIUM, description="The urgency of the incident")
59 | impact: Optional[IncidentImpact] = Field(IncidentImpact.MEDIUM, description="The impact of the incident")
60 | assignment_group: Optional[str] = Field(None, description="The sys_id or name of the assignment group")
61 | assigned_to: Optional[str] = Field(None, description="The sys_id or name of the assignee")
62 |
63 | class IncidentUpdate(BaseModel):
64 | """Model for updating an existing incident"""
65 | short_description: Optional[str] = Field(None, description="A brief description of the incident")
66 | description: Optional[str] = Field(None, description="A detailed description of the incident")
67 | caller_id: Optional[str] = Field(None, description="The sys_id or name of the caller")
68 | category: Optional[str] = Field(None, description="The incident category")
69 | subcategory: Optional[str] = Field(None, description="The incident subcategory")
70 | urgency: Optional[IncidentUrgency] = Field(None, description="The urgency of the incident")
71 | impact: Optional[IncidentImpact] = Field(None, description="The impact of the incident")
72 | state: Optional[IncidentState] = Field(None, description="The state of the incident")
73 | assignment_group: Optional[str] = Field(None, description="The sys_id or name of the assignment group")
74 | assigned_to: Optional[str] = Field(None, description="The sys_id or name of the assignee")
75 | work_notes: Optional[str] = Field(None, description="Work notes to add to the incident (internal)")
76 | comments: Optional[str] = Field(None, description="Customer visible comments to add to the incident")
77 |
78 | @field_validator('work_notes', 'comments')
79 | @classmethod
80 | def validate_not_empty(cls, v):
81 | if v is not None and v.strip() == '':
82 | raise ValueError("Cannot be an empty string")
83 | return v
84 |
85 | class Config:
86 | use_enum_values = True
87 |
88 | class QueryOptions(BaseModel):
89 | """Options for querying ServiceNow records"""
90 | limit: int = Field(10, description="Maximum number of records to return", ge=1, le=1000)
91 | offset: int = Field(0, description="Number of records to skip", ge=0)
92 | fields: Optional[List[str]] = Field(None, description="List of fields to return")
93 | query: Optional[str] = Field(None, description="ServiceNow encoded query string")
94 | order_by: Optional[str] = Field(None, description="Field to order results by")
95 | order_direction: Optional[Literal["asc", "desc"]] = Field("desc", description="Order direction")
96 |
97 | class Authentication:
98 | """Base class for ServiceNow authentication methods"""
99 |
100 | async def get_headers(self) -> Dict[str, str]:
101 | """Get authentication headers for ServiceNow API requests"""
102 | raise NotImplementedError("Subclasses must implement this method")
103 |
104 | class BasicAuth(Authentication):
105 | """Basic authentication for ServiceNow"""
106 |
107 | def __init__(self, username: str, password: str):
108 | self.username = username
109 | self.password = password
110 |
111 | async def get_headers(self) -> Dict[str, str]:
112 | """Get authentication headers for ServiceNow API requests"""
113 | return {}
114 |
115 | def get_auth(self) -> tuple:
116 | """Get authentication tuple for requests"""
117 | return (self.username, self.password)
118 |
119 | class TokenAuth(Authentication):
120 | """Token authentication for ServiceNow"""
121 |
122 | def __init__(self, token: str):
123 | self.token = token
124 |
125 | async def get_headers(self) -> Dict[str, str]:
126 | """Get authentication headers for ServiceNow API requests"""
127 | return {"Authorization": f"Bearer {self.token}"}
128 |
129 | def get_auth(self) -> None:
130 | """Get authentication tuple for requests"""
131 | return None
132 |
133 | class OAuthAuth(Authentication):
134 | """OAuth authentication for ServiceNow"""
135 |
136 | def __init__(self, client_id: str, client_secret: str, username: str, password: str,
137 | instance_url: str, token: Optional[str] = None, refresh_token: Optional[str] = None,
138 | token_expiry: Optional[datetime] = None):
139 | self.client_id = client_id
140 | self.client_secret = client_secret
141 | self.username = username
142 | self.password = password
143 | self.instance_url = instance_url
144 | self.token = token
145 | self.refresh_token = refresh_token
146 | self.token_expiry = token_expiry
147 |
148 | async def get_headers(self) -> Dict[str, str]:
149 | """Get authentication headers for ServiceNow API requests"""
150 | if self.token is None or (self.token_expiry and datetime.now() > self.token_expiry):
151 | await self.refresh()
152 |
153 | return {"Authorization": f"Bearer {self.token}"}
154 |
155 | def get_auth(self) -> None:
156 | """Get authentication tuple for requests"""
157 | return None
158 |
159 | async def refresh(self):
160 | """Refresh the OAuth token"""
161 | if self.refresh_token:
162 | # Try refresh flow first
163 | data = {
164 | "grant_type": "refresh_token",
165 | "client_id": self.client_id,
166 | "client_secret": self.client_secret,
167 | "refresh_token": self.refresh_token
168 | }
169 | else:
170 | # Fall back to password flow
171 | data = {
172 | "grant_type": "password",
173 | "client_id": self.client_id,
174 | "client_secret": self.client_secret,
175 | "username": self.username,
176 | "password": self.password
177 | }
178 |
179 | token_url = f"{self.instance_url}/oauth_token.do"
180 | async with httpx.AsyncClient() as client:
181 | response = await client.post(token_url, data=data)
182 | response.raise_for_status()
183 | result = response.json()
184 |
185 | self.token = result["access_token"]
186 | self.refresh_token = result.get("refresh_token")
187 | expires_in = result.get("expires_in", 1800) # Default 30 minutes
188 | self.token_expiry = datetime.now().timestamp() + expires_in
189 |
190 | class ServiceNowClient:
191 | """Client for interacting with ServiceNow API"""
192 |
193 | def __init__(self, instance_url: str, auth: Authentication):
194 | self.instance_url = instance_url.rstrip('/')
195 | self.auth = auth
196 | self.client = httpx.AsyncClient()
197 |
198 | async def close(self):
199 | """Close the HTTP client"""
200 | await self.client.aclose()
201 |
202 | async def request(self, method: str, path: str,
203 | params: Optional[Dict[str, Any]] = None,
204 | json_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
205 | """Make a request to the ServiceNow API"""
206 | url = f"{self.instance_url}{path}"
207 | headers = await self.auth.get_headers()
208 | headers["Accept"] = "application/json"
209 |
210 | if isinstance(self.auth, BasicAuth):
211 | auth = self.auth.get_auth()
212 | else:
213 | auth = None
214 |
215 | try:
216 | response = await self.client.request(
217 | method=method,
218 | url=url,
219 | params=params,
220 | json=json_data,
221 | headers=headers,
222 | auth=auth
223 | )
224 | response.raise_for_status()
225 | return response.json()
226 | except httpx.HTTPStatusError as e:
227 | logger.error(f"ServiceNow API error: {e.response.text}")
228 | raise
229 |
230 | async def get_record(self, table: str, sys_id: str) -> Dict[str, Any]:
231 | """Get a record by sys_id"""
232 | return await self.request("GET", f"/api/now/table/{table}/{sys_id}")
233 |
234 | async def get_records(self, table: str, options: QueryOptions = None) -> Dict[str, Any]:
235 | """Get records with query options"""
236 | if options is None:
237 | options = QueryOptions()
238 |
239 | params = {
240 | "sysparm_limit": options.limit,
241 | "sysparm_offset": options.offset
242 | }
243 |
244 | if options.fields:
245 | params["sysparm_fields"] = ",".join(options.fields)
246 |
247 | if options.query:
248 | params["sysparm_query"] = options.query
249 |
250 | if options.order_by:
251 | direction = "desc" if options.order_direction == "desc" else "asc"
252 | params["sysparm_order_by"] = f"{options.order_by}^{direction}"
253 |
254 | return await self.request("GET", f"/api/now/table/{table}", params=params)
255 |
256 | async def create_record(self, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
257 | """Create a new record"""
258 | return await self.request("POST", f"/api/now/table/{table}", json_data=data)
259 |
260 | async def update_record(self, table: str, sys_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
261 | """Update an existing record"""
262 | return await self.request("PUT", f"/api/now/table/{table}/{sys_id}", json_data=data)
263 |
264 | async def delete_record(self, table: str, sys_id: str) -> Dict[str, Any]:
265 | """Delete a record"""
266 | return await self.request("DELETE", f"/api/now/table/{table}/{sys_id}")
267 |
268 | async def get_incident_by_number(self, number: str) -> Dict[str, Any]:
269 | """Get an incident by its number"""
270 | result = await self.request("GET", f"/api/now/table/incident",
271 | params={"sysparm_query": f"number={number}", "sysparm_limit": 1})
272 | if result.get("result") and len(result["result"]) > 0:
273 | return result["result"][0]
274 | return None
275 |
276 | async def search(self, query: str, table: str = "incident", limit: int = 10) -> Dict[str, Any]:
277 | """Search for records using text query"""
278 | return await self.request("GET", f"/api/now/table/{table}",
279 | params={"sysparm_query": f"123TEXTQUERY321={query}", "sysparm_limit": limit})
280 |
281 | async def get_available_tables(self) -> List[str]:
282 | """Get a list of available tables"""
283 | result = await self.request("GET", "/api/now/table/sys_db_object",
284 | params={"sysparm_fields": "name,label", "sysparm_limit": 100})
285 | return result.get("result", [])
286 |
287 | async def get_table_schema(self, table: str) -> Dict[str, Any]:
288 | """Get the schema for a table"""
289 | result = await self.request("GET", f"/api/now/ui/meta/{table}")
290 | return result
291 |
292 |
293 | class ServiceNowMCP:
294 | """ServiceNow MCP Server"""
295 |
296 | def __init__(self,
297 | instance_url: str,
298 | auth: Authentication,
299 | name: str = "ServiceNow MCP"):
300 | self.client = ServiceNowClient(instance_url, auth)
301 | self.mcp = FastMCP(name, dependencies=[
302 | "requests",
303 | "httpx",
304 | "pydantic"
305 | ])
306 |
307 | # Register resources
308 | self.mcp.resource("servicenow://incidents")(self.list_incidents)
309 | self.mcp.resource("servicenow://incidents/{number}")(self.get_incident)
310 | self.mcp.resource("servicenow://users")(self.list_users)
311 | self.mcp.resource("servicenow://knowledge")(self.list_knowledge)
312 | self.mcp.resource("servicenow://tables")(self.get_tables)
313 | self.mcp.resource("servicenow://tables/{table}")(self.get_table_records)
314 | self.mcp.resource("servicenow://schema/{table}")(self.get_table_schema)
315 |
316 | # Register tools
317 | self.mcp.tool(name="create_incident")(self.create_incident)
318 | self.mcp.tool(name="update_incident")(self.update_incident)
319 | self.mcp.tool(name="search_records")(self.search_records)
320 | self.mcp.tool(name="get_record")(self.get_record)
321 | self.mcp.tool(name="perform_query")(self.perform_query)
322 | self.mcp.tool(name="add_comment")(self.add_comment)
323 | self.mcp.tool(name="add_work_notes")(self.add_work_notes)
324 |
325 | # Register prompts
326 | self.mcp.prompt(name="analyze_incident")(self.incident_analysis_prompt)
327 | self.mcp.prompt(name="create_incident_prompt")(self.create_incident_prompt)
328 |
329 | async def close(self):
330 | """Close the ServiceNow client"""
331 | await self.client.close()
332 |
333 | def run(self, transport: str = "stdio"):
334 | """Run the ServiceNow MCP server"""
335 | try:
336 | self.mcp.run(transport=transport)
337 | finally:
338 | asyncio.run(self.close())
339 |
340 | # Resource handlers
341 | async def list_incidents(self) -> str:
342 | """List recent incidents in ServiceNow"""
343 | options = QueryOptions(limit=10)
344 | result = await self.client.get_records("incident", options)
345 | return json.dumps(result, indent=2)
346 |
347 | async def get_incident(self, number: str) -> str:
348 | """Get a specific incident by number"""
349 | incident = await self.client.get_incident_by_number(number)
350 | if incident:
351 | return json.dumps({"result": incident}, indent=2)
352 | return json.dumps({"result": "Incident not found"})
353 |
354 | async def list_users(self) -> str:
355 | """List users in ServiceNow"""
356 | options = QueryOptions(limit=10)
357 | result = await self.client.get_records("sys_user", options)
358 | return json.dumps(result, indent=2)
359 |
360 | async def list_knowledge(self) -> str:
361 | """List knowledge articles in ServiceNow"""
362 | options = QueryOptions(limit=10)
363 | result = await self.client.get_records("kb_knowledge", options)
364 | return json.dumps(result, indent=2)
365 |
366 | async def get_tables(self) -> str:
367 | """Get a list of available tables"""
368 | result = await self.client.get_available_tables()
369 | return json.dumps({"result": result}, indent=2)
370 |
371 | async def get_table_records(self, table: str) -> str:
372 | """Get records from a specific table"""
373 | options = QueryOptions(limit=10)
374 | result = await self.client.get_records(table, options)
375 | return json.dumps(result, indent=2)
376 |
377 | async def get_table_schema(self, table: str) -> str:
378 | """Get the schema for a table"""
379 | result = await self.client.get_table_schema(table)
380 | return json.dumps(result, indent=2)
381 |
382 | # Tool handlers
383 | async def create_incident(self,
384 | incident: IncidentCreate,
385 | ctx: Context = None) -> str:
386 | """
387 | Create a new incident in ServiceNow
388 |
389 | Args:
390 | incident: The incident details to create
391 | ctx: Optional context object for progress reporting
392 |
393 | Returns:
394 | JSON response from ServiceNow
395 | """
396 | if ctx:
397 | await ctx.info(f"Creating incident: {incident.short_description}")
398 |
399 | data = incident.dict(exclude_none=True)
400 | result = await self.client.create_record("incident", data)
401 |
402 | if ctx:
403 | await ctx.info(f"Created incident: {result['result']['number']}")
404 |
405 | return json.dumps(result, indent=2)
406 |
407 | async def update_incident(self,
408 | number: str,
409 | updates: IncidentUpdate,
410 | ctx: Context = None) -> str:
411 | """
412 | Update an existing incident in ServiceNow
413 |
414 | Args:
415 | number: The incident number (INC0010001)
416 | updates: The fields to update
417 | ctx: Optional context object for progress reporting
418 |
419 | Returns:
420 | JSON response from ServiceNow
421 | """
422 | # First, get the sys_id for the incident number
423 | if ctx:
424 | await ctx.info(f"Looking up incident: {number}")
425 |
426 | incident = await self.client.get_incident_by_number(number)
427 |
428 | if not incident:
429 | error_message = f"Incident {number} not found"
430 | if ctx:
431 | await ctx.error(error_message)
432 | return json.dumps({"error": error_message})
433 |
434 | sys_id = incident['sys_id']
435 |
436 | # Now update the incident
437 | if ctx:
438 | await ctx.info(f"Updating incident: {number}")
439 |
440 | data = updates.dict(exclude_none=True)
441 | result = await self.client.update_record("incident", sys_id, data)
442 |
443 | return json.dumps(result, indent=2)
444 |
445 | async def search_records(self,
446 | query: str,
447 | table: str = "incident",
448 | limit: int = 10,
449 | ctx: Context = None) -> str:
450 | """
451 | Search for records in ServiceNow using text query
452 |
453 | Args:
454 | query: Text to search for
455 | table: Table to search in
456 | limit: Maximum number of results to return
457 | ctx: Optional context object for progress reporting
458 |
459 | Returns:
460 | JSON response containing matching records
461 | """
462 | if ctx:
463 | await ctx.info(f"Searching {table} for: {query}")
464 |
465 | result = await self.client.search(query, table, limit)
466 | return json.dumps(result, indent=2)
467 |
468 | async def get_record(self,
469 | table: str,
470 | sys_id: str,
471 | ctx: Context = None) -> str:
472 | """
473 | Get a specific record by sys_id
474 |
475 | Args:
476 | table: Table to query
477 | sys_id: System ID of the record
478 | ctx: Optional context object for progress reporting
479 |
480 | Returns:
481 | JSON response containing the record
482 | """
483 | if ctx:
484 | await ctx.info(f"Getting {table} record: {sys_id}")
485 |
486 | result = await self.client.get_record(table, sys_id)
487 | return json.dumps(result, indent=2)
488 |
489 | async def perform_query(self,
490 | table: str,
491 | query: str = "",
492 | limit: int = 10,
493 | offset: int = 0,
494 | fields: Optional[List[str]] = None,
495 | ctx: Context = None) -> str:
496 | """
497 | Perform a query against ServiceNow
498 |
499 | Args:
500 | table: Table to query
501 | query: Encoded query string (ServiceNow syntax)
502 | limit: Maximum number of results to return
503 | offset: Number of records to skip
504 | fields: List of fields to return (or all fields if None)
505 | ctx: Optional context object for progress reporting
506 |
507 | Returns:
508 | JSON response containing query results
509 | """
510 | if ctx:
511 | await ctx.info(f"Querying {table} with: {query}")
512 |
513 | options = QueryOptions(
514 | limit=limit,
515 | offset=offset,
516 | fields=fields,
517 | query=query
518 | )
519 |
520 | result = await self.client.get_records(table, options)
521 | return json.dumps(result, indent=2)
522 |
523 | async def add_comment(self,
524 | number: str,
525 | comment: str,
526 | ctx: Context = None) -> str:
527 | """
528 | Add a comment to an incident (customer visible)
529 |
530 | Args:
531 | number: Incident number
532 | comment: Comment to add
533 | ctx: Optional context object for progress reporting
534 |
535 | Returns:
536 | JSON response from ServiceNow
537 | """
538 | if ctx:
539 | await ctx.info(f"Adding comment to incident: {number}")
540 |
541 | incident = await self.client.get_incident_by_number(number)
542 |
543 | if not incident:
544 | error_message = f"Incident {number} not found"
545 | if ctx:
546 | await ctx.error(error_message)
547 | return json.dumps({"error": error_message})
548 |
549 | sys_id = incident['sys_id']
550 |
551 | # Add the comment
552 | update = {"comments": comment}
553 | result = await self.client.update_record("incident", sys_id, update)
554 |
555 | return json.dumps(result, indent=2)
556 |
557 | async def add_work_notes(self,
558 | number: str,
559 | work_notes: str,
560 | ctx: Context = None) -> str:
561 | """
562 | Add work notes to an incident (internal)
563 |
564 | Args:
565 | number: Incident number
566 | work_notes: Work notes to add
567 | ctx: Optional context object for progress reporting
568 |
569 | Returns:
570 | JSON response from ServiceNow
571 | """
572 | if ctx:
573 | await ctx.info(f"Adding work notes to incident: {number}")
574 |
575 | incident = await self.client.get_incident_by_number(number)
576 |
577 | if not incident:
578 | error_message = f"Incident {number} not found"
579 | if ctx:
580 | await ctx.error(error_message)
581 | return json.dumps({"error": error_message})
582 |
583 | sys_id = incident['sys_id']
584 |
585 | # Add the work notes
586 | update = {"work_notes": work_notes}
587 | result = await self.client.update_record("incident", sys_id, update)
588 |
589 | return json.dumps(result, indent=2)
590 |
591 | # Prompt templates
592 | def incident_analysis_prompt(self, incident_number: str) -> str:
593 | """Create a prompt to analyze a ServiceNow incident
594 |
595 | Args:
596 | incident_number: The incident number to analyze (e.g., INC0010001)
597 |
598 | Returns:
599 | Prompt text for analyzing the incident
600 | """
601 | return f"""
602 | Please analyze the following ServiceNow incident {incident_number}.
603 |
604 | First, call the appropriate tool to fetch the incident details using get_incident.
605 |
606 | Then, provide a comprehensive analysis with the following sections:
607 |
608 | 1. Summary: A brief overview of the incident
609 | 2. Impact Assessment: Analysis of the impact based on the severity, priority, and affected users
610 | 3. Root Cause Analysis: Potential causes based on available information
611 | 4. Resolution Recommendations: Suggested next steps to resolve the incident
612 | 5. SLA Status: Whether the incident is at risk of breaching SLAs
613 |
614 | Use a professional and clear tone appropriate for IT service management.
615 | """
616 |
617 | def create_incident_prompt(self) -> str:
618 | """Create a prompt for incident creation guidance
619 |
620 | Returns:
621 | Prompt text for helping users create an incident
622 | """
623 | return """
624 | I'll help you create a new ServiceNow incident. Please provide the following information:
625 |
626 | 1. Short Description: A brief title for the incident (required)
627 | 2. Detailed Description: A thorough explanation of the issue (required)
628 | 3. Caller: The person reporting the issue (optional)
629 | 4. Category and Subcategory: The type of issue (optional)
630 | 5. Impact (1-High, 2-Medium, 3-Low): How broadly this affects users (optional)
631 | 6. Urgency (1-High, 2-Medium, 3-Low): How time-sensitive this issue is (optional)
632 |
633 | After collecting this information, I'll use the create_incident tool to submit the incident to ServiceNow.
634 | """
635 |
636 |
637 | # Factory functions for creating authentication objects
638 | def create_basic_auth(username: str, password: str) -> BasicAuth:
639 | """Create BasicAuth object for ServiceNow authentication"""
640 | return BasicAuth(username, password)
641 |
642 | def create_token_auth(token: str) -> TokenAuth:
643 | """Create TokenAuth object for ServiceNow authentication"""
644 | return TokenAuth(token)
645 |
646 | def create_oauth_auth(client_id: str, client_secret: str,
647 | username: str, password: str,
648 | instance_url: str) -> OAuthAuth:
649 | """Create OAuthAuth object for ServiceNow authentication"""
650 | return OAuthAuth(client_id, client_secret, username, password, instance_url)
651 |
652 | # Main function for running the server from the command line
653 | def main():
654 | """Run the ServiceNow MCP server from the command line"""
655 | import argparse
656 | import sys
657 |
658 | parser = argparse.ArgumentParser(description="ServiceNow MCP Server")
659 | parser.add_argument("--url", help="ServiceNow instance URL", default=os.environ.get("SERVICENOW_INSTANCE_URL"))
660 | parser.add_argument("--transport", help="Transport protocol (stdio or sse)", default="stdio", choices=["stdio", "sse"])
661 |
662 | # Authentication options
663 | auth_group = parser.add_argument_group("Authentication")
664 | auth_group.add_argument("--username", help="ServiceNow username", default=os.environ.get("SERVICENOW_USERNAME"))
665 | auth_group.add_argument("--password", help="ServiceNow password", default=os.environ.get("SERVICENOW_PASSWORD"))
666 | auth_group.add_argument("--token", help="ServiceNow token", default=os.environ.get("SERVICENOW_TOKEN"))
667 | auth_group.add_argument("--client-id", help="OAuth client ID", default=os.environ.get("SERVICENOW_CLIENT_ID"))
668 | auth_group.add_argument("--client-secret", help="OAuth client secret", default=os.environ.get("SERVICENOW_CLIENT_SECRET"))
669 |
670 | args = parser.parse_args()
671 |
672 | # Check required parameters
673 | if not args.url:
674 | print("Error: ServiceNow instance URL is required")
675 | print("Set SERVICENOW_INSTANCE_URL environment variable or use --url")
676 | sys.exit(1)
677 |
678 | # Determine authentication method
679 | auth = None
680 | if args.token:
681 | auth = create_token_auth(args.token)
682 | elif args.client_id and args.client_secret and args.username and args.password:
683 | auth = create_oauth_auth(args.client_id, args.client_secret, args.username, args.password, args.url)
684 | elif args.username and args.password:
685 | auth = create_basic_auth(args.username, args.password)
686 | else:
687 | print("Error: Authentication credentials required")
688 | print("Either provide username/password, token, or OAuth credentials")
689 | sys.exit(1)
690 |
691 | # Create and run the server
692 | server = ServiceNowMCP(instance_url=args.url, auth=auth)
693 | server.run(transport=args.transport)
694 |
695 | # Entry point
696 | if __name__ == "__main__":
697 | main()
698 |
```
--------------------------------------------------------------------------------
/mcp_server_servicenow/server.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | ServiceNow MCP Server
3 |
4 | This module provides a Model Context Protocol (MCP) server that interfaces with ServiceNow.
5 | It allows AI agents to access and manipulate ServiceNow data through a secure API.
6 | """
7 |
8 | import os
9 | import json
10 | import asyncio
11 | import logging
12 | import re
13 | from datetime import datetime
14 | from enum import Enum
15 | from typing import Dict, List, Optional, Any, Union, Literal, Tuple
16 |
17 | import requests
18 | import httpx
19 | from pydantic import BaseModel, Field, field_validator
20 |
21 | from mcp_server_servicenow.nlp import NLPProcessor
22 |
23 | from mcp.server.fastmcp import FastMCP, Context
24 | from mcp.server.fastmcp.utilities.logging import get_logger
25 |
26 | logger = get_logger(__name__)
27 |
28 | # ServiceNow API models
29 | class IncidentState(int, Enum):
30 | NEW = 1
31 | IN_PROGRESS = 2
32 | ON_HOLD = 3
33 | RESOLVED = 6
34 | CLOSED = 7
35 | CANCELED = 8
36 |
37 | class IncidentPriority(int, Enum):
38 | CRITICAL = 1
39 | HIGH = 2
40 | MODERATE = 3
41 | LOW = 4
42 | PLANNING = 5
43 |
44 | class IncidentUrgency(int, Enum):
45 | HIGH = 1
46 | MEDIUM = 2
47 | LOW = 3
48 |
49 | class IncidentImpact(int, Enum):
50 | HIGH = 1
51 | MEDIUM = 2
52 | LOW = 3
53 |
54 | class IncidentCreate(BaseModel):
55 | """Model for creating a new incident"""
56 | short_description: str = Field(..., description="A brief description of the incident")
57 | description: str = Field(..., description="A detailed description of the incident")
58 | caller_id: Optional[str] = Field(None, description="The sys_id or name of the caller")
59 | category: Optional[str] = Field(None, description="The incident category")
60 | subcategory: Optional[str] = Field(None, description="The incident subcategory")
61 | urgency: Optional[IncidentUrgency] = Field(IncidentUrgency.MEDIUM, description="The urgency of the incident")
62 | impact: Optional[IncidentImpact] = Field(IncidentImpact.MEDIUM, description="The impact of the incident")
63 | assignment_group: Optional[str] = Field(None, description="The sys_id or name of the assignment group")
64 | assigned_to: Optional[str] = Field(None, description="The sys_id or name of the assignee")
65 |
66 | class IncidentUpdate(BaseModel):
67 | """Model for updating an existing incident"""
68 | short_description: Optional[str] = Field(None, description="A brief description of the incident")
69 | description: Optional[str] = Field(None, description="A detailed description of the incident")
70 | caller_id: Optional[str] = Field(None, description="The sys_id or name of the caller")
71 | category: Optional[str] = Field(None, description="The incident category")
72 | subcategory: Optional[str] = Field(None, description="The incident subcategory")
73 | urgency: Optional[IncidentUrgency] = Field(None, description="The urgency of the incident")
74 | impact: Optional[IncidentImpact] = Field(None, description="The impact of the incident")
75 | state: Optional[IncidentState] = Field(None, description="The state of the incident")
76 | assignment_group: Optional[str] = Field(None, description="The sys_id or name of the assignment group")
77 | assigned_to: Optional[str] = Field(None, description="The sys_id or name of the assignee")
78 | work_notes: Optional[str] = Field(None, description="Work notes to add to the incident (internal)")
79 | comments: Optional[str] = Field(None, description="Customer visible comments to add to the incident")
80 |
81 | @field_validator('work_notes', 'comments')
82 | @classmethod
83 | def validate_not_empty(cls, v):
84 | if v is not None and v.strip() == '':
85 | raise ValueError("Cannot be an empty string")
86 | return v
87 |
88 | class Config:
89 | use_enum_values = True
90 |
91 | class QueryOptions(BaseModel):
92 | """Options for querying ServiceNow records"""
93 | limit: int = Field(10, description="Maximum number of records to return", ge=1, le=1000)
94 | offset: int = Field(0, description="Number of records to skip", ge=0)
95 | fields: Optional[List[str]] = Field(None, description="List of fields to return")
96 | query: Optional[str] = Field(None, description="ServiceNow encoded query string")
97 | order_by: Optional[str] = Field(None, description="Field to order results by")
98 | order_direction: Optional[Literal["asc", "desc"]] = Field("desc", description="Order direction")
99 |
100 | class Authentication:
101 | """Base class for ServiceNow authentication methods"""
102 |
103 | async def get_headers(self) -> Dict[str, str]:
104 | """Get authentication headers for ServiceNow API requests"""
105 | raise NotImplementedError("Subclasses must implement this method")
106 |
107 | class BasicAuth(Authentication):
108 | """Basic authentication for ServiceNow"""
109 |
110 | def __init__(self, username: str, password: str):
111 | self.username = username
112 | self.password = password
113 |
114 | async def get_headers(self) -> Dict[str, str]:
115 | """Get authentication headers for ServiceNow API requests"""
116 | return {}
117 |
118 | def get_auth(self) -> tuple:
119 | """Get authentication tuple for requests"""
120 | return (self.username, self.password)
121 |
122 | class TokenAuth(Authentication):
123 | """Token authentication for ServiceNow"""
124 |
125 | def __init__(self, token: str):
126 | self.token = token
127 |
128 | async def get_headers(self) -> Dict[str, str]:
129 | """Get authentication headers for ServiceNow API requests"""
130 | return {"Authorization": f"Bearer {self.token}"}
131 |
132 | def get_auth(self) -> None:
133 | """Get authentication tuple for requests"""
134 | return None
135 |
136 | class OAuthAuth(Authentication):
137 | """OAuth authentication for ServiceNow"""
138 |
139 | def __init__(self, client_id: str, client_secret: str, username: str, password: str,
140 | instance_url: str, token: Optional[str] = None, refresh_token: Optional[str] = None,
141 | token_expiry: Optional[datetime] = None):
142 | self.client_id = client_id
143 | self.client_secret = client_secret
144 | self.username = username
145 | self.password = password
146 | self.instance_url = instance_url
147 | self.token = token
148 | self.refresh_token = refresh_token
149 | self.token_expiry = token_expiry
150 |
151 | async def get_headers(self) -> Dict[str, str]:
152 | """Get authentication headers for ServiceNow API requests"""
153 | if self.token is None or (self.token_expiry and datetime.now() > self.token_expiry):
154 | await self.refresh()
155 |
156 | return {"Authorization": f"Bearer {self.token}"}
157 |
158 | def get_auth(self) -> None:
159 | """Get authentication tuple for requests"""
160 | return None
161 |
162 | async def refresh(self):
163 | """Refresh the OAuth token"""
164 | if self.refresh_token:
165 | # Try refresh flow first
166 | data = {
167 | "grant_type": "refresh_token",
168 | "client_id": self.client_id,
169 | "client_secret": self.client_secret,
170 | "refresh_token": self.refresh_token
171 | }
172 | else:
173 | # Fall back to password flow
174 | data = {
175 | "grant_type": "password",
176 | "client_id": self.client_id,
177 | "client_secret": self.client_secret,
178 | "username": self.username,
179 | "password": self.password
180 | }
181 |
182 | token_url = f"{self.instance_url}/oauth_token.do"
183 | async with httpx.AsyncClient() as client:
184 | response = await client.post(token_url, data=data)
185 | response.raise_for_status()
186 | result = response.json()
187 |
188 | self.token = result["access_token"]
189 | self.refresh_token = result.get("refresh_token")
190 | expires_in = result.get("expires_in", 1800) # Default 30 minutes
191 | self.token_expiry = datetime.now().timestamp() + expires_in
192 |
193 | class ServiceNowClient:
194 | """Client for interacting with ServiceNow API"""
195 |
196 | def __init__(self, instance_url: str, auth: Authentication):
197 | self.instance_url = instance_url.rstrip('/')
198 | self.auth = auth
199 | self.client = httpx.AsyncClient()
200 |
201 | async def close(self):
202 | """Close the HTTP client"""
203 | await self.client.aclose()
204 |
205 | async def request(self, method: str, path: str,
206 | params: Optional[Dict[str, Any]] = None,
207 | json_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
208 | """Make a request to the ServiceNow API"""
209 | url = f"{self.instance_url}{path}"
210 | headers = await self.auth.get_headers()
211 | headers["Accept"] = "application/json"
212 |
213 | if isinstance(self.auth, BasicAuth):
214 | auth = self.auth.get_auth()
215 | else:
216 | auth = None
217 |
218 | try:
219 | response = await self.client.request(
220 | method=method,
221 | url=url,
222 | params=params,
223 | json=json_data,
224 | headers=headers,
225 | auth=auth
226 | )
227 | response.raise_for_status()
228 | return response.json()
229 | except httpx.HTTPStatusError as e:
230 | logger.error(f"ServiceNow API error: {e.response.text}")
231 | raise
232 |
233 | async def get_record(self, table: str, sys_id: str) -> Dict[str, Any]:
234 | """Get a record by sys_id"""
235 | if table == "incident" and sys_id.startswith("INC"):
236 | # This is an incident number, not a sys_id
237 | logger.warning(f"Attempted to use get_record with incident number instead of sys_id: {sys_id}")
238 | logger.warning("Redirecting to get_incident_by_number method")
239 | result = await self.get_incident_by_number(sys_id)
240 | if result:
241 | return {"result": result}
242 | else:
243 | raise ValueError(f"Incident not found: {sys_id}")
244 | return await self.request("GET", f"/api/now/table/{table}/{sys_id}")
245 |
246 | async def get_records(self, table: str, options: QueryOptions = None) -> Dict[str, Any]:
247 | """Get records with query options"""
248 | if options is None:
249 | options = QueryOptions()
250 |
251 | params = {
252 | "sysparm_limit": options.limit,
253 | "sysparm_offset": options.offset
254 | }
255 |
256 | if options.fields:
257 | params["sysparm_fields"] = ",".join(options.fields)
258 |
259 | if options.query:
260 | params["sysparm_query"] = options.query
261 |
262 | if options.order_by:
263 | direction = "desc" if options.order_direction == "desc" else "asc"
264 | params["sysparm_order_by"] = f"{options.order_by}^{direction}"
265 |
266 | return await self.request("GET", f"/api/now/table/{table}", params=params)
267 |
268 | async def create_record(self, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
269 | """Create a new record"""
270 | return await self.request("POST", f"/api/now/table/{table}", json_data=data)
271 |
272 | async def update_record(self, table: str, sys_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
273 | """Update an existing record"""
274 | return await self.request("PUT", f"/api/now/table/{table}/{sys_id}", json_data=data)
275 |
276 | async def delete_record(self, table: str, sys_id: str) -> Dict[str, Any]:
277 | """Delete a record"""
278 | return await self.request("DELETE", f"/api/now/table/{table}/{sys_id}")
279 |
280 | async def get_incident_by_number(self, number: str) -> Dict[str, Any]:
281 | """Get an incident by its number"""
282 | result = await self.request("GET", f"/api/now/table/incident",
283 | params={"sysparm_query": f"number={number}", "sysparm_limit": 1})
284 | if result.get("result") and len(result["result"]) > 0:
285 | return result["result"][0]
286 | return None
287 |
288 | async def search(self, query: str, table: str = "incident", limit: int = 10) -> Dict[str, Any]:
289 | """Search for records using text query"""
290 | return await self.request("GET", f"/api/now/table/{table}",
291 | params={"sysparm_query": f"123TEXTQUERY321={query}", "sysparm_limit": limit})
292 |
293 | async def get_available_tables(self) -> List[str]:
294 | """Get a list of available tables"""
295 | result = await self.request("GET", "/api/now/table/sys_db_object",
296 | params={"sysparm_fields": "name,label", "sysparm_limit": 100})
297 | return result.get("result", [])
298 |
299 | async def get_table_schema(self, table: str) -> Dict[str, Any]:
300 | """Get the schema for a table"""
301 | result = await self.request("GET", f"/api/now/ui/meta/{table}")
302 | return result
303 |
304 |
305 | class ScriptUpdateModel(BaseModel):
306 | """Model for updating a ServiceNow script"""
307 | name: str = Field(..., description="The name of the script")
308 | script: str = Field(..., description="The script content")
309 | type: str = Field(..., description="The type of script (e.g., sys_script_include)")
310 | description: Optional[str] = Field(None, description="Description of the script")
311 |
312 | class ServiceNowMCP:
313 | """ServiceNow MCP Server"""
314 |
315 | def __init__(self,
316 | instance_url: str,
317 | auth: Authentication,
318 | name: str = "ServiceNow MCP"):
319 | self.client = ServiceNowClient(instance_url, auth)
320 | self.mcp = FastMCP(name, dependencies=[
321 | "requests",
322 | "httpx",
323 | "pydantic"
324 | ])
325 |
326 | # Register resources
327 | self.mcp.resource("servicenow://incidents")(self.list_incidents)
328 | self.mcp.resource("servicenow://incidents/{number}")(self.get_incident)
329 | self.mcp.resource("servicenow://users")(self.list_users)
330 | self.mcp.resource("servicenow://knowledge")(self.list_knowledge)
331 | self.mcp.resource("servicenow://tables")(self.get_tables)
332 | self.mcp.resource("servicenow://tables/{table}")(self.get_table_records)
333 | self.mcp.resource("servicenow://schema/{table}")(self.get_table_schema)
334 |
335 | # Register tools
336 | self.mcp.tool(name="create_incident")(self.create_incident)
337 | self.mcp.tool(name="update_incident")(self.update_incident)
338 | self.mcp.tool(name="search_records")(self.search_records)
339 | self.mcp.tool(name="get_record")(self.get_record)
340 | self.mcp.tool(name="perform_query")(self.perform_query)
341 | self.mcp.tool(name="add_comment")(self.add_comment)
342 | self.mcp.tool(name="add_work_notes")(self.add_work_notes)
343 |
344 | # Register natural language tools
345 | self.mcp.tool(name="natural_language_search")(self.natural_language_search)
346 | self.mcp.tool(name="natural_language_update")(self.natural_language_update)
347 | self.mcp.tool(name="update_script")(self.update_script)
348 |
349 | # Register prompts
350 | self.mcp.prompt(name="analyze_incident")(self.incident_analysis_prompt)
351 | self.mcp.prompt(name="create_incident_prompt")(self.create_incident_prompt)
352 |
353 | async def close(self):
354 | """Close the ServiceNow client"""
355 | await self.client.close()
356 |
357 | def run(self, transport: str = "stdio"):
358 | """Run the ServiceNow MCP server"""
359 | try:
360 | self.mcp.run(transport=transport)
361 | finally:
362 | asyncio.run(self.close())
363 |
364 | # Resource handlers
365 | async def list_incidents(self) -> str:
366 | """List recent incidents in ServiceNow"""
367 | options = QueryOptions(limit=10)
368 | result = await self.client.get_records("incident", options)
369 | return json.dumps(result, indent=2)
370 |
371 | async def get_incident(self, number: str) -> str:
372 | """Get a specific incident by number"""
373 | try:
374 | # Always use get_incident_by_number to query by incident number, not get_record
375 | incident = await self.client.get_incident_by_number(number)
376 | if incident:
377 | return json.dumps({"result": incident}, indent=2)
378 | else:
379 | logger.error(f"No incident found with number: {number}")
380 | return json.dumps({"error":{"message":"No Record found","detail":"Record doesn't exist or ACL restricts the record retrieval"},"status":"failure"})
381 | except Exception as e:
382 | logger.error(f"Error getting incident {number}: {str(e)}")
383 | return json.dumps({"error":{"message":str(e),"detail":"Error occurred while retrieving the record"},"status":"failure"})
384 |
385 | async def list_users(self) -> str:
386 | """List users in ServiceNow"""
387 | options = QueryOptions(limit=10)
388 | result = await self.client.get_records("sys_user", options)
389 | return json.dumps(result, indent=2)
390 |
391 | async def list_knowledge(self) -> str:
392 | """List knowledge articles in ServiceNow"""
393 | options = QueryOptions(limit=10)
394 | result = await self.client.get_records("kb_knowledge", options)
395 | return json.dumps(result, indent=2)
396 |
397 | async def get_tables(self) -> str:
398 | """Get a list of available tables"""
399 | result = await self.client.get_available_tables()
400 | return json.dumps({"result": result}, indent=2)
401 |
402 | async def get_table_records(self, table: str) -> str:
403 | """Get records from a specific table"""
404 | options = QueryOptions(limit=10)
405 | result = await self.client.get_records(table, options)
406 | return json.dumps(result, indent=2)
407 |
408 | async def get_table_schema(self, table: str) -> str:
409 | """Get the schema for a table"""
410 | result = await self.client.get_table_schema(table)
411 | return json.dumps(result, indent=2)
412 |
413 | # Tool handlers
414 | async def create_incident(self,
415 | incident,
416 | ctx: Context = None) -> str:
417 | """
418 | Create a new incident in ServiceNow
419 |
420 | Args:
421 | incident: The incident details to create - can be either an IncidentCreate object,
422 | a dictionary containing incident fields, or a string with the description
423 | ctx: Optional context object for progress reporting
424 |
425 | Returns:
426 | JSON response from ServiceNow
427 | """
428 | # Handle different input types
429 | if isinstance(incident, str):
430 | # If a string was provided, treat it as the description and generate a short description
431 | short_desc = incident[:50] + ('...' if len(incident) > 50 else '')
432 | incident_data = {
433 | "short_description": short_desc,
434 | "description": incident
435 | }
436 | logger.info(f"Creating incident from string description: {short_desc}")
437 | elif isinstance(incident, dict):
438 | # Dictionary provided
439 | incident_data = incident
440 | logger.info(f"Creating incident from dictionary: {incident.get('short_description', 'No short description')}")
441 | elif isinstance(incident, IncidentCreate):
442 | # IncidentCreate model provided
443 | incident_data = incident.dict(exclude_none=True)
444 | logger.info(f"Creating incident from IncidentCreate: {incident.short_description}")
445 | else:
446 | error_message = f"Invalid incident type: {type(incident)}. Expected IncidentCreate, dict, or str."
447 | logger.error(error_message)
448 | return json.dumps({"error": error_message})
449 |
450 | # Validate that required fields are present
451 | if "short_description" not in incident_data and isinstance(incident, dict):
452 | if "description" in incident_data:
453 | # Auto-generate short description from description
454 | desc = incident_data["description"]
455 | incident_data["short_description"] = desc[:50] + ('...' if len(desc) > 50 else '')
456 | else:
457 | incident_data["short_description"] = "Incident created through API"
458 |
459 | if "description" not in incident_data and isinstance(incident, dict):
460 | if "short_description" in incident_data:
461 | incident_data["description"] = incident_data["short_description"]
462 | else:
463 | incident_data["description"] = "No description provided"
464 |
465 | # Log and create the incident
466 | if ctx:
467 | await ctx.info(f"Creating incident: {incident_data.get('short_description', 'No short description')}")
468 |
469 | try:
470 | result = await self.client.create_record("incident", incident_data)
471 |
472 | if ctx:
473 | await ctx.info(f"Created incident: {result['result']['number']}")
474 |
475 | return json.dumps(result, indent=2)
476 | except Exception as e:
477 | error_message = f"Error creating incident: {str(e)}"
478 | logger.error(error_message)
479 | if ctx:
480 | await ctx.error(error_message)
481 | return json.dumps({"error": error_message})
482 |
483 | async def update_incident(self,
484 | number: str,
485 | updates: IncidentUpdate,
486 | ctx: Context = None) -> str:
487 | """
488 | Update an existing incident in ServiceNow
489 |
490 | Args:
491 | number: The incident number (INC0010001)
492 | updates: The fields to update
493 | ctx: Optional context object for progress reporting
494 |
495 | Returns:
496 | JSON response from ServiceNow
497 | """
498 | # First, get the sys_id for the incident number
499 | if ctx:
500 | await ctx.info(f"Looking up incident: {number}")
501 |
502 | incident = await self.client.get_incident_by_number(number)
503 |
504 | if not incident:
505 | error_message = f"Incident {number} not found"
506 | if ctx:
507 | await ctx.error(error_message)
508 | return json.dumps({"error": error_message})
509 |
510 | sys_id = incident['sys_id']
511 |
512 | # Now update the incident
513 | if ctx:
514 | await ctx.info(f"Updating incident: {number}")
515 |
516 | data = updates.dict(exclude_none=True)
517 | result = await self.client.update_record("incident", sys_id, data)
518 |
519 | return json.dumps(result, indent=2)
520 |
521 | async def search_records(self,
522 | query: str,
523 | table: str = "incident",
524 | limit: int = 10,
525 | ctx: Context = None) -> str:
526 | """
527 | Search for records in ServiceNow using text query
528 |
529 | Args:
530 | query: Text to search for
531 | table: Table to search in
532 | limit: Maximum number of results to return
533 | ctx: Optional context object for progress reporting
534 |
535 | Returns:
536 | JSON response containing matching records
537 | """
538 | if ctx:
539 | await ctx.info(f"Searching {table} for: {query}")
540 |
541 | result = await self.client.search(query, table, limit)
542 | return json.dumps(result, indent=2)
543 |
544 | async def get_record(self,
545 | table: str,
546 | sys_id: str,
547 | ctx: Context = None) -> str:
548 | """
549 | Get a specific record by sys_id
550 |
551 | Args:
552 | table: Table to query
553 | sys_id: System ID of the record
554 | ctx: Optional context object for progress reporting
555 |
556 | Returns:
557 | JSON response containing the record
558 | """
559 | if ctx:
560 | await ctx.info(f"Getting {table} record: {sys_id}")
561 |
562 | result = await self.client.get_record(table, sys_id)
563 | return json.dumps(result, indent=2)
564 |
565 | async def perform_query(self,
566 | table: str,
567 | query: str = "",
568 | limit: int = 10,
569 | offset: int = 0,
570 | fields: Optional[List[str]] = None,
571 | ctx: Context = None) -> str:
572 | """
573 | Perform a query against ServiceNow
574 |
575 | Args:
576 | table: Table to query
577 | query: Encoded query string (ServiceNow syntax)
578 | limit: Maximum number of results to return
579 | offset: Number of records to skip
580 | fields: List of fields to return (or all fields if None)
581 | ctx: Optional context object for progress reporting
582 |
583 | Returns:
584 | JSON response containing query results
585 | """
586 | if ctx:
587 | await ctx.info(f"Querying {table} with: {query}")
588 |
589 | options = QueryOptions(
590 | limit=limit,
591 | offset=offset,
592 | fields=fields,
593 | query=query
594 | )
595 |
596 | result = await self.client.get_records(table, options)
597 | return json.dumps(result, indent=2)
598 |
599 | async def add_comment(self,
600 | number: str,
601 | comment: str,
602 | ctx: Context = None) -> str:
603 | """
604 | Add a comment to an incident (customer visible)
605 |
606 | Args:
607 | number: Incident number
608 | comment: Comment to add
609 | ctx: Optional context object for progress reporting
610 |
611 | Returns:
612 | JSON response from ServiceNow
613 | """
614 | if ctx:
615 | await ctx.info(f"Adding comment to incident: {number}")
616 |
617 | incident = await self.client.get_incident_by_number(number)
618 |
619 | if not incident:
620 | error_message = f"Incident {number} not found"
621 | if ctx:
622 | await ctx.error(error_message)
623 | return json.dumps({"error": error_message})
624 |
625 | sys_id = incident['sys_id']
626 |
627 | # Add the comment
628 | update = {"comments": comment}
629 | result = await self.client.update_record("incident", sys_id, update)
630 |
631 | return json.dumps(result, indent=2)
632 |
633 | async def add_work_notes(self,
634 | number: str,
635 | work_notes: str,
636 | ctx: Context = None) -> str:
637 | """
638 | Add work notes to an incident (internal)
639 |
640 | Args:
641 | number: Incident number
642 | work_notes: Work notes to add
643 | ctx: Optional context object for progress reporting
644 |
645 | Returns:
646 | JSON response from ServiceNow
647 | """
648 | if ctx:
649 | await ctx.info(f"Adding work notes to incident: {number}")
650 |
651 | incident = await self.client.get_incident_by_number(number)
652 |
653 | if not incident:
654 | error_message = f"Incident {number} not found"
655 | if ctx:
656 | await ctx.error(error_message)
657 | return json.dumps({"error": error_message})
658 |
659 | sys_id = incident['sys_id']
660 |
661 | # Add the work notes
662 | update = {"work_notes": work_notes}
663 | result = await self.client.update_record("incident", sys_id, update)
664 |
665 | return json.dumps(result, indent=2)
666 |
667 | # Natural language tools
668 | async def natural_language_search(self,
669 | query: str,
670 | ctx: Context = None) -> str:
671 | """
672 | Search for records using natural language
673 |
674 | Examples:
675 | - "find all incidents about SAP"
676 | - "search for incidents related to email"
677 | - "show me all incidents with high priority"
678 |
679 | Args:
680 | query: Natural language query
681 | ctx: Optional context object for progress reporting
682 |
683 | Returns:
684 | JSON response containing matching records
685 | """
686 | if ctx:
687 | await ctx.info(f"Processing natural language query: {query}")
688 |
689 | # Parse the query
690 | search_params = NLPProcessor.parse_search_query(query)
691 |
692 | if ctx:
693 | await ctx.info(f"Searching {search_params['table']} with query: {search_params['query']}")
694 |
695 | # Perform the search
696 | options = QueryOptions(
697 | limit=search_params['limit'],
698 | query=search_params['query']
699 | )
700 |
701 | result = await self.client.get_records(search_params['table'], options)
702 | return json.dumps(result, indent=2)
703 |
704 | async def natural_language_update(self,
705 | command: str,
706 | ctx: Context = None) -> str:
707 | """
708 | Update a record using natural language
709 |
710 | Examples:
711 | - "Update incident INC0010001 saying I'm working on it"
712 | - "Set incident INC0010002 to in progress"
713 | - "Close incident INC0010003 with resolution: fixed the issue"
714 |
715 | Args:
716 | command: Natural language update command
717 | ctx: Optional context object for progress reporting
718 |
719 | Returns:
720 | JSON response from ServiceNow
721 | """
722 | if ctx:
723 | await ctx.info(f"Processing natural language update: {command}")
724 |
725 | try:
726 | # Parse the command
727 | record_number, updates = NLPProcessor.parse_update_command(command)
728 |
729 | if ctx:
730 | await ctx.info(f"Updating {record_number} with: {updates}")
731 |
732 | # Get the record
733 | if record_number.startswith("INC"):
734 | incident = await self.client.get_incident_by_number(record_number)
735 | if not incident:
736 | error_message = f"Incident {record_number} not found"
737 | if ctx:
738 | await ctx.error(error_message)
739 | return json.dumps({"error": error_message})
740 |
741 | sys_id = incident['sys_id']
742 | table = "incident"
743 | else:
744 | # Handle other record types if needed
745 | error_message = f"Record type not supported: {record_number}"
746 | if ctx:
747 | await ctx.error(error_message)
748 | return json.dumps({"error": error_message})
749 |
750 | # Update the record
751 | result = await self.client.update_record(table, sys_id, updates)
752 | return json.dumps(result, indent=2)
753 |
754 | except ValueError as e:
755 | error_message = str(e)
756 | if ctx:
757 | await ctx.error(error_message)
758 | return json.dumps({"error": error_message})
759 |
760 | async def update_script(self,
761 | script_update: ScriptUpdateModel,
762 | ctx: Context = None) -> str:
763 | """
764 | Update a ServiceNow script
765 |
766 | Args:
767 | script_update: The script update details
768 | ctx: Optional context object for progress reporting
769 |
770 | Returns:
771 | JSON response from ServiceNow
772 | """
773 | if ctx:
774 | await ctx.info(f"Updating script: {script_update.name}")
775 |
776 | # Search for the script by name
777 | table = script_update.type
778 | query = f"name={script_update.name}"
779 |
780 | options = QueryOptions(
781 | limit=1,
782 | query=query
783 | )
784 |
785 | result = await self.client.get_records(table, options)
786 |
787 | if not result.get("result") or len(result["result"]) == 0:
788 | # Script doesn't exist, create it
789 | if ctx:
790 | await ctx.info(f"Script not found, creating new script: {script_update.name}")
791 |
792 | data = {
793 | "name": script_update.name,
794 | "script": script_update.script
795 | }
796 |
797 | if script_update.description:
798 | data["description"] = script_update.description
799 |
800 | result = await self.client.create_record(table, data)
801 | else:
802 | # Script exists, update it
803 | script = result["result"][0]
804 | sys_id = script["sys_id"]
805 |
806 | if ctx:
807 | await ctx.info(f"Updating existing script: {script_update.name} ({sys_id})")
808 |
809 | data = {
810 | "script": script_update.script
811 | }
812 |
813 | if script_update.description:
814 | data["description"] = script_update.description
815 |
816 | result = await self.client.update_record(table, sys_id, data)
817 |
818 | return json.dumps(result, indent=2)
819 |
820 | # Prompt templates
821 | def incident_analysis_prompt(self, incident_number: str) -> str:
822 | """Create a prompt to analyze a ServiceNow incident
823 |
824 | Args:
825 | incident_number: The incident number to analyze (e.g., INC0010001)
826 |
827 | Returns:
828 | Prompt text for analyzing the incident
829 | """
830 | return f"""
831 | Please analyze the following ServiceNow incident {incident_number}.
832 |
833 | First, call the appropriate tool to fetch the incident details using get_incident.
834 |
835 | Then, provide a comprehensive analysis with the following sections:
836 |
837 | 1. Summary: A brief overview of the incident
838 | 2. Impact Assessment: Analysis of the impact based on the severity, priority, and affected users
839 | 3. Root Cause Analysis: Potential causes based on available information
840 | 4. Resolution Recommendations: Suggested next steps to resolve the incident
841 | 5. SLA Status: Whether the incident is at risk of breaching SLAs
842 |
843 | Use a professional and clear tone appropriate for IT service management.
844 | """
845 |
846 | def create_incident_prompt(self) -> str:
847 | """Create a prompt for incident creation guidance
848 |
849 | Returns:
850 | Prompt text for helping users create an incident
851 | """
852 | return """
853 | I'll help you create a new ServiceNow incident. Please provide the following information:
854 |
855 | 1. Short Description: A brief title for the incident (required)
856 | 2. Detailed Description: A thorough explanation of the issue (required)
857 | 3. Caller: The person reporting the issue (optional)
858 | 4. Category and Subcategory: The type of issue (optional)
859 | 5. Impact (1-High, 2-Medium, 3-Low): How broadly this affects users (optional)
860 | 6. Urgency (1-High, 2-Medium, 3-Low): How time-sensitive this issue is (optional)
861 |
862 | After collecting this information, I'll use the create_incident tool to submit the incident to ServiceNow.
863 | """
864 |
865 |
866 | # Factory functions for creating authentication objects
867 | def create_basic_auth(username: str, password: str) -> BasicAuth:
868 | """Create BasicAuth object for ServiceNow authentication"""
869 | return BasicAuth(username, password)
870 |
871 | def create_token_auth(token: str) -> TokenAuth:
872 | """Create TokenAuth object for ServiceNow authentication"""
873 | return TokenAuth(token)
874 |
875 | def create_oauth_auth(client_id: str, client_secret: str,
876 | username: str, password: str,
877 | instance_url: str) -> OAuthAuth:
878 | """Create OAuthAuth object for ServiceNow authentication"""
879 | return OAuthAuth(client_id, client_secret, username, password, instance_url)
880 |
```