This is page 9 of 10. Use http://codebase.md/sooperset/mcp-atlassian?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .devcontainer
│ ├── devcontainer.json
│ ├── Dockerfile
│ ├── post-create.sh
│ └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ └── feature_request.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-publish.yml
│ ├── lint.yml
│ ├── publish.yml
│ ├── stale.yml
│ └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│ ├── oauth_authorize.py
│ └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│ └── mcp_atlassian
│ ├── __init__.py
│ ├── confluence
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── labels.py
│ │ ├── pages.py
│ │ ├── search.py
│ │ ├── spaces.py
│ │ ├── users.py
│ │ ├── utils.py
│ │ └── v2_adapter.py
│ ├── exceptions.py
│ ├── jira
│ │ ├── __init__.py
│ │ ├── attachments.py
│ │ ├── boards.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── epics.py
│ │ ├── fields.py
│ │ ├── formatting.py
│ │ ├── issues.py
│ │ ├── links.py
│ │ ├── projects.py
│ │ ├── protocols.py
│ │ ├── search.py
│ │ ├── sprints.py
│ │ ├── transitions.py
│ │ ├── users.py
│ │ └── worklog.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── comment.py
│ │ │ ├── common.py
│ │ │ ├── label.py
│ │ │ ├── page.py
│ │ │ ├── search.py
│ │ │ ├── space.py
│ │ │ └── user_search.py
│ │ ├── constants.py
│ │ └── jira
│ │ ├── __init__.py
│ │ ├── agile.py
│ │ ├── comment.py
│ │ ├── common.py
│ │ ├── issue.py
│ │ ├── link.py
│ │ ├── project.py
│ │ ├── search.py
│ │ ├── version.py
│ │ ├── workflow.py
│ │ └── worklog.py
│ ├── preprocessing
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence.py
│ │ └── jira.py
│ ├── servers
│ │ ├── __init__.py
│ │ ├── confluence.py
│ │ ├── context.py
│ │ ├── dependencies.py
│ │ ├── jira.py
│ │ └── main.py
│ └── utils
│ ├── __init__.py
│ ├── date.py
│ ├── decorators.py
│ ├── env.py
│ ├── environment.py
│ ├── io.py
│ ├── lifecycle.py
│ ├── logging.py
│ ├── oauth_setup.py
│ ├── oauth.py
│ ├── ssl.py
│ ├── tools.py
│ └── urls.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── fixtures
│ │ ├── __init__.py
│ │ ├── confluence_mocks.py
│ │ └── jira_mocks.py
│ ├── integration
│ │ ├── conftest.py
│ │ ├── README.md
│ │ ├── test_authentication.py
│ │ ├── test_content_processing.py
│ │ ├── test_cross_service.py
│ │ ├── test_mcp_protocol.py
│ │ ├── test_proxy.py
│ │ ├── test_real_api.py
│ │ ├── test_ssl_verification.py
│ │ ├── test_stdin_monitoring_fix.py
│ │ └── test_transport_lifecycle.py
│ ├── README.md
│ ├── test_preprocessing.py
│ ├── test_real_api_validation.py
│ ├── unit
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_labels.py
│ │ │ ├── test_pages.py
│ │ │ ├── test_search.py
│ │ │ ├── test_spaces.py
│ │ │ ├── test_users.py
│ │ │ ├── test_utils.py
│ │ │ └── test_v2_adapter.py
│ │ ├── jira
│ │ │ ├── conftest.py
│ │ │ ├── test_attachments.py
│ │ │ ├── test_boards.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_epics.py
│ │ │ ├── test_fields.py
│ │ │ ├── test_formatting.py
│ │ │ ├── test_issues_markdown.py
│ │ │ ├── test_issues.py
│ │ │ ├── test_links.py
│ │ │ ├── test_projects.py
│ │ │ ├── test_protocols.py
│ │ │ ├── test_search.py
│ │ │ ├── test_sprints.py
│ │ │ ├── test_transitions.py
│ │ │ ├── test_users.py
│ │ │ └── test_worklog.py
│ │ ├── models
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_base_models.py
│ │ │ ├── test_confluence_models.py
│ │ │ ├── test_constants.py
│ │ │ └── test_jira_models.py
│ │ ├── servers
│ │ │ ├── __init__.py
│ │ │ ├── test_confluence_server.py
│ │ │ ├── test_context.py
│ │ │ ├── test_dependencies.py
│ │ │ ├── test_jira_server.py
│ │ │ └── test_main_server.py
│ │ ├── test_exceptions.py
│ │ ├── test_main_transport_selection.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── test_custom_headers.py
│ │ ├── test_date.py
│ │ ├── test_decorators.py
│ │ ├── test_env.py
│ │ ├── test_environment.py
│ │ ├── test_io.py
│ │ ├── test_lifecycle.py
│ │ ├── test_logging.py
│ │ ├── test_masking.py
│ │ ├── test_oauth_setup.py
│ │ ├── test_oauth.py
│ │ ├── test_ssl.py
│ │ ├── test_tools.py
│ │ └── test_urls.py
│ └── utils
│ ├── __init__.py
│ ├── assertions.py
│ ├── base.py
│ ├── factories.py
│ └── mocks.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/tests/test_real_api_validation.py:
--------------------------------------------------------------------------------
```python
"""
Test file for validating the refactored FastMCP tools with real API data.
This test file connects to real Jira and Confluence instances to validate
that our model refactoring works correctly with actual API data.
These tests will be skipped if the required environment variables are not set
or if the --use-real-data flag is not passed to pytest.
To run these tests:
pytest tests/test_real_api_validation.py --use-real-data
Required environment variables:
- JIRA_URL, JIRA_USERNAME, JIRA_API_TOKEN
- CONFLUENCE_URL, CONFLUENCE_USERNAME, CONFLUENCE_API_TOKEN
- JIRA_TEST_ISSUE_KEY, JIRA_TEST_EPIC_KEY
- CONFLUENCE_TEST_PAGE_ID, JIRA_TEST_PROJECT_KEY, CONFLUENCE_TEST_SPACE_KEY
"""
import datetime
import json
import os
import uuid
from collections.abc import Callable, Generator, Sequence
import pytest
from fastmcp import Client
from fastmcp.client import FastMCPTransport
from mcp.types import TextContent
from mcp_atlassian.confluence import ConfluenceFetcher
from mcp_atlassian.confluence.comments import CommentsMixin as ConfluenceCommentsMixin
from mcp_atlassian.confluence.config import ConfluenceConfig
from mcp_atlassian.confluence.labels import LabelsMixin as ConfluenceLabelsMixin
from mcp_atlassian.confluence.pages import PagesMixin
from mcp_atlassian.confluence.search import SearchMixin as ConfluenceSearchMixin
from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.config import JiraConfig
from mcp_atlassian.jira.links import LinksMixin
from mcp_atlassian.models.confluence import (
ConfluenceComment,
ConfluenceLabel,
ConfluencePage,
)
from mcp_atlassian.models.jira import JiraIssueLinkType
from mcp_atlassian.servers import main_mcp
# Resource tracking for cleanup
class ResourceTracker:
    """Tracks resources created during tests for cleanup.

    Tests register every Jira issue/comment and Confluence page/comment they
    create; ``cleanup`` then deletes them best-effort (failures are printed,
    never raised, so cleanup of one resource cannot mask a test result).
    """

    def __init__(self) -> None:
        self.jira_issues: list[str] = []
        self.confluence_pages: list[str] = []
        self.confluence_comments: list[str] = []
        # BUG FIX: was annotated list[str], but add_jira_comment appends
        # (issue_key, comment_id) tuples — deleting a Jira comment needs both.
        self.jira_comments: list[tuple[str, str]] = []

    def add_jira_issue(self, issue_key: str) -> None:
        """Track a Jira issue for later cleanup."""
        self.jira_issues.append(issue_key)

    def add_confluence_page(self, page_id: str) -> None:
        """Track a Confluence page for later cleanup."""
        self.confluence_pages.append(page_id)

    def add_confluence_comment(self, comment_id: str) -> None:
        """Track a Confluence comment for later cleanup."""
        self.confluence_comments.append(comment_id)

    def add_jira_comment(self, issue_key: str, comment_id: str) -> None:
        """Track a Jira comment for later cleanup."""
        self.jira_comments.append((issue_key, comment_id))

    def cleanup(
        self,
        # String annotations avoid evaluating the fetcher types at class
        # definition time (keeps this class import-order independent).
        jira_client: "JiraFetcher | None" = None,
        confluence_client: "ConfluenceFetcher | None" = None,
    ) -> None:
        """Clean up all tracked resources.

        Comments are deleted before their parent issues/pages. Any client
        that is ``None`` is skipped entirely.
        """
        if jira_client:
            for issue_key, comment_id in self.jira_comments:
                try:
                    jira_client.delete_comment(issue_key, comment_id)
                    print(f"Deleted Jira comment {comment_id} from issue {issue_key}")
                except Exception as e:
                    print(f"Failed to delete Jira comment {comment_id}: {e}")
            for issue_key in self.jira_issues:
                try:
                    jira_client.delete_issue(issue_key)
                    print(f"Deleted Jira issue {issue_key}")
                except Exception as e:
                    print(f"Failed to delete Jira issue {issue_key}: {e}")
        if confluence_client:
            for comment_id in self.confluence_comments:
                try:
                    confluence_client.delete_comment(comment_id)
                    print(f"Deleted Confluence comment {comment_id}")
                except Exception as e:
                    print(f"Failed to delete Confluence comment {comment_id}: {e}")
            for page_id in self.confluence_pages:
                try:
                    confluence_client.delete_page(page_id)
                    print(f"Deleted Confluence page {page_id}")
                except Exception as e:
                    print(f"Failed to delete Confluence page {page_id}: {e}")
@pytest.fixture
def jira_config() -> JiraConfig:
    """Build the Jira configuration for the test session from env vars."""
    config = JiraConfig.from_env()
    return config
@pytest.fixture
def confluence_config() -> ConfluenceConfig:
    """Build the Confluence configuration for the test session from env vars."""
    config = ConfluenceConfig.from_env()
    return config
@pytest.fixture
def jira_client(jira_config: JiraConfig) -> JiraFetcher:
    """Instantiate a JiraFetcher bound to the env-derived configuration."""
    fetcher = JiraFetcher(config=jira_config)
    return fetcher
@pytest.fixture
def confluence_client(confluence_config: ConfluenceConfig) -> ConfluenceFetcher:
    """Instantiate a ConfluenceFetcher bound to the env-derived configuration."""
    fetcher = ConfluenceFetcher(config=confluence_config)
    return fetcher
@pytest.fixture
def test_issue_key() -> str:
    """Issue key used by read-only Jira tests; skips the test when unset."""
    if not (issue_key := os.environ.get("JIRA_TEST_ISSUE_KEY")):
        pytest.skip("JIRA_TEST_ISSUE_KEY environment variable not set")
    return issue_key
@pytest.fixture
def test_epic_key() -> str:
    """Epic key used by epic-related Jira tests; skips the test when unset."""
    if not (epic_key := os.environ.get("JIRA_TEST_EPIC_KEY")):
        pytest.skip("JIRA_TEST_EPIC_KEY environment variable not set")
    return epic_key
@pytest.fixture
def test_page_id() -> str:
    """Confluence page ID used by page tests; skips the test when unset."""
    if not (page_id := os.environ.get("CONFLUENCE_TEST_PAGE_ID")):
        pytest.skip("CONFLUENCE_TEST_PAGE_ID environment variable not set")
    return page_id
@pytest.fixture
def test_project_key() -> str:
    """Jira project key used by issue-creation tests; skips when unset."""
    if not (project_key := os.environ.get("JIRA_TEST_PROJECT_KEY")):
        pytest.skip("JIRA_TEST_PROJECT_KEY environment variable not set")
    return project_key
@pytest.fixture
def test_space_key() -> str:
    """Confluence space key used by page-creation tests; skips when unset."""
    if not (space_key := os.environ.get("CONFLUENCE_TEST_SPACE_KEY")):
        pytest.skip("CONFLUENCE_TEST_SPACE_KEY environment variable not set")
    return space_key
@pytest.fixture
def test_board_id() -> str:
    """Jira board ID used by board/sprint tests; skips when unset."""
    if not (board_id := os.environ.get("JIRA_TEST_BOARD_ID")):
        pytest.skip("JIRA_TEST_BOARD_ID environment variable not set")
    return board_id
@pytest.fixture
def resource_tracker() -> Generator[ResourceTracker, None, None]:
    """Yield a fresh ResourceTracker for the test to register resources in.

    Cleanup is not performed here; tests invoke it via ``cleanup_resources``.
    """
    yield ResourceTracker()
@pytest.fixture
def cleanup_resources(
    resource_tracker: ResourceTracker,
    jira_client: JiraFetcher,
    confluence_client: ConfluenceFetcher,
) -> Callable[[], None]:
    """Return a zero-argument callable that deletes everything the tracker recorded."""

    def _cleanup() -> None:
        # Delegates to the tracker with both clients so Jira and Confluence
        # resources are purged in one call.
        resource_tracker.cleanup(
            jira_client=jira_client, confluence_client=confluence_client
        )

    return _cleanup
# Only use asyncio backend for anyio tests
# Module-level mark: anyio parametrizes async tests over its backends, and
# restricting to asyncio avoids duplicate runs (e.g. under trio).
pytestmark = pytest.mark.anyio(backends=["asyncio"])
@pytest.fixture(scope="class")
async def api_validation_client():
    """Provides a FastMCP client connected to the main server for tool calls.

    Uses an in-process transport, so no network access is needed to reach
    the MCP server itself (the tools it exposes may still hit real APIs).
    """
    async with Client(transport=FastMCPTransport(main_mcp)) as connected_client:
        yield connected_client
async def call_tool(
    client: Client, tool_name: str, arguments: dict
) -> list[TextContent]:
    """Invoke an MCP tool by name through *client* and return its content parts."""
    response = await client.call_tool(tool_name, arguments)
    return response
class TestRealJiraValidation:
    """
    Test class for validating Jira models with real API data.
    These tests will be skipped if:
    1. The --use-real-data flag is not passed to pytest
    2. The required Jira environment variables are not set
    """

    @pytest.mark.anyio
    async def test_get_issue(self, use_real_jira_data, api_validation_client):
        """Test that get_issue returns a proper JiraIssue model."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")
        issue_key = os.environ.get("JIRA_TEST_ISSUE_KEY", "TES-143")
        response = await call_tool(
            api_validation_client, "jira_get_issue", {"issue_key": issue_key}
        )
        assert response
        assert isinstance(response[0], TextContent)
        payload = json.loads(response[0].text)
        assert isinstance(payload, dict)
        assert payload.get("key") == issue_key
        for field in ("id", "summary"):
            assert field in payload

    @pytest.mark.anyio
    async def test_search_issues(self, use_real_jira_data, api_validation_client):
        """Test that search_issues returns JiraIssue models."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")
        jql = 'project = "TES" ORDER BY created DESC'
        response = await call_tool(
            api_validation_client, "jira_search", {"jql": jql, "limit": 5}
        )
        assert response
        assert isinstance(response[0], TextContent)
        payload = json.loads(response[0].text)
        assert isinstance(payload, dict)
        assert "issues" in payload
        issues = payload["issues"]
        assert isinstance(issues, list)
        assert len(issues) > 0
        for issue_dict in issues:
            assert isinstance(issue_dict, dict)
            for field in ("key", "id", "summary"):
                assert field in issue_dict

    @pytest.mark.anyio
    async def test_get_issue_comments(self, use_real_jira_data, api_validation_client):
        """Test that issue comments are properly converted to JiraComment models."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")
        issue_key = os.environ.get("JIRA_TEST_ISSUE_KEY", "TES-143")
        response = await call_tool(
            api_validation_client,
            "jira_get_issue",
            {"issue_key": issue_key, "fields": "comment", "comment_limit": 5},
        )
        payload = json.loads(response[0].text)
        for comment in payload["comments"]:
            assert isinstance(comment, dict)
            assert "body" in comment or "author" in comment
class TestRealConfluenceValidation:
    """
    Test class for validating Confluence models with real API data.
    These tests will be skipped if:
    1. The --use-real-data flag is not passed to pytest
    2. The required Confluence environment variables are not set
    """

    def test_get_page_content(self, use_real_confluence_data, test_page_id):
        """Test that get_page_content returns a proper ConfluencePage model."""
        if not use_real_confluence_data:
            pytest.skip("Real Confluence data testing is disabled")
        cfg = ConfluenceConfig.from_env()
        page = PagesMixin(config=cfg).get_page_content(test_page_id)
        assert isinstance(page, ConfluencePage)
        assert page.id == test_page_id
        assert page.title is not None
        assert page.content is not None
        assert page.space is not None
        assert page.space.key is not None
        assert page.content_format in ("storage", "view", "markdown")

    def test_get_page_comments(self, use_real_confluence_data, test_page_id):
        """Test that page comments are properly converted to ConfluenceComment models."""
        if not use_real_confluence_data:
            pytest.skip("Real Confluence data testing is disabled")
        cfg = ConfluenceConfig.from_env()
        comments = ConfluenceCommentsMixin(config=cfg).get_page_comments(test_page_id)
        if not comments:
            pytest.skip("Test page has no comments")
        for comment in comments:
            assert isinstance(comment, ConfluenceComment)
            assert comment.id is not None
            assert comment.body is not None

    def test_get_page_labels(self, use_real_confluence_data, test_page_id):
        """Test that page labels are properly converted to ConfluenceLabel models."""
        if not use_real_confluence_data:
            pytest.skip("Real Confluence data testing is disabled")
        cfg = ConfluenceConfig.from_env()
        labels = ConfluenceLabelsMixin(config=cfg).get_page_labels(test_page_id)
        if not labels:
            pytest.skip("Test page has no labels")
        for label in labels:
            assert isinstance(label, ConfluenceLabel)
            assert label.id is not None
            assert label.name is not None

    def test_search_content(self, use_real_confluence_data):
        """Test that search returns ConfluencePage models."""
        if not use_real_confluence_data:
            pytest.skip("Real Confluence data testing is disabled")
        cfg = ConfluenceConfig.from_env()
        cql = 'type = "page" ORDER BY created DESC'
        matches = ConfluenceSearchMixin(config=cfg).search(cql, limit=5)
        assert len(matches) > 0
        for page in matches:
            assert isinstance(page, ConfluencePage)
            assert page.id is not None
            assert page.title is not None
@pytest.mark.anyio
async def test_jira_get_issue(jira_client: JiraFetcher, test_issue_key: str) -> None:
    """Test retrieving an issue from Jira."""
    fetched = jira_client.get_issue(test_issue_key)
    assert fetched is not None
    assert fetched.key == test_issue_key
    # Accept either the raw API shape (.fields) or the simplified model (.summary).
    looks_like_issue = hasattr(fetched, "fields") or hasattr(fetched, "summary")
    assert looks_like_issue
@pytest.mark.anyio
async def test_jira_get_issue_with_fields(
    jira_client: JiraFetcher, test_issue_key: str
) -> None:
    """Test retrieving a Jira issue with specific fields.

    Verifies both accepted forms of the ``fields`` parameter (comma-separated
    string with a wildcard, and a list of names) and that identifying fields
    survive field filtering.

    Cleanup of earlier revisions: removed a ``custom_fields_found`` scan whose
    result was never asserted, and conditional asserts of the form
    ``if X not in d: assert X not in d`` which could never fail.
    """
    full_issue = jira_client.get_issue(test_issue_key)
    assert full_issue is not None
    # String form: comma-separated names plus a customfield wildcard.
    limited_issue = jira_client.get_issue(
        test_issue_key, fields="summary,description,customfield_*"
    )
    assert limited_issue is not None
    assert limited_issue.key == test_issue_key
    assert limited_issue.summary is not None
    full_data = full_issue.to_simplified_dict()
    limited_data = limited_issue.to_simplified_dict()
    # Identifying fields must be present whether or not filtering was applied.
    for field in ("key", "id", "summary"):
        assert field in full_data
        assert field in limited_data
    assert "description" in limited_data
    # List form: the fields parameter also accepts a list of field names.
    list_fields_issue = jira_client.get_issue(
        test_issue_key, fields=["summary", "status"]
    )
    assert list_fields_issue is not None
    list_data = list_fields_issue.to_simplified_dict()
    assert "summary" in list_data
    if "status" in full_data:
        assert "status" in list_data
@pytest.mark.anyio
async def test_jira_get_epic_issues(
    jira_client: JiraFetcher, test_epic_key: str
) -> None:
    """Test retrieving issues linked to an epic from Jira."""
    linked_issues = jira_client.get_epic_issues(test_epic_key)
    assert isinstance(linked_issues, list)
    # The epic may legitimately have no children; only validate what exists.
    for child in linked_issues:
        assert hasattr(child, "key")
        assert hasattr(child, "id")
@pytest.mark.anyio
async def test_confluence_get_page_content(
    confluence_client: ConfluenceFetcher, test_page_id: str
) -> None:
    """Test retrieving a page from Confluence."""
    fetched_page = confluence_client.get_page_content(test_page_id)
    assert fetched_page is not None
    assert fetched_page.id == test_page_id
    assert fetched_page.title is not None
@pytest.mark.anyio
async def test_jira_create_issue(
    jira_client: JiraFetcher,
    test_project_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test creating an issue in Jira."""
    unique_suffix = str(uuid.uuid4())[:8]
    summary = f"Test Issue (API Validation) {unique_suffix}"
    description = "This is a test issue created by the API validation tests. It should be automatically deleted."
    try:
        created = jira_client.create_issue(
            project_key=test_project_key,
            summary=summary,
            description=description,
            issue_type="Task",
        )
        # Register immediately so cleanup runs even if assertions fail.
        resource_tracker.add_jira_issue(created.key)
        assert created is not None
        assert created.key.startswith(test_project_key)
        assert created.summary == summary
        # Round-trip: fetching the new issue must return the same identity.
        fetched = jira_client.get_issue(created.key)
        assert fetched is not None
        assert fetched.key == created.key
        assert fetched.summary == summary
    finally:
        cleanup_resources()
@pytest.mark.anyio
async def test_jira_create_subtask(
    jira_client: JiraFetcher,
    test_project_key: str,
    test_issue_key: str,
    test_epic_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test creating a subtask in Jira linked to a specified parent and epic."""
    unique_suffix = str(uuid.uuid4())[:8]
    subtask_summary = f"Subtask Test Issue {unique_suffix}"
    try:
        parent_issue_key = test_issue_key
        created_subtask = jira_client.create_issue(
            project_key=test_project_key,
            summary=subtask_summary,
            description=f"This is a test subtask linked to parent {parent_issue_key} and epic {test_epic_key}",
            issue_type="Subtask",
            parent=parent_issue_key,
            epic_link=test_epic_key,
        )
        resource_tracker.add_jira_issue(created_subtask.key)
        assert created_subtask is not None
        assert created_subtask.key.startswith(test_project_key)
        assert created_subtask.summary == subtask_summary
        fetched = jira_client.get_issue(created_subtask.key)
        assert fetched is not None
        assert fetched.key == created_subtask.key
        # Parent/epic links are only verifiable when the model exposes raw fields.
        if hasattr(fetched, "fields"):
            if hasattr(fetched.fields, "parent"):
                assert fetched.fields.parent.key == parent_issue_key
            field_ids = jira_client.get_field_ids_to_epic()
            epic_link_field = field_ids.get("epic_link") or field_ids.get("Epic Link")
            if epic_link_field and hasattr(fetched.fields, epic_link_field):
                linked_epic_key = getattr(fetched.fields, epic_link_field)
                assert linked_epic_key == test_epic_key
        print(
            f"\nCreated subtask {created_subtask.key} under parent {parent_issue_key} and epic {test_epic_key}"
        )
    finally:
        cleanup_resources()
@pytest.mark.anyio
async def test_jira_create_task_with_parent(
    jira_client: JiraFetcher,
    test_project_key: str,
    test_epic_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test creating a task in Jira with a parent issue (non-subtask)."""
    unique_suffix = str(uuid.uuid4())[:8]
    task_summary = f"Task with Parent Test {unique_suffix}"
    try:
        parent_issue_key = test_epic_key
        try:
            created_task = jira_client.create_issue(
                project_key=test_project_key,
                summary=task_summary,
                description=f"This is a test task linked to parent {parent_issue_key}",
                issue_type="Task",
                parent=parent_issue_key,
            )
            resource_tracker.add_jira_issue(created_task.key)
            assert created_task is not None
            assert created_task.key.startswith(test_project_key)
            assert created_task.summary == task_summary
            fetched = jira_client.get_issue(created_task.key)
            assert fetched is not None
            assert fetched.key == created_task.key
            if hasattr(fetched, "fields") and hasattr(fetched.fields, "parent"):
                assert fetched.fields.parent.key == parent_issue_key
            print(f"\nCreated task {created_task.key} with parent {parent_issue_key}")
        except Exception as e:
            # Some Jira instances disallow task->epic parent links entirely.
            if "hierarchy" in str(e).lower():
                pytest.skip(
                    f"Parent-child relationship not allowed by Jira configuration: {str(e)}"
                )
            else:
                raise
    finally:
        cleanup_resources()
@pytest.mark.anyio
async def test_jira_create_epic(
    jira_client: JiraFetcher,
    test_project_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """
    Test creating an Epic issue in Jira.
    This test verifies that the create_issue method can handle Epic creation
    properly for any Jira instance, regardless of the specific custom field
    configuration used for Epic Name or other Epic-specific fields.
    """
    # Generate unique identifiers for this test
    unique_suffix = str(uuid.uuid4())[:8]
    epic_summary = f"Test Epic {unique_suffix}"
    try:
        created_epic = jira_client.create_issue(
            project_key=test_project_key,
            summary=epic_summary,
            description="This is a test epic for validating Epic creation functionality.",
            issue_type="Epic",
        )
        resource_tracker.add_jira_issue(created_epic.key)
        assert created_epic is not None
        assert created_epic.key.startswith(test_project_key)
        assert created_epic.summary == epic_summary
        print(f"\nTEST PASSED: Successfully created Epic issue {created_epic.key}")
        fetched_epic = jira_client.get_issue(created_epic.key)
        assert fetched_epic is not None
        assert fetched_epic.key == created_epic.key
        assert fetched_epic.summary == epic_summary
    except Exception as e:
        print(f"\nERROR creating Epic: {str(e)}")
        # Dump discovered field IDs to help diagnose Epic custom-field setups.
        try:
            print("\n=== Jira Field Information for Debugging ===")
            field_ids = jira_client.get_field_ids_to_epic()
            print(f"Available field IDs: {field_ids}")
        except Exception as error:
            print(f"Error retrieving field IDs: {str(error)}")
        raise
    finally:
        cleanup_resources()
@pytest.mark.anyio
async def test_jira_add_comment(
    jira_client: JiraFetcher,
    test_issue_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test adding a comment to a Jira issue."""
    # Generate a unique comment text
    unique_suffix = str(uuid.uuid4())[:8]
    comment_text = f"Test comment from API validation tests {unique_suffix}. This should be automatically deleted."
    try:
        comment = jira_client.add_comment(
            issue_key=test_issue_key, comment=comment_text
        )
        # Track the comment id whether the client returned a model or a dict.
        if hasattr(comment, "id"):
            resource_tracker.add_jira_comment(test_issue_key, comment.id)
        elif isinstance(comment, dict) and "id" in comment:
            resource_tracker.add_jira_comment(test_issue_key, comment["id"])
        assert comment is not None
        # Probe the possible shapes for the comment text, in fixed precedence.
        if hasattr(comment, "body"):
            actual_text = comment.body
        elif hasattr(comment, "content"):
            actual_text = comment.content
        elif isinstance(comment, dict) and "body" in comment:
            actual_text = comment["body"]
        else:
            actual_text = str(comment)
        assert comment_text in actual_text
    finally:
        cleanup_resources()
@pytest.mark.anyio
async def test_confluence_create_page(
    confluence_client: ConfluenceFetcher,
    test_space_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test creating a page in Confluence."""
    # Generate a unique title
    unique_suffix = str(uuid.uuid4())[:8]
    title = f"Test Page (API Validation) {unique_suffix}"
    timestamp = datetime.datetime.now(tz=datetime.timezone.utc).isoformat()
    content = f"""
    <h1>Test Page</h1>
    <p>This is a test page created by the API validation tests at {timestamp}.</p>
    <p>It should be automatically deleted after the test.</p>
    """
    try:
        try:
            page = confluence_client.create_page(
                space_key=test_space_key, title=title, body=content
            )
        except Exception as e:
            lowered = str(e).lower()
            # Missing permissions or a missing space are environment problems,
            # not product bugs — skip instead of failing.
            if "permission" in lowered:
                pytest.skip(f"No permission to create pages in space {test_space_key}")
                return
            if "space" in lowered and (
                "not found" in lowered or "doesn't exist" in lowered
            ):
                pytest.skip(
                    f"Space {test_space_key} not found. Skipping page creation test."
                )
                return
            raise
        page_id = page.id
        resource_tracker.add_confluence_page(page_id)
        assert page is not None
        assert page.title == title
        fetched_page = confluence_client.get_page_content(page_id)
        assert fetched_page is not None
    finally:
        cleanup_resources()
@pytest.mark.anyio
async def test_confluence_update_page(
    confluence_client: ConfluenceFetcher,
    resource_tracker: ResourceTracker,
    test_space_key: str,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test updating a page in Confluence and validate TextContent structure.
    This test has two purposes:
    1. Test the basic page update functionality
    2. Validate the TextContent class requires the 'type' field to prevent issue #97
    """
    test_id = str(uuid.uuid4())[:8]
    title = f"Update Test Page {test_id}"
    content = f"<p>Initial content {test_id}</p>"
    try:
        page = confluence_client.create_page(
            space_key=test_space_key, title=title, body=content
        )
        page_id = page.id
        resource_tracker.add_confluence_page(page_id)
        now = datetime.datetime.now(tz=datetime.timezone.utc).isoformat()
        updated_content = f"<p>Updated content {test_id} at {now}</p>"
        updated_page = confluence_client.update_page(
            page_id=page_id, title=title, body=updated_content
        )
        assert updated_page is not None
        # ======= TextContent Validation (prevents issue #97) =======
        # Import TextContent class to test directly
        from mcp.types import TextContent

        print("Testing TextContent validation to prevent issue #97")
        # BUG FIX: previously the guarding `raise AssertionError` lived inside
        # the try block, so the broad `except Exception` swallowed it — and the
        # follow-up `"type" in str(e)` check passed by accident because the
        # failure message itself contained the word "type". try/except/else
        # lets an unexpected success escape the handler.
        try:
            _ = TextContent(text="This should fail without type field")
        except Exception as e:
            print(f"Correctly got error: {str(e)}")
            assert "type" in str(e), "Error should mention missing 'type' field"
        else:
            raise AssertionError(
                "TextContent creation without 'type' field should fail but didn't"
            )
        valid_content = TextContent(
            type="text", text="This should work with type field"
        )
        assert valid_content.type == "text", "TextContent should have type='text'"
        assert valid_content.text == "This should work with type field", (
            "TextContent text should match"
        )
        print("TextContent validation succeeded - 'type' field is properly required")
    finally:
        cleanup_resources()
@pytest.mark.anyio
async def test_confluence_add_page_label(
    confluence_client: ConfluenceFetcher,
    resource_tracker: ResourceTracker,
    test_space_key: str,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test adding a label to a page in Confluence"""
    unique_suffix = str(uuid.uuid4())[:8]
    title = f"Update Test Page {unique_suffix}"
    content = f"<p>Initial content {unique_suffix}</p>"
    try:
        # A scratch page is created first so the label has something to attach to.
        page = confluence_client.create_page(
            space_key=test_space_key, title=title, body=content
        )
        page_id = page.id
        resource_tracker.add_confluence_page(page_id)
        label_name = "test"
        result = confluence_client.add_page_label(page_id=page_id, name=label_name)
        assert result is not None
    finally:
        cleanup_resources()
@pytest.mark.skip(reason="This test modifies data - use with caution")
@pytest.mark.anyio
async def test_jira_transition_issue(
    jira_client: JiraFetcher,
    resource_tracker: ResourceTracker,
    test_project_key: str,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test transitioning an issue in Jira."""
    unique_suffix = str(uuid.uuid4())[:8]
    summary = f"Transition Test Issue {unique_suffix}"
    try:
        issue = jira_client.create_issue(
            project_key=test_project_key,
            summary=summary,
            description="Test issue for transition testing",
            issue_type="Task",
        )
        resource_tracker.add_jira_issue(issue.key)
        transitions = jira_client.get_transitions(issue.key)
        assert transitions is not None
        assert len(transitions) > 0
        # Prefer an "in progress"-style transition; otherwise fall back to the
        # first available one (object or dict shape).
        chosen_transition_id = None
        for candidate in transitions:
            if hasattr(candidate, "name") and "progress" in candidate.name.lower():
                chosen_transition_id = candidate.id
                break
        if not chosen_transition_id and transitions:
            first = transitions[0]
            chosen_transition_id = first.id if hasattr(first, "id") else first["id"]
        assert chosen_transition_id is not None
        outcome = jira_client.transition_issue(
            issue_key=issue.key, transition_id=chosen_transition_id
        )
        if outcome is not None:
            assert outcome, (
                "Transition should return a truthy value if successful"
            )
        refreshed = jira_client.get_issue(issue.key)
        if hasattr(refreshed, "status") and hasattr(refreshed.status, "name"):
            status_name = refreshed.status.name
        else:
            status_name = refreshed["fields"]["status"]["name"]
        assert "to do" not in status_name.lower()
    finally:
        cleanup_resources()
@pytest.mark.anyio
async def test_jira_create_epic_with_custom_fields(
    jira_client: JiraFetcher,
    test_project_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """
    Test creating an Epic issue in Jira with custom Epic fields.

    This test verifies that the create_issue method can handle Epic creation
    with explicit Epic Name and Epic Color values, properly detecting the
    correct custom fields regardless of the Jira configuration.
    """
    test_id = str(uuid.uuid4())[:8]
    epic_summary = f"Test Epic {test_id}"
    custom_epic_name = f"Custom Epic Name {test_id}"
    try:
        field_ids = jira_client.get_field_ids_to_epic()
        # Clear the cached field IDs so subsequent calls re-discover them.
        jira_client._field_ids_cache = None
        print(f"Discovered field IDs: {field_ids}")
        epic_issue = jira_client.create_issue(
            project_key=test_project_key,
            summary=epic_summary,
            description="This is a test epic with custom values.",
            issue_type="Epic",
            epic_name=custom_epic_name,
            epic_color="blue",
        )
        jira_client._field_ids_cache = None
        # Track the Epic so cleanup_resources() can delete it afterwards.
        resource_tracker.add_jira_issue(epic_issue.key)
        jira_client._field_ids_cache = None
        assert epic_issue is not None
        assert epic_issue.key.startswith(test_project_key)
        assert epic_issue.summary == epic_summary
        print(f"\nTEST PASSED: Successfully created Epic issue {epic_issue.key}")
        retrieved_epic = jira_client.get_issue(epic_issue.key)
        jira_client._field_ids_cache = None
        has_epic_name = False
        # Preferred check: the model exposes epic_name directly.
        if hasattr(retrieved_epic, "epic_name") and retrieved_epic.epic_name:
            assert retrieved_epic.epic_name == custom_epic_name
            has_epic_name = True
            print(
                f"Verified Epic Name via epic_name property: {retrieved_epic.epic_name}"
            )
        if not has_epic_name:
            # Fallback: scan the raw custom fields for the Epic Name value.
            print("Could not verify Epic Name directly, checking raw fields...")
            if hasattr(jira_client, "jira"):
                raw_issue = jira_client.jira.issue(epic_issue.key)
                fields = raw_issue.get("fields", {})
                for field_id, value in fields.items():
                    if field_id.startswith("customfield_") and isinstance(value, str):
                        if value == custom_epic_name:
                            print(
                                f"Found Epic Name in custom field {field_id}: {value}"
                            )
                            has_epic_name = True
                            break
        if not has_epic_name:
            # Not fatal: some configurations expose Epic Name differently.
            print("WARNING: Could not verify Epic Name was set correctly")
    except Exception as e:
        print(f"\nERROR creating Epic with custom fields: {str(e)}")
        try:
            # Dump field information to help diagnose configuration issues.
            print("\n=== Jira Field Information for Debugging ===")
            field_ids = jira_client.get_field_ids_to_epic()
            print(f"Available field IDs: {field_ids}")
        except Exception as error:
            print(f"Error retrieving field IDs: {str(error)}")
        raise
    finally:
        cleanup_resources()
@pytest.mark.anyio
async def test_jira_create_epic_two_step(
    jira_client: JiraFetcher,
    test_project_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """
    Test the two-step Epic creation process.

    This test verifies that the create_issue method can successfully create an Epic
    using the two-step approach (create basic issue first, then update Epic fields)
    to work around screen configuration issues.
    """
    test_id = str(uuid.uuid4())[:8]
    epic_summary = f"Two-Step Epic {test_id}"
    epic_name = f"Epic Name {test_id}"
    try:
        field_ids = jira_client.get_field_ids_to_epic()
        # Clear the cached field IDs so subsequent calls re-discover them.
        jira_client._field_ids_cache = None
        print(f"\nAvailable field IDs for Epic creation: {field_ids}")
        print("\nAttempting to create Epic using two-step process...")
        epic_issue = jira_client.create_issue(
            project_key=test_project_key,
            summary=epic_summary,
            description="This is a test epic using the two-step creation process.",
            issue_type="Epic",
            epic_name=epic_name,  # This should be stored for post-creation update
            epic_color="blue",  # This should be stored for post-creation update
        )
        jira_client._field_ids_cache = None
        # Track the Epic so cleanup_resources() can delete it afterwards.
        resource_tracker.add_jira_issue(epic_issue.key)
        jira_client._field_ids_cache = None
        assert epic_issue is not None
        assert epic_issue.key.startswith(test_project_key)
        assert epic_issue.summary == epic_summary
        print(f"\nSuccessfully created Epic: {epic_issue.key}")
        retrieved_epic = jira_client.get_issue(epic_issue.key)
        jira_client._field_ids_cache = None
        print(f"\nRetrieved Epic: {retrieved_epic.key}")
        print(f"Epic name: {retrieved_epic.epic_name}")
        # Best-effort diagnostics: print any Epic-related raw field values.
        try:
            if hasattr(retrieved_epic, "_raw"):
                raw_data = retrieved_epic._raw
            else:
                raw_data = jira_client.jira.issue(epic_issue.key)
            if "fields" in raw_data:
                for field_id, field_value in raw_data["fields"].items():
                    if "epic" in field_id.lower() or field_id in field_ids.values():
                        print(f"Field {field_id}: {field_value}")
        except Exception as e:
            print(f"Error getting raw Epic data: {str(e)}")
        print("\nTEST PASSED: Successfully completed two-step Epic creation test")
    except Exception as e:
        print(f"\nERROR in two-step Epic creation test: {str(e)}")
        print("\nAvailable field IDs:")
        try:
            field_ids = jira_client.get_field_ids_to_epic()
            for name, field_id in field_ids.items():
                print(f"  {name}: {field_id}")
        except Exception as field_error:
            print(f"Error getting field IDs: {str(field_error)}")
        raise
    finally:
        cleanup_resources()
# Tool Validation Tests (Requires --use-real-data)
# These tests use the server's call_tool handler to test the full flow
@pytest.mark.usefixtures("use_real_jira_data")
class TestRealToolValidation:
    """
    Test class for validating tool calls with real API data.

    Each test drives the server's call_tool handler end-to-end against a live
    Jira instance and is skipped unless real-data testing is enabled.
    """

    @pytest.mark.anyio
    async def test_jira_search_with_start_at(
        self, use_real_jira_data: bool, test_project_key: str
    ) -> None:
        """Test the jira_search tool with the startAt parameter."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")
        jql = f'project = "{test_project_key}" ORDER BY created ASC'
        limit = 1
        # Page 1: first issue in creation order.
        args1 = {"jql": jql, "limit": limit, "startAt": 0}
        result1_content: Sequence[TextContent] = await call_tool(
            api_validation_client, "jira_search", args1
        )
        assert result1_content and isinstance(result1_content[0], TextContent)
        results1 = json.loads(result1_content[0].text)
        # Page 2: second issue in creation order.
        args2 = {"jql": jql, "limit": limit, "startAt": 1}
        result2_content: Sequence[TextContent] = await call_tool(
            api_validation_client, "jira_search", args2
        )
        assert result2_content and isinstance(result2_content[0], TextContent)
        results2 = json.loads(result2_content[0].text)
        assert isinstance(results1.get("issues"), list)
        assert isinstance(results2.get("issues"), list)
        # Pagination works iff the two pages return different issues.
        if len(results1["issues"]) > 0 and len(results2["issues"]) > 0:
            assert results1["issues"][0]["key"] != results2["issues"][0]["key"], (
                f"Expected different issues with startAt=0 and startAt=1, but got {results1['issues'][0]['key']} for both."
                f" Ensure project '{test_project_key}' has at least 2 issues."
            )
        elif len(results1["issues"]) <= 1:
            pytest.skip(
                f"Project {test_project_key} has less than 2 issues, cannot test pagination."
            )

    @pytest.mark.anyio
    async def test_jira_get_project_issues_with_start_at(
        self, use_real_jira_data: bool, test_project_key: str
    ) -> None:
        """Test the jira_get_project_issues tool with the startAt parameter."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")
        limit = 1
        args1 = {"project_key": test_project_key, "limit": limit, "startAt": 0}
        result1_content = list(
            await call_tool(api_validation_client, "jira_get_project_issues", args1)
        )
        assert isinstance(result1_content[0], TextContent)
        results1 = json.loads(result1_content[0].text)
        args2 = {"project_key": test_project_key, "limit": limit, "startAt": 1}
        result2_content = list(
            await call_tool(api_validation_client, "jira_get_project_issues", args2)
        )
        assert isinstance(result2_content[0], TextContent)
        results2 = json.loads(result2_content[0].text)
        # This tool returns a bare list of issues, not a {"issues": ...} dict.
        assert isinstance(results1, list)
        assert isinstance(results2, list)
        if len(results1) > 0 and len(results2) > 0:
            assert results1[0]["key"] != results2[0]["key"], (
                f"Expected different issues with startAt=0 and startAt=1, but got {results1[0]['key']} for both."
                f" Ensure project '{test_project_key}' has at least 2 issues."
            )
        elif len(results1) <= 1:
            pytest.skip(
                f"Project {test_project_key} has less than 2 issues, cannot test pagination."
            )

    @pytest.mark.anyio
    async def test_jira_get_epic_issues_with_start_at(
        self, use_real_jira_data: bool, test_epic_key: str
    ) -> None:
        """Test the jira_get_epic_issues tool with the startAt parameter."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")
        limit = 1
        args1 = {"epic_key": test_epic_key, "limit": limit, "startAt": 0}
        result1_content: Sequence[TextContent] = await call_tool(
            api_validation_client, "jira_get_epic_issues", args1
        )
        assert result1_content and isinstance(result1_content[0], TextContent)
        results1 = json.loads(result1_content[0].text)
        args2 = {"epic_key": test_epic_key, "limit": limit, "startAt": 1}
        result2_content: Sequence[TextContent] = await call_tool(
            api_validation_client, "jira_get_epic_issues", args2
        )
        assert result2_content and isinstance(result2_content[0], TextContent)
        results2 = json.loads(result2_content[0].text)
        assert isinstance(results1.get("issues"), list)
        assert isinstance(results2.get("issues"), list)
        if len(results1["issues"]) > 0 and len(results2["issues"]) > 0:
            assert results1["issues"][0]["key"] != results2["issues"][0]["key"], (
                f"Expected different issues with startAt=0 and startAt=1, but got {results1['issues'][0]['key']} for both."
                f" Ensure epic '{test_epic_key}' has at least 2 linked issues."
            )
        elif len(results1["issues"]) <= 1:
            pytest.skip(
                f"Epic {test_epic_key} has less than 2 issues, cannot test pagination."
            )

    @pytest.mark.anyio
    async def test_jira_get_issue_includes_comments(
        self, use_real_jira_data: bool, test_issue_key: str
    ) -> None:
        """Test that jira_get_issue includes comments when comment_limit > 0."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")
        # NOTE(review): this test treats the call_tool result as a dict,
        # while other tests in this class receive Sequence[TextContent] and
        # json.loads the first item - confirm which contract call_tool has.
        result = await call_tool(
            api_validation_client,
            "jira_get_issue",
            {"issue_key": test_issue_key, "comment_limit": 10},
        )
        assert isinstance(result, dict)
        assert "comments" in result
        assert isinstance(result["comments"], list)
        # Restricting fields to 'summary' should drop comments from the output.
        result_without_comments = await call_tool(
            api_validation_client,
            "jira_get_issue",
            {"issue_key": test_issue_key, "comment_limit": 10, "fields": "summary"},
        )
        assert isinstance(result_without_comments, dict)
        assert "comments" not in result_without_comments

    @pytest.mark.anyio
    async def test_jira_get_link_types_tool(
        self, use_real_jira_data: bool, api_validation_client: Client
    ) -> None:
        """Test the jira_get_link_types tool."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")
        try:
            result_content = list(
                await call_tool(api_validation_client, "jira_get_link_types", {})
            )
        except LookupError:
            pytest.skip("Server context not available for call_tool")
        assert isinstance(result_content[0], TextContent)
        link_types = json.loads(result_content[0].text)
        assert isinstance(link_types, list)
        assert len(link_types) > 0
        # Every link type carries id/name plus inward and outward phrasing.
        first_link = link_types[0]
        assert "id" in first_link
        assert "name" in first_link
        assert "inward" in first_link
        assert "outward" in first_link

    @pytest.mark.anyio
    async def test_jira_create_issue_link_tool(
        self, use_real_jira_data: bool, test_project_key: str, api_validation_client
    ) -> None:
        """Test the jira_create_issue_link and jira_remove_issue_link tools."""
        if not use_real_jira_data:
            pytest.skip("Real Jira data testing is disabled")
        # NOTE(review): this test uses slash-style tool names ("jira/create_issue")
        # while the tests above use underscore-style ("jira_search") - confirm
        # which naming the server registers.
        test_id = str(uuid.uuid4())[:8]
        issue1_args = {
            "project_key": test_project_key,
            "summary": f"Link Test Source {test_id}",
            "description": "Test issue for link testing via tool",
            "issue_type": "Task",
        }
        issue1_content: Sequence[TextContent] = await call_tool(
            api_validation_client, "jira/create_issue", issue1_args
        )
        assert issue1_content and isinstance(issue1_content[0], TextContent)
        issue1_data = json.loads(issue1_content[0].text)
        issue1_key = issue1_data["key"]
        issue2_args = {
            "project_key": test_project_key,
            "summary": f"Link Test Target {test_id}",
            "description": "Test issue for link testing via tool",
            "issue_type": "Task",
        }
        issue2_content: Sequence[TextContent] = await call_tool(
            api_validation_client, "jira/create_issue", issue2_args
        )
        assert issue2_content and isinstance(issue2_content[0], TextContent)
        issue2_data = json.loads(issue2_content[0].text)
        issue2_key = issue2_data["key"]
        try:
            # Discover an available link type, preferring a "relates" style.
            link_types_content: Sequence[TextContent] = await call_tool(
                api_validation_client, "jira/get_link_types", {}
            )
            assert link_types_content and isinstance(link_types_content[0], TextContent)
            link_types = json.loads(link_types_content[0].text)
            link_type_name = None
            for lt in link_types:
                if "relate" in lt["name"].lower():
                    link_type_name = lt["name"]
                    break
            # If no "relates to" type found, use the first available type
            if not link_type_name:
                link_type_name = link_types[0]["name"]
            link_args = {
                "link_type": link_type_name,
                "inward_issue_key": issue1_key,
                "outward_issue_key": issue2_key,
                "comment": f"Test link created by API validation test {test_id}",
            }
            link_content: Sequence[TextContent] = await call_tool(
                api_validation_client, "jira/create_issue_link", link_args
            )
            assert link_content and isinstance(link_content[0], TextContent)
            link_result = json.loads(link_content[0].text)
            assert link_result["success"] is True
            # Re-fetch the source issue to locate the new link's ID.
            issue_content: Sequence[TextContent] = await call_tool(
                api_validation_client,
                "jira/get_issue",
                {"issue_key": issue1_key, "fields": "issuelinks"},
            )
            assert issue_content and isinstance(issue_content[0], TextContent)
            issue_data = json.loads(issue_content[0].text)
            link_id = None
            if "issuelinks" in issue_data:
                for link in issue_data["issuelinks"]:
                    if link.get("outwardIssue", {}).get("key") == issue2_key:
                        link_id = link.get("id")
                        break
            # If we found a link ID, test removing it
            if link_id:
                remove_args = {"link_id": link_id}
                remove_content: Sequence[TextContent] = await call_tool(
                    api_validation_client, "jira/remove_issue_link", remove_args
                )
                assert remove_content and isinstance(remove_content[0], TextContent)
                remove_result = json.loads(remove_content[0].text)
                assert remove_result["success"] is True
                assert remove_result["link_id"] == link_id
        finally:
            # Always delete the two scratch issues, even on failure.
            await call_tool(
                api_validation_client, "jira/delete_issue", {"issue_key": issue1_key}
            )
            await call_tool(
                api_validation_client, "jira/delete_issue", {"issue_key": issue2_key}
            )
@pytest.mark.anyio
async def test_jira_get_issue_link_types(jira_client: JiraFetcher) -> None:
    """Verify that issue link types can be fetched and are well-formed.

    An empty list is acceptable (no link types configured or accessible);
    when entries exist, the first one must be a fully populated
    JiraIssueLinkType with id, name, inward, and outward set.
    """
    mixin = LinksMixin(config=jira_client.config)
    fetched = mixin.get_issue_link_types()
    assert isinstance(fetched, list)
    if not fetched:
        return  # Valid: nothing configured or accessible.
    sample = fetched[0]
    assert isinstance(sample, JiraIssueLinkType)
    for attr in ("id", "name", "inward", "outward"):
        assert getattr(sample, attr) is not None
@pytest.mark.anyio
async def test_jira_create_and_remove_issue_link(
    jira_client: JiraFetcher,
    test_project_key: str,
    resource_tracker: ResourceTracker,
    cleanup_resources: Callable[[], None],
) -> None:
    """Test creating and removing a link between two Jira issues.

    Creates two scratch Task issues, links them with a "relates"-style link
    type (or the first available type), locates the created link's ID via the
    raw API, and then removes the link.
    """
    test_id = str(uuid.uuid4())[:8]
    summary1 = f"Link Test Issue 1 {test_id}"
    summary2 = f"Link Test Issue 2 {test_id}"
    try:
        issue1 = jira_client.create_issue(
            project_key=test_project_key,
            summary=summary1,
            description="First test issue for link testing",
            issue_type="Task",
        )
        issue2 = jira_client.create_issue(
            project_key=test_project_key,
            summary=summary2,
            description="Second test issue for link testing",
            issue_type="Task",
        )
        # Track both issues so cleanup_resources() can delete them afterwards.
        resource_tracker.add_jira_issue(issue1.key)
        resource_tracker.add_jira_issue(issue2.key)
        links_client = LinksMixin(config=jira_client.config)
        link_types = links_client.get_issue_link_types()
        assert len(link_types) > 0
        # Prefer a "relates"-style link type when one exists.
        link_type_name = None
        for lt in link_types:
            if "relate" in lt.name.lower():
                link_type_name = lt.name
                break
        # If no "Relates" type found, use the first available type
        if not link_type_name:
            link_type_name = link_types[0].name
        link_data = {
            "type": {"name": link_type_name},
            "inwardIssue": {"key": issue1.key},
            "outwardIssue": {"key": issue2.key},
            "comment": {"body": f"Test link created by API validation test {test_id}"},
        }
        link_result = links_client.create_issue_link(link_data)
        assert link_result is not None
        assert link_result["success"] is True
        assert link_result["inward_issue"] == issue1.key
        assert link_result["outward_issue"] == issue2.key
        # The create call does not return the link ID; find it on the raw issue.
        raw_issue = jira_client.jira.issue(issue1.key, fields="issuelinks")
        link_id = None
        if hasattr(raw_issue, "fields") and hasattr(raw_issue.fields, "issuelinks"):
            for link in raw_issue.fields.issuelinks:
                if (
                    hasattr(link, "outwardIssue")
                    and link.outwardIssue.key == issue2.key
                ):
                    link_id = link.id
                    break
        # Skip link removal test if we couldn't find the link ID
        if not link_id:
            pytest.skip("Could not find link ID for removal test")
        remove_result = links_client.remove_issue_link(link_id)
        assert remove_result is not None
        assert remove_result["success"] is True
        assert remove_result["link_id"] == link_id
    finally:
        # Clean up resources even if the test fails
        cleanup_resources()
@pytest.mark.skipif(not os.getenv("TEST_PROXY_URL"), reason="TEST_PROXY_URL not set")
def test_jira_client_real_proxy(jira_config: JiraConfig) -> None:
    """Test JiraClient with a real proxy if TEST_PROXY_URL is set.

    Temporarily points HTTP_PROXY/HTTPS_PROXY at TEST_PROXY_URL, performs a
    simple issue lookup to prove traffic flows through the proxy without a
    connection error, and restores the caller's original proxy environment
    afterwards.
    """
    import requests

    proxy_url = os.environ["TEST_PROXY_URL"]
    # Preserve any pre-existing proxy settings so this test does not clobber
    # the caller's environment (the previous implementation deleted them
    # unconditionally, destroying values that were set before the test ran).
    saved = {key: os.environ.get(key) for key in ("HTTP_PROXY", "HTTPS_PROXY")}
    os.environ["HTTP_PROXY"] = proxy_url
    os.environ["HTTPS_PROXY"] = proxy_url
    # Use a simple API call to verify proxy is used and no connection error
    client = JiraFetcher(config=JiraConfig.from_env())
    try:
        issue_key = os.environ.get("JIRA_TEST_ISSUE_KEY")
        if not issue_key:
            pytest.skip("JIRA_TEST_ISSUE_KEY not set")
        result = client.get_issue(issue_key)
        assert result is not None
    except requests.exceptions.ProxyError:
        pytest.fail("Proxy connection failed - check TEST_PROXY_URL and network setup.")
    finally:
        # Restore (or remove) the proxy variables exactly as they were before.
        for key, value in saved.items():
            if value is None:
                os.environ.pop(key, None)
            else:
                os.environ[key] = value
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/servers/jira.py:
--------------------------------------------------------------------------------
```python
"""Jira FastMCP server instance and tool definitions."""
import json
import logging
from typing import Annotated, Any
from fastmcp import Context, FastMCP
from pydantic import Field
from requests.exceptions import HTTPError
from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
from mcp_atlassian.jira.constants import DEFAULT_READ_JIRA_FIELDS
from mcp_atlassian.models.jira.common import JiraUser
from mcp_atlassian.servers.dependencies import get_jira_fetcher
from mcp_atlassian.utils.decorators import check_write_access
# Module-level logger for this Jira server module.
logger = logging.getLogger(__name__)

