This is page 2 of 10. Use http://codebase.md/sooperset/mcp-atlassian?page={x} to view the full context.
# Directory Structure
```
├── .devcontainer
│ ├── devcontainer.json
│ ├── Dockerfile
│ ├── post-create.sh
│ └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ └── feature_request.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-publish.yml
│ ├── lint.yml
│ ├── publish.yml
│ ├── stale.yml
│ └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│ ├── oauth_authorize.py
│ └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│ └── mcp_atlassian
│ ├── __init__.py
│ ├── confluence
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── labels.py
│ │ ├── pages.py
│ │ ├── search.py
│ │ ├── spaces.py
│ │ ├── users.py
│ │ ├── utils.py
│ │ └── v2_adapter.py
│ ├── exceptions.py
│ ├── jira
│ │ ├── __init__.py
│ │ ├── attachments.py
│ │ ├── boards.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── epics.py
│ │ ├── fields.py
│ │ ├── formatting.py
│ │ ├── issues.py
│ │ ├── links.py
│ │ ├── projects.py
│ │ ├── protocols.py
│ │ ├── search.py
│ │ ├── sprints.py
│ │ ├── transitions.py
│ │ ├── users.py
│ │ └── worklog.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── comment.py
│ │ │ ├── common.py
│ │ │ ├── label.py
│ │ │ ├── page.py
│ │ │ ├── search.py
│ │ │ ├── space.py
│ │ │ └── user_search.py
│ │ ├── constants.py
│ │ └── jira
│ │ ├── __init__.py
│ │ ├── agile.py
│ │ ├── comment.py
│ │ ├── common.py
│ │ ├── issue.py
│ │ ├── link.py
│ │ ├── project.py
│ │ ├── search.py
│ │ ├── version.py
│ │ ├── workflow.py
│ │ └── worklog.py
│ ├── preprocessing
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence.py
│ │ └── jira.py
│ ├── servers
│ │ ├── __init__.py
│ │ ├── confluence.py
│ │ ├── context.py
│ │ ├── dependencies.py
│ │ ├── jira.py
│ │ └── main.py
│ └── utils
│ ├── __init__.py
│ ├── date.py
│ ├── decorators.py
│ ├── env.py
│ ├── environment.py
│ ├── io.py
│ ├── lifecycle.py
│ ├── logging.py
│ ├── oauth_setup.py
│ ├── oauth.py
│ ├── ssl.py
│ ├── tools.py
│ └── urls.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── fixtures
│ │ ├── __init__.py
│ │ ├── confluence_mocks.py
│ │ └── jira_mocks.py
│ ├── integration
│ │ ├── conftest.py
│ │ ├── README.md
│ │ ├── test_authentication.py
│ │ ├── test_content_processing.py
│ │ ├── test_cross_service.py
│ │ ├── test_mcp_protocol.py
│ │ ├── test_proxy.py
│ │ ├── test_real_api.py
│ │ ├── test_ssl_verification.py
│ │ ├── test_stdin_monitoring_fix.py
│ │ └── test_transport_lifecycle.py
│ ├── README.md
│ ├── test_preprocessing.py
│ ├── test_real_api_validation.py
│ ├── unit
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_labels.py
│ │ │ ├── test_pages.py
│ │ │ ├── test_search.py
│ │ │ ├── test_spaces.py
│ │ │ ├── test_users.py
│ │ │ ├── test_utils.py
│ │ │ └── test_v2_adapter.py
│ │ ├── jira
│ │ │ ├── conftest.py
│ │ │ ├── test_attachments.py
│ │ │ ├── test_boards.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_epics.py
│ │ │ ├── test_fields.py
│ │ │ ├── test_formatting.py
│ │ │ ├── test_issues_markdown.py
│ │ │ ├── test_issues.py
│ │ │ ├── test_links.py
│ │ │ ├── test_projects.py
│ │ │ ├── test_protocols.py
│ │ │ ├── test_search.py
│ │ │ ├── test_sprints.py
│ │ │ ├── test_transitions.py
│ │ │ ├── test_users.py
│ │ │ └── test_worklog.py
│ │ ├── models
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_base_models.py
│ │ │ ├── test_confluence_models.py
│ │ │ ├── test_constants.py
│ │ │ └── test_jira_models.py
│ │ ├── servers
│ │ │ ├── __init__.py
│ │ │ ├── test_confluence_server.py
│ │ │ ├── test_context.py
│ │ │ ├── test_dependencies.py
│ │ │ ├── test_jira_server.py
│ │ │ └── test_main_server.py
│ │ ├── test_exceptions.py
│ │ ├── test_main_transport_selection.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── test_custom_headers.py
│ │ ├── test_date.py
│ │ ├── test_decorators.py
│ │ ├── test_env.py
│ │ ├── test_environment.py
│ │ ├── test_io.py
│ │ ├── test_lifecycle.py
│ │ ├── test_logging.py
│ │ ├── test_masking.py
│ │ ├── test_oauth_setup.py
│ │ ├── test_oauth.py
│ │ ├── test_ssl.py
│ │ ├── test_tools.py
│ │ └── test_urls.py
│ └── utils
│ ├── __init__.py
│ ├── assertions.py
│ ├── base.py
│ ├── factories.py
│ └── mocks.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/src/mcp_atlassian/utils/decorators.py:
--------------------------------------------------------------------------------
```python
import logging
from collections.abc import Awaitable, Callable
from functools import wraps
from typing import Any, TypeVar
import requests
from fastmcp import Context
from requests.exceptions import HTTPError
from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
logger = logging.getLogger(__name__)
F = TypeVar("F", bound=Callable[..., Awaitable[Any]])
def check_write_access(func: F) -> F:
    """
    Decorator for FastMCP tools that rejects write calls in read-only mode.

    Raises a ValueError when the application-level lifespan context reports
    read-only mode; otherwise forwards the call to the wrapped tool unchanged.
    Assumes the decorated function is async and has `ctx: Context` as its
    first argument.
    """

    @wraps(func)
    async def wrapper(ctx: Context, *args: Any, **kwargs: Any) -> Any:
        lifespan_ctx = ctx.request_context.lifespan_context
        app_ctx = None
        if isinstance(lifespan_ctx, dict):
            app_ctx = lifespan_ctx.get("app_lifespan_context")  # type: ignore
        if app_ctx is not None and app_ctx.read_only:
            tool_name = func.__name__
            # Human-readable action, e.g. "create_issue" -> "create issue"
            readable_action = tool_name.replace("_", " ")
            logger.warning(f"Attempted to call tool '{tool_name}' in read-only mode.")
            raise ValueError(f"Cannot {readable_action} in read-only mode.")
        return await func(ctx, *args, **kwargs)

    return wrapper  # type: ignore
