#
tokens: 38069/50000 3/194 files (page 9/10)
lines: off (toggle) GitHub
raw markdown copy
This is page 9 of 10. Use http://codebase.md/sooperset/mcp-atlassian?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .devcontainer
│   ├── devcontainer.json
│   ├── Dockerfile
│   ├── post-create.sh
│   └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   └── feature_request.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── docker-publish.yml
│       ├── lint.yml
│       ├── publish.yml
│       ├── stale.yml
│       └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│   ├── oauth_authorize.py
│   └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│   └── mcp_atlassian
│       ├── __init__.py
│       ├── confluence
│       │   ├── __init__.py
│       │   ├── client.py
│       │   ├── comments.py
│       │   ├── config.py
│       │   ├── constants.py
│       │   ├── labels.py
│       │   ├── pages.py
│       │   ├── search.py
│       │   ├── spaces.py
│       │   ├── users.py
│       │   ├── utils.py
│       │   └── v2_adapter.py
│       ├── exceptions.py
│       ├── jira
│       │   ├── __init__.py
│       │   ├── attachments.py
│       │   ├── boards.py
│       │   ├── client.py
│       │   ├── comments.py
│       │   ├── config.py
│       │   ├── constants.py
│       │   ├── epics.py
│       │   ├── fields.py
│       │   ├── formatting.py
│       │   ├── issues.py
│       │   ├── links.py
│       │   ├── projects.py
│       │   ├── protocols.py
│       │   ├── search.py
│       │   ├── sprints.py
│       │   ├── transitions.py
│       │   ├── users.py
│       │   └── worklog.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── confluence
│       │   │   ├── __init__.py
│       │   │   ├── comment.py
│       │   │   ├── common.py
│       │   │   ├── label.py
│       │   │   ├── page.py
│       │   │   ├── search.py
│       │   │   ├── space.py
│       │   │   └── user_search.py
│       │   ├── constants.py
│       │   └── jira
│       │       ├── __init__.py
│       │       ├── agile.py
│       │       ├── comment.py
│       │       ├── common.py
│       │       ├── issue.py
│       │       ├── link.py
│       │       ├── project.py
│       │       ├── search.py
│       │       ├── version.py
│       │       ├── workflow.py
│       │       └── worklog.py
│       ├── preprocessing
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── confluence.py
│       │   └── jira.py
│       ├── servers
│       │   ├── __init__.py
│       │   ├── confluence.py
│       │   ├── context.py
│       │   ├── dependencies.py
│       │   ├── jira.py
│       │   └── main.py
│       └── utils
│           ├── __init__.py
│           ├── date.py
│           ├── decorators.py
│           ├── env.py
│           ├── environment.py
│           ├── io.py
│           ├── lifecycle.py
│           ├── logging.py
│           ├── oauth_setup.py
│           ├── oauth.py
│           ├── ssl.py
│           ├── tools.py
│           └── urls.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── fixtures
│   │   ├── __init__.py
│   │   ├── confluence_mocks.py
│   │   └── jira_mocks.py
│   ├── integration
│   │   ├── conftest.py
│   │   ├── README.md
│   │   ├── test_authentication.py
│   │   ├── test_content_processing.py
│   │   ├── test_cross_service.py
│   │   ├── test_mcp_protocol.py
│   │   ├── test_proxy.py
│   │   ├── test_real_api.py
│   │   ├── test_ssl_verification.py
│   │   ├── test_stdin_monitoring_fix.py
│   │   └── test_transport_lifecycle.py
│   ├── README.md
│   ├── test_preprocessing.py
│   ├── test_real_api_validation.py
│   ├── unit
│   │   ├── confluence
│   │   │   ├── __init__.py
│   │   │   ├── conftest.py
│   │   │   ├── test_client_oauth.py
│   │   │   ├── test_client.py
│   │   │   ├── test_comments.py
│   │   │   ├── test_config.py
│   │   │   ├── test_constants.py
│   │   │   ├── test_custom_headers.py
│   │   │   ├── test_labels.py
│   │   │   ├── test_pages.py
│   │   │   ├── test_search.py
│   │   │   ├── test_spaces.py
│   │   │   ├── test_users.py
│   │   │   ├── test_utils.py
│   │   │   └── test_v2_adapter.py
│   │   ├── jira
│   │   │   ├── conftest.py
│   │   │   ├── test_attachments.py
│   │   │   ├── test_boards.py
│   │   │   ├── test_client_oauth.py
│   │   │   ├── test_client.py
│   │   │   ├── test_comments.py
│   │   │   ├── test_config.py
│   │   │   ├── test_constants.py
│   │   │   ├── test_custom_headers.py
│   │   │   ├── test_epics.py
│   │   │   ├── test_fields.py
│   │   │   ├── test_formatting.py
│   │   │   ├── test_issues_markdown.py
│   │   │   ├── test_issues.py
│   │   │   ├── test_links.py
│   │   │   ├── test_projects.py
│   │   │   ├── test_protocols.py
│   │   │   ├── test_search.py
│   │   │   ├── test_sprints.py
│   │   │   ├── test_transitions.py
│   │   │   ├── test_users.py
│   │   │   └── test_worklog.py
│   │   ├── models
│   │   │   ├── __init__.py
│   │   │   ├── conftest.py
│   │   │   ├── test_base_models.py
│   │   │   ├── test_confluence_models.py
│   │   │   ├── test_constants.py
│   │   │   └── test_jira_models.py
│   │   ├── servers
│   │   │   ├── __init__.py
│   │   │   ├── test_confluence_server.py
│   │   │   ├── test_context.py
│   │   │   ├── test_dependencies.py
│   │   │   ├── test_jira_server.py
│   │   │   └── test_main_server.py
│   │   ├── test_exceptions.py
│   │   ├── test_main_transport_selection.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── test_custom_headers.py
│   │       ├── test_date.py
│   │       ├── test_decorators.py
│   │       ├── test_env.py
│   │       ├── test_environment.py
│   │       ├── test_io.py
│   │       ├── test_lifecycle.py
│   │       ├── test_logging.py
│   │       ├── test_masking.py
│   │       ├── test_oauth_setup.py
│   │       ├── test_oauth.py
│   │       ├── test_ssl.py
│   │       ├── test_tools.py
│   │       └── test_urls.py
│   └── utils
│       ├── __init__.py
│       ├── assertions.py
│       ├── base.py
│       ├── factories.py
│       └── mocks.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/tests/test_real_api_validation.py:
--------------------------------------------------------------------------------

