This is page 3 of 10. Use http://codebase.md/sooperset/mcp-atlassian?page={x} to view the full context.
# Directory Structure
```
├── .devcontainer
│ ├── devcontainer.json
│ ├── Dockerfile
│ ├── post-create.sh
│ └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ └── feature_request.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-publish.yml
│ ├── lint.yml
│ ├── publish.yml
│ ├── stale.yml
│ └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│ ├── oauth_authorize.py
│ └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│ └── mcp_atlassian
│ ├── __init__.py
│ ├── confluence
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── labels.py
│ │ ├── pages.py
│ │ ├── search.py
│ │ ├── spaces.py
│ │ ├── users.py
│ │ ├── utils.py
│ │ └── v2_adapter.py
│ ├── exceptions.py
│ ├── jira
│ │ ├── __init__.py
│ │ ├── attachments.py
│ │ ├── boards.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── epics.py
│ │ ├── fields.py
│ │ ├── formatting.py
│ │ ├── issues.py
│ │ ├── links.py
│ │ ├── projects.py
│ │ ├── protocols.py
│ │ ├── search.py
│ │ ├── sprints.py
│ │ ├── transitions.py
│ │ ├── users.py
│ │ └── worklog.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── comment.py
│ │ │ ├── common.py
│ │ │ ├── label.py
│ │ │ ├── page.py
│ │ │ ├── search.py
│ │ │ ├── space.py
│ │ │ └── user_search.py
│ │ ├── constants.py
│ │ └── jira
│ │ ├── __init__.py
│ │ ├── agile.py
│ │ ├── comment.py
│ │ ├── common.py
│ │ ├── issue.py
│ │ ├── link.py
│ │ ├── project.py
│ │ ├── search.py
│ │ ├── version.py
│ │ ├── workflow.py
│ │ └── worklog.py
│ ├── preprocessing
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence.py
│ │ └── jira.py
│ ├── servers
│ │ ├── __init__.py
│ │ ├── confluence.py
│ │ ├── context.py
│ │ ├── dependencies.py
│ │ ├── jira.py
│ │ └── main.py
│ └── utils
│ ├── __init__.py
│ ├── date.py
│ ├── decorators.py
│ ├── env.py
│ ├── environment.py
│ ├── io.py
│ ├── lifecycle.py
│ ├── logging.py
│ ├── oauth_setup.py
│ ├── oauth.py
│ ├── ssl.py
│ ├── tools.py
│ └── urls.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── fixtures
│ │ ├── __init__.py
│ │ ├── confluence_mocks.py
│ │ └── jira_mocks.py
│ ├── integration
│ │ ├── conftest.py
│ │ ├── README.md
│ │ ├── test_authentication.py
│ │ ├── test_content_processing.py
│ │ ├── test_cross_service.py
│ │ ├── test_mcp_protocol.py
│ │ ├── test_proxy.py
│ │ ├── test_real_api.py
│ │ ├── test_ssl_verification.py
│ │ ├── test_stdin_monitoring_fix.py
│ │ └── test_transport_lifecycle.py
│ ├── README.md
│ ├── test_preprocessing.py
│ ├── test_real_api_validation.py
│ ├── unit
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_labels.py
│ │ │ ├── test_pages.py
│ │ │ ├── test_search.py
│ │ │ ├── test_spaces.py
│ │ │ ├── test_users.py
│ │ │ ├── test_utils.py
│ │ │ └── test_v2_adapter.py
│ │ ├── jira
│ │ │ ├── conftest.py
│ │ │ ├── test_attachments.py
│ │ │ ├── test_boards.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_epics.py
│ │ │ ├── test_fields.py
│ │ │ ├── test_formatting.py
│ │ │ ├── test_issues_markdown.py
│ │ │ ├── test_issues.py
│ │ │ ├── test_links.py
│ │ │ ├── test_projects.py
│ │ │ ├── test_protocols.py
│ │ │ ├── test_search.py
│ │ │ ├── test_sprints.py
│ │ │ ├── test_transitions.py
│ │ │ ├── test_users.py
│ │ │ └── test_worklog.py
│ │ ├── models
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_base_models.py
│ │ │ ├── test_confluence_models.py
│ │ │ ├── test_constants.py
│ │ │ └── test_jira_models.py
│ │ ├── servers
│ │ │ ├── __init__.py
│ │ │ ├── test_confluence_server.py
│ │ │ ├── test_context.py
│ │ │ ├── test_dependencies.py
│ │ │ ├── test_jira_server.py
│ │ │ └── test_main_server.py
│ │ ├── test_exceptions.py
│ │ ├── test_main_transport_selection.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── test_custom_headers.py
│ │ ├── test_date.py
│ │ ├── test_decorators.py
│ │ ├── test_env.py
│ │ ├── test_environment.py
│ │ ├── test_io.py
│ │ ├── test_lifecycle.py
│ │ ├── test_logging.py
│ │ ├── test_masking.py
│ │ ├── test_oauth_setup.py
│ │ ├── test_oauth.py
│ │ ├── test_ssl.py
│ │ ├── test_tools.py
│ │ └── test_urls.py
│ └── utils
│ ├── __init__.py
│ ├── assertions.py
│ ├── base.py
│ ├── factories.py
│ └── mocks.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/src/mcp_atlassian/models/confluence/page.py:
--------------------------------------------------------------------------------
```python
"""
Confluence page models.
This module provides Pydantic models for Confluence pages and their versions.
"""
import logging
import warnings
from typing import Any
from pydantic import Field
from ..base import ApiModel, TimestampMixin
from ..constants import (
CONFLUENCE_DEFAULT_ID,
EMPTY_STRING,
)
# Import other necessary models using relative imports
from .common import ConfluenceAttachment, ConfluenceUser
from .space import ConfluenceSpace
logger = logging.getLogger(__name__)
class ConfluenceVersion(ApiModel, TimestampMixin):
"""
Model representing a Confluence page version.
"""
number: int = 0
when: str = EMPTY_STRING
message: str | None = None
by: ConfluenceUser | None = None
@classmethod
def from_api_response(
cls, data: dict[str, Any], **kwargs: Any
) -> "ConfluenceVersion":
"""
Create a ConfluenceVersion from a Confluence API response.
Args:
data: The version data from the Confluence API
Returns:
A ConfluenceVersion instance
"""
if not data:
return cls()
by_user = None
if by_data := data.get("by"):
by_user = ConfluenceUser.from_api_response(by_data)
return cls(
number=data.get("number", 0),
when=data.get("when", EMPTY_STRING),
message=data.get("message"),
by=by_user,
)
def to_simplified_dict(self) -> dict[str, Any]:
"""Convert to simplified dictionary for API response."""
result = {"number": self.number, "when": self.format_timestamp(self.when)}
if self.message:
result["message"] = self.message
if self.by:
result["by"] = self.by.display_name
return result
class ConfluencePage(ApiModel, TimestampMixin):
"""
Model representing a Confluence page.
This model includes the content, metadata, and version information
for a Confluence page.
"""
id: str = CONFLUENCE_DEFAULT_ID
title: str = EMPTY_STRING
type: str = "page" # "page", "blogpost", etc.
status: str = "current"
space: ConfluenceSpace | None = None
content: str = EMPTY_STRING
content_format: str = "view" # "view", "storage", etc.
created: str = EMPTY_STRING
updated: str = EMPTY_STRING
author: ConfluenceUser | None = None
version: ConfluenceVersion | None = None
ancestors: list[dict[str, Any]] = Field(default_factory=list)
children: dict[str, Any] = Field(default_factory=dict)
attachments: list[ConfluenceAttachment] = Field(default_factory=list)
url: str | None = None
@property
def page_content(self) -> str:
"""
Alias for content to maintain compatibility with tests.
Deprecated: Use content instead.
"""
warnings.warn(
"The 'page_content' property is deprecated. Use 'content' instead.",
DeprecationWarning,
stacklevel=2,
)
return self.content
@classmethod
def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "ConfluencePage":
"""
Create a ConfluencePage from a Confluence API response.
Args:
data: The page data from the Confluence API
**kwargs: Additional keyword arguments
base_url: Base URL for constructing page URLs
include_body: Whether to include body content
content_override: Override the content value
content_format: Override the content format
is_cloud: Whether this is a cloud instance (affects URL format)
Returns:
A ConfluencePage instance
"""
if not data:
return cls()
# Extract space information first to ensure it's available for URL construction
space_data = data.get("space", {})
if not space_data:
# Try to extract space info from _expandable if available
if expandable := data.get("_expandable", {}):
if space_path := expandable.get("space"):
# Extract space key from REST API path
if space_path.startswith("/rest/api/space/"):
space_key = space_path.split("/rest/api/space/")[1]
space_data = {"key": space_key, "name": f"Space {space_key}"}
# Create space model
space = ConfluenceSpace.from_api_response(space_data)
# Extract content based on format or use override if provided
content = EMPTY_STRING
content_format = kwargs.get("content_format", "view")
include_body = kwargs.get("include_body", True)
# Allow content override to be provided directly
if content_override := kwargs.get("content_override"):
content = content_override
elif include_body and "body" in data:
body = data.get("body", {})
if content_format in body:
content = body.get(content_format, {}).get("value", EMPTY_STRING)
# Adjust content_format if convert_to_markdown is False and content is processed HTML
convert_to_markdown = kwargs.get("convert_to_markdown", True)
if not convert_to_markdown:
content_format = "html"
# Process author/creator
author = None
if author_data := data.get("author"):
author = ConfluenceUser.from_api_response(author_data)
# Process version
version = None
if version_data := data.get("version"):
version = ConfluenceVersion.from_api_response(version_data)
# Process attachments
attachments = []
if (
attachments_data := data.get("children", {})
.get("attachment", {})
.get("results", [])
):
attachments = [
ConfluenceAttachment.from_api_response(attachment)
for attachment in attachments_data
]
# Process metadata timestamps
created = EMPTY_STRING
updated = EMPTY_STRING
if history := data.get("history"):
created = history.get("createdDate", EMPTY_STRING)
updated = history.get("lastUpdated", {}).get("when", EMPTY_STRING)
# Fall back to version date if no history is available
if not updated and version and version.when:
updated = version.when
# Construct URL if base_url is provided
url = None
if base_url := kwargs.get("base_url"):
page_id = data.get("id")
# Use different URL format based on whether it's cloud or server
is_cloud = kwargs.get("is_cloud", False)
if is_cloud:
# Cloud format: {base_url}/spaces/{space_key}/pages/{page_id}
space_key = space.key if space and space.key else "unknown"
url = f"{base_url}/spaces/{space_key}/pages/{page_id}"
else:
# Server format: {base_url}/pages/viewpage.action?pageId={page_id}
url = f"{base_url}/pages/viewpage.action?pageId={page_id}"
return cls(
id=str(data.get("id", CONFLUENCE_DEFAULT_ID)),
title=data.get("title", EMPTY_STRING),
type=data.get("type", "page"),
status=data.get("status", "current"),
space=space,
content=content,
content_format=content_format,
created=created,
updated=updated,
author=author,
version=version,
ancestors=data.get("ancestors", []),
children=data.get("children", {}),
attachments=attachments,
url=url,
)
def to_simplified_dict(self) -> dict[str, Any]:
"""Convert to simplified dictionary for API response."""
result = {
"id": self.id,
"title": self.title,
"type": self.type,
"created": self.format_timestamp(self.created),
"updated": self.format_timestamp(self.updated),
"url": self.url,
}
# Add space information if available
if self.space:
result["space"] = {"key": self.space.key, "name": self.space.name}
# Add author information if available
if self.author:
result["author"] = self.author.display_name
# Add version information if available
if self.version:
result["version"] = self.version.number
# Attachments are always included (empty list when there are none)
result["attachments"] = [
attachment.to_simplified_dict() for attachment in self.attachments
]
# Add content if it's not empty
if self.content and self.content_format:
result["content"] = {"value": self.content, "format": self.content_format}
# Add ancestors if there are any
if self.ancestors:
result["ancestors"] = [
{"id": a.get("id"), "title": a.get("title")}
for a in self.ancestors
if "id" in a
]
return result
```
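A minimal usage sketch (not from the repository) of the `ConfluencePage` model defined above: `from_api_response` builds the model from a raw Confluence REST payload and `to_simplified_dict` flattens it for responses. The payload, base URL, and printed values are illustrative assumptions.
```python
from mcp_atlassian.models.confluence.page import ConfluencePage

# Illustrative stand-in for a Confluence REST API page payload.
raw = {
    "id": "123456",
    "title": "Team Handbook",
    "type": "page",
    "status": "current",
    "space": {"key": "DOCS", "name": "Documentation"},
    "body": {"view": {"value": "<p>Hello world</p>"}},
    "version": {"number": 7, "when": "2024-01-15T10:30:00.000Z"},
}

page = ConfluencePage.from_api_response(
    raw,
    base_url="https://example.atlassian.net/wiki",  # assumed base URL
    is_cloud=True,           # selects the /spaces/{key}/pages/{id} URL format
    content_format="view",   # read the "view" body representation
)

print(page.url)                   # .../spaces/DOCS/pages/123456
print(page.to_simplified_dict())  # simplified dict with space, version, content
```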
--------------------------------------------------------------------------------
/tests/unit/confluence/test_client.py:
--------------------------------------------------------------------------------
```python
"""Unit tests for the ConfluenceClient class."""
import os
from unittest.mock import MagicMock, patch
from mcp_atlassian.confluence import ConfluenceFetcher
from mcp_atlassian.confluence.client import ConfluenceClient
from mcp_atlassian.confluence.config import ConfluenceConfig
def test_init_with_basic_auth():
"""Test initializing the client with basic auth configuration."""
# Arrange
config = ConfluenceConfig(
url="https://test.atlassian.net/wiki",
auth_type="basic",
username="test_user",
api_token="test_token",
)
# Mock the Confluence class, ConfluencePreprocessor, and configure_ssl_verification
with (
patch("mcp_atlassian.confluence.client.Confluence") as mock_confluence,
patch(
"mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"
) as mock_preprocessor,
patch(
"mcp_atlassian.confluence.client.configure_ssl_verification"
) as mock_configure_ssl,
):
# Act
client = ConfluenceClient(config=config)
# Assert
mock_confluence.assert_called_once_with(
url="https://test.atlassian.net/wiki",
username="test_user",
password="test_token",
cloud=True,
verify_ssl=True,
)
assert client.config == config
assert client.confluence == mock_confluence.return_value
assert client.preprocessor == mock_preprocessor.return_value
# Verify SSL verification was configured
mock_configure_ssl.assert_called_once_with(
service_name="Confluence",
url="https://test.atlassian.net/wiki",
session=mock_confluence.return_value._session,
ssl_verify=True,
)
def test_init_with_token_auth():
"""Test initializing the client with token auth configuration."""
# Arrange
config = ConfluenceConfig(
url="https://confluence.example.com",
auth_type="pat",
personal_token="test_personal_token",
ssl_verify=False,
)
# Mock the Confluence class, ConfluencePreprocessor, and configure_ssl_verification
with (
patch("mcp_atlassian.confluence.client.Confluence") as mock_confluence,
patch(
"mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"
) as mock_preprocessor,
patch(
"mcp_atlassian.confluence.client.configure_ssl_verification"
) as mock_configure_ssl,
):
# Act
client = ConfluenceClient(config=config)
# Assert
mock_confluence.assert_called_once_with(
url="https://confluence.example.com",
token="test_personal_token",
cloud=False,
verify_ssl=False,
)
assert client.config == config
assert client.confluence == mock_confluence.return_value
assert client.preprocessor == mock_preprocessor.return_value
# Verify SSL verification was configured with ssl_verify=False
mock_configure_ssl.assert_called_once_with(
service_name="Confluence",
url="https://confluence.example.com",
session=mock_confluence.return_value._session,
ssl_verify=False,
)
def test_init_from_env():
"""Test initializing the client from environment variables."""
# Arrange
with (
patch(
"mcp_atlassian.confluence.config.ConfluenceConfig.from_env"
) as mock_from_env,
patch("mcp_atlassian.confluence.client.Confluence") as mock_confluence,
patch("mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"),
patch("mcp_atlassian.confluence.client.configure_ssl_verification"),
):
mock_config = MagicMock()
mock_from_env.return_value = mock_config
# Act
client = ConfluenceClient()
# Assert
mock_from_env.assert_called_once()
assert client.config == mock_config
def test_process_html_content():
"""Test the _process_html_content method."""
# Arrange
with (
patch("mcp_atlassian.confluence.client.ConfluenceConfig.from_env"),
patch("mcp_atlassian.confluence.client.Confluence"),
patch(
"mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"
) as mock_preprocessor_class,
patch("mcp_atlassian.confluence.client.configure_ssl_verification"),
):
mock_preprocessor = mock_preprocessor_class.return_value
mock_preprocessor.process_html_content.return_value = (
"<p>HTML</p>",
"Markdown",
)
client = ConfluenceClient()
# Act
html, markdown = client._process_html_content("<p>Test</p>", "TEST")
# Assert
mock_preprocessor.process_html_content.assert_called_once_with(
"<p>Test</p>", "TEST", client.confluence
)
assert html == "<p>HTML</p>"
assert markdown == "Markdown"
def test_get_user_details_by_accountid():
"""Test the get_user_details_by_accountid method."""
# Arrange
with (
patch("mcp_atlassian.confluence.client.ConfluenceConfig.from_env"),
patch("mcp_atlassian.confluence.client.Confluence") as mock_confluence_class,
patch("mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"),
patch("mcp_atlassian.confluence.client.configure_ssl_verification"),
):
mock_confluence = mock_confluence_class.return_value
mock_confluence.get_user_details_by_accountid.return_value = {
"displayName": "Test User",
"accountId": "123456",
"emailAddress": "[email protected]",
"active": True,
}
client = ConfluenceFetcher()
# Act
user_details = client.get_user_details_by_accountid("123456")
# Assert
mock_confluence.get_user_details_by_accountid.assert_called_once_with(
"123456", None
)
assert user_details["displayName"] == "Test User"
assert user_details["accountId"] == "123456"
assert user_details["emailAddress"] == "[email protected]"
assert user_details["active"] is True
# Test with expand parameter
mock_confluence.get_user_details_by_accountid.reset_mock()
mock_confluence.get_user_details_by_accountid.return_value = {
"displayName": "Test User",
"accountId": "123456",
"status": "active",
}
user_details = client.get_user_details_by_accountid("123456", expand="status")
mock_confluence.get_user_details_by_accountid.assert_called_once_with(
"123456", "status"
)
assert user_details["status"] == "active"
def test_init_sets_proxies_and_no_proxy(monkeypatch):
"""Test that ConfluenceClient sets session proxies and NO_PROXY env var from config."""
# Patch Confluence and its _session
mock_confluence = MagicMock()
mock_session = MagicMock()
mock_session.proxies = {} # Use a real dict for proxies
mock_confluence._session = mock_session
monkeypatch.setattr(
"mcp_atlassian.confluence.client.Confluence", lambda **kwargs: mock_confluence
)
monkeypatch.setattr(
"mcp_atlassian.confluence.client.configure_ssl_verification",
lambda **kwargs: None,
)
monkeypatch.setattr(
"mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor",
lambda **kwargs: MagicMock(),
)
# Patch environment
monkeypatch.setenv("NO_PROXY", "")
config = ConfluenceConfig(
url="https://test.atlassian.net/wiki",
auth_type="basic",
username="user",
api_token="token",
http_proxy="http://proxy:8080",
https_proxy="https://proxy:8443",
socks_proxy="socks5://user:pass@proxy:1080",
no_proxy="localhost,127.0.0.1",
)
client = ConfluenceClient(config=config)
assert mock_session.proxies["http"] == "http://proxy:8080"
assert mock_session.proxies["https"] == "https://proxy:8443"
assert mock_session.proxies["socks"] == "socks5://user:pass@proxy:1080"
assert os.environ["NO_PROXY"] == "localhost,127.0.0.1"
def test_init_no_proxies(monkeypatch):
"""Test that ConfluenceClient does not set proxies if not configured."""
# Patch Confluence and its _session
mock_confluence = MagicMock()
mock_session = MagicMock()
mock_session.proxies = {} # Use a real dict for proxies
mock_confluence._session = mock_session
monkeypatch.setattr(
"mcp_atlassian.confluence.client.Confluence", lambda **kwargs: mock_confluence
)
monkeypatch.setattr(
"mcp_atlassian.confluence.client.configure_ssl_verification",
lambda **kwargs: None,
)
monkeypatch.setattr(
"mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor",
lambda **kwargs: MagicMock(),
)
config = ConfluenceConfig(
url="https://test.atlassian.net/wiki",
auth_type="basic",
username="user",
api_token="token",
)
client = ConfluenceClient(config=config)
assert mock_session.proxies == {}
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/preprocessing/base.py:
--------------------------------------------------------------------------------
```python
"""Base preprocessing module."""
import logging
import re
import warnings
from typing import Any, Protocol
from bs4 import BeautifulSoup, Tag
from markdownify import markdownify as md
logger = logging.getLogger("mcp-atlassian")
class ConfluenceClient(Protocol):
"""Protocol for Confluence client."""
def get_user_details_by_accountid(self, account_id: str) -> dict[str, Any]:
"""Get user details by account ID."""
...
def get_user_details_by_username(self, username: str) -> dict[str, Any]:
"""Get user details by username (for Server/DC compatibility)."""
...
class BasePreprocessor:
"""Base class for text preprocessing operations."""
def __init__(self, base_url: str = "") -> None:
"""
Initialize the base text preprocessor.
Args:
base_url: Base URL for API server
"""
self.base_url = base_url.rstrip("/") if base_url else ""
def process_html_content(
self,
html_content: str,
space_key: str = "",
confluence_client: ConfluenceClient | None = None,
) -> tuple[str, str]:
"""
Process HTML content to replace user refs and page links.
Args:
html_content: The HTML content to process
space_key: Optional space key for context
confluence_client: Optional Confluence client for user lookups
Returns:
Tuple of (processed_html, processed_markdown)
"""
try:
# Parse the HTML content
soup = BeautifulSoup(html_content, "html.parser")
# Process user mentions
self._process_user_mentions_in_soup(soup, confluence_client)
self._process_user_profile_macros_in_soup(soup, confluence_client)
# Convert to string and markdown
processed_html = str(soup)
processed_markdown = md(processed_html)
return processed_html, processed_markdown
except Exception as e:
logger.error(f"Error in process_html_content: {str(e)}")
raise
def _process_user_mentions_in_soup(
self, soup: BeautifulSoup, confluence_client: ConfluenceClient | None = None
) -> None:
"""
Process user mentions in BeautifulSoup object.
Args:
soup: BeautifulSoup object containing HTML
confluence_client: Optional Confluence client for user lookups
"""
# Find all ac:link elements that might contain user mentions
user_mentions = soup.find_all("ac:link")
for user_element in user_mentions:
user_ref = user_element.find("ri:user")
if user_ref and user_ref.get("ri:account-id"):
# Case 1: Direct user reference without link-body
account_id = user_ref.get("ri:account-id")
if isinstance(account_id, str):
self._replace_user_mention(
user_element, account_id, confluence_client
)
continue
# Case 2: User reference with link-body containing @
link_body = user_element.find("ac:link-body")
if link_body and "@" in link_body.get_text(strip=True):
user_ref = user_element.find("ri:user")
if user_ref and user_ref.get("ri:account-id"):
account_id = user_ref.get("ri:account-id")
if isinstance(account_id, str):
self._replace_user_mention(
user_element, account_id, confluence_client
)
def _process_user_profile_macros_in_soup(
self, soup: BeautifulSoup, confluence_client: ConfluenceClient | None = None
) -> None:
"""
Process Confluence User Profile macros in BeautifulSoup object.
Replaces <ac:structured-macro ac:name="profile">...</ac:structured-macro>
with the user's display name, typically formatted as @DisplayName.
Args:
soup: BeautifulSoup object containing HTML
confluence_client: Optional Confluence client for user lookups
"""
profile_macros = soup.find_all(
"ac:structured-macro", attrs={"ac:name": "profile"}
)
for macro_element in profile_macros:
user_param = macro_element.find("ac:parameter", attrs={"ac:name": "user"})
if not user_param:
logger.debug(
"User profile macro found without a 'user' parameter. Replacing with placeholder."
)
macro_element.replace_with("[User Profile Macro (Malformed)]")
continue
user_ref = user_param.find("ri:user")
if not user_ref:
logger.debug(
"User profile macro's 'user' parameter found without 'ri:user' tag. Replacing with placeholder."
)
macro_element.replace_with("[User Profile Macro (Malformed)]")
continue
account_id = user_ref.get("ri:account-id")
userkey = user_ref.get("ri:userkey") # Fallback for Confluence Server/DC
user_identifier_for_log = account_id or userkey
display_name = None
if confluence_client and user_identifier_for_log:
try:
if account_id and isinstance(account_id, str):
user_details = confluence_client.get_user_details_by_accountid(
account_id
)
display_name = user_details.get("displayName")
elif userkey and isinstance(userkey, str):
# For Confluence Server/DC, userkey might be the username
user_details = confluence_client.get_user_details_by_username(
userkey
)
display_name = user_details.get("displayName")
except Exception as e:
logger.warning(
f"Error fetching user details for profile macro (user: {user_identifier_for_log}): {e}"
)
elif not confluence_client:
logger.warning(
"Confluence client not available for User Profile Macro processing."
)
if display_name:
replacement_text = f"@{display_name}"
macro_element.replace_with(replacement_text)
else:
fallback_identifier = (
user_identifier_for_log
if user_identifier_for_log
else "unknown_user"
)
fallback_text = f"[User Profile: {fallback_identifier}]"
macro_element.replace_with(fallback_text)
logger.debug(f"Using fallback for user profile macro: {fallback_text}")
def _replace_user_mention(
self,
user_element: Tag,
account_id: str,
confluence_client: ConfluenceClient | None = None,
) -> None:
"""
Replace a user mention with the user's display name.
Args:
user_element: The HTML element containing the user mention
account_id: The user's account ID
confluence_client: Optional Confluence client for user lookups
"""
try:
# Only attempt to get user details if we have a valid confluence client
if confluence_client is not None:
user_details = confluence_client.get_user_details_by_accountid(
account_id
)
display_name = user_details.get("displayName", "")
if display_name:
new_text = f"@{display_name}"
user_element.replace_with(new_text)
return
# If we don't have a confluence client or couldn't get user details,
# use fallback
self._use_fallback_user_mention(user_element, account_id)
except Exception as e:
logger.warning(f"Error processing user mention: {str(e)}")
self._use_fallback_user_mention(user_element, account_id)
def _use_fallback_user_mention(self, user_element: Tag, account_id: str) -> None:
"""
Replace user mention with a fallback when the API call fails.
Args:
user_element: The HTML element containing the user mention
account_id: The user's account ID
"""
# Fallback: just use the account ID
new_text = f"@user_{account_id}"
user_element.replace_with(new_text)
def _convert_html_to_markdown(self, text: str) -> str:
"""Convert HTML content to markdown if needed."""
if re.search(r"<[^>]+>", text):
try:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UserWarning)
soup = BeautifulSoup(f"<div>{text}</div>", "html.parser")
html = str(soup.div.decode_contents()) if soup.div else text
text = md(html)
except Exception as e:
logger.warning(f"Error converting HTML to markdown: {str(e)}")
return text
```
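A short sketch, not part of the repository, showing `BasePreprocessor.process_html_content` driven by a stub object that satisfies the `ConfluenceClient` protocol above; the storage-format markup and the stub's return values are assumptions.
```python
from typing import Any

from mcp_atlassian.preprocessing.base import BasePreprocessor


class StubConfluenceClient:
    """Hypothetical stand-in that satisfies the ConfluenceClient protocol."""

    def get_user_details_by_accountid(self, account_id: str) -> dict[str, Any]:
        return {"displayName": "Jane Doe"}

    def get_user_details_by_username(self, username: str) -> dict[str, Any]:
        return {"displayName": "Jane Doe"}


# Storage-format snippet containing a user mention (illustrative).
html = (
    '<p>Ping <ac:link><ri:user ri:account-id="abc123" /></ac:link> '
    "about the release notes.</p>"
)

pre = BasePreprocessor(base_url="https://example.atlassian.net/wiki")
processed_html, processed_md = pre.process_html_content(
    html, space_key="DOCS", confluence_client=StubConfluenceClient()
)
print(processed_md)  # the mention is replaced with "@Jane Doe"
```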
--------------------------------------------------------------------------------
/tests/unit/confluence/test_comments.py:
--------------------------------------------------------------------------------
```python
"""Unit tests for the CommentsMixin class."""
from unittest.mock import patch
import pytest
import requests
from mcp_atlassian.confluence.comments import CommentsMixin
class TestCommentsMixin:
"""Tests for the CommentsMixin class."""
@pytest.fixture
def comments_mixin(self, confluence_client):
"""Create a CommentsMixin instance for testing."""
# CommentsMixin inherits from ConfluenceClient, so we need to create it properly
with patch(
"mcp_atlassian.confluence.comments.ConfluenceClient.__init__"
) as mock_init:
mock_init.return_value = None
mixin = CommentsMixin()
# Copy the necessary attributes from our mocked client
mixin.confluence = confluence_client.confluence
mixin.config = confluence_client.config
mixin.preprocessor = confluence_client.preprocessor
return mixin
def test_get_page_comments_success(self, comments_mixin):
"""Test get_page_comments with success response."""
# Setup
page_id = "12345"
# Configure the mock to return a successful response
comments_mixin.confluence.get_page_comments.return_value = {
"results": [
{
"id": "12345",
"body": {"view": {"value": "<p>Comment content here</p>"}},
"version": {"number": 1},
"author": {"displayName": "John Doe"},
}
]
}
# Mock preprocessor
comments_mixin.preprocessor.process_html_content.return_value = (
"<p>Processed HTML</p>",
"Processed Markdown",
)
# Call the method
result = comments_mixin.get_page_comments(page_id)
# Verify
comments_mixin.confluence.get_page_comments.assert_called_once_with(
content_id=page_id, expand="body.view.value,version", depth="all"
)
assert len(result) == 1
assert result[0].body == "Processed Markdown"
def test_get_page_comments_with_html(self, comments_mixin):
"""Test get_page_comments with HTML output instead of markdown."""
# Setup
page_id = "12345"
comments_mixin.confluence.get_page_comments.return_value = {
"results": [
{
"id": "12345",
"body": {"view": {"value": "<p>Comment content here</p>"}},
"version": {"number": 1},
"author": {"displayName": "John Doe"},
}
]
}
# Mock the HTML processing
comments_mixin.preprocessor.process_html_content.return_value = (
"<p>Processed HTML</p>",
"Processed markdown",
)
# Call the method
result = comments_mixin.get_page_comments(page_id, return_markdown=False)
# Verify result
assert len(result) == 1
comment = result[0]
assert comment.body == "<p>Processed HTML</p>"
def test_get_page_comments_api_error(self, comments_mixin):
"""Test handling of API errors."""
# Mock the API to raise an exception
comments_mixin.confluence.get_page_comments.side_effect = (
requests.RequestException("API error")
)
# Act
result = comments_mixin.get_page_comments("987654321")
# Assert
assert isinstance(result, list)
assert len(result) == 0 # Empty list on error
def test_get_page_comments_key_error(self, comments_mixin):
"""Test handling of missing keys in API response."""
# Mock the response to be missing expected keys
comments_mixin.confluence.get_page_comments.return_value = {"invalid": "data"}
# Act
result = comments_mixin.get_page_comments("987654321")
# Assert
assert isinstance(result, list)
assert len(result) == 0 # Empty list on error
def test_get_page_comments_value_error(self, comments_mixin):
"""Test handling of unexpected data types."""
# Cause a value error by returning a string where a dict is expected
comments_mixin.confluence.get_page_by_id.return_value = "invalid"
# Act
result = comments_mixin.get_page_comments("987654321")
# Assert
assert isinstance(result, list)
assert len(result) == 0 # Empty list on error
def test_get_page_comments_with_empty_results(self, comments_mixin):
"""Test handling of empty results."""
# Mock empty results
comments_mixin.confluence.get_page_comments.return_value = {"results": []}
# Act
result = comments_mixin.get_page_comments("987654321")
# Assert
assert isinstance(result, list)
assert len(result) == 0 # Empty list with no comments
def test_add_comment_success(self, comments_mixin):
"""Test adding a comment with success response."""
# Setup
page_id = "12345"
content = "This is a test comment"
# Mock the page retrieval
comments_mixin.confluence.get_page_by_id.return_value = {
"space": {"key": "TEST"}
}
# Mock the preprocessor's conversion method
comments_mixin.preprocessor.markdown_to_confluence_storage.return_value = (
"<p>This is a test comment</p>"
)
# Configure the mock to return a successful response
comments_mixin.confluence.add_comment.return_value = {
"id": "98765",
"body": {"view": {"value": "<p>This is a test comment</p>"}},
"version": {"number": 1},
"author": {"displayName": "Test User"},
}
# Mock the HTML processing
comments_mixin.preprocessor.process_html_content.return_value = (
"<p>This is a test comment</p>",
"This is a test comment",
)
# Call the method
result = comments_mixin.add_comment(page_id, content)
# Verify
comments_mixin.confluence.add_comment.assert_called_once_with(
page_id, "<p>This is a test comment</p>"
)
assert result is not None
assert result.id == "98765"
assert result.body == "This is a test comment"
def test_add_comment_with_html_content(self, comments_mixin):
"""Test adding a comment with HTML content."""
# Setup
page_id = "12345"
content = "<p>This is an <strong>HTML</strong> comment</p>"
# Mock the page retrieval
comments_mixin.confluence.get_page_by_id.return_value = {
"space": {"key": "TEST"}
}
# Configure the mock to return a successful response
comments_mixin.confluence.add_comment.return_value = {
"id": "98765",
"body": {
"view": {"value": "<p>This is an <strong>HTML</strong> comment</p>"}
},
"version": {"number": 1},
"author": {"displayName": "Test User"},
}
# Mock the HTML processing
comments_mixin.preprocessor.process_html_content.return_value = (
"<p>This is an <strong>HTML</strong> comment</p>",
"This is an **HTML** comment",
)
# Call the method
result = comments_mixin.add_comment(page_id, content)
# Verify - should not call markdown conversion since content is already HTML
comments_mixin.preprocessor.markdown_to_confluence_storage.assert_not_called()
comments_mixin.confluence.add_comment.assert_called_once_with(page_id, content)
assert result is not None
assert result.body == "This is an **HTML** comment"
def test_add_comment_api_error(self, comments_mixin):
"""Test handling of API errors when adding a comment."""
# Setup
page_id = "12345"
content = "This is a test comment"
# Mock the page retrieval
comments_mixin.confluence.get_page_by_id.return_value = {
"space": {"key": "TEST"}
}
# Mock the preprocessor's conversion method
comments_mixin.preprocessor.markdown_to_confluence_storage.return_value = (
"<p>This is a test comment</p>"
)
# Mock the API to raise an exception
comments_mixin.confluence.add_comment.side_effect = requests.RequestException(
"API error"
)
# Call the method
result = comments_mixin.add_comment(page_id, content)
# Verify
assert result is None
def test_add_comment_empty_response(self, comments_mixin):
"""Test handling of empty API response when adding a comment."""
# Setup
page_id = "12345"
content = "This is a test comment"
# Mock the page retrieval
comments_mixin.confluence.get_page_by_id.return_value = {
"space": {"key": "TEST"}
}
# Mock the preprocessor's conversion method
comments_mixin.preprocessor.markdown_to_confluence_storage.return_value = (
"<p>This is a test comment</p>"
)
# Configure the mock to return an empty response
comments_mixin.confluence.add_comment.return_value = None
# Call the method
result = comments_mixin.add_comment(page_id, content)
# Verify
assert result is None
```
--------------------------------------------------------------------------------
/tests/integration/test_ssl_verification.py:
--------------------------------------------------------------------------------
```python
"""Integration tests for SSL verification functionality."""
import os
from unittest.mock import MagicMock, patch
import pytest
from requests.exceptions import SSLError
from requests.sessions import Session
from mcp_atlassian.confluence.config import ConfluenceConfig
from mcp_atlassian.jira.client import JiraClient
from mcp_atlassian.jira.config import JiraConfig
from mcp_atlassian.utils.ssl import SSLIgnoreAdapter, configure_ssl_verification
from tests.utils.base import BaseAuthTest
from tests.utils.mocks import MockEnvironment
@pytest.mark.integration
def test_configure_ssl_verification_with_real_confluence_url():
"""Test SSL verification configuration with real Confluence URL from environment."""
# Get the URL from the environment
url = os.getenv("CONFLUENCE_URL")
if not url:
pytest.skip("CONFLUENCE_URL not set in environment")
# Create a real session
session = Session()
original_adapters_count = len(session.adapters)
# Mock the SSL_VERIFY value to be False for this test
with patch.dict(os.environ, {"CONFLUENCE_SSL_VERIFY": "false"}):
# Configure SSL verification - explicitly pass ssl_verify=False
configure_ssl_verification(
service_name="Confluence",
url=url,
session=session,
ssl_verify=False,
)
# Extract domain from URL (remove protocol and path)
domain = url.split("://")[1].split("/")[0]
# Verify the adapters are mounted correctly
assert len(session.adapters) == original_adapters_count + 2
assert f"https://{domain}" in session.adapters
assert f"http://{domain}" in session.adapters
assert isinstance(session.adapters[f"https://{domain}"], SSLIgnoreAdapter)
assert isinstance(session.adapters[f"http://{domain}"], SSLIgnoreAdapter)
class TestSSLVerificationEnhanced(BaseAuthTest):
"""Enhanced SSL verification tests using test utilities."""
@pytest.mark.integration
def test_ssl_verification_enabled_by_default(self):
"""Test that SSL verification is enabled by default."""
with MockEnvironment.basic_auth_env():
# For Jira
jira_config = JiraConfig.from_env()
assert jira_config.ssl_verify is True
# For Confluence
confluence_config = ConfluenceConfig.from_env()
assert confluence_config.ssl_verify is True
@pytest.mark.integration
def test_ssl_verification_disabled_via_env(self):
"""Test SSL verification can be disabled via environment variables."""
with MockEnvironment.basic_auth_env() as env_vars:
env_vars["JIRA_SSL_VERIFY"] = "false"
env_vars["CONFLUENCE_SSL_VERIFY"] = "false"
# For Jira - need to reload config after env change
with patch.dict(os.environ, env_vars):
jira_config = JiraConfig.from_env()
assert jira_config.ssl_verify is False
# For Confluence
confluence_config = ConfluenceConfig.from_env()
assert confluence_config.ssl_verify is False
@pytest.mark.integration
def test_ssl_adapter_mounting_for_multiple_domains(self):
"""Test SSL adapters are correctly mounted for multiple domains."""
session = Session()
# Configure for multiple domains
urls = [
"https://domain1.atlassian.net",
"https://domain2.atlassian.net/wiki",
"https://custom.domain.com/jira",
]
for url in urls:
configure_ssl_verification(
service_name="Test", url=url, session=session, ssl_verify=False
)
# Verify all domains have SSL adapters
assert "https://domain1.atlassian.net" in session.adapters
assert "https://domain2.atlassian.net" in session.adapters
assert "https://custom.domain.com" in session.adapters
@pytest.mark.integration
def test_ssl_error_handling_with_invalid_cert(self, monkeypatch):
"""Test SSL error handling when certificate validation fails."""
# Mock the Jira class to simulate SSL error
mock_jira = MagicMock()
mock_jira.side_effect = SSLError("Certificate verification failed")
monkeypatch.setattr("mcp_atlassian.jira.client.Jira", mock_jira)
with MockEnvironment.basic_auth_env():
config = JiraConfig.from_env()
config.ssl_verify = True # Ensure SSL verification is on
# Creating client should raise SSL error
with pytest.raises(SSLError, match="Certificate verification failed"):
JiraClient(config=config)
@pytest.mark.integration
def test_ssl_verification_with_custom_ca_bundle(self):
"""Test SSL verification with custom CA bundle path."""
with MockEnvironment.basic_auth_env() as env_vars:
# Set custom CA bundle path
custom_ca_path = "/path/to/custom/ca-bundle.crt"
env_vars["JIRA_SSL_VERIFY"] = custom_ca_path
env_vars["CONFLUENCE_SSL_VERIFY"] = custom_ca_path
# For Jira - need to reload config after env change
with patch.dict(os.environ, env_vars):
jira_config = JiraConfig.from_env()
# Note: Current implementation only supports boolean ssl_verify
# Custom CA bundle paths are not supported in the config parsing
assert (
jira_config.ssl_verify is True
) # Any non-false value becomes True
# For Confluence
confluence_config = ConfluenceConfig.from_env()
assert (
confluence_config.ssl_verify is True
) # Any non-false value becomes True
@pytest.mark.integration
def test_ssl_adapter_not_mounted_when_verification_enabled(self):
"""Test that SSL adapters are not mounted when verification is enabled."""
session = Session()
original_adapter_count = len(session.adapters)
# Configure with SSL verification enabled
configure_ssl_verification(
service_name="Jira",
url="https://test.atlassian.net",
session=session,
ssl_verify=True, # SSL verification enabled
)
# No additional adapters should be mounted
assert len(session.adapters) == original_adapter_count
assert "https://test.atlassian.net" not in session.adapters
@pytest.mark.integration
def test_ssl_configuration_persistence_across_requests(self):
"""Test SSL configuration persists across multiple requests."""
session = Session()
# Configure SSL for a domain
configure_ssl_verification(
service_name="Jira",
url="https://test.atlassian.net",
session=session,
ssl_verify=False,
)
# Get the adapter
adapter = session.adapters.get("https://test.atlassian.net")
assert isinstance(adapter, SSLIgnoreAdapter)
# Configure again - should not create duplicate adapters
configure_ssl_verification(
service_name="Jira",
url="https://test.atlassian.net",
session=session,
ssl_verify=False,
)
# Should still have an SSLIgnoreAdapter present
new_adapter = session.adapters.get("https://test.atlassian.net")
assert isinstance(new_adapter, SSLIgnoreAdapter)
@pytest.mark.integration
def test_ssl_verification_with_oauth_configuration(self):
"""Test SSL verification works correctly with OAuth configuration."""
with MockEnvironment.oauth_env() as env_vars:
# Add SSL configuration
env_vars["JIRA_SSL_VERIFY"] = "false"
env_vars["CONFLUENCE_SSL_VERIFY"] = "false"
# OAuth config should still respect SSL settings
# Need to reload config after env change
with patch.dict(os.environ, env_vars):
# Note: OAuth flow would need additional setup, but we're testing config only
assert os.environ.get("JIRA_SSL_VERIFY") == "false"
assert os.environ.get("CONFLUENCE_SSL_VERIFY") == "false"
@pytest.mark.integration
def test_configure_ssl_verification_with_real_jira_url():
"""Test SSL verification configuration with real Jira URL from environment."""
# Get the URL from the environment
url = os.getenv("JIRA_URL")
if not url:
pytest.skip("JIRA_URL not set in environment")
# Create a real session
session = Session()
original_adapters_count = len(session.adapters)
# Mock the SSL_VERIFY value to be False for this test
with patch.dict(os.environ, {"JIRA_SSL_VERIFY": "false"}):
# Configure SSL verification - explicitly pass ssl_verify=False
configure_ssl_verification(
service_name="Jira",
url=url,
session=session,
ssl_verify=False,
)
# Extract domain from URL (remove protocol and path)
domain = url.split("://")[1].split("/")[0]
# Verify the adapters are mounted correctly
assert len(session.adapters) == original_adapters_count + 2
assert f"https://{domain}" in session.adapters
assert f"http://{domain}" in session.adapters
assert isinstance(session.adapters[f"https://{domain}"], SSLIgnoreAdapter)
assert isinstance(session.adapters[f"http://{domain}"], SSLIgnoreAdapter)
```
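For reference, a minimal direct-use sketch of `configure_ssl_verification` outside the test harness, matching the call signature exercised above; the host name is an assumption.
```python
from requests import Session

from mcp_atlassian.utils.ssl import configure_ssl_verification

session = Session()

# With ssl_verify=False the helper mounts SSLIgnoreAdapter for both the
# https:// and http:// variants of the host, as the tests above assert.
configure_ssl_verification(
    service_name="Confluence",
    url="https://confluence.internal.example.com/wiki",  # assumed on-prem URL
    session=session,
    ssl_verify=False,
)

print(sorted(session.adapters))  # shows the two host-specific adapter mounts
```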
--------------------------------------------------------------------------------
/tests/unit/jira/test_client.py:
--------------------------------------------------------------------------------
```python
"""Tests for the Jira client module."""
import os
from copy import deepcopy
from typing import Literal
from unittest.mock import MagicMock, call, patch
import pytest
from mcp_atlassian.jira.client import JiraClient
from mcp_atlassian.jira.config import JiraConfig
class DeepcopyMock(MagicMock):
"""A Mock that creates a deep copy of its arguments before storing them."""
def __call__(self, /, *args, **kwargs):
args = deepcopy(args)
kwargs = deepcopy(kwargs)
return super().__call__(*args, **kwargs)
def test_init_with_basic_auth():
"""Test initializing the client with basic auth configuration."""
with (
patch("mcp_atlassian.jira.client.Jira") as mock_jira,
patch(
"mcp_atlassian.jira.client.configure_ssl_verification"
) as mock_configure_ssl,
):
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="basic",
username="test_username",
api_token="test_token",
)
client = JiraClient(config=config)
# Verify Jira was initialized correctly
mock_jira.assert_called_once_with(
url="https://test.atlassian.net",
username="test_username",
password="test_token",
cloud=True,
verify_ssl=True,
)
# Verify SSL verification was configured
mock_configure_ssl.assert_called_once_with(
service_name="Jira",
url="https://test.atlassian.net",
session=mock_jira.return_value._session,
ssl_verify=True,
)
assert client.config == config
assert client._field_ids_cache is None
assert client._current_user_account_id is None
def test_init_with_token_auth():
"""Test initializing the client with token auth configuration."""
with (
patch("mcp_atlassian.jira.client.Jira") as mock_jira,
patch(
"mcp_atlassian.jira.client.configure_ssl_verification"
) as mock_configure_ssl,
):
config = JiraConfig(
url="https://jira.example.com",
auth_type="pat",
personal_token="test_personal_token",
ssl_verify=False,
)
client = JiraClient(config=config)
# Verify Jira was initialized correctly
mock_jira.assert_called_once_with(
url="https://jira.example.com",
token="test_personal_token",
cloud=False,
verify_ssl=False,
)
# Verify SSL verification was configured with ssl_verify=False
mock_configure_ssl.assert_called_once_with(
service_name="Jira",
url="https://jira.example.com",
session=mock_jira.return_value._session,
ssl_verify=False,
)
assert client.config == config
def test_init_from_env():
"""Test initializing the client from environment variables."""
with (
patch("mcp_atlassian.jira.config.JiraConfig.from_env") as mock_from_env,
patch("mcp_atlassian.jira.client.Jira") as mock_jira,
patch("mcp_atlassian.jira.client.configure_ssl_verification"),
):
mock_config = MagicMock()
mock_config.auth_type = "basic" # needed for the if condition
mock_from_env.return_value = mock_config
client = JiraClient()
mock_from_env.assert_called_once()
assert client.config == mock_config
def test_clean_text():
"""Test the _clean_text method."""
with (
patch("mcp_atlassian.jira.client.Jira"),
patch("mcp_atlassian.jira.client.configure_ssl_verification"),
):
client = JiraClient(
config=JiraConfig(
url="https://test.atlassian.net",
auth_type="basic",
username="test_username",
api_token="test_token",
)
)
# Test with HTML
assert client._clean_text("<p>Test content</p>") == "Test content"
# Test with empty string
assert client._clean_text("") == ""
# Test with spaces and newlines
assert client._clean_text(" \n Test with spaces \n ") == "Test with spaces"
def _test_get_paged(method: Literal["get", "post"]):
"""Test the get_paged method."""
with (
patch(
"mcp_atlassian.jira.client.Jira.get", new_callable=DeepcopyMock
) as mock_get,
patch(
"mcp_atlassian.jira.client.Jira.post", new_callable=DeepcopyMock
) as mock_post,
patch("mcp_atlassian.jira.client.configure_ssl_verification"),
):
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="basic",
username="test_username",
api_token="test_token",
)
client = JiraClient(config=config)
# Mock paged responses
mock_responses = [
{"data": "page1", "nextPageToken": "token1"},
{"data": "page2", "nextPageToken": "token2"},
{"data": "page3"}, # Last page does not have nextPageToken
]
# Create mock method with side effect to return responses in sequence
if method == "get":
mock_get.side_effect = mock_responses
mock_post.side_effect = RuntimeError("This should not be called")
else:
mock_post.side_effect = mock_responses
mock_get.side_effect = RuntimeError("This should not be called")
# Run the method
params = {"initial": "params"}
results = client.get_paged(method, "/test/url", params)
# Verify the results
assert results == mock_responses
# Verify call parameters
if method == "get":
expected_calls = [
call(path="/test/url", params={"initial": "params"}, absolute=False),
call(
path="/test/url",
params={"initial": "params", "nextPageToken": "token1"},
absolute=False,
),
call(
path="/test/url",
params={"initial": "params", "nextPageToken": "token2"},
absolute=False,
),
]
assert mock_get.call_args_list == expected_calls
else:
expected_calls = [
call(path="/test/url", json={"initial": "params"}, absolute=False),
call(
path="/test/url",
json={"initial": "params", "nextPageToken": "token1"},
absolute=False,
),
call(
path="/test/url",
json={"initial": "params", "nextPageToken": "token2"},
absolute=False,
),
]
assert mock_post.call_args_list == expected_calls
def test_get_paged_get():
"""Test the get_paged method for GET requests."""
_test_get_paged("get")
def test_get_paged_post():
"""Test the get_paged method for POST requests."""
_test_get_paged("post")
def test_get_paged_without_cloud():
"""Test the get_paged method without cloud."""
with patch("mcp_atlassian.jira.client.configure_ssl_verification"):
config = JiraConfig(
url="https://jira.example.com",
auth_type="pat",
personal_token="test_token",
)
client = JiraClient(config=config)
with pytest.raises(
ValueError,
match="Paged requests are only available for Jira Cloud platform",
):
client.get_paged("get", "/test/url")
def test_init_sets_proxies_and_no_proxy(monkeypatch):
"""Test that JiraClient sets session proxies and NO_PROXY env var from config."""
# Patch Jira and its _session
mock_jira = MagicMock()
mock_session = MagicMock()
mock_session.proxies = {} # Use a real dict for proxies
mock_jira._session = mock_session
monkeypatch.setattr("mcp_atlassian.jira.client.Jira", lambda **kwargs: mock_jira)
monkeypatch.setattr(
"mcp_atlassian.jira.client.configure_ssl_verification", lambda **kwargs: None
)
# Patch environment
monkeypatch.setenv("NO_PROXY", "")
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="basic",
username="user",
api_token="pat",
http_proxy="http://proxy:8080",
https_proxy="https://proxy:8443",
socks_proxy="socks5://user:pass@proxy:1080",
no_proxy="localhost,127.0.0.1",
)
client = JiraClient(config=config)
assert mock_session.proxies["http"] == "http://proxy:8080"
assert mock_session.proxies["https"] == "https://proxy:8443"
assert mock_session.proxies["socks"] == "socks5://user:pass@proxy:1080"
assert os.environ["NO_PROXY"] == "localhost,127.0.0.1"
def test_init_no_proxies(monkeypatch):
"""Test that JiraClient does not set proxies if not configured."""
# Patch Jira and its _session
mock_jira = MagicMock()
mock_session = MagicMock()
mock_session.proxies = {} # Use a real dict for proxies
mock_jira._session = mock_session
monkeypatch.setattr("mcp_atlassian.jira.client.Jira", lambda **kwargs: mock_jira)
monkeypatch.setattr(
"mcp_atlassian.jira.client.configure_ssl_verification", lambda **kwargs: None
)
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="basic",
username="user",
api_token="pat",
)
client = JiraClient(config=config)
assert mock_session.proxies == {}
```
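A hedged sketch (not from the repository) of calling `JiraClient.get_paged` directly against Jira Cloud; the credentials, endpoint path, and parameters are illustrative. The tests above define the contract: pages are fetched until a response lacks `nextPageToken`, and the raw page payloads are returned as a list; on Server/DC the call raises `ValueError`.
```python
from mcp_atlassian.jira.client import JiraClient
from mcp_atlassian.jira.config import JiraConfig

# Assumed Cloud credentials.
config = JiraConfig(
    url="https://example.atlassian.net",
    auth_type="basic",
    username="user@example.com",
    api_token="api-token",
)
client = JiraClient(config=config)

# Illustrative endpoint and params; each element of `pages` is one raw
# response dict, collected until no nextPageToken is returned.
pages = client.get_paged("get", "/rest/api/3/search/jql", {"jql": "project = PROJ"})
print(len(pages), "pages fetched")
```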
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/attachments.py:
--------------------------------------------------------------------------------
```python
"""Attachment operations for Jira API."""
import logging
import os
from pathlib import Path
from typing import Any
from ..models.jira import JiraAttachment
from .client import JiraClient
from .protocols import AttachmentsOperationsProto
# Configure logging
logger = logging.getLogger("mcp-jira")
class AttachmentsMixin(JiraClient, AttachmentsOperationsProto):
"""Mixin for Jira attachment operations."""
def download_attachment(self, url: str, target_path: str) -> bool:
"""
Download a Jira attachment to the specified path.
Args:
url: The URL of the attachment to download
target_path: The path where the attachment should be saved
Returns:
True if successful, False otherwise
"""
if not url:
logger.error("No URL provided for attachment download")
return False
try:
# Convert to absolute path if relative
if not os.path.isabs(target_path):
target_path = os.path.abspath(target_path)
logger.info(f"Downloading attachment from {url} to {target_path}")
# Create the directory if it doesn't exist
os.makedirs(os.path.dirname(target_path), exist_ok=True)
# Use the Jira session to download the file
response = self.jira._session.get(url, stream=True)
response.raise_for_status()
# Write the file to disk
with open(target_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
# Verify the file was created
if os.path.exists(target_path):
file_size = os.path.getsize(target_path)
logger.info(
f"Successfully downloaded attachment to {target_path} (size: {file_size} bytes)"
)
return True
else:
logger.error(f"File was not created at {target_path}")
return False
except Exception as e:
logger.error(f"Error downloading attachment: {str(e)}")
return False
def download_issue_attachments(
self, issue_key: str, target_dir: str
) -> dict[str, Any]:
"""
Download all attachments for a Jira issue.
Args:
issue_key: The Jira issue key (e.g., 'PROJ-123')
target_dir: The directory where attachments should be saved
Returns:
A dictionary with download results
"""
# Convert to absolute path if relative
if not os.path.isabs(target_dir):
target_dir = os.path.abspath(target_dir)
logger.info(
f"Downloading attachments for {issue_key} to directory: {target_dir}"
)
# Create the target directory if it doesn't exist
target_path = Path(target_dir)
target_path.mkdir(parents=True, exist_ok=True)
# Get the issue with attachments
logger.info(f"Fetching issue {issue_key} with attachments")
issue_data = self.jira.issue(issue_key, fields="attachment")
if not isinstance(issue_data, dict):
msg = f"Unexpected return value type from `jira.issue`: {type(issue_data)}"
logger.error(msg)
raise TypeError(msg)
if "fields" not in issue_data:
logger.error(f"Could not retrieve issue {issue_key}")
return {"success": False, "error": f"Could not retrieve issue {issue_key}"}
# Process attachments
attachments = []
results = []
# Extract attachments from the API response
attachment_data = issue_data.get("fields", {}).get("attachment", [])
if not attachment_data:
return {
"success": True,
"message": f"No attachments found for issue {issue_key}",
"downloaded": [],
"failed": [],
}
# Create JiraAttachment objects for each attachment
for attachment in attachment_data:
if isinstance(attachment, dict):
attachments.append(JiraAttachment.from_api_response(attachment))
# Download each attachment
downloaded = []
failed = []
for attachment in attachments:
if not attachment.url:
logger.warning(f"No URL for attachment {attachment.filename}")
failed.append(
{"filename": attachment.filename, "error": "No URL available"}
)
continue
# Create a safe filename
safe_filename = Path(attachment.filename).name
file_path = target_path / safe_filename
# Download the attachment
success = self.download_attachment(attachment.url, str(file_path))
if success:
downloaded.append(
{
"filename": attachment.filename,
"path": str(file_path),
"size": attachment.size,
}
)
else:
failed.append(
{"filename": attachment.filename, "error": "Download failed"}
)
return {
"success": True,
"issue_key": issue_key,
"total": len(attachments),
"downloaded": downloaded,
"failed": failed,
}
def upload_attachment(self, issue_key: str, file_path: str) -> dict[str, Any]:
"""
Upload a single attachment to a Jira issue.
Args:
issue_key: The Jira issue key (e.g., 'PROJ-123')
file_path: The path to the file to upload
Returns:
A dictionary with upload result information
"""
if not issue_key:
logger.error("No issue key provided for attachment upload")
return {"success": False, "error": "No issue key provided"}
if not file_path:
logger.error("No file path provided for attachment upload")
return {"success": False, "error": "No file path provided"}
try:
# Convert to absolute path if relative
if not os.path.isabs(file_path):
file_path = os.path.abspath(file_path)
# Check if file exists
if not os.path.exists(file_path):
logger.error(f"File not found: {file_path}")
return {"success": False, "error": f"File not found: {file_path}"}
logger.info(f"Uploading attachment from {file_path} to issue {issue_key}")
# Use the Jira API to upload the file
filename = os.path.basename(file_path)
with open(file_path, "rb") as file:
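# Note: the opened handle is not passed on; add_attachment is given the
# file path and reads it itself, so the open() mainly confirms readability.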
attachment = self.jira.add_attachment(
issue_key=issue_key, filename=file_path
)
if attachment:
file_size = os.path.getsize(file_path)
logger.info(
f"Successfully uploaded attachment {filename} to {issue_key} (size: {file_size} bytes)"
)
return {
"success": True,
"issue_key": issue_key,
"filename": filename,
"size": file_size,
"id": attachment.get("id")
if isinstance(attachment, dict)
else None,
}
else:
logger.error(f"Failed to upload attachment {filename} to {issue_key}")
return {
"success": False,
"error": f"Failed to upload attachment {filename} to {issue_key}",
}
except Exception as e:
error_msg = str(e)
logger.error(f"Error uploading attachment: {error_msg}")
return {"success": False, "error": error_msg}
def upload_attachments(
self, issue_key: str, file_paths: list[str]
) -> dict[str, Any]:
"""
Upload multiple attachments to a Jira issue.
Args:
issue_key: The Jira issue key (e.g., 'PROJ-123')
file_paths: List of paths to files to upload
Returns:
A dictionary with upload results
"""
if not issue_key:
logger.error("No issue key provided for attachment upload")
return {"success": False, "error": "No issue key provided"}
if not file_paths:
logger.error("No file paths provided for attachment upload")
return {"success": False, "error": "No file paths provided"}
logger.info(f"Uploading {len(file_paths)} attachments to issue {issue_key}")
# Upload each attachment
uploaded = []
failed = []
for file_path in file_paths:
result = self.upload_attachment(issue_key, file_path)
if result.get("success"):
uploaded.append(
{
"filename": result.get("filename"),
"size": result.get("size"),
"id": result.get("id"),
}
)
else:
failed.append(
{
"filename": os.path.basename(file_path),
"error": result.get("error"),
}
)
return {
"success": True,
"issue_key": issue_key,
"total": len(file_paths),
"uploaded": uploaded,
"failed": failed,
}
```
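The bulk helpers above deliberately return result dictionaries instead of raising on individual failures, so a caller can surface partial success. A minimal usage sketch, assuming `client` is an already-configured Jira client object that includes these attachment methods (the variable name and file paths are illustrative only):

```python
# Sketch only: `client` is assumed to be a configured Jira client that
# provides the attachment methods shown above; paths are placeholders.
result = client.upload_attachments("PROJ-123", ["./design.pdf", "./run.log"])

print(f"{result['total']} file(s) processed for {result['issue_key']}")
for item in result["uploaded"]:
    print(f"  uploaded {item['filename']} ({item['size']} bytes, id={item['id']})")
for item in result["failed"]:
    print(f"  failed   {item['filename']}: {item['error']}")
```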
--------------------------------------------------------------------------------
/tests/unit/utils/test_environment.py:
--------------------------------------------------------------------------------
```python
"""Tests for the environment utilities module."""
import logging
import pytest
from mcp_atlassian.utils.environment import get_available_services
from tests.utils.assertions import assert_log_contains
from tests.utils.mocks import MockEnvironment
@pytest.fixture(autouse=True)
def setup_logger():
"""Ensure logger is set to INFO level for capturing log messages."""
logger = logging.getLogger("mcp-atlassian.utils.environment")
original_level = logger.level
logger.setLevel(logging.INFO)
yield
logger.setLevel(original_level)
@pytest.fixture
def env_scenarios():
"""Environment configuration scenarios for testing."""
return {
"oauth_cloud": {
"CONFLUENCE_URL": "https://company.atlassian.net",
"JIRA_URL": "https://company.atlassian.net",
"ATLASSIAN_OAUTH_CLIENT_ID": "client_id",
"ATLASSIAN_OAUTH_CLIENT_SECRET": "client_secret",
"ATLASSIAN_OAUTH_REDIRECT_URI": "http://localhost:8080/callback",
"ATLASSIAN_OAUTH_SCOPE": "read:jira-user",
"ATLASSIAN_OAUTH_CLOUD_ID": "cloud_id",
},
"basic_auth_cloud": {
"CONFLUENCE_URL": "https://company.atlassian.net",
"CONFLUENCE_USERNAME": "[email protected]",
"CONFLUENCE_API_TOKEN": "api_token",
"JIRA_URL": "https://company.atlassian.net",
"JIRA_USERNAME": "[email protected]",
"JIRA_API_TOKEN": "api_token",
},
"pat_server": {
"CONFLUENCE_URL": "https://confluence.company.com",
"CONFLUENCE_PERSONAL_TOKEN": "pat_token",
"JIRA_URL": "https://jira.company.com",
"JIRA_PERSONAL_TOKEN": "pat_token",
},
"basic_auth_server": {
"CONFLUENCE_URL": "https://confluence.company.com",
"CONFLUENCE_USERNAME": "admin",
"CONFLUENCE_API_TOKEN": "password",
"JIRA_URL": "https://jira.company.com",
"JIRA_USERNAME": "admin",
"JIRA_API_TOKEN": "password",
},
}
def _assert_service_availability(result, confluence_expected, jira_expected):
"""Helper to assert service availability."""
assert result == {"confluence": confluence_expected, "jira": jira_expected}
def _assert_authentication_logs(caplog, auth_type, services):
"""Helper to assert authentication log messages."""
log_patterns = {
"oauth": "OAuth 2.0 (3LO) authentication (Cloud-only features)",
"cloud_basic": "Cloud Basic Authentication (API Token)",
"server": "Server/Data Center authentication (PAT or Basic Auth)",
"not_configured": "is not configured or required environment variables are missing",
}
for service in services:
service_name = service.title()
if auth_type == "not_configured":
assert_log_contains(
caplog, "INFO", f"{service_name} {log_patterns[auth_type]}"
)
else:
assert_log_contains(
caplog, "INFO", f"Using {service_name} {log_patterns[auth_type]}"
)
class TestGetAvailableServices:
"""Test cases for get_available_services function."""
def test_no_services_configured(self, caplog):
"""Test that no services are available when no environment variables are set."""
with MockEnvironment.clean_env():
result = get_available_services()
_assert_service_availability(
result, confluence_expected=False, jira_expected=False
)
_assert_authentication_logs(
caplog, "not_configured", ["confluence", "jira"]
)
@pytest.mark.parametrize(
"scenario,expected_confluence,expected_jira",
[
("oauth_cloud", True, True),
("basic_auth_cloud", True, True),
("pat_server", True, True),
("basic_auth_server", True, True),
],
)
def test_valid_authentication_scenarios(
self, env_scenarios, scenario, expected_confluence, expected_jira, caplog
):
"""Test various valid authentication scenarios."""
with MockEnvironment.clean_env():
for key, value in env_scenarios[scenario].items():
import os
os.environ[key] = value
result = get_available_services()
_assert_service_availability(
result,
confluence_expected=expected_confluence,
jira_expected=expected_jira,
)
# Verify appropriate log messages based on scenario
if scenario == "oauth_cloud":
_assert_authentication_logs(caplog, "oauth", ["confluence", "jira"])
elif scenario == "basic_auth_cloud":
_assert_authentication_logs(
caplog, "cloud_basic", ["confluence", "jira"]
)
elif scenario in ["pat_server", "basic_auth_server"]:
_assert_authentication_logs(caplog, "server", ["confluence", "jira"])
@pytest.mark.parametrize(
"missing_oauth_var",
[
"ATLASSIAN_OAUTH_CLIENT_ID",
"ATLASSIAN_OAUTH_CLIENT_SECRET",
"ATLASSIAN_OAUTH_REDIRECT_URI",
"ATLASSIAN_OAUTH_SCOPE",
"ATLASSIAN_OAUTH_CLOUD_ID",
],
)
def test_oauth_missing_required_vars(
self, env_scenarios, missing_oauth_var, caplog
):
"""Test that OAuth fails when any required variable is missing."""
with MockEnvironment.clean_env():
oauth_config = env_scenarios["oauth_cloud"]
# Remove one required OAuth variable
del oauth_config[missing_oauth_var]
for key, value in oauth_config.items():
import os
os.environ[key] = value
result = get_available_services()
_assert_service_availability(
result, confluence_expected=False, jira_expected=False
)
@pytest.mark.parametrize(
"missing_basic_vars,service",
[
(["CONFLUENCE_USERNAME", "JIRA_USERNAME"], "username"),
(["CONFLUENCE_API_TOKEN", "JIRA_API_TOKEN"], "token"),
],
)
def test_basic_auth_missing_credentials(
self, env_scenarios, missing_basic_vars, service
):
"""Test that basic auth fails when credentials are missing."""
with MockEnvironment.clean_env():
basic_config = env_scenarios["basic_auth_cloud"].copy()
# Remove required variables
for var in missing_basic_vars:
del basic_config[var]
for key, value in basic_config.items():
import os
os.environ[key] = value
result = get_available_services()
_assert_service_availability(
result, confluence_expected=False, jira_expected=False
)
def test_oauth_precedence_over_basic_auth(self, env_scenarios, caplog):
"""Test that OAuth takes precedence over Basic Auth."""
with MockEnvironment.clean_env():
# Set both OAuth and Basic Auth variables
combined_config = {
**env_scenarios["oauth_cloud"],
**env_scenarios["basic_auth_cloud"],
}
for key, value in combined_config.items():
import os
os.environ[key] = value
result = get_available_services()
_assert_service_availability(
result, confluence_expected=True, jira_expected=True
)
# Should use OAuth, not Basic Auth
_assert_authentication_logs(caplog, "oauth", ["confluence", "jira"])
assert "Basic Authentication" not in caplog.text
def test_mixed_service_configuration(self, caplog):
"""Test mixed configurations where only one service is configured."""
with MockEnvironment.clean_env():
import os
os.environ["CONFLUENCE_URL"] = "https://company.atlassian.net"
os.environ["CONFLUENCE_USERNAME"] = "[email protected]"
os.environ["CONFLUENCE_API_TOKEN"] = "api_token"
result = get_available_services()
_assert_service_availability(
result, confluence_expected=True, jira_expected=False
)
_assert_authentication_logs(caplog, "cloud_basic", ["confluence"])
_assert_authentication_logs(caplog, "not_configured", ["jira"])
def test_return_value_structure(self):
"""Test that the return value has the correct structure."""
with MockEnvironment.clean_env():
result = get_available_services()
assert isinstance(result, dict)
assert set(result.keys()) == {"confluence", "jira"}
assert all(isinstance(v, bool) for v in result.values())
@pytest.mark.parametrize(
"invalid_vars",
[
{"CONFLUENCE_URL": "", "JIRA_URL": ""}, # Empty strings
{"confluence_url": "https://test.com"}, # Wrong case
],
)
def test_invalid_environment_variables(self, invalid_vars, caplog):
"""Test behavior with invalid environment variables."""
with MockEnvironment.clean_env():
for key, value in invalid_vars.items():
import os
os.environ[key] = value
result = get_available_services()
_assert_service_availability(
result, confluence_expected=False, jira_expected=False
)
_assert_authentication_logs(
caplog, "not_configured", ["confluence", "jira"]
)
```
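Outside of `MockEnvironment`, the function under test is driven entirely by process environment variables like those in the scenarios above. A minimal sketch of the Jira-only case (URL, username, and token are placeholder values mirroring the fixture data, not real credentials):

```python
import os

from mcp_atlassian.utils.environment import get_available_services

# Placeholder values modeled on the "basic_auth_cloud" scenario above.
os.environ["JIRA_URL"] = "https://company.atlassian.net"
os.environ["JIRA_USERNAME"] = "user@company.com"
os.environ["JIRA_API_TOKEN"] = "api_token"

# With only Jira configured, the expected result shape is
# {"confluence": False, "jira": True}.
print(get_available_services())
```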
--------------------------------------------------------------------------------
/tests/unit/jira/test_comments.py:
--------------------------------------------------------------------------------
```python
"""Tests for the Jira Comments mixin."""
from unittest.mock import MagicMock, Mock
import pytest
from mcp_atlassian.jira.comments import CommentsMixin
class TestCommentsMixin:
"""Tests for the CommentsMixin class."""
@pytest.fixture
def comments_mixin(self, jira_client):
"""Create a CommentsMixin instance with mocked dependencies."""
mixin = CommentsMixin(config=jira_client.config)
mixin.jira = jira_client.jira
# Set up a mock preprocessor with markdown_to_jira method
mixin.preprocessor = Mock()
mixin.preprocessor.markdown_to_jira = Mock(
return_value="*This* is _Jira_ formatted"
)
# Mock the clean_text method
mixin._clean_text = Mock(side_effect=lambda x: x)
return mixin
def test_get_issue_comments_basic(self, comments_mixin):
"""Test get_issue_comments with basic data."""
# Setup mock response
comments_mixin.jira.issue_get_comments.return_value = {
"comments": [
{
"id": "10001",
"body": "This is a comment",
"created": "2024-01-01T10:00:00.000+0000",
"updated": "2024-01-01T11:00:00.000+0000",
"author": {"displayName": "John Doe"},
}
]
}
# Call the method
result = comments_mixin.get_issue_comments("TEST-123")
# Verify
comments_mixin.jira.issue_get_comments.assert_called_once_with("TEST-123")
assert len(result) == 1
assert result[0]["id"] == "10001"
assert result[0]["body"] == "This is a comment"
assert result[0]["created"] == "2024-01-01 10:00:00+00:00" # Parsed date
assert result[0]["author"] == "John Doe"
def test_get_issue_comments_with_limit(self, comments_mixin):
"""Test get_issue_comments with limit parameter."""
# Setup mock response with multiple comments
comments_mixin.jira.issue_get_comments.return_value = {
"comments": [
{
"id": "10001",
"body": "First comment",
"created": "2024-01-01T10:00:00.000+0000",
"author": {"displayName": "John Doe"},
},
{
"id": "10002",
"body": "Second comment",
"created": "2024-01-02T10:00:00.000+0000",
"author": {"displayName": "Jane Smith"},
},
{
"id": "10003",
"body": "Third comment",
"created": "2024-01-03T10:00:00.000+0000",
"author": {"displayName": "Bob Johnson"},
},
]
}
# Call the method with limit=2
result = comments_mixin.get_issue_comments("TEST-123", limit=2)
# Verify
comments_mixin.jira.issue_get_comments.assert_called_once_with("TEST-123")
assert len(result) == 2 # Only 2 comments should be returned
assert result[0]["id"] == "10001"
assert result[1]["id"] == "10002"
# Third comment shouldn't be included due to limit
def test_get_issue_comments_with_missing_fields(self, comments_mixin):
"""Test get_issue_comments with missing fields in the response."""
# Setup mock response with missing fields
comments_mixin.jira.issue_get_comments.return_value = {
"comments": [
{
"id": "10001",
# Missing body field
"created": "2024-01-01T10:00:00.000+0000",
# Missing author field
},
{
# Missing id field
"body": "Second comment",
# Missing created field
"author": {}, # Empty author object
},
{
"id": "10003",
"body": "Third comment",
"created": "2024-01-03T10:00:00.000+0000",
"author": {"name": "user123"}, # Using name instead of displayName
},
]
}
# Call the method
result = comments_mixin.get_issue_comments("TEST-123")
# Verify
assert len(result) == 3
assert result[0]["id"] == "10001"
assert result[0]["body"] == "" # Should default to empty string
assert result[0]["author"] == "Unknown" # Should default to Unknown
assert (
"id" not in result[1] or not result[1]["id"]
) # Should be missing or empty
assert result[1]["author"] == "Unknown" # Should default to Unknown
assert (
result[2]["author"] == "Unknown"
) # Should use Unknown when only name is available
def test_get_issue_comments_with_empty_response(self, comments_mixin):
"""Test get_issue_comments with an empty response."""
# Setup mock response with no comments
comments_mixin.jira.issue_get_comments.return_value = {"comments": []}
# Call the method
result = comments_mixin.get_issue_comments("TEST-123")
# Verify
assert len(result) == 0 # Should return an empty list
def test_get_issue_comments_with_error(self, comments_mixin):
"""Test get_issue_comments with an error response."""
# Setup mock to raise exception
comments_mixin.jira.issue_get_comments.side_effect = Exception("API Error")
# Verify it raises the wrapped exception
with pytest.raises(Exception, match="Error getting comments"):
comments_mixin.get_issue_comments("TEST-123")
def test_add_comment_basic(self, comments_mixin):
"""Test add_comment with basic data."""
# Setup mock response
comments_mixin.jira.issue_add_comment.return_value = {
"id": "10001",
"body": "This is a comment",
"created": "2024-01-01T10:00:00.000+0000",
"author": {"displayName": "John Doe"},
}
# Call the method
result = comments_mixin.add_comment("TEST-123", "Test comment")
# Verify
comments_mixin.preprocessor.markdown_to_jira.assert_called_once_with(
"Test comment"
)
comments_mixin.jira.issue_add_comment.assert_called_once_with(
"TEST-123", "*This* is _Jira_ formatted"
)
assert result["id"] == "10001"
assert result["body"] == "This is a comment"
assert result["created"] == "2024-01-01 10:00:00+00:00" # Parsed date
assert result["author"] == "John Doe"
def test_add_comment_with_markdown_conversion(self, comments_mixin):
"""Test add_comment with markdown conversion."""
# Setup mock response
comments_mixin.jira.issue_add_comment.return_value = {
"id": "10001",
"body": "*This* is _Jira_ formatted",
"created": "2024-01-01T10:00:00.000+0000",
"author": {"displayName": "John Doe"},
}
# Create a complex markdown comment
markdown_comment = """
# Heading 1
This is a paragraph with **bold** and *italic* text.
- List item 1
- List item 2
```python
def hello():
print("Hello world")
```
"""
# Call the method
result = comments_mixin.add_comment("TEST-123", markdown_comment)
# Verify
comments_mixin.preprocessor.markdown_to_jira.assert_called_once_with(
markdown_comment
)
comments_mixin.jira.issue_add_comment.assert_called_once_with(
"TEST-123", "*This* is _Jira_ formatted"
)
assert result["body"] == "*This* is _Jira_ formatted"
def test_add_comment_with_empty_comment(self, comments_mixin):
"""Test add_comment with an empty comment."""
# Setup mock response
comments_mixin.jira.issue_add_comment.return_value = {
"id": "10001",
"body": "",
"created": "2024-01-01T10:00:00.000+0000",
"author": {"displayName": "John Doe"},
}
# Call the method with empty comment
result = comments_mixin.add_comment("TEST-123", "")
# Verify - for empty comments, markdown_to_jira should NOT be called as per implementation
comments_mixin.preprocessor.markdown_to_jira.assert_not_called()
comments_mixin.jira.issue_add_comment.assert_called_once_with("TEST-123", "")
assert result["body"] == ""
def test_add_comment_with_error(self, comments_mixin):
"""Test add_comment with an error response."""
# Setup mock to raise exception
comments_mixin.jira.issue_add_comment.side_effect = Exception("API Error")
# Verify it raises the wrapped exception
with pytest.raises(Exception, match="Error adding comment"):
comments_mixin.add_comment("TEST-123", "Test comment")
def test_markdown_to_jira(self, comments_mixin):
"""Test markdown to Jira conversion."""
# Setup - need to replace the mock entirely
comments_mixin.preprocessor.markdown_to_jira = MagicMock(
return_value="Jira text"
)
# Call the method
result = comments_mixin._markdown_to_jira("Markdown text")
# Verify
assert result == "Jira text"
comments_mixin.preprocessor.markdown_to_jira.assert_called_once_with(
"Markdown text"
)
def test_markdown_to_jira_with_empty_text(self, comments_mixin):
"""Test markdown to Jira conversion with empty text."""
# Call the method with empty text
result = comments_mixin._markdown_to_jira("")
# Verify
assert result == ""
# The preprocessor should not be called with empty text
comments_mixin.preprocessor.markdown_to_jira.assert_not_called()
```
--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------
```python
"""
Root pytest configuration file for MCP Atlassian tests.
This module provides session-scoped fixtures and utilities that are shared
across all test modules. It integrates with the new test utilities framework
to provide efficient, reusable test fixtures.
"""
import pytest
from tests.utils.factories import (
AuthConfigFactory,
ConfluencePageFactory,
ErrorResponseFactory,
JiraIssueFactory,
)
from tests.utils.mocks import MockAtlassianClient, MockEnvironment
def pytest_addoption(parser):
"""Add command-line options for tests."""
parser.addoption(
"--use-real-data",
action="store_true",
default=False,
help="Run tests that use real API data (requires env vars)",
)
# ============================================================================
# Session-Scoped Configuration Fixtures
# ============================================================================
@pytest.fixture(scope="session")
def session_auth_configs():
"""
Session-scoped fixture providing authentication configuration templates.
This fixture is computed once per test session and provides standard
authentication configurations for OAuth and basic auth scenarios.
Returns:
Dict[str, Dict[str, str]]: Authentication configuration templates
"""
return {
"oauth": AuthConfigFactory.create_oauth_config(),
"basic_auth": AuthConfigFactory.create_basic_auth_config(),
"jira_basic": {
"url": "https://test.atlassian.net",
"username": "[email protected]",
"api_token": "test-jira-token",
},
"confluence_basic": {
"url": "https://test.atlassian.net/wiki",
"username": "[email protected]",
"api_token": "test-confluence-token",
},
}
@pytest.fixture(scope="session")
def session_mock_data():
"""
Session-scoped fixture providing mock data templates.
This fixture creates mock data templates once per session to avoid
recreating expensive mock objects for every test.
Returns:
Dict[str, Any]: Mock data templates for various API responses
"""
return {
"jira_issue": JiraIssueFactory.create(),
"jira_issue_minimal": JiraIssueFactory.create_minimal(),
"confluence_page": ConfluencePageFactory.create(),
"api_error": ErrorResponseFactory.create_api_error(),
"auth_error": ErrorResponseFactory.create_auth_error(),
"jira_search_results": {
"issues": [
JiraIssueFactory.create("TEST-1"),
JiraIssueFactory.create("TEST-2"),
JiraIssueFactory.create("TEST-3"),
],
"total": 3,
"startAt": 0,
"maxResults": 50,
},
}
# ============================================================================
# Environment and Configuration Fixtures
# ============================================================================
@pytest.fixture
def clean_environment():
"""
Fixture that provides a clean environment with no auth variables.
This is useful for testing error conditions and configuration validation.
"""
with MockEnvironment.clean_env() as env:
yield env
@pytest.fixture
def oauth_environment():
"""
Fixture that provides a complete OAuth environment setup.
This sets up all necessary OAuth environment variables for testing
OAuth-based authentication flows.
"""
with MockEnvironment.oauth_env() as env:
yield env
@pytest.fixture
def basic_auth_environment():
"""
Fixture that provides basic authentication environment setup.
This sets up username/token authentication for both Jira and Confluence.
"""
with MockEnvironment.basic_auth_env() as env:
yield env
# ============================================================================
# Factory-Based Fixtures
# ============================================================================
@pytest.fixture
def make_jira_issue():
"""
Factory fixture for creating Jira issues with customizable properties.
Returns:
Callable: Factory function that creates Jira issue data
Example:
def test_issue_creation(make_jira_issue):
issue = make_jira_issue(key="CUSTOM-123",
fields={"priority": {"name": "High"}})
assert issue["key"] == "CUSTOM-123"
"""
return JiraIssueFactory.create
@pytest.fixture
def make_confluence_page():
"""
Factory fixture for creating Confluence pages with customizable properties.
Returns:
Callable: Factory function that creates Confluence page data
Example:
def test_page_creation(make_confluence_page):
page = make_confluence_page(title="Custom Page",
space={"key": "CUSTOM"})
assert page["title"] == "Custom Page"
"""
return ConfluencePageFactory.create
@pytest.fixture
def make_auth_config():
"""
Factory fixture for creating authentication configurations.
Returns:
Dict[str, Callable]: Factory functions for different auth types
Example:
def test_oauth_config(make_auth_config):
config = make_auth_config["oauth"](client_id="custom-id")
assert config["client_id"] == "custom-id"
"""
return {
"oauth": AuthConfigFactory.create_oauth_config,
"basic": AuthConfigFactory.create_basic_auth_config,
}
@pytest.fixture
def make_api_error():
"""
Factory fixture for creating API error responses.
Returns:
Callable: Factory function that creates error response data
Example:
def test_error_handling(make_api_error):
error = make_api_error(status_code=404, message="Not Found")
assert error["status"] == 404
"""
return ErrorResponseFactory.create_api_error
# ============================================================================
# Mock Client Fixtures
# ============================================================================
@pytest.fixture
def mock_jira_client():
"""
Fixture providing a pre-configured mock Jira client.
The client comes with sensible defaults for common operations
but can be customized per test as needed.
Returns:
MagicMock: Configured mock Jira client
"""
return MockAtlassianClient.create_jira_client()
@pytest.fixture
def mock_confluence_client():
"""
Fixture providing a pre-configured mock Confluence client.
The client comes with sensible defaults for common operations
but can be customized per test as needed.
Returns:
MagicMock: Configured mock Confluence client
"""
return MockAtlassianClient.create_confluence_client()
# ============================================================================
# Compatibility Fixtures (maintain backward compatibility)
# ============================================================================
@pytest.fixture
def use_real_jira_data(request):
"""
Check if real Jira data tests should be run.
This will be True if the --use-real-data flag is passed to pytest.
Note: This fixture is maintained for backward compatibility.
"""
return request.config.getoption("--use-real-data")
@pytest.fixture
def use_real_confluence_data(request):
"""
Check if real Confluence data tests should be run.
This will be True if the --use-real-data flag is passed to pytest.
Note: This fixture is maintained for backward compatibility.
"""
return request.config.getoption("--use-real-data")
# ============================================================================
# Advanced Environment Utilities
# ============================================================================
@pytest.fixture
def env_var_manager():
"""
Fixture providing utilities for managing environment variables in tests.
Returns:
MockEnvironment: Environment management utilities
Example:
def test_with_custom_env(env_var_manager):
with env_var_manager.oauth_env():
# Test OAuth functionality
pass
"""
return MockEnvironment
@pytest.fixture
def parametrized_auth_env(request):
"""
Parametrized fixture for testing with different authentication environments.
This fixture can be used with pytest.mark.parametrize to test the same
functionality with different authentication setups.
Example:
@pytest.mark.parametrize("parametrized_auth_env",
["oauth", "basic_auth"], indirect=True)
def test_auth_scenarios(parametrized_auth_env):
# Test will run once for OAuth and once for basic auth
pass
"""
auth_type = request.param
if auth_type == "oauth":
with MockEnvironment.oauth_env() as env:
yield env
elif auth_type == "basic_auth":
with MockEnvironment.basic_auth_env() as env:
yield env
elif auth_type == "clean":
with MockEnvironment.clean_env() as env:
yield env
else:
raise ValueError(f"Unknown auth type: {auth_type}")
# ============================================================================
# Session Validation and Health Checks
# ============================================================================
@pytest.fixture(scope="session", autouse=True)
def validate_test_environment():
"""
Session-scoped fixture that validates the test environment setup.
This fixture runs automatically and ensures that the test environment
is properly configured for running the test suite.
"""
# Validate that test utilities are importable
try:
import importlib.util
# Check if modules can be imported
for module_name in [
"tests.fixtures.confluence_mocks",
"tests.fixtures.jira_mocks",
"tests.utils.base",
"tests.utils.factories",
"tests.utils.mocks",
]:
spec = importlib.util.find_spec(module_name)
if spec is None:
pytest.fail(f"Failed to find module: {module_name}")
except ImportError as e:
pytest.fail(f"Failed to import test utilities: {e}")
# Log session start
print("\n🧪 Starting MCP Atlassian test session with enhanced fixtures")
yield
# Log session end
print("\n✅ Completed MCP Atlassian test session")
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/formatting.py:
--------------------------------------------------------------------------------
```python
"""Module for Jira content formatting utilities."""
import html
import logging
import re
from typing import Any
from ..preprocessing.jira import JiraPreprocessor
from .client import JiraClient
from .protocols import (
EpicOperationsProto,
FieldsOperationsProto,
IssueOperationsProto,
UsersOperationsProto,
)
logger = logging.getLogger("mcp-jira")
class FormattingMixin(
JiraClient,
EpicOperationsProto,
FieldsOperationsProto,
IssueOperationsProto,
UsersOperationsProto,
):
"""Mixin for Jira content formatting operations.
This mixin provides utilities for converting between different formats,
formatting issue content for display, parsing dates, and sanitizing content.
"""
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initialize the FormattingMixin.
Args:
*args: Positional arguments for the JiraClient
**kwargs: Keyword arguments for the JiraClient
"""
super().__init__(*args, **kwargs)
# Use the JiraPreprocessor with the base URL from the client
base_url = ""
if hasattr(self, "config") and hasattr(self.config, "url"):
base_url = self.config.url
self.preprocessor = JiraPreprocessor(base_url=base_url)
def markdown_to_jira(self, markdown_text: str) -> str:
"""
Convert Markdown syntax to Jira markup syntax.
This method uses the TextPreprocessor implementation for consistent
conversion between Markdown and Jira markup.
Args:
markdown_text: Text in Markdown format
Returns:
Text in Jira markup format
"""
if not markdown_text:
return ""
try:
# Use the existing preprocessor
return self.preprocessor.markdown_to_jira(markdown_text)
except Exception as e:
logger.warning(f"Error converting markdown to Jira format: {str(e)}")
# Return the original text if conversion fails
return markdown_text
def format_issue_content(
self,
issue_key: str,
issue: dict[str, Any],
description: str,
comments: list[dict[str, Any]],
created_date: str,
epic_info: dict[str, str | None],
) -> str:
"""
Format the issue content for display.
Args:
issue_key: The issue key
issue: The issue data from Jira
description: Processed description text
comments: List of comment dictionaries
created_date: Formatted created date
epic_info: Dictionary with epic_key and epic_name
Returns:
Formatted content string
"""
# Basic issue information
content = f"""Issue: {issue_key}
Title: {issue["fields"].get("summary", "")}
Type: {issue["fields"]["issuetype"]["name"]}
Status: {issue["fields"]["status"]["name"]}
Created: {created_date}
"""
# Add Epic information if available
if epic_info.get("epic_key"):
content += f"Epic: {epic_info['epic_key']}"
if epic_info.get("epic_name"):
content += f" - {epic_info['epic_name']}"
content += "\n"
content += f"""
Description:
{description}
"""
# Add comments if present
if comments:
content += "\nComments:\n" + "\n".join(
[f"{c['created']} - {c['author']}: {c['body']}" for c in comments]
)
return content
def create_issue_metadata(
self,
issue_key: str,
issue: dict[str, Any],
comments: list[dict[str, Any]],
created_date: str,
epic_info: dict[str, str | None],
) -> dict[str, Any]:
"""
Create metadata for the issue document.
Args:
issue_key: The issue key
issue: The issue data from Jira
comments: List of comment dictionaries
created_date: Formatted created date
epic_info: Dictionary with epic_key and epic_name
Returns:
Metadata dictionary
"""
# Extract fields
fields = issue.get("fields", {})
# Basic metadata
metadata = {
"key": issue_key,
"summary": fields.get("summary", ""),
"type": fields.get("issuetype", {}).get("name", ""),
"status": fields.get("status", {}).get("name", ""),
"created": created_date,
"source": "jira",
}
# Add assignee if present
if fields.get("assignee"):
metadata["assignee"] = fields["assignee"].get(
"displayName", fields["assignee"].get("name", "")
)
# Add reporter if present
if fields.get("reporter"):
metadata["reporter"] = fields["reporter"].get(
"displayName", fields["reporter"].get("name", "")
)
# Add priority if present
if fields.get("priority"):
metadata["priority"] = fields["priority"].get("name", "")
# Add Epic information to metadata if available
if epic_info.get("epic_key"):
metadata["epic_key"] = epic_info["epic_key"]
if epic_info.get("epic_name"):
metadata["epic_name"] = epic_info["epic_name"]
# Add project information
if fields.get("project"):
metadata["project"] = fields["project"].get("key", "")
metadata["project_name"] = fields["project"].get("name", "")
# Add comment count
metadata["comment_count"] = len(comments)
return metadata
def extract_epic_information(
self, issue: dict[str, Any]
) -> dict[str, None] | dict[str, str]:
"""
Extract epic information from issue data.
Args:
issue: Issue data dictionary
Returns:
Dictionary containing epic_key and epic_name (or None if not found)
"""
epic_info = {"epic_key": None, "epic_name": None}
# Check if the issue has fields
if "fields" not in issue:
return epic_info
fields = issue["fields"]
# Try to get the epic link from issue
# (requires the correct field ID which varies across instances)
# Use the field discovery mechanism if available
try:
field_ids = self.get_field_ids_to_epic()
# Get the epic link field ID
epic_link_field = field_ids.get("Epic Link")
if (
epic_link_field
and epic_link_field in fields
and fields[epic_link_field]
):
epic_info["epic_key"] = fields[epic_link_field]
# If the issue is linked to an epic, try to get the epic name
if epic_info["epic_key"] and hasattr(self, "get_issue"):
try:
epic_issue = self.get_issue(epic_info["epic_key"])
epic_fields = epic_issue.get("fields", {})
# Get the epic name field ID
epic_name_field = field_ids.get("Epic Name")
if epic_name_field and epic_name_field in epic_fields:
epic_info["epic_name"] = epic_fields[epic_name_field]
except Exception as e:
logger.warning(f"Error getting epic details: {str(e)}")
except Exception as e:
logger.warning(f"Error extracting epic information: {str(e)}")
return epic_info
def sanitize_html(self, html_content: str) -> str:
"""
Sanitize HTML content by removing HTML tags.
Args:
html_content: HTML content to sanitize
Returns:
Plaintext content with HTML tags removed
"""
if not html_content:
return ""
try:
# Remove HTML tags
plain_text = re.sub(r"<[^>]+>", "", html_content)
# Decode HTML entities
plain_text = html.unescape(plain_text)
# Normalize whitespace
plain_text = re.sub(r"\s+", " ", plain_text).strip()
return plain_text
except Exception as e:
logger.warning(f"Error sanitizing HTML: {str(e)}")
return html_content
def sanitize_transition_fields(self, fields: dict[str, Any]) -> dict[str, Any]:
"""
Sanitize fields to ensure they're valid for the Jira API.
This is used for transition data to properly format field values.
Args:
fields: Dictionary of fields to sanitize
Returns:
Dictionary of sanitized fields
"""
sanitized_fields = {}
for key, value in fields.items():
# Skip empty values
if value is None:
continue
# Handle assignee field specially
if key in ["assignee", "reporter"]:
# If the value is already a dictionary, use it as is
if isinstance(value, dict) and "accountId" in value:
sanitized_fields[key] = value
else:
# Otherwise, look up the account ID
if not isinstance(value, str):
logger.warning(f"Invalid assignee value: {value}")
continue
try:
account_id = self._get_account_id(value)
if account_id:
sanitized_fields[key] = {"accountId": account_id}
except Exception as e:
logger.warning(
f"Error getting account ID for {value}: {str(e)}"
)
# All other fields pass through as is
else:
sanitized_fields[key] = value
return sanitized_fields
def add_comment_to_transition_data(
self, transition_data: dict[str, Any], comment: str | None
) -> dict[str, Any]:
"""
Add a comment to transition data.
Args:
transition_data: Transition data dictionary
comment: Comment text (in Markdown format) or None
Returns:
Updated transition data
"""
if not comment:
return transition_data
# Convert markdown to Jira format
jira_formatted_comment = self.markdown_to_jira(comment)
# Add the comment to the transition data
transition_data["update"] = {
"comment": [{"add": {"body": jira_formatted_comment}}]
}
return transition_data
```
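`sanitize_html` above boils down to three steps: strip tags, decode entities, normalize whitespace. A standalone sketch of the same pipeline (the sample HTML is made up for illustration) shows the intended effect without constructing the mixin:

```python
import html
import re

raw = "<p>Fixed in <b>v2.1</b> &amp; deployed to <i>staging</i></p>"

# Same steps as FormattingMixin.sanitize_html above:
text = re.sub(r"<[^>]+>", "", raw)         # strip HTML tags
text = html.unescape(text)                 # decode entities like &amp;
text = re.sub(r"\s+", " ", text).strip()   # collapse whitespace

print(text)  # Fixed in v2.1 & deployed to staging
```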
--------------------------------------------------------------------------------
/tests/integration/test_transport_lifecycle.py:
--------------------------------------------------------------------------------
```python
"""Integration tests for transport lifecycle behavior.
These tests ensure that:
1. No stdin monitoring is used (preventing issues #519 and #524)
2. Stdio transport doesn't conflict with MCP server's internal stdio handling
3. All transports use direct execution
4. Docker scenarios work correctly
"""
import asyncio
from io import StringIO
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from mcp_atlassian import main
from mcp_atlassian.utils.lifecycle import _shutdown_event
@pytest.mark.integration
class TestTransportLifecycleBehavior:
"""Test transport lifecycle behavior to prevent regression of issues #519 and #524."""
def setup_method(self):
"""Reset state before each test."""
_shutdown_event.clear()
def test_all_transports_use_direct_execution(self):
"""Verify all transports use direct execution without stdin monitoring.
This is a regression test to ensure stdin monitoring is never reintroduced,
which caused both issue #519 (stdio conflicts) and #524 (HTTP session termination).
"""
transports_to_test = ["stdio", "sse", "streamable-http"]
for transport in transports_to_test:
with patch("asyncio.run") as mock_asyncio_run:
with patch.dict("os.environ", {"TRANSPORT": transport}, clear=False):
with (
patch(
"mcp_atlassian.servers.main.AtlassianMCP"
) as mock_server_class,
patch("click.core.Context") as mock_click_ctx,
):
# Setup mocks
mock_server = MagicMock()
mock_server.run_async = AsyncMock()
mock_server_class.return_value = mock_server
# Mock CLI context
mock_ctx_instance = MagicMock()
mock_ctx_instance.obj = {
"transport": transport,
"port": None,
"host": None,
"path": None,
}
mock_click_ctx.return_value = mock_ctx_instance
# Execute main
with patch("sys.argv", ["mcp-atlassian"]):
try:
main()
except SystemExit:
pass
# Verify direct execution for all transports
assert mock_asyncio_run.called, (
f"asyncio.run not called for {transport}"
)
called_coro = mock_asyncio_run.call_args[0][0]
# Ensure NO stdin monitoring wrapper is used
coro_str = str(called_coro)
assert "run_with_stdio_monitoring" not in coro_str, (
f"{transport} should not use stdin monitoring"
)
assert "run_async" in coro_str or hasattr(
called_coro, "cr_code"
), f"{transport} should use direct run_async execution"
@pytest.mark.anyio
async def test_stdio_no_race_condition(self):
"""Test that stdio transport doesn't create race condition with MCP server.
After the fix, stdin monitoring has been removed completely, so there's
no possibility of race conditions between components trying to read stdin.
"""
# Create a mock stdin that tracks reads
read_count = 0
class MockStdin:
def __init__(self):
self.closed = False
self._read_lock = asyncio.Lock()
async def readline(self):
nonlocal read_count
async with self._read_lock:
if self.closed:
raise ValueError("I/O operation on closed file")
read_count += 1
return b"" # EOF
mock_stdin = MockStdin()
# Mock the server coroutine that reads stdin
async def mock_server_with_stdio(**kwargs):
"""Simulates MCP server reading from stdin."""
# MCP server would normally read stdin here
await mock_stdin.readline()
return "completed"
# Test direct server execution (current behavior)
with patch("sys.stdin", mock_stdin):
# Run server directly without any stdin monitoring
result = await mock_server_with_stdio()
# Should only have one read - from the MCP server itself
assert read_count == 1
assert result == "completed"
def test_main_function_transport_logic(self):
"""Test the main function's transport determination logic."""
test_cases = [
# (cli_transport, env_transport, expected_final_transport)
("stdio", None, "stdio"),
("sse", None, "sse"),
(None, "stdio", "stdio"),
(None, "sse", "sse"),
("stdio", "sse", "stdio"), # CLI overrides env
]
for cli_transport, env_transport, _expected_transport in test_cases:
with patch("asyncio.run") as mock_asyncio_run:
env_vars = {}
if env_transport:
env_vars["TRANSPORT"] = env_transport
with patch.dict("os.environ", env_vars, clear=False):
with (
patch(
"mcp_atlassian.servers.main.AtlassianMCP"
) as mock_server_class,
patch("click.core.Context") as mock_click_ctx,
):
# Setup mocks
mock_server = MagicMock()
mock_server.run_async = AsyncMock()
mock_server_class.return_value = mock_server
# Mock CLI context
mock_ctx_instance = MagicMock()
mock_ctx_instance.obj = {
"transport": cli_transport,
"port": None,
"host": None,
"path": None,
}
mock_click_ctx.return_value = mock_ctx_instance
# Run main
with patch("sys.argv", ["mcp-atlassian"]):
try:
main()
except SystemExit:
pass
# Verify asyncio.run was called
assert mock_asyncio_run.called
# All transports now run directly without stdin monitoring
called_coro = mock_asyncio_run.call_args[0][0]
# Should always call run_async directly
assert hasattr(called_coro, "cr_code") or "run_async" in str(
called_coro
)
@pytest.mark.anyio
async def test_shutdown_event_handling(self):
"""Test that shutdown events are handled correctly for all transports."""
# Pre-set shutdown event
_shutdown_event.set()
async def mock_server(**kwargs):
# Should run even with shutdown event set
return "completed"
# Server runs directly now
result = await mock_server()
# Server should complete normally
assert result == "completed"
def test_docker_stdio_scenario(self):
"""Test the specific Docker stdio scenario that caused the bug.
This simulates running in Docker with -i flag where stdin is available
but both components trying to read it causes conflicts.
"""
with patch("asyncio.run") as mock_asyncio_run:
# Simulate Docker environment variables
docker_env = {
"TRANSPORT": "stdio",
"JIRA_URL": "https://example.atlassian.net",
"JIRA_USERNAME": "[email protected]",
"JIRA_API_TOKEN": "token",
}
with patch.dict("os.environ", docker_env, clear=False):
with (
patch(
"mcp_atlassian.servers.main.AtlassianMCP"
) as mock_server_class,
patch("sys.stdin", StringIO()), # Simulate available stdin
):
# Setup mock server
mock_server = MagicMock()
mock_server.run_async = AsyncMock()
mock_server_class.return_value = mock_server
# Simulate Docker container startup
with patch("sys.argv", ["mcp-atlassian"]):
try:
main()
except SystemExit:
pass
# Verify stdio transport doesn't use lifecycle monitoring
assert mock_asyncio_run.called
called_coro = mock_asyncio_run.call_args[0][0]
# All transports now use run_async directly
assert hasattr(called_coro, "cr_code") or "run_async" in str(
called_coro
)
@pytest.mark.integration
class TestRegressionPrevention:
"""Tests to prevent regression of specific issues."""
def test_no_stdin_monitoring_in_codebase(self):
"""Ensure stdin monitoring is not reintroduced in the codebase.
This is a safeguard against reintroducing the flawed stdin monitoring
that caused issues #519 and #524.
"""
# Check that the problematic function doesn't exist
from mcp_atlassian.utils import lifecycle
assert not hasattr(lifecycle, "run_with_stdio_monitoring"), (
"run_with_stdio_monitoring should not exist in lifecycle module"
)
def test_signal_handlers_are_setup(self):
"""Verify signal handlers are properly configured."""
with patch("mcp_atlassian.setup_signal_handlers") as mock_setup:
with patch("asyncio.run"):
with patch("mcp_atlassian.servers.main.AtlassianMCP"):
with patch("sys.argv", ["mcp-atlassian"]):
try:
main()
except SystemExit:
pass
# Signal handlers should always be set up
mock_setup.assert_called_once()
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_sprints.py:
--------------------------------------------------------------------------------
```python
"""Tests for the Jira SprintMixin"""
from unittest.mock import MagicMock
import pytest
import requests
from mcp_atlassian.jira import JiraConfig
from mcp_atlassian.jira.sprints import SprintsMixin
from mcp_atlassian.models.jira import JiraSprint
@pytest.fixture
def mock_config():
"""Fixture to create a mock JiraConfig instance."""
config = MagicMock(spec=JiraConfig)
config.url = "https://test.atlassian.net"
config.username = "[email protected]"
config.api_token = "test-token"
config.auth_type = "pat"
return config
@pytest.fixture
def sprints_mixin(mock_config):
"""Fixture to create a SprintsMixin instance for testing."""
mixin = SprintsMixin(config=mock_config)
mixin.jira = MagicMock()
return mixin
@pytest.fixture
def mock_sprints():
"""Fixture to return mock boards data."""
return {
"maxResults": 50,
"startAt": 0,
"isLast": True,
"values": [
{
"id": 10000,
"self": "https://test.atlassian.net/rest/agile/1.0/sprint/10000",
"state": "closed",
"name": "Sprint 0",
"startDate": "2099-05-06T02:00:00.000Z",
"endDate": "2100-05-17T10:00:00.000Z",
"completeDate": "2024-05-20T05:17:24.302Z",
"activatedDate": "2024-05-07T01:22:45.128Z",
"originBoardId": 1000,
"goal": "",
"synced": False,
"autoStartStop": False,
},
{
"id": 10001,
"self": "https://test.atlassian.net/rest/agile/1.0/sprint/10001",
"state": "active",
"name": "Sprint 1",
"startDate": "2099-03-24T06:13:00.000Z",
"endDate": "2100-04-07T06:13:00.000Z",
"activatedDate": "2025-03-24T06:13:20.729Z",
"originBoardId": 1000,
"goal": "",
"synced": False,
"autoStartStop": False,
},
{
"id": 10002,
"self": "https://test.atlassian.net/rest/agile/1.0/sprint/10002",
"state": "future",
"name": "Sprint 2",
"originBoardId": 1000,
"synced": False,
"autoStartStop": False,
},
],
}
def test_get_all_sprints_from_board(sprints_mixin, mock_sprints):
"""Test get_all_sprints_from_board method."""
sprints_mixin.jira.get_all_sprints_from_board.return_value = mock_sprints
result = sprints_mixin.get_all_sprints_from_board("1000")
assert result == mock_sprints["values"]
def test_get_all_sprints_from_board_exception(sprints_mixin):
"""Test get_all_sprints_from_board method with exception."""
sprints_mixin.jira.get_all_sprints_from_board.side_effect = Exception("API Error")
result = sprints_mixin.get_all_sprints_from_board("1000")
assert result == []
sprints_mixin.jira.get_all_sprints_from_board.assert_called_once()
def test_get_all_sprints_from_board_http_error(sprints_mixin):
"""Test get_all_sprints_from_board method with HTTPError."""
sprints_mixin.jira.get_all_sprints_from_board.side_effect = requests.HTTPError(
response=MagicMock(content="API Error content")
)
result = sprints_mixin.get_all_sprints_from_board("1000")
assert result == []
sprints_mixin.jira.get_all_sprints_from_board.assert_called_once()
def test_get_all_sprints_from_board_non_dict_response(sprints_mixin):
"""Test get_all_sprints_from_board method with non-list response."""
sprints_mixin.jira.get_all_sprints_from_board.return_value = "not a dict"
result = sprints_mixin.get_all_sprints_from_board("1000")
assert result == []
sprints_mixin.jira.get_all_sprints_from_board.assert_called_once()
def test_get_all_sprints_from_board_model(sprints_mixin, mock_sprints):
sprints_mixin.jira.get_all_sprints_from_board.return_value = mock_sprints
result = sprints_mixin.get_all_sprints_from_board_model(board_id="1000", state=None)
assert result == [
JiraSprint.from_api_response(value) for value in mock_sprints["values"]
]
def test_create_sprint(sprints_mixin, mock_sprints):
"""Test create_sprint method."""
sprints_mixin.jira.create_sprint.return_value = mock_sprints["values"][1]
result = sprints_mixin.create_sprint(
sprint_name="Sprint 1",
board_id="10001",
start_date="2099-05-01T00:00:00.000Z",
end_date="2100-05-01T00:00:00.000Z",
goal="Your goal",
)
assert result == JiraSprint.from_api_response(mock_sprints["values"][1])
def test_create_sprint_http_exception(sprints_mixin, mock_sprints):
"""Test create_sprint method."""
sprints_mixin.jira.create_sprint.side_effect = requests.HTTPError(
response=MagicMock(content="API Error content")
)
with pytest.raises(requests.HTTPError):
sprints_mixin.create_sprint(
sprint_name="Sprint 1",
board_id="10001",
start_date="2099-05-01T00:00:00.000Z",
end_date="2100-05-15T00:00:00.000Z",
goal="Your goal",
)
def test_create_sprint_exception(sprints_mixin, mock_sprints):
"""Test create_sprint method throws general Exception."""
sprints_mixin.jira.create_sprint.side_effect = Exception
with pytest.raises(Exception):
sprints_mixin.create_sprint(
sprint_name="Sprint 1",
board_id="10001",
start_date="2099-05-01T00:00:00.000Z",
end_date="2100-05-15T00:00:00.000Z",
goal="Your goal",
)
def test_create_sprint_test_missing_startdate(sprints_mixin, mock_sprints):
"""Test create_sprint method."""
sprints_mixin.jira.create_sprint.return_value = mock_sprints["values"][1]
with pytest.raises(ValueError) as excinfo:
sprints_mixin.create_sprint(
sprint_name="Sprint 1",
board_id="10001",
start_date="",
end_date="2100-05-15T00:00:00.000Z",
goal="Your goal",
)
assert str(excinfo.value) == "Start date is required."
def test_create_sprint_test_invalid_startdate(sprints_mixin, mock_sprints):
"""Test create_sprint method."""
sprints_mixin.jira.create_sprint.return_value = mock_sprints["values"][1]
with pytest.raises(ValueError):
sprints_mixin.create_sprint(
sprint_name="Sprint 1",
board_id="10001",
start_date="IAMNOTADATE!",
end_date="2100-05-15T00:00:00.000Z",
goal="Your goal",
)
def test_create_sprint_test_no_enddate(sprints_mixin, mock_sprints):
"""Test create_sprint method."""
sprints_mixin.jira.create_sprint.return_value = mock_sprints["values"][1]
result = sprints_mixin.create_sprint(
sprint_name="Sprint 1",
board_id="10001",
start_date="2099-05-15T00:00:00.000Z",
end_date=None,
goal="Your goal",
)
assert result == JiraSprint.from_api_response(mock_sprints["values"][1])
def test_create_sprint_test_invalid_enddate(sprints_mixin, mock_sprints):
"""Test create_sprint method."""
sprints_mixin.jira.create_sprint.return_value = mock_sprints["values"][1]
with pytest.raises(ValueError):
sprints_mixin.create_sprint(
sprint_name="Sprint 1",
board_id="10001",
start_date="2099-05-15T00:00:00.000Z",
end_date="IAMNOTADATE!",
goal="Your goal",
)
def test_create_sprint_test_startdate_after_enddate(sprints_mixin, mock_sprints):
"""Test create_sprint method."""
sprints_mixin.jira.create_sprint.return_value = mock_sprints["values"][1]
with pytest.raises(ValueError, match="Start date must be before end date."):
sprints_mixin.create_sprint(
sprint_name="Sprint 1",
board_id="10001",
start_date="2100-05-15T00:00:00.000Z",
end_date="2099-05-15T00:00:00.000Z",
goal="Your goal",
)
def test_update_sprint_success(sprints_mixin, mock_sprints):
"""Test update_sprint method with valid data."""
mock_updated_sprint = mock_sprints["values"][0]
sprints_mixin.jira.update_partially_sprint.return_value = mock_updated_sprint
result = sprints_mixin.update_sprint(
sprint_id="10000",
sprint_name="Updated Sprint Name",
state="active",
start_date="2024-05-01T00:00:00.000Z",
end_date="2024-05-15T00:00:00.000Z",
goal="Updated goal",
)
assert result == JiraSprint.from_api_response(mock_updated_sprint)
sprints_mixin.jira.update_partially_sprint.assert_called_once_with(
sprint_id="10000",
data={
"name": "Updated Sprint Name",
"state": "active",
"startDate": "2024-05-01T00:00:00.000Z",
"endDate": "2024-05-15T00:00:00.000Z",
"goal": "Updated goal",
},
)
def test_update_sprint_invalid_state(sprints_mixin):
"""Test update_sprint method with invalid state."""
result = sprints_mixin.update_sprint(
sprint_id="10000",
sprint_name="Updated Sprint Name",
state="invalid_state",
start_date=None,
end_date=None,
goal=None,
)
assert result is None
sprints_mixin.jira.update_partially_sprint.assert_not_called()
def test_update_sprint_missing_sprint_id(sprints_mixin):
"""Test update_sprint method with missing sprint_id."""
result = sprints_mixin.update_sprint(
sprint_id=None,
sprint_name="Updated Sprint Name",
state="active",
start_date=None,
end_date=None,
goal=None,
)
assert result is None
sprints_mixin.jira.update_partially_sprint.assert_not_called()
def test_update_sprint_http_error(sprints_mixin):
"""Test update_sprint method with HTTPError."""
sprints_mixin.jira.update_partially_sprint.side_effect = requests.HTTPError(
response=MagicMock(content="API Error content")
)
result = sprints_mixin.update_sprint(
sprint_id="10000",
sprint_name="Updated Sprint Name",
state="active",
start_date=None,
end_date=None,
goal=None,
)
assert result is None
sprints_mixin.jira.update_partially_sprint.assert_called_once()
def test_update_sprint_exception(sprints_mixin):
"""Test update_sprint method with a generic exception."""
sprints_mixin.jira.update_partially_sprint.side_effect = Exception(
"Unexpected Error"
)
result = sprints_mixin.update_sprint(
sprint_id="10000",
sprint_name="Updated Sprint Name",
state="active",
start_date=None,
end_date=None,
goal=None,
)
assert result is None
sprints_mixin.jira.update_partially_sprint.assert_called_once()
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/links.py:
--------------------------------------------------------------------------------
```python
"""Module for Jira issue link operations."""
import logging
from typing import Any
from requests.exceptions import HTTPError
from ..exceptions import MCPAtlassianAuthenticationError
from ..models.jira import JiraIssueLinkType
from .client import JiraClient
logger = logging.getLogger("mcp-jira")
class LinksMixin(JiraClient):
"""Mixin for Jira issue link operations."""
def get_issue_link_types(self) -> list[JiraIssueLinkType]:
"""
Get all available issue link types.
Returns:
List of JiraIssueLinkType objects
Raises:
MCPAtlassianAuthenticationError: If authentication fails with the Jira API
(401/403)
Exception: If there is an error retrieving issue link types
"""
try:
link_types_response = self.jira.get("rest/api/2/issueLinkType")
if not isinstance(link_types_response, dict):
msg = f"Unexpected return value type from `jira.get`: {type(link_types_response)}"
logger.error(msg)
raise TypeError(msg)
link_types_data = link_types_response.get("issueLinkTypes", [])
link_types = [
JiraIssueLinkType.from_api_response(link_type)
for link_type in link_types_data
]
return link_types
except HTTPError as http_err:
if http_err.response is not None and http_err.response.status_code in [
401,
403,
]:
error_msg = (
f"Authentication failed for Jira API "
f"({http_err.response.status_code}). "
"Token may be expired or invalid. Please verify credentials."
)
logger.error(error_msg)
raise MCPAtlassianAuthenticationError(error_msg) from http_err
else:
logger.error(f"HTTP error during API call: {http_err}", exc_info=True)
raise Exception(
f"Error getting issue link types: {http_err}"
) from http_err
except Exception as e:
error_msg = str(e)
logger.error(f"Error getting issue link types: {error_msg}", exc_info=True)
raise Exception(f"Error getting issue link types: {error_msg}") from e
def create_issue_link(self, data: dict[str, Any]) -> dict[str, Any]:
"""
Create a link between two issues.
Args:
data: A dictionary containing the link data with the following structure:
{
"type": {"name": "Duplicate" }, # Link type name (e.g., "Duplicate", "Blocks", "Relates to")
"inwardIssue": { "key": "ISSUE-1"}, # The issue that is the source of the link
"outwardIssue": {"key": "ISSUE-2"}, # The issue that is the target of the link
"comment": { # Optional comment to add to the link
"body": "Linked related issue!",
"visibility": { # Optional visibility settings
"type": "group",
"value": "jira-software-users"
}
}
}
Returns:
Dictionary with the created link information
Raises:
ValueError: If required fields are missing
MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
Exception: If there is an error creating the issue link
"""
# Validate required fields
if not data.get("type"):
raise ValueError("Link type is required")
if not data.get("inwardIssue") or not data["inwardIssue"].get("key"):
raise ValueError("Inward issue key is required")
if not data.get("outwardIssue") or not data["outwardIssue"].get("key"):
raise ValueError("Outward issue key is required")
try:
# Create the issue link
self.jira.create_issue_link(data)
# Return a response with the link information
response = {
"success": True,
"message": f"Link created between {data['inwardIssue']['key']} and {data['outwardIssue']['key']}",
"link_type": data["type"]["name"],
"inward_issue": data["inwardIssue"]["key"],
"outward_issue": data["outwardIssue"]["key"],
}
return response
except HTTPError as http_err:
if http_err.response is not None and http_err.response.status_code in [
401,
403,
]:
error_msg = (
f"Authentication failed for Jira API "
f"({http_err.response.status_code}). "
"Token may be expired or invalid. Please verify credentials."
)
logger.error(error_msg)
raise MCPAtlassianAuthenticationError(error_msg) from http_err
else:
logger.error(f"HTTP error during API call: {http_err}", exc_info=True)
raise Exception(f"Error creating issue link: {http_err}") from http_err
except Exception as e:
error_msg = str(e)
logger.error(f"Error creating issue link: {error_msg}", exc_info=True)
raise Exception(f"Error creating issue link: {error_msg}") from e
def create_remote_issue_link(
self, issue_key: str, link_data: dict[str, Any]
) -> dict[str, Any]:
"""
Create a remote issue link (web link or Confluence link) for an issue.
Args:
issue_key: The key of the issue to add the link to (e.g., 'PROJ-123')
link_data: A dictionary containing the remote link data with the following structure:
{
"object": {
"url": "https://example.com/page", # The URL to link to
"title": "Example Page", # The title/name of the link
"summary": "Optional description of the link", # Optional description
"icon": { # Optional icon configuration
"url16x16": "https://example.com/icon16.png",
"title": "Icon Title"
}
},
"relationship": "causes" # Optional relationship description
}
Returns:
Dictionary with the created remote link information
Raises:
ValueError: If required fields are missing
MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
Exception: If there is an error creating the remote issue link
"""
# Validate required fields
if not issue_key:
raise ValueError("Issue key is required")
if not link_data.get("object"):
raise ValueError("Link object is required")
if not link_data["object"].get("url"):
raise ValueError("URL is required in link object")
if not link_data["object"].get("title"):
raise ValueError("Title is required in link object")
try:
# Create the remote issue link using the Jira API
endpoint = f"rest/api/3/issue/{issue_key}/remotelink"
response = self.jira.post(endpoint, json=link_data)
# Return a response with the link information
result = {
"success": True,
"message": f"Remote link created for issue {issue_key}",
"issue_key": issue_key,
"link_title": link_data["object"]["title"],
"link_url": link_data["object"]["url"],
"relationship": link_data.get("relationship", ""),
}
return result
except HTTPError as http_err:
if http_err.response is not None and http_err.response.status_code in [
401,
403,
]:
error_msg = (
f"Authentication failed for Jira API "
f"({http_err.response.status_code}). "
"Token may be expired or invalid. Please verify credentials."
)
logger.error(error_msg)
raise MCPAtlassianAuthenticationError(error_msg) from http_err
else:
logger.error(f"HTTP error during API call: {http_err}", exc_info=True)
raise Exception(
f"Error creating remote issue link: {http_err}"
) from http_err
except Exception as e:
error_msg = str(e)
logger.error(
f"Error creating remote issue link: {error_msg}", exc_info=True
)
raise Exception(f"Error creating remote issue link: {error_msg}") from e
def remove_issue_link(self, link_id: str) -> dict[str, Any]:
"""
Remove a link between two issues.
Args:
link_id: The ID of the link to remove
Returns:
Dictionary with the result of the operation
Raises:
ValueError: If link_id is empty
MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
Exception: If there is an error removing the issue link
"""
# Validate input
if not link_id:
raise ValueError("Link ID is required")
try:
# Remove the issue link
self.jira.remove_issue_link(link_id)
# Return a response indicating success
response = {
"success": True,
"message": f"Link with ID {link_id} has been removed",
"link_id": link_id,
}
return response
except HTTPError as http_err:
if http_err.response is not None and http_err.response.status_code in [
401,
403,
]:
error_msg = (
f"Authentication failed for Jira API "
f"({http_err.response.status_code}). "
"Token may be expired or invalid. Please verify credentials."
)
logger.error(error_msg)
raise MCPAtlassianAuthenticationError(error_msg) from http_err
else:
logger.error(f"HTTP error during API call: {http_err}", exc_info=True)
raise Exception(f"Error removing issue link: {http_err}") from http_err
except Exception as e:
error_msg = str(e)
logger.error(f"Error removing issue link: {error_msg}", exc_info=True)
raise Exception(f"Error removing issue link: {error_msg}") from e
```
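A minimal usage sketch for the link helpers above, assuming they are exposed through `JiraFetcher` like the other Jira mixins in this repository and that a bare `JiraFetcher()` falls back to environment configuration the way `JiraClient` does; the issue keys, link ID, and remote URL are placeholders.

```python
from mcp_atlassian.jira import JiraFetcher

fetcher = JiraFetcher()  # assumed to read JIRA_* settings from the environment

# Link two issues with a named link type (payload shape as in the docstring above).
fetcher.create_issue_link(
    {
        "type": {"name": "Relates"},
        "inwardIssue": {"key": "PROJ-123"},
        "outwardIssue": {"key": "PROJ-456"},
    }
)

# Attach a web link to an issue; "relationship" is optional.
fetcher.create_remote_issue_link(
    "PROJ-123",
    {
        "object": {"url": "https://example.com/runbook", "title": "Team Runbook"},
        "relationship": "mentioned in",
    },
)

# Remove a link by its ID (taken from the issue's issuelinks field).
fetcher.remove_issue_link("10042")
```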
--------------------------------------------------------------------------------
/tests/unit/servers/test_context.py:
--------------------------------------------------------------------------------
```python
"""Tests for the server context module."""
import pytest
from mcp_atlassian.confluence.config import ConfluenceConfig
from mcp_atlassian.jira.config import JiraConfig
from mcp_atlassian.servers.context import MainAppContext
class TestMainAppContext:
"""Tests for the MainAppContext dataclass."""
def test_initialization_with_defaults(self):
"""Test MainAppContext initialization with default values."""
context = MainAppContext()
assert context.full_jira_config is None
assert context.full_confluence_config is None
assert context.read_only is False
assert context.enabled_tools is None
def test_initialization_with_all_parameters(self):
"""Test MainAppContext initialization with all parameters provided."""
# Arrange
jira_config = JiraConfig(
url="https://example.atlassian.net",
auth_type="basic",
username="[email protected]",
api_token="test_token",
)
confluence_config = ConfluenceConfig(
url="https://example.atlassian.net/wiki",
auth_type="basic",
username="[email protected]",
api_token="test_token",
)
enabled_tools = ["jira_get_issue", "confluence_get_page"]
# Act
context = MainAppContext(
full_jira_config=jira_config,
full_confluence_config=confluence_config,
read_only=True,
enabled_tools=enabled_tools,
)
# Assert
assert context.full_jira_config is jira_config
assert context.full_confluence_config is confluence_config
assert context.read_only is True
assert context.enabled_tools == enabled_tools
def test_initialization_with_partial_parameters(self):
"""Test MainAppContext initialization with some parameters provided."""
# Arrange
jira_config = JiraConfig(
url="https://example.atlassian.net",
auth_type="pat",
personal_token="test_personal_token",
)
# Act
context = MainAppContext(full_jira_config=jira_config, read_only=True)
# Assert
assert context.full_jira_config is jira_config
assert context.full_confluence_config is None
assert context.read_only is True
assert context.enabled_tools is None
def test_frozen_dataclass_behavior(self):
"""Test that MainAppContext is frozen and immutable."""
# Arrange
context = MainAppContext(read_only=False)
# Act & Assert - should raise FrozenInstanceError when trying to modify
        # (dataclasses.FrozenInstanceError is a subclass of AttributeError)
with pytest.raises(AttributeError):
context.read_only = True
with pytest.raises(AttributeError):
context.full_jira_config = JiraConfig(
url="https://test.com",
auth_type="basic",
username="test",
api_token="token",
)
def test_type_hint_compliance_jira_config(self):
"""Test type hint compliance for JiraConfig field."""
# Test with None
context = MainAppContext(full_jira_config=None)
assert context.full_jira_config is None
# Test with valid JiraConfig
jira_config = JiraConfig(
url="https://jira.example.com", auth_type="pat", personal_token="test_token"
)
context = MainAppContext(full_jira_config=jira_config)
assert isinstance(context.full_jira_config, JiraConfig)
assert context.full_jira_config.url == "https://jira.example.com"
def test_type_hint_compliance_confluence_config(self):
"""Test type hint compliance for ConfluenceConfig field."""
# Test with None
context = MainAppContext(full_confluence_config=None)
assert context.full_confluence_config is None
# Test with valid ConfluenceConfig
confluence_config = ConfluenceConfig(
url="https://confluence.example.com",
auth_type="pat",
username="[email protected]",
api_token="test_token",
)
context = MainAppContext(full_confluence_config=confluence_config)
assert isinstance(context.full_confluence_config, ConfluenceConfig)
assert context.full_confluence_config.url == "https://confluence.example.com"
def test_enabled_tools_field_validation(self):
"""Test enabled_tools field validation and default handling."""
# Test with None (default)
context = MainAppContext()
assert context.enabled_tools is None
# Test with empty list
context = MainAppContext(enabled_tools=[])
assert context.enabled_tools == []
# Test with list of strings
tools = ["jira_create_issue", "confluence_create_page", "jira_search_issues"]
context = MainAppContext(enabled_tools=tools)
assert context.enabled_tools == tools
assert len(context.enabled_tools) == 3
def test_read_only_field_validation(self):
"""Test read_only field validation and default handling."""
# Test default value
context = MainAppContext()
assert context.read_only is False
assert isinstance(context.read_only, bool)
# Test explicit True
context = MainAppContext(read_only=True)
assert context.read_only is True
assert isinstance(context.read_only, bool)
# Test explicit False
context = MainAppContext(read_only=False)
assert context.read_only is False
assert isinstance(context.read_only, bool)
def test_string_representation(self):
"""Test the string representation of MainAppContext."""
# Test with default values
context = MainAppContext()
str_repr = str(context)
assert "MainAppContext" in str_repr
assert "full_jira_config=None" in str_repr
assert "full_confluence_config=None" in str_repr
assert "read_only=False" in str_repr
assert "enabled_tools=None" in str_repr
# Test with values provided
jira_config = JiraConfig(
url="https://test.atlassian.net",
auth_type="basic",
username="test",
api_token="token",
)
context = MainAppContext(
full_jira_config=jira_config,
read_only=True,
enabled_tools=["tool1", "tool2"],
)
str_repr = str(context)
assert "MainAppContext" in str_repr
assert "read_only=True" in str_repr
assert "enabled_tools=['tool1', 'tool2']" in str_repr
def test_equality_comparison(self):
"""Test equality comparison between MainAppContext instances."""
# Test identical instances
context1 = MainAppContext()
context2 = MainAppContext()
assert context1 == context2
# Test instances with same values
jira_config = JiraConfig(
url="https://test.atlassian.net",
auth_type="basic",
username="test",
api_token="token",
)
context1 = MainAppContext(
full_jira_config=jira_config, read_only=True, enabled_tools=["tool1"]
)
context2 = MainAppContext(
full_jira_config=jira_config, read_only=True, enabled_tools=["tool1"]
)
assert context1 == context2
# Test instances with different values
context3 = MainAppContext(read_only=False)
context4 = MainAppContext(read_only=True)
assert context3 != context4
# Test with different configs
different_jira_config = JiraConfig(
url="https://different.atlassian.net",
auth_type="basic",
username="different",
api_token="different_token",
)
context5 = MainAppContext(full_jira_config=jira_config)
context6 = MainAppContext(full_jira_config=different_jira_config)
assert context5 != context6
def test_hash_behavior(self):
"""Test hash behavior for MainAppContext instances."""
# Test that instances with only hashable fields (None configs, no enabled_tools) can be hashed
context1 = MainAppContext(read_only=True)
context2 = MainAppContext(read_only=True)
assert hash(context1) == hash(context2)
# Test instances with different hashable values
context3 = MainAppContext(read_only=False)
context4 = MainAppContext(read_only=True)
contexts_dict = {context3: "value3", context4: "value4"}
assert len(contexts_dict) == 2
# Test that instances with unhashable fields raise TypeError
jira_config = JiraConfig(
url="https://test.atlassian.net",
auth_type="basic",
username="test",
api_token="token",
)
context_with_config = MainAppContext(full_jira_config=jira_config)
with pytest.raises(TypeError, match="unhashable type"):
hash(context_with_config)
# Test that instances with list fields raise TypeError
context_with_list = MainAppContext(enabled_tools=["tool1", "tool2"])
with pytest.raises(TypeError, match="unhashable type"):
hash(context_with_list)
def test_field_access_edge_cases(self):
"""Test edge cases for field access."""
# Test accessing fields on empty context
context = MainAppContext()
# All fields should be accessible
assert hasattr(context, "full_jira_config")
assert hasattr(context, "full_confluence_config")
assert hasattr(context, "read_only")
assert hasattr(context, "enabled_tools")
# Test that we can't access non-existent fields
assert not hasattr(context, "non_existent_field")
def test_with_both_configs_different_auth_types(self):
"""Test MainAppContext with both Jira and Confluence configs using different auth types."""
# Arrange
jira_config = JiraConfig(
url="https://company.atlassian.net",
auth_type="basic",
username="[email protected]",
api_token="jira_token",
)
confluence_config = ConfluenceConfig(
url="https://company.atlassian.net/wiki",
auth_type="oauth",
oauth_config=None, # Simplified for test
)
# Act
context = MainAppContext(
full_jira_config=jira_config,
full_confluence_config=confluence_config,
read_only=True,
enabled_tools=[
"jira_get_issue",
"confluence_get_page",
"jira_create_issue",
],
)
# Assert
assert context.full_jira_config.auth_type == "basic"
assert context.full_confluence_config.auth_type == "oauth"
assert context.read_only is True
assert len(context.enabled_tools) == 3
assert "jira_get_issue" in context.enabled_tools
assert "confluence_get_page" in context.enabled_tools
assert "jira_create_issue" in context.enabled_tools
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_issues_markdown.py:
--------------------------------------------------------------------------------
```python
"""Tests for markdown conversion in Jira issue operations."""
from unittest.mock import MagicMock, Mock
import pytest
from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.issues import IssuesMixin
class TestIssuesMarkdownConversion:
"""Tests for markdown to Jira conversion in issue operations."""
@pytest.fixture
def issues_mixin(self, jira_fetcher: JiraFetcher) -> IssuesMixin:
"""Create an IssuesMixin instance with mocked dependencies."""
mixin = jira_fetcher
# Mock the markdown conversion method
mixin._markdown_to_jira = Mock(side_effect=lambda x: f"[CONVERTED] {x}")
# Add other mock methods
mixin._get_account_id = MagicMock(return_value="test-account-id")
return mixin
def test_create_issue_converts_markdown_description(
self, issues_mixin: IssuesMixin
):
"""Test that create_issue converts markdown description to Jira format."""
# Mock create_issue response
create_response = {"key": "TEST-123"}
issues_mixin.jira.create_issue.return_value = create_response
# Mock get_issue response
issue_data = {
"id": "12345",
"key": "TEST-123",
"fields": {
"summary": "Test Issue",
"description": "[CONVERTED] # Markdown Description",
"status": {"name": "Open"},
"issuetype": {"name": "Bug"},
},
}
issues_mixin.jira.get_issue.return_value = issue_data
# Create issue with markdown description
markdown_description = "# Markdown Description\n\nThis is **bold** text."
issue = issues_mixin.create_issue(
project_key="TEST",
summary="Test Issue",
issue_type="Bug",
description=markdown_description,
)
# Verify markdown conversion was called
issues_mixin._markdown_to_jira.assert_called_once_with(markdown_description)
# Verify the converted description was passed to API
expected_fields = {
"project": {"key": "TEST"},
"summary": "Test Issue",
"issuetype": {"name": "Bug"},
"description": f"[CONVERTED] {markdown_description}",
}
issues_mixin.jira.create_issue.assert_called_once_with(fields=expected_fields)
# Verify result
assert issue.key == "TEST-123"
def test_create_issue_with_empty_description(self, issues_mixin: IssuesMixin):
"""Test that create_issue handles empty description correctly."""
# Mock create_issue response
create_response = {"key": "TEST-123"}
issues_mixin.jira.create_issue.return_value = create_response
# Mock get_issue response
issue_data = {
"id": "12345",
"key": "TEST-123",
"fields": {
"summary": "Test Issue",
"status": {"name": "Open"},
"issuetype": {"name": "Bug"},
},
}
issues_mixin.jira.get_issue.return_value = issue_data
# Create issue without description
issue = issues_mixin.create_issue(
project_key="TEST",
summary="Test Issue",
issue_type="Bug",
description="",
)
# Verify markdown conversion was not called (empty string)
issues_mixin._markdown_to_jira.assert_not_called()
# Verify no description field was added
call_args = issues_mixin.jira.create_issue.call_args[1]
assert "description" not in call_args["fields"]
# Verify result
assert issue.key == "TEST-123"
def test_update_issue_converts_markdown_in_fields(self, issues_mixin: IssuesMixin):
"""Test that update_issue converts markdown description when passed in fields dict."""
# Mock the issue data for get_issue
issue_data = {
"id": "12345",
"key": "TEST-123",
"fields": {
"summary": "Updated Issue",
"description": "[CONVERTED] # Updated Description",
"status": {"name": "In Progress"},
"issuetype": {"name": "Bug"},
},
}
issues_mixin.jira.get_issue.return_value = issue_data
# Update issue with markdown description in fields
markdown_description = "# Updated Description\n\nThis is *italic* text."
issue = issues_mixin.update_issue(
issue_key="TEST-123",
fields={"description": markdown_description, "summary": "Updated Issue"},
)
# Verify markdown conversion was called
issues_mixin._markdown_to_jira.assert_called_once_with(markdown_description)
# Verify the converted description was passed to API
issues_mixin.jira.update_issue.assert_called_once_with(
issue_key="TEST-123",
update={
"fields": {
"description": f"[CONVERTED] {markdown_description}",
"summary": "Updated Issue",
}
},
)
# Verify result
assert issue.key == "TEST-123"
def test_update_issue_converts_markdown_in_kwargs(self, issues_mixin: IssuesMixin):
"""Test that update_issue converts markdown description when passed as kwarg."""
# Mock the issue data for get_issue
issue_data = {
"id": "12345",
"key": "TEST-123",
"fields": {
"summary": "Test Issue",
"description": "[CONVERTED] ## Updated via kwargs",
"status": {"name": "In Progress"},
"issuetype": {"name": "Bug"},
},
}
issues_mixin.jira.get_issue.return_value = issue_data
# Update issue with markdown description as kwarg
markdown_description = (
"## Updated via kwargs\n\nWith a [link](http://example.com)"
)
issue = issues_mixin.update_issue(
issue_key="TEST-123", description=markdown_description
)
# Verify markdown conversion was called
issues_mixin._markdown_to_jira.assert_called_once_with(markdown_description)
# Verify the converted description was passed to API
issues_mixin.jira.update_issue.assert_called_once_with(
issue_key="TEST-123",
update={"fields": {"description": f"[CONVERTED] {markdown_description}"}},
)
# Verify result
assert issue.key == "TEST-123"
def test_update_issue_with_multiple_fields_including_description(
self, issues_mixin: IssuesMixin
):
"""Test update_issue with multiple fields including description."""
# Mock the issue data for get_issue
issue_data = {
"id": "12345",
"key": "TEST-123",
"fields": {
"summary": "Updated Summary",
"description": "[CONVERTED] Updated description",
"status": {"name": "In Progress"},
"issuetype": {"name": "Bug"},
"priority": {"name": "High"},
},
}
issues_mixin.jira.get_issue.return_value = issue_data
# Update issue with multiple fields
markdown_description = "Updated description with **emphasis**"
issue = issues_mixin.update_issue(
issue_key="TEST-123",
fields={
"summary": "Updated Summary",
"priority": {"name": "High"},
},
description=markdown_description, # As kwarg
)
# Verify markdown conversion was called
issues_mixin._markdown_to_jira.assert_called_once_with(markdown_description)
# Verify all fields were updated correctly
expected_fields = {
"summary": "Updated Summary",
"priority": {"name": "High"},
"description": f"[CONVERTED] {markdown_description}",
}
issues_mixin.jira.update_issue.assert_called_once_with(
issue_key="TEST-123", update={"fields": expected_fields}
)
# Verify result
assert issue.key == "TEST-123"
def test_markdown_conversion_preserves_none_values(self, issues_mixin: IssuesMixin):
"""Test that None descriptions are not converted."""
# Reset the mock to check for actual None handling
issues_mixin._markdown_to_jira = Mock(
side_effect=lambda x: f"[CONVERTED] {x}" if x else ""
)
# Mock create response
create_response = {"key": "TEST-123"}
issues_mixin.jira.create_issue.return_value = create_response
# Mock get_issue response
issues_mixin.jira.get_issue.return_value = {
"id": "12345",
"key": "TEST-123",
"fields": {"summary": "Test", "issuetype": {"name": "Task"}},
}
# Create issue with None description (shouldn't add description field)
issues_mixin.create_issue(
project_key="TEST",
summary="Test Issue",
issue_type="Task",
# description not provided (defaults to "")
)
# Verify markdown conversion was not called
issues_mixin._markdown_to_jira.assert_not_called()
# Verify no description field was added
call_args = issues_mixin.jira.create_issue.call_args[1]
assert "description" not in call_args["fields"]
def test_create_issue_with_markdown_in_additional_fields(
self, issues_mixin: IssuesMixin
):
"""Test that descriptions in additional_fields are NOT converted (edge case)."""
# Mock field map for additional fields processing
issues_mixin._generate_field_map = Mock(
return_value={"mydescription": "customfield_10001"}
)
issues_mixin.get_field_by_id = Mock(
return_value={"name": "MyDescription", "schema": {"type": "string"}}
)
# Mock create response
create_response = {"key": "TEST-123"}
issues_mixin.jira.create_issue.return_value = create_response
issues_mixin.jira.get_issue.return_value = {
"id": "12345",
"key": "TEST-123",
"fields": {"summary": "Test", "issuetype": {"name": "Task"}},
}
# Create issue with a custom field that happens to be named 'description'
# This should NOT be converted as it's a different field
issues_mixin.create_issue(
project_key="TEST",
summary="Test Issue",
issue_type="Task",
description="# Main Description", # This SHOULD be converted
mydescription="# Custom Field Description", # This should NOT be converted
)
# Verify the main description was converted
calls = issues_mixin._markdown_to_jira.call_args_list
assert len(calls) == 1
assert calls[0][0][0] == "# Main Description"
# Verify fields
create_call = issues_mixin.jira.create_issue.call_args[1]["fields"]
assert create_call["description"] == "[CONVERTED] # Main Description"
# Custom field should not be converted
assert "customfield_10001" in create_call
assert create_call["customfield_10001"] == "# Custom Field Description"
```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/client.py:
--------------------------------------------------------------------------------
```python
"""Base client module for Jira API interactions."""
import logging
import os
from typing import Any, Literal
from atlassian import Jira
from requests import Session
from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
from mcp_atlassian.preprocessing import JiraPreprocessor
from mcp_atlassian.utils.logging import (
get_masked_session_headers,
log_config_param,
mask_sensitive,
)
from mcp_atlassian.utils.oauth import configure_oauth_session
from mcp_atlassian.utils.ssl import configure_ssl_verification
from .config import JiraConfig
# Configure logging
logger = logging.getLogger("mcp-jira")
class JiraClient:
"""Base client for Jira API interactions."""
_field_ids_cache: list[dict[str, Any]] | None
_current_user_account_id: str | None
config: JiraConfig
preprocessor: JiraPreprocessor
def __init__(self, config: JiraConfig | None = None) -> None:
"""Initialize the Jira client with configuration options.
Args:
config: Optional configuration object (will use env vars if not provided)
Raises:
ValueError: If configuration is invalid or required credentials are missing
MCPAtlassianAuthenticationError: If OAuth authentication fails
"""
# Load configuration from environment variables if not provided
self.config = config or JiraConfig.from_env()
# Initialize the Jira client based on auth type
if self.config.auth_type == "oauth":
if not self.config.oauth_config or not self.config.oauth_config.cloud_id:
error_msg = "OAuth authentication requires a valid cloud_id"
raise ValueError(error_msg)
# Create a session for OAuth
session = Session()
# Configure the session with OAuth authentication
if not configure_oauth_session(session, self.config.oauth_config):
error_msg = "Failed to configure OAuth session"
raise MCPAtlassianAuthenticationError(error_msg)
# The Jira API URL with OAuth is different
api_url = (
f"https://api.atlassian.com/ex/jira/{self.config.oauth_config.cloud_id}"
)
# Initialize Jira with the session
self.jira = Jira(
url=api_url,
session=session,
cloud=True, # OAuth is only for Cloud
verify_ssl=self.config.ssl_verify,
)
elif self.config.auth_type == "pat":
logger.debug(
f"Initializing Jira client with Token (PAT) auth. "
f"URL: {self.config.url}, "
f"Token (masked): {mask_sensitive(str(self.config.personal_token))}"
)
self.jira = Jira(
url=self.config.url,
token=self.config.personal_token,
cloud=self.config.is_cloud,
verify_ssl=self.config.ssl_verify,
)
else: # basic auth
logger.debug(
f"Initializing Jira client with Basic auth. "
f"URL: {self.config.url}, Username: {self.config.username}, "
f"API Token present: {bool(self.config.api_token)}, "
f"Is Cloud: {self.config.is_cloud}"
)
self.jira = Jira(
url=self.config.url,
username=self.config.username,
password=self.config.api_token,
cloud=self.config.is_cloud,
verify_ssl=self.config.ssl_verify,
)
logger.debug(
f"Jira client initialized. Session headers (Authorization masked): "
f"{get_masked_session_headers(dict(self.jira._session.headers))}"
)
# Configure SSL verification using the shared utility
configure_ssl_verification(
service_name="Jira",
url=self.config.url,
session=self.jira._session,
ssl_verify=self.config.ssl_verify,
)
# Proxy configuration
proxies = {}
if self.config.http_proxy:
proxies["http"] = self.config.http_proxy
if self.config.https_proxy:
proxies["https"] = self.config.https_proxy
if self.config.socks_proxy:
proxies["socks"] = self.config.socks_proxy
if proxies:
self.jira._session.proxies.update(proxies)
for k, v in proxies.items():
log_config_param(
logger, "Jira", f"{k.upper()}_PROXY", v, sensitive=True
)
if self.config.no_proxy and isinstance(self.config.no_proxy, str):
os.environ["NO_PROXY"] = self.config.no_proxy
log_config_param(logger, "Jira", "NO_PROXY", self.config.no_proxy)
# Apply custom headers if configured
if self.config.custom_headers:
self._apply_custom_headers()
# Initialize the text preprocessor for text processing capabilities
self.preprocessor = JiraPreprocessor(base_url=self.config.url)
self._field_ids_cache = None
self._current_user_account_id = None
# Test authentication during initialization (in debug mode only)
if logger.isEnabledFor(logging.DEBUG):
try:
self._validate_authentication()
except MCPAtlassianAuthenticationError:
logger.warning(
"Authentication validation failed during client initialization - "
"continuing anyway"
)
def _validate_authentication(self) -> None:
"""Validate authentication by making a simple API call."""
try:
logger.debug(
"Testing Jira authentication by retrieving current user info..."
)
current_user = self.jira.myself()
if current_user:
logger.info(
f"Jira authentication successful. "
f"Current user: {current_user.get('displayName', 'Unknown')} "
f"({current_user.get('emailAddress', 'No email')})"
)
else:
logger.warning(
"Jira authentication test returned empty user info - "
"this may indicate an issue"
)
except Exception as e:
error_msg = f"Jira authentication validation failed: {e}"
logger.error(error_msg)
logger.debug(
f"Authentication headers during failure: "
f"{get_masked_session_headers(dict(self.jira._session.headers))}"
)
raise MCPAtlassianAuthenticationError(error_msg) from e
def _apply_custom_headers(self) -> None:
"""Apply custom headers to the Jira session."""
if not self.config.custom_headers:
return
logger.debug(
f"Applying {len(self.config.custom_headers)} custom headers to Jira session"
)
for header_name, header_value in self.config.custom_headers.items():
self.jira._session.headers[header_name] = header_value
logger.debug(f"Applied custom header: {header_name}")
def _clean_text(self, text: str) -> str:
"""Clean text content by:
1. Processing user mentions and links
2. Converting HTML/wiki markup to markdown
Args:
text: Text to clean
Returns:
Cleaned text
"""
if not text:
return ""
        # Delegate to the shared Jira preprocessor
return self.preprocessor.clean_jira_text(text)
def _markdown_to_jira(self, markdown_text: str) -> str:
"""
Convert Markdown syntax to Jira markup syntax.
Args:
markdown_text: Text in Markdown format
Returns:
Text in Jira markup format
"""
if not markdown_text:
return ""
# Use the shared preprocessor if available
if hasattr(self, "preprocessor"):
return self.preprocessor.markdown_to_jira(markdown_text)
        # Otherwise create a temporary preprocessor from the configured base URL
        base_url = self.config.url if hasattr(self, "config") else ""
        return JiraPreprocessor(base_url=base_url).markdown_to_jira(markdown_text)
def get_paged(
self,
method: Literal["get", "post"],
url: str,
params_or_json: dict | None = None,
*,
absolute: bool = False,
) -> list[dict]:
"""
        Repeatedly fetch paged data from the Jira API, using `nextPageToken` to paginate.
Args:
method: The HTTP method to use
url: The URL to retrieve data from
params_or_json: Optional query parameters or JSON data to send
absolute: Whether to use absolute URL
Returns:
List of requested json data
Raises:
ValueError: If using paged request on non-cloud Jira
"""
if not self.config.is_cloud:
raise ValueError(
"Paged requests are only available for Jira Cloud platform"
)
all_results: list[dict] = []
current_data = params_or_json or {}
while True:
if method == "get":
api_result = self.jira.get(
path=url, params=current_data, absolute=absolute
)
else:
api_result = self.jira.post(
path=url, json=current_data, absolute=absolute
)
if not isinstance(api_result, dict):
error_message = f"API result is not a dictionary: {api_result}"
logger.error(error_message)
raise ValueError(error_message)
# Extract values from response
all_results.append(api_result)
# Check if this is the last page
if "nextPageToken" not in api_result:
break
# Update for next iteration
current_data["nextPageToken"] = api_result["nextPageToken"]
return all_results
def create_version(
self,
project: str,
name: str,
        start_date: str | None = None,
        release_date: str | None = None,
        description: str | None = None,
) -> dict[str, Any]:
"""
Create a new version in a Jira project.
Args:
project: The project key (e.g., 'PROJ')
name: The name of the version
start_date: The start date (YYYY-MM-DD, optional)
release_date: The release date (YYYY-MM-DD, optional)
description: Description of the version (optional)
Returns:
The created version object as returned by Jira
"""
payload = {"project": project, "name": name}
if start_date:
payload["startDate"] = start_date
if release_date:
payload["releaseDate"] = release_date
if description:
payload["description"] = description
logger.info(f"Creating Jira version: {payload}")
result = self.jira.post("/rest/api/3/version", json=payload)
if not isinstance(result, dict):
error_message = f"Unexpected response from Jira API: {result}"
raise ValueError(error_message)
return result
```
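A short sketch of how the paging helper and `create_version` above might be driven directly. `JiraConfig.from_env()` and the method signatures come from this file; the paged endpoint path and JQL are illustrative placeholders, and `get_paged` is Cloud-only as its docstring notes.

```python
from mcp_atlassian.jira.client import JiraClient
from mcp_atlassian.jira.config import JiraConfig

client = JiraClient(config=JiraConfig.from_env())

# Collect every page of a paged endpoint; get_paged follows nextPageToken
# until the API stops returning one and returns the raw page dicts.
pages = client.get_paged(
    "post",
    "/rest/api/3/search/jql",  # illustrative paged endpoint
    {"jql": "project = PROJ ORDER BY updated DESC", "maxResults": 50},
)

# Create a project version with optional dates and description.
version = client.create_version(
    project="PROJ",
    name="1.4.0",
    release_date="2024-12-31",
    description="Q4 release",
)
```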
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/search.py:
--------------------------------------------------------------------------------
```python
"""Module for Jira search operations."""
import logging
import requests
from requests.exceptions import HTTPError
from ..exceptions import MCPAtlassianAuthenticationError
from ..models.jira import JiraSearchResult
from .client import JiraClient
from .constants import DEFAULT_READ_JIRA_FIELDS
from .protocols import IssueOperationsProto
logger = logging.getLogger("mcp-jira")
class SearchMixin(JiraClient, IssueOperationsProto):
"""Mixin for Jira search operations."""
def search_issues(
self,
jql: str,
fields: list[str] | tuple[str, ...] | set[str] | str | None = None,
start: int = 0,
limit: int = 50,
expand: str | None = None,
projects_filter: str | None = None,
) -> JiraSearchResult:
"""
Search for issues using JQL (Jira Query Language).
Args:
jql: JQL query string
fields: Fields to return (comma-separated string, list, tuple, set, or "*all")
start: Starting index if number of issues is greater than the limit
Note: This parameter is ignored in Cloud environments and results will always
start from the first page.
limit: Maximum issues to return
expand: Optional items to expand (comma-separated)
projects_filter: Optional comma-separated list of project keys to filter by, overrides config
Returns:
JiraSearchResult object containing issues and metadata (total, start_at, max_results)
Raises:
MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
Exception: If there is an error searching for issues
"""
try:
# Use projects_filter parameter if provided, otherwise fall back to config
filter_to_use = projects_filter or self.config.projects_filter
# Apply projects filter if present
if filter_to_use:
# Split projects filter by commas and handle possible whitespace
projects = [p.strip() for p in filter_to_use.split(",")]
# Build the project filter query part
if len(projects) == 1:
project_query = f'project = "{projects[0]}"'
else:
quoted_projects = [f'"{p}"' for p in projects]
projects_list = ", ".join(quoted_projects)
project_query = f"project IN ({projects_list})"
# Add the project filter to existing query
if not jql:
# Empty JQL - just use project filter
jql = project_query
elif jql.strip().upper().startswith("ORDER BY"):
# JQL starts with ORDER BY - prepend project filter
jql = f"{project_query} {jql}"
elif "project = " not in jql and "project IN" not in jql:
# Only add if not already filtering by project
jql = f"({jql}) AND {project_query}"
logger.info(f"Applied projects filter to query: {jql}")
# Convert fields to proper format if it's a list/tuple/set
fields_param: str | None
if fields is None: # Use default if None
fields_param = ",".join(DEFAULT_READ_JIRA_FIELDS)
elif isinstance(fields, list | tuple | set):
fields_param = ",".join(fields)
else:
fields_param = fields
if self.config.is_cloud:
actual_total = -1
try:
# Call 1: Get metadata (including total) using standard search API
metadata_params = {"jql": jql, "maxResults": 0}
metadata_response = self.jira.get(
self.jira.resource_url("search"), params=metadata_params
)
if (
isinstance(metadata_response, dict)
and "total" in metadata_response
):
try:
actual_total = int(metadata_response["total"])
except (ValueError, TypeError):
logger.warning(
f"Could not parse 'total' from metadata response for JQL: {jql}. Received: {metadata_response.get('total')}"
)
else:
logger.warning(
f"Could not retrieve total count from metadata response for JQL: {jql}. Response type: {type(metadata_response)}"
)
except Exception as meta_err:
logger.error(
f"Error fetching metadata for JQL '{jql}': {str(meta_err)}"
)
# Call 2: Get the actual issues using the enhanced method
issues_response_list = self.jira.enhanced_jql_get_list_of_tickets(
jql, fields=fields_param, limit=limit, expand=expand
)
if not isinstance(issues_response_list, list):
msg = f"Unexpected return value type from `jira.enhanced_jql_get_list_of_tickets`: {type(issues_response_list)}"
logger.error(msg)
raise TypeError(msg)
response_dict_for_model = {
"issues": issues_response_list,
"total": actual_total,
}
search_result = JiraSearchResult.from_api_response(
response_dict_for_model,
base_url=self.config.url,
requested_fields=fields_param,
)
# Return the full search result object
return search_result
else:
limit = min(limit, 50)
response = self.jira.jql(
jql, fields=fields_param, start=start, limit=limit, expand=expand
)
if not isinstance(response, dict):
msg = f"Unexpected return value type from `jira.jql`: {type(response)}"
logger.error(msg)
raise TypeError(msg)
# Convert the response to a search result model
search_result = JiraSearchResult.from_api_response(
response, base_url=self.config.url, requested_fields=fields_param
)
# Return the full search result object
return search_result
except HTTPError as http_err:
if http_err.response is not None and http_err.response.status_code in [
401,
403,
]:
error_msg = (
f"Authentication failed for Jira API ({http_err.response.status_code}). "
"Token may be expired or invalid. Please verify credentials."
)
logger.error(error_msg)
raise MCPAtlassianAuthenticationError(error_msg) from http_err
else:
logger.error(f"HTTP error during API call: {http_err}", exc_info=False)
raise http_err
except Exception as e:
logger.error(f"Error searching issues with JQL '{jql}': {str(e)}")
raise Exception(f"Error searching issues: {str(e)}") from e
def get_board_issues(
self,
board_id: str,
jql: str,
fields: str | None = None,
start: int = 0,
limit: int = 50,
expand: str | None = None,
) -> JiraSearchResult:
"""
Get all issues linked to a specific board.
Args:
board_id: The ID of the board
jql: JQL query string
fields: Fields to return (comma-separated string or "*all")
start: Starting index
limit: Maximum issues to return
expand: Optional items to expand (comma-separated)
Returns:
JiraSearchResult object containing board issues and metadata
Raises:
Exception: If there is an error getting board issues
"""
try:
# Determine fields_param
fields_param = fields
if fields_param is None:
fields_param = ",".join(DEFAULT_READ_JIRA_FIELDS)
response = self.jira.get_issues_for_board(
board_id=board_id,
jql=jql,
fields=fields_param,
start=start,
limit=limit,
expand=expand,
)
if not isinstance(response, dict):
msg = f"Unexpected return value type from `jira.get_issues_for_board`: {type(response)}"
logger.error(msg)
raise TypeError(msg)
# Convert the response to a search result model
search_result = JiraSearchResult.from_api_response(
response, base_url=self.config.url, requested_fields=fields_param
)
return search_result
except requests.HTTPError as e:
            logger.error(
                f"Error searching issues for board '{board_id}' with JQL '{jql}': {str(e.response.content)}"
            )
raise Exception(
f"Error searching issues for board with JQL: {str(e.response.content)}"
) from e
except Exception as e:
logger.error(f"Error searching issues for board with JQL '{jql}': {str(e)}")
raise Exception(
f"Error searching issues for board with JQL {str(e)}"
) from e
def get_sprint_issues(
self,
sprint_id: str,
fields: str | None = None,
start: int = 0,
limit: int = 50,
) -> JiraSearchResult:
"""
Get all issues linked to a specific sprint.
Args:
sprint_id: The ID of the sprint
fields: Fields to return (comma-separated string or "*all")
start: Starting index
limit: Maximum issues to return
Returns:
JiraSearchResult object containing sprint issues and metadata
Raises:
            Exception: If there is an error getting sprint issues
"""
try:
# Determine fields_param
fields_param = fields
if fields_param is None:
fields_param = ",".join(DEFAULT_READ_JIRA_FIELDS)
response = self.jira.get_sprint_issues(
sprint_id=sprint_id,
start=start,
limit=limit,
)
if not isinstance(response, dict):
msg = f"Unexpected return value type from `jira.get_sprint_issues`: {type(response)}"
logger.error(msg)
raise TypeError(msg)
# Convert the response to a search result model
search_result = JiraSearchResult.from_api_response(
response, base_url=self.config.url, requested_fields=fields_param
)
return search_result
except requests.HTTPError as e:
logger.error(
f"Error searching issues for sprint '{sprint_id}': {str(e.response.content)}"
)
raise Exception(
f"Error searching issues for sprint: {str(e.response.content)}"
) from e
except Exception as e:
logger.error(f"Error searching issues for sprint: {sprint_id}': {str(e)}")
raise Exception(f"Error searching issues for sprint: {str(e)}") from e
```
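A hedged example of `search_issues` with a projects filter, assuming `SearchMixin` is part of `JiraFetcher`'s mixin stack, that a bare `JiraFetcher()` reads its configuration from the environment, and that `JiraSearchResult` exposes an `issues` list as the unit tests in this repository suggest; the JQL and project keys are placeholders.

```python
from mcp_atlassian.jira import JiraFetcher

fetcher = JiraFetcher()

# Restrict the query to two projects and request a narrow field set; on Cloud
# a metadata call supplies the total while the enhanced JQL call returns issues.
result = fetcher.search_issues(
    "status = 'In Progress' ORDER BY updated DESC",
    fields=["summary", "status", "assignee"],
    limit=25,
    projects_filter="PROJ,OPS",
)
for issue in result.issues:
    print(issue.key)
```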
--------------------------------------------------------------------------------
/tests/integration/test_proxy.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for proxy handling in Jira and Confluence clients (mocked requests).
"""
import os
from unittest.mock import MagicMock, patch
import pytest
from requests.exceptions import ProxyError
from mcp_atlassian.confluence.client import ConfluenceClient
from mcp_atlassian.confluence.config import ConfluenceConfig
from mcp_atlassian.jira.client import JiraClient
from mcp_atlassian.jira.config import JiraConfig
from tests.utils.base import BaseAuthTest
from tests.utils.mocks import MockEnvironment
@pytest.mark.integration
def test_jira_client_passes_proxies_to_requests(monkeypatch):
"""Test that JiraClient passes proxies to requests.Session.request."""
mock_jira = MagicMock()
mock_session = MagicMock()
# Create a proper proxies dictionary that can be updated
mock_session.proxies = {}
mock_jira._session = mock_session
monkeypatch.setattr("mcp_atlassian.jira.client.Jira", lambda **kwargs: mock_jira)
monkeypatch.setattr(
"mcp_atlassian.jira.client.configure_ssl_verification", lambda **kwargs: None
)
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="basic",
username="user",
api_token="pat",
http_proxy="http://proxy:8080",
https_proxy="https://proxy:8443",
socks_proxy="socks5://user:pass@proxy:1080",
no_proxy="localhost,127.0.0.1",
)
client = JiraClient(config=config)
# Simulate a request
client.jira._session.request(
"GET", "https://test.atlassian.net/rest/api/2/issue/TEST-1"
)
assert mock_session.proxies["http"] == "http://proxy:8080"
assert mock_session.proxies["https"] == "https://proxy:8443"
assert mock_session.proxies["socks"] == "socks5://user:pass@proxy:1080"
@pytest.mark.integration
def test_confluence_client_passes_proxies_to_requests(monkeypatch):
"""Test that ConfluenceClient passes proxies to requests.Session.request."""
mock_confluence = MagicMock()
mock_session = MagicMock()
# Create a proper proxies dictionary that can be updated
mock_session.proxies = {}
mock_confluence._session = mock_session
monkeypatch.setattr(
"mcp_atlassian.confluence.client.Confluence", lambda **kwargs: mock_confluence
)
monkeypatch.setattr(
"mcp_atlassian.confluence.client.configure_ssl_verification",
lambda **kwargs: None,
)
monkeypatch.setattr(
"mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor",
lambda **kwargs: MagicMock(),
)
config = ConfluenceConfig(
url="https://test.atlassian.net/wiki",
auth_type="basic",
username="user",
api_token="pat",
http_proxy="http://proxy:8080",
https_proxy="https://proxy:8443",
socks_proxy="socks5://user:pass@proxy:1080",
no_proxy="localhost,127.0.0.1",
)
client = ConfluenceClient(config=config)
# Simulate a request
client.confluence._session.request(
"GET", "https://test.atlassian.net/wiki/rest/api/content/123"
)
assert mock_session.proxies["http"] == "http://proxy:8080"
assert mock_session.proxies["https"] == "https://proxy:8443"
assert mock_session.proxies["socks"] == "socks5://user:pass@proxy:1080"
@pytest.mark.integration
def test_jira_client_no_proxy_env(monkeypatch):
"""Test that JiraClient sets NO_PROXY env var and requests to excluded hosts bypass proxy."""
mock_jira = MagicMock()
mock_session = MagicMock()
mock_jira._session = mock_session
monkeypatch.setattr("mcp_atlassian.jira.client.Jira", lambda **kwargs: mock_jira)
monkeypatch.setattr(
"mcp_atlassian.jira.client.configure_ssl_verification", lambda **kwargs: None
)
monkeypatch.setenv("NO_PROXY", "")
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="basic",
username="user",
api_token="pat",
http_proxy="http://proxy:8080",
no_proxy="localhost,127.0.0.1",
)
client = JiraClient(config=config)
assert os.environ["NO_PROXY"] == "localhost,127.0.0.1"
class TestProxyConfigurationEnhanced(BaseAuthTest):
"""Enhanced proxy configuration tests using test utilities."""
@pytest.mark.integration
def test_proxy_configuration_from_environment(self):
"""Test proxy configuration loaded from environment variables."""
with MockEnvironment.basic_auth_env() as env_vars:
# Set proxy environment variables in os.environ directly
proxy_vars = {
"HTTP_PROXY": "http://proxy.company.com:8080",
"HTTPS_PROXY": "https://proxy.company.com:8443",
"NO_PROXY": "*.internal.com,localhost",
}
# Patch environment with proxy settings
with patch.dict(os.environ, proxy_vars):
# Jira should pick up proxy settings
jira_config = JiraConfig.from_env()
assert jira_config.http_proxy == "http://proxy.company.com:8080"
assert jira_config.https_proxy == "https://proxy.company.com:8443"
assert jira_config.no_proxy == "*.internal.com,localhost"
# Confluence should pick up proxy settings
confluence_config = ConfluenceConfig.from_env()
assert confluence_config.http_proxy == "http://proxy.company.com:8080"
assert confluence_config.https_proxy == "https://proxy.company.com:8443"
assert confluence_config.no_proxy == "*.internal.com,localhost"
@pytest.mark.integration
def test_proxy_authentication_in_url(self):
"""Test proxy URLs with authentication credentials."""
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="basic",
username="user",
api_token="token",
http_proxy="http://proxyuser:[email protected]:8080",
https_proxy="https://proxyuser:[email protected]:8443",
)
# Verify proxy URLs contain authentication
assert "proxyuser:proxypass" in config.http_proxy
assert "proxyuser:proxypass" in config.https_proxy
@pytest.mark.integration
def test_socks_proxy_configuration(self, monkeypatch):
"""Test SOCKS proxy configuration for both services."""
mock_jira = MagicMock()
mock_session = MagicMock()
# Create a proper proxies dictionary that can be updated
mock_session.proxies = {}
mock_jira._session = mock_session
monkeypatch.setattr(
"mcp_atlassian.jira.client.Jira", lambda **kwargs: mock_jira
)
monkeypatch.setattr(
"mcp_atlassian.jira.client.configure_ssl_verification",
lambda **kwargs: None,
)
# Test SOCKS5 proxy
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="basic",
username="user",
api_token="token",
socks_proxy="socks5://socksuser:[email protected]:1080",
)
client = JiraClient(config=config)
assert (
mock_session.proxies["socks"]
== "socks5://socksuser:[email protected]:1080"
)
@pytest.mark.integration
def test_proxy_bypass_for_internal_domains(self, monkeypatch):
"""Test that requests to NO_PROXY domains bypass the proxy."""
# Set up environment
monkeypatch.setenv("NO_PROXY", "*.internal.com,localhost,127.0.0.1")
config = JiraConfig(
url="https://jira.internal.com", # Internal domain
auth_type="basic",
username="user",
api_token="token",
http_proxy="http://proxy.company.com:8080",
no_proxy="*.internal.com,localhost,127.0.0.1",
)
# Verify NO_PROXY is set in environment
assert os.environ["NO_PROXY"] == "*.internal.com,localhost,127.0.0.1"
assert "internal.com" in config.no_proxy
@pytest.mark.integration
def test_proxy_error_handling(self, monkeypatch):
"""Test proper error handling when proxy connection fails."""
# Mock to simulate proxy connection failure
mock_jira = MagicMock()
mock_jira.side_effect = ProxyError("Unable to connect to proxy")
monkeypatch.setattr("mcp_atlassian.jira.client.Jira", mock_jira)
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="basic",
username="user",
api_token="token",
http_proxy="http://unreachable.proxy.com:8080",
)
# Creating client should raise proxy error
with pytest.raises(ProxyError, match="Unable to connect to proxy"):
JiraClient(config=config)
@pytest.mark.integration
def test_proxy_configuration_precedence(self):
"""Test that explicit proxy config takes precedence over environment."""
with patch.dict(
os.environ,
{
"HTTP_PROXY": "http://env.proxy.com:8080",
"HTTPS_PROXY": "https://env.proxy.com:8443",
},
):
# Explicit configuration should override environment
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="basic",
username="user",
api_token="token",
http_proxy="http://explicit.proxy.com:8080",
https_proxy="https://explicit.proxy.com:8443",
)
assert config.http_proxy == "http://explicit.proxy.com:8080"
assert config.https_proxy == "https://explicit.proxy.com:8443"
@pytest.mark.integration
def test_mixed_proxy_and_ssl_configuration(self, monkeypatch):
"""Test proxy configuration works correctly with SSL verification disabled."""
mock_confluence = MagicMock()
mock_session = MagicMock()
# Create a proper proxies dictionary that can be updated
mock_session.proxies = {}
mock_confluence._session = mock_session
monkeypatch.setattr(
"mcp_atlassian.confluence.client.Confluence",
lambda **kwargs: mock_confluence,
)
monkeypatch.setattr(
"mcp_atlassian.confluence.client.configure_ssl_verification",
lambda **kwargs: None,
)
monkeypatch.setattr(
"mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor",
lambda **kwargs: MagicMock(),
)
# Configure with both proxy and SSL disabled
config = ConfluenceConfig(
url="https://test.atlassian.net/wiki",
auth_type="basic",
username="user",
api_token="token",
http_proxy="http://proxy.company.com:8080",
ssl_verify=False,
)
client = ConfluenceClient(config=config)
# Both proxy and SSL settings should be applied
assert mock_session.proxies["http"] == "http://proxy.company.com:8080"
assert config.ssl_verify is False
@pytest.mark.integration
def test_proxy_with_oauth_configuration(self):
"""Test proxy configuration works with OAuth authentication."""
with MockEnvironment.oauth_env() as env_vars:
# Add proxy configuration to env_vars directly, then patch os.environ
proxy_vars = {
"HTTP_PROXY": "http://proxy.company.com:8080",
"HTTPS_PROXY": "https://proxy.company.com:8443",
"NO_PROXY": "localhost,127.0.0.1",
}
# Merge with OAuth env vars
all_vars = {**env_vars, **proxy_vars}
# Use patch.dict to ensure environment variables are set
with patch.dict(os.environ, all_vars):
# OAuth should still respect proxy settings
assert os.environ.get("HTTP_PROXY") == "http://proxy.company.com:8080"
assert os.environ.get("HTTPS_PROXY") == "https://proxy.company.com:8443"
assert os.environ.get("NO_PROXY") == "localhost,127.0.0.1"
```
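Outside the test harness, the same proxy wiring might look like the sketch below; the host names and token are placeholders, and the final assertion mirrors what the tests above verify against the mocked session.

```python
from mcp_atlassian.jira.client import JiraClient
from mcp_atlassian.jira.config import JiraConfig

config = JiraConfig(
    url="https://jira.internal.example.com",
    auth_type="pat",
    personal_token="YOUR_PAT",
    https_proxy="http://proxy.example.com:8443",
    no_proxy="localhost,127.0.0.1",  # exported to NO_PROXY by the client
)
client = JiraClient(config=config)

# The proxy is applied to the underlying requests session.
assert client.jira._session.proxies["https"] == "http://proxy.example.com:8443"
```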
--------------------------------------------------------------------------------
/scripts/oauth_authorize.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
"""
OAuth 2.0 Authorization Flow Helper for MCP Atlassian
This script helps with the OAuth 2.0 (3LO) authorization flow for Atlassian Cloud:
1. Opens a browser to the authorization URL
2. Starts a local server to receive the callback with the authorization code
3. Exchanges the authorization code for access and refresh tokens
4. Saves the tokens for later use by MCP Atlassian
Usage:
python oauth_authorize.py --client-id YOUR_CLIENT_ID --client-secret YOUR_CLIENT_SECRET
--redirect-uri http://localhost:8080/callback
--scope "read:jira-work write:jira-work read:confluence-space.summary offline_access"
IMPORTANT: The 'offline_access' scope is required for refresh tokens to work properly.
Without this scope, tokens will expire quickly and authentication will fail.
Environment variables can also be used:
- ATLASSIAN_OAUTH_CLIENT_ID
- ATLASSIAN_OAUTH_CLIENT_SECRET
- ATLASSIAN_OAUTH_REDIRECT_URI
- ATLASSIAN_OAUTH_SCOPE
"""
import argparse
import http.server
import logging
import os
import secrets
import socketserver
import sys
import threading
import time
import urllib.parse
import webbrowser
# Add the parent directory to the path so we can import the package
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.mcp_atlassian.utils.oauth import OAuthConfig
# Configure logging (basicConfig should be called only once, ideally at the very start)
# Adding lineno for better debugging.
logging.basicConfig(
level=logging.DEBUG,
format="%(asctime)s - %(levelname)s - %(name)s - %(lineno)d - %(message)s",
force=True,
)
logger = logging.getLogger("oauth-authorize")
logger.setLevel(logging.DEBUG)
logging.getLogger("mcp-atlassian.oauth").setLevel(logging.DEBUG)
# Global variables for callback handling
authorization_code = None
received_state = None
callback_received = False
callback_error = None
class CallbackHandler(http.server.BaseHTTPRequestHandler):
"""HTTP request handler for OAuth callback."""
def do_GET(self) -> None: # noqa: N802
"""Handle GET requests (OAuth callback and favicon)."""
global authorization_code, callback_received, callback_error, received_state
parsed_path = urllib.parse.urlparse(self.path)
logger.debug(f"CallbackHandler received GET request for: {self.path}")
# Ignore favicon requests politely
if parsed_path.path == "/favicon.ico":
self.send_error(404, "File not found")
logger.debug("CallbackHandler: Ignored /favicon.ico request.")
return
# Process only /callback path
if parsed_path.path != "/callback":
self.send_error(404, "Not Found: Only /callback is supported.")
logger.warning(
f"CallbackHandler: Received request for unexpected path: {parsed_path.path}"
)
return
# Parse the query parameters from the URL
query = parsed_path.query
params = urllib.parse.parse_qs(query)
if "error" in params:
callback_error = params["error"][0]
callback_received = True
logger.error(f"Authorization error from callback: {callback_error}")
self._send_response(f"Authorization failed: {callback_error}")
return
if "code" in params:
authorization_code = params["code"][0]
if "state" in params:
received_state = params["state"][0]
callback_received = True
logger.info(
"Authorization code and state received successfully via callback."
)
self._send_response(
"Authorization successful! You can close this window now."
)
else:
logger.error("Invalid callback: 'code' or 'error' parameter missing.")
self._send_response(
"Invalid callback: Authorization code missing", status=400
)
def _send_response(self, message: str, status: int = 200) -> None:
"""Send response to the browser."""
self.send_response(status)
self.send_header("Content-type", "text/html")
self.end_headers()
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>Atlassian OAuth Authorization</title>
<style>
body {{
font-family: Arial, sans-serif;
text-align: center;
padding: 40px;
max-width: 600px;
margin: 0 auto;
}}
.message {{
padding: 20px;
border-radius: 5px;
margin-bottom: 20px;
}}
.success {{
background-color: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}}
.error {{
background-color: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
}}
</style>
</head>
<body>
<h1>Atlassian OAuth Authorization</h1>
<div class="message {"success" if status == 200 else "error"}">
<p>{message}</p>
</div>
<p>This window will automatically close in 5 seconds...</p>
<script>
setTimeout(function() {{
window.close();
}}, 5000);
</script>
</body>
</html>
"""
self.wfile.write(html.encode())
# Make the server quiet
def log_message(self, format: str, *args: str) -> None:
return
def start_callback_server(port: int) -> socketserver.TCPServer:
"""Start a local server to receive the OAuth callback."""
handler = CallbackHandler
httpd = socketserver.TCPServer(("", port), handler)
server_thread = threading.Thread(target=httpd.serve_forever)
server_thread.daemon = True
server_thread.start()
return httpd
def wait_for_callback(timeout: int = 300) -> bool:
"""Wait for the callback to be received."""
start_time = time.time()
while not callback_received and (time.time() - start_time) < timeout:
time.sleep(1)
if not callback_received:
logger.error(
f"Timed out waiting for authorization callback after {timeout} seconds"
)
return False
if callback_error:
logger.error(f"Authorization error: {callback_error}")
return False
return True
def parse_redirect_uri(redirect_uri: str) -> tuple[str, int]:
"""Parse the redirect URI to extract host and port."""
parsed = urllib.parse.urlparse(redirect_uri)
port = parsed.port or (443 if parsed.scheme == "https" else 80)
return parsed.hostname, port
def run_oauth_flow(args: argparse.Namespace) -> bool:
"""Run the OAuth 2.0 authorization flow."""
# Create OAuth configuration
oauth_config = OAuthConfig(
client_id=args.client_id,
client_secret=args.client_secret,
redirect_uri=args.redirect_uri,
scope=args.scope,
)
# Generate a random state for CSRF protection
state = secrets.token_urlsafe(16)
# Start local callback server if using localhost
hostname, port = parse_redirect_uri(args.redirect_uri)
httpd = None
if hostname and hostname.lower() in ["localhost", "127.0.0.1"]:
logger.info(f"Attempting to start local callback server on {hostname}:{port}")
try:
httpd = start_callback_server(port)
except OSError as e:
logger.error(f"Failed to start callback server: {e}")
logger.error(f"Make sure port {port} is available and not in use")
return False
# Get the authorization URL
auth_url = oauth_config.get_authorization_url(state=state)
# Open the browser for authorization
logger.info(f"Opening browser for authorization at {auth_url}")
webbrowser.open(auth_url)
logger.info(
"If the browser doesn't open automatically, please visit this URL manually."
)
# Wait for the callback
if not wait_for_callback():
if httpd:
httpd.shutdown()
return False
# Verify state to prevent CSRF attacks
if received_state != state:
logger.error(
f"State mismatch! Possible CSRF attack. Expected: {state}, Received: {received_state}"
)
if httpd:
httpd.shutdown()
return False
logger.info("CSRF state verified successfully.")
# Exchange the code for tokens
logger.info("Exchanging authorization code for tokens...")
if not authorization_code:
logger.error("Authorization code is missing, cannot exchange for tokens.")
if httpd:
httpd.shutdown()
return False
if oauth_config.exchange_code_for_tokens(authorization_code):
logger.info("🎉 OAuth authorization flow completed successfully!")
if oauth_config.cloud_id:
logger.info(f"Retrieved Cloud ID: {oauth_config.cloud_id}")
logger.info(
"\n💡 Tip: Add/update the following in your .env file or environment variables:"
)
logger.info(f"ATLASSIAN_OAUTH_CLIENT_ID={oauth_config.client_id}")
logger.info(f"ATLASSIAN_OAUTH_CLIENT_SECRET={oauth_config.client_secret}")
logger.info(f"ATLASSIAN_OAUTH_REDIRECT_URI={oauth_config.redirect_uri}")
logger.info(f"ATLASSIAN_OAUTH_SCOPE={oauth_config.scope}")
logger.info(f"ATLASSIAN_OAUTH_CLOUD_ID={oauth_config.cloud_id}")
else:
logger.warning(
"Cloud ID could not be obtained. Some API calls might require it."
)
if httpd:
httpd.shutdown()
return True
else:
logger.error("Failed to exchange authorization code for tokens")
if httpd:
httpd.shutdown()
return False
def main() -> int:
"""Main entry point."""
parser = argparse.ArgumentParser(
description="OAuth 2.0 Authorization Flow Helper for MCP Atlassian"
)
parser.add_argument("--client-id", help="OAuth Client ID")
parser.add_argument("--client-secret", help="OAuth Client Secret")
parser.add_argument(
"--redirect-uri",
help="OAuth Redirect URI (e.g., http://localhost:8080/callback)",
)
parser.add_argument("--scope", help="OAuth Scope (space-separated)")
args = parser.parse_args()
# Check for environment variables if arguments are not provided
if not args.client_id:
args.client_id = os.getenv("ATLASSIAN_OAUTH_CLIENT_ID")
if not args.client_secret:
args.client_secret = os.getenv("ATLASSIAN_OAUTH_CLIENT_SECRET")
if not args.redirect_uri:
args.redirect_uri = os.getenv("ATLASSIAN_OAUTH_REDIRECT_URI")
if not args.scope:
args.scope = os.getenv("ATLASSIAN_OAUTH_SCOPE")
# Validate required arguments
missing = []
if not args.client_id:
missing.append("client-id")
if not args.client_secret:
missing.append("client-secret")
if not args.redirect_uri:
missing.append("redirect-uri")
if not args.scope:
missing.append("scope")
if missing:
logger.error(f"Missing required arguments: {', '.join(missing)}")
parser.print_help()
return 1
# Check for offline_access scope
if args.scope and "offline_access" not in args.scope.split():
logger.warning("\n⚠️ WARNING: The 'offline_access' scope is missing!")
logger.warning(
"Without this scope, refresh tokens will not be issued and authentication will fail when tokens expire."
)
logger.warning("Consider adding 'offline_access' to your scope string.")
proceed = input("Do you want to proceed anyway? (y/n): ")
if proceed.lower() != "y":
return 1
success = run_oauth_flow(args)
return 0 if success else 1
if __name__ == "__main__":
sys.exit(main())
```
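
A minimal usage sketch (editorial addition, not part of the repository): the flags below are the ones `main()` defines above; the credential values and the scope string are placeholders.

```python
# Invoke the OAuth helper from the repo root; values shown are placeholders.
import subprocess
import sys

result = subprocess.run(
    [
        sys.executable,
        "scripts/oauth_authorize.py",
        "--client-id", "your-client-id",
        "--client-secret", "your-client-secret",
        "--redirect-uri", "http://localhost:8080/callback",
        # Include offline_access so refresh tokens are issued (see the warning in main()).
        "--scope", "read:jira-work offline_access",
    ]
)
sys.exit(result.returncode)

# The same values can instead be supplied via ATLASSIAN_OAUTH_CLIENT_ID,
# ATLASSIAN_OAUTH_CLIENT_SECRET, ATLASSIAN_OAUTH_REDIRECT_URI and
# ATLASSIAN_OAUTH_SCOPE, which main() falls back to when the flags are omitted.
```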
--------------------------------------------------------------------------------
/src/mcp_atlassian/preprocessing/jira.py:
--------------------------------------------------------------------------------
```python
"""Jira-specific text preprocessing module."""
import logging
import re
from typing import Any
from .base import BasePreprocessor
logger = logging.getLogger("mcp-atlassian")
class JiraPreprocessor(BasePreprocessor):
"""Handles text preprocessing for Jira content."""
def __init__(self, base_url: str = "", **kwargs: Any) -> None:
"""
Initialize the Jira text preprocessor.
Args:
base_url: Base URL for Jira API
**kwargs: Additional arguments for the base class
"""
super().__init__(base_url=base_url, **kwargs)
def clean_jira_text(self, text: str) -> str:
"""
Clean Jira text content by:
1. Processing user mentions and links
2. Converting Jira markup to markdown
3. Converting HTML/wiki markup to markdown
"""
if not text:
return ""
# Process user mentions
mention_pattern = r"\[~accountid:(.*?)\]"
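        # e.g. "[~accountid:abc123]" (illustrative ID) is replaced with a
        # display-name placeholder by _process_mentions below.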
text = self._process_mentions(text, mention_pattern)
# Process Jira smart links
text = self._process_smart_links(text)
# First convert any Jira markup to Markdown
text = self.jira_to_markdown(text)
# Then convert any remaining HTML to markdown
text = self._convert_html_to_markdown(text)
return text.strip()
def _process_mentions(self, text: str, pattern: str) -> str:
"""
Process user mentions in text.
Args:
text: The text containing mentions
pattern: Regular expression pattern to match mentions
Returns:
Text with mentions replaced with display names
"""
mentions = re.findall(pattern, text)
for account_id in mentions:
try:
# Note: This is a placeholder - actual user fetching should be injected
display_name = f"User:{account_id}"
text = text.replace(f"[~accountid:{account_id}]", display_name)
except Exception as e:
logger.error(f"Error processing mention for {account_id}: {str(e)}")
return text
def _process_smart_links(self, text: str) -> str:
"""Process Jira/Confluence smart links."""
# Pattern matches: [text|url|smart-link]
link_pattern = r"\[(.*?)\|(.*?)\|smart-link\]"
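        # e.g. "[PROJ-123|https://example.atlassian.net/browse/PROJ-123|smart-link]"
        # (illustrative key and URL) is rewritten to "[PROJ-123](<base_url>/browse/PROJ-123)".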
matches = re.finditer(link_pattern, text)
for match in matches:
full_match = match.group(0)
link_text = match.group(1)
link_url = match.group(2)
# Extract issue key if it's a Jira issue link
issue_key_match = re.search(r"browse/([A-Z]+-\d+)", link_url)
# Check if it's a Confluence wiki link
confluence_match = re.search(
r"wiki/spaces/.+?/pages/\d+/(.+?)(?:\?|$)", link_url
)
if issue_key_match:
issue_key = issue_key_match.group(1)
clean_url = f"{self.base_url}/browse/{issue_key}"
text = text.replace(full_match, f"[{issue_key}]({clean_url})")
elif confluence_match:
url_title = confluence_match.group(1)
readable_title = url_title.replace("+", " ")
readable_title = re.sub(r"^[A-Z]+-\d+\s+", "", readable_title)
text = text.replace(full_match, f"[{readable_title}]({link_url})")
else:
clean_url = link_url.split("?")[0]
text = text.replace(full_match, f"[{link_text}]({clean_url})")
return text
def jira_to_markdown(self, input_text: str) -> str:
"""
Convert Jira markup to Markdown format.
Args:
input_text: Text in Jira markup format
Returns:
Text in Markdown format
"""
if not input_text:
return ""
# Block quotes
output = re.sub(r"^bq\.(.*?)$", r"> \1\n", input_text, flags=re.MULTILINE)
# Text formatting (bold, italic)
output = re.sub(
r"([*_])(.*?)\1",
lambda match: ("**" if match.group(1) == "*" else "*")
+ match.group(2)
+ ("**" if match.group(1) == "*" else "*"),
output,
)
# Multi-level numbered list
output = re.sub(
r"^((?:#|-|\+|\*)+) (.*)$",
lambda match: self._convert_jira_list_to_markdown(match),
output,
flags=re.MULTILINE,
)
# Headers
output = re.sub(
r"^h([0-6])\.(.*)$",
lambda match: "#" * int(match.group(1)) + match.group(2),
output,
flags=re.MULTILINE,
)
# Inline code
output = re.sub(r"\{\{([^}]+)\}\}", r"`\1`", output)
# Citation
output = re.sub(r"\?\?((?:.[^?]|[^?].)+)\?\?", r"<cite>\1</cite>", output)
# Inserted text
output = re.sub(r"\+([^+]*)\+", r"<ins>\1</ins>", output)
# Superscript
output = re.sub(r"\^([^^]*)\^", r"<sup>\1</sup>", output)
# Subscript
output = re.sub(r"~([^~]*)~", r"<sub>\1</sub>", output)
# Strikethrough
output = re.sub(r"-([^-]*)-", r"-\1-", output)
# Code blocks with optional language specification
output = re.sub(
r"\{code(?::([a-z]+))?\}([\s\S]*?)\{code\}",
r"```\1\n\2\n```",
output,
flags=re.MULTILINE,
)
# No format
output = re.sub(r"\{noformat\}([\s\S]*?)\{noformat\}", r"```\n\1\n```", output)
# Quote blocks
output = re.sub(
r"\{quote\}([\s\S]*)\{quote\}",
lambda match: "\n".join(
[f"> {line}" for line in match.group(1).split("\n")]
),
output,
flags=re.MULTILINE,
)
        # Images with alt text
        output = re.sub(
            r"!([^|\n\s]+)\|([^\n!]*)alt=([^\n!\,]+?)(,([^\n!]*))?!",
            r"![\3](\1)",
            output,
        )
        # Images with other parameters (ignore them)
        output = re.sub(r"!([^|\n\s]+)\|([^\n!]*)!", r"![](\1)", output)
        # Images without parameters
        output = re.sub(r"!([^\n\s!]+)!", r"![](\1)", output)
# Links
output = re.sub(r"\[([^|]+)\|(.+?)\]", r"[\1](\2)", output)
output = re.sub(r"\[(.+?)\]([^\(]+)", r"<\1>\2", output)
# Colored text
output = re.sub(
r"\{color:([^}]+)\}([\s\S]*?)\{color\}",
r"<span style=\"color:\1\">\2</span>",
output,
flags=re.MULTILINE,
)
# Convert Jira table headers (||) to markdown table format
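        # e.g. "||Name||Role||" becomes "|Name|Role|" and a "|---|---|" separator
        # row is inserted below it.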
lines = output.split("\n")
i = 0
while i < len(lines):
line = lines[i]
if "||" in line:
# Replace Jira table headers
lines[i] = lines[i].replace("||", "|")
# Add a separator line for markdown tables
header_cells = lines[i].count("|") - 1
if header_cells > 0:
separator_line = "|" + "---|" * header_cells
lines.insert(i + 1, separator_line)
i += 1 # Skip the newly inserted line in next iteration
i += 1
# Rejoin the lines
output = "\n".join(lines)
return output
def markdown_to_jira(self, input_text: str) -> str:
"""
Convert Markdown syntax to Jira markup syntax.
Args:
input_text: Text in Markdown format
Returns:
Text in Jira markup format
"""
if not input_text:
return ""
# Save code blocks to prevent recursive processing
code_blocks = []
inline_codes = []
# Extract code blocks
def save_code_block(match: re.Match) -> str:
"""
Process and save a code block.
Args:
match: Regex match object containing the code block
Returns:
Jira-formatted code block
"""
syntax = match.group(1) or ""
content = match.group(2)
code = "{code"
if syntax:
code += ":" + syntax
code += "}" + content + "{code}"
code_blocks.append(code)
return str(code) # Ensure we return a string
# Extract inline code
def save_inline_code(match: re.Match) -> str:
"""
Process and save inline code.
Args:
match: Regex match object containing the inline code
Returns:
Jira-formatted inline code
"""
content = match.group(1)
code = "{{" + content + "}}"
inline_codes.append(code)
return str(code) # Ensure we return a string
# Save code sections temporarily
output = re.sub(r"```(\w*)\n([\s\S]+?)```", save_code_block, input_text)
output = re.sub(r"`([^`]+)`", save_inline_code, output)
# Headers with = or - underlines
output = re.sub(
r"^(.*?)\n([=-])+$",
lambda match: f"h{1 if match.group(2)[0] == '=' else 2}. {match.group(1)}",
output,
flags=re.MULTILINE,
)
# Headers with # prefix
output = re.sub(
r"^([#]+)(.*?)$",
lambda match: f"h{len(match.group(1))}." + match.group(2),
output,
flags=re.MULTILINE,
)
# Bold and italic
output = re.sub(
r"([*_]+)(.*?)\1",
lambda match: ("_" if len(match.group(1)) == 1 else "*")
+ match.group(2)
+ ("_" if len(match.group(1)) == 1 else "*"),
output,
)
# Multi-level bulleted list
output = re.sub(
r"^(\s*)- (.*)$",
lambda match: (
"* " + match.group(2)
if not match.group(1)
else " " * (len(match.group(1)) // 2) + "* " + match.group(2)
),
output,
flags=re.MULTILINE,
)
# Multi-level numbered list
output = re.sub(
r"^(\s+)1\. (.*)$",
lambda match: "#" * (int(len(match.group(1)) / 4) + 2)
+ " "
+ match.group(2),
output,
flags=re.MULTILINE,
)
# HTML formatting tags to Jira markup
tag_map = {"cite": "??", "del": "-", "ins": "+", "sup": "^", "sub": "~"}
for tag, replacement in tag_map.items():
output = re.sub(
rf"<{tag}>(.*?)<\/{tag}>", rf"{replacement}\1{replacement}", output
)
# Colored text
output = re.sub(
r"<span style=\"color:(#[^\"]+)\">([\s\S]*?)</span>",
r"{color:\1}\2{color}",
output,
flags=re.MULTILINE,
)
# Strikethrough
output = re.sub(r"~~(.*?)~~", r"-\1-", output)
# Images without alt text
output = re.sub(r"!\[\]\(([^)\n\s]+)\)", r"!\1!", output)
# Images with alt text
output = re.sub(r"!\[([^\]\n]+)\]\(([^)\n\s]+)\)", r"!\2|alt=\1!", output)
# Links
output = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", r"[\1|\2]", output)
output = re.sub(r"<([^>]+)>", r"[\1]", output)
# Convert markdown tables to Jira table format
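        # e.g. a header row "|Name|Role|" followed by "|---|---|" becomes
        # "||Name||Role||" and the separator row is removed.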
lines = output.split("\n")
i = 0
while i < len(lines):
if i < len(lines) - 1 and re.match(r"\|[-\s|]+\|", lines[i + 1]):
# Convert header row to Jira format
lines[i] = lines[i].replace("|", "||")
# Remove the separator line
lines.pop(i + 1)
i += 1
# Rejoin the lines
output = "\n".join(lines)
return output
def _convert_jira_list_to_markdown(self, match: re.Match) -> str:
"""
Helper method to convert Jira lists to Markdown format.
Args:
match: Regex match object containing the Jira list markup
Returns:
Markdown-formatted list item
"""
jira_bullets = match.group(1)
content = match.group(2)
# Calculate indentation level based on number of symbols
indent_level = len(jira_bullets) - 1
indent = " " * (indent_level * 2)
# Determine the marker based on the last character
last_char = jira_bullets[-1]
prefix = "1." if last_char == "#" else "-"
return f"{indent}{prefix} {content}"
```
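
A minimal usage sketch (editorial addition, not part of the repository) of the two converters above; the base URL and the sample strings are illustrative placeholders.

```python
from mcp_atlassian.preprocessing.jira import JiraPreprocessor

preprocessor = JiraPreprocessor(base_url="https://example.atlassian.net")

# Jira wiki markup -> Markdown
jira_markup = "h2. Release notes\nSee {{CHANGELOG}} and the *new* API."
print(preprocessor.jira_to_markdown(jira_markup))
# -> "## Release notes"
#    "See `CHANGELOG` and the **new** API."

# Markdown -> Jira wiki markup
markdown = "## Release notes\nSee `CHANGELOG` and the **new** API."
print(preprocessor.markdown_to_jira(markdown))
# -> "h2. Release notes"
#    "See {{CHANGELOG}} and the *new* API."
```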