#
tokens: 48497/50000 21/194 files (page 3/10)
lines: off (toggle) GitHub
raw markdown copy
This is page 3 of 10. Use http://codebase.md/sooperset/mcp-atlassian?page={x} to view the full context.

# Directory Structure

```
├── .devcontainer
│   ├── devcontainer.json
│   ├── Dockerfile
│   ├── post-create.sh
│   └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   └── feature_request.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── docker-publish.yml
│       ├── lint.yml
│       ├── publish.yml
│       ├── stale.yml
│       └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│   ├── oauth_authorize.py
│   └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│   └── mcp_atlassian
│       ├── __init__.py
│       ├── confluence
│       │   ├── __init__.py
│       │   ├── client.py
│       │   ├── comments.py
│       │   ├── config.py
│       │   ├── constants.py
│       │   ├── labels.py
│       │   ├── pages.py
│       │   ├── search.py
│       │   ├── spaces.py
│       │   ├── users.py
│       │   ├── utils.py
│       │   └── v2_adapter.py
│       ├── exceptions.py
│       ├── jira
│       │   ├── __init__.py
│       │   ├── attachments.py
│       │   ├── boards.py
│       │   ├── client.py
│       │   ├── comments.py
│       │   ├── config.py
│       │   ├── constants.py
│       │   ├── epics.py
│       │   ├── fields.py
│       │   ├── formatting.py
│       │   ├── issues.py
│       │   ├── links.py
│       │   ├── projects.py
│       │   ├── protocols.py
│       │   ├── search.py
│       │   ├── sprints.py
│       │   ├── transitions.py
│       │   ├── users.py
│       │   └── worklog.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── confluence
│       │   │   ├── __init__.py
│       │   │   ├── comment.py
│       │   │   ├── common.py
│       │   │   ├── label.py
│       │   │   ├── page.py
│       │   │   ├── search.py
│       │   │   ├── space.py
│       │   │   └── user_search.py
│       │   ├── constants.py
│       │   └── jira
│       │       ├── __init__.py
│       │       ├── agile.py
│       │       ├── comment.py
│       │       ├── common.py
│       │       ├── issue.py
│       │       ├── link.py
│       │       ├── project.py
│       │       ├── search.py
│       │       ├── version.py
│       │       ├── workflow.py
│       │       └── worklog.py
│       ├── preprocessing
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── confluence.py
│       │   └── jira.py
│       ├── servers
│       │   ├── __init__.py
│       │   ├── confluence.py
│       │   ├── context.py
│       │   ├── dependencies.py
│       │   ├── jira.py
│       │   └── main.py
│       └── utils
│           ├── __init__.py
│           ├── date.py
│           ├── decorators.py
│           ├── env.py
│           ├── environment.py
│           ├── io.py
│           ├── lifecycle.py
│           ├── logging.py
│           ├── oauth_setup.py
│           ├── oauth.py
│           ├── ssl.py
│           ├── tools.py
│           └── urls.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── fixtures
│   │   ├── __init__.py
│   │   ├── confluence_mocks.py
│   │   └── jira_mocks.py
│   ├── integration
│   │   ├── conftest.py
│   │   ├── README.md
│   │   ├── test_authentication.py
│   │   ├── test_content_processing.py
│   │   ├── test_cross_service.py
│   │   ├── test_mcp_protocol.py
│   │   ├── test_proxy.py
│   │   ├── test_real_api.py
│   │   ├── test_ssl_verification.py
│   │   ├── test_stdin_monitoring_fix.py
│   │   └── test_transport_lifecycle.py
│   ├── README.md
│   ├── test_preprocessing.py
│   ├── test_real_api_validation.py
│   ├── unit
│   │   ├── confluence
│   │   │   ├── __init__.py
│   │   │   ├── conftest.py
│   │   │   ├── test_client_oauth.py
│   │   │   ├── test_client.py
│   │   │   ├── test_comments.py
│   │   │   ├── test_config.py
│   │   │   ├── test_constants.py
│   │   │   ├── test_custom_headers.py
│   │   │   ├── test_labels.py
│   │   │   ├── test_pages.py
│   │   │   ├── test_search.py
│   │   │   ├── test_spaces.py
│   │   │   ├── test_users.py
│   │   │   ├── test_utils.py
│   │   │   └── test_v2_adapter.py
│   │   ├── jira
│   │   │   ├── conftest.py
│   │   │   ├── test_attachments.py
│   │   │   ├── test_boards.py
│   │   │   ├── test_client_oauth.py
│   │   │   ├── test_client.py
│   │   │   ├── test_comments.py
│   │   │   ├── test_config.py
│   │   │   ├── test_constants.py
│   │   │   ├── test_custom_headers.py
│   │   │   ├── test_epics.py
│   │   │   ├── test_fields.py
│   │   │   ├── test_formatting.py
│   │   │   ├── test_issues_markdown.py
│   │   │   ├── test_issues.py
│   │   │   ├── test_links.py
│   │   │   ├── test_projects.py
│   │   │   ├── test_protocols.py
│   │   │   ├── test_search.py
│   │   │   ├── test_sprints.py
│   │   │   ├── test_transitions.py
│   │   │   ├── test_users.py
│   │   │   └── test_worklog.py
│   │   ├── models
│   │   │   ├── __init__.py
│   │   │   ├── conftest.py
│   │   │   ├── test_base_models.py
│   │   │   ├── test_confluence_models.py
│   │   │   ├── test_constants.py
│   │   │   └── test_jira_models.py
│   │   ├── servers
│   │   │   ├── __init__.py
│   │   │   ├── test_confluence_server.py
│   │   │   ├── test_context.py
│   │   │   ├── test_dependencies.py
│   │   │   ├── test_jira_server.py
│   │   │   └── test_main_server.py
│   │   ├── test_exceptions.py
│   │   ├── test_main_transport_selection.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── test_custom_headers.py
│   │       ├── test_date.py
│   │       ├── test_decorators.py
│   │       ├── test_env.py
│   │       ├── test_environment.py
│   │       ├── test_io.py
│   │       ├── test_lifecycle.py
│   │       ├── test_logging.py
│   │       ├── test_masking.py
│   │       ├── test_oauth_setup.py
│   │       ├── test_oauth.py
│   │       ├── test_ssl.py
│   │       ├── test_tools.py
│   │       └── test_urls.py
│   └── utils
│       ├── __init__.py
│       ├── assertions.py
│       ├── base.py
│       ├── factories.py
│       └── mocks.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/src/mcp_atlassian/models/confluence/page.py:
--------------------------------------------------------------------------------

```python
"""
Confluence page models.
This module provides Pydantic models for Confluence pages and their versions.
"""

import logging
import warnings
from typing import Any

from pydantic import Field

from ..base import ApiModel, TimestampMixin
from ..constants import (
    CONFLUENCE_DEFAULT_ID,
    EMPTY_STRING,
)

# Import other necessary models using relative imports
from .common import ConfluenceAttachment, ConfluenceUser
from .space import ConfluenceSpace

logger = logging.getLogger(__name__)