# FastMCP server instance on which all Jira tools below are registered.
jira_mcp = FastMCP(
    name="Jira MCP Service",
    description="Provides tools for interacting with Atlassian Jira.",
)
@jira_mcp.tool(tags={"jira", "read"})
async def get_user_profile(
    ctx: Context,
    user_identifier: Annotated[
        str,
        Field(
            description="Identifier for the user (e.g., email address '[email protected]', username 'johndoe', account ID 'accountid:...', or key for Server/DC)."
        ),
    ],
) -> str:
    """
    Retrieve profile information for a specific Jira user.

    Args:
        ctx: The FastMCP context.
        user_identifier: User identifier (email, username, key, or account ID).

    Returns:
        JSON string representing the Jira user profile object, or an error object if not found.

    Raises:
        ValueError: If the Jira client is not configured or available.
    """
    jira = await get_jira_fetcher(ctx)
    try:
        user: JiraUser = jira.get_user_profile_by_identifier(user_identifier)
        result = user.to_simplified_dict()
        response_data = {"success": True, "user": result}
    except Exception as e:
        # Classify the failure to choose the log level and a human-readable
        # summary; the raw str(e) is still returned in the error payload.
        error_message = ""
        log_level = logging.ERROR
        if isinstance(e, ValueError) and "not found" in str(e).lower():
            # "Not found" is an expected condition - log as a warning only.
            log_level = logging.WARNING
            error_message = str(e)
        elif isinstance(e, MCPAtlassianAuthenticationError):
            error_message = f"Authentication/Permission Error: {str(e)}"
        elif isinstance(e, OSError | HTTPError):
            error_message = f"Network or API Error: {str(e)}"
        else:
            error_message = (
                "An unexpected error occurred while fetching the user profile."
            )
            # Unexpected errors get a full traceback in the log.
            logger.exception(
                f"Unexpected error in get_user_profile for '{user_identifier}':"
            )
        error_result = {
            "success": False,
            "error": str(e),
            "user_identifier": user_identifier,
        }
        logger.log(
            log_level,
            f"get_user_profile failed for '{user_identifier}': {error_message}",
        )
        response_data = error_result
    return json.dumps(response_data, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "read"})
