#
tokens: 46803/50000 14/194 files (page 4/10)
lines: off (toggle) GitHub
raw markdown copy
This is page 4 of 10. Use http://codebase.md/sooperset/mcp-atlassian?page={x} to view the full context.

# Directory Structure

```
├── .devcontainer
│   ├── devcontainer.json
│   ├── Dockerfile
│   ├── post-create.sh
│   └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   └── feature_request.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── docker-publish.yml
│       ├── lint.yml
│       ├── publish.yml
│       ├── stale.yml
│       └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│   ├── oauth_authorize.py
│   └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│   └── mcp_atlassian
│       ├── __init__.py
│       ├── confluence
│       │   ├── __init__.py
│       │   ├── client.py
│       │   ├── comments.py
│       │   ├── config.py
│       │   ├── constants.py
│       │   ├── labels.py
│       │   ├── pages.py
│       │   ├── search.py
│       │   ├── spaces.py
│       │   ├── users.py
│       │   ├── utils.py
│       │   └── v2_adapter.py
│       ├── exceptions.py
│       ├── jira
│       │   ├── __init__.py
│       │   ├── attachments.py
│       │   ├── boards.py
│       │   ├── client.py
│       │   ├── comments.py
│       │   ├── config.py
│       │   ├── constants.py
│       │   ├── epics.py
│       │   ├── fields.py
│       │   ├── formatting.py
│       │   ├── issues.py
│       │   ├── links.py
│       │   ├── projects.py
│       │   ├── protocols.py
│       │   ├── search.py
│       │   ├── sprints.py
│       │   ├── transitions.py
│       │   ├── users.py
│       │   └── worklog.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── confluence
│       │   │   ├── __init__.py
│       │   │   ├── comment.py
│       │   │   ├── common.py
│       │   │   ├── label.py
│       │   │   ├── page.py
│       │   │   ├── search.py
│       │   │   ├── space.py
│       │   │   └── user_search.py
│       │   ├── constants.py
│       │   └── jira
│       │       ├── __init__.py
│       │       ├── agile.py
│       │       ├── comment.py
│       │       ├── common.py
│       │       ├── issue.py
│       │       ├── link.py
│       │       ├── project.py
│       │       ├── search.py
│       │       ├── version.py
│       │       ├── workflow.py
│       │       └── worklog.py
│       ├── preprocessing
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── confluence.py
│       │   └── jira.py
│       ├── servers
│       │   ├── __init__.py
│       │   ├── confluence.py
│       │   ├── context.py
│       │   ├── dependencies.py
│       │   ├── jira.py
│       │   └── main.py
│       └── utils
│           ├── __init__.py
│           ├── date.py
│           ├── decorators.py
│           ├── env.py
│           ├── environment.py
│           ├── io.py
│           ├── lifecycle.py
│           ├── logging.py
│           ├── oauth_setup.py
│           ├── oauth.py
│           ├── ssl.py
│           ├── tools.py
│           └── urls.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── fixtures
│   │   ├── __init__.py
│   │   ├── confluence_mocks.py
│   │   └── jira_mocks.py
│   ├── integration
│   │   ├── conftest.py
│   │   ├── README.md
│   │   ├── test_authentication.py
│   │   ├── test_content_processing.py
│   │   ├── test_cross_service.py
│   │   ├── test_mcp_protocol.py
│   │   ├── test_proxy.py
│   │   ├── test_real_api.py
│   │   ├── test_ssl_verification.py
│   │   ├── test_stdin_monitoring_fix.py
│   │   └── test_transport_lifecycle.py
│   ├── README.md
│   ├── test_preprocessing.py
│   ├── test_real_api_validation.py
│   ├── unit
│   │   ├── confluence
│   │   │   ├── __init__.py
│   │   │   ├── conftest.py
│   │   │   ├── test_client_oauth.py
│   │   │   ├── test_client.py
│   │   │   ├── test_comments.py
│   │   │   ├── test_config.py
│   │   │   ├── test_constants.py
│   │   │   ├── test_custom_headers.py
│   │   │   ├── test_labels.py
│   │   │   ├── test_pages.py
│   │   │   ├── test_search.py
│   │   │   ├── test_spaces.py
│   │   │   ├── test_users.py
│   │   │   ├── test_utils.py
│   │   │   └── test_v2_adapter.py
│   │   ├── jira
│   │   │   ├── conftest.py
│   │   │   ├── test_attachments.py
│   │   │   ├── test_boards.py
│   │   │   ├── test_client_oauth.py
│   │   │   ├── test_client.py
│   │   │   ├── test_comments.py
│   │   │   ├── test_config.py
│   │   │   ├── test_constants.py
│   │   │   ├── test_custom_headers.py
│   │   │   ├── test_epics.py
│   │   │   ├── test_fields.py
│   │   │   ├── test_formatting.py
│   │   │   ├── test_issues_markdown.py
│   │   │   ├── test_issues.py
│   │   │   ├── test_links.py
│   │   │   ├── test_projects.py
│   │   │   ├── test_protocols.py
│   │   │   ├── test_search.py
│   │   │   ├── test_sprints.py
│   │   │   ├── test_transitions.py
│   │   │   ├── test_users.py
│   │   │   └── test_worklog.py
│   │   ├── models
│   │   │   ├── __init__.py
│   │   │   ├── conftest.py
│   │   │   ├── test_base_models.py
│   │   │   ├── test_confluence_models.py
│   │   │   ├── test_constants.py
│   │   │   └── test_jira_models.py
│   │   ├── servers
│   │   │   ├── __init__.py
│   │   │   ├── test_confluence_server.py
│   │   │   ├── test_context.py
│   │   │   ├── test_dependencies.py
│   │   │   ├── test_jira_server.py
│   │   │   └── test_main_server.py
│   │   ├── test_exceptions.py
│   │   ├── test_main_transport_selection.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── test_custom_headers.py
│   │       ├── test_date.py
│   │       ├── test_decorators.py
│   │       ├── test_env.py
│   │       ├── test_environment.py
│   │       ├── test_io.py
│   │       ├── test_lifecycle.py
│   │       ├── test_logging.py
│   │       ├── test_masking.py
│   │       ├── test_oauth_setup.py
│   │       ├── test_oauth.py
│   │       ├── test_ssl.py
│   │       ├── test_tools.py
│   │       └── test_urls.py
│   └── utils
│       ├── __init__.py
│       ├── assertions.py
│       ├── base.py
│       ├── factories.py
│       └── mocks.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/tests/unit/jira/test_worklog.py:
--------------------------------------------------------------------------------

```python
"""Tests for the Jira Worklog mixin."""

from unittest.mock import MagicMock

import pytest

from mcp_atlassian.jira.worklog import WorklogMixin


class TestWorklogMixin:
    """Tests for the WorklogMixin class."""

    # URL the stubbed ``jira.resource_url`` returns in the add_worklog tests.
    ISSUE_RESOURCE_URL = "https://jira.example.com/rest/api/2/issue"

    @pytest.fixture
    def worklog_mixin(self, jira_client):
        """Create a WorklogMixin instance with mocked dependencies."""
        mixin = WorklogMixin(config=jira_client.config)
        mixin.jira = jira_client.jira

        # Mock methods that are typically provided by other mixins
        mixin._clean_text = MagicMock(side_effect=lambda text: text if text else "")

        return mixin

    def _stub_add_worklog_api(self, mixin, response=None):
        """Prime the stubbed Jira client for an ``add_worklog`` call.

        Args:
            mixin: The WorklogMixin under test.
            response: Payload that ``jira.post`` should return. When omitted, a
                minimal one-hour worklog payload is used.
        """
        if response is None:
            response = {
                "id": "10001",
                "timeSpent": "1h",
                "timeSpentSeconds": 3600,
            }
        mixin.jira.post.return_value = response
        mixin.jira.resource_url.return_value = self.ISSUE_RESOURCE_URL

    def test_parse_time_spent_with_seconds(self, worklog_mixin):
        """Test parsing time spent with seconds specification."""
        assert worklog_mixin._parse_time_spent("60s") == 60
        assert worklog_mixin._parse_time_spent("3600s") == 3600

    def test_parse_time_spent_with_minutes(self, worklog_mixin):
        """Test parsing time spent with minutes."""
        assert worklog_mixin._parse_time_spent("1m") == 60
        assert worklog_mixin._parse_time_spent("30m") == 1800

    def test_parse_time_spent_with_hours(self, worklog_mixin):
        """Test parsing time spent with hours."""
        assert worklog_mixin._parse_time_spent("1h") == 3600
        assert worklog_mixin._parse_time_spent("2h") == 7200

    def test_parse_time_spent_with_days(self, worklog_mixin):
        """Test parsing time spent with days."""
        assert worklog_mixin._parse_time_spent("1d") == 86400
        assert worklog_mixin._parse_time_spent("2d") == 172800

    def test_parse_time_spent_with_weeks(self, worklog_mixin):
        """Test parsing time spent with weeks."""
        assert worklog_mixin._parse_time_spent("1w") == 604800
        assert worklog_mixin._parse_time_spent("2w") == 1209600

    def test_parse_time_spent_with_mixed_units(self, worklog_mixin):
        """Test parsing time spent with mixed units."""
        assert worklog_mixin._parse_time_spent("1h 30m") == 5400
        assert worklog_mixin._parse_time_spent("1d 6h") == 108000
        assert worklog_mixin._parse_time_spent("1w 2d 3h 4m") == 788640

    def test_parse_time_spent_with_invalid_input(self, worklog_mixin):
        """Test parsing time spent with invalid input."""
        # Should default to 60 seconds
        assert worklog_mixin._parse_time_spent("invalid") == 60

    def test_parse_time_spent_with_numeric_input(self, worklog_mixin):
        """Test parsing time spent with numeric input."""
        assert worklog_mixin._parse_time_spent("60") == 60
        assert worklog_mixin._parse_time_spent("3600") == 3600

    def test_get_worklogs_basic(self, worklog_mixin):
        """Test basic functionality of get_worklogs."""
        # Setup mock response
        mock_result = {
            "worklogs": [
                {
                    "id": "10001",
                    "comment": "Work item 1",
                    "created": "2024-01-01T10:00:00.000+0000",
                    "updated": "2024-01-01T10:30:00.000+0000",
                    "started": "2024-01-01T09:00:00.000+0000",
                    "timeSpent": "1h",
                    "timeSpentSeconds": 3600,
                    "author": {"displayName": "Test User"},
                }
            ]
        }
        worklog_mixin.jira.issue_get_worklog.return_value = mock_result

        # Call the method
        result = worklog_mixin.get_worklogs("TEST-123")

        # Verify
        worklog_mixin.jira.issue_get_worklog.assert_called_once_with("TEST-123")
        assert len(result) == 1
        assert result[0]["id"] == "10001"
        assert result[0]["comment"] == "Work item 1"
        assert result[0]["timeSpent"] == "1h"
        assert result[0]["timeSpentSeconds"] == 3600
        assert result[0]["author"] == "Test User"

    def test_get_worklogs_with_multiple_entries(self, worklog_mixin):
        """Test get_worklogs with multiple worklog entries."""
        # Setup mock response with multiple entries
        mock_result = {
            "worklogs": [
                {
                    "id": "10001",
                    "comment": "Work item 1",
                    "created": "2024-01-01T10:00:00.000+0000",
                    "timeSpent": "1h",
                    "timeSpentSeconds": 3600,
                    "author": {"displayName": "User 1"},
                },
                {
                    "id": "10002",
                    "comment": "Work item 2",
                    "created": "2024-01-02T10:00:00.000+0000",
                    "timeSpent": "2h",
                    "timeSpentSeconds": 7200,
                    "author": {"displayName": "User 2"},
                },
            ]
        }
        worklog_mixin.jira.issue_get_worklog.return_value = mock_result

        # Call the method
        result = worklog_mixin.get_worklogs("TEST-123")

        # Verify
        assert len(result) == 2
        assert result[0]["id"] == "10001"
        assert result[1]["id"] == "10002"
        assert result[0]["timeSpentSeconds"] == 3600
        assert result[1]["timeSpentSeconds"] == 7200

    def test_get_worklogs_with_missing_fields(self, worklog_mixin):
        """Test get_worklogs with missing fields."""
        # Setup mock response with missing fields
        mock_result = {
            "worklogs": [
                {
                    "id": "10001",
                    # Missing comment
                    "created": "2024-01-01T10:00:00.000+0000",
                    # Missing other fields
                }
            ]
        }
        worklog_mixin.jira.issue_get_worklog.return_value = mock_result

        # Call the method
        result = worklog_mixin.get_worklogs("TEST-123")

        # Verify the defaults substituted for the absent fields
        assert len(result) == 1
        assert result[0]["id"] == "10001"
        assert result[0]["comment"] == ""
        assert result[0]["timeSpent"] == ""
        assert result[0]["timeSpentSeconds"] == 0
        assert result[0]["author"] == "Unknown"

    def test_get_worklogs_with_empty_response(self, worklog_mixin):
        """Test get_worklogs with empty response."""
        # Setup mock response with no worklogs
        worklog_mixin.jira.issue_get_worklog.return_value = {}

        # Call the method
        result = worklog_mixin.get_worklogs("TEST-123")

        # Verify
        assert isinstance(result, list)
        assert len(result) == 0

    def test_get_worklogs_with_error(self, worklog_mixin):
        """Test get_worklogs error handling."""
        # Setup mock to raise exception
        worklog_mixin.jira.issue_get_worklog.side_effect = Exception(
            "Worklog fetch error"
        )

        # Call the method and verify exception
        with pytest.raises(
            Exception, match="Error getting worklogs: Worklog fetch error"
        ):
            worklog_mixin.get_worklogs("TEST-123")

    def test_add_worklog_basic(self, worklog_mixin):
        """Test basic functionality of add_worklog."""
        # Setup mock response with a fully populated worklog payload
        self._stub_add_worklog_api(
            worklog_mixin,
            {
                "id": "10001",
                "comment": "Added work",
                "created": "2024-01-01T10:00:00.000+0000",
                "updated": "2024-01-01T10:00:00.000+0000",
                "started": "2024-01-01T09:00:00.000+0000",
                "timeSpent": "1h",
                "timeSpentSeconds": 3600,
                "author": {"displayName": "Test User"},
            },
        )

        # Call the method
        result = worklog_mixin.add_worklog("TEST-123", "1h", comment="Added work")

        # Verify
        worklog_mixin.jira.resource_url.assert_called_once_with("issue")
        worklog_mixin.jira.post.assert_called_once()
        assert result["id"] == "10001"
        assert result["comment"] == "Added work"
        assert result["timeSpent"] == "1h"
        assert result["timeSpentSeconds"] == 3600
        assert result["author"] == "Test User"
        assert result["original_estimate_updated"] is False
        assert result["remaining_estimate_updated"] is False

    def test_add_worklog_with_original_estimate(self, worklog_mixin):
        """Test add_worklog with original estimate update."""
        self._stub_add_worklog_api(worklog_mixin)

        # Call the method
        result = worklog_mixin.add_worklog("TEST-123", "1h", original_estimate="4h")

        # Verify
        worklog_mixin.jira.edit_issue.assert_called_once_with(
            issue_id_or_key="TEST-123",
            fields={"timetracking": {"originalEstimate": "4h"}},
        )
        assert result["original_estimate_updated"] is True

    def test_add_worklog_with_remaining_estimate(self, worklog_mixin):
        """Test add_worklog with remaining estimate update."""
        self._stub_add_worklog_api(worklog_mixin)

        # Call the method
        result = worklog_mixin.add_worklog("TEST-123", "1h", remaining_estimate="3h")

        # Verify post call has correct parameters
        call_args = worklog_mixin.jira.post.call_args
        assert call_args is not None
        kwargs = call_args.kwargs

        # Check that adjustEstimate=new and newEstimate=3h are in params
        assert "params" in kwargs
        assert kwargs["params"]["adjustEstimate"] == "new"
        assert kwargs["params"]["newEstimate"] == "3h"

        assert result["remaining_estimate_updated"] is True

    def test_add_worklog_with_started_time(self, worklog_mixin):
        """Test add_worklog with started time."""
        self._stub_add_worklog_api(worklog_mixin)

        # Setup started time
        started_time = "2024-01-01T09:00:00.000+0000"

        # Call the method
        worklog_mixin.add_worklog("TEST-123", "1h", started=started_time)

        # Verify worklog data contains started time
        call_args = worklog_mixin.jira.post.call_args
        assert call_args is not None
        kwargs = call_args.kwargs

        assert "data" in kwargs
        assert kwargs["data"]["started"] == started_time

    def test_add_worklog_with_markdown_to_jira_available(self, worklog_mixin):
        """Test add_worklog with _markdown_to_jira conversion."""
        self._stub_add_worklog_api(worklog_mixin)

        # Add _markdown_to_jira method
        worklog_mixin._markdown_to_jira = MagicMock(return_value="Converted comment")

        # Call the method
        worklog_mixin.add_worklog("TEST-123", "1h", comment="**Markdown** comment")

        # Verify _markdown_to_jira was called
        worklog_mixin._markdown_to_jira.assert_called_once_with("**Markdown** comment")

        # Verify converted comment was used
        call_args = worklog_mixin.jira.post.call_args
        assert call_args is not None
        kwargs = call_args.kwargs

        assert "data" in kwargs
        assert kwargs["data"]["comment"] == "Converted comment"

    def test_add_worklog_with_error(self, worklog_mixin):
        """Test add_worklog error handling."""
        # Setup mock to raise exception
        worklog_mixin.jira.post.side_effect = Exception("Worklog add error")
        worklog_mixin.jira.resource_url.return_value = self.ISSUE_RESOURCE_URL

        # Call the method and verify exception
        with pytest.raises(Exception, match="Error adding worklog: Worklog add error"):
            worklog_mixin.add_worklog("TEST-123", "1h")

    def test_add_worklog_with_original_estimate_error(self, worklog_mixin):
        """Test add_worklog with original estimate update error."""
        self._stub_add_worklog_api(worklog_mixin)

        # Make edit_issue raise an exception
        worklog_mixin.jira.edit_issue.side_effect = Exception("Estimate update error")

        # Call the method - should continue despite estimate update error
        result = worklog_mixin.add_worklog("TEST-123", "1h", original_estimate="4h")

        # Verify post was still called (worklog added despite estimate error)
        worklog_mixin.jira.post.assert_called_once()
        assert result["original_estimate_updated"] is False
```

--------------------------------------------------------------------------------
/src/mcp_atlassian/__init__.py:
--------------------------------------------------------------------------------

```python
import asyncio
import logging
import os
import sys
from importlib.metadata import PackageNotFoundError, version

import click
from dotenv import load_dotenv

from mcp_atlassian.utils.env import is_env_truthy
from mcp_atlassian.utils.lifecycle import (
    ensure_clean_exit,
    setup_signal_handlers,
)
from mcp_atlassian.utils.logging import setup_logging

# Resolve the installed distribution's version; fall back to a placeholder
# when the package metadata is unavailable (e.g. running from a checkout).
try:
    __version__ = version("mcp-atlassian")
except PackageNotFoundError:
    __version__ = "0.0.0"

# Import-time logging defaults: DEBUG when MCP_VERBOSE is truthy, else WARNING.
# NOTE(review): main() maps MCP_VERBOSE to INFO instead - confirm which is intended.
logging_level = logging.DEBUG if is_env_truthy("MCP_VERBOSE") else logging.WARNING

# Logs go to stderr unless MCP_LOGGING_STDOUT is truthy.
if is_env_truthy("MCP_LOGGING_STDOUT"):
    logging_stream = sys.stdout
else:
    logging_stream = sys.stderr

# Module-level logger; main() rebinds it once CLI verbosity is known.
logger = setup_logging(logging_level, logging_stream)