def handle_atlassian_api_errors(service_name: str = "Atlassian API") -> Callable:
    """
    Decorator to handle common Atlassian API exceptions (Jira, Confluence, etc.).

    Authentication failures (HTTP 401/403) are re-raised as
    MCPAtlassianAuthenticationError; other HTTP errors propagate unchanged.
    Network, missing-key, and value/type errors are logged and collapsed to an
    empty list so callers can treat "failed" as "no results".

    Args:
        service_name: Name of the service for error logging (e.g., "Jira API").

    Returns:
        A decorator that wraps instance methods with the error handling above.
    """

    def decorator(func: Callable) -> Callable:
        # Resolved once at decoration time instead of in every except branch.
        operation_name = getattr(func, "__name__", "API operation")

        @wraps(func)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            try:
                return func(self, *args, **kwargs)
            except HTTPError as http_err:
                response = http_err.response
                if response is not None and response.status_code in (401, 403):
                    error_msg = (
                        f"Authentication failed for {service_name} "
                        f"({response.status_code}). "
                        "Token may be expired or invalid. Please verify credentials."
                    )
                    logger.error(error_msg)
                    raise MCPAtlassianAuthenticationError(error_msg) from http_err
                logger.error(
                    f"HTTP error during {operation_name}: {http_err}",
                    exc_info=False,
                )
                # Bare raise preserves the original exception and traceback.
                raise
            except KeyError as e:
                logger.error(f"Missing key in {operation_name} results: {str(e)}")
                return []
            except requests.RequestException as e:
                logger.error(f"Network error during {operation_name}: {str(e)}")
                return []
            except (ValueError, TypeError) as e:
                logger.error(f"Error processing {operation_name} results: {str(e)}")
                return []
            except Exception as e:  # noqa: BLE001 - Intentional fallback with logging
                logger.error(f"Unexpected error during {operation_name}: {str(e)}")
                logger.debug(
                    f"Full exception details for {operation_name}:", exc_info=True
                )
                return []

        return wrapper

    return decorator
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/models/base.py:
--------------------------------------------------------------------------------
```python
"""
Base models and utility classes for the MCP Atlassian API models.
This module provides base classes and mixins that are used by the
Jira and Confluence models to ensure consistent behavior and reduce
code duplication.
"""
from datetime import datetime
from typing import Any, TypeVar
from pydantic import BaseModel
from .constants import EMPTY_STRING
# Type variable for the return type of from_api_response
T = TypeVar("T", bound="ApiModel")
class ApiModel(BaseModel):
    """
    Base model for all API models with common conversion methods.

    This provides a standard interface for converting API responses
    to models and for converting models to simplified dictionaries
    for API responses.
    """

    @classmethod
    def from_api_response(cls: type[T], data: dict[str, Any], **kwargs: Any) -> T:
        """
        Convert an API response to a model instance.

        Args:
            data: The API response data
            **kwargs: Additional context parameters

        Returns:
            An instance of the model

        Raises:
            NotImplementedError: If the subclass does not implement this method
        """
        raise NotImplementedError("Subclasses must implement from_api_response")

    def to_simplified_dict(self) -> dict[str, Any]:
        """
        Convert the model to a simplified dictionary for API responses.

        Returns:
            A dictionary with only the essential fields for API responses
        """
        # None-valued fields are dropped so responses stay compact; subclasses
        # override this to hand-pick fields.
        return self.model_dump(exclude_none=True)
class TimestampMixin:
"""
Mixin for handling Atlassian API timestamp formats.
"""
@staticmethod
def format_timestamp(timestamp: str | None) -> str:
"""
Format an Atlassian timestamp to a human-readable format.
Args:
timestamp: An ISO 8601 timestamp string
Returns:
A formatted date string or empty string if the input is invalid
"""
if not timestamp:
return EMPTY_STRING
try:
# Parse ISO 8601 format like "2024-01-01T10:00:00.000+0000"
# Convert Z format to +00:00 for compatibility with fromisoformat
ts = timestamp.replace("Z", "+00:00")
# Handle timezone format without colon (+0000 -> +00:00)
if "+" in ts and ":" not in ts[-5:]:
tz_pos = ts.rfind("+")
if tz_pos != -1 and len(ts) >= tz_pos + 5:
ts = ts[: tz_pos + 3] + ":" + ts[tz_pos + 3 :]
elif "-" in ts and ":" not in ts[-5:]:
tz_pos = ts.rfind("-")
if tz_pos != -1 and len(ts) >= tz_pos + 5:
ts = ts[: tz_pos + 3] + ":" + ts[tz_pos + 3 :]
dt = datetime.fromisoformat(ts)
return dt.strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, TypeError):
return timestamp or EMPTY_STRING
@staticmethod
def is_valid_timestamp(timestamp: str | None) -> bool:
"""
Check if a string is a valid ISO 8601 timestamp.
Args:
timestamp: The string to check
Returns:
True if the string is a valid timestamp, False otherwise
"""
if not timestamp:
return False
try:
# Convert Z format to +00:00 for compatibility with fromisoformat
ts = timestamp.replace("Z", "+00:00")
# Handle timezone format without colon (+0000 -> +00:00)
if "+" in ts and ":" not in ts[-5:]:
tz_pos = ts.rfind("+")
if tz_pos != -1 and len(ts) >= tz_pos + 5:
ts = ts[: tz_pos + 3] + ":" + ts[tz_pos + 3 :]
elif "-" in ts and ":" not in ts[-5:]:
tz_pos = ts.rfind("-")
if tz_pos != -1 and len(ts) >= tz_pos + 5:
ts = ts[: tz_pos + 3] + ":" + ts[tz_pos + 3 :]
datetime.fromisoformat(ts)
return True
except (ValueError, TypeError):
return False
```
--------------------------------------------------------------------------------
/tests/utils/factories.py:
--------------------------------------------------------------------------------
```python
"""Test data factories for creating consistent test objects."""
from typing import Any
class JiraIssueFactory:
    """Factory for creating Jira issue test data."""

    @staticmethod
    def create(key: str = "TEST-123", **overrides) -> dict[str, Any]:
        """Create a Jira issue with default values, deep-merging overrides."""
        default_fields = {
            "summary": "Test Issue Summary",
            "description": "Test issue description",
            "status": {"name": "Open", "id": "1", "statusCategory": {"key": "new"}},
            "issuetype": {"name": "Task", "id": "10001"},
            "priority": {"name": "Medium", "id": "3"},
            "assignee": {
                "displayName": "Test User",
                "emailAddress": "[email protected]",
            },
            "created": "2023-01-01T12:00:00.000+0000",
            "updated": "2023-01-01T12:00:00.000+0000",
        }
        issue = {
            "id": "12345",
            "key": key,
            "self": f"https://test.atlassian.net/rest/api/3/issue/{key}",
            "fields": default_fields,
        }
        return deep_merge(issue, overrides)

    @staticmethod
    def create_minimal(key: str = "TEST-123") -> dict[str, Any]:
        """Create minimal Jira issue for basic tests."""
        fields = {"summary": "Test Issue", "status": {"name": "Open"}}
        return {"key": key, "fields": fields}
class ConfluencePageFactory:
    """Factory for creating Confluence page test data."""

    @staticmethod
    def create(page_id: str = "123456", **overrides) -> dict[str, Any]:
        """Create a Confluence page with default values, deep-merging overrides."""
        links = {
            "webui": f"/spaces/TEST/pages/{page_id}",
            "self": f"https://test.atlassian.net/wiki/rest/api/content/{page_id}",
        }
        page = {
            "id": page_id,
            "title": "Test Page",
            "type": "page",
            "status": "current",
            "space": {"key": "TEST", "name": "Test Space"},
            "body": {
                "storage": {"value": "<p>Test content</p>", "representation": "storage"}
            },
            "version": {"number": 1},
            "_links": links,
        }
        return deep_merge(page, overrides)
class AuthConfigFactory:
    """Factory for authentication configuration objects."""

    @staticmethod
    def create_oauth_config(**overrides) -> dict[str, str]:
        """Create OAuth configuration (overrides replace defaults)."""
        config = {
            "client_id": "test-client-id",
            "client_secret": "test-client-secret",
            "redirect_uri": "http://localhost:8080/callback",
            "scope": "read:jira-work write:jira-work",
            "cloud_id": "test-cloud-id",
            "access_token": "test-access-token",
            "refresh_token": "test-refresh-token",
        }
        config.update(overrides)
        return config

    @staticmethod
    def create_basic_auth_config(**overrides) -> dict[str, str]:
        """Create basic auth configuration (overrides replace defaults)."""
        config = {
            "url": "https://test.atlassian.net",
            "username": "[email protected]",
            "api_token": "test-api-token",
        }
        config.update(overrides)
        return config
class ErrorResponseFactory:
    """Factory for creating error response test data."""

    @staticmethod
    def create_api_error(
        status_code: int = 400, message: str = "Bad Request"
    ) -> dict[str, Any]:
        """Create API error response."""
        return {
            "errorMessages": [message],
            "errors": {},
            "status": status_code,
        }

    @staticmethod
    def create_auth_error() -> dict[str, Any]:
        """Create authentication error response."""
        return {
            "errorMessages": ["Authentication failed"],
            "status": 401,
        }
def deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Deep merge two dictionaries.

    Nested dicts present in both inputs are merged recursively; any other
    overlapping key takes the value from ``override``. Neither input is
    mutated.
    """
    merged = dict(base)
    for key, incoming in override.items():
        current = merged.get(key)
        if isinstance(current, dict) and isinstance(incoming, dict):
            merged[key] = deep_merge(current, incoming)
        else:
            merged[key] = incoming
    return merged
```
--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/bug_report.yml:
--------------------------------------------------------------------------------
```yaml
name: "\U0001F41B Bug Report"
description: Create a report to help us improve mcp-atlassian
title: "[Bug]: "
labels: ["bug"]
body:
- type: markdown
attributes:
value: |
Thanks for taking the time to fill out this bug report! Please provide as much detail as possible.
- type: checkboxes
id: prerequisites
attributes:
label: Prerequisites
description: Please confirm the following before submitting the issue.
options:
- label: I have searched the [existing issues](https://github.com/sooperset/mcp-atlassian/issues) to make sure this bug has not already been reported.
required: true
- label: I have checked the [README](https://github.com/sooperset/mcp-atlassian/blob/main/README.md) for relevant information.
required: true
- type: textarea
id: description
attributes:
label: Bug Description
description: A clear and concise description of what the bug is.
placeholder: "When I call the `jira_create_issue` tool with..., it fails with..."
validations:
required: true
- type: textarea
id: steps-to-reproduce
attributes:
label: Steps to Reproduce
description: Provide detailed steps to reproduce the behavior.
placeholder: |
1. Start the server with command `...`
2. Send a `list_tools` request using `...`
3. Call the tool `xyz` with arguments `...`
4. See error `...`
validations:
required: true
- type: textarea
id: expected-behavior
attributes:
label: Expected Behavior
description: A clear and concise description of what you expected to happen.
placeholder: "I expected the Jira issue to be created successfully and return its key."
validations:
required: true
- type: textarea
id: actual-behavior
attributes:
label: Actual Behavior
description: What actually happened? Include full error messages, logs (from the server and the client if possible), or screenshots.
placeholder: "The server returned an error message: '...' / The tool call returned an empty list."
render: shell
validations:
required: true
- type: input
id: version
attributes:
label: mcp-atlassian Version
description: Which version of `mcp-atlassian` are you using? (Check `pip show mcp-atlassian` or `pyproject.toml`)
placeholder: "e.g., 0.6.5"
validations:
required: true
- type: dropdown
id: installation
attributes:
label: Installation Method
description: How did you install `mcp-atlassian`?
options:
- From PyPI (pip install mcp-atlassian / uv add mcp-atlassian)
- From source (git clone)
- Docker
- Other
validations:
required: true
- type: dropdown
id: os
attributes:
label: Operating System
description: What operating system are you using?
options:
- Windows
- macOS
- Linux (Specify distribution below if relevant)
- Other
validations:
required: true
- type: input
id: python-version
attributes:
label: Python Version
description: What version of Python are you using? (`python --version`)
placeholder: "e.g., 3.11.4"
validations:
required: true
- type: dropdown
id: atlassian-instance
attributes:
label: Atlassian Instance Type
description: Are you connecting to Atlassian Cloud or Server/Data Center?
multiple: true
options:
- Jira Cloud
- Jira Server / Data Center
- Confluence Cloud
- Confluence Server / Data Center
validations:
required: true
- type: input
id: client-app
attributes:
label: Client Application
description: What application/library are you using to interact with the MCP server? (This is important!)
placeholder: "e.g., Cursor, LangChain, custom script, Inspector Tool"
validations:
required: true
- type: textarea
id: additional-context
attributes:
label: Additional Context
description: Add any other context about the problem here (e.g., environment variables, network configuration like proxies, specific Jira/Confluence setup).
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/models/confluence/user_search.py:
--------------------------------------------------------------------------------
```python
"""
Confluence user search result models.
This module provides Pydantic models for Confluence user search results.
"""
import logging
from typing import Any
from pydantic import Field
from ..base import ApiModel, TimestampMixin
from .common import ConfluenceUser
logger = logging.getLogger(__name__)
class ConfluenceUserSearchResult(ApiModel):
    """
    Model representing a single user search result.
    """

    user: ConfluenceUser | None = None
    title: str | None = None
    excerpt: str | None = None
    url: str | None = None
    entity_type: str = "user"
    last_modified: str | None = None
    score: float = 0.0

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "ConfluenceUserSearchResult":
        """
        Create a ConfluenceUserSearchResult from a Confluence API response.

        Args:
            data: The user search result data from the Confluence API
            **kwargs: Additional context parameters

        Returns:
            A ConfluenceUserSearchResult instance (defaulted when data is empty)
        """
        if not data:
            return cls()

        # Nested user payload, if present, becomes a ConfluenceUser model.
        raw_user = data.get("user", {})
        parsed_user = ConfluenceUser.from_api_response(raw_user) if raw_user else None

        return cls(
            user=parsed_user,
            title=data.get("title"),
            excerpt=data.get("excerpt"),
            url=data.get("url"),
            entity_type=data.get("entityType", "user"),
            last_modified=data.get("lastModified"),
            score=data.get("score", 0.0),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        simplified: dict[str, Any] = {
            "entity_type": self.entity_type,
            "title": self.title,
            "score": self.score,
        }

        if self.user:
            simplified["user"] = {
                "account_id": self.user.account_id,
                "display_name": self.user.display_name,
                "email": self.user.email,
                "profile_picture": self.user.profile_picture,
                "is_active": self.user.is_active,
            }

        # Optional scalar fields are only emitted when truthy.
        for field_name, value in (
            ("url", self.url),
            ("last_modified", self.last_modified),
            ("excerpt", self.excerpt),
        ):
            if value:
                simplified[field_name] = value

        return simplified
class ConfluenceUserSearchResults(ApiModel, TimestampMixin):
    """
    Model representing a collection of user search results.
    """

    total_size: int = 0
    start: int = 0
    limit: int = 0
    results: list[ConfluenceUserSearchResult] = Field(default_factory=list)
    cql_query: str | None = None
    search_duration: int | None = None

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "ConfluenceUserSearchResults":
        """
        Create a ConfluenceUserSearchResults from a Confluence API response.

        Args:
            data: The search result data from the Confluence API
            **kwargs: Additional context parameters (forwarded to each result)

        Returns:
            A ConfluenceUserSearchResults instance (defaulted when data is empty)
        """
        if not data:
            return cls()

        # Each raw result is parsed into a ConfluenceUserSearchResult model.
        parsed_results = [
            ConfluenceUserSearchResult.from_api_response(item, **kwargs)
            for item in data.get("results", [])
        ]

        return cls(
            total_size=data.get("totalSize", 0),
            start=data.get("start", 0),
            limit=data.get("limit", 0),
            results=parsed_results,
            cql_query=data.get("cqlQuery"),
            search_duration=data.get("searchDuration"),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        return {
            "total_size": self.total_size,
            "start": self.start,
            "limit": self.limit,
            "cql_query": self.cql_query,
            "search_duration": self.search_duration,
            "results": [entry.to_simplified_dict() for entry in self.results],
        }
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/models/jira/agile.py:
--------------------------------------------------------------------------------
```python
"""
Jira agile models.
This module provides Pydantic models for Jira agile entities,
such as boards and sprints.
"""
import logging
from typing import Any
from ..base import ApiModel
from ..constants import (
EMPTY_STRING,
JIRA_DEFAULT_ID,
UNKNOWN,
)
logger = logging.getLogger(__name__)
class JiraBoard(ApiModel):
    """
    Model representing a Jira board.
    """

    id: str = JIRA_DEFAULT_ID
    name: str = UNKNOWN
    type: str = UNKNOWN

    @classmethod
    def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraBoard":
        """
        Create a JiraBoard from a Jira API response.

        Args:
            data: The board data from the Jira API
            **kwargs: Additional context parameters (unused)

        Returns:
            A JiraBoard instance; a default instance when data is empty
            or not a dictionary
        """
        if not data:
            return cls()
        # Handle non-dictionary data by returning a default instance
        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        # Coerce values to strings; an explicit JSON null falls back to the
        # field default (previously None was passed through, which either
        # failed str-field validation or became the literal string "None").
        raw_id = data.get("id")
        board_id = str(raw_id) if raw_id is not None else JIRA_DEFAULT_ID
        raw_name = data.get("name")
        board_name = str(raw_name) if raw_name is not None else UNKNOWN
        raw_type = data.get("type")
        board_type = str(raw_type) if raw_type is not None else UNKNOWN

        return cls(
            id=board_id,
            name=board_name,
            type=board_type,
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        return {
            "id": self.id,
            "name": self.name,
            "type": self.type,
        }
class JiraSprint(ApiModel):
    """
    Model representing a Jira sprint.
    """

    id: str = JIRA_DEFAULT_ID
    state: str = UNKNOWN
    name: str = UNKNOWN
    start_date: str = EMPTY_STRING
    end_date: str = EMPTY_STRING
    activated_date: str = EMPTY_STRING
    origin_board_id: str = JIRA_DEFAULT_ID
    goal: str = EMPTY_STRING
    synced: bool = False
    auto_start_stop: bool = False

    @staticmethod
    def _string_or_default(value: Any, default: str) -> str:
        """Coerce value to str, using default when the API returned null."""
        return default if value is None else str(value)

    @classmethod
    def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraSprint":
        """
        Create a JiraSprint from a Jira API response.

        Args:
            data: The sprint data from the Jira API
            **kwargs: Additional context parameters (unused)

        Returns:
            A JiraSprint instance; a default instance when data is empty
            or not a dictionary
        """
        if not data:
            return cls()
        # Handle non-dictionary data by returning a default instance
        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        to_str = cls._string_or_default

        # All string fields fall back to their defaults on an explicit JSON
        # null (previously None became the literal string "None" or failed
        # str-field validation for the ID fields).
        return cls(
            id=to_str(data.get("id"), JIRA_DEFAULT_ID),
            state=to_str(data.get("state"), UNKNOWN),
            name=to_str(data.get("name"), UNKNOWN),
            start_date=to_str(data.get("startDate"), EMPTY_STRING),
            end_date=to_str(data.get("endDate"), EMPTY_STRING),
            activated_date=to_str(data.get("activatedDate"), EMPTY_STRING),
            origin_board_id=to_str(data.get("originBoardId"), JIRA_DEFAULT_ID),
            goal=to_str(data.get("goal"), EMPTY_STRING),
            synced=bool(data.get("synced", False)),
            auto_start_stop=bool(data.get("autoStartStop", False)),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        result = {
            "id": self.id,
            "name": self.name,
            "state": self.state,
        }
        if self.goal and self.goal != EMPTY_STRING:
            result["goal"] = self.goal
        # Only include dates if they're not empty
        if self.start_date and self.start_date != EMPTY_STRING:
            result["start_date"] = self.start_date
        if self.end_date and self.end_date != EMPTY_STRING:
            result["end_date"] = self.end_date
        return result
```
--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------
```yaml
# Smithery.ai configuration for mcp-atlassian
startCommand:
type: stdio # Specifies the server communicates over standard input/output
configSchema:
# JSON Schema defining the configuration options users need to provide
type: object
required:
- jiraUrl
- confluenceUrl
# Add other strictly required credentials based on auth needs
properties:
# Confluence Config
confluenceUrl:
type: string
description: "Base URL for your Confluence instance (e.g., https://your-domain.atlassian.net/wiki or https://confluence.yourcompany.com)."
confluenceUsername:
type: string
description: "(Optional for Cloud Basic Auth) Your Confluence username or email."
confluenceApiToken:
type: string
description: "(Optional for Cloud Basic Auth) Your Confluence API token."
format: password # Hides the value in UI inputs
confluencePersonalToken:
type: string
description: "(Optional for Server/DC Token Auth) Your Confluence Personal Access Token."
format: password
confluenceSslVerify:
type: boolean
description: "(Optional, Server/DC only) Verify SSL certificate for Confluence. Defaults to true."
default: true
confluenceSpacesFilter:
type: string
description: "(Optional) Comma-separated list of Confluence space keys to limit searches to (e.g., 'DEV,QA')."
# Jira Config
jiraUrl:
type: string
description: "Base URL for your Jira instance (e.g., https://your-domain.atlassian.net or https://jira.yourcompany.com)."
jiraUsername:
type: string
description: "(Optional for Cloud Basic Auth or Server/DC Basic Auth) Your Jira username or email."
jiraApiToken:
type: string
description: "(Optional for Cloud Basic Auth or Server/DC Basic Auth) Your Jira API token."
format: password
jiraPersonalToken:
type: string
description: "(Optional for Server/DC Token Auth) Your Jira Personal Access Token."
format: password
jiraSslVerify:
type: boolean
description: "(Optional, Server/DC only) Verify SSL certificate for Jira. Defaults to true."
default: true
jiraProjectsFilter:
type: string
description: "(Optional) Comma-separated list of Jira project keys to limit searches to (e.g., 'PROJ1,PROJ2')."
# General Config
readOnlyMode:
type: boolean
description: "(Optional) Run in read-only mode (prevents create/update/delete). Defaults to false."
default: false
additionalProperties: false # Disallow properties not defined above
commandFunction:
# A JavaScript function that produces the CLI command and environment variables
# needed to start the MCP server, based on the user's configuration.
|-
(config) => {
// The command matches the ENTRYPOINT in the Dockerfile
const command = 'mcp-atlassian';
const args = []; // No arguments needed as config is via ENV
// Map the config schema properties to the environment variables
const env = {
// Confluence ENV VARS
CONFLUENCE_URL: config.confluenceUrl,
CONFLUENCE_USERNAME: config.confluenceUsername,
CONFLUENCE_API_TOKEN: config.confluenceApiToken,
CONFLUENCE_PERSONAL_TOKEN: config.confluencePersonalToken,
CONFLUENCE_SSL_VERIFY: config.confluenceSslVerify !== undefined ? String(config.confluenceSslVerify) : 'true',
CONFLUENCE_SPACES_FILTER: config.confluenceSpacesFilter,
// Jira ENV VARS
JIRA_URL: config.jiraUrl,
JIRA_USERNAME: config.jiraUsername,
JIRA_API_TOKEN: config.jiraApiToken,
JIRA_PERSONAL_TOKEN: config.jiraPersonalToken,
JIRA_SSL_VERIFY: config.jiraSslVerify !== undefined ? String(config.jiraSslVerify) : 'true',
JIRA_PROJECTS_FILTER: config.jiraProjectsFilter,
// General ENV VARS
READ_ONLY_MODE: config.readOnlyMode !== undefined ? String(config.readOnlyMode) : 'false',
};
// Filter out undefined/null env variables
const filteredEnv = Object.entries(env)
.filter(([key, value]) => value !== undefined && value !== null)
.reduce((obj, [key, value]) => {
obj[key] = value;
return obj;
}, {});
return {
command: command,
args: args,
env: filteredEnv
};
}
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/search.py:
--------------------------------------------------------------------------------
```python
"""Module for Confluence search operations."""
import logging
from ..models.confluence import (
ConfluencePage,
ConfluenceSearchResult,
ConfluenceUserSearchResult,
ConfluenceUserSearchResults,
)
from ..utils.decorators import handle_atlassian_api_errors
from .client import ConfluenceClient
from .utils import quote_cql_identifier_if_needed
logger = logging.getLogger("mcp-atlassian")
class SearchMixin(ConfluenceClient):
    """Mixin for Confluence search operations.

    Relies on attributes provided by ConfluenceClient: `self.confluence`
    (API client), `self.config`, and `self.preprocessor`.
    """

    # NOTE: handle_atlassian_api_errors converts most failures into an empty
    # list, so callers see "no results" rather than an exception; 401/403
    # still raise MCPAtlassianAuthenticationError.
    @handle_atlassian_api_errors("Confluence API")
    def search(
        self, cql: str, limit: int = 10, spaces_filter: str | None = None
    ) -> list[ConfluencePage]:
        """
        Search content using Confluence Query Language (CQL).

        Args:
            cql: Confluence Query Language string
            limit: Maximum number of results to return
            spaces_filter: Optional comma-separated list of space keys to filter by,
                overrides config

        Returns:
            List of ConfluencePage models containing search results

        Raises:
            MCPAtlassianAuthenticationError: If authentication fails with the
                Confluence API (401/403)
        """
        # Use spaces_filter parameter if provided, otherwise fall back to config
        filter_to_use = spaces_filter or self.config.spaces_filter
        # Apply spaces filter if present
        if filter_to_use:
            # Split spaces filter by commas and handle possible whitespace
            spaces = [s.strip() for s in filter_to_use.split(",")]
            # Build the space filter query part using proper quoting for each space key
            space_query = " OR ".join(
                [f"space = {quote_cql_identifier_if_needed(space)}" for space in spaces]
            )
            # Add the space filter to existing query with parentheses
            if cql and space_query:
                if "space = " not in cql:  # Only add if not already filtering by space
                    cql = f"({cql}) AND ({space_query})"
            else:
                # No original query: the space filter becomes the whole query.
                cql = space_query
            logger.info(f"Applied spaces filter to query: {cql}")
        # Execute the CQL search query
        results = self.confluence.cql(cql=cql, limit=limit)
        # Convert the response to a search result model
        search_result = ConfluenceSearchResult.from_api_response(
            results,
            base_url=self.config.url,
            cql_query=cql,
            is_cloud=self.config.is_cloud,
        )
        # Process result excerpts as content
        processed_pages = []
        for page in search_result.results:
            # Get the excerpt from the original search results by matching
            # the raw result's content id against the parsed page id.
            for result_item in results.get("results", []):
                if result_item.get("content", {}).get("id") == page.id:
                    excerpt = result_item.get("excerpt", "")
                    if excerpt:
                        # Process the excerpt as HTML content
                        space_key = page.space.key if page.space else ""
                        _, processed_markdown = self.preprocessor.process_html_content(
                            excerpt,
                            space_key=space_key,
                            confluence_client=self.confluence,
                        )
                        # Create a new page with processed content
                        page.content = processed_markdown
                    # Stop scanning raw results once the matching item is found.
                    break
            processed_pages.append(page)
        # Return the list of result pages with processed content
        return processed_pages

    @handle_atlassian_api_errors("Confluence API")
    def search_user(
        self, cql: str, limit: int = 10
    ) -> list[ConfluenceUserSearchResult]:
        """
        Search users using Confluence Query Language (CQL).

        Args:
            cql: Confluence Query Language string for user search
            limit: Maximum number of results to return

        Returns:
            List of ConfluenceUserSearchResult models containing user search results

        Raises:
            MCPAtlassianAuthenticationError: If authentication fails with the
                Confluence API (401/403)
        """
        # Execute the user search query using the direct API endpoint
        # (the user search is not exposed via the cql() helper).
        results = self.confluence.get(
            "rest/api/search/user", params={"cql": cql, "limit": limit}
        )
        # Convert the response to a user search result model; `results or {}`
        # guards against a None/empty response body.
        search_result = ConfluenceUserSearchResults.from_api_response(results or {})
        # Return the list of user search results
        return search_result.results
```
--------------------------------------------------------------------------------
/tests/unit/confluence/test_labels.py:
--------------------------------------------------------------------------------
```python
"""Unit tests for the LabelsMixin class."""
from unittest.mock import patch
import pytest
import requests
from mcp_atlassian.confluence.labels import LabelsMixin
from mcp_atlassian.models.confluence import ConfluenceLabel
class TestLabelsMixin:
    """Tests for the LabelsMixin class."""

    @pytest.fixture
    def labels_mixin(self, confluence_client):
        """Create a LabelsMixin instance for testing."""
        # LabelsMixin inherits from ConfluenceClient, so we need to create it properly;
        # patching __init__ avoids real authentication during construction.
        with patch(
            "mcp_atlassian.confluence.labels.ConfluenceClient.__init__"
        ) as mock_init:
            mock_init.return_value = None
            mixin = LabelsMixin()
            # Copy the necessary attributes from our mocked client
            mixin.confluence = confluence_client.confluence
            mixin.config = confluence_client.config
            mixin.preprocessor = confluence_client.preprocessor
            return mixin

    def test_get_page_labels_success(self, labels_mixin):
        """Test get_page_labels with success response."""
        # Setup
        page_id = "12345"
        # Call the method
        result = labels_mixin.get_page_labels(page_id)
        # Verify the underlying API call and the three canned labels supplied
        # by the shared confluence_client fixture.
        labels_mixin.confluence.get_page_labels.assert_called_once_with(page_id=page_id)
        assert len(result) == 3
        assert result[0].id == "456789123"
        assert result[0].prefix == "global"
        assert result[0].label == "meeting-notes"
        assert result[1].id == "456789124"
        assert result[1].prefix == "my"
        assert result[1].name == "important"
        assert result[2].id == "456789125"
        assert result[2].name == "test"

    def test_get_page_labels_api_error(self, labels_mixin):
        """Test handling of API errors."""
        # Mock the API to raise an exception
        labels_mixin.confluence.get_page_labels.side_effect = requests.RequestException(
            "API error"
        )
        # Act/Assert: the mixin wraps transport failures in its own error message.
        with pytest.raises(Exception, match="Failed fetching labels"):
            labels_mixin.get_page_labels("987654321")

    def test_get_page_labels_key_error(self, labels_mixin):
        """Test handling of missing keys in API response."""
        # Mock the response to be missing expected keys
        labels_mixin.confluence.get_page_labels.return_value = {"invalid": "data"}
        # Act/Assert
        with pytest.raises(Exception, match="Failed fetching labels"):
            labels_mixin.get_page_labels("987654321")

    def test_get_page_labels_value_error(self, labels_mixin):
        """Test handling of unexpected data types."""
        # Cause a value error by returning a string where a dict is expected
        labels_mixin.confluence.get_page_labels.return_value = "invalid"
        # Act/Assert
        with pytest.raises(Exception, match="Failed fetching labels"):
            labels_mixin.get_page_labels("987654321")

    def test_get_page_labels_with_empty_results(self, labels_mixin):
        """Test handling of empty results."""
        # Mock empty results
        labels_mixin.confluence.get_page_labels.return_value = {"results": []}
        # Act
        result = labels_mixin.get_page_labels("987654321")
        # Assert
        assert isinstance(result, list)
        assert len(result) == 0  # Empty list with no labels

    def test_add_page_label_success(self, labels_mixin):
        """Test adding a label"""
        # Arrange
        page_id = "987654321"
        name = "test-label"
        prefix = "global"
        # Mock get_page_labels to return a single ConfluenceLabel, which
        # add_page_label uses as its return value after setting the label.
        with patch.object(
            labels_mixin,
            "get_page_labels",
            return_value=ConfluenceLabel(
                id="123456789",
                name=name,
                prefix=prefix,
            ),
        ):
            # Act
            result = labels_mixin.add_page_label(page_id, name)
            # Assert
            labels_mixin.confluence.set_page_label.assert_called_once_with(
                page_id=page_id, label=name
            )
            # Verify result is a ConfluenceLabel
            assert isinstance(result, ConfluenceLabel)
            assert result.id == "123456789"
            assert result.name == name
            assert result.prefix == prefix

    def test_add_page_label_error(self, labels_mixin):
        """Test error handling when adding a label."""
        # Arrange
        labels_mixin.confluence.set_page_label.side_effect = Exception("API Error")
        # Act/Assert
        with pytest.raises(Exception, match="Failed to add label"):
            labels_mixin.add_page_label("987654321", "test")
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_custom_headers.py:
--------------------------------------------------------------------------------
```python
"""Tests for JIRA custom headers functionality."""
import os
from unittest.mock import MagicMock, patch
from mcp_atlassian.jira.client import JiraClient
from mcp_atlassian.jira.config import JiraConfig
class TestJiraConfigCustomHeaders:
    """Test JiraConfig parsing of custom headers."""

    # Minimal credentials required for JiraConfig.from_env() to succeed;
    # each scenario extends this with a JIRA_CUSTOM_HEADERS value.
    _BASE_ENV = {
        "JIRA_URL": "https://test.atlassian.net",
        "JIRA_USERNAME": "test_user",
        "JIRA_API_TOKEN": "test_token",
    }

    def test_no_custom_headers(self):
        """Config parses to an empty header map when the variable is absent."""
        with patch.dict(os.environ, dict(self._BASE_ENV), clear=True):
            config = JiraConfig.from_env()
            assert config.custom_headers == {}

    def test_service_specific_headers_only(self):
        """Comma-separated key=value pairs are parsed into a dict."""
        env = dict(
            self._BASE_ENV,
            JIRA_CUSTOM_HEADERS="X-Jira-Specific=jira_value,X-Service=service_value",
        )
        with patch.dict(os.environ, env, clear=True):
            config = JiraConfig.from_env()
            assert config.custom_headers == {
                "X-Jira-Specific": "jira_value",
                "X-Service": "service_value",
            }

    def test_malformed_headers_are_ignored(self):
        """Entries without '=' are dropped; valid pairs survive."""
        env = dict(
            self._BASE_ENV,
            JIRA_CUSTOM_HEADERS="malformed-header,X-Valid=valid_value,another-malformed",
        )
        with patch.dict(os.environ, env, clear=True):
            config = JiraConfig.from_env()
            assert config.custom_headers == {"X-Valid": "valid_value"}

    def test_empty_header_strings(self):
        """A whitespace-only value yields no headers."""
        env = dict(self._BASE_ENV, JIRA_CUSTOM_HEADERS=" ")
        with patch.dict(os.environ, env, clear=True):
            config = JiraConfig.from_env()
            assert config.custom_headers == {}
class TestJiraClientCustomHeaders:
    """Test JiraClient custom headers application."""

    def test_no_custom_headers_applied(self, monkeypatch):
        """Test that no headers are applied when none are configured."""
        # Mock Jira and related dependencies so construction does no I/O.
        mock_jira = MagicMock()
        mock_session = MagicMock()
        mock_session.headers = {}
        mock_jira._session = mock_session
        monkeypatch.setattr(
            "mcp_atlassian.jira.client.Jira", lambda **kwargs: mock_jira
        )
        monkeypatch.setattr(
            "mcp_atlassian.jira.client.configure_ssl_verification",
            lambda **kwargs: None,
        )
        config = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="basic",
            username="test_user",
            api_token="test_token",
            custom_headers={},
        )
        # Constructed for its side effect on the mocked session; the instance
        # itself is unused (previously bound to an unused local, ruff F841).
        JiraClient(config=config)
        # Verify no custom headers were applied
        assert mock_session.headers == {}

    def test_custom_headers_applied_to_session(self, monkeypatch):
        """Test that custom headers are applied to the JIRA session."""
        # Mock Jira and related dependencies so construction does no I/O.
        mock_jira = MagicMock()
        mock_session = MagicMock()
        mock_session.headers = {}
        mock_jira._session = mock_session
        monkeypatch.setattr(
            "mcp_atlassian.jira.client.Jira", lambda **kwargs: mock_jira
        )
        monkeypatch.setattr(
            "mcp_atlassian.jira.client.configure_ssl_verification",
            lambda **kwargs: None,
        )
        custom_headers = {
            "X-Corp-Auth": "token123",
            "X-Dept": "engineering",
            "User-Agent": "CustomJiraClient/1.0",
        }
        config = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="basic",
            username="test_user",
            api_token="test_token",
            custom_headers=custom_headers,
        )
        # Constructed for its side effect on the mocked session (see above).
        JiraClient(config=config)
        # Verify custom headers were applied to session
        for header_name, header_value in custom_headers.items():
            assert mock_session.headers[header_name] == header_value
```
--------------------------------------------------------------------------------
/tests/unit/models/test_constants.py:
--------------------------------------------------------------------------------
```python
"""Tests for model constants.
Focused tests for model constants, validating correct values and business logic.
"""
from mcp_atlassian.models.constants import (
# Confluence defaults
CONFLUENCE_DEFAULT_ID,
CONFLUENCE_DEFAULT_SPACE,
CONFLUENCE_DEFAULT_VERSION,
# Date/Time defaults
DEFAULT_TIMESTAMP,
# Common defaults
EMPTY_STRING,
# Jira defaults
JIRA_DEFAULT_ID,
JIRA_DEFAULT_ISSUE_TYPE,
JIRA_DEFAULT_KEY,
JIRA_DEFAULT_PRIORITY,
JIRA_DEFAULT_PROJECT,
JIRA_DEFAULT_STATUS,
NONE_VALUE,
UNASSIGNED,
UNKNOWN,
)
class TestCommonDefaults:
    """Test suite for common default constants."""

    def test_string_constants_values(self):
        """Each shared string constant carries its documented literal value."""
        for constant, literal in [
            (EMPTY_STRING, ""),
            (UNKNOWN, "Unknown"),
            (UNASSIGNED, "Unassigned"),
            (NONE_VALUE, "None"),
        ]:
            assert constant == literal

    def test_string_constants_types(self):
        """Every shared default constant is a plain string."""
        for constant in (EMPTY_STRING, UNKNOWN, UNASSIGNED, NONE_VALUE):
            assert isinstance(constant, str)
class TestJiraDefaults:
    """Test suite for Jira default constants."""

    def test_jira_id_and_key_values(self):
        """Default Jira identifiers use the string-zero convention."""
        assert JIRA_DEFAULT_ID == "0"
        assert JIRA_DEFAULT_KEY == "UNKNOWN-0"
        assert JIRA_DEFAULT_PROJECT == "0"

    def test_jira_default_dict_structures(self):
        """Each Jira default mapping is a dict with the expected name/id pair."""
        expectations = [
            (JIRA_DEFAULT_STATUS, {"name": UNKNOWN, "id": JIRA_DEFAULT_ID}),
            (JIRA_DEFAULT_PRIORITY, {"name": NONE_VALUE, "id": JIRA_DEFAULT_ID}),
            (JIRA_DEFAULT_ISSUE_TYPE, {"name": UNKNOWN, "id": JIRA_DEFAULT_ID}),
        ]
        for constant, expected in expectations:
            assert isinstance(constant, dict)
            assert constant == expected

    def test_jira_key_format(self):
        """The default key looks like '<PROJECT>-<NUMBER>' with one dash."""
        project_part, sep, number_part = JIRA_DEFAULT_KEY.partition("-")
        assert sep == "-"
        assert project_part == "UNKNOWN"
        assert number_part == "0"
class TestConfluenceDefaults:
    """Test suite for Confluence default constants."""

    def test_confluence_id_value(self):
        """The fallback Confluence ID is the string zero."""
        assert CONFLUENCE_DEFAULT_ID == "0"

    def test_confluence_default_space_structure(self):
        """The default space dict exposes key, name, and id fields."""
        assert isinstance(CONFLUENCE_DEFAULT_SPACE, dict)
        assert CONFLUENCE_DEFAULT_SPACE == {
            "key": EMPTY_STRING,
            "name": UNKNOWN,
            "id": CONFLUENCE_DEFAULT_ID,
        }

    def test_confluence_default_version_structure(self):
        """The default version holds an integer number and empty timestamp."""
        assert isinstance(CONFLUENCE_DEFAULT_VERSION, dict)
        assert CONFLUENCE_DEFAULT_VERSION == {"number": 0, "when": EMPTY_STRING}
        assert isinstance(CONFLUENCE_DEFAULT_VERSION["number"], int)
class TestDateTimeDefaults:
    """Test suite for date/time default constants."""

    def test_default_timestamp_format(self):
        """DEFAULT_TIMESTAMP is the Unix epoch in Atlassian timestamp style."""
        assert isinstance(DEFAULT_TIMESTAMP, str)
        assert DEFAULT_TIMESTAMP == "1970-01-01T00:00:00.000+0000"
        # Sanity-check the date prefix and UTC offset independently as well.
        assert DEFAULT_TIMESTAMP.startswith("1970-01-01T")
        assert "+0000" in DEFAULT_TIMESTAMP
class TestCrossReferenceConsistency:
    """Test suite for consistency between related constants."""

    def test_id_consistency(self):
        """All default structures reuse the shared default IDs."""
        for jira_structure in (
            JIRA_DEFAULT_STATUS,
            JIRA_DEFAULT_PRIORITY,
            JIRA_DEFAULT_ISSUE_TYPE,
        ):
            assert jira_structure["id"] == JIRA_DEFAULT_ID
        assert CONFLUENCE_DEFAULT_SPACE["id"] == CONFLUENCE_DEFAULT_ID

    def test_semantic_usage_consistency(self):
        """Semantically similar fields share the same default sentinel."""
        # UNKNOWN marks required fields whose value is not known.
        for name_value in (
            JIRA_DEFAULT_STATUS["name"],
            JIRA_DEFAULT_ISSUE_TYPE["name"],
            CONFLUENCE_DEFAULT_SPACE["name"],
        ):
            assert name_value == UNKNOWN
        # NONE_VALUE marks nullable/optional fields.
        assert JIRA_DEFAULT_PRIORITY["name"] == NONE_VALUE
        # EMPTY_STRING marks optional string fields.
        assert CONFLUENCE_DEFAULT_SPACE["key"] == EMPTY_STRING
        assert CONFLUENCE_DEFAULT_VERSION["when"] == EMPTY_STRING
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/utils/environment.py:
--------------------------------------------------------------------------------
```python
"""Utility functions related to environment checking."""
import logging
import os
from .urls import is_atlassian_cloud_url
logger = logging.getLogger("mcp-atlassian.utils.environment")
def _check_service_config(service_name: str, env_prefix: str) -> bool:
    """Return True when one auth scheme is fully configured for a service.

    Checks, in decreasing order of precedence:
      1. Full OAuth 2.0 (3LO) app credentials (Cloud-only).
      2. A pre-provisioned OAuth access token plus cloud ID (Cloud-only).
      3. Cloud Basic Auth (username + API token) for Cloud URLs.
      4. PAT or Basic Auth for Server/Data Center URLs.
      5. Minimal OAuth mode (ATLASSIAN_OAUTH_ENABLE) when no URL is set,
         expecting user-provided tokens via request headers.

    Args:
        service_name: Human-readable name for log messages ("Confluence"/"Jira").
        env_prefix: Environment-variable prefix ("CONFLUENCE"/"JIRA").

    Returns:
        True if the service is considered configured, False otherwise.
    """
    url = os.getenv(f"{env_prefix}_URL")
    if url:
        is_cloud = is_atlassian_cloud_url(url)
        # OAuth check (highest precedence, applies to Cloud)
        if all(
            [
                os.getenv("ATLASSIAN_OAUTH_CLIENT_ID"),
                os.getenv("ATLASSIAN_OAUTH_CLIENT_SECRET"),
                os.getenv("ATLASSIAN_OAUTH_REDIRECT_URI"),
                os.getenv("ATLASSIAN_OAUTH_SCOPE"),
                # CLOUD_ID is essential for OAuth client init
                os.getenv("ATLASSIAN_OAUTH_CLOUD_ID"),
            ]
        ):
            logger.info(
                f"Using {service_name} OAuth 2.0 (3LO) authentication "
                "(Cloud-only features)"
            )
            return True
        if all(
            [
                os.getenv("ATLASSIAN_OAUTH_ACCESS_TOKEN"),
                os.getenv("ATLASSIAN_OAUTH_CLOUD_ID"),
            ]
        ):
            logger.info(
                f"Using {service_name} OAuth 2.0 (3LO) authentication "
                "(Cloud-only features) with provided access token"
            )
            return True
        if is_cloud:  # Cloud non-OAuth
            if all(
                [
                    os.getenv(f"{env_prefix}_USERNAME"),
                    os.getenv(f"{env_prefix}_API_TOKEN"),
                ]
            ):
                logger.info(
                    f"Using {service_name} Cloud Basic Authentication (API Token)"
                )
                return True
        else:  # Server/Data Center non-OAuth
            if os.getenv(f"{env_prefix}_PERSONAL_TOKEN") or (
                os.getenv(f"{env_prefix}_USERNAME")
                and os.getenv(f"{env_prefix}_API_TOKEN")
            ):
                logger.info(
                    f"Using {service_name} Server/Data Center authentication "
                    "(PAT or Basic Auth)"
                )
                return True
    elif os.getenv("ATLASSIAN_OAUTH_ENABLE", "").lower() in ("true", "1", "yes"):
        # No URL configured, but minimal-OAuth mode is explicitly enabled.
        logger.info(
            f"Using {service_name} minimal OAuth configuration - "
            "expecting user-provided tokens via headers"
        )
        return True
    return False


def get_available_services() -> dict[str, bool | None]:
    """Determine which services are available based on environment variables.

    Returns:
        Mapping with keys "confluence" and "jira", each True when the
        corresponding service has a usable authentication configuration.
    """
    # The two services share identical detection logic; the helper keeps the
    # precedence rules in one place instead of two ~55-line duplicates.
    confluence_is_setup = _check_service_config("Confluence", "CONFLUENCE")
    jira_is_setup = _check_service_config("Jira", "JIRA")
    if not confluence_is_setup:
        logger.info(
            "Confluence is not configured or required environment variables are missing."
        )
    if not jira_is_setup:
        logger.info(
            "Jira is not configured or required environment variables are missing."
        )
    return {"confluence": confluence_is_setup, "jira": jira_is_setup}
```
--------------------------------------------------------------------------------
/tests/unit/confluence/test_v2_adapter.py:
--------------------------------------------------------------------------------
```python
"""Unit tests for ConfluenceV2Adapter class."""
from unittest.mock import MagicMock, Mock
import pytest
import requests
from requests.exceptions import HTTPError
from mcp_atlassian.confluence.v2_adapter import ConfluenceV2Adapter
class TestConfluenceV2Adapter:
    """Test cases for ConfluenceV2Adapter."""

    @pytest.fixture
    def mock_session(self):
        """Create a mock session."""
        return MagicMock(spec=requests.Session)

    @pytest.fixture
    def v2_adapter(self, mock_session):
        """Create a ConfluenceV2Adapter instance."""
        return ConfluenceV2Adapter(
            session=mock_session, base_url="https://example.atlassian.net/wiki"
        )

    def test_get_page_success(self, v2_adapter, mock_session):
        """Test successful page retrieval."""
        # Mock the v2 API response
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "id": "123456",
            "status": "current",
            "title": "Test Page",
            "spaceId": "789",
            "version": {"number": 5},
            "body": {
                "storage": {"value": "<p>Test content</p>", "representation": "storage"}
            },
            "_links": {"webui": "/pages/viewpage.action?pageId=123456"},
        }
        mock_session.get.return_value = mock_response
        # Mock space key lookup; setting side_effect supersedes the
        # return_value above, so the first GET yields the page and the
        # second yields the space payload.
        space_response = Mock()
        space_response.status_code = 200
        space_response.json.return_value = {"key": "TEST"}
        mock_session.get.side_effect = [mock_response, space_response]
        # Call the method
        result = v2_adapter.get_page("123456")
        # Verify the API call: one call for the page, one for the space key.
        assert mock_session.get.call_count == 2
        mock_session.get.assert_any_call(
            "https://example.atlassian.net/wiki/api/v2/pages/123456",
            params={"body-format": "storage"},
        )
        # Verify the response format is translated to the v1-style shape.
        assert result["id"] == "123456"
        assert result["type"] == "page"
        assert result["title"] == "Test Page"
        assert result["space"]["key"] == "TEST"
        assert result["space"]["id"] == "789"
        assert result["version"]["number"] == 5
        assert result["body"]["storage"]["value"] == "<p>Test content</p>"
        assert result["body"]["storage"]["representation"] == "storage"

    def test_get_page_not_found(self, v2_adapter, mock_session):
        """Test page retrieval when page doesn't exist."""
        # Mock a 404 response whose raise_for_status raises HTTPError.
        mock_response = Mock()
        mock_response.status_code = 404
        mock_response.text = "Page not found"
        mock_response.raise_for_status.side_effect = HTTPError(response=mock_response)
        mock_session.get.return_value = mock_response
        # Call the method and expect the adapter to wrap it in ValueError.
        with pytest.raises(ValueError, match="Failed to get page '999999'"):
            v2_adapter.get_page("999999")

    def test_get_page_with_minimal_response(self, v2_adapter, mock_session):
        """Test page retrieval with minimal v2 response."""
        # Mock the v2 API response without optional fields
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "id": "123456",
            "status": "current",
            "title": "Minimal Page",
        }
        mock_session.get.return_value = mock_response
        # Call the method
        result = v2_adapter.get_page("123456")
        # Verify the response handles missing fields gracefully
        assert result["id"] == "123456"
        assert result["type"] == "page"
        assert result["title"] == "Minimal Page"
        assert result["space"]["key"] == "unknown"  # Fallback when no spaceId
        assert result["version"]["number"] == 1  # Default version

    def test_get_page_network_error(self, v2_adapter, mock_session):
        """Test page retrieval with network error."""
        # Mock a network error
        mock_session.get.side_effect = requests.RequestException("Network error")
        # Call the method and expect the adapter to wrap it in ValueError.
        with pytest.raises(ValueError, match="Failed to get page '123456'"):
            v2_adapter.get_page("123456")

    def test_get_page_with_expand_parameter(self, v2_adapter, mock_session):
        """Test that expand parameter is accepted but not used."""
        # Mock the v2 API response
        mock_response = Mock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "id": "123456",
            "status": "current",
            "title": "Test Page",
        }
        mock_session.get.return_value = mock_response
        # Call with expand parameter (a v1-compat argument the v2 API ignores)
        result = v2_adapter.get_page("123456", expand="body.storage,version")
        # Verify the API call doesn't include expand in params
        mock_session.get.assert_called_once_with(
            "https://example.atlassian.net/wiki/api/v2/pages/123456",
            params={"body-format": "storage"},
        )
        # Verify we still get a result
        assert result["id"] == "123456"
