#
tokens: 47660/50000 11/79 files (page 3/5)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 3 of 5. Use http://codebase.md/osomai/servicenow-mcp?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .cursorrules
├── .DS_Store
├── .env.example
├── .gitignore
├── config
│   └── tool_packages.yaml
├── debug_workflow_api.py
├── Dockerfile
├── docs
│   ├── catalog_optimization_plan.md
│   ├── catalog_variables.md
│   ├── catalog.md
│   ├── change_management.md
│   ├── changeset_management.md
│   ├── incident_management.md
│   ├── knowledge_base.md
│   ├── user_management.md
│   └── workflow_management.md
├── examples
│   ├── catalog_integration_test.py
│   ├── catalog_optimization_example.py
│   ├── change_management_demo.py
│   ├── changeset_management_demo.py
│   ├── claude_catalog_demo.py
│   ├── claude_desktop_config.json
│   ├── claude_incident_demo.py
│   ├── debug_workflow_api.py
│   ├── wake_servicenow_instance.py
│   └── workflow_management_demo.py
├── LICENSE
├── prompts
│   └── add_servicenow_mcp_tool.md
├── pyproject.toml
├── README.md
├── scripts
│   ├── check_pdi_info.py
│   ├── check_pdi_status.py
│   ├── install_claude_desktop.sh
│   ├── setup_api_key.py
│   ├── setup_auth.py
│   ├── setup_oauth.py
│   ├── setup.sh
│   └── test_connection.py
├── src
│   ├── .DS_Store
│   └── servicenow_mcp
│       ├── __init__.py
│       ├── .DS_Store
│       ├── auth
│       │   ├── __init__.py
│       │   └── auth_manager.py
│       ├── cli.py
│       ├── server_sse.py
│       ├── server.py
│       ├── tools
│       │   ├── __init__.py
│       │   ├── catalog_optimization.py
│       │   ├── catalog_tools.py
│       │   ├── catalog_variables.py
│       │   ├── change_tools.py
│       │   ├── changeset_tools.py
│       │   ├── epic_tools.py
│       │   ├── incident_tools.py
│       │   ├── knowledge_base.py
│       │   ├── project_tools.py
│       │   ├── script_include_tools.py
│       │   ├── scrum_task_tools.py
│       │   ├── story_tools.py
│       │   ├── user_tools.py
│       │   └── workflow_tools.py
│       └── utils
│           ├── __init__.py
│           ├── config.py
│           └── tool_utils.py
├── tests
│   ├── test_catalog_optimization.py
│   ├── test_catalog_resources.py
│   ├── test_catalog_tools.py
│   ├── test_catalog_variables.py
│   ├── test_change_tools.py
│   ├── test_changeset_resources.py
│   ├── test_changeset_tools.py
│   ├── test_config.py
│   ├── test_incident_tools.py
│   ├── test_knowledge_base.py
│   ├── test_script_include_resources.py
│   ├── test_script_include_tools.py
│   ├── test_server_catalog_optimization.py
│   ├── test_server_catalog.py
│   ├── test_server_workflow.py
│   ├── test_user_tools.py
│   ├── test_workflow_tools_direct.py
│   ├── test_workflow_tools_params.py
│   └── test_workflow_tools.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/tests/test_catalog_variables.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Tests for the catalog item variables tools.
  3 | """
  4 | 
  5 | import unittest
  6 | from unittest.mock import MagicMock, patch
  7 | import requests
  8 | 
  9 | from servicenow_mcp.tools.catalog_variables import (
 10 |     CreateCatalogItemVariableParams,
 11 |     ListCatalogItemVariablesParams,
 12 |     UpdateCatalogItemVariableParams,
 13 |     create_catalog_item_variable,
 14 |     list_catalog_item_variables,
 15 |     update_catalog_item_variable,
 16 | )
 17 | from servicenow_mcp.utils.config import ServerConfig, AuthConfig, AuthType, BasicAuthConfig
 18 | 
 19 | 
 20 | class TestCatalogVariablesTools(unittest.TestCase):
 21 |     """
 22 |     Test the catalog item variables tools.
 23 |     """
 24 | 
 25 |     def setUp(self):
 26 |         """Set up the test environment."""
 27 |         self.config = ServerConfig(
 28 |             instance_url="https://test.service-now.com",
 29 |             timeout=10,
 30 |             auth=AuthConfig(
 31 |                 type=AuthType.BASIC,
 32 |                 basic=BasicAuthConfig(
 33 |                     username="test_user",
 34 |                     password="test_password"
 35 |                 )
 36 |             ),
 37 |         )
 38 |         self.auth_manager = MagicMock()
 39 |         self.auth_manager.get_headers.return_value = {"Content-Type": "application/json"}
 40 | 
 41 |     @patch("requests.post")
 42 |     def test_create_catalog_item_variable(self, mock_post):
 43 |         """Test create_catalog_item_variable function."""
 44 |         # Configure mock
 45 |         mock_response = MagicMock()
 46 |         mock_response.raise_for_status = MagicMock()
 47 |         mock_response.json.return_value = {
 48 |             "result": {
 49 |                 "sys_id": "abc123",
 50 |                 "name": "test_variable",
 51 |                 "type": "string",
 52 |                 "question_text": "Test Variable",
 53 |                 "mandatory": "false",
 54 |             }
 55 |         }
 56 |         mock_post.return_value = mock_response
 57 | 
 58 |         # Create test params
 59 |         params = CreateCatalogItemVariableParams(
 60 |             catalog_item_id="item123",
 61 |             name="test_variable",
 62 |             type="string",
 63 |             label="Test Variable",
 64 |             mandatory=False,
 65 |         )
 66 | 
 67 |         # Call function
 68 |         result = create_catalog_item_variable(self.config, self.auth_manager, params)
 69 | 
 70 |         # Verify result
 71 |         self.assertTrue(result.success)
 72 |         self.assertEqual(result.variable_id, "abc123")
 73 |         self.assertIsNotNone(result.details)
 74 | 
 75 |         # Verify mock was called correctly
 76 |         mock_post.assert_called_once()
 77 |         call_args = mock_post.call_args
 78 |         self.assertEqual(
 79 |             call_args[0][0], f"{self.config.instance_url}/api/now/table/item_option_new"
 80 |         )
 81 |         self.assertEqual(call_args[1]["json"]["cat_item"], "item123")
 82 |         self.assertEqual(call_args[1]["json"]["name"], "test_variable")
 83 |         self.assertEqual(call_args[1]["json"]["type"], "string")
 84 |         self.assertEqual(call_args[1]["json"]["question_text"], "Test Variable")
 85 |         self.assertEqual(call_args[1]["json"]["mandatory"], "false")
 86 | 
 87 |     @patch("requests.post")
 88 |     def test_create_catalog_item_variable_with_optional_params(self, mock_post):
 89 |         """Test create_catalog_item_variable function with optional parameters."""
 90 |         # Configure mock
 91 |         mock_response = MagicMock()
 92 |         mock_response.raise_for_status = MagicMock()
 93 |         mock_response.json.return_value = {
 94 |             "result": {
 95 |                 "sys_id": "abc123",
 96 |                 "name": "test_variable",
 97 |                 "type": "reference",
 98 |                 "question_text": "Test Reference",
 99 |                 "mandatory": "true",
100 |                 "reference": "sys_user",
101 |                 "reference_qual": "active=true",
102 |                 "help_text": "Select a user",
103 |                 "default_value": "admin",
104 |                 "description": "Reference to a user",
105 |                 "order": 100,
106 |             }
107 |         }
108 |         mock_post.return_value = mock_response
109 | 
110 |         # Create test params with optional fields
111 |         params = CreateCatalogItemVariableParams(
112 |             catalog_item_id="item123",
113 |             name="test_variable",
114 |             type="reference",
115 |             label="Test Reference",
116 |             mandatory=True,
117 |             help_text="Select a user",
118 |             default_value="admin",
119 |             description="Reference to a user",
120 |             order=100,
121 |             reference_table="sys_user",
122 |             reference_qualifier="active=true",
123 |         )
124 | 
125 |         # Call function
126 |         result = create_catalog_item_variable(self.config, self.auth_manager, params)
127 | 
128 |         # Verify result
129 |         self.assertTrue(result.success)
130 |         self.assertEqual(result.variable_id, "abc123")
131 | 
132 |         # Verify mock was called correctly
133 |         mock_post.assert_called_once()
134 |         call_args = mock_post.call_args
135 |         self.assertEqual(call_args[1]["json"]["reference"], "sys_user")
136 |         self.assertEqual(call_args[1]["json"]["reference_qual"], "active=true")
137 |         self.assertEqual(call_args[1]["json"]["help_text"], "Select a user")
138 |         self.assertEqual(call_args[1]["json"]["default_value"], "admin")
139 |         self.assertEqual(call_args[1]["json"]["description"], "Reference to a user")
140 |         self.assertEqual(call_args[1]["json"]["order"], 100)
141 | 
142 |     @patch("requests.post")
143 |     def test_create_catalog_item_variable_error(self, mock_post):
144 |         """Test create_catalog_item_variable function with error."""
145 |         # Configure mock to raise exception
146 |         mock_post.side_effect = requests.RequestException("Test error")
147 | 
148 |         # Create test params
149 |         params = CreateCatalogItemVariableParams(
150 |             catalog_item_id="item123",
151 |             name="test_variable",
152 |             type="string",
153 |             label="Test Variable",
154 |         )
155 | 
156 |         # Call function
157 |         result = create_catalog_item_variable(self.config, self.auth_manager, params)
158 | 
159 |         # Verify result
160 |         self.assertFalse(result.success)
161 |         self.assertTrue("failed" in result.message.lower())
162 | 
163 |     @patch("requests.get")
164 |     def test_list_catalog_item_variables(self, mock_get):
165 |         """Test list_catalog_item_variables function."""
166 |         # Configure mock
167 |         mock_response = MagicMock()
168 |         mock_response.raise_for_status = MagicMock()
169 |         mock_response.json.return_value = {
170 |             "result": [
171 |                 {
172 |                     "sys_id": "var1",
173 |                     "name": "variable1",
174 |                     "type": "string",
175 |                     "question_text": "Variable 1",
176 |                     "order": 100,
177 |                     "mandatory": "true",
178 |                 },
179 |                 {
180 |                     "sys_id": "var2",
181 |                     "name": "variable2",
182 |                     "type": "integer",
183 |                     "question_text": "Variable 2",
184 |                     "order": 200,
185 |                     "mandatory": "false",
186 |                 },
187 |             ]
188 |         }
189 |         mock_get.return_value = mock_response
190 | 
191 |         # Create test params
192 |         params = ListCatalogItemVariablesParams(
193 |             catalog_item_id="item123",
194 |             include_details=True,
195 |         )
196 | 
197 |         # Call function
198 |         result = list_catalog_item_variables(self.config, self.auth_manager, params)
199 | 
200 |         # Verify result
201 |         self.assertTrue(result.success)
202 |         self.assertEqual(result.count, 2)
203 |         self.assertEqual(len(result.variables), 2)
204 |         self.assertEqual(result.variables[0]["sys_id"], "var1")
205 |         self.assertEqual(result.variables[1]["sys_id"], "var2")
206 | 
207 |         # Verify mock was called correctly
208 |         mock_get.assert_called_once()
209 |         call_args = mock_get.call_args
210 |         self.assertEqual(
211 |             call_args[0][0], f"{self.config.instance_url}/api/now/table/item_option_new"
212 |         )
213 |         self.assertEqual(
214 |             call_args[1]["params"]["sysparm_query"], "cat_item=item123^ORDERBYorder"
215 |         )
216 |         self.assertEqual(call_args[1]["params"]["sysparm_display_value"], "true")
217 |         self.assertEqual(call_args[1]["params"]["sysparm_exclude_reference_link"], "false")
218 | 
219 |     @patch("requests.get")
220 |     def test_list_catalog_item_variables_with_pagination(self, mock_get):
221 |         """Test list_catalog_item_variables function with pagination parameters."""
222 |         # Configure mock
223 |         mock_response = MagicMock()
224 |         mock_response.raise_for_status = MagicMock()
225 |         mock_response.json.return_value = {"result": [{"sys_id": "var1"}]}
226 |         mock_get.return_value = mock_response
227 | 
228 |         # Create test params with pagination
229 |         params = ListCatalogItemVariablesParams(
230 |             catalog_item_id="item123",
231 |             include_details=False,
232 |             limit=10,
233 |             offset=20,
234 |         )
235 | 
236 |         # Call function
237 |         result = list_catalog_item_variables(self.config, self.auth_manager, params)
238 | 
239 |         # Verify result
240 |         self.assertTrue(result.success)
241 | 
242 |         # Verify mock was called correctly with pagination
243 |         mock_get.assert_called_once()
244 |         call_args = mock_get.call_args
245 |         self.assertEqual(call_args[1]["params"]["sysparm_limit"], 10)
246 |         self.assertEqual(call_args[1]["params"]["sysparm_offset"], 20)
247 |         self.assertEqual(
248 |             call_args[1]["params"]["sysparm_fields"],
249 |             "sys_id,name,type,question_text,order,mandatory",
250 |         )
251 | 
252 |     @patch("requests.get")
253 |     def test_list_catalog_item_variables_error(self, mock_get):
254 |         """Test list_catalog_item_variables function with error."""
255 |         # Configure mock to raise exception
256 |         mock_get.side_effect = requests.RequestException("Test error")
257 | 
258 |         # Create test params
259 |         params = ListCatalogItemVariablesParams(
260 |             catalog_item_id="item123",
261 |         )
262 | 
263 |         # Call function
264 |         result = list_catalog_item_variables(self.config, self.auth_manager, params)
265 | 
266 |         # Verify result
267 |         self.assertFalse(result.success)
268 |         self.assertTrue("failed" in result.message.lower())
269 | 
270 |     @patch("requests.patch")
271 |     def test_update_catalog_item_variable(self, mock_patch):
272 |         """Test update_catalog_item_variable function."""
273 |         # Configure mock
274 |         mock_response = MagicMock()
275 |         mock_response.raise_for_status = MagicMock()
276 |         mock_response.json.return_value = {
277 |             "result": {
278 |                 "sys_id": "var1",
279 |                 "question_text": "Updated Variable",
280 |                 "mandatory": "true",
281 |                 "help_text": "This is help text",
282 |             }
283 |         }
284 |         mock_patch.return_value = mock_response
285 | 
286 |         # Create test params
287 |         params = UpdateCatalogItemVariableParams(
288 |             variable_id="var1",
289 |             label="Updated Variable",
290 |             mandatory=True,
291 |             help_text="This is help text",
292 |         )
293 | 
294 |         # Call function
295 |         result = update_catalog_item_variable(self.config, self.auth_manager, params)
296 | 
297 |         # Verify result
298 |         self.assertTrue(result.success)
299 |         self.assertEqual(result.variable_id, "var1")
300 |         self.assertIsNotNone(result.details)
301 | 
302 |         # Verify mock was called correctly
303 |         mock_patch.assert_called_once()
304 |         call_args = mock_patch.call_args
305 |         self.assertEqual(
306 |             call_args[0][0],
307 |             f"{self.config.instance_url}/api/now/table/item_option_new/var1",
308 |         )
309 |         self.assertEqual(call_args[1]["json"]["question_text"], "Updated Variable")
310 |         self.assertEqual(call_args[1]["json"]["mandatory"], "true")
311 |         self.assertEqual(call_args[1]["json"]["help_text"], "This is help text")
312 | 
313 |     @patch("requests.patch")
314 |     def test_update_catalog_item_variable_no_params(self, mock_patch):
315 |         """Test update_catalog_item_variable function with no update parameters."""
316 |         # Create test params with no updates (only ID)
317 |         params = UpdateCatalogItemVariableParams(
318 |             variable_id="var1",
319 |         )
320 | 
321 |         # Call function
322 |         result = update_catalog_item_variable(self.config, self.auth_manager, params)
323 | 
324 |         # Verify result - should fail since no update parameters provided
325 |         self.assertFalse(result.success)
326 |         self.assertEqual(result.message, "No update parameters provided")
327 | 
328 |         # Verify mock was not called
329 |         mock_patch.assert_not_called()
330 | 
331 |     @patch("requests.patch")
332 |     def test_update_catalog_item_variable_error(self, mock_patch):
333 |         """Test update_catalog_item_variable function with error."""
334 |         # Configure mock to raise exception
335 |         mock_patch.side_effect = requests.RequestException("Test error")
336 | 
337 |         # Create test params
338 |         params = UpdateCatalogItemVariableParams(
339 |             variable_id="var1",
340 |             label="Updated Variable",
341 |         )
342 | 
343 |         # Call function
344 |         result = update_catalog_item_variable(self.config, self.auth_manager, params)
345 | 
346 |         # Verify result
347 |         self.assertFalse(result.success)
348 |         self.assertTrue("failed" in result.message.lower())
349 | 
350 | 
351 | if __name__ == "__main__":
352 |     unittest.main() 
```

--------------------------------------------------------------------------------
/src/servicenow_mcp/server.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | ServiceNow MCP Server
  3 | 
  4 | This module provides the main implementation of the ServiceNow MCP server.
  5 | """
  6 | 
  7 | import json
  8 | import logging
  9 | import os
 10 | from typing import Any, Dict, List, Union
 11 | 
 12 | import mcp.types as types
 13 | import yaml
 14 | from mcp.server.lowlevel import Server
 15 | from pydantic import ValidationError
 16 | 
 17 | from servicenow_mcp.auth.auth_manager import AuthManager
 18 | from servicenow_mcp.tools.knowledge_base import (
 19 |     create_category as create_kb_category_tool,
 20 | )
 21 | from servicenow_mcp.tools.knowledge_base import (
 22 |     list_categories as list_kb_categories_tool,
 23 | )
 24 | from servicenow_mcp.utils.config import ServerConfig
 25 | from servicenow_mcp.utils.tool_utils import get_tool_definitions
 26 | 
 27 | # Set up logging
 28 | logging.basicConfig(level=logging.INFO)
 29 | logger = logging.getLogger(__name__)
 30 | 
 31 | # Define path for the configuration file
 32 | TOOL_PACKAGE_CONFIG_PATH = os.getenv("TOOL_PACKAGE_CONFIG_PATH", "config/tool_packages.yaml")
 33 | 
 34 | 
 35 | def serialize_tool_output(result: Any, tool_name: str) -> str:
 36 |     """Serializes tool output to a string, preferably JSON indented."""
 37 |     try:
 38 |         if isinstance(result, str):
 39 |             # If it's already a string, assume it's intended as such
 40 |             # Try to parse/re-dump JSON for consistent formatting if it looks like JSON
 41 |             try:
 42 |                 parsed = json.loads(result)
 43 |                 return json.dumps(parsed, indent=2)
 44 |             except json.JSONDecodeError:
 45 |                 return result  # Return as is if not valid JSON
 46 |         elif isinstance(result, dict):
 47 |             # Dump dicts to JSON
 48 |             return json.dumps(result, indent=2)
 49 |         elif hasattr(result, "model_dump_json"):  # Pydantic v2
 50 |             # Prefer Pydantic v2 model_dump_json
 51 |             # The indent argument might not be supported by all versions/models,
 52 |             # so we might need to dump to dict first if indent is crucial.
 53 |             try:
 54 |                 return result.model_dump_json(indent=2)
 55 |             except TypeError:  # Handle case where indent is not supported
 56 |                 return json.dumps(result.model_dump(), indent=2)
 57 |         elif hasattr(result, "model_dump"):  # Pydantic v2 fallback
 58 |             # Fallback to Pydantic v2 model_dump -> dict -> json
 59 |             return json.dumps(result.model_dump(), indent=2)
 60 |         elif hasattr(result, "dict"):  # Pydantic v1
 61 |             # Fallback to Pydantic v1 dict -> json
 62 |             return json.dumps(result.dict(), indent=2)
 63 |         else:
 64 |             # Absolute fallback: convert to string
 65 |             logger.warning(
 66 |                 f"Could not serialize result for tool '{tool_name}' to JSON, falling back to str(). Type: {type(result)}"
 67 |             )
 68 |             return str(result)
 69 |     except Exception as e:
 70 |         logger.error(f"Error during serialization for tool '{tool_name}': {e}", exc_info=True)
 71 |         # Return an error message string formatted as JSON
 72 |         return json.dumps(
 73 |             {"error": f"Serialization failed for tool {tool_name}", "details": str(e)}, indent=2
 74 |         )
 75 | 
 76 | 
 77 | class ServiceNowMCP:
 78 |     """
 79 |     ServiceNow MCP Server implementation.
 80 | 
 81 |     This class provides a Model Context Protocol (MCP) server for ServiceNow,
 82 |     allowing LLMs to interact with ServiceNow data and functionality.
 83 |     It supports loading specific tool packages via the MCP_TOOL_PACKAGE env var.
 84 |     """
 85 | 
 86 |     def __init__(self, config: Union[Dict, ServerConfig]):
 87 |         """
 88 |         Initialize the ServiceNow MCP server.
 89 | 
 90 |         Args:
 91 |             config: Server configuration, either as a dictionary or ServerConfig object.
 92 |         """
 93 |         if isinstance(config, dict):
 94 |             self.config = ServerConfig(**config)
 95 |         else:
 96 |             self.config = config
 97 | 
 98 |         self.auth_manager = AuthManager(self.config.auth, self.config.instance_url)
 99 |         self.mcp_server = Server("ServiceNow")  # Use low-level Server
