#
tokens: 47383/50000 9/194 files (page 6/10)
lines: off (toggle) GitHub
raw markdown copy
This is page 6 of 10. Use http://codebase.md/sooperset/mcp-atlassian?page={x} to view the full context.

# Directory Structure

```
├── .devcontainer
│   ├── devcontainer.json
│   ├── Dockerfile
│   ├── post-create.sh
│   └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   └── feature_request.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── docker-publish.yml
│       ├── lint.yml
│       ├── publish.yml
│       ├── stale.yml
│       └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│   ├── oauth_authorize.py
│   └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│   └── mcp_atlassian
│       ├── __init__.py
│       ├── confluence
│       │   ├── __init__.py
│       │   ├── client.py
│       │   ├── comments.py
│       │   ├── config.py
│       │   ├── constants.py
│       │   ├── labels.py
│       │   ├── pages.py
│       │   ├── search.py
│       │   ├── spaces.py
│       │   ├── users.py
│       │   ├── utils.py
│       │   └── v2_adapter.py
│       ├── exceptions.py
│       ├── jira
│       │   ├── __init__.py
│       │   ├── attachments.py
│       │   ├── boards.py
│       │   ├── client.py
│       │   ├── comments.py
│       │   ├── config.py
│       │   ├── constants.py
│       │   ├── epics.py
│       │   ├── fields.py
│       │   ├── formatting.py
│       │   ├── issues.py
│       │   ├── links.py
│       │   ├── projects.py
│       │   ├── protocols.py
│       │   ├── search.py
│       │   ├── sprints.py
│       │   ├── transitions.py
│       │   ├── users.py
│       │   └── worklog.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── confluence
│       │   │   ├── __init__.py
│       │   │   ├── comment.py
│       │   │   ├── common.py
│       │   │   ├── label.py
│       │   │   ├── page.py
│       │   │   ├── search.py
│       │   │   ├── space.py
│       │   │   └── user_search.py
│       │   ├── constants.py
│       │   └── jira
│       │       ├── __init__.py
│       │       ├── agile.py
│       │       ├── comment.py
│       │       ├── common.py
│       │       ├── issue.py
│       │       ├── link.py
│       │       ├── project.py
│       │       ├── search.py
│       │       ├── version.py
│       │       ├── workflow.py
│       │       └── worklog.py
│       ├── preprocessing
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── confluence.py
│       │   └── jira.py
│       ├── servers
│       │   ├── __init__.py
│       │   ├── confluence.py
│       │   ├── context.py
│       │   ├── dependencies.py
│       │   ├── jira.py
│       │   └── main.py
│       └── utils
│           ├── __init__.py
│           ├── date.py
│           ├── decorators.py
│           ├── env.py
│           ├── environment.py
│           ├── io.py
│           ├── lifecycle.py
│           ├── logging.py
│           ├── oauth_setup.py
│           ├── oauth.py
│           ├── ssl.py
│           ├── tools.py
│           └── urls.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── fixtures
│   │   ├── __init__.py
│   │   ├── confluence_mocks.py
│   │   └── jira_mocks.py
│   ├── integration
│   │   ├── conftest.py
│   │   ├── README.md
│   │   ├── test_authentication.py
│   │   ├── test_content_processing.py
│   │   ├── test_cross_service.py
│   │   ├── test_mcp_protocol.py
│   │   ├── test_proxy.py
│   │   ├── test_real_api.py
│   │   ├── test_ssl_verification.py
│   │   ├── test_stdin_monitoring_fix.py
│   │   └── test_transport_lifecycle.py
│   ├── README.md
│   ├── test_preprocessing.py
│   ├── test_real_api_validation.py
│   ├── unit
│   │   ├── confluence
│   │   │   ├── __init__.py
│   │   │   ├── conftest.py
│   │   │   ├── test_client_oauth.py
│   │   │   ├── test_client.py
│   │   │   ├── test_comments.py
│   │   │   ├── test_config.py
│   │   │   ├── test_constants.py
│   │   │   ├── test_custom_headers.py
│   │   │   ├── test_labels.py
│   │   │   ├── test_pages.py
│   │   │   ├── test_search.py
│   │   │   ├── test_spaces.py
│   │   │   ├── test_users.py
│   │   │   ├── test_utils.py
│   │   │   └── test_v2_adapter.py
│   │   ├── jira
│   │   │   ├── conftest.py
│   │   │   ├── test_attachments.py
│   │   │   ├── test_boards.py
│   │   │   ├── test_client_oauth.py
│   │   │   ├── test_client.py
│   │   │   ├── test_comments.py
│   │   │   ├── test_config.py
│   │   │   ├── test_constants.py
│   │   │   ├── test_custom_headers.py
│   │   │   ├── test_epics.py
│   │   │   ├── test_fields.py
│   │   │   ├── test_formatting.py
│   │   │   ├── test_issues_markdown.py
│   │   │   ├── test_issues.py
│   │   │   ├── test_links.py
│   │   │   ├── test_projects.py
│   │   │   ├── test_protocols.py
│   │   │   ├── test_search.py
│   │   │   ├── test_sprints.py
│   │   │   ├── test_transitions.py
│   │   │   ├── test_users.py
│   │   │   └── test_worklog.py
│   │   ├── models
│   │   │   ├── __init__.py
│   │   │   ├── conftest.py
│   │   │   ├── test_base_models.py
│   │   │   ├── test_confluence_models.py
│   │   │   ├── test_constants.py
│   │   │   └── test_jira_models.py
│   │   ├── servers
│   │   │   ├── __init__.py
│   │   │   ├── test_confluence_server.py
│   │   │   ├── test_context.py
│   │   │   ├── test_dependencies.py
│   │   │   ├── test_jira_server.py
│   │   │   └── test_main_server.py
│   │   ├── test_exceptions.py
│   │   ├── test_main_transport_selection.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── test_custom_headers.py
│   │       ├── test_date.py
│   │       ├── test_decorators.py
│   │       ├── test_env.py
│   │       ├── test_environment.py
│   │       ├── test_io.py
│   │       ├── test_lifecycle.py
│   │       ├── test_logging.py
│   │       ├── test_masking.py
│   │       ├── test_oauth_setup.py
│   │       ├── test_oauth.py
│   │       ├── test_ssl.py
│   │       ├── test_tools.py
│   │       └── test_urls.py
│   └── utils
│       ├── __init__.py
│       ├── assertions.py
│       ├── base.py
│       ├── factories.py
│       └── mocks.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/tests/unit/confluence/test_users.py:
--------------------------------------------------------------------------------

```python
"""Unit tests for the Confluence users module."""

from unittest.mock import MagicMock, patch

import pytest
from requests.exceptions import HTTPError

from mcp_atlassian.confluence.users import UsersMixin
from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError


class TestUsersMixin:
    """Tests for the UsersMixin class."""

    @pytest.fixture
    def users_mixin(self, confluence_client):
        """Create a UsersMixin instance for testing."""
        # UsersMixin inherits from ConfluenceClient, so we need to create it properly
        with patch(
            "mcp_atlassian.confluence.users.ConfluenceClient.__init__"
        ) as mock_init:
            mock_init.return_value = None
            mixin = UsersMixin()
            # Copy the necessary attributes from our mocked client
            mixin.confluence = confluence_client.confluence
            mixin.config = confluence_client.config
            return mixin

    # Mock user data for different scenarios
    @pytest.fixture
    def mock_user_data_cloud(self):
        """Mock user data for Confluence Cloud."""
        return {
            "accountId": "5b10ac8d82e05b22cc7d4ef5",
            "accountType": "atlassian",
            "email": "[email protected]",
            "publicName": "Test User",
            "displayName": "Test User",
            "profilePicture": {
                "path": "/wiki/aa-avatar/5b10ac8d82e05b22cc7d4ef5",
                "width": 48,
                "height": 48,
                "isDefault": False,
            },
            "isExternalCollaborator": False,
            "accountStatus": "active",
        }

    @pytest.fixture
    def mock_user_data_server(self):
        """Mock user data for Confluence Server/DC."""
        return {
            "username": "testuser",
            "userKey": "testuser-key-12345",
            "displayName": "Test User",
            "fullName": "Test User Full Name",
            "email": "[email protected]",
            "status": "active",
        }

    @pytest.fixture
    def mock_user_data_with_status(self):
        """Mock user data with status expansion."""
        return {
            "accountId": "5b10ac8d82e05b22cc7d4ef5",
            "accountType": "atlassian",
            "email": "[email protected]",
            "publicName": "Test User",
            "displayName": "Test User",
            "accountStatus": "active",
            "status": "Active",  # Expanded status field
        }

    @pytest.fixture
    def mock_current_user_data(self):
        """Mock current user data for get_current_user_info."""
        return {
            "accountId": "5b10ac8d82e05b22cc7d4ef5",
            "type": "known",
            "accountType": "atlassian",
            "email": "[email protected]",
            "publicName": "Current User",
            "displayName": "Current User",
            "profilePicture": {
                "path": "/wiki/aa-avatar/5b10ac8d82e05b22cc7d4ef5",
                "width": 48,
                "height": 48,
                "isDefault": False,
            },
            "isExternalCollaborator": False,
            "isGuest": False,
            "locale": "en_US",
            "accountStatus": "active",
        }

    def test_get_user_details_by_accountid_success(
        self, users_mixin, mock_user_data_cloud
    ):
        """Test successfully getting user details by account ID."""
        # Arrange
        account_id = "5b10ac8d82e05b22cc7d4ef5"
        users_mixin.confluence.get_user_details_by_accountid.return_value = (
            mock_user_data_cloud
        )

        # Act
        result = users_mixin.get_user_details_by_accountid(account_id)

        # Assert
        users_mixin.confluence.get_user_details_by_accountid.assert_called_once_with(
            account_id, None
        )
        assert result == mock_user_data_cloud
        assert result["accountId"] == account_id
        assert result["displayName"] == "Test User"

    def test_get_user_details_by_accountid_with_expand(
        self, users_mixin, mock_user_data_with_status
    ):
        """Test getting user details by account ID with status expansion."""
        # Arrange
        account_id = "5b10ac8d82e05b22cc7d4ef5"
        expand = "status"
        users_mixin.confluence.get_user_details_by_accountid.return_value = (
            mock_user_data_with_status
        )

        # Act
        result = users_mixin.get_user_details_by_accountid(account_id, expand=expand)

        # Assert
        users_mixin.confluence.get_user_details_by_accountid.assert_called_once_with(
            account_id, expand
        )
        assert result == mock_user_data_with_status
        assert result["status"] == "Active"
        assert result["accountStatus"] == "active"

    def test_get_user_details_by_accountid_invalid_account_id(self, users_mixin):
        """Test getting user details with invalid account ID."""
        # Arrange
        invalid_account_id = "invalid-account-id"
        users_mixin.confluence.get_user_details_by_accountid.side_effect = Exception(
            "User not found"
        )

        # Act/Assert
        with pytest.raises(Exception, match="User not found"):
            users_mixin.get_user_details_by_accountid(invalid_account_id)

    def test_get_user_details_by_username_success(
        self, users_mixin, mock_user_data_server
    ):
        """Test successfully getting user details by username."""
        # Arrange
        username = "testuser"
        users_mixin.confluence.get_user_details_by_username.return_value = (
            mock_user_data_server
        )

        # Act
        result = users_mixin.get_user_details_by_username(username)

        # Assert
        users_mixin.confluence.get_user_details_by_username.assert_called_once_with(
            username, None
        )
        assert result == mock_user_data_server
        assert result["username"] == username
        assert result["displayName"] == "Test User"

    def test_get_user_details_by_username_with_expand(
        self, users_mixin, mock_user_data_server
    ):
        """Test getting user details by username with status expansion."""
        # Arrange
        username = "testuser"
        expand = "status"
        mock_data_with_status = mock_user_data_server.copy()
        mock_data_with_status["status"] = "Active"
        users_mixin.confluence.get_user_details_by_username.return_value = (
            mock_data_with_status
        )

        # Act
        result = users_mixin.get_user_details_by_username(username, expand=expand)

        # Assert
        users_mixin.confluence.get_user_details_by_username.assert_called_once_with(
            username, expand
        )
        assert result == mock_data_with_status
        assert result["status"] == "Active"

    def test_get_user_details_by_username_invalid_username(self, users_mixin):
        """Test getting user details with invalid username."""
        # Arrange
        invalid_username = "nonexistent-user"
        users_mixin.confluence.get_user_details_by_username.side_effect = Exception(
            "User not found"
        )

        # Act/Assert
        with pytest.raises(Exception, match="User not found"):
            users_mixin.get_user_details_by_username(invalid_username)

    def test_get_user_details_by_username_server_dc_pattern(
        self, users_mixin, mock_user_data_server
    ):
        """Test that username lookup follows Server/DC patterns."""
        # Arrange
        username = "[email protected]"  # Email-like username common in DC
        users_mixin.confluence.get_user_details_by_username.return_value = (
            mock_user_data_server
        )

        # Act
        result = users_mixin.get_user_details_by_username(username)

        # Assert
        users_mixin.confluence.get_user_details_by_username.assert_called_once_with(
            username, None
        )
        assert result == mock_user_data_server

    def test_get_current_user_info_success(self, users_mixin, mock_current_user_data):
        """Test successfully getting current user info."""
        # Arrange
        users_mixin.confluence.get.return_value = mock_current_user_data

        # Act
        result = users_mixin.get_current_user_info()

        # Assert
        users_mixin.confluence.get.assert_called_once_with("rest/api/user/current")
        assert result == mock_current_user_data
        assert result["accountId"] == "5b10ac8d82e05b22cc7d4ef5"
        assert result["displayName"] == "Current User"

    def test_get_current_user_info_returns_non_dict(self, users_mixin):
        """Test get_current_user_info when API returns non-dict data."""
        # Arrange
        users_mixin.confluence.get.return_value = "Invalid response"

        # Act/Assert
        with pytest.raises(
            MCPAtlassianAuthenticationError,
            match="Confluence token validation failed: Did not receive valid JSON user data",
        ):
            users_mixin.get_current_user_info()

        users_mixin.confluence.get.assert_called_once_with("rest/api/user/current")

    def test_get_current_user_info_returns_none(self, users_mixin):
        """Test get_current_user_info when API returns None."""
        # Arrange
        users_mixin.confluence.get.return_value = None

        # Act/Assert
        with pytest.raises(
            MCPAtlassianAuthenticationError,
            match="Confluence token validation failed: Did not receive valid JSON user data",
        ):
            users_mixin.get_current_user_info()

    def test_get_current_user_info_http_error_401(self, users_mixin):
        """Test get_current_user_info with 401 authentication error."""
        # Arrange
        mock_response = MagicMock()
        mock_response.status_code = 401
        http_error = HTTPError(response=mock_response)
        users_mixin.confluence.get.side_effect = http_error

        # Act/Assert
        with pytest.raises(
            MCPAtlassianAuthenticationError,
            match="Confluence token validation failed: 401 from /rest/api/user/current",
        ):
            users_mixin.get_current_user_info()

    def test_get_current_user_info_http_error_403(self, users_mixin):
        """Test get_current_user_info with 403 forbidden error."""
        # Arrange
        mock_response = MagicMock()
        mock_response.status_code = 403
        http_error = HTTPError(response=mock_response)
        users_mixin.confluence.get.side_effect = http_error

        # Act/Assert
        with pytest.raises(
            MCPAtlassianAuthenticationError,
            match="Confluence token validation failed: 403 from /rest/api/user/current",
        ):
            users_mixin.get_current_user_info()

    def test_get_current_user_info_http_error_other(self, users_mixin):
        """Test get_current_user_info with other HTTP error codes."""
        # Arrange
        mock_response = MagicMock()
        mock_response.status_code = 500
        http_error = HTTPError(response=mock_response)
        users_mixin.confluence.get.side_effect = http_error

        # Act/Assert
        with pytest.raises(
            MCPAtlassianAuthenticationError,
            match="Confluence token validation failed with HTTPError",
        ):
            users_mixin.get_current_user_info()

    def test_get_current_user_info_http_error_no_response(self, users_mixin):
        """Test get_current_user_info with HTTPError but no response object."""
        # Arrange
        http_error = HTTPError()
        http_error.response = None
        users_mixin.confluence.get.side_effect = http_error

        # Act/Assert
        with pytest.raises(
            MCPAtlassianAuthenticationError,
            match="Confluence token validation failed with HTTPError",
        ):
            users_mixin.get_current_user_info()

    def test_get_current_user_info_generic_exception(self, users_mixin):
        """Test get_current_user_info with generic exception."""
        # Arrange
        users_mixin.confluence.get.side_effect = ConnectionError("Network error")

        # Act/Assert
        with pytest.raises(
            MCPAtlassianAuthenticationError,
            match="Confluence token validation failed: Network error",
        ):
            users_mixin.get_current_user_info()

    @pytest.mark.parametrize(
        "expand_param",
        [
            None,
            "status",
            "",  # Empty string
        ],
    )
    def test_get_user_details_by_accountid_expand_parameter_handling(
        self, users_mixin, mock_user_data_cloud, expand_param
    ):
        """Test that expand parameter is properly handled for account ID lookup."""
        # Arrange
        account_id = "5b10ac8d82e05b22cc7d4ef5"
        expected_data = mock_user_data_cloud.copy()
        if expand_param == "status":
            expected_data["status"] = "Active"

        users_mixin.confluence.get_user_details_by_accountid.return_value = (
            expected_data
        )

        # Act
        result = users_mixin.get_user_details_by_accountid(account_id, expand_param)

        # Assert
        users_mixin.confluence.get_user_details_by_accountid.assert_called_once_with(
            account_id, expand_param
        )
        assert result == expected_data

    @pytest.mark.parametrize(
        "expand_param",
        [
            None,
            "status",
            "",  # Empty string
        ],
    )
    def test_get_user_details_by_username_expand_parameter_handling(
        self, users_mixin, mock_user_data_server, expand_param
    ):
        """Test that expand parameter is properly handled for username lookup."""
        # Arrange
        username = "testuser"
        expected_data = mock_user_data_server.copy()
        if expand_param == "status":
            expected_data["status"] = "Active"

        users_mixin.confluence.get_user_details_by_username.return_value = expected_data

        # Act
        result = users_mixin.get_user_details_by_username(username, expand_param)

        # Assert
        users_mixin.confluence.get_user_details_by_username.assert_called_once_with(
            username, expand_param
        )
        assert result == expected_data

    def test_users_mixin_inheritance(self, users_mixin):
        """Test that UsersMixin properly inherits from ConfluenceClient."""
        # Verify that UsersMixin is indeed a ConfluenceClient
        from mcp_atlassian.confluence.client import ConfluenceClient

        assert isinstance(users_mixin, ConfluenceClient)

        # Verify it has the expected attributes from ConfluenceClient
        assert hasattr(users_mixin, "confluence")
        assert hasattr(users_mixin, "config")

    def test_users_mixin_has_required_methods(self):
        """Test that UsersMixin has all required methods."""
        # Verify the mixin has the expected methods
        assert hasattr(UsersMixin, "get_user_details_by_accountid")
        assert hasattr(UsersMixin, "get_user_details_by_username")
        assert hasattr(UsersMixin, "get_current_user_info")

        # Verify method signatures
        import inspect

        # Check get_user_details_by_accountid signature
        sig = inspect.signature(UsersMixin.get_user_details_by_accountid)
        params = list(sig.parameters.keys())
        assert "self" in params
        assert "account_id" in params
        assert "expand" in params
        assert sig.parameters["expand"].default is None

        # Check get_user_details_by_username signature
        sig = inspect.signature(UsersMixin.get_user_details_by_username)
        params = list(sig.parameters.keys())
        assert "self" in params
        assert "username" in params
        assert "expand" in params
        assert sig.parameters["expand"].default is None

        # Check get_current_user_info signature
        sig = inspect.signature(UsersMixin.get_current_user_info)
        params = list(sig.parameters.keys())
        assert "self" in params
        assert len(params) == 1  # Only self parameter

    def test_user_permission_scenarios(self, users_mixin):
        """Test various permission error scenarios."""
        # Test 401 Unauthorized
        mock_response_401 = MagicMock()
        mock_response_401.status_code = 401
        http_error_401 = HTTPError(response=mock_response_401)
        users_mixin.confluence.get_user_details_by_accountid.side_effect = (
            http_error_401
        )

        with pytest.raises(Exception):  # Should propagate the original exception
            users_mixin.get_user_details_by_accountid("test-account-id")

        # Test 403 Forbidden
        mock_response_403 = MagicMock()
        mock_response_403.status_code = 403
        http_error_403 = HTTPError(response=mock_response_403)
        users_mixin.confluence.get_user_details_by_username.side_effect = http_error_403

        with pytest.raises(Exception):  # Should propagate the original exception
            users_mixin.get_user_details_by_username("testuser")

    def test_cloud_vs_server_authentication_patterns(self, users_mixin):
        """Test that different authentication patterns work for Cloud vs Server/DC."""
        # Mock Cloud response (account ID based)
        cloud_user_data = {
            "accountId": "5b10ac8d82e05b22cc7d4ef5",
            "accountType": "atlassian",
            "displayName": "Cloud User",
            "accountStatus": "active",
        }

        # Mock Server/DC response (username based)
        server_user_data = {
            "username": "serveruser",
            "userKey": "serveruser-key-12345",
            "displayName": "Server User",
            "status": "active",
        }

        # Test Cloud pattern
        users_mixin.confluence.get_user_details_by_accountid.return_value = (
            cloud_user_data
        )
        cloud_result = users_mixin.get_user_details_by_accountid(
            "5b10ac8d82e05b22cc7d4ef5"
        )
        assert cloud_result["accountId"] == "5b10ac8d82e05b22cc7d4ef5"
        assert "accountType" in cloud_result

        # Test Server/DC pattern
        users_mixin.confluence.get_user_details_by_username.return_value = (
            server_user_data
        )
        server_result = users_mixin.get_user_details_by_username("serveruser")
        assert server_result["username"] == "serveruser"
        assert "userKey" in server_result

    def test_response_data_validation_and_transformation(
        self, users_mixin, mock_user_data_cloud
    ):
        """Test that response data is properly validated and returned as-is."""
        # Arrange
        account_id = "5b10ac8d82e05b22cc7d4ef5"
        users_mixin.confluence.get_user_details_by_accountid.return_value = (
            mock_user_data_cloud
        )

        # Act
        result = users_mixin.get_user_details_by_accountid(account_id)

        # Assert - should return the data exactly as received from the API
        assert result is mock_user_data_cloud  # Same object reference
        assert isinstance(result, dict)
        assert all(
            key in result
            for key in ["accountId", "displayName", "email", "accountStatus"]
        )

    def test_deactivated_user_status_handling(self, users_mixin):
        """Test handling of deactivated users with status expansion."""
        # Arrange
        deactivated_user_data = {
            "accountId": "5b10ac8d82e05b22cc7d4ef5",
            "displayName": "Deactivated User",
            "accountStatus": "inactive",
            "status": "Deactivated",  # Expanded status
        }
        users_mixin.confluence.get_user_details_by_accountid.return_value = (
            deactivated_user_data
        )

        # Act
        result = users_mixin.get_user_details_by_accountid(
            "5b10ac8d82e05b22cc7d4ef5", expand="status"
        )

        # Assert
        assert result["accountStatus"] == "inactive"
        assert result["status"] == "Deactivated"
        users_mixin.confluence.get_user_details_by_accountid.assert_called_once_with(
            "5b10ac8d82e05b22cc7d4ef5", "status"
        )

    def test_method_delegation_to_confluence_client(
        self, users_mixin, mock_current_user_data
    ):
        """Test that methods properly delegate to the underlying confluence client."""
        # Test that the methods are thin wrappers around confluence client methods
        account_id = "test-account-id"
        username = "testuser"
        expand = "status"

        # Test account ID method delegation
        users_mixin.get_user_details_by_accountid(account_id, expand)
        users_mixin.confluence.get_user_details_by_accountid.assert_called_with(
            account_id, expand
        )

        # Test username method delegation
        users_mixin.get_user_details_by_username(username, expand)
        users_mixin.confluence.get_user_details_by_username.assert_called_with(
            username, expand
        )

        # Test current user method delegation - need to mock the return value
        users_mixin.confluence.get.return_value = mock_current_user_data
        users_mixin.get_current_user_info()
        users_mixin.confluence.get.assert_called_with("rest/api/user/current")

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/pages.py:
--------------------------------------------------------------------------------