@click.version_option(__version__, prog_name="mcp-atlassian")
@click.command()
@click.option(
    "-v",
    "--verbose",
    count=True,
    help="Increase verbosity (can be used multiple times)",
)
@click.option(
    "--env-file", type=click.Path(exists=True, dir_okay=False), help="Path to .env file"
)
@click.option(
    "--oauth-setup",
    is_flag=True,
    help="Run OAuth 2.0 setup wizard for Atlassian Cloud",
)
@click.option(
    "--transport",
    type=click.Choice(["stdio", "sse", "streamable-http"]),
    default="stdio",
    help="Transport type (stdio, sse, or streamable-http)",
)
@click.option(
    "--port",
    default=8000,
    help="Port to listen on for SSE or Streamable HTTP transport",
)
@click.option(
    "--host",
    default="0.0.0.0",  # noqa: S104
    help="Host to bind to for SSE or Streamable HTTP transport (default: 0.0.0.0)",
)
@click.option(
    "--path",
    default="/mcp",
    help="Path for Streamable HTTP transport (e.g., /mcp).",
)
@click.option(
    "--confluence-url",
    help="Confluence URL (e.g., https://your-domain.atlassian.net/wiki)",
)
@click.option("--confluence-username", help="Confluence username/email")
@click.option("--confluence-token", help="Confluence API token")
@click.option(
    "--confluence-personal-token",
    help="Confluence Personal Access Token (for Confluence Server/Data Center)",
)
@click.option(
    "--confluence-ssl-verify/--no-confluence-ssl-verify",
    default=True,
    help="Verify SSL certificates for Confluence Server/Data Center (default: verify)",
)
@click.option(
    "--confluence-spaces-filter",
    help="Comma-separated list of Confluence space keys to filter search results",
)
@click.option(
    "--jira-url",
    help="Jira URL (e.g., https://your-domain.atlassian.net or https://jira.your-company.com)",
)
@click.option("--jira-username", help="Jira username/email (for Jira Cloud)")
@click.option("--jira-token", help="Jira API token (for Jira Cloud)")
@click.option(
    "--jira-personal-token",
    help="Jira Personal Access Token (for Jira Server/Data Center)",
)
@click.option(
    "--jira-ssl-verify/--no-jira-ssl-verify",
    default=True,
    help="Verify SSL certificates for Jira Server/Data Center (default: verify)",
)
@click.option(
    "--jira-projects-filter",
    help="Comma-separated list of Jira project keys to filter search results",
)
@click.option(
    "--read-only",
    is_flag=True,
    help="Run in read-only mode (disables all write operations)",
)
@click.option(
    "--enabled-tools",
    help="Comma-separated list of tools to enable (enables all if not specified)",
)
@click.option(
    "--oauth-client-id",
    help="OAuth 2.0 client ID for Atlassian Cloud",
)
@click.option(
    "--oauth-client-secret",
    help="OAuth 2.0 client secret for Atlassian Cloud",
)
@click.option(
    "--oauth-redirect-uri",
    help="OAuth 2.0 redirect URI for Atlassian Cloud",
)
@click.option(
    "--oauth-scope",
    help="OAuth 2.0 scopes (space-separated) for Atlassian Cloud",
)
@click.option(
    "--oauth-cloud-id",
    help="Atlassian Cloud ID for OAuth 2.0 authentication",
)
@click.option(
    "--oauth-access-token",
    help="Atlassian Cloud OAuth 2.0 access token (if you have your own you'd like to "
    "use for the session.)",
)
def main(
    verbose: int,
    env_file: str | None,
    oauth_setup: bool,
    transport: str,
    port: int,
    host: str,
    path: str | None,
    confluence_url: str | None,
    confluence_username: str | None,
    confluence_token: str | None,
    confluence_personal_token: str | None,
    confluence_ssl_verify: bool,
    confluence_spaces_filter: str | None,
    jira_url: str | None,
    jira_username: str | None,
    jira_token: str | None,
    jira_personal_token: str | None,
    jira_ssl_verify: bool,
    jira_projects_filter: str | None,
    read_only: bool,
    enabled_tools: str | None,
    oauth_client_id: str | None,
    oauth_client_secret: str | None,
    oauth_redirect_uri: str | None,
    oauth_scope: str | None,
    oauth_cloud_id: str | None,
    oauth_access_token: str | None,
) -> None:
    """MCP Atlassian Server - Jira and Confluence functionality for MCP

    Supports both Atlassian Cloud and Jira Server/Data Center deployments.
    Authentication methods supported:
    - Username and API token (Cloud)
    - Personal Access Token (Server/Data Center)
    - OAuth 2.0 (Cloud only)
    """
    # NOTE: the docstring above doubles as the click --help text, so
    # implementation notes live in comments instead.
    #
    # Flow: configure logging -> load .env -> optional OAuth wizard ->
    # resolve transport/port/host/path (CLI beats env beats default) ->
    # export explicitly passed CLI options as env vars for downstream
    # config -> run the MCP server until it stops or a signal arrives.

    # Logging level: -v / -vv win; otherwise fall back to the env toggles
    # (MCP_VERY_VERBOSE -> DEBUG, MCP_VERBOSE -> INFO), else WARNING.
    if verbose == 1:
        current_logging_level = logging.INFO
    elif verbose >= 2:  # -vv or more
        current_logging_level = logging.DEBUG
    elif is_env_truthy("MCP_VERY_VERBOSE", "false"):
        current_logging_level = logging.DEBUG
    elif is_env_truthy("MCP_VERBOSE", "false"):
        current_logging_level = logging.INFO
    else:
        current_logging_level = logging.WARNING

    # Set up logging to STDOUT if MCP_LOGGING_STDOUT is set to true
    logging_stream = sys.stdout if is_env_truthy("MCP_LOGGING_STDOUT") else sys.stderr

    # Rebind the module-level logger so the CLI-selected level takes effect.
    global logger
    logger = setup_logging(current_logging_level, logging_stream)
    logger.debug(f"Logging level set to: {logging.getLevelName(current_logging_level)}")
    logger.debug(
        f"Logging stream set to: {'stdout' if logging_stream is sys.stdout else 'stderr'}"
    )

    def was_option_provided(ctx: click.Context, param_name: str) -> bool:
        # True unless the value came from the option's default or a default map.
        return ctx.get_parameter_source(param_name) not in (
            click.core.ParameterSource.DEFAULT_MAP,
            click.core.ParameterSource.DEFAULT,
        )

    if env_file:
        logger.debug(f"Loading environment from file: {env_file}")
        load_dotenv(env_file, override=True)
    else:
        logger.debug(
            "Attempting to load environment from default .env file if it exists"
        )
        load_dotenv(override=True)

    if oauth_setup:
        logger.info("Starting OAuth 2.0 setup wizard")
        try:
            from .utils.oauth_setup import run_oauth_setup

            sys.exit(run_oauth_setup())
        except ImportError:
            logger.error("Failed to import OAuth setup module.")
            sys.exit(1)

    click_ctx = click.get_current_context(silent=True)

    def cli_provided(param_name: str) -> bool:
        # Without an active click context nothing can have come from the CLI.
        return bool(click_ctx) and was_option_provided(click_ctx, param_name)

    # Transport precedence
    final_transport = os.getenv("TRANSPORT", "stdio").lower()
    if cli_provided("transport"):
        final_transport = transport
    if final_transport not in ["stdio", "sse", "streamable-http"]:
        logger.warning(
            f"Invalid transport '{final_transport}' from env/default, using 'stdio'."
        )
        final_transport = "stdio"
    logger.debug(f"Final transport determined: {final_transport}")

    # Port precedence (read PORT once; ignore non-numeric values)
    final_port = 8000
    env_port = os.getenv("PORT")
    if env_port and env_port.isdigit():
        final_port = int(env_port)
    if cli_provided("port"):
        final_port = port
    logger.debug(f"Final port for HTTP transports: {final_port}")

    # Host precedence
    final_host = os.getenv("HOST", "0.0.0.0")  # noqa: S104
    if cli_provided("host"):
        final_host = host
    logger.debug(f"Final host for HTTP transports: {final_host}")

    # Path precedence
    final_path: str | None = os.getenv("STREAMABLE_HTTP_PATH", None)
    if cli_provided("path"):
        final_path = path
    logger.debug(
        f"Final path for Streamable HTTP: {final_path if final_path else 'FastMCP default'}"
    )

    # Set env vars for downstream config: (click param name, env var, value).
    string_overrides: tuple[tuple[str, str, str | None], ...] = (
        ("enabled_tools", "ENABLED_TOOLS", enabled_tools),
        ("confluence_url", "CONFLUENCE_URL", confluence_url),
        ("confluence_username", "CONFLUENCE_USERNAME", confluence_username),
        ("confluence_token", "CONFLUENCE_API_TOKEN", confluence_token),
        (
            "confluence_personal_token",
            "CONFLUENCE_PERSONAL_TOKEN",
            confluence_personal_token,
        ),
        ("jira_url", "JIRA_URL", jira_url),
        ("jira_username", "JIRA_USERNAME", jira_username),
        ("jira_token", "JIRA_API_TOKEN", jira_token),
        ("jira_personal_token", "JIRA_PERSONAL_TOKEN", jira_personal_token),
        ("oauth_client_id", "ATLASSIAN_OAUTH_CLIENT_ID", oauth_client_id),
        ("oauth_client_secret", "ATLASSIAN_OAUTH_CLIENT_SECRET", oauth_client_secret),
        ("oauth_redirect_uri", "ATLASSIAN_OAUTH_REDIRECT_URI", oauth_redirect_uri),
        ("oauth_scope", "ATLASSIAN_OAUTH_SCOPE", oauth_scope),
        ("oauth_cloud_id", "ATLASSIAN_OAUTH_CLOUD_ID", oauth_cloud_id),
        ("oauth_access_token", "ATLASSIAN_OAUTH_ACCESS_TOKEN", oauth_access_token),
        (
            "confluence_spaces_filter",
            "CONFLUENCE_SPACES_FILTER",
            confluence_spaces_filter,
        ),
        ("jira_projects_filter", "JIRA_PROJECTS_FILTER", jira_projects_filter),
    )
    for param_name, env_name, value in string_overrides:
        # os.environ only accepts strings; skip a missing value rather than
        # crash with TypeError.
        if value is not None and cli_provided(param_name):
            os.environ[env_name] = value

    # Boolean flags are exported as lowercase "true"/"false".
    flag_overrides: tuple[tuple[str, str, bool], ...] = (
        ("read_only", "READ_ONLY_MODE", read_only),
        ("confluence_ssl_verify", "CONFLUENCE_SSL_VERIFY", confluence_ssl_verify),
        ("jira_ssl_verify", "JIRA_SSL_VERIFY", jira_ssl_verify),
    )
    for param_name, env_name, flag in flag_overrides:
        if cli_provided(param_name):
            os.environ[env_name] = str(flag).lower()

    # Imported here, after the environment has been populated above.
    from mcp_atlassian.servers import main_mcp

    run_kwargs: dict[str, object] = {
        "transport": final_transport,
    }

    if final_transport == "stdio":
        logger.info("Starting server with STDIO transport.")
    else:
        # final_transport was validated above, so this is "sse" or
        # "streamable-http".
        run_kwargs["host"] = final_host
        run_kwargs["port"] = final_port
        run_kwargs["log_level"] = logging.getLevelName(current_logging_level).lower()

        if final_path is not None:
            run_kwargs["path"] = final_path

        # Only for the log line: fall back to the server's configured path.
        log_display_path = final_path
        if log_display_path is None:
            if final_transport == "sse":
                log_display_path = main_mcp.settings.sse_path or "/sse"
            else:
                log_display_path = main_mcp.settings.streamable_http_path or "/mcp"

        logger.info(
            f"Starting server with {final_transport.upper()} transport on http://{final_host}:{final_port}{log_display_path}"
        )

    # Set up signal handlers for graceful shutdown
    setup_signal_handlers()

    if final_transport == "stdio":
        # NOTE(review): no stdin monitoring is actually installed here; the
        # message is kept verbatim in case log consumers/tests rely on it.
        logger.debug("STDIO transport detected, setting up stdin monitoring")

    try:
        logger.debug("Starting asyncio event loop...")

        # Neither transport monitors stdin here: for stdio the MCP server
        # reads stdin itself (a second reader would race with it); for HTTP
        # transports a closed stdin must not trigger shutdown - only OS
        # signals do.
        if final_transport != "stdio":
            logger.debug(
                f"Running server for {final_transport} transport without stdin monitoring"
            )
        asyncio.run(main_mcp.run_async(**run_kwargs))
    except (KeyboardInterrupt, SystemExit) as e:
        logger.info(f"Server shutdown initiated: {type(e).__name__}")
    except Exception as e:
        logger.error(f"Server encountered an error: {e}", exc_info=True)
        sys.exit(1)
    finally:
        ensure_clean_exit()


# Names exported by the package: the click entry point and the version string.
__all__ = ["main", "__version__"]

# Invoke the click command when this module is executed as a script.
if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/tests/unit/jira/test_formatting.py:
--------------------------------------------------------------------------------

```python
"""Tests for the Jira FormattingMixin."""

from unittest.mock import MagicMock, patch

import pytest

from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.formatting import FormattingMixin
from mcp_atlassian.preprocessing import JiraPreprocessor


@pytest.fixture
def formatting_mixin(jira_fetcher: JiraFetcher) -> FormattingMixin:
    """Provide the shared ``jira_fetcher`` with its preprocessor mocked out.

    The fetcher instance itself is reused as the object under test; only its
    ``preprocessor`` attribute is replaced by a ``MagicMock`` specced against
    ``JiraPreprocessor`` so conversion calls can be stubbed and asserted on.
    """
    jira_fetcher.preprocessor = MagicMock(spec=JiraPreprocessor)
    return jira_fetcher


def test_markdown_to_jira(formatting_mixin):
    """Test that markdown_to_jira returns the preprocessor's converted text."""
    source_text = "# Markdown text"
    converter = formatting_mixin.preprocessor.markdown_to_jira
    converter.return_value = "Converted text"

    assert formatting_mixin.markdown_to_jira(source_text) == "Converted text"
    converter.assert_called_once_with(source_text)


def test_markdown_to_jira_empty_input(formatting_mixin):
    """Test that an empty string yields "" and never reaches the preprocessor."""
    converted = formatting_mixin.markdown_to_jira("")

    formatting_mixin.preprocessor.markdown_to_jira.assert_not_called()
    assert converted == ""


def test_markdown_to_jira_exception(formatting_mixin):
    """Test that a raising preprocessor makes markdown_to_jira return its input."""
    original_text = "# Markdown text"
    converter = formatting_mixin.preprocessor.markdown_to_jira
    converter.side_effect = Exception("Conversion error")

    # The untouched input is expected back when conversion fails.
    assert formatting_mixin.markdown_to_jira(original_text) == original_text
    converter.assert_called_once()


def test_format_issue_content_basic(formatting_mixin):
    """Test format_issue_content on an issue with no comments and no epic."""
    fields = {
        "summary": "Test issue",
        "issuetype": {"name": "Bug"},
        "status": {"name": "Open"},
    }

    rendered = formatting_mixin.format_issue_content(
        "TEST-123",
        {"fields": fields},
        "This is a test issue.",
        [],
        "2023-01-01 12:00:00",
        {"epic_key": None, "epic_name": None},
    )

    # Every basic piece of information must show up in the rendered text.
    expected_fragments = (
        "Issue: TEST-123",
        "Title: Test issue",
        "Type: Bug",
        "Status: Open",
        "Created: 2023-01-01 12:00:00",
        "Description:",
        "This is a test issue.",
    )
    for fragment in expected_fragments:
        assert fragment in rendered

    # No comments were supplied, so no comments section may appear.
    assert "Comments:" not in rendered


def test_format_issue_content_with_epic(formatting_mixin):
    """Test that format_issue_content renders the epic key and name."""
    fields = {
        "summary": "Test issue",
        "issuetype": {"name": "Bug"},
        "status": {"name": "Open"},
    }
    epic = {"epic_key": "EPIC-1", "epic_name": "Test Epic"}

    rendered = formatting_mixin.format_issue_content(
        "TEST-123",
        {"fields": fields},
        "This is a test issue.",
        [],
        "2023-01-01 12:00:00",
        epic,
    )

    # The epic line combines key and name.
    assert "Epic: EPIC-1 - Test Epic" in rendered


def test_format_issue_content_with_comments(formatting_mixin):
    """Test that format_issue_content renders a comments section."""
    fields = {
        "summary": "Test issue",
        "issuetype": {"name": "Bug"},
        "status": {"name": "Open"},
    }
    first_comment = {"created": "2023-01-02", "author": "User1", "body": "Comment 1"}
    second_comment = {"created": "2023-01-03", "author": "User2", "body": "Comment 2"}

    rendered = formatting_mixin.format_issue_content(
        "TEST-123",
        {"fields": fields},
        "This is a test issue.",
        [first_comment, second_comment],
        "2023-01-01 12:00:00",
        {"epic_key": None, "epic_name": None},
    )

    # The section header plus one line per comment must be present.
    for fragment in (
        "Comments:",
        "2023-01-02 - User1: Comment 1",
        "2023-01-03 - User2: Comment 2",
    ):
        assert fragment in rendered


def test_create_issue_metadata_basic(formatting_mixin):
    """Test create_issue_metadata on an issue with no comments and no epic."""
    fields = {
        "summary": "Test issue",
        "issuetype": {"name": "Bug"},
        "status": {"name": "Open"},
        "project": {"key": "TEST", "name": "Test Project"},
    }

    metadata = formatting_mixin.create_issue_metadata(
        "TEST-123",
        {"fields": fields},
        [],
        "2023-01-01 12:00:00",
        {"epic_key": None, "epic_name": None},
    )

    # Each basic metadata entry must carry the expected value.
    expected_entries = {
        "key": "TEST-123",
        "summary": "Test issue",
        "type": "Bug",
        "status": "Open",
        "created": "2023-01-01 12:00:00",
        "source": "jira",
        "project": "TEST",
        "project_name": "Test Project",
        "comment_count": 0,
    }
    for entry, value in expected_entries.items():
        assert metadata[entry] == value

    # Epic keys are omitted entirely when no epic is attached.
    for absent in ("epic_key", "epic_name"):
        assert absent not in metadata


def test_create_issue_metadata_with_assignee_and_reporter(formatting_mixin):
    """Test that create_issue_metadata reports assignee/reporter display names."""
    people = {
        "assignee": {"displayName": "John Doe", "name": "jdoe"},
        "reporter": {"displayName": "Jane Smith", "name": "jsmith"},
    }
    issue = {
        "fields": {
            "summary": "Test issue",
            "issuetype": {"name": "Bug"},
            "status": {"name": "Open"},
            **people,
            "project": {"key": "TEST", "name": "Test Project"},
        }
    }

    metadata = formatting_mixin.create_issue_metadata(
        "TEST-123",
        issue,
        [],
        "2023-01-01 12:00:00",
        {"epic_key": None, "epic_name": None},
    )

    # Display names, not login names, are expected in the metadata.
    assert metadata["assignee"] == "John Doe"
    assert metadata["reporter"] == "Jane Smith"


def test_create_issue_metadata_with_priority(formatting_mixin):
    """Test that create_issue_metadata reports the priority name."""
    fields = {
        "summary": "Test issue",
        "issuetype": {"name": "Bug"},
        "status": {"name": "Open"},
        "priority": {"name": "High"},
        "project": {"key": "TEST", "name": "Test Project"},
    }

    metadata = formatting_mixin.create_issue_metadata(
        "TEST-123",
        {"fields": fields},
        [],
        "2023-01-01 12:00:00",
        {"epic_key": None, "epic_name": None},
    )

    # The priority entry holds the name from the issue's priority field.
    assert metadata["priority"] == "High"


def test_create_issue_metadata_with_epic(formatting_mixin):
    """Test that create_issue_metadata copies epic key and name into the result."""
    fields = {
        "summary": "Test issue",
        "issuetype": {"name": "Bug"},
        "status": {"name": "Open"},
        "project": {"key": "TEST", "name": "Test Project"},
    }
    epic = {"epic_key": "EPIC-1", "epic_name": "Test Epic"}

    metadata = formatting_mixin.create_issue_metadata(
        "TEST-123", {"fields": fields}, [], "2023-01-01 12:00:00", epic
    )

    # Both epic entries must be carried over unchanged.
    assert metadata["epic_key"] == "EPIC-1"
    assert metadata["epic_name"] == "Test Epic"


def test_extract_epic_information_no_fields(formatting_mixin):
    """Test extract_epic_information on an issue dict that has no fields."""
    epic_info = formatting_mixin.extract_epic_information({})

    assert epic_info == {"epic_key": None, "epic_name": None}


def test_extract_epic_information_with_field_ids(formatting_mixin):
    """Test extract_epic_information when epic field IDs can be resolved."""
    # Stub the field-ID lookup and the follow-up fetch of the epic issue.
    formatting_mixin.get_field_ids_to_epic = MagicMock(
        return_value={
            "Epic Link": "customfield_10001",
            "Epic Name": "customfield_10002",
        }
    )
    formatting_mixin.get_issue = MagicMock(
        return_value={"fields": {"customfield_10002": "Test Epic"}}
    )

    epic_info = formatting_mixin.extract_epic_information(
        {"fields": {"customfield_10001": "EPIC-1"}}
    )

    assert epic_info == {"epic_key": "EPIC-1", "epic_name": "Test Epic"}
    formatting_mixin.get_field_ids_to_epic.assert_called_once()
    formatting_mixin.get_issue.assert_called_once_with("EPIC-1")


def test_extract_epic_information_get_issue_exception(formatting_mixin):
    """Test extract_epic_information when fetching the epic issue raises."""
    # The field-ID lookup succeeds, but fetching the epic itself fails.
    formatting_mixin.get_field_ids_to_epic = MagicMock(
        return_value={
            "Epic Link": "customfield_10001",
            "Epic Name": "customfield_10002",
        }
    )
    formatting_mixin.get_issue = MagicMock(side_effect=Exception("API error"))

    epic_info = formatting_mixin.extract_epic_information(
        {"fields": {"customfield_10001": "EPIC-1"}}
    )

    # The key is still reported; only the name is missing.
    assert epic_info == {"epic_key": "EPIC-1", "epic_name": None}
    formatting_mixin.get_field_ids_to_epic.assert_called_once()
    formatting_mixin.get_issue.assert_called_once()


def test_sanitize_html_valid(formatting_mixin):
    """Test that sanitize_html strips tags from well-formed HTML."""
    sanitized = formatting_mixin.sanitize_html("<p>This is <b>bold</b> text.</p>")

    assert sanitized == "This is bold text."


def test_sanitize_html_with_entities(formatting_mixin):
    """Test that sanitize_html decodes HTML entities."""
    sanitized = formatting_mixin.sanitize_html("<p>This &amp; that</p>")

    assert sanitized == "This & that"


def test_sanitize_html_empty(formatting_mixin):
    """Test that sanitize_html maps an empty string to an empty string."""
    sanitized = formatting_mixin.sanitize_html("")

    assert sanitized == ""


def test_sanitize_html_exception(formatting_mixin):
    """Test that sanitize_html returns its input when ``re.sub`` raises."""
    raw_html = "<p>Test</p>"

    # Force every re.sub call made during sanitizing to fail.
    with patch("re.sub", side_effect=Exception("Regex error")):
        sanitized = formatting_mixin.sanitize_html(raw_html)

        # The original markup is expected back on error.
        assert sanitized == raw_html


def test_sanitize_transition_fields_basic(formatting_mixin):
    """Test that plain string fields pass through sanitize_transition_fields."""
    payload = {"summary": "Test issue", "description": "This is a test issue."}

    sanitized = formatting_mixin.sanitize_transition_fields(payload)

    assert sanitized == payload


def test_sanitize_transition_fields_with_assignee(formatting_mixin):
    """Test that a string assignee is resolved to an ``accountId`` mapping."""
    # Stub the account lookup so no real user resolution happens.
    account_lookup = MagicMock(return_value="account-123")
    formatting_mixin._get_account_id = account_lookup

    sanitized = formatting_mixin.sanitize_transition_fields(
        {"summary": "Test issue", "assignee": "jdoe"}
    )

    assert sanitized["summary"] == "Test issue"
    assert sanitized["assignee"] == {"accountId": "account-123"}
    account_lookup.assert_called_once_with("jdoe")


def test_sanitize_transition_fields_with_assignee_dict(formatting_mixin):
    """Test that an assignee already given as a dict is kept without a lookup."""
    # The lookup is stubbed only so the test can assert it is never used.
    account_lookup = MagicMock()
    formatting_mixin._get_account_id = account_lookup

    sanitized = formatting_mixin.sanitize_transition_fields(
        {"summary": "Test issue", "assignee": {"accountId": "account-123"}}
    )

    assert sanitized["summary"] == "Test issue"
    assert sanitized["assignee"] == {"accountId": "account-123"}
    account_lookup.assert_not_called()


def test_sanitize_transition_fields_with_reporter(formatting_mixin):
    """Test that a string reporter is resolved to an ``accountId`` mapping."""
    # Stub the account lookup so no real user resolution happens.
    account_lookup = MagicMock(return_value="account-456")
    formatting_mixin._get_account_id = account_lookup

    sanitized = formatting_mixin.sanitize_transition_fields(
        {"summary": "Test issue", "reporter": "jsmith"}
    )

    assert sanitized["summary"] == "Test issue"
    assert sanitized["reporter"] == {"accountId": "account-456"}
    account_lookup.assert_called_once_with("jsmith")


def test_sanitize_transition_fields_with_none_value(formatting_mixin):
    """Test that fields whose value is None are dropped from the result."""
    sanitized = formatting_mixin.sanitize_transition_fields(
        {"summary": "Test issue", "assignee": None}
    )

    assert sanitized == {"summary": "Test issue"}


def test_add_comment_to_transition_data_with_comment(formatting_mixin):
    """Test that a comment is converted and attached to the transition data."""
    # Replace the converter so the attached body is recognisable.
    converter = MagicMock(return_value="Converted comment")
    formatting_mixin.markdown_to_jira = converter

    enriched = formatting_mixin.add_comment_to_transition_data(
        {"transition": {"id": "10"}}, "This is a comment"
    )

    assert enriched["transition"] == {"id": "10"}
    assert enriched["update"]["comment"][0]["add"]["body"] == "Converted comment"
    converter.assert_called_once_with("This is a comment")


def test_add_comment_to_transition_data_without_comment(formatting_mixin):
    """Test that passing ``None`` as the comment leaves the data unchanged."""
    payload = {"transition": {"id": "10"}}

    enriched = formatting_mixin.add_comment_to_transition_data(payload, None)

    # The returned data must equal what was passed in.
    assert enriched == payload

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/servers/main.py:
--------------------------------------------------------------------------------

```python
"""Main FastMCP server setup for Atlassian integration."""

import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any, Literal, Optional

from cachetools import TTLCache
from fastmcp import FastMCP
from fastmcp.tools import Tool as FastMCPTool
from mcp.types import Tool as MCPTool
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import JSONResponse

from mcp_atlassian.confluence import ConfluenceFetcher
from mcp_atlassian.confluence.config import ConfluenceConfig
from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.config import JiraConfig
from mcp_atlassian.utils.environment import get_available_services
from mcp_atlassian.utils.io import is_read_only_mode
from mcp_atlassian.utils.logging import mask_sensitive
from mcp_atlassian.utils.tools import get_enabled_tools, should_include_tool

from .confluence import confluence_mcp
from .context import MainAppContext
from .jira import jira_mcp

logger = logging.getLogger("mcp-atlassian.server.main")


async def health_check(request: Request) -> JSONResponse:
    """Return a static ``{"status": "ok"}`` JSON response; ``request`` is unused."""
    payload = {"status": "ok"}
    return JSONResponse(payload)


@asynccontextmanager
async def main_lifespan(app: FastMCP[MainAppContext]) -> AsyncIterator[dict]:
    """Build the shared application context and yield it for the server's lifetime.

    For each service reported by ``get_available_services`` the matching config
    is loaded from the environment; it is kept only when it loads without error
    and ``is_auth_configured()`` is true. Load failures are logged, not raised.

    Args:
        app: The FastMCP server this lifespan belongs to (not used in the body).

    Yields:
        A dict with the single key ``"app_lifespan_context"`` holding the
        ``MainAppContext`` built from the loaded configs, the read-only flag
        and the enabled-tools filter.

    Raises:
        Exception: Any exception thrown into the generator while suspended at
            the ``yield`` is logged and re-raised.
    """
    logger.info("Main Atlassian MCP server lifespan starting...")
    available = get_available_services()
    read_only_flag = is_read_only_mode()
    tool_filter = get_enabled_tools()

    jira_cfg: JiraConfig | None = None
    confluence_cfg: ConfluenceConfig | None = None

    if available.get("jira"):
        try:
            jira_candidate = JiraConfig.from_env()
            if not jira_candidate.is_auth_configured():
                logger.warning(
                    "Jira URL found, but authentication is not fully configured. Jira tools will be unavailable."
                )
            else:
                jira_cfg = jira_candidate
                logger.info(
                    "Jira configuration loaded and authentication is configured."
                )
        except Exception as e:
            logger.error(f"Failed to load Jira configuration: {e}", exc_info=True)

    if available.get("confluence"):
        try:
            confluence_candidate = ConfluenceConfig.from_env()
            if not confluence_candidate.is_auth_configured():
                logger.warning(
                    "Confluence URL found, but authentication is not fully configured. Confluence tools will be unavailable."
                )
            else:
                confluence_cfg = confluence_candidate
                logger.info(
                    "Confluence configuration loaded and authentication is configured."
                )
        except Exception as e:
            logger.error(f"Failed to load Confluence configuration: {e}", exc_info=True)

    context = MainAppContext(
        full_jira_config=jira_cfg,
        full_confluence_config=confluence_cfg,
        read_only=read_only_flag,
        enabled_tools=tool_filter,
    )
    mode_label = "ENABLED" if read_only_flag else "DISABLED"
    logger.info(f"Read-only mode: {mode_label}")
    logger.info(f"Enabled tools filter: {tool_filter or 'All tools enabled'}")

    try:
        yield {"app_lifespan_context": context}
    except Exception as e:
        logger.error(f"Error during lifespan: {e}", exc_info=True)
        raise
    finally:
        logger.info("Main Atlassian MCP server lifespan shutting down...")
        # Cleanup currently only logs; nothing is actually closed here.
        try:
            if jira_cfg:
                logger.debug("Cleaning up Jira resources...")
            if confluence_cfg:
                logger.debug("Cleaning up Confluence resources...")
        except Exception as e:
            logger.error(f"Error during cleanup: {e}", exc_info=True)
        logger.info("Main Atlassian MCP server lifespan shutdown complete.")