async def get_issue(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g., 'PROJ-123')")],
    fields: Annotated[
        str,
        Field(
            description=(
                "(Optional) Comma-separated list of fields to return (e.g., 'summary,status,customfield_10010'). "
                "You may also provide a single field as a string (e.g., 'duedate'). "
                "Use '*all' for all fields (including custom fields), or omit for essential fields only."
            ),
            default=",".join(DEFAULT_READ_JIRA_FIELDS),
        ),
    ] = ",".join(DEFAULT_READ_JIRA_FIELDS),
    expand: Annotated[
        str | None,
        Field(
            description=(
                "(Optional) Fields to expand. Examples: 'renderedFields' (for rendered content), "
                "'transitions' (for available status transitions), 'changelog' (for history)"
            ),
            default=None,
        ),
    ] = None,
    comment_limit: Annotated[
        int,
        Field(
            description="Maximum number of comments to include (0 or null for no comments)",
            default=10,
            ge=0,
            le=100,
        ),
    ] = 10,
    properties: Annotated[
        str | None,
        Field(
            description="(Optional) A comma-separated list of issue properties to return",
            default=None,
        ),
    ] = None,
    update_history: Annotated[
        bool,
        Field(
            description="Whether to update the issue view history for the requesting user",
            default=True,
        ),
    ] = True,
) -> str:
    """Get details of a specific Jira issue including its Epic links and relationship information.

    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.
        fields: Comma-separated list of fields to return (e.g., 'summary,status,customfield_10010'), a single field as a string (e.g., 'duedate'), '*all' for all fields, or omitted for essentials.
        expand: Optional fields to expand.
        comment_limit: Maximum number of comments.
        properties: Issue properties to return.
        update_history: Whether to update issue view history.

    Returns:
        JSON string representing the Jira issue object.

    Raises:
        ValueError: If the Jira client is not configured or available.
    """
    jira = await get_jira_fetcher(ctx)
    fields_list: str | list[str] | None = fields
    if fields and fields != "*all":
        # Strip whitespace and drop empty entries from stray commas so inputs
        # like "summary, status," are handled gracefully.
        fields_list = [f.strip() for f in fields.split(",") if f.strip()]
    # Parse properties the same way as fields for consistency; the previous
    # implementation passed raw split segments (with surrounding whitespace)
    # straight through to the API.
    properties_list = (
        [p.strip() for p in properties.split(",") if p.strip()] if properties else None
    )
    issue = jira.get_issue(
        issue_key=issue_key,
        fields=fields_list,
        expand=expand,
        comment_limit=comment_limit,
        properties=properties_list,
        update_history=update_history,
    )
    result = issue.to_simplified_dict()
    return json.dumps(result, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "read"})