```python
"""Module for Confluence page operations."""

import logging

import requests
from requests.exceptions import HTTPError

from ..exceptions import MCPAtlassianAuthenticationError
from ..models.confluence import ConfluencePage
from .client import ConfluenceClient
from .v2_adapter import ConfluenceV2Adapter

logger = logging.getLogger("mcp-atlassian")


class PagesMixin(ConfluenceClient):
    """Mixin for Confluence page operations."""

    @property
    def _v2_adapter(self) -> ConfluenceV2Adapter | None:
        """Get v2 API adapter for OAuth authentication.

        Returns:
            ConfluenceV2Adapter instance if OAuth is configured, None otherwise
        """
        if self.config.auth_type == "oauth" and self.config.is_cloud:
            return ConfluenceV2Adapter(
                session=self.confluence._session, base_url=self.confluence.url
            )
        return None

    def get_page_content(
        self, page_id: str, *, convert_to_markdown: bool = True
    ) -> ConfluencePage:
        """
        Get content of a specific page.

        Args:
            page_id: The ID of the page to retrieve
            convert_to_markdown: When True, returns content in markdown format,
                               otherwise returns raw HTML (keyword-only)

        Returns:
            ConfluencePage model containing the page content and metadata

        Raises:
            MCPAtlassianAuthenticationError: If authentication fails with the Confluence API (401/403)
            Exception: If there is an error retrieving the page
        """
        try:
            # Use v2 API for OAuth authentication, v1 API for token/basic auth
            v2_adapter = self._v2_adapter
            if v2_adapter:
                logger.debug(
                    f"Using v2 API for OAuth authentication to get page '{page_id}'"
                )
                page = v2_adapter.get_page(
                    page_id=page_id,
                    expand="body.storage,version,space,children.attachment",
                )
            else:
                logger.debug(
                    f"Using v1 API for token/basic authentication to get page '{page_id}'"
                )
                page = self.confluence.get_page_by_id(
                    page_id=page_id,
                    expand="body.storage,version,space,children.attachment",
                )

            space_key = page.get("space", {}).get("key", "")
            content = page["body"]["storage"]["value"]
            processed_html, processed_markdown = self.preprocessor.process_html_content(
                content, space_key=space_key, confluence_client=self.confluence
            )

            # Use the appropriate content format based on the convert_to_markdown flag
            page_content = processed_markdown if convert_to_markdown else processed_html

            # Create and return the ConfluencePage model
            return ConfluencePage.from_api_response(
                page,
                base_url=self.config.url,
                include_body=True,
                # Override content with our processed version
                content_override=page_content,
                content_format="storage" if not convert_to_markdown else "markdown",
                is_cloud=self.config.is_cloud,
            )
        except HTTPError as http_err:
            if http_err.response is not None and http_err.response.status_code in [
                401,
                403,
            ]:
                error_msg = (
                    f"Authentication failed for Confluence API ({http_err.response.status_code}). "
                    "Token may be expired or invalid. Please verify credentials."
                )
                logger.error(error_msg)
                raise MCPAtlassianAuthenticationError(error_msg) from http_err
            else:
                logger.error(f"HTTP error during API call: {http_err}", exc_info=False)
                raise http_err
        except Exception as e:
            logger.error(
                f"Error retrieving page content for page ID {page_id}: {str(e)}"
            )
            raise Exception(f"Error retrieving page content: {str(e)}") from e

    def get_page_ancestors(self, page_id: str) -> list[ConfluencePage]:
        """
        Get ancestors (parent pages) of a specific page.

        Args:
            page_id: The ID of the page to get ancestors for

        Returns:
            List of ConfluencePage models representing the ancestors in hierarchical order
                (immediate parent first, root ancestor last)

        Raises:
            MCPAtlassianAuthenticationError: If authentication fails with the Confluence API (401/403)
        """
        try:
            # Use the Atlassian Python API to get ancestors
            ancestors = self.confluence.get_page_ancestors(page_id)

            # Process each ancestor
            ancestor_models = []
            for ancestor in ancestors:
                # Create the page model without fetching content
                page_model = ConfluencePage.from_api_response(
                    ancestor,
                    base_url=self.config.url,
                    include_body=False,
                )
                ancestor_models.append(page_model)

            return ancestor_models
        except HTTPError as http_err:
            if http_err.response is not None and http_err.response.status_code in [
                401,
                403,
            ]:
                error_msg = (
                    f"Authentication failed for Confluence API ({http_err.response.status_code}). "
                    "Token may be expired or invalid. Please verify credentials."
                )
                logger.error(error_msg)
                raise MCPAtlassianAuthenticationError(error_msg) from http_err
            else:
                logger.error(f"HTTP error during API call: {http_err}", exc_info=False)
                raise http_err
        except Exception as e:
            logger.error(f"Error fetching ancestors for page {page_id}: {str(e)}")
            logger.debug("Full exception details:", exc_info=True)
            return []

    def get_page_by_title(
        self, space_key: str, title: str, *, convert_to_markdown: bool = True
    ) -> ConfluencePage | None:
        """
        Get a specific page by its title from a Confluence space.

        Args:
            space_key: The key of the space containing the page
            title: The title of the page to retrieve
            convert_to_markdown: When True, returns content in markdown format,
                               otherwise returns raw HTML (keyword-only)

        Returns:
            ConfluencePage model containing the page content and metadata, or None if not found
        """
        try:
            # Directly try to find the page by title
            page = self.confluence.get_page_by_title(
                space=space_key, title=title, expand="body.storage,version"
            )

            if not page:
                logger.warning(
                    f"Page '{title}' not found in space '{space_key}'. "
                    f"The space may be invalid, the page may not exist, or permissions may be insufficient."
                )
                return None

            content = page["body"]["storage"]["value"]
            processed_html, processed_markdown = self.preprocessor.process_html_content(
                content, space_key=space_key, confluence_client=self.confluence
            )

            # Use the appropriate content format based on the convert_to_markdown flag
            page_content = processed_markdown if convert_to_markdown else processed_html

            # Create and return the ConfluencePage model
            return ConfluencePage.from_api_response(
                page,
                base_url=self.config.url,
                include_body=True,
                # Override content with our processed version
                content_override=page_content,
                content_format="storage" if not convert_to_markdown else "markdown",
                is_cloud=self.config.is_cloud,
            )

        except KeyError as e:
            logger.error(f"Missing key in page data: {str(e)}")
            return None
        except requests.RequestException as e:
            logger.error(f"Network error when fetching page: {str(e)}")
            return None
        except (ValueError, TypeError) as e:
            logger.error(f"Error processing page data: {str(e)}")
            return None
        except Exception as e:  # noqa: BLE001 - Intentional fallback with full logging
            logger.error(f"Unexpected error fetching page: {str(e)}")
            # Log the full traceback at debug level for troubleshooting
            logger.debug("Full exception details:", exc_info=True)
            return None

    def get_space_pages(
        self,
        space_key: str,
        start: int = 0,
        limit: int = 10,
        *,
        convert_to_markdown: bool = True,
    ) -> list[ConfluencePage]:
        """
        Get all pages from a specific space.

        Args:
            space_key: The key of the space to get pages from
            start: The starting index for pagination
            limit: Maximum number of pages to return
            convert_to_markdown: When True, returns content in markdown format,
                               otherwise returns raw HTML (keyword-only)

        Returns:
            List of ConfluencePage models containing page content and metadata
        """
        pages = self.confluence.get_all_pages_from_space(
            space=space_key, start=start, limit=limit, expand="body.storage"
        )

        page_models = []
        for page in pages:
            content = page["body"]["storage"]["value"]
            processed_html, processed_markdown = self.preprocessor.process_html_content(
                content, space_key=space_key, confluence_client=self.confluence
            )

            # Use the appropriate content format based on the convert_to_markdown flag
            page_content = processed_markdown if convert_to_markdown else processed_html

            # Ensure space information is included
            if "space" not in page:
                page["space"] = {
                    "key": space_key,
                    "name": space_key,  # Use space_key as name if not available
                }

            # Create the ConfluencePage model
            page_model = ConfluencePage.from_api_response(
                page,
                base_url=self.config.url,
                include_body=True,
                # Override content with our processed version
                content_override=page_content,
                content_format="storage" if not convert_to_markdown else "markdown",
                is_cloud=self.config.is_cloud,
            )

            page_models.append(page_model)

        return page_models

    def create_page(
        self,
        space_key: str,
        title: str,
        body: str,
        parent_id: str | None = None,
        *,
        is_markdown: bool = True,
        enable_heading_anchors: bool = False,
        content_representation: str | None = None,
    ) -> ConfluencePage:
        """
        Create a new page in a Confluence space.

        Args:
            space_key: The key of the space to create the page in
            title: The title of the new page
            body: The content of the page (markdown, wiki markup, or storage format)
            parent_id: Optional ID of a parent page
            is_markdown: Whether the body content is in markdown format (default: True, keyword-only)
            enable_heading_anchors: Whether to enable automatic heading anchor generation (default: False, keyword-only)
            content_representation: Content format when is_markdown=False ('wiki' or 'storage', keyword-only)

        Returns:
            ConfluencePage model containing the new page's data

        Raises:
            Exception: If there is an error creating the page
        """
        try:
            # Determine body and representation based on content type
            if is_markdown:
                # Convert markdown to Confluence storage format
                final_body = self.preprocessor.markdown_to_confluence_storage(
                    body, enable_heading_anchors=enable_heading_anchors
                )
                representation = "storage"
            else:
                # Use body as-is with specified representation
                final_body = body
                representation = content_representation or "storage"

            # Use v2 API for OAuth authentication, v1 API for token/basic auth
            v2_adapter = self._v2_adapter
            if v2_adapter:
                logger.debug(
                    f"Using v2 API for OAuth authentication to create page '{title}'"
                )
                result = v2_adapter.create_page(
                    space_key=space_key,
                    title=title,
                    body=final_body,
                    parent_id=parent_id,
                    representation=representation,
                )
            else:
                logger.debug(
                    f"Using v1 API for token/basic authentication to create page '{title}'"
                )
                result = self.confluence.create_page(
                    space=space_key,
                    title=title,
                    body=final_body,
                    parent_id=parent_id,
                    representation=representation,
                )

            # Get the new page content
            page_id = result.get("id")
            if not page_id:
                raise ValueError("Create page response did not contain an ID")

            return self.get_page_content(page_id)
        except Exception as e:
            logger.error(
                f"Error creating page '{title}' in space {space_key}: {str(e)}"
            )
            raise Exception(
                f"Failed to create page '{title}' in space {space_key}: {str(e)}"
            ) from e

    def update_page(
        self,
        page_id: str,
        title: str,
        body: str,
        *,
        is_minor_edit: bool = False,
        version_comment: str = "",
        is_markdown: bool = True,
        parent_id: str | None = None,
        enable_heading_anchors: bool = False,
        content_representation: str | None = None,
    ) -> ConfluencePage:
        """
        Update an existing page in Confluence.

        Args:
            page_id: The ID of the page to update
            title: The new title of the page
            body: The new content of the page (markdown, wiki markup, or storage format)
            is_minor_edit: Whether this is a minor edit (keyword-only)
            version_comment: Optional comment for this version (keyword-only)
            is_markdown: Whether the body content is in markdown format (default: True, keyword-only)
            parent_id: Optional new parent page ID (keyword-only)
            enable_heading_anchors: Whether to enable automatic heading anchor generation (default: False, keyword-only)
            content_representation: Content format when is_markdown=False ('wiki' or 'storage', keyword-only)

        Returns:
            ConfluencePage model containing the updated page's data

        Raises:
            Exception: If there is an error updating the page
        """
        try:
            # Determine body and representation based on content type
            if is_markdown:
                # Convert markdown to Confluence storage format
                final_body = self.preprocessor.markdown_to_confluence_storage(
                    body, enable_heading_anchors=enable_heading_anchors
                )
                representation = "storage"
            else:
                # Use body as-is with specified representation
                final_body = body
                representation = content_representation or "storage"

            logger.debug(f"Updating page {page_id} with title '{title}'")

            # Use v2 API for OAuth authentication, v1 API for token/basic auth
            v2_adapter = self._v2_adapter
            if v2_adapter:
                logger.debug(
                    f"Using v2 API for OAuth authentication to update page '{page_id}'"
                )
                response = v2_adapter.update_page(
                    page_id=page_id,
                    title=title,
                    body=final_body,
                    representation=representation,
                    version_comment=version_comment,
                )
            else:
                logger.debug(
                    f"Using v1 API for token/basic authentication to update page '{page_id}'"
                )
                update_kwargs = {
                    "page_id": page_id,
                    "title": title,
                    "body": final_body,
                    "type": "page",
                    "representation": representation,
                    "minor_edit": is_minor_edit,
                    "version_comment": version_comment,
                    "always_update": True,
                }
                if parent_id:
                    update_kwargs["parent_id"] = parent_id

                self.confluence.update_page(**update_kwargs)

            # After update, refresh the page data
            return self.get_page_content(page_id)
        except Exception as e:
            logger.error(f"Error updating page {page_id}: {str(e)}")
            raise Exception(f"Failed to update page {page_id}: {str(e)}") from e

    def get_page_children(
        self,
        page_id: str,
        start: int = 0,
        limit: int = 25,
        expand: str = "version",
        *,
        convert_to_markdown: bool = True,
    ) -> list[ConfluencePage]:
        """
        Get child pages of a specific Confluence page.

        Args:
            page_id: The ID of the parent page
            start: The starting index for pagination
            limit: Maximum number of child pages to return
            expand: Fields to expand in the response
            convert_to_markdown: When True, returns content in markdown format,
                               otherwise returns raw HTML (keyword-only)

        Returns:
            List of ConfluencePage models containing the child pages
        """
        try:
            # Use the Atlassian Python API's get_page_child_by_type method
            results = self.confluence.get_page_child_by_type(
                page_id=page_id, type="page", start=start, limit=limit, expand=expand
            )

            # Process results
            page_models = []

            # Handle both pagination modes
            if isinstance(results, dict) and "results" in results:
                child_pages = results.get("results", [])
            else:
                child_pages = results or []

            space_key = ""

            # Get space key from the first result if available
            if child_pages and "space" in child_pages[0]:
                space_key = child_pages[0].get("space", {}).get("key", "")

            # Process each child page
            for page in child_pages:
                # Only process content if we have "body" expanded
                content_override = None
                if "body" in page and convert_to_markdown:
                    content = page.get("body", {}).get("storage", {}).get("value", "")
                    if content:
                        _, processed_markdown = self.preprocessor.process_html_content(
                            content,
                            space_key=space_key,
                            confluence_client=self.confluence,
                        )
                        content_override = processed_markdown

                # Create the page model
                page_model = ConfluencePage.from_api_response(
                    page,
                    base_url=self.config.url,
                    include_body=True,
                    content_override=content_override,
                    content_format="markdown" if convert_to_markdown else "storage",
                )

                page_models.append(page_model)

            return page_models

        except Exception as e:
            logger.error(f"Error fetching child pages for page {page_id}: {str(e)}")
            logger.debug("Full exception details:", exc_info=True)
            return []

    def delete_page(self, page_id: str) -> bool:
        """
        Delete a Confluence page by its ID.

        Args:
            page_id: The ID of the page to delete

        Returns:
            Boolean indicating success (True) or failure (False)

        Raises:
            Exception: If there is an error deleting the page
        """
        try:
            logger.debug(f"Deleting page {page_id}")

            # Use v2 API for OAuth authentication, v1 API for token/basic auth
            v2_adapter = self._v2_adapter
            if v2_adapter:
                logger.debug(
                    f"Using v2 API for OAuth authentication to delete page '{page_id}'"
                )
                return v2_adapter.delete_page(page_id=page_id)
            else:
                logger.debug(
                    f"Using v1 API for token/basic authentication to delete page '{page_id}'"
                )
                response = self.confluence.remove_page(page_id=page_id)

                # The Atlassian library's remove_page returns the raw response from
                # the REST API call. For a successful deletion, we should get a
                # response object, but it might be empty (HTTP 204 No Content).
                # For REST DELETE operations, a success typically returns 204 or 200

                # Check if we got a response object
                if isinstance(response, requests.Response):
                    # Check if status code indicates success (2xx)
                    success = 200 <= response.status_code < 300
                    logger.debug(
                        f"Delete page {page_id} returned status code {response.status_code}"
                    )
                    return success
                # If it's not a response object but truthy (like True), consider it a success
                elif response:
                    return True
                # Default to true since no exception was raised
                # This is safer than returning false when we don't know what happened
                return True

        except Exception as e:
            logger.error(f"Error deleting page {page_id}: {str(e)}")
            raise Exception(f"Failed to delete page {page_id}: {str(e)}") from e

```

--------------------------------------------------------------------------------
/tests/unit/confluence/conftest.py:
--------------------------------------------------------------------------------