```python
"""
Test file for validating the refactored FastMCP tools with real API data.

This test file connects to real Jira and Confluence instances to validate
that our model refactoring works correctly with actual API data.

These tests will be skipped if the required environment variables are not set
or if the --use-real-data flag is not passed to pytest.

To run these tests:
    pytest tests/test_real_api_validation.py --use-real-data

Required environment variables:
    - JIRA_URL, JIRA_USERNAME, JIRA_API_TOKEN
    - CONFLUENCE_URL, CONFLUENCE_USERNAME, CONFLUENCE_API_TOKEN
    - JIRA_TEST_ISSUE_KEY, JIRA_TEST_EPIC_KEY
    - CONFLUENCE_TEST_PAGE_ID, JIRA_TEST_PROJECT_KEY, CONFLUENCE_TEST_SPACE_KEY, CONFLUENCE_TEST_SPACE_KEY
"""

import datetime
import json
import os
import uuid
from collections.abc import Callable, Generator, Sequence

import pytest
from fastmcp import Client
from fastmcp.client import FastMCPTransport
from mcp.types import TextContent

from mcp_atlassian.confluence import ConfluenceFetcher
from mcp_atlassian.confluence.comments import CommentsMixin as ConfluenceCommentsMixin
from mcp_atlassian.confluence.config import ConfluenceConfig
from mcp_atlassian.confluence.labels import LabelsMixin as ConfluenceLabelsMixin
from mcp_atlassian.confluence.pages import PagesMixin
from mcp_atlassian.confluence.search import SearchMixin as ConfluenceSearchMixin
from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.config import JiraConfig
from mcp_atlassian.jira.links import LinksMixin
from mcp_atlassian.models.confluence import (
    ConfluenceComment,
    ConfluenceLabel,
    ConfluencePage,
)
from mcp_atlassian.models.jira import JiraIssueLinkType
from mcp_atlassian.servers import main_mcp


# Resource tracking for cleanup
class ResourceTracker:
    """Tracks resources created during tests for cleanup."""

    def __init__(self):
        self.jira_issues: list[str] = []
        self.confluence_pages: list[str] = []
        self.confluence_comments: list[str] = []
        self.jira_comments: list[str] = []

    def add_jira_issue(self, issue_key: str) -> None:
        """Track a Jira issue for later cleanup."""
        self.jira_issues.append(issue_key)

    def add_confluence_page(self, page_id: str) -> None:
        """Track a Confluence page for later cleanup."""
        self.confluence_pages.append(page_id)

    def add_confluence_comment(self, comment_id: str) -> None:
        """Track a Confluence comment for later cleanup."""
        self.confluence_comments.append(comment_id)

    def add_jira_comment(self, issue_key: str, comment_id: str) -> None:
        """Track a Jira comment for later cleanup."""
        self.jira_comments.append((issue_key, comment_id))

    def cleanup(
        self,
        jira_client: JiraFetcher | None = None,
        confluence_client: ConfluenceFetcher | None = None,
    ) -> None:
        """Clean up all tracked resources."""
        if jira_client:
            for issue_key, comment_id in self.jira_comments:
                try:
                    jira_client.delete_comment(issue_key, comment_id)
                    print(f"Deleted Jira comment {comment_id} from issue {issue_key}")
                except Exception as e:
                    print(f"Failed to delete Jira comment {comment_id}: {e}")

            for issue_key in self.jira_issues:
                try:
                    jira_client.delete_issue(issue_key)
                    print(f"Deleted Jira issue {issue_key}")
                except Exception as e:
                    print(f"Failed to delete Jira issue {issue_key}: {e}")

        if confluence_client:
            for comment_id in self.confluence_comments:
                try:
                    confluence_client.delete_comment(comment_id)
                    print(f"Deleted Confluence comment {comment_id}")
                except Exception as e:
                    print(f"Failed to delete Confluence comment {comment_id}: {e}")

            for page_id in self.confluence_pages:
                try:
                    confluence_client.delete_page(page_id)
                    print(f"Deleted Confluence page {page_id}")
                except Exception as e:
                    print(f"Failed to delete Confluence page {page_id}: {e}")


@pytest.fixture
def jira_config() -> JiraConfig:
    """Create a JiraConfig from environment variables."""
    return JiraConfig.from_env()


@pytest.fixture
def confluence_config() -> ConfluenceConfig:
    """Create a ConfluenceConfig from environment variables."""
    return ConfluenceConfig.from_env()


@pytest.fixture
def jira_client(jira_config: JiraConfig) -> JiraFetcher:
    """Create a JiraFetcher instance."""
    return JiraFetcher(config=jira_config)


@pytest.fixture
def confluence_client(confluence_config: ConfluenceConfig) -> ConfluenceFetcher:
    """Create a ConfluenceFetcher instance."""
    return ConfluenceFetcher(config=confluence_config)


@pytest.fixture
def test_issue_key() -> str:
    """Get test issue key from environment."""
    issue_key = os.environ.get("JIRA_TEST_ISSUE_KEY")
    if not issue_key:
        pytest.skip("JIRA_TEST_ISSUE_KEY environment variable not set")
    return issue_key


@pytest.fixture
def test_epic_key() -> str:
    """Get test epic key from environment."""
    epic_key = os.environ.get("JIRA_TEST_EPIC_KEY")
    if not epic_key:
        pytest.skip("JIRA_TEST_EPIC_KEY environment variable not set")
    return epic_key


@pytest.fixture
def test_page_id() -> str:
    """Get test Confluence page ID from environment."""
    page_id = os.environ.get("CONFLUENCE_TEST_PAGE_ID")
    if not page_id:
        pytest.skip("CONFLUENCE_TEST_PAGE_ID environment variable not set")
    return page_id


@pytest.fixture
def test_project_key() -> str:
    """Get test Jira project key from environment."""
    project_key = os.environ.get("JIRA_TEST_PROJECT_KEY")
    if not project_key:
        pytest.skip("JIRA_TEST_PROJECT_KEY environment variable not set")
    return project_key


@pytest.fixture
def test_space_key() -> str:
    """Get test Confluence space key from environment."""
    space_key = os.environ.get("CONFLUENCE_TEST_SPACE_KEY")
    if not space_key:
        pytest.skip("CONFLUENCE_TEST_SPACE_KEY environment variable not set")
    return space_key


@pytest.fixture
def test_board_id() -> str:
    """Get test Jira board ID from environment."""
    board_id = os.environ.get("JIRA_TEST_BOARD_ID")
    if not board_id:
        pytest.skip("JIRA_TEST_BOARD_ID environment variable not set")
    return board_id


@pytest.fixture
def resource_tracker() -> Generator[ResourceTracker, None, None]:
    """Create and yield a ResourceTracker that will be used to clean up after tests."""
    tracker = ResourceTracker()
    yield tracker


@pytest.fixture
def cleanup_resources(
    resource_tracker: ResourceTracker,
    jira_client: JiraFetcher,
    confluence_client: ConfluenceFetcher,
) -> Callable[[], None]:
    """Return a function that can be called to clean up resources."""

    def _cleanup():
        resource_tracker.cleanup(
            jira_client=jira_client, confluence_client=confluence_client
        )

    return _cleanup


# Only use asyncio backend for anyio tests
pytestmark = pytest.mark.anyio(backends=["asyncio"])


@pytest.fixture(scope="class")
async def api_validation_client():
    """Provides a FastMCP client connected to the main server for tool calls."""
    transport = FastMCPTransport(main_mcp)
    client = Client(transport=transport)
    async with client as connected_client:
        yield connected_client


async def call_tool(
    client: Client, tool_name: str, arguments: dict
) -> list[TextContent]:
    """Helper function to call tools via the client."""
    return await client.call_tool(tool_name, arguments)


class TestRealJiraValidation:
    """
    Test class for validating Jira models with real API data.

    These tests will be skipped if:
    1. The --use-real-data flag is not passed to pytest
    2. The required Jira environment variables are not set
    """

    @pytest.mark.anyio
    async def test_get_issue(self, use_real_jira_data, api_validation_client):
        """Test that get_issue returns a proper JiraIssue model."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")

        issue_key = os.environ.get("JIRA_TEST_ISSUE_KEY", "TES-143")

        result_content = await call_tool(
            api_validation_client, "jira_get_issue", {"issue_key": issue_key}
        )

        assert result_content and isinstance(result_content[0], TextContent)
        issue_data = json.loads(result_content[0].text)

        assert isinstance(issue_data, dict)
        assert issue_data.get("key") == issue_key
        assert "id" in issue_data
        assert "summary" in issue_data

    @pytest.mark.anyio
    async def test_search_issues(self, use_real_jira_data, api_validation_client):
        """Test that search_issues returns JiraIssue models."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")

        jql = 'project = "TES" ORDER BY created DESC'
        result_content = await call_tool(
            api_validation_client, "jira_search", {"jql": jql, "limit": 5}
        )

        assert result_content and isinstance(result_content[0], TextContent)
        search_data = json.loads(result_content[0].text)

        assert isinstance(search_data, dict)
        assert "issues" in search_data
        assert isinstance(search_data["issues"], list)
        assert len(search_data["issues"]) > 0

        for issue_dict in search_data["issues"]:
            assert isinstance(issue_dict, dict)
            assert "key" in issue_dict
            assert "id" in issue_dict
            assert "summary" in issue_dict

    @pytest.mark.anyio
    async def test_get_issue_comments(self, use_real_jira_data, api_validation_client):
        """Test that issue comments are properly converted to JiraComment models."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")

        issue_key = os.environ.get("JIRA_TEST_ISSUE_KEY", "TES-143")

        result_content = await call_tool(
            api_validation_client,
            "jira_get_issue",
            {"issue_key": issue_key, "fields": "comment", "comment_limit": 5},
        )
        issue_data = json.loads(result_content[0].text)

        for comment in issue_data["comments"]:
            assert isinstance(comment, dict)
            assert "body" in comment or "author" in comment


class TestRealConfluenceValidation:
    """
    Test class for validating Confluence models with real API data.

    These tests will be skipped if:
    1. The --use-real-data flag is not passed to pytest
    2. The required Confluence environment variables are not set
    """

    def test_get_page_content(self, use_real_confluence_data, test_page_id):
        """Test that get_page_content returns a proper ConfluencePage model."""
        if not use_real_confluence_data:
            pytest.skip("Real Confluence data testing is disabled")

        config = ConfluenceConfig.from_env()
        pages_client = PagesMixin(config=config)

        page = pages_client.get_page_content(test_page_id)

        assert isinstance(page, ConfluencePage)
        assert page.id == test_page_id
        assert page.title is not None
        assert page.content is not None

        assert page.space is not None
        assert page.space.key is not None

        assert page.content_format in ["storage", "view", "markdown"]

    def test_get_page_comments(self, use_real_confluence_data, test_page_id):
        """Test that page comments are properly converted to ConfluenceComment models."""
        if not use_real_confluence_data:
            pytest.skip("Real Confluence data testing is disabled")

        config = ConfluenceConfig.from_env()
        comments_client = ConfluenceCommentsMixin(config=config)

        comments = comments_client.get_page_comments(test_page_id)

        if len(comments) == 0:
            pytest.skip("Test page has no comments")

        for comment in comments:
            assert isinstance(comment, ConfluenceComment)
            assert comment.id is not None
            assert comment.body is not None

    def test_get_page_labels(self, use_real_confluence_data, test_page_id):
        """Test that page labels are properly converted to ConfluenceLabel models."""
        if not use_real_confluence_data:
            pytest.skip("Real Confluence data testing is disabled")

        config = ConfluenceConfig.from_env()
        labels_client = ConfluenceLabelsMixin(config=config)

        labels = labels_client.get_page_labels(test_page_id)

        if len(labels) == 0:
            pytest.skip("Test page has no labels")

        for label in labels:
            assert isinstance(label, ConfluenceLabel)
            assert label.id is not None
            assert label.name is not None

    def test_search_content(self, use_real_confluence_data):
        """Test that search returns ConfluencePage models."""
        if not use_real_confluence_data:
            pytest.skip("Real Confluence data testing is disabled")

        config = ConfluenceConfig.from_env()
        search_client = ConfluenceSearchMixin(config=config)

        cql = 'type = "page" ORDER BY created DESC'
        results = search_client.search(cql, limit=5)

        assert len(results) > 0
        for page in results:
            assert isinstance(page, ConfluencePage)
            assert page.id is not None
            assert page.title is not None


@pytest.mark.anyio
async def test_jira_get_issue(jira_client: JiraFetcher, test_issue_key: str) -> None:
    """Test retrieving an issue from Jira."""
    issue = jira_client.get_issue(test_issue_key)

    assert issue is not None
    assert issue.key == test_issue_key
    assert hasattr(issue, "fields") or hasattr(issue, "summary")


@pytest.mark.anyio
async def test_jira_get_issue_with_fields(
    jira_client: JiraFetcher, test_issue_key: str
) -> None:
    """Test retrieving a Jira issue with specific fields."""
    full_issue = jira_client.get_issue(test_issue_key)
    assert full_issue is not None

    limited_issue = jira_client.get_issue(
        test_issue_key, fields="summary,description,customfield_*"
    )

    assert limited_issue is not None
    assert limited_issue.key == test_issue_key
    assert limited_issue.summary is not None

    full_data = full_issue.to_simplified_dict()
    limited_data = limited_issue.to_simplified_dict()

    assert "key" in full_data and "key" in limited_data
    assert "id" in full_data and "id" in limited_data
    assert "summary" in full_data and "summary" in limited_data

    assert "description" in limited_data

    custom_fields_found = False
    for field in limited_data:
        if field.startswith("customfield_"):
            custom_fields_found = True
            break

    if "assignee" in full_data and "assignee" not in limited_data:
        assert "assignee" not in limited_data
    if "status" in full_data and "status" not in limited_data:
        assert "status" not in limited_data

    list_fields_issue = jira_client.get_issue(
        test_issue_key, fields=["summary", "status"]
    )

    assert list_fields_issue is not None
    list_data = list_fields_issue.to_simplified_dict()
    assert "summary" in list_data
    if "status" in full_data:
        assert "status" in list_data


@pytest.mark.anyio
async def test_jira_get_epic_issues(
    jira_client: JiraFetcher, test_epic_key: str
) -> None:
    """Test retrieving issues linked to an epic from Jira."""
    issues = jira_client.get_epic_issues(test_epic_key)

    assert isinstance(issues, list)

    if issues:
        for issue in issues:
            assert hasattr(issue, "key")
            assert hasattr(issue, "id")


@pytest.mark.anyio
async def test_confluence_get_page_content(
    confluence_client: ConfluenceFetcher, test_page_id: str
) -> None:
    """Test retrieving a page from Confluence."""
    page = confluence_client.get_page_content(test_page_id)

    assert page is not None
    assert page.id == test_page_id
    assert page.title is not None


@pytest.mark.anyio
async def test_jira_create_issue(
    jira_client: JiraFetcher,
    test_project_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test creating an issue in Jira."""
    test_id = str(uuid.uuid4())[:8]
    summary = f"Test Issue (API Validation) {test_id}"
    description = "This is a test issue created by the API validation tests. It should be automatically deleted."

    try:
        issue = jira_client.create_issue(
            project_key=test_project_key,
            summary=summary,
            description=description,
            issue_type="Task",
        )

        resource_tracker.add_jira_issue(issue.key)

        assert issue is not None
        assert issue.key.startswith(test_project_key)
        assert issue.summary == summary

        retrieved_issue = jira_client.get_issue(issue.key)
        assert retrieved_issue is not None
        assert retrieved_issue.key == issue.key
        assert retrieved_issue.summary == summary
    finally:
        cleanup_resources()


@pytest.mark.anyio
async def test_jira_create_subtask(
    jira_client: JiraFetcher,
    test_project_key: str,
    test_issue_key: str,
    test_epic_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test creating a subtask in Jira linked to a specified parent and epic."""
    test_id = str(uuid.uuid4())[:8]
    subtask_summary = f"Subtask Test Issue {test_id}"

    try:
        parent_issue_key = test_issue_key

        subtask_issue = jira_client.create_issue(
            project_key=test_project_key,
            summary=subtask_summary,
            description=f"This is a test subtask linked to parent {parent_issue_key} and epic {test_epic_key}",
            issue_type="Subtask",
            parent=parent_issue_key,
            epic_link=test_epic_key,
        )

        resource_tracker.add_jira_issue(subtask_issue.key)

        assert subtask_issue is not None
        assert subtask_issue.key.startswith(test_project_key)
        assert subtask_issue.summary == subtask_summary

        retrieved_subtask = jira_client.get_issue(subtask_issue.key)
        assert retrieved_subtask is not None
        assert retrieved_subtask.key == subtask_issue.key

        if hasattr(retrieved_subtask, "fields"):
            if hasattr(retrieved_subtask.fields, "parent"):
                assert retrieved_subtask.fields.parent.key == parent_issue_key

            field_ids = jira_client.get_field_ids_to_epic()
            epic_link_field = field_ids.get("epic_link") or field_ids.get("Epic Link")

            if epic_link_field and hasattr(retrieved_subtask.fields, epic_link_field):
                epic_key = getattr(retrieved_subtask.fields, epic_link_field)
                assert epic_key == test_epic_key

        print(
            f"\nCreated subtask {subtask_issue.key} under parent {parent_issue_key} and epic {test_epic_key}"
        )

    finally:
        cleanup_resources()


@pytest.mark.anyio
async def test_jira_create_task_with_parent(
    jira_client: JiraFetcher,
    test_project_key: str,
    test_epic_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test creating a task in Jira with a parent issue (non-subtask)."""
    test_id = str(uuid.uuid4())[:8]
    task_summary = f"Task with Parent Test {test_id}"

    try:
        parent_issue_key = test_epic_key

        try:
            task_issue = jira_client.create_issue(
                project_key=test_project_key,
                summary=task_summary,
                description=f"This is a test task linked to parent {parent_issue_key}",
                issue_type="Task",
                parent=parent_issue_key,
            )

            resource_tracker.add_jira_issue(task_issue.key)

            assert task_issue is not None
            assert task_issue.key.startswith(test_project_key)
            assert task_issue.summary == task_summary

            retrieved_task = jira_client.get_issue(task_issue.key)
            assert retrieved_task is not None
            assert retrieved_task.key == task_issue.key

            if hasattr(retrieved_task, "fields"):
                if hasattr(retrieved_task.fields, "parent"):
                    assert retrieved_task.fields.parent.key == parent_issue_key

            print(f"\nCreated task {task_issue.key} with parent {parent_issue_key}")

        except Exception as e:
            if "hierarchy" in str(e).lower():
                pytest.skip(
                    f"Parent-child relationship not allowed by Jira configuration: {str(e)}"
                )
            else:
                raise

    finally:
        cleanup_resources()


@pytest.mark.anyio
async def test_jira_create_epic(
    jira_client: JiraFetcher,
    test_project_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """
    Test creating an Epic issue in Jira.

    This test verifies that the create_issue method can handle Epic creation
    properly for any Jira instance, regardless of the specific custom field
    configuration used for Epic Name or other Epic-specific fields.
    """
    # Generate unique identifiers for this test
    test_id = str(uuid.uuid4())[:8]
    epic_summary = f"Test Epic {test_id}"

    try:
        epic_issue = jira_client.create_issue(
            project_key=test_project_key,
            summary=epic_summary,
            description="This is a test epic for validating Epic creation functionality.",
            issue_type="Epic",
        )

        resource_tracker.add_jira_issue(epic_issue.key)

        assert epic_issue is not None
        assert epic_issue.key.startswith(test_project_key)
        assert epic_issue.summary == epic_summary

        print(f"\nTEST PASSED: Successfully created Epic issue {epic_issue.key}")

        retrieved_epic = jira_client.get_issue(epic_issue.key)
        assert retrieved_epic is not None
        assert retrieved_epic.key == epic_issue.key
        assert retrieved_epic.summary == epic_summary

    except Exception as e:
        print(f"\nERROR creating Epic: {str(e)}")

        try:
            print("\n=== Jira Field Information for Debugging ===")
            field_ids = jira_client.get_field_ids_to_epic()
            print(f"Available field IDs: {field_ids}")
        except Exception as error:
            print(f"Error retrieving field IDs: {str(error)}")

        raise
    finally:
        cleanup_resources()


@pytest.mark.anyio
async def test_jira_add_comment(
    jira_client: JiraFetcher,
    test_issue_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test adding a comment to a Jira issue."""
    # Generate a unique comment text
    test_id = str(uuid.uuid4())[:8]
    comment_text = f"Test comment from API validation tests {test_id}. This should be automatically deleted."

    try:
        comment = jira_client.add_comment(
            issue_key=test_issue_key, comment=comment_text
        )

        if hasattr(comment, "id"):
            resource_tracker.add_jira_comment(test_issue_key, comment.id)
        elif isinstance(comment, dict) and "id" in comment:
            resource_tracker.add_jira_comment(test_issue_key, comment["id"])

        assert comment is not None

        if hasattr(comment, "body"):
            actual_text = comment.body
        elif hasattr(comment, "content"):
            actual_text = comment.content
        elif isinstance(comment, dict) and "body" in comment:
            actual_text = comment["body"]
        else:
            actual_text = str(comment)

        assert comment_text in actual_text
    finally:
        cleanup_resources()


@pytest.mark.anyio
async def test_confluence_create_page(
    confluence_client: ConfluenceFetcher,
    test_space_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test creating a page in Confluence."""
    # Generate a unique title
    test_id = str(uuid.uuid4())[:8]
    title = f"Test Page (API Validation) {test_id}"

    timestamp = datetime.datetime.now(tz=datetime.timezone.utc).isoformat()
    content = f"""
    <h1>Test Page</h1>
    <p>This is a test page created by the API validation tests at {timestamp}.</p>
    <p>It should be automatically deleted after the test.</p>
    """

    try:
        try:
            page = confluence_client.create_page(
                space_key=test_space_key, title=title, body=content
            )
        except Exception as e:
            if "permission" in str(e).lower():
                pytest.skip(f"No permission to create pages in space {test_space_key}")
                return
            elif "space" in str(e).lower() and (
                "not found" in str(e).lower() or "doesn't exist" in str(e).lower()
            ):
                pytest.skip(
                    f"Space {test_space_key} not found. Skipping page creation test."
                )
                return
            else:
                raise

        page_id = page.id
        resource_tracker.add_confluence_page(page_id)

        assert page is not None
        assert page.title == title

        retrieved_page = confluence_client.get_page_content(page_id)
        assert retrieved_page is not None
    finally:
        cleanup_resources()


@pytest.mark.anyio
async def test_confluence_update_page(
    confluence_client: ConfluenceFetcher,
    resource_tracker: ResourceTracker,
    test_space_key: str,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test updating a page in Confluence and validate TextContent structure.

    This test has two purposes:
    1. Test the basic page update functionality
    2. Validate the TextContent class requires the 'type' field to prevent issue #97
    """
    test_id = str(uuid.uuid4())[:8]
    title = f"Update Test Page {test_id}"
    content = f"<p>Initial content {test_id}</p>"

    try:
        page = confluence_client.create_page(
            space_key=test_space_key, title=title, body=content
        )

        page_id = page.id
        resource_tracker.add_confluence_page(page_id)

        now = datetime.datetime.now(tz=datetime.timezone.utc).isoformat()
        updated_content = f"<p>Updated content {test_id} at {now}</p>"

        updated_page = confluence_client.update_page(
            page_id=page_id, title=title, body=updated_content
        )

        assert updated_page is not None

        # ======= TextContent Validation (prevents issue #97) =======
        # Import TextContent class to test directly
        from mcp.types import TextContent

        print("Testing TextContent validation to prevent issue #97")

        try:
            _ = TextContent(text="This should fail without type field")
            raise AssertionError(
                "TextContent creation without 'type' field should fail but didn't"
            )
        except Exception as e:
            print(f"Correctly got error: {str(e)}")
            assert "type" in str(e), "Error should mention missing 'type' field"

        valid_content = TextContent(
            type="text", text="This should work with type field"
        )
        assert valid_content.type == "text", "TextContent should have type='text'"
        assert valid_content.text == "This should work with type field", (
            "TextContent text should match"
        )

        print("TextContent validation succeeded - 'type' field is properly required")

    finally:
        cleanup_resources()


@pytest.mark.anyio
async def test_confluence_add_page_label(
    confluence_client: ConfluenceFetcher,
    resource_tracker: ResourceTracker,
    test_space_key: str,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test adding a label to a page in Confluence"""
    test_id = str(uuid.uuid4())[:8]
    title = f"Update Test Page {test_id}"
    content = f"<p>Initial content {test_id}</p>"

    try:
        page = confluence_client.create_page(
            space_key=test_space_key, title=title, body=content
        )

        page_id = page.id
        resource_tracker.add_confluence_page(page_id)

        name = "test"
        updated_labels = confluence_client.add_page_label(page_id=page_id, name=name)

        assert updated_labels is not None
    finally:
        cleanup_resources()


@pytest.mark.skip(reason="This test modifies data - use with caution")
@pytest.mark.anyio
async def test_jira_transition_issue(
    jira_client: JiraFetcher,
    resource_tracker: ResourceTracker,
    test_project_key: str,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test transitioning an issue in Jira."""
    test_id = str(uuid.uuid4())[:8]
    summary = f"Transition Test Issue {test_id}"

    try:
        issue = jira_client.create_issue(
            project_key=test_project_key,
            summary=summary,
            description="Test issue for transition testing",
            issue_type="Task",
        )

        resource_tracker.add_jira_issue(issue.key)

        transitions = jira_client.get_transitions(issue.key)
        assert transitions is not None
        assert len(transitions) > 0

        transition_id = None
        for transition in transitions:
            if hasattr(transition, "name") and "progress" in transition.name.lower():
                transition_id = transition.id
                break

        if not transition_id and transitions:
            transition_id = (
                transitions[0].id
                if hasattr(transitions[0], "id")
                else transitions[0]["id"]
            )

        assert transition_id is not None

        transition_result = jira_client.transition_issue(
            issue_key=issue.key, transition_id=transition_id
        )

        if transition_result is not None:
            assert transition_result, (
                "Transition should return a truthy value if successful"
            )

        updated_issue = jira_client.get_issue(issue.key)

        if hasattr(updated_issue, "status") and hasattr(updated_issue.status, "name"):
            status_name = updated_issue.status.name
        else:
            status_name = updated_issue["fields"]["status"]["name"]

        assert "to do" not in status_name.lower()
    finally:
        cleanup_resources()


@pytest.mark.anyio
async def test_jira_create_epic_with_custom_fields(
    jira_client: JiraFetcher,
    test_project_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """
    Test creating an Epic issue in Jira with custom Epic fields.

    This test verifies that the create_issue method can handle Epic creation
    with explicit Epic Name and Epic Color values, properly detecting the
    correct custom fields regardless of the Jira configuration.
    """
    test_id = str(uuid.uuid4())[:8]
    epic_summary = f"Test Epic {test_id}"
    custom_epic_name = f"Custom Epic Name {test_id}"

    try:
        field_ids = jira_client.get_field_ids_to_epic()
        jira_client._field_ids_cache = None
        print(f"Discovered field IDs: {field_ids}")

        epic_issue = jira_client.create_issue(
            project_key=test_project_key,
            summary=epic_summary,
            description="This is a test epic with custom values.",
            issue_type="Epic",
            epic_name=custom_epic_name,
            epic_color="blue",
        )
        jira_client._field_ids_cache = None

        resource_tracker.add_jira_issue(epic_issue.key)
        jira_client._field_ids_cache = None

        assert epic_issue is not None
        assert epic_issue.key.startswith(test_project_key)
        assert epic_issue.summary == epic_summary

        print(f"\nTEST PASSED: Successfully created Epic issue {epic_issue.key}")

        retrieved_epic = jira_client.get_issue(epic_issue.key)
        jira_client._field_ids_cache = None

        has_epic_name = False
        if hasattr(retrieved_epic, "epic_name") and retrieved_epic.epic_name:
            assert retrieved_epic.epic_name == custom_epic_name
            has_epic_name = True
            print(
                f"Verified Epic Name via epic_name property: {retrieved_epic.epic_name}"
            )

        if not has_epic_name:
            print("Could not verify Epic Name directly, checking raw fields...")
            if hasattr(jira_client, "jira"):
                raw_issue = jira_client.jira.issue(epic_issue.key)
                fields = raw_issue.get("fields", {})

                for field_id, value in fields.items():
                    if field_id.startswith("customfield_") and isinstance(value, str):
                        if value == custom_epic_name:
                            print(
                                f"Found Epic Name in custom field {field_id}: {value}"
                            )
                            has_epic_name = True
                            break

        if not has_epic_name:
            print("WARNING: Could not verify Epic Name was set correctly")

    except Exception as e:
        print(f"\nERROR creating Epic with custom fields: {str(e)}")

        try:
            print("\n=== Jira Field Information for Debugging ===")
            field_ids = jira_client.get_field_ids_to_epic()
            print(f"Available field IDs: {field_ids}")
        except Exception as error:
            print(f"Error retrieving field IDs: {str(error)}")

        raise
    finally:
        cleanup_resources()


@pytest.mark.anyio
async def test_jira_create_epic_two_step(
    jira_client: JiraFetcher,
    test_project_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """
    Test the two-step Epic creation process.

    This test verifies that the create_issue method can successfully create an Epic
    using the two-step approach (create basic issue first, then update Epic fields)
    to work around screen configuration issues.
    """
    test_id = str(uuid.uuid4())[:8]
    epic_summary = f"Two-Step Epic {test_id}"
    epic_name = f"Epic Name {test_id}"

    try:
        field_ids = jira_client.get_field_ids_to_epic()
        jira_client._field_ids_cache = None
        print(f"\nAvailable field IDs for Epic creation: {field_ids}")

        print("\nAttempting to create Epic using two-step process...")
        epic_issue = jira_client.create_issue(
            project_key=test_project_key,
            summary=epic_summary,
            description="This is a test epic using the two-step creation process.",
            issue_type="Epic",
            epic_name=epic_name,  # This should be stored for post-creation update
            epic_color="blue",  # This should be stored for post-creation update
        )
        jira_client._field_ids_cache = None

        resource_tracker.add_jira_issue(epic_issue.key)
        jira_client._field_ids_cache = None

        assert epic_issue is not None
        assert epic_issue.key.startswith(test_project_key)
        assert epic_issue.summary == epic_summary

        print(f"\nSuccessfully created Epic: {epic_issue.key}")

        retrieved_epic = jira_client.get_issue(epic_issue.key)
        jira_client._field_ids_cache = None
        print(f"\nRetrieved Epic: {retrieved_epic.key}")

        print(f"Epic name: {retrieved_epic.epic_name}")

        try:
            if hasattr(retrieved_epic, "_raw"):
                raw_data = retrieved_epic._raw
            else:
                raw_data = jira_client.jira.issue(epic_issue.key)

            if "fields" in raw_data:
                for field_id, field_value in raw_data["fields"].items():
                    if "epic" in field_id.lower() or field_id in field_ids.values():
                        print(f"Field {field_id}: {field_value}")
        except Exception as e:
            print(f"Error getting raw Epic data: {str(e)}")

        print("\nTEST PASSED: Successfully completed two-step Epic creation test")

    except Exception as e:
        print(f"\nERROR in two-step Epic creation test: {str(e)}")

        print("\nAvailable field IDs:")
        try:
            field_ids = jira_client.get_field_ids_to_epic()
            for name, field_id in field_ids.items():
                print(f"  {name}: {field_id}")
        except Exception as field_error:
            print(f"Error getting field IDs: {str(field_error)}")

        raise
    finally:
        cleanup_resources()


# Tool Validation Tests (Requires --use-real-data)
# These tests use the server's call_tool handler to test the full flow
@pytest.mark.usefixtures("use_real_jira_data")
class TestRealToolValidation:
    """
    Test class for validating tool calls with real API data.
    """

    @pytest.mark.anyio
    async def test_jira_search_with_start_at(
        self, use_real_jira_data: bool, test_project_key: str
    ) -> None:
        """Test the jira_search tool with the startAt parameter."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")

        jql = f'project = "{test_project_key}" ORDER BY created ASC'
        limit = 1

        args1 = {"jql": jql, "limit": limit, "startAt": 0}
        result1_content: Sequence[TextContent] = await call_tool(
            api_validation_client, "jira_search", args1
        )
        assert result1_content and isinstance(result1_content[0], TextContent)
        results1 = json.loads(result1_content[0].text)

        args2 = {"jql": jql, "limit": limit, "startAt": 1}
        result2_content: Sequence[TextContent] = await call_tool(
            api_validation_client, "jira_search", args2
        )
        assert result2_content and isinstance(result2_content[0], TextContent)
        results2 = json.loads(result2_content[0].text)

        assert isinstance(results1.get("issues"), list)
        assert isinstance(results2.get("issues"), list)

        if len(results1["issues"]) > 0 and len(results2["issues"]) > 0:
            assert results1["issues"][0]["key"] != results2["issues"][0]["key"], (
                f"Expected different issues with startAt=0 and startAt=1, but got {results1['issues'][0]['key']} for both."
                f" Ensure project '{test_project_key}' has at least 2 issues."
            )
        elif len(results1["issues"]) <= 1:
            pytest.skip(
                f"Project {test_project_key} has less than 2 issues, cannot test pagination."
            )

    @pytest.mark.anyio
    async def test_jira_get_project_issues_with_start_at(
        self, use_real_jira_data: bool, test_project_key: str
    ) -> None:
        """Test the jira_get_project_issues tool with the startAt parameter."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")

        limit = 1

        args1 = {"project_key": test_project_key, "limit": limit, "startAt": 0}
        result1_content = list(
            await call_tool(api_validation_client, "jira_get_project_issues", args1)
        )
        assert isinstance(result1_content[0], TextContent)
        results1 = json.loads(result1_content[0].text)

        args2 = {"project_key": test_project_key, "limit": limit, "startAt": 1}
        result2_content = list(
            await call_tool(api_validation_client, "jira_get_project_issues", args2)
        )
        assert isinstance(result2_content[0], TextContent)
        results2 = json.loads(result2_content[0].text)

        assert isinstance(results1, list)
        assert isinstance(results2, list)

        if len(results1) > 0 and len(results2) > 0:
            assert results1[0]["key"] != results2[0]["key"], (
                f"Expected different issues with startAt=0 and startAt=1, but got {results1[0]['key']} for both."
                f" Ensure project '{test_project_key}' has at least 2 issues."
            )
        elif len(results1) <= 1:
            pytest.skip(
                f"Project {test_project_key} has less than 2 issues, cannot test pagination."
            )

    @pytest.mark.anyio
    async def test_jira_get_epic_issues_with_start_at(
        self, use_real_jira_data: bool, test_epic_key: str
    ) -> None:
        """Test the jira_get_epic_issues tool with the startAt parameter."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")

        limit = 1

        args1 = {"epic_key": test_epic_key, "limit": limit, "startAt": 0}
        result1_content: Sequence[TextContent] = await call_tool(
            api_validation_client, "jira_get_epic_issues", args1
        )
        assert result1_content and isinstance(result1_content[0], TextContent)
        results1 = json.loads(result1_content[0].text)

        args2 = {"epic_key": test_epic_key, "limit": limit, "startAt": 1}
        result2_content: Sequence[TextContent] = await call_tool(
            api_validation_client, "jira_get_epic_issues", args2
        )
        assert result2_content and isinstance(result2_content[0], TextContent)
        results2 = json.loads(result2_content[0].text)

        assert isinstance(results1.get("issues"), list)
        assert isinstance(results2.get("issues"), list)

        if len(results1["issues"]) > 0 and len(results2["issues"]) > 0:
            assert results1["issues"][0]["key"] != results2["issues"][0]["key"], (
                f"Expected different issues with startAt=0 and startAt=1, but got {results1['issues'][0]['key']} for both."
                f" Ensure epic '{test_epic_key}' has at least 2 linked issues."
            )
        elif len(results1["issues"]) <= 1:
            pytest.skip(
                f"Epic {test_epic_key} has less than 2 issues, cannot test pagination."
            )

    @pytest.mark.anyio
    async def test_jira_get_issue_includes_comments(
        self, use_real_jira_data: bool, test_issue_key: str
    ) -> None:
        """Test that jira_get_issue includes comments when comment_limit > 0."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")

        result = await call_tool(
            api_validation_client,
            "jira_get_issue",
            {"issue_key": test_issue_key, "comment_limit": 10},
        )

        assert isinstance(result, dict)
        assert "comments" in result
        assert isinstance(result["comments"], list)

        result_without_comments = await call_tool(
            api_validation_client,
            "jira_get_issue",
            {"issue_key": test_issue_key, "comment_limit": 10, "fields": "summary"},
        )

        assert isinstance(result_without_comments, dict)
        assert "comments" not in result_without_comments

    @pytest.mark.anyio
    async def test_jira_get_link_types_tool(
        self, use_real_jira_data: bool, api_validation_client: Client
    ) -> None:
        """Test the jira_get_link_types tool."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")

        try:
            result_content = list(
                await call_tool(api_validation_client, "jira_get_link_types", {})
            )
        except LookupError:
            pytest.skip("Server context not available for call_tool")

        assert isinstance(result_content[0], TextContent)
        link_types = json.loads(result_content[0].text)

        assert isinstance(link_types, list)
        assert len(link_types) > 0

        first_link = link_types[0]
        assert "id" in first_link
        assert "name" in first_link
        assert "inward" in first_link
        assert "outward" in first_link

    @pytest.mark.anyio
    async def test_jira_create_issue_link_tool(
        self, use_real_jira_data: bool, test_project_key: str, api_validation_client
    ) -> None:
        """Test the jira_create_issue_link and jira_remove_issue_link tools."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")

        test_id = str(uuid.uuid4())[:8]

        issue1_args = {
            "project_key": test_project_key,
            "summary": f"Link Test Source {test_id}",
            "description": "Test issue for link testing via tool",
            "issue_type": "Task",
        }
        issue1_content: Sequence[TextContent] = await call_tool(
            api_validation_client, "jira/create_issue", issue1_args
        )
        assert issue1_content and isinstance(issue1_content[0], TextContent)
        issue1_data = json.loads(issue1_content[0].text)
        issue1_key = issue1_data["key"]

        issue2_args = {
            "project_key": test_project_key,
            "summary": f"Link Test Target {test_id}",
            "description": "Test issue for link testing via tool",
            "issue_type": "Task",
        }
        issue2_content: Sequence[TextContent] = await call_tool(
            api_validation_client, "jira/create_issue", issue2_args
        )
        assert issue2_content and isinstance(issue2_content[0], TextContent)
        issue2_data = json.loads(issue2_content[0].text)
        issue2_key = issue2_data["key"]

        try:
            link_types_content: Sequence[TextContent] = await call_tool(
                api_validation_client, "jira/get_link_types", {}
            )
            assert link_types_content and isinstance(link_types_content[0], TextContent)
            link_types = json.loads(link_types_content[0].text)

            link_type_name = None
            for lt in link_types:
                if "relate" in lt["name"].lower():
                    link_type_name = lt["name"]
                    break

            # If no "relates to" type found, use the first available type
            if not link_type_name:
                link_type_name = link_types[0]["name"]

            link_args = {
                "link_type": link_type_name,
                "inward_issue_key": issue1_key,
                "outward_issue_key": issue2_key,
                "comment": f"Test link created by API validation test {test_id}",
            }
            link_content: Sequence[TextContent] = await call_tool(
                api_validation_client, "jira/create_issue_link", link_args
            )

            assert link_content and isinstance(link_content[0], TextContent)
            link_result = json.loads(link_content[0].text)
            assert link_result["success"] is True

            issue_content: Sequence[TextContent] = await call_tool(
                api_validation_client,
                "jira/get_issue",
                {"issue_key": issue1_key, "fields": "issuelinks"},
            )
            assert issue_content and isinstance(issue_content[0], TextContent)
            issue_data = json.loads(issue_content[0].text)

            link_id = None
            if "issuelinks" in issue_data:
                for link in issue_data["issuelinks"]:
                    if link.get("outwardIssue", {}).get("key") == issue2_key:
                        link_id = link.get("id")
                        break

            # If we found a link ID, test removing it
            if link_id:
                remove_args = {"link_id": link_id}
                remove_content: Sequence[TextContent] = await call_tool(
                    api_validation_client, "jira/remove_issue_link", remove_args
                )

                assert remove_content and isinstance(remove_content[0], TextContent)
                remove_result = json.loads(remove_content[0].text)
                assert remove_result["success"] is True
                assert remove_result["link_id"] == link_id

        finally:
            await call_tool(
                api_validation_client, "jira/delete_issue", {"issue_key": issue1_key}
            )
            await call_tool(
                api_validation_client, "jira/delete_issue", {"issue_key": issue2_key}
            )


@pytest.mark.anyio
async def test_jira_get_issue_link_types(jira_client: JiraFetcher) -> None:
    """Test retrieving issue link types from Jira."""
    links_client = LinksMixin(config=jira_client.config)

    link_types = links_client.get_issue_link_types()

    # An empty list is a valid response if no link types are configured or accessible
    assert isinstance(link_types, list)

    # If the list is not empty, check the structure of the first element
    if link_types:
        first_link = link_types[0]
        assert isinstance(first_link, JiraIssueLinkType)
        assert first_link.id is not None
        assert first_link.name is not None
        assert first_link.inward is not None
        assert first_link.outward is not None


@pytest.mark.anyio
async def test_jira_create_and_remove_issue_link(
    jira_client: JiraFetcher,
    test_project_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test creating and removing a link between two Jira issues."""
    test_id = str(uuid.uuid4())[:8]
    summary1 = f"Link Test Issue 1 {test_id}"
    summary2 = f"Link Test Issue 2 {test_id}"

    try:
        issue1 = jira_client.create_issue(
            project_key=test_project_key,
            summary=summary1,
            description="First test issue for link testing",
            issue_type="Task",
        )

        issue2 = jira_client.create_issue(
            project_key=test_project_key,
            summary=summary2,
            description="Second test issue for link testing",
            issue_type="Task",
        )

        resource_tracker.add_jira_issue(issue1.key)
        resource_tracker.add_jira_issue(issue2.key)

        links_client = LinksMixin(config=jira_client.config)

        link_types = links_client.get_issue_link_types()
        assert len(link_types) > 0

        link_type_name = None
        for lt in link_types:
            if "relate" in lt.name.lower():
                link_type_name = lt.name
                break

        # If no "Relates" type found, use the first available type
        if not link_type_name:
            link_type_name = link_types[0].name

        link_data = {
            "type": {"name": link_type_name},
            "inwardIssue": {"key": issue1.key},
            "outwardIssue": {"key": issue2.key},
            "comment": {"body": f"Test link created by API validation test {test_id}"},
        }

        link_result = links_client.create_issue_link(link_data)

        assert link_result is not None
        assert link_result["success"] is True
        assert link_result["inward_issue"] == issue1.key
        assert link_result["outward_issue"] == issue2.key

        raw_issue = jira_client.jira.issue(issue1.key, fields="issuelinks")

        link_id = None
        if hasattr(raw_issue, "fields") and hasattr(raw_issue.fields, "issuelinks"):
            for link in raw_issue.fields.issuelinks:
                if (
                    hasattr(link, "outwardIssue")
                    and link.outwardIssue.key == issue2.key
                ):
                    link_id = link.id
                    break

        # Skip link removal test if we couldn't find the link ID
        if not link_id:
            pytest.skip("Could not find link ID for removal test")

        remove_result = links_client.remove_issue_link(link_id)

        assert remove_result is not None
        assert remove_result["success"] is True
        assert remove_result["link_id"] == link_id

    finally:
        # Clean up resources even if the test fails
        cleanup_resources()


@pytest.mark.skipif(not os.getenv("TEST_PROXY_URL"), reason="TEST_PROXY_URL not set")
def test_jira_client_real_proxy(jira_config: JiraConfig) -> None:
    """Test JiraClient with a real proxy if TEST_PROXY_URL is set."""
    import requests

    proxy_url = os.environ["TEST_PROXY_URL"]
    os.environ["HTTP_PROXY"] = proxy_url
    os.environ["HTTPS_PROXY"] = proxy_url
    # Use a simple API call to verify proxy is used and no connection error
    client = JiraFetcher(config=JiraConfig.from_env())
    try:
        issue_key = os.environ.get("JIRA_TEST_ISSUE_KEY")
        if not issue_key:
            pytest.skip("JIRA_TEST_ISSUE_KEY not set")
        result = client.get_issue(issue_key)
        assert result is not None
    except requests.exceptions.ProxyError:
        pytest.fail("Proxy connection failed - check TEST_PROXY_URL and network setup.")
    finally:
        # Clean up env
        del os.environ["HTTP_PROXY"]
        del os.environ["HTTPS_PROXY"]

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/servers/jira.py:
--------------------------------------------------------------------------------

```python
"""Jira FastMCP server instance and tool definitions."""