class ConfluenceVersion(ApiModel, TimestampMixin):
    """
    A single version entry of a Confluence page.

    Carries the version number, the timestamp string of the version, an
    optional change message and the user who authored the version.
    """

    number: int = 0
    when: str = EMPTY_STRING
    message: str | None = None
    by: ConfluenceUser | None = None

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "ConfluenceVersion":
        """
        Build a ConfluenceVersion out of raw Confluence API data.

        Args:
            data: The version payload returned by the Confluence API
            **kwargs: Accepted for signature compatibility; not used here

        Returns:
            A populated ConfluenceVersion, or a default instance when
            ``data`` is empty
        """
        if not data:
            return cls()

        # Only build a user model when the payload actually carries one.
        raw_author = data.get("by")
        version_author = (
            ConfluenceUser.from_api_response(raw_author) if raw_author else None
        )

        return cls(
            number=data.get("number", 0),
            when=data.get("when", EMPTY_STRING),
            message=data.get("message"),
            by=version_author,
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Return a compact dictionary view of this version."""
        simplified = {
            "number": self.number,
            "when": self.format_timestamp(self.when),
        }

        # Optional entries are emitted only when they hold a value.
        if self.message:
            simplified["message"] = self.message
        if self.by:
            simplified["by"] = self.by.display_name

        return simplified


class ConfluencePage(ApiModel, TimestampMixin):
    """
    Model representing a Confluence page.

    This model includes the content, metadata, and version information
    for a Confluence page.
    """

    id: str = CONFLUENCE_DEFAULT_ID
    title: str = EMPTY_STRING
    type: str = "page"  # "page", "blogpost", etc.
    status: str = "current"
    space: ConfluenceSpace | None = None
    content: str = EMPTY_STRING
    content_format: str = "view"  # "view", "storage", etc.
    created: str = EMPTY_STRING
    updated: str = EMPTY_STRING
    author: ConfluenceUser | None = None
    version: ConfluenceVersion | None = None
    ancestors: list[dict[str, Any]] = Field(default_factory=list)
    children: dict[str, Any] = Field(default_factory=dict)
    attachments: list[ConfluenceAttachment] = Field(default_factory=list)
    url: str | None = None

    @property
    def page_content(self) -> str:
        """
        Alias for content to maintain compatibility with tests.

        Deprecated: Use content instead.
        """
        warnings.warn(
            "The 'page_content' property is deprecated. Use 'content' instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.content

    @classmethod
    def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "ConfluencePage":
        """
        Create a ConfluencePage from a Confluence API response.

        Args:
            data: The page data from the Confluence API
            **kwargs: Additional keyword arguments
                base_url: Base URL for constructing page URLs
                include_body: Whether to include body content (default True)
                content_override: Override the content value
                content_format: Override the content format (default "view")
                convert_to_markdown: When False, the resulting
                    ``content_format`` is reported as "html" (default True)
                is_cloud: Whether this is a cloud instance (affects URL format)

        Returns:
            A ConfluencePage instance; a default instance when ``data`` is empty
        """
        if not data:
            return cls()

        # Extract space information first to ensure it's available for URL construction
        space_data = data.get("space", {})
        if not space_data:
            # Try to extract space info from _expandable if available
            if expandable := data.get("_expandable", {}):
                if space_path := expandable.get("space"):
                    # Extract space key from REST API path
                    if space_path.startswith("/rest/api/space/"):
                        space_key = space_path.split("/rest/api/space/")[1]
                        space_data = {"key": space_key, "name": f"Space {space_key}"}

        # Create space model
        space = ConfluenceSpace.from_api_response(space_data)

        # Extract content based on format or use override if provided
        content = EMPTY_STRING
        content_format = kwargs.get("content_format", "view")
        include_body = kwargs.get("include_body", True)

        # Allow content override to be provided directly
        if content_override := kwargs.get("content_override"):
            content = content_override
        elif include_body and "body" in data:
            body = data.get("body", {})
            if content_format in body:
                content = body.get(content_format, {}).get("value", EMPTY_STRING)

        # Adjust content_format if convert_to_markdown is False and content is processed HTML
        convert_to_markdown = kwargs.get("convert_to_markdown", True)
        if not convert_to_markdown:
            content_format = "html"

        # Process author/creator
        author = None
        if author_data := data.get("author"):
            author = ConfluenceUser.from_api_response(author_data)

        # Process version
        version = None
        if version_data := data.get("version"):
            version = ConfluenceVersion.from_api_response(version_data)

        # Process attachments
        attachments = []
        if (
            attachments_data := data.get("children", {})
            .get("attachment", {})
            .get("results", [])
        ):
            attachments = [
                ConfluenceAttachment.from_api_response(attachment)
                for attachment in attachments_data
            ]

        # Process metadata timestamps
        created = EMPTY_STRING
        updated = EMPTY_STRING

        if history := data.get("history"):
            created = history.get("createdDate", EMPTY_STRING)
            updated = history.get("lastUpdated", {}).get("when", EMPTY_STRING)

        # Fall back to the version date whenever history did not yield an
        # update timestamp. This check must live outside the history branch:
        # otherwise a response without any "history" entry never gets the
        # fallback and "updated" stays empty even though version.when is known.
        if not updated and version and version.when:
            updated = version.when

        # Construct URL if base_url is provided
        url = None
        if base_url := kwargs.get("base_url"):
            page_id = data.get("id")

            # Use different URL format based on whether it's cloud or server
            is_cloud = kwargs.get("is_cloud", False)
            if is_cloud:
                # Cloud format: {base_url}/spaces/{space_key}/pages/{page_id}
                space_key = space.key if space and space.key else "unknown"
                url = f"{base_url}/spaces/{space_key}/pages/{page_id}"
            else:
                # Server format: {base_url}/pages/viewpage.action?pageId={page_id}
                url = f"{base_url}/pages/viewpage.action?pageId={page_id}"

        return cls(
            id=str(data.get("id", CONFLUENCE_DEFAULT_ID)),
            title=data.get("title", EMPTY_STRING),
            type=data.get("type", "page"),
            status=data.get("status", "current"),
            space=space,
            content=content,
            content_format=content_format,
            created=created,
            updated=updated,
            author=author,
            version=version,
            ancestors=data.get("ancestors", []),
            children=data.get("children", {}),
            attachments=attachments,
            url=url,
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        result = {
            "id": self.id,
            "title": self.title,
            "type": self.type,
            "created": self.format_timestamp(self.created),
            "updated": self.format_timestamp(self.updated),
            "url": self.url,
        }

        # Add space information if available
        if self.space:
            result["space"] = {"key": self.space.key, "name": self.space.name}

        # Add author information if available
        if self.author:
            result["author"] = self.author.display_name

        # Add version information if available
        if self.version:
            result["version"] = self.version.number

        # Attachments are always emitted, as an empty list when there are none
        result["attachments"] = [
            attachment.to_simplified_dict() for attachment in self.attachments
        ]

        # Add content if it's not empty
        if self.content and self.content_format:
            result["content"] = {"value": self.content, "format": self.content_format}

        # Add ancestors if there are any; entries without an "id" are dropped
        if self.ancestors:
            result["ancestors"] = [
                {"id": a.get("id"), "title": a.get("title")}
                for a in self.ancestors
                if "id" in a
            ]

        return result

```

--------------------------------------------------------------------------------
/tests/unit/confluence/test_client.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for the ConfluenceClient class."""

import os
from unittest.mock import MagicMock, patch

from mcp_atlassian.confluence import ConfluenceFetcher
from mcp_atlassian.confluence.client import ConfluenceClient
from mcp_atlassian.confluence.config import ConfluenceConfig


def test_init_with_basic_auth():
    """A basic-auth config is forwarded to Confluence and the SSL helper."""
    site_url = "https://test.atlassian.net/wiki"
    basic_config = ConfluenceConfig(
        url=site_url,
        auth_type="basic",
        username="test_user",
        api_token="test_token",
    )

    # Stand-ins for the Confluence API class, the preprocessor and the SSL helper
    with (
        patch("mcp_atlassian.confluence.client.Confluence") as confluence_cls,
        patch(
            "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"
        ) as preprocessor_cls,
        patch(
            "mcp_atlassian.confluence.client.configure_ssl_verification"
        ) as ssl_helper,
    ):
        client = ConfluenceClient(config=basic_config)

        # The API wrapper must be created exactly once with the basic-auth kwargs
        expected_api_kwargs = {
            "url": site_url,
            "username": "test_user",
            "password": "test_token",
            "cloud": True,
            "verify_ssl": True,
        }
        confluence_cls.assert_called_once_with(**expected_api_kwargs)

        api_instance = confluence_cls.return_value
        assert client.config == basic_config
        assert client.confluence == api_instance
        assert client.preprocessor == preprocessor_cls.return_value

        # SSL verification is configured against the API wrapper's session
        ssl_helper.assert_called_once_with(
            service_name="Confluence",
            url=site_url,
            session=api_instance._session,
            ssl_verify=True,
        )


def test_init_with_token_auth():
    """A PAT config is forwarded as a token, with SSL verification disabled."""
    server_url = "https://confluence.example.com"
    pat_config = ConfluenceConfig(
        url=server_url,
        auth_type="pat",
        personal_token="test_personal_token",
        ssl_verify=False,
    )

    # Stand-ins for the Confluence API class, the preprocessor and the SSL helper
    with (
        patch("mcp_atlassian.confluence.client.Confluence") as confluence_cls,
        patch(
            "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"
        ) as preprocessor_cls,
        patch(
            "mcp_atlassian.confluence.client.configure_ssl_verification"
        ) as ssl_helper,
    ):
        client = ConfluenceClient(config=pat_config)

        # Token auth passes no username/password, only the personal token
        expected_api_kwargs = {
            "url": server_url,
            "token": "test_personal_token",
            "cloud": False,
            "verify_ssl": False,
        }
        confluence_cls.assert_called_once_with(**expected_api_kwargs)

        api_instance = confluence_cls.return_value
        assert client.config == pat_config
        assert client.confluence == api_instance
        assert client.preprocessor == preprocessor_cls.return_value

        # The SSL helper must receive ssl_verify=False from the config
        ssl_helper.assert_called_once_with(
            service_name="Confluence",
            url=server_url,
            session=api_instance._session,
            ssl_verify=False,
        )


def test_init_from_env():
    """Without an explicit config the client loads one via from_env."""
    env_config = MagicMock()

    with (
        patch(
            "mcp_atlassian.confluence.config.ConfluenceConfig.from_env",
            return_value=env_config,
        ) as from_env,
        patch("mcp_atlassian.confluence.client.Confluence"),
        patch("mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"),
        patch("mcp_atlassian.confluence.client.configure_ssl_verification"),
    ):
        client = ConfluenceClient()

        # The environment loader is consulted once and its result is kept
        from_env.assert_called_once()
        assert client.config == env_config


def test_process_html_content():
    """_process_html_content delegates to the preprocessor and relays its result."""
    raw_html = "<p>Test</p>"
    space_key = "TEST"

    with (
        patch("mcp_atlassian.confluence.client.ConfluenceConfig.from_env"),
        patch("mcp_atlassian.confluence.client.Confluence"),
        patch(
            "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"
        ) as preprocessor_cls,
        patch("mcp_atlassian.confluence.client.configure_ssl_verification"),
    ):
        preprocessor = preprocessor_cls.return_value
        preprocessor.process_html_content.return_value = ("<p>HTML</p>", "Markdown")

        client = ConfluenceClient()

        processed_html, processed_markdown = client._process_html_content(
            raw_html, space_key
        )

        # The client hands its own Confluence instance to the preprocessor
        preprocessor.process_html_content.assert_called_once_with(
            raw_html, space_key, client.confluence
        )
        assert processed_html == "<p>HTML</p>"
        assert processed_markdown == "Markdown"


def test_get_user_details_by_accountid():
    """get_user_details_by_accountid forwards its arguments to the API wrapper."""
    account_id = "123456"

    with (
        patch("mcp_atlassian.confluence.client.ConfluenceConfig.from_env"),
        patch("mcp_atlassian.confluence.client.Confluence") as confluence_cls,
        patch("mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"),
        patch("mcp_atlassian.confluence.client.configure_ssl_verification"),
    ):
        api_lookup = confluence_cls.return_value.get_user_details_by_accountid
        api_lookup.return_value = {
            "displayName": "Test User",
            "accountId": "123456",
            "emailAddress": "[email protected]",
            "active": True,
        }

        fetcher = ConfluenceFetcher()

        # First call: no expand argument, so None is passed through
        details = fetcher.get_user_details_by_accountid(account_id)

        api_lookup.assert_called_once_with("123456", None)
        assert details["displayName"] == "Test User"
        assert details["accountId"] == "123456"
        assert details["emailAddress"] == "[email protected]"
        assert details["active"] is True

        # Second call: the expand parameter must reach the API wrapper
        api_lookup.reset_mock()
        api_lookup.return_value = {
            "displayName": "Test User",
            "accountId": "123456",
            "status": "active",
        }

        details = fetcher.get_user_details_by_accountid(account_id, expand="status")

        api_lookup.assert_called_once_with("123456", "status")
        assert details["status"] == "active"


def test_init_sets_proxies_and_no_proxy(monkeypatch):
    """Proxy settings from the config land on the session and in NO_PROXY."""
    # Fake API wrapper whose session exposes a real dict for proxies
    fake_session = MagicMock()
    fake_session.proxies = {}
    fake_confluence = MagicMock()
    fake_confluence._session = fake_session

    replacements = {
        "mcp_atlassian.confluence.client.Confluence": (
            lambda **kwargs: fake_confluence
        ),
        "mcp_atlassian.confluence.client.configure_ssl_verification": (
            lambda **kwargs: None
        ),
        "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor": (
            lambda **kwargs: MagicMock()
        ),
    }
    for target, replacement in replacements.items():
        monkeypatch.setattr(target, replacement)

    # Start from an empty NO_PROXY so the assertion below proves it was set
    monkeypatch.setenv("NO_PROXY", "")

    proxied_config = ConfluenceConfig(
        url="https://test.atlassian.net/wiki",
        auth_type="basic",
        username="user",
        api_token="token",
        http_proxy="http://proxy:8080",
        https_proxy="https://proxy:8443",
        socks_proxy="socks5://user:pass@proxy:1080",
        no_proxy="localhost,127.0.0.1",
    )
    ConfluenceClient(config=proxied_config)

    expected_proxies = {
        "http": "http://proxy:8080",
        "https": "https://proxy:8443",
        "socks": "socks5://user:pass@proxy:1080",
    }
    for scheme, proxy_url in expected_proxies.items():
        assert fake_session.proxies[scheme] == proxy_url
    assert os.environ["NO_PROXY"] == "localhost,127.0.0.1"


def test_init_no_proxies(monkeypatch):
    """With no proxy settings configured the session proxies stay untouched."""
    # Fake API wrapper whose session exposes a real dict for proxies
    fake_session = MagicMock()
    fake_session.proxies = {}
    fake_confluence = MagicMock()
    fake_confluence._session = fake_session

    replacements = {
        "mcp_atlassian.confluence.client.Confluence": (
            lambda **kwargs: fake_confluence
        ),
        "mcp_atlassian.confluence.client.configure_ssl_verification": (
            lambda **kwargs: None
        ),
        "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor": (
            lambda **kwargs: MagicMock()
        ),
    }
    for target, replacement in replacements.items():
        monkeypatch.setattr(target, replacement)

    plain_config = ConfluenceConfig(
        url="https://test.atlassian.net/wiki",
        auth_type="basic",
        username="user",
        api_token="token",
    )
    ConfluenceClient(config=plain_config)

    assert fake_session.proxies == {}

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/preprocessing/base.py:
--------------------------------------------------------------------------------

```python
"""Base preprocessing module."""

import logging
import re
import warnings
from typing import Any, Protocol

from bs4 import BeautifulSoup, Tag
from markdownify import markdownify as md

logger = logging.getLogger("mcp-atlassian")


class ConfluenceClient(Protocol):
    """Protocol for Confluence client.

    Structural interface listing the user-lookup methods that
    BasePreprocessor calls on the ``confluence_client`` it receives.
    Any object providing these two methods satisfies the protocol.
    """

    def get_user_details_by_accountid(self, account_id: str) -> dict[str, Any]:
        """Get user details by account ID.

        Args:
            account_id: The account ID of the user to look up

        Returns:
            A dictionary of user details; callers in this module read its
            ``displayName`` key.
        """
        ...

    def get_user_details_by_username(self, username: str) -> dict[str, Any]:
        """Get user details by username (for Server/DC compatibility).

        Args:
            username: The username of the user to look up

        Returns:
            A dictionary of user details; callers in this module read its
            ``displayName`` key.
        """
        ...


class BasePreprocessor:
    """Base class for text preprocessing operations.

    Provides HTML handling shared by preprocessors: resolving Confluence
    user mentions and user-profile macros to display names, and converting
    HTML to markdown.
    """

    def __init__(self, base_url: str = "") -> None:
        """
        Initialize the base text preprocessor.

        Args:
            base_url: Base URL for API server; trailing slashes are stripped
        """
        self.base_url = base_url.rstrip("/") if base_url else ""

    def process_html_content(
        self,
        html_content: str,
        space_key: str = "",
        confluence_client: ConfluenceClient | None = None,
    ) -> tuple[str, str]:
        """
        Process HTML content to replace user mentions and profile macros.

        Args:
            html_content: The HTML content to process
            space_key: Optional space key for context (not used by this
                base implementation)
            confluence_client: Optional Confluence client for user lookups

        Returns:
            Tuple of (processed_html, processed_markdown)

        Raises:
            Exception: Any error raised while parsing or converting is
                logged and re-raised unchanged.
        """
        try:
            # Parse the HTML content
            soup = BeautifulSoup(html_content, "html.parser")

            # Process user mentions
            self._process_user_mentions_in_soup(soup, confluence_client)
            self._process_user_profile_macros_in_soup(soup, confluence_client)

            # Convert to string and markdown
            processed_html = str(soup)
            processed_markdown = md(processed_html)

            return processed_html, processed_markdown

        except Exception as e:
            logger.error(f"Error in process_html_content: {str(e)}")
            raise

    def _process_user_mentions_in_soup(
        self, soup: BeautifulSoup, confluence_client: ConfluenceClient | None = None
    ) -> None:
        """
        Process user mentions in BeautifulSoup object.

        Every ``ac:link`` element that contains an ``ri:user`` reference with
        a string ``ri:account-id`` is replaced in place by a display-name
        mention. Links without such a reference are left untouched.

        Args:
            soup: BeautifulSoup object containing HTML
            confluence_client: Optional Confluence client for user lookups
        """
        # Find all ac:link elements that might contain user mentions
        for user_element in soup.find_all("ac:link"):
            user_ref = user_element.find("ri:user")
            if not user_ref:
                continue

            # A single lookup covers links with and without an ac:link-body:
            # the presence of a link body does not change which user is
            # referenced, so no separate handling is required for it.
            account_id = user_ref.get("ri:account-id")
            if account_id and isinstance(account_id, str):
                self._replace_user_mention(user_element, account_id, confluence_client)

    def _process_user_profile_macros_in_soup(
        self, soup: BeautifulSoup, confluence_client: ConfluenceClient | None = None
    ) -> None:
        """
        Process Confluence User Profile macros in BeautifulSoup object.
        Replaces <ac:structured-macro ac:name="profile">...</ac:structured-macro>
        with the user's display name, typically formatted as @DisplayName.

        Malformed macros (no ``user`` parameter or no ``ri:user`` tag) are
        replaced by a fixed placeholder. When no display name can be
        resolved, a ``[User Profile: <identifier>]`` fallback is used.

        Args:
            soup: BeautifulSoup object containing HTML
            confluence_client: Optional Confluence client for user lookups
        """
        profile_macros = soup.find_all(
            "ac:structured-macro", attrs={"ac:name": "profile"}
        )

        for macro_element in profile_macros:
            user_param = macro_element.find("ac:parameter", attrs={"ac:name": "user"})
            if not user_param:
                logger.debug(
                    "User profile macro found without a 'user' parameter. Replacing with placeholder."
                )
                macro_element.replace_with("[User Profile Macro (Malformed)]")
                continue

            user_ref = user_param.find("ri:user")
            if not user_ref:
                logger.debug(
                    "User profile macro's 'user' parameter found without 'ri:user' tag. Replacing with placeholder."
                )
                macro_element.replace_with("[User Profile Macro (Malformed)]")
                continue

            account_id = user_ref.get("ri:account-id")
            userkey = user_ref.get("ri:userkey")  # Fallback for Confluence Server/DC

            user_identifier_for_log = account_id or userkey
            display_name = None

            if confluence_client and user_identifier_for_log:
                # Lookup failures are deliberately best-effort: they are
                # logged and the macro falls back to the raw identifier.
                try:
                    if account_id and isinstance(account_id, str):
                        user_details = confluence_client.get_user_details_by_accountid(
                            account_id
                        )
                        display_name = user_details.get("displayName")
                    elif userkey and isinstance(userkey, str):
                        # For Confluence Server/DC, userkey might be the username
                        user_details = confluence_client.get_user_details_by_username(
                            userkey
                        )
                        display_name = user_details.get("displayName")
                except Exception as e:
                    logger.warning(
                        f"Error fetching user details for profile macro (user: {user_identifier_for_log}): {e}"
                    )
            elif not confluence_client:
                logger.warning(
                    "Confluence client not available for User Profile Macro processing."
                )

            if display_name:
                macro_element.replace_with(f"@{display_name}")
            else:
                fallback_identifier = user_identifier_for_log or "unknown_user"
                fallback_text = f"[User Profile: {fallback_identifier}]"
                macro_element.replace_with(fallback_text)
                logger.debug(f"Using fallback for user profile macro: {fallback_text}")

    def _replace_user_mention(
        self,
        user_element: Tag,
        account_id: str,
        confluence_client: ConfluenceClient | None = None,
    ) -> None:
        """
        Replace a user mention with the user's display name.

        Falls back to ``@user_<account_id>`` when no client is given, when
        the lookup returns no display name, or when the lookup raises.

        Args:
            user_element: The HTML element containing the user mention
            account_id: The user's account ID
            confluence_client: Optional Confluence client for user lookups
        """
        try:
            # Only attempt to get user details if we have a valid confluence client
            if confluence_client is not None:
                user_details = confluence_client.get_user_details_by_accountid(
                    account_id
                )
                display_name = user_details.get("displayName", "")
                if display_name:
                    new_text = f"@{display_name}"
                    user_element.replace_with(new_text)
                    return
            # If we don't have a confluence client or couldn't get user details,
            # use fallback
            self._use_fallback_user_mention(user_element, account_id)
        except Exception as e:
            logger.warning(f"Error processing user mention: {str(e)}")
            self._use_fallback_user_mention(user_element, account_id)

    def _use_fallback_user_mention(self, user_element: Tag, account_id: str) -> None:
        """
        Replace user mention with a fallback when the API call fails.

        Args:
            user_element: The HTML element containing the user mention
            account_id: The user's account ID
        """
        # Fallback: just use the account ID
        new_text = f"@user_{account_id}"
        user_element.replace_with(new_text)

    def _convert_html_to_markdown(self, text: str) -> str:
        """Convert HTML content to markdown if needed.

        Text without anything that looks like an HTML tag is returned
        unchanged. Conversion errors are logged and the current text is
        returned as-is.

        Args:
            text: Text that may contain HTML markup

        Returns:
            The markdown rendering of ``text``, or ``text`` itself
        """
        if re.search(r"<[^>]+>", text):
            try:
                with warnings.catch_warnings():
                    # Silence UserWarning emitted while parsing fragments
                    warnings.filterwarnings("ignore", category=UserWarning)
                    soup = BeautifulSoup(f"<div>{text}</div>", "html.parser")
                    html = str(soup.div.decode_contents()) if soup.div else text
                    text = md(html)
            except Exception as e:
                logger.warning(f"Error converting HTML to markdown: {str(e)}")
        return text

```

--------------------------------------------------------------------------------
/tests/unit/confluence/test_comments.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for the CommentsMixin class."""

from unittest.mock import patch

import pytest
import requests

from mcp_atlassian.confluence.comments import CommentsMixin


class TestCommentsMixin:
    """Exercise CommentsMixin against a mocked Confluence client."""

    @staticmethod
    def _comment_payload(comment_id: str, html: str, author: str) -> dict:
        """Build a comment dict shaped like the mocked API responses used here."""
        return {
            "id": comment_id,
            "body": {"view": {"value": html}},
            "version": {"number": 1},
            "author": {"displayName": author},
        }

    @staticmethod
    def _stub_page_lookup(mixin) -> None:
        """Make the mocked page lookup return a page located in space TEST."""
        mixin.confluence.get_page_by_id.return_value = {"space": {"key": "TEST"}}

    @pytest.fixture
    def comments_mixin(self, confluence_client):
        """Provide a CommentsMixin wired to the mocked client's collaborators."""
        # CommentsMixin inherits from ConfluenceClient, so the parent
        # constructor is patched out before instantiating it.
        init_path = "mcp_atlassian.confluence.comments.ConfluenceClient.__init__"
        with patch(init_path) as patched_init:
            patched_init.return_value = None
            instance = CommentsMixin()
            # Borrow the attributes the mixin needs from the mocked client
            instance.confluence = confluence_client.confluence
            instance.config = confluence_client.config
            instance.preprocessor = confluence_client.preprocessor
            return instance

    def test_get_page_comments_success(self, comments_mixin):
        """Test get_page_comments with success response."""
        target_page = "12345"
        api = comments_mixin.confluence
        api.get_page_comments.return_value = {
            "results": [
                self._comment_payload(
                    "12345", "<p>Comment content here</p>", "John Doe"
                )
            ]
        }
        comments_mixin.preprocessor.process_html_content.return_value = (
            "<p>Processed HTML</p>",
            "Processed Markdown",
        )

        comments = comments_mixin.get_page_comments(target_page)

        api.get_page_comments.assert_called_once_with(
            content_id=target_page, expand="body.view.value,version", depth="all"
        )
        assert len(comments) == 1
        assert comments[0].body == "Processed Markdown"

    def test_get_page_comments_with_html(self, comments_mixin):
        """Test get_page_comments with HTML output instead of markdown."""
        target_page = "12345"
        comments_mixin.confluence.get_page_comments.return_value = {
            "results": [
                self._comment_payload(
                    "12345", "<p>Comment content here</p>", "John Doe"
                )
            ]
        }
        comments_mixin.preprocessor.process_html_content.return_value = (
            "<p>Processed HTML</p>",
            "Processed markdown",
        )

        comments = comments_mixin.get_page_comments(
            target_page, return_markdown=False
        )

        assert len(comments) == 1
        only_comment = comments[0]
        assert only_comment.body == "<p>Processed HTML</p>"

    def test_get_page_comments_api_error(self, comments_mixin):
        """Test handling of API errors."""
        failure = requests.RequestException("API error")
        comments_mixin.confluence.get_page_comments.side_effect = failure

        comments = comments_mixin.get_page_comments("987654321")

        # An API failure yields an empty list
        assert isinstance(comments, list)
        assert len(comments) == 0

    def test_get_page_comments_key_error(self, comments_mixin):
        """Test handling of missing keys in API response."""
        # Response lacks the keys a normal comments payload carries
        comments_mixin.confluence.get_page_comments.return_value = {"invalid": "data"}

        comments = comments_mixin.get_page_comments("987654321")

        assert isinstance(comments, list)
        assert len(comments) == 0

    def test_get_page_comments_value_error(self, comments_mixin):
        """Test handling of unexpected data types."""
        # The page lookup hands back a string where a dict is expected
        comments_mixin.confluence.get_page_by_id.return_value = "invalid"

        comments = comments_mixin.get_page_comments("987654321")

        assert isinstance(comments, list)
        assert len(comments) == 0

    def test_get_page_comments_with_empty_results(self, comments_mixin):
        """Test handling of empty results."""
        comments_mixin.confluence.get_page_comments.return_value = {"results": []}

        comments = comments_mixin.get_page_comments("987654321")

        # No comments on the page means an empty list
        assert isinstance(comments, list)
        assert len(comments) == 0

    def test_add_comment_success(self, comments_mixin):
        """Test adding a comment with success response."""
        target_page = "12345"
        markdown_body = "This is a test comment"
        storage_body = "<p>This is a test comment</p>"

        self._stub_page_lookup(comments_mixin)
        preprocessor = comments_mixin.preprocessor
        preprocessor.markdown_to_confluence_storage.return_value = storage_body
        comments_mixin.confluence.add_comment.return_value = self._comment_payload(
            "98765", "<p>This is a test comment</p>", "Test User"
        )
        preprocessor.process_html_content.return_value = (
            "<p>This is a test comment</p>",
            "This is a test comment",
        )

        created = comments_mixin.add_comment(target_page, markdown_body)

        comments_mixin.confluence.add_comment.assert_called_once_with(
            target_page, "<p>This is a test comment</p>"
        )
        assert created is not None
        assert created.id == "98765"
        assert created.body == "This is a test comment"

    def test_add_comment_with_html_content(self, comments_mixin):
        """Test adding a comment with HTML content."""
        target_page = "12345"
        html_body = "<p>This is an <strong>HTML</strong> comment</p>"

        self._stub_page_lookup(comments_mixin)
        comments_mixin.confluence.add_comment.return_value = self._comment_payload(
            "98765", "<p>This is an <strong>HTML</strong> comment</p>", "Test User"
        )
        comments_mixin.preprocessor.process_html_content.return_value = (
            "<p>This is an <strong>HTML</strong> comment</p>",
            "This is an **HTML** comment",
        )

        created = comments_mixin.add_comment(target_page, html_body)

        # Content that is already HTML must bypass the markdown conversion
        comments_mixin.preprocessor.markdown_to_confluence_storage.assert_not_called()
        comments_mixin.confluence.add_comment.assert_called_once_with(
            target_page, html_body
        )
        assert created is not None
        assert created.body == "This is an **HTML** comment"

    def test_add_comment_api_error(self, comments_mixin):
        """Test handling of API errors when adding a comment."""
        target_page = "12345"
        markdown_body = "This is a test comment"

        self._stub_page_lookup(comments_mixin)
        comments_mixin.preprocessor.markdown_to_confluence_storage.return_value = (
            "<p>This is a test comment</p>"
        )
        comments_mixin.confluence.add_comment.side_effect = requests.RequestException(
            "API error"
        )

        created = comments_mixin.add_comment(target_page, markdown_body)

        assert created is None

    def test_add_comment_empty_response(self, comments_mixin):
        """Test handling of empty API response when adding a comment."""
        target_page = "12345"
        markdown_body = "This is a test comment"

        self._stub_page_lookup(comments_mixin)
        comments_mixin.preprocessor.markdown_to_confluence_storage.return_value = (
            "<p>This is a test comment</p>"
        )
        # The API call succeeds but hands back nothing
        comments_mixin.confluence.add_comment.return_value = None

        created = comments_mixin.add_comment(target_page, markdown_body)

        assert created is None

```

--------------------------------------------------------------------------------
/tests/integration/test_ssl_verification.py:
--------------------------------------------------------------------------------

```python
"""Integration tests for SSL verification functionality."""

import os
from unittest.mock import MagicMock, patch

import pytest
from requests.exceptions import SSLError
from requests.sessions import Session

from mcp_atlassian.confluence.config import ConfluenceConfig
from mcp_atlassian.jira.client import JiraClient
from mcp_atlassian.jira.config import JiraConfig
from mcp_atlassian.utils.ssl import SSLIgnoreAdapter, configure_ssl_verification
from tests.utils.base import BaseAuthTest
from tests.utils.mocks import MockEnvironment


@pytest.mark.integration
def test_configure_ssl_verification_with_real_confluence_url():
    """Mount SSL-ignoring adapters for the Confluence URL taken from the environment.

    Skipped when CONFLUENCE_URL is not set.
    """
    confluence_url = os.getenv("CONFLUENCE_URL")
    if not confluence_url:
        pytest.skip("CONFLUENCE_URL not set in environment")

    # Use a real requests session and remember how many adapters it starts with
    http_session = Session()
    baseline_adapters = len(http_session.adapters)

    # Mock the SSL_VERIFY value to be False for this test
    with patch.dict(os.environ, {"CONFLUENCE_SSL_VERIFY": "false"}):
        # ssl_verify=False is also passed explicitly
        configure_ssl_verification(
            service_name="Confluence",
            url=confluence_url,
            session=http_session,
            ssl_verify=False,
        )

        # Host part of the URL: drop the protocol, then everything after the first "/"
        host = confluence_url.split("://")[1].partition("/")[0]

        # One adapter per scheme must have been added for that host
        assert len(http_session.adapters) == baseline_adapters + 2
        for scheme in ("https", "http"):
            prefix = f"{scheme}://{host}"
            assert prefix in http_session.adapters
            assert isinstance(http_session.adapters[prefix], SSLIgnoreAdapter)


class TestSSLVerificationEnhanced(BaseAuthTest):
    """SSL verification tests built on the shared test utilities."""

    @pytest.mark.integration
    def test_ssl_verification_enabled_by_default(self):
        """Test that SSL verification is enabled by default."""
        with MockEnvironment.basic_auth_env():
            # Both services must default to verifying certificates
            assert JiraConfig.from_env().ssl_verify is True
            assert ConfluenceConfig.from_env().ssl_verify is True

    @pytest.mark.integration
    def test_ssl_verification_disabled_via_env(self):
        """Test SSL verification can be disabled via environment variables."""
        with MockEnvironment.basic_auth_env() as env_vars:
            env_vars["JIRA_SSL_VERIFY"] = "false"
            env_vars["CONFLUENCE_SSL_VERIFY"] = "false"

            # Configs are re-read from the environment after the change
            with patch.dict(os.environ, env_vars):
                jira_settings = JiraConfig.from_env()
                assert jira_settings.ssl_verify is False

                confluence_settings = ConfluenceConfig.from_env()
                assert confluence_settings.ssl_verify is False

    @pytest.mark.integration
    def test_ssl_adapter_mounting_for_multiple_domains(self):
        """Test SSL adapters are correctly mounted for multiple domains."""
        http_session = Session()

        # (configured URL, adapter prefix expected afterwards)
        cases = (
            ("https://domain1.atlassian.net", "https://domain1.atlassian.net"),
            ("https://domain2.atlassian.net/wiki", "https://domain2.atlassian.net"),
            ("https://custom.domain.com/jira", "https://custom.domain.com"),
        )

        for configured_url, _ in cases:
            configure_ssl_verification(
                service_name="Test",
                url=configured_url,
                session=http_session,
                ssl_verify=False,
            )

        # Every domain must now have an adapter mounted
        for _, expected_prefix in cases:
            assert expected_prefix in http_session.adapters

    @pytest.mark.integration
    def test_ssl_error_handling_with_invalid_cert(self, monkeypatch):
        """Test SSL error handling when certificate validation fails."""
        # Replace the Jira class with a mock that raises an SSL error when called
        failing_jira = MagicMock(
            side_effect=SSLError("Certificate verification failed")
        )
        monkeypatch.setattr("mcp_atlassian.jira.client.Jira", failing_jira)

        with MockEnvironment.basic_auth_env():
            jira_settings = JiraConfig.from_env()
            jira_settings.ssl_verify = True  # Ensure SSL verification is on

            # The SSL error must propagate out of the client constructor
            with pytest.raises(SSLError, match="Certificate verification failed"):
                JiraClient(config=jira_settings)

    @pytest.mark.integration
    def test_ssl_verification_with_custom_ca_bundle(self):
        """Test SSL verification with custom CA bundle path."""
        with MockEnvironment.basic_auth_env() as env_vars:
            bundle_path = "/path/to/custom/ca-bundle.crt"
            for variable in ("JIRA_SSL_VERIFY", "CONFLUENCE_SSL_VERIFY"):
                env_vars[variable] = bundle_path

            # Configs are re-read from the environment after the change
            with patch.dict(os.environ, env_vars):
                # Note: Current implementation only supports boolean ssl_verify
                # Custom CA bundle paths are not supported in the config parsing,
                # so any non-false value becomes True.
                jira_settings = JiraConfig.from_env()
                assert jira_settings.ssl_verify is True

                confluence_settings = ConfluenceConfig.from_env()
                assert confluence_settings.ssl_verify is True

    @pytest.mark.integration
    def test_ssl_adapter_not_mounted_when_verification_enabled(self):
        """Test that SSL adapters are not mounted when verification is enabled."""
        http_session = Session()
        baseline_adapters = len(http_session.adapters)

        # Verification stays enabled for this call
        configure_ssl_verification(
            service_name="Jira",
            url="https://test.atlassian.net",
            session=http_session,
            ssl_verify=True,
        )

        # The session's adapters must be unchanged
        assert len(http_session.adapters) == baseline_adapters
        assert "https://test.atlassian.net" not in http_session.adapters

    @pytest.mark.integration
    def test_ssl_configuration_persistence_across_requests(self):
        """Test SSL configuration persists across multiple requests."""
        http_session = Session()
        settings = {
            "service_name": "Jira",
            "url": "https://test.atlassian.net",
            "session": http_session,
            "ssl_verify": False,
        }

        # First configuration mounts the adapter
        configure_ssl_verification(**settings)
        first_adapter = http_session.adapters.get("https://test.atlassian.net")
        assert isinstance(first_adapter, SSLIgnoreAdapter)

        # Configuring the same domain a second time
        configure_ssl_verification(**settings)

        # An SSLIgnoreAdapter must still be present for the domain
        second_adapter = http_session.adapters.get("https://test.atlassian.net")
        assert isinstance(second_adapter, SSLIgnoreAdapter)

    @pytest.mark.integration
    def test_ssl_verification_with_oauth_configuration(self):
        """Test SSL verification works correctly with OAuth configuration."""
        with MockEnvironment.oauth_env() as env_vars:
            # Layer the SSL settings on top of the OAuth environment
            env_vars["JIRA_SSL_VERIFY"] = "false"
            env_vars["CONFLUENCE_SSL_VERIFY"] = "false"

            with patch.dict(os.environ, env_vars):
                # Note: OAuth flow would need additional setup, but we're testing config only
                for variable in ("JIRA_SSL_VERIFY", "CONFLUENCE_SSL_VERIFY"):
                    assert os.environ.get(variable) == "false"


@pytest.mark.integration
def test_configure_ssl_verification_with_real_jira_url():
    """Mount SSL-ignoring adapters for the Jira URL taken from the environment.

    Skipped when JIRA_URL is not set.
    """
    jira_url = os.getenv("JIRA_URL")
    if not jira_url:
        pytest.skip("JIRA_URL not set in environment")

    # Use a real requests session and remember how many adapters it starts with
    http_session = Session()
    baseline_adapters = len(http_session.adapters)

    # Mock the SSL_VERIFY value to be False for this test
    with patch.dict(os.environ, {"JIRA_SSL_VERIFY": "false"}):
        # ssl_verify=False is also passed explicitly
        configure_ssl_verification(
            service_name="Jira",
            url=jira_url,
            session=http_session,
            ssl_verify=False,
        )

        # Host part of the URL: drop the protocol, then everything after the first "/"
        host = jira_url.split("://")[1].partition("/")[0]

        # One adapter per scheme must have been added for that host
        assert len(http_session.adapters) == baseline_adapters + 2
        for scheme in ("https", "http"):
            prefix = f"{scheme}://{host}"
            assert prefix in http_session.adapters
            assert isinstance(http_session.adapters[prefix], SSLIgnoreAdapter)

```

--------------------------------------------------------------------------------
/tests/unit/jira/test_client.py:
--------------------------------------------------------------------------------

```python
"""Tests for the Jira client module."""

import os
from copy import deepcopy
from typing import Literal
from unittest.mock import MagicMock, call, patch

import pytest

from mcp_atlassian.jira.client import JiraClient
from mcp_atlassian.jira.config import JiraConfig


class DeepcopyMock(MagicMock):
    """MagicMock subclass that records deep copies of its call arguments.

    Because the recorded arguments are copies, mutating an argument after the
    call does not alter what the mock stored for that call.
    """

    def __call__(self, /, *args, **kwargs):
        # Copy positional arguments first, then keyword arguments, and hand
        # only the copies to MagicMock for recording.
        return super().__call__(*deepcopy(args), **deepcopy(kwargs))


def test_init_with_basic_auth():
    """Check that a basic-auth config is forwarded to Jira and the SSL setup."""
    base_url = "https://test.atlassian.net"
    with (
        patch("mcp_atlassian.jira.client.Jira") as jira_cls,
        patch(
            "mcp_atlassian.jira.client.configure_ssl_verification"
        ) as ssl_setup,
    ):
        settings = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="basic",
            username="test_username",
            api_token="test_token",
        )

        client = JiraClient(config=settings)

        # The underlying Jira object receives the basic-auth credentials
        jira_cls.assert_called_once_with(
            url=base_url,
            username="test_username",
            password="test_token",
            cloud=True,
            verify_ssl=True,
        )

        # SSL verification is configured on that Jira object's session
        ssl_setup.assert_called_once_with(
            service_name="Jira",
            url=base_url,
            session=jira_cls.return_value._session,
            ssl_verify=True,
        )

        # Freshly created clients start with empty caches
        assert client.config == settings
        assert client._field_ids_cache is None
        assert client._current_user_account_id is None


def test_init_with_token_auth():
    """Check that a PAT config is forwarded to Jira and the SSL setup."""
    base_url = "https://jira.example.com"
    with (
        patch("mcp_atlassian.jira.client.Jira") as jira_cls,
        patch(
            "mcp_atlassian.jira.client.configure_ssl_verification"
        ) as ssl_setup,
    ):
        settings = JiraConfig(
            url="https://jira.example.com",
            auth_type="pat",
            personal_token="test_personal_token",
            ssl_verify=False,
        )

        client = JiraClient(config=settings)

        # The underlying Jira object receives the personal token
        jira_cls.assert_called_once_with(
            url=base_url,
            token="test_personal_token",
            cloud=False,
            verify_ssl=False,
        )

        # The disabled ssl_verify flag is passed through to the SSL setup
        ssl_setup.assert_called_once_with(
            service_name="Jira",
            url=base_url,
            session=jira_cls.return_value._session,
            ssl_verify=False,
        )

        assert client.config == settings


def test_init_from_env():
    """Check that a client built without a config loads one via JiraConfig.from_env."""
    with (
        patch("mcp_atlassian.jira.config.JiraConfig.from_env") as from_env,
        patch("mcp_atlassian.jira.client.Jira"),
        patch("mcp_atlassian.jira.client.configure_ssl_verification"),
    ):
        env_config = MagicMock()
        env_config.auth_type = "basic"  # needed for the if condition
        from_env.return_value = env_config

        client = JiraClient()

        from_env.assert_called_once()
        assert client.config == env_config


def test_clean_text():
    """Check _clean_text on HTML, empty, and whitespace-padded input."""
    with (
        patch("mcp_atlassian.jira.client.Jira"),
        patch("mcp_atlassian.jira.client.configure_ssl_verification"),
    ):
        settings = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="basic",
            username="test_username",
            api_token="test_token",
        )
        client = JiraClient(config=settings)

        # (raw input, expected cleaned text): HTML, empty string, padded text
        cases = (
            ("<p>Test content</p>", "Test content"),
            ("", ""),
            ("  \n  Test with spaces  \n  ", "Test with spaces"),
        )
        for raw, expected in cases:
            assert client._clean_text(raw) == expected


def _test_get_paged(method: Literal["get", "post"]):
    """Drive get_paged through three pages using the given HTTP method.

    Only the mock for the requested method supplies pages; the other one
    raises if it is ever called.
    """
    with (
        patch(
            "mcp_atlassian.jira.client.Jira.get", new_callable=DeepcopyMock
        ) as mock_get,
        patch(
            "mcp_atlassian.jira.client.Jira.post", new_callable=DeepcopyMock
        ) as mock_post,
        patch("mcp_atlassian.jira.client.configure_ssl_verification"),
    ):
        client = JiraClient(
            config=JiraConfig(
                url="https://test.atlassian.net",
                auth_type="basic",
                username="test_username",
                api_token="test_token",
            )
        )

        # Three pages; the last one does not have nextPageToken
        pages = [
            {"data": "page1", "nextPageToken": "token1"},
            {"data": "page2", "nextPageToken": "token2"},
            {"data": "page3"},
        ]

        # Pick which mock serves the pages and which must stay untouched
        if method == "get":
            active, idle, payload_kw = mock_get, mock_post, "params"
        else:
            active, idle, payload_kw = mock_post, mock_get, "json"
        active.side_effect = pages
        idle.side_effect = RuntimeError("This should not be called")

        initial_params = {"initial": "params"}
        results = client.get_paged(method, "/test/url", initial_params)

        # Every page comes back, in order
        assert results == pages

        # Each follow-up request carries the token from the previous page;
        # GET sends the payload as `params`, POST as `json`.
        expected_payloads = [
            {"initial": "params"},
            {"initial": "params", "nextPageToken": "token1"},
            {"initial": "params", "nextPageToken": "token2"},
        ]
        expected_calls = [
            call(path="/test/url", absolute=False, **{payload_kw: payload})
            for payload in expected_payloads
        ]
        assert active.call_args_list == expected_calls


def test_get_paged_get():
    """Run the shared get_paged scenario with the GET method."""
    _test_get_paged(method="get")