```
--------------------------------------------------------------------------------
/tests/unit/test_main_transport_selection.py:
--------------------------------------------------------------------------------
```python
"""Unit tests for transport selection and execution.
These tests verify that:
1. All transports use direct execution (no stdin monitoring)
2. Transport selection logic works correctly (CLI vs environment)
3. Error handling is preserved
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from mcp_atlassian import main
class TestMainTransportSelection:
    """Test the main function's transport-specific execution logic."""

    @pytest.fixture
    def mock_server(self):
        """Create a mock server instance."""
        server = MagicMock()
        server.run_async = AsyncMock(return_value=None)
        return server

    @pytest.fixture
    def mock_asyncio_run(self):
        """Mock asyncio.run to capture what coroutine is executed."""
        with patch("asyncio.run") as mock_run:
            # Store the coroutine for inspection instead of executing it.
            mock_run.side_effect = lambda coro: setattr(mock_run, "_called_with", coro)
            yield mock_run

    @pytest.mark.parametrize("transport", ["stdio", "sse", "streamable-http"])
    def test_all_transports_use_direct_execution(
        self, mock_server, mock_asyncio_run, transport
    ):
        """Verify all transports use direct execution without stdin monitoring.

        This is a regression test for issues #519 and #524.
        """
        with patch("mcp_atlassian.servers.main.AtlassianMCP", return_value=mock_server):
            with patch.dict("os.environ", {"TRANSPORT": transport}):
                with patch("sys.argv", ["mcp-atlassian"]):
                    try:
                        main()
                    except SystemExit:
                        pass
        # Verify asyncio.run was called
        assert mock_asyncio_run.called
        # Get the coroutine info; repr inspection avoids awaiting it.
        called_coro = mock_asyncio_run._called_with
        coro_repr = repr(called_coro)
        # All transports must use direct execution
        assert "run_with_stdio_monitoring" not in coro_repr
        assert "run_async" in coro_repr or hasattr(called_coro, "cr_code")

    def test_cli_overrides_env_transport(self, mock_server, mock_asyncio_run):
        """Test that CLI transport argument overrides environment variable."""
        with patch("mcp_atlassian.servers.main.AtlassianMCP", return_value=mock_server):
            with patch.dict("os.environ", {"TRANSPORT": "sse"}):
                # Simulate CLI args with --transport stdio
                with patch("sys.argv", ["mcp-atlassian", "--transport", "stdio"]):
                    try:
                        main()
                    except SystemExit:
                        pass
        # All transports now use direct execution
        called_coro = mock_asyncio_run._called_with
        coro_repr = repr(called_coro)
        assert "run_async" in coro_repr or hasattr(called_coro, "cr_code")

    def test_signal_handlers_always_setup(self, mock_server):
        """Test that signal handlers are set up regardless of transport."""
        with patch("mcp_atlassian.servers.main.AtlassianMCP", return_value=mock_server):
            with patch("asyncio.run"):
                # Patch where it's imported in the main module
                with patch("mcp_atlassian.setup_signal_handlers") as mock_setup:
                    with patch.dict("os.environ", {"TRANSPORT": "stdio"}):
                        with patch("sys.argv", ["mcp-atlassian"]):
                            try:
                                main()
                            except SystemExit:
                                pass
                    # Signal handlers should always be set up
                    mock_setup.assert_called_once()

    def test_error_handling_preserved(self, mock_server):
        """Test that error handling works correctly for all transports."""
        # Make the server's run_async raise an exception when awaited
        error = RuntimeError("Server error")

        async def failing_run_async(**kwargs):
            raise error

        mock_server.run_async = failing_run_async
        with patch("mcp_atlassian.servers.main.AtlassianMCP", return_value=mock_server):
            with patch("asyncio.run") as mock_run:
                # Simulate the exception propagating through asyncio.run
                mock_run.side_effect = error
                with patch.dict("os.environ", {"TRANSPORT": "stdio"}):
                    with patch("sys.argv", ["mcp-atlassian"]):
                        # The main function logs the error and exits with code 1
                        with patch("sys.exit") as mock_exit:
                            main()
                            # Verify error was handled - sys.exit called with 1 for error
                            # and then with 0 in the finally block
                            assert mock_exit.call_count == 2
                            assert mock_exit.call_args_list[0][0][0] == 1  # Error exit
                            assert (
                                mock_exit.call_args_list[1][0][0] == 0
                            )  # Finally exit
```
--------------------------------------------------------------------------------
/tests/unit/servers/test_main_server.py:
--------------------------------------------------------------------------------
```python
"""Tests for the main MCP server implementation."""
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from starlette.requests import Request
from starlette.responses import JSONResponse
from mcp_atlassian.servers.main import UserTokenMiddleware, main_mcp
@pytest.mark.anyio
async def test_run_server_stdio():
    """Test that main_mcp.run_async is called with stdio transport.

    NOTE(review): run_async is patched and then invoked directly, so this
    only verifies the mock wiring, not real transport startup behavior.
    """
    with patch.object(main_mcp, "run_async") as mock_run_async:
        mock_run_async.return_value = None
        await main_mcp.run_async(transport="stdio")
        mock_run_async.assert_called_once_with(transport="stdio")
@pytest.mark.anyio
async def test_run_server_sse():
    """Test that main_mcp.run_async is called with sse transport and correct port.

    NOTE(review): run_async is patched and then invoked directly, so this
    only verifies the mock wiring, not real transport startup behavior.
    """
    with patch.object(main_mcp, "run_async") as mock_run_async:
        mock_run_async.return_value = None
        test_port = 9000
        await main_mcp.run_async(transport="sse", port=test_port)
        mock_run_async.assert_called_once_with(transport="sse", port=test_port)
@pytest.mark.anyio
async def test_run_server_streamable_http():
    """Test that main_mcp.run_async is called with streamable-http transport and correct parameters.

    NOTE(review): run_async is patched and then invoked directly, so this
    only verifies the mock wiring, not real transport startup behavior.
    """
    with patch.object(main_mcp, "run_async") as mock_run_async:
        mock_run_async.return_value = None
        test_port = 9001
        test_host = "127.0.0.1"
        test_path = "/custom_mcp"
        await main_mcp.run_async(
            transport="streamable-http", port=test_port, host=test_host, path=test_path
        )
        mock_run_async.assert_called_once_with(
            transport="streamable-http", port=test_port, host=test_host, path=test_path
        )
@pytest.mark.anyio
async def test_run_server_invalid_transport():
    """Test that run_server raises ValueError for invalid transport."""
    # We don't need to patch run_async here as the error occurs before it's called
    with pytest.raises(ValueError) as excinfo:
        await main_mcp.run_async(transport="invalid")  # type: ignore
    # The error message should identify both the problem and the bad value.
    assert "Unknown transport" in str(excinfo.value)
    assert "invalid" in str(excinfo.value)
@pytest.mark.anyio
async def test_health_check_endpoint():
    """Test the health check endpoint returns 200 and correct JSON response."""
    # Drive the ASGI app in-process; no real network socket is opened.
    app = main_mcp.sse_app()
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.get("/healthz")
        assert response.status_code == 200
        assert response.json() == {"status": "ok"}
@pytest.mark.anyio
async def test_sse_app_health_check_endpoint():
    """Test the /healthz endpoint on the SSE app returns 200 and correct JSON response.

    NOTE(review): this exercises the same app and endpoint as
    test_health_check_endpoint above — consider consolidating.
    """
    app = main_mcp.sse_app()
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.get("/healthz")
        assert response.status_code == 200
        assert response.json() == {"status": "ok"}
@pytest.mark.anyio
async def test_streamable_http_app_health_check_endpoint():
    """Test the /healthz endpoint on the Streamable HTTP app returns 200 and correct JSON response."""
    # Same check as the SSE variant, but against the streamable-http ASGI app.
    app = main_mcp.streamable_http_app()
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.get("/healthz")
        assert response.status_code == 200
        assert response.json() == {"status": "ok"}
class TestUserTokenMiddleware:
    """Tests for the UserTokenMiddleware class."""

    @pytest.fixture
    def middleware(self):
        """Create a UserTokenMiddleware instance for testing."""
        mock_app = AsyncMock()
        # Create a mock MCP server to avoid warnings
        mock_mcp_server = MagicMock()
        mock_mcp_server.settings.streamable_http_path = "/mcp"
        return UserTokenMiddleware(mock_app, mcp_server_ref=mock_mcp_server)

    @pytest.fixture
    def mock_request(self):
        """Create a mock request for testing."""
        request = MagicMock(spec=Request)
        request.url.path = "/mcp"
        request.method = "POST"
        request.headers = {}
        # Create a real state object that can be modified (MagicMock attributes
        # would swallow the middleware's writes silently).
        from types import SimpleNamespace

        request.state = SimpleNamespace()
        return request

    @pytest.fixture
    def mock_call_next(self):
        """Create a mock call_next function."""
        mock_response = JSONResponse({"test": "response"})
        call_next = AsyncMock(return_value=mock_response)
        return call_next

    @pytest.mark.anyio
    async def test_cloud_id_header_extraction_success(
        self, middleware, mock_request, mock_call_next
    ):
        """Test successful cloud ID header extraction."""
        # Setup request with cloud ID header
        mock_request.headers = {
            "Authorization": "Bearer test-token",
            "X-Atlassian-Cloud-Id": "test-cloud-id-123",
        }
        result = await middleware.dispatch(mock_request, mock_call_next)
        # Verify cloud ID was extracted and stored in request state
        assert hasattr(mock_request.state, "user_atlassian_cloud_id")
        assert mock_request.state.user_atlassian_cloud_id == "test-cloud-id-123"
        # Verify the request was processed normally
        mock_call_next.assert_called_once_with(mock_request)
        assert result is not None
```
--------------------------------------------------------------------------------
/tests/unit/confluence/test_custom_headers.py:
--------------------------------------------------------------------------------
```python
"""Tests for Confluence custom headers functionality."""
import os
from unittest.mock import MagicMock, patch
from mcp_atlassian.confluence.client import ConfluenceClient
from mcp_atlassian.confluence.config import ConfluenceConfig
class TestConfluenceConfigCustomHeaders:
    """Test ConfluenceConfig parsing of custom headers."""

    # Minimal credentials required for ConfluenceConfig.from_env() to succeed;
    # each scenario extends this with a CONFLUENCE_CUSTOM_HEADERS value.
    _BASE_ENV = {
        "CONFLUENCE_URL": "https://test.atlassian.net/wiki",
        "CONFLUENCE_USERNAME": "test_user",
        "CONFLUENCE_API_TOKEN": "test_token",
    }

    def test_no_custom_headers(self):
        """Config parses to an empty header map when the variable is absent."""
        with patch.dict(os.environ, dict(self._BASE_ENV), clear=True):
            config = ConfluenceConfig.from_env()
            assert config.custom_headers == {}

    def test_service_specific_headers_only(self):
        """Comma-separated key=value pairs are parsed into a dict."""
        env = dict(
            self._BASE_ENV,
            CONFLUENCE_CUSTOM_HEADERS="X-Confluence-Specific=confluence_value,X-Service=service_value",
        )
        with patch.dict(os.environ, env, clear=True):
            config = ConfluenceConfig.from_env()
            assert config.custom_headers == {
                "X-Confluence-Specific": "confluence_value",
                "X-Service": "service_value",
            }

    def test_malformed_headers_are_ignored(self):
        """Entries without '=' are dropped; valid pairs survive."""
        env = dict(
            self._BASE_ENV,
            CONFLUENCE_CUSTOM_HEADERS="malformed-header,X-Valid=valid_value,another-malformed",
        )
        with patch.dict(os.environ, env, clear=True):
            config = ConfluenceConfig.from_env()
            assert config.custom_headers == {"X-Valid": "valid_value"}

    def test_empty_header_strings(self):
        """A whitespace-only value yields no headers."""
        env = dict(self._BASE_ENV, CONFLUENCE_CUSTOM_HEADERS=" ")
        with patch.dict(os.environ, env, clear=True):
            config = ConfluenceConfig.from_env()
            assert config.custom_headers == {}
class TestConfluenceClientCustomHeaders:
    """Test ConfluenceClient custom headers application."""

    @staticmethod
    def _mock_confluence_dependencies(monkeypatch):
        """Patch Confluence, SSL configuration, and the preprocessor.

        Both tests previously duplicated this mock setup verbatim; it is
        factored out so the mocking stays consistent between them.

        Returns:
            The mocked session whose ``headers`` dict the client writes into.
        """
        mock_confluence = MagicMock()
        mock_session = MagicMock()
        mock_session.headers = {}
        mock_confluence._session = mock_session
        monkeypatch.setattr(
            "mcp_atlassian.confluence.client.Confluence",
            lambda **kwargs: mock_confluence,
        )
        monkeypatch.setattr(
            "mcp_atlassian.confluence.client.configure_ssl_verification",
            lambda **kwargs: None,
        )
        monkeypatch.setattr(
            "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor",
            lambda **kwargs: MagicMock(),
        )
        return mock_session

    def test_no_custom_headers_applied(self, monkeypatch):
        """Test that no headers are applied when none are configured."""
        mock_session = self._mock_confluence_dependencies(monkeypatch)
        config = ConfluenceConfig(
            url="https://test.atlassian.net/wiki",
            auth_type="basic",
            username="test_user",
            api_token="test_token",
            custom_headers={},
        )
        ConfluenceClient(config=config)
        # Verify no custom headers were applied
        assert mock_session.headers == {}

    def test_custom_headers_applied_to_session(self, monkeypatch):
        """Test that custom headers are applied to the Confluence session."""
        mock_session = self._mock_confluence_dependencies(monkeypatch)
        custom_headers = {
            "X-Corp-Auth": "token123",
            "X-Dept": "engineering",
            "User-Agent": "CustomConfluenceClient/1.0",
        }
        config = ConfluenceConfig(
            url="https://test.atlassian.net/wiki",
            auth_type="basic",
            username="test_user",
            api_token="test_token",
            custom_headers=custom_headers,
        )
        ConfluenceClient(config=config)
        # Verify every configured header reached the underlying session
        for header_name, header_value in custom_headers.items():
            assert mock_session.headers[header_name] == header_value
```
--------------------------------------------------------------------------------
/scripts/test_with_real_data.sh:
--------------------------------------------------------------------------------
```bash
#!/bin/bash
# Unified script for testing with real Atlassian data
# Supports testing models, API, or both

# Default settings
TEST_TYPE="all" # Can be "all", "models", or "api"
VERBOSITY="-v" # Verbosity level (pytest flag; "" = quiet, "-vv" = verbose)
RUN_WRITE_TESTS=false
FILTER="" # Test filter using pytest's -k option (kept pre-quoted for eval in run_api_tests)

# Parse command line arguments
while [[ $# -gt 0 ]]; do
    case $1 in
        --models-only)
            TEST_TYPE="models"
            shift
            ;;
        --api-only)
            TEST_TYPE="api"
            shift
            ;;
        --all)
            TEST_TYPE="all"
            shift
            ;;
        --quiet)
            VERBOSITY=""
            shift
            ;;
        --verbose)
            VERBOSITY="-vv"
            shift
            ;;
        --with-write-tests)
            RUN_WRITE_TESTS=true
            shift
            ;;
        -k)
            # The pattern is embedded with escaped quotes so the later
            # `eval "uv run pytest ... $FILTER"` re-parses it as one argument.
            FILTER="-k \"$2\""
            shift
            shift
            ;;
        --help)
            echo "Usage: $0 [options]"
            echo "Options:"
            echo "  --models-only        Test only Pydantic models"
            echo "  --api-only           Test only API integration"
            echo "  --all                Test both models and API (default)"
            echo "  --quiet              Minimal output"
            echo "  --verbose            More detailed output"
            echo "  --with-write-tests   Include tests that modify data (including TextContent validation)"
            echo "  -k \"PATTERN\"         Only run tests matching the given pattern (uses pytest's -k option)"
            echo "  --help               Show this help message"
            exit 0
            ;;
        *)
            echo "Unknown option: $1"
            echo "Use --help for usage information"
            exit 1
            ;;
    esac
done

# Check if .env file exists
if [ ! -f ".env" ]; then
    echo "Warning: .env file not found. Tests will be skipped if environment variables are not set."
else
    # Load environment variables from .env
    source .env
fi

# Set environment variable to enable real data testing
export USE_REAL_DATA=true

# Set specific test IDs for API validation tests
# These will be used if they're set, otherwise tests will be skipped
export JIRA_TEST_ISSUE_KEY="${JIRA_TEST_ISSUE_KEY:-}"
export JIRA_TEST_EPIC_KEY="${JIRA_TEST_EPIC_KEY:-}"
export CONFLUENCE_TEST_PAGE_ID="${CONFLUENCE_TEST_PAGE_ID:-}"
export JIRA_TEST_PROJECT_KEY="${JIRA_TEST_PROJECT_KEY:-}"
export CONFLUENCE_TEST_SPACE_KEY="${CONFLUENCE_TEST_SPACE_KEY:-}"

# Check required environment variables and warn if any are missing
required_vars=(
    "JIRA_URL"
    "JIRA_USERNAME"
    "JIRA_API_TOKEN"
    "CONFLUENCE_URL"
    "CONFLUENCE_USERNAME"
    "CONFLUENCE_API_TOKEN"
)
missing_vars=0
for var in "${required_vars[@]}"; do
    # ${!var} is indirect expansion: the value of the variable named by $var
    if [ -z "${!var}" ]; then
        echo "Warning: Environment variable $var is not set. Some tests will be skipped."
        missing_vars=$((missing_vars+1))
    fi
done
if [ $missing_vars -gt 0 ]; then
    echo "Found $missing_vars missing required variables. Tests requiring these variables will be skipped."
    echo "You can set these in your .env file to run all tests."
fi
# Run the Pydantic model test suites against real Atlassian data.
run_model_tests() {
    echo "Running Pydantic model tests with real data..."
    local suite
    for suite in \
        "Base Model Tests|tests/unit/models/test_base_models.py" \
        "Jira Model Tests|tests/unit/models/test_jira_models.py::TestRealJiraData" \
        "Confluence Model Tests|tests/unit/models/test_confluence_models.py::TestRealConfluenceData"; do
        echo ""
        echo "===== ${suite%%|*} ====="
        # VERBOSITY stays unquoted on purpose: empty means "no extra flag"
        uv run pytest "${suite#*|}" $VERBOSITY
    done
}
# Run the real-API validation tests (read-only, plus write tests on request).
run_api_tests() {
    echo ""
    echo "===== API Read-Only Tests ====="
    # A user-supplied -k filter overrides the curated test lists below
    if [[ -n "$FILTER" ]]; then
        echo "Running tests with filter: $FILTER"
        eval "uv run pytest tests/test_real_api_validation.py $VERBOSITY $FILTER"
        return
    fi

    local suite=tests/test_real_api_validation.py

    # Read-only tests are always safe to run
    local read_only_tests=(
        "$suite::test_jira_get_issue"
        "$suite::test_jira_get_issue_with_fields"
        "$suite::test_jira_get_epic_issues"
        "$suite::test_confluence_get_page_content"
    )
    uv run pytest "${read_only_tests[@]}" $VERBOSITY

    if [[ "$RUN_WRITE_TESTS" == "true" ]]; then
        echo ""
        echo "===== API Write Operation Tests ====="
        echo "WARNING: These tests will create and modify data in your Atlassian instance."
        echo "Press Ctrl+C now to cancel, or wait 5 seconds to continue..."
        sleep 5
        local write_tests=(
            "$suite::test_jira_create_issue"
            "$suite::test_jira_create_subtask"
            "$suite::test_jira_create_task_with_parent"
            "$suite::test_jira_add_comment"
            "$suite::test_confluence_create_page"
            "$suite::test_confluence_update_page"
            "$suite::test_jira_create_epic"
            "$suite::test_jira_create_epic_two_step"
        )
        uv run pytest "${write_tests[@]}" $VERBOSITY

        # The transition test is normally skipped; run it explicitly here
        echo ""
        echo "===== API Advanced Write Tests ====="
        echo "Running tests for status transitions"
        uv run pytest "$suite::test_jira_transition_issue" -v -k "test_jira_transition_issue"
    fi
}
# Run the appropriate tests based on the selected type
# (TEST_TYPE is validated during argument parsing, so no default branch is needed)
case $TEST_TYPE in
    "models")
        run_model_tests
        ;;
    "api")
        run_api_tests
        ;;
    "all")
        run_model_tests
        run_api_tests
        ;;