import json
import logging
from typing import Annotated, Any

from fastmcp import Context, FastMCP
from pydantic import Field
from requests.exceptions import HTTPError

from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
from mcp_atlassian.jira.constants import DEFAULT_READ_JIRA_FIELDS
from mcp_atlassian.models.jira.common import JiraUser
from mcp_atlassian.servers.dependencies import get_jira_fetcher
from mcp_atlassian.utils.decorators import check_write_access

logger = logging.getLogger(__name__)

jira_mcp = FastMCP(
    name="Jira MCP Service",
    description="Provides tools for interacting with Atlassian Jira.",
)


@jira_mcp.tool(tags={"jira", "read"})
async def get_user_profile(
    ctx: Context,
    user_identifier: Annotated[
        str,
        Field(
            description="Identifier for the user (e.g., email address '[email protected]', username 'johndoe', account ID 'accountid:...', or key for Server/DC)."
        ),
    ],
) -> str:
    """
    Retrieve profile information for a specific Jira user.

    Args:
        ctx: The FastMCP context.
        user_identifier: User identifier (email, username, key, or account ID).

    Returns:
        JSON string representing the Jira user profile object, or an error object if not found.

    Raises:
        ValueError: If the Jira client is not configured or available.
    """
    jira = await get_jira_fetcher(ctx)
    try:
        user: JiraUser = jira.get_user_profile_by_identifier(user_identifier)
        result = user.to_simplified_dict()
        response_data = {"success": True, "user": result}
    except Exception as e:
        error_message = ""
        log_level = logging.ERROR
        if isinstance(e, ValueError) and "not found" in str(e).lower():
            log_level = logging.WARNING
            error_message = str(e)
        elif isinstance(e, MCPAtlassianAuthenticationError):
            error_message = f"Authentication/Permission Error: {str(e)}"
        elif isinstance(e, OSError | HTTPError):
            error_message = f"Network or API Error: {str(e)}"
        else:
            error_message = (
                "An unexpected error occurred while fetching the user profile."
            )
            logger.exception(
                f"Unexpected error in get_user_profile for '{user_identifier}':"
            )
        error_result = {
            "success": False,
            "error": str(e),
            "user_identifier": user_identifier,
        }
        logger.log(
            log_level,
            f"get_user_profile failed for '{user_identifier}': {error_message}",
        )
        response_data = error_result
    return json.dumps(response_data, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "read"})
