#
tokens: 45885/50000 11/194 files (page 5/10)
lines: off (toggle) GitHub
raw markdown copy
This is page 5 of 10. Use http://codebase.md/sooperset/mcp-atlassian?page={x} to view the full context.

# Directory Structure

```
├── .devcontainer
│   ├── devcontainer.json
│   ├── Dockerfile
│   ├── post-create.sh
│   └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   └── feature_request.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── docker-publish.yml
│       ├── lint.yml
│       ├── publish.yml
│       ├── stale.yml
│       └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│   ├── oauth_authorize.py
│   └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│   └── mcp_atlassian
│       ├── __init__.py
│       ├── confluence
│       │   ├── __init__.py
│       │   ├── client.py
│       │   ├── comments.py
│       │   ├── config.py
│       │   ├── constants.py
│       │   ├── labels.py
│       │   ├── pages.py
│       │   ├── search.py
│       │   ├── spaces.py
│       │   ├── users.py
│       │   ├── utils.py
│       │   └── v2_adapter.py
│       ├── exceptions.py
│       ├── jira
│       │   ├── __init__.py
│       │   ├── attachments.py
│       │   ├── boards.py
│       │   ├── client.py
│       │   ├── comments.py
│       │   ├── config.py
│       │   ├── constants.py
│       │   ├── epics.py
│       │   ├── fields.py
│       │   ├── formatting.py
│       │   ├── issues.py
│       │   ├── links.py
│       │   ├── projects.py
│       │   ├── protocols.py
│       │   ├── search.py
│       │   ├── sprints.py
│       │   ├── transitions.py
│       │   ├── users.py
│       │   └── worklog.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── confluence
│       │   │   ├── __init__.py
│       │   │   ├── comment.py
│       │   │   ├── common.py
│       │   │   ├── label.py
│       │   │   ├── page.py
│       │   │   ├── search.py
│       │   │   ├── space.py
│       │   │   └── user_search.py
│       │   ├── constants.py
│       │   └── jira
│       │       ├── __init__.py
│       │       ├── agile.py
│       │       ├── comment.py
│       │       ├── common.py
│       │       ├── issue.py
│       │       ├── link.py
│       │       ├── project.py
│       │       ├── search.py
│       │       ├── version.py
│       │       ├── workflow.py
│       │       └── worklog.py
│       ├── preprocessing
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── confluence.py
│       │   └── jira.py
│       ├── servers
│       │   ├── __init__.py
│       │   ├── confluence.py
│       │   ├── context.py
│       │   ├── dependencies.py
│       │   ├── jira.py
│       │   └── main.py
│       └── utils
│           ├── __init__.py
│           ├── date.py
│           ├── decorators.py
│           ├── env.py
│           ├── environment.py
│           ├── io.py
│           ├── lifecycle.py
│           ├── logging.py
│           ├── oauth_setup.py
│           ├── oauth.py
│           ├── ssl.py
│           ├── tools.py
│           └── urls.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── fixtures
│   │   ├── __init__.py
│   │   ├── confluence_mocks.py
│   │   └── jira_mocks.py
│   ├── integration
│   │   ├── conftest.py
│   │   ├── README.md
│   │   ├── test_authentication.py
│   │   ├── test_content_processing.py
│   │   ├── test_cross_service.py
│   │   ├── test_mcp_protocol.py
│   │   ├── test_proxy.py
│   │   ├── test_real_api.py
│   │   ├── test_ssl_verification.py
│   │   ├── test_stdin_monitoring_fix.py
│   │   └── test_transport_lifecycle.py
│   ├── README.md
│   ├── test_preprocessing.py
│   ├── test_real_api_validation.py
│   ├── unit
│   │   ├── confluence
│   │   │   ├── __init__.py
│   │   │   ├── conftest.py
│   │   │   ├── test_client_oauth.py
│   │   │   ├── test_client.py
│   │   │   ├── test_comments.py
│   │   │   ├── test_config.py
│   │   │   ├── test_constants.py
│   │   │   ├── test_custom_headers.py
│   │   │   ├── test_labels.py
│   │   │   ├── test_pages.py
│   │   │   ├── test_search.py
│   │   │   ├── test_spaces.py
│   │   │   ├── test_users.py
│   │   │   ├── test_utils.py
│   │   │   └── test_v2_adapter.py
│   │   ├── jira
│   │   │   ├── conftest.py
│   │   │   ├── test_attachments.py
│   │   │   ├── test_boards.py
│   │   │   ├── test_client_oauth.py
│   │   │   ├── test_client.py
│   │   │   ├── test_comments.py
│   │   │   ├── test_config.py
│   │   │   ├── test_constants.py
│   │   │   ├── test_custom_headers.py
│   │   │   ├── test_epics.py
│   │   │   ├── test_fields.py
│   │   │   ├── test_formatting.py
│   │   │   ├── test_issues_markdown.py
│   │   │   ├── test_issues.py
│   │   │   ├── test_links.py
│   │   │   ├── test_projects.py
│   │   │   ├── test_protocols.py
│   │   │   ├── test_search.py
│   │   │   ├── test_sprints.py
│   │   │   ├── test_transitions.py
│   │   │   ├── test_users.py
│   │   │   └── test_worklog.py
│   │   ├── models
│   │   │   ├── __init__.py
│   │   │   ├── conftest.py
│   │   │   ├── test_base_models.py
│   │   │   ├── test_confluence_models.py
│   │   │   ├── test_constants.py
│   │   │   └── test_jira_models.py
│   │   ├── servers
│   │   │   ├── __init__.py
│   │   │   ├── test_confluence_server.py
│   │   │   ├── test_context.py
│   │   │   ├── test_dependencies.py
│   │   │   ├── test_jira_server.py
│   │   │   └── test_main_server.py
│   │   ├── test_exceptions.py
│   │   ├── test_main_transport_selection.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── test_custom_headers.py
│   │       ├── test_date.py
│   │       ├── test_decorators.py
│   │       ├── test_env.py
│   │       ├── test_environment.py
│   │       ├── test_io.py
│   │       ├── test_lifecycle.py
│   │       ├── test_logging.py
│   │       ├── test_masking.py
│   │       ├── test_oauth_setup.py
│   │       ├── test_oauth.py
│   │       ├── test_ssl.py
│   │       ├── test_tools.py
│   │       └── test_urls.py
│   └── utils
│       ├── __init__.py
│       ├── assertions.py
│       ├── base.py
│       ├── factories.py
│       └── mocks.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/tests/unit/jira/conftest.py:
--------------------------------------------------------------------------------

```python
"""
Test fixtures for Jira unit tests.

This module provides specialized fixtures for testing Jira-related functionality.
It builds upon the root conftest.py fixtures and provides Jira-specific mocks,
configurations, and utilities with efficient session-scoped caching.
"""

import os
from unittest.mock import MagicMock, patch

import pytest

from mcp_atlassian.jira.client import JiraClient
from mcp_atlassian.jira.config import JiraConfig
from tests.utils.factories import AuthConfigFactory, JiraIssueFactory
from tests.utils.mocks import MockAtlassianClient

# ============================================================================
# Session-Scoped Jira Data Fixtures
# ============================================================================


@pytest.fixture(scope="session")
def session_jira_field_definitions():
    """
    Session-scoped fixture providing Jira field definitions.

    This expensive-to-create data is cached for the entire test session
    to improve test performance.

    Returns:
        List[Dict[str, Any]]: Complete Jira field definitions
    """
    return [
        # --- Core system fields ---
        {"id": "summary", "name": "Summary", "schema": {"type": "string"}},
        {"id": "description", "name": "Description", "schema": {"type": "string"}},
        {"id": "issuetype", "name": "Issue Type", "schema": {"type": "issuetype"}},
        {"id": "status", "name": "Status", "schema": {"type": "status"}},
        {"id": "priority", "name": "Priority", "schema": {"type": "priority"}},
        {
            "id": "labels",
            "name": "Labels",
            "schema": {"type": "array", "items": "string"},
        },
        {"id": "assignee", "name": "Assignee", "schema": {"type": "user"}},
        {"id": "reporter", "name": "Reporter", "schema": {"type": "user"}},
        {"id": "created", "name": "Created", "schema": {"type": "datetime"}},
        {"id": "updated", "name": "Updated", "schema": {"type": "datetime"}},
        {
            "id": "fixVersions",
            "name": "Fix Version/s",
            "schema": {"type": "array", "items": "version"},
        },
        # --- Agile custom fields (GreenHopper epic/sprint/story-point ids) ---
        {
            "id": "customfield_10010",
            "name": "Epic Link",
            "schema": {
                "type": "string",
                "custom": "com.pyxis.greenhopper.jira:gh-epic-link",
            },
        },
        {
            "id": "customfield_10011",
            "name": "Epic Name",
            "schema": {
                "type": "string",
                "custom": "com.pyxis.greenhopper.jira:gh-epic-label",
            },
        },
        {
            "id": "customfield_10012",
            "name": "Story Points",
            "schema": {"type": "number"},
        },
        {
            "id": "customfield_10013",
            "name": "Sprint",
            "schema": {"type": "array", "items": "string"},
        },
        # --- Components, resolution, and time-tracking fields ---
        {
            "id": "components",
            "name": "Component/s",
            "schema": {"type": "array", "items": "component"},
        },
        {
            "id": "resolution",
            "name": "Resolution",
            "schema": {"type": "resolution"},
        },
        {
            "id": "resolutiondate",
            "name": "Resolved",
            "schema": {"type": "datetime"},
        },
        {
            "id": "workratio",
            "name": "Work Ratio",
            "schema": {"type": "number"},
        },
        {
            "id": "timeestimate",
            "name": "Remaining Estimate",
            "schema": {"type": "timetracking"},
        },
        {
            "id": "timespent",
            "name": "Time Spent",
            "schema": {"type": "timetracking"},
        },
        {
            "id": "timeoriginalestimate",
            "name": "Original Estimate",
            "schema": {"type": "timetracking"},
        },
    ]


@pytest.fixture(scope="session")
def session_jira_projects():
    """
    Session-scoped fixture providing Jira project definitions.

    Returns:
        List[Dict[str, Any]]: Mock Jira project data
    """
    # One row per project: (id, key, name, project type, lead, description).
    project_rows = [
        (
            "10000",
            "TEST",
            "Test Project",
            "software",
            "Test Lead",
            "Test project for unit tests",
        ),
        (
            "10001",
            "DEMO",
            "Demo Project",
            "business",
            "Demo Lead",
            "Demo project for testing",
        ),
        (
            "10002",
            "SAMPLE",
            "Sample Project",
            "service_desk",
            "Sample Lead",
            "Sample project with service desk",
        ),
    ]
    # Expand each row into the REST-API-shaped project dict.
    return [
        {
            "id": project_id,
            "key": key,
            "name": name,
            "projectTypeKey": type_key,
            "lead": {"displayName": lead_name},
            "description": description,
        }
        for project_id, key, name, type_key, lead_name, description in project_rows
    ]


@pytest.fixture(scope="session")
def session_jira_issue_types():
    """
    Session-scoped fixture providing Jira issue type definitions.

    Returns:
        List[Dict[str, Any]]: Mock Jira issue type data
    """
    # One row per issue type: (id, name, icon file, is-subtask flag).
    type_rows = [
        ("1", "Bug", "bug.png", False),
        ("2", "Task", "task.png", False),
        ("3", "Story", "story.png", False),
        ("4", "Epic", "epic.png", False),
        ("5", "Sub-task", "subtask.png", True),
        ("6", "Improvement", "improvement.png", False),
        ("7", "New Feature", "newfeature.png", False),
    ]
    return [
        {"id": type_id, "name": name, "iconUrl": icon, "subtask": subtask}
        for type_id, name, icon, subtask in type_rows
    ]


# ============================================================================
# Configuration Fixtures
# ============================================================================


@pytest.fixture
def jira_config_factory():
    """
    Factory for creating JiraConfig instances with customizable options.

    Returns:
        Callable: Function that creates JiraConfig instances

    Example:
        def test_config(jira_config_factory):
            config = jira_config_factory(url="https://custom.atlassian.net")
            assert config.url == "https://custom.atlassian.net"
    """

    def _create_config(**overrides):
        # Start from sensible basic-auth defaults, then let callers
        # override or extend any field via keyword arguments.
        settings = dict(
            url="https://test.atlassian.net",
            auth_type="basic",
            username="test_username",
            api_token="test_token",
        )
        settings.update(overrides)
        return JiraConfig(**settings)

    return _create_config


@pytest.fixture
def mock_config(jira_config_factory):
    """
    Create a standard mock JiraConfig instance.

    This fixture provides a consistent JiraConfig for tests that don't
    need custom configuration.

    Returns:
        JiraConfig: Standard test configuration
    """
    # Delegates to the factory with no overrides, so the result is the
    # factory's default basic-auth config (https://test.atlassian.net).
    return jira_config_factory()


# ============================================================================
# Environment Fixtures
# ============================================================================


@pytest.fixture
def mock_env_vars():
    """
    Mock environment variables for testing.

    Note: This fixture is maintained for backward compatibility.
    Consider using the environment fixtures from root conftest.py.
    """
    test_environment = {
        "JIRA_URL": "https://test.atlassian.net",
        "JIRA_USERNAME": "test_username",
        "JIRA_API_TOKEN": "test_token",
    }
    # clear=True drops every pre-existing variable so the test runs
    # against exactly these three settings and nothing else.
    with patch.dict(os.environ, test_environment, clear=True):
        yield


@pytest.fixture
def jira_auth_environment():
    """
    Fixture providing Jira-specific authentication environment.

    This sets up environment variables specifically for Jira authentication
    and can be customized per test.
    """
    credentials = AuthConfigFactory.create_basic_auth_config()
    env_overrides = {
        "JIRA_URL": credentials["url"],
        "JIRA_USERNAME": credentials["username"],
        "JIRA_API_TOKEN": credentials["api_token"],
    }

    # clear=False keeps unrelated environment variables intact.
    with patch.dict(os.environ, env_overrides, clear=False):
        yield env_overrides


# ============================================================================
# Mock Atlassian Client Fixtures
# ============================================================================


@pytest.fixture
def mock_atlassian_jira(
    session_jira_field_definitions, session_jira_projects, session_jira_issue_types
):
    """
    Enhanced mock of the Atlassian Jira client.

    This fixture provides a comprehensive mock that uses session-scoped
    data for improved performance and consistency.

    Args:
        session_jira_field_definitions: Session-scoped field definitions
        session_jira_projects: Session-scoped project data
        session_jira_issue_types: Session-scoped issue type data

    Returns:
        MagicMock: Fully configured mock Jira client
    """
    client = MagicMock()

    # Session-scoped canned data keeps responses consistent across tests.
    client.get_all_fields.return_value = session_jira_field_definitions
    client.projects.return_value = session_jira_projects
    client.issue_types.return_value = session_jira_issue_types

    # Current-user lookup.
    client.myself.return_value = {
        "accountId": "test-account-id",
        "displayName": "Test User",
    }

    # Single-issue read and JQL search, both built from the shared factory.
    client.get_issue.return_value = JiraIssueFactory.create()
    client.jql.return_value = {
        "issues": [
            JiraIssueFactory.create(key) for key in ("TEST-1", "TEST-2", "TEST-3")
        ],
        "total": 3,
        "startAt": 0,
        "maxResults": 50,
    }

    # Write operations: create echoes a factory issue; update returns None,
    # matching the real API.
    client.create_issue.return_value = JiraIssueFactory.create()
    client.update_issue.return_value = None

    # Worklog listing for an issue.
    client.get_issue_worklog.return_value = {
        "worklogs": [
            {
                "id": "10000",
                "timeSpent": "3h",
                "timeSpentSeconds": 10800,
                "comment": "Test work",
                "started": "2023-01-01T09:00:00.000+0000",
                "author": {"displayName": "Test User"},
            }
        ]
    }

    # Comment listing for an issue.
    client.get_issue_comments.return_value = {
        "comments": [
            {
                "id": "10000",
                "body": "Test comment",
                "author": {"displayName": "Test User"},
                "created": "2023-01-01T12:00:00.000+0000",
            }
        ]
    }

    yield client


@pytest.fixture
def enhanced_mock_jira_client():
    """
    Enhanced mock Jira client using the new factory system.

    This provides a more flexible mock that can be easily customized
    and integrates with the factory system.

    Returns:
        MagicMock: Enhanced mock Jira client with factory integration
    """
    # All configuration lives in MockAtlassianClient; this fixture is a
    # thin convenience wrapper for dependency injection into tests.
    return MockAtlassianClient.create_jira_client()


# ============================================================================
# Client Instance Fixtures
# ============================================================================


@pytest.fixture
def jira_client(mock_config, mock_atlassian_jira):
    """
    Create a JiraClient instance with mocked dependencies.

    This fixture provides a fully functional JiraClient with mocked
    Atlassian API calls for testing.

    Args:
        mock_config: Mock configuration
        mock_atlassian_jira: Mock Atlassian client

    Returns:
        JiraClient: Configured client instance
    """
    # Patch the Atlassian SDK class so constructing JiraClient never
    # touches the network, then swap in the preconfigured mock directly.
    with patch("atlassian.Jira", return_value=mock_atlassian_jira):
        instance = JiraClient(config=mock_config)
        instance.jira = mock_atlassian_jira
        yield instance


@pytest.fixture
def jira_fetcher(mock_config, mock_atlassian_jira):
    """
    Create a JiraFetcher instance with mocked dependencies.

    Note: This fixture is maintained for backward compatibility.

    Args:
        mock_config: Mock configuration
        mock_atlassian_jira: Mock Atlassian client

    Returns:
        JiraFetcher: Configured fetcher instance
    """
    # Imported lazily to avoid a module-level import cycle with the package.
    from mcp_atlassian.jira import JiraFetcher

    # Patch the SDK class so construction is offline, then replace the
    # underlying client with the preconfigured mock.
    with patch("atlassian.Jira", return_value=mock_atlassian_jira):
        instance = JiraFetcher(config=mock_config)
        instance.jira = mock_atlassian_jira
        yield instance


# ============================================================================
# Specialized Test Data Fixtures
# ============================================================================


@pytest.fixture
def make_jira_issue_with_worklog():
    """
    Factory fixture for creating Jira issues with worklog data.

    Returns:
        Callable: Function that creates issue data with worklog

    Example:
        def test_worklog(make_jira_issue_with_worklog):
            issue = make_jira_issue_with_worklog(
                key="TEST-123",
                worklog_hours=5,
                worklog_comment="Development work"
            )
    """

    def _create_issue_with_worklog(
        key: str = "TEST-123",
        worklog_hours: int = 3,
        worklog_comment: str = "Test work",
        **overrides,
    ):
        # Build the single worklog entry first, deriving both the display
        # string and the seconds value from the requested hours.
        worklog_entry = {
            "id": "10000",
            "timeSpent": f"{worklog_hours}h",
            "timeSpentSeconds": worklog_hours * 3600,
            "comment": worklog_comment,
            "started": "2023-01-01T09:00:00.000+0000",
            "author": {"displayName": "Test User"},
        }
        issue = JiraIssueFactory.create(key, **overrides)
        issue["fields"]["worklog"] = {"worklogs": [worklog_entry]}
        return issue

    return _create_issue_with_worklog


@pytest.fixture
def make_jira_search_results():
    """
    Factory fixture for creating Jira search results.

    Returns:
        Callable: Function that creates JQL search results

    Example:
        def test_search(make_jira_search_results):
            results = make_jira_search_results(
                issues=["TEST-1", "TEST-2"],
                total=2
            )
    """

    def _create_search_results(
        issues: list[str] | None = None, total: int | None = None, **overrides
    ):
        """Build a JQL-style search result payload.

        Args:
            issues: Issue keys to include; defaults to three sample keys.
            total: Reported total; defaults to len(issues), which may be
                overridden to simulate paginated results.
            **overrides: Extra or replacement top-level result fields.

        Returns:
            dict: Search result shaped like the Jira REST API response.
        """
        # Fix: the original annotated these as `list[str] = None` /
        # `int = None`, which is invalid per PEP 484 — optional parameters
        # must be explicitly `| None`.
        if issues is None:
            issues = ["TEST-1", "TEST-2", "TEST-3"]
        if total is None:
            total = len(issues)

        issue_objects = [JiraIssueFactory.create(key) for key in issues]

        # Caller-supplied overrides win over the defaults.
        return {
            "issues": issue_objects,
            "total": total,
            "startAt": 0,
            "maxResults": 50,
            **overrides,
        }

    return _create_search_results


# ============================================================================
# Integration Test Fixtures
# ============================================================================


@pytest.fixture
def jira_integration_client(session_auth_configs):
    """
    Create a JiraClient for integration testing.

    This fixture creates a client that can be used for integration tests
    when real API credentials are available.

    Args:
        session_auth_configs: Session-scoped auth configurations

    Returns:
        Optional[JiraClient]: Real client if credentials available, None otherwise
    """
    # Skip unless every credential variable is present and non-empty.
    required_vars = ("JIRA_URL", "JIRA_USERNAME", "JIRA_API_TOKEN")
    missing = [name for name in required_vars if not os.environ.get(name)]
    if missing:
        pytest.skip("Integration test environment variables not set")

    config = JiraConfig(
        url=os.environ["JIRA_URL"],
        auth_type="basic",
        username=os.environ["JIRA_USERNAME"],
        api_token=os.environ["JIRA_API_TOKEN"],
    )
    return JiraClient(config=config)


# ============================================================================
# Parameterized Fixtures
# ============================================================================


@pytest.fixture
def parametrized_jira_issue_type(request):
    """
    Parametrized fixture for testing with different Jira issue types.

    Use with pytest.mark.parametrize to test functionality across
    different issue types.

    Example:
        @pytest.mark.parametrize("parametrized_jira_issue_type",
                               ["Bug", "Task", "Story"], indirect=True)
        def test_issue_types(parametrized_jira_issue_type):
            # Test runs once for each issue type
            pass
    """
    # request.param is supplied by pytest's indirect parametrization.
    issue_type = request.param
    return JiraIssueFactory.create(fields={"issuetype": {"name": issue_type}})


@pytest.fixture
def parametrized_jira_status(request):
    """
    Parametrized fixture for testing with different Jira statuses.

    Use with pytest.mark.parametrize to test functionality across
    different issue statuses.
    """
    # request.param is supplied by pytest's indirect parametrization.
    status = request.param
    return JiraIssueFactory.create(fields={"status": {"name": status}})

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/servers/dependencies.py:
--------------------------------------------------------------------------------