```python
"""
Shared fixtures for Confluence unit tests.

This module provides specialized fixtures for testing Confluence-related functionality.
It builds upon the root conftest.py fixtures and integrates with the new test utilities
framework to provide efficient, reusable test fixtures with session-scoped caching.
"""

import os
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

# Add the root tests directory to PYTHONPATH
sys.path.append(str(Path(__file__).parent.parent.parent))

from fixtures.confluence_mocks import (
    MOCK_COMMENTS_RESPONSE,
    MOCK_CQL_SEARCH_RESPONSE,
    MOCK_LABELS_RESPONSE,
    MOCK_PAGE_RESPONSE,
    MOCK_PAGES_FROM_SPACE_RESPONSE,
    MOCK_SPACES_RESPONSE,
)

from mcp_atlassian.confluence.client import ConfluenceClient
from mcp_atlassian.confluence.config import ConfluenceConfig
from mcp_atlassian.utils.oauth import OAuthConfig
from tests.utils.factories import AuthConfigFactory, ConfluencePageFactory
from tests.utils.mocks import MockAtlassianClient, MockPreprocessor

# ============================================================================
# Session-Scoped Confluence Data Fixtures
# ============================================================================


@pytest.fixture(scope="session")
def session_confluence_spaces():
    """
    Session-scoped fixture providing Confluence space definitions.

    This expensive-to-create data is cached for the entire test session
    to improve test performance.

    Returns:
        List[Dict[str, Any]]: Complete Confluence space definitions
    """
    return [
        {
            "id": 12345,
            "key": "TEST",
            "name": "Test Space",
            "type": "global",
            "status": "current",
            "description": {"plain": {"value": "Test space for unit tests"}},
            "_links": {
                "webui": "/spaces/TEST",
                "self": "https://test.atlassian.net/wiki/rest/api/space/TEST",
            },
        },
        {
            "id": 12346,
            "key": "DEMO",
            "name": "Demo Space",
            "type": "global",
            "status": "current",
            "description": {"plain": {"value": "Demo space for testing"}},
            "_links": {
                "webui": "/spaces/DEMO",
                "self": "https://test.atlassian.net/wiki/rest/api/space/DEMO",
            },
        },
        {
            "id": 12347,
            "key": "SAMPLE",
            "name": "Sample Space",
            "type": "personal",
            "status": "current",
            "description": {"plain": {"value": "Sample personal space"}},
            "_links": {
                "webui": "/spaces/SAMPLE",
                "self": "https://test.atlassian.net/wiki/rest/api/space/SAMPLE",
            },
        },
    ]


@pytest.fixture(scope="session")
def session_confluence_content_types():
    """
    Session-scoped fixture providing Confluence content type definitions.

    Returns:
        List[Dict[str, Any]]: Mock Confluence content type data
    """
    return [
        {"name": "page", "type": "content"},
        {"name": "blogpost", "type": "content"},
        {"name": "comment", "type": "content"},
        {"name": "attachment", "type": "content"},
        {"name": "space", "type": "space"},
        {"name": "user", "type": "user"},
    ]


@pytest.fixture(scope="session")
def session_confluence_macros():
    """
    Session-scoped fixture providing Confluence macro definitions.

    Returns:
        List[Dict[str, Any]]: Mock Confluence macro data
    """
    return [
        {"name": "info", "hasBody": True, "bodyType": "rich-text"},
        {"name": "warning", "hasBody": True, "bodyType": "rich-text"},
        {"name": "note", "hasBody": True, "bodyType": "rich-text"},
        {"name": "tip", "hasBody": True, "bodyType": "rich-text"},
        {"name": "code", "hasBody": True, "bodyType": "plain-text"},
        {"name": "toc", "hasBody": False},
        {"name": "children", "hasBody": False},
        {"name": "excerpt", "hasBody": True, "bodyType": "rich-text"},
        {"name": "include", "hasBody": False},
        {"name": "panel", "hasBody": True, "bodyType": "rich-text"},
    ]


# ============================================================================
# Configuration Fixtures
# ============================================================================


@pytest.fixture
def confluence_config_factory():
    """
    Factory for creating ConfluenceConfig instances with customizable options.

    Returns:
        Callable: Function that creates ConfluenceConfig instances

    Example:
        def test_config(confluence_config_factory):
            config = confluence_config_factory(url="https://custom.atlassian.net/wiki")
            assert "custom" in config.url
    """

    def _create_config(**overrides):
        defaults = {
            "url": "https://example.atlassian.net/wiki",
            "auth_type": "basic",
            "username": "test_user",
            "api_token": "test_token",
        }
        config_data = {**defaults, **overrides}
        return ConfluenceConfig(**config_data)

    return _create_config


@pytest.fixture
def mock_config(confluence_config_factory):
    """
    Create a standard mock ConfluenceConfig instance.

    This fixture provides a consistent ConfluenceConfig for tests that don't
    need custom configuration.

    Returns:
        ConfluenceConfig: Standard test configuration
    """
    return confluence_config_factory()


# ============================================================================
# Environment Fixtures
# ============================================================================


@pytest.fixture
def mock_env_vars():
    """
    Mock environment variables for testing.

    Note: This fixture is maintained for backward compatibility.
    Consider using the environment fixtures from root conftest.py.
    """
    with patch.dict(
        "os.environ",
        {
            "CONFLUENCE_URL": "https://example.atlassian.net/wiki",
            "CONFLUENCE_USERNAME": "test_user",
            "CONFLUENCE_API_TOKEN": "test_token",
        },
    ):
        yield


@pytest.fixture
def confluence_auth_environment():
    """
    Fixture providing Confluence-specific authentication environment.

    This sets up environment variables specifically for Confluence authentication
    and can be customized per test.
    """
    auth_config = AuthConfigFactory.create_basic_auth_config()
    confluence_env = {
        "CONFLUENCE_URL": f"{auth_config['url']}/wiki",
        "CONFLUENCE_USERNAME": auth_config["username"],
        "CONFLUENCE_API_TOKEN": auth_config["api_token"],
    }

    with patch.dict(os.environ, confluence_env, clear=False):
        yield confluence_env


# ============================================================================
# Mock Atlassian Client Fixtures
# ============================================================================


@pytest.fixture
def mock_atlassian_confluence(
    session_confluence_spaces, session_confluence_content_types
):
    """
    Enhanced mock of the Atlassian Confluence client.

    This fixture provides a comprehensive mock that uses session-scoped
    data for improved performance and consistency.

    Args:
        session_confluence_spaces: Session-scoped space definitions
        session_confluence_content_types: Session-scoped content type data

    Returns:
        MagicMock: Fully configured mock Confluence client
    """
    with patch("mcp_atlassian.confluence.client.Confluence") as mock:
        confluence_instance = mock.return_value

        # Use original mock data to maintain backward compatibility for existing tests
        confluence_instance.get_all_spaces.return_value = MOCK_SPACES_RESPONSE

        # Set up common return values using both legacy mocks and new factories
        confluence_instance.get_page_by_id.return_value = MOCK_PAGE_RESPONSE
        confluence_instance.get_page_by_title.return_value = MOCK_PAGE_RESPONSE
        confluence_instance.get_all_pages_from_space.return_value = (
            MOCK_PAGES_FROM_SPACE_RESPONSE
        )
        confluence_instance.get_page_comments.return_value = MOCK_COMMENTS_RESPONSE
        confluence_instance.get_page_labels.return_value = MOCK_LABELS_RESPONSE
        confluence_instance.cql.return_value = MOCK_CQL_SEARCH_RESPONSE

        # Enhanced responses using factories
        confluence_instance.create_page.return_value = ConfluencePageFactory.create(
            page_id="123456789", title="New Test Page"
        )

        # Mock update_page to return None (as the actual method does)
        confluence_instance.update_page.return_value = None

        # Mock delete_page to return None
        confluence_instance.delete_page.return_value = None

        # Mock page history
        confluence_instance.get_page_history.return_value = {
            "results": [
                {
                    "version": {"number": 1},
                    "when": "2023-01-01T12:00:00.000Z",
                    "by": {"displayName": "Test User"},
                    "message": "Initial version",
                }
            ]
        }

        # Mock page ancestors
        confluence_instance.get_page_ancestors.return_value = [
            ConfluencePageFactory.create(page_id="parent123", title="Parent Page")
        ]

        # Mock page children
        confluence_instance.get_page_child_by_type.return_value = {
            "results": [
                ConfluencePageFactory.create(page_id="child123", title="Child Page")
            ]
        }

        yield confluence_instance


@pytest.fixture
def enhanced_mock_confluence_client():
    """
    Enhanced mock Confluence client using the new factory system.

    This provides a more flexible mock that can be easily customized
    and integrates with the factory system.

    Returns:
        MagicMock: Enhanced mock Confluence client with factory integration
    """
    return MockAtlassianClient.create_confluence_client()


@pytest.fixture
def mock_atlassian_confluence_with_session_data(
    session_confluence_spaces, session_confluence_content_types
):
    """
    Alternative mock using session-scoped data for new tests.

    This fixture is recommended for new tests as it uses the efficient
    session-scoped data. Existing tests should continue using
    mock_atlassian_confluence for compatibility.

    Args:
        session_confluence_spaces: Session-scoped space definitions
        session_confluence_content_types: Session-scoped content type data

    Returns:
        MagicMock: Mock Confluence client with session-scoped data
    """
    with patch("mcp_atlassian.confluence.client.Confluence") as mock:
        confluence_instance = mock.return_value

        # Use session-scoped data for improved performance
        confluence_instance.get_all_spaces.return_value = {
            "results": session_confluence_spaces,
            "size": len(session_confluence_spaces),
        }

        # Enhanced responses using factories
        confluence_instance.get_page_by_id.return_value = ConfluencePageFactory.create()
        confluence_instance.get_page_by_title.return_value = (
            ConfluencePageFactory.create()
        )
        confluence_instance.create_page.return_value = ConfluencePageFactory.create(
            page_id="123456789", title="New Test Page"
        )

        # Use session data for content types
        confluence_instance.get_content_types.return_value = (
            session_confluence_content_types
        )

        yield confluence_instance


# ============================================================================
# Preprocessor Fixtures
# ============================================================================


@pytest.fixture
def mock_preprocessor():
    """
    Mock the TextPreprocessor with enhanced functionality.

    This fixture provides a preprocessor mock that can be customized
    for testing different content processing scenarios.

    Returns:
        MagicMock: Mock preprocessor with common methods
    """
    preprocessor_instance = MagicMock()

    # Default processing behavior
    preprocessor_instance.process_html_content.return_value = (
        "<p>Processed HTML</p>",
        "Processed Markdown",
    )

    # Additional processing methods
    preprocessor_instance.clean_html.return_value = "<p>Clean HTML</p>"
    preprocessor_instance.html_to_markdown.return_value = "# Markdown Content"
    preprocessor_instance.markdown_to_html.return_value = "<h1>HTML Content</h1>"

    yield preprocessor_instance


@pytest.fixture
def preprocessor_factory():
    """
    Factory for creating preprocessor mocks with different behaviors.

    Returns:
        Dict[str, Callable]: Factory functions for different preprocessor types

    Example:
        def test_preprocessing(preprocessor_factory):
            html_processor = preprocessor_factory["html_to_markdown"]()
            markdown_processor = preprocessor_factory["markdown_to_html"]()
    """
    return {
        "html_to_markdown": MockPreprocessor.create_html_to_markdown,
        "markdown_to_html": MockPreprocessor.create_markdown_to_html,
    }


# ============================================================================
# Client Instance Fixtures
# ============================================================================


@pytest.fixture
def oauth_confluence_client(mock_preprocessor):
    """
    Create a ConfluenceClient instance configured for OAuth authentication.

    This fixture provides a Confluence client configured with OAuth settings
    for testing OAuth-specific functionality.

    Args:
        mock_preprocessor: Mock text preprocessor

    Returns:
        ConfluenceClient: OAuth-configured client instance
    """
    # Create OAuth configuration
    oauth_config = OAuthConfig(
        client_id="test-client-id",
        client_secret="test-client-secret",
        redirect_uri="http://localhost:8080/callback",
        scope="read:confluence-content write:confluence-content",
        cloud_id="test-cloud-id",
    )

    # Convert to ConfluenceConfig format (use .atlassian.net URL to make is_cloud return True)
    config = ConfluenceConfig(
        url="https://test.atlassian.net/wiki",
        auth_type="oauth",
        oauth_config=oauth_config,
    )

    # Mock the OAuth session setup and Confluence client
    with patch(
        "mcp_atlassian.confluence.client.configure_oauth_session"
    ) as mock_oauth_session:
        with patch(
            "mcp_atlassian.confluence.client.Confluence"
        ) as mock_confluence_class:
            with patch(
                "mcp_atlassian.preprocessing.TextPreprocessor"
            ) as mock_text_preprocessor:
                # Mock OAuth session configuration to succeed
                mock_oauth_session.return_value = True

                mock_text_preprocessor.return_value = mock_preprocessor

                # Create the mock Confluence instance
                mock_confluence_instance = MagicMock()
                mock_confluence_class.return_value = mock_confluence_instance

                # Set up OAuth-specific mock responses
                mock_confluence_instance.get_all_spaces.return_value = (
                    MOCK_SPACES_RESPONSE
                )
                mock_confluence_instance.get_page_by_id.return_value = (
                    MOCK_PAGE_RESPONSE
                )
                mock_confluence_instance.create_page.return_value = (
                    ConfluencePageFactory.create(
                        page_id="v2_123456789", title="OAuth Test Page"
                    )
                )

                # Mock the session to have OAuth characteristics
                mock_session = MagicMock()
                mock_confluence_instance._session = mock_session

                # Create the client with OAuth config
                client = ConfluenceClient(config=config)
                client.confluence = mock_confluence_instance
                client.preprocessor = mock_preprocessor

                yield client


@pytest.fixture
def confluence_client(mock_config, mock_atlassian_confluence, mock_preprocessor):
    """
    Create a ConfluenceClient instance with mocked dependencies.

    This fixture provides a fully functional ConfluenceClient with mocked
    Atlassian API calls and content preprocessing for testing.

    Args:
        mock_config: Mock configuration
        mock_atlassian_confluence: Mock Atlassian client
        mock_preprocessor: Mock text preprocessor

    Returns:
        ConfluenceClient: Configured client instance
    """
    # Create a client with a mocked configuration
    with patch(
        "mcp_atlassian.preprocessing.TextPreprocessor"
    ) as mock_text_preprocessor:
        mock_text_preprocessor.return_value = mock_preprocessor

        client = ConfluenceClient(config=mock_config)
        # Replace the actual Confluence instance with our mock
        client.confluence = mock_atlassian_confluence
        # Replace the actual preprocessor with our mock
        client.preprocessor = mock_preprocessor
        yield client


# ============================================================================
# Specialized Test Data Fixtures
# ============================================================================


@pytest.fixture
def make_confluence_page_with_content():
    """
    Factory fixture for creating Confluence pages with rich content.

    Returns:
        Callable: Function that creates page data with content

    Example:
        def test_page_content(make_confluence_page_with_content):
            page = make_confluence_page_with_content(
                title="Rich Page",
                content="<h1>Header</h1><p>Content</p>",
                labels=["test", "content"]
            )
    """

    def _create_page_with_content(
        title: str = "Test Page",
        content: str = "<p>Test content</p>",
        labels: list[str] = None,
        **overrides,
    ):
        labels = labels or ["test"]
        page = ConfluencePageFactory.create(title=title, **overrides)

        # Add rich content
        page["body"]["storage"]["value"] = content

        # Add labels
        page["metadata"] = {
            "labels": {"results": [{"name": label} for label in labels]}
        }

        # Add version info
        page["version"]["message"] = f"Updated {title}"

        return page

    return _create_page_with_content


@pytest.fixture
def make_confluence_search_results():
    """
    Factory fixture for creating Confluence search results.

    Returns:
        Callable: Function that creates CQL search results

    Example:
        def test_search(make_confluence_search_results):
            results = make_confluence_search_results(
                pages=["Page 1", "Page 2"],
                total=2
            )
    """

    def _create_search_results(pages: list[str] = None, total: int = None, **overrides):
        if pages is None:
            pages = ["Test Page 1", "Test Page 2", "Test Page 3"]
        if total is None:
            total = len(pages)

        page_objects = [
            ConfluencePageFactory.create(page_id=str(i), title=title)
            for i, title in enumerate(pages, 1)
        ]

        defaults = {
            "results": page_objects,
            "totalSize": total,
            "start": 0,
            "limit": 25,
        }

        return {**defaults, **overrides}

    return _create_search_results


@pytest.fixture
def make_confluence_space():
    """
    Factory fixture for creating Confluence spaces.

    Returns:
        Callable: Function that creates space data

    Example:
        def test_space(make_confluence_space):
            space = make_confluence_space(
                key="CUSTOM",
                name="Custom Space",
                type="personal"
            )
    """

    def _create_space(
        key: str = "TEST",
        name: str = "Test Space",
        space_type: str = "global",
        **overrides,
    ):
        defaults = {
            "id": 12345,
            "key": key,
            "name": name,
            "type": space_type,
            "status": "current",
            "description": {"plain": {"value": f"{name} for testing"}},
            "_links": {
                "webui": f"/spaces/{key}",
                "self": f"https://test.atlassian.net/wiki/rest/api/space/{key}",
            },
        }

        return {**defaults, **overrides}

    return _create_space


# ============================================================================
# Integration Test Fixtures
# ============================================================================


@pytest.fixture
def confluence_integration_client(session_auth_configs):
    """
    Create a ConfluenceClient for integration testing.

    This fixture creates a client that can be used for integration tests
    when real API credentials are available.

    Args:
        session_auth_configs: Session-scoped auth configurations

    Returns:
        Optional[ConfluenceClient]: Real client if credentials available, None otherwise
    """
    # Check if integration test environment variables are set
    required_vars = ["CONFLUENCE_URL", "CONFLUENCE_USERNAME", "CONFLUENCE_API_TOKEN"]
    if not all(os.environ.get(var) for var in required_vars):
        pytest.skip("Integration test environment variables not set")

    config = ConfluenceConfig(
        url=os.environ["CONFLUENCE_URL"],
        auth_type="basic",
        username=os.environ["CONFLUENCE_USERNAME"],
        api_token=os.environ["CONFLUENCE_API_TOKEN"],
    )

    return ConfluenceClient(config=config)


# ============================================================================
# Parameterized Fixtures
# ============================================================================


@pytest.fixture
def parametrized_confluence_content_type(request):
    """
    Parametrized fixture for testing with different Confluence content types.

    Use with pytest.mark.parametrize to test functionality across
    different content types.

    Example:
        @pytest.mark.parametrize("parametrized_confluence_content_type",
                               ["page", "blogpost"], indirect=True)
        def test_content_types(parametrized_confluence_content_type):
            # Test runs once for each content type
            pass
    """
    content_type = request.param
    return ConfluencePageFactory.create(type=content_type)


@pytest.fixture
def parametrized_confluence_space_type(request):
    """
    Parametrized fixture for testing with different Confluence space types.

    Use with pytest.mark.parametrize to test functionality across
    different space types.
    """
    space_type = request.param
    return {
        "key": "TEST",
        "name": "Test Space",
        "type": space_type,
        "status": "current",
    }

```

--------------------------------------------------------------------------------
/tests/fixtures/jira_mocks.py:
--------------------------------------------------------------------------------