100 |         self.name = "ServiceNow"
101 | 
102 |         self.package_definitions: Dict[str, List[str]] = {}
103 |         self.enabled_tool_names: List[str] = []
104 |         self.current_package_name: str = "none"
105 |         self._load_package_config()
106 |         self._determine_enabled_tools()
107 | 
108 |         # Get tool definitions, passing the aliased KB tool functions if needed
109 |         self.tool_definitions = get_tool_definitions(
110 |             create_kb_category_tool, list_kb_categories_tool
111 |         )
112 | 
113 |         self._register_handlers()
114 | 
115 |     def _register_handlers(self):
116 |         """Register the list_tools and call_tool handlers."""
117 |         self.mcp_server.list_tools()(self._list_tools_impl)
118 |         self.mcp_server.call_tool()(self._call_tool_impl)
119 |         logger.info("Registered list_tools and call_tool handlers.")
120 | 
121 |     def _load_package_config(self):
122 |         """Load tool package definitions from the YAML configuration file."""
123 |         config_path = TOOL_PACKAGE_CONFIG_PATH
124 |         if not os.path.isabs(config_path):
125 |             config_path = os.path.join(os.path.dirname(__file__), "..", "..", config_path)
126 |             config_path = os.path.abspath(config_path)
127 | 
128 |         try:
129 |             with open(config_path, "r") as f:
130 |                 loaded_config = yaml.safe_load(f)
131 |                 if isinstance(loaded_config, dict):
132 |                     self.package_definitions = loaded_config
133 |                     logger.info(f"Successfully loaded tool package config from {config_path}")
134 |                 else:
135 |                     logger.error(
136 |                         f"Invalid format in {config_path}: Expected a dictionary, got {type(loaded_config)}. No packages loaded."
137 |                     )
138 |                     self.package_definitions = {}
139 |         except FileNotFoundError:
140 |             logger.error(
141 |                 f"Tool package config file not found at {config_path}. No packages loaded."
142 |             )
143 |             self.package_definitions = {}
144 |         except yaml.YAMLError as e:
145 |             logger.error(
146 |                 f"Error parsing tool package config file {config_path}: {e}. No packages loaded."
147 |             )
148 |             self.package_definitions = {}
149 |         except Exception as e:
150 |             logger.error(f"Unexpected error loading tool package config {config_path}: {e}")
151 |             self.package_definitions = {}
152 | 
153 |     def _determine_enabled_tools(self):
154 |         """Determine which tool package and tools to enable based on environment variable."""
155 |         requested_package = os.getenv("MCP_TOOL_PACKAGE", "full").strip()
156 | 
157 |         if not requested_package:
158 |             self.current_package_name = "full"
159 |             logger.info("MCP_TOOL_PACKAGE is empty, defaulting to 'full' package.")
160 |         elif requested_package in self.package_definitions:
161 |             self.current_package_name = requested_package
162 |             logger.info(f"MCP_TOOL_PACKAGE set to '{self.current_package_name}'.")
163 |         else:
164 |             self.current_package_name = "none"
165 |             logger.warning(
166 |                 f"MCP_TOOL_PACKAGE '{requested_package}' is not a valid package name. "
167 |                 f"Valid packages: {list(self.package_definitions.keys())}. Loading 'none' package."
168 |             )
169 | 
170 |         if self.package_definitions:
171 |             self.enabled_tool_names = self.package_definitions.get(self.current_package_name, [])
172 |         else:
173 |             self.enabled_tool_names = []
174 | 
175 |         logger.info(
176 |             f"Loading package '{self.current_package_name}' with {len(self.enabled_tool_names)} tools."
177 |         )
178 | 
179 |     async def _list_tools_impl(self) -> List[types.Tool]:
180 |         """Implementation for the list_tools MCP endpoint."""
181 |         tool_list: List[types.Tool] = []
182 | 
183 |         # Add the introspection tool if not 'none' package
184 |         if self.current_package_name != "none":
185 |             tool_list.append(
186 |                 types.Tool(
187 |                     name="list_tool_packages",
188 |                     description="Lists available tool packages and the currently loaded one.",
189 |                     inputSchema={
190 |                         "type": "object",
191 |                         "properties": {
192 |                             "random_string": {
193 |                                 "type": "string",
194 |                                 "description": "Dummy parameter for no-parameter tools",
195 |                             }
196 |                         },
197 |                         "required": ["random_string"],
198 |                     },
199 |                 )
200 |             )
201 | 
202 |         # Iterate through defined tools and add enabled ones
203 |         for tool_name, definition in self.tool_definitions.items():
204 |             if tool_name in self.enabled_tool_names:
205 |                 _impl_func, params_model, _return_annotation, description, _serialization = (
206 |                     definition
207 |                 )
208 |                 try:
209 |                     schema = params_model.model_json_schema()
210 |                     tool_list.append(
211 |                         types.Tool(name=tool_name, description=description, inputSchema=schema)
212 |                     )
213 |                 except Exception as e:
214 |                     logger.error(
215 |                         f"Failed to generate schema for tool '{tool_name}': {e}", exc_info=True
216 |                     )
217 | 
218 |         logger.debug(f"Listing {len(tool_list)} tools for package '{self.current_package_name}'.")
219 |         return tool_list
220 | 
221 |     async def _call_tool_impl(self, name: str, arguments: dict) -> list[types.TextContent]:
222 |         """
223 |         Implementation for the call_tool MCP endpoint.
224 |         Handles argument parsing, tool execution, result serialization (to string),
225 |         and returning a list containing a single TextContent object.
226 | 
227 |         Args:
228 |             name: The name of the tool to call.
229 |             arguments: The arguments for the tool as a dictionary.
230 | 
231 |         Returns:
232 |             A list containing a single TextContent object with the tool output.
233 | 
234 |         Raises:
235 |             ValueError: If the tool is unknown, disabled, or if arguments are invalid.
236 |             RuntimeError: If tool execution or serialization fails.
237 |         """
238 |         logger.info(f"Received call_tool request for tool '{name}'")
239 |         # Handle the introspection tool separately
240 |         if name == "list_tool_packages":
241 |             if self.current_package_name == "none":
242 |                 raise ValueError(
243 |                     "Tool 'list_tool_packages' is not available in the 'none' package."
244 |                 )
245 |             result_dict = self._list_tool_packages_impl()
246 |             serialized_string = json.dumps(result_dict, indent=2)
247 |             # Return a list with a TextContent object
248 |             return [types.TextContent(type="text", text=serialized_string)]
249 | 
250 |         # Check if the tool exists and is enabled
251 |         if name not in self.tool_definitions:
252 |             raise ValueError(f"Unknown tool: {name}")
253 |         if name not in self.enabled_tool_names:
254 |             raise ValueError(
255 |                 f"Tool '{name}' is not enabled in the current package '{self.current_package_name}'."
256 |             )
257 | 
258 |         # Get tool definition (we don't need the serialization hint anymore)
259 |         definition = self.tool_definitions[name]
260 |         impl_func, params_model, _return_annotation, _description, _serialization = definition
261 | 
262 |         # Validate and parse arguments using the Pydantic model
263 |         try:
264 |             params = params_model(**arguments)
265 |             logger.debug(f"Parsed arguments for tool '{name}': {params}")
266 |         except ValidationError as e:
267 |             logger.error(f"Invalid arguments for tool '{name}': {e}", exc_info=True)
268 |             raise ValueError(f"Invalid arguments for tool '{name}': {e}") from e
269 |         except Exception as e:
270 |             logger.error(
271 |                 f"Unexpected error parsing arguments for tool '{name}': {e}", exc_info=True
272 |             )
273 |             raise ValueError(f"Failed to parse arguments for tool '{name}': {e}")
274 | 
275 |         # Execute the tool implementation function
276 |         try:
277 |             result = impl_func(self.config, self.auth_manager, params)
278 |             logger.debug(f"Raw result type from tool '{name}': {type(result)}")
279 |         except Exception as e:
280 |             logger.error(f"Error executing tool '{name}': {e}", exc_info=True)
281 |             raise RuntimeError(f"Error during execution of tool '{name}': {e}") from e
282 | 
283 |         # Serialize the result to a string (preferably JSON) using the helper
284 |         serialized_string = serialize_tool_output(result, name)
285 |         logger.debug(f"Serialized value for tool '{name}': {serialized_string[:500]}...")
286 | 
287 |         # Return a list with a TextContent object
288 |         return [types.TextContent(type="text", text=serialized_string)]
289 | 
290 |     def _list_tool_packages_impl(self) -> Dict[str, Any]:
291 |         """Implementation logic for the list_tool_packages tool."""
292 |         available_packages = list(self.package_definitions.keys())
293 |         return {
294 |             "current_package": self.current_package_name,
295 |             "available_packages": available_packages,
296 |             "message": (
297 |                 f"Currently loaded package: '{self.current_package_name}'. "
298 |                 f"Set MCP_TOOL_PACKAGE env var to one of {available_packages} to switch."
299 |             ),
300 |         }
301 | 
302 |     def start(self) -> Server:
303 |         """
304 |         Prepares and returns the configured low-level MCP Server instance.
305 | 
306 |         The caller (e.g., cli.py) is responsible for obtaining the server
307 |         instance from this method and running it within an appropriate
308 |         async transport context (e.g., mcp.server.stdio.stdio_server).
309 | 
310 |         Returns:
311 |             The configured mcp.server.lowlevel.Server instance.
312 |         """
313 |         logger.info(
314 |             "ServiceNowMCP instance configured. Returning low-level server instance for external execution."
315 |         )
316 |         # The actual running of the server (server.run(...)) must happen
317 |         # within an async context managed by the caller (e.g., using anyio
318 |         # and a specific transport like stdio_server or SseServerTransport).
319 |         return self.mcp_server
320 | 
```

--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/epic_tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Epic management tools for the ServiceNow MCP server.
  3 | 
  4 | This module provides tools for managing epics in ServiceNow.
  5 | """
  6 | 
  7 | import logging
  8 | from datetime import datetime
  9 | from typing import Any, Dict, List, Optional, Type, TypeVar
 10 | 
 11 | import requests
 12 | from pydantic import BaseModel, Field
 13 | 
 14 | from servicenow_mcp.auth.auth_manager import AuthManager
 15 | from servicenow_mcp.utils.config import ServerConfig
 16 | 
 17 | logger = logging.getLogger(__name__)
 18 | 
 19 | # Type variable for Pydantic models
 20 | T = TypeVar('T', bound=BaseModel)
 21 | 
 22 | class CreateEpicParams(BaseModel):
 23 |     """Parameters for creating an epic."""
 24 | 
 25 |     short_description: str = Field(..., description="Short description of the epic")
 26 |     description: Optional[str] = Field(None, description="Detailed description of the epic")
 27 |     priority: Optional[str] = Field(None, description="Priority of epic (1 is Critical, 2 is High, 3 is Moderate, 4 is Low, 5 is Planning)")
 28 |     state: Optional[str] = Field(None, description="State of story (-6 is Draft,1 is Ready,2 is Work in progress, 3 is Complete, 4 is Cancelled)")
 29 |     assignment_group: Optional[str] = Field(None, description="Group assigned to the epic")
 30 |     assigned_to: Optional[str] = Field(None, description="User assigned to the epic")
 31 |     work_notes: Optional[str] = Field(None, description="Work notes to add to the epic. Used for adding notes and comments to an epic")
 32 |     
 33 | class UpdateEpicParams(BaseModel):
 34 |     """Parameters for updating an epic."""
 35 | 
 36 |     epic_id: str = Field(..., description="Epic ID or sys_id")
 37 |     short_description: Optional[str] = Field(None, description="Short description of the epic")
 38 |     description: Optional[str] = Field(None, description="Detailed description of the epic")
 39 |     priority: Optional[str] = Field(None, description="Priority of epic (1 is Critical, 2 is High, 3 is Moderate, 4 is Low, 5 is Planning)")
 40 |     state: Optional[str] = Field(None, description="State of story (-6 is Draft,1 is Ready,2 is Work in progress, 3 is Complete, 4 is Cancelled)")
 41 |     assignment_group: Optional[str] = Field(None, description="Group assigned to the epic")
 42 |     assigned_to: Optional[str] = Field(None, description="User assigned to the epic")
 43 |     work_notes: Optional[str] = Field(None, description="Work notes to add to the epic. Used for adding notes and comments to an epic")
 44 | 
 45 | class ListEpicsParams(BaseModel):
 46 |     """Parameters for listing epics."""
 47 | 
 48 |     limit: Optional[int] = Field(10, description="Maximum number of records to return")
 49 |     offset: Optional[int] = Field(0, description="Offset to start from")
 50 |     priority: Optional[str] = Field(None, description="Filter by priority")
 51 |     assignment_group: Optional[str] = Field(None, description="Filter by assignment group")
 52 |     timeframe: Optional[str] = Field(None, description="Filter by timeframe (upcoming, in-progress, completed)")
 53 |     query: Optional[str] = Field(None, description="Additional query string")
 54 | 
 55 | 
 56 | def _unwrap_and_validate_params(params: Any, model_class: Type[T], required_fields: List[str] = None) -> Dict[str, Any]:
 57 |     """
 58 |     Helper function to unwrap and validate parameters.
 59 |     
 60 |     Args:
 61 |         params: The parameters to unwrap and validate.
 62 |         model_class: The Pydantic model class to validate against.
 63 |         required_fields: List of required field names.
 64 |         
 65 |     Returns:
 66 |         A tuple of (success, result) where result is either the validated parameters or an error message.
 67 |     """
 68 |     # Handle case where params might be wrapped in another dictionary
 69 |     if isinstance(params, dict) and len(params) == 1 and "params" in params and isinstance(params["params"], dict):
 70 |         logger.warning("Detected params wrapped in a 'params' key. Unwrapping...")
 71 |         params = params["params"]
 72 |     
 73 |     # Handle case where params might be a Pydantic model object
 74 |     if not isinstance(params, dict):
 75 |         try:
 76 |             # Try to convert to dict if it's a Pydantic model
 77 |             logger.warning("Params is not a dictionary. Attempting to convert...")
 78 |             params = params.dict() if hasattr(params, "dict") else dict(params)
 79 |         except Exception as e:
 80 |             logger.error(f"Failed to convert params to dictionary: {e}")
 81 |             return {
 82 |                 "success": False,
 83 |                 "message": f"Invalid parameters format. Expected a dictionary, got {type(params).__name__}",
 84 |             }
 85 |     
 86 |     # Validate required parameters are present
 87 |     if required_fields:
 88 |         for field in required_fields:
 89 |             if field not in params:
 90 |                 return {
 91 |                     "success": False,
 92 |                     "message": f"Missing required parameter '{field}'",
 93 |                 }
 94 |     
 95 |     try:
 96 |         # Validate parameters against the model
 97 |         validated_params = model_class(**params)
 98 |         return {
 99 |             "success": True,
100 |             "params": validated_params,
101 |         }
102 |     except Exception as e:
103 |         logger.error(f"Error validating parameters: {e}")
104 |         return {
105 |             "success": False,
106 |             "message": f"Error validating parameters: {str(e)}",
107 |         }
108 | 
109 | 
110 | def _get_instance_url(auth_manager: AuthManager, server_config: ServerConfig) -> Optional[str]:
111 |     """
112 |     Helper function to get the instance URL from either server_config or auth_manager.
113 |     
114 |     Args:
115 |         auth_manager: The authentication manager.
116 |         server_config: The server configuration.
117 |         
118 |     Returns:
119 |         The instance URL if found, None otherwise.
120 |     """
121 |     if hasattr(server_config, 'instance_url'):
122 |         return server_config.instance_url
123 |     elif hasattr(auth_manager, 'instance_url'):
124 |         return auth_manager.instance_url
125 |     else:
126 |         logger.error("Cannot find instance_url in either server_config or auth_manager")
127 |         return None
128 | 
129 | 
130 | def _get_headers(auth_manager: Any, server_config: Any) -> Optional[Dict[str, str]]:
131 |     """
132 |     Helper function to get headers from either auth_manager or server_config.
133 |     
134 |     Args:
135 |         auth_manager: The authentication manager or object passed as auth_manager.
136 |         server_config: The server configuration or object passed as server_config.
137 |         
138 |     Returns:
139 |         The headers if found, None otherwise.
140 |     """
141 |     # Try to get headers from auth_manager
142 |     if hasattr(auth_manager, 'get_headers'):
143 |         return auth_manager.get_headers()
144 |     
145 |     # If auth_manager doesn't have get_headers, try server_config
146 |     if hasattr(server_config, 'get_headers'):
147 |         return server_config.get_headers()
148 |     
149 |     # If neither has get_headers, check if auth_manager is actually a ServerConfig
150 |     # and server_config is actually an AuthManager (parameters swapped)
151 |     if hasattr(server_config, 'get_headers') and not hasattr(auth_manager, 'get_headers'):
152 |         return server_config.get_headers()
153 |     
154 |     logger.error("Cannot find get_headers method in either auth_manager or server_config")
155 |     return None
156 | 
157 | def create_epic(
158 |     auth_manager: AuthManager,
159 |     server_config: ServerConfig,
160 |     params: Dict[str, Any],
161 | ) -> Dict[str, Any]:
162 |     """
163 |     Create a new epic in ServiceNow.
164 | 
165 |     Args:
166 |         auth_manager: The authentication manager.
167 |         server_config: The server configuration.
168 |         params: The parameters for creating the epic.
169 | 
170 |     Returns:
171 |         The created epic.
172 |     """
173 | 
174 |     # Unwrap and validate parameters
175 |     result = _unwrap_and_validate_params(
176 |         params, 
177 |         CreateEpicParams, 
178 |         required_fields=["short_description"]
179 |     )
180 |     
181 |     if not result["success"]:
182 |         return result
183 |     
184 |     validated_params = result["params"]
185 |     
186 |     # Prepare the request data
187 |     data = {
188 |         "short_description": validated_params.short_description,
189 |     }
190 |        
191 |     # Add optional fields if provided
192 |     if validated_params.description:
193 |         data["description"] = validated_params.description
194 |     if validated_params.priority:
195 |         data["priority"] = validated_params.priority
196 |     if validated_params.assignment_group:
197 |         data["assignment_group"] = validated_params.assignment_group
198 |     if validated_params.assigned_to:
199 |         data["assigned_to"] = validated_params.assigned_to
200 |     if validated_params.work_notes:
201 |         data["work_notes"] = validated_params.work_notes
202 |     
203 |     # Get the instance URL
204 |     instance_url = _get_instance_url(auth_manager, server_config)
205 |     if not instance_url:
206 |         return {
207 |             "success": False,
208 |             "message": "Cannot find instance_url in either server_config or auth_manager",
209 |         }
210 |     
211 |     # Get the headers
212 |     headers = _get_headers(auth_manager, server_config)
213 |     if not headers:
214 |         return {
215 |             "success": False,
216 |             "message": "Cannot find get_headers method in either auth_manager or server_config",
217 |         }
218 |     
219 |     # Add Content-Type header
220 |     headers["Content-Type"] = "application/json"
221 |     
222 |     # Make the API request
223 |     url = f"{instance_url}/api/now/table/rm_epic"
224 |     
225 |     try:
226 |         response = requests.post(url, json=data, headers=headers)
227 |         response.raise_for_status()
228 |         
229 |         result = response.json()
230 |         
231 |         return {
232 |             "success": True,
233 |             "message": "Epic created successfully",
234 |             "epic": result["result"],
235 |         }
236 |     except requests.exceptions.RequestException as e:
237 |         logger.error(f"Error creating epic: {e}")
238 |         return {
239 |             "success": False,
240 |             "message": f"Error creating epic: {str(e)}",
241 |         }
242 | 
243 | def update_epic(
244 |     auth_manager: AuthManager,
245 |     server_config: ServerConfig,
246 |     params: Dict[str, Any],
247 | ) -> Dict[str, Any]:
248 |     """
249 |     Update an existing epic in ServiceNow.
250 | 
251 |     Args:
252 |         auth_manager: The authentication manager.
253 |         server_config: The server configuration.
254 |         params: The parameters for updating the epic.
255 | 
256 |     Returns:
257 |         The updated epic.
258 |     """
259 |     # Unwrap and validate parameters
260 |     result = _unwrap_and_validate_params(
261 |         params, 
262 |         UpdateEpicParams,
263 |         required_fields=["epic_id"]
264 |     )
265 |     
266 |     if not result["success"]:
267 |         return result
268 |     
269 |     validated_params = result["params"]
270 |     
271 |     # Prepare the request data
272 |     data = {}
273 |     
274 |     # Add optional fields if provided
275 |     if validated_params.short_description:
276 |         data["short_description"] = validated_params.short_description
277 |     if validated_params.description:
278 |         data["description"] = validated_params.description
279 |     if validated_params.priority:
280 |         data["priority"] = validated_params.priority
281 |     if validated_params.assignment_group:
282 |         data["assignment_group"] = validated_params.assignment_group
283 |     if validated_params.assigned_to:
284 |         data["assigned_to"] = validated_params.assigned_to
285 |     if validated_params.work_notes:
286 |         data["work_notes"] = validated_params.work_notes
287 |     
288 |     # Get the instance URL
289 |     instance_url = _get_instance_url(auth_manager, server_config)
290 |     if not instance_url:
291 |         return {
292 |             "success": False,
293 |             "message": "Cannot find instance_url in either server_config or auth_manager",
294 |         }
295 |     
296 |     # Get the headers
297 |     headers = _get_headers(auth_manager, server_config)
298 |     if not headers:
299 |         return {
300 |             "success": False,
301 |             "message": "Cannot find get_headers method in either auth_manager or server_config",
302 |         }
303 |     
304 |     # Add Content-Type header
305 |     headers["Content-Type"] = "application/json"
306 |     
307 |     # Make the API request
308 |     url = f"{instance_url}/api/now/table/rm_epic/{validated_params.epic_id}"
309 |     
310 |     try:
311 |         response = requests.put(url, json=data, headers=headers)
312 |         response.raise_for_status()
313 |         
314 |         result = response.json()
315 |         
316 |         return {
317 |             "success": True,
318 |             "message": "Epic updated successfully",
319 |             "epic": result["result"],
320 |         }
321 |     except requests.exceptions.RequestException as e:
322 |         logger.error(f"Error updating epic: {e}")
323 |         return {
324 |             "success": False,
325 |             "message": f"Error updating epic: {str(e)}",
326 |         }
327 | 
328 | def list_epics(
329 |     auth_manager: AuthManager,
330 |     server_config: ServerConfig,
331 |     params: Dict[str, Any],
332 | ) -> Dict[str, Any]:
333 |     """
334 |     List epics from ServiceNow.
335 | 
336 |     Args:
337 |         auth_manager: The authentication manager.
338 |         server_config: The server configuration.
339 |         params: The parameters for listing epics.
340 | 
341 |     Returns:
342 |         A list of epics.
343 |     """
344 |     # Unwrap and validate parameters
345 |     result = _unwrap_and_validate_params(
346 |         params, 
347 |         ListEpicsParams
348 |     )
349 |     
350 |     if not result["success"]:
351 |         return result
352 |     
353 |     validated_params = result["params"]
354 |     
355 |     # Build the query
356 |     query_parts = []
357 |     
358 |     if validated_params.priority:
359 |         query_parts.append(f"priority={validated_params.priority}")
360 |     if validated_params.assignment_group:
361 |         query_parts.append(f"assignment_group={validated_params.assignment_group}")
362 |     
363 |     # Handle timeframe filtering
364 |     if validated_params.timeframe:
365 |         now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
366 |         if validated_params.timeframe == "upcoming":
367 |             query_parts.append(f"start_date>{now}")
368 |         elif validated_params.timeframe == "in-progress":
369 |             query_parts.append(f"start_date<{now}^end_date>{now}")
370 |         elif validated_params.timeframe == "completed":
371 |             query_parts.append(f"end_date<{now}")
372 |     
373 |     # Add any additional query string
374 |     if validated_params.query:
375 |         query_parts.append(validated_params.query)
376 |     
377 |     # Combine query parts
378 |     query = "^".join(query_parts) if query_parts else ""
379 |     
380 |     # Get the instance URL
381 |     instance_url = _get_instance_url(auth_manager, server_config)
382 |     if not instance_url:
383 |         return {
384 |             "success": False,
385 |             "message": "Cannot find instance_url in either server_config or auth_manager",
386 |         }
387 |     
388 |     # Get the headers
389 |     headers = _get_headers(auth_manager, server_config)
390 |     if not headers:
391 |         return {
392 |             "success": False,
393 |             "message": "Cannot find get_headers method in either auth_manager or server_config",
394 |         }
395 |     
396 |     # Make the API request
397 |     url = f"{instance_url}/api/now/table/rm_epic"
398 |     
399 |     params = {
400 |         "sysparm_limit": validated_params.limit,
401 |         "sysparm_offset": validated_params.offset,
402 |         "sysparm_query": query,
403 |         "sysparm_display_value": "true",
404 |     }
405 |     
406 |     try:
407 |         response = requests.get(url, headers=headers, params=params)
408 |         response.raise_for_status()
409 |         
410 |         result = response.json()
411 |         
412 |         # Handle the case where result["result"] is a list
413 |         epics = result.get("result", [])
414 |         count = len(epics)
415 |         
416 |         return {
417 |             "success": True,
418 |             "epics": epics,
419 |             "count": count,
420 |             "total": count,  # Use count as total if total is not provided
421 |         }
422 |     except requests.exceptions.RequestException as e:
423 |         logger.error(f"Error listing epics: {e}")
424 |         return {
425 |             "success": False,
426 |             "message": f"Error listing epics: {str(e)}",
427 |         }
428 | 
```