```python
"""Dependency providers for JiraFetcher and ConfluenceFetcher with context awareness.

Provides get_jira_fetcher and get_confluence_fetcher for use in tool functions.
"""

from __future__ import annotations

import dataclasses
import logging
from typing import TYPE_CHECKING, Any

from fastmcp import Context
from fastmcp.server.dependencies import get_http_request
from starlette.requests import Request

from mcp_atlassian.confluence import ConfluenceConfig, ConfluenceFetcher
from mcp_atlassian.jira import JiraConfig, JiraFetcher
from mcp_atlassian.servers.context import MainAppContext
from mcp_atlassian.utils.oauth import OAuthConfig

if TYPE_CHECKING:
    from mcp_atlassian.confluence.config import (
        ConfluenceConfig as UserConfluenceConfigType,
    )
    from mcp_atlassian.jira.config import JiraConfig as UserJiraConfigType

logger = logging.getLogger("mcp-atlassian.servers.dependencies")


def _create_user_config_for_fetcher(
    base_config: JiraConfig | ConfluenceConfig,
    auth_type: str,
    credentials: dict[str, Any],
    cloud_id: str | None = None,
) -> JiraConfig | ConfluenceConfig:
    """Clone a base service config with per-user credentials applied.

    Args:
        base_config: The global JiraConfig or ConfluenceConfig to derive from.
        auth_type: The user's authentication type ('oauth' or 'pat').
        credentials: Credential material (tokens, optional email context).
        cloud_id: Optional cloud ID overriding the one in the base config.

    Returns:
        A new JiraConfig or ConfluenceConfig carrying the user's credentials.

    Raises:
        ValueError: If required credentials are missing or auth_type is unsupported.
        TypeError: If base_config is not a JiraConfig or ConfluenceConfig.
    """
    if auth_type not in ("oauth", "pat"):
        raise ValueError(
            f"Unsupported auth_type '{auth_type}' for user-specific config creation. Expected 'oauth' or 'pat'."
        )

    username_for_config: str | None = credentials.get("user_email_context")

    logger.debug(
        f"Creating user config for fetcher. Auth type: {auth_type}, Credentials keys: {credentials.keys()}, Cloud ID: {cloud_id}"
    )

    # Fields shared by both auth flavours, copied straight from the base config.
    common_args: dict[str, Any] = {
        "url": base_config.url,
        "auth_type": auth_type,
        "ssl_verify": base_config.ssl_verify,
        "http_proxy": base_config.http_proxy,
        "https_proxy": base_config.https_proxy,
        "no_proxy": base_config.no_proxy,
        "socks_proxy": base_config.socks_proxy,
    }

    if auth_type == "oauth":
        user_access_token = credentials.get("oauth_access_token")
        if not user_access_token:
            raise ValueError(
                "OAuth access token missing in credentials for user auth_type 'oauth'"
            )
        # A global OAuth config must exist to supply client metadata.
        if not (base_config and getattr(base_config, "oauth_config", None)):
            raise ValueError(
                f"Global OAuth config for {type(base_config).__name__} is missing, "
                "but user auth_type is 'oauth'."
            )
        global_oauth_cfg = base_config.oauth_config

        # Prefer the per-request cloud_id; fall back to the global config's.
        effective_cloud_id = cloud_id or global_oauth_cfg.cloud_id
        if not effective_cloud_id:
            raise ValueError(
                "Cloud ID is required for OAuth authentication. "
                "Provide it via X-Atlassian-Cloud-Id header or configure it globally."
            )

        # Minimal OAuth config for user-provided tokens: client credentials
        # may legitimately be absent, so fall back to empty strings.
        oauth_config_for_user = OAuthConfig(
            client_id=global_oauth_cfg.client_id or "",
            client_secret=global_oauth_cfg.client_secret or "",
            redirect_uri=global_oauth_cfg.redirect_uri or "",
            scope=global_oauth_cfg.scope or "",
            access_token=user_access_token,
            refresh_token=None,
            expires_at=None,
            cloud_id=effective_cloud_id,
        )
        common_args.update(
            {
                "username": username_for_config,
                "api_token": None,
                "personal_token": None,
                "oauth_config": oauth_config_for_user,
            }
        )
    elif auth_type == "pat":
        user_pat = credentials.get("personal_access_token")
        if not user_pat:
            raise ValueError("PAT missing in credentials for user auth_type 'pat'")

        # cloud_id is unusual with PAT auth; warn rather than fail.
        if cloud_id:
            logger.warning(
                f"Cloud ID '{cloud_id}' provided with PAT authentication. "
                "PAT authentication typically uses the base URL directly and doesn't require cloud_id override."
            )

        common_args.update(
            {
                "personal_token": user_pat,
                "oauth_config": None,
                "username": None,
                "api_token": None,
            }
        )

    if isinstance(base_config, JiraConfig):
        user_jira_config: UserJiraConfigType = dataclasses.replace(
            base_config, **common_args
        )
        # Preserve the project filter from the global configuration.
        user_jira_config.projects_filter = base_config.projects_filter
        return user_jira_config
    if isinstance(base_config, ConfluenceConfig):
        user_confluence_config: UserConfluenceConfigType = dataclasses.replace(
            base_config, **common_args
        )
        # Preserve the space filter from the global configuration.
        user_confluence_config.spaces_filter = base_config.spaces_filter
        return user_confluence_config
    raise TypeError(f"Unsupported base_config type: {type(base_config)}")


async def get_jira_fetcher(ctx: Context) -> JiraFetcher:
    """Returns a JiraFetcher instance appropriate for the current request context.

    Resolution order:
    1. A fetcher cached on ``request.state`` from earlier in this request.
    2. A user-specific fetcher built from per-request OAuth/PAT credentials.
    3. The global fetcher derived from the lifespan (server-wide) config.

    Args:
        ctx: The FastMCP context.

    Returns:
        JiraFetcher instance for the current user or global config.

    Raises:
        ValueError: If configuration or credentials are invalid.
    """
    logger.debug(f"get_jira_fetcher: ENTERED. Context ID: {id(ctx)}")
    try:
        # Raises RuntimeError when not running inside an HTTP request,
        # which drops us through to the global fallback below.
        request: Request = get_http_request()
        logger.debug(
            f"get_jira_fetcher: In HTTP request context. Request URL: {request.url}. "
            f"State.jira_fetcher exists: {hasattr(request.state, 'jira_fetcher') and request.state.jira_fetcher is not None}. "
            f"State.user_auth_type: {getattr(request.state, 'user_atlassian_auth_type', 'N/A')}. "
            f"State.user_token_present: {hasattr(request.state, 'user_atlassian_token') and request.state.user_atlassian_token is not None}."
        )
        # Use fetcher from request.state if already present (per-request cache)
        if hasattr(request.state, "jira_fetcher") and request.state.jira_fetcher:
            logger.debug("get_jira_fetcher: Returning JiraFetcher from request.state.")
            return request.state.jira_fetcher
        user_auth_type = getattr(request.state, "user_atlassian_auth_type", None)
        logger.debug(f"get_jira_fetcher: User auth type: {user_auth_type}")
        # If OAuth or PAT token is present, create user-specific fetcher
        if user_auth_type in ["oauth", "pat"] and hasattr(
            request.state, "user_atlassian_token"
        ):
            user_token = getattr(request.state, "user_atlassian_token", None)
            user_email = getattr(
                request.state, "user_atlassian_email", None
            )  # May be None for PAT
            user_cloud_id = getattr(request.state, "user_atlassian_cloud_id", None)

            if not user_token:
                raise ValueError("User Atlassian token found in state but is empty.")
            credentials = {"user_email_context": user_email}
            if user_auth_type == "oauth":
                credentials["oauth_access_token"] = user_token
            elif user_auth_type == "pat":
                credentials["personal_access_token"] = user_token
            lifespan_ctx_dict = ctx.request_context.lifespan_context  # type: ignore
            app_lifespan_ctx: MainAppContext | None = (
                lifespan_ctx_dict.get("app_lifespan_context")
                if isinstance(lifespan_ctx_dict, dict)
                else None
            )
            if not app_lifespan_ctx or not app_lifespan_ctx.full_jira_config:
                raise ValueError(
                    "Jira global configuration (URL, SSL) is not available from lifespan context."
                )

            cloud_id_info = f" with cloudId {user_cloud_id}" if user_cloud_id else ""
            logger.info(
                f"Creating user-specific JiraFetcher (type: {user_auth_type}) for user {user_email or 'unknown'} (token ...{str(user_token)[-8:]}){cloud_id_info}"
            )
            user_specific_config = _create_user_config_for_fetcher(
                base_config=app_lifespan_ctx.full_jira_config,
                auth_type=user_auth_type,
                credentials=credentials,
                cloud_id=user_cloud_id,
            )
            try:
                user_jira_fetcher = JiraFetcher(config=user_specific_config)
                # Cheap round-trip that validates the token before caching.
                current_user_id = user_jira_fetcher.get_current_user_account_id()
                logger.debug(
                    f"get_jira_fetcher: Validated Jira token for user ID: {current_user_id}"
                )
                request.state.jira_fetcher = user_jira_fetcher
                return user_jira_fetcher
            except Exception as e:
                logger.error(
                    f"get_jira_fetcher: Failed to create/validate user-specific JiraFetcher: {e}",
                    exc_info=True,
                )
                # Chain the original cause so callers can inspect the root error.
                raise ValueError(f"Invalid user Jira token or configuration: {e}") from e
        else:
            logger.debug(
                f"get_jira_fetcher: No user-specific JiraFetcher. Auth type: {user_auth_type}. Token present: {hasattr(request.state, 'user_atlassian_token')}. Will use global fallback."
            )
    except RuntimeError:
        logger.debug(
            "Not in an HTTP request context. Attempting global JiraFetcher for non-HTTP."
        )
    # Fallback to global fetcher if not in HTTP context or no user info
    lifespan_ctx_dict_global = ctx.request_context.lifespan_context  # type: ignore
    app_lifespan_ctx_global: MainAppContext | None = (
        lifespan_ctx_dict_global.get("app_lifespan_context")
        if isinstance(lifespan_ctx_dict_global, dict)
        else None
    )
    if app_lifespan_ctx_global and app_lifespan_ctx_global.full_jira_config:
        logger.debug(
            "get_jira_fetcher: Using global JiraFetcher from lifespan_context. "
            f"Global config auth_type: {app_lifespan_ctx_global.full_jira_config.auth_type}"
        )
        return JiraFetcher(config=app_lifespan_ctx_global.full_jira_config)
    logger.error("Jira configuration could not be resolved.")
    raise ValueError(
        "Jira client (fetcher) not available. Ensure server is configured correctly."
    )


async def get_confluence_fetcher(ctx: Context) -> ConfluenceFetcher:
    """Returns a ConfluenceFetcher instance appropriate for the current request context.

    Resolution order:
    1. A fetcher cached on ``request.state`` from earlier in this request.
    2. A user-specific fetcher built from per-request OAuth/PAT credentials.
    3. The global fetcher derived from the lifespan (server-wide) config.

    Args:
        ctx: The FastMCP context.

    Returns:
        ConfluenceFetcher instance for the current user or global config.

    Raises:
        ValueError: If configuration or credentials are invalid.
    """
    logger.debug(f"get_confluence_fetcher: ENTERED. Context ID: {id(ctx)}")
    try:
        # Raises RuntimeError when not running inside an HTTP request,
        # which drops us through to the global fallback below.
        request: Request = get_http_request()
        logger.debug(
            f"get_confluence_fetcher: In HTTP request context. Request URL: {request.url}. "
            f"State.confluence_fetcher exists: {hasattr(request.state, 'confluence_fetcher') and request.state.confluence_fetcher is not None}. "
            f"State.user_auth_type: {getattr(request.state, 'user_atlassian_auth_type', 'N/A')}. "
            f"State.user_token_present: {hasattr(request.state, 'user_atlassian_token') and request.state.user_atlassian_token is not None}."
        )
        if (
            hasattr(request.state, "confluence_fetcher")
            and request.state.confluence_fetcher
        ):
            logger.debug(
                "get_confluence_fetcher: Returning ConfluenceFetcher from request.state."
            )
            return request.state.confluence_fetcher
        user_auth_type = getattr(request.state, "user_atlassian_auth_type", None)
        logger.debug(f"get_confluence_fetcher: User auth type: {user_auth_type}")
        if user_auth_type in ["oauth", "pat"] and hasattr(
            request.state, "user_atlassian_token"
        ):
            user_token = getattr(request.state, "user_atlassian_token", None)
            user_email = getattr(request.state, "user_atlassian_email", None)
            user_cloud_id = getattr(request.state, "user_atlassian_cloud_id", None)

            if not user_token:
                raise ValueError("User Atlassian token found in state but is empty.")
            credentials = {"user_email_context": user_email}
            if user_auth_type == "oauth":
                credentials["oauth_access_token"] = user_token
            elif user_auth_type == "pat":
                credentials["personal_access_token"] = user_token
            lifespan_ctx_dict = ctx.request_context.lifespan_context  # type: ignore
            app_lifespan_ctx: MainAppContext | None = (
                lifespan_ctx_dict.get("app_lifespan_context")
                if isinstance(lifespan_ctx_dict, dict)
                else None
            )
            if not app_lifespan_ctx or not app_lifespan_ctx.full_confluence_config:
                raise ValueError(
                    "Confluence global configuration (URL, SSL) is not available from lifespan context."
                )

            cloud_id_info = f" with cloudId {user_cloud_id}" if user_cloud_id else ""
            logger.info(
                f"Creating user-specific ConfluenceFetcher (type: {user_auth_type}) for user {user_email or 'unknown'} (token ...{str(user_token)[-8:]}){cloud_id_info}"
            )
            user_specific_config = _create_user_config_for_fetcher(
                base_config=app_lifespan_ctx.full_confluence_config,
                auth_type=user_auth_type,
                credentials=credentials,
                cloud_id=user_cloud_id,
            )
            try:
                user_confluence_fetcher = ConfluenceFetcher(config=user_specific_config)
                # Cheap round-trip that validates the token before caching.
                current_user_data = user_confluence_fetcher.get_current_user_info()
                # Try to get email from Confluence if not provided (can happen with PAT)
                derived_email = (
                    current_user_data.get("email")
                    if isinstance(current_user_data, dict)
                    else None
                )
                display_name = (
                    current_user_data.get("displayName")
                    if isinstance(current_user_data, dict)
                    else None
                )
                logger.debug(
                    f"get_confluence_fetcher: Validated Confluence token. User context: Email='{user_email or derived_email}', DisplayName='{display_name}'"
                )
                request.state.confluence_fetcher = user_confluence_fetcher
                # derived_email is only non-None when current_user_data is a dict
                # with a truthy "email", so no further re-checks are needed here.
                if not user_email and derived_email:
                    request.state.user_atlassian_email = derived_email
                return user_confluence_fetcher
            except Exception as e:
                logger.error(
                    f"get_confluence_fetcher: Failed to create/validate user-specific ConfluenceFetcher: {e}",
                    exc_info=True,
                )
                # Chain the original cause so callers can inspect the root error.
                raise ValueError(
                    f"Invalid user Confluence token or configuration: {e}"
                ) from e
        else:
            logger.debug(
                f"get_confluence_fetcher: No user-specific ConfluenceFetcher. Auth type: {user_auth_type}. Token present: {hasattr(request.state, 'user_atlassian_token')}. Will use global fallback."
            )
    except RuntimeError:
        logger.debug(
            "Not in an HTTP request context. Attempting global ConfluenceFetcher for non-HTTP."
        )
    lifespan_ctx_dict_global = ctx.request_context.lifespan_context  # type: ignore
    app_lifespan_ctx_global: MainAppContext | None = (
        lifespan_ctx_dict_global.get("app_lifespan_context")
        if isinstance(lifespan_ctx_dict_global, dict)
        else None
    )
    if app_lifespan_ctx_global and app_lifespan_ctx_global.full_confluence_config:
        logger.debug(
            "get_confluence_fetcher: Using global ConfluenceFetcher from lifespan_context. "
            f"Global config auth_type: {app_lifespan_ctx_global.full_confluence_config.auth_type}"
        )
        return ConfluenceFetcher(config=app_lifespan_ctx_global.full_confluence_config)
    logger.error("Confluence configuration could not be resolved.")
    raise ValueError(
        "Confluence client (fetcher) not available. Ensure server is configured correctly."
    )

```

--------------------------------------------------------------------------------
/tests/unit/models/conftest.py:
--------------------------------------------------------------------------------

```python
"""
Test fixtures for model testing.

This module provides specialized fixtures for testing data models and API response
parsing. It integrates with the new factory system and provides efficient,
reusable fixtures for model validation and serialization testing.
"""

import os
from typing import Any

import pytest

from mcp_atlassian.utils.env import is_env_truthy
from tests.fixtures.confluence_mocks import (
    MOCK_COMMENTS_RESPONSE,
    MOCK_CQL_SEARCH_RESPONSE,
    MOCK_LABELS_RESPONSE,
    MOCK_PAGE_RESPONSE,
)

# Import mock data
from tests.fixtures.jira_mocks import (
    MOCK_JIRA_COMMENTS,
    MOCK_JIRA_ISSUE_RESPONSE,
    MOCK_JIRA_JQL_RESPONSE,
)
from tests.utils.factories import (
    ConfluencePageFactory,
    ErrorResponseFactory,
    JiraIssueFactory,
)

# ============================================================================
# Factory-Based Data Fixtures
# ============================================================================


@pytest.fixture
def make_jira_issue_data():
    """
    Factory fixture producing Jira issue payloads for model tests.

    Offers more flexibility than static mock data: callers can override
    any field for their specific scenario.

    Returns:
        Callable: Function that creates Jira issue data

    Example:
        def test_jira_model(make_jira_issue_data):
            issue_data = make_jira_issue_data(
                key="MODEL-123",
                fields={"priority": {"name": "Critical"}}
            )
            model = JiraIssue.from_dict(issue_data)
            assert model.key == "MODEL-123"
    """

    def _build(*args, **kwargs):
        return JiraIssueFactory.create(*args, **kwargs)

    return _build


@pytest.fixture
def make_confluence_page_data():
    """
    Factory fixture producing Confluence page payloads for model tests.

    Returns:
        Callable: Function that creates Confluence page data

    Example:
        def test_confluence_model(make_confluence_page_data):
            page_data = make_confluence_page_data(
                title="Model Test Page",
                space={"key": "MODEL"}
            )
            model = ConfluencePage.from_dict(page_data)
            assert model.title == "Model Test Page"
    """

    def _build(*args, **kwargs):
        return ConfluencePageFactory.create(*args, **kwargs)

    return _build


@pytest.fixture
def make_error_response_data():
    """
    Factory fixture producing API error response payloads for model tests.

    Returns:
        Callable: Function that creates error response data

    Example:
        def test_error_model(make_error_response_data):
            error_data = make_error_response_data(
                status_code=422,
                message="Validation Error"
            )
            model = ErrorResponse.from_dict(error_data)
            assert model.status == 422
    """

    def _build(*args, **kwargs):
        return ErrorResponseFactory.create_api_error(*args, **kwargs)

    return _build


# ============================================================================
# Compatibility Fixtures (using legacy mock data)
# ============================================================================


@pytest.fixture
def jira_issue_data() -> dict[str, Any]:
    """
    Provide the legacy mock Jira issue payload.

    Note: kept for backward compatibility; prefer make_jira_issue_data
    in new tests.
    """
    data: dict[str, Any] = MOCK_JIRA_ISSUE_RESPONSE
    return data


@pytest.fixture
def jira_search_data() -> dict[str, Any]:
    """
    Provide the legacy mock Jira JQL search payload.

    Note: kept for backward compatibility.
    """
    data: dict[str, Any] = MOCK_JIRA_JQL_RESPONSE
    return data


@pytest.fixture
def jira_comments_data() -> dict[str, Any]:
    """
    Provide the legacy mock Jira comments payload.

    Note: kept for backward compatibility.
    """
    data: dict[str, Any] = MOCK_JIRA_COMMENTS
    return data


@pytest.fixture
def confluence_search_data() -> dict[str, Any]:
    """
    Provide the legacy mock Confluence CQL search payload.

    Note: kept for backward compatibility.
    """
    data: dict[str, Any] = MOCK_CQL_SEARCH_RESPONSE
    return data


@pytest.fixture
def confluence_page_data() -> dict[str, Any]:
    """
    Provide the legacy mock Confluence page payload.

    Note: kept for backward compatibility; prefer make_confluence_page_data
    in new tests.
    """
    data: dict[str, Any] = MOCK_PAGE_RESPONSE
    return data


@pytest.fixture
def confluence_comments_data() -> dict[str, Any]:
    """
    Provide the legacy mock Confluence comments payload.

    Note: kept for backward compatibility.
    """
    data: dict[str, Any] = MOCK_COMMENTS_RESPONSE
    return data


@pytest.fixture
def confluence_labels_data() -> dict[str, Any]:
    """
    Provide the legacy mock Confluence labels payload.

    Note: kept for backward compatibility.
    """
    data: dict[str, Any] = MOCK_LABELS_RESPONSE
    return data


# ============================================================================
# Enhanced Model Test Data Fixtures
# ============================================================================


@pytest.fixture
def complete_jira_issue_data():
    """
    Fixture providing a Jira issue with every field populated.

    Useful for exercising model serialization/deserialization with
    full field coverage.

    Returns:
        Dict[str, Any]: Complete Jira issue data
    """
    people = {
        "assignee": {
            "displayName": "Test Assignee",
            "emailAddress": "[email protected]",
            "accountId": "assignee-account-id",
        },
        "reporter": {
            "displayName": "Test Reporter",
            "emailAddress": "[email protected]",
            "accountId": "reporter-account-id",
        },
    }
    fields = {
        "summary": "Complete Test Issue",
        "description": "This issue has all fields populated for testing",
        "issuetype": {"name": "Story", "id": "10001"},
        "status": {"name": "In Progress", "id": "3"},
        "priority": {"name": "High", "id": "2"},
        "labels": ["testing", "complete", "model"],
        "components": [{"name": "Frontend"}, {"name": "Backend"}],
        "fixVersions": [{"name": "v1.0.0"}, {"name": "v1.1.0"}],
        "created": "2023-01-01T12:00:00.000+0000",
        "updated": "2023-01-02T12:00:00.000+0000",
        "duedate": "2023-01-15",
        "timeestimate": 28800,  # 8 hours in seconds
        "timespent": 14400,  # 4 hours in seconds
        "timeoriginalestimate": 28800,
        "customfield_10012": 8.0,  # Story points
        "customfield_10010": "EPIC-123",  # Epic link
    }
    fields.update(people)
    return JiraIssueFactory.create(key="COMPLETE-123", fields=fields)


@pytest.fixture
def minimal_jira_issue_data():
    """
    Fixture providing a Jira issue with only required fields.

    Useful for verifying model behavior at the minimal end of the
    input spectrum.

    Returns:
        Dict[str, Any]: Minimal Jira issue data
    """
    minimal = JiraIssueFactory.create_minimal("MINIMAL-123")
    return minimal


@pytest.fixture
def complete_confluence_page_data():
    """
    Fixture providing a Confluence page with every field populated.

    Returns:
        Dict[str, Any]: Complete Confluence page data
    """
    html = "<h1>Complete Test Page</h1><p>This page has all fields populated.</p>"
    body = {
        "storage": {"value": html, "representation": "storage"},
        "view": {"value": html, "representation": "view"},
    }
    version = {
        "number": 2,
        "when": "2023-01-02T12:00:00.000Z",
        "by": {"displayName": "Test User"},
        "message": "Updated with complete data",
    }
    labels = [{"name": "testing"}, {"name": "complete"}, {"name": "model"}]
    return ConfluencePageFactory.create(
        page_id="complete123",
        title="Complete Test Page",
        type="page",
        status="current",
        space={"key": "COMPLETE", "name": "Complete Test Space", "type": "global"},
        body=body,
        version=version,
        metadata={"labels": {"results": labels}},
        ancestors=[{"id": "parent123", "title": "Parent Page"}],
        children={"page": {"results": [{"id": "child123", "title": "Child Page"}]}},
    )


# ============================================================================
# Validation and Edge Case Fixtures
# ============================================================================


@pytest.fixture
def invalid_jira_issue_data():
    """
    Fixture providing malformed Jira issue payloads for validation tests.

    Returns:
        List[Dict[str, Any]]: List of invalid issue data variations
    """
    variants = []
    variants.append({})  # Empty data
    variants.append({"key": None})  # Null key
    variants.append({"key": ""})  # Empty key
    variants.append({"key": "INVALID"})  # Missing fields
    variants.append({"key": "INVALID-123", "fields": None})  # Null fields
    variants.append({"key": "INVALID-123", "fields": {}})  # Empty fields
    # Wrong status format: status must be an object, not a string
    variants.append(
        {"key": "INVALID-123", "fields": {"status": "Invalid Status Format"}}
    )
    return variants


@pytest.fixture
def invalid_confluence_page_data():
    """
    Fixture providing malformed Confluence page payloads for validation tests.

    Returns:
        List[Dict[str, Any]]: List of invalid page data variations
    """
    variants = []
    variants.append({})  # Empty data
    variants.append({"id": None})  # Null ID
    variants.append({"id": ""})  # Empty ID
    variants.append({"id": "123"})  # Missing title
    variants.append({"id": "123", "title": None})  # Null title
    variants.append({"id": "123", "title": ""})  # Empty title
    # Invalid content type
    variants.append({"id": "123", "title": "Test", "type": "invalid_type"})
    return variants


# ============================================================================
# Model Serialization Test Fixtures
# ============================================================================


@pytest.fixture
def jira_model_serialization_cases():
    """
    Fixture providing Jira model serialization/deserialization cases.

    Returns:
        List[Dict[str, Any]]: Test cases with expected serialization results
    """
    basic = {
        "name": "basic_issue",
        "input": JiraIssueFactory.create("SERIAL-1"),
        "expected_fields": ["key", "id", "self", "fields"],
    }
    minimal = {
        "name": "minimal_issue",
        "input": JiraIssueFactory.create_minimal("SERIAL-2"),
        "expected_fields": ["key", "fields"],
    }
    custom = {
        "name": "issue_with_custom_fields",
        "input": JiraIssueFactory.create("SERIAL-3", fields={"customfield_10012": 5.0}),
        "expected_fields": ["key", "fields"],
        "expected_custom_fields": ["customfield_10012"],
    }
    return [basic, minimal, custom]


@pytest.fixture
def confluence_model_serialization_cases():
    """
    Fixture providing Confluence model serialization/deserialization cases.

    Returns:
        List[Dict[str, Any]]: Test cases with expected serialization results
    """
    basic = {
        "name": "basic_page",
        "input": ConfluencePageFactory.create("serial123"),
        "expected_fields": ["id", "title", "type", "space", "body"],
    }
    with_metadata = {
        "name": "page_with_metadata",
        "input": ConfluencePageFactory.create(
            "serial456",
            version={"number": 2},
            metadata={"labels": {"results": [{"name": "test"}]}},
        ),
        "expected_fields": ["id", "title", "version", "metadata"],
    }
    return [basic, with_metadata]


# ============================================================================
# Real Data Integration Fixtures
# ============================================================================


@pytest.fixture
def use_real_jira_data() -> bool:
    """
    Decide whether tests may hit the real Jira API.

    True only when:
    1. JIRA_URL, JIRA_USERNAME, and JIRA_API_TOKEN are all set
    2. USE_REAL_DATA is set to a truthy value

    Note: This fixture is maintained for backward compatibility.
    """
    creds_present = all(
        os.environ.get(name) for name in ("JIRA_URL", "JIRA_USERNAME", "JIRA_API_TOKEN")
    )
    return creds_present and is_env_truthy("USE_REAL_DATA")


@pytest.fixture
def use_real_confluence_data() -> bool:
    """
    Decide whether tests may hit the real Confluence API.

    True only when:
    1. CONFLUENCE_URL, CONFLUENCE_USERNAME, and CONFLUENCE_API_TOKEN are all set
    2. USE_REAL_DATA is set to a truthy value

    Note: This fixture is maintained for backward compatibility.
    """
    creds_present = all(
        os.environ.get(name)
        for name in ("CONFLUENCE_URL", "CONFLUENCE_USERNAME", "CONFLUENCE_API_TOKEN")
    )
    return creds_present and is_env_truthy("USE_REAL_DATA")


@pytest.fixture
def default_confluence_page_id() -> str:
    """
    Supply the Confluence page ID used by integration-style tests.

    Skips the requesting test when CONFLUENCE_TEST_PAGE_ID is unset.

    Note: This fixture is maintained for backward compatibility.
    """
    configured = os.environ.get("CONFLUENCE_TEST_PAGE_ID")
    if configured:
        return configured
    pytest.skip("CONFLUENCE_TEST_PAGE_ID environment variable not set")


@pytest.fixture
def default_jira_issue_key() -> str:
    """
    Supply the Jira issue key used by integration-style tests.

    Skips the requesting test when JIRA_TEST_ISSUE_KEY is unset.

    Note: This fixture is maintained for backward compatibility.
    """
    configured = os.environ.get("JIRA_TEST_ISSUE_KEY")
    if configured:
        return configured
    pytest.skip("JIRA_TEST_ISSUE_KEY environment variable not set")


# ============================================================================
# Model Performance Test Fixtures
# ============================================================================


@pytest.fixture
def large_jira_dataset():
    """
    Fixture providing 100 generated Jira issues for performance tests.

    Returns:
        List[Dict[str, Any]]: Large list of Jira issues for performance tests
    """
    issues = []
    for index in range(1, 101):  # 100 issues
        issues.append(JiraIssueFactory.create(f"PERF-{index}"))
    return issues


@pytest.fixture
def large_confluence_dataset():
    """
    Fixture providing 100 generated Confluence pages for performance tests.

    Returns:
        List[Dict[str, Any]]: Large list of Confluence pages for performance tests
    """
    pages = []
    for index in range(1, 101):  # 100 pages
        pages.append(
            ConfluencePageFactory.create(
                f"perf{index}", title=f"Performance Test Page {index}"
            )
        )
    return pages


# ============================================================================
# Model Composition Fixtures
# ============================================================================


@pytest.fixture
def model_test_suite():
    """
    Comprehensive test suite for model testing.

    Bundles valid, invalid, and edge-case payloads for both Jira and
    Confluence models, plus a set of error responses, so a single
    fixture covers thorough validation runs.

    Returns:
        Dict[str, Any]: Complete model test suite
    """
    complete_jira = JiraIssueFactory.create(
        key="COMPLETE-123",
        fields={
            "summary": "Complete Test Issue",
            "description": "This issue has all fields populated for testing",
            "issuetype": {"name": "Story", "id": "10001"},
            "status": {"name": "In Progress", "id": "3"},
            "priority": {"name": "High", "id": "2"},
        },
    )
    complete_confluence = ConfluencePageFactory.create(
        page_id="complete123", title="Complete Test Page"
    )
    # Shared minimal-invalid shapes: empty payload, null id/key, empty id/key
    invalid_jira = [{}, {"key": None}, {"key": ""}]
    invalid_confluence = [{}, {"id": None}, {"id": ""}]

    jira_suite = {
        "valid": [
            JiraIssueFactory.create("SUITE-1"),
            JiraIssueFactory.create_minimal("SUITE-2"),
            complete_jira,
        ],
        "invalid": invalid_jira,
        "edge_cases": [
            JiraIssueFactory.create("EDGE-1", fields={}),
            JiraIssueFactory.create("EDGE-2", id="", self=""),
        ],
    }
    confluence_suite = {
        "valid": [
            ConfluencePageFactory.create("suite1"),
            ConfluencePageFactory.create("suite2", title="Suite Page 2"),
            complete_confluence,
        ],
        "invalid": invalid_confluence,
        "edge_cases": [
            ConfluencePageFactory.create("edge1", body={}),
            ConfluencePageFactory.create("edge2", space={}),
        ],
    }
    error_cases = [
        ErrorResponseFactory.create_api_error(400, "Bad Request"),
        ErrorResponseFactory.create_api_error(404, "Not Found"),
        ErrorResponseFactory.create_auth_error(),
    ]
    return {"jira": jira_suite, "confluence": confluence_suite, "errors": error_cases}

```