class AtlassianMCP(FastMCP[MainAppContext]):
    """FastMCP subclass that filters the listed tools and installs ``UserTokenMiddleware``."""

    async def _mcp_list_tools(self) -> list[MCPTool]:
        """Return the MCP tools that pass every filter for the current request.

        A tool is left out when any of the following holds:

        * ``should_include_tool`` rejects its registered name for the
          enabled-tools filter;
        * read-only mode is on and the tool is tagged ``"write"``;
        * it is tagged ``"jira"`` / ``"confluence"`` and the matching config is
          missing from the application context, or no application context is
          available at all.

        Returns:
            The remaining tools converted via ``to_mcp_tool``; an empty list
            when the request context or its lifespan context is ``None``.
        """
        # NOTE(review): the log messages below say `_main_mcp_list_tools`,
        # which differs from this method's name.
        request_ctx = self._mcp_server.request_context
        if request_ctx is None or request_ctx.lifespan_context is None:
            logger.warning(
                "Lifespan context not available during _main_mcp_list_tools call."
            )
            return []

        lifespan_data = request_ctx.lifespan_context
        app_state: MainAppContext | None = None
        if isinstance(lifespan_data, dict):
            app_state = lifespan_data.get("app_lifespan_context")

        # Without an application context: not read-only, no tool filter.
        read_only = False
        enabled_filter = None
        if app_state:
            read_only = getattr(app_state, "read_only", False)
            enabled_filter = getattr(app_state, "enabled_tools", None)
        logger.debug(
            f"_main_mcp_list_tools: read_only={read_only}, enabled_tools_filter={enabled_filter}"
        )

        registry: dict[str, FastMCPTool] = await self.get_tools()
        logger.debug(
            f"Aggregated {len(registry)} tools before filtering: {list(registry.keys())}"
        )

        kept: list[MCPTool] = []
        for tool_name, tool in registry.items():
            tags = tool.tags

            if not should_include_tool(tool_name, enabled_filter):
                logger.debug(f"Excluding tool '{tool_name}' (not enabled)")
                continue

            if tool and read_only and "write" in tags:
                logger.debug(
                    f"Excluding tool '{tool_name}' due to read-only mode and 'write' tag"
                )
                continue

            # Service tools need their service's config to be present.
            for_jira = "jira" in tags
            for_confluence = "confluence" in tags
            blocked = False
            if app_state:
                # Both checks always run, so a tool carrying both tags logs twice.
                if for_jira and not app_state.full_jira_config:
                    logger.debug(
                        f"Excluding Jira tool '{tool_name}' as Jira configuration/authentication is incomplete."
                    )
                    blocked = True
                if for_confluence and not app_state.full_confluence_config:
                    logger.debug(
                        f"Excluding Confluence tool '{tool_name}' as Confluence configuration/authentication is incomplete."
                    )
                    blocked = True
            elif for_jira or for_confluence:
                logger.warning(
                    f"Excluding tool '{tool_name}' as application context is unavailable to verify service configuration."
                )
                blocked = True

            if blocked:
                continue

            kept.append(tool.to_mcp_tool(name=tool_name))

        logger.debug(
            f"_main_mcp_list_tools: Total tools after filtering: {len(kept)}"
        )
        return kept

    def http_app(
        self,
        path: str | None = None,
        middleware: list[Middleware] | None = None,
        transport: Literal["streamable-http", "sse"] = "streamable-http",
    ) -> "Starlette":
        """Build the HTTP app with ``UserTokenMiddleware`` first in the middleware list.

        Args:
            path: Forwarded unchanged to the parent ``http_app``.
            middleware: Extra middleware appended after ``UserTokenMiddleware``.
            transport: Forwarded unchanged to the parent ``http_app``.

        Returns:
            The application returned by the parent class.
        """
        stack = [Middleware(UserTokenMiddleware, mcp_server_ref=self)]
        if middleware:
            stack.extend(middleware)
        return super().http_app(path=path, middleware=stack, transport=transport)


# Module-level cache holding at most 100 entries, each expiring after ttl=300
# (seconds with cachetools' default timer). Keys are ints; values are
# (bool, str | None, JiraFetcher | None, ConfluenceFetcher | None) tuples.
# NOTE(review): presumably a token-validation result keyed by a token hash —
# nothing in the visible part of this module reads or writes it; confirm
# against the code that uses it.
token_validation_cache: TTLCache[
    int, tuple[bool, str | None, JiraFetcher | None, ConfluenceFetcher | None]
] = TTLCache(maxsize=100, ttl=300)


class UserTokenMiddleware(BaseHTTPMiddleware):
    """Middleware that copies per-request Atlassian credentials onto ``request.state``.

    Only ``POST`` requests whose path equals the server's
    ``streamable_http_path`` (trailing slashes ignored) are inspected; every
    other request is passed straight to the next handler.
    """

    def __init__(
        self, app: Any, mcp_server_ref: Optional["AtlassianMCP"] = None
    ) -> None:
        """Store the server reference and warn when none is supplied."""
        super().__init__(app)
        self.mcp_server_ref = mcp_server_ref
        if self.mcp_server_ref:
            return
        logger.warning(
            "UserTokenMiddleware initialized without mcp_server_ref. Path matching for MCP endpoint might fail if settings are needed."
        )

    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> JSONResponse:
        """Extract credentials for MCP POST requests, then call the next handler.

        Returns:
            A 401 ``JSONResponse`` when the Authorization header is unusable,
            otherwise whatever ``call_next`` returns.
        """
        logger.debug(
            f"UserTokenMiddleware.dispatch: ENTERED for request path='{request.url.path}', method='{request.method}'"
        )
        server = self.mcp_server_ref
        if server is None:
            logger.debug(
                "UserTokenMiddleware.dispatch: self.mcp_server_ref is None. Skipping MCP auth logic."
            )
            return await call_next(request)

        mcp_path = server.settings.streamable_http_path.rstrip("/")
        request_path = request.url.path.rstrip("/")
        logger.debug(
            f"UserTokenMiddleware.dispatch: Comparing request_path='{request_path}' with mcp_path='{mcp_path}'. Request method='{request.method}'"
        )
        if request_path == mcp_path and request.method == "POST":
            rejection = self._capture_credentials(request)
            if rejection is not None:
                return rejection

        response = await call_next(request)
        logger.debug(
            f"UserTokenMiddleware.dispatch: EXITED for request path='{request.url.path}'"
        )
        return response

    def _capture_credentials(self, request: Request) -> JSONResponse | None:
        """Read the auth and cloud-id headers and record them on ``request.state``.

        Returns:
            ``None`` when the request may proceed (credentials stored, or no
            Authorization header at all); a 401 ``JSONResponse`` for an empty
            token or an unsupported Authorization scheme.
        """
        auth_header = request.headers.get("Authorization")
        cloud_id_header = request.headers.get("X-Atlassian-Cloud-Id")

        # Only the part after the scheme is masked for the "parsed token" log field.
        if auth_header and " " in auth_header:
            loggable = auth_header.split(" ", 1)[1].strip()
        else:
            loggable = auth_header
        token_for_log = mask_sensitive(loggable)
        logger.debug(
            f"UserTokenMiddleware: Path='{request.url.path}', AuthHeader='{mask_sensitive(auth_header)}', ParsedToken(masked)='{token_for_log}', CloudId='{cloud_id_header}'"
        )

        # A blank or missing cloud-id header is stored as None.
        cloud_id = cloud_id_header.strip() if cloud_id_header else ""
        if cloud_id:
            request.state.user_atlassian_cloud_id = cloud_id
            logger.debug(
                f"UserTokenMiddleware: Extracted cloudId from header: {cloud_id}"
            )
        else:
            request.state.user_atlassian_cloud_id = None
            logger.debug(
                "UserTokenMiddleware: No cloudId header provided, will use global config"
            )

        # The session id is only logged for debugging; it is not stored.
        mcp_session_id = request.headers.get("mcp-session-id")
        if mcp_session_id:
            logger.debug(
                f"UserTokenMiddleware: MCP-Session-ID header found: {mcp_session_id}"
            )

        if not auth_header:
            logger.debug(
                f"No Authorization header provided for {request.url.path}. Will proceed with global/fallback server configuration if applicable."
            )
            return None

        if auth_header.startswith("Bearer "):
            token = auth_header.split(" ", 1)[1].strip()
            if not token:
                return JSONResponse(
                    {"error": "Unauthorized: Empty Bearer token"},
                    status_code=401,
                )
            logger.debug(
                f"UserTokenMiddleware.dispatch: Bearer token extracted (masked): ...{mask_sensitive(token, 8)}"
            )
            request.state.user_atlassian_token = token
            request.state.user_atlassian_auth_type = "oauth"
            request.state.user_atlassian_email = None
            logger.debug(
                f"UserTokenMiddleware.dispatch: Set request.state (pre-validation): "
                f"auth_type='{getattr(request.state, 'user_atlassian_auth_type', 'N/A')}', "
                f"token_present={bool(getattr(request.state, 'user_atlassian_token', None))}"
            )
            return None

        if auth_header.startswith("Token "):
            token = auth_header.split(" ", 1)[1].strip()
            if not token:
                return JSONResponse(
                    {"error": "Unauthorized: Empty Token (PAT)"},
                    status_code=401,
                )
            logger.debug(
                f"UserTokenMiddleware.dispatch: PAT (Token scheme) extracted (masked): ...{mask_sensitive(token, 8)}"
            )
            request.state.user_atlassian_token = token
            request.state.user_atlassian_auth_type = "pat"
            # PATs don't carry email in the token itself.
            request.state.user_atlassian_email = None
            logger.debug(
                "UserTokenMiddleware.dispatch: Set request.state for PAT auth."
            )
            return None

        # Any other non-empty Authorization header is rejected.
        scheme = auth_header.split(" ", 1)[0] if " " in auth_header else "UnknownType"
        logger.warning(
            f"Unsupported Authorization type for {request.url.path}: {scheme}"
        )
        return JSONResponse(
            {
                "error": "Unauthorized: Only 'Bearer <OAuthToken>' or 'Token <PAT>' types are supported."
            },
            status_code=401,
        )


# Module-level server instance using main_lifespan; the Jira and Confluence
# sub-servers are mounted on it under the names "jira" and "confluence".
main_mcp = AtlassianMCP(name="Atlassian MCP", lifespan=main_lifespan)
main_mcp.mount("jira", jira_mcp)
main_mcp.mount("confluence", confluence_mcp)


@main_mcp.custom_route("/healthz", methods=["GET"], include_in_schema=False)
async def _health_check_route(request: Request) -> JSONResponse:
    """Serve ``GET /healthz`` by delegating to ``health_check``."""
    response = await health_check(request)
    return response


# Emitted once at import time, after the /healthz route has been registered.
logger.info("Added /healthz endpoint for Kubernetes probes")

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/users.py:
--------------------------------------------------------------------------------

```python
"""Module for Jira user operations."""

import logging
import re
from typing import TYPE_CHECKING, TypeVar

import requests
from requests.exceptions import HTTPError

from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
from mcp_atlassian.models.jira.common import JiraUser

from .client import JiraClient

if TYPE_CHECKING:
    # NOTE(review): JiraUser is already imported unconditionally earlier in
    # this module, so this guarded import looks redundant — confirm before
    # removing.
    from mcp_atlassian.models.jira.common import JiraUser

# Type variable bounded by JiraUser (bound given as a forward-reference string).
JiraUserType = TypeVar("JiraUserType", bound="JiraUser")

# Logger for this module, registered under the name "mcp-jira".
logger = logging.getLogger("mcp-jira")


class UsersMixin(JiraClient):
    """Mixin for Jira user operations."""

    def get_current_user_account_id(self) -> str:
        """
        Return the identifier of the authenticated user, caching it on the instance.

        The value comes from ``self.jira.myself()``. The first string-valued
        field among ``accountId``, ``key`` and ``name`` (in that order) is used,
        so responses without ``accountId`` (Data Center/Server) still resolve.
        Once resolved, the value is stored in ``self._current_user_account_id``
        and returned directly on later calls.

        Returns:
            str: Account ID (or the ``key``/``name`` fallback) of the current user.

        Raises:
            Exception: If the lookup fails for any reason (HTTP error, unexpected
                response type, no usable field); the original error is chained.
        """
        cached_id = getattr(self, "_current_user_account_id", None)
        if cached_id is not None:
            return cached_id

        # Candidate fields in priority order, paired with the notice that is
        # logged when a fallback field (anything but accountId) is used.
        id_fields = (
            ("accountId", None),
            ("key", "Using 'key' instead of 'accountId' for Jira Data Center/Server"),
            (
                "name",
                "Using 'name' instead of 'accountId' for Jira Data Center/Server",
            ),
        )

        try:
            logger.debug(
                "Calling self.jira.myself() to get current user details for account ID."
            )
            profile = self.jira.myself()

            if not isinstance(profile, dict):
                error_msg = "Failed to get user data: response was not a dictionary."
                logger.error(
                    f"{error_msg} Response type: {type(profile)}, Response: {str(profile)[:200]}"
                )
                # Deliberately raised inside the try: the generic handler below
                # logs it and re-wraps it with the standard message.
                raise Exception(error_msg)

            logger.debug(f"Received myself_data: {str(profile)[:500]}")

            for field_name, fallback_notice in id_fields:
                candidate = profile.get(field_name)
                if isinstance(candidate, str):
                    if fallback_notice is not None:
                        logger.info(fallback_notice)
                    break
            else:
                # No field held a string value.
                raise ValueError(
                    f"Could not find accountId, key, or name in user data: {str(profile)[:200]}"
                )

            self._current_user_account_id = candidate
            return candidate
        except HTTPError as http_err:
            response_content = ""
            failed_response = http_err.response
            if failed_response is not None:
                try:
                    response_content = failed_response.text
                except Exception:
                    response_content = "(could not decode response content)"
            logger.error(
                f"HTTPError getting current user account ID: {http_err}. Response: {response_content[:500]}"
            )
            raise Exception(
                f"Unable to get current user account ID: {http_err}"
            ) from http_err
        except Exception as e:
            logger.error(f"Error getting current user account ID: {e}", exc_info=True)
            raise Exception(f"Unable to get current user account ID: {e}") from e

    def _get_account_id(self, assignee: str) -> str:
        """
        Resolve a username or account ID to the identifier Jira expects.

        A value that starts with ``"5"`` and is at least 10 characters long is
        assumed to already be an account ID and is returned unchanged. Otherwise
        a direct user search is tried first, then a permission-based search.

        NOTE(review): this shortcut heuristic differs from the account-ID
        patterns used in ``_determine_user_api_params`` — confirm it is intended.

        Args:
            assignee (str): Username or account ID.

        Returns:
            str: Account ID.

        Raises:
            ValueError: If neither lookup strategy finds the user.
        """
        already_account_id = assignee.startswith("5") and len(assignee) >= 10
        if already_account_id:
            return assignee

        # The second lookup runs only when the first yields nothing truthy.
        resolved = self._lookup_user_directly(
            assignee
        ) or self._lookup_user_by_permissions(assignee)
        if not resolved:
            raise ValueError(f"Could not find account ID for user: {assignee}")
        return resolved

    def _lookup_user_directly(self, username: str) -> str | None:
        """
        Look up a user identifier via Jira's user search and an exact match.

        The search result is accepted only if ``username`` equals (ignoring
        case) the user's ``displayName``, ``name`` or ``emailAddress``. On Cloud
        the ``accountId`` is returned; on Data Center/Server the ``name`` is
        returned, falling back to ``key``.

        Fields that are present but ``null`` (or otherwise not strings) are
        treated like missing fields. Previously such a value raised inside the
        comparison, which aborted the whole lookup via the broad ``except``.

        Args:
            username (str): Username, display name or e-mail address to look up.

        Returns:
            Optional[str]: Identifier if found, None otherwise (including when the
            search itself fails; failures are logged at info level).
        """
        try:
            # Cloud searches with a free-text "query"; Server/DC by "username".
            search_key = "query" if self.config.is_cloud else "username"
            response = self.jira.user_find_by_user_string(
                **{search_key: username}, start=0, limit=1
            )
            if not isinstance(response, list):
                msg = f"Unexpected return value type from `jira.user_find_by_user_string`: {type(response)}"
                logger.error(msg)
                return None

            wanted = username.lower()
            for user in response:
                candidates = (
                    user.get("displayName"),
                    user.get("name"),
                    user.get("emailAddress"),
                )
                # Non-string values (e.g. null) compare as the empty string,
                # exactly like a missing field did before.
                if not any(
                    (value if isinstance(value, str) else "").lower() == wanted
                    for value in candidates
                ):
                    continue

                if self.config.is_cloud:
                    if "accountId" in user:
                        return user["accountId"]
                else:
                    if "name" in user:
                        logger.info(
                            "Using 'name' for assignee field in Jira Data Center/Server"
                        )
                        return user["name"]
                    elif "key" in user:
                        logger.info(
                            "Using 'key' as fallback for assignee name in Jira Data Center/Server"
                        )
                        return user["key"]
            return None
        except Exception as e:
            # Best-effort lookup: callers fall back to other strategies on None.
            logger.info(f"Error looking up user directly: {str(e)}")
            return None

    def _lookup_user_by_permissions(self, username: str) -> str | None:
        """
        Look up a user identifier through the permission-search REST endpoint.

        Sends a GET to ``/rest/api/2/user/permission/search`` asking for users
        matching ``username`` that hold the ``BROWSE`` permission, and returns
        the identifier of the first entry under ``"users"`` that carries one
        (``accountId`` on Cloud; ``name`` then ``key`` on Data Center/Server).

        NOTE(review): the request is issued with ``requests.get`` directly and
        without a ``timeout`` argument — confirm this is acceptable.

        Args:
            username (str): Username to look up.

        Returns:
            Optional[str]: Identifier if found, None otherwise (including on any
            error or non-200 response; errors are logged at info level).
        """
        try:
            endpoint = f"{self.config.url}/rest/api/2/user/permission/search"
            query = {"query": username, "permissions": "BROWSE"}

            # PAT auth travels in a Bearer header; everything else uses basic auth.
            if self.config.auth_type == "pat":
                basic_auth = None
                extra_headers = {
                    "Authorization": f"Bearer {self.config.personal_token}"
                }
            else:
                basic_auth = (self.config.username or "", self.config.api_token or "")
                extra_headers = {}

            response = requests.get(
                endpoint,
                params=query,
                auth=basic_auth,
                headers=extra_headers,
                verify=self.config.ssl_verify,
            )

            if response.status_code != 200:
                return None

            payload = response.json()
            for user in payload.get("users", []):
                if self.config.is_cloud:
                    if "accountId" in user:
                        return user["accountId"]
                    continue

                if "name" in user:
                    logger.info(
                        "Using 'name' for assignee field in Jira Data Center/Server"
                    )
                    return user["name"]
                if "key" in user:
                    logger.info(
                        "Using 'key' as fallback for assignee name in Jira Data Center/Server"
                    )
                    return user["key"]
            return None
        except Exception as e:
            logger.info(f"Error looking up user by permissions: {str(e)}")
            return None

    def _determine_user_api_params(self, identifier: str) -> dict[str, str]:
        """
        Choose the keyword argument to pass to ``self.jira.user()``.

        Server/DC: identifiers containing ``@`` or plain names map to
        ``username``; identifiers containing ``-`` and a digit map to ``key``.
        Cloud: identifiers that look like an account ID are used as
        ``account_id``; e-mail addresses and any other value are first resolved
        to an account ID through the lookup helpers.

        Args:
            identifier (str): User identifier (accountId, username, key, or email).

        Returns:
            Dict[str, str]: A dictionary containing the single keyword argument for self.jira.user().

        Raises:
            ValueError: If a usable parameter cannot be determined.
        """

        def looks_like_account_id(value: str) -> bool:
            # 24 hex characters, or "<digits>:<word chars...>" (prefix match).
            return bool(
                re.match(r"^[0-9a-f]{24}$", value) or re.match(r"^\d+:\w+", value)
            )

        on_cloud = self.config.is_cloud
        api_kwargs: dict[str, str] = {}

        if not on_cloud:
            # Server/DC: username, key, or email
            if "@" in identifier:
                api_kwargs["username"] = identifier
                logger.debug(
                    f"Determined param: username='{identifier}' (Server/DC email - might not work)"
                )
            elif "-" in identifier and any(ch.isdigit() for ch in identifier):
                api_kwargs["key"] = identifier
                logger.debug(f"Determined param: key='{identifier}' (Server/DC)")
            else:
                api_kwargs["username"] = identifier
                logger.debug(f"Determined param: username='{identifier}' (Server/DC)")
        elif looks_like_account_id(identifier):
            # Cloud: identifier already is an accountId
            api_kwargs["account_id"] = identifier
            logger.debug(f"Determined param: account_id='{identifier}' (Cloud)")
        elif "@" in identifier:
            # Cloud: identifier is an email that must be resolved first
            try:
                resolved_id = self._lookup_user_directly(identifier)
                if not (resolved_id and looks_like_account_id(resolved_id)):
                    # Caught by the handler below, which logs and re-raises.
                    raise ValueError(
                        f"Could not resolve email '{identifier}' to a valid account ID for Jira Cloud."
                    )
                api_kwargs["account_id"] = resolved_id
                logger.debug(
                    f"Resolved email '{identifier}' to accountId '{resolved_id}'. Determined param: account_id (Cloud)"
                )
            except Exception as e:
                logger.warning(f"Failed to resolve email '{identifier}': {e}")
                raise ValueError(
                    f"Could not resolve email '{identifier}' to a valid account ID for Jira Cloud."
                ) from e
        else:
            # Cloud: neither accountId nor email, try the generic resolution
            logger.debug(
                f"Identifier '{identifier}' on Cloud is not an account ID or email. Attempting resolution."
            )
            try:
                account_id_resolved = self._get_account_id(identifier)
            except ValueError as e:
                logger.error(
                    f"Could not resolve identifier '{identifier}' to a usable format (accountId/username/key)."
                )
                raise ValueError(
                    f"Could not determine how to look up user '{identifier}'."
                ) from e
            api_kwargs["account_id"] = account_id_resolved
            logger.debug(
                f"Resolved identifier '{identifier}' to accountId '{account_id_resolved}'. Determined param: account_id (Cloud)"
            )

        if api_kwargs:
            return api_kwargs

        # Safety net in case a branch above ever leaves the dict empty.
        logger.error(
            f"Logic failed to determine API parameters for identifier '{identifier}'"
        )
        raise ValueError(
            f"Could not determine the correct parameter to use for identifier '{identifier}'."
        )

    def get_user_profile_by_identifier(self, identifier: str) -> "JiraUser":
        """
        Retrieve Jira user profile information by identifier.

        The identifier is first translated into the right keyword argument for
        ``self.jira.user()`` via ``_determine_user_api_params``; the response is
        then converted with ``JiraUser.from_api_response``.

        Args:
            identifier (str): User identifier (accountId, username, key, or email).

        Returns:
            JiraUser: JiraUser model with profile information.

        Raises:
            ValueError: If the identifier cannot be resolved, the API answers
                404, or the API returns something other than a dictionary.
            MCPAtlassianAuthenticationError: If the API answers 401 or 403.
            Exception: For other HTTP errors, network errors, or failures while
                calling the API / building the model.
        """
        api_kwargs = self._determine_user_api_params(identifier)

        try:
            logger.debug(f"Calling self.jira.user() with parameters: {api_kwargs}")
            user_data = self.jira.user(**api_kwargs)
            if isinstance(user_data, dict):
                return JiraUser.from_api_response(user_data)
        except HTTPError as http_err:
            if http_err.response is not None:
                response_text = http_err.response.text[:200]
                status_code = http_err.response.status_code
                if status_code == 404:
                    raise ValueError(f"User '{identifier}' not found.") from http_err
                elif status_code in [401, 403]:
                    logger.error(
                        f"Authentication/Permission error for '{identifier}': {status_code}"
                    )
                    raise MCPAtlassianAuthenticationError(
                        f"Permission denied accessing user '{identifier}'."
                    ) from http_err
                else:
                    logger.error(
                        f"HTTP error {status_code} for '{identifier}': {http_err}. Response: {response_text}"
                    )
                    raise Exception(
                        f"API error getting user profile for '{identifier}': {http_err}"
                    ) from http_err
            else:
                logger.error(
                    f"Network or unknown HTTP error (no response object) for '{identifier}': {http_err}"
                )
                raise Exception(
                    f"Network error getting user profile for '{identifier}': {http_err}"
                ) from http_err
        except Exception as e:
            logger.exception(
                f"Unexpected error getting/processing user profile for '{identifier}':"
            )
            raise Exception(
                f"Error processing user profile for '{identifier}': {str(e)}"
            ) from e

        # Reached only when the call succeeded but returned a non-dict payload
        # (every handler above raises). Raising here, outside the try, keeps the
        # documented ValueError from being re-wrapped as a generic Exception.
        logger.error(
            f"User lookup for '{identifier}' returned unexpected type: {type(user_data)}. Data: {user_data}"
        )
        raise ValueError(f"User '{identifier}' not found or lookup failed.")

```