def test_get_paged_post():
    """Run the shared get_paged scenario with the POST method."""
    _test_get_paged(method="post")


def test_get_paged_without_cloud():
    """Check that get_paged raises ValueError for a PAT config on jira.example.com."""
    expected_message = "Paged requests are only available for Jira Cloud platform"
    with patch("mcp_atlassian.jira.client.configure_ssl_verification"):
        settings = JiraConfig(
            url="https://jira.example.com",
            auth_type="pat",
            personal_token="test_token",
        )
        client = JiraClient(config=settings)
        with pytest.raises(ValueError, match=expected_message):
            client.get_paged("get", "/test/url")


def test_init_sets_proxies_and_no_proxy(monkeypatch):
    """Test that JiraClient sets session proxies and NO_PROXY env var from config."""
    # Stand-in Jira object whose session exposes a real dict for proxies
    fake_session = MagicMock()
    fake_session.proxies = {}
    fake_jira = MagicMock()
    fake_jira._session = fake_session
    monkeypatch.setattr("mcp_atlassian.jira.client.Jira", lambda **kwargs: fake_jira)
    monkeypatch.setattr(
        "mcp_atlassian.jira.client.configure_ssl_verification", lambda **kwargs: None
    )

    # Start from an empty NO_PROXY
    monkeypatch.setenv("NO_PROXY", "")

    JiraClient(
        config=JiraConfig(
            url="https://test.atlassian.net",
            auth_type="basic",
            username="user",
            api_token="pat",
            http_proxy="http://proxy:8080",
            https_proxy="https://proxy:8443",
            socks_proxy="socks5://user:pass@proxy:1080",
            no_proxy="localhost,127.0.0.1",
        )
    )

    # Each configured proxy lands under its scheme key on the session
    expected_proxies = {
        "http": "http://proxy:8080",
        "https": "https://proxy:8443",
        "socks": "socks5://user:pass@proxy:1080",
    }
    for scheme, proxy_url in expected_proxies.items():
        assert fake_session.proxies[scheme] == proxy_url
    assert os.environ["NO_PROXY"] == "localhost,127.0.0.1"


def test_init_no_proxies(monkeypatch):
    """Test that JiraClient does not set proxies if not configured."""
    # Stand-in Jira object whose session exposes a real dict for proxies
    fake_session = MagicMock()
    fake_session.proxies = {}
    fake_jira = MagicMock()
    fake_jira._session = fake_session
    monkeypatch.setattr("mcp_atlassian.jira.client.Jira", lambda **kwargs: fake_jira)
    monkeypatch.setattr(
        "mcp_atlassian.jira.client.configure_ssl_verification", lambda **kwargs: None
    )

    JiraClient(
        config=JiraConfig(
            url="https://test.atlassian.net",
            auth_type="basic",
            username="user",
            api_token="pat",
        )
    )

    # With no proxy settings in the config the dict must stay empty
    assert fake_session.proxies == {}

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/attachments.py:
--------------------------------------------------------------------------------

```python
"""Attachment operations for Jira API."""

import logging
import os
from pathlib import Path
from typing import Any

from ..models.jira import JiraAttachment
from .client import JiraClient
from .protocols import AttachmentsOperationsProto

# Configure logging
logger = logging.getLogger("mcp-jira")


class AttachmentsMixin(JiraClient, AttachmentsOperationsProto):
    """Mixin for Jira attachment operations.

    Downloads attachments through the Jira session and uploads local files to
    issues. Failures are reported through return values (``False`` or a
    ``{"success": False, ...}`` dictionary) unless a method documents a raise.
    """

    def download_attachment(self, url: str, target_path: str) -> bool:
        """
        Download a Jira attachment to the specified path.

        Args:
            url: The URL of the attachment to download
            target_path: The path where the attachment should be saved;
                a relative path is converted to an absolute one first

        Returns:
            True if successful, False otherwise (errors are logged, not raised)
        """
        if not url:
            logger.error("No URL provided for attachment download")
            return False

        try:
            # Convert to absolute path if relative
            if not os.path.isabs(target_path):
                target_path = os.path.abspath(target_path)

            logger.info(f"Downloading attachment from {url} to {target_path}")

            # Create the directory if it doesn't exist
            os.makedirs(os.path.dirname(target_path), exist_ok=True)

            # Use the Jira session to download the file
            response = self.jira._session.get(url, stream=True)
            try:
                response.raise_for_status()

                # Write the file to disk chunk by chunk
                with open(target_path, "wb") as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
            finally:
                # The response was requested with stream=True; close it on
                # every path (success, HTTP error, write error) so it is
                # not left open.
                response.close()

            # Verify the file was created
            if os.path.exists(target_path):
                file_size = os.path.getsize(target_path)
                logger.info(
                    f"Successfully downloaded attachment to {target_path} (size: {file_size} bytes)"
                )
                return True

            logger.error(f"File was not created at {target_path}")
            return False

        except Exception as e:
            logger.error(f"Error downloading attachment: {str(e)}")
            return False

    def download_issue_attachments(
        self, issue_key: str, target_dir: str
    ) -> dict[str, Any]:
        """
        Download all attachments for a Jira issue.

        Args:
            issue_key: The Jira issue key (e.g., 'PROJ-123')
            target_dir: The directory where attachments should be saved;
                created if it does not exist

        Returns:
            A dictionary with download results. ``success`` is False only when
            the issue payload has no ``fields`` entry; individual download
            problems are listed under ``failed`` while ``success`` stays True.

        Raises:
            TypeError: If ``jira.issue`` returns something other than a dict
        """
        # Convert to absolute path if relative
        if not os.path.isabs(target_dir):
            target_dir = os.path.abspath(target_dir)

        logger.info(
            f"Downloading attachments for {issue_key} to directory: {target_dir}"
        )

        # Create the target directory if it doesn't exist
        target_path = Path(target_dir)
        target_path.mkdir(parents=True, exist_ok=True)

        # Get the issue with attachments
        logger.info(f"Fetching issue {issue_key} with attachments")
        issue_data = self.jira.issue(issue_key, fields="attachment")

        if not isinstance(issue_data, dict):
            msg = f"Unexpected return value type from `jira.issue`: {type(issue_data)}"
            logger.error(msg)
            raise TypeError(msg)

        if "fields" not in issue_data:
            logger.error(f"Could not retrieve issue {issue_key}")
            return {"success": False, "error": f"Could not retrieve issue {issue_key}"}

        # Extract attachments from the API response; a null "fields" or
        # "attachment" value is treated the same as "no attachments".
        attachment_data = (issue_data.get("fields") or {}).get("attachment") or []

        if not attachment_data:
            return {
                "success": True,
                "message": f"No attachments found for issue {issue_key}",
                "downloaded": [],
                "failed": [],
            }

        # Create JiraAttachment objects for each well-formed entry
        attachments = [
            JiraAttachment.from_api_response(entry)
            for entry in attachment_data
            if isinstance(entry, dict)
        ]

        # Download each attachment
        downloaded = []
        failed = []

        for attachment in attachments:
            if not attachment.url:
                logger.warning(f"No URL for attachment {attachment.filename}")
                failed.append(
                    {"filename": attachment.filename, "error": "No URL available"}
                )
                continue

            # Keep only the final path component so the file lands in target_dir
            safe_filename = Path(attachment.filename).name
            file_path = target_path / safe_filename

            # Download the attachment
            success = self.download_attachment(attachment.url, str(file_path))

            if success:
                downloaded.append(
                    {
                        "filename": attachment.filename,
                        "path": str(file_path),
                        "size": attachment.size,
                    }
                )
            else:
                failed.append(
                    {"filename": attachment.filename, "error": "Download failed"}
                )

        return {
            "success": True,
            "issue_key": issue_key,
            "total": len(attachments),
            "downloaded": downloaded,
            "failed": failed,
        }

    def upload_attachment(self, issue_key: str, file_path: str) -> dict[str, Any]:
        """
        Upload a single attachment to a Jira issue.

        Args:
            issue_key: The Jira issue key (e.g., 'PROJ-123')
            file_path: The path to the file to upload; a relative path is
                converted to an absolute one first

        Returns:
            A dictionary with upload result information. On failure it holds
            ``success=False`` and an ``error`` message; exceptions are caught
            and reported the same way.
        """
        if not issue_key:
            logger.error("No issue key provided for attachment upload")
            return {"success": False, "error": "No issue key provided"}

        if not file_path:
            logger.error("No file path provided for attachment upload")
            return {"success": False, "error": "No file path provided"}

        try:
            # Convert to absolute path if relative
            if not os.path.isabs(file_path):
                file_path = os.path.abspath(file_path)

            # Check if file exists
            if not os.path.exists(file_path):
                logger.error(f"File not found: {file_path}")
                return {"success": False, "error": f"File not found: {file_path}"}

            logger.info(f"Uploading attachment from {file_path} to issue {issue_key}")

            # Use the Jira API to upload the file. The API call receives the
            # path, not a file object, so no handle is opened here.
            filename = os.path.basename(file_path)
            attachment = self.jira.add_attachment(
                issue_key=issue_key, filename=file_path
            )

            if attachment:
                file_size = os.path.getsize(file_path)
                logger.info(
                    f"Successfully uploaded attachment {filename} to {issue_key} (size: {file_size} bytes)"
                )
                return {
                    "success": True,
                    "issue_key": issue_key,
                    "filename": filename,
                    "size": file_size,
                    "id": attachment.get("id")
                    if isinstance(attachment, dict)
                    else None,
                }

            logger.error(f"Failed to upload attachment {filename} to {issue_key}")
            return {
                "success": False,
                "error": f"Failed to upload attachment {filename} to {issue_key}",
            }

        except Exception as e:
            error_msg = str(e)
            logger.error(f"Error uploading attachment: {error_msg}")
            return {"success": False, "error": error_msg}

    def upload_attachments(
        self, issue_key: str, file_paths: list[str]
    ) -> dict[str, Any]:
        """
        Upload multiple attachments to a Jira issue.

        Args:
            issue_key: The Jira issue key (e.g., 'PROJ-123')
            file_paths: List of paths to files to upload

        Returns:
            A dictionary with upload results. Once the arguments pass
            validation ``success`` is True even if some files fail; those
            files are listed under ``failed`` with their error message.
        """
        if not issue_key:
            logger.error("No issue key provided for attachment upload")
            return {"success": False, "error": "No issue key provided"}

        if not file_paths:
            logger.error("No file paths provided for attachment upload")
            return {"success": False, "error": "No file paths provided"}

        logger.info(f"Uploading {len(file_paths)} attachments to issue {issue_key}")

        # Upload each attachment, sorting outcomes into two lists
        uploaded = []
        failed = []

        for file_path in file_paths:
            result = self.upload_attachment(issue_key, file_path)

            if result.get("success"):
                uploaded.append(
                    {
                        "filename": result.get("filename"),
                        "size": result.get("size"),
                        "id": result.get("id"),
                    }
                )
            else:
                failed.append(
                    {
                        "filename": os.path.basename(file_path),
                        "error": result.get("error"),
                    }
                )

        return {
            "success": True,
            "issue_key": issue_key,
            "total": len(file_paths),
            "uploaded": uploaded,
            "failed": failed,
        }

```

--------------------------------------------------------------------------------
/tests/unit/utils/test_environment.py:
--------------------------------------------------------------------------------

```python
"""Tests for the environment utilities module."""

import logging

import pytest

from mcp_atlassian.utils.environment import get_available_services
from tests.utils.assertions import assert_log_contains
from tests.utils.mocks import MockEnvironment


@pytest.fixture(autouse=True)
def setup_logger():
    """Set the environment-utils logger to INFO for a test, then restore its level."""
    env_logger = logging.getLogger("mcp-atlassian.utils.environment")
    # Remember the level in force before the test so it can be put back.
    previous_level = env_logger.level
    env_logger.setLevel(logging.INFO)
    yield
    env_logger.setLevel(previous_level)


@pytest.fixture
def env_scenarios():
    """Provide named sets of environment variables, one per authentication setup.

    The returned dictionary maps a scenario name to a dictionary of
    environment variable names and the values a test should export.
    """
    # OAuth settings against a Cloud URL
    oauth_cloud = dict(
        CONFLUENCE_URL="https://company.atlassian.net",
        JIRA_URL="https://company.atlassian.net",
        ATLASSIAN_OAUTH_CLIENT_ID="client_id",
        ATLASSIAN_OAUTH_CLIENT_SECRET="client_secret",
        ATLASSIAN_OAUTH_REDIRECT_URI="http://localhost:8080/callback",
        ATLASSIAN_OAUTH_SCOPE="read:jira-user",
        ATLASSIAN_OAUTH_CLOUD_ID="cloud_id",
    )

    # Username + API token against a Cloud URL
    basic_auth_cloud = dict(
        CONFLUENCE_URL="https://company.atlassian.net",
        CONFLUENCE_USERNAME="[email protected]",
        CONFLUENCE_API_TOKEN="api_token",
        JIRA_URL="https://company.atlassian.net",
        JIRA_USERNAME="[email protected]",
        JIRA_API_TOKEN="api_token",
    )

    # Personal access token against self-hosted URLs
    pat_server = dict(
        CONFLUENCE_URL="https://confluence.company.com",
        CONFLUENCE_PERSONAL_TOKEN="pat_token",
        JIRA_URL="https://jira.company.com",
        JIRA_PERSONAL_TOKEN="pat_token",
    )

    # Username + password-style token against self-hosted URLs
    basic_auth_server = dict(
        CONFLUENCE_URL="https://confluence.company.com",
        CONFLUENCE_USERNAME="admin",
        CONFLUENCE_API_TOKEN="password",
        JIRA_URL="https://jira.company.com",
        JIRA_USERNAME="admin",
        JIRA_API_TOKEN="password",
    )

    return dict(
        oauth_cloud=oauth_cloud,
        basic_auth_cloud=basic_auth_cloud,
        pat_server=pat_server,
        basic_auth_server=basic_auth_server,
    )


def _assert_service_availability(result, confluence_expected, jira_expected):
    """Assert that ``result`` holds exactly the expected Confluence and Jira flags."""
    expected = dict(confluence=confluence_expected, jira=jira_expected)
    assert result == expected


def _assert_authentication_logs(caplog, auth_type, services):
    """Assert that an INFO log line for ``auth_type`` exists for each service.

    Args:
        caplog: Captured-log object passed through to ``assert_log_contains``
        auth_type: One of "oauth", "cloud_basic", "server", "not_configured"
        services: Iterable of service names; each is title-cased for the message
    """
    messages = {
        "oauth": "OAuth 2.0 (3LO) authentication (Cloud-only features)",
        "cloud_basic": "Cloud Basic Authentication (API Token)",
        "server": "Server/Data Center authentication (PAT or Basic Auth)",
        "not_configured": "is not configured or required environment variables are missing",
    }

    for name in services:
        label = name.title()
        # Only the "not configured" message lacks the leading "Using ".
        prefix = "" if auth_type == "not_configured" else "Using "
        # The lookup stays inside the loop: an unknown auth_type only fails
        # when there is at least one service to check.
        assert_log_contains(caplog, "INFO", f"{prefix}{label} {messages[auth_type]}")


class TestGetAvailableServices:
    """Test cases for get_available_services function.

    Every test runs inside ``MockEnvironment.clean_env()`` and exports the
    variables it needs through ``_export_env``.
    """

    @staticmethod
    def _export_env(variables):
        """Copy every name/value pair in ``variables`` into ``os.environ``."""
        import os

        os.environ.update(variables)

    def test_no_services_configured(self, caplog):
        """Test that no services are available when no environment variables are set."""
        with MockEnvironment.clean_env():
            result = get_available_services()
            _assert_service_availability(
                result, confluence_expected=False, jira_expected=False
            )
            _assert_authentication_logs(
                caplog, "not_configured", ["confluence", "jira"]
            )

    @pytest.mark.parametrize(
        "scenario,expected_confluence,expected_jira",
        [
            ("oauth_cloud", True, True),
            ("basic_auth_cloud", True, True),
            ("pat_server", True, True),
            ("basic_auth_server", True, True),
        ],
    )
    def test_valid_authentication_scenarios(
        self, env_scenarios, scenario, expected_confluence, expected_jira, caplog
    ):
        """Test various valid authentication scenarios."""
        with MockEnvironment.clean_env():
            self._export_env(env_scenarios[scenario])

            result = get_available_services()
            _assert_service_availability(
                result,
                confluence_expected=expected_confluence,
                jira_expected=expected_jira,
            )

            # Verify appropriate log messages based on scenario
            if scenario == "oauth_cloud":
                _assert_authentication_logs(caplog, "oauth", ["confluence", "jira"])
            elif scenario == "basic_auth_cloud":
                _assert_authentication_logs(
                    caplog, "cloud_basic", ["confluence", "jira"]
                )
            elif scenario in ["pat_server", "basic_auth_server"]:
                _assert_authentication_logs(caplog, "server", ["confluence", "jira"])

    @pytest.mark.parametrize(
        "missing_oauth_var",
        [
            "ATLASSIAN_OAUTH_CLIENT_ID",
            "ATLASSIAN_OAUTH_CLIENT_SECRET",
            "ATLASSIAN_OAUTH_REDIRECT_URI",
            "ATLASSIAN_OAUTH_SCOPE",
            "ATLASSIAN_OAUTH_CLOUD_ID",
        ],
    )
    def test_oauth_missing_required_vars(
        self, env_scenarios, missing_oauth_var, caplog
    ):
        """Test that OAuth fails when any required variable is missing."""
        with MockEnvironment.clean_env():
            # Work on a copy so the fixture's dictionary is left untouched
            oauth_config = env_scenarios["oauth_cloud"].copy()
            # Remove one required OAuth variable
            del oauth_config[missing_oauth_var]

            self._export_env(oauth_config)

            result = get_available_services()
            _assert_service_availability(
                result, confluence_expected=False, jira_expected=False
            )

    @pytest.mark.parametrize(
        "missing_basic_vars,service",
        [
            (["CONFLUENCE_USERNAME", "JIRA_USERNAME"], "username"),
            (["CONFLUENCE_API_TOKEN", "JIRA_API_TOKEN"], "token"),
        ],
    )
    def test_basic_auth_missing_credentials(
        self, env_scenarios, missing_basic_vars, service
    ):
        """Test that basic auth fails when credentials are missing."""
        with MockEnvironment.clean_env():
            basic_config = env_scenarios["basic_auth_cloud"].copy()

            # Remove required variables
            for var in missing_basic_vars:
                del basic_config[var]

            self._export_env(basic_config)

            result = get_available_services()
            _assert_service_availability(
                result, confluence_expected=False, jira_expected=False
            )

    def test_oauth_precedence_over_basic_auth(self, env_scenarios, caplog):
        """Test that OAuth takes precedence over Basic Auth."""
        with MockEnvironment.clean_env():
            # Set both OAuth and Basic Auth variables
            combined_config = {
                **env_scenarios["oauth_cloud"],
                **env_scenarios["basic_auth_cloud"],
            }

            self._export_env(combined_config)

            result = get_available_services()
            _assert_service_availability(
                result, confluence_expected=True, jira_expected=True
            )

            # Should use OAuth, not Basic Auth
            _assert_authentication_logs(caplog, "oauth", ["confluence", "jira"])
            assert "Basic Authentication" not in caplog.text

    def test_mixed_service_configuration(self, caplog):
        """Test mixed configurations where only one service is configured."""
        with MockEnvironment.clean_env():
            # Only Confluence gets credentials; Jira is left unset
            self._export_env(
                {
                    "CONFLUENCE_URL": "https://company.atlassian.net",
                    "CONFLUENCE_USERNAME": "[email protected]",
                    "CONFLUENCE_API_TOKEN": "api_token",
                }
            )

            result = get_available_services()
            _assert_service_availability(
                result, confluence_expected=True, jira_expected=False
            )

            _assert_authentication_logs(caplog, "cloud_basic", ["confluence"])
            _assert_authentication_logs(caplog, "not_configured", ["jira"])

    def test_return_value_structure(self):
        """Test that the return value has the correct structure."""
        with MockEnvironment.clean_env():
            result = get_available_services()

            assert isinstance(result, dict)
            assert set(result.keys()) == {"confluence", "jira"}
            assert all(isinstance(v, bool) for v in result.values())

    @pytest.mark.parametrize(
        "invalid_vars",
        [
            {"CONFLUENCE_URL": "", "JIRA_URL": ""},  # Empty strings
            {"confluence_url": "https://test.com"},  # Wrong case
        ],
    )
    def test_invalid_environment_variables(self, invalid_vars, caplog):
        """Test behavior with invalid environment variables."""
        with MockEnvironment.clean_env():
            self._export_env(invalid_vars)

            result = get_available_services()
            _assert_service_availability(
                result, confluence_expected=False, jira_expected=False
            )
            _assert_authentication_logs(
                caplog, "not_configured", ["confluence", "jira"]
            )

```

--------------------------------------------------------------------------------
/tests/unit/jira/test_comments.py:
--------------------------------------------------------------------------------

```python
"""Tests for the Jira Comments mixin."""

from unittest.mock import MagicMock, Mock

import pytest

from mcp_atlassian.jira.comments import CommentsMixin


class TestCommentsMixin:
    """Unit tests for comment retrieval, creation and markdown conversion in CommentsMixin."""

    @pytest.fixture
    def comments_mixin(self, jira_client):
        """Build a CommentsMixin that uses the mocked Jira client and stub helpers."""
        instance = CommentsMixin(config=jira_client.config)
        instance.jira = jira_client.jira

        # Preprocessor stub: every conversion returns the same Jira-markup text
        instance.preprocessor = Mock()
        instance.preprocessor.markdown_to_jira = Mock(
            return_value="*This* is _Jira_ formatted"
        )

        # Text cleaning is replaced by an identity function
        instance._clean_text = Mock(side_effect=lambda x: x)

        return instance

    def test_get_issue_comments_basic(self, comments_mixin):
        """A single complete comment comes back with its fields mapped."""
        fetch_comments = comments_mixin.jira.issue_get_comments
        single_comment = {
            "id": "10001",
            "body": "This is a comment",
            "created": "2024-01-01T10:00:00.000+0000",
            "updated": "2024-01-01T11:00:00.000+0000",
            "author": {"displayName": "John Doe"},
        }
        fetch_comments.return_value = {"comments": [single_comment]}

        comments = comments_mixin.get_issue_comments("TEST-123")

        fetch_comments.assert_called_once_with("TEST-123")
        assert len(comments) == 1
        assert comments[0]["id"] == "10001"
        assert comments[0]["body"] == "This is a comment"
        # The timestamp is expected in parsed form
        assert comments[0]["created"] == "2024-01-01 10:00:00+00:00"
        assert comments[0]["author"] == "John Doe"

    def test_get_issue_comments_with_limit(self, comments_mixin):
        """Only the first ``limit`` comments are returned."""
        fetch_comments = comments_mixin.jira.issue_get_comments
        rows = [
            ("10001", "First comment", "2024-01-01T10:00:00.000+0000", "John Doe"),
            ("10002", "Second comment", "2024-01-02T10:00:00.000+0000", "Jane Smith"),
            ("10003", "Third comment", "2024-01-03T10:00:00.000+0000", "Bob Johnson"),
        ]
        fetch_comments.return_value = {
            "comments": [
                {
                    "id": comment_id,
                    "body": body,
                    "created": created,
                    "author": {"displayName": author},
                }
                for comment_id, body, created, author in rows
            ]
        }

        limited = comments_mixin.get_issue_comments("TEST-123", limit=2)

        fetch_comments.assert_called_once_with("TEST-123")
        # Three comments were offered; the third must be cut off by the limit
        assert len(limited) == 2
        assert limited[0]["id"] == "10001"
        assert limited[1]["id"] == "10002"

    def test_get_issue_comments_with_missing_fields(self, comments_mixin):
        """Comments lacking fields still come back, filled with defaults."""
        # No body and no author
        without_body_and_author = {
            "id": "10001",
            "created": "2024-01-01T10:00:00.000+0000",
        }
        # No id, no created, and an empty author object
        without_id_and_created = {
            "body": "Second comment",
            "author": {},
        }
        # Author carries "name" rather than "displayName"
        author_with_name_only = {
            "id": "10003",
            "body": "Third comment",
            "created": "2024-01-03T10:00:00.000+0000",
            "author": {"name": "user123"},
        }
        comments_mixin.jira.issue_get_comments.return_value = {
            "comments": [
                without_body_and_author,
                without_id_and_created,
                author_with_name_only,
            ]
        }

        comments = comments_mixin.get_issue_comments("TEST-123")

        assert len(comments) == 3

        assert comments[0]["id"] == "10001"
        assert comments[0]["body"] == ""  # body defaults to an empty string
        assert comments[0]["author"] == "Unknown"  # author defaults to Unknown

        # The id must be either absent or falsy
        assert not ("id" in comments[1] and comments[1]["id"])
        assert comments[1]["author"] == "Unknown"

        # A bare "name" is not used as the author
        assert comments[2]["author"] == "Unknown"

    def test_get_issue_comments_with_empty_response(self, comments_mixin):
        """An empty comment list from the API yields an empty result."""
        comments_mixin.jira.issue_get_comments.return_value = {"comments": []}

        comments = comments_mixin.get_issue_comments("TEST-123")

        assert len(comments) == 0

    def test_get_issue_comments_with_error(self, comments_mixin):
        """An API failure surfaces as an exception mentioning the comment fetch."""
        comments_mixin.jira.issue_get_comments.side_effect = Exception("API Error")

        with pytest.raises(Exception, match="Error getting comments"):
            comments_mixin.get_issue_comments("TEST-123")

    def test_add_comment_basic(self, comments_mixin):
        """Adding a comment converts the text and maps the API response."""
        add_comment_api = comments_mixin.jira.issue_add_comment
        add_comment_api.return_value = {
            "id": "10001",
            "body": "This is a comment",
            "created": "2024-01-01T10:00:00.000+0000",
            "author": {"displayName": "John Doe"},
        }

        created_comment = comments_mixin.add_comment("TEST-123", "Test comment")

        # The raw text goes through the preprocessor; its output goes to the API
        comments_mixin.preprocessor.markdown_to_jira.assert_called_once_with(
            "Test comment"
        )
        add_comment_api.assert_called_once_with(
            "TEST-123", "*This* is _Jira_ formatted"
        )
        assert created_comment["id"] == "10001"
        assert created_comment["body"] == "This is a comment"
        # The timestamp is expected in parsed form
        assert created_comment["created"] == "2024-01-01 10:00:00+00:00"
        assert created_comment["author"] == "John Doe"

    def test_add_comment_with_markdown_conversion(self, comments_mixin):
        """A multi-element markdown comment is passed whole to the preprocessor."""
        add_comment_api = comments_mixin.jira.issue_add_comment
        add_comment_api.return_value = {
            "id": "10001",
            "body": "*This* is _Jira_ formatted",
            "created": "2024-01-01T10:00:00.000+0000",
            "author": {"displayName": "John Doe"},
        }

        # Heading, inline emphasis, a list and a fenced code block
        markdown_comment = """
        # Heading 1

        This is a paragraph with **bold** and *italic* text.

        - List item 1
        - List item 2

        ```python
        def hello():
            print("Hello world")
        ```
        """

        created_comment = comments_mixin.add_comment("TEST-123", markdown_comment)

        comments_mixin.preprocessor.markdown_to_jira.assert_called_once_with(
            markdown_comment
        )
        add_comment_api.assert_called_once_with(
            "TEST-123", "*This* is _Jira_ formatted"
        )
        assert created_comment["body"] == "*This* is _Jira_ formatted"

    def test_add_comment_with_empty_comment(self, comments_mixin):
        """An empty comment reaches the API without touching the preprocessor."""
        add_comment_api = comments_mixin.jira.issue_add_comment
        add_comment_api.return_value = {
            "id": "10001",
            "body": "",
            "created": "2024-01-01T10:00:00.000+0000",
            "author": {"displayName": "John Doe"},
        }

        created_comment = comments_mixin.add_comment("TEST-123", "")

        # Empty text must skip markdown conversion entirely
        comments_mixin.preprocessor.markdown_to_jira.assert_not_called()
        add_comment_api.assert_called_once_with("TEST-123", "")
        assert created_comment["body"] == ""

    def test_add_comment_with_error(self, comments_mixin):
        """An API failure surfaces as an exception mentioning the comment add."""
        comments_mixin.jira.issue_add_comment.side_effect = Exception("API Error")

        with pytest.raises(Exception, match="Error adding comment"):
            comments_mixin.add_comment("TEST-123", "Test comment")

    def test_markdown_to_jira(self, comments_mixin):
        """_markdown_to_jira returns whatever the preprocessor produces."""
        # Swap in a fresh mock so this test controls the converted text
        converter = MagicMock(return_value="Jira text")
        comments_mixin.preprocessor.markdown_to_jira = converter

        converted = comments_mixin._markdown_to_jira("Markdown text")

        assert converted == "Jira text"
        comments_mixin.preprocessor.markdown_to_jira.assert_called_once_with(
            "Markdown text"
        )

    def test_markdown_to_jira_with_empty_text(self, comments_mixin):
        """Empty text converts to an empty string without calling the preprocessor."""
        converted = comments_mixin._markdown_to_jira("")

        assert converted == ""
        comments_mixin.preprocessor.markdown_to_jira.assert_not_called()

```