--------------------------------------------------------------------------------
/tests/unit/confluence/test_client_oauth.py:
--------------------------------------------------------------------------------

```python
"""Tests for the ConfluenceClient with OAuth authentication."""

import os
from unittest.mock import PropertyMock, patch

import pytest

from mcp_atlassian.confluence.client import ConfluenceClient
from mcp_atlassian.confluence.config import ConfluenceConfig
from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
from mcp_atlassian.utils.oauth import BYOAccessTokenOAuthConfig, OAuthConfig


class TestConfluenceClientOAuth:
    """Tests for ConfluenceClient with OAuth authentication."""

    def test_init_with_oauth_config(self):
        """Test initializing the client with OAuth configuration."""
        # Create a mock OAuth config
        oauth_config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:confluence-space.summary write:confluence-content",
            cloud_id="test-cloud-id",
            access_token="test-access-token",
            refresh_token="test-refresh-token",
            expires_at=9999999999.0,  # Set a future expiry time
        )

        # Create a Confluence config with OAuth
        config = ConfluenceConfig(
            url="https://test.atlassian.net/wiki",
            auth_type="oauth",
            oauth_config=oauth_config,
        )

        # Mock dependencies
        with (
            patch("mcp_atlassian.confluence.client.Confluence") as mock_confluence,
            patch(
                "mcp_atlassian.confluence.client.configure_oauth_session"
            ) as mock_configure_oauth,
            patch(
                "mcp_atlassian.confluence.client.configure_ssl_verification"
            ) as mock_configure_ssl,
            patch(
                "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"
            ) as mock_preprocessor,
            # Stub token validity checks so no refresh/network call is attempted
            patch.object(
                OAuthConfig,
                "is_token_expired",
                new_callable=PropertyMock,
                return_value=False,
            ),
            patch.object(oauth_config, "ensure_valid_token", return_value=True),
        ):
            # Configure the mock to return success for OAuth configuration
            mock_configure_oauth.return_value = True

            # Initialize client
            client = ConfluenceClient(config=config)

            # Verify OAuth session configuration was called
            mock_configure_oauth.assert_called_once()

            # Verify Confluence was initialized with the expected parameters
            mock_confluence.assert_called_once()
            conf_kwargs = mock_confluence.call_args[1]
            assert (
                conf_kwargs["url"]
                == f"https://api.atlassian.com/ex/confluence/{oauth_config.cloud_id}"
            )
            assert "session" in conf_kwargs
            assert conf_kwargs["cloud"] is True

            # Verify SSL verification was configured
            mock_configure_ssl.assert_called_once()

            # Verify preprocessor was initialized
            assert client.preprocessor == mock_preprocessor.return_value

    def test_init_with_byo_access_token_oauth_config(self):
        """Test initializing the client with BYOAccessTokenOAuthConfig."""
        # Create a mock BYO OAuth config
        byo_oauth_config = BYOAccessTokenOAuthConfig(
            cloud_id="test-byo-cloud-id",
            access_token="test-byo-access-token",
        )

        # Create a Confluence config with BYO OAuth
        config = ConfluenceConfig(
            url="https://test.atlassian.net/wiki",
            auth_type="oauth",
            oauth_config=byo_oauth_config,
        )

        # Mock dependencies
        with (
            patch("mcp_atlassian.confluence.client.Confluence") as mock_confluence,
            patch(
                "mcp_atlassian.confluence.client.configure_oauth_session"
            ) as mock_configure_oauth,
            patch(
                "mcp_atlassian.confluence.client.configure_ssl_verification"
            ) as mock_configure_ssl,
            patch(
                "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"
            ) as mock_preprocessor,
        ):
            # Configure the mock to return success for OAuth configuration
            mock_configure_oauth.return_value = True

            # Initialize client
            client = ConfluenceClient(config=config)

            # Verify OAuth session configuration was called
            mock_configure_oauth.assert_called_once()

            # Verify Confluence was initialized with the expected parameters
            mock_confluence.assert_called_once()
            conf_kwargs = mock_confluence.call_args[1]
            assert (
                conf_kwargs["url"]
                == f"https://api.atlassian.com/ex/confluence/{byo_oauth_config.cloud_id}"
            )
            assert "session" in conf_kwargs
            assert conf_kwargs["cloud"] is True

            # Verify SSL verification was configured
            mock_configure_ssl.assert_called_once()

            # Verify preprocessor was initialized
            assert client.preprocessor == mock_preprocessor.return_value

    def test_init_with_byo_oauth_missing_cloud_id(self):
        """Test initializing the client with BYO OAuth but missing cloud_id."""
        # Create a mock BYO OAuth config without cloud_id
        byo_oauth_config = BYOAccessTokenOAuthConfig(
            access_token="test-byo-access-token",
            cloud_id="",  # Explicitly empty or None
        )

        # Create a Confluence config with BYO OAuth
        config = ConfluenceConfig(
            url="https://test.atlassian.net/wiki",
            auth_type="oauth",
            oauth_config=byo_oauth_config,
        )

        # Verify error is raised by ConfluenceClient's validation
        with pytest.raises(
            ValueError, match="OAuth authentication requires a valid cloud_id"
        ):
            ConfluenceClient(config=config)

    def test_init_with_byo_oauth_failed_session_config(self):
        """Test initializing with BYO OAuth but failed session configuration."""
        # Create a mock BYO OAuth config
        byo_oauth_config = BYOAccessTokenOAuthConfig(
            cloud_id="test-byo-cloud-id",
            access_token="test-byo-access-token",
        )

        # Create a Confluence config with BYO OAuth
        config = ConfluenceConfig(
            url="https://test.atlassian.net/wiki",
            auth_type="oauth",
            oauth_config=byo_oauth_config,
        )

        # Mock dependencies with OAuth configuration failure
        with patch(
            "mcp_atlassian.confluence.client.configure_oauth_session"
        ) as mock_configure_oauth:
            # Configure the mock to return failure for OAuth configuration
            mock_configure_oauth.return_value = False

            # Verify error is raised
            with pytest.raises(
                MCPAtlassianAuthenticationError,
                match="Failed to configure OAuth session",
            ):
                ConfluenceClient(config=config)

    def test_init_with_byo_oauth_empty_token_failed_session_config(self):
        """Test init with BYO OAuth, empty token, and failed session config."""
        # Create a mock BYO OAuth config
        byo_oauth_config = BYOAccessTokenOAuthConfig(
            cloud_id="test-byo-cloud-id",
            access_token="",  # Empty access token
        )

        # Create a Confluence config with BYO OAuth
        config = ConfluenceConfig(
            url="https://test.atlassian.net/wiki",
            auth_type="oauth",
            oauth_config=byo_oauth_config,
        )

        # Mock dependencies with OAuth configuration failure
        # configure_oauth_session might not even be called if token is validated earlier,
        # but if it is, it should fail.
        # For now, assume client init will fail due to invalid config before session setup,
        # or session setup fails because token is invalid.
        # The ConfluenceClient itself should raise error due to invalid oauth_config
        with patch(
            "mcp_atlassian.confluence.client.configure_oauth_session"
        ) as mock_configure_oauth:
            mock_configure_oauth.return_value = False  # Assume it's called and fails
            with pytest.raises(
                MCPAtlassianAuthenticationError,  # Or ValueError depending on where empty token is caught
                # For consistency with Jira, let's assume MCPAtlassianAuthenticationError if session config is attempted
                match="Failed to configure OAuth session",  # This may change if validation is earlier
            ):
                ConfluenceClient(config=config)

    def test_init_with_oauth_missing_cloud_id(self):
        """Test initializing the client with OAuth but missing cloud_id."""
        # Create a mock OAuth config without cloud_id
        oauth_config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:confluence-space.summary write:confluence-content",
            # No cloud_id
            access_token="test-access-token",
        )

        # Create a Confluence config with OAuth
        config = ConfluenceConfig(
            url="https://test.atlassian.net/wiki",
            auth_type="oauth",
            oauth_config=oauth_config,
        )

        # Verify error is raised
        with pytest.raises(
            ValueError, match="OAuth authentication requires a valid cloud_id"
        ):
            ConfluenceClient(config=config)

    def test_init_with_oauth_failed_session_config(self):
        """Test initializing the client with OAuth but failed session configuration."""
        # Create a mock OAuth config
        oauth_config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:confluence-space.summary write:confluence-content",
            cloud_id="test-cloud-id",
            access_token="test-access-token",
            refresh_token="test-refresh-token",
        )

        # Create a Confluence config with OAuth
        config = ConfluenceConfig(
            url="https://test.atlassian.net/wiki",
            auth_type="oauth",
            oauth_config=oauth_config,
        )

        # Mock dependencies with OAuth configuration failure
        with (
            patch(
                "mcp_atlassian.confluence.client.configure_oauth_session"
            ) as mock_configure_oauth,
            # Stub token validity so the failure comes from session configuration,
            # not from token refresh
            patch.object(
                OAuthConfig,
                "is_token_expired",
                new_callable=PropertyMock,
                return_value=False,
            ),
            patch.object(oauth_config, "ensure_valid_token", return_value=True),
        ):
            # Configure the mock to return failure for OAuth configuration
            mock_configure_oauth.return_value = False

            # Verify error is raised
            with pytest.raises(
                MCPAtlassianAuthenticationError,
                match="Failed to configure OAuth session",
            ):
                ConfluenceClient(config=config)

    def test_from_env_with_standard_oauth(self):
        """Test creating client from env vars with standard OAuth configuration."""
        # Mock environment variables for a complete standard OAuth setup
        env_vars = {
            "CONFLUENCE_URL": "https://test.atlassian.net/wiki",
            "CONFLUENCE_AUTH_TYPE": "oauth",
            "ATLASSIAN_OAUTH_CLIENT_ID": "env-client-id",
            "ATLASSIAN_OAUTH_CLIENT_SECRET": "env-client-secret",
            "ATLASSIAN_OAUTH_REDIRECT_URI": "https://example.com/callback",
            "ATLASSIAN_OAUTH_SCOPE": "read:confluence-space.summary",
            "ATLASSIAN_OAUTH_CLOUD_ID": "env-cloud-id",
            "ATLASSIAN_OAUTH_ACCESS_TOKEN": "env-access-token",  # Needed by OAuthConfig
            "ATLASSIAN_OAUTH_REFRESH_TOKEN": "env-refresh-token",  # Needed by OAuthConfig
        }

        # Mock OAuthConfig instance
        mock_standard_oauth_config = OAuthConfig(
            client_id="env-client-id",
            client_secret="env-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:confluence-space.summary",
            cloud_id="env-cloud-id",
            access_token="env-access-token",
            refresh_token="env-refresh-token",
            expires_at=9999999999.0,
        )

        with (
            patch.dict(os.environ, env_vars, clear=True),  # Clear other env vars
            patch(
                "mcp_atlassian.confluence.config.get_oauth_config_from_env",  # Patch the correct utility
                return_value=mock_standard_oauth_config,
            ),
            # Stub token validity checks so no refresh is attempted during init
            patch.object(
                OAuthConfig,
                "is_token_expired",
                new_callable=PropertyMock,
                return_value=False,
            ),
            patch.object(
                mock_standard_oauth_config, "ensure_valid_token", return_value=True
            ),
            patch("mcp_atlassian.confluence.client.Confluence") as mock_confluence,
            patch(
                "mcp_atlassian.confluence.client.configure_oauth_session",
                return_value=True,
            ) as mock_configure_oauth,
            patch("mcp_atlassian.confluence.client.configure_ssl_verification"),
        ):
            # Initialize client from environment
            client = ConfluenceClient()  # Calls ConfluenceConfig.from_env() internally

            # Verify client was initialized with OAuth
            assert client.config.auth_type == "oauth"
            assert client.config.oauth_config is mock_standard_oauth_config

            # Verify Confluence was initialized correctly
            mock_confluence.assert_called_once()
            conf_kwargs = mock_confluence.call_args[1]
            assert (
                conf_kwargs["url"]
                == f"https://api.atlassian.com/ex/confluence/{mock_standard_oauth_config.cloud_id}"
            )
            assert "session" in conf_kwargs
            assert conf_kwargs["cloud"] is True

            # Verify OAuth session was configured
            mock_configure_oauth.assert_called_once()

    def test_from_env_with_byo_token_oauth(self):
        """Test creating client from env vars with BYO token OAuth config."""
        env_vars = {
            "CONFLUENCE_URL": "https://test.atlassian.net/wiki",
            "ATLASSIAN_OAUTH_ACCESS_TOKEN": "env-byo-access-token",
            "ATLASSIAN_OAUTH_CLOUD_ID": "env-byo-cloud-id",
            # No other OAuth vars needed for BYO if get_oauth_config_from_env handles it
        }

        mock_byo_oauth_config = BYOAccessTokenOAuthConfig(
            cloud_id="env-byo-cloud-id", access_token="env-byo-access-token"
        )

        with (
            patch.dict(os.environ, env_vars, clear=True),
            patch(
                "mcp_atlassian.confluence.config.get_oauth_config_from_env",
                return_value=mock_byo_oauth_config,
            ),
            patch("mcp_atlassian.confluence.client.Confluence") as mock_confluence,
            patch(
                "mcp_atlassian.confluence.client.configure_oauth_session",
                return_value=True,
            ) as mock_configure_oauth,
            patch("mcp_atlassian.confluence.client.configure_ssl_verification"),
        ):
            client = ConfluenceClient()

            assert client.config.auth_type == "oauth"
            assert client.config.oauth_config is mock_byo_oauth_config
            mock_confluence.assert_called_once()
            conf_kwargs = mock_confluence.call_args[1]
            assert (
                conf_kwargs["url"]
                == f"https://api.atlassian.com/ex/confluence/{mock_byo_oauth_config.cloud_id}"
            )
            mock_configure_oauth.assert_called_once()

    def test_from_env_with_no_oauth_config_found(self):
        """Test client creation from env when no OAuth config is found by the utility."""
        env_vars = {
            "CONFLUENCE_URL": "https://test.atlassian.net/wiki",
            # Deliberately missing other auth variables (basic, token, or complete OAuth)
        }

        with (
            patch.dict(os.environ, env_vars, clear=True),
            patch(
                "mcp_atlassian.confluence.config.get_oauth_config_from_env",
                return_value=None,  # Simulate no OAuth config found by the utility
            ),
        ):
            # ConfluenceConfig.from_env should raise ValueError if no auth can be determined
            with pytest.raises(
                ValueError,
                match="Cloud authentication requires CONFLUENCE_USERNAME and CONFLUENCE_API_TOKEN, or OAuth configuration",
            ):
                ConfluenceClient()  # This will call ConfluenceConfig.from_env()

```

--------------------------------------------------------------------------------
/tests/test_preprocessing.py:
--------------------------------------------------------------------------------

