tokens: 41440/50000 5/194 files (page 8/10)
This is page 8 of 10. Use http://codebase.md/sooperset/mcp-atlassian?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .devcontainer
│   ├── devcontainer.json
│   ├── Dockerfile
│   ├── post-create.sh
│   └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   └── feature_request.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── docker-publish.yml
│       ├── lint.yml
│       ├── publish.yml
│       ├── stale.yml
│       └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│   ├── oauth_authorize.py
│   └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│   └── mcp_atlassian
│       ├── __init__.py
│       ├── confluence
│       │   ├── __init__.py
│       │   ├── client.py
│       │   ├── comments.py
│       │   ├── config.py
│       │   ├── constants.py
│       │   ├── labels.py
│       │   ├── pages.py
│       │   ├── search.py
│       │   ├── spaces.py
│       │   ├── users.py
│       │   ├── utils.py
│       │   └── v2_adapter.py
│       ├── exceptions.py
│       ├── jira
│       │   ├── __init__.py
│       │   ├── attachments.py
│       │   ├── boards.py
│       │   ├── client.py
│       │   ├── comments.py
│       │   ├── config.py
│       │   ├── constants.py
│       │   ├── epics.py
│       │   ├── fields.py
│       │   ├── formatting.py
│       │   ├── issues.py
│       │   ├── links.py
│       │   ├── projects.py
│       │   ├── protocols.py
│       │   ├── search.py
│       │   ├── sprints.py
│       │   ├── transitions.py
│       │   ├── users.py
│       │   └── worklog.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── confluence
│       │   │   ├── __init__.py
│       │   │   ├── comment.py
│       │   │   ├── common.py
│       │   │   ├── label.py
│       │   │   ├── page.py
│       │   │   ├── search.py
│       │   │   ├── space.py
│       │   │   └── user_search.py
│       │   ├── constants.py
│       │   └── jira
│       │       ├── __init__.py
│       │       ├── agile.py
│       │       ├── comment.py
│       │       ├── common.py
│       │       ├── issue.py
│       │       ├── link.py
│       │       ├── project.py
│       │       ├── search.py
│       │       ├── version.py
│       │       ├── workflow.py
│       │       └── worklog.py
│       ├── preprocessing
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── confluence.py
│       │   └── jira.py
│       ├── servers
│       │   ├── __init__.py
│       │   ├── confluence.py
│       │   ├── context.py
│       │   ├── dependencies.py
│       │   ├── jira.py
│       │   └── main.py
│       └── utils
│           ├── __init__.py
│           ├── date.py
│           ├── decorators.py
│           ├── env.py
│           ├── environment.py
│           ├── io.py
│           ├── lifecycle.py
│           ├── logging.py
│           ├── oauth_setup.py
│           ├── oauth.py
│           ├── ssl.py
│           ├── tools.py
│           └── urls.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── fixtures
│   │   ├── __init__.py
│   │   ├── confluence_mocks.py
│   │   └── jira_mocks.py
│   ├── integration
│   │   ├── conftest.py
│   │   ├── README.md
│   │   ├── test_authentication.py
│   │   ├── test_content_processing.py
│   │   ├── test_cross_service.py
│   │   ├── test_mcp_protocol.py
│   │   ├── test_proxy.py
│   │   ├── test_real_api.py
│   │   ├── test_ssl_verification.py
│   │   ├── test_stdin_monitoring_fix.py
│   │   └── test_transport_lifecycle.py
│   ├── README.md
│   ├── test_preprocessing.py
│   ├── test_real_api_validation.py
│   ├── unit
│   │   ├── confluence
│   │   │   ├── __init__.py
│   │   │   ├── conftest.py
│   │   │   ├── test_client_oauth.py
│   │   │   ├── test_client.py
│   │   │   ├── test_comments.py
│   │   │   ├── test_config.py
│   │   │   ├── test_constants.py
│   │   │   ├── test_custom_headers.py
│   │   │   ├── test_labels.py
│   │   │   ├── test_pages.py
│   │   │   ├── test_search.py
│   │   │   ├── test_spaces.py
│   │   │   ├── test_users.py
│   │   │   ├── test_utils.py
│   │   │   └── test_v2_adapter.py
│   │   ├── jira
│   │   │   ├── conftest.py
│   │   │   ├── test_attachments.py
│   │   │   ├── test_boards.py
│   │   │   ├── test_client_oauth.py
│   │   │   ├── test_client.py
│   │   │   ├── test_comments.py
│   │   │   ├── test_config.py
│   │   │   ├── test_constants.py
│   │   │   ├── test_custom_headers.py
│   │   │   ├── test_epics.py
│   │   │   ├── test_fields.py
│   │   │   ├── test_formatting.py
│   │   │   ├── test_issues_markdown.py
│   │   │   ├── test_issues.py
│   │   │   ├── test_links.py
│   │   │   ├── test_projects.py
│   │   │   ├── test_protocols.py
│   │   │   ├── test_search.py
│   │   │   ├── test_sprints.py
│   │   │   ├── test_transitions.py
│   │   │   ├── test_users.py
│   │   │   └── test_worklog.py
│   │   ├── models
│   │   │   ├── __init__.py
│   │   │   ├── conftest.py
│   │   │   ├── test_base_models.py
│   │   │   ├── test_confluence_models.py
│   │   │   ├── test_constants.py
│   │   │   └── test_jira_models.py
│   │   ├── servers
│   │   │   ├── __init__.py
│   │   │   ├── test_confluence_server.py
│   │   │   ├── test_context.py
│   │   │   ├── test_dependencies.py
│   │   │   ├── test_jira_server.py
│   │   │   └── test_main_server.py
│   │   ├── test_exceptions.py
│   │   ├── test_main_transport_selection.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── test_custom_headers.py
│   │       ├── test_date.py
│   │       ├── test_decorators.py
│   │       ├── test_env.py
│   │       ├── test_environment.py
│   │       ├── test_io.py
│   │       ├── test_lifecycle.py
│   │       ├── test_logging.py
│   │       ├── test_masking.py
│   │       ├── test_oauth_setup.py
│   │       ├── test_oauth.py
│   │       ├── test_ssl.py
│   │       ├── test_tools.py
│   │       └── test_urls.py
│   └── utils
│       ├── __init__.py
│       ├── assertions.py
│       ├── base.py
│       ├── factories.py
│       └── mocks.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/tests/unit/utils/test_oauth.py:
--------------------------------------------------------------------------------

```python
"""Tests for the OAuth utilities."""

import json
import time
import urllib.parse
from unittest.mock import MagicMock, patch

import requests

from mcp_atlassian.utils.oauth import (
    KEYRING_SERVICE_NAME,
    TOKEN_EXPIRY_MARGIN,
    BYOAccessTokenOAuthConfig,
    OAuthConfig,
    configure_oauth_session,
    get_oauth_config_from_env,
)


class TestOAuthConfig:
    """Tests for the OAuthConfig class."""

    def test_init_with_required_params(self):
        """Test initialization with required parameters."""
        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
        )
        assert config.client_id == "test-client-id"
        assert config.client_secret == "test-client-secret"
        assert config.redirect_uri == "https://example.com/callback"
        assert config.scope == "read:jira-work write:jira-work"
        assert config.cloud_id is None
        assert config.refresh_token is None
        assert config.access_token is None
        assert config.expires_at is None

    def test_init_with_all_params(self):
        """Test initialization with all parameters."""
        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
            cloud_id="test-cloud-id",
            refresh_token="test-refresh-token",
            access_token="test-access-token",
            expires_at=time.time() + 3600,
        )
        assert config.client_id == "test-client-id"
        assert config.cloud_id == "test-cloud-id"
        assert config.access_token == "test-access-token"
        assert config.refresh_token == "test-refresh-token"
        assert config.expires_at is not None

    def test_is_token_expired_no_token(self):
        """Test is_token_expired when no token is set."""
        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
        )
        assert config.is_token_expired is True

    def test_is_token_expired_token_expired(self):
        """Test is_token_expired when token is expired."""
        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
            access_token="test-access-token",
            expires_at=time.time() - 100,  # Expired 100 seconds ago
        )
        assert config.is_token_expired is True

    def test_is_token_expired_token_expiring_soon(self):
        """Test is_token_expired when token expires soon."""
        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
            access_token="test-access-token",
            expires_at=time.time() + (TOKEN_EXPIRY_MARGIN - 10),  # Expires soon
        )
        assert config.is_token_expired is True

    def test_is_token_expired_token_valid(self):
        """Test is_token_expired when token is valid."""
        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
            access_token="test-access-token",
            expires_at=time.time() + 3600,  # Expires in 1 hour
        )
        assert config.is_token_expired is False

    def test_get_authorization_url(self):
        """Test get_authorization_url method."""
        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
        )
        url = config.get_authorization_url(state="test-state")

        # Parse the URL to check parameters properly
        parsed_url = urllib.parse.urlparse(url)
        query_params = urllib.parse.parse_qs(parsed_url.query)

        assert (
            parsed_url.scheme + "://" + parsed_url.netloc + parsed_url.path
            == "https://auth.atlassian.com/authorize"
        )
        assert query_params["client_id"] == ["test-client-id"]
        assert query_params["scope"] == ["read:jira-work write:jira-work"]
        assert query_params["redirect_uri"] == ["https://example.com/callback"]
        assert query_params["response_type"] == ["code"]
        assert query_params["state"] == ["test-state"]

    @patch("requests.post")
    def test_exchange_code_for_tokens_success(self, mock_post):
        """Test successful exchange_code_for_tokens."""
        # Mock response
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "access_token": "new-access-token",
            "refresh_token": "new-refresh-token",
            "expires_in": 3600,
        }
        mock_post.return_value = mock_response

        # Mock cloud ID retrieval and token saving
        with patch.object(OAuthConfig, "_get_cloud_id") as mock_get_cloud_id:
            with patch.object(OAuthConfig, "_save_tokens") as mock_save_tokens:
                config = OAuthConfig(
                    client_id="test-client-id",
                    client_secret="test-client-secret",
                    redirect_uri="https://example.com/callback",
                    scope="read:jira-work write:jira-work",
                )
                result = config.exchange_code_for_tokens("test-code")

                # Check result
                assert result is True
                assert config.access_token == "new-access-token"
                assert config.refresh_token == "new-refresh-token"
                assert config.expires_at is not None

                # Verify calls
                mock_post.assert_called_once()
                mock_get_cloud_id.assert_called_once()
                mock_save_tokens.assert_called_once()

    @patch("requests.post")
    def test_exchange_code_for_tokens_failure(self, mock_post):
        """Test failed exchange_code_for_tokens."""
        mock_post.side_effect = Exception("API error")

        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
        )
        result = config.exchange_code_for_tokens("test-code")

        # Check result
        assert result is False
        assert config.access_token is None
        assert config.refresh_token is None

    @patch("requests.post")
    def test_refresh_access_token_success(self, mock_post):
        """Test successful refresh_access_token."""
        # Mock response
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "access_token": "new-access-token",
            "refresh_token": "new-refresh-token",
            "expires_in": 3600,
        }
        mock_post.return_value = mock_response

        with patch.object(OAuthConfig, "_save_tokens") as mock_save_tokens:
            config = OAuthConfig(
                client_id="test-client-id",
                client_secret="test-client-secret",
                redirect_uri="https://example.com/callback",
                scope="read:jira-work write:jira-work",
                refresh_token="old-refresh-token",
            )
            result = config.refresh_access_token()

            # Check result
            assert result is True
            assert config.access_token == "new-access-token"
            assert config.refresh_token == "new-refresh-token"
            assert config.expires_at is not None

            # Verify calls
            mock_post.assert_called_once()
            mock_save_tokens.assert_called_once()

    def test_refresh_access_token_no_refresh_token(self):
        """Test refresh_access_token with no refresh token."""
        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
        )
        result = config.refresh_access_token()

        # Check result
        assert result is False

    @patch("requests.post")
    def test_ensure_valid_token_already_valid(self, mock_post):
        """Test ensure_valid_token when token is already valid."""
        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
            access_token="test-access-token",
            expires_at=time.time() + 3600,  # Expires in 1 hour
        )
        result = config.ensure_valid_token()

        # Check result
        assert result is True
        # Should not have tried to refresh the token
        mock_post.assert_not_called()

    @patch.object(OAuthConfig, "refresh_access_token")
    def test_ensure_valid_token_needs_refresh_success(self, mock_refresh):
        """Test ensure_valid_token when token needs refreshing (success case)."""
        mock_refresh.return_value = True

        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
            refresh_token="test-refresh-token",
            access_token="test-access-token",
            expires_at=time.time() - 100,  # Expired 100 seconds ago
        )
        result = config.ensure_valid_token()

        # Check result
        assert result is True
        mock_refresh.assert_called_once()

    @patch.object(OAuthConfig, "refresh_access_token")
    def test_ensure_valid_token_needs_refresh_failure(self, mock_refresh):
        """Test ensure_valid_token when token needs refreshing (failure case)."""
        mock_refresh.return_value = False

        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
            refresh_token="test-refresh-token",
            access_token="test-access-token",
            expires_at=time.time() - 100,  # Expired 100 seconds ago
        )
        result = config.ensure_valid_token()

        # Check result
        assert result is False
        mock_refresh.assert_called_once()

    @patch("requests.get")
    def test_get_cloud_id_success(self, mock_get):
        """Test _get_cloud_id success case."""
        # Mock response
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = [{"id": "test-cloud-id", "name": "Test Site"}]
        mock_get.return_value = mock_response

        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
            access_token="test-access-token",
        )
        config._get_cloud_id()

        # Check result
        assert config.cloud_id == "test-cloud-id"
        mock_get.assert_called_once()
        headers = mock_get.call_args[1]["headers"]
        assert headers["Authorization"] == "Bearer test-access-token"

    @patch("requests.get")
    def test_get_cloud_id_no_access_token(self, mock_get):
        """Test _get_cloud_id with no access token."""
        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
        )
        config._get_cloud_id()

        # Should not make API call without token
        mock_get.assert_not_called()
        assert config.cloud_id is None

    def test_get_keyring_username(self):
        """Test _get_keyring_username method."""
        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
        )
        username = config._get_keyring_username()

        # Check the keyring username format
        assert username == "oauth-test-client-id"

    @patch("keyring.set_password")
    @patch.object(OAuthConfig, "_save_tokens_to_file")
    def test_save_tokens_keyring_success(self, mock_save_to_file, mock_set_password):
        """Test _save_tokens with successful keyring storage."""
        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
            cloud_id="test-cloud-id",
            refresh_token="test-refresh-token",
            access_token="test-access-token",
            expires_at=1234567890,
        )
        config._save_tokens()

        # Verify keyring was used
        mock_set_password.assert_called_once()
        service_name = mock_set_password.call_args[0][0]
        username = mock_set_password.call_args[0][1]
        token_json = mock_set_password.call_args[0][2]

        assert service_name == KEYRING_SERVICE_NAME
        assert username == "oauth-test-client-id"
        assert "test-refresh-token" in token_json
        assert "test-access-token" in token_json

        # Verify file backup was created
        mock_save_to_file.assert_called_once()

    @patch("keyring.set_password")
    @patch.object(OAuthConfig, "_save_tokens_to_file")
    def test_save_tokens_keyring_failure(self, mock_save_to_file, mock_set_password):
        """Test _save_tokens with keyring failure fallback."""
        # Make keyring fail
        mock_set_password.side_effect = Exception("Keyring error")

        config = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
            cloud_id="test-cloud-id",
            refresh_token="test-refresh-token",
            access_token="test-access-token",
            expires_at=1234567890,
        )
        config._save_tokens()

        # Verify keyring was attempted
        mock_set_password.assert_called_once()

        # Verify fallback to file was used
        mock_save_to_file.assert_called_once()

    @patch("pathlib.Path.mkdir")
    @patch("json.dump")
    def test_save_tokens_to_file(self, mock_dump, mock_mkdir):
        """Test _save_tokens_to_file method."""
        # Mock open
        mock_open = MagicMock()
        with patch("builtins.open", mock_open):
            config = OAuthConfig(
                client_id="test-client-id",
                client_secret="test-client-secret",
                redirect_uri="https://example.com/callback",
                scope="read:jira-work write:jira-work",
                cloud_id="test-cloud-id",
                refresh_token="test-refresh-token",
                access_token="test-access-token",
                expires_at=1234567890,
            )
            config._save_tokens_to_file()

            # Should create directory and save tokens
            mock_mkdir.assert_called_once()
            mock_open.assert_called_once()
            mock_dump.assert_called_once()

            # Check saved data
            saved_data = mock_dump.call_args[0][0]
            assert saved_data["refresh_token"] == "test-refresh-token"
            assert saved_data["access_token"] == "test-access-token"
            assert saved_data["expires_at"] == 1234567890
            assert saved_data["cloud_id"] == "test-cloud-id"

    @patch("keyring.get_password")
    @patch.object(OAuthConfig, "_load_tokens_from_file")
    def test_load_tokens_keyring_success(self, mock_load_from_file, mock_get_password):
        """Test load_tokens with successful keyring retrieval."""
        # Setup keyring to return token data
        token_data = {
            "refresh_token": "keyring-refresh-token",
            "access_token": "keyring-access-token",
            "expires_at": 1234567890,
            "cloud_id": "keyring-cloud-id",
        }
        mock_get_password.return_value = json.dumps(token_data)

        result = OAuthConfig.load_tokens("test-client-id")

        # Should have used keyring
        mock_get_password.assert_called_once_with(
            KEYRING_SERVICE_NAME, "oauth-test-client-id"
        )

        # Should not fall back to file
        mock_load_from_file.assert_not_called()

        # Check result contains keyring data
        assert result["refresh_token"] == "keyring-refresh-token"
        assert result["access_token"] == "keyring-access-token"
        assert result["expires_at"] == 1234567890
        assert result["cloud_id"] == "keyring-cloud-id"

    @patch("keyring.get_password")
    @patch.object(OAuthConfig, "_load_tokens_from_file")
    def test_load_tokens_keyring_failure(self, mock_load_from_file, mock_get_password):
        """Test load_tokens with keyring failure fallback."""
        # Make keyring fail
        mock_get_password.side_effect = Exception("Keyring error")

        # Setup file fallback to return token data
        file_token_data = {
            "refresh_token": "file-refresh-token",
            "access_token": "file-access-token",
            "expires_at": 9876543210,
            "cloud_id": "file-cloud-id",
        }
        mock_load_from_file.return_value = file_token_data

        result = OAuthConfig.load_tokens("test-client-id")

        # Should have tried keyring
        mock_get_password.assert_called_once()

        # Should have fallen back to file
        mock_load_from_file.assert_called_once_with("test-client-id")

        # Check result contains file data
        assert result["refresh_token"] == "file-refresh-token"
        assert result["access_token"] == "file-access-token"
        assert result["expires_at"] == 9876543210
        assert result["cloud_id"] == "file-cloud-id"

    @patch("keyring.get_password")
    @patch.object(OAuthConfig, "_load_tokens_from_file")
    def test_load_tokens_keyring_empty(self, mock_load_from_file, mock_get_password):
        """Test load_tokens with empty keyring result."""
        # Setup keyring to return None (no saved token)
        mock_get_password.return_value = None

        # Setup file fallback to return token data
        file_token_data = {
            "refresh_token": "file-refresh-token",
            "access_token": "file-access-token",
            "expires_at": 9876543210,
        }
        mock_load_from_file.return_value = file_token_data

        result = OAuthConfig.load_tokens("test-client-id")

        # Should have tried keyring
        mock_get_password.assert_called_once()

        # Should have fallen back to file
        mock_load_from_file.assert_called_once_with("test-client-id")

        # Check result contains file data
        assert result["refresh_token"] == "file-refresh-token"
        assert result["access_token"] == "file-access-token"
        assert result["expires_at"] == 9876543210

    @patch("pathlib.Path.exists")
    @patch("json.load")
    def test_load_tokens_from_file_success(self, mock_load, mock_exists):
        """Test _load_tokens_from_file success case."""
        mock_exists.return_value = True
        mock_load.return_value = {
            "refresh_token": "test-refresh-token",
            "access_token": "test-access-token",
            "expires_at": 1234567890,
            "cloud_id": "test-cloud-id",
        }

        # Mock open
        mock_open = MagicMock()
        with patch("builtins.open", mock_open):
            result = OAuthConfig._load_tokens_from_file("test-client-id")

            # Check result
            assert result["refresh_token"] == "test-refresh-token"
            assert result["access_token"] == "test-access-token"
            assert result["expires_at"] == 1234567890
            assert result["cloud_id"] == "test-cloud-id"

    @patch("pathlib.Path.exists")
    def test_load_tokens_from_file_not_found(self, mock_exists):
        """Test _load_tokens_from_file when file doesn't exist."""
        mock_exists.return_value = False

        result = OAuthConfig._load_tokens_from_file("test-client-id")

        # Should return empty dict
        assert result == {}

    @patch("os.getenv")
    def test_from_env_success(self, mock_getenv):
        """Test from_env success case."""
        # Mock environment variables
        mock_getenv.side_effect = lambda key, default=None: {
            "ATLASSIAN_OAUTH_CLIENT_ID": "env-client-id",
            "ATLASSIAN_OAUTH_CLIENT_SECRET": "env-client-secret",
            "ATLASSIAN_OAUTH_REDIRECT_URI": "https://example.com/callback",
            "ATLASSIAN_OAUTH_SCOPE": "read:jira-work",
            "ATLASSIAN_OAUTH_CLOUD_ID": "env-cloud-id",
        }.get(key, default)

        # Mock token loading
        with patch.object(
            OAuthConfig,
            "load_tokens",
            return_value={
                "refresh_token": "loaded-refresh-token",
                "access_token": "loaded-access-token",
                "expires_at": 1234567890,
            },
        ):
            config = OAuthConfig.from_env()

            # Check result
            assert config is not None
            assert config.client_id == "env-client-id"
            assert config.client_secret == "env-client-secret"
            assert config.redirect_uri == "https://example.com/callback"
            assert config.scope == "read:jira-work"
            assert config.cloud_id == "env-cloud-id"
            assert config.refresh_token == "loaded-refresh-token"
            assert config.access_token == "loaded-access-token"
            assert config.expires_at == 1234567890

    @patch("os.getenv")
    def test_from_env_missing_required(self, mock_getenv):
        """Test from_env with missing required variables."""
        # Mock environment variables - missing some required ones
        mock_getenv.side_effect = lambda key, default=None: {
            "ATLASSIAN_OAUTH_CLIENT_ID": "env-client-id",
            # Missing client secret
            "ATLASSIAN_OAUTH_REDIRECT_URI": "https://example.com/callback",
            # Missing scope
        }.get(key, default)

        config = OAuthConfig.from_env()

        # Should return None if required variables are missing
        assert config is None

    @patch("os.getenv")
    def test_from_env_minimal_oauth_enabled(self, mock_getenv):
        """Test from_env with minimal OAuth configuration (ATLASSIAN_OAUTH_ENABLE=true)."""
        # Mock environment variables - only ATLASSIAN_OAUTH_ENABLE is set
        mock_getenv.side_effect = lambda key, default=None: {
            "ATLASSIAN_OAUTH_ENABLE": "true",
            "ATLASSIAN_OAUTH_CLOUD_ID": "cloud-id",  # Optional fallback
        }.get(key, default)

        config = OAuthConfig.from_env()

        # Should return minimal config when OAuth is enabled
        assert config is not None
        assert config.client_id == ""
        assert config.client_secret == ""
        assert config.redirect_uri == ""
        assert config.scope == ""
        assert config.cloud_id == "cloud-id"

    @patch("os.getenv")
    def test_from_env_minimal_oauth_disabled(self, mock_getenv):
        """Test from_env with minimal OAuth configuration disabled."""
        # Mock environment variables - ATLASSIAN_OAUTH_ENABLE is false
        mock_getenv.side_effect = lambda key, default=None: {
            "ATLASSIAN_OAUTH_ENABLE": "false",
        }.get(key, default)

        config = OAuthConfig.from_env()

        # Should return None when OAuth is disabled
        assert config is None

    @patch("os.getenv")
    def test_from_env_full_oauth_takes_precedence(self, mock_getenv):
        """Test that full OAuth configuration takes precedence over minimal config."""
        # Mock environment variables - both full OAuth and ATLASSIAN_OAUTH_ENABLE
        mock_getenv.side_effect = lambda key, default=None: {
            "ATLASSIAN_OAUTH_ENABLE": "true",
            "ATLASSIAN_OAUTH_CLIENT_ID": "full-client-id",
            "ATLASSIAN_OAUTH_CLIENT_SECRET": "full-client-secret",
            "ATLASSIAN_OAUTH_REDIRECT_URI": "https://example.com/callback",
            "ATLASSIAN_OAUTH_SCOPE": "read:jira-work",
            "ATLASSIAN_OAUTH_CLOUD_ID": "full-cloud-id",
        }.get(key, default)

        # Mock token loading
        with patch.object(OAuthConfig, "load_tokens", return_value={}):
            config = OAuthConfig.from_env()

            # Should return full config, not minimal
            assert config is not None
            assert config.client_id == "full-client-id"
            assert config.client_secret == "full-client-secret"
            assert config.redirect_uri == "https://example.com/callback"
            assert config.scope == "read:jira-work"
            assert config.cloud_id == "full-cloud-id"


class TestBYOAccessTokenOAuthConfig:
    """Tests for the BYOAccessTokenOAuthConfig class."""

    def test_init_with_required_params(self):
        """Test initialization with required parameters."""
        config = BYOAccessTokenOAuthConfig(
            cloud_id="byo-cloud-id", access_token="byo-access-token"
        )
        assert config.cloud_id == "byo-cloud-id"
        assert config.access_token == "byo-access-token"
        assert config.refresh_token is None
        assert config.expires_at is None

    @patch("os.getenv")
    def test_from_env_success(self, mock_getenv):
        """Test from_env success for BYOAccessTokenOAuthConfig."""
        mock_getenv.side_effect = lambda key, default=None: {
            "ATLASSIAN_OAUTH_CLOUD_ID": "env-byo-cloud-id",
            "ATLASSIAN_OAUTH_ACCESS_TOKEN": "env-byo-access-token",
        }.get(key, default)

        config = BYOAccessTokenOAuthConfig.from_env()

        assert config is not None
        assert config.cloud_id == "env-byo-cloud-id"
        assert config.access_token == "env-byo-access-token"
        mock_getenv.assert_any_call("ATLASSIAN_OAUTH_CLOUD_ID")
        mock_getenv.assert_any_call("ATLASSIAN_OAUTH_ACCESS_TOKEN")

    @patch("os.getenv")
    def test_from_env_missing_cloud_id(self, mock_getenv):
        """Test from_env with missing cloud_id for BYOAccessTokenOAuthConfig."""
        mock_getenv.side_effect = lambda key, default=None: {
            "ATLASSIAN_OAUTH_ACCESS_TOKEN": "env-byo-access-token",
        }.get(key, default)

        config = BYOAccessTokenOAuthConfig.from_env()
        assert config is None

    @patch("os.getenv")
    def test_from_env_missing_access_token(self, mock_getenv):
        """Test from_env with missing access_token for BYOAccessTokenOAuthConfig."""
        mock_getenv.side_effect = lambda key, default=None: {
            "ATLASSIAN_OAUTH_CLOUD_ID": "env-byo-cloud-id",
        }.get(key, default)

        config = BYOAccessTokenOAuthConfig.from_env()
        assert config is None

    @patch("os.getenv")
    def test_from_env_missing_both(self, mock_getenv):
        """Test from_env with both missing for BYOAccessTokenOAuthConfig."""
        mock_getenv.return_value = None  # Covers all calls returning None
        config = BYOAccessTokenOAuthConfig.from_env()
        assert config is None


@patch("mcp_atlassian.utils.oauth.BYOAccessTokenOAuthConfig.from_env")
@patch("mcp_atlassian.utils.oauth.OAuthConfig.from_env")
def test_get_oauth_config_prefers_byo_when_both_present(
    mock_oauth_from_env, mock_byo_from_env
):
    """Test get_oauth_config_from_env prefers BYOAccessTokenOAuthConfig when both are configured."""
    mock_byo_config = MagicMock(spec=BYOAccessTokenOAuthConfig)
    mock_byo_from_env.return_value = mock_byo_config
    mock_oauth_config = MagicMock(spec=OAuthConfig)
    mock_oauth_from_env.return_value = mock_oauth_config  # This shouldn't be returned

    result = get_oauth_config_from_env()
    assert result == mock_byo_config
    mock_byo_from_env.assert_called_once()
    mock_oauth_from_env.assert_not_called()  # Standard OAuth should not be called if BYO is found


@patch("mcp_atlassian.utils.oauth.BYOAccessTokenOAuthConfig.from_env")
@patch("mcp_atlassian.utils.oauth.OAuthConfig.from_env")
def test_get_oauth_config_falls_back_to_standard_oauth_config(
    mock_oauth_from_env, mock_byo_from_env
):
    """Test get_oauth_config_from_env falls back to OAuthConfig if BYO is not configured."""
    mock_byo_from_env.return_value = None  # BYO not configured
    mock_oauth_config = MagicMock(spec=OAuthConfig)
    mock_oauth_from_env.return_value = mock_oauth_config

    result = get_oauth_config_from_env()
    assert result == mock_oauth_config  # Should be standard OAuth
    mock_byo_from_env.assert_called_once()
    mock_oauth_from_env.assert_called_once()


@patch("mcp_atlassian.utils.oauth.BYOAccessTokenOAuthConfig.from_env")
@patch("mcp_atlassian.utils.oauth.OAuthConfig.from_env")
def test_get_oauth_config_returns_none_if_both_unavailable(
    mock_oauth_from_env, mock_byo_from_env
):
    """Test get_oauth_config_from_env returns None if neither is available."""
    mock_oauth_from_env.return_value = None
    mock_byo_from_env.return_value = None

    result = get_oauth_config_from_env()
    assert result is None
    mock_oauth_from_env.assert_called_once()
    mock_byo_from_env.assert_called_once()


def test_configure_oauth_session_success_with_oauth_config():
    """Test successful configure_oauth_session with OAuthConfig."""
    session = requests.Session()
    # Explicitly use OAuthConfig and mock its specific methods/attributes
    oauth_config = MagicMock(spec=OAuthConfig)
    oauth_config.access_token = "test-access-token"
    oauth_config.refresh_token = "test-refresh-token"  # Crucial for this path
    oauth_config.ensure_valid_token.return_value = True

    result = configure_oauth_session(session, oauth_config)

    assert result is True
    assert session.headers["Authorization"] == "Bearer test-access-token"
    oauth_config.ensure_valid_token.assert_called_once()


def test_configure_oauth_session_failure_with_oauth_config():
    """Test failed configure_oauth_session with OAuthConfig (token refresh fails)."""
    session = requests.Session()
    oauth_config = MagicMock(spec=OAuthConfig)
    oauth_config.access_token = None  # Start with no access token initially
    oauth_config.refresh_token = "test-refresh-token"  # Has a refresh token
    oauth_config.ensure_valid_token.return_value = False  # Refresh fails

    result = configure_oauth_session(session, oauth_config)

    assert result is False
    assert "Authorization" not in session.headers
    oauth_config.ensure_valid_token.assert_called_once()


def test_configure_oauth_session_success_with_byo_config():
    """Test successful configure_oauth_session with BYOAccessTokenOAuthConfig."""
    session = requests.Session()
    byo_config = BYOAccessTokenOAuthConfig(
        cloud_id="byo-cloud-id", access_token="byo-valid-token"
    )
    # Using the real BYOAccessTokenOAuthConfig (not a MagicMock) guarantees that
    # ensure_valid_token cannot be called here, since the class does not define it.

    result = configure_oauth_session(session, byo_config)

    assert result is True
    assert session.headers["Authorization"] == "Bearer byo-valid-token"


@patch("mcp_atlassian.utils.oauth.logger")
def test_configure_oauth_session_byo_config_empty_token_logs_error(mock_logger):
    """Test configure_oauth_session with BYO config and empty token logs error."""
    session = requests.Session()
    # BYO config with an effectively invalid (empty) access token
    byo_config = BYOAccessTokenOAuthConfig(cloud_id="byo-cloud-id", access_token="")

    result = configure_oauth_session(session, byo_config)

    assert result is False
    assert "Authorization" not in session.headers
    mock_logger.error.assert_called_once_with(
        "configure_oauth_session: oauth access token configuration provided as empty string."
    )


@patch("mcp_atlassian.utils.oauth.logger")
def test_configure_oauth_session_byo_config_no_refresh_token_direct_use(mock_logger):
    """Test BYO config (with access_token, no refresh_token) uses token directly."""
    session = requests.Session()
    oauth_config = BYOAccessTokenOAuthConfig(
        cloud_id="test_cloud_id", access_token="my_access_token"
    )

    # We don't need to mock ensure_valid_token because it shouldn't be called.
    # The actual BYOAccessTokenOAuthConfig instance does not have this method.

    result = configure_oauth_session(session, oauth_config)

    assert result is True
    assert session.headers["Authorization"] == "Bearer my_access_token"
    # Check that the specific log message for direct use is present
    mock_logger.info.assert_any_call(
        "configure_oauth_session: Using provided OAuth access token directly (no refresh_token)."
    )

```
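
The `_save_tokens`/`load_tokens` tests above exercise a keyring-first, file-fallback pattern for persisting OAuth tokens. The following is a minimal, self-contained sketch of that pattern for orientation only; the `SERVICE_NAME`, `save_tokens`/`load_tokens` names and the `~/.example-oauth-store` path are illustrative assumptions, not the module's actual API (see `src/mcp_atlassian/utils/oauth.py` for the real implementation).

```python
# Hedged sketch of a keyring-first, file-fallback token store.
# Names and paths are illustrative assumptions, not mcp-atlassian's API.
import json
from pathlib import Path

import keyring

SERVICE_NAME = "example-oauth-store"  # assumed service name


def save_tokens(client_id: str, tokens: dict) -> None:
    """Try keyring first; keep a JSON file copy as a fallback."""
    payload = json.dumps(tokens)
    try:
        keyring.set_password(SERVICE_NAME, f"oauth-{client_id}", payload)
    except Exception:
        pass  # keyring unavailable or failed; the file below still persists tokens
    path = Path.home() / ".example-oauth-store" / f"oauth-{client_id}.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(payload)


def load_tokens(client_id: str) -> dict:
    """Return tokens from keyring if present, else from the fallback file."""
    try:
        stored = keyring.get_password(SERVICE_NAME, f"oauth-{client_id}")
        if stored:
            return json.loads(stored)
    except Exception:
        pass  # keyring error; fall through to the file
    path = Path.home() / ".example-oauth-store" / f"oauth-{client_id}.json"
    if path.exists():
        return json.loads(path.read_text())
    return {}
```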

--------------------------------------------------------------------------------
/tests/integration/test_mcp_protocol.py:
--------------------------------------------------------------------------------

```python
"""Comprehensive MCP protocol integration tests for AtlassianMCP server."""

import json
import logging
import os
from unittest.mock import MagicMock, patch

import pytest
from fastmcp import Context
from fastmcp.tools import Tool as FastMCPTool
from mcp.types import Tool as MCPTool
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.responses import JSONResponse
from starlette.testclient import TestClient

from mcp_atlassian.confluence import ConfluenceFetcher
from mcp_atlassian.confluence.config import ConfluenceConfig
from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.config import JiraConfig
from mcp_atlassian.servers.context import MainAppContext
from mcp_atlassian.servers.main import (
    AtlassianMCP,
    UserTokenMiddleware,
    health_check,
    main_lifespan,
)
from tests.utils.factories import (
    ConfluencePageFactory,
    JiraIssueFactory,
)
from tests.utils.mocks import MockEnvironment, MockFastMCP

logger = logging.getLogger(__name__)


@pytest.mark.integration
@pytest.mark.anyio
class TestMCPProtocolIntegration:
    """Test suite for MCP protocol integration with AtlassianMCP server."""

    @pytest.fixture
    async def mock_jira_config(self):
        """Create a mock Jira configuration."""
        config = MagicMock(spec=JiraConfig)
        config.is_auth_configured.return_value = True
        config.url = "https://test.atlassian.net"
        config.auth_type = "oauth"
        return config

    @pytest.fixture
    async def mock_confluence_config(self):
        """Create a mock Confluence configuration."""
        config = MagicMock(spec=ConfluenceConfig)
        config.is_auth_configured.return_value = True
        config.url = "https://test.atlassian.net/wiki"
        config.auth_type = "oauth"
        return config

    @pytest.fixture
    async def mock_jira_fetcher(self):
        """Create a mock Jira fetcher."""
        fetcher = MagicMock(spec=JiraFetcher)
        fetcher.get_issue.return_value = JiraIssueFactory.create()
        fetcher.search_issues.return_value = {
            "issues": [
                JiraIssueFactory.create("TEST-1"),
                JiraIssueFactory.create("TEST-2"),
            ],
            "total": 2,
        }
        return fetcher

    @pytest.fixture
    async def mock_confluence_fetcher(self):
        """Create a mock Confluence fetcher."""
        fetcher = MagicMock(spec=ConfluenceFetcher)
        fetcher.get_page.return_value = ConfluencePageFactory.create()
        fetcher.search_pages.return_value = {
            "results": [
                ConfluencePageFactory.create("123"),
                ConfluencePageFactory.create("456"),
            ],
            "size": 2,
        }
        return fetcher

    @pytest.fixture
    async def atlassian_mcp_server(self):
        """Create an AtlassianMCP server instance for testing."""
        server = AtlassianMCP(name="Test Atlassian MCP", lifespan=main_lifespan)
        # Sub-servers are already mounted in the real server; no mounting is needed here
        return server

    async def test_tool_discovery_with_full_configuration(
        self, atlassian_mcp_server, mock_jira_config, mock_confluence_config
    ):
        """Test tool discovery when both Jira and Confluence are fully configured."""
        with MockEnvironment.basic_auth_env():
            # Mock the configuration loading
            with (
                patch(
                    "mcp_atlassian.jira.config.JiraConfig.from_env",
                    return_value=mock_jira_config,
                ),
                patch(
                    "mcp_atlassian.confluence.config.ConfluenceConfig.from_env",
                    return_value=mock_confluence_config,
                ),
            ):
                # Create app context
                app_context = MainAppContext(
                    full_jira_config=mock_jira_config,
                    full_confluence_config=mock_confluence_config,
                    read_only=False,
                    enabled_tools=None,
                )

                # Mock request context
                request_context = MagicMock()
                request_context.lifespan_context = {"app_lifespan_context": app_context}

                # Set up server context
                atlassian_mcp_server._mcp_server = MagicMock()
                atlassian_mcp_server._mcp_server.request_context = request_context

                # Mock get_tools to return sample tools
                async def mock_get_tools():
                    tools = {}
                    # Add sample Jira tools
                    for tool_name in [
                        "jira_get_issue",
                        "jira_create_issue",
                        "jira_search_issues",
                    ]:
                        tool = MagicMock(spec=FastMCPTool)
                        tool.tags = (
                            {"jira", "read"}
                            if "get" in tool_name or "search" in tool_name
                            else {"jira", "write"}
                        )
                        tool.to_mcp_tool.return_value = MCPTool(
                            name=tool_name,
                            description=f"Tool {tool_name}",
                            inputSchema={"type": "object", "properties": {}},
                        )
                        tools[tool_name] = tool

                    # Add sample Confluence tools
                    for tool_name in ["confluence_get_page", "confluence_create_page"]:
                        tool = MagicMock(spec=FastMCPTool)
                        tool.tags = (
                            {"confluence", "read"}
                            if "get" in tool_name
                            else {"confluence", "write"}
                        )
                        tool.to_mcp_tool.return_value = MCPTool(
                            name=tool_name,
                            description=f"Tool {tool_name}",
                            inputSchema={"type": "object", "properties": {}},
                        )
                        tools[tool_name] = tool

                    return tools

                atlassian_mcp_server.get_tools = mock_get_tools

                # Get filtered tools
                tools = await atlassian_mcp_server._mcp_list_tools()

                # Assert all tools are available
                tool_names = [tool.name for tool in tools]
                assert "jira_get_issue" in tool_names
                assert "jira_create_issue" in tool_names
                assert "jira_search_issues" in tool_names
                assert "confluence_get_page" in tool_names
                assert "confluence_create_page" in tool_names
                assert len(tools) == 5

    async def test_tool_filtering_read_only_mode(
        self, atlassian_mcp_server, mock_jira_config, mock_confluence_config
    ):
        """Test tool filtering when read-only mode is enabled."""
        with MockEnvironment.basic_auth_env():
            # Create app context with read-only mode
            app_context = MainAppContext(
                full_jira_config=mock_jira_config,
                full_confluence_config=mock_confluence_config,
                read_only=True,  # Enable read-only mode
                enabled_tools=None,
            )

            # Mock request context
            request_context = MagicMock()
            request_context.lifespan_context = {"app_lifespan_context": app_context}

            # Set up server context
            atlassian_mcp_server._mcp_server = MagicMock()
            atlassian_mcp_server._mcp_server.request_context = request_context

            # Mock get_tools
            async def mock_get_tools():
                tools = {}
                # Add mix of read and write tools
                read_tools = [
                    "jira_get_issue",
                    "jira_search_issues",
                    "confluence_get_page",
                ]
                write_tools = [
                    "jira_create_issue",
                    "jira_update_issue",
                    "confluence_create_page",
                ]

                for tool_name in read_tools:
                    tool = MagicMock(spec=FastMCPTool)
                    tool.tags = (
                        {"jira", "read"}
                        if "jira" in tool_name
                        else {"confluence", "read"}
                    )
                    tool.to_mcp_tool.return_value = MCPTool(
                        name=tool_name,
                        description=f"Tool {tool_name}",
                        inputSchema={"type": "object", "properties": {}},
                    )
                    tools[tool_name] = tool

                for tool_name in write_tools:
                    tool = MagicMock(spec=FastMCPTool)
                    tool.tags = (
                        {"jira", "write"}
                        if "jira" in tool_name
                        else {"confluence", "write"}
                    )
                    tool.to_mcp_tool.return_value = MCPTool(
                        name=tool_name,
                        description=f"Tool {tool_name}",
                        inputSchema={"type": "object", "properties": {}},
                    )
                    tools[tool_name] = tool

                return tools

            atlassian_mcp_server.get_tools = mock_get_tools

            # Get filtered tools
            tools = await atlassian_mcp_server._mcp_list_tools()

            # Assert only read tools are available
            tool_names = [tool.name for tool in tools]
            assert "jira_get_issue" in tool_names
            assert "jira_search_issues" in tool_names
            assert "confluence_get_page" in tool_names
            assert "jira_create_issue" not in tool_names
            assert "jira_update_issue" not in tool_names
            assert "confluence_create_page" not in tool_names
            assert len(tools) == 3

    async def test_tool_filtering_with_enabled_tools(
        self, atlassian_mcp_server, mock_jira_config, mock_confluence_config
    ):
        """Test tool filtering with specific enabled tools list."""
        with MockEnvironment.basic_auth_env():
            # Create app context with specific enabled tools
            enabled_tools = ["jira_get_issue", "jira_search_issues"]
            app_context = MainAppContext(
                full_jira_config=mock_jira_config,
                full_confluence_config=mock_confluence_config,
                read_only=False,
                enabled_tools=enabled_tools,
            )

            # Mock request context
            request_context = MagicMock()
            request_context.lifespan_context = {"app_lifespan_context": app_context}

            # Set up server context
            atlassian_mcp_server._mcp_server = MagicMock()
            atlassian_mcp_server._mcp_server.request_context = request_context

            # Mock get_tools
            async def mock_get_tools():
                tools = {}
                all_tools = [
                    "jira_get_issue",
                    "jira_create_issue",
                    "jira_search_issues",
                    "confluence_get_page",
                    "confluence_create_page",
                ]

                for tool_name in all_tools:
                    tool = MagicMock(spec=FastMCPTool)
                    if "jira" in tool_name:
                        tool.tags = (
                            {"jira", "read"}
                            if "get" in tool_name or "search" in tool_name
                            else {"jira", "write"}
                        )
                    else:
                        tool.tags = (
                            {"confluence", "read"}
                            if "get" in tool_name
                            else {"confluence", "write"}
                        )
                    tool.to_mcp_tool.return_value = MCPTool(
                        name=tool_name,
                        description=f"Tool {tool_name}",
                        inputSchema={"type": "object", "properties": {}},
                    )
                    tools[tool_name] = tool

                return tools

            atlassian_mcp_server.get_tools = mock_get_tools

            # Get filtered tools
            tools = await atlassian_mcp_server._mcp_list_tools()

            # Assert only enabled tools are available
            tool_names = [tool.name for tool in tools]
            assert "jira_get_issue" in tool_names
            assert "jira_search_issues" in tool_names
            assert "jira_create_issue" not in tool_names
            assert "confluence_get_page" not in tool_names
            assert "confluence_create_page" not in tool_names
            assert len(tools) == 2

    async def test_tool_filtering_service_not_configured(self, atlassian_mcp_server):
        """Test tool filtering when services are not configured."""
        with MockEnvironment.clean_env():
            # Create app context with no configurations
            app_context = MainAppContext(
                full_jira_config=None,  # Jira not configured
                full_confluence_config=None,  # Confluence not configured
                read_only=False,
                enabled_tools=None,
            )

            # Mock request context
            request_context = MagicMock()
            request_context.lifespan_context = {"app_lifespan_context": app_context}

            # Set up server context
            atlassian_mcp_server._mcp_server = MagicMock()
            atlassian_mcp_server._mcp_server.request_context = request_context

            # Mock get_tools
            async def mock_get_tools():
                tools = {}
                all_tools = [
                    "jira_get_issue",
                    "jira_create_issue",
                    "confluence_get_page",
                    "confluence_create_page",
                ]

                for tool_name in all_tools:
                    tool = MagicMock(spec=FastMCPTool)
                    if "jira" in tool_name:
                        tool.tags = (
                            {"jira", "read"}
                            if "get" in tool_name
                            else {"jira", "write"}
                        )
                    else:
                        tool.tags = (
                            {"confluence", "read"}
                            if "get" in tool_name
                            else {"confluence", "write"}
                        )
                    tool.to_mcp_tool.return_value = MCPTool(
                        name=tool_name,
                        description=f"Tool {tool_name}",
                        inputSchema={"type": "object", "properties": {}},
                    )
                    tools[tool_name] = tool

                return tools

            atlassian_mcp_server.get_tools = mock_get_tools

            # Get filtered tools
            tools = await atlassian_mcp_server._mcp_list_tools()

            # Assert no tools are available when services not configured
            assert len(tools) == 0

    async def test_middleware_oauth_token_processing(self):
        """Test UserTokenMiddleware OAuth token extraction and processing."""
        # Create middleware instance
        app = MagicMock()
        mcp_server = MagicMock()
        # Mock the settings attribute with proper structure
        settings_mock = MagicMock()
        settings_mock.streamable_http_path = "/mcp"
        mcp_server.settings = settings_mock
        middleware = UserTokenMiddleware(app, mcp_server_ref=mcp_server)

        # Create mock request with OAuth Bearer token
        request = MockFastMCP.create_request()
        request.url.path = "/mcp"
        request.method = "POST"
        request.headers = {"Authorization": "Bearer test-oauth-token-12345"}

        # Mock call_next
        async def mock_call_next(req):
            # Verify request state was set
            assert hasattr(req.state, "user_atlassian_token")
            assert req.state.user_atlassian_token == "test-oauth-token-12345"
            assert req.state.user_atlassian_auth_type == "oauth"
            assert req.state.user_atlassian_email is None
            return JSONResponse({"status": "ok"})

        # Process request
        response = await middleware.dispatch(request, mock_call_next)

        # Verify response
        assert response.status_code == 200

    async def test_middleware_pat_token_processing(self):
        """Test UserTokenMiddleware PAT token extraction and processing."""
        # Create middleware instance
        app = MagicMock()
        mcp_server = MagicMock()
        # Mock the settings attribute with proper structure
        settings_mock = MagicMock()
        settings_mock.streamable_http_path = "/mcp"
        mcp_server.settings = settings_mock
        middleware = UserTokenMiddleware(app, mcp_server_ref=mcp_server)

        # Create mock request with PAT token
        request = MockFastMCP.create_request()
        request.url.path = "/mcp"
        request.method = "POST"
        request.headers = {"Authorization": "Token test-pat-token-67890"}

        # Mock call_next
        async def mock_call_next(req):
            # Verify request state was set
            assert hasattr(req.state, "user_atlassian_token")
            assert req.state.user_atlassian_token == "test-pat-token-67890"
            assert req.state.user_atlassian_auth_type == "pat"
            assert req.state.user_atlassian_email is None
            return JSONResponse({"status": "ok"})

        # Process request
        response = await middleware.dispatch(request, mock_call_next)

        # Verify response
        assert response.status_code == 200

    async def test_middleware_invalid_auth_header(self):
        """Test UserTokenMiddleware with invalid authorization header."""
        # Create middleware instance
        app = MagicMock()
        mcp_server = MagicMock()
        # Mock the settings attribute with proper structure
        settings_mock = MagicMock()
        settings_mock.streamable_http_path = "/mcp"
        mcp_server.settings = settings_mock
        middleware = UserTokenMiddleware(app, mcp_server_ref=mcp_server)

        # Create mock request with invalid auth header
        request = MockFastMCP.create_request()
        request.url.path = "/mcp"
        request.method = "POST"
        request.headers = {
            "Authorization": "Basic dXNlcjpwYXNz"
        }  # Basic auth not supported

        # Mock call_next (shouldn't be called)
        async def mock_call_next(req):
            pytest.fail("call_next should not be called for invalid auth")

        # Process request
        response = await middleware.dispatch(request, mock_call_next)

        # Verify error response
        assert response.status_code == 401
        body = json.loads(response.body)
        assert "error" in body
        assert (
            "Only 'Bearer <OAuthToken>' or 'Token <PAT>' types are supported"
            in body["error"]
        )

    async def test_middleware_empty_token(self):
        """Test UserTokenMiddleware with empty token."""
        # Create middleware instance
        app = MagicMock()
        mcp_server = MagicMock()
        # Mock the settings attribute with proper structure
        settings_mock = MagicMock()
        settings_mock.streamable_http_path = "/mcp"
        mcp_server.settings = settings_mock
        middleware = UserTokenMiddleware(app, mcp_server_ref=mcp_server)

        # Create mock request with empty Bearer token
        request = MockFastMCP.create_request()
        request.url.path = "/mcp"
        request.method = "POST"
        request.headers = {"Authorization": "Bearer "}

        # Mock call_next (shouldn't be called)
        async def mock_call_next(req):
            pytest.fail("call_next should not be called for empty token")

        # Process request
        response = await middleware.dispatch(request, mock_call_next)

        # Verify error response
        assert response.status_code == 401
        body = json.loads(response.body)
        assert "error" in body
        assert "Empty Bearer token" in body["error"]

    async def test_middleware_non_mcp_path(self):
        """Test UserTokenMiddleware bypasses non-MCP paths."""
        # Create middleware instance
        app = MagicMock()
        mcp_server = MagicMock()
        # Mock the settings attribute with proper structure
        settings_mock = MagicMock()
        settings_mock.streamable_http_path = "/mcp"
        mcp_server.settings = settings_mock
        middleware = UserTokenMiddleware(app, mcp_server_ref=mcp_server)

        # Create mock request for different path
        request = MockFastMCP.create_request()
        request.url.path = "/healthz"
        request.method = "GET"
        request.headers = {}

        # Ensure state doesn't have user_atlassian_token initially
        if hasattr(request.state, "user_atlassian_token"):
            delattr(request.state, "user_atlassian_token")

        # Mock call_next
        async def mock_call_next(req):
            # Should be called without modification
            # Check that user_atlassian_token was not added
            token_added = False
            try:
                _ = req.state.user_atlassian_token
                token_added = True
            except AttributeError:
                token_added = False
            assert not token_added, (
                "user_atlassian_token should not be set for non-MCP paths"
            )
            return JSONResponse({"status": "ok"})

        # Process request
        response = await middleware.dispatch(request, mock_call_next)

        # Verify response
        assert response.status_code == 200

    async def test_concurrent_tool_execution(
        self, atlassian_mcp_server, mock_jira_config, mock_confluence_config
    ):
        """Test concurrent execution of multiple tools."""
        with MockEnvironment.basic_auth_env():
            # Create app context
            app_context = MainAppContext(
                full_jira_config=mock_jira_config,
                full_confluence_config=mock_confluence_config,
                read_only=False,
                enabled_tools=None,
            )

            # Track execution order
            execution_order = []

            # Mock tool implementations
            import anyio

            async def mock_jira_get_issue(ctx: Context, issue_key: str):
                execution_order.append(f"jira_get_issue_{issue_key}_start")
                await anyio.sleep(0.1)  # Simulate API call
                execution_order.append(f"jira_get_issue_{issue_key}_end")
                return json.dumps({"key": issue_key, "summary": f"Issue {issue_key}"})

            async def mock_confluence_get_page(ctx: Context, page_id: str):
                execution_order.append(f"confluence_get_page_{page_id}_start")
                await anyio.sleep(0.05)  # Simulate API call (faster)
                execution_order.append(f"confluence_get_page_{page_id}_end")
                return json.dumps({"id": page_id, "title": f"Page {page_id}"})

            # Mock request context
            request_context = MagicMock()
            request_context.lifespan_context = {"app_lifespan_context": app_context}

            # Create context for tool execution
            mock_fastmcp = MagicMock()
            mock_fastmcp.request_context = request_context
            ctx = Context(fastmcp=mock_fastmcp)

            # Execute tools concurrently using anyio for backend compatibility
            async def run_all_tools():
                async with anyio.create_task_group() as tg:
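                    # Results are collected as (index, result) pairs so ordering
                    # stays deterministic even though tasks finish out of order.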
                    result_futures = []

                    async def run_and_store(coro, index):
                        result = await coro
                        result_futures.append((index, result))

                    tg.start_soon(run_and_store, mock_jira_get_issue(ctx, "TEST-1"), 0)
                    tg.start_soon(run_and_store, mock_jira_get_issue(ctx, "TEST-2"), 1)
                    tg.start_soon(
                        run_and_store, mock_confluence_get_page(ctx, "123"), 2
                    )
                    tg.start_soon(
                        run_and_store, mock_confluence_get_page(ctx, "456"), 3
                    )

                # Sort results by original index
                result_futures.sort(key=lambda x: x[0])
                return [r[1] for r in result_futures]

            results = await run_all_tools()

            # Verify results
            assert len(results) == 4
            assert json.loads(results[0])["key"] == "TEST-1"
            assert json.loads(results[1])["key"] == "TEST-2"
            assert json.loads(results[2])["id"] == "123"
            assert json.loads(results[3])["id"] == "456"

            # Verify concurrent execution (Confluence tasks should complete before Jira)
            assert execution_order.index(
                "confluence_get_page_123_end"
            ) < execution_order.index("jira_get_issue_TEST-1_end")
            assert execution_order.index(
                "confluence_get_page_456_end"
            ) < execution_order.index("jira_get_issue_TEST-2_end")

    async def test_error_propagation_through_middleware(self):
        """Test error propagation through the middleware chain."""
        # Create middleware instance
        app = MagicMock()
        mcp_server = MagicMock()
        # Mock the settings attribute with proper structure
        settings_mock = MagicMock()
        settings_mock.streamable_http_path = "/mcp"
        mcp_server.settings = settings_mock
        middleware = UserTokenMiddleware(app, mcp_server_ref=mcp_server)

        # Create mock request
        request = MockFastMCP.create_request()
        request.url.path = "/mcp"
        request.method = "POST"
        request.headers = {"Authorization": "Bearer valid-token"}

        # Mock call_next to raise an exception
        async def mock_call_next(req):
            raise ValueError("Test error from downstream")

        # Process request
        with pytest.raises(ValueError) as exc_info:
            await middleware.dispatch(request, mock_call_next)

        # Verify error propagated
        assert str(exc_info.value) == "Test error from downstream"

    async def test_lifespan_context_initialization(self):
        """Test lifespan context initialization with various configurations."""
        # Test with full configuration
        with MockEnvironment.basic_auth_env():
            with (
                patch(
                    "mcp_atlassian.jira.config.JiraConfig.from_env"
                ) as mock_jira_config,
                patch(
                    "mcp_atlassian.confluence.config.ConfluenceConfig.from_env"
                ) as mock_conf_config,
            ):
                # Configure mocks
                jira_config = MagicMock()
                jira_config.is_auth_configured.return_value = True
                mock_jira_config.return_value = jira_config

                conf_config = MagicMock()
                conf_config.is_auth_configured.return_value = True
                mock_conf_config.return_value = conf_config

                # Run lifespan
                app = MagicMock()
                async with main_lifespan(app) as context:
                    app_context = context["app_lifespan_context"]
                    assert app_context.full_jira_config == jira_config
                    assert app_context.full_confluence_config == conf_config
                    assert app_context.read_only is False
                    assert app_context.enabled_tools is None

    async def test_lifespan_with_partial_configuration(self):
        """Test lifespan with only Jira configured."""
        env_vars = {
            "JIRA_URL": "https://test.atlassian.net",
            "JIRA_USERNAME": "[email protected]",
            "JIRA_API_TOKEN": "test-token",
        }

        with patch.dict(os.environ, env_vars, clear=False):
            with (
                patch(
                    "mcp_atlassian.jira.config.JiraConfig.from_env"
                ) as mock_jira_config,
                patch(
                    "mcp_atlassian.confluence.config.ConfluenceConfig.from_env"
                ) as mock_conf_config,
            ):
                # Configure mocks
                jira_config = MagicMock()
                jira_config.is_auth_configured.return_value = True
                mock_jira_config.return_value = jira_config

                # Confluence not configured
                mock_conf_config.side_effect = Exception("No Confluence config")

                # Run lifespan
                app = MagicMock()
                async with main_lifespan(app) as context:
                    app_context = context["app_lifespan_context"]
                    assert app_context.full_jira_config == jira_config
                    assert app_context.full_confluence_config is None

    async def test_lifespan_with_read_only_mode(self):
        """Test lifespan with read-only mode enabled."""
        with MockEnvironment.basic_auth_env():
            with patch.dict(os.environ, {"READ_ONLY_MODE": "true"}):
                with patch(
                    "mcp_atlassian.jira.config.JiraConfig.from_env"
                ) as mock_jira_config:
                    # Configure mock
                    jira_config = MagicMock()
                    jira_config.is_auth_configured.return_value = True
                    mock_jira_config.return_value = jira_config

                    # Run lifespan
                    app = MagicMock()
                    async with main_lifespan(app) as context:
                        app_context = context["app_lifespan_context"]
                        assert app_context.read_only is True

    async def test_lifespan_with_enabled_tools(self):
        """Test lifespan with specific enabled tools."""
        with MockEnvironment.basic_auth_env():
            with patch.dict(
                os.environ,
                {
                    "ENABLED_TOOLS": "jira_get_issue,jira_search_issues,confluence_get_page"
                },
            ):
                with patch(
                    "mcp_atlassian.jira.config.JiraConfig.from_env"
                ) as mock_jira_config:
                    # Configure mock
                    jira_config = MagicMock()
                    jira_config.is_auth_configured.return_value = True
                    mock_jira_config.return_value = jira_config

                    # Run lifespan
                    app = MagicMock()
                    async with main_lifespan(app) as context:
                        app_context = context["app_lifespan_context"]
                        assert app_context.enabled_tools == [
                            "jira_get_issue",
                            "jira_search_issues",
                            "confluence_get_page",
                        ]

    async def test_health_check_endpoint(self, atlassian_mcp_server):
        """Test the health check endpoint."""
        # Mock the http_app method to return a test app
        test_app = Starlette()
        test_app.add_route("/healthz", health_check, methods=["GET"])

        # Mock the method
        atlassian_mcp_server.http_app = MagicMock(return_value=test_app)

        # Create test client
        app = atlassian_mcp_server.http_app()

        # Use TestClient for synchronous testing of the Starlette app
        with TestClient(app) as client:
            response = client.get("/healthz")

        assert response.status_code == 200
        assert response.json() == {"status": "ok"}

    async def test_combined_filtering_scenarios(
        self, atlassian_mcp_server, mock_jira_config
    ):
        """Test combined filtering: read-only mode + enabled tools + service availability."""
        with MockEnvironment.basic_auth_env():
            # Create app context with multiple constraints
            app_context = MainAppContext(
                full_jira_config=mock_jira_config,
                full_confluence_config=None,  # Confluence not configured
                read_only=True,  # Read-only mode
                enabled_tools=[
                    "jira_get_issue",
                    "jira_create_issue",
                    "confluence_get_page",
                ],  # Mix of tools
            )

            # Mock request context
            request_context = MagicMock()
            request_context.lifespan_context = {"app_lifespan_context": app_context}

            # Set up server context
            atlassian_mcp_server._mcp_server = MagicMock()
            atlassian_mcp_server._mcp_server.request_context = request_context

            # Mock get_tools
            async def mock_get_tools():
                tools = {}
                tool_configs = [
                    ("jira_get_issue", {"jira", "read"}),  # Should be included
                    ("jira_create_issue", {"jira", "write"}),  # Excluded by read-only
                    (
                        "jira_search_issues",
                        {"jira", "read"},
                    ),  # Excluded by enabled_tools
                    (
                        "confluence_get_page",
                        {"confluence", "read"},
                    ),  # Excluded by service not configured
                ]

                for tool_name, tags in tool_configs:
                    tool = MagicMock(spec=FastMCPTool)
                    tool.tags = tags
                    tool.to_mcp_tool.return_value = MCPTool(
                        name=tool_name,
                        description=f"Tool {tool_name}",
                        inputSchema={"type": "object", "properties": {}},
                    )
                    tools[tool_name] = tool

                return tools

            atlassian_mcp_server.get_tools = mock_get_tools

            # Get filtered tools
            tools = await atlassian_mcp_server._mcp_list_tools()

            # Only jira_get_issue should pass all filters
            tool_names = [tool.name for tool in tools]
            assert tool_names == ["jira_get_issue"]

    async def test_request_context_missing(self, atlassian_mcp_server):
        """Test handling when request context is missing."""
        # Set up server without request context
        atlassian_mcp_server._mcp_server = MagicMock()
        atlassian_mcp_server._mcp_server.request_context = None

        # Mock get_tools (shouldn't be called)
        async def mock_get_tools():
            pytest.fail("get_tools should not be called when context is missing")

        atlassian_mcp_server.get_tools = mock_get_tools

        # Get filtered tools
        tools = await atlassian_mcp_server._mcp_list_tools()

        # Should return empty list
        assert tools == []

    async def test_http_app_middleware_integration(self, atlassian_mcp_server):
        """Test HTTP app creation with custom middleware."""
        # Create a mock app with middleware
        mock_app = MagicMock(spec=Starlette)
        mock_app.middleware = [
            Middleware(UserTokenMiddleware, mcp_server_ref=atlassian_mcp_server)
        ]

        # Mock the http_app method
        atlassian_mcp_server.http_app = MagicMock(return_value=mock_app)

        # Create HTTP app with custom middleware
        custom_middleware = []
        app = atlassian_mcp_server.http_app(
            path="/custom", middleware=custom_middleware, transport="sse"
        )

        # Verify app is created
        assert app is not None
        # UserTokenMiddleware should be added automatically
        assert any("UserTokenMiddleware" in str(m) for m in app.middleware)

    async def test_tool_execution_with_authentication_error(self, atlassian_mcp_server):
        """Test tool execution when authentication fails."""
        from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError

        # Mock a tool that raises authentication error
        async def mock_failing_tool(ctx: Context):
            raise MCPAtlassianAuthenticationError("Invalid credentials")

        # Create context
        mock_fastmcp = MagicMock()
        ctx = Context(fastmcp=mock_fastmcp)

        # Execute tool and verify error handling
        with pytest.raises(MCPAtlassianAuthenticationError):
            await mock_failing_tool(ctx)

```

--------------------------------------------------------------------------------
/tests/unit/servers/test_jira_server.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for the Jira FastMCP server implementation."""

import json
import logging
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from fastmcp import Client, FastMCP
from fastmcp.client import FastMCPTransport
from fastmcp.exceptions import ToolError
from starlette.requests import Request

from src.mcp_atlassian.jira import JiraFetcher
from src.mcp_atlassian.jira.config import JiraConfig
from src.mcp_atlassian.servers.context import MainAppContext
from src.mcp_atlassian.servers.main import AtlassianMCP
from src.mcp_atlassian.utils.oauth import OAuthConfig
from tests.fixtures.jira_mocks import (
    MOCK_JIRA_COMMENTS_SIMPLIFIED,
    MOCK_JIRA_ISSUE_RESPONSE_SIMPLIFIED,
    MOCK_JIRA_JQL_RESPONSE_SIMPLIFIED,
)

logger = logging.getLogger(__name__)


@pytest.fixture
def mock_jira_fetcher():
    """Create a mock JiraFetcher using predefined responses from fixtures."""
    mock_fetcher = MagicMock(spec=JiraFetcher)
    mock_fetcher.config = MagicMock()
    mock_fetcher.config.read_only = False
    mock_fetcher.config.url = "https://test.atlassian.net"
    mock_fetcher.config.projects_filter = None  # Explicitly set to None by default

    # Configure common methods
    mock_fetcher.get_current_user_account_id.return_value = "test-account-id"
    mock_fetcher.jira = MagicMock()

    # Configure get_issue to return fixture data
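    # The mock echoes the call arguments back into the payload so tests can
    # assert exactly how the tool forwarded its parameters.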
    def mock_get_issue(
        issue_key,
        fields=None,
        expand=None,
        comment_limit=10,
        properties=None,
        update_history=True,
    ):
        if not issue_key:
            raise ValueError("Issue key is required")
        mock_issue = MagicMock()
        response_data = MOCK_JIRA_ISSUE_RESPONSE_SIMPLIFIED.copy()
        response_data["key"] = issue_key
        response_data["fields_queried"] = fields
        response_data["expand_param"] = expand
        response_data["comment_limit"] = comment_limit
        response_data["properties_param"] = properties
        response_data["update_history"] = update_history
        response_data["id"] = MOCK_JIRA_ISSUE_RESPONSE_SIMPLIFIED["id"]
        response_data["summary"] = MOCK_JIRA_ISSUE_RESPONSE_SIMPLIFIED["fields"][
            "summary"
        ]
        response_data["status"] = {
            "name": MOCK_JIRA_ISSUE_RESPONSE_SIMPLIFIED["fields"]["status"]["name"]
        }
        mock_issue.to_simplified_dict.return_value = response_data
        return mock_issue

    mock_fetcher.get_issue.side_effect = mock_get_issue

    # Configure get_issue_comments to return fixture data
    def mock_get_issue_comments(issue_key, limit=10):
        return MOCK_JIRA_COMMENTS_SIMPLIFIED["comments"][:limit]

    mock_fetcher.get_issue_comments.side_effect = mock_get_issue_comments

    # Configure search_issues to return fixture data
    def mock_search_issues(jql, **kwargs):
        mock_search_result = MagicMock()
        issues = []
        for issue_data in MOCK_JIRA_JQL_RESPONSE_SIMPLIFIED["issues"]:
            mock_issue = MagicMock()
            mock_issue.to_simplified_dict.return_value = issue_data
            issues.append(mock_issue)
        mock_search_result.issues = issues
        mock_search_result.total = len(issues)
        mock_search_result.start_at = kwargs.get("start", 0)
        mock_search_result.max_results = kwargs.get("limit", 50)
        mock_search_result.to_simplified_dict.return_value = {
            "total": len(issues),
            "start_at": kwargs.get("start", 0),
            "max_results": kwargs.get("limit", 50),
            "issues": [issue.to_simplified_dict() for issue in issues],
        }
        return mock_search_result

    mock_fetcher.search_issues.side_effect = mock_search_issues

    # Configure create_issue
    def mock_create_issue(
        project_key,
        summary,
        issue_type,
        description=None,
        assignee=None,
        components=None,
        **additional_fields,
    ):
        if not project_key or project_key.strip() == "":
            raise ValueError("valid project is required")
        components_list = None
        if components:
            if isinstance(components, str):
                components_list = components.split(",")
            elif isinstance(components, list):
                components_list = components
        mock_issue = MagicMock()
        response_data = {
            "key": f"{project_key}-456",
            "summary": summary,
            "description": description,
            "issue_type": {"name": issue_type},
            "status": {"name": "Open"},
            "components": [{"name": comp} for comp in components_list]
            if components_list
            else [],
            **additional_fields,
        }
        mock_issue.to_simplified_dict.return_value = response_data
        return mock_issue

    mock_fetcher.create_issue.side_effect = mock_create_issue

    # Configure batch_create_issues
    def mock_batch_create_issues(issues, validate_only=False):
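        # Accept either a pre-parsed list or a JSON array string, mirroring the
        # tool's flexible input handling.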
        if not isinstance(issues, list):
            try:
                parsed_issues = json.loads(issues)
                if not isinstance(parsed_issues, list):
                    raise ValueError(
                        "Issues must be a list or a valid JSON array string."
                    )
                issues = parsed_issues
            except (json.JSONDecodeError, TypeError):
                raise ValueError("Issues must be a list or a valid JSON array string.")
        mock_issues = []
        for idx, issue_data in enumerate(issues, 1):
            mock_issue = MagicMock()
            mock_issue.to_simplified_dict.return_value = {
                "key": f"{issue_data['project_key']}-{idx}",
                "summary": issue_data["summary"],
                "issue_type": {"name": issue_data["issue_type"]},
                "status": {"name": "To Do"},
            }
            mock_issues.append(mock_issue)
        return mock_issues

    mock_fetcher.batch_create_issues.side_effect = mock_batch_create_issues

    # Configure get_epic_issues
    def mock_get_epic_issues(epic_key, start=0, limit=50):
        mock_issues = []
        for i in range(1, 4):
            mock_issue = MagicMock()
            mock_issue.to_simplified_dict.return_value = {
                "key": f"TEST-{i}",
                "summary": f"Epic Issue {i}",
                "issue_type": {"name": "Task" if i % 2 == 0 else "Bug"},
                "status": {"name": "To Do" if i % 2 == 0 else "In Progress"},
            }
            mock_issues.append(mock_issue)
        return mock_issues[start : start + limit]

    mock_fetcher.get_epic_issues.side_effect = mock_get_epic_issues

    # Configure get_all_projects
    def mock_get_all_projects(include_archived=False):
        projects = [
            {
                "id": "10000",
                "key": "TEST",
                "name": "Test Project",
                "description": "Project for testing",
                "lead": {"name": "admin", "displayName": "Administrator"},
                "projectTypeKey": "software",
                "archived": False,
            }
        ]
        if include_archived:
            projects.append(
                {
                    "id": "10001",
                    "key": "ARCHIVED",
                    "name": "Archived Project",
                    "description": "Archived project",
                    "lead": {"name": "admin", "displayName": "Administrator"},
                    "projectTypeKey": "software",
                    "archived": True,
                }
            )
        return projects

    # Set default side_effect to respect include_archived parameter
    mock_fetcher.get_all_projects.side_effect = mock_get_all_projects

    mock_fetcher.jira.jql.return_value = {
        "issues": [
            {
                "fields": {
                    "project": {
                        "key": "TEST",
                        "name": "Test Project",
                        "description": "Project for testing",
                    }
                }
            }
        ]
    }

    from src.mcp_atlassian.models.jira.common import JiraUser

    mock_user = MagicMock(spec=JiraUser)
    mock_user.to_simplified_dict.return_value = {
        "display_name": "Test User ([email protected])",
        "name": "Test User ([email protected])",
        "email": "[email protected]",
        "avatar_url": "https://test.atlassian.net/avatar/[email protected]",
    }
    mock_get_user_profile = MagicMock()

    def side_effect_func(identifier):
        if identifier == "[email protected]":
            raise ValueError(f"User '{identifier}' not found.")
        return mock_user

    mock_get_user_profile.side_effect = side_effect_func
    mock_fetcher.get_user_profile_by_identifier = mock_get_user_profile
    return mock_fetcher


@pytest.fixture
def mock_base_jira_config():
    """Create a mock base JiraConfig for MainAppContext using OAuth for multi-user scenario."""
    mock_oauth_config = OAuthConfig(
        client_id="server_client_id",
        client_secret="server_client_secret",
        redirect_uri="http://localhost",
        scope="read:jira-work",
        cloud_id="mock_jira_cloud_id",
    )
    return JiraConfig(
        url="https://mock-jira.atlassian.net",
        auth_type="oauth",
        oauth_config=mock_oauth_config,
    )


@pytest.fixture
def test_jira_mcp(mock_jira_fetcher, mock_base_jira_config):
    """Create a test FastMCP instance with standard configuration."""

    @asynccontextmanager
    async def test_lifespan(app: FastMCP) -> AsyncGenerator[MainAppContext, None]:
        try:
            yield MainAppContext(
                full_jira_config=mock_base_jira_config, read_only=False
            )
        finally:
            pass

    test_mcp = AtlassianMCP(
        "TestJira", description="Test Jira MCP Server", lifespan=test_lifespan
    )
    from src.mcp_atlassian.servers.jira import (
        add_comment,
        add_worklog,
        batch_create_issues,
        batch_create_versions,
        batch_get_changelogs,
        create_issue,
        create_issue_link,
        delete_issue,
        download_attachments,
        get_agile_boards,
        get_all_projects,
        get_board_issues,
        get_issue,
        get_link_types,
        get_project_issues,
        get_project_versions,
        get_sprint_issues,
        get_sprints_from_board,
        get_transitions,
        get_user_profile,
        get_worklog,
        link_to_epic,
        remove_issue_link,
        search,
        search_fields,
        transition_issue,
        update_issue,
        update_sprint,
    )

    jira_sub_mcp = FastMCP(name="TestJiraSubMCP")
    jira_sub_mcp.tool()(get_issue)
    jira_sub_mcp.tool()(search)
    jira_sub_mcp.tool()(search_fields)
    jira_sub_mcp.tool()(get_project_issues)
    jira_sub_mcp.tool()(get_project_versions)
    jira_sub_mcp.tool()(get_all_projects)
    jira_sub_mcp.tool()(get_transitions)
    jira_sub_mcp.tool()(get_worklog)
    jira_sub_mcp.tool()(download_attachments)
    jira_sub_mcp.tool()(get_agile_boards)
    jira_sub_mcp.tool()(get_board_issues)
    jira_sub_mcp.tool()(get_sprints_from_board)
    jira_sub_mcp.tool()(get_sprint_issues)
    jira_sub_mcp.tool()(get_link_types)
    jira_sub_mcp.tool()(get_user_profile)
    jira_sub_mcp.tool()(create_issue)
    jira_sub_mcp.tool()(batch_create_issues)
    jira_sub_mcp.tool()(batch_get_changelogs)
    jira_sub_mcp.tool()(update_issue)
    jira_sub_mcp.tool()(delete_issue)
    jira_sub_mcp.tool()(add_comment)
    jira_sub_mcp.tool()(add_worklog)
    jira_sub_mcp.tool()(link_to_epic)
    jira_sub_mcp.tool()(create_issue_link)
    jira_sub_mcp.tool()(remove_issue_link)
    jira_sub_mcp.tool()(transition_issue)
    jira_sub_mcp.tool()(update_sprint)
    jira_sub_mcp.tool()(batch_create_versions)
    test_mcp.mount("jira", jira_sub_mcp)
    return test_mcp


@pytest.fixture
def no_fetcher_test_jira_mcp(mock_base_jira_config):
    """Create a test FastMCP instance that simulates missing Jira fetcher."""

    @asynccontextmanager
    async def no_fetcher_test_lifespan(
        app: FastMCP,
    ) -> AsyncGenerator[MainAppContext, None]:
        try:
            yield MainAppContext(full_jira_config=None, read_only=False)
        finally:
            pass

    test_mcp = AtlassianMCP(
        "NoFetcherTestJira",
        description="No Fetcher Test Jira MCP Server",
        lifespan=no_fetcher_test_lifespan,
    )
    from src.mcp_atlassian.servers.jira import get_issue

    jira_sub_mcp = FastMCP(name="NoFetcherTestJiraSubMCP")
    jira_sub_mcp.tool()(get_issue)
    test_mcp.mount("jira", jira_sub_mcp)
    return test_mcp


@pytest.fixture
def mock_request():
    """Provides a mock Starlette Request object with a state."""
    request = MagicMock(spec=Request)
    request.state = MagicMock()
    request.state.jira_fetcher = None
    request.state.user_atlassian_auth_type = None
    request.state.user_atlassian_token = None
    request.state.user_atlassian_email = None
    return request


@pytest.fixture
async def jira_client(test_jira_mcp, mock_jira_fetcher, mock_request):
    """Create a FastMCP client with mocked Jira fetcher and request state."""
    with (
        patch(
            "src.mcp_atlassian.servers.jira.get_jira_fetcher",
            AsyncMock(return_value=mock_jira_fetcher),
        ),
        patch(
            "src.mcp_atlassian.servers.dependencies.get_http_request",
            return_value=mock_request,
        ),
    ):
        async with Client(transport=FastMCPTransport(test_jira_mcp)) as client_instance:
            yield client_instance


@pytest.fixture
async def no_fetcher_client_fixture(no_fetcher_test_jira_mcp, mock_request):
    """Create a client that simulates missing Jira fetcher configuration."""
    async with Client(
        transport=FastMCPTransport(no_fetcher_test_jira_mcp)
    ) as client_for_no_fetcher:
        yield client_for_no_fetcher


@pytest.mark.anyio
async def test_get_issue(jira_client, mock_jira_fetcher):
    """Test the get_issue tool with fixture data."""
    response = await jira_client.call_tool(
        "jira_get_issue",
        {
            "issue_key": "TEST-123",
            "fields": "summary,description,status",
        },
    )
    assert isinstance(response, list)
    assert len(response) > 0
    text_content = response[0]
    assert text_content.type == "text"
    content = json.loads(text_content.text)
    assert content["key"] == "TEST-123"
    assert content["summary"] == "Test Issue Summary"
    mock_jira_fetcher.get_issue.assert_called_once_with(
        issue_key="TEST-123",
        fields=["summary", "description", "status"],
        expand=None,
        comment_limit=10,
        properties=None,
        update_history=True,
    )


@pytest.mark.anyio
async def test_search(jira_client, mock_jira_fetcher):
    """Test the search tool with fixture data."""
    response = await jira_client.call_tool(
        "jira_search",
        {
            "jql": "project = TEST",
            "fields": "summary,status",
            "limit": 10,
            "start_at": 0,
        },
    )
    assert isinstance(response, list)
    assert len(response) > 0
    text_content = response[0]
    assert text_content.type == "text"
    content = json.loads(text_content.text)
    assert isinstance(content, dict)
    assert "issues" in content
    assert isinstance(content["issues"], list)
    assert len(content["issues"]) >= 1
    assert content["issues"][0]["key"] == "PROJ-123"
    assert content["total"] > 0
    assert content["start_at"] == 0
    assert content["max_results"] == 10
    mock_jira_fetcher.search_issues.assert_called_once_with(
        jql="project = TEST",
        fields=["summary", "status"],
        limit=10,
        start=0,
        projects_filter=None,
        expand=None,
    )


@pytest.mark.anyio
async def test_create_issue(jira_client, mock_jira_fetcher):
    """Test the create_issue tool with fixture data."""
    response = await jira_client.call_tool(
        "jira_create_issue",
        {
            "project_key": "TEST",
            "summary": "New Issue",
            "issue_type": "Task",
            "description": "This is a new task",
            "components": "Frontend,API",
            "additional_fields": {"priority": {"name": "Medium"}},
        },
    )
    assert isinstance(response, list)
    assert len(response) > 0
    text_content = response[0]
    assert text_content.type == "text"
    content = json.loads(text_content.text)
    assert content["message"] == "Issue created successfully"
    assert "issue" in content
    assert content["issue"]["key"] == "TEST-456"
    assert content["issue"]["summary"] == "New Issue"
    assert content["issue"]["description"] == "This is a new task"
    assert "components" in content["issue"]
    component_names = [comp["name"] for comp in content["issue"]["components"]]
    assert "Frontend" in component_names
    assert "API" in component_names
    assert content["issue"]["priority"] == {"name": "Medium"}
    mock_jira_fetcher.create_issue.assert_called_once_with(
        project_key="TEST",
        summary="New Issue",
        issue_type="Task",
        description="This is a new task",
        assignee=None,
        components=["Frontend", "API"],
        priority={"name": "Medium"},
    )


@pytest.mark.anyio
async def test_batch_create_issues(jira_client, mock_jira_fetcher):
    """Test batch creation of Jira issues."""
    test_issues = [
        {
            "project_key": "TEST",
            "summary": "Test Issue 1",
            "issue_type": "Task",
            "description": "Test description 1",
            "assignee": "[email protected]",
            "components": ["Frontend", "API"],
        },
        {
            "project_key": "TEST",
            "summary": "Test Issue 2",
            "issue_type": "Bug",
            "description": "Test description 2",
        },
    ]
    test_issues_json = json.dumps(test_issues)
    response = await jira_client.call_tool(
        "jira_batch_create_issues",
        {"issues": test_issues_json, "validate_only": False},
    )
    assert len(response) == 1
    text_content = response[0]
    assert text_content.type == "text"
    content = json.loads(text_content.text)
    assert "message" in content
    assert "issues" in content
    assert len(content["issues"]) == 2
    assert content["issues"][0]["key"] == "TEST-1"
    assert content["issues"][1]["key"] == "TEST-2"
    call_args, call_kwargs = mock_jira_fetcher.batch_create_issues.call_args
    assert call_args[0] == test_issues
    assert "validate_only" in call_kwargs
    assert call_kwargs["validate_only"] is False


@pytest.mark.anyio
async def test_batch_create_issues_invalid_json(jira_client):
    """Test error handling for invalid JSON in batch issue creation."""
    with pytest.raises(ToolError) as excinfo:
        await jira_client.call_tool(
            "jira_batch_create_issues",
            {"issues": "{invalid json", "validate_only": False},
        )
    assert "Error calling tool 'batch_create_issues'" in str(excinfo.value)


@pytest.mark.anyio
async def test_get_user_profile_tool_success(jira_client, mock_jira_fetcher):
    """Test the get_user_profile tool successfully retrieves user info."""
    response = await jira_client.call_tool(
        "jira_get_user_profile", {"user_identifier": "[email protected]"}
    )
    mock_jira_fetcher.get_user_profile_by_identifier.assert_called_once_with(
        "[email protected]"
    )
    assert len(response) == 1
    result_data = json.loads(response[0].text)
    assert result_data["success"] is True
    assert "user" in result_data
    user_info = result_data["user"]
    assert user_info["display_name"] == "Test User ([email protected])"
    assert user_info["email"] == "[email protected]"
    assert (
        user_info["avatar_url"]
        == "https://test.atlassian.net/avatar/[email protected]"
    )


@pytest.mark.anyio
async def test_get_user_profile_tool_not_found(jira_client, mock_jira_fetcher):
    """Test the get_user_profile tool handles 'user not found' errors."""
    response = await jira_client.call_tool(
        "jira_get_user_profile", {"user_identifier": "[email protected]"}
    )
    assert len(response) == 1
    result_data = json.loads(response[0].text)
    assert result_data["success"] is False
    assert "error" in result_data
    assert "not found" in result_data["error"]
    assert result_data["user_identifier"] == "[email protected]"


@pytest.mark.anyio
async def test_no_fetcher_get_issue(no_fetcher_client_fixture, mock_request):
    """Test that get_issue fails when Jira client is not configured (global config missing)."""

    async def mock_get_fetcher_error(*args, **kwargs):
        raise ValueError(
            "Mocked: Jira client (fetcher) not available. Ensure server is configured correctly."
        )

    with (
        patch(
            "src.mcp_atlassian.servers.jira.get_jira_fetcher",
            AsyncMock(side_effect=mock_get_fetcher_error),
        ),
        patch(
            "src.mcp_atlassian.servers.dependencies.get_http_request",
            return_value=mock_request,
        ),
    ):
        with pytest.raises(ToolError) as excinfo:
            await no_fetcher_client_fixture.call_tool(
                "jira_get_issue",
                {
                    "issue_key": "TEST-123",
                },
            )
    assert "Error calling tool 'get_issue'" in str(excinfo.value)


@pytest.mark.anyio
async def test_get_issue_with_user_specific_fetcher_in_state(
    test_jira_mcp, mock_jira_fetcher, mock_base_jira_config
):
    """Test get_issue uses fetcher from request.state if UserTokenMiddleware provided it."""
    _mock_request_with_fetcher_in_state = MagicMock(spec=Request)
    _mock_request_with_fetcher_in_state.state = MagicMock()
    _mock_request_with_fetcher_in_state.state.jira_fetcher = mock_jira_fetcher
    _mock_request_with_fetcher_in_state.state.user_atlassian_auth_type = "oauth"
    _mock_request_with_fetcher_in_state.state.user_atlassian_token = (
        "user_specific_token"
    )

    # Define the specific fields we expect for this test case
    test_fields_str = "summary,status,issuetype"
    expected_fields_list = ["summary", "status", "issuetype"]

    # Import the real get_jira_fetcher to test its interaction with request.state
    from src.mcp_atlassian.servers.dependencies import (
        get_jira_fetcher as get_jira_fetcher_real,
    )
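    # get_jira_fetcher is wrapped rather than replaced, so the real dependency
    # logic runs and picks up the fetcher already stored on request.state.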

    with (
        patch(
            "src.mcp_atlassian.servers.dependencies.get_http_request",
            return_value=_mock_request_with_fetcher_in_state,
        ) as mock_get_http,
        patch(
            "src.mcp_atlassian.servers.jira.get_jira_fetcher",
            side_effect=AsyncMock(wraps=get_jira_fetcher_real),
        ),
    ):
        async with Client(transport=FastMCPTransport(test_jira_mcp)) as client_instance:
            response = await client_instance.call_tool(
                "jira_get_issue",
                {"issue_key": "USER-STATE-1", "fields": test_fields_str},
            )

    mock_get_http.assert_called()
    mock_jira_fetcher.get_issue.assert_called_with(
        issue_key="USER-STATE-1",
        fields=expected_fields_list,
        expand=None,
        comment_limit=10,
        properties=None,
        update_history=True,
    )
    result_data = json.loads(response[0].text)
    assert result_data["key"] == "USER-STATE-1"


@pytest.mark.anyio
async def test_get_project_versions_tool(jira_client, mock_jira_fetcher):
    """Test the jira_get_project_versions tool returns simplified version list."""
    # Prepare mock raw versions
    raw_versions = [
        {
            "id": "100",
            "name": "v1.0",
            "description": "First",
            "released": True,
            "archived": False,
        },
        {
            "id": "101",
            "name": "v2.0",
            "startDate": "2025-01-01",
            "releaseDate": "2025-02-01",
            "released": False,
            "archived": False,
        },
    ]
    mock_jira_fetcher.get_project_versions.return_value = raw_versions

    response = await jira_client.call_tool(
        "jira_get_project_versions",
        {"project_key": "TEST"},
    )
    assert isinstance(response, list)
    assert len(response) == 1  # FastMCP wraps as list of messages
    msg = response[0]
    assert msg.type == "text"

    data = json.loads(msg.text)
    assert isinstance(data, list)
    # Check fields in simplified dict
    assert data[0]["id"] == "100"
    assert data[0]["name"] == "v1.0"
    assert data[0]["description"] == "First"


@pytest.mark.anyio
async def test_get_all_projects_tool(jira_client, mock_jira_fetcher):
    """Test the jira_get_all_projects tool returns all accessible projects."""
    # Prepare mock project data
    mock_projects = [
        {
            "id": "10000",
            "key": "PROJ1",
            "name": "Project One",
            "description": "First project",
            "lead": {"name": "user1", "displayName": "User One"},
            "projectTypeKey": "software",
            "archived": False,
        },
        {
            "id": "10001",
            "key": "PROJ2",
            "name": "Project Two",
            "description": "Second project",
            "lead": {"name": "user2", "displayName": "User Two"},
            "projectTypeKey": "business",
            "archived": False,
        },
    ]
    # Reset the mock and set specific return value for this test
    mock_jira_fetcher.get_all_projects.reset_mock()
    mock_jira_fetcher.get_all_projects.side_effect = (
        lambda include_archived=False: mock_projects
    )

    # Test with default parameters (include_archived=False)
    response = await jira_client.call_tool(
        "jira_get_all_projects",
        {},
    )
    assert isinstance(response, list)
    assert len(response) == 1  # FastMCP wraps as list of messages
    msg = response[0]
    assert msg.type == "text"

    data = json.loads(msg.text)
    assert isinstance(data, list)
    assert len(data) == 2
    assert data[0]["id"] == "10000"
    assert data[0]["key"] == "PROJ1"
    assert data[0]["name"] == "Project One"
    assert data[1]["id"] == "10001"
    assert data[1]["key"] == "PROJ2"
    assert data[1]["name"] == "Project Two"

    # Verify the underlying method was called with default parameter
    mock_jira_fetcher.get_all_projects.assert_called_once_with(include_archived=False)


@pytest.mark.anyio
async def test_get_all_projects_tool_with_archived(jira_client, mock_jira_fetcher):
    """Test the jira_get_all_projects tool with include_archived=True."""
    mock_projects = [
        {
            "id": "10000",
            "key": "PROJ1",
            "name": "Active Project",
            "description": "Active project",
            "archived": False,
        },
        {
            "id": "10002",
            "key": "ARCHIVED",
            "name": "Archived Project",
            "description": "Archived project",
            "archived": True,
        },
    ]
    # Reset the mock and set specific return value for this test
    mock_jira_fetcher.get_all_projects.reset_mock()
    mock_jira_fetcher.get_all_projects.side_effect = (
        lambda include_archived=False: mock_projects
    )

    # Test with include_archived=True
    response = await jira_client.call_tool(
        "jira_get_all_projects",
        {"include_archived": True},
    )
    assert isinstance(response, list)
    assert len(response) == 1
    msg = response[0]
    assert msg.type == "text"

    data = json.loads(msg.text)
    assert isinstance(data, list)
    assert len(data) == 2
    # Project keys should always be uppercase in the response
    assert data[0]["key"] == "PROJ1"
    assert data[1]["key"] == "ARCHIVED"

    # Verify the underlying method was called with include_archived=True
    mock_jira_fetcher.get_all_projects.assert_called_once_with(include_archived=True)


@pytest.mark.anyio
async def test_get_all_projects_tool_with_projects_filter(
    jira_client, mock_jira_fetcher
):
    """Test the jira_get_all_projects tool respects project filter configuration."""
    # Prepare mock project data - simulate getting all projects from API
    all_mock_projects = [
        {
            "id": "10000",
            "key": "PROJ1",
            "name": "Project One",
            "description": "First project",
        },
        {
            "id": "10001",
            "key": "PROJ2",
            "name": "Project Two",
            "description": "Second project",
        },
        {
            "id": "10002",
            "key": "OTHER",
            "name": "Other Project",
            "description": "Should be filtered out",
        },
    ]

    # Set up the mock to return all projects
    mock_jira_fetcher.get_all_projects.reset_mock()
    mock_jira_fetcher.get_all_projects.side_effect = (
        lambda include_archived=False: all_mock_projects
    )

    # Set up the projects filter in the config
    mock_jira_fetcher.config.projects_filter = "PROJ1,PROJ2"

    # Call the tool
    response = await jira_client.call_tool(
        "jira_get_all_projects",
        {},
    )

    assert isinstance(response, list)
    assert len(response) == 1
    msg = response[0]
    assert msg.type == "text"

    data = json.loads(msg.text)
    assert isinstance(data, list)

    # Should only return projects in the filter (PROJ1, PROJ2), not OTHER
    assert len(data) == 2
    returned_keys = [project["key"] for project in data]
    # Project keys should always be uppercase in the response
    assert "PROJ1" in returned_keys
    assert "PROJ2" in returned_keys
    assert "OTHER" not in returned_keys

    # Verify the underlying method was called (still gets all projects, but then filters)
    mock_jira_fetcher.get_all_projects.assert_called_once_with(include_archived=False)


@pytest.mark.anyio
async def test_get_all_projects_tool_no_projects_filter(jira_client, mock_jira_fetcher):
    """Test the jira_get_all_projects tool returns all projects when no filter is configured."""
    # Prepare mock project data
    all_mock_projects = [
        {
            "id": "10000",
            "key": "PROJ1",
            "name": "Project One",
            "description": "First project",
        },
        {
            "id": "10001",
            "key": "OTHER",
            "name": "Other Project",
            "description": "Should not be filtered out",
        },
    ]

    # Set up the mock to return all projects
    mock_jira_fetcher.get_all_projects.reset_mock()
    mock_jira_fetcher.get_all_projects.side_effect = (
        lambda include_archived=False: all_mock_projects
    )

    # Ensure no projects filter is set
    mock_jira_fetcher.config.projects_filter = None

    # Call the tool
    response = await jira_client.call_tool(
        "jira_get_all_projects",
        {},
    )

    assert isinstance(response, list)
    assert len(response) == 1
    msg = response[0]
    assert msg.type == "text"

    data = json.loads(msg.text)
    assert isinstance(data, list)

    # Should return all projects when no filter is configured
    assert len(data) == 2
    returned_keys = [project["key"] for project in data]
    # Project keys should always be uppercase in the response
    assert "PROJ1" in returned_keys
    assert "OTHER" in returned_keys

    # Verify the underlying method was called
    mock_jira_fetcher.get_all_projects.assert_called_once_with(include_archived=False)


@pytest.mark.anyio
async def test_get_all_projects_tool_case_insensitive_filter(
    jira_client, mock_jira_fetcher
):
    """Test the jira_get_all_projects tool handles case-insensitive filtering and whitespace."""
    # Prepare mock project data with mixed case
    all_mock_projects = [
        {
            "id": "10000",
            "key": "proj1",  # lowercase
            "name": "Project One",
            "description": "First project",
        },
        {
            "id": "10001",
            "key": "PROJ2",  # uppercase
            "name": "Project Two",
            "description": "Second project",
        },
        {
            "id": "10002",
            "key": "other",  # should be filtered out
            "name": "Other Project",
            "description": "Should be filtered out",
        },
    ]

    # Set up the mock to return all projects
    mock_jira_fetcher.get_all_projects.reset_mock()
    mock_jira_fetcher.get_all_projects.side_effect = (
        lambda include_archived=False: all_mock_projects
    )

    # Set up projects filter with mixed case and whitespace
    mock_jira_fetcher.config.projects_filter = " PROJ1 , proj2 "

    # Call the tool
    response = await jira_client.call_tool(
        "jira_get_all_projects",
        {},
    )

    assert isinstance(response, list)
    assert len(response) == 1
    msg = response[0]
    assert msg.type == "text"

    data = json.loads(msg.text)
    assert isinstance(data, list)

    # Should return projects matching the filter (case-insensitive)
    assert len(data) == 2
    returned_keys = [project["key"] for project in data]
    # Project keys should always be uppercase in the response, regardless of input case
    assert "PROJ1" in returned_keys  # lowercase input converted to uppercase
    assert "PROJ2" in returned_keys  # uppercase stays uppercase
    assert "OTHER" not in returned_keys  # not in filter

    # Verify the underlying method was called
    mock_jira_fetcher.get_all_projects.assert_called_once_with(include_archived=False)


@pytest.mark.anyio
async def test_get_all_projects_tool_empty_response(jira_client, mock_jira_fetcher):
    """Test tool handles empty list of projects from API."""
    mock_jira_fetcher.get_all_projects.side_effect = lambda include_archived=False: []

    response = await jira_client.call_tool("jira_get_all_projects", {})

    assert isinstance(response, list)
    assert len(response) == 1
    msg = response[0]
    assert msg.type == "text"

    data = json.loads(msg.text)
    assert data == []


@pytest.mark.anyio
async def test_get_all_projects_tool_api_error_handling(jira_client, mock_jira_fetcher):
    """Test tool handles API errors gracefully."""
    from requests.exceptions import HTTPError

    mock_jira_fetcher.get_all_projects.side_effect = HTTPError("API Error")

    response = await jira_client.call_tool("jira_get_all_projects", {})

    assert isinstance(response, list)
    assert len(response) == 1
    msg = response[0]
    assert msg.type == "text"

    data = json.loads(msg.text)
    assert data["success"] is False
    assert "API Error" in data["error"]


@pytest.mark.anyio
async def test_get_all_projects_tool_authentication_error_handling(
    jira_client, mock_jira_fetcher
):
    """Test tool handles authentication errors gracefully."""
    from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError

    mock_jira_fetcher.get_all_projects.side_effect = MCPAtlassianAuthenticationError(
        "Authentication failed"
    )

    response = await jira_client.call_tool("jira_get_all_projects", {})

    assert isinstance(response, list)
    assert len(response) == 1
    msg = response[0]
    assert msg.type == "text"

    data = json.loads(msg.text)
    assert data["success"] is False
    assert "Authentication/Permission Error" in data["error"]


@pytest.mark.anyio
async def test_get_all_projects_tool_configuration_error_handling(
    jira_client, mock_jira_fetcher
):
    """Test tool handles configuration errors gracefully."""
    mock_jira_fetcher.get_all_projects.side_effect = ValueError(
        "Jira client not configured"
    )

    response = await jira_client.call_tool("jira_get_all_projects", {})

    assert isinstance(response, list)
    assert len(response) == 1
    msg = response[0]
    assert msg.type == "text"

    data = json.loads(msg.text)
    assert data["success"] is False
    assert "Configuration Error" in data["error"]


@pytest.mark.anyio
async def test_batch_create_versions_all_success(jira_client, mock_jira_fetcher):
    """Test batch creation of Jira versions where all succeed."""
    versions = [
        {
            "name": "v1.0",
            "startDate": "2025-01-01",
            "releaseDate": "2025-02-01",
            "description": "First release",
        },
        {"name": "v2.0", "description": "Second release"},
    ]
    # Patch create_project_version to always succeed
    mock_jira_fetcher.create_project_version.side_effect = lambda **kwargs: {
        "id": f"{kwargs['name']}-id",
        **kwargs,
    }
    response = await jira_client.call_tool(
        "jira_batch_create_versions",
        {"project_key": "TEST", "versions": json.dumps(versions)},
    )
    assert len(response) == 1
    content = json.loads(response[0].text)
    assert all(item["success"] for item in content)
    assert content[0]["version"]["name"] == "v1.0"
    assert content[1]["version"]["name"] == "v2.0"


@pytest.mark.anyio
async def test_batch_create_versions_partial_failure(jira_client, mock_jira_fetcher):
    """Test batch creation of Jira versions with some failures."""

    def side_effect(
        project_key, name, start_date=None, release_date=None, description=None
    ):
        if name == "bad":
            raise Exception("Simulated failure")
        return {"id": f"{name}-id", "name": name}

    mock_jira_fetcher.create_project_version.side_effect = side_effect
    versions = [
        {"name": "good1"},
        {"name": "bad"},
        {"name": "good2"},
    ]
    response = await jira_client.call_tool(
        "jira_batch_create_versions",
        {"project_key": "TEST", "versions": json.dumps(versions)},
    )
    content = json.loads(response[0].text)
    assert content[0]["success"] is True
    assert content[1]["success"] is False
    assert "Simulated failure" in content[1]["error"]
    assert content[2]["success"] is True


@pytest.mark.anyio
async def test_batch_create_versions_all_failure(jira_client, mock_jira_fetcher):
    """Test batch creation of Jira versions where all fail."""
    mock_jira_fetcher.create_project_version.side_effect = Exception("API down")
    versions = [
        {"name": "fail1"},
        {"name": "fail2"},
    ]
    response = await jira_client.call_tool(
        "jira_batch_create_versions",
        {"project_key": "TEST", "versions": json.dumps(versions)},
    )
    content = json.loads(response[0].text)
    assert all(not item["success"] for item in content)
    assert all("API down" in item["error"] for item in content)


@pytest.mark.anyio
async def test_batch_create_versions_empty(jira_client, mock_jira_fetcher):
    """Test batch creation of Jira versions with empty input."""
    response = await jira_client.call_tool(
        "jira_batch_create_versions",
        {"project_key": "TEST", "versions": json.dumps([])},
    )
    content = json.loads(response[0].text)
    assert content == []

```

--------------------------------------------------------------------------------
/tests/unit/confluence/test_pages.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for the PagesMixin class."""

from unittest.mock import MagicMock, patch

import pytest

from mcp_atlassian.confluence.pages import PagesMixin
from mcp_atlassian.models.confluence import ConfluencePage


class TestPagesMixin:
    """Tests for the PagesMixin class."""

    @pytest.fixture
    def pages_mixin(self, confluence_client):
        """Create a PagesMixin instance for testing."""
        # PagesMixin inherits from ConfluenceClient, so we need to create it properly
        with patch(
            "mcp_atlassian.confluence.pages.ConfluenceClient.__init__"
        ) as mock_init:
            mock_init.return_value = None
            mixin = PagesMixin()
            # Copy the necessary attributes from our mocked client
            mixin.confluence = confluence_client.confluence
            mixin.config = confluence_client.config
            mixin.preprocessor = confluence_client.preprocessor
            return mixin

    def test_get_page_content(self, pages_mixin):
        """Test getting page content by ID."""
        # Arrange
        page_id = "987654321"
        pages_mixin.config.url = "https://example.atlassian.net/wiki"

        # Act
        result = pages_mixin.get_page_content(page_id, convert_to_markdown=True)

        # Assert
        pages_mixin.confluence.get_page_by_id.assert_called_once_with(
            page_id=page_id, expand="body.storage,version,space,children.attachment"
        )

        # Verify result structure
        assert isinstance(result, ConfluencePage)
        assert result.id == "987654321"
        assert result.title == "Example Meeting Notes"

        # Test space information
        assert result.space is not None
        assert result.space.key == "PROJ"

        # Use direct attributes instead of backward compatibility
        assert result.content == "Processed Markdown"
        assert result.id == page_id
        assert result.title == "Example Meeting Notes"
        assert result.space.key == "PROJ"
        assert result.url is not None

        # Test version information
        assert result.version is not None
        assert result.version.number == 1

        # Test attachments
        assert result.attachments is not None
        assert len(result.attachments) == 2
        assert result.attachments[0].id is not None
        assert result.attachments[1].id is not None

    def test_get_page_ancestors(self, pages_mixin):
        """Test getting page ancestors (parent pages)."""
        # Arrange
        page_id = "987654321"
        pages_mixin.config.url = "https://example.atlassian.net/wiki"

        # Mock the ancestors API response
        ancestors_data = [
            {
                "id": "123456789",
                "title": "Parent Page",
                "type": "page",
                "status": "current",
                "space": {"key": "PROJ", "name": "Project Space"},
            },
            {
                "id": "111222333",
                "title": "Grandparent Page",
                "type": "page",
                "status": "current",
                "space": {"key": "PROJ", "name": "Project Space"},
            },
        ]
        pages_mixin.confluence.get_page_ancestors.return_value = ancestors_data

        # Act
        result = pages_mixin.get_page_ancestors(page_id)

        # Assert
        pages_mixin.confluence.get_page_ancestors.assert_called_once_with(page_id)

        # Verify result structure
        assert isinstance(result, list)
        assert len(result) == 2

        # Test first ancestor (parent)
        assert isinstance(result[0], ConfluencePage)
        assert result[0].id == "123456789"
        assert result[0].title == "Parent Page"
        assert result[0].space.key == "PROJ"

        # Test second ancestor (grandparent)
        assert isinstance(result[1], ConfluencePage)
        assert result[1].id == "111222333"
        assert result[1].title == "Grandparent Page"

    def test_get_page_ancestors_empty(self, pages_mixin):
        """Test getting ancestors when there are none (top-level page)."""
        # Arrange
        page_id = "987654321"
        pages_mixin.confluence.get_page_ancestors.return_value = []

        # Act
        result = pages_mixin.get_page_ancestors(page_id)

        # Assert
        assert isinstance(result, list)
        assert len(result) == 0

    def test_get_page_ancestors_error(self, pages_mixin):
        """Test error handling when getting ancestors."""
        # Arrange
        page_id = "987654321"
        pages_mixin.confluence.get_page_ancestors.side_effect = Exception("API Error")

        # Act
        result = pages_mixin.get_page_ancestors(page_id)

        # Assert - should return empty list on error, not raise exception
        assert isinstance(result, list)
        assert len(result) == 0

    def test_get_page_content_html(self, pages_mixin):
        """Test getting page content in HTML format."""
        pages_mixin.config.url = "https://example.atlassian.net/wiki"

        # Mock the preprocessor to return HTML
        pages_mixin.preprocessor.process_html_content.return_value = (
            "<p>Processed HTML</p>",
            "Processed Markdown",
        )

        # Act
        result = pages_mixin.get_page_content("987654321", convert_to_markdown=False)

        # Assert HTML processing was used
        assert result.content == "<p>Processed HTML</p>"

    def test_get_page_by_title_success(self, pages_mixin):
        """Test getting a page by title when it exists."""
        # Setup
        space_key = "DEMO"
        title = "Example Page"

        # Mock getting the page by title
        pages_mixin.confluence.get_page_by_title.return_value = {
            "id": "987654321",
            "title": title,
            "space": {"key": space_key},
            "body": {"storage": {"value": "<p>Example content</p>"}},
            "version": {"number": 1},
        }

        # Mock the HTML processing
        pages_mixin.preprocessor.process_html_content.return_value = (
            "<p>Processed HTML</p>",
            "Processed Markdown",
        )

        # Call the method
        result = pages_mixin.get_page_by_title(space_key, title)

        # Verify API calls
        pages_mixin.confluence.get_page_by_title.assert_called_once_with(
            space=space_key, title=title, expand="body.storage,version"
        )

        # Verify result
        assert result.id == "987654321"
        assert result.title == title
        assert result.content == "Processed Markdown"

    def test_get_page_by_title_space_not_found(self, pages_mixin):
        """Test getting a page when the space doesn't exist."""
        # Arrange - API returns None when space doesn't exist
        pages_mixin.confluence.get_page_by_title.return_value = None

        # Act
        result = pages_mixin.get_page_by_title("NONEXISTENT", "Page Title")

        # Assert
        assert result is None
        pages_mixin.confluence.get_page_by_title.assert_called_once_with(
            space="NONEXISTENT", title="Page Title", expand="body.storage,version"
        )

    def test_get_page_by_title_page_not_found(self, pages_mixin):
        """Test getting a page that doesn't exist."""
        # Arrange
        pages_mixin.confluence.get_page_by_title.return_value = None

        # Act
        result = pages_mixin.get_page_by_title("PROJ", "Nonexistent Page")

        # Assert
        assert result is None
        pages_mixin.confluence.get_page_by_title.assert_called_once_with(
            space="PROJ", title="Nonexistent Page", expand="body.storage,version"
        )

    def test_get_page_by_title_error_handling(self, pages_mixin):
        """Test error handling in get_page_by_title."""
        # Arrange
        pages_mixin.confluence.get_page_by_title.side_effect = KeyError("Missing key")

        # Act
        result = pages_mixin.get_page_by_title("PROJ", "Page Title")

        # Assert
        assert result is None

    def test_get_space_pages(self, pages_mixin):
        """Test getting all pages from a space."""
        # Arrange
        space_key = "PROJ"
        pages_mixin.config.url = "https://example.atlassian.net/wiki"

        # Act
        results = pages_mixin.get_space_pages(
            space_key, start=0, limit=10, convert_to_markdown=True
        )

        # Assert
        pages_mixin.confluence.get_all_pages_from_space.assert_called_once_with(
            space=space_key, start=0, limit=10, expand="body.storage"
        )

        # Verify results
        assert len(results) == 2  # Mock has 2 pages

        # Verify each result is a ConfluencePage
        for result in results:
            assert isinstance(result, ConfluencePage)
            assert result.content == "Processed Markdown"
            assert result.space is not None
            assert result.space.key == "PROJ"

        # Verify individual pages
        assert results[0].id == "123456789"  # First page ID from mock
        assert results[0].title == "Sample Research Paper Title"

        # Verify the second page
        assert results[1].id == "987654321"  # Second page ID from mock
        assert results[1].title == "Example Meeting Notes"

    def test_create_page_success(self, pages_mixin):
        """Test creating a new page."""
        # Arrange
        space_key = "PROJ"
        title = "New Test Page"
        body = "<p>Test content</p>"
        parent_id = "987654321"

        # Mock get_page_content to return a ConfluencePage
        with patch.object(
            pages_mixin,
            "get_page_content",
            return_value=ConfluencePage(
                id="123456789",
                title=title,
                content="Page content",
                space={"key": space_key, "name": "Project"},
            ),
        ):
            # Act - specify is_markdown=False since we're directly providing storage format
            result = pages_mixin.create_page(
                space_key, title, body, parent_id, is_markdown=False
            )

            # Assert
            pages_mixin.confluence.create_page.assert_called_once_with(
                space=space_key,
                title=title,
                body=body,
                parent_id=parent_id,
                representation="storage",
            )

            # Verify result is a ConfluencePage
            assert isinstance(result, ConfluencePage)
            assert result.id == "123456789"
            assert result.title == title
            assert result.content == "Page content"

    def test_create_page_error(self, pages_mixin):
        """Test error handling when creating a page."""
        # Arrange
        pages_mixin.confluence.create_page.side_effect = Exception("API Error")

        # Act/Assert
        with pytest.raises(Exception, match="API Error"):
            pages_mixin.create_page("PROJ", "Test Page", "<p>Content</p>")

    def test_create_page_with_wiki_format(self, pages_mixin):
        """Test creating a new page with wiki markup format."""
        # Arrange
        space_key = "PROJ"
        title = "Wiki Format Test Page"
        wiki_body = "h1. This is a heading\n\n* Item 1\n* Item 2"

        # Mock get_page_content to return a ConfluencePage
        with patch.object(
            pages_mixin,
            "get_page_content",
            return_value=ConfluencePage(
                id="wiki123",
                title=title,
                content="Wiki page content",
                space={"key": space_key, "name": "Project"},
            ),
        ):
            # Act - use wiki format
            result = pages_mixin.create_page(
                space_key,
                title,
                wiki_body,
                is_markdown=False,
                content_representation="wiki",
            )

            # Assert
            pages_mixin.confluence.create_page.assert_called_once_with(
                space=space_key,
                title=title,
                body=wiki_body,  # Should be passed as-is
                parent_id=None,
                representation="wiki",  # Should use wiki representation
            )

            # Verify no markdown conversion happened
            pages_mixin.preprocessor.markdown_to_confluence_storage.assert_not_called()

            # Verify result is a ConfluencePage
            assert isinstance(result, ConfluencePage)
            assert result.id == "wiki123"

    def test_update_page_success(self, pages_mixin):
        """Test updating an existing page."""
        # Arrange
        page_id = "987654321"
        title = "Updated Page"
        body = "<p>Updated content</p>"
        is_minor_edit = True
        version_comment = "Updated test"

        # Mock get_page_content to return a document
        mock_document = ConfluencePage(
            id=page_id,
            title=title,
            content="Updated content",
            space={"key": "PROJ", "name": "Project"},
            version={"number": 1},  # Add version information
        )
        with patch.object(pages_mixin, "get_page_content", return_value=mock_document):
            # Act - specify is_markdown=False since we're directly providing storage format
            result = pages_mixin.update_page(
                page_id,
                title,
                body,
                is_minor_edit=is_minor_edit,
                version_comment=version_comment,
                is_markdown=False,
            )

            # Assert
            # Verify update_page was called with the correct arguments
            # We now include type='page' and always_update=True parameters
            pages_mixin.confluence.update_page.assert_called_once_with(
                page_id=page_id,
                title=title,
                body=body,
                type="page",
                representation="storage",
                minor_edit=is_minor_edit,
                version_comment=version_comment,
                always_update=True,
            )

            # Verify the returned page reflects the mocked content
            assert result.id == page_id
            assert result.title == title

    def test_update_page_error(self, pages_mixin):
        """Test error handling when updating a page."""
        # Arrange
        pages_mixin.confluence.update_page.side_effect = Exception("API Error")

        # Act/Assert
        with pytest.raises(Exception, match="Failed to update page"):
            pages_mixin.update_page("987654321", "Test Page", "<p>Content</p>")

    def test_update_page_with_wiki_format(self, pages_mixin):
        """Test updating a page with wiki markup format."""
        # Arrange
        page_id = "wiki987"
        title = "Updated Wiki Page"
        wiki_body = "h1. Updated Heading\n\n||Header 1||Header 2||\n|Cell 1|Cell 2|"
        version_comment = "Wiki format update"

        # Mock get_page_content to return a document
        mock_document = ConfluencePage(
            id=page_id,
            title=title,
            content="Updated wiki content",
            space={"key": "PROJ", "name": "Project"},
            version={"number": 2},
        )
        with patch.object(pages_mixin, "get_page_content", return_value=mock_document):
            # Act - use wiki format
            result = pages_mixin.update_page(
                page_id,
                title,
                wiki_body,
                version_comment=version_comment,
                is_markdown=False,
                content_representation="wiki",
            )

            # Assert
            pages_mixin.confluence.update_page.assert_called_once_with(
                page_id=page_id,
                title=title,
                body=wiki_body,  # Should be passed as-is
                type="page",
                representation="wiki",  # Should use wiki representation
                minor_edit=False,
                version_comment=version_comment,
                always_update=True,
            )

            # Verify no markdown conversion happened
            pages_mixin.preprocessor.markdown_to_confluence_storage.assert_not_called()

            # Verify result is a ConfluencePage
            assert isinstance(result, ConfluencePage)
            assert result.id == page_id

    def test_delete_page_success(self, pages_mixin):
        """Test successfully deleting a page."""
        # Arrange
        page_id = "987654321"
        pages_mixin.confluence.remove_page.return_value = True

        # Act
        result = pages_mixin.delete_page(page_id)

        # Assert
        pages_mixin.confluence.remove_page.assert_called_once_with(page_id=page_id)
        assert result is True

    def test_delete_page_error(self, pages_mixin):
        """Test error handling when deleting a page."""
        # Arrange
        page_id = "987654321"
        pages_mixin.confluence.remove_page.side_effect = Exception("API Error")

        # Act/Assert
        with pytest.raises(Exception, match="Failed to delete page"):
            pages_mixin.delete_page(page_id)

    def test_get_page_children_success(self, pages_mixin):
        """Test successfully getting child pages."""
        # Arrange
        parent_id = "123456"
        pages_mixin.config.url = "https://example.atlassian.net/wiki"

        # Mock the response from get_page_child_by_type
        child_pages_data = {
            "results": [
                {
                    "id": "789012",
                    "title": "Child Page 1",
                    "space": {"key": "DEMO"},
                    "version": {"number": 1},
                },
                {
                    "id": "345678",
                    "title": "Child Page 2",
                    "space": {"key": "DEMO"},
                    "version": {"number": 3},
                },
            ]
        }
        pages_mixin.confluence.get_page_child_by_type.return_value = child_pages_data

        # Act
        results = pages_mixin.get_page_children(
            page_id=parent_id, limit=10, expand="version"
        )

        # Assert
        pages_mixin.confluence.get_page_child_by_type.assert_called_once_with(
            page_id=parent_id, type="page", start=0, limit=10, expand="version"
        )

        # Verify the results
        assert len(results) == 2
        assert isinstance(results[0], ConfluencePage)
        assert results[0].id == "789012"
        assert results[0].title == "Child Page 1"
        assert results[1].id == "345678"
        assert results[1].title == "Child Page 2"

    def test_get_page_children_with_content(self, pages_mixin):
        """Test getting child pages with content."""
        # Arrange
        parent_id = "123456"
        pages_mixin.config.url = "https://example.atlassian.net/wiki"

        # Mock the response with body content
        child_pages_data = {
            "results": [
                {
                    "id": "789012",
                    "title": "Child Page With Content",
                    "space": {"key": "DEMO"},
                    "version": {"number": 1},
                    "body": {"storage": {"value": "<p>This is some content</p>"}},
                }
            ]
        }
        pages_mixin.confluence.get_page_child_by_type.return_value = child_pages_data

        # Mock the preprocessor
        pages_mixin.preprocessor.process_html_content.return_value = (
            "<p>Processed HTML</p>",
            "Processed Markdown",
        )

        # Act
        results = pages_mixin.get_page_children(
            page_id=parent_id, expand="body.storage", convert_to_markdown=True
        )

        # Assert
        assert len(results) == 1
        assert results[0].content == "Processed Markdown"
        pages_mixin.preprocessor.process_html_content.assert_called_once_with(
            "<p>This is some content</p>",
            space_key="DEMO",
            confluence_client=pages_mixin.confluence,
        )

    def test_get_page_children_empty(self, pages_mixin):
        """Test getting child pages when there are none."""
        # Arrange
        parent_id = "123456"

        # Mock empty response
        pages_mixin.confluence.get_page_child_by_type.return_value = {"results": []}

        # Act
        results = pages_mixin.get_page_children(page_id=parent_id)

        # Assert
        assert len(results) == 0

    def test_get_page_children_error(self, pages_mixin):
        """Test error handling when getting child pages."""
        # Arrange
        parent_id = "123456"

        # Mock an exception
        pages_mixin.confluence.get_page_child_by_type.side_effect = Exception(
            "API Error"
        )

        # Act
        results = pages_mixin.get_page_children(page_id=parent_id)

        # Assert - should return empty list on error, not raise exception
        assert len(results) == 0

    def test_get_page_success(self, pages_mixin):
        """Test successful page retrieval."""
        # Setup
        page_id = "12345"
        page_data = {
            "id": page_id,
            "title": "Test Page",
            "body": {"storage": {"value": "<p>Test content</p>"}},
            "version": {"number": 1},
            "space": {"key": "TEST", "name": "Test Space"},
        }
        pages_mixin.confluence.get_page_by_id.return_value = page_data

        # Mock the preprocessor
        pages_mixin.preprocessor.process_html_content.return_value = (
            "<p>Processed HTML</p>",
            "Processed content",
        )

        # Call the method
        result = pages_mixin.get_page_content(page_id)

        # Verify the API call
        pages_mixin.confluence.get_page_by_id.assert_called_once_with(
            page_id=page_id, expand="body.storage,version,space,children.attachment"
        )

        # Verify the result
        assert result.id == page_id
        assert result.title == "Test Page"
        assert result.content == "Processed content"
        assert (
            result.version.number == 1
        )  # Compare version number instead of the whole object
        assert result.space.key == "TEST"
        assert result.space.name == "Test Space"

    def test_create_page_with_markdown(self, pages_mixin):
        """Test creating a new page with markdown content."""
        # Arrange
        space_key = "PROJ"
        title = "New Test Page"
        markdown_body = "# Test Heading\n\nThis is *markdown* content."
        parent_id = "987654321"
        storage_format = (
            "<h1>Test Heading</h1><p>This is <em>markdown</em> content.</p>"
        )

        # Mock the markdown conversion
        pages_mixin.preprocessor.markdown_to_confluence_storage.return_value = (
            storage_format
        )

        # Mock get_page_content to return a ConfluencePage
        with patch.object(
            pages_mixin,
            "get_page_content",
            return_value=ConfluencePage(
                id="123456789",
                title=title,
                content="Converted content",
                space={"key": space_key, "name": "Project"},
            ),
        ):
            # Act
            result = pages_mixin.create_page(
                space_key=space_key,
                title=title,
                body=markdown_body,
                parent_id=parent_id,
                is_markdown=True,
            )

            # Assert
            # Verify markdown was converted
            pages_mixin.preprocessor.markdown_to_confluence_storage.assert_called_once_with(
                markdown_body, enable_heading_anchors=False
            )

            # Verify create_page was called with the converted content
            pages_mixin.confluence.create_page.assert_called_once_with(
                space=space_key,
                title=title,
                body=storage_format,
                parent_id=parent_id,
                representation="storage",
            )

            # Verify result
            assert isinstance(result, ConfluencePage)
            assert result.id == "123456789"
            assert result.title == title

    def test_create_page_with_storage_format(self, pages_mixin):
        """Test creating a page with pre-converted storage format content."""
        # Arrange
        space_key = "PROJ"
        title = "New Test Page"
        storage_body = "<p>Already in storage format</p>"

        # Mock get_page_content
        with patch.object(
            pages_mixin,
            "get_page_content",
            return_value=ConfluencePage(id="123456789", title=title),
        ):
            # Act
            result = pages_mixin.create_page(
                space_key=space_key, title=title, body=storage_body, is_markdown=False
            )

            # Assert
            # Verify conversion was not called
            pages_mixin.preprocessor.markdown_to_confluence_storage.assert_not_called()

            # Verify create_page was called with the original content
            pages_mixin.confluence.create_page.assert_called_once_with(
                space=space_key,
                title=title,
                body=storage_body,
                parent_id=None,
                representation="storage",
            )

            # Verify the returned page comes from the mocked get_page_content
            assert isinstance(result, ConfluencePage)
            assert result.id == "123456789"

    def test_update_page_with_markdown(self, pages_mixin):
        """Test updating a page with markdown content."""
        # Arrange
        page_id = "987654321"
        title = "Updated Page"
        markdown_body = "# Updated Content\n\nThis is *updated* content."
        storage_format = (
            "<h1>Updated Content</h1><p>This is <em>updated</em> content.</p>"
        )

        # Mock the markdown conversion
        pages_mixin.preprocessor.markdown_to_confluence_storage.return_value = (
            storage_format
        )

        # Mock get_page_content
        with patch.object(
            pages_mixin,
            "get_page_content",
            return_value=ConfluencePage(
                id=page_id,
                title=title,
                content="Updated content",
                space={"key": "PROJ", "name": "Project"},
            ),
        ):
            # Act
            result = pages_mixin.update_page(
                page_id=page_id,
                title=title,
                body=markdown_body,
                is_minor_edit=True,
                version_comment="Updated test",
                is_markdown=True,
            )

            # Assert
            # Verify markdown was converted
            pages_mixin.preprocessor.markdown_to_confluence_storage.assert_called_once_with(
                markdown_body, enable_heading_anchors=False
            )

            # Verify update_page was called with the converted content
            pages_mixin.confluence.update_page.assert_called_once_with(
                page_id=page_id,
                title=title,
                body=storage_format,
                type="page",
                representation="storage",
                minor_edit=True,
                version_comment="Updated test",
                always_update=True,
            )

            # Verify the returned page reflects the mocked content
            assert isinstance(result, ConfluencePage)
            assert result.id == page_id

    def test_update_page_with_parent_id(self, pages_mixin):
        """Test updating a page and changing its parent."""
        # Arrange
        page_id = "987654321"
        title = "Updated Page"
        body = "<p>Updated content</p>"
        parent_id = "123456789"
        is_minor_edit = False
        version_comment = "Parent changed"

        # Mock get_page_content to return a document
        mock_document = ConfluencePage(
            id=page_id,
            title=title,
            content="Updated content",
            space={"key": "PROJ", "name": "Project"},
            version={"number": 2},
        )
        with patch.object(pages_mixin, "get_page_content", return_value=mock_document):
            # Act
            result = pages_mixin.update_page(
                page_id=page_id,
                title=title,
                body=body,
                is_minor_edit=is_minor_edit,
                version_comment=version_comment,
                is_markdown=False,
                parent_id=parent_id,
            )

            # Assert
            pages_mixin.confluence.update_page.assert_called_once_with(
                page_id=page_id,
                title=title,
                body=body,
                type="page",
                representation="storage",
                minor_edit=is_minor_edit,
                version_comment=version_comment,
                always_update=True,
                parent_id=parent_id,
            )
            assert result.id == page_id
            assert result.title == title
            assert result.version.number == 2

    def test_non_oauth_still_uses_v1_api(self, pages_mixin):
        """Test that non-OAuth authentication still uses v1 API."""
        # This test ensures backward compatibility for API token/basic auth
        # Arrange
        space_key = "PROJ"
        title = "New V1 Test Page"
        body = "<p>Test content for V1</p>"

        # Mock get_page_content to return a ConfluencePage
        with patch.object(
            pages_mixin,
            "get_page_content",
            return_value=ConfluencePage(
                id="v1_123456789",
                title=title,
                content="V1 page content",
                space={"key": space_key, "name": "Project"},
            ),
        ):
            # Act
            result = pages_mixin.create_page(space_key, title, body, is_markdown=False)

            # Assert that v1 API was used
            pages_mixin.confluence.create_page.assert_called_once_with(
                space=space_key,
                title=title,
                body=body,
                parent_id=None,
                representation="storage",
            )

            # Verify result is a ConfluencePage
            assert isinstance(result, ConfluencePage)
            assert result.id == "v1_123456789"
            assert result.title == title


class TestPagesOAuthMixin:
    """Tests for PagesMixin with OAuth authentication."""

    @pytest.fixture
    def oauth_pages_mixin(self, oauth_confluence_client):
        """Create a PagesMixin instance for OAuth testing."""
        # PagesMixin inherits from ConfluenceClient, so we need to create it properly
        with patch(
            "mcp_atlassian.confluence.pages.ConfluenceClient.__init__"
        ) as mock_init:
            mock_init.return_value = None
            mixin = PagesMixin()
            # Copy the necessary attributes from our mocked client
            mixin.confluence = oauth_confluence_client.confluence
            mixin.config = oauth_confluence_client.config
            mixin.preprocessor = oauth_confluence_client.preprocessor
            return mixin

    def test_create_page_oauth_uses_v2_api(self, oauth_pages_mixin):
        """Test that OAuth authentication uses v2 API for creating pages."""
        # Arrange
        space_key = "PROJ"
        title = "New OAuth Test Page"
        body = "<p>Test content for OAuth</p>"
        parent_id = "987654321"

        # Mock the v2 adapter
        with patch(
            "mcp_atlassian.confluence.pages.ConfluenceV2Adapter"
        ) as mock_v2_adapter_class:
            mock_v2_adapter = MagicMock()
            mock_v2_adapter_class.return_value = mock_v2_adapter
            mock_v2_adapter.create_page.return_value = {
                "id": "oauth_123456789",
                "title": title,
            }

            # Mock get_page_content to return a ConfluencePage
            with patch.object(
                oauth_pages_mixin,
                "get_page_content",
                return_value=ConfluencePage(
                    id="oauth_123456789",
                    title=title,
                    content="OAuth page content",
                    space={"key": space_key, "name": "Project"},
                ),
            ):
                # Act - specify is_markdown=False since we're directly providing storage format
                result = oauth_pages_mixin.create_page(
                    space_key, title, body, parent_id, is_markdown=False
                )

                # Assert that v2 API was used instead of v1
                mock_v2_adapter.create_page.assert_called_once_with(
                    space_key=space_key,
                    title=title,
                    body=body,
                    parent_id=parent_id,
                    representation="storage",
                )

                # Verify v1 API was NOT called
                oauth_pages_mixin.confluence.create_page.assert_not_called()

                # Verify result is a ConfluencePage
                assert isinstance(result, ConfluencePage)
                assert result.id == "oauth_123456789"

    def test_create_page_oauth_with_wiki_format(self, oauth_pages_mixin):
        """Test that OAuth authentication uses v2 API for creating pages with wiki format."""
        # Arrange
        space_key = "PROJ"
        title = "OAuth Wiki Test Page"
        wiki_body = "h1. OAuth Wiki Test\n\n* Item 1\n* Item 2"

        # Mock the v2 adapter
        with patch(
            "mcp_atlassian.confluence.pages.ConfluenceV2Adapter"
        ) as mock_v2_adapter_class:
            mock_v2_adapter = MagicMock()
            mock_v2_adapter_class.return_value = mock_v2_adapter
            mock_v2_adapter.create_page.return_value = {
                "id": "oauth_wiki_123",
                "title": title,
            }

            # Mock get_page_content to return a ConfluencePage
            with patch.object(
                oauth_pages_mixin,
                "get_page_content",
                return_value=ConfluencePage(
                    id="oauth_wiki_123",
                    title=title,
                    content="OAuth wiki page content",
                    space={"key": space_key, "name": "Project"},
                ),
            ):
                # Act - use wiki format
                result = oauth_pages_mixin.create_page(
                    space_key,
                    title,
                    wiki_body,
                    is_markdown=False,
                    content_representation="wiki",
                )

                # Assert that v2 API was used with wiki representation
                mock_v2_adapter.create_page.assert_called_once_with(
                    space_key=space_key,
                    title=title,
                    body=wiki_body,
                    parent_id=None,
                    representation="wiki",
                )

                # Verify v1 API was NOT called
                oauth_pages_mixin.confluence.create_page.assert_not_called()

                # Verify no markdown conversion happened
                oauth_pages_mixin.preprocessor.markdown_to_confluence_storage.assert_not_called()

                # Verify result is a ConfluencePage
                assert isinstance(result, ConfluencePage)
                assert result.id == "oauth_wiki_123"
                assert result.title == title

    def test_update_page_oauth_uses_v2_api(self, oauth_pages_mixin):
        """Test that OAuth authentication uses v2 API for updating pages."""
        # Arrange
        page_id = "oauth_987654321"
        title = "Updated OAuth Test Page"
        body = "<p>Updated test content for OAuth</p>"
        version_comment = "OAuth update test"

        # Mock the v2 adapter
        with patch(
            "mcp_atlassian.confluence.pages.ConfluenceV2Adapter"
        ) as mock_v2_adapter_class:
            mock_v2_adapter = MagicMock()
            mock_v2_adapter_class.return_value = mock_v2_adapter
            mock_v2_adapter.update_page.return_value = {
                "id": page_id,
                "title": title,
            }

            # Mock get_page_content to return a ConfluencePage
            with patch.object(
                oauth_pages_mixin,
                "get_page_content",
                return_value=ConfluencePage(
                    id=page_id,
                    title=title,
                    content="Updated OAuth page content",
                    version={"number": 2},
                ),
            ):
                # Act - specify is_markdown=False since we're directly providing storage format
                result = oauth_pages_mixin.update_page(
                    page_id,
                    title,
                    body,
                    is_markdown=False,
                    version_comment=version_comment,
                )

                # Assert that v2 API was used instead of v1
                mock_v2_adapter.update_page.assert_called_once_with(
                    page_id=page_id,
                    title=title,
                    body=body,
                    representation="storage",
                    version_comment=version_comment,
                )

                # Verify v1 API was NOT called
                oauth_pages_mixin.confluence.update_page.assert_not_called()

                # Verify result is a ConfluencePage
                assert isinstance(result, ConfluencePage)
                assert result.id == page_id
                assert result.title == title

    def test_get_page_content_oauth_uses_v2_api(self, oauth_pages_mixin):
        """Test that OAuth authentication uses v2 API for getting page content."""
        # Arrange
        page_id = "oauth_get_123"

        # Mock the v2 adapter
        with patch(
            "mcp_atlassian.confluence.pages.ConfluenceV2Adapter"
        ) as mock_v2_adapter_class:
            mock_v2_adapter = MagicMock()
            mock_v2_adapter_class.return_value = mock_v2_adapter

            # Mock v2 API response
            mock_v2_adapter.get_page.return_value = {
                "id": page_id,
                "title": "OAuth Test Page",
                "body": {"storage": {"value": "<p>OAuth page content</p>"}},
                "space": {"key": "PROJ", "name": "Project"},
                "version": {"number": 3},
            }

            # Mock the preprocessor
            oauth_pages_mixin.preprocessor.process_html_content.return_value = (
                "<p>Processed HTML</p>",
                "Processed OAuth content",
            )

            # Act
            result = oauth_pages_mixin.get_page_content(
                page_id, convert_to_markdown=True
            )

            # Assert that v2 API was used instead of v1
            mock_v2_adapter.get_page.assert_called_once_with(
                page_id=page_id, expand="body.storage,version,space,children.attachment"
            )

            # Verify v1 API was NOT called
            oauth_pages_mixin.confluence.get_page_by_id.assert_not_called()

            # Verify the preprocessor was called
            oauth_pages_mixin.preprocessor.process_html_content.assert_called_once_with(
                "<p>OAuth page content</p>",
                space_key="PROJ",
                confluence_client=oauth_pages_mixin.confluence,
            )

            # Verify result is a ConfluencePage with correct data
            assert isinstance(result, ConfluencePage)
            assert result.id == page_id
            assert result.title == "OAuth Test Page"
            assert result.content == "Processed OAuth content"
            assert result.space.key == "PROJ"
            assert result.version.number == 3

    def test_delete_page_oauth_uses_v2_api(self, oauth_pages_mixin):
        """Test that OAuth authentication uses v2 API for deleting pages."""
        # Arrange
        page_id = "oauth_delete_123"

        # Mock the v2 adapter
        with patch(
            "mcp_atlassian.confluence.pages.ConfluenceV2Adapter"
        ) as mock_v2_adapter_class:
            mock_v2_adapter = MagicMock()
            mock_v2_adapter_class.return_value = mock_v2_adapter
            mock_v2_adapter.delete_page.return_value = True

            # Act
            result = oauth_pages_mixin.delete_page(page_id)

            # Assert that v2 API was used instead of v1
            mock_v2_adapter.delete_page.assert_called_once_with(page_id=page_id)

            # Verify v1 API was NOT called
            oauth_pages_mixin.confluence.remove_page.assert_not_called()

            # Verify result
            assert result is True

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/epics.py:
--------------------------------------------------------------------------------

```python
"""Module for Jira epic operations."""

import logging
from typing import Any

from ..models.jira import JiraIssue
from .client import JiraClient
from .protocols import (
    FieldsOperationsProto,
    IssueOperationsProto,
    SearchOperationsProto,
    UsersOperationsProto,
)

logger = logging.getLogger("mcp-jira")


class EpicsMixin(
    JiraClient,
    FieldsOperationsProto,
    IssueOperationsProto,
    SearchOperationsProto,
    UsersOperationsProto,
):
    """Mixin for Jira epic operations."""

    def _try_discover_fields_from_existing_epic(
        self, field_ids: dict[str, str]
    ) -> None:
        """
        Try to discover Epic fields from existing epics in the Jira instance.

        This is a fallback method used when standard field discovery doesn't find
        all the necessary Epic-related fields. It searches for an existing Epic and
        analyzes its field structure to identify Epic fields dynamically.

        Args:
            field_ids: Dictionary of field IDs to update with discovered fields
        """
        try:
            # If we already have both epic fields, no need to search
            if all(field in field_ids for field in ["epic_name", "epic_link"]):
                return

            # Find an Epic in the system
            epics_jql = "issuetype = Epic ORDER BY created DESC"
            results = self.jira.jql(epics_jql, fields="*all", limit=1)
            if not isinstance(results, dict):
                msg = f"Unexpected return value type from `jira.jql`: {type(results)}"
                logger.error(msg)
                raise TypeError(msg)

            # If no epics found, we can't use this method
            if not results or not results.get("issues"):
                logger.warning("No existing Epics found to analyze field structure")
                return

            # Get the most recent Epic
            epic = results["issues"][0]
            fields = epic.get("fields", {})
            logger.debug(f"Found existing Epic {epic.get('key')} to analyze")

            # Look for Epic Name and other Epic fields
            for field_id, value in fields.items():
                if not field_id.startswith("customfield_"):
                    continue

                # Analyze the field value to determine what it might be
                if isinstance(value, str) and field_id not in field_ids.values():
                    # Epic Name is typically a string value
                    if "epic_name" not in field_ids and 3 <= len(value) <= 255:
                        field_ids["epic_name"] = field_id
                        logger.debug(
                            f"Discovered possible Epic Name field from existing Epic: {field_id}"
                        )

                # Look for color-related values (typically a string like "green", "blue", etc.)
                elif isinstance(value, str) and value.lower() in [
                    "green",
                    "blue",
                    "red",
                    "yellow",
                    "orange",
                    "purple",
                ]:
                    if "epic_color" not in field_ids:
                        field_ids["epic_color"] = field_id
                        logger.debug(
                            f"Discovered possible Epic Color field from existing Epic: {field_id}"
                        )
                # Look for fields that might be used for Epic Link
                elif value is not None and "epic_link" not in field_ids:
                    # Epic Link typically references another issue key or ID
                    try:
                        # Sometimes the value itself is a string containing a key
                        if isinstance(value, str) and "-" in value and len(value) < 20:
                            field_ids["epic_link"] = field_id
                            logger.debug(
                                f"Discovered possible Epic Link field from existing Epic: {field_id}"
                            )
                    except Exception as e:
                        logger.debug(
                            f"Error analyzing potential Epic Link field: {str(e)}"
                        )

            logger.debug(
                f"Updated field IDs after analyzing existing Epic: {field_ids}"
            )

        except Exception as e:
            logger.error(f"Error discovering fields from existing Epic: {str(e)}")

    def prepare_epic_fields(
        self,
        fields: dict[str, Any],
        summary: str,
        kwargs: dict[str, Any],
        project_key: str | None = None,
    ) -> None:
        """
        Prepare epic-specific fields for issue creation.

        Args:
            fields: The fields dictionary to update
            summary: The issue summary that can be used as a default epic name
            kwargs: Additional fields from the user
            project_key: Optional project key for checking field requirements
        """
        try:
            # Get all field IDs
            field_ids = self.get_field_ids_to_epic()
            logger.info(f"Discovered Jira field IDs for Epic creation: {field_ids}")

            # Get required fields for Epic issue type if project_key provided
            required_fields = {}
            if project_key:
                try:
                    required_fields = self.get_required_fields("Epic", project_key)
                    logger.debug(
                        f"Required fields for Epic in project {project_key}: {list(required_fields.keys())}"
                    )
                except Exception as e:
                    logger.warning(f"Could not check field requirements: {e}")

            # Extract and handle epic_name
            epic_name_field = self._get_epic_name_field_id(field_ids)
            if epic_name_field:
                # Get epic name value
                epic_name = kwargs.pop(
                    "epic_name", kwargs.pop("epicName", summary)
                )  # Use summary as default if epic_name not provided

                # Check if this field is required
                if epic_name_field in required_fields:
                    # Add to fields for initial creation
                    fields[epic_name_field] = epic_name
                    logger.info(
                        f"Adding required Epic Name ({epic_name_field}: {epic_name}) to creation fields"
                    )
                else:
                    # Store for post-creation update as before
                    kwargs["__epic_name_field"] = epic_name_field
                    kwargs["__epic_name_value"] = epic_name
                    logger.info(
                        f"Storing optional Epic Name ({epic_name_field}: {epic_name}) for post-creation update"
                    )

            # Extract and handle epic_color
            epic_color_field = self._get_epic_color_field_id(field_ids)
            if epic_color_field:
                epic_color = (
                    kwargs.pop("epic_color", None)
                    or kwargs.pop("epicColor", None)
                    or kwargs.pop("epic_colour", None)
                    or "green"  # Default color
                )

                # Check if this field is required
                if epic_color_field in required_fields:
                    # Add to fields for initial creation
                    fields[epic_color_field] = epic_color
                    logger.info(
                        f"Adding required Epic Color ({epic_color_field}: {epic_color}) to creation fields"
                    )
                else:
                    # Store for post-creation update
                    kwargs["__epic_color_field"] = epic_color_field
                    kwargs["__epic_color_value"] = epic_color
                    logger.info(
                        f"Storing optional Epic Color ({epic_color_field}: {epic_color}) for post-creation update"
                    )

            # Handle any other epic-related fields
            for key, value in list(kwargs.items()):
                if key.startswith("epic_") and key in field_ids:
                    field_key = key.replace("epic_", "")
                    field_id = field_ids[key]

                    # Check if this field is required
                    if field_id in required_fields:
                        # Add to fields for initial creation
                        fields[field_id] = value
                        logger.info(
                            f"Adding required Epic field ({field_id} from {key}: {value}) to creation fields"
                        )
                    else:
                        # Store for post-creation update
                        kwargs[f"__epic_{field_key}_field"] = field_id
                        kwargs[f"__epic_{field_key}_value"] = value
                        logger.info(
                            f"Storing optional Epic field ({field_id} from {key}: {value}) for post-creation update"
                        )
                    kwargs.pop(key)  # Remove from kwargs to avoid duplicate processing

            # Warn if epic_name field is required but wasn't discovered
            if not epic_name_field:
                logger.warning(
                    "Could not find Epic Name field ID. Epic creation may fail. "
                    "Consider setting it explicitly in your kwargs."
                )

        except Exception as e:
            logger.error(f"Error preparing Epic-specific fields: {str(e)}")

    def _get_epic_name_field_id(self, field_ids: dict[str, str]) -> str | None:
        """
        Get the field ID for Epic Name, using multiple strategies.

        Args:
            field_ids: Dictionary of field IDs

        Returns:
            The field ID for Epic Name if found, None otherwise
        """
        # Strategy 1: Check if already discovered by get_field_ids_to_epic
        if "epic_name" in field_ids:
            return field_ids["epic_name"]
        if "Epic Name" in field_ids:
            return field_ids["Epic Name"]

        # Strategy 2: Check common field IDs used across different Jira instances
        common_ids = ["customfield_10011", "customfield_10005", "customfield_10004"]
        for field_id in common_ids:
            if field_id in field_ids.values():
                logger.debug(f"Using common Epic Name field ID: {field_id}")
                return field_id

        # Strategy 3: Try to find by dynamic name pattern in available fields
        for key, value in field_ids.items():
            if (
                "epic" in key.lower() and "name" in key.lower()
            ) or "epicname" in key.lower():
                logger.debug(f"Found potential Epic Name field by pattern: {value}")
                return value

        logger.debug("Could not determine Epic Name field ID")
        return None

    def _get_epic_color_field_id(self, field_ids: dict[str, str]) -> str | None:
        """
        Get the field ID for Epic Color, using multiple strategies.

        Args:
            field_ids: Dictionary of field IDs

        Returns:
            The field ID for Epic Color if found, None otherwise
        """
        # Strategy 1: Check if already discovered by get_field_ids_to_epic
        if "epic_color" in field_ids:
            return field_ids["epic_color"]
        if "epic_colour" in field_ids:
            return field_ids["epic_colour"]

        # Strategy 2: Check common field IDs
        common_ids = ["customfield_10012", "customfield_10013"]
        for field_id in common_ids:
            if field_id in field_ids.values():
                logger.debug(f"Using common Epic Color field ID: {field_id}")
                return field_id

        # Strategy 3: Find by dynamic name pattern
        for key, value in field_ids.items():
            if "epic" in key.lower() and (
                "color" in key.lower() or "colour" in key.lower()
            ):
                logger.debug(f"Found potential Epic Color field by pattern: {value}")
                return value

        logger.debug("Could not determine Epic Color field ID")
        return None

    def link_issue_to_epic(self, issue_key: str, epic_key: str) -> JiraIssue:
        """
        Link an existing issue to an epic.

        Args:
            issue_key: The key of the issue to link (e.g. 'PROJ-123')
            epic_key: The key of the epic to link to (e.g. 'PROJ-456')

        Returns:
            JiraIssue: The updated issue

        Raises:
            ValueError: If the epic_key is not an actual epic
            Exception: If there is an error linking the issue to the epic
        """
        try:
            # Verify that both issue and epic exist
            issue = self.jira.get_issue(issue_key)
            epic = self.jira.get_issue(epic_key)
            if not isinstance(issue, dict):
                msg = (
                    f"Unexpected return value type from `jira.get_issue`: {type(issue)}"
                )
                logger.error(msg)
                raise TypeError(msg)
            if not isinstance(epic, dict):
                msg = (
                    f"Unexpected return value type from `jira.get_issue`: {type(epic)}"
                )
                logger.error(msg)
                raise TypeError(msg)

            # Check if the epic key corresponds to an actual epic
            fields = epic.get("fields", {})
            issue_type = fields.get("issuetype", {}).get("name", "").lower()

            if issue_type != "epic":
                error_msg = f"Error linking issue to epic: {epic_key} is not an Epic"
                raise ValueError(error_msg)

            # Get the dynamic field IDs for this Jira instance
            field_ids = self.get_field_ids_to_epic()

            # Try the parent field first; it is natively supported on many Jira
            # instances even when field discovery does not report it
            try:
                fields = {"parent": {"key": epic_key}}
                self.jira.update_issue(
                    issue_key=issue_key, update={"fields": fields}
                )
                logger.info(
                    f"Successfully linked {issue_key} to {epic_key} using parent field"
                )
                return self.get_issue(issue_key)
            except Exception as e:
                logger.info(
                    f"Couldn't link using parent field: {str(e)}. Trying discovered fields..."
                )

            # Try using the discovered Epic Link field
            if "epic_link" in field_ids:
                try:
                    epic_link_fields: dict[str, str] = {
                        field_ids["epic_link"]: epic_key
                    }
                    self.jira.update_issue(
                        issue_key=issue_key, update={"fields": epic_link_fields}
                    )
                    logger.info(
                        f"Successfully linked {issue_key} to {epic_key} using discovered epic_link field: {field_ids['epic_link']}"
                    )
                    return self.get_issue(issue_key)
                except Exception as e:
                    logger.info(
                        f"Couldn't link using discovered epic_link field: {str(e)}. Trying fallback methods..."
                    )

            # Fallback to common custom fields if dynamic discovery didn't work
            custom_field_attempts: list[dict[str, str]] = [
                {"customfield_10014": epic_key},  # Common in Jira Cloud
                {"customfield_10008": epic_key},  # Common in Jira Server
                {"customfield_10000": epic_key},  # Also common
                {"customfield_11703": epic_key},  # Known from previous error
                {"epic_link": epic_key},  # Sometimes used
            ]

            for fields in custom_field_attempts:
                try:
                    self.jira.update_issue(
                        issue_key=issue_key, update={"fields": fields}
                    )
                    field_id = list(fields.keys())[0]
                    logger.info(
                        f"Successfully linked {issue_key} to {epic_key} using field: {field_id}"
                    )

                    # If we get here, it worked - update our cached field ID
                    if self._field_ids_cache is None:
                        self._field_ids_cache = []
                    self._field_ids_cache.append({"id": field_id, "name": "epic_link"})
                    return self.get_issue(issue_key)
                except Exception as e:
                    logger.info(f"Couldn't link using fields {fields}: {str(e)}")
                    continue

            # Final fallback: create a generic issue link (e.g. "Relates to")
            # between the issue and the epic
            try:
                logger.info(
                    f"Attempting to create issue link between {issue_key} and {epic_key}"
                )
                link_data = {
                    "type": {"name": "Relates to"},
                    "inwardIssue": {"key": issue_key},
                    "outwardIssue": {"key": epic_key},
                }
                self.jira.create_issue_link(link_data)
                logger.info(
                    f"Created relationship link between {issue_key} and {epic_key}"
                )
                return self.get_issue(issue_key)
            except Exception as link_error:
                logger.error(f"Error creating issue link: {str(link_error)}")

            # If we get here, none of our attempts worked
            raise ValueError(
                f"Could not link issue {issue_key} to epic {epic_key}. Your Jira instance might use a different field for epic links."
            )

        except ValueError:
            # Re-raise ValueError as is
            raise
        except Exception as e:
            logger.error(f"Error linking {issue_key} to epic {epic_key}: {str(e)}")
            # Ensure exception messages follow the expected format for tests
            if "API error" in str(e):
                raise Exception(f"Error linking issue to epic: {str(e)}")
            raise

    def get_epic_issues(
        self, epic_key: str, start: int = 0, limit: int = 50
    ) -> list[JiraIssue]:
        """
        Get all issues linked to a specific epic.

        Args:
            epic_key: The key of the epic (e.g. 'PROJ-123')
            start: Starting index for pagination
            limit: Maximum number of issues to return

        Returns:
            List of JiraIssue models representing the issues linked to the epic

        Raises:
            ValueError: If the issue is not an Epic
            Exception: If there is an error getting epic issues
        """
        try:
            # First, check if the issue is an Epic
            epic = self.jira.get_issue(epic_key)
            if not isinstance(epic, dict):
                msg = (
                    f"Unexpected return value type from `jira.get_issue`: {type(epic)}"
                )
                logger.error(msg)
                raise TypeError(msg)
            fields_data = epic.get("fields", {})

            # Check if the issue is an Epic
            issuetype_data = fields_data.get("issuetype", {})
            issue_type_name = issuetype_data.get("name", "")

            # Check if it's an Epic: look for "epic" in the type name
            # (case-insensitive), falling back to a short list of known
            # localized names such as "에픽" and "エピック"
            if "epic" not in issue_type_name.lower() and issue_type_name not in [
                "에픽",
                "エピック",
            ]:
                # Try to verify via JQL as a fallback
                is_epic = False
                try:
                    verify_jql = f'key = "{epic_key}" AND issuetype = Epic'
                    verify_result = self.search_issues(verify_jql, limit=1)
                    if verify_result and len(verify_result.issues) > 0:
                        is_epic = True
                except Exception as e:
                    # If JQL fails, stick with our previous determination
                    logger.debug(f"JQL verification failed: {e}")

                if not is_epic:
                    error_msg = (
                        f"Issue {epic_key} is not an Epic, it is a "
                        f"{issue_type_name or 'unknown type'}"
                    )
                    raise ValueError(error_msg)

            # Track which methods we've tried
            tried_methods = set()
            issues = []

            # Find the Epic Link field
            field_ids = self.get_field_ids_to_epic()
            epic_link_field = self._find_epic_link_field(field_ids)

            # METHOD 1: Try with 'issueFunction in issuesScopedToEpic' - this works in many Jira instances
            if "issueFunction" not in tried_methods:
                tried_methods.add("issueFunction")
                try:
                    jql = f'issueFunction in issuesScopedToEpic("{epic_key}")'
                    logger.info(f"Trying to get epic issues with issueFunction: {jql}")

                    search_result = self.search_issues(jql, start=start, limit=limit)
                    if search_result:
                        logger.info(
                            f"Successfully found {len(search_result.issues)} issues for epic {epic_key} using issueFunction"
                        )
                        return search_result.issues
                except Exception as e:
                    # Log exception but continue with fallback
                    logger.warning(
                        f"Error searching epic issues with issueFunction: {str(e)}"
                    )

            # METHOD 2: Try using parent relationship - common in many Jira setups
            if "parent" not in tried_methods:
                tried_methods.add("parent")
                try:
                    jql = f'parent = "{epic_key}"'
                    logger.info(
                        f"Trying to get epic issues with parent relationship: {jql}"
                    )
                    issues = self._get_epic_issues_by_jql(epic_key, jql, start, limit)
                    if issues:
                        logger.info(
                            f"Successfully found {len(issues)} issues for epic {epic_key} using parent relationship"
                        )
                        return issues
                except Exception as parent_error:
                    logger.warning(
                        f"Error with parent relationship approach: {str(parent_error)}"
                    )

            # METHOD 3: If we found an Epic Link field, try using it
            if epic_link_field and "epicLinkField" not in tried_methods:
                tried_methods.add("epicLinkField")
                try:
                    jql = f'"{epic_link_field}" = "{epic_key}"'
                    logger.info(
                        f"Trying to get epic issues with epic link field: {jql}"
                    )
                    issues = self._get_epic_issues_by_jql(epic_key, jql, start, limit)
                    if issues:
                        logger.info(
                            f"Successfully found {len(issues)} issues for epic {epic_key} using epic link field {epic_link_field}"
                        )
                        return issues
                except Exception as e:
                    logger.warning(
                        f"Error using epic link field {epic_link_field}: {str(e)}"
                    )

            # METHOD 4: Try using 'Epic Link' as a textual field name
            if "epicLinkName" not in tried_methods:
                tried_methods.add("epicLinkName")
                try:
                    jql = f'"Epic Link" = "{epic_key}"'
                    logger.info(
                        f"Trying to get epic issues with 'Epic Link' field name: {jql}"
                    )
                    issues = self._get_epic_issues_by_jql(epic_key, jql, start, limit)
                    if issues:
                        logger.info(
                            f"Successfully found {len(issues)} issues for epic {epic_key} using 'Epic Link' field name"
                        )
                        return issues
                except Exception as e:
                    logger.warning(f"Error using 'Epic Link' field name: {str(e)}")

            # METHOD 5: Try using issue links with a specific link type
            if "issueLinks" not in tried_methods:
                tried_methods.add("issueLinks")
                try:
                    # Try to find issues linked to this epic with standard link types
                    link_types = ["relates to", "blocks", "is blocked by", "is part of"]
                    for link_type in link_types:
                        jql = f'issueLink = "{link_type}" and issueLink = "{epic_key}"'
                        logger.info(
                            f"Trying to get epic issues with issue links: {jql}"
                        )
                        try:
                            issues = self._get_epic_issues_by_jql(
                                epic_key, jql, start, limit
                            )
                            if issues:
                                logger.info(
                                    f"Successfully found {len(issues)} issues for epic {epic_key} using issue links with type '{link_type}'"
                                )
                                return issues
                        except Exception:
                            # Just try the next link type
                            continue
                except Exception as e:
                    logger.warning(f"Error using issue links approach: {str(e)}")

            # METHOD 6: Last resort - try each common Epic Link field ID directly
            if "commonFields" not in tried_methods:
                tried_methods.add("commonFields")
                common_epic_fields = [
                    "customfield_10014",
                    "customfield_10008",
                    "customfield_10100",
                    "customfield_10001",
                    "customfield_10002",
                    "customfield_10003",
                    "customfield_10004",
                    "customfield_10005",
                    "customfield_10006",
                    "customfield_10007",
                    "customfield_11703",
                ]

                for field_id in common_epic_fields:
                    try:
                        jql = f'"{field_id}" = "{epic_key}"'
                        logger.info(
                            f"Trying to get epic issues with common field ID: {jql}"
                        )
                        issues = self._get_epic_issues_by_jql(
                            epic_key, jql, start, limit
                        )
                        if issues:
                            logger.info(
                                f"Successfully found {len(issues)} issues for epic {epic_key} using field ID {field_id}"
                            )
                            # Cache this successful field ID for future use
                            if self._field_ids_cache is None:
                                self._field_ids_cache = []
                            self._field_ids_cache.append(
                                {"id": field_id, "name": "epic_link"}
                            )
                            return issues
                    except Exception:
                        # Just try the next field ID
                        continue

            # If we've tried everything and found no issues, return an empty list
            logger.warning(
                f"No issues found for epic {epic_key} after trying multiple approaches"
            )
            return []

        except ValueError:
            # Re-raise ValueError (like "not an Epic") as is
            raise
        except Exception as e:
            # Wrap other exceptions
            logger.error(f"Error getting issues for epic {epic_key}: {str(e)}")
            raise Exception(f"Error getting epic issues: {str(e)}") from e

    def _find_epic_link_field(self, field_ids: dict[str, str]) -> str | None:
        """
        Find the Epic Link field with fallback mechanisms.

        Args:
            field_ids: Dictionary of field IDs

        Returns:
            The field ID for Epic Link if found, None otherwise
        """
        # First try common name variants of the Epic Link field
        for name in ["epic_link", "epiclink", "Epic Link", "epic link", "EPIC LINK"]:
            if name in field_ids:
                logger.info(
                    f"Found Epic Link field by name: {name} -> {field_ids[name]}"
                )
                return field_ids[name]

        # Next, look for any field ID that contains "epic" and "link"
        # (case-insensitive) in the name
        for field_name, field_id in field_ids.items():
            if (
                isinstance(field_name, str)
                and "epic" in field_name.lower()
                and "link" in field_name.lower()
            ):
                logger.info(
                    f"Found potential Epic Link field: {field_name} -> {field_id}"
                )
                return field_id

        # Look for any customfield that might be an epic link
        # Common epic link field IDs across different Jira instances
        known_epic_fields = [
            "customfield_10014",  # Common in Jira Cloud
            "customfield_10008",  # Common in Jira Server
            "customfield_10100",
            "customfield_10001",
            "customfield_10002",
            "customfield_10003",
            "customfield_10004",
            "customfield_10005",
            "customfield_10006",
            "customfield_10007",
            "customfield_11703",  # Added based on error message
        ]

        # Check if any of these known fields exist in our field IDs values
        for field_id in known_epic_fields:
            if field_id in field_ids.values():
                logger.info(f"Using known epic link field ID: {field_id}")
                return field_id

        # Try with common system names for epic link field
        system_names = ["system.epic-link", "com.pyxis.greenhopper.jira:gh-epic-link"]
        for name in system_names:
            if name in field_ids:
                logger.info(
                    f"Found Epic Link field by system name: {name} -> {field_ids[name]}"
                )
                return field_ids[name]

        # If we still can't find it, try to detect it from issue links
        try:
            # Try to find an existing epic
            epics = self._find_sample_epic()
            if epics:
                epic_key = epics[0].get("key")
                if not isinstance(epic_key, str):
                    msg = f"Unexpected return value type from `_find_sample_epic`: {type(epic_key)}"
                    logger.error(msg)
                    raise TypeError(msg)

                # Try to find issues linked to this epic
                issues = self._find_issues_linked_to_epic(epic_key)
                for issue in issues:
                    # Check fields for any that contain the epic key
                    fields = issue.get("fields", {})
                    for field_id, value in fields.items():
                        if (
                            field_id.startswith("customfield_")
                            and isinstance(value, str)
                            and value == epic_key
                        ):
                            logger.info(
                                f"Detected epic link field {field_id} from linked issue"
                            )
                            return field_id
        except Exception as e:
            logger.warning(f"Error detecting epic link field from issues: {str(e)}")

        # As a last resort, look for any field whose ID starts with "customfield_"
        # and has "epic" in its schema type, name, or description
        try:
            all_fields = self.jira.get_all_fields()
            if not isinstance(all_fields, list):
                msg = f"Unexpected return value type from `jira.get_all_fields`: {type(all_fields)}"
                logger.error(msg)
                raise TypeError(msg)

            for field in all_fields:
                field_id = field.get("id", "")
                schema = field.get("schema", {})
                custom_type = schema.get("custom", "")
                if field_id.startswith("customfield_") and (
                    "epic" in custom_type.lower()
                    or "epic" in field.get("name", "").lower()
                    or "epic" in field.get("description", "").lower()
                ):
                    logger.info(
                        f"Found potential Epic Link field by schema inspection: {field_id}"
                    )
                    return field_id
        except Exception as e:
            logger.warning(
                f"Error during schema inspection for Epic Link field: {str(e)}"
            )

        # No Epic Link field found
        logger.warning("Could not determine Epic Link field with any method")
        return None

    def _find_sample_epic(self) -> list[dict]:
        """
        Find a sample epic to use for detecting the epic link field.

        Returns:
            List of epics found
        """
        try:
            # Search for issues with type=Epic
            jql = "issuetype = Epic ORDER BY updated DESC"
            response = self.jira.jql(jql, limit=1)
            if not isinstance(response, dict):
                msg = f"Unexpected return value type from `jira.jql`: {type(response)}"
                logger.error(msg)
                raise TypeError(msg)

            if response and "issues" in response and response["issues"]:
                return response["issues"]
        except Exception as e:
            logger.warning(f"Error finding sample epic: {str(e)}")
        return []

    def _find_issues_linked_to_epic(self, epic_key: str) -> list[dict]:
        """
        Find issues linked to a specific epic.

        Args:
            epic_key: The key of the epic

        Returns:
            List of issues found
        """
        try:
            # Try several JQL formats to find linked issues
            for query in [
                f"'Epic Link' = {epic_key}",
                f"'Epic' = {epic_key}",
                f"parent = {epic_key}",
                f"issueFunction in issuesScopedToEpic('{epic_key}')",
            ]:
                try:
                    response = self.jira.jql(query, limit=5)
                    if not isinstance(response, dict):
                        msg = f"Unexpected return value type from `jira.jql`: {type(response)}"
                        logger.error(msg)
                        raise TypeError(msg)
                    if response.get("issues"):
                        return response["issues"]
                except Exception:
                    # Try next query format
                    continue
        except Exception as e:
            logger.warning(f"Error finding issues linked to epic {epic_key}: {str(e)}")
        return []

    def _get_epic_issues_by_jql(
        self, epic_key: str, jql: str, start: int, limit: int
    ) -> list[JiraIssue]:
        """
        Helper method to get issues using a JQL query.

        Args:
            epic_key: The key of the epic
            jql: JQL query to execute
            start: Starting index for pagination
            limit: Maximum number of issues to return

        Returns:
            List of JiraIssue models
        """

        search_result = self.search_issues(jql, start=start, limit=limit)
        if not search_result:
            logger.warning(f"No issues found for epic {epic_key} with query: {jql}")
            return []
        return search_result.issues

    def update_epic_fields(self, issue_key: str, kwargs: dict[str, Any]) -> JiraIssue:
        """
        Update Epic-specific fields after Epic creation.

        This method implements the second step of the two-step Epic creation process,
        applying Epic-specific fields that may be rejected during initial creation
        due to screen configuration restrictions.

        Args:
            issue_key: The key of the created Epic
            kwargs: Dictionary containing special keys with Epic field information

        Returns:
            JiraIssue: The updated Epic

        Raises:
            Exception: If there is an error updating the Epic fields
        """
        try:
            # Extract Epic fields from kwargs
            update_fields = {}

            # Process Epic Name field
            if "__epic_name_field" in kwargs and "__epic_name_value" in kwargs:
                epic_name_field = kwargs.pop("__epic_name_field")
                epic_name_value = kwargs.pop("__epic_name_value")
                update_fields[epic_name_field] = epic_name_value
                logger.info(
                    f"Updating Epic Name field ({epic_name_field}): {epic_name_value}"
                )

            # Process Epic Color field
            if "__epic_color_field" in kwargs and "__epic_color_value" in kwargs:
                epic_color_field = kwargs.pop("__epic_color_field")
                epic_color_value = kwargs.pop("__epic_color_value")
                update_fields[epic_color_field] = epic_color_value
                logger.info(
                    f"Updating Epic Color field ({epic_color_field}): {epic_color_value}"
                )

            # Process any other stored Epic fields
            epic_field_keys = [
                k for k in kwargs if k.startswith("__epic_") and k.endswith("_field")
            ]
            for field_key in epic_field_keys:
                # Get corresponding value key
                value_key = field_key.replace("_field", "_value")
                if value_key in kwargs:
                    field_id = kwargs.pop(field_key)
                    field_value = kwargs.pop(value_key)
                    update_fields[field_id] = field_value
                    logger.info(f"Updating Epic field ({field_id}): {field_value}")

            # If we have fields to update, make the API call
            if update_fields:
                logger.info(f"Updating Epic {issue_key} with fields: {update_fields}")
                try:
                    # First try using the generic update_issue method
                    self.jira.update_issue(issue_key, update={"fields": update_fields})
                    logger.info(
                        f"Successfully updated Epic {issue_key} with Epic-specific fields"
                    )
                except Exception as update_error:
                    # Log the error but don't raise yet - try alternative approaches
                    logger.warning(
                        f"Error updating Epic with primary method: {str(update_error)}"
                    )

                    # Try updating fields one by one as fallback
                    success = False
                    for field_id, field_value in update_fields.items():
                        try:
                            self.jira.update_issue(
                                issue_key, update={"fields": {field_id: field_value}}
                            )
                            logger.info(
                                f"Successfully updated Epic field {field_id} with value {field_value}"
                            )
                            success = True
                        except Exception as field_error:
                            logger.warning(
                                f"Failed to update Epic field {field_id}: {str(field_error)}"
                            )

                    # If even individual updates failed, log but continue
                    if not success:
                        logger.error(
                            f"Failed to update Epic {issue_key} with Epic-specific fields. "
                            f"The Epic was created but may be missing required attributes."
                        )

            # Return the updated Epic
            return self.get_issue(issue_key)

        except Exception as e:
            logger.error(f"Error in update_epic_fields: {str(e)}")
            # Return the Epic even if the update failed
            return self.get_issue(issue_key)

```
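
A minimal usage sketch of the Epic helpers above. This is illustrative only: it assumes `client` is an already-configured instance of the class these methods belong to (the class definition is outside this excerpt), and the issue keys and custom field IDs are placeholders.

```python
# Hypothetical usage; `client` and the issue keys are assumptions, not part of the source above.
issues = client.get_epic_issues("PROJ-1", start=0, limit=25)
for issue in issues:
    print(issue.key)  # JiraIssue models are returned by the search helpers

# Link an existing issue to the epic; raises ValueError if PROJ-1 is not an Epic.
client.link_issue_to_epic("PROJ-2", "PROJ-1")

# Second step of the two-step Epic creation flow: apply deferred Epic-specific
# fields. The custom field ID below is illustrative only.
client.update_epic_fields(
    "PROJ-1",
    {"__epic_name_field": "customfield_10011", "__epic_name_value": "Checkout revamp"},
)
```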