```python
MOCK_JIRA_ISSUE_RESPONSE = {
    "expand": "renderedFields,names,schema,operations,editmeta,changelog,versionedRepresentations",
    "id": "12345",
    "self": "https://example.atlassian.net/rest/api/2/issue/12345",
    "key": "PROJ-123",
    "fields": {
        "summary": "Test Issue Summary",
        "description": "This is a test issue description",
        "created": "2024-01-01T10:00:00.000+0000",
        "updated": "2024-01-02T15:30:00.000+0000",
        "status": {
            "self": "https://example.atlassian.net/rest/api/2/status/3",
            "description": "This issue is currently being worked on.",
            "iconUrl": "https://example.atlassian.net/images/icons/statuses/inprogress.png",
            "name": "In Progress",
            "id": "3",
            "statusCategory": {
                "self": "https://example.atlassian.net/rest/api/2/statuscategory/4",
                "id": 4,
                "key": "indeterminate",
                "colorName": "yellow",
                "name": "In Progress",
            },
        },
        "issuetype": {
            "self": "https://example.atlassian.net/rest/api/2/issuetype/10001",
            "id": "10001",
            "description": "A task that needs to be done.",
            "iconUrl": "https://example.atlassian.net/secure/viewavatar?size=xsmall&avatarId=10318&avatarType=issuetype",
            "name": "Task",
            "subtask": False,
        },
        "priority": {
            "self": "https://example.atlassian.net/rest/api/2/priority/3",
            "iconUrl": "https://example.atlassian.net/images/icons/priorities/medium.svg",
            "name": "Medium",
            "id": "3",
        },
        "assignee": {
            "self": "https://example.atlassian.net/rest/api/2/user?accountId=123",
            "accountId": "123",
            "emailAddress": "[email protected]",
            "avatarUrls": {
                "48x48": "https://secure.gravatar.com/avatar/123?d=https%3A%2F%2Favatar.example.com%2Fdefault.png",
            },
            "displayName": "Test User",
            "active": True,
            "timeZone": "UTC",
        },
        "reporter": {
            "self": "https://example.atlassian.net/rest/api/2/user?accountId=456",
            "accountId": "456",
            "displayName": "Reporter User",
            "active": True,
        },
        "labels": ["test-label"],
        "components": [{"name": "Backend"}],
        "fixVersions": [{"name": "v1.0"}],
        "attachment": [
            {
                "id": "10000",
                "filename": "test_attachment.txt",
                "size": 1024,
                "mimeType": "text/plain",
                "content": "https://example.atlassian.net/secure/attachment/10000/test_attachment.txt",
            }
        ],
        "comment": {
            "comments": [
                {
                    "id": "10001",
                    "author": {"displayName": "Commenter User"},
                    "body": "This is a test comment",
                    "created": "2024-01-01T12:00:00.000+0000",
                    "updated": "2024-01-01T12:00:00.000+0000",
                }
            ],
            "maxResults": 1,
            "total": 1,
            "startAt": 0,
        },
        "timetracking": {
            "originalEstimate": "1d",
            "remainingEstimate": "4h",
            "timeSpent": "4h",
            "originalEstimateSeconds": 28800,
            "remainingEstimateSeconds": 14400,
            "timeSpentSeconds": 14400,
        },
        "project": {
            "id": "10000",
            "key": "PROJ",
            "name": "Test Project",
            "self": "https://example.atlassian.net/rest/api/2/project/10000",
            "avatarUrls": {
                "48x48": "https://example.atlassian.net/secure/projectavatar?size=large&pid=10000"
            },
        },
        "resolution": {
            "self": "https://example.atlassian.net/rest/api/2/resolution/10000",
            "id": "10000",
            "description": "Work has been completed on this issue.",
            "name": "Fixed",
        },
        "duedate": "2024-12-31",
        "resolutiondate": "2024-01-15T11:00:00.000+0000",
        "parent": {
            "id": "12344",
            "key": "PROJ-122",
            "fields": {"summary": "Parent Issue Summary"},
        },
        "subtasks": [
            {
                "id": "12346",
                "key": "PROJ-124",
                "fields": {"summary": "Subtask 1 Summary"},
            }
        ],
        "security": {"name": "Internal", "id": "10001"},
        "worklog": {"startAt": 0, "maxResults": 20, "total": 0, "worklogs": []},
        # Custom fields for testing
        "customfield_10011": "Epic Name Example",  # Epic Name
        "customfield_10014": "EPIC-KEY-1",  # Epic Link
        "customfield_10001": "Custom Text Field Value",
        "customfield_10002": {"value": "Custom Select Value"},
        "customfield_10003": [
            {"value": "Custom MultiSelect 1"},
            {"value": "Custom MultiSelect 2"},
        ],
    },
    "names": {
        "customfield_10011": "Epic Name",
        "customfield_10014": "Epic Link",
        "customfield_10001": "My Custom Text Field",
        "customfield_10002": "My Custom Select",
        "customfield_10003": "My Custom MultiSelect",
    },
}

MOCK_JIRA_JQL_RESPONSE = {
    "expand": "schema,names",
    "startAt": 0,
    "maxResults": 5,
    "total": 34,
    "issues": [
        {
            "expand": "operations,versionedRepresentations,editmeta,changelog,renderedFields",
            "id": "12345",
            "self": "https://example.atlassian.net/rest/api/2/issue/12345",
            "key": "PROJ-123",
            "fields": {
                "parent": {
                    "id": "12340",
                    "key": "PROJ-120",
                    "self": "https://example.atlassian.net/rest/api/2/issue/12340",
                    "fields": {
                        "summary": "Parent Epic Summary",
                        "status": {
                            "self": "https://example.atlassian.net/rest/api/2/status/10000",
                            "description": "",
                            "iconUrl": "https://example.atlassian.net/",
                            "name": "In Progress",
                            "id": "10000",
                            "statusCategory": {
                                "self": "https://example.atlassian.net/rest/api/2/statuscategory/4",
                                "id": 4,
                                "key": "indeterminate",
                                "colorName": "yellow",
                                "name": "In Progress",
                            },
                        },
                        "priority": {
                            "self": "https://example.atlassian.net/rest/api/2/priority/3",
                            "iconUrl": "https://example.atlassian.net/images/icons/priorities/medium.svg",
                            "name": "Medium",
                            "id": "3",
                        },
                        "issuetype": {
                            "self": "https://example.atlassian.net/rest/api/2/issuetype/10001",
                            "id": "10001",
                            "description": "Epics track large pieces of work.",
                            "iconUrl": "https://example.atlassian.net/images/icons/issuetypes/epic.svg",
                            "name": "Epic",
                            "subtask": False,
                            "hierarchyLevel": 1,
                        },
                    },
                },
                "summary": "Test Issue Summary",
                "description": "This is a test issue description",
                "created": "2024-01-01T10:00:00.000+0000",
                "updated": "2024-01-02T15:30:00.000+0000",
                "duedate": "2024-12-31",
                "priority": {
                    "self": "https://example.atlassian.net/rest/api/2/priority/3",
                    "iconUrl": "https://example.atlassian.net/images/icons/priorities/medium.svg",
                    "name": "Medium",
                    "id": "3",
                },
                "status": {
                    "self": "https://example.atlassian.net/rest/api/2/status/10000",
                    "description": "",
                    "iconUrl": "https://example.atlassian.net/",
                    "name": "In Progress",
                    "id": "10000",
                    "statusCategory": {
                        "self": "https://example.atlassian.net/rest/api/2/statuscategory/4",
                        "id": 4,
                        "key": "indeterminate",
                        "colorName": "yellow",
                        "name": "In Progress",
                    },
                },
                "issuetype": {
                    "self": "https://example.atlassian.net/rest/api/2/issuetype/10000",
                    "id": "10000",
                    "description": "A task that needs to be done.",
                    "iconUrl": "https://example.atlassian.net/images/icons/issuetypes/task.svg",
                    "name": "Task",
                    "subtask": False,
                    "hierarchyLevel": 0,
                },
                "project": {
                    "self": "https://example.atlassian.net/rest/api/2/project/10000",
                    "id": "10000",
                    "key": "PROJ",
                    "name": "Test Project",
                    "projectTypeKey": "software",
                    "simplified": True,
                },
                "comment": {
                    "comments": [
                        {
                            "self": "https://example.atlassian.net/rest/api/2/issue/12345/comment/10000",
                            "id": "10000",
                            "author": {"displayName": "Comment User", "active": True},
                            "body": "This is a test comment",
                            "created": "2024-01-01T12:00:00.000+0000",
                            "updated": "2024-01-01T12:00:00.000+0000",
                        }
                    ],
                    "maxResults": 1,
                    "total": 1,
                    "startAt": 0,
                },
            },
        }
    ],
    "names": {
        "customfield_10011": "Epic Name",
        "customfield_10014": "Epic Link",
    },
}

# Generic mock Jira comments data without any company-specific information
MOCK_JIRA_COMMENTS = {
    "startAt": 0,
    "maxResults": 100,
    "total": 5,
    "comments": [
        {
            "self": "https://example.atlassian.net/rest/api/2/issue/10001/comment/10101",
            "id": "10101",
            "author": {
                "self": "https://example.atlassian.net/rest/api/2/user?accountId=account-id-1",
                "accountId": "account-id-1",
                "avatarUrls": {
                    "48x48": "https://avatar.example.com/avatar/user1_48.png",
                    "24x24": "https://avatar.example.com/avatar/user1_24.png",
                    "16x16": "https://avatar.example.com/avatar/user1_16.png",
                    "32x32": "https://avatar.example.com/avatar/user1_32.png",
                },
                "displayName": "John Smith",
                "active": True,
                "timeZone": "UTC",
                "accountType": "atlassian",
            },
            "body": "I've analyzed this issue and found that we need to update the configuration settings.",
            "updateAuthor": {
                "self": "https://example.atlassian.net/rest/api/2/user?accountId=account-id-1",
                "accountId": "account-id-1",
                "avatarUrls": {
                    "48x48": "https://avatar.example.com/avatar/user1_48.png",
                    "24x24": "https://avatar.example.com/avatar/user1_24.png",
                    "16x16": "https://avatar.example.com/avatar/user1_16.png",
                    "32x32": "https://avatar.example.com/avatar/user1_32.png",
                },
                "displayName": "John Smith",
                "active": True,
                "timeZone": "UTC",
                "accountType": "atlassian",
            },
            "created": "2023-01-15T09:14:01.240+0000",
            "updated": "2023-01-15T09:14:15.433+0000",
            "jsdPublic": True,
        },
        {
            "self": "https://example.atlassian.net/rest/api/2/issue/10001/comment/10102",
            "id": "10102",
            "author": {
                "self": "https://example.atlassian.net/rest/api/2/user?accountId=account-id-2",
                "accountId": "account-id-2",
                "avatarUrls": {
                    "48x48": "https://avatar.example.com/avatar/user2_48.png",
                    "24x24": "https://avatar.example.com/avatar/user2_24.png",
                    "16x16": "https://avatar.example.com/avatar/user2_16.png",
                    "32x32": "https://avatar.example.com/avatar/user2_32.png",
                },
                "displayName": "Jane Doe",
                "active": True,
                "timeZone": "America/New_York",
                "accountType": "atlassian",
            },
            "body": "I agree with John. Let's schedule a meeting to discuss the implementation details.",
            "updateAuthor": {
                "self": "https://example.atlassian.net/rest/api/2/user?accountId=account-id-2",
                "accountId": "account-id-2",
                "avatarUrls": {
                    "48x48": "https://avatar.example.com/avatar/user2_48.png",
                    "24x24": "https://avatar.example.com/avatar/user2_24.png",
                    "16x16": "https://avatar.example.com/avatar/user2_16.png",
                    "32x32": "https://avatar.example.com/avatar/user2_32.png",
                },
                "displayName": "Jane Doe",
                "active": True,
                "timeZone": "America/New_York",
                "accountType": "atlassian",
            },
            "created": "2023-01-15T14:35:28.392+0000",
            "updated": "2023-01-15T14:35:28.392+0000",
            "jsdPublic": True,
        },
        {
            "self": "https://example.atlassian.net/rest/api/2/issue/10001/comment/10103",
            "id": "10103",
            "author": {
                "self": "https://example.atlassian.net/rest/api/2/user?accountId=account-id-3",
                "accountId": "account-id-3",
                "avatarUrls": {
                    "48x48": "https://avatar.example.com/avatar/user3_48.png",
                    "24x24": "https://avatar.example.com/avatar/user3_24.png",
                    "16x16": "https://avatar.example.com/avatar/user3_16.png",
                    "32x32": "https://avatar.example.com/avatar/user3_32.png",
                },
                "displayName": "Robert Johnson",
                "active": True,
                "timeZone": "Europe/London",
                "accountType": "atlassian",
            },
            "body": "I've created a draft implementation. Please review the code changes in the linked PR.",
            "updateAuthor": {
                "self": "https://example.atlassian.net/rest/api/2/user?accountId=account-id-3",
                "accountId": "account-id-3",
                "avatarUrls": {
                    "48x48": "https://avatar.example.com/avatar/user3_48.png",
                    "24x24": "https://avatar.example.com/avatar/user3_24.png",
                    "16x16": "https://avatar.example.com/avatar/user3_16.png",
                    "32x32": "https://avatar.example.com/avatar/user3_32.png",
                },
                "displayName": "Robert Johnson",
                "active": True,
                "timeZone": "Europe/London",
                "accountType": "atlassian",
            },
            "created": "2023-01-18T10:47:53.672+0000",
            "updated": "2023-01-18T11:01:55.589+0000",
            "jsdPublic": True,
        },
        {
            "self": "https://example.atlassian.net/rest/api/2/issue/10001/comment/10104",
            "id": "10104",
            "author": {
                "self": "https://example.atlassian.net/rest/api/2/user?accountId=account-id-1",
                "accountId": "account-id-1",
                "avatarUrls": {
                    "48x48": "https://avatar.example.com/avatar/user1_48.png",
                    "24x24": "https://avatar.example.com/avatar/user1_24.png",
                    "16x16": "https://avatar.example.com/avatar/user1_16.png",
                    "32x32": "https://avatar.example.com/avatar/user1_32.png",
                },
                "displayName": "John Smith",
                "active": True,
                "timeZone": "UTC",
                "accountType": "atlassian",
            },
            "body": "The code looks good. I've left some minor suggestions in the PR review.",
            "updateAuthor": {
                "self": "https://example.atlassian.net/rest/api/2/user?accountId=account-id-1",
                "accountId": "account-id-1",
                "avatarUrls": {
                    "48x48": "https://avatar.example.com/avatar/user1_48.png",
                    "24x24": "https://avatar.example.com/avatar/user1_24.png",
                    "16x16": "https://avatar.example.com/avatar/user1_16.png",
                    "32x32": "https://avatar.example.com/avatar/user1_32.png",
                },
                "displayName": "John Smith",
                "active": True,
                "timeZone": "UTC",
                "accountType": "atlassian",
            },
            "created": "2023-01-19T15:20:02.083+0000",
            "updated": "2023-01-19T15:20:02.083+0000",
            "jsdPublic": True,
        },
        {
            "self": "https://example.atlassian.net/rest/api/2/issue/10001/comment/10105",
            "id": "10105",
            "author": {
                "self": "https://example.atlassian.net/rest/api/2/user?accountId=account-id-3",
                "accountId": "account-id-3",
                "avatarUrls": {
                    "48x48": "https://avatar.example.com/avatar/user3_48.png",
                    "24x24": "https://avatar.example.com/avatar/user3_24.png",
                    "16x16": "https://avatar.example.com/avatar/user3_16.png",
                    "32x32": "https://avatar.example.com/avatar/user3_32.png",
                },
                "displayName": "Robert Johnson",
                "active": True,
                "timeZone": "Europe/London",
                "accountType": "atlassian",
            },
            "body": "I've addressed all the feedback and merged the PR. Issue can be closed.",
            "updateAuthor": {
                "self": "https://example.atlassian.net/rest/api/2/user?accountId=account-id-3",
                "accountId": "account-id-3",
                "avatarUrls": {
                    "48x48": "https://avatar.example.com/avatar/user3_48.png",
                    "24x24": "https://avatar.example.com/avatar/user3_24.png",
                    "16x16": "https://avatar.example.com/avatar/user3_16.png",
                    "32x32": "https://avatar.example.com/avatar/user3_32.png",
                },
                "displayName": "Robert Johnson",
                "active": True,
                "timeZone": "Europe/London",
                "accountType": "atlassian",
            },
            "created": "2023-01-20T11:10:38.167+0000",
            "updated": "2023-01-20T11:10:38.167+0000",
            "jsdPublic": True,
        },
    ],
}

# Create a simplified version for test use
MOCK_JIRA_COMMENTS_SIMPLIFIED = {
    "startAt": 0,
    "maxResults": 100,
    "total": MOCK_JIRA_COMMENTS["total"],
    "comments": [
        {
            "id": comment["id"],
            "author": {"displayName": comment["author"]["displayName"]},
            "body": comment["body"],
            "created": comment["created"],
            "updated": comment["updated"],
        }
        for comment in MOCK_JIRA_COMMENTS["comments"][:3]  # Just use first 3 comments
    ],
}

# Create simplified versions of the mock responses
MOCK_JIRA_ISSUE_RESPONSE_SIMPLIFIED = {
    "id": "12345",
    "self": "https://example.atlassian.net/rest/api/2/issue/12345",
    "key": "PROJ-123",
    "fields": {
        "summary": "Test Issue Summary",
        "description": "This is a test issue description",
        "created": "2024-01-01T10:00:00.000+0000",
        "updated": "2024-01-02T15:30:00.000+0000",
        "duedate": "2024-12-31",
        "priority": {
            "name": "Medium",
            "id": "3",
        },
        "status": {
            "name": "In Progress",
            "id": "10000",
            "statusCategory": {
                "id": 4,
                "key": "indeterminate",
                "name": "In Progress",
            },
        },
        "issuetype": {
            "id": "10000",
            "name": "Task",
            "subtask": False,
        },
        "project": {
            "id": "10000",
            "key": "PROJ",
            "name": "Test Project",
        },
        "comment": {
            "comments": [
                {
                    "id": "10000",
                    "author": {"displayName": "Comment User"},
                    "body": "This is a test comment",
                    "created": "2024-01-01T12:00:00.000+0000",
                    "updated": "2024-01-01T12:00:00.000+0000",
                }
            ],
            "total": 1,
        },
        "labels": ["test-label"],
        "fixVersions": [{"name": "v1.0"}],
        "attachment": [
            {
                "id": "10000",
                "filename": "test_attachment.txt",
                "author": {"displayName": "Test User"},
                "created": "2024-01-01T10:00:00.000+0000",
                "size": 1024,
                "mimeType": "text/plain",
                "content": "https://example.atlassian.net/secure/attachment/10000/test_attachment.txt",
            }
        ],
        "timetracking": {
            "originalEstimate": "1d",
            "remainingEstimate": "4h",
            "timeSpent": "4h",
        },
        "custom_fields": {
            "customfield_10011": "My Awesome Epic Name",
            "customfield_10014": "EPIC-123",
            "customfield_10001": "Simple string value",
            "customfield_10002": {"value": "Option value"},
            "customfield_10003": [{"value": "Item 1"}, {"value": "Item 2"}],
        },
    },
}

MOCK_JIRA_JQL_RESPONSE_SIMPLIFIED = {
    "startAt": 0,
    "maxResults": 5,
    "total": 34,
    "issues": [
        {
            "id": "12345",
            "key": "PROJ-123",
            "fields": {
                "parent": {
                    "id": "12340",
                    "key": "PROJ-120",
                    "fields": {
                        "summary": "Parent Epic Summary",
                        "status": {
                            "name": "In Progress",
                        },
                        "issuetype": {
                            "name": "Epic",
                            "subtask": False,
                        },
                    },
                },
                "summary": "Test Issue Summary",
                "description": "This is a test issue description",
                "created": "2024-01-01T10:00:00.000+0000",
                "updated": "2024-01-02T15:30:00.000+0000",
                "status": {
                    "name": "In Progress",
                },
                "issuetype": {
                    "name": "Task",
                    "subtask": False,
                },
                "project": {
                    "key": "PROJ",
                    "name": "Test Project",
                },
                "comment": {
                    "comments": [
                        {
                            "id": "10000",
                            "author": {"displayName": "Comment User"},
                            "body": "This is a test comment",
                            "created": "2024-01-01T12:00:00.000+0000",
                        }
                    ],
                    "total": 1,
                },
            },
        }
    ],
}

```

--------------------------------------------------------------------------------
/tests/unit/models/test_confluence_models.py:
--------------------------------------------------------------------------------

```python
"""
Tests for the Confluence Pydantic models.

These tests validate the conversion of Confluence API responses to structured models
and the simplified dictionary conversion for API responses.
"""

import pytest

from src.mcp_atlassian.models import (
    ConfluenceAttachment,
    ConfluenceComment,
    ConfluenceLabel,
    ConfluencePage,
    ConfluenceSearchResult,
    ConfluenceSpace,
    ConfluenceUser,
    ConfluenceVersion,
)
from src.mcp_atlassian.models.constants import EMPTY_STRING

# Optional: Import real API client for optional real-data testing
try:
    from src.mcp_atlassian.confluence.client import ConfluenceClient  # noqa: F401
except ImportError:
    pass


class TestConfluenceAttachment:
    """Tests for the ConfluenceAttachment model."""

    def test_from_api_response_with_valid_data(self):
        """Test creating a ConfluenceAttachment from valid API data."""
        attachment_data = {
            "id": "att105348",
            "type": "attachment",
            "status": "current",
            "title": "random_geometric_image.svg",
            "extensions": {"mediaType": "application/binary", "fileSize": 1098},
        }

        attachment = ConfluenceAttachment.from_api_response(attachment_data)

        assert attachment.id == "att105348"
        assert attachment.title == "random_geometric_image.svg"
        assert attachment.type == "attachment"
        assert attachment.status == "current"
        assert attachment.media_type == "application/binary"
        assert attachment.file_size == 1098

    def test_from_api_response_with_empty_data(self):
        """Test creating a ConfluenceAttachment from empty data."""
        attachment = ConfluenceAttachment.from_api_response({})

        # Should use default values
        assert attachment.id is None
        assert attachment.title is None
        assert attachment.type is None
        assert attachment.status is None
        assert attachment.media_type is None
        assert attachment.file_size is None

    def test_from_api_response_with_none_data(self):
        """Test creating a ConfluenceAttachment from None data."""
        attachment = ConfluenceAttachment.from_api_response(None)

        # Should use default values
        assert attachment.id is None
        assert attachment.title is None
        assert attachment.type is None
        assert attachment.status is None
        assert attachment.media_type is None
        assert attachment.file_size is None

    def test_to_simplified_dict(self):
        """Test converting ConfluenceAttachment to a simplified dictionary."""
        attachment = ConfluenceAttachment(
            id="att105348",
            title="random_geometric_image.svg",
            type="attachment",
            status="current",
            media_type="application/binary",
            file_size=1098,
        )

        simplified = attachment.to_simplified_dict()

        assert isinstance(simplified, dict)
        assert simplified["id"] == "att105348"
        assert simplified["title"] == "random_geometric_image.svg"
        assert simplified["type"] == "attachment"
        assert simplified["status"] == "current"
        assert simplified["media_type"] == "application/binary"
        assert simplified["file_size"] == 1098


class TestConfluenceUser:
    """Tests for the ConfluenceUser model."""

    def test_from_api_response_with_valid_data(self):
        """Test creating a ConfluenceUser from valid API data."""
        user_data = {
            "accountId": "user123",
            "displayName": "Test User",
            "email": "[email protected]",
            "profilePicture": {
                "path": "/wiki/aa-avatar/user123",
                "width": 48,
                "height": 48,
            },
            "accountStatus": "active",
            "locale": "en_US",
        }

        user = ConfluenceUser.from_api_response(user_data)

        assert user.account_id == "user123"
        assert user.display_name == "Test User"
        assert user.email == "[email protected]"
        assert user.profile_picture == "/wiki/aa-avatar/user123"
        assert user.is_active is True
        assert user.locale == "en_US"

    def test_from_api_response_with_empty_data(self):
        """Test creating a ConfluenceUser from empty data."""
        user = ConfluenceUser.from_api_response({})

        # Should use default values
        assert user.account_id is None
        assert user.display_name == "Unassigned"
        assert user.email is None
        assert user.profile_picture is None
        assert user.is_active is True
        assert user.locale is None

    def test_from_api_response_with_none_data(self):
        """Test creating a ConfluenceUser from None data."""
        user = ConfluenceUser.from_api_response(None)

        # Should use default values
        assert user.account_id is None
        assert user.display_name == "Unassigned"
        assert user.email is None
        assert user.profile_picture is None
        assert user.is_active is True
        assert user.locale is None

    def test_to_simplified_dict(self):
        """Test converting ConfluenceUser to a simplified dictionary."""
        user = ConfluenceUser(
            account_id="user123",
            display_name="Test User",
            email="[email protected]",
            profile_picture="/wiki/aa-avatar/user123",
            is_active=True,
            locale="en_US",
        )

        simplified = user.to_simplified_dict()

        assert isinstance(simplified, dict)
        assert simplified["display_name"] == "Test User"
        assert simplified["email"] == "[email protected]"
        assert simplified["profile_picture"] == "/wiki/aa-avatar/user123"
        assert "account_id" not in simplified  # Not included in simplified dict
        assert "locale" not in simplified  # Not included in simplified dict


class TestConfluenceSpace:
    """Tests for the ConfluenceSpace model."""

    def test_from_api_response_with_valid_data(self):
        """Test creating a ConfluenceSpace from valid API data."""
        space_data = {
            "id": "123456",
            "key": "TEST",
            "name": "Test Space",
            "type": "global",
            "status": "current",
        }

        space = ConfluenceSpace.from_api_response(space_data)

        assert space.id == "123456"
        assert space.key == "TEST"
        assert space.name == "Test Space"
        assert space.type == "global"
        assert space.status == "current"

    def test_from_api_response_with_empty_data(self):
        """Test creating a ConfluenceSpace from empty data."""
        space = ConfluenceSpace.from_api_response({})

        # Should use default values
        assert space.id == "0"
        assert space.key == ""
        assert space.name == "Unknown"
        assert space.type == "global"
        assert space.status == "current"

    def test_to_simplified_dict(self):
        """Test converting ConfluenceSpace to a simplified dictionary."""
        space = ConfluenceSpace(
            id="123456", key="TEST", name="Test Space", type="global", status="current"
        )

        simplified = space.to_simplified_dict()

        assert isinstance(simplified, dict)
        assert simplified["key"] == "TEST"
        assert simplified["name"] == "Test Space"
        assert simplified["type"] == "global"
        assert simplified["status"] == "current"
        assert "id" not in simplified  # Not included in simplified dict


class TestConfluenceVersion:
    """Tests for the ConfluenceVersion model."""

    def test_from_api_response_with_valid_data(self):
        """Test creating a ConfluenceVersion from valid API data."""
        version_data = {
            "number": 5,
            "when": "2024-01-01T09:00:00.000Z",
            "message": "Updated content",
            "by": {
                "accountId": "user123",
                "displayName": "Test User",
                "email": "[email protected]",
            },
        }

        version = ConfluenceVersion.from_api_response(version_data)

        assert version.number == 5
        assert version.when == "2024-01-01T09:00:00.000Z"
        assert version.message == "Updated content"
        assert version.by is not None
        assert version.by.display_name == "Test User"

    def test_from_api_response_with_empty_data(self):
        """Test creating a ConfluenceVersion from empty data."""
        version = ConfluenceVersion.from_api_response({})

        # Should use default values
        assert version.number == 0
        assert version.when == ""
        assert version.message is None
        assert version.by is None

    def test_to_simplified_dict(self):
        """Test converting ConfluenceVersion to a simplified dictionary."""
        version = ConfluenceVersion(
            number=5,
            when="2024-01-01T09:00:00.000Z",
            message="Updated content",
            by=ConfluenceUser(account_id="user123", display_name="Test User"),
        )

        simplified = version.to_simplified_dict()

        assert isinstance(simplified, dict)
        assert simplified["number"] == 5
        assert simplified["when"] == "2024-01-01 09:00:00"  # Formatted timestamp
        assert simplified["message"] == "Updated content"
        assert simplified["by"] == "Test User"


class TestConfluenceComment:
    """Tests for the ConfluenceComment model."""

    def test_from_api_response_with_valid_data(self, confluence_comments_data):
        """Test creating a ConfluenceComment from valid API data."""
        comment_data = confluence_comments_data["results"][0]

        comment = ConfluenceComment.from_api_response(comment_data)

        assert comment.id == "456789123"
        assert comment.title == "Re: Technical Design Document"
        assert comment.body != ""  # Body should be populated from "value" field
        assert comment.author is not None
        assert comment.author.display_name == "John Doe"
        assert comment.type == "comment"

    def test_from_api_response_with_empty_data(self):
        """Test creating a ConfluenceComment from empty data."""
        comment = ConfluenceComment.from_api_response({})

        # Should use default values
        assert comment.id == "0"
        assert comment.title is None
        assert comment.body == ""
        assert comment.created == ""
        assert comment.updated == ""
        assert comment.author is None
        assert comment.type == "comment"

    def test_to_simplified_dict(self):
        """Test converting ConfluenceComment to a simplified dictionary."""
        comment = ConfluenceComment(
            id="456789123",
            title="Test Comment",
            body="This is a test comment",
            created="2024-01-01T10:00:00.000Z",
            updated="2024-01-01T10:00:00.000Z",
            author=ConfluenceUser(account_id="user123", display_name="Comment Author"),
            type="comment",
        )

        simplified = comment.to_simplified_dict()

        assert isinstance(simplified, dict)
        assert simplified["id"] == "456789123"
        assert simplified["title"] == "Test Comment"
        assert simplified["body"] == "This is a test comment"
        assert simplified["created"] == "2024-01-01 10:00:00"  # Formatted timestamp
        assert simplified["updated"] == "2024-01-01 10:00:00"  # Formatted timestamp
        assert simplified["author"] == "Comment Author"


class TestConfluenceLabel:
    """Tests for the ConfluenceLabel model."""

    def test_from_api_response_with_valid_data(self, confluence_labels_data):
        """Test creating a ConfluenceLabel from valid API data."""
        label_data = confluence_labels_data["results"][0]

        label = ConfluenceLabel.from_api_response(label_data)

        assert label.id == "456789123"
        assert label.name == "meeting-notes"
        assert label.prefix == "global"
        assert label.label == "meeting-notes"
        assert label.type == "label"

    def test_from_api_response_with_empty_data(self):
        """Test creating a ConfluenceLabel from empty data."""
        label = ConfluenceLabel.from_api_response({})

        # Should use default values
        assert label.id == "0"
        assert label.name == EMPTY_STRING
        assert label.prefix == "global"
        assert label.label == EMPTY_STRING
        assert label.type == "label"

    def test_to_simplified_dict(self):
        """Test converting ConfluenceLabel to a simplified dictionary."""
        label = ConfluenceLabel(
            id="456789123",
            name="test",
            prefix="my",
            label="test",
            type="label",
        )

        simplified = label.to_simplified_dict()

        assert isinstance(simplified, dict)
        assert simplified["id"] == "456789123"
        assert simplified["name"] == "test"
        assert simplified["prefix"] == "my"
        assert simplified["label"] == "test"


class TestConfluencePage:
    """Tests for the ConfluencePage model."""

    def test_from_api_response_with_valid_data(self, confluence_page_data):
        """Test creating a ConfluencePage from valid API data."""
        page = ConfluencePage.from_api_response(confluence_page_data)

        assert page.id == "987654321"
        assert page.title == "Example Meeting Notes"
        assert page.type == "page"
        assert page.status == "current"

        # Verify nested objects
        assert page.space is not None
        assert page.space.key == "PROJ"
        assert page.space.name == "Project Space"

        assert page.version is not None
        assert page.version.number == 1
        assert page.version.by is not None
        assert page.version.by.display_name == "Example User (Unlicensed)"

        # Content extraction depends on the implementation
        # If it's not extracting from the mock data, let's skip this check
        # assert "<h2>" in page.content

        # Check timestamps
        assert page.version.when == "2024-01-01T09:00:00.000Z"

    def test_from_api_response_with_empty_data(self):
        """Test creating a ConfluencePage from empty data."""
        page = ConfluencePage.from_api_response({})

        # Should use default values
        assert page.id == "0"
        assert page.title == ""
        assert page.type == "page"
        assert page.status == "current"
        assert page.space is None
        assert page.content == ""
        assert page.content_format == "view"
        assert page.created == ""
        assert page.updated == ""
        assert page.author is None
        assert page.version is None
        assert len(page.ancestors) == 0
        assert isinstance(page.children, dict)
        assert page.url is None

    def test_from_api_response_with_search_result(self, confluence_search_data):
        """Test creating a ConfluencePage from search result content."""
        content_data = confluence_search_data["results"][0]["content"]

        page = ConfluencePage.from_api_response(content_data)

        assert page.id == "123456789"
        assert page.title == "2024-01-01: Team Progress Meeting 01"
        assert page.type == "page"
        assert page.status == "current"

    def test_to_simplified_dict(self, confluence_page_data):
        """Test converting ConfluencePage to a simplified dictionary."""
        page = ConfluencePage.from_api_response(confluence_page_data)

        simplified = page.to_simplified_dict()

        assert isinstance(simplified, dict)
        assert simplified["id"] == "987654321"
        assert simplified["title"] == "Example Meeting Notes"

        # The keys in the simplified dict depend on the implementation
        # Let's check for space information in a more flexible way
        assert page.space is not None
        assert page.space.key == "PROJ"

        # Check space information - could be a string or a dict
        if "space_key" in simplified:
            assert simplified["space_key"] == "PROJ"
        elif "space" in simplified:
            # The space field might be a dictionary with key and name fields
            if isinstance(simplified["space"], dict):
                assert simplified["space"]["key"] == "PROJ"
                assert simplified["space"]["name"] == "Project Space"
            # Or it might be a string with just the key
            else:
                assert (
                    simplified["space"] == "PROJ"
                    or simplified["space"] == "Project Space"
                )

        # Check version is included
        assert "version" in simplified
        assert simplified["version"] == 1

        # URL should be included
        assert "url" in simplified

    def test_from_api_response_with_expandable_space(self):
        """Test creating a ConfluencePage from data with space info in _expandable."""
        page_data = {
            "id": "123456",
            "title": "Test Page",
            "_expandable": {"space": "/rest/api/space/TEST"},
        }

        page = ConfluencePage.from_api_response(
            page_data, base_url="https://confluence.example.com", is_cloud=True
        )

        assert page.space is not None
        assert page.space.key == "TEST"
        assert page.space.name == "Space TEST"
        assert page.url == "https://confluence.example.com/spaces/TEST/pages/123456"

    def test_from_api_response_with_missing_space(self):
        """Test creating a ConfluencePage with no space information."""
        page_data = {"id": "123456", "title": "Test Page"}

        page = ConfluencePage.from_api_response(
            page_data, base_url="https://confluence.example.com", is_cloud=True
        )

        assert page.space is not None
        assert page.space.key == ""  # Default from ConfluenceSpace
        assert page.url == "https://confluence.example.com/spaces/unknown/pages/123456"

    def test_from_api_response_with_empty_space_data(self):
        """Test creating a ConfluencePage with empty space data."""
        page_data = {
            "id": "123456",
            "title": "Test Page",
            "space": {},  # Empty space data
        }

        page = ConfluencePage.from_api_response(
            page_data, base_url="https://confluence.example.com", is_cloud=True
        )

        assert page.space is not None
        assert page.space.key == ""  # Default from ConfluenceSpace
        assert page.url == "https://confluence.example.com/spaces/unknown/pages/123456"

    def test_from_api_response_url_construction_without_base_url(self):
        """Test that URL is None when base_url is not provided."""
        page_data = {
            "id": "123456",
            "title": "Test Page",
            "space": {"key": "TEST", "name": "Test Space"},
        }

        page = ConfluencePage.from_api_response(page_data)  # No base_url provided

        assert page.url is None
        assert page.space is not None
        assert page.space.key == "TEST"

    def test_url_construction_cloud_format(self):
        """Test URL construction in cloud format."""
        page_data = {
            "id": "123456",
            "title": "Test Page",
            "space": {"key": "TEST", "name": "Test Space"},
        }

        page = ConfluencePage.from_api_response(
            page_data, base_url="https://example.atlassian.net/wiki", is_cloud=True
        )

        assert page.url == "https://example.atlassian.net/wiki/spaces/TEST/pages/123456"

    def test_url_construction_server_format(self):
        """Test URL construction in server format."""
        page_data = {
            "id": "123456",
            "title": "Test Page",
            "space": {"key": "TEST", "name": "Test Space"},
        }

        page = ConfluencePage.from_api_response(
            page_data, base_url="https://wiki.corp.example.com", is_cloud=False
        )

        assert (
            page.url
            == "https://wiki.corp.example.com/pages/viewpage.action?pageId=123456"
        )


class TestConfluenceSearchResult:
    """Tests for the ConfluenceSearchResult model."""

    def test_from_api_response_with_valid_data(self, confluence_search_data):
        """Test creating a ConfluenceSearchResult from valid API data."""
        search_result = ConfluenceSearchResult.from_api_response(confluence_search_data)

        assert search_result.total_size == 1
        assert search_result.start == 0
        assert search_result.limit == 50
        assert search_result.cql_query == "parent = 123456789"
        assert search_result.search_duration == 156

        assert len(search_result.results) == 1

        # Verify that results are properly converted to ConfluencePage objects
        page = search_result.results[0]
        assert isinstance(page, ConfluencePage)
        assert page.id == "123456789"
        assert page.title == "2024-01-01: Team Progress Meeting 01"

    def test_from_api_response_with_empty_data(self):
        """Test creating a ConfluenceSearchResult from empty data."""
        search_result = ConfluenceSearchResult.from_api_response({})

        # Should use default values
        assert search_result.total_size == 0
        assert search_result.start == 0
        assert search_result.limit == 0
        assert search_result.cql_query is None
        assert search_result.search_duration is None
        assert len(search_result.results) == 0


class TestRealConfluenceData:
    """Tests using real Confluence data (only run if environment is configured)."""

    def test_real_confluence_page(
        self, use_real_confluence_data, default_confluence_page_id
    ):
        """Test with real Confluence page data from the API."""
        if not use_real_confluence_data:
            pytest.skip("Real Confluence data testing is disabled")

        try:
            # Initialize the Confluence client
            from src.mcp_atlassian.confluence.client import ConfluenceClient
            from src.mcp_atlassian.confluence.config import ConfluenceConfig
            from src.mcp_atlassian.confluence.pages import PagesMixin

            # Use the from_env method to create the config
            config = ConfluenceConfig.from_env()
            confluence_client = ConfluenceClient(config=config)
            pages_client = PagesMixin(config=config)

            # Use the provided page ID from environment or fixture
            page_id = default_confluence_page_id

            # Get page data directly from the Confluence API
            page_data = confluence_client.confluence.get_page_by_id(
                page_id=page_id, expand="body.storage,version,space,children.attachment"
            )

            # Convert to model
            from src.mcp_atlassian.models import ConfluencePage

            page = ConfluencePage.from_api_response(page_data)

            # Verify basic properties
            assert page.id == page_id
            assert page.title is not None
            assert page.space is not None
            assert page.space.key is not None
            assert page.attachments is not None

            # Verify that to_simplified_dict works
            simplified = page.to_simplified_dict()
            assert isinstance(simplified, dict)
            assert simplified["id"] == page_id

            # Get and test comments if available
            try:
                from src.mcp_atlassian.models import ConfluenceComment

                comments_data = confluence_client.confluence.get_page_comments(
                    page_id=page_id, expand="body.view,version"
                )

                if comments_data and comments_data.get("results"):
                    comment_data = comments_data["results"][0]
                    comment = ConfluenceComment.from_api_response(comment_data)

                    assert comment.id is not None
                    assert comment.body is not None

                    # Test simplified dict
                    comment_dict = comment.to_simplified_dict()
                    assert isinstance(comment_dict, dict)
                    assert "body" in comment_dict
            except Exception as e:
                print(f"Comments test skipped: {e}")

            print(
                f"Successfully tested real Confluence page {page_id} in space {page.space.key}"
            )
        except ImportError as e:
            pytest.skip(f"Could not import Confluence client: {e}")
        except Exception as e:
            pytest.fail(f"Error testing real Confluence page: {e}")

```