```python
import pytest

from mcp_atlassian.preprocessing.confluence import ConfluencePreprocessor
from mcp_atlassian.preprocessing.jira import JiraPreprocessor
from tests.fixtures.confluence_mocks import MOCK_COMMENTS_RESPONSE, MOCK_PAGE_RESPONSE
from tests.fixtures.jira_mocks import MOCK_JIRA_ISSUE_RESPONSE


class MockConfluenceClient:
    """Minimal stand-in for a Confluence client used by the preprocessor tests."""

    def get_user_details_by_accountid(self, account_id):
        """Return canned user details matching the format in MOCK_PAGE_RESPONSE."""
        details = {
            "displayName": f"Test User {account_id}",
            "accountType": "atlassian",
            "accountStatus": "active",
        }
        return details


@pytest.fixture
def preprocessor_with_jira():
    """Provide a JiraPreprocessor bound to the example Atlassian base URL."""
    base_url = "https://example.atlassian.net"
    return JiraPreprocessor(base_url=base_url)


@pytest.fixture
def preprocessor_with_confluence():
    """Provide a ConfluencePreprocessor bound to the example Atlassian base URL."""
    base_url = "https://example.atlassian.net"
    return ConfluencePreprocessor(base_url=base_url)


def test_init():
    """JiraPreprocessor should normalize base_url by dropping the trailing slash."""
    proc = JiraPreprocessor("https://example.atlassian.net/")
    assert proc.base_url == "https://example.atlassian.net"


def test_process_confluence_page_content(preprocessor_with_confluence):
    """Test processing Confluence page content using mock data."""
    storage_value = MOCK_PAGE_RESPONSE["body"]["storage"]["value"]
    _, markdown = preprocessor_with_confluence.process_html_content(
        storage_value, confluence_client=MockConfluenceClient()
    )

    # The user mention should be resolved to a display name
    assert "@Test User user123" in markdown

    # Basic HTML elements should survive the markdown conversion
    for fragment in ("Date", "Goals", "Example goal"):
        assert fragment in markdown


def test_process_confluence_comment_content(preprocessor_with_confluence):
    """Test processing Confluence comment content using mock data."""
    comment_html = MOCK_COMMENTS_RESPONSE["results"][0]["body"]["view"]["value"]
    _, markdown = preprocessor_with_confluence.process_html_content(
        comment_html, confluence_client=MockConfluenceClient()
    )

    assert "Comment content here" in markdown


def test_clean_jira_issue_content(preprocessor_with_jira):
    """Test cleaning Jira issue description and comment text from mock data."""
    # Description cleaning
    raw_description = MOCK_JIRA_ISSUE_RESPONSE["fields"]["description"]
    cleaned_description = preprocessor_with_jira.clean_jira_text(raw_description)
    assert "test issue description" in cleaned_description.lower()

    # Comment cleaning
    raw_comment = MOCK_JIRA_ISSUE_RESPONSE["fields"]["comment"]["comments"][0]["body"]
    cleaned_comment = preprocessor_with_jira.clean_jira_text(raw_comment)
    assert "test comment" in cleaned_comment.lower()


def test_process_html_content_basic(preprocessor_with_confluence):
    """Test basic HTML content processing."""
    source = "<p>Simple text</p>"
    html, markdown = preprocessor_with_confluence.process_html_content(
        source, confluence_client=MockConfluenceClient()
    )

    assert html == "<p>Simple text</p>"
    assert markdown.strip() == "Simple text"


def test_process_html_content_with_user_mentions(preprocessor_with_confluence):
    """Test HTML content processing with user mentions."""
    source = """
    <ac:link>
        <ri:user ri:account-id="123456"/>
    </ac:link>
    <p>Some text</p>
    """
    html, markdown = preprocessor_with_confluence.process_html_content(
        source, confluence_client=MockConfluenceClient()
    )

    # The mention should be resolved in both output formats
    for output in (html, markdown):
        assert "@Test User 123456" in output


def test_clean_jira_text_empty(preprocessor_with_jira):
    """Empty string and None inputs should both clean to an empty string."""
    for blank in ("", None):
        assert preprocessor_with_jira.clean_jira_text(blank) == ""


def test_clean_jira_text_user_mentions(preprocessor_with_jira):
    """Test cleaning Jira text with user mentions."""
    result = preprocessor_with_jira.clean_jira_text("Hello [~accountid:123456]!")
    assert result == "Hello User:123456!"


def test_clean_jira_text_smart_links(preprocessor_with_jira):
    """Test cleaning Jira text with smart links."""
    base_url = "https://example.atlassian.net"

    # A Jira issue smart link becomes a markdown link labeled by the issue key
    issue_link = f"[Issue|{base_url}/browse/PROJ-123|smart-link]"
    assert (
        preprocessor_with_jira.clean_jira_text(issue_link)
        == f"[PROJ-123]({base_url}/browse/PROJ-123)"
    )

    # A Confluence page smart link keeps the title but normalizes the URL
    confluence_url = (
        f"{base_url}/wiki/spaces/PROJ/pages/987654321/Example+Meeting+Notes"
    )
    processed_url = f"{base_url}/wiki/spaces/PROJ/pages/987654321/ExampleMeetingNotes"
    page_link = f"[Meeting Notes|{confluence_url}|smart-link]"
    assert (
        preprocessor_with_jira.clean_jira_text(page_link)
        == f"[Example Meeting Notes]({processed_url})"
    )


def test_clean_jira_text_html_content(preprocessor_with_jira):
    """Test cleaning Jira text with HTML content."""
    result = preprocessor_with_jira.clean_jira_text("<p>This is <b>bold</b> text</p>")
    assert result.strip() == "This is **bold** text"


def test_clean_jira_text_combined(preprocessor_with_jira):
    """Test cleaning Jira text with multiple elements."""
    base_url = "https://example.atlassian.net"
    source = f"""
    <p>Hello [~accountid:123456]!</p>
    <p>Check out [PROJ-123|{base_url}/browse/PROJ-123|smart-link]</p>
    """
    result = preprocessor_with_jira.clean_jira_text(source)

    assert "Hello User:123456!" in result
    assert f"[PROJ-123]({base_url}/browse/PROJ-123)" in result


def test_process_html_content_error_handling(preprocessor_with_confluence):
    """A None input should raise rather than be silently processed."""
    with pytest.raises(Exception):
        preprocessor_with_confluence.process_html_content(
            None, confluence_client=MockConfluenceClient()
        )


def test_clean_jira_text_with_invalid_html(preprocessor_with_jira):
    """Test cleaning Jira text with invalid (unclosed) HTML."""
    result = preprocessor_with_jira.clean_jira_text(
        "<p>Unclosed paragraph with <b>bold</b"
    )
    assert "Unclosed paragraph with **bold**" in result


def test_process_mentions_error_handling(preprocessor_with_jira):
    """Unresolvable account ids should still yield a User: placeholder."""
    result = preprocessor_with_jira._process_mentions(
        "[~accountid:invalid]", r"\[~accountid:(.*?)\]"
    )
    assert "User:invalid" in result


def test_jira_to_markdown(preprocessor_with_jira):
    """Test conversion of Jira markup to Markdown."""
    convert = preprocessor_with_jira.jira_to_markdown

    # Single-line constructs map one-to-one
    simple_cases = {
        "h1. Heading 1": "# Heading 1",        # headers
        "h2. Heading 2": "## Heading 2",
        "*bold text*": "**bold text**",        # text formatting
        "_italic text_": "*italic text*",
        "{{code}}": "`code`",                  # inline code
        "* Item 1": "- Item 1",                # lists
        "# Item 1": "1. Item 1",
    }
    for jira_markup, expected_markdown in simple_cases.items():
        assert convert(jira_markup) == expected_markdown

    # For multiline code blocks, check content is preserved rather than exact format
    code_block = convert("{code}\nmultiline code\n{code}")
    assert "```" in code_block
    assert "multiline code" in code_block

    # Test complex Jira markup
    complex_jira = """
h1. Project Overview

h2. Introduction
This project aims to *improve* the user experience.

h3. Features
* Feature 1
* Feature 2

h3. Code Example
{code:python}
def hello():
    print("Hello World")
{code}

For more information, see [our website|https://example.com].
"""

    converted = convert(complex_jira)
    for fragment in (
        "# Project Overview",
        "## Introduction",
        "**improve**",
        "- Feature 1",
        "```python",
        "[our website](https://example.com)",
    ):
        assert fragment in converted


def test_markdown_to_jira(preprocessor_with_jira):
    """Test conversion of Markdown to Jira markup."""
    convert = preprocessor_with_jira.markdown_to_jira

    # Single-line constructs map one-to-one
    simple_cases = {
        "# Heading 1": "h1. Heading 1",        # headers
        "## Heading 2": "h2. Heading 2",
        "**bold text**": "*bold text*",        # text formatting
        "*italic text*": "_italic text_",
        "`code`": "{{code}}",                  # inline code
    }
    for markdown, expected_jira in simple_cases.items():
        assert convert(markdown) == expected_jira

    # For multiline code blocks, check content is preserved rather than exact format
    code_block = convert("```\nmultiline code\n```")
    assert "{code}" in code_block
    assert "multiline code" in code_block

    # Bulleted lists
    assert "* Item 1" in convert("- Item 1")

    # Numbered lists: content and numbering survive conversion
    numbered = convert("1. Item 1")
    assert "Item 1" in numbered
    assert "1" in numbered

    # Test complex Markdown
    complex_markdown = """
# Project Overview

## Introduction
This project aims to **improve** the user experience.

### Features
- Feature 1
- Feature 2

### Code Example
```python
def hello():
    print("Hello World")
```

For more information, see [our website](https://example.com).
"""

    converted = convert(complex_markdown)
    for fragment in (
        "h1. Project Overview",
        "h2. Introduction",
        "*improve*",
        "* Feature 1",
        "{code:python}",
        "[our website|https://example.com]",
    ):
        assert fragment in converted


def test_markdown_to_confluence_storage(preprocessor_with_confluence):
    """Test conversion of Markdown to Confluence storage format."""
    markdown = """# Heading 1

This is some **bold** and *italic* text.

- List item 1
- List item 2

[Link text](https://example.com)
"""

    storage = preprocessor_with_confluence.markdown_to_confluence_storage(markdown)

    # Only check basic structure; the exact conversion is handled by md2conf
    assert "<h1>" in storage
    assert "Heading 1" in storage
    assert "<strong>" in storage or "<b>" in storage  # Bold
    assert "<em>" in storage or "<i>" in storage  # Italic
    assert "<a href=" in storage.lower()  # Link
    assert "example.com" in storage


def test_process_confluence_profile_macro(preprocessor_with_confluence):
    """Test processing Confluence User Profile Macro in page content."""
    storage_value = MOCK_PAGE_RESPONSE["body"]["storage"]["value"]
    html, markdown = preprocessor_with_confluence.process_html_content(
        storage_value, confluence_client=MockConfluenceClient()
    )

    # The profile macro should be replaced with @Test User user123 in both outputs
    for output in (html, markdown):
        assert "@Test User user123" in output


def test_process_confluence_profile_macro_malformed(preprocessor_with_confluence):
    """Test processing malformed User Profile Macro (missing user param and ri:user)."""
    malformed_sources = (
        # Macro missing its ac:parameter entirely
        '<ac:structured-macro ac:name="profile"></ac:structured-macro>',
        # Macro with ac:parameter but no ri:user inside it
        '<ac:structured-macro ac:name="profile"><ac:parameter ac:name="user"></ac:parameter></ac:structured-macro>',
    )
    for source in malformed_sources:
        html, markdown = preprocessor_with_confluence.process_html_content(
            source, confluence_client=MockConfluenceClient()
        )
        for output in (html, markdown):
            assert "[User Profile Macro (Malformed)]" in output


def test_process_confluence_profile_macro_fallback():
    """Test fallback when confluence_client is None.

    Without a client to resolve the account id, the macro should degrade to a
    plain "[User Profile: <id>]" placeholder.
    """
    # Note: ConfluencePreprocessor is already imported at module level; the
    # previous redundant function-scope import was removed.
    html = (
        '<ac:structured-macro ac:name="profile">'
        '<ac:parameter ac:name="user">'
        '<ri:user ri:account-id="user999" />'
        "</ac:parameter>"
        "</ac:structured-macro>"
    )
    preprocessor = ConfluencePreprocessor(base_url="https://example.atlassian.net")
    processed_html, processed_markdown = preprocessor.process_html_content(
        html, confluence_client=None
    )
    assert "[User Profile: user999]" in processed_html
    assert "[User Profile: user999]" in processed_markdown


def test_process_user_profile_macro_multiple():
    """Test processing multiple User Profile Macros with account-id and userkey.

    One macro references a user by ``ri:account-id`` and the other by
    ``ri:userkey``; both should be resolved through the respective client
    lookup methods. ConfluencePreprocessor is already imported at module
    level, so no function-scope import is needed.
    """
    html = (
        "<p>This page mentions a user via profile macro: "
        '<ac:structured-macro ac:name="profile" ac:schema-version="1">'
        '<ac:parameter ac:name="user">'
        '<ri:user ri:account-id="test-account-id-123" />'
        "</ac:parameter>"
        "</ac:structured-macro>. "
        "And another one: "
        '<ac:structured-macro ac:name="profile" ac:schema-version="1">'
        '<ac:parameter ac:name="user">'
        '<ri:user ri:userkey="test-userkey-456" />'
        "</ac:parameter>"
        "</ac:structured-macro>."
        "</p>"
    )

    class CustomMockConfluenceClient:
        """Client stub resolving one user by account id and one by userkey."""

        def get_user_details_by_accountid(self, account_id):
            return (
                {"displayName": "Test User One"}
                if account_id == "test-account-id-123"
                else {}
            )

        def get_user_details_by_username(self, username):
            return (
                {"displayName": "Test User Two"}
                if username == "test-userkey-456"
                else {}
            )

    preprocessor = ConfluencePreprocessor(base_url="https://example.atlassian.net")
    processed_html, processed_markdown = preprocessor.process_html_content(
        html, confluence_client=CustomMockConfluenceClient()
    )
    assert "@Test User One" in processed_html
    assert "@Test User Two" in processed_html
    assert "@Test User One" in processed_markdown
    assert "@Test User Two" in processed_markdown


def test_markdown_to_confluence_no_automatic_anchors():
    """Test that heading_anchors=False prevents automatic anchor generation (regression for issue #488)."""
    from mcp_atlassian.preprocessing.confluence import ConfluencePreprocessor

    markdown_with_headings = """
# Main Title
Some content here.

## Subsection
More content.

### Deep Section
Final content.
"""

    preprocessor = ConfluencePreprocessor(base_url="https://example.atlassian.net")
    storage = preprocessor.markdown_to_confluence_storage(markdown_with_headings)
    lowered = storage.lower()

    # No auto-generated anchor ids should appear for any heading.
    for anchor_id in ('id="main-title"', 'id="subsection"', 'id="deep-section"'):
        assert anchor_id not in lowered

    # The headings themselves must survive the conversion untouched.
    for heading_tag in (
        "<h1>Main Title</h1>",
        "<h2>Subsection</h2>",
        "<h3>Deep Section</h3>",
    ):
        assert heading_tag in storage


def test_markdown_to_confluence_style_preservation():
    """Test that styled content is preserved during conversion."""
    from mcp_atlassian.preprocessing.confluence import ConfluencePreprocessor

    markdown_with_styles = """
# Title with **bold** text

This paragraph has *italic* and **bold** text.

```python
def hello():
    return "world"
```

- Item with **bold**
- Item with *italic*

> Blockquote with **formatting**

[Link text](https://example.com) with description.
"""

    preprocessor = ConfluencePreprocessor(base_url="https://example.atlassian.net")
    storage = preprocessor.markdown_to_confluence_storage(markdown_with_styles)

    # Every formatting construct should have a counterpart in the storage format.
    expected_fragments = (
        "<strong>bold</strong>",
        "<em>italic</em>",
        "<blockquote>",
        '<a href="https://example.com">Link text</a>',
        "ac:structured-macro",  # Code block macro
        'ac:name="code"',
        "python",
    )
    for fragment in expected_fragments:
        assert fragment in storage


def test_markdown_to_confluence_optional_anchor_generation():
    """Test that enable_heading_anchors parameter controls anchor generation."""
    from mcp_atlassian.preprocessing.confluence import ConfluencePreprocessor

    markdown_with_headings = """
# Main Title
Content here.

## Subsection
More content.
"""

    preprocessor = ConfluencePreprocessor(base_url="https://example.atlassian.net")

    # Default conversion (anchors disabled) must not inject heading ids.
    without_anchors = preprocessor.markdown_to_confluence_storage(
        markdown_with_headings
    )
    for anchor_id in ('id="main-title"', 'id="subsection"'):
        assert anchor_id not in without_anchors.lower()

    # With anchors enabled the headings must still be emitted.
    # Note: md2conf may use different anchor formats, so we check for presence of id attributes
    with_anchors = preprocessor.markdown_to_confluence_storage(
        markdown_with_headings, enable_heading_anchors=True
    )
    assert "<h1>" in with_anchors
    assert "<h2>" in with_anchors
```

--------------------------------------------------------------------------------
/tests/integration/test_real_api.py:
--------------------------------------------------------------------------------

```python
"""
Integration tests with real Atlassian APIs.

These tests are skipped by default and only run with --integration --use-real-data flags.
They require proper environment configuration and will create/modify real data.
"""

import os
import time
import uuid

import pytest

from mcp_atlassian.confluence import ConfluenceFetcher
from mcp_atlassian.confluence.config import ConfluenceConfig
from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.config import JiraConfig
from tests.utils.base import BaseAuthTest


@pytest.mark.integration
class TestRealJiraAPI(BaseAuthTest):
    """Real Jira API integration tests with cleanup.

    These tests talk to a live Jira instance and create/modify/delete real
    issues, so they only run with both --integration and --use-real-data.
    """

    @pytest.fixture(autouse=True)
    def skip_without_real_data(self, request):
        """Skip these tests unless --use-real-data is provided."""
        if not request.config.getoption("--use-real-data", default=False):
            pytest.skip("Real API tests only run with --use-real-data flag")

    @pytest.fixture
    def jira_client(self):
        """Create real Jira client from environment."""
        if not os.getenv("JIRA_URL"):
            pytest.skip("JIRA_URL not set in environment")

        config = JiraConfig.from_env()
        return JiraFetcher(config=config)

    @pytest.fixture
    def test_project_key(self):
        """Get test project key from environment."""
        key = os.getenv("JIRA_TEST_PROJECT_KEY", "TEST")
        return key

    @pytest.fixture
    def created_issues(self):
        """Track created issues for cleanup."""
        issues = []
        yield issues
        # Cleanup will be done in individual tests

    def test_complete_issue_lifecycle(
        self, jira_client, test_project_key, created_issues
    ):
        """Test create, update, transition, and delete issue lifecycle."""
        # Create unique summary to avoid conflicts with pre-existing issues
        unique_id = str(uuid.uuid4())[:8]
        summary = f"Integration Test Issue {unique_id}"

        # 1. Create issue
        issue_data = {
            "project": {"key": test_project_key},
            "summary": summary,
            "description": "This is an integration test issue that will be deleted",
            "issuetype": {"name": "Task"},
        }

        created_issue = jira_client.create_issue(**issue_data)
        created_issues.append(created_issue.key)

        assert created_issue.key.startswith(test_project_key)
        assert created_issue.fields.summary == summary

        # 2. Update issue
        update_data = {
            "summary": f"{summary} - Updated",
            "description": "Updated description",
        }

        updated_issue = jira_client.update_issue(
            issue_key=created_issue.key, **update_data
        )

        assert updated_issue.fields.summary == f"{summary} - Updated"

        # 3. Add comment
        comment = jira_client.add_comment(
            issue_key=created_issue.key, body="Test comment from integration test"
        )

        assert comment.body == "Test comment from integration test"

        # 4. Get available transitions
        transitions = jira_client.get_transitions(issue_key=created_issue.key)
        assert len(transitions) > 0

        # 5. Transition issue (only if a "Done"-like transition is available;
        # workflows differ per project, so this step is best-effort)
        done_transition = next(
            (t for t in transitions if "done" in t.name.lower()), None
        )
        if done_transition:
            jira_client.transition_issue(
                issue_key=created_issue.key, transition_id=done_transition.id
            )

        # 6. Delete issue
        jira_client.delete_issue(issue_key=created_issue.key)
        created_issues.remove(created_issue.key)

        # Verify deletion: fetching the deleted issue must now fail
        with pytest.raises(Exception):
            jira_client.get_issue(issue_key=created_issue.key)

    def test_attachment_upload_download(
        self, jira_client, test_project_key, created_issues, tmp_path
    ):
        """Test attachment upload and download flow."""
        # Create test issue to attach the file to
        unique_id = str(uuid.uuid4())[:8]
        issue_data = {
            "project": {"key": test_project_key},
            "summary": f"Attachment Test {unique_id}",
            "issuetype": {"name": "Task"},
        }

        issue = jira_client.create_issue(**issue_data)
        created_issues.append(issue.key)

        try:
            # Create test file
            test_file = tmp_path / "test_attachment.txt"
            test_content = f"Test content {unique_id}"
            test_file.write_text(test_content)

            # Upload attachment
            with open(test_file, "rb") as f:
                attachments = jira_client.add_attachment(
                    issue_key=issue.key, filename="test_attachment.txt", data=f.read()
                )

            assert len(attachments) == 1
            attachment = attachments[0]
            assert attachment.filename == "test_attachment.txt"

            # Get issue with attachments to verify server-side state
            issue_with_attachments = jira_client.get_issue(
                issue_key=issue.key, expand="attachment"
            )

            assert len(issue_with_attachments.fields.attachment) == 1

        finally:
            # Cleanup even when the assertions above fail
            jira_client.delete_issue(issue_key=issue.key)
            created_issues.remove(issue.key)

    def test_jql_search_with_pagination(self, jira_client, test_project_key):
        """Test JQL search with pagination."""
        # Search for recent issues in test project
        jql = f"project = {test_project_key} ORDER BY created DESC"

        # First page
        results_page1 = jira_client.search_issues(jql=jql, start_at=0, max_results=2)

        assert results_page1.total >= 0

        if results_page1.total > 2:
            # Second page
            results_page2 = jira_client.search_issues(
                jql=jql, start_at=2, max_results=2
            )

            # Ensure the two pages do not overlap
            page1_keys = [i.key for i in results_page1.issues]
            page2_keys = [i.key for i in results_page2.issues]
            assert not set(page1_keys).intersection(set(page2_keys))

    def test_bulk_issue_creation(self, jira_client, test_project_key, created_issues):
        """Test creating multiple issues in bulk."""
        unique_id = str(uuid.uuid4())[:8]
        issues_data = []

        # Prepare 3 issues
        for i in range(3):
            issues_data.append(
                {
                    "project": {"key": test_project_key},
                    "summary": f"Bulk Test Issue {i + 1} - {unique_id}",
                    "issuetype": {"name": "Task"},
                }
            )

        # Create issues
        created = []
        try:
            for issue_data in issues_data:
                issue = jira_client.create_issue(**issue_data)
                created.append(issue)
                created_issues.append(issue.key)

            assert len(created) == 3

            # Verify all created
            for i, issue in enumerate(created):
                assert f"Bulk Test Issue {i + 1}" in issue.fields.summary

        finally:
            # Best-effort cleanup of everything that was created; a failed
            # delete must not mask the original test failure
            for issue in created:
                try:
                    jira_client.delete_issue(issue_key=issue.key)
                    created_issues.remove(issue.key)
                except Exception:
                    pass

    def test_rate_limiting_behavior(self, jira_client):
        """Test API rate limiting behavior with retries."""
        # Make multiple rapid requests
        start_time = time.time()

        for _ in range(5):
            try:
                jira_client.get_fields()
            except Exception as e:
                if "429" in str(e) or "rate limit" in str(e).lower():
                    # Rate limit hit - this is an acceptable outcome
                    return
                # BUGFIX: previously any non-rate-limit exception was caught
                # here and silently ignored, so a genuinely broken API call
                # still passed the test. Re-raise unexpected failures.
                raise

        # If no rate limit hit, that's also fine
        elapsed = time.time() - start_time
        assert elapsed < 10  # Should complete quickly if no rate limiting