async def search(
    ctx: Context,
    jql: Annotated[
        str,
        Field(
            description=(
                "JQL query string (Jira Query Language). Examples:\n"
                '- Find Epics: "issuetype = Epic AND project = PROJ"\n'
                '- Find issues in Epic: "parent = PROJ-123"\n'
                "- Find by status: \"status = 'In Progress' AND project = PROJ\"\n"
                '- Find by assignee: "assignee = currentUser()"\n'
                '- Find recently updated: "updated >= -7d AND project = PROJ"\n'
                '- Find by label: "labels = frontend AND project = PROJ"\n'
                '- Find by priority: "priority = High AND project = PROJ"'
            )
        ),
    ],
    fields: Annotated[
        str,
        Field(
            description=(
                "(Optional) Comma-separated fields to return in the results. "
                "Use '*all' for all fields, or specify individual fields like 'summary,status,assignee,priority'"
            ),
            default=",".join(DEFAULT_READ_JIRA_FIELDS),
        ),
    ] = ",".join(DEFAULT_READ_JIRA_FIELDS),
    limit: Annotated[
        int,
        # NOTE(review): the description says "(1-50)" but there is no le=50
        # constraint here, unlike get_project_issues - confirm whether the
        # upper bound should be enforced.
        Field(description="Maximum number of results (1-50)", default=10, ge=1),
    ] = 10,
    start_at: Annotated[
        int,
        Field(description="Starting index for pagination (0-based)", default=0, ge=0),
    ] = 0,
    projects_filter: Annotated[
        str | None,
        Field(
            description=(
                "(Optional) Comma-separated list of project keys to filter results by. "
                "Overrides the environment variable JIRA_PROJECTS_FILTER if provided."
            ),
            default=None,
        ),
    ] = None,
    expand: Annotated[
        str | None,
        Field(
            description=(
                "(Optional) fields to expand. Examples: 'renderedFields', 'transitions', 'changelog'"
            ),
            default=None,
        ),
    ] = None,
) -> str:
    """Search Jira issues using JQL (Jira Query Language).

    Args:
        ctx: The FastMCP context.
        jql: JQL query string.
        fields: Comma-separated fields to return.
        limit: Maximum number of results.
        start_at: Starting index for pagination.
        projects_filter: Comma-separated list of project keys to filter by.
        expand: Optional fields to expand.

    Returns:
        JSON string representing the search results including pagination info.
    """
    jira = await get_jira_fetcher(ctx)
    # Pass '*all' (or empty) through as-is; otherwise split into a list.
    fields_list: str | list[str] | None = fields
    if fields and fields != "*all":
        fields_list = [f.strip() for f in fields.split(",")]
    search_result = jira.search_issues(
        jql=jql,
        fields=fields_list,
        limit=limit,
        start=start_at,
        expand=expand,
        projects_filter=projects_filter,
    )
    result = search_result.to_simplified_dict()
    return json.dumps(result, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "read"})
async def search_fields(
    ctx: Context,
    keyword: Annotated[
        str,
        Field(
            description="Keyword for fuzzy search. If left empty, lists the first 'limit' available fields in their default order.",
            default="",
        ),
    ] = "",
    limit: Annotated[
        int, Field(description="Maximum number of results", default=10, ge=1)
    ] = 10,
    refresh: Annotated[
        bool,
        Field(description="Whether to force refresh the field list", default=False),
    ] = False,
) -> str:
    """Fuzzy-search Jira field definitions by keyword.

    Args:
        ctx: The FastMCP context.
        keyword: Keyword for fuzzy search.
        limit: Maximum number of results.
        refresh: Whether to force refresh the field list.

    Returns:
        JSON string representing a list of matching field definitions.
    """
    fetcher = await get_jira_fetcher(ctx)
    matches = fetcher.search_fields(keyword, limit=limit, refresh=refresh)
    return json.dumps(matches, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "read"})
async def get_project_issues(
    ctx: Context,
    project_key: Annotated[str, Field(description="The project key")],
    limit: Annotated[
        int,
        Field(description="Maximum number of results (1-50)", default=10, ge=1, le=50),
    ] = 10,
    start_at: Annotated[
        int,
        Field(description="Starting index for pagination (0-based)", default=0, ge=0),
    ] = 0,
) -> str:
    """List issues belonging to a single Jira project.

    Args:
        ctx: The FastMCP context.
        project_key: The project key.
        limit: Maximum number of results.
        start_at: Starting index for pagination.

    Returns:
        JSON string representing the search results including pagination info.
    """
    fetcher = await get_jira_fetcher(ctx)
    page = fetcher.get_project_issues(
        project_key=project_key, start=start_at, limit=limit
    )
    return json.dumps(page.to_simplified_dict(), indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "read"})
async def get_transitions(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g., 'PROJ-123')")],
) -> str:
    """List the status transitions currently available for an issue.

    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.

    Returns:
        JSON string representing a list of available transitions.
    """
    fetcher = await get_jira_fetcher(ctx)
    # get_available_transitions already yields list[dict] in the output shape.
    return json.dumps(
        fetcher.get_available_transitions(issue_key), indent=2, ensure_ascii=False
    )
@jira_mcp.tool(tags={"jira", "read"})
async def get_worklog(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g., 'PROJ-123')")],
) -> str:
    """Fetch the worklog entries recorded on an issue.

    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.

    Returns:
        JSON string representing the worklog entries.
    """
    fetcher = await get_jira_fetcher(ctx)
    payload = {"worklogs": fetcher.get_worklogs(issue_key)}
    return json.dumps(payload, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "read"})
async def download_attachments(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g., 'PROJ-123')")],
    target_dir: Annotated[
        str, Field(description="Directory where attachments should be saved")
    ],
) -> str:
    """Save all attachments of an issue into a local directory.

    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.
        target_dir: Directory to save attachments.

    Returns:
        JSON string indicating the result of the download operation.
    """
    fetcher = await get_jira_fetcher(ctx)
    outcome = fetcher.download_issue_attachments(
        issue_key=issue_key, target_dir=target_dir
    )
    return json.dumps(outcome, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "read"})
async def get_agile_boards(
    ctx: Context,
    board_name: Annotated[
        str | None,
        Field(description="(Optional) The name of board, support fuzzy search"),
    ] = None,
    project_key: Annotated[
        str | None, Field(description="(Optional) Jira project key (e.g., 'PROJ-123')")
    ] = None,
    board_type: Annotated[
        str | None,
        Field(
            description="(Optional) The type of jira board (e.g., 'scrum', 'kanban')"
        ),
    ] = None,
    start_at: Annotated[
        int,
        Field(description="Starting index for pagination (0-based)", default=0, ge=0),
    ] = 0,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (1-50)", default=10, ge=1, le=50),
    ] = 10,
) -> str:
    """Look up Jira agile boards filtered by name, project key, and/or type.

    Args:
        ctx: The FastMCP context.
        board_name: Name of the board (fuzzy search).
        project_key: Project key.
        board_type: Board type ('scrum' or 'kanban').
        start_at: Starting index.
        limit: Maximum results.

    Returns:
        JSON string representing a list of board objects.
    """
    fetcher = await get_jira_fetcher(ctx)
    found = fetcher.get_all_agile_boards_model(
        board_name=board_name,
        project_key=project_key,
        board_type=board_type,
        start=start_at,
        limit=limit,
    )
    simplified = [entry.to_simplified_dict() for entry in found]
    return json.dumps(simplified, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "read"})
async def get_board_issues(
    ctx: Context,
    board_id: Annotated[str, Field(description="The id of the board (e.g., '1001')")],
    jql: Annotated[
        str,
        Field(
            description=(
                "JQL query string (Jira Query Language). Examples:\n"
                '- Find Epics: "issuetype = Epic AND project = PROJ"\n'
                '- Find issues in Epic: "parent = PROJ-123"\n'
                "- Find by status: \"status = 'In Progress' AND project = PROJ\"\n"
                '- Find by assignee: "assignee = currentUser()"\n'
                '- Find recently updated: "updated >= -7d AND project = PROJ"\n'
                '- Find by label: "labels = frontend AND project = PROJ"\n'
                '- Find by priority: "priority = High AND project = PROJ"'
            )
        ),
    ],
    fields: Annotated[
        str,
        Field(
            description=(
                "Comma-separated fields to return in the results. "
                "Use '*all' for all fields, or specify individual "
                "fields like 'summary,status,assignee,priority'"
            ),
            default=",".join(DEFAULT_READ_JIRA_FIELDS),
        ),
    ] = ",".join(DEFAULT_READ_JIRA_FIELDS),
    start_at: Annotated[
        int,
        Field(description="Starting index for pagination (0-based)", default=0, ge=0),
    ] = 0,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (1-50)", default=10, ge=1, le=50),
    ] = 10,
    expand: Annotated[
        str,
        # NOTE(review): the default expand value is 'version' while the
        # description suggests 'changelog' - confirm the intended default.
        Field(
            description="Optional fields to expand in the response (e.g., 'changelog').",
            default="version",
        ),
    ] = "version",
) -> str:
    """Get all issues linked to a specific board filtered by JQL.

    Args:
        ctx: The FastMCP context.
        board_id: The ID of the board.
        jql: JQL query string to filter issues.
        fields: Comma-separated fields to return.
        start_at: Starting index for pagination.
        limit: Maximum number of results.
        expand: Optional fields to expand.

    Returns:
        JSON string representing the search results including pagination info.
    """
    jira = await get_jira_fetcher(ctx)
    # Pass '*all' (or empty) through as-is; otherwise split into a list.
    fields_list: str | list[str] | None = fields
    if fields and fields != "*all":
        fields_list = [f.strip() for f in fields.split(",")]
    search_result = jira.get_board_issues(
        board_id=board_id,
        jql=jql,
        fields=fields_list,
        start=start_at,
        limit=limit,
        expand=expand,
    )
    result = search_result.to_simplified_dict()
    return json.dumps(result, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "read"})
async def get_sprints_from_board(
    ctx: Context,
    board_id: Annotated[str, Field(description="The id of board (e.g., '1000')")],
    state: Annotated[
        str | None,
        Field(description="Sprint state (e.g., 'active', 'future', 'closed')"),
    ] = None,
    start_at: Annotated[
        int,
        Field(description="Starting index for pagination (0-based)", default=0, ge=0),
    ] = 0,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (1-50)", default=10, ge=1, le=50),
    ] = 10,
) -> str:
    """List the sprints attached to a board, optionally filtered by state.

    Args:
        ctx: The FastMCP context.
        board_id: The ID of the board.
        state: Sprint state ('active', 'future', 'closed'). If None, returns all sprints.
        start_at: Starting index.
        limit: Maximum results.

    Returns:
        JSON string representing a list of sprint objects.
    """
    fetcher = await get_jira_fetcher(ctx)
    found = fetcher.get_all_sprints_from_board_model(
        board_id=board_id, state=state, start=start_at, limit=limit
    )
    simplified = [entry.to_simplified_dict() for entry in found]
    return json.dumps(simplified, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "read"})
async def get_sprint_issues(
    ctx: Context,
    sprint_id: Annotated[str, Field(description="The id of sprint (e.g., '10001')")],
    fields: Annotated[
        str,
        Field(
            description=(
                "Comma-separated fields to return in the results. "
                "Use '*all' for all fields, or specify individual "
                "fields like 'summary,status,assignee,priority'"
            ),
            default=",".join(DEFAULT_READ_JIRA_FIELDS),
        ),
    ] = ",".join(DEFAULT_READ_JIRA_FIELDS),
    start_at: Annotated[
        int,
        Field(description="Starting index for pagination (0-based)", default=0, ge=0),
    ] = 0,
    limit: Annotated[
        int,
        Field(description="Maximum number of results (1-50)", default=10, ge=1, le=50),
    ] = 10,
) -> str:
    """Get jira issues from sprint.
    Args:
        ctx: The FastMCP context.
        sprint_id: The ID of the sprint.
        fields: Comma-separated fields to return.
        start_at: Starting index.
        limit: Maximum results.
    Returns:
        JSON string representing the search results including pagination info.
    """
    fetcher = await get_jira_fetcher(ctx)
    # "*all" (or an empty string) is passed through untouched; anything else is
    # split into a list of individual field names.
    requested_fields: str | list[str] | None
    if not fields or fields == "*all":
        requested_fields = fields
    else:
        requested_fields = [name.strip() for name in fields.split(",")]
    page = fetcher.get_sprint_issues(
        sprint_id=sprint_id, fields=requested_fields, start=start_at, limit=limit
    )
    return json.dumps(page.to_simplified_dict(), indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "read"})