--------------------------------------------------------------------------------
/tests/test_user_tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Tests for user management tools.
  3 | """
  4 | 
  5 | import unittest
  6 | from unittest.mock import MagicMock, patch
  7 | 
  8 | from servicenow_mcp.auth.auth_manager import AuthManager
  9 | from servicenow_mcp.tools.user_tools import (
 10 |     AddGroupMembersParams,
 11 |     CreateGroupParams,
 12 |     CreateUserParams,
 13 |     GetUserParams,
 14 |     ListUsersParams,
 15 |     ListGroupsParams,
 16 |     RemoveGroupMembersParams,
 17 |     UpdateGroupParams,
 18 |     UpdateUserParams,
 19 |     add_group_members,
 20 |     create_group,
 21 |     create_user,
 22 |     get_user,
 23 |     list_users,
 24 |     list_groups,
 25 |     remove_group_members,
 26 |     update_group,
 27 |     update_user,
 28 | )
 29 | from servicenow_mcp.utils.config import AuthConfig, AuthType, BasicAuthConfig, ServerConfig
 30 | 
 31 | 
 32 | class TestUserTools(unittest.TestCase):
 33 |     """Tests for user management tools."""
 34 | 
 35 |     def setUp(self):
 36 |         """Set up test environment."""
 37 |         # Create config and auth manager
 38 |         self.config = ServerConfig(
 39 |             instance_url="https://example.service-now.com",
 40 |             auth=AuthConfig(
 41 |                 type=AuthType.BASIC,
 42 |                 basic=BasicAuthConfig(username="admin", password="password"),
 43 |             ),
 44 |         )
 45 |         self.auth_manager = AuthManager(self.config.auth)
 46 |         
 47 |         # Mock auth_manager.get_headers() method
 48 |         self.auth_manager.get_headers = MagicMock(return_value={"Authorization": "Basic YWRtaW46cGFzc3dvcmQ="})
 49 | 
 50 |     @patch("requests.post")
 51 |     def test_create_user(self, mock_post):
 52 |         """Test create_user function."""
 53 |         # Configure mock
 54 |         mock_response = MagicMock()
 55 |         mock_response.raise_for_status = MagicMock()
 56 |         mock_response.json.return_value = {
 57 |             "result": {
 58 |                 "sys_id": "user123",
 59 |                 "user_name": "alice.radiology",
 60 |             }
 61 |         }
 62 |         mock_post.return_value = mock_response
 63 |         
 64 |         # Create test params
 65 |         params = CreateUserParams(
 66 |             user_name="alice.radiology",
 67 |             first_name="Alice",
 68 |             last_name="Radiology",
 69 |             email="[email protected]",
 70 |             department="Radiology",
 71 |             title="Doctor",
 72 |         )
 73 |         
 74 |         # Call function
 75 |         result = create_user(self.config, self.auth_manager, params)
 76 |         
 77 |         # Verify result
 78 |         self.assertTrue(result.success)
 79 |         self.assertEqual(result.user_id, "user123")
 80 |         self.assertEqual(result.user_name, "alice.radiology")
 81 |         
 82 |         # Verify mock was called correctly
 83 |         mock_post.assert_called_once()
 84 |         call_args = mock_post.call_args
 85 |         self.assertEqual(call_args[0][0], f"{self.config.api_url}/table/sys_user")
 86 |         self.assertEqual(call_args[1]["json"]["user_name"], "alice.radiology")
 87 |         self.assertEqual(call_args[1]["json"]["first_name"], "Alice")
 88 |         self.assertEqual(call_args[1]["json"]["last_name"], "Radiology")
 89 |         self.assertEqual(call_args[1]["json"]["email"], "[email protected]")
 90 |         self.assertEqual(call_args[1]["json"]["department"], "Radiology")
 91 |         self.assertEqual(call_args[1]["json"]["title"], "Doctor")
 92 | 
 93 |     @patch("requests.patch")
 94 |     def test_update_user(self, mock_patch):
 95 |         """Test update_user function."""
 96 |         # Configure mock
 97 |         mock_response = MagicMock()
 98 |         mock_response.raise_for_status = MagicMock()
 99 |         mock_response.json.return_value = {
100 |             "result": {
101 |                 "sys_id": "user123",
102 |                 "user_name": "alice.radiology",
103 |             }
104 |         }
105 |         mock_patch.return_value = mock_response
106 |         
107 |         # Create test params
108 |         params = UpdateUserParams(
109 |             user_id="user123",
110 |             manager="user456",
111 |             title="Senior Doctor",
112 |         )
113 |         
114 |         # Call function
115 |         result = update_user(self.config, self.auth_manager, params)
116 |         
117 |         # Verify result
118 |         self.assertTrue(result.success)
119 |         self.assertEqual(result.user_id, "user123")
120 |         self.assertEqual(result.user_name, "alice.radiology")
121 |         
122 |         # Verify mock was called correctly
123 |         mock_patch.assert_called_once()
124 |         call_args = mock_patch.call_args
125 |         self.assertEqual(call_args[0][0], f"{self.config.api_url}/table/sys_user/user123")
126 |         self.assertEqual(call_args[1]["json"]["manager"], "user456")
127 |         self.assertEqual(call_args[1]["json"]["title"], "Senior Doctor")
128 | 
129 |     @patch("requests.get")
130 |     def test_get_user(self, mock_get):
131 |         """Test get_user function."""
132 |         # Configure mock
133 |         mock_response = MagicMock()
134 |         mock_response.raise_for_status = MagicMock()
135 |         mock_response.json.return_value = {
136 |             "result": [
137 |                 {
138 |                     "sys_id": "user123",
139 |                     "user_name": "alice.radiology",
140 |                     "first_name": "Alice",
141 |                     "last_name": "Radiology",
142 |                     "email": "[email protected]",
143 |                 }
144 |             ]
145 |         }
146 |         mock_get.return_value = mock_response
147 |         
148 |         # Create test params
149 |         params = GetUserParams(
150 |             user_name="alice.radiology",
151 |         )
152 |         
153 |         # Call function
154 |         result = get_user(self.config, self.auth_manager, params)
155 |         
156 |         # Verify result
157 |         self.assertTrue(result["success"])
158 |         self.assertEqual(result["user"]["sys_id"], "user123")
159 |         self.assertEqual(result["user"]["user_name"], "alice.radiology")
160 |         
161 |         # Verify mock was called correctly
162 |         mock_get.assert_called_once()
163 |         call_args = mock_get.call_args
164 |         self.assertEqual(call_args[0][0], f"{self.config.api_url}/table/sys_user")
165 |         self.assertEqual(call_args[1]["params"]["sysparm_query"], "user_name=alice.radiology")
166 | 
167 |     @patch("requests.get")
168 |     def test_list_users(self, mock_get):
169 |         """Test list_users function."""
170 |         # Configure mock
171 |         mock_response = MagicMock()
172 |         mock_response.raise_for_status = MagicMock()
173 |         mock_response.json.return_value = {
174 |             "result": [
175 |                 {
176 |                     "sys_id": "user123",
177 |                     "user_name": "alice.radiology",
178 |                 },
179 |                 {
180 |                     "sys_id": "user456",
181 |                     "user_name": "bob.chiefradiology",
182 |                 }
183 |             ]
184 |         }
185 |         mock_get.return_value = mock_response
186 |         
187 |         # Create test params
188 |         params = ListUsersParams(
189 |             department="Radiology",
190 |             limit=10,
191 |         )
192 |         
193 |         # Call function
194 |         result = list_users(self.config, self.auth_manager, params)
195 |         
196 |         # Verify result
197 |         self.assertTrue(result["success"])
198 |         self.assertEqual(len(result["users"]), 2)
199 |         self.assertEqual(result["users"][0]["sys_id"], "user123")
200 |         self.assertEqual(result["users"][1]["sys_id"], "user456")
201 |         
202 |         # Verify mock was called correctly
203 |         mock_get.assert_called_once()
204 |         call_args = mock_get.call_args
205 |         self.assertEqual(call_args[0][0], f"{self.config.api_url}/table/sys_user")
206 |         self.assertEqual(call_args[1]["params"]["sysparm_limit"], "10")
207 |         self.assertIn("department=Radiology", call_args[1]["params"]["sysparm_query"])
208 | 
209 |     @patch("requests.get")
210 |     def test_list_groups(self, mock_get):
211 |         """Test list_groups function."""
212 |         # Configure mock
213 |         mock_response = MagicMock()
214 |         mock_response.raise_for_status = MagicMock()
215 |         mock_response.json.return_value = {
216 |             "result": [
217 |                 {
218 |                     "sys_id": "group123",
219 |                     "name": "IT Support",
220 |                     "description": "IT support team",
221 |                     "active": "true",
222 |                     "type": "it"
223 |                 },
224 |                 {
225 |                     "sys_id": "group456",
226 |                     "name": "HR Team",
227 |                     "description": "Human Resources team",
228 |                     "active": "true",
229 |                     "type": "administrative"
230 |                 }
231 |             ]
232 |         }
233 |         mock_get.return_value = mock_response
234 |         
235 |         # Create test params
236 |         params = ListGroupsParams(
237 |             active=True,
238 |             type="it",
239 |             query="support",
240 |             limit=10,
241 |         )
242 |         
243 |         # Call function
244 |         result = list_groups(self.config, self.auth_manager, params)
245 |         
246 |         # Verify result
247 |         self.assertTrue(result["success"])
248 |         self.assertEqual(len(result["groups"]), 2)
249 |         self.assertEqual(result["groups"][0]["sys_id"], "group123")
250 |         self.assertEqual(result["groups"][1]["sys_id"], "group456")
251 |         self.assertEqual(result["count"], 2)
252 |         
253 |         # Verify mock was called correctly
254 |         mock_get.assert_called_once()
255 |         call_args = mock_get.call_args
256 |         self.assertEqual(call_args[0][0], f"{self.config.api_url}/table/sys_user_group")
257 |         self.assertEqual(call_args[1]["params"]["sysparm_limit"], "10")
258 |         self.assertEqual(call_args[1]["params"]["sysparm_offset"], "0")
259 |         self.assertEqual(call_args[1]["params"]["sysparm_display_value"], "true")
260 |         self.assertIn("active=true", call_args[1]["params"]["sysparm_query"])
261 |         self.assertIn("type=it", call_args[1]["params"]["sysparm_query"])
262 |         self.assertIn("nameLIKE", call_args[1]["params"]["sysparm_query"])
263 |         self.assertIn("descriptionLIKE", call_args[1]["params"]["sysparm_query"])
264 | 
265 |     @patch("requests.post")
266 |     def test_create_group(self, mock_post):
267 |         """Test create_group function."""
268 |         # Configure mock
269 |         mock_response = MagicMock()
270 |         mock_response.raise_for_status = MagicMock()
271 |         mock_response.json.return_value = {
272 |             "result": {
273 |                 "sys_id": "group123",
274 |                 "name": "Biomedical Engineering",
275 |             }
276 |         }
277 |         mock_post.return_value = mock_response
278 |         
279 |         # Create test params
280 |         params = CreateGroupParams(
281 |             name="Biomedical Engineering",
282 |             description="Group for biomedical engineering staff",
283 |             manager="user456",
284 |         )
285 |         
286 |         # Call function
287 |         result = create_group(self.config, self.auth_manager, params)
288 |         
289 |         # Verify result
290 |         self.assertTrue(result.success)
291 |         self.assertEqual(result.group_id, "group123")
292 |         self.assertEqual(result.group_name, "Biomedical Engineering")
293 |         
294 |         # Verify mock was called correctly
295 |         mock_post.assert_called_once()
296 |         call_args = mock_post.call_args
297 |         self.assertEqual(call_args[0][0], f"{self.config.api_url}/table/sys_user_group")
298 |         self.assertEqual(call_args[1]["json"]["name"], "Biomedical Engineering")
299 |         self.assertEqual(call_args[1]["json"]["description"], "Group for biomedical engineering staff")
300 |         self.assertEqual(call_args[1]["json"]["manager"], "user456")
301 | 
302 |     @patch("requests.patch")
303 |     def test_update_group(self, mock_patch):
304 |         """Test update_group function."""
305 |         # Configure mock
306 |         mock_response = MagicMock()
307 |         mock_response.raise_for_status = MagicMock()
308 |         mock_response.json.return_value = {
309 |             "result": {
310 |                 "sys_id": "group123",
311 |                 "name": "Biomedical Engineering",
312 |             }
313 |         }
314 |         mock_patch.return_value = mock_response
315 |         
316 |         # Create test params
317 |         params = UpdateGroupParams(
318 |             group_id="group123",
319 |             description="Updated description for biomedical engineering group",
320 |             manager="user789",
321 |         )
322 |         
323 |         # Call function
324 |         result = update_group(self.config, self.auth_manager, params)
325 |         
326 |         # Verify result
327 |         self.assertTrue(result.success)
328 |         self.assertEqual(result.group_id, "group123")
329 |         self.assertEqual(result.group_name, "Biomedical Engineering")
330 |         
331 |         # Verify mock was called correctly
332 |         mock_patch.assert_called_once()
333 |         call_args = mock_patch.call_args
334 |         self.assertEqual(call_args[0][0], f"{self.config.api_url}/table/sys_user_group/group123")
335 |         self.assertEqual(call_args[1]["json"]["description"], "Updated description for biomedical engineering group")
336 |         self.assertEqual(call_args[1]["json"]["manager"], "user789")
337 | 
338 |     @patch("servicenow_mcp.tools.user_tools.get_user")
339 |     @patch("requests.post")
340 |     def test_add_group_members(self, mock_post, mock_get_user):
341 |         """Test add_group_members function."""
342 |         # Configure mocks
343 |         mock_post_response = MagicMock()
344 |         mock_post_response.raise_for_status = MagicMock()
345 |         mock_post.return_value = mock_post_response
346 |         
347 |         mock_get_user.return_value = {
348 |             "success": True,
349 |             "message": "User found",
350 |             "user": {
351 |                 "sys_id": "user123",
352 |                 "user_name": "alice.radiology",
353 |             }
354 |         }
355 |         
356 |         # Create test params
357 |         params = AddGroupMembersParams(
358 |             group_id="group123",
359 |             members=["alice.radiology", "admin"],
360 |         )
361 |         
362 |         # Call function
363 |         result = add_group_members(self.config, self.auth_manager, params)
364 |         
365 |         # Verify result
366 |         self.assertTrue(result.success)
367 |         self.assertEqual(result.group_id, "group123")
368 |         
369 |         # Verify mock was called correctly
370 |         self.assertEqual(mock_post.call_count, 2)  # Once for each member
371 |         call_args = mock_post.call_args_list[0]
372 |         self.assertEqual(call_args[0][0], f"{self.config.api_url}/table/sys_user_grmember")
373 |         self.assertEqual(call_args[1]["json"]["group"], "group123")
374 |         self.assertEqual(call_args[1]["json"]["user"], "user123")
375 | 
376 |     @patch("servicenow_mcp.tools.user_tools.get_user")
377 |     @patch("requests.get")
378 |     @patch("requests.delete")
379 |     def test_remove_group_members(self, mock_delete, mock_get, mock_get_user):
380 |         """Test remove_group_members function."""
381 |         # Configure mocks
382 |         mock_delete_response = MagicMock()
383 |         mock_delete_response.raise_for_status = MagicMock()
384 |         mock_delete.return_value = mock_delete_response
385 |         
386 |         mock_get_response = MagicMock()
387 |         mock_get_response.raise_for_status = MagicMock()
388 |         mock_get_response.json.return_value = {
389 |             "result": [
390 |                 {
391 |                     "sys_id": "member123",
392 |                     "user": {
393 |                         "value": "user123",
394 |                         "display_value": "Alice Radiology",
395 |                     },
396 |                     "group": {
397 |                         "value": "group123",
398 |                         "display_value": "Biomedical Engineering",
399 |                     },
400 |                 }
401 |             ]
402 |         }
403 |         mock_get.return_value = mock_get_response
404 |         
405 |         mock_get_user.return_value = {
406 |             "success": True,
407 |             "message": "User found",
408 |             "user": {
409 |                 "sys_id": "user123",
410 |                 "user_name": "alice.radiology",
411 |             }
412 |         }
413 |         
414 |         # Create test params
415 |         params = RemoveGroupMembersParams(
416 |             group_id="group123",
417 |             members=["alice.radiology"],
418 |         )
419 |         
420 |         # Call function
421 |         result = remove_group_members(self.config, self.auth_manager, params)
422 |         
423 |         # Verify result
424 |         self.assertTrue(result.success)
425 |         self.assertEqual(result.group_id, "group123")
426 |         
427 |         # Verify mock was called correctly
428 |         mock_get.assert_called_once()
429 |         get_call_args = mock_get.call_args
430 |         self.assertEqual(get_call_args[0][0], f"{self.config.api_url}/table/sys_user_grmember")
431 |         self.assertEqual(get_call_args[1]["params"]["sysparm_query"], "group=group123^user=user123")
432 |         
433 |         mock_delete.assert_called_once()
434 |         delete_call_args = mock_delete.call_args
435 |         self.assertEqual(delete_call_args[0][0], f"{self.config.api_url}/table/sys_user_grmember/member123")
436 | 
437 | 
438 | if __name__ == "__main__":
439 |     unittest.main() 
```

--------------------------------------------------------------------------------
/tests/test_catalog_tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Tests for the ServiceNow MCP catalog tools.
  3 | """
  4 | 
  5 | import unittest
  6 | from unittest.mock import MagicMock, patch
  7 | 
  8 | import requests
  9 | 
 10 | from servicenow_mcp.auth.auth_manager import AuthManager
 11 | from servicenow_mcp.tools.catalog_tools import (
 12 |     GetCatalogItemParams,
 13 |     ListCatalogCategoriesParams,
 14 |     ListCatalogItemsParams,
 15 |     CreateCatalogCategoryParams,
 16 |     UpdateCatalogCategoryParams,
 17 |     MoveCatalogItemsParams,
 18 |     get_catalog_item,
 19 |     get_catalog_item_variables,
 20 |     list_catalog_categories,
 21 |     list_catalog_items,
 22 |     create_catalog_category,
 23 |     update_catalog_category,
 24 |     move_catalog_items,
 25 | )
 26 | from servicenow_mcp.utils.config import AuthConfig, AuthType, BasicAuthConfig, ServerConfig
 27 | 
 28 | 
 29 | class TestCatalogTools(unittest.TestCase):
 30 |     """Test cases for the catalog tools."""
 31 | 
 32 |     def setUp(self):
 33 |         """Set up test fixtures."""
 34 |         # Create a mock server config
 35 |         self.config = ServerConfig(
 36 |             instance_url="https://example.service-now.com",
 37 |             auth=AuthConfig(
 38 |                 type=AuthType.BASIC,
 39 |                 basic=BasicAuthConfig(username="admin", password="password"),
 40 |             ),
 41 |         )
 42 | 
 43 |         # Create a mock auth manager
 44 |         self.auth_manager = MagicMock(spec=AuthManager)
 45 |         self.auth_manager.get_headers.return_value = {"Authorization": "Basic YWRtaW46cGFzc3dvcmQ="}
 46 | 
 47 |     @patch("servicenow_mcp.tools.catalog_tools.requests.get")
 48 |     def test_list_catalog_items(self, mock_get):
 49 |         """Test listing catalog items."""
 50 |         # Mock the response from ServiceNow
 51 |         mock_response = MagicMock()
 52 |         mock_response.json.return_value = {
 53 |             "result": [
 54 |                 {
 55 |                     "sys_id": "item1",
 56 |                     "name": "Laptop",
 57 |                     "short_description": "Request a new laptop",
 58 |                     "category": "Hardware",
 59 |                     "price": "1000",
 60 |                     "picture": "laptop.jpg",
 61 |                     "active": "true",
 62 |                     "order": "100",
 63 |                 }
 64 |             ]
 65 |         }
 66 |         mock_response.raise_for_status = MagicMock()
 67 |         mock_get.return_value = mock_response
 68 | 
 69 |         # Call the function
 70 |         params = ListCatalogItemsParams(
 71 |             limit=10,
 72 |             offset=0,
 73 |             category="Hardware",
 74 |             query="laptop",
 75 |             active=True,
 76 |         )
 77 |         result = list_catalog_items(self.config, self.auth_manager, params)
 78 | 
 79 |         # Check the result
 80 |         self.assertTrue(result["success"])
 81 |         self.assertEqual(len(result["items"]), 1)
 82 |         self.assertEqual(result["items"][0]["name"], "Laptop")
 83 |         self.assertEqual(result["items"][0]["category"], "Hardware")
 84 | 
 85 |         # Check that the correct URL and parameters were used
 86 |         mock_get.assert_called_once()
 87 |         args, kwargs = mock_get.call_args
 88 |         self.assertEqual(args[0], "https://example.service-now.com/api/now/table/sc_cat_item")
 89 |         self.assertEqual(kwargs["params"]["sysparm_limit"], 10)
 90 |         self.assertEqual(kwargs["params"]["sysparm_offset"], 0)
 91 |         self.assertIn("sysparm_query", kwargs["params"])
 92 |         self.assertIn("active=true", kwargs["params"]["sysparm_query"])
 93 |         self.assertIn("category=Hardware", kwargs["params"]["sysparm_query"])
 94 |         self.assertIn("short_descriptionLIKElaptop^ORnameLIKElaptop", kwargs["params"]["sysparm_query"])
 95 | 
 96 |     @patch("servicenow_mcp.tools.catalog_tools.requests.get")
 97 |     def test_list_catalog_items_error(self, mock_get):
 98 |         """Test listing catalog items with an error."""
 99 |         # Mock the response from ServiceNow