@pytest.mark.integration
class TestRealConfluenceAPI(BaseAuthTest):
    """Real Confluence API integration tests with cleanup.

    Runs against a live Confluence instance and creates/deletes real pages;
    only executed with both --integration and --use-real-data flags.
    """

    @pytest.fixture(autouse=True)
    def skip_without_real_data(self, request):
        """Skip these tests unless --use-real-data is provided."""
        if not request.config.getoption("--use-real-data", default=False):
            pytest.skip("Real API tests only run with --use-real-data flag")

    @pytest.fixture
    def confluence_client(self):
        """Create real Confluence client from environment."""
        if not os.getenv("CONFLUENCE_URL"):
            pytest.skip("CONFLUENCE_URL not set in environment")

        config = ConfluenceConfig.from_env()
        return ConfluenceFetcher(config=config)

    @pytest.fixture
    def test_space_key(self):
        """Get test space key from environment."""
        key = os.getenv("CONFLUENCE_TEST_SPACE_KEY", "TEST")
        return key

    @pytest.fixture
    def created_pages(self):
        """Track created pages for cleanup."""
        # NOTE: this fixture has no teardown of its own; each test is expected
        # to delete the pages it appends to this list.
        pages = []
        yield pages
        # Cleanup will be done in individual tests

    def test_page_lifecycle(self, confluence_client, test_space_key, created_pages):
        """Test create, update, and delete page lifecycle."""
        # Unique title avoids clashes with leftovers from earlier runs
        unique_id = str(uuid.uuid4())[:8]
        title = f"Integration Test Page {unique_id}"

        # 1. Create page
        page = confluence_client.create_page(
            space_key=test_space_key,
            title=title,
            body="<p>This is an integration test page</p>",
        )
        created_pages.append(page.id)

        assert page.title == title
        assert page.space.key == test_space_key

        # 2. Update page
        # NOTE(review): passes the *new* version number (current + 1) —
        # confirm this matches the ConfluenceFetcher.update_page contract.
        updated_page = confluence_client.update_page(
            page_id=page.id,
            title=f"{title} - Updated",
            body="<p>Updated content</p>",
            version_number=page.version.number + 1,
        )

        assert updated_page.title == f"{title} - Updated"
        assert updated_page.version.number == page.version.number + 1

        # 3. Add comment
        comment = confluence_client.add_comment(
            page_id=page.id, body="Test comment from integration test"
        )

        assert "Test comment" in comment.body.storage.value

        # 4. Delete page
        confluence_client.delete_page(page_id=page.id)
        created_pages.remove(page.id)

        # Verify deletion: fetching the removed page must fail
        with pytest.raises(Exception):
            confluence_client.get_page_by_id(page_id=page.id)

    def test_page_hierarchy(self, confluence_client, test_space_key, created_pages):
        """Test creating page hierarchy with parent-child relationships."""
        unique_id = str(uuid.uuid4())[:8]

        # Create parent page
        parent = confluence_client.create_page(
            space_key=test_space_key,
            title=f"Parent Page {unique_id}",
            body="<p>Parent content</p>",
        )
        created_pages.append(parent.id)

        try:
            # Create child page nested under the parent
            child = confluence_client.create_page(
                space_key=test_space_key,
                title=f"Child Page {unique_id}",
                body="<p>Child content</p>",
                parent_id=parent.id,
            )
            created_pages.append(child.id)

            # Get child pages and verify the relationship round-trips
            children = confluence_client.get_page_children(
                page_id=parent.id, expand="body.storage"
            )

            assert len(children.results) == 1
            assert children.results[0].id == child.id

            # Delete child first, then parent (parent delete is in finally)
            confluence_client.delete_page(page_id=child.id)
            created_pages.remove(child.id)

        finally:
            # Cleanup parent even if child creation/verification failed
            confluence_client.delete_page(page_id=parent.id)
            created_pages.remove(parent.id)

    def test_cql_search(self, confluence_client, test_space_key):
        """Test CQL search functionality."""
        # Search for pages in test space
        cql = f'space = "{test_space_key}" and type = "page"'

        results = confluence_client.search_content(cql=cql, limit=5)

        # size >= 0 only proves the call succeeded; the space may be empty
        assert results.size >= 0

        # Verify all results are from test space
        for result in results.results:
            if hasattr(result, "space"):
                assert result.space.key == test_space_key

    def test_attachment_handling(
        self, confluence_client, test_space_key, created_pages, tmp_path
    ):
        """Test attachment upload to Confluence page."""
        unique_id = str(uuid.uuid4())[:8]

        # Create page to host the attachment
        page = confluence_client.create_page(
            space_key=test_space_key,
            title=f"Attachment Test Page {unique_id}",
            body="<p>Page with attachments</p>",
        )
        created_pages.append(page.id)

        try:
            # Create test file
            test_file = tmp_path / "confluence_test.txt"
            test_content = f"Confluence test content {unique_id}"
            test_file.write_text(test_content)

            # Upload attachment
            with open(test_file, "rb") as f:
                attachment = confluence_client.create_attachment(
                    page_id=page.id, filename="confluence_test.txt", data=f.read()
                )

            assert attachment.title == "confluence_test.txt"

            # Get page attachments and verify server-side state
            attachments = confluence_client.get_attachments(page_id=page.id)
            assert len(attachments.results) == 1
            assert attachments.results[0].title == "confluence_test.txt"

        finally:
            # Cleanup (deleting the page also removes its attachments)
            confluence_client.delete_page(page_id=page.id)
            created_pages.remove(page.id)

    def test_large_content_handling(
        self, confluence_client, test_space_key, created_pages
    ):
        """Test handling of large content (hundreds of KB)."""
        unique_id = str(uuid.uuid4())[:8]

        # Create large content (~210KB: 10000 x 21-byte repetitions)
        large_content = "<p>" + ("Large content block. " * 10000) + "</p>"

        # Create page with large content
        page = confluence_client.create_page(
            space_key=test_space_key,
            title=f"Large Content Test {unique_id}",
            body=large_content,
        )
        created_pages.append(page.id)

        try:
            # Retrieve and verify the body survived the round trip
            retrieved = confluence_client.get_page_by_id(
                page_id=page.id, expand="body.storage"
            )

            assert len(retrieved.body.storage.value) > 100000  # At least 100KB

        finally:
            # Cleanup
            confluence_client.delete_page(page_id=page.id)
            created_pages.remove(page.id)


@pytest.mark.integration
class TestCrossServiceIntegration:
    """Test integration between Jira and Confluence services.

    Requires both JIRA_URL and CONFLUENCE_URL to be configured; otherwise
    the corresponding client fixture skips the test.
    """

    @pytest.fixture(autouse=True)
    def skip_without_real_data(self, request):
        """Skip these tests unless --use-real-data is provided."""
        if not request.config.getoption("--use-real-data", default=False):
            pytest.skip("Real API tests only run with --use-real-data flag")

    @pytest.fixture
    def jira_client(self):
        """Create real Jira client from environment."""
        if not os.getenv("JIRA_URL"):
            pytest.skip("JIRA_URL not set in environment")

        config = JiraConfig.from_env()
        return JiraFetcher(config=config)

    @pytest.fixture
    def confluence_client(self):
        """Create real Confluence client from environment."""
        if not os.getenv("CONFLUENCE_URL"):
            pytest.skip("CONFLUENCE_URL not set in environment")

        config = ConfluenceConfig.from_env()
        return ConfluenceFetcher(config=config)

    @pytest.fixture
    def test_project_key(self):
        """Get test project key from environment."""
        return os.getenv("JIRA_TEST_PROJECT_KEY", "TEST")

    @pytest.fixture
    def test_space_key(self):
        """Get test space key from environment."""
        return os.getenv("CONFLUENCE_TEST_SPACE_KEY", "TEST")

    @pytest.fixture
    def created_issues(self):
        """Track created issues for cleanup."""
        # No teardown here; cleanup happens inline in the test's finally block.
        issues = []
        yield issues

    @pytest.fixture
    def created_pages(self):
        """Track created pages for cleanup."""
        # No teardown here; cleanup happens inline in the test's finally block.
        pages = []
        yield pages

    def test_jira_confluence_linking(
        self,
        jira_client,
        confluence_client,
        test_project_key,
        test_space_key,
        created_issues,
        created_pages,
    ):
        """Test linking between Jira issues and Confluence pages."""
        unique_id = str(uuid.uuid4())[:8]

        # Create Jira issue
        issue = jira_client.create_issue(
            project={"key": test_project_key},
            summary=f"Linked Issue {unique_id}",
            issuetype={"name": "Task"},
        )
        created_issues.append(issue.key)

        # Create Confluence page with Jira issue link (plain HTML anchor)
        page_content = f'<p>Related to Jira issue: <a href="{jira_client.config.url}/browse/{issue.key}">{issue.key}</a></p>'

        page = confluence_client.create_page(
            space_key=test_space_key,
            title=f"Linked Page {unique_id}",
            body=page_content,
        )
        created_pages.append(page.id)

        try:
            # Add comment in Jira referencing Confluence page
            confluence_url = (
                f"{confluence_client.config.url}/pages/viewpage.action?pageId={page.id}"
            )
            jira_client.add_comment(
                issue_key=issue.key,
                body=f"Documentation available at: {confluence_url}",
            )

            # Verify both exist and contain cross-references
            issue_comments = jira_client.get_comments(issue_key=issue.key)
            assert any(confluence_url in c.body for c in issue_comments.comments)

            retrieved_page = confluence_client.get_page_by_id(
                page_id=page.id, expand="body.storage"
            )
            assert issue.key in retrieved_page.body.storage.value

        finally:
            # Cleanup both artifacts regardless of assertion outcome
            jira_client.delete_issue(issue_key=issue.key)
            created_issues.remove(issue.key)
            confluence_client.delete_page(page_id=page.id)
            created_pages.remove(page.id)

```

--------------------------------------------------------------------------------
/tests/unit/servers/test_confluence_server.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for the Confluence FastMCP server."""

import json
import logging
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from fastmcp import Client, FastMCP
from fastmcp.client import FastMCPTransport
from starlette.requests import Request

from src.mcp_atlassian.confluence import ConfluenceFetcher
from src.mcp_atlassian.confluence.config import ConfluenceConfig
from src.mcp_atlassian.models.confluence.page import ConfluencePage
from src.mcp_atlassian.servers.context import MainAppContext
from src.mcp_atlassian.servers.main import AtlassianMCP
from src.mcp_atlassian.utils.oauth import OAuthConfig

logger = logging.getLogger(__name__)


@pytest.fixture
def mock_confluence_fetcher():
    """Create a mocked ConfluenceFetcher instance for testing."""
    fetcher = MagicMock(spec=ConfluenceFetcher)

    # Shared page mock returned by all page-oriented methods.
    page = MagicMock(spec=ConfluencePage)
    page.to_simplified_dict.return_value = {
        "id": "123456",
        "title": "Test Page Mock Title",
        "url": "https://example.atlassian.net/wiki/spaces/TEST/pages/123456/Test+Page",
        "content": {
            "value": "This is a test page content in Markdown",
            "format": "markdown",
        },
    }
    page.content = "This is a test page content in Markdown"

    fetcher.search.return_value = [page]
    fetcher.get_page_content.return_value = page
    fetcher.get_page_children.return_value = [page]
    fetcher.create_page.return_value = page
    fetcher.update_page.return_value = page
    fetcher.delete_page.return_value = True

    # Comment returned when listing existing page comments.
    page_comment = MagicMock()
    page_comment.to_simplified_dict.return_value = {
        "id": "789",
        "author": "Test User",
        "created": "2023-08-01T12:00:00.000Z",
        "body": "This is a test comment",
    }
    fetcher.get_page_comments.return_value = [page_comment]

    # Label used by both the read and write label methods.
    label = MagicMock()
    label.to_simplified_dict.return_value = {"id": "lbl1", "name": "test-label"}
    fetcher.get_page_labels.return_value = [label]
    fetcher.add_page_label.return_value = [label]

    # Distinct comment returned by add_comment.
    added_comment = MagicMock()
    added_comment.to_simplified_dict.return_value = {
        "id": "987",
        "author": "Test User",
        "created": "2023-08-01T13:00:00.000Z",
        "body": "This is a test comment added via API",
    }
    fetcher.add_comment.return_value = added_comment

    # Single hit returned by search_user.
    user_result = MagicMock()
    user_result.to_simplified_dict.return_value = {
        "entity_type": "user",
        "title": "First Last",
        "score": 0.0,
        "user": {
            "account_id": "a031248587011jasoidf9832jd8j1",
            "display_name": "First Last",
            "email": "[email protected]",
            "profile_picture": "/wiki/aa-avatar/a031248587011jasoidf9832jd8j1",
            "is_active": True,
        },
        "url": "/people/a031248587011jasoidf9832jd8j1",
        "last_modified": "2025-06-02T13:35:59.680Z",
        "excerpt": "",
    }
    fetcher.search_user.return_value = [user_result]

    return fetcher


@pytest.fixture
def mock_base_confluence_config():
    """Create a mock base ConfluenceConfig for MainAppContext using OAuth for multi-user scenario."""
    oauth = OAuthConfig(
        client_id="server_client_id",
        client_secret="server_client_secret",
        redirect_uri="http://localhost",
        scope="read:confluence",
        cloud_id="mock_cloud_id",
    )
    config = ConfluenceConfig(
        url="https://mock.atlassian.net/wiki",
        auth_type="oauth",
        oauth_config=oauth,
    )
    return config


@pytest.fixture
def test_confluence_mcp(mock_confluence_fetcher, mock_base_confluence_config):
    """Create a test FastMCP instance with standard configuration."""

    # Import and register tool functions (as they are in confluence.py)
    from src.mcp_atlassian.servers.confluence import (
        add_comment,
        add_label,
        create_page,
        delete_page,
        get_comments,
        get_labels,
        get_page,
        get_page_children,
        search,
        search_user,
        update_page,
    )

    @asynccontextmanager
    async def test_lifespan(app: FastMCP) -> AsyncGenerator[MainAppContext, None]:
        # Supply the app context that the dependency layer reads per request.
        yield MainAppContext(
            full_confluence_config=mock_base_confluence_config, read_only=False
        )

    server = AtlassianMCP(
        "TestConfluence",
        description="Test Confluence MCP Server",
        lifespan=test_lifespan,
    )

    # Register every Confluence tool on a sub-MCP and mount it under "confluence".
    sub_mcp = FastMCP(name="TestConfluenceSubMCP")
    for tool_fn in (
        search,
        get_page,
        get_page_children,
        get_comments,
        add_comment,
        get_labels,
        add_label,
        create_page,
        update_page,
        delete_page,
        search_user,
    ):
        sub_mcp.tool()(tool_fn)

    server.mount("confluence", sub_mcp)

    return server


@pytest.fixture
def no_fetcher_test_confluence_mcp(mock_base_confluence_config):
    """Create a test FastMCP instance that simulates missing Confluence fetcher."""

    # Import and register tool functions (as they are in confluence.py)
    from src.mcp_atlassian.servers.confluence import (
        add_comment,
        add_label,
        create_page,
        delete_page,
        get_comments,
        get_labels,
        get_page,
        get_page_children,
        search,
        search_user,
        update_page,
    )

    @asynccontextmanager
    async def no_fetcher_test_lifespan(
        app: FastMCP,
    ) -> AsyncGenerator[MainAppContext, None]:
        # Same context shape as the normal server; the "missing fetcher"
        # effect comes from not patching get_confluence_fetcher elsewhere.
        yield MainAppContext(
            full_confluence_config=mock_base_confluence_config, read_only=False
        )

    server = AtlassianMCP(
        "NoFetcherTestConfluence",
        description="No Fetcher Test Confluence MCP Server",
        lifespan=no_fetcher_test_lifespan,
    )

    # Register every Confluence tool on a sub-MCP and mount it under "confluence".
    sub_mcp = FastMCP(name="NoFetcherTestConfluenceSubMCP")
    for tool_fn in (
        search,
        get_page,
        get_page_children,
        get_comments,
        add_comment,
        get_labels,
        add_label,
        create_page,
        update_page,
        delete_page,
        search_user,
    ):
        sub_mcp.tool()(tool_fn)

    server.mount("confluence", sub_mcp)

    return server


@pytest.fixture
def mock_request():
    """Provides a mock Starlette Request object with a state."""
    mocked = MagicMock(spec=Request)
    mocked.state = MagicMock()
    return mocked


@pytest.fixture
async def client(test_confluence_mcp, mock_confluence_fetcher):
    """Create a FastMCP client with mocked Confluence fetcher and request state."""
    # Patch the fetcher resolution and the HTTP request accessor so tool calls
    # see the mocked fetcher instead of touching real configuration.
    fetcher_patch = patch(
        "src.mcp_atlassian.servers.confluence.get_confluence_fetcher",
        AsyncMock(return_value=mock_confluence_fetcher),
    )
    request_patch = patch(
        "src.mcp_atlassian.servers.dependencies.get_http_request",
        MagicMock(spec=Request, state=MagicMock()),
    )
    with fetcher_patch, request_patch:
        async with Client(
            transport=FastMCPTransport(test_confluence_mcp)
        ) as connected_client:
            yield connected_client


@pytest.fixture
async def no_fetcher_client_fixture(no_fetcher_test_confluence_mcp, mock_request):
    """Create a client that simulates missing Confluence fetcher configuration."""
    transport = FastMCPTransport(no_fetcher_test_confluence_mcp)
    async with Client(transport=transport) as connected_client:
        yield connected_client


@pytest.mark.anyio
async def test_search(client, mock_confluence_fetcher):
    """Test the search tool with basic query."""
    response = await client.call_tool("confluence_search", {"query": "test search"})

    # The plain query should be wrapped into a siteSearch CQL expression.
    mock_confluence_fetcher.search.assert_called_once()
    call_args, call_kwargs = mock_confluence_fetcher.search.call_args
    assert 'siteSearch ~ "test search"' in call_args[0]
    assert call_kwargs.get("limit") == 10
    assert call_kwargs.get("spaces_filter") is None

    payload = json.loads(response[0].text)
    assert isinstance(payload, list)
    assert len(payload) > 0
    assert payload[0]["title"] == "Test Page Mock Title"


@pytest.mark.anyio
async def test_get_page(client, mock_confluence_fetcher):
    """Test the get_page tool with default parameters."""
    response = await client.call_tool("confluence_get_page", {"page_id": "123456"})

    # Defaults to markdown conversion.
    mock_confluence_fetcher.get_page_content.assert_called_once_with(
        "123456", convert_to_markdown=True
    )

    payload = json.loads(response[0].text)
    assert "metadata" in payload
    metadata = payload["metadata"]
    assert metadata["title"] == "Test Page Mock Title"
    assert "content" in metadata
    assert "value" in metadata["content"]
    assert "This is a test page content" in metadata["content"]["value"]


@pytest.mark.anyio
async def test_get_page_no_metadata(client, mock_confluence_fetcher):
    """Test get_page with metadata disabled."""
    response = await client.call_tool(
        "confluence_get_page", {"page_id": "123456", "include_metadata": False}
    )

    mock_confluence_fetcher.get_page_content.assert_called_once_with(
        "123456", convert_to_markdown=True
    )

    # Without metadata the payload carries the content directly.
    payload = json.loads(response[0].text)
    assert "metadata" not in payload
    assert "content" in payload
    assert "This is a test page content" in payload["content"]["value"]


@pytest.mark.anyio
async def test_get_page_no_markdown(client, mock_confluence_fetcher):
    """Test get_page with HTML content format."""
    html_page = MagicMock(spec=ConfluencePage)
    html_page.content = "<p>HTML Content</p>"
    html_page.content_format = "storage"
    html_page.to_simplified_dict.return_value = {
        "id": "123456",
        "title": "Test Page HTML",
        "url": "https://example.com/html",
        "content": "<p>HTML Content</p>",
        "content_format": "storage",
    }
    mock_confluence_fetcher.get_page_content.return_value = html_page

    response = await client.call_tool(
        "confluence_get_page", {"page_id": "123456", "convert_to_markdown": False}
    )

    mock_confluence_fetcher.get_page_content.assert_called_once_with(
        "123456", convert_to_markdown=False
    )

    payload = json.loads(response[0].text)
    assert "metadata" in payload
    metadata = payload["metadata"]
    assert metadata["title"] == "Test Page HTML"
    # Raw storage-format HTML should pass through untouched.
    assert metadata["content"] == "<p>HTML Content</p>"
    assert metadata["content_format"] == "storage"


@pytest.mark.anyio
async def test_get_page_children(client, mock_confluence_fetcher):
    """Test the get_page_children tool."""
    response = await client.call_tool(
        "confluence_get_page_children", {"parent_id": "123456"}
    )

    mock_confluence_fetcher.get_page_children.assert_called_once()
    kwargs = mock_confluence_fetcher.get_page_children.call_args.kwargs
    assert kwargs["page_id"] == "123456"
    assert kwargs.get("start") == 0
    assert kwargs.get("limit") == 25
    assert kwargs.get("expand") == "version"

    payload = json.loads(response[0].text)
    assert "parent_id" in payload
    assert "results" in payload
    children = payload["results"]
    assert children  # must contain at least one child page
    assert children[0]["title"] == "Test Page Mock Title"


@pytest.mark.anyio
async def test_get_comments(client, mock_confluence_fetcher):
    """Test retrieving page comments."""
    response = await client.call_tool("confluence_get_comments", {"page_id": "123456"})

    mock_confluence_fetcher.get_page_comments.assert_called_once_with("123456")

    comments = json.loads(response[0].text)
    assert isinstance(comments, list)
    assert comments
    assert comments[0]["author"] == "Test User"


@pytest.mark.anyio
async def test_add_comment(client, mock_confluence_fetcher):
    """Test adding a comment to a Confluence page."""
    response = await client.call_tool(
        "confluence_add_comment",
        {"page_id": "123456", "content": "Test comment content"},
    )

    mock_confluence_fetcher.add_comment.assert_called_once_with(
        page_id="123456", content="Test comment content"
    )

    payload = json.loads(response[0].text)
    assert isinstance(payload, dict)
    assert payload["success"] is True
    assert "comment" in payload
    comment = payload["comment"]
    assert comment["id"] == "987"
    assert comment["author"] == "Test User"
    assert comment["body"] == "This is a test comment added via API"
    assert comment["created"] == "2023-08-01T13:00:00.000Z"


@pytest.mark.anyio
async def test_get_labels(client, mock_confluence_fetcher):
    """Test retrieving page labels."""
    response = await client.call_tool("confluence_get_labels", {"page_id": "123456"})

    mock_confluence_fetcher.get_page_labels.assert_called_once_with("123456")

    labels = json.loads(response[0].text)
    assert isinstance(labels, list)
    assert labels[0]["name"] == "test-label"


@pytest.mark.anyio
async def test_add_label(client, mock_confluence_fetcher):
    """Test adding a label to a page."""
    response = await client.call_tool(
        "confluence_add_label", {"page_id": "123456", "name": "new-label"}
    )

    mock_confluence_fetcher.add_page_label.assert_called_once_with(
        "123456", "new-label"
    )

    # The tool responds with the page's full label list after the add.
    labels = json.loads(response[0].text)
    assert isinstance(labels, list)
    assert labels[0]["name"] == "test-label"


@pytest.mark.anyio
async def test_search_user(client, mock_confluence_fetcher):
    """Test the search_user tool with CQL query."""
    cql = 'user.fullname ~ "First Last"'
    response = await client.call_tool(
        "confluence_search_user", {"query": cql, "limit": 10}
    )

    mock_confluence_fetcher.search_user.assert_called_once_with(cql, limit=10)

    results = json.loads(response[0].text)
    assert isinstance(results, list)
    assert len(results) == 1
    first = results[0]
    assert first["entity_type"] == "user"
    assert first["title"] == "First Last"
    assert first["user"]["account_id"] == "a031248587011jasoidf9832jd8j1"
    assert first["user"]["display_name"] == "First Last"


@pytest.mark.anyio
async def test_create_page_with_numeric_parent_id(client, mock_confluence_fetcher):
    """Test creating a page with numeric parent_id (integer) - should convert to string."""
    response = await client.call_tool(
        "confluence_create_page",
        {
            "space_key": "TEST",
            "title": "Test Page",
            "content": "Test content",
            "parent_id": 123456789,  # deliberately an int, not a str
        },
    )

    # The tool must coerce the numeric ID to a string before delegating.
    mock_confluence_fetcher.create_page.assert_called_once()
    kwargs = mock_confluence_fetcher.create_page.call_args.kwargs
    assert kwargs["parent_id"] == "123456789"
    assert kwargs["space_key"] == "TEST"
    assert kwargs["title"] == "Test Page"

    payload = json.loads(response[0].text)
    assert payload["message"] == "Page created successfully"
    assert payload["page"]["title"] == "Test Page Mock Title"


@pytest.mark.anyio
async def test_create_page_with_string_parent_id(client, mock_confluence_fetcher):
    """Test creating a page with string parent_id - should remain unchanged."""
    response = await client.call_tool(
        "confluence_create_page",
        {
            "space_key": "TEST",
            "title": "Test Page",
            "content": "Test content",
            "parent_id": "123456789",  # already a string
        },
    )

    mock_confluence_fetcher.create_page.assert_called_once()
    kwargs = mock_confluence_fetcher.create_page.call_args.kwargs
    # A string ID passes through untouched.
    assert kwargs["parent_id"] == "123456789"
    assert kwargs["space_key"] == "TEST"
    assert kwargs["title"] == "Test Page"

    payload = json.loads(response[0].text)
    assert payload["message"] == "Page created successfully"
    assert payload["page"]["title"] == "Test Page Mock Title"


@pytest.mark.anyio
async def test_update_page_with_numeric_parent_id(client, mock_confluence_fetcher):
    """Test updating a page with numeric parent_id (integer) - should convert to string."""
    response = await client.call_tool(
        "confluence_update_page",
        {
            "page_id": "999999",
            "title": "Updated Page",
            "content": "Updated content",
            "parent_id": 123456789,  # deliberately an int, not a str
        },
    )

    mock_confluence_fetcher.update_page.assert_called_once()
    kwargs = mock_confluence_fetcher.update_page.call_args.kwargs
    # The tool must coerce the numeric ID to a string before delegating.
    assert kwargs["parent_id"] == "123456789"
    assert kwargs["page_id"] == "999999"
    assert kwargs["title"] == "Updated Page"

    payload = json.loads(response[0].text)
    assert payload["message"] == "Page updated successfully"
    assert payload["page"]["title"] == "Test Page Mock Title"


@pytest.mark.anyio
async def test_update_page_with_string_parent_id(client, mock_confluence_fetcher):
    """Test updating a page with string parent_id - should remain unchanged."""
    response = await client.call_tool(
        "confluence_update_page",
        {
            "page_id": "999999",
            "title": "Updated Page",
            "content": "Updated content",
            "parent_id": "123456789",  # already a string
        },
    )

    mock_confluence_fetcher.update_page.assert_called_once()
    kwargs = mock_confluence_fetcher.update_page.call_args.kwargs
    # A string ID passes through untouched.
    assert kwargs["parent_id"] == "123456789"
    assert kwargs["page_id"] == "999999"
    assert kwargs["title"] == "Updated Page"

    payload = json.loads(response[0].text)
    assert payload["message"] == "Page updated successfully"
    assert payload["page"]["title"] == "Test Page Mock Title"

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/fields.py:
--------------------------------------------------------------------------------

```python
"""Module for Jira field operations."""