--------------------------------------------------------------------------------
/tests/unit/utils/test_oauth_setup.py:
--------------------------------------------------------------------------------

```python
"""Tests for the OAuth setup utilities."""

import json
from unittest.mock import MagicMock, patch
from urllib.parse import parse_qs, urlparse

import pytest

from mcp_atlassian.utils.oauth_setup import (
    OAuthSetupArgs,
    parse_redirect_uri,
    run_oauth_flow,
    run_oauth_setup,
)
from tests.utils.assertions import assert_config_contains
from tests.utils.base import BaseAuthTest
from tests.utils.mocks import MockEnvironment, MockOAuthServer


class TestCallbackHandlerLogic:
    """Tests for URL parsing logic."""

    @pytest.mark.parametrize(
        "path,expected_params",
        [
            (
                "/callback?code=test-auth-code&state=test-state",
                {"code": ["test-auth-code"], "state": ["test-state"]},
            ),
            (
                "/callback?error=access_denied&error_description=User+denied+access",
                {
                    "error": ["access_denied"],
                    # "+" in a query string decodes to a space.
                    "error_description": ["User denied access"],
                },
            ),
            ("/callback?state=test-state", {"state": ["test-state"]}),
            ("/callback", {}),
        ],
    )
    def test_url_parsing(self, path, expected_params):
        """Test URL parsing for various callback scenarios.

        The parsed query must equal the expected mapping exactly, so unexpected
        extra parameters fail the test and the parameter-less ``/callback`` case
        actually asserts that nothing was parsed.
        """
        params = parse_qs(urlparse(path).query)

        assert params == expected_params


class TestRedirectUriParsing:
    """Checks that ``parse_redirect_uri`` extracts hostname and port."""

    @pytest.mark.parametrize(
        "redirect_uri,expected_hostname,expected_port",
        [
            # Explicit ports
            ("http://localhost:8080/callback", "localhost", 8080),
            ("https://example.com:9443/callback", "example.com", 9443),
            # Scheme default ports
            ("http://localhost/callback", "localhost", 80),
            ("https://example.com/callback", "example.com", 443),
            # IP address and non-"/callback" path
            ("http://127.0.0.1:3000/callback", "127.0.0.1", 3000),
            ("https://secure.domain.com:8443/auth", "secure.domain.com", 8443),
        ],
    )
    def test_parse_redirect_uri(self, redirect_uri, expected_hostname, expected_port):
        """Each URI yields the expected ``(hostname, port)`` pair."""
        actual_hostname, actual_port = parse_redirect_uri(redirect_uri)

        assert actual_hostname == expected_hostname
        assert actual_port == expected_port


class TestOAuthFlow:
    """Tests for OAuth flow orchestration."""

    @pytest.fixture(autouse=True)
    def reset_oauth_state(self):
        """Restore the module-level OAuth callback globals to their idle values."""
        import mcp_atlassian.utils.oauth_setup as oauth_module

        idle_state = {
            "authorization_code": None,
            "authorization_state": None,
            "callback_received": False,
            "callback_error": None,
        }
        for attr_name, idle_value in idle_state.items():
            setattr(oauth_module, attr_name, idle_value)

    def test_run_oauth_flow_success_localhost(self):
        """A localhost redirect URI starts the callback server and completes the flow."""
        import mcp_atlassian.utils.oauth_setup as oauth_module

        redirect_uri = "http://localhost:8080/callback"

        def simulate_callback():
            # Mimic what the callback handler stores once the browser returns.
            oauth_module.authorization_code = "test-auth-code"
            oauth_module.authorization_state = "test-state-token"
            return True

        fake_httpd = MagicMock()
        fake_config = MagicMock(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri=redirect_uri,
            scope="read:jira-work",
            cloud_id="test-cloud-id",
            access_token="test-access-token",
            refresh_token="test-refresh-token",
        )
        fake_config.exchange_code_for_tokens.return_value = True

        with (
            MockOAuthServer.mock_oauth_flow() as mocks,
            patch(
                "mcp_atlassian.utils.oauth_setup.OAuthConfig",
                return_value=fake_config,
            ),
            patch(
                "mcp_atlassian.utils.oauth_setup.wait_for_callback",
                side_effect=simulate_callback,
            ),
            patch(
                "mcp_atlassian.utils.oauth_setup.start_callback_server",
                return_value=fake_httpd,
            ) as mock_start_server,
        ):
            flow_args = OAuthSetupArgs(
                client_id="test-client-id",
                client_secret="test-client-secret",
                redirect_uri=redirect_uri,
                scope="read:jira-work",
            )

            result = run_oauth_flow(flow_args)

            assert result is True
            mock_start_server.assert_called_once_with(8080)
            mocks["browser"].assert_called_once()
            fake_config.exchange_code_for_tokens.assert_called_once_with(
                "test-auth-code"
            )
            fake_httpd.shutdown.assert_called_once()

    def test_run_oauth_flow_success_external_redirect(self):
        """An external redirect URI completes the flow without a local server."""
        import mcp_atlassian.utils.oauth_setup as oauth_module

        redirect_uri = "https://example.com/callback"

        def simulate_callback():
            # Mimic what the callback handler stores once the browser returns.
            oauth_module.authorization_code = "test-auth-code"
            oauth_module.authorization_state = "test-state-token"
            return True

        fake_config = MagicMock(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri=redirect_uri,
            scope="read:jira-work",
            cloud_id="test-cloud-id",
            access_token="test-access-token",
            refresh_token="test-refresh-token",
        )
        fake_config.exchange_code_for_tokens.return_value = True

        with (
            MockOAuthServer.mock_oauth_flow() as mocks,
            patch(
                "mcp_atlassian.utils.oauth_setup.OAuthConfig",
                return_value=fake_config,
            ),
            patch(
                "mcp_atlassian.utils.oauth_setup.wait_for_callback",
                side_effect=simulate_callback,
            ),
            patch(
                "mcp_atlassian.utils.oauth_setup.start_callback_server"
            ) as mock_start_server,
        ):
            flow_args = OAuthSetupArgs(
                client_id="test-client-id",
                client_secret="test-client-secret",
                redirect_uri=redirect_uri,
                scope="read:jira-work",
            )

            result = run_oauth_flow(flow_args)

            assert result is True
            # No local server for external redirect
            mock_start_server.assert_not_called()
            mocks["browser"].assert_called_once()
            fake_config.exchange_code_for_tokens.assert_called_once_with(
                "test-auth-code"
            )

    def test_run_oauth_flow_server_start_failure(self):
        """The flow reports failure and never opens a browser if the server cannot start."""
        with (
            MockOAuthServer.mock_oauth_flow() as mocks,
            patch(
                "mcp_atlassian.utils.oauth_setup.start_callback_server",
                side_effect=OSError("Port already in use"),
            ),
        ):
            flow_args = OAuthSetupArgs(
                client_id="test-client-id",
                client_secret="test-client-secret",
                redirect_uri="http://localhost:8080/callback",
                scope="read:jira-work",
            )

            outcome = run_oauth_flow(flow_args)

            assert outcome is False
            mocks["browser"].assert_not_called()

    @pytest.mark.parametrize(
        "failure_condition,expected_result",
        [
            ("timeout", False),
            ("state_mismatch", False),
            ("token_exchange_failure", False),
        ],
    )
    def test_run_oauth_flow_failures(self, failure_condition, expected_result):
        """Timeout, state mismatch and token-exchange failure all fail the flow.

        In every scenario the callback server must still be shut down.
        """
        import mcp_atlassian.utils.oauth_setup as oauth_module

        def callback_storing(state_value):
            """Build a ``wait_for_callback`` stand-in that records ``state_value``."""

            def fake_wait():
                oauth_module.authorization_code = "test-auth-code"
                oauth_module.authorization_state = state_value
                return True

            return fake_wait

        fake_httpd = MagicMock()
        fake_config = MagicMock()

        with (
            MockOAuthServer.mock_oauth_flow(),
            patch(
                "mcp_atlassian.utils.oauth_setup.OAuthConfig",
                return_value=fake_config,
            ),
            patch("mcp_atlassian.utils.oauth_setup.wait_for_callback") as mock_wait,
            patch(
                "mcp_atlassian.utils.oauth_setup.start_callback_server",
                return_value=fake_httpd,
            ),
        ):
            if failure_condition == "timeout":
                mock_wait.return_value = False
            elif failure_condition == "state_mismatch":
                mock_wait.side_effect = callback_storing("wrong-state")
            elif failure_condition == "token_exchange_failure":
                mock_wait.side_effect = callback_storing("test-state-token")
                fake_config.exchange_code_for_tokens.return_value = False

            flow_args = OAuthSetupArgs(
                client_id="test-client-id",
                client_secret="test-client-secret",
                redirect_uri="http://localhost:8080/callback",
                scope="read:jira-work",
            )

            outcome = run_oauth_flow(flow_args)

            assert outcome == expected_result
            fake_httpd.shutdown.assert_called_once()


class TestInteractiveSetup(BaseAuthTest):
    """Exercises ``run_oauth_setup``, the interactive OAuth setup wizard."""

    def test_run_oauth_setup_with_env_vars(self):
        """Blank answers fall back to the values provided by the environment."""
        blank_answers = [""] * 4

        with (
            MockEnvironment.oauth_env() as env_vars,
            patch("builtins.input", side_effect=blank_answers),
            patch(
                "mcp_atlassian.utils.oauth_setup.run_oauth_flow", return_value=True
            ) as mock_flow,
        ):
            exit_code = run_oauth_setup()

            assert exit_code == 0
            mock_flow.assert_called_once()
            # The wizard hands a single positional args object to the flow.
            passed_args = mock_flow.call_args[0][0]
            assert_config_contains(
                vars(passed_args),
                client_id=env_vars["ATLASSIAN_OAUTH_CLIENT_ID"],
                client_secret=env_vars["ATLASSIAN_OAUTH_CLIENT_SECRET"],
            )

    @pytest.mark.parametrize(
        "input_values,expected_result",
        [
            (
                [
                    "user-client-id",
                    "user-secret",
                    "http://localhost:9000/callback",
                    "read:jira-work",
                ],
                0,
            ),
            (["", "client-secret", "", ""], 1),  # Missing client ID
            (["client-id", "", "", ""], 1),  # Missing client secret
        ],
    )
    def test_run_oauth_setup_user_input(self, input_values, expected_result):
        """Typed answers drive the wizard; missing credentials abort before the flow."""
        with (
            MockEnvironment.clean_env(),
            patch("builtins.input", side_effect=input_values),
            patch(
                "mcp_atlassian.utils.oauth_setup.run_oauth_flow", return_value=True
            ) as mock_flow,
        ):
            exit_code = run_oauth_setup()
            assert exit_code == expected_result

            # The flow must run exactly when the wizard reports success.
            flow_expected = expected_result == 0
            if not flow_expected:
                mock_flow.assert_not_called()
            else:
                mock_flow.assert_called_once()

    def test_run_oauth_setup_flow_failure(self):
        """A failing OAuth flow makes the wizard return exit code 1."""
        answers = ["client-id", "client-secret", "", ""]

        with (
            MockEnvironment.clean_env(),
            patch("builtins.input", side_effect=answers),
            patch(
                "mcp_atlassian.utils.oauth_setup.run_oauth_flow", return_value=False
            ),
        ):
            exit_code = run_oauth_setup()
            assert exit_code == 1


class TestOAuthSetupArgs:
    """Checks the ``OAuthSetupArgs`` dataclass."""

    def test_oauth_setup_args_creation(self):
        """Every constructor argument is stored under the attribute of the same name."""
        field_values = {
            "client_id": "test-id",
            "client_secret": "test-secret",
            "redirect_uri": "http://localhost:8080/callback",
            "scope": "read:jira-work",
        }

        created = OAuthSetupArgs(**field_values)

        assert_config_contains(vars(created), **field_values)


class TestConfigurationGeneration:
    """Checks how a configuration mapping is rendered as JSON output."""

    def test_configuration_serialization(self):
        """A config dict survives a JSON dump/load round trip unchanged."""
        test_config = dict(
            client_id="test-id",
            client_secret="test-secret",
            redirect_uri="http://localhost:8080/callback",
            scope="read:jira-work",
            cloud_id="test-cloud-id",
        )

        serialized = json.dumps(test_config, indent=4)
        for expected_fragment in ("test-id", "test-cloud-id"):
            assert expected_fragment in serialized

        # Verify it can be parsed back
        round_tripped = json.loads(serialized)
        assert_config_contains(round_tripped, **test_config)

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/v2_adapter.py:
--------------------------------------------------------------------------------

```python
"""Confluence REST API v2 adapter for OAuth authentication.

This module provides direct v2 API calls to handle the deprecated v1 endpoints
when using OAuth authentication. The v1 endpoints have been removed for OAuth
but still work for API token authentication.
"""

import logging
from typing import Any

import requests
from requests.exceptions import HTTPError

# Module logger; the adapter's error paths report through it.
logger = logging.getLogger("mcp-atlassian")


class ConfluenceV2Adapter:
    """Adapter for Confluence REST API v2 operations when using OAuth."""

    def __init__(self, session: requests.Session, base_url: str) -> None:
        """Initialize the v2 adapter.

        Args:
            session: Authenticated requests session (OAuth configured)
            base_url: Base URL for the Confluence instance
        """
        self.session = session
        self.base_url = base_url

    def _get_space_id(self, space_key: str) -> str:
        """Get space ID from space key using v2 API.

        Args:
            space_key: The space key to look up

        Returns:
            The space ID

        Raises:
            ValueError: If space not found or API error
        """
        try:
            # Use v2 spaces endpoint to get space ID
            url = f"{self.base_url}/api/v2/spaces"
            params = {"keys": space_key}

            response = self.session.get(url, params=params)
            response.raise_for_status()

            data = response.json()
            results = data.get("results", [])

            if not results:
                raise ValueError(f"Space with key '{space_key}' not found")

            space_id = results[0].get("id")
            if not space_id:
                raise ValueError(f"No ID found for space '{space_key}'")

            return space_id

        except HTTPError as e:
            logger.error(f"HTTP error getting space ID for '{space_key}': {e}")
            raise ValueError(f"Failed to get space ID for '{space_key}': {e}") from e
        except Exception as e:
            logger.error(f"Error getting space ID for '{space_key}': {e}")
            raise ValueError(f"Failed to get space ID for '{space_key}': {e}") from e

    def create_page(
        self,
        space_key: str,
        title: str,
        body: str,
        parent_id: str | None = None,
        representation: str = "storage",
        status: str = "current",
    ) -> dict[str, Any]:
        """Create a page using the v2 API.

        Args:
            space_key: The key of the space to create the page in
            title: The title of the page
            body: The content body in the specified representation
            parent_id: Optional parent page ID
            representation: Content representation format (default: "storage")
            status: Page status (default: "current")

        Returns:
            The created page data from the API response

        Raises:
            ValueError: If page creation fails
        """
        try:
            # Get space ID from space key
            space_id = self._get_space_id(space_key)

            # Prepare request data for v2 API
            data = {
                "spaceId": space_id,
                "status": status,
                "title": title,
                "body": {
                    "representation": representation,
                    "value": body,
                },
            }

            # Add parent if specified
            if parent_id:
                data["parentId"] = parent_id

            # Make the v2 API call
            url = f"{self.base_url}/api/v2/pages"
            response = self.session.post(url, json=data)
            response.raise_for_status()

            result = response.json()
            logger.debug(f"Successfully created page '{title}' with v2 API")

            # Convert v2 response to v1-compatible format for consistency
            return self._convert_v2_to_v1_format(result, space_key)

        except HTTPError as e:
            logger.error(f"HTTP error creating page '{title}': {e}")
            if e.response is not None:
                logger.error(f"Response content: {e.response.text}")
            raise ValueError(f"Failed to create page '{title}': {e}") from e
        except Exception as e:
            logger.error(f"Error creating page '{title}': {e}")
            raise ValueError(f"Failed to create page '{title}': {e}") from e

    def _get_page_version(self, page_id: str) -> int:
        """Get the current version number of a page.

        Args:
            page_id: The ID of the page

        Returns:
            The current version number

        Raises:
            ValueError: If page not found or API error
        """
        try:
            url = f"{self.base_url}/api/v2/pages/{page_id}"
            params = {"body-format": "storage"}

            response = self.session.get(url, params=params)
            response.raise_for_status()

            data = response.json()
            version_number = data.get("version", {}).get("number")

            if version_number is None:
                raise ValueError(f"No version number found for page '{page_id}'")

            return version_number

        except HTTPError as e:
            logger.error(f"HTTP error getting page version for '{page_id}': {e}")
            raise ValueError(f"Failed to get page version for '{page_id}': {e}") from e
        except Exception as e:
            logger.error(f"Error getting page version for '{page_id}': {e}")
            raise ValueError(f"Failed to get page version for '{page_id}': {e}") from e

    def update_page(
        self,
        page_id: str,
        title: str,
        body: str,
        representation: str = "storage",
        version_comment: str = "",
        status: str = "current",
    ) -> dict[str, Any]:
        """Update a page using the v2 API.

        Args:
            page_id: The ID of the page to update
            title: The new title of the page
            body: The new content body in the specified representation
            representation: Content representation format (default: "storage")
            version_comment: Optional comment for this version
            status: Page status (default: "current")

        Returns:
            The updated page data from the API response

        Raises:
            ValueError: If page update fails
        """
        try:
            # Get current version and increment it
            current_version = self._get_page_version(page_id)
            new_version = current_version + 1

            # Prepare request data for v2 API
            data = {
                "id": page_id,
                "status": status,
                "title": title,
                "body": {
                    "representation": representation,
                    "value": body,
                },
                "version": {
                    "number": new_version,
                },
            }

            # Add version comment if provided
            if version_comment:
                data["version"]["message"] = version_comment

            # Make the v2 API call
            url = f"{self.base_url}/api/v2/pages/{page_id}"
            response = self.session.put(url, json=data)
            response.raise_for_status()

            result = response.json()
            logger.debug(f"Successfully updated page '{title}' with v2 API")

            # Convert v2 response to v1-compatible format for consistency
            # For update, we need to extract space key from the result
            space_id = result.get("spaceId")
            space_key = self._get_space_key_from_id(space_id) if space_id else "unknown"

            return self._convert_v2_to_v1_format(result, space_key)

        except HTTPError as e:
            logger.error(f"HTTP error updating page '{page_id}': {e}")
            if e.response is not None:
                logger.error(f"Response content: {e.response.text}")
            raise ValueError(f"Failed to update page '{page_id}': {e}") from e
        except Exception as e:
            logger.error(f"Error updating page '{page_id}': {e}")
            raise ValueError(f"Failed to update page '{page_id}': {e}") from e

    def _get_space_key_from_id(self, space_id: str) -> str:
        """Get space key from space ID using v2 API.

        Args:
            space_id: The space ID to look up

        Returns:
            The space key

        Raises:
            ValueError: If space not found or API error
        """
        try:
            # Use v2 spaces endpoint to get space key
            url = f"{self.base_url}/api/v2/spaces/{space_id}"

            response = self.session.get(url)
            response.raise_for_status()

            data = response.json()
            space_key = data.get("key")

            if not space_key:
                raise ValueError(f"No key found for space ID '{space_id}'")

            return space_key

        except HTTPError as e:
            logger.error(f"HTTP error getting space key for ID '{space_id}': {e}")
            # Return the space_id as fallback
            return space_id
        except Exception as e:
            logger.error(f"Error getting space key for ID '{space_id}': {e}")
            # Return the space_id as fallback
            return space_id

    def get_page(
        self,
        page_id: str,
        expand: str | None = None,
    ) -> dict[str, Any]:
        """Get a page using the v2 API.

        Args:
            page_id: The ID of the page to retrieve
            expand: Fields to expand in the response (not used in v2 API, for compatibility only)

        Returns:
            The page data from the API response in v1-compatible format

        Raises:
            ValueError: If page retrieval fails
        """
        try:
            # Make the v2 API call to get the page
            url = f"{self.base_url}/api/v2/pages/{page_id}"

            # Convert v1 expand parameters to v2 format
            params = {"body-format": "storage"}

            response = self.session.get(url, params=params)
            response.raise_for_status()

            v2_response = response.json()
            logger.debug(f"Successfully retrieved page '{page_id}' with v2 API")

            # Get space key from space ID
            space_id = v2_response.get("spaceId")
            space_key = self._get_space_key_from_id(space_id) if space_id else "unknown"

            # Convert v2 response to v1-compatible format
            v1_compatible = self._convert_v2_to_v1_format(v2_response, space_key)

            # Add body.storage structure if body content exists
            if "body" in v2_response and v2_response["body"].get("storage"):
                storage_value = v2_response["body"]["storage"].get("value", "")
                v1_compatible["body"] = {
                    "storage": {"value": storage_value, "representation": "storage"}
                }

            # Add space information with more details
            if space_id:
                v1_compatible["space"] = {
                    "key": space_key,
                    "id": space_id,
                }

            # Add version information
            if "version" in v2_response:
                v1_compatible["version"] = {
                    "number": v2_response["version"].get("number", 1)
                }

            return v1_compatible

        except HTTPError as e:
            logger.error(f"HTTP error getting page '{page_id}': {e}")
            if e.response is not None:
                logger.error(f"Response content: {e.response.text}")
            raise ValueError(f"Failed to get page '{page_id}': {e}") from e
        except Exception as e:
            logger.error(f"Error getting page '{page_id}': {e}")
            raise ValueError(f"Failed to get page '{page_id}': {e}") from e

    def delete_page(self, page_id: str) -> bool:
        """Delete a page using the v2 API.

        Args:
            page_id: The ID of the page to delete

        Returns:
            True if the page was successfully deleted, False otherwise

        Raises:
            ValueError: If page deletion fails
        """
        try:
            # Make the v2 API call to delete the page
            url = f"{self.base_url}/api/v2/pages/{page_id}"
            response = self.session.delete(url)
            response.raise_for_status()

            logger.debug(f"Successfully deleted page '{page_id}' with v2 API")

            # Check if status code indicates success (204 No Content is typical for deletes)
            if response.status_code in [200, 204]:
                return True

            # If we get here, it's an unexpected success status
            logger.warning(
                f"Delete page returned unexpected status {response.status_code}"
            )
            return True

        except HTTPError as e:
            logger.error(f"HTTP error deleting page '{page_id}': {e}")
            if e.response is not None:
                logger.error(f"Response content: {e.response.text}")
            raise ValueError(f"Failed to delete page '{page_id}': {e}") from e
        except Exception as e:
            logger.error(f"Error deleting page '{page_id}': {e}")
            raise ValueError(f"Failed to delete page '{page_id}': {e}") from e

    def _convert_v2_to_v1_format(
        self, v2_response: dict[str, Any], space_key: str
    ) -> dict[str, Any]:
        """Convert v2 API response to v1-compatible format.

        This ensures compatibility with existing code that expects v1 response format.

        Args:
            v2_response: The response from v2 API
            space_key: The space key (needed since v2 response uses space ID)

        Returns:
            Response formatted like v1 API for compatibility
        """
        # Map v2 response fields to v1 format
        v1_compatible = {
            "id": v2_response.get("id"),
            "type": "page",
            "status": v2_response.get("status"),
            "title": v2_response.get("title"),
            "space": {
                "key": space_key,
                "id": v2_response.get("spaceId"),
            },
            "version": {
                "number": v2_response.get("version", {}).get("number", 1),
            },
            "_links": v2_response.get("_links", {}),
        }

        # Add body if present in v2 response
        if "body" in v2_response:
            v1_compatible["body"] = {
                "storage": {
                    "value": v2_response["body"].get("storage", {}).get("value", ""),
                    "representation": "storage",
                }
            }

        return v1_compatible

