#
tokens: 48842/50000 37/194 files (page 2/10)
lines: off (toggle) GitHub
raw markdown copy
This is page 2 of 10. Use http://codebase.md/sooperset/mcp-atlassian?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .devcontainer
│   ├── devcontainer.json
│   ├── Dockerfile
│   ├── post-create.sh
│   └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   └── feature_request.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── docker-publish.yml
│       ├── lint.yml
│       ├── publish.yml
│       ├── stale.yml
│       └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│   ├── oauth_authorize.py
│   └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│   └── mcp_atlassian
│       ├── __init__.py
│       ├── confluence
│       │   ├── __init__.py
│       │   ├── client.py
│       │   ├── comments.py
│       │   ├── config.py
│       │   ├── constants.py
│       │   ├── labels.py
│       │   ├── pages.py
│       │   ├── search.py
│       │   ├── spaces.py
│       │   ├── users.py
│       │   ├── utils.py
│       │   └── v2_adapter.py
│       ├── exceptions.py
│       ├── jira
│       │   ├── __init__.py
│       │   ├── attachments.py
│       │   ├── boards.py
│       │   ├── client.py
│       │   ├── comments.py
│       │   ├── config.py
│       │   ├── constants.py
│       │   ├── epics.py
│       │   ├── fields.py
│       │   ├── formatting.py
│       │   ├── issues.py
│       │   ├── links.py
│       │   ├── projects.py
│       │   ├── protocols.py
│       │   ├── search.py
│       │   ├── sprints.py
│       │   ├── transitions.py
│       │   ├── users.py
│       │   └── worklog.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── confluence
│       │   │   ├── __init__.py
│       │   │   ├── comment.py
│       │   │   ├── common.py
│       │   │   ├── label.py
│       │   │   ├── page.py
│       │   │   ├── search.py
│       │   │   ├── space.py
│       │   │   └── user_search.py
│       │   ├── constants.py
│       │   └── jira
│       │       ├── __init__.py
│       │       ├── agile.py
│       │       ├── comment.py
│       │       ├── common.py
│       │       ├── issue.py
│       │       ├── link.py
│       │       ├── project.py
│       │       ├── search.py
│       │       ├── version.py
│       │       ├── workflow.py
│       │       └── worklog.py
│       ├── preprocessing
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── confluence.py
│       │   └── jira.py
│       ├── servers
│       │   ├── __init__.py
│       │   ├── confluence.py
│       │   ├── context.py
│       │   ├── dependencies.py
│       │   ├── jira.py
│       │   └── main.py
│       └── utils
│           ├── __init__.py
│           ├── date.py
│           ├── decorators.py
│           ├── env.py
│           ├── environment.py
│           ├── io.py
│           ├── lifecycle.py
│           ├── logging.py
│           ├── oauth_setup.py
│           ├── oauth.py
│           ├── ssl.py
│           ├── tools.py
│           └── urls.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── fixtures
│   │   ├── __init__.py
│   │   ├── confluence_mocks.py
│   │   └── jira_mocks.py
│   ├── integration
│   │   ├── conftest.py
│   │   ├── README.md
│   │   ├── test_authentication.py
│   │   ├── test_content_processing.py
│   │   ├── test_cross_service.py
│   │   ├── test_mcp_protocol.py
│   │   ├── test_proxy.py
│   │   ├── test_real_api.py
│   │   ├── test_ssl_verification.py
│   │   ├── test_stdin_monitoring_fix.py
│   │   └── test_transport_lifecycle.py
│   ├── README.md
│   ├── test_preprocessing.py
│   ├── test_real_api_validation.py
│   ├── unit
│   │   ├── confluence
│   │   │   ├── __init__.py
│   │   │   ├── conftest.py
│   │   │   ├── test_client_oauth.py
│   │   │   ├── test_client.py
│   │   │   ├── test_comments.py
│   │   │   ├── test_config.py
│   │   │   ├── test_constants.py
│   │   │   ├── test_custom_headers.py
│   │   │   ├── test_labels.py
│   │   │   ├── test_pages.py
│   │   │   ├── test_search.py
│   │   │   ├── test_spaces.py
│   │   │   ├── test_users.py
│   │   │   ├── test_utils.py
│   │   │   └── test_v2_adapter.py
│   │   ├── jira
│   │   │   ├── conftest.py
│   │   │   ├── test_attachments.py
│   │   │   ├── test_boards.py
│   │   │   ├── test_client_oauth.py
│   │   │   ├── test_client.py
│   │   │   ├── test_comments.py
│   │   │   ├── test_config.py
│   │   │   ├── test_constants.py
│   │   │   ├── test_custom_headers.py
│   │   │   ├── test_epics.py
│   │   │   ├── test_fields.py
│   │   │   ├── test_formatting.py
│   │   │   ├── test_issues_markdown.py
│   │   │   ├── test_issues.py
│   │   │   ├── test_links.py
│   │   │   ├── test_projects.py
│   │   │   ├── test_protocols.py
│   │   │   ├── test_search.py
│   │   │   ├── test_sprints.py
│   │   │   ├── test_transitions.py
│   │   │   ├── test_users.py
│   │   │   └── test_worklog.py
│   │   ├── models
│   │   │   ├── __init__.py
│   │   │   ├── conftest.py
│   │   │   ├── test_base_models.py
│   │   │   ├── test_confluence_models.py
│   │   │   ├── test_constants.py
│   │   │   └── test_jira_models.py
│   │   ├── servers
│   │   │   ├── __init__.py
│   │   │   ├── test_confluence_server.py
│   │   │   ├── test_context.py
│   │   │   ├── test_dependencies.py
│   │   │   ├── test_jira_server.py
│   │   │   └── test_main_server.py
│   │   ├── test_exceptions.py
│   │   ├── test_main_transport_selection.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── test_custom_headers.py
│   │       ├── test_date.py
│   │       ├── test_decorators.py
│   │       ├── test_env.py
│   │       ├── test_environment.py
│   │       ├── test_io.py
│   │       ├── test_lifecycle.py
│   │       ├── test_logging.py
│   │       ├── test_masking.py
│   │       ├── test_oauth_setup.py
│   │       ├── test_oauth.py
│   │       ├── test_ssl.py
│   │       ├── test_tools.py
│   │       └── test_urls.py
│   └── utils
│       ├── __init__.py
│       ├── assertions.py
│       ├── base.py
│       ├── factories.py
│       └── mocks.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/src/mcp_atlassian/utils/decorators.py:
--------------------------------------------------------------------------------

```python
import logging
from collections.abc import Awaitable, Callable
from functools import wraps
from typing import Any, TypeVar

import requests
from fastmcp import Context
from requests.exceptions import HTTPError

from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError

logger = logging.getLogger(__name__)


F = TypeVar("F", bound=Callable[..., Awaitable[Any]])


def check_write_access(func: F) -> F:
    """
    Guard a FastMCP tool so that it refuses to run in read-only mode.

    The decorated callable must be a coroutine function whose first
    argument is the FastMCP ``ctx: Context``. The application context is
    looked up under the ``"app_lifespan_context"`` key of the request's
    lifespan context; when that lookup yields nothing, the tool runs as usual.

    Raises (from the wrapper):
        ValueError: If the application context reports ``read_only``.
    """

    @wraps(func)
    async def wrapper(ctx: Context, *args: Any, **kwargs: Any) -> Any:
        lifespan_state = ctx.request_context.lifespan_context

        # Only a dict-shaped lifespan context can carry the application context.
        app_ctx = None
        if isinstance(lifespan_state, dict):
            app_ctx = lifespan_state.get("app_lifespan_context")

        writes_blocked = app_ctx is not None and app_ctx.read_only
        if not writes_blocked:
            return await func(ctx, *args, **kwargs)

        name = func.__name__
        # e.g., "create_issue" -> "create issue"
        readable_action = name.replace("_", " ")
        logger.warning(f"Attempted to call tool '{name}' in read-only mode.")
        raise ValueError(f"Cannot {readable_action} in read-only mode.")

    return wrapper  # type: ignore


def handle_atlassian_api_errors(service_name: str = "Atlassian API") -> Callable:
    """
    Build a decorator that handles common Atlassian API exceptions.

    The decorated callable is invoked as a method (``self`` first). Its
    result is returned unchanged on success. On failure:

    * ``HTTPError`` with status 401/403 is re-raised as
      ``MCPAtlassianAuthenticationError`` (chained to the original error).
    * Any other ``HTTPError`` is logged and re-raised.
    * Every other exception is logged and an empty list is returned.

    Args:
        service_name: Name of the service for error logging (e.g., "Jira API").

    Returns:
        A decorator applying the error handling described above.
    """

    def decorator(func: Callable) -> Callable:
        def operation_name() -> str:
            # Resolved only when an error is being reported.
            return getattr(func, "__name__", "API operation")

        @wraps(func)
        def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
            try:
                return func(self, *args, **kwargs)
            except HTTPError as http_err:
                response = http_err.response
                is_auth_failure = response is not None and response.status_code in (
                    401,
                    403,
                )
                if not is_auth_failure:
                    logger.error(
                        f"HTTP error during {operation_name()}: {http_err}",
                        exc_info=False,
                    )
                    raise http_err

                error_msg = (
                    f"Authentication failed for {service_name} "
                    f"({response.status_code}). "
                    "Token may be expired or invalid. Please verify credentials."
                )
                logger.error(error_msg)
                raise MCPAtlassianAuthenticationError(error_msg) from http_err
            except KeyError as e:
                logger.error(f"Missing key in {operation_name()} results: {str(e)}")
            except requests.RequestException as e:
                logger.error(f"Network error during {operation_name()}: {str(e)}")
            except (ValueError, TypeError) as e:
                logger.error(f"Error processing {operation_name()} results: {str(e)}")
            except Exception as e:  # noqa: BLE001 - Intentional fallback with logging
                logger.error(f"Unexpected error during {operation_name()}: {str(e)}")
                logger.debug(
                    f"Full exception details for {operation_name()}:", exc_info=True
                )

            # Reached only after a handler above logged and swallowed the error.
            return []

        return wrapper

    return decorator

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/models/base.py:
--------------------------------------------------------------------------------

```python
"""
Base models and utility classes for the MCP Atlassian API models.

This module provides base classes and mixins that are used by the
Jira and Confluence models to ensure consistent behavior and reduce
code duplication.
"""

from datetime import datetime
from typing import Any, TypeVar

from pydantic import BaseModel

from .constants import EMPTY_STRING

# Type variable for the return type of from_api_response
T = TypeVar("T", bound="ApiModel")


class ApiModel(BaseModel):
    """
    Common pydantic base class for the API models.

    It defines the two conversion hooks the concrete models share:
    building an instance from a raw API payload, and flattening an
    instance into a plain dictionary suitable for an API response.
    """

    @classmethod
    def from_api_response(cls: type[T], data: dict[str, Any], **kwargs: Any) -> T:
        """
        Build a model instance from an API response payload.

        Args:
            data: The API response data
            **kwargs: Additional context parameters

        Returns:
            An instance of the model

        Raises:
            NotImplementedError: Always, in this base class; subclasses
                are expected to override the method
        """
        raise NotImplementedError("Subclasses must implement from_api_response")

    def to_simplified_dict(self) -> dict[str, Any]:
        """
        Dump the model to a dictionary, leaving out fields that are None.

        Returns:
            The result of ``model_dump(exclude_none=True)``
        """
        simplified: dict[str, Any] = self.model_dump(exclude_none=True)
        return simplified


class TimestampMixin:
    """
    Mixin for handling Atlassian API timestamp formats.
    """

    @staticmethod
    def format_timestamp(timestamp: str | None) -> str:
        """
        Format an Atlassian timestamp to a human-readable format.

        Args:
            timestamp: An ISO 8601 timestamp string

        Returns:
            A formatted date string or empty string if the input is invalid
        """
        if not timestamp:
            return EMPTY_STRING

        try:
            # Parse ISO 8601 format like "2024-01-01T10:00:00.000+0000"
            # Convert Z format to +00:00 for compatibility with fromisoformat
            ts = timestamp.replace("Z", "+00:00")

            # Handle timezone format without colon (+0000 -> +00:00)
            if "+" in ts and ":" not in ts[-5:]:
                tz_pos = ts.rfind("+")
                if tz_pos != -1 and len(ts) >= tz_pos + 5:
                    ts = ts[: tz_pos + 3] + ":" + ts[tz_pos + 3 :]
            elif "-" in ts and ":" not in ts[-5:]:
                tz_pos = ts.rfind("-")
                if tz_pos != -1 and len(ts) >= tz_pos + 5:
                    ts = ts[: tz_pos + 3] + ":" + ts[tz_pos + 3 :]

            dt = datetime.fromisoformat(ts)
            return dt.strftime("%Y-%m-%d %H:%M:%S")
        except (ValueError, TypeError):
            return timestamp or EMPTY_STRING

    @staticmethod
    def is_valid_timestamp(timestamp: str | None) -> bool:
        """
        Check if a string is a valid ISO 8601 timestamp.

        Args:
            timestamp: The string to check

        Returns:
            True if the string is a valid timestamp, False otherwise
        """
        if not timestamp:
            return False

        try:
            # Convert Z format to +00:00 for compatibility with fromisoformat
            ts = timestamp.replace("Z", "+00:00")

            # Handle timezone format without colon (+0000 -> +00:00)
            if "+" in ts and ":" not in ts[-5:]:
                tz_pos = ts.rfind("+")
                if tz_pos != -1 and len(ts) >= tz_pos + 5:
                    ts = ts[: tz_pos + 3] + ":" + ts[tz_pos + 3 :]
            elif "-" in ts and ":" not in ts[-5:]:
                tz_pos = ts.rfind("-")
                if tz_pos != -1 and len(ts) >= tz_pos + 5:
                    ts = ts[: tz_pos + 3] + ":" + ts[tz_pos + 3 :]

            datetime.fromisoformat(ts)
            return True
        except (ValueError, TypeError):
            return False

```

--------------------------------------------------------------------------------
/tests/utils/factories.py:
--------------------------------------------------------------------------------

```python
"""Test data factories for creating consistent test objects."""

from typing import Any


class JiraIssueFactory:
    """Builds dictionaries shaped like Jira issue payloads for tests."""

    @staticmethod
    def create(key: str = "TEST-123", **overrides) -> dict[str, Any]:
        """
        Build a fully populated Jira issue payload.

        Args:
            key: Issue key, also embedded in the ``self`` URL
            **overrides: Top-level entries deep-merged over the defaults

        Returns:
            The default issue with ``overrides`` merged in
        """
        same_moment = "2023-01-01T12:00:00.000+0000"
        person = {
            "displayName": "Test User",
            "emailAddress": "[email protected]",
        }
        issue_fields = {
            "summary": "Test Issue Summary",
            "description": "Test issue description",
            "status": {"name": "Open", "id": "1", "statusCategory": {"key": "new"}},
            "issuetype": {"name": "Task", "id": "10001"},
            "priority": {"name": "Medium", "id": "3"},
            "assignee": person,
            "created": same_moment,
            "updated": same_moment,
        }
        issue = {
            "id": "12345",
            "key": key,
            "self": f"https://test.atlassian.net/rest/api/3/issue/{key}",
            "fields": issue_fields,
        }
        return deep_merge(issue, overrides)

    @staticmethod
    def create_minimal(key: str = "TEST-123") -> dict[str, Any]:
        """Build the smallest issue payload: a key, a summary and a status name."""
        bare_fields = {"summary": "Test Issue", "status": {"name": "Open"}}
        return {"key": key, "fields": bare_fields}


class ConfluencePageFactory:
    """Builds dictionaries shaped like Confluence page payloads for tests."""

    @staticmethod
    def create(page_id: str = "123456", **overrides) -> dict[str, Any]:
        """
        Build a fully populated Confluence page payload.

        Args:
            page_id: Page identifier, also embedded in the ``_links`` URLs
            **overrides: Top-level entries deep-merged over the defaults

        Returns:
            The default page with ``overrides`` merged in
        """
        storage_body = {"value": "<p>Test content</p>", "representation": "storage"}
        links = {
            "webui": f"/spaces/TEST/pages/{page_id}",
            "self": f"https://test.atlassian.net/wiki/rest/api/content/{page_id}",
        }
        page = {
            "id": page_id,
            "title": "Test Page",
            "type": "page",
            "status": "current",
            "space": {"key": "TEST", "name": "Test Space"},
            "body": {"storage": storage_body},
            "version": {"number": 1},
            "_links": links,
        }
        return deep_merge(page, overrides)


class AuthConfigFactory:
    """Builds authentication configuration dictionaries for tests."""

    @staticmethod
    def create_oauth_config(**overrides) -> dict[str, str]:
        """
        Build an OAuth configuration.

        Args:
            **overrides: Entries that replace or extend the defaults

        Returns:
            The default OAuth settings updated with ``overrides``
        """
        config = {
            "client_id": "test-client-id",
            "client_secret": "test-client-secret",
            "redirect_uri": "http://localhost:8080/callback",
            "scope": "read:jira-work write:jira-work",
            "cloud_id": "test-cloud-id",
            "access_token": "test-access-token",
            "refresh_token": "test-refresh-token",
        }
        config.update(overrides)
        return config

    @staticmethod
    def create_basic_auth_config(**overrides) -> dict[str, str]:
        """
        Build a basic-auth configuration.

        Args:
            **overrides: Entries that replace or extend the defaults

        Returns:
            The default URL/username/token settings updated with ``overrides``
        """
        config = {
            "url": "https://test.atlassian.net",
            "username": "[email protected]",
            "api_token": "test-api-token",
        }
        config.update(overrides)
        return config


class ErrorResponseFactory:
    """Builds dictionaries shaped like API error responses for tests."""

    @staticmethod
    def create_api_error(
        status_code: int = 400, message: str = "Bad Request"
    ) -> dict[str, Any]:
        """
        Build a generic API error payload.

        Args:
            status_code: Value stored under ``status``
            message: Single entry placed in the ``errorMessages`` list

        Returns:
            A dict with ``errorMessages``, an empty ``errors`` mapping and ``status``
        """
        return dict(errorMessages=[message], errors={}, status=status_code)

    @staticmethod
    def create_auth_error() -> dict[str, Any]:
        """Build the fixed 401 "Authentication failed" payload (no ``errors`` key)."""
        return dict(errorMessages=["Authentication failed"], status=401)


def deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """
    Deep merge two dictionaries without sharing nested dicts with the inputs.

    Keys present in both inputs are merged recursively when both values are
    dicts; otherwise the value from ``override`` wins. Neither input is
    modified.

    Every nested dict in the result is a fresh plain ``dict``, so mutating the
    merged result cannot leak back into ``base`` or ``override``. Non-dict
    values (including lists) are stored by reference, not copied.

    Args:
        base: Dictionary providing the default entries
        override: Dictionary whose entries take precedence

    Returns:
        A new dictionary containing the merged entries
    """
    # Merging a nested dict with an empty override yields an independent copy.
    result = {
        key: deep_merge(value, {}) if isinstance(value, dict) else value
        for key, value in base.items()
    }
    for key, value in override.items():
        current = result.get(key)
        if isinstance(current, dict) and isinstance(value, dict):
            result[key] = deep_merge(current, value)
        elif isinstance(value, dict):
            # Copy so the result does not alias the caller's override dict.
            result[key] = deep_merge(value, {})
        else:
            result[key] = value
    return result