async def get_issue(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g., 'PROJ-123')")],
    fields: Annotated[
        str,
        Field(
            description=(
                "(Optional) Comma-separated list of fields to return (e.g., 'summary,status,customfield_10010'). "
                "You may also provide a single field as a string (e.g., 'duedate'). "
                "Use '*all' for all fields (including custom fields), or omit for essential fields only."
            ),
            default=",".join(DEFAULT_READ_JIRA_FIELDS),
        ),
    ] = ",".join(DEFAULT_READ_JIRA_FIELDS),
    expand: Annotated[
        str | None,
        Field(
            description=(
                "(Optional) Fields to expand. Examples: 'renderedFields' (for rendered content), "
                "'transitions' (for available status transitions), 'changelog' (for history)"
            ),
            default=None,
        ),
    ] = None,
    comment_limit: Annotated[
        int,
        Field(
            description="Maximum number of comments to include (0 or null for no comments)",
            default=10,
            ge=0,
            le=100,
        ),
    ] = 10,
    properties: Annotated[
        str | None,
        Field(
            description="(Optional) A comma-separated list of issue properties to return",
            default=None,
        ),
    ] = None,
    update_history: Annotated[
        bool,
        Field(
            description="Whether to update the issue view history for the requesting user",
            default=True,
        ),
    ] = True,
) -> str:
    """Get details of a specific Jira issue including its Epic links and relationship information.

    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.
        fields: Comma-separated list of fields to return (e.g., 'summary,status,customfield_10010'), a single field as a string (e.g., 'duedate'), '*all' for all fields, or omitted for essentials.
        expand: Optional fields to expand.
        comment_limit: Maximum number of comments.
        properties: Issue properties to return.
        update_history: Whether to update issue view history.

    Returns:
        JSON string representing the Jira issue object.

    Raises:
        ValueError: If the Jira client is not configured or available.
    """
    jira = await get_jira_fetcher(ctx)
    fields_list: str | list[str] | None = fields
    if fields and fields != "*all":
        fields_list = [f.strip() for f in fields.split(",")]

    issue = jira.get_issue(
        issue_key=issue_key,
        fields=fields_list,
        expand=expand,
        comment_limit=comment_limit,
        properties=properties.split(",") if properties else None,
        update_history=update_history,
    )
    result = issue.to_simplified_dict()
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "read"})
async def search(
    ctx: Context,
    jql: Annotated[
        str,
        Field(
            description=(
                "JQL query string (Jira Query Language). Examples:\n"
                '- Find Epics: "issuetype = Epic AND project = PROJ"\n'
                '- Find issues in Epic: "parent = PROJ-123"\n'
                "- Find by status: \"status = 'In Progress' AND project = PROJ\"\n"
                '- Find by assignee: "assignee = currentUser()"\n'
                '- Find recently updated: "updated >= -7d AND project = PROJ"\n'
                '- Find by label: "labels = frontend AND project = PROJ"\n'
                '- Find by priority: "priority = High AND project = PROJ"'
            )
        ),
    ],
    fields: Annotated[
        str,
        Field(
            description=(
                "(Optional) Comma-separated fields to return in the results. "
                "Use '*all' for all fields, or specify individual fields like 'summary,status,assignee,priority'"
            ),
            default=",".join(DEFAULT_READ_JIRA_FIELDS),
        ),
    ] = ",".join(DEFAULT_READ_JIRA_FIELDS),
    limit: Annotated[
        int,
        Field(description="Maximum number of results (1-50)", default=10, ge=1),
    ] = 10,
    start_at: Annotated[
        int,
        Field(description="Starting index for pagination (0-based)", default=0, ge=0),
    ] = 0,
    projects_filter: Annotated[
        str | None,
        Field(
            description=(
                "(Optional) Comma-separated list of project keys to filter results by. "
                "Overrides the environment variable JIRA_PROJECTS_FILTER if provided."
            ),
            default=None,
        ),
    ] = None,
    expand: Annotated[
        str | None,
        Field(
            description=(
                "(Optional) fields to expand. Examples: 'renderedFields', 'transitions', 'changelog'"
            ),
            default=None,
        ),
    ] = None,
) -> str:
    """Search Jira issues using JQL (Jira Query Language).

    Args:
        ctx: The FastMCP context.
        jql: JQL query string.
        fields: Comma-separated fields to return.
        limit: Maximum number of results.
        start_at: Starting index for pagination.
        projects_filter: Comma-separated list of project keys to filter by.
        expand: Optional fields to expand.

    Returns:
        JSON string representing the search results including pagination info.
    """
    jira = await get_jira_fetcher(ctx)
    fields_list: str | list[str] | None = fields
    if fields and fields != "*all":
        fields_list = [f.strip() for f in fields.split(",")]

    search_result = jira.search_issues(
        jql=jql,
        fields=fields_list,
        limit=limit,
        start=start_at,
        expand=expand,
        projects_filter=projects_filter,
    )
    result = search_result.to_simplified_dict()
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "read"})
async def search_fields(
    ctx: Context,
    keyword: Annotated[
        str,
        Field(
            description="Keyword for fuzzy search. If left empty, lists the first 'limit' available fields in their default order.",
            default="",
        ),
    ] = "",
    limit: Annotated[
        int, Field(description="Maximum number of results", default=10, ge=1)
    ] = 10,
    refresh: Annotated[
        bool,
        Field(description="Whether to force refresh the field list", default=False),
    ] = False,
) -> str:
    """Search Jira fields by keyword with fuzzy match.

    Args:
        ctx: The FastMCP context.
        keyword: Keyword for fuzzy search.
        limit: Maximum number of results.
        refresh: Whether to force refresh the field list.

    Returns:
        JSON string representing a list of matching field definitions.
    """
    jira = await get_jira_fetcher(ctx)
    result = jira.search_fields(keyword, limit=limit, refresh=refresh)
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "read"})
async def get_project_issues(
    ctx: Context,
    project_key: Annotated[str, Field(description="The project key")],
    limit: Annotated[
        int,
        Field(description="Maximum number of results (1-50)", default=10, ge=1, le=50),
    ] = 10,
    start_at: Annotated[
        int,
        Field(description="Starting index for pagination (0-based)", default=0, ge=0),
    ] = 0,
) -> str:
    """Get all issues for a specific Jira project.

    Args:
        ctx: The FastMCP context.
        project_key: The project key.
        limit: Maximum number of results.
        start_at: Starting index for pagination.

    Returns:
        JSON string representing the search results including pagination info.
    """
    jira = await get_jira_fetcher(ctx)
    search_result = jira.get_project_issues(
        project_key=project_key, start=start_at, limit=limit
    )
    result = search_result.to_simplified_dict()
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "read"})
async def get_transitions(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g., 'PROJ-123')")],
) -> str:
    """Get available status transitions for a Jira issue.

    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.

    Returns:
        JSON string representing a list of available transitions.
    """
    jira = await get_jira_fetcher(ctx)
    # Underlying method returns list[dict] in the desired format
    transitions = jira.get_available_transitions(issue_key)
    return json.dumps(transitions, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "read"})
async def get_worklog(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g., 'PROJ-123')")],
) -> str:
    """Get worklog entries for a Jira issue.

    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.

    Returns:
        JSON string representing the worklog entries.
    """
    jira = await get_jira_fetcher(ctx)
    worklogs = jira.get_worklogs(issue_key)
    result = {"worklogs": worklogs}
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "read"})
async def download_attachments(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g., 'PROJ-123')")],
    target_dir: Annotated[
        str, Field(description="Directory where attachments should be saved")
    ],
) -> str:
    """Download attachments from a Jira issue.

    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.
        target_dir: Directory to save attachments.

    Returns:
        JSON string indicating the result of the download operation.
    """
    jira = await get_jira_fetcher(ctx)
    result = jira.download_issue_attachments(issue_key=issue_key, target_dir=target_dir)
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "read"})
async def get_agile_boards(
    ctx: Context,
    board_name: Annotated[
        str | None,
        Field(description="(Optional) The name of board, support fuzzy search"),
    ] = None,
    project_key: Annotated[
        str | None, Field(description="(Optional) Jira project key (e.g., 'PROJ-123')")
    ] = None,
    board_type: Annotated[
        str | None,
        Field(
            description="(Optional) The type of jira board (e.g., 'scrum', 'kanban')"
        ),
    ] = None,
    start_at: Annotated[
        int,
        Field(description="Starting index for pagination (0-based)", default=0, ge=0),
    ] = 0,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (1-50)", default=10, ge=1, le=50),
    ] = 10,
) -> str:
    """Get jira agile boards by name, project key, or type.

    Args:
        ctx: The FastMCP context.
        board_name: Name of the board (fuzzy search).
        project_key: Project key.
        board_type: Board type ('scrum' or 'kanban').
        start_at: Starting index.
        limit: Maximum results.

    Returns:
        JSON string representing a list of board objects.
    """
    jira = await get_jira_fetcher(ctx)
    boards = jira.get_all_agile_boards_model(
        board_name=board_name,
        project_key=project_key,
        board_type=board_type,
        start=start_at,
        limit=limit,
    )
    result = [board.to_simplified_dict() for board in boards]
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "read"})
async def get_board_issues(
    ctx: Context,
    board_id: Annotated[str, Field(description="The id of the board (e.g., '1001')")],
    jql: Annotated[
        str,
        Field(
            description=(
                "JQL query string (Jira Query Language). Examples:\n"
                '- Find Epics: "issuetype = Epic AND project = PROJ"\n'
                '- Find issues in Epic: "parent = PROJ-123"\n'
                "- Find by status: \"status = 'In Progress' AND project = PROJ\"\n"
                '- Find by assignee: "assignee = currentUser()"\n'
                '- Find recently updated: "updated >= -7d AND project = PROJ"\n'
                '- Find by label: "labels = frontend AND project = PROJ"\n'
                '- Find by priority: "priority = High AND project = PROJ"'
            )
        ),
    ],
    fields: Annotated[
        str,
        Field(
            description=(
                "Comma-separated fields to return in the results. "
                "Use '*all' for all fields, or specify individual "
                "fields like 'summary,status,assignee,priority'"
            ),
            default=",".join(DEFAULT_READ_JIRA_FIELDS),
        ),
    ] = ",".join(DEFAULT_READ_JIRA_FIELDS),
    start_at: Annotated[
        int,
        Field(description="Starting index for pagination (0-based)", default=0, ge=0),
    ] = 0,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (1-50)", default=10, ge=1, le=50),
    ] = 10,
    expand: Annotated[
        str,
        Field(
            description="Optional fields to expand in the response (e.g., 'changelog').",
            default="version",
        ),
    ] = "version",
) -> str:
    """Get all issues linked to a specific board filtered by JQL.

    Args:
        ctx: The FastMCP context.
        board_id: The ID of the board.
        jql: JQL query string to filter issues.
        fields: Comma-separated fields to return.
        start_at: Starting index for pagination.
        limit: Maximum number of results.
        expand: Optional fields to expand.

    Returns:
        JSON string representing the search results including pagination info.
    """
    jira = await get_jira_fetcher(ctx)
    fields_list: str | list[str] | None = fields
    if fields and fields != "*all":
        fields_list = [f.strip() for f in fields.split(",")]

    search_result = jira.get_board_issues(
        board_id=board_id,
        jql=jql,
        fields=fields_list,
        start=start_at,
        limit=limit,
        expand=expand,
    )
    result = search_result.to_simplified_dict()
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "read"})
async def get_sprints_from_board(
    ctx: Context,
    board_id: Annotated[str, Field(description="The id of board (e.g., '1000')")],
    state: Annotated[
        str | None,
        Field(description="Sprint state (e.g., 'active', 'future', 'closed')"),
    ] = None,
    start_at: Annotated[
        int,
        Field(description="Starting index for pagination (0-based)", default=0, ge=0),
    ] = 0,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (1-50)", default=10, ge=1, le=50),
    ] = 10,
) -> str:
    """Get jira sprints from board by state.

    Args:
        ctx: The FastMCP context.
        board_id: The ID of the board.
        state: Sprint state ('active', 'future', 'closed'). If None, returns all sprints.
        start_at: Starting index.
        limit: Maximum results.

    Returns:
        JSON string representing a list of sprint objects.
    """
    jira = await get_jira_fetcher(ctx)
    sprints = jira.get_all_sprints_from_board_model(
        board_id=board_id, state=state, start=start_at, limit=limit
    )
    result = [sprint.to_simplified_dict() for sprint in sprints]
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "read"})
async def get_sprint_issues(
    ctx: Context,
    sprint_id: Annotated[str, Field(description="The id of sprint (e.g., '10001')")],
    fields: Annotated[
        str,
        Field(
            description=(
                "Comma-separated fields to return in the results. "
                "Use '*all' for all fields, or specify individual "
                "fields like 'summary,status,assignee,priority'"
            ),
            default=",".join(DEFAULT_READ_JIRA_FIELDS),
        ),
    ] = ",".join(DEFAULT_READ_JIRA_FIELDS),
    start_at: Annotated[
        int,
        Field(description="Starting index for pagination (0-based)", default=0, ge=0),
    ] = 0,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (1-50)", default=10, ge=1, le=50),
    ] = 10,
) -> str:
    """Get jira issues from sprint.

    Args:
        ctx: The FastMCP context.
        sprint_id: The ID of the sprint.
        fields: Comma-separated fields to return.
        start_at: Starting index.
        limit: Maximum results.

    Returns:
        JSON string representing the search results including pagination info.
    """
    jira = await get_jira_fetcher(ctx)
    fields_list: str | list[str] | None = fields
    if fields and fields != "*all":
        fields_list = [f.strip() for f in fields.split(",")]

    search_result = jira.get_sprint_issues(
        sprint_id=sprint_id, fields=fields_list, start=start_at, limit=limit
    )
    result = search_result.to_simplified_dict()
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "read"})
async def get_link_types(ctx: Context) -> str:
    """Get all available issue link types.

    Args:
        ctx: The FastMCP context.

    Returns:
        JSON string representing a list of issue link type objects.
    """
    jira = await get_jira_fetcher(ctx)
    link_types = jira.get_issue_link_types()
    formatted_link_types = [link_type.to_simplified_dict() for link_type in link_types]
    return json.dumps(formatted_link_types, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def create_issue(
    ctx: Context,
    project_key: Annotated[
        str,
        Field(
            description=(
                "The JIRA project key (e.g. 'PROJ', 'DEV', 'SUPPORT'). "
                "This is the prefix of issue keys in your project. "
                "Never assume what it might be, always ask the user."
            )
        ),
    ],
    summary: Annotated[str, Field(description="Summary/title of the issue")],
    issue_type: Annotated[
        str,
        Field(
            description=(
                "Issue type (e.g. 'Task', 'Bug', 'Story', 'Epic', 'Subtask'). "
                "The available types depend on your project configuration. "
                "For subtasks, use 'Subtask' (not 'Sub-task') and include parent in additional_fields."
            ),
        ),
    ],
    assignee: Annotated[
        str | None,
        Field(
            description="(Optional) Assignee's user identifier (string): Email, display name, or account ID (e.g., '[email protected]', 'John Doe', 'accountid:...')",
            default=None,
        ),
    ] = None,
    description: Annotated[
        str | None, Field(description="Issue description", default=None)
    ] = None,
    components: Annotated[
        str | None,
        Field(
            description="(Optional) Comma-separated list of component names to assign (e.g., 'Frontend,API')",
            default=None,
        ),
    ] = None,
    additional_fields: Annotated[
        dict[str, Any] | None,
        Field(
            description=(
                "(Optional) Dictionary of additional fields to set. Examples:\n"
                "- Set priority: {'priority': {'name': 'High'}}\n"
                "- Add labels: {'labels': ['frontend', 'urgent']}\n"
                "- Link to parent (for any issue type): {'parent': 'PROJ-123'}\n"
                "- Set Fix Version/s: {'fixVersions': [{'id': '10020'}]}\n"
                "- Custom fields: {'customfield_10010': 'value'}"
            ),
            default=None,
        ),
    ] = None,
) -> str:
    """Create a new Jira issue with optional Epic link or parent for subtasks.

    Args:
        ctx: The FastMCP context.
        project_key: The JIRA project key.
        summary: Summary/title of the issue.
        issue_type: Issue type (e.g., 'Task', 'Bug', 'Story', 'Epic', 'Subtask').
        assignee: Assignee's user identifier (string): Email, display name, or account ID (e.g., '[email protected]', 'John Doe', 'accountid:...').
        description: Issue description.
        components: Comma-separated list of component names.
        additional_fields: Dictionary of additional fields.

    Returns:
        JSON string representing the created issue object.

    Raises:
        ValueError: If in read-only mode or Jira client is unavailable.
    """
    jira = await get_jira_fetcher(ctx)
    # Parse components from comma-separated string to list
    components_list = None
    if components and isinstance(components, str):
        components_list = [
            comp.strip() for comp in components.split(",") if comp.strip()
        ]

    # Use additional_fields directly as dict
    extra_fields = additional_fields or {}
    if not isinstance(extra_fields, dict):
        raise ValueError("additional_fields must be a dictionary.")

    issue = jira.create_issue(
        project_key=project_key,
        summary=summary,
        issue_type=issue_type,
        description=description,
        assignee=assignee,
        components=components_list,
        **extra_fields,
    )
    result = issue.to_simplified_dict()
    return json.dumps(
        {"message": "Issue created successfully", "issue": result},
        indent=2,
        ensure_ascii=False,
    )


@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def batch_create_issues(
    ctx: Context,
    issues: Annotated[
        str,
        Field(
            description=(
                "JSON array of issue objects. Each object should contain:\n"
                "- project_key (required): The project key (e.g., 'PROJ')\n"
                "- summary (required): Issue summary/title\n"
                "- issue_type (required): Type of issue (e.g., 'Task', 'Bug')\n"
                "- description (optional): Issue description\n"
                "- assignee (optional): Assignee username or email\n"
                "- components (optional): Array of component names\n"
                "Example: [\n"
                '  {"project_key": "PROJ", "summary": "Issue 1", "issue_type": "Task"},\n'
                '  {"project_key": "PROJ", "summary": "Issue 2", "issue_type": "Bug", "components": ["Frontend"]}\n'
                "]"
            )
        ),
    ],
    validate_only: Annotated[
        bool,
        Field(
            description="If true, only validates the issues without creating them",
            default=False,
        ),
    ] = False,
) -> str:
    """Create multiple Jira issues in a batch.

    Args:
        ctx: The FastMCP context.
        issues: JSON array string of issue objects.
        validate_only: If true, only validates without creating.

    Returns:
        JSON string indicating success and listing created issues (or validation result).

    Raises:
        ValueError: If in read-only mode, Jira client unavailable, or invalid JSON.
    """
    jira = await get_jira_fetcher(ctx)
    # Parse issues from JSON string
    try:
        issues_list = json.loads(issues)
        if not isinstance(issues_list, list):
            raise ValueError("Input 'issues' must be a JSON array string.")
    except json.JSONDecodeError:
        raise ValueError("Invalid JSON in issues")
    except Exception as e:
        raise ValueError(f"Invalid input for issues: {e}") from e

    # Create issues in batch
    created_issues = jira.batch_create_issues(issues_list, validate_only=validate_only)

    message = (
        "Issues validated successfully"
        if validate_only
        else "Issues created successfully"
    )
    result = {
        "message": message,
        "issues": [issue.to_simplified_dict() for issue in created_issues],
    }
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "read"})
async def batch_get_changelogs(
    ctx: Context,
    issue_ids_or_keys: Annotated[
        list[str],
        Field(
            description="List of Jira issue IDs or keys, e.g. ['PROJ-123', 'PROJ-124']"
        ),
    ],
    fields: Annotated[
        list[str] | None,
        Field(
            description="(Optional) Filter the changelogs by fields, e.g. ['status', 'assignee']. Default to None for all fields.",
            default=None,
        ),
    ] = None,
    limit: Annotated[
        int,
        Field(
            description=(
                "Maximum number of changelogs to return in result for each issue. "
                "Default to -1 for all changelogs. "
                "Notice that it only limits the results in the response, "
                "the function will still fetch all the data."
            ),
            default=-1,
        ),
    ] = -1,
) -> str:
    """Get changelogs for multiple Jira issues (Cloud only).

    Args:
        ctx: The FastMCP context.
        issue_ids_or_keys: List of issue IDs or keys.
        fields: List of fields to filter changelogs by. None for all fields.
        limit: Maximum changelogs per issue (-1 for all).

    Returns:
        JSON string representing a list of issues with their changelogs.

    Raises:
        NotImplementedError: If run on Jira Server/Data Center.
        ValueError: If Jira client is unavailable.
    """
    jira = await get_jira_fetcher(ctx)
    # Ensure this runs only on Cloud, as per original function docstring
    if not jira.config.is_cloud:
        raise NotImplementedError(
            "Batch get issue changelogs is only available on Jira Cloud."
        )

    # Call the underlying method
    issues_with_changelogs = jira.batch_get_changelogs(
        issue_ids_or_keys=issue_ids_or_keys, fields=fields
    )

    # Format the response
    results = []
    limit_val = None if limit == -1 else limit
    for issue in issues_with_changelogs:
        results.append(
            {
                "issue_id": issue.id,
                "changelogs": [
                    changelog.to_simplified_dict()
                    for changelog in issue.changelogs[:limit_val]
                ],
            }
        )
    return json.dumps(results, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def update_issue(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g., 'PROJ-123')")],
    fields: Annotated[
        dict[str, Any],
        Field(
            description=(
                "Dictionary of fields to update. For 'assignee', provide a string identifier (email, name, or accountId). "
                "Example: `{'assignee': '[email protected]', 'summary': 'New Summary'}`"
            )
        ),
    ],
    additional_fields: Annotated[
        dict[str, Any] | None,
        Field(
            description="(Optional) Dictionary of additional fields to update. Use this for custom fields or more complex updates.",
            default=None,
        ),
    ] = None,
    attachments: Annotated[
        str | None,
        Field(
            description=(
                "(Optional) JSON string array or comma-separated list of file paths to attach to the issue. "
                "Example: '/path/to/file1.txt,/path/to/file2.txt' or ['/path/to/file1.txt','/path/to/file2.txt']"
            ),
            default=None,
        ),
    ] = None,
) -> str:
    """Update an existing Jira issue including changing status, adding Epic links, updating fields, etc.

    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.
        fields: Dictionary of fields to update.
        additional_fields: Optional dictionary of additional fields.
        attachments: Optional JSON array string or comma-separated list of file paths.

    Returns:
        JSON string representing the updated issue object and attachment results.

    Raises:
        ValueError: If in read-only mode or Jira client unavailable, or invalid input.
    """
    jira = await get_jira_fetcher(ctx)
    # Use fields directly as dict
    if not isinstance(fields, dict):
        raise ValueError("fields must be a dictionary.")
    update_fields = fields

    # Use additional_fields directly as dict
    extra_fields = additional_fields or {}
    if not isinstance(extra_fields, dict):
        raise ValueError("additional_fields must be a dictionary.")

    # Parse attachments
    attachment_paths = []
    if attachments:
        if isinstance(attachments, str):
            try:
                parsed = json.loads(attachments)
                if isinstance(parsed, list):
                    attachment_paths = [str(p) for p in parsed]
                else:
                    raise ValueError("attachments JSON string must be an array.")
            except json.JSONDecodeError:
                # Assume comma-separated if not valid JSON array
                attachment_paths = [
                    p.strip() for p in attachments.split(",") if p.strip()
                ]
        else:
            raise ValueError(
                "attachments must be a JSON array string or comma-separated string."
            )

    # Combine fields and additional_fields
    all_updates = {**update_fields, **extra_fields}
    if attachment_paths:
        all_updates["attachments"] = attachment_paths

    try:
        issue = jira.update_issue(issue_key=issue_key, **all_updates)
        result = issue.to_simplified_dict()
        if (
            hasattr(issue, "custom_fields")
            and "attachment_results" in issue.custom_fields
        ):
            result["attachment_results"] = issue.custom_fields["attachment_results"]
        return json.dumps(
            {"message": "Issue updated successfully", "issue": result},
            indent=2,
            ensure_ascii=False,
        )
    except Exception as e:
        logger.error(f"Error updating issue {issue_key}: {str(e)}", exc_info=True)
        raise ValueError(f"Failed to update issue {issue_key}: {str(e)}")