--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------

```python
"""
Root pytest configuration file for MCP Atlassian tests.

This module provides session-scoped fixtures and utilities that are shared
across all test modules. It integrates with the new test utilities framework
to provide efficient, reusable test fixtures.
"""

import pytest

from tests.utils.factories import (
    AuthConfigFactory,
    ConfluencePageFactory,
    ErrorResponseFactory,
    JiraIssueFactory,
)
from tests.utils.mocks import MockAtlassianClient, MockEnvironment


def pytest_addoption(parser):
    """Register the ``--use-real-data`` command-line flag on ``parser``."""
    # Boolean switch that is off unless given on the command line.
    real_data_flag = {
        "action": "store_true",
        "default": False,
        "help": "Run tests that use real API data (requires env vars)",
    }
    parser.addoption("--use-real-data", **real_data_flag)

# ============================================================================
# Session-Scoped Configuration Fixtures
# ============================================================================


@pytest.fixture(scope="session")
def session_auth_configs():
    """
    Session-scoped authentication configuration templates.

    Two entries come from ``AuthConfigFactory`` ("oauth", "basic_auth"); the
    other two ("jira_basic", "confluence_basic") are literal url / username /
    api_token dictionaries.

    Returns:
        Dict[str, Dict[str, str]]: Authentication configuration templates
    """
    templates = {}

    # Factory-built templates
    templates["oauth"] = AuthConfigFactory.create_oauth_config()
    templates["basic_auth"] = AuthConfigFactory.create_basic_auth_config()

    # Hand-written per-service templates
    templates["jira_basic"] = dict(
        url="https://test.atlassian.net",
        username="[email protected]",
        api_token="test-jira-token",
    )
    templates["confluence_basic"] = dict(
        url="https://test.atlassian.net/wiki",
        username="[email protected]",
        api_token="test-confluence-token",
    )

    return templates


@pytest.fixture(scope="session")
def session_mock_data():
    """
    Session-scoped mock data templates built from the test factories.

    Because the fixture is session-scoped, the factory calls below run once
    per test session rather than once per test.

    Returns:
        Dict[str, Any]: Mock data templates for various API responses
    """
    templates = {
        "jira_issue": JiraIssueFactory.create(),
        "jira_issue_minimal": JiraIssueFactory.create_minimal(),
        "confluence_page": ConfluencePageFactory.create(),
        "api_error": ErrorResponseFactory.create_api_error(),
        "auth_error": ErrorResponseFactory.create_auth_error(),
    }

    # Search response holding three issues, with paging fields alongside
    search_issue_keys = ("TEST-1", "TEST-2", "TEST-3")
    templates["jira_search_results"] = {
        "issues": [JiraIssueFactory.create(key) for key in search_issue_keys],
        "total": 3,
        "startAt": 0,
        "maxResults": 50,
    }

    return templates


# ============================================================================
# Environment and Configuration Fixtures
# ============================================================================


@pytest.fixture
def clean_environment():
    """
    Run the test inside ``MockEnvironment.clean_env()``.

    Intended for exercising error paths and configuration validation when no
    authentication variables are present. The value bound by the context
    manager is yielded to the test, and the context is exited on teardown.
    """
    with MockEnvironment.clean_env() as cleared_env:
        yield cleared_env


@pytest.fixture
def oauth_environment():
    """
    Run the test inside ``MockEnvironment.oauth_env()``.

    Intended for tests of OAuth-based authentication flows. The value bound
    by the context manager is yielded to the test, and the context is exited
    on teardown.
    """
    with MockEnvironment.oauth_env() as oauth_vars:
        yield oauth_vars


@pytest.fixture
def basic_auth_environment():
    """
    Run the test inside ``MockEnvironment.basic_auth_env()``.

    Intended for tests of username/token authentication. The value bound by
    the context manager is yielded to the test, and the context is exited on
    teardown.
    """
    with MockEnvironment.basic_auth_env() as basic_auth_vars:
        yield basic_auth_vars


# ============================================================================
# Factory-Based Fixtures
# ============================================================================


@pytest.fixture
def make_jira_issue():
    """
    Factory fixture for creating Jira issues with customizable properties.

    The fixture hands back the ``JiraIssueFactory.create`` callable itself
    (not a pre-built issue), so each test can build as many issues as it
    needs with its own arguments.

    Returns:
        Callable: Factory function that creates Jira issue data

    Example:
        def test_issue_creation(make_jira_issue):
            issue = make_jira_issue(key="CUSTOM-123",
                                  fields={"priority": {"name": "High"}})
            assert issue["key"] == "CUSTOM-123"
    """
    return JiraIssueFactory.create


@pytest.fixture
def make_confluence_page():
    """
    Factory fixture for creating Confluence pages with customizable properties.

    The fixture hands back the ``ConfluencePageFactory.create`` callable
    itself (not a pre-built page), so each test can build pages with its own
    arguments.

    Returns:
        Callable: Factory function that creates Confluence page data

    Example:
        def test_page_creation(make_confluence_page):
            page = make_confluence_page(title="Custom Page",
                                      space={"key": "CUSTOM"})
            assert page["title"] == "Custom Page"
    """
    return ConfluencePageFactory.create


@pytest.fixture
def make_auth_config():
    """
    Expose the authentication-config factories keyed by auth type.

    The values are the factory callables themselves, so a test picks a type
    and then calls it with whatever overrides it needs.

    Returns:
        Dict[str, Callable]: ``"oauth"`` and ``"basic"`` factory functions

    Example:
        def test_oauth_config(make_auth_config):
            config = make_auth_config["oauth"](client_id="custom-id")
            assert config["client_id"] == "custom-id"
    """
    factories = {}
    factories["oauth"] = AuthConfigFactory.create_oauth_config
    factories["basic"] = AuthConfigFactory.create_basic_auth_config
    return factories


@pytest.fixture
def make_api_error():
    """
    Factory fixture for creating API error responses.

    The fixture hands back the ``ErrorResponseFactory.create_api_error``
    callable itself (not a pre-built error), so each test can build errors
    with its own arguments.

    Returns:
        Callable: Factory function that creates error response data

    Example:
        def test_error_handling(make_api_error):
            error = make_api_error(status_code=404, message="Not Found")
            assert error["status"] == 404
    """
    return ErrorResponseFactory.create_api_error


# ============================================================================
# Mock Client Fixtures
# ============================================================================


@pytest.fixture
def mock_jira_client():
    """
    Provide a fresh mock Jira client for each test.

    The client is built by ``MockAtlassianClient.create_jira_client()``;
    individual tests may adjust it further as needed.

    Returns:
        MagicMock: Configured mock Jira client
    """
    jira_client = MockAtlassianClient.create_jira_client()
    return jira_client


@pytest.fixture
def mock_confluence_client():
    """
    Provide a fresh mock Confluence client for each test.

    The client is built by ``MockAtlassianClient.create_confluence_client()``;
    individual tests may adjust it further as needed.

    Returns:
        MagicMock: Configured mock Confluence client
    """
    confluence_client = MockAtlassianClient.create_confluence_client()
    return confluence_client


# ============================================================================
# Compatibility Fixtures (maintain backward compatibility)
# ============================================================================


@pytest.fixture
def use_real_jira_data(request):
    """
    Report whether tests backed by real Jira data were requested.

    The value is read from the ``--use-real-data`` command-line flag.

    Note: This fixture is maintained for backward compatibility.
    """
    pytest_config = request.config
    return pytest_config.getoption("--use-real-data")


@pytest.fixture
def use_real_confluence_data(request):
    """
    Report whether tests backed by real Confluence data were requested.

    The value is read from the same ``--use-real-data`` command-line flag
    that controls the Jira real-data tests.

    Note: This fixture is maintained for backward compatibility.
    """
    pytest_config = request.config
    return pytest_config.getoption("--use-real-data")


# ============================================================================
# Advanced Environment Utilities
# ============================================================================


@pytest.fixture
def env_var_manager():
    """
    Fixture providing utilities for managing environment variables in tests.

    The ``MockEnvironment`` class itself is returned (not an instance), so
    tests call its context-manager helpers directly.

    Returns:
        MockEnvironment: Environment management utilities

    Example:
        def test_with_custom_env(env_var_manager):
            with env_var_manager.oauth_env():
                # Test OAuth functionality
                pass
    """
    return MockEnvironment


@pytest.fixture
def parametrized_auth_env(request):
    """
    Activate the authentication environment named by ``request.param``.

    Meant to be used through indirect parametrization so that one test body
    runs under several authentication setups. Supported names are
    ``"oauth"``, ``"basic_auth"`` and ``"clean"``.

    Raises:
        ValueError: If ``request.param`` is not one of the supported names.

    Example:
        @pytest.mark.parametrize("parametrized_auth_env",
                               ["oauth", "basic_auth"], indirect=True)
        def test_auth_scenarios(parametrized_auth_env):
            # Test will run once for OAuth and once for basic auth
            pass
    """
    auth_type = request.param

    # Map each supported name onto the context manager that sets it up.
    env_factories = {
        "oauth": MockEnvironment.oauth_env,
        "basic_auth": MockEnvironment.basic_auth_env,
        "clean": MockEnvironment.clean_env,
    }
    env_factory = env_factories.get(auth_type)
    if env_factory is None:
        raise ValueError(f"Unknown auth type: {auth_type}")

    with env_factory() as env:
        yield env


# ============================================================================
# Session Validation and Health Checks
# ============================================================================


@pytest.fixture(scope="session", autouse=True)
def validate_test_environment():
    """
    Verify once per session that the shared test helper modules can be found.

    Runs automatically for every session. Each helper module is looked up
    with ``importlib.util.find_spec``; a missing module or an import error
    during the lookup fails the run via ``pytest.fail``. A banner is printed
    before the tests start and another after they finish.
    """
    required_modules = (
        "tests.fixtures.confluence_mocks",
        "tests.fixtures.jira_mocks",
        "tests.utils.base",
        "tests.utils.factories",
        "tests.utils.mocks",
    )

    try:
        import importlib.util

        for required in required_modules:
            if importlib.util.find_spec(required) is None:
                pytest.fail(f"Failed to find module: {required}")
    except ImportError as e:
        pytest.fail(f"Failed to import test utilities: {e}")

    # Session start banner
    print("\n🧪 Starting MCP Atlassian test session with enhanced fixtures")

    yield

    # Session end banner
    print("\n✅ Completed MCP Atlassian test session")

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/formatting.py:
--------------------------------------------------------------------------------

```python
"""Module for Jira content formatting utilities."""

import html
import logging
import re
from typing import Any

from ..preprocessing.jira import JiraPreprocessor
from .client import JiraClient
from .protocols import (
    EpicOperationsProto,
    FieldsOperationsProto,
    IssueOperationsProto,
    UsersOperationsProto,
)

logger = logging.getLogger("mcp-jira")


class FormattingMixin(
    JiraClient,
    EpicOperationsProto,
    FieldsOperationsProto,
    IssueOperationsProto,
    UsersOperationsProto,
):
    """Mixin for Jira content formatting operations.

    This mixin provides utilities for converting between different formats,
    formatting issue content for display, parsing dates, and sanitizing content.
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Set up the client and attach a Jira text preprocessor.

        Args:
            *args: Positional arguments forwarded to the parent initializer
            **kwargs: Keyword arguments forwarded to the parent initializer
        """
        super().__init__(*args, **kwargs)

        # Fall back to an empty base URL when no config (or no config.url)
        # is available on the instance.
        client_config = getattr(self, "config", None)
        self.preprocessor = JiraPreprocessor(
            base_url=getattr(client_config, "url", "")
        )

    def markdown_to_jira(self, markdown_text: str) -> str:
        """
        Convert Markdown syntax to Jira markup syntax.

        The conversion is delegated to ``self.preprocessor``. This is a
        best-effort operation: if the preprocessor raises, a warning is
        logged and the input is returned unchanged.

        Args:
            markdown_text: Text in Markdown format

        Returns:
            Text in Jira markup format, ``""`` for empty input, or the
            original text when conversion fails
        """
        if not markdown_text:
            return ""

        try:
            converted = self.preprocessor.markdown_to_jira(markdown_text)
        except Exception as e:
            logger.warning(f"Error converting markdown to Jira format: {str(e)}")
            # Conversion failed: hand the caller back what they gave us
            return markdown_text

        return converted

    def format_issue_content(
        self,
        issue_key: str,
        issue: dict[str, Any],
        description: str,
        comments: list[dict[str, Any]],
        created_date: str,
        epic_info: dict[str, str | None],
    ) -> str:
        """
        Format the issue content for display.

        Args:
            issue_key: The issue key
            issue: The issue data from Jira
            description: Processed description text
            comments: List of comment dictionaries
            created_date: Formatted created date
            epic_info: Dictionary with epic_key and epic_name

        Returns:
            Formatted content string
        """
        # Basic issue information
        content = f"""Issue: {issue_key}
Title: {issue["fields"].get("summary", "")}
Type: {issue["fields"]["issuetype"]["name"]}
Status: {issue["fields"]["status"]["name"]}
Created: {created_date}
"""

        # Add Epic information if available
        if epic_info.get("epic_key"):
            content += f"Epic: {epic_info['epic_key']}"
            if epic_info.get("epic_name"):
                content += f" - {epic_info['epic_name']}"
            content += "\n"

        content += f"""
Description:
{description}
"""
        # Add comments if present
        if comments:
            content += "\nComments:\n" + "\n".join(
                [f"{c['created']} - {c['author']}: {c['body']}" for c in comments]
            )

        return content

    def create_issue_metadata(
        self,
        issue_key: str,
        issue: dict[str, Any],
        comments: list[dict[str, Any]],
        created_date: str,
        epic_info: dict[str, str | None],
    ) -> dict[str, Any]:
        """
        Create metadata for the issue document.

        Args:
            issue_key: The issue key
            issue: The issue data from Jira
            comments: List of comment dictionaries
            created_date: Formatted created date
            epic_info: Dictionary with epic_key and epic_name

        Returns:
            Metadata dictionary
        """
        # Extract fields
        fields = issue.get("fields", {})

        # Basic metadata
        metadata = {
            "key": issue_key,
            "summary": fields.get("summary", ""),
            "type": fields.get("issuetype", {}).get("name", ""),
            "status": fields.get("status", {}).get("name", ""),
            "created": created_date,
            "source": "jira",
        }

        # Add assignee if present
        if fields.get("assignee"):
            metadata["assignee"] = fields["assignee"].get(
                "displayName", fields["assignee"].get("name", "")
            )

        # Add reporter if present
        if fields.get("reporter"):
            metadata["reporter"] = fields["reporter"].get(
                "displayName", fields["reporter"].get("name", "")
            )

        # Add priority if present
        if fields.get("priority"):
            metadata["priority"] = fields["priority"].get("name", "")

        # Add Epic information to metadata if available
        if epic_info.get("epic_key"):
            metadata["epic_key"] = epic_info["epic_key"]
            if epic_info.get("epic_name"):
                metadata["epic_name"] = epic_info["epic_name"]

        # Add project information
        if fields.get("project"):
            metadata["project"] = fields["project"].get("key", "")
            metadata["project_name"] = fields["project"].get("name", "")

        # Add comment count
        metadata["comment_count"] = len(comments)

        return metadata

    def extract_epic_information(
        self, issue: dict[str, Any]
    ) -> dict[str, None] | dict[str, str]:
        """
        Extract epic information from issue data.

        Args:
            issue: Issue data dictionary

        Returns:
            Dictionary containing epic_key and epic_name (or None if not found)
        """
        epic_info = {"epic_key": None, "epic_name": None}

        # Check if the issue has fields
        if "fields" not in issue:
            return epic_info

        fields = issue["fields"]

        # Try to get the epic link from issue
        # (requires the correct field ID which varies across instances)
        # Use the field discovery mechanism if available
        try:
            field_ids = self.get_field_ids_to_epic()

            # Get the epic link field ID
            epic_link_field = field_ids.get("Epic Link")
            if (
                epic_link_field
                and epic_link_field in fields
                and fields[epic_link_field]
            ):
                epic_info["epic_key"] = fields[epic_link_field]

                # If the issue is linked to an epic, try to get the epic name
                if epic_info["epic_key"] and hasattr(self, "get_issue"):
                    try:
                        epic_issue = self.get_issue(epic_info["epic_key"])
                        epic_fields = epic_issue.get("fields", {})

                        # Get the epic name field ID
                        epic_name_field = field_ids.get("Epic Name")
                        if epic_name_field and epic_name_field in epic_fields:
                            epic_info["epic_name"] = epic_fields[epic_name_field]

                    except Exception as e:
                        logger.warning(f"Error getting epic details: {str(e)}")

        except Exception as e:
            logger.warning(f"Error extracting epic information: {str(e)}")

        return epic_info

    def sanitize_html(self, html_content: str) -> str:
        """
        Reduce HTML content to plain text.

        Tags are stripped, HTML entities are decoded, and runs of whitespace
        are collapsed to single spaces with the ends trimmed. If processing
        fails, a warning is logged and the input is returned unchanged.

        Args:
            html_content: HTML content to sanitize

        Returns:
            Plaintext content with HTML tags removed, or ``""`` for empty input
        """
        if not html_content:
            return ""

        try:
            # Strip tags first, then decode entities such as &amp;
            untagged = html.unescape(re.sub(r"<[^>]+>", "", html_content))
            # Collapse whitespace runs and trim the ends
            return re.sub(r"\s+", " ", untagged).strip()

        except Exception as e:
            logger.warning(f"Error sanitizing HTML: {str(e)}")
            return html_content

    def sanitize_transition_fields(self, fields: dict[str, Any]) -> dict[str, Any]:
        """
        Sanitize fields to ensure they're valid for the Jira API.

        This is used for transition data to properly format field values:

        * ``None`` values are dropped.
        * ``assignee`` / ``reporter`` values that are already dictionaries
          containing ``accountId`` are kept as is; string values are resolved
          through ``self._get_account_id`` and stored as
          ``{"accountId": ...}``. Values that cannot be resolved are omitted
          and a warning is logged.
        * Every other field passes through unchanged.

        Args:
            fields: Dictionary of fields to sanitize

        Returns:
            Dictionary of sanitized fields
        """
        sanitized_fields: dict[str, Any] = {}

        for key, value in fields.items():
            # Skip empty values
            if value is None:
                continue

            # All non-user fields pass through as is
            if key not in ("assignee", "reporter"):
                sanitized_fields[key] = value
                continue

            # Already in account-ID form: use it as is
            if isinstance(value, dict) and "accountId" in value:
                sanitized_fields[key] = value
                continue

            # Only string identifiers can be looked up
            if not isinstance(value, str):
                # Name the actual field so reporter problems are not
                # misreported as assignee problems.
                logger.warning(f"Invalid {key} value: {value}")
                continue

            try:
                account_id = self._get_account_id(value)
            except Exception as e:
                logger.warning(f"Error getting account ID for {value}: {str(e)}")
                continue

            if account_id:
                sanitized_fields[key] = {"accountId": account_id}

        return sanitized_fields

    def add_comment_to_transition_data(
        self, transition_data: dict[str, Any], comment: str | None
    ) -> dict[str, Any]:
        """
        Add a comment to transition data.

        Args:
            transition_data: Transition data dictionary
            comment: Comment text (in Markdown format) or None

        Returns:
            Updated transition data
        """
        if not comment:
            return transition_data

        # Convert markdown to Jira format
        jira_formatted_comment = self.markdown_to_jira(comment)

        # Add the comment to the transition data
        transition_data["update"] = {
            "comment": [{"add": {"body": jira_formatted_comment}}]
        }

        return transition_data

```

--------------------------------------------------------------------------------
/tests/integration/test_transport_lifecycle.py:
--------------------------------------------------------------------------------

```python
"""Integration tests for transport lifecycle behavior.

These tests ensure that:
1. No stdin monitoring is used (preventing issues #519 and #524)
2. Stdio transport doesn't conflict with MCP server's internal stdio handling
3. All transports use direct execution
4. Docker scenarios work correctly
"""

import asyncio
from io import StringIO
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from mcp_atlassian import main
from mcp_atlassian.utils.lifecycle import _shutdown_event