esac

echo ""
echo "Testing completed. Check the output for any failures or skipped tests."
```
--------------------------------------------------------------------------------
/tests/unit/utils/test_lifecycle.py:
--------------------------------------------------------------------------------
```python
"""Tests for lifecycle management utilities."""
import signal
from unittest.mock import patch
from mcp_atlassian.utils.lifecycle import (
_shutdown_event,
ensure_clean_exit,
setup_signal_handlers,
)
class TestSetupSignalHandlers:
    """Test signal handler setup functionality."""

    @patch("signal.signal")
    def test_setup_signal_handlers_all_platforms(self, mock_signal):
        """Test that signal handlers are registered for all platforms."""
        # Every signal.signal() call succeeds (SIGPIPE treated as available)
        mock_signal.side_effect = None
        setup_signal_handlers()
        # SIGTERM and SIGINT handlers must always be registered
        assert any(call[0][0] == signal.SIGTERM for call in mock_signal.call_args_list)
        assert any(call[0][0] == signal.SIGINT for call in mock_signal.call_args_list)
        # Every registered handler must be callable
        for call in mock_signal.call_args_list:
            assert callable(call[0][1])

    @patch("signal.signal")
    def test_setup_signal_handlers_no_sigpipe(self, mock_signal):
        """Test signal handler setup when SIGPIPE is not available (Windows)."""
        # Resolve SIGPIPE defensively: signal.SIGPIPE does not exist on
        # Windows, so referencing it directly would make this test itself
        # crash with AttributeError on the very platform it simulates.
        sigpipe = getattr(signal, "SIGPIPE", None)

        def side_effect(sig, handler):
            if sigpipe is not None and sig == sigpipe:
                raise AttributeError("SIGPIPE not available")
            return None

        mock_signal.side_effect = side_effect
        # This should not raise an exception
        setup_signal_handlers()
        # SIGTERM and SIGINT should still be registered
        assert any(call[0][0] == signal.SIGTERM for call in mock_signal.call_args_list)
        assert any(call[0][0] == signal.SIGINT for call in mock_signal.call_args_list)

    @patch("signal.signal")
    def test_signal_handler_function(self, mock_signal):
        """Test that the signal handler function works correctly."""
        handler = None

        # Capture the handler function registered for SIGTERM
        def capture_handler(sig, func):
            nonlocal handler
            if sig == signal.SIGTERM:
                handler = func

        mock_signal.side_effect = capture_handler
        # Clear the shutdown event so state cannot leak in from other tests
        _shutdown_event.clear()
        setup_signal_handlers()
        # Invoke the captured SIGTERM handler directly
        assert handler is not None
        handler(signal.SIGTERM, None)
        # The handler must set the shutdown event instead of calling sys.exit
        assert _shutdown_event.is_set()
class TestEnsureCleanExit:
    """Test the clean exit functionality."""

    @staticmethod
    def _mark_streams(mock_stdout, mock_stderr, *, stdout_closed, stderr_closed):
        """Set the mocked streams' ``closed`` flags for a test scenario."""
        mock_stdout.closed = stdout_closed
        mock_stderr.closed = stderr_closed

    @patch("sys.stderr")
    @patch("sys.stdout")
    def test_ensure_clean_exit(self, mock_stdout, mock_stderr):
        """Test that output streams are flushed on exit."""
        self._mark_streams(
            mock_stdout, mock_stderr, stdout_closed=False, stderr_closed=False
        )
        ensure_clean_exit()
        # Both open streams must be flushed exactly once
        mock_stdout.flush.assert_called_once()
        mock_stderr.flush.assert_called_once()

    @patch("sys.stderr")
    @patch("sys.stdout")
    def test_ensure_clean_exit_closed_stdout(self, mock_stdout, mock_stderr):
        """Test that closed stdout is handled gracefully."""
        self._mark_streams(
            mock_stdout, mock_stderr, stdout_closed=True, stderr_closed=False
        )
        ensure_clean_exit()
        # The closed stream is skipped; the open one is still flushed
        mock_stdout.flush.assert_not_called()
        mock_stderr.flush.assert_called_once()

    @patch("sys.stderr")
    @patch("sys.stdout")
    def test_ensure_clean_exit_closed_stderr(self, mock_stdout, mock_stderr):
        """Test that closed stderr is handled gracefully."""
        self._mark_streams(
            mock_stdout, mock_stderr, stdout_closed=False, stderr_closed=True
        )
        ensure_clean_exit()
        # The open stream is flushed; the closed one is skipped
        mock_stdout.flush.assert_called_once()
        mock_stderr.flush.assert_not_called()

    @patch("sys.stderr")
    @patch("sys.stdout")
    def test_ensure_clean_exit_both_closed(self, mock_stdout, mock_stderr):
        """Test that both streams being closed is handled gracefully."""
        self._mark_streams(
            mock_stdout, mock_stderr, stdout_closed=True, stderr_closed=True
        )
        ensure_clean_exit()
        # Neither closed stream is flushed
        mock_stdout.flush.assert_not_called()
        mock_stderr.flush.assert_not_called()

    @patch("sys.stderr")
    @patch("sys.stdout")
    def test_ensure_clean_exit_flush_raises_value_error(self, mock_stdout, mock_stderr):
        """Test that ValueError during flush is handled gracefully."""
        self._mark_streams(
            mock_stdout, mock_stderr, stdout_closed=False, stderr_closed=False
        )
        mock_stdout.flush.side_effect = ValueError("I/O operation on closed file")
        mock_stderr.flush.side_effect = ValueError("I/O operation on closed file")
        # The errors are swallowed; no exception escapes
        ensure_clean_exit()
        # Both streams still had flush attempted
        mock_stdout.flush.assert_called_once()
        mock_stderr.flush.assert_called_once()

    @patch("sys.stderr")
    @patch("sys.stdout")
    def test_ensure_clean_exit_no_closed_attribute(self, mock_stdout, mock_stderr):
        """Test handling of streams without 'closed' attribute."""
        # Simulate non-standard streams lacking the `closed` attribute
        for stream in (mock_stdout, mock_stderr):
            if hasattr(stream, "closed"):
                delattr(stream, "closed")
        # Should not raise an exception
        ensure_clean_exit()
        # Without a `closed` attribute the streams are left untouched
        mock_stdout.flush.assert_not_called()
        mock_stderr.flush.assert_not_called()
```
--------------------------------------------------------------------------------
/tests/integration/test_stdin_monitoring_fix.py:
--------------------------------------------------------------------------------
```python
"""Simple integration test to verify stdin monitoring fix for streamable-http transport.
This test verifies that the fix in PR #522 correctly disables stdin monitoring
for HTTP transports (SSE and streamable-http) to prevent hanging issues.
"""
import os
import subprocess
import sys
import tempfile
from pathlib import Path
import pytest
@pytest.mark.integration
class TestStdinMonitoringFix:
    """Test that stdin monitoring is correctly disabled for HTTP transports."""

    def test_streamable_http_starts_without_hanging(self):
        """Test that streamable-http transport starts without stdin monitoring issues.

        This test creates a minimal script that would hang if stdin monitoring
        was enabled for HTTP transports, and verifies it runs successfully.
        """
        # Create a test script that simulates the issue
        # (delete=False so the path survives the `with` block; removed in `finally`)
        with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
            f.write("""
import sys
import os
# The actual test: if stdin monitoring was incorrectly enabled for HTTP,
# closing stdin would cause issues. With the fix, it should work fine.
if __name__ == "__main__":
    # This simulates the scenario where stdin is closed (like in the bug report)
    # With the fix, HTTP transports won't monitor stdin, so this won't cause issues
    sys.stdin.close()
    # If we get here without hanging, the fix is working
    print("TEST_PASSED: No hanging with closed stdin")
    sys.exit(0)
""")
            test_script = f.name
        try:
            # Run the test script in a subprocess
            result = subprocess.run(
                [sys.executable, test_script],
                capture_output=True,
                text=True,
                timeout=5,  # Should complete quickly, timeout means hanging
            )
            # Check the output
            assert "TEST_PASSED" in result.stdout, (
                f"Test failed. Output: {result.stdout}, Error: {result.stderr}"
            )
            assert result.returncode == 0, (
                f"Script failed with code {result.returncode}"
            )
        except subprocess.TimeoutExpired:
            pytest.fail(
                "Script timed out - stdin monitoring may still be active for HTTP transports"
            )
        finally:
            # Clean up the temporary script
            os.unlink(test_script)

    def test_code_structure_validates_fix(self):
        """Validate that the code structure implements the fix correctly.

        This checks that the main entry point has the correct logic to disable
        stdin monitoring for HTTP transports.
        """
        # Read the main module source directly (repo-relative path from this test file)
        main_file = (
            Path(__file__).parent.parent.parent
            / "src"
            / "mcp_atlassian"
            / "__init__.py"
        )
        with open(main_file) as f:
            source = f.read()
        # Check for the key parts of the fix
        # 1. Different handling for stdio vs HTTP transports
        assert 'if final_transport == "stdio":' in source
        # 2. Comments explaining the fix (verbatim-match; these act as anchors)
        assert (
            "# For stdio transport, don't monitor stdin as MCP server handles it internally"
            in source
        )
        assert (
            "# This prevents race conditions where both try to read from the same stdin"
            in source
        )
        assert (
            "# For HTTP transports (SSE, streamable-http), don't use stdin monitoring"
            in source
        )
        assert (
            "# as it causes premature shutdown when the client closes stdin" in source
        )
        assert "# The server should only rely on OS signals for shutdown" in source
        # 3. Proper conditional logic - look for the actual asyncio.run calls
        # There should be two separate sections handling stdio vs HTTP
        stdio_section = False
        http_section = False
        lines = source.split("\n")
        for i, line in enumerate(lines):
            # Look for the stdio handling
            if "# For stdio transport," in line and "monitor stdin" in line:
                # Next few lines should have the stdio-specific handling
                next_lines = "\n".join(lines[i : i + 5])
                if (
                    'if final_transport == "stdio":' in next_lines
                    and "asyncio.run" in next_lines
                ):
                    stdio_section = True
            # Look for the HTTP handling
            if "# For HTTP transports" in line and "stdin monitoring" in line:
                # Next few lines should have the HTTP-specific handling
                next_lines = "\n".join(lines[i : i + 10])
                if (
                    "without stdin monitoring" in next_lines
                    and "asyncio.run" in next_lines
                ):
                    http_section = True
        assert stdio_section, "Could not find proper stdio transport handling"
        assert http_section, "Could not find proper HTTP transport handling"
        print("Code structure validation passed - fix is properly implemented")

    def test_lifecycle_module_supports_http_transports(self):
        """Test that the lifecycle module properly handles HTTP transports.

        This verifies that the lifecycle management doesn't interfere with
        HTTP transport operation.
        """
        from mcp_atlassian.utils.lifecycle import (
            ensure_clean_exit,
            setup_signal_handlers,
        )
        # These should work without issues for HTTP transports
        try:
            setup_signal_handlers()
            ensure_clean_exit()
            print("Lifecycle module works correctly for HTTP transports")
        except Exception as e:
            pytest.fail(f"Lifecycle module failed: {e}")
```
--------------------------------------------------------------------------------
/tests/unit/confluence/test_spaces.py:
--------------------------------------------------------------------------------
```python
"""Unit tests for the SpacesMixin class."""
from unittest.mock import patch
import pytest
import requests
from fixtures.confluence_mocks import MOCK_SPACES_RESPONSE
from mcp_atlassian.confluence.spaces import SpacesMixin
class TestSpacesMixin:
    """Tests for the SpacesMixin class."""

    @pytest.fixture
    def spaces_mixin(self, confluence_client):
        """Create a SpacesMixin instance for testing."""
        # SpacesMixin inherits from ConfluenceClient, so we need to create it properly:
        # patch __init__ to a no-op, then graft the already-mocked collaborators on.
        with patch(
            "mcp_atlassian.confluence.spaces.ConfluenceClient.__init__"
        ) as mock_init:
            mock_init.return_value = None
            mixin = SpacesMixin()
            # Copy the necessary attributes from our mocked client
            mixin.confluence = confluence_client.confluence
            mixin.config = confluence_client.config
            mixin.preprocessor = confluence_client.preprocessor
            return mixin

    def test_get_spaces(self, spaces_mixin):
        """Test that get_spaces returns spaces from the Confluence client."""
        # Act
        result = spaces_mixin.get_spaces(start=10, limit=20)
        # Assert - pagination args are forwarded verbatim to the API client
        spaces_mixin.confluence.get_all_spaces.assert_called_once_with(
            start=10, limit=20
        )
        assert result == MOCK_SPACES_RESPONSE

    def test_get_user_contributed_spaces_success(self, spaces_mixin):
        """Test getting spaces that the user has contributed to."""
        # Arrange
        mock_result = {
            "results": [
                {
                    "content": {"_expandable": {"space": "/rest/api/space/TEST"}},
                    "resultGlobalContainer": {
                        "title": "Test Space",
                        "displayUrl": "/spaces/TEST",
                    },
                }
            ]
        }
        spaces_mixin.confluence.cql.return_value = mock_result
        # Act
        result = spaces_mixin.get_user_contributed_spaces(limit=100)
        # Assert
        spaces_mixin.confluence.cql.assert_called_once_with(
            cql="contributor = currentUser() order by lastmodified DESC", limit=100
        )
        assert result == {"TEST": {"key": "TEST", "name": "Test Space"}}

    def test_get_user_contributed_spaces_extraction_methods(self, spaces_mixin):
        """Test that the method extracts space keys from different result structures."""
        # Arrange - Test different extraction methods
        # (the mixin tries several locations for the space key, in order)
        mock_results = {
            "results": [
                # Case 1: Extract from resultGlobalContainer.displayUrl
                {
                    "resultGlobalContainer": {
                        "title": "Space 1",
                        "displayUrl": "/spaces/SPACE1/pages",
                    }
                },
                # Case 2: Extract from content._expandable.space
                {
                    "content": {"_expandable": {"space": "/rest/api/space/SPACE2"}},
                    "resultGlobalContainer": {"title": "Space 2"},
                },
                # Case 3: Extract from url
                {
                    "url": "/spaces/SPACE3/pages/12345",
                    "resultGlobalContainer": {"title": "Space 3"},
                },
            ]
        }
        spaces_mixin.confluence.cql.return_value = mock_results
        # Act
        result = spaces_mixin.get_user_contributed_spaces()
        # Assert - every extraction path yields its space key and title
        assert "SPACE1" in result
        assert result["SPACE1"]["name"] == "Space 1"
        assert "SPACE2" in result
        assert result["SPACE2"]["name"] == "Space 2"
        assert "SPACE3" in result
        assert result["SPACE3"]["name"] == "Space 3"

    def test_get_user_contributed_spaces_with_duplicate_spaces(self, spaces_mixin):
        """Test that duplicate spaces are deduplicated."""
        # Arrange
        mock_results = {
            "results": [
                # Same space key appears multiple times
                {
                    "resultGlobalContainer": {
                        "title": "Space 1",
                        "displayUrl": "/spaces/SPACE1",
                    }
                },
                {
                    "resultGlobalContainer": {
                        "title": "Space 1",
                        "displayUrl": "/spaces/SPACE1",
                    }
                },
                {"content": {"_expandable": {"space": "/rest/api/space/SPACE1"}}},
            ]
        }
        spaces_mixin.confluence.cql.return_value = mock_results
        # Act
        result = spaces_mixin.get_user_contributed_spaces()
        # Assert - three raw results collapse into one keyed entry
        assert len(result) == 1
        assert "SPACE1" in result
        assert result["SPACE1"]["name"] == "Space 1"

    def test_get_user_contributed_spaces_api_error(self, spaces_mixin):
        """Test handling of API errors."""
        # Arrange
        spaces_mixin.confluence.cql.side_effect = requests.RequestException("API Error")
        # Act
        result = spaces_mixin.get_user_contributed_spaces()
        # Assert - network failures degrade to an empty mapping, not an exception
        assert result == {}

    def test_get_user_contributed_spaces_key_error(self, spaces_mixin):
        """Test handling of KeyError when parsing results."""
        # Arrange - response lacks the expected "results" key
        spaces_mixin.confluence.cql.return_value = {"invalid_key": []}
        # Act
        result = spaces_mixin.get_user_contributed_spaces()
        # Assert
        assert result == {}

    def test_get_user_contributed_spaces_type_error(self, spaces_mixin):
        """Test handling of TypeError when processing results."""
        # Arrange
        spaces_mixin.confluence.cql.return_value = (
            None  # Will cause TypeError when iterating
        )
        # Act
        result = spaces_mixin.get_user_contributed_spaces()
        # Assert
        assert result == {}
```
--------------------------------------------------------------------------------
/tests/unit/confluence/test_config.py:
--------------------------------------------------------------------------------
```python
"""Unit tests for the ConfluenceConfig class."""
import os
from unittest.mock import patch
import pytest
from mcp_atlassian.confluence.config import ConfluenceConfig
def test_from_env_success():
    """Test that from_env successfully creates a config from environment variables."""
    env = {
        "CONFLUENCE_URL": "https://test.atlassian.net/wiki",
        "CONFLUENCE_USERNAME": "test_username",
        "CONFLUENCE_API_TOKEN": "test_token",
    }
    # Start from a clean environment so unrelated variables cannot interfere
    with patch.dict("os.environ", env, clear=True):
        config = ConfluenceConfig.from_env()
        assert config.url == "https://test.atlassian.net/wiki"
        assert config.username == "test_username"
        assert config.api_token == "test_token"
def test_from_env_missing_url():
    """Test that from_env raises ValueError when URL is missing."""
    # Use patch.dict with clear=True instead of manually saving, clearing,
    # and restoring os.environ: it guarantees restoration via the context
    # manager even if the assertion fails, and matches the style of every
    # other test in this module.
    with patch.dict(os.environ, {}, clear=True):
        with pytest.raises(
            ValueError, match="Missing required CONFLUENCE_URL environment variable"
        ):
            ConfluenceConfig.from_env()
def test_from_env_missing_cloud_auth():
    """Test that from_env raises ValueError when cloud auth credentials are missing."""
    # A *.atlassian.net URL is detected as Cloud, which requires basic auth
    cloud_only_env = {"CONFLUENCE_URL": "https://test.atlassian.net"}
    with patch.dict(os.environ, cloud_only_env, clear=True):
        with pytest.raises(
            ValueError,
            match="Cloud authentication requires CONFLUENCE_USERNAME and CONFLUENCE_API_TOKEN",
        ):
            ConfluenceConfig.from_env()
def test_from_env_missing_server_auth():
    """Test that from_env raises ValueError when server auth credentials are missing."""
    # A non-atlassian.net URL is detected as Server/Data Center, requiring a PAT
    server_only_env = {"CONFLUENCE_URL": "https://confluence.example.com"}
    with patch.dict(os.environ, server_only_env, clear=True):
        with pytest.raises(
            ValueError,
            match="Server/Data Center authentication requires CONFLUENCE_PERSONAL_TOKEN",
        ):
            ConfluenceConfig.from_env()
def test_is_cloud():
    """Test that is_cloud property returns correct value."""
    # An *.atlassian.net URL with basic auth is Cloud
    cloud_config = ConfluenceConfig(
        url="https://example.atlassian.net/wiki",
        auth_type="basic",
        username="test",
        api_token="test",
    )
    assert cloud_config.is_cloud is True

    # Custom-domain and localhost URLs with PAT auth are Server/Data Center
    for server_url in (
        "https://confluence.example.com",
        "http://localhost:8090",
        "http://127.0.0.1:8090",
    ):
        server_config = ConfluenceConfig(
            url=server_url,
            auth_type="pat",
            personal_token="test",
        )
        assert server_config.is_cloud is False
def test_from_env_proxy_settings():
    """Test that from_env correctly loads proxy environment variables."""
    base_env = {
        "CONFLUENCE_URL": "https://test.atlassian.net/wiki",
        "CONFLUENCE_USERNAME": "test_username",
        "CONFLUENCE_API_TOKEN": "test_token",
    }

    # Global proxy variables apply when no service-specific ones are set
    global_proxies = {
        "HTTP_PROXY": "http://proxy.example.com:8080",
        "HTTPS_PROXY": "https://proxy.example.com:8443",
        "SOCKS_PROXY": "socks5://user:[email protected]:1080",
        "NO_PROXY": "localhost,127.0.0.1",
    }
    with patch.dict(os.environ, {**base_env, **global_proxies}, clear=True):
        config = ConfluenceConfig.from_env()
        assert config.http_proxy == "http://proxy.example.com:8080"
        assert config.https_proxy == "https://proxy.example.com:8443"
        assert config.socks_proxy == "socks5://user:[email protected]:1080"
        assert config.no_proxy == "localhost,127.0.0.1"

    # CONFLUENCE_-prefixed variables act as service-specific overrides
    service_proxies = {
        "CONFLUENCE_HTTP_PROXY": "http://confluence-proxy.example.com:8080",
        "CONFLUENCE_HTTPS_PROXY": "https://confluence-proxy.example.com:8443",
        "CONFLUENCE_SOCKS_PROXY": "socks5://user:[email protected]:1080",
        "CONFLUENCE_NO_PROXY": "localhost,127.0.0.1,.internal.example.com",
    }
    with patch.dict(os.environ, {**base_env, **service_proxies}, clear=True):
        config = ConfluenceConfig.from_env()
        assert config.http_proxy == "http://confluence-proxy.example.com:8080"
        assert config.https_proxy == "https://confluence-proxy.example.com:8443"
        assert (
            config.socks_proxy == "socks5://user:[email protected]:1080"
        )
        assert config.no_proxy == "localhost,127.0.0.1,.internal.example.com"