100 |         mock_get.side_effect = requests.exceptions.RequestException("Error")
101 | 
102 |         # Call the function
103 |         params = ListCatalogItemsParams(
104 |             limit=10,
105 |             offset=0,
106 |         )
107 |         result = list_catalog_items(self.config, self.auth_manager, params)
108 | 
109 |         # Check the result
110 |         self.assertFalse(result["success"])
111 |         self.assertEqual(len(result["items"]), 0)
112 |         self.assertIn("Error", result["message"])
113 | 
114 |     @patch("servicenow_mcp.tools.catalog_tools.get_catalog_item_variables")
115 |     @patch("servicenow_mcp.tools.catalog_tools.requests.get")
116 |     def test_get_catalog_item(self, mock_get, mock_get_variables):
117 |         """Test getting a specific catalog item."""
118 |         # Mock the response from ServiceNow
119 |         mock_response = MagicMock()
120 |         mock_response.json.return_value = {
121 |             "result": {
122 |                 "sys_id": "item1",
123 |                 "name": "Laptop",
124 |                 "short_description": "Request a new laptop",
125 |                 "description": "Request a new laptop for work",
126 |                 "category": "Hardware",
127 |                 "price": "1000",
128 |                 "picture": "laptop.jpg",
129 |                 "active": "true",
130 |                 "order": "100",
131 |                 "delivery_time": "3 days",
132 |                 "availability": "In Stock",
133 |             }
134 |         }
135 |         mock_response.raise_for_status = MagicMock()
136 |         mock_get.return_value = mock_response
137 | 
138 |         # Mock the variables
139 |         mock_get_variables.return_value = [
140 |             {
141 |                 "sys_id": "var1",
142 |                 "name": "model",
143 |                 "label": "Laptop Model",
144 |                 "type": "string",
145 |                 "mandatory": "true",
146 |                 "default_value": "MacBook Pro",
147 |                 "help_text": "Select the laptop model",
148 |                 "order": "100",
149 |             }
150 |         ]
151 | 
152 |         # Call the function
153 |         params = GetCatalogItemParams(item_id="item1")
154 |         result = get_catalog_item(self.config, self.auth_manager, params)
155 | 
156 |         # Check the result
157 |         self.assertTrue(result.success)
158 |         self.assertEqual(result.data["name"], "Laptop")
159 |         self.assertEqual(result.data["category"], "Hardware")
160 |         self.assertEqual(len(result.data["variables"]), 1)
161 |         self.assertEqual(result.data["variables"][0]["name"], "model")
162 | 
163 |         # Check that the correct URL and parameters were used
164 |         mock_get.assert_called_once()
165 |         args, kwargs = mock_get.call_args
166 |         self.assertEqual(args[0], "https://example.service-now.com/api/now/table/sc_cat_item/item1")
167 | 
168 |     @patch("servicenow_mcp.tools.catalog_tools.requests.get")
169 |     def test_get_catalog_item_not_found(self, mock_get):
170 |         """Test getting a catalog item that doesn't exist."""
171 |         # Mock the response from ServiceNow
172 |         mock_response = MagicMock()
173 |         mock_response.json.return_value = {"result": {}}
174 |         mock_response.raise_for_status = MagicMock()
175 |         mock_get.return_value = mock_response
176 | 
177 |         # Call the function
178 |         params = GetCatalogItemParams(item_id="nonexistent")
179 |         result = get_catalog_item(self.config, self.auth_manager, params)
180 | 
181 |         # Check the result
182 |         self.assertFalse(result.success)
183 |         self.assertIn("not found", result.message)
184 |         self.assertIsNone(result.data)
185 | 
186 |     @patch("servicenow_mcp.tools.catalog_tools.requests.get")
187 |     def test_get_catalog_item_error(self, mock_get):
188 |         """Test getting a catalog item with an error."""
189 |         # Mock the response from ServiceNow
190 |         mock_get.side_effect = requests.exceptions.RequestException("Error")
191 | 
192 |         # Call the function
193 |         params = GetCatalogItemParams(item_id="item1")
194 |         result = get_catalog_item(self.config, self.auth_manager, params)
195 | 
196 |         # Check the result
197 |         self.assertFalse(result.success)
198 |         self.assertIn("Error", result.message)
199 |         self.assertIsNone(result.data)
200 | 
201 |     @patch("servicenow_mcp.tools.catalog_tools.requests.get")
202 |     def test_get_catalog_item_variables(self, mock_get):
203 |         """Test getting variables for a catalog item."""
204 |         # Mock the response from ServiceNow
205 |         mock_response = MagicMock()
206 |         mock_response.json.return_value = {
207 |             "result": [
208 |                 {
209 |                     "sys_id": "var1",
210 |                     "name": "model",
211 |                     "question_text": "Laptop Model",
212 |                     "type": "string",
213 |                     "mandatory": "true",
214 |                     "default_value": "MacBook Pro",
215 |                     "help_text": "Select the laptop model",
216 |                     "order": "100",
217 |                 }
218 |             ]
219 |         }
220 |         mock_response.raise_for_status = MagicMock()
221 |         mock_get.return_value = mock_response
222 | 
223 |         # Call the function
224 |         result = get_catalog_item_variables(self.config, self.auth_manager, "item1")
225 | 
226 |         # Check the result
227 |         self.assertEqual(len(result), 1)
228 |         self.assertEqual(result[0]["name"], "model")
229 |         self.assertEqual(result[0]["label"], "Laptop Model")
230 |         self.assertEqual(result[0]["type"], "string")
231 | 
232 |         # Check that the correct URL and parameters were used
233 |         mock_get.assert_called_once()
234 |         args, kwargs = mock_get.call_args
235 |         self.assertEqual(args[0], "https://example.service-now.com/api/now/table/item_option_new")
236 |         self.assertEqual(kwargs["params"]["sysparm_query"], "cat_item=item1^ORDERBYorder")
237 | 
238 |     @patch("servicenow_mcp.tools.catalog_tools.requests.get")
239 |     def test_get_catalog_item_variables_error(self, mock_get):
240 |         """Test getting variables for a catalog item with an error."""
241 |         # Mock the response from ServiceNow
242 |         mock_get.side_effect = requests.exceptions.RequestException("Error")
243 | 
244 |         # Call the function
245 |         result = get_catalog_item_variables(self.config, self.auth_manager, "item1")
246 | 
247 |         # Check the result
248 |         self.assertEqual(len(result), 0)
249 | 
250 |     @patch("servicenow_mcp.tools.catalog_tools.requests.get")
251 |     def test_list_catalog_categories(self, mock_get):
252 |         """Test listing catalog categories."""
253 |         # Mock the response from ServiceNow
254 |         mock_response = MagicMock()
255 |         mock_response.json.return_value = {
256 |             "result": [
257 |                 {
258 |                     "sys_id": "cat1",
259 |                     "title": "Hardware",
260 |                     "description": "Hardware requests",
261 |                     "parent": "",
262 |                     "icon": "hardware.png",
263 |                     "active": "true",
264 |                     "order": "100",
265 |                 }
266 |             ]
267 |         }
268 |         mock_response.raise_for_status = MagicMock()
269 |         mock_get.return_value = mock_response
270 | 
271 |         # Call the function
272 |         params = ListCatalogCategoriesParams(
273 |             limit=10,
274 |             offset=0,
275 |             query="hardware",
276 |             active=True,
277 |         )
278 |         result = list_catalog_categories(self.config, self.auth_manager, params)
279 | 
280 |         # Check the result
281 |         self.assertTrue(result["success"])
282 |         self.assertEqual(len(result["categories"]), 1)
283 |         self.assertEqual(result["categories"][0]["title"], "Hardware")
284 |         self.assertEqual(result["categories"][0]["description"], "Hardware requests")
285 | 
286 |         # Check that the correct URL and parameters were used
287 |         mock_get.assert_called_once()
288 |         args, kwargs = mock_get.call_args
289 |         self.assertEqual(args[0], "https://example.service-now.com/api/now/table/sc_category")
290 |         self.assertEqual(kwargs["params"]["sysparm_limit"], 10)
291 |         self.assertEqual(kwargs["params"]["sysparm_offset"], 0)
292 |         self.assertIn("sysparm_query", kwargs["params"])
293 |         self.assertIn("active=true", kwargs["params"]["sysparm_query"])
294 |         self.assertIn("titleLIKEhardware^ORdescriptionLIKEhardware", kwargs["params"]["sysparm_query"])
295 | 
296 |     @patch("servicenow_mcp.tools.catalog_tools.requests.get")
297 |     def test_list_catalog_categories_error(self, mock_get):
298 |         """Test listing catalog categories with an error."""
299 |         # Mock the response from ServiceNow
300 |         mock_get.side_effect = requests.exceptions.RequestException("Error")
301 | 
302 |         # Call the function
303 |         params = ListCatalogCategoriesParams(
304 |             limit=10,
305 |             offset=0,
306 |         )
307 |         result = list_catalog_categories(self.config, self.auth_manager, params)
308 | 
309 |         # Check the result
310 |         self.assertFalse(result["success"])
311 |         self.assertEqual(len(result["categories"]), 0)
312 |         self.assertIn("Error", result["message"])
313 | 
314 |     @patch("requests.post")
315 |     def test_create_catalog_category(self, mock_post):
316 |         """Test creating a catalog category."""
317 |         # Mock response
318 |         mock_response = MagicMock()
319 |         mock_response.json.return_value = {
320 |             "result": {
321 |                 "sys_id": "test_sys_id",
322 |                 "title": "Test Category",
323 |                 "description": "Test Description",
324 |                 "parent": "",
325 |                 "icon": "icon-test",
326 |                 "active": "true",
327 |                 "order": "100",
328 |             }
329 |         }
330 |         mock_post.return_value = mock_response
331 | 
332 |         # Create params
333 |         params = CreateCatalogCategoryParams(
334 |             title="Test Category",
335 |             description="Test Description",
336 |             icon="icon-test",
337 |             active=True,
338 |             order=100,
339 |         )
340 | 
341 |         # Call function
342 |         result = create_catalog_category(self.config, self.auth_manager, params)
343 | 
344 |         # Verify result
345 |         self.assertTrue(result.success)
346 |         self.assertEqual(result.data["title"], "Test Category")
347 |         self.assertEqual(result.data["sys_id"], "test_sys_id")
348 | 
349 |         # Verify request
350 |         mock_post.assert_called_once()
351 |         args, kwargs = mock_post.call_args
352 |         self.assertEqual(args[0], "https://example.service-now.com/api/now/table/sc_category")
353 |         self.assertEqual(kwargs["json"]["title"], "Test Category")
354 |         self.assertEqual(kwargs["json"]["description"], "Test Description")
355 | 
356 |     @patch("requests.patch")
357 |     def test_update_catalog_category(self, mock_patch):
358 |         """Test updating a catalog category."""
359 |         # Mock response
360 |         mock_response = MagicMock()
361 |         mock_response.json.return_value = {
362 |             "result": {
363 |                 "sys_id": "test_sys_id",
364 |                 "title": "Updated Category",
365 |                 "description": "Updated Description",
366 |                 "parent": "",
367 |                 "icon": "icon-test",
368 |                 "active": "true",
369 |                 "order": "200",
370 |             }
371 |         }
372 |         mock_patch.return_value = mock_response
373 | 
374 |         # Create params
375 |         params = UpdateCatalogCategoryParams(
376 |             category_id="test_sys_id",
377 |             title="Updated Category",
378 |             description="Updated Description",
379 |             order=200,
380 |         )
381 | 
382 |         # Call function
383 |         result = update_catalog_category(self.config, self.auth_manager, params)
384 | 
385 |         # Verify result
386 |         self.assertTrue(result.success)
387 |         self.assertEqual(result.data["title"], "Updated Category")
388 |         self.assertEqual(result.data["description"], "Updated Description")
389 |         self.assertEqual(result.data["order"], "200")
390 | 
391 |         # Verify request
392 |         mock_patch.assert_called_once()
393 |         args, kwargs = mock_patch.call_args
394 |         self.assertEqual(args[0], "https://example.service-now.com/api/now/table/sc_category/test_sys_id")
395 |         self.assertEqual(kwargs["json"]["title"], "Updated Category")
396 |         self.assertEqual(kwargs["json"]["description"], "Updated Description")
397 |         self.assertEqual(kwargs["json"]["order"], "200")
398 | 
399 |     @patch("requests.patch")
400 |     def test_move_catalog_items(self, mock_patch):
401 |         """Test moving catalog items."""
402 |         # Mock response
403 |         mock_response = MagicMock()
404 |         mock_response.json.return_value = {"result": {"sys_id": "item_id", "category": "target_category_id"}}
405 |         mock_patch.return_value = mock_response
406 | 
407 |         # Create params
408 |         params = MoveCatalogItemsParams(
409 |             item_ids=["item1", "item2", "item3"],
410 |             target_category_id="target_category_id",
411 |         )
412 | 
413 |         # Call function
414 |         result = move_catalog_items(self.config, self.auth_manager, params)
415 | 
416 |         # Verify result
417 |         self.assertTrue(result.success)
418 |         self.assertEqual(result.data["moved_items_count"], 3)
419 | 
420 |         # Verify request
421 |         self.assertEqual(mock_patch.call_count, 3)
422 |         for i, call in enumerate(mock_patch.call_args_list):
423 |             args, kwargs = call
424 |             self.assertEqual(
425 |                 args[0], 
426 |                 f"https://example.service-now.com/api/now/table/sc_cat_item/{params.item_ids[i]}"
427 |             )
428 |             self.assertEqual(kwargs["json"]["category"], "target_category_id")
429 | 
430 | 
431 | if __name__ == "__main__":
432 |     unittest.main() 
```

--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/script_include_tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Script Include tools for the ServiceNow MCP server.
  3 | 
  4 | This module provides tools for managing script includes in ServiceNow.
  5 | """
  6 | 
  7 | import logging
  8 | from typing import Any, Dict, Optional
  9 | 
 10 | import requests
 11 | from pydantic import BaseModel, Field
 12 | 
 13 | from servicenow_mcp.auth.auth_manager import AuthManager
 14 | from servicenow_mcp.utils.config import ServerConfig
 15 | 
 16 | logger = logging.getLogger(__name__)
 17 | 
 18 | 
 19 | class ListScriptIncludesParams(BaseModel):
 20 |     """Parameters for listing script includes."""
 21 |     
 22 |     limit: int = Field(10, description="Maximum number of script includes to return")
 23 |     offset: int = Field(0, description="Offset for pagination")
 24 |     active: Optional[bool] = Field(None, description="Filter by active status")
 25 |     client_callable: Optional[bool] = Field(None, description="Filter by client callable status")
 26 |     query: Optional[str] = Field(None, description="Search query for script includes")
 27 | 
 28 | 
 29 | class GetScriptIncludeParams(BaseModel):
 30 |     """Parameters for getting a script include."""
 31 |     
 32 |     script_include_id: str = Field(..., description="Script include ID or name")
 33 | 
 34 | 
 35 | class CreateScriptIncludeParams(BaseModel):
 36 |     """Parameters for creating a script include."""
 37 |     
 38 |     name: str = Field(..., description="Name of the script include")
 39 |     script: str = Field(..., description="Script content")
 40 |     description: Optional[str] = Field(None, description="Description of the script include")
 41 |     api_name: Optional[str] = Field(None, description="API name of the script include")
 42 |     client_callable: bool = Field(False, description="Whether the script include is client callable")
 43 |     active: bool = Field(True, description="Whether the script include is active")
 44 |     access: str = Field("package_private", description="Access level of the script include")
 45 | 
 46 | 
 47 | class UpdateScriptIncludeParams(BaseModel):
 48 |     """Parameters for updating a script include."""
 49 |     
 50 |     script_include_id: str = Field(..., description="Script include ID or name")
 51 |     script: Optional[str] = Field(None, description="Script content")
 52 |     description: Optional[str] = Field(None, description="Description of the script include")
 53 |     api_name: Optional[str] = Field(None, description="API name of the script include")
 54 |     client_callable: Optional[bool] = Field(None, description="Whether the script include is client callable")
 55 |     active: Optional[bool] = Field(None, description="Whether the script include is active")
 56 |     access: Optional[str] = Field(None, description="Access level of the script include")
 57 | 
 58 | 
 59 | class DeleteScriptIncludeParams(BaseModel):
 60 |     """Parameters for deleting a script include."""
 61 |     
 62 |     script_include_id: str = Field(..., description="Script include ID or name")
 63 | 
 64 | 
 65 | class ScriptIncludeResponse(BaseModel):
 66 |     """Response from script include operations."""
 67 |     
 68 |     success: bool = Field(..., description="Whether the operation was successful")
 69 |     message: str = Field(..., description="Message describing the result")
 70 |     script_include_id: Optional[str] = Field(None, description="ID of the affected script include")
 71 |     script_include_name: Optional[str] = Field(None, description="Name of the affected script include")
 72 | 
 73 | 
 74 | def list_script_includes(
 75 |     config: ServerConfig,
 76 |     auth_manager: AuthManager,
 77 |     params: ListScriptIncludesParams,
 78 | ) -> Dict[str, Any]:
 79 |     """List script includes from ServiceNow.
 80 |     
 81 |     Args:
 82 |         config: The server configuration.
 83 |         auth_manager: The authentication manager.
 84 |         params: The parameters for the request.
 85 |         
 86 |     Returns:
 87 |         A dictionary containing the list of script includes.
 88 |     """
 89 |     try:
 90 |         # Build the URL
 91 |         url = f"{config.instance_url}/api/now/table/sys_script_include"
 92 |         
 93 |         # Build query parameters
 94 |         query_params = {
 95 |             "sysparm_limit": params.limit,
 96 |             "sysparm_offset": params.offset,
 97 |             "sysparm_display_value": "true",
 98 |             "sysparm_exclude_reference_link": "true",
 99 |             "sysparm_fields": "sys_id,name,script,description,api_name,client_callable,active,access,sys_created_on,sys_updated_on,sys_created_by,sys_updated_by"
100 |         }
101 |         
102 |         # Add filters if provided
103 |         query_parts = []
104 |         
105 |         if params.active is not None:
106 |             query_parts.append(f"active={str(params.active).lower()}")
107 |             
108 |         if params.client_callable is not None:
109 |             query_parts.append(f"client_callable={str(params.client_callable).lower()}")
110 |             
111 |         if params.query:
112 |             query_parts.append(f"nameLIKE{params.query}")
113 |             
114 |         if query_parts:
115 |             query_params["sysparm_query"] = "^".join(query_parts)
116 |             
117 |         # Make the request
118 |         headers = auth_manager.get_headers()
119 |         
120 |         response = requests.get(
121 |             url,
122 |             params=query_params,
123 |             headers=headers,
124 |             timeout=30,
125 |         )
126 |         response.raise_for_status()
127 |         
128 |         # Parse the response
129 |         data = response.json()
130 |         script_includes = []
131 |         
132 |         for item in data.get("result", []):
133 |             script_include = {
134 |                 "sys_id": item.get("sys_id"),
135 |                 "name": item.get("name"),
136 |                 "description": item.get("description"),
137 |                 "api_name": item.get("api_name"),
138 |                 "client_callable": item.get("client_callable") == "true",
139 |                 "active": item.get("active") == "true",
140 |                 "access": item.get("access"),
141 |                 "created_on": item.get("sys_created_on"),
142 |                 "updated_on": item.get("sys_updated_on"),
143 |                 "created_by": item.get("sys_created_by", {}).get("display_value"),
144 |                 "updated_by": item.get("sys_updated_by", {}).get("display_value"),
145 |             }
146 |             script_includes.append(script_include)
147 |             
148 |         return {
149 |             "success": True,
150 |             "message": f"Found {len(script_includes)} script includes",
151 |             "script_includes": script_includes,
152 |             "total": len(script_includes),
153 |             "limit": params.limit,
154 |             "offset": params.offset,
155 |         }
156 |         
157 |     except Exception as e:
158 |         logger.error(f"Error listing script includes: {e}")
159 |         return {
160 |             "success": False,
161 |             "message": f"Error listing script includes: {str(e)}",
162 |             "script_includes": [],
163 |             "total": 0,
164 |             "limit": params.limit,
165 |             "offset": params.offset,
166 |         }
167 | 
168 | 
169 | def get_script_include(
170 |     config: ServerConfig,
171 |     auth_manager: AuthManager,
172 |     params: GetScriptIncludeParams,
173 | ) -> Dict[str, Any]:
174 |     """Get a specific script include from ServiceNow.
175 |     
176 |     Args:
177 |         config: The server configuration.
178 |         auth_manager: The authentication manager.
179 |         params: The parameters for the request.
180 |         
181 |     Returns:
182 |         A dictionary containing the script include data.
183 |     """
184 |     try:
185 |         # Build query parameters
186 |         query_params = {
187 |             "sysparm_display_value": "true",
188 |             "sysparm_exclude_reference_link": "true",
189 |             "sysparm_fields": "sys_id,name,script,description,api_name,client_callable,active,access,sys_created_on,sys_updated_on,sys_created_by,sys_updated_by"
190 |         }
191 |         
192 |         # Determine if we're querying by sys_id or name
193 |         if params.script_include_id.startswith("sys_id:"):
194 |             sys_id = params.script_include_id.replace("sys_id:", "")
195 |             url = f"{config.instance_url}/api/now/table/sys_script_include/{sys_id}"
196 |         else:
197 |             # Query by name
198 |             url = f"{config.instance_url}/api/now/table/sys_script_include"
199 |             query_params["sysparm_query"] = f"name={params.script_include_id}"
200 |             
201 |         # Make the request
202 |         headers = auth_manager.get_headers()
203 |         
204 |         response = requests.get(
205 |             url,
206 |             params=query_params,
207 |             headers=headers,
208 |             timeout=30,
209 |         )
210 |         response.raise_for_status()
211 |         
212 |         # Parse the response
213 |         data = response.json()
214 |         
215 |         if "result" not in data:
216 |             return {
217 |                 "success": False,
218 |                 "message": f"Script include not found: {params.script_include_id}",
219 |             }
220 |             
221 |         # Handle both single result and list of results
222 |         result = data["result"]
223 |         if isinstance(result, list):
224 |             if not result:
225 |                 return {
226 |                     "success": False,
227 |                     "message": f"Script include not found: {params.script_include_id}",
228 |                 }
229 |             item = result[0]
230 |         else:
231 |             item = result
232 |             
233 |         script_include = {
234 |             "sys_id": item.get("sys_id"),
235 |             "name": item.get("name"),
236 |             "script": item.get("script"),
237 |             "description": item.get("description"),
238 |             "api_name": item.get("api_name"),
239 |             "client_callable": item.get("client_callable") == "true",
240 |             "active": item.get("active") == "true",
241 |             "access": item.get("access"),
242 |             "created_on": item.get("sys_created_on"),
243 |             "updated_on": item.get("sys_updated_on"),
244 |             "created_by": item.get("sys_created_by", {}).get("display_value"),
245 |             "updated_by": item.get("sys_updated_by", {}).get("display_value"),
246 |         }
247 |         
248 |         return {
249 |             "success": True,
250 |             "message": f"Found script include: {item.get('name')}",
251 |             "script_include": script_include,
252 |         }
253 |         
254 |     except Exception as e:
255 |         logger.error(f"Error getting script include: {e}")
256 |         return {
257 |             "success": False,
258 |             "message": f"Error getting script include: {str(e)}",
259 |         }
260 | 
261 | 
262 | def create_script_include(
263 |     config: ServerConfig,
264 |     auth_manager: AuthManager,
265 |     params: CreateScriptIncludeParams,
266 | ) -> ScriptIncludeResponse:
267 |     """Create a new script include in ServiceNow.
268 |     
269 |     Args:
270 |         config: The server configuration.
271 |         auth_manager: The authentication manager.
272 |         params: The parameters for the request.
273 |         
274 |     Returns:
275 |         A response indicating the result of the operation.
276 |     """
277 |     # Build the URL
278 |     url = f"{config.instance_url}/api/now/table/sys_script_include"
279 |     
280 |     # Build the request body
281 |     body = {
282 |         "name": params.name,
283 |         "script": params.script,
284 |         "active": str(params.active).lower(),
285 |         "client_callable": str(params.client_callable).lower(),
286 |         "access": params.access,
287 |     }
288 |     
289 |     if params.description:
290 |         body["description"] = params.description
291 |         
292 |     if params.api_name:
293 |         body["api_name"] = params.api_name
294 |         
295 |     # Make the request
296 |     headers = auth_manager.get_headers()
297 |     
298 |     try:
299 |         response = requests.post(
300 |             url,
301 |             json=body,
302 |             headers=headers,
303 |             timeout=30,
304 |         )
305 |         response.raise_for_status()
306 |         
307 |         # Parse the response
308 |         data = response.json()
309 |         
310 |         if "result" not in data:
311 |             return ScriptIncludeResponse(
312 |                 success=False,
313 |                 message="Failed to create script include",
314 |             )
315 |             
316 |         result = data["result"]
317 |         
318 |         return ScriptIncludeResponse(
319 |             success=True,
320 |             message=f"Created script include: {result.get('name')}",
321 |             script_include_id=result.get("sys_id"),
322 |             script_include_name=result.get("name"),
323 |         )
324 |         
325 |     except Exception as e:
326 |         logger.error(f"Error creating script include: {e}")
327 |         return ScriptIncludeResponse(
328 |             success=False,
329 |             message=f"Error creating script include: {str(e)}",
330 |         )
331 | 
332 | 
333 | def update_script_include(
334 |     config: ServerConfig,
335 |     auth_manager: AuthManager,
336 |     params: UpdateScriptIncludeParams,
337 | ) -> ScriptIncludeResponse:
338 |     """Update an existing script include in ServiceNow.
339 |     
340 |     Args:
341 |         config: The server configuration.
342 |         auth_manager: The authentication manager.
343 |         params: The parameters for the request.
344 |         
345 |     Returns:
346 |         A response indicating the result of the operation.
347 |     """
348 |     # First, get the script include to update
349 |     get_params = GetScriptIncludeParams(script_include_id=params.script_include_id)
350 |     get_result = get_script_include(config, auth_manager, get_params)
351 |     
352 |     if not get_result["success"]:
353 |         return ScriptIncludeResponse(
354 |             success=False,
355 |             message=get_result["message"],
356 |         )
357 |         
358 |     script_include = get_result["script_include"]
359 |     sys_id = script_include["sys_id"]
360 |     
361 |     # Build the URL
362 |     url = f"{config.instance_url}/api/now/table/sys_script_include/{sys_id}"
363 |     
364 |     # Build the request body
365 |     body = {}
366 |     
367 |     if params.script is not None:
368 |         body["script"] = params.script
369 |         
370 |     if params.description is not None:
371 |         body["description"] = params.description
372 |         
373 |     if params.api_name is not None:
374 |         body["api_name"] = params.api_name
375 |         
376 |     if params.client_callable is not None:
377 |         body["client_callable"] = str(params.client_callable).lower()
378 |         
379 |     if params.active is not None:
380 |         body["active"] = str(params.active).lower()
381 |         
382 |     if params.access is not None:
383 |         body["access"] = params.access
384 |         
385 |     # If no fields to update, return success
386 |     if not body:
387 |         return ScriptIncludeResponse(
388 |             success=True,
389 |             message=f"No changes to update for script include: {script_include['name']}",
390 |             script_include_id=sys_id,
391 |             script_include_name=script_include["name"],
392 |         )
393 |         
394 |     # Make the request
395 |     headers = auth_manager.get_headers()
396 |     
397 |     try:
398 |         response = requests.patch(
399 |             url,
400 |             json=body,
401 |             headers=headers,
402 |             timeout=30,
403 |         )
404 |         response.raise_for_status()
405 |         
406 |         # Parse the response
407 |         data = response.json()
408 |         
409 |         if "result" not in data:
410 |             return ScriptIncludeResponse(
411 |                 success=False,
412 |                 message=f"Failed to update script include: {script_include['name']}",
413 |             )
414 |             
415 |         result = data["result"]
416 |         
417 |         return ScriptIncludeResponse(
418 |             success=True,
419 |             message=f"Updated script include: {result.get('name')}",
420 |             script_include_id=result.get("sys_id"),
421 |             script_include_name=result.get("name"),
422 |         )
423 |         
424 |     except Exception as e:
425 |         logger.error(f"Error updating script include: {e}")
426 |         return ScriptIncludeResponse(
427 |             success=False,
428 |             message=f"Error updating script include: {str(e)}",
429 |         )
430 | 
431 | 
432 | def delete_script_include(
433 |     config: ServerConfig,
434 |     auth_manager: AuthManager,
435 |     params: DeleteScriptIncludeParams,
436 | ) -> ScriptIncludeResponse:
437 |     """Delete a script include from ServiceNow.
438 |     
439 |     Args:
440 |         config: The server configuration.
441 |         auth_manager: The authentication manager.
442 |         params: The parameters for the request.
443 |         
444 |     Returns:
445 |         A response indicating the result of the operation.
446 |     """
447 |     # First, get the script include to delete
448 |     get_params = GetScriptIncludeParams(script_include_id=params.script_include_id)
449 |     get_result = get_script_include(config, auth_manager, get_params)
450 |     
451 |     if not get_result["success"]:
452 |         return ScriptIncludeResponse(
453 |             success=False,
454 |             message=get_result["message"],
455 |         )
456 |         
457 |     script_include = get_result["script_include"]
458 |     sys_id = script_include["sys_id"]
459 |     name = script_include["name"]
460 |     
461 |     # Build the URL
462 |     url = f"{config.instance_url}/api/now/table/sys_script_include/{sys_id}"
463 |     
464 |     # Make the request
465 |     headers = auth_manager.get_headers()
466 |     
467 |     try:
468 |         response = requests.delete(
469 |             url,
470 |             headers=headers,
471 |             timeout=30,
472 |         )
473 |         response.raise_for_status()
474 |         
475 |         return ScriptIncludeResponse(
476 |             success=True,
477 |             message=f"Deleted script include: {name}",
478 |             script_include_id=sys_id,
479 |             script_include_name=name,
480 |         )
481 |         
482 |     except Exception as e:
483 |         logger.error(f"Error deleting script include: {e}")
484 |         return ScriptIncludeResponse(
485 |             success=False,
486 |             message=f"Error deleting script include: {str(e)}",
487 |         ) 
```