@pytest.mark.integration
class TestTransportLifecycleBehavior:
    """Test transport lifecycle behavior to prevent regression of issues #519 and #524."""

    def setup_method(self):
        """Clear the shared ``_shutdown_event`` before each test in this class.

        ``_shutdown_event`` is imported from the lifecycle module and is
        therefore shared across tests; clearing it here gives every test a
        known starting state.
        """
        _shutdown_event.clear()

    def test_all_transports_use_direct_execution(self):
        """Check that every transport hands a plain ``run_async`` call to asyncio.

        Regression guard against stdin monitoring being reintroduced; that
        wrapper caused issue #519 (stdio conflicts) and issue #524 (HTTP
        session termination). ``asyncio.run`` is patched, so the server is
        never actually started: the test only inspects what would be run.
        """
        for transport in ("stdio", "sse", "streamable-http"):
            cli_options = {
                "transport": transport,
                "port": None,
                "host": None,
                "path": None,
            }

            with (
                patch("asyncio.run") as mock_asyncio_run,
                patch.dict("os.environ", {"TRANSPORT": transport}, clear=False),
                patch("mcp_atlassian.servers.main.AtlassianMCP") as mock_server_class,
                patch("click.core.Context") as mock_click_ctx,
            ):
                # Server double whose run_async is awaitable
                fake_server = MagicMock()
                fake_server.run_async = AsyncMock()
                mock_server_class.return_value = fake_server

                # CLI context double carrying the parsed options
                fake_ctx = MagicMock()
                fake_ctx.obj = cli_options
                mock_click_ctx.return_value = fake_ctx

                # Invoke the entry point; a SystemExit from it is tolerated
                with patch("sys.argv", ["mcp-atlassian"]):
                    try:
                        main()
                    except SystemExit:
                        pass

                # Something must have been handed to asyncio.run
                assert mock_asyncio_run.called, (
                    f"asyncio.run not called for {transport}"
                )
                scheduled = mock_asyncio_run.call_args[0][0]

                # The scheduled object must not be the stdin-monitoring wrapper
                rendered = str(scheduled)
                assert "run_with_stdio_monitoring" not in rendered, (
                    f"{transport} should not use stdin monitoring"
                )
                assert "run_async" in rendered or hasattr(
                    scheduled, "cr_code"
                ), f"{transport} should use direct run_async execution"

    @pytest.mark.anyio
    async def test_stdio_no_race_condition(self):
        """Check that a directly-run server is the only reader of stdin.

        With stdin monitoring removed there is no second component competing
        for stdin, so a stand-in server coroutine that reads once must result
        in exactly one recorded read.
        """
        # One entry is appended per readline() call on the fake stdin
        recorded_reads = []

        class TrackingStdin:
            def __init__(self):
                self.closed = False
                self._read_lock = asyncio.Lock()

            async def readline(self):
                async with self._read_lock:
                    if self.closed:
                        raise ValueError("I/O operation on closed file")

                    recorded_reads.append(1)
                    return b""  # EOF

        fake_stdin = TrackingStdin()

        async def mock_server_with_stdio(**kwargs):
            """Stand-in for the MCP server, which reads stdin itself."""
            await fake_stdin.readline()
            return "completed"

        # Run the server coroutine directly, with no monitoring wrapper
        with patch("sys.stdin", fake_stdin):
            outcome = await mock_server_with_stdio()

        # Exactly one read: the one issued by the server stand-in
        assert len(recorded_reads) == 1
        assert outcome == "completed"

    def test_main_function_transport_logic(self):
        """Run ``main`` under several CLI/environment transport combinations.

        For every combination the test checks that ``asyncio.run`` was called
        with a coroutine-like object. The third column of each scenario
        documents the transport that is expected to win, but it is not
        asserted here.
        """
        scenarios = [
            # (cli_transport, env_transport, expected_final_transport)
            ("stdio", None, "stdio"),
            ("sse", None, "sse"),
            (None, "stdio", "stdio"),
            (None, "sse", "sse"),
            ("stdio", "sse", "stdio"),  # CLI overrides env
        ]

        for cli_transport, env_transport, _expected_transport in scenarios:
            env_vars = {"TRANSPORT": env_transport} if env_transport else {}
            cli_options = {
                "transport": cli_transport,
                "port": None,
                "host": None,
                "path": None,
            }

            with (
                patch("asyncio.run") as mock_asyncio_run,
                patch.dict("os.environ", env_vars, clear=False),
                patch("mcp_atlassian.servers.main.AtlassianMCP") as mock_server_class,
                patch("click.core.Context") as mock_click_ctx,
            ):
                # Server double whose run_async is awaitable
                fake_server = MagicMock()
                fake_server.run_async = AsyncMock()
                mock_server_class.return_value = fake_server

                # CLI context double carrying the parsed options
                fake_ctx = MagicMock()
                fake_ctx.obj = cli_options
                mock_click_ctx.return_value = fake_ctx

                # Invoke the entry point; a SystemExit from it is tolerated
                with patch("sys.argv", ["mcp-atlassian"]):
                    try:
                        main()
                    except SystemExit:
                        pass

                assert mock_asyncio_run.called

                # Whatever was scheduled should be the direct run_async call
                scheduled = mock_asyncio_run.call_args[0][0]
                assert hasattr(scheduled, "cr_code") or "run_async" in str(
                    scheduled
                )

    @pytest.mark.anyio
    async def test_shutdown_event_handling(self):
        """Test that a server coroutine still completes when shutdown is pre-set.

        The shared ``_shutdown_event`` is set before the stand-in server runs
        and is always cleared again afterwards, so the flag cannot leak into
        tests that do not reset it themselves.
        """
        # Pre-set shutdown event
        _shutdown_event.set()

        try:

            async def mock_server(**kwargs):
                # Should run even with shutdown event set
                return "completed"

            # Server runs directly now
            result = await mock_server()

            # Server should complete normally
            assert result == "completed"
        finally:
            # Restore the shared flag regardless of the test outcome
            _shutdown_event.clear()

    def test_docker_stdio_scenario(self):
        """Reproduce the Docker stdio setup that originally triggered the bug.

        Mimics a container started with ``-i``: stdin is available and the
        transport is stdio. The test checks that ``main`` still hands a
        direct ``run_async`` call to ``asyncio.run``.
        """
        # Environment as it would be configured for the container
        docker_env = {
            "TRANSPORT": "stdio",
            "JIRA_URL": "https://example.atlassian.net",
            "JIRA_USERNAME": "[email protected]",
            "JIRA_API_TOKEN": "token",
        }

        with (
            patch("asyncio.run") as mock_asyncio_run,
            patch.dict("os.environ", docker_env, clear=False),
            patch("mcp_atlassian.servers.main.AtlassianMCP") as mock_server_class,
            patch("sys.stdin", StringIO()),  # Simulate available stdin
        ):
            # Server double whose run_async is awaitable
            fake_server = MagicMock()
            fake_server.run_async = AsyncMock()
            mock_server_class.return_value = fake_server

            # Container startup; a SystemExit from main is tolerated
            with patch("sys.argv", ["mcp-atlassian"]):
                try:
                    main()
                except SystemExit:
                    pass

            assert mock_asyncio_run.called
            scheduled = mock_asyncio_run.call_args[0][0]

            # Whatever was scheduled should be the direct run_async call
            assert hasattr(scheduled, "cr_code") or "run_async" in str(scheduled)


@pytest.mark.integration
class TestRegressionPrevention:
    """Guards against specific, previously fixed problems coming back."""

    def test_no_stdin_monitoring_in_codebase(self):
        """Fail if the lifecycle module exposes stdin monitoring again.

        The removed ``run_with_stdio_monitoring`` helper was behind
        issues #519 and #524, so its mere presence is treated as a regression.
        """
        from mcp_atlassian.utils import lifecycle

        monitoring_present = hasattr(lifecycle, "run_with_stdio_monitoring")
        assert not monitoring_present, (
            "run_with_stdio_monitoring should not exist in lifecycle module"
        )

    def test_signal_handlers_are_setup(self):
        """Check that ``main`` installs signal handlers exactly once."""
        with (
            patch("mcp_atlassian.setup_signal_handlers") as mock_setup,
            patch("asyncio.run"),
            patch("mcp_atlassian.servers.main.AtlassianMCP"),
        ):
            # Invoke the entry point; a SystemExit from it is tolerated
            with patch("sys.argv", ["mcp-atlassian"]):
                try:
                    main()
                except SystemExit:
                    pass

            # Signal handlers should always be set up
            mock_setup.assert_called_once()

```

--------------------------------------------------------------------------------
/tests/unit/jira/test_sprints.py:
--------------------------------------------------------------------------------

```python
"""Tests for the Jira SprintMixin"""

from unittest.mock import MagicMock

import pytest
import requests

from mcp_atlassian.jira import JiraConfig
from mcp_atlassian.jira.sprints import SprintsMixin
from mcp_atlassian.models.jira import JiraSprint


@pytest.fixture
def mock_config():
    """Provide a ``JiraConfig``-shaped mock populated with test credentials."""
    jira_config = MagicMock(spec=JiraConfig)
    jira_config.configure_mock(
        url="https://test.atlassian.net",
        username="[email protected]",
        api_token="test-token",
        auth_type="pat",
    )
    return jira_config


@pytest.fixture
def sprints_mixin(mock_config):
    """Provide a ``SprintsMixin`` whose ``jira`` attribute is a ``MagicMock``."""
    instance = SprintsMixin(config=mock_config)
    # Swap in a mock so tests can script the underlying API responses
    instance.jira = MagicMock()
    return instance


@pytest.fixture
def mock_sprints():
    """Fixture to return mock sprints data.

    The payload is a single result page (``isLast`` is True) whose
    ``values`` list holds three sprints of board 1000: one closed, one
    active and one future. The future sprint carries no date or goal
    entries.
    """
    return {
        "maxResults": 50,
        "startAt": 0,
        "isLast": True,
        "values": [
            {
                "id": 10000,
                "self": "https://test.atlassian.net/rest/agile/1.0/sprint/10000",
                "state": "closed",
                "name": "Sprint 0",
                "startDate": "2099-05-06T02:00:00.000Z",
                "endDate": "2100-05-17T10:00:00.000Z",
                "completeDate": "2024-05-20T05:17:24.302Z",
                "activatedDate": "2024-05-07T01:22:45.128Z",
                "originBoardId": 1000,
                "goal": "",
                "synced": False,
                "autoStartStop": False,
            },
            {
                "id": 10001,
                "self": "https://test.atlassian.net/rest/agile/1.0/sprint/10001",
                "state": "active",
                "name": "Sprint 1",
                "startDate": "2099-03-24T06:13:00.000Z",
                "endDate": "2100-04-07T06:13:00.000Z",
                "activatedDate": "2025-03-24T06:13:20.729Z",
                "originBoardId": 1000,
                "goal": "",
                "synced": False,
                "autoStartStop": False,
            },
            {
                "id": 10002,
                "self": "https://test.atlassian.net/rest/agile/1.0/sprint/10002",
                "state": "future",
                "name": "Sprint 2",
                "originBoardId": 1000,
                "synced": False,
                "autoStartStop": False,
            },
        ],
    }


def test_get_all_sprints_from_board(sprints_mixin, mock_sprints):
    """The mixin returns the ``values`` list of the client's sprint payload."""
    client_call = sprints_mixin.jira.get_all_sprints_from_board
    client_call.return_value = mock_sprints

    sprints = sprints_mixin.get_all_sprints_from_board("1000")

    assert sprints == mock_sprints["values"]


def test_get_all_sprints_from_board_exception(sprints_mixin):
    """A generic client exception yields an empty list after a single client call."""
    client_call = sprints_mixin.jira.get_all_sprints_from_board
    client_call.side_effect = Exception("API Error")

    sprints = sprints_mixin.get_all_sprints_from_board("1000")

    assert sprints == []
    client_call.assert_called_once()


def test_get_all_sprints_from_board_http_error(sprints_mixin):
    """An ``HTTPError`` from the client yields an empty list after a single client call."""
    failing_response = MagicMock(content="API Error content")
    client_call = sprints_mixin.jira.get_all_sprints_from_board
    client_call.side_effect = requests.HTTPError(response=failing_response)

    sprints = sprints_mixin.get_all_sprints_from_board("1000")

    assert sprints == []
    client_call.assert_called_once()


def test_get_all_sprints_from_board_non_dict_response(sprints_mixin):
    """A non-dict client response yields an empty list after a single client call."""
    client_call = sprints_mixin.jira.get_all_sprints_from_board
    client_call.return_value = "not a dict"

    sprints = sprints_mixin.get_all_sprints_from_board("1000")

    assert sprints == []
    client_call.assert_called_once()


def test_get_all_sprints_from_board_model(sprints_mixin, mock_sprints):
    """The model variant returns one ``JiraSprint`` per entry of the payload's ``values``."""
    sprints_mixin.jira.get_all_sprints_from_board.return_value = mock_sprints

    models = sprints_mixin.get_all_sprints_from_board_model(board_id="1000", state=None)

    expected_models = [
        JiraSprint.from_api_response(raw_sprint) for raw_sprint in mock_sprints["values"]
    ]
    assert models == expected_models


def test_create_sprint(sprints_mixin, mock_sprints):
    """A created sprint comes back as a ``JiraSprint`` built from the client's payload."""
    api_payload = mock_sprints["values"][1]
    sprints_mixin.jira.create_sprint.return_value = api_payload

    created = sprints_mixin.create_sprint(
        sprint_name="Sprint 1",
        board_id="10001",
        start_date="2099-05-01T00:00:00.000Z",
        end_date="2100-05-01T00:00:00.000Z",
        goal="Your goal",
    )

    assert created == JiraSprint.from_api_response(api_payload)


def test_create_sprint_http_exception(sprints_mixin, mock_sprints):
    """An ``HTTPError`` raised by the client propagates out of ``create_sprint``."""
    failing_response = MagicMock(content="API Error content")
    sprints_mixin.jira.create_sprint.side_effect = requests.HTTPError(
        response=failing_response
    )
    sprint_args = {
        "sprint_name": "Sprint 1",
        "board_id": "10001",
        "start_date": "2099-05-01T00:00:00.000Z",
        "end_date": "2100-05-15T00:00:00.000Z",
        "goal": "Your goal",
    }

    with pytest.raises(requests.HTTPError):
        sprints_mixin.create_sprint(**sprint_args)


def test_create_sprint_exception(sprints_mixin, mock_sprints):
    """A generic exception raised by the client propagates out of ``create_sprint``."""
    sprints_mixin.jira.create_sprint.side_effect = Exception
    sprint_args = {
        "sprint_name": "Sprint 1",
        "board_id": "10001",
        "start_date": "2099-05-01T00:00:00.000Z",
        "end_date": "2100-05-15T00:00:00.000Z",
        "goal": "Your goal",
    }

    with pytest.raises(Exception):
        sprints_mixin.create_sprint(**sprint_args)


def test_create_sprint_test_missing_startdate(sprints_mixin, mock_sprints):
    """An empty start date is rejected with ``ValueError("Start date is required.")``."""
    sprints_mixin.jira.create_sprint.return_value = mock_sprints["values"][1]
    sprint_args = {
        "sprint_name": "Sprint 1",
        "board_id": "10001",
        "start_date": "",
        "end_date": "2100-05-15T00:00:00.000Z",
        "goal": "Your goal",
    }

    with pytest.raises(ValueError) as raised:
        sprints_mixin.create_sprint(**sprint_args)

    assert str(raised.value) == "Start date is required."


def test_create_sprint_test_invalid_startdate(sprints_mixin, mock_sprints):
    """A start date that is not a date string is rejected with ``ValueError``."""
    sprints_mixin.jira.create_sprint.return_value = mock_sprints["values"][1]
    sprint_args = {
        "sprint_name": "Sprint 1",
        "board_id": "10001",
        "start_date": "IAMNOTADATE!",
        "end_date": "2100-05-15T00:00:00.000Z",
        "goal": "Your goal",
    }

    with pytest.raises(ValueError):
        sprints_mixin.create_sprint(**sprint_args)


def test_create_sprint_test_no_enddate(sprints_mixin, mock_sprints):
    """Passing ``end_date=None`` is accepted and the created sprint is returned."""
    api_payload = mock_sprints["values"][1]
    sprints_mixin.jira.create_sprint.return_value = api_payload

    created = sprints_mixin.create_sprint(
        sprint_name="Sprint 1",
        board_id="10001",
        start_date="2099-05-15T00:00:00.000Z",
        end_date=None,
        goal="Your goal",
    )

    assert created == JiraSprint.from_api_response(api_payload)


def test_create_sprint_test_invalid_enddate(sprints_mixin, mock_sprints):
    """An end date that is not a date string is rejected with ``ValueError``."""
    sprints_mixin.jira.create_sprint.return_value = mock_sprints["values"][1]
    sprint_args = {
        "sprint_name": "Sprint 1",
        "board_id": "10001",
        "start_date": "2099-05-15T00:00:00.000Z",
        "end_date": "IAMNOTADATE!",
        "goal": "Your goal",
    }

    with pytest.raises(ValueError):
        sprints_mixin.create_sprint(**sprint_args)


def test_create_sprint_test_startdate_after_enddate(sprints_mixin, mock_sprints):
    """A start date later than the end date is rejected with a descriptive ``ValueError``."""
    sprints_mixin.jira.create_sprint.return_value = mock_sprints["values"][1]
    sprint_args = {
        "sprint_name": "Sprint 1",
        "board_id": "10001",
        "start_date": "2100-05-15T00:00:00.000Z",
        "end_date": "2099-05-15T00:00:00.000Z",
        "goal": "Your goal",
    }

    with pytest.raises(ValueError, match="Start date must be before end date."):
        sprints_mixin.create_sprint(**sprint_args)


def test_update_sprint_success(sprints_mixin, mock_sprints):
    """A valid update is forwarded to the client and the result is returned as a model.

    Also pins the exact keyword arguments passed to ``update_partially_sprint``.
    """
    api_payload = mock_sprints["values"][0]
    client_call = sprints_mixin.jira.update_partially_sprint
    client_call.return_value = api_payload

    updated = sprints_mixin.update_sprint(
        sprint_id="10000",
        sprint_name="Updated Sprint Name",
        state="active",
        start_date="2024-05-01T00:00:00.000Z",
        end_date="2024-05-15T00:00:00.000Z",
        goal="Updated goal",
    )

    assert updated == JiraSprint.from_api_response(api_payload)

    expected_data = {
        "name": "Updated Sprint Name",
        "state": "active",
        "startDate": "2024-05-01T00:00:00.000Z",
        "endDate": "2024-05-15T00:00:00.000Z",
        "goal": "Updated goal",
    }
    client_call.assert_called_once_with(sprint_id="10000", data=expected_data)


def test_update_sprint_invalid_state(sprints_mixin):
    """An unrecognised state makes ``update_sprint`` return ``None`` without calling the client."""
    update_args = {
        "sprint_id": "10000",
        "sprint_name": "Updated Sprint Name",
        "state": "invalid_state",
        "start_date": None,
        "end_date": None,
        "goal": None,
    }

    outcome = sprints_mixin.update_sprint(**update_args)

    assert outcome is None
    sprints_mixin.jira.update_partially_sprint.assert_not_called()


def test_update_sprint_missing_sprint_id(sprints_mixin):
    """A ``None`` sprint id makes ``update_sprint`` return ``None`` without calling the client."""
    update_args = {
        "sprint_id": None,
        "sprint_name": "Updated Sprint Name",
        "state": "active",
        "start_date": None,
        "end_date": None,
        "goal": None,
    }

    outcome = sprints_mixin.update_sprint(**update_args)

    assert outcome is None
    sprints_mixin.jira.update_partially_sprint.assert_not_called()


def test_update_sprint_http_error(sprints_mixin):
    """An ``HTTPError`` from the client is swallowed: ``None`` is returned after one call."""
    failing_response = MagicMock(content="API Error content")
    client_call = sprints_mixin.jira.update_partially_sprint
    client_call.side_effect = requests.HTTPError(response=failing_response)

    outcome = sprints_mixin.update_sprint(
        sprint_id="10000",
        sprint_name="Updated Sprint Name",
        state="active",
        start_date=None,
        end_date=None,
        goal=None,
    )

    assert outcome is None
    client_call.assert_called_once()


def test_update_sprint_exception(sprints_mixin):
    """A generic client exception is swallowed: ``None`` is returned after one call."""
    client_call = sprints_mixin.jira.update_partially_sprint
    client_call.side_effect = Exception("Unexpected Error")

    outcome = sprints_mixin.update_sprint(
        sprint_id="10000",
        sprint_name="Updated Sprint Name",
        state="active",
        start_date=None,
        end_date=None,
        goal=None,
    )

    assert outcome is None
    client_call.assert_called_once()

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/links.py:
--------------------------------------------------------------------------------

```python
"""Module for Jira issue link operations."""

import logging
from typing import Any

from requests.exceptions import HTTPError

from ..exceptions import MCPAtlassianAuthenticationError
from ..models.jira import JiraIssueLinkType
from .client import JiraClient

logger = logging.getLogger("mcp-jira")


class LinksMixin(JiraClient):
    """Mixin for Jira issue link operations.

    Every public method translates failures the same way (see
    ``_wrap_link_error``): an HTTP 401/403 becomes
    ``MCPAtlassianAuthenticationError``; anything else raised while talking
    to Jira becomes a generic ``Exception`` whose message names the operation
    that failed. Input validation errors (``ValueError``) are raised before
    any request is made and are not wrapped.
    """

    @staticmethod
    def _wrap_link_error(action: str, err: Exception) -> Exception:
        """Log a failed link operation and build the exception to raise for it.

        Args:
            action: Phrase naming the failed operation; it completes the
                message ``"Error <action>: ..."`` (e.g. ``"creating issue link"``).
            err: The exception caught while performing the operation.

        Returns:
            ``MCPAtlassianAuthenticationError`` when ``err`` is an ``HTTPError``
            whose response status is 401 or 403; otherwise a plain
            ``Exception`` with the message ``"Error <action>: <err>"``.
            The caller is expected to ``raise ... from err``.
        """
        if isinstance(err, HTTPError):
            http_response = err.response
            if http_response is not None and http_response.status_code in (401, 403):
                auth_msg = (
                    f"Authentication failed for Jira API "
                    f"({http_response.status_code}). "
                    "Token may be expired or invalid. Please verify credentials."
                )
                logger.error(auth_msg)
                return MCPAtlassianAuthenticationError(auth_msg)
            logger.error(f"HTTP error during API call: {err}", exc_info=True)
        else:
            logger.error(f"Error {action}: {err}", exc_info=True)
        return Exception(f"Error {action}: {err}")

    def get_issue_link_types(self) -> list[JiraIssueLinkType]:
        """
        Get all available issue link types.

        Returns:
            List of JiraIssueLinkType objects

        Raises:
            MCPAtlassianAuthenticationError: If authentication fails with the Jira API
                (401/403)
            Exception: If there is an error retrieving issue link types (this
                includes a non-dict response from the client)
        """
        try:
            link_types_response = self.jira.get("rest/api/2/issueLinkType")
            if not isinstance(link_types_response, dict):
                msg = f"Unexpected return value type from `jira.get`: {type(link_types_response)}"
                logger.error(msg)
                raise TypeError(msg)

            return [
                JiraIssueLinkType.from_api_response(link_type)
                for link_type in link_types_response.get("issueLinkTypes", [])
            ]
        except Exception as err:
            raise self._wrap_link_error("getting issue link types", err) from err

    def create_issue_link(self, data: dict[str, Any]) -> dict[str, Any]:
        """
        Create a link between two issues.

        Args:
            data: A dictionary containing the link data with the following structure:
                {
                    "type": {"name": "Duplicate" },  # Link type name (e.g., "Duplicate", "Blocks", "Relates to")
                    "inwardIssue": { "key": "ISSUE-1"},  # The issue that is the source of the link
                    "outwardIssue": {"key": "ISSUE-2"},  # The issue that is the target of the link
                    "comment": {  # Optional comment to add to the link
                        "body": "Linked related issue!",
                        "visibility": {  # Optional visibility settings
                            "type": "group",
                            "value": "jira-software-users"
                        }
                    }
                }

        Returns:
            Dictionary with the created link information. ``link_type`` holds
            the type's ``name``; if the type object has no name, its ``id`` is
            reported instead.

        Raises:
            ValueError: If required fields are missing
            MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
            Exception: If there is an error creating the issue link
        """
        # Validate required fields
        link_type = data.get("type")
        if not link_type:
            raise ValueError("Link type is required")
        if not data.get("inwardIssue") or not data["inwardIssue"].get("key"):
            raise ValueError("Inward issue key is required")
        if not data.get("outwardIssue") or not data["outwardIssue"].get("key"):
            raise ValueError("Outward issue key is required")

        # Resolve everything the response needs BEFORE calling Jira. Reading
        # data["type"]["name"] after the call used to raise for a type without
        # a name, reporting failure although the link had already been created.
        inward_key = data["inwardIssue"]["key"]
        outward_key = data["outwardIssue"]["key"]
        if isinstance(link_type, dict):
            link_type_label = link_type.get("name") or link_type.get("id")
        else:
            link_type_label = link_type

        try:
            # Create the issue link
            self.jira.create_issue_link(data)
        except Exception as err:
            raise self._wrap_link_error("creating issue link", err) from err

        # Return a response with the link information
        return {
            "success": True,
            "message": f"Link created between {inward_key} and {outward_key}",
            "link_type": link_type_label,
            "inward_issue": inward_key,
            "outward_issue": outward_key,
        }

    def create_remote_issue_link(
        self, issue_key: str, link_data: dict[str, Any]
    ) -> dict[str, Any]:
        """
        Create a remote issue link (web link or Confluence link) for an issue.

        Args:
            issue_key: The key of the issue to add the link to (e.g., 'PROJ-123')
            link_data: A dictionary containing the remote link data with the following structure:
                {
                    "object": {
                        "url": "https://example.com/page",  # The URL to link to
                        "title": "Example Page",  # The title/name of the link
                        "summary": "Optional description of the link",  # Optional description
                        "icon": {  # Optional icon configuration
                            "url16x16": "https://example.com/icon16.png",
                            "title": "Icon Title"
                        }
                    },
                    "relationship": "causes"  # Optional relationship description
                }

        Returns:
            Dictionary with the created remote link information

        Raises:
            ValueError: If required fields are missing
            MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
            Exception: If there is an error creating the remote issue link
        """
        # Validate required fields
        if not issue_key:
            raise ValueError("Issue key is required")
        link_object = link_data.get("object")
        if not link_object:
            raise ValueError("Link object is required")
        if not link_object.get("url"):
            raise ValueError("URL is required in link object")
        if not link_object.get("title"):
            raise ValueError("Title is required in link object")

        try:
            # Create the remote issue link using the Jira API; the response
            # body is not needed to build the result below.
            endpoint = f"rest/api/3/issue/{issue_key}/remotelink"
            self.jira.post(endpoint, json=link_data)
        except Exception as err:
            raise self._wrap_link_error("creating remote issue link", err) from err

        # Return a response with the link information
        return {
            "success": True,
            "message": f"Remote link created for issue {issue_key}",
            "issue_key": issue_key,
            "link_title": link_object["title"],
            "link_url": link_object["url"],
            "relationship": link_data.get("relationship", ""),
        }

    def remove_issue_link(self, link_id: str) -> dict[str, Any]:
        """
        Remove a link between two issues.

        Args:
            link_id: The ID of the link to remove

        Returns:
            Dictionary with the result of the operation

        Raises:
            ValueError: If link_id is empty
            MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
            Exception: If there is an error removing the issue link
        """
        # Validate input
        if not link_id:
            raise ValueError("Link ID is required")

        try:
            # Remove the issue link
            self.jira.remove_issue_link(link_id)
        except Exception as err:
            raise self._wrap_link_error("removing issue link", err) from err

        # Return a response indicating success
        return {
            "success": True,
            "message": f"Link with ID {link_id} has been removed",
            "link_id": link_id,
        }

```

--------------------------------------------------------------------------------
/tests/unit/servers/test_context.py:
--------------------------------------------------------------------------------