```

--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/bug_report.yml:
--------------------------------------------------------------------------------

```yaml
name: "\U0001F41B Bug Report"
description: Create a report to help us improve mcp-atlassian
title: "[Bug]: "
labels: ["bug"]
body:
  - type: markdown
    attributes:
      value: |
        Thanks for taking the time to fill out this bug report! Please provide as much detail as possible.
  - type: checkboxes
    id: prerequisites
    attributes:
      label: Prerequisites
      description: Please confirm the following before submitting the issue.
      options:
        - label: I have searched the [existing issues](https://github.com/sooperset/mcp-atlassian/issues) to make sure this bug has not already been reported.
          required: true
        - label: I have checked the [README](https://github.com/sooperset/mcp-atlassian/blob/main/README.md) for relevant information.
          required: true
  - type: textarea
    id: description
    attributes:
      label: Bug Description
      description: A clear and concise description of what the bug is.
      placeholder: "When I call the `jira_create_issue` tool with..., it fails with..."
    validations:
      required: true
  - type: textarea
    id: steps-to-reproduce
    attributes:
      label: Steps to Reproduce
      description: Provide detailed steps to reproduce the behavior.
      placeholder: |
        1. Start the server with command `...`
        2. Send a `list_tools` request using `...`
        3. Call the tool `xyz` with arguments `...`
        4. See error `...`
    validations:
      required: true
  - type: textarea
    id: expected-behavior
    attributes:
      label: Expected Behavior
      description: A clear and concise description of what you expected to happen.
      placeholder: "I expected the Jira issue to be created successfully and return its key."
    validations:
      required: true
  - type: textarea
    id: actual-behavior
    attributes:
      label: Actual Behavior
      description: What actually happened? Include full error messages, logs (from the server and the client if possible), or screenshots.
      placeholder: "The server returned an error message: '...' / The tool call returned an empty list."
      render: shell
    validations:
      required: true
  - type: input
    id: version
    attributes:
      label: mcp-atlassian Version
      description: Which version of `mcp-atlassian` are you using? (Check `pip show mcp-atlassian` or `pyproject.toml`)
      placeholder: "e.g., 0.6.5"
    validations:
      required: true
  - type: dropdown
    id: installation
    attributes:
      label: Installation Method
      description: How did you install `mcp-atlassian`?
      options:
        - From PyPI (pip install mcp-atlassian / uv add mcp-atlassian)
        - From source (git clone)
        - Docker
        - Other
    validations:
      required: true
  - type: dropdown
    id: os
    attributes:
      label: Operating System
      description: What operating system are you using?
      options:
        - Windows
        - macOS
        - Linux (Specify distribution below if relevant)
        - Other
    validations:
      required: true
  - type: input
    id: python-version
    attributes:
      label: Python Version
      description: What version of Python are you using? (`python --version`)
      placeholder: "e.g., 3.11.4"
    validations:
      required: true
  - type: dropdown
    id: atlassian-instance
    attributes:
      label: Atlassian Instance Type
      description: Are you connecting to Atlassian Cloud or Server/Data Center?
      multiple: true
      options:
        - Jira Cloud
        - Jira Server / Data Center
        - Confluence Cloud
        - Confluence Server / Data Center
    validations:
      required: true
  - type: input
    id: client-app
    attributes:
      label: Client Application
      description: What application/library are you using to interact with the MCP server? (This is important!)
      placeholder: "e.g., Cursor, LangChain, custom script, Inspector Tool"
    validations:
      required: true
  - type: textarea
    id: additional-context
    attributes:
      label: Additional Context
      description: Add any other context about the problem here (e.g., environment variables, network configuration like proxies, specific Jira/Confluence setup).

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/models/confluence/user_search.py:
--------------------------------------------------------------------------------

```python
"""
Confluence user search result models.
This module provides Pydantic models for Confluence user search results.
"""

import logging
from typing import Any

from pydantic import Field

from ..base import ApiModel, TimestampMixin
from .common import ConfluenceUser

logger = logging.getLogger(__name__)


class ConfluenceUserSearchResult(ApiModel):
    """
    One entry of a Confluence user search: the matched user plus the
    metadata the search attached to it.
    """

    user: ConfluenceUser | None = None
    title: str | None = None
    excerpt: str | None = None
    url: str | None = None
    entity_type: str = "user"
    last_modified: str | None = None
    score: float = 0.0

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "ConfluenceUserSearchResult":
        """
        Build a search result from one entry of a Confluence API response.

        Args:
            data: The user search result data from the Confluence API
            **kwargs: Additional context parameters (not used here)

        Returns:
            A populated instance, or an all-defaults instance for empty data
        """
        if not data:
            return cls()

        # The nested "user" payload is optional; parse it only when non-empty.
        raw_user = data.get("user", {})
        parsed_user = None
        if raw_user:
            parsed_user = ConfluenceUser.from_api_response(raw_user)

        field_values = {
            "user": parsed_user,
            "title": data.get("title"),
            "excerpt": data.get("excerpt"),
            "url": data.get("url"),
            "entity_type": data.get("entityType", "user"),
            "last_modified": data.get("lastModified"),
            "score": data.get("score", 0.0),
        }
        return cls(**field_values)

    def to_simplified_dict(self) -> dict[str, Any]:
        """
        Convert to simplified dictionary for API response.

        ``entity_type``, ``title`` and ``score`` are always present; ``user``,
        ``url``, ``last_modified`` and ``excerpt`` are added only when truthy.
        """
        simplified: dict[str, Any] = {
            "entity_type": self.entity_type,
            "title": self.title,
            "score": self.score,
        }

        person = self.user
        if person:
            simplified["user"] = {
                "account_id": person.account_id,
                "display_name": person.display_name,
                "email": person.email,
                "profile_picture": person.profile_picture,
                "is_active": person.is_active,
            }

        # Optional attributes share their name with the output key.
        for optional_name in ("url", "last_modified", "excerpt"):
            optional_value = getattr(self, optional_name)
            if optional_value:
                simplified[optional_name] = optional_value

        return simplified


class ConfluenceUserSearchResults(ApiModel, TimestampMixin):
    """
    A page of Confluence user search results together with the paging
    and query metadata returned alongside it.
    """

    total_size: int = 0
    start: int = 0
    limit: int = 0
    results: list[ConfluenceUserSearchResult] = Field(default_factory=list)
    cql_query: str | None = None
    search_duration: int | None = None

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "ConfluenceUserSearchResults":
        """
        Build the result collection from a Confluence API response.

        Args:
            data: The search result data from the Confluence API
            **kwargs: Additional context parameters, forwarded to each
                individual result's ``from_api_response``

        Returns:
            A populated instance, or an all-defaults instance for empty data
        """
        if not data:
            return cls()

        # Each raw entry becomes a ConfluenceUserSearchResult model.
        parsed_results = [
            ConfluenceUserSearchResult.from_api_response(entry, **kwargs)
            for entry in data.get("results", [])
        ]

        return cls(
            total_size=data.get("totalSize", 0),
            start=data.get("start", 0),
            limit=data.get("limit", 0),
            results=parsed_results,
            cql_query=data.get("cqlQuery"),
            search_duration=data.get("searchDuration"),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """
        Convert to simplified dictionary for API response.

        All metadata keys are always present (possibly None); ``results``
        holds the simplified form of every contained result.
        """
        simplified: dict[str, Any] = {
            "total_size": self.total_size,
            "start": self.start,
            "limit": self.limit,
            "cql_query": self.cql_query,
            "search_duration": self.search_duration,
        }
        simplified["results"] = [entry.to_simplified_dict() for entry in self.results]
        return simplified

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/models/jira/agile.py:
--------------------------------------------------------------------------------

```python
"""
Jira agile models.

This module provides Pydantic models for Jira agile entities,
such as boards and sprints.
"""

import logging
from typing import Any

from ..base import ApiModel
from ..constants import (
    EMPTY_STRING,
    JIRA_DEFAULT_ID,
    UNKNOWN,
)

logger = logging.getLogger(__name__)


class JiraBoard(ApiModel):
    """
    Model representing a Jira board.
    """

    id: str = JIRA_DEFAULT_ID
    name: str = UNKNOWN
    type: str = UNKNOWN

    @classmethod
    def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraBoard":
        """
        Create a JiraBoard from a Jira API response.

        Missing keys and keys explicitly set to ``None`` both fall back to the
        field defaults, so a ``null`` in the payload can neither fail model
        validation (``id``) nor surface as the literal text "None"
        (``name`` / ``type``).

        Args:
            data: The board data from the Jira API
            **kwargs: Additional context parameters (not used here)

        Returns:
            A JiraBoard instance; an all-defaults instance when ``data`` is
            empty or not a dictionary
        """
        if not data:
            return cls()

        # Handle non-dictionary data by returning a default instance
        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        def text_or_default(key: str, default: str) -> str:
            # Treat an explicit None exactly like a missing key, then
            # enforce a string (the API may send the id as a number).
            value = data.get(key)
            return str(default if value is None else value)

        return cls(
            id=text_or_default("id", JIRA_DEFAULT_ID),
            name=text_or_default("name", UNKNOWN),
            type=text_or_default("type", UNKNOWN),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        return {
            "id": self.id,
            "name": self.name,
            "type": self.type,
        }


class JiraSprint(ApiModel):
    """
    Model representing a Jira sprint.

    Every field has a placeholder default so that a sprint built from an
    empty or partial API payload is still a usable instance.
    """

    id: str = JIRA_DEFAULT_ID
    state: str = UNKNOWN
    name: str = UNKNOWN
    start_date: str = EMPTY_STRING
    end_date: str = EMPTY_STRING
    activated_date: str = EMPTY_STRING
    origin_board_id: str = JIRA_DEFAULT_ID
    goal: str = EMPTY_STRING
    synced: bool = False
    auto_start_stop: bool = False

    @classmethod
    def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraSprint":
        """
        Create a JiraSprint from a Jira API response.

        Args:
            data: The sprint data from the Jira API
            **kwargs: Accepted for signature compatibility; not used here

        Returns:
            A JiraSprint instance. Falsy or non-dictionary input yields an
            instance holding only default values.
        """
        if not data:
            return cls()

        # Handle non-dictionary data by returning a default instance
        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        def text(key: str, default: str) -> str:
            # A key that is present but null is treated like a missing key:
            # passing None into a ``str`` field is invalid, and str(None)
            # would store the literal text "None" (which would then be
            # emitted by to_simplified_dict as if it were a real value).
            value = data.get(key)
            return default if value is None else str(value)

        return cls(
            id=text("id", JIRA_DEFAULT_ID),
            state=text("state", UNKNOWN),
            name=text("name", UNKNOWN),
            start_date=text("startDate", EMPTY_STRING),
            end_date=text("endDate", EMPTY_STRING),
            activated_date=text("activatedDate", EMPTY_STRING),
            origin_board_id=text("originBoardId", JIRA_DEFAULT_ID),
            goal=text("goal", EMPTY_STRING),
            # Boolean fields: a missing or null flag counts as False
            synced=bool(data.get("synced", False)),
            auto_start_stop=bool(data.get("autoStartStop", False)),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """
        Convert to simplified dictionary for API response.

        The id, name and state are always present; the goal and the
        start/end dates are added only when they are non-empty.
        """
        result: dict[str, Any] = {
            "id": self.id,
            "name": self.name,
            "state": self.state,
        }

        optional_fields = (
            ("goal", self.goal),
            ("start_date", self.start_date),
            ("end_date", self.end_date),
        )
        for key, value in optional_fields:
            if value and value != EMPTY_STRING:
                result[key] = value

        return result

```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery.ai configuration for mcp-atlassian
startCommand:
  type: stdio # Specifies the server communicates over standard input/output
  configSchema:
    # JSON Schema defining the configuration options users need to provide
    type: object
    required:
      - jiraUrl
      - confluenceUrl
      # Add other strictly required credentials based on auth needs
    properties:
      # Confluence Config
      confluenceUrl:
        type: string
        description: "Base URL for your Confluence instance (e.g., https://your-domain.atlassian.net/wiki or https://confluence.yourcompany.com)."
      confluenceUsername:
        type: string
        description: "(Optional for Cloud Basic Auth) Your Confluence username or email."
      confluenceApiToken:
        type: string
        description: "(Optional for Cloud Basic Auth) Your Confluence API token."
        format: password # Hides the value in UI inputs
      confluencePersonalToken:
        type: string
        description: "(Optional for Server/DC Token Auth) Your Confluence Personal Access Token."
        format: password
      confluenceSslVerify:
        type: boolean
        description: "(Optional, Server/DC only) Verify SSL certificate for Confluence. Defaults to true."
        default: true
      confluenceSpacesFilter:
        type: string
        description: "(Optional) Comma-separated list of Confluence space keys to limit searches to (e.g., 'DEV,QA')."

      # Jira Config
      jiraUrl:
        type: string
        description: "Base URL for your Jira instance (e.g., https://your-domain.atlassian.net or https://jira.yourcompany.com)."
      jiraUsername:
        type: string
        description: "(Optional for Cloud Basic Auth or Server/DC Basic Auth) Your Jira username or email."
      jiraApiToken:
        type: string
        description: "(Optional for Cloud Basic Auth or Server/DC Basic Auth) Your Jira API token."
        format: password
      jiraPersonalToken:
        type: string
        description: "(Optional for Server/DC Token Auth) Your Jira Personal Access Token."
        format: password
      jiraSslVerify:
        type: boolean
        description: "(Optional, Server/DC only) Verify SSL certificate for Jira. Defaults to true."
        default: true
      jiraProjectsFilter:
        type: string
        description: "(Optional) Comma-separated list of Jira project keys to limit searches to (e.g., 'PROJ1,PROJ2')."

      # General Config
      readOnlyMode:
        type: boolean
        description: "(Optional) Run in read-only mode (prevents create/update/delete). Defaults to false."
        default: false

    additionalProperties: false # Disallow properties not defined above

  commandFunction:
    # A JavaScript function that produces the CLI command and environment variables
    # needed to start the MCP server, based on the user's configuration.
    |-
    (config) => {
      // The command matches the ENTRYPOINT in the Dockerfile
      const command = 'mcp-atlassian';
      const args = []; // No arguments needed as config is via ENV

      // Map the config schema properties to the environment variables
      const env = {
        // Confluence ENV VARS
        CONFLUENCE_URL: config.confluenceUrl,
        CONFLUENCE_USERNAME: config.confluenceUsername,
        CONFLUENCE_API_TOKEN: config.confluenceApiToken,
        CONFLUENCE_PERSONAL_TOKEN: config.confluencePersonalToken,
        CONFLUENCE_SSL_VERIFY: config.confluenceSslVerify !== undefined ? String(config.confluenceSslVerify) : 'true',
        CONFLUENCE_SPACES_FILTER: config.confluenceSpacesFilter,

        // Jira ENV VARS
        JIRA_URL: config.jiraUrl,
        JIRA_USERNAME: config.jiraUsername,
        JIRA_API_TOKEN: config.jiraApiToken,
        JIRA_PERSONAL_TOKEN: config.jiraPersonalToken,
        JIRA_SSL_VERIFY: config.jiraSslVerify !== undefined ? String(config.jiraSslVerify) : 'true',
        JIRA_PROJECTS_FILTER: config.jiraProjectsFilter,

        // General ENV VARS
        READ_ONLY_MODE: config.readOnlyMode !== undefined ? String(config.readOnlyMode) : 'false',
      };

      // Filter out undefined/null env variables
      const filteredEnv = Object.entries(env)
        .filter(([key, value]) => value !== undefined && value !== null)
        .reduce((obj, [key, value]) => {
          obj[key] = value;
          return obj;
        }, {});

      return {
        command: command,
        args: args,
        env: filteredEnv
      };
    }

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/search.py:
--------------------------------------------------------------------------------

```python
"""Module for Confluence search operations."""

import logging

from ..models.confluence import (
    ConfluencePage,
    ConfluenceSearchResult,
    ConfluenceUserSearchResult,
    ConfluenceUserSearchResults,
)
from ..utils.decorators import handle_atlassian_api_errors
from .client import ConfluenceClient
from .utils import quote_cql_identifier_if_needed

logger = logging.getLogger("mcp-atlassian")


class SearchMixin(ConfluenceClient):
    """Mixin for Confluence search operations."""

    @handle_atlassian_api_errors("Confluence API")
    def search(
        self, cql: str, limit: int = 10, spaces_filter: str | None = None
    ) -> list[ConfluencePage]:
        """
        Search content using Confluence Query Language (CQL).

        Args:
            cql: Confluence Query Language string
            limit: Maximum number of results to return
            spaces_filter: Optional comma-separated list of space keys to filter by,
                overrides config

        Returns:
            List of ConfluencePage models containing search results

        Raises:
            MCPAtlassianAuthenticationError: If authentication fails with the
                Confluence API (401/403)
        """
        # Use spaces_filter parameter if provided, otherwise fall back to config
        filter_to_use = spaces_filter or self.config.spaces_filter

        # Apply spaces filter if present
        if filter_to_use:
            # Split on commas, trim whitespace and drop blank entries so that
            # input such as "DEV,,QA," cannot yield a clause with no space key
            spaces = [
                key for key in (part.strip() for part in filter_to_use.split(",")) if key
            ]

            # Build the space filter query part using proper quoting for each space key
            space_query = " OR ".join(
                f"space = {quote_cql_identifier_if_needed(space)}" for space in spaces
            )

            # Leave the query untouched when the filter held no usable keys
            if space_query:
                if not cql:
                    cql = space_query
                elif "space = " not in cql:
                    # Only add if not already filtering by space
                    cql = f"({cql}) AND ({space_query})"

                logger.info(f"Applied spaces filter to query: {cql}")

        # Execute the CQL search query
        results = self.confluence.cql(cql=cql, limit=limit)

        # Convert the response to a search result model
        search_result = ConfluenceSearchResult.from_api_response(
            results,
            base_url=self.config.url,
            cql_query=cql,
            is_cloud=self.config.is_cloud,
        )

        # Process result excerpts as content
        processed_pages = []
        for page in search_result.results:
            # Find the raw result that belongs to this page; only the first
            # match is considered, whether or not it carries an excerpt
            for result_item in results.get("results", []):
                if result_item.get("content", {}).get("id") != page.id:
                    continue

                excerpt = result_item.get("excerpt", "")
                if excerpt:
                    # Process the excerpt as HTML content
                    space_key = page.space.key if page.space else ""
                    _, processed_markdown = self.preprocessor.process_html_content(
                        excerpt,
                        space_key=space_key,
                        confluence_client=self.confluence,
                    )
                    # Replace the page content with the processed excerpt
                    page.content = processed_markdown
                break

            processed_pages.append(page)

        # Return the list of result pages with processed content
        return processed_pages

    @handle_atlassian_api_errors("Confluence API")
    def search_user(
        self, cql: str, limit: int = 10
    ) -> list[ConfluenceUserSearchResult]:
        """
        Search users using Confluence Query Language (CQL).

        Args:
            cql: Confluence Query Language string for user search
            limit: Maximum number of results to return

        Returns:
            List of ConfluenceUserSearchResult models containing user search results

        Raises:
            MCPAtlassianAuthenticationError: If authentication fails with the
                Confluence API (401/403)
        """
        # Execute the user search query using the direct API endpoint
        results = self.confluence.get(
            "rest/api/search/user", params={"cql": cql, "limit": limit}
        )

        # Convert the response to a user search result model; a falsy
        # response is replaced by an empty payload
        search_result = ConfluenceUserSearchResults.from_api_response(results or {})

        # Return the list of user search results
        return search_result.results

```

--------------------------------------------------------------------------------
/tests/unit/confluence/test_labels.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for the LabelsMixin class."""

from unittest.mock import patch

import pytest
import requests

from mcp_atlassian.confluence.labels import LabelsMixin
from mcp_atlassian.models.confluence import ConfluenceLabel


class TestLabelsMixin:
    """Unit tests covering label retrieval and creation through LabelsMixin."""

    @pytest.fixture
    def labels_mixin(self, confluence_client):
        """Build a LabelsMixin wired to the mocked Confluence client."""
        # LabelsMixin derives from ConfluenceClient, so the parent initialiser
        # is patched out while the instance is constructed
        init_target = "mcp_atlassian.confluence.labels.ConfluenceClient.__init__"
        with patch(init_target) as mock_init:
            mock_init.return_value = None
            mixin = LabelsMixin()
            # Borrow the collaborators from the mocked client fixture
            mixin.confluence = confluence_client.confluence
            mixin.config = confluence_client.config
            mixin.preprocessor = confluence_client.preprocessor
            return mixin

    def test_get_page_labels_success(self, labels_mixin):
        """Labels returned by the API are converted into label models."""
        page_id = "12345"

        labels = labels_mixin.get_page_labels(page_id)

        labels_mixin.confluence.get_page_labels.assert_called_once_with(page_id=page_id)
        assert len(labels) == 3

        first = labels[0]
        assert first.id == "456789123"
        assert first.prefix == "global"
        assert first.label == "meeting-notes"

        second = labels[1]
        assert second.id == "456789124"
        assert second.prefix == "my"
        assert second.name == "important"

        third = labels[2]
        assert third.id == "456789125"
        assert third.name == "test"

    def test_get_page_labels_api_error(self, labels_mixin):
        """A request failure is reported as a label-fetching failure."""
        api_mock = labels_mixin.confluence.get_page_labels
        api_mock.side_effect = requests.RequestException("API error")

        with pytest.raises(Exception, match="Failed fetching labels"):
            labels_mixin.get_page_labels("987654321")

    def test_get_page_labels_key_error(self, labels_mixin):
        """A response lacking the expected keys is reported as a failure."""
        api_mock = labels_mixin.confluence.get_page_labels
        api_mock.return_value = {"invalid": "data"}

        with pytest.raises(Exception, match="Failed fetching labels"):
            labels_mixin.get_page_labels("987654321")

    def test_get_page_labels_value_error(self, labels_mixin):
        """A response of the wrong type is reported as a failure."""
        # A plain string is returned where a dictionary is expected
        api_mock = labels_mixin.confluence.get_page_labels
        api_mock.return_value = "invalid"

        with pytest.raises(Exception, match="Failed fetching labels"):
            labels_mixin.get_page_labels("987654321")

    def test_get_page_labels_with_empty_results(self, labels_mixin):
        """An empty result set produces an empty list."""
        labels_mixin.confluence.get_page_labels.return_value = {"results": []}

        labels = labels_mixin.get_page_labels("987654321")

        assert isinstance(labels, list)
        assert len(labels) == 0  # Empty list with no labels

    def test_add_page_label_success(self, labels_mixin):
        """Adding a label calls the API and returns the refreshed label data."""
        page_id = "987654321"
        name = "test-label"
        prefix = "global"

        # get_page_labels is stubbed on the mixin so its return value can be
        # compared with what add_page_label hands back
        stub_label = ConfluenceLabel(
            id="123456789",
            name=name,
            prefix=prefix,
        )
        with patch.object(labels_mixin, "get_page_labels", return_value=stub_label):
            added = labels_mixin.add_page_label(page_id, name)

            labels_mixin.confluence.set_page_label.assert_called_once_with(
                page_id=page_id, label=name
            )

            # The stubbed label is what comes back
            assert isinstance(added, ConfluenceLabel)
            assert added.id == "123456789"
            assert added.name == name
            assert added.prefix == prefix

    def test_add_page_label_error(self, labels_mixin):
        """A failure while setting the label is reported as an add failure."""
        labels_mixin.confluence.set_page_label.side_effect = Exception("API Error")

        with pytest.raises(Exception, match="Failed to add label"):
            labels_mixin.add_page_label("987654321", "test")

```

--------------------------------------------------------------------------------
/tests/unit/jira/test_custom_headers.py:
--------------------------------------------------------------------------------

```python
"""Tests for JIRA custom headers functionality."""

import os
from unittest.mock import MagicMock, patch

from mcp_atlassian.jira.client import JiraClient
from mcp_atlassian.jira.config import JiraConfig


class TestJiraConfigCustomHeaders:
    """Checks how JiraConfig.from_env parses JIRA_CUSTOM_HEADERS."""

    @staticmethod
    def _env(custom_headers=None):
        """Return the basic-auth environment, optionally with custom headers."""
        env = {
            "JIRA_URL": "https://test.atlassian.net",
            "JIRA_USERNAME": "test_user",
            "JIRA_API_TOKEN": "test_token",
        }
        if custom_headers is not None:
            env["JIRA_CUSTOM_HEADERS"] = custom_headers
        return env

    def test_no_custom_headers(self):
        """Without the variable set, custom_headers is an empty dict."""
        with patch.dict(os.environ, self._env(), clear=True):
            config = JiraConfig.from_env()
            assert config.custom_headers == {}

    def test_service_specific_headers_only(self):
        """Comma-separated key=value pairs are parsed into a dict."""
        raw_headers = "X-Jira-Specific=jira_value,X-Service=service_value"
        with patch.dict(os.environ, self._env(raw_headers), clear=True):
            config = JiraConfig.from_env()
            assert config.custom_headers == {
                "X-Jira-Specific": "jira_value",
                "X-Service": "service_value",
            }

    def test_malformed_headers_are_ignored(self):
        """Entries without a key=value shape are dropped."""
        raw_headers = "malformed-header,X-Valid=valid_value,another-malformed"
        with patch.dict(os.environ, self._env(raw_headers), clear=True):
            config = JiraConfig.from_env()
            assert config.custom_headers == {"X-Valid": "valid_value"}

    def test_empty_header_strings(self):
        """A whitespace-only value yields no headers."""
        with patch.dict(os.environ, self._env("   "), clear=True):
            config = JiraConfig.from_env()
            assert config.custom_headers == {}


class TestJiraClientCustomHeaders:
    """Checks that JiraClient copies configured headers onto its session."""

    @staticmethod
    def _install_fake_jira(monkeypatch):
        """Replace the Jira dependencies with mocks and return the fake session."""
        fake_session = MagicMock()
        fake_session.headers = {}
        fake_jira = MagicMock()
        fake_jira._session = fake_session

        monkeypatch.setattr(
            "mcp_atlassian.jira.client.Jira", lambda **kwargs: fake_jira
        )
        monkeypatch.setattr(
            "mcp_atlassian.jira.client.configure_ssl_verification",
            lambda **kwargs: None,
        )
        return fake_session

    @staticmethod
    def _basic_config(custom_headers):
        """Build a basic-auth JiraConfig carrying the given custom headers."""
        return JiraConfig(
            url="https://test.atlassian.net",
            auth_type="basic",
            username="test_user",
            api_token="test_token",
            custom_headers=custom_headers,
        )

    def test_no_custom_headers_applied(self, monkeypatch):
        """With no headers configured, the session headers stay empty."""
        fake_session = self._install_fake_jira(monkeypatch)

        JiraClient(config=self._basic_config({}))

        assert fake_session.headers == {}

    def test_custom_headers_applied_to_session(self, monkeypatch):
        """Every configured header ends up on the Jira session."""
        fake_session = self._install_fake_jira(monkeypatch)
        custom_headers = {
            "X-Corp-Auth": "token123",
            "X-Dept": "engineering",
            "User-Agent": "CustomJiraClient/1.0",
        }

        JiraClient(config=self._basic_config(custom_headers))

        for name, expected_value in custom_headers.items():
            assert fake_session.headers[name] == expected_value

```

--------------------------------------------------------------------------------
/tests/unit/models/test_constants.py:
--------------------------------------------------------------------------------

```python
"""Tests for model constants.

Focused tests for model constants, validating correct values and business logic.
"""

from mcp_atlassian.models.constants import (
    # Confluence defaults
    CONFLUENCE_DEFAULT_ID,
    CONFLUENCE_DEFAULT_SPACE,
    CONFLUENCE_DEFAULT_VERSION,
    # Date/Time defaults
    DEFAULT_TIMESTAMP,
    # Common defaults
    EMPTY_STRING,
    # Jira defaults
    JIRA_DEFAULT_ID,
    JIRA_DEFAULT_ISSUE_TYPE,
    JIRA_DEFAULT_KEY,
    JIRA_DEFAULT_PRIORITY,
    JIRA_DEFAULT_PROJECT,
    JIRA_DEFAULT_STATUS,
    NONE_VALUE,
    UNASSIGNED,
    UNKNOWN,
)


class TestCommonDefaults:
    """Checks for the shared string placeholder constants."""

    def test_string_constants_values(self):
        """Each shared string constant carries its expected literal."""
        expectations = (
            (EMPTY_STRING, ""),
            (UNKNOWN, "Unknown"),
            (UNASSIGNED, "Unassigned"),
            (NONE_VALUE, "None"),
        )
        for constant, literal in expectations:
            assert constant == literal

    def test_string_constants_types(self):
        """Each shared string constant is a str."""
        for constant in (EMPTY_STRING, UNKNOWN, UNASSIGNED, NONE_VALUE):
            assert isinstance(constant, str)


class TestJiraDefaults:
    """Checks for the Jira placeholder constants."""

    def test_jira_id_and_key_values(self):
        """Default id, key and project hold their expected literals."""
        assert JIRA_DEFAULT_ID == "0"
        assert JIRA_DEFAULT_KEY == "UNKNOWN-0"
        assert JIRA_DEFAULT_PROJECT == "0"

    def test_jira_default_dict_structures(self):
        """Status, priority and issue-type defaults are name/id dictionaries."""
        expectations = (
            (JIRA_DEFAULT_STATUS, {"name": UNKNOWN, "id": JIRA_DEFAULT_ID}),
            (JIRA_DEFAULT_PRIORITY, {"name": NONE_VALUE, "id": JIRA_DEFAULT_ID}),
            (JIRA_DEFAULT_ISSUE_TYPE, {"name": UNKNOWN, "id": JIRA_DEFAULT_ID}),
        )
        for default, expected in expectations:
            assert isinstance(default, dict)
            assert default == expected

    def test_jira_key_format(self):
        """The default key is a project part and a number joined by a dash."""
        pieces = JIRA_DEFAULT_KEY.split("-")
        assert len(pieces) == 2
        project_part, number_part = pieces
        assert project_part == "UNKNOWN"
        assert number_part == "0"


class TestConfluenceDefaults:
    """Checks for the Confluence placeholder constants."""

    def test_confluence_id_value(self):
        """The default Confluence id is the string zero."""
        assert CONFLUENCE_DEFAULT_ID == "0"

    def test_confluence_default_space_structure(self):
        """The default space is a dictionary with key, name and id."""
        assert isinstance(CONFLUENCE_DEFAULT_SPACE, dict)
        assert CONFLUENCE_DEFAULT_SPACE == {
            "key": EMPTY_STRING,
            "name": UNKNOWN,
            "id": CONFLUENCE_DEFAULT_ID,
        }

    def test_confluence_default_version_structure(self):
        """The default version is a dictionary with an integer number."""
        assert isinstance(CONFLUENCE_DEFAULT_VERSION, dict)
        assert CONFLUENCE_DEFAULT_VERSION == {"number": 0, "when": EMPTY_STRING}
        version_number = CONFLUENCE_DEFAULT_VERSION["number"]
        assert isinstance(version_number, int)


class TestDateTimeDefaults:
    """Checks for the date/time placeholder constant."""

    def test_default_timestamp_format(self):
        """DEFAULT_TIMESTAMP is the epoch string with a +0000 offset."""
        stamp = DEFAULT_TIMESTAMP
        assert stamp == "1970-01-01T00:00:00.000+0000"
        assert isinstance(stamp, str)
        assert stamp.startswith("1970-01-01T")
        assert "+0000" in stamp


class TestCrossReferenceConsistency:
    """Checks that related constants agree with one another."""

    def test_id_consistency(self):
        """Nested default ids match the top-level default id constants."""
        jira_defaults = (
            JIRA_DEFAULT_STATUS,
            JIRA_DEFAULT_PRIORITY,
            JIRA_DEFAULT_ISSUE_TYPE,
        )
        for default in jira_defaults:
            assert default["id"] == JIRA_DEFAULT_ID
        assert CONFLUENCE_DEFAULT_SPACE["id"] == CONFLUENCE_DEFAULT_ID

    def test_semantic_usage_consistency(self):
        """Fields with similar meaning share the same placeholder."""
        # UNKNOWN marks required fields whose value is not known
        unknown_named = (
            JIRA_DEFAULT_STATUS,
            JIRA_DEFAULT_ISSUE_TYPE,
            CONFLUENCE_DEFAULT_SPACE,
        )
        for default in unknown_named:
            assert default["name"] == UNKNOWN

        # NONE_VALUE marks nullable/optional fields
        assert JIRA_DEFAULT_PRIORITY["name"] == NONE_VALUE

        # EMPTY_STRING marks optional string fields
        assert CONFLUENCE_DEFAULT_SPACE["key"] == EMPTY_STRING
        assert CONFLUENCE_DEFAULT_VERSION["when"] == EMPTY_STRING

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/utils/environment.py:
--------------------------------------------------------------------------------

```python
"""Utility functions related to environment checking."""

import logging
import os

from .urls import is_atlassian_cloud_url

logger = logging.getLogger("mcp-atlassian.utils.environment")


def _is_service_configured(service_name: str, env_prefix: str) -> bool:
    """Check whether one service has a usable authentication setup in the environment.

    Args:
        service_name: Display name used in log messages (e.g. "Jira")
        env_prefix: Prefix of the service-specific variables (e.g. "JIRA")

    Returns:
        True when a supported combination of environment variables is set.
    """
    url = os.getenv(f"{env_prefix}_URL")
    if not url:
        # Without a URL the service can only run in minimal OAuth mode
        if os.getenv("ATLASSIAN_OAUTH_ENABLE", "").lower() in ("true", "1", "yes"):
            logger.info(
                f"Using {service_name} minimal OAuth configuration - expecting user-provided tokens via headers"
            )
            return True
        return False

    is_cloud = is_atlassian_cloud_url(url)

    # OAuth check (highest precedence, applies to Cloud)
    full_oauth_vars = (
        "ATLASSIAN_OAUTH_CLIENT_ID",
        "ATLASSIAN_OAUTH_CLIENT_SECRET",
        "ATLASSIAN_OAUTH_REDIRECT_URI",
        "ATLASSIAN_OAUTH_SCOPE",
        "ATLASSIAN_OAUTH_CLOUD_ID",  # CLOUD_ID is essential for OAuth client init
    )
    if all(os.getenv(name) for name in full_oauth_vars):
        logger.info(
            f"Using {service_name} OAuth 2.0 (3LO) authentication (Cloud-only features)"
        )
        return True

    # OAuth with a pre-obtained access token
    if os.getenv("ATLASSIAN_OAUTH_ACCESS_TOKEN") and os.getenv(
        "ATLASSIAN_OAUTH_CLOUD_ID"
    ):
        logger.info(
            f"Using {service_name} OAuth 2.0 (3LO) authentication (Cloud-only features) "
            "with provided access token"
        )
        return True

    has_basic_auth = bool(
        os.getenv(f"{env_prefix}_USERNAME") and os.getenv(f"{env_prefix}_API_TOKEN")
    )

    if is_cloud:  # Cloud non-OAuth
        if has_basic_auth:
            logger.info(
                f"Using {service_name} Cloud Basic Authentication (API Token)"
            )
            return True
        return False

    # Server/Data Center non-OAuth
    if os.getenv(f"{env_prefix}_PERSONAL_TOKEN") or has_basic_auth:
        logger.info(
            f"Using {service_name} Server/Data Center authentication (PAT or Basic Auth)"
        )
        return True
    return False


def get_available_services() -> dict[str, bool | None]:
    """Determine which services are available based on environment variables.

    Returns:
        A dictionary with the keys "confluence" and "jira", each mapped to
        whether that service has a usable configuration.
    """
    confluence_is_setup = _is_service_configured("Confluence", "CONFLUENCE")
    jira_is_setup = _is_service_configured("Jira", "JIRA")

    # The "not configured" notices come after both checks so that the order
    # of log messages does not depend on which service is set up
    if not confluence_is_setup:
        logger.info(
            "Confluence is not configured or required environment variables are missing."
        )
    if not jira_is_setup:
        logger.info(
            "Jira is not configured or required environment variables are missing."
        )

    return {"confluence": confluence_is_setup, "jira": jira_is_setup}

```

--------------------------------------------------------------------------------
/tests/unit/confluence/test_v2_adapter.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for ConfluenceV2Adapter class."""

from unittest.mock import MagicMock, Mock

import pytest
import requests
from requests.exceptions import HTTPError

from mcp_atlassian.confluence.v2_adapter import ConfluenceV2Adapter


class TestConfluenceV2Adapter:
    """Test cases for ConfluenceV2Adapter.

    The adapter is driven through a ``MagicMock`` session, so every test
    decides exactly what ``session.get`` returns or raises.
    """

    @staticmethod
    def _ok_response(payload):
        """Build a mock HTTP 200 response whose ``json()`` returns *payload*."""
        response = Mock()
        response.status_code = 200
        response.json.return_value = payload
        return response

    @pytest.fixture
    def mock_session(self):
        """Create a mock session."""
        return MagicMock(spec=requests.Session)

    @pytest.fixture
    def v2_adapter(self, mock_session):
        """Create a ConfluenceV2Adapter instance."""
        return ConfluenceV2Adapter(
            session=mock_session, base_url="https://example.atlassian.net/wiki"
        )

    def test_get_page_success(self, v2_adapter, mock_session):
        """Test successful page retrieval."""
        # Mock the v2 API response
        page_response = self._ok_response(
            {
                "id": "123456",
                "status": "current",
                "title": "Test Page",
                "spaceId": "789",
                "version": {"number": 5},
                "body": {
                    "storage": {
                        "value": "<p>Test content</p>",
                        "representation": "storage",
                    }
                },
                "_links": {"webui": "/pages/viewpage.action?pageId=123456"},
            }
        )

        # Mock space key lookup
        space_response = self._ok_response({"key": "TEST"})

        # side_effect alone feeds both GET calls in order; a return_value set
        # next to it would be ignored, so none is configured here.
        mock_session.get.side_effect = [page_response, space_response]

        # Call the method
        result = v2_adapter.get_page("123456")

        # Verify the API call
        assert mock_session.get.call_count == 2
        mock_session.get.assert_any_call(
            "https://example.atlassian.net/wiki/api/v2/pages/123456",
            params={"body-format": "storage"},
        )

        # Verify the response format
        assert result["id"] == "123456"
        assert result["type"] == "page"
        assert result["title"] == "Test Page"
        assert result["space"]["key"] == "TEST"
        assert result["space"]["id"] == "789"
        assert result["version"]["number"] == 5
        assert result["body"]["storage"]["value"] == "<p>Test content</p>"
        assert result["body"]["storage"]["representation"] == "storage"

    def test_get_page_not_found(self, v2_adapter, mock_session):
        """Test page retrieval when page doesn't exist."""
        # Mock a 404 response whose raise_for_status() raises HTTPError
        mock_response = Mock()
        mock_response.status_code = 404
        mock_response.text = "Page not found"
        mock_response.raise_for_status.side_effect = HTTPError(response=mock_response)
        mock_session.get.return_value = mock_response

        # Call the method and expect an exception
        with pytest.raises(ValueError, match="Failed to get page '999999'"):
            v2_adapter.get_page("999999")

    def test_get_page_with_minimal_response(self, v2_adapter, mock_session):
        """Test page retrieval with minimal v2 response."""
        # Mock the v2 API response without optional fields
        mock_session.get.return_value = self._ok_response(
            {
                "id": "123456",
                "status": "current",
                "title": "Minimal Page",
            }
        )

        # Call the method
        result = v2_adapter.get_page("123456")

        # Verify the response handles missing fields gracefully
        assert result["id"] == "123456"
        assert result["type"] == "page"
        assert result["title"] == "Minimal Page"
        assert result["space"]["key"] == "unknown"  # Fallback when no spaceId
        assert result["version"]["number"] == 1  # Default version

    def test_get_page_network_error(self, v2_adapter, mock_session):
        """Test page retrieval with network error."""
        # Mock a network error
        mock_session.get.side_effect = requests.RequestException("Network error")

        # Call the method and expect an exception
        with pytest.raises(ValueError, match="Failed to get page '123456'"):
            v2_adapter.get_page("123456")

    def test_get_page_with_expand_parameter(self, v2_adapter, mock_session):
        """Test that expand parameter is accepted but not used."""
        # Mock the v2 API response
        mock_session.get.return_value = self._ok_response(
            {
                "id": "123456",
                "status": "current",
                "title": "Test Page",
            }
        )

        # Call with expand parameter
        result = v2_adapter.get_page("123456", expand="body.storage,version")

        # Verify the API call doesn't include expand in params
        mock_session.get.assert_called_once_with(
            "https://example.atlassian.net/wiki/api/v2/pages/123456",
            params={"body-format": "storage"},
        )

        # Verify we still get a result
        assert result["id"] == "123456"

```

--------------------------------------------------------------------------------
/tests/unit/test_main_transport_selection.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for transport selection and execution.

These tests verify that:
1. All transports use direct execution (no stdin monitoring)
2. Transport selection logic works correctly (CLI vs environment)
3. Error handling is preserved
"""

from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from mcp_atlassian import main


class TestMainTransportSelection:
    """Test the main function's transport-specific execution logic."""

    @staticmethod
    def _discard_coroutine(coro, *args, **kwargs):
        """Close *coro* if it can be closed.

        The mocked ``asyncio.run`` never awaits what it is given; closing the
        coroutine prevents "coroutine ... was never awaited" RuntimeWarnings.
        """
        close = getattr(coro, "close", None)
        if callable(close):
            close()

    @staticmethod
    def _run_main(server, env_transport, argv):
        """Call ``main()`` with the server class, TRANSPORT and argv patched.

        ``SystemExit`` is swallowed so the caller can inspect its mocks after
        the call returns.
        """
        with patch("mcp_atlassian.servers.main.AtlassianMCP", return_value=server):
            with patch.dict("os.environ", {"TRANSPORT": env_transport}):
                with patch("sys.argv", argv):
                    try:
                        main()
                    except SystemExit:
                        pass

    @pytest.fixture
    def mock_server(self):
        """Create a mock server instance."""
        server = MagicMock()
        server.run_async = AsyncMock(return_value=None)
        return server

    @pytest.fixture
    def mock_asyncio_run(self):
        """Mock asyncio.run to capture what coroutine is executed."""
        with patch("asyncio.run") as mock_run:

            def capture(coro):
                # Store the coroutine for inspection, then close it; repr()
                # and attribute checks still work on a closed coroutine.
                mock_run._called_with = coro
                self._discard_coroutine(coro)

            mock_run.side_effect = capture
            yield mock_run

    @pytest.mark.parametrize("transport", ["stdio", "sse", "streamable-http"])
    def test_all_transports_use_direct_execution(
        self, mock_server, mock_asyncio_run, transport
    ):
        """Verify all transports use direct execution without stdin monitoring.

        This is a regression test for issues #519 and #524.
        """
        self._run_main(mock_server, transport, ["mcp-atlassian"])

        # Verify asyncio.run was called
        assert mock_asyncio_run.called

        # Get the coroutine info
        called_coro = mock_asyncio_run._called_with
        coro_repr = repr(called_coro)

        # All transports must use direct execution
        assert "run_with_stdio_monitoring" not in coro_repr
        assert "run_async" in coro_repr or hasattr(called_coro, "cr_code")

    def test_cli_overrides_env_transport(self, mock_server, mock_asyncio_run):
        """Test that CLI transport argument overrides environment variable."""
        # Simulate CLI args with --transport stdio while the env says sse
        self._run_main(
            mock_server, "sse", ["mcp-atlassian", "--transport", "stdio"]
        )

        # Without this check the test is vacuous: a MagicMock auto-creates
        # `_called_with`, and hasattr() on that child mock is always True.
        assert mock_asyncio_run.called

        # All transports now use direct execution
        called_coro = mock_asyncio_run._called_with
        coro_repr = repr(called_coro)
        assert "run_async" in coro_repr or hasattr(called_coro, "cr_code")

    def test_signal_handlers_always_setup(self, mock_server):
        """Test that signal handlers are set up regardless of transport."""
        with patch("asyncio.run", side_effect=self._discard_coroutine):
            # Patch where it's imported in the main module
            with patch("mcp_atlassian.setup_signal_handlers") as mock_setup:
                self._run_main(mock_server, "stdio", ["mcp-atlassian"])

        # Signal handlers should always be set up
        mock_setup.assert_called_once()

    def test_error_handling_preserved(self, mock_server):
        """Test that error handling works correctly for all transports."""
        # Make the server's run_async raise an exception when awaited
        error = RuntimeError("Server error")

        async def failing_run_async(**kwargs):
            raise error

        mock_server.run_async = failing_run_async

        def fail_run(coro, *args, **kwargs):
            # Simulate the exception propagating through asyncio.run
            self._discard_coroutine(coro)
            raise error

        with patch("asyncio.run", side_effect=fail_run):
            # The main function logs the error and exits with code 1
            with patch("sys.exit") as mock_exit:
                self._run_main(mock_server, "stdio", ["mcp-atlassian"])

        # Verify error was handled - sys.exit called with 1 for error
        # and then with 0 in the finally block
        assert mock_exit.call_count == 2
        assert mock_exit.call_args_list[0][0][0] == 1  # Error exit
        assert mock_exit.call_args_list[1][0][0] == 0  # Finally exit

```

--------------------------------------------------------------------------------
/tests/unit/servers/test_main_server.py:
--------------------------------------------------------------------------------

```python
"""Tests for the main MCP server implementation."""

from unittest.mock import AsyncMock, MagicMock, patch

import httpx
import pytest
from starlette.requests import Request
from starlette.responses import JSONResponse

from mcp_atlassian.servers.main import UserTokenMiddleware, main_mcp


@pytest.mark.anyio
async def test_run_server_stdio():
    """A patched ``main_mcp.run_async`` records a call using the stdio transport."""
    call_kwargs = {"transport": "stdio"}
    with patch.object(main_mcp, "run_async") as patched_run:
        patched_run.return_value = None
        await main_mcp.run_async(**call_kwargs)
        patched_run.assert_called_once_with(**call_kwargs)


@pytest.mark.anyio
async def test_run_server_sse():
    """A patched ``main_mcp.run_async`` records the sse transport and its port."""
    call_kwargs = {"transport": "sse", "port": 9000}
    with patch.object(main_mcp, "run_async") as patched_run:
        patched_run.return_value = None
        await main_mcp.run_async(**call_kwargs)
        patched_run.assert_called_once_with(**call_kwargs)


@pytest.mark.anyio
async def test_run_server_streamable_http():
    """A patched ``main_mcp.run_async`` records streamable-http with port, host and path."""
    call_kwargs = {
        "transport": "streamable-http",
        "port": 9001,
        "host": "127.0.0.1",
        "path": "/custom_mcp",
    }
    with patch.object(main_mcp, "run_async") as patched_run:
        patched_run.return_value = None
        await main_mcp.run_async(**call_kwargs)
        patched_run.assert_called_once_with(**call_kwargs)


@pytest.mark.anyio
async def test_run_server_invalid_transport():
    """An unrecognised transport name makes ``run_async`` raise ValueError."""
    # run_async is deliberately left unpatched: the error is expected from the
    # real implementation.
    with pytest.raises(ValueError) as excinfo:
        await main_mcp.run_async(transport="invalid")  # type: ignore

    message = str(excinfo.value)
    for fragment in ("Unknown transport", "invalid"):
        assert fragment in message


@pytest.mark.anyio
async def test_health_check_endpoint():
    """GET /healthz on the SSE app answers 200 with ``{"status": "ok"}``."""
    asgi = httpx.ASGITransport(app=main_mcp.sse_app())
    async with httpx.AsyncClient(transport=asgi, base_url="http://test") as client:
        reply = await client.get("/healthz")

        assert reply.status_code == 200
        payload = reply.json()
        assert payload == {"status": "ok"}


@pytest.mark.anyio
async def test_sse_app_health_check_endpoint():
    """The SSE app serves /healthz with status 200 and ``{"status": "ok"}``."""
    asgi = httpx.ASGITransport(app=main_mcp.sse_app())
    async with httpx.AsyncClient(transport=asgi, base_url="http://test") as client:
        reply = await client.get("/healthz")
        assert reply.status_code == 200
        payload = reply.json()
        assert payload == {"status": "ok"}


@pytest.mark.anyio
async def test_streamable_http_app_health_check_endpoint():
    """The Streamable HTTP app serves /healthz with 200 and ``{"status": "ok"}``."""
    asgi = httpx.ASGITransport(app=main_mcp.streamable_http_app())
    async with httpx.AsyncClient(transport=asgi, base_url="http://test") as client:
        reply = await client.get("/healthz")
        assert reply.status_code == 200
        payload = reply.json()
        assert payload == {"status": "ok"}


class TestUserTokenMiddleware:
    """Exercises UserTokenMiddleware.dispatch against mocked requests."""

    @pytest.fixture
    def middleware(self):
        """Build the middleware around a mock app and a mock MCP server."""
        # A server reference is supplied so the middleware does not warn
        # about a missing one.
        server_stub = MagicMock()
        server_stub.settings.streamable_http_path = "/mcp"
        return UserTokenMiddleware(AsyncMock(), mcp_server_ref=server_stub)

    @pytest.fixture
    def mock_request(self):
        """Build a POST request mock aimed at the /mcp path."""
        from types import SimpleNamespace

        fake_request = MagicMock(spec=Request)
        fake_request.method = "POST"
        fake_request.url.path = "/mcp"
        fake_request.headers = {}
        # A real namespace, so attributes the middleware sets can be read back.
        fake_request.state = SimpleNamespace()
        return fake_request

    @pytest.fixture
    def mock_call_next(self):
        """Build an async ``call_next`` that resolves to a JSON response."""
        return AsyncMock(return_value=JSONResponse({"test": "response"}))

    @pytest.mark.anyio
    async def test_cloud_id_header_extraction_success(
        self, middleware, mock_request, mock_call_next
    ):
        """The X-Atlassian-Cloud-Id header value ends up on ``request.state``."""
        expected_cloud_id = "test-cloud-id-123"
        mock_request.headers = {
            "Authorization": "Bearer test-token",
            "X-Atlassian-Cloud-Id": expected_cloud_id,
        }

        response = await middleware.dispatch(mock_request, mock_call_next)

        # The cloud ID must have been copied into the request state.
        state = mock_request.state
        assert hasattr(state, "user_atlassian_cloud_id")
        assert state.user_atlassian_cloud_id == expected_cloud_id

        # The request must still have been forwarded down the chain.
        mock_call_next.assert_called_once_with(mock_request)
        assert response is not None

```

--------------------------------------------------------------------------------
/tests/unit/confluence/test_custom_headers.py:
--------------------------------------------------------------------------------

```python
"""Tests for Confluence custom headers functionality."""

import os
from unittest.mock import MagicMock, patch

from mcp_atlassian.confluence.client import ConfluenceClient
from mcp_atlassian.confluence.config import ConfluenceConfig


class TestConfluenceConfigCustomHeaders:
    """Checks how ConfluenceConfig.from_env() parses CONFLUENCE_CUSTOM_HEADERS."""

    @staticmethod
    def _headers_from_env(raw_headers=None):
        """Return ``custom_headers`` of a config built from a minimal environment.

        The environment is replaced (``clear=True``) by URL, username and API
        token; CONFLUENCE_CUSTOM_HEADERS is added only when *raw_headers* is
        given.
        """
        env = {
            "CONFLUENCE_URL": "https://test.atlassian.net/wiki",
            "CONFLUENCE_USERNAME": "test_user",
            "CONFLUENCE_API_TOKEN": "test_token",
        }
        if raw_headers is not None:
            env["CONFLUENCE_CUSTOM_HEADERS"] = raw_headers
        with patch.dict(os.environ, env, clear=True):
            return ConfluenceConfig.from_env().custom_headers

    def test_no_custom_headers(self):
        """With no header variable set, custom_headers is an empty dict."""
        assert self._headers_from_env() == {}

    def test_service_specific_headers_only(self):
        """Comma-separated ``name=value`` pairs are parsed into a dict."""
        parsed = self._headers_from_env(
            "X-Confluence-Specific=confluence_value,X-Service=service_value"
        )
        assert parsed == {
            "X-Confluence-Specific": "confluence_value",
            "X-Service": "service_value",
        }

    def test_malformed_headers_are_ignored(self):
        """Entries without ``=`` are dropped while valid ones are kept."""
        parsed = self._headers_from_env(
            "malformed-header,X-Valid=valid_value,another-malformed"
        )
        assert parsed == {"X-Valid": "valid_value"}

    def test_empty_header_strings(self):
        """A whitespace-only header string yields an empty dict."""
        assert self._headers_from_env("   ") == {}


class TestConfluenceClientCustomHeaders:
    """Test ConfluenceClient custom headers application."""

    @staticmethod
    def _install_fake_confluence(monkeypatch):
        """Replace the client's collaborators with mocks and return the session.

        Patches the ``Confluence`` factory, ``configure_ssl_verification`` and
        ``ConfluencePreprocessor``. The returned mock session starts with an
        empty, real ``headers`` dict so tests can inspect what was written.
        """
        mock_session = MagicMock()
        mock_session.headers = {}
        mock_confluence = MagicMock()
        mock_confluence._session = mock_session

        monkeypatch.setattr(
            "mcp_atlassian.confluence.client.Confluence",
            lambda **kwargs: mock_confluence,
        )
        monkeypatch.setattr(
            "mcp_atlassian.confluence.client.configure_ssl_verification",
            lambda **kwargs: None,
        )
        monkeypatch.setattr(
            "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor",
            lambda **kwargs: MagicMock(),
        )
        return mock_session

    @staticmethod
    def _basic_config(custom_headers):
        """Build a basic-auth ConfluenceConfig carrying *custom_headers*."""
        return ConfluenceConfig(
            url="https://test.atlassian.net/wiki",
            auth_type="basic",
            username="test_user",
            api_token="test_token",
            custom_headers=custom_headers,
        )

    def test_no_custom_headers_applied(self, monkeypatch):
        """Test that no headers are applied when none are configured."""
        mock_session = self._install_fake_confluence(monkeypatch)

        # Only construction matters here; the instance itself is not needed.
        ConfluenceClient(config=self._basic_config({}))

        # Verify no custom headers were applied
        assert mock_session.headers == {}

    def test_custom_headers_applied_to_session(self, monkeypatch):
        """Test that custom headers are applied to the Confluence session."""
        mock_session = self._install_fake_confluence(monkeypatch)

        custom_headers = {
            "X-Corp-Auth": "token123",
            "X-Dept": "engineering",
            "User-Agent": "CustomConfluenceClient/1.0",
        }

        # Only construction matters here; the instance itself is not needed.
        ConfluenceClient(config=self._basic_config(custom_headers))

        # Verify custom headers were applied to session
        for header_name, header_value in custom_headers.items():
            assert mock_session.headers[header_name] == header_value

```

--------------------------------------------------------------------------------
/scripts/test_with_real_data.sh:
--------------------------------------------------------------------------------

```bash
#!/bin/bash

# Unified script for testing with real Atlassian data
# Supports testing models, API, or both

# Default settings
TEST_TYPE="all"  # Can be "all", "models", or "api"
VERBOSITY="-v"   # Verbosity level
RUN_WRITE_TESTS=false
FILTER=""        # Test filter using pytest's -k option (stored shell-quoted)

# Parse command line arguments
while [[ $# -gt 0 ]]; do
  case $1 in
    --models-only)
      TEST_TYPE="models"
      shift
      ;;
    --api-only)
      TEST_TYPE="api"
      shift
      ;;
    --all)
      TEST_TYPE="all"
      shift
      ;;
    --quiet)
      VERBOSITY=""
      shift
      ;;
    --verbose)
      VERBOSITY="-vv"
      shift
      ;;
    --with-write-tests)
      RUN_WRITE_TESTS=true
      shift
      ;;
    -k)
      # Reject a trailing -k instead of silently running with an empty pattern.
      if [[ $# -lt 2 ]]; then
        echo "Option -k requires a PATTERN argument"
        echo "Use --help for usage information"
        exit 1
      fi
      # FILTER is expanded through `eval` further down. %q shell-quotes the
      # pattern so spaces, quotes, $(...) and backticks reach pytest literally
      # instead of being re-interpreted by the shell.
      printf -v FILTER -- '-k %q' "$2"
      shift 2
      ;;
    --help)
      echo "Usage: $0 [options]"
      echo "Options:"
      echo "  --models-only          Test only Pydantic models"
      echo "  --api-only             Test only API integration"
      echo "  --all                  Test both models and API (default)"
      echo "  --quiet                Minimal output"
      echo "  --verbose              More detailed output"
      echo "  --with-write-tests     Include tests that modify data (including TextContent validation)"
      echo "  -k \"PATTERN\"         Only run tests matching the given pattern (uses pytest's -k option)"
      echo "  --help                 Show this help message"
      exit 0
      ;;
    *)
      echo "Unknown option: $1"
      echo "Use --help for usage information"
      exit 1
      ;;
  esac