--------------------------------------------------------------------------------
/tests/integration/test_cross_service.py:
--------------------------------------------------------------------------------

```python
"""Integration tests for cross-service functionality between Jira and Confluence."""

import os
from unittest.mock import MagicMock, patch

import pytest
from requests.sessions import Session

from mcp_atlassian.confluence import ConfluenceConfig
from mcp_atlassian.jira import JiraConfig
from mcp_atlassian.servers.context import MainAppContext
from mcp_atlassian.servers.dependencies import (
    _create_user_config_for_fetcher,
    get_confluence_fetcher,
    get_jira_fetcher,
)
from mcp_atlassian.servers.main import AtlassianMCP, main_lifespan
from mcp_atlassian.utils.environment import get_available_services
from mcp_atlassian.utils.ssl import configure_ssl_verification
from tests.utils.factories import (
    ConfluencePageFactory,
    JiraIssueFactory,
)
from tests.utils.mocks import MockAtlassianClient, MockEnvironment, MockFastMCP


@pytest.mark.integration
class TestCrossServiceUserResolution:
    """Test user resolution across Jira and Confluence services."""

    def test_shared_user_email_resolution(self):
        """Test that user email is resolved consistently across services."""
        user_email = "[email protected]"

        # Create mock clients
        jira_client = MockAtlassianClient.create_jira_client()
        confluence_client = MockAtlassianClient.create_confluence_client()

        # Mock user resolution
        jira_client.user.return_value = {
            "emailAddress": user_email,
            "displayName": "Test User",
            "accountId": "123456",
        }

        confluence_client.get_user.return_value = {
            "email": user_email,
            "displayName": "Test User",
            "accountId": "123456",
        }

        # Verify consistent user resolution
        jira_user = jira_client.user("123456")
        confluence_user = confluence_client.get_user("123456")

        assert jira_user["emailAddress"] == confluence_user["email"]
        assert jira_user["displayName"] == confluence_user["displayName"]
        assert jira_user["accountId"] == confluence_user["accountId"]

    @pytest.mark.anyio
    async def test_user_context_propagation(self):
        """Test that user context is properly propagated between services."""
        with MockEnvironment.oauth_env() as env:
            # Create configurations
            jira_config = JiraConfig.from_env()
            confluence_config = ConfluenceConfig.from_env()

            # Create user-specific configurations
            user_token = "test-user-token"
            user_email = "[email protected]"

            credentials = {
                "user_email_context": user_email,
                "oauth_access_token": user_token,
            }

            # Create user configs for both services
            user_jira_config = _create_user_config_for_fetcher(
                base_config=jira_config, auth_type="oauth", credentials=credentials
            )

            user_confluence_config = _create_user_config_for_fetcher(
                base_config=confluence_config,
                auth_type="oauth",
                credentials=credentials,
            )

            # Verify consistent OAuth configuration
            assert user_jira_config.oauth_config.access_token == user_token
            assert user_confluence_config.oauth_config.access_token == user_token
            assert user_jira_config.username == user_email
            assert user_confluence_config.username == user_email


@pytest.mark.integration
class TestSharedAuthentication:
    """Test shared authentication context between services."""

    def test_oauth_shared_configuration(self):
        """Test that OAuth configuration is shared between services."""
        with MockEnvironment.oauth_env() as env:
            # Both services should use the same OAuth configuration
            jira_config = JiraConfig.from_env()
            confluence_config = ConfluenceConfig.from_env()

            assert (
                jira_config.oauth_config.client_id
                == confluence_config.oauth_config.client_id
            )
            assert (
                jira_config.oauth_config.client_secret
                == confluence_config.oauth_config.client_secret
            )
            assert (
                jira_config.oauth_config.cloud_id
                == confluence_config.oauth_config.cloud_id
            )
            assert (
                jira_config.oauth_config.scope == confluence_config.oauth_config.scope
            )

    def test_basic_auth_shared_configuration(self):
        """Test that basic auth configuration can be shared between services."""
        with MockEnvironment.basic_auth_env() as env:
            # Both services should use consistent authentication
            jira_config = JiraConfig.from_env()
            confluence_config = ConfluenceConfig.from_env()

            assert jira_config.username == confluence_config.username
            assert jira_config.api_token == confluence_config.api_token
            assert jira_config.auth_type == confluence_config.auth_type

    @pytest.mark.anyio
    async def test_authentication_context_in_request(self):
        """Test authentication context is properly maintained in request state."""
        request = MockFastMCP.create_request()
        request.state.user_atlassian_auth_type = "oauth"
        request.state.user_atlassian_token = "test-oauth-token"
        request.state.user_atlassian_email = "[email protected]"

        with patch(
            "mcp_atlassian.servers.dependencies.get_http_request", return_value=request
        ):
            # Create mock context with lifespan data
            ctx = MockFastMCP.create_context()
            ctx.request_context = MagicMock()
            ctx.request_context.lifespan_context = {
                "app_lifespan_context": MainAppContext(
                    full_jira_config=JiraConfig.from_env(),
                    full_confluence_config=ConfluenceConfig.from_env(),
                    read_only=False,
                    enabled_tools=None,
                )
            }

            # Mock the fetcher creation
            with (
                patch("mcp_atlassian.jira.JiraFetcher") as mock_jira_fetcher,
                patch(
                    "mcp_atlassian.confluence.ConfluenceFetcher"
                ) as mock_confluence_fetcher,
            ):
                # Mock the current user validation
                mock_jira_instance = MagicMock()
                mock_jira_instance.get_current_user_account_id.return_value = "user123"
                mock_jira_fetcher.return_value = mock_jira_instance

                mock_confluence_instance = MagicMock()
                mock_confluence_instance.get_current_user_info.return_value = {
                    "email": "[email protected]",
                    "displayName": "Test User",
                }
                mock_confluence_fetcher.return_value = mock_confluence_instance

                # Get fetchers - should use the same auth context
                jira_fetcher = await get_jira_fetcher(ctx)
                confluence_fetcher = await get_confluence_fetcher(ctx)

                # Verify both fetchers were created with user-specific config
                assert request.state.jira_fetcher is not None
                assert request.state.confluence_fetcher is not None


@pytest.mark.integration
class TestCrossServiceErrorHandling:
    """Test error handling and propagation across services."""

    @pytest.mark.anyio
    async def test_jira_failure_does_not_affect_confluence(self):
        """Test that Jira failure doesn't prevent Confluence from working."""
        with MockEnvironment.basic_auth_env():
            app = AtlassianMCP("Test MCP")

            # Mock Jira to fail during initialization
            with patch(
                "mcp_atlassian.jira.config.JiraConfig.from_env"
            ) as mock_jira_config:
                mock_jira_config.side_effect = Exception("Jira config failed")

                # But Confluence should still work
                async with main_lifespan(app) as lifespan_data:
                    context = lifespan_data["app_lifespan_context"]

                    assert context.full_jira_config is None
                    assert context.full_confluence_config is not None

    @pytest.mark.anyio
    async def test_confluence_failure_does_not_affect_jira(self):
        """Test that Confluence failure doesn't prevent Jira from working."""
        with MockEnvironment.basic_auth_env():
            app = AtlassianMCP("Test MCP")

            # Mock Confluence to fail during initialization
            with patch(
                "mcp_atlassian.confluence.config.ConfluenceConfig.from_env"
            ) as mock_conf_config:
                mock_conf_config.side_effect = Exception("Confluence config failed")

                # But Jira should still work
                async with main_lifespan(app) as lifespan_data:
                    context = lifespan_data["app_lifespan_context"]

                    assert context.full_jira_config is not None
                    assert context.full_confluence_config is None

    def test_error_propagation_in_user_config_creation(self):
        """Test error propagation when creating user-specific configurations."""
        base_config = JiraConfig.from_env()

        # Test missing OAuth token
        with pytest.raises(ValueError, match="OAuth access token missing"):
            _create_user_config_for_fetcher(
                base_config=base_config,
                auth_type="oauth",
                credentials={"user_email_context": "[email protected]"},
            )

        # Test missing PAT token
        with pytest.raises(ValueError, match="PAT missing"):
            _create_user_config_for_fetcher(
                base_config=base_config,
                auth_type="pat",
                credentials={"user_email_context": "[email protected]"},
            )

        # Test invalid auth type
        with pytest.raises(ValueError, match="Unsupported auth_type"):
            _create_user_config_for_fetcher(
                base_config=base_config, auth_type="invalid", credentials={}
            )


@pytest.mark.integration
class TestSharedSSLProxyConfiguration:
    """Test shared SSL and proxy configuration between services."""

    def test_ssl_configuration_shared(self):
        """Test that SSL configuration is applied consistently."""
        with MockEnvironment.basic_auth_env():
            # Set SSL verification to false for both services
            with patch.dict(
                os.environ,
                {"JIRA_SSL_VERIFY": "false", "CONFLUENCE_SSL_VERIFY": "false"},
            ):
                jira_config = JiraConfig.from_env()
                confluence_config = ConfluenceConfig.from_env()

                assert jira_config.ssl_verify is False
                assert confluence_config.ssl_verify is False

                # Test SSL adapter configuration
                jira_session = Session()
                confluence_session = Session()

                configure_ssl_verification(
                    "Jira", jira_config.url, jira_session, ssl_verify=False
                )
                configure_ssl_verification(
                    "Confluence",
                    confluence_config.url,
                    confluence_session,
                    ssl_verify=False,
                )

                # Extract domains
                jira_domain = jira_config.url.split("://")[1].split("/")[0]
                confluence_domain = confluence_config.url.split("://")[1].split("/")[0]

                # Both should have SSL ignore adapters
                assert f"https://{jira_domain}" in jira_session.adapters
                assert f"https://{confluence_domain}" in confluence_session.adapters

    def test_proxy_configuration_shared(self):
        """Test that proxy configuration is shared between services."""
        proxy_config = {
            "HTTP_PROXY": "http://proxy.example.com:8080",
            "HTTPS_PROXY": "https://proxy.example.com:8443",
            "NO_PROXY": "localhost,127.0.0.1",
        }

        with MockEnvironment.basic_auth_env():
            with patch.dict(os.environ, proxy_config):
                jira_config = JiraConfig.from_env()
                confluence_config = ConfluenceConfig.from_env()

                # Both services should have the same proxy configuration
                assert jira_config.http_proxy == proxy_config["HTTP_PROXY"]
                assert jira_config.https_proxy == proxy_config["HTTPS_PROXY"]
                assert jira_config.no_proxy == proxy_config["NO_PROXY"]

                assert confluence_config.http_proxy == proxy_config["HTTP_PROXY"]
                assert confluence_config.https_proxy == proxy_config["HTTPS_PROXY"]
                assert confluence_config.no_proxy == proxy_config["NO_PROXY"]


@pytest.mark.integration
class TestConcurrentServiceInitialization:
    """Test concurrent initialization of both services."""

    @pytest.mark.anyio
    async def test_concurrent_service_startup(self):
        """Test that both services can be initialized concurrently."""
        with MockEnvironment.basic_auth_env():
            app = AtlassianMCP("Test MCP")

            # Track initialization order
            init_order = []

            def mock_jira_init(*args, **kwargs):
                init_order.append("jira_start")
                # Can't use async sleep in sync function, just append both immediately
                init_order.append("jira_end")
                return MagicMock(is_auth_configured=MagicMock(return_value=True))

            def mock_confluence_init(*args, **kwargs):
                init_order.append("confluence_start")
                init_order.append("confluence_end")
                return MagicMock(is_auth_configured=MagicMock(return_value=True))

            with (
                patch(
                    "mcp_atlassian.jira.config.JiraConfig.from_env",
                    side_effect=mock_jira_init,
                ),
                patch(
                    "mcp_atlassian.confluence.config.ConfluenceConfig.from_env",
                    side_effect=mock_confluence_init,
                ),
            ):
                async with main_lifespan(app) as lifespan_data:
                    context = lifespan_data["app_lifespan_context"]

                    # Both services should be initialized
                    assert context.full_jira_config is not None
                    assert context.full_confluence_config is not None

                    # Verify concurrent initialization (interleaved order)
                    assert "jira_start" in init_order
                    assert "confluence_start" in init_order

    @pytest.mark.anyio
    async def test_parallel_fetcher_creation(self):
        """Test that fetchers can be created in parallel for both services."""
        with MockEnvironment.oauth_env():
            # Create mock request with user context
            request = MockFastMCP.create_request()
            request.state.user_atlassian_auth_type = "oauth"
            request.state.user_atlassian_token = "test-token"
            request.state.user_atlassian_email = "[email protected]"

            # Create context
            ctx = MockFastMCP.create_context()
            ctx.request_context = MagicMock()
            ctx.request_context.lifespan_context = {
                "app_lifespan_context": MainAppContext(
                    full_jira_config=JiraConfig.from_env(),
                    full_confluence_config=ConfluenceConfig.from_env(),
                    read_only=False,
                    enabled_tools=None,
                )
            }

            with (
                patch(
                    "mcp_atlassian.servers.dependencies.get_http_request",
                    return_value=request,
                ),
                patch("mcp_atlassian.jira.JiraFetcher") as mock_jira_fetcher,
                patch(
                    "mcp_atlassian.confluence.ConfluenceFetcher"
                ) as mock_confluence_fetcher,
            ):
                # Mock fetcher instances
                mock_jira_instance = MagicMock()
                mock_jira_instance.get_current_user_account_id.return_value = "user123"
                mock_jira_fetcher.return_value = mock_jira_instance

                mock_confluence_instance = MagicMock()
                mock_confluence_instance.get_current_user_info.return_value = {
                    "email": "[email protected]",
                    "displayName": "Test User",
                }
                mock_confluence_fetcher.return_value = mock_confluence_instance

                # Create fetchers in parallel using anyio for backend compatibility
                import anyio

                async def fetch_jira():
                    return await get_jira_fetcher(ctx)

                async def fetch_confluence():
                    return await get_confluence_fetcher(ctx)

                # Wait for both using anyio task group
                async with anyio.create_task_group() as tg:
                    jira_future = None
                    confluence_future = None

                    async def set_jira():
                        nonlocal jira_future
                        jira_future = await fetch_jira()

                    async def set_confluence():
                        nonlocal confluence_future
                        confluence_future = await fetch_confluence()

                    tg.start_soon(set_jira)
                    tg.start_soon(set_confluence)

                jira_fetcher = jira_future
                confluence_fetcher = confluence_future

                # Both should be created successfully
                assert jira_fetcher is not None
                assert confluence_fetcher is not None
                assert request.state.jira_fetcher is jira_fetcher
                assert request.state.confluence_fetcher is confluence_fetcher


@pytest.mark.integration
class TestServiceAvailabilityDetection:
    """Test service availability detection and handling."""

    def test_detect_no_services_configured(self):
        """Test detection when no services are configured."""
        with MockEnvironment.clean_env():
            services = get_available_services()
            assert services["jira"] is False
            assert services["confluence"] is False

    def test_detect_only_jira_configured(self):
        """Test detection when only Jira is configured."""
        with patch.dict(
            os.environ,
            {
                "JIRA_URL": "https://test.atlassian.net",
                "JIRA_USERNAME": "[email protected]",
                "JIRA_API_TOKEN": "test-token",
            },
            clear=True,
        ):  # Clear environment to ensure isolation
            services = get_available_services()
            assert services["jira"] is True
            assert services["confluence"] is False

    def test_detect_only_confluence_configured(self):
        """Test detection when only Confluence is configured."""
        with patch.dict(
            os.environ,
            {
                "CONFLUENCE_URL": "https://test.atlassian.net/wiki",
                "CONFLUENCE_USERNAME": "[email protected]",
                "CONFLUENCE_API_TOKEN": "test-token",
            },
            clear=True,
        ):  # Clear environment to ensure isolation
            services = get_available_services()
            assert services["jira"] is False
            assert services["confluence"] is True

    def test_detect_both_services_configured(self):
        """Test detection when both services are configured."""
        with MockEnvironment.basic_auth_env():
            services = get_available_services()
            assert services["jira"] is True
            assert services["confluence"] is True

    def test_partial_configuration_detection(self):
        """Test detection with partial configuration (URL but no auth)."""
        with patch.dict(
            os.environ,
            {
                "JIRA_URL": "https://test.atlassian.net",
                "CONFLUENCE_URL": "https://test.atlassian.net/wiki",
                # No authentication credentials
            },
            clear=True,
        ):  # Clear environment to ensure isolation
            services = get_available_services()
            assert services["jira"] is False
            assert services["confluence"] is False

    @pytest.mark.anyio
    async def test_service_availability_in_lifespan(self):
        """Test that service availability is properly reflected in lifespan context."""
        # Test with only Jira configured
        with patch.dict(
            os.environ,
            {
                "JIRA_URL": "https://test.atlassian.net",
                "JIRA_USERNAME": "[email protected]",
                "JIRA_API_TOKEN": "test-token",
            },
            clear=True,
        ):
            app = AtlassianMCP("Test MCP")

            async with main_lifespan(app) as lifespan_data:
                context = lifespan_data["app_lifespan_context"]

                # Only Jira should be configured
                assert context.full_jira_config is not None
                assert context.full_confluence_config is None

    def test_oauth_precedence_over_basic_auth(self):
        """Test that OAuth configuration takes precedence over basic auth."""
        # Set both OAuth and basic auth environment variables
        with patch.dict(
            os.environ,
            {
                # OAuth configuration
                "ATLASSIAN_OAUTH_CLIENT_ID": "oauth-client",
                "ATLASSIAN_OAUTH_CLIENT_SECRET": "oauth-secret",
                "ATLASSIAN_OAUTH_REDIRECT_URI": "http://localhost:8080",
                "ATLASSIAN_OAUTH_SCOPE": "read:jira-work write:jira-work",
                "ATLASSIAN_OAUTH_CLOUD_ID": "cloud-123",
                # Basic auth configuration
                "JIRA_URL": "https://test.atlassian.net",
                "JIRA_USERNAME": "[email protected]",
                "JIRA_API_TOKEN": "basic-token",
                "CONFLUENCE_URL": "https://test.atlassian.net/wiki",
                "CONFLUENCE_USERNAME": "[email protected]",
                "CONFLUENCE_API_TOKEN": "basic-token",
            },
        ):
            services = get_available_services()
            assert services["jira"] is True
            assert services["confluence"] is True

            # Verify OAuth is used
            jira_config = JiraConfig.from_env()
            confluence_config = ConfluenceConfig.from_env()

            assert jira_config.auth_type == "oauth"
            assert confluence_config.auth_type == "oauth"
            assert jira_config.oauth_config is not None
            assert confluence_config.oauth_config is not None


@pytest.mark.integration
class TestCrossServiceDataSharing:
    """Test data sharing and references between services."""

    def test_jira_issue_confluence_page_link(self):
        """Test linking between Jira issues and Confluence pages."""
        # Create test data
        jira_issue = JiraIssueFactory.create(
            key="TEST-123",
            fields={
                "summary": "Test Issue",
                "description": "See documentation at https://test.atlassian.net/wiki/spaces/TEST/pages/123456",
            },
        )

        confluence_page = ConfluencePageFactory.create(
            page_id="123456",
            title="Test Documentation",
            body={
                "storage": {
                    "value": "<p>Related to <a href='https://test.atlassian.net/browse/TEST-123'>TEST-123</a></p>"
                }
            },
        )

        # Verify cross-references exist
        assert "123456" in jira_issue["fields"]["description"]
        assert "TEST-123" in confluence_page["body"]["storage"]["value"]

    def test_shared_user_mentions(self):
        """Test that user mentions work consistently across services."""
        user_account_id = "557058:c4b6b2f1-2f5f-4b85-b033-4cedbe2d2e17"

        # Jira mention format
        jira_mention = f"[~accountid:{user_account_id}]"

        # Confluence mention format
        confluence_mention = (
            f'<ac:link><ri:user ri:account-id="{user_account_id}" /></ac:link>'
        )

        # Create content with mentions
        jira_comment = {"body": f"Hey {jira_mention}, please review this issue."}

        confluence_content = {
            "body": {
                "storage": {
                    "value": f"<p>Hey {confluence_mention}, please review this page.</p>"
                }
            }
        }

        # Verify mentions are present
        assert user_account_id in jira_comment["body"]
        assert user_account_id in confluence_content["body"]["storage"]["value"]

```