```

--------------------------------------------------------------------------------
/tests/unit/jira/test_transitions.py:
--------------------------------------------------------------------------------

```python
"""Tests for the Jira Transitions mixin."""

from unittest.mock import MagicMock

import pytest

from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.transitions import TransitionsMixin
from mcp_atlassian.models.jira import (
    JiraIssue,
    JiraStatus,
    JiraStatusCategory,
    JiraTransition,
)


class TestTransitionsMixin:
    """Tests for the TransitionsMixin class."""

    @pytest.fixture
    def transitions_mixin(self, jira_fetcher: JiraFetcher) -> TransitionsMixin:
        """Return the shared fetcher with issue and transition lookups stubbed."""
        # Issue handed back by the stubbed get_issue
        stub_issue = JiraIssue(
            id="12345",
            key="TEST-123",
            summary="Test Issue",
            description="Issue content",
            status=JiraStatus(
                id="1",
                name="Open",
                category=JiraStatusCategory(
                    id=1, key="open", name="To Do", color_name="blue-gray"
                ),
            ),
        )
        # Single transition (ID "10") whose target status is "In Progress"
        start_progress = JiraTransition(
            id="10",
            name="Start Progress",
            to_status=JiraStatus(id="2", name="In Progress"),
        )

        jira_fetcher.get_issue = MagicMock(return_value=stub_issue)
        jira_fetcher.get_transitions_models = MagicMock(
            return_value=[start_progress]
        )
        return jira_fetcher

    def test_get_available_transitions_list_format(
        self, transitions_mixin: TransitionsMixin
    ):
        """A list-shaped API payload yields entries with id, name and to_status."""
        # The second entry deliberately uses "status" instead of "to_status"
        transitions_mixin.jira.get_issue_transitions.return_value = [
            {"id": "10", "name": "In Progress", "to_status": "In Progress"},
            {"id": "11", "name": "Done", "status": "Done"},
        ]

        result = transitions_mixin.get_available_transitions("TEST-123")

        assert len(result) == 2
        expected_rows = [
            ("10", "In Progress", "In Progress"),
            ("11", "Done", "Done"),
        ]
        for entry, (want_id, want_name, want_status) in zip(result, expected_rows):
            assert entry["id"] == want_id
            assert entry["name"] == want_name
            assert entry["to_status"] == want_status

    def test_get_available_transitions_empty_response(
        self, transitions_mixin: TransitionsMixin
    ):
        """An empty dict from the client produces an empty list of transitions."""
        jira_client = transitions_mixin.jira
        jira_client.get_issue_transitions.return_value = {}

        result = transitions_mixin.get_available_transitions("TEST-123")

        assert isinstance(result, list)
        assert not result

    def test_get_available_transitions_invalid_format(
        self, transitions_mixin: TransitionsMixin
    ):
        """A payload that is neither list nor dict produces an empty list."""
        jira_client = transitions_mixin.jira
        jira_client.get_issue_transitions.return_value = "invalid"

        result = transitions_mixin.get_available_transitions("TEST-123")

        assert isinstance(result, list)
        assert not result

    def test_get_available_transitions_with_error(
        self, transitions_mixin: TransitionsMixin
    ):
        """A client failure surfaces as an exception with a prefixed message."""
        client_failure = Exception("Transition fetch error")
        transitions_mixin.jira.get_issue_transitions.side_effect = client_failure

        expected_message = "Error getting transitions: Transition fetch error"
        with pytest.raises(Exception, match=expected_message):
            transitions_mixin.get_available_transitions("TEST-123")

    def test_transition_issue_basic(self, transitions_mixin: TransitionsMixin):
        """A plain transition sets the target status and returns the refreshed issue."""
        result = transitions_mixin.transition_issue("TEST-123", "10")

        # Transition "10" from the fixture points at the "In Progress" status
        expected_call = {
            "issue_key": "TEST-123",
            "status_name": "In Progress",
            "fields": None,
            "update": None,
        }
        transitions_mixin.jira.set_issue_status.assert_called_once_with(
            **expected_call
        )
        transitions_mixin.get_issue.assert_called_once_with("TEST-123")

        assert isinstance(result, JiraIssue)
        assert result.key == "TEST-123"
        assert result.summary == "Test Issue"
        assert result.description == "Issue content"

    def test_transition_issue_with_int_id(self, transitions_mixin: TransitionsMixin):
        """An integer transition ID resolves to the same status-name call."""
        transitions_mixin.transition_issue("TEST-123", 10)

        # The client is driven by status name, not by the numeric ID
        set_status = transitions_mixin.jira.set_issue_status
        set_status.assert_called_once_with(
            issue_key="TEST-123", status_name="In Progress", fields=None, update=None
        )

    def test_transition_issue_with_fields(self, transitions_mixin: TransitionsMixin):
        """Sanitized fields are forwarded to the status change call."""
        # Bypass the real sanitizer so only the forwarding is under test
        sanitizer_stub = MagicMock(return_value={"summary": "Updated"})
        transitions_mixin._sanitize_transition_fields = sanitizer_stub

        fields = {"summary": "Updated"}
        transitions_mixin.transition_issue("TEST-123", "10", fields=fields)

        set_status = transitions_mixin.jira.set_issue_status
        set_status.assert_called_once_with(
            issue_key="TEST-123",
            status_name="In Progress",
            fields={"summary": "Updated"},
            update=None,
        )

    def test_transition_issue_with_empty_sanitized_fields(
        self, transitions_mixin: TransitionsMixin
    ):
        """Fields that sanitize down to nothing are passed on as None."""
        # The sanitizer stub discards everything it is given
        sanitizer_stub = MagicMock(return_value={})
        transitions_mixin._sanitize_transition_fields = sanitizer_stub

        fields = {"invalid": "field"}
        transitions_mixin.transition_issue("TEST-123", "10", fields=fields)

        set_status = transitions_mixin.jira.set_issue_status
        set_status.assert_called_once_with(
            issue_key="TEST-123", status_name="In Progress", fields=None, update=None
        )

    def test_transition_issue_with_comment(self, transitions_mixin: TransitionsMixin):
        """A comment goes through the helper and is forwarded as the update payload."""
        comment = "Test comment"

        # Stand-in for the real helper: writes the comment into the payload
        # dict it receives.
        def add_comment_side_effect(transition_data, comment_text):
            transition_data["update"] = {"comment": [{"add": {"body": comment_text}}]}

        add_comment_stub = MagicMock(side_effect=add_comment_side_effect)
        transitions_mixin._add_comment_to_transition_data = add_comment_stub

        transitions_mixin.transition_issue("TEST-123", "10", comment=comment)

        # The helper ran exactly once ...
        add_comment_stub.assert_called_once()

        # ... and what it wrote reached the client call
        expected_update = {"comment": [{"add": {"body": comment}}]}
        transitions_mixin.jira.set_issue_status.assert_called_once_with(
            issue_key="TEST-123",
            status_name="In Progress",
            fields=None,
            update=expected_update,
        )

    def test_transition_issue_with_error(self, transitions_mixin: TransitionsMixin):
        """A client failure during the transition is re-raised as ValueError."""
        client_failure = Exception("Transition error")
        transitions_mixin.jira.set_issue_status.side_effect = client_failure

        expected_message = (
            "Error transitioning issue TEST-123 with transition ID 10: Transition error"
        )
        with pytest.raises(ValueError, match=expected_message):
            transitions_mixin.transition_issue("TEST-123", "10")

    def test_transition_issue_without_status_name(
        self, transitions_mixin: TransitionsMixin
    ):
        """Without a target status the transition is performed by its numeric ID."""
        # Replace the fixture's transition with one that has no target status
        statusless_transition = JiraTransition(
            id="10",
            name="Start Progress",
            to_status=None,
        )
        transitions_mixin.get_transitions_models = MagicMock(
            return_value=[statusless_transition]
        )

        by_transition_id = MagicMock()
        transitions_mixin.jira.set_issue_status_by_transition_id = by_transition_id

        result = transitions_mixin.transition_issue("TEST-123", "10")

        # The ID-based call is used (with an int ID) and the name-based one is not
        by_transition_id.assert_called_once_with(issue_key="TEST-123", transition_id=10)
        transitions_mixin.jira.set_issue_status.assert_not_called()

        # The issue is still re-fetched and returned
        transitions_mixin.get_issue.assert_called_once_with("TEST-123")
        assert isinstance(result, JiraIssue)

    def test_normalize_transition_id(self, transitions_mixin: TransitionsMixin):
        """_normalize_transition_id handles str, int, dict and None inputs."""
        # (input, expected) pairs, checked in order
        cases = [
            ("10", 10),  # digit string
            ("workflow", "workflow"),  # non-digit string
            (10, 10),  # int
            ({"id": "10"}, 10),  # dict with string id
            ({"id": 10}, 10),  # dict with int id
            (None, 0),  # None
        ]

        for raw_value, expected in cases:
            assert transitions_mixin._normalize_transition_id(raw_value) == expected

    def test_sanitize_transition_fields_basic(
        self, transitions_mixin: TransitionsMixin
    ):
        """Ordinary fields come back from the sanitizer equal to the input."""
        fields = {
            "resolution": {"name": "Fixed"},
            "priority": {"name": "High"},
        }

        sanitized = transitions_mixin._sanitize_transition_fields(fields)

        assert sanitized == fields

    def test_sanitize_transition_fields_with_none_values(
        self, transitions_mixin: TransitionsMixin
    ):
        """Fields whose value is None are left out of the sanitized result."""
        fields = {"resolution": {"name": "Fixed"}, "priority": None}

        sanitized = transitions_mixin._sanitize_transition_fields(fields)

        assert "priority" not in sanitized
        assert sanitized["resolution"] == {"name": "Fixed"}

    def test_sanitize_transition_fields_with_assignee_and_get_account_id(
        self, transitions_mixin
    ):
        """An assignee name is resolved to an account ID via _get_account_id."""
        account_lookup = MagicMock(return_value="account-123")
        transitions_mixin._get_account_id = account_lookup

        sanitized = transitions_mixin._sanitize_transition_fields(
            {"assignee": "user.name"}
        )

        # The lookup received the raw name and its result was wrapped as accountId
        account_lookup.assert_called_once_with("user.name")
        assert sanitized["assignee"] == {"accountId": "account-123"}

    def test_sanitize_transition_fields_with_assignee_error(
        self, transitions_mixin: TransitionsMixin
    ):
        """A failing account lookup drops the assignee but keeps other fields."""
        failing_lookup = MagicMock(side_effect=Exception("User not found"))
        transitions_mixin._get_account_id = failing_lookup

        fields = {"assignee": "invalid.user", "resolution": {"name": "Fixed"}}
        sanitized = transitions_mixin._sanitize_transition_fields(fields)

        assert "assignee" not in sanitized
        assert sanitized["resolution"] == {"name": "Fixed"}

    def test_add_comment_to_transition_data_with_string(
        self, transitions_mixin: TransitionsMixin
    ):
        """A string comment is stored as a single "add" entry under update.comment."""
        transition_data = {"transition": {"id": "10"}}

        transitions_mixin._add_comment_to_transition_data(
            transition_data, "Test comment"
        )

        assert "update" in transition_data
        update_section = transition_data["update"]
        assert "comment" in update_section
        comment_entries = update_section["comment"]
        assert len(comment_entries) == 1
        assert comment_entries[0]["add"]["body"] == "Test comment"

    def test_add_comment_to_transition_data_with_non_string(
        self, transitions_mixin: TransitionsMixin
    ):
        """A non-string comment ends up in the payload as its string form."""
        transition_data = {"transition": {"id": "10"}}

        # An int stands in for "anything that is not a str"
        transitions_mixin._add_comment_to_transition_data(transition_data, 123)

        first_comment = transition_data["update"]["comment"][0]
        assert first_comment["add"]["body"] == "123"

    def test_add_comment_to_transition_data_with_markdown_to_jira(
        self, transitions_mixin
    ):
        """The comment text is run through _markdown_to_jira before being stored."""
        markdown_converter = MagicMock(return_value="Converted comment")
        transitions_mixin._markdown_to_jira = markdown_converter

        transition_data = {"transition": {"id": "10"}}
        transitions_mixin._add_comment_to_transition_data(
            transition_data, "**Markdown** comment"
        )

        # The converter saw the raw text and its output is what got stored
        markdown_converter.assert_called_once_with("**Markdown** comment")
        stored_body = transition_data["update"]["comment"][0]["add"]["body"]
        assert stored_body == "Converted comment"

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/projects.py:
--------------------------------------------------------------------------------

```python
"""Module for Jira project operations."""

import logging
from typing import Any

from ..models import JiraProject
from ..models.jira.search import JiraSearchResult
from ..models.jira.version import JiraVersion
from .client import JiraClient
from .protocols import SearchOperationsProto

logger = logging.getLogger("mcp-jira")


class ProjectsMixin(JiraClient, SearchOperationsProto):
    """Mixin for Jira project operations.

    This mixin provides methods for retrieving and working with Jira projects,
    including project details, components, versions, and other project-related operations.
    """

    def get_all_projects(self, include_archived: bool = False) -> list[dict[str, Any]]:
        """
        Get all projects visible to the current user.

        Args:
            include_archived: Whether to include archived projects

        Returns:
            List of project data dictionaries. An empty list is returned when
            the client yields a non-list value or the request raises.
        """
        try:
            # The flag is forwarded under the client's own keyword name,
            # `included_archived`; no separate params dict is needed.
            projects = self.jira.projects(included_archived=include_archived)
            return projects if isinstance(projects, list) else []

        except Exception as e:
            # Best-effort lookup: log the failure and fall back to "no projects"
            logger.error(f"Error getting all projects: {str(e)}")
            return []

    def get_project(self, project_key: str) -> dict[str, Any] | None:
        """
        Fetch the raw data of a single project.

        Args:
            project_key: The project key (e.g. 'PROJ')

        Returns:
            The project dictionary, or None when the lookup raises or the
            client returns something other than a dictionary
        """
        try:
            data = self.jira.project(project_key)
            if isinstance(data, dict):
                return data

            # A non-dict answer is logged and raised here; the handler below
            # then turns it into a None result like any other failure.
            msg = f"Unexpected return value type from `jira.project`: {type(data)}"
            logger.error(msg)
            raise TypeError(msg)
        except Exception as err:
            logger.warning(f"Error getting project {project_key}: {err}")
            return None

    def get_project_model(self, project_key: str) -> JiraProject | None:
        """
        Fetch a project and wrap it in a JiraProject model.

        Args:
            project_key: The project key (e.g. 'PROJ')

        Returns:
            JiraProject built from the project data, or None when
            get_project yields nothing
        """
        data = self.get_project(project_key)
        return JiraProject.from_api_response(data) if data else None

    def project_exists(self, project_key: str) -> bool:
        """
        Tell whether a project can be retrieved by its key.

        Args:
            project_key: The project key to check

        Returns:
            True if get_project returns data for the key, False otherwise
            (including when the lookup raises)
        """
        try:
            return self.get_project(project_key) is not None
        except Exception:
            return False

    def get_project_components(self, project_key: str) -> list[dict[str, Any]]:
        """
        List the components of a project.

        Args:
            project_key: The project key

        Returns:
            List of component data dictionaries; empty when the client
            returns a non-list value or the request raises
        """
        try:
            result = self.jira.get_project_components(key=project_key)
            if isinstance(result, list):
                return result
            return []

        except Exception as err:
            logger.error(
                f"Error getting components for project {project_key}: {str(err)}"
            )
            return []

    def get_project_versions(self, project_key: str) -> list[dict[str, Any]]:
        """
        List the versions of a project in simplified form.

        Args:
            project_key: The project key.

        Returns:
            One simplified dictionary per version; empty when the client
            returns a non-list value or any step raises
        """
        try:
            raw = self.jira.get_project_versions(key=project_key)
            if not isinstance(raw, list):
                return []
            # Each raw entry goes through the JiraVersion model before being
            # flattened back into a plain dictionary.
            return [
                JiraVersion.from_api_response(entry).to_simplified_dict()
                for entry in raw
            ]
        except Exception as err:
            logger.error(
                f"Error getting versions for project {project_key}: {str(err)}"
            )
            return []

    def get_project_roles(self, project_key: str) -> dict[str, Any]:
        """
        Retrieve the roles defined for a project.

        Args:
            project_key: The project key

        Returns:
            The roles dictionary as returned by the client; empty when the
            client returns a non-dict value or the request raises
        """
        try:
            result = self.jira.get_project_roles(project_key=project_key)
            if isinstance(result, dict):
                return result
            return {}

        except Exception as err:
            logger.error(f"Error getting roles for project {project_key}: {str(err)}")
            return {}

    def get_project_role_members(
        self, project_key: str, role_id: str
    ) -> list[dict[str, Any]]:
        """
        Retrieve the actors assigned to one role of a project.

        Args:
            project_key: The project key
            role_id: The role ID

        Returns:
            The value stored under "actors" in the client response; an empty
            list when that key is missing, the response is not a dictionary,
            or the request raises
        """
        try:
            response = self.jira.get_project_actors_for_role_project(
                project_key=project_key, role_id=role_id
            )
            # Only a dict carrying an "actors" entry is considered usable
            if not isinstance(response, dict) or "actors" not in response:
                return []
            return response["actors"]

        except Exception as err:
            logger.error(
                f"Error getting role members for project {project_key}, role {role_id}: {str(err)}"
            )
            return []

    def get_project_permission_scheme(self, project_key: str) -> dict[str, Any] | None:
        """
        Retrieve the permission scheme attached to a project.

        Args:
            project_key: The project key

        Returns:
            The permission scheme dictionary, or None when the request raises
            or the client returns something other than a dictionary
        """
        try:
            result = self.jira.get_project_permission_scheme(
                project_id_or_key=project_key
            )
            if isinstance(result, dict):
                return result

            # Non-dict answers are logged, raised, and then converted to a
            # None result by the handler below.
            msg = f"Unexpected return value type from `jira.get_project_permission_scheme`: {type(result)}"
            logger.error(msg)
            raise TypeError(msg)

        except Exception as err:
            logger.error(
                f"Error getting permission scheme for project {project_key}: {str(err)}"
            )
            return None

    def get_project_notification_scheme(
        self, project_key: str
    ) -> dict[str, Any] | None:
        """
        Retrieve the notification scheme attached to a project.

        Args:
            project_key: The project key

        Returns:
            The notification scheme dictionary, or None when the request
            raises or the client returns something other than a dictionary
        """
        try:
            result = self.jira.get_project_notification_scheme(
                project_id_or_key=project_key
            )
            if isinstance(result, dict):
                return result

            # Non-dict answers are logged, raised, and then converted to a
            # None result by the handler below.
            msg = f"Unexpected return value type from `jira.get_project_notification_scheme`: {type(result)}"
            logger.error(msg)
            raise TypeError(msg)

        except Exception as err:
            logger.error(
                f"Error getting notification scheme for project {project_key}: {str(err)}"
            )
            return None

    def get_project_issue_types(self, project_key: str) -> list[dict[str, Any]]:
        """
        List the issue types reported by createmeta for a project.

        Args:
            project_key: The project key

        Returns:
            The "issuetypes" entry of the first project in the createmeta
            response; an empty list when it is absent or anything fails
        """
        try:
            meta = self.jira.issue_createmeta(project=project_key)
            if not isinstance(meta, dict):
                msg = f"Unexpected return value type from `jira.issue_createmeta`: {type(meta)}"
                logger.error(msg)
                raise TypeError(msg)

            # Nothing to extract without at least one project entry
            if "projects" not in meta or len(meta["projects"]) == 0:
                return []

            first_project = meta["projects"][0]
            if "issuetypes" in first_project:
                return first_project["issuetypes"]
            return []

        except Exception as err:
            logger.error(
                f"Error getting issue types for project {project_key}: {str(err)}"
            )
            return []

    def get_project_issues_count(self, project_key: str) -> int:
        """
        Count the issues of a project through a JQL query.

        Args:
            project_key: The project key

        Returns:
            The "total" value of the search response, or 0 when it is
            missing or the query fails
        """
        try:
            # A single-result query is enough: only the reported total is read
            response = self.jira.jql(
                jql=f'project = "{project_key}"', fields="key", limit=1
            )
            if not isinstance(response, dict):
                msg = f"Unexpected return value type from `jira.jql`: {type(response)}"
                logger.error(msg)
                raise TypeError(msg)

            return response.get("total", 0)

        except Exception as err:
            logger.error(
                f"Error getting issue count for project {project_key}: {str(err)}"
            )
            return 0

    def get_project_issues(
        self, project_key: str, start: int = 0, limit: int = 50
    ) -> JiraSearchResult:
        """
        Search for the issues belonging to a project.

        Args:
            project_key: The project key
            start: Index of the first issue to return
            limit: Maximum number of issues to return

        Returns:
            The JiraSearchResult produced by search_issues; an empty result
            (no issues, total 0) when the search raises
        """
        try:
            # The project filter is expressed as JQL and delegated to the search
            project_jql = f'project = "{project_key}"'
            return self.search_issues(project_jql, start=start, limit=limit)

        except Exception as err:
            logger.error(f"Error getting issues for project {project_key}: {str(err)}")
            return JiraSearchResult(issues=[], total=0)

    def get_project_keys(self) -> list[str]:
        """
        Collect the keys of all projects.

        Returns:
            List of project keys; empty when any project lacks a string key
            or the lookup fails
        """
        try:
            keys: list[str] = []
            for entry in self.get_all_projects():
                candidate = entry.get("key")
                if isinstance(candidate, str):
                    keys.append(candidate)
                    continue

                # One malformed entry aborts the whole collection: the error
                # is raised here and handled below, yielding an empty list.
                msg = f"Unexpected return value type from `get_all_projects`: {type(candidate)}"
                logger.error(msg)
                raise TypeError(msg)
            return keys

        except Exception as err:
            logger.error(f"Error getting project keys: {str(err)}")
            return []

    def get_project_leads(self) -> dict[str, str]:
        """
        Map each project key to the name of its lead.

        Returns:
            Dictionary of project key -> lead name. Projects without a key,
            without a lead, or without a usable lead name are left out.
        """
        try:
            leads_by_key = {}

            for entry in self.get_all_projects():
                if "key" not in entry or "lead" not in entry:
                    continue

                project_key = entry.get("key")
                lead_info = entry.get("lead", {})

                # The lead may be a user dictionary or a plain string
                if isinstance(lead_info, dict):
                    name = lead_info.get("name") or lead_info.get("displayName")
                elif isinstance(lead_info, str):
                    name = lead_info
                else:
                    name = None

                if project_key and name:
                    leads_by_key[project_key] = name

            return leads_by_key

        except Exception as err:
            logger.error(f"Error getting project leads: {str(err)}")
            return {}

    def get_user_accessible_projects(self, username: str) -> list[dict[str, Any]]:
        """
        Filter all projects down to those a given user can browse.

        Args:
            username: The username to check access for

        Returns:
            List of project data dictionaries for which the browse-permission
            lookup returned an entry whose "name" equals the username
        """
        try:
            # NOTE(review): the original author states this lookup requires
            # admin permissions - confirm for non-admin callers.
            granted = []

            for project in self.get_all_projects():
                project_key = project.get("key")
                if not project_key:
                    continue

                try:
                    candidates = (
                        self.jira.get_users_with_browse_permission_to_a_project(
                            username=username, project_key=project_key, limit=1
                        )
                    )

                    # Access is confirmed only by an exact name match in the
                    # returned user list
                    if isinstance(candidates, list) and any(
                        isinstance(user, dict) and user.get("name") == username
                        for user in candidates
                    ):
                        granted.append(project)

                except Exception:
                    # A failing project is skipped rather than aborting the scan
                    continue

            return granted

        except Exception as err:
            logger.error(
                f"Error getting accessible projects for user {username}: {str(err)}"
            )
            return []

    def create_project_version(
        self,
        project_key: str,
        name: str,
        start_date: str = None,
        release_date: str = None,
        description: str = None,
    ) -> dict[str, Any]:
        """
        Create a version in a Jira project by delegating to create_version.

        Args:
            project_key: The project key (e.g., 'PROJ')
            name: The name of the version
            start_date: The start date (YYYY-MM-DD, optional)
            release_date: The release date (YYYY-MM-DD, optional)
            description: Description of the version (optional)

        Returns:
            Whatever create_version returns for the new version
        """
        # Only the project argument is renamed; everything else passes through
        version_fields = {
            "project": project_key,
            "name": name,
            "start_date": start_date,
            "release_date": release_date,
            "description": description,
        }
        return self.create_version(**version_fields)

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/utils/oauth_setup.py:
--------------------------------------------------------------------------------

```python
"""
OAuth 2.0 Authorization Flow Helper for MCP Atlassian

This module helps with the OAuth 2.0 (3LO) authorization flow for Atlassian Cloud:
1. Opens a browser to the authorization URL
2. Starts a local server to receive the callback with the authorization code
3. Exchanges the authorization code for access and refresh tokens
4. Saves the tokens securely for later use by MCP Atlassian
"""