@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def delete_issue(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g. PROJ-123)")],
) -> str:
    """Delete an existing Jira issue.

    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.

    Returns:
        JSON string indicating success.

    Raises:
        ValueError: If in read-only mode or Jira client unavailable.
    """
    jira = await get_jira_fetcher(ctx)
    deleted = jira.delete_issue(issue_key)
    result = {"message": f"Issue {issue_key} has been deleted successfully."}
    # The underlying method raises on failure, so if we reach here, it's success.
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def add_comment(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g., 'PROJ-123')")],
    comment: Annotated[str, Field(description="Comment text in Markdown format")],
) -> str:
    """Add a comment to a Jira issue.

    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.
        comment: Comment text in Markdown.

    Returns:
        JSON string representing the added comment object.

    Raises:
        ValueError: If in read-only mode or Jira client unavailable.
    """
    jira = await get_jira_fetcher(ctx)
    # add_comment returns dict
    result = jira.add_comment(issue_key, comment)
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def add_worklog(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g., 'PROJ-123')")],
    time_spent: Annotated[
        str,
        Field(
            description=(
                "Time spent in Jira format. Examples: "
                "'1h 30m' (1 hour and 30 minutes), '1d' (1 day), '30m' (30 minutes), '4h' (4 hours)"
            )
        ),
    ],
    comment: Annotated[
        str | None,
        Field(description="(Optional) Comment for the worklog in Markdown format"),
    ] = None,
    started: Annotated[
        str | None,
        Field(
            description=(
                "(Optional) Start time in ISO format. If not provided, the current time will be used. "
                "Example: '2023-08-01T12:00:00.000+0000'"
            )
        ),
    ] = None,
    # Add original_estimate and remaining_estimate as per original tool
    original_estimate: Annotated[
        str | None, Field(description="(Optional) New value for the original estimate")
    ] = None,
    remaining_estimate: Annotated[
        str | None, Field(description="(Optional) New value for the remaining estimate")
    ] = None,
) -> str:
    """Add a worklog entry to a Jira issue.

    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.
        time_spent: Time spent in Jira format.
        comment: Optional comment in Markdown.
        started: Optional start time in ISO format.
        original_estimate: Optional new original estimate.
        remaining_estimate: Optional new remaining estimate.


    Returns:
        JSON string representing the added worklog object.

    Raises:
        ValueError: If in read-only mode or Jira client unavailable.
    """
    jira = await get_jira_fetcher(ctx)
    # add_worklog returns dict
    worklog_result = jira.add_worklog(
        issue_key=issue_key,
        time_spent=time_spent,
        comment=comment,
        started=started,
        original_estimate=original_estimate,
        remaining_estimate=remaining_estimate,
    )
    result = {"message": "Worklog added successfully", "worklog": worklog_result}
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def link_to_epic(
    ctx: Context,
    issue_key: Annotated[
        str, Field(description="The key of the issue to link (e.g., 'PROJ-123')")
    ],
    epic_key: Annotated[
        str, Field(description="The key of the epic to link to (e.g., 'PROJ-456')")
    ],
) -> str:
    """Link an existing issue to an epic.

    Args:
        ctx: The FastMCP context.
        issue_key: The key of the issue to link.
        epic_key: The key of the epic to link to.

    Returns:
        JSON string representing the updated issue object.

    Raises:
        ValueError: If in read-only mode or Jira client unavailable.
    """
    jira = await get_jira_fetcher(ctx)
    issue = jira.link_issue_to_epic(issue_key, epic_key)
    result = {
        "message": f"Issue {issue_key} has been linked to epic {epic_key}.",
        "issue": issue.to_simplified_dict(),
    }
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def create_issue_link(
    ctx: Context,
    link_type: Annotated[
        str,
        Field(
            description="The type of link to create (e.g., 'Duplicate', 'Blocks', 'Relates to')"
        ),
    ],
    inward_issue_key: Annotated[
        str, Field(description="The key of the inward issue (e.g., 'PROJ-123')")
    ],
    outward_issue_key: Annotated[
        str, Field(description="The key of the outward issue (e.g., 'PROJ-456')")
    ],
    comment: Annotated[
        str | None, Field(description="(Optional) Comment to add to the link")
    ] = None,
    comment_visibility: Annotated[
        dict[str, str] | None,
        Field(
            description="(Optional) Visibility settings for the comment (e.g., {'type': 'group', 'value': 'jira-users'})",
            default=None,
        ),
    ] = None,
) -> str:
    """Create a link between two Jira issues.

    Args:
        ctx: The FastMCP context.
        link_type: The type of link (e.g., 'Blocks').
        inward_issue_key: The key of the source issue.
        outward_issue_key: The key of the target issue.
        comment: Optional comment text.
        comment_visibility: Optional dictionary for comment visibility.

    Returns:
        JSON string indicating success or failure.

    Raises:
        ValueError: If required fields are missing, invalid input, in read-only mode, or Jira client unavailable.
    """
    jira = await get_jira_fetcher(ctx)
    if not all([link_type, inward_issue_key, outward_issue_key]):
        raise ValueError(
            "link_type, inward_issue_key, and outward_issue_key are required."
        )

    link_data = {
        "type": {"name": link_type},
        "inwardIssue": {"key": inward_issue_key},
        "outwardIssue": {"key": outward_issue_key},
    }

    if comment:
        comment_obj = {"body": comment}
        if comment_visibility and isinstance(comment_visibility, dict):
            if "type" in comment_visibility and "value" in comment_visibility:
                comment_obj["visibility"] = comment_visibility
            else:
                logger.warning("Invalid comment_visibility dictionary structure.")
        link_data["comment"] = comment_obj

    result = jira.create_issue_link(link_data)
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def create_remote_issue_link(
    ctx: Context,
    issue_key: Annotated[
        str,
        Field(description="The key of the issue to add the link to (e.g., 'PROJ-123')"),
    ],
    url: Annotated[
        str,
        Field(
            description="The URL to link to (e.g., 'https://example.com/page' or Confluence page URL)"
        ),
    ],
    title: Annotated[
        str,
        Field(
            description="The title/name of the link (e.g., 'Documentation Page', 'Confluence Page')"
        ),
    ],
    summary: Annotated[
        str | None, Field(description="(Optional) Description of the link")
    ] = None,
    relationship: Annotated[
        str | None,
        Field(
            description="(Optional) Relationship description (e.g., 'causes', 'relates to', 'documentation')"
        ),
    ] = None,
    icon_url: Annotated[
        str | None, Field(description="(Optional) URL to a 16x16 icon for the link")
    ] = None,
) -> str:
    """Create a remote issue link (web link or Confluence link) for a Jira issue.

    This tool allows you to add web links and Confluence links to Jira issues.
    The links will appear in the issue's "Links" section and can be clicked to navigate to external resources.

    Args:
        ctx: The FastMCP context.
        issue_key: The key of the issue to add the link to.
        url: The URL to link to (can be any web page or Confluence page).
        title: The title/name that will be displayed for the link.
        summary: Optional description of what the link is for.
        relationship: Optional relationship description.
        icon_url: Optional URL to a 16x16 icon for the link.

    Returns:
        JSON string indicating success or failure.

    Raises:
        ValueError: If required fields are missing, invalid input, in read-only mode, or Jira client unavailable.
    """
    jira = await get_jira_fetcher(ctx)
    if not issue_key:
        raise ValueError("issue_key is required.")
    if not url:
        raise ValueError("url is required.")
    if not title:
        raise ValueError("title is required.")

    # Build the remote link data structure
    link_object = {
        "url": url,
        "title": title,
    }

    if summary:
        link_object["summary"] = summary

    if icon_url:
        link_object["icon"] = {"url16x16": icon_url, "title": title}

    link_data = {"object": link_object}

    if relationship:
        link_data["relationship"] = relationship

    result = jira.create_remote_issue_link(issue_key, link_data)
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def remove_issue_link(
    ctx: Context,
    link_id: Annotated[str, Field(description="The ID of the link to remove")],
) -> str:
    """Remove a link between two Jira issues.

    Args:
        ctx: The FastMCP context.
        link_id: The ID of the link to remove.

    Returns:
        JSON string indicating success.

    Raises:
        ValueError: If link_id is missing, in read-only mode, or Jira client unavailable.
    """
    jira = await get_jira_fetcher(ctx)
    if not link_id:
        raise ValueError("link_id is required")

    result = jira.remove_issue_link(link_id)  # Returns dict on success
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def transition_issue(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g., 'PROJ-123')")],
    transition_id: Annotated[
        str,
        Field(
            description=(
                "ID of the transition to perform. Use the jira_get_transitions tool first "
                "to get the available transition IDs for the issue. Example values: '11', '21', '31'"
            )
        ),
    ],
    fields: Annotated[
        dict[str, Any] | None,
        Field(
            description=(
                "(Optional) Dictionary of fields to update during the transition. "
                "Some transitions require specific fields to be set (e.g., resolution). "
                "Example: {'resolution': {'name': 'Fixed'}}"
            ),
            default=None,
        ),
    ] = None,
    comment: Annotated[
        str | None,
        Field(
            description=(
                "(Optional) Comment to add during the transition. "
                "This will be visible in the issue history."
            ),
        ),
    ] = None,
) -> str:
    """Transition a Jira issue to a new status.

    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.
        transition_id: ID of the transition.
        fields: Optional dictionary of fields to update during transition.
        comment: Optional comment for the transition.

    Returns:
        JSON string representing the updated issue object.

    Raises:
        ValueError: If required fields missing, invalid input, in read-only mode, or Jira client unavailable.
    """
    jira = await get_jira_fetcher(ctx)
    if not issue_key or not transition_id:
        raise ValueError("issue_key and transition_id are required.")

    # Use fields directly as dict
    update_fields = fields or {}
    if not isinstance(update_fields, dict):
        raise ValueError("fields must be a dictionary.")

    issue = jira.transition_issue(
        issue_key=issue_key,
        transition_id=transition_id,
        fields=update_fields,
        comment=comment,
    )

    result = {
        "message": f"Issue {issue_key} transitioned successfully",
        "issue": issue.to_simplified_dict() if issue else None,
    }
    return json.dumps(result, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def create_sprint(
    ctx: Context,
    board_id: Annotated[str, Field(description="The id of board (e.g., '1000')")],
    sprint_name: Annotated[
        str, Field(description="Name of the sprint (e.g., 'Sprint 1')")
    ],
    start_date: Annotated[
        str, Field(description="Start time for sprint (ISO 8601 format)")
    ],
    end_date: Annotated[
        str, Field(description="End time for sprint (ISO 8601 format)")
    ],
    goal: Annotated[
        str | None, Field(description="(Optional) Goal of the sprint")
    ] = None,
) -> str:
    """Create Jira sprint for a board.

    Args:
        ctx: The FastMCP context.
        board_id: Board ID.
        sprint_name: Sprint name.
        start_date: Start date (ISO format).
        end_date: End date (ISO format).
        goal: Optional sprint goal.

    Returns:
        JSON string representing the created sprint object.

    Raises:
        ValueError: If in read-only mode or Jira client unavailable.
    """
    jira = await get_jira_fetcher(ctx)
    sprint = jira.create_sprint(
        board_id=board_id,
        sprint_name=sprint_name,
        start_date=start_date,
        end_date=end_date,
        goal=goal,
    )
    return json.dumps(sprint.to_simplified_dict(), indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def update_sprint(
    ctx: Context,
    sprint_id: Annotated[str, Field(description="The id of sprint (e.g., '10001')")],
    sprint_name: Annotated[
        str | None, Field(description="(Optional) New name for the sprint")
    ] = None,
    state: Annotated[
        str | None,
        Field(description="(Optional) New state for the sprint (future|active|closed)"),
    ] = None,
    start_date: Annotated[
        str | None, Field(description="(Optional) New start date for the sprint")
    ] = None,
    end_date: Annotated[
        str | None, Field(description="(Optional) New end date for the sprint")
    ] = None,
    goal: Annotated[
        str | None, Field(description="(Optional) New goal for the sprint")
    ] = None,
) -> str:
    """Update jira sprint.

    Args:
        ctx: The FastMCP context.
        sprint_id: The ID of the sprint.
        sprint_name: Optional new name.
        state: Optional new state (future|active|closed).
        start_date: Optional new start date.
        end_date: Optional new end date.
        goal: Optional new goal.

    Returns:
        JSON string representing the updated sprint object or an error message.

    Raises:
        ValueError: If in read-only mode or Jira client unavailable.
    """
    jira = await get_jira_fetcher(ctx)
    sprint = jira.update_sprint(
        sprint_id=sprint_id,
        sprint_name=sprint_name,
        state=state,
        start_date=start_date,
        end_date=end_date,
        goal=goal,
    )

    if sprint is None:
        error_payload = {
            "error": f"Failed to update sprint {sprint_id}. Check logs for details."
        }
        return json.dumps(error_payload, indent=2, ensure_ascii=False)
    else:
        return json.dumps(sprint.to_simplified_dict(), indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "read"})
async def get_project_versions(
    ctx: Context,
    project_key: Annotated[str, Field(description="Jira project key (e.g., 'PROJ')")],
) -> str:
    """Get all fix versions for a specific Jira project."""
    jira = await get_jira_fetcher(ctx)
    versions = jira.get_project_versions(project_key)
    return json.dumps(versions, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "read"})