--------------------------------------------------------------------------------
/tests/unit/jira/test_users.py:
--------------------------------------------------------------------------------

```python
"""Tests for the Jira users module."""

from unittest.mock import MagicMock, patch

import pytest
import requests

from mcp_atlassian.jira.config import JiraConfig
from mcp_atlassian.jira.users import UsersMixin


class TestUsersMixin:
    """Tests for the UsersMixin class."""

    @pytest.fixture
    def users_mixin(self, jira_client):
        """Create a UsersMixin instance with mocked dependencies."""
        mixin = UsersMixin(config=jira_client.config)
        mixin.jira = jira_client.jira
        return mixin

    def test_get_current_user_account_id_cached(self, users_mixin):
        """Test that get_current_user_account_id returns cached value if available."""
        # Set cached value
        users_mixin._current_user_account_id = "cached-account-id"

        # Call the method
        account_id = users_mixin.get_current_user_account_id()

        # Verify result
        assert account_id == "cached-account-id"
        # Verify the API wasn't called
        users_mixin.jira.myself.assert_not_called()

    def test_get_current_user_account_id_from_api(self, users_mixin):
        """Test that get_current_user_account_id calls the API if no cached value."""
        # Ensure no cached value
        users_mixin._current_user_account_id = None

        # Mock the self.jira.myself() method
        users_mixin.jira.myself = MagicMock(
            return_value={"accountId": "test-account-id"}
        )

        # Call the method
        account_id = users_mixin.get_current_user_account_id()

        # Verify result
        assert account_id == "test-account-id"
        # Verify self.jira.myself was called
        users_mixin.jira.myself.assert_called_once()

    def test_get_current_user_account_id_data_center_timestamp_issue(self, users_mixin):
        """Test that get_current_user_account_id handles Jira Data Center with problematic timestamps."""
        # Ensure no cached value
        users_mixin._current_user_account_id = None

        # Mock the self.jira.myself() method
        users_mixin.jira.myself = MagicMock(
            return_value={
                "key": "jira-dc-user",
                "name": "DC User",
                "created": "9999-12-31T23:59:59.999+0000",
                "lastLogin": "0000-01-01T00:00:00.000+0000",
            }
        )

        # Call the method
        account_id = users_mixin.get_current_user_account_id()

        # Verify result - should extract key without timestamp parsing issues
        assert account_id == "jira-dc-user"
        # Verify self.jira.myself was called
        users_mixin.jira.myself.assert_called_once()

    def test_get_current_user_account_id_error(self, users_mixin):
        """Test that get_current_user_account_id handles errors."""
        # Ensure no cached value
        users_mixin._current_user_account_id = None

        # Mock the self.jira.myself() method to raise an exception
        users_mixin.jira.myself = MagicMock(
            side_effect=requests.RequestException("API error")
        )

        # Call the method and verify it raises the expected exception
        with pytest.raises(
            Exception, match="Unable to get current user account ID: API error"
        ):
            users_mixin.get_current_user_account_id()

        # Verify self.jira.myself was called
        users_mixin.jira.myself.assert_called_once()

    def test_get_current_user_account_id_jira_data_center_key(self, users_mixin):
        """Test that get_current_user_account_id falls back to 'key' for Jira Data Center."""
        # Ensure no cached value
        users_mixin._current_user_account_id = None

        # Mock the self.jira.myself() response with a Jira Data Center response
        users_mixin.jira.myself = MagicMock(
            return_value={"key": "jira-data-center-key", "name": "Test User"}
        )

        # Call the method
        account_id = users_mixin.get_current_user_account_id()

        # Verify result
        assert account_id == "jira-data-center-key"
        # Verify self.jira.myself was called
        users_mixin.jira.myself.assert_called_once()

    def test_get_current_user_account_id_jira_data_center_name(self, users_mixin):
        """Test that get_current_user_account_id falls back to 'name' when no 'key' or 'accountId'."""
        # Ensure no cached value
        users_mixin._current_user_account_id = None

        # Mock the self.jira.myself() response with a Jira Data Center response
        users_mixin.jira.myself = MagicMock(
            return_value={"name": "jira-data-center-name"}
        )

        # Call the method
        account_id = users_mixin.get_current_user_account_id()

        # Verify result
        assert account_id == "jira-data-center-name"
        # Verify self.jira.myself was called
        users_mixin.jira.myself.assert_called_once()

    def test_get_current_user_account_id_no_identifiers(self, users_mixin):
        """Test that get_current_user_account_id raises error when no identifiers are found."""
        # Ensure no cached value
        users_mixin._current_user_account_id = None

        # Mock the self.jira.myself() response with no identifiers
        users_mixin.jira.myself = MagicMock(return_value={"someField": "someValue"})

        # Call the method and verify it raises the expected exception
        with pytest.raises(
            Exception,
            match="Unable to get current user account ID: Could not find accountId, key, or name in user data",
        ):
            users_mixin.get_current_user_account_id()

        # Verify self.jira.myself was called
        users_mixin.jira.myself.assert_called_once()

    def test_get_account_id_already_account_id(self, users_mixin):
        """Test that _get_account_id returns the input if it looks like an account ID."""
        # Call the method with a string that looks like an account ID
        account_id = users_mixin._get_account_id("5abcdef1234567890")

        # Verify result
        assert account_id == "5abcdef1234567890"
        # Verify no lookups were performed
        users_mixin.jira.user_find_by_user_string.assert_not_called()

    def test_get_account_id_direct_lookup(self, users_mixin):
        """Test that _get_account_id uses direct lookup."""
        # Mock both methods to avoid AttributeError
        with (
            patch.object(
                users_mixin, "_lookup_user_directly", return_value="direct-account-id"
            ) as mock_direct,
            patch.object(
                users_mixin, "_lookup_user_by_permissions"
            ) as mock_permissions,
        ):
            # Call the method
            account_id = users_mixin._get_account_id("username")

            # Verify result
            assert account_id == "direct-account-id"
            # Verify direct lookup was called
            mock_direct.assert_called_once_with("username")
            # Verify permissions lookup wasn't called
            mock_permissions.assert_not_called()

    def test_get_account_id_permissions_lookup(self, users_mixin):
        """Test that _get_account_id falls back to permissions lookup."""
        # Mock direct lookup to return None
        with (
            patch.object(
                users_mixin, "_lookup_user_directly", return_value=None
            ) as mock_direct,
            patch.object(
                users_mixin,
                "_lookup_user_by_permissions",
                return_value="permissions-account-id",
            ) as mock_permissions,
        ):
            # Call the method
            account_id = users_mixin._get_account_id("username")

            # Verify result
            assert account_id == "permissions-account-id"
            # Verify both lookups were called
            mock_direct.assert_called_once_with("username")
            mock_permissions.assert_called_once_with("username")

    def test_get_account_id_not_found(self, users_mixin):
        """Test that _get_account_id raises ValueError if user not found."""
        # Mock both lookups to return None
        with (
            patch.object(users_mixin, "_lookup_user_directly", return_value=None),
            patch.object(users_mixin, "_lookup_user_by_permissions", return_value=None),
        ):
            # Call the method and verify it raises the expected exception
            with pytest.raises(
                ValueError, match="Could not find account ID for user: testuser"
            ):
                users_mixin._get_account_id("testuser")

    def test_lookup_user_directly(self, users_mixin):
        """Test _lookup_user_directly when user is found."""
        # Mock the API response
        users_mixin.jira.user_find_by_user_string.return_value = [
            {
                "accountId": "direct-account-id",
                "displayName": "Test User",
                "emailAddress": "[email protected]",
            }
        ]

        # Mock config.is_cloud to return True
        users_mixin.config = MagicMock()
        users_mixin.config.is_cloud = True

        # Call the method
        account_id = users_mixin._lookup_user_directly("Test User")

        # Verify result
        assert account_id == "direct-account-id"
        # Verify API call with query parameter for Cloud
        users_mixin.jira.user_find_by_user_string.assert_called_once_with(
            query="Test User", start=0, limit=1
        )

    def test_lookup_user_directly_server_dc(self, users_mixin):
        """Test _lookup_user_directly for Server/DC when user is found."""
        # Mock the API response
        users_mixin.jira.user_find_by_user_string.return_value = [
            {
                "key": "server-user-key",
                "name": "server-user-name",
                "displayName": "Test User",
                "emailAddress": "[email protected]",
            }
        ]

        # Mock config.is_cloud to return False for Server/DC
        users_mixin.config = MagicMock()
        users_mixin.config.is_cloud = False

        # Call the method
        account_id = users_mixin._lookup_user_directly("Test User")

        # Verify result - should now return name instead of key for Server/DC
        assert account_id == "server-user-name"
        # Verify API call with username parameter for Server/DC
        users_mixin.jira.user_find_by_user_string.assert_called_once_with(
            username="Test User", start=0, limit=1
        )

    def test_lookup_user_directly_server_dc_key_fallback(self, users_mixin):
        """Test _lookup_user_directly for Server/DC falls back to key when name is not available."""
        # Mock the API response
        users_mixin.jira.user_find_by_user_string.return_value = [
            {
                "key": "server-user-key",  # Only key, no name
                "displayName": "Test User",
                "emailAddress": "[email protected]",
            }
        ]

        # Mock config.is_cloud to return False for Server/DC
        users_mixin.config = MagicMock()
        users_mixin.config.is_cloud = False

        # Call the method
        account_id = users_mixin._lookup_user_directly("Test User")

        # Verify result - should fallback to key when name is missing
        assert account_id == "server-user-key"
        # Verify API call with username parameter for Server/DC
        users_mixin.jira.user_find_by_user_string.assert_called_once_with(
            username="Test User", start=0, limit=1
        )

    def test_lookup_user_directly_not_found(self, users_mixin):
        """Test _lookup_user_directly when user is not found."""
        # Mock empty API response
        users_mixin.jira.user_find_by_user_string.return_value = []

        # Mock config.is_cloud to return True (default case)
        users_mixin.config = MagicMock()
        users_mixin.config.is_cloud = True

        # Call the method
        account_id = users_mixin._lookup_user_directly("nonexistent")

        # Verify result
        assert account_id is None

    def test_lookup_user_directly_jira_data_center_key(self, users_mixin):
        """Test _lookup_user_directly when only 'key' is available (Data Center)."""
        # Mock the API response for Jira Data Center (has key but no accountId)
        users_mixin.jira.user_find_by_user_string.return_value = [
            {
                "key": "data-center-key",
                "displayName": "Test User",
                "emailAddress": "[email protected]",
            }
        ]

        # Mock config.is_cloud to return False for Server/DC
        users_mixin.config = MagicMock()
        users_mixin.config.is_cloud = False

        # Call the method
        account_id = users_mixin._lookup_user_directly("Test User")

        # Verify result
        assert account_id == "data-center-key"
        # Verify API call
        users_mixin.jira.user_find_by_user_string.assert_called_once_with(
            username="Test User", start=0, limit=1
        )

    def test_lookup_user_directly_jira_data_center_name(self, users_mixin):
        """Test _lookup_user_directly when only 'name' is available (Data Center)."""
        # Mock the API response for Jira Data Center (has name but no accountId or key)
        users_mixin.jira.user_find_by_user_string.return_value = [
            {
                "name": "data-center-name",
                "displayName": "Test User",
                "emailAddress": "[email protected]",
            }
        ]

        # Mock config.is_cloud to return False for Server/DC
        users_mixin.config = MagicMock()
        users_mixin.config.is_cloud = False

        # Call the method
        account_id = users_mixin._lookup_user_directly("Test User")

        # Verify result
        assert account_id == "data-center-name"
        # Verify API call
        users_mixin.jira.user_find_by_user_string.assert_called_once_with(
            username="Test User", start=0, limit=1
        )

    def test_lookup_user_directly_error(self, users_mixin):
        """Test _lookup_user_directly when API call fails."""
        # Mock API call to raise exception
        users_mixin.jira.user_find_by_user_string.side_effect = Exception("API error")

        # Call the method
        account_id = users_mixin._lookup_user_directly("error")

        # Verify result
        assert account_id is None

    def test_lookup_user_by_permissions(self, users_mixin):
        """Test _lookup_user_by_permissions when user is found."""
        # Mock requests.get
        with patch("requests.get") as mock_get:
            mock_response = MagicMock()
            mock_response.status_code = 200
            mock_response.json.return_value = {
                "users": [{"accountId": "permissions-account-id"}]
            }
            mock_get.return_value = mock_response

            # Call the method
            account_id = users_mixin._lookup_user_by_permissions("username")

            # Verify result
            assert account_id == "permissions-account-id"
            # Verify API call
            mock_get.assert_called_once()
            assert mock_get.call_args[0][0].endswith("/user/permission/search")
            assert mock_get.call_args[1]["params"] == {
                "query": "username",
                "permissions": "BROWSE",
            }

    def test_lookup_user_by_permissions_not_found(self, users_mixin):
        """Test _lookup_user_by_permissions when user is not found."""
        # Mock requests.get
        with patch("requests.get") as mock_get:
            mock_response = MagicMock()
            mock_response.status_code = 200
            mock_response.json.return_value = {"users": []}
            mock_get.return_value = mock_response

            # Call the method
            account_id = users_mixin._lookup_user_by_permissions("nonexistent")

            # Verify result
            assert account_id is None

    def test_lookup_user_by_permissions_jira_data_center(self, users_mixin):
        """Test _lookup_user_by_permissions when both 'key' and 'name' are available (Data Center)."""
        # Mock requests.get
        with patch("requests.get") as mock_get:
            mock_response = MagicMock()
            mock_response.status_code = 200
            mock_response.json.return_value = {
                "users": [
                    {
                        "key": "data-center-permissions-key",
                        "name": "data-center-permissions-name",
                    }
                ]
            }
            mock_get.return_value = mock_response

            # Mock config.is_cloud to return False for Server/DC
            users_mixin.config = MagicMock()
            users_mixin.config.is_cloud = False

            # Call the method
            account_id = users_mixin._lookup_user_by_permissions("username")

            # Verify result - should prioritize name for Server/DC
            assert account_id == "data-center-permissions-name"
            # Verify API call
            mock_get.assert_called_once()
            assert mock_get.call_args[0][0].endswith("/user/permission/search")
            assert mock_get.call_args[1]["params"] == {
                "query": "username",
                "permissions": "BROWSE",
            }

    def test_lookup_user_by_permissions_jira_data_center_key_fallback(
        self, users_mixin
    ):
        """Test _lookup_user_by_permissions when only 'key' is available (Data Center)."""
        # Mock requests.get
        with patch("requests.get") as mock_get:
            mock_response = MagicMock()
            mock_response.status_code = 200
            mock_response.json.return_value = {
                "users": [{"key": "data-center-permissions-key"}]
            }
            mock_get.return_value = mock_response

            # Mock config.is_cloud to return False for Server/DC
            users_mixin.config = MagicMock()
            users_mixin.config.is_cloud = False

            # Call the method
            account_id = users_mixin._lookup_user_by_permissions("username")

            # Verify result - should fallback to key when name is missing
            assert account_id == "data-center-permissions-key"
            # Verify API call
            mock_get.assert_called_once()
            assert mock_get.call_args[0][0].endswith("/user/permission/search")
            assert mock_get.call_args[1]["params"] == {
                "query": "username",
                "permissions": "BROWSE",
            }

    def test_lookup_user_by_permissions_error(self, users_mixin):
        """Test _lookup_user_by_permissions when API call fails."""
        # Mock requests.get to raise exception
        with patch("requests.get", side_effect=Exception("API error")):
            # Call the method
            account_id = users_mixin._lookup_user_by_permissions("error")

            # Verify result
            assert account_id is None

    def test_lookup_user_by_permissions_jira_data_center_name_only(self, users_mixin):
        """Test _lookup_user_by_permissions when only 'name' is available (Data Center)."""
        # Mock requests.get
        with patch("requests.get") as mock_get:
            mock_response = MagicMock()
            mock_response.status_code = 200
            mock_response.json.return_value = {
                "users": [{"name": "data-center-permissions-name"}]
            }
            mock_get.return_value = mock_response

            # Mock config.is_cloud to return False for Server/DC
            users_mixin.config = MagicMock()
            users_mixin.config.is_cloud = False

            # Call the method
            account_id = users_mixin._lookup_user_by_permissions("username")

            # Verify result - should use name when that's all that's available
            assert account_id == "data-center-permissions-name"
            # Verify API call
            mock_get.assert_called_once()
            assert mock_get.call_args[0][0].endswith("/user/permission/search")
            assert mock_get.call_args[1]["params"] == {
                "query": "username",
                "permissions": "BROWSE",
            }

    def test_get_user_profile_by_identifier_cloud_account_id(self, users_mixin):
        """Test get_user_profile_by_identifier with Cloud and accountId."""
        users_mixin.config = MagicMock(spec=JiraConfig)
        users_mixin.config.is_cloud = True

        with patch(
            "src.mcp_atlassian.jira.users.JiraUser.from_api_response"
        ) as mock_from_api_response:
            mock_user_instance = MagicMock()
            mock_from_api_response.return_value = mock_user_instance
            mock_response_data = {
                "accountId": "5b10ac8d82e05b22cc7d4ef5",
                "displayName": "Cloud User",
                "emailAddress": "[email protected]",
                "active": True,
            }
            users_mixin.jira.user = MagicMock(return_value=mock_response_data)
            test_account_id = "5b10ac8d82e05b22cc7d4ef5"
            user = users_mixin.get_user_profile_by_identifier(test_account_id)
            assert user == mock_user_instance
            users_mixin.jira.user.assert_called_once_with(account_id=test_account_id)
            mock_from_api_response.assert_called_once_with(mock_response_data)

    def test_get_user_profile_by_identifier_server_username(self, users_mixin):
        """Test get_user_profile_by_identifier with Server/DC and username."""
        users_mixin.config = MagicMock(spec=JiraConfig)
        users_mixin.config.is_cloud = False

        with patch(
            "src.mcp_atlassian.jira.users.JiraUser.from_api_response"
        ) as mock_from_api_response:
            mock_user_instance = MagicMock()
            mock_from_api_response.return_value = mock_user_instance
            mock_response_data = {
                "name": "server_user",
                "displayName": "Server User",
                "emailAddress": "[email protected]",
                "active": True,
            }
            users_mixin.jira.user = MagicMock(return_value=mock_response_data)
            user = users_mixin.get_user_profile_by_identifier("server_user")
            assert user == mock_user_instance
            users_mixin.jira.user.assert_called_once_with(username="server_user")
            mock_from_api_response.assert_called_once_with(mock_response_data)

    def test_get_user_profile_by_identifier_cloud_email(self, users_mixin):
        """Test get_user_profile_by_identifier with Cloud and email."""
        users_mixin.config = MagicMock(spec=JiraConfig)
        users_mixin.config.is_cloud = True
        users_mixin._lookup_user_directly = MagicMock(
            return_value="5b10ac8d82e05b22cc7d4ef5"
        )
        with patch(
            "src.mcp_atlassian.jira.users.JiraUser.from_api_response"
        ) as mock_from_api_response:
            mock_user_instance = MagicMock()
            mock_from_api_response.return_value = mock_user_instance
            mock_response_data = {
                "accountId": "5b10ac8d82e05b22cc7d4ef5",
                "displayName": "Email User",
                "emailAddress": "[email protected]",
                "active": True,
            }
            users_mixin.jira.user = MagicMock(return_value=mock_response_data)
            user = users_mixin.get_user_profile_by_identifier("[email protected]")
            assert user == mock_user_instance
            users_mixin.jira.user.assert_called_once_with(
                account_id="5b10ac8d82e05b22cc7d4ef5"
            )
            users_mixin._lookup_user_directly.assert_called_once_with(
                "[email protected]"
            )
            mock_from_api_response.assert_called_once_with(mock_response_data)

    def test_get_user_profile_by_identifier_not_found(self, users_mixin):
        """Test get_user_profile_by_identifier when user is not found (404 or cannot resolve)."""
        users_mixin.config = MagicMock(spec=JiraConfig)
        users_mixin.config.is_cloud = True
        users_mixin._lookup_user_directly = MagicMock(return_value=None)
        users_mixin._lookup_user_by_permissions = MagicMock(return_value=None)
        # Simulate the identifier cannot be resolved to an account ID
        with pytest.raises(
            ValueError, match="Could not determine how to look up user 'nonexistent'."
        ):
            users_mixin.get_user_profile_by_identifier("nonexistent")

    def test_get_user_profile_by_identifier_permission_error(self, users_mixin):
        """Test get_user_profile_by_identifier with a permission error (403)."""
        users_mixin.config = MagicMock(spec=JiraConfig)
        users_mixin.config.is_cloud = True
        users_mixin._get_account_id = MagicMock(
            return_value="account-id-for-restricted"
        )
        mock_response = MagicMock(spec=requests.Response)
        mock_response.status_code = 403
        http_error = requests.exceptions.HTTPError(response=mock_response)
        users_mixin.jira.user = MagicMock(side_effect=http_error)
        from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError

        with pytest.raises(
            MCPAtlassianAuthenticationError,
            match="Permission denied accessing user 'restricted_user'.",
        ):
            users_mixin.get_user_profile_by_identifier("restricted_user")

    def test_get_user_profile_by_identifier_api_error(self, users_mixin):
        """Test get_user_profile_by_identifier with a generic API error."""
        # Mock config
        users_mixin.config = MagicMock(spec=JiraConfig)
        users_mixin.config.is_cloud = True
        # Mock resolution methods to succeed
        users_mixin._get_account_id = MagicMock(return_value="account-id-for-error")

        # Mock API to raise a generic exception
        users_mixin.jira.user = MagicMock(side_effect=Exception("Network Timeout"))

        # Call method and assert generic Exception
        with pytest.raises(
            Exception, match="Error processing user profile for 'error_user'"
        ):
            users_mixin.get_user_profile_by_identifier("error_user")

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/servers/confluence.py:
--------------------------------------------------------------------------------

```python
"""Confluence FastMCP server instance and tool definitions."""