async def get_link_types(ctx: Context) -> str:
    """Get all available issue link types.
    Args:
        ctx: The FastMCP context.
    Returns:
        JSON string representing a list of issue link type objects.
    """
    fetcher = await get_jira_fetcher(ctx)
    # Simplify each link type model before serializing the whole list.
    simplified = [lt.to_simplified_dict() for lt in fetcher.get_issue_link_types()]
    return json.dumps(simplified, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def create_issue(
    ctx: Context,
    project_key: Annotated[
        str,
        Field(
            description=(
                "The JIRA project key (e.g. 'PROJ', 'DEV', 'SUPPORT'). "
                "This is the prefix of issue keys in your project. "
                "Never assume what it might be, always ask the user."
            )
        ),
    ],
    summary: Annotated[str, Field(description="Summary/title of the issue")],
    issue_type: Annotated[
        str,
        Field(
            description=(
                "Issue type (e.g. 'Task', 'Bug', 'Story', 'Epic', 'Subtask'). "
                "The available types depend on your project configuration. "
                "For subtasks, use 'Subtask' (not 'Sub-task') and include parent in additional_fields."
            ),
        ),
    ],
    assignee: Annotated[
        str | None,
        Field(
            description="(Optional) Assignee's user identifier (string): Email, display name, or account ID (e.g., '[email protected]', 'John Doe', 'accountid:...')",
            default=None,
        ),
    ] = None,
    description: Annotated[
        str | None, Field(description="Issue description", default=None)
    ] = None,
    components: Annotated[
        str | None,
        Field(
            description="(Optional) Comma-separated list of component names to assign (e.g., 'Frontend,API')",
            default=None,
        ),
    ] = None,
    additional_fields: Annotated[
        dict[str, Any] | None,
        Field(
            description=(
                "(Optional) Dictionary of additional fields to set. Examples:\n"
                "- Set priority: {'priority': {'name': 'High'}}\n"
                "- Add labels: {'labels': ['frontend', 'urgent']}\n"
                "- Link to parent (for any issue type): {'parent': 'PROJ-123'}\n"
                "- Set Fix Version/s: {'fixVersions': [{'id': '10020'}]}\n"
                "- Custom fields: {'customfield_10010': 'value'}"
            ),
            default=None,
        ),
    ] = None,
) -> str:
    """Create a new Jira issue with optional Epic link or parent for subtasks.
    Args:
        ctx: The FastMCP context.
        project_key: The JIRA project key.
        summary: Summary/title of the issue.
        issue_type: Issue type (e.g., 'Task', 'Bug', 'Story', 'Epic', 'Subtask').
        assignee: Assignee's user identifier (string): Email, display name, or account ID (e.g., '[email protected]', 'John Doe', 'accountid:...').
        description: Issue description.
        components: Comma-separated list of component names.
        additional_fields: Dictionary of additional fields.
    Returns:
        JSON string representing the created issue object.
    Raises:
        ValueError: If in read-only mode or Jira client is unavailable.
    """
    fetcher = await get_jira_fetcher(ctx)
    # Turn the comma-separated component string into a list of non-empty,
    # stripped names; leave it as None when nothing usable was provided.
    component_names: list[str] | None = None
    if components and isinstance(components, str):
        component_names = [
            name.strip() for name in components.split(",") if name.strip()
        ]
    extra = additional_fields or {}
    if not isinstance(extra, dict):
        raise ValueError("additional_fields must be a dictionary.")
    new_issue = fetcher.create_issue(
        project_key=project_key,
        summary=summary,
        issue_type=issue_type,
        description=description,
        assignee=assignee,
        components=component_names,
        **extra,
    )
    payload = {
        "message": "Issue created successfully",
        "issue": new_issue.to_simplified_dict(),
    }
    return json.dumps(payload, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def batch_create_issues(
    ctx: Context,
    issues: Annotated[
        str,
        Field(
            description=(
                "JSON array of issue objects. Each object should contain:\n"
                "- project_key (required): The project key (e.g., 'PROJ')\n"
                "- summary (required): Issue summary/title\n"
                "- issue_type (required): Type of issue (e.g., 'Task', 'Bug')\n"
                "- description (optional): Issue description\n"
                "- assignee (optional): Assignee username or email\n"
                "- components (optional): Array of component names\n"
                "Example: [\n"
                ' {"project_key": "PROJ", "summary": "Issue 1", "issue_type": "Task"},\n'
                ' {"project_key": "PROJ", "summary": "Issue 2", "issue_type": "Bug", "components": ["Frontend"]}\n'
                "]"
            )
        ),
    ],
    validate_only: Annotated[
        bool,
        Field(
            description="If true, only validates the issues without creating them",
            default=False,
        ),
    ] = False,
) -> str:
    """Create multiple Jira issues in a batch.
    Args:
        ctx: The FastMCP context.
        issues: JSON array string of issue objects.
        validate_only: If true, only validates without creating.
    Returns:
        JSON string indicating success and listing created issues (or validation result).
    Raises:
        ValueError: If in read-only mode, Jira client unavailable, or invalid JSON.
    """
    jira = await get_jira_fetcher(ctx)
    # Keep the try block narrow: only the JSON parse can raise JSONDecodeError.
    # Validating the parsed value outside the try ensures the "must be a JSON
    # array" error is raised as-is instead of being re-wrapped by a broader
    # exception handler.
    try:
        issues_list = json.loads(issues)
    except json.JSONDecodeError as e:
        raise ValueError("Invalid JSON in issues") from e
    if not isinstance(issues_list, list):
        raise ValueError("Input 'issues' must be a JSON array string.")
    # Create (or merely validate) the issues in one batch call.
    created_issues = jira.batch_create_issues(issues_list, validate_only=validate_only)
    message = (
        "Issues validated successfully"
        if validate_only
        else "Issues created successfully"
    )
    result = {
        "message": message,
        "issues": [issue.to_simplified_dict() for issue in created_issues],
    }
    return json.dumps(result, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "read"})
async def batch_get_changelogs(
    ctx: Context,
    issue_ids_or_keys: Annotated[
        list[str],
        Field(
            description="List of Jira issue IDs or keys, e.g. ['PROJ-123', 'PROJ-124']"
        ),
    ],
    fields: Annotated[
        list[str] | None,
        Field(
            description="(Optional) Filter the changelogs by fields, e.g. ['status', 'assignee']. Default to None for all fields.",
            default=None,
        ),
    ] = None,
    limit: Annotated[
        int,
        Field(
            description=(
                "Maximum number of changelogs to return in result for each issue. "
                "Default to -1 for all changelogs. "
                "Notice that it only limits the results in the response, "
                "the function will still fetch all the data."
            ),
            default=-1,
        ),
    ] = -1,
) -> str:
    """Get changelogs for multiple Jira issues (Cloud only).
    Args:
        ctx: The FastMCP context.
        issue_ids_or_keys: List of issue IDs or keys.
        fields: List of fields to filter changelogs by. None for all fields.
        limit: Maximum changelogs per issue (-1 for all).
    Returns:
        JSON string representing a list of issues with their changelogs.
    Raises:
        NotImplementedError: If run on Jira Server/Data Center.
        ValueError: If Jira client is unavailable.
    """
    fetcher = await get_jira_fetcher(ctx)
    # Batch changelog retrieval is Cloud-only; refuse early on Server/DC.
    if not fetcher.config.is_cloud:
        raise NotImplementedError(
            "Batch get issue changelogs is only available on Jira Cloud."
        )
    fetched_issues = fetcher.batch_get_changelogs(
        issue_ids_or_keys=issue_ids_or_keys, fields=fields
    )
    # A limit of -1 means "no slicing"; slicing with None returns everything.
    slice_end = None if limit == -1 else limit
    results = [
        {
            "issue_id": fetched.id,
            "changelogs": [
                entry.to_simplified_dict() for entry in fetched.changelogs[:slice_end]
            ],
        }
        for fetched in fetched_issues
    ]
    return json.dumps(results, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def update_issue(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g., 'PROJ-123')")],
    fields: Annotated[
        dict[str, Any],
        Field(
            description=(
                "Dictionary of fields to update. For 'assignee', provide a string identifier (email, name, or accountId). "
                "Example: `{'assignee': '[email protected]', 'summary': 'New Summary'}`"
            )
        ),
    ],
    additional_fields: Annotated[
        dict[str, Any] | None,
        Field(
            description="(Optional) Dictionary of additional fields to update. Use this for custom fields or more complex updates.",
            default=None,
        ),
    ] = None,
    attachments: Annotated[
        str | None,
        Field(
            description=(
                "(Optional) JSON string array or comma-separated list of file paths to attach to the issue. "
                "Example: '/path/to/file1.txt,/path/to/file2.txt' or ['/path/to/file1.txt','/path/to/file2.txt']"
            ),
            default=None,
        ),
    ] = None,
) -> str:
    """Update an existing Jira issue including changing status, adding Epic links, updating fields, etc.
    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.
        fields: Dictionary of fields to update.
        additional_fields: Optional dictionary of additional fields.
        attachments: Optional JSON array string or comma-separated list of file paths.
    Returns:
        JSON string representing the updated issue object and attachment results.
    Raises:
        ValueError: If in read-only mode or Jira client unavailable, or invalid input.
    """
    jira = await get_jira_fetcher(ctx)
    # Validate the primary fields payload.
    if not isinstance(fields, dict):
        raise ValueError("fields must be a dictionary.")
    update_fields = fields
    # Validate the optional extra fields payload.
    extra_fields = additional_fields or {}
    if not isinstance(extra_fields, dict):
        raise ValueError("additional_fields must be a dictionary.")
    # Parse attachments: accept either a JSON array string or a plain
    # comma-separated string of paths.
    attachment_paths = []
    if attachments:
        if isinstance(attachments, str):
            try:
                parsed = json.loads(attachments)
                if isinstance(parsed, list):
                    attachment_paths = [str(p) for p in parsed]
                else:
                    # Valid JSON but not an array (e.g. an object) is rejected.
                    raise ValueError("attachments JSON string must be an array.")
            except json.JSONDecodeError:
                # Not valid JSON at all: fall back to comma-separated parsing.
                attachment_paths = [
                    p.strip() for p in attachments.split(",") if p.strip()
                ]
        else:
            raise ValueError(
                "attachments must be a JSON array string or comma-separated string."
            )
    # Merge fields and additional_fields; additional_fields wins on key clashes.
    all_updates = {**update_fields, **extra_fields}
    if attachment_paths:
        all_updates["attachments"] = attachment_paths
    try:
        issue = jira.update_issue(issue_key=issue_key, **all_updates)
        result = issue.to_simplified_dict()
        # Surface per-file attachment upload outcomes when the model recorded them.
        if (
            hasattr(issue, "custom_fields")
            and "attachment_results" in issue.custom_fields
        ):
            result["attachment_results"] = issue.custom_fields["attachment_results"]
        return json.dumps(
            {"message": "Issue updated successfully", "issue": result},
            indent=2,
            ensure_ascii=False,
        )
    except Exception as e:
        logger.error(f"Error updating issue {issue_key}: {str(e)}", exc_info=True)
        # Chain the original exception so the root cause is preserved.
        raise ValueError(f"Failed to update issue {issue_key}: {str(e)}") from e
@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def delete_issue(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g. PROJ-123)")],
) -> str:
    """Delete an existing Jira issue.
    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.
    Returns:
        JSON string indicating success.
    Raises:
        ValueError: If in read-only mode or Jira client unavailable.
    """
    jira = await get_jira_fetcher(ctx)
    # The underlying method raises on failure, so reaching the next line means
    # success; its return value carries no extra information and is discarded.
    jira.delete_issue(issue_key)
    result = {"message": f"Issue {issue_key} has been deleted successfully."}
    return json.dumps(result, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def add_comment(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g., 'PROJ-123')")],
    comment: Annotated[str, Field(description="Comment text in Markdown format")],
) -> str:
    """Add a comment to a Jira issue.
    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.
        comment: Comment text in Markdown.
    Returns:
        JSON string representing the added comment object.
    Raises:
        ValueError: If in read-only mode or Jira client unavailable.
    """
    fetcher = await get_jira_fetcher(ctx)
    # The fetcher's add_comment returns a plain dict describing the comment.
    comment_data = fetcher.add_comment(issue_key, comment)
    return json.dumps(comment_data, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def add_worklog(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g., 'PROJ-123')")],
    time_spent: Annotated[
        str,
        Field(
            description=(
                "Time spent in Jira format. Examples: "
                "'1h 30m' (1 hour and 30 minutes), '1d' (1 day), '30m' (30 minutes), '4h' (4 hours)"
            )
        ),
    ],
    comment: Annotated[
        str | None,
        Field(description="(Optional) Comment for the worklog in Markdown format"),
    ] = None,
    started: Annotated[
        str | None,
        Field(
            description=(
                "(Optional) Start time in ISO format. If not provided, the current time will be used. "
                "Example: '2023-08-01T12:00:00.000+0000'"
            )
        ),
    ] = None,
    # Add original_estimate and remaining_estimate as per original tool
    original_estimate: Annotated[
        str | None, Field(description="(Optional) New value for the original estimate")
    ] = None,
    remaining_estimate: Annotated[
        str | None, Field(description="(Optional) New value for the remaining estimate")
    ] = None,
) -> str:
    """Add a worklog entry to a Jira issue.
    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.
        time_spent: Time spent in Jira format.
        comment: Optional comment in Markdown.
        started: Optional start time in ISO format.
        original_estimate: Optional new original estimate.
        remaining_estimate: Optional new remaining estimate.
    Returns:
        JSON string representing the added worklog object.
    Raises:
        ValueError: If in read-only mode or Jira client unavailable.
    """
    fetcher = await get_jira_fetcher(ctx)
    # The fetcher's add_worklog returns a plain dict for the new entry.
    entry = fetcher.add_worklog(
        issue_key=issue_key,
        time_spent=time_spent,
        comment=comment,
        started=started,
        original_estimate=original_estimate,
        remaining_estimate=remaining_estimate,
    )
    return json.dumps(
        {"message": "Worklog added successfully", "worklog": entry},
        indent=2,
        ensure_ascii=False,
    )