```python
"""Tests for the server context module."""

import pytest

from mcp_atlassian.confluence.config import ConfluenceConfig
from mcp_atlassian.jira.config import JiraConfig
from mcp_atlassian.servers.context import MainAppContext


class TestMainAppContext:
    """Unit tests for construction, immutability and dunder behaviour of ``MainAppContext``."""

    def test_initialization_with_defaults(self):
        """With no arguments both configs and ``enabled_tools`` are ``None`` and ``read_only`` is ``False``."""
        ctx = MainAppContext()

        assert ctx.full_jira_config is None
        assert ctx.full_confluence_config is None
        assert ctx.read_only is False
        assert ctx.enabled_tools is None

    def test_initialization_with_all_parameters(self):
        """Every constructor argument is stored on the context unchanged."""
        jira_cfg = JiraConfig(
            url="https://example.atlassian.net",
            auth_type="basic",
            username="[email protected]",
            api_token="test_token",
        )
        confluence_cfg = ConfluenceConfig(
            url="https://example.atlassian.net/wiki",
            auth_type="basic",
            username="[email protected]",
            api_token="test_token",
        )
        tool_names = ["jira_get_issue", "confluence_get_page"]

        ctx = MainAppContext(
            full_jira_config=jira_cfg,
            full_confluence_config=confluence_cfg,
            read_only=True,
            enabled_tools=tool_names,
        )

        # Configs are kept by identity, not copied.
        assert ctx.full_jira_config is jira_cfg
        assert ctx.full_confluence_config is confluence_cfg
        assert ctx.read_only is True
        assert ctx.enabled_tools == tool_names

    def test_initialization_with_partial_parameters(self):
        """Arguments that are omitted keep their defaults."""
        jira_cfg = JiraConfig(
            url="https://example.atlassian.net",
            auth_type="pat",
            personal_token="test_personal_token",
        )

        ctx = MainAppContext(full_jira_config=jira_cfg, read_only=True)

        assert ctx.full_jira_config is jira_cfg
        assert ctx.full_confluence_config is None
        assert ctx.read_only is True
        assert ctx.enabled_tools is None

    def test_frozen_dataclass_behavior(self):
        """Assigning to any field of an existing context raises ``AttributeError``."""
        ctx = MainAppContext(read_only=False)

        # Rebinding a simple field must be rejected.
        with pytest.raises(AttributeError):
            ctx.read_only = True

        # Rebinding a config field must be rejected as well.
        with pytest.raises(AttributeError):
            ctx.full_jira_config = JiraConfig(
                url="https://test.com",
                auth_type="basic",
                username="test",
                api_token="token",
            )

    def test_type_hint_compliance_jira_config(self):
        """``full_jira_config`` accepts both ``None`` and a ``JiraConfig`` instance."""
        empty_ctx = MainAppContext(full_jira_config=None)
        assert empty_ctx.full_jira_config is None

        jira_cfg = JiraConfig(
            url="https://jira.example.com", auth_type="pat", personal_token="test_token"
        )
        configured_ctx = MainAppContext(full_jira_config=jira_cfg)
        assert isinstance(configured_ctx.full_jira_config, JiraConfig)
        assert configured_ctx.full_jira_config.url == "https://jira.example.com"

    def test_type_hint_compliance_confluence_config(self):
        """``full_confluence_config`` accepts both ``None`` and a ``ConfluenceConfig`` instance."""
        empty_ctx = MainAppContext(full_confluence_config=None)
        assert empty_ctx.full_confluence_config is None

        confluence_cfg = ConfluenceConfig(
            url="https://confluence.example.com",
            auth_type="pat",
            username="[email protected]",
            api_token="test_token",
        )
        configured_ctx = MainAppContext(full_confluence_config=confluence_cfg)
        assert isinstance(configured_ctx.full_confluence_config, ConfluenceConfig)
        assert (
            configured_ctx.full_confluence_config.url
            == "https://confluence.example.com"
        )

    def test_enabled_tools_field_validation(self):
        """``enabled_tools`` defaults to ``None`` and stores empty and populated lists as given."""
        # Default: nothing supplied.
        assert MainAppContext().enabled_tools is None

        # An empty list is preserved rather than normalised to None.
        assert MainAppContext(enabled_tools=[]).enabled_tools == []

        # A populated list is stored as-is.
        tool_names = [
            "jira_create_issue",
            "confluence_create_page",
            "jira_search_issues",
        ]
        ctx = MainAppContext(enabled_tools=tool_names)
        assert ctx.enabled_tools == tool_names
        assert len(ctx.enabled_tools) == 3

    def test_read_only_field_validation(self):
        """``read_only`` defaults to ``False`` and stores explicit booleans as given."""
        # Default value.
        default_ctx = MainAppContext()
        assert default_ctx.read_only is False
        assert isinstance(default_ctx.read_only, bool)

        # Explicit True, then explicit False.
        for flag in (True, False):
            ctx = MainAppContext(read_only=flag)
            assert ctx.read_only is flag
            assert isinstance(ctx.read_only, bool)

    def test_string_representation(self):
        """``str()`` of a context names the class and shows its field values."""
        # Default-constructed context: every field appears with its default.
        default_text = str(MainAppContext())

        for fragment in (
            "MainAppContext",
            "full_jira_config=None",
            "full_confluence_config=None",
            "read_only=False",
            "enabled_tools=None",
        ):
            assert fragment in default_text

        # Context with explicit values.
        jira_cfg = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="basic",
            username="test",
            api_token="token",
        )
        populated_text = str(
            MainAppContext(
                full_jira_config=jira_cfg,
                read_only=True,
                enabled_tools=["tool1", "tool2"],
            )
        )

        assert "MainAppContext" in populated_text
        assert "read_only=True" in populated_text
        assert "enabled_tools=['tool1', 'tool2']" in populated_text

    def test_equality_comparison(self):
        """Contexts compare equal exactly when all of their fields compare equal."""
        # Two default contexts are equal.
        assert MainAppContext() == MainAppContext()

        # Same field values (sharing one config object) are equal.
        jira_cfg = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="basic",
            username="test",
            api_token="token",
        )
        shared_kwargs = {
            "full_jira_config": jira_cfg,
            "read_only": True,
            "enabled_tools": ["tool1"],
        }
        assert MainAppContext(**shared_kwargs) == MainAppContext(**shared_kwargs)

        # A differing scalar field breaks equality.
        assert MainAppContext(read_only=False) != MainAppContext(read_only=True)

        # A differing config breaks equality.
        other_jira_cfg = JiraConfig(
            url="https://different.atlassian.net",
            auth_type="basic",
            username="different",
            api_token="different_token",
        )
        assert MainAppContext(full_jira_config=jira_cfg) != MainAppContext(
            full_jira_config=other_jira_cfg
        )

    def test_hash_behavior(self):
        """Contexts hash only when every field value is itself hashable."""
        # Equal contexts holding only hashable values share a hash.
        assert hash(MainAppContext(read_only=True)) == hash(
            MainAppContext(read_only=True)
        )

        # Contexts with different values can be used as distinct dict keys.
        writable_ctx = MainAppContext(read_only=False)
        readonly_ctx = MainAppContext(read_only=True)
        by_context = {writable_ctx: "value3", readonly_ctx: "value4"}
        assert len(by_context) == 2

        # A config value makes the context unhashable.
        jira_cfg = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="basic",
            username="test",
            api_token="token",
        )
        ctx_with_config = MainAppContext(full_jira_config=jira_cfg)
        with pytest.raises(TypeError, match="unhashable type"):
            hash(ctx_with_config)

        # A list value makes the context unhashable too.
        ctx_with_list = MainAppContext(enabled_tools=["tool1", "tool2"])
        with pytest.raises(TypeError, match="unhashable type"):
            hash(ctx_with_list)

    def test_field_access_edge_cases(self):
        """A default context exposes its four fields and nothing by an unknown name."""
        ctx = MainAppContext()

        for field_name in (
            "full_jira_config",
            "full_confluence_config",
            "read_only",
            "enabled_tools",
        ):
            assert hasattr(ctx, field_name)

        # Unknown attribute names are not silently provided.
        assert not hasattr(ctx, "non_existent_field")

    def test_with_both_configs_different_auth_types(self):
        """Jira and Confluence configs with different auth types coexist in one context."""
        jira_cfg = JiraConfig(
            url="https://company.atlassian.net",
            auth_type="basic",
            username="[email protected]",
            api_token="jira_token",
        )
        confluence_cfg = ConfluenceConfig(
            url="https://company.atlassian.net/wiki",
            auth_type="oauth",
            oauth_config=None,  # Simplified for test
        )
        tool_names = [
            "jira_get_issue",
            "confluence_get_page",
            "jira_create_issue",
        ]

        ctx = MainAppContext(
            full_jira_config=jira_cfg,
            full_confluence_config=confluence_cfg,
            read_only=True,
            enabled_tools=tool_names,
        )

        assert ctx.full_jira_config.auth_type == "basic"
        assert ctx.full_confluence_config.auth_type == "oauth"
        assert ctx.read_only is True
        assert len(ctx.enabled_tools) == 3
        for expected_tool in (
            "jira_get_issue",
            "confluence_get_page",
            "jira_create_issue",
        ):
            assert expected_tool in ctx.enabled_tools

```

--------------------------------------------------------------------------------
/tests/unit/jira/test_issues_markdown.py:
--------------------------------------------------------------------------------

```python
"""Tests for markdown conversion in Jira issue operations."""

from unittest.mock import MagicMock, Mock

import pytest

from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.issues import IssuesMixin


class TestIssuesMarkdownConversion:
    """Tests for markdown to Jira conversion in issue operations."""

    @pytest.fixture
    def issues_mixin(self, jira_fetcher: JiraFetcher) -> IssuesMixin:
        """Return the ``jira_fetcher`` fixture with two of its helpers stubbed.

        ``_markdown_to_jira`` is replaced by a mock that prefixes its argument
        with ``"[CONVERTED] "`` so tests can recognise converted text, and
        ``_get_account_id`` always returns ``"test-account-id"``.
        """

        def fake_convert(text):
            return f"[CONVERTED] {text}"

        fetcher = jira_fetcher
        fetcher._markdown_to_jira = Mock(side_effect=fake_convert)
        fetcher._get_account_id = MagicMock(return_value="test-account-id")
        return fetcher

    def test_create_issue_converts_markdown_description(
        self, issues_mixin: IssuesMixin
    ):
        """create_issue must pass the description through the markdown converter."""
        source_markdown = "# Markdown Description\n\nThis is **bold** text."

        # Stub the two Jira API calls this test relies on
        jira_api = issues_mixin.jira
        jira_api.create_issue.return_value = {"key": "TEST-123"}
        jira_api.get_issue.return_value = {
            "id": "12345",
            "key": "TEST-123",
            "fields": {
                "summary": "Test Issue",
                "description": "[CONVERTED] # Markdown Description",
                "status": {"name": "Open"},
                "issuetype": {"name": "Bug"},
            },
        }

        created = issues_mixin.create_issue(
            project_key="TEST",
            summary="Test Issue",
            issue_type="Bug",
            description=source_markdown,
        )

        # The converter receives the raw markdown exactly once
        issues_mixin._markdown_to_jira.assert_called_once_with(source_markdown)

        # The API receives the converted text next to the standard fields
        jira_api.create_issue.assert_called_once_with(
            fields={
                "project": {"key": "TEST"},
                "summary": "Test Issue",
                "issuetype": {"name": "Bug"},
                "description": f"[CONVERTED] {source_markdown}",
            }
        )

        assert created.key == "TEST-123"

    def test_create_issue_with_empty_description(self, issues_mixin: IssuesMixin):
        """create_issue must skip conversion and omit the field for an empty description."""
        # Stub the two Jira API calls this test relies on
        jira_api = issues_mixin.jira
        jira_api.create_issue.return_value = {"key": "TEST-123"}
        jira_api.get_issue.return_value = {
            "id": "12345",
            "key": "TEST-123",
            "fields": {
                "summary": "Test Issue",
                "status": {"name": "Open"},
                "issuetype": {"name": "Bug"},
            },
        }

        created = issues_mixin.create_issue(
            project_key="TEST",
            summary="Test Issue",
            issue_type="Bug",
            description="",
        )

        # An empty string must never reach the converter
        issues_mixin._markdown_to_jira.assert_not_called()

        # The payload sent to Jira must not carry a description key either
        sent_fields = jira_api.create_issue.call_args.kwargs["fields"]
        assert "description" not in sent_fields

        assert created.key == "TEST-123"

    def test_update_issue_converts_markdown_in_fields(self, issues_mixin: IssuesMixin):
        """update_issue must convert a markdown description supplied inside ``fields``."""
        new_description = "# Updated Description\n\nThis is *italic* text."

        # Stub the issue returned by get_issue
        jira_api = issues_mixin.jira
        jira_api.get_issue.return_value = {
            "id": "12345",
            "key": "TEST-123",
            "fields": {
                "summary": "Updated Issue",
                "description": "[CONVERTED] # Updated Description",
                "status": {"name": "In Progress"},
                "issuetype": {"name": "Bug"},
            },
        }

        updated = issues_mixin.update_issue(
            issue_key="TEST-123",
            fields={"description": new_description, "summary": "Updated Issue"},
        )

        # The converter receives the raw markdown exactly once
        issues_mixin._markdown_to_jira.assert_called_once_with(new_description)

        # The API receives the converted description and the untouched summary
        expected_payload = {
            "fields": {
                "description": f"[CONVERTED] {new_description}",
                "summary": "Updated Issue",
            }
        }
        jira_api.update_issue.assert_called_once_with(
            issue_key="TEST-123",
            update=expected_payload,
        )

        assert updated.key == "TEST-123"

    def test_update_issue_converts_markdown_in_kwargs(self, issues_mixin: IssuesMixin):
        """update_issue must convert a markdown description passed as a keyword argument."""
        new_description = "## Updated via kwargs\n\nWith a [link](http://example.com)"

        # Stub the issue returned by get_issue
        jira_api = issues_mixin.jira
        jira_api.get_issue.return_value = {
            "id": "12345",
            "key": "TEST-123",
            "fields": {
                "summary": "Test Issue",
                "description": "[CONVERTED] ## Updated via kwargs",
                "status": {"name": "In Progress"},
                "issuetype": {"name": "Bug"},
            },
        }

        updated = issues_mixin.update_issue(
            issue_key="TEST-123",
            description=new_description,
        )

        # The converter receives the raw markdown exactly once
        issues_mixin._markdown_to_jira.assert_called_once_with(new_description)

        # Only the converted description is sent to the API
        expected_payload = {
            "fields": {"description": f"[CONVERTED] {new_description}"}
        }
        jira_api.update_issue.assert_called_once_with(
            issue_key="TEST-123",
            update=expected_payload,
        )

        assert updated.key == "TEST-123"

    def test_update_issue_with_multiple_fields_including_description(
        self, issues_mixin: IssuesMixin
    ):
        """update_issue must merge ``fields`` with a converted keyword description."""
        new_description = "Updated description with **emphasis**"

        # Stub the issue returned by get_issue
        jira_api = issues_mixin.jira
        jira_api.get_issue.return_value = {
            "id": "12345",
            "key": "TEST-123",
            "fields": {
                "summary": "Updated Summary",
                "description": "[CONVERTED] Updated description",
                "status": {"name": "In Progress"},
                "issuetype": {"name": "Bug"},
                "priority": {"name": "High"},
            },
        }

        # Summary and priority go through ``fields``; the description is a kwarg
        updated = issues_mixin.update_issue(
            issue_key="TEST-123",
            fields={
                "summary": "Updated Summary",
                "priority": {"name": "High"},
            },
            description=new_description,
        )

        # The converter receives the raw markdown exactly once
        issues_mixin._markdown_to_jira.assert_called_once_with(new_description)

        # All three fields reach the API, with only the description converted
        jira_api.update_issue.assert_called_once_with(
            issue_key="TEST-123",
            update={
                "fields": {
                    "summary": "Updated Summary",
                    "priority": {"name": "High"},
                    "description": f"[CONVERTED] {new_description}",
                }
            },
        )

        assert updated.key == "TEST-123"

    def test_markdown_conversion_preserves_none_values(self, issues_mixin: IssuesMixin):
        """An omitted description must be neither converted nor sent to Jira."""

        def convert_or_blank(text):
            return f"[CONVERTED] {text}" if text else ""

        # Replace the fixture's converter with one that maps falsy input to ""
        issues_mixin._markdown_to_jira = Mock(side_effect=convert_or_blank)

        # Stub the two Jira API calls this test relies on
        jira_api = issues_mixin.jira
        jira_api.create_issue.return_value = {"key": "TEST-123"}
        jira_api.get_issue.return_value = {
            "id": "12345",
            "key": "TEST-123",
            "fields": {"summary": "Test", "issuetype": {"name": "Task"}},
        }

        # The description argument is left out on purpose
        issues_mixin.create_issue(
            project_key="TEST",
            summary="Test Issue",
            issue_type="Task",
        )

        # Without a description the converter must stay untouched
        issues_mixin._markdown_to_jira.assert_not_called()

        # The payload sent to Jira must not carry a description key either
        sent_fields = jira_api.create_issue.call_args.kwargs["fields"]
        assert "description" not in sent_fields

    def test_create_issue_with_markdown_in_additional_fields(
        self, issues_mixin: IssuesMixin
    ):
        """Only the main description is converted; a custom text field is passed as-is."""
        # Stub field resolution so ``mydescription`` maps to a custom string field
        issues_mixin._generate_field_map = Mock(
            return_value={"mydescription": "customfield_10001"}
        )
        issues_mixin.get_field_by_id = Mock(
            return_value={"name": "MyDescription", "schema": {"type": "string"}}
        )

        # Stub the two Jira API calls this test relies on
        jira_api = issues_mixin.jira
        jira_api.create_issue.return_value = {"key": "TEST-123"}
        jira_api.get_issue.return_value = {
            "id": "12345",
            "key": "TEST-123",
            "fields": {"summary": "Test", "issuetype": {"name": "Task"}},
        }

        # ``description`` is expected to be converted, ``mydescription`` is not
        issues_mixin.create_issue(
            project_key="TEST",
            summary="Test Issue",
            issue_type="Task",
            description="# Main Description",
            mydescription="# Custom Field Description",
        )

        # The converter ran once, and only for the main description
        converter_calls = issues_mixin._markdown_to_jira.call_args_list
        assert len(converter_calls) == 1
        assert converter_calls[0].args[0] == "# Main Description"

        # The main description arrives converted, the custom field verbatim
        sent_fields = jira_api.create_issue.call_args.kwargs["fields"]
        assert sent_fields["description"] == "[CONVERTED] # Main Description"
        assert "customfield_10001" in sent_fields
        assert sent_fields["customfield_10001"] == "# Custom Field Description"

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/client.py:
--------------------------------------------------------------------------------

```python
"""Base client module for Jira API interactions."""

import logging
import os
from typing import Any, Literal

from atlassian import Jira
from requests import Session

from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
from mcp_atlassian.preprocessing import JiraPreprocessor
from mcp_atlassian.utils.logging import (
    get_masked_session_headers,
    log_config_param,
    mask_sensitive,
)
from mcp_atlassian.utils.oauth import configure_oauth_session
from mcp_atlassian.utils.ssl import configure_ssl_verification

from .config import JiraConfig

# Configure logging
logger = logging.getLogger("mcp-jira")


class JiraClient:
    """Base client for Jira API interactions."""

    _field_ids_cache: list[dict[str, Any]] | None
    _current_user_account_id: str | None

    config: JiraConfig
    preprocessor: JiraPreprocessor

    def __init__(self, config: JiraConfig | None = None) -> None:
        """Initialize the Jira client with configuration options.

        Builds the underlying ``atlassian.Jira`` client for the configured auth
        type (OAuth, PAT, or basic as the fallback), then applies SSL
        verification, proxy settings and custom headers to its session, creates
        the text preprocessor and resets the per-instance caches. When the
        module logger is at DEBUG level, authentication is also probed once;
        a failed probe is logged but does not abort construction.

        Args:
            config: Optional configuration object (will use env vars if not provided)

        Raises:
            ValueError: If configuration is invalid or required credentials are missing
            MCPAtlassianAuthenticationError: If OAuth authentication fails
        """
        # Load configuration from environment variables if not provided
        self.config = config or JiraConfig.from_env()

        # Initialize the Jira client based on auth type
        if self.config.auth_type == "oauth":
            # OAuth requests are routed by cloud_id, so it must be present
            if not self.config.oauth_config or not self.config.oauth_config.cloud_id:
                error_msg = "OAuth authentication requires a valid cloud_id"
                raise ValueError(error_msg)

            # Create a session for OAuth
            session = Session()

            # Configure the session with OAuth authentication
            if not configure_oauth_session(session, self.config.oauth_config):
                error_msg = "Failed to configure OAuth session"
                raise MCPAtlassianAuthenticationError(error_msg)

            # The Jira API URL with OAuth is different
            api_url = (
                f"https://api.atlassian.com/ex/jira/{self.config.oauth_config.cloud_id}"
            )

            # Initialize Jira with the session
            self.jira = Jira(
                url=api_url,
                session=session,
                cloud=True,  # OAuth is only for Cloud
                verify_ssl=self.config.ssl_verify,
            )
        elif self.config.auth_type == "pat":
            # Only a masked form of the token is written to the log
            logger.debug(
                f"Initializing Jira client with Token (PAT) auth. "
                f"URL: {self.config.url}, "
                f"Token (masked): {mask_sensitive(str(self.config.personal_token))}"
            )
            self.jira = Jira(
                url=self.config.url,
                token=self.config.personal_token,
                cloud=self.config.is_cloud,
                verify_ssl=self.config.ssl_verify,
            )
        else:  # basic auth
            # The API token itself is never logged, only whether one is present
            logger.debug(
                f"Initializing Jira client with Basic auth. "
                f"URL: {self.config.url}, Username: {self.config.username}, "
                f"API Token present: {bool(self.config.api_token)}, "
                f"Is Cloud: {self.config.is_cloud}"
            )
            self.jira = Jira(
                url=self.config.url,
                username=self.config.username,
                password=self.config.api_token,
                cloud=self.config.is_cloud,
                verify_ssl=self.config.ssl_verify,
            )
            # This header dump sits inside the basic-auth branch, so it is not
            # emitted for OAuth or PAT clients
            logger.debug(
                f"Jira client initialized. Session headers (Authorization masked): "
                f"{get_masked_session_headers(dict(self.jira._session.headers))}"
            )

        # Configure SSL verification using the shared utility
        configure_ssl_verification(
            service_name="Jira",
            url=self.config.url,
            session=self.jira._session,
            ssl_verify=self.config.ssl_verify,
        )

        # Proxy configuration: collect only the proxies that are actually set
        proxies = {}
        if self.config.http_proxy:
            proxies["http"] = self.config.http_proxy
        if self.config.https_proxy:
            proxies["https"] = self.config.https_proxy
        if self.config.socks_proxy:
            # NOTE(review): stored under the key "socks"; confirm that the HTTP
            # layer actually honours this key when selecting a proxy
            proxies["socks"] = self.config.socks_proxy
        if proxies:
            self.jira._session.proxies.update(proxies)
            for k, v in proxies.items():
                # Proxy URLs may embed credentials, hence sensitive=True
                log_config_param(
                    logger, "Jira", f"{k.upper()}_PROXY", v, sensitive=True
                )
        if self.config.no_proxy and isinstance(self.config.no_proxy, str):
            # NOTE: this writes to the process-wide environment, so it affects
            # more than this client's session
            os.environ["NO_PROXY"] = self.config.no_proxy
            log_config_param(logger, "Jira", "NO_PROXY", self.config.no_proxy)

        # Apply custom headers if configured
        if self.config.custom_headers:
            self._apply_custom_headers()

        # Initialize the text preprocessor for text processing capabilities
        self.preprocessor = JiraPreprocessor(base_url=self.config.url)
        # Caches start empty; nothing in this method fills them
        self._field_ids_cache = None
        self._current_user_account_id = None

        # Test authentication during initialization (in debug mode only)
        if logger.isEnabledFor(logging.DEBUG):
            try:
                self._validate_authentication()
            except MCPAtlassianAuthenticationError:
                # Deliberately best-effort: a failed probe must not prevent
                # the client from being constructed
                logger.warning(
                    "Authentication validation failed during client initialization - "
                    "continuing anyway"
                )

    def _validate_authentication(self) -> None:
        """Probe the credentials by asking Jira for the current user.

        A non-empty answer is logged at INFO level and an empty one at WARNING
        level. Any exception raised along the way is logged together with the
        masked session headers and re-raised as an authentication error.

        Raises:
            MCPAtlassianAuthenticationError: If the probe raises any exception
        """
        try:
            logger.debug(
                "Testing Jira authentication by retrieving current user info..."
            )
            user_info = self.jira.myself()
            if not user_info:
                logger.warning(
                    "Jira authentication test returned empty user info - "
                    "this may indicate an issue"
                )
                return

            display_name = user_info.get("displayName", "Unknown")
            email = user_info.get("emailAddress", "No email")
            logger.info(
                f"Jira authentication successful. "
                f"Current user: {display_name} "
                f"({email})"
            )
        except Exception as exc:
            failure = f"Jira authentication validation failed: {exc}"
            logger.error(failure)
            # Headers are masked before logging so credentials never hit the log
            masked_headers = get_masked_session_headers(
                dict(self.jira._session.headers)
            )
            logger.debug(f"Authentication headers during failure: {masked_headers}")
            raise MCPAtlassianAuthenticationError(failure) from exc

    def _apply_custom_headers(self) -> None:
        """Copy every configured custom header onto the Jira session.

        Does nothing when no custom headers are configured. Only header names
        are logged, never their values.
        """
        custom_headers = self.config.custom_headers
        if not custom_headers:
            return

        header_count = len(custom_headers)
        logger.debug(f"Applying {header_count} custom headers to Jira session")

        for name, value in custom_headers.items():
            self.jira._session.headers[name] = value
            logger.debug(f"Applied custom header: {name}")

    def _clean_text(self, text: str) -> str:
        """Clean text content by:
        1. Processing user mentions and links
        2. Converting HTML/wiki markup to markdown

        Args:
            text: Text to clean

        Returns:
            Cleaned text
        """
        if not text:
            return ""

        # Otherwise create a temporary one
        _ = self.config.url if hasattr(self, "config") else ""
        return self.preprocessor.clean_jira_text(text)

    def _markdown_to_jira(self, markdown_text: str) -> str:
        """
        Convert Markdown syntax to Jira markup syntax.

        Args:
            markdown_text: Text in Markdown format

        Returns:
            Text in Jira markup format
        """
        if not markdown_text:
            return ""

        # Use the shared preprocessor if available
        if hasattr(self, "preprocessor"):
            return self.preprocessor.markdown_to_jira(markdown_text)

        # Otherwise create a temporary one
        _ = self.config.url if hasattr(self, "config") else ""
        return self.preprocessor.markdown_to_jira(markdown_text)

    def get_paged(
        self,
        method: Literal["get", "post"],
        url: str,
        params_or_json: dict | None = None,
        *,
        absolute: bool = False,
    ) -> list[dict]:
        """
        Repeatly fetch paged data from Jira API using `nextPageToken` to paginate.

        Args:
            method: The HTTP method to use
            url: The URL to retrieve data from
            params_or_json: Optional query parameters or JSON data to send
            absolute: Whether to use absolute URL

        Returns:
            List of requested json data

        Raises:
            ValueError: If using paged request on non-cloud Jira
        """

        if not self.config.is_cloud:
            raise ValueError(
                "Paged requests are only available for Jira Cloud platform"
            )

        all_results: list[dict] = []
        current_data = params_or_json or {}

        while True:
            if method == "get":
                api_result = self.jira.get(
                    path=url, params=current_data, absolute=absolute
                )
            else:
                api_result = self.jira.post(
                    path=url, json=current_data, absolute=absolute
                )

            if not isinstance(api_result, dict):
                error_message = f"API result is not a dictionary: {api_result}"
                logger.error(error_message)
                raise ValueError(error_message)

            # Extract values from response
            all_results.append(api_result)

            # Check if this is the last page
            if "nextPageToken" not in api_result:
                break

            # Update for next iteration
            current_data["nextPageToken"] = api_result["nextPageToken"]

        return all_results

    def create_version(
        self,
        project: str,
        name: str,
        start_date: str | None = None,
        release_date: str | None = None,
        description: str | None = None,
    ) -> dict[str, Any]:
        """
        Create a new version in a Jira project.

        Optional values are only added to the request payload when they are
        non-empty.

        Args:
            project: The project key (e.g., 'PROJ')
            name: The name of the version
            start_date: The start date (YYYY-MM-DD, optional)
            release_date: The release date (YYYY-MM-DD, optional)
            description: Description of the version (optional)

        Returns:
            The created version object as returned by Jira

        Raises:
            ValueError: If the API response is not a dictionary
        """
        payload: dict[str, str] = {"project": project, "name": name}
        if start_date:
            payload["startDate"] = start_date
        if release_date:
            payload["releaseDate"] = release_date
        if description:
            payload["description"] = description
        logger.info(f"Creating Jira version: {payload}")
        result = self.jira.post("/rest/api/3/version", json=payload)
        if not isinstance(result, dict):
            error_message = f"Unexpected response from Jira API: {result}"
            raise ValueError(error_message)
        return result

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/search.py:
--------------------------------------------------------------------------------