import logging
from typing import Any

from thefuzz import fuzz

from .client import JiraClient
from .protocols import EpicOperationsProto, UsersOperationsProto

logger = logging.getLogger("mcp-jira")


class FieldsMixin(JiraClient, EpicOperationsProto, UsersOperationsProto):
    """Mixin for Jira field operations.

    This mixin provides methods for discovering, caching, and working with Jira fields.
    Field IDs in Jira are crucial for many operations since they can differ across
    different Jira instances, especially for custom fields.
    """

    _field_name_to_id_map: dict[str, str] | None = None  # Cache for name -> id mapping

    def get_fields(self, refresh: bool = False) -> list[dict[str, Any]]:
        """
        Get all available fields from Jira.

        Args:
            refresh: When True, forces a refresh from the server instead of using cache

        Returns:
            List of field definitions
        """
        try:
            # Use cached field data if available and refresh is not requested
            if self._field_ids_cache is not None and not refresh:
                return self._field_ids_cache

            if refresh:
                self._field_name_to_id_map = (
                    None  # Clear name map cache if refreshing fields
                )

            # Fetch fields from Jira API
            fields = self.jira.get_all_fields()
            if not isinstance(fields, list):
                msg = f"Unexpected return value type from `jira.get_all_fields`: {type(fields)}"
                logger.error(msg)
                raise TypeError(msg)

            # Cache the fields
            self._field_ids_cache = fields

            # Regenerate the name map upon fetching new fields
            self._generate_field_map(force_regenerate=True)

            # Log available fields for debugging
            self._log_available_fields(fields)

            return fields

        except Exception as e:
            logger.error(f"Error getting Jira fields: {str(e)}")
            return []

    def _generate_field_map(self, force_regenerate: bool = False) -> dict[str, str]:
        """Generates and caches a map of lowercase field names to field IDs."""
        if self._field_name_to_id_map is not None and not force_regenerate:
            return self._field_name_to_id_map

        # Ensure fields are loaded into cache first
        fields = (
            self.get_fields()
        )  # Uses cache if available unless force_regenerate was True
        if not fields:
            self._field_name_to_id_map = {}
            return {}

        name_map: dict[str, str] = {}
        id_map: dict[str, str] = {}  # Also map ID to ID for consistency
        for field in fields:
            field_id = field.get("id")
            field_name = field.get("name")
            if field_id:
                id_map[field_id] = field_id  # Map ID to itself
                if field_name:
                    # Store lowercase name -> ID. Handle potential name collisions if necessary.
                    name_map.setdefault(field_name.lower(), field_id)

        # Combine maps, ensuring IDs can also be looked up directly
        self._field_name_to_id_map = name_map | id_map
        logger.debug(
            f"Generated/Updated field name map: {len(self._field_name_to_id_map)} entries"
        )
        return self._field_name_to_id_map

    def get_field_id(self, field_name: str, refresh: bool = False) -> str | None:
        """
        Get the ID for a specific field by name.

        Args:
            field_name: The name of the field to look for (case-insensitive)
            refresh: When True, forces a refresh from the server

        Returns:
            Field ID if found, None otherwise
        """
        try:
            if refresh:
                # BUGFIX: regenerating the map alone only rebuilt it from the
                # *cached* field list. Route through get_fields(refresh=True)
                # so the server is actually re-queried (it also regenerates
                # the name map).
                self.get_fields(refresh=True)

            # Ensure the map is generated/cached
            field_map = self._generate_field_map()
            if not field_map:
                logger.error("Field map could not be generated.")
                return None

            normalized_name = field_name.lower()
            if normalized_name in field_map:
                return field_map[normalized_name]
            # Fallback: Check if the input IS an ID (using original casing)
            elif field_name in field_map:  # Checks the id_map part
                return field_map[field_name]
            else:
                logger.warning(f"Field '{field_name}' not found in generated map.")
                return None

        except Exception as e:
            logger.error(f"Error getting field ID for '{field_name}': {str(e)}")
            return None

    def get_field_by_id(
        self, field_id: str, refresh: bool = False
    ) -> dict[str, Any] | None:
        """
        Get field definition by ID.

        Args:
            field_id: The ID of the field to look for
            refresh: When True, forces a refresh from the server

        Returns:
            Field definition if found, None otherwise
        """
        try:
            fields = self.get_fields(refresh=refresh)

            for field in fields:
                if field.get("id") == field_id:
                    return field

            logger.warning(f"Field with ID '{field_id}' not found")
            return None

        except Exception as e:
            logger.error(f"Error getting field by ID '{field_id}': {str(e)}")
            return None

    def get_custom_fields(self, refresh: bool = False) -> list[dict[str, Any]]:
        """
        Get all custom fields.

        Args:
            refresh: When True, forces a refresh from the server

        Returns:
            List of custom field definitions
        """
        try:
            fields = self.get_fields(refresh=refresh)
            custom_fields = [
                field
                for field in fields
                if field.get("id", "").startswith("customfield_")
            ]

            return custom_fields

        except Exception as e:
            logger.error(f"Error getting custom fields: {str(e)}")
            return []

    def get_required_fields(self, issue_type: str, project_key: str) -> dict[str, Any]:
        """
        Get required fields for creating an issue of a specific type in a project.

        Args:
            issue_type: The issue type (e.g., 'Bug', 'Story', 'Epic')
            project_key: The project key (e.g., 'PROJ')

        Returns:
            Dictionary mapping required field names to their definitions
        """
        # Initialize cache if it doesn't exist
        if not hasattr(self, "_required_fields_cache"):
            self._required_fields_cache = {}

        # Check cache first
        cache_key = (project_key, issue_type)
        if cache_key in self._required_fields_cache:
            logger.debug(
                f"Returning cached required fields for {issue_type} in {project_key}"
            )
            return self._required_fields_cache[cache_key]

        try:
            # Step 1: Get the ID for the given issue type name within the project
            if not hasattr(self, "get_project_issue_types"):
                logger.error(
                    "get_project_issue_types method not available. Cannot resolve issue type ID."
                )
                return {}

            all_issue_types = self.get_project_issue_types(project_key)
            issue_type_id = None
            for it in all_issue_types:
                if it.get("name", "").lower() == issue_type.lower():
                    issue_type_id = it.get("id")
                    break

            if not issue_type_id:
                logger.warning(
                    f"Issue type '{issue_type}' not found in project '{project_key}'"
                )
                return {}

            # Step 2: Call the correct API method to get field metadata
            meta = self.jira.issue_createmeta_fieldtypes(
                project=project_key, issue_type_id=issue_type_id
            )

            required_fields = {}
            # Step 3: Parse the response and extract required fields
            if isinstance(meta, dict) and "fields" in meta:
                if isinstance(meta["fields"], list):
                    for field_meta in meta["fields"]:
                        if isinstance(field_meta, dict) and field_meta.get(
                            "required", False
                        ):
                            field_id = field_meta.get("fieldId")
                            if field_id:
                                required_fields[field_id] = field_meta
                else:
                    logger.warning(
                        "Unexpected format for 'fields' in createmeta response."
                    )

            if not required_fields:
                logger.warning(
                    f"No required fields found for issue type '{issue_type}' "
                    f"in project '{project_key}'"
                )

            # Cache the result before returning. Note: empty results are
            # cached too, so a transient API failure above is not retried
            # until the process restarts.
            self._required_fields_cache[cache_key] = required_fields
            logger.debug(
                f"Cached required fields for {issue_type} in {project_key}: "
                f"{len(required_fields)} fields"
            )

            return required_fields

        except Exception as e:
            logger.error(
                f"Error getting required fields for issue type '{issue_type}' "
                f"in project '{project_key}': {str(e)}"
            )
            return {}

    def get_field_ids_to_epic(self) -> dict[str, str]:
        """
        Dynamically discover Jira field IDs relevant to Epic linking.
        This method queries the Jira API to find the correct custom field IDs
        for Epic-related fields, which can vary between different Jira instances.

        Returns:
            Dictionary mapping field names to their IDs
            (e.g., {'epic_link': 'customfield_10014', 'epic_name': 'customfield_10011'})
        """
        try:
            # Ensure field list and map are cached/generated
            self._generate_field_map()  # Generates map and ensures fields are cached

            # Get all fields (uses cache if available)
            fields = self.get_fields()
            if not fields:  # Check if get_fields failed or returned empty
                logger.error(
                    "Could not load field definitions for epic field discovery."
                )
                return {}

            field_ids = {}

            # Log the complete list of fields for debugging
            all_field_names = [field.get("name", "").lower() for field in fields]
            logger.debug(f"All field names: {all_field_names}")

            # Enhanced logging for debugging
            custom_fields = {
                field.get("id", ""): field.get("name", "")
                for field in fields
                if field.get("id", "").startswith("customfield_")
            }
            logger.debug(f"Custom fields: {custom_fields}")

            # Look for Epic-related fields - use multiple strategies to identify them
            for field in fields:
                field_name = field.get("name", "").lower()
                original_name = field.get("name", "")
                field_id = field.get("id", "")
                field_schema = field.get("schema", {})
                field_custom = field_schema.get("custom", "")

                if original_name and field_id:
                    field_ids[original_name] = field_id

                # Epic Link field - used to link issues to epics
                if (
                    field_name == "epic link"
                    or field_name == "epic"
                    or "epic link" in field_name
                    or field_custom == "com.pyxis.greenhopper.jira:gh-epic-link"
                    or field_id == "customfield_10014"
                ):  # Common in Jira Cloud
                    field_ids["epic_link"] = field_id
                    # For backward compatibility
                    field_ids["Epic Link"] = field_id
                    logger.debug(f"Found Epic Link field: {field_id} ({original_name})")

                # Epic Name field - used when creating epics
                elif (
                    field_name == "epic name"
                    or field_name == "epic title"
                    or "epic name" in field_name
                    or field_custom == "com.pyxis.greenhopper.jira:gh-epic-label"
                    or field_id == "customfield_10011"
                ):  # Common in Jira Cloud
                    field_ids["epic_name"] = field_id
                    # For backward compatibility
                    field_ids["Epic Name"] = field_id
                    logger.debug(f"Found Epic Name field: {field_id} ({original_name})")

                # Epic Status field
                elif (
                    field_name == "epic status"
                    or "epic status" in field_name
                    or field_custom == "com.pyxis.greenhopper.jira:gh-epic-status"
                ):
                    field_ids["epic_status"] = field_id
                    logger.debug(
                        f"Found Epic Status field: {field_id} ({original_name})"
                    )

                # Epic Color field
                elif (
                    field_name == "epic color"
                    or field_name == "epic colour"
                    or "epic color" in field_name
                    or "epic colour" in field_name
                    or field_custom == "com.pyxis.greenhopper.jira:gh-epic-color"
                ):
                    field_ids["epic_color"] = field_id
                    logger.debug(
                        f"Found Epic Color field: {field_id} ({original_name})"
                    )

                # Parent field - sometimes used instead of Epic Link
                elif (
                    field_name == "parent"
                    or field_name == "parent issue"
                    or "parent issue" in field_name
                ):
                    field_ids["parent"] = field_id
                    logger.debug(f"Found Parent field: {field_id} ({original_name})")

                # Try to detect any other fields that might be related to Epics
                elif "epic" in field_name and field_id.startswith("customfield_"):
                    key = f"epic_{field_name.replace(' ', '_').replace('-', '_')}"
                    field_ids[key] = field_id
                    logger.debug(
                        f"Found potential Epic-related field: {field_id} ({original_name})"
                    )

            # If we couldn't find certain key fields, try alternative approaches
            if "epic_name" not in field_ids or "epic_link" not in field_ids:
                logger.debug(
                    "Standard field search didn't find all Epic fields, trying alternative approaches"
                )
                self._try_discover_fields_from_existing_epic(field_ids)

            logger.debug(f"Discovered field IDs: {field_ids}")

            return field_ids

        except Exception as e:
            logger.error(f"Error discovering Jira field IDs: {str(e)}")
            # Return an empty dict as fallback
            return {}

    def _log_available_fields(self, fields: list[dict]) -> None:
        """
        Log available fields for debugging.

        Args:
            fields: List of field definitions
        """
        logger.debug("Available Jira fields:")
        for field in fields:
            field_id = field.get("id", "")
            name = field.get("name", "")
            field_type = field.get("schema", {}).get("type", "")
            logger.debug(f"{field_id}: {name} ({field_type})")

    def is_custom_field(self, field_id: str) -> bool:
        """
        Check if a field is a custom field.

        Args:
            field_id: The field ID to check

        Returns:
            True if it's a custom field, False otherwise
        """
        return field_id.startswith("customfield_")

    def format_field_value(self, field_id: str, value: Any) -> Any:
        """
        Format a field value based on its type for update operations.

        Different field types in Jira require different JSON formats when updating.
        This method helps format the value correctly for the specific field type.

        Args:
            field_id: The ID of the field
            value: The value to format

        Returns:
            Properly formatted value for the field
        """
        try:
            # Get field definition
            field = self.get_field_by_id(field_id)

            if not field:
                # For unknown fields, return value as-is
                return value

            field_type = field.get("schema", {}).get("type")

            # Format based on field type
            if field_type == "user":
                # Handle user fields - need accountId for cloud or name for server
                if isinstance(value, str):
                    try:
                        account_id = self._get_account_id(value)
                        return {"accountId": account_id}
                    except Exception as e:
                        logger.warning(f"Could not resolve user '{value}': {str(e)}")
                        return value
                else:
                    return value

            elif field_type == "array":
                # Handle array fields - convert single value to list if needed
                if not isinstance(value, list):
                    return [value]
                return value

            elif field_type == "option":
                # Handle option fields - convert to {"value": value} format
                if isinstance(value, str):
                    return {"value": value}
                return value

            # For other types, return as-is
            return value

        except Exception as e:
            logger.warning(f"Error formatting field value for '{field_id}': {str(e)}")
            return value

    def search_fields(
        self, keyword: str, limit: int = 10, *, refresh: bool = False
    ) -> list[dict[str, Any]]:
        """
        Search fields using fuzzy matching.

        Args:
            keyword: The search keyword
            limit: Maximum number of results to return (default: 10)
            refresh: When True, forces a refresh from the server

        Returns:
            List of matching field definitions, sorted by relevance
        """
        try:
            # Get all fields
            fields = self.get_fields(refresh=refresh)

            # if keyword is empty, return `limit` fields
            if not keyword:
                return fields[:limit]

            def similarity(keyword: str, field: dict) -> int:
                """Calculate similarity score between keyword and field."""
                name_candidates = [
                    field.get("id", ""),
                    field.get("key", ""),
                    field.get("name", ""),
                    *field.get("clauseNames", []),
                ]

                # Calculate the fuzzy match score
                return max(
                    fuzz.partial_ratio(keyword.lower(), name.lower())
                    for name in name_candidates
                )

            # Sort by similarity
            sorted_fields = sorted(
                fields, key=lambda x: similarity(keyword, x), reverse=True
            )

            # Return the top limit results
            return sorted_fields[:limit]

        except Exception as e:
            logger.error(f"Error searching fields: {str(e)}")
            return []