--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/project_tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Project management tools for the ServiceNow MCP server.
  3 | 
  4 | This module provides tools for managing projects in ServiceNow.
  5 | """
  6 | 
  7 | import logging
  8 | from datetime import datetime
  9 | from typing import Any, Dict, List, Optional, Type, TypeVar
 10 | 
 11 | import requests
 12 | from pydantic import BaseModel, Field
 13 | 
 14 | from servicenow_mcp.auth.auth_manager import AuthManager
 15 | from servicenow_mcp.utils.config import ServerConfig
 16 | 
 17 | logger = logging.getLogger(__name__)
 18 | 
 19 | # Type variable for Pydantic models
 20 | T = TypeVar('T', bound=BaseModel)
 21 | 
 22 | class CreateProjectParams(BaseModel):
 23 |     """Parameters for creating a project."""
 24 | 
 25 |     short_description: str = Field(..., description="Project name of the project")
 26 |     description: Optional[str] = Field(None, description="Detailed description of the project")
 27 |     status: Optional[str] = Field(None, description="Status of the project (green, yellow, red)")
 28 |     state: Optional[str] = Field(None, description="State of project (-5 is Pending,1 is Open, 2 is Work in progress, 3 is Closed Complete, 4 is Closed Incomplete, 5 is Closed Skipped)")
 29 |     project_manager: Optional[str] = Field(None, description="Project manager for the project")
 30 |     percentage_complete: Optional[int] = Field(None, description="Percentage complete for the project")
 31 |     assignment_group: Optional[str] = Field(None, description="Group assigned to the project")
 32 |     assigned_to: Optional[str] = Field(None, description="User assigned to the project")
 33 |     start_date: Optional[str] = Field(None, description="Start date for the project")
 34 |     end_date: Optional[str] = Field(None, description="End date for the project")
 35 |     
 36 | class UpdateProjectParams(BaseModel):
 37 |     """Parameters for updating a project."""
 38 | 
 39 |     project_id: str = Field(..., description="Project ID or sys_id")
 40 |     short_description: Optional[str] = Field(None, description="Project name of the project")
 41 |     description: Optional[str] = Field(None, description="Detailed description of the project")
 42 |     status: Optional[str] = Field(None, description="Status of the project (green, yellow, red)")
 43 |     state: Optional[str] = Field(None, description="State of project (-5 is Pending,1 is Open, 2 is Work in progress, 3 is Closed Complete, 4 is Closed Incomplete, 5 is Closed Skipped)")
 44 |     project_manager: Optional[str] = Field(None, description="Project manager for the project")
 45 |     percentage_complete: Optional[int] = Field(None, description="Percentage complete for the project")
 46 |     assignment_group: Optional[str] = Field(None, description="Group assigned to the project")
 47 |     assigned_to: Optional[str] = Field(None, description="User assigned to the project")
 48 |     start_date: Optional[str] = Field(None, description="Start date for the project")
 49 |     end_date: Optional[str] = Field(None, description="End date for the project")
 50 | 
 51 | class ListProjectsParams(BaseModel):
 52 |     """Parameters for listing projects."""
 53 | 
 54 |     limit: Optional[int] = Field(10, description="Maximum number of records to return")
 55 |     offset: Optional[int] = Field(0, description="Offset to start from")
 56 |     state: Optional[str] = Field(None, description="Filter by state")
 57 |     assignment_group: Optional[str] = Field(None, description="Filter by assignment group")
 58 |     timeframe: Optional[str] = Field(None, description="Filter by timeframe (upcoming, in-progress, completed)")
 59 |     query: Optional[str] = Field(None, description="Additional query string")
 60 | 
 61 | 
 62 | def _unwrap_and_validate_params(params: Any, model_class: Type[T], required_fields: List[str] = None) -> Dict[str, Any]:
 63 |     """
 64 |     Helper function to unwrap and validate parameters.
 65 |     
 66 |     Args:
 67 |         params: The parameters to unwrap and validate.
 68 |         model_class: The Pydantic model class to validate against.
 69 |         required_fields: List of required field names.
 70 |         
 71 |     Returns:
 72 |         A tuple of (success, result) where result is either the validated parameters or an error message.
 73 |     """
 74 |     # Handle case where params might be wrapped in another dictionary
 75 |     if isinstance(params, dict) and len(params) == 1 and "params" in params and isinstance(params["params"], dict):
 76 |         logger.warning("Detected params wrapped in a 'params' key. Unwrapping...")
 77 |         params = params["params"]
 78 |     
 79 |     # Handle case where params might be a Pydantic model object
 80 |     if not isinstance(params, dict):
 81 |         try:
 82 |             # Try to convert to dict if it's a Pydantic model
 83 |             logger.warning("Params is not a dictionary. Attempting to convert...")
 84 |             params = params.dict() if hasattr(params, "dict") else dict(params)
 85 |         except Exception as e:
 86 |             logger.error(f"Failed to convert params to dictionary: {e}")
 87 |             return {
 88 |                 "success": False,
 89 |                 "message": f"Invalid parameters format. Expected a dictionary, got {type(params).__name__}",
 90 |             }
 91 |     
 92 |     # Validate required parameters are present
 93 |     if required_fields:
 94 |         for field in required_fields:
 95 |             if field not in params:
 96 |                 return {
 97 |                     "success": False,
 98 |                     "message": f"Missing required parameter '{field}'",
 99 |                 }
100 |     
101 |     try:
102 |         # Validate parameters against the model
103 |         validated_params = model_class(**params)
104 |         return {
105 |             "success": True,
106 |             "params": validated_params,
107 |         }
108 |     except Exception as e:
109 |         logger.error(f"Error validating parameters: {e}")
110 |         return {
111 |             "success": False,
112 |             "message": f"Error validating parameters: {str(e)}",
113 |         }
114 | 
115 | 
116 | def _get_instance_url(auth_manager: AuthManager, server_config: ServerConfig) -> Optional[str]:
117 |     """
118 |     Helper function to get the instance URL from either server_config or auth_manager.
119 |     
120 |     Args:
121 |         auth_manager: The authentication manager.
122 |         server_config: The server configuration.
123 |         
124 |     Returns:
125 |         The instance URL if found, None otherwise.
126 |     """
127 |     if hasattr(server_config, 'instance_url'):
128 |         return server_config.instance_url
129 |     elif hasattr(auth_manager, 'instance_url'):
130 |         return auth_manager.instance_url
131 |     else:
132 |         logger.error("Cannot find instance_url in either server_config or auth_manager")
133 |         return None
134 | 
135 | 
136 | def _get_headers(auth_manager: Any, server_config: Any) -> Optional[Dict[str, str]]:
137 |     """
138 |     Helper function to get headers from either auth_manager or server_config.
139 |     
140 |     Args:
141 |         auth_manager: The authentication manager or object passed as auth_manager.
142 |         server_config: The server configuration or object passed as server_config.
143 |         
144 |     Returns:
145 |         The headers if found, None otherwise.
146 |     """
147 |     # Try to get headers from auth_manager
148 |     if hasattr(auth_manager, 'get_headers'):
149 |         return auth_manager.get_headers()
150 |     
151 |     # If auth_manager doesn't have get_headers, try server_config
152 |     if hasattr(server_config, 'get_headers'):
153 |         return server_config.get_headers()
154 |     
155 |     # If neither has get_headers, check if auth_manager is actually a ServerConfig
156 |     # and server_config is actually an AuthManager (parameters swapped)
157 |     if hasattr(server_config, 'get_headers') and not hasattr(auth_manager, 'get_headers'):
158 |         return server_config.get_headers()
159 |     
160 |     logger.error("Cannot find get_headers method in either auth_manager or server_config")
161 |     return None
162 | 
163 | def create_project(
164 |     config: ServerConfig,  # Changed from auth_manager
165 |     auth_manager: AuthManager,  # Changed from server_config
166 |     params: Dict[str, Any],
167 | ) -> Dict[str, Any]:
168 |     """
169 |     Create a new project in ServiceNow.
170 | 
171 |     Args:
172 |         config: The server configuration.
173 |         auth_manager: The authentication manager.
174 |         params: The parameters for creating the project.
175 | 
176 |     Returns:
177 |         The created project.
178 |     """
179 | 
180 |     # Unwrap and validate parameters
181 |     result = _unwrap_and_validate_params(
182 |         params, 
183 |         CreateProjectParams, 
184 |         required_fields=["short_description"]
185 |     )
186 |     
187 |     if not result["success"]:
188 |         return result
189 |     
190 |     validated_params = result["params"]
191 |     
192 |     # Prepare the request data
193 |     data = {
194 |         "short_description": validated_params.short_description,
195 |     }
196 | 
197 |     # Add optional fields if provided
198 |     if validated_params.description:
199 |         data["description"] = validated_params.description
200 |     if validated_params.status:
201 |         data["status"] = validated_params.status
202 |     if validated_params.state:
203 |         data["state"] = validated_params.state
204 |     if validated_params.assignment_group:
205 |         data["assignment_group"] = validated_params.assignment_group
206 |     if validated_params.percentage_complete:
207 |         data["percentage_complete"] = validated_params.percentage_complete
208 |     if validated_params.assigned_to:
209 |         data["assigned_to"] = validated_params.assigned_to
210 |     if validated_params.project_manager:
211 |         data["project_manager"] = validated_params.project_manager
212 |     if validated_params.start_date:
213 |         data["start_date"] = validated_params.start_date
214 |     if validated_params.end_date:
215 |         data["end_date"] = validated_params.end_date
216 |     
217 |     # Get the instance URL
218 |     instance_url = _get_instance_url(auth_manager, config)
219 |     if not instance_url:
220 |         return {
221 |             "success": False,
222 |             "message": "Cannot find instance_url in either server_config or auth_manager",
223 |         }
224 |     
225 |     # Get the headers
226 |     headers = _get_headers(auth_manager, config)
227 |     if not headers:
228 |         return {
229 |             "success": False,
230 |             "message": "Cannot find get_headers method in either auth_manager or server_config",
231 |         }
232 |     
233 |     # Add Content-Type header
234 |     headers["Content-Type"] = "application/json"
235 |     
236 |     # Make the API request
237 |     url = f"{instance_url}/api/now/table/pm_project"
238 |     
239 |     try:
240 |         response = requests.post(url, json=data, headers=headers)
241 |         response.raise_for_status()
242 |         
243 |         result = response.json()
244 |         
245 |         return {
246 |             "success": True,
247 |             "message": "Project created successfully",
248 |             "project": result["result"],
249 |         }
250 |     except requests.exceptions.RequestException as e:
251 |         logger.error(f"Error creating project: {e}")
252 |         return {
253 |             "success": False,
254 |             "message": f"Error creating project: {str(e)}",
255 |         }
256 | 
257 | def update_project(
258 |     config: ServerConfig,  # Changed from auth_manager
259 |     auth_manager: AuthManager,  # Changed from server_config
260 |     params: Dict[str, Any],
261 | ) -> Dict[str, Any]:
262 |     """
263 |     Update an existing project in ServiceNow.
264 | 
265 |     Args:
266 |         config: The server configuration.
267 |         auth_manager: The authentication manager.
268 |         params: The parameters for updating the project.
269 | 
270 |     Returns:
271 |         The updated project.
272 |     """
273 |     # Unwrap and validate parameters
274 |     result = _unwrap_and_validate_params(
275 |         params, 
276 |         UpdateProjectParams,
277 |         required_fields=["project_id"]
278 |     )
279 |     
280 |     if not result["success"]:
281 |         return result
282 |     
283 |     validated_params = result["params"]
284 |     
285 |     # Prepare the request data
286 |     data = {}
287 | 
288 |     # Add optional fields if provided
289 |     if validated_params.short_description:
290 |         data["short_description"] = validated_params.short_description
291 |     if validated_params.description:
292 |         data["description"] = validated_params.description
293 |     if validated_params.status:
294 |         data["status"] = validated_params.status
295 |     if validated_params.state:
296 |         data["state"] = validated_params.state
297 |     if validated_params.assignment_group:
298 |         data["assignment_group"] = validated_params.assignment_group
299 |     if validated_params.percentage_complete:
300 |         data["percentage_complete"] = validated_params.percentage_complete
301 |     if validated_params.assigned_to:
302 |         data["assigned_to"] = validated_params.assigned_to
303 |     if validated_params.project_manager:
304 |         data["project_manager"] = validated_params.project_manager
305 |     if validated_params.start_date:
306 |         data["start_date"] = validated_params.start_date
307 |     if validated_params.end_date:
308 |         data["end_date"] = validated_params.end_date
309 | 
310 |     # Get the instance URL
311 |     instance_url = _get_instance_url(auth_manager, config)
312 |     if not instance_url:
313 |         return {
314 |             "success": False,
315 |             "message": "Cannot find instance_url in either server_config or auth_manager",
316 |         }
317 |     
318 |     # Get the headers
319 |     headers = _get_headers(auth_manager, config)
320 |     if not headers:
321 |         return {
322 |             "success": False,
323 |             "message": "Cannot find get_headers method in either auth_manager or server_config",
324 |         }
325 |     
326 |     # Add Content-Type header
327 |     headers["Content-Type"] = "application/json"
328 |     
329 |     # Make the API request
330 |     url = f"{instance_url}/api/now/table/pm_project/{validated_params.project_id}"
331 |     
332 |     try:
333 |         response = requests.put(url, json=data, headers=headers)
334 |         response.raise_for_status()
335 |         
336 |         result = response.json()
337 |         
338 |         return {
339 |             "success": True,
340 |             "message": "Project updated successfully",
341 |             "project": result["result"],
342 |         }
343 |     except requests.exceptions.RequestException as e:
344 |         logger.error(f"Error updating project: {e}")
345 |         return {
346 |             "success": False,
347 |             "message": f"Error updating project: {str(e)}",
348 |         }
349 | 
350 | def list_projects(
351 |     config: ServerConfig,  # Changed from auth_manager
352 |     auth_manager: AuthManager,  # Changed from server_config
353 |     params: Dict[str, Any],
354 | ) -> Dict[str, Any]:
355 |     """
356 |     List projects from ServiceNow.
357 | 
358 |     Args:
359 |         config: The server configuration.
360 |         auth_manager: The authentication manager.
361 |         params: The parameters for listing projects.
362 | 
363 |     Returns:
364 |         A list of projects.
365 |     """
366 |     # Unwrap and validate parameters
367 |     result = _unwrap_and_validate_params(
368 |         params, 
369 |         ListProjectsParams
370 |     )
371 |     
372 |     if not result["success"]:
373 |         return result
374 |     
375 |     validated_params = result["params"]
376 |     
377 |     # Build the query
378 |     query_parts = []
379 |     
380 |     if validated_params.state:
381 |         query_parts.append(f"state={validated_params.state}")
382 |     if validated_params.assignment_group:
383 |         query_parts.append(f"assignment_group={validated_params.assignment_group}")
384 |     
385 |     # Handle timeframe filtering
386 |     if validated_params.timeframe:
387 |         now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
388 |         if validated_params.timeframe == "upcoming":
389 |             query_parts.append(f"start_date>{now}")
390 |         elif validated_params.timeframe == "in-progress":
391 |             query_parts.append(f"start_date<{now}^end_date>{now}")
392 |         elif validated_params.timeframe == "completed":
393 |             query_parts.append(f"end_date<{now}")
394 |     
395 |     # Add any additional query string
396 |     if validated_params.query:
397 |         query_parts.append(validated_params.query)
398 |     
399 |     # Combine query parts
400 |     query = "^".join(query_parts) if query_parts else ""
401 |     
402 |     # Get the instance URL
403 |     instance_url = _get_instance_url(auth_manager, config)
404 |     if not instance_url:
405 |         return {
406 |             "success": False,
407 |             "message": "Cannot find instance_url in either server_config or auth_manager",
408 |         }
409 |     
410 |     # Get the headers
411 |     headers = _get_headers(auth_manager, config)
412 |     if not headers:
413 |         return {
414 |             "success": False,
415 |             "message": "Cannot find get_headers method in either auth_manager or server_config",
416 |         }
417 |     
418 |     # Make the API request
419 |     url = f"{instance_url}/api/now/table/pm_project"
420 |     
421 |     params = {
422 |         "sysparm_limit": validated_params.limit,
423 |         "sysparm_offset": validated_params.offset,
424 |         "sysparm_query": query,
425 |         "sysparm_display_value": "true",
426 |     }
427 |     
428 |     try:
429 |         response = requests.get(url, headers=headers, params=params)
430 |         response.raise_for_status()
431 |         
432 |         result = response.json()
433 |         
434 |         # Handle the case where result["result"] is a list
435 |         projects = result.get("result", [])
436 |         count = len(projects)
437 |         
438 |         return {
439 |             "success": True,
440 |             "projects": projects,
441 |             "count": count,
442 |             "total": count,  # Use count as total if total is not provided
443 |         }
444 |     except requests.exceptions.RequestException as e:
445 |         logger.error(f"Error listing projects: {e}")
446 |         return {
447 |             "success": False,
448 |             "message": f"Error listing projects: {str(e)}",
449 |         }
450 | 
```

--------------------------------------------------------------------------------
/tests/test_script_include_tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Tests for the script include tools.
  3 | 
  4 | This module contains tests for the script include tools in the ServiceNow MCP server.
  5 | """
  6 | 
  7 | import unittest
  8 | import requests
  9 | from unittest.mock import MagicMock, patch
 10 | 
 11 | from servicenow_mcp.auth.auth_manager import AuthManager
 12 | from servicenow_mcp.tools.script_include_tools import (
 13 |     ListScriptIncludesParams,
 14 |     GetScriptIncludeParams,
 15 |     CreateScriptIncludeParams,
 16 |     UpdateScriptIncludeParams,
 17 |     DeleteScriptIncludeParams,
 18 |     ScriptIncludeResponse,
 19 |     list_script_includes,
 20 |     get_script_include,
 21 |     create_script_include,
 22 |     update_script_include,
 23 |     delete_script_include,
 24 | )
 25 | from servicenow_mcp.utils.config import ServerConfig, AuthConfig, AuthType, BasicAuthConfig
 26 | 
 27 | 
 28 | class TestScriptIncludeTools(unittest.TestCase):
 29 |     """Tests for the script include tools."""
 30 | 
 31 |     def setUp(self):
 32 |         """Set up test fixtures."""
 33 |         auth_config = AuthConfig(
 34 |             type=AuthType.BASIC,
 35 |             basic=BasicAuthConfig(
 36 |                 username="test_user",
 37 |                 password="test_password"
 38 |             )
 39 |         )
 40 |         self.server_config = ServerConfig(
 41 |             instance_url="https://test.service-now.com",
 42 |             auth=auth_config,
 43 |         )
 44 |         self.auth_manager = MagicMock(spec=AuthManager)
 45 |         self.auth_manager.get_headers.return_value = {
 46 |             "Authorization": "Bearer test",
 47 |             "Content-Type": "application/json",
 48 |         }
 49 | 
 50 |     @patch("servicenow_mcp.tools.script_include_tools.requests.get")
 51 |     def test_list_script_includes(self, mock_get):
 52 |         """Test listing script includes."""
 53 |         # Mock response
 54 |         mock_response = MagicMock()
 55 |         mock_response.json.return_value = {
 56 |             "result": [
 57 |                 {
 58 |                     "sys_id": "123",
 59 |                     "name": "TestScriptInclude",
 60 |                     "script": "var TestScriptInclude = Class.create();\nTestScriptInclude.prototype = {\n    initialize: function() {\n    },\n\n    type: 'TestScriptInclude'\n};",
 61 |                     "description": "Test Script Include",
 62 |                     "api_name": "global.TestScriptInclude",
 63 |                     "client_callable": "true",
 64 |                     "active": "true",
 65 |                     "access": "public",
 66 |                     "sys_created_on": "2023-01-01 00:00:00",
 67 |                     "sys_updated_on": "2023-01-02 00:00:00",
 68 |                     "sys_created_by": {"display_value": "admin"},
 69 |                     "sys_updated_by": {"display_value": "admin"}
 70 |                 }
 71 |             ]
 72 |         }
 73 |         mock_response.status_code = 200
 74 |         mock_get.return_value = mock_response
 75 | 
 76 |         # Call the method
 77 |         params = ListScriptIncludesParams(
 78 |             limit=10,
 79 |             offset=0,
 80 |             active=True,
 81 |             client_callable=True,
 82 |             query="Test"
 83 |         )
 84 |         result = list_script_includes(self.server_config, self.auth_manager, params)
 85 | 
 86 |         # Verify the result
 87 |         self.assertTrue(result["success"])
 88 |         self.assertEqual(1, len(result["script_includes"]))
 89 |         self.assertEqual("123", result["script_includes"][0]["sys_id"])
 90 |         self.assertEqual("TestScriptInclude", result["script_includes"][0]["name"])
 91 |         self.assertTrue(result["script_includes"][0]["client_callable"])
 92 |         self.assertTrue(result["script_includes"][0]["active"])
 93 | 
 94 |         # Verify the request
 95 |         mock_get.assert_called_once()
 96 |         args, kwargs = mock_get.call_args
 97 |         self.assertEqual(f"{self.server_config.instance_url}/api/now/table/sys_script_include", args[0])
 98 |         self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
 99 |         self.assertEqual(10, kwargs["params"]["sysparm_limit"])