@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def link_to_epic(
    ctx: Context,
    issue_key: Annotated[
        str, Field(description="The key of the issue to link (e.g., 'PROJ-123')")
    ],
    epic_key: Annotated[
        str, Field(description="The key of the epic to link to (e.g., 'PROJ-456')")
    ],
) -> str:
    """Link an existing issue to an epic.
    Args:
        ctx: The FastMCP context.
        issue_key: The key of the issue to link.
        epic_key: The key of the epic to link to.
    Returns:
        JSON string representing the updated issue object.
    Raises:
        ValueError: If in read-only mode or Jira client unavailable.
    """
    fetcher = await get_jira_fetcher(ctx)
    linked_issue = fetcher.link_issue_to_epic(issue_key, epic_key)
    payload = {
        "message": f"Issue {issue_key} has been linked to epic {epic_key}.",
        "issue": linked_issue.to_simplified_dict(),
    }
    return json.dumps(payload, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def create_issue_link(
    ctx: Context,
    link_type: Annotated[
        str,
        Field(
            description="The type of link to create (e.g., 'Duplicate', 'Blocks', 'Relates to')"
        ),
    ],
    inward_issue_key: Annotated[
        str, Field(description="The key of the inward issue (e.g., 'PROJ-123')")
    ],
    outward_issue_key: Annotated[
        str, Field(description="The key of the outward issue (e.g., 'PROJ-456')")
    ],
    comment: Annotated[
        str | None, Field(description="(Optional) Comment to add to the link")
    ] = None,
    comment_visibility: Annotated[
        dict[str, str] | None,
        Field(
            description="(Optional) Visibility settings for the comment (e.g., {'type': 'group', 'value': 'jira-users'})",
            default=None,
        ),
    ] = None,
) -> str:
    """Create a link between two Jira issues.
    Args:
        ctx: The FastMCP context.
        link_type: The type of link (e.g., 'Blocks').
        inward_issue_key: The key of the source issue.
        outward_issue_key: The key of the target issue.
        comment: Optional comment text.
        comment_visibility: Optional dictionary for comment visibility.
    Returns:
        JSON string indicating success or failure.
    Raises:
        ValueError: If required fields are missing, invalid input, in read-only mode, or Jira client unavailable.
    """
    fetcher = await get_jira_fetcher(ctx)
    # All three identifiers are mandatory for a link.
    if not (link_type and inward_issue_key and outward_issue_key):
        raise ValueError(
            "link_type, inward_issue_key, and outward_issue_key are required."
        )
    payload: dict[str, Any] = {
        "type": {"name": link_type},
        "inwardIssue": {"key": inward_issue_key},
        "outwardIssue": {"key": outward_issue_key},
    }
    if comment:
        comment_payload: dict[str, Any] = {"body": comment}
        # Visibility is only honored when both 'type' and 'value' are present.
        if comment_visibility and isinstance(comment_visibility, dict):
            if "type" in comment_visibility and "value" in comment_visibility:
                comment_payload["visibility"] = comment_visibility
            else:
                logger.warning("Invalid comment_visibility dictionary structure.")
        payload["comment"] = comment_payload
    outcome = fetcher.create_issue_link(payload)
    return json.dumps(outcome, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def create_remote_issue_link(
    ctx: Context,
    issue_key: Annotated[
        str,
        Field(description="The key of the issue to add the link to (e.g., 'PROJ-123')"),
    ],
    url: Annotated[
        str,
        Field(
            description="The URL to link to (e.g., 'https://example.com/page' or Confluence page URL)"
        ),
    ],
    title: Annotated[
        str,
        Field(
            description="The title/name of the link (e.g., 'Documentation Page', 'Confluence Page')"
        ),
    ],
    summary: Annotated[
        str | None, Field(description="(Optional) Description of the link")
    ] = None,
    relationship: Annotated[
        str | None,
        Field(
            description="(Optional) Relationship description (e.g., 'causes', 'relates to', 'documentation')"
        ),
    ] = None,
    icon_url: Annotated[
        str | None, Field(description="(Optional) URL to a 16x16 icon for the link")
    ] = None,
) -> str:
    """Create a remote issue link (web link or Confluence link) for a Jira issue.
    This tool allows you to add web links and Confluence links to Jira issues.
    The links will appear in the issue's "Links" section and can be clicked to navigate to external resources.
    Args:
        ctx: The FastMCP context.
        issue_key: The key of the issue to add the link to.
        url: The URL to link to (can be any web page or Confluence page).
        title: The title/name that will be displayed for the link.
        summary: Optional description of what the link is for.
        relationship: Optional relationship description.
        icon_url: Optional URL to a 16x16 icon for the link.
    Returns:
        JSON string indicating success or failure.
    Raises:
        ValueError: If required fields are missing, invalid input, in read-only mode, or Jira client unavailable.
    """
    fetcher = await get_jira_fetcher(ctx)
    # Reject missing mandatory inputs one by one, in the documented order.
    if not issue_key:
        raise ValueError("issue_key is required.")
    if not url:
        raise ValueError("url is required.")
    if not title:
        raise ValueError("title is required.")
    # Assemble the remote link 'object' payload expected by the Jira API.
    remote_object: dict[str, Any] = {"url": url, "title": title}
    if summary:
        remote_object["summary"] = summary
    if icon_url:
        remote_object["icon"] = {"url16x16": icon_url, "title": title}
    payload: dict[str, Any] = {"object": remote_object}
    if relationship:
        payload["relationship"] = relationship
    outcome = fetcher.create_remote_issue_link(issue_key, payload)
    return json.dumps(outcome, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def remove_issue_link(
    ctx: Context,
    link_id: Annotated[str, Field(description="The ID of the link to remove")],
) -> str:
    """Remove a link between two Jira issues.
    Args:
        ctx: The FastMCP context.
        link_id: The ID of the link to remove.
    Returns:
        JSON string indicating success.
    Raises:
        ValueError: If link_id is missing, in read-only mode, or Jira client unavailable.
    """
    fetcher = await get_jira_fetcher(ctx)
    if not link_id:
        raise ValueError("link_id is required")
    # On success the fetcher returns a dict describing the outcome.
    outcome = fetcher.remove_issue_link(link_id)
    return json.dumps(outcome, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def transition_issue(
    ctx: Context,
    issue_key: Annotated[str, Field(description="Jira issue key (e.g., 'PROJ-123')")],
    transition_id: Annotated[
        str,
        Field(
            description=(
                "ID of the transition to perform. Use the jira_get_transitions tool first "
                "to get the available transition IDs for the issue. Example values: '11', '21', '31'"
            )
        ),
    ],
    fields: Annotated[
        dict[str, Any] | None,
        Field(
            description=(
                "(Optional) Dictionary of fields to update during the transition. "
                "Some transitions require specific fields to be set (e.g., resolution). "
                "Example: {'resolution': {'name': 'Fixed'}}"
            ),
            default=None,
        ),
    ] = None,
    comment: Annotated[
        str | None,
        Field(
            description=(
                "(Optional) Comment to add during the transition. "
                "This will be visible in the issue history."
            ),
        ),
    ] = None,
) -> str:
    """Transition a Jira issue to a new status.
    Args:
        ctx: The FastMCP context.
        issue_key: Jira issue key.
        transition_id: ID of the transition.
        fields: Optional dictionary of fields to update during transition.
        comment: Optional comment for the transition.
    Returns:
        JSON string representing the updated issue object.
    Raises:
        ValueError: If required fields missing, invalid input, in read-only mode, or Jira client unavailable.
    """
    fetcher = await get_jira_fetcher(ctx)
    if not issue_key or not transition_id:
        raise ValueError("issue_key and transition_id are required.")
    # Default to an empty update dict, and reject anything that is not a dict.
    transition_fields = fields or {}
    if not isinstance(transition_fields, dict):
        raise ValueError("fields must be a dictionary.")
    updated_issue = fetcher.transition_issue(
        issue_key=issue_key,
        transition_id=transition_id,
        fields=transition_fields,
        comment=comment,
    )
    payload = {
        "message": f"Issue {issue_key} transitioned successfully",
        "issue": updated_issue.to_simplified_dict() if updated_issue else None,
    }
    return json.dumps(payload, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def create_sprint(
    ctx: Context,
    board_id: Annotated[str, Field(description="The id of board (e.g., '1000')")],
    sprint_name: Annotated[
        str, Field(description="Name of the sprint (e.g., 'Sprint 1')")
    ],
    start_date: Annotated[
        str, Field(description="Start time for sprint (ISO 8601 format)")
    ],
    end_date: Annotated[
        str, Field(description="End time for sprint (ISO 8601 format)")
    ],
    goal: Annotated[
        str | None, Field(description="(Optional) Goal of the sprint")
    ] = None,
) -> str:
    """Create Jira sprint for a board.
    Args:
        ctx: The FastMCP context.
        board_id: Board ID.
        sprint_name: Sprint name.
        start_date: Start date (ISO format).
        end_date: End date (ISO format).
        goal: Optional sprint goal.
    Returns:
        JSON string representing the created sprint object.
    Raises:
        ValueError: If in read-only mode or Jira client unavailable.
    """
    fetcher = await get_jira_fetcher(ctx)
    new_sprint = fetcher.create_sprint(
        board_id=board_id,
        sprint_name=sprint_name,
        start_date=start_date,
        end_date=end_date,
        goal=goal,
    )
    return json.dumps(new_sprint.to_simplified_dict(), indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def update_sprint(
    ctx: Context,
    sprint_id: Annotated[str, Field(description="The id of sprint (e.g., '10001')")],
    sprint_name: Annotated[
        str | None, Field(description="(Optional) New name for the sprint")
    ] = None,
    state: Annotated[
        str | None,
        Field(description="(Optional) New state for the sprint (future|active|closed)"),
    ] = None,
    start_date: Annotated[
        str | None, Field(description="(Optional) New start date for the sprint")
    ] = None,
    end_date: Annotated[
        str | None, Field(description="(Optional) New end date for the sprint")
    ] = None,
    goal: Annotated[
        str | None, Field(description="(Optional) New goal for the sprint")
    ] = None,
) -> str:
    """Update jira sprint.
    Args:
        ctx: The FastMCP context.
        sprint_id: The ID of the sprint.
        sprint_name: Optional new name.
        state: Optional new state (future|active|closed).
        start_date: Optional new start date.
        end_date: Optional new end date.
        goal: Optional new goal.
    Returns:
        JSON string representing the updated sprint object or an error message.
    Raises:
        ValueError: If in read-only mode or Jira client unavailable.
    """
    fetcher = await get_jira_fetcher(ctx)
    updated = fetcher.update_sprint(
        sprint_id=sprint_id,
        sprint_name=sprint_name,
        state=state,
        start_date=start_date,
        end_date=end_date,
        goal=goal,
    )
    # The fetcher signals failure by returning None rather than raising.
    if updated is not None:
        return json.dumps(updated.to_simplified_dict(), indent=2, ensure_ascii=False)
    failure = {
        "error": f"Failed to update sprint {sprint_id}. Check logs for details."
    }
    return json.dumps(failure, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "read"})
async def get_project_versions(
    ctx: Context,
    project_key: Annotated[str, Field(description="Jira project key (e.g., 'PROJ')")],
) -> str:
    """Get all fix versions for a specific Jira project."""
    fetcher = await get_jira_fetcher(ctx)
    # Versions come back as plain dicts, ready for direct serialization.
    return json.dumps(
        fetcher.get_project_versions(project_key), indent=2, ensure_ascii=False
    )
@jira_mcp.tool(tags={"jira", "read"})
async def get_all_projects(
    ctx: Context,
    include_archived: Annotated[
        bool,
        Field(
            description="Whether to include archived projects in the results",
            default=False,
        ),
    ] = False,
) -> str:
    """Get all Jira projects accessible to the current user.
    Args:
        ctx: The FastMCP context.
        include_archived: Whether to include archived projects.
    Returns:
        JSON string representing a list of project objects accessible to the user.
        Project keys are always returned in uppercase.
        If JIRA_PROJECTS_FILTER is configured, only returns projects matching those keys.
    Raises:
        ValueError: If the Jira client is not configured or available.
    """
    try:
        jira = await get_jira_fetcher(ctx)
        projects = jira.get_all_projects(include_archived=include_archived)
    except (MCPAtlassianAuthenticationError, HTTPError, OSError, ValueError) as e:
        # Map each caught exception class to a user-facing error category.
        if isinstance(e, MCPAtlassianAuthenticationError):
            error_message = f"Authentication/Permission Error: {str(e)}"
        elif isinstance(e, OSError | HTTPError):
            error_message = f"Network or API Error: {str(e)}"
        else:
            error_message = f"Configuration Error: {str(e)}"
        logger.log(logging.ERROR, f"get_all_projects failed: {error_message}")
        return json.dumps(
            {"success": False, "error": error_message},
            indent=2,
            ensure_ascii=False,
        )
    # Normalize every project key to uppercase before filtering/returning.
    for project in projects:
        if "key" in project:
            project["key"] = project["key"].upper()
    # Honor the optional comma-separated projects filter from the config.
    configured_filter = jira.config.projects_filter
    if configured_filter:
        allowed_keys = {
            entry.strip().upper() for entry in configured_filter.split(",")
        }
        projects = [item for item in projects if item.get("key") in allowed_keys]
    return json.dumps(projects, indent=2, ensure_ascii=False)
@jira_mcp.tool(tags={"jira", "write"})
@check_write_access
async def create_version(
    ctx: Context,
    project_key: Annotated[str, Field(description="Jira project key (e.g., 'PROJ')")],
    name: Annotated[str, Field(description="Name of the version")],
    start_date: Annotated[
        str | None, Field(description="Start date (YYYY-MM-DD)", default=None)
    ] = None,
    release_date: Annotated[
        str | None, Field(description="Release date (YYYY-MM-DD)", default=None)
    ] = None,
    description: Annotated[
        str | None, Field(description="Description of the version", default=None)
    ] = None,
) -> str:
    """Create a new fix version in a Jira project.
    Args:
        ctx: The FastMCP context.
        project_key: The project key.
        name: Name of the version.
        start_date: Start date (optional).
        release_date: Release date (optional).
        description: Description (optional).
    Returns:
        JSON string of the created version object.
    """
    fetcher = await get_jira_fetcher(ctx)
    try:
        created = fetcher.create_project_version(
            project_key=project_key,
            name=name,
            start_date=start_date,
            release_date=release_date,
            description=description,
        )
        return json.dumps(created, indent=2, ensure_ascii=False)
    except Exception as e:
        # Report the failure as a structured JSON error instead of raising.
        logger.error(
            f"Error creating version in project {project_key}: {str(e)}", exc_info=True
        )
        return json.dumps(
            {"success": False, "error": str(e)}, indent=2, ensure_ascii=False
        )
@jira_mcp.tool(name="batch_create_versions", tags={"jira", "write"})
@check_write_access
async def batch_create_versions(
    ctx: Context,
    project_key: Annotated[str, Field(description="Jira project key (e.g., 'PROJ')")],
    versions: Annotated[
        str,
        Field(
            description=(
                "JSON array of version objects. Each object should contain:\n"
                "- name (required): Name of the version\n"
                "- startDate (optional): Start date (YYYY-MM-DD)\n"
                "- releaseDate (optional): Release date (YYYY-MM-DD)\n"
                "- description (optional): Description of the version\n"
                "Example: [\n"
                ' {"name": "v1.0", "startDate": "2025-01-01", "releaseDate": "2025-02-01", "description": "First release"},\n'
                ' {"name": "v2.0"}\n'
                "]"
            )
        ),
    ],
) -> str:
    """Batch create multiple versions in a Jira project.
    Args:
        ctx: The FastMCP context.
        project_key: The project key.
        versions: JSON array string of version objects.
    Returns:
        JSON array of results, each with success flag, version or error.
    """
    jira = await get_jira_fetcher(ctx)
    # Keep the try block narrow: only the JSON parse can raise JSONDecodeError.
    # Validating the parsed value outside the try ensures the "must be a JSON
    # array" error is raised as-is rather than re-wrapped by a broader handler.
    try:
        version_list = json.loads(versions)
    except json.JSONDecodeError as e:
        raise ValueError("Invalid JSON in versions") from e
    if not isinstance(version_list, list):
        raise ValueError("Input 'versions' must be a JSON array string.")
    results: list[dict[str, Any]] = []
    for idx, v in enumerate(version_list):
        # Defensive: each item must be an object carrying at least a 'name';
        # invalid items are reported per-index without aborting the batch.
        if not isinstance(v, dict) or not v.get("name"):
            results.append(
                {
                    "success": False,
                    "error": f"Item {idx}: Each version must be an object with at least a 'name' field.",
                }
            )
            continue
        try:
            version = jira.create_project_version(
                project_key=project_key,
                name=v["name"],
                start_date=v.get("startDate"),
                release_date=v.get("releaseDate"),
                description=v.get("description"),
            )
            results.append({"success": True, "version": version})
        except Exception as e:
            # One failed version must not abort the rest of the batch.
            logger.error(
                f"Error creating version in batch for project {project_key}: {str(e)}",
                exc_info=True,
            )
            results.append({"success": False, "error": str(e), "input": v})
    return json.dumps(results, indent=2, ensure_ascii=False)
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/issues.py:
--------------------------------------------------------------------------------
```python
"""Module for Jira issue operations."""
import logging
from collections import defaultdict
from typing import Any
from requests.exceptions import HTTPError
from ..exceptions import MCPAtlassianAuthenticationError
from ..models.jira import JiraIssue
from ..models.jira.common import JiraChangelog
from ..utils import parse_date
from .client import JiraClient
from .constants import DEFAULT_READ_JIRA_FIELDS
from .protocols import (
AttachmentsOperationsProto,
EpicOperationsProto,
FieldsOperationsProto,
IssueOperationsProto,
ProjectsOperationsProto,
UsersOperationsProto,
)
logger = logging.getLogger("mcp-jira")
class IssuesMixin(
JiraClient,
AttachmentsOperationsProto,
EpicOperationsProto,
FieldsOperationsProto,
IssueOperationsProto,
ProjectsOperationsProto,
UsersOperationsProto,
):
"""Mixin for Jira issue operations."""
    def get_issue(
        self,
        issue_key: str,
        expand: str | None = None,
        comment_limit: int | str | None = 10,
        fields: str | list[str] | tuple[str, ...] | set[str] | None = None,
        properties: str | list[str] | None = None,
        update_history: bool = True,
    ) -> JiraIssue:
        """
        Get a Jira issue by key.

        Args:
            issue_key: The issue key (e.g., PROJECT-123)
            expand: Fields to expand in the response
            comment_limit: Maximum number of comments to include, or "all"
            fields: Fields to return (comma-separated string, list, tuple, set, or "*all")
            properties: Issue properties to return (comma-separated string or list)
            update_history: Whether to update the issue view history

        Returns:
            JiraIssue model with issue data and metadata

        Raises:
            MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
            ValueError: If the issue's project is excluded by the configured projects filter
            Exception: If there is an error retrieving the issue
        """
        try:
            # Obtain the projects filter from the config.
            # These should NOT be overridden by the request.
            filter_to_use = self.config.projects_filter

            # Apply projects filter if present
            if filter_to_use:
                # Split projects filter by commas and handle possible whitespace
                projects = [p.strip() for p in filter_to_use.split(",")]

                # Obtain the project key from issue_key
                issue_key_project = issue_key.split("-")[0]

                if issue_key_project not in projects:
                    # If the project key not in the filter, return an empty issue
                    msg = (
                        "Issue with project prefix "
                        f"'{issue_key_project}' are restricted by configuration"
                    )
                    raise ValueError(msg)

            # Determine fields_param: use provided fields or default from constant
            fields_param = fields
            if fields_param is None:
                fields_param = ",".join(DEFAULT_READ_JIRA_FIELDS)
            elif isinstance(fields_param, list | tuple | set):
                fields_param = ",".join(fields_param)

            # Ensure necessary fields are included based on special parameters
            if (
                fields_param == ",".join(DEFAULT_READ_JIRA_FIELDS)
                or fields_param == "*all"
            ):
                # Default fields are being used - preserve the order
                default_fields_list = (
                    fields_param.split(",")
                    if fields_param != "*all"
                    else list(DEFAULT_READ_JIRA_FIELDS)
                )
                additional_fields = []

                # Add appropriate fields based on expand parameter
                if expand:
                    expand_params = expand.split(",")
                    if (
                        "changelog" in expand_params
                        and "changelog" not in default_fields_list
                        and "changelog" not in additional_fields
                    ):
                        additional_fields.append("changelog")
                    if (
                        "renderedFields" in expand_params
                        and "rendered" not in default_fields_list
                        and "rendered" not in additional_fields
                    ):
                        additional_fields.append("rendered")

                # Add appropriate fields based on properties parameter
                if (
                    properties
                    and "properties" not in default_fields_list
                    and "properties" not in additional_fields
                ):
                    additional_fields.append("properties")

                # Combine default fields with additional fields, preserving order
                if additional_fields:
                    fields_param = ",".join(default_fields_list + additional_fields)
            # Handle non-default fields string
            # NOTE(review): a caller-supplied custom fields string is passed
            # through unchanged; expand/properties do not augment it here.

            # Build expand parameter if provided
            expand_param = expand

            # Convert properties to proper format if it's a list
            properties_param = properties
            if properties and isinstance(properties, list | tuple | set):
                properties_param = ",".join(properties)

            # Get the issue data with all parameters
            issue = self.jira.get_issue(
                issue_key,
                expand=expand_param,
                fields=fields_param,
                properties=properties_param,
                update_history=update_history,
            )
            if not issue:
                msg = f"Issue {issue_key} not found"
                raise ValueError(msg)
            if not isinstance(issue, dict):
                msg = (
                    f"Unexpected return value type from `jira.get_issue`: {type(issue)}"
                )
                logger.error(msg)
                raise TypeError(msg)

            # Extract fields data, safely handling None
            fields_data = issue.get("fields", {}) or {}

            # Get comments if needed (replaces the embedded comment payload
            # with a fetch honoring the caller's comment_limit)
            if "comment" in fields_data:
                comment_limit_int = self._normalize_comment_limit(comment_limit)
                comments = self._get_issue_comments_if_needed(
                    issue_key, comment_limit_int
                )
                # Add comments to the issue data for processing by the model
                fields_data["comment"]["comments"] = comments

            # Extract epic information (best-effort: never fails the request)
            try:
                epic_info = self._extract_epic_information(issue)
            except Exception as e:
                logger.warning(f"Error extracting epic information: {str(e)}")
                epic_info = {"epic_key": None, "epic_name": None}

            # If this is linked to an epic, add the epic information to the fields
            if epic_info.get("epic_key"):
                try:
                    # Get field IDs for epic fields
                    field_ids = self.get_field_ids_to_epic()

                    # Add epic link field if it doesn't exist
                    if (
                        "epic_link" in field_ids
                        and field_ids["epic_link"] not in fields_data
                    ):
                        fields_data[field_ids["epic_link"]] = epic_info["epic_key"]

                    # Add epic name field if it doesn't exist
                    if (
                        epic_info.get("epic_name")
                        and "epic_name" in field_ids
                        and field_ids["epic_name"] not in fields_data
                    ):
                        fields_data[field_ids["epic_name"]] = epic_info["epic_name"]
                except Exception as e:
                    logger.warning(f"Error setting epic fields: {str(e)}")

            # Update the issue data with the fields
            issue["fields"] = fields_data

            # Create and return the JiraIssue model, passing requested_fields
            return JiraIssue.from_api_response(
                issue,
                base_url=self.config.url if hasattr(self, "config") else None,
                requested_fields=fields,
            )
        except HTTPError as http_err:
            if http_err.response is not None and http_err.response.status_code in [
                401,
                403,
            ]:
                error_msg = (
                    f"Authentication failed for Jira API ({http_err.response.status_code}). "
                    "Token may be expired or invalid. Please verify credentials."
                )
                logger.error(error_msg)
                raise MCPAtlassianAuthenticationError(error_msg) from http_err
            else:
                logger.error(f"HTTP error during API call: {http_err}", exc_info=False)
                raise
        except Exception as e:
            error_msg = str(e)
            logger.error(f"Error retrieving issue {issue_key}: {error_msg}")
            raise Exception(f"Error retrieving issue {issue_key}: {error_msg}") from e
def _normalize_comment_limit(self, comment_limit: int | str | None) -> int | None:
"""
Normalize the comment limit to an integer or None.
Args:
comment_limit: The comment limit as int, string, or None
Returns:
Normalized comment limit as int or None
"""
if comment_limit is None:
return None
if isinstance(comment_limit, int):
return comment_limit
if comment_limit == "all":
return None # No limit
# Try to convert to int
try:
return int(comment_limit)
except ValueError:
# If conversion fails, default to 10
return 10
def _get_issue_comments_if_needed(
self, issue_key: str, comment_limit: int | None
) -> list[dict]:
"""
Get comments for an issue if needed.
Args:
issue_key: The issue key
comment_limit: Maximum number of comments to include
Returns:
List of comments
"""
if comment_limit is None or comment_limit > 0:
try:
response = self.jira.issue_get_comments(issue_key)
if not isinstance(response, dict):
msg = f"Unexpected return value type from `jira.issue_get_comments`: {type(response)}"
logger.error(msg)
raise TypeError(msg)
comments = response["comments"]
# Limit comments if needed
if comment_limit is not None:
comments = comments[:comment_limit]
return comments
except Exception as e:
logger.warning(f"Error getting comments for {issue_key}: {str(e)}")
return []
return []
    def _extract_epic_information(self, issue: dict) -> dict[str, str | None]:
        """
        Extract epic information from an issue.

        Args:
            issue: The raw issue data from the Jira API

        Returns:
            Dictionary with keys 'epic_key', 'epic_name', 'epic_summary'
            (each a string or None) and 'is_epic' (bool). All lookups are
            best-effort: failures are logged and defaults are returned.
        """
        # Initialize with default values
        epic_info = {
            "epic_key": None,
            "epic_name": None,
            "epic_summary": None,
            "is_epic": False,
        }

        try:
            fields = issue.get("fields", {}) or {}
            issue_type = fields.get("issuetype", {}).get("name", "").lower()

            # Get field IDs for epic fields
            try:
                field_ids = self.get_field_ids_to_epic()
            except Exception as e:
                logger.warning(f"Error getting Jira fields: {str(e)}")
                field_ids = {}

            # Check if this is an epic
            if issue_type == "epic":
                epic_info["is_epic"] = True

                # Use the discovered field ID for epic name
                if "epic_name" in field_ids and field_ids["epic_name"] in fields:
                    epic_info["epic_name"] = fields.get(field_ids["epic_name"], "")

            # If not an epic, check for epic link
            elif "epic_link" in field_ids:
                epic_link_field = field_ids["epic_link"]

                if epic_link_field in fields and fields[epic_link_field]:
                    epic_key = fields[epic_link_field]
                    epic_info["epic_key"] = epic_key

                    # Try to get epic details (a second API round-trip)
                    try:
                        epic = self.jira.get_issue(
                            epic_key,
                            expand=None,
                            fields=None,
                            properties=None,
                            update_history=True,
                        )
                        if not isinstance(epic, dict):
                            msg = f"Unexpected return value type from `jira.get_issue`: {type(epic)}"
                            logger.error(msg)
                            raise TypeError(msg)

                        epic_fields = epic.get("fields", {}) or {}

                        # Get epic name using the discovered field ID
                        if "epic_name" in field_ids:
                            epic_info["epic_name"] = epic_fields.get(
                                field_ids["epic_name"], ""
                            )

                        epic_info["epic_summary"] = epic_fields.get("summary", "")
                    except Exception as e:
                        logger.warning(
                            f"Error getting epic details for {epic_key}: {str(e)}"
                        )
        except Exception as e:
            logger.warning(f"Error extracting epic information: {str(e)}")

        return epic_info
def _format_issue_content(
self,
issue_key: str,
issue: dict,
description: str,
comments: list[dict],
created_date: str,
epic_info: dict[str, str | None],
) -> str:
"""
Format issue content for display.
Args:
issue_key: The issue key
issue: The issue data
description: The issue description
comments: The issue comments
created_date: The formatted creation date
epic_info: Epic information
Returns:
Formatted issue content
"""
fields = issue.get("fields", {})
# Basic issue information
summary = fields.get("summary", "")
status = fields.get("status", {}).get("name", "")
issue_type = fields.get("issuetype", {}).get("name", "")
# Format content
content = [f"# {issue_key}: {summary}"]
content.append(f"**Type**: {issue_type}")
content.append(f"**Status**: {status}")
content.append(f"**Created**: {created_date}")
# Add reporter
reporter = fields.get("reporter", {})
reporter_name = reporter.get("displayName", "") or reporter.get("name", "")
if reporter_name:
content.append(f"**Reporter**: {reporter_name}")
# Add assignee
assignee = fields.get("assignee", {})
assignee_name = assignee.get("displayName", "") or assignee.get("name", "")
if assignee_name:
content.append(f"**Assignee**: {assignee_name}")
# Add epic information
if epic_info["is_epic"]:
content.append(f"**Epic Name**: {epic_info['epic_name']}")
elif epic_info["epic_key"]:
content.append(
f"**Epic**: [{epic_info['epic_key']}] {epic_info['epic_summary']}"
)
# Add description
if description:
content.append("\n## Description\n")
content.append(description)
# Add comments
if comments:
content.append("\n## Comments\n")
for comment in comments:
author = comment.get("author", {})
author_name = author.get("displayName", "") or author.get("name", "")
comment_body = self._clean_text(comment.get("body", ""))
if author_name and comment_body:
comment_date = comment.get("created", "")
if comment_date:
comment_date = parse_date(comment_date)
content.append(f"**{author_name}** ({comment_date}):")
else:
content.append(f"**{author_name}**:")
content.append(f"{comment_body}\n")
return "\n".join(content)
def _create_issue_metadata(
self,
issue_key: str,
issue: dict,
comments: list[dict],
created_date: str,
epic_info: dict[str, str | None],
) -> dict[str, Any]:
"""
Create metadata for a Jira issue.
Args:
issue_key: The issue key
issue: The issue data
comments: The issue comments
created_date: The formatted creation date
epic_info: Epic information
Returns:
Metadata dictionary
"""
fields = issue.get("fields", {})
# Initialize metadata
metadata = {
"key": issue_key,
"title": fields.get("summary", ""),
"status": fields.get("status", {}).get("name", ""),
"type": fields.get("issuetype", {}).get("name", ""),
"created": created_date,
"url": f"{self.config.url}/browse/{issue_key}",
}
# Add assignee if available
assignee = fields.get("assignee", {})
if assignee:
metadata["assignee"] = assignee.get("displayName", "") or assignee.get(
"name", ""
)
# Add epic information
if epic_info["is_epic"]:
metadata["is_epic"] = True
metadata["epic_name"] = epic_info["epic_name"]
elif epic_info["epic_key"]:
metadata["epic_key"] = epic_info["epic_key"]
metadata["epic_name"] = epic_info["epic_name"]
metadata["epic_summary"] = epic_info["epic_summary"]
# Add comment count
metadata["comment_count"] = len(comments)
return metadata
    def create_issue(
        self,
        project_key: str,
        summary: str,
        issue_type: str,
        description: str = "",
        assignee: str | None = None,
        components: list[str] | None = None,
        **kwargs: Any,  # noqa: ANN401 - Dynamic field types are necessary for Jira API
    ) -> JiraIssue:
        """
        Create a new Jira issue.

        Args:
            project_key: The key of the project
            summary: The issue summary
            issue_type: The type of issue to create
            description: The issue description
            assignee: The username or account ID of the assignee
            components: List of component names to assign (e.g., ["Frontend", "API"])
            **kwargs: Additional fields to set on the issue

        Returns:
            JiraIssue model representing the created issue

        Raises:
            ValueError: If a required argument is missing or the API response has no key
            Exception: If there is an error creating the issue
        """
        try:
            # Validate required fields
            if not project_key:
                raise ValueError("Project key is required")
            if not summary:
                raise ValueError("Summary is required")
            if not issue_type:
                raise ValueError("Issue type is required")

            # Handle Epic and Subtask issue type names across different languages
            actual_issue_type = issue_type
            if self._is_epic_issue_type(issue_type) and issue_type.lower() == "epic":
                # If the user provided "Epic" but we need to find the localized name
                epic_type_name = self._find_epic_issue_type_name(project_key)
                if epic_type_name:
                    actual_issue_type = epic_type_name
                    logger.info(
                        f"Using localized Epic issue type name: {actual_issue_type}"
                    )
            elif issue_type.lower() in ["subtask", "sub-task"]:
                # If the user provided "Subtask" but we need to find the localized name
                subtask_type_name = self._find_subtask_issue_type_name(project_key)
                if subtask_type_name:
                    actual_issue_type = subtask_type_name
                    logger.info(
                        f"Using localized Subtask issue type name: {actual_issue_type}"
                    )

            # Prepare fields
            fields: dict[str, Any] = {
                "project": {"key": project_key},
                "summary": summary,
                "issuetype": {"name": actual_issue_type},
            }

            # Add description if provided (convert from Markdown to Jira format)
            if description:
                fields["description"] = self._markdown_to_jira(description)

            # Add assignee if provided
            if assignee:
                try:
                    # _get_account_id now returns the correct identifier (accountId for cloud, name for server)
                    assignee_identifier = self._get_account_id(assignee)
                    self._add_assignee_to_fields(fields, assignee_identifier)
                except ValueError as e:
                    # Assignment failure is non-fatal: the issue is created unassigned.
                    logger.warning(f"Could not assign issue: {str(e)}")

            # Add components if provided
            if components:
                if isinstance(components, list):
                    # Filter out any None or empty/whitespace-only strings
                    valid_components = [
                        comp_name.strip()
                        for comp_name in components
                        if isinstance(comp_name, str) and comp_name.strip()
                    ]
                    if valid_components:
                        # Format as list of {"name": ...} dicts for the API
                        fields["components"] = [
                            {"name": comp_name} for comp_name in valid_components
                        ]

            # Make a copy of kwargs to preserve original values for two-step Epic creation
            # (the helpers below mutate `kwargs` in place; `kwargs_copy` feeds
            # _process_additional_fields with the untouched originals)
            kwargs_copy = kwargs.copy()

            # Prepare epic fields if this is an epic
            # This step now stores epic-specific fields in kwargs for post-creation update
            if self._is_epic_issue_type(issue_type):
                self._prepare_epic_fields(fields, summary, kwargs)

            # Prepare parent field if this is a subtask
            if issue_type.lower() == "subtask" or issue_type.lower() == "sub-task":
                self._prepare_parent_fields(fields, kwargs)
            # Allow parent field for all issue types when explicitly provided
            elif "parent" in kwargs:
                self._prepare_parent_fields(fields, kwargs)

            # Process **kwargs using the dynamic field map
            self._process_additional_fields(fields, kwargs_copy)

            # Create the issue
            response = self.jira.create_issue(fields=fields)
            if not isinstance(response, dict):
                msg = f"Unexpected return value type from `jira.create_issue`: {type(response)}"
                logger.error(msg)
                raise TypeError(msg)

            # Get the created issue key
            issue_key = response.get("key")
            if not issue_key:
                error_msg = "No issue key in response"
                raise ValueError(error_msg)

            # For Epics, perform the second step: update Epic-specific fields
            if self._is_epic_issue_type(issue_type):
                # Check if we have any stored Epic fields to update
                # ("__epic_"-prefixed keys were stashed by _prepare_epic_fields)
                has_epic_fields = any(k.startswith("__epic_") for k in kwargs)
                if has_epic_fields:
                    logger.info(
                        f"Performing post-creation update for Epic {issue_key} with Epic-specific fields"
                    )
                    try:
                        return self.update_epic_fields(issue_key, kwargs)
                    except Exception as update_error:
                        # The Epic itself was created; the follow-up update is best-effort.
                        logger.error(
                            f"Error during post-creation update of Epic {issue_key}: {str(update_error)}"
                        )
                        logger.info(
                            "Continuing with the original Epic that was successfully created"
                        )

            # Get the full issue data and convert to JiraIssue model
            issue_data = self.jira.get_issue(issue_key)
            if not isinstance(issue_data, dict):
                msg = f"Unexpected return value type from `jira.get_issue`: {type(issue_data)}"
                logger.error(msg)
                raise TypeError(msg)

            return JiraIssue.from_api_response(issue_data)
        except Exception as e:
            self._handle_create_issue_error(e, issue_type)
            raise  # Re-raise after logging
def _is_epic_issue_type(self, issue_type: str) -> bool:
"""
Check if an issue type is an Epic, handling localized names.
Args:
issue_type: The issue type name to check
Returns:
True if the issue type is an Epic, False otherwise
"""
# Common Epic names in different languages
epic_names = {
"epic", # English
"에픽", # Korean
"エピック", # Japanese
"史诗", # Chinese (Simplified)
"史詩", # Chinese (Traditional)
"épica", # Spanish/Portuguese
"épique", # French
"epik", # Turkish
"эпик", # Russian
"епік", # Ukrainian
}
return issue_type.lower() in epic_names or "epic" in issue_type.lower()
def _find_epic_issue_type_name(self, project_key: str) -> str | None:
"""
Find the actual Epic issue type name for a project.
Args:
project_key: The project key
Returns:
The Epic issue type name if found, None otherwise
"""
try:
issue_types = self.get_project_issue_types(project_key)
for issue_type in issue_types:
type_name = issue_type.get("name", "")
if self._is_epic_issue_type(type_name):
return type_name
return None
except Exception as e:
logger.warning(f"Could not get issue types for project {project_key}: {e}")
return None
def _find_subtask_issue_type_name(self, project_key: str) -> str | None:
"""
Find the actual Subtask issue type name for a project.
Args:
project_key: The project key
Returns:
The Subtask issue type name if found, None otherwise
"""
try:
issue_types = self.get_project_issue_types(project_key)
for issue_type in issue_types:
# Check the subtask field - this is the most reliable way
if issue_type.get("subtask", False):
return issue_type.get("name")
return None
except Exception as e:
logger.warning(f"Could not get issue types for project {project_key}: {e}")
return None
def _prepare_epic_fields(
self, fields: dict[str, Any], summary: str, kwargs: dict[str, Any]
) -> None:
"""
Prepare fields for epic creation.
This method delegates to the prepare_epic_fields method in EpicsMixin.
Args:
fields: The fields dictionary to update
summary: The epic summary
kwargs: Additional fields from the user
"""
# Extract project_key from fields if available
project_key = None
if "project" in fields:
if isinstance(fields["project"], dict):
project_key = fields["project"].get("key")
elif isinstance(fields["project"], str):
project_key = fields["project"]
# Delegate to EpicsMixin.prepare_epic_fields with project_key
# Since JiraFetcher inherits from both IssuesMixin and EpicsMixin,
# this will correctly use the prepare_epic_fields method from EpicsMixin
# which implements the two-step Epic creation approach
self.prepare_epic_fields(fields, summary, kwargs, project_key)
def _prepare_parent_fields(
self, fields: dict[str, Any], kwargs: dict[str, Any]
) -> None:
"""
Prepare fields for parent relationship.
Args:
fields: The fields dictionary to update
kwargs: Additional fields from the user
Raises:
ValueError: If parent issue key is not specified for a subtask
"""
if "parent" in kwargs:
parent_key = kwargs.get("parent")
if parent_key:
fields["parent"] = {"key": parent_key}
# Remove parent from kwargs to avoid double processing
kwargs.pop("parent", None)
elif "issuetype" in fields and fields["issuetype"]["name"].lower() in (
"subtask",
"sub-task",
):
# Only raise error if issue type is subtask and parent is missing
raise ValueError(
"Issue type is a sub-task but parent issue key or id not specified. Please provide a 'parent' parameter with the parent issue key."
)
def _add_assignee_to_fields(self, fields: dict[str, Any], assignee: str) -> None:
"""
Add assignee to issue fields.
Args:
fields: The fields dictionary to update
assignee: The assignee account ID
"""
# Cloud instance uses accountId
if self.config.is_cloud:
fields["assignee"] = {"accountId": assignee}
else:
# Server/DC might use name instead of accountId
fields["assignee"] = {"name": assignee}
    def _process_additional_fields(
        self, fields: dict[str, Any], kwargs: dict[str, Any]
    ) -> None:
        """
        Processes keyword arguments to add standard or custom fields to the issue fields dictionary.

        Uses the dynamic field map from FieldsMixin to identify field IDs.
        Unrecognized or unformattable values are logged and skipped; this
        method never raises for an individual bad field.

        Args:
            fields: The fields dictionary to update (mutated in place)
            kwargs: Additional fields provided via **kwargs
        """
        # Ensure field map is loaded/cached
        field_map = (
            self._generate_field_map()
        )  # Ensure map is ready (method from FieldsMixin)
        if not field_map:
            logger.error(
                "Could not generate field map. Cannot process additional fields."
            )
            return

        # Process each kwarg
        # Iterate over a copy to allow modification of the original kwargs if needed elsewhere
        for key, value in kwargs.copy().items():
            # Skip keys used internally for epic/parent handling or explicitly handled args like assignee/components
            if key.startswith("__epic_") or key in ("parent", "assignee", "components"):
                continue

            normalized_key = key.lower()
            api_field_id = None

            # 1. Check if key is a known field name in the map
            if normalized_key in field_map:
                api_field_id = field_map[normalized_key]
                logger.debug(
                    f"Identified field '{key}' as '{api_field_id}' via name map."
                )
            # 2. Check if key is a direct custom field ID
            elif key.startswith("customfield_"):
                api_field_id = key
                logger.debug(f"Identified field '{key}' as direct custom field ID.")
            # 3. Check if key is a standard system field ID (like 'summary', 'priority')
            # NOTE(review): this branch only fires when `key` differs from its
            # lowercase form yet is present in the map as-is — confirm that
            # field_map stores mixed-case keys, otherwise branch 1 always wins.
            elif key in field_map:  # Check original case for system fields
                api_field_id = field_map[key]
                logger.debug(f"Identified field '{key}' as standard system field ID.")

            if api_field_id:
                # Get the full field definition for formatting context if needed
                field_definition = self.get_field_by_id(
                    api_field_id
                )  # From FieldsMixin
                formatted_value = self._format_field_value_for_write(
                    api_field_id, value, field_definition
                )
                if formatted_value is not None:  # Only add if formatting didn't fail
                    fields[api_field_id] = formatted_value
                    logger.debug(
                        f"Added field '{api_field_id}' from kwarg '{key}': {formatted_value}"
                    )
                else:
                    logger.warning(
                        f"Skipping field '{key}' due to formatting error or invalid value."
                    )
            else:
                # 4. Unrecognized key - log a warning and skip
                logger.warning(
                    f"Ignoring unrecognized field '{key}' passed via kwargs."
                )
    def _format_field_value_for_write(
        self, field_id: str, value: Any, field_definition: dict | None
    ) -> Any:
        """Formats field values for the Jira API.

        Args:
            field_id: The API field ID (e.g. 'priority' or 'customfield_10011')
            value: The raw value supplied by the caller
            field_definition: The field's definition from FieldsMixin, if known

        Returns:
            The API-ready value, None when the input format is invalid
            (callers treat None as "skip this field"), or the value
            unchanged when no formatting rule applies.
        """
        # Get schema type if definition is available
        schema_type = (
            field_definition.get("schema", {}).get("type") if field_definition else None
        )
        # Prefer name from definition if available, else use ID for logging/lookup
        field_name_for_format = (
            field_definition.get("name", field_id) if field_definition else field_id
        )

        # Example formatting rules based on standard field names (use lowercase for comparison)
        normalized_name = field_name_for_format.lower()

        if normalized_name == "priority":
            if isinstance(value, str):
                return {"name": value}
            elif isinstance(value, dict) and ("name" in value or "id" in value):
                return value  # Assume pre-formatted
            else:
                logger.warning(
                    f"Invalid format for priority field: {value}. Expected string name or dict."
                )
                return None  # Or raise error
        elif normalized_name == "labels":
            if isinstance(value, list) and all(isinstance(item, str) for item in value):
                return value
            # Allow comma-separated string if passed via additional_fields JSON string
            elif isinstance(value, str):
                return [label.strip() for label in value.split(",") if label.strip()]
            else:
                logger.warning(
                    f"Invalid format for labels field: {value}. Expected list of strings or comma-separated string."
                )
                return None
        elif normalized_name in ["fixversions", "versions", "components"]:
            # These expect lists of objects, typically {"name": "..."} or {"id": "..."}
            if isinstance(value, list):
                formatted_list = []
                for item in value:
                    if isinstance(item, str):
                        formatted_list.append({"name": item})  # Convert simple strings
                    elif isinstance(item, dict) and ("name" in item or "id" in item):
                        formatted_list.append(item)  # Keep pre-formatted dicts
                    else:
                        # Invalid entries are dropped (logged), not fatal
                        logger.warning(
                            f"Invalid item format in {normalized_name} list: {item}"
                        )
                return formatted_list
            else:
                logger.warning(
                    f"Invalid format for {normalized_name} field: {value}. Expected list."
                )
                return None
        elif normalized_name == "reporter":
            if isinstance(value, str):
                try:
                    reporter_identifier = self._get_account_id(value)
                    if self.config.is_cloud:
                        return {"accountId": reporter_identifier}
                    else:
                        return {"name": reporter_identifier}
                except ValueError as e:
                    logger.warning(f"Could not format reporter field: {str(e)}")
                    return None
            elif isinstance(value, dict) and ("name" in value or "accountId" in value):
                return value  # Assume pre-formatted
            else:
                logger.warning(f"Invalid format for reporter field: {value}")
                return None
        # Add more formatting rules for other standard fields based on schema_type or field_id
        elif normalized_name == "duedate":
            if isinstance(value, str):  # Basic check, could add date validation
                return value
            else:
                logger.warning(
                    f"Invalid format for duedate field: {value}. Expected YYYY-MM-DD string."
                )
                return None
        elif schema_type == "datetime" and isinstance(value, str):
            # Example: Ensure datetime fields are in ISO format if needed by API
            try:
                dt = parse_date(value)  # Assuming parse_date handles various inputs
                return (
                    dt.isoformat() if dt else value
                )  # Return ISO or original if parse fails
            except Exception:
                logger.warning(
                    f"Could not parse datetime for field {field_id}: {value}"
                )
                return value  # Return original on error

        # Default: return value as is if no specific formatting needed/identified
        return value
def _handle_create_issue_error(self, exception: Exception, issue_type: str) -> None:
"""
Handle errors when creating an issue.
Args:
exception: The exception that occurred
issue_type: The type of issue being created
"""
error_msg = str(exception)
# Check for specific error types
if "epic name" in error_msg.lower() or "epicname" in error_msg.lower():
logger.error(
f"Error creating {issue_type}: {error_msg}. "
"Try specifying an epic_name in the additional fields"
)
elif "customfield" in error_msg.lower():
logger.error(
f"Error creating {issue_type}: {error_msg}. "
"This may be due to a required custom field"
)
else:
logger.error(f"Error creating {issue_type}: {error_msg}")
    def update_issue(
        self,
        issue_key: str,
        fields: dict[str, Any] | None = None,
        **kwargs: Any,  # noqa: ANN401 - Dynamic field types are necessary for Jira API
    ) -> JiraIssue:
        """
        Update a Jira issue.

        Args:
            issue_key: The key of the issue to update
            fields: Dictionary of fields to update
            **kwargs: Additional fields to update. Special fields include:
                - attachments: List of file paths to upload as attachments
                - status: New status for the issue (handled via transitions)
                - assignee: New assignee for the issue

        Returns:
            JiraIssue model representing the updated issue

        Raises:
            ValueError: If the issue key is missing or the update fails
                (underlying errors are wrapped)
        """
        try:
            # Validate required fields
            if not issue_key:
                raise ValueError("Issue key is required")

            update_fields = fields or {}
            attachments_result = None

            # Convert description from Markdown to Jira format if present
            if "description" in update_fields:
                update_fields["description"] = self._markdown_to_jira(
                    update_fields["description"]
                )

            # Process kwargs
            for key, value in kwargs.items():
                if key == "status":
                    # Status changes are handled separately via transitions
                    # Add status to fields so _update_issue_with_status can find it
                    # NOTE: returns immediately — any kwargs not yet reached by
                    # this loop are not applied when a status change is requested.
                    update_fields["status"] = value
                    return self._update_issue_with_status(issue_key, update_fields)
                elif key == "attachments":
                    # Handle attachments separately - they're not part of fields update
                    if value and isinstance(value, list | tuple):
                        # We'll process attachments after updating fields
                        pass
                    else:
                        logger.warning(f"Invalid attachments value: {value}")
                elif key == "assignee":
                    # Handle assignee updates, allow unassignment with None or empty string
                    if value is None or value == "":
                        update_fields["assignee"] = None
                    else:
                        try:
                            account_id = self._get_account_id(value)
                            self._add_assignee_to_fields(update_fields, account_id)
                        except ValueError as e:
                            logger.warning(f"Could not update assignee: {str(e)}")
                elif key == "description":
                    # Handle description with markdown conversion
                    update_fields["description"] = self._markdown_to_jira(value)
                else:
                    # Process regular fields using _process_additional_fields
                    # Create a temporary dict with just this field
                    field_kwargs = {key: value}
                    self._process_additional_fields(update_fields, field_kwargs)

            # Update the issue fields
            if update_fields:
                self.jira.update_issue(
                    issue_key=issue_key, update={"fields": update_fields}
                )

            # Handle attachments if provided
            if "attachments" in kwargs and kwargs["attachments"]:
                try:
                    attachments_result = self.upload_attachments(
                        issue_key, kwargs["attachments"]
                    )
                    logger.info(
                        f"Uploaded attachments to {issue_key}: {attachments_result}"
                    )
                except Exception as e:
                    logger.error(
                        f"Error uploading attachments to {issue_key}: {str(e)}"
                    )
                    # Continue with the update even if attachments fail

            # Get the updated issue data and convert to JiraIssue model
            issue_data = self.jira.get_issue(issue_key)
            if not isinstance(issue_data, dict):
                msg = f"Unexpected return value type from `jira.get_issue`: {type(issue_data)}"
                logger.error(msg)
                raise TypeError(msg)
            issue = JiraIssue.from_api_response(issue_data)

            # Add attachment results to the response if available
            if attachments_result:
                issue.custom_fields["attachment_results"] = attachments_result

            return issue
        except Exception as e:
            error_msg = str(e)
            logger.error(f"Error updating issue {issue_key}: {error_msg}")
            raise ValueError(f"Failed to update issue {issue_key}: {error_msg}") from e
def _update_issue_with_status(
    self, issue_key: str, fields: dict[str, Any]
) -> JiraIssue:
    """
    Update an issue that includes a status change.

    The "status" entry is removed from ``fields`` and applied through the
    transitions API; any remaining fields are applied with a regular field
    update first.

    Args:
        issue_key: The key of the issue to update
        fields: Dictionary of fields to update; may contain a "status" entry
            as a dict ({"name": ...} / {"id": ...}), a string (status name or
            numeric transition ID), or an int (transition ID)

    Returns:
        JiraIssue model representing the updated issue

    Raises:
        ValueError: If no transition matching the requested status is found
        TypeError: If the underlying API returns an unexpected type
        Exception: If there is an error updating the issue
    """
    # Extract status from fields and remove it for the standard field update
    status = fields.pop("status", None)
    # First update any non-status fields if needed
    if fields:
        self.jira.update_issue(issue_key=issue_key, fields=fields)  # type: ignore[call-arg]
    # If no status change is requested, fetch and return the issue as-is
    if not status:
        issue_data = self.jira.get_issue(issue_key)
        if not isinstance(issue_data, dict):
            msg = f"Unexpected return value type from `jira.get_issue`: {type(issue_data)}"
            logger.error(msg)
            raise TypeError(msg)
        return JiraIssue.from_api_response(issue_data)
    # Get available transitions (uses TransitionsMixin's normalized implementation)
    transitions = self.get_available_transitions(issue_key)  # type: ignore[attr-defined]
    # Extract status name or ID depending on what we received
    status_name: str | None = None
    status_id: str | None = None
    # Handle different input formats for status
    if isinstance(status, dict):
        # Dictionary format: {"name": "In Progress"} or {"id": "123"}
        status_name = status.get("name")
        status_id = status.get("id")
    elif isinstance(status, str):
        # String format: could be a name or an ID
        if status.isdigit():
            status_id = status
        else:
            status_name = status
    elif isinstance(status, int):
        # Integer format: must be an ID
        status_id = str(status)
    else:
        # Unknown format: warn and fall back to string comparison by name
        logger.warning(
            f"Unrecognized status format: {status} (type: {type(status)})"
        )
        status_name = str(status)
    # Log what we're searching for
    if status_name:
        logger.info(f"Looking for transition to status name: '{status_name}'")
    if status_id:
        logger.info(f"Looking for transition with ID: '{status_id}'")
    # Find the appropriate transition; a name match is tried before a direct
    # transition-ID match for each candidate, and the first hit wins.
    transition_id = None
    for transition in transitions:
        # TransitionsMixin returns normalized transitions with 'to_status' field
        transition_status_name = transition.get("to_status", "")
        # Match by name (case-insensitive)
        if (
            status_name
            and transition_status_name
            and transition_status_name.lower() == status_name.lower()
        ):
            transition_id = transition.get("id")
            logger.info(
                f"Found transition ID {transition_id} matching status name '{status_name}'"
            )
            break
        # Direct transition ID match (if status is actually a transition ID)
        if status_id and str(transition.get("id", "")) == str(status_id):
            transition_id = transition.get("id")
            logger.info(f"Using direct transition ID {transition_id}")
            break
    if not transition_id:
        # No match: build a readable list of available transitions for the error
        available_statuses = []
        for t in transitions:
            # Include transition name and target status if available
            transition_name = t.get("name", "")
            to_status = t.get("to_status", "")
            if to_status:
                available_statuses.append(f"{transition_name} -> {to_status}")
            elif transition_name:
                available_statuses.append(transition_name)
        available_statuses_str = (
            ", ".join(available_statuses) if available_statuses else "None found"
        )
        error_msg = (
            f"Could not find transition to status '{status}'. "
            f"Available transitions: {available_statuses_str}"
        )
        logger.error(error_msg)
        raise ValueError(error_msg)
    # Perform the transition (numeric string IDs are converted to int)
    logger.info(f"Performing transition with ID {transition_id}")
    self.jira.set_issue_status_by_transition_id(
        issue_key=issue_key,
        transition_id=(
            int(transition_id)
            if isinstance(transition_id, str) and transition_id.isdigit()
            else transition_id
        ),
    )
    # Get the updated issue data and convert to the JiraIssue model
    issue_data = self.jira.get_issue(issue_key)
    if not isinstance(issue_data, dict):
        msg = f"Unexpected return value type from `jira.get_issue`: {type(issue_data)}"
        logger.error(msg)
        raise TypeError(msg)
    return JiraIssue.from_api_response(issue_data)
def delete_issue(self, issue_key: str) -> bool:
    """
    Delete a Jira issue.

    Args:
        issue_key: The key of the issue to delete

    Returns:
        True if the issue was deleted successfully

    Raises:
        Exception: If there is an error deleting the issue
    """
    try:
        self.jira.delete_issue(issue_key)
    except Exception as exc:
        error_message = f"Error deleting issue {issue_key}: {str(exc)}"
        logger.error(error_message)
        raise Exception(error_message) from exc
    return True
def _log_available_fields(self, fields: list[dict]) -> None:
    """
    Log available fields for debugging.

    Args:
        fields: List of field definitions
    """
    logger.debug("Available Jira fields:")
    for field_def in fields:
        schema_type = field_def.get("schema", {}).get("type")
        logger.debug(
            f"{field_def.get('id')}: {field_def.get('name')} ({schema_type})"
        )
def _process_field_for_epic_data(
self, field: dict, field_ids: dict[str, str]
) -> None:
"""
Process a field for epic-related data.
Args:
field: The field data to process
field_ids: Dictionary of field IDs to update
"""
try:
field_id = field.get("id")
if not field_id:
return
# Skip non-custom fields
if not field_id.startswith("customfield_"):
return
name = field.get("name", "").lower()
# Look for field names related to epics
if "epic" in name:
if "link" in name:
field_ids["epic_link"] = field_id
field_ids["Epic Link"] = field_id
elif "name" in name:
field_ids["epic_name"] = field_id
field_ids["Epic Name"] = field_id
except Exception as e:
logger.warning(f"Error processing field for epic data: {str(e)}")
def _get_raw_transitions(self, issue_key: str) -> list[dict]:
"""
Get raw transition data from the Jira API.
This is an internal method that returns unprocessed transition data.
For normalized transitions with proper structure, use get_available_transitions()
from TransitionsMixin instead.
Args:
issue_key: The key of the issue
Returns:
List of raw transition data from the API
Raises:
Exception: If there is an error getting transitions
"""
try:
transitions = self.jira.get_issue_transitions(issue_key)
return transitions
except Exception as e:
logger.error(f"Error getting transitions for issue {issue_key}: {str(e)}")
raise Exception(
f"Error getting transitions for issue {issue_key}: {str(e)}"
) from e
def transition_issue(self, issue_key: str, transition_id: str) -> JiraIssue:
"""
Transition an issue to a new status.
Args:
issue_key: The key of the issue
transition_id: The ID of the transition to perform
Returns:
JiraIssue model with the updated issue data
Raises:
Exception: If there is an error transitioning the issue
"""
try:
self.jira.set_issue_status(
issue_key=issue_key, status_name=transition_id, fields=None, update=None
)
return self.get_issue(issue_key)
except Exception as e:
logger.error(f"Error transitioning issue {issue_key}: {str(e)}")
raise
def batch_create_issues(
self,
issues: list[dict[str, Any]],
validate_only: bool = False,
) -> list[JiraIssue]:
"""Create multiple Jira issues in a batch.
Args:
issues: List of issue dictionaries, each containing:
- project_key (str): Key of the project
- summary (str): Issue summary
- issue_type (str): Type of issue
- description (str, optional): Issue description
- assignee (str, optional): Username of assignee
- components (list[str], optional): List of component names
- **kwargs: Additional fields specific to your Jira instance
validate_only: If True, only validates the issues without creating them
Returns:
List of created JiraIssue objects
Raises:
ValueError: If any required fields are missing or invalid
MCPAtlassianAuthenticationError: If authentication fails
"""
if not issues:
return []
# Prepare issues for bulk creation
issue_updates = []
for issue_data in issues:
try:
# Extract and validate required fields
project_key = issue_data.pop("project_key", None)
summary = issue_data.pop("summary", None)
issue_type = issue_data.pop("issue_type", None)
description = issue_data.pop("description", "")
assignee = issue_data.pop("assignee", None)
components = issue_data.pop("components", None)
# Validate required fields
if not all([project_key, summary, issue_type]):
raise ValueError(
f"Missing required fields for issue: {project_key=}, {summary=}, {issue_type=}"
)
# Prepare fields dictionary
fields = {
"project": {"key": project_key},
"summary": summary,
"issuetype": {"name": issue_type},
}
# Add optional fields
if description:
fields["description"] = description
# Add assignee if provided
if assignee:
try:
# _get_account_id now returns the correct identifier (accountId for cloud, name for server)
assignee_identifier = self._get_account_id(assignee)
self._add_assignee_to_fields(fields, assignee_identifier)
except ValueError as e:
logger.warning(f"Could not assign issue: {str(e)}")
# Add components if provided
if components:
if isinstance(components, list):
valid_components = [
comp_name.strip()
for comp_name in components
if isinstance(comp_name, str) and comp_name.strip()
]
if valid_components:
fields["components"] = [
{"name": comp_name} for comp_name in valid_components
]
# Add any remaining custom fields
self._process_additional_fields(fields, issue_data)
if validate_only:
# For validation, just log the issue that would be created
logger.info(
f"Validated issue creation: {project_key} - {summary} ({issue_type})"
)
continue
# Add to bulk creation list
issue_updates.append({"fields": fields})
except Exception as e:
logger.error(f"Failed to prepare issue for creation: {str(e)}")
if not issue_updates:
raise
if validate_only:
return []
try:
# Call Jira's bulk create endpoint
response = self.jira.create_issues(issue_updates)
if not isinstance(response, dict):
msg = f"Unexpected return value type from `jira.create_issues`: {type(response)}"
logger.error(msg)
raise TypeError(msg)
# Process results
created_issues = []
for issue_info in response.get("issues", []):
issue_key = issue_info.get("key")
if issue_key:
try:
# Fetch the full issue data
issue_data = self.jira.get_issue(issue_key)
if not isinstance(issue_data, dict):
msg = f"Unexpected return value type from `jira.get_issue`: {type(issue_data)}"
logger.error(msg)
raise TypeError(msg)
created_issues.append(
JiraIssue.from_api_response(
issue_data,
base_url=self.config.url
if hasattr(self, "config")
else None,
)
)
except Exception as e:
logger.error(
f"Error fetching created issue {issue_key}: {str(e)}"
)
# Log any errors from the bulk creation
errors = response.get("errors", [])
if errors:
for error in errors:
logger.error(f"Bulk creation error: {error}")
return created_issues
except Exception as e:
logger.error(f"Error in bulk issue creation: {str(e)}")
raise
def batch_get_changelogs(
    self, issue_ids_or_keys: list[str], fields: list[str] | None = None
) -> list[JiraIssue]:
    """
    Get changelogs for multiple issues in a batch, paging through the API
    repeatedly if necessary.

    Warning:
        This function is only available on Jira Cloud.

    Args:
        issue_ids_or_keys: List of issue IDs or keys
        fields: Filter the changelogs by fields, e.g. ['status', 'assignee'].
            Defaults to None for all fields.

    Returns:
        List of JiraIssue objects that only contain changelogs and id
    """
    if not self.config.is_cloud:
        error_msg = "Batch get issue changelogs is only available on Jira Cloud."
        logger.error(error_msg)
        raise NotImplementedError(error_msg)
    # Accumulate changelogs grouped by issue id across all result pages
    changelogs_by_issue: defaultdict[str, list[JiraChangelog]] = defaultdict(list)
    pages = self.get_paged(
        method="post",
        url=self.jira.resource_url("changelog/bulkfetch"),
        params_or_json={
            "fieldIds": fields,
            "issueIdsOrKeys": issue_ids_or_keys,
        },
    )
    for page in pages:
        for entry in page.get("issueChangeLogs", []):
            issue_id = entry.get("issueId", "")
            changelogs_by_issue[issue_id].extend(
                JiraChangelog.from_api_response(raw)
                for raw in entry.get("changeHistories", [])
            )
    return [
        JiraIssue(id=issue_id, changelogs=changelogs)
        for issue_id, changelogs in changelogs_by_issue.items()
    ]
```