```

--------------------------------------------------------------------------------
/tests/unit/confluence/test_search.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for the SearchMixin class."""

from unittest.mock import MagicMock, patch

import pytest
import requests
from requests import HTTPError

from mcp_atlassian.confluence.search import SearchMixin
from mcp_atlassian.confluence.utils import quote_cql_identifier_if_needed
from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError


class TestSearchMixin:
    """Tests for the SearchMixin class.

    Covers both CQL content search (`search`) and user search (`search_user`),
    including spaces filtering, exception handling (which returns an empty
    list), and HTTP auth errors (which raise MCPAtlassianAuthenticationError).
    """

    @pytest.fixture
    def search_mixin(self, confluence_client):
        """Create a SearchMixin instance for testing."""
        # SearchMixin inherits from ConfluenceClient, so we need to create it properly
        with patch(
            "mcp_atlassian.confluence.search.ConfluenceClient.__init__"
        ) as mock_init:
            mock_init.return_value = None
            mixin = SearchMixin()
            # Copy the necessary attributes from our mocked client
            mixin.confluence = confluence_client.confluence
            mixin.config = confluence_client.config
            mixin.preprocessor = confluence_client.preprocessor
            return mixin

    def test_search_success(self, search_mixin):
        """Test search with successful results."""
        # Prepare the mock
        search_mixin.confluence.cql.return_value = {
            "results": [
                {
                    "content": {
                        "id": "123456789",
                        "title": "Test Page",
                        "type": "page",
                        "space": {"key": "SPACE", "name": "Test Space"},
                        "version": {"number": 1},
                    },
                    "excerpt": "Test content excerpt",
                    "url": "https://confluence.example.com/pages/123456789",
                }
            ]
        }

        # Mock the preprocessor to return processed content
        search_mixin.preprocessor.process_html_content.return_value = (
            "<p>Processed HTML</p>",
            "Processed content",
        )

        # Call the method
        result = search_mixin.search("test query")

        # Verify API call
        search_mixin.confluence.cql.assert_called_once_with(cql="test query", limit=10)

        # Verify result
        assert len(result) == 1
        assert result[0].id == "123456789"
        assert result[0].title == "Test Page"
        assert result[0].content == "Processed content"

    def test_search_with_empty_results(self, search_mixin):
        """Test handling of empty search results."""
        # Mock an empty result set
        search_mixin.confluence.cql.return_value = {"results": []}

        # Act
        results = search_mixin.search("empty query")

        # Assert
        assert isinstance(results, list)
        assert len(results) == 0

    def test_search_with_non_page_content(self, search_mixin):
        """Test handling of non-page content in search results."""
        # Mock search results with non-page content
        search_mixin.confluence.cql.return_value = {
            "results": [
                {
                    "content": {"type": "blogpost", "id": "12345"},
                    "title": "Blog Post",
                    "excerpt": "This is a blog post",
                    "url": "/pages/12345",
                    "resultGlobalContainer": {"title": "TEST"},
                }
            ]
        }

        # Act
        results = search_mixin.search("blogpost query")

        # Assert
        assert isinstance(results, list)
        # The method should still handle them as pages since we're using models
        assert len(results) > 0

    def test_search_key_error(self, search_mixin):
        """Test handling of KeyError in search results."""
        # Mock a response missing required keys
        search_mixin.confluence.cql.return_value = {"incomplete": "data"}

        # Act
        results = search_mixin.search("invalid query")

        # Assert
        assert isinstance(results, list)
        assert len(results) == 0

    def test_search_request_exception(self, search_mixin):
        """Test handling of RequestException during search."""
        # Mock a network error
        search_mixin.confluence.cql.side_effect = requests.RequestException("API error")

        # Act
        results = search_mixin.search("error query")

        # Assert
        assert isinstance(results, list)
        assert len(results) == 0

    def test_search_value_error(self, search_mixin):
        """Test handling of ValueError during search."""
        # Mock a value error
        search_mixin.confluence.cql.side_effect = ValueError("Value error")

        # Act
        results = search_mixin.search("error query")

        # Assert
        assert isinstance(results, list)
        assert len(results) == 0

    def test_search_type_error(self, search_mixin):
        """Test handling of TypeError during search."""
        # Mock a type error
        search_mixin.confluence.cql.side_effect = TypeError("Type error")

        # Act
        results = search_mixin.search("error query")

        # Assert
        assert isinstance(results, list)
        assert len(results) == 0

    def test_search_with_spaces_filter(self, search_mixin):
        """Test searching with spaces filter from parameter."""
        # Prepare the mock
        search_mixin.confluence.cql.return_value = {
            "results": [
                {
                    "content": {
                        "id": "123456789",
                        "title": "Test Page",
                        "type": "page",
                        "space": {"key": "SPACE", "name": "Test Space"},
                        "version": {"number": 1},
                    },
                    "excerpt": "Test content excerpt",
                    "url": "https://confluence.example.com/pages/123456789",
                }
            ]
        }

        # Mock the preprocessor
        search_mixin.preprocessor.process_html_content.return_value = (
            "<p>Processed HTML</p>",
            "Processed content",
        )

        # Test with single space filter
        result = search_mixin.search("test query", spaces_filter="DEV")

        # Verify space was properly quoted in the CQL query
        quoted_dev = quote_cql_identifier_if_needed("DEV")
        search_mixin.confluence.cql.assert_called_with(
            cql=f"(test query) AND (space = {quoted_dev})",
            limit=10,
        )
        assert len(result) == 1

        # Test with multiple spaces filter
        result = search_mixin.search("test query", spaces_filter="DEV,TEAM")

        # Verify spaces were properly quoted in the CQL query
        quoted_dev = quote_cql_identifier_if_needed("DEV")
        quoted_team = quote_cql_identifier_if_needed("TEAM")
        search_mixin.confluence.cql.assert_called_with(
            cql=f"(test query) AND (space = {quoted_dev} OR space = {quoted_team})",
            limit=10,
        )
        assert len(result) == 1

        # Test with filter when query already has space
        result = search_mixin.search('space = "EXISTING"', spaces_filter="DEV")
        search_mixin.confluence.cql.assert_called_with(
            cql='space = "EXISTING"',  # Should not add filter when space already exists
            limit=10,
        )
        assert len(result) == 1

    def test_search_with_config_spaces_filter(self, search_mixin):
        """Test search using spaces filter from config."""
        # Prepare the mock
        search_mixin.confluence.cql.return_value = {
            "results": [
                {
                    "content": {
                        "id": "123456789",
                        "title": "Test Page",
                        "type": "page",
                        "space": {"key": "SPACE", "name": "Test Space"},
                        "version": {"number": 1},
                    },
                    "excerpt": "Test content excerpt",
                    "url": "https://confluence.example.com/pages/123456789",
                }
            ]
        }

        # Mock the preprocessor
        search_mixin.preprocessor.process_html_content.return_value = (
            "<p>Processed HTML</p>",
            "Processed content",
        )

        # Set config filter
        search_mixin.config.spaces_filter = "DEV,TEAM"

        # Test with config filter
        result = search_mixin.search("test query")

        # Verify spaces were properly quoted in the CQL query
        quoted_dev = quote_cql_identifier_if_needed("DEV")
        quoted_team = quote_cql_identifier_if_needed("TEAM")
        search_mixin.confluence.cql.assert_called_with(
            cql=f"(test query) AND (space = {quoted_dev} OR space = {quoted_team})",
            limit=10,
        )
        assert len(result) == 1

        # Test that explicit filter overrides config filter
        result = search_mixin.search("test query", spaces_filter="OVERRIDE")

        # Verify space was properly quoted in the CQL query
        quoted_override = quote_cql_identifier_if_needed("OVERRIDE")
        search_mixin.confluence.cql.assert_called_with(
            cql=f"(test query) AND (space = {quoted_override})",
            limit=10,
        )
        assert len(result) == 1

    def test_search_general_exception(self, search_mixin):
        """Test handling of general exceptions during search."""
        # Mock a general exception
        search_mixin.confluence.cql.side_effect = Exception("General error")

        # Act
        results = search_mixin.search("error query")

        # Assert
        assert isinstance(results, list)
        assert len(results) == 0

    def test_search_user_success(self, search_mixin):
        """Test search_user with successful results."""
        # Prepare the mock response
        search_mixin.confluence.get.return_value = {
            "results": [
                {
                    "user": {
                        "type": "known",
                        "accountId": "1234asdf",
                        "accountType": "atlassian",
                        "email": "[email protected]",
                        "publicName": "First Last",
                        "displayName": "First Last",
                        "isExternalCollaborator": False,
                        "profilePicture": {
                            "path": "/wiki/aa-avatar/1234asdf",
                            "width": 48,
                            "height": 48,
                            "isDefault": False,
                        },
                    },
                    "title": "First Last",
                    "excerpt": "",
                    "url": "/people/1234asdf",
                    "entityType": "user",
                    "lastModified": "2025-06-02T13:35:59.680Z",
                    "score": 0.0,
                }
            ],
            "start": 0,
            "limit": 25,
            "size": 1,
            "totalSize": 1,
            "cqlQuery": "( user.fullname ~ 'First Last' )",
            "searchDuration": 115,
        }

        # Call the method
        result = search_mixin.search_user('user.fullname ~ "First Last"')

        # Verify API call
        search_mixin.confluence.get.assert_called_once_with(
            "rest/api/search/user",
            params={"cql": 'user.fullname ~ "First Last"', "limit": 10},
        )

        # Verify result
        assert len(result) == 1
        assert result[0].user.account_id == "1234asdf"
        assert result[0].user.display_name == "First Last"
        assert result[0].user.email == "[email protected]"
        assert result[0].title == "First Last"
        assert result[0].entity_type == "user"

    def test_search_user_with_empty_results(self, search_mixin):
        """Test search_user with empty results."""
        # Mock an empty result set
        search_mixin.confluence.get.return_value = {
            "results": [],
            "start": 0,
            "limit": 25,
            "size": 0,
            "totalSize": 0,
            "cqlQuery": 'user.fullname ~ "Nonexistent"',
            "searchDuration": 50,
        }

        # Act
        results = search_mixin.search_user('user.fullname ~ "Nonexistent"')

        # Assert
        assert isinstance(results, list)
        assert len(results) == 0

    def test_search_user_with_custom_limit(self, search_mixin):
        """Test search_user with custom limit."""
        # Prepare the mock response
        search_mixin.confluence.get.return_value = {
            "results": [],
            "start": 0,
            "limit": 5,
            "size": 0,
            "totalSize": 0,
            "cqlQuery": 'user.fullname ~ "Test"',
            "searchDuration": 30,
        }

        # Call with custom limit
        search_mixin.search_user('user.fullname ~ "Test"', limit=5)

        # Verify API call with correct limit
        search_mixin.confluence.get.assert_called_once_with(
            "rest/api/search/user", params={"cql": 'user.fullname ~ "Test"', "limit": 5}
        )

    @pytest.mark.parametrize(
        "exception_type,exception_args,expected_result",
        [
            (requests.RequestException, ("Network error",), []),
            (ValueError, ("Value error",), []),
            (TypeError, ("Type error",), []),
            (Exception, ("General error",), []),
            (KeyError, ("Missing key",), []),
        ],
    )
    def test_search_user_exception_handling(
        self, search_mixin, exception_type, exception_args, expected_result
    ):
        """Test search_user handling of various exceptions that return empty list."""
        # Mock the exception
        search_mixin.confluence.get.side_effect = exception_type(*exception_args)

        # Act
        results = search_mixin.search_user('user.fullname ~ "Test"')

        # Assert
        assert isinstance(results, list)
        assert results == expected_result

    @pytest.mark.parametrize(
        "status_code,exception_type",
        [
            (401, MCPAtlassianAuthenticationError),
            (403, MCPAtlassianAuthenticationError),
        ],
    )
    def test_search_user_http_auth_errors(
        self, search_mixin, status_code, exception_type
    ):
        """Test search_user handling of HTTP authentication errors."""
        # Mock HTTP error
        mock_response = MagicMock()
        mock_response.status_code = status_code
        http_error = HTTPError(f"HTTP {status_code}")
        http_error.response = mock_response
        search_mixin.confluence.get.side_effect = http_error

        # Act and assert
        with pytest.raises(exception_type):
            search_mixin.search_user('user.fullname ~ "Test"')

    def test_search_user_http_other_error(self, search_mixin):
        """Test search_user handling of other HTTP errors."""
        # Mock HTTP 500 error
        mock_response = MagicMock()
        mock_response.status_code = 500
        http_error = HTTPError("Internal Server Error")
        http_error.response = mock_response
        search_mixin.confluence.get.side_effect = http_error

        # Act and assert - should re-raise the HTTPError
        with pytest.raises(HTTPError):
            search_mixin.search_user('user.fullname ~ "Test"')

    @pytest.mark.parametrize(
        "mock_response,expected_length",
        [
            ({"incomplete": "data"}, 0),  # KeyError case
            (None, 0),  # None response case
            ({"results": []}, 0),  # Empty results case
        ],
    )
    def test_search_user_edge_cases(self, search_mixin, mock_response, expected_length):
        """Test search_user handling of edge cases in API responses."""
        search_mixin.confluence.get.return_value = mock_response

        # Act
        results = search_mixin.search_user('user.fullname ~ "Test"')

        # Assert
        assert isinstance(results, list)
        assert len(results) == expected_length

    # Parametrized exception-handling tests for the regular search method
    # (mirrors test_search_user_exception_handling above):
    @pytest.mark.parametrize(
        "exception_type,exception_args,expected_result",
        [
            (requests.RequestException, ("API error",), []),
            (ValueError, ("Value error",), []),
            (TypeError, ("Type error",), []),
            (Exception, ("General error",), []),
            (KeyError, ("Missing key",), []),
        ],
    )
    def test_search_exception_handling(
        self, search_mixin, exception_type, exception_args, expected_result
    ):
        """Test search handling of various exceptions that return empty list."""
        # Mock the exception
        search_mixin.confluence.cql.side_effect = exception_type(*exception_args)

        # Act
        results = search_mixin.search("error query")

        # Assert
        assert isinstance(results, list)
        assert results == expected_result

    # Parametrized checks that search_user forwards CQL and limit verbatim:
    @pytest.mark.parametrize(
        "query,limit,expected_params",
        [
            (
                'user.fullname ~ "Test"',
                10,
                {"cql": 'user.fullname ~ "Test"', "limit": 10},
            ),
            (
                'user.email ~ "[email protected]"',
                5,
                {"cql": 'user.email ~ "[email protected]"', "limit": 5},
            ),
            (
                'user.fullname ~ "John" AND user.email ~ "@company.com"',
                15,
                {
                    "cql": 'user.fullname ~ "John" AND user.email ~ "@company.com"',
                    "limit": 15,
                },
            ),
        ],
    )
    def test_search_user_api_parameters(
        self, search_mixin, query, limit, expected_params
    ):
        """Test that search_user calls the API with correct parameters."""
        # Mock successful response
        search_mixin.confluence.get.return_value = {
            "results": [],
            "start": 0,
            "limit": limit,
            "totalSize": 0,
        }

        # Act
        search_mixin.search_user(query, limit=limit)

        # Assert API was called with correct parameters
        search_mixin.confluence.get.assert_called_once_with(
            "rest/api/search/user", params=expected_params
        )

    def test_search_user_with_complex_cql_query(self, search_mixin):
        """Test search_user with complex CQL query containing operators."""
        # Mock successful response
        search_mixin.confluence.get.return_value = {
            "results": [],
            "start": 0,
            "limit": 10,
            "totalSize": 0,
        }

        complex_query = 'user.fullname ~ "John" AND user.email ~ "@company.com" OR user.displayName ~ "JD"'

        # Act
        search_mixin.search_user(complex_query)

        # Assert API was called with the exact query
        search_mixin.confluence.get.assert_called_once_with(
            "rest/api/search/user", params={"cql": complex_query, "limit": 10}
        )

    def test_search_user_result_processing(self, search_mixin):
        """Test that search_user properly processes and returns user search result objects."""
        # Mock response with user data
        search_mixin.confluence.get.return_value = {
            "results": [
                {
                    "user": {
                        "accountId": "test-account-id",
                        "displayName": "Test User",
                        "email": "[email protected]",
                        "isExternalCollaborator": False,
                    },
                    "title": "Test User",
                    "entityType": "user",
                    "score": 1.5,
                }
            ],
            "start": 0,
            "limit": 10,
            "totalSize": 1,
        }

        # Act
        results = search_mixin.search_user('user.fullname ~ "Test User"')

        # Assert result structure
        assert len(results) == 1
        assert hasattr(results[0], "user")
        assert hasattr(results[0], "title")
        assert hasattr(results[0], "entity_type")
        assert results[0].user.account_id == "test-account-id"
        assert results[0].user.display_name == "Test User"
        assert results[0].title == "Test User"
        assert results[0].entity_type == "user"
```

--------------------------------------------------------------------------------
/src/mcp_atlassian/utils/oauth.py:
--------------------------------------------------------------------------------

```python
"""OAuth 2.0 utilities for Atlassian Cloud authentication.

This module provides utilities for OAuth 2.0 (3LO) authentication with Atlassian Cloud.
It handles:
- OAuth configuration
- Token acquisition, storage, and refresh
- Session configuration for API clients
"""

import json
import logging
import os
import pprint
import time
import urllib.parse
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional

import keyring
import requests

# Configure logging
logger = logging.getLogger("mcp-atlassian.oauth")

# Constants
TOKEN_URL = "https://auth.atlassian.com/oauth/token"  # noqa: S105 - This is a public API endpoint URL, not a password
AUTHORIZE_URL = "https://auth.atlassian.com/authorize"
CLOUD_ID_URL = "https://api.atlassian.com/oauth/token/accessible-resources"
TOKEN_EXPIRY_MARGIN = 300  # 5 minutes in seconds
KEYRING_SERVICE_NAME = "mcp-atlassian-oauth"


@dataclass
class OAuthConfig:
    """OAuth 2.0 configuration for Atlassian Cloud.

    This class manages the OAuth configuration and tokens. It handles:
    - Authentication configuration (client credentials)
    - Token acquisition and refreshing
    - Token storage and retrieval
    - Cloud ID identification
    """

    # App credentials and flow settings from the Atlassian developer console.
    client_id: str
    client_secret: str
    redirect_uri: str
    scope: str
    # Fields below are populated after a successful token exchange/refresh.
    cloud_id: str | None = None
    refresh_token: str | None = None
    access_token: str | None = None
    expires_at: float | None = None  # Unix timestamp (seconds) of token expiry

    @property
    def is_token_expired(self) -> bool:
        """Report whether the access token is missing, expired, or near expiry.

        Returns:
            True if the token is expired or will expire soon, False otherwise.
        """
        # Without a token or expiry metadata there is nothing to trust.
        if not (self.access_token and self.expires_at):
            return True

        # Treat tokens inside the safety margin as expired so callers refresh
        # before an in-flight request can fail mid-way.
        remaining = self.expires_at - time.time()
        return remaining <= TOKEN_EXPIRY_MARGIN

    def get_authorization_url(self, state: str) -> str:
        """Build the URL that starts the OAuth 2.0 (3LO) authorization flow.

        Args:
            state: Random state string for CSRF protection

        Returns:
            The authorization URL to redirect the user to.
        """
        # Parameter order is preserved by dict insertion order.
        query = urllib.parse.urlencode(
            {
                "audience": "api.atlassian.com",
                "client_id": self.client_id,
                "scope": self.scope,
                "redirect_uri": self.redirect_uri,
                "response_type": "code",
                "prompt": "consent",  # always re-prompt so scopes can change
                "state": state,
            }
        )
        return f"{AUTHORIZE_URL}?{query}"

    def exchange_code_for_tokens(self, code: str) -> bool:
        """Exchange the authorization code for access and refresh tokens.

        On success this also resolves the cloud ID and persists the tokens
        (keyring with file fallback) for reuse in later runs.

        Args:
            code: The authorization code from the callback

        Returns:
            True if tokens were successfully acquired, False otherwise.
        """
        # Track the response object so error handlers can report its body even
        # if parsing fails partway through (clearer than probing locals()).
        response = None
        try:
            payload = {
                "grant_type": "authorization_code",
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "code": code,
                "redirect_uri": self.redirect_uri,
            }

            logger.info(f"Exchanging authorization code for tokens at {TOKEN_URL}")
            logger.debug(f"Token exchange payload: {pprint.pformat(payload)}")

            # Timeout prevents the process from hanging indefinitely if the
            # token endpoint becomes unresponsive.
            response = requests.post(TOKEN_URL, data=payload, timeout=30)

            # Log more details about the response
            logger.debug(f"Token exchange response status: {response.status_code}")
            logger.debug(
                f"Token exchange response headers: {pprint.pformat(response.headers)}"
            )
            logger.debug(f"Token exchange response body: {response.text[:500]}...")

            if not response.ok:
                logger.error(
                    f"Token exchange failed with status {response.status_code}. Response: {response.text}"
                )
                return False

            # Parse the response
            token_data = response.json()

            # Check if required tokens are present
            if "access_token" not in token_data:
                logger.error(
                    f"Access token not found in response. Keys found: {list(token_data.keys())}"
                )
                return False

            if "refresh_token" not in token_data:
                logger.error(
                    "Refresh token not found in response. Ensure 'offline_access' scope is included. "
                    f"Keys found: {list(token_data.keys())}"
                )
                return False

            self.access_token = token_data["access_token"]
            self.refresh_token = token_data["refresh_token"]
            self.expires_at = time.time() + token_data["expires_in"]

            # Get the cloud ID using the access token
            self._get_cloud_id()

            # Save the tokens
            self._save_tokens()

            # Log success message with token details
            logger.info(
                f"✅ OAuth token exchange successful! Access token expires in {token_data['expires_in']}s."
            )
            logger.info(
                f"Access Token (partial): {self.access_token[:10]}...{self.access_token[-5:] if self.access_token else ''}"
            )
            logger.info(
                f"Refresh Token (partial): {self.refresh_token[:5]}...{self.refresh_token[-3:] if self.refresh_token else ''}"
            )
            if self.cloud_id:
                logger.info(f"Cloud ID successfully retrieved: {self.cloud_id}")
            else:
                logger.warning(
                    "Cloud ID was not retrieved after token exchange. Check accessible resources."
                )
            return True
        except requests.exceptions.RequestException as e:
            logger.error(f"Network error during token exchange: {e}", exc_info=True)
            return False
        except json.JSONDecodeError as e:
            logger.error(
                f"Failed to decode JSON response from token endpoint: {e}",
                exc_info=True,
            )
            logger.error(
                f"Response text that failed to parse: {response.text if response is not None else 'Response object not available'}"
            )
            return False
        except Exception as e:
            logger.error(f"Failed to exchange code for tokens: {e}")
            return False

    def refresh_access_token(self) -> bool:
        """Refresh the access token using the refresh token.

        Also rotates the stored refresh token when the server returns a new
        one, then persists the updated tokens.

        Returns:
            True if the token was successfully refreshed, False otherwise.
        """
        if not self.refresh_token:
            logger.error("No refresh token available")
            return False

        try:
            payload = {
                "grant_type": "refresh_token",
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "refresh_token": self.refresh_token,
            }

            logger.debug("Refreshing access token...")
            # Timeout guards against an unresponsive token endpoint.
            response = requests.post(TOKEN_URL, data=payload, timeout=30)
            response.raise_for_status()

            # Parse the response
            token_data = response.json()
            self.access_token = token_data["access_token"]
            # Refresh token might also be rotated
            if "refresh_token" in token_data:
                self.refresh_token = token_data["refresh_token"]
            self.expires_at = time.time() + token_data["expires_in"]

            # Save the tokens
            self._save_tokens()

            return True
        except Exception as e:
            logger.error(f"Failed to refresh access token: {e}")
            return False

    def ensure_valid_token(self) -> bool:
        """Guarantee a usable access token, refreshing it when necessary.

        Returns:
            True if the token is valid (or was refreshed successfully), False otherwise.
        """
        # Short-circuit: a still-valid token needs no network round-trip.
        return (not self.is_token_expired) or self.refresh_access_token()

    def _get_cloud_id(self) -> None:
        """Get the cloud ID for the Atlassian instance.

        This method queries the accessible resources endpoint to get the cloud ID.
        The cloud ID is needed for API calls with OAuth. Failures are logged
        and leave ``self.cloud_id`` unchanged.
        """
        if not self.access_token:
            logger.debug("No access token available to get cloud ID")
            return

        try:
            headers = {"Authorization": f"Bearer {self.access_token}"}
            # Timeout prevents hanging if the API endpoint is unresponsive.
            response = requests.get(CLOUD_ID_URL, headers=headers, timeout=30)
            response.raise_for_status()

            resources = response.json()
            if resources and len(resources) > 0:
                # Use the first cloud site (most users have only one)
                # For users with multiple sites, they might need to specify which one to use
                self.cloud_id = resources[0]["id"]
                logger.debug(f"Found cloud ID: {self.cloud_id}")
            else:
                logger.warning("No Atlassian sites found in the response")
        except Exception as e:
            logger.error(f"Failed to get cloud ID: {e}")

    def _get_keyring_username(self) -> str:
        """Get the keyring username for storing tokens.

        The username is based on the client ID to allow multiple OAuth apps.

        Returns:
            A username string for keyring
        """
        return f"oauth-{self.client_id}"

    def _save_tokens(self) -> None:
        """Persist the current tokens via the system keyring, with file fallback.

        This allows the tokens to be reused between runs without requiring
        the user to go through the authorization flow again.
        """
        try:
            token_data = {
                "refresh_token": self.refresh_token,
                "access_token": self.access_token,
                "expires_at": self.expires_at,
                "cloud_id": self.cloud_id,
            }
            username = self._get_keyring_username()

            # Primary storage: OS keyring, with the token bundle serialized
            # as a single JSON string.
            keyring.set_password(KEYRING_SERVICE_NAME, username, json.dumps(token_data))
            logger.debug(f"Saved OAuth tokens to keyring for {username}")

            # Also write the file copy for environments where keyring
            # is unavailable (backwards compatibility).
            self._save_tokens_to_file(token_data)
        except Exception as e:
            logger.error(f"Failed to save tokens to keyring: {e}")
            # Keyring failed entirely: fall back to file-only storage,
            # rebuilt from the current attributes.
            self._save_tokens_to_file()

    def _save_tokens_to_file(self, token_data: dict[str, Any] | None = None) -> None:
        """Save the tokens to a file as fallback storage.

        Errors are logged but never raised: file storage is best-effort.

        Args:
            token_data: Optional dict with token data. If not provided,
                        will use the current object attributes.
        """
        try:
            # Create the directory if it doesn't exist
            token_dir = Path.home() / ".mcp-atlassian"
            token_dir.mkdir(exist_ok=True)

            # Save the tokens to a file
            token_path = token_dir / f"oauth-{self.client_id}.json"

            if token_data is None:
                token_data = {
                    "refresh_token": self.refresh_token,
                    "access_token": self.access_token,
                    "expires_at": self.expires_at,
                    "cloud_id": self.cloud_id,
                }

            with open(token_path, "w") as f:
                json.dump(token_data, f)

            logger.debug(f"Saved OAuth tokens to file {token_path} (fallback storage)")
        except Exception as e:
            logger.error(f"Failed to save tokens to file: {e}")

    @staticmethod
    def load_tokens(client_id: str) -> dict[str, Any]:
        """Load tokens securely from keyring.

        Keyring is the preferred (secure) store; if it errors out or has no
        entry for this client, file-based fallback storage is consulted.

        Args:
            client_id: The OAuth client ID

        Returns:
            Dict with the token data or empty dict if no tokens found
        """
        username = f"oauth-{client_id}"

        try:
            stored = keyring.get_password(KEYRING_SERVICE_NAME, username)
            if stored:
                logger.debug(f"Loaded OAuth tokens from keyring for {username}")
                return json.loads(stored)
        except Exception as e:
            # Non-fatal: fall through to the file fallback below.
            logger.warning(
                f"Failed to load tokens from keyring: {e}. Trying file fallback."
            )

        # Keyring failed, raised, or returned nothing — try the file store.
        return OAuthConfig._load_tokens_from_file(client_id)

    @staticmethod
    def _load_tokens_from_file(client_id: str) -> dict[str, Any]:
        """Load tokens from a file as fallback.

        Args:
            client_id: The OAuth client ID

        Returns:
            Dict with the token data or empty dict if no tokens found
        """
        token_path = Path.home() / ".mcp-atlassian" / f"oauth-{client_id}.json"

        # Missing file is the normal "no tokens yet" case, not an error.
        if not token_path.exists():
            return {}

        try:
            with open(token_path) as f:
                data = json.load(f)
                logger.debug(
                    f"Loaded OAuth tokens from file {token_path} (fallback storage)"
                )
                return data
        except Exception as e:
            logger.error(f"Failed to load tokens from file: {e}")
            return {}

    @classmethod
    def from_env(cls) -> Optional["OAuthConfig"]:
        """Create an OAuth configuration from environment variables.

        Two modes are supported:
        * full 3LO configuration when client id/secret, redirect URI and
          scope are all present;
        * a minimal configuration (for user-provided tokens) when only
          ATLASSIAN_OAUTH_ENABLE is set to a truthy value.

        Returns:
            OAuthConfig instance or None if OAuth is not enabled
        """
        # Explicit opt-in flag that allows a minimal, token-only setup.
        oauth_enabled = os.getenv("ATLASSIAN_OAUTH_ENABLE", "").lower() in (
            "true",
            "1",
            "yes",
        )

        # Credentials for the traditional (full) OAuth flow.
        client_id = os.getenv("ATLASSIAN_OAUTH_CLIENT_ID")
        client_secret = os.getenv("ATLASSIAN_OAUTH_CLIENT_SECRET")
        redirect_uri = os.getenv("ATLASSIAN_OAUTH_REDIRECT_URI")
        scope = os.getenv("ATLASSIAN_OAUTH_SCOPE")

        if client_id and client_secret and redirect_uri and scope:
            # Traditional mode: full OAuth 2.0 (3LO) credentials supplied.
            config = cls(
                client_id=client_id,
                client_secret=client_secret,
                redirect_uri=redirect_uri,
                scope=scope,
                cloud_id=os.getenv("ATLASSIAN_OAUTH_CLOUD_ID"),
            )

            # Restore any previously persisted tokens for this client.
            saved = cls.load_tokens(client_id)
            if saved:
                config.refresh_token = saved.get("refresh_token")
                config.access_token = saved.get("access_token")
                config.expires_at = saved.get("expires_at")
                if not config.cloud_id and "cloud_id" in saved:
                    config.cloud_id = saved["cloud_id"]

            return config

        if oauth_enabled:
            # Minimal mode: rely entirely on tokens supplied by the user.
            logger.info(
                "Creating minimal OAuth config for user-provided tokens (ATLASSIAN_OAUTH_ENABLE=true)"
            )
            return cls(
                client_id="",  # Will be provided by user tokens
                client_secret="",  # Not needed for user tokens
                redirect_uri="",  # Not needed for user tokens
                scope="",  # Will be determined by user token permissions
                cloud_id=os.getenv("ATLASSIAN_OAUTH_CLOUD_ID"),  # Optional fallback
            )

        # Neither full credentials nor the explicit enable flag: OAuth is off.
        return None


@dataclass
class BYOAccessTokenOAuthConfig:
    """OAuth configuration when providing a pre-existing access token.

    This class is used when the user provides their own Atlassian Cloud ID
    and access token directly, bypassing the full OAuth 2.0 (3LO) flow.
    It's suitable for scenarios like service accounts or CI/CD pipelines
    where an access token is already available.

    This configuration does not support token refreshing.
    """

    cloud_id: str
    access_token: str
    # Kept for interface parity with OAuthConfig; always None — BYO tokens
    # cannot be refreshed.
    refresh_token: None = None
    expires_at: None = None

    @classmethod
    def from_env(cls) -> Optional["BYOAccessTokenOAuthConfig"]:
        """Create a BYOAccessTokenOAuthConfig from environment variables.

        Reads `ATLASSIAN_OAUTH_CLOUD_ID` and `ATLASSIAN_OAUTH_ACCESS_TOKEN`.

        Returns:
            BYOAccessTokenOAuthConfig instance or None if required
            environment variables are missing.
        """
        cloud_id = os.getenv("ATLASSIAN_OAUTH_CLOUD_ID")
        access_token = os.getenv("ATLASSIAN_OAUTH_ACCESS_TOKEN")

        # Both values are required; anything missing/empty disables BYO mode.
        if cloud_id and access_token:
            return cls(cloud_id=cloud_id, access_token=access_token)
        return None


def get_oauth_config_from_env() -> OAuthConfig | BYOAccessTokenOAuthConfig | None:
    """Get the appropriate OAuth configuration from environment variables.

    This function first tries the "Bring Your Own Access Token" configuration
    (BYOAccessTokenOAuthConfig). If that's not available, it falls back to the
    standard OAuth configuration (OAuthConfig).

    Returns:
        An instance of BYOAccessTokenOAuthConfig or OAuthConfig if environment
        variables are set for either, otherwise None.
    """
    return BYOAccessTokenOAuthConfig.from_env() or OAuthConfig.from_env()


def configure_oauth_session(
    session: requests.Session, oauth_config: OAuthConfig | BYOAccessTokenOAuthConfig
) -> bool:
    """Configure a requests session with OAuth 2.0 authentication.

    This function ensures the access token is valid and adds it to the session headers.

    Two paths exist: a token provided without a refresh token is used as-is
    (no refresh possible), otherwise the token is validated/refreshed via
    ``ensure_valid_token`` before being attached to the session.

    Args:
        session: The requests session to configure
        oauth_config: The OAuth configuration to use

    Returns:
        True if the session was successfully configured, False otherwise
    """
    logger.debug(
        f"configure_oauth_session: Received OAuthConfig with "
        f"access_token_present={bool(oauth_config.access_token)}, "
        f"refresh_token_present={bool(oauth_config.refresh_token)}, "
        f"cloud_id='{oauth_config.cloud_id}'"
    )
    # If user provided only an access token (no refresh_token), use it directly
    if oauth_config.access_token and not oauth_config.refresh_token:
        logger.info(
            "configure_oauth_session: Using provided OAuth access token directly (no refresh_token)."
        )
        session.headers["Authorization"] = f"Bearer {oauth_config.access_token}"
        return True
    logger.debug("configure_oauth_session: Proceeding to ensure_valid_token.")
    # Otherwise, ensure we have a valid token (refresh if needed)
    # BYO configs reaching this point must have an empty access token
    # (otherwise the branch above returned); they cannot refresh, so fail.
    if isinstance(oauth_config, BYOAccessTokenOAuthConfig):
        logger.error(
            "configure_oauth_session: oauth access token configuration provided as empty string."
        )
        return False
    if not oauth_config.ensure_valid_token():
        logger.error(
            f"configure_oauth_session: ensure_valid_token returned False. "
            f"Token was expired: {oauth_config.is_token_expired}, "
            f"Refresh token present for attempt: {bool(oauth_config.refresh_token)}"
        )
        return False
    session.headers["Authorization"] = f"Bearer {oauth_config.access_token}"
    logger.info("Successfully configured OAuth session for Atlassian Cloud API")
    return True

```