done

# Check if .env file exists
if [ ! -f ".env" ]; then
    echo "Warning: .env file not found. Tests will be skipped if environment variables are not set."
else
    # Load environment variables from .env.
    # `set -a` marks every variable assigned while sourcing for export, so
    # plain KEY=value lines reach the pytest child processes and not only this
    # shell. The explicit ./ keeps `source` from searching PATH for ".env".
    set -a
    source ./.env
    set +a
fi

# Set environment variable to enable real data testing
export USE_REAL_DATA=true

# Set specific test IDs for API validation tests
# These will be used if they're set, otherwise tests will be skipped
export JIRA_TEST_ISSUE_KEY="${JIRA_TEST_ISSUE_KEY:-}"
export JIRA_TEST_EPIC_KEY="${JIRA_TEST_EPIC_KEY:-}"
export CONFLUENCE_TEST_PAGE_ID="${CONFLUENCE_TEST_PAGE_ID:-}"
export JIRA_TEST_PROJECT_KEY="${JIRA_TEST_PROJECT_KEY:-}"
export CONFLUENCE_TEST_SPACE_KEY="${CONFLUENCE_TEST_SPACE_KEY:-}"

# Check required environment variables and warn if any are missing
required_vars=(
    "JIRA_URL"
    "JIRA_USERNAME"
    "JIRA_API_TOKEN"
    "CONFLUENCE_URL"
    "CONFLUENCE_USERNAME"
    "CONFLUENCE_API_TOKEN"
)

missing_vars=0
for var in "${required_vars[@]}"; do
    # ${!var} is indirect expansion: the value of the variable named by $var.
    if [ -z "${!var}" ]; then
        echo "Warning: Environment variable $var is not set. Some tests will be skipped."
        missing_vars=$((missing_vars+1))
    fi
done

if [ "$missing_vars" -gt 0 ]; then
    echo "Found $missing_vars missing required variables. Tests requiring these variables will be skipped."
    echo "You can set these in your .env file to run all tests."
fi

# Run the base, Jira and Confluence model test suites one after another.
run_model_tests() {
    local entry banner target

    echo "Running Pydantic model tests with real data..."

    # Each entry is "<banner>|<pytest target>".
    for entry in \
        "===== Base Model Tests =====|tests/unit/models/test_base_models.py" \
        "===== Jira Model Tests =====|tests/unit/models/test_jira_models.py::TestRealJiraData" \
        "===== Confluence Model Tests =====|tests/unit/models/test_confluence_models.py::TestRealConfluenceData"
    do
        banner="${entry%%|*}"
        target="${entry#*|}"

        echo ""
        echo "$banner"
        # VERBOSITY stays unquoted so an empty value adds no argument.
        uv run pytest "$target" $VERBOSITY
    done
}

