#
tokens: 24349/50000 14/14 files
lines: on (toggle) GitHub
raw markdown copy reset
# Directory Structure

```
├── .env.example
├── .github
│   └── workflows
│       └── python-package.yml
├── .gitignore
├── CONTRIBUTING.md
├── LICENSE
├── mcp_server_servicenow
│   ├── __init__.py
│   ├── cli.py
│   ├── nlp.py
│   └── server.py
├── pyproject.toml
├── README.md
├── requirements.txt
├── servicenow-mcp.py
└── tests
    ├── __init__.py
    └── test_nlp.py
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
 1 | # Python bytecode
 2 | __pycache__/
 3 | *.py[cod]
 4 | *$py.class
 5 | 
 6 | # Distribution / packaging
 7 | dist/
 8 | build/
 9 | *.egg-info/
10 | *.egg
11 | 
12 | # Virtual environments
13 | venv/
14 | env/
15 | ENV/
16 | 
17 | # IDE files
18 | .idea/
19 | .vscode/
20 | *.swp
21 | *.swo
22 | 
23 | # Environment variables
24 | .env
25 | 
26 | # Logs
27 | *.log
28 | 
29 | # Test files
30 | test_*.js
31 | 
32 | # OS specific files
33 | .DS_Store
34 | Thumbs.db
35 | 
```

--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------

```
 1 | # ServiceNow MCP Server Environment Variables
 2 | 
 3 | # ServiceNow Instance URL
 4 | SERVICENOW_INSTANCE_URL=https://your-instance.service-now.com/
 5 | 
 6 | # Authentication - Basic Auth
 7 | SERVICENOW_USERNAME=your-username
 8 | SERVICENOW_PASSWORD=your-password
 9 | 
10 | # Authentication - Token Auth (alternative to Basic Auth)
11 | # SERVICENOW_TOKEN=your-token
12 | 
13 | # Authentication - OAuth (alternative to Basic Auth)
14 | # SERVICENOW_CLIENT_ID=your-client-id
15 | # SERVICENOW_CLIENT_SECRET=your-client-secret
16 | # SERVICENOW_USERNAME=your-username
17 | # SERVICENOW_PASSWORD=your-password
18 | 
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
  1 | # ServiceNow MCP Server
  2 | 
  3 | [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
  4 | 
  5 | A Model Context Protocol (MCP) server that interfaces with ServiceNow, allowing AI agents to access and manipulate ServiceNow data through a secure API. This server enables natural language interactions with ServiceNow, making it easier to search for records, update them, and manage scripts.
  6 | 
  7 | ## Features
  8 | 
  9 | ### Resources
 10 | 
 11 | - `servicenow://incidents`: List recent incidents
 12 | - `servicenow://incidents/{number}`: Get a specific incident by number
 13 | - `servicenow://users`: List users
 14 | - `servicenow://knowledge`: List knowledge articles
 15 | - `servicenow://tables`: List available tables
 16 | - `servicenow://tables/{table}`: Get records from a specific table
 17 | - `servicenow://schema/{table}`: Get the schema for a table
 18 | 
 19 | ### Tools
 20 | 
 21 | #### Basic Tools
 22 | - `create_incident`: Create a new incident
 23 | - `update_incident`: Update an existing incident
 24 | - `search_records`: Search for records using text query
 25 | - `get_record`: Get a specific record by sys_id
 26 | - `perform_query`: Perform a query against ServiceNow
 27 | - `add_comment`: Add a comment to an incident (customer visible)
 28 | - `add_work_notes`: Add work notes to an incident (internal)
 29 | 
 30 | #### Natural Language Tools
 31 | - `natural_language_search`: Search for records using natural language (e.g., "find all incidents about SAP")
 32 | - `natural_language_update`: Update records using natural language (e.g., "Update incident INC0010001 saying I'm working on it")
 33 | - `update_script`: Update ServiceNow script files (script includes, business rules, etc.)
 34 | 
 35 | ## Installation
 36 | 
 37 | ### From PyPI
 38 | 
 39 | ```bash
 40 | pip install mcp-server-servicenow
 41 | ```
 42 | 
 43 | ### From Source
 44 | 
 45 | ```bash
 46 | git clone https://github.com/michaelbuckner/servicenow-mcp.git
 47 | cd servicenow-mcp
 48 | pip install -e .
 49 | ```
 50 | 
 51 | ## Usage
 52 | 
 53 | ### Command Line
 54 | 
 55 | Run the server using the Python module:
 56 | 
 57 | ```bash
 58 | python -m mcp_server_servicenow.cli --url "https://your-instance.service-now.com/" --username "your-username" --password "your-password"
 59 | ```
 60 | 
 61 | Or use environment variables:
 62 | 
 63 | ```bash
 64 | export SERVICENOW_INSTANCE_URL="https://your-instance.service-now.com/"
 65 | export SERVICENOW_USERNAME="your-username"
 66 | export SERVICENOW_PASSWORD="your-password"
 67 | python -m mcp_server_servicenow.cli
 68 | ```
 69 | 
 70 | ### Configuration in Cline
 71 | 
 72 | To use this MCP server with Cline, add the following to your MCP settings file:
 73 | 
 74 | ```json
 75 | {
 76 |   "mcpServers": {
 77 |     "servicenow": {
 78 |       "command": "/path/to/your/python/executable",
 79 |       "args": [
 80 |         "-m",
 81 |         "mcp_server_servicenow.cli",
 82 |         "--url", "https://your-instance.service-now.com/",
 83 |         "--username", "your-username",
 84 |         "--password", "your-password"
 85 |       ],
 86 |       "disabled": false,
 87 |       "autoApprove": []
 88 |     }
 89 |   }
 90 | }
 91 | ```
 92 | 
 93 | **Note:** Make sure to use the full path to the Python executable that has the `mcp-server-servicenow` package installed.
 94 | 
 95 | ## Natural Language Examples
 96 | 
 97 | ### Searching Records
 98 | 
 99 | You can search for records using natural language queries:
100 | 
101 | ```
102 | find all incidents about email
103 | search for incidents related to network issues
104 | show me all incidents with high priority
105 | ```
106 | 
107 | ### Updating Records
108 | 
109 | You can update records using natural language commands:
110 | 
111 | ```
112 | Update incident INC0010001 saying I'm working on it
113 | Set incident INC0010002 to in progress
114 | Close incident INC0010003 with resolution: fixed the issue
115 | ```
116 | 
117 | ### Managing Scripts
118 | 
119 | You can update ServiceNow scripts from local files:
120 | 
121 | ```
122 | Update the ServiceNow script include "HelloWorld" with the contents of hello_world.js
123 | Upload utils.js to ServiceNow as a script include named "UtilityFunctions"
124 | Update @form_validation.js, it's a client script called "FormValidation"
125 | ```
126 | 
127 | ## Authentication Methods
128 | 
129 | The server supports multiple authentication methods:
130 | 
131 | 1. **Basic Authentication**: Username and password
132 | 2. **Token Authentication**: OAuth token
133 | 3. **OAuth Authentication**: Client ID, Client Secret, Username, and Password
134 | 
135 | ## Development
136 | 
137 | ### Prerequisites
138 | 
139 | - Python 3.8+
140 | - ServiceNow instance with API access
141 | 
142 | ### Setting Up Development Environment
143 | 
144 | ```bash
145 | # Clone the repository
146 | git clone https://github.com/michaelbuckner/servicenow-mcp.git
147 | cd servicenow-mcp
148 | 
149 | # Create a virtual environment
150 | python -m venv venv
151 | source venv/bin/activate  # On Windows: venv\Scripts\activate
152 | 
153 | # Install development dependencies
154 | pip install -e ".[dev]"
155 | ```
156 | 
157 | ### Running Tests
158 | 
159 | ```bash
160 | pytest
161 | ```
162 | 
163 | ## Contributing
164 | 
165 | Contributions are welcome! Please feel free to submit a Pull Request.
166 | 
167 | 1. Fork the repository
168 | 2. Create your feature branch (`git checkout -b feature/amazing-feature`)
169 | 3. Commit your changes (`git commit -m 'Add some amazing feature'`)
170 | 4. Push to the branch (`git push origin feature/amazing-feature`)
171 | 5. Open a Pull Request
172 | 
173 | ## License
174 | 
175 | This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
176 | 
```

--------------------------------------------------------------------------------
/CONTRIBUTING.md:
--------------------------------------------------------------------------------

```markdown
 1 | # Contributing to ServiceNow MCP Server
 2 | 
 3 | Thank you for considering contributing to the ServiceNow MCP Server! This document provides guidelines and instructions for contributing to this project.
 4 | 
 5 | ## Code of Conduct
 6 | 
 7 | By participating in this project, you agree to abide by our code of conduct. Please be respectful and considerate of others.
 8 | 
 9 | ## How to Contribute
10 | 
11 | ### Reporting Bugs
12 | 
13 | If you find a bug, please create an issue on GitHub with the following information:
14 | 
15 | - A clear, descriptive title
16 | - A detailed description of the issue
17 | - Steps to reproduce the bug
18 | - Expected behavior
19 | - Actual behavior
20 | - Screenshots (if applicable)
21 | - Environment information (OS, Python version, etc.)
22 | 
23 | ### Suggesting Enhancements
24 | 
25 | If you have an idea for an enhancement, please create an issue on GitHub with the following information:
26 | 
27 | - A clear, descriptive title
28 | - A detailed description of the enhancement
29 | - Any relevant examples or mockups
30 | - Why this enhancement would be useful
31 | 
32 | ### Pull Requests
33 | 
34 | 1. Fork the repository
35 | 2. Create a new branch (`git checkout -b feature/amazing-feature`)
36 | 3. Make your changes
37 | 4. Run the tests (`pytest`)
38 | 5. Commit your changes (`git commit -m 'Add some amazing feature'`)
39 | 6. Push to the branch (`git push origin feature/amazing-feature`)
40 | 7. Open a Pull Request
41 | 
42 | ## Development Setup
43 | 
44 | 1. Clone the repository
45 |    ```bash
46 |    git clone https://github.com/michaelbuckner/servicenow-mcp.git
47 |    cd servicenow-mcp
48 |    ```
49 | 
50 | 2. Create a virtual environment
51 |    ```bash
52 |    python -m venv venv
53 |    source venv/bin/activate  # On Windows: venv\Scripts\activate
54 |    ```
55 | 
56 | 3. Install development dependencies
57 |    ```bash
58 |    pip install -e ".[dev]"
59 |    ```
60 | 
61 | 4. Create a `.env` file with your ServiceNow credentials (see `.env.example`)
62 | 
63 | 5. Run the tests
64 |    ```bash
65 |    pytest
66 |    ```
67 | 
68 | ## Coding Standards
69 | 
70 | - Follow PEP 8 style guide
71 | - Use type hints
72 | - Write docstrings for all functions, classes, and methods
73 | - Write tests for all new features and bug fixes
74 | 
75 | ## Testing
76 | 
77 | - All tests should be written using pytest
78 | - Run tests with `pytest`
79 | - Ensure all tests pass before submitting a pull request
80 | 
81 | ## Documentation
82 | 
83 | - Update the README.md with any necessary changes
84 | - Document all new features and changes in the code
85 | 
86 | ## License
87 | 
88 | By contributing to this project, you agree that your contributions will be licensed under the project's MIT License.
89 | 
```

--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------

```python
1 | # Test package initialization
2 | 
```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
1 | mcp>=1.0.0
2 | httpx>=0.27.0
3 | requests>=2.31.0
4 | pydantic>=2.0.0
5 | python-dotenv>=1.0.0
```

--------------------------------------------------------------------------------
/mcp_server_servicenow/__init__.py:
--------------------------------------------------------------------------------