def test_is_cloud_oauth_with_cloud_id():
    """Test that is_cloud returns True for OAuth with cloud_id regardless of URL."""
    from mcp_atlassian.utils.oauth import BYOAccessTokenOAuthConfig

    oauth_config = BYOAccessTokenOAuthConfig(
        cloud_id="test-cloud-id", access_token="test-token"
    )

    # With a cloud_id present the deployment is Cloud no matter what the URL
    # says: None (Multi-Cloud OAuth mode) and a server-like URL both qualify.
    for url in (None, "https://confluence.example.com"):
        config = ConfluenceConfig(
            url=url,
            auth_type="oauth",
            oauth_config=oauth_config,
        )
        assert config.is_cloud is True
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/comments.py:
--------------------------------------------------------------------------------
```python
"""Module for Confluence comment operations."""
import logging
import requests
from ..models.confluence import ConfluenceComment
from .client import ConfluenceClient
logger = logging.getLogger("mcp-atlassian")
class CommentsMixin(ConfluenceClient):
    """Mixin for Confluence comment operations."""

    def get_page_comments(
        self, page_id: str, *, return_markdown: bool = True
    ) -> list[ConfluenceComment]:
        """
        Get all comments for a specific page.

        Args:
            page_id: The ID of the page to get comments from
            return_markdown: When True, returns content in markdown format,
                otherwise returns raw HTML (keyword-only)

        Returns:
            List of ConfluenceComment models containing comment content and
            metadata. An empty list is returned on any error (best-effort).
        """
        try:
            # Get page info to extract space details (the preprocessor needs
            # the space key to resolve relative links)
            page = self.confluence.get_page_by_id(page_id=page_id, expand="space")
            space_key = page.get("space", {}).get("key", "")

            # Get comments with expanded content
            comments_response = self.confluence.get_page_comments(
                content_id=page_id, expand="body.view.value,version", depth="all"
            )

            return [
                self._build_comment_model(
                    comment_data, space_key, return_markdown=return_markdown
                )
                for comment_data in comments_response.get("results", [])
            ]
        except KeyError as e:
            logger.error(f"Missing key in comment data: {str(e)}")
            return []
        except requests.RequestException as e:
            logger.error(f"Network error when fetching comments: {str(e)}")
            return []
        except (ValueError, TypeError) as e:
            logger.error(f"Error processing comment data: {str(e)}")
            return []
        except Exception as e:  # noqa: BLE001 - Intentional fallback with full logging
            logger.error(f"Unexpected error fetching comments: {str(e)}")
            logger.debug("Full exception details for comments:", exc_info=True)
            return []

    def _build_comment_model(
        self, comment_data: dict, space_key: str, *, return_markdown: bool
    ) -> ConfluenceComment:
        """Convert one raw API comment payload into a ConfluenceComment model.

        Raises KeyError if the expanded body is missing; the caller's handler
        turns that into an empty result list.
        """
        body = comment_data["body"]["view"]["value"]
        processed_html, processed_markdown = self.preprocessor.process_html_content(
            body, space_key=space_key, confluence_client=self.confluence
        )
        # Rebuild the nested structure instead of using comment_data.copy():
        # a shallow copy shares the nested "body"/"view" dicts, so assigning
        # into them would mutate the original API response in place.
        modified_comment_data = {
            **comment_data,
            "body": {
                **comment_data["body"],
                "view": {
                    **comment_data["body"].get("view", {}),
                    # Set the appropriate content based on the return format
                    "value": processed_markdown if return_markdown else processed_html,
                },
            },
        }
        return ConfluenceComment.from_api_response(
            modified_comment_data,
            base_url=self.config.url,
        )

    def add_comment(self, page_id: str, content: str) -> ConfluenceComment | None:
        """
        Add a comment to a Confluence page.

        Args:
            page_id: The ID of the page to add the comment to
            content: The content of the comment (in Confluence storage format)

        Returns:
            ConfluenceComment object if comment was added successfully, None otherwise
        """
        try:
            # Get page info to extract space details
            page = self.confluence.get_page_by_id(page_id=page_id, expand="space")
            space_key = page.get("space", {}).get("key", "")

            # Convert markdown to Confluence storage format if needed.
            # The atlassian-python-api expects content in Confluence storage
            # format; content not starting with "<" is assumed to be markdown.
            if not content.strip().startswith("<"):
                content = self.preprocessor.markdown_to_confluence_storage(content)

            # Add the comment via the Confluence API
            response = self.confluence.add_comment(page_id, content)
            if not response:
                logger.error("Failed to add comment: empty response")
                return None

            # Process the comment to return a consistent model
            processed_html, processed_markdown = self.preprocessor.process_html_content(
                response.get("body", {}).get("view", {}).get("value", ""),
                space_key=space_key,
                confluence_client=self.confluence,
            )

            # Rebuild nested dicts rather than shallow-copying: response.copy()
            # shares the nested "body"/"view" dicts, so assigning into them
            # would mutate the original API response object in place.
            modified_response = {
                **response,
                "body": {
                    **response.get("body", {}),
                    "view": {
                        **response.get("body", {}).get("view", {}),
                        "value": processed_markdown,
                    },
                },
            }
            return ConfluenceComment.from_api_response(
                modified_response,
                base_url=self.config.url,
            )
        except requests.RequestException as e:
            logger.error(f"Network error when adding comment: {str(e)}")
            return None
        except (ValueError, TypeError, KeyError) as e:
            logger.error(f"Error processing comment data: {str(e)}")
            return None
        except Exception as e:  # noqa: BLE001 - Intentional fallback with full logging
            logger.error(f"Unexpected error adding comment: {str(e)}")
            logger.debug("Full exception details for adding comment:", exc_info=True)
            return None
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/protocols.py:
--------------------------------------------------------------------------------
```python
"""Module for Jira protocol definitions."""
from abc import abstractmethod
from typing import Any, Protocol, runtime_checkable
from ..models.jira import JiraIssue
from ..models.jira.search import JiraSearchResult
class AttachmentsOperationsProto(Protocol):
    """Protocol defining attachments operations interface."""

    @abstractmethod
    def upload_attachments(
        self, issue_key: str, file_paths: list[str]
    ) -> dict[str, Any]:
        """
        Upload multiple attachments to a Jira issue.

        Args:
            issue_key: The Jira issue key (e.g., 'PROJ-123')
            file_paths: List of paths to files to upload

        Returns:
            A dictionary with upload results
        """
class IssueOperationsProto(Protocol):
    """Protocol defining issue operations interface."""

    @abstractmethod
    def get_issue(
        self,
        issue_key: str,
        expand: str | None = None,
        comment_limit: int | str | None = 10,
        fields: str
        | list[str]
        | tuple[str, ...]
        | set[str]
        | None = "summary,description,status,assignee,reporter,labels,priority,created,updated,issuetype",
        properties: str | list[str] | None = None,
        update_history: bool = True,
    ) -> JiraIssue:
        """
        Get a Jira issue by key.

        Args:
            issue_key: The Jira issue key (e.g., 'PROJ-123')
            expand: Optional fields to expand in the API response
            comment_limit: Maximum number of comments to include (default 10)
            fields: Fields to return, as a comma-separated string or a
                collection of field names
            properties: Issue properties to return
            update_history: Presumably controls whether the view is recorded
                in the user's issue history — confirm against the implementation

        Returns:
            JiraIssue: The requested issue
        """
class SearchOperationsProto(Protocol):
    """Protocol defining search operations interface."""

    @abstractmethod
    def search_issues(
        self,
        jql: str,
        fields: str
        | list[str]
        | tuple[str, ...]
        | set[str]
        | None = "summary,description,status,assignee,reporter,labels,priority,created,updated,issuetype",
        start: int = 0,
        limit: int = 50,
        expand: str | None = None,
        projects_filter: str | None = None,
    ) -> JiraSearchResult:
        """
        Search for issues using JQL.

        Args:
            jql: JQL query string
            fields: Fields to return, as a comma-separated string or a
                collection of field names
            start: Index of the first result to return (pagination offset)
            limit: Maximum number of issues to return
            expand: Optional fields to expand in the API response
            projects_filter: Optional project filter — see implementation for
                exact semantics

        Returns:
            JiraSearchResult: The search result page
        """
class EpicOperationsProto(Protocol):
    """Protocol defining epic operations interface."""

    @abstractmethod
    def update_epic_fields(self, issue_key: str, kwargs: dict[str, Any]) -> JiraIssue:
        """
        Update Epic-specific fields after Epic creation.

        This method implements the second step of the two-step Epic creation process,
        applying Epic-specific fields that may be rejected during initial creation
        due to screen configuration restrictions.

        Args:
            issue_key: The key of the created Epic
            kwargs: Dictionary containing special keys with Epic field information

        Returns:
            JiraIssue: The updated Epic

        Raises:
            Exception: If there is an error updating the Epic fields
        """

    @abstractmethod
    def prepare_epic_fields(
        self,
        fields: dict[str, Any],
        summary: str,
        kwargs: dict[str, Any],
        # Annotation fixed: the default is None, so the type is `str | None`.
        project_key: str | None = None,
    ) -> None:
        """
        Prepare epic-specific fields for issue creation.

        Args:
            fields: The fields dictionary to update
            summary: The issue summary that can be used as a default epic name
            kwargs: Additional fields from the user
            project_key: Optional project key used during preparation — see
                implementation for exact usage
        """

    @abstractmethod
    def _try_discover_fields_from_existing_epic(
        self, field_ids: dict[str, str]
    ) -> None:
        """
        Try to discover Epic fields from existing epics in the Jira instance.

        This is a fallback method used when standard field discovery doesn't find
        all the necessary Epic-related fields. It searches for an existing Epic and
        analyzes its field structure to identify Epic fields dynamically.

        Args:
            field_ids: Dictionary of field IDs to update with discovered fields
        """
class FieldsOperationsProto(Protocol):
    """Protocol defining fields operations interface."""

    @abstractmethod
    def _generate_field_map(self, force_regenerate: bool = False) -> dict[str, str]:
        """
        Generates and caches a map of lowercase field names to field IDs.

        Args:
            force_regenerate: If True, forces regeneration even if cache exists.

        Returns:
            A dictionary mapping lowercase field names and field IDs to actual field IDs.
        """

    @abstractmethod
    def get_field_by_id(
        self, field_id: str, refresh: bool = False
    ) -> dict[str, Any] | None:
        """
        Get field definition by ID.

        Args:
            field_id: The field ID to look up (e.g., 'customfield_10014')
            refresh: If True, presumably bypasses any cached definition —
                confirm against the implementation

        Returns:
            The field definition dictionary, or None if not found
        """

    @abstractmethod
    def get_field_ids_to_epic(self) -> dict[str, str]:
        """
        Dynamically discover Jira field IDs relevant to Epic linking.

        This method queries the Jira API to find the correct custom field IDs
        for Epic-related fields, which can vary between different Jira instances.

        Returns:
            Dictionary mapping field names to their IDs
            (e.g., {'epic_link': 'customfield_10014', 'epic_name': 'customfield_10011'})
        """

    @abstractmethod
    def get_required_fields(self, issue_type: str, project_key: str) -> dict[str, Any]:
        """
        Get required fields for creating an issue of a specific type in a project.

        Args:
            issue_type: The issue type (e.g., 'Bug', 'Story', 'Epic')
            project_key: The project key (e.g., 'PROJ')

        Returns:
            Dictionary mapping required field names to their definitions
        """
# runtime_checkable allows `isinstance(obj, ProjectsOperationsProto)` checks.
@runtime_checkable
class ProjectsOperationsProto(Protocol):
    """Protocol defining project operations interface."""

    @abstractmethod
    def get_project_issue_types(self, project_key: str) -> list[dict[str, Any]]:
        """
        Get all issue types available for a project.

        Args:
            project_key: The project key

        Returns:
            List of issue type data dictionaries
        """
# runtime_checkable allows `isinstance(obj, UsersOperationsProto)` checks.
@runtime_checkable
class UsersOperationsProto(Protocol):
    """Protocol defining user operations interface."""

    @abstractmethod
    def _get_account_id(self, assignee: str) -> str:
        """Get the account ID for a username.

        Args:
            assignee: Username or account ID

        Returns:
            Account ID

        Raises:
            ValueError: If the account ID could not be found
        """
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/sprints.py:
--------------------------------------------------------------------------------
```python
"""Module for Jira sprints operations."""
import datetime
import logging
from typing import Any
import requests
from ..models.jira import JiraSprint
from ..utils import parse_date
from .client import JiraClient
logger = logging.getLogger("mcp-jira")
class SprintsMixin(JiraClient):
    """Mixin for Jira sprints operations."""

    def get_all_sprints_from_board(
        self, board_id: str, state: str | None = None, start: int = 0, limit: int = 50
    ) -> list[dict[str, Any]]:
        """
        Get all sprints from a board.

        Args:
            board_id: Board ID
            state: Sprint state (e.g., active, future, closed); if None, return
                sprints in every state
            start: Start index for pagination
            limit: Maximum number of sprints to return

        Returns:
            List of raw sprint dictionaries; an empty list on any error.
        """
        try:
            sprints = self.jira.get_all_sprints_from_board(
                board_id=board_id,
                state=state,
                start=start,
                limit=limit,
            )
            # Expect {"values": [...]}; any other payload shape yields [].
            return sprints.get("values", []) if isinstance(sprints, dict) else []
        except requests.HTTPError as e:
            logger.error(
                f"Error getting all sprints from board: {str(e.response.content)}"
            )
            return []
        except Exception as e:
            logger.error(f"Error getting all sprints from board: {str(e)}")
            return []

    def get_all_sprints_from_board_model(
        self, board_id: str, state: str | None = None, start: int = 0, limit: int = 50
    ) -> list[JiraSprint]:
        """
        Get all sprints as JiraSprint models from a board.

        Args:
            board_id: Board ID
            state: Sprint state (e.g., active, future, closed); if None, return
                sprints in every state
            start: Start index for pagination
            limit: Maximum number of sprints to return

        Returns:
            List of JiraSprint models (empty if the underlying call failed).
        """
        sprints = self.get_all_sprints_from_board(
            board_id=board_id,
            state=state,
            start=start,
            limit=limit,
        )
        return [JiraSprint.from_api_response(sprint) for sprint in sprints]

    def update_sprint(
        self,
        sprint_id: str,
        sprint_name: str | None,
        state: str | None,
        start_date: str | None,
        end_date: str | None,
        goal: str | None,
    ) -> JiraSprint | None:
        """
        Update a sprint.

        Args:
            sprint_id: Sprint ID (required)
            sprint_name: New name for the sprint (optional)
            state: New state for the sprint (future|active|closed - optional)
            start_date: New start date for the sprint (optional)
            end_date: New end date for the sprint (optional)
            goal: New goal for the sprint (optional)

        Returns:
            The updated JiraSprint, or None on validation failure or API error.
        """
        data: dict[str, Any] = {}
        if sprint_name:
            data["name"] = sprint_name
        if state and state not in ["future", "active", "closed"]:
            logger.warning("Invalid state. Valid states are: future, active, closed.")
            return None
        elif state:
            data["state"] = state
        if start_date:
            data["startDate"] = start_date
        if end_date:
            data["endDate"] = end_date
        if goal:
            data["goal"] = goal
        if not sprint_id:
            logger.warning("Sprint ID is required.")
            return None
        try:
            updated_sprint = self.jira.update_partially_sprint(
                sprint_id=sprint_id,
                data=data,
            )
            if not isinstance(updated_sprint, dict):
                msg = f"Unexpected return value type from `SprintMixin.update_sprint`: {type(updated_sprint)}"
                logger.error(msg)
                raise TypeError(msg)
            return JiraSprint.from_api_response(updated_sprint)
        except requests.HTTPError as e:
            logger.error(f"Error updating sprint: {str(e.response.content)}")
            return None
        except Exception as e:
            logger.error(f"Error updating sprint: {str(e)}")
            return None

    def create_sprint(
        self,
        board_id: str,
        sprint_name: str,
        start_date: str,
        end_date: str,
        goal: str | None = None,
    ) -> JiraSprint:
        """
        Create a new sprint.

        Args:
            board_id: Board ID
            sprint_name: Sprint name
            start_date: Start date in ISO format (required)
            end_date: End date in ISO format (only validated when provided)
            goal: Sprint goal (optional)

        Returns:
            Created sprint as a JiraSprint model.

        Raises:
            ValueError: If the start date is missing, unparseable, in the past,
                or not before the end date.
            TypeError: If the API returns an unexpected payload type.
        """
        if not start_date:
            raise ValueError("Start date is required.")
        # Validate start date format.
        parsed_start_date = parse_date(start_date)
        if parsed_start_date is None:
            # Bug fix: previously raised "Start date is required." even though
            # a (malformed) value was supplied, which was misleading.
            raise ValueError("Invalid start date format.")
        if parsed_start_date.tzinfo is None:
            # parse_date may return a naive datetime; assume UTC so the
            # comparison against an aware "now" below cannot raise TypeError.
            # TODO(review): confirm parse_date's timezone semantics.
            parsed_start_date = parsed_start_date.replace(tzinfo=datetime.timezone.utc)
        # Validate the start date is not in the past.
        if parsed_start_date < datetime.datetime.now(datetime.timezone.utc):
            raise ValueError("Start date cannot be in the past.")
        # Validate end date format and ordering when an end date was provided.
        if end_date:
            parsed_end_date = parse_date(end_date)
            if parsed_end_date is not None:
                if parsed_end_date.tzinfo is None:
                    parsed_end_date = parsed_end_date.replace(
                        tzinfo=datetime.timezone.utc
                    )
                if parsed_start_date >= parsed_end_date:
                    raise ValueError("Start date must be before end date.")
        try:
            sprint = self.jira.create_sprint(
                name=sprint_name,
                board_id=board_id,
                start_date=start_date,
                end_date=end_date,
                goal=goal,
            )
            logger.info(f"Sprint created: {sprint}")
            if not isinstance(sprint, dict):
                msg = f"Unexpected return value type from `SprintMixin.create_sprint`: {type(sprint)}"
                logger.error(msg)
                raise TypeError(msg)
            return JiraSprint.from_api_response(sprint)
        except requests.HTTPError as e:
            logger.error(f"Error creating sprint: {str(e.response.content)}")
            raise
        except Exception as e:
            logger.error(f"Error creating sprint: {str(e)}")
            raise
```
--------------------------------------------------------------------------------
/tests/unit/utils/test_ssl.py:
--------------------------------------------------------------------------------
```python
"""Tests for the SSL utilities module."""
import ssl
from unittest.mock import MagicMock, patch
from requests.adapters import HTTPAdapter
from requests.sessions import Session
from mcp_atlassian.utils.ssl import SSLIgnoreAdapter, configure_ssl_verification
def test_ssl_ignore_adapter_cert_verify():
    """SSLIgnoreAdapter.cert_verify must force verify=False regardless of input."""
    adapter = SSLIgnoreAdapter()
    conn = MagicMock()
    target_url = "https://example.com"
    with patch.object(HTTPAdapter, "cert_verify") as super_verify:
        # Ask for verification explicitly; the adapter should drop the flag.
        adapter.cert_verify(conn, target_url, verify=True, cert=None)
        super_verify.assert_called_once_with(conn, target_url, verify=False, cert=None)
def test_ssl_ignore_adapter_init_poolmanager():
    """init_poolmanager must build a pool with hostname and cert checks disabled."""
    adapter = SSLIgnoreAdapter()
    fake_pool = MagicMock()
    with patch("ssl.create_default_context") as fake_ctx_factory:
        fake_ctx = MagicMock()
        fake_ctx_factory.return_value = fake_ctx
        with patch(
            "mcp_atlassian.utils.ssl.PoolManager", return_value=fake_pool
        ) as pool_cls:
            adapter.init_poolmanager(5, 10, block=True)
            # The SSL context must be created and neutered.
            fake_ctx_factory.assert_called_once()
            assert fake_ctx.check_hostname is False
            assert fake_ctx.verify_mode == ssl.CERT_NONE
            # The pool must receive the original sizing args plus our context.
            pool_cls.assert_called_once()
            _, kwargs = pool_cls.call_args
            assert kwargs["num_pools"] == 5
            assert kwargs["maxsize"] == 10
            assert kwargs["block"] is True
            assert kwargs["ssl_context"] == fake_ctx
def test_configure_ssl_verification_disabled():
    """configure_ssl_verification mounts SSLIgnoreAdapter for both schemes when disabled."""
    service_name = "TestService"
    url = "https://test.example.com/path"
    session = MagicMock()  # Use MagicMock instead of an actual Session
    ssl_verify = False
    # Patch the logger only to silence output; the previous `as mock_logger`
    # binding was never used (removed).
    with patch("mcp_atlassian.utils.ssl.logger"):
        with patch("mcp_atlassian.utils.ssl.SSLIgnoreAdapter") as mock_adapter_class:
            mock_adapter = MagicMock()
            mock_adapter_class.return_value = mock_adapter
            # Act
            configure_ssl_verification(service_name, url, session, ssl_verify)
            # Assert: one adapter instance mounted for both http and https.
            mock_adapter_class.assert_called_once()
            assert session.mount.call_count == 2
            session.mount.assert_any_call("https://test.example.com", mock_adapter)
            session.mount.assert_any_call("http://test.example.com", mock_adapter)
def test_configure_ssl_verification_enabled():
    """No adapters are mounted when SSL verification stays enabled."""
    session = MagicMock()  # MagicMock stands in for a real Session
    with patch("mcp_atlassian.utils.ssl.SSLIgnoreAdapter") as adapter_cls:
        configure_ssl_verification(
            "TestService", "https://test.example.com/path", session, True
        )
        # Verification on means the bypass adapter is never created or mounted.
        adapter_cls.assert_not_called()
        assert session.mount.call_count == 0
def test_configure_ssl_verification_enabled_with_real_session():
    """A real Session keeps its default adapters when verification is enabled."""
    session = Session()
    adapters_before = len(session.adapters)
    configure_ssl_verification(
        service_name="Test",
        url="https://example.com",
        session=session,
        ssl_verify=True,
    )
    # Nothing extra should have been mounted.
    assert len(session.adapters) == adapters_before
def test_configure_ssl_verification_disabled_with_real_session():
    """A real Session gains http/https SSLIgnoreAdapter mounts when verification is off."""
    session = Session()
    adapters_before = len(session.adapters)
    # Patch the logger only to silence output; the previous `as mock_logger`
    # binding was never used (removed).
    with patch("mcp_atlassian.utils.ssl.logger"):
        configure_ssl_verification(
            service_name="Test",
            url="https://example.com",
            session=session,
            ssl_verify=False,
        )
    # Custom adapters must have been mounted for both protocols.
    assert len(session.adapters) == adapters_before + 2
    assert "https://example.com" in session.adapters
    assert "http://example.com" in session.adapters
    assert isinstance(session.adapters["https://example.com"], SSLIgnoreAdapter)
    assert isinstance(session.adapters["http://example.com"], SSLIgnoreAdapter)
def test_ssl_ignore_adapter():
    """cert_verify delegates with verify=False for every incoming verify value."""
    adapter = SSLIgnoreAdapter()
    conn = MagicMock()
    url = "https://example.com"
    # Whatever the caller requests, the adapter must pass verify=False through.
    for requested in (True, False):
        with patch.object(HTTPAdapter, "cert_verify") as delegate:
            adapter.cert_verify(conn, url, verify=requested, cert=None)
            delegate.assert_called_once_with(conn, url, verify=False, cert=None)
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_config.py:
--------------------------------------------------------------------------------
```python
"""Tests for the Jira config module."""
import os
from unittest.mock import patch
import pytest
from mcp_atlassian.jira.config import JiraConfig
def test_from_env_basic_auth():
    """from_env should build a basic-auth config from JIRA_* variables."""
    env = {
        "JIRA_URL": "https://test.atlassian.net",
        "JIRA_USERNAME": "test_username",
        "JIRA_API_TOKEN": "test_token",
    }
    with patch.dict(os.environ, env, clear=True):
        config = JiraConfig.from_env()
        assert config.url == "https://test.atlassian.net"
        assert config.auth_type == "basic"
        assert config.username == "test_username"
        assert config.api_token == "test_token"
        assert config.personal_token is None
        assert config.ssl_verify is True
def test_from_env_token_auth():
    """from_env should build a PAT config for a Server/Data Center URL."""
    env = {
        "JIRA_URL": "https://jira.example.com",
        "JIRA_PERSONAL_TOKEN": "test_personal_token",
        "JIRA_SSL_VERIFY": "false",
    }
    with patch.dict(os.environ, env, clear=True):
        config = JiraConfig.from_env()
        assert config.url == "https://jira.example.com"
        assert config.auth_type == "pat"
        assert config.username is None
        assert config.api_token is None
        assert config.personal_token == "test_personal_token"
        assert config.ssl_verify is False
def test_from_env_missing_url():
    """Test that from_env raises ValueError when URL is missing."""
    # Consistency fix: use patch.dict with clear=True (as every other test in
    # this file does) instead of manually copying and restoring os.environ,
    # which is more verbose and easier to get wrong.
    with patch.dict(os.environ, {}, clear=True):
        with pytest.raises(
            ValueError, match="Missing required JIRA_URL environment variable"
        ):
            JiraConfig.from_env()
def test_from_env_missing_cloud_auth():
    """from_env rejects a cloud URL that has no username/API token."""
    cloud_only = {"JIRA_URL": "https://test.atlassian.net"}  # Cloud URL
    with patch.dict(os.environ, cloud_only, clear=True):
        with pytest.raises(
            ValueError,
            match="Cloud authentication requires JIRA_USERNAME and JIRA_API_TOKEN",
        ):
            JiraConfig.from_env()
def test_from_env_missing_server_auth():
    """from_env rejects a Server/DC URL that has no personal token."""
    server_only = {"JIRA_URL": "https://jira.example.com"}  # Server URL
    with patch.dict(os.environ, server_only, clear=True):
        with pytest.raises(
            ValueError,
            match="Server/Data Center authentication requires JIRA_PERSONAL_TOKEN",
        ):
            JiraConfig.from_env()
def test_is_cloud():
    """is_cloud should be True only for Atlassian cloud URLs."""
    # Cloud URL -> True
    cloud = JiraConfig(
        url="https://example.atlassian.net",
        auth_type="basic",
        username="test",
        api_token="test",
    )
    assert cloud.is_cloud is True
    # Server, localhost and loopback-IP URLs (Data Center/Server) -> False
    for server_url in (
        "https://jira.example.com",
        "http://localhost:8080",
        "http://127.0.0.1:8080",
    ):
        server = JiraConfig(
            url=server_url,
            auth_type="pat",
            personal_token="test",
        )
        assert server.is_cloud is False
def test_from_env_proxy_settings():
    """from_env picks up global proxy vars and JIRA_-prefixed overrides."""
    base = {
        "JIRA_URL": "https://test.atlassian.net",
        "JIRA_USERNAME": "test_username",
        "JIRA_API_TOKEN": "test_token",
    }
    # Global proxy variables.
    global_env = {
        **base,
        "HTTP_PROXY": "http://proxy.example.com:8080",
        "HTTPS_PROXY": "https://proxy.example.com:8443",
        "SOCKS_PROXY": "socks5://user:[email protected]:1080",
        "NO_PROXY": "localhost,127.0.0.1",
    }
    with patch.dict(os.environ, global_env, clear=True):
        config = JiraConfig.from_env()
        assert config.http_proxy == "http://proxy.example.com:8080"
        assert config.https_proxy == "https://proxy.example.com:8443"
        assert config.socks_proxy == "socks5://user:[email protected]:1080"
        assert config.no_proxy == "localhost,127.0.0.1"
    # Service-specific overrides take precedence.
    override_env = {
        **base,
        "JIRA_HTTP_PROXY": "http://jira-proxy.example.com:8080",
        "JIRA_HTTPS_PROXY": "https://jira-proxy.example.com:8443",
        "JIRA_SOCKS_PROXY": "socks5://user:[email protected]:1080",
        "JIRA_NO_PROXY": "localhost,127.0.0.1,.internal.example.com",
    }
    with patch.dict(os.environ, override_env, clear=True):
        config = JiraConfig.from_env()
        assert config.http_proxy == "http://jira-proxy.example.com:8080"
        assert config.https_proxy == "https://jira-proxy.example.com:8443"
        assert config.socks_proxy == "socks5://user:[email protected]:1080"
        assert config.no_proxy == "localhost,127.0.0.1,.internal.example.com"