# Run the real-API validation tests: a filtered run when FILTER is set,
# otherwise the read-only set followed (on request) by the write tests.
run_api_tests() {
    local suite="tests/test_real_api_validation.py"
    local name
    local -a read_tests=() write_tests=()

    echo ""
    echo "===== API Read-Only Tests ====="

    # A user-supplied filter replaces the fixed test selection entirely.
    if [[ -n "$FILTER" ]]; then
        echo "Running tests with filter: $FILTER"
        eval "uv run pytest tests/test_real_api_validation.py $VERBOSITY $FILTER"
        return
    fi

    # Read-only tests always run.
    for name in \
        test_jira_get_issue \
        test_jira_get_issue_with_fields \
        test_jira_get_epic_issues \
        test_confluence_get_page_content
    do
        read_tests+=("${suite}::${name}")
    done
    uv run pytest "${read_tests[@]}" $VERBOSITY

    if [[ "$RUN_WRITE_TESTS" == "true" ]]; then
        echo ""
        echo "===== API Write Operation Tests ====="
        echo "WARNING: These tests will create and modify data in your Atlassian instance."
        echo "Press Ctrl+C now to cancel, or wait 5 seconds to continue..."
        sleep 5

        # Tests that create or modify data.
        for name in \
            test_jira_create_issue \
            test_jira_create_subtask \
            test_jira_create_task_with_parent \
            test_jira_add_comment \
            test_confluence_create_page \
            test_confluence_update_page \
            test_jira_create_epic \
            test_jira_create_epic_two_step
        do
            write_tests+=("${suite}::${name}")
        done
        uv run pytest "${write_tests[@]}" $VERBOSITY

        # The transition test runs only when write tests were requested.
        echo ""
        echo "===== API Advanced Write Tests ====="
        echo "Running tests for status transitions"
        uv run pytest "${suite}::test_jira_transition_issue" -v -k "test_jira_transition_issue"
    fi
}

# Dispatch on the selected test type; "all" runs the model tests first and
# the API tests second.
if [[ "$TEST_TYPE" == "models" || "$TEST_TYPE" == "all" ]]; then
    run_model_tests
fi
if [[ "$TEST_TYPE" == "api" || "$TEST_TYPE" == "all" ]]; then
    run_api_tests
fi

echo ""
echo "Testing completed. Check the output for any failures or skipped tests."

```

--------------------------------------------------------------------------------
/tests/unit/utils/test_lifecycle.py:
--------------------------------------------------------------------------------

```python
"""Tests for lifecycle management utilities."""

import signal
from unittest.mock import patch

from mcp_atlassian.utils.lifecycle import (
    _shutdown_event,
    ensure_clean_exit,
    setup_signal_handlers,
)


class TestSetupSignalHandlers:
    """Test signal handler setup functionality."""

    @patch("signal.signal")
    def test_setup_signal_handlers_all_platforms(self, mock_signal):
        """Test that signal handlers are registered for all platforms."""
        # Every signal.signal() call succeeds (no simulated failure)
        mock_signal.side_effect = None

        setup_signal_handlers()

        # Check that SIGTERM and SIGINT handlers were registered
        assert any(call[0][0] == signal.SIGTERM for call in mock_signal.call_args_list)
        assert any(call[0][0] == signal.SIGINT for call in mock_signal.call_args_list)

        # Check that all handlers are callable
        for call in mock_signal.call_args_list:
            assert callable(call[0][1])

    @patch("signal.signal")
    def test_setup_signal_handlers_no_sigpipe(self, mock_signal):
        """Test signal handler setup when SIGPIPE is not available (Windows)."""
        # Resolve SIGPIPE once and tolerate its absence: on platforms without
        # it, reading signal.SIGPIPE inside the side effect would itself raise
        # AttributeError for every registration, including SIGTERM/SIGINT.
        sigpipe = getattr(signal, "SIGPIPE", None)

        # Mock SIGPIPE as not available
        def side_effect(sig, handler):
            if sigpipe is not None and sig == sigpipe:
                raise AttributeError("SIGPIPE not available")
            return None

        mock_signal.side_effect = side_effect

        # This should not raise an exception
        setup_signal_handlers()

        # SIGTERM and SIGINT should still be registered
        assert any(call[0][0] == signal.SIGTERM for call in mock_signal.call_args_list)
        assert any(call[0][0] == signal.SIGINT for call in mock_signal.call_args_list)

    @patch("signal.signal")
    def test_signal_handler_function(self, mock_signal):
        """Test that the signal handler function works correctly."""
        handler = None

        # Capture the handler function registered for SIGTERM
        def capture_handler(sig, func):
            nonlocal handler
            if sig == signal.SIGTERM:
                handler = func

        mock_signal.side_effect = capture_handler

        # Clear the shutdown event before test
        _shutdown_event.clear()

        try:
            setup_signal_handlers()

            # Call the handler
            assert handler is not None
            handler(signal.SIGTERM, None)

            # Check shutdown event was set instead of calling sys.exit
            assert _shutdown_event.is_set()
        finally:
            # _shutdown_event is module-level state; reset it so the "set"
            # flag does not leak into tests that run afterwards.
            _shutdown_event.clear()


class TestEnsureCleanExit:
    """Tests for how ``ensure_clean_exit`` flushes stdout and stderr."""

    @patch("sys.stderr")
    @patch("sys.stdout")
    def test_ensure_clean_exit(self, mock_stdout, mock_stderr):
        """Both streams are flushed exactly once when both are open."""
        for stream in (mock_stdout, mock_stderr):
            stream.closed = False

        ensure_clean_exit()

        assert mock_stdout.flush.call_count == 1
        assert mock_stderr.flush.call_count == 1

    @patch("sys.stderr")
    @patch("sys.stdout")
    def test_ensure_clean_exit_closed_stdout(self, mock_stdout, mock_stderr):
        """A closed stdout is skipped while an open stderr is still flushed."""
        mock_stdout.closed, mock_stderr.closed = True, False

        ensure_clean_exit()

        # Closed stream: untouched; open stream: flushed once
        assert mock_stdout.flush.call_count == 0
        assert mock_stderr.flush.call_count == 1

    @patch("sys.stderr")
    @patch("sys.stdout")
    def test_ensure_clean_exit_closed_stderr(self, mock_stdout, mock_stderr):
        """A closed stderr is skipped while an open stdout is still flushed."""
        mock_stdout.closed, mock_stderr.closed = False, True

        ensure_clean_exit()

        # Open stream: flushed once; closed stream: untouched
        assert mock_stdout.flush.call_count == 1
        assert mock_stderr.flush.call_count == 0

    @patch("sys.stderr")
    @patch("sys.stdout")
    def test_ensure_clean_exit_both_closed(self, mock_stdout, mock_stderr):
        """Nothing is flushed when both streams report being closed."""
        for stream in (mock_stdout, mock_stderr):
            stream.closed = True

        ensure_clean_exit()

        assert mock_stdout.flush.call_count == 0
        assert mock_stderr.flush.call_count == 0

    @patch("sys.stderr")
    @patch("sys.stdout")
    def test_ensure_clean_exit_flush_raises_value_error(self, mock_stdout, mock_stderr):
        """A ValueError raised by ``flush`` does not propagate to the caller."""
        # Streams claim to be open, yet flushing them fails
        for stream in (mock_stdout, mock_stderr):
            stream.closed = False
            stream.flush.side_effect = ValueError("I/O operation on closed file")

        # Must complete without raising
        ensure_clean_exit()

        # A flush was still attempted on each stream
        assert mock_stdout.flush.call_count == 1
        assert mock_stderr.flush.call_count == 1

    @patch("sys.stderr")
    @patch("sys.stdout")
    def test_ensure_clean_exit_no_closed_attribute(self, mock_stdout, mock_stderr):
        """Streams lacking a ``closed`` attribute are tolerated and not flushed."""
        # Deleting the attribute makes later access raise AttributeError,
        # which simulates a non-standard stream object
        for stream in (mock_stdout, mock_stderr):
            if hasattr(stream, "closed"):
                del stream.closed

        # Must complete without raising
        ensure_clean_exit()

        # Without a ``closed`` attribute neither stream is flushed
        assert mock_stdout.flush.call_count == 0
        assert mock_stderr.flush.call_count == 0

```

--------------------------------------------------------------------------------
/tests/integration/test_stdin_monitoring_fix.py:
--------------------------------------------------------------------------------

```python
"""Simple integration test to verify stdin monitoring fix for streamable-http transport.

This test verifies that the fix in PR #522 correctly disables stdin monitoring
for HTTP transports (SSE and streamable-http) to prevent hanging issues.
"""

import os
import subprocess
import sys
import tempfile
from pathlib import Path

import pytest


@pytest.mark.integration
class TestStdinMonitoringFix:
    """Test that stdin monitoring is correctly disabled for HTTP transports."""

    def test_streamable_http_starts_without_hanging(self):
        """Test that streamable-http transport starts without stdin monitoring issues.

        This test creates a minimal script that closes stdin and exits, runs it
        in a subprocess with a 5 second timeout, and verifies it completes
        successfully.

        Note: the generated script does not import the server package; it only
        checks that a child Python process can close stdin and exit promptly.
        """
        # Create a test script that simulates the issue. The encoding is given
        # explicitly so the file contents do not depend on the locale.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".py", delete=False, encoding="utf-8"
        ) as f:
            f.write("""
import sys
import os

# The actual test: if stdin monitoring was incorrectly enabled for HTTP,
# closing stdin would cause issues. With the fix, it should work fine.
if __name__ == "__main__":
    # This simulates the scenario where stdin is closed (like in the bug report)
    # With the fix, HTTP transports won't monitor stdin, so this won't cause issues
    sys.stdin.close()

    # If we get here without hanging, the fix is working
    print("TEST_PASSED: No hanging with closed stdin")
    sys.exit(0)
""")
            test_script = f.name

        try:
            # Run the test script
            result = subprocess.run(
                [sys.executable, test_script],
                capture_output=True,
                text=True,
                timeout=5,  # Should complete quickly, timeout means hanging
            )

            # Check the output
            assert "TEST_PASSED" in result.stdout, (
                f"Test failed. Output: {result.stdout}, Error: {result.stderr}"
            )
            assert result.returncode == 0, (
                f"Script failed with code {result.returncode}"
            )

        except subprocess.TimeoutExpired:
            pytest.fail(
                "Script timed out - stdin monitoring may still be active for HTTP transports"
            )
        finally:
            # Clean up the temporary script (created with delete=False)
            os.unlink(test_script)

    def test_code_structure_validates_fix(self):
        """Validate that the code structure implements the fix correctly.

        This checks that the main entry point has the correct logic to disable
        stdin monitoring for HTTP transports. The check is purely textual: it
        looks for specific statements and comments in the package's
        ``__init__.py`` source.
        """
        # Read the main module source directly, decoding explicitly as UTF-8 so
        # the result does not depend on the platform's default encoding.
        main_file = (
            Path(__file__).parent.parent.parent
            / "src"
            / "mcp_atlassian"
            / "__init__.py"
        )
        source = main_file.read_text(encoding="utf-8")

        # Check for the key parts of the fix

        # 1. Different handling for stdio vs HTTP transports
        assert 'if final_transport == "stdio":' in source

        # 2. Comments explaining the fix
        assert (
            "# For stdio transport, don't monitor stdin as MCP server handles it internally"
            in source
        )
        assert (
            "# This prevents race conditions where both try to read from the same stdin"
            in source
        )
        assert (
            "# For HTTP transports (SSE, streamable-http), don't use stdin monitoring"
            in source
        )
        assert (
            "# as it causes premature shutdown when the client closes stdin" in source
        )
        assert "# The server should only rely on OS signals for shutdown" in source

        # 3. Proper conditional logic - look for the actual asyncio.run calls
        # There should be two separate sections handling stdio vs HTTP
        stdio_section = False
        http_section = False

        lines = source.split("\n")
        for i, line in enumerate(lines):
            # Look for the stdio handling
            if "# For stdio transport," in line and "monitor stdin" in line:
                # The marker line plus the next four lines should have the
                # stdio-specific handling
                next_lines = "\n".join(lines[i : i + 5])
                if (
                    'if final_transport == "stdio":' in next_lines
                    and "asyncio.run" in next_lines
                ):
                    stdio_section = True

            # Look for the HTTP handling
            if "# For HTTP transports" in line and "stdin monitoring" in line:
                # The marker line plus the next nine lines should have the
                # HTTP-specific handling
                next_lines = "\n".join(lines[i : i + 10])
                if (
                    "without stdin monitoring" in next_lines
                    and "asyncio.run" in next_lines
                ):
                    http_section = True

        assert stdio_section, "Could not find proper stdio transport handling"
        assert http_section, "Could not find proper HTTP transport handling"

        print("Code structure validation passed - fix is properly implemented")

    def test_lifecycle_module_supports_http_transports(self):
        """Test that the lifecycle module properly handles HTTP transports.

        This verifies that ``setup_signal_handlers`` and ``ensure_clean_exit``
        can be called without raising. Signal handlers that were installed
        before the test are restored afterwards.
        """
        import signal

        from mcp_atlassian.utils.lifecycle import (
            ensure_clean_exit,
            setup_signal_handlers,
        )

        # setup_signal_handlers() installs real handlers in the test process.
        # Remember the current ones so they can be put back afterwards;
        # otherwise Ctrl+C / SIGTERM handling stays altered for the rest of the
        # test run.
        watched = [signal.SIGTERM, signal.SIGINT]
        if hasattr(signal, "SIGPIPE"):
            watched.append(signal.SIGPIPE)
        previous = {sig: signal.getsignal(sig) for sig in watched}

        # These should work without issues for HTTP transports
        try:
            setup_signal_handlers()
            ensure_clean_exit()
            print("Lifecycle module works correctly for HTTP transports")
        except Exception as e:
            pytest.fail(f"Lifecycle module failed: {e}")
        finally:
            for sig, handler in previous.items():
                # getsignal() returns None for handlers that were not installed
                # from Python; those cannot be re-installed via signal.signal().
                if handler is not None:
                    signal.signal(sig, handler)

```

--------------------------------------------------------------------------------
/tests/unit/confluence/test_spaces.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for the SpacesMixin class."""

from unittest.mock import patch

import pytest
import requests
from fixtures.confluence_mocks import MOCK_SPACES_RESPONSE

from mcp_atlassian.confluence.spaces import SpacesMixin


class TestSpacesMixin:
    """Unit tests covering ``SpacesMixin``."""

    @pytest.fixture
    def spaces_mixin(self, confluence_client):
        """Build a ``SpacesMixin`` wired to the mocked Confluence client."""
        # SpacesMixin derives from ConfluenceClient, so the base initializer is
        # stubbed out and the collaborators are attached by hand instead.
        init_target = "mcp_atlassian.confluence.spaces.ConfluenceClient.__init__"
        with patch(init_target, return_value=None):
            instance = SpacesMixin()
            for attr in ("confluence", "config", "preprocessor"):
                setattr(instance, attr, getattr(confluence_client, attr))
            return instance

    def test_get_spaces(self, spaces_mixin):
        """get_spaces forwards paging arguments and returns the client's result."""
        spaces = spaces_mixin.get_spaces(start=10, limit=20)

        spaces_mixin.confluence.get_all_spaces.assert_called_once_with(
            start=10, limit=20
        )
        assert spaces == MOCK_SPACES_RESPONSE

    def test_get_user_contributed_spaces_success(self, spaces_mixin):
        """A single CQL hit is turned into a key -> {key, name} mapping."""
        hit = {
            "content": {"_expandable": {"space": "/rest/api/space/TEST"}},
            "resultGlobalContainer": {
                "title": "Test Space",
                "displayUrl": "/spaces/TEST",
            },
        }
        spaces_mixin.confluence.cql.return_value = {"results": [hit]}

        contributed = spaces_mixin.get_user_contributed_spaces(limit=100)

        spaces_mixin.confluence.cql.assert_called_once_with(
            cql="contributor = currentUser() order by lastmodified DESC", limit=100
        )
        assert contributed == {"TEST": {"key": "TEST", "name": "Test Space"}}

    def test_get_user_contributed_spaces_extraction_methods(self, spaces_mixin):
        """Space keys are extracted from each of the supported result shapes."""
        # Shape 1: key comes from resultGlobalContainer.displayUrl
        from_display_url = {
            "resultGlobalContainer": {
                "title": "Space 1",
                "displayUrl": "/spaces/SPACE1/pages",
            }
        }
        # Shape 2: key comes from content._expandable.space
        from_expandable = {
            "content": {"_expandable": {"space": "/rest/api/space/SPACE2"}},
            "resultGlobalContainer": {"title": "Space 2"},
        }
        # Shape 3: key comes from the top-level url
        from_url = {
            "url": "/spaces/SPACE3/pages/12345",
            "resultGlobalContainer": {"title": "Space 3"},
        }
        spaces_mixin.confluence.cql.return_value = {
            "results": [from_display_url, from_expandable, from_url]
        }

        contributed = spaces_mixin.get_user_contributed_spaces()

        expected_names = (
            ("SPACE1", "Space 1"),
            ("SPACE2", "Space 2"),
            ("SPACE3", "Space 3"),
        )
        for space_key, space_name in expected_names:
            assert space_key in contributed
            assert contributed[space_key]["name"] == space_name

    def test_get_user_contributed_spaces_with_duplicate_spaces(self, spaces_mixin):
        """Repeated hits for one space key collapse into a single entry."""
        # Two equal container-based hits followed by a content-based hit, all
        # pointing at SPACE1
        container_hits = [
            {
                "resultGlobalContainer": {
                    "title": "Space 1",
                    "displayUrl": "/spaces/SPACE1",
                }
            }
            for _ in range(2)
        ]
        content_hit = {"content": {"_expandable": {"space": "/rest/api/space/SPACE1"}}}
        spaces_mixin.confluence.cql.return_value = {
            "results": [*container_hits, content_hit]
        }

        contributed = spaces_mixin.get_user_contributed_spaces()

        assert len(contributed) == 1
        assert "SPACE1" in contributed
        assert contributed["SPACE1"]["name"] == "Space 1"

    def test_get_user_contributed_spaces_api_error(self, spaces_mixin):
        """A request failure from the client yields an empty mapping."""
        spaces_mixin.confluence.cql.side_effect = requests.RequestException("API Error")

        contributed = spaces_mixin.get_user_contributed_spaces()

        assert contributed == {}

    def test_get_user_contributed_spaces_key_error(self, spaces_mixin):
        """A response without the expected ``results`` key yields an empty mapping."""
        unexpected_payload = {"invalid_key": []}
        spaces_mixin.confluence.cql.return_value = unexpected_payload

        contributed = spaces_mixin.get_user_contributed_spaces()

        assert contributed == {}

    def test_get_user_contributed_spaces_type_error(self, spaces_mixin):
        """A ``None`` response from the client yields an empty mapping."""
        # None cannot be processed like a dict response
        spaces_mixin.confluence.cql.return_value = None

        contributed = spaces_mixin.get_user_contributed_spaces()

        assert contributed == {}

```