--------------------------------------------------------------------------------
/tests/unit/jira/test_fields.py:
--------------------------------------------------------------------------------

```python
"""Tests for the Jira Fields mixin."""

from typing import Any
from unittest.mock import MagicMock

import pytest

from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.fields import FieldsMixin


class TestFieldsMixin:
    """Tests for the FieldsMixin class.

    Uses the shared ``jira_fetcher`` fixture (a JiraFetcher with a mocked
    ``jira`` client) and replaces ``get_fields``/``get_field_by_id`` with
    MagicMocks where individual tests need controlled field data.
    """

    @pytest.fixture
    def fields_mixin(self, jira_fetcher: JiraFetcher) -> FieldsMixin:
        """Create a FieldsMixin instance with mocked dependencies."""
        mixin = jira_fetcher
        # Start every test with an empty field cache.
        mixin._field_ids_cache = None
        return mixin

    @pytest.fixture
    def mock_fields(self):
        """Return mock field data."""
        return [
            {"id": "summary", "name": "Summary", "schema": {"type": "string"}},
            {"id": "description", "name": "Description", "schema": {"type": "string"}},
            {"id": "status", "name": "Status", "schema": {"type": "status"}},
            {"id": "assignee", "name": "Assignee", "schema": {"type": "user"}},
            {
                "id": "customfield_10010",
                "name": "Epic Link",
                "schema": {
                    "type": "string",
                    "custom": "com.pyxis.greenhopper.jira:gh-epic-link",
                },
            },
            {
                "id": "customfield_10011",
                "name": "Epic Name",
                "schema": {
                    "type": "string",
                    "custom": "com.pyxis.greenhopper.jira:gh-epic-label",
                },
            },
            {
                "id": "customfield_10012",
                "name": "Story Points",
                "schema": {"type": "number"},
            },
        ]

    def test_get_field_ids_cache(self, fields_mixin: FieldsMixin, mock_fields):
        """Test get_fields uses cache when available."""
        # Set up the cache
        fields_mixin._field_ids_cache = mock_fields

        # Call the method
        result = fields_mixin.get_fields()

        # Verify cache was used
        assert result == mock_fields
        fields_mixin.jira.get_all_fields.assert_not_called()

    def test_get_fields_refresh(self, fields_mixin: FieldsMixin, mock_fields):
        """Test get_fields refreshes data when requested."""
        # Set up the cache
        fields_mixin._field_ids_cache = [{"id": "old_data", "name": "old data"}]

        # Mock the API response
        fields_mixin.jira.get_all_fields.return_value = mock_fields

        # Call the method with refresh=True
        result = fields_mixin.get_fields(refresh=True)

        # Verify API was called
        fields_mixin.jira.get_all_fields.assert_called_once()
        assert result == mock_fields
        # Verify cache was updated
        assert fields_mixin._field_ids_cache == mock_fields

    def test_get_fields_from_api(
        self, fields_mixin: FieldsMixin, mock_fields: list[dict[str, Any]]
    ):
        """Test get_fields fetches from API when no cache exists."""
        # Mock the API response
        fields_mixin.jira.get_all_fields.return_value = mock_fields

        # Call the method
        result = fields_mixin.get_fields()

        # Verify API was called
        fields_mixin.jira.get_all_fields.assert_called_once()
        assert result == mock_fields
        # Verify cache was created
        assert fields_mixin._field_ids_cache == mock_fields

    def test_get_fields_error(self, fields_mixin: FieldsMixin):
        """Test get_fields handles errors gracefully."""

        # Mock API error
        fields_mixin.jira.get_all_fields.side_effect = Exception("API error")

        # Call the method
        result = fields_mixin.get_fields()

        # Verify empty list is returned on error
        assert result == []

    def test_get_field_id_by_exact_match(self, fields_mixin: FieldsMixin, mock_fields):
        """Test get_field_id finds field by exact name match."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Call the method
        result = fields_mixin.get_field_id("Summary")

        # Verify the result
        assert result == "summary"

    def test_get_field_id_case_insensitive(
        self, fields_mixin: FieldsMixin, mock_fields
    ):
        """Test get_field_id is case-insensitive."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Call the method with different case
        result = fields_mixin.get_field_id("summary")

        # Verify the result
        assert result == "summary"

    def test_get_field_id_exact_match_case_insensitive(
        self, fields_mixin: FieldsMixin, mock_fields
    ):
        """Test get_field_id finds field by exact match (case-insensitive) using the map."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)
        # Ensure the map is generated based on the mock fields for this test
        fields_mixin._generate_field_map(force_regenerate=True)

        # Call the method with exact name (case-insensitive)
        result = fields_mixin.get_field_id("epic link")

        # Verify the result (should find Epic Link as first match)
        assert result == "customfield_10010"

    def test_get_field_id_not_found(self, fields_mixin: FieldsMixin, mock_fields):
        """Test get_field_id returns None when field not found."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Call the method with non-existent field
        result = fields_mixin.get_field_id("NonExistent")

        # Verify the result
        assert result is None

    def test_get_field_id_error(self, fields_mixin: FieldsMixin):
        """Test get_field_id handles errors gracefully."""
        # Make get_fields raise an exception
        fields_mixin.get_fields = MagicMock(
            side_effect=Exception("Error getting fields")
        )

        # Call the method
        result = fields_mixin.get_field_id("Summary")

        # Verify None is returned on error
        assert result is None

    def test_get_field_by_id(self, fields_mixin: FieldsMixin, mock_fields):
        """Test get_field_by_id retrieves field definition correctly."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Call the method
        result = fields_mixin.get_field_by_id("customfield_10012")

        # Verify the result
        assert result == mock_fields[6]  # The Story Points field
        assert result["name"] == "Story Points"

    def test_get_field_by_id_not_found(self, fields_mixin: FieldsMixin, mock_fields):
        """Test get_field_by_id returns None when field not found."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Call the method with non-existent ID
        result = fields_mixin.get_field_by_id("customfield_99999")

        # Verify the result
        assert result is None

    def test_get_custom_fields(self, fields_mixin: FieldsMixin, mock_fields):
        """Test get_custom_fields returns only custom fields."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Call the method
        result = fields_mixin.get_custom_fields()

        # Verify the result
        assert len(result) == 3
        assert all(field["id"].startswith("customfield_") for field in result)
        assert result[0]["name"] == "Epic Link"
        assert result[1]["name"] == "Epic Name"
        assert result[2]["name"] == "Story Points"

    def test_get_required_fields(self, fields_mixin: FieldsMixin):
        """Test get_required_fields retrieves required fields correctly."""
        # Mock the response for get_project_issue_types
        mock_issue_types = [
            {"id": "10001", "name": "Bug"},
            {"id": "10002", "name": "Task"},
        ]
        fields_mixin.get_project_issue_types = MagicMock(return_value=mock_issue_types)

        # Mock the response for issue_createmeta_fieldtypes based on API docs
        mock_field_meta = {
            "fields": [
                {
                    "required": True,
                    "schema": {"type": "string", "system": "summary"},
                    "name": "Summary",
                    "fieldId": "summary",
                    "autoCompleteUrl": "",
                    "hasDefaultValue": False,
                    "operations": ["set"],
                    "allowedValues": [],
                },
                {
                    "required": False,
                    "schema": {"type": "string", "system": "description"},
                    "name": "Description",
                    "fieldId": "description",
                },
                {
                    "required": True,
                    "schema": {"type": "string", "custom": "some.custom.type"},
                    "name": "Epic Link",
                    "fieldId": "customfield_10010",
                },
            ]
        }
        fields_mixin.jira.issue_createmeta_fieldtypes.return_value = mock_field_meta

        # Call the method
        result = fields_mixin.get_required_fields("Bug", "TEST")

        # Verify the result: only the two required fields are returned
        assert len(result) == 2
        assert "summary" in result
        assert result["summary"]["required"] is True
        assert "customfield_10010" in result
        assert result["customfield_10010"]["required"] is True
        assert "description" not in result
        # Verify the correct API was called
        fields_mixin.get_project_issue_types.assert_called_once_with("TEST")
        fields_mixin.jira.issue_createmeta_fieldtypes.assert_called_once_with(
            project="TEST", issue_type_id="10001"
        )

    def test_get_required_fields_not_found(self, fields_mixin: FieldsMixin):
        """Test get_required_fields handles project/issue type not found."""
        # Scenario 1: Issue type not found in project
        mock_issue_types = [{"id": "10002", "name": "Task"}]  # "Bug" is missing
        fields_mixin.get_project_issue_types = MagicMock(return_value=mock_issue_types)
        fields_mixin.jira.issue_createmeta_fieldtypes = MagicMock()

        # Call the method
        result = fields_mixin.get_required_fields("Bug", "TEST")
        # Verify issue type lookup was attempted, but field meta was not called
        fields_mixin.get_project_issue_types.assert_called_once_with("TEST")
        fields_mixin.jira.issue_createmeta_fieldtypes.assert_not_called()

        # Verify the result
        assert result == {}

    def test_get_required_fields_error(self, fields_mixin: FieldsMixin):
        """Test get_required_fields handles errors gracefully."""
        # Mock the response for get_project_issue_types
        mock_issue_types = [
            {"id": "10001", "name": "Bug"},
        ]
        fields_mixin.get_project_issue_types = MagicMock(return_value=mock_issue_types)
        # Mock issue_createmeta_fieldtypes to raise an error
        fields_mixin.jira.issue_createmeta_fieldtypes.side_effect = Exception(
            "API error"
        )

        # Call the method
        result = fields_mixin.get_required_fields("Bug", "TEST")

        # Verify the result
        assert result == {}
        # Verify the correct API was called (which then raised the error)
        fields_mixin.jira.issue_createmeta_fieldtypes.assert_called_once_with(
            project="TEST", issue_type_id="10001"
        )

    def test_get_jira_field_ids_cached(self, fields_mixin: FieldsMixin):
        """Test get_field_ids_to_epic returns cached field IDs."""
        # Set up the cache
        fields_mixin._field_ids_cache = [
            {"id": "summary", "name": "Summary"},
            {"id": "description", "name": "Description"},
        ]

        # Call the method
        result = fields_mixin.get_field_ids_to_epic()

        # Verify the result
        assert result == {
            "Summary": "summary",
            "Description": "description",
        }

    def test_get_jira_field_ids_from_fields(
        self, fields_mixin: FieldsMixin, mock_fields: list[dict]
    ):
        """Test get_field_ids_to_epic extracts field IDs from field definitions."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)
        # Ensure field map is generated
        fields_mixin._generate_field_map(force_regenerate=True)

        # Call the method
        result = fields_mixin.get_field_ids_to_epic()

        # Verify that epic-specific fields are properly identified
        # (both snake_case aliases and display names are expected keys)
        assert "epic_link" in result
        assert "Epic Link" in result
        assert result["epic_link"] == "customfield_10010"
        assert "epic_name" in result
        assert "Epic Name" in result
        assert result["epic_name"] == "customfield_10011"

    def test_get_jira_field_ids_error(self, fields_mixin: FieldsMixin):
        """Test get_field_ids_to_epic handles errors gracefully."""
        # Ensure no cache exists
        fields_mixin._field_ids_cache = None

        # Make get_fields raise an exception
        fields_mixin.get_fields = MagicMock(
            side_effect=Exception("Error getting fields")
        )

        # Call the method
        result = fields_mixin.get_field_ids_to_epic()

        # Verify the result
        assert result == {}

    def test_is_custom_field(self, fields_mixin: FieldsMixin):
        """Test is_custom_field correctly identifies custom fields."""
        # Test with custom field
        assert fields_mixin.is_custom_field("customfield_10010") is True

        # Test with standard field
        assert fields_mixin.is_custom_field("summary") is False

    def test_format_field_value_user_field(
        self, fields_mixin: FieldsMixin, mock_fields
    ):
        """Test format_field_value formats user fields correctly."""
        # Set up the mocks
        fields_mixin.get_field_by_id = MagicMock(
            return_value=mock_fields[3]
        )  # The Assignee field
        fields_mixin._get_account_id = MagicMock(return_value="account123")

        # Call the method with a user field and string value
        result = fields_mixin.format_field_value("assignee", "johndoe")

        # Verify the result
        assert result == {"accountId": "account123"}
        fields_mixin._get_account_id.assert_called_once_with("johndoe")

    # FIXME: The test covers impossible case.
    #
    # This test is failing because it assumes that the `_get_account_id`
    # method is unavailable. As default, `format_field_value` will return
    # `{"name": value}` for server/DC.
    #
    # However, in any case `JiraFetcher` always inherits from `UsersMixin`
    # and will therefore have the `_get_account_id` method available.
    #
    # That is to say, the `format_field_value` method will never return in
    # format `{"name": value}`.
    #
    # Further fixes are needed in the `FieldsMixin` class to support the case
    # for server/DC.
    #
    # See also:
    # https://github.com/sooperset/mcp-atlassian/blob/651c271e8aa76b469e9c67535669d93267ad5da6/src/mcp_atlassian/jira/fields.py#L279-L297

    # def test_format_field_value_user_field_no_account_id(
    #     self, fields_mixin: FieldsMixin, mock_fields
    # ):
    #     """Test format_field_value handles user fields without _get_account_id."""
    #     # Set up the mocks
    #     fields_mixin.get_field_by_id = MagicMock(
    #         return_value=mock_fields[3]
    #     )  # The Assignee field

    #     # Call the method with a user field and string value
    #     result = fields_mixin.format_field_value("assignee", "johndoe")

    #     # Verify the result - should use name for server/DC
    #     assert result == {"name": "johndoe"}

    def test_format_field_value_array_field(self, fields_mixin: FieldsMixin):
        """Test format_field_value formats array fields correctly."""
        # Set up the mocks
        mock_array_field = {
            "id": "labels",
            "name": "Labels",
            "schema": {"type": "array"},
        }
        fields_mixin.get_field_by_id = MagicMock(return_value=mock_array_field)

        # Test with single value (should convert to list)
        result = fields_mixin.format_field_value("labels", "bug")
        assert result == ["bug"]

        # Test with list value (should keep as list)
        result = fields_mixin.format_field_value("labels", ["bug", "feature"])
        assert result == ["bug", "feature"]

    def test_format_field_value_option_field(self, fields_mixin: FieldsMixin):
        """Test format_field_value formats option fields correctly."""
        # Set up the mocks
        mock_option_field = {
            "id": "priority",
            "name": "Priority",
            "schema": {"type": "option"},
        }
        fields_mixin.get_field_by_id = MagicMock(return_value=mock_option_field)

        # Test with string value
        result = fields_mixin.format_field_value("priority", "High")
        assert result == {"value": "High"}

        # Test with already formatted value
        already_formatted = {"value": "Medium"}
        result = fields_mixin.format_field_value("priority", already_formatted)
        assert result == already_formatted

    def test_format_field_value_unknown_field(self, fields_mixin: FieldsMixin):
        """Test format_field_value returns value as-is for unknown fields."""
        # Set up the mocks
        fields_mixin.get_field_by_id = MagicMock(return_value=None)

        # Call the method with unknown field
        test_value = "test value"
        result = fields_mixin.format_field_value("unknown", test_value)

        # Verify the value is returned as-is
        assert result == test_value

    def test_search_fields_empty_keyword(self, fields_mixin: FieldsMixin, mock_fields):
        """Test search_fields returns first N fields when keyword is empty."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Call with empty keyword and limit=3
        result = fields_mixin.search_fields("", limit=3)

        # Verify first 3 fields are returned
        assert len(result) == 3
        assert result == mock_fields[:3]

    def test_search_fields_exact_match(self, fields_mixin: FieldsMixin, mock_fields):
        """Test search_fields finds exact matches with high relevance."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Search for "Story Points"
        result = fields_mixin.search_fields("Story Points")

        # Verify Story Points field is first result
        assert len(result) > 0
        assert result[0]["name"] == "Story Points"
        assert result[0]["id"] == "customfield_10012"

    def test_search_fields_partial_match(self, fields_mixin: FieldsMixin, mock_fields):
        """Test search_fields finds partial matches."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Search for "Epic"
        result = fields_mixin.search_fields("Epic")

        # Verify Epic-related fields are in results
        epic_fields = [field["name"] for field in result[:2]]  # Top 2 results
        assert "Epic Link" in epic_fields
        assert "Epic Name" in epic_fields

    def test_search_fields_case_insensitive(
        self, fields_mixin: FieldsMixin, mock_fields
    ):
        """Test search_fields is case insensitive."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Search with different cases
        result_lower = fields_mixin.search_fields("story points")
        result_upper = fields_mixin.search_fields("STORY POINTS")
        result_mixed = fields_mixin.search_fields("Story Points")

        # Verify all searches find the same field
        assert len(result_lower) > 0
        assert len(result_upper) > 0
        assert len(result_mixed) > 0
        assert result_lower[0]["id"] == result_upper[0]["id"] == result_mixed[0]["id"]
        assert result_lower[0]["name"] == "Story Points"

    def test_search_fields_with_limit(self, fields_mixin: FieldsMixin, mock_fields):
        """Test search_fields respects the limit parameter."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Search with limit=2
        result = fields_mixin.search_fields("field", limit=2)

        # Verify only 2 results are returned
        assert len(result) == 2

    def test_search_fields_error(self, fields_mixin: FieldsMixin):
        """Test search_fields handles errors gracefully."""
        # Make get_fields raise an exception
        fields_mixin.get_fields = MagicMock(
            side_effect=Exception("Error getting fields")
        )

        # Call the method
        result = fields_mixin.search_fields("test")

        # Verify empty list is returned on error
        assert result == []

```
Page 5/10FirstPrevNextLast