```python
1 | """
2 | ServiceNow MCP Server
3 | 
4 | This module provides a Model Context Protocol (MCP) server that interfaces with ServiceNow.
5 | It allows AI agents to access and manipulate ServiceNow data through a secure API.
6 | """
7 | 
8 | __version__ = "0.1.0"
9 | 
```

--------------------------------------------------------------------------------
/.github/workflows/python-package.yml:
--------------------------------------------------------------------------------

```yaml
 1 | name: Python Package
 2 | 
 3 | on:
 4 |   push:
 5 |     branches: [ main ]
 6 |   pull_request:
 7 |     branches: [ main ]
 8 | 
 9 | jobs:
10 |   build:
11 |     runs-on: ubuntu-latest
12 |     strategy:
13 |       fail-fast: false
14 |       matrix:
15 |         python-version: ["3.8", "3.9", "3.10", "3.11"]
16 | 
17 |     steps:
18 |     - uses: actions/checkout@v3
19 |     - name: Set up Python ${{ matrix.python-version }}
20 |       uses: actions/setup-python@v4
21 |       with:
22 |         python-version: ${{ matrix.python-version }}
23 |     - name: Install dependencies
24 |       run: |
25 |         python -m pip install --upgrade pip
26 |         python -m pip install flake8 pytest
27 |         if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
28 |         pip install -e ".[dev]"
29 |     - name: Lint with flake8
30 |       run: |
31 |         # stop the build if there are Python syntax errors or undefined names
32 |         flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
33 |         # exit-zero treats all errors as warnings
34 |         flake8 . --count --exit-zero --max-complexity=10 --max-line-length=100 --statistics
35 |     - name: Test with pytest
36 |       run: |
37 |         pytest
38 | 
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
 1 | [build-system]
 2 | requires = ["setuptools>=42", "wheel"]
 3 | build-backend = "setuptools.build_meta"
 4 | 
 5 | [project]
 6 | name = "mcp-server-servicenow"
 7 | version = "0.1.0"
 8 | description = "ServiceNow MCP Server for natural language interactions with ServiceNow"
 9 | authors = [
10 |     {name = "Michael Buckner", email = "[email protected]"}
11 | ]
12 | readme = "README.md"
13 | requires-python = ">=3.8"
14 | license = {text = "MIT"}
15 | keywords = ["servicenow", "mcp", "ai", "nlp", "automation"]
16 | classifiers = [
17 |     "Development Status :: 4 - Beta",
18 |     "Programming Language :: Python :: 3",
19 |     "Programming Language :: Python :: 3.8",
20 |     "Programming Language :: Python :: 3.9",
21 |     "Programming Language :: Python :: 3.10",
22 |     "Programming Language :: Python :: 3.11",
23 |     "License :: OSI Approved :: MIT License",
24 |     "Operating System :: OS Independent",
25 |     "Intended Audience :: Developers",
26 |     "Topic :: Software Development :: Libraries :: Python Modules",
27 |     "Topic :: Internet :: WWW/HTTP",
28 | ]
29 | dependencies = [
30 |     "mcp>=1.0.0",
31 |     "httpx>=0.27.0",
32 |     "requests>=2.31.0",
33 |     "pydantic>=2.0.0",
34 |     "python-dotenv>=1.0.0",
35 | ]
36 | 
37 | [project.optional-dependencies]
38 | dev = [
39 |     "pytest>=7.0.0",
40 |     "pytest-cov>=4.0.0",
41 |     "black>=23.0.0",
42 |     "isort>=5.12.0",
43 |     "mypy>=1.0.0",
44 |     "flake8>=6.0.0",
45 | ]
46 | 
47 | [project.urls]
48 | "Homepage" = "https://github.com/michaelbuckner/servicenow-mcp"
49 | "Bug Tracker" = "https://github.com/michaelbuckner/servicenow-mcp/issues"
50 | "Documentation" = "https://github.com/michaelbuckner/servicenow-mcp#readme"
51 | 
52 | [project.scripts]
53 | mcp-server-servicenow = "mcp_server_servicenow.cli:main"
54 | 
55 | [tool.black]
56 | line-length = 100
57 | target-version = ["py38"]
58 | 
59 | [tool.isort]
60 | profile = "black"
61 | line_length = 100
62 | 
63 | [tool.mypy]
64 | python_version = "3.8"
65 | warn_return_any = true
66 | warn_unused_configs = true
67 | disallow_untyped_defs = true
68 | disallow_incomplete_defs = true
69 | 
70 | [tool.pytest.ini_options]
71 | testpaths = ["tests"]
72 | python_files = "test_*.py"
73 | 
```

--------------------------------------------------------------------------------
/mcp_server_servicenow/cli.py:
--------------------------------------------------------------------------------

```python
 1 | """
 2 | ServiceNow MCP Server CLI
 3 | 
 4 | This module provides the command-line interface for the ServiceNow MCP server.
 5 | """
 6 | 
 7 | import argparse
 8 | import os
 9 | import sys
10 | from dotenv import load_dotenv
11 | 
12 | from mcp_server_servicenow.server import ServiceNowMCP, create_basic_auth
13 | 
14 | def main():
15 |     """Run the ServiceNow MCP server from the command line"""
16 |     # Load environment variables from .env file if it exists
17 |     load_dotenv()
18 |     
19 |     parser = argparse.ArgumentParser(description="ServiceNow MCP Server")
20 |     parser.add_argument("--url", help="ServiceNow instance URL", default=os.environ.get("SERVICENOW_INSTANCE_URL"))
21 |     parser.add_argument("--transport", help="Transport protocol (stdio or sse)", default="stdio", choices=["stdio", "sse"])
22 |     
23 |     # Authentication options
24 |     auth_group = parser.add_argument_group("Authentication")
25 |     auth_group.add_argument("--username", help="ServiceNow username", default=os.environ.get("SERVICENOW_USERNAME"))
26 |     auth_group.add_argument("--password", help="ServiceNow password", default=os.environ.get("SERVICENOW_PASSWORD"))
27 |     auth_group.add_argument("--token", help="ServiceNow token", default=os.environ.get("SERVICENOW_TOKEN"))
28 |     auth_group.add_argument("--client-id", help="OAuth client ID", default=os.environ.get("SERVICENOW_CLIENT_ID"))
29 |     auth_group.add_argument("--client-secret", help="OAuth client secret", default=os.environ.get("SERVICENOW_CLIENT_SECRET"))
30 |     
31 |     args = parser.parse_args()
32 |     
33 |     # Check required parameters
34 |     if not args.url:
35 |         print("Error: ServiceNow instance URL is required")
36 |         print("Set SERVICENOW_INSTANCE_URL environment variable or use --url")
37 |         sys.exit(1)
38 |     
39 |     # Determine authentication method
40 |     auth = None
41 |     if args.token:
42 |         from mcp_server_servicenow.server import create_token_auth
43 |         auth = create_token_auth(args.token)
44 |     elif args.client_id and args.client_secret and args.username and args.password:
45 |         from mcp_server_servicenow.server import create_oauth_auth
46 |         auth = create_oauth_auth(args.client_id, args.client_secret, args.username, args.password, args.url)
47 |     elif args.username and args.password:
48 |         auth = create_basic_auth(args.username, args.password)
49 |     else:
50 |         print("Error: Authentication credentials required")
51 |         print("Either provide username/password, token, or OAuth credentials")
52 |         sys.exit(1)
53 |     
54 |     # Create and run the server
55 |     server = ServiceNowMCP(instance_url=args.url, auth=auth)
56 |     server.run(transport=args.transport)
57 | 
58 | if __name__ == "__main__":
59 |     main()
60 | 
```

--------------------------------------------------------------------------------
/tests/test_nlp.py:
--------------------------------------------------------------------------------

```python
 1 | """
 2 | Tests for the NLP processor module
 3 | """
 4 | 
 5 | import pytest
 6 | from mcp_server_servicenow.nlp import NLPProcessor
 7 | 
 8 | 
 9 | class TestNLPProcessor:
10 |     """Test cases for the NLPProcessor class"""
11 | 
12 |     def test_parse_search_query(self):
13 |         """Test parsing natural language search queries"""
14 |         # Test basic search
15 |         result = NLPProcessor.parse_search_query("find all incidents about email")
16 |         assert result["table"] == "incident"
17 |         assert "123TEXTQUERY321=email" in result["query"]
18 | 
19 |         # Test with different table
20 |         result = NLPProcessor.parse_search_query("search for users related to admin")
21 |         assert result["table"] == "sys_user"
22 |         assert "123TEXTQUERY321=admin" in result["query"]
23 | 
24 |         # Test with priority
25 |         result = NLPProcessor.parse_search_query("show me all incidents with high priority")
26 |         assert result["table"] == "incident"
27 |         assert "priority=1" in result["query"]
28 | 
29 |         # Test with state
30 |         result = NLPProcessor.parse_search_query("find all incidents in progress")
31 |         assert result["table"] == "incident"
32 |         assert "state=2" in result["query"]
33 | 
34 |     def test_parse_update_command(self):
35 |         """Test parsing natural language update commands"""
36 |         # Test basic update
37 |         record_number, updates = NLPProcessor.parse_update_command(
38 |             "Update incident INC0010001 saying I'm working on it"
39 |         )
40 |         assert record_number == "INC0010001"
41 |         assert updates.get("comments") == "I'm working on it"
42 |         assert updates.get("state") == 2  # In Progress
43 | 
44 |         # Test with explicit state change
45 |         record_number, updates = NLPProcessor.parse_update_command(
46 |             "Close incident INC0010002 with resolution: fixed the issue"
47 |         )
48 |         assert record_number == "INC0010002"
49 |         assert updates.get("state") == 7  # Closed
50 |         assert updates.get("close_notes") == "fixed the issue"
51 |         assert updates.get("close_code") == "Solved (Permanently)"
52 | 
53 |         # Test with work notes
54 |         record_number, updates = NLPProcessor.parse_update_command(
55 |             "Update incident INC0010003 with work note: internal troubleshooting steps"
56 |         )
57 |         assert record_number == "INC0010003"
58 |         assert updates.get("work_notes") == "internal troubleshooting steps"
59 | 
60 |     def test_parse_script_update(self):
61 |         """Test parsing script update commands"""
62 |         # Test script include
63 |         filename, script_type, _ = NLPProcessor.parse_script_update(
64 |             "update @my_script.js, it's a script include"
65 |         )
66 |         assert filename == "my_script.js"
67 |         assert script_type == "sys_script_include"
68 | 
69 |         # Test business rule
70 |         filename, script_type, _ = NLPProcessor.parse_script_update(
71 |             "update @validation.js, it's a business rule"
72 |         )
73 |         assert filename == "validation.js"
74 |         assert script_type == "sys_script"
75 | 
76 |         # Test client script
77 |         filename, script_type, _ = NLPProcessor.parse_script_update(
78 |             "update @form_script.js, it's a client script"
79 |         )
80 |         assert filename == "form_script.js"
81 |         assert script_type == "sys_script_client"
82 | 
```