--------------------------------------------------------------------------------
/tests/unit/confluence/test_config.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for the ConfluenceConfig class."""

import os
from unittest.mock import patch

import pytest

from mcp_atlassian.confluence.config import ConfluenceConfig


def test_from_env_success():
    """Check that from_env builds a config from the Confluence environment variables."""
    env = {
        "CONFLUENCE_URL": "https://test.atlassian.net/wiki",
        "CONFLUENCE_USERNAME": "test_username",
        "CONFLUENCE_API_TOKEN": "test_token",
    }

    # clear=True removes every pre-existing variable for the duration of the
    # block, so only the three values above are visible to from_env.
    with patch.dict("os.environ", env, clear=True):
        config = ConfluenceConfig.from_env()

        loaded = (config.url, config.username, config.api_token)
        assert loaded == (
            "https://test.atlassian.net/wiki",
            "test_username",
            "test_token",
        )


def test_from_env_missing_url():
    """Test that from_env raises ValueError when URL is missing."""
    # patch.dict(..., clear=True) empties the environment for the duration of
    # the block and restores the original contents afterwards, replacing the
    # manual copy/clear/update bookkeeping and matching the other tests here.
    with patch.dict(os.environ, {}, clear=True):
        with pytest.raises(
            ValueError, match="Missing required CONFLUENCE_URL environment variable"
        ):
            ConfluenceConfig.from_env()


def test_from_env_missing_cloud_auth():
    """Check that from_env rejects a Cloud URL supplied without credentials."""
    # Only the (Cloud) URL is present; username and API token are absent.
    cloud_only_env = {"CONFLUENCE_URL": "https://test.atlassian.net"}
    expected_message = (
        "Cloud authentication requires CONFLUENCE_USERNAME and CONFLUENCE_API_TOKEN"
    )

    with patch.dict(os.environ, cloud_only_env, clear=True), pytest.raises(
        ValueError, match=expected_message
    ):
        ConfluenceConfig.from_env()


def test_from_env_missing_server_auth():
    """Check that from_env rejects a Server URL supplied without credentials."""
    # Only the (Server) URL is present; no personal token is configured.
    server_only_env = {"CONFLUENCE_URL": "https://confluence.example.com"}
    expected_message = (
        "Server/Data Center authentication requires CONFLUENCE_PERSONAL_TOKEN"
    )

    with patch.dict(os.environ, server_only_env, clear=True), pytest.raises(
        ValueError, match=expected_message
    ):
        ConfluenceConfig.from_env()


def test_is_cloud():
    """Check the is_cloud property for Cloud, Server and localhost URLs."""
    pat_auth = {"auth_type": "pat", "personal_token": "test"}

    # (constructor keyword arguments, expected is_cloud value)
    cases = [
        # Cloud URL
        (
            {
                "url": "https://example.atlassian.net/wiki",
                "auth_type": "basic",
                "username": "test",
                "api_token": "test",
            },
            True,
        ),
        # Server URL
        ({"url": "https://confluence.example.com", **pat_auth}, False),
        # Localhost URL (Data Center/Server)
        ({"url": "http://localhost:8090", **pat_auth}, False),
        # IP localhost URL (Data Center/Server)
        ({"url": "http://127.0.0.1:8090", **pat_auth}, False),
    ]

    for config_kwargs, expected in cases:
        config = ConfluenceConfig(**config_kwargs)
        assert config.is_cloud is expected


def test_from_env_proxy_settings():
    """Test that from_env correctly loads proxy environment variables.

    The generic proxy variables (HTTP_PROXY, HTTPS_PROXY, SOCKS_PROXY,
    NO_PROXY) are checked first, then the CONFLUENCE_-prefixed variants.
    """
    credentials = {
        "CONFLUENCE_URL": "https://test.atlassian.net/wiki",
        "CONFLUENCE_USERNAME": "test_username",
        "CONFLUENCE_API_TOKEN": "test_token",
    }

    # Well-formed SOCKS URLs with embedded credentials (user:pass@host:port).
    # The previous literals were mangled into "user:[email protected]:1080",
    # which is not a valid proxy URL.
    socks_proxy = "socks5://user:pass@proxy.example.com:1080"
    confluence_socks_proxy = "socks5://user:pass@confluence-proxy.example.com:1080"

    # Generic proxy variables
    with patch.dict(
        os.environ,
        {
            **credentials,
            "HTTP_PROXY": "http://proxy.example.com:8080",
            "HTTPS_PROXY": "https://proxy.example.com:8443",
            "SOCKS_PROXY": socks_proxy,
            "NO_PROXY": "localhost,127.0.0.1",
        },
        clear=True,
    ):
        config = ConfluenceConfig.from_env()
        assert config.http_proxy == "http://proxy.example.com:8080"
        assert config.https_proxy == "https://proxy.example.com:8443"
        assert config.socks_proxy == socks_proxy
        assert config.no_proxy == "localhost,127.0.0.1"

    # Service-specific variables
    with patch.dict(
        os.environ,
        {
            **credentials,
            "CONFLUENCE_HTTP_PROXY": "http://confluence-proxy.example.com:8080",
            "CONFLUENCE_HTTPS_PROXY": "https://confluence-proxy.example.com:8443",
            "CONFLUENCE_SOCKS_PROXY": confluence_socks_proxy,
            "CONFLUENCE_NO_PROXY": "localhost,127.0.0.1,.internal.example.com",
        },
        clear=True,
    ):
        config = ConfluenceConfig.from_env()
        assert config.http_proxy == "http://confluence-proxy.example.com:8080"
        assert config.https_proxy == "https://confluence-proxy.example.com:8443"
        assert config.socks_proxy == confluence_socks_proxy
        assert config.no_proxy == "localhost,127.0.0.1,.internal.example.com"


def test_is_cloud_oauth_with_cloud_id():
    """Check that OAuth with a cloud_id is reported as Cloud whatever the URL is."""
    from mcp_atlassian.utils.oauth import BYOAccessTokenOAuthConfig

    oauth_config = BYOAccessTokenOAuthConfig(
        cloud_id="test-cloud-id", access_token="test-token"
    )

    candidate_urls = (
        None,  # URL can be None in Multi-Cloud OAuth mode
        "https://confluence.example.com",  # Server-like URL, still Cloud
    )
    for url in candidate_urls:
        config = ConfluenceConfig(
            url=url,
            auth_type="oauth",
            oauth_config=oauth_config,
        )
        assert config.is_cloud is True

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/comments.py:
--------------------------------------------------------------------------------

```python
"""Module for Confluence comment operations."""

import logging

import requests

from ..models.confluence import ConfluenceComment
from .client import ConfluenceClient

logger = logging.getLogger("mcp-atlassian")


class CommentsMixin(ConfluenceClient):
    """Mixin for Confluence comment operations."""

    def get_page_comments(
        self, page_id: str, *, return_markdown: bool = True
    ) -> list[ConfluenceComment]:
        """
        Get all comments for a specific page.

        Args:
            page_id: The ID of the page to get comments from
            return_markdown: When True, returns content in markdown format,
                           otherwise returns processed HTML (keyword-only)

        Returns:
            List of ConfluenceComment models containing comment content and metadata.
            Errors are logged rather than raised: if anything fails, including a
            failure while processing a single comment, an empty list is returned.
        """
        try:
            # Get page info to extract space details
            page = self.confluence.get_page_by_id(page_id=page_id, expand="space")
            space_key = page.get("space", {}).get("key", "")

            # Get comments with expanded content
            comments_response = self.confluence.get_page_comments(
                content_id=page_id, expand="body.view.value,version", depth="all"
            )

            # Process each comment
            comment_models = []
            for comment_data in comments_response.get("results", []):
                # Direct indexing is deliberate: a comment without
                # body.view.value raises KeyError, handled below.
                body = comment_data["body"]
                view = body["view"]
                processed_html, processed_markdown = (
                    self.preprocessor.process_html_content(
                        view["value"],
                        space_key=space_key,
                        confluence_client=self.confluence,
                    )
                )

                # Build fresh nested dicts instead of a shallow copy, so the
                # API response object is never mutated in place.
                modified_comment_data = {
                    **comment_data,
                    "body": {
                        **body,
                        "view": {
                            **view,
                            "value": (
                                processed_markdown
                                if return_markdown
                                else processed_html
                            ),
                        },
                    },
                }

                # Create the model with the processed content
                comment_models.append(
                    ConfluenceComment.from_api_response(
                        modified_comment_data,
                        base_url=self.config.url,
                    )
                )

            return comment_models

        except KeyError as e:
            logger.error(f"Missing key in comment data: {e}")
            return []
        except requests.RequestException as e:
            logger.error(f"Network error when fetching comments: {e}")
            return []
        except (ValueError, TypeError) as e:
            logger.error(f"Error processing comment data: {e}")
            return []
        except Exception as e:  # noqa: BLE001 - Intentional fallback with full logging
            logger.error(f"Unexpected error fetching comments: {e}")
            logger.debug("Full exception details for comments:", exc_info=True)
            return []

    def add_comment(self, page_id: str, content: str) -> ConfluenceComment | None:
        """
        Add a comment to a Confluence page.

        Args:
            page_id: The ID of the page to add the comment to
            content: The content of the comment. Content whose stripped text
                starts with "<" is sent unchanged (treated as Confluence storage
                format); anything else is treated as markdown and converted to
                storage format first.

        Returns:
            ConfluenceComment object if comment was added successfully, None otherwise.
            Errors are logged rather than raised.
        """
        try:
            # Get page info to extract space details
            page = self.confluence.get_page_by_id(page_id=page_id, expand="space")
            space_key = page.get("space", {}).get("key", "")

            # Convert markdown to Confluence storage format if needed
            # The atlassian-python-api expects content in Confluence storage format
            if not content.strip().startswith("<"):
                # If content doesn't appear to be HTML/XML, treat it as markdown
                content = self.preprocessor.markdown_to_confluence_storage(content)

            # Add the comment via the Confluence API
            response = self.confluence.add_comment(page_id, content)

            if not response:
                logger.error("Failed to add comment: empty response")
                return None

            # Process the comment to return a consistent model; missing
            # body/view/value levels fall back to empty content.
            body = response.get("body", {})
            view = body.get("view", {})
            _processed_html, processed_markdown = (
                self.preprocessor.process_html_content(
                    view.get("value", ""),
                    space_key=space_key,
                    confluence_client=self.confluence,
                )
            )

            # Build fresh nested dicts so the API response is not mutated
            modified_response = {
                **response,
                "body": {**body, "view": {**view, "value": processed_markdown}},
            }

            # Create and return the comment model
            return ConfluenceComment.from_api_response(
                modified_response,
                base_url=self.config.url,
            )

        except requests.RequestException as e:
            logger.error(f"Network error when adding comment: {e}")
            return None
        except (ValueError, TypeError, KeyError) as e:
            logger.error(f"Error processing comment data: {e}")
            return None
        except Exception as e:  # noqa: BLE001 - Intentional fallback with full logging
            logger.error(f"Unexpected error adding comment: {e}")
            logger.debug("Full exception details for adding comment:", exc_info=True)
            return None

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/protocols.py:
--------------------------------------------------------------------------------

```python
"""Module for Jira protocol definitions."""

from abc import abstractmethod
from typing import Any, Protocol, runtime_checkable

from ..models.jira import JiraIssue
from ..models.jira.search import JiraSearchResult


class AttachmentsOperationsProto(Protocol):
    """Structural interface for objects that can upload Jira attachments."""

    @abstractmethod
    def upload_attachments(
        self,
        issue_key: str,
        file_paths: list[str],
    ) -> dict[str, Any]:
        """
        Attach several files to a single Jira issue.

        Args:
            issue_key: Key of the target Jira issue (e.g., 'PROJ-123')
            file_paths: Paths of the files to upload

        Returns:
            Dictionary describing the upload results
        """
        ...


class IssueOperationsProto(Protocol):
    """Structural interface for objects that can fetch a single Jira issue."""

    @abstractmethod
    def get_issue(
        self,
        issue_key: str,
        expand: str | None = None,
        comment_limit: int | str | None = 10,
        fields: str | list[str] | tuple[str, ...] | set[str] | None = (
            "summary,description,status,assignee,reporter,labels,priority,created,updated,issuetype"
        ),
        properties: str | list[str] | None = None,
        update_history: bool = True,
    ) -> JiraIssue:
        """
        Get a Jira issue by key.

        The protocol only fixes the signature; how each option is interpreted
        is up to the implementing class.

        Args:
            issue_key: Key of the issue to retrieve
            expand: String or None; defaults to None
            comment_limit: Integer, string, or None; defaults to 10
            fields: Comma-separated string, or list/tuple/set of strings, or
                None; defaults to a comma-separated string of ten field names
            properties: String, list of strings, or None; defaults to None
            update_history: Boolean flag; defaults to True

        Returns:
            The issue as a JiraIssue model
        """
        ...


class SearchOperationsProto(Protocol):
    """Structural interface for objects that can run JQL issue searches."""

    @abstractmethod
    def search_issues(
        self,
        jql: str,
        fields: str | list[str] | tuple[str, ...] | set[str] | None = (
            "summary,description,status,assignee,reporter,labels,priority,created,updated,issuetype"
        ),
        start: int = 0,
        limit: int = 50,
        expand: str | None = None,
        projects_filter: str | None = None,
    ) -> JiraSearchResult:
        """
        Search for issues using JQL.

        The protocol only fixes the signature; how each option is interpreted
        is up to the implementing class.

        Args:
            jql: The JQL query string
            fields: Comma-separated string, or list/tuple/set of strings, or
                None; defaults to a comma-separated string of ten field names
            start: Integer; defaults to 0
            limit: Integer; defaults to 50
            expand: String or None; defaults to None
            projects_filter: String or None; defaults to None

        Returns:
            The search outcome as a JiraSearchResult model
        """
        ...


class EpicOperationsProto(Protocol):
    """Protocol defining epic operations interface."""

    @abstractmethod
    def update_epic_fields(self, issue_key: str, kwargs: dict[str, Any]) -> JiraIssue:
        """
        Update Epic-specific fields after Epic creation.

        This method implements the second step of the two-step Epic creation process,
        applying Epic-specific fields that may be rejected during initial creation
        due to screen configuration restrictions.

        Args:
            issue_key: The key of the created Epic
            kwargs: Dictionary containing special keys with Epic field information

        Returns:
            JiraIssue: The updated Epic

        Raises:
            Exception: If there is an error updating the Epic fields
        """

    @abstractmethod
    def prepare_epic_fields(
        self,
        fields: dict[str, Any],
        summary: str,
        kwargs: dict[str, Any],
        # Explicit "| None": a None default on a plain ``str`` annotation is an
        # implicit Optional, which type checkers reject.
        project_key: str | None = None,
    ) -> None:
        """
        Prepare epic-specific fields for issue creation.

        Nothing is returned; the ``fields`` dictionary is the one to update.

        Args:
            fields: The fields dictionary to update
            summary: The issue summary that can be used as a default epic name
            kwargs: Additional fields from the user
            project_key: Optional project key; defaults to None. How it is used
                is left to the implementation and is not specified here.
        """

    @abstractmethod
    def _try_discover_fields_from_existing_epic(
        self, field_ids: dict[str, str]
    ) -> None:
        """
        Try to discover Epic fields from existing epics in the Jira instance.

        This is a fallback method used when standard field discovery doesn't find
        all the necessary Epic-related fields. It searches for an existing Epic and
        analyzes its field structure to identify Epic fields dynamically.

        Args:
            field_ids: Dictionary of field IDs to update with discovered fields
        """


class FieldsOperationsProto(Protocol):
    """Protocol defining fields operations interface.

    Every method here is declared with ``@abstractmethod`` and has a
    docstring-only body; the behavior described below is the contract that an
    implementing class is expected to provide.
    """

    @abstractmethod
    def _generate_field_map(self, force_regenerate: bool = False) -> dict[str, str]:
        """
        Generates and caches a map of lowercase field names to field IDs.

        Args:
            force_regenerate: If True, forces regeneration even if cache exists.

        Returns:
            A dictionary mapping lowercase field names and field IDs to actual field IDs.
        """

    @abstractmethod
    def get_field_by_id(
        self, field_id: str, refresh: bool = False
    ) -> dict[str, Any] | None:
        """
        Get field definition by ID.

        Args:
            field_id: The ID of the field to look up
            refresh: Defaults to False. Presumably bypasses cached field data
                when True - TODO confirm against the implementation

        Returns:
            The field definition dictionary, or None (presumably when no field
            matches the ID - confirm against the implementation)
        """

    @abstractmethod
    def get_field_ids_to_epic(self) -> dict[str, str]:
        """
        Dynamically discover Jira field IDs relevant to Epic linking.
        This method queries the Jira API to find the correct custom field IDs
        for Epic-related fields, which can vary between different Jira instances.

        Returns:
            Dictionary mapping field names to their IDs
            (e.g., {'epic_link': 'customfield_10014', 'epic_name': 'customfield_10011'})
        """

    @abstractmethod
    def get_required_fields(self, issue_type: str, project_key: str) -> dict[str, Any]:
        """
        Get required fields for creating an issue of a specific type in a project.

        Args:
            issue_type: The issue type (e.g., 'Bug', 'Story', 'Epic')
            project_key: The project key (e.g., 'PROJ')

        Returns:
            Dictionary mapping required field names to their definitions
        """


@runtime_checkable
class ProjectsOperationsProto(Protocol):
    """Protocol defining project operations interface.

    The ``runtime_checkable`` decorator allows this protocol to be used with
    ``isinstance()``; such a check only verifies that the method exists, not
    its signature.
    """

    @abstractmethod
    def get_project_issue_types(self, project_key: str) -> list[dict[str, Any]]:
        """
        Get all issue types available for a project.

        Args:
            project_key: The project key

        Returns:
            List of issue type data dictionaries
        """


@runtime_checkable
class UsersOperationsProto(Protocol):
    """Protocol defining user operations interface.

    The ``runtime_checkable`` decorator allows this protocol to be used with
    ``isinstance()``; such a check only verifies that the method exists, not
    its signature.
    """

    @abstractmethod
    def _get_account_id(self, assignee: str) -> str:
        """Get the account ID for a username.

        Args:
            assignee: Username or account ID

        Returns:
            Account ID

        Raises:
            ValueError: If the account ID could not be found
        """

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/sprints.py:
--------------------------------------------------------------------------------

```python
"""Module for Jira sprints operations."""

import datetime
import logging
from typing import Any

import requests

from ..models.jira import JiraSprint
from ..utils import parse_date
from .client import JiraClient

logger = logging.getLogger("mcp-jira")


class SprintsMixin(JiraClient):
    """Mixin for Jira sprints operations."""

    @staticmethod
    def _sprint_http_error_detail(error: requests.HTTPError) -> str:
        """
        Build the detail text logged for a failed sprint HTTP call.

        A ``requests`` exception has ``response`` set to None when no response
        object was attached to it. Reading ``.content`` unconditionally would
        then raise AttributeError from inside the ``except`` handler and hide
        the original failure, so the body is only read when a response exists.

        Args:
            error: The HTTP error that was caught

        Returns:
            ``str(error.response.content)`` when a response is attached,
            otherwise ``str(error)``
        """
        response = error.response
        if response is None:
            return str(error)
        return str(response.content)

    def get_all_sprints_from_board(
        self, board_id: str, state: str | None = None, start: int = 0, limit: int = 50
    ) -> list[dict[str, Any]]:
        """
        Get all sprints from a board.

        Errors are logged and swallowed: this method never raises for a failed
        API call.

        Args:
            board_id: Board ID
            state: Sprint state (e.g., active, future, closed) if None, return all state sprints
            start: Start index
            limit: Maximum number of sprints to return

        Returns:
            List of sprints taken from the ``values`` key of the API response;
            an empty list when the response is not a dict or the call fails
        """
        try:
            sprints = self.jira.get_all_sprints_from_board(
                board_id=board_id,
                state=state,
                start=start,
                limit=limit,
            )
            return sprints.get("values", []) if isinstance(sprints, dict) else []
        except requests.HTTPError as e:
            logger.error(
                f"Error getting all sprints from board: {self._sprint_http_error_detail(e)}"
            )
            return []
        except Exception as e:
            logger.error(f"Error getting all sprints from board: {str(e)}")
            return []

    def get_all_sprints_from_board_model(
        self, board_id: str, state: str | None = None, start: int = 0, limit: int = 50
    ) -> list[JiraSprint]:
        """
        Get all sprints as JiraSprint from a board.

        Delegates to ``get_all_sprints_from_board`` and converts each returned
        item with ``JiraSprint.from_api_response``.

        Args:
            board_id: Board ID
            state: Sprint state (e.g., active, future, closed) if None, return all state sprints
            start: Start index
            limit: Maximum number of sprints to return

        Returns:
            List of JiraSprint
        """
        sprints = self.get_all_sprints_from_board(
            board_id=board_id,
            state=state,
            start=start,
            limit=limit,
        )
        return [JiraSprint.from_api_response(sprint) for sprint in sprints]

    def update_sprint(
        self,
        sprint_id: str,
        sprint_name: str | None,
        state: str | None,
        start_date: str | None,
        end_date: str | None,
        goal: str | None,
    ) -> JiraSprint | None:
        """
        Update a sprint.

        Only arguments with a truthy value are sent to Jira. Failures are
        logged and reported by returning None; this method does not raise.

        Args:
            sprint_id: Sprint ID
            sprint_name: New name for the sprint (optional)
            state: New state for the sprint (future|active|closed - optional)
            start_date: New start date for the sprint (optional)
            end_date: New end date for the sprint (optional)
            goal: New goal for the sprint (optional)

        Returns:
            Updated sprint, or None when ``state`` is invalid, ``sprint_id`` is
            empty, or the update fails
        """
        data: dict[str, Any] = {}
        if sprint_name:
            data["name"] = sprint_name
        if state and state not in ["future", "active", "closed"]:
            logger.warning("Invalid state. Valid states are: future, active, closed.")
            return None
        elif state:
            data["state"] = state
        if start_date:
            data["startDate"] = start_date
        if end_date:
            data["endDate"] = end_date
        if goal:
            data["goal"] = goal
        if not sprint_id:
            logger.warning("Sprint ID is required.")
            return None
        try:
            updated_sprint = self.jira.update_partially_sprint(
                sprint_id=sprint_id,
                data=data,
            )

            if not isinstance(updated_sprint, dict):
                msg = f"Unexpected return value type from `SprintMixin.update_sprint`: {type(updated_sprint)}"
                logger.error(msg)
                # Caught by the broad handler below, so callers see None.
                raise TypeError(msg)

            return JiraSprint.from_api_response(updated_sprint)
        except requests.HTTPError as e:
            logger.error(f"Error updating sprint: {self._sprint_http_error_detail(e)}")
            return None
        except Exception as e:
            logger.error(f"Error updating sprint: {str(e)}")
            return None

    def create_sprint(
        self,
        board_id: str,
        sprint_name: str,
        start_date: str,
        end_date: str,
        goal: str | None = None,
    ) -> JiraSprint:
        """
        Create a new sprint.

        Args:
            board_id: Board ID
            sprint_name: Sprint name
            start_date: Start date in ISO format
            end_date: End date in ISO format
            goal: Sprint goal

        Returns:
            Created sprint details

        Raises:
            ValueError: If the start date is empty, cannot be parsed, lies in
                the past, or is not before the parsed end date
            TypeError: If the Jira API returns something other than a dict
            requests.HTTPError: Re-raised after logging when the API call fails
            Exception: Any other error from the API call, re-raised after logging
        """

        if not start_date:
            raise ValueError("Start date is required.")

        # validate start date format
        parsed_start_date = parse_date(start_date)

        if parsed_start_date is None:
            raise ValueError("Start date is required.")

        # validate start date is not in the past
        # NOTE(review): compared against an aware UTC "now", so parse_date is
        # assumed to return timezone-aware datetimes - confirm.
        if parsed_start_date < datetime.datetime.now(datetime.timezone.utc):
            raise ValueError("Start date cannot be in the past.")

        # validate end date format
        if end_date:
            parsed_end_date = parse_date(end_date)
            if parsed_end_date is not None and parsed_start_date >= parsed_end_date:
                raise ValueError("Start date must be before end date.")

        try:
            sprint = self.jira.create_sprint(
                name=sprint_name,
                board_id=board_id,
                start_date=start_date,
                end_date=end_date,
                goal=goal,
            )

            logger.info(f"Sprint created: {sprint}")

            if not isinstance(sprint, dict):
                msg = f"Unexpected return value type from `SprintMixin.create_sprint`: {type(sprint)}"
                logger.error(msg)
                raise TypeError(msg)

            return JiraSprint.from_api_response(sprint)

        except requests.HTTPError as e:
            logger.error(f"Error creating sprint: {self._sprint_http_error_detail(e)}")
            raise
        except Exception as e:
            logger.error(f"Error creating sprint: {str(e)}")
            raise

```

--------------------------------------------------------------------------------
/tests/unit/utils/test_ssl.py:
--------------------------------------------------------------------------------

```python
"""Tests for the SSL utilities module."""

import ssl
from unittest.mock import MagicMock, patch

from requests.adapters import HTTPAdapter
from requests.sessions import Session

from mcp_atlassian.utils.ssl import SSLIgnoreAdapter, configure_ssl_verification


def test_ssl_ignore_adapter_cert_verify():
    """SSLIgnoreAdapter.cert_verify should delegate with verify forced to False."""
    ignoring_adapter = SSLIgnoreAdapter()
    conn = MagicMock()
    target_url = "https://example.com"
    client_cert = None

    # Intercept the parent implementation to inspect the arguments it receives.
    with patch.object(HTTPAdapter, "cert_verify") as parent_cert_verify:
        # The caller requests verification; the adapter is expected to override it.
        ignoring_adapter.cert_verify(conn, target_url, verify=True, cert=client_cert)

    parent_cert_verify.assert_called_once_with(
        conn, target_url, verify=False, cert=client_cert
    )


def test_ssl_ignore_adapter_init_poolmanager():
    """init_poolmanager should hand PoolManager an SSL context with verification off."""
    # Build the adapter before patching so only the explicit call below is observed.
    adapter_under_test = SSLIgnoreAdapter()
    fake_pool = MagicMock()
    fake_context = MagicMock()

    with (
        patch("ssl.create_default_context", return_value=fake_context) as create_ctx,
        patch(
            "mcp_atlassian.utils.ssl.PoolManager", return_value=fake_pool
        ) as pool_cls,
    ):
        adapter_under_test.init_poolmanager(5, 10, block=True)

    # The default context is created once and then relaxed.
    create_ctx.assert_called_once()
    assert fake_context.check_hostname is False
    assert fake_context.verify_mode == ssl.CERT_NONE

    # PoolManager receives the sizing arguments and the relaxed context.
    pool_cls.assert_called_once()
    pool_kwargs = pool_cls.call_args.kwargs
    assert pool_kwargs["num_pools"] == 5
    assert pool_kwargs["maxsize"] == 10
    assert pool_kwargs["block"] is True
    assert pool_kwargs["ssl_context"] == fake_context


def test_configure_ssl_verification_disabled():
    """With ssl_verify=False one adapter is created and mounted for https and http."""
    fake_session = MagicMock()  # stand-in for a real Session
    fake_adapter = MagicMock()

    # The module logger is patched out so no real logging happens.
    with (
        patch("mcp_atlassian.utils.ssl.logger"),
        patch(
            "mcp_atlassian.utils.ssl.SSLIgnoreAdapter", return_value=fake_adapter
        ) as adapter_cls,
    ):
        configure_ssl_verification(
            "TestService", "https://test.example.com/path", fake_session, False
        )

    adapter_cls.assert_called_once()
    # Both schemes are mounted on the host without the URL path.
    assert fake_session.mount.call_count == 2
    fake_session.mount.assert_any_call("https://test.example.com", fake_adapter)
    fake_session.mount.assert_any_call("http://test.example.com", fake_adapter)


def test_configure_ssl_verification_enabled():
    """With ssl_verify=True no SSLIgnoreAdapter is built and nothing is mounted."""
    fake_session = MagicMock()  # stand-in for a real Session

    with patch("mcp_atlassian.utils.ssl.SSLIgnoreAdapter") as adapter_cls:
        configure_ssl_verification(
            "TestService", "https://test.example.com/path", fake_session, True
        )

    adapter_cls.assert_not_called()
    assert fake_session.mount.call_count == 0


def test_configure_ssl_verification_enabled_with_real_session():
    """A real Session keeps its adapter count when SSL verification stays enabled."""
    real_session = Session()
    adapters_before = len(real_session.adapters)

    call_kwargs = {
        "service_name": "Test",
        "url": "https://example.com",
        "session": real_session,
        "ssl_verify": True,
    }
    configure_ssl_verification(**call_kwargs)

    # Verification enabled: nothing extra may be mounted.
    adapters_after = len(real_session.adapters)
    assert adapters_after == adapters_before


def test_configure_ssl_verification_disabled_with_real_session():
    """A real Session gains SSLIgnoreAdapter mounts for http and https when verification is off."""
    real_session = Session()
    adapters_before = len(real_session.adapters)

    # The module logger is patched out so no real logging happens.
    with patch("mcp_atlassian.utils.ssl.logger"):
        configure_ssl_verification(
            service_name="Test",
            url="https://example.com",
            session=real_session,
            ssl_verify=False,
        )

        # One custom adapter per scheme is expected.
        assert len(real_session.adapters) == adapters_before + 2
        for mount_prefix in ("https://example.com", "http://example.com"):
            assert mount_prefix in real_session.adapters
            assert isinstance(real_session.adapters[mount_prefix], SSLIgnoreAdapter)


def test_ssl_ignore_adapter():
    """cert_verify should pass verify=False to the parent whatever the caller asked for."""
    ignoring_adapter = SSLIgnoreAdapter()
    connection = MagicMock()
    target_url = "https://example.com"
    client_cert = None

    # First the caller asks for verification, then explicitly opts out;
    # the parent must see verify=False in both cases.
    for requested_verify in (True, False):
        with patch.object(HTTPAdapter, "cert_verify") as parent_cert_verify:
            ignoring_adapter.cert_verify(
                connection, target_url, verify=requested_verify, cert=client_cert
            )
            parent_cert_verify.assert_called_once_with(
                connection, target_url, verify=False, cert=client_cert
            )

```

--------------------------------------------------------------------------------
/tests/unit/jira/test_config.py:
--------------------------------------------------------------------------------

```python
"""Tests for the Jira config module."""

import os
from unittest.mock import patch

import pytest

from mcp_atlassian.jira.config import JiraConfig


def test_from_env_basic_auth():
    """from_env should produce a basic-auth config from URL, username and API token."""
    basic_env = {
        "JIRA_URL": "https://test.atlassian.net",
        "JIRA_USERNAME": "test_username",
        "JIRA_API_TOKEN": "test_token",
    }

    # clear=True ensures only the variables above are visible to from_env.
    with patch.dict(os.environ, basic_env, clear=True):
        loaded = JiraConfig.from_env()

        assert loaded.url == "https://test.atlassian.net"
        assert loaded.auth_type == "basic"
        assert loaded.username == "test_username"
        assert loaded.api_token == "test_token"
        assert loaded.personal_token is None
        assert loaded.ssl_verify is True


def test_from_env_token_auth():
    """from_env should produce a PAT config and honour JIRA_SSL_VERIFY=false."""
    pat_env = {
        "JIRA_URL": "https://jira.example.com",
        "JIRA_PERSONAL_TOKEN": "test_personal_token",
        "JIRA_SSL_VERIFY": "false",
    }

    # clear=True ensures only the variables above are visible to from_env.
    with patch.dict(os.environ, pat_env, clear=True):
        loaded = JiraConfig.from_env()

        assert loaded.url == "https://jira.example.com"
        assert loaded.auth_type == "pat"
        assert loaded.username is None
        assert loaded.api_token is None
        assert loaded.personal_token == "test_personal_token"
        assert loaded.ssl_verify is False


def test_from_env_missing_url():
    """Test that from_env raises ValueError when URL is missing."""
    # patch.dict(clear=True) empties os.environ for the duration of the block
    # and restores the previous contents on exit, matching how the other tests
    # in this module isolate their environment.
    with patch.dict(os.environ, {}, clear=True):
        with pytest.raises(
            ValueError, match="Missing required JIRA_URL environment variable"
        ):
            JiraConfig.from_env()


def test_from_env_missing_cloud_auth():
    """from_env should reject a Cloud URL that comes without credentials."""
    url_only_env = {"JIRA_URL": "https://test.atlassian.net"}  # Cloud URL
    expected_message = "Cloud authentication requires JIRA_USERNAME and JIRA_API_TOKEN"

    with (
        patch.dict(os.environ, url_only_env, clear=True),
        pytest.raises(ValueError, match=expected_message),
    ):
        JiraConfig.from_env()


def test_from_env_missing_server_auth():
    """from_env should reject a Server URL that comes without a personal token."""
    url_only_env = {"JIRA_URL": "https://jira.example.com"}  # Server URL
    expected_message = "Server/Data Center authentication requires JIRA_PERSONAL_TOKEN"

    with (
        patch.dict(os.environ, url_only_env, clear=True),
        pytest.raises(ValueError, match=expected_message),
    ):
        JiraConfig.from_env()


def test_is_cloud():
    """is_cloud should be True for the Cloud URL and False for the server-style URLs."""
    pat_credentials = {"auth_type": "pat", "personal_token": "test"}
    scenarios = [
        # Cloud URL
        (
            {
                "url": "https://example.atlassian.net",
                "auth_type": "basic",
                "username": "test",
                "api_token": "test",
            },
            True,
        ),
        # Server URL
        ({"url": "https://jira.example.com", **pat_credentials}, False),
        # Localhost URL (Data Center/Server)
        ({"url": "http://localhost:8080", **pat_credentials}, False),
        # IP localhost URL (Data Center/Server)
        ({"url": "http://127.0.0.1:8080", **pat_credentials}, False),
    ]

    # Each config is built and checked in turn, in the order listed above.
    for config_kwargs, expected_is_cloud in scenarios:
        candidate = JiraConfig(**config_kwargs)
        assert candidate.is_cloud is expected_is_cloud


def test_from_env_proxy_settings():
    """Test that from_env correctly loads proxy environment variables.

    Covers two cases: the generic ``*_PROXY`` variables, and the
    ``JIRA_``-prefixed variables supplied on their own.
    """
    # Generic proxy variables
    with patch.dict(
        os.environ,
        {
            "JIRA_URL": "https://test.atlassian.net",
            "JIRA_USERNAME": "test_username",
            "JIRA_API_TOKEN": "test_token",
            "HTTP_PROXY": "http://proxy.example.com:8080",
            "HTTPS_PROXY": "https://proxy.example.com:8443",
            # Credentials embedded in the URL exercise the user:pass@host form.
            "SOCKS_PROXY": "socks5://user:pass@proxy.example.com:1080",
            "NO_PROXY": "localhost,127.0.0.1",
        },
        clear=True,
    ):
        config = JiraConfig.from_env()
        assert config.http_proxy == "http://proxy.example.com:8080"
        assert config.https_proxy == "https://proxy.example.com:8443"
        assert config.socks_proxy == "socks5://user:pass@proxy.example.com:1080"
        assert config.no_proxy == "localhost,127.0.0.1"

    # Service-specific overrides
    with patch.dict(
        os.environ,
        {
            "JIRA_URL": "https://test.atlassian.net",
            "JIRA_USERNAME": "test_username",
            "JIRA_API_TOKEN": "test_token",
            "JIRA_HTTP_PROXY": "http://jira-proxy.example.com:8080",
            "JIRA_HTTPS_PROXY": "https://jira-proxy.example.com:8443",
            "JIRA_SOCKS_PROXY": "socks5://user:pass@jira-proxy.example.com:1080",
            "JIRA_NO_PROXY": "localhost,127.0.0.1,.internal.example.com",
        },
        clear=True,
    ):
        config = JiraConfig.from_env()
        assert config.http_proxy == "http://jira-proxy.example.com:8080"
        assert config.https_proxy == "https://jira-proxy.example.com:8443"
        assert config.socks_proxy == "socks5://user:pass@jira-proxy.example.com:1080"
        assert config.no_proxy == "localhost,127.0.0.1,.internal.example.com"


def test_is_cloud_oauth_with_cloud_id():
    """is_cloud should be True for OAuth with a cloud_id, with or without a server-like URL."""
    from mcp_atlassian.utils.oauth import BYOAccessTokenOAuthConfig

    byo_oauth = BYOAccessTokenOAuthConfig(
        cloud_id="test-cloud-id", access_token="test-token"
    )

    candidate_urls = (
        None,  # URL can be None in Multi-Cloud OAuth mode
        "https://jira.example.com",  # Server-like URL
    )
    for candidate_url in candidate_urls:
        oauth_jira_config = JiraConfig(
            url=candidate_url,
            auth_type="oauth",
            oauth_config=byo_oauth,
        )
        # The cloud_id decides the outcome, not the URL.
        assert oauth_jira_config.is_cloud is True

```

--------------------------------------------------------------------------------
/tests/utils/mocks.py:
--------------------------------------------------------------------------------

```python
"""Reusable mock utilities and fixtures for MCP Atlassian tests."""

import os
from contextlib import contextmanager
from typing import Any
from unittest.mock import MagicMock, patch

from .factories import AuthConfigFactory, ConfluencePageFactory, JiraIssueFactory


class MockEnvironment:
    """Context managers that temporarily reshape ``os.environ`` for tests."""

    @staticmethod
    @contextmanager
    def oauth_env():
        """Patch the OAuth variables into the environment and yield them as a dict."""
        oauth_settings = AuthConfigFactory.create_oauth_config()
        patched_vars = {
            "ATLASSIAN_OAUTH_CLIENT_ID": oauth_settings["client_id"],
            "ATLASSIAN_OAUTH_CLIENT_SECRET": oauth_settings["client_secret"],
            "ATLASSIAN_OAUTH_REDIRECT_URI": oauth_settings["redirect_uri"],
            "ATLASSIAN_OAUTH_SCOPE": oauth_settings["scope"],
            "ATLASSIAN_OAUTH_CLOUD_ID": oauth_settings["cloud_id"],
        }
        # clear=False: existing variables stay visible alongside the patched ones.
        with patch.dict(os.environ, patched_vars, clear=False):
            yield patched_vars

    @staticmethod
    @contextmanager
    def basic_auth_env():
        """Patch Jira and Confluence basic-auth variables into the environment and yield them."""
        basic_settings = AuthConfigFactory.create_basic_auth_config()
        base_url = basic_settings["url"]
        user = basic_settings["username"]
        token = basic_settings["api_token"]
        patched_vars = {
            "JIRA_URL": base_url,
            "JIRA_USERNAME": user,
            "JIRA_API_TOKEN": token,
            "CONFLUENCE_URL": f"{base_url}/wiki",
            "CONFLUENCE_USERNAME": user,
            "CONFLUENCE_API_TOKEN": token,
        }
        # clear=False: existing variables stay visible alongside the patched ones.
        with patch.dict(os.environ, patched_vars, clear=False):
            yield patched_vars

    @staticmethod
    @contextmanager
    def clean_env():
        """Remove the listed authentication variables for the duration of the block.

        Yields the patched environment mapping; the removed variables come
        back when the ``patch.dict`` context exits.
        """
        names_to_remove = (
            "JIRA_URL",
            "JIRA_USERNAME",
            "JIRA_API_TOKEN",
            "CONFLUENCE_URL",
            "CONFLUENCE_USERNAME",
            "CONFLUENCE_API_TOKEN",
            "ATLASSIAN_OAUTH_CLIENT_ID",
            "ATLASSIAN_OAUTH_CLIENT_SECRET",
            "ATLASSIAN_OAUTH_REDIRECT_URI",
            "ATLASSIAN_OAUTH_SCOPE",
            "ATLASSIAN_OAUTH_CLOUD_ID",
            "ATLASSIAN_OAUTH_ENABLE",
        )

        # Patching with an empty mapping changes nothing by itself; it is used
        # so that the pops below are undone on exit.
        with patch.dict(os.environ, {}, clear=False) as patched_environ:
            for name in names_to_remove:
                patched_environ.pop(name, None)
            yield patched_environ


class MockAtlassianClient:
    """Builders for MagicMock stand-ins of the Jira and Confluence clients."""

    @staticmethod
    def create_jira_client(**response_overrides):
        """Return a MagicMock Jira client whose common methods give canned responses.

        Keyword arguments replace the canned response stored under the same key.
        """
        canned = {
            "issue": JiraIssueFactory.create(),
            "search_issues": {
                "issues": [
                    JiraIssueFactory.create("TEST-1"),
                    JiraIssueFactory.create("TEST-2"),
                ],
                "total": 2,
            },
            "projects": [{"key": "TEST", "name": "Test Project"}],
            "fields": [{"id": "summary", "name": "Summary"}],
        }
        # Caller-supplied values win over the defaults.
        canned.update(response_overrides)

        jira = MagicMock()
        for method_name in ("issue", "search_issues", "projects", "fields"):
            getattr(jira, method_name).return_value = canned[method_name]
        return jira

    @staticmethod
    def create_confluence_client(**response_overrides):
        """Return a MagicMock Confluence client whose common methods give canned responses.

        Keyword arguments replace the canned response stored under the same key.
        """
        canned = {
            "get_page_by_id": ConfluencePageFactory.create(),
            "get_all_pages_from_space": {
                "results": [
                    ConfluencePageFactory.create("123"),
                    ConfluencePageFactory.create("456"),
                ]
            },
            "get_all_spaces": {"results": [{"key": "TEST", "name": "Test Space"}]},
        }
        # Caller-supplied values win over the defaults.
        canned.update(response_overrides)

        confluence = MagicMock()
        for method_name in (
            "get_page_by_id",
            "get_all_pages_from_space",
            "get_all_spaces",
        ):
            getattr(confluence, method_name).return_value = canned[method_name]
        return confluence


class MockOAuthServer:
    """Helpers for faking the pieces an OAuth flow touches."""

    @staticmethod
    @contextmanager
    def mock_oauth_flow():
        """Patch the HTTP server, browser launcher and state-token generator.

        Yields a dict with the patched objects under the keys ``server``,
        ``server_instance``, ``browser`` and ``token``.
        """
        with patch("http.server.HTTPServer") as server_cls:
            with patch("webbrowser.open") as browser_open:
                with patch(
                    "secrets.token_urlsafe", return_value="test-state-token"
                ) as token_factory:
                    # Instantiating the patched server class gives this known instance.
                    server_obj = MagicMock()
                    server_cls.return_value = server_obj

                    yield {
                        "server": server_cls,
                        "server_instance": server_obj,
                        "browser": browser_open,
                        "token": token_factory,
                    }


class MockFastMCP:
    """Utility for mocking FastMCP components."""

    @staticmethod
    def create_request(state_data: dict[str, Any] | None = None):
        """Create a mock FastMCP request."""
        request = MagicMock()
        request.state = MagicMock()

        if state_data:
            for key, value in state_data.items():
                setattr(request.state, key, value)

        return request

    @staticmethod
    def create_context():
        """Create a mock FastMCP context."""
        return MagicMock()


class MockPreprocessor:
    """Builders for MagicMock content preprocessors with fixed output."""

    @staticmethod
    def create_html_to_markdown():
        """Return a mock preprocessor whose ``process`` always yields Markdown text."""
        # Dotted keys configure attributes on the child mock.
        return MagicMock(**{"process.return_value": "# Markdown Content"})

    @staticmethod
    def create_markdown_to_html():
        """Return a mock preprocessor whose ``process`` always yields HTML text."""
        # Dotted keys configure attributes on the child mock.
        return MagicMock(**{"process.return_value": "<h1>HTML Content</h1>"})

```

--------------------------------------------------------------------------------
/tests/unit/jira/test_links.py:
--------------------------------------------------------------------------------

```python
from unittest.mock import MagicMock, Mock, patch

import pytest
from requests.exceptions import HTTPError

from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
from mcp_atlassian.jira.links import LinksMixin
from mcp_atlassian.models.jira import JiraIssueLinkType


class TestLinksMixin:
    """Unit tests for the issue-link operations exposed by ``LinksMixin``."""

    @staticmethod
    def _unauthorized_error() -> HTTPError:
        """Return a fresh ``HTTPError`` whose response reports HTTP 401."""
        return HTTPError(response=Mock(status_code=401))

    @pytest.fixture
    def links_mixin(self, mock_config, mock_atlassian_jira):
        """Provide a ``LinksMixin`` whose ``jira`` client is the shared mock."""
        instance = LinksMixin(config=mock_config)
        instance.jira = mock_atlassian_jira
        return instance

    def test_get_issue_link_types_success(self, links_mixin):
        """Link types from the API are converted and returned in order."""
        blocks_type = {
            "id": "10000",
            "name": "Blocks",
            "inward": "is blocked by",
            "outward": "blocks",
        }
        duplicate_type = {
            "id": "10001",
            "name": "Duplicate",
            "inward": "is duplicated by",
            "outward": "duplicates",
        }
        links_mixin.jira.get.return_value = {
            "issueLinkTypes": [blocks_type, duplicate_type]
        }

        def build_stub(data):
            # ``name`` must be assigned after construction: passing it to the
            # MagicMock constructor would name the mock instead.
            stub = MagicMock()
            stub.name = data["name"]
            return stub

        with patch.object(
            JiraIssueLinkType, "from_api_response", side_effect=build_stub
        ):
            link_types = links_mixin.get_issue_link_types()

        assert len(link_types) == 2
        assert link_types[0].name == "Blocks"
        assert link_types[1].name == "Duplicate"
        links_mixin.jira.get.assert_called_once_with("rest/api/2/issueLinkType")

    def test_get_issue_link_types_authentication_error(self, links_mixin):
        """An HTTP 401 while listing link types raises the auth error."""
        links_mixin.jira.get.side_effect = self._unauthorized_error()

        with pytest.raises(MCPAtlassianAuthenticationError):
            links_mixin.get_issue_link_types()

    def test_get_issue_link_types_generic_error(self, links_mixin):
        """A non-HTTP failure while listing link types still raises."""
        links_mixin.jira.get.side_effect = Exception("Unexpected error")

        with pytest.raises(Exception, match="Unexpected error"):
            links_mixin.get_issue_link_types()

    def test_create_issue_link_success(self, links_mixin):
        """A complete payload is forwarded and reported as a success."""
        link_payload = {
            "type": {"name": "Blocks"},
            "inwardIssue": {"key": "PROJ-123"},
            "outwardIssue": {"key": "PROJ-456"},
        }

        outcome = links_mixin.create_issue_link(link_payload)

        assert outcome["success"] is True
        assert "Link created between PROJ-123 and PROJ-456" in outcome["message"]
        links_mixin.jira.create_issue_link.assert_called_once_with(link_payload)

    def test_create_issue_link_missing_type(self, links_mixin):
        """A payload without ``type`` is rejected with ``ValueError``."""
        link_payload = {
            "inwardIssue": {"key": "PROJ-123"},
            "outwardIssue": {"key": "PROJ-456"},
        }

        with pytest.raises(ValueError, match="Link type is required"):
            links_mixin.create_issue_link(link_payload)

    def test_create_issue_link_authentication_error(self, links_mixin):
        """An HTTP 401 while creating a link raises the auth error."""
        link_payload = {
            "type": {"name": "Blocks"},
            "inwardIssue": {"key": "PROJ-123"},
            "outwardIssue": {"key": "PROJ-456"},
        }
        links_mixin.jira.create_issue_link.side_effect = self._unauthorized_error()

        with pytest.raises(MCPAtlassianAuthenticationError):
            links_mixin.create_issue_link(link_payload)

    def test_create_remote_issue_link_success(self, links_mixin):
        """A valid remote link is posted and echoed back in the result."""
        issue_key = "PROJ-123"
        remote_object = {
            "url": "https://example.com/page",
            "title": "Example Page",
            "summary": "A test page",
        }
        link_data = {"object": remote_object, "relationship": "documentation"}

        outcome = links_mixin.create_remote_issue_link(issue_key, link_data)

        assert outcome["success"] is True
        assert outcome["issue_key"] == issue_key
        assert outcome["link_title"] == "Example Page"
        assert outcome["link_url"] == "https://example.com/page"
        assert outcome["relationship"] == "documentation"
        links_mixin.jira.post.assert_called_once_with(
            "rest/api/3/issue/PROJ-123/remotelink", json=link_data
        )

    def test_create_remote_issue_link_missing_issue_key(self, links_mixin):
        """An empty issue key is rejected with ``ValueError``."""
        remote_object = {"url": "https://example.com/page", "title": "Example Page"}
        link_data = {"object": remote_object}

        with pytest.raises(ValueError, match="Issue key is required"):
            links_mixin.create_remote_issue_link("", link_data)

    def test_create_remote_issue_link_missing_object(self, links_mixin):
        """Link data without an ``object`` entry is rejected."""
        issue_key = "PROJ-123"
        link_data = {"relationship": "documentation"}

        with pytest.raises(ValueError, match="Link object is required"):
            links_mixin.create_remote_issue_link(issue_key, link_data)

    def test_create_remote_issue_link_missing_url(self, links_mixin):
        """A link object without ``url`` is rejected."""
        issue_key = "PROJ-123"
        link_data = {"object": {"title": "Example Page"}}

        with pytest.raises(ValueError, match="URL is required in link object"):
            links_mixin.create_remote_issue_link(issue_key, link_data)

    def test_create_remote_issue_link_missing_title(self, links_mixin):
        """A link object without ``title`` is rejected."""
        issue_key = "PROJ-123"
        link_data = {"object": {"url": "https://example.com/page"}}

        with pytest.raises(ValueError, match="Title is required in link object"):
            links_mixin.create_remote_issue_link(issue_key, link_data)

    def test_create_remote_issue_link_authentication_error(self, links_mixin):
        """An HTTP 401 while posting a remote link raises the auth error."""
        issue_key = "PROJ-123"
        remote_object = {"url": "https://example.com/page", "title": "Example Page"}
        link_data = {"object": remote_object}
        links_mixin.jira.post.side_effect = self._unauthorized_error()

        with pytest.raises(MCPAtlassianAuthenticationError):
            links_mixin.create_remote_issue_link(issue_key, link_data)

    def test_remove_issue_link_success(self, links_mixin):
        """Removing by ID calls the client and reports success."""
        link_id = "10000"

        outcome = links_mixin.remove_issue_link(link_id)

        assert outcome["success"] is True
        assert f"Link with ID {link_id} has been removed" in outcome["message"]
        links_mixin.jira.remove_issue_link.assert_called_once_with(link_id)

    def test_remove_issue_link_empty_id(self, links_mixin):
        """An empty link ID is rejected with ``ValueError``."""
        with pytest.raises(ValueError, match="Link ID is required"):
            links_mixin.remove_issue_link("")

    def test_remove_issue_link_authentication_error(self, links_mixin):
        """An HTTP 401 while removing a link raises the auth error."""
        link_id = "10000"
        links_mixin.jira.remove_issue_link.side_effect = self._unauthorized_error()

        with pytest.raises(MCPAtlassianAuthenticationError):
            links_mixin.remove_issue_link(link_id)

```

--------------------------------------------------------------------------------
/tests/unit/jira/test_protocols.py:
--------------------------------------------------------------------------------

```python
"""Tests for Jira protocol definitions."""

import inspect
from typing import Any, get_type_hints

from mcp_atlassian.jira.protocols import (
    AttachmentsOperationsProto,
    UsersOperationsProto,
)
from mcp_atlassian.models.jira import JiraIssue
from mcp_atlassian.models.jira.search import JiraSearchResult


class TestProtocolCompliance:
    """Checks that sample implementations line up with the Jira protocols."""

    def test_compliant_attachments_implementation(self):
        """A class defining ``upload_attachments`` exposes it as a callable."""

        class CompliantAttachments:
            def upload_attachments(
                self, issue_key: str, file_paths: list[str]
            ) -> dict[str, Any]:
                return {"uploaded": len(file_paths)}

        attachments = CompliantAttachments()
        assert hasattr(attachments, "upload_attachments")
        assert callable(attachments.upload_attachments)

    def test_compliant_issue_implementation(self):
        """A ``get_issue`` with the full signature returns a ``JiraIssue``."""
        default_fields = (
            "summary,description,status,assignee,reporter,labels,"
            "priority,created,updated,issuetype"
        )

        class CompliantIssues:
            def get_issue(
                self,
                issue_key: str,
                expand: str | None = None,
                comment_limit: int | str | None = 10,
                fields: str
                | list[str]
                | tuple[str, ...]
                | set[str]
                | None = default_fields,
                properties: str | list[str] | None = None,
                *,
                update_history: bool = True,
            ) -> JiraIssue:
                return JiraIssue(id="123", key=issue_key, summary="Test Issue")

        issues = CompliantIssues()
        assert hasattr(issues, "get_issue")
        fetched = issues.get_issue("TEST-1")
        assert isinstance(fetched, JiraIssue)
        assert fetched.key == "TEST-1"

    def test_compliant_search_implementation(self):
        """A ``search_issues`` with the full signature returns a search result."""
        default_fields = (
            "summary,description,status,assignee,reporter,labels,"
            "priority,created,updated,issuetype"
        )

        class CompliantSearch:
            def search_issues(
                self,
                jql: str,
                fields: str
                | list[str]
                | tuple[str, ...]
                | set[str]
                | None = default_fields,
                start: int = 0,
                limit: int = 50,
                expand: str | None = None,
                projects_filter: str | None = None,
            ) -> JiraSearchResult:
                return JiraSearchResult(
                    total=0, start_at=start, max_results=limit, issues=[]
                )

        searcher = CompliantSearch()
        found = searcher.search_issues("project = TEST")
        assert isinstance(found, JiraSearchResult)

    def test_runtime_checkable_users_protocol(self):
        """``isinstance`` against ``UsersOperationsProto`` tracks method presence."""

        class CompliantUsers:
            def _get_account_id(self, assignee: str) -> str:
                return f"account-id-for-{assignee}"

        class NonCompliantUsers:
            pass

        with_method = CompliantUsers()
        without_method = NonCompliantUsers()

        # The runtime check looks only at whether the method exists,
        # not at its signature.
        assert isinstance(with_method, UsersOperationsProto)
        assert not isinstance(without_method, UsersOperationsProto)


class TestProtocolContractValidation:
    """Tests for helpers that compare an implementation against a protocol."""

    def test_method_signature_validation(self):
        """Parameter names of an implementation match the protocol method."""

        def validate_method_signature(protocol_class, method_name: str, implementation):
            """Return True when both methods list the same non-self parameters."""

            def param_names(func):
                # ``self`` only shows up on the unbound protocol function.
                return [
                    name
                    for name in inspect.signature(func).parameters
                    if name != "self"
                ]

            expected = getattr(protocol_class, method_name)
            actual = getattr(implementation, method_name)
            return param_names(expected) == param_names(actual)

        class TestImplementation:
            def upload_attachments(
                self, issue_key: str, file_paths: list[str]
            ) -> dict[str, Any]:
                return {}

        candidate = TestImplementation()
        assert validate_method_signature(
            AttachmentsOperationsProto, "upload_attachments", candidate
        )

    def test_type_hint_validation(self):
        """Return annotation of an implementation matches the protocol method."""

        def validate_type_hints(protocol_class, method_name: str, implementation):
            """Return True when both methods declare the same return type."""
            expected = getattr(protocol_class, method_name)
            actual = getattr(implementation, method_name)

            expected_return = get_type_hints(expected).get("return")
            actual_return = get_type_hints(actual).get("return")
            return expected_return == actual_return

        class TypeCompliantImplementation:
            def upload_attachments(
                self, issue_key: str, file_paths: list[str]
            ) -> dict[str, Any]:
                return {}

        candidate = TypeCompliantImplementation()
        assert validate_type_hints(
            AttachmentsOperationsProto, "upload_attachments", candidate
        )

    def test_structural_compliance_check(self):
        """Only instances providing every abstract protocol method comply."""

        def check_structural_compliance(instance, protocol_class):
            """Return True when ``instance`` has each abstract method as a callable."""

            def is_abstract_method(name):
                member = getattr(protocol_class, name, None)
                return callable(member) and getattr(
                    member, "__isabstractmethod__", False
                )

            # Dunder names are never considered part of the contract.
            required = [
                name
                for name in dir(protocol_class)
                if not name.startswith("__") and is_abstract_method(name)
            ]

            return all(
                hasattr(instance, name) and callable(getattr(instance, name))
                for name in required
            )

        class CompliantImplementation:
            def upload_attachments(
                self, issue_key: str, file_paths: list[str]
            ) -> dict[str, Any]:
                return {}

        class NonCompliantImplementation:
            def some_other_method(self):
                pass

        has_method = CompliantImplementation()
        lacks_method = NonCompliantImplementation()

        assert check_structural_compliance(has_method, AttachmentsOperationsProto)
        assert not check_structural_compliance(
            lacks_method, AttachmentsOperationsProto
        )

```

--------------------------------------------------------------------------------
/tests/unit/utils/test_custom_headers.py:
--------------------------------------------------------------------------------

```python
"""Tests for custom headers parsing functionality."""

from mcp_atlassian.utils.env import get_custom_headers


class TestParseCustomHeaders:
    """Tests for ``get_custom_headers``, which parses headers from an env var."""

    _ENV_VAR = "TEST_HEADERS"

    @classmethod
    def _parse(cls, monkeypatch, raw_value):
        """Store ``raw_value`` in the test env var and return the parsed headers."""
        monkeypatch.setenv(cls._ENV_VAR, raw_value)
        return get_custom_headers(cls._ENV_VAR)

    def test_empty_input(self, monkeypatch):
        """An unset, empty, or whitespace-only variable yields no headers."""
        monkeypatch.delenv(self._ENV_VAR, raising=False)
        assert get_custom_headers(self._ENV_VAR) == {}

        # Empty string, spaces only, then tab/newline only.
        for blank in ("", "   ", "\t\n"):
            assert self._parse(monkeypatch, blank) == {}

    def test_single_header(self, monkeypatch):
        """A single ``key=value`` pair is parsed, with surrounding spaces trimmed."""
        parsed = self._parse(monkeypatch, "X-Custom=value123")
        assert parsed == {"X-Custom": "value123"}

        parsed = self._parse(monkeypatch, " X-Spaced = value with spaces ")
        assert parsed == {"X-Spaced": "value with spaces"}

    def test_multiple_headers(self, monkeypatch):
        """Comma-separated pairs each become their own header."""
        parsed = self._parse(monkeypatch, "X-Corp-Auth=token123,X-Dept=engineering")
        assert parsed == {"X-Corp-Auth": "token123", "X-Dept": "engineering"}

    def test_headers_with_spaces(self, monkeypatch):
        """Spaces around keys, values, and commas are stripped."""
        parsed = self._parse(monkeypatch, " X-Key = value , X-Another = value2 ")
        assert parsed == {"X-Key": "value", "X-Another": "value2"}

    def test_value_with_equals_signs(self, monkeypatch):
        """Only the first ``=`` splits a pair; later ones stay in the value."""
        parsed = self._parse(monkeypatch, "X-Token=abc=def=123")
        assert parsed == {"X-Token": "abc=def=123"}

        parsed = self._parse(
            monkeypatch, "X-Token=abc=def,X-URL=https://api.example.com/v1?key=value"
        )
        assert parsed == {
            "X-Token": "abc=def",
            "X-URL": "https://api.example.com/v1?key=value",
        }

    def test_malformed_headers(self, monkeypatch):
        """Entries without ``=`` are skipped; valid neighbours are kept."""
        parsed = self._parse(monkeypatch, "invalid-header-format")
        assert parsed == {}

        parsed = self._parse(
            monkeypatch, "X-Valid=value,invalid-header,X-Another=value2"
        )
        assert parsed == {"X-Valid": "value", "X-Another": "value2"}

    def test_empty_key_or_value(self, monkeypatch):
        """Empty keys are skipped, while empty values are kept."""
        # Empty key is dropped.
        assert self._parse(monkeypatch, "=value") == {}

        # Empty value is preserved as an empty string.
        assert self._parse(monkeypatch, "X-Empty=") == {"X-Empty": ""}

        # Whitespace-only key counts as empty.
        assert self._parse(monkeypatch, "  =value") == {}

        # Only the valid pair survives a mix of empty-key entries.
        parsed = self._parse(monkeypatch, "=empty_key,X-Valid=value, =another_empty")
        assert parsed == {"X-Valid": "value"}

    def test_special_characters_in_values(self, monkeypatch):
        """Punctuation in a value is passed through unchanged."""
        special_value = "!@#$%^&*()_+-[]{}|;':\"/<>?"
        parsed = self._parse(monkeypatch, "X-Special=" + special_value)
        assert parsed == {"X-Special": special_value}

    def test_unicode_characters(self, monkeypatch):
        """Non-ASCII characters in values are preserved."""
        parsed = self._parse(monkeypatch, "X-Unicode=café,X-Emoji=🚀")
        assert parsed == {"X-Unicode": "café", "X-Emoji": "🚀"}

    def test_empty_pairs_in_list(self, monkeypatch):
        """Empty entries between or after commas are ignored."""
        parsed = self._parse(monkeypatch, "X-First=value1,,X-Second=value2,")
        assert parsed == {"X-First": "value1", "X-Second": "value2"}

        # A string of nothing but commas has no pairs at all.
        assert self._parse(monkeypatch, ",,,") == {}

    def test_complex_real_world_example(self, monkeypatch):
        """A realistic multi-header string round-trips into the expected dict."""
        expected = {
            "Authorization": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9",
            "X-API-Key": "sk-1234567890abcdef",
            "X-Request-ID": "req_123456789",
            "X-Custom-Header": "value with spaces and = signs",
            "User-Agent": "MyApp/1.0 (Custom Agent)",
        }
        # Serialise as "key=value" pairs joined by commas, in dict order.
        headers_string = ",".join(f"{key}={value}" for key, value in expected.items())

        assert self._parse(monkeypatch, headers_string) == expected

    def test_case_sensitive_keys(self, monkeypatch):
        """Header keys keep their original casing."""
        parsed = self._parse(
            monkeypatch, "x-lower=value1,X-UPPER=value2,X-Mixed=value3"
        )
        assert parsed == {"x-lower": "value1", "X-UPPER": "value2", "X-Mixed": "value3"}

    def test_duplicate_keys(self, monkeypatch):
        """When a key repeats, the later value wins."""
        parsed = self._parse(monkeypatch, "X-Duplicate=first,X-Duplicate=second")
        assert parsed == {"X-Duplicate": "second"}

    def test_newlines_and_tabs_in_input(self, monkeypatch):
        """Newlines and tabs inside a value are kept, not treated as separators."""
        parsed = self._parse(
            monkeypatch, "X-Multi=line1\nline2,X-Tab=value\twith\ttabs"
        )
        assert parsed == {"X-Multi": "line1\nline2", "X-Tab": "value\twith\ttabs"}

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/client.py:
--------------------------------------------------------------------------------

```python
"""Base client module for Confluence API interactions."""

import logging
import os

from atlassian import Confluence
from requests import Session

from ..exceptions import MCPAtlassianAuthenticationError
from ..utils.logging import get_masked_session_headers, log_config_param, mask_sensitive
from ..utils.oauth import configure_oauth_session
from ..utils.ssl import configure_ssl_verification
from .config import ConfluenceConfig

# Configure logging
logger = logging.getLogger("mcp-atlassian")


class ConfluenceClient:
    """Base client for Confluence API interactions."""

    def __init__(self, config: ConfluenceConfig | None = None) -> None:
        """Initialize the Confluence client with given or environment config.

        Builds the underlying ``Confluence`` API client for the configured
        auth type (OAuth, PAT, or basic), then applies SSL verification, proxy
        settings, and custom headers to its session, and finally creates the
        content preprocessor. When the module logger is at DEBUG level, a test
        API call is made; a failure there is logged as a warning and does not
        abort initialization.

        Args:
            config: Configuration for Confluence client. If None, will load from
                environment.

        Raises:
            ValueError: If configuration is invalid or environment variables are
                missing; in particular when OAuth is selected without an
                ``oauth_config`` that carries a ``cloud_id``.
            MCPAtlassianAuthenticationError: If the OAuth session cannot be
                configured.
        """
        # Fall back to environment-derived configuration when none is passed.
        self.config = config or ConfluenceConfig.from_env()

        # Initialize the Confluence client based on auth type
        if self.config.auth_type == "oauth":
            # The cloud_id is required to build the API URL further down.
            if not self.config.oauth_config or not self.config.oauth_config.cloud_id:
                error_msg = "OAuth authentication requires a valid cloud_id"
                raise ValueError(error_msg)

            # Create a session for OAuth
            session = Session()

            # Configure the session with OAuth authentication; a falsy result
            # is treated as an authentication failure.
            if not configure_oauth_session(session, self.config.oauth_config):
                error_msg = "Failed to configure OAuth session"
                raise MCPAtlassianAuthenticationError(error_msg)

            # The Confluence API URL with OAuth is different: requests go
            # through api.atlassian.com and are addressed by cloud_id rather
            # than by the configured site URL.
            api_url = f"https://api.atlassian.com/ex/confluence/{self.config.oauth_config.cloud_id}"

            # Initialize Confluence with the pre-configured session
            self.confluence = Confluence(
                url=api_url,
                session=session,
                cloud=True,  # OAuth is only for Cloud
                verify_ssl=self.config.ssl_verify,
            )
        elif self.config.auth_type == "pat":
            # The token is passed through mask_sensitive before being logged.
            logger.debug(
                f"Initializing Confluence client with Token (PAT) auth. "
                f"URL: {self.config.url}, "
                f"Token (masked): {mask_sensitive(str(self.config.personal_token))}"
            )
            self.confluence = Confluence(
                url=self.config.url,
                token=self.config.personal_token,
                cloud=self.config.is_cloud,
                verify_ssl=self.config.ssl_verify,
            )
        else:  # basic auth
            # Only the presence of the API token is logged, never its value.
            logger.debug(
                f"Initializing Confluence client with Basic auth. "
                f"URL: {self.config.url}, Username: {self.config.username}, "
                f"API Token present: {bool(self.config.api_token)}, "
                f"Is Cloud: {self.config.is_cloud}"
            )
            self.confluence = Confluence(
                url=self.config.url,
                username=self.config.username,
                password=self.config.api_token,  # API token is used as password
                cloud=self.config.is_cloud,
                verify_ssl=self.config.ssl_verify,
            )
            # NOTE: this header dump is emitted for the basic-auth branch only;
            # the OAuth and PAT branches do not log their session headers.
            logger.debug(
                f"Confluence client initialized. "
                f"Session headers (Authorization masked): "
                f"{get_masked_session_headers(dict(self.confluence._session.headers))}"
            )

        # Configure SSL verification using the shared utility
        configure_ssl_verification(
            service_name="Confluence",
            url=self.config.url,
            session=self.confluence._session,
            ssl_verify=self.config.ssl_verify,
        )

        # Proxy configuration: collect only the proxies that are actually set
        # so an empty mapping leaves the session's proxy settings untouched.
        proxies = {}
        if self.config.http_proxy:
            proxies["http"] = self.config.http_proxy
        if self.config.https_proxy:
            proxies["https"] = self.config.https_proxy
        # NOTE(review): requests picks a proxy by URL scheme ("http", "https",
        # "all", ...), so a "socks" key may never be selected - confirm whether
        # the SOCKS proxy is actually applied to outgoing requests.
        if self.config.socks_proxy:
            proxies["socks"] = self.config.socks_proxy
        if proxies:
            self.confluence._session.proxies.update(proxies)
            for k, v in proxies.items():
                log_config_param(
                    logger, "Confluence", f"{k.upper()}_PROXY", v, sensitive=True
                )
        if self.config.no_proxy and isinstance(self.config.no_proxy, str):
            # Written to the process environment, so this affects the whole
            # process rather than only this client's session.
            os.environ["NO_PROXY"] = self.config.no_proxy
            log_config_param(logger, "Confluence", "NO_PROXY", self.config.no_proxy)

        # Apply custom headers if configured
        if self.config.custom_headers:
            self._apply_custom_headers()

        # Import here to avoid circular imports
        from ..preprocessing.confluence import ConfluencePreprocessor

        self.preprocessor = ConfluencePreprocessor(base_url=self.config.url)

        # Test authentication during initialization (in debug mode only).
        # _validate_authentication wraps every failure in
        # MCPAtlassianAuthenticationError, so a failed probe only produces a
        # warning here and construction still succeeds.
        if logger.isEnabledFor(logging.DEBUG):
            try:
                self._validate_authentication()
            except MCPAtlassianAuthenticationError:
                logger.warning(
                    "Authentication validation failed during client initialization - "
                    "continuing anyway"
                )

    def _validate_authentication(self) -> None:
        """Probe the Confluence API once to confirm the credentials work.

        Requests a single space via ``get_all_spaces`` and logs the outcome:
        an info message with the number of returned results, or a warning when
        the call returns ``None``.

        Raises:
            MCPAtlassianAuthenticationError: If anything in the probe raises;
                the original exception is chained as the cause.
        """
        try:
            logger.debug(
                "Testing Confluence authentication by making a simple API call..."
            )
            # One space is enough to prove the request was accepted.
            probe = self.confluence.get_all_spaces(start=0, limit=1)
            if probe is None:
                logger.warning(
                    "Confluence authentication test returned None - "
                    "this may indicate an issue"
                )
                return

            space_count = len(probe.get("results", []))
            logger.info(
                f"Confluence authentication successful. "
                f"API call returned {space_count} spaces."
            )
        except Exception as exc:
            failure = f"Confluence authentication validation failed: {exc}"
            logger.error(failure)
            # Headers are masked before logging so credentials never leak.
            masked_headers = get_masked_session_headers(
                dict(self.confluence._session.headers)
            )
            logger.debug(f"Authentication headers during failure: {masked_headers}")
            raise MCPAtlassianAuthenticationError(failure) from exc

    def _apply_custom_headers(self) -> None:
        """Copy the configured custom headers onto the Confluence session.

        Does nothing when no custom headers are configured. Only header names
        are logged, never their values.
        """
        custom_headers = self.config.custom_headers
        if not custom_headers:
            return

        logger.debug(
            f"Applying {len(custom_headers)} custom headers to Confluence session"
        )
        session_headers = self.confluence._session.headers
        for name, value in custom_headers.items():
            session_headers[name] = value
            logger.debug(f"Applied custom header: {name}")

    def _process_html_content(
        self, html_content: str, space_key: str
    ) -> tuple[str, str]:
        """Run Confluence HTML through the preprocessor.

        Delegates to ``self.preprocessor.process_html_content``, passing along
        the underlying Confluence client.

        Args:
            html_content: Raw HTML content from Confluence
            space_key: The key of the space containing the content

        Returns:
            Tuple of (processed_html, processed_markdown)
        """
        process = self.preprocessor.process_html_content
        return process(html_content, space_key, self.confluence)

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/config.py:
--------------------------------------------------------------------------------

```python
"""Configuration module for Jira API interactions."""

import logging
import os
from dataclasses import dataclass
from typing import Literal

from ..utils.env import get_custom_headers, is_env_ssl_verify
from ..utils.oauth import (
    BYOAccessTokenOAuthConfig,
    OAuthConfig,
    get_oauth_config_from_env,
)
from ..utils.urls import is_atlassian_cloud_url


@dataclass
class JiraConfig:
    """Jira API configuration.

    Handles authentication for Jira Cloud and Server/Data Center:
    - Cloud: username/API token (basic auth) or OAuth 2.0 (3LO)
    - Server/DC: personal access token or basic auth

    Instances are normally built with ``from_env()``; ``is_auth_configured()``
    reports whether the selected auth type has the credentials it needs.
    """

    # NOTE: annotated ``str``, but from_env() forwards None when JIRA_URL is unset
    # and ATLASSIAN_OAUTH_ENABLE is set, so consumers must tolerate a missing URL.
    url: str  # Base URL for Jira
    auth_type: Literal["basic", "pat", "oauth"]  # Authentication type
    username: str | None = None  # Email or username (Cloud)
    api_token: str | None = None  # API token (Cloud)
    personal_token: str | None = None  # Personal access token (Server/DC)
    oauth_config: OAuthConfig | BYOAccessTokenOAuthConfig | None = None
    ssl_verify: bool = True  # Whether to verify SSL certificates
    projects_filter: str | None = None  # List of project keys to filter searches
    http_proxy: str | None = None  # HTTP proxy URL
    https_proxy: str | None = None  # HTTPS proxy URL
    no_proxy: str | None = None  # Comma-separated list of hosts to bypass proxy
    socks_proxy: str | None = None  # SOCKS proxy URL (optional)
    custom_headers: dict[str, str] | None = None  # Custom HTTP headers

    @property
    def is_cloud(self) -> bool:
        """Check if this is a cloud instance.

        Returns:
            True if this is a cloud instance (atlassian.net), False otherwise.
            Localhost URLs are always considered non-cloud (Server/Data Center).
        """
        # Multi-Cloud OAuth mode: URL might be None, but we use api.atlassian.com
        if (
            self.auth_type == "oauth"
            and self.oauth_config
            and self.oauth_config.cloud_id
        ):
            # OAuth with cloud_id uses api.atlassian.com which is always Cloud
            return True

        # For other auth types, check the URL
        return is_atlassian_cloud_url(self.url) if self.url else False

    @property
    def verify_ssl(self) -> bool:
        """Compatibility property for old code.

        Returns:
            The ssl_verify value
        """
        return self.ssl_verify

    @classmethod
    def from_env(cls) -> "JiraConfig":
        """Create configuration from environment variables.

        The auth type is chosen in this order: OAuth (when
        ``get_oauth_config_from_env()`` returns a config), then basic auth for
        Cloud URLs, then personal token or basic auth for Server/Data Center.

        Returns:
            JiraConfig with values from environment variables

        Raises:
            ValueError: If required environment variables are missing or invalid
        """
        url = os.getenv("JIRA_URL")
        if not url and not os.getenv("ATLASSIAN_OAUTH_ENABLE"):
            error_msg = "Missing required JIRA_URL environment variable"
            raise ValueError(error_msg)

        # Credentials that may be present in the environment
        username = os.getenv("JIRA_USERNAME")
        api_token = os.getenv("JIRA_API_TOKEN")
        personal_token = os.getenv("JIRA_PERSONAL_TOKEN")

        # Check for OAuth configuration
        oauth_config = get_oauth_config_from_env()

        # url can be None here (ATLASSIAN_OAUTH_ENABLE set without JIRA_URL), so
        # guard the helper call exactly like the is_cloud property does.
        is_cloud = is_atlassian_cloud_url(url) if url else False

        auth_type: Literal["basic", "pat", "oauth"]
        if oauth_config:
            # OAuth is available - could be full config or minimal config for user-provided tokens
            auth_type = "oauth"
        elif is_cloud:
            if not (username and api_token):
                error_msg = "Cloud authentication requires JIRA_USERNAME and JIRA_API_TOKEN, or OAuth configuration (set ATLASSIAN_OAUTH_ENABLE=true for user-provided tokens)"
                raise ValueError(error_msg)
            auth_type = "basic"
        elif personal_token:  # Server/Data Center
            auth_type = "pat"
        elif username and api_token:
            # Allow basic auth for Server/DC too
            auth_type = "basic"
        else:
            error_msg = "Server/Data Center authentication requires JIRA_PERSONAL_TOKEN or JIRA_USERNAME and JIRA_API_TOKEN"
            raise ValueError(error_msg)

        return cls(
            url=url,
            auth_type=auth_type,
            username=username,
            api_token=api_token,
            personal_token=personal_token,
            oauth_config=oauth_config,
            # SSL verification (for Server/DC)
            ssl_verify=is_env_ssl_verify("JIRA_SSL_VERIFY"),
            projects_filter=os.getenv("JIRA_PROJECTS_FILTER"),
            # Proxy settings: service-specific variable wins over the generic one
            http_proxy=os.getenv("JIRA_HTTP_PROXY", os.getenv("HTTP_PROXY")),
            https_proxy=os.getenv("JIRA_HTTPS_PROXY", os.getenv("HTTPS_PROXY")),
            no_proxy=os.getenv("JIRA_NO_PROXY", os.getenv("NO_PROXY")),
            socks_proxy=os.getenv("JIRA_SOCKS_PROXY", os.getenv("SOCKS_PROXY")),
            # Custom headers - service-specific only
            custom_headers=get_custom_headers("JIRA_CUSTOM_HEADERS"),
        )

    def is_auth_configured(self) -> bool:
        """Check if the current authentication configuration is complete and valid for making API calls.

        Returns:
            bool: True if authentication is fully configured, False otherwise.
        """
        logger = logging.getLogger("mcp-atlassian.jira.config")

        if self.auth_type == "pat":
            return bool(self.personal_token)
        if self.auth_type == "basic":
            return bool(self.username and self.api_token)
        if self.auth_type != "oauth":
            logger.warning(
                f"Unknown or unsupported auth_type: {self.auth_type} in JiraConfig"
            )
            return False

        # Handle different OAuth configuration types
        oauth = self.oauth_config
        if oauth:
            if isinstance(oauth, OAuthConfig):
                # Full OAuth configuration (traditional mode)
                if (
                    oauth.client_id
                    and oauth.client_secret
                    and oauth.redirect_uri
                    and oauth.scope
                    and oauth.cloud_id
                ):
                    return True
                # Minimal OAuth configuration (user-provided tokens mode):
                # no client credentials at all, so authentication is expected
                # to come from user-provided headers.
                if not oauth.client_id and not oauth.client_secret:
                    logger.debug(
                        "Minimal OAuth config detected - expecting user-provided tokens via headers"
                    )
                    return True
            elif isinstance(oauth, BYOAccessTokenOAuthConfig):
                # Bring Your Own Access Token mode
                if oauth.cloud_id and oauth.access_token:
                    return True

        # Partial configuration is invalid
        logger.warning("Incomplete OAuth configuration detected")
        return False

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/config.py:
--------------------------------------------------------------------------------

```python
"""Configuration module for the Confluence client."""

import logging
import os
from dataclasses import dataclass
from typing import Literal

from ..utils.env import get_custom_headers, is_env_ssl_verify
from ..utils.oauth import (
    BYOAccessTokenOAuthConfig,
    OAuthConfig,
    get_oauth_config_from_env,
)
from ..utils.urls import is_atlassian_cloud_url


@dataclass
class ConfluenceConfig:
    """Confluence API configuration.

    Handles authentication for Confluence Cloud and Server/Data Center:
    - Cloud: username/API token (basic auth) or OAuth 2.0 (3LO)
    - Server/DC: personal access token or basic auth

    Instances are normally built with ``from_env()``; ``is_auth_configured()``
    reports whether the selected auth type has the credentials it needs.
    """

    # NOTE: annotated ``str``, but from_env() forwards None when CONFLUENCE_URL is
    # unset and ATLASSIAN_OAUTH_ENABLE is set, so consumers must tolerate None.
    url: str  # Base URL for Confluence
    auth_type: Literal["basic", "pat", "oauth"]  # Authentication type
    username: str | None = None  # Email or username
    api_token: str | None = None  # API token used as password
    personal_token: str | None = None  # Personal access token (Server/DC)
    oauth_config: OAuthConfig | BYOAccessTokenOAuthConfig | None = None
    ssl_verify: bool = True  # Whether to verify SSL certificates
    spaces_filter: str | None = None  # List of space keys to filter searches
    http_proxy: str | None = None  # HTTP proxy URL
    https_proxy: str | None = None  # HTTPS proxy URL
    no_proxy: str | None = None  # Comma-separated list of hosts to bypass proxy
    socks_proxy: str | None = None  # SOCKS proxy URL (optional)
    custom_headers: dict[str, str] | None = None  # Custom HTTP headers

    @property
    def is_cloud(self) -> bool:
        """Check if this is a cloud instance.

        Returns:
            True if this is a cloud instance (atlassian.net), False otherwise.
            Localhost URLs are always considered non-cloud (Server/Data Center).
        """
        # Multi-Cloud OAuth mode: URL might be None, but we use api.atlassian.com
        if (
            self.auth_type == "oauth"
            and self.oauth_config
            and self.oauth_config.cloud_id
        ):
            # OAuth with cloud_id uses api.atlassian.com which is always Cloud
            return True

        # For other auth types, check the URL
        return is_atlassian_cloud_url(self.url) if self.url else False

    @property
    def verify_ssl(self) -> bool:
        """Compatibility property for old code.

        Returns:
            The ssl_verify value
        """
        return self.ssl_verify

    @classmethod
    def from_env(cls) -> "ConfluenceConfig":
        """Create configuration from environment variables.

        The auth type is chosen in this order: OAuth (when
        ``get_oauth_config_from_env()`` returns a config), then basic auth for
        Cloud URLs, then personal token or basic auth for Server/Data Center.

        Returns:
            ConfluenceConfig with values from environment variables

        Raises:
            ValueError: If any required environment variable is missing
        """
        url = os.getenv("CONFLUENCE_URL")
        if not url and not os.getenv("ATLASSIAN_OAUTH_ENABLE"):
            error_msg = "Missing required CONFLUENCE_URL environment variable"
            raise ValueError(error_msg)

        # Credentials that may be present in the environment
        username = os.getenv("CONFLUENCE_USERNAME")
        api_token = os.getenv("CONFLUENCE_API_TOKEN")
        personal_token = os.getenv("CONFLUENCE_PERSONAL_TOKEN")

        # Check for OAuth configuration
        oauth_config = get_oauth_config_from_env()

        # url can be None here (ATLASSIAN_OAUTH_ENABLE set without
        # CONFLUENCE_URL), so guard the helper call like the is_cloud property.
        is_cloud = is_atlassian_cloud_url(url) if url else False

        auth_type: Literal["basic", "pat", "oauth"]
        if oauth_config:
            # OAuth is available - could be full config or minimal config for user-provided tokens
            auth_type = "oauth"
        elif is_cloud:
            if not (username and api_token):
                error_msg = "Cloud authentication requires CONFLUENCE_USERNAME and CONFLUENCE_API_TOKEN, or OAuth configuration (set ATLASSIAN_OAUTH_ENABLE=true for user-provided tokens)"
                raise ValueError(error_msg)
            auth_type = "basic"
        elif personal_token:  # Server/Data Center
            auth_type = "pat"
        elif username and api_token:
            # Allow basic auth for Server/DC too
            auth_type = "basic"
        else:
            error_msg = "Server/Data Center authentication requires CONFLUENCE_PERSONAL_TOKEN or CONFLUENCE_USERNAME and CONFLUENCE_API_TOKEN"
            raise ValueError(error_msg)

        return cls(
            url=url,
            auth_type=auth_type,
            username=username,
            api_token=api_token,
            personal_token=personal_token,
            oauth_config=oauth_config,
            # SSL verification (for Server/DC)
            ssl_verify=is_env_ssl_verify("CONFLUENCE_SSL_VERIFY"),
            spaces_filter=os.getenv("CONFLUENCE_SPACES_FILTER"),
            # Proxy settings: service-specific variable wins over the generic one
            http_proxy=os.getenv("CONFLUENCE_HTTP_PROXY", os.getenv("HTTP_PROXY")),
            https_proxy=os.getenv("CONFLUENCE_HTTPS_PROXY", os.getenv("HTTPS_PROXY")),
            no_proxy=os.getenv("CONFLUENCE_NO_PROXY", os.getenv("NO_PROXY")),
            socks_proxy=os.getenv("CONFLUENCE_SOCKS_PROXY", os.getenv("SOCKS_PROXY")),
            # Custom headers - service-specific only
            custom_headers=get_custom_headers("CONFLUENCE_CUSTOM_HEADERS"),
        )

    def is_auth_configured(self) -> bool:
        """Check if the current authentication configuration is complete and valid for making API calls.

        Returns:
            bool: True if authentication is fully configured, False otherwise.
        """
        logger = logging.getLogger("mcp-atlassian.confluence.config")

        if self.auth_type == "pat":
            return bool(self.personal_token)
        if self.auth_type == "basic":
            return bool(self.username and self.api_token)
        if self.auth_type != "oauth":
            logger.warning(
                f"Unknown or unsupported auth_type: {self.auth_type} in ConfluenceConfig"
            )
            return False

        # Handle different OAuth configuration types
        oauth = self.oauth_config
        if oauth:
            if isinstance(oauth, OAuthConfig):
                # Full OAuth configuration (traditional mode)
                if (
                    oauth.client_id
                    and oauth.client_secret
                    and oauth.redirect_uri
                    and oauth.scope
                    and oauth.cloud_id
                ):
                    return True
                # Minimal OAuth configuration (user-provided tokens mode):
                # no client credentials at all, so authentication is expected
                # to come from user-provided headers.
                if not oauth.client_id and not oauth.client_secret:
                    logger.debug(
                        "Minimal OAuth config detected - expecting user-provided tokens via headers"
                    )
                    return True
            elif isinstance(oauth, BYOAccessTokenOAuthConfig):
                # Bring Your Own Access Token mode
                if oauth.cloud_id and oauth.access_token:
                    return True

        # Partial configuration is invalid
        logger.warning("Incomplete OAuth configuration detected")
        return False

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/models/jira/link.py:
--------------------------------------------------------------------------------

```python
"""
Jira issue link models.

