This is page 3 of 5. Use http://codebase.md/osomai/servicenow-mcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .cursorrules
├── .DS_Store
├── .env.example
├── .gitignore
├── config
│ └── tool_packages.yaml
├── debug_workflow_api.py
├── Dockerfile
├── docs
│ ├── catalog_optimization_plan.md
│ ├── catalog_variables.md
│ ├── catalog.md
│ ├── change_management.md
│ ├── changeset_management.md
│ ├── incident_management.md
│ ├── knowledge_base.md
│ ├── user_management.md
│ └── workflow_management.md
├── examples
│ ├── catalog_integration_test.py
│ ├── catalog_optimization_example.py
│ ├── change_management_demo.py
│ ├── changeset_management_demo.py
│ ├── claude_catalog_demo.py
│ ├── claude_desktop_config.json
│ ├── claude_incident_demo.py
│ ├── debug_workflow_api.py
│ ├── wake_servicenow_instance.py
│ └── workflow_management_demo.py
├── LICENSE
├── prompts
│ └── add_servicenow_mcp_tool.md
├── pyproject.toml
├── README.md
├── scripts
│ ├── check_pdi_info.py
│ ├── check_pdi_status.py
│ ├── install_claude_desktop.sh
│ ├── setup_api_key.py
│ ├── setup_auth.py
│ ├── setup_oauth.py
│ ├── setup.sh
│ └── test_connection.py
├── src
│ ├── .DS_Store
│ └── servicenow_mcp
│ ├── __init__.py
│ ├── .DS_Store
│ ├── auth
│ │ ├── __init__.py
│ │ └── auth_manager.py
│ ├── cli.py
│ ├── server_sse.py
│ ├── server.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── catalog_optimization.py
│ │ ├── catalog_tools.py
│ │ ├── catalog_variables.py
│ │ ├── change_tools.py
│ │ ├── changeset_tools.py
│ │ ├── epic_tools.py
│ │ ├── incident_tools.py
│ │ ├── knowledge_base.py
│ │ ├── project_tools.py
│ │ ├── script_include_tools.py
│ │ ├── scrum_task_tools.py
│ │ ├── story_tools.py
│ │ ├── user_tools.py
│ │ └── workflow_tools.py
│ └── utils
│ ├── __init__.py
│ ├── config.py
│ └── tool_utils.py
├── tests
│ ├── test_catalog_optimization.py
│ ├── test_catalog_resources.py
│ ├── test_catalog_tools.py
│ ├── test_catalog_variables.py
│ ├── test_change_tools.py
│ ├── test_changeset_resources.py
│ ├── test_changeset_tools.py
│ ├── test_config.py
│ ├── test_incident_tools.py
│ ├── test_knowledge_base.py
│ ├── test_script_include_resources.py
│ ├── test_script_include_tools.py
│ ├── test_server_catalog_optimization.py
│ ├── test_server_catalog.py
│ ├── test_server_workflow.py
│ ├── test_user_tools.py
│ ├── test_workflow_tools_direct.py
│ ├── test_workflow_tools_params.py
│ └── test_workflow_tools.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/tests/test_catalog_variables.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the catalog item variables tools.
3 | """
4 |
5 | import unittest
6 | from unittest.mock import MagicMock, patch
7 | import requests
8 |
9 | from servicenow_mcp.tools.catalog_variables import (
10 | CreateCatalogItemVariableParams,
11 | ListCatalogItemVariablesParams,
12 | UpdateCatalogItemVariableParams,
13 | create_catalog_item_variable,
14 | list_catalog_item_variables,
15 | update_catalog_item_variable,
16 | )
17 | from servicenow_mcp.utils.config import ServerConfig, AuthConfig, AuthType, BasicAuthConfig
18 |
19 |
20 | class TestCatalogVariablesTools(unittest.TestCase):
21 | """
22 | Test the catalog item variables tools.
23 | """
24 |
25 | def setUp(self):
26 | """Set up the test environment."""
27 | self.config = ServerConfig(
28 | instance_url="https://test.service-now.com",
29 | timeout=10,
30 | auth=AuthConfig(
31 | type=AuthType.BASIC,
32 | basic=BasicAuthConfig(
33 | username="test_user",
34 | password="test_password"
35 | )
36 | ),
37 | )
38 | self.auth_manager = MagicMock()
39 | self.auth_manager.get_headers.return_value = {"Content-Type": "application/json"}
40 |
41 | @patch("requests.post")
42 | def test_create_catalog_item_variable(self, mock_post):
43 | """Test create_catalog_item_variable function."""
44 | # Configure mock
45 | mock_response = MagicMock()
46 | mock_response.raise_for_status = MagicMock()
47 | mock_response.json.return_value = {
48 | "result": {
49 | "sys_id": "abc123",
50 | "name": "test_variable",
51 | "type": "string",
52 | "question_text": "Test Variable",
53 | "mandatory": "false",
54 | }
55 | }
56 | mock_post.return_value = mock_response
57 |
58 | # Create test params
59 | params = CreateCatalogItemVariableParams(
60 | catalog_item_id="item123",
61 | name="test_variable",
62 | type="string",
63 | label="Test Variable",
64 | mandatory=False,
65 | )
66 |
67 | # Call function
68 | result = create_catalog_item_variable(self.config, self.auth_manager, params)
69 |
70 | # Verify result
71 | self.assertTrue(result.success)
72 | self.assertEqual(result.variable_id, "abc123")
73 | self.assertIsNotNone(result.details)
74 |
75 | # Verify mock was called correctly
76 | mock_post.assert_called_once()
77 | call_args = mock_post.call_args
78 | self.assertEqual(
79 | call_args[0][0], f"{self.config.instance_url}/api/now/table/item_option_new"
80 | )
81 | self.assertEqual(call_args[1]["json"]["cat_item"], "item123")
82 | self.assertEqual(call_args[1]["json"]["name"], "test_variable")
83 | self.assertEqual(call_args[1]["json"]["type"], "string")
84 | self.assertEqual(call_args[1]["json"]["question_text"], "Test Variable")
85 | self.assertEqual(call_args[1]["json"]["mandatory"], "false")
86 |
87 | @patch("requests.post")
88 | def test_create_catalog_item_variable_with_optional_params(self, mock_post):
89 | """Test create_catalog_item_variable function with optional parameters."""
90 | # Configure mock
91 | mock_response = MagicMock()
92 | mock_response.raise_for_status = MagicMock()
93 | mock_response.json.return_value = {
94 | "result": {
95 | "sys_id": "abc123",
96 | "name": "test_variable",
97 | "type": "reference",
98 | "question_text": "Test Reference",
99 | "mandatory": "true",
100 | "reference": "sys_user",
101 | "reference_qual": "active=true",
102 | "help_text": "Select a user",
103 | "default_value": "admin",
104 | "description": "Reference to a user",
105 | "order": 100,
106 | }
107 | }
108 | mock_post.return_value = mock_response
109 |
110 | # Create test params with optional fields
111 | params = CreateCatalogItemVariableParams(
112 | catalog_item_id="item123",
113 | name="test_variable",
114 | type="reference",
115 | label="Test Reference",
116 | mandatory=True,
117 | help_text="Select a user",
118 | default_value="admin",
119 | description="Reference to a user",
120 | order=100,
121 | reference_table="sys_user",
122 | reference_qualifier="active=true",
123 | )
124 |
125 | # Call function
126 | result = create_catalog_item_variable(self.config, self.auth_manager, params)
127 |
128 | # Verify result
129 | self.assertTrue(result.success)
130 | self.assertEqual(result.variable_id, "abc123")
131 |
132 | # Verify mock was called correctly
133 | mock_post.assert_called_once()
134 | call_args = mock_post.call_args
135 | self.assertEqual(call_args[1]["json"]["reference"], "sys_user")
136 | self.assertEqual(call_args[1]["json"]["reference_qual"], "active=true")
137 | self.assertEqual(call_args[1]["json"]["help_text"], "Select a user")
138 | self.assertEqual(call_args[1]["json"]["default_value"], "admin")
139 | self.assertEqual(call_args[1]["json"]["description"], "Reference to a user")
140 | self.assertEqual(call_args[1]["json"]["order"], 100)
141 |
142 | @patch("requests.post")
143 | def test_create_catalog_item_variable_error(self, mock_post):
144 | """Test create_catalog_item_variable function with error."""
145 | # Configure mock to raise exception
146 | mock_post.side_effect = requests.RequestException("Test error")
147 |
148 | # Create test params
149 | params = CreateCatalogItemVariableParams(
150 | catalog_item_id="item123",
151 | name="test_variable",
152 | type="string",
153 | label="Test Variable",
154 | )
155 |
156 | # Call function
157 | result = create_catalog_item_variable(self.config, self.auth_manager, params)
158 |
159 | # Verify result
160 | self.assertFalse(result.success)
161 | self.assertTrue("failed" in result.message.lower())
162 |
163 | @patch("requests.get")
164 | def test_list_catalog_item_variables(self, mock_get):
165 | """Test list_catalog_item_variables function."""
166 | # Configure mock
167 | mock_response = MagicMock()
168 | mock_response.raise_for_status = MagicMock()
169 | mock_response.json.return_value = {
170 | "result": [
171 | {
172 | "sys_id": "var1",
173 | "name": "variable1",
174 | "type": "string",
175 | "question_text": "Variable 1",
176 | "order": 100,
177 | "mandatory": "true",
178 | },
179 | {
180 | "sys_id": "var2",
181 | "name": "variable2",
182 | "type": "integer",
183 | "question_text": "Variable 2",
184 | "order": 200,
185 | "mandatory": "false",
186 | },
187 | ]
188 | }
189 | mock_get.return_value = mock_response
190 |
191 | # Create test params
192 | params = ListCatalogItemVariablesParams(
193 | catalog_item_id="item123",
194 | include_details=True,
195 | )
196 |
197 | # Call function
198 | result = list_catalog_item_variables(self.config, self.auth_manager, params)
199 |
200 | # Verify result
201 | self.assertTrue(result.success)
202 | self.assertEqual(result.count, 2)
203 | self.assertEqual(len(result.variables), 2)
204 | self.assertEqual(result.variables[0]["sys_id"], "var1")
205 | self.assertEqual(result.variables[1]["sys_id"], "var2")
206 |
207 | # Verify mock was called correctly
208 | mock_get.assert_called_once()
209 | call_args = mock_get.call_args
210 | self.assertEqual(
211 | call_args[0][0], f"{self.config.instance_url}/api/now/table/item_option_new"
212 | )
213 | self.assertEqual(
214 | call_args[1]["params"]["sysparm_query"], "cat_item=item123^ORDERBYorder"
215 | )
216 | self.assertEqual(call_args[1]["params"]["sysparm_display_value"], "true")
217 | self.assertEqual(call_args[1]["params"]["sysparm_exclude_reference_link"], "false")
218 |
219 | @patch("requests.get")
220 | def test_list_catalog_item_variables_with_pagination(self, mock_get):
221 | """Test list_catalog_item_variables function with pagination parameters."""
222 | # Configure mock
223 | mock_response = MagicMock()
224 | mock_response.raise_for_status = MagicMock()
225 | mock_response.json.return_value = {"result": [{"sys_id": "var1"}]}
226 | mock_get.return_value = mock_response
227 |
228 | # Create test params with pagination
229 | params = ListCatalogItemVariablesParams(
230 | catalog_item_id="item123",
231 | include_details=False,
232 | limit=10,
233 | offset=20,
234 | )
235 |
236 | # Call function
237 | result = list_catalog_item_variables(self.config, self.auth_manager, params)
238 |
239 | # Verify result
240 | self.assertTrue(result.success)
241 |
242 | # Verify mock was called correctly with pagination
243 | mock_get.assert_called_once()
244 | call_args = mock_get.call_args
245 | self.assertEqual(call_args[1]["params"]["sysparm_limit"], 10)
246 | self.assertEqual(call_args[1]["params"]["sysparm_offset"], 20)
247 | self.assertEqual(
248 | call_args[1]["params"]["sysparm_fields"],
249 | "sys_id,name,type,question_text,order,mandatory",
250 | )
251 |
252 | @patch("requests.get")
253 | def test_list_catalog_item_variables_error(self, mock_get):
254 | """Test list_catalog_item_variables function with error."""
255 | # Configure mock to raise exception
256 | mock_get.side_effect = requests.RequestException("Test error")
257 |
258 | # Create test params
259 | params = ListCatalogItemVariablesParams(
260 | catalog_item_id="item123",
261 | )
262 |
263 | # Call function
264 | result = list_catalog_item_variables(self.config, self.auth_manager, params)
265 |
266 | # Verify result
267 | self.assertFalse(result.success)
268 | self.assertTrue("failed" in result.message.lower())
269 |
270 | @patch("requests.patch")
271 | def test_update_catalog_item_variable(self, mock_patch):
272 | """Test update_catalog_item_variable function."""
273 | # Configure mock
274 | mock_response = MagicMock()
275 | mock_response.raise_for_status = MagicMock()
276 | mock_response.json.return_value = {
277 | "result": {
278 | "sys_id": "var1",
279 | "question_text": "Updated Variable",
280 | "mandatory": "true",
281 | "help_text": "This is help text",
282 | }
283 | }
284 | mock_patch.return_value = mock_response
285 |
286 | # Create test params
287 | params = UpdateCatalogItemVariableParams(
288 | variable_id="var1",
289 | label="Updated Variable",
290 | mandatory=True,
291 | help_text="This is help text",
292 | )
293 |
294 | # Call function
295 | result = update_catalog_item_variable(self.config, self.auth_manager, params)
296 |
297 | # Verify result
298 | self.assertTrue(result.success)
299 | self.assertEqual(result.variable_id, "var1")
300 | self.assertIsNotNone(result.details)
301 |
302 | # Verify mock was called correctly
303 | mock_patch.assert_called_once()
304 | call_args = mock_patch.call_args
305 | self.assertEqual(
306 | call_args[0][0],
307 | f"{self.config.instance_url}/api/now/table/item_option_new/var1",
308 | )
309 | self.assertEqual(call_args[1]["json"]["question_text"], "Updated Variable")
310 | self.assertEqual(call_args[1]["json"]["mandatory"], "true")
311 | self.assertEqual(call_args[1]["json"]["help_text"], "This is help text")
312 |
313 | @patch("requests.patch")
314 | def test_update_catalog_item_variable_no_params(self, mock_patch):
315 | """Test update_catalog_item_variable function with no update parameters."""
316 | # Create test params with no updates (only ID)
317 | params = UpdateCatalogItemVariableParams(
318 | variable_id="var1",
319 | )
320 |
321 | # Call function
322 | result = update_catalog_item_variable(self.config, self.auth_manager, params)
323 |
324 | # Verify result - should fail since no update parameters provided
325 | self.assertFalse(result.success)
326 | self.assertEqual(result.message, "No update parameters provided")
327 |
328 | # Verify mock was not called
329 | mock_patch.assert_not_called()
330 |
331 | @patch("requests.patch")
332 | def test_update_catalog_item_variable_error(self, mock_patch):
333 | """Test update_catalog_item_variable function with error."""
334 | # Configure mock to raise exception
335 | mock_patch.side_effect = requests.RequestException("Test error")
336 |
337 | # Create test params
338 | params = UpdateCatalogItemVariableParams(
339 | variable_id="var1",
340 | label="Updated Variable",
341 | )
342 |
343 | # Call function
344 | result = update_catalog_item_variable(self.config, self.auth_manager, params)
345 |
346 | # Verify result
347 | self.assertFalse(result.success)
348 | self.assertTrue("failed" in result.message.lower())
349 |
350 |
351 | if __name__ == "__main__":
352 | unittest.main()
```
--------------------------------------------------------------------------------
/src/servicenow_mcp/server.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | ServiceNow MCP Server
3 |
4 | This module provides the main implementation of the ServiceNow MCP server.
5 | """
6 |
7 | import json
8 | import logging
9 | import os
10 | from typing import Any, Dict, List, Union
11 |
12 | import mcp.types as types
13 | import yaml
14 | from mcp.server.lowlevel import Server
15 | from pydantic import ValidationError
16 |
17 | from servicenow_mcp.auth.auth_manager import AuthManager
18 | from servicenow_mcp.tools.knowledge_base import (
19 | create_category as create_kb_category_tool,
20 | )
21 | from servicenow_mcp.tools.knowledge_base import (
22 | list_categories as list_kb_categories_tool,
23 | )
24 | from servicenow_mcp.utils.config import ServerConfig
25 | from servicenow_mcp.utils.tool_utils import get_tool_definitions
26 |
27 | # Set up logging
28 | logging.basicConfig(level=logging.INFO)
29 | logger = logging.getLogger(__name__)
30 |
31 | # Define path for the configuration file
32 | TOOL_PACKAGE_CONFIG_PATH = os.getenv("TOOL_PACKAGE_CONFIG_PATH", "config/tool_packages.yaml")
33 |
34 |
35 | def serialize_tool_output(result: Any, tool_name: str) -> str:
36 | """Serializes tool output to a string, preferably JSON indented."""
37 | try:
38 | if isinstance(result, str):
39 | # If it's already a string, assume it's intended as such
40 | # Try to parse/re-dump JSON for consistent formatting if it looks like JSON
41 | try:
42 | parsed = json.loads(result)
43 | return json.dumps(parsed, indent=2)
44 | except json.JSONDecodeError:
45 | return result # Return as is if not valid JSON
46 | elif isinstance(result, dict):
47 | # Dump dicts to JSON
48 | return json.dumps(result, indent=2)
49 | elif hasattr(result, "model_dump_json"): # Pydantic v2
50 | # Prefer Pydantic v2 model_dump_json
51 | # The indent argument might not be supported by all versions/models,
52 | # so we might need to dump to dict first if indent is crucial.
53 | try:
54 | return result.model_dump_json(indent=2)
55 | except TypeError: # Handle case where indent is not supported
56 | return json.dumps(result.model_dump(), indent=2)
57 | elif hasattr(result, "model_dump"): # Pydantic v2 fallback
58 | # Fallback to Pydantic v2 model_dump -> dict -> json
59 | return json.dumps(result.model_dump(), indent=2)
60 | elif hasattr(result, "dict"): # Pydantic v1
61 | # Fallback to Pydantic v1 dict -> json
62 | return json.dumps(result.dict(), indent=2)
63 | else:
64 | # Absolute fallback: convert to string
65 | logger.warning(
66 | f"Could not serialize result for tool '{tool_name}' to JSON, falling back to str(). Type: {type(result)}"
67 | )
68 | return str(result)
69 | except Exception as e:
70 | logger.error(f"Error during serialization for tool '{tool_name}': {e}", exc_info=True)
71 | # Return an error message string formatted as JSON
72 | return json.dumps(
73 | {"error": f"Serialization failed for tool {tool_name}", "details": str(e)}, indent=2
74 | )
75 |
76 |
77 | class ServiceNowMCP:
78 | """
79 | ServiceNow MCP Server implementation.
80 |
81 | This class provides a Model Context Protocol (MCP) server for ServiceNow,
82 | allowing LLMs to interact with ServiceNow data and functionality.
83 | It supports loading specific tool packages via the MCP_TOOL_PACKAGE env var.
84 | """
85 |
86 | def __init__(self, config: Union[Dict, ServerConfig]):
87 | """
88 | Initialize the ServiceNow MCP server.
89 |
90 | Args:
91 | config: Server configuration, either as a dictionary or ServerConfig object.
92 | """
93 | if isinstance(config, dict):
94 | self.config = ServerConfig(**config)
95 | else:
96 | self.config = config
97 |
98 | self.auth_manager = AuthManager(self.config.auth, self.config.instance_url)
99 | self.mcp_server = Server("ServiceNow") # Use low-level Server
100 | self.name = "ServiceNow"
101 |
102 | self.package_definitions: Dict[str, List[str]] = {}
103 | self.enabled_tool_names: List[str] = []
104 | self.current_package_name: str = "none"
105 | self._load_package_config()
106 | self._determine_enabled_tools()
107 |
108 | # Get tool definitions, passing the aliased KB tool functions if needed
109 | self.tool_definitions = get_tool_definitions(
110 | create_kb_category_tool, list_kb_categories_tool
111 | )
112 |
113 | self._register_handlers()
114 |
115 | def _register_handlers(self):
116 | """Register the list_tools and call_tool handlers."""
117 | self.mcp_server.list_tools()(self._list_tools_impl)
118 | self.mcp_server.call_tool()(self._call_tool_impl)
119 | logger.info("Registered list_tools and call_tool handlers.")
120 |
121 | def _load_package_config(self):
122 | """Load tool package definitions from the YAML configuration file."""
123 | config_path = TOOL_PACKAGE_CONFIG_PATH
124 | if not os.path.isabs(config_path):
125 | config_path = os.path.join(os.path.dirname(__file__), "..", "..", config_path)
126 | config_path = os.path.abspath(config_path)
127 |
128 | try:
129 | with open(config_path, "r") as f:
130 | loaded_config = yaml.safe_load(f)
131 | if isinstance(loaded_config, dict):
132 | self.package_definitions = loaded_config
133 | logger.info(f"Successfully loaded tool package config from {config_path}")
134 | else:
135 | logger.error(
136 | f"Invalid format in {config_path}: Expected a dictionary, got {type(loaded_config)}. No packages loaded."
137 | )
138 | self.package_definitions = {}
139 | except FileNotFoundError:
140 | logger.error(
141 | f"Tool package config file not found at {config_path}. No packages loaded."
142 | )
143 | self.package_definitions = {}
144 | except yaml.YAMLError as e:
145 | logger.error(
146 | f"Error parsing tool package config file {config_path}: {e}. No packages loaded."
147 | )
148 | self.package_definitions = {}
149 | except Exception as e:
150 | logger.error(f"Unexpected error loading tool package config {config_path}: {e}")
151 | self.package_definitions = {}
152 |
153 | def _determine_enabled_tools(self):
154 | """Determine which tool package and tools to enable based on environment variable."""
155 | requested_package = os.getenv("MCP_TOOL_PACKAGE", "full").strip()
156 |
157 | if not requested_package:
158 | self.current_package_name = "full"
159 | logger.info("MCP_TOOL_PACKAGE is empty, defaulting to 'full' package.")
160 | elif requested_package in self.package_definitions:
161 | self.current_package_name = requested_package
162 | logger.info(f"MCP_TOOL_PACKAGE set to '{self.current_package_name}'.")
163 | else:
164 | self.current_package_name = "none"
165 | logger.warning(
166 | f"MCP_TOOL_PACKAGE '{requested_package}' is not a valid package name. "
167 | f"Valid packages: {list(self.package_definitions.keys())}. Loading 'none' package."
168 | )
169 |
170 | if self.package_definitions:
171 | self.enabled_tool_names = self.package_definitions.get(self.current_package_name, [])
172 | else:
173 | self.enabled_tool_names = []
174 |
175 | logger.info(
176 | f"Loading package '{self.current_package_name}' with {len(self.enabled_tool_names)} tools."
177 | )
178 |
179 | async def _list_tools_impl(self) -> List[types.Tool]:
180 | """Implementation for the list_tools MCP endpoint."""
181 | tool_list: List[types.Tool] = []
182 |
183 | # Add the introspection tool if not 'none' package
184 | if self.current_package_name != "none":
185 | tool_list.append(
186 | types.Tool(
187 | name="list_tool_packages",
188 | description="Lists available tool packages and the currently loaded one.",
189 | inputSchema={
190 | "type": "object",
191 | "properties": {
192 | "random_string": {
193 | "type": "string",
194 | "description": "Dummy parameter for no-parameter tools",
195 | }
196 | },
197 | "required": ["random_string"],
198 | },
199 | )
200 | )
201 |
202 | # Iterate through defined tools and add enabled ones
203 | for tool_name, definition in self.tool_definitions.items():
204 | if tool_name in self.enabled_tool_names:
205 | _impl_func, params_model, _return_annotation, description, _serialization = (
206 | definition
207 | )
208 | try:
209 | schema = params_model.model_json_schema()
210 | tool_list.append(
211 | types.Tool(name=tool_name, description=description, inputSchema=schema)
212 | )
213 | except Exception as e:
214 | logger.error(
215 | f"Failed to generate schema for tool '{tool_name}': {e}", exc_info=True
216 | )
217 |
218 | logger.debug(f"Listing {len(tool_list)} tools for package '{self.current_package_name}'.")
219 | return tool_list
220 |
221 | async def _call_tool_impl(self, name: str, arguments: dict) -> list[types.TextContent]:
222 | """
223 | Implementation for the call_tool MCP endpoint.
224 | Handles argument parsing, tool execution, result serialization (to string),
225 | and returning a list containing a single TextContent object.
226 |
227 | Args:
228 | name: The name of the tool to call.
229 | arguments: The arguments for the tool as a dictionary.
230 |
231 | Returns:
232 | A list containing a single TextContent object with the tool output.
233 |
234 | Raises:
235 | ValueError: If the tool is unknown, disabled, or if arguments are invalid.
236 | RuntimeError: If tool execution or serialization fails.
237 | """
238 | logger.info(f"Received call_tool request for tool '{name}'")
239 | # Handle the introspection tool separately
240 | if name == "list_tool_packages":
241 | if self.current_package_name == "none":
242 | raise ValueError(
243 | "Tool 'list_tool_packages' is not available in the 'none' package."
244 | )
245 | result_dict = self._list_tool_packages_impl()
246 | serialized_string = json.dumps(result_dict, indent=2)
247 | # Return a list with a TextContent object
248 | return [types.TextContent(type="text", text=serialized_string)]
249 |
250 | # Check if the tool exists and is enabled
251 | if name not in self.tool_definitions:
252 | raise ValueError(f"Unknown tool: {name}")
253 | if name not in self.enabled_tool_names:
254 | raise ValueError(
255 | f"Tool '{name}' is not enabled in the current package '{self.current_package_name}'."
256 | )
257 |
258 | # Get tool definition (we don't need the serialization hint anymore)
259 | definition = self.tool_definitions[name]
260 | impl_func, params_model, _return_annotation, _description, _serialization = definition
261 |
262 | # Validate and parse arguments using the Pydantic model
263 | try:
264 | params = params_model(**arguments)
265 | logger.debug(f"Parsed arguments for tool '{name}': {params}")
266 | except ValidationError as e:
267 | logger.error(f"Invalid arguments for tool '{name}': {e}", exc_info=True)
268 | raise ValueError(f"Invalid arguments for tool '{name}': {e}") from e
269 | except Exception as e:
270 | logger.error(
271 | f"Unexpected error parsing arguments for tool '{name}': {e}", exc_info=True
272 | )
273 | raise ValueError(f"Failed to parse arguments for tool '{name}': {e}")
274 |
275 | # Execute the tool implementation function
276 | try:
277 | result = impl_func(self.config, self.auth_manager, params)
278 | logger.debug(f"Raw result type from tool '{name}': {type(result)}")
279 | except Exception as e:
280 | logger.error(f"Error executing tool '{name}': {e}", exc_info=True)
281 | raise RuntimeError(f"Error during execution of tool '{name}': {e}") from e
282 |
283 | # Serialize the result to a string (preferably JSON) using the helper
284 | serialized_string = serialize_tool_output(result, name)
285 | logger.debug(f"Serialized value for tool '{name}': {serialized_string[:500]}...")
286 |
287 | # Return a list with a TextContent object
288 | return [types.TextContent(type="text", text=serialized_string)]
289 |
290 | def _list_tool_packages_impl(self) -> Dict[str, Any]:
291 | """Implementation logic for the list_tool_packages tool."""
292 | available_packages = list(self.package_definitions.keys())
293 | return {
294 | "current_package": self.current_package_name,
295 | "available_packages": available_packages,
296 | "message": (
297 | f"Currently loaded package: '{self.current_package_name}'. "
298 | f"Set MCP_TOOL_PACKAGE env var to one of {available_packages} to switch."
299 | ),
300 | }
301 |
302 | def start(self) -> Server:
303 | """
304 | Prepares and returns the configured low-level MCP Server instance.
305 |
306 | The caller (e.g., cli.py) is responsible for obtaining the server
307 | instance from this method and running it within an appropriate
308 | async transport context (e.g., mcp.server.stdio.stdio_server).
309 |
310 | Returns:
311 | The configured mcp.server.lowlevel.Server instance.
312 | """
313 | logger.info(
314 | "ServiceNowMCP instance configured. Returning low-level server instance for external execution."
315 | )
316 | # The actual running of the server (server.run(...)) must happen
317 | # within an async context managed by the caller (e.g., using anyio
318 | # and a specific transport like stdio_server or SseServerTransport).
319 | return self.mcp_server
320 |
```
--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/epic_tools.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Epic management tools for the ServiceNow MCP server.
3 |
4 | This module provides tools for managing epics in ServiceNow.
5 | """
6 |
7 | import logging
8 | from datetime import datetime
9 | from typing import Any, Dict, List, Optional, Type, TypeVar
10 |
11 | import requests
12 | from pydantic import BaseModel, Field
13 |
14 | from servicenow_mcp.auth.auth_manager import AuthManager
15 | from servicenow_mcp.utils.config import ServerConfig
16 |
17 | logger = logging.getLogger(__name__)
18 |
19 | # Type variable for Pydantic models
20 | T = TypeVar('T', bound=BaseModel)
21 |
22 | class CreateEpicParams(BaseModel):
23 | """Parameters for creating an epic."""
24 |
25 | short_description: str = Field(..., description="Short description of the epic")
26 | description: Optional[str] = Field(None, description="Detailed description of the epic")
27 | priority: Optional[str] = Field(None, description="Priority of epic (1 is Critical, 2 is High, 3 is Moderate, 4 is Low, 5 is Planning)")
28 | state: Optional[str] = Field(None, description="State of story (-6 is Draft,1 is Ready,2 is Work in progress, 3 is Complete, 4 is Cancelled)")
29 | assignment_group: Optional[str] = Field(None, description="Group assigned to the epic")
30 | assigned_to: Optional[str] = Field(None, description="User assigned to the epic")
31 | work_notes: Optional[str] = Field(None, description="Work notes to add to the epic. Used for adding notes and comments to an epic")
32 |
33 | class UpdateEpicParams(BaseModel):
34 | """Parameters for updating an epic."""
35 |
36 | epic_id: str = Field(..., description="Epic ID or sys_id")
37 | short_description: Optional[str] = Field(None, description="Short description of the epic")
38 | description: Optional[str] = Field(None, description="Detailed description of the epic")
39 | priority: Optional[str] = Field(None, description="Priority of epic (1 is Critical, 2 is High, 3 is Moderate, 4 is Low, 5 is Planning)")
40 | state: Optional[str] = Field(None, description="State of story (-6 is Draft,1 is Ready,2 is Work in progress, 3 is Complete, 4 is Cancelled)")
41 | assignment_group: Optional[str] = Field(None, description="Group assigned to the epic")
42 | assigned_to: Optional[str] = Field(None, description="User assigned to the epic")
43 | work_notes: Optional[str] = Field(None, description="Work notes to add to the epic. Used for adding notes and comments to an epic")
44 |
45 | class ListEpicsParams(BaseModel):
46 | """Parameters for listing epics."""
47 |
48 | limit: Optional[int] = Field(10, description="Maximum number of records to return")
49 | offset: Optional[int] = Field(0, description="Offset to start from")
50 | priority: Optional[str] = Field(None, description="Filter by priority")
51 | assignment_group: Optional[str] = Field(None, description="Filter by assignment group")
52 | timeframe: Optional[str] = Field(None, description="Filter by timeframe (upcoming, in-progress, completed)")
53 | query: Optional[str] = Field(None, description="Additional query string")
54 |
55 |
56 | def _unwrap_and_validate_params(params: Any, model_class: Type[T], required_fields: List[str] = None) -> Dict[str, Any]:
57 | """
58 | Helper function to unwrap and validate parameters.
59 |
60 | Args:
61 | params: The parameters to unwrap and validate.
62 | model_class: The Pydantic model class to validate against.
63 | required_fields: List of required field names.
64 |
65 | Returns:
66 | A tuple of (success, result) where result is either the validated parameters or an error message.
67 | """
68 | # Handle case where params might be wrapped in another dictionary
69 | if isinstance(params, dict) and len(params) == 1 and "params" in params and isinstance(params["params"], dict):
70 | logger.warning("Detected params wrapped in a 'params' key. Unwrapping...")
71 | params = params["params"]
72 |
73 | # Handle case where params might be a Pydantic model object
74 | if not isinstance(params, dict):
75 | try:
76 | # Try to convert to dict if it's a Pydantic model
77 | logger.warning("Params is not a dictionary. Attempting to convert...")
78 | params = params.dict() if hasattr(params, "dict") else dict(params)
79 | except Exception as e:
80 | logger.error(f"Failed to convert params to dictionary: {e}")
81 | return {
82 | "success": False,
83 | "message": f"Invalid parameters format. Expected a dictionary, got {type(params).__name__}",
84 | }
85 |
86 | # Validate required parameters are present
87 | if required_fields:
88 | for field in required_fields:
89 | if field not in params:
90 | return {
91 | "success": False,
92 | "message": f"Missing required parameter '{field}'",
93 | }
94 |
95 | try:
96 | # Validate parameters against the model
97 | validated_params = model_class(**params)
98 | return {
99 | "success": True,
100 | "params": validated_params,
101 | }
102 | except Exception as e:
103 | logger.error(f"Error validating parameters: {e}")
104 | return {
105 | "success": False,
106 | "message": f"Error validating parameters: {str(e)}",
107 | }
108 |
109 |
110 | def _get_instance_url(auth_manager: AuthManager, server_config: ServerConfig) -> Optional[str]:
111 | """
112 | Helper function to get the instance URL from either server_config or auth_manager.
113 |
114 | Args:
115 | auth_manager: The authentication manager.
116 | server_config: The server configuration.
117 |
118 | Returns:
119 | The instance URL if found, None otherwise.
120 | """
121 | if hasattr(server_config, 'instance_url'):
122 | return server_config.instance_url
123 | elif hasattr(auth_manager, 'instance_url'):
124 | return auth_manager.instance_url
125 | else:
126 | logger.error("Cannot find instance_url in either server_config or auth_manager")
127 | return None
128 |
129 |
130 | def _get_headers(auth_manager: Any, server_config: Any) -> Optional[Dict[str, str]]:
131 | """
132 | Helper function to get headers from either auth_manager or server_config.
133 |
134 | Args:
135 | auth_manager: The authentication manager or object passed as auth_manager.
136 | server_config: The server configuration or object passed as server_config.
137 |
138 | Returns:
139 | The headers if found, None otherwise.
140 | """
141 | # Try to get headers from auth_manager
142 | if hasattr(auth_manager, 'get_headers'):
143 | return auth_manager.get_headers()
144 |
145 | # If auth_manager doesn't have get_headers, try server_config
146 | if hasattr(server_config, 'get_headers'):
147 | return server_config.get_headers()
148 |
149 | # If neither has get_headers, check if auth_manager is actually a ServerConfig
150 | # and server_config is actually an AuthManager (parameters swapped)
151 | if hasattr(server_config, 'get_headers') and not hasattr(auth_manager, 'get_headers'):
152 | return server_config.get_headers()
153 |
154 | logger.error("Cannot find get_headers method in either auth_manager or server_config")
155 | return None
156 |
157 | def create_epic(
158 | auth_manager: AuthManager,
159 | server_config: ServerConfig,
160 | params: Dict[str, Any],
161 | ) -> Dict[str, Any]:
162 | """
163 | Create a new epic in ServiceNow.
164 |
165 | Args:
166 | auth_manager: The authentication manager.
167 | server_config: The server configuration.
168 | params: The parameters for creating the epic.
169 |
170 | Returns:
171 | The created epic.
172 | """
173 |
174 | # Unwrap and validate parameters
175 | result = _unwrap_and_validate_params(
176 | params,
177 | CreateEpicParams,
178 | required_fields=["short_description"]
179 | )
180 |
181 | if not result["success"]:
182 | return result
183 |
184 | validated_params = result["params"]
185 |
186 | # Prepare the request data
187 | data = {
188 | "short_description": validated_params.short_description,
189 | }
190 |
191 | # Add optional fields if provided
192 | if validated_params.description:
193 | data["description"] = validated_params.description
194 | if validated_params.priority:
195 | data["priority"] = validated_params.priority
196 | if validated_params.assignment_group:
197 | data["assignment_group"] = validated_params.assignment_group
198 | if validated_params.assigned_to:
199 | data["assigned_to"] = validated_params.assigned_to
200 | if validated_params.work_notes:
201 | data["work_notes"] = validated_params.work_notes
202 |
203 | # Get the instance URL
204 | instance_url = _get_instance_url(auth_manager, server_config)
205 | if not instance_url:
206 | return {
207 | "success": False,
208 | "message": "Cannot find instance_url in either server_config or auth_manager",
209 | }
210 |
211 | # Get the headers
212 | headers = _get_headers(auth_manager, server_config)
213 | if not headers:
214 | return {
215 | "success": False,
216 | "message": "Cannot find get_headers method in either auth_manager or server_config",
217 | }
218 |
219 | # Add Content-Type header
220 | headers["Content-Type"] = "application/json"
221 |
222 | # Make the API request
223 | url = f"{instance_url}/api/now/table/rm_epic"
224 |
225 | try:
226 | response = requests.post(url, json=data, headers=headers)
227 | response.raise_for_status()
228 |
229 | result = response.json()
230 |
231 | return {
232 | "success": True,
233 | "message": "Epic created successfully",
234 | "epic": result["result"],
235 | }
236 | except requests.exceptions.RequestException as e:
237 | logger.error(f"Error creating epic: {e}")
238 | return {
239 | "success": False,
240 | "message": f"Error creating epic: {str(e)}",
241 | }
242 |
243 | def update_epic(
244 | auth_manager: AuthManager,
245 | server_config: ServerConfig,
246 | params: Dict[str, Any],
247 | ) -> Dict[str, Any]:
248 | """
249 | Update an existing epic in ServiceNow.
250 |
251 | Args:
252 | auth_manager: The authentication manager.
253 | server_config: The server configuration.
254 | params: The parameters for updating the epic.
255 |
256 | Returns:
257 | The updated epic.
258 | """
259 | # Unwrap and validate parameters
260 | result = _unwrap_and_validate_params(
261 | params,
262 | UpdateEpicParams,
263 | required_fields=["epic_id"]
264 | )
265 |
266 | if not result["success"]:
267 | return result
268 |
269 | validated_params = result["params"]
270 |
271 | # Prepare the request data
272 | data = {}
273 |
274 | # Add optional fields if provided
275 | if validated_params.short_description:
276 | data["short_description"] = validated_params.short_description
277 | if validated_params.description:
278 | data["description"] = validated_params.description
279 | if validated_params.priority:
280 | data["priority"] = validated_params.priority
281 | if validated_params.assignment_group:
282 | data["assignment_group"] = validated_params.assignment_group
283 | if validated_params.assigned_to:
284 | data["assigned_to"] = validated_params.assigned_to
285 | if validated_params.work_notes:
286 | data["work_notes"] = validated_params.work_notes
287 |
288 | # Get the instance URL
289 | instance_url = _get_instance_url(auth_manager, server_config)
290 | if not instance_url:
291 | return {
292 | "success": False,
293 | "message": "Cannot find instance_url in either server_config or auth_manager",
294 | }
295 |
296 | # Get the headers
297 | headers = _get_headers(auth_manager, server_config)
298 | if not headers:
299 | return {
300 | "success": False,
301 | "message": "Cannot find get_headers method in either auth_manager or server_config",
302 | }
303 |
304 | # Add Content-Type header
305 | headers["Content-Type"] = "application/json"
306 |
307 | # Make the API request
308 | url = f"{instance_url}/api/now/table/rm_epic/{validated_params.epic_id}"
309 |
310 | try:
311 | response = requests.put(url, json=data, headers=headers)
312 | response.raise_for_status()
313 |
314 | result = response.json()
315 |
316 | return {
317 | "success": True,
318 | "message": "Epic updated successfully",
319 | "epic": result["result"],
320 | }
321 | except requests.exceptions.RequestException as e:
322 | logger.error(f"Error updating epic: {e}")
323 | return {
324 | "success": False,
325 | "message": f"Error updating epic: {str(e)}",
326 | }
327 |
328 | def list_epics(
329 | auth_manager: AuthManager,
330 | server_config: ServerConfig,
331 | params: Dict[str, Any],
332 | ) -> Dict[str, Any]:
333 | """
334 | List epics from ServiceNow.
335 |
336 | Args:
337 | auth_manager: The authentication manager.
338 | server_config: The server configuration.
339 | params: The parameters for listing epics.
340 |
341 | Returns:
342 | A list of epics.
343 | """
344 | # Unwrap and validate parameters
345 | result = _unwrap_and_validate_params(
346 | params,
347 | ListEpicsParams
348 | )
349 |
350 | if not result["success"]:
351 | return result
352 |
353 | validated_params = result["params"]
354 |
355 | # Build the query
356 | query_parts = []
357 |
358 | if validated_params.priority:
359 | query_parts.append(f"priority={validated_params.priority}")
360 | if validated_params.assignment_group:
361 | query_parts.append(f"assignment_group={validated_params.assignment_group}")
362 |
363 | # Handle timeframe filtering
364 | if validated_params.timeframe:
365 | now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
366 | if validated_params.timeframe == "upcoming":
367 | query_parts.append(f"start_date>{now}")
368 | elif validated_params.timeframe == "in-progress":
369 | query_parts.append(f"start_date<{now}^end_date>{now}")
370 | elif validated_params.timeframe == "completed":
371 | query_parts.append(f"end_date<{now}")
372 |
373 | # Add any additional query string
374 | if validated_params.query:
375 | query_parts.append(validated_params.query)
376 |
377 | # Combine query parts
378 | query = "^".join(query_parts) if query_parts else ""
379 |
380 | # Get the instance URL
381 | instance_url = _get_instance_url(auth_manager, server_config)
382 | if not instance_url:
383 | return {
384 | "success": False,
385 | "message": "Cannot find instance_url in either server_config or auth_manager",
386 | }
387 |
388 | # Get the headers
389 | headers = _get_headers(auth_manager, server_config)
390 | if not headers:
391 | return {
392 | "success": False,
393 | "message": "Cannot find get_headers method in either auth_manager or server_config",
394 | }
395 |
396 | # Make the API request
397 | url = f"{instance_url}/api/now/table/rm_epic"
398 |
399 | params = {
400 | "sysparm_limit": validated_params.limit,
401 | "sysparm_offset": validated_params.offset,
402 | "sysparm_query": query,
403 | "sysparm_display_value": "true",
404 | }
405 |
406 | try:
407 | response = requests.get(url, headers=headers, params=params)
408 | response.raise_for_status()
409 |
410 | result = response.json()
411 |
412 | # Handle the case where result["result"] is a list
413 | epics = result.get("result", [])
414 | count = len(epics)
415 |
416 | return {
417 | "success": True,
418 | "epics": epics,
419 | "count": count,
420 | "total": count, # Use count as total if total is not provided
421 | }
422 | except requests.exceptions.RequestException as e:
423 | logger.error(f"Error listing epics: {e}")
424 | return {
425 | "success": False,
426 | "message": f"Error listing epics: {str(e)}",
427 | }
428 |
```
--------------------------------------------------------------------------------
/tests/test_user_tools.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for user management tools.
3 | """
4 |
5 | import unittest
6 | from unittest.mock import MagicMock, patch
7 |
8 | from servicenow_mcp.auth.auth_manager import AuthManager
9 | from servicenow_mcp.tools.user_tools import (
10 | AddGroupMembersParams,
11 | CreateGroupParams,
12 | CreateUserParams,
13 | GetUserParams,
14 | ListUsersParams,
15 | ListGroupsParams,
16 | RemoveGroupMembersParams,
17 | UpdateGroupParams,
18 | UpdateUserParams,
19 | add_group_members,
20 | create_group,
21 | create_user,
22 | get_user,
23 | list_users,
24 | list_groups,
25 | remove_group_members,
26 | update_group,
27 | update_user,
28 | )
29 | from servicenow_mcp.utils.config import AuthConfig, AuthType, BasicAuthConfig, ServerConfig
30 |
31 |
32 | class TestUserTools(unittest.TestCase):
33 | """Tests for user management tools."""
34 |
35 | def setUp(self):
36 | """Set up test environment."""
37 | # Create config and auth manager
38 | self.config = ServerConfig(
39 | instance_url="https://example.service-now.com",
40 | auth=AuthConfig(
41 | type=AuthType.BASIC,
42 | basic=BasicAuthConfig(username="admin", password="password"),
43 | ),
44 | )
45 | self.auth_manager = AuthManager(self.config.auth)
46 |
47 | # Mock auth_manager.get_headers() method
48 | self.auth_manager.get_headers = MagicMock(return_value={"Authorization": "Basic YWRtaW46cGFzc3dvcmQ="})
49 |
50 | @patch("requests.post")
51 | def test_create_user(self, mock_post):
52 | """Test create_user function."""
53 | # Configure mock
54 | mock_response = MagicMock()
55 | mock_response.raise_for_status = MagicMock()
56 | mock_response.json.return_value = {
57 | "result": {
58 | "sys_id": "user123",
59 | "user_name": "alice.radiology",
60 | }
61 | }
62 | mock_post.return_value = mock_response
63 |
64 | # Create test params
65 | params = CreateUserParams(
66 | user_name="alice.radiology",
67 | first_name="Alice",
68 | last_name="Radiology",
69 | email="[email protected]",
70 | department="Radiology",
71 | title="Doctor",
72 | )
73 |
74 | # Call function
75 | result = create_user(self.config, self.auth_manager, params)
76 |
77 | # Verify result
78 | self.assertTrue(result.success)
79 | self.assertEqual(result.user_id, "user123")
80 | self.assertEqual(result.user_name, "alice.radiology")
81 |
82 | # Verify mock was called correctly
83 | mock_post.assert_called_once()
84 | call_args = mock_post.call_args
85 | self.assertEqual(call_args[0][0], f"{self.config.api_url}/table/sys_user")
86 | self.assertEqual(call_args[1]["json"]["user_name"], "alice.radiology")
87 | self.assertEqual(call_args[1]["json"]["first_name"], "Alice")
88 | self.assertEqual(call_args[1]["json"]["last_name"], "Radiology")
89 | self.assertEqual(call_args[1]["json"]["email"], "[email protected]")
90 | self.assertEqual(call_args[1]["json"]["department"], "Radiology")
91 | self.assertEqual(call_args[1]["json"]["title"], "Doctor")
92 |
93 | @patch("requests.patch")
94 | def test_update_user(self, mock_patch):
95 | """Test update_user function."""
96 | # Configure mock
97 | mock_response = MagicMock()
98 | mock_response.raise_for_status = MagicMock()
99 | mock_response.json.return_value = {
100 | "result": {
101 | "sys_id": "user123",
102 | "user_name": "alice.radiology",
103 | }
104 | }
105 | mock_patch.return_value = mock_response
106 |
107 | # Create test params
108 | params = UpdateUserParams(
109 | user_id="user123",
110 | manager="user456",
111 | title="Senior Doctor",
112 | )
113 |
114 | # Call function
115 | result = update_user(self.config, self.auth_manager, params)
116 |
117 | # Verify result
118 | self.assertTrue(result.success)
119 | self.assertEqual(result.user_id, "user123")
120 | self.assertEqual(result.user_name, "alice.radiology")
121 |
122 | # Verify mock was called correctly
123 | mock_patch.assert_called_once()
124 | call_args = mock_patch.call_args
125 | self.assertEqual(call_args[0][0], f"{self.config.api_url}/table/sys_user/user123")
126 | self.assertEqual(call_args[1]["json"]["manager"], "user456")
127 | self.assertEqual(call_args[1]["json"]["title"], "Senior Doctor")
128 |
129 | @patch("requests.get")
130 | def test_get_user(self, mock_get):
131 | """Test get_user function."""
132 | # Configure mock
133 | mock_response = MagicMock()
134 | mock_response.raise_for_status = MagicMock()
135 | mock_response.json.return_value = {
136 | "result": [
137 | {
138 | "sys_id": "user123",
139 | "user_name": "alice.radiology",
140 | "first_name": "Alice",
141 | "last_name": "Radiology",
142 | "email": "[email protected]",
143 | }
144 | ]
145 | }
146 | mock_get.return_value = mock_response
147 |
148 | # Create test params
149 | params = GetUserParams(
150 | user_name="alice.radiology",
151 | )
152 |
153 | # Call function
154 | result = get_user(self.config, self.auth_manager, params)
155 |
156 | # Verify result
157 | self.assertTrue(result["success"])
158 | self.assertEqual(result["user"]["sys_id"], "user123")
159 | self.assertEqual(result["user"]["user_name"], "alice.radiology")
160 |
161 | # Verify mock was called correctly
162 | mock_get.assert_called_once()
163 | call_args = mock_get.call_args
164 | self.assertEqual(call_args[0][0], f"{self.config.api_url}/table/sys_user")
165 | self.assertEqual(call_args[1]["params"]["sysparm_query"], "user_name=alice.radiology")
166 |
167 | @patch("requests.get")
168 | def test_list_users(self, mock_get):
169 | """Test list_users function."""
170 | # Configure mock
171 | mock_response = MagicMock()
172 | mock_response.raise_for_status = MagicMock()
173 | mock_response.json.return_value = {
174 | "result": [
175 | {
176 | "sys_id": "user123",
177 | "user_name": "alice.radiology",
178 | },
179 | {
180 | "sys_id": "user456",
181 | "user_name": "bob.chiefradiology",
182 | }
183 | ]
184 | }
185 | mock_get.return_value = mock_response
186 |
187 | # Create test params
188 | params = ListUsersParams(
189 | department="Radiology",
190 | limit=10,
191 | )
192 |
193 | # Call function
194 | result = list_users(self.config, self.auth_manager, params)
195 |
196 | # Verify result
197 | self.assertTrue(result["success"])
198 | self.assertEqual(len(result["users"]), 2)
199 | self.assertEqual(result["users"][0]["sys_id"], "user123")
200 | self.assertEqual(result["users"][1]["sys_id"], "user456")
201 |
202 | # Verify mock was called correctly
203 | mock_get.assert_called_once()
204 | call_args = mock_get.call_args
205 | self.assertEqual(call_args[0][0], f"{self.config.api_url}/table/sys_user")
206 | self.assertEqual(call_args[1]["params"]["sysparm_limit"], "10")
207 | self.assertIn("department=Radiology", call_args[1]["params"]["sysparm_query"])
208 |
209 | @patch("requests.get")
210 | def test_list_groups(self, mock_get):
211 | """Test list_groups function."""
212 | # Configure mock
213 | mock_response = MagicMock()
214 | mock_response.raise_for_status = MagicMock()
215 | mock_response.json.return_value = {
216 | "result": [
217 | {
218 | "sys_id": "group123",
219 | "name": "IT Support",
220 | "description": "IT support team",
221 | "active": "true",
222 | "type": "it"
223 | },
224 | {
225 | "sys_id": "group456",
226 | "name": "HR Team",
227 | "description": "Human Resources team",
228 | "active": "true",
229 | "type": "administrative"
230 | }
231 | ]
232 | }
233 | mock_get.return_value = mock_response
234 |
235 | # Create test params
236 | params = ListGroupsParams(
237 | active=True,
238 | type="it",
239 | query="support",
240 | limit=10,
241 | )
242 |
243 | # Call function
244 | result = list_groups(self.config, self.auth_manager, params)
245 |
246 | # Verify result
247 | self.assertTrue(result["success"])
248 | self.assertEqual(len(result["groups"]), 2)
249 | self.assertEqual(result["groups"][0]["sys_id"], "group123")
250 | self.assertEqual(result["groups"][1]["sys_id"], "group456")
251 | self.assertEqual(result["count"], 2)
252 |
253 | # Verify mock was called correctly
254 | mock_get.assert_called_once()
255 | call_args = mock_get.call_args
256 | self.assertEqual(call_args[0][0], f"{self.config.api_url}/table/sys_user_group")
257 | self.assertEqual(call_args[1]["params"]["sysparm_limit"], "10")
258 | self.assertEqual(call_args[1]["params"]["sysparm_offset"], "0")
259 | self.assertEqual(call_args[1]["params"]["sysparm_display_value"], "true")
260 | self.assertIn("active=true", call_args[1]["params"]["sysparm_query"])
261 | self.assertIn("type=it", call_args[1]["params"]["sysparm_query"])
262 | self.assertIn("nameLIKE", call_args[1]["params"]["sysparm_query"])
263 | self.assertIn("descriptionLIKE", call_args[1]["params"]["sysparm_query"])
264 |
265 | @patch("requests.post")
266 | def test_create_group(self, mock_post):
267 | """Test create_group function."""
268 | # Configure mock
269 | mock_response = MagicMock()
270 | mock_response.raise_for_status = MagicMock()
271 | mock_response.json.return_value = {
272 | "result": {
273 | "sys_id": "group123",
274 | "name": "Biomedical Engineering",
275 | }
276 | }
277 | mock_post.return_value = mock_response
278 |
279 | # Create test params
280 | params = CreateGroupParams(
281 | name="Biomedical Engineering",
282 | description="Group for biomedical engineering staff",
283 | manager="user456",
284 | )
285 |
286 | # Call function
287 | result = create_group(self.config, self.auth_manager, params)
288 |
289 | # Verify result
290 | self.assertTrue(result.success)
291 | self.assertEqual(result.group_id, "group123")
292 | self.assertEqual(result.group_name, "Biomedical Engineering")
293 |
294 | # Verify mock was called correctly
295 | mock_post.assert_called_once()
296 | call_args = mock_post.call_args
297 | self.assertEqual(call_args[0][0], f"{self.config.api_url}/table/sys_user_group")
298 | self.assertEqual(call_args[1]["json"]["name"], "Biomedical Engineering")
299 | self.assertEqual(call_args[1]["json"]["description"], "Group for biomedical engineering staff")
300 | self.assertEqual(call_args[1]["json"]["manager"], "user456")
301 |
302 | @patch("requests.patch")
303 | def test_update_group(self, mock_patch):
304 | """Test update_group function."""
305 | # Configure mock
306 | mock_response = MagicMock()
307 | mock_response.raise_for_status = MagicMock()
308 | mock_response.json.return_value = {
309 | "result": {
310 | "sys_id": "group123",
311 | "name": "Biomedical Engineering",
312 | }
313 | }
314 | mock_patch.return_value = mock_response
315 |
316 | # Create test params
317 | params = UpdateGroupParams(
318 | group_id="group123",
319 | description="Updated description for biomedical engineering group",
320 | manager="user789",
321 | )
322 |
323 | # Call function
324 | result = update_group(self.config, self.auth_manager, params)
325 |
326 | # Verify result
327 | self.assertTrue(result.success)
328 | self.assertEqual(result.group_id, "group123")
329 | self.assertEqual(result.group_name, "Biomedical Engineering")
330 |
331 | # Verify mock was called correctly
332 | mock_patch.assert_called_once()
333 | call_args = mock_patch.call_args
334 | self.assertEqual(call_args[0][0], f"{self.config.api_url}/table/sys_user_group/group123")
335 | self.assertEqual(call_args[1]["json"]["description"], "Updated description for biomedical engineering group")
336 | self.assertEqual(call_args[1]["json"]["manager"], "user789")
337 |
338 | @patch("servicenow_mcp.tools.user_tools.get_user")
339 | @patch("requests.post")
340 | def test_add_group_members(self, mock_post, mock_get_user):
341 | """Test add_group_members function."""
342 | # Configure mocks
343 | mock_post_response = MagicMock()
344 | mock_post_response.raise_for_status = MagicMock()
345 | mock_post.return_value = mock_post_response
346 |
347 | mock_get_user.return_value = {
348 | "success": True,
349 | "message": "User found",
350 | "user": {
351 | "sys_id": "user123",
352 | "user_name": "alice.radiology",
353 | }
354 | }
355 |
356 | # Create test params
357 | params = AddGroupMembersParams(
358 | group_id="group123",
359 | members=["alice.radiology", "admin"],
360 | )
361 |
362 | # Call function
363 | result = add_group_members(self.config, self.auth_manager, params)
364 |
365 | # Verify result
366 | self.assertTrue(result.success)
367 | self.assertEqual(result.group_id, "group123")
368 |
369 | # Verify mock was called correctly
370 | self.assertEqual(mock_post.call_count, 2) # Once for each member
371 | call_args = mock_post.call_args_list[0]
372 | self.assertEqual(call_args[0][0], f"{self.config.api_url}/table/sys_user_grmember")
373 | self.assertEqual(call_args[1]["json"]["group"], "group123")
374 | self.assertEqual(call_args[1]["json"]["user"], "user123")
375 |
376 | @patch("servicenow_mcp.tools.user_tools.get_user")
377 | @patch("requests.get")
378 | @patch("requests.delete")
379 | def test_remove_group_members(self, mock_delete, mock_get, mock_get_user):
380 | """Test remove_group_members function."""
381 | # Configure mocks
382 | mock_delete_response = MagicMock()
383 | mock_delete_response.raise_for_status = MagicMock()
384 | mock_delete.return_value = mock_delete_response
385 |
386 | mock_get_response = MagicMock()
387 | mock_get_response.raise_for_status = MagicMock()
388 | mock_get_response.json.return_value = {
389 | "result": [
390 | {
391 | "sys_id": "member123",
392 | "user": {
393 | "value": "user123",
394 | "display_value": "Alice Radiology",
395 | },
396 | "group": {
397 | "value": "group123",
398 | "display_value": "Biomedical Engineering",
399 | },
400 | }
401 | ]
402 | }
403 | mock_get.return_value = mock_get_response
404 |
405 | mock_get_user.return_value = {
406 | "success": True,
407 | "message": "User found",
408 | "user": {
409 | "sys_id": "user123",
410 | "user_name": "alice.radiology",
411 | }
412 | }
413 |
414 | # Create test params
415 | params = RemoveGroupMembersParams(
416 | group_id="group123",
417 | members=["alice.radiology"],
418 | )
419 |
420 | # Call function
421 | result = remove_group_members(self.config, self.auth_manager, params)
422 |
423 | # Verify result
424 | self.assertTrue(result.success)
425 | self.assertEqual(result.group_id, "group123")
426 |
427 | # Verify mock was called correctly
428 | mock_get.assert_called_once()
429 | get_call_args = mock_get.call_args
430 | self.assertEqual(get_call_args[0][0], f"{self.config.api_url}/table/sys_user_grmember")
431 | self.assertEqual(get_call_args[1]["params"]["sysparm_query"], "group=group123^user=user123")
432 |
433 | mock_delete.assert_called_once()
434 | delete_call_args = mock_delete.call_args
435 | self.assertEqual(delete_call_args[0][0], f"{self.config.api_url}/table/sys_user_grmember/member123")
436 |
437 |
438 | if __name__ == "__main__":
439 | unittest.main()
```
--------------------------------------------------------------------------------
/tests/test_catalog_tools.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the ServiceNow MCP catalog tools.
3 | """
4 |
5 | import unittest
6 | from unittest.mock import MagicMock, patch
7 |
8 | import requests
9 |
10 | from servicenow_mcp.auth.auth_manager import AuthManager
11 | from servicenow_mcp.tools.catalog_tools import (
12 | GetCatalogItemParams,
13 | ListCatalogCategoriesParams,
14 | ListCatalogItemsParams,
15 | CreateCatalogCategoryParams,
16 | UpdateCatalogCategoryParams,
17 | MoveCatalogItemsParams,
18 | get_catalog_item,
19 | get_catalog_item_variables,
20 | list_catalog_categories,
21 | list_catalog_items,
22 | create_catalog_category,
23 | update_catalog_category,
24 | move_catalog_items,
25 | )
26 | from servicenow_mcp.utils.config import AuthConfig, AuthType, BasicAuthConfig, ServerConfig
27 |
28 |
29 | class TestCatalogTools(unittest.TestCase):
30 | """Test cases for the catalog tools."""
31 |
32 | def setUp(self):
33 | """Set up test fixtures."""
34 | # Create a mock server config
35 | self.config = ServerConfig(
36 | instance_url="https://example.service-now.com",
37 | auth=AuthConfig(
38 | type=AuthType.BASIC,
39 | basic=BasicAuthConfig(username="admin", password="password"),
40 | ),
41 | )
42 |
43 | # Create a mock auth manager
44 | self.auth_manager = MagicMock(spec=AuthManager)
45 | self.auth_manager.get_headers.return_value = {"Authorization": "Basic YWRtaW46cGFzc3dvcmQ="}
46 |
47 | @patch("servicenow_mcp.tools.catalog_tools.requests.get")
48 | def test_list_catalog_items(self, mock_get):
49 | """Test listing catalog items."""
50 | # Mock the response from ServiceNow
51 | mock_response = MagicMock()
52 | mock_response.json.return_value = {
53 | "result": [
54 | {
55 | "sys_id": "item1",
56 | "name": "Laptop",
57 | "short_description": "Request a new laptop",
58 | "category": "Hardware",
59 | "price": "1000",
60 | "picture": "laptop.jpg",
61 | "active": "true",
62 | "order": "100",
63 | }
64 | ]
65 | }
66 | mock_response.raise_for_status = MagicMock()
67 | mock_get.return_value = mock_response
68 |
69 | # Call the function
70 | params = ListCatalogItemsParams(
71 | limit=10,
72 | offset=0,
73 | category="Hardware",
74 | query="laptop",
75 | active=True,
76 | )
77 | result = list_catalog_items(self.config, self.auth_manager, params)
78 |
79 | # Check the result
80 | self.assertTrue(result["success"])
81 | self.assertEqual(len(result["items"]), 1)
82 | self.assertEqual(result["items"][0]["name"], "Laptop")
83 | self.assertEqual(result["items"][0]["category"], "Hardware")
84 |
85 | # Check that the correct URL and parameters were used
86 | mock_get.assert_called_once()
87 | args, kwargs = mock_get.call_args
88 | self.assertEqual(args[0], "https://example.service-now.com/api/now/table/sc_cat_item")
89 | self.assertEqual(kwargs["params"]["sysparm_limit"], 10)
90 | self.assertEqual(kwargs["params"]["sysparm_offset"], 0)
91 | self.assertIn("sysparm_query", kwargs["params"])
92 | self.assertIn("active=true", kwargs["params"]["sysparm_query"])
93 | self.assertIn("category=Hardware", kwargs["params"]["sysparm_query"])
94 | self.assertIn("short_descriptionLIKElaptop^ORnameLIKElaptop", kwargs["params"]["sysparm_query"])
95 |
96 | @patch("servicenow_mcp.tools.catalog_tools.requests.get")
97 | def test_list_catalog_items_error(self, mock_get):
98 | """Test listing catalog items with an error."""
99 | # Mock the response from ServiceNow
100 | mock_get.side_effect = requests.exceptions.RequestException("Error")
101 |
102 | # Call the function
103 | params = ListCatalogItemsParams(
104 | limit=10,
105 | offset=0,
106 | )
107 | result = list_catalog_items(self.config, self.auth_manager, params)
108 |
109 | # Check the result
110 | self.assertFalse(result["success"])
111 | self.assertEqual(len(result["items"]), 0)
112 | self.assertIn("Error", result["message"])
113 |
114 | @patch("servicenow_mcp.tools.catalog_tools.get_catalog_item_variables")
115 | @patch("servicenow_mcp.tools.catalog_tools.requests.get")
116 | def test_get_catalog_item(self, mock_get, mock_get_variables):
117 | """Test getting a specific catalog item."""
118 | # Mock the response from ServiceNow
119 | mock_response = MagicMock()
120 | mock_response.json.return_value = {
121 | "result": {
122 | "sys_id": "item1",
123 | "name": "Laptop",
124 | "short_description": "Request a new laptop",
125 | "description": "Request a new laptop for work",
126 | "category": "Hardware",
127 | "price": "1000",
128 | "picture": "laptop.jpg",
129 | "active": "true",
130 | "order": "100",
131 | "delivery_time": "3 days",
132 | "availability": "In Stock",
133 | }
134 | }
135 | mock_response.raise_for_status = MagicMock()
136 | mock_get.return_value = mock_response
137 |
138 | # Mock the variables
139 | mock_get_variables.return_value = [
140 | {
141 | "sys_id": "var1",
142 | "name": "model",
143 | "label": "Laptop Model",
144 | "type": "string",
145 | "mandatory": "true",
146 | "default_value": "MacBook Pro",
147 | "help_text": "Select the laptop model",
148 | "order": "100",
149 | }
150 | ]
151 |
152 | # Call the function
153 | params = GetCatalogItemParams(item_id="item1")
154 | result = get_catalog_item(self.config, self.auth_manager, params)
155 |
156 | # Check the result
157 | self.assertTrue(result.success)
158 | self.assertEqual(result.data["name"], "Laptop")
159 | self.assertEqual(result.data["category"], "Hardware")
160 | self.assertEqual(len(result.data["variables"]), 1)
161 | self.assertEqual(result.data["variables"][0]["name"], "model")
162 |
163 | # Check that the correct URL and parameters were used
164 | mock_get.assert_called_once()
165 | args, kwargs = mock_get.call_args
166 | self.assertEqual(args[0], "https://example.service-now.com/api/now/table/sc_cat_item/item1")
167 |
168 | @patch("servicenow_mcp.tools.catalog_tools.requests.get")
169 | def test_get_catalog_item_not_found(self, mock_get):
170 | """Test getting a catalog item that doesn't exist."""
171 | # Mock the response from ServiceNow
172 | mock_response = MagicMock()
173 | mock_response.json.return_value = {"result": {}}
174 | mock_response.raise_for_status = MagicMock()
175 | mock_get.return_value = mock_response
176 |
177 | # Call the function
178 | params = GetCatalogItemParams(item_id="nonexistent")
179 | result = get_catalog_item(self.config, self.auth_manager, params)
180 |
181 | # Check the result
182 | self.assertFalse(result.success)
183 | self.assertIn("not found", result.message)
184 | self.assertIsNone(result.data)
185 |
186 | @patch("servicenow_mcp.tools.catalog_tools.requests.get")
187 | def test_get_catalog_item_error(self, mock_get):
188 | """Test getting a catalog item with an error."""
189 | # Mock the response from ServiceNow
190 | mock_get.side_effect = requests.exceptions.RequestException("Error")
191 |
192 | # Call the function
193 | params = GetCatalogItemParams(item_id="item1")
194 | result = get_catalog_item(self.config, self.auth_manager, params)
195 |
196 | # Check the result
197 | self.assertFalse(result.success)
198 | self.assertIn("Error", result.message)
199 | self.assertIsNone(result.data)
200 |
201 | @patch("servicenow_mcp.tools.catalog_tools.requests.get")
202 | def test_get_catalog_item_variables(self, mock_get):
203 | """Test getting variables for a catalog item."""
204 | # Mock the response from ServiceNow
205 | mock_response = MagicMock()
206 | mock_response.json.return_value = {
207 | "result": [
208 | {
209 | "sys_id": "var1",
210 | "name": "model",
211 | "question_text": "Laptop Model",
212 | "type": "string",
213 | "mandatory": "true",
214 | "default_value": "MacBook Pro",
215 | "help_text": "Select the laptop model",
216 | "order": "100",
217 | }
218 | ]
219 | }
220 | mock_response.raise_for_status = MagicMock()
221 | mock_get.return_value = mock_response
222 |
223 | # Call the function
224 | result = get_catalog_item_variables(self.config, self.auth_manager, "item1")
225 |
226 | # Check the result
227 | self.assertEqual(len(result), 1)
228 | self.assertEqual(result[0]["name"], "model")
229 | self.assertEqual(result[0]["label"], "Laptop Model")
230 | self.assertEqual(result[0]["type"], "string")
231 |
232 | # Check that the correct URL and parameters were used
233 | mock_get.assert_called_once()
234 | args, kwargs = mock_get.call_args
235 | self.assertEqual(args[0], "https://example.service-now.com/api/now/table/item_option_new")
236 | self.assertEqual(kwargs["params"]["sysparm_query"], "cat_item=item1^ORDERBYorder")
237 |
238 | @patch("servicenow_mcp.tools.catalog_tools.requests.get")
239 | def test_get_catalog_item_variables_error(self, mock_get):
240 | """Test getting variables for a catalog item with an error."""
241 | # Mock the response from ServiceNow
242 | mock_get.side_effect = requests.exceptions.RequestException("Error")
243 |
244 | # Call the function
245 | result = get_catalog_item_variables(self.config, self.auth_manager, "item1")
246 |
247 | # Check the result
248 | self.assertEqual(len(result), 0)
249 |
250 | @patch("servicenow_mcp.tools.catalog_tools.requests.get")
251 | def test_list_catalog_categories(self, mock_get):
252 | """Test listing catalog categories."""
253 | # Mock the response from ServiceNow
254 | mock_response = MagicMock()
255 | mock_response.json.return_value = {
256 | "result": [
257 | {
258 | "sys_id": "cat1",
259 | "title": "Hardware",
260 | "description": "Hardware requests",
261 | "parent": "",
262 | "icon": "hardware.png",
263 | "active": "true",
264 | "order": "100",
265 | }
266 | ]
267 | }
268 | mock_response.raise_for_status = MagicMock()
269 | mock_get.return_value = mock_response
270 |
271 | # Call the function
272 | params = ListCatalogCategoriesParams(
273 | limit=10,
274 | offset=0,
275 | query="hardware",
276 | active=True,
277 | )
278 | result = list_catalog_categories(self.config, self.auth_manager, params)
279 |
280 | # Check the result
281 | self.assertTrue(result["success"])
282 | self.assertEqual(len(result["categories"]), 1)
283 | self.assertEqual(result["categories"][0]["title"], "Hardware")
284 | self.assertEqual(result["categories"][0]["description"], "Hardware requests")
285 |
286 | # Check that the correct URL and parameters were used
287 | mock_get.assert_called_once()
288 | args, kwargs = mock_get.call_args
289 | self.assertEqual(args[0], "https://example.service-now.com/api/now/table/sc_category")
290 | self.assertEqual(kwargs["params"]["sysparm_limit"], 10)
291 | self.assertEqual(kwargs["params"]["sysparm_offset"], 0)
292 | self.assertIn("sysparm_query", kwargs["params"])
293 | self.assertIn("active=true", kwargs["params"]["sysparm_query"])
294 | self.assertIn("titleLIKEhardware^ORdescriptionLIKEhardware", kwargs["params"]["sysparm_query"])
295 |
296 | @patch("servicenow_mcp.tools.catalog_tools.requests.get")
297 | def test_list_catalog_categories_error(self, mock_get):
298 | """Test listing catalog categories with an error."""
299 | # Mock the response from ServiceNow
300 | mock_get.side_effect = requests.exceptions.RequestException("Error")
301 |
302 | # Call the function
303 | params = ListCatalogCategoriesParams(
304 | limit=10,
305 | offset=0,
306 | )
307 | result = list_catalog_categories(self.config, self.auth_manager, params)
308 |
309 | # Check the result
310 | self.assertFalse(result["success"])
311 | self.assertEqual(len(result["categories"]), 0)
312 | self.assertIn("Error", result["message"])
313 |
314 | @patch("requests.post")
315 | def test_create_catalog_category(self, mock_post):
316 | """Test creating a catalog category."""
317 | # Mock response
318 | mock_response = MagicMock()
319 | mock_response.json.return_value = {
320 | "result": {
321 | "sys_id": "test_sys_id",
322 | "title": "Test Category",
323 | "description": "Test Description",
324 | "parent": "",
325 | "icon": "icon-test",
326 | "active": "true",
327 | "order": "100",
328 | }
329 | }
330 | mock_post.return_value = mock_response
331 |
332 | # Create params
333 | params = CreateCatalogCategoryParams(
334 | title="Test Category",
335 | description="Test Description",
336 | icon="icon-test",
337 | active=True,
338 | order=100,
339 | )
340 |
341 | # Call function
342 | result = create_catalog_category(self.config, self.auth_manager, params)
343 |
344 | # Verify result
345 | self.assertTrue(result.success)
346 | self.assertEqual(result.data["title"], "Test Category")
347 | self.assertEqual(result.data["sys_id"], "test_sys_id")
348 |
349 | # Verify request
350 | mock_post.assert_called_once()
351 | args, kwargs = mock_post.call_args
352 | self.assertEqual(args[0], "https://example.service-now.com/api/now/table/sc_category")
353 | self.assertEqual(kwargs["json"]["title"], "Test Category")
354 | self.assertEqual(kwargs["json"]["description"], "Test Description")
355 |
356 | @patch("requests.patch")
357 | def test_update_catalog_category(self, mock_patch):
358 | """Test updating a catalog category."""
359 | # Mock response
360 | mock_response = MagicMock()
361 | mock_response.json.return_value = {
362 | "result": {
363 | "sys_id": "test_sys_id",
364 | "title": "Updated Category",
365 | "description": "Updated Description",
366 | "parent": "",
367 | "icon": "icon-test",
368 | "active": "true",
369 | "order": "200",
370 | }
371 | }
372 | mock_patch.return_value = mock_response
373 |
374 | # Create params
375 | params = UpdateCatalogCategoryParams(
376 | category_id="test_sys_id",
377 | title="Updated Category",
378 | description="Updated Description",
379 | order=200,
380 | )
381 |
382 | # Call function
383 | result = update_catalog_category(self.config, self.auth_manager, params)
384 |
385 | # Verify result
386 | self.assertTrue(result.success)
387 | self.assertEqual(result.data["title"], "Updated Category")
388 | self.assertEqual(result.data["description"], "Updated Description")
389 | self.assertEqual(result.data["order"], "200")
390 |
391 | # Verify request
392 | mock_patch.assert_called_once()
393 | args, kwargs = mock_patch.call_args
394 | self.assertEqual(args[0], "https://example.service-now.com/api/now/table/sc_category/test_sys_id")
395 | self.assertEqual(kwargs["json"]["title"], "Updated Category")
396 | self.assertEqual(kwargs["json"]["description"], "Updated Description")
397 | self.assertEqual(kwargs["json"]["order"], "200")
398 |
399 | @patch("requests.patch")
400 | def test_move_catalog_items(self, mock_patch):
401 | """Test moving catalog items."""
402 | # Mock response
403 | mock_response = MagicMock()
404 | mock_response.json.return_value = {"result": {"sys_id": "item_id", "category": "target_category_id"}}
405 | mock_patch.return_value = mock_response
406 |
407 | # Create params
408 | params = MoveCatalogItemsParams(
409 | item_ids=["item1", "item2", "item3"],
410 | target_category_id="target_category_id",
411 | )
412 |
413 | # Call function
414 | result = move_catalog_items(self.config, self.auth_manager, params)
415 |
416 | # Verify result
417 | self.assertTrue(result.success)
418 | self.assertEqual(result.data["moved_items_count"], 3)
419 |
420 | # Verify request
421 | self.assertEqual(mock_patch.call_count, 3)
422 | for i, call in enumerate(mock_patch.call_args_list):
423 | args, kwargs = call
424 | self.assertEqual(
425 | args[0],
426 | f"https://example.service-now.com/api/now/table/sc_cat_item/{params.item_ids[i]}"
427 | )
428 | self.assertEqual(kwargs["json"]["category"], "target_category_id")
429 |
430 |
431 | if __name__ == "__main__":
432 | unittest.main()
```
--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/script_include_tools.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Script Include tools for the ServiceNow MCP server.
3 |
4 | This module provides tools for managing script includes in ServiceNow.
5 | """
6 |
7 | import logging
8 | from typing import Any, Dict, Optional
9 |
10 | import requests
11 | from pydantic import BaseModel, Field
12 |
13 | from servicenow_mcp.auth.auth_manager import AuthManager
14 | from servicenow_mcp.utils.config import ServerConfig
15 |
16 | logger = logging.getLogger(__name__)
17 |
18 |
19 | class ListScriptIncludesParams(BaseModel):
20 | """Parameters for listing script includes."""
21 |
22 | limit: int = Field(10, description="Maximum number of script includes to return")
23 | offset: int = Field(0, description="Offset for pagination")
24 | active: Optional[bool] = Field(None, description="Filter by active status")
25 | client_callable: Optional[bool] = Field(None, description="Filter by client callable status")
26 | query: Optional[str] = Field(None, description="Search query for script includes")
27 |
28 |
29 | class GetScriptIncludeParams(BaseModel):
30 | """Parameters for getting a script include."""
31 |
32 | script_include_id: str = Field(..., description="Script include ID or name")
33 |
34 |
35 | class CreateScriptIncludeParams(BaseModel):
36 | """Parameters for creating a script include."""
37 |
38 | name: str = Field(..., description="Name of the script include")
39 | script: str = Field(..., description="Script content")
40 | description: Optional[str] = Field(None, description="Description of the script include")
41 | api_name: Optional[str] = Field(None, description="API name of the script include")
42 | client_callable: bool = Field(False, description="Whether the script include is client callable")
43 | active: bool = Field(True, description="Whether the script include is active")
44 | access: str = Field("package_private", description="Access level of the script include")
45 |
46 |
47 | class UpdateScriptIncludeParams(BaseModel):
48 | """Parameters for updating a script include."""
49 |
50 | script_include_id: str = Field(..., description="Script include ID or name")
51 | script: Optional[str] = Field(None, description="Script content")
52 | description: Optional[str] = Field(None, description="Description of the script include")
53 | api_name: Optional[str] = Field(None, description="API name of the script include")
54 | client_callable: Optional[bool] = Field(None, description="Whether the script include is client callable")
55 | active: Optional[bool] = Field(None, description="Whether the script include is active")
56 | access: Optional[str] = Field(None, description="Access level of the script include")
57 |
58 |
59 | class DeleteScriptIncludeParams(BaseModel):
60 | """Parameters for deleting a script include."""
61 |
62 | script_include_id: str = Field(..., description="Script include ID or name")
63 |
64 |
65 | class ScriptIncludeResponse(BaseModel):
66 | """Response from script include operations."""
67 |
68 | success: bool = Field(..., description="Whether the operation was successful")
69 | message: str = Field(..., description="Message describing the result")
70 | script_include_id: Optional[str] = Field(None, description="ID of the affected script include")
71 | script_include_name: Optional[str] = Field(None, description="Name of the affected script include")
72 |
73 |
74 | def list_script_includes(
75 | config: ServerConfig,
76 | auth_manager: AuthManager,
77 | params: ListScriptIncludesParams,
78 | ) -> Dict[str, Any]:
79 | """List script includes from ServiceNow.
80 |
81 | Args:
82 | config: The server configuration.
83 | auth_manager: The authentication manager.
84 | params: The parameters for the request.
85 |
86 | Returns:
87 | A dictionary containing the list of script includes.
88 | """
89 | try:
90 | # Build the URL
91 | url = f"{config.instance_url}/api/now/table/sys_script_include"
92 |
93 | # Build query parameters
94 | query_params = {
95 | "sysparm_limit": params.limit,
96 | "sysparm_offset": params.offset,
97 | "sysparm_display_value": "true",
98 | "sysparm_exclude_reference_link": "true",
99 | "sysparm_fields": "sys_id,name,script,description,api_name,client_callable,active,access,sys_created_on,sys_updated_on,sys_created_by,sys_updated_by"
100 | }
101 |
102 | # Add filters if provided
103 | query_parts = []
104 |
105 | if params.active is not None:
106 | query_parts.append(f"active={str(params.active).lower()}")
107 |
108 | if params.client_callable is not None:
109 | query_parts.append(f"client_callable={str(params.client_callable).lower()}")
110 |
111 | if params.query:
112 | query_parts.append(f"nameLIKE{params.query}")
113 |
114 | if query_parts:
115 | query_params["sysparm_query"] = "^".join(query_parts)
116 |
117 | # Make the request
118 | headers = auth_manager.get_headers()
119 |
120 | response = requests.get(
121 | url,
122 | params=query_params,
123 | headers=headers,
124 | timeout=30,
125 | )
126 | response.raise_for_status()
127 |
128 | # Parse the response
129 | data = response.json()
130 | script_includes = []
131 |
132 | for item in data.get("result", []):
133 | script_include = {
134 | "sys_id": item.get("sys_id"),
135 | "name": item.get("name"),
136 | "description": item.get("description"),
137 | "api_name": item.get("api_name"),
138 | "client_callable": item.get("client_callable") == "true",
139 | "active": item.get("active") == "true",
140 | "access": item.get("access"),
141 | "created_on": item.get("sys_created_on"),
142 | "updated_on": item.get("sys_updated_on"),
143 | "created_by": item.get("sys_created_by", {}).get("display_value"),
144 | "updated_by": item.get("sys_updated_by", {}).get("display_value"),
145 | }
146 | script_includes.append(script_include)
147 |
148 | return {
149 | "success": True,
150 | "message": f"Found {len(script_includes)} script includes",
151 | "script_includes": script_includes,
152 | "total": len(script_includes),
153 | "limit": params.limit,
154 | "offset": params.offset,
155 | }
156 |
157 | except Exception as e:
158 | logger.error(f"Error listing script includes: {e}")
159 | return {
160 | "success": False,
161 | "message": f"Error listing script includes: {str(e)}",
162 | "script_includes": [],
163 | "total": 0,
164 | "limit": params.limit,
165 | "offset": params.offset,
166 | }
167 |
168 |
169 | def get_script_include(
170 | config: ServerConfig,
171 | auth_manager: AuthManager,
172 | params: GetScriptIncludeParams,
173 | ) -> Dict[str, Any]:
174 | """Get a specific script include from ServiceNow.
175 |
176 | Args:
177 | config: The server configuration.
178 | auth_manager: The authentication manager.
179 | params: The parameters for the request.
180 |
181 | Returns:
182 | A dictionary containing the script include data.
183 | """
184 | try:
185 | # Build query parameters
186 | query_params = {
187 | "sysparm_display_value": "true",
188 | "sysparm_exclude_reference_link": "true",
189 | "sysparm_fields": "sys_id,name,script,description,api_name,client_callable,active,access,sys_created_on,sys_updated_on,sys_created_by,sys_updated_by"
190 | }
191 |
192 | # Determine if we're querying by sys_id or name
193 | if params.script_include_id.startswith("sys_id:"):
194 | sys_id = params.script_include_id.replace("sys_id:", "")
195 | url = f"{config.instance_url}/api/now/table/sys_script_include/{sys_id}"
196 | else:
197 | # Query by name
198 | url = f"{config.instance_url}/api/now/table/sys_script_include"
199 | query_params["sysparm_query"] = f"name={params.script_include_id}"
200 |
201 | # Make the request
202 | headers = auth_manager.get_headers()
203 |
204 | response = requests.get(
205 | url,
206 | params=query_params,
207 | headers=headers,
208 | timeout=30,
209 | )
210 | response.raise_for_status()
211 |
212 | # Parse the response
213 | data = response.json()
214 |
215 | if "result" not in data:
216 | return {
217 | "success": False,
218 | "message": f"Script include not found: {params.script_include_id}",
219 | }
220 |
221 | # Handle both single result and list of results
222 | result = data["result"]
223 | if isinstance(result, list):
224 | if not result:
225 | return {
226 | "success": False,
227 | "message": f"Script include not found: {params.script_include_id}",
228 | }
229 | item = result[0]
230 | else:
231 | item = result
232 |
233 | script_include = {
234 | "sys_id": item.get("sys_id"),
235 | "name": item.get("name"),
236 | "script": item.get("script"),
237 | "description": item.get("description"),
238 | "api_name": item.get("api_name"),
239 | "client_callable": item.get("client_callable") == "true",
240 | "active": item.get("active") == "true",
241 | "access": item.get("access"),
242 | "created_on": item.get("sys_created_on"),
243 | "updated_on": item.get("sys_updated_on"),
244 | "created_by": item.get("sys_created_by", {}).get("display_value"),
245 | "updated_by": item.get("sys_updated_by", {}).get("display_value"),
246 | }
247 |
248 | return {
249 | "success": True,
250 | "message": f"Found script include: {item.get('name')}",
251 | "script_include": script_include,
252 | }
253 |
254 | except Exception as e:
255 | logger.error(f"Error getting script include: {e}")
256 | return {
257 | "success": False,
258 | "message": f"Error getting script include: {str(e)}",
259 | }
260 |
261 |
262 | def create_script_include(
263 | config: ServerConfig,
264 | auth_manager: AuthManager,
265 | params: CreateScriptIncludeParams,
266 | ) -> ScriptIncludeResponse:
267 | """Create a new script include in ServiceNow.
268 |
269 | Args:
270 | config: The server configuration.
271 | auth_manager: The authentication manager.
272 | params: The parameters for the request.
273 |
274 | Returns:
275 | A response indicating the result of the operation.
276 | """
277 | # Build the URL
278 | url = f"{config.instance_url}/api/now/table/sys_script_include"
279 |
280 | # Build the request body
281 | body = {
282 | "name": params.name,
283 | "script": params.script,
284 | "active": str(params.active).lower(),
285 | "client_callable": str(params.client_callable).lower(),
286 | "access": params.access,
287 | }
288 |
289 | if params.description:
290 | body["description"] = params.description
291 |
292 | if params.api_name:
293 | body["api_name"] = params.api_name
294 |
295 | # Make the request
296 | headers = auth_manager.get_headers()
297 |
298 | try:
299 | response = requests.post(
300 | url,
301 | json=body,
302 | headers=headers,
303 | timeout=30,
304 | )
305 | response.raise_for_status()
306 |
307 | # Parse the response
308 | data = response.json()
309 |
310 | if "result" not in data:
311 | return ScriptIncludeResponse(
312 | success=False,
313 | message="Failed to create script include",
314 | )
315 |
316 | result = data["result"]
317 |
318 | return ScriptIncludeResponse(
319 | success=True,
320 | message=f"Created script include: {result.get('name')}",
321 | script_include_id=result.get("sys_id"),
322 | script_include_name=result.get("name"),
323 | )
324 |
325 | except Exception as e:
326 | logger.error(f"Error creating script include: {e}")
327 | return ScriptIncludeResponse(
328 | success=False,
329 | message=f"Error creating script include: {str(e)}",
330 | )
331 |
332 |
333 | def update_script_include(
334 | config: ServerConfig,
335 | auth_manager: AuthManager,
336 | params: UpdateScriptIncludeParams,
337 | ) -> ScriptIncludeResponse:
338 | """Update an existing script include in ServiceNow.
339 |
340 | Args:
341 | config: The server configuration.
342 | auth_manager: The authentication manager.
343 | params: The parameters for the request.
344 |
345 | Returns:
346 | A response indicating the result of the operation.
347 | """
348 | # First, get the script include to update
349 | get_params = GetScriptIncludeParams(script_include_id=params.script_include_id)
350 | get_result = get_script_include(config, auth_manager, get_params)
351 |
352 | if not get_result["success"]:
353 | return ScriptIncludeResponse(
354 | success=False,
355 | message=get_result["message"],
356 | )
357 |
358 | script_include = get_result["script_include"]
359 | sys_id = script_include["sys_id"]
360 |
361 | # Build the URL
362 | url = f"{config.instance_url}/api/now/table/sys_script_include/{sys_id}"
363 |
364 | # Build the request body
365 | body = {}
366 |
367 | if params.script is not None:
368 | body["script"] = params.script
369 |
370 | if params.description is not None:
371 | body["description"] = params.description
372 |
373 | if params.api_name is not None:
374 | body["api_name"] = params.api_name
375 |
376 | if params.client_callable is not None:
377 | body["client_callable"] = str(params.client_callable).lower()
378 |
379 | if params.active is not None:
380 | body["active"] = str(params.active).lower()
381 |
382 | if params.access is not None:
383 | body["access"] = params.access
384 |
385 | # If no fields to update, return success
386 | if not body:
387 | return ScriptIncludeResponse(
388 | success=True,
389 | message=f"No changes to update for script include: {script_include['name']}",
390 | script_include_id=sys_id,
391 | script_include_name=script_include["name"],
392 | )
393 |
394 | # Make the request
395 | headers = auth_manager.get_headers()
396 |
397 | try:
398 | response = requests.patch(
399 | url,
400 | json=body,
401 | headers=headers,
402 | timeout=30,
403 | )
404 | response.raise_for_status()
405 |
406 | # Parse the response
407 | data = response.json()
408 |
409 | if "result" not in data:
410 | return ScriptIncludeResponse(
411 | success=False,
412 | message=f"Failed to update script include: {script_include['name']}",
413 | )
414 |
415 | result = data["result"]
416 |
417 | return ScriptIncludeResponse(
418 | success=True,
419 | message=f"Updated script include: {result.get('name')}",
420 | script_include_id=result.get("sys_id"),
421 | script_include_name=result.get("name"),
422 | )
423 |
424 | except Exception as e:
425 | logger.error(f"Error updating script include: {e}")
426 | return ScriptIncludeResponse(
427 | success=False,
428 | message=f"Error updating script include: {str(e)}",
429 | )
430 |
431 |
432 | def delete_script_include(
433 | config: ServerConfig,
434 | auth_manager: AuthManager,
435 | params: DeleteScriptIncludeParams,
436 | ) -> ScriptIncludeResponse:
437 | """Delete a script include from ServiceNow.
438 |
439 | Args:
440 | config: The server configuration.
441 | auth_manager: The authentication manager.
442 | params: The parameters for the request.
443 |
444 | Returns:
445 | A response indicating the result of the operation.
446 | """
447 | # First, get the script include to delete
448 | get_params = GetScriptIncludeParams(script_include_id=params.script_include_id)
449 | get_result = get_script_include(config, auth_manager, get_params)
450 |
451 | if not get_result["success"]:
452 | return ScriptIncludeResponse(
453 | success=False,
454 | message=get_result["message"],
455 | )
456 |
457 | script_include = get_result["script_include"]
458 | sys_id = script_include["sys_id"]
459 | name = script_include["name"]
460 |
461 | # Build the URL
462 | url = f"{config.instance_url}/api/now/table/sys_script_include/{sys_id}"
463 |
464 | # Make the request
465 | headers = auth_manager.get_headers()
466 |
467 | try:
468 | response = requests.delete(
469 | url,
470 | headers=headers,
471 | timeout=30,
472 | )
473 | response.raise_for_status()
474 |
475 | return ScriptIncludeResponse(
476 | success=True,
477 | message=f"Deleted script include: {name}",
478 | script_include_id=sys_id,
479 | script_include_name=name,
480 | )
481 |
482 | except Exception as e:
483 | logger.error(f"Error deleting script include: {e}")
484 | return ScriptIncludeResponse(
485 | success=False,
486 | message=f"Error deleting script include: {str(e)}",
487 | )
```
--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/project_tools.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Project management tools for the ServiceNow MCP server.
3 |
4 | This module provides tools for managing projects in ServiceNow.
5 | """
6 |
7 | import logging
8 | from datetime import datetime
9 | from typing import Any, Dict, List, Optional, Type, TypeVar
10 |
11 | import requests
12 | from pydantic import BaseModel, Field
13 |
14 | from servicenow_mcp.auth.auth_manager import AuthManager
15 | from servicenow_mcp.utils.config import ServerConfig
16 |
17 | logger = logging.getLogger(__name__)
18 |
19 | # Type variable for Pydantic models
20 | T = TypeVar('T', bound=BaseModel)
21 |
22 | class CreateProjectParams(BaseModel):
23 | """Parameters for creating a project."""
24 |
25 | short_description: str = Field(..., description="Project name of the project")
26 | description: Optional[str] = Field(None, description="Detailed description of the project")
27 | status: Optional[str] = Field(None, description="Status of the project (green, yellow, red)")
28 | state: Optional[str] = Field(None, description="State of project (-5 is Pending,1 is Open, 2 is Work in progress, 3 is Closed Complete, 4 is Closed Incomplete, 5 is Closed Skipped)")
29 | project_manager: Optional[str] = Field(None, description="Project manager for the project")
30 | percentage_complete: Optional[int] = Field(None, description="Percentage complete for the project")
31 | assignment_group: Optional[str] = Field(None, description="Group assigned to the project")
32 | assigned_to: Optional[str] = Field(None, description="User assigned to the project")
33 | start_date: Optional[str] = Field(None, description="Start date for the project")
34 | end_date: Optional[str] = Field(None, description="End date for the project")
35 |
36 | class UpdateProjectParams(BaseModel):
37 | """Parameters for updating a project."""
38 |
39 | project_id: str = Field(..., description="Project ID or sys_id")
40 | short_description: Optional[str] = Field(None, description="Project name of the project")
41 | description: Optional[str] = Field(None, description="Detailed description of the project")
42 | status: Optional[str] = Field(None, description="Status of the project (green, yellow, red)")
43 | state: Optional[str] = Field(None, description="State of project (-5 is Pending,1 is Open, 2 is Work in progress, 3 is Closed Complete, 4 is Closed Incomplete, 5 is Closed Skipped)")
44 | project_manager: Optional[str] = Field(None, description="Project manager for the project")
45 | percentage_complete: Optional[int] = Field(None, description="Percentage complete for the project")
46 | assignment_group: Optional[str] = Field(None, description="Group assigned to the project")
47 | assigned_to: Optional[str] = Field(None, description="User assigned to the project")
48 | start_date: Optional[str] = Field(None, description="Start date for the project")
49 | end_date: Optional[str] = Field(None, description="End date for the project")
50 |
51 | class ListProjectsParams(BaseModel):
52 | """Parameters for listing projects."""
53 |
54 | limit: Optional[int] = Field(10, description="Maximum number of records to return")
55 | offset: Optional[int] = Field(0, description="Offset to start from")
56 | state: Optional[str] = Field(None, description="Filter by state")
57 | assignment_group: Optional[str] = Field(None, description="Filter by assignment group")
58 | timeframe: Optional[str] = Field(None, description="Filter by timeframe (upcoming, in-progress, completed)")
59 | query: Optional[str] = Field(None, description="Additional query string")
60 |
61 |
62 | def _unwrap_and_validate_params(params: Any, model_class: Type[T], required_fields: List[str] = None) -> Dict[str, Any]:
63 | """
64 | Helper function to unwrap and validate parameters.
65 |
66 | Args:
67 | params: The parameters to unwrap and validate.
68 | model_class: The Pydantic model class to validate against.
69 | required_fields: List of required field names.
70 |
71 | Returns:
72 | A tuple of (success, result) where result is either the validated parameters or an error message.
73 | """
74 | # Handle case where params might be wrapped in another dictionary
75 | if isinstance(params, dict) and len(params) == 1 and "params" in params and isinstance(params["params"], dict):
76 | logger.warning("Detected params wrapped in a 'params' key. Unwrapping...")
77 | params = params["params"]
78 |
79 | # Handle case where params might be a Pydantic model object
80 | if not isinstance(params, dict):
81 | try:
82 | # Try to convert to dict if it's a Pydantic model
83 | logger.warning("Params is not a dictionary. Attempting to convert...")
84 | params = params.dict() if hasattr(params, "dict") else dict(params)
85 | except Exception as e:
86 | logger.error(f"Failed to convert params to dictionary: {e}")
87 | return {
88 | "success": False,
89 | "message": f"Invalid parameters format. Expected a dictionary, got {type(params).__name__}",
90 | }
91 |
92 | # Validate required parameters are present
93 | if required_fields:
94 | for field in required_fields:
95 | if field not in params:
96 | return {
97 | "success": False,
98 | "message": f"Missing required parameter '{field}'",
99 | }
100 |
101 | try:
102 | # Validate parameters against the model
103 | validated_params = model_class(**params)
104 | return {
105 | "success": True,
106 | "params": validated_params,
107 | }
108 | except Exception as e:
109 | logger.error(f"Error validating parameters: {e}")
110 | return {
111 | "success": False,
112 | "message": f"Error validating parameters: {str(e)}",
113 | }
114 |
115 |
116 | def _get_instance_url(auth_manager: AuthManager, server_config: ServerConfig) -> Optional[str]:
117 | """
118 | Helper function to get the instance URL from either server_config or auth_manager.
119 |
120 | Args:
121 | auth_manager: The authentication manager.
122 | server_config: The server configuration.
123 |
124 | Returns:
125 | The instance URL if found, None otherwise.
126 | """
127 | if hasattr(server_config, 'instance_url'):
128 | return server_config.instance_url
129 | elif hasattr(auth_manager, 'instance_url'):
130 | return auth_manager.instance_url
131 | else:
132 | logger.error("Cannot find instance_url in either server_config or auth_manager")
133 | return None
134 |
135 |
136 | def _get_headers(auth_manager: Any, server_config: Any) -> Optional[Dict[str, str]]:
137 | """
138 | Helper function to get headers from either auth_manager or server_config.
139 |
140 | Args:
141 | auth_manager: The authentication manager or object passed as auth_manager.
142 | server_config: The server configuration or object passed as server_config.
143 |
144 | Returns:
145 | The headers if found, None otherwise.
146 | """
147 | # Try to get headers from auth_manager
148 | if hasattr(auth_manager, 'get_headers'):
149 | return auth_manager.get_headers()
150 |
151 | # If auth_manager doesn't have get_headers, try server_config
152 | if hasattr(server_config, 'get_headers'):
153 | return server_config.get_headers()
154 |
155 | # If neither has get_headers, check if auth_manager is actually a ServerConfig
156 | # and server_config is actually an AuthManager (parameters swapped)
157 | if hasattr(server_config, 'get_headers') and not hasattr(auth_manager, 'get_headers'):
158 | return server_config.get_headers()
159 |
160 | logger.error("Cannot find get_headers method in either auth_manager or server_config")
161 | return None
162 |
163 | def create_project(
164 | config: ServerConfig, # Changed from auth_manager
165 | auth_manager: AuthManager, # Changed from server_config
166 | params: Dict[str, Any],
167 | ) -> Dict[str, Any]:
168 | """
169 | Create a new project in ServiceNow.
170 |
171 | Args:
172 | config: The server configuration.
173 | auth_manager: The authentication manager.
174 | params: The parameters for creating the project.
175 |
176 | Returns:
177 | The created project.
178 | """
179 |
180 | # Unwrap and validate parameters
181 | result = _unwrap_and_validate_params(
182 | params,
183 | CreateProjectParams,
184 | required_fields=["short_description"]
185 | )
186 |
187 | if not result["success"]:
188 | return result
189 |
190 | validated_params = result["params"]
191 |
192 | # Prepare the request data
193 | data = {
194 | "short_description": validated_params.short_description,
195 | }
196 |
197 | # Add optional fields if provided
198 | if validated_params.description:
199 | data["description"] = validated_params.description
200 | if validated_params.status:
201 | data["status"] = validated_params.status
202 | if validated_params.state:
203 | data["state"] = validated_params.state
204 | if validated_params.assignment_group:
205 | data["assignment_group"] = validated_params.assignment_group
206 | if validated_params.percentage_complete:
207 | data["percentage_complete"] = validated_params.percentage_complete
208 | if validated_params.assigned_to:
209 | data["assigned_to"] = validated_params.assigned_to
210 | if validated_params.project_manager:
211 | data["project_manager"] = validated_params.project_manager
212 | if validated_params.start_date:
213 | data["start_date"] = validated_params.start_date
214 | if validated_params.end_date:
215 | data["end_date"] = validated_params.end_date
216 |
217 | # Get the instance URL
218 | instance_url = _get_instance_url(auth_manager, config)
219 | if not instance_url:
220 | return {
221 | "success": False,
222 | "message": "Cannot find instance_url in either server_config or auth_manager",
223 | }
224 |
225 | # Get the headers
226 | headers = _get_headers(auth_manager, config)
227 | if not headers:
228 | return {
229 | "success": False,
230 | "message": "Cannot find get_headers method in either auth_manager or server_config",
231 | }
232 |
233 | # Add Content-Type header
234 | headers["Content-Type"] = "application/json"
235 |
236 | # Make the API request
237 | url = f"{instance_url}/api/now/table/pm_project"
238 |
239 | try:
240 | response = requests.post(url, json=data, headers=headers)
241 | response.raise_for_status()
242 |
243 | result = response.json()
244 |
245 | return {
246 | "success": True,
247 | "message": "Project created successfully",
248 | "project": result["result"],
249 | }
250 | except requests.exceptions.RequestException as e:
251 | logger.error(f"Error creating project: {e}")
252 | return {
253 | "success": False,
254 | "message": f"Error creating project: {str(e)}",
255 | }
256 |
257 | def update_project(
258 | config: ServerConfig, # Changed from auth_manager
259 | auth_manager: AuthManager, # Changed from server_config
260 | params: Dict[str, Any],
261 | ) -> Dict[str, Any]:
262 | """
263 | Update an existing project in ServiceNow.
264 |
265 | Args:
266 | config: The server configuration.
267 | auth_manager: The authentication manager.
268 | params: The parameters for updating the project.
269 |
270 | Returns:
271 | The updated project.
272 | """
273 | # Unwrap and validate parameters
274 | result = _unwrap_and_validate_params(
275 | params,
276 | UpdateProjectParams,
277 | required_fields=["project_id"]
278 | )
279 |
280 | if not result["success"]:
281 | return result
282 |
283 | validated_params = result["params"]
284 |
285 | # Prepare the request data
286 | data = {}
287 |
288 | # Add optional fields if provided
289 | if validated_params.short_description:
290 | data["short_description"] = validated_params.short_description
291 | if validated_params.description:
292 | data["description"] = validated_params.description
293 | if validated_params.status:
294 | data["status"] = validated_params.status
295 | if validated_params.state:
296 | data["state"] = validated_params.state
297 | if validated_params.assignment_group:
298 | data["assignment_group"] = validated_params.assignment_group
299 | if validated_params.percentage_complete:
300 | data["percentage_complete"] = validated_params.percentage_complete
301 | if validated_params.assigned_to:
302 | data["assigned_to"] = validated_params.assigned_to
303 | if validated_params.project_manager:
304 | data["project_manager"] = validated_params.project_manager
305 | if validated_params.start_date:
306 | data["start_date"] = validated_params.start_date
307 | if validated_params.end_date:
308 | data["end_date"] = validated_params.end_date
309 |
310 | # Get the instance URL
311 | instance_url = _get_instance_url(auth_manager, config)
312 | if not instance_url:
313 | return {
314 | "success": False,
315 | "message": "Cannot find instance_url in either server_config or auth_manager",
316 | }
317 |
318 | # Get the headers
319 | headers = _get_headers(auth_manager, config)
320 | if not headers:
321 | return {
322 | "success": False,
323 | "message": "Cannot find get_headers method in either auth_manager or server_config",
324 | }
325 |
326 | # Add Content-Type header
327 | headers["Content-Type"] = "application/json"
328 |
329 | # Make the API request
330 | url = f"{instance_url}/api/now/table/pm_project/{validated_params.project_id}"
331 |
332 | try:
333 | response = requests.put(url, json=data, headers=headers)
334 | response.raise_for_status()
335 |
336 | result = response.json()
337 |
338 | return {
339 | "success": True,
340 | "message": "Project updated successfully",
341 | "project": result["result"],
342 | }
343 | except requests.exceptions.RequestException as e:
344 | logger.error(f"Error updating project: {e}")
345 | return {
346 | "success": False,
347 | "message": f"Error updating project: {str(e)}",
348 | }
349 |
350 | def list_projects(
351 | config: ServerConfig, # Changed from auth_manager
352 | auth_manager: AuthManager, # Changed from server_config
353 | params: Dict[str, Any],
354 | ) -> Dict[str, Any]:
355 | """
356 | List projects from ServiceNow.
357 |
358 | Args:
359 | config: The server configuration.
360 | auth_manager: The authentication manager.
361 | params: The parameters for listing projects.
362 |
363 | Returns:
364 | A list of projects.
365 | """
366 | # Unwrap and validate parameters
367 | result = _unwrap_and_validate_params(
368 | params,
369 | ListProjectsParams
370 | )
371 |
372 | if not result["success"]:
373 | return result
374 |
375 | validated_params = result["params"]
376 |
377 | # Build the query
378 | query_parts = []
379 |
380 | if validated_params.state:
381 | query_parts.append(f"state={validated_params.state}")
382 | if validated_params.assignment_group:
383 | query_parts.append(f"assignment_group={validated_params.assignment_group}")
384 |
385 | # Handle timeframe filtering
386 | if validated_params.timeframe:
387 | now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
388 | if validated_params.timeframe == "upcoming":
389 | query_parts.append(f"start_date>{now}")
390 | elif validated_params.timeframe == "in-progress":
391 | query_parts.append(f"start_date<{now}^end_date>{now}")
392 | elif validated_params.timeframe == "completed":
393 | query_parts.append(f"end_date<{now}")
394 |
395 | # Add any additional query string
396 | if validated_params.query:
397 | query_parts.append(validated_params.query)
398 |
399 | # Combine query parts
400 | query = "^".join(query_parts) if query_parts else ""
401 |
402 | # Get the instance URL
403 | instance_url = _get_instance_url(auth_manager, config)
404 | if not instance_url:
405 | return {
406 | "success": False,
407 | "message": "Cannot find instance_url in either server_config or auth_manager",
408 | }
409 |
410 | # Get the headers
411 | headers = _get_headers(auth_manager, config)
412 | if not headers:
413 | return {
414 | "success": False,
415 | "message": "Cannot find get_headers method in either auth_manager or server_config",
416 | }
417 |
418 | # Make the API request
419 | url = f"{instance_url}/api/now/table/pm_project"
420 |
421 | params = {
422 | "sysparm_limit": validated_params.limit,
423 | "sysparm_offset": validated_params.offset,
424 | "sysparm_query": query,
425 | "sysparm_display_value": "true",
426 | }
427 |
428 | try:
429 | response = requests.get(url, headers=headers, params=params)
430 | response.raise_for_status()
431 |
432 | result = response.json()
433 |
434 | # Handle the case where result["result"] is a list
435 | projects = result.get("result", [])
436 | count = len(projects)
437 |
438 | return {
439 | "success": True,
440 | "projects": projects,
441 | "count": count,
442 | "total": count, # Use count as total if total is not provided
443 | }
444 | except requests.exceptions.RequestException as e:
445 | logger.error(f"Error listing projects: {e}")
446 | return {
447 | "success": False,
448 | "message": f"Error listing projects: {str(e)}",
449 | }
450 |
```
--------------------------------------------------------------------------------
/tests/test_script_include_tools.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the script include tools.
3 |
4 | This module contains tests for the script include tools in the ServiceNow MCP server.
5 | """
6 |
7 | import unittest
8 | import requests
9 | from unittest.mock import MagicMock, patch
10 |
11 | from servicenow_mcp.auth.auth_manager import AuthManager
12 | from servicenow_mcp.tools.script_include_tools import (
13 | ListScriptIncludesParams,
14 | GetScriptIncludeParams,
15 | CreateScriptIncludeParams,
16 | UpdateScriptIncludeParams,
17 | DeleteScriptIncludeParams,
18 | ScriptIncludeResponse,
19 | list_script_includes,
20 | get_script_include,
21 | create_script_include,
22 | update_script_include,
23 | delete_script_include,
24 | )
25 | from servicenow_mcp.utils.config import ServerConfig, AuthConfig, AuthType, BasicAuthConfig
26 |
27 |
28 | class TestScriptIncludeTools(unittest.TestCase):
29 | """Tests for the script include tools."""
30 |
31 | def setUp(self):
32 | """Set up test fixtures."""
33 | auth_config = AuthConfig(
34 | type=AuthType.BASIC,
35 | basic=BasicAuthConfig(
36 | username="test_user",
37 | password="test_password"
38 | )
39 | )
40 | self.server_config = ServerConfig(
41 | instance_url="https://test.service-now.com",
42 | auth=auth_config,
43 | )
44 | self.auth_manager = MagicMock(spec=AuthManager)
45 | self.auth_manager.get_headers.return_value = {
46 | "Authorization": "Bearer test",
47 | "Content-Type": "application/json",
48 | }
49 |
50 | @patch("servicenow_mcp.tools.script_include_tools.requests.get")
51 | def test_list_script_includes(self, mock_get):
52 | """Test listing script includes."""
53 | # Mock response
54 | mock_response = MagicMock()
55 | mock_response.json.return_value = {
56 | "result": [
57 | {
58 | "sys_id": "123",
59 | "name": "TestScriptInclude",
60 | "script": "var TestScriptInclude = Class.create();\nTestScriptInclude.prototype = {\n initialize: function() {\n },\n\n type: 'TestScriptInclude'\n};",
61 | "description": "Test Script Include",
62 | "api_name": "global.TestScriptInclude",
63 | "client_callable": "true",
64 | "active": "true",
65 | "access": "public",
66 | "sys_created_on": "2023-01-01 00:00:00",
67 | "sys_updated_on": "2023-01-02 00:00:00",
68 | "sys_created_by": {"display_value": "admin"},
69 | "sys_updated_by": {"display_value": "admin"}
70 | }
71 | ]
72 | }
73 | mock_response.status_code = 200
74 | mock_get.return_value = mock_response
75 |
76 | # Call the method
77 | params = ListScriptIncludesParams(
78 | limit=10,
79 | offset=0,
80 | active=True,
81 | client_callable=True,
82 | query="Test"
83 | )
84 | result = list_script_includes(self.server_config, self.auth_manager, params)
85 |
86 | # Verify the result
87 | self.assertTrue(result["success"])
88 | self.assertEqual(1, len(result["script_includes"]))
89 | self.assertEqual("123", result["script_includes"][0]["sys_id"])
90 | self.assertEqual("TestScriptInclude", result["script_includes"][0]["name"])
91 | self.assertTrue(result["script_includes"][0]["client_callable"])
92 | self.assertTrue(result["script_includes"][0]["active"])
93 |
94 | # Verify the request
95 | mock_get.assert_called_once()
96 | args, kwargs = mock_get.call_args
97 | self.assertEqual(f"{self.server_config.instance_url}/api/now/table/sys_script_include", args[0])
98 | self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
99 | self.assertEqual(10, kwargs["params"]["sysparm_limit"])
100 | self.assertEqual(0, kwargs["params"]["sysparm_offset"])
101 | self.assertEqual("active=true^client_callable=true^nameLIKETest", kwargs["params"]["sysparm_query"])
102 |
103 | @patch("servicenow_mcp.tools.script_include_tools.requests.get")
104 | def test_get_script_include(self, mock_get):
105 | """Test getting a script include."""
106 | # Mock response
107 | mock_response = MagicMock()
108 | mock_response.json.return_value = {
109 | "result": {
110 | "sys_id": "123",
111 | "name": "TestScriptInclude",
112 | "script": "var TestScriptInclude = Class.create();\nTestScriptInclude.prototype = {\n initialize: function() {\n },\n\n type: 'TestScriptInclude'\n};",
113 | "description": "Test Script Include",
114 | "api_name": "global.TestScriptInclude",
115 | "client_callable": "true",
116 | "active": "true",
117 | "access": "public",
118 | "sys_created_on": "2023-01-01 00:00:00",
119 | "sys_updated_on": "2023-01-02 00:00:00",
120 | "sys_created_by": {"display_value": "admin"},
121 | "sys_updated_by": {"display_value": "admin"}
122 | }
123 | }
124 | mock_response.status_code = 200
125 | mock_get.return_value = mock_response
126 |
127 | # Call the method
128 | params = GetScriptIncludeParams(script_include_id="123")
129 | result = get_script_include(self.server_config, self.auth_manager, params)
130 |
131 | # Verify the result
132 | self.assertTrue(result["success"])
133 | self.assertEqual("123", result["script_include"]["sys_id"])
134 | self.assertEqual("TestScriptInclude", result["script_include"]["name"])
135 | self.assertTrue(result["script_include"]["client_callable"])
136 | self.assertTrue(result["script_include"]["active"])
137 |
138 | # Verify the request
139 | mock_get.assert_called_once()
140 | args, kwargs = mock_get.call_args
141 | self.assertEqual(f"{self.server_config.instance_url}/api/now/table/sys_script_include", args[0])
142 | self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
143 | self.assertEqual("name=123", kwargs["params"]["sysparm_query"])
144 |
145 | @patch("servicenow_mcp.tools.script_include_tools.requests.post")
146 | def test_create_script_include(self, mock_post):
147 | """Test creating a script include."""
148 | # Mock response
149 | mock_response = MagicMock()
150 | mock_response.json.return_value = {
151 | "result": {
152 | "sys_id": "123",
153 | "name": "TestScriptInclude",
154 | }
155 | }
156 | mock_response.status_code = 201
157 | mock_post.return_value = mock_response
158 |
159 | # Call the method
160 | params = CreateScriptIncludeParams(
161 | name="TestScriptInclude",
162 | script="var TestScriptInclude = Class.create();\nTestScriptInclude.prototype = {\n initialize: function() {\n },\n\n type: 'TestScriptInclude'\n};",
163 | description="Test Script Include",
164 | api_name="global.TestScriptInclude",
165 | client_callable=True,
166 | active=True,
167 | access="public"
168 | )
169 | result = create_script_include(self.server_config, self.auth_manager, params)
170 |
171 | # Verify the result
172 | self.assertTrue(result.success)
173 | self.assertEqual("123", result.script_include_id)
174 | self.assertEqual("TestScriptInclude", result.script_include_name)
175 |
176 | # Verify the request
177 | mock_post.assert_called_once()
178 | args, kwargs = mock_post.call_args
179 | self.assertEqual(f"{self.server_config.instance_url}/api/now/table/sys_script_include", args[0])
180 | self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
181 | self.assertEqual("TestScriptInclude", kwargs["json"]["name"])
182 | self.assertEqual("true", kwargs["json"]["client_callable"])
183 | self.assertEqual("true", kwargs["json"]["active"])
184 | self.assertEqual("public", kwargs["json"]["access"])
185 |
186 | @patch("servicenow_mcp.tools.script_include_tools.get_script_include")
187 | @patch("servicenow_mcp.tools.script_include_tools.requests.patch")
188 | def test_update_script_include(self, mock_patch, mock_get_script_include):
189 | """Test updating a script include."""
190 | # Mock get_script_include response
191 | mock_get_script_include.return_value = {
192 | "success": True,
193 | "message": "Found script include: TestScriptInclude",
194 | "script_include": {
195 | "sys_id": "123",
196 | "name": "TestScriptInclude",
197 | "script": "var TestScriptInclude = Class.create();\nTestScriptInclude.prototype = {\n initialize: function() {\n },\n\n type: 'TestScriptInclude'\n};",
198 | "description": "Test Script Include",
199 | "api_name": "global.TestScriptInclude",
200 | "client_callable": True,
201 | "active": True,
202 | "access": "public",
203 | }
204 | }
205 |
206 | # Mock patch response
207 | mock_response = MagicMock()
208 | mock_response.json.return_value = {
209 | "result": {
210 | "sys_id": "123",
211 | "name": "TestScriptInclude",
212 | }
213 | }
214 | mock_response.status_code = 200
215 | mock_patch.return_value = mock_response
216 |
217 | # Call the method
218 | params = UpdateScriptIncludeParams(
219 | script_include_id="123",
220 | script="var TestScriptInclude = Class.create();\nTestScriptInclude.prototype = {\n initialize: function() {\n // Updated\n },\n\n type: 'TestScriptInclude'\n};",
221 | description="Updated Test Script Include",
222 | client_callable=False,
223 | )
224 | result = update_script_include(self.server_config, self.auth_manager, params)
225 |
226 | # Verify the result
227 | self.assertTrue(result.success)
228 | self.assertEqual("123", result.script_include_id)
229 | self.assertEqual("TestScriptInclude", result.script_include_name)
230 |
231 | # Verify the request
232 | mock_patch.assert_called_once()
233 | args, kwargs = mock_patch.call_args
234 | self.assertEqual(f"{self.server_config.instance_url}/api/now/table/sys_script_include/123", args[0])
235 | self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
236 | self.assertEqual("Updated Test Script Include", kwargs["json"]["description"])
237 | self.assertEqual("false", kwargs["json"]["client_callable"])
238 |
239 | @patch("servicenow_mcp.tools.script_include_tools.get_script_include")
240 | @patch("servicenow_mcp.tools.script_include_tools.requests.delete")
241 | def test_delete_script_include(self, mock_delete, mock_get_script_include):
242 | """Test deleting a script include."""
243 | # Mock get_script_include response
244 | mock_get_script_include.return_value = {
245 | "success": True,
246 | "message": "Found script include: TestScriptInclude",
247 | "script_include": {
248 | "sys_id": "123",
249 | "name": "TestScriptInclude",
250 | }
251 | }
252 |
253 | # Mock delete response
254 | mock_response = MagicMock()
255 | mock_response.status_code = 204
256 | mock_delete.return_value = mock_response
257 |
258 | # Call the method
259 | params = DeleteScriptIncludeParams(script_include_id="123")
260 | result = delete_script_include(self.server_config, self.auth_manager, params)
261 |
262 | # Verify the result
263 | self.assertTrue(result.success)
264 | self.assertEqual("123", result.script_include_id)
265 | self.assertEqual("TestScriptInclude", result.script_include_name)
266 |
267 | # Verify the request
268 | mock_delete.assert_called_once()
269 | args, kwargs = mock_delete.call_args
270 | self.assertEqual(f"{self.server_config.instance_url}/api/now/table/sys_script_include/123", args[0])
271 | self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
272 |
273 | @patch("servicenow_mcp.tools.script_include_tools.requests.get")
274 | def test_list_script_includes_error(self, mock_get):
275 | """Test listing script includes with an error."""
276 | # Mock response
277 | mock_get.side_effect = requests.RequestException("Test error")
278 |
279 | # Call the method
280 | params = ListScriptIncludesParams()
281 | result = list_script_includes(self.server_config, self.auth_manager, params)
282 |
283 | # Verify the result
284 | self.assertFalse(result["success"])
285 | self.assertIn("Error listing script includes", result["message"])
286 |
287 | @patch("servicenow_mcp.tools.script_include_tools.requests.get")
288 | def test_get_script_include_error(self, mock_get):
289 | """Test getting a script include with an error."""
290 | # Mock response
291 | mock_get.side_effect = requests.RequestException("Test error")
292 |
293 | # Call the method
294 | params = GetScriptIncludeParams(script_include_id="123")
295 | result = get_script_include(self.server_config, self.auth_manager, params)
296 |
297 | # Verify the result
298 | self.assertFalse(result["success"])
299 | self.assertIn("Error getting script include", result["message"])
300 |
301 | @patch("servicenow_mcp.tools.script_include_tools.requests.post")
302 | def test_create_script_include_error(self, mock_post):
303 | """Test creating a script include with an error."""
304 | # Mock response
305 | mock_post.side_effect = requests.RequestException("Test error")
306 |
307 | # Call the method
308 | params = CreateScriptIncludeParams(
309 | name="TestScriptInclude",
310 | script="var TestScriptInclude = Class.create();\nTestScriptInclude.prototype = {\n initialize: function() {\n },\n\n type: 'TestScriptInclude'\n};",
311 | )
312 | result = create_script_include(self.server_config, self.auth_manager, params)
313 |
314 | # Verify the result
315 | self.assertFalse(result.success)
316 | self.assertIn("Error creating script include", result.message)
317 |
318 |
319 | class TestScriptIncludeParams(unittest.TestCase):
320 | """Tests for the script include parameters."""
321 |
322 | def test_list_script_includes_params(self):
323 | """Test list script includes parameters."""
324 | params = ListScriptIncludesParams(
325 | limit=20,
326 | offset=10,
327 | active=True,
328 | client_callable=False,
329 | query="Test"
330 | )
331 | self.assertEqual(20, params.limit)
332 | self.assertEqual(10, params.offset)
333 | self.assertTrue(params.active)
334 | self.assertFalse(params.client_callable)
335 | self.assertEqual("Test", params.query)
336 |
337 | def test_get_script_include_params(self):
338 | """Test get script include parameters."""
339 | params = GetScriptIncludeParams(script_include_id="123")
340 | self.assertEqual("123", params.script_include_id)
341 |
342 | def test_create_script_include_params(self):
343 | """Test create script include parameters."""
344 | params = CreateScriptIncludeParams(
345 | name="TestScriptInclude",
346 | script="var TestScriptInclude = Class.create();\nTestScriptInclude.prototype = {\n initialize: function() {\n },\n\n type: 'TestScriptInclude'\n};",
347 | description="Test Script Include",
348 | api_name="global.TestScriptInclude",
349 | client_callable=True,
350 | active=True,
351 | access="public"
352 | )
353 | self.assertEqual("TestScriptInclude", params.name)
354 | self.assertTrue(params.client_callable)
355 | self.assertTrue(params.active)
356 | self.assertEqual("public", params.access)
357 |
358 | def test_update_script_include_params(self):
359 | """Test update script include parameters."""
360 | params = UpdateScriptIncludeParams(
361 | script_include_id="123",
362 | script="var TestScriptInclude = Class.create();\nTestScriptInclude.prototype = {\n initialize: function() {\n // Updated\n },\n\n type: 'TestScriptInclude'\n};",
363 | description="Updated Test Script Include",
364 | client_callable=False,
365 | )
366 | self.assertEqual("123", params.script_include_id)
367 | self.assertEqual("Updated Test Script Include", params.description)
368 | self.assertFalse(params.client_callable)
369 |
370 | def test_delete_script_include_params(self):
371 | """Test delete script include parameters."""
372 | params = DeleteScriptIncludeParams(script_include_id="123")
373 | self.assertEqual("123", params.script_include_id)
374 |
375 | def test_script_include_response(self):
376 | """Test script include response."""
377 | response = ScriptIncludeResponse(
378 | success=True,
379 | message="Test message",
380 | script_include_id="123",
381 | script_include_name="TestScriptInclude"
382 | )
383 | self.assertTrue(response.success)
384 | self.assertEqual("Test message", response.message)
385 | self.assertEqual("123", response.script_include_id)
386 | self.assertEqual("TestScriptInclude", response.script_include_name)
```
--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/scrum_task_tools.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Scrum Task management tools for the ServiceNow MCP server.
3 |
4 | This module provides tools for managing stories in ServiceNow.
5 | """
6 |
7 | import logging
8 | from datetime import datetime
9 | from typing import Any, Dict, List, Optional, Type, TypeVar
10 |
11 | import requests
12 | from pydantic import BaseModel, Field
13 |
14 | from servicenow_mcp.auth.auth_manager import AuthManager
15 | from servicenow_mcp.utils.config import ServerConfig
16 |
17 | logger = logging.getLogger(__name__)
18 |
19 | # Type variable for Pydantic models
20 | T = TypeVar('T', bound=BaseModel)
21 |
22 | class CreateScrumTaskParams(BaseModel):
23 | """Parameters for creating a scrum task."""
24 |
25 | story: str = Field(..., description="Short description of the story. It requires the System ID of the story.")
26 | short_description: str = Field(..., description="Short description of the scrum task")
27 | priority: Optional[str] = Field(None, description="Priority of scrum task (1 is Critical, 2 is High, 3 is Moderate, 4 is Low)")
28 | planned_hours: Optional[int] = Field(None, description="Planned hours for the scrum task")
29 | remaining_hours: Optional[int] = Field(None, description="Remaining hours for the scrum task")
30 | hours: Optional[int] = Field(None, description="Actual Hours for the scrum task")
31 | description: Optional[str] = Field(None, description="Detailed description of the scrum task")
32 | type: Optional[str] = Field(None, description="Type of scrum task (1 is Analysis, 2 is Coding, 3 is Documentation, 4 is Testing)")
33 | state: Optional[str] = Field(None, description="State of scrum task (-6 is Draft,1 is Ready, 2 is Work in progress, 3 is Complete, 4 is Cancelled)")
34 | assignment_group: Optional[str] = Field(None, description="Group assigned to the scrum task")
35 | assigned_to: Optional[str] = Field(None, description="User assigned to the scrum task")
36 | work_notes: Optional[str] = Field(None, description="Work notes to add to the scrum task")
37 |
38 | class UpdateScrumTaskParams(BaseModel):
39 | """Parameters for updating a scrum task."""
40 |
41 | scrum_task_id: str = Field(..., description="Scrum Task ID or sys_id")
42 | short_description: Optional[str] = Field(None, description="Short description of the scrum task")
43 | priority: Optional[str] = Field(None, description="Priority of scrum task (1 is Critical, 2 is High, 3 is Moderate, 4 is Low)")
44 | planned_hours: Optional[int] = Field(None, description="Planned hours for the scrum task")
45 | remaining_hours: Optional[int] = Field(None, description="Remaining hours for the scrum task")
46 | hours: Optional[int] = Field(None, description="Actual Hours for the scrum task")
47 | description: Optional[str] = Field(None, description="Detailed description of the scrum task")
48 | type: Optional[str] = Field(None, description="Type of scrum task (1 is Analysis, 2 is Coding, 3 is Documentation, 4 is Testing)")
49 | state: Optional[str] = Field(None, description="State of scrum task (-6 is Draft,1 is Ready, 2 is Work in progress, 3 is Complete, 4 is Cancelled)")
50 | assignment_group: Optional[str] = Field(None, description="Group assigned to the scrum task")
51 | assigned_to: Optional[str] = Field(None, description="User assigned to the scrum task")
52 | work_notes: Optional[str] = Field(None, description="Work notes to add to the scrum task")
53 |
54 | class ListScrumTasksParams(BaseModel):
55 | """Parameters for listing scrum tasks."""
56 |
57 | limit: Optional[int] = Field(10, description="Maximum number of records to return")
58 | offset: Optional[int] = Field(0, description="Offset to start from")
59 | state: Optional[str] = Field(None, description="Filter by state")
60 | assignment_group: Optional[str] = Field(None, description="Filter by assignment group")
61 | timeframe: Optional[str] = Field(None, description="Filter by timeframe (upcoming, in-progress, completed)")
62 | query: Optional[str] = Field(None, description="Additional query string")
63 |
64 |
65 | def _unwrap_and_validate_params(params: Any, model_class: Type[T], required_fields: List[str] = None) -> Dict[str, Any]:
66 | """
67 | Helper function to unwrap and validate parameters.
68 |
69 | Args:
70 | params: The parameters to unwrap and validate.
71 | model_class: The Pydantic model class to validate against.
72 | required_fields: List of required field names.
73 |
74 | Returns:
75 | A tuple of (success, result) where result is either the validated parameters or an error message.
76 | """
77 | # Handle case where params might be wrapped in another dictionary
78 | if isinstance(params, dict) and len(params) == 1 and "params" in params and isinstance(params["params"], dict):
79 | logger.warning("Detected params wrapped in a 'params' key. Unwrapping...")
80 | params = params["params"]
81 |
82 | # Handle case where params might be a Pydantic model object
83 | if not isinstance(params, dict):
84 | try:
85 | # Try to convert to dict if it's a Pydantic model
86 | logger.warning("Params is not a dictionary. Attempting to convert...")
87 | params = params.dict() if hasattr(params, "dict") else dict(params)
88 | except Exception as e:
89 | logger.error(f"Failed to convert params to dictionary: {e}")
90 | return {
91 | "success": False,
92 | "message": f"Invalid parameters format. Expected a dictionary, got {type(params).__name__}",
93 | }
94 |
95 | # Validate required parameters are present
96 | if required_fields:
97 | for field in required_fields:
98 | if field not in params:
99 | return {
100 | "success": False,
101 | "message": f"Missing required parameter '{field}'",
102 | }
103 |
104 | try:
105 | # Validate parameters against the model
106 | validated_params = model_class(**params)
107 | return {
108 | "success": True,
109 | "params": validated_params,
110 | }
111 | except Exception as e:
112 | logger.error(f"Error validating parameters: {e}")
113 | return {
114 | "success": False,
115 | "message": f"Error validating parameters: {str(e)}",
116 | }
117 |
118 |
119 | def _get_instance_url(auth_manager: AuthManager, server_config: ServerConfig) -> Optional[str]:
120 | """
121 | Helper function to get the instance URL from either server_config or auth_manager.
122 |
123 | Args:
124 | auth_manager: The authentication manager.
125 | server_config: The server configuration.
126 |
127 | Returns:
128 | The instance URL if found, None otherwise.
129 | """
130 | if hasattr(server_config, 'instance_url'):
131 | return server_config.instance_url
132 | elif hasattr(auth_manager, 'instance_url'):
133 | return auth_manager.instance_url
134 | else:
135 | logger.error("Cannot find instance_url in either server_config or auth_manager")
136 | return None
137 |
138 |
139 | def _get_headers(auth_manager: Any, server_config: Any) -> Optional[Dict[str, str]]:
140 | """
141 | Helper function to get headers from either auth_manager or server_config.
142 |
143 | Args:
144 | auth_manager: The authentication manager or object passed as auth_manager.
145 | server_config: The server configuration or object passed as server_config.
146 |
147 | Returns:
148 | The headers if found, None otherwise.
149 | """
150 | # Try to get headers from auth_manager
151 | if hasattr(auth_manager, 'get_headers'):
152 | return auth_manager.get_headers()
153 |
154 | # If auth_manager doesn't have get_headers, try server_config
155 | if hasattr(server_config, 'get_headers'):
156 | return server_config.get_headers()
157 |
158 | # If neither has get_headers, check if auth_manager is actually a ServerConfig
159 | # and server_config is actually an AuthManager (parameters swapped)
160 | if hasattr(server_config, 'get_headers') and not hasattr(auth_manager, 'get_headers'):
161 | return server_config.get_headers()
162 |
163 | logger.error("Cannot find get_headers method in either auth_manager or server_config")
164 | return None
165 |
166 | def create_scrum_task(
167 | auth_manager: AuthManager,
168 | server_config: ServerConfig,
169 | params: Dict[str, Any],
170 | ) -> Dict[str, Any]:
171 | """
172 | Create a new scrum task in ServiceNow.
173 |
174 | Args:
175 | auth_manager: The authentication manager.
176 | server_config: The server configuration.
177 | params: The parameters for creating the scrum task.
178 |
179 | Returns:
180 | The created scrum task.
181 | """
182 |
183 | # Unwrap and validate parameters
184 | result = _unwrap_and_validate_params(
185 | params,
186 | CreateScrumTaskParams,
187 | required_fields=["short_description", "story"]
188 | )
189 |
190 | if not result["success"]:
191 | return result
192 |
193 | validated_params = result["params"]
194 |
195 | # Prepare the request data
196 | data = {
197 | "story": validated_params.story,
198 | "short_description": validated_params.short_description,
199 | }
200 |
201 | # Add optional fields if provided
202 | if validated_params.priority:
203 | data["priority"] = validated_params.priority
204 | if validated_params.planned_hours:
205 | data["planned_hours"] = validated_params.planned_hours
206 | if validated_params.remaining_hours:
207 | data["remaining_hours"] = validated_params.remaining_hours
208 | if validated_params.hours:
209 | data["hours"] = validated_params.hours
210 | if validated_params.description:
211 | data["description"] = validated_params.description
212 | if validated_params.type:
213 | data["type"] = validated_params.type
214 | if validated_params.state:
215 | data["state"] = validated_params.state
216 | if validated_params.assignment_group:
217 | data["assignment_group"] = validated_params.assignment_group
218 | if validated_params.assigned_to:
219 | data["assigned_to"] = validated_params.assigned_to
220 | if validated_params.work_notes:
221 | data["work_notes"] = validated_params.work_notes
222 |
223 | # Get the instance URL
224 | instance_url = _get_instance_url(auth_manager, server_config)
225 | if not instance_url:
226 | return {
227 | "success": False,
228 | "message": "Cannot find instance_url in either server_config or auth_manager",
229 | }
230 |
231 | # Get the headers
232 | headers = _get_headers(auth_manager, server_config)
233 | if not headers:
234 | return {
235 | "success": False,
236 | "message": "Cannot find get_headers method in either auth_manager or server_config",
237 | }
238 |
239 | # Add Content-Type header
240 | headers["Content-Type"] = "application/json"
241 |
242 | # Make the API request
243 | url = f"{instance_url}/api/now/table/rm_scrum_task"
244 |
245 | try:
246 | response = requests.post(url, json=data, headers=headers)
247 | response.raise_for_status()
248 |
249 | result = response.json()
250 |
251 | return {
252 | "success": True,
253 | "message": "Scrum Task created successfully",
254 | "scrum_task": result["result"],
255 | }
256 | except requests.exceptions.RequestException as e:
257 | logger.error(f"Error creating scrum task: {e}")
258 | return {
259 | "success": False,
260 | "message": f"Error creating scrum task: {str(e)}",
261 | }
262 |
263 | def update_scrum_task(
264 | auth_manager: AuthManager,
265 | server_config: ServerConfig,
266 | params: Dict[str, Any],
267 | ) -> Dict[str, Any]:
268 | """
269 | Update an existing scrum task in ServiceNow.
270 |
271 | Args:
272 | auth_manager: The authentication manager.
273 | server_config: The server configuration.
274 | params: The parameters for updating the scrum task.
275 |
276 | Returns:
277 | The updated scrum task.
278 | """
279 | # Unwrap and validate parameters
280 | result = _unwrap_and_validate_params(
281 | params,
282 | UpdateScrumTaskParams,
283 | required_fields=["scrum_task_id"]
284 | )
285 |
286 | if not result["success"]:
287 | return result
288 |
289 | validated_params = result["params"]
290 |
291 | # Prepare the request data
292 | data = {}
293 |
294 | # Add optional fields if provided
295 | if validated_params.short_description:
296 | data["short_description"] = validated_params.short_description
297 | if validated_params.priority:
298 | data["priority"] = validated_params.priority
299 | if validated_params.planned_hours:
300 | data["planned_hours"] = validated_params.planned_hours
301 | if validated_params.remaining_hours:
302 | data["remaining_hours"] = validated_params.remaining_hours
303 | if validated_params.hours:
304 | data["hours"] = validated_params.hours
305 | if validated_params.description:
306 | data["description"] = validated_params.description
307 | if validated_params.type:
308 | data["type"] = validated_params.type
309 | if validated_params.state:
310 | data["state"] = validated_params.state
311 | if validated_params.assignment_group:
312 | data["assignment_group"] = validated_params.assignment_group
313 | if validated_params.assigned_to:
314 | data["assigned_to"] = validated_params.assigned_to
315 | if validated_params.work_notes:
316 | data["work_notes"] = validated_params.work_notes
317 |
318 | # Get the instance URL
319 | instance_url = _get_instance_url(auth_manager, server_config)
320 | if not instance_url:
321 | return {
322 | "success": False,
323 | "message": "Cannot find instance_url in either server_config or auth_manager",
324 | }
325 |
326 | # Get the headers
327 | headers = _get_headers(auth_manager, server_config)
328 | if not headers:
329 | return {
330 | "success": False,
331 | "message": "Cannot find get_headers method in either auth_manager or server_config",
332 | }
333 |
334 | # Add Content-Type header
335 | headers["Content-Type"] = "application/json"
336 |
337 | # Make the API request
338 | url = f"{instance_url}/api/now/table/rm_scrum_task/{validated_params.scrum_task_id}"
339 |
340 | try:
341 | response = requests.put(url, json=data, headers=headers)
342 | response.raise_for_status()
343 |
344 | result = response.json()
345 |
346 | return {
347 | "success": True,
348 | "message": "Scrum Task updated successfully",
349 | "scrum_task": result["result"],
350 | }
351 | except requests.exceptions.RequestException as e:
352 | logger.error(f"Error updating scrum task: {e}")
353 | return {
354 | "success": False,
355 | "message": f"Error updating scrum task: {str(e)}",
356 | }
357 |
358 | def list_scrum_tasks(
359 | auth_manager: AuthManager,
360 | server_config: ServerConfig,
361 | params: Dict[str, Any],
362 | ) -> Dict[str, Any]:
363 | """
364 | List scrum tasks from ServiceNow.
365 |
366 | Args:
367 | auth_manager: The authentication manager.
368 | server_config: The server configuration.
369 | params: The parameters for listing scrum tasks.
370 |
371 | Returns:
372 | A list of scrum tasks.
373 | """
374 | # Unwrap and validate parameters
375 | result = _unwrap_and_validate_params(
376 | params,
377 | ListScrumTasksParams
378 | )
379 |
380 | if not result["success"]:
381 | return result
382 |
383 | validated_params = result["params"]
384 |
385 | # Build the query
386 | query_parts = []
387 |
388 | if validated_params.state:
389 | query_parts.append(f"state={validated_params.state}")
390 | if validated_params.assignment_group:
391 | query_parts.append(f"assignment_group={validated_params.assignment_group}")
392 |
393 | # Handle timeframe filtering
394 | if validated_params.timeframe:
395 | now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
396 | if validated_params.timeframe == "upcoming":
397 | query_parts.append(f"start_date>{now}")
398 | elif validated_params.timeframe == "in-progress":
399 | query_parts.append(f"start_date<{now}^end_date>{now}")
400 | elif validated_params.timeframe == "completed":
401 | query_parts.append(f"end_date<{now}")
402 |
403 | # Add any additional query string
404 | if validated_params.query:
405 | query_parts.append(validated_params.query)
406 |
407 | # Combine query parts
408 | query = "^".join(query_parts) if query_parts else ""
409 |
410 | # Get the instance URL
411 | instance_url = _get_instance_url(auth_manager, server_config)
412 | if not instance_url:
413 | return {
414 | "success": False,
415 | "message": "Cannot find instance_url in either server_config or auth_manager",
416 | }
417 |
418 | # Get the headers
419 | headers = _get_headers(auth_manager, server_config)
420 | if not headers:
421 | return {
422 | "success": False,
423 | "message": "Cannot find get_headers method in either auth_manager or server_config",
424 | }
425 |
426 | # Make the API request
427 | url = f"{instance_url}/api/now/table/rm_scrum_task"
428 |
429 | params = {
430 | "sysparm_limit": validated_params.limit,
431 | "sysparm_offset": validated_params.offset,
432 | "sysparm_query": query,
433 | "sysparm_display_value": "true",
434 | }
435 |
436 | try:
437 | response = requests.get(url, headers=headers, params=params)
438 | response.raise_for_status()
439 |
440 | result = response.json()
441 |
442 | # Handle the case where result["result"] is a list
443 | scrum_tasks = result.get("result", [])
444 | count = len(scrum_tasks)
445 |
446 | return {
447 | "success": True,
448 | "scrum_tasks": scrum_tasks,
449 | "count": count,
450 | "total": count, # Use count as total if total is not provided
451 | }
452 | except requests.exceptions.RequestException as e:
453 | logger.error(f"Error listing stories: {e}")
454 | return {
455 | "success": False,
456 | "message": f"Error listing stories: {str(e)}",
457 | }
458 |
```
--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/catalog_optimization.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tools for optimizing the ServiceNow Service Catalog.
3 |
4 | This module provides tools for analyzing and optimizing the ServiceNow Service Catalog,
5 | including identifying inactive items, items with low usage, high abandonment rates,
6 | slow fulfillment times, and poor descriptions.
7 | """
8 |
9 | import logging
10 | import random
11 | from dataclasses import dataclass
12 | from typing import Dict, List, Optional
13 |
14 | import requests
15 | from pydantic import BaseModel, Field
16 |
17 | from servicenow_mcp.auth.auth_manager import AuthManager
18 | from servicenow_mcp.utils.config import ServerConfig
19 |
20 | logger = logging.getLogger(__name__)
21 |
22 |
23 | class OptimizationRecommendationsParams(BaseModel):
24 | """Parameters for getting optimization recommendations."""
25 |
26 | recommendation_types: List[str]
27 | category_id: Optional[str] = None
28 |
29 |
30 | class UpdateCatalogItemParams(BaseModel):
31 | """Parameters for updating a catalog item."""
32 |
33 | item_id: str
34 | name: Optional[str] = None
35 | short_description: Optional[str] = None
36 | description: Optional[str] = None
37 | category: Optional[str] = None
38 | price: Optional[str] = None
39 | active: Optional[bool] = None
40 | order: Optional[int] = None
41 |
42 |
43 | def get_optimization_recommendations(
44 | config: ServerConfig, auth_manager: AuthManager, params: OptimizationRecommendationsParams
45 | ) -> Dict:
46 | """
47 | Get optimization recommendations for the ServiceNow Service Catalog.
48 |
49 | Args:
50 | config: The server configuration
51 | auth_manager: The authentication manager
52 | params: The parameters for getting optimization recommendations
53 |
54 | Returns:
55 | A dictionary containing the optimization recommendations
56 | """
57 | logger.info("Getting catalog optimization recommendations")
58 |
59 | recommendations = []
60 | category_id = params.category_id
61 |
62 | try:
63 | # Get recommendations based on the requested types
64 | for rec_type in params.recommendation_types:
65 | if rec_type == "inactive_items":
66 | items = _get_inactive_items(config, auth_manager, category_id)
67 | if items:
68 | recommendations.append({
69 | "type": "inactive_items",
70 | "title": "Inactive Catalog Items",
71 | "description": "Items that are currently inactive in the catalog",
72 | "items": items,
73 | "impact": "medium",
74 | "effort": "low",
75 | "action": "Review and either update or remove these items",
76 | })
77 |
78 | elif rec_type == "low_usage":
79 | items = _get_low_usage_items(config, auth_manager, category_id)
80 | if items:
81 | recommendations.append({
82 | "type": "low_usage",
83 | "title": "Low Usage Catalog Items",
84 | "description": "Items that have very few orders",
85 | "items": items,
86 | "impact": "medium",
87 | "effort": "medium",
88 | "action": "Consider promoting these items or removing them if no longer needed",
89 | })
90 |
91 | elif rec_type == "high_abandonment":
92 | items = _get_high_abandonment_items(config, auth_manager, category_id)
93 | if items:
94 | recommendations.append({
95 | "type": "high_abandonment",
96 | "title": "High Abandonment Rate Items",
97 | "description": "Items that are frequently added to cart but not ordered",
98 | "items": items,
99 | "impact": "high",
100 | "effort": "medium",
101 | "action": "Simplify the request process or improve the item description",
102 | })
103 |
104 | elif rec_type == "slow_fulfillment":
105 | items = _get_slow_fulfillment_items(config, auth_manager, category_id)
106 | if items:
107 | recommendations.append({
108 | "type": "slow_fulfillment",
109 | "title": "Slow Fulfillment Items",
110 | "description": "Items that take longer than average to fulfill",
111 | "items": items,
112 | "impact": "high",
113 | "effort": "high",
114 | "action": "Review the fulfillment process and identify bottlenecks",
115 | })
116 |
117 | elif rec_type == "description_quality":
118 | items = _get_poor_description_items(config, auth_manager, category_id)
119 | if items:
120 | recommendations.append({
121 | "type": "description_quality",
122 | "title": "Poor Description Quality",
123 | "description": "Items with missing, short, or low-quality descriptions",
124 | "items": items,
125 | "impact": "medium",
126 | "effort": "low",
127 | "action": "Improve the descriptions to better explain the item's purpose and benefits",
128 | })
129 |
130 | return {
131 | "success": True,
132 | "recommendations": recommendations,
133 | }
134 |
135 | except Exception as e:
136 | logger.error(f"Error getting optimization recommendations: {e}")
137 | return {
138 | "success": False,
139 | "message": f"Error getting optimization recommendations: {str(e)}",
140 | "recommendations": [],
141 | }
142 |
143 |
144 | def update_catalog_item(
145 | config: ServerConfig, auth_manager: AuthManager, params: UpdateCatalogItemParams
146 | ) -> Dict:
147 | """
148 | Update a catalog item.
149 |
150 | Args:
151 | config: The server configuration
152 | auth_manager: The authentication manager
153 | params: The parameters for updating the catalog item
154 |
155 | Returns:
156 | A dictionary containing the result of the update operation
157 | """
158 | logger.info(f"Updating catalog item: {params.item_id}")
159 |
160 | try:
161 | # Build the request body with only the provided parameters
162 | body = {}
163 | if params.name is not None:
164 | body["name"] = params.name
165 | if params.short_description is not None:
166 | body["short_description"] = params.short_description
167 | if params.description is not None:
168 | body["description"] = params.description
169 | if params.category is not None:
170 | body["category"] = params.category
171 | if params.price is not None:
172 | body["price"] = params.price
173 | if params.active is not None:
174 | body["active"] = str(params.active).lower()
175 | if params.order is not None:
176 | body["order"] = str(params.order)
177 |
178 | # Make the API request
179 | url = f"{config.instance_url}/api/now/table/sc_cat_item/{params.item_id}"
180 | headers = auth_manager.get_headers()
181 | headers["Content-Type"] = "application/json"
182 |
183 | response = requests.patch(url, headers=headers, json=body)
184 | response.raise_for_status()
185 |
186 | return {
187 | "success": True,
188 | "message": "Catalog item updated successfully",
189 | "data": response.json()["result"],
190 | }
191 |
192 | except Exception as e:
193 | logger.error(f"Error updating catalog item: {e}")
194 | return {
195 | "success": False,
196 | "message": f"Error updating catalog item: {str(e)}",
197 | "data": None,
198 | }
199 |
200 |
201 | def _get_inactive_items(
202 | config: ServerConfig, auth_manager: AuthManager, category_id: Optional[str] = None
203 | ) -> List[Dict]:
204 | """
205 | Get inactive catalog items.
206 |
207 | Args:
208 | config: The server configuration
209 | auth_manager: The authentication manager
210 | category_id: Optional category ID to filter by
211 |
212 | Returns:
213 | A list of inactive catalog items
214 | """
215 | try:
216 | # Build the query
217 | query = "active=false"
218 | if category_id:
219 | query += f"^category={category_id}"
220 |
221 | # Make the API request
222 | url = f"{config.instance_url}/api/now/table/sc_cat_item"
223 | headers = auth_manager.get_headers()
224 | params = {
225 | "sysparm_query": query,
226 | "sysparm_fields": "sys_id,name,short_description,category",
227 | "sysparm_limit": "50",
228 | }
229 |
230 | response = requests.get(url, headers=headers, params=params)
231 | response.raise_for_status()
232 |
233 | return response.json()["result"]
234 |
235 | except Exception as e:
236 | logger.error(f"Error getting inactive items: {e}")
237 | return []
238 |
239 |
240 | def _get_low_usage_items(
241 | config: ServerConfig, auth_manager: AuthManager, category_id: Optional[str] = None
242 | ) -> List[Dict]:
243 | """
244 | Get catalog items with low usage.
245 |
246 | Args:
247 | config: The server configuration
248 | auth_manager: The authentication manager
249 | category_id: Optional category ID to filter by
250 |
251 | Returns:
252 | A list of catalog items with low usage
253 | """
254 | try:
255 | # Build the query
256 | query = "active=true"
257 | if category_id:
258 | query += f"^category={category_id}"
259 |
260 | # Make the API request
261 | url = f"{config.instance_url}/api/now/table/sc_cat_item"
262 | headers = auth_manager.get_headers()
263 | params = {
264 | "sysparm_query": query,
265 | "sysparm_fields": "sys_id,name,short_description,category",
266 | "sysparm_limit": "50",
267 | }
268 |
269 | response = requests.get(url, headers=headers, params=params)
270 | response.raise_for_status()
271 |
272 | # In a real implementation, we would query the request table to get actual usage data
273 | # For this example, we'll simulate low usage with random data
274 | items = response.json()["result"]
275 |
276 | # Select a random subset of items to mark as low usage
277 | low_usage_items = random.sample(items, min(len(items), 5))
278 |
279 | # Add usage data to the items
280 | for item in low_usage_items:
281 | item["order_count"] = random.randint(1, 5) # Low number of orders
282 |
283 | return low_usage_items
284 |
285 | except Exception as e:
286 | logger.error(f"Error getting low usage items: {e}")
287 | return []
288 |
289 |
290 | def _get_high_abandonment_items(
291 | config: ServerConfig, auth_manager: AuthManager, category_id: Optional[str] = None
292 | ) -> List[Dict]:
293 | """
294 | Get catalog items with high abandonment rates.
295 |
296 | Args:
297 | config: The server configuration
298 | auth_manager: The authentication manager
299 | category_id: Optional category ID to filter by
300 |
301 | Returns:
302 | A list of catalog items with high abandonment rates
303 | """
304 | try:
305 | # Build the query
306 | query = "active=true"
307 | if category_id:
308 | query += f"^category={category_id}"
309 |
310 | # Make the API request
311 | url = f"{config.instance_url}/api/now/table/sc_cat_item"
312 | headers = auth_manager.get_headers()
313 | params = {
314 | "sysparm_query": query,
315 | "sysparm_fields": "sys_id,name,short_description,category",
316 | "sysparm_limit": "50",
317 | }
318 |
319 | response = requests.get(url, headers=headers, params=params)
320 | response.raise_for_status()
321 |
322 | # In a real implementation, we would query the request table to get actual abandonment data
323 | # For this example, we'll simulate high abandonment with random data
324 | items = response.json()["result"]
325 |
326 | # Select a random subset of items to mark as high abandonment
327 | high_abandonment_items = random.sample(items, min(len(items), 5))
328 |
329 | # Add abandonment data to the items
330 | for item in high_abandonment_items:
331 | abandonment_rate = random.randint(40, 80) # High abandonment rate (40-80%)
332 | cart_adds = random.randint(20, 100) # Number of cart adds
333 | orders = int(cart_adds * (1 - abandonment_rate / 100)) # Number of completed orders
334 |
335 | item["abandonment_rate"] = abandonment_rate
336 | item["cart_adds"] = cart_adds
337 | item["orders"] = orders
338 |
339 | return high_abandonment_items
340 |
341 | except Exception as e:
342 | logger.error(f"Error getting high abandonment items: {e}")
343 | return []
344 |
345 |
346 | def _get_slow_fulfillment_items(
347 | config: ServerConfig, auth_manager: AuthManager, category_id: Optional[str] = None
348 | ) -> List[Dict]:
349 | """
350 | Get catalog items with slow fulfillment times.
351 |
352 | Args:
353 | config: The server configuration
354 | auth_manager: The authentication manager
355 | category_id: Optional category ID to filter by
356 |
357 | Returns:
358 | A list of catalog items with slow fulfillment times
359 | """
360 | try:
361 | # Build the query
362 | query = "active=true"
363 | if category_id:
364 | query += f"^category={category_id}"
365 |
366 | # Make the API request
367 | url = f"{config.instance_url}/api/now/table/sc_cat_item"
368 | headers = auth_manager.get_headers()
369 | params = {
370 | "sysparm_query": query,
371 | "sysparm_fields": "sys_id,name,short_description,category",
372 | "sysparm_limit": "50",
373 | }
374 |
375 | response = requests.get(url, headers=headers, params=params)
376 | response.raise_for_status()
377 |
378 | # In a real implementation, we would query the request table to get actual fulfillment data
379 | # For this example, we'll simulate slow fulfillment with random data
380 | items = response.json()["result"]
381 |
382 | # Select a random subset of items to mark as slow fulfillment
383 | slow_fulfillment_items = random.sample(items, min(len(items), 5))
384 |
385 | # Add fulfillment data to the items
386 | catalog_avg_time = 2.5 # Average fulfillment time for the catalog (in days)
387 |
388 | for item in slow_fulfillment_items:
389 | # Generate a fulfillment time that's significantly higher than the catalog average
390 | fulfillment_time = random.uniform(5.0, 10.0) # 5-10 days
391 |
392 | item["avg_fulfillment_time"] = fulfillment_time
393 | item["avg_fulfillment_time_vs_catalog"] = round(fulfillment_time / catalog_avg_time, 1)
394 |
395 | return slow_fulfillment_items
396 |
397 | except Exception as e:
398 | logger.error(f"Error getting slow fulfillment items: {e}")
399 | return []
400 |
401 |
402 | def _get_poor_description_items(
403 | config: ServerConfig, auth_manager: AuthManager, category_id: Optional[str] = None
404 | ) -> List[Dict]:
405 | """
406 | Get catalog items with poor description quality.
407 |
408 | Args:
409 | config: The server configuration
410 | auth_manager: The authentication manager
411 | category_id: Optional category ID to filter by
412 |
413 | Returns:
414 | A list of catalog items with poor description quality
415 | """
416 | try:
417 | # Build the query
418 | query = "active=true"
419 | if category_id:
420 | query += f"^category={category_id}"
421 |
422 | # Make the API request
423 | url = f"{config.instance_url}/api/now/table/sc_cat_item"
424 | headers = auth_manager.get_headers()
425 | params = {
426 | "sysparm_query": query,
427 | "sysparm_fields": "sys_id,name,short_description,category",
428 | "sysparm_limit": "50",
429 | }
430 |
431 | response = requests.get(url, headers=headers, params=params)
432 | response.raise_for_status()
433 |
434 | items = response.json()["result"]
435 | poor_description_items = []
436 |
437 | # Analyze each item's description quality
438 | for item in items:
439 | description = item.get("short_description", "")
440 | quality_issues = []
441 | quality_score = 100 # Start with perfect score
442 |
443 | # Check for empty description
444 | if not description:
445 | quality_issues.append("Missing description")
446 | quality_score = 0
447 | else:
448 | # Check for short description
449 | if len(description) < 30:
450 | quality_issues.append("Description too short")
451 | quality_issues.append("Lacks detail")
452 | quality_score -= 70
453 |
454 | # Check for instructional language instead of descriptive
455 | if "click here" in description.lower() or "request this" in description.lower():
456 | quality_issues.append("Uses instructional language instead of descriptive")
457 | quality_score -= 50
458 |
459 | # Check for vague terms
460 | vague_terms = ["etc", "and more", "and so on", "stuff", "things"]
461 | if any(term in description.lower() for term in vague_terms):
462 | quality_issues.append("Contains vague terms")
463 | quality_score -= 30
464 |
465 | # Ensure score is between 0 and 100
466 | quality_score = max(0, min(100, quality_score))
467 |
468 | # Add to poor description items if quality is below threshold
469 | if quality_score < 80:
470 | item["description_quality"] = quality_score
471 | item["quality_issues"] = quality_issues
472 | poor_description_items.append(item)
473 |
474 | return poor_description_items
475 |
476 | except Exception as e:
477 | logger.error(f"Error getting poor description items: {e}")
478 | return []
```
--------------------------------------------------------------------------------
/tests/test_changeset_tools.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the changeset tools.
3 |
4 | This module contains tests for the changeset tools in the ServiceNow MCP server.
5 | """
6 |
7 | import unittest
8 | from unittest.mock import MagicMock, patch
9 |
10 | from servicenow_mcp.auth.auth_manager import AuthManager
11 | from servicenow_mcp.tools.changeset_tools import (
12 | AddFileToChangesetParams,
13 | CommitChangesetParams,
14 | CreateChangesetParams,
15 | GetChangesetDetailsParams,
16 | ListChangesetsParams,
17 | PublishChangesetParams,
18 | UpdateChangesetParams,
19 | add_file_to_changeset,
20 | commit_changeset,
21 | create_changeset,
22 | get_changeset_details,
23 | list_changesets,
24 | publish_changeset,
25 | update_changeset,
26 | )
27 | from servicenow_mcp.utils.config import ServerConfig, AuthConfig, AuthType, BasicAuthConfig
28 |
29 |
30 | class TestChangesetTools(unittest.TestCase):
31 | """Tests for the changeset tools."""
32 |
33 | def setUp(self):
34 | """Set up test fixtures."""
35 | auth_config = AuthConfig(
36 | type=AuthType.BASIC,
37 | basic=BasicAuthConfig(
38 | username="test_user",
39 | password="test_password"
40 | )
41 | )
42 | self.server_config = ServerConfig(
43 | instance_url="https://test.service-now.com",
44 | auth=auth_config,
45 | )
46 | self.auth_manager = MagicMock(spec=AuthManager)
47 | self.auth_manager.get_headers.return_value = {"Authorization": "Bearer test"}
48 |
49 | @patch("servicenow_mcp.tools.changeset_tools.requests.get")
50 | def test_list_changesets(self, mock_get):
51 | """Test listing changesets."""
52 | # Mock response
53 | mock_response = MagicMock()
54 | mock_response.json.return_value = {
55 | "result": [
56 | {
57 | "sys_id": "123",
58 | "name": "Test Changeset",
59 | "state": "in_progress",
60 | "application": "Test App",
61 | "developer": "test.user",
62 | }
63 | ]
64 | }
65 | mock_response.raise_for_status.return_value = None
66 | mock_get.return_value = mock_response
67 |
68 | # Call the function
69 | params = {
70 | "limit": 10,
71 | "offset": 0,
72 | "state": "in_progress",
73 | "application": "Test App",
74 | "developer": "test.user",
75 | }
76 | result = list_changesets(self.auth_manager, self.server_config, params)
77 |
78 | # Verify the result
79 | self.assertTrue(result["success"])
80 | self.assertEqual(len(result["changesets"]), 1)
81 | self.assertEqual(result["changesets"][0]["sys_id"], "123")
82 | self.assertEqual(result["changesets"][0]["name"], "Test Changeset")
83 |
84 | # Verify the API call
85 | mock_get.assert_called_once()
86 | args, kwargs = mock_get.call_args
87 | self.assertEqual(args[0], "https://test.service-now.com/api/now/table/sys_update_set")
88 | self.assertEqual(kwargs["headers"], {"Authorization": "Bearer test"})
89 | self.assertEqual(kwargs["params"]["sysparm_limit"], 10)
90 | self.assertEqual(kwargs["params"]["sysparm_offset"], 0)
91 | self.assertIn("sysparm_query", kwargs["params"])
92 | self.assertIn("state=in_progress", kwargs["params"]["sysparm_query"])
93 | self.assertIn("application=Test App", kwargs["params"]["sysparm_query"])
94 | self.assertIn("developer=test.user", kwargs["params"]["sysparm_query"])
95 |
96 | @patch("servicenow_mcp.tools.changeset_tools.requests.get")
97 | def test_get_changeset_details(self, mock_get):
98 | """Test getting changeset details."""
99 | # Mock responses
100 | mock_changeset_response = MagicMock()
101 | mock_changeset_response.json.return_value = {
102 | "result": {
103 | "sys_id": "123",
104 | "name": "Test Changeset",
105 | "state": "in_progress",
106 | "application": "Test App",
107 | "developer": "test.user",
108 | }
109 | }
110 | mock_changeset_response.raise_for_status.return_value = None
111 |
112 | mock_changes_response = MagicMock()
113 | mock_changes_response.json.return_value = {
114 | "result": [
115 | {
116 | "sys_id": "456",
117 | "name": "test_file.py",
118 | "type": "file",
119 | "update_set": "123",
120 | }
121 | ]
122 | }
123 | mock_changes_response.raise_for_status.return_value = None
124 |
125 | # Set up the mock to return different responses for different URLs
126 | def side_effect(*args, **kwargs):
127 | url = args[0]
128 | if "sys_update_set" in url:
129 | return mock_changeset_response
130 | elif "sys_update_xml" in url:
131 | return mock_changes_response
132 | return None
133 |
134 | mock_get.side_effect = side_effect
135 |
136 | # Call the function
137 | params = {"changeset_id": "123"}
138 | result = get_changeset_details(self.auth_manager, self.server_config, params)
139 |
140 | # Verify the result
141 | self.assertTrue(result["success"])
142 | self.assertEqual(result["changeset"]["sys_id"], "123")
143 | self.assertEqual(result["changeset"]["name"], "Test Changeset")
144 | self.assertEqual(len(result["changes"]), 1)
145 | self.assertEqual(result["changes"][0]["sys_id"], "456")
146 | self.assertEqual(result["changes"][0]["name"], "test_file.py")
147 |
148 | # Verify the API calls
149 | self.assertEqual(mock_get.call_count, 2)
150 | first_call_args, first_call_kwargs = mock_get.call_args_list[0]
151 | self.assertEqual(
152 | first_call_args[0], "https://test.service-now.com/api/now/table/sys_update_set/123"
153 | )
154 | self.assertEqual(first_call_kwargs["headers"], {"Authorization": "Bearer test"})
155 |
156 | second_call_args, second_call_kwargs = mock_get.call_args_list[1]
157 | self.assertEqual(
158 | second_call_args[0], "https://test.service-now.com/api/now/table/sys_update_xml"
159 | )
160 | self.assertEqual(second_call_kwargs["headers"], {"Authorization": "Bearer test"})
161 | self.assertEqual(second_call_kwargs["params"]["sysparm_query"], "update_set=123")
162 |
163 | @patch("servicenow_mcp.tools.changeset_tools.requests.post")
164 | def test_create_changeset(self, mock_post):
165 | """Test creating a changeset."""
166 | # Mock response
167 | mock_response = MagicMock()
168 | mock_response.json.return_value = {
169 | "result": {
170 | "sys_id": "123",
171 | "name": "Test Changeset",
172 | "application": "Test App",
173 | "developer": "test.user",
174 | }
175 | }
176 | mock_response.raise_for_status.return_value = None
177 | mock_post.return_value = mock_response
178 |
179 | # Call the function
180 | params = {
181 | "name": "Test Changeset",
182 | "application": "Test App",
183 | "developer": "test.user",
184 | "description": "Test description",
185 | }
186 | result = create_changeset(self.auth_manager, self.server_config, params)
187 |
188 | # Verify the result
189 | self.assertTrue(result["success"])
190 | self.assertEqual(result["changeset"]["sys_id"], "123")
191 | self.assertEqual(result["changeset"]["name"], "Test Changeset")
192 | self.assertEqual(result["message"], "Changeset created successfully")
193 |
194 | # Verify the API call
195 | mock_post.assert_called_once()
196 | args, kwargs = mock_post.call_args
197 | self.assertEqual(args[0], "https://test.service-now.com/api/now/table/sys_update_set")
198 | self.assertEqual(kwargs["headers"]["Authorization"], "Bearer test")
199 | self.assertEqual(kwargs["headers"]["Content-Type"], "application/json")
200 | self.assertEqual(kwargs["json"]["name"], "Test Changeset")
201 | self.assertEqual(kwargs["json"]["application"], "Test App")
202 | self.assertEqual(kwargs["json"]["developer"], "test.user")
203 | self.assertEqual(kwargs["json"]["description"], "Test description")
204 |
205 | @patch("servicenow_mcp.tools.changeset_tools.requests.patch")
206 | def test_update_changeset(self, mock_patch):
207 | """Test updating a changeset."""
208 | # Mock response
209 | mock_response = MagicMock()
210 | mock_response.json.return_value = {
211 | "result": {
212 | "sys_id": "123",
213 | "name": "Updated Changeset",
214 | "state": "in_progress",
215 | "application": "Test App",
216 | "developer": "test.user",
217 | }
218 | }
219 | mock_response.raise_for_status.return_value = None
220 | mock_patch.return_value = mock_response
221 |
222 | # Call the function
223 | params = {
224 | "changeset_id": "123",
225 | "name": "Updated Changeset",
226 | "state": "in_progress",
227 | }
228 | result = update_changeset(self.auth_manager, self.server_config, params)
229 |
230 | # Verify the result
231 | self.assertTrue(result["success"])
232 | self.assertEqual(result["changeset"]["sys_id"], "123")
233 | self.assertEqual(result["changeset"]["name"], "Updated Changeset")
234 | self.assertEqual(result["message"], "Changeset updated successfully")
235 |
236 | # Verify the API call
237 | mock_patch.assert_called_once()
238 | args, kwargs = mock_patch.call_args
239 | self.assertEqual(
240 | args[0], "https://test.service-now.com/api/now/table/sys_update_set/123"
241 | )
242 | self.assertEqual(kwargs["headers"]["Authorization"], "Bearer test")
243 | self.assertEqual(kwargs["headers"]["Content-Type"], "application/json")
244 | self.assertEqual(kwargs["json"]["name"], "Updated Changeset")
245 | self.assertEqual(kwargs["json"]["state"], "in_progress")
246 |
247 | @patch("servicenow_mcp.tools.changeset_tools.requests.patch")
248 | def test_commit_changeset(self, mock_patch):
249 | """Test committing a changeset."""
250 | # Mock response
251 | mock_response = MagicMock()
252 | mock_response.json.return_value = {
253 | "result": {
254 | "sys_id": "123",
255 | "name": "Test Changeset",
256 | "state": "complete",
257 | "application": "Test App",
258 | "developer": "test.user",
259 | }
260 | }
261 | mock_response.raise_for_status.return_value = None
262 | mock_patch.return_value = mock_response
263 |
264 | # Call the function
265 | params = {
266 | "changeset_id": "123",
267 | "commit_message": "Commit message",
268 | }
269 | result = commit_changeset(self.auth_manager, self.server_config, params)
270 |
271 | # Verify the result
272 | self.assertTrue(result["success"])
273 | self.assertEqual(result["changeset"]["sys_id"], "123")
274 | self.assertEqual(result["changeset"]["state"], "complete")
275 | self.assertEqual(result["message"], "Changeset committed successfully")
276 |
277 | # Verify the API call
278 | mock_patch.assert_called_once()
279 | args, kwargs = mock_patch.call_args
280 | self.assertEqual(
281 | args[0], "https://test.service-now.com/api/now/table/sys_update_set/123"
282 | )
283 | self.assertEqual(kwargs["headers"]["Authorization"], "Bearer test")
284 | self.assertEqual(kwargs["headers"]["Content-Type"], "application/json")
285 | self.assertEqual(kwargs["json"]["state"], "complete")
286 | self.assertEqual(kwargs["json"]["description"], "Commit message")
287 |
288 | @patch("servicenow_mcp.tools.changeset_tools.requests.patch")
289 | def test_publish_changeset(self, mock_patch):
290 | """Test publishing a changeset."""
291 | # Mock response
292 | mock_response = MagicMock()
293 | mock_response.json.return_value = {
294 | "result": {
295 | "sys_id": "123",
296 | "name": "Test Changeset",
297 | "state": "published",
298 | "application": "Test App",
299 | "developer": "test.user",
300 | }
301 | }
302 | mock_response.raise_for_status.return_value = None
303 | mock_patch.return_value = mock_response
304 |
305 | # Call the function
306 | params = {
307 | "changeset_id": "123",
308 | "publish_notes": "Publish notes",
309 | }
310 | result = publish_changeset(self.auth_manager, self.server_config, params)
311 |
312 | # Verify the result
313 | self.assertTrue(result["success"])
314 | self.assertEqual(result["changeset"]["sys_id"], "123")
315 | self.assertEqual(result["changeset"]["state"], "published")
316 | self.assertEqual(result["message"], "Changeset published successfully")
317 |
318 | # Verify the API call
319 | mock_patch.assert_called_once()
320 | args, kwargs = mock_patch.call_args
321 | self.assertEqual(
322 | args[0], "https://test.service-now.com/api/now/table/sys_update_set/123"
323 | )
324 | self.assertEqual(kwargs["headers"]["Authorization"], "Bearer test")
325 | self.assertEqual(kwargs["headers"]["Content-Type"], "application/json")
326 | self.assertEqual(kwargs["json"]["state"], "published")
327 | self.assertEqual(kwargs["json"]["description"], "Publish notes")
328 |
329 | @patch("servicenow_mcp.tools.changeset_tools.requests.post")
330 | def test_add_file_to_changeset(self, mock_post):
331 | """Test adding a file to a changeset."""
332 | # Mock response
333 | mock_response = MagicMock()
334 | mock_response.json.return_value = {
335 | "result": {
336 | "sys_id": "456",
337 | "name": "test_file.py",
338 | "type": "file",
339 | "update_set": "123",
340 | "payload": "print('Hello, world!')",
341 | }
342 | }
343 | mock_response.raise_for_status.return_value = None
344 | mock_post.return_value = mock_response
345 |
346 | # Call the function
347 | params = {
348 | "changeset_id": "123",
349 | "file_path": "test_file.py",
350 | "file_content": "print('Hello, world!')",
351 | }
352 | result = add_file_to_changeset(self.auth_manager, self.server_config, params)
353 |
354 | # Verify the result
355 | self.assertTrue(result["success"])
356 | self.assertEqual(result["file"]["sys_id"], "456")
357 | self.assertEqual(result["file"]["name"], "test_file.py")
358 | self.assertEqual(result["message"], "File added to changeset successfully")
359 |
360 | # Verify the API call
361 | mock_post.assert_called_once()
362 | args, kwargs = mock_post.call_args
363 | self.assertEqual(args[0], "https://test.service-now.com/api/now/table/sys_update_xml")
364 | self.assertEqual(kwargs["headers"]["Authorization"], "Bearer test")
365 | self.assertEqual(kwargs["headers"]["Content-Type"], "application/json")
366 | self.assertEqual(kwargs["json"]["update_set"], "123")
367 | self.assertEqual(kwargs["json"]["name"], "test_file.py")
368 | self.assertEqual(kwargs["json"]["payload"], "print('Hello, world!')")
369 | self.assertEqual(kwargs["json"]["type"], "file")
370 |
371 |
372 | class TestChangesetToolsParams(unittest.TestCase):
373 | """Tests for the changeset tools parameter classes."""
374 |
375 | def test_list_changesets_params(self):
376 | """Test ListChangesetsParams."""
377 | params = ListChangesetsParams(
378 | limit=20,
379 | offset=10,
380 | state="in_progress",
381 | application="Test App",
382 | developer="test.user",
383 | timeframe="recent",
384 | query="name=test",
385 | )
386 | self.assertEqual(params.limit, 20)
387 | self.assertEqual(params.offset, 10)
388 | self.assertEqual(params.state, "in_progress")
389 | self.assertEqual(params.application, "Test App")
390 | self.assertEqual(params.developer, "test.user")
391 | self.assertEqual(params.timeframe, "recent")
392 | self.assertEqual(params.query, "name=test")
393 |
394 | def test_get_changeset_details_params(self):
395 | """Test GetChangesetDetailsParams."""
396 | params = GetChangesetDetailsParams(changeset_id="123")
397 | self.assertEqual(params.changeset_id, "123")
398 |
399 | def test_create_changeset_params(self):
400 | """Test CreateChangesetParams."""
401 | params = CreateChangesetParams(
402 | name="Test Changeset",
403 | description="Test description",
404 | application="Test App",
405 | developer="test.user",
406 | )
407 | self.assertEqual(params.name, "Test Changeset")
408 | self.assertEqual(params.description, "Test description")
409 | self.assertEqual(params.application, "Test App")
410 | self.assertEqual(params.developer, "test.user")
411 |
412 | def test_update_changeset_params(self):
413 | """Test UpdateChangesetParams."""
414 | params = UpdateChangesetParams(
415 | changeset_id="123",
416 | name="Updated Changeset",
417 | description="Updated description",
418 | state="in_progress",
419 | developer="test.user",
420 | )
421 | self.assertEqual(params.changeset_id, "123")
422 | self.assertEqual(params.name, "Updated Changeset")
423 | self.assertEqual(params.description, "Updated description")
424 | self.assertEqual(params.state, "in_progress")
425 | self.assertEqual(params.developer, "test.user")
426 |
427 | def test_commit_changeset_params(self):
428 | """Test CommitChangesetParams."""
429 | params = CommitChangesetParams(
430 | changeset_id="123",
431 | commit_message="Commit message",
432 | )
433 | self.assertEqual(params.changeset_id, "123")
434 | self.assertEqual(params.commit_message, "Commit message")
435 |
436 | def test_publish_changeset_params(self):
437 | """Test PublishChangesetParams."""
438 | params = PublishChangesetParams(
439 | changeset_id="123",
440 | publish_notes="Publish notes",
441 | )
442 | self.assertEqual(params.changeset_id, "123")
443 | self.assertEqual(params.publish_notes, "Publish notes")
444 |
445 | def test_add_file_to_changeset_params(self):
446 | """Test AddFileToChangesetParams."""
447 | params = AddFileToChangesetParams(
448 | changeset_id="123",
449 | file_path="test_file.py",
450 | file_content="print('Hello, world!')",
451 | )
452 | self.assertEqual(params.changeset_id, "123")
453 | self.assertEqual(params.file_path, "test_file.py")
454 | self.assertEqual(params.file_content, "print('Hello, world!')")
455 |
456 |
457 | if __name__ == "__main__":
458 | unittest.main()
```