--------------------------------------------------------------------------------
/mcp_server_servicenow/nlp.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Natural Language Processing for ServiceNow MCP Server
  3 | 
  4 | This module provides natural language processing capabilities for the ServiceNow MCP server.
  5 | """
  6 | 
  7 | import re
  8 | from typing import Dict, Any, Tuple, List, Optional
  9 | 
 10 | class NLPProcessor:
 11 |     """Natural Language Processing for ServiceNow queries and commands"""
 12 |     
 13 |     @staticmethod
 14 |     def parse_search_query(query: str) -> Dict[str, Any]:
 15 |         """
 16 |         Parse a natural language search query
 17 |         
 18 |         Examples:
 19 |         - "find all incidents about SAP"
 20 |         - "search for incidents related to email"
 21 |         - "show me all incidents with high priority"
 22 |         
 23 |         Returns:
 24 |             Dict with table, query, and other parameters
 25 |         """
 26 |         # Default to incident table
 27 |         table = "incident"
 28 |         
 29 |         # Extract table if specified
 30 |         table_match = re.search(r'(incidents?|problems?|changes?|tasks?|users?|groups?)', query, re.IGNORECASE)
 31 |         if table_match:
 32 |             table_type = table_match.group(1).lower()
 33 |             if table_type.startswith('incident'):
 34 |                 table = "incident"
 35 |             elif table_type.startswith('problem'):
 36 |                 table = "problem"
 37 |             elif table_type.startswith('change'):
 38 |                 table = "change_request"
 39 |             elif table_type.startswith('task'):
 40 |                 table = "task"
 41 |             elif table_type.startswith('user'):
 42 |                 table = "sys_user"
 43 |             elif table_type.startswith('group'):
 44 |                 table = "sys_user_group"
 45 |         
 46 |         # Extract search terms
 47 |         about_match = re.search(r'(?:about|related to|regarding|concerning|with|containing)\s+([^\.]+)', query, re.IGNORECASE)
 48 |         search_term = ""
 49 |         if about_match:
 50 |             search_term = about_match.group(1).strip()
 51 |         else:
 52 |             # Try to find any terms after common search phrases
 53 |             term_match = re.search(r'(?:find|search for|show|get|list|display)\s+(?:all|any|)(?:\s+\w+)?\s+(?:\w+\s+)?(.+)', query, re.IGNORECASE)
 54 |             if term_match:
 55 |                 search_term = term_match.group(1).strip()
 56 |         
 57 |         # Extract priority if mentioned
 58 |         priority = None
 59 |         if re.search(r'\b(high|critical)\s+priority\b', query, re.IGNORECASE):
 60 |             priority = "1"
 61 |         elif re.search(r'\b(medium)\s+priority\b', query, re.IGNORECASE):
 62 |             priority = "2"
 63 |         elif re.search(r'\b(low)\s+priority\b', query, re.IGNORECASE):
 64 |             priority = "3"
 65 |         
 66 |         # Extract state if mentioned
 67 |         state = None
 68 |         if re.search(r'\b(new|open)\b', query, re.IGNORECASE):
 69 |             state = "1"
 70 |         elif re.search(r'\b(in progress|working)\b', query, re.IGNORECASE):
 71 |             state = "2"
 72 |         elif re.search(r'\b(closed|resolved)\b', query, re.IGNORECASE):
 73 |             state = "7"
 74 |         
 75 |         # Build the query string
 76 |         query_parts = []
 77 |         if search_term:
 78 |             query_parts.append(f"123TEXTQUERY321={search_term}")
 79 |         if priority:
 80 |             query_parts.append(f"priority={priority}")
 81 |         if state:
 82 |             query_parts.append(f"state={state}")
 83 |         
 84 |         query_string = "^".join(query_parts) if query_parts else ""
 85 |         
 86 |         return {
 87 |             "table": table,
 88 |             "query": query_string,
 89 |             "limit": 10
 90 |         }
 91 |     
 92 |     @staticmethod
 93 |     def parse_update_command(command: str) -> Tuple[str, Dict[str, Any]]:
 94 |         """
 95 |         Parse a natural language update command
 96 |         
 97 |         Examples:
 98 |         - "Update incident INC0010001 saying I'm working on it"
 99 |         - "Set incident INC0010002 to in progress"
100 |         - "Close incident INC0010003 with resolution: fixed the issue"
101 |         
102 |         Returns:
103 |             Tuple of (record_number, updates_dict)
104 |         """
105 |         # Extract record number
106 |         number_match = re.search(r'(INC\d+|PRB\d+|CHG\d+|TASK\d+)', command, re.IGNORECASE)
107 |         if not number_match:
108 |             raise ValueError("No record number found in command")
109 |         
110 |         record_number = number_match.group(1).upper()
111 |         
112 |         # Initialize updates dictionary
113 |         updates = {}
114 |         
115 |         # Check for state changes
116 |         if re.search(r'\b(working on|in progress|assign)\b', command, re.IGNORECASE):
117 |             updates["state"] = 2  # In Progress
118 |         elif re.search(r'\b(resolve|resolved|fix|fixed)\b', command, re.IGNORECASE):
119 |             updates["state"] = 6  # Resolved
120 |         elif re.search(r'\b(close|closed)\b', command, re.IGNORECASE):
121 |             updates["state"] = 7  # Closed
122 |         
123 |         # Extract comments or work notes
124 |         comment_match = re.search(r'(?:saying|comment|note|with comment|with note)(?:s|)\s*:?\s*(.+?)(?:$|\.(?:\s|$))', command, re.IGNORECASE)
125 |         if comment_match:
126 |             comment_text = comment_match.group(1).strip()
127 |             # Determine if this should be a comment or work note
128 |             if re.search(r'\b(work note|internal|private)\b', command, re.IGNORECASE):
129 |                 updates["work_notes"] = comment_text
130 |             else:
131 |                 updates["comments"] = comment_text
132 |         
133 |         # Extract close notes if closing
134 |         if "state" in updates and updates["state"] in [6, 7]:
135 |             close_match = re.search(r'(?:with resolution|resolution|close note|resolve with)(?:s|)\s*:?\s*(.+?)(?:$|\.(?:\s|$))', command, re.IGNORECASE)
136 |             if close_match:
137 |                 updates["close_notes"] = close_match.group(1).strip()
138 |                 updates["close_code"] = "Solved (Permanently)"
139 |         
140 |         return record_number, updates
141 |     
142 |     @staticmethod
143 |     def parse_script_update(command: str) -> Tuple[str, str, str]:
144 |         """
145 |         Parse a command to update a ServiceNow script file
146 |         
147 |         Examples:
148 |         - "update @my_script.js, it's a script include"
149 |         - "update @business_rule.js, it's a business rule"
150 |         
151 |         Returns:
152 |             Tuple of (filename, script_type, script_content)
153 |         """
154 |         # Extract filename
155 |         filename_match = re.search(r'@([^\s,]+)', command)
156 |         if not filename_match:
157 |             raise ValueError("No filename found in command")
158 |         
159 |         filename = filename_match.group(1)
160 |         
161 |         # Extract script type
162 |         script_types = {
163 |             "script include": "sys_script_include",
164 |             "business rule": "sys_script",
165 |             "client script": "sys_script_client",
166 |             "ui script": "sys_ui_script",
167 |             "ui action": "sys_ui_action",
168 |             "ui page": "sys_ui_page",
169 |             "ui macro": "sys_ui_macro",
170 |             "scheduled job": "sysauto_script",
171 |             "fix script": "sys_script_fix"
172 |         }
173 |         
174 |         script_type = None
175 |         for type_name, table_name in script_types.items():
176 |             if re.search(rf"\b{type_name}\b", command, re.IGNORECASE):
177 |                 script_type = table_name
178 |                 break
179 |         
180 |         if not script_type:
181 |             # Default to script include if not specified
182 |             script_type = "sys_script_include"
183 |         
184 |         # The script content will be provided separately
185 |         return filename, script_type, ""
186 | 
```

--------------------------------------------------------------------------------
/servicenow-mcp.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | ServiceNow MCP Server
  3 | 
  4 | This module provides a Model Context Protocol (MCP) server that interfaces with ServiceNow.
  5 | It allows AI agents to access and manipulate ServiceNow data through a secure API.
  6 | """
  7 | 
  8 | import os
  9 | import json
 10 | import asyncio
 11 | import logging
 12 | from datetime import datetime
 13 | from enum import Enum
 14 | from typing import Dict, List, Optional, Any, Union, Literal
 15 | 
 16 | import requests
 17 | import httpx
 18 | from pydantic import BaseModel, Field, field_validator
 19 | 
 20 | from mcp.server.fastmcp import FastMCP, Context
 21 | from mcp.server.fastmcp.utilities.logging import get_logger
 22 | 
 23 | logger = get_logger(__name__)
 24 | 
 25 | # ServiceNow API models
 26 | class IncidentState(int, Enum):
 27 |     NEW = 1
 28 |     IN_PROGRESS = 2 
 29 |     ON_HOLD = 3
 30 |     RESOLVED = 6
 31 |     CLOSED = 7
 32 |     CANCELED = 8
 33 | 
 34 | class IncidentPriority(int, Enum):
 35 |     CRITICAL = 1
 36 |     HIGH = 2
 37 |     MODERATE = 3
 38 |     LOW = 4
 39 |     PLANNING = 5
 40 | 
 41 | class IncidentUrgency(int, Enum):
 42 |     HIGH = 1
 43 |     MEDIUM = 2
 44 |     LOW = 3
 45 | 
 46 | class IncidentImpact(int, Enum):
 47 |     HIGH = 1
 48 |     MEDIUM = 2
 49 |     LOW = 3
 50 | 
 51 | class IncidentCreate(BaseModel):
 52 |     """Model for creating a new incident"""
 53 |     short_description: str = Field(..., description="A brief description of the incident")
 54 |     description: str = Field(..., description="A detailed description of the incident")
 55 |     caller_id: Optional[str] = Field(None, description="The sys_id or name of the caller")
 56 |     category: Optional[str] = Field(None, description="The incident category")
 57 |     subcategory: Optional[str] = Field(None, description="The incident subcategory")
 58 |     urgency: Optional[IncidentUrgency] = Field(IncidentUrgency.MEDIUM, description="The urgency of the incident")
 59 |     impact: Optional[IncidentImpact] = Field(IncidentImpact.MEDIUM, description="The impact of the incident")
 60 |     assignment_group: Optional[str] = Field(None, description="The sys_id or name of the assignment group")
 61 |     assigned_to: Optional[str] = Field(None, description="The sys_id or name of the assignee")
 62 | 
 63 | class IncidentUpdate(BaseModel):
 64 |     """Model for updating an existing incident"""
 65 |     short_description: Optional[str] = Field(None, description="A brief description of the incident")
 66 |     description: Optional[str] = Field(None, description="A detailed description of the incident")
 67 |     caller_id: Optional[str] = Field(None, description="The sys_id or name of the caller")
 68 |     category: Optional[str] = Field(None, description="The incident category")
 69 |     subcategory: Optional[str] = Field(None, description="The incident subcategory")
 70 |     urgency: Optional[IncidentUrgency] = Field(None, description="The urgency of the incident")
 71 |     impact: Optional[IncidentImpact] = Field(None, description="The impact of the incident")
 72 |     state: Optional[IncidentState] = Field(None, description="The state of the incident")
 73 |     assignment_group: Optional[str] = Field(None, description="The sys_id or name of the assignment group")
 74 |     assigned_to: Optional[str] = Field(None, description="The sys_id or name of the assignee")
 75 |     work_notes: Optional[str] = Field(None, description="Work notes to add to the incident (internal)")
 76 |     comments: Optional[str] = Field(None, description="Customer visible comments to add to the incident")
 77 |     
 78 |     @field_validator('work_notes', 'comments')
 79 |     @classmethod
 80 |     def validate_not_empty(cls, v):
 81 |         if v is not None and v.strip() == '':
 82 |             raise ValueError("Cannot be an empty string")
 83 |         return v
 84 | 
 85 |     class Config:
 86 |         use_enum_values = True
 87 |         
 88 | class QueryOptions(BaseModel):
 89 |     """Options for querying ServiceNow records"""
 90 |     limit: int = Field(10, description="Maximum number of records to return", ge=1, le=1000)
 91 |     offset: int = Field(0, description="Number of records to skip", ge=0)
 92 |     fields: Optional[List[str]] = Field(None, description="List of fields to return")
 93 |     query: Optional[str] = Field(None, description="ServiceNow encoded query string")
 94 |     order_by: Optional[str] = Field(None, description="Field to order results by")
 95 |     order_direction: Optional[Literal["asc", "desc"]] = Field("desc", description="Order direction")
 96 | 
 97 | class Authentication:
 98 |     """Base class for ServiceNow authentication methods"""
 99 |     