100 |         self.assertEqual(0, kwargs["params"]["sysparm_offset"])
101 |         self.assertEqual("active=true^client_callable=true^nameLIKETest", kwargs["params"]["sysparm_query"])
102 | 
103 |     @patch("servicenow_mcp.tools.script_include_tools.requests.get")
104 |     def test_get_script_include(self, mock_get):
105 |         """Test getting a script include."""
106 |         # Mock response
107 |         mock_response = MagicMock()
108 |         mock_response.json.return_value = {
109 |             "result": {
110 |                 "sys_id": "123",
111 |                 "name": "TestScriptInclude",
112 |                 "script": "var TestScriptInclude = Class.create();\nTestScriptInclude.prototype = {\n    initialize: function() {\n    },\n\n    type: 'TestScriptInclude'\n};",
113 |                 "description": "Test Script Include",
114 |                 "api_name": "global.TestScriptInclude",
115 |                 "client_callable": "true",
116 |                 "active": "true",
117 |                 "access": "public",
118 |                 "sys_created_on": "2023-01-01 00:00:00",
119 |                 "sys_updated_on": "2023-01-02 00:00:00",
120 |                 "sys_created_by": {"display_value": "admin"},
121 |                 "sys_updated_by": {"display_value": "admin"}
122 |             }
123 |         }
124 |         mock_response.status_code = 200
125 |         mock_get.return_value = mock_response
126 | 
127 |         # Call the method
128 |         params = GetScriptIncludeParams(script_include_id="123")
129 |         result = get_script_include(self.server_config, self.auth_manager, params)
130 | 
131 |         # Verify the result
132 |         self.assertTrue(result["success"])
133 |         self.assertEqual("123", result["script_include"]["sys_id"])
134 |         self.assertEqual("TestScriptInclude", result["script_include"]["name"])
135 |         self.assertTrue(result["script_include"]["client_callable"])
136 |         self.assertTrue(result["script_include"]["active"])
137 | 
138 |         # Verify the request
139 |         mock_get.assert_called_once()
140 |         args, kwargs = mock_get.call_args
141 |         self.assertEqual(f"{self.server_config.instance_url}/api/now/table/sys_script_include", args[0])
142 |         self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
143 |         self.assertEqual("name=123", kwargs["params"]["sysparm_query"])
144 | 
145 |     @patch("servicenow_mcp.tools.script_include_tools.requests.post")
146 |     def test_create_script_include(self, mock_post):
147 |         """Test creating a script include."""
148 |         # Mock response
149 |         mock_response = MagicMock()
150 |         mock_response.json.return_value = {
151 |             "result": {
152 |                 "sys_id": "123",
153 |                 "name": "TestScriptInclude",
154 |             }
155 |         }
156 |         mock_response.status_code = 201
157 |         mock_post.return_value = mock_response
158 | 
159 |         # Call the method
160 |         params = CreateScriptIncludeParams(
161 |             name="TestScriptInclude",
162 |             script="var TestScriptInclude = Class.create();\nTestScriptInclude.prototype = {\n    initialize: function() {\n    },\n\n    type: 'TestScriptInclude'\n};",
163 |             description="Test Script Include",
164 |             api_name="global.TestScriptInclude",
165 |             client_callable=True,
166 |             active=True,
167 |             access="public"
168 |         )
169 |         result = create_script_include(self.server_config, self.auth_manager, params)
170 | 
171 |         # Verify the result
172 |         self.assertTrue(result.success)
173 |         self.assertEqual("123", result.script_include_id)
174 |         self.assertEqual("TestScriptInclude", result.script_include_name)
175 | 
176 |         # Verify the request
177 |         mock_post.assert_called_once()
178 |         args, kwargs = mock_post.call_args
179 |         self.assertEqual(f"{self.server_config.instance_url}/api/now/table/sys_script_include", args[0])
180 |         self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
181 |         self.assertEqual("TestScriptInclude", kwargs["json"]["name"])
182 |         self.assertEqual("true", kwargs["json"]["client_callable"])
183 |         self.assertEqual("true", kwargs["json"]["active"])
184 |         self.assertEqual("public", kwargs["json"]["access"])
185 | 
186 |     @patch("servicenow_mcp.tools.script_include_tools.get_script_include")
187 |     @patch("servicenow_mcp.tools.script_include_tools.requests.patch")
188 |     def test_update_script_include(self, mock_patch, mock_get_script_include):
189 |         """Test updating a script include."""
190 |         # Mock get_script_include response
191 |         mock_get_script_include.return_value = {
192 |             "success": True,
193 |             "message": "Found script include: TestScriptInclude",
194 |             "script_include": {
195 |                 "sys_id": "123",
196 |                 "name": "TestScriptInclude",
197 |                 "script": "var TestScriptInclude = Class.create();\nTestScriptInclude.prototype = {\n    initialize: function() {\n    },\n\n    type: 'TestScriptInclude'\n};",
198 |                 "description": "Test Script Include",
199 |                 "api_name": "global.TestScriptInclude",
200 |                 "client_callable": True,
201 |                 "active": True,
202 |                 "access": "public",
203 |             }
204 |         }
205 | 
206 |         # Mock patch response
207 |         mock_response = MagicMock()
208 |         mock_response.json.return_value = {
209 |             "result": {
210 |                 "sys_id": "123",
211 |                 "name": "TestScriptInclude",
212 |             }
213 |         }
214 |         mock_response.status_code = 200
215 |         mock_patch.return_value = mock_response
216 | 
217 |         # Call the method
218 |         params = UpdateScriptIncludeParams(
219 |             script_include_id="123",
220 |             script="var TestScriptInclude = Class.create();\nTestScriptInclude.prototype = {\n    initialize: function() {\n        // Updated\n    },\n\n    type: 'TestScriptInclude'\n};",
221 |             description="Updated Test Script Include",
222 |             client_callable=False,
223 |         )
224 |         result = update_script_include(self.server_config, self.auth_manager, params)
225 | 
226 |         # Verify the result
227 |         self.assertTrue(result.success)
228 |         self.assertEqual("123", result.script_include_id)
229 |         self.assertEqual("TestScriptInclude", result.script_include_name)
230 | 
231 |         # Verify the request
232 |         mock_patch.assert_called_once()
233 |         args, kwargs = mock_patch.call_args
234 |         self.assertEqual(f"{self.server_config.instance_url}/api/now/table/sys_script_include/123", args[0])
235 |         self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
236 |         self.assertEqual("Updated Test Script Include", kwargs["json"]["description"])
237 |         self.assertEqual("false", kwargs["json"]["client_callable"])
238 | 
239 |     @patch("servicenow_mcp.tools.script_include_tools.get_script_include")
240 |     @patch("servicenow_mcp.tools.script_include_tools.requests.delete")
241 |     def test_delete_script_include(self, mock_delete, mock_get_script_include):
242 |         """Test deleting a script include."""
243 |         # Mock get_script_include response
244 |         mock_get_script_include.return_value = {
245 |             "success": True,
246 |             "message": "Found script include: TestScriptInclude",
247 |             "script_include": {
248 |                 "sys_id": "123",
249 |                 "name": "TestScriptInclude",
250 |             }
251 |         }
252 | 
253 |         # Mock delete response
254 |         mock_response = MagicMock()
255 |         mock_response.status_code = 204
256 |         mock_delete.return_value = mock_response
257 | 
258 |         # Call the method
259 |         params = DeleteScriptIncludeParams(script_include_id="123")
260 |         result = delete_script_include(self.server_config, self.auth_manager, params)
261 | 
262 |         # Verify the result
263 |         self.assertTrue(result.success)
264 |         self.assertEqual("123", result.script_include_id)
265 |         self.assertEqual("TestScriptInclude", result.script_include_name)
266 | 
267 |         # Verify the request
268 |         mock_delete.assert_called_once()
269 |         args, kwargs = mock_delete.call_args
270 |         self.assertEqual(f"{self.server_config.instance_url}/api/now/table/sys_script_include/123", args[0])
271 |         self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
272 | 
273 |     @patch("servicenow_mcp.tools.script_include_tools.requests.get")
274 |     def test_list_script_includes_error(self, mock_get):
275 |         """Test listing script includes with an error."""
276 |         # Mock response
277 |         mock_get.side_effect = requests.RequestException("Test error")
278 | 
279 |         # Call the method
280 |         params = ListScriptIncludesParams()
281 |         result = list_script_includes(self.server_config, self.auth_manager, params)
282 | 
283 |         # Verify the result
284 |         self.assertFalse(result["success"])
285 |         self.assertIn("Error listing script includes", result["message"])
286 | 
287 |     @patch("servicenow_mcp.tools.script_include_tools.requests.get")
288 |     def test_get_script_include_error(self, mock_get):
289 |         """Test getting a script include with an error."""
290 |         # Mock response
291 |         mock_get.side_effect = requests.RequestException("Test error")
292 | 
293 |         # Call the method
294 |         params = GetScriptIncludeParams(script_include_id="123")
295 |         result = get_script_include(self.server_config, self.auth_manager, params)
296 | 
297 |         # Verify the result
298 |         self.assertFalse(result["success"])
299 |         self.assertIn("Error getting script include", result["message"])
300 | 
301 |     @patch("servicenow_mcp.tools.script_include_tools.requests.post")
302 |     def test_create_script_include_error(self, mock_post):
303 |         """Test creating a script include with an error."""
304 |         # Mock response
305 |         mock_post.side_effect = requests.RequestException("Test error")
306 | 
307 |         # Call the method
308 |         params = CreateScriptIncludeParams(
309 |             name="TestScriptInclude",
310 |             script="var TestScriptInclude = Class.create();\nTestScriptInclude.prototype = {\n    initialize: function() {\n    },\n\n    type: 'TestScriptInclude'\n};",
311 |         )
312 |         result = create_script_include(self.server_config, self.auth_manager, params)
313 | 
314 |         # Verify the result
315 |         self.assertFalse(result.success)
316 |         self.assertIn("Error creating script include", result.message)
317 | 
318 | 
319 | class TestScriptIncludeParams(unittest.TestCase):
320 |     """Tests for the script include parameters."""
321 | 
322 |     def test_list_script_includes_params(self):
323 |         """Test list script includes parameters."""
324 |         params = ListScriptIncludesParams(
325 |             limit=20,
326 |             offset=10,
327 |             active=True,
328 |             client_callable=False,
329 |             query="Test"
330 |         )
331 |         self.assertEqual(20, params.limit)
332 |         self.assertEqual(10, params.offset)
333 |         self.assertTrue(params.active)
334 |         self.assertFalse(params.client_callable)
335 |         self.assertEqual("Test", params.query)
336 | 
337 |     def test_get_script_include_params(self):
338 |         """Test get script include parameters."""
339 |         params = GetScriptIncludeParams(script_include_id="123")
340 |         self.assertEqual("123", params.script_include_id)
341 | 
342 |     def test_create_script_include_params(self):
343 |         """Test create script include parameters."""
344 |         params = CreateScriptIncludeParams(
345 |             name="TestScriptInclude",
346 |             script="var TestScriptInclude = Class.create();\nTestScriptInclude.prototype = {\n    initialize: function() {\n    },\n\n    type: 'TestScriptInclude'\n};",
347 |             description="Test Script Include",
348 |             api_name="global.TestScriptInclude",
349 |             client_callable=True,
350 |             active=True,
351 |             access="public"
352 |         )
353 |         self.assertEqual("TestScriptInclude", params.name)
354 |         self.assertTrue(params.client_callable)
355 |         self.assertTrue(params.active)
356 |         self.assertEqual("public", params.access)
357 | 
358 |     def test_update_script_include_params(self):
359 |         """Test update script include parameters."""
360 |         params = UpdateScriptIncludeParams(
361 |             script_include_id="123",
362 |             script="var TestScriptInclude = Class.create();\nTestScriptInclude.prototype = {\n    initialize: function() {\n        // Updated\n    },\n\n    type: 'TestScriptInclude'\n};",
363 |             description="Updated Test Script Include",
364 |             client_callable=False,
365 |         )
366 |         self.assertEqual("123", params.script_include_id)
367 |         self.assertEqual("Updated Test Script Include", params.description)
368 |         self.assertFalse(params.client_callable)
369 | 
370 |     def test_delete_script_include_params(self):
371 |         """Test delete script include parameters."""
372 |         params = DeleteScriptIncludeParams(script_include_id="123")
373 |         self.assertEqual("123", params.script_include_id)
374 | 
375 |     def test_script_include_response(self):
376 |         """Test script include response."""
377 |         response = ScriptIncludeResponse(
378 |             success=True,
379 |             message="Test message",
380 |             script_include_id="123",
381 |             script_include_name="TestScriptInclude"
382 |         )
383 |         self.assertTrue(response.success)
384 |         self.assertEqual("Test message", response.message)
385 |         self.assertEqual("123", response.script_include_id)
386 |         self.assertEqual("TestScriptInclude", response.script_include_name) 
```

--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/scrum_task_tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Scrum Task management tools for the ServiceNow MCP server.
  3 | 
  4 | This module provides tools for managing stories in ServiceNow.
  5 | """
  6 | 
  7 | import logging
  8 | from datetime import datetime
  9 | from typing import Any, Dict, List, Optional, Type, TypeVar
 10 | 
 11 | import requests
 12 | from pydantic import BaseModel, Field
 13 | 
 14 | from servicenow_mcp.auth.auth_manager import AuthManager
 15 | from servicenow_mcp.utils.config import ServerConfig
 16 | 
 17 | logger = logging.getLogger(__name__)
 18 | 
 19 | # Type variable for Pydantic models
 20 | T = TypeVar('T', bound=BaseModel)
 21 | 
 22 | class CreateScrumTaskParams(BaseModel):
 23 |     """Parameters for creating a scrum task."""
 24 | 
 25 |     story: str = Field(..., description="Short description of the story. It requires the System ID of the story.")
 26 |     short_description: str = Field(..., description="Short description of the scrum task")
 27 |     priority: Optional[str] = Field(None, description="Priority of scrum task (1 is Critical, 2 is High, 3 is Moderate, 4 is Low)")
 28 |     planned_hours: Optional[int] = Field(None, description="Planned hours for the scrum task")
 29 |     remaining_hours: Optional[int] = Field(None, description="Remaining hours for the scrum task")
 30 |     hours: Optional[int] = Field(None, description="Actual Hours for the scrum task")
 31 |     description: Optional[str] = Field(None, description="Detailed description of the scrum task")
 32 |     type: Optional[str] = Field(None, description="Type of scrum task (1 is Analysis, 2 is Coding, 3 is Documentation, 4 is Testing)")
 33 |     state: Optional[str] = Field(None, description="State of scrum task (-6 is Draft,1 is Ready, 2 is Work in progress, 3 is Complete, 4 is Cancelled)")
 34 |     assignment_group: Optional[str] = Field(None, description="Group assigned to the scrum task")
 35 |     assigned_to: Optional[str] = Field(None, description="User assigned to the scrum task")
 36 |     work_notes: Optional[str] = Field(None, description="Work notes to add to the scrum task")
 37 |     
 38 | class UpdateScrumTaskParams(BaseModel):
 39 |     """Parameters for updating a scrum task."""
 40 | 
 41 |     scrum_task_id: str = Field(..., description="Scrum Task ID or sys_id")
 42 |     short_description: Optional[str] = Field(None, description="Short description of the scrum task")
 43 |     priority: Optional[str] = Field(None, description="Priority of scrum task (1 is Critical, 2 is High, 3 is Moderate, 4 is Low)")
 44 |     planned_hours: Optional[int] = Field(None, description="Planned hours for the scrum task")
 45 |     remaining_hours: Optional[int] = Field(None, description="Remaining hours for the scrum task")
 46 |     hours: Optional[int] = Field(None, description="Actual Hours for the scrum task")
 47 |     description: Optional[str] = Field(None, description="Detailed description of the scrum task")
 48 |     type: Optional[str] = Field(None, description="Type of scrum task (1 is Analysis, 2 is Coding, 3 is Documentation, 4 is Testing)")
 49 |     state: Optional[str] = Field(None, description="State of scrum task (-6 is Draft,1 is Ready, 2 is Work in progress, 3 is Complete, 4 is Cancelled)")
 50 |     assignment_group: Optional[str] = Field(None, description="Group assigned to the scrum task")
 51 |     assigned_to: Optional[str] = Field(None, description="User assigned to the scrum task")
 52 |     work_notes: Optional[str] = Field(None, description="Work notes to add to the scrum task")
 53 | 
 54 | class ListScrumTasksParams(BaseModel):
 55 |     """Parameters for listing scrum tasks."""
 56 | 
 57 |     limit: Optional[int] = Field(10, description="Maximum number of records to return")
 58 |     offset: Optional[int] = Field(0, description="Offset to start from")
 59 |     state: Optional[str] = Field(None, description="Filter by state")
 60 |     assignment_group: Optional[str] = Field(None, description="Filter by assignment group")
 61 |     timeframe: Optional[str] = Field(None, description="Filter by timeframe (upcoming, in-progress, completed)")
 62 |     query: Optional[str] = Field(None, description="Additional query string")
 63 | 
 64 | 
 65 | def _unwrap_and_validate_params(params: Any, model_class: Type[T], required_fields: List[str] = None) -> Dict[str, Any]:
 66 |     """
 67 |     Helper function to unwrap and validate parameters.
 68 |     
 69 |     Args:
 70 |         params: The parameters to unwrap and validate.
 71 |         model_class: The Pydantic model class to validate against.
 72 |         required_fields: List of required field names.
 73 |         
 74 |     Returns:
 75 |         A tuple of (success, result) where result is either the validated parameters or an error message.
 76 |     """
 77 |     # Handle case where params might be wrapped in another dictionary
 78 |     if isinstance(params, dict) and len(params) == 1 and "params" in params and isinstance(params["params"], dict):
 79 |         logger.warning("Detected params wrapped in a 'params' key. Unwrapping...")
 80 |         params = params["params"]
 81 |     
 82 |     # Handle case where params might be a Pydantic model object
 83 |     if not isinstance(params, dict):
 84 |         try:
 85 |             # Try to convert to dict if it's a Pydantic model
 86 |             logger.warning("Params is not a dictionary. Attempting to convert...")
 87 |             params = params.dict() if hasattr(params, "dict") else dict(params)
 88 |         except Exception as e:
 89 |             logger.error(f"Failed to convert params to dictionary: {e}")
 90 |             return {
 91 |                 "success": False,
 92 |                 "message": f"Invalid parameters format. Expected a dictionary, got {type(params).__name__}",
 93 |             }
 94 |     
 95 |     # Validate required parameters are present
 96 |     if required_fields:
 97 |         for field in required_fields:
 98 |             if field not in params:
 99 |                 return {
100 |                     "success": False,
101 |                     "message": f"Missing required parameter '{field}'",
102 |                 }
103 |     
104 |     try:
105 |         # Validate parameters against the model
106 |         validated_params = model_class(**params)
107 |         return {
108 |             "success": True,
109 |             "params": validated_params,
110 |         }
111 |     except Exception as e:
112 |         logger.error(f"Error validating parameters: {e}")
113 |         return {
114 |             "success": False,
115 |             "message": f"Error validating parameters: {str(e)}",
116 |         }
117 | 
118 | 
119 | def _get_instance_url(auth_manager: AuthManager, server_config: ServerConfig) -> Optional[str]:
120 |     """
121 |     Helper function to get the instance URL from either server_config or auth_manager.
122 |     
123 |     Args:
124 |         auth_manager: The authentication manager.
125 |         server_config: The server configuration.
126 |         
127 |     Returns:
128 |         The instance URL if found, None otherwise.
129 |     """
130 |     if hasattr(server_config, 'instance_url'):
131 |         return server_config.instance_url
132 |     elif hasattr(auth_manager, 'instance_url'):
133 |         return auth_manager.instance_url
134 |     else:
135 |         logger.error("Cannot find instance_url in either server_config or auth_manager")
136 |         return None
137 | 
138 | 
139 | def _get_headers(auth_manager: Any, server_config: Any) -> Optional[Dict[str, str]]:
140 |     """
141 |     Helper function to get headers from either auth_manager or server_config.
142 |     
143 |     Args:
144 |         auth_manager: The authentication manager or object passed as auth_manager.
145 |         server_config: The server configuration or object passed as server_config.
146 |         
147 |     Returns:
148 |         The headers if found, None otherwise.
149 |     """
150 |     # Try to get headers from auth_manager
151 |     if hasattr(auth_manager, 'get_headers'):
152 |         return auth_manager.get_headers()
153 |     
154 |     # If auth_manager doesn't have get_headers, try server_config
155 |     if hasattr(server_config, 'get_headers'):
156 |         return server_config.get_headers()
157 |     
158 |     # If neither has get_headers, check if auth_manager is actually a ServerConfig
159 |     # and server_config is actually an AuthManager (parameters swapped)
160 |     if hasattr(server_config, 'get_headers') and not hasattr(auth_manager, 'get_headers'):
161 |         return server_config.get_headers()
162 |     
163 |     logger.error("Cannot find get_headers method in either auth_manager or server_config")
164 |     return None
165 | 
166 | def create_scrum_task(
167 |     auth_manager: AuthManager,
168 |     server_config: ServerConfig,
169 |     params: Dict[str, Any],
170 | ) -> Dict[str, Any]:
171 |     """
172 |     Create a new scrum task in ServiceNow.
173 | 
174 |     Args:
175 |         auth_manager: The authentication manager.
176 |         server_config: The server configuration.
177 |         params: The parameters for creating the scrum task.
178 | 
179 |     Returns:
180 |         The created scrum task.
181 |     """
182 | 
183 |     # Unwrap and validate parameters
184 |     result = _unwrap_and_validate_params(
185 |         params, 
186 |         CreateScrumTaskParams, 
187 |         required_fields=["short_description", "story"]
188 |     )
189 |     
190 |     if not result["success"]:
191 |         return result
192 |     
193 |     validated_params = result["params"]
194 |     
195 |     # Prepare the request data
196 |     data = {
197 |         "story": validated_params.story,
198 |         "short_description": validated_params.short_description,
199 |     }
200 | 
201 |     # Add optional fields if provided
202 |     if validated_params.priority:
203 |         data["priority"] = validated_params.priority
204 |     if validated_params.planned_hours:
205 |         data["planned_hours"] = validated_params.planned_hours
206 |     if validated_params.remaining_hours:
207 |         data["remaining_hours"] = validated_params.remaining_hours
208 |     if validated_params.hours:
209 |         data["hours"] = validated_params.hours
210 |     if validated_params.description:
211 |         data["description"] = validated_params.description
212 |     if validated_params.type:
213 |         data["type"] = validated_params.type
214 |     if validated_params.state:
215 |         data["state"] = validated_params.state
216 |     if validated_params.assignment_group:
217 |         data["assignment_group"] = validated_params.assignment_group
218 |     if validated_params.assigned_to:
219 |         data["assigned_to"] = validated_params.assigned_to
220 |     if validated_params.work_notes:
221 |         data["work_notes"] = validated_params.work_notes
222 |     
223 |     # Get the instance URL
224 |     instance_url = _get_instance_url(auth_manager, server_config)
225 |     if not instance_url:
226 |         return {
227 |             "success": False,
228 |             "message": "Cannot find instance_url in either server_config or auth_manager",
229 |         }
230 |     
231 |     # Get the headers
232 |     headers = _get_headers(auth_manager, server_config)
233 |     if not headers:
234 |         return {
235 |             "success": False,
236 |             "message": "Cannot find get_headers method in either auth_manager or server_config",
237 |         }
238 |     
239 |     # Add Content-Type header
240 |     headers["Content-Type"] = "application/json"
241 |     
242 |     # Make the API request
243 |     url = f"{instance_url}/api/now/table/rm_scrum_task"
244 |     
245 |     try:
246 |         response = requests.post(url, json=data, headers=headers)
247 |         response.raise_for_status()
248 |         
249 |         result = response.json()
250 |         
251 |         return {
252 |             "success": True,
253 |             "message": "Scrum Task created successfully",
254 |             "scrum_task": result["result"],
255 |         }
256 |     except requests.exceptions.RequestException as e:
257 |         logger.error(f"Error creating scrum task: {e}")
258 |         return {
259 |             "success": False,
260 |             "message": f"Error creating scrum task: {str(e)}",
261 |         }
262 | 
263 | def update_scrum_task(
264 |     auth_manager: AuthManager,
265 |     server_config: ServerConfig,
266 |     params: Dict[str, Any],
267 | ) -> Dict[str, Any]:
268 |     """
269 |     Update an existing scrum task in ServiceNow.
270 | 
271 |     Args:
272 |         auth_manager: The authentication manager.
273 |         server_config: The server configuration.
274 |         params: The parameters for updating the scrum task.
275 | 
276 |     Returns:
277 |         The updated scrum task.
278 |     """
279 |     # Unwrap and validate parameters
280 |     result = _unwrap_and_validate_params(
281 |         params, 
282 |         UpdateScrumTaskParams,
283 |         required_fields=["scrum_task_id"]
284 |     )
285 |     
286 |     if not result["success"]:
287 |         return result
288 |     
289 |     validated_params = result["params"]
290 |     
291 |     # Prepare the request data
292 |     data = {}
293 | 
294 |     # Add optional fields if provided
295 |     if validated_params.short_description:
296 |         data["short_description"] = validated_params.short_description
297 |     if validated_params.priority:
298 |         data["priority"] = validated_params.priority
299 |     if validated_params.planned_hours:
300 |         data["planned_hours"] = validated_params.planned_hours
301 |     if validated_params.remaining_hours:
302 |         data["remaining_hours"] = validated_params.remaining_hours
303 |     if validated_params.hours:
304 |         data["hours"] = validated_params.hours
305 |     if validated_params.description:
306 |         data["description"] = validated_params.description
307 |     if validated_params.type:
308 |         data["type"] = validated_params.type
309 |     if validated_params.state:
310 |         data["state"] = validated_params.state
311 |     if validated_params.assignment_group:
312 |         data["assignment_group"] = validated_params.assignment_group
313 |     if validated_params.assigned_to:
314 |         data["assigned_to"] = validated_params.assigned_to
315 |     if validated_params.work_notes:
316 |         data["work_notes"] = validated_params.work_notes
317 |     
318 |     # Get the instance URL
319 |     instance_url = _get_instance_url(auth_manager, server_config)
320 |     if not instance_url:
321 |         return {
322 |             "success": False,
323 |             "message": "Cannot find instance_url in either server_config or auth_manager",
324 |         }
325 |     
326 |     # Get the headers
327 |     headers = _get_headers(auth_manager, server_config)
328 |     if not headers:
329 |         return {
330 |             "success": False,
331 |             "message": "Cannot find get_headers method in either auth_manager or server_config",
332 |         }
333 |     
334 |     # Add Content-Type header
335 |     headers["Content-Type"] = "application/json"
336 |     
337 |     # Make the API request
338 |     url = f"{instance_url}/api/now/table/rm_scrum_task/{validated_params.scrum_task_id}"
339 |     
340 |     try:
341 |         response = requests.put(url, json=data, headers=headers)
342 |         response.raise_for_status()
343 |         
344 |         result = response.json()
345 |         
346 |         return {
347 |             "success": True,
348 |             "message": "Scrum Task updated successfully",
349 |             "scrum_task": result["result"],
350 |         }
351 |     except requests.exceptions.RequestException as e:
352 |         logger.error(f"Error updating scrum task: {e}")
353 |         return {
354 |             "success": False,
355 |             "message": f"Error updating scrum task: {str(e)}",
356 |         }
357 | 
358 | def list_scrum_tasks(
359 |     auth_manager: AuthManager,
360 |     server_config: ServerConfig,
361 |     params: Dict[str, Any],
362 | ) -> Dict[str, Any]:
363 |     """
364 |     List scrum tasks from ServiceNow.
365 | 
366 |     Args:
367 |         auth_manager: The authentication manager.
368 |         server_config: The server configuration.
369 |         params: The parameters for listing scrum tasks.
370 | 
371 |     Returns:
372 |         A list of scrum tasks.
373 |     """
374 |     # Unwrap and validate parameters
375 |     result = _unwrap_and_validate_params(
376 |         params, 
377 |         ListScrumTasksParams
378 |     )
379 |     
380 |     if not result["success"]:
381 |         return result
382 |     
383 |     validated_params = result["params"]
384 |     
385 |     # Build the query
386 |     query_parts = []
387 |     
388 |     if validated_params.state:
389 |         query_parts.append(f"state={validated_params.state}")
390 |     if validated_params.assignment_group:
391 |         query_parts.append(f"assignment_group={validated_params.assignment_group}")
392 |     
393 |     # Handle timeframe filtering
394 |     if validated_params.timeframe:
395 |         now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
396 |         if validated_params.timeframe == "upcoming":
397 |             query_parts.append(f"start_date>{now}")
398 |         elif validated_params.timeframe == "in-progress":
399 |             query_parts.append(f"start_date<{now}^end_date>{now}")
400 |         elif validated_params.timeframe == "completed":
401 |             query_parts.append(f"end_date<{now}")
402 |     
403 |     # Add any additional query string
404 |     if validated_params.query:
405 |         query_parts.append(validated_params.query)
406 |     
407 |     # Combine query parts
408 |     query = "^".join(query_parts) if query_parts else ""
409 |     
410 |     # Get the instance URL
411 |     instance_url = _get_instance_url(auth_manager, server_config)
412 |     if not instance_url:
413 |         return {
414 |             "success": False,
415 |             "message": "Cannot find instance_url in either server_config or auth_manager",
416 |         }
417 |     
418 |     # Get the headers
419 |     headers = _get_headers(auth_manager, server_config)
420 |     if not headers:
421 |         return {
422 |             "success": False,
423 |             "message": "Cannot find get_headers method in either auth_manager or server_config",
424 |         }
425 |     
426 |     # Make the API request
427 |     url = f"{instance_url}/api/now/table/rm_scrum_task"
428 |     
429 |     params = {
430 |         "sysparm_limit": validated_params.limit,
431 |         "sysparm_offset": validated_params.offset,
432 |         "sysparm_query": query,
433 |         "sysparm_display_value": "true",
434 |     }
435 |     
436 |     try:
437 |         response = requests.get(url, headers=headers, params=params)
438 |         response.raise_for_status()
439 |         
440 |         result = response.json()
441 |         
442 |         # Handle the case where result["result"] is a list
443 |         scrum_tasks = result.get("result", [])
444 |         count = len(scrum_tasks)
445 |         
446 |         return {
447 |             "success": True,
448 |             "scrum_tasks": scrum_tasks,
449 |             "count": count,
450 |             "total": count,  # Use count as total if total is not provided
451 |         }
452 |     except requests.exceptions.RequestException as e:
453 |         logger.error(f"Error listing stories: {e}")
454 |         return {
455 |             "success": False,
456 |             "message": f"Error listing stories: {str(e)}",
457 |         }
458 | 
```