import http.server
import logging
import os
import socketserver
import threading
import time
import urllib.parse
import webbrowser
from dataclasses import dataclass

from ..utils.oauth import OAuthConfig

# Configure logging
logger = logging.getLogger("mcp-atlassian.oauth-setup")

# Global variables for callback handling
authorization_code = None
authorization_state = None
callback_received = False
callback_error = None


class CallbackHandler(http.server.BaseHTTPRequestHandler):
    """HTTP request handler for the OAuth callback.

    The outcome of the callback is published through the module-level
    variables ``authorization_code``, ``authorization_state``,
    ``callback_received`` and ``callback_error``.
    """

    def do_GET(self) -> None:  # noqa: N802
        """Handle GET requests (OAuth callback).

        Reads the ``error``, ``code`` and ``state`` query parameters of the
        request path, records them in the module-level variables and answers
        with a small HTML page.
        """
        global \
            authorization_code, \
            callback_received, \
            callback_error, \
            authorization_state

        # Parse the query parameters from the URL
        query = urllib.parse.urlparse(self.path).query
        params = urllib.parse.parse_qs(query)

        if "error" in params:
            callback_error = params["error"][0]
            callback_received = True
            self._send_response(f"Authorization failed: {callback_error}")
            return

        if "code" in params:
            authorization_code = params["code"][0]
            if "state" in params:
                authorization_state = params["state"][0]
            callback_received = True
            self._send_response(
                "Authorization successful! You can close this window now."
            )
        else:
            self._send_response(
                "Invalid callback: Authorization code missing", status=400
            )

    def _send_response(self, message: str, status: int = 200) -> None:
        """Send an HTML response to the browser.

        Args:
            message: Text shown in the page body. It is HTML-escaped before
                being embedded because it may contain request-supplied data
                (the ``error`` query parameter).
            status: HTTP status code; 200 selects the "success" styling,
                anything else the "error" styling.
        """
        # Imported locally so the module-level import block stays untouched
        from html import escape

        self.send_response(status)
        self.send_header("Content-type", "text/html")
        self.end_headers()

        # Never interpolate the raw message: it can carry attacker-controlled
        # markup taken from the callback URL.
        safe_message = escape(message)

        page = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>Atlassian OAuth Authorization</title>
            <style>
                body {{
                    font-family: Arial, sans-serif;
                    text-align: center;
                    padding: 40px;
                    max-width: 600px;
                    margin: 0 auto;
                }}
                .message {{
                    padding: 20px;
                    border-radius: 5px;
                    margin-bottom: 20px;
                }}
                .success {{
                    background-color: #d4edda;
                    color: #155724;
                    border: 1px solid #c3e6cb;
                }}
                .error {{
                    background-color: #f8d7da;
                    color: #721c24;
                    border: 1px solid #f5c6cb;
                }}
                .countdown {{
                    font-weight: bold;
                    font-size: 1.2em;
                }}
            </style>
        </head>
        <body>
            <h1>Atlassian OAuth Authorization</h1>
            <div class="message {"success" if status == 200 else "error"}">
                <p>{safe_message}</p>
            </div>
            <p>This window will automatically close in <span class="countdown">5</span> seconds...</p>
            <button onclick="window.close()">Close Window Now</button>
            <script>
                // Countdown timer
                var seconds = 5;
                var countdown = document.querySelector('.countdown');
                var timer = setInterval(function() {{
                    seconds--;
                    countdown.textContent = seconds;
                    if (seconds <= 0) {{
                        clearInterval(timer);
                        // Try multiple methods to close the window
                        window.close();
                        // If the above doesn't work (which is often the case with modern browsers)
                        try {{ window.open('', '_self').close(); }} catch (e) {{}}
                    }}
                }}, 1000);

                // Force close on success after 5.5 seconds as a fallback
                setTimeout(function() {{
                    // If status is 200 (success), really try hard to close
                    if ({status} === 200) {{
                        window.open('about:blank', '_self');
                        window.close();
                    }}
                }}, 5500);
            </script>
        </body>
        </html>
        """
        self.wfile.write(page.encode())

    # Make the server quiet
    def log_message(self, format: str, *args: str) -> None:
        """Suppress the default per-request logging of the base handler."""
        return


def start_callback_server(port: int) -> socketserver.TCPServer:
    """Launch the OAuth callback server on a background thread.

    Args:
        port: TCP port to listen on.

    Returns:
        The server instance, already being served by a daemon thread.
    """
    server = socketserver.TCPServer(("", port), CallbackHandler)

    # Daemon thread: it must not keep the process alive on its own
    worker = threading.Thread(target=server.serve_forever)
    worker.daemon = True
    worker.start()

    return server


def wait_for_callback(timeout: int = 300) -> bool:
    """Poll once per second until the callback arrives or the timeout expires.

    Args:
        timeout: Maximum number of seconds to wait.

    Returns:
        True when a callback was received without an error; False on
        timeout or when the callback reported an error.
    """
    began = time.time()
    while not callback_received:
        if (time.time() - began) >= timeout:
            break
        time.sleep(1)

    if callback_received:
        if callback_error:
            logger.error(f"Authorization error: {callback_error}")
            return False
        return True

    logger.error(
        f"Timed out waiting for authorization callback after {timeout} seconds"
    )
    return False


def parse_redirect_uri(redirect_uri: str) -> tuple[str, int]:
    """Split a redirect URI into its host name and port.

    Args:
        redirect_uri: The redirect URI to inspect.

    Returns:
        A ``(hostname, port)`` pair. When the URI carries no port, 443 is
        used for the ``https`` scheme and 80 for everything else.
    """
    parts = urllib.parse.urlparse(redirect_uri)

    port = parts.port
    if not port:
        # Fall back to the scheme's conventional port
        port = 443 if parts.scheme == "https" else 80

    return parts.hostname, port


@dataclass
class OAuthSetupArgs:
    """Input values for one run of the OAuth setup flow.

    Attributes:
        client_id: OAuth client ID of the Atlassian app.
        client_secret: OAuth client secret of the Atlassian app.
        redirect_uri: Redirect URI the authorization callback is sent to.
        scope: OAuth scopes to request.
    """

    client_id: str
    client_secret: str
    redirect_uri: str
    scope: str


def _log_oauth_setup_summary(oauth_config: OAuthConfig) -> None:
    """Log the environment variables and VS Code snippet for a finished setup."""
    import json

    # Print environment variable information more clearly
    logger.info("\n=== IMPORTANT: ENVIRONMENT VARIABLES ===")
    logger.info(
        "Your tokens have been securely stored in your system keyring and backup file."
    )
    logger.info(
        "However, to use them in your application, you need these environment variables:"
    )
    logger.info("")
    logger.info("Add the following to your .env file or set as environment variables:")
    logger.info("------------------------------------------------------------")
    logger.info(f"ATLASSIAN_OAUTH_CLIENT_ID={oauth_config.client_id}")
    logger.info(f"ATLASSIAN_OAUTH_CLIENT_SECRET={oauth_config.client_secret}")
    logger.info(f"ATLASSIAN_OAUTH_REDIRECT_URI={oauth_config.redirect_uri}")
    logger.info(f"ATLASSIAN_OAUTH_SCOPE={oauth_config.scope}")
    logger.info(f"ATLASSIAN_OAUTH_CLOUD_ID={oauth_config.cloud_id}")
    logger.info("------------------------------------------------------------")
    logger.info("")
    logger.info(
        "Note: The tokens themselves are not set as environment variables for security reasons."
    )
    logger.info(
        "They are stored securely in your system keyring and will be loaded automatically."
    )
    logger.info(
        f"Token storage location (backup): ~/.mcp-atlassian/oauth-{oauth_config.client_id}.json"
    )

    # Generate VS Code configuration JSON snippet
    vscode_config = {
        "mcpServers": {
            "mcp-atlassian": {
                "command": "docker",
                "args": [
                    "run",
                    "--rm",
                    "-i",
                    "-p",
                    "8080:8080",
                    "-e",
                    "CONFLUENCE_URL",
                    "-e",
                    "JIRA_URL",
                    "-e",
                    "ATLASSIAN_OAUTH_CLIENT_ID",
                    "-e",
                    "ATLASSIAN_OAUTH_CLIENT_SECRET",
                    "-e",
                    "ATLASSIAN_OAUTH_REDIRECT_URI",
                    "-e",
                    "ATLASSIAN_OAUTH_SCOPE",
                    "-e",
                    "ATLASSIAN_OAUTH_CLOUD_ID",
                    "ghcr.io/sooperset/mcp-atlassian:latest",
                ],
                "env": {
                    "CONFLUENCE_URL": "https://your-company.atlassian.net/wiki",
                    "JIRA_URL": "https://your-company.atlassian.net",
                    "ATLASSIAN_OAUTH_CLIENT_ID": oauth_config.client_id,
                    "ATLASSIAN_OAUTH_CLIENT_SECRET": oauth_config.client_secret,
                    "ATLASSIAN_OAUTH_REDIRECT_URI": oauth_config.redirect_uri,
                    "ATLASSIAN_OAUTH_SCOPE": oauth_config.scope,
                    "ATLASSIAN_OAUTH_CLOUD_ID": oauth_config.cloud_id,
                },
            }
        }
    }

    # Pretty print the VS Code configuration JSON
    vscode_json = json.dumps(vscode_config, indent=4)

    logger.info("\n=== VS CODE CONFIGURATION ===")
    logger.info("Add the following to your VS Code settings.json file:")
    logger.info("------------------------------------------------------------")
    logger.info(vscode_json)
    logger.info("------------------------------------------------------------")
    logger.info(
        "\nNote: If you already have an 'mcp' configuration in settings.json, merge this with your existing configuration."
    )


def run_oauth_flow(args: OAuthSetupArgs) -> bool:
    """Run the OAuth 2.0 authorization flow.

    Starts a local callback server when the redirect URI points at
    localhost/127.0.0.1, opens the browser at the authorization URL, waits
    for the callback, verifies the ``state`` value and exchanges the
    authorization code for tokens.

    Args:
        args: Client credentials, redirect URI and scopes for the flow.

    Returns:
        True when the code was exchanged for tokens; False when the callback
        server could not be started, the callback timed out or reported an
        error, the state did not match, or the exchange failed.
    """
    # Reset global state (important for multiple runs)
    global authorization_code, authorization_state, callback_received, callback_error
    authorization_code = None
    authorization_state = None
    callback_received = False
    callback_error = None

    # Create OAuth configuration
    oauth_config = OAuthConfig(
        client_id=args.client_id,
        client_secret=args.client_secret,
        redirect_uri=args.redirect_uri,
        scope=args.scope,
    )

    # Generate a random state for CSRF protection
    import secrets

    state = secrets.token_urlsafe(16)

    # Start local callback server if using localhost
    hostname, port = parse_redirect_uri(args.redirect_uri)
    httpd = None

    if hostname in ["localhost", "127.0.0.1"]:
        logger.info(f"Starting local callback server on port {port}")
        try:
            httpd = start_callback_server(port)
        except OSError as e:
            logger.error(f"Failed to start callback server: {e}")
            logger.error(f"Make sure port {port} is available and not in use")
            return False

    try:
        # Get the authorization URL
        auth_url = oauth_config.get_authorization_url(state=state)

        # Open the browser for authorization
        logger.info(f"Opening browser for authorization at {auth_url}")
        webbrowser.open(auth_url)
        logger.info(
            "If the browser doesn't open automatically, please visit this URL manually."
        )

        # Wait for the callback
        if not wait_for_callback():
            return False

        # Verify state to prevent CSRF attacks
        if authorization_state != state:
            logger.error("State mismatch! Possible CSRF attack.")
            return False

        # Exchange the code for tokens
        logger.info("Exchanging authorization code for tokens...")
        if not oauth_config.exchange_code_for_tokens(authorization_code):
            logger.error("Failed to exchange authorization code for tokens")
            return False

        logger.info("✅ OAuth authorization successful!")
        logger.info(
            f"Access token: {oauth_config.access_token[:10]}...{oauth_config.access_token[-5:]}"
        )
        logger.info(
            f"Refresh token saved: {oauth_config.refresh_token[:5]}...{oauth_config.refresh_token[-3:]}"
        )

        if oauth_config.cloud_id:
            logger.info(f"Cloud ID: {oauth_config.cloud_id}")
            _log_oauth_setup_summary(oauth_config)
        else:
            logger.error("Failed to obtain cloud ID!")

        return True
    finally:
        # Single cleanup point for every exit path, including exceptions:
        # stop the serve loop, then release the listening socket so the port
        # is free for a later run.
        if httpd:
            httpd.shutdown()
            httpd.server_close()


def _prompt_for_input(prompt: str, env_var: str = None, is_secret: bool = False) -> str:
    """Ask the user for a value, offering an environment variable as default.

    Args:
        prompt: Label printed in front of the input.
        env_var: Name of an environment variable whose value, when set and
            non-empty, is shown as the default.
        is_secret: When True the default is displayed masked.

    Returns:
        The text entered by the user, or the default when the user enters
        nothing and a default exists.
    """
    default = os.getenv(env_var, "") if env_var else ""

    if not default:
        print(f"{prompt}: ", end="")
        return input()

    if not is_secret:
        shown = default
    elif len(default) > 6:
        # Keep the first and last three characters, star out the middle
        shown = default[:3] + "*" * (len(default) - 6) + default[-3:]
    else:
        shown = "****"

    print(f"{prompt} [{shown}]: ", end="")
    return input() or default


def run_oauth_setup() -> int:
    """Drive the interactive OAuth 2.0 setup wizard.

    Prompts for client ID, client secret, redirect URI and scopes (each
    pre-filled from its environment variable when set), then runs the OAuth
    flow with the collected values.

    Returns:
        0 when the flow succeeds; 1 when a required value is missing or the
        flow fails.
    """
    for line in (
        "\n=== Atlassian OAuth 2.0 Setup Wizard ===",
        "This wizard will guide you through setting up OAuth 2.0 authentication for MCP Atlassian.",
        "\nYou need to have created an OAuth 2.0 app in your Atlassian account.",
        "You can create one at: https://developer.atlassian.com/console/myapps/",
        "\nPlease provide the following information:\n",
    ):
        print(line)

    # Credentials: environment values are offered as defaults by the prompt
    client_id = _prompt_for_input("OAuth Client ID", "ATLASSIAN_OAUTH_CLIENT_ID")
    client_secret = _prompt_for_input(
        "OAuth Client Secret", "ATLASSIAN_OAUTH_CLIENT_SECRET", is_secret=True
    )

    # Redirect URI: fall back to the built-in default on empty input
    fallback_redirect = os.getenv(
        "ATLASSIAN_OAUTH_REDIRECT_URI", "http://localhost:8080/callback"
    )
    entered_redirect = _prompt_for_input(
        "OAuth Redirect URI", "ATLASSIAN_OAUTH_REDIRECT_URI"
    )
    redirect_uri = entered_redirect or fallback_redirect

    # Scopes: same fallback scheme as the redirect URI
    fallback_scope = os.getenv(
        "ATLASSIAN_OAUTH_SCOPE",
        "read:jira-work write:jira-work read:confluence-space.summary offline_access",
    )
    entered_scope = _prompt_for_input(
        "OAuth Scopes (space-separated)", "ATLASSIAN_OAUTH_SCOPE"
    )
    scope = entered_scope or fallback_scope

    # Both credentials are mandatory
    if not client_id:
        logger.error("OAuth Client ID is required")
        return 1
    if not client_secret:
        logger.error("OAuth Client Secret is required")
        return 1

    flow_succeeded = run_oauth_flow(
        OAuthSetupArgs(
            client_id=client_id,
            client_secret=client_secret,
            redirect_uri=redirect_uri,
            scope=scope,
        )
    )
    if flow_succeeded:
        return 0
    return 1

```

--------------------------------------------------------------------------------
/tests/unit/jira/test_client_oauth.py:
--------------------------------------------------------------------------------

```python
"""Tests for the JiraClient with OAuth authentication."""

import os
from unittest.mock import MagicMock, PropertyMock, patch

import pytest

from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
from mcp_atlassian.jira.client import JiraClient
from mcp_atlassian.jira.config import JiraConfig
from mcp_atlassian.utils.oauth import BYOAccessTokenOAuthConfig, OAuthConfig


class TestJiraClientOAuth:
    """Tests for JiraClient with OAuth authentication."""

    def test_init_with_oauth_config(self):
        """JiraClient built from a complete OAuth config targets the cloud API URL."""
        # OAuth settings holding both tokens and an expiry far in the future
        oauth_cfg = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
            cloud_id="test-cloud-id",
            access_token="test-access-token",
            refresh_token="test-refresh-token",
            expires_at=9999999999.0,
        )
        jira_cfg = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="oauth",
            oauth_config=oauth_cfg,
        )
        expected_url = f"https://api.atlassian.com/ex/jira/{oauth_cfg.cloud_id}"

        # Replace the collaborators looked up inside the client module
        with (
            patch("mcp_atlassian.jira.client.Jira") as jira_cls,
            patch(
                "mcp_atlassian.jira.client.configure_oauth_session",
                return_value=True,  # session setup reports success
            ) as oauth_session_setup,
            patch(
                "mcp_atlassian.jira.client.configure_ssl_verification"
            ) as ssl_setup,
            patch.object(
                OAuthConfig,
                "is_token_expired",
                new_callable=PropertyMock,
                return_value=False,
            ),
            patch.object(oauth_cfg, "ensure_valid_token", return_value=True),
        ):
            JiraClient(config=jira_cfg)

            # The OAuth session must have been configured exactly once
            oauth_session_setup.assert_called_once()

            # Jira must have been constructed once with the cloud API settings
            jira_cls.assert_called_once()
            _, init_kwargs = jira_cls.call_args
            assert init_kwargs["url"] == expected_url
            assert "session" in init_kwargs
            assert init_kwargs["cloud"] is True

            # SSL verification must have been configured as well
            ssl_setup.assert_called_once()

    def test_init_with_oauth_missing_cloud_id(self):
        """JiraClient raises ValueError for an OAuth config created without cloud_id."""
        # cloud_id is intentionally omitted from the OAuth settings
        oauth_cfg = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
            access_token="test-access-token",
        )
        jira_cfg = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="oauth",
            oauth_config=oauth_cfg,
        )
        expected_message = "OAuth authentication requires a valid cloud_id"

        # Construction itself is expected to fail
        with pytest.raises(ValueError, match=expected_message):
            JiraClient(config=jira_cfg)

    def test_init_with_oauth_failed_session_config(self):
        """JiraClient raises an authentication error when OAuth session setup fails."""
        oauth_cfg = OAuthConfig(
            client_id="test-client-id",
            client_secret="test-client-secret",
            redirect_uri="https://example.com/callback",
            scope="read:jira-work write:jira-work",
            cloud_id="test-cloud-id",
            access_token="test-access-token",
            refresh_token="test-refresh-token",
        )
        jira_cfg = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="oauth",
            oauth_config=oauth_cfg,
        )

        # Names are patched where the client module imports them, not where
        # they are defined; the session setup is forced to report failure.
        with (
            patch("mcp_atlassian.jira.client.Jira"),
            patch(
                "mcp_atlassian.jira.client.configure_oauth_session",
                return_value=False,
            ),
            patch("mcp_atlassian.jira.client.configure_ssl_verification"),
            patch("mcp_atlassian.preprocessing.jira.JiraPreprocessor"),
            patch.object(
                OAuthConfig,
                "is_token_expired",
                new_callable=PropertyMock,
                return_value=False,
            ),
            patch.object(oauth_cfg, "ensure_valid_token", return_value=True),
            pytest.raises(
                MCPAtlassianAuthenticationError,
                match="Failed to configure OAuth session",
            ),
        ):
            JiraClient(config=jira_cfg)

    def test_init_with_byo_access_token_oauth_config(self):
        """JiraClient built from a BYO access token config targets the cloud API URL."""
        byo_cfg = BYOAccessTokenOAuthConfig(
            cloud_id="test-cloud-id", access_token="my-byo-token"
        )
        jira_cfg = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="oauth",
            oauth_config=byo_cfg,
        )
        expected_url = f"https://api.atlassian.com/ex/jira/{byo_cfg.cloud_id}"

        with (
            patch("mcp_atlassian.jira.client.Jira") as jira_cls,
            patch(
                "mcp_atlassian.jira.client.configure_oauth_session",
                return_value=True,  # session setup reports success
            ) as oauth_session_setup,
            patch(
                "mcp_atlassian.jira.client.configure_ssl_verification"
            ) as ssl_setup,
        ):
            JiraClient(config=jira_cfg)

            # The OAuth session must have been configured exactly once
            oauth_session_setup.assert_called_once()

            # Jira must have been constructed once with the cloud API settings
            jira_cls.assert_called_once()
            _, init_kwargs = jira_cls.call_args
            assert init_kwargs["url"] == expected_url
            assert "session" in init_kwargs
            assert init_kwargs["cloud"] is True

            # SSL verification must have been configured as well
            ssl_setup.assert_called_once()

    def test_init_with_byo_oauth_missing_cloud_id(self):
        """JiraClient raises ValueError for a BYO OAuth config with an empty cloud_id."""
        # The cloud_id is present but empty
        byo_cfg = BYOAccessTokenOAuthConfig(cloud_id="", access_token="my-byo-token")
        jira_cfg = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="oauth",
            oauth_config=byo_cfg,
        )
        expected_message = "OAuth authentication requires a valid cloud_id"

        # Construction itself is expected to fail
        with pytest.raises(ValueError, match=expected_message):
            JiraClient(config=jira_cfg)

    def test_init_with_byo_oauth_failed_session_config(self):
        """JiraClient raises an authentication error when BYO OAuth session setup fails."""
        byo_cfg = BYOAccessTokenOAuthConfig(
            cloud_id="test-cloud-id", access_token="my_byo_token"
        )
        jira_cfg = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="oauth",
            oauth_config=byo_cfg,
        )

        # The Jira and SSL mocks are not inspected; the session setup is
        # forced to report failure.
        with (
            patch("mcp_atlassian.jira.client.Jira"),
            patch(
                "mcp_atlassian.jira.client.configure_oauth_session",
                return_value=False,
            ),
            patch("mcp_atlassian.jira.client.configure_ssl_verification"),
            pytest.raises(
                MCPAtlassianAuthenticationError,
                match="Failed to configure OAuth session",
            ),
        ):
            JiraClient(config=jira_cfg)

    def test_init_with_byo_oauth_empty_token_failed_session_config(self):
        """JiraClient raises an authentication error for a BYO config with an empty token."""
        # The access token is present but empty
        empty_token_cfg = BYOAccessTokenOAuthConfig(
            cloud_id="test-cloud-id",
            access_token="",
        )
        jira_cfg = JiraConfig(
            url="https://test.atlassian.net",
            auth_type="oauth",
            oauth_config=empty_token_cfg,
        )

        # configure_oauth_session is deliberately NOT patched: its real logic
        # is what should reject the empty token.
        with (
            patch("mcp_atlassian.jira.client.Jira"),
            patch("mcp_atlassian.jira.client.configure_ssl_verification"),
            pytest.raises(
                MCPAtlassianAuthenticationError,
                match="Failed to configure OAuth session",
            ),
        ):
            JiraClient(config=jira_cfg)

    def test_from_env_with_oauth(self):
        """JiraClient() with no config picks up the OAuth config resolved from the environment."""
        # Environment describing an OAuth setup, including the auth type
        oauth_env = {
            "JIRA_URL": "https://test.atlassian.net",
            "JIRA_AUTH_TYPE": "oauth",
            "ATLASSIAN_OAUTH_CLIENT_ID": "env-client-id",
            "ATLASSIAN_OAUTH_CLIENT_SECRET": "env-client-secret",
            "ATLASSIAN_OAUTH_REDIRECT_URI": "https://example.com/callback",
            "ATLASSIAN_OAUTH_SCOPE": "read:jira-work",
            "ATLASSIAN_OAUTH_CLOUD_ID": "env-cloud-id",
        }

        # Stand-in for the OAuth config that would be loaded from the environment
        fake_oauth_config = MagicMock(
            cloud_id="env-cloud-id",
            access_token="env-access-token",
            refresh_token="env-refresh-token",
            expires_at=9999999999.0,
        )

        with (
            patch.dict(os.environ, oauth_env),
            patch(
                "mcp_atlassian.jira.config.get_oauth_config_from_env",
                return_value=fake_oauth_config,
            ),
            patch.object(
                OAuthConfig,
                "is_token_expired",
                new_callable=PropertyMock,
                return_value=False,
            ),
            patch.object(fake_oauth_config, "ensure_valid_token", return_value=True),
            patch("mcp_atlassian.jira.client.Jira") as jira_cls,
            patch(
                "mcp_atlassian.jira.client.configure_oauth_session", return_value=True
            ) as oauth_session_setup,
            patch("mcp_atlassian.jira.client.configure_ssl_verification"),
        ):
            # No explicit config: the client resolves it from the environment
            jira_client = JiraClient()

            # The resolved config must be the OAuth one supplied above
            assert jira_client.config.auth_type == "oauth"
            assert jira_client.config.oauth_config is fake_oauth_config

            # Jira must have been constructed once with the cloud API settings
            jira_cls.assert_called_once()
            _, init_kwargs = jira_cls.call_args
            expected_url = (
                f"https://api.atlassian.com/ex/jira/{fake_oauth_config.cloud_id}"
            )
            assert init_kwargs["url"] == expected_url
            assert "session" in init_kwargs
            assert init_kwargs["cloud"] is True

            # The OAuth session must have been configured exactly once
            oauth_session_setup.assert_called_once()

    def test_from_env_with_byo_token_oauth(self):
        """JiraClient() with no config uses a BYO token OAuth config found in the environment."""
        byo_env = {
            "JIRA_URL": "https://test.atlassian.net",
            "JIRA_AUTH_TYPE": "oauth",
            "ATLASSIAN_OAUTH_ACCESS_TOKEN": "env-byo-access-token",
            "ATLASSIAN_OAUTH_CLOUD_ID": "env-byo-cloud-id",
            # Standard OAuth variables are blanked so they cannot interfere
            "ATLASSIAN_OAUTH_CLIENT_ID": "",
            "ATLASSIAN_OAUTH_CLIENT_SECRET": "",
        }

        # Spec'd stand-in for the BYO config; only cloud_id and access_token
        # are populated (no refresh token or expiry is set on it).
        fake_byo_config = MagicMock(spec=BYOAccessTokenOAuthConfig)
        fake_byo_config.cloud_id = "env-byo-cloud-id"
        fake_byo_config.access_token = "env-byo-access-token"

        with (
            patch.dict(os.environ, byo_env),
            patch(
                "mcp_atlassian.jira.config.get_oauth_config_from_env",
                return_value=fake_byo_config,
            ),
            patch("mcp_atlassian.jira.client.Jira") as jira_cls,
            patch(
                "mcp_atlassian.jira.client.configure_oauth_session", return_value=True
            ) as oauth_session_setup,
            patch(
                "mcp_atlassian.jira.client.configure_ssl_verification"
            ) as ssl_setup,
        ):
            # No explicit config: the client resolves it from the environment
            jira_client = JiraClient()

            assert jira_client.config.auth_type == "oauth"
            assert jira_client.config.oauth_config is fake_byo_config

            # The OAuth session must have been configured exactly once
            oauth_session_setup.assert_called_once()

            # Jira must have been constructed once against the cloud API URL
            jira_cls.assert_called_once()
            _, init_kwargs = jira_cls.call_args
            expected_url = (
                f"https://api.atlassian.com/ex/jira/{fake_byo_config.cloud_id}"
            )
            assert init_kwargs["url"] == expected_url
            ssl_setup.assert_called_once()

    def test_from_env_with_no_oauth_config_found(self):
        """JiraClient() raises ValueError when the environment yields no usable credentials."""
        bare_env = {
            "JIRA_URL": "https://test.atlassian.net",
            "JIRA_AUTH_TYPE": "oauth",
            # OAuth-specific variables are blank
            "ATLASSIAN_OAUTH_CLIENT_ID": "",
            "ATLASSIAN_OAUTH_ACCESS_TOKEN": "",
            # Basic auth credentials are blank as well
            "JIRA_USERNAME": "",
            "JIRA_API_TOKEN": "",
        }
        expected_message = r"Cloud authentication requires JIRA_USERNAME and JIRA_API_TOKEN, or OAuth configuration.*"

        with (
            # clear=True drops every other variable for the duration of the test
            patch.dict(os.environ, bare_env, clear=True),
            patch(
                "mcp_atlassian.jira.config.get_oauth_config_from_env",
                return_value=None,  # no OAuth config could be resolved
            ),
            pytest.raises(ValueError, match=expected_message),
        ):
            JiraClient()

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/transitions.py:
--------------------------------------------------------------------------------

```python
"""Module for Jira transition operations."""

import logging
from typing import Any

from requests.exceptions import HTTPError

from ..exceptions import MCPAtlassianAuthenticationError
from ..models import JiraIssue, JiraTransition
from .client import JiraClient
from .protocols import IssueOperationsProto, UsersOperationsProto

logger = logging.getLogger("mcp-jira")


class TransitionsMixin(JiraClient, IssueOperationsProto, UsersOperationsProto):
    """Mixin for Jira transition operations."""

    def get_available_transitions(self, issue_key: str) -> list[dict[str, Any]]:
        """
        Get the available status transitions for an issue.

        Args:
            issue_key: The issue key (e.g. 'PROJ-123')

        Returns:
            List of available transitions with id, name, and to status details

        Raises:
            MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
            Exception: If there is an error getting transitions
        """
        try:
            transitions_data = self.jira.get_issue_transitions(issue_key)
            result: list[dict[str, Any]] = []

            for transition in transitions_data:
                # Skip non-dict transitions
                if not isinstance(transition, dict):
                    continue

                # Extract the essential information
                transition_info = {
                    "id": transition.get("id", ""),
                    "name": transition.get("name", ""),
                }

                # Handle "to" field in different formats
                to_status = None
                # Option 1: 'to' field with sub-fields
                if "to" in transition and isinstance(transition["to"], dict):
                    to_status = transition["to"].get("name")
                # Option 2: 'to_status' field directly
                elif "to_status" in transition:
                    to_status = transition.get("to_status")
                # Option 3: 'status' field directly (sometimes used in tests)
                elif "status" in transition:
                    to_status = transition.get("status")

                # Add to_status if found in any format
                if to_status:
                    transition_info["to_status"] = to_status

                result.append(transition_info)

            return result
        except HTTPError as http_err:
            if http_err.response is not None and http_err.response.status_code in [
                401,
                403,
            ]:
                error_msg = (
                    f"Authentication failed for Jira API ({http_err.response.status_code}). "
                    "Token may be expired or invalid. Please verify credentials."
                )
                logger.error(error_msg)
                raise MCPAtlassianAuthenticationError(error_msg) from http_err
            else:
                logger.error(f"HTTP error during API call: {http_err}", exc_info=False)
                raise http_err
        except Exception as e:
            error_msg = f"Error getting transitions for {issue_key}: {str(e)}"
            logger.error(error_msg)
            raise Exception(f"Error getting transitions: {str(e)}") from e

    def get_transitions(self, issue_key: str) -> list[dict[str, Any]]:
        """
        Get the raw transitions data for an issue.

        Args:
            issue_key: The issue key (e.g. 'PROJ-123')

        Returns:
            Raw transitions data from the API
        """
        return self.jira.get_issue_transitions(issue_key)

    def get_transitions_models(self, issue_key: str) -> list[JiraTransition]:
        """
        Get the transitions of an issue converted to JiraTransition models.

        Args:
            issue_key: The issue key (e.g. 'PROJ-123')

        Returns:
            List of JiraTransition models
        """
        # Each raw entry from get_transitions becomes one model, in order
        return [
            JiraTransition.from_api_response(raw_transition)
            for raw_transition in self.get_transitions(issue_key)
        ]

    def transition_issue(
        self,
        issue_key: str,
        transition_id: str | int,
        fields: dict[str, Any] | None = None,
        comment: str | None = None,
    ) -> JiraIssue:
        """
        Transition a Jira issue to a new status.

        The transition ID is normalized and compared with the transitions
        currently available for the issue; a mismatch only logs a warning
        because Jira performs the final validation. When the matching transition
        exposes a target status name, the transition is performed by status
        name. Otherwise it is performed by transition ID, and any fields or
        comment are applied with a follow-up update of the issue.

        Args:
            issue_key: The key of the issue to transition
            transition_id: The ID of the transition to perform (integer preferred, string accepted)
            fields: Optional fields to set during the transition
            comment: Optional comment to add during the transition

        Returns:
            JiraIssue model representing the transitioned issue

        Raises:
            MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403),
                whether detected here or raised by one of the helper calls
            ValueError: If there is an error transitioning the issue
        """
        try:
            # Normalize transition_id to an integer when possible, or string otherwise
            normalized_transition_id = self._normalize_transition_id(transition_id)

            # Validate that this is a valid transition ID
            valid_transitions = self.get_transitions_models(issue_key)
            valid_ids = [t.id for t in valid_transitions]

            # Convert string IDs to integers for proper comparison if
            # normalized_transition_id is an integer. isdecimal() is used
            # because it only accepts characters int() can parse.
            if isinstance(normalized_transition_id, int):
                valid_ids = [
                    int(id_val)
                    if isinstance(id_val, str) and id_val.isdecimal()
                    else id_val
                    for id_val in valid_ids
                ]

            # Check if the normalized_transition_id is in the list of valid IDs
            if normalized_transition_id not in valid_ids:
                available_transitions = ", ".join(
                    f"{t.id} ({t.name})" for t in valid_transitions
                )
                logger.warning(
                    f"Transition ID {normalized_transition_id} not in available transitions: {available_transitions}"
                )
                # Continue anyway as Jira will validate

            # Find the target status name corresponding to the transition ID
            target_status_name = None
            for transition in valid_transitions:
                if str(transition.id) == str(normalized_transition_id):
                    if transition.to_status and transition.to_status.name:
                        target_status_name = transition.to_status.name
                        break

            # Sanitize fields if provided; an empty result is sent as None
            fields_for_api = None
            if fields:
                sanitized_fields = self._sanitize_transition_fields(fields)
                if sanitized_fields:
                    fields_for_api = sanitized_fields

            # Prepare update data for comments if provided
            update_for_api = None
            if comment:
                # The helper writes into a scratch dict; only "update" is needed
                temp_transition_data: dict[str, Any] = {}
                self._add_comment_to_transition_data(temp_transition_data, comment)
                update_for_api = temp_transition_data.get("update")

            # Log the transition request for debugging
            logger.info(
                f"Transitioning issue {issue_key} with transition ID {normalized_transition_id}"
            )
            logger.debug(f"Fields: {fields_for_api}, Update: {update_for_api}")

            # Attempt to transition the issue using the appropriate method
            if target_status_name:
                # If we have a status name, use set_issue_status
                logger.info(f"Using status name '{target_status_name}' for transition")
                self.jira.set_issue_status(
                    issue_key=issue_key,
                    status_name=target_status_name,
                    fields=fields_for_api,
                    update=update_for_api,
                )
            else:
                # If no status name is found, try direct transition ID method
                logger.info(f"Using direct transition ID {normalized_transition_id}")
                # Convert to integer if it's a string that looks like an integer
                if (
                    isinstance(normalized_transition_id, str)
                    and normalized_transition_id.isdecimal()
                ):
                    normalized_transition_id = int(normalized_transition_id)

                # Use set_issue_status_by_transition_id for direct ID transition
                self.jira.set_issue_status_by_transition_id(
                    issue_key=issue_key, transition_id=normalized_transition_id
                )

                # Apply fields and comments separately, since the direct
                # transition call above is not given them
                if fields_for_api or update_for_api:
                    payload = {}
                    if fields_for_api:
                        payload["fields"] = fields_for_api
                    if update_for_api:
                        payload["update"] = update_for_api

                    if payload:
                        base_url = self.jira.resource_url("issue")
                        url = f"{base_url}/{issue_key}"
                        self.jira.put(url, data=payload)

            # Return the updated issue
            return self.get_issue(issue_key)
        except MCPAtlassianAuthenticationError:
            # Already the documented auth error (raised by a helper call);
            # let it through instead of re-wrapping it as ValueError below.
            raise
        except HTTPError as http_err:
            if http_err.response is not None and http_err.response.status_code in [
                401,
                403,
            ]:
                error_msg = (
                    f"Authentication failed for Jira API ({http_err.response.status_code}). "
                    "Token may be expired or invalid. Please verify credentials."
                )
                logger.error(error_msg)
                raise MCPAtlassianAuthenticationError(error_msg) from http_err
            else:
                # Other HTTP failures are propagated unchanged
                logger.error(f"HTTP error during API call: {http_err}", exc_info=False)
                raise
        except ValueError as e:
            logger.error(f"Value error transitioning issue {issue_key}: {str(e)}")
            raise
        except Exception as e:
            error_msg = (
                f"Error transitioning issue {issue_key} with transition ID "
                f"{transition_id}: {str(e)}"
            )
            logger.error(error_msg)
            raise ValueError(error_msg) from e

    def _normalize_transition_id(self, transition_id: str | int | dict) -> str | int:
        """
        Normalize the transition ID to a common format.

        Args:
            transition_id: The transition ID, which can be a string, int, or dict

        Returns:
            The normalized transition ID as an integer when possible, or string otherwise
        """
        logger.debug(
            f"Normalizing transition_id: {transition_id}, type: {type(transition_id)}"
        )

        # Handle empty or None values
        if transition_id is None:
            logger.warning("Received None for transition_id, using default 0")
            return 0

        # Handle integer directly (preferred by the API)
        if isinstance(transition_id, int):
            return transition_id

        # Handle string by converting to integer if it's numeric
        if isinstance(transition_id, str):
            if transition_id.isdigit():
                return int(transition_id)
            else:
                # For non-numeric strings, keep as string for backward compatibility
                return transition_id

        # Handle dictionary case
        if isinstance(transition_id, dict):
            logger.warning(
                f"Received dict for transition_id when string expected: {transition_id}"
            )

            # Try to extract ID from standard formats
            for key in ["id", "ID", "transitionId", "transition_id"]:
                if key in transition_id and transition_id[key] is not None:
                    value = transition_id[key]
                    if isinstance(value, str | int):
                        logger.warning(f"Using {key}={value} as transition ID")
                        # Try to convert to int if possible
                        if isinstance(value, int):
                            return value
                        elif isinstance(value, str) and value.isdigit():
                            return int(value)
                        else:
                            return str(value)

            # If no standard key found, try to use any string or int value
            for key, value in transition_id.items():
                if value is not None and isinstance(value, str | int):
                    logger.warning(f"Using {key}={value} as transition ID from dict")
                    # Try to convert to int if possible
                    if isinstance(value, int):
                        return value
                    elif isinstance(value, str) and value.isdigit():
                        return int(value)
                    else:
                        return str(value)

            # Last resort: try to use the first value
            try:
                first_value = next(iter(transition_id.values()))
                if first_value is not None:
                    # Try to convert to int if possible
                    if isinstance(first_value, int):
                        return first_value
                    elif isinstance(first_value, str) and str(first_value).isdigit():
                        return int(first_value)
                    else:
                        return str(first_value)
            except (StopIteration, AttributeError):
                pass

            # Nothing worked, return a default
            logger.error(f"Could not extract valid transition ID from: {transition_id}")
            return 0

        # For any other type, convert to string with warning
        logger.warning(
            f"Unexpected type for transition_id: {type(transition_id)}, trying conversion"
        )
        try:
            str_value = str(transition_id)
            if str_value.isdigit():
                return int(str_value)
            else:
                return str_value
        except Exception as e:
            logger.error(f"Failed to convert transition_id: {str(e)}")
            return 0

    def _sanitize_transition_fields(self, fields: dict[str, Any]) -> dict[str, Any]:
        """
        Sanitize fields to ensure they're valid for the Jira API.

        Args:
            fields: Dictionary of fields to sanitize

        Returns:
            Dictionary of sanitized fields
        """
        sanitized_fields: dict[str, Any] = {}
        for key, value in fields.items():
            # Skip None values
            if value is None:
                continue

            # Handle special case for assignee
            if key == "assignee" and isinstance(value, str):
                try:
                    # Check if _get_account_id is available (from UsersMixin)
                    account_id = self._get_account_id(value)
                    sanitized_fields[key] = {"accountId": account_id}
                except Exception as e:  # noqa: BLE001 - Intentional fallback with logging
                    error_msg = f"Could not resolve assignee '{value}': {str(e)}"
                    logger.warning(error_msg)
                    # Skip this field
                    continue
            else:
                sanitized_fields[key] = value

        return sanitized_fields

    def _add_comment_to_transition_data(
        self, transition_data: dict[str, Any], comment: str | int
    ) -> None:
        """
        Add comment to transition data.

        Args:
            transition_data: The transition data dictionary to update
            comment: The comment to add
        """
        # Ensure comment is a string
        if not isinstance(comment, str):
            logger.warning(
                f"Comment must be a string, converting from {type(comment)}: {comment}"
            )
            comment_str = str(comment)
        else:
            comment_str = comment

        # Convert markdown to Jira format if _markdown_to_jira is available
        jira_formatted_comment = comment_str
        if hasattr(self, "_markdown_to_jira"):
            jira_formatted_comment = self._markdown_to_jira(comment_str)

        # Add to transition data
        transition_data["update"] = {
            "comment": [{"add": {"body": jira_formatted_comment}}]
        }

```

--------------------------------------------------------------------------------
/tests/fixtures/confluence_mocks.py:
--------------------------------------------------------------------------------

```python
# Mock CQL search response: one page result plus paging metadata
# ("start", "limit", "size", "totalSize") and the originating "cqlQuery".
MOCK_CQL_SEARCH_RESPONSE = {
    "results": [
        {
            "content": {
                "id": "123456789",
                "type": "page",
                "status": "current",
                "title": "2024-01-01: Team Progress Meeting 01",
                "childTypes": {},
                "macroRenderedOutput": {},
                "restrictions": {},
                "_expandable": {
                    "container": "",
                    "metadata": "",
                    "extensions": "",
                    "operations": "",
                    "children": "",
                    "history": "/rest/api/content/123456789/history",
                    "ancestors": "",
                    "body": "",
                    "version": "",
                    "descendants": "",
                    "space": "/rest/api/space/TEAM",
                },
                "_links": {
                    "webui": "/spaces/TEAM/pages/123456789/2024-01-01+Team+Progress+Meeting+01",
                    "self": "https://example.atlassian.net/wiki/rest/api/content/123456789",
                    "tinyui": "/x/ABC123",
                },
            },
            "title": "2024-01-01: Team Progress Meeting 01",
            "excerpt": "📅 Date\n2024-01-01\n👥 Participants\nJohn Smith\nJane Doe\nBob Wilson\n!-@123456",
            "url": "/spaces/TEAM/pages/123456789/2024-01-01+Team+Progress+Meeting+01",
            "resultGlobalContainer": {
                "title": "Team Space",
                "displayUrl": "/spaces/TEAM",
            },
            "breadcrumbs": [],
            "entityType": "content",
            "iconCssClass": "aui-icon content-type-page",
            "lastModified": "2024-01-01T08:00:00.000Z",
            "friendlyLastModified": "Jan 01, 2024",
            "score": 0.0,
        }
    ],
    "start": 0,
    "limit": 50,
    "size": 1,
    "totalSize": 1,
    "cqlQuery": "parent = 123456789",
    "searchDuration": 156,
    "_links": {
        "base": "https://example.atlassian.net/wiki",
        "context": "/wiki",
        "self": "https://example.atlassian.net/wiki/rest/api/search?cql=parent%3D123456789",
    },
}

# Mock single-page response with its space, version (including the author),
# two attachment children, and a storage-format body.
MOCK_PAGE_RESPONSE = {
    "id": "987654321",
    "type": "page",
    "status": "current",
    "title": "Example Meeting Notes",
    "space": {
        "id": 11111111,
        "key": "PROJ",
        "alias": "PROJ",
        "name": "Project Space",
        "type": "global",
        "status": "current",
        "_expandable": {
            "settings": "/rest/api/space/PROJ/settings",
            "metadata": "",
            "operations": "",
            "lookAndFeel": "/rest/api/settings/lookandfeel?spaceKey=PROJ",
            "identifiers": "",
            "permissions": "",
            "roles": "",
            "icon": "",
            "description": "",
            "theme": "/rest/api/space/PROJ/theme",
            "history": "",
            "homepage": "/rest/api/content/11111111",
        },
        "_links": {
            "webui": "/spaces/PROJ",
            "self": "https://example.atlassian.net/wiki/rest/api/space/PROJ",
        },
    },
    "version": {
        "by": {
            "type": "known",
            "accountId": "user123",
            "accountType": "atlassian",
            "email": "",
            "publicName": "Example User (Unlicensed)",
            "profilePicture": {
                "path": "/wiki/aa-avatar/user123",
                "width": 48,
                "height": 48,
                "isDefault": False,
            },
            "displayName": "Example User (Unlicensed)",
            "isExternalCollaborator": False,
            "isGuest": False,
            "locale": "en_US",
            "accountStatus": "active",
            "_expandable": {"operations": "", "personalSpace": ""},
            "_links": {
                "self": "https://example.atlassian.net/wiki/rest/api/user?accountId=user123"
            },
        },
        "when": "2024-01-01T09:00:00.000Z",
        "friendlyWhen": "Jan 01, 2024",
        "message": "",
        "number": 1,
        "minorEdit": False,
        "ncsStepVersion": "1234",
        "ncsStepVersionSource": "ncs-ack",
        "confRev": "confluence$content$987654321.1",
        "contentTypeModified": False,
        "_expandable": {"collaborators": "", "content": "/rest/api/content/987654321"},
        "_links": {
            "self": "https://example.atlassian.net/wiki/rest/api/content/987654321/version/1"
        },
    },
    "children": {
        "attachment": {
            "results": [
                {
                    "id": "att105348",
                    "type": "attachment",
                    "status": "current",
                    "title": "random_geometric_image.svg",
                    "extensions": {"mediaType": "application/binary", "fileSize": 1098},
                },
                {
                    "id": "att9535345",
                    "type": "attachment",
                    "status": "current",
                    "title": "stockmaster-architecture.svg",
                    "extensions": {"mediaType": "application/binary", "fileSize": 6186},
                },
            ]
        }
    },
    "body": {
        "storage": {
            # Adjacent string literals below are concatenated into one value.
            "value": '<h2><ac:emoticon ac:name="blue-star" />&nbsp;Date</h2><p><time datetime="2024-01-01" /></p><h2><ac:emoticon ac:name="blue-star" />&nbsp;Participants</h2><ul><li><p><ac:link><ri:user ri:account-id="user123" /></ac:link></p></li></ul><h2><ac:emoticon ac:name="blue-star" />&nbsp;Goals</h2><ul><li><p>Example goal</p></li></ul>'
            '<p><ac:structured-macro ac:name="profile">'
            '<ac:parameter ac:name="user">'
            '<ri:user ri:account-id="user123" />'
            "</ac:parameter>"
            "</ac:structured-macro></p>",
            "representation": "storage",
            "embeddedContent": [],
            "_expandable": {"content": "/rest/api/content/987654321"},
        },
        "_expandable": {
            "editor": "",
            "atlas_doc_format": "",
            "view": "",
            "export_view": "",
            "styled_view": "",
            "dynamic": "",
            "editor2": "",
            "anonymous_export_view": "",
        },
    },
    "macroRenderedOutput": {},
    "extensions": {"position": 123456789},
    "_expandable": {
        "childTypes": "",
        "container": "/rest/api/space/PROJ",
        "schedulePublishInfo": "",
        "metadata": "",
        "operations": "",
        "schedulePublishDate": "",
        "children": "/rest/api/content/987654321/child",
        "restrictions": "/rest/api/content/987654321/restriction/byOperation",
        "history": "/rest/api/content/987654321/history",
        "ancestors": "",
        "descendants": "/rest/api/content/987654321/descendant",
    },
    "_links": {
        "editui": "/pages/resumedraft.action?draftId=987654321",
        "webui": "/spaces/PROJ/pages/987654321/Example+Meeting+Notes",
        "edituiv2": "/spaces/PROJ/pages/edit-v2/987654321",
        "context": "/wiki",
        "self": "https://example.atlassian.net/wiki/rest/api/content/987654321",
        "tinyui": "/x/AbCdE",
        "collection": "/rest/api/content",
        "base": "https://example.atlassian.net/wiki",
    },
}


# Mock comments listing holding one comment whose body is provided in the
# "view" representation and whose extensions mark it as "inline".
MOCK_COMMENTS_RESPONSE = {
    "results": [
        {
            "id": "456789123",
            "type": "comment",
            "status": "current",
            "title": "Re: Technical Design Document",
            "version": {
                "by": {
                    "type": "known",
                    "accountId": "712020:abc123-def456-ghi789",
                    "accountType": "atlassian",
                    "email": "",
                    "publicName": "John Doe",
                    "profilePicture": {
                        "path": "/wiki/aa-avatar/712020:abc123-def456-ghi789",
                        "width": 48,
                        "height": 48,
                        "isDefault": False,
                    },
                    "displayName": "John Doe",
                    "isExternalCollaborator": False,
                    "isGuest": False,
                    "locale": "en_US",
                    "accountStatus": "active",
                    "_expandable": {"operations": "", "personalSpace": ""},
                    "_links": {
                        "self": "https://company.atlassian.net/wiki/rest/api/user?accountId=712020:abc123-def456-ghi789"
                    },
                },
                "when": "2024-01-01T10:00:00.000Z",
                "friendlyWhen": "Jan 1, 2024",
                "message": "",
                "number": 1,
                "minorEdit": False,
                "contentTypeModified": False,
                "_expandable": {
                    "collaborators": "",
                    "content": "/rest/api/content/456789123",
                },
                "_links": {
                    "self": "https://company.atlassian.net/wiki/rest/api/content/456789123/version/1"
                },
            },
            "macroRenderedOutput": {},
            "body": {
                "view": {
                    "value": "<p>Comment content here</p>",
                    "representation": "view",
                    "_expandable": {
                        "webresource": "",
                        "embeddedContent": "",
                        "mediaToken": "",
                        "content": "/rest/api/content/456789123",
                    },
                },
                "_expandable": {
                    "editor": "",
                    "atlas_doc_format": "",
                    "export_view": "",
                    "styled_view": "",
                    "dynamic": "",
                    "storage": "",
                    "editor2": "",
                    "anonymous_export_view": "",
                },
            },
            "extensions": {
                "location": "inline",
                "_expandable": {"inlineProperties": "", "resolution": ""},
            },
            "_expandable": {
                "childTypes": "",
                "container": "/rest/api/content/123456789",
                "schedulePublishInfo": "",
                "metadata": "",
                "operations": "",
                "schedulePublishDate": "",
                "children": "/rest/api/content/456789123/child",
                "restrictions": "/rest/api/content/456789123/restriction/byOperation",
                "history": "/rest/api/content/456789123/history",
                "ancestors": "",
                "descendants": "/rest/api/content/456789123/descendant",
                "space": "/rest/api/space/TEAM",
            },
            "_links": {
                "webui": "/spaces/TEAM/pages/123456789/Technical+Design+Document?focusedCommentId=456789123",
                "self": "https://company.atlassian.net/wiki/rest/api/content/456789123",
            },
        }
    ]
}

# Mock labels listing with three entries of decreasing completeness: the
# first has "prefix", "name" and "label"; the second omits "label"; the
# third has only "id" and "name".
MOCK_LABELS_RESPONSE = {
    "results": [
        {
            "id": "456789123",
            "prefix": "global",
            "name": "meeting-notes",
            "label": "meeting-notes",
        },
        {
            "id": "456789124",
            "prefix": "my",
            "name": "important",
        },
        {
            "id": "456789125",
            "name": "test",
        },
    ],
    "start": 0,
    "limit": 200,
    "size": 3,
    "_links": {
        "self": "https://company.atlassian.net/wiki/rest/api/content/456789123",
        "base": "https://company.atlassian.net",
        "context": "",
    },
}

# Mock spaces listing with two entries (an archived personal space and a
# current global space) and a "next" link in the paging "_links".
MOCK_SPACES_RESPONSE = {
    "results": [
        {
            "id": 11111111,
            "key": "~user1",
            "alias": "~user1",
            "name": "User One",
            "type": "personal",
            "status": "archived",
            "_expandable": {
                "settings": "/rest/api/space/~user1/settings",
                "metadata": "",
                "operations": "",
                "lookAndFeel": "/rest/api/settings/lookandfeel?spaceKey=~user1",
                "identifiers": "",
                "permissions": "",
                "roles": "",
                "icon": "",
                "description": "",
                "theme": "/rest/api/space/~user1/theme",
                "history": "",
                "homepage": "/rest/api/content/11111112",
            },
            "_links": {
                "webui": "/spaces/~user1",
                "self": "https://example.atlassian.net/wiki/rest/api/space/~user1",
            },
        },
        {
            "id": 22222222,
            "key": "PROJ",
            "alias": "PROJ",
            "name": "Project Space",
            "type": "global",
            "status": "current",
            "_expandable": {
                "settings": "/rest/api/space/PROJ/settings",
                "metadata": "",
                "operations": "",
                "lookAndFeel": "/rest/api/settings/lookandfeel?spaceKey=PROJ",
                "identifiers": "",
                "permissions": "",
                "roles": "",
                "icon": "",
                "description": "",
                "theme": "/rest/api/space/PROJ/theme",
                "history": "",
                "homepage": "/rest/api/content/22222223",
            },
            "_links": {
                "webui": "/spaces/PROJ",
                "self": "https://example.atlassian.net/wiki/rest/api/space/PROJ",
            },
        },
    ],
    "start": 0,
    "limit": 5,
    "size": 2,
    "_links": {
        "base": "https://example.atlassian.net/wiki",
        "context": "/wiki",
        "next": "/rest/api/space?next=true&limit=5&start=5",
        "self": "https://example.atlassian.net/wiki/rest/api/space",
    },
}

# Mock list of two pages. The first carries a top-level "_expandable" map
# and no inline "space"; the second embeds a minimal "space" object and has
# no top-level "_expandable".
MOCK_PAGES_FROM_SPACE_RESPONSE = [
    {
        "id": "123456789",
        "type": "page",
        "status": "current",
        "title": "Sample Research Paper Title",
        "macroRenderedOutput": {},
        "body": {
            "storage": {
                "value": "<h3>[Date]</h3><p><em>Result</em>: Sample Result</p><p><em>Reviews</em>: </p><p>Sample content with various formatting</p>",
                "representation": "storage",
                "embeddedContent": [],
                "_expandable": {"content": "/rest/api/content/123456789"},
            },
            "_expandable": {
                "editor": "",
                "atlas_doc_format": "",
                "view": "",
                "export_view": "",
                "styled_view": "",
                "dynamic": "",
                "editor2": "",
                "anonymous_export_view": "",
            },
        },
        "extensions": {"position": 123456789},
        "_expandable": {
            "container": "/rest/api/space/PROJ",
            "metadata": "",
            "restrictions": "/rest/api/content/123456789/restriction/byOperation",
            "history": "/rest/api/content/123456789/history",
            "version": "",
            "descendants": "/rest/api/content/123456789/descendant",
            "space": "/rest/api/space/PROJ",
            "childTypes": "",
            "schedulePublishInfo": "",
            "operations": "",
            "schedulePublishDate": "",
            "children": "/rest/api/content/123456789/child",
            "ancestors": "",
        },
        "_links": {
            "self": "https://example.atlassian.net/wiki/rest/api/content/123456789",
            "tinyui": "/x/ABC123",
            "editui": "/pages/resumedraft.action?draftId=123456789",
            "webui": "/spaces/PROJ/pages/123456789/Sample+Research+Paper+Title",
            "edituiv2": "/spaces/PROJ/pages/edit-v2/123456789",
        },
    },
    {
        "id": "987654321",
        "type": "page",
        "status": "current",
        "title": "Example Meeting Notes",
        "space": {
            "key": "PROJ",
            "name": "Project Space",
            "type": "global",
            "status": "current",
        },
        "macroRenderedOutput": {},
        "body": {
            "storage": {
                "value": "<h2>Meeting Agenda</h2><p>Example content</p>",
                "representation": "storage",
                "embeddedContent": [],
                "_expandable": {"content": "/rest/api/content/987654321"},
            },
            "_expandable": {
                "editor": "",
                "atlas_doc_format": "",
                "view": "",
                "export_view": "",
                "styled_view": "",
                "dynamic": "",
                "editor2": "",
                "anonymous_export_view": "",
            },
        },
        "extensions": {"position": 987654321},
        "_links": {
            "webui": "/spaces/PROJ/pages/987654321/Example+Meeting+Notes",
            "self": "https://example.atlassian.net/wiki/rest/api/content/987654321",
        },
    },
]

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/models/jira/common.py:
--------------------------------------------------------------------------------

```python
"""
Common Jira entity models.