```python
"""Module for Jira search operations."""

import logging

import requests
from requests.exceptions import HTTPError

from ..exceptions import MCPAtlassianAuthenticationError
from ..models.jira import JiraSearchResult
from .client import JiraClient
from .constants import DEFAULT_READ_JIRA_FIELDS
from .protocols import IssueOperationsProto

logger = logging.getLogger("mcp-jira")


class SearchMixin(JiraClient, IssueOperationsProto):
    """Mixin for Jira search operations."""

    def search_issues(
        self,
        jql: str,
        fields: list[str] | tuple[str, ...] | set[str] | str | None = None,
        start: int = 0,
        limit: int = 50,
        expand: str | None = None,
        projects_filter: str | None = None,
    ) -> JiraSearchResult:
        """
        Search for issues using JQL (Jira Query Language).

        When a projects filter is in effect (argument or config), a project
        clause is added to the query unless the query already contains one.
        On Cloud two calls are made: one for the total count and one for the
        issues. Otherwise a single JQL call is made with ``limit`` capped at 50.

        Args:
            jql: JQL query string
            fields: Fields to return (comma-separated string, list, tuple, set, or "*all")
            start: Starting index if number of issues is greater than the limit
                  Note: This parameter is ignored in Cloud environments and results will always
                  start from the first page.
            limit: Maximum issues to return
            expand: Optional items to expand (comma-separated)
            projects_filter: Optional comma-separated list of project keys to filter by, overrides config

        Returns:
            JiraSearchResult object containing issues and metadata (total, start_at, max_results)

        Raises:
            MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
            Exception: If there is an error searching for issues
        """
        try:
            # Use projects_filter parameter if provided, otherwise fall back to config
            filter_to_use = projects_filter or self.config.projects_filter

            # Apply projects filter if present
            if filter_to_use:
                # Split projects filter by commas and handle possible whitespace
                # NOTE(review): empty entries (e.g. from a trailing comma) are
                # kept and would be quoted as "" below - confirm this is intended
                projects = [p.strip() for p in filter_to_use.split(",")]

                # Build the project filter query part
                if len(projects) == 1:
                    project_query = f'project = "{projects[0]}"'
                else:
                    quoted_projects = [f'"{p}"' for p in projects]
                    projects_list = ", ".join(quoted_projects)
                    project_query = f"project IN ({projects_list})"

                # Add the project filter to existing query
                if not jql:
                    # Empty JQL - just use project filter
                    jql = project_query
                elif jql.strip().upper().startswith("ORDER BY"):
                    # JQL starts with ORDER BY - prepend project filter
                    jql = f"{project_query} {jql}"
                elif "project = " not in jql and "project IN" not in jql:
                    # Only add if not already filtering by project
                    # NOTE(review): this substring check is case- and
                    # whitespace-sensitive ("project in", "project=" are not
                    # detected), and a query that ends with an ORDER BY clause
                    # gets that clause wrapped inside the parentheses -
                    # TODO confirm both cases against real queries
                    jql = f"({jql}) AND {project_query}"

                logger.info(f"Applied projects filter to query: {jql}")

            # Convert fields to proper format if it's a list/tuple/set
            fields_param: str | None
            if fields is None:  # Use default if None
                fields_param = ",".join(DEFAULT_READ_JIRA_FIELDS)
            elif isinstance(fields, list | tuple | set):
                fields_param = ",".join(fields)
            else:
                fields_param = fields

            if self.config.is_cloud:
                # -1 marks "total unknown"; it is only replaced when the
                # metadata call below yields a parseable count
                actual_total = -1
                try:
                    # Call 1: Get metadata (including total) using standard search API
                    metadata_params = {"jql": jql, "maxResults": 0}
                    metadata_response = self.jira.get(
                        self.jira.resource_url("search"), params=metadata_params
                    )

                    if (
                        isinstance(metadata_response, dict)
                        and "total" in metadata_response
                    ):
                        try:
                            actual_total = int(metadata_response["total"])
                        except (ValueError, TypeError):
                            logger.warning(
                                f"Could not parse 'total' from metadata response for JQL: {jql}. Received: {metadata_response.get('total')}"
                            )
                    else:
                        logger.warning(
                            f"Could not retrieve total count from metadata response for JQL: {jql}. Response type: {type(metadata_response)}"
                        )
                except Exception as meta_err:
                    # Best-effort: a failed count lookup is logged and the
                    # search itself still runs
                    logger.error(
                        f"Error fetching metadata for JQL '{jql}': {str(meta_err)}"
                    )

                # Call 2: Get the actual issues using the enhanced method
                # (`start` is not forwarded here, matching the docstring note)
                issues_response_list = self.jira.enhanced_jql_get_list_of_tickets(
                    jql, fields=fields_param, limit=limit, expand=expand
                )

                if not isinstance(issues_response_list, list):
                    msg = f"Unexpected return value type from `jira.enhanced_jql_get_list_of_tickets`: {type(issues_response_list)}"
                    logger.error(msg)
                    raise TypeError(msg)

                # Combine both calls into the dict shape the model is built from
                response_dict_for_model = {
                    "issues": issues_response_list,
                    "total": actual_total,
                }

                search_result = JiraSearchResult.from_api_response(
                    response_dict_for_model,
                    base_url=self.config.url,
                    requested_fields=fields_param,
                )

                # Return the full search result object
                return search_result
            else:
                # Non-Cloud: a single call, with the page size capped at 50
                limit = min(limit, 50)
                response = self.jira.jql(
                    jql, fields=fields_param, start=start, limit=limit, expand=expand
                )
                if not isinstance(response, dict):
                    msg = f"Unexpected return value type from `jira.jql`: {type(response)}"
                    logger.error(msg)
                    raise TypeError(msg)

                # Convert the response to a search result model
                search_result = JiraSearchResult.from_api_response(
                    response, base_url=self.config.url, requested_fields=fields_param
                )

                # Return the full search result object
                return search_result

        except HTTPError as http_err:
            # 401/403 become a dedicated authentication error; every other
            # HTTP error is logged and re-raised unchanged
            if http_err.response is not None and http_err.response.status_code in [
                401,
                403,
            ]:
                error_msg = (
                    f"Authentication failed for Jira API ({http_err.response.status_code}). "
                    "Token may be expired or invalid. Please verify credentials."
                )
                logger.error(error_msg)
                raise MCPAtlassianAuthenticationError(error_msg) from http_err
            else:
                logger.error(f"HTTP error during API call: {http_err}", exc_info=False)
                raise http_err
        except Exception as e:
            # Any non-HTTP failure (including the TypeErrors raised above) is
            # wrapped in a generic Exception, chained to the original
            logger.error(f"Error searching issues with JQL '{jql}': {str(e)}")
            raise Exception(f"Error searching issues: {str(e)}") from e

    def get_board_issues(
        self,
        board_id: str,
        jql: str,
        fields: str | None = None,
        start: int = 0,
        limit: int = 50,
        expand: str | None = None,
    ) -> JiraSearchResult:
        """
        Get all issues linked to a specific board.

        Args:
            board_id: The ID of the board
            jql: JQL query string
            fields: Fields to return (comma-separated string or "*all");
                the default read fields are used when omitted
            start: Starting index
            limit: Maximum issues to return
            expand: Optional items to expand (comma-separated)

        Returns:
            JiraSearchResult object containing board issues and metadata

        Raises:
            Exception: If there is an error getting board issues
        """
        try:
            # Determine fields_param
            fields_param = fields
            if fields_param is None:
                fields_param = ",".join(DEFAULT_READ_JIRA_FIELDS)

            response = self.jira.get_issues_for_board(
                board_id=board_id,
                jql=jql,
                fields=fields_param,
                start=start,
                limit=limit,
                expand=expand,
            )
            if not isinstance(response, dict):
                msg = f"Unexpected return value type from `jira.get_issues_for_board`: {type(response)}"
                logger.error(msg)
                raise TypeError(msg)

            # Convert the response to a search result model
            search_result = JiraSearchResult.from_api_response(
                response, base_url=self.config.url, requested_fields=fields_param
            )
            return search_result
        except requests.HTTPError as e:
            # An HTTPError may carry no response object; fall back to its own
            # text instead of failing with AttributeError inside the handler
            detail = str(e.response.content) if e.response is not None else str(e)
            logger.error(
                f"Error searching issues for board '{board_id}' with JQL '{jql}': {detail}"
            )
            raise Exception(
                f"Error searching issues for board with JQL: {detail}"
            ) from e
        except Exception as e:
            logger.error(f"Error searching issues for board with JQL '{jql}': {str(e)}")
            raise Exception(
                f"Error searching issues for board with JQL {str(e)}"
            ) from e

    def get_sprint_issues(
        self,
        sprint_id: str,
        fields: str | None = None,
        start: int = 0,
        limit: int = 50,
    ) -> JiraSearchResult:
        """
        Get all issues linked to a specific sprint.

        Args:
            sprint_id: The ID of the sprint
            fields: Fields to return (comma-separated string or "*all");
                the default read fields are used when omitted
            start: Starting index
            limit: Maximum issues to return

        Returns:
            JiraSearchResult object containing sprint issues and metadata

        Raises:
            Exception: If there is an error getting sprint issues
        """
        try:
            # Determine fields_param
            fields_param = fields
            if fields_param is None:
                fields_param = ",".join(DEFAULT_READ_JIRA_FIELDS)

            # `fields_param` is not sent with the request; it is only handed
            # to the result model as `requested_fields` below
            response = self.jira.get_sprint_issues(
                sprint_id=sprint_id,
                start=start,
                limit=limit,
            )
            if not isinstance(response, dict):
                msg = f"Unexpected return value type from `jira.get_sprint_issues`: {type(response)}"
                logger.error(msg)
                raise TypeError(msg)

            # Convert the response to a search result model
            search_result = JiraSearchResult.from_api_response(
                response, base_url=self.config.url, requested_fields=fields_param
            )
            return search_result
        except requests.HTTPError as e:
            # An HTTPError may carry no response object; fall back to its own
            # text instead of failing with AttributeError inside the handler
            detail = str(e.response.content) if e.response is not None else str(e)
            logger.error(f"Error searching issues for sprint '{sprint_id}': {detail}")
            raise Exception(f"Error searching issues for sprint: {detail}") from e
        except Exception as e:
            logger.error(f"Error searching issues for sprint '{sprint_id}': {str(e)}")
            raise Exception(f"Error searching issues for sprint: {str(e)}") from e

```

--------------------------------------------------------------------------------
/tests/integration/test_proxy.py:
--------------------------------------------------------------------------------

```python
"""
Integration tests for proxy handling in Jira and Confluence clients (mocked requests).
"""

import os
from unittest.mock import MagicMock, patch

import pytest
from requests.exceptions import ProxyError

from mcp_atlassian.confluence.client import ConfluenceClient
from mcp_atlassian.confluence.config import ConfluenceConfig
from mcp_atlassian.jira.client import JiraClient
from mcp_atlassian.jira.config import JiraConfig
from tests.utils.base import BaseAuthTest
from tests.utils.mocks import MockEnvironment


@pytest.mark.integration
def test_jira_client_passes_proxies_to_requests(monkeypatch):
    """JiraClient should copy every configured proxy onto the session's ``proxies``."""
    # The session needs a real dict for `proxies` so the client can write into
    # it and the assertions below can read the values back.
    session = MagicMock()
    session.proxies = {}
    fake_jira = MagicMock()
    fake_jira._session = session

    # Replace the real Jira API class and SSL setup with inert stand-ins.
    monkeypatch.setattr("mcp_atlassian.jira.client.Jira", lambda **kwargs: fake_jira)
    monkeypatch.setattr(
        "mcp_atlassian.jira.client.configure_ssl_verification", lambda **kwargs: None
    )

    expected_proxies = {
        "http": "http://proxy:8080",
        "https": "https://proxy:8443",
        "socks": "socks5://user:pass@proxy:1080",
    }
    client = JiraClient(
        config=JiraConfig(
            url="https://test.atlassian.net",
            auth_type="basic",
            username="user",
            api_token="pat",
            http_proxy=expected_proxies["http"],
            https_proxy=expected_proxies["https"],
            socks_proxy=expected_proxies["socks"],
            no_proxy="localhost,127.0.0.1",
        )
    )

    # Simulate a request through the (mocked) session
    client.jira._session.request(
        "GET", "https://test.atlassian.net/rest/api/2/issue/TEST-1"
    )

    for scheme, proxy_url in expected_proxies.items():
        assert session.proxies[scheme] == proxy_url


@pytest.mark.integration
def test_confluence_client_passes_proxies_to_requests(monkeypatch):
    """ConfluenceClient should copy every configured proxy onto the session's ``proxies``."""
    # The session needs a real dict for `proxies` so the client can write into
    # it and the assertions below can read the values back.
    session = MagicMock()
    session.proxies = {}
    fake_confluence = MagicMock()
    fake_confluence._session = session

    # Swap out the real API class, SSL setup and preprocessor for stand-ins.
    replacements = (
        ("mcp_atlassian.confluence.client.Confluence", lambda **kwargs: fake_confluence),
        (
            "mcp_atlassian.confluence.client.configure_ssl_verification",
            lambda **kwargs: None,
        ),
        (
            "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor",
            lambda **kwargs: MagicMock(),
        ),
    )
    for target, stand_in in replacements:
        monkeypatch.setattr(target, stand_in)

    expected_proxies = {
        "http": "http://proxy:8080",
        "https": "https://proxy:8443",
        "socks": "socks5://user:pass@proxy:1080",
    }
    client = ConfluenceClient(
        config=ConfluenceConfig(
            url="https://test.atlassian.net/wiki",
            auth_type="basic",
            username="user",
            api_token="pat",
            http_proxy=expected_proxies["http"],
            https_proxy=expected_proxies["https"],
            socks_proxy=expected_proxies["socks"],
            no_proxy="localhost,127.0.0.1",
        )
    )

    # Simulate a request through the (mocked) session
    client.confluence._session.request(
        "GET", "https://test.atlassian.net/wiki/rest/api/content/123"
    )

    for scheme, proxy_url in expected_proxies.items():
        assert session.proxies[scheme] == proxy_url


@pytest.mark.integration
def test_jira_client_no_proxy_env(monkeypatch):
    """Constructing a JiraClient should export the configured ``no_proxy`` as NO_PROXY."""
    fake_jira = MagicMock()
    fake_jira._session = MagicMock()
    monkeypatch.setattr("mcp_atlassian.jira.client.Jira", lambda **kwargs: fake_jira)
    monkeypatch.setattr(
        "mcp_atlassian.jira.client.configure_ssl_verification", lambda **kwargs: None
    )

    # Start from an empty value; monkeypatch restores the variable afterwards.
    monkeypatch.setenv("NO_PROXY", "")

    JiraClient(
        config=JiraConfig(
            url="https://test.atlassian.net",
            auth_type="basic",
            username="user",
            api_token="pat",
            http_proxy="http://proxy:8080",
            no_proxy="localhost,127.0.0.1",
        )
    )

    assert os.environ["NO_PROXY"] == "localhost,127.0.0.1"


class TestProxyConfigurationEnhanced(BaseAuthTest):
    """Enhanced proxy configuration tests using test utilities.

    Covers loading proxy settings from the environment, credentials embedded
    in proxy URLs, SOCKS proxies, NO_PROXY handling, proxy connection errors,
    and the interaction of proxy settings with SSL and OAuth configuration.
    """

    @pytest.mark.integration
    def test_proxy_configuration_from_environment(self):
        """Test proxy configuration loaded from environment variables."""
        with MockEnvironment.basic_auth_env():
            proxy_vars = {
                "HTTP_PROXY": "http://proxy.company.com:8080",
                "HTTPS_PROXY": "https://proxy.company.com:8443",
                "NO_PROXY": "*.internal.com,localhost",
            }

            # Patch environment with proxy settings
            with patch.dict(os.environ, proxy_vars):
                # Jira should pick up proxy settings
                jira_config = JiraConfig.from_env()
                assert jira_config.http_proxy == "http://proxy.company.com:8080"
                assert jira_config.https_proxy == "https://proxy.company.com:8443"
                assert jira_config.no_proxy == "*.internal.com,localhost"

                # Confluence should pick up proxy settings
                confluence_config = ConfluenceConfig.from_env()
                assert confluence_config.http_proxy == "http://proxy.company.com:8080"
                assert confluence_config.https_proxy == "https://proxy.company.com:8443"
                assert confluence_config.no_proxy == "*.internal.com,localhost"

    @pytest.mark.integration
    def test_proxy_authentication_in_url(self):
        """Test proxy URLs with authentication credentials."""
        # The URLs must literally contain "proxyuser:proxypass@"; the
        # assertions below check for exactly those credentials.
        config = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="basic",
            username="user",
            api_token="token",
            http_proxy="http://proxyuser:proxypass@proxy.company.com:8080",
            https_proxy="https://proxyuser:proxypass@proxy.company.com:8443",
        )

        # Verify proxy URLs contain authentication
        assert "proxyuser:proxypass" in config.http_proxy
        assert "proxyuser:proxypass" in config.https_proxy

    @pytest.mark.integration
    def test_socks_proxy_configuration(self, monkeypatch):
        """Test that a SOCKS proxy URL with credentials reaches the session."""
        mock_jira = MagicMock()
        mock_session = MagicMock()
        # Create a proper proxies dictionary that can be updated
        mock_session.proxies = {}
        mock_jira._session = mock_session
        monkeypatch.setattr(
            "mcp_atlassian.jira.client.Jira", lambda **kwargs: mock_jira
        )
        monkeypatch.setattr(
            "mcp_atlassian.jira.client.configure_ssl_verification",
            lambda **kwargs: None,
        )

        # Single source of truth so the configured and asserted values cannot
        # drift apart. The host name is arbitrary; only the URL shape matters.
        socks_url = "socks5://socksuser:sockspass@socks.company.com:1080"

        # Test SOCKS5 proxy
        config = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="basic",
            username="user",
            api_token="token",
            socks_proxy=socks_url,
        )

        JiraClient(config=config)
        assert mock_session.proxies["socks"] == socks_url

    @pytest.mark.integration
    def test_proxy_bypass_for_internal_domains(self, monkeypatch):
        """Test that NO_PROXY domains are kept in the environment and the config."""
        # Set up environment
        monkeypatch.setenv("NO_PROXY", "*.internal.com,localhost,127.0.0.1")

        config = JiraConfig(
            url="https://jira.internal.com",  # Internal domain
            auth_type="basic",
            username="user",
            api_token="token",
            http_proxy="http://proxy.company.com:8080",
            no_proxy="*.internal.com,localhost,127.0.0.1",
        )

        # Verify NO_PROXY is set in environment
        assert os.environ["NO_PROXY"] == "*.internal.com,localhost,127.0.0.1"
        assert "internal.com" in config.no_proxy

    @pytest.mark.integration
    def test_proxy_error_handling(self, monkeypatch):
        """Test proper error handling when proxy connection fails."""
        # Calling the patched Jira class raises, simulating a proxy failure
        # while the client is being constructed.
        mock_jira = MagicMock()
        mock_jira.side_effect = ProxyError("Unable to connect to proxy")
        monkeypatch.setattr("mcp_atlassian.jira.client.Jira", mock_jira)

        config = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="basic",
            username="user",
            api_token="token",
            http_proxy="http://unreachable.proxy.com:8080",
        )

        # Creating client should raise proxy error
        with pytest.raises(ProxyError, match="Unable to connect to proxy"):
            JiraClient(config=config)

    @pytest.mark.integration
    def test_proxy_configuration_precedence(self):
        """Test that explicit proxy config takes precedence over environment."""
        with patch.dict(
            os.environ,
            {
                "HTTP_PROXY": "http://env.proxy.com:8080",
                "HTTPS_PROXY": "https://env.proxy.com:8443",
            },
        ):
            # Explicit configuration should override environment
            config = JiraConfig(
                url="https://test.atlassian.net",
                auth_type="basic",
                username="user",
                api_token="token",
                http_proxy="http://explicit.proxy.com:8080",
                https_proxy="https://explicit.proxy.com:8443",
            )

            assert config.http_proxy == "http://explicit.proxy.com:8080"
            assert config.https_proxy == "https://explicit.proxy.com:8443"

    @pytest.mark.integration
    def test_mixed_proxy_and_ssl_configuration(self, monkeypatch):
        """Test proxy configuration works correctly with SSL verification disabled."""
        mock_confluence = MagicMock()
        mock_session = MagicMock()
        # Create a proper proxies dictionary that can be updated
        mock_session.proxies = {}
        mock_confluence._session = mock_session
        monkeypatch.setattr(
            "mcp_atlassian.confluence.client.Confluence",
            lambda **kwargs: mock_confluence,
        )
        monkeypatch.setattr(
            "mcp_atlassian.confluence.client.configure_ssl_verification",
            lambda **kwargs: None,
        )
        monkeypatch.setattr(
            "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor",
            lambda **kwargs: MagicMock(),
        )

        # Configure with both proxy and SSL disabled
        config = ConfluenceConfig(
            url="https://test.atlassian.net/wiki",
            auth_type="basic",
            username="user",
            api_token="token",
            http_proxy="http://proxy.company.com:8080",
            ssl_verify=False,
        )

        ConfluenceClient(config=config)

        # Both proxy and SSL settings should be applied
        assert mock_session.proxies["http"] == "http://proxy.company.com:8080"
        assert config.ssl_verify is False

    @pytest.mark.integration
    def test_proxy_with_oauth_configuration(self):
        """Test that proxy variables coexist with the OAuth environment."""
        with MockEnvironment.oauth_env() as env_vars:
            proxy_vars = {
                "HTTP_PROXY": "http://proxy.company.com:8080",
                "HTTPS_PROXY": "https://proxy.company.com:8443",
                "NO_PROXY": "localhost,127.0.0.1",
            }

            # Merge with OAuth env vars
            all_vars = {**env_vars, **proxy_vars}

            # Use patch.dict to ensure environment variables are set
            with patch.dict(os.environ, all_vars):
                # OAuth should still respect proxy settings
                assert os.environ.get("HTTP_PROXY") == "http://proxy.company.com:8080"
                assert os.environ.get("HTTPS_PROXY") == "https://proxy.company.com:8443"
                assert os.environ.get("NO_PROXY") == "localhost,127.0.0.1"

```

--------------------------------------------------------------------------------
/scripts/oauth_authorize.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
"""
OAuth 2.0 Authorization Flow Helper for MCP Atlassian

This script helps with the OAuth 2.0 (3LO) authorization flow for Atlassian Cloud:
1. Opens a browser to the authorization URL
2. Starts a local server to receive the callback with the authorization code
3. Exchanges the authorization code for access and refresh tokens
4. Saves the tokens for later use by MCP Atlassian

Usage:
    python oauth_authorize.py --client-id YOUR_CLIENT_ID --client-secret YOUR_CLIENT_SECRET
                             --redirect-uri http://localhost:8080/callback
                             --scope "read:jira-work write:jira-work read:confluence-space.summary offline_access"

IMPORTANT: The 'offline_access' scope is required for refresh tokens to work properly.
Without this scope, tokens will expire quickly and authentication will fail.

Environment variables can also be used:
- ATLASSIAN_OAUTH_CLIENT_ID
- ATLASSIAN_OAUTH_CLIENT_SECRET
- ATLASSIAN_OAUTH_REDIRECT_URI
- ATLASSIAN_OAUTH_SCOPE
"""

import argparse
import http.server
import logging
import os
import secrets
import socketserver
import sys
import threading
import time
import urllib.parse
import webbrowser

# Add the parent directory to the path so we can import the package
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from src.mcp_atlassian.utils.oauth import OAuthConfig