--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/catalog_optimization.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Tools for optimizing the ServiceNow Service Catalog.
  3 | 
  4 | This module provides tools for analyzing and optimizing the ServiceNow Service Catalog,
  5 | including identifying inactive items, items with low usage, high abandonment rates,
  6 | slow fulfillment times, and poor descriptions.
  7 | """
  8 | 
  9 | import logging
 10 | import random
 11 | from dataclasses import dataclass
 12 | from typing import Dict, List, Optional
 13 | 
 14 | import requests
 15 | from pydantic import BaseModel, Field
 16 | 
 17 | from servicenow_mcp.auth.auth_manager import AuthManager
 18 | from servicenow_mcp.utils.config import ServerConfig
 19 | 
 20 | logger = logging.getLogger(__name__)
 21 | 
 22 | 
 23 | class OptimizationRecommendationsParams(BaseModel):
 24 |     """Parameters for getting optimization recommendations."""
 25 | 
 26 |     recommendation_types: List[str]
 27 |     category_id: Optional[str] = None
 28 | 
 29 | 
 30 | class UpdateCatalogItemParams(BaseModel):
 31 |     """Parameters for updating a catalog item."""
 32 | 
 33 |     item_id: str
 34 |     name: Optional[str] = None
 35 |     short_description: Optional[str] = None
 36 |     description: Optional[str] = None
 37 |     category: Optional[str] = None
 38 |     price: Optional[str] = None
 39 |     active: Optional[bool] = None
 40 |     order: Optional[int] = None
 41 | 
 42 | 
 43 | def get_optimization_recommendations(
 44 |     config: ServerConfig, auth_manager: AuthManager, params: OptimizationRecommendationsParams
 45 | ) -> Dict:
 46 |     """
 47 |     Get optimization recommendations for the ServiceNow Service Catalog.
 48 | 
 49 |     Args:
 50 |         config: The server configuration
 51 |         auth_manager: The authentication manager
 52 |         params: The parameters for getting optimization recommendations
 53 | 
 54 |     Returns:
 55 |         A dictionary containing the optimization recommendations
 56 |     """
 57 |     logger.info("Getting catalog optimization recommendations")
 58 |     
 59 |     recommendations = []
 60 |     category_id = params.category_id
 61 |     
 62 |     try:
 63 |         # Get recommendations based on the requested types
 64 |         for rec_type in params.recommendation_types:
 65 |             if rec_type == "inactive_items":
 66 |                 items = _get_inactive_items(config, auth_manager, category_id)
 67 |                 if items:
 68 |                     recommendations.append({
 69 |                         "type": "inactive_items",
 70 |                         "title": "Inactive Catalog Items",
 71 |                         "description": "Items that are currently inactive in the catalog",
 72 |                         "items": items,
 73 |                         "impact": "medium",
 74 |                         "effort": "low",
 75 |                         "action": "Review and either update or remove these items",
 76 |                     })
 77 |             
 78 |             elif rec_type == "low_usage":
 79 |                 items = _get_low_usage_items(config, auth_manager, category_id)
 80 |                 if items:
 81 |                     recommendations.append({
 82 |                         "type": "low_usage",
 83 |                         "title": "Low Usage Catalog Items",
 84 |                         "description": "Items that have very few orders",
 85 |                         "items": items,
 86 |                         "impact": "medium",
 87 |                         "effort": "medium",
 88 |                         "action": "Consider promoting these items or removing them if no longer needed",
 89 |                     })
 90 |             
 91 |             elif rec_type == "high_abandonment":
 92 |                 items = _get_high_abandonment_items(config, auth_manager, category_id)
 93 |                 if items:
 94 |                     recommendations.append({
 95 |                         "type": "high_abandonment",
 96 |                         "title": "High Abandonment Rate Items",
 97 |                         "description": "Items that are frequently added to cart but not ordered",
 98 |                         "items": items,
 99 |                         "impact": "high",
100 |                         "effort": "medium",
101 |                         "action": "Simplify the request process or improve the item description",
102 |                     })
103 |             
104 |             elif rec_type == "slow_fulfillment":
105 |                 items = _get_slow_fulfillment_items(config, auth_manager, category_id)
106 |                 if items:
107 |                     recommendations.append({
108 |                         "type": "slow_fulfillment",
109 |                         "title": "Slow Fulfillment Items",
110 |                         "description": "Items that take longer than average to fulfill",
111 |                         "items": items,
112 |                         "impact": "high",
113 |                         "effort": "high",
114 |                         "action": "Review the fulfillment process and identify bottlenecks",
115 |                     })
116 |             
117 |             elif rec_type == "description_quality":
118 |                 items = _get_poor_description_items(config, auth_manager, category_id)
119 |                 if items:
120 |                     recommendations.append({
121 |                         "type": "description_quality",
122 |                         "title": "Poor Description Quality",
123 |                         "description": "Items with missing, short, or low-quality descriptions",
124 |                         "items": items,
125 |                         "impact": "medium",
126 |                         "effort": "low",
127 |                         "action": "Improve the descriptions to better explain the item's purpose and benefits",
128 |                     })
129 |         
130 |         return {
131 |             "success": True,
132 |             "recommendations": recommendations,
133 |         }
134 |     
135 |     except Exception as e:
136 |         logger.error(f"Error getting optimization recommendations: {e}")
137 |         return {
138 |             "success": False,
139 |             "message": f"Error getting optimization recommendations: {str(e)}",
140 |             "recommendations": [],
141 |         }
142 | 
143 | 
144 | def update_catalog_item(
145 |     config: ServerConfig, auth_manager: AuthManager, params: UpdateCatalogItemParams
146 | ) -> Dict:
147 |     """
148 |     Update a catalog item.
149 | 
150 |     Args:
151 |         config: The server configuration
152 |         auth_manager: The authentication manager
153 |         params: The parameters for updating the catalog item
154 | 
155 |     Returns:
156 |         A dictionary containing the result of the update operation
157 |     """
158 |     logger.info(f"Updating catalog item: {params.item_id}")
159 |     
160 |     try:
161 |         # Build the request body with only the provided parameters
162 |         body = {}
163 |         if params.name is not None:
164 |             body["name"] = params.name
165 |         if params.short_description is not None:
166 |             body["short_description"] = params.short_description
167 |         if params.description is not None:
168 |             body["description"] = params.description
169 |         if params.category is not None:
170 |             body["category"] = params.category
171 |         if params.price is not None:
172 |             body["price"] = params.price
173 |         if params.active is not None:
174 |             body["active"] = str(params.active).lower()
175 |         if params.order is not None:
176 |             body["order"] = str(params.order)
177 |         
178 |         # Make the API request
179 |         url = f"{config.instance_url}/api/now/table/sc_cat_item/{params.item_id}"
180 |         headers = auth_manager.get_headers()
181 |         headers["Content-Type"] = "application/json"
182 |         
183 |         response = requests.patch(url, headers=headers, json=body)
184 |         response.raise_for_status()
185 |         
186 |         return {
187 |             "success": True,
188 |             "message": "Catalog item updated successfully",
189 |             "data": response.json()["result"],
190 |         }
191 |     
192 |     except Exception as e:
193 |         logger.error(f"Error updating catalog item: {e}")
194 |         return {
195 |             "success": False,
196 |             "message": f"Error updating catalog item: {str(e)}",
197 |             "data": None,
198 |         }
199 | 
200 | 
201 | def _get_inactive_items(
202 |     config: ServerConfig, auth_manager: AuthManager, category_id: Optional[str] = None
203 | ) -> List[Dict]:
204 |     """
205 |     Get inactive catalog items.
206 | 
207 |     Args:
208 |         config: The server configuration
209 |         auth_manager: The authentication manager
210 |         category_id: Optional category ID to filter by
211 | 
212 |     Returns:
213 |         A list of inactive catalog items
214 |     """
215 |     try:
216 |         # Build the query
217 |         query = "active=false"
218 |         if category_id:
219 |             query += f"^category={category_id}"
220 |         
221 |         # Make the API request
222 |         url = f"{config.instance_url}/api/now/table/sc_cat_item"
223 |         headers = auth_manager.get_headers()
224 |         params = {
225 |             "sysparm_query": query,
226 |             "sysparm_fields": "sys_id,name,short_description,category",
227 |             "sysparm_limit": "50",
228 |         }
229 |         
230 |         response = requests.get(url, headers=headers, params=params)
231 |         response.raise_for_status()
232 |         
233 |         return response.json()["result"]
234 |     
235 |     except Exception as e:
236 |         logger.error(f"Error getting inactive items: {e}")
237 |         return []
238 | 
239 | 
240 | def _get_low_usage_items(
241 |     config: ServerConfig, auth_manager: AuthManager, category_id: Optional[str] = None
242 | ) -> List[Dict]:
243 |     """
244 |     Get catalog items with low usage.
245 | 
246 |     Args:
247 |         config: The server configuration
248 |         auth_manager: The authentication manager
249 |         category_id: Optional category ID to filter by
250 | 
251 |     Returns:
252 |         A list of catalog items with low usage
253 |     """
254 |     try:
255 |         # Build the query
256 |         query = "active=true"
257 |         if category_id:
258 |             query += f"^category={category_id}"
259 |         
260 |         # Make the API request
261 |         url = f"{config.instance_url}/api/now/table/sc_cat_item"
262 |         headers = auth_manager.get_headers()
263 |         params = {
264 |             "sysparm_query": query,
265 |             "sysparm_fields": "sys_id,name,short_description,category",
266 |             "sysparm_limit": "50",
267 |         }
268 |         
269 |         response = requests.get(url, headers=headers, params=params)
270 |         response.raise_for_status()
271 |         
272 |         # In a real implementation, we would query the request table to get actual usage data
273 |         # For this example, we'll simulate low usage with random data
274 |         items = response.json()["result"]
275 |         
276 |         # Select a random subset of items to mark as low usage
277 |         low_usage_items = random.sample(items, min(len(items), 5))
278 |         
279 |         # Add usage data to the items
280 |         for item in low_usage_items:
281 |             item["order_count"] = random.randint(1, 5)  # Low number of orders
282 |         
283 |         return low_usage_items
284 |     
285 |     except Exception as e:
286 |         logger.error(f"Error getting low usage items: {e}")
287 |         return []
288 | 
289 | 
290 | def _get_high_abandonment_items(
291 |     config: ServerConfig, auth_manager: AuthManager, category_id: Optional[str] = None
292 | ) -> List[Dict]:
293 |     """
294 |     Get catalog items with high abandonment rates.
295 | 
296 |     Args:
297 |         config: The server configuration
298 |         auth_manager: The authentication manager
299 |         category_id: Optional category ID to filter by
300 | 
301 |     Returns:
302 |         A list of catalog items with high abandonment rates
303 |     """
304 |     try:
305 |         # Build the query
306 |         query = "active=true"
307 |         if category_id:
308 |             query += f"^category={category_id}"
309 |         
310 |         # Make the API request
311 |         url = f"{config.instance_url}/api/now/table/sc_cat_item"
312 |         headers = auth_manager.get_headers()
313 |         params = {
314 |             "sysparm_query": query,
315 |             "sysparm_fields": "sys_id,name,short_description,category",
316 |             "sysparm_limit": "50",
317 |         }
318 |         
319 |         response = requests.get(url, headers=headers, params=params)
320 |         response.raise_for_status()
321 |         
322 |         # In a real implementation, we would query the request table to get actual abandonment data
323 |         # For this example, we'll simulate high abandonment with random data
324 |         items = response.json()["result"]
325 |         
326 |         # Select a random subset of items to mark as high abandonment
327 |         high_abandonment_items = random.sample(items, min(len(items), 5))
328 |         
329 |         # Add abandonment data to the items
330 |         for item in high_abandonment_items:
331 |             abandonment_rate = random.randint(40, 80)  # High abandonment rate (40-80%)
332 |             cart_adds = random.randint(20, 100)  # Number of cart adds
333 |             orders = int(cart_adds * (1 - abandonment_rate / 100))  # Number of completed orders
334 |             
335 |             item["abandonment_rate"] = abandonment_rate
336 |             item["cart_adds"] = cart_adds
337 |             item["orders"] = orders
338 |         
339 |         return high_abandonment_items
340 |     
341 |     except Exception as e:
342 |         logger.error(f"Error getting high abandonment items: {e}")
343 |         return []
344 | 
345 | 
346 | def _get_slow_fulfillment_items(
347 |     config: ServerConfig, auth_manager: AuthManager, category_id: Optional[str] = None
348 | ) -> List[Dict]:
349 |     """
350 |     Get catalog items with slow fulfillment times.
351 | 
352 |     Args:
353 |         config: The server configuration
354 |         auth_manager: The authentication manager
355 |         category_id: Optional category ID to filter by
356 | 
357 |     Returns:
358 |         A list of catalog items with slow fulfillment times
359 |     """
360 |     try:
361 |         # Build the query
362 |         query = "active=true"
363 |         if category_id:
364 |             query += f"^category={category_id}"
365 |         
366 |         # Make the API request
367 |         url = f"{config.instance_url}/api/now/table/sc_cat_item"
368 |         headers = auth_manager.get_headers()
369 |         params = {
370 |             "sysparm_query": query,
371 |             "sysparm_fields": "sys_id,name,short_description,category",
372 |             "sysparm_limit": "50",
373 |         }
374 |         
375 |         response = requests.get(url, headers=headers, params=params)
376 |         response.raise_for_status()
377 |         
378 |         # In a real implementation, we would query the request table to get actual fulfillment data
379 |         # For this example, we'll simulate slow fulfillment with random data
380 |         items = response.json()["result"]
381 |         
382 |         # Select a random subset of items to mark as slow fulfillment
383 |         slow_fulfillment_items = random.sample(items, min(len(items), 5))
384 |         
385 |         # Add fulfillment data to the items
386 |         catalog_avg_time = 2.5  # Average fulfillment time for the catalog (in days)
387 |         
388 |         for item in slow_fulfillment_items:
389 |             # Generate a fulfillment time that's significantly higher than the catalog average
390 |             fulfillment_time = random.uniform(5.0, 10.0)  # 5-10 days
391 |             
392 |             item["avg_fulfillment_time"] = fulfillment_time
393 |             item["avg_fulfillment_time_vs_catalog"] = round(fulfillment_time / catalog_avg_time, 1)
394 |         
395 |         return slow_fulfillment_items
396 |     
397 |     except Exception as e:
398 |         logger.error(f"Error getting slow fulfillment items: {e}")
399 |         return []
400 | 
401 | 
402 | def _get_poor_description_items(
403 |     config: ServerConfig, auth_manager: AuthManager, category_id: Optional[str] = None
404 | ) -> List[Dict]:
405 |     """
406 |     Get catalog items with poor description quality.
407 | 
408 |     Args:
409 |         config: The server configuration
410 |         auth_manager: The authentication manager
411 |         category_id: Optional category ID to filter by
412 | 
413 |     Returns:
414 |         A list of catalog items with poor description quality
415 |     """
416 |     try:
417 |         # Build the query
418 |         query = "active=true"
419 |         if category_id:
420 |             query += f"^category={category_id}"
421 |         
422 |         # Make the API request
423 |         url = f"{config.instance_url}/api/now/table/sc_cat_item"
424 |         headers = auth_manager.get_headers()
425 |         params = {
426 |             "sysparm_query": query,
427 |             "sysparm_fields": "sys_id,name,short_description,category",
428 |             "sysparm_limit": "50",
429 |         }
430 |         
431 |         response = requests.get(url, headers=headers, params=params)
432 |         response.raise_for_status()
433 |         
434 |         items = response.json()["result"]
435 |         poor_description_items = []
436 |         
437 |         # Analyze each item's description quality
438 |         for item in items:
439 |             description = item.get("short_description", "")
440 |             quality_issues = []
441 |             quality_score = 100  # Start with perfect score
442 |             
443 |             # Check for empty description
444 |             if not description:
445 |                 quality_issues.append("Missing description")
446 |                 quality_score = 0
447 |             else:
448 |                 # Check for short description
449 |                 if len(description) < 30:
450 |                     quality_issues.append("Description too short")
451 |                     quality_issues.append("Lacks detail")
452 |                     quality_score -= 70
453 |                 
454 |                 # Check for instructional language instead of descriptive
455 |                 if "click here" in description.lower() or "request this" in description.lower():
456 |                     quality_issues.append("Uses instructional language instead of descriptive")
457 |                     quality_score -= 50
458 |                 
459 |                 # Check for vague terms
460 |                 vague_terms = ["etc", "and more", "and so on", "stuff", "things"]
461 |                 if any(term in description.lower() for term in vague_terms):
462 |                     quality_issues.append("Contains vague terms")
463 |                     quality_score -= 30
464 |             
465 |             # Ensure score is between 0 and 100
466 |             quality_score = max(0, min(100, quality_score))
467 |             
468 |             # Add to poor description items if quality is below threshold
469 |             if quality_score < 80:
470 |                 item["description_quality"] = quality_score
471 |                 item["quality_issues"] = quality_issues
472 |                 poor_description_items.append(item)
473 |         
474 |         return poor_description_items
475 |     
476 |     except Exception as e:
477 |         logger.error(f"Error getting poor description items: {e}")
478 |         return [] 
```

--------------------------------------------------------------------------------
/tests/test_changeset_tools.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Tests for the changeset tools.
  3 | 
  4 | This module contains tests for the changeset tools in the ServiceNow MCP server.
  5 | """
  6 | 
  7 | import unittest
  8 | from unittest.mock import MagicMock, patch
  9 | 
 10 | from servicenow_mcp.auth.auth_manager import AuthManager
 11 | from servicenow_mcp.tools.changeset_tools import (
 12 |     AddFileToChangesetParams,
 13 |     CommitChangesetParams,
 14 |     CreateChangesetParams,
 15 |     GetChangesetDetailsParams,
 16 |     ListChangesetsParams,
 17 |     PublishChangesetParams,
 18 |     UpdateChangesetParams,
 19 |     add_file_to_changeset,
 20 |     commit_changeset,
 21 |     create_changeset,
 22 |     get_changeset_details,
 23 |     list_changesets,
 24 |     publish_changeset,
 25 |     update_changeset,
 26 | )
 27 | from servicenow_mcp.utils.config import ServerConfig, AuthConfig, AuthType, BasicAuthConfig
 28 | 
 29 | 
 30 | class TestChangesetTools(unittest.TestCase):
 31 |     """Tests for the changeset tools."""
 32 | 
 33 |     def setUp(self):
 34 |         """Set up test fixtures."""
 35 |         auth_config = AuthConfig(
 36 |             type=AuthType.BASIC,
 37 |             basic=BasicAuthConfig(
 38 |                 username="test_user",
 39 |                 password="test_password"
 40 |             )
 41 |         )
 42 |         self.server_config = ServerConfig(
 43 |             instance_url="https://test.service-now.com",
 44 |             auth=auth_config,
 45 |         )
 46 |         self.auth_manager = MagicMock(spec=AuthManager)
 47 |         self.auth_manager.get_headers.return_value = {"Authorization": "Bearer test"}
 48 | 
 49 |     @patch("servicenow_mcp.tools.changeset_tools.requests.get")
 50 |     def test_list_changesets(self, mock_get):
 51 |         """Test listing changesets."""
 52 |         # Mock response
 53 |         mock_response = MagicMock()
 54 |         mock_response.json.return_value = {
 55 |             "result": [
 56 |                 {
 57 |                     "sys_id": "123",
 58 |                     "name": "Test Changeset",
 59 |                     "state": "in_progress",
 60 |                     "application": "Test App",
 61 |                     "developer": "test.user",
 62 |                 }
 63 |             ]
 64 |         }
 65 |         mock_response.raise_for_status.return_value = None
 66 |         mock_get.return_value = mock_response
 67 | 
 68 |         # Call the function
 69 |         params = {
 70 |             "limit": 10,
 71 |             "offset": 0,
 72 |             "state": "in_progress",
 73 |             "application": "Test App",
 74 |             "developer": "test.user",
 75 |         }
 76 |         result = list_changesets(self.auth_manager, self.server_config, params)
 77 | 
 78 |         # Verify the result
 79 |         self.assertTrue(result["success"])
 80 |         self.assertEqual(len(result["changesets"]), 1)
 81 |         self.assertEqual(result["changesets"][0]["sys_id"], "123")
 82 |         self.assertEqual(result["changesets"][0]["name"], "Test Changeset")
 83 | 
 84 |         # Verify the API call
 85 |         mock_get.assert_called_once()
 86 |         args, kwargs = mock_get.call_args
 87 |         self.assertEqual(args[0], "https://test.service-now.com/api/now/table/sys_update_set")
 88 |         self.assertEqual(kwargs["headers"], {"Authorization": "Bearer test"})
 89 |         self.assertEqual(kwargs["params"]["sysparm_limit"], 10)
 90 |         self.assertEqual(kwargs["params"]["sysparm_offset"], 0)
 91 |         self.assertIn("sysparm_query", kwargs["params"])
 92 |         self.assertIn("state=in_progress", kwargs["params"]["sysparm_query"])
 93 |         self.assertIn("application=Test App", kwargs["params"]["sysparm_query"])
 94 |         self.assertIn("developer=test.user", kwargs["params"]["sysparm_query"])
 95 | 
 96 |     @patch("servicenow_mcp.tools.changeset_tools.requests.get")
 97 |     def test_get_changeset_details(self, mock_get):
 98 |         """Test getting changeset details."""
 99 |         # Mock responses