def test_is_cloud_oauth_with_cloud_id():
    """OAuth configs carrying a cloud_id are treated as Cloud regardless of URL."""
    from mcp_atlassian.utils.oauth import BYOAccessTokenOAuthConfig

    oauth_config = BYOAccessTokenOAuthConfig(
        cloud_id="test-cloud-id", access_token="test-token"
    )
    # URL may be None in Multi-Cloud OAuth mode, or even look like a server URL;
    # the presence of a cloud_id must win either way.
    for url in (None, "https://jira.example.com"):
        config = JiraConfig(
            url=url,
            auth_type="oauth",
            oauth_config=oauth_config,
        )
        assert config.is_cloud is True
```
--------------------------------------------------------------------------------
/tests/utils/mocks.py:
--------------------------------------------------------------------------------
```python
"""Reusable mock utilities and fixtures for MCP Atlassian tests."""
import os
from contextlib import contextmanager
from typing import Any
from unittest.mock import MagicMock, patch
from .factories import AuthConfigFactory, ConfluencePageFactory, JiraIssueFactory
class MockEnvironment:
    """Utility for mocking environment variables."""

    @staticmethod
    @contextmanager
    def oauth_env():
        """Yield a patched environment containing OAuth variables."""
        oauth = AuthConfigFactory.create_oauth_config()
        variables = {
            "ATLASSIAN_OAUTH_CLIENT_ID": oauth["client_id"],
            "ATLASSIAN_OAUTH_CLIENT_SECRET": oauth["client_secret"],
            "ATLASSIAN_OAUTH_REDIRECT_URI": oauth["redirect_uri"],
            "ATLASSIAN_OAUTH_SCOPE": oauth["scope"],
            "ATLASSIAN_OAUTH_CLOUD_ID": oauth["cloud_id"],
        }
        with patch.dict(os.environ, variables, clear=False):
            yield variables

    @staticmethod
    @contextmanager
    def basic_auth_env():
        """Yield a patched environment containing basic-auth variables."""
        auth = AuthConfigFactory.create_basic_auth_config()
        variables = {
            "JIRA_URL": auth["url"],
            "JIRA_USERNAME": auth["username"],
            "JIRA_API_TOKEN": auth["api_token"],
            "CONFLUENCE_URL": f"{auth['url']}/wiki",
            "CONFLUENCE_USERNAME": auth["username"],
            "CONFLUENCE_API_TOKEN": auth["api_token"],
        }
        with patch.dict(os.environ, variables, clear=False):
            yield variables

    @staticmethod
    @contextmanager
    def clean_env():
        """Yield an environment with all auth-related variables removed."""
        auth_keys = (
            "JIRA_URL",
            "JIRA_USERNAME",
            "JIRA_API_TOKEN",
            "CONFLUENCE_URL",
            "CONFLUENCE_USERNAME",
            "CONFLUENCE_API_TOKEN",
            "ATLASSIAN_OAUTH_CLIENT_ID",
            "ATLASSIAN_OAUTH_CLIENT_SECRET",
            "ATLASSIAN_OAUTH_REDIRECT_URI",
            "ATLASSIAN_OAUTH_SCOPE",
            "ATLASSIAN_OAUTH_CLOUD_ID",
            "ATLASSIAN_OAUTH_ENABLE",
        )
        # patch.dict restores the original environment on exit; pop the auth
        # variables from the live mapping while the patch is active.
        with patch.dict(os.environ, {}, clear=False) as env_dict:
            for key in auth_keys:
                env_dict.pop(key, None)
            yield env_dict
class MockAtlassianClient:
    """Factory for creating mock Atlassian clients."""

    @staticmethod
    def create_jira_client(**response_overrides):
        """Create a mock Jira client with common default responses."""
        defaults = {
            "issue": JiraIssueFactory.create(),
            "search_issues": {
                "issues": [
                    JiraIssueFactory.create("TEST-1"),
                    JiraIssueFactory.create("TEST-2"),
                ],
                "total": 2,
            },
            "projects": [{"key": "TEST", "name": "Test Project"}],
            "fields": [{"id": "summary", "name": "Summary"}],
        }
        # Caller-supplied responses win over the defaults.
        responses = {**defaults, **response_overrides}
        client = MagicMock()
        client.issue.return_value = responses["issue"]
        client.search_issues.return_value = responses["search_issues"]
        client.projects.return_value = responses["projects"]
        client.fields.return_value = responses["fields"]
        return client

    @staticmethod
    def create_confluence_client(**response_overrides):
        """Create a mock Confluence client with common default responses."""
        defaults = {
            "get_page_by_id": ConfluencePageFactory.create(),
            "get_all_pages_from_space": {
                "results": [
                    ConfluencePageFactory.create("123"),
                    ConfluencePageFactory.create("456"),
                ]
            },
            "get_all_spaces": {"results": [{"key": "TEST", "name": "Test Space"}]},
        }
        # Caller-supplied responses win over the defaults.
        responses = {**defaults, **response_overrides}
        client = MagicMock()
        client.get_page_by_id.return_value = responses["get_page_by_id"]
        client.get_all_pages_from_space.return_value = responses[
            "get_all_pages_from_space"
        ]
        client.get_all_spaces.return_value = responses["get_all_spaces"]
        return client
class MockOAuthServer:
    """Utility for mocking OAuth server interactions."""

    @staticmethod
    @contextmanager
    def mock_oauth_flow():
        """Yield mocks covering the complete OAuth flow (server, browser, state token)."""
        with (
            patch("http.server.HTTPServer") as server_cls,
            patch("webbrowser.open") as browser_open,
            patch("secrets.token_urlsafe") as token_urlsafe,
        ):
            # Deterministic state token and a canned server instance.
            token_urlsafe.return_value = "test-state-token"
            server_instance = MagicMock()
            server_cls.return_value = server_instance
            yield {
                "server": server_cls,
                "server_instance": server_instance,
                "browser": browser_open,
                "token": token_urlsafe,
            }
class MockFastMCP:
"""Utility for mocking FastMCP components."""
@staticmethod
def create_request(state_data: dict[str, Any] | None = None):
"""Create a mock FastMCP request."""
request = MagicMock()
request.state = MagicMock()
if state_data:
for key, value in state_data.items():
setattr(request.state, key, value)
return request
@staticmethod
def create_context():
"""Create a mock FastMCP context."""
return MagicMock()
class MockPreprocessor:
    """Utility for mocking content preprocessors."""

    @staticmethod
    def create_html_to_markdown():
        """Create a mock HTML-to-Markdown preprocessor with a fixed output."""
        mock = MagicMock()
        mock.process.return_value = "# Markdown Content"
        return mock

    @staticmethod
    def create_markdown_to_html():
        """Create a mock Markdown-to-HTML preprocessor with a fixed output."""
        mock = MagicMock()
        mock.process.return_value = "<h1>HTML Content</h1>"
        return mock
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_links.py:
--------------------------------------------------------------------------------
```python
from unittest.mock import MagicMock, Mock, patch
import pytest
from requests.exceptions import HTTPError
from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
from mcp_atlassian.jira.links import LinksMixin
from mcp_atlassian.models.jira import JiraIssueLinkType
class TestLinksMixin:
    """Unit tests for LinksMixin issue-link operations."""

    @pytest.fixture
    def links_mixin(self, mock_config, mock_atlassian_jira):
        # mock_config / mock_atlassian_jira are shared fixtures — presumably
        # defined in a conftest outside this file.
        mixin = LinksMixin(config=mock_config)
        mixin.jira = mock_atlassian_jira
        return mixin

    def test_get_issue_link_types_success(self, links_mixin):
        """Test successful retrieval of issue link types."""
        mock_response = {
            "issueLinkTypes": [
                {
                    "id": "10000",
                    "name": "Blocks",
                    "inward": "is blocked by",
                    "outward": "blocks",
                },
                {
                    "id": "10001",
                    "name": "Duplicate",
                    "inward": "is duplicated by",
                    "outward": "duplicates",
                },
            ]
        }
        links_mixin.jira.get.return_value = mock_response

        # Stub model construction so only the mixin's own logic is under test.
        def fake_from_api_response(data):
            mock = MagicMock()
            mock.name = data["name"]
            return mock

        with patch.object(
            JiraIssueLinkType, "from_api_response", side_effect=fake_from_api_response
        ):
            result = links_mixin.get_issue_link_types()
        assert len(result) == 2
        assert result[0].name == "Blocks"
        assert result[1].name == "Duplicate"
        links_mixin.jira.get.assert_called_once_with("rest/api/2/issueLinkType")

    def test_get_issue_link_types_authentication_error(self, links_mixin):
        """A 401 from the API surfaces as MCPAtlassianAuthenticationError."""
        links_mixin.jira.get.side_effect = HTTPError(response=Mock(status_code=401))
        with pytest.raises(MCPAtlassianAuthenticationError):
            links_mixin.get_issue_link_types()

    def test_get_issue_link_types_generic_error(self, links_mixin):
        """Non-HTTP errors are propagated unchanged."""
        links_mixin.jira.get.side_effect = Exception("Unexpected error")
        with pytest.raises(Exception, match="Unexpected error"):
            links_mixin.get_issue_link_types()

    def test_create_issue_link_success(self, links_mixin):
        """Creating a link returns a success payload naming both issues."""
        data = {
            "type": {"name": "Blocks"},
            "inwardIssue": {"key": "PROJ-123"},
            "outwardIssue": {"key": "PROJ-456"},
        }
        response = links_mixin.create_issue_link(data)
        assert response["success"] is True
        assert "Link created between PROJ-123 and PROJ-456" in response["message"]
        links_mixin.jira.create_issue_link.assert_called_once_with(data)

    def test_create_issue_link_missing_type(self, links_mixin):
        """Missing link type is rejected with ValueError."""
        data = {
            "inwardIssue": {"key": "PROJ-123"},
            "outwardIssue": {"key": "PROJ-456"},
        }
        with pytest.raises(ValueError, match="Link type is required"):
            links_mixin.create_issue_link(data)

    def test_create_issue_link_authentication_error(self, links_mixin):
        """A 401 during link creation surfaces as MCPAtlassianAuthenticationError."""
        data = {
            "type": {"name": "Blocks"},
            "inwardIssue": {"key": "PROJ-123"},
            "outwardIssue": {"key": "PROJ-456"},
        }
        links_mixin.jira.create_issue_link.side_effect = HTTPError(
            response=Mock(status_code=401)
        )
        with pytest.raises(MCPAtlassianAuthenticationError):
            links_mixin.create_issue_link(data)

    def test_create_remote_issue_link_success(self, links_mixin):
        """Creating a remote link posts to the v3 remotelink endpoint."""
        issue_key = "PROJ-123"
        link_data = {
            "object": {
                "url": "https://example.com/page",
                "title": "Example Page",
                "summary": "A test page",
            },
            "relationship": "documentation",
        }
        response = links_mixin.create_remote_issue_link(issue_key, link_data)
        assert response["success"] is True
        assert response["issue_key"] == issue_key
        assert response["link_title"] == "Example Page"
        assert response["link_url"] == "https://example.com/page"
        assert response["relationship"] == "documentation"
        links_mixin.jira.post.assert_called_once_with(
            "rest/api/3/issue/PROJ-123/remotelink", json=link_data
        )

    def test_create_remote_issue_link_missing_issue_key(self, links_mixin):
        """An empty issue key is rejected with ValueError."""
        link_data = {
            "object": {"url": "https://example.com/page", "title": "Example Page"}
        }
        with pytest.raises(ValueError, match="Issue key is required"):
            links_mixin.create_remote_issue_link("", link_data)

    def test_create_remote_issue_link_missing_object(self, links_mixin):
        """A payload without an `object` entry is rejected with ValueError."""
        issue_key = "PROJ-123"
        link_data = {"relationship": "documentation"}
        with pytest.raises(ValueError, match="Link object is required"):
            links_mixin.create_remote_issue_link(issue_key, link_data)

    def test_create_remote_issue_link_missing_url(self, links_mixin):
        """An `object` without a URL is rejected with ValueError."""
        issue_key = "PROJ-123"
        link_data = {"object": {"title": "Example Page"}}
        with pytest.raises(ValueError, match="URL is required in link object"):
            links_mixin.create_remote_issue_link(issue_key, link_data)

    def test_create_remote_issue_link_missing_title(self, links_mixin):
        """An `object` without a title is rejected with ValueError."""
        issue_key = "PROJ-123"
        link_data = {"object": {"url": "https://example.com/page"}}
        with pytest.raises(ValueError, match="Title is required in link object"):
            links_mixin.create_remote_issue_link(issue_key, link_data)

    def test_create_remote_issue_link_authentication_error(self, links_mixin):
        """A 401 during remote-link creation surfaces as MCPAtlassianAuthenticationError."""
        issue_key = "PROJ-123"
        link_data = {
            "object": {"url": "https://example.com/page", "title": "Example Page"}
        }
        links_mixin.jira.post.side_effect = HTTPError(response=Mock(status_code=401))
        with pytest.raises(MCPAtlassianAuthenticationError):
            links_mixin.create_remote_issue_link(issue_key, link_data)

    def test_remove_issue_link_success(self, links_mixin):
        """Removing a link returns a success payload naming the link ID."""
        link_id = "10000"
        response = links_mixin.remove_issue_link(link_id)
        assert response["success"] is True
        assert f"Link with ID {link_id} has been removed" in response["message"]
        links_mixin.jira.remove_issue_link.assert_called_once_with(link_id)

    def test_remove_issue_link_empty_id(self, links_mixin):
        """An empty link ID is rejected with ValueError."""
        with pytest.raises(ValueError, match="Link ID is required"):
            links_mixin.remove_issue_link("")

    def test_remove_issue_link_authentication_error(self, links_mixin):
        """A 401 during link removal surfaces as MCPAtlassianAuthenticationError."""
        link_id = "10000"
        links_mixin.jira.remove_issue_link.side_effect = HTTPError(
            response=Mock(status_code=401)
        )
        with pytest.raises(MCPAtlassianAuthenticationError):
            links_mixin.remove_issue_link(link_id)
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_protocols.py:
--------------------------------------------------------------------------------
```python
"""Tests for Jira protocol definitions."""
import inspect
from typing import Any, get_type_hints
from mcp_atlassian.jira.protocols import (
AttachmentsOperationsProto,
UsersOperationsProto,
)
from mcp_atlassian.models.jira import JiraIssue
from mcp_atlassian.models.jira.search import JiraSearchResult
class TestProtocolCompliance:
    """Tests for protocol compliance checking."""

    def test_compliant_attachments_implementation(self):
        """An object exposing a matching upload_attachments is structurally compliant."""

        class GoodAttachments:
            def upload_attachments(
                self, issue_key: str, file_paths: list[str]
            ) -> dict[str, Any]:
                return {"uploaded": len(file_paths)}

        attachments_impl = GoodAttachments()
        assert hasattr(attachments_impl, "upload_attachments")
        assert callable(attachments_impl.upload_attachments)

    def test_compliant_issue_implementation(self):
        """A get_issue implementation matching the protocol returns a JiraIssue."""

        class GoodIssues:
            def get_issue(
                self,
                issue_key: str,
                expand: str | None = None,
                comment_limit: int | str | None = 10,
                fields: str | list[str] | tuple[str, ...] | set[str] | None = (
                    "summary,description,status,assignee,reporter,labels,"
                    "priority,created,updated,issuetype"
                ),
                properties: str | list[str] | None = None,
                *,
                update_history: bool = True,
            ) -> JiraIssue:
                return JiraIssue(id="123", key=issue_key, summary="Test Issue")

        issues_impl = GoodIssues()
        assert hasattr(issues_impl, "get_issue")
        fetched = issues_impl.get_issue("TEST-1")
        assert isinstance(fetched, JiraIssue)
        assert fetched.key == "TEST-1"

    def test_compliant_search_implementation(self):
        """A search_issues implementation matching the protocol returns a JiraSearchResult."""

        class GoodSearch:
            def search_issues(
                self,
                jql: str,
                fields: str | list[str] | tuple[str, ...] | set[str] | None = (
                    "summary,description,status,assignee,reporter,labels,"
                    "priority,created,updated,issuetype"
                ),
                start: int = 0,
                limit: int = 50,
                expand: str | None = None,
                projects_filter: str | None = None,
            ) -> JiraSearchResult:
                return JiraSearchResult(
                    total=0, start_at=start, max_results=limit, issues=[]
                )

        search_impl = GoodSearch()
        outcome = search_impl.search_issues("project = TEST")
        assert isinstance(outcome, JiraSearchResult)

    def test_runtime_checkable_users_protocol(self):
        """isinstance() against the runtime-checkable protocol only checks method presence."""

        class WithAccountId:
            def _get_account_id(self, assignee: str) -> str:
                return f"account-id-for-{assignee}"

        class WithoutAccountId:
            pass

        # Runtime checkable only checks method existence, not signatures.
        assert isinstance(WithAccountId(), UsersOperationsProto)
        assert not isinstance(WithoutAccountId(), UsersOperationsProto)
class TestProtocolContractValidation:
    """Tests for validating protocol contract compliance."""

    def test_method_signature_validation(self):
        """Parameter names of an implementation should mirror the protocol's."""

        def signatures_match(protocol_class, method_name: str, implementation):
            """Return True when parameter names (minus 'self') line up."""
            proto_sig = inspect.signature(getattr(protocol_class, method_name))
            impl_sig = inspect.signature(getattr(implementation, method_name))
            proto_names = [n for n in proto_sig.parameters if n != "self"]
            impl_names = [n for n in impl_sig.parameters if n != "self"]
            return proto_names == impl_names

        class SignatureImpl:
            def upload_attachments(
                self, issue_key: str, file_paths: list[str]
            ) -> dict[str, Any]:
                return {}

        assert signatures_match(
            AttachmentsOperationsProto, "upload_attachments", SignatureImpl()
        )

    def test_type_hint_validation(self):
        """Return-type hints should agree between protocol and implementation."""

        def return_hints_match(protocol_class, method_name: str, implementation):
            """Return True when the 'return' hints are equal."""
            proto_hints = get_type_hints(getattr(protocol_class, method_name))
            impl_hints = get_type_hints(getattr(implementation, method_name))
            return proto_hints.get("return") == impl_hints.get("return")

        class HintedImpl:
            def upload_attachments(
                self, issue_key: str, file_paths: list[str]
            ) -> dict[str, Any]:
                return {}

        assert return_hints_match(
            AttachmentsOperationsProto, "upload_attachments", HintedImpl()
        )

    def test_structural_compliance_check(self):
        """Structural typing: the instance must expose every abstract protocol method."""

        def complies_structurally(instance, protocol_class):
            """Check the instance has a callable for each abstract method."""
            required = []
            for name in dir(protocol_class):
                if name.startswith("__"):
                    continue
                member = getattr(protocol_class, name, None)
                if callable(member) and getattr(
                    member, "__isabstractmethod__", False
                ):
                    required.append(name)
            # Every required method must exist on the instance and be callable.
            return all(
                callable(getattr(instance, name, None)) for name in required
            )

        class StructurallyGood:
            def upload_attachments(
                self, issue_key: str, file_paths: list[str]
            ) -> dict[str, Any]:
                return {}

        class StructurallyBad:
            def some_other_method(self):
                pass

        assert complies_structurally(StructurallyGood(), AttachmentsOperationsProto)
        assert not complies_structurally(
            StructurallyBad(), AttachmentsOperationsProto
        )