async def get_all_projects(
    ctx: Context,
    include_archived: Annotated[
        bool,
        Field(
            description="Whether to include archived projects in the results",
            default=False,
        ),
    ] = False,
) -> str:
    """Get all Jira projects accessible to the current user.

    Args:
        ctx: The FastMCP context.
        include_archived: Whether to include archived projects.

    Returns:
        JSON string representing a list of project objects accessible to the user.
        Project keys are always returned in uppercase.
        If JIRA_PROJECTS_FILTER is configured, only returns projects matching those keys.

    Raises:
        ValueError: If the Jira client is not configured or available.
    """
    try:
        jira = await get_jira_fetcher(ctx)
        projects = jira.get_all_projects(include_archived=include_archived)
    except (MCPAtlassianAuthenticationError, HTTPError, OSError, ValueError) as e:
        error_message = ""
        log_level = logging.ERROR
        if isinstance(e, MCPAtlassianAuthenticationError):
            error_message = f"Authentication/Permission Error: {str(e)}"
        elif isinstance(e, OSError | HTTPError):
            error_message = f"Network or API Error: {str(e)}"
        elif isinstance(e, ValueError):
            error_message = f"Configuration Error: {str(e)}"

        error_result = {
            "success": False,
            "error": error_message,
        }
        logger.log(log_level, f"get_all_projects failed: {error_message}")
        return json.dumps(error_result, indent=2, ensure_ascii=False)

    # Ensure all project keys are uppercase
    for project in projects:
        if "key" in project:
            project["key"] = project["key"].upper()

    # Apply project filter if configured
    if jira.config.projects_filter:
        # Split projects filter by commas and handle possible whitespace
        allowed_project_keys = {
            p.strip().upper() for p in jira.config.projects_filter.split(",")
        }
        projects = [
            project
            for project in projects
            if project.get("key") in allowed_project_keys
        ]

    return json.dumps(projects, indent=2, ensure_ascii=False)


@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def create_version(
    ctx: Context,
    project_key: Annotated[str, Field(description="Jira project key (e.g., 'PROJ')")],
    name: Annotated[str, Field(description="Name of the version")],
    start_date: Annotated[
        str | None, Field(description="Start date (YYYY-MM-DD)", default=None)
    ] = None,
    release_date: Annotated[
        str | None, Field(description="Release date (YYYY-MM-DD)", default=None)
    ] = None,
    description: Annotated[
        str | None, Field(description="Description of the version", default=None)
    ] = None,
) -> str:
    """Create a new fix version in a Jira project.

    Args:
        ctx: The FastMCP context.
        project_key: The project key.
        name: Name of the version.
        start_date: Start date (optional).
        release_date: Release date (optional).
        description: Description (optional).

    Returns:
        JSON string of the created version object.
    """
    jira = await get_jira_fetcher(ctx)
    try:
        version = jira.create_project_version(
            project_key=project_key,
            name=name,
            start_date=start_date,
            release_date=release_date,
            description=description,
        )
        return json.dumps(version, indent=2, ensure_ascii=False)
    except Exception as e:
        logger.error(
            f"Error creating version in project {project_key}: {str(e)}", exc_info=True
        )
        return json.dumps(
            {"success": False, "error": str(e)}, indent=2, ensure_ascii=False
        )


@jira_mcp.tool(name="batch_create_versions", tags={"jira", "write"})
@check_write_access
async def batch_create_versions(
    ctx: Context,
    project_key: Annotated[str, Field(description="Jira project key (e.g., 'PROJ')")],
    versions: Annotated[
        str,
        Field(
            description=(
                "JSON array of version objects. Each object should contain:\n"
                "- name (required): Name of the version\n"
                "- startDate (optional): Start date (YYYY-MM-DD)\n"
                "- releaseDate (optional): Release date (YYYY-MM-DD)\n"
                "- description (optional): Description of the version\n"
                "Example: [\n"
                '  {"name": "v1.0", "startDate": "2025-01-01", "releaseDate": "2025-02-01", "description": "First release"},\n'
                '  {"name": "v2.0"}\n'
                "]"
            )
        ),
    ],
) -> str:
    """Batch create multiple versions in a Jira project.

    Args:
        ctx: The FastMCP context.
        project_key: The project key.
        versions: JSON array string of version objects.

    Returns:
        JSON array of results, each with success flag, version or error.
    """
    jira = await get_jira_fetcher(ctx)
    try:
        version_list = json.loads(versions)
        if not isinstance(version_list, list):
            raise ValueError("Input 'versions' must be a JSON array string.")
    except json.JSONDecodeError:
        raise ValueError("Invalid JSON in versions")
    except Exception as e:
        raise ValueError(f"Invalid input for versions: {e}") from e

    results = []
    if not version_list:
        return json.dumps(results, indent=2, ensure_ascii=False)

    for idx, v in enumerate(version_list):
        # Defensive: ensure v is a dict and has a name
        if not isinstance(v, dict) or not v.get("name"):
            results.append(
                {
                    "success": False,
                    "error": f"Item {idx}: Each version must be an object with at least a 'name' field.",
                }
            )
            continue
        try:
            version = jira.create_project_version(
                project_key=project_key,
                name=v["name"],
                start_date=v.get("startDate"),
                release_date=v.get("releaseDate"),
                description=v.get("description"),
            )
            results.append({"success": True, "version": version})
        except Exception as e:
            logger.error(
                f"Error creating version in batch for project {project_key}: {str(e)}",
                exc_info=True,
            )
            results.append({"success": False, "error": str(e), "input": v})
    return json.dumps(results, indent=2, ensure_ascii=False)

```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/issues.py:
--------------------------------------------------------------------------------

```python
"""Module for Jira issue operations."""

import logging
from collections import defaultdict
from typing import Any

from requests.exceptions import HTTPError

from ..exceptions import MCPAtlassianAuthenticationError
from ..models.jira import JiraIssue
from ..models.jira.common import JiraChangelog
from ..utils import parse_date
from .client import JiraClient
from .constants import DEFAULT_READ_JIRA_FIELDS
from .protocols import (
    AttachmentsOperationsProto,
    EpicOperationsProto,
    FieldsOperationsProto,
    IssueOperationsProto,
    ProjectsOperationsProto,
    UsersOperationsProto,
)

logger = logging.getLogger("mcp-jira")