100 |         mock_changeset_response = MagicMock()
101 |         mock_changeset_response.json.return_value = {
102 |             "result": {
103 |                 "sys_id": "123",
104 |                 "name": "Test Changeset",
105 |                 "state": "in_progress",
106 |                 "application": "Test App",
107 |                 "developer": "test.user",
108 |             }
109 |         }
110 |         mock_changeset_response.raise_for_status.return_value = None
111 | 
112 |         mock_changes_response = MagicMock()
113 |         mock_changes_response.json.return_value = {
114 |             "result": [
115 |                 {
116 |                     "sys_id": "456",
117 |                     "name": "test_file.py",
118 |                     "type": "file",
119 |                     "update_set": "123",
120 |                 }
121 |             ]
122 |         }
123 |         mock_changes_response.raise_for_status.return_value = None
124 | 
125 |         # Set up the mock to return different responses for different URLs
126 |         def side_effect(*args, **kwargs):
127 |             url = args[0]
128 |             if "sys_update_set" in url:
129 |                 return mock_changeset_response
130 |             elif "sys_update_xml" in url:
131 |                 return mock_changes_response
132 |             return None
133 | 
134 |         mock_get.side_effect = side_effect
135 | 
136 |         # Call the function
137 |         params = {"changeset_id": "123"}
138 |         result = get_changeset_details(self.auth_manager, self.server_config, params)
139 | 
140 |         # Verify the result
141 |         self.assertTrue(result["success"])
142 |         self.assertEqual(result["changeset"]["sys_id"], "123")
143 |         self.assertEqual(result["changeset"]["name"], "Test Changeset")
144 |         self.assertEqual(len(result["changes"]), 1)
145 |         self.assertEqual(result["changes"][0]["sys_id"], "456")
146 |         self.assertEqual(result["changes"][0]["name"], "test_file.py")
147 | 
148 |         # Verify the API calls
149 |         self.assertEqual(mock_get.call_count, 2)
150 |         first_call_args, first_call_kwargs = mock_get.call_args_list[0]
151 |         self.assertEqual(
152 |             first_call_args[0], "https://test.service-now.com/api/now/table/sys_update_set/123"
153 |         )
154 |         self.assertEqual(first_call_kwargs["headers"], {"Authorization": "Bearer test"})
155 | 
156 |         second_call_args, second_call_kwargs = mock_get.call_args_list[1]
157 |         self.assertEqual(
158 |             second_call_args[0], "https://test.service-now.com/api/now/table/sys_update_xml"
159 |         )
160 |         self.assertEqual(second_call_kwargs["headers"], {"Authorization": "Bearer test"})
161 |         self.assertEqual(second_call_kwargs["params"]["sysparm_query"], "update_set=123")
162 | 
163 |     @patch("servicenow_mcp.tools.changeset_tools.requests.post")
164 |     def test_create_changeset(self, mock_post):
165 |         """Test creating a changeset."""
166 |         # Mock response
167 |         mock_response = MagicMock()
168 |         mock_response.json.return_value = {
169 |             "result": {
170 |                 "sys_id": "123",
171 |                 "name": "Test Changeset",
172 |                 "application": "Test App",
173 |                 "developer": "test.user",
174 |             }
175 |         }
176 |         mock_response.raise_for_status.return_value = None
177 |         mock_post.return_value = mock_response
178 | 
179 |         # Call the function
180 |         params = {
181 |             "name": "Test Changeset",
182 |             "application": "Test App",
183 |             "developer": "test.user",
184 |             "description": "Test description",
185 |         }
186 |         result = create_changeset(self.auth_manager, self.server_config, params)
187 | 
188 |         # Verify the result
189 |         self.assertTrue(result["success"])
190 |         self.assertEqual(result["changeset"]["sys_id"], "123")
191 |         self.assertEqual(result["changeset"]["name"], "Test Changeset")
192 |         self.assertEqual(result["message"], "Changeset created successfully")
193 | 
194 |         # Verify the API call
195 |         mock_post.assert_called_once()
196 |         args, kwargs = mock_post.call_args
197 |         self.assertEqual(args[0], "https://test.service-now.com/api/now/table/sys_update_set")
198 |         self.assertEqual(kwargs["headers"]["Authorization"], "Bearer test")
199 |         self.assertEqual(kwargs["headers"]["Content-Type"], "application/json")
200 |         self.assertEqual(kwargs["json"]["name"], "Test Changeset")
201 |         self.assertEqual(kwargs["json"]["application"], "Test App")
202 |         self.assertEqual(kwargs["json"]["developer"], "test.user")
203 |         self.assertEqual(kwargs["json"]["description"], "Test description")
204 | 
205 |     @patch("servicenow_mcp.tools.changeset_tools.requests.patch")
206 |     def test_update_changeset(self, mock_patch):
207 |         """Test updating a changeset."""
208 |         # Mock response
209 |         mock_response = MagicMock()
210 |         mock_response.json.return_value = {
211 |             "result": {
212 |                 "sys_id": "123",
213 |                 "name": "Updated Changeset",
214 |                 "state": "in_progress",
215 |                 "application": "Test App",
216 |                 "developer": "test.user",
217 |             }
218 |         }
219 |         mock_response.raise_for_status.return_value = None
220 |         mock_patch.return_value = mock_response
221 | 
222 |         # Call the function
223 |         params = {
224 |             "changeset_id": "123",
225 |             "name": "Updated Changeset",
226 |             "state": "in_progress",
227 |         }
228 |         result = update_changeset(self.auth_manager, self.server_config, params)
229 | 
230 |         # Verify the result
231 |         self.assertTrue(result["success"])
232 |         self.assertEqual(result["changeset"]["sys_id"], "123")
233 |         self.assertEqual(result["changeset"]["name"], "Updated Changeset")
234 |         self.assertEqual(result["message"], "Changeset updated successfully")
235 | 
236 |         # Verify the API call
237 |         mock_patch.assert_called_once()
238 |         args, kwargs = mock_patch.call_args
239 |         self.assertEqual(
240 |             args[0], "https://test.service-now.com/api/now/table/sys_update_set/123"
241 |         )
242 |         self.assertEqual(kwargs["headers"]["Authorization"], "Bearer test")
243 |         self.assertEqual(kwargs["headers"]["Content-Type"], "application/json")
244 |         self.assertEqual(kwargs["json"]["name"], "Updated Changeset")
245 |         self.assertEqual(kwargs["json"]["state"], "in_progress")
246 | 
247 |     @patch("servicenow_mcp.tools.changeset_tools.requests.patch")
248 |     def test_commit_changeset(self, mock_patch):
249 |         """Test committing a changeset."""
250 |         # Mock response
251 |         mock_response = MagicMock()
252 |         mock_response.json.return_value = {
253 |             "result": {
254 |                 "sys_id": "123",
255 |                 "name": "Test Changeset",
256 |                 "state": "complete",
257 |                 "application": "Test App",
258 |                 "developer": "test.user",
259 |             }
260 |         }
261 |         mock_response.raise_for_status.return_value = None
262 |         mock_patch.return_value = mock_response
263 | 
264 |         # Call the function
265 |         params = {
266 |             "changeset_id": "123",
267 |             "commit_message": "Commit message",
268 |         }
269 |         result = commit_changeset(self.auth_manager, self.server_config, params)
270 | 
271 |         # Verify the result
272 |         self.assertTrue(result["success"])
273 |         self.assertEqual(result["changeset"]["sys_id"], "123")
274 |         self.assertEqual(result["changeset"]["state"], "complete")
275 |         self.assertEqual(result["message"], "Changeset committed successfully")
276 | 
277 |         # Verify the API call
278 |         mock_patch.assert_called_once()
279 |         args, kwargs = mock_patch.call_args
280 |         self.assertEqual(
281 |             args[0], "https://test.service-now.com/api/now/table/sys_update_set/123"
282 |         )
283 |         self.assertEqual(kwargs["headers"]["Authorization"], "Bearer test")
284 |         self.assertEqual(kwargs["headers"]["Content-Type"], "application/json")
285 |         self.assertEqual(kwargs["json"]["state"], "complete")
286 |         self.assertEqual(kwargs["json"]["description"], "Commit message")
287 | 
288 |     @patch("servicenow_mcp.tools.changeset_tools.requests.patch")
289 |     def test_publish_changeset(self, mock_patch):
290 |         """Test publishing a changeset."""
291 |         # Mock response
292 |         mock_response = MagicMock()
293 |         mock_response.json.return_value = {
294 |             "result": {
295 |                 "sys_id": "123",
296 |                 "name": "Test Changeset",
297 |                 "state": "published",
298 |                 "application": "Test App",
299 |                 "developer": "test.user",
300 |             }
301 |         }
302 |         mock_response.raise_for_status.return_value = None
303 |         mock_patch.return_value = mock_response
304 | 
305 |         # Call the function
306 |         params = {
307 |             "changeset_id": "123",
308 |             "publish_notes": "Publish notes",
309 |         }
310 |         result = publish_changeset(self.auth_manager, self.server_config, params)
311 | 
312 |         # Verify the result
313 |         self.assertTrue(result["success"])
314 |         self.assertEqual(result["changeset"]["sys_id"], "123")
315 |         self.assertEqual(result["changeset"]["state"], "published")
316 |         self.assertEqual(result["message"], "Changeset published successfully")
317 | 
318 |         # Verify the API call
319 |         mock_patch.assert_called_once()
320 |         args, kwargs = mock_patch.call_args
321 |         self.assertEqual(
322 |             args[0], "https://test.service-now.com/api/now/table/sys_update_set/123"
323 |         )
324 |         self.assertEqual(kwargs["headers"]["Authorization"], "Bearer test")
325 |         self.assertEqual(kwargs["headers"]["Content-Type"], "application/json")
326 |         self.assertEqual(kwargs["json"]["state"], "published")
327 |         self.assertEqual(kwargs["json"]["description"], "Publish notes")
328 | 
329 |     @patch("servicenow_mcp.tools.changeset_tools.requests.post")
330 |     def test_add_file_to_changeset(self, mock_post):
331 |         """Test adding a file to a changeset."""
332 |         # Mock response
333 |         mock_response = MagicMock()
334 |         mock_response.json.return_value = {
335 |             "result": {
336 |                 "sys_id": "456",
337 |                 "name": "test_file.py",
338 |                 "type": "file",
339 |                 "update_set": "123",
340 |                 "payload": "print('Hello, world!')",
341 |             }
342 |         }
343 |         mock_response.raise_for_status.return_value = None
344 |         mock_post.return_value = mock_response
345 | 
346 |         # Call the function
347 |         params = {
348 |             "changeset_id": "123",
349 |             "file_path": "test_file.py",
350 |             "file_content": "print('Hello, world!')",
351 |         }
352 |         result = add_file_to_changeset(self.auth_manager, self.server_config, params)
353 | 
354 |         # Verify the result
355 |         self.assertTrue(result["success"])
356 |         self.assertEqual(result["file"]["sys_id"], "456")
357 |         self.assertEqual(result["file"]["name"], "test_file.py")
358 |         self.assertEqual(result["message"], "File added to changeset successfully")
359 | 
360 |         # Verify the API call
361 |         mock_post.assert_called_once()
362 |         args, kwargs = mock_post.call_args
363 |         self.assertEqual(args[0], "https://test.service-now.com/api/now/table/sys_update_xml")
364 |         self.assertEqual(kwargs["headers"]["Authorization"], "Bearer test")
365 |         self.assertEqual(kwargs["headers"]["Content-Type"], "application/json")
366 |         self.assertEqual(kwargs["json"]["update_set"], "123")
367 |         self.assertEqual(kwargs["json"]["name"], "test_file.py")
368 |         self.assertEqual(kwargs["json"]["payload"], "print('Hello, world!')")
369 |         self.assertEqual(kwargs["json"]["type"], "file")
370 | 
371 | 
372 | class TestChangesetToolsParams(unittest.TestCase):
373 |     """Tests for the changeset tools parameter classes."""
374 | 
375 |     def test_list_changesets_params(self):
376 |         """Test ListChangesetsParams."""
377 |         params = ListChangesetsParams(
378 |             limit=20,
379 |             offset=10,
380 |             state="in_progress",
381 |             application="Test App",
382 |             developer="test.user",
383 |             timeframe="recent",
384 |             query="name=test",
385 |         )
386 |         self.assertEqual(params.limit, 20)
387 |         self.assertEqual(params.offset, 10)
388 |         self.assertEqual(params.state, "in_progress")
389 |         self.assertEqual(params.application, "Test App")
390 |         self.assertEqual(params.developer, "test.user")
391 |         self.assertEqual(params.timeframe, "recent")
392 |         self.assertEqual(params.query, "name=test")
393 | 
394 |     def test_get_changeset_details_params(self):
395 |         """Test GetChangesetDetailsParams."""
396 |         params = GetChangesetDetailsParams(changeset_id="123")
397 |         self.assertEqual(params.changeset_id, "123")
398 | 
399 |     def test_create_changeset_params(self):
400 |         """Test CreateChangesetParams."""
401 |         params = CreateChangesetParams(
402 |             name="Test Changeset",
403 |             description="Test description",
404 |             application="Test App",
405 |             developer="test.user",
406 |         )
407 |         self.assertEqual(params.name, "Test Changeset")
408 |         self.assertEqual(params.description, "Test description")
409 |         self.assertEqual(params.application, "Test App")
410 |         self.assertEqual(params.developer, "test.user")
411 | 
412 |     def test_update_changeset_params(self):
413 |         """Test UpdateChangesetParams."""
414 |         params = UpdateChangesetParams(
415 |             changeset_id="123",
416 |             name="Updated Changeset",
417 |             description="Updated description",
418 |             state="in_progress",
419 |             developer="test.user",
420 |         )
421 |         self.assertEqual(params.changeset_id, "123")
422 |         self.assertEqual(params.name, "Updated Changeset")
423 |         self.assertEqual(params.description, "Updated description")
424 |         self.assertEqual(params.state, "in_progress")
425 |         self.assertEqual(params.developer, "test.user")
426 | 
427 |     def test_commit_changeset_params(self):
428 |         """Test CommitChangesetParams."""
429 |         params = CommitChangesetParams(
430 |             changeset_id="123",
431 |             commit_message="Commit message",
432 |         )
433 |         self.assertEqual(params.changeset_id, "123")
434 |         self.assertEqual(params.commit_message, "Commit message")
435 | 
436 |     def test_publish_changeset_params(self):
437 |         """Test PublishChangesetParams."""
438 |         params = PublishChangesetParams(
439 |             changeset_id="123",
440 |             publish_notes="Publish notes",
441 |         )
442 |         self.assertEqual(params.changeset_id, "123")
443 |         self.assertEqual(params.publish_notes, "Publish notes")
444 | 
445 |     def test_add_file_to_changeset_params(self):
446 |         """Test AddFileToChangesetParams."""
447 |         params = AddFileToChangesetParams(
448 |             changeset_id="123",
449 |             file_path="test_file.py",
450 |             file_content="print('Hello, world!')",
451 |         )
452 |         self.assertEqual(params.changeset_id, "123")
453 |         self.assertEqual(params.file_path, "test_file.py")
454 |         self.assertEqual(params.file_content, "print('Hello, world!')")
455 | 
456 | 
457 | if __name__ == "__main__":
458 |     unittest.main() 
```
Page 3/5FirstPrevNextLast