This module provides Pydantic models for Jira issue links and link types.
"""

import logging
from typing import Any

from ..base import ApiModel
from ..constants import EMPTY_STRING, JIRA_DEFAULT_ID, UNKNOWN
from .common import JiraIssueType, JiraPriority, JiraStatus

logger = logging.getLogger(__name__)


class JiraIssueLinkType(ApiModel):
    """
    Model representing a Jira issue link type.

    Carries the link type's id, its name, the inward and outward relationship
    labels, and optionally the "self" URL from the API payload.
    """

    id: str = JIRA_DEFAULT_ID
    name: str = UNKNOWN
    inward: str = EMPTY_STRING
    outward: str = EMPTY_STRING
    self_url: str | None = None

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "JiraIssueLinkType":
        """
        Create a JiraIssueLinkType from a Jira API response.

        Keys that are missing or explicitly null fall back to the field
        defaults; every other value is converted with ``str()``.

        Args:
            data: The issue link type data from the Jira API

        Returns:
            A JiraIssueLinkType instance (a default one for empty or
            non-dictionary input)
        """
        if not data:
            return cls()

        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        def text(key: str, default: str) -> str:
            # dict.get(key, default) only covers missing keys; an explicit null
            # would otherwise reach a str field as None or as the text "None".
            value = data.get(key)
            return str(default if value is None else value)

        return cls(
            id=text("id", JIRA_DEFAULT_ID),
            name=text("name", UNKNOWN),
            inward=text("inward", EMPTY_STRING),
            outward=text("outward", EMPTY_STRING),
            self_url=data.get("self"),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response.

        Returns:
            A dict with "id", "name", "inward" and "outward", plus "self" when
            a self URL is set.
        """
        result: dict[str, Any] = {
            "id": self.id,
            "name": self.name,
            "inward": self.inward,
            "outward": self.outward,
        }

        if self.self_url:
            result["self"] = self.self_url

        return result