```
--------------------------------------------------------------------------------
/tests/unit/utils/test_custom_headers.py:
--------------------------------------------------------------------------------
```python
"""Tests for custom headers parsing functionality."""
from mcp_atlassian.utils.env import get_custom_headers
class TestParseCustomHeaders:
    """Test the parse_custom_headers function."""

    def test_empty_input(self, monkeypatch):
        """Unset, empty, and whitespace-only env values all yield an empty dict."""
        monkeypatch.delenv("TEST_HEADERS", raising=False)
        assert get_custom_headers("TEST_HEADERS") == {}

        for blank in ("", "   ", "\t\n"):
            monkeypatch.setenv("TEST_HEADERS", blank)
            assert get_custom_headers("TEST_HEADERS") == {}

    def test_single_header(self, monkeypatch):
        """A single key=value pair parses, with surrounding whitespace stripped."""
        monkeypatch.setenv("TEST_HEADERS", "X-Custom=value123")
        assert get_custom_headers("TEST_HEADERS") == {"X-Custom": "value123"}

        monkeypatch.setenv("TEST_HEADERS", "  X-Spaced  =  value with spaces  ")
        assert get_custom_headers("TEST_HEADERS") == {"X-Spaced": "value with spaces"}

    def test_multiple_headers(self, monkeypatch):
        """Comma-separated pairs all land in the resulting dict."""
        monkeypatch.setenv("TEST_HEADERS", "X-Corp-Auth=token123,X-Dept=engineering")
        assert get_custom_headers("TEST_HEADERS") == {
            "X-Corp-Auth": "token123",
            "X-Dept": "engineering",
        }

    def test_headers_with_spaces(self, monkeypatch):
        """Whitespace around keys, values, and separators is trimmed."""
        monkeypatch.setenv("TEST_HEADERS", "  X-Key = value ,  X-Another = value2  ")
        assert get_custom_headers("TEST_HEADERS") == {
            "X-Key": "value",
            "X-Another": "value2",
        }

    def test_value_with_equals_signs(self, monkeypatch):
        """Only the first '=' splits key from value; later ones stay in the value."""
        monkeypatch.setenv("TEST_HEADERS", "X-Token=abc=def=123")
        assert get_custom_headers("TEST_HEADERS") == {"X-Token": "abc=def=123"}

        monkeypatch.setenv(
            "TEST_HEADERS", "X-Token=abc=def,X-URL=https://api.example.com/v1?key=value"
        )
        assert get_custom_headers("TEST_HEADERS") == {
            "X-Token": "abc=def",
            "X-URL": "https://api.example.com/v1?key=value",
        }

    def test_malformed_headers(self, monkeypatch):
        """Entries without an '=' are silently dropped."""
        monkeypatch.setenv("TEST_HEADERS", "invalid-header-format")
        assert get_custom_headers("TEST_HEADERS") == {}

        monkeypatch.setenv(
            "TEST_HEADERS", "X-Valid=value,invalid-header,X-Another=value2"
        )
        assert get_custom_headers("TEST_HEADERS") == {
            "X-Valid": "value",
            "X-Another": "value2",
        }

    def test_empty_key_or_value(self, monkeypatch):
        """Empty keys are skipped; empty values are kept."""
        monkeypatch.setenv("TEST_HEADERS", "=value")
        assert get_custom_headers("TEST_HEADERS") == {}

        monkeypatch.setenv("TEST_HEADERS", "X-Empty=")
        assert get_custom_headers("TEST_HEADERS") == {"X-Empty": ""}

        monkeypatch.setenv("TEST_HEADERS", "  =value")
        assert get_custom_headers("TEST_HEADERS") == {}

        monkeypatch.setenv("TEST_HEADERS", "=empty_key,X-Valid=value, =another_empty")
        assert get_custom_headers("TEST_HEADERS") == {"X-Valid": "value"}

    def test_special_characters_in_values(self, monkeypatch):
        """Punctuation-heavy values survive parsing unchanged."""
        monkeypatch.setenv("TEST_HEADERS", "X-Special=!@#$%^&*()_+-[]{}|;':\"/<>?")
        assert get_custom_headers("TEST_HEADERS") == {
            "X-Special": "!@#$%^&*()_+-[]{}|;':\"/<>?"
        }

    def test_unicode_characters(self, monkeypatch):
        """Non-ASCII keys and values round-trip intact."""
        monkeypatch.setenv("TEST_HEADERS", "X-Unicode=café,X-Emoji=🚀")
        assert get_custom_headers("TEST_HEADERS") == {
            "X-Unicode": "café",
            "X-Emoji": "🚀",
        }

    def test_empty_pairs_in_list(self, monkeypatch):
        """Empty segments between commas are ignored."""
        monkeypatch.setenv("TEST_HEADERS", "X-First=value1,,X-Second=value2,")
        assert get_custom_headers("TEST_HEADERS") == {
            "X-First": "value1",
            "X-Second": "value2",
        }

        monkeypatch.setenv("TEST_HEADERS", ",,,")
        assert get_custom_headers("TEST_HEADERS") == {}

    def test_complex_real_world_example(self, monkeypatch):
        """A realistic multi-header string parses into the expected mapping."""
        raw = (
            "Authorization=Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9,"
            "X-API-Key=sk-1234567890abcdef,"
            "X-Request-ID=req_123456789,"
            "X-Custom-Header=value with spaces and = signs,"
            "User-Agent=MyApp/1.0 (Custom Agent)"
        )
        monkeypatch.setenv("TEST_HEADERS", raw)
        assert get_custom_headers("TEST_HEADERS") == {
            "Authorization": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9",
            "X-API-Key": "sk-1234567890abcdef",
            "X-Request-ID": "req_123456789",
            "X-Custom-Header": "value with spaces and = signs",
            "User-Agent": "MyApp/1.0 (Custom Agent)",
        }

    def test_case_sensitive_keys(self, monkeypatch):
        """Header keys keep their original casing and are not merged."""
        monkeypatch.setenv(
            "TEST_HEADERS", "x-lower=value1,X-UPPER=value2,X-Mixed=value3"
        )
        assert get_custom_headers("TEST_HEADERS") == {
            "x-lower": "value1",
            "X-UPPER": "value2",
            "X-Mixed": "value3",
        }

    def test_duplicate_keys(self, monkeypatch):
        """With duplicate keys, the later value wins."""
        monkeypatch.setenv("TEST_HEADERS", "X-Duplicate=first,X-Duplicate=second")
        assert get_custom_headers("TEST_HEADERS") == {"X-Duplicate": "second"}

    def test_newlines_and_tabs_in_input(self, monkeypatch):
        """Newlines and tabs are part of values, not separators."""
        monkeypatch.setenv(
            "TEST_HEADERS", "X-Multi=line1\nline2,X-Tab=value\twith\ttabs"
        )
        assert get_custom_headers("TEST_HEADERS") == {
            "X-Multi": "line1\nline2",
            "X-Tab": "value\twith\ttabs",
        }
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/client.py:
--------------------------------------------------------------------------------
```python
"""Base client module for Confluence API interactions."""
import logging
import os
from atlassian import Confluence
from requests import Session
from ..exceptions import MCPAtlassianAuthenticationError
from ..utils.logging import get_masked_session_headers, log_config_param, mask_sensitive
from ..utils.oauth import configure_oauth_session
from ..utils.ssl import configure_ssl_verification
from .config import ConfluenceConfig
# Configure logging
logger = logging.getLogger("mcp-atlassian")
class ConfluenceClient:
    """Base client for Confluence API interactions."""

    def __init__(self, config: ConfluenceConfig | None = None) -> None:
        """Initialize the Confluence client with given or environment config.

        Args:
            config: Configuration for Confluence client. If None, will load from
                environment.

        Raises:
            ValueError: If configuration is invalid or environment variables are missing
            MCPAtlassianAuthenticationError: If OAuth authentication fails
        """
        self.config = config or ConfluenceConfig.from_env()

        # Initialize the Confluence client based on auth type
        if self.config.auth_type == "oauth":
            if not self.config.oauth_config or not self.config.oauth_config.cloud_id:
                error_msg = "OAuth authentication requires a valid cloud_id"
                raise ValueError(error_msg)
            # Create a session for OAuth
            session = Session()
            # Configure the session with OAuth authentication
            if not configure_oauth_session(session, self.config.oauth_config):
                error_msg = "Failed to configure OAuth session"
                raise MCPAtlassianAuthenticationError(error_msg)
            # The Confluence API URL with OAuth is different: requests are routed
            # through api.atlassian.com using the tenant's cloud_id.
            api_url = f"https://api.atlassian.com/ex/confluence/{self.config.oauth_config.cloud_id}"
            # Initialize Confluence with the session
            self.confluence = Confluence(
                url=api_url,
                session=session,
                cloud=True,  # OAuth is only for Cloud
                verify_ssl=self.config.ssl_verify,
            )
        elif self.config.auth_type == "pat":
            logger.debug(
                f"Initializing Confluence client with Token (PAT) auth. "
                f"URL: {self.config.url}, "
                f"Token (masked): {mask_sensitive(str(self.config.personal_token))}"
            )
            self.confluence = Confluence(
                url=self.config.url,
                token=self.config.personal_token,
                cloud=self.config.is_cloud,
                verify_ssl=self.config.ssl_verify,
            )
        else:  # basic auth
            logger.debug(
                f"Initializing Confluence client with Basic auth. "
                f"URL: {self.config.url}, Username: {self.config.username}, "
                f"API Token present: {bool(self.config.api_token)}, "
                f"Is Cloud: {self.config.is_cloud}"
            )
            self.confluence = Confluence(
                url=self.config.url,
                username=self.config.username,
                password=self.config.api_token,  # API token is used as password
                cloud=self.config.is_cloud,
                verify_ssl=self.config.ssl_verify,
            )
            logger.debug(
                f"Confluence client initialized. "
                f"Session headers (Authorization masked): "
                f"{get_masked_session_headers(dict(self.confluence._session.headers))}"
            )

        # Configure SSL verification using the shared utility
        configure_ssl_verification(
            service_name="Confluence",
            url=self.config.url,
            session=self.confluence._session,
            ssl_verify=self.config.ssl_verify,
        )

        # Proxy configuration: collected into a dict and applied to the session.
        proxies = {}
        if self.config.http_proxy:
            proxies["http"] = self.config.http_proxy
        if self.config.https_proxy:
            proxies["https"] = self.config.https_proxy
        if self.config.socks_proxy:
            # NOTE(review): requests selects proxies by URL scheme ("http"/"https");
            # a "socks" key is normally ignored by requests — confirm this is
            # intentional or whether the SOCKS URL should be set for http/https.
            proxies["socks"] = self.config.socks_proxy
        if proxies:
            self.confluence._session.proxies.update(proxies)
            for k, v in proxies.items():
                log_config_param(
                    logger, "Confluence", f"{k.upper()}_PROXY", v, sensitive=True
                )
        if self.config.no_proxy and isinstance(self.config.no_proxy, str):
            # NOTE: mutates the process-wide environment, not just this session.
            os.environ["NO_PROXY"] = self.config.no_proxy
            log_config_param(logger, "Confluence", "NO_PROXY", self.config.no_proxy)

        # Apply custom headers if configured
        if self.config.custom_headers:
            self._apply_custom_headers()

        # Import here to avoid circular imports
        from ..preprocessing.confluence import ConfluencePreprocessor

        self.preprocessor = ConfluencePreprocessor(base_url=self.config.url)

        # Test authentication during initialization (in debug mode only)
        if logger.isEnabledFor(logging.DEBUG):
            try:
                self._validate_authentication()
            except MCPAtlassianAuthenticationError:
                # Validation is best-effort in debug mode; failures do not abort init.
                logger.warning(
                    "Authentication validation failed during client initialization - "
                    "continuing anyway"
                )

    def _validate_authentication(self) -> None:
        """Validate authentication by making a simple API call.

        Raises:
            MCPAtlassianAuthenticationError: If the test API call fails.
        """
        try:
            logger.debug(
                "Testing Confluence authentication by making a simple API call..."
            )
            # Make a simple API call to test authentication
            spaces = self.confluence.get_all_spaces(start=0, limit=1)
            if spaces is not None:
                logger.info(
                    f"Confluence authentication successful. "
                    f"API call returned {len(spaces.get('results', []))} spaces."
                )
            else:
                logger.warning(
                    "Confluence authentication test returned None - "
                    "this may indicate an issue"
                )
        except Exception as e:
            error_msg = f"Confluence authentication validation failed: {e}"
            logger.error(error_msg)
            logger.debug(
                f"Authentication headers during failure: "
                f"{get_masked_session_headers(dict(self.confluence._session.headers))}"
            )
            raise MCPAtlassianAuthenticationError(error_msg) from e

    def _apply_custom_headers(self) -> None:
        """Apply custom headers to the Confluence session."""
        if not self.config.custom_headers:
            return
        logger.debug(
            f"Applying {len(self.config.custom_headers)} custom headers to Confluence session"
        )
        for header_name, header_value in self.config.custom_headers.items():
            self.confluence._session.headers[header_name] = header_value
            logger.debug(f"Applied custom header: {header_name}")

    def _process_html_content(
        self, html_content: str, space_key: str
    ) -> tuple[str, str]:
        """Process HTML content into both HTML and markdown formats.

        Args:
            html_content: Raw HTML content from Confluence
            space_key: The key of the space containing the content

        Returns:
            Tuple of (processed_html, processed_markdown)
        """
        return self.preprocessor.process_html_content(
            html_content, space_key, self.confluence
        )
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/config.py:
--------------------------------------------------------------------------------
```python
"""Configuration module for Jira API interactions."""
import logging
import os
from dataclasses import dataclass
from typing import Literal
from ..utils.env import get_custom_headers, is_env_ssl_verify
from ..utils.oauth import (
BYOAccessTokenOAuthConfig,
OAuthConfig,
get_oauth_config_from_env,
)
from ..utils.urls import is_atlassian_cloud_url
@dataclass
class JiraConfig:
    """Jira API configuration.

    Handles authentication for Jira Cloud and Server/Data Center:
    - Cloud: username/API token (basic auth) or OAuth 2.0 (3LO)
    - Server/DC: personal access token or basic auth
    """

    url: str  # Base URL for Jira
    auth_type: Literal["basic", "pat", "oauth"]  # Authentication type
    username: str | None = None  # Email or username (Cloud)
    api_token: str | None = None  # API token (Cloud)
    personal_token: str | None = None  # Personal access token (Server/DC)
    oauth_config: OAuthConfig | BYOAccessTokenOAuthConfig | None = None
    ssl_verify: bool = True  # Whether to verify SSL certificates
    projects_filter: str | None = None  # List of project keys to filter searches
    http_proxy: str | None = None  # HTTP proxy URL
    https_proxy: str | None = None  # HTTPS proxy URL
    no_proxy: str | None = None  # Comma-separated list of hosts to bypass proxy
    socks_proxy: str | None = None  # SOCKS proxy URL (optional)
    custom_headers: dict[str, str] | None = None  # Custom HTTP headers

    @property
    def is_cloud(self) -> bool:
        """Check if this is a cloud instance.

        Returns:
            True if this is a cloud instance (atlassian.net), False otherwise.
            Localhost URLs are always considered non-cloud (Server/Data Center).
        """
        # Multi-Cloud OAuth mode: URL might be None, but we use api.atlassian.com
        if (
            self.auth_type == "oauth"
            and self.oauth_config
            and self.oauth_config.cloud_id
        ):
            # OAuth with cloud_id uses api.atlassian.com which is always Cloud
            return True

        # For other auth types, check the URL
        return is_atlassian_cloud_url(self.url) if self.url else False

    @property
    def verify_ssl(self) -> bool:
        """Compatibility property for old code.

        Returns:
            The ssl_verify value
        """
        return self.ssl_verify

    @classmethod
    def from_env(cls) -> "JiraConfig":
        """Create configuration from environment variables.

        Returns:
            JiraConfig with values from environment variables

        Raises:
            ValueError: If required environment variables are missing or invalid
        """
        # JIRA_URL may legitimately be absent in multi-cloud OAuth mode
        # (ATLASSIAN_OAUTH_ENABLE set), where api.atlassian.com is used instead.
        url = os.getenv("JIRA_URL")
        if not url and not os.getenv("ATLASSIAN_OAUTH_ENABLE"):
            error_msg = "Missing required JIRA_URL environment variable"
            raise ValueError(error_msg)

        # Determine authentication type based on available environment variables
        username = os.getenv("JIRA_USERNAME")
        api_token = os.getenv("JIRA_API_TOKEN")
        personal_token = os.getenv("JIRA_PERSONAL_TOKEN")

        # Check for OAuth configuration
        oauth_config = get_oauth_config_from_env()
        auth_type = None

        # Use the shared utility function directly
        # NOTE(review): url can be None here (OAuth-enabled mode) — assumes
        # is_atlassian_cloud_url tolerates None; confirm.
        is_cloud = is_atlassian_cloud_url(url)

        if oauth_config:
            # OAuth is available - could be full config or minimal config for user-provided tokens
            auth_type = "oauth"
        elif is_cloud:
            if username and api_token:
                auth_type = "basic"
            else:
                error_msg = "Cloud authentication requires JIRA_USERNAME and JIRA_API_TOKEN, or OAuth configuration (set ATLASSIAN_OAUTH_ENABLE=true for user-provided tokens)"
                raise ValueError(error_msg)
        else:  # Server/Data Center
            if personal_token:
                auth_type = "pat"
            elif username and api_token:
                # Allow basic auth for Server/DC too
                auth_type = "basic"
            else:
                error_msg = "Server/Data Center authentication requires JIRA_PERSONAL_TOKEN or JIRA_USERNAME and JIRA_API_TOKEN"
                raise ValueError(error_msg)

        # SSL verification (for Server/DC)
        ssl_verify = is_env_ssl_verify("JIRA_SSL_VERIFY")

        # Get the projects filter if provided
        projects_filter = os.getenv("JIRA_PROJECTS_FILTER")

        # Proxy settings: service-specific variables take precedence over globals.
        http_proxy = os.getenv("JIRA_HTTP_PROXY", os.getenv("HTTP_PROXY"))
        https_proxy = os.getenv("JIRA_HTTPS_PROXY", os.getenv("HTTPS_PROXY"))
        no_proxy = os.getenv("JIRA_NO_PROXY", os.getenv("NO_PROXY"))
        socks_proxy = os.getenv("JIRA_SOCKS_PROXY", os.getenv("SOCKS_PROXY"))

        # Custom headers - service-specific only
        custom_headers = get_custom_headers("JIRA_CUSTOM_HEADERS")

        return cls(
            url=url,
            auth_type=auth_type,
            username=username,
            api_token=api_token,
            personal_token=personal_token,
            oauth_config=oauth_config,
            ssl_verify=ssl_verify,
            projects_filter=projects_filter,
            http_proxy=http_proxy,
            https_proxy=https_proxy,
            no_proxy=no_proxy,
            socks_proxy=socks_proxy,
            custom_headers=custom_headers,
        )

    def is_auth_configured(self) -> bool:
        """Check if the current authentication configuration is complete and valid for making API calls.

        Returns:
            bool: True if authentication is fully configured, False otherwise.
        """
        logger = logging.getLogger("mcp-atlassian.jira.config")
        if self.auth_type == "oauth":
            # Handle different OAuth configuration types
            if self.oauth_config:
                # Full OAuth configuration (traditional mode)
                if isinstance(self.oauth_config, OAuthConfig):
                    if (
                        self.oauth_config.client_id
                        and self.oauth_config.client_secret
                        and self.oauth_config.redirect_uri
                        and self.oauth_config.scope
                        and self.oauth_config.cloud_id
                    ):
                        return True
                    # Minimal OAuth configuration (user-provided tokens mode)
                    # This is valid if we have oauth_config but missing client credentials
                    # In this case, we expect authentication to come from user-provided headers
                    elif (
                        not self.oauth_config.client_id
                        and not self.oauth_config.client_secret
                    ):
                        logger.debug(
                            "Minimal OAuth config detected - expecting user-provided tokens via headers"
                        )
                        return True
                # Bring Your Own Access Token mode
                elif isinstance(self.oauth_config, BYOAccessTokenOAuthConfig):
                    if self.oauth_config.cloud_id and self.oauth_config.access_token:
                        return True
            # Partial configuration is invalid
            logger.warning("Incomplete OAuth configuration detected")
            return False
        elif self.auth_type == "pat":
            return bool(self.personal_token)
        elif self.auth_type == "basic":
            return bool(self.username and self.api_token)
        logger.warning(
            f"Unknown or unsupported auth_type: {self.auth_type} in JiraConfig"
        )
        return False
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/config.py:
--------------------------------------------------------------------------------
```python
"""Configuration module for the Confluence client."""
import logging
import os
from dataclasses import dataclass
from typing import Literal
from ..utils.env import get_custom_headers, is_env_ssl_verify
from ..utils.oauth import (
BYOAccessTokenOAuthConfig,
OAuthConfig,
get_oauth_config_from_env,
)
from ..utils.urls import is_atlassian_cloud_url
@dataclass
class ConfluenceConfig:
    """Confluence API configuration.

    Handles authentication for Confluence Cloud and Server/Data Center:
    - Cloud: username/API token (basic auth) or OAuth 2.0 (3LO)
    - Server/DC: personal access token or basic auth
    """

    url: str  # Base URL for Confluence
    auth_type: Literal["basic", "pat", "oauth"]  # Authentication type
    username: str | None = None  # Email or username
    api_token: str | None = None  # API token used as password
    personal_token: str | None = None  # Personal access token (Server/DC)
    oauth_config: OAuthConfig | BYOAccessTokenOAuthConfig | None = None
    ssl_verify: bool = True  # Whether to verify SSL certificates
    spaces_filter: str | None = None  # List of space keys to filter searches
    http_proxy: str | None = None  # HTTP proxy URL
    https_proxy: str | None = None  # HTTPS proxy URL
    no_proxy: str | None = None  # Comma-separated list of hosts to bypass proxy
    socks_proxy: str | None = None  # SOCKS proxy URL (optional)
    custom_headers: dict[str, str] | None = None  # Custom HTTP headers

    @property
    def is_cloud(self) -> bool:
        """Check if this is a cloud instance.

        Returns:
            True if this is a cloud instance (atlassian.net), False otherwise.
            Localhost URLs are always considered non-cloud (Server/Data Center).
        """
        # Multi-Cloud OAuth mode: URL might be None, but we use api.atlassian.com
        if (
            self.auth_type == "oauth"
            and self.oauth_config
            and self.oauth_config.cloud_id
        ):
            # OAuth with cloud_id uses api.atlassian.com which is always Cloud
            return True

        # For other auth types, check the URL
        return is_atlassian_cloud_url(self.url) if self.url else False

    @property
    def verify_ssl(self) -> bool:
        """Compatibility property for old code.

        Returns:
            The ssl_verify value
        """
        return self.ssl_verify

    @classmethod
    def from_env(cls) -> "ConfluenceConfig":
        """Create configuration from environment variables.

        Returns:
            ConfluenceConfig with values from environment variables

        Raises:
            ValueError: If any required environment variable is missing
        """
        # CONFLUENCE_URL may legitimately be absent in multi-cloud OAuth mode
        # (ATLASSIAN_OAUTH_ENABLE set), where api.atlassian.com is used instead.
        url = os.getenv("CONFLUENCE_URL")
        if not url and not os.getenv("ATLASSIAN_OAUTH_ENABLE"):
            error_msg = "Missing required CONFLUENCE_URL environment variable"
            raise ValueError(error_msg)

        # Determine authentication type based on available environment variables
        username = os.getenv("CONFLUENCE_USERNAME")
        api_token = os.getenv("CONFLUENCE_API_TOKEN")
        personal_token = os.getenv("CONFLUENCE_PERSONAL_TOKEN")

        # Check for OAuth configuration
        oauth_config = get_oauth_config_from_env()
        auth_type = None

        # Use the shared utility function directly
        # NOTE(review): url can be None here (OAuth-enabled mode) — assumes
        # is_atlassian_cloud_url tolerates None; confirm.
        is_cloud = is_atlassian_cloud_url(url)

        if oauth_config:
            # OAuth is available - could be full config or minimal config for user-provided tokens
            auth_type = "oauth"
        elif is_cloud:
            if username and api_token:
                auth_type = "basic"
            else:
                error_msg = "Cloud authentication requires CONFLUENCE_USERNAME and CONFLUENCE_API_TOKEN, or OAuth configuration (set ATLASSIAN_OAUTH_ENABLE=true for user-provided tokens)"
                raise ValueError(error_msg)
        else:  # Server/Data Center
            if personal_token:
                auth_type = "pat"
            elif username and api_token:
                # Allow basic auth for Server/DC too
                auth_type = "basic"
            else:
                error_msg = "Server/Data Center authentication requires CONFLUENCE_PERSONAL_TOKEN or CONFLUENCE_USERNAME and CONFLUENCE_API_TOKEN"
                raise ValueError(error_msg)

        # SSL verification (for Server/DC)
        ssl_verify = is_env_ssl_verify("CONFLUENCE_SSL_VERIFY")

        # Get the spaces filter if provided
        spaces_filter = os.getenv("CONFLUENCE_SPACES_FILTER")

        # Proxy settings: service-specific variables take precedence over globals.
        http_proxy = os.getenv("CONFLUENCE_HTTP_PROXY", os.getenv("HTTP_PROXY"))
        https_proxy = os.getenv("CONFLUENCE_HTTPS_PROXY", os.getenv("HTTPS_PROXY"))
        no_proxy = os.getenv("CONFLUENCE_NO_PROXY", os.getenv("NO_PROXY"))
        socks_proxy = os.getenv("CONFLUENCE_SOCKS_PROXY", os.getenv("SOCKS_PROXY"))

        # Custom headers - service-specific only
        custom_headers = get_custom_headers("CONFLUENCE_CUSTOM_HEADERS")

        return cls(
            url=url,
            auth_type=auth_type,
            username=username,
            api_token=api_token,
            personal_token=personal_token,
            oauth_config=oauth_config,
            ssl_verify=ssl_verify,
            spaces_filter=spaces_filter,
            http_proxy=http_proxy,
            https_proxy=https_proxy,
            no_proxy=no_proxy,
            socks_proxy=socks_proxy,
            custom_headers=custom_headers,
        )

    def is_auth_configured(self) -> bool:
        """Check if the current authentication configuration is complete and valid for making API calls.

        Returns:
            bool: True if authentication is fully configured, False otherwise.
        """
        logger = logging.getLogger("mcp-atlassian.confluence.config")
        if self.auth_type == "oauth":
            # Handle different OAuth configuration types
            if self.oauth_config:
                # Full OAuth configuration (traditional mode)
                if isinstance(self.oauth_config, OAuthConfig):
                    if (
                        self.oauth_config.client_id
                        and self.oauth_config.client_secret
                        and self.oauth_config.redirect_uri
                        and self.oauth_config.scope
                        and self.oauth_config.cloud_id
                    ):
                        return True
                    # Minimal OAuth configuration (user-provided tokens mode)
                    # This is valid if we have oauth_config but missing client credentials
                    # In this case, we expect authentication to come from user-provided headers
                    elif (
                        not self.oauth_config.client_id
                        and not self.oauth_config.client_secret
                    ):
                        logger.debug(
                            "Minimal OAuth config detected - expecting user-provided tokens via headers"
                        )
                        return True
                # Bring Your Own Access Token mode
                elif isinstance(self.oauth_config, BYOAccessTokenOAuthConfig):
                    if self.oauth_config.cloud_id and self.oauth_config.access_token:
                        return True
            # Partial configuration is invalid
            logger.warning("Incomplete OAuth configuration detected")
            return False
        elif self.auth_type == "pat":
            return bool(self.personal_token)
        elif self.auth_type == "basic":
            return bool(self.username and self.api_token)
        logger.warning(
            f"Unknown or unsupported auth_type: {self.auth_type} in ConfluenceConfig"
        )
        return False
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/models/jira/link.py:
--------------------------------------------------------------------------------
```python
"""
Jira issue link models.
This module provides Pydantic models for Jira issue links and link types.
"""
import logging
from typing import Any
from ..base import ApiModel
from ..constants import EMPTY_STRING, JIRA_DEFAULT_ID, UNKNOWN
from .common import JiraIssueType, JiraPriority, JiraStatus
logger = logging.getLogger(__name__)
class JiraIssueLinkType(ApiModel):
    """
    Model representing a Jira issue link type.
    """

    id: str = JIRA_DEFAULT_ID
    name: str = UNKNOWN
    inward: str = EMPTY_STRING
    outward: str = EMPTY_STRING
    self_url: str | None = None

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "JiraIssueLinkType":
        """
        Create a JiraIssueLinkType from a Jira API response.

        Args:
            data: The issue link type data from the Jira API

        Returns:
            A JiraIssueLinkType instance (defaults when data is empty or not a dict)
        """
        if not data:
            return cls()
        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        # The API may return a numeric id or an explicit null. Normalize both to
        # a non-None string so the `id: str` field always validates (previously an
        # explicit null slipped through as None and failed validation).
        link_type_id = data.get("id", JIRA_DEFAULT_ID)
        if link_type_id is None:
            link_type_id = JIRA_DEFAULT_ID
        link_type_id = str(link_type_id)

        return cls(
            id=link_type_id,
            name=str(data.get("name", UNKNOWN)),
            inward=str(data.get("inward", EMPTY_STRING)),
            outward=str(data.get("outward", EMPTY_STRING)),
            self_url=data.get("self"),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response.

        Returns:
            Dict with id/name/inward/outward, plus "self" when a URL is present.
        """
        result = {
            "id": self.id,
            "name": self.name,
            "inward": self.inward,
            "outward": self.outward,
        }
        if self.self_url:
            result["self"] = self.self_url
        return result
class JiraLinkedIssueFields(ApiModel):
    """
    Model representing the fields of a linked issue.
    """

    summary: str = EMPTY_STRING
    status: JiraStatus | None = None
    priority: JiraPriority | None = None
    issuetype: JiraIssueType | None = None

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "JiraLinkedIssueFields":
        """
        Create a JiraLinkedIssueFields from a Jira API response.

        Args:
            data: The linked issue fields data from the Jira API

        Returns:
            A JiraLinkedIssueFields instance (defaults when data is empty/invalid)
        """
        if not data:
            return cls()
        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        # Extract status data
        status = None
        status_data = data.get("status")
        if status_data:
            status = JiraStatus.from_api_response(status_data)

        # Extract priority data
        priority = None
        priority_data = data.get("priority")
        if priority_data:
            priority = JiraPriority.from_api_response(priority_data)

        # Extract issue type data
        issuetype = None
        issuetype_data = data.get("issuetype")
        if issuetype_data:
            issuetype = JiraIssueType.from_api_response(issuetype_data)

        # Avoid str(None) -> "None": an explicit JSON null summary previously
        # produced the literal string "None" instead of the empty default.
        summary_value = data.get("summary")
        summary = str(summary_value) if summary_value is not None else EMPTY_STRING

        return cls(
            summary=summary,
            status=status,
            priority=priority,
            issuetype=issuetype,
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        result = {
            "summary": self.summary,
        }
        # Nested models are flattened via their own simplified forms.
        if self.status:
            result["status"] = self.status.to_simplified_dict()
        if self.priority:
            result["priority"] = self.priority.to_simplified_dict()
        if self.issuetype:
            result["issuetype"] = self.issuetype.to_simplified_dict()
        return result
class JiraLinkedIssue(ApiModel):
    """
    Model representing a linked issue in Jira.
    """

    id: str = JIRA_DEFAULT_ID
    key: str = EMPTY_STRING
    self_url: str | None = None
    fields: JiraLinkedIssueFields | None = None

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "JiraLinkedIssue":
        """
        Create a JiraLinkedIssue from a Jira API response.

        Args:
            data: The linked issue data from the Jira API

        Returns:
            A JiraLinkedIssue instance (defaults when data is empty/invalid)
        """
        if not data:
            return cls()
        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        # Extract fields data
        fields = None
        fields_data = data.get("fields")
        if fields_data:
            fields = JiraLinkedIssueFields.from_api_response(fields_data)

        # Coerce the ID to a string. Fall back to the default when the API
        # returns an explicit null: the previous guard left None in place,
        # which fails validation on a `str`-typed field.
        issue_id = data.get("id", JIRA_DEFAULT_ID)
        issue_id = str(issue_id) if issue_id is not None else JIRA_DEFAULT_ID

        return cls(
            id=issue_id,
            key=str(data.get("key", EMPTY_STRING)),
            self_url=data.get("self"),
            fields=fields,
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        result = {
            "id": self.id,
            "key": self.key,
        }
        if self.self_url:
            result["self"] = self.self_url
        if self.fields:
            result["fields"] = self.fields.to_simplified_dict()
        return result
class JiraIssueLink(ApiModel):
    """
    Model representing a link between two Jira issues.
    """

    id: str = JIRA_DEFAULT_ID
    type: JiraIssueLinkType | None = None
    inward_issue: JiraLinkedIssue | None = None
    outward_issue: JiraLinkedIssue | None = None

    @classmethod
    def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraIssueLink":
        """
        Create a JiraIssueLink from a Jira API response.

        Args:
            data: The issue link data from the Jira API

        Returns:
            A JiraIssueLink instance (defaults when data is empty/invalid)
        """
        if not data:
            return cls()
        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        # Extract link type data
        link_type = None
        type_data = data.get("type")
        if type_data:
            link_type = JiraIssueLinkType.from_api_response(type_data)

        # A link payload carries "inwardIssue" and/or "outwardIssue" depending
        # on the direction relative to the issue being viewed.
        inward_issue = None
        inward_issue_data = data.get("inwardIssue")
        if inward_issue_data:
            inward_issue = JiraLinkedIssue.from_api_response(inward_issue_data)

        outward_issue = None
        outward_issue_data = data.get("outwardIssue")
        if outward_issue_data:
            outward_issue = JiraLinkedIssue.from_api_response(outward_issue_data)

        # Coerce the ID to a string. Fall back to the default when the API
        # returns an explicit null: the previous guard left None in place,
        # which fails validation on a `str`-typed field.
        link_id = data.get("id", JIRA_DEFAULT_ID)
        link_id = str(link_id) if link_id is not None else JIRA_DEFAULT_ID

        return cls(
            id=link_id,
            type=link_type,
            inward_issue=inward_issue,
            outward_issue=outward_issue,
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        result = {
            "id": self.id,
        }
        if self.type:
            result["type"] = self.type.to_simplified_dict()
        if self.inward_issue:
            result["inward_issue"] = self.inward_issue.to_simplified_dict()
        if self.outward_issue:
            result["outward_issue"] = self.outward_issue.to_simplified_dict()
        return result
```
--------------------------------------------------------------------------------
/tests/unit/test_exceptions.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the exceptions module.
"""
import pickle
import pytest
from src.mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
class TestMCPAtlassianAuthenticationError:
    """Tests for the MCPAtlassianAuthenticationError exception class."""

    def test_instantiation_without_message(self):
        """Test creating exception without a message."""
        exc = MCPAtlassianAuthenticationError()

        assert isinstance(exc, MCPAtlassianAuthenticationError)
        assert isinstance(exc, Exception)
        assert exc.args == ()
        assert str(exc) == ""

    def test_instantiation_with_message(self):
        """Test creating exception with a message."""
        msg = "Authentication failed"
        exc = MCPAtlassianAuthenticationError(msg)

        assert isinstance(exc, MCPAtlassianAuthenticationError)
        assert isinstance(exc, Exception)
        assert exc.args == (msg,)
        assert str(exc) == msg

    def test_instantiation_with_multiple_args(self):
        """Test creating exception with multiple arguments."""
        supplied = ("Authentication failed", 401)
        exc = MCPAtlassianAuthenticationError(*supplied)

        assert isinstance(exc, MCPAtlassianAuthenticationError)
        assert isinstance(exc, Exception)
        assert exc.args == supplied
        # str() of a multi-arg exception is the tuple representation
        assert str(exc) == "('Authentication failed', 401)"

    def test_inheritance_hierarchy(self):
        """Test that the exception properly inherits from Exception."""
        exc = MCPAtlassianAuthenticationError("test")

        for base in (MCPAtlassianAuthenticationError, Exception, BaseException):
            assert isinstance(exc, base)
        assert issubclass(MCPAtlassianAuthenticationError, Exception)
        assert issubclass(MCPAtlassianAuthenticationError, BaseException)

    def test_string_representation(self):
        """Test string representation of the exception."""
        # No args at all
        bare = MCPAtlassianAuthenticationError()
        assert str(bare) == ""
        assert repr(bare) == "MCPAtlassianAuthenticationError()"

        # Single message argument
        text = "Invalid credentials provided"
        single = MCPAtlassianAuthenticationError(text)
        assert str(single) == text
        assert repr(single) == f"MCPAtlassianAuthenticationError('{text}')"

        # Several positional arguments
        multi = MCPAtlassianAuthenticationError("Auth failed", 403)
        assert str(multi) == "('Auth failed', 403)"
        assert repr(multi) == "MCPAtlassianAuthenticationError('Auth failed', 403)"

    def test_exception_raising_and_catching(self):
        """Test raising and catching the exception."""
        msg = "401 Unauthorized"
        with pytest.raises(MCPAtlassianAuthenticationError) as exc_info:
            raise MCPAtlassianAuthenticationError(msg)

        caught = exc_info.value
        assert str(caught) == msg
        assert caught.args == (msg,)

    def test_exception_catching_as_base_exception(self):
        """Test that the exception can be caught as base Exception."""
        msg = "403 Forbidden"
        with pytest.raises(Exception) as exc_info:
            raise MCPAtlassianAuthenticationError(msg)

        caught = exc_info.value
        assert isinstance(caught, MCPAtlassianAuthenticationError)
        assert str(caught) == msg

    def test_exception_chaining_with_cause(self):
        """Test exception chaining using 'raise from' syntax."""
        cause = ValueError("Invalid token format")
        msg = "Authentication failed due to invalid token"

        with pytest.raises(MCPAtlassianAuthenticationError) as exc_info:
            try:
                raise cause
            except ValueError as e:
                raise MCPAtlassianAuthenticationError(msg) from e

        raised = exc_info.value
        assert str(raised) == msg
        assert raised.__cause__ is cause
        # Context is still preserved even with explicit 'raise from'
        assert raised.__context__ is cause

    def test_exception_chaining_with_context(self):
        """Test implicit exception chaining (context preservation)."""
        trigger = ConnectionError("Network timeout")
        msg = "Authentication failed"

        with pytest.raises(MCPAtlassianAuthenticationError) as exc_info:
            try:
                raise trigger
            except ConnectionError:
                raise MCPAtlassianAuthenticationError(msg) from None

        raised = exc_info.value
        assert str(raised) == msg
        assert raised.__context__ is trigger
        assert raised.__cause__ is None

    def test_exception_suppressed_context(self):
        """Test exception with suppressed context."""
        trigger = RuntimeError("Some runtime error")
        msg = "Authentication failed"

        with pytest.raises(MCPAtlassianAuthenticationError) as exc_info:
            try:
                raise trigger
            except RuntimeError:
                suppressed = MCPAtlassianAuthenticationError(msg)
                suppressed.__suppress_context__ = True
                raise suppressed from None

        assert str(exc_info.value) == msg
        assert exc_info.value.__suppress_context__ is True

    def test_serialization_with_pickle(self):
        """Test that the exception can be pickled and unpickled."""
        msg = "Authentication error for serialization test"
        source = MCPAtlassianAuthenticationError(msg)

        # Round-trip through pickle
        restored = pickle.loads(pickle.dumps(source))

        assert isinstance(restored, MCPAtlassianAuthenticationError)
        assert str(restored) == msg
        assert restored.args == source.args

    def test_exception_attributes_access(self):
        """Test accessing exception attributes."""
        exc = MCPAtlassianAuthenticationError("Test message")

        # Standard exception attributes are all present
        for attr in (
            "args",
            "__traceback__",
            "__cause__",
            "__context__",
            "__suppress_context__",
        ):
            assert hasattr(exc, attr)

        # Docstring access
        assert (
            exc.__doc__
            == "Raised when Atlassian API authentication fails (401/403)."
        )

    def test_exception_equality(self):
        """Test exception equality comparison."""
        msg = "Same message"
        first = MCPAtlassianAuthenticationError(msg)
        second = MCPAtlassianAuthenticationError(msg)
        other = MCPAtlassianAuthenticationError("Different message")

        # Same args compare equal, but the instances are distinct objects
        assert first.args == second.args
        assert first is not second
        assert first.args != other.args

    def test_realistic_authentication_scenarios(self):
        """Test realistic authentication error scenarios."""
        scenarios = [
            ("401 Unauthorized: Invalid API token", ("401", "Invalid API token")),
            ("403 Forbidden: Insufficient permissions", ("403", "Insufficient permissions")),
            ("OAuth token has expired", ("OAuth", "expired")),
        ]
        for message, expected_fragments in scenarios:
            rendered = str(MCPAtlassianAuthenticationError(message))
            for fragment in expected_fragments:
                assert fragment in rendered
```
--------------------------------------------------------------------------------
/tests/unit/utils/test_env.py:
--------------------------------------------------------------------------------
```python
"""Tests for environment variable utility functions."""
from mcp_atlassian.utils.env import (
is_env_extended_truthy,
is_env_ssl_verify,
is_env_truthy,
)
class TestIsEnvTruthy:
    """Test the is_env_truthy function."""

    def test_standard_truthy_values(self, monkeypatch):
        """Test standard truthy values: 'true', '1', 'yes'."""
        base_values = ["true", "1", "yes"]
        # Lowercase, uppercase, and capitalized spellings must all match.
        spellings = (
            base_values
            + [v.upper() for v in base_values]
            + [v.capitalize() for v in base_values]
        )
        for spelling in spellings:
            monkeypatch.setenv("TEST_VAR", spelling)
            assert is_env_truthy("TEST_VAR") is True

    def test_standard_falsy_values(self, monkeypatch):
        """Test that standard falsy values return False."""
        for candidate in ("false", "0", "no", "", "invalid", "y", "on"):
            monkeypatch.setenv("TEST_VAR", candidate)
            assert is_env_truthy("TEST_VAR") is False

    def test_unset_variable_with_default(self, monkeypatch):
        """Test behavior when variable is unset with various defaults."""
        monkeypatch.delenv("TEST_VAR", raising=False)

        # No default supplied -> empty string -> falsy
        assert is_env_truthy("TEST_VAR") is False

        # Truthy defaults
        for default in ("true", "1", "yes"):
            assert is_env_truthy("TEST_VAR", default) is True

        # Falsy defaults
        for default in ("false", "0"):
            assert is_env_truthy("TEST_VAR", default) is False

    def test_empty_string_environment_variable(self, monkeypatch):
        """Test behavior when environment variable is set to empty string."""
        monkeypatch.setenv("TEST_VAR", "")
        assert is_env_truthy("TEST_VAR") is False
class TestIsEnvExtendedTruthy:
    """Test the is_env_extended_truthy function."""

    def test_extended_truthy_values(self, monkeypatch):
        """Test extended truthy values: 'true', '1', 'yes', 'y', 'on'."""
        accepted = ["true", "1", "yes", "y", "on"]
        # Case-insensitive: check lower, upper and capitalized spellings.
        spellings = (
            accepted
            + [v.upper() for v in accepted]
            + [v.capitalize() for v in accepted]
        )
        for spelling in spellings:
            monkeypatch.setenv("TEST_VAR", spelling)
            assert is_env_extended_truthy("TEST_VAR") is True

    def test_extended_falsy_values(self, monkeypatch):
        """Test that extended falsy values return False."""
        for candidate in ("false", "0", "no", "", "invalid", "off"):
            monkeypatch.setenv("TEST_VAR", candidate)
            assert is_env_extended_truthy("TEST_VAR") is False

    def test_extended_vs_standard_difference(self, monkeypatch):
        """Test that extended truthy accepts 'y' and 'on' while standard doesn't."""
        for candidate in ("y", "on"):
            monkeypatch.setenv("TEST_VAR", candidate)
            # Extended accepts these; the standard helper rejects them.
            assert is_env_extended_truthy("TEST_VAR") is True
            assert is_env_truthy("TEST_VAR") is False

    def test_unset_variable_with_default(self, monkeypatch):
        """Test behavior when variable is unset with various defaults."""
        monkeypatch.delenv("TEST_VAR", raising=False)

        # No default supplied -> empty string -> falsy
        assert is_env_extended_truthy("TEST_VAR") is False

        # Truthy defaults, including the extended-only spellings
        for default in ("true", "y", "on"):
            assert is_env_extended_truthy("TEST_VAR", default) is True

        # Falsy default
        assert is_env_extended_truthy("TEST_VAR", "false") is False
class TestIsEnvSslVerify:
    """Test the is_env_ssl_verify function."""

    def test_ssl_verify_default_true(self, monkeypatch):
        """Test that SSL verification defaults to True when unset."""
        monkeypatch.delenv("TEST_VAR", raising=False)
        assert is_env_ssl_verify("TEST_VAR") is True

    def test_ssl_verify_explicit_false_values(self, monkeypatch):
        """Test that SSL verification is False only for explicit false values."""
        disabling = ["false", "0", "no"]
        # Case-insensitive: lower, upper and capitalized spellings all disable.
        spellings = (
            disabling
            + [v.upper() for v in disabling]
            + [v.capitalize() for v in disabling]
        )
        for spelling in spellings:
            monkeypatch.setenv("TEST_VAR", spelling)
            assert is_env_ssl_verify("TEST_VAR") is False

    def test_ssl_verify_truthy_and_other_values(self, monkeypatch):
        """Test that SSL verification is True for truthy and other values."""
        candidates = ("true", "1", "yes", "y", "on", "enable", "enabled", "anything")
        for candidate in candidates:
            monkeypatch.setenv("TEST_VAR", candidate)
            assert is_env_ssl_verify("TEST_VAR") is True

    def test_ssl_verify_custom_default(self, monkeypatch):
        """Test SSL verification with custom defaults."""
        monkeypatch.delenv("TEST_VAR", raising=False)

        assert is_env_ssl_verify("TEST_VAR", "true") is True
        assert is_env_ssl_verify("TEST_VAR", "false") is False
        # Any default outside the explicit false values keeps verification on.
        assert is_env_ssl_verify("TEST_VAR", "anything") is True

    def test_ssl_verify_empty_string(self, monkeypatch):
        """Test SSL verification when set to empty string."""
        monkeypatch.setenv("TEST_VAR", "")
        # Empty string is not an explicit false value, so verification stays on.
        assert is_env_ssl_verify("TEST_VAR") is True
class TestEdgeCases:
    """Test edge cases and special scenarios."""

    def test_whitespace_handling(self, monkeypatch):
        """Test that whitespace in values is not stripped."""
        # Values with leading/trailing whitespace should not match
        monkeypatch.setenv("TEST_VAR", " true ")
        assert is_env_truthy("TEST_VAR") is False
        assert is_env_extended_truthy("TEST_VAR") is False

        monkeypatch.setenv("TEST_VAR", " false ")
        assert is_env_ssl_verify("TEST_VAR") is True  # Not in false values

    def test_special_characters(self, monkeypatch):
        """Test behavior with special characters."""
        special_values = ["true!", "@yes", "1.0", "y,", "on;"]
        for value in special_values:
            monkeypatch.setenv("TEST_VAR", value)
            assert is_env_truthy("TEST_VAR") is False
            assert is_env_extended_truthy("TEST_VAR") is False
            assert is_env_ssl_verify("TEST_VAR") is True  # Not in false values

    def test_unicode_values(self, monkeypatch):
        """Test behavior with unicode values."""
        unicode_values = ["truë", "yés", "1️⃣"]
        for value in unicode_values:
            monkeypatch.setenv("TEST_VAR", value)
            assert is_env_truthy("TEST_VAR") is False
            assert is_env_extended_truthy("TEST_VAR") is False
            assert is_env_ssl_verify("TEST_VAR") is True  # Not in false values

    def test_numeric_string_edge_cases(self, monkeypatch):
        """Test numeric string edge cases."""
        # None of these values is exactly "1", so both truthy helpers reject
        # them all. (The previous if/else special-casing "01" had two
        # byte-identical branches and has been removed.)
        numeric_values = ["01", "1.0", "10", "-1", "2"]
        for value in numeric_values:
            monkeypatch.setenv("TEST_VAR", value)
            assert is_env_truthy("TEST_VAR") is False
            assert is_env_extended_truthy("TEST_VAR") is False
            assert is_env_ssl_verify("TEST_VAR") is True  # Not in false values
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/worklog.py:
--------------------------------------------------------------------------------
```python
"""Module for Jira worklog operations."""
import logging
import re
from typing import Any
from ..models import JiraWorklog
from ..utils import parse_date
from .client import JiraClient
logger = logging.getLogger("mcp-jira")
class WorklogMixin(JiraClient):
    """Mixin for Jira worklog operations.

    Adds worklog create/read helpers on top of ``JiraClient``. Some methods
    call helpers expected from sibling mixins when composed into the full
    client (``_markdown_to_jira`` is checked with hasattr before use;
    ``_clean_text`` is assumed present — TODO confirm it is always mixed in).
    """

    def _parse_time_spent(self, time_spent: str) -> int:
        """
        Parse time spent string into seconds.

        Accepts a raw seconds form ("90s"), Jira-style duration components
        ("1w 2d 3h 4m" in any combination), or a bare number. Falls back to
        60 seconds when nothing can be parsed.

        Args:
            time_spent: Time spent string (e.g. 1h 30m, 1d, etc.)

        Returns:
            Time spent in seconds
        """
        # Base case for direct specification in seconds
        if time_spent.endswith("s"):
            try:
                return int(time_spent[:-1])
            except ValueError:
                # Not a pure "<int>s" form; fall through to component parsing
                pass

        total_seconds = 0
        time_units = {
            "w": 7 * 24 * 60 * 60,  # weeks to seconds
            "d": 24 * 60 * 60,  # days to seconds
            "h": 60 * 60,  # hours to seconds
            "m": 60,  # minutes to seconds
        }

        # Regular expression to find time components like 1w, 2d, 3h, 4m
        pattern = r"(\d+)([wdhm])"
        matches = re.findall(pattern, time_spent)

        for value, unit in matches:
            # Convert value to int and multiply by the unit in seconds
            seconds = int(value) * time_units[unit]
            total_seconds += seconds

        if total_seconds == 0:
            # If we couldn't parse anything, try using the raw value
            try:
                return int(float(time_spent))  # Convert to float first, then to int
            except ValueError:
                # If all else fails, default to 60 seconds (1 minute)
                logger.warning(
                    f"Could not parse time: {time_spent}, defaulting to 60 seconds"
                )
                return 60

        return total_seconds

    def add_worklog(
        self,
        issue_key: str,
        time_spent: str,
        comment: str | None = None,
        started: str | None = None,
        original_estimate: str | None = None,
        remaining_estimate: str | None = None,
    ) -> dict[str, Any]:
        """
        Add a worklog entry to a Jira issue.

        The original-estimate update is a separate, best-effort API call made
        before the worklog is posted; its failure is logged but does not abort
        worklog creation (see ``original_estimate_updated`` in the result).

        Args:
            issue_key: The issue key (e.g. 'PROJ-123')
            time_spent: Time spent (e.g. '1h 30m', '3h', '1d')
            comment: Optional comment for the worklog
            started: Optional ISO8601 date time string for when work began
            original_estimate: Optional new value for the original estimate
            remaining_estimate: Optional new value for the remaining estimate

        Returns:
            Response data if successful

        Raises:
            Exception: If there's an error adding the worklog
        """
        try:
            # Convert time_spent string to seconds
            time_spent_seconds = self._parse_time_spent(time_spent)

            # Convert Markdown comment to Jira format if provided
            if comment:
                # Check if _markdown_to_jira is available (from CommentsMixin)
                if hasattr(self, "_markdown_to_jira"):
                    comment = self._markdown_to_jira(comment)

            # Step 1: Update original estimate if provided (separate API call)
            original_estimate_updated = False
            if original_estimate:
                try:
                    fields = {"timetracking": {"originalEstimate": original_estimate}}
                    self.jira.edit_issue(issue_id_or_key=issue_key, fields=fields)
                    original_estimate_updated = True
                    logger.info(f"Updated original estimate for issue {issue_key}")
                except Exception as e:  # noqa: BLE001 - Intentional fallback with logging
                    logger.error(
                        f"Failed to update original estimate for issue {issue_key}: "
                        f"{str(e)}"
                    )
                    # Continue with worklog creation even if estimate update fails

            # Step 2: Prepare worklog data
            worklog_data: dict[str, Any] = {"timeSpentSeconds": time_spent_seconds}
            if comment:
                worklog_data["comment"] = comment
            if started:
                worklog_data["started"] = started

            # Step 3: Prepare query parameters for remaining estimate
            # "adjustEstimate=new" tells Jira to replace the remaining
            # estimate with newEstimate instead of auto-adjusting it.
            params = {}
            remaining_estimate_updated = False
            if remaining_estimate:
                params["adjustEstimate"] = "new"
                params["newEstimate"] = remaining_estimate
                remaining_estimate_updated = True

            # Step 4: Add the worklog with remaining estimate adjustment
            base_url = self.jira.resource_url("issue")
            url = f"{base_url}/{issue_key}/worklog"
            result = self.jira.post(url, data=worklog_data, params=params)
            if not isinstance(result, dict):
                msg = f"Unexpected return value type from `jira.post`: {type(result)}"
                logger.error(msg)
                raise TypeError(msg)

            # Format and return the result
            return {
                "id": result.get("id"),
                "comment": self._clean_text(result.get("comment", "")),
                "created": str(parse_date(result.get("created", ""))),
                "updated": str(parse_date(result.get("updated", ""))),
                "started": str(parse_date(result.get("started", ""))),
                "timeSpent": result.get("timeSpent", ""),
                "timeSpentSeconds": result.get("timeSpentSeconds", 0),
                "author": result.get("author", {}).get("displayName", "Unknown"),
                "original_estimate_updated": original_estimate_updated,
                "remaining_estimate_updated": remaining_estimate_updated,
            }
        except Exception as e:
            # Wrap any failure (parse, API, type check) in a generic Exception
            # with the original chained as __cause__.
            logger.error(f"Error adding worklog to issue {issue_key}: {str(e)}")
            raise Exception(f"Error adding worklog: {str(e)}") from e

    def get_worklog(self, issue_key: str) -> dict[str, Any]:
        """
        Get the worklog data for an issue.

        Best-effort: any error is logged and an empty ``{"worklogs": []}``
        structure is returned instead of raising.

        Args:
            issue_key: The issue key (e.g. 'PROJ-123')

        Returns:
            Raw worklog data from the API
        """
        try:
            return self.jira.worklog(issue_key)  # type: ignore[attr-defined]
        except Exception as e:
            logger.warning(f"Error getting worklog for {issue_key}: {e}")
            return {"worklogs": []}

    def get_worklog_models(self, issue_key: str) -> list[JiraWorklog]:
        """
        Get all worklog entries for an issue as JiraWorklog models.

        Built on :meth:`get_worklog`, so API errors yield an empty list
        rather than an exception.

        Args:
            issue_key: The issue key (e.g. 'PROJ-123')

        Returns:
            List of JiraWorklog models
        """
        worklog_data = self.get_worklog(issue_key)
        result: list[JiraWorklog] = []
        if "worklogs" in worklog_data and worklog_data["worklogs"]:
            for log_data in worklog_data["worklogs"]:
                worklog = JiraWorklog.from_api_response(log_data)
                result.append(worklog)
        return result

    def get_worklogs(self, issue_key: str) -> list[dict[str, Any]]:
        """
        Get all worklog entries for an issue.

        Unlike :meth:`get_worklog`, this raises on failure instead of
        returning an empty structure.

        Args:
            issue_key: The issue key (e.g. 'PROJ-123')

        Returns:
            List of worklog entries

        Raises:
            Exception: If there's an error getting the worklogs
        """
        try:
            result = self.jira.issue_get_worklog(issue_key)
            if not isinstance(result, dict):
                msg = f"Unexpected return value type from `jira.issue_get_worklog`: {type(result)}"
                logger.error(msg)
                raise TypeError(msg)

            # Process the worklogs into simplified dicts with parsed dates
            worklogs = []
            for worklog in result.get("worklogs", []):
                worklogs.append(
                    {
                        "id": worklog.get("id"),
                        "comment": self._clean_text(worklog.get("comment", "")),
                        "created": str(parse_date(worklog.get("created", ""))),
                        "updated": str(parse_date(worklog.get("updated", ""))),
                        "started": str(parse_date(worklog.get("started", ""))),
                        "timeSpent": worklog.get("timeSpent", ""),
                        "timeSpentSeconds": worklog.get("timeSpentSeconds", 0),
                        "author": worklog.get("author", {}).get(
                            "displayName", "Unknown"
                        ),
                    }
                )
            return worklogs
        except Exception as e:
            logger.error(f"Error getting worklogs for issue {issue_key}: {str(e)}")
            raise Exception(f"Error getting worklogs: {str(e)}") from e
```