100 |     async def get_headers(self) -> Dict[str, str]:
101 |         """Get authentication headers for ServiceNow API requests"""
102 |         raise NotImplementedError("Subclasses must implement this method")
103 | 
104 | class BasicAuth(Authentication):
105 |     """Basic authentication for ServiceNow"""
106 |     
107 |     def __init__(self, username: str, password: str):
108 |         self.username = username
109 |         self.password = password
110 |         
111 |     async def get_headers(self) -> Dict[str, str]:
112 |         """Get authentication headers for ServiceNow API requests"""
113 |         return {}
114 |     
115 |     def get_auth(self) -> tuple:
116 |         """Get authentication tuple for requests"""
117 |         return (self.username, self.password)
118 | 
119 | class TokenAuth(Authentication):
120 |     """Token authentication for ServiceNow"""
121 |     
122 |     def __init__(self, token: str):
123 |         self.token = token
124 |         
125 |     async def get_headers(self) -> Dict[str, str]:
126 |         """Get authentication headers for ServiceNow API requests"""
127 |         return {"Authorization": f"Bearer {self.token}"}
128 |     
129 |     def get_auth(self) -> None:
130 |         """Get authentication tuple for requests"""
131 |         return None
132 | 
133 | class OAuthAuth(Authentication):
134 |     """OAuth authentication for ServiceNow"""
135 |     
136 |     def __init__(self, client_id: str, client_secret: str, username: str, password: str, 
137 |                  instance_url: str, token: Optional[str] = None, refresh_token: Optional[str] = None,
138 |                  token_expiry: Optional[datetime] = None):
139 |         self.client_id = client_id
140 |         self.client_secret = client_secret
141 |         self.username = username
142 |         self.password = password
143 |         self.instance_url = instance_url
144 |         self.token = token
145 |         self.refresh_token = refresh_token
146 |         self.token_expiry = token_expiry
147 |         
148 |     async def get_headers(self) -> Dict[str, str]:
149 |         """Get authentication headers for ServiceNow API requests"""
150 |         if self.token is None or (self.token_expiry and datetime.now() > self.token_expiry):
151 |             await self.refresh()
152 |             
153 |         return {"Authorization": f"Bearer {self.token}"}
154 |     
155 |     def get_auth(self) -> None:
156 |         """Get authentication tuple for requests"""
157 |         return None
158 |         
159 |     async def refresh(self):
160 |         """Refresh the OAuth token"""
161 |         if self.refresh_token:
162 |             # Try refresh flow first
163 |             data = {
164 |                 "grant_type": "refresh_token",
165 |                 "client_id": self.client_id,
166 |                 "client_secret": self.client_secret,
167 |                 "refresh_token": self.refresh_token
168 |             }
169 |         else:
170 |             # Fall back to password flow
171 |             data = {
172 |                 "grant_type": "password",
173 |                 "client_id": self.client_id,
174 |                 "client_secret": self.client_secret,
175 |                 "username": self.username,
176 |                 "password": self.password
177 |             }
178 |             
179 |         token_url = f"{self.instance_url}/oauth_token.do"
180 |         async with httpx.AsyncClient() as client:
181 |             response = await client.post(token_url, data=data)
182 |             response.raise_for_status()
183 |             result = response.json()
184 |             
185 |             self.token = result["access_token"]
186 |             self.refresh_token = result.get("refresh_token")
187 |             expires_in = result.get("expires_in", 1800)  # Default 30 minutes
188 |             self.token_expiry = datetime.now().timestamp() + expires_in
189 | 
190 | class ServiceNowClient:
191 |     """Client for interacting with ServiceNow API"""
192 |     
193 |     def __init__(self, instance_url: str, auth: Authentication):
194 |         self.instance_url = instance_url.rstrip('/')
195 |         self.auth = auth
196 |         self.client = httpx.AsyncClient()
197 |         
198 |     async def close(self):
199 |         """Close the HTTP client"""
200 |         await self.client.aclose()
201 |         
202 |     async def request(self, method: str, path: str, 
203 |                     params: Optional[Dict[str, Any]] = None,
204 |                     json_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
205 |         """Make a request to the ServiceNow API"""
206 |         url = f"{self.instance_url}{path}"
207 |         headers = await self.auth.get_headers()
208 |         headers["Accept"] = "application/json"
209 |         
210 |         if isinstance(self.auth, BasicAuth):
211 |             auth = self.auth.get_auth()
212 |         else:
213 |             auth = None
214 |             
215 |         try:
216 |             response = await self.client.request(
217 |                 method=method,
218 |                 url=url,
219 |                 params=params,
220 |                 json=json_data,
221 |                 headers=headers,
222 |                 auth=auth
223 |             )
224 |             response.raise_for_status()
225 |             return response.json()
226 |         except httpx.HTTPStatusError as e:
227 |             logger.error(f"ServiceNow API error: {e.response.text}")
228 |             raise
229 |             
230 |     async def get_record(self, table: str, sys_id: str) -> Dict[str, Any]:
231 |         """Get a record by sys_id"""
232 |         return await self.request("GET", f"/api/now/table/{table}/{sys_id}")
233 |         
234 |     async def get_records(self, table: str, options: QueryOptions = None) -> Dict[str, Any]:
235 |         """Get records with query options"""
236 |         if options is None:
237 |             options = QueryOptions()
238 |             
239 |         params = {
240 |             "sysparm_limit": options.limit,
241 |             "sysparm_offset": options.offset
242 |         }
243 |         
244 |         if options.fields:
245 |             params["sysparm_fields"] = ",".join(options.fields)
246 |             
247 |         if options.query:
248 |             params["sysparm_query"] = options.query
249 |             
250 |         if options.order_by:
251 |             direction = "desc" if options.order_direction == "desc" else "asc"
252 |             params["sysparm_order_by"] = f"{options.order_by}^{direction}"
253 |             
254 |         return await self.request("GET", f"/api/now/table/{table}", params=params)
255 |     
256 |     async def create_record(self, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
257 |         """Create a new record"""
258 |         return await self.request("POST", f"/api/now/table/{table}", json_data=data)
259 |         
260 |     async def update_record(self, table: str, sys_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
261 |         """Update an existing record"""
262 |         return await self.request("PUT", f"/api/now/table/{table}/{sys_id}", json_data=data)
263 |         
264 |     async def delete_record(self, table: str, sys_id: str) -> Dict[str, Any]:
265 |         """Delete a record"""
266 |         return await self.request("DELETE", f"/api/now/table/{table}/{sys_id}")
267 |         
268 |     async def get_incident_by_number(self, number: str) -> Dict[str, Any]:
269 |         """Get an incident by its number"""
270 |         result = await self.request("GET", f"/api/now/table/incident", 
271 |                                   params={"sysparm_query": f"number={number}", "sysparm_limit": 1})
272 |         if result.get("result") and len(result["result"]) > 0:
273 |             return result["result"][0]
274 |         return None
275 |         
276 |     async def search(self, query: str, table: str = "incident", limit: int = 10) -> Dict[str, Any]:
277 |         """Search for records using text query"""
278 |         return await self.request("GET", f"/api/now/table/{table}", 
279 |                                 params={"sysparm_query": f"123TEXTQUERY321={query}", "sysparm_limit": limit})
280 |                                 
281 |     async def get_available_tables(self) -> List[str]:
282 |         """Get a list of available tables"""
283 |         result = await self.request("GET", "/api/now/table/sys_db_object", 
284 |                                   params={"sysparm_fields": "name,label", "sysparm_limit": 100})
285 |         return result.get("result", [])
286 |         
287 |     async def get_table_schema(self, table: str) -> Dict[str, Any]:
288 |         """Get the schema for a table"""
289 |         result = await self.request("GET", f"/api/now/ui/meta/{table}")
290 |         return result
291 | 
292 | 
293 | class ServiceNowMCP:
294 |     """ServiceNow MCP Server"""
295 |     
296 |     def __init__(self, 
297 |                 instance_url: str,
298 |                 auth: Authentication,
299 |                 name: str = "ServiceNow MCP"):
300 |         self.client = ServiceNowClient(instance_url, auth)
301 |         self.mcp = FastMCP(name, dependencies=[
302 |             "requests",
303 |             "httpx", 
304 |             "pydantic"
305 |         ])
306 |         
307 |         # Register resources
308 |         self.mcp.resource("servicenow://incidents")(self.list_incidents)
309 |         self.mcp.resource("servicenow://incidents/{number}")(self.get_incident)
310 |         self.mcp.resource("servicenow://users")(self.list_users)
311 |         self.mcp.resource("servicenow://knowledge")(self.list_knowledge)
312 |         self.mcp.resource("servicenow://tables")(self.get_tables)
313 |         self.mcp.resource("servicenow://tables/{table}")(self.get_table_records)
314 |         self.mcp.resource("servicenow://schema/{table}")(self.get_table_schema)
315 |         
316 |         # Register tools
317 |         self.mcp.tool(name="create_incident")(self.create_incident)
318 |         self.mcp.tool(name="update_incident")(self.update_incident)
319 |         self.mcp.tool(name="search_records")(self.search_records)
320 |         self.mcp.tool(name="get_record")(self.get_record)
321 |         self.mcp.tool(name="perform_query")(self.perform_query)
322 |         self.mcp.tool(name="add_comment")(self.add_comment)
323 |         self.mcp.tool(name="add_work_notes")(self.add_work_notes)
324 |         
325 |         # Register prompts
326 |         self.mcp.prompt(name="analyze_incident")(self.incident_analysis_prompt)
327 |         self.mcp.prompt(name="create_incident_prompt")(self.create_incident_prompt)
328 |     
329 |     async def close(self):
330 |         """Close the ServiceNow client"""
331 |         await self.client.close()
332 |         
333 |     def run(self, transport: str = "stdio"):
334 |         """Run the ServiceNow MCP server"""
335 |         try:
336 |             self.mcp.run(transport=transport)
337 |         finally:
338 |             asyncio.run(self.close())
339 |         
340 |     # Resource handlers
341 |     async def list_incidents(self) -> str:
342 |         """List recent incidents in ServiceNow"""
343 |         options = QueryOptions(limit=10)
344 |         result = await self.client.get_records("incident", options)
345 |         return json.dumps(result, indent=2)
346 |         
347 |     async def get_incident(self, number: str) -> str:
348 |         """Get a specific incident by number"""
349 |         incident = await self.client.get_incident_by_number(number)
350 |         if incident:
351 |             return json.dumps({"result": incident}, indent=2)
352 |         return json.dumps({"result": "Incident not found"})
353 |         
354 |     async def list_users(self) -> str:
355 |         """List users in ServiceNow"""
356 |         options = QueryOptions(limit=10)
357 |         result = await self.client.get_records("sys_user", options)
358 |         return json.dumps(result, indent=2)
359 |         
360 |     async def list_knowledge(self) -> str:
361 |         """List knowledge articles in ServiceNow"""
362 |         options = QueryOptions(limit=10)
363 |         result = await self.client.get_records("kb_knowledge", options)
364 |         return json.dumps(result, indent=2)
365 |         
366 |     async def get_tables(self) -> str:
367 |         """Get a list of available tables"""
368 |         result = await self.client.get_available_tables()
369 |         return json.dumps({"result": result}, indent=2)
370 |         
371 |     async def get_table_records(self, table: str) -> str:
372 |         """Get records from a specific table"""
373 |         options = QueryOptions(limit=10)
374 |         result = await self.client.get_records(table, options)
375 |         return json.dumps(result, indent=2)
376 |         
377 |     async def get_table_schema(self, table: str) -> str:
378 |         """Get the schema for a table"""
379 |         result = await self.client.get_table_schema(table)
380 |         return json.dumps(result, indent=2)
381 |     
382 |     # Tool handlers
383 |     async def create_incident(self, 
384 |                      incident: IncidentCreate,
385 |                      ctx: Context = None) -> str:
386 |         """
387 |         Create a new incident in ServiceNow
388 |         
389 |         Args:
390 |             incident: The incident details to create
391 |             ctx: Optional context object for progress reporting
392 |         
393 |         Returns:
394 |             JSON response from ServiceNow
395 |         """
396 |         if ctx:
397 |             await ctx.info(f"Creating incident: {incident.short_description}")
398 |             
399 |         data = incident.dict(exclude_none=True)
400 |         result = await self.client.create_record("incident", data)
401 |         
402 |         if ctx:
403 |             await ctx.info(f"Created incident: {result['result']['number']}")
404 |             
405 |         return json.dumps(result, indent=2)
406 |         
407 |     async def update_incident(self,
408 |                      number: str,
409 |                      updates: IncidentUpdate,
410 |                      ctx: Context = None) -> str:
411 |         """
412 |         Update an existing incident in ServiceNow
413 |         
414 |         Args:
415 |             number: The incident number (INC0010001)
416 |             updates: The fields to update
417 |             ctx: Optional context object for progress reporting
418 |             
419 |         Returns:
420 |             JSON response from ServiceNow
421 |         """
422 |         # First, get the sys_id for the incident number
423 |         if ctx:
424 |             await ctx.info(f"Looking up incident: {number}")
425 |             
426 |         incident = await self.client.get_incident_by_number(number)
427 |         
428 |         if not incident:
429 |             error_message = f"Incident {number} not found"
430 |             if ctx:
431 |                 await ctx.error(error_message)
432 |             return json.dumps({"error": error_message})
433 |             
434 |         sys_id = incident['sys_id']
435 |         
436 |         # Now update the incident
437 |         if ctx:
438 |             await ctx.info(f"Updating incident: {number}")
439 |             
440 |         data = updates.dict(exclude_none=True)
441 |         result = await self.client.update_record("incident", sys_id, data)
442 |         
443 |         return json.dumps(result, indent=2)
444 |         
445 |     async def search_records(self, 
446 |                     query: str, 
447 |                     table: str = "incident",
448 |                     limit: int = 10,
449 |                     ctx: Context = None) -> str:
450 |         """
451 |         Search for records in ServiceNow using text query
452 |         
453 |         Args:
454 |             query: Text to search for
455 |             table: Table to search in
456 |             limit: Maximum number of results to return
457 |             ctx: Optional context object for progress reporting
458 |             
459 |         Returns:
460 |             JSON response containing matching records
461 |         """
462 |         if ctx:
463 |             await ctx.info(f"Searching {table} for: {query}")
464 |             
465 |         result = await self.client.search(query, table, limit)
466 |         return json.dumps(result, indent=2)
467 |         
468 |     async def get_record(self,
469 |                 table: str,
470 |                 sys_id: str,
471 |                 ctx: Context = None) -> str:
472 |         """
473 |         Get a specific record by sys_id
474 |         
475 |         Args:
476 |             table: Table to query
477 |             sys_id: System ID of the record
478 |             ctx: Optional context object for progress reporting
479 |             
480 |         Returns:
481 |             JSON response containing the record
482 |         """
483 |         if ctx:
484 |             await ctx.info(f"Getting {table} record: {sys_id}")
485 |             
486 |         result = await self.client.get_record(table, sys_id)
487 |         return json.dumps(result, indent=2)
488 |         
489 |     async def perform_query(self,
490 |                    table: str,
491 |                    query: str = "",
492 |                    limit: int = 10,
493 |                    offset: int = 0,
494 |                    fields: Optional[List[str]] = None,
495 |                    ctx: Context = None) -> str:
496 |         """
497 |         Perform a query against ServiceNow
498 |         
499 |         Args:
500 |             table: Table to query
501 |             query: Encoded query string (ServiceNow syntax)
502 |             limit: Maximum number of results to return
503 |             offset: Number of records to skip
504 |             fields: List of fields to return (or all fields if None)
505 |             ctx: Optional context object for progress reporting
506 |             
507 |         Returns:
508 |             JSON response containing query results
509 |         """
510 |         if ctx:
511 |             await ctx.info(f"Querying {table} with: {query}")
512 |             
513 |         options = QueryOptions(
514 |             limit=limit,
515 |             offset=offset,
516 |             fields=fields,
517 |             query=query
518 |         )
519 |         
520 |         result = await self.client.get_records(table, options)
521 |         return json.dumps(result, indent=2)
522 |         
523 |     async def add_comment(self,
524 |                  number: str,
525 |                  comment: str,
526 |                  ctx: Context = None) -> str:
527 |         """
528 |         Add a comment to an incident (customer visible)
529 |         
530 |         Args:
531 |             number: Incident number
532 |             comment: Comment to add
533 |             ctx: Optional context object for progress reporting
534 |             
535 |         Returns:
536 |             JSON response from ServiceNow
537 |         """
538 |         if ctx:
539 |             await ctx.info(f"Adding comment to incident: {number}")
540 |             
541 |         incident = await self.client.get_incident_by_number(number)
542 |         
543 |         if not incident:
544 |             error_message = f"Incident {number} not found"
545 |             if ctx:
546 |                 await ctx.error(error_message)
547 |             return json.dumps({"error": error_message})
548 |             
549 |         sys_id = incident['sys_id']
550 |         
551 |         # Add the comment
552 |         update = {"comments": comment}
553 |         result = await self.client.update_record("incident", sys_id, update)
554 |         
555 |         return json.dumps(result, indent=2)
556 |         
557 |     async def add_work_notes(self,
558 |                     number: str,
559 |                     work_notes: str,
560 |                     ctx: Context = None) -> str:
561 |         """
562 |         Add work notes to an incident (internal)
563 |         
564 |         Args:
565 |             number: Incident number
566 |             work_notes: Work notes to add
567 |             ctx: Optional context object for progress reporting
568 |             
569 |         Returns:
570 |             JSON response from ServiceNow
571 |         """
572 |         if ctx:
573 |             await ctx.info(f"Adding work notes to incident: {number}")
574 |             
575 |         incident = await self.client.get_incident_by_number(number)
576 |         
577 |         if not incident:
578 |             error_message = f"Incident {number} not found"
579 |             if ctx:
580 |                 await ctx.error(error_message)
581 |             return json.dumps({"error": error_message})
582 |             
583 |         sys_id = incident['sys_id']
584 |         
585 |         # Add the work notes
586 |         update = {"work_notes": work_notes}
587 |         result = await self.client.update_record("incident", sys_id, update)
588 |         
589 |         return json.dumps(result, indent=2)
590 |     
591 |     # Prompt templates
592 |     def incident_analysis_prompt(self, incident_number: str) -> str:
593 |         """Create a prompt to analyze a ServiceNow incident
594 |         
595 |         Args:
596 |             incident_number: The incident number to analyze (e.g., INC0010001)
597 |             
598 |         Returns:
599 |             Prompt text for analyzing the incident
600 |         """
601 |         return f"""
602 |         Please analyze the following ServiceNow incident {incident_number}.
603 |         
604 |         First, call the appropriate tool to fetch the incident details using get_incident.
605 |         
606 |         Then, provide a comprehensive analysis with the following sections:
607 |         
608 |         1. Summary: A brief overview of the incident
609 |         2. Impact Assessment: Analysis of the impact based on the severity, priority, and affected users
610 |         3. Root Cause Analysis: Potential causes based on available information
611 |         4. Resolution Recommendations: Suggested next steps to resolve the incident
612 |         5. SLA Status: Whether the incident is at risk of breaching SLAs
613 |         
614 |         Use a professional and clear tone appropriate for IT service management.
615 |         """
616 |         
617 |     def create_incident_prompt(self) -> str:
618 |         """Create a prompt for incident creation guidance
619 |         
620 |         Returns:
621 |             Prompt text for helping users create an incident
622 |         """
623 |         return """
624 |         I'll help you create a new ServiceNow incident. Please provide the following information:
625 |         
626 |         1. Short Description: A brief title for the incident (required)
627 |         2. Detailed Description: A thorough explanation of the issue (required)
628 |         3. Caller: The person reporting the issue (optional)
629 |         4. Category and Subcategory: The type of issue (optional)
630 |         5. Impact (1-High, 2-Medium, 3-Low): How broadly this affects users (optional)
631 |         6. Urgency (1-High, 2-Medium, 3-Low): How time-sensitive this issue is (optional)
632 |         
633 |         After collecting this information, I'll use the create_incident tool to submit the incident to ServiceNow.
634 |         """
635 | 
636 | 
637 | # Factory functions for creating authentication objects
638 | def create_basic_auth(username: str, password: str) -> BasicAuth:
639 |     """Create BasicAuth object for ServiceNow authentication"""
640 |     return BasicAuth(username, password)
641 | 
642 | def create_token_auth(token: str) -> TokenAuth:
643 |     """Create TokenAuth object for ServiceNow authentication"""
644 |     return TokenAuth(token)
645 | 
646 | def create_oauth_auth(client_id: str, client_secret: str, 
647 |                      username: str, password: str,
648 |                      instance_url: str) -> OAuthAuth:
649 |     """Create OAuthAuth object for ServiceNow authentication"""
650 |     return OAuthAuth(client_id, client_secret, username, password, instance_url)
651 | 
652 | # Main function for running the server from the command line
653 | def main():
654 |     """Run the ServiceNow MCP server from the command line"""
655 |     import argparse
656 |     import sys
657 |     
658 |     parser = argparse.ArgumentParser(description="ServiceNow MCP Server")
659 |     parser.add_argument("--url", help="ServiceNow instance URL", default=os.environ.get("SERVICENOW_INSTANCE_URL"))
660 |     parser.add_argument("--transport", help="Transport protocol (stdio or sse)", default="stdio", choices=["stdio", "sse"])
661 |     
662 |     # Authentication options
663 |     auth_group = parser.add_argument_group("Authentication")
664 |     auth_group.add_argument("--username", help="ServiceNow username", default=os.environ.get("SERVICENOW_USERNAME"))
665 |     auth_group.add_argument("--password", help="ServiceNow password", default=os.environ.get("SERVICENOW_PASSWORD"))
666 |     auth_group.add_argument("--token", help="ServiceNow token", default=os.environ.get("SERVICENOW_TOKEN"))
667 |     auth_group.add_argument("--client-id", help="OAuth client ID", default=os.environ.get("SERVICENOW_CLIENT_ID"))
668 |     auth_group.add_argument("--client-secret", help="OAuth client secret", default=os.environ.get("SERVICENOW_CLIENT_SECRET"))
669 |     
670 |     args = parser.parse_args()
671 |     
672 |     # Check required parameters
673 |     if not args.url:
674 |         print("Error: ServiceNow instance URL is required")
675 |         print("Set SERVICENOW_INSTANCE_URL environment variable or use --url")
676 |         sys.exit(1)
677 |     
678 |     # Determine authentication method
679 |     auth = None
680 |     if args.token:
681 |         auth = create_token_auth(args.token)
682 |     elif args.client_id and args.client_secret and args.username and args.password:
683 |         auth = create_oauth_auth(args.client_id, args.client_secret, args.username, args.password, args.url)
684 |     elif args.username and args.password:
685 |         auth = create_basic_auth(args.username, args.password)
686 |     else:
687 |         print("Error: Authentication credentials required")
688 |         print("Either provide username/password, token, or OAuth credentials")
689 |         sys.exit(1)
690 |     
691 |     # Create and run the server
692 |     server = ServiceNowMCP(instance_url=args.url, auth=auth)
693 |     server.run(transport=args.transport)
694 | 
695 | # Entry point
696 | if __name__ == "__main__":
697 |     main()
698 | 
```

--------------------------------------------------------------------------------
/mcp_server_servicenow/server.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | ServiceNow MCP Server
  3 | 
  4 | This module provides a Model Context Protocol (MCP) server that interfaces with ServiceNow.
  5 | It allows AI agents to access and manipulate ServiceNow data through a secure API.
  6 | """
  7 | 
  8 | import os
  9 | import json
 10 | import asyncio
 11 | import logging
 12 | import re
 13 | from datetime import datetime
 14 | from enum import Enum
 15 | from typing import Dict, List, Optional, Any, Union, Literal, Tuple
 16 | 
 17 | import requests
 18 | import httpx
 19 | from pydantic import BaseModel, Field, field_validator
 20 | 
 21 | from mcp_server_servicenow.nlp import NLPProcessor
 22 | 
 23 | from mcp.server.fastmcp import FastMCP, Context
 24 | from mcp.server.fastmcp.utilities.logging import get_logger
 25 | 
 26 | logger = get_logger(__name__)
 27 | 
 28 | # ServiceNow API models
 29 | class IncidentState(int, Enum):
 30 |     NEW = 1
 31 |     IN_PROGRESS = 2 
 32 |     ON_HOLD = 3
 33 |     RESOLVED = 6
 34 |     CLOSED = 7
 35 |     CANCELED = 8
 36 | 
 37 | class IncidentPriority(int, Enum):
 38 |     CRITICAL = 1
 39 |     HIGH = 2
 40 |     MODERATE = 3
 41 |     LOW = 4
 42 |     PLANNING = 5
 43 | 
 44 | class IncidentUrgency(int, Enum):
 45 |     HIGH = 1
 46 |     MEDIUM = 2
 47 |     LOW = 3
 48 | 
 49 | class IncidentImpact(int, Enum):
 50 |     HIGH = 1
 51 |     MEDIUM = 2
 52 |     LOW = 3
 53 | 
 54 | class IncidentCreate(BaseModel):
 55 |     """Model for creating a new incident"""
 56 |     short_description: str = Field(..., description="A brief description of the incident")
 57 |     description: str = Field(..., description="A detailed description of the incident")
 58 |     caller_id: Optional[str] = Field(None, description="The sys_id or name of the caller")
 59 |     category: Optional[str] = Field(None, description="The incident category")
 60 |     subcategory: Optional[str] = Field(None, description="The incident subcategory")
 61 |     urgency: Optional[IncidentUrgency] = Field(IncidentUrgency.MEDIUM, description="The urgency of the incident")
 62 |     impact: Optional[IncidentImpact] = Field(IncidentImpact.MEDIUM, description="The impact of the incident")
 63 |     assignment_group: Optional[str] = Field(None, description="The sys_id or name of the assignment group")
 64 |     assigned_to: Optional[str] = Field(None, description="The sys_id or name of the assignee")
 65 | 
 66 | class IncidentUpdate(BaseModel):
 67 |     """Model for updating an existing incident"""
 68 |     short_description: Optional[str] = Field(None, description="A brief description of the incident")
 69 |     description: Optional[str] = Field(None, description="A detailed description of the incident")
 70 |     caller_id: Optional[str] = Field(None, description="The sys_id or name of the caller")
 71 |     category: Optional[str] = Field(None, description="The incident category")
 72 |     subcategory: Optional[str] = Field(None, description="The incident subcategory")
 73 |     urgency: Optional[IncidentUrgency] = Field(None, description="The urgency of the incident")
 74 |     impact: Optional[IncidentImpact] = Field(None, description="The impact of the incident")
 75 |     state: Optional[IncidentState] = Field(None, description="The state of the incident")
 76 |     assignment_group: Optional[str] = Field(None, description="The sys_id or name of the assignment group")
 77 |     assigned_to: Optional[str] = Field(None, description="The sys_id or name of the assignee")
 78 |     work_notes: Optional[str] = Field(None, description="Work notes to add to the incident (internal)")
 79 |     comments: Optional[str] = Field(None, description="Customer visible comments to add to the incident")
 80 |     
 81 |     @field_validator('work_notes', 'comments')
 82 |     @classmethod
 83 |     def validate_not_empty(cls, v):
 84 |         if v is not None and v.strip() == '':
 85 |             raise ValueError("Cannot be an empty string")
 86 |         return v
 87 | 
 88 |     class Config:
 89 |         use_enum_values = True
 90 |         
 91 | class QueryOptions(BaseModel):
 92 |     """Options for querying ServiceNow records"""
 93 |     limit: int = Field(10, description="Maximum number of records to return", ge=1, le=1000)
 94 |     offset: int = Field(0, description="Number of records to skip", ge=0)
 95 |     fields: Optional[List[str]] = Field(None, description="List of fields to return")
 96 |     query: Optional[str] = Field(None, description="ServiceNow encoded query string")
 97 |     order_by: Optional[str] = Field(None, description="Field to order results by")
 98 |     order_direction: Optional[Literal["asc", "desc"]] = Field("desc", description="Order direction")
 99 | 
100 | class Authentication:
101 |     """Base class for ServiceNow authentication methods"""
102 |     
103 |     async def get_headers(self) -> Dict[str, str]:
104 |         """Get authentication headers for ServiceNow API requests"""
105 |         raise NotImplementedError("Subclasses must implement this method")
106 | 
107 | class BasicAuth(Authentication):
108 |     """Basic authentication for ServiceNow"""
109 |     
110 |     def __init__(self, username: str, password: str):
111 |         self.username = username
112 |         self.password = password
113 |         
114 |     async def get_headers(self) -> Dict[str, str]:
115 |         """Get authentication headers for ServiceNow API requests"""
116 |         return {}
117 |     
118 |     def get_auth(self) -> tuple:
119 |         """Get authentication tuple for requests"""
120 |         return (self.username, self.password)
121 | 
122 | class TokenAuth(Authentication):
123 |     """Token authentication for ServiceNow"""
124 |     
125 |     def __init__(self, token: str):
126 |         self.token = token
127 |         
128 |     async def get_headers(self) -> Dict[str, str]:
129 |         """Get authentication headers for ServiceNow API requests"""
130 |         return {"Authorization": f"Bearer {self.token}"}
131 |     
132 |     def get_auth(self) -> None:
133 |         """Get authentication tuple for requests"""
134 |         return None
135 | 
136 | class OAuthAuth(Authentication):
137 |     """OAuth authentication for ServiceNow"""
138 |     
139 |     def __init__(self, client_id: str, client_secret: str, username: str, password: str, 
140 |                  instance_url: str, token: Optional[str] = None, refresh_token: Optional[str] = None,
141 |                  token_expiry: Optional[datetime] = None):
142 |         self.client_id = client_id
143 |         self.client_secret = client_secret
144 |         self.username = username
145 |         self.password = password
146 |         self.instance_url = instance_url
147 |         self.token = token
148 |         self.refresh_token = refresh_token
149 |         self.token_expiry = token_expiry
150 |         
151 |     async def get_headers(self) -> Dict[str, str]:
152 |         """Get authentication headers for ServiceNow API requests"""
153 |         if self.token is None or (self.token_expiry and datetime.now() > self.token_expiry):
154 |             await self.refresh()
155 |             
156 |         return {"Authorization": f"Bearer {self.token}"}
157 |     
158 |     def get_auth(self) -> None:
159 |         """Get authentication tuple for requests"""
160 |         return None
161 |         
162 |     async def refresh(self):
163 |         """Refresh the OAuth token"""
164 |         if self.refresh_token:
165 |             # Try refresh flow first
166 |             data = {
167 |                 "grant_type": "refresh_token",
168 |                 "client_id": self.client_id,
169 |                 "client_secret": self.client_secret,
170 |                 "refresh_token": self.refresh_token
171 |             }
172 |         else:
173 |             # Fall back to password flow
174 |             data = {
175 |                 "grant_type": "password",
176 |                 "client_id": self.client_id,
177 |                 "client_secret": self.client_secret,
178 |                 "username": self.username,
179 |                 "password": self.password
180 |             }
181 |             
182 |         token_url = f"{self.instance_url}/oauth_token.do"
183 |         async with httpx.AsyncClient() as client:
184 |             response = await client.post(token_url, data=data)
185 |             response.raise_for_status()
186 |             result = response.json()
187 |             
188 |             self.token = result["access_token"]
189 |             self.refresh_token = result.get("refresh_token")
190 |             expires_in = result.get("expires_in", 1800)  # Default 30 minutes
191 |             self.token_expiry = datetime.now().timestamp() + expires_in
192 | 
193 | class ServiceNowClient:
194 |     """Client for interacting with ServiceNow API"""
195 |     
196 |     def __init__(self, instance_url: str, auth: Authentication):
197 |         self.instance_url = instance_url.rstrip('/')
198 |         self.auth = auth
199 |         self.client = httpx.AsyncClient()
200 |         
201 |     async def close(self):
202 |         """Close the HTTP client"""
203 |         await self.client.aclose()
204 |         
205 |     async def request(self, method: str, path: str, 
206 |                     params: Optional[Dict[str, Any]] = None,
207 |                     json_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
208 |         """Make a request to the ServiceNow API"""
209 |         url = f"{self.instance_url}{path}"
210 |         headers = await self.auth.get_headers()
211 |         headers["Accept"] = "application/json"
212 |         
213 |         if isinstance(self.auth, BasicAuth):
214 |             auth = self.auth.get_auth()
215 |         else:
216 |             auth = None
217 |             
218 |         try:
219 |             response = await self.client.request(
220 |                 method=method,
221 |                 url=url,
222 |                 params=params,
223 |                 json=json_data,
224 |                 headers=headers,
225 |                 auth=auth
226 |             )
227 |             response.raise_for_status()
228 |             return response.json()
229 |         except httpx.HTTPStatusError as e:
230 |             logger.error(f"ServiceNow API error: {e.response.text}")
231 |             raise
232 |             
233 |     async def get_record(self, table: str, sys_id: str) -> Dict[str, Any]:
234 |         """Get a record by sys_id"""
235 |         if table == "incident" and sys_id.startswith("INC"):
236 |             # This is an incident number, not a sys_id
237 |             logger.warning(f"Attempted to use get_record with incident number instead of sys_id: {sys_id}")
238 |             logger.warning("Redirecting to get_incident_by_number method")
239 |             result = await self.get_incident_by_number(sys_id)
240 |             if result:
241 |                 return {"result": result}
242 |             else:
243 |                 raise ValueError(f"Incident not found: {sys_id}")
244 |         return await self.request("GET", f"/api/now/table/{table}/{sys_id}")
245 |         
246 |     async def get_records(self, table: str, options: QueryOptions = None) -> Dict[str, Any]:
247 |         """Get records with query options"""
248 |         if options is None:
249 |             options = QueryOptions()
250 |             
251 |         params = {
252 |             "sysparm_limit": options.limit,
253 |             "sysparm_offset": options.offset
254 |         }
255 |         
256 |         if options.fields:
257 |             params["sysparm_fields"] = ",".join(options.fields)
258 |             
259 |         if options.query:
260 |             params["sysparm_query"] = options.query
261 |             
262 |         if options.order_by:
263 |             direction = "desc" if options.order_direction == "desc" else "asc"
264 |             params["sysparm_order_by"] = f"{options.order_by}^{direction}"
265 |             
266 |         return await self.request("GET", f"/api/now/table/{table}", params=params)
267 |     
268 |     async def create_record(self, table: str, data: Dict[str, Any]) -> Dict[str, Any]:
269 |         """Create a new record"""
270 |         return await self.request("POST", f"/api/now/table/{table}", json_data=data)
271 |         
272 |     async def update_record(self, table: str, sys_id: str, data: Dict[str, Any]) -> Dict[str, Any]:
273 |         """Update an existing record"""
274 |         return await self.request("PUT", f"/api/now/table/{table}/{sys_id}", json_data=data)
275 |         
276 |     async def delete_record(self, table: str, sys_id: str) -> Dict[str, Any]:
277 |         """Delete a record"""
278 |         return await self.request("DELETE", f"/api/now/table/{table}/{sys_id}")
279 |         
280 |     async def get_incident_by_number(self, number: str) -> Dict[str, Any]:
281 |         """Get an incident by its number"""
282 |         result = await self.request("GET", f"/api/now/table/incident", 
283 |                                   params={"sysparm_query": f"number={number}", "sysparm_limit": 1})
284 |         if result.get("result") and len(result["result"]) > 0:
285 |             return result["result"][0]
286 |         return None
287 |         
288 |     async def search(self, query: str, table: str = "incident", limit: int = 10) -> Dict[str, Any]:
289 |         """Search for records using text query"""
290 |         return await self.request("GET", f"/api/now/table/{table}", 
291 |                                 params={"sysparm_query": f"123TEXTQUERY321={query}", "sysparm_limit": limit})
292 |                                 
293 |     async def get_available_tables(self) -> List[str]:
294 |         """Get a list of available tables"""
295 |         result = await self.request("GET", "/api/now/table/sys_db_object", 
296 |                                   params={"sysparm_fields": "name,label", "sysparm_limit": 100})
297 |         return result.get("result", [])
298 |         
299 |     async def get_table_schema(self, table: str) -> Dict[str, Any]:
300 |         """Get the schema for a table"""
301 |         result = await self.request("GET", f"/api/now/ui/meta/{table}")
302 |         return result
303 | 
304 | 
305 | class ScriptUpdateModel(BaseModel):
306 |     """Model for updating a ServiceNow script"""
307 |     name: str = Field(..., description="The name of the script")
308 |     script: str = Field(..., description="The script content")
309 |     type: str = Field(..., description="The type of script (e.g., sys_script_include)")
310 |     description: Optional[str] = Field(None, description="Description of the script")
311 | 
312 | class ServiceNowMCP:
313 |     """ServiceNow MCP Server"""
314 |     
315 |     def __init__(self, 
316 |                 instance_url: str,
317 |                 auth: Authentication,
318 |                 name: str = "ServiceNow MCP"):
319 |         self.client = ServiceNowClient(instance_url, auth)
320 |         self.mcp = FastMCP(name, dependencies=[
321 |             "requests",
322 |             "httpx", 
323 |             "pydantic"
324 |         ])
325 |         
326 |         # Register resources
327 |         self.mcp.resource("servicenow://incidents")(self.list_incidents)
328 |         self.mcp.resource("servicenow://incidents/{number}")(self.get_incident)
329 |         self.mcp.resource("servicenow://users")(self.list_users)
330 |         self.mcp.resource("servicenow://knowledge")(self.list_knowledge)
331 |         self.mcp.resource("servicenow://tables")(self.get_tables)
332 |         self.mcp.resource("servicenow://tables/{table}")(self.get_table_records)
333 |         self.mcp.resource("servicenow://schema/{table}")(self.get_table_schema)
334 |         
335 |         # Register tools
336 |         self.mcp.tool(name="create_incident")(self.create_incident)
337 |         self.mcp.tool(name="update_incident")(self.update_incident)
338 |         self.mcp.tool(name="search_records")(self.search_records)
339 |         self.mcp.tool(name="get_record")(self.get_record)
340 |         self.mcp.tool(name="perform_query")(self.perform_query)
341 |         self.mcp.tool(name="add_comment")(self.add_comment)
342 |         self.mcp.tool(name="add_work_notes")(self.add_work_notes)
343 |         
344 |         # Register natural language tools
345 |         self.mcp.tool(name="natural_language_search")(self.natural_language_search)
346 |         self.mcp.tool(name="natural_language_update")(self.natural_language_update)
347 |         self.mcp.tool(name="update_script")(self.update_script)
348 |         
349 |         # Register prompts
350 |         self.mcp.prompt(name="analyze_incident")(self.incident_analysis_prompt)
351 |         self.mcp.prompt(name="create_incident_prompt")(self.create_incident_prompt)
352 |     
353 |     async def close(self):
354 |         """Close the ServiceNow client"""
355 |         await self.client.close()
356 |         
357 |     def run(self, transport: str = "stdio"):
358 |         """Run the ServiceNow MCP server"""
359 |         try:
360 |             self.mcp.run(transport=transport)
361 |         finally:
362 |             asyncio.run(self.close())
363 |         
364 |     # Resource handlers
365 |     async def list_incidents(self) -> str:
366 |         """List recent incidents in ServiceNow"""
367 |         options = QueryOptions(limit=10)
368 |         result = await self.client.get_records("incident", options)
369 |         return json.dumps(result, indent=2)
370 |         
371 |     async def get_incident(self, number: str) -> str:
372 |         """Get a specific incident by number"""
373 |         try:
374 |             # Always use get_incident_by_number to query by incident number, not get_record
375 |             incident = await self.client.get_incident_by_number(number)
376 |             if incident:
377 |                 return json.dumps({"result": incident}, indent=2)
378 |             else:
379 |                 logger.error(f"No incident found with number: {number}")
380 |                 return json.dumps({"error":{"message":"No Record found","detail":"Record doesn't exist or ACL restricts the record retrieval"},"status":"failure"})
381 |         except Exception as e:
382 |             logger.error(f"Error getting incident {number}: {str(e)}")
383 |             return json.dumps({"error":{"message":str(e),"detail":"Error occurred while retrieving the record"},"status":"failure"})
384 |         
385 |     async def list_users(self) -> str:
386 |         """List users in ServiceNow"""
387 |         options = QueryOptions(limit=10)
388 |         result = await self.client.get_records("sys_user", options)
389 |         return json.dumps(result, indent=2)
390 |         
391 |     async def list_knowledge(self) -> str:
392 |         """List knowledge articles in ServiceNow"""
393 |         options = QueryOptions(limit=10)
394 |         result = await self.client.get_records("kb_knowledge", options)
395 |         return json.dumps(result, indent=2)
396 |         
397 |     async def get_tables(self) -> str:
398 |         """Get a list of available tables"""
399 |         result = await self.client.get_available_tables()
400 |         return json.dumps({"result": result}, indent=2)
401 |         
402 |     async def get_table_records(self, table: str) -> str:
403 |         """Get records from a specific table"""
404 |         options = QueryOptions(limit=10)
405 |         result = await self.client.get_records(table, options)
406 |         return json.dumps(result, indent=2)
407 |         
408 |     async def get_table_schema(self, table: str) -> str:
409 |         """Get the schema for a table"""
410 |         result = await self.client.get_table_schema(table)
411 |         return json.dumps(result, indent=2)
412 |     
413 |     # Tool handlers
414 |     async def create_incident(self, 
415 |                      incident,
416 |                      ctx: Context = None) -> str:
417 |         """
418 |         Create a new incident in ServiceNow
419 |         
420 |         Args:
421 |             incident: The incident details to create - can be either an IncidentCreate object,
422 |                       a dictionary containing incident fields, or a string with the description
423 |             ctx: Optional context object for progress reporting
424 |         
425 |         Returns:
426 |             JSON response from ServiceNow
427 |         """
428 |         # Handle different input types
429 |         if isinstance(incident, str):
430 |             # If a string was provided, treat it as the description and generate a short description
431 |             short_desc = incident[:50] + ('...' if len(incident) > 50 else '')
432 |             incident_data = {
433 |                 "short_description": short_desc,
434 |                 "description": incident
435 |             }
436 |             logger.info(f"Creating incident from string description: {short_desc}")
437 |         elif isinstance(incident, dict):
438 |             # Dictionary provided
439 |             incident_data = incident
440 |             logger.info(f"Creating incident from dictionary: {incident.get('short_description', 'No short description')}")
441 |         elif isinstance(incident, IncidentCreate):
442 |             # IncidentCreate model provided
443 |             incident_data = incident.dict(exclude_none=True)
444 |             logger.info(f"Creating incident from IncidentCreate: {incident.short_description}")
445 |         else:
446 |             error_message = f"Invalid incident type: {type(incident)}. Expected IncidentCreate, dict, or str."
447 |             logger.error(error_message)
448 |             return json.dumps({"error": error_message})
449 | 
450 |         # Validate that required fields are present
451 |         if "short_description" not in incident_data and isinstance(incident, dict):
452 |             if "description" in incident_data:
453 |                 # Auto-generate short description from description
454 |                 desc = incident_data["description"]
455 |                 incident_data["short_description"] = desc[:50] + ('...' if len(desc) > 50 else '')
456 |             else:
457 |                 incident_data["short_description"] = "Incident created through API"
458 |         
459 |         if "description" not in incident_data and isinstance(incident, dict):
460 |             if "short_description" in incident_data:
461 |                 incident_data["description"] = incident_data["short_description"]
462 |             else:
463 |                 incident_data["description"] = "No description provided"
464 |     
465 |         # Log and create the incident
466 |         if ctx:
467 |             await ctx.info(f"Creating incident: {incident_data.get('short_description', 'No short description')}")
468 |         
469 |         try:
470 |             result = await self.client.create_record("incident", incident_data)
471 |             
472 |             if ctx:
473 |                 await ctx.info(f"Created incident: {result['result']['number']}")
474 |                 
475 |             return json.dumps(result, indent=2)
476 |         except Exception as e:
477 |             error_message = f"Error creating incident: {str(e)}"
478 |             logger.error(error_message)
479 |             if ctx:
480 |                 await ctx.error(error_message)
481 |             return json.dumps({"error": error_message})
482 |         
483 |     async def update_incident(self,
484 |                      number: str,
485 |                      updates: IncidentUpdate,
486 |                      ctx: Context = None) -> str:
487 |         """
488 |         Update an existing incident in ServiceNow
489 |         
490 |         Args:
491 |             number: The incident number (INC0010001)
492 |             updates: The fields to update
493 |             ctx: Optional context object for progress reporting
494 |             
495 |         Returns:
496 |             JSON response from ServiceNow
497 |         """
498 |         # First, get the sys_id for the incident number
499 |         if ctx:
500 |             await ctx.info(f"Looking up incident: {number}")
501 |             
502 |         incident = await self.client.get_incident_by_number(number)
503 |         
504 |         if not incident:
505 |             error_message = f"Incident {number} not found"
506 |             if ctx:
507 |                 await ctx.error(error_message)
508 |             return json.dumps({"error": error_message})
509 |             
510 |         sys_id = incident['sys_id']
511 |         
512 |         # Now update the incident
513 |         if ctx:
514 |             await ctx.info(f"Updating incident: {number}")
515 |             
516 |         data = updates.dict(exclude_none=True)
517 |         result = await self.client.update_record("incident", sys_id, data)
518 |         
519 |         return json.dumps(result, indent=2)
520 |         
521 |     async def search_records(self, 
522 |                     query: str, 
523 |                     table: str = "incident",
524 |                     limit: int = 10,
525 |                     ctx: Context = None) -> str:
526 |         """
527 |         Search for records in ServiceNow using text query
528 |         
529 |         Args:
530 |             query: Text to search for
531 |             table: Table to search in
532 |             limit: Maximum number of results to return
533 |             ctx: Optional context object for progress reporting
534 |             
535 |         Returns:
536 |             JSON response containing matching records
537 |         """
538 |         if ctx:
539 |             await ctx.info(f"Searching {table} for: {query}")
540 |             
541 |         result = await self.client.search(query, table, limit)
542 |         return json.dumps(result, indent=2)
543 |         
544 |     async def get_record(self,
545 |                 table: str,
546 |                 sys_id: str,
547 |                 ctx: Context = None) -> str:
548 |         """
549 |         Get a specific record by sys_id
550 |         
551 |         Args:
552 |             table: Table to query
553 |             sys_id: System ID of the record
554 |             ctx: Optional context object for progress reporting
555 |             
556 |         Returns:
557 |             JSON response containing the record
558 |         """
559 |         if ctx:
560 |             await ctx.info(f"Getting {table} record: {sys_id}")
561 |             
562 |         result = await self.client.get_record(table, sys_id)
563 |         return json.dumps(result, indent=2)
564 |         
565 |     async def perform_query(self,
566 |                    table: str,
567 |                    query: str = "",
568 |                    limit: int = 10,
569 |                    offset: int = 0,
570 |                    fields: Optional[List[str]] = None,
571 |                    ctx: Context = None) -> str:
572 |         """
573 |         Perform a query against ServiceNow
574 |         
575 |         Args:
576 |             table: Table to query
577 |             query: Encoded query string (ServiceNow syntax)
578 |             limit: Maximum number of results to return
579 |             offset: Number of records to skip
580 |             fields: List of fields to return (or all fields if None)
581 |             ctx: Optional context object for progress reporting
582 |             
583 |         Returns:
584 |             JSON response containing query results
585 |         """
586 |         if ctx:
587 |             await ctx.info(f"Querying {table} with: {query}")
588 |             
589 |         options = QueryOptions(
590 |             limit=limit,
591 |             offset=offset,
592 |             fields=fields,
593 |             query=query
594 |         )
595 |         
596 |         result = await self.client.get_records(table, options)
597 |         return json.dumps(result, indent=2)
598 |         
599 |     async def add_comment(self,
600 |                  number: str,
601 |                  comment: str,
602 |                  ctx: Context = None) -> str:
603 |         """
604 |         Add a comment to an incident (customer visible)
605 |         
606 |         Args:
607 |             number: Incident number
608 |             comment: Comment to add
609 |             ctx: Optional context object for progress reporting
610 |             
611 |         Returns:
612 |             JSON response from ServiceNow
613 |         """
614 |         if ctx:
615 |             await ctx.info(f"Adding comment to incident: {number}")
616 |             
617 |         incident = await self.client.get_incident_by_number(number)
618 |         
619 |         if not incident:
620 |             error_message = f"Incident {number} not found"
621 |             if ctx:
622 |                 await ctx.error(error_message)
623 |             return json.dumps({"error": error_message})
624 |             
625 |         sys_id = incident['sys_id']
626 |         
627 |         # Add the comment
628 |         update = {"comments": comment}
629 |         result = await self.client.update_record("incident", sys_id, update)
630 |         
631 |         return json.dumps(result, indent=2)
632 |         
633 |     async def add_work_notes(self,
634 |                     number: str,
635 |                     work_notes: str,
636 |                     ctx: Context = None) -> str:
637 |         """
638 |         Add work notes to an incident (internal)
639 |         
640 |         Args:
641 |             number: Incident number
642 |             work_notes: Work notes to add
643 |             ctx: Optional context object for progress reporting
644 |             
645 |         Returns:
646 |             JSON response from ServiceNow
647 |         """
648 |         if ctx:
649 |             await ctx.info(f"Adding work notes to incident: {number}")
650 |             
651 |         incident = await self.client.get_incident_by_number(number)
652 |         
653 |         if not incident:
654 |             error_message = f"Incident {number} not found"
655 |             if ctx:
656 |                 await ctx.error(error_message)
657 |             return json.dumps({"error": error_message})
658 |             
659 |         sys_id = incident['sys_id']
660 |         
661 |         # Add the work notes
662 |         update = {"work_notes": work_notes}
663 |         result = await self.client.update_record("incident", sys_id, update)
664 |         
665 |         return json.dumps(result, indent=2)
666 |     
667 |     # Natural language tools
668 |     async def natural_language_search(self,
669 |                              query: str,
670 |                              ctx: Context = None) -> str:
671 |         """
672 |         Search for records using natural language
673 |         
674 |         Examples:
675 |         - "find all incidents about SAP"
676 |         - "search for incidents related to email"
677 |         - "show me all incidents with high priority"
678 |         
679 |         Args:
680 |             query: Natural language query
681 |             ctx: Optional context object for progress reporting
682 |             
683 |         Returns:
684 |             JSON response containing matching records
685 |         """
686 |         if ctx:
687 |             await ctx.info(f"Processing natural language query: {query}")
688 |             
689 |         # Parse the query
690 |         search_params = NLPProcessor.parse_search_query(query)
691 |         
692 |         if ctx:
693 |             await ctx.info(f"Searching {search_params['table']} with query: {search_params['query']}")
694 |         
695 |         # Perform the search
696 |         options = QueryOptions(
697 |             limit=search_params['limit'],
698 |             query=search_params['query']
699 |         )
700 |         
701 |         result = await self.client.get_records(search_params['table'], options)
702 |         return json.dumps(result, indent=2)
703 |     
704 |     async def natural_language_update(self,
705 |                               command: str,
706 |                               ctx: Context = None) -> str:
707 |         """
708 |         Update a record using natural language
709 |         
710 |         Examples:
711 |         - "Update incident INC0010001 saying I'm working on it"
712 |         - "Set incident INC0010002 to in progress"
713 |         - "Close incident INC0010003 with resolution: fixed the issue"
714 |         
715 |         Args:
716 |             command: Natural language update command
717 |             ctx: Optional context object for progress reporting
718 |             
719 |         Returns:
720 |             JSON response from ServiceNow
721 |         """
722 |         if ctx:
723 |             await ctx.info(f"Processing natural language update: {command}")
724 |             
725 |         try:
726 |             # Parse the command
727 |             record_number, updates = NLPProcessor.parse_update_command(command)
728 |             
729 |             if ctx:
730 |                 await ctx.info(f"Updating {record_number} with: {updates}")
731 |             
732 |             # Get the record
733 |             if record_number.startswith("INC"):
734 |                 incident = await self.client.get_incident_by_number(record_number)
735 |                 if not incident:
736 |                     error_message = f"Incident {record_number} not found"
737 |                     if ctx:
738 |                         await ctx.error(error_message)
739 |                     return json.dumps({"error": error_message})
740 |                 
741 |                 sys_id = incident['sys_id']
742 |                 table = "incident"
743 |             else:
744 |                 # Handle other record types if needed
745 |                 error_message = f"Record type not supported: {record_number}"
746 |                 if ctx:
747 |                     await ctx.error(error_message)
748 |                 return json.dumps({"error": error_message})
749 |             
750 |             # Update the record
751 |             result = await self.client.update_record(table, sys_id, updates)
752 |             return json.dumps(result, indent=2)
753 |             
754 |         except ValueError as e:
755 |             error_message = str(e)
756 |             if ctx:
757 |                 await ctx.error(error_message)
758 |             return json.dumps({"error": error_message})
759 |     
760 |     async def update_script(self,
761 |                    script_update: ScriptUpdateModel,
762 |                    ctx: Context = None) -> str:
763 |         """
764 |         Update a ServiceNow script
765 |         
766 |         Args:
767 |             script_update: The script update details
768 |             ctx: Optional context object for progress reporting
769 |             
770 |         Returns:
771 |             JSON response from ServiceNow
772 |         """
773 |         if ctx:
774 |             await ctx.info(f"Updating script: {script_update.name}")
775 |             
776 |         # Search for the script by name
777 |         table = script_update.type
778 |         query = f"name={script_update.name}"
779 |         
780 |         options = QueryOptions(
781 |             limit=1,
782 |             query=query
783 |         )
784 |         
785 |         result = await self.client.get_records(table, options)
786 |         
787 |         if not result.get("result") or len(result["result"]) == 0:
788 |             # Script doesn't exist, create it
789 |             if ctx:
790 |                 await ctx.info(f"Script not found, creating new script: {script_update.name}")
791 |                 
792 |             data = {
793 |                 "name": script_update.name,
794 |                 "script": script_update.script
795 |             }
796 |             
797 |             if script_update.description:
798 |                 data["description"] = script_update.description
799 |                 
800 |             result = await self.client.create_record(table, data)
801 |         else:
802 |             # Script exists, update it
803 |             script = result["result"][0]
804 |             sys_id = script["sys_id"]
805 |             
806 |             if ctx:
807 |                 await ctx.info(f"Updating existing script: {script_update.name} ({sys_id})")
808 |                 
809 |             data = {
810 |                 "script": script_update.script
811 |             }
812 |             
813 |             if script_update.description:
814 |                 data["description"] = script_update.description
815 |                 
816 |             result = await self.client.update_record(table, sys_id, data)
817 |             
818 |         return json.dumps(result, indent=2)
819 |     
820 |     # Prompt templates
821 |     def incident_analysis_prompt(self, incident_number: str) -> str:
822 |         """Create a prompt to analyze a ServiceNow incident
823 |         
824 |         Args:
825 |             incident_number: The incident number to analyze (e.g., INC0010001)
826 |             
827 |         Returns:
828 |             Prompt text for analyzing the incident
829 |         """
830 |         return f"""
831 |         Please analyze the following ServiceNow incident {incident_number}.
832 |         
833 |         First, call the appropriate tool to fetch the incident details using get_incident.
834 |         
835 |         Then, provide a comprehensive analysis with the following sections:
836 |         
837 |         1. Summary: A brief overview of the incident
838 |         2. Impact Assessment: Analysis of the impact based on the severity, priority, and affected users
839 |         3. Root Cause Analysis: Potential causes based on available information
840 |         4. Resolution Recommendations: Suggested next steps to resolve the incident
841 |         5. SLA Status: Whether the incident is at risk of breaching SLAs
842 |         
843 |         Use a professional and clear tone appropriate for IT service management.
844 |         """
845 |         
846 |     def create_incident_prompt(self) -> str:
847 |         """Create a prompt for incident creation guidance
848 |         
849 |         Returns:
850 |             Prompt text for helping users create an incident
851 |         """
852 |         return """
853 |         I'll help you create a new ServiceNow incident. Please provide the following information:
854 |         
855 |         1. Short Description: A brief title for the incident (required)
856 |         2. Detailed Description: A thorough explanation of the issue (required)
857 |         3. Caller: The person reporting the issue (optional)
858 |         4. Category and Subcategory: The type of issue (optional)
859 |         5. Impact (1-High, 2-Medium, 3-Low): How broadly this affects users (optional)
860 |         6. Urgency (1-High, 2-Medium, 3-Low): How time-sensitive this issue is (optional)
861 |         
862 |         After collecting this information, I'll use the create_incident tool to submit the incident to ServiceNow.
863 |         """
864 | 
865 | 
866 | # Factory functions for creating authentication objects
867 | def create_basic_auth(username: str, password: str) -> BasicAuth:
868 |     """Create BasicAuth object for ServiceNow authentication"""
869 |     return BasicAuth(username, password)
870 | 
871 | def create_token_auth(token: str) -> TokenAuth:
872 |     """Create TokenAuth object for ServiceNow authentication"""
873 |     return TokenAuth(token)
874 | 
875 | def create_oauth_auth(client_id: str, client_secret: str, 
876 |                      username: str, password: str,
877 |                      instance_url: str) -> OAuthAuth:
878 |     """Create OAuthAuth object for ServiceNow authentication"""
879 |     return OAuthAuth(client_id, client_secret, username, password, instance_url)
880 | 
```