class JiraLinkedIssueFields(ApiModel):
    """
    Model representing the fields of a linked issue.

    Holds the summary plus optional status, priority and issue type models.
    """

    summary: str = EMPTY_STRING
    status: JiraStatus | None = None
    priority: JiraPriority | None = None
    issuetype: JiraIssueType | None = None

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "JiraLinkedIssueFields":
        """
        Create a JiraLinkedIssueFields from a Jira API response.

        Nested "status", "priority" and "issuetype" payloads are parsed only
        when present and non-empty. A missing or null "summary" becomes the
        empty-string default.

        Args:
            data: The linked issue fields data from the Jira API

        Returns:
            A JiraLinkedIssueFields instance (a default one for empty or
            non-dictionary input)
        """
        if not data:
            return cls()

        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        status_data = data.get("status")
        priority_data = data.get("priority")
        issuetype_data = data.get("issuetype")

        # dict.get(key, default) only covers a missing key; without this check
        # an explicit null summary would be stored as the text "None".
        summary = data.get("summary")

        return cls(
            summary=str(EMPTY_STRING if summary is None else summary),
            status=JiraStatus.from_api_response(status_data) if status_data else None,
            priority=(
                JiraPriority.from_api_response(priority_data)
                if priority_data
                else None
            ),
            issuetype=(
                JiraIssueType.from_api_response(issuetype_data)
                if issuetype_data
                else None
            ),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response.

        Returns:
            A dict with "summary", plus "status", "priority" and "issuetype"
            for each nested model that is set.
        """
        result: dict[str, Any] = {
            "summary": self.summary,
        }

        if self.status:
            result["status"] = self.status.to_simplified_dict()

        if self.priority:
            result["priority"] = self.priority.to_simplified_dict()

        if self.issuetype:
            result["issuetype"] = self.issuetype.to_simplified_dict()

        return result


class JiraLinkedIssue(ApiModel):
    """
    Model representing a linked issue in Jira.

    Holds the issue id and key, the optional "self" URL and the optional
    nested fields model.
    """

    id: str = JIRA_DEFAULT_ID
    key: str = EMPTY_STRING
    self_url: str | None = None
    fields: JiraLinkedIssueFields | None = None

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "JiraLinkedIssue":
        """
        Create a JiraLinkedIssue from a Jira API response.

        A missing or null "id" / "key" falls back to the field default; any
        other value is converted with ``str()``.

        Args:
            data: The linked issue data from the Jira API

        Returns:
            A JiraLinkedIssue instance (a default one for empty or
            non-dictionary input)
        """
        if not data:
            return cls()

        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        # Nested fields are parsed only when present and non-empty
        fields_data = data.get("fields")
        fields = (
            JiraLinkedIssueFields.from_api_response(fields_data)
            if fields_data
            else None
        )

        # dict.get(key, default) only covers missing keys; an explicit null
        # would otherwise reach a str field as None (id) or as "None" (key).
        raw_id = data.get("id")
        raw_key = data.get("key")

        return cls(
            id=str(JIRA_DEFAULT_ID if raw_id is None else raw_id),
            key=str(EMPTY_STRING if raw_key is None else raw_key),
            self_url=data.get("self"),
            fields=fields,
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response.

        Returns:
            A dict with "id" and "key", plus "self" and "fields" when set.
        """
        result: dict[str, Any] = {
            "id": self.id,
            "key": self.key,
        }

        if self.self_url:
            result["self"] = self.self_url

        if self.fields:
            result["fields"] = self.fields.to_simplified_dict()

        return result


class JiraIssueLink(ApiModel):
    """
    Model representing a link between two Jira issues.

    Holds the link id, the optional link type and the optional inward and
    outward linked issues.
    """

    id: str = JIRA_DEFAULT_ID
    type: JiraIssueLinkType | None = None
    inward_issue: JiraLinkedIssue | None = None
    outward_issue: JiraLinkedIssue | None = None

    @classmethod
    def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraIssueLink":
        """
        Create a JiraIssueLink from a Jira API response.

        The nested "type", "inwardIssue" and "outwardIssue" payloads are parsed
        only when present and non-empty. A missing or null "id" falls back to
        the default id.

        Args:
            data: The issue link data from the Jira API

        Returns:
            A JiraIssueLink instance (a default one for empty or
            non-dictionary input)
        """
        if not data:
            return cls()

        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        type_data = data.get("type")
        inward_issue_data = data.get("inwardIssue")
        outward_issue_data = data.get("outwardIssue")

        # dict.get(key, default) only covers a missing key; an explicit null id
        # would otherwise be passed to the str field as None.
        raw_id = data.get("id")

        return cls(
            id=str(JIRA_DEFAULT_ID if raw_id is None else raw_id),
            type=JiraIssueLinkType.from_api_response(type_data) if type_data else None,
            inward_issue=(
                JiraLinkedIssue.from_api_response(inward_issue_data)
                if inward_issue_data
                else None
            ),
            outward_issue=(
                JiraLinkedIssue.from_api_response(outward_issue_data)
                if outward_issue_data
                else None
            ),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response.

        Returns:
            A dict with "id", plus "type", "inward_issue" and "outward_issue"
            for each nested model that is set.
        """
        result: dict[str, Any] = {
            "id": self.id,
        }

        if self.type:
            result["type"] = self.type.to_simplified_dict()

        if self.inward_issue:
            result["inward_issue"] = self.inward_issue.to_simplified_dict()

        if self.outward_issue:
            result["outward_issue"] = self.outward_issue.to_simplified_dict()

        return result

```

--------------------------------------------------------------------------------
/tests/unit/test_exceptions.py:
--------------------------------------------------------------------------------

```python
"""
Tests for the exceptions module.
"""

import pickle

import pytest

from src.mcp_atlassian.exceptions import MCPAtlassianAuthenticationError


class TestMCPAtlassianAuthenticationError:
    """Behavioural checks for MCPAtlassianAuthenticationError."""

    def test_instantiation_without_message(self):
        """An argument-less instance has empty args and an empty string form."""
        exc = MCPAtlassianAuthenticationError()

        for expected_type in (MCPAtlassianAuthenticationError, Exception):
            assert isinstance(exc, expected_type)
        assert str(exc) == ""
        assert exc.args == ()

    def test_instantiation_with_message(self):
        """A single message is stored in args and returned by str()."""
        text = "Authentication failed"
        exc = MCPAtlassianAuthenticationError(text)

        for expected_type in (MCPAtlassianAuthenticationError, Exception):
            assert isinstance(exc, expected_type)
        assert str(exc) == text
        assert exc.args == (text,)

    def test_instantiation_with_multiple_args(self):
        """Several positional arguments are all kept in args."""
        payload = ("Authentication failed", 401)
        exc = MCPAtlassianAuthenticationError(*payload)

        for expected_type in (MCPAtlassianAuthenticationError, Exception):
            assert isinstance(exc, expected_type)
        # With more than one argument, str() renders the args tuple
        assert str(exc) == "('Authentication failed', 401)"
        assert exc.args == payload

    def test_inheritance_hierarchy(self):
        """The class derives from Exception and therefore BaseException."""
        exc = MCPAtlassianAuthenticationError("test")

        for ancestor in (MCPAtlassianAuthenticationError, Exception, BaseException):
            assert isinstance(exc, ancestor)
        for ancestor in (Exception, BaseException):
            assert issubclass(MCPAtlassianAuthenticationError, ancestor)

    def test_string_representation(self):
        """str() and repr() for no args, one message, and multiple args."""
        cases = [
            # (constructor args, expected str(), expected repr())
            ((), "", "MCPAtlassianAuthenticationError()"),
            (
                ("Invalid credentials provided",),
                "Invalid credentials provided",
                "MCPAtlassianAuthenticationError('Invalid credentials provided')",
            ),
            (
                ("Auth failed", 403),
                "('Auth failed', 403)",
                "MCPAtlassianAuthenticationError('Auth failed', 403)",
            ),
        ]

        for ctor_args, expected_str, expected_repr in cases:
            exc = MCPAtlassianAuthenticationError(*ctor_args)
            assert str(exc) == expected_str
            assert repr(exc) == expected_repr

    def test_exception_raising_and_catching(self):
        """The exception can be raised and caught by its own type."""
        text = "401 Unauthorized"

        with pytest.raises(MCPAtlassianAuthenticationError) as caught:
            raise MCPAtlassianAuthenticationError(text)

        assert caught.value.args == (text,)
        assert str(caught.value) == text

    def test_exception_catching_as_base_exception(self):
        """The exception is also caught by a handler for plain Exception."""
        text = "403 Forbidden"

        with pytest.raises(Exception) as caught:
            raise MCPAtlassianAuthenticationError(text)

        assert isinstance(caught.value, MCPAtlassianAuthenticationError)
        assert str(caught.value) == text

    def test_exception_chaining_with_cause(self):
        """'raise ... from err' records err as both cause and context."""
        root_cause = ValueError("Invalid token format")
        text = "Authentication failed due to invalid token"

        with pytest.raises(MCPAtlassianAuthenticationError) as caught:
            try:
                raise root_cause
            except ValueError as err:
                raise MCPAtlassianAuthenticationError(text) from err

        raised = caught.value
        assert str(raised) == text
        assert raised.__cause__ is root_cause
        # The implicit context is recorded alongside the explicit cause
        assert raised.__context__ is root_cause

    def test_exception_chaining_with_context(self):
        """'raise ... from None' leaves no cause but still records the context."""
        handled = ConnectionError("Network timeout")
        text = "Authentication failed"

        with pytest.raises(MCPAtlassianAuthenticationError) as caught:
            try:
                raise handled
            except ConnectionError:
                raise MCPAtlassianAuthenticationError(text) from None

        raised = caught.value
        assert str(raised) == text
        assert raised.__context__ is handled
        assert raised.__cause__ is None

    def test_exception_suppressed_context(self):
        """__suppress_context__ stays True on an exception raised 'from None'."""
        handled = RuntimeError("Some runtime error")
        text = "Authentication failed"

        with pytest.raises(MCPAtlassianAuthenticationError) as caught:
            try:
                raise handled
            except RuntimeError:
                exc = MCPAtlassianAuthenticationError(text)
                exc.__suppress_context__ = True
                raise exc from None

        assert str(caught.value) == text
        assert caught.value.__suppress_context__ is True

    def test_serialization_with_pickle(self):
        """A pickle round trip preserves the type, message and args."""
        text = "Authentication error for serialization test"
        source = MCPAtlassianAuthenticationError(text)

        # Round trip through pickle (the data is produced right here, so trusted)
        restored = pickle.loads(pickle.dumps(source))

        assert isinstance(restored, MCPAtlassianAuthenticationError)
        assert str(restored) == text
        assert restored.args == source.args

    def test_exception_attributes_access(self):
        """Standard exception attributes and the class docstring are exposed."""
        exc = MCPAtlassianAuthenticationError("Test message")

        # Standard exception attributes
        for attribute in (
            "args",
            "__traceback__",
            "__cause__",
            "__context__",
            "__suppress_context__",
        ):
            assert hasattr(exc, attribute)

        # The docstring is reachable through the instance
        assert (
            exc.__doc__ == "Raised when Atlassian API authentication fails (401/403)."
        )

    def test_exception_equality(self):
        """Instances are distinct objects; their args compare by value."""
        shared_text = "Same message"
        first = MCPAtlassianAuthenticationError(shared_text)
        second = MCPAtlassianAuthenticationError(shared_text)
        other = MCPAtlassianAuthenticationError("Different message")

        # Same args, different identity
        assert first.args == second.args
        assert first is not second
        assert first.args != other.args

    def test_realistic_authentication_scenarios(self):
        """Typical auth failure messages survive str() unchanged."""
        scenarios = {
            # message -> fragments that must appear in str(error)
            "401 Unauthorized: Invalid API token": ("401", "Invalid API token"),
            "403 Forbidden: Insufficient permissions": (
                "403",
                "Insufficient permissions",
            ),
            "OAuth token has expired": ("OAuth", "expired"),
        }

        for text, fragments in scenarios.items():
            rendered = str(MCPAtlassianAuthenticationError(text))
            for fragment in fragments:
                assert fragment in rendered

```

--------------------------------------------------------------------------------
/tests/unit/utils/test_env.py:
--------------------------------------------------------------------------------

```python
"""Tests for environment variable utility functions."""

from mcp_atlassian.utils.env import (
    is_env_extended_truthy,
    is_env_ssl_verify,
    is_env_truthy,
)


class TestIsEnvTruthy:
    """Behavioural checks for ``is_env_truthy``."""

    def test_standard_truthy_values(self, monkeypatch):
        """'true', '1' and 'yes' must be truthy in lower, upper and capitalized form."""
        accepted = ("true", "1", "yes")
        # Values as written first, then upper-cased, then capitalized.
        casings = (lambda text: text, str.upper, str.capitalize)

        for recase in casings:
            for raw in accepted:
                monkeypatch.setenv("TEST_VAR", recase(raw))
                assert is_env_truthy("TEST_VAR") is True

    def test_standard_falsy_values(self, monkeypatch):
        """Anything outside the standard truthy set must yield False."""
        # Includes 'y' and 'on', which this test expects the standard check to reject.
        rejected = ("false", "0", "no", "", "invalid", "y", "on")

        for raw in rejected:
            monkeypatch.setenv("TEST_VAR", raw)
            outcome = is_env_truthy("TEST_VAR")
            assert outcome is False

    def test_unset_variable_with_default(self, monkeypatch):
        """With the variable unset, the result must follow the supplied default."""
        monkeypatch.delenv("TEST_VAR", raising=False)

        # No default supplied
        assert is_env_truthy("TEST_VAR") is False

        # Explicit defaults: truthy ones first, then falsy ones
        expectations = (
            ("true", True),
            ("1", True),
            ("yes", True),
            ("false", False),
            ("0", False),
        )
        for fallback, expected in expectations:
            assert is_env_truthy("TEST_VAR", fallback) is expected

    def test_empty_string_environment_variable(self, monkeypatch):
        """A variable explicitly set to the empty string must be falsy."""
        monkeypatch.setenv("TEST_VAR", "")
        outcome = is_env_truthy("TEST_VAR")
        assert outcome is False


class TestIsEnvExtendedTruthy:
    """Behavioural checks for ``is_env_extended_truthy``."""

    def test_extended_truthy_values(self, monkeypatch):
        """'true', '1', 'yes', 'y' and 'on' must be truthy in any tested casing."""
        accepted = ("true", "1", "yes", "y", "on")
        # Values as written first, then upper-cased, then capitalized.
        casings = (lambda text: text, str.upper, str.capitalize)

        for recase in casings:
            for raw in accepted:
                monkeypatch.setenv("TEST_VAR", recase(raw))
                assert is_env_extended_truthy("TEST_VAR") is True

    def test_extended_falsy_values(self, monkeypatch):
        """Values outside the extended truthy set must yield False."""
        rejected = ("false", "0", "no", "", "invalid", "off")

        for raw in rejected:
            monkeypatch.setenv("TEST_VAR", raw)
            outcome = is_env_extended_truthy("TEST_VAR")
            assert outcome is False

    def test_extended_vs_standard_difference(self, monkeypatch):
        """'y' and 'on' must pass the extended check but fail the standard one."""
        for raw in ("y", "on"):
            monkeypatch.setenv("TEST_VAR", raw)
            # Accepted by the extended variant...
            assert is_env_extended_truthy("TEST_VAR") is True
            # ...and rejected by the standard variant.
            assert is_env_truthy("TEST_VAR") is False

    def test_unset_variable_with_default(self, monkeypatch):
        """With the variable unset, the result must follow the supplied default."""
        monkeypatch.delenv("TEST_VAR", raising=False)

        # No default supplied
        assert is_env_extended_truthy("TEST_VAR") is False

        # Explicit defaults: truthy ones first, then a falsy one
        expectations = (
            ("true", True),
            ("y", True),
            ("on", True),
            ("false", False),
        )
        for fallback, expected in expectations:
            assert is_env_extended_truthy("TEST_VAR", fallback) is expected


class TestIsEnvSslVerify:
    """Behavioural checks for ``is_env_ssl_verify``."""

    def test_ssl_verify_default_true(self, monkeypatch):
        """An unset variable must leave SSL verification enabled."""
        monkeypatch.delenv("TEST_VAR", raising=False)
        outcome = is_env_ssl_verify("TEST_VAR")
        assert outcome is True

    def test_ssl_verify_explicit_false_values(self, monkeypatch):
        """'false', '0' and 'no' must disable verification in any tested casing."""
        disabling = ("false", "0", "no")
        # Values as written first, then upper-cased, then capitalized.
        casings = (lambda text: text, str.upper, str.capitalize)

        for recase in casings:
            for raw in disabling:
                monkeypatch.setenv("TEST_VAR", recase(raw))
                assert is_env_ssl_verify("TEST_VAR") is False

    def test_ssl_verify_truthy_and_other_values(self, monkeypatch):
        """Truthy and unrecognized values must all keep verification enabled."""
        enabling = ("true", "1", "yes", "y", "on", "enable", "enabled", "anything")

        for raw in enabling:
            monkeypatch.setenv("TEST_VAR", raw)
            outcome = is_env_ssl_verify("TEST_VAR")
            assert outcome is True

    def test_ssl_verify_custom_default(self, monkeypatch):
        """With the variable unset, the supplied default must decide the result."""
        monkeypatch.delenv("TEST_VAR", raising=False)

        # Order: explicit true, explicit false, then an unrecognized value
        expectations = (
            ("true", True),
            ("false", False),
            ("anything", True),
        )
        for fallback, expected in expectations:
            assert is_env_ssl_verify("TEST_VAR", fallback) is expected

    def test_ssl_verify_empty_string(self, monkeypatch):
        """An empty string is not an explicit false value, so the result is True."""
        monkeypatch.setenv("TEST_VAR", "")
        outcome = is_env_ssl_verify("TEST_VAR")
        assert outcome is True


class TestEdgeCases:
    """Test edge cases and special scenarios across all three env helpers."""

    def test_whitespace_handling(self, monkeypatch):
        """Test that values with surrounding whitespace are not recognized."""
        # " true " is expected to match neither truthy set
        monkeypatch.setenv("TEST_VAR", " true ")
        assert is_env_truthy("TEST_VAR") is False
        assert is_env_extended_truthy("TEST_VAR") is False

        # " false " is expected not to count as an explicit false value
        monkeypatch.setenv("TEST_VAR", " false ")
        assert is_env_ssl_verify("TEST_VAR") is True

    def test_special_characters(self, monkeypatch):
        """Test that values decorated with punctuation are not recognized."""
        special_values = ["true!", "@yes", "1.0", "y,", "on;"]

        for value in special_values:
            monkeypatch.setenv("TEST_VAR", value)
            assert is_env_truthy("TEST_VAR") is False
            assert is_env_extended_truthy("TEST_VAR") is False
            assert is_env_ssl_verify("TEST_VAR") is True  # Not in false values

    def test_unicode_values(self, monkeypatch):
        """Test that unicode look-alikes of truthy values are not recognized."""
        unicode_values = ["truë", "yés", "1️⃣"]

        for value in unicode_values:
            monkeypatch.setenv("TEST_VAR", value)
            assert is_env_truthy("TEST_VAR") is False
            assert is_env_extended_truthy("TEST_VAR") is False
            assert is_env_ssl_verify("TEST_VAR") is True  # Not in false values

    def test_numeric_string_edge_cases(self, monkeypatch):
        """Test that numeric strings other than exactly "1" are not truthy."""
        # "01" is included deliberately: it is not exactly "1". Every value
        # here has the same expectations, so no per-value branching is needed.
        numeric_values = ["01", "1.0", "10", "-1", "2"]

        for value in numeric_values:
            monkeypatch.setenv("TEST_VAR", value)
            assert is_env_truthy("TEST_VAR") is False
            assert is_env_extended_truthy("TEST_VAR") is False
            assert is_env_ssl_verify("TEST_VAR") is True  # Not in false values

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/worklog.py:
--------------------------------------------------------------------------------

```python
"""Module for Jira worklog operations."""

import logging
import re
from typing import Any

from ..models import JiraWorklog
from ..utils import parse_date
from .client import JiraClient

logger = logging.getLogger("mcp-jira")


class WorklogMixin(JiraClient):
    """Mixin for Jira worklog operations."""

    def _parse_time_spent(self, time_spent: str) -> int:
        """
        Parse time spent string into seconds.

        Args:
            time_spent: Time spent string (e.g. 1h 30m, 1d, etc.)

        Returns:
            Time spent in seconds
        """
        # Base case for direct specification in seconds
        if time_spent.endswith("s"):
            try:
                return int(time_spent[:-1])
            except ValueError:
                pass

        total_seconds = 0
        time_units = {
            "w": 7 * 24 * 60 * 60,  # weeks to seconds
            "d": 24 * 60 * 60,  # days to seconds
            "h": 60 * 60,  # hours to seconds
            "m": 60,  # minutes to seconds
        }

        # Regular expression to find time components like 1w, 2d, 3h, 4m
        pattern = r"(\d+)([wdhm])"
        matches = re.findall(pattern, time_spent)

        for value, unit in matches:
            # Convert value to int and multiply by the unit in seconds
            seconds = int(value) * time_units[unit]
            total_seconds += seconds

        if total_seconds == 0:
            # If we couldn't parse anything, try using the raw value
            try:
                return int(float(time_spent))  # Convert to float first, then to int
            except ValueError:
                # If all else fails, default to 60 seconds (1 minute)
                logger.warning(
                    f"Could not parse time: {time_spent}, defaulting to 60 seconds"
                )
                return 60

        return total_seconds

    def add_worklog(
        self,
        issue_key: str,
        time_spent: str,
        comment: str | None = None,
        started: str | None = None,
        original_estimate: str | None = None,
        remaining_estimate: str | None = None,
    ) -> dict[str, Any]:
        """
        Record a worklog entry on a Jira issue.

        The original estimate, when given, is written with a separate issue
        edit before the worklog is posted; a failure there is logged and does
        not stop the worklog from being created. The remaining estimate, when
        given, is passed as query parameters on the worklog request itself.

        Args:
            issue_key: The issue key (e.g. 'PROJ-123')
            time_spent: Time spent (e.g. '1h 30m', '3h', '1d')
            comment: Optional worklog comment; converted with
                ``_markdown_to_jira`` when that method exists on the instance
            started: Optional ISO8601 date time string for when work began
            original_estimate: Optional new value for the original estimate
            remaining_estimate: Optional new value for the remaining estimate

        Returns:
            A dict describing the created worklog, plus the flags
            ``original_estimate_updated`` and ``remaining_estimate_updated``

        Raises:
            Exception: If anything fails while adding the worklog (the
                underlying error is chained as the cause)
        """
        try:
            seconds_logged = self._parse_time_spent(time_spent)

            # The converter comes from CommentsMixin and may be absent here
            if comment and hasattr(self, "_markdown_to_jira"):
                comment = self._markdown_to_jira(comment)

            # Original estimate: its own API call, best effort
            original_estimate_updated = False
            if original_estimate:
                try:
                    self.jira.edit_issue(
                        issue_id_or_key=issue_key,
                        fields={
                            "timetracking": {"originalEstimate": original_estimate}
                        },
                    )
                    original_estimate_updated = True
                    logger.info(f"Updated original estimate for issue {issue_key}")
                except Exception as e:  # noqa: BLE001 - Intentional fallback with logging
                    # Logged only; the worklog is still created below
                    logger.error(
                        f"Failed to update original estimate for issue {issue_key}: "
                        f"{str(e)}"
                    )

            # Request body: optional fields are included only when non-empty
            payload: dict[str, Any] = {"timeSpentSeconds": seconds_logged}
            for field_name, field_value in (("comment", comment), ("started", started)):
                if field_value:
                    payload[field_name] = field_value

            # Query string: ask Jira to set the remaining estimate when given
            remaining_estimate_updated = bool(remaining_estimate)
            query = (
                {"adjustEstimate": "new", "newEstimate": remaining_estimate}
                if remaining_estimate_updated
                else {}
            )

            endpoint = f"{self.jira.resource_url('issue')}/{issue_key}/worklog"
            posted = self.jira.post(endpoint, data=payload, params=query)
            if not isinstance(posted, dict):
                msg = f"Unexpected return value type from `jira.post`: {type(posted)}"
                logger.error(msg)
                raise TypeError(msg)

            # Build the summary in a fixed key order
            summary: dict[str, Any] = {
                "id": posted.get("id"),
                "comment": self._clean_text(posted.get("comment", "")),
            }
            for stamp in ("created", "updated", "started"):
                summary[stamp] = str(parse_date(posted.get(stamp, "")))
            summary["timeSpent"] = posted.get("timeSpent", "")
            summary["timeSpentSeconds"] = posted.get("timeSpentSeconds", 0)
            summary["author"] = posted.get("author", {}).get("displayName", "Unknown")
            summary["original_estimate_updated"] = original_estimate_updated
            summary["remaining_estimate_updated"] = remaining_estimate_updated
            return summary
        except Exception as e:
            logger.error(f"Error adding worklog to issue {issue_key}: {str(e)}")
            raise Exception(f"Error adding worklog: {str(e)}") from e

    def get_worklog(self, issue_key: str) -> dict[str, Any]:
        """
        Get the worklog data for an issue.

        Args:
            issue_key: The issue key (e.g. 'PROJ-123')

        Returns:
            Raw worklog data from the API
        """
        try:
            return self.jira.worklog(issue_key)  # type: ignore[attr-defined]
        except Exception as e:
            logger.warning(f"Error getting worklog for {issue_key}: {e}")
            return {"worklogs": []}

    def get_worklog_models(self, issue_key: str) -> list[JiraWorklog]:
        """
        Get all worklog entries for an issue as JiraWorklog models.

        Args:
            issue_key: The issue key (e.g. 'PROJ-123')

        Returns:
            List of JiraWorklog models
        """
        worklog_data = self.get_worklog(issue_key)
        result: list[JiraWorklog] = []

        if "worklogs" in worklog_data and worklog_data["worklogs"]:
            for log_data in worklog_data["worklogs"]:
                worklog = JiraWorklog.from_api_response(log_data)
                result.append(worklog)

        return result

    def get_worklogs(self, issue_key: str) -> list[dict[str, Any]]:
        """
        Get all worklog entries for an issue.

        Args:
            issue_key: The issue key (e.g. 'PROJ-123')

        Returns:
            List of worklog entries

        Raises:
            Exception: If there's an error getting the worklogs
        """
        try:
            result = self.jira.issue_get_worklog(issue_key)
            if not isinstance(result, dict):
                msg = f"Unexpected return value type from `jira.issue_get_worklog`: {type(result)}"
                logger.error(msg)
                raise TypeError(msg)

            # Process the worklogs
            worklogs = []
            for worklog in result.get("worklogs", []):
                worklogs.append(
                    {
                        "id": worklog.get("id"),
                        "comment": self._clean_text(worklog.get("comment", "")),
                        "created": str(parse_date(worklog.get("created", ""))),
                        "updated": str(parse_date(worklog.get("updated", ""))),
                        "started": str(parse_date(worklog.get("started", ""))),
                        "timeSpent": worklog.get("timeSpent", ""),
                        "timeSpentSeconds": worklog.get("timeSpentSeconds", 0),
                        "author": worklog.get("author", {}).get(
                            "displayName", "Unknown"
                        ),
                    }
                )

            return worklogs
        except Exception as e:
            logger.error(f"Error getting worklogs for issue {issue_key}: {str(e)}")
            raise Exception(f"Error getting worklogs: {str(e)}") from e

```
Page 2/10FirstPrevNextLast