class IssuesMixin(
    JiraClient,
    AttachmentsOperationsProto,
    EpicOperationsProto,
    FieldsOperationsProto,
    IssueOperationsProto,
    ProjectsOperationsProto,
    UsersOperationsProto,
):
    """Mixin for Jira issue operations."""

    def get_issue(
        self,
        issue_key: str,
        expand: str | None = None,
        comment_limit: int | str | None = 10,
        fields: str | list[str] | tuple[str, ...] | set[str] | None = None,
        properties: str | list[str] | None = None,
        update_history: bool = True,
    ) -> JiraIssue:
        """
        Get a Jira issue by key.

        Args:
            issue_key: The issue key (e.g., PROJECT-123)
            expand: Fields to expand in the response
            comment_limit: Maximum number of comments to include, or "all"
            fields: Fields to return (comma-separated string, list, tuple, set, or "*all")
            properties: Issue properties to return (comma-separated string or list)
            update_history: Whether to update the issue view history

        Returns:
            JiraIssue model with issue data and metadata

        Raises:
            MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
            Exception: If there is an error retrieving the issue
        """
        try:
            # Obtain the projects filter from the config.
            # These should NOT be overridden by the request.
            filter_to_use = self.config.projects_filter

            # Apply projects filter if present
            if filter_to_use:
                # Split projects filter by commas and handle possible whitespace
                projects = [p.strip() for p in filter_to_use.split(",")]

                # Obtain the project key from issue_key
                issue_key_project = issue_key.split("-")[0]

                if issue_key_project not in projects:
                    # If the project key not in the filter, return an empty issue
                    msg = (
                        "Issue with project prefix "
                        f"'{issue_key_project}' are restricted by configuration"
                    )
                    raise ValueError(msg)

            # Determine fields_param: use provided fields or default from constant
            fields_param = fields
            if fields_param is None:
                fields_param = ",".join(DEFAULT_READ_JIRA_FIELDS)
            elif isinstance(fields_param, list | tuple | set):
                fields_param = ",".join(fields_param)

            # Ensure necessary fields are included based on special parameters
            if (
                fields_param == ",".join(DEFAULT_READ_JIRA_FIELDS)
                or fields_param == "*all"
            ):
                # Default fields are being used - preserve the order
                default_fields_list = (
                    fields_param.split(",")
                    if fields_param != "*all"
                    else list(DEFAULT_READ_JIRA_FIELDS)
                )
                additional_fields = []

                # Add appropriate fields based on expand parameter
                if expand:
                    expand_params = expand.split(",")
                    if (
                        "changelog" in expand_params
                        and "changelog" not in default_fields_list
                        and "changelog" not in additional_fields
                    ):
                        additional_fields.append("changelog")
                    if (
                        "renderedFields" in expand_params
                        and "rendered" not in default_fields_list
                        and "rendered" not in additional_fields
                    ):
                        additional_fields.append("rendered")

                # Add appropriate fields based on properties parameter
                if (
                    properties
                    and "properties" not in default_fields_list
                    and "properties" not in additional_fields
                ):
                    additional_fields.append("properties")

                # Combine default fields with additional fields, preserving order
                if additional_fields:
                    fields_param = ",".join(default_fields_list + additional_fields)
            # Handle non-default fields string

            # Build expand parameter if provided
            expand_param = expand

            # Convert properties to proper format if it's a list
            properties_param = properties
            if properties and isinstance(properties, list | tuple | set):
                properties_param = ",".join(properties)

            # Get the issue data with all parameters
            issue = self.jira.get_issue(
                issue_key,
                expand=expand_param,
                fields=fields_param,
                properties=properties_param,
                update_history=update_history,
            )
            if not issue:
                msg = f"Issue {issue_key} not found"
                raise ValueError(msg)
            if not isinstance(issue, dict):
                msg = (
                    f"Unexpected return value type from `jira.get_issue`: {type(issue)}"
                )
                logger.error(msg)
                raise TypeError(msg)

            # Extract fields data, safely handling None
            fields_data = issue.get("fields", {}) or {}

            # Get comments if needed
            if "comment" in fields_data:
                comment_limit_int = self._normalize_comment_limit(comment_limit)
                comments = self._get_issue_comments_if_needed(
                    issue_key, comment_limit_int
                )
                # Add comments to the issue data for processing by the model
                fields_data["comment"]["comments"] = comments

            # Extract epic information
            try:
                epic_info = self._extract_epic_information(issue)
            except Exception as e:
                logger.warning(f"Error extracting epic information: {str(e)}")
                epic_info = {"epic_key": None, "epic_name": None}

            # If this is linked to an epic, add the epic information to the fields
            if epic_info.get("epic_key"):
                try:
                    # Get field IDs for epic fields
                    field_ids = self.get_field_ids_to_epic()

                    # Add epic link field if it doesn't exist
                    if (
                        "epic_link" in field_ids
                        and field_ids["epic_link"] not in fields_data
                    ):
                        fields_data[field_ids["epic_link"]] = epic_info["epic_key"]

                    # Add epic name field if it doesn't exist
                    if (
                        epic_info.get("epic_name")
                        and "epic_name" in field_ids
                        and field_ids["epic_name"] not in fields_data
                    ):
                        fields_data[field_ids["epic_name"]] = epic_info["epic_name"]
                except Exception as e:
                    logger.warning(f"Error setting epic fields: {str(e)}")

            # Update the issue data with the fields
            issue["fields"] = fields_data

            # Create and return the JiraIssue model, passing requested_fields
            return JiraIssue.from_api_response(
                issue,
                base_url=self.config.url if hasattr(self, "config") else None,
                requested_fields=fields,
            )
        except HTTPError as http_err:
            if http_err.response is not None and http_err.response.status_code in [
                401,
                403,
            ]:
                error_msg = (
                    f"Authentication failed for Jira API ({http_err.response.status_code}). "
                    "Token may be expired or invalid. Please verify credentials."
                )
                logger.error(error_msg)
                raise MCPAtlassianAuthenticationError(error_msg) from http_err
            else:
                logger.error(f"HTTP error during API call: {http_err}", exc_info=False)
                raise
        except Exception as e:
            error_msg = str(e)
            logger.error(f"Error retrieving issue {issue_key}: {error_msg}")
            raise Exception(f"Error retrieving issue {issue_key}: {error_msg}") from e

    def _normalize_comment_limit(self, comment_limit: int | str | None) -> int | None:
        """
        Normalize the comment limit to an integer or None.

        Args:
            comment_limit: The comment limit as int, string, or None

        Returns:
            Normalized comment limit as int or None
        """
        if comment_limit is None:
            return None

        if isinstance(comment_limit, int):
            return comment_limit

        if comment_limit == "all":
            return None  # No limit

        # Try to convert to int
        try:
            return int(comment_limit)
        except ValueError:
            # If conversion fails, default to 10
            return 10

    def _get_issue_comments_if_needed(
        self, issue_key: str, comment_limit: int | None
    ) -> list[dict]:
        """
        Get comments for an issue if needed.

        Args:
            issue_key: The issue key
            comment_limit: Maximum number of comments to include

        Returns:
            List of comments
        """
        if comment_limit is None or comment_limit > 0:
            try:
                response = self.jira.issue_get_comments(issue_key)
                if not isinstance(response, dict):
                    msg = f"Unexpected return value type from `jira.issue_get_comments`: {type(response)}"
                    logger.error(msg)
                    raise TypeError(msg)

                comments = response["comments"]

                # Limit comments if needed
                if comment_limit is not None:
                    comments = comments[:comment_limit]

                return comments
            except Exception as e:
                logger.warning(f"Error getting comments for {issue_key}: {str(e)}")
                return []
        return []

    def _extract_epic_information(self, issue: dict) -> dict[str, str | None]:
        """
        Extract epic information from an issue.

        Args:
            issue: The issue data

        Returns:
            Dictionary with epic information
        """
        # Initialize with default values
        epic_info = {
            "epic_key": None,
            "epic_name": None,
            "epic_summary": None,
            "is_epic": False,
        }

        try:
            fields = issue.get("fields", {}) or {}
            issue_type = fields.get("issuetype", {}).get("name", "").lower()

            # Get field IDs for epic fields
            try:
                field_ids = self.get_field_ids_to_epic()
            except Exception as e:
                logger.warning(f"Error getting Jira fields: {str(e)}")
                field_ids = {}

            # Check if this is an epic
            if issue_type == "epic":
                epic_info["is_epic"] = True

                # Use the discovered field ID for epic name
                if "epic_name" in field_ids and field_ids["epic_name"] in fields:
                    epic_info["epic_name"] = fields.get(field_ids["epic_name"], "")

            # If not an epic, check for epic link
            elif "epic_link" in field_ids:
                epic_link_field = field_ids["epic_link"]

                if epic_link_field in fields and fields[epic_link_field]:
                    epic_key = fields[epic_link_field]
                    epic_info["epic_key"] = epic_key

                    # Try to get epic details
                    try:
                        epic = self.jira.get_issue(
                            epic_key,
                            expand=None,
                            fields=None,
                            properties=None,
                            update_history=True,
                        )
                        if not isinstance(epic, dict):
                            msg = f"Unexpected return value type from `jira.get_issue`: {type(epic)}"
                            logger.error(msg)
                            raise TypeError(msg)

                        epic_fields = epic.get("fields", {}) or {}

                        # Get epic name using the discovered field ID
                        if "epic_name" in field_ids:
                            epic_info["epic_name"] = epic_fields.get(
                                field_ids["epic_name"], ""
                            )

                        epic_info["epic_summary"] = epic_fields.get("summary", "")
                    except Exception as e:
                        logger.warning(
                            f"Error getting epic details for {epic_key}: {str(e)}"
                        )
        except Exception as e:
            logger.warning(f"Error extracting epic information: {str(e)}")

        return epic_info

    def _format_issue_content(
        self,
        issue_key: str,
        issue: dict,
        description: str,
        comments: list[dict],
        created_date: str,
        epic_info: dict[str, str | None],
    ) -> str:
        """
        Format issue content for display.

        Args:
            issue_key: The issue key
            issue: The issue data
            description: The issue description
            comments: The issue comments
            created_date: The formatted creation date
            epic_info: Epic information

        Returns:
            Formatted issue content
        """
        fields = issue.get("fields", {})

        # Basic issue information
        summary = fields.get("summary", "")
        status = fields.get("status", {}).get("name", "")
        issue_type = fields.get("issuetype", {}).get("name", "")

        # Format content
        content = [f"# {issue_key}: {summary}"]
        content.append(f"**Type**: {issue_type}")
        content.append(f"**Status**: {status}")
        content.append(f"**Created**: {created_date}")

        # Add reporter
        reporter = fields.get("reporter", {})
        reporter_name = reporter.get("displayName", "") or reporter.get("name", "")
        if reporter_name:
            content.append(f"**Reporter**: {reporter_name}")

        # Add assignee
        assignee = fields.get("assignee", {})
        assignee_name = assignee.get("displayName", "") or assignee.get("name", "")
        if assignee_name:
            content.append(f"**Assignee**: {assignee_name}")

        # Add epic information
        if epic_info["is_epic"]:
            content.append(f"**Epic Name**: {epic_info['epic_name']}")
        elif epic_info["epic_key"]:
            content.append(
                f"**Epic**: [{epic_info['epic_key']}] {epic_info['epic_summary']}"
            )

        # Add description
        if description:
            content.append("\n## Description\n")
            content.append(description)

        # Add comments
        if comments:
            content.append("\n## Comments\n")
            for comment in comments:
                author = comment.get("author", {})
                author_name = author.get("displayName", "") or author.get("name", "")
                comment_body = self._clean_text(comment.get("body", ""))

                if author_name and comment_body:
                    comment_date = comment.get("created", "")
                    if comment_date:
                        comment_date = parse_date(comment_date)
                        content.append(f"**{author_name}** ({comment_date}):")
                    else:
                        content.append(f"**{author_name}**:")

                    content.append(f"{comment_body}\n")

        return "\n".join(content)

    def _create_issue_metadata(
        self,
        issue_key: str,
        issue: dict,
        comments: list[dict],
        created_date: str,
        epic_info: dict[str, str | None],
    ) -> dict[str, Any]:
        """
        Create metadata for a Jira issue.

        Args:
            issue_key: The issue key
            issue: The issue data
            comments: The issue comments
            created_date: The formatted creation date
            epic_info: Epic information

        Returns:
            Metadata dictionary
        """
        fields = issue.get("fields", {})

        # Initialize metadata
        metadata = {
            "key": issue_key,
            "title": fields.get("summary", ""),
            "status": fields.get("status", {}).get("name", ""),
            "type": fields.get("issuetype", {}).get("name", ""),
            "created": created_date,
            "url": f"{self.config.url}/browse/{issue_key}",
        }

        # Add assignee if available
        assignee = fields.get("assignee", {})
        if assignee:
            metadata["assignee"] = assignee.get("displayName", "") or assignee.get(
                "name", ""
            )

        # Add epic information
        if epic_info["is_epic"]:
            metadata["is_epic"] = True
            metadata["epic_name"] = epic_info["epic_name"]
        elif epic_info["epic_key"]:
            metadata["epic_key"] = epic_info["epic_key"]
            metadata["epic_name"] = epic_info["epic_name"]
            metadata["epic_summary"] = epic_info["epic_summary"]

        # Add comment count
        metadata["comment_count"] = len(comments)

        return metadata

    def create_issue(
        self,
        project_key: str,
        summary: str,
        issue_type: str,
        description: str = "",
        assignee: str | None = None,
        components: list[str] | None = None,
        **kwargs: Any,  # noqa: ANN401 - Dynamic field types are necessary for Jira API
    ) -> JiraIssue:
        """
        Create a new Jira issue.

        Args:
            project_key: The key of the project
            summary: The issue summary
            issue_type: The type of issue to create
            description: The issue description
            assignee: The username or account ID of the assignee
            components: List of component names to assign (e.g., ["Frontend", "API"])
            **kwargs: Additional fields to set on the issue

        Returns:
            JiraIssue model representing the created issue

        Raises:
            Exception: If there is an error creating the issue
        """
        try:
            # Validate required fields
            if not project_key:
                raise ValueError("Project key is required")
            if not summary:
                raise ValueError("Summary is required")
            if not issue_type:
                raise ValueError("Issue type is required")

            # Handle Epic and Subtask issue type names across different languages
            actual_issue_type = issue_type
            if self._is_epic_issue_type(issue_type) and issue_type.lower() == "epic":
                # If the user provided "Epic" but we need to find the localized name
                epic_type_name = self._find_epic_issue_type_name(project_key)
                if epic_type_name:
                    actual_issue_type = epic_type_name
                    logger.info(
                        f"Using localized Epic issue type name: {actual_issue_type}"
                    )
            elif issue_type.lower() in ["subtask", "sub-task"]:
                # If the user provided "Subtask" but we need to find the localized name
                subtask_type_name = self._find_subtask_issue_type_name(project_key)
                if subtask_type_name:
                    actual_issue_type = subtask_type_name
                    logger.info(
                        f"Using localized Subtask issue type name: {actual_issue_type}"
                    )

            # Prepare fields
            fields: dict[str, Any] = {
                "project": {"key": project_key},
                "summary": summary,
                "issuetype": {"name": actual_issue_type},
            }

            # Add description if provided (convert from Markdown to Jira format)
            if description:
                fields["description"] = self._markdown_to_jira(description)

            # Add assignee if provided
            if assignee:
                try:
                    # _get_account_id now returns the correct identifier (accountId for cloud, name for server)
                    assignee_identifier = self._get_account_id(assignee)
                    self._add_assignee_to_fields(fields, assignee_identifier)
                except ValueError as e:
                    logger.warning(f"Could not assign issue: {str(e)}")

            # Add components if provided
            if components:
                if isinstance(components, list):
                    # Filter out any None or empty/whitespace-only strings
                    valid_components = [
                        comp_name.strip()
                        for comp_name in components
                        if isinstance(comp_name, str) and comp_name.strip()
                    ]
                    if valid_components:
                        # Format as list of {"name": ...} dicts for the API
                        fields["components"] = [
                            {"name": comp_name} for comp_name in valid_components
                        ]

            # Make a copy of kwargs to preserve original values for two-step Epic creation
            kwargs_copy = kwargs.copy()

            # Prepare epic fields if this is an epic
            # This step now stores epic-specific fields in kwargs for post-creation update
            if self._is_epic_issue_type(issue_type):
                self._prepare_epic_fields(fields, summary, kwargs)

            # Prepare parent field if this is a subtask
            if issue_type.lower() == "subtask" or issue_type.lower() == "sub-task":
                self._prepare_parent_fields(fields, kwargs)
            # Allow parent field for all issue types when explicitly provided
            elif "parent" in kwargs:
                self._prepare_parent_fields(fields, kwargs)

            # Process **kwargs using the dynamic field map
            self._process_additional_fields(fields, kwargs_copy)

            # Create the issue
            response = self.jira.create_issue(fields=fields)
            if not isinstance(response, dict):
                msg = f"Unexpected return value type from `jira.create_issue`: {type(response)}"
                logger.error(msg)
                raise TypeError(msg)

            # Get the created issue key
            issue_key = response.get("key")
            if not issue_key:
                error_msg = "No issue key in response"
                raise ValueError(error_msg)

            # For Epics, perform the second step: update Epic-specific fields
            if self._is_epic_issue_type(issue_type):
                # Check if we have any stored Epic fields to update
                has_epic_fields = any(k.startswith("__epic_") for k in kwargs)
                if has_epic_fields:
                    logger.info(
                        f"Performing post-creation update for Epic {issue_key} with Epic-specific fields"
                    )
                    try:
                        return self.update_epic_fields(issue_key, kwargs)
                    except Exception as update_error:
                        logger.error(
                            f"Error during post-creation update of Epic {issue_key}: {str(update_error)}"
                        )
                        logger.info(
                            "Continuing with the original Epic that was successfully created"
                        )

            # Get the full issue data and convert to JiraIssue model
            issue_data = self.jira.get_issue(issue_key)
            if not isinstance(issue_data, dict):
                msg = f"Unexpected return value type from `jira.get_issue`: {type(issue_data)}"
                logger.error(msg)
                raise TypeError(msg)
            return JiraIssue.from_api_response(issue_data)

        except Exception as e:
            self._handle_create_issue_error(e, issue_type)
            raise  # Re-raise after logging

    def _is_epic_issue_type(self, issue_type: str) -> bool:
        """
        Check if an issue type is an Epic, handling localized names.

        Args:
            issue_type: The issue type name to check

        Returns:
            True if the issue type is an Epic, False otherwise
        """
        # Common Epic names in different languages
        epic_names = {
            "epic",  # English
            "에픽",  # Korean
            "エピック",  # Japanese
            "史诗",  # Chinese (Simplified)
            "史詩",  # Chinese (Traditional)
            "épica",  # Spanish/Portuguese
            "épique",  # French
            "epik",  # Turkish
            "эпик",  # Russian
            "епік",  # Ukrainian
        }

        return issue_type.lower() in epic_names or "epic" in issue_type.lower()

    def _find_epic_issue_type_name(self, project_key: str) -> str | None:
        """
        Find the actual Epic issue type name for a project.

        Args:
            project_key: The project key

        Returns:
            The Epic issue type name if found, None otherwise
        """
        try:
            issue_types = self.get_project_issue_types(project_key)
            for issue_type in issue_types:
                type_name = issue_type.get("name", "")
                if self._is_epic_issue_type(type_name):
                    return type_name
            return None
        except Exception as e:
            logger.warning(f"Could not get issue types for project {project_key}: {e}")
            return None

    def _find_subtask_issue_type_name(self, project_key: str) -> str | None:
        """
        Find the actual Subtask issue type name for a project.

        Args:
            project_key: The project key

        Returns:
            The Subtask issue type name if found, None otherwise
        """
        try:
            issue_types = self.get_project_issue_types(project_key)
            for issue_type in issue_types:
                # Check the subtask field - this is the most reliable way
                if issue_type.get("subtask", False):
                    return issue_type.get("name")
            return None
        except Exception as e:
            logger.warning(f"Could not get issue types for project {project_key}: {e}")
            return None

    def _prepare_epic_fields(
        self, fields: dict[str, Any], summary: str, kwargs: dict[str, Any]
    ) -> None:
        """
        Prepare fields for epic creation.

        This method delegates to the prepare_epic_fields method in EpicsMixin.

        Args:
            fields: The fields dictionary to update
            summary: The epic summary
            kwargs: Additional fields from the user
        """
        # Extract project_key from fields if available
        project_key = None
        if "project" in fields:
            if isinstance(fields["project"], dict):
                project_key = fields["project"].get("key")
            elif isinstance(fields["project"], str):
                project_key = fields["project"]

        # Delegate to EpicsMixin.prepare_epic_fields with project_key
        # Since JiraFetcher inherits from both IssuesMixin and EpicsMixin,
        # this will correctly use the prepare_epic_fields method from EpicsMixin
        # which implements the two-step Epic creation approach
        self.prepare_epic_fields(fields, summary, kwargs, project_key)

    def _prepare_parent_fields(
        self, fields: dict[str, Any], kwargs: dict[str, Any]
    ) -> None:
        """
        Prepare fields for parent relationship.

        Args:
            fields: The fields dictionary to update
            kwargs: Additional fields from the user

        Raises:
            ValueError: If parent issue key is not specified for a subtask
        """
        if "parent" in kwargs:
            parent_key = kwargs.get("parent")
            if parent_key:
                fields["parent"] = {"key": parent_key}
            # Remove parent from kwargs to avoid double processing
            kwargs.pop("parent", None)
        elif "issuetype" in fields and fields["issuetype"]["name"].lower() in (
            "subtask",
            "sub-task",
        ):
            # Only raise error if issue type is subtask and parent is missing
            raise ValueError(
                "Issue type is a sub-task but parent issue key or id not specified. Please provide a 'parent' parameter with the parent issue key."
            )

    def _add_assignee_to_fields(self, fields: dict[str, Any], assignee: str) -> None:
        """
        Add assignee to issue fields.

        Args:
            fields: The fields dictionary to update
            assignee: The assignee account ID
        """
        # Cloud instance uses accountId
        if self.config.is_cloud:
            fields["assignee"] = {"accountId": assignee}
        else:
            # Server/DC might use name instead of accountId
            fields["assignee"] = {"name": assignee}

    def _process_additional_fields(
        self, fields: dict[str, Any], kwargs: dict[str, Any]
    ) -> None:
        """
        Processes keyword arguments to add standard or custom fields to the issue fields dictionary.
        Uses the dynamic field map from FieldsMixin to identify field IDs.

        Args:
            fields: The fields dictionary to update
            kwargs: Additional fields provided via **kwargs
        """
        # Ensure field map is loaded/cached
        field_map = (
            self._generate_field_map()
        )  # Ensure map is ready (method from FieldsMixin)
        if not field_map:
            logger.error(
                "Could not generate field map. Cannot process additional fields."
            )
            return

        # Process each kwarg
        # Iterate over a copy to allow modification of the original kwargs if needed elsewhere
        for key, value in kwargs.copy().items():
            # Skip keys used internally for epic/parent handling or explicitly handled args like assignee/components
            if key.startswith("__epic_") or key in ("parent", "assignee", "components"):
                continue

            normalized_key = key.lower()
            api_field_id = None

            # 1. Check if key is a known field name in the map
            if normalized_key in field_map:
                api_field_id = field_map[normalized_key]
                logger.debug(
                    f"Identified field '{key}' as '{api_field_id}' via name map."
                )

            # 2. Check if key is a direct custom field ID
            elif key.startswith("customfield_"):
                api_field_id = key
                logger.debug(f"Identified field '{key}' as direct custom field ID.")

            # 3. Check if key is a standard system field ID (like 'summary', 'priority')
            elif key in field_map:  # Check original case for system fields
                api_field_id = field_map[key]
                logger.debug(f"Identified field '{key}' as standard system field ID.")

            if api_field_id:
                # Get the full field definition for formatting context if needed
                field_definition = self.get_field_by_id(
                    api_field_id
                )  # From FieldsMixin
                formatted_value = self._format_field_value_for_write(
                    api_field_id, value, field_definition
                )
                if formatted_value is not None:  # Only add if formatting didn't fail
                    fields[api_field_id] = formatted_value
                    logger.debug(
                        f"Added field '{api_field_id}' from kwarg '{key}': {formatted_value}"
                    )
                else:
                    logger.warning(
                        f"Skipping field '{key}' due to formatting error or invalid value."
                    )
            else:
                # 4. Unrecognized key - log a warning and skip
                logger.warning(
                    f"Ignoring unrecognized field '{key}' passed via kwargs."
                )

    def _format_field_value_for_write(
        self, field_id: str, value: Any, field_definition: dict | None
    ) -> Any:
        """Formats field values for the Jira API."""
        # Get schema type if definition is available
        schema_type = (
            field_definition.get("schema", {}).get("type") if field_definition else None
        )
        # Prefer name from definition if available, else use ID for logging/lookup
        field_name_for_format = (
            field_definition.get("name", field_id) if field_definition else field_id
        )

        # Example formatting rules based on standard field names (use lowercase for comparison)
        normalized_name = field_name_for_format.lower()

        if normalized_name == "priority":
            if isinstance(value, str):
                return {"name": value}
            elif isinstance(value, dict) and ("name" in value or "id" in value):
                return value  # Assume pre-formatted
            else:
                logger.warning(
                    f"Invalid format for priority field: {value}. Expected string name or dict."
                )
                return None  # Or raise error
        elif normalized_name == "labels":
            if isinstance(value, list) and all(isinstance(item, str) for item in value):
                return value
            # Allow comma-separated string if passed via additional_fields JSON string
            elif isinstance(value, str):
                return [label.strip() for label in value.split(",") if label.strip()]
            else:
                logger.warning(
                    f"Invalid format for labels field: {value}. Expected list of strings or comma-separated string."
                )
                return None
        elif normalized_name in ["fixversions", "versions", "components"]:
            # These expect lists of objects, typically {"name": "..."} or {"id": "..."}
            if isinstance(value, list):
                formatted_list = []
                for item in value:
                    if isinstance(item, str):
                        formatted_list.append({"name": item})  # Convert simple strings
                    elif isinstance(item, dict) and ("name" in item or "id" in item):
                        formatted_list.append(item)  # Keep pre-formatted dicts
                    else:
                        logger.warning(
                            f"Invalid item format in {normalized_name} list: {item}"
                        )
                return formatted_list
            else:
                logger.warning(
                    f"Invalid format for {normalized_name} field: {value}. Expected list."
                )
                return None
        elif normalized_name == "reporter":
            if isinstance(value, str):
                try:
                    reporter_identifier = self._get_account_id(value)
                    if self.config.is_cloud:
                        return {"accountId": reporter_identifier}
                    else:
                        return {"name": reporter_identifier}
                except ValueError as e:
                    logger.warning(f"Could not format reporter field: {str(e)}")
                    return None
            elif isinstance(value, dict) and ("name" in value or "accountId" in value):
                return value  # Assume pre-formatted
            else:
                logger.warning(f"Invalid format for reporter field: {value}")
                return None
        # Add more formatting rules for other standard fields based on schema_type or field_id
        elif normalized_name == "duedate":
            if isinstance(value, str):  # Basic check, could add date validation
                return value
            else:
                logger.warning(
                    f"Invalid format for duedate field: {value}. Expected YYYY-MM-DD string."
                )
                return None
        elif schema_type == "datetime" and isinstance(value, str):
            # Example: Ensure datetime fields are in ISO format if needed by API
            try:
                dt = parse_date(value)  # Assuming parse_date handles various inputs
                return (
                    dt.isoformat() if dt else value
                )  # Return ISO or original if parse fails
            except Exception:
                logger.warning(
                    f"Could not parse datetime for field {field_id}: {value}"
                )
                return value  # Return original on error

        # Default: return value as is if no specific formatting needed/identified
        return value

    def _handle_create_issue_error(self, exception: Exception, issue_type: str) -> None:
        """
        Handle errors when creating an issue.

        Args:
            exception: The exception that occurred
            issue_type: The type of issue being created
        """
        error_msg = str(exception)

        # Check for specific error types
        if "epic name" in error_msg.lower() or "epicname" in error_msg.lower():
            logger.error(
                f"Error creating {issue_type}: {error_msg}. "
                "Try specifying an epic_name in the additional fields"
            )
        elif "customfield" in error_msg.lower():
            logger.error(
                f"Error creating {issue_type}: {error_msg}. "
                "This may be due to a required custom field"
            )
        else:
            logger.error(f"Error creating {issue_type}: {error_msg}")

    def update_issue(
        self,
        issue_key: str,
        fields: dict[str, Any] | None = None,
        **kwargs: Any,  # noqa: ANN401 - Dynamic field types are necessary for Jira API
    ) -> JiraIssue:
        """
        Update a Jira issue.

        Args:
            issue_key: The key of the issue to update
            fields: Dictionary of fields to update
            **kwargs: Additional fields to update. Special fields include:
                - attachments: List of file paths to upload as attachments
                - status: New status for the issue (handled via transitions)
                - assignee: New assignee for the issue

        Returns:
            JiraIssue model representing the updated issue

        Raises:
            Exception: If there is an error updating the issue
        """
        try:
            # Validate required fields
            if not issue_key:
                raise ValueError("Issue key is required")

            update_fields = fields or {}
            attachments_result = None

            # Convert description from Markdown to Jira format if present
            if "description" in update_fields:
                update_fields["description"] = self._markdown_to_jira(
                    update_fields["description"]
                )

            # Process kwargs
            for key, value in kwargs.items():
                if key == "status":
                    # Status changes are handled separately via transitions
                    # Add status to fields so _update_issue_with_status can find it
                    update_fields["status"] = value
                    return self._update_issue_with_status(issue_key, update_fields)

                elif key == "attachments":
                    # Handle attachments separately - they're not part of fields update
                    if value and isinstance(value, list | tuple):
                        # We'll process attachments after updating fields
                        pass
                    else:
                        logger.warning(f"Invalid attachments value: {value}")

                elif key == "assignee":
                    # Handle assignee updates, allow unassignment with None or empty string
                    if value is None or value == "":
                        update_fields["assignee"] = None
                    else:
                        try:
                            account_id = self._get_account_id(value)
                            self._add_assignee_to_fields(update_fields, account_id)
                        except ValueError as e:
                            logger.warning(f"Could not update assignee: {str(e)}")
                elif key == "description":
                    # Handle description with markdown conversion
                    update_fields["description"] = self._markdown_to_jira(value)
                else:
                    # Process regular fields using _process_additional_fields
                    # Create a temporary dict with just this field
                    field_kwargs = {key: value}
                    self._process_additional_fields(update_fields, field_kwargs)

            # Update the issue fields
            if update_fields:
                self.jira.update_issue(
                    issue_key=issue_key, update={"fields": update_fields}
                )

            # Handle attachments if provided
            if "attachments" in kwargs and kwargs["attachments"]:
                try:
                    attachments_result = self.upload_attachments(
                        issue_key, kwargs["attachments"]
                    )
                    logger.info(
                        f"Uploaded attachments to {issue_key}: {attachments_result}"
                    )
                except Exception as e:
                    logger.error(
                        f"Error uploading attachments to {issue_key}: {str(e)}"
                    )
                    # Continue with the update even if attachments fail

            # Get the updated issue data and convert to JiraIssue model
            issue_data = self.jira.get_issue(issue_key)
            if not isinstance(issue_data, dict):
                msg = f"Unexpected return value type from `jira.get_issue`: {type(issue_data)}"
                logger.error(msg)
                raise TypeError(msg)
            issue = JiraIssue.from_api_response(issue_data)

            # Add attachment results to the response if available
            if attachments_result:
                issue.custom_fields["attachment_results"] = attachments_result

            return issue

        except Exception as e:
            error_msg = str(e)
            logger.error(f"Error updating issue {issue_key}: {error_msg}")
            raise ValueError(f"Failed to update issue {issue_key}: {error_msg}") from e

    def _update_issue_with_status(
        self, issue_key: str, fields: dict[str, Any]
    ) -> JiraIssue:
        """
        Update an issue with a status change.

        Args:
            issue_key: The key of the issue to update
            fields: Dictionary of fields to update

        Returns:
            JiraIssue model representing the updated issue

        Raises:
            Exception: If there is an error updating the issue
        """
        # Extract status from fields and remove it for the standard update
        status = fields.pop("status", None)

        # First update any fields if needed
        if fields:
            self.jira.update_issue(issue_key=issue_key, fields=fields)  # type: ignore[call-arg]

        # If no status change is requested, return the issue
        if not status:
            issue_data = self.jira.get_issue(issue_key)
            if not isinstance(issue_data, dict):
                msg = f"Unexpected return value type from `jira.get_issue`: {type(issue_data)}"
                logger.error(msg)
                raise TypeError(msg)
            return JiraIssue.from_api_response(issue_data)

        # Get available transitions (uses TransitionsMixin's normalized implementation)
        transitions = self.get_available_transitions(issue_key)  # type: ignore[attr-defined]

        # Extract status name or ID depending on what we received
        status_name = None
        status_id = None

        # Handle different input formats for status
        if isinstance(status, dict):
            # Dictionary format: {"name": "In Progress"} or {"id": "123"}
            status_name = status.get("name")
            status_id = status.get("id")
        elif isinstance(status, str):
            # String format: could be a name or an ID
            if status.isdigit():
                status_id = status
            else:
                status_name = status
        elif isinstance(status, int):
            # Integer format: must be an ID
            status_id = str(status)
        else:
            # Unknown format
            logger.warning(
                f"Unrecognized status format: {status} (type: {type(status)})"
            )
            status_name = str(status)

        # Log what we're searching for
        if status_name:
            logger.info(f"Looking for transition to status name: '{status_name}'")
        if status_id:
            logger.info(f"Looking for transition with ID: '{status_id}'")

        # Find the appropriate transition
        transition_id = None
        for transition in transitions:
            # TransitionsMixin returns normalized transitions with 'to_status' field
            transition_status_name = transition.get("to_status", "")

            # Match by name (case-insensitive)
            if (
                status_name
                and transition_status_name
                and transition_status_name.lower() == status_name.lower()
            ):
                transition_id = transition.get("id")
                logger.info(
                    f"Found transition ID {transition_id} matching status name '{status_name}'"
                )
                break

            # Direct transition ID match (if status is actually a transition ID)
            if status_id and str(transition.get("id", "")) == str(status_id):
                transition_id = transition.get("id")
                logger.info(f"Using direct transition ID {transition_id}")
                break

        if not transition_id:
            # Build list of available statuses from normalized transitions
            available_statuses = []
            for t in transitions:
                # Include transition name and target status if available
                transition_name = t.get("name", "")
                to_status = t.get("to_status", "")
                if to_status:
                    available_statuses.append(f"{transition_name} -> {to_status}")
                elif transition_name:
                    available_statuses.append(transition_name)

            available_statuses_str = (
                ", ".join(available_statuses) if available_statuses else "None found"
            )
            error_msg = (
                f"Could not find transition to status '{status}'. "
                f"Available transitions: {available_statuses_str}"
            )
            logger.error(error_msg)
            raise ValueError(error_msg)

        # Perform the transition
        logger.info(f"Performing transition with ID {transition_id}")
        self.jira.set_issue_status_by_transition_id(
            issue_key=issue_key,
            transition_id=(
                int(transition_id)
                if isinstance(transition_id, str) and transition_id.isdigit()
                else transition_id
            ),
        )

        # Get the updated issue data
        issue_data = self.jira.get_issue(issue_key)
        if not isinstance(issue_data, dict):
            msg = f"Unexpected return value type from `jira.get_issue`: {type(issue_data)}"
            logger.error(msg)
            raise TypeError(msg)
        return JiraIssue.from_api_response(issue_data)

    def delete_issue(self, issue_key: str) -> bool:
        """
        Delete a Jira issue.

        Args:
            issue_key: The key of the issue to delete

        Returns:
            True if the issue was deleted successfully

        Raises:
            Exception: If there is an error deleting the issue
        """
        try:
            self.jira.delete_issue(issue_key)
            return True
        except Exception as e:
            msg = f"Error deleting issue {issue_key}: {str(e)}"
            logger.error(msg)
            raise Exception(msg) from e

    def _log_available_fields(self, fields: list[dict]) -> None:
        """
        Log available fields for debugging.

        Args:
            fields: List of field definitions
        """
        logger.debug("Available Jira fields:")
        for field in fields:
            logger.debug(
                f"{field.get('id')}: {field.get('name')} ({field.get('schema', {}).get('type')})"
            )

    def _process_field_for_epic_data(
        self, field: dict, field_ids: dict[str, str]
    ) -> None:
        """
        Process a field for epic-related data.

        Args:
            field: The field data to process
            field_ids: Dictionary of field IDs to update
        """
        try:
            field_id = field.get("id")
            if not field_id:
                return

            # Skip non-custom fields
            if not field_id.startswith("customfield_"):
                return

            name = field.get("name", "").lower()

            # Look for field names related to epics
            if "epic" in name:
                if "link" in name:
                    field_ids["epic_link"] = field_id
                    field_ids["Epic Link"] = field_id
                elif "name" in name:
                    field_ids["epic_name"] = field_id
                    field_ids["Epic Name"] = field_id
        except Exception as e:
            logger.warning(f"Error processing field for epic data: {str(e)}")

    def _get_raw_transitions(self, issue_key: str) -> list[dict]:
        """
        Get raw transition data from the Jira API.

        This is an internal method that returns unprocessed transition data.
        For normalized transitions with proper structure, use get_available_transitions()
        from TransitionsMixin instead.

        Args:
            issue_key: The key of the issue

        Returns:
            List of raw transition data from the API

        Raises:
            Exception: If there is an error getting transitions
        """
        try:
            transitions = self.jira.get_issue_transitions(issue_key)
            return transitions
        except Exception as e:
            logger.error(f"Error getting transitions for issue {issue_key}: {str(e)}")
            raise Exception(
                f"Error getting transitions for issue {issue_key}: {str(e)}"
            ) from e

    def transition_issue(self, issue_key: str, transition_id: str) -> JiraIssue:
        """
        Transition an issue to a new status.

        Args:
            issue_key: The key of the issue
            transition_id: The ID of the transition to perform

        Returns:
            JiraIssue model with the updated issue data

        Raises:
            Exception: If there is an error transitioning the issue
        """
        try:
            self.jira.set_issue_status(
                issue_key=issue_key, status_name=transition_id, fields=None, update=None
            )
            return self.get_issue(issue_key)
        except Exception as e:
            logger.error(f"Error transitioning issue {issue_key}: {str(e)}")
            raise

    def batch_create_issues(
        self,
        issues: list[dict[str, Any]],
        validate_only: bool = False,
    ) -> list[JiraIssue]:
        """Create multiple Jira issues in a batch.

        Args:
            issues: List of issue dictionaries, each containing:
                - project_key (str): Key of the project
                - summary (str): Issue summary
                - issue_type (str): Type of issue
                - description (str, optional): Issue description
                - assignee (str, optional): Username of assignee
                - components (list[str], optional): List of component names
                - **kwargs: Additional fields specific to your Jira instance
            validate_only: If True, only validates the issues without creating them

        Returns:
            List of created JiraIssue objects

        Raises:
            ValueError: If any required fields are missing or invalid
            MCPAtlassianAuthenticationError: If authentication fails
        """
        if not issues:
            return []

        # Prepare issues for bulk creation
        issue_updates = []
        for issue_data in issues:
            try:
                # Extract and validate required fields
                project_key = issue_data.pop("project_key", None)
                summary = issue_data.pop("summary", None)
                issue_type = issue_data.pop("issue_type", None)
                description = issue_data.pop("description", "")
                assignee = issue_data.pop("assignee", None)
                components = issue_data.pop("components", None)

                # Validate required fields
                if not all([project_key, summary, issue_type]):
                    raise ValueError(
                        f"Missing required fields for issue: {project_key=}, {summary=}, {issue_type=}"
                    )

                # Prepare fields dictionary
                fields = {
                    "project": {"key": project_key},
                    "summary": summary,
                    "issuetype": {"name": issue_type},
                }

                # Add optional fields
                if description:
                    fields["description"] = description

                # Add assignee if provided
                if assignee:
                    try:
                        # _get_account_id now returns the correct identifier (accountId for cloud, name for server)
                        assignee_identifier = self._get_account_id(assignee)
                        self._add_assignee_to_fields(fields, assignee_identifier)
                    except ValueError as e:
                        logger.warning(f"Could not assign issue: {str(e)}")

                # Add components if provided
                if components:
                    if isinstance(components, list):
                        valid_components = [
                            comp_name.strip()
                            for comp_name in components
                            if isinstance(comp_name, str) and comp_name.strip()
                        ]
                        if valid_components:
                            fields["components"] = [
                                {"name": comp_name} for comp_name in valid_components
                            ]

                # Add any remaining custom fields
                self._process_additional_fields(fields, issue_data)

                if validate_only:
                    # For validation, just log the issue that would be created
                    logger.info(
                        f"Validated issue creation: {project_key} - {summary} ({issue_type})"
                    )
                    continue

                # Add to bulk creation list
                issue_updates.append({"fields": fields})

            except Exception as e:
                logger.error(f"Failed to prepare issue for creation: {str(e)}")
                if not issue_updates:
                    raise

        if validate_only:
            return []

        try:
            # Call Jira's bulk create endpoint
            response = self.jira.create_issues(issue_updates)
            if not isinstance(response, dict):
                msg = f"Unexpected return value type from `jira.create_issues`: {type(response)}"
                logger.error(msg)
                raise TypeError(msg)

            # Process results
            created_issues = []
            for issue_info in response.get("issues", []):
                issue_key = issue_info.get("key")
                if issue_key:
                    try:
                        # Fetch the full issue data
                        issue_data = self.jira.get_issue(issue_key)
                        if not isinstance(issue_data, dict):
                            msg = f"Unexpected return value type from `jira.get_issue`: {type(issue_data)}"
                            logger.error(msg)
                            raise TypeError(msg)

                        created_issues.append(
                            JiraIssue.from_api_response(
                                issue_data,
                                base_url=self.config.url
                                if hasattr(self, "config")
                                else None,
                            )
                        )
                    except Exception as e:
                        logger.error(
                            f"Error fetching created issue {issue_key}: {str(e)}"
                        )

            # Log any errors from the bulk creation
            errors = response.get("errors", [])
            if errors:
                for error in errors:
                    logger.error(f"Bulk creation error: {error}")

            return created_issues

        except Exception as e:
            logger.error(f"Error in bulk issue creation: {str(e)}")
            raise

    def batch_get_changelogs(
        self, issue_ids_or_keys: list[str], fields: list[str] | None = None
    ) -> list[JiraIssue]:
        """
        Get changelogs for multiple issues in a batch. Repeatly fetch data if necessary.

        Warning:
            This function is only avaiable on Jira Cloud.

        Args:
            issue_ids_or_keys: List of issue IDs or keys
            fields: Filter the changelogs by fields, e.g. ['status', 'assignee']. Default to None for all fields.

        Returns:
            List of JiraIssue objects that only contain changelogs and id
        """

        if not self.config.is_cloud:
            error_msg = "Batch get issue changelogs is only available on Jira Cloud."
            logger.error(error_msg)
            raise NotImplementedError(error_msg)

        # Get paged api results
        paged_api_results = self.get_paged(
            method="post",
            url=self.jira.resource_url("changelog/bulkfetch"),
            params_or_json={
                "fieldIds": fields,
                "issueIdsOrKeys": issue_ids_or_keys,
            },
        )

        # Save (issue_id, changelogs)
        issue_changelog_results: defaultdict[str, list[JiraChangelog]] = defaultdict(
            list
        )

        for api_result in paged_api_results:
            for data in api_result.get("issueChangeLogs", []):
                issue_id = data.get("issueId", "")
                changelogs = [
                    JiraChangelog.from_api_response(changelog_data)
                    for changelog_data in data.get("changeHistories", [])
                ]

                issue_changelog_results[issue_id].extend(changelogs)

        issues = [
            JiraIssue(id=issue_id, changelogs=changelogs)
            for issue_id, changelogs in issue_changelog_results.items()
        ]

        return issues

```
Page 9/10FirstPrevNextLast