# Configure root logging for this standalone script. `force=True` asks
# basicConfig to replace any handlers already attached to the root logger, so
# this configuration wins even if logging was set up during the imports above.
# The format includes the line number for easier debugging.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(levelname)s - %(name)s - %(lineno)d - %(message)s",
    force=True,
)


# Logger for this script. The "mcp-atlassian.oauth" logger is raised to DEBUG
# as well (presumably the logger used by the imported OAuth helper - verify).
logger = logging.getLogger("oauth-authorize")
logger.setLevel(logging.DEBUG)
logging.getLogger("mcp-atlassian.oauth").setLevel(logging.DEBUG)

# Module-level state for callback handling. CallbackHandler.do_GET writes these
# from the server thread; wait_for_callback and run_oauth_flow read them.
authorization_code = None  # value of the callback's `code` query parameter
received_state = None  # value of the callback's `state` query parameter, if sent
callback_received = False  # True once a callback carrying `code` or `error` arrived
callback_error = None  # value of the callback's `error` query parameter


class CallbackHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler for the OAuth callback.

    Serves the single redirect the authorization server sends the browser to,
    records the outcome in the module-level callback globals and renders a
    small HTML status page.
    """

    def do_GET(self) -> None:  # noqa: N802
        """Handle GET requests (OAuth callback and favicon).

        Only ``/callback`` is processed; every other path gets a 404. On a
        callback carrying ``error`` or ``code`` the matching module-level
        globals are set and ``callback_received`` becomes True.
        """
        global authorization_code, callback_received, callback_error, received_state

        parsed_path = urllib.parse.urlparse(self.path)
        logger.debug(f"CallbackHandler received GET request for: {self.path}")

        # Ignore favicon requests politely
        if parsed_path.path == "/favicon.ico":
            self.send_error(404, "File not found")
            logger.debug("CallbackHandler: Ignored /favicon.ico request.")
            return

        # Process only /callback path
        if parsed_path.path != "/callback":
            self.send_error(404, "Not Found: Only /callback is supported.")
            logger.warning(
                f"CallbackHandler: Received request for unexpected path: {parsed_path.path}"
            )
            return

        # Parse the query parameters from the URL
        params = urllib.parse.parse_qs(parsed_path.query)

        if "error" in params:
            callback_error = params["error"][0]
            callback_received = True
            logger.error(f"Authorization error from callback: {callback_error}")
            # Non-200 status so the page is rendered with the error styling.
            self._send_response(
                f"Authorization failed: {callback_error}", status=400
            )
            return

        if "code" in params:
            authorization_code = params["code"][0]
            if "state" in params:
                received_state = params["state"][0]
            # Set the flag last so readers never see it before the code/state.
            callback_received = True
            logger.info(
                "Authorization code and state received successfully via callback."
            )
            self._send_response(
                "Authorization successful! You can close this window now."
            )
        else:
            logger.error("Invalid callback: 'code' or 'error' parameter missing.")
            self._send_response(
                "Invalid callback: Authorization code missing", status=400
            )

    def _send_response(self, message: str, status: int = 200) -> None:
        """Send an HTML status page to the browser.

        Args:
            message: Text to show; it is HTML-escaped before being embedded
                because it may contain values taken from the request URL.
            status: HTTP status code. 200 renders the message with the
                "success" style, anything else with the "error" style.
        """
        # Local import: the page text below must not shadow the `html` module.
        from html import escape

        safe_message = escape(message)
        css_class = "success" if status == 200 else "error"

        page = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>Atlassian OAuth Authorization</title>
            <style>
                body {{
                    font-family: Arial, sans-serif;
                    text-align: center;
                    padding: 40px;
                    max-width: 600px;
                    margin: 0 auto;
                }}
                .message {{
                    padding: 20px;
                    border-radius: 5px;
                    margin-bottom: 20px;
                }}
                .success {{
                    background-color: #d4edda;
                    color: #155724;
                    border: 1px solid #c3e6cb;
                }}
                .error {{
                    background-color: #f8d7da;
                    color: #721c24;
                    border: 1px solid #f5c6cb;
                }}
            </style>
        </head>
        <body>
            <h1>Atlassian OAuth Authorization</h1>
            <div class="message {css_class}">
                <p>{safe_message}</p>
            </div>
            <p>This window will automatically close in 5 seconds...</p>
            <script>
                setTimeout(function() {{
                    window.close();
                }}, 5000);
            </script>
        </body>
        </html>
        """
        body = page.encode("utf-8")

        self.send_response(status)
        self.send_header("Content-type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    # Make the server quiet
    def log_message(self, format: str, *args: object) -> None:
        """Suppress the base class's per-request logging."""
        return


def start_callback_server(port: int) -> socketserver.TCPServer:
    """Bind a TCP server for the OAuth callback and serve it in the background.

    Args:
        port: TCP port to listen on (bound with an empty host string).

    Returns:
        The running server; call ``shutdown()`` on it to stop serving.
    """
    server = socketserver.TCPServer(("", port), CallbackHandler)
    # Daemon thread: it must not keep the process alive after main() returns.
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server


def wait_for_callback(timeout: int = 300) -> bool:
    """Wait for the callback to be received.

    Polls the module-level ``callback_received`` flag once per second until it
    is set or ``timeout`` seconds have elapsed.

    Args:
        timeout: Maximum number of seconds to wait.

    Returns:
        True if a callback arrived without an error; False on timeout or when
        the callback reported an authorization error.
    """
    # Monotonic clock: the timeout must not be stretched or cut short when the
    # system wall clock is adjusted while we wait.
    deadline = time.monotonic() + timeout
    while not callback_received and time.monotonic() < deadline:
        time.sleep(1)

    if not callback_received:
        logger.error(
            f"Timed out waiting for authorization callback after {timeout} seconds"
        )
        return False

    if callback_error:
        logger.error(f"Authorization error: {callback_error}")
        return False

    return True


def parse_redirect_uri(redirect_uri: str) -> tuple[str, int]:
    """Split a redirect URI into its hostname and effective port.

    When the URI carries no explicit (non-zero) port, the scheme default is
    used: 443 for ``https`` and 80 for anything else. The hostname is None if
    the URI has no network location.
    """
    parts = urllib.parse.urlparse(redirect_uri)
    explicit_port = parts.port
    if explicit_port:
        return parts.hostname, explicit_port

    default_port = 443 if parts.scheme == "https" else 80
    return parts.hostname, default_port


def run_oauth_flow(args: argparse.Namespace) -> bool:
    """Run the OAuth 2.0 authorization flow.

    Starts a local callback server when the redirect URI points at localhost,
    opens the authorization URL in a browser, waits for the callback, verifies
    the CSRF state and exchanges the authorization code for tokens.

    Args:
        args: Parsed arguments providing ``client_id``, ``client_secret``,
            ``redirect_uri`` and ``scope``.

    Returns:
        True if the code was exchanged for tokens successfully, False otherwise.
    """
    # Create OAuth configuration
    oauth_config = OAuthConfig(
        client_id=args.client_id,
        client_secret=args.client_secret,
        redirect_uri=args.redirect_uri,
        scope=args.scope,
    )

    # Generate a random state for CSRF protection
    state = secrets.token_urlsafe(16)

    # Start local callback server if using localhost
    hostname, port = parse_redirect_uri(args.redirect_uri)
    httpd = None

    if hostname and hostname.lower() in ["localhost", "127.0.0.1"]:
        logger.info(f"Attempting to start local callback server on {hostname}:{port}")
        try:
            httpd = start_callback_server(port)
        except OSError as e:
            logger.error(f"Failed to start callback server: {e}")
            logger.error(f"Make sure port {port} is available and not in use")
            return False

    # Everything below runs with the server (if any) active; the finally
    # clause stops it on every exit path, including unexpected exceptions.
    try:
        # Get the authorization URL
        auth_url = oauth_config.get_authorization_url(state=state)

        # Open the browser for authorization
        logger.info(f"Opening browser for authorization at {auth_url}")
        webbrowser.open(auth_url)
        logger.info(
            "If the browser doesn't open automatically, please visit this URL manually."
        )

        # Wait for the callback
        if not wait_for_callback():
            return False

        # Verify state to prevent CSRF attacks
        if received_state != state:
            logger.error(
                f"State mismatch! Possible CSRF attack. Expected: {state}, Received: {received_state}"
            )
            return False
        logger.info("CSRF state verified successfully.")

        # Exchange the code for tokens
        logger.info("Exchanging authorization code for tokens...")
        if not authorization_code:
            logger.error("Authorization code is missing, cannot exchange for tokens.")
            return False

        if not oauth_config.exchange_code_for_tokens(authorization_code):
            logger.error("Failed to exchange authorization code for tokens")
            return False

        logger.info("🎉 OAuth authorization flow completed successfully!")

        if oauth_config.cloud_id:
            logger.info(f"Retrieved Cloud ID: {oauth_config.cloud_id}")
            logger.info(
                "\n💡 Tip: Add/update the following in your .env file or environment variables:"
            )
            logger.info(f"ATLASSIAN_OAUTH_CLIENT_ID={oauth_config.client_id}")
            logger.info(f"ATLASSIAN_OAUTH_CLIENT_SECRET={oauth_config.client_secret}")
            logger.info(f"ATLASSIAN_OAUTH_REDIRECT_URI={oauth_config.redirect_uri}")
            logger.info(f"ATLASSIAN_OAUTH_SCOPE={oauth_config.scope}")
            logger.info(f"ATLASSIAN_OAUTH_CLOUD_ID={oauth_config.cloud_id}")
        else:
            logger.warning(
                "Cloud ID could not be obtained. Some API calls might require it."
            )

        return True
    finally:
        if httpd:
            # shutdown() stops serve_forever(); server_close() releases the
            # listening socket so the port is freed immediately.
            httpd.shutdown()
            httpd.server_close()


def main() -> int:
    """Parse arguments, fill gaps from the environment and run the flow.

    Returns:
        Process exit status: 0 when the OAuth flow succeeded, 1 when required
        settings are missing, the user declined to continue, or the flow failed.
    """
    parser = argparse.ArgumentParser(
        description="OAuth 2.0 Authorization Flow Helper for MCP Atlassian"
    )
    parser.add_argument("--client-id", help="OAuth Client ID")
    parser.add_argument("--client-secret", help="OAuth Client Secret")
    parser.add_argument(
        "--redirect-uri",
        help="OAuth Redirect URI (e.g., http://localhost:8080/callback)",
    )
    parser.add_argument("--scope", help="OAuth Scope (space-separated)")

    args = parser.parse_args()

    # (name shown to the user, Namespace attribute, fallback environment variable)
    settings = (
        ("client-id", "client_id", "ATLASSIAN_OAUTH_CLIENT_ID"),
        ("client-secret", "client_secret", "ATLASSIAN_OAUTH_CLIENT_SECRET"),
        ("redirect-uri", "redirect_uri", "ATLASSIAN_OAUTH_REDIRECT_URI"),
        ("scope", "scope", "ATLASSIAN_OAUTH_SCOPE"),
    )

    # Command-line values win; the environment only fills what was left unset.
    for _, attr, env_name in settings:
        if not getattr(args, attr):
            setattr(args, attr, os.getenv(env_name))

    # Anything still unset after the environment lookup is a hard error.
    missing = [label for label, attr, _ in settings if not getattr(args, attr)]
    if missing:
        logger.error(f"Missing required arguments: {', '.join(missing)}")
        parser.print_help()
        return 1

    # Without offline_access no refresh token is issued; ask before continuing.
    if args.scope and "offline_access" not in args.scope.split():
        logger.warning("\n⚠️ WARNING: The 'offline_access' scope is missing!")
        logger.warning(
            "Without this scope, refresh tokens will not be issued and authentication will fail when tokens expire."
        )
        logger.warning("Consider adding 'offline_access' to your scope string.")
        answer = input("Do you want to proceed anyway? (y/n): ")
        if answer.lower() != "y":
            return 1

    return 0 if run_oauth_flow(args) else 1


# Run the helper only when executed as a script; main()'s return value becomes
# the process exit status (0 on success, 1 on failure).
if __name__ == "__main__":
    sys.exit(main())

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/preprocessing/jira.py:
--------------------------------------------------------------------------------

```python
"""Jira-specific text preprocessing module."""

import logging
import re
from typing import Any

from .base import BasePreprocessor

# Module-level logger registered under the fixed name "mcp-atlassian".
logger = logging.getLogger("mcp-atlassian")


class JiraPreprocessor(BasePreprocessor):
    """Handles text preprocessing for Jira content."""

    def __init__(self, base_url: str = "", **kwargs: Any) -> None:
        """
        Initialize the Jira text preprocessor.

        This constructor adds no behavior of its own; every argument is
        forwarded unchanged to the base class initializer.

        Args:
            base_url: Base URL for Jira API (presumably stored by the base
                class as ``self.base_url``, which ``_process_smart_links``
                reads - verify against the base class)
            **kwargs: Additional arguments for the base class
        """
        super().__init__(base_url=base_url, **kwargs)

    def clean_jira_text(self, text: str) -> str:
        """
        Normalize raw Jira text into stripped Markdown.

        The text passes through four stages, in this order:
        1. account-id mentions are replaced,
        2. smart links are rewritten as Markdown links,
        3. Jira wiki markup is converted to Markdown,
        4. any remaining HTML is converted to Markdown.

        Args:
            text: Raw Jira text; falsy input yields an empty string

        Returns:
            The converted text with surrounding whitespace removed
        """
        if not text:
            return ""

        cleaned = self._process_mentions(text, r"\[~accountid:(.*?)\]")
        cleaned = self._process_smart_links(cleaned)
        cleaned = self.jira_to_markdown(cleaned)
        cleaned = self._convert_html_to_markdown(cleaned)
        return cleaned.strip()

    def _process_mentions(self, text: str, pattern: str) -> str:
        """
        Process user mentions in text.

        Args:
            text: The text containing mentions
            pattern: Regular expression pattern to match mentions

        Returns:
            Text with mentions replaced with display names
        """
        mentions = re.findall(pattern, text)
        for account_id in mentions:
            try:
                # Note: This is a placeholder - actual user fetching should be injected
                display_name = f"User:{account_id}"
                text = text.replace(f"[~accountid:{account_id}]", display_name)
            except Exception as e:
                logger.error(f"Error processing mention for {account_id}: {str(e)}")
        return text

    def _process_smart_links(self, text: str) -> str:
        """Process Jira/Confluence smart links."""
        # Pattern matches: [text|url|smart-link]
        link_pattern = r"\[(.*?)\|(.*?)\|smart-link\]"
        matches = re.finditer(link_pattern, text)

        for match in matches:
            full_match = match.group(0)
            link_text = match.group(1)
            link_url = match.group(2)

            # Extract issue key if it's a Jira issue link
            issue_key_match = re.search(r"browse/([A-Z]+-\d+)", link_url)
            # Check if it's a Confluence wiki link
            confluence_match = re.search(
                r"wiki/spaces/.+?/pages/\d+/(.+?)(?:\?|$)", link_url
            )

            if issue_key_match:
                issue_key = issue_key_match.group(1)
                clean_url = f"{self.base_url}/browse/{issue_key}"
                text = text.replace(full_match, f"[{issue_key}]({clean_url})")
            elif confluence_match:
                url_title = confluence_match.group(1)
                readable_title = url_title.replace("+", " ")
                readable_title = re.sub(r"^[A-Z]+-\d+\s+", "", readable_title)
                text = text.replace(full_match, f"[{readable_title}]({link_url})")
            else:
                clean_url = link_url.split("?")[0]
                text = text.replace(full_match, f"[{link_text}]({clean_url})")

        return text

    def jira_to_markdown(self, input_text: str) -> str:
        """
        Convert Jira markup to Markdown format.

        The conversion is a fixed sequence of regular-expression substitutions
        followed by a line-by-line pass over table headers. Each step works on
        the output of the previous one, so the order of the steps matters.

        Args:
            input_text: Text in Jira markup format

        Returns:
            Text in Markdown format; an empty string for falsy input
        """
        if not input_text:
            return ""

        # Block quotes
        output = re.sub(r"^bq\.(.*?)$", r"> \1\n", input_text, flags=re.MULTILINE)

        # Text formatting (bold, italic): *x* becomes **x**, _x_ becomes *x*
        output = re.sub(
            r"([*_])(.*?)\1",
            lambda match: ("**" if match.group(1) == "*" else "*")
            + match.group(2)
            + ("**" if match.group(1) == "*" else "*"),
            output,
        )

        # Multi-level lists (markers #, -, +, *), converted by a helper method
        output = re.sub(
            r"^((?:#|-|\+|\*)+) (.*)$",
            lambda match: self._convert_jira_list_to_markdown(match),
            output,
            flags=re.MULTILINE,
        )

        # Headers: "hN." becomes N "#" characters (so "h0." yields none)
        output = re.sub(
            r"^h([0-6])\.(.*)$",
            lambda match: "#" * int(match.group(1)) + match.group(2),
            output,
            flags=re.MULTILINE,
        )

        # Inline code
        output = re.sub(r"\{\{([^}]+)\}\}", r"`\1`", output)

        # Citation
        output = re.sub(r"\?\?((?:.[^?]|[^?].)+)\?\?", r"<cite>\1</cite>", output)

        # Inserted text
        output = re.sub(r"\+([^+]*)\+", r"<ins>\1</ins>", output)

        # Superscript
        output = re.sub(r"\^([^^]*)\^", r"<sup>\1</sup>", output)

        # Subscript
        output = re.sub(r"~([^~]*)~", r"<sub>\1</sub>", output)

        # Strikethrough
        # NOTE: the replacement reproduces the match unchanged, so this step
        # currently leaves "-text-" as it is.
        output = re.sub(r"-([^-]*)-", r"-\1-", output)

        # Code blocks with optional language specification
        output = re.sub(
            r"\{code(?::([a-z]+))?\}([\s\S]*?)\{code\}",
            r"```\1\n\2\n```",
            output,
            flags=re.MULTILINE,
        )

        # No format
        output = re.sub(r"\{noformat\}([\s\S]*?)\{noformat\}", r"```\n\1\n```", output)

        # Quote blocks: every line of the quoted body is prefixed with "> "
        # NOTE: the body match is greedy, so it spans from the first {quote}
        # to the last {quote} in the text.
        output = re.sub(
            r"\{quote\}([\s\S]*)\{quote\}",
            lambda match: "\n".join(
                [f"> {line}" for line in match.group(1).split("\n")]
            ),
            output,
            flags=re.MULTILINE,
        )

        # Images with alt text
        output = re.sub(
            r"!([^|\n\s]+)\|([^\n!]*)alt=([^\n!\,]+?)(,([^\n!]*))?!",
            r"![\3](\1)",
            output,
        )

        # Images with other parameters (ignore them)
        output = re.sub(r"!([^|\n\s]+)\|([^\n!]*)!", r"![](\1)", output)

        # Images without parameters
        output = re.sub(r"!([^\n\s!]+)!", r"![](\1)", output)

        # Links
        output = re.sub(r"\[([^|]+)\|(.+?)\]", r"[\1](\2)", output)
        output = re.sub(r"\[(.+?)\]([^\(]+)", r"<\1>\2", output)

        # Colored text
        # The template uses plain double quotes: re.sub leaves an unknown
        # escape such as \" alone, which would put literal backslashes into
        # the generated style attribute.
        output = re.sub(
            r"\{color:([^}]+)\}([\s\S]*?)\{color\}",
            r'<span style="color:\1">\2</span>',
            output,
            flags=re.MULTILINE,
        )

        # Convert Jira table headers (||) to markdown table format
        lines = output.split("\n")
        i = 0
        while i < len(lines):
            line = lines[i]

            if "||" in line:
                # Replace Jira table headers
                lines[i] = lines[i].replace("||", "|")

                # Add a separator line for markdown tables
                header_cells = lines[i].count("|") - 1
                if header_cells > 0:
                    separator_line = "|" + "---|" * header_cells
                    lines.insert(i + 1, separator_line)
                    i += 1  # Skip the newly inserted line in next iteration

            i += 1

        # Rejoin the lines
        output = "\n".join(lines)

        return output

    def markdown_to_jira(self, input_text: str) -> str:
        """
        Convert Markdown syntax to Jira markup syntax.

        Fenced code blocks and inline code spans are converted first and then
        swapped for opaque placeholders, so that none of the later rewriting
        rules (emphasis, headers, lists, links, tables) can alter code content.
        The placeholders are swapped back just before returning.

        Args:
            input_text: Text in Markdown format

        Returns:
            Text in Jira markup format
        """
        if not input_text:
            return ""

        # Converted code fragments, indexed by the number embedded in their
        # placeholder. NUL-delimited alphanumeric placeholders are used because
        # no rule below matches or rewrites those characters.
        code_blocks: list[str] = []
        inline_codes: list[str] = []

        def save_code_block(match: re.Match) -> str:
            """
            Convert a fenced code block to Jira markup and stash it.

            Args:
                match: Regex match object containing the code block

            Returns:
                Placeholder standing in for the Jira-formatted code block
            """
            syntax = match.group(1) or ""
            content = match.group(2)
            code = "{code"
            if syntax:
                code += ":" + syntax
            code += "}" + content + "{code}"
            code_blocks.append(code)
            return f"\x00CODEBLOCK{len(code_blocks) - 1}\x00"

        def save_inline_code(match: re.Match) -> str:
            """
            Convert an inline code span to Jira markup and stash it.

            Args:
                match: Regex match object containing the inline code

            Returns:
                Placeholder standing in for the Jira-formatted inline code
            """
            inline_codes.append("{{" + match.group(1) + "}}")
            return f"\x00INLINECODE{len(inline_codes) - 1}\x00"

        def restore_from(saved: list[str]):
            """
            Build a re.sub callback that maps a placeholder back to its code.

            Args:
                saved: Stashed fragments addressed by placeholder index

            Returns:
                Callback returning the stashed fragment for a placeholder match
            """

            def replace(match: re.Match) -> str:
                index = int(match.group(1))
                # A placeholder-looking sequence that was already present in
                # the input has no stashed entry; leave it untouched.
                return saved[index] if index < len(saved) else match.group(0)

            return replace

        # Take code out of the text before any other rule runs
        output = re.sub(r"```(\w*)\n([\s\S]+?)```", save_code_block, input_text)
        output = re.sub(r"`([^`]+)`", save_inline_code, output)

        # Headers with = or - underlines
        output = re.sub(
            r"^(.*?)\n([=-])+$",
            lambda match: f"h{1 if match.group(2)[0] == '=' else 2}. {match.group(1)}",
            output,
            flags=re.MULTILINE,
        )

        # Headers with # prefix
        output = re.sub(
            r"^([#]+)(.*?)$",
            lambda match: f"h{len(match.group(1))}." + match.group(2),
            output,
            flags=re.MULTILINE,
        )

        # Bold and italic: a single marker becomes Jira emphasis (_), a longer
        # run becomes Jira strong (*)
        output = re.sub(
            r"([*_]+)(.*?)\1",
            lambda match: ("_" if len(match.group(1)) == 1 else "*")
            + match.group(2)
            + ("_" if len(match.group(1)) == 1 else "*"),
            output,
        )

        # Multi-level bulleted list
        output = re.sub(
            r"^(\s*)- (.*)$",
            lambda match: (
                "* " + match.group(2)
                if not match.group(1)
                else "  " * (len(match.group(1)) // 2) + "* " + match.group(2)
            ),
            output,
            flags=re.MULTILINE,
        )

        # Multi-level numbered list
        output = re.sub(
            r"^(\s+)1\. (.*)$",
            lambda match: "#" * (int(len(match.group(1)) / 4) + 2)
            + " "
            + match.group(2),
            output,
            flags=re.MULTILINE,
        )

        # HTML formatting tags to Jira markup
        tag_map = {"cite": "??", "del": "-", "ins": "+", "sup": "^", "sub": "~"}

        for tag, replacement in tag_map.items():
            output = re.sub(
                rf"<{tag}>(.*?)<\/{tag}>", rf"{replacement}\1{replacement}", output
            )

        # Colored text. Any color value is accepted (not only #hex) so that the
        # spans produced by the Jira -> Markdown direction convert back instead
        # of falling through to the autolink rule below.
        output = re.sub(
            r"<span style=\"color:([^\"]+)\">([\s\S]*?)</span>",
            r"{color:\1}\2{color}",
            output,
            flags=re.MULTILINE,
        )

        # Strikethrough
        output = re.sub(r"~~(.*?)~~", r"-\1-", output)

        # Images without alt text
        output = re.sub(r"!\[\]\(([^)\n\s]+)\)", r"!\1!", output)

        # Images with alt text
        output = re.sub(r"!\[([^\]\n]+)\]\(([^)\n\s]+)\)", r"!\2|alt=\1!", output)

        # Links
        output = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", r"[\1|\2]", output)
        output = re.sub(r"<([^>]+)>", r"[\1]", output)

        # Convert markdown tables to Jira table format
        lines = output.split("\n")
        i = 0
        while i < len(lines):
            if i < len(lines) - 1 and re.match(r"\|[-\s|]+\|", lines[i + 1]):
                # Convert header row to Jira format
                lines[i] = lines[i].replace("|", "||")
                # Remove the separator line
                lines.pop(i + 1)
            i += 1

        # Rejoin the lines
        output = "\n".join(lines)

        # Put the code back. Inline spans go first: an inline span may have
        # swallowed a code-block placeholder, which the second pass resolves.
        output = re.sub(r"\x00INLINECODE(\d+)\x00", restore_from(inline_codes), output)
        output = re.sub(r"\x00CODEBLOCK(\d+)\x00", restore_from(code_blocks), output)

        return output

    def _convert_jira_list_to_markdown(self, match: re.Match) -> str:
        """
        Helper method to convert Jira lists to Markdown format.

        Args:
            match: Regex match object containing the Jira list markup

        Returns:
            Markdown-formatted list item
        """
        jira_bullets = match.group(1)
        content = match.group(2)

        # Calculate indentation level based on number of symbols
        indent_level = len(jira_bullets) - 1
        indent = " " * (indent_level * 2)

        # Determine the marker based on the last character
        last_char = jira_bullets[-1]
        prefix = "1." if last_char == "#" else "-"

        return f"{indent}{prefix} {content}"

```
Page 3/10 | First | Prev | Next | Last