This module provides Pydantic models for common Jira entities like users, statuses,
issue types, priorities, attachments, and time tracking.
"""

import logging
from datetime import datetime
from typing import Any

from pydantic import Field

from mcp_atlassian.utils import parse_date

from ..base import ApiModel, TimestampMixin
from ..constants import (
    EMPTY_STRING,
    JIRA_DEFAULT_ID,
    NONE_VALUE,
    UNASSIGNED,
    UNKNOWN,
)

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class JiraUser(ApiModel):
    """
    Jira user as described by a user object in a Jira API payload.
    """

    account_id: str | None = None
    display_name: str = UNASSIGNED
    email: str | None = None
    active: bool = True
    avatar_url: str | None = None
    time_zone: str | None = None

    @classmethod
    def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraUser":
        """
        Build a JiraUser from raw user data returned by the Jira API.

        Args:
            data: The user data from the Jira API
            **kwargs: Accepted for signature compatibility; not used here

        Returns:
            A populated JiraUser, or a default instance when ``data`` is
            empty or not a dictionary
        """
        if not data:
            return cls()
        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        # Only the 48x48 avatar entry is read from the avatar map
        avatar_url = None
        avatars = data.get("avatarUrls")
        if avatars:
            if not isinstance(avatars, dict):
                logger.debug(f"Unexpected avatar data format: {type(avatars)}")
            else:
                avatar_url = avatars.get("48x48")

        params = {
            "account_id": data.get("accountId"),
            "display_name": str(data.get("displayName", UNASSIGNED)),
            "email": data.get("emailAddress"),
            "active": bool(data.get("active", True)),
            "avatar_url": avatar_url,
            "time_zone": data.get("timeZone"),
        }
        return cls(**params)

    def to_simplified_dict(self) -> dict[str, Any]:
        """Return a compact dictionary view of the user."""
        shown_name = self.display_name
        return {
            "display_name": shown_name,
            # "name" repeats the display name for backward compatibility
            "name": shown_name,
            "email": self.email,
            "avatar_url": self.avatar_url,
        }


class JiraStatusCategory(ApiModel):
    """
    Category that a Jira status belongs to.
    """

    id: int = 0
    key: str = EMPTY_STRING
    name: str = UNKNOWN
    color_name: str = EMPTY_STRING

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "JiraStatusCategory":
        """
        Build a JiraStatusCategory from raw status-category data.

        Args:
            data: The status category data from the Jira API
            **kwargs: Accepted for signature compatibility; not used here

        Returns:
            A populated JiraStatusCategory, or a default instance when
            ``data`` is empty or not a dictionary
        """
        if not data:
            return cls()
        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        # Coerce the id to an integer; anything missing or unparsable becomes 0
        raw_id = data.get("id", 0)
        if raw_id is None:
            numeric_id = 0
        else:
            try:
                numeric_id = int(raw_id)
            except (ValueError, TypeError):
                numeric_id = 0

        text_fields = {
            "key": str(data.get("key", EMPTY_STRING)),
            "name": str(data.get("name", UNKNOWN)),
            "color_name": str(data.get("colorName", EMPTY_STRING)),
        }
        return cls(id=numeric_id, **text_fields)


class JiraStatus(ApiModel):
    """
    Model representing a Jira issue status.
    """

    id: str = JIRA_DEFAULT_ID
    name: str = UNKNOWN
    description: str | None = None
    icon_url: str | None = None
    category: JiraStatusCategory | None = None

    @classmethod
    def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraStatus":
        """
        Create a JiraStatus from a Jira API response.

        Args:
            data: The status data from the Jira API
            **kwargs: Accepted for signature compatibility; not used here

        Returns:
            A JiraStatus instance. A default instance is returned when
            ``data`` is empty or not a dictionary.
        """
        if not data:
            return cls()

        # Handle non-dictionary data by returning a default instance
        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        category = None
        category_data = data.get("statusCategory")
        if category_data:
            category = JiraStatusCategory.from_api_response(category_data)

        # Ensure ID is a string (API sometimes returns integers). A missing or
        # explicitly null id falls back to the default instead of handing
        # None to the `str`-typed field.
        raw_id = data.get("id")
        status_id = str(JIRA_DEFAULT_ID if raw_id is None else raw_id)

        return cls(
            id=status_id,
            name=str(data.get("name", UNKNOWN)),
            description=data.get("description"),
            icon_url=data.get("iconUrl"),
            category=category,
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        result = {
            "name": self.name,
        }

        # Category details are included only when a category is set
        if self.category:
            result["category"] = self.category.name
            result["color"] = self.category.color_name

        return result


class JiraIssueType(ApiModel):
    """
    Model representing a Jira issue type.
    """

    id: str = JIRA_DEFAULT_ID
    name: str = UNKNOWN
    description: str | None = None
    icon_url: str | None = None

    @classmethod
    def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraIssueType":
        """
        Create a JiraIssueType from a Jira API response.

        Args:
            data: The issue type data from the Jira API
            **kwargs: Accepted for signature compatibility; not used here

        Returns:
            A JiraIssueType instance. A default instance is returned when
            ``data`` is empty or not a dictionary.
        """
        if not data:
            return cls()

        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        # Always store the id as a string. A missing or explicitly null id
        # falls back to the default instead of handing None to the
        # `str`-typed field.
        raw_id = data.get("id")
        issue_type_id = str(JIRA_DEFAULT_ID if raw_id is None else raw_id)

        return cls(
            id=issue_type_id,
            name=str(data.get("name", UNKNOWN)),
            description=data.get("description"),
            icon_url=data.get("iconUrl"),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        return {"name": self.name}


class JiraPriority(ApiModel):
    """
    Model representing a Jira priority.
    """

    id: str = JIRA_DEFAULT_ID
    name: str = NONE_VALUE
    description: str | None = None
    icon_url: str | None = None

    @classmethod
    def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraPriority":
        """
        Create a JiraPriority from a Jira API response.

        Args:
            data: The priority data from the Jira API
            **kwargs: Accepted for signature compatibility; not used here

        Returns:
            A JiraPriority instance. A default instance is returned when
            ``data`` is empty or not a dictionary.
        """
        if not data:
            return cls()

        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        # Always store the id as a string. A missing or explicitly null id
        # falls back to the default instead of handing None to the
        # `str`-typed field.
        raw_id = data.get("id")
        priority_id = str(JIRA_DEFAULT_ID if raw_id is None else raw_id)

        return cls(
            id=priority_id,
            name=str(data.get("name", NONE_VALUE)),
            description=data.get("description"),
            icon_url=data.get("iconUrl"),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        return {"name": self.name}


class JiraAttachment(ApiModel):
    """
    Model representing a Jira issue attachment.

    This model contains information about files attached to Jira issues,
    including the filename, size, content type, and download URL.
    """

    id: str = JIRA_DEFAULT_ID
    filename: str = EMPTY_STRING
    size: int = 0
    content_type: str | None = None
    created: str = EMPTY_STRING
    author: JiraUser | None = None
    url: str | None = None
    thumbnail_url: str | None = None

    @classmethod
    def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraAttachment":
        """
        Create a JiraAttachment from a Jira API response.

        Args:
            data: The attachment data from the Jira API
            **kwargs: Accepted for signature compatibility; not used here

        Returns:
            A JiraAttachment instance. A default instance is returned when
            ``data`` is empty or not a dictionary.
        """
        if not data:
            return cls()

        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        author = None
        author_data = data.get("author")
        if author_data:
            author = JiraUser.from_api_response(author_data)

        # Always store the id as a string. A missing or explicitly null id
        # falls back to the default instead of handing None to the
        # `str`-typed field.
        raw_id = data.get("id")
        attachment_id = str(JIRA_DEFAULT_ID if raw_id is None else raw_id)

        # Size must be an integer; missing, null or unparsable values become 0
        size = data.get("size", 0)
        try:
            size = int(size) if size is not None else 0
        except (ValueError, TypeError):
            size = 0

        return cls(
            id=attachment_id,
            filename=str(data.get("filename", EMPTY_STRING)),
            size=size,
            content_type=data.get("mimeType"),
            created=str(data.get("created", EMPTY_STRING)),
            author=author,
            url=data.get("content"),  # This is actually the download URL
            thumbnail_url=data.get("thumbnail"),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        result = {
            "filename": self.filename,
            "size": self.size,
            "url": self.url,
        }

        # Optional details are added only when they hold a truthy value
        if self.content_type:
            result["content_type"] = self.content_type

        if self.author:
            result["author"] = self.author.to_simplified_dict()

        if self.thumbnail_url:
            result["thumbnail_url"] = self.thumbnail_url

        if self.created:
            result["created"] = self.created

        return result


class JiraTimetracking(ApiModel):
    """
    Time-tracking figures for a Jira issue.

    Holds the original estimate, remaining estimate and time spent, each as
    a display string and as a number of seconds.
    """

    original_estimate: str | None = None
    remaining_estimate: str | None = None
    time_spent: str | None = None
    original_estimate_seconds: int | None = None
    remaining_estimate_seconds: int | None = None
    time_spent_seconds: int | None = None

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "JiraTimetracking":
        """
        Build a JiraTimetracking from raw timetracking data.

        Args:
            data: The timetracking data from the Jira API
            **kwargs: Additional arguments (not used)

        Returns:
            A populated JiraTimetracking, or a default instance when ``data``
            is empty or not a dictionary
        """
        if not data or not isinstance(data, dict):
            return cls()

        # Model attribute -> key in the API payload
        key_map = {
            "original_estimate": "originalEstimate",
            "remaining_estimate": "remainingEstimate",
            "time_spent": "timeSpent",
            "original_estimate_seconds": "originalEstimateSeconds",
            "remaining_estimate_seconds": "remainingEstimateSeconds",
            "time_spent_seconds": "timeSpentSeconds",
        }
        return cls(**{attr: data.get(api_key) for attr, api_key in key_map.items()})

    def to_simplified_dict(self) -> dict[str, str | int | None]:
        """Return the display-string estimates that are set, keyed by name."""
        candidates = (
            ("original_estimate", self.original_estimate),
            ("remaining_estimate", self.remaining_estimate),
            ("time_spent", self.time_spent),
        )
        # Falsy entries (None or empty string) are left out
        return {label: text for label, text in candidates if text}


class JiraResolution(ApiModel):
    """Model representing a Jira issue resolution."""

    id: str = JIRA_DEFAULT_ID
    name: str = UNKNOWN
    description: str | None = None

    @classmethod
    def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraResolution":
        """
        Create a JiraResolution from a Jira API response.

        Args:
            data: The resolution data from the Jira API
            **kwargs: Accepted for signature compatibility; not used here

        Returns:
            A JiraResolution instance. A default instance is returned when
            ``data`` is not a dictionary.
        """
        if not isinstance(data, dict):
            return cls()

        # A null id must fall back to the default: stringifying None would
        # store the literal "None", which to_simplified_dict would then emit
        # as if it were a real id.
        raw_id = data.get("id")
        resolution_id = str(JIRA_DEFAULT_ID if raw_id is None else raw_id)

        # Coerce the name to a string, as the other models in this module do,
        # and fall back to the default when it is missing or null.
        raw_name = data.get("name")
        resolution_name = str(UNKNOWN if raw_name is None else raw_name)

        return cls(
            id=resolution_id,
            name=resolution_name,
            description=data.get("description"),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        result = {"name": self.name}
        # The id is included only when it differs from the default placeholder
        if self.id != JIRA_DEFAULT_ID:
            result["id"] = self.id
        return result


class JiraChangelogItem(ApiModel):
    """
    Model representing a single change item within a changelog entry.
    Each change item represents a field that was modified, including
    its previous and new values.
    """

    field: str = EMPTY_STRING
    fieldtype: str = EMPTY_STRING
    from_string: str | None = None
    to_string: str | None = None
    from_id: str | None = None
    to_id: str | None = None

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "JiraChangelogItem":
        """
        Create a JiraChangelogItem from a Jira API response.
        Args:
            data: The change item data from the Jira API
            **kwargs: Additional arguments (not used)
        Returns:
            A JiraChangelogItem instance; a default instance when ``data`` is
            empty or not a dictionary
        """
        if not data or not isinstance(data, dict):
            return cls()

        # ``dict.get(key, default)`` only uses the default for a missing key;
        # an explicit null must not be stringified into the text "None".
        field_name = data.get("field")
        field_type = data.get("fieldtype")

        return cls(
            field=EMPTY_STRING if field_name is None else str(field_name),
            fieldtype=EMPTY_STRING if field_type is None else str(field_type),
            from_string=data.get("fromString"),
            to_string=data.get("toString"),
            from_id=data.get("from"),
            to_id=data.get("to"),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response.

        Returns:
            A dictionary that always contains ``field`` and ``fieldtype`` and
            adds ``from_string``, ``to_string``, ``from_id`` and ``to_id``
            (in that order) only when they are not ``None``
        """
        result: dict[str, Any] = {
            "field": self.field,
            "fieldtype": self.fieldtype,
        }

        optional_values = (
            ("from_string", self.from_string),
            ("to_string", self.to_string),
            ("from_id", self.from_id),
            ("to_id", self.to_id),
        )
        # ``is not None`` (not truthiness) so empty strings are still reported.
        result.update(
            {key: value for key, value in optional_values if value is not None}
        )

        return result