import json
import logging
from typing import Annotated

from fastmcp import Context, FastMCP
from pydantic import BeforeValidator, Field

from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
from mcp_atlassian.servers.dependencies import get_confluence_fetcher
from mcp_atlassian.utils.decorators import (
    check_write_access,
)

logger = logging.getLogger(__name__)

confluence_mcp = FastMCP(
    name="Confluence MCP Service",
    description="Provides tools for interacting with Atlassian Confluence.",
)


@confluence_mcp.tool(tags={"confluence", "read"})
async def search(
    ctx: Context,
    query: Annotated[
        str,
        Field(
            description=(
                "Search query - can be either a simple text (e.g. 'project documentation') or a CQL query string. "
                "Simple queries use 'siteSearch' by default, to mimic the WebUI search, with an automatic fallback "
                "to 'text' search if not supported. Examples of CQL:\n"
                "- Basic search: 'type=page AND space=DEV'\n"
                "- Personal space search: 'space=\"~username\"' (note: personal space keys starting with ~ must be quoted)\n"
                "- Search by title: 'title~\"Meeting Notes\"'\n"
                "- Use siteSearch: 'siteSearch ~ \"important concept\"'\n"
                "- Use text search: 'text ~ \"important concept\"'\n"
                "- Recent content: 'created >= \"2023-01-01\"'\n"
                "- Content with specific label: 'label=documentation'\n"
                "- Recently modified content: 'lastModified > startOfMonth(\"-1M\")'\n"
                "- Content modified this year: 'creator = currentUser() AND lastModified > startOfYear()'\n"
                "- Content you contributed to recently: 'contributor = currentUser() AND lastModified > startOfWeek()'\n"
                "- Content watched by user: 'watcher = \"[email protected]\" AND type = page'\n"
                '- Exact phrase in content: \'text ~ "\\"Urgent Review Required\\"" AND label = "pending-approval"\'\n'
                '- Title wildcards: \'title ~ "Minutes*" AND (space = "HR" OR space = "Marketing")\'\n'
                'Note: Special identifiers need proper quoting in CQL: personal space keys (e.g., "~username"), '
                "reserved words, numeric IDs, and identifiers with special characters."
            )
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="Maximum number of results (1-50)",
            default=10,
            ge=1,
            le=50,
        ),
    ] = 10,
    spaces_filter: Annotated[
        str | None,
        Field(
            description=(
                "(Optional) Comma-separated list of space keys to filter results by. "
                "Overrides the environment variable CONFLUENCE_SPACES_FILTER if provided. "
                "Use empty string to disable filtering."
            ),
            default=None,
        ),
    ] = None,
) -> str:
    """Search Confluence content using simple terms or CQL.

    Args:
        ctx: The FastMCP context.
        query: Search query - can be simple text or a CQL query string.
        limit: Maximum number of results (1-50).
        spaces_filter: Comma-separated list of space keys to filter by.

    Returns:
        JSON string representing a list of simplified Confluence page objects.
    """
    confluence_fetcher = await get_confluence_fetcher(ctx)
    # Check if the query is a simple search term or already a CQL query
    if query and not any(
        x in query for x in ["=", "~", ">", "<", " AND ", " OR ", "currentUser()"]
    ):
        original_query = query
        try:
            query = f'siteSearch ~ "{original_query}"'
            logger.info(
                f"Converting simple search term to CQL using siteSearch: {query}"
            )
            pages = confluence_fetcher.search(
                query, limit=limit, spaces_filter=spaces_filter
            )
        except Exception as e:
            logger.warning(f"siteSearch failed ('{e}'), falling back to text search.")
            query = f'text ~ "{original_query}"'
            logger.info(f"Falling back to text search with CQL: {query}")
            pages = confluence_fetcher.search(
                query, limit=limit, spaces_filter=spaces_filter
            )
    else:
        pages = confluence_fetcher.search(
            query, limit=limit, spaces_filter=spaces_filter
        )
    search_results = [page.to_simplified_dict() for page in pages]
    return json.dumps(search_results, indent=2, ensure_ascii=False)


@confluence_mcp.tool(tags={"confluence", "read"})
async def get_page(
    ctx: Context,
    page_id: Annotated[
        str | None,
        Field(
            description=(
                "Confluence page ID (numeric ID, can be found in the page URL). "
                "For example, in the URL 'https://example.atlassian.net/wiki/spaces/TEAM/pages/123456789/Page+Title', "
                "the page ID is '123456789'. "
                "Provide this OR both 'title' and 'space_key'. If page_id is provided, title and space_key will be ignored."
            ),
            default=None,
        ),
    ] = None,
    title: Annotated[
        str | None,
        Field(
            description=(
                "The exact title of the Confluence page. Use this with 'space_key' if 'page_id' is not known."
            ),
            default=None,
        ),
    ] = None,
    space_key: Annotated[
        str | None,
        Field(
            description=(
                "The key of the Confluence space where the page resides (e.g., 'DEV', 'TEAM'). Required if using 'title'."
            ),
            default=None,
        ),
    ] = None,
    include_metadata: Annotated[
        bool,
        Field(
            description="Whether to include page metadata such as creation date, last update, version, and labels.",
            default=True,
        ),
    ] = True,
    convert_to_markdown: Annotated[
        bool,
        Field(
            description=(
                "Whether to convert page to markdown (true) or keep it in raw HTML format (false). "
                "Raw HTML can reveal macros (like dates) not visible in markdown, but CAUTION: "
                "using HTML significantly increases token usage in AI responses."
            ),
            default=True,
        ),
    ] = True,
) -> str:
    """Get content of a specific Confluence page by its ID, or by its title and space key.

    Args:
        ctx: The FastMCP context.
        page_id: Confluence page ID. If provided, 'title' and 'space_key' are ignored.
        title: The exact title of the page. Must be used with 'space_key'.
        space_key: The key of the space. Must be used with 'title'.
        include_metadata: Whether to include page metadata.
        convert_to_markdown: Convert content to markdown (true) or keep raw HTML (false).

    Returns:
        JSON string representing the page content and/or metadata, or an error if not found or parameters are invalid.
    """
    confluence_fetcher = await get_confluence_fetcher(ctx)
    page_object = None

    if page_id:
        if title or space_key:
            logger.warning(
                "page_id was provided; title and space_key parameters will be ignored."
            )
        try:
            page_object = confluence_fetcher.get_page_content(
                page_id, convert_to_markdown=convert_to_markdown
            )
        except Exception as e:
            logger.error(f"Error fetching page by ID '{page_id}': {e}")
            return json.dumps(
                {"error": f"Failed to retrieve page by ID '{page_id}': {e}"},
                indent=2,
                ensure_ascii=False,
            )
    elif title and space_key:
        page_object = confluence_fetcher.get_page_by_title(
            space_key, title, convert_to_markdown=convert_to_markdown
        )
        if not page_object:
            return json.dumps(
                {
                    "error": f"Page with title '{title}' not found in space '{space_key}'."
                },
                indent=2,
                ensure_ascii=False,
            )
    else:
        raise ValueError(
            "Either 'page_id' OR both 'title' and 'space_key' must be provided."
        )

    if not page_object:
        return json.dumps(
            {"error": "Page not found with the provided identifiers."},
            indent=2,
            ensure_ascii=False,
        )

    if include_metadata:
        result = {"metadata": page_object.to_simplified_dict()}
    else:
        result = {"content": {"value": page_object.content}}

    return json.dumps(result, indent=2, ensure_ascii=False)


@confluence_mcp.tool(tags={"confluence", "read"})
async def get_page_children(
    ctx: Context,
    parent_id: Annotated[
        str,
        Field(
            description="The ID of the parent page whose children you want to retrieve"
        ),
    ],
    expand: Annotated[
        str,
        Field(
            description="Fields to expand in the response (e.g., 'version', 'body.storage')",
            default="version",
        ),
    ] = "version",
    limit: Annotated[
        int,
        Field(
            description="Maximum number of child pages to return (1-50)",
            default=25,
            ge=1,
            le=50,
        ),
    ] = 25,
    include_content: Annotated[
        bool,
        Field(
            description="Whether to include the page content in the response",
            default=False,
        ),
    ] = False,
    convert_to_markdown: Annotated[
        bool,
        Field(
            description="Whether to convert page content to markdown (true) or keep it in raw HTML format (false). Only relevant if include_content is true.",
            default=True,
        ),
    ] = True,
    start: Annotated[
        int,
        Field(description="Starting index for pagination (0-based)", default=0, ge=0),
    ] = 0,
) -> str:
    """Get child pages of a specific Confluence page.

    Args:
        ctx: The FastMCP context.
        parent_id: The ID of the parent page.
        expand: Fields to expand.
        limit: Maximum number of child pages.
        include_content: Whether to include page content.
        convert_to_markdown: Convert content to markdown if include_content is true.
        start: Starting index for pagination.

    Returns:
        JSON string representing a list of child page objects.
    """
    confluence_fetcher = await get_confluence_fetcher(ctx)
    if include_content and "body" not in expand:
        expand = f"{expand},body.storage" if expand else "body.storage"

    try:
        pages = confluence_fetcher.get_page_children(
            page_id=parent_id,
            start=start,
            limit=limit,
            expand=expand,
            convert_to_markdown=convert_to_markdown,
        )
        child_pages = [page.to_simplified_dict() for page in pages]
        result = {
            "parent_id": parent_id,
            "count": len(child_pages),
            "limit_requested": limit,
            "start_requested": start,
            "results": child_pages,
        }
    except Exception as e:
        logger.error(
            f"Error getting/processing children for page ID {parent_id}: {e}",
            exc_info=True,
        )
        result = {"error": f"Failed to get child pages: {e}"}

    return json.dumps(result, indent=2, ensure_ascii=False)


@confluence_mcp.tool(tags={"confluence", "read"})
async def get_comments(
    ctx: Context,
    page_id: Annotated[
        str,
        Field(
            description=(
                "Confluence page ID (numeric ID, can be parsed from URL, "
                "e.g. from 'https://example.atlassian.net/wiki/spaces/TEAM/pages/123456789/Page+Title' "
                "-> '123456789')"
            )
        ),
    ],
) -> str:
    """Get comments for a specific Confluence page.

    Args:
        ctx: The FastMCP context.
        page_id: Confluence page ID.

    Returns:
        JSON string representing a list of comment objects.
    """
    confluence_fetcher = await get_confluence_fetcher(ctx)
    comments = confluence_fetcher.get_page_comments(page_id)
    formatted_comments = [comment.to_simplified_dict() for comment in comments]
    return json.dumps(formatted_comments, indent=2, ensure_ascii=False)


@confluence_mcp.tool(tags={"confluence", "read"})
async def get_labels(
    ctx: Context,
    page_id: Annotated[
        str,
        Field(
            description=(
                "Confluence page ID (numeric ID, can be parsed from URL, "
                "e.g. from 'https://example.atlassian.net/wiki/spaces/TEAM/pages/123456789/Page+Title' "
                "-> '123456789')"
            )
        ),
    ],
) -> str:
    """Get labels for a specific Confluence page.

    Args:
        ctx: The FastMCP context.
        page_id: Confluence page ID.

    Returns:
        JSON string representing a list of label objects.
    """
    confluence_fetcher = await get_confluence_fetcher(ctx)
    labels = confluence_fetcher.get_page_labels(page_id)
    formatted_labels = [label.to_simplified_dict() for label in labels]
    return json.dumps(formatted_labels, indent=2, ensure_ascii=False)


@confluence_mcp.tool(tags={"confluence", "write"})
@check_write_access
async def add_label(
    ctx: Context,
    page_id: Annotated[str, Field(description="The ID of the page to update")],
    name: Annotated[str, Field(description="The name of the label")],
) -> str:
    """Add label to an existing Confluence page.

    Args:
        ctx: The FastMCP context.
        page_id: The ID of the page to update.
        name: The name of the label.

    Returns:
        JSON string representing the updated list of label objects for the page.

    Raises:
        ValueError: If in read-only mode or Confluence client is unavailable.
    """
    confluence_fetcher = await get_confluence_fetcher(ctx)
    labels = confluence_fetcher.add_page_label(page_id, name)
    formatted_labels = [label.to_simplified_dict() for label in labels]
    return json.dumps(formatted_labels, indent=2, ensure_ascii=False)


@confluence_mcp.tool(tags={"confluence", "write"})
@check_write_access
async def create_page(
    ctx: Context,
    space_key: Annotated[
        str,
        Field(
            description="The key of the space to create the page in (usually a short uppercase code like 'DEV', 'TEAM', or 'DOC')"
        ),
    ],
    title: Annotated[str, Field(description="The title of the page")],
    content: Annotated[
        str,
        Field(
            description="The content of the page. Format depends on content_format parameter. Can be Markdown (default), wiki markup, or storage format"
        ),
    ],
    parent_id: Annotated[
        str | None,
        Field(
            description="(Optional) parent page ID. If provided, this page will be created as a child of the specified page",
            default=None,
        ),
        BeforeValidator(lambda x: str(x) if x is not None else None),
    ] = None,
    content_format: Annotated[
        str,
        Field(
            description="(Optional) The format of the content parameter. Options: 'markdown' (default), 'wiki', or 'storage'. Wiki format uses Confluence wiki markup syntax",
            default="markdown",
        ),
    ] = "markdown",
    enable_heading_anchors: Annotated[
        bool,
        Field(
            description="(Optional) Whether to enable automatic heading anchor generation. Only applies when content_format is 'markdown'",
            default=False,
        ),
    ] = False,
) -> str:
    """Create a new Confluence page.

    Args:
        ctx: The FastMCP context.
        space_key: The key of the space.
        title: The title of the page.
        content: The content of the page (format depends on content_format).
        parent_id: Optional parent page ID.
        content_format: The format of the content ('markdown', 'wiki', or 'storage').
        enable_heading_anchors: Whether to enable heading anchors (markdown only).

    Returns:
        JSON string representing the created page object.

    Raises:
        ValueError: If in read-only mode, Confluence client is unavailable, or invalid content_format.
    """
    confluence_fetcher = await get_confluence_fetcher(ctx)

    # Validate content_format
    if content_format not in ["markdown", "wiki", "storage"]:
        raise ValueError(
            f"Invalid content_format: {content_format}. Must be 'markdown', 'wiki', or 'storage'"
        )

    # Determine parameters based on content format
    if content_format == "markdown":
        is_markdown = True
        content_representation = None  # Will be converted to storage
    else:
        is_markdown = False
        content_representation = content_format  # Pass 'wiki' or 'storage' directly

    page = confluence_fetcher.create_page(
        space_key=space_key,
        title=title,
        body=content,
        parent_id=parent_id,
        is_markdown=is_markdown,
        enable_heading_anchors=enable_heading_anchors
        if content_format == "markdown"
        else False,
        content_representation=content_representation,
    )
    result = page.to_simplified_dict()
    return json.dumps(
        {"message": "Page created successfully", "page": result},
        indent=2,
        ensure_ascii=False,
    )


@confluence_mcp.tool(tags={"confluence", "write"})
@check_write_access
async def update_page(
    ctx: Context,
    page_id: Annotated[str, Field(description="The ID of the page to update")],
    title: Annotated[str, Field(description="The new title of the page")],
    content: Annotated[
        str,
        Field(
            description="The new content of the page. Format depends on content_format parameter"
        ),
    ],
    is_minor_edit: Annotated[
        bool, Field(description="Whether this is a minor edit", default=False)
    ] = False,
    version_comment: Annotated[
        str | None, Field(description="Optional comment for this version", default=None)
    ] = None,
    parent_id: Annotated[
        str | None,
        Field(description="Optional the new parent page ID", default=None),
        BeforeValidator(lambda x: str(x) if x is not None else None),
    ] = None,
    content_format: Annotated[
        str,
        Field(
            description="(Optional) The format of the content parameter. Options: 'markdown' (default), 'wiki', or 'storage'. Wiki format uses Confluence wiki markup syntax",
            default="markdown",
        ),
    ] = "markdown",
    enable_heading_anchors: Annotated[
        bool,
        Field(
            description="(Optional) Whether to enable automatic heading anchor generation. Only applies when content_format is 'markdown'",
            default=False,
        ),
    ] = False,
) -> str:
    """Update an existing Confluence page.

    Args:
        ctx: The FastMCP context.
        page_id: The ID of the page to update.
        title: The new title of the page.
        content: The new content of the page (format depends on content_format).
        is_minor_edit: Whether this is a minor edit.
        version_comment: Optional comment for this version.
        parent_id: Optional new parent page ID.
        content_format: The format of the content ('markdown', 'wiki', or 'storage').
        enable_heading_anchors: Whether to enable heading anchors (markdown only).

    Returns:
        JSON string representing the updated page object.

    Raises:
        ValueError: If Confluence client is not configured, available, or invalid content_format.
    """
    confluence_fetcher = await get_confluence_fetcher(ctx)

    # Validate content_format
    if content_format not in ["markdown", "wiki", "storage"]:
        raise ValueError(
            f"Invalid content_format: {content_format}. Must be 'markdown', 'wiki', or 'storage'"
        )

    # Determine parameters based on content format
    if content_format == "markdown":
        is_markdown = True
        content_representation = None  # Will be converted to storage
    else:
        is_markdown = False
        content_representation = content_format  # Pass 'wiki' or 'storage' directly

    updated_page = confluence_fetcher.update_page(
        page_id=page_id,
        title=title,
        body=content,
        is_minor_edit=is_minor_edit,
        version_comment=version_comment,
        is_markdown=is_markdown,
        parent_id=parent_id,
        enable_heading_anchors=enable_heading_anchors
        if content_format == "markdown"
        else False,
        content_representation=content_representation,
    )
    page_data = updated_page.to_simplified_dict()
    return json.dumps(
        {"message": "Page updated successfully", "page": page_data},
        indent=2,
        ensure_ascii=False,
    )


@confluence_mcp.tool(tags={"confluence", "write"})
@check_write_access
async def delete_page(
    ctx: Context,
    page_id: Annotated[str, Field(description="The ID of the page to delete")],
) -> str:
    """Delete an existing Confluence page.

    Args:
        ctx: The FastMCP context.
        page_id: The ID of the page to delete.

    Returns:
        JSON string indicating success or failure.

    Raises:
        ValueError: If Confluence client is not configured or available.
    """
    confluence_fetcher = await get_confluence_fetcher(ctx)
    try:
        result = confluence_fetcher.delete_page(page_id=page_id)
        if result:
            response = {
                "success": True,
                "message": f"Page {page_id} deleted successfully",
            }
        else:
            response = {
                "success": False,
                "message": f"Unable to delete page {page_id}. API request completed but deletion unsuccessful.",
            }
    except Exception as e:
        logger.error(f"Error deleting Confluence page {page_id}: {str(e)}")
        response = {
            "success": False,
            "message": f"Error deleting page {page_id}",
            "error": str(e),
        }

    return json.dumps(response, indent=2, ensure_ascii=False)


@confluence_mcp.tool(tags={"confluence", "write"})
@check_write_access
async def add_comment(
    ctx: Context,
    page_id: Annotated[
        str, Field(description="The ID of the page to add a comment to")
    ],
    content: Annotated[
        str, Field(description="The comment content in Markdown format")
    ],
) -> str:
    """Add a comment to a Confluence page.

    Args:
        ctx: The FastMCP context.
        page_id: The ID of the page to add a comment to.
        content: The comment content in Markdown format.

    Returns:
        JSON string representing the created comment.

    Raises:
        ValueError: If in read-only mode or Confluence client is unavailable.
    """
    confluence_fetcher = await get_confluence_fetcher(ctx)
    try:
        comment = confluence_fetcher.add_comment(page_id=page_id, content=content)
        if comment:
            comment_data = comment.to_simplified_dict()
            response = {
                "success": True,
                "message": "Comment added successfully",
                "comment": comment_data,
            }
        else:
            response = {
                "success": False,
                "message": f"Unable to add comment to page {page_id}. API request completed but comment creation unsuccessful.",
            }
    except Exception as e:
        logger.error(f"Error adding comment to Confluence page {page_id}: {str(e)}")
        response = {
            "success": False,
            "message": f"Error adding comment to page {page_id}",
            "error": str(e),
        }

    return json.dumps(response, indent=2, ensure_ascii=False)


@confluence_mcp.tool(tags={"confluence", "read"})
async def search_user(
    ctx: Context,
    query: Annotated[
        str,
        Field(
            description=(
                "Search query - a CQL query string for user search. "
                "Examples of CQL:\n"
                "- Basic user lookup by full name: 'user.fullname ~ \"First Last\"'\n"
                'Note: Special identifiers need proper quoting in CQL: personal space keys (e.g., "~username"), '
                "reserved words, numeric IDs, and identifiers with special characters."
            )
        ),
    ],
    limit: Annotated[
        int,
        Field(
            description="Maximum number of results (1-50)",
            default=10,
            ge=1,
            le=50,
        ),
    ] = 10,
) -> str:
    """Search Confluence users using CQL.

    Args:
        ctx: The FastMCP context.
        query: Search query - a CQL query string for user search.
        limit: Maximum number of results (1-50).

    Returns:
        JSON string representing a list of simplified Confluence user search result objects.
    """
    confluence_fetcher = await get_confluence_fetcher(ctx)

    # If the query doesn't look like CQL, wrap it as a user fullname search
    if query and not any(
        x in query for x in ["=", "~", ">", "<", " AND ", " OR ", "user."]
    ):
        # Simple search term - search by fullname
        query = f'user.fullname ~ "{query}"'
        logger.info(f"Converting simple search term to user CQL: {query}")

    try:
        user_results = confluence_fetcher.search_user(query, limit=limit)
        search_results = [user.to_simplified_dict() for user in user_results]
        return json.dumps(search_results, indent=2, ensure_ascii=False)
    except MCPAtlassianAuthenticationError as e:
        logger.error(f"Authentication error during user search: {e}", exc_info=False)
        return json.dumps(
            {
                "error": "Authentication failed. Please check your credentials.",
                "details": str(e),
            },
            indent=2,
            ensure_ascii=False,
        )
    except Exception as e:
        logger.error(f"Error searching users: {str(e)}")
        return json.dumps(
            {
                "error": f"An unexpected error occurred while searching for users: {str(e)}"
            },
            indent=2,
            ensure_ascii=False,
        )

