This is page 3 of 4. Use http://codebase.md/osomai/servicenow-mcp?page={x} to view the full context.
# Directory Structure
```
├── .cursorrules
├── .DS_Store
├── .env.example
├── .gitignore
├── config
│ └── tool_packages.yaml
├── debug_workflow_api.py
├── Dockerfile
├── docs
│ ├── catalog_optimization_plan.md
│ ├── catalog_variables.md
│ ├── catalog.md
│ ├── change_management.md
│ ├── changeset_management.md
│ ├── incident_management.md
│ ├── knowledge_base.md
│ ├── user_management.md
│ └── workflow_management.md
├── examples
│ ├── catalog_integration_test.py
│ ├── catalog_optimization_example.py
│ ├── change_management_demo.py
│ ├── changeset_management_demo.py
│ ├── claude_catalog_demo.py
│ ├── claude_desktop_config.json
│ ├── claude_incident_demo.py
│ ├── debug_workflow_api.py
│ ├── wake_servicenow_instance.py
│ └── workflow_management_demo.py
├── LICENSE
├── prompts
│ └── add_servicenow_mcp_tool.md
├── pyproject.toml
├── README.md
├── scripts
│ ├── check_pdi_info.py
│ ├── check_pdi_status.py
│ ├── install_claude_desktop.sh
│ ├── setup_api_key.py
│ ├── setup_auth.py
│ ├── setup_oauth.py
│ ├── setup.sh
│ └── test_connection.py
├── src
│ ├── .DS_Store
│ └── servicenow_mcp
│ ├── __init__.py
│ ├── .DS_Store
│ ├── auth
│ │ ├── __init__.py
│ │ └── auth_manager.py
│ ├── cli.py
│ ├── server_sse.py
│ ├── server.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── catalog_optimization.py
│ │ ├── catalog_tools.py
│ │ ├── catalog_variables.py
│ │ ├── change_tools.py
│ │ ├── changeset_tools.py
│ │ ├── epic_tools.py
│ │ ├── incident_tools.py
│ │ ├── knowledge_base.py
│ │ ├── project_tools.py
│ │ ├── script_include_tools.py
│ │ ├── scrum_task_tools.py
│ │ ├── story_tools.py
│ │ ├── user_tools.py
│ │ └── workflow_tools.py
│ └── utils
│ ├── __init__.py
│ ├── config.py
│ └── tool_utils.py
├── tests
│ ├── test_catalog_optimization.py
│ ├── test_catalog_resources.py
│ ├── test_catalog_tools.py
│ ├── test_catalog_variables.py
│ ├── test_change_tools.py
│ ├── test_changeset_resources.py
│ ├── test_changeset_tools.py
│ ├── test_config.py
│ ├── test_incident_tools.py
│ ├── test_knowledge_base.py
│ ├── test_script_include_resources.py
│ ├── test_script_include_tools.py
│ ├── test_server_catalog_optimization.py
│ ├── test_server_catalog.py
│ ├── test_server_workflow.py
│ ├── test_user_tools.py
│ ├── test_workflow_tools_direct.py
│ ├── test_workflow_tools_params.py
│ └── test_workflow_tools.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/scrum_task_tools.py:
--------------------------------------------------------------------------------
```python
"""
Scrum Task management tools for the ServiceNow MCP server.
This module provides tools for managing stories in ServiceNow.
"""
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Type, TypeVar
import requests
from pydantic import BaseModel, Field
from servicenow_mcp.auth.auth_manager import AuthManager
from servicenow_mcp.utils.config import ServerConfig
logger = logging.getLogger(__name__)
# Type variable for Pydantic models
T = TypeVar('T', bound=BaseModel)
class CreateScrumTaskParams(BaseModel):
"""Parameters for creating a scrum task."""
story: str = Field(..., description="Short description of the story. It requires the System ID of the story.")
short_description: str = Field(..., description="Short description of the scrum task")
priority: Optional[str] = Field(None, description="Priority of scrum task (1 is Critical, 2 is High, 3 is Moderate, 4 is Low)")
planned_hours: Optional[int] = Field(None, description="Planned hours for the scrum task")
remaining_hours: Optional[int] = Field(None, description="Remaining hours for the scrum task")
hours: Optional[int] = Field(None, description="Actual Hours for the scrum task")
description: Optional[str] = Field(None, description="Detailed description of the scrum task")
type: Optional[str] = Field(None, description="Type of scrum task (1 is Analysis, 2 is Coding, 3 is Documentation, 4 is Testing)")
state: Optional[str] = Field(None, description="State of scrum task (-6 is Draft,1 is Ready, 2 is Work in progress, 3 is Complete, 4 is Cancelled)")
assignment_group: Optional[str] = Field(None, description="Group assigned to the scrum task")
assigned_to: Optional[str] = Field(None, description="User assigned to the scrum task")
work_notes: Optional[str] = Field(None, description="Work notes to add to the scrum task")
class UpdateScrumTaskParams(BaseModel):
"""Parameters for updating a scrum task."""
scrum_task_id: str = Field(..., description="Scrum Task ID or sys_id")
short_description: Optional[str] = Field(None, description="Short description of the scrum task")
priority: Optional[str] = Field(None, description="Priority of scrum task (1 is Critical, 2 is High, 3 is Moderate, 4 is Low)")
planned_hours: Optional[int] = Field(None, description="Planned hours for the scrum task")
remaining_hours: Optional[int] = Field(None, description="Remaining hours for the scrum task")
hours: Optional[int] = Field(None, description="Actual Hours for the scrum task")
description: Optional[str] = Field(None, description="Detailed description of the scrum task")
type: Optional[str] = Field(None, description="Type of scrum task (1 is Analysis, 2 is Coding, 3 is Documentation, 4 is Testing)")
state: Optional[str] = Field(None, description="State of scrum task (-6 is Draft,1 is Ready, 2 is Work in progress, 3 is Complete, 4 is Cancelled)")
assignment_group: Optional[str] = Field(None, description="Group assigned to the scrum task")
assigned_to: Optional[str] = Field(None, description="User assigned to the scrum task")
work_notes: Optional[str] = Field(None, description="Work notes to add to the scrum task")
class ListScrumTasksParams(BaseModel):
"""Parameters for listing scrum tasks."""
limit: Optional[int] = Field(10, description="Maximum number of records to return")
offset: Optional[int] = Field(0, description="Offset to start from")
state: Optional[str] = Field(None, description="Filter by state")
assignment_group: Optional[str] = Field(None, description="Filter by assignment group")
timeframe: Optional[str] = Field(None, description="Filter by timeframe (upcoming, in-progress, completed)")
query: Optional[str] = Field(None, description="Additional query string")
def _unwrap_and_validate_params(params: Any, model_class: Type[T], required_fields: List[str] = None) -> Dict[str, Any]:
"""
Helper function to unwrap and validate parameters.
Args:
params: The parameters to unwrap and validate.
model_class: The Pydantic model class to validate against.
required_fields: List of required field names.
Returns:
A tuple of (success, result) where result is either the validated parameters or an error message.
"""
# Handle case where params might be wrapped in another dictionary
if isinstance(params, dict) and len(params) == 1 and "params" in params and isinstance(params["params"], dict):
logger.warning("Detected params wrapped in a 'params' key. Unwrapping...")
params = params["params"]
# Handle case where params might be a Pydantic model object
if not isinstance(params, dict):
try:
# Try to convert to dict if it's a Pydantic model
logger.warning("Params is not a dictionary. Attempting to convert...")
params = params.dict() if hasattr(params, "dict") else dict(params)
except Exception as e:
logger.error(f"Failed to convert params to dictionary: {e}")
return {
"success": False,
"message": f"Invalid parameters format. Expected a dictionary, got {type(params).__name__}",
}
# Validate required parameters are present
if required_fields:
for field in required_fields:
if field not in params:
return {
"success": False,
"message": f"Missing required parameter '{field}'",
}
try:
# Validate parameters against the model
validated_params = model_class(**params)
return {
"success": True,
"params": validated_params,
}
except Exception as e:
logger.error(f"Error validating parameters: {e}")
return {
"success": False,
"message": f"Error validating parameters: {str(e)}",
}
def _get_instance_url(auth_manager: AuthManager, server_config: ServerConfig) -> Optional[str]:
"""
Helper function to get the instance URL from either server_config or auth_manager.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
Returns:
The instance URL if found, None otherwise.
"""
if hasattr(server_config, 'instance_url'):
return server_config.instance_url
elif hasattr(auth_manager, 'instance_url'):
return auth_manager.instance_url
else:
logger.error("Cannot find instance_url in either server_config or auth_manager")
return None
def _get_headers(auth_manager: Any, server_config: Any) -> Optional[Dict[str, str]]:
"""
Helper function to get headers from either auth_manager or server_config.
Args:
auth_manager: The authentication manager or object passed as auth_manager.
server_config: The server configuration or object passed as server_config.
Returns:
The headers if found, None otherwise.
"""
# Try to get headers from auth_manager
if hasattr(auth_manager, 'get_headers'):
return auth_manager.get_headers()
# If auth_manager doesn't have get_headers, try server_config
if hasattr(server_config, 'get_headers'):
return server_config.get_headers()
# If neither has get_headers, check if auth_manager is actually a ServerConfig
# and server_config is actually an AuthManager (parameters swapped)
if hasattr(server_config, 'get_headers') and not hasattr(auth_manager, 'get_headers'):
return server_config.get_headers()
logger.error("Cannot find get_headers method in either auth_manager or server_config")
return None
def create_scrum_task(
auth_manager: AuthManager,
server_config: ServerConfig,
params: Dict[str, Any],
) -> Dict[str, Any]:
"""
Create a new scrum task in ServiceNow.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
params: The parameters for creating the scrum task.
Returns:
The created scrum task.
"""
# Unwrap and validate parameters
result = _unwrap_and_validate_params(
params,
CreateScrumTaskParams,
required_fields=["short_description", "story"]
)
if not result["success"]:
return result
validated_params = result["params"]
# Prepare the request data
data = {
"story": validated_params.story,
"short_description": validated_params.short_description,
}
# Add optional fields if provided
if validated_params.priority:
data["priority"] = validated_params.priority
if validated_params.planned_hours:
data["planned_hours"] = validated_params.planned_hours
if validated_params.remaining_hours:
data["remaining_hours"] = validated_params.remaining_hours
if validated_params.hours:
data["hours"] = validated_params.hours
if validated_params.description:
data["description"] = validated_params.description
if validated_params.type:
data["type"] = validated_params.type
if validated_params.state:
data["state"] = validated_params.state
if validated_params.assignment_group:
data["assignment_group"] = validated_params.assignment_group
if validated_params.assigned_to:
data["assigned_to"] = validated_params.assigned_to
if validated_params.work_notes:
data["work_notes"] = validated_params.work_notes
# Get the instance URL
instance_url = _get_instance_url(auth_manager, server_config)
if not instance_url:
return {
"success": False,
"message": "Cannot find instance_url in either server_config or auth_manager",
}
# Get the headers
headers = _get_headers(auth_manager, server_config)
if not headers:
return {
"success": False,
"message": "Cannot find get_headers method in either auth_manager or server_config",
}
# Add Content-Type header
headers["Content-Type"] = "application/json"
# Make the API request
url = f"{instance_url}/api/now/table/rm_scrum_task"
try:
response = requests.post(url, json=data, headers=headers)
response.raise_for_status()
result = response.json()
return {
"success": True,
"message": "Scrum Task created successfully",
"scrum_task": result["result"],
}
except requests.exceptions.RequestException as e:
logger.error(f"Error creating scrum task: {e}")
return {
"success": False,
"message": f"Error creating scrum task: {str(e)}",
}
def update_scrum_task(
auth_manager: AuthManager,
server_config: ServerConfig,
params: Dict[str, Any],
) -> Dict[str, Any]:
"""
Update an existing scrum task in ServiceNow.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
params: The parameters for updating the scrum task.
Returns:
The updated scrum task.
"""
# Unwrap and validate parameters
result = _unwrap_and_validate_params(
params,
UpdateScrumTaskParams,
required_fields=["scrum_task_id"]
)
if not result["success"]:
return result
validated_params = result["params"]
# Prepare the request data
data = {}
# Add optional fields if provided
if validated_params.short_description:
data["short_description"] = validated_params.short_description
if validated_params.priority:
data["priority"] = validated_params.priority
if validated_params.planned_hours:
data["planned_hours"] = validated_params.planned_hours
if validated_params.remaining_hours:
data["remaining_hours"] = validated_params.remaining_hours
if validated_params.hours:
data["hours"] = validated_params.hours
if validated_params.description:
data["description"] = validated_params.description
if validated_params.type:
data["type"] = validated_params.type
if validated_params.state:
data["state"] = validated_params.state
if validated_params.assignment_group:
data["assignment_group"] = validated_params.assignment_group
if validated_params.assigned_to:
data["assigned_to"] = validated_params.assigned_to
if validated_params.work_notes:
data["work_notes"] = validated_params.work_notes
# Get the instance URL
instance_url = _get_instance_url(auth_manager, server_config)
if not instance_url:
return {
"success": False,
"message": "Cannot find instance_url in either server_config or auth_manager",
}
# Get the headers
headers = _get_headers(auth_manager, server_config)
if not headers:
return {
"success": False,
"message": "Cannot find get_headers method in either auth_manager or server_config",
}
# Add Content-Type header
headers["Content-Type"] = "application/json"
# Make the API request
url = f"{instance_url}/api/now/table/rm_scrum_task/{validated_params.scrum_task_id}"
try:
response = requests.put(url, json=data, headers=headers)
response.raise_for_status()
result = response.json()
return {
"success": True,
"message": "Scrum Task updated successfully",
"scrum_task": result["result"],
}
except requests.exceptions.RequestException as e:
logger.error(f"Error updating scrum task: {e}")
return {
"success": False,
"message": f"Error updating scrum task: {str(e)}",
}
def list_scrum_tasks(
auth_manager: AuthManager,
server_config: ServerConfig,
params: Dict[str, Any],
) -> Dict[str, Any]:
"""
List scrum tasks from ServiceNow.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
params: The parameters for listing scrum tasks.
Returns:
A list of scrum tasks.
"""
# Unwrap and validate parameters
result = _unwrap_and_validate_params(
params,
ListScrumTasksParams
)
if not result["success"]:
return result
validated_params = result["params"]
# Build the query
query_parts = []
if validated_params.state:
query_parts.append(f"state={validated_params.state}")
if validated_params.assignment_group:
query_parts.append(f"assignment_group={validated_params.assignment_group}")
# Handle timeframe filtering
if validated_params.timeframe:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if validated_params.timeframe == "upcoming":
query_parts.append(f"start_date>{now}")
elif validated_params.timeframe == "in-progress":
query_parts.append(f"start_date<{now}^end_date>{now}")
elif validated_params.timeframe == "completed":
query_parts.append(f"end_date<{now}")
# Add any additional query string
if validated_params.query:
query_parts.append(validated_params.query)
# Combine query parts
query = "^".join(query_parts) if query_parts else ""
# Get the instance URL
instance_url = _get_instance_url(auth_manager, server_config)
if not instance_url:
return {
"success": False,
"message": "Cannot find instance_url in either server_config or auth_manager",
}
# Get the headers
headers = _get_headers(auth_manager, server_config)
if not headers:
return {
"success": False,
"message": "Cannot find get_headers method in either auth_manager or server_config",
}
# Make the API request
url = f"{instance_url}/api/now/table/rm_scrum_task"
params = {
"sysparm_limit": validated_params.limit,
"sysparm_offset": validated_params.offset,
"sysparm_query": query,
"sysparm_display_value": "true",
}
try:
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
result = response.json()
# Handle the case where result["result"] is a list
scrum_tasks = result.get("result", [])
count = len(scrum_tasks)
return {
"success": True,
"scrum_tasks": scrum_tasks,
"count": count,
"total": count, # Use count as total if total is not provided
}
except requests.exceptions.RequestException as e:
logger.error(f"Error listing stories: {e}")
return {
"success": False,
"message": f"Error listing stories: {str(e)}",
}
```
--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/catalog_optimization.py:
--------------------------------------------------------------------------------
```python
"""
Tools for optimizing the ServiceNow Service Catalog.
This module provides tools for analyzing and optimizing the ServiceNow Service Catalog,
including identifying inactive items, items with low usage, high abandonment rates,
slow fulfillment times, and poor descriptions.
"""
import logging
import random
from dataclasses import dataclass
from typing import Dict, List, Optional
import requests
from pydantic import BaseModel, Field
from servicenow_mcp.auth.auth_manager import AuthManager
from servicenow_mcp.utils.config import ServerConfig
logger = logging.getLogger(__name__)
class OptimizationRecommendationsParams(BaseModel):
"""Parameters for getting optimization recommendations."""
recommendation_types: List[str]
category_id: Optional[str] = None
class UpdateCatalogItemParams(BaseModel):
"""Parameters for updating a catalog item."""
item_id: str
name: Optional[str] = None
short_description: Optional[str] = None
description: Optional[str] = None
category: Optional[str] = None
price: Optional[str] = None
active: Optional[bool] = None
order: Optional[int] = None
def get_optimization_recommendations(
config: ServerConfig, auth_manager: AuthManager, params: OptimizationRecommendationsParams
) -> Dict:
"""
Get optimization recommendations for the ServiceNow Service Catalog.
Args:
config: The server configuration
auth_manager: The authentication manager
params: The parameters for getting optimization recommendations
Returns:
A dictionary containing the optimization recommendations
"""
logger.info("Getting catalog optimization recommendations")
recommendations = []
category_id = params.category_id
try:
# Get recommendations based on the requested types
for rec_type in params.recommendation_types:
if rec_type == "inactive_items":
items = _get_inactive_items(config, auth_manager, category_id)
if items:
recommendations.append({
"type": "inactive_items",
"title": "Inactive Catalog Items",
"description": "Items that are currently inactive in the catalog",
"items": items,
"impact": "medium",
"effort": "low",
"action": "Review and either update or remove these items",
})
elif rec_type == "low_usage":
items = _get_low_usage_items(config, auth_manager, category_id)
if items:
recommendations.append({
"type": "low_usage",
"title": "Low Usage Catalog Items",
"description": "Items that have very few orders",
"items": items,
"impact": "medium",
"effort": "medium",
"action": "Consider promoting these items or removing them if no longer needed",
})
elif rec_type == "high_abandonment":
items = _get_high_abandonment_items(config, auth_manager, category_id)
if items:
recommendations.append({
"type": "high_abandonment",
"title": "High Abandonment Rate Items",
"description": "Items that are frequently added to cart but not ordered",
"items": items,
"impact": "high",
"effort": "medium",
"action": "Simplify the request process or improve the item description",
})
elif rec_type == "slow_fulfillment":
items = _get_slow_fulfillment_items(config, auth_manager, category_id)
if items:
recommendations.append({
"type": "slow_fulfillment",
"title": "Slow Fulfillment Items",
"description": "Items that take longer than average to fulfill",
"items": items,
"impact": "high",
"effort": "high",
"action": "Review the fulfillment process and identify bottlenecks",
})
elif rec_type == "description_quality":
items = _get_poor_description_items(config, auth_manager, category_id)
if items:
recommendations.append({
"type": "description_quality",
"title": "Poor Description Quality",
"description": "Items with missing, short, or low-quality descriptions",
"items": items,
"impact": "medium",
"effort": "low",
"action": "Improve the descriptions to better explain the item's purpose and benefits",
})
return {
"success": True,
"recommendations": recommendations,
}
except Exception as e:
logger.error(f"Error getting optimization recommendations: {e}")
return {
"success": False,
"message": f"Error getting optimization recommendations: {str(e)}",
"recommendations": [],
}
def update_catalog_item(
config: ServerConfig, auth_manager: AuthManager, params: UpdateCatalogItemParams
) -> Dict:
"""
Update a catalog item.
Args:
config: The server configuration
auth_manager: The authentication manager
params: The parameters for updating the catalog item
Returns:
A dictionary containing the result of the update operation
"""
logger.info(f"Updating catalog item: {params.item_id}")
try:
# Build the request body with only the provided parameters
body = {}
if params.name is not None:
body["name"] = params.name
if params.short_description is not None:
body["short_description"] = params.short_description
if params.description is not None:
body["description"] = params.description
if params.category is not None:
body["category"] = params.category
if params.price is not None:
body["price"] = params.price
if params.active is not None:
body["active"] = str(params.active).lower()
if params.order is not None:
body["order"] = str(params.order)
# Make the API request
url = f"{config.instance_url}/api/now/table/sc_cat_item/{params.item_id}"
headers = auth_manager.get_headers()
headers["Content-Type"] = "application/json"
response = requests.patch(url, headers=headers, json=body)
response.raise_for_status()
return {
"success": True,
"message": "Catalog item updated successfully",
"data": response.json()["result"],
}
except Exception as e:
logger.error(f"Error updating catalog item: {e}")
return {
"success": False,
"message": f"Error updating catalog item: {str(e)}",
"data": None,
}
def _get_inactive_items(
config: ServerConfig, auth_manager: AuthManager, category_id: Optional[str] = None
) -> List[Dict]:
"""
Get inactive catalog items.
Args:
config: The server configuration
auth_manager: The authentication manager
category_id: Optional category ID to filter by
Returns:
A list of inactive catalog items
"""
try:
# Build the query
query = "active=false"
if category_id:
query += f"^category={category_id}"
# Make the API request
url = f"{config.instance_url}/api/now/table/sc_cat_item"
headers = auth_manager.get_headers()
params = {
"sysparm_query": query,
"sysparm_fields": "sys_id,name,short_description,category",
"sysparm_limit": "50",
}
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
return response.json()["result"]
except Exception as e:
logger.error(f"Error getting inactive items: {e}")
return []
def _get_low_usage_items(
config: ServerConfig, auth_manager: AuthManager, category_id: Optional[str] = None
) -> List[Dict]:
"""
Get catalog items with low usage.
Args:
config: The server configuration
auth_manager: The authentication manager
category_id: Optional category ID to filter by
Returns:
A list of catalog items with low usage
"""
try:
# Build the query
query = "active=true"
if category_id:
query += f"^category={category_id}"
# Make the API request
url = f"{config.instance_url}/api/now/table/sc_cat_item"
headers = auth_manager.get_headers()
params = {
"sysparm_query": query,
"sysparm_fields": "sys_id,name,short_description,category",
"sysparm_limit": "50",
}
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
# In a real implementation, we would query the request table to get actual usage data
# For this example, we'll simulate low usage with random data
items = response.json()["result"]
# Select a random subset of items to mark as low usage
low_usage_items = random.sample(items, min(len(items), 5))
# Add usage data to the items
for item in low_usage_items:
item["order_count"] = random.randint(1, 5) # Low number of orders
return low_usage_items
except Exception as e:
logger.error(f"Error getting low usage items: {e}")
return []
def _get_high_abandonment_items(
config: ServerConfig, auth_manager: AuthManager, category_id: Optional[str] = None
) -> List[Dict]:
"""
Get catalog items with high abandonment rates.
Args:
config: The server configuration
auth_manager: The authentication manager
category_id: Optional category ID to filter by
Returns:
A list of catalog items with high abandonment rates
"""
try:
# Build the query
query = "active=true"
if category_id:
query += f"^category={category_id}"
# Make the API request
url = f"{config.instance_url}/api/now/table/sc_cat_item"
headers = auth_manager.get_headers()
params = {
"sysparm_query": query,
"sysparm_fields": "sys_id,name,short_description,category",
"sysparm_limit": "50",
}
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
# In a real implementation, we would query the request table to get actual abandonment data
# For this example, we'll simulate high abandonment with random data
items = response.json()["result"]
# Select a random subset of items to mark as high abandonment
high_abandonment_items = random.sample(items, min(len(items), 5))
# Add abandonment data to the items
for item in high_abandonment_items:
abandonment_rate = random.randint(40, 80) # High abandonment rate (40-80%)
cart_adds = random.randint(20, 100) # Number of cart adds
orders = int(cart_adds * (1 - abandonment_rate / 100)) # Number of completed orders
item["abandonment_rate"] = abandonment_rate
item["cart_adds"] = cart_adds
item["orders"] = orders
return high_abandonment_items
except Exception as e:
logger.error(f"Error getting high abandonment items: {e}")
return []
def _get_slow_fulfillment_items(
config: ServerConfig, auth_manager: AuthManager, category_id: Optional[str] = None
) -> List[Dict]:
"""
Get catalog items with slow fulfillment times.
Args:
config: The server configuration
auth_manager: The authentication manager
category_id: Optional category ID to filter by
Returns:
A list of catalog items with slow fulfillment times
"""
try:
# Build the query
query = "active=true"
if category_id:
query += f"^category={category_id}"
# Make the API request
url = f"{config.instance_url}/api/now/table/sc_cat_item"
headers = auth_manager.get_headers()
params = {
"sysparm_query": query,
"sysparm_fields": "sys_id,name,short_description,category",
"sysparm_limit": "50",
}
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
# In a real implementation, we would query the request table to get actual fulfillment data
# For this example, we'll simulate slow fulfillment with random data
items = response.json()["result"]
# Select a random subset of items to mark as slow fulfillment
slow_fulfillment_items = random.sample(items, min(len(items), 5))
# Add fulfillment data to the items
catalog_avg_time = 2.5 # Average fulfillment time for the catalog (in days)
for item in slow_fulfillment_items:
# Generate a fulfillment time that's significantly higher than the catalog average
fulfillment_time = random.uniform(5.0, 10.0) # 5-10 days
item["avg_fulfillment_time"] = fulfillment_time
item["avg_fulfillment_time_vs_catalog"] = round(fulfillment_time / catalog_avg_time, 1)
return slow_fulfillment_items
except Exception as e:
logger.error(f"Error getting slow fulfillment items: {e}")
return []
def _get_poor_description_items(
config: ServerConfig, auth_manager: AuthManager, category_id: Optional[str] = None
) -> List[Dict]:
"""
Get catalog items with poor description quality.
Args:
config: The server configuration
auth_manager: The authentication manager
category_id: Optional category ID to filter by
Returns:
A list of catalog items with poor description quality
"""
try:
# Build the query
query = "active=true"
if category_id:
query += f"^category={category_id}"
# Make the API request
url = f"{config.instance_url}/api/now/table/sc_cat_item"
headers = auth_manager.get_headers()
params = {
"sysparm_query": query,
"sysparm_fields": "sys_id,name,short_description,category",
"sysparm_limit": "50",
}
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
items = response.json()["result"]
poor_description_items = []
# Analyze each item's description quality
for item in items:
description = item.get("short_description", "")
quality_issues = []
quality_score = 100 # Start with perfect score
# Check for empty description
if not description:
quality_issues.append("Missing description")
quality_score = 0
else:
# Check for short description
if len(description) < 30:
quality_issues.append("Description too short")
quality_issues.append("Lacks detail")
quality_score -= 70
# Check for instructional language instead of descriptive
if "click here" in description.lower() or "request this" in description.lower():
quality_issues.append("Uses instructional language instead of descriptive")
quality_score -= 50
# Check for vague terms
vague_terms = ["etc", "and more", "and so on", "stuff", "things"]
if any(term in description.lower() for term in vague_terms):
quality_issues.append("Contains vague terms")
quality_score -= 30
# Ensure score is between 0 and 100
quality_score = max(0, min(100, quality_score))
# Add to poor description items if quality is below threshold
if quality_score < 80:
item["description_quality"] = quality_score
item["quality_issues"] = quality_issues
poor_description_items.append(item)
return poor_description_items
except Exception as e:
logger.error(f"Error getting poor description items: {e}")
return []
```
--------------------------------------------------------------------------------
/tests/test_changeset_tools.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the changeset tools.
This module contains tests for the changeset tools in the ServiceNow MCP server.
"""
import unittest
from unittest.mock import MagicMock, patch
from servicenow_mcp.auth.auth_manager import AuthManager
from servicenow_mcp.tools.changeset_tools import (
AddFileToChangesetParams,
CommitChangesetParams,
CreateChangesetParams,
GetChangesetDetailsParams,
ListChangesetsParams,
PublishChangesetParams,
UpdateChangesetParams,
add_file_to_changeset,
commit_changeset,
create_changeset,
get_changeset_details,
list_changesets,
publish_changeset,
update_changeset,
)
from servicenow_mcp.utils.config import ServerConfig, AuthConfig, AuthType, BasicAuthConfig
class TestChangesetTools(unittest.TestCase):
"""Tests for the changeset tools."""
def setUp(self):
"""Set up test fixtures."""
auth_config = AuthConfig(
type=AuthType.BASIC,
basic=BasicAuthConfig(
username="test_user",
password="test_password"
)
)
self.server_config = ServerConfig(
instance_url="https://test.service-now.com",
auth=auth_config,
)
self.auth_manager = MagicMock(spec=AuthManager)
self.auth_manager.get_headers.return_value = {"Authorization": "Bearer test"}
@patch("servicenow_mcp.tools.changeset_tools.requests.get")
def test_list_changesets(self, mock_get):
"""Test listing changesets."""
# Mock response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": [
{
"sys_id": "123",
"name": "Test Changeset",
"state": "in_progress",
"application": "Test App",
"developer": "test.user",
}
]
}
mock_response.raise_for_status.return_value = None
mock_get.return_value = mock_response
# Call the function
params = {
"limit": 10,
"offset": 0,
"state": "in_progress",
"application": "Test App",
"developer": "test.user",
}
result = list_changesets(self.auth_manager, self.server_config, params)
# Verify the result
self.assertTrue(result["success"])
self.assertEqual(len(result["changesets"]), 1)
self.assertEqual(result["changesets"][0]["sys_id"], "123")
self.assertEqual(result["changesets"][0]["name"], "Test Changeset")
# Verify the API call
mock_get.assert_called_once()
args, kwargs = mock_get.call_args
self.assertEqual(args[0], "https://test.service-now.com/api/now/table/sys_update_set")
self.assertEqual(kwargs["headers"], {"Authorization": "Bearer test"})
self.assertEqual(kwargs["params"]["sysparm_limit"], 10)
self.assertEqual(kwargs["params"]["sysparm_offset"], 0)
self.assertIn("sysparm_query", kwargs["params"])
self.assertIn("state=in_progress", kwargs["params"]["sysparm_query"])
self.assertIn("application=Test App", kwargs["params"]["sysparm_query"])
self.assertIn("developer=test.user", kwargs["params"]["sysparm_query"])
@patch("servicenow_mcp.tools.changeset_tools.requests.get")
def test_get_changeset_details(self, mock_get):
"""Test getting changeset details."""
# Mock responses
mock_changeset_response = MagicMock()
mock_changeset_response.json.return_value = {
"result": {
"sys_id": "123",
"name": "Test Changeset",
"state": "in_progress",
"application": "Test App",
"developer": "test.user",
}
}
mock_changeset_response.raise_for_status.return_value = None
mock_changes_response = MagicMock()
mock_changes_response.json.return_value = {
"result": [
{
"sys_id": "456",
"name": "test_file.py",
"type": "file",
"update_set": "123",
}
]
}
mock_changes_response.raise_for_status.return_value = None
# Set up the mock to return different responses for different URLs
def side_effect(*args, **kwargs):
url = args[0]
if "sys_update_set" in url:
return mock_changeset_response
elif "sys_update_xml" in url:
return mock_changes_response
return None
mock_get.side_effect = side_effect
# Call the function
params = {"changeset_id": "123"}
result = get_changeset_details(self.auth_manager, self.server_config, params)
# Verify the result
self.assertTrue(result["success"])
self.assertEqual(result["changeset"]["sys_id"], "123")
self.assertEqual(result["changeset"]["name"], "Test Changeset")
self.assertEqual(len(result["changes"]), 1)
self.assertEqual(result["changes"][0]["sys_id"], "456")
self.assertEqual(result["changes"][0]["name"], "test_file.py")
# Verify the API calls
self.assertEqual(mock_get.call_count, 2)
first_call_args, first_call_kwargs = mock_get.call_args_list[0]
self.assertEqual(
first_call_args[0], "https://test.service-now.com/api/now/table/sys_update_set/123"
)
self.assertEqual(first_call_kwargs["headers"], {"Authorization": "Bearer test"})
second_call_args, second_call_kwargs = mock_get.call_args_list[1]
self.assertEqual(
second_call_args[0], "https://test.service-now.com/api/now/table/sys_update_xml"
)
self.assertEqual(second_call_kwargs["headers"], {"Authorization": "Bearer test"})
self.assertEqual(second_call_kwargs["params"]["sysparm_query"], "update_set=123")
@patch("servicenow_mcp.tools.changeset_tools.requests.post")
def test_create_changeset(self, mock_post):
"""Test creating a changeset."""
# Mock response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "123",
"name": "Test Changeset",
"application": "Test App",
"developer": "test.user",
}
}
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
# Call the function
params = {
"name": "Test Changeset",
"application": "Test App",
"developer": "test.user",
"description": "Test description",
}
result = create_changeset(self.auth_manager, self.server_config, params)
# Verify the result
self.assertTrue(result["success"])
self.assertEqual(result["changeset"]["sys_id"], "123")
self.assertEqual(result["changeset"]["name"], "Test Changeset")
self.assertEqual(result["message"], "Changeset created successfully")
# Verify the API call
mock_post.assert_called_once()
args, kwargs = mock_post.call_args
self.assertEqual(args[0], "https://test.service-now.com/api/now/table/sys_update_set")
self.assertEqual(kwargs["headers"]["Authorization"], "Bearer test")
self.assertEqual(kwargs["headers"]["Content-Type"], "application/json")
self.assertEqual(kwargs["json"]["name"], "Test Changeset")
self.assertEqual(kwargs["json"]["application"], "Test App")
self.assertEqual(kwargs["json"]["developer"], "test.user")
self.assertEqual(kwargs["json"]["description"], "Test description")
@patch("servicenow_mcp.tools.changeset_tools.requests.patch")
def test_update_changeset(self, mock_patch):
"""Test updating a changeset."""
# Mock response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "123",
"name": "Updated Changeset",
"state": "in_progress",
"application": "Test App",
"developer": "test.user",
}
}
mock_response.raise_for_status.return_value = None
mock_patch.return_value = mock_response
# Call the function
params = {
"changeset_id": "123",
"name": "Updated Changeset",
"state": "in_progress",
}
result = update_changeset(self.auth_manager, self.server_config, params)
# Verify the result
self.assertTrue(result["success"])
self.assertEqual(result["changeset"]["sys_id"], "123")
self.assertEqual(result["changeset"]["name"], "Updated Changeset")
self.assertEqual(result["message"], "Changeset updated successfully")
# Verify the API call
mock_patch.assert_called_once()
args, kwargs = mock_patch.call_args
self.assertEqual(
args[0], "https://test.service-now.com/api/now/table/sys_update_set/123"
)
self.assertEqual(kwargs["headers"]["Authorization"], "Bearer test")
self.assertEqual(kwargs["headers"]["Content-Type"], "application/json")
self.assertEqual(kwargs["json"]["name"], "Updated Changeset")
self.assertEqual(kwargs["json"]["state"], "in_progress")
@patch("servicenow_mcp.tools.changeset_tools.requests.patch")
def test_commit_changeset(self, mock_patch):
"""Test committing a changeset."""
# Mock response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "123",
"name": "Test Changeset",
"state": "complete",
"application": "Test App",
"developer": "test.user",
}
}
mock_response.raise_for_status.return_value = None
mock_patch.return_value = mock_response
# Call the function
params = {
"changeset_id": "123",
"commit_message": "Commit message",
}
result = commit_changeset(self.auth_manager, self.server_config, params)
# Verify the result
self.assertTrue(result["success"])
self.assertEqual(result["changeset"]["sys_id"], "123")
self.assertEqual(result["changeset"]["state"], "complete")
self.assertEqual(result["message"], "Changeset committed successfully")
# Verify the API call
mock_patch.assert_called_once()
args, kwargs = mock_patch.call_args
self.assertEqual(
args[0], "https://test.service-now.com/api/now/table/sys_update_set/123"
)
self.assertEqual(kwargs["headers"]["Authorization"], "Bearer test")
self.assertEqual(kwargs["headers"]["Content-Type"], "application/json")
self.assertEqual(kwargs["json"]["state"], "complete")
self.assertEqual(kwargs["json"]["description"], "Commit message")
@patch("servicenow_mcp.tools.changeset_tools.requests.patch")
def test_publish_changeset(self, mock_patch):
"""Test publishing a changeset."""
# Mock response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "123",
"name": "Test Changeset",
"state": "published",
"application": "Test App",
"developer": "test.user",
}
}
mock_response.raise_for_status.return_value = None
mock_patch.return_value = mock_response
# Call the function
params = {
"changeset_id": "123",
"publish_notes": "Publish notes",
}
result = publish_changeset(self.auth_manager, self.server_config, params)
# Verify the result
self.assertTrue(result["success"])
self.assertEqual(result["changeset"]["sys_id"], "123")
self.assertEqual(result["changeset"]["state"], "published")
self.assertEqual(result["message"], "Changeset published successfully")
# Verify the API call
mock_patch.assert_called_once()
args, kwargs = mock_patch.call_args
self.assertEqual(
args[0], "https://test.service-now.com/api/now/table/sys_update_set/123"
)
self.assertEqual(kwargs["headers"]["Authorization"], "Bearer test")
self.assertEqual(kwargs["headers"]["Content-Type"], "application/json")
self.assertEqual(kwargs["json"]["state"], "published")
self.assertEqual(kwargs["json"]["description"], "Publish notes")
@patch("servicenow_mcp.tools.changeset_tools.requests.post")
def test_add_file_to_changeset(self, mock_post):
"""Test adding a file to a changeset."""
# Mock response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "456",
"name": "test_file.py",
"type": "file",
"update_set": "123",
"payload": "print('Hello, world!')",
}
}
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response
# Call the function
params = {
"changeset_id": "123",
"file_path": "test_file.py",
"file_content": "print('Hello, world!')",
}
result = add_file_to_changeset(self.auth_manager, self.server_config, params)
# Verify the result
self.assertTrue(result["success"])
self.assertEqual(result["file"]["sys_id"], "456")
self.assertEqual(result["file"]["name"], "test_file.py")
self.assertEqual(result["message"], "File added to changeset successfully")
# Verify the API call
mock_post.assert_called_once()
args, kwargs = mock_post.call_args
self.assertEqual(args[0], "https://test.service-now.com/api/now/table/sys_update_xml")
self.assertEqual(kwargs["headers"]["Authorization"], "Bearer test")
self.assertEqual(kwargs["headers"]["Content-Type"], "application/json")
self.assertEqual(kwargs["json"]["update_set"], "123")
self.assertEqual(kwargs["json"]["name"], "test_file.py")
self.assertEqual(kwargs["json"]["payload"], "print('Hello, world!')")
self.assertEqual(kwargs["json"]["type"], "file")
class TestChangesetToolsParams(unittest.TestCase):
"""Tests for the changeset tools parameter classes."""
def test_list_changesets_params(self):
"""Test ListChangesetsParams."""
params = ListChangesetsParams(
limit=20,
offset=10,
state="in_progress",
application="Test App",
developer="test.user",
timeframe="recent",
query="name=test",
)
self.assertEqual(params.limit, 20)
self.assertEqual(params.offset, 10)
self.assertEqual(params.state, "in_progress")
self.assertEqual(params.application, "Test App")
self.assertEqual(params.developer, "test.user")
self.assertEqual(params.timeframe, "recent")
self.assertEqual(params.query, "name=test")
def test_get_changeset_details_params(self):
"""Test GetChangesetDetailsParams."""
params = GetChangesetDetailsParams(changeset_id="123")
self.assertEqual(params.changeset_id, "123")
def test_create_changeset_params(self):
"""Test CreateChangesetParams."""
params = CreateChangesetParams(
name="Test Changeset",
description="Test description",
application="Test App",
developer="test.user",
)
self.assertEqual(params.name, "Test Changeset")
self.assertEqual(params.description, "Test description")
self.assertEqual(params.application, "Test App")
self.assertEqual(params.developer, "test.user")
def test_update_changeset_params(self):
"""Test UpdateChangesetParams."""
params = UpdateChangesetParams(
changeset_id="123",
name="Updated Changeset",
description="Updated description",
state="in_progress",
developer="test.user",
)
self.assertEqual(params.changeset_id, "123")
self.assertEqual(params.name, "Updated Changeset")
self.assertEqual(params.description, "Updated description")
self.assertEqual(params.state, "in_progress")
self.assertEqual(params.developer, "test.user")
def test_commit_changeset_params(self):
"""Test CommitChangesetParams."""
params = CommitChangesetParams(
changeset_id="123",
commit_message="Commit message",
)
self.assertEqual(params.changeset_id, "123")
self.assertEqual(params.commit_message, "Commit message")
def test_publish_changeset_params(self):
"""Test PublishChangesetParams."""
params = PublishChangesetParams(
changeset_id="123",
publish_notes="Publish notes",
)
self.assertEqual(params.changeset_id, "123")
self.assertEqual(params.publish_notes, "Publish notes")
def test_add_file_to_changeset_params(self):
"""Test AddFileToChangesetParams."""
params = AddFileToChangesetParams(
changeset_id="123",
file_path="test_file.py",
file_content="print('Hello, world!')",
)
self.assertEqual(params.changeset_id, "123")
self.assertEqual(params.file_path, "test_file.py")
self.assertEqual(params.file_content, "print('Hello, world!')")
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/tests/test_workflow_tools.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the workflow management tools.
"""
import json
import unittest
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch
import requests
from servicenow_mcp.auth.auth_manager import AuthManager
from servicenow_mcp.tools.workflow_tools import (
list_workflows,
get_workflow_details,
list_workflow_versions,
get_workflow_activities,
create_workflow,
update_workflow,
activate_workflow,
deactivate_workflow,
add_workflow_activity,
update_workflow_activity,
delete_workflow_activity,
reorder_workflow_activities,
)
from servicenow_mcp.utils.config import AuthConfig, AuthType, BasicAuthConfig, ServerConfig
class TestWorkflowTools(unittest.TestCase):
"""Tests for the workflow management tools."""
def setUp(self):
"""Set up test fixtures."""
self.auth_config = AuthConfig(
type=AuthType.BASIC,
basic=BasicAuthConfig(username="test_user", password="test_password"),
)
self.server_config = ServerConfig(
instance_url="https://test.service-now.com",
auth=self.auth_config,
)
self.auth_manager = AuthManager(self.auth_config)
@patch("servicenow_mcp.tools.workflow_tools.requests.get")
def test_list_workflows_success(self, mock_get):
"""Test listing workflows successfully."""
# Mock the response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": [
{
"sys_id": "workflow123",
"name": "Incident Approval",
"description": "Workflow for incident approval",
"active": "true",
"table": "incident",
},
{
"sys_id": "workflow456",
"name": "Change Request",
"description": "Workflow for change requests",
"active": "true",
"table": "change_request",
},
]
}
mock_response.headers = {"X-Total-Count": "2"}
mock_response.raise_for_status = MagicMock()
mock_get.return_value = mock_response
# Call the function
params = {
"limit": 10,
"active": True,
}
result = list_workflows(self.auth_manager, self.server_config, params)
# Verify the result
self.assertEqual(len(result["workflows"]), 2)
self.assertEqual(result["count"], 2)
self.assertEqual(result["total"], 2)
self.assertEqual(result["workflows"][0]["sys_id"], "workflow123")
self.assertEqual(result["workflows"][1]["sys_id"], "workflow456")
@patch("servicenow_mcp.tools.workflow_tools.requests.get")
def test_list_workflows_empty_result(self, mock_get):
"""Test listing workflows with empty result."""
# Mock the response
mock_response = MagicMock()
mock_response.json.return_value = {"result": []}
mock_response.headers = {"X-Total-Count": "0"}
mock_response.raise_for_status = MagicMock()
mock_get.return_value = mock_response
# Call the function
params = {
"limit": 10,
"active": True,
}
result = list_workflows(self.auth_manager, self.server_config, params)
# Verify the result
self.assertEqual(len(result["workflows"]), 0)
self.assertEqual(result["count"], 0)
self.assertEqual(result["total"], 0)
@patch("servicenow_mcp.tools.workflow_tools.requests.get")
def test_list_workflows_error(self, mock_get):
"""Test listing workflows with error."""
# Mock the response
mock_get.side_effect = requests.RequestException("API Error")
# Call the function
params = {
"limit": 10,
"active": True,
}
result = list_workflows(self.auth_manager, self.server_config, params)
# Verify the result
self.assertIn("error", result)
self.assertEqual(result["error"], "API Error")
@patch("servicenow_mcp.tools.workflow_tools.requests.get")
def test_get_workflow_details_success(self, mock_get):
"""Test getting workflow details successfully."""
# Mock the response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "workflow123",
"name": "Incident Approval",
"description": "Workflow for incident approval",
"active": "true",
"table": "incident",
}
}
mock_response.raise_for_status = MagicMock()
mock_get.return_value = mock_response
# Call the function
params = {
"workflow_id": "workflow123",
}
result = get_workflow_details(self.auth_manager, self.server_config, params)
# Verify the result
self.assertEqual(result["workflow"]["sys_id"], "workflow123")
self.assertEqual(result["workflow"]["name"], "Incident Approval")
@patch("servicenow_mcp.tools.workflow_tools.requests.get")
def test_get_workflow_details_error(self, mock_get):
"""Test getting workflow details with error."""
# Mock the response
mock_get.side_effect = requests.RequestException("API Error")
# Call the function
params = {
"workflow_id": "workflow123",
}
result = get_workflow_details(self.auth_manager, self.server_config, params)
# Verify the result
self.assertIn("error", result)
self.assertEqual(result["error"], "API Error")
@patch("servicenow_mcp.tools.workflow_tools.requests.get")
def test_list_workflow_versions_success(self, mock_get):
"""Test listing workflow versions successfully."""
# Mock the response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": [
{
"sys_id": "version123",
"workflow": "workflow123",
"name": "Version 1",
"version": "1",
"published": "true",
},
{
"sys_id": "version456",
"workflow": "workflow123",
"name": "Version 2",
"version": "2",
"published": "true",
},
]
}
mock_response.headers = {"X-Total-Count": "2"}
mock_response.raise_for_status = MagicMock()
mock_get.return_value = mock_response
# Call the function
params = {
"workflow_id": "workflow123",
"limit": 10,
}
result = list_workflow_versions(self.auth_manager, self.server_config, params)
# Verify the result
self.assertEqual(len(result["versions"]), 2)
self.assertEqual(result["count"], 2)
self.assertEqual(result["total"], 2)
self.assertEqual(result["versions"][0]["sys_id"], "version123")
self.assertEqual(result["versions"][1]["sys_id"], "version456")
@patch("servicenow_mcp.tools.workflow_tools.requests.get")
def test_get_workflow_activities_success(self, mock_get):
"""Test getting workflow activities successfully."""
# Mock the responses for version query and activities query
version_response = MagicMock()
version_response.json.return_value = {
"result": [
{
"sys_id": "version123",
"workflow": "workflow123",
"name": "Version 1",
"version": "1",
"published": "true",
}
]
}
version_response.raise_for_status = MagicMock()
activities_response = MagicMock()
activities_response.json.return_value = {
"result": [
{
"sys_id": "activity123",
"workflow_version": "version123",
"name": "Approval",
"order": "100",
"activity_definition": "approval",
},
{
"sys_id": "activity456",
"workflow_version": "version123",
"name": "Notification",
"order": "200",
"activity_definition": "notification",
},
]
}
activities_response.raise_for_status = MagicMock()
# Configure the mock to return different responses for different URLs
def side_effect(*args, **kwargs):
url = args[0] if args else kwargs.get('url', '')
if 'wf_workflow_version' in url:
return version_response
elif 'wf_activity' in url:
return activities_response
return MagicMock()
mock_get.side_effect = side_effect
# Call the function
params = {
"workflow_id": "workflow123",
}
result = get_workflow_activities(self.auth_manager, self.server_config, params)
# Verify the result
self.assertEqual(len(result["activities"]), 2)
self.assertEqual(result["count"], 2)
self.assertEqual(result["workflow_id"], "workflow123")
self.assertEqual(result["version_id"], "version123")
self.assertEqual(result["activities"][0]["sys_id"], "activity123")
self.assertEqual(result["activities"][1]["sys_id"], "activity456")
@patch("servicenow_mcp.tools.workflow_tools.requests.post")
def test_create_workflow_success(self, mock_post):
"""Test creating a workflow successfully."""
# Mock the response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "workflow789",
"name": "New Workflow",
"description": "A new workflow",
"active": "true",
"table": "incident",
}
}
mock_response.raise_for_status = MagicMock()
mock_post.return_value = mock_response
# Call the function
params = {
"name": "New Workflow",
"description": "A new workflow",
"table": "incident",
"active": True,
}
result = create_workflow(self.auth_manager, self.server_config, params)
# Verify the result
self.assertEqual(result["workflow"]["sys_id"], "workflow789")
self.assertEqual(result["workflow"]["name"], "New Workflow")
self.assertEqual(result["message"], "Workflow created successfully")
@patch("servicenow_mcp.tools.workflow_tools.requests.patch")
def test_update_workflow_success(self, mock_patch):
"""Test updating a workflow successfully."""
# Mock the response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "workflow123",
"name": "Updated Workflow",
"description": "Updated description",
"active": "true",
"table": "incident",
}
}
mock_response.raise_for_status = MagicMock()
mock_patch.return_value = mock_response
# Call the function
params = {
"workflow_id": "workflow123",
"name": "Updated Workflow",
"description": "Updated description",
}
result = update_workflow(self.auth_manager, self.server_config, params)
# Verify the result
self.assertEqual(result["workflow"]["sys_id"], "workflow123")
self.assertEqual(result["workflow"]["name"], "Updated Workflow")
self.assertEqual(result["message"], "Workflow updated successfully")
@patch("servicenow_mcp.tools.workflow_tools.requests.patch")
def test_activate_workflow_success(self, mock_patch):
"""Test activating a workflow successfully."""
# Mock the response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "workflow123",
"name": "Incident Approval",
"active": "true",
}
}
mock_response.raise_for_status = MagicMock()
mock_patch.return_value = mock_response
# Call the function
params = {
"workflow_id": "workflow123",
}
result = activate_workflow(self.auth_manager, self.server_config, params)
# Verify the result
self.assertEqual(result["workflow"]["sys_id"], "workflow123")
self.assertEqual(result["workflow"]["active"], "true")
self.assertEqual(result["message"], "Workflow activated successfully")
@patch("servicenow_mcp.tools.workflow_tools.requests.patch")
def test_deactivate_workflow_success(self, mock_patch):
"""Test deactivating a workflow successfully."""
# Mock the response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "workflow123",
"name": "Incident Approval",
"active": "false",
}
}
mock_response.raise_for_status = MagicMock()
mock_patch.return_value = mock_response
# Call the function
params = {
"workflow_id": "workflow123",
}
result = deactivate_workflow(self.auth_manager, self.server_config, params)
# Verify the result
self.assertEqual(result["workflow"]["sys_id"], "workflow123")
self.assertEqual(result["workflow"]["active"], "false")
self.assertEqual(result["message"], "Workflow deactivated successfully")
@patch("servicenow_mcp.tools.workflow_tools.requests.get")
@patch("servicenow_mcp.tools.workflow_tools.requests.post")
def test_add_workflow_activity_success(self, mock_post, mock_get):
"""Test adding a workflow activity successfully."""
# Mock the responses for version query and activity creation
version_response = MagicMock()
version_response.json.return_value = {
"result": [
{
"sys_id": "version123",
"workflow": "workflow123",
"name": "Version 1",
"version": "1",
"published": "false",
}
]
}
version_response.raise_for_status = MagicMock()
order_response = MagicMock()
order_response.json.return_value = {
"result": [
{
"sys_id": "activity123",
"order": "100",
}
]
}
order_response.raise_for_status = MagicMock()
activity_response = MagicMock()
activity_response.json.return_value = {
"result": {
"sys_id": "activity789",
"workflow_version": "version123",
"name": "New Activity",
"order": "200",
"activity_definition": "approval",
}
}
activity_response.raise_for_status = MagicMock()
# Configure the mocks
def get_side_effect(*args, **kwargs):
url = args[0] if args else kwargs.get('url', '')
if 'wf_workflow_version' in url:
return version_response
elif 'wf_activity' in url:
return order_response
return MagicMock()
mock_get.side_effect = get_side_effect
mock_post.return_value = activity_response
# Call the function
params = {
"workflow_id": "workflow123",
"name": "New Activity",
"activity_type": "approval",
"description": "A new approval activity",
}
result = add_workflow_activity(self.auth_manager, self.server_config, params)
# Verify the result
self.assertEqual(result["activity"]["sys_id"], "activity789")
self.assertEqual(result["activity"]["name"], "New Activity")
self.assertEqual(result["workflow_id"], "workflow123")
self.assertEqual(result["version_id"], "version123")
self.assertEqual(result["message"], "Activity added successfully")
@patch("servicenow_mcp.tools.workflow_tools.requests.patch")
def test_update_workflow_activity_success(self, mock_patch):
"""Test updating a workflow activity successfully."""
# Mock the response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "activity123",
"name": "Updated Activity",
"description": "Updated description",
}
}
mock_response.raise_for_status = MagicMock()
mock_patch.return_value = mock_response
# Call the function
params = {
"activity_id": "activity123",
"name": "Updated Activity",
"description": "Updated description",
}
result = update_workflow_activity(self.auth_manager, self.server_config, params)
# Verify the result
self.assertEqual(result["activity"]["sys_id"], "activity123")
self.assertEqual(result["activity"]["name"], "Updated Activity")
self.assertEqual(result["message"], "Activity updated successfully")
@patch("servicenow_mcp.tools.workflow_tools.requests.delete")
def test_delete_workflow_activity_success(self, mock_delete):
"""Test deleting a workflow activity successfully."""
# Mock the response
mock_response = MagicMock()
mock_response.raise_for_status = MagicMock()
mock_delete.return_value = mock_response
# Call the function
params = {
"activity_id": "activity123",
}
result = delete_workflow_activity(self.auth_manager, self.server_config, params)
# Verify the result
self.assertEqual(result["message"], "Activity deleted successfully")
self.assertEqual(result["activity_id"], "activity123")
@patch("servicenow_mcp.tools.workflow_tools.requests.patch")
def test_reorder_workflow_activities_success(self, mock_patch):
"""Test reordering workflow activities successfully."""
# Mock the response
mock_response = MagicMock()
mock_response.json.return_value = {"result": {}}
mock_response.raise_for_status = MagicMock()
mock_patch.return_value = mock_response
# Call the function
params = {
"workflow_id": "workflow123",
"activity_ids": ["activity1", "activity2", "activity3"],
}
result = reorder_workflow_activities(self.auth_manager, self.server_config, params)
# Verify the result
self.assertEqual(result["message"], "Activities reordered")
self.assertEqual(result["workflow_id"], "workflow123")
self.assertEqual(len(result["results"]), 3)
self.assertTrue(all(item["success"] for item in result["results"]))
self.assertEqual(result["results"][0]["activity_id"], "activity1")
self.assertEqual(result["results"][0]["new_order"], 100)
self.assertEqual(result["results"][1]["activity_id"], "activity2")
self.assertEqual(result["results"][1]["new_order"], 200)
self.assertEqual(result["results"][2]["activity_id"], "activity3")
self.assertEqual(result["results"][2]["new_order"], 300)
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/catalog_tools.py:
--------------------------------------------------------------------------------
```python
"""
Service Catalog tools for the ServiceNow MCP server.
This module provides tools for querying and viewing the service catalog in ServiceNow.
"""
import logging
from typing import Any, Dict, List, Optional
import requests
from pydantic import BaseModel, Field
from servicenow_mcp.auth.auth_manager import AuthManager
from servicenow_mcp.utils.config import ServerConfig
logger = logging.getLogger(__name__)
class ListCatalogItemsParams(BaseModel):
"""Parameters for listing service catalog items."""
limit: int = Field(10, description="Maximum number of catalog items to return")
offset: int = Field(0, description="Offset for pagination")
category: Optional[str] = Field(None, description="Filter by category")
query: Optional[str] = Field(None, description="Search query for catalog items")
active: bool = Field(True, description="Whether to only return active catalog items")
class GetCatalogItemParams(BaseModel):
"""Parameters for getting a specific service catalog item."""
item_id: str = Field(..., description="Catalog item ID or sys_id")
class ListCatalogCategoriesParams(BaseModel):
"""Parameters for listing service catalog categories."""
limit: int = Field(10, description="Maximum number of categories to return")
offset: int = Field(0, description="Offset for pagination")
query: Optional[str] = Field(None, description="Search query for categories")
active: bool = Field(True, description="Whether to only return active categories")
class CatalogResponse(BaseModel):
"""Response from catalog operations."""
success: bool = Field(..., description="Whether the operation was successful")
message: str = Field(..., description="Message describing the result")
data: Optional[Dict[str, Any]] = Field(None, description="Response data")
class CreateCatalogCategoryParams(BaseModel):
"""Parameters for creating a new service catalog category."""
title: str = Field(..., description="Title of the category")
description: Optional[str] = Field(None, description="Description of the category")
parent: Optional[str] = Field(None, description="Parent category sys_id")
icon: Optional[str] = Field(None, description="Icon for the category")
active: bool = Field(True, description="Whether the category is active")
order: Optional[int] = Field(None, description="Order of the category")
class UpdateCatalogCategoryParams(BaseModel):
"""Parameters for updating a service catalog category."""
category_id: str = Field(..., description="Category ID or sys_id")
title: Optional[str] = Field(None, description="Title of the category")
description: Optional[str] = Field(None, description="Description of the category")
parent: Optional[str] = Field(None, description="Parent category sys_id")
icon: Optional[str] = Field(None, description="Icon for the category")
active: Optional[bool] = Field(None, description="Whether the category is active")
order: Optional[int] = Field(None, description="Order of the category")
class MoveCatalogItemsParams(BaseModel):
"""Parameters for moving catalog items between categories."""
item_ids: List[str] = Field(..., description="List of catalog item IDs to move")
target_category_id: str = Field(..., description="Target category ID to move items to")
def list_catalog_items(
config: ServerConfig,
auth_manager: AuthManager,
params: ListCatalogItemsParams,
) -> Dict[str, Any]:
"""
List service catalog items from ServiceNow.
Args:
config: Server configuration
auth_manager: Authentication manager
params: Parameters for listing catalog items
Returns:
Dictionary containing catalog items and metadata
"""
logger.info("Listing service catalog items")
# Build the API URL
url = f"{config.instance_url}/api/now/table/sc_cat_item"
# Prepare query parameters
query_params = {
"sysparm_limit": params.limit,
"sysparm_offset": params.offset,
"sysparm_display_value": "true",
"sysparm_exclude_reference_link": "true",
}
# Add filters
filters = []
if params.active:
filters.append("active=true")
if params.category:
filters.append(f"category={params.category}")
if params.query:
filters.append(f"short_descriptionLIKE{params.query}^ORnameLIKE{params.query}")
if filters:
query_params["sysparm_query"] = "^".join(filters)
# Make the API request
headers = auth_manager.get_headers()
headers["Accept"] = "application/json"
try:
response = requests.get(url, headers=headers, params=query_params)
response.raise_for_status()
# Process the response
result = response.json()
items = result.get("result", [])
# Format the response
formatted_items = []
for item in items:
formatted_items.append({
"sys_id": item.get("sys_id", ""),
"name": item.get("name", ""),
"short_description": item.get("short_description", ""),
"category": item.get("category", ""),
"price": item.get("price", ""),
"picture": item.get("picture", ""),
"active": item.get("active", ""),
"order": item.get("order", ""),
})
return {
"success": True,
"message": f"Retrieved {len(formatted_items)} catalog items",
"items": formatted_items,
"total": len(formatted_items),
"limit": params.limit,
"offset": params.offset,
}
except requests.exceptions.RequestException as e:
logger.error(f"Error listing catalog items: {str(e)}")
return {
"success": False,
"message": f"Error listing catalog items: {str(e)}",
"items": [],
"total": 0,
"limit": params.limit,
"offset": params.offset,
}
def get_catalog_item(
config: ServerConfig,
auth_manager: AuthManager,
params: GetCatalogItemParams,
) -> CatalogResponse:
"""
Get a specific service catalog item from ServiceNow.
Args:
config: Server configuration
auth_manager: Authentication manager
params: Parameters for getting a catalog item
Returns:
Response containing the catalog item details
"""
logger.info(f"Getting service catalog item: {params.item_id}")
# Build the API URL
url = f"{config.instance_url}/api/now/table/sc_cat_item/{params.item_id}"
# Prepare query parameters
query_params = {
"sysparm_display_value": "true",
"sysparm_exclude_reference_link": "true",
}
# Make the API request
headers = auth_manager.get_headers()
headers["Accept"] = "application/json"
try:
response = requests.get(url, headers=headers, params=query_params)
response.raise_for_status()
# Process the response
result = response.json()
item = result.get("result", {})
if not item:
return CatalogResponse(
success=False,
message=f"Catalog item not found: {params.item_id}",
data=None,
)
# Format the response
formatted_item = {
"sys_id": item.get("sys_id", ""),
"name": item.get("name", ""),
"short_description": item.get("short_description", ""),
"description": item.get("description", ""),
"category": item.get("category", ""),
"price": item.get("price", ""),
"picture": item.get("picture", ""),
"active": item.get("active", ""),
"order": item.get("order", ""),
"delivery_time": item.get("delivery_time", ""),
"availability": item.get("availability", ""),
"variables": get_catalog_item_variables(config, auth_manager, params.item_id),
}
return CatalogResponse(
success=True,
message=f"Retrieved catalog item: {item.get('name', '')}",
data=formatted_item,
)
except requests.exceptions.RequestException as e:
logger.error(f"Error getting catalog item: {str(e)}")
return CatalogResponse(
success=False,
message=f"Error getting catalog item: {str(e)}",
data=None,
)
def get_catalog_item_variables(
config: ServerConfig,
auth_manager: AuthManager,
item_id: str,
) -> List[Dict[str, Any]]:
"""
Get variables for a specific service catalog item.
Args:
config: Server configuration
auth_manager: Authentication manager
item_id: Catalog item ID or sys_id
Returns:
List of variables for the catalog item
"""
logger.info(f"Getting variables for catalog item: {item_id}")
# Build the API URL
url = f"{config.instance_url}/api/now/table/item_option_new"
# Prepare query parameters
query_params = {
"sysparm_query": f"cat_item={item_id}^ORDERBYorder",
"sysparm_display_value": "true",
"sysparm_exclude_reference_link": "true",
}
# Make the API request
headers = auth_manager.get_headers()
headers["Accept"] = "application/json"
try:
response = requests.get(url, headers=headers, params=query_params)
response.raise_for_status()
# Process the response
result = response.json()
variables = result.get("result", [])
# Format the response
formatted_variables = []
for variable in variables:
formatted_variables.append({
"sys_id": variable.get("sys_id", ""),
"name": variable.get("name", ""),
"label": variable.get("question_text", ""),
"type": variable.get("type", ""),
"mandatory": variable.get("mandatory", ""),
"default_value": variable.get("default_value", ""),
"help_text": variable.get("help_text", ""),
"order": variable.get("order", ""),
})
return formatted_variables
except requests.exceptions.RequestException as e:
logger.error(f"Error getting catalog item variables: {str(e)}")
return []
def list_catalog_categories(
config: ServerConfig,
auth_manager: AuthManager,
params: ListCatalogCategoriesParams,
) -> Dict[str, Any]:
"""
List service catalog categories from ServiceNow.
Args:
config: Server configuration
auth_manager: Authentication manager
params: Parameters for listing catalog categories
Returns:
Dictionary containing catalog categories and metadata
"""
logger.info("Listing service catalog categories")
# Build the API URL
url = f"{config.instance_url}/api/now/table/sc_category"
# Prepare query parameters
query_params = {
"sysparm_limit": params.limit,
"sysparm_offset": params.offset,
"sysparm_display_value": "true",
"sysparm_exclude_reference_link": "true",
}
# Add filters
filters = []
if params.active:
filters.append("active=true")
if params.query:
filters.append(f"titleLIKE{params.query}^ORdescriptionLIKE{params.query}")
if filters:
query_params["sysparm_query"] = "^".join(filters)
# Make the API request
headers = auth_manager.get_headers()
headers["Accept"] = "application/json"
try:
response = requests.get(url, headers=headers, params=query_params)
response.raise_for_status()
# Process the response
result = response.json()
categories = result.get("result", [])
# Format the response
formatted_categories = []
for category in categories:
formatted_categories.append({
"sys_id": category.get("sys_id", ""),
"title": category.get("title", ""),
"description": category.get("description", ""),
"parent": category.get("parent", ""),
"icon": category.get("icon", ""),
"active": category.get("active", ""),
"order": category.get("order", ""),
})
return {
"success": True,
"message": f"Retrieved {len(formatted_categories)} catalog categories",
"categories": formatted_categories,
"total": len(formatted_categories),
"limit": params.limit,
"offset": params.offset,
}
except requests.exceptions.RequestException as e:
logger.error(f"Error listing catalog categories: {str(e)}")
return {
"success": False,
"message": f"Error listing catalog categories: {str(e)}",
"categories": [],
"total": 0,
"limit": params.limit,
"offset": params.offset,
}
def create_catalog_category(
config: ServerConfig,
auth_manager: AuthManager,
params: CreateCatalogCategoryParams,
) -> CatalogResponse:
"""
Create a new service catalog category in ServiceNow.
Args:
config: Server configuration
auth_manager: Authentication manager
params: Parameters for creating a catalog category
Returns:
Response containing the result of the operation
"""
logger.info("Creating new service catalog category")
# Build the API URL
url = f"{config.instance_url}/api/now/table/sc_category"
# Prepare request body
body = {
"title": params.title,
}
if params.description is not None:
body["description"] = params.description
if params.parent is not None:
body["parent"] = params.parent
if params.icon is not None:
body["icon"] = params.icon
if params.active is not None:
body["active"] = str(params.active).lower()
if params.order is not None:
body["order"] = str(params.order)
# Make the API request
headers = auth_manager.get_headers()
headers["Accept"] = "application/json"
headers["Content-Type"] = "application/json"
try:
response = requests.post(url, headers=headers, json=body)
response.raise_for_status()
# Process the response
result = response.json()
category = result.get("result", {})
# Format the response
formatted_category = {
"sys_id": category.get("sys_id", ""),
"title": category.get("title", ""),
"description": category.get("description", ""),
"parent": category.get("parent", ""),
"icon": category.get("icon", ""),
"active": category.get("active", ""),
"order": category.get("order", ""),
}
return CatalogResponse(
success=True,
message=f"Created catalog category: {params.title}",
data=formatted_category,
)
except requests.exceptions.RequestException as e:
logger.error(f"Error creating catalog category: {str(e)}")
return CatalogResponse(
success=False,
message=f"Error creating catalog category: {str(e)}",
data=None,
)
def update_catalog_category(
config: ServerConfig,
auth_manager: AuthManager,
params: UpdateCatalogCategoryParams,
) -> CatalogResponse:
"""
Update an existing service catalog category in ServiceNow.
Args:
config: Server configuration
auth_manager: Authentication manager
params: Parameters for updating a catalog category
Returns:
Response containing the result of the operation
"""
logger.info(f"Updating service catalog category: {params.category_id}")
# Build the API URL
url = f"{config.instance_url}/api/now/table/sc_category/{params.category_id}"
# Prepare request body with only the provided parameters
body = {}
if params.title is not None:
body["title"] = params.title
if params.description is not None:
body["description"] = params.description
if params.parent is not None:
body["parent"] = params.parent
if params.icon is not None:
body["icon"] = params.icon
if params.active is not None:
body["active"] = str(params.active).lower()
if params.order is not None:
body["order"] = str(params.order)
# Make the API request
headers = auth_manager.get_headers()
headers["Accept"] = "application/json"
headers["Content-Type"] = "application/json"
try:
response = requests.patch(url, headers=headers, json=body)
response.raise_for_status()
# Process the response
result = response.json()
category = result.get("result", {})
# Format the response
formatted_category = {
"sys_id": category.get("sys_id", ""),
"title": category.get("title", ""),
"description": category.get("description", ""),
"parent": category.get("parent", ""),
"icon": category.get("icon", ""),
"active": category.get("active", ""),
"order": category.get("order", ""),
}
return CatalogResponse(
success=True,
message=f"Updated catalog category: {params.category_id}",
data=formatted_category,
)
except requests.exceptions.RequestException as e:
logger.error(f"Error updating catalog category: {str(e)}")
return CatalogResponse(
success=False,
message=f"Error updating catalog category: {str(e)}",
data=None,
)
def move_catalog_items(
config: ServerConfig,
auth_manager: AuthManager,
params: MoveCatalogItemsParams,
) -> CatalogResponse:
"""
Move catalog items to a different category.
Args:
config: Server configuration
auth_manager: Authentication manager
params: Parameters for moving catalog items
Returns:
Response containing the result of the operation
"""
logger.info(f"Moving {len(params.item_ids)} catalog items to category: {params.target_category_id}")
# Build the API URL
url = f"{config.instance_url}/api/now/table/sc_cat_item"
# Make the API request for each item
headers = auth_manager.get_headers()
headers["Accept"] = "application/json"
headers["Content-Type"] = "application/json"
success_count = 0
failed_items = []
try:
for item_id in params.item_ids:
item_url = f"{url}/{item_id}"
body = {
"category": params.target_category_id
}
try:
response = requests.patch(item_url, headers=headers, json=body)
response.raise_for_status()
success_count += 1
except requests.exceptions.RequestException as e:
logger.error(f"Error moving catalog item {item_id}: {str(e)}")
failed_items.append({"item_id": item_id, "error": str(e)})
# Prepare the response
if success_count == len(params.item_ids):
return CatalogResponse(
success=True,
message=f"Successfully moved {success_count} catalog items to category {params.target_category_id}",
data={"moved_items_count": success_count},
)
elif success_count > 0:
return CatalogResponse(
success=True,
message=f"Partially moved catalog items. {success_count} succeeded, {len(failed_items)} failed.",
data={
"moved_items_count": success_count,
"failed_items": failed_items,
},
)
else:
return CatalogResponse(
success=False,
message="Failed to move any catalog items",
data={"failed_items": failed_items},
)
except Exception as e:
logger.error(f"Error moving catalog items: {str(e)}")
return CatalogResponse(
success=False,
message=f"Error moving catalog items: {str(e)}",
data=None,
)
```
--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/incident_tools.py:
--------------------------------------------------------------------------------
```python
"""
Incident tools for the ServiceNow MCP server.
This module provides tools for managing incidents in ServiceNow.
"""
import logging
from typing import Optional, List
import requests
from pydantic import BaseModel, Field
from servicenow_mcp.auth.auth_manager import AuthManager
from servicenow_mcp.utils.config import ServerConfig
logger = logging.getLogger(__name__)
class CreateIncidentParams(BaseModel):
"""Parameters for creating an incident."""
short_description: str = Field(..., description="Short description of the incident")
description: Optional[str] = Field(None, description="Detailed description of the incident")
caller_id: Optional[str] = Field(None, description="User who reported the incident")
category: Optional[str] = Field(None, description="Category of the incident")
subcategory: Optional[str] = Field(None, description="Subcategory of the incident")
priority: Optional[str] = Field(None, description="Priority of the incident")
impact: Optional[str] = Field(None, description="Impact of the incident")
urgency: Optional[str] = Field(None, description="Urgency of the incident")
assigned_to: Optional[str] = Field(None, description="User assigned to the incident")
assignment_group: Optional[str] = Field(None, description="Group assigned to the incident")
class UpdateIncidentParams(BaseModel):
"""Parameters for updating an incident."""
incident_id: str = Field(..., description="Incident ID or sys_id")
short_description: Optional[str] = Field(None, description="Short description of the incident")
description: Optional[str] = Field(None, description="Detailed description of the incident")
state: Optional[str] = Field(None, description="State of the incident")
category: Optional[str] = Field(None, description="Category of the incident")
subcategory: Optional[str] = Field(None, description="Subcategory of the incident")
priority: Optional[str] = Field(None, description="Priority of the incident")
impact: Optional[str] = Field(None, description="Impact of the incident")
urgency: Optional[str] = Field(None, description="Urgency of the incident")
assigned_to: Optional[str] = Field(None, description="User assigned to the incident")
assignment_group: Optional[str] = Field(None, description="Group assigned to the incident")
work_notes: Optional[str] = Field(None, description="Work notes to add to the incident")
close_notes: Optional[str] = Field(None, description="Close notes to add to the incident")
close_code: Optional[str] = Field(None, description="Close code for the incident")
class AddCommentParams(BaseModel):
"""Parameters for adding a comment to an incident."""
incident_id: str = Field(..., description="Incident ID or sys_id")
comment: str = Field(..., description="Comment to add to the incident")
is_work_note: bool = Field(False, description="Whether the comment is a work note")
class ResolveIncidentParams(BaseModel):
"""Parameters for resolving an incident."""
incident_id: str = Field(..., description="Incident ID or sys_id")
resolution_code: str = Field(..., description="Resolution code for the incident")
resolution_notes: str = Field(..., description="Resolution notes for the incident")
class ListIncidentsParams(BaseModel):
"""Parameters for listing incidents."""
limit: int = Field(10, description="Maximum number of incidents to return")
offset: int = Field(0, description="Offset for pagination")
state: Optional[str] = Field(None, description="Filter by incident state")
assigned_to: Optional[str] = Field(None, description="Filter by assigned user")
category: Optional[str] = Field(None, description="Filter by category")
query: Optional[str] = Field(None, description="Search query for incidents")
class GetIncidentByNumberParams(BaseModel):
"""Parameters for fetching an incident by its number."""
incident_number: str = Field(..., description="The number of the incident to fetch")
class IncidentResponse(BaseModel):
"""Response from incident operations."""
success: bool = Field(..., description="Whether the operation was successful")
message: str = Field(..., description="Message describing the result")
incident_id: Optional[str] = Field(None, description="ID of the affected incident")
incident_number: Optional[str] = Field(None, description="Number of the affected incident")
def create_incident(
config: ServerConfig,
auth_manager: AuthManager,
params: CreateIncidentParams,
) -> IncidentResponse:
"""
Create a new incident in ServiceNow.
Args:
config: Server configuration.
auth_manager: Authentication manager.
params: Parameters for creating the incident.
Returns:
Response with the created incident details.
"""
api_url = f"{config.api_url}/table/incident"
# Build request data
data = {
"short_description": params.short_description,
}
if params.description:
data["description"] = params.description
if params.caller_id:
data["caller_id"] = params.caller_id
if params.category:
data["category"] = params.category
if params.subcategory:
data["subcategory"] = params.subcategory
if params.priority:
data["priority"] = params.priority
if params.impact:
data["impact"] = params.impact
if params.urgency:
data["urgency"] = params.urgency
if params.assigned_to:
data["assigned_to"] = params.assigned_to
if params.assignment_group:
data["assignment_group"] = params.assignment_group
# Make request
try:
response = requests.post(
api_url,
json=data,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", {})
return IncidentResponse(
success=True,
message="Incident created successfully",
incident_id=result.get("sys_id"),
incident_number=result.get("number"),
)
except requests.RequestException as e:
logger.error(f"Failed to create incident: {e}")
return IncidentResponse(
success=False,
message=f"Failed to create incident: {str(e)}",
)
def update_incident(
config: ServerConfig,
auth_manager: AuthManager,
params: UpdateIncidentParams,
) -> IncidentResponse:
"""
Update an existing incident in ServiceNow.
Args:
config: Server configuration.
auth_manager: Authentication manager.
params: Parameters for updating the incident.
Returns:
Response with the updated incident details.
"""
# Determine if incident_id is a number or sys_id
incident_id = params.incident_id
if len(incident_id) == 32 and all(c in "0123456789abcdef" for c in incident_id):
# This is likely a sys_id
api_url = f"{config.api_url}/table/incident/{incident_id}"
else:
# This is likely an incident number
# First, we need to get the sys_id
try:
query_url = f"{config.api_url}/table/incident"
query_params = {
"sysparm_query": f"number={incident_id}",
"sysparm_limit": 1,
}
response = requests.get(
query_url,
params=query_params,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", [])
if not result:
return IncidentResponse(
success=False,
message=f"Incident not found: {incident_id}",
)
incident_id = result[0].get("sys_id")
api_url = f"{config.api_url}/table/incident/{incident_id}"
except requests.RequestException as e:
logger.error(f"Failed to find incident: {e}")
return IncidentResponse(
success=False,
message=f"Failed to find incident: {str(e)}",
)
# Build request data
data = {}
if params.short_description:
data["short_description"] = params.short_description
if params.description:
data["description"] = params.description
if params.state:
data["state"] = params.state
if params.category:
data["category"] = params.category
if params.subcategory:
data["subcategory"] = params.subcategory
if params.priority:
data["priority"] = params.priority
if params.impact:
data["impact"] = params.impact
if params.urgency:
data["urgency"] = params.urgency
if params.assigned_to:
data["assigned_to"] = params.assigned_to
if params.assignment_group:
data["assignment_group"] = params.assignment_group
if params.work_notes:
data["work_notes"] = params.work_notes
if params.close_notes:
data["close_notes"] = params.close_notes
if params.close_code:
data["close_code"] = params.close_code
# Make request
try:
response = requests.put(
api_url,
json=data,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", {})
return IncidentResponse(
success=True,
message="Incident updated successfully",
incident_id=result.get("sys_id"),
incident_number=result.get("number"),
)
except requests.RequestException as e:
logger.error(f"Failed to update incident: {e}")
return IncidentResponse(
success=False,
message=f"Failed to update incident: {str(e)}",
)
def add_comment(
config: ServerConfig,
auth_manager: AuthManager,
params: AddCommentParams,
) -> IncidentResponse:
"""
Add a comment to an incident in ServiceNow.
Args:
config: Server configuration.
auth_manager: Authentication manager.
params: Parameters for adding the comment.
Returns:
Response with the result of the operation.
"""
# Determine if incident_id is a number or sys_id
incident_id = params.incident_id
if len(incident_id) == 32 and all(c in "0123456789abcdef" for c in incident_id):
# This is likely a sys_id
api_url = f"{config.api_url}/table/incident/{incident_id}"
else:
# This is likely an incident number
# First, we need to get the sys_id
try:
query_url = f"{config.api_url}/table/incident"
query_params = {
"sysparm_query": f"number={incident_id}",
"sysparm_limit": 1,
}
response = requests.get(
query_url,
params=query_params,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", [])
if not result:
return IncidentResponse(
success=False,
message=f"Incident not found: {incident_id}",
)
incident_id = result[0].get("sys_id")
api_url = f"{config.api_url}/table/incident/{incident_id}"
except requests.RequestException as e:
logger.error(f"Failed to find incident: {e}")
return IncidentResponse(
success=False,
message=f"Failed to find incident: {str(e)}",
)
# Build request data
data = {}
if params.is_work_note:
data["work_notes"] = params.comment
else:
data["comments"] = params.comment
# Make request
try:
response = requests.put(
api_url,
json=data,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", {})
return IncidentResponse(
success=True,
message="Comment added successfully",
incident_id=result.get("sys_id"),
incident_number=result.get("number"),
)
except requests.RequestException as e:
logger.error(f"Failed to add comment: {e}")
return IncidentResponse(
success=False,
message=f"Failed to add comment: {str(e)}",
)
def resolve_incident(
config: ServerConfig,
auth_manager: AuthManager,
params: ResolveIncidentParams,
) -> IncidentResponse:
"""
Resolve an incident in ServiceNow.
Args:
config: Server configuration.
auth_manager: Authentication manager.
params: Parameters for resolving the incident.
Returns:
Response with the result of the operation.
"""
# Determine if incident_id is a number or sys_id
incident_id = params.incident_id
if len(incident_id) == 32 and all(c in "0123456789abcdef" for c in incident_id):
# This is likely a sys_id
api_url = f"{config.api_url}/table/incident/{incident_id}"
else:
# This is likely an incident number
# First, we need to get the sys_id
try:
query_url = f"{config.api_url}/table/incident"
query_params = {
"sysparm_query": f"number={incident_id}",
"sysparm_limit": 1,
}
response = requests.get(
query_url,
params=query_params,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", [])
if not result:
return IncidentResponse(
success=False,
message=f"Incident not found: {incident_id}",
)
incident_id = result[0].get("sys_id")
api_url = f"{config.api_url}/table/incident/{incident_id}"
except requests.RequestException as e:
logger.error(f"Failed to find incident: {e}")
return IncidentResponse(
success=False,
message=f"Failed to find incident: {str(e)}",
)
# Build request data
data = {
"state": "6", # Resolved
"close_code": params.resolution_code,
"close_notes": params.resolution_notes,
"resolved_at": "now",
}
# Make request
try:
response = requests.put(
api_url,
json=data,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", {})
return IncidentResponse(
success=True,
message="Incident resolved successfully",
incident_id=result.get("sys_id"),
incident_number=result.get("number"),
)
except requests.RequestException as e:
logger.error(f"Failed to resolve incident: {e}")
return IncidentResponse(
success=False,
message=f"Failed to resolve incident: {str(e)}",
)
def list_incidents(
config: ServerConfig,
auth_manager: AuthManager,
params: ListIncidentsParams,
) -> dict:
"""
List incidents from ServiceNow.
Args:
config: Server configuration.
auth_manager: Authentication manager.
params: Parameters for listing incidents.
Returns:
Dictionary with list of incidents.
"""
api_url = f"{config.api_url}/table/incident"
# Build query parameters
query_params = {
"sysparm_limit": params.limit,
"sysparm_offset": params.offset,
"sysparm_display_value": "true",
"sysparm_exclude_reference_link": "true",
}
# Add filters
filters = []
if params.state:
filters.append(f"state={params.state}")
if params.assigned_to:
filters.append(f"assigned_to={params.assigned_to}")
if params.category:
filters.append(f"category={params.category}")
if params.query:
filters.append(f"short_descriptionLIKE{params.query}^ORdescriptionLIKE{params.query}")
if filters:
query_params["sysparm_query"] = "^".join(filters)
# Make request
try:
response = requests.get(
api_url,
params=query_params,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
data = response.json()
incidents = []
for incident_data in data.get("result", []):
# Handle assigned_to field which could be a string or a dictionary
assigned_to = incident_data.get("assigned_to")
if isinstance(assigned_to, dict):
assigned_to = assigned_to.get("display_value")
incident = {
"sys_id": incident_data.get("sys_id"),
"number": incident_data.get("number"),
"short_description": incident_data.get("short_description"),
"description": incident_data.get("description"),
"state": incident_data.get("state"),
"priority": incident_data.get("priority"),
"assigned_to": assigned_to,
"category": incident_data.get("category"),
"subcategory": incident_data.get("subcategory"),
"created_on": incident_data.get("sys_created_on"),
"updated_on": incident_data.get("sys_updated_on"),
}
incidents.append(incident)
return {
"success": True,
"message": f"Found {len(incidents)} incidents",
"incidents": incidents
}
except requests.RequestException as e:
logger.error(f"Failed to list incidents: {e}")
return {
"success": False,
"message": f"Failed to list incidents: {str(e)}",
"incidents": []
}
def get_incident_by_number(
config: ServerConfig,
auth_manager: AuthManager,
params: GetIncidentByNumberParams,
) -> dict:
"""
Fetch a single incident from ServiceNow by its number.
Args:
config: Server configuration.
auth_manager: Authentication manager.
params: Parameters for fetching the incident.
Returns:
Dictionary with the incident details.
"""
api_url = f"{config.api_url}/table/incident"
# Build query parameters
query_params = {
"sysparm_query": f"number={params.incident_number}",
"sysparm_limit": 1,
"sysparm_display_value": "true",
"sysparm_exclude_reference_link": "true",
}
# Make request
try:
response = requests.get(
api_url,
params=query_params,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
data = response.json()
result = data.get("result", [])
if not result:
return {
"success": False,
"message": f"Incident not found: {params.incident_number}",
}
incident_data = result[0]
assigned_to = incident_data.get("assigned_to")
if isinstance(assigned_to, dict):
assigned_to = assigned_to.get("display_value")
incident = {
"sys_id": incident_data.get("sys_id"),
"number": incident_data.get("number"),
"short_description": incident_data.get("short_description"),
"description": incident_data.get("description"),
"state": incident_data.get("state"),
"priority": incident_data.get("priority"),
"assigned_to": assigned_to,
"category": incident_data.get("category"),
"subcategory": incident_data.get("subcategory"),
"created_on": incident_data.get("sys_created_on"),
"updated_on": incident_data.get("sys_updated_on"),
}
return {
"success": True,
"message": f"Incident {params.incident_number} found",
"incident": incident,
}
except requests.RequestException as e:
logger.error(f"Failed to fetch incident: {e}")
return {
"success": False,
"message": f"Failed to fetch incident: {str(e)}",
}
```
--------------------------------------------------------------------------------
/tests/test_catalog_optimization.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the ServiceNow MCP catalog optimization tools.
"""
import unittest
from unittest.mock import MagicMock, patch
import requests
from servicenow_mcp.auth.auth_manager import AuthManager
from servicenow_mcp.tools.catalog_optimization import (
OptimizationRecommendationsParams,
UpdateCatalogItemParams,
_get_high_abandonment_items,
_get_inactive_items,
_get_low_usage_items,
_get_poor_description_items,
_get_slow_fulfillment_items,
get_optimization_recommendations,
update_catalog_item,
)
from servicenow_mcp.utils.config import AuthConfig, AuthType, BasicAuthConfig, ServerConfig
class TestCatalogOptimizationTools(unittest.TestCase):
"""Test cases for the catalog optimization tools."""
def setUp(self):
"""Set up test fixtures."""
# Create a mock server config
self.config = ServerConfig(
instance_url="https://example.service-now.com",
auth=AuthConfig(
type=AuthType.BASIC,
basic=BasicAuthConfig(username="admin", password="password"),
),
)
# Create a mock auth manager
self.auth_manager = MagicMock(spec=AuthManager)
self.auth_manager.get_headers.return_value = {"Authorization": "Basic YWRtaW46cGFzc3dvcmQ="}
@patch("requests.get")
def test_get_inactive_items(self, mock_get):
"""Test getting inactive catalog items."""
# Mock the response from ServiceNow
mock_response = MagicMock()
mock_response.json.return_value = {
"result": [
{
"sys_id": "item1",
"name": "Old Laptop",
"short_description": "Outdated laptop model",
"category": "hardware",
},
{
"sys_id": "item2",
"name": "Legacy Software",
"short_description": "Deprecated software package",
"category": "software",
},
]
}
mock_get.return_value = mock_response
# Call the function
result = _get_inactive_items(self.config, self.auth_manager)
# Verify the results
self.assertEqual(len(result), 2)
self.assertEqual(result[0]["name"], "Old Laptop")
self.assertEqual(result[1]["name"], "Legacy Software")
# Verify the API call
mock_get.assert_called_once()
args, kwargs = mock_get.call_args
self.assertEqual(kwargs["params"]["sysparm_query"], "active=false")
@patch("requests.get")
def test_get_inactive_items_with_category(self, mock_get):
"""Test getting inactive catalog items filtered by category."""
# Mock the response from ServiceNow
mock_response = MagicMock()
mock_response.json.return_value = {
"result": [
{
"sys_id": "item1",
"name": "Old Laptop",
"short_description": "Outdated laptop model",
"category": "hardware",
},
]
}
mock_get.return_value = mock_response
# Call the function with a category filter
result = _get_inactive_items(self.config, self.auth_manager, "hardware")
# Verify the results
self.assertEqual(len(result), 1)
self.assertEqual(result[0]["name"], "Old Laptop")
# Verify the API call
mock_get.assert_called_once()
args, kwargs = mock_get.call_args
self.assertEqual(kwargs["params"]["sysparm_query"], "active=false^category=hardware")
@patch("requests.get")
def test_get_inactive_items_error(self, mock_get):
"""Test error handling when getting inactive catalog items."""
# Mock an error response
mock_get.side_effect = requests.exceptions.RequestException("API Error")
# Call the function
result = _get_inactive_items(self.config, self.auth_manager)
# Verify the results
self.assertEqual(result, [])
@patch("requests.get")
@patch("random.sample")
@patch("random.randint")
def test_get_low_usage_items(self, mock_randint, mock_sample, mock_get):
"""Test getting catalog items with low usage."""
# Mock the response from ServiceNow
mock_response = MagicMock()
mock_response.json.return_value = {
"result": [
{
"sys_id": "item1",
"name": "Rarely Used Laptop",
"short_description": "Laptop model with low demand",
"category": "hardware",
},
{
"sys_id": "item2",
"name": "Unpopular Software",
"short_description": "Software with few users",
"category": "software",
},
{
"sys_id": "item3",
"name": "Niche Service",
"short_description": "Specialized service with limited audience",
"category": "services",
},
]
}
mock_get.return_value = mock_response
# Mock the random sample to return the first two items
mock_sample.return_value = [
{
"sys_id": "item1",
"name": "Rarely Used Laptop",
"short_description": "Laptop model with low demand",
"category": "hardware",
},
{
"sys_id": "item2",
"name": "Unpopular Software",
"short_description": "Software with few users",
"category": "software",
},
]
# Mock the random order counts
mock_randint.return_value = 2
# Call the function
result = _get_low_usage_items(self.config, self.auth_manager)
# Verify the results
self.assertEqual(len(result), 2)
self.assertEqual(result[0]["name"], "Rarely Used Laptop")
self.assertEqual(result[0]["order_count"], 2)
self.assertEqual(result[1]["name"], "Unpopular Software")
self.assertEqual(result[1]["order_count"], 2)
# Verify the API call
mock_get.assert_called_once()
args, kwargs = mock_get.call_args
self.assertEqual(kwargs["params"]["sysparm_query"], "active=true")
def test_high_abandonment_items_format(self):
"""Test the expected format of high abandonment items."""
# This test doesn't call the actual function, but verifies the expected format
# of the data that would be returned by the function
# Example data that would be returned by _get_high_abandonment_items
high_abandonment_items = [
{
"sys_id": "item1",
"name": "Complex Request",
"short_description": "Request with many fields",
"category": "hardware",
"abandonment_rate": 60,
"cart_adds": 30,
"orders": 12,
},
{
"sys_id": "item2",
"name": "Expensive Item",
"short_description": "High-cost item",
"category": "software",
"abandonment_rate": 60,
"cart_adds": 20,
"orders": 8,
},
]
# Verify the expected format
self.assertEqual(len(high_abandonment_items), 2)
self.assertEqual(high_abandonment_items[0]["name"], "Complex Request")
self.assertEqual(high_abandonment_items[0]["abandonment_rate"], 60)
self.assertEqual(high_abandonment_items[0]["cart_adds"], 30)
self.assertEqual(high_abandonment_items[0]["orders"], 12)
self.assertEqual(high_abandonment_items[1]["name"], "Expensive Item")
self.assertEqual(high_abandonment_items[1]["abandonment_rate"], 60)
self.assertEqual(high_abandonment_items[1]["cart_adds"], 20)
self.assertEqual(high_abandonment_items[1]["orders"], 8)
@patch("requests.get")
@patch("random.sample")
@patch("random.uniform")
def test_get_slow_fulfillment_items(self, mock_uniform, mock_sample, mock_get):
"""Test getting catalog items with slow fulfillment times."""
# Mock the response from ServiceNow
mock_response = MagicMock()
mock_response.json.return_value = {
"result": [
{
"sys_id": "item1",
"name": "Custom Hardware",
"short_description": "Specialized hardware request",
"category": "hardware",
},
{
"sys_id": "item2",
"name": "Complex Software",
"short_description": "Software with complex installation",
"category": "software",
},
]
}
mock_get.return_value = mock_response
# Mock the random sample to return all items
mock_sample.return_value = [
{
"sys_id": "item1",
"name": "Custom Hardware",
"short_description": "Specialized hardware request",
"category": "hardware",
},
{
"sys_id": "item2",
"name": "Complex Software",
"short_description": "Software with complex installation",
"category": "software",
},
]
# Mock the random uniform values for fulfillment times
mock_uniform.return_value = 7.5
# Call the function
result = _get_slow_fulfillment_items(self.config, self.auth_manager)
# Verify the results
self.assertEqual(len(result), 2)
self.assertEqual(result[0]["name"], "Custom Hardware")
self.assertEqual(result[0]["avg_fulfillment_time"], 7.5)
self.assertEqual(result[0]["avg_fulfillment_time_vs_catalog"], 3.0) # 7.5 / 2.5 = 3.0
self.assertEqual(result[1]["name"], "Complex Software")
self.assertEqual(result[1]["avg_fulfillment_time"], 7.5)
self.assertEqual(result[1]["avg_fulfillment_time_vs_catalog"], 3.0) # 7.5 / 2.5 = 3.0
@patch("requests.get")
def test_get_poor_description_items(self, mock_get):
"""Test getting catalog items with poor description quality."""
# Mock the response from ServiceNow
mock_response = MagicMock()
mock_response.json.return_value = {
"result": [
{
"sys_id": "item1",
"name": "Laptop",
"short_description": "", # Empty description
"category": "hardware",
},
{
"sys_id": "item2",
"name": "Software",
"short_description": "Software package", # Short description
"category": "software",
},
{
"sys_id": "item3",
"name": "Service",
"short_description": "Please click here to request this service", # Instructional language
"category": "services",
},
]
}
mock_get.return_value = mock_response
# Call the function
result = _get_poor_description_items(self.config, self.auth_manager)
# Verify the results
self.assertEqual(len(result), 3)
# Check the first item (empty description)
self.assertEqual(result[0]["name"], "Laptop")
self.assertEqual(result[0]["description_quality"], 0)
self.assertEqual(result[0]["quality_issues"], ["Missing description"])
# Check the second item (short description)
self.assertEqual(result[1]["name"], "Software")
self.assertEqual(result[1]["description_quality"], 30)
self.assertEqual(result[1]["quality_issues"], ["Description too short", "Lacks detail"])
# Check the third item (instructional language)
self.assertEqual(result[2]["name"], "Service")
self.assertEqual(result[2]["description_quality"], 50)
self.assertEqual(result[2]["quality_issues"], ["Uses instructional language instead of descriptive"])
@patch("servicenow_mcp.tools.catalog_optimization._get_inactive_items")
@patch("servicenow_mcp.tools.catalog_optimization._get_low_usage_items")
@patch("servicenow_mcp.tools.catalog_optimization._get_high_abandonment_items")
@patch("servicenow_mcp.tools.catalog_optimization._get_slow_fulfillment_items")
@patch("servicenow_mcp.tools.catalog_optimization._get_poor_description_items")
def test_get_optimization_recommendations(
self,
mock_poor_desc,
mock_slow_fulfill,
mock_high_abandon,
mock_low_usage,
mock_inactive
):
"""Test getting optimization recommendations."""
# Mock the helper functions to return test data
mock_inactive.return_value = [
{
"sys_id": "item1",
"name": "Old Laptop",
"short_description": "Outdated laptop model",
"category": "hardware",
},
]
mock_low_usage.return_value = [
{
"sys_id": "item2",
"name": "Rarely Used Software",
"short_description": "Software with few users",
"category": "software",
"order_count": 2,
},
]
mock_high_abandon.return_value = [
{
"sys_id": "item3",
"name": "Complex Request",
"short_description": "Request with many fields",
"category": "hardware",
"abandonment_rate": 60,
"cart_adds": 30,
"orders": 12,
},
]
mock_slow_fulfill.return_value = [
{
"sys_id": "item4",
"name": "Custom Hardware",
"short_description": "Specialized hardware request",
"category": "hardware",
"avg_fulfillment_time": 7.5,
"avg_fulfillment_time_vs_catalog": 3.0,
},
]
mock_poor_desc.return_value = [
{
"sys_id": "item5",
"name": "Laptop",
"short_description": "",
"category": "hardware",
"description_quality": 0,
"quality_issues": ["Missing description"],
},
]
# Create the parameters
params = OptimizationRecommendationsParams(
recommendation_types=[
"inactive_items",
"low_usage",
"high_abandonment",
"slow_fulfillment",
"description_quality"
]
)
# Call the function
result = get_optimization_recommendations(self.config, self.auth_manager, params)
# Verify the results
self.assertTrue(result["success"])
self.assertEqual(len(result["recommendations"]), 5)
# Check each recommendation type
recommendation_types = [rec["type"] for rec in result["recommendations"]]
self.assertIn("inactive_items", recommendation_types)
self.assertIn("low_usage", recommendation_types)
self.assertIn("high_abandonment", recommendation_types)
self.assertIn("slow_fulfillment", recommendation_types)
self.assertIn("description_quality", recommendation_types)
# Check that each recommendation has the expected fields
for rec in result["recommendations"]:
self.assertIn("title", rec)
self.assertIn("description", rec)
self.assertIn("items", rec)
self.assertIn("impact", rec)
self.assertIn("effort", rec)
self.assertIn("action", rec)
@patch("servicenow_mcp.tools.catalog_optimization._get_inactive_items")
@patch("servicenow_mcp.tools.catalog_optimization._get_low_usage_items")
def test_get_optimization_recommendations_filtered(self, mock_low_usage, mock_inactive):
"""Test getting filtered optimization recommendations."""
# Mock the helper functions to return test data
mock_inactive.return_value = [
{
"sys_id": "item1",
"name": "Old Laptop",
"short_description": "Outdated laptop model",
"category": "hardware",
},
]
mock_low_usage.return_value = [
{
"sys_id": "item2",
"name": "Rarely Used Software",
"short_description": "Software with few users",
"category": "software",
"order_count": 2,
},
]
# Create the parameters with only specific recommendation types
params = OptimizationRecommendationsParams(
recommendation_types=["inactive_items", "low_usage"]
)
# Call the function
result = get_optimization_recommendations(self.config, self.auth_manager, params)
# Verify the results
self.assertTrue(result["success"])
self.assertEqual(len(result["recommendations"]), 2)
# Check each recommendation type
recommendation_types = [rec["type"] for rec in result["recommendations"]]
self.assertIn("inactive_items", recommendation_types)
self.assertIn("low_usage", recommendation_types)
self.assertNotIn("high_abandonment", recommendation_types)
self.assertNotIn("slow_fulfillment", recommendation_types)
self.assertNotIn("description_quality", recommendation_types)
@patch("requests.patch")
def test_update_catalog_item(self, mock_patch):
"""Test updating a catalog item."""
# Mock the response from ServiceNow
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "item1",
"name": "Laptop",
"short_description": "Updated laptop description",
"description": "Detailed description",
"category": "hardware",
"price": "999.99",
"active": "true",
"order": "100",
}
}
mock_patch.return_value = mock_response
# Create the parameters
params = UpdateCatalogItemParams(
item_id="item1",
short_description="Updated laptop description",
)
# Call the function
result = update_catalog_item(self.config, self.auth_manager, params)
# Verify the results
self.assertTrue(result["success"])
self.assertEqual(result["data"]["short_description"], "Updated laptop description")
# Verify the API call
mock_patch.assert_called_once()
args, kwargs = mock_patch.call_args
self.assertEqual(args[0], "https://example.service-now.com/api/now/table/sc_cat_item/item1")
self.assertEqual(kwargs["json"], {"short_description": "Updated laptop description"})
@patch("requests.patch")
def test_update_catalog_item_multiple_fields(self, mock_patch):
"""Test updating multiple fields of a catalog item."""
# Mock the response from ServiceNow
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "item1",
"name": "Updated Laptop",
"short_description": "Updated laptop description",
"description": "Detailed description",
"category": "hardware",
"price": "1099.99",
"active": "true",
"order": "100",
}
}
mock_patch.return_value = mock_response
# Create the parameters with multiple fields
params = UpdateCatalogItemParams(
item_id="item1",
name="Updated Laptop",
short_description="Updated laptop description",
price="1099.99",
)
# Call the function
result = update_catalog_item(self.config, self.auth_manager, params)
# Verify the results
self.assertTrue(result["success"])
self.assertEqual(result["data"]["name"], "Updated Laptop")
self.assertEqual(result["data"]["short_description"], "Updated laptop description")
self.assertEqual(result["data"]["price"], "1099.99")
# Verify the API call
mock_patch.assert_called_once()
args, kwargs = mock_patch.call_args
self.assertEqual(args[0], "https://example.service-now.com/api/now/table/sc_cat_item/item1")
self.assertEqual(kwargs["json"], {
"name": "Updated Laptop",
"short_description": "Updated laptop description",
"price": "1099.99",
})
@patch("requests.patch")
def test_update_catalog_item_error(self, mock_patch):
"""Test error handling when updating a catalog item."""
# Mock an error response
mock_patch.side_effect = requests.exceptions.RequestException("API Error")
# Create the parameters
params = UpdateCatalogItemParams(
item_id="item1",
short_description="Updated laptop description",
)
# Call the function
result = update_catalog_item(self.config, self.auth_manager, params)
# Verify the results
self.assertFalse(result["success"])
self.assertIn("Error updating catalog item", result["message"])
self.assertIsNone(result["data"])
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/story_tools.py:
--------------------------------------------------------------------------------
```python
"""
Story management tools for the ServiceNow MCP server.
This module provides tools for managing stories in ServiceNow.
"""
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Type, TypeVar
import requests
from pydantic import BaseModel, Field
from servicenow_mcp.auth.auth_manager import AuthManager
from servicenow_mcp.utils.config import ServerConfig
logger = logging.getLogger(__name__)
# Type variable for Pydantic models
T = TypeVar('T', bound=BaseModel)
class CreateStoryParams(BaseModel):
"""Parameters for creating a story."""
short_description: str = Field(..., description="Short description of the story")
acceptance_criteria: str = Field(..., description="Acceptance criteria for the story")
description: Optional[str] = Field(None, description="Detailed description of the story")
state: Optional[str] = Field(None, description="State of story (-6 is Draft,-7 is Ready for Testing,-8 is Testing,1 is Ready, 2 is Work in progress, 3 is Complete, 4 is Cancelled)")
assignment_group: Optional[str] = Field(None, description="Group assigned to the story")
story_points: Optional[int] = Field(10, description="Points value for the story")
assigned_to: Optional[str] = Field(None, description="User assigned to the story")
epic: Optional[str] = Field(None, description="Epic that the story belongs to. It requires the System ID of the epic.")
project: Optional[str] = Field(None, description="Project that the story belongs to. It requires the System ID of the project.")
work_notes: Optional[str] = Field(None, description="Work notes to add to the story. Used for adding notes and comments to a story")
class UpdateStoryParams(BaseModel):
"""Parameters for updating a story."""
story_id: str = Field(..., description="Story IDNumber or sys_id. You will need to fetch the story to get the sys_id if you only have the story number")
short_description: Optional[str] = Field(None, description="Short description of the story")
acceptance_criteria: Optional[str] = Field(None, description="Acceptance criteria for the story")
description: Optional[str] = Field(None, description="Detailed description of the story")
state: Optional[str] = Field(None, description="State of story (-6 is Draft,-7 is Ready for Testing,-8 is Testing,1 is Ready, 2 is Work in progress, 3 is Complete, 4 is Cancelled)")
assignment_group: Optional[str] = Field(None, description="Group assigned to the story")
story_points: Optional[int] = Field(None, description="Points value for the story")
assigned_to: Optional[str] = Field(None, description="User assigned to the story")
epic: Optional[str] = Field(None, description="Epic that the story belongs to. It requires the System ID of the epic.")
project: Optional[str] = Field(None, description="Project that the story belongs to. It requires the System ID of the project.")
work_notes: Optional[str] = Field(None, description="Work notes to add to the story. Used for adding notes and comments to a story")
class ListStoriesParams(BaseModel):
"""Parameters for listing stories."""
limit: Optional[int] = Field(10, description="Maximum number of records to return")
offset: Optional[int] = Field(0, description="Offset to start from")
state: Optional[str] = Field(None, description="Filter by state")
assignment_group: Optional[str] = Field(None, description="Filter by assignment group")
timeframe: Optional[str] = Field(None, description="Filter by timeframe (upcoming, in-progress, completed)")
query: Optional[str] = Field(None, description="Additional query string")
class ListStoryDependenciesParams(BaseModel):
"""Parameters for listing story dependencies."""
limit: Optional[int] = Field(10, description="Maximum number of records to return")
offset: Optional[int] = Field(0, description="Offset to start from")
query: Optional[str] = Field(None, description="Additional query string")
dependent_story: Optional[str] = Field(None, description="Sys_id of the dependent story is required")
prerequisite_story: Optional[str] = Field(None, description="Sys_id that this story depends on is required")
class CreateStoryDependencyParams(BaseModel):
"""Parameters for creating a story dependency."""
dependent_story: str = Field(..., description="Sys_id of the dependent story is required")
prerequisite_story: str = Field(..., description="Sys_id that this story depends on is required")
class DeleteStoryDependencyParams(BaseModel):
"""Parameters for deleting a story dependency."""
dependency_id: str = Field(..., description="Sys_id of the dependency is required")
def _unwrap_and_validate_params(params: Any, model_class: Type[T], required_fields: List[str] = None) -> Dict[str, Any]:
"""
Helper function to unwrap and validate parameters.
Args:
params: The parameters to unwrap and validate.
model_class: The Pydantic model class to validate against.
required_fields: List of required field names.
Returns:
A tuple of (success, result) where result is either the validated parameters or an error message.
"""
# Handle case where params might be wrapped in another dictionary
if isinstance(params, dict) and len(params) == 1 and "params" in params and isinstance(params["params"], dict):
logger.warning("Detected params wrapped in a 'params' key. Unwrapping...")
params = params["params"]
# Handle case where params might be a Pydantic model object
if not isinstance(params, dict):
try:
# Try to convert to dict if it's a Pydantic model
logger.warning("Params is not a dictionary. Attempting to convert...")
params = params.dict() if hasattr(params, "dict") else dict(params)
except Exception as e:
logger.error(f"Failed to convert params to dictionary: {e}")
return {
"success": False,
"message": f"Invalid parameters format. Expected a dictionary, got {type(params).__name__}",
}
# Validate required parameters are present
if required_fields:
for field in required_fields:
if field not in params:
return {
"success": False,
"message": f"Missing required parameter '{field}'",
}
try:
# Validate parameters against the model
validated_params = model_class(**params)
return {
"success": True,
"params": validated_params,
}
except Exception as e:
logger.error(f"Error validating parameters: {e}")
return {
"success": False,
"message": f"Error validating parameters: {str(e)}",
}
def _get_instance_url(auth_manager: AuthManager, server_config: ServerConfig) -> Optional[str]:
"""
Helper function to get the instance URL from either server_config or auth_manager.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
Returns:
The instance URL if found, None otherwise.
"""
if hasattr(server_config, 'instance_url'):
return server_config.instance_url
elif hasattr(auth_manager, 'instance_url'):
return auth_manager.instance_url
else:
logger.error("Cannot find instance_url in either server_config or auth_manager")
return None
def _get_headers(auth_manager: Any, server_config: Any) -> Optional[Dict[str, str]]:
"""
Helper function to get headers from either auth_manager or server_config.
Args:
auth_manager: The authentication manager or object passed as auth_manager.
server_config: The server configuration or object passed as server_config.
Returns:
The headers if found, None otherwise.
"""
# Try to get headers from auth_manager
if hasattr(auth_manager, 'get_headers'):
return auth_manager.get_headers()
# If auth_manager doesn't have get_headers, try server_config
if hasattr(server_config, 'get_headers'):
return server_config.get_headers()
# If neither has get_headers, check if auth_manager is actually a ServerConfig
# and server_config is actually an AuthManager (parameters swapped)
if hasattr(server_config, 'get_headers') and not hasattr(auth_manager, 'get_headers'):
return server_config.get_headers()
logger.error("Cannot find get_headers method in either auth_manager or server_config")
return None
def create_story(
auth_manager: AuthManager,
server_config: ServerConfig,
params: Dict[str, Any],
) -> Dict[str, Any]:
"""
Create a new story in ServiceNow.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
params: The parameters for creating the story.
Returns:
The created story.
"""
# Unwrap and validate parameters
result = _unwrap_and_validate_params(
params,
CreateStoryParams,
required_fields=["short_description", "acceptance_criteria"]
)
if not result["success"]:
return result
validated_params = result["params"]
# Prepare the request data
data = {
"short_description": validated_params.short_description,
"acceptance_criteria": validated_params.acceptance_criteria,
}
# Add optional fields if provided
if validated_params.description:
data["description"] = validated_params.description
if validated_params.state:
data["state"] = validated_params.state
if validated_params.assignment_group:
data["assignment_group"] = validated_params.assignment_group
if validated_params.story_points:
data["story_points"] = validated_params.story_points
if validated_params.assigned_to:
data["assigned_to"] = validated_params.assigned_to
if validated_params.epic:
data["epic"] = validated_params.epic
if validated_params.project:
data["project"] = validated_params.project
if validated_params.work_notes:
data["work_notes"] = validated_params.work_notes
# Get the instance URL
instance_url = _get_instance_url(auth_manager, server_config)
if not instance_url:
return {
"success": False,
"message": "Cannot find instance_url in either server_config or auth_manager",
}
# Get the headers
headers = _get_headers(auth_manager, server_config)
if not headers:
return {
"success": False,
"message": "Cannot find get_headers method in either auth_manager or server_config",
}
# Add Content-Type header
headers["Content-Type"] = "application/json"
# Make the API request
url = f"{instance_url}/api/now/table/rm_story"
try:
response = requests.post(url, json=data, headers=headers)
response.raise_for_status()
result = response.json()
return {
"success": True,
"message": "Story created successfully",
"story": result["result"],
}
except requests.exceptions.RequestException as e:
logger.error(f"Error creating story: {e}")
return {
"success": False,
"message": f"Error creating story: {str(e)}",
}
def update_story(
auth_manager: AuthManager,
server_config: ServerConfig,
params: Dict[str, Any],
) -> Dict[str, Any]:
"""
Update an existing story in ServiceNow.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
params: The parameters for updating the story.
Returns:
The updated story.
"""
# Unwrap and validate parameters
result = _unwrap_and_validate_params(
params,
UpdateStoryParams,
required_fields=["story_id"]
)
if not result["success"]:
return result
validated_params = result["params"]
# Prepare the request data
data = {}
# Add optional fields if provided
if validated_params.short_description:
data["short_description"] = validated_params.short_description
if validated_params.acceptance_criteria:
data["acceptance_criteria"] = validated_params.acceptance_criteria
if validated_params.description:
data["description"] = validated_params.description
if validated_params.state:
data["state"] = validated_params.state
if validated_params.assignment_group:
data["assignment_group"] = validated_params.assignment_group
if validated_params.story_points:
data["story_points"] = validated_params.story_points
if validated_params.epic:
data["epic"] = validated_params.epic
if validated_params.project:
data["project"] = validated_params.project
if validated_params.assigned_to:
data["assigned_to"] = validated_params.assigned_to
if validated_params.work_notes:
data["work_notes"] = validated_params.work_notes
# Get the instance URL
instance_url = _get_instance_url(auth_manager, server_config)
if not instance_url:
return {
"success": False,
"message": "Cannot find instance_url in either server_config or auth_manager",
}
# Get the headers
headers = _get_headers(auth_manager, server_config)
if not headers:
return {
"success": False,
"message": "Cannot find get_headers method in either auth_manager or server_config",
}
# Add Content-Type header
headers["Content-Type"] = "application/json"
# Make the API request
url = f"{instance_url}/api/now/table/rm_story/{validated_params.story_id}"
try:
response = requests.put(url, json=data, headers=headers)
response.raise_for_status()
result = response.json()
return {
"success": True,
"message": "Story updated successfully",
"story": result["result"],
}
except requests.exceptions.RequestException as e:
logger.error(f"Error updating story: {e}")
return {
"success": False,
"message": f"Error updating story: {str(e)}",
}
def list_stories(
auth_manager: AuthManager,
server_config: ServerConfig,
params: Dict[str, Any],
) -> Dict[str, Any]:
"""
List stories from ServiceNow.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
params: The parameters for listing stories.
Returns:
A list of stories.
"""
# Unwrap and validate parameters
result = _unwrap_and_validate_params(
params,
ListStoriesParams
)
if not result["success"]:
return result
validated_params = result["params"]
# Build the query
query_parts = []
if validated_params.state:
query_parts.append(f"state={validated_params.state}")
if validated_params.assignment_group:
query_parts.append(f"assignment_group={validated_params.assignment_group}")
# Handle timeframe filtering
if validated_params.timeframe:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if validated_params.timeframe == "upcoming":
query_parts.append(f"start_date>{now}")
elif validated_params.timeframe == "in-progress":
query_parts.append(f"start_date<{now}^end_date>{now}")
elif validated_params.timeframe == "completed":
query_parts.append(f"end_date<{now}")
# Add any additional query string
if validated_params.query:
query_parts.append(validated_params.query)
# Combine query parts
query = "^".join(query_parts) if query_parts else ""
# Get the instance URL
instance_url = _get_instance_url(auth_manager, server_config)
if not instance_url:
return {
"success": False,
"message": "Cannot find instance_url in either server_config or auth_manager",
}
# Get the headers
headers = _get_headers(auth_manager, server_config)
if not headers:
return {
"success": False,
"message": "Cannot find get_headers method in either auth_manager or server_config",
}
# Make the API request
url = f"{instance_url}/api/now/table/rm_story"
params = {
"sysparm_limit": validated_params.limit,
"sysparm_offset": validated_params.offset,
"sysparm_query": query,
"sysparm_display_value": "true",
}
try:
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
result = response.json()
# Handle the case where result["result"] is a list
stories = result.get("result", [])
count = len(stories)
return {
"success": True,
"stories": stories,
"count": count,
"total": count, # Use count as total if total is not provided
}
except requests.exceptions.RequestException as e:
logger.error(f"Error listing stories: {e}")
return {
"success": False,
"message": f"Error listing stories: {str(e)}",
}
def list_story_dependencies(
auth_manager: AuthManager,
server_config: ServerConfig,
params: Dict[str, Any],
) -> Dict[str, Any]:
"""
List story dependencies from ServiceNow.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
params: The parameters for listing story dependencies.
Returns:
A list of story dependencies.
"""
# Unwrap and validate parameters
result = _unwrap_and_validate_params(
params,
ListStoryDependenciesParams
)
if not result["success"]:
return result
validated_params = result["params"]
# Build the query
query_parts = []
if validated_params.dependent_story:
query_parts.append(f"dependent_story={validated_params.dependent_story}")
if validated_params.prerequisite_story:
query_parts.append(f"prerequisite_story={validated_params.prerequisite_story}")
# Add any additional query string
if validated_params.query:
query_parts.append(validated_params.query)
# Combine query parts
query = "^".join(query_parts) if query_parts else ""
# Get the instance URL
instance_url = _get_instance_url(auth_manager, server_config)
if not instance_url:
return {
"success": False,
"message": "Cannot find instance_url in either server_config or auth_manager",
}
# Get the headers
headers = _get_headers(auth_manager, server_config)
if not headers:
return {
"success": False,
"message": "Cannot find get_headers method in either auth_manager or server_config",
}
# Make the API request
url = f"{instance_url}/api/now/table/m2m_story_dependencies"
params = {
"sysparm_limit": validated_params.limit,
"sysparm_offset": validated_params.offset,
"sysparm_query": query,
"sysparm_display_value": "true",
}
try:
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
result = response.json()
# Handle the case where result["result"] is a list
story_dependencies = result.get("result", [])
count = len(story_dependencies)
return {
"success": True,
"story_dependencies": story_dependencies,
"count": count,
"total": count, # Use count as total if total is not provided
}
except requests.exceptions.RequestException as e:
logger.error(f"Error listing story dependencies: {e}")
return {
"success": False,
"message": f"Error listing story dependencies: {str(e)}",
}
def create_story_dependency(
auth_manager: AuthManager,
server_config: ServerConfig,
params: Dict[str, Any],
) -> Dict[str, Any]:
"""
Create a dependency between two stories in ServiceNow.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
params: The parameters for creating a story dependency.
Returns:
The created story dependency.
"""
# Unwrap and validate parameters
result = _unwrap_and_validate_params(
params,
CreateStoryDependencyParams,
required_fields=["dependent_story", "prerequisite_story"]
)
if not result["success"]:
return result
validated_params = result["params"]
# Prepare the request data
data = {
"dependent_story": validated_params.dependent_story,
"prerequisite_story": validated_params.prerequisite_story,
}
# Get the instance URL
instance_url = _get_instance_url(auth_manager, server_config)
if not instance_url:
return {
"success": False,
"message": "Cannot find instance_url in either server_config or auth_manager",
}
# Get the headers
headers = _get_headers(auth_manager, server_config)
if not headers:
return {
"success": False,
"message": "Cannot find get_headers method in either auth_manager or server_config",
}
# Add Content-Type header
headers["Content-Type"] = "application/json"
# Make the API request
url = f"{instance_url}/api/now/table/m2m_story_dependencies"
try:
response = requests.post(url, json=data, headers=headers)
response.raise_for_status()
result = response.json()
return {
"success": True,
"message": "Story dependency created successfully",
"story_dependency": result["result"],
}
except requests.exceptions.RequestException as e:
logger.error(f"Error creating story dependency: {e}")
return {
"success": False,
"message": f"Error creating story dependency: {str(e)}",
}
def delete_story_dependency(
auth_manager: AuthManager,
server_config: ServerConfig,
params: Dict[str, Any],
) -> Dict[str, Any]:
"""
Delete a story dependency in ServiceNow.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
params: The parameters for deleting a story dependency.
Returns:
The deleted story dependency.
"""
# Unwrap and validate parameters
result = _unwrap_and_validate_params(
params,
DeleteStoryDependencyParams,
required_fields=["dependency_id"]
)
if not result["success"]:
return result
validated_params = result["params"]
# Get the instance URL
instance_url = _get_instance_url(auth_manager, server_config)
if not instance_url:
return {
"success": False,
"message": "Cannot find instance_url in either server_config or auth_manager",
}
# Get the headers
headers = _get_headers(auth_manager, server_config)
if not headers:
return {
"success": False,
"message": "Cannot find get_headers method in either auth_manager or server_config",
}
# Make the API request
url = f"{instance_url}/api/now/table/m2m_story_dependencies/{validated_params.dependency_id}"
try:
response = requests.delete(url, headers=headers)
response.raise_for_status()
return {
"success": True,
"message": "Story dependency deleted successfully",
}
except requests.exceptions.RequestException as e:
logger.error(f"Error deleting story dependency: {e}")
return {
"success": False,
"message": f"Error deleting story dependency: {str(e)}",
}
```
--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/changeset_tools.py:
--------------------------------------------------------------------------------
```python
"""
Changeset tools for the ServiceNow MCP server.
This module provides tools for managing changesets in ServiceNow.
"""
import logging
from typing import Any, Dict, List, Optional, Type, TypeVar, Union
import requests
from pydantic import BaseModel, Field
from servicenow_mcp.auth.auth_manager import AuthManager
from servicenow_mcp.utils.config import ServerConfig
logger = logging.getLogger(__name__)
# Type variable for Pydantic models
T = TypeVar('T', bound=BaseModel)
class ListChangesetsParams(BaseModel):
"""Parameters for listing changesets."""
limit: Optional[int] = Field(10, description="Maximum number of records to return")
offset: Optional[int] = Field(0, description="Offset to start from")
state: Optional[str] = Field(None, description="Filter by state")
application: Optional[str] = Field(None, description="Filter by application")
developer: Optional[str] = Field(None, description="Filter by developer")
timeframe: Optional[str] = Field(None, description="Filter by timeframe (recent, last_week, last_month)")
query: Optional[str] = Field(None, description="Additional query string")
class GetChangesetDetailsParams(BaseModel):
"""Parameters for getting changeset details."""
changeset_id: str = Field(..., description="Changeset ID or sys_id")
class CreateChangesetParams(BaseModel):
"""Parameters for creating a changeset."""
name: str = Field(..., description="Name of the changeset")
description: Optional[str] = Field(None, description="Description of the changeset")
application: str = Field(..., description="Application the changeset belongs to")
developer: Optional[str] = Field(None, description="Developer responsible for the changeset")
class UpdateChangesetParams(BaseModel):
"""Parameters for updating a changeset."""
changeset_id: str = Field(..., description="Changeset ID or sys_id")
name: Optional[str] = Field(None, description="Name of the changeset")
description: Optional[str] = Field(None, description="Description of the changeset")
state: Optional[str] = Field(None, description="State of the changeset")
developer: Optional[str] = Field(None, description="Developer responsible for the changeset")
class CommitChangesetParams(BaseModel):
"""Parameters for committing a changeset."""
changeset_id: str = Field(..., description="Changeset ID or sys_id")
commit_message: Optional[str] = Field(None, description="Commit message")
class PublishChangesetParams(BaseModel):
"""Parameters for publishing a changeset."""
changeset_id: str = Field(..., description="Changeset ID or sys_id")
publish_notes: Optional[str] = Field(None, description="Notes for publishing")
class AddFileToChangesetParams(BaseModel):
"""Parameters for adding a file to a changeset."""
changeset_id: str = Field(..., description="Changeset ID or sys_id")
file_path: str = Field(..., description="Path of the file to add")
file_content: str = Field(..., description="Content of the file")
def _unwrap_and_validate_params(
params: Union[Dict[str, Any], BaseModel],
model_class: Type[T],
required_fields: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Unwrap and validate parameters.
Args:
params: The parameters to unwrap and validate. Can be a dictionary or a Pydantic model.
model_class: The Pydantic model class to validate against.
required_fields: List of fields that must be present.
Returns:
A dictionary with success status and validated parameters or error message.
"""
try:
# Handle case where params is already a Pydantic model
if isinstance(params, BaseModel):
# If it's already the correct model class, use it directly
if isinstance(params, model_class):
model_instance = params
# Otherwise, convert to dict and create new instance
else:
model_instance = model_class(**params.dict())
# Handle dictionary case
else:
# Create model instance
model_instance = model_class(**params)
# Check required fields
if required_fields:
missing_fields = []
for field in required_fields:
if getattr(model_instance, field, None) is None:
missing_fields.append(field)
if missing_fields:
return {
"success": False,
"message": f"Missing required fields: {', '.join(missing_fields)}",
}
return {
"success": True,
"params": model_instance,
}
except Exception as e:
return {
"success": False,
"message": f"Invalid parameters: {str(e)}",
}
def _get_instance_url(auth_manager: AuthManager, server_config: ServerConfig) -> Optional[str]:
"""
Get the instance URL from either auth_manager or server_config.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
Returns:
The instance URL or None if not found.
"""
# Try to get instance_url from server_config
if hasattr(server_config, 'instance_url'):
return server_config.instance_url
# Try to get instance_url from auth_manager
if hasattr(auth_manager, 'instance_url'):
return auth_manager.instance_url
# If neither has instance_url, check if auth_manager is actually a ServerConfig
# and server_config is actually an AuthManager (parameters swapped)
if hasattr(server_config, 'get_headers') and not hasattr(auth_manager, 'get_headers'):
if hasattr(auth_manager, 'instance_url'):
return auth_manager.instance_url
logger.error("Cannot find instance_url in either auth_manager or server_config")
return None
def _get_headers(auth_manager: AuthManager, server_config: ServerConfig) -> Optional[Dict[str, str]]:
"""
Get the headers from either auth_manager or server_config.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
Returns:
The headers or None if not found.
"""
# Try to get headers from auth_manager
if hasattr(auth_manager, 'get_headers'):
return auth_manager.get_headers()
# Try to get headers from server_config
if hasattr(server_config, 'get_headers'):
return server_config.get_headers()
# If neither has get_headers, check if auth_manager is actually a ServerConfig
# and server_config is actually an AuthManager (parameters swapped)
if hasattr(server_config, 'get_headers') and not hasattr(auth_manager, 'get_headers'):
return server_config.get_headers()
logger.error("Cannot find get_headers method in either auth_manager or server_config")
return None
def list_changesets(
auth_manager: AuthManager,
server_config: ServerConfig,
params: Union[Dict[str, Any], ListChangesetsParams],
) -> Dict[str, Any]:
"""
List changesets from ServiceNow.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
params: The parameters for listing changesets. Can be a dictionary or a ListChangesetsParams object.
Returns:
A list of changesets.
"""
# Unwrap and validate parameters
result = _unwrap_and_validate_params(params, ListChangesetsParams)
if not result["success"]:
return result
validated_params = result["params"]
# Get the instance URL
instance_url = _get_instance_url(auth_manager, server_config)
if not instance_url:
return {
"success": False,
"message": "Cannot find instance_url in either server_config or auth_manager",
}
# Get the headers
headers = _get_headers(auth_manager, server_config)
if not headers:
return {
"success": False,
"message": "Cannot find get_headers method in either auth_manager or server_config",
}
# Build query parameters
query_params = {
"sysparm_limit": validated_params.limit,
"sysparm_offset": validated_params.offset,
}
# Build sysparm_query
query_parts = []
if validated_params.state:
query_parts.append(f"state={validated_params.state}")
if validated_params.application:
query_parts.append(f"application={validated_params.application}")
if validated_params.developer:
query_parts.append(f"developer={validated_params.developer}")
if validated_params.timeframe:
if validated_params.timeframe == "recent":
query_parts.append("sys_created_onONLast 7 days@javascript:gs.beginningOfLast7Days()@javascript:gs.endOfToday()")
elif validated_params.timeframe == "last_week":
query_parts.append("sys_created_onONLast week@javascript:gs.beginningOfLastWeek()@javascript:gs.endOfLastWeek()")
elif validated_params.timeframe == "last_month":
query_parts.append("sys_created_onONLast month@javascript:gs.beginningOfLastMonth()@javascript:gs.endOfLastMonth()")
if validated_params.query:
query_parts.append(validated_params.query)
if query_parts:
query_params["sysparm_query"] = "^".join(query_parts)
# Make the API request
url = f"{instance_url}/api/now/table/sys_update_set"
try:
response = requests.get(url, params=query_params, headers=headers)
response.raise_for_status()
result = response.json()
return {
"success": True,
"changesets": result.get("result", []),
"count": len(result.get("result", [])),
}
except requests.exceptions.RequestException as e:
logger.error(f"Error listing changesets: {e}")
return {
"success": False,
"message": f"Error listing changesets: {str(e)}",
}
def get_changeset_details(
auth_manager: AuthManager,
server_config: ServerConfig,
params: Union[Dict[str, Any], GetChangesetDetailsParams],
) -> Dict[str, Any]:
"""
Get detailed information about a specific changeset.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
params: The parameters for getting changeset details. Can be a dictionary or a GetChangesetDetailsParams object.
Returns:
Detailed information about the changeset.
"""
# Unwrap and validate parameters
result = _unwrap_and_validate_params(
params,
GetChangesetDetailsParams,
required_fields=["changeset_id"]
)
if not result["success"]:
return result
validated_params = result["params"]
# Get the instance URL
instance_url = _get_instance_url(auth_manager, server_config)
if not instance_url:
return {
"success": False,
"message": "Cannot find instance_url in either server_config or auth_manager",
}
# Get the headers
headers = _get_headers(auth_manager, server_config)
if not headers:
return {
"success": False,
"message": "Cannot find get_headers method in either auth_manager or server_config",
}
# Make the API request
url = f"{instance_url}/api/now/table/sys_update_set/{validated_params.changeset_id}"
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
result = response.json()
# Get the changeset details
changeset = result.get("result", {})
# Get the changes in this changeset
changes_url = f"{instance_url}/api/now/table/sys_update_xml"
changes_params = {
"sysparm_query": f"update_set={validated_params.changeset_id}",
}
changes_response = requests.get(changes_url, params=changes_params, headers=headers)
changes_response.raise_for_status()
changes_result = changes_response.json()
changes = changes_result.get("result", [])
return {
"success": True,
"changeset": changeset,
"changes": changes,
"change_count": len(changes),
}
except requests.exceptions.RequestException as e:
logger.error(f"Error getting changeset details: {e}")
return {
"success": False,
"message": f"Error getting changeset details: {str(e)}",
}
def create_changeset(
auth_manager: AuthManager,
server_config: ServerConfig,
params: Union[Dict[str, Any], CreateChangesetParams],
) -> Dict[str, Any]:
"""
Create a new changeset in ServiceNow.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
params: The parameters for creating a changeset. Can be a dictionary or a CreateChangesetParams object.
Returns:
The created changeset.
"""
# Unwrap and validate parameters
result = _unwrap_and_validate_params(
params,
CreateChangesetParams,
required_fields=["name", "application"]
)
if not result["success"]:
return result
validated_params = result["params"]
# Prepare the request data
data = {
"name": validated_params.name,
"application": validated_params.application,
}
# Add optional fields if provided
if validated_params.description:
data["description"] = validated_params.description
if validated_params.developer:
data["developer"] = validated_params.developer
# Get the instance URL
instance_url = _get_instance_url(auth_manager, server_config)
if not instance_url:
return {
"success": False,
"message": "Cannot find instance_url in either server_config or auth_manager",
}
# Get the headers
headers = _get_headers(auth_manager, server_config)
if not headers:
return {
"success": False,
"message": "Cannot find get_headers method in either auth_manager or server_config",
}
# Add Content-Type header
headers["Content-Type"] = "application/json"
# Make the API request
url = f"{instance_url}/api/now/table/sys_update_set"
try:
response = requests.post(url, json=data, headers=headers)
response.raise_for_status()
result = response.json()
return {
"success": True,
"message": "Changeset created successfully",
"changeset": result["result"],
}
except requests.exceptions.RequestException as e:
logger.error(f"Error creating changeset: {e}")
return {
"success": False,
"message": f"Error creating changeset: {str(e)}",
}
def update_changeset(
auth_manager: AuthManager,
server_config: ServerConfig,
params: Union[Dict[str, Any], UpdateChangesetParams],
) -> Dict[str, Any]:
"""
Update an existing changeset in ServiceNow.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
params: The parameters for updating a changeset. Can be a dictionary or a UpdateChangesetParams object.
Returns:
The updated changeset.
"""
# Unwrap and validate parameters
result = _unwrap_and_validate_params(
params,
UpdateChangesetParams,
required_fields=["changeset_id"]
)
if not result["success"]:
return result
validated_params = result["params"]
# Prepare the request data
data = {}
# Add optional fields if provided
if validated_params.name:
data["name"] = validated_params.name
if validated_params.description:
data["description"] = validated_params.description
if validated_params.state:
data["state"] = validated_params.state
if validated_params.developer:
data["developer"] = validated_params.developer
# If no fields to update, return error
if not data:
return {
"success": False,
"message": "No fields to update",
}
# Get the instance URL
instance_url = _get_instance_url(auth_manager, server_config)
if not instance_url:
return {
"success": False,
"message": "Cannot find instance_url in either server_config or auth_manager",
}
# Get the headers
headers = _get_headers(auth_manager, server_config)
if not headers:
return {
"success": False,
"message": "Cannot find get_headers method in either auth_manager or server_config",
}
# Add Content-Type header
headers["Content-Type"] = "application/json"
# Make the API request
url = f"{instance_url}/api/now/table/sys_update_set/{validated_params.changeset_id}"
try:
response = requests.patch(url, json=data, headers=headers)
response.raise_for_status()
result = response.json()
return {
"success": True,
"message": "Changeset updated successfully",
"changeset": result["result"],
}
except requests.exceptions.RequestException as e:
logger.error(f"Error updating changeset: {e}")
return {
"success": False,
"message": f"Error updating changeset: {str(e)}",
}
def commit_changeset(
auth_manager: AuthManager,
server_config: ServerConfig,
params: Union[Dict[str, Any], CommitChangesetParams],
) -> Dict[str, Any]:
"""
Commit a changeset in ServiceNow.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
params: The parameters for committing a changeset. Can be a dictionary or a CommitChangesetParams object.
Returns:
The committed changeset.
"""
# Unwrap and validate parameters
result = _unwrap_and_validate_params(
params,
CommitChangesetParams,
required_fields=["changeset_id"]
)
if not result["success"]:
return result
validated_params = result["params"]
# Prepare the request data
data = {
"state": "complete",
}
# Add commit message if provided
if validated_params.commit_message:
data["description"] = validated_params.commit_message
# Get the instance URL
instance_url = _get_instance_url(auth_manager, server_config)
if not instance_url:
return {
"success": False,
"message": "Cannot find instance_url in either server_config or auth_manager",
}
# Get the headers
headers = _get_headers(auth_manager, server_config)
if not headers:
return {
"success": False,
"message": "Cannot find get_headers method in either auth_manager or server_config",
}
# Add Content-Type header
headers["Content-Type"] = "application/json"
# Make the API request
url = f"{instance_url}/api/now/table/sys_update_set/{validated_params.changeset_id}"
try:
response = requests.patch(url, json=data, headers=headers)
response.raise_for_status()
result = response.json()
return {
"success": True,
"message": "Changeset committed successfully",
"changeset": result["result"],
}
except requests.exceptions.RequestException as e:
logger.error(f"Error committing changeset: {e}")
return {
"success": False,
"message": f"Error committing changeset: {str(e)}",
}
def publish_changeset(
auth_manager: AuthManager,
server_config: ServerConfig,
params: Union[Dict[str, Any], PublishChangesetParams],
) -> Dict[str, Any]:
"""
Publish a changeset in ServiceNow.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
params: The parameters for publishing a changeset. Can be a dictionary or a PublishChangesetParams object.
Returns:
The published changeset.
"""
# Unwrap and validate parameters
result = _unwrap_and_validate_params(
params,
PublishChangesetParams,
required_fields=["changeset_id"]
)
if not result["success"]:
return result
validated_params = result["params"]
# Get the instance URL
instance_url = _get_instance_url(auth_manager, server_config)
if not instance_url:
return {
"success": False,
"message": "Cannot find instance_url in either server_config or auth_manager",
}
# Get the headers
headers = _get_headers(auth_manager, server_config)
if not headers:
return {
"success": False,
"message": "Cannot find get_headers method in either auth_manager or server_config",
}
# Add Content-Type header
headers["Content-Type"] = "application/json"
# Prepare the request data for the publish action
data = {
"state": "published",
}
# Add publish notes if provided
if validated_params.publish_notes:
data["description"] = validated_params.publish_notes
# Make the API request
url = f"{instance_url}/api/now/table/sys_update_set/{validated_params.changeset_id}"
try:
response = requests.patch(url, json=data, headers=headers)
response.raise_for_status()
result = response.json()
return {
"success": True,
"message": "Changeset published successfully",
"changeset": result["result"],
}
except requests.exceptions.RequestException as e:
logger.error(f"Error publishing changeset: {e}")
return {
"success": False,
"message": f"Error publishing changeset: {str(e)}",
}
def add_file_to_changeset(
auth_manager: AuthManager,
server_config: ServerConfig,
params: Union[Dict[str, Any], AddFileToChangesetParams],
) -> Dict[str, Any]:
"""
Add a file to a changeset in ServiceNow.
Args:
auth_manager: The authentication manager.
server_config: The server configuration.
params: The parameters for adding a file to a changeset. Can be a dictionary or a AddFileToChangesetParams object.
Returns:
The result of the add file operation.
"""
# Unwrap and validate parameters
result = _unwrap_and_validate_params(
params,
AddFileToChangesetParams,
required_fields=["changeset_id", "file_path", "file_content"]
)
if not result["success"]:
return result
validated_params = result["params"]
# Get the instance URL
instance_url = _get_instance_url(auth_manager, server_config)
if not instance_url:
return {
"success": False,
"message": "Cannot find instance_url in either server_config or auth_manager",
}
# Get the headers
headers = _get_headers(auth_manager, server_config)
if not headers:
return {
"success": False,
"message": "Cannot find get_headers method in either auth_manager or server_config",
}
# Add Content-Type header
headers["Content-Type"] = "application/json"
# Prepare the request data for adding a file
data = {
"update_set": validated_params.changeset_id,
"name": validated_params.file_path,
"payload": validated_params.file_content,
"type": "file",
}
# Make the API request
url = f"{instance_url}/api/now/table/sys_update_xml"
try:
response = requests.post(url, json=data, headers=headers)
response.raise_for_status()
result = response.json()
return {
"success": True,
"message": "File added to changeset successfully",
"file": result["result"],
}
except requests.exceptions.RequestException as e:
logger.error(f"Error adding file to changeset: {e}")
return {
"success": False,
"message": f"Error adding file to changeset: {str(e)}",
}
```
--------------------------------------------------------------------------------
/tests/test_knowledge_base.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the knowledge base tools.
This module contains tests for the knowledge base tools in the ServiceNow MCP server.
"""
import unittest
from unittest.mock import MagicMock, patch
import requests
from servicenow_mcp.auth.auth_manager import AuthManager
from servicenow_mcp.tools.knowledge_base import (
CreateArticleParams,
CreateCategoryParams,
CreateKnowledgeBaseParams,
GetArticleParams,
ListArticlesParams,
ListKnowledgeBasesParams,
PublishArticleParams,
UpdateArticleParams,
ListCategoriesParams,
KnowledgeBaseResponse,
CategoryResponse,
ArticleResponse,
create_article,
create_category,
create_knowledge_base,
get_article,
list_articles,
list_knowledge_bases,
publish_article,
update_article,
list_categories,
)
from servicenow_mcp.utils.config import AuthConfig, AuthType, BasicAuthConfig, ServerConfig
class TestKnowledgeBaseTools(unittest.TestCase):
"""Tests for the knowledge base tools."""
def setUp(self):
"""Set up test fixtures."""
auth_config = AuthConfig(
type=AuthType.BASIC,
basic=BasicAuthConfig(
username="test_user",
password="test_password"
)
)
self.server_config = ServerConfig(
instance_url="https://test.service-now.com",
auth=auth_config,
)
self.auth_manager = MagicMock(spec=AuthManager)
self.auth_manager.get_headers.return_value = {
"Authorization": "Bearer test",
"Content-Type": "application/json",
}
@patch("servicenow_mcp.tools.knowledge_base.requests.post")
def test_create_knowledge_base(self, mock_post):
"""Test creating a knowledge base."""
# Mock response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "kb001",
"title": "Test Knowledge Base",
"description": "Test Description",
"owner": "admin",
"kb_managers": "it_managers",
"workflow_publish": "Knowledge - Instant Publish",
"workflow_retire": "Knowledge - Instant Retire",
}
}
mock_response.status_code = 200
mock_post.return_value = mock_response
# Call the method
params = CreateKnowledgeBaseParams(
title="Test Knowledge Base",
description="Test Description",
owner="admin",
managers="it_managers"
)
result = create_knowledge_base(self.server_config, self.auth_manager, params)
# Verify the result
self.assertTrue(result.success)
self.assertEqual("kb001", result.kb_id)
self.assertEqual("Test Knowledge Base", result.kb_name)
# Verify the request
mock_post.assert_called_once()
args, kwargs = mock_post.call_args
self.assertEqual(f"{self.server_config.api_url}/table/kb_knowledge_base", args[0])
self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
self.assertEqual("Test Knowledge Base", kwargs["json"]["title"])
self.assertEqual("Test Description", kwargs["json"]["description"])
self.assertEqual("admin", kwargs["json"]["owner"])
self.assertEqual("it_managers", kwargs["json"]["kb_managers"])
@patch("servicenow_mcp.tools.knowledge_base.requests.post")
def test_create_category(self, mock_post):
"""Test creating a category."""
# Mock response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "cat001",
"label": "Test Category",
"description": "Test Category Description",
"kb_knowledge_base": "kb001",
"parent": "",
"active": "true",
}
}
mock_response.status_code = 200
mock_post.return_value = mock_response
# Call the method
params = CreateCategoryParams(
title="Test Category",
description="Test Category Description",
knowledge_base="kb001",
active=True
)
result = create_category(self.server_config, self.auth_manager, params)
# Verify the result
self.assertTrue(result.success)
self.assertEqual("cat001", result.category_id)
self.assertEqual("Test Category", result.category_name)
# Verify the request
mock_post.assert_called_once()
args, kwargs = mock_post.call_args
self.assertEqual(f"{self.server_config.api_url}/table/kb_category", args[0])
self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
self.assertEqual("Test Category", kwargs["json"]["label"])
self.assertEqual("Test Category Description", kwargs["json"]["description"])
self.assertEqual("kb001", kwargs["json"]["kb_knowledge_base"])
self.assertEqual("true", kwargs["json"]["active"])
@patch("servicenow_mcp.tools.knowledge_base.requests.post")
def test_create_article(self, mock_post):
"""Test creating a knowledge article."""
# Mock response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "art001",
"short_description": "Test Article",
"text": "This is a test article content",
"kb_knowledge_base": "kb001",
"kb_category": "cat001",
"article_type": "text",
"keywords": "test,article,knowledge",
"workflow_state": "draft",
}
}
mock_response.status_code = 200
mock_post.return_value = mock_response
# Call the method
params = CreateArticleParams(
title="Test Article",
short_description="Test Article",
text="This is a test article content",
knowledge_base="kb001",
category="cat001",
keywords="test,article,knowledge",
article_type="text"
)
result = create_article(self.server_config, self.auth_manager, params)
# Verify the result
self.assertTrue(result.success)
self.assertEqual("art001", result.article_id)
self.assertEqual("Test Article", result.article_title)
# Verify the request
mock_post.assert_called_once()
args, kwargs = mock_post.call_args
self.assertEqual(f"{self.server_config.api_url}/table/kb_knowledge", args[0])
self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
self.assertEqual("Test Article", kwargs["json"]["short_description"])
self.assertEqual("This is a test article content", kwargs["json"]["text"])
self.assertEqual("kb001", kwargs["json"]["kb_knowledge_base"])
self.assertEqual("cat001", kwargs["json"]["kb_category"])
self.assertEqual("text", kwargs["json"]["article_type"])
self.assertEqual("test,article,knowledge", kwargs["json"]["keywords"])
@patch("servicenow_mcp.tools.knowledge_base.requests.patch")
def test_update_article(self, mock_patch):
"""Test updating a knowledge article."""
# Mock response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "art001",
"short_description": "Updated Article",
"text": "This is an updated article content",
"kb_category": "cat002",
"keywords": "updated,article,knowledge",
"workflow_state": "draft",
}
}
mock_response.status_code = 200
mock_patch.return_value = mock_response
# Call the method
params = UpdateArticleParams(
article_id="art001",
title="Updated Article",
text="This is an updated article content",
category="cat002",
keywords="updated,article,knowledge"
)
result = update_article(self.server_config, self.auth_manager, params)
# Verify the result
self.assertTrue(result.success)
self.assertEqual("art001", result.article_id)
self.assertEqual("Updated Article", result.article_title)
# Verify the request
mock_patch.assert_called_once()
args, kwargs = mock_patch.call_args
self.assertEqual(f"{self.server_config.api_url}/table/kb_knowledge/art001", args[0])
self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
self.assertEqual("Updated Article", kwargs["json"]["short_description"])
self.assertEqual("This is an updated article content", kwargs["json"]["text"])
self.assertEqual("cat002", kwargs["json"]["kb_category"])
self.assertEqual("updated,article,knowledge", kwargs["json"]["keywords"])
@patch("servicenow_mcp.tools.knowledge_base.requests.patch")
def test_publish_article(self, mock_patch):
"""Test publishing a knowledge article."""
# Mock response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "art001",
"short_description": "Test Article",
"workflow_state": "published",
}
}
mock_response.status_code = 200
mock_patch.return_value = mock_response
# Call the method
params = PublishArticleParams(
article_id="art001",
workflow_state="published"
)
result = publish_article(self.server_config, self.auth_manager, params)
# Verify the result
self.assertTrue(result.success)
self.assertEqual("art001", result.article_id)
self.assertEqual("Test Article", result.article_title)
self.assertEqual("published", result.workflow_state)
# Verify the request
mock_patch.assert_called_once()
args, kwargs = mock_patch.call_args
self.assertEqual(f"{self.server_config.api_url}/table/kb_knowledge/art001", args[0])
self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
self.assertEqual("published", kwargs["json"]["workflow_state"])
@patch("servicenow_mcp.tools.knowledge_base.requests.get")
def test_list_articles(self, mock_get):
"""Test listing knowledge articles."""
# Mock response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": [
{
"sys_id": "art001",
"short_description": "Test Article 1",
"kb_knowledge_base": {"display_value": "IT Knowledge Base"},
"kb_category": {"display_value": "Network"},
"workflow_state": {"display_value": "Published"},
"sys_created_on": "2023-01-01 00:00:00",
"sys_updated_on": "2023-01-02 00:00:00",
},
{
"sys_id": "art002",
"short_description": "Test Article 2",
"kb_knowledge_base": {"display_value": "IT Knowledge Base"},
"kb_category": {"display_value": "Software"},
"workflow_state": {"display_value": "Draft"},
"sys_created_on": "2023-01-03 00:00:00",
"sys_updated_on": "2023-01-04 00:00:00",
}
]
}
mock_response.status_code = 200
mock_get.return_value = mock_response
# Call the method
params = ListArticlesParams(
limit=10,
offset=0,
knowledge_base="kb001",
category="cat001",
workflow_state="published",
query="network"
)
result = list_articles(self.server_config, self.auth_manager, params)
# Verify the result
self.assertTrue(result["success"])
self.assertEqual(2, len(result["articles"]))
self.assertEqual("art001", result["articles"][0]["id"])
self.assertEqual("Test Article 1", result["articles"][0]["title"])
self.assertEqual("IT Knowledge Base", result["articles"][0]["knowledge_base"])
self.assertEqual("Network", result["articles"][0]["category"])
self.assertEqual("Published", result["articles"][0]["workflow_state"])
# Verify the request
mock_get.assert_called_once()
args, kwargs = mock_get.call_args
self.assertEqual(f"{self.server_config.api_url}/table/kb_knowledge", args[0])
self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
self.assertEqual(10, kwargs["params"]["sysparm_limit"])
self.assertEqual(0, kwargs["params"]["sysparm_offset"])
self.assertEqual("all", kwargs["params"]["sysparm_display_value"])
# Verify the query syntax contains the correct pattern
self.assertIn("sysparm_query", kwargs["params"])
query = kwargs["params"]["sysparm_query"]
self.assertIn("kb_knowledge_base.sys_id=kb001", query)
self.assertIn("kb_category.sys_id=cat001", query)
@patch("servicenow_mcp.tools.knowledge_base.requests.get")
def test_get_article(self, mock_get):
"""Test getting a knowledge article."""
# Mock response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": {
"sys_id": "art001",
"short_description": "Test Article",
"text": "This is a test article content",
"kb_knowledge_base": {"display_value": "IT Knowledge Base"},
"kb_category": {"display_value": "Network"},
"workflow_state": {"display_value": "Published"},
"sys_created_on": "2023-01-01 00:00:00",
"sys_updated_on": "2023-01-02 00:00:00",
"author": {"display_value": "admin"},
"keywords": "test,article,knowledge",
"article_type": "text",
"view_count": "42"
}
}
mock_response.status_code = 200
mock_get.return_value = mock_response
# Call the method
params = GetArticleParams(article_id="art001")
result = get_article(self.server_config, self.auth_manager, params)
# Verify the result
self.assertTrue(result["success"])
self.assertEqual("art001", result["article"]["id"])
self.assertEqual("Test Article", result["article"]["title"])
self.assertEqual("This is a test article content", result["article"]["text"])
self.assertEqual("IT Knowledge Base", result["article"]["knowledge_base"])
self.assertEqual("Network", result["article"]["category"])
self.assertEqual("Published", result["article"]["workflow_state"])
self.assertEqual("admin", result["article"]["author"])
self.assertEqual("test,article,knowledge", result["article"]["keywords"])
self.assertEqual("text", result["article"]["article_type"])
self.assertEqual("42", result["article"]["views"])
# Verify the request
mock_get.assert_called_once()
args, kwargs = mock_get.call_args
self.assertEqual(f"{self.server_config.api_url}/table/kb_knowledge/art001", args[0])
self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
self.assertEqual("true", kwargs["params"]["sysparm_display_value"])
@patch("servicenow_mcp.tools.knowledge_base.requests.post")
def test_create_knowledge_base_error(self, mock_post):
"""Test error handling when creating a knowledge base."""
# Mock error response
mock_post.side_effect = requests.RequestException("API error")
# Call the method
params = CreateKnowledgeBaseParams(title="Test Knowledge Base")
result = create_knowledge_base(self.server_config, self.auth_manager, params)
# Verify the result
self.assertFalse(result.success)
self.assertIn("Failed to create knowledge base", result.message)
@patch("servicenow_mcp.tools.knowledge_base.requests.get")
def test_get_article_not_found(self, mock_get):
"""Test getting a non-existent article."""
# Mock empty response
mock_response = MagicMock()
mock_response.json.return_value = {"result": {}}
mock_response.status_code = 200
mock_get.return_value = mock_response
# Call the method
params = GetArticleParams(article_id="nonexistent")
result = get_article(self.server_config, self.auth_manager, params)
# Verify the result
self.assertFalse(result["success"])
self.assertIn("not found", result["message"])
@patch("servicenow_mcp.tools.knowledge_base.requests.get")
def test_list_knowledge_bases(self, mock_get):
"""Test listing knowledge bases."""
# Mock response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": [
{
"sys_id": "kb001",
"title": "IT Knowledge Base",
"description": "Knowledge base for IT resources",
"owner": {"display_value": "admin"},
"kb_managers": {"display_value": "it_managers"},
"active": "true",
"sys_created_on": "2023-01-01 00:00:00",
"sys_updated_on": "2023-01-02 00:00:00",
},
{
"sys_id": "kb002",
"title": "HR Knowledge Base",
"description": "Knowledge base for HR resources",
"owner": {"display_value": "hr_admin"},
"kb_managers": {"display_value": "hr_managers"},
"active": "true",
"sys_created_on": "2023-01-03 00:00:00",
"sys_updated_on": "2023-01-04 00:00:00",
}
]
}
mock_response.status_code = 200
mock_get.return_value = mock_response
# Call the method
params = ListKnowledgeBasesParams(
limit=10,
offset=0,
active=True,
query="IT"
)
result = list_knowledge_bases(self.server_config, self.auth_manager, params)
# Verify the result
self.assertTrue(result["success"])
self.assertEqual(2, len(result["knowledge_bases"]))
self.assertEqual("kb001", result["knowledge_bases"][0]["id"])
self.assertEqual("IT Knowledge Base", result["knowledge_bases"][0]["title"])
self.assertEqual("Knowledge base for IT resources", result["knowledge_bases"][0]["description"])
self.assertEqual("admin", result["knowledge_bases"][0]["owner"])
self.assertEqual("it_managers", result["knowledge_bases"][0]["managers"])
self.assertTrue(result["knowledge_bases"][0]["active"])
# Verify the request
mock_get.assert_called_once()
args, kwargs = mock_get.call_args
self.assertEqual(f"{self.server_config.api_url}/table/kb_knowledge_base", args[0])
self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
self.assertEqual(10, kwargs["params"]["sysparm_limit"])
self.assertEqual(0, kwargs["params"]["sysparm_offset"])
self.assertEqual("true", kwargs["params"]["sysparm_display_value"])
self.assertEqual("active=true^titleLIKEIT^ORdescriptionLIKEIT", kwargs["params"]["sysparm_query"])
@patch("servicenow_mcp.tools.knowledge_base.requests.get")
def test_list_categories(self, mock_get):
"""Test listing categories in a knowledge base."""
# Mock response
mock_response = MagicMock()
mock_response.json.return_value = {
"result": [
{
"sys_id": "cat001",
"label": "Network Troubleshooting",
"description": "Articles for network troubleshooting",
"kb_knowledge_base": {"display_value": "IT Knowledge Base"},
"parent": {"display_value": ""},
"active": "true",
"sys_created_on": "2023-01-01 00:00:00",
"sys_updated_on": "2023-01-02 00:00:00",
},
{
"sys_id": "cat002",
"label": "Software Setup",
"description": "Articles for software installation",
"kb_knowledge_base": {"display_value": "IT Knowledge Base"},
"parent": {"display_value": ""},
"active": "true",
"sys_created_on": "2023-01-03 00:00:00",
"sys_updated_on": "2023-01-04 00:00:00",
}
]
}
mock_response.status_code = 200
mock_get.return_value = mock_response
# Call the method
params = ListCategoriesParams(
knowledge_base="kb001",
active=True,
query="Network"
)
result = list_categories(self.server_config, self.auth_manager, params)
# Verify the result
self.assertTrue(result["success"])
self.assertEqual(2, len(result["categories"]))
self.assertEqual("cat001", result["categories"][0]["id"])
self.assertEqual("Network Troubleshooting", result["categories"][0]["title"])
self.assertEqual("Articles for network troubleshooting", result["categories"][0]["description"])
self.assertEqual("IT Knowledge Base", result["categories"][0]["knowledge_base"])
self.assertEqual("", result["categories"][0]["parent_category"])
self.assertTrue(result["categories"][0]["active"])
# Verify the request
mock_get.assert_called_once()
args, kwargs = mock_get.call_args
self.assertEqual(f"{self.server_config.api_url}/table/kb_category", args[0])
self.assertEqual(self.auth_manager.get_headers(), kwargs["headers"])
self.assertEqual(10, kwargs["params"]["sysparm_limit"])
self.assertEqual(0, kwargs["params"]["sysparm_offset"])
self.assertEqual("all", kwargs["params"]["sysparm_display_value"])
# Verify the query syntax contains the correct pattern
self.assertIn("sysparm_query", kwargs["params"])
query = kwargs["params"]["sysparm_query"]
self.assertIn("kb_knowledge_base.sys_id=kb001", query)
self.assertIn("active=true", query)
self.assertIn("labelLIKENetwork", query)
class TestKnowledgeBaseParams(unittest.TestCase):
"""Tests for the knowledge base parameter classes."""
def test_create_knowledge_base_params(self):
"""Test CreateKnowledgeBaseParams validation."""
# Minimal required parameters
params = CreateKnowledgeBaseParams(title="Test Knowledge Base")
self.assertEqual("Test Knowledge Base", params.title)
self.assertEqual("Knowledge - Instant Publish", params.publish_workflow)
# All parameters
params = CreateKnowledgeBaseParams(
title="Test Knowledge Base",
description="Test Description",
owner="admin",
managers="it_managers",
publish_workflow="Custom Workflow",
retire_workflow="Custom Retire Workflow"
)
self.assertEqual("Test Knowledge Base", params.title)
self.assertEqual("Test Description", params.description)
self.assertEqual("admin", params.owner)
self.assertEqual("it_managers", params.managers)
self.assertEqual("Custom Workflow", params.publish_workflow)
self.assertEqual("Custom Retire Workflow", params.retire_workflow)
def test_create_category_params(self):
"""Test CreateCategoryParams validation."""
# Required parameters
params = CreateCategoryParams(
title="Test Category",
knowledge_base="kb001"
)
self.assertEqual("Test Category", params.title)
self.assertEqual("kb001", params.knowledge_base)
self.assertTrue(params.active)
# All parameters
params = CreateCategoryParams(
title="Test Category",
description="Test Description",
knowledge_base="kb001",
parent_category="parent001",
active=False
)
self.assertEqual("Test Category", params.title)
self.assertEqual("Test Description", params.description)
self.assertEqual("kb001", params.knowledge_base)
self.assertEqual("parent001", params.parent_category)
self.assertFalse(params.active)
def test_create_article_params(self):
"""Test CreateArticleParams validation."""
# Required parameters
params = CreateArticleParams(
title="Test Article",
text="Test content",
short_description="Test short description",
knowledge_base="kb001",
category="cat001"
)
self.assertEqual("Test Article", params.title)
self.assertEqual("Test content", params.text)
self.assertEqual("Test short description", params.short_description)
self.assertEqual("kb001", params.knowledge_base)
self.assertEqual("cat001", params.category)
self.assertEqual("text", params.article_type)
# All parameters
params = CreateArticleParams(
title="Test Article",
text="Test content",
short_description="Test short description",
knowledge_base="kb001",
category="cat001",
keywords="test,article",
article_type="html"
)
self.assertEqual("Test Article", params.title)
self.assertEqual("Test content", params.text)
self.assertEqual("Test short description", params.short_description)
self.assertEqual("kb001", params.knowledge_base)
self.assertEqual("cat001", params.category)
self.assertEqual("test,article", params.keywords)
self.assertEqual("html", params.article_type)
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/src/servicenow_mcp/tools/user_tools.py:
--------------------------------------------------------------------------------
```python
"""
User management tools for the ServiceNow MCP server.
This module provides tools for managing users and groups in ServiceNow.
"""
import logging
from typing import List, Optional
import requests
from pydantic import BaseModel, Field
from servicenow_mcp.auth.auth_manager import AuthManager
from servicenow_mcp.utils.config import ServerConfig
logger = logging.getLogger(__name__)
class CreateUserParams(BaseModel):
"""Parameters for creating a user."""
user_name: str = Field(..., description="Username for the user")
first_name: str = Field(..., description="First name of the user")
last_name: str = Field(..., description="Last name of the user")
email: str = Field(..., description="Email address of the user")
title: Optional[str] = Field(None, description="Job title of the user")
department: Optional[str] = Field(None, description="Department the user belongs to")
manager: Optional[str] = Field(None, description="Manager of the user (sys_id or username)")
roles: Optional[List[str]] = Field(None, description="Roles to assign to the user")
phone: Optional[str] = Field(None, description="Phone number of the user")
mobile_phone: Optional[str] = Field(None, description="Mobile phone number of the user")
location: Optional[str] = Field(None, description="Location of the user")
password: Optional[str] = Field(None, description="Password for the user account")
active: Optional[bool] = Field(True, description="Whether the user account is active")
class UpdateUserParams(BaseModel):
"""Parameters for updating a user."""
user_id: str = Field(..., description="User ID or sys_id to update")
user_name: Optional[str] = Field(None, description="Username for the user")
first_name: Optional[str] = Field(None, description="First name of the user")
last_name: Optional[str] = Field(None, description="Last name of the user")
email: Optional[str] = Field(None, description="Email address of the user")
title: Optional[str] = Field(None, description="Job title of the user")
department: Optional[str] = Field(None, description="Department the user belongs to")
manager: Optional[str] = Field(None, description="Manager of the user (sys_id or username)")
roles: Optional[List[str]] = Field(None, description="Roles to assign to the user")
phone: Optional[str] = Field(None, description="Phone number of the user")
mobile_phone: Optional[str] = Field(None, description="Mobile phone number of the user")
location: Optional[str] = Field(None, description="Location of the user")
password: Optional[str] = Field(None, description="Password for the user account")
active: Optional[bool] = Field(None, description="Whether the user account is active")
class GetUserParams(BaseModel):
"""Parameters for getting a user."""
user_id: Optional[str] = Field(None, description="User ID or sys_id")
user_name: Optional[str] = Field(None, description="Username of the user")
email: Optional[str] = Field(None, description="Email address of the user")
class ListUsersParams(BaseModel):
"""Parameters for listing users."""
limit: int = Field(10, description="Maximum number of users to return")
offset: int = Field(0, description="Offset for pagination")
active: Optional[bool] = Field(None, description="Filter by active status")
department: Optional[str] = Field(None, description="Filter by department")
query: Optional[str] = Field(
None,
description="Case-insensitive search term that matches against name, username, or email fields. Uses ServiceNow's LIKE operator for partial matching.",
)
class CreateGroupParams(BaseModel):
"""Parameters for creating a group."""
name: str = Field(..., description="Name of the group")
description: Optional[str] = Field(None, description="Description of the group")
manager: Optional[str] = Field(None, description="Manager of the group (sys_id or username)")
parent: Optional[str] = Field(None, description="Parent group (sys_id or name)")
type: Optional[str] = Field(None, description="Type of the group")
email: Optional[str] = Field(None, description="Email address for the group")
members: Optional[List[str]] = Field(
None, description="List of user sys_ids or usernames to add as members"
)
active: Optional[bool] = Field(True, description="Whether the group is active")
class UpdateGroupParams(BaseModel):
"""Parameters for updating a group."""
group_id: str = Field(..., description="Group ID or sys_id to update")
name: Optional[str] = Field(None, description="Name of the group")
description: Optional[str] = Field(None, description="Description of the group")
manager: Optional[str] = Field(None, description="Manager of the group (sys_id or username)")
parent: Optional[str] = Field(None, description="Parent group (sys_id or name)")
type: Optional[str] = Field(None, description="Type of the group")
email: Optional[str] = Field(None, description="Email address for the group")
active: Optional[bool] = Field(None, description="Whether the group is active")
class AddGroupMembersParams(BaseModel):
"""Parameters for adding members to a group."""
group_id: str = Field(..., description="Group ID or sys_id")
members: List[str] = Field(
..., description="List of user sys_ids or usernames to add as members"
)
class RemoveGroupMembersParams(BaseModel):
"""Parameters for removing members from a group."""
group_id: str = Field(..., description="Group ID or sys_id")
members: List[str] = Field(
..., description="List of user sys_ids or usernames to remove as members"
)
class ListGroupsParams(BaseModel):
"""Parameters for listing groups."""
limit: int = Field(10, description="Maximum number of groups to return")
offset: int = Field(0, description="Offset for pagination")
active: Optional[bool] = Field(None, description="Filter by active status")
query: Optional[str] = Field(
None,
description="Case-insensitive search term that matches against group name or description fields. Uses ServiceNow's LIKE operator for partial matching.",
)
type: Optional[str] = Field(None, description="Filter by group type")
class UserResponse(BaseModel):
"""Response from user operations."""
success: bool = Field(..., description="Whether the operation was successful")
message: str = Field(..., description="Message describing the result")
user_id: Optional[str] = Field(None, description="ID of the affected user")
user_name: Optional[str] = Field(None, description="Username of the affected user")
class GroupResponse(BaseModel):
"""Response from group operations."""
success: bool = Field(..., description="Whether the operation was successful")
message: str = Field(..., description="Message describing the result")
group_id: Optional[str] = Field(None, description="ID of the affected group")
group_name: Optional[str] = Field(None, description="Name of the affected group")
def create_user(
config: ServerConfig,
auth_manager: AuthManager,
params: CreateUserParams,
) -> UserResponse:
"""
Create a new user in ServiceNow.
Args:
config: Server configuration.
auth_manager: Authentication manager.
params: Parameters for creating the user.
Returns:
Response with the created user details.
"""
api_url = f"{config.api_url}/table/sys_user"
# Build request data
data = {
"user_name": params.user_name,
"first_name": params.first_name,
"last_name": params.last_name,
"email": params.email,
"active": str(params.active).lower(),
}
if params.title:
data["title"] = params.title
if params.department:
data["department"] = params.department
if params.manager:
data["manager"] = params.manager
if params.phone:
data["phone"] = params.phone
if params.mobile_phone:
data["mobile_phone"] = params.mobile_phone
if params.location:
data["location"] = params.location
if params.password:
data["user_password"] = params.password
# Make request
try:
response = requests.post(
api_url,
json=data,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", {})
# Handle role assignments if provided
if params.roles and result.get("sys_id"):
assign_roles_to_user(config, auth_manager, result.get("sys_id"), params.roles)
return UserResponse(
success=True,
message="User created successfully",
user_id=result.get("sys_id"),
user_name=result.get("user_name"),
)
except requests.RequestException as e:
logger.error(f"Failed to create user: {e}")
return UserResponse(
success=False,
message=f"Failed to create user: {str(e)}",
)
def update_user(
config: ServerConfig,
auth_manager: AuthManager,
params: UpdateUserParams,
) -> UserResponse:
"""
Update an existing user in ServiceNow.
Args:
config: Server configuration.
auth_manager: Authentication manager.
params: Parameters for updating the user.
Returns:
Response with the updated user details.
"""
api_url = f"{config.api_url}/table/sys_user/{params.user_id}"
# Build request data
data = {}
if params.user_name:
data["user_name"] = params.user_name
if params.first_name:
data["first_name"] = params.first_name
if params.last_name:
data["last_name"] = params.last_name
if params.email:
data["email"] = params.email
if params.title:
data["title"] = params.title
if params.department:
data["department"] = params.department
if params.manager:
data["manager"] = params.manager
if params.phone:
data["phone"] = params.phone
if params.mobile_phone:
data["mobile_phone"] = params.mobile_phone
if params.location:
data["location"] = params.location
if params.password:
data["user_password"] = params.password
if params.active is not None:
data["active"] = str(params.active).lower()
# Make request
try:
response = requests.patch(
api_url,
json=data,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", {})
# Handle role assignments if provided
if params.roles:
assign_roles_to_user(config, auth_manager, params.user_id, params.roles)
return UserResponse(
success=True,
message="User updated successfully",
user_id=result.get("sys_id"),
user_name=result.get("user_name"),
)
except requests.RequestException as e:
logger.error(f"Failed to update user: {e}")
return UserResponse(
success=False,
message=f"Failed to update user: {str(e)}",
)
def get_user(
config: ServerConfig,
auth_manager: AuthManager,
params: GetUserParams,
) -> dict:
"""
Get a user from ServiceNow.
Args:
config: Server configuration.
auth_manager: Authentication manager.
params: Parameters for getting the user.
Returns:
Dictionary containing user details.
"""
api_url = f"{config.api_url}/table/sys_user"
query_params = {}
# Build query parameters
if params.user_id:
query_params["sysparm_query"] = f"sys_id={params.user_id}"
elif params.user_name:
query_params["sysparm_query"] = f"user_name={params.user_name}"
elif params.email:
query_params["sysparm_query"] = f"email={params.email}"
else:
return {"success": False, "message": "At least one search parameter is required"}
query_params["sysparm_limit"] = "1"
query_params["sysparm_display_value"] = "true"
# Make request
try:
response = requests.get(
api_url,
params=query_params,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", [])
if not result:
return {"success": False, "message": "User not found"}
return {"success": True, "message": "User found", "user": result[0]}
except requests.RequestException as e:
logger.error(f"Failed to get user: {e}")
return {"success": False, "message": f"Failed to get user: {str(e)}"}
def list_users(
config: ServerConfig,
auth_manager: AuthManager,
params: ListUsersParams,
) -> dict:
"""
List users from ServiceNow.
Args:
config: Server configuration.
auth_manager: Authentication manager.
params: Parameters for listing users.
Returns:
Dictionary containing list of users.
"""
api_url = f"{config.api_url}/table/sys_user"
query_params = {
"sysparm_limit": str(params.limit),
"sysparm_offset": str(params.offset),
"sysparm_display_value": "true",
}
# Build query
query_parts = []
if params.active is not None:
query_parts.append(f"active={str(params.active).lower()}")
if params.department:
query_parts.append(f"department={params.department}")
if params.query:
query_parts.append(
f"^nameLIKE{params.query}^ORuser_nameLIKE{params.query}^ORemailLIKE{params.query}"
)
if query_parts:
query_params["sysparm_query"] = "^".join(query_parts)
# Make request
try:
response = requests.get(
api_url,
params=query_params,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", [])
return {
"success": True,
"message": f"Found {len(result)} users",
"users": result,
"count": len(result),
}
except requests.RequestException as e:
logger.error(f"Failed to list users: {e}")
return {"success": False, "message": f"Failed to list users: {str(e)}"}
def list_groups(
config: ServerConfig,
auth_manager: AuthManager,
params: ListGroupsParams,
) -> dict:
"""
List groups from ServiceNow.
Args:
config: Server configuration.
auth_manager: Authentication manager.
params: Parameters for listing groups.
Returns:
Dictionary containing list of groups.
"""
api_url = f"{config.api_url}/table/sys_user_group"
query_params = {
"sysparm_limit": str(params.limit),
"sysparm_offset": str(params.offset),
"sysparm_display_value": "true",
}
# Build query
query_parts = []
if params.active is not None:
query_parts.append(f"active={str(params.active).lower()}")
if params.type:
query_parts.append(f"type={params.type}")
if params.query:
query_parts.append(f"^nameLIKE{params.query}^ORdescriptionLIKE{params.query}")
if query_parts:
query_params["sysparm_query"] = "^".join(query_parts)
# Make request
try:
response = requests.get(
api_url,
params=query_params,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", [])
return {
"success": True,
"message": f"Found {len(result)} groups",
"groups": result,
"count": len(result),
}
except requests.RequestException as e:
logger.error(f"Failed to list groups: {e}")
return {"success": False, "message": f"Failed to list groups: {str(e)}"}
def assign_roles_to_user(
config: ServerConfig,
auth_manager: AuthManager,
user_id: str,
roles: List[str],
) -> bool:
"""
Assign roles to a user in ServiceNow.
Args:
config: Server configuration.
auth_manager: Authentication manager.
user_id: User ID or sys_id.
roles: List of roles to assign.
Returns:
Boolean indicating success.
"""
# For each role, create a user_role record
api_url = f"{config.api_url}/table/sys_user_has_role"
success = True
for role in roles:
# First check if the role exists
role_id = get_role_id(config, auth_manager, role)
if not role_id:
logger.warning(f"Role '{role}' not found, skipping assignment")
continue
# Check if the user already has this role
if check_user_has_role(config, auth_manager, user_id, role_id):
logger.info(f"User already has role '{role}', skipping assignment")
continue
# Create the user role assignment
data = {
"user": user_id,
"role": role_id,
}
try:
response = requests.post(
api_url,
json=data,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
except requests.RequestException as e:
logger.error(f"Failed to assign role '{role}' to user: {e}")
success = False
return success
def get_role_id(
config: ServerConfig,
auth_manager: AuthManager,
role_name: str,
) -> Optional[str]:
"""
Get the sys_id of a role by its name.
Args:
config: Server configuration.
auth_manager: Authentication manager.
role_name: Name of the role.
Returns:
sys_id of the role if found, None otherwise.
"""
api_url = f"{config.api_url}/table/sys_user_role"
query_params = {
"sysparm_query": f"name={role_name}",
"sysparm_limit": "1",
}
try:
response = requests.get(
api_url,
params=query_params,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", [])
if not result:
return None
return result[0].get("sys_id")
except requests.RequestException as e:
logger.error(f"Failed to get role ID: {e}")
return None
def check_user_has_role(
config: ServerConfig,
auth_manager: AuthManager,
user_id: str,
role_id: str,
) -> bool:
"""
Check if a user has a specific role.
Args:
config: Server configuration.
auth_manager: Authentication manager.
user_id: User ID or sys_id.
role_id: Role ID or sys_id.
Returns:
Boolean indicating whether the user has the role.
"""
api_url = f"{config.api_url}/table/sys_user_has_role"
query_params = {
"sysparm_query": f"user={user_id}^role={role_id}",
"sysparm_limit": "1",
}
try:
response = requests.get(
api_url,
params=query_params,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", [])
return len(result) > 0
except requests.RequestException as e:
logger.error(f"Failed to check if user has role: {e}")
return False
def create_group(
config: ServerConfig,
auth_manager: AuthManager,
params: CreateGroupParams,
) -> GroupResponse:
"""
Create a new group in ServiceNow.
Args:
config: Server configuration.
auth_manager: Authentication manager.
params: Parameters for creating the group.
Returns:
Response with the created group details.
"""
api_url = f"{config.api_url}/table/sys_user_group"
# Build request data
data = {
"name": params.name,
"active": str(params.active).lower(),
}
if params.description:
data["description"] = params.description
if params.manager:
data["manager"] = params.manager
if params.parent:
data["parent"] = params.parent
if params.type:
data["type"] = params.type
if params.email:
data["email"] = params.email
# Make request
try:
response = requests.post(
api_url,
json=data,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", {})
group_id = result.get("sys_id")
# Add members if provided
if params.members and group_id:
add_group_members(
config,
auth_manager,
AddGroupMembersParams(group_id=group_id, members=params.members),
)
return GroupResponse(
success=True,
message="Group created successfully",
group_id=group_id,
group_name=result.get("name"),
)
except requests.RequestException as e:
logger.error(f"Failed to create group: {e}")
return GroupResponse(
success=False,
message=f"Failed to create group: {str(e)}",
)
def update_group(
config: ServerConfig,
auth_manager: AuthManager,
params: UpdateGroupParams,
) -> GroupResponse:
"""
Update an existing group in ServiceNow.
Args:
config: Server configuration.
auth_manager: Authentication manager.
params: Parameters for updating the group.
Returns:
Response with the updated group details.
"""
api_url = f"{config.api_url}/table/sys_user_group/{params.group_id}"
# Build request data
data = {}
if params.name:
data["name"] = params.name
if params.description:
data["description"] = params.description
if params.manager:
data["manager"] = params.manager
if params.parent:
data["parent"] = params.parent
if params.type:
data["type"] = params.type
if params.email:
data["email"] = params.email
if params.active is not None:
data["active"] = str(params.active).lower()
# Make request
try:
response = requests.patch(
api_url,
json=data,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", {})
return GroupResponse(
success=True,
message="Group updated successfully",
group_id=result.get("sys_id"),
group_name=result.get("name"),
)
except requests.RequestException as e:
logger.error(f"Failed to update group: {e}")
return GroupResponse(
success=False,
message=f"Failed to update group: {str(e)}",
)
def add_group_members(
config: ServerConfig,
auth_manager: AuthManager,
params: AddGroupMembersParams,
) -> GroupResponse:
"""
Add members to a group in ServiceNow.
Args:
config: Server configuration.
auth_manager: Authentication manager.
params: Parameters for adding members to the group.
Returns:
Response with the result of the operation.
"""
api_url = f"{config.api_url}/table/sys_user_grmember"
success = True
failed_members = []
for member in params.members:
# Get user ID if username is provided
user_id = member
if not member.startswith("sys_id:"):
user = get_user(config, auth_manager, GetUserParams(user_name=member))
if not user.get("success"):
user = get_user(config, auth_manager, GetUserParams(email=member))
if user.get("success"):
user_id = user.get("user", {}).get("sys_id")
else:
success = False
failed_members.append(member)
continue
# Create group membership
data = {
"group": params.group_id,
"user": user_id,
}
try:
response = requests.post(
api_url,
json=data,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
except requests.RequestException as e:
logger.error(f"Failed to add member '{member}' to group: {e}")
success = False
failed_members.append(member)
if failed_members:
message = f"Some members could not be added to the group: {', '.join(failed_members)}"
else:
message = "All members added to the group successfully"
return GroupResponse(
success=success,
message=message,
group_id=params.group_id,
)
def remove_group_members(
config: ServerConfig,
auth_manager: AuthManager,
params: RemoveGroupMembersParams,
) -> GroupResponse:
"""
Remove members from a group in ServiceNow.
Args:
config: Server configuration.
auth_manager: Authentication manager.
params: Parameters for removing members from the group.
Returns:
Response with the result of the operation.
"""
success = True
failed_members = []
for member in params.members:
# Get user ID if username is provided
user_id = member
if not member.startswith("sys_id:"):
user = get_user(config, auth_manager, GetUserParams(user_name=member))
if not user.get("success"):
user = get_user(config, auth_manager, GetUserParams(email=member))
if user.get("success"):
user_id = user.get("user", {}).get("sys_id")
else:
success = False
failed_members.append(member)
continue
# Find and delete the group membership
api_url = f"{config.api_url}/table/sys_user_grmember"
query_params = {
"sysparm_query": f"group={params.group_id}^user={user_id}",
"sysparm_limit": "1",
}
try:
# First find the membership record
response = requests.get(
api_url,
params=query_params,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
result = response.json().get("result", [])
if not result:
success = False
failed_members.append(member)
continue
# Then delete the membership record
membership_id = result[0].get("sys_id")
delete_url = f"{api_url}/{membership_id}"
response = requests.delete(
delete_url,
headers=auth_manager.get_headers(),
timeout=config.timeout,
)
response.raise_for_status()
except requests.RequestException as e:
logger.error(f"Failed to remove member '{member}' from group: {e}")
success = False
failed_members.append(member)
if failed_members:
message = f"Some members could not be removed from the group: {', '.join(failed_members)}"
else:
message = "All members removed from the group successfully"
return GroupResponse(
success=success,
message=message,
group_id=params.group_id,
)
```