class JiraChangelog(ApiModel, TimestampMixin):
    """
    Model representing a Jira issue changelog entry.
    A changelog entry represents a set of changes made to an issue at a specific time,
    including who made the changes and what was changed.
    """

    id: str = JIRA_DEFAULT_ID
    author: JiraUser | None = None
    created: datetime | None = None
    items: list[JiraChangelogItem] = Field(default_factory=list)

    @classmethod
    def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraChangelog":
        """
        Create a JiraChangelog from a Jira API response.
        Args:
            data: The changelog data from the Jira API
            **kwargs: Additional arguments (not used)
        Returns:
            A JiraChangelog instance; a default instance when ``data`` is
            empty or not a dictionary
        """
        if not data:
            return cls()

        # Handle non-dictionary data by returning a default instance
        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        # Extract author data
        author = None
        author_data = data.get("author")
        if author_data:
            author = JiraUser.from_api_response(author_data)

        # Ensure ID is a string. A null id falls back to the default because
        # the ``id`` field is declared as a non-optional ``str``; passing
        # ``None`` through would not satisfy that declaration.
        raw_id = data.get("id")
        changelog_id = JIRA_DEFAULT_ID if raw_id is None else str(raw_id)

        # Process change items; anything other than a list yields no items
        items_data = data.get("items", [])
        items = (
            [JiraChangelogItem.from_api_response(item_data) for item_data in items_data]
            if isinstance(items_data, list)
            else []
        )

        # Process created date
        created: datetime | None = None
        created_data = data.get("created")
        if created_data:
            created = parse_date(created_data)

        return cls(
            id=changelog_id,
            author=author,
            created=created,
            items=items,
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response.

        Returns:
            A dictionary with ``items``, ``author`` and ``created`` (as a
            string), each present only when set. The entry ``id`` is not
            included.
        """
        result: dict[str, Any] = {}

        if self.items:
            result["items"] = [item.to_simplified_dict() for item in self.items]

        if self.author:
            result["author"] = self.author.to_simplified_dict()

        if self.created:
            result["created"] = str(self.created)

        return result

```
Page 4/10 · First · Prev · Next · Last