```

--------------------------------------------------------------------------------
/tests/integration/test_authentication.py:
--------------------------------------------------------------------------------

```python
"""Integration tests for authentication functionality."""

import json
import time
from unittest.mock import MagicMock, Mock, patch

import pytest
import requests
from requests.exceptions import HTTPError

from mcp_atlassian.confluence.client import ConfluenceClient
from mcp_atlassian.confluence.config import ConfluenceConfig
from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
from mcp_atlassian.jira.client import JiraClient
from mcp_atlassian.jira.config import JiraConfig
from mcp_atlassian.utils.oauth import OAuthConfig, configure_oauth_session
from tests.utils.mocks import MockEnvironment


@pytest.mark.integration
class TestOAuthTokenRefreshFlow:
    """Test OAuth token refresh flow with expiration handling."""

    def test_oauth_token_refresh_on_expiration(self):
        """Test automatic token refresh when access token is expired."""
        with MockEnvironment.oauth_env() as oauth_env:
            # Create OAuth config with expired token
            oauth_config = OAuthConfig(
                client_id=oauth_env["ATLASSIAN_OAUTH_CLIENT_ID"],
                client_secret=oauth_env["ATLASSIAN_OAUTH_CLIENT_SECRET"],
                redirect_uri=oauth_env["ATLASSIAN_OAUTH_REDIRECT_URI"],
                scope=oauth_env["ATLASSIAN_OAUTH_SCOPE"],
                cloud_id=oauth_env["ATLASSIAN_OAUTH_CLOUD_ID"],
                access_token="expired-access-token",
                refresh_token="valid-refresh-token",
                expires_at=time.time() - 3600,  # Expired 1 hour ago
            )

            # Mock the token refresh endpoint
            with patch("requests.post") as mock_post:
                mock_response = Mock()
                mock_response.ok = True
                mock_response.json.return_value = {
                    "access_token": "new-access-token",
                    "refresh_token": "new-refresh-token",
                    "expires_in": 3600,
                }
                mock_post.return_value = mock_response

                # Ensure valid token should trigger refresh
                assert oauth_config.is_token_expired is True
                result = oauth_config.ensure_valid_token()

                assert result is True
                assert oauth_config.access_token == "new-access-token"
                assert oauth_config.refresh_token == "new-refresh-token"
                assert oauth_config.expires_at > time.time()

                # Verify the refresh token request
                mock_post.assert_called_once()
                call_args = mock_post.call_args
                assert call_args[0][0] == "https://auth.atlassian.com/oauth/token"
                assert call_args[1]["data"]["grant_type"] == "refresh_token"
                assert call_args[1]["data"]["refresh_token"] == "valid-refresh-token"

    def test_oauth_token_refresh_failure_handling(self):
        """Test handling of token refresh failures."""
        with MockEnvironment.oauth_env() as oauth_env:
            # Create OAuth config with expired token
            oauth_config = OAuthConfig(
                client_id=oauth_env["ATLASSIAN_OAUTH_CLIENT_ID"],
                client_secret=oauth_env["ATLASSIAN_OAUTH_CLIENT_SECRET"],
                redirect_uri=oauth_env["ATLASSIAN_OAUTH_REDIRECT_URI"],
                scope=oauth_env["ATLASSIAN_OAUTH_SCOPE"],
                cloud_id=oauth_env["ATLASSIAN_OAUTH_CLOUD_ID"],
                access_token="expired-access-token",
                refresh_token="invalid-refresh-token",
                expires_at=time.time() - 3600,
            )

            # Mock the token refresh endpoint to fail
            with patch("requests.post") as mock_post:
                mock_response = Mock()
                mock_response.ok = False
                mock_response.raise_for_status.side_effect = HTTPError(
                    "401 Unauthorized"
                )
                mock_post.return_value = mock_response

                # Ensure valid token should fail
                result = oauth_config.ensure_valid_token()
                assert result is False

    def test_oauth_token_expiry_margin(self):
        """Test that tokens are refreshed before actual expiration."""
        with MockEnvironment.oauth_env():
            # Create OAuth config with token expiring in 4 minutes (within margin)
            oauth_config = OAuthConfig(
                client_id="test-client",
                client_secret="test-secret",
                redirect_uri="http://localhost:8080",
                scope="read:jira",
                access_token="almost-expired-token",
                refresh_token="valid-refresh-token",
                expires_at=time.time() + 240,  # 4 minutes from now
            )

            # Token should be considered expired due to margin
            assert oauth_config.is_token_expired is True

            # Create token expiring in 10 minutes (outside margin)
            oauth_config.expires_at = time.time() + 600
            assert oauth_config.is_token_expired is False


@pytest.mark.integration
class TestBasicAuthValidation:
    """Test basic authentication validation against real endpoints."""

    @patch("mcp_atlassian.jira.client.Jira")
    def test_jira_basic_auth_success(self, mock_jira_class):
        """Test successful Jira basic authentication."""
        with MockEnvironment.basic_auth_env() as auth_env:
            # Create mock Jira instance
            mock_jira = MagicMock()
            mock_jira_class.return_value = mock_jira

            # Create Jira client
            config = JiraConfig.from_env()
            client = JiraClient(config)

            # Verify Jira was initialized with correct params
            mock_jira_class.assert_called_once_with(
                url=auth_env["JIRA_URL"],
                username=auth_env["JIRA_USERNAME"],
                password=auth_env["JIRA_API_TOKEN"],
                cloud=True,  # Assuming cloud by default
                verify_ssl=True,
            )

    @patch("mcp_atlassian.confluence.client.Confluence")
    def test_confluence_basic_auth_success(self, mock_confluence_class):
        """Test successful Confluence basic authentication."""
        with MockEnvironment.basic_auth_env() as auth_env:
            # Create mock Confluence instance
            mock_confluence = MagicMock()
            mock_confluence_class.return_value = mock_confluence

            # Create Confluence client
            config = ConfluenceConfig.from_env()
            client = ConfluenceClient(config)

            # Verify Confluence was initialized with correct params
            mock_confluence_class.assert_called_once_with(
                url=auth_env["CONFLUENCE_URL"],
                username=auth_env["CONFLUENCE_USERNAME"],
                password=auth_env["CONFLUENCE_API_TOKEN"],
                cloud=True,
                verify_ssl=True,
            )

    def test_basic_auth_with_invalid_credentials(self):
        """Test basic authentication with invalid credentials."""
        with MockEnvironment.clean_env():
            with patch.dict(
                "os.environ",
                {
                    "JIRA_URL": "https://test.atlassian.net",
                    "JIRA_USERNAME": "[email protected]",
                    "JIRA_API_TOKEN": "invalid-token",
                },
            ):
                with patch("mcp_atlassian.jira.client.Jira") as mock_jira_class:
                    # Make Jira constructor raise authentication error
                    mock_jira_class.side_effect = HTTPError("401 Unauthorized")

                    config = JiraConfig.from_env()
                    with pytest.raises(HTTPError):
                        JiraClient(config)


@pytest.mark.integration
class TestPATTokenValidation:
    """Test Personal Access Token (PAT) validation and precedence."""

    @patch("mcp_atlassian.jira.client.Jira")
    def test_jira_pat_token_success(self, mock_jira_class):
        """Test successful Jira PAT authentication."""
        # Clear existing auth env vars first
        with MockEnvironment.clean_env():
            with patch.dict(
                "os.environ",
                {
                    "JIRA_URL": "https://jira.company.com",  # Server URL for PAT
                    "JIRA_PERSONAL_TOKEN": "test-personal-access-token",
                },
            ):
                # Create mock Jira instance
                mock_jira = MagicMock()
                mock_jira_class.return_value = mock_jira

                # Create Jira client
                config = JiraConfig.from_env()
                client = JiraClient(config)

                # Verify Jira was initialized with PAT token
                mock_jira_class.assert_called_once_with(
                    url="https://jira.company.com",
                    token="test-personal-access-token",
                    cloud=False,  # Server instance
                    verify_ssl=True,
                )

    @patch("mcp_atlassian.confluence.client.Confluence")
    def test_confluence_pat_token_success(self, mock_confluence_class):
        """Test successful Confluence PAT authentication."""
        # Clear existing auth env vars first
        with MockEnvironment.clean_env():
            with patch.dict(
                "os.environ",
                {
                    "CONFLUENCE_URL": "https://confluence.company.com",  # Server URL for PAT
                    "CONFLUENCE_PERSONAL_TOKEN": "test-personal-access-token",
                },
            ):
                # Create mock Confluence instance
                mock_confluence = MagicMock()
                mock_confluence_class.return_value = mock_confluence

                # Create Confluence client
                config = ConfluenceConfig.from_env()
                client = ConfluenceClient(config)

                # Verify Confluence was initialized with PAT token
                mock_confluence_class.assert_called_once_with(
                    url="https://confluence.company.com",
                    token="test-personal-access-token",
                    cloud=False,  # Server instance
                    verify_ssl=True,
                )

    def test_pat_token_precedence_over_basic_auth(self):
        """Test that PAT token takes precedence over basic auth when both are present."""
        with MockEnvironment.clean_env():
            with patch.dict(
                "os.environ",
                {
                    "JIRA_URL": "https://jira.company.com",  # Server URL for PAT
                    "JIRA_USERNAME": "[email protected]",
                    "JIRA_API_TOKEN": "basic-api-token",
                    "JIRA_PERSONAL_TOKEN": "personal-access-token",
                },
            ):
                config = JiraConfig.from_env()
                assert config.auth_type == "pat"
                assert config.personal_token == "personal-access-token"


@pytest.mark.integration
class TestAuthenticationFailureRecovery:
    """Test authentication failure recovery patterns."""

    def test_oauth_to_basic_auth_fallback(self):
        """Test fallback from OAuth to basic auth when OAuth fails."""
        # Set up environment with both OAuth and basic auth but incomplete OAuth
        with MockEnvironment.clean_env():
            # Mock token loading to return empty (no stored tokens)
            with patch(
                "mcp_atlassian.utils.oauth.OAuthConfig.load_tokens", return_value={}
            ):
                with patch.dict(
                    "os.environ",
                    {
                        # OAuth config - incomplete (missing cloud_id)
                        "ATLASSIAN_OAUTH_CLIENT_ID": "test-client",
                        "ATLASSIAN_OAUTH_CLIENT_SECRET": "test-secret",
                        "ATLASSIAN_OAUTH_REDIRECT_URI": "http://localhost:8080",
                        "ATLASSIAN_OAUTH_SCOPE": "read:jira",
                        # Basic auth config
                        "JIRA_URL": "https://test.atlassian.net",
                        "JIRA_USERNAME": "[email protected]",
                        "JIRA_API_TOKEN": "api-token",
                    },
                ):
                    # Without cloud_id, OAuth config is incomplete and should fallback to basic
                    config = JiraConfig.from_env()
                    assert config.auth_type == "basic"  # Falls back to basic auth

                    # Now add cloud_id to complete OAuth config
                    with patch.dict(
                        "os.environ", {"ATLASSIAN_OAUTH_CLOUD_ID": "test-cloud-id"}
                    ):
                        config = JiraConfig.from_env()
                        assert config.auth_type == "oauth"

                        # OAuth should fail without valid tokens
                        with pytest.raises(
                            MCPAtlassianAuthenticationError,
                            match="Failed to configure OAuth session",
                        ):
                            JiraClient(config)

    def test_authentication_retry_on_401(self):
        """Test retry behavior on 401 authentication errors."""
        with MockEnvironment.oauth_env():
            oauth_config = OAuthConfig(
                client_id="test-client",
                client_secret="test-secret",
                redirect_uri="http://localhost:8080",
                scope="read:jira",
                cloud_id="test-cloud",
                access_token="expired-token",
                refresh_token="valid-refresh-token",
                expires_at=time.time() - 3600,
            )

            session = requests.Session()

            # Mock token refresh to succeed
            with patch.object(oauth_config, "refresh_access_token") as mock_refresh:
                mock_refresh.return_value = True
                oauth_config.access_token = "new-token"

                result = configure_oauth_session(session, oauth_config)
                assert result is True
                assert session.headers["Authorization"] == "Bearer new-token"
                mock_refresh.assert_called_once()


@pytest.mark.integration
class TestTokenExpirationAndRetry:
    """Test token expiration and automatic retry."""

    def test_automatic_token_refresh_in_session(self):
        """Test that expired tokens are automatically refreshed in session."""
        with MockEnvironment.oauth_env():
            # Create OAuth config with soon-to-expire token
            oauth_config = OAuthConfig(
                client_id="test-client",
                client_secret="test-secret",
                redirect_uri="http://localhost:8080",
                scope="read:jira",
                cloud_id="test-cloud",
                access_token="expiring-token",
                refresh_token="valid-refresh-token",
                expires_at=time.time() + 100,  # Expires in 100 seconds (within margin)
            )

            session = requests.Session()

            # Mock the refresh token call
            with patch("requests.post") as mock_post:
                mock_response = Mock()
                mock_response.ok = True
                mock_response.json.return_value = {
                    "access_token": "refreshed-token",
                    "expires_in": 3600,
                }
                mock_post.return_value = mock_response

                # Configure session should refresh the token
                result = configure_oauth_session(session, oauth_config)
                assert result is True
                assert oauth_config.access_token == "refreshed-token"
                assert session.headers["Authorization"] == "Bearer refreshed-token"

    def test_token_storage_and_retrieval(self):
        """Test token storage in keyring and retrieval."""
        client_id = "test-client-storage"

        # Mock keyring operations
        with (
            patch("keyring.set_password") as mock_set,
            patch("keyring.get_password") as mock_get,
        ):
            # Create OAuth config and save tokens
            oauth_config = OAuthConfig(
                client_id=client_id,
                client_secret="test-secret",
                redirect_uri="http://localhost:8080",
                scope="read:jira",
                cloud_id="test-cloud",
                access_token="stored-token",
                refresh_token="stored-refresh",
                expires_at=time.time() + 3600,
            )

            # Save tokens
            oauth_config._save_tokens()

            # Verify keyring was called
            mock_set.assert_called_once()
            service_name, username, token_json = mock_set.call_args[0]
            assert service_name == "mcp-atlassian-oauth"
            assert username == f"oauth-{client_id}"

            # Parse stored token data
            stored_data = json.loads(token_json)
            assert stored_data["access_token"] == "stored-token"
            assert stored_data["refresh_token"] == "stored-refresh"
            assert stored_data["cloud_id"] == "test-cloud"

            # Test token retrieval
            mock_get.return_value = token_json
            loaded_data = OAuthConfig.load_tokens(client_id)
            assert loaded_data["access_token"] == "stored-token"
            assert loaded_data["refresh_token"] == "stored-refresh"


@pytest.mark.integration
class TestMixedAuthenticationScenarios:
    """Test mixed authentication scenarios and fallback patterns."""

    def test_oauth_with_direct_access_token(self):
        """Test OAuth config with only access token (no refresh token)."""
        session = requests.Session()

        # Create OAuth config with only access token
        oauth_config = OAuthConfig(
            client_id="test-client",
            client_secret="test-secret",
            redirect_uri="http://localhost:8080",
            scope="read:jira",
            access_token="direct-access-token",
            # No refresh_token, no expires_at
        )

        # Should use token directly without refresh attempt
        result = configure_oauth_session(session, oauth_config)
        assert result is True
        assert session.headers["Authorization"] == "Bearer direct-access-token"

    def test_environment_detection_priority(self):
        """Test authentication method detection priority from environment."""
        # Test with all auth methods present - OAuth should take precedence
        with MockEnvironment.clean_env():
            with patch.dict(
                "os.environ",
                {
                    # OAuth
                    "ATLASSIAN_OAUTH_CLIENT_ID": "oauth-client",
                    "ATLASSIAN_OAUTH_CLIENT_SECRET": "oauth-secret",
                    "ATLASSIAN_OAUTH_REDIRECT_URI": "http://localhost:8080",
                    "ATLASSIAN_OAUTH_SCOPE": "read:jira",
                    "ATLASSIAN_OAUTH_CLOUD_ID": "test-cloud-id",
                    # PAT
                    "JIRA_PERSONAL_TOKEN": "personal-token",
                    # Basic
                    "JIRA_URL": "https://test.atlassian.net",
                    "JIRA_USERNAME": "[email protected]",
                    "JIRA_API_TOKEN": "api-token",
                },
            ):
                config = JiraConfig.from_env()
                assert config.auth_type == "oauth"

        # Test with PAT and basic - PAT should take precedence (for server)
        with MockEnvironment.clean_env():
            with patch.dict(
                "os.environ",
                {
                    "JIRA_URL": "https://jira.company.com",  # Server URL
                    "JIRA_PERSONAL_TOKEN": "personal-token",
                    "JIRA_USERNAME": "[email protected]",
                    "JIRA_API_TOKEN": "api-token",
                },
            ):
                config = JiraConfig.from_env()
                assert config.auth_type == "pat"

        # Test with only basic auth
        with MockEnvironment.clean_env():
            with patch.dict(
                "os.environ",
                {
                    "JIRA_URL": "https://test.atlassian.net",
                    "JIRA_USERNAME": "[email protected]",
                    "JIRA_API_TOKEN": "api-token",
                },
            ):
                config = JiraConfig.from_env()
                assert config.auth_type == "basic"

    def test_cloud_vs_server_authentication(self):
        """Test authentication differences between cloud and server instances."""
        # Cloud instance (default)
        with MockEnvironment.clean_env():
            with patch.dict(
                "os.environ",
                {
                    "JIRA_URL": "https://example.atlassian.net",
                    "JIRA_USERNAME": "[email protected]",
                    "JIRA_API_TOKEN": "api-token",
                },
            ):
                config = JiraConfig.from_env()
                assert config.is_cloud is True

        # Server instance
        with MockEnvironment.clean_env():
            with patch.dict(
                "os.environ",
                {
                    "JIRA_URL": "https://jira.company.com",
                    "JIRA_USERNAME": "[email protected]",
                    "JIRA_API_TOKEN": "api-token",
                },
            ):
                config = JiraConfig.from_env()
                assert config.is_cloud is False


@pytest.mark.integration
class TestJiraConfluenceAuthFlows:
    """Test authentication flows for both Jira and Confluence services."""

    @patch("mcp_atlassian.confluence.client.Confluence")
    @patch("mcp_atlassian.jira.client.Jira")
    def test_shared_oauth_config_both_services(
        self, mock_jira_class, mock_confluence_class
    ):
        """Test that both services can share the same OAuth configuration."""
        with MockEnvironment.oauth_env():
            # Mock cloud ID retrieval
            with patch("requests.get") as mock_get:
                mock_response = Mock()
                mock_response.ok = True
                mock_response.json.return_value = [{"id": "test-cloud-id"}]
                mock_get.return_value = mock_response

                # Create OAuth config
                oauth_config = OAuthConfig.from_env()
                oauth_config.access_token = "shared-token"
                oauth_config.cloud_id = "test-cloud-id"

                # Create both clients with same OAuth config
                jira_config = JiraConfig(
                    url="https://test.atlassian.net",
                    auth_type="oauth",
                    oauth_config=oauth_config,
                )
                confluence_config = ConfluenceConfig(
                    url="https://test.atlassian.net/wiki",
                    auth_type="oauth",
                    oauth_config=oauth_config,
                )

                # Initialize clients
                jira_client = JiraClient(jira_config)
                confluence_client = ConfluenceClient(confluence_config)

                # Verify both were initialized with OAuth URLs
                jira_url = mock_jira_class.call_args[1]["url"]
                confluence_url = mock_confluence_class.call_args[1]["url"]

                assert jira_url == "https://api.atlassian.com/ex/jira/test-cloud-id"
                assert (
                    confluence_url
                    == "https://api.atlassian.com/ex/confluence/test-cloud-id"
                )

    def test_service_specific_auth_override(self):
        """Test that service-specific auth overrides shared configuration."""
        with MockEnvironment.clean_env():
            # Mock token loading to return empty (no stored tokens)
            with patch(
                "mcp_atlassian.utils.oauth.OAuthConfig.load_tokens", return_value={}
            ):
                # Test case 1: Only service-specific auth (no OAuth config)
                with patch.dict(
                    "os.environ",
                    {
                        # Jira-specific PAT for server
                        "JIRA_URL": "https://jira.company.com",
                        "JIRA_PERSONAL_TOKEN": "jira-pat-token",
                        # Confluence basic auth
                        "CONFLUENCE_URL": "https://confluence.atlassian.net",
                        "CONFLUENCE_USERNAME": "[email protected]",
                        "CONFLUENCE_API_TOKEN": "conf-api-token",
                    },
                ):
                    # Jira uses PAT (server instance)
                    jira_config = JiraConfig.from_env()
                    assert jira_config.auth_type == "pat"

                    # Confluence uses basic auth (cloud instance)
                    confluence_config = ConfluenceConfig.from_env()
                    assert confluence_config.auth_type == "basic"

                # Test case 2: OAuth takes precedence when fully configured
                with patch.dict(
                    "os.environ",
                    {
                        # Shared OAuth config
                        "ATLASSIAN_OAUTH_CLIENT_ID": "shared-client",
                        "ATLASSIAN_OAUTH_CLIENT_SECRET": "shared-secret",
                        "ATLASSIAN_OAUTH_REDIRECT_URI": "http://localhost:8080",
                        "ATLASSIAN_OAUTH_SCOPE": "read:jira read:confluence",
                        "ATLASSIAN_OAUTH_CLOUD_ID": "test-cloud-id",
                        # Service-specific auth also present
                        "JIRA_URL": "https://jira.company.com",
                        "JIRA_USERNAME": "[email protected]",
                        "JIRA_API_TOKEN": "jira-token",
                        "CONFLUENCE_URL": "https://confluence.atlassian.net",
                    },
                ):
                    # OAuth takes precedence for both when cloud_id is present
                    jira_config = JiraConfig.from_env()
                    assert jira_config.auth_type == "oauth"

                    confluence_config = ConfluenceConfig.from_env()
                    assert confluence_config.auth_type == "oauth"

    def test_ssl_and_proxy_with_authentication(self):
        """Test SSL verification and proxy settings work with authentication."""
        with MockEnvironment.clean_env():
            with patch.dict(
                "os.environ",
                {
                    "JIRA_URL": "https://test.atlassian.net",
                    "JIRA_USERNAME": "[email protected]",
                    "JIRA_API_TOKEN": "api-token",
                    "JIRA_SSL_VERIFY": "false",
                    "HTTPS_PROXY": "http://proxy.company.com:8080",
                },
            ):
                config = JiraConfig.from_env()
                assert config.ssl_verify is False
                assert config.https_proxy == "http://proxy.company.com:8080"

                with patch("mcp_atlassian.jira.client.Jira") as mock_jira:
                    client = JiraClient(config)
                    # Verify SSL verification was disabled
                    mock_jira.assert_called_with(
                        url="https://test.atlassian.net",
                        username="[email protected]",
                        password="api-token",
                        cloud=True,
                        verify_ssl=False,
                    )

```
Page 6/10FirstPrevNextLast