This is page 4 of 10. Use http://codebase.md/sooperset/mcp-atlassian?page={x} to view the full context.
# Directory Structure
```
├── .devcontainer
│ ├── devcontainer.json
│ ├── Dockerfile
│ ├── post-create.sh
│ └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ └── feature_request.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-publish.yml
│ ├── lint.yml
│ ├── publish.yml
│ ├── stale.yml
│ └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│ ├── oauth_authorize.py
│ └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│ └── mcp_atlassian
│ ├── __init__.py
│ ├── confluence
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── labels.py
│ │ ├── pages.py
│ │ ├── search.py
│ │ ├── spaces.py
│ │ ├── users.py
│ │ ├── utils.py
│ │ └── v2_adapter.py
│ ├── exceptions.py
│ ├── jira
│ │ ├── __init__.py
│ │ ├── attachments.py
│ │ ├── boards.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── epics.py
│ │ ├── fields.py
│ │ ├── formatting.py
│ │ ├── issues.py
│ │ ├── links.py
│ │ ├── projects.py
│ │ ├── protocols.py
│ │ ├── search.py
│ │ ├── sprints.py
│ │ ├── transitions.py
│ │ ├── users.py
│ │ └── worklog.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── comment.py
│ │ │ ├── common.py
│ │ │ ├── label.py
│ │ │ ├── page.py
│ │ │ ├── search.py
│ │ │ ├── space.py
│ │ │ └── user_search.py
│ │ ├── constants.py
│ │ └── jira
│ │ ├── __init__.py
│ │ ├── agile.py
│ │ ├── comment.py
│ │ ├── common.py
│ │ ├── issue.py
│ │ ├── link.py
│ │ ├── project.py
│ │ ├── search.py
│ │ ├── version.py
│ │ ├── workflow.py
│ │ └── worklog.py
│ ├── preprocessing
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence.py
│ │ └── jira.py
│ ├── servers
│ │ ├── __init__.py
│ │ ├── confluence.py
│ │ ├── context.py
│ │ ├── dependencies.py
│ │ ├── jira.py
│ │ └── main.py
│ └── utils
│ ├── __init__.py
│ ├── date.py
│ ├── decorators.py
│ ├── env.py
│ ├── environment.py
│ ├── io.py
│ ├── lifecycle.py
│ ├── logging.py
│ ├── oauth_setup.py
│ ├── oauth.py
│ ├── ssl.py
│ ├── tools.py
│ └── urls.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── fixtures
│ │ ├── __init__.py
│ │ ├── confluence_mocks.py
│ │ └── jira_mocks.py
│ ├── integration
│ │ ├── conftest.py
│ │ ├── README.md
│ │ ├── test_authentication.py
│ │ ├── test_content_processing.py
│ │ ├── test_cross_service.py
│ │ ├── test_mcp_protocol.py
│ │ ├── test_proxy.py
│ │ ├── test_real_api.py
│ │ ├── test_ssl_verification.py
│ │ ├── test_stdin_monitoring_fix.py
│ │ └── test_transport_lifecycle.py
│ ├── README.md
│ ├── test_preprocessing.py
│ ├── test_real_api_validation.py
│ ├── unit
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_labels.py
│ │ │ ├── test_pages.py
│ │ │ ├── test_search.py
│ │ │ ├── test_spaces.py
│ │ │ ├── test_users.py
│ │ │ ├── test_utils.py
│ │ │ └── test_v2_adapter.py
│ │ ├── jira
│ │ │ ├── conftest.py
│ │ │ ├── test_attachments.py
│ │ │ ├── test_boards.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_epics.py
│ │ │ ├── test_fields.py
│ │ │ ├── test_formatting.py
│ │ │ ├── test_issues_markdown.py
│ │ │ ├── test_issues.py
│ │ │ ├── test_links.py
│ │ │ ├── test_projects.py
│ │ │ ├── test_protocols.py
│ │ │ ├── test_search.py
│ │ │ ├── test_sprints.py
│ │ │ ├── test_transitions.py
│ │ │ ├── test_users.py
│ │ │ └── test_worklog.py
│ │ ├── models
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_base_models.py
│ │ │ ├── test_confluence_models.py
│ │ │ ├── test_constants.py
│ │ │ └── test_jira_models.py
│ │ ├── servers
│ │ │ ├── __init__.py
│ │ │ ├── test_confluence_server.py
│ │ │ ├── test_context.py
│ │ │ ├── test_dependencies.py
│ │ │ ├── test_jira_server.py
│ │ │ └── test_main_server.py
│ │ ├── test_exceptions.py
│ │ ├── test_main_transport_selection.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── test_custom_headers.py
│ │ ├── test_date.py
│ │ ├── test_decorators.py
│ │ ├── test_env.py
│ │ ├── test_environment.py
│ │ ├── test_io.py
│ │ ├── test_lifecycle.py
│ │ ├── test_logging.py
│ │ ├── test_masking.py
│ │ ├── test_oauth_setup.py
│ │ ├── test_oauth.py
│ │ ├── test_ssl.py
│ │ ├── test_tools.py
│ │ └── test_urls.py
│ └── utils
│ ├── __init__.py
│ ├── assertions.py
│ ├── base.py
│ ├── factories.py
│ └── mocks.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/tests/unit/jira/test_worklog.py:
--------------------------------------------------------------------------------
```python
"""Tests for the Jira Worklog mixin."""
from unittest.mock import MagicMock
import pytest
from mcp_atlassian.jira.worklog import WorklogMixin
class TestWorklogMixin:
"""Tests for the WorklogMixin class."""
@pytest.fixture
def worklog_mixin(self, jira_client):
"""Create a WorklogMixin instance with mocked dependencies."""
mixin = WorklogMixin(config=jira_client.config)
mixin.jira = jira_client.jira
# Mock methods that are typically provided by other mixins
mixin._clean_text = MagicMock(side_effect=lambda text: text if text else "")
return mixin
def test_parse_time_spent_with_seconds(self, worklog_mixin):
"""Test parsing time spent with seconds specification."""
assert worklog_mixin._parse_time_spent("60s") == 60
assert worklog_mixin._parse_time_spent("3600s") == 3600
def test_parse_time_spent_with_minutes(self, worklog_mixin):
"""Test parsing time spent with minutes."""
assert worklog_mixin._parse_time_spent("1m") == 60
assert worklog_mixin._parse_time_spent("30m") == 1800
def test_parse_time_spent_with_hours(self, worklog_mixin):
"""Test parsing time spent with hours."""
assert worklog_mixin._parse_time_spent("1h") == 3600
assert worklog_mixin._parse_time_spent("2h") == 7200
def test_parse_time_spent_with_days(self, worklog_mixin):
"""Test parsing time spent with days."""
assert worklog_mixin._parse_time_spent("1d") == 86400
assert worklog_mixin._parse_time_spent("2d") == 172800
def test_parse_time_spent_with_weeks(self, worklog_mixin):
"""Test parsing time spent with weeks."""
assert worklog_mixin._parse_time_spent("1w") == 604800
assert worklog_mixin._parse_time_spent("2w") == 1209600
def test_parse_time_spent_with_mixed_units(self, worklog_mixin):
"""Test parsing time spent with mixed units."""
assert worklog_mixin._parse_time_spent("1h 30m") == 5400
assert worklog_mixin._parse_time_spent("1d 6h") == 108000
assert worklog_mixin._parse_time_spent("1w 2d 3h 4m") == 788640
def test_parse_time_spent_with_invalid_input(self, worklog_mixin):
"""Test parsing time spent with invalid input."""
# Should default to 60 seconds
assert worklog_mixin._parse_time_spent("invalid") == 60
def test_parse_time_spent_with_numeric_input(self, worklog_mixin):
"""Test parsing time spent with numeric input."""
assert worklog_mixin._parse_time_spent("60") == 60
assert worklog_mixin._parse_time_spent("3600") == 3600
def test_get_worklogs_basic(self, worklog_mixin):
"""Test basic functionality of get_worklogs."""
# Setup mock response
mock_result = {
"worklogs": [
{
"id": "10001",
"comment": "Work item 1",
"created": "2024-01-01T10:00:00.000+0000",
"updated": "2024-01-01T10:30:00.000+0000",
"started": "2024-01-01T09:00:00.000+0000",
"timeSpent": "1h",
"timeSpentSeconds": 3600,
"author": {"displayName": "Test User"},
}
]
}
worklog_mixin.jira.issue_get_worklog.return_value = mock_result
# Call the method
result = worklog_mixin.get_worklogs("TEST-123")
# Verify
worklog_mixin.jira.issue_get_worklog.assert_called_once_with("TEST-123")
assert len(result) == 1
assert result[0]["id"] == "10001"
assert result[0]["comment"] == "Work item 1"
assert result[0]["timeSpent"] == "1h"
assert result[0]["timeSpentSeconds"] == 3600
assert result[0]["author"] == "Test User"
def test_get_worklogs_with_multiple_entries(self, worklog_mixin):
"""Test get_worklogs with multiple worklog entries."""
# Setup mock response with multiple entries
mock_result = {
"worklogs": [
{
"id": "10001",
"comment": "Work item 1",
"created": "2024-01-01T10:00:00.000+0000",
"timeSpent": "1h",
"timeSpentSeconds": 3600,
"author": {"displayName": "User 1"},
},
{
"id": "10002",
"comment": "Work item 2",
"created": "2024-01-02T10:00:00.000+0000",
"timeSpent": "2h",
"timeSpentSeconds": 7200,
"author": {"displayName": "User 2"},
},
]
}
worklog_mixin.jira.issue_get_worklog.return_value = mock_result
# Call the method
result = worklog_mixin.get_worklogs("TEST-123")
# Verify
assert len(result) == 2
assert result[0]["id"] == "10001"
assert result[1]["id"] == "10002"
assert result[0]["timeSpentSeconds"] == 3600
assert result[1]["timeSpentSeconds"] == 7200
def test_get_worklogs_with_missing_fields(self, worklog_mixin):
"""Test get_worklogs with missing fields."""
# Setup mock response with missing fields
mock_result = {
"worklogs": [
{
"id": "10001",
# Missing comment
"created": "2024-01-01T10:00:00.000+0000",
# Missing other fields
}
]
}
worklog_mixin.jira.issue_get_worklog.return_value = mock_result
# Call the method
result = worklog_mixin.get_worklogs("TEST-123")
# Verify
assert len(result) == 1
assert result[0]["id"] == "10001"
assert result[0]["comment"] == ""
assert result[0]["timeSpent"] == ""
assert result[0]["timeSpentSeconds"] == 0
assert result[0]["author"] == "Unknown"
def test_get_worklogs_with_empty_response(self, worklog_mixin):
"""Test get_worklogs with empty response."""
# Setup mock response with no worklogs
worklog_mixin.jira.issue_get_worklog.return_value = {}
# Call the method
result = worklog_mixin.get_worklogs("TEST-123")
# Verify
assert isinstance(result, list)
assert len(result) == 0
def test_get_worklogs_with_error(self, worklog_mixin):
"""Test get_worklogs error handling."""
# Setup mock to raise exception
worklog_mixin.jira.issue_get_worklog.side_effect = Exception(
"Worklog fetch error"
)
# Call the method and verify exception
with pytest.raises(
Exception, match="Error getting worklogs: Worklog fetch error"
):
worklog_mixin.get_worklogs("TEST-123")
def test_add_worklog_basic(self, worklog_mixin):
"""Test basic functionality of add_worklog."""
# Setup mock response
mock_result = {
"id": "10001",
"comment": "Added work",
"created": "2024-01-01T10:00:00.000+0000",
"updated": "2024-01-01T10:00:00.000+0000",
"started": "2024-01-01T09:00:00.000+0000",
"timeSpent": "1h",
"timeSpentSeconds": 3600,
"author": {"displayName": "Test User"},
}
worklog_mixin.jira.post.return_value = mock_result
worklog_mixin.jira.resource_url.return_value = (
"https://jira.example.com/rest/api/2/issue"
)
# Call the method
result = worklog_mixin.add_worklog("TEST-123", "1h", comment="Added work")
# Verify
worklog_mixin.jira.resource_url.assert_called_once_with("issue")
worklog_mixin.jira.post.assert_called_once()
assert result["id"] == "10001"
assert result["comment"] == "Added work"
assert result["timeSpent"] == "1h"
assert result["timeSpentSeconds"] == 3600
assert result["author"] == "Test User"
assert result["original_estimate_updated"] is False
assert result["remaining_estimate_updated"] is False
def test_add_worklog_with_original_estimate(self, worklog_mixin):
"""Test add_worklog with original estimate update."""
# Setup mocks
mock_result = {
"id": "10001",
"timeSpent": "1h",
"timeSpentSeconds": 3600,
}
worklog_mixin.jira.post.return_value = mock_result
worklog_mixin.jira.resource_url.return_value = (
"https://jira.example.com/rest/api/2/issue"
)
# Call the method
result = worklog_mixin.add_worklog("TEST-123", "1h", original_estimate="4h")
# Verify
worklog_mixin.jira.edit_issue.assert_called_once_with(
issue_id_or_key="TEST-123",
fields={"timetracking": {"originalEstimate": "4h"}},
)
assert result["original_estimate_updated"] is True
def test_add_worklog_with_remaining_estimate(self, worklog_mixin):
"""Test add_worklog with remaining estimate update."""
# Setup mocks
mock_result = {
"id": "10001",
"timeSpent": "1h",
"timeSpentSeconds": 3600,
}
worklog_mixin.jira.post.return_value = mock_result
worklog_mixin.jira.resource_url.return_value = (
"https://jira.example.com/rest/api/2/issue"
)
# Call the method
result = worklog_mixin.add_worklog("TEST-123", "1h", remaining_estimate="3h")
# Verify post call has correct parameters
call_args = worklog_mixin.jira.post.call_args
assert call_args is not None
args, kwargs = call_args
# Check that adjustEstimate=new and newEstimate=3h are in params
assert "params" in kwargs
assert kwargs["params"]["adjustEstimate"] == "new"
assert kwargs["params"]["newEstimate"] == "3h"
assert result["remaining_estimate_updated"] is True
def test_add_worklog_with_started_time(self, worklog_mixin):
"""Test add_worklog with started time."""
# Setup mocks
mock_result = {
"id": "10001",
"timeSpent": "1h",
"timeSpentSeconds": 3600,
}
worklog_mixin.jira.post.return_value = mock_result
worklog_mixin.jira.resource_url.return_value = (
"https://jira.example.com/rest/api/2/issue"
)
# Setup started time
started_time = "2024-01-01T09:00:00.000+0000"
# Call the method
worklog_mixin.add_worklog("TEST-123", "1h", started=started_time)
# Verify worklog data contains started time
call_args = worklog_mixin.jira.post.call_args
assert call_args is not None
args, kwargs = call_args
assert "data" in kwargs
assert kwargs["data"]["started"] == started_time
def test_add_worklog_with_markdown_to_jira_available(self, worklog_mixin):
"""Test add_worklog with _markdown_to_jira conversion."""
# Setup mocks
mock_result = {
"id": "10001",
"timeSpent": "1h",
"timeSpentSeconds": 3600,
}
worklog_mixin.jira.post.return_value = mock_result
worklog_mixin.jira.resource_url.return_value = (
"https://jira.example.com/rest/api/2/issue"
)
# Add _markdown_to_jira method
worklog_mixin._markdown_to_jira = MagicMock(return_value="Converted comment")
# Call the method
worklog_mixin.add_worklog("TEST-123", "1h", comment="**Markdown** comment")
# Verify _markdown_to_jira was called
worklog_mixin._markdown_to_jira.assert_called_once_with("**Markdown** comment")
# Verify converted comment was used
call_args = worklog_mixin.jira.post.call_args
assert call_args is not None
args, kwargs = call_args
assert "data" in kwargs
assert kwargs["data"]["comment"] == "Converted comment"
def test_add_worklog_with_error(self, worklog_mixin):
"""Test add_worklog error handling."""
# Setup mock to raise exception
worklog_mixin.jira.post.side_effect = Exception("Worklog add error")
worklog_mixin.jira.resource_url.return_value = (
"https://jira.example.com/rest/api/2/issue"
)
# Call the method and verify exception
with pytest.raises(Exception, match="Error adding worklog: Worklog add error"):
worklog_mixin.add_worklog("TEST-123", "1h")
def test_add_worklog_with_original_estimate_error(self, worklog_mixin):
"""Test add_worklog with original estimate update error."""
# Setup mocks
mock_result = {
"id": "10001",
"timeSpent": "1h",
"timeSpentSeconds": 3600,
}
worklog_mixin.jira.post.return_value = mock_result
worklog_mixin.jira.resource_url.return_value = (
"https://jira.example.com/rest/api/2/issue"
)
# Make edit_issue raise an exception
worklog_mixin.jira.edit_issue.side_effect = Exception("Estimate update error")
# Call the method - should continue despite estimate update error
result = worklog_mixin.add_worklog("TEST-123", "1h", original_estimate="4h")
# Verify post was still called (worklog added despite estimate error)
worklog_mixin.jira.post.assert_called_once()
assert result["original_estimate_updated"] is False
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/__init__.py:
--------------------------------------------------------------------------------
```python
import asyncio
import logging
import os
import sys
from importlib.metadata import PackageNotFoundError, version
import click
from dotenv import load_dotenv
from mcp_atlassian.utils.env import is_env_truthy
from mcp_atlassian.utils.lifecycle import (
ensure_clean_exit,
setup_signal_handlers,
)
from mcp_atlassian.utils.logging import setup_logging
try:
__version__ = version("mcp-atlassian")
except PackageNotFoundError:
# package is not installed
__version__ = "0.0.0"
# Initialize logging with appropriate level
logging_level = logging.WARNING
if is_env_truthy("MCP_VERBOSE"):
logging_level = logging.DEBUG
# Set up logging to STDOUT if MCP_LOGGING_STDOUT is set to true
logging_stream = sys.stdout if is_env_truthy("MCP_LOGGING_STDOUT") else sys.stderr
# Set up logging using the utility function
logger = setup_logging(logging_level, logging_stream)
@click.version_option(__version__, prog_name="mcp-atlassian")
@click.command()
@click.option(
"-v",
"--verbose",
count=True,
help="Increase verbosity (can be used multiple times)",
)
@click.option(
"--env-file", type=click.Path(exists=True, dir_okay=False), help="Path to .env file"
)
@click.option(
"--oauth-setup",
is_flag=True,
help="Run OAuth 2.0 setup wizard for Atlassian Cloud",
)
@click.option(
"--transport",
type=click.Choice(["stdio", "sse", "streamable-http"]),
default="stdio",
help="Transport type (stdio, sse, or streamable-http)",
)
@click.option(
"--port",
default=8000,
help="Port to listen on for SSE or Streamable HTTP transport",
)
@click.option(
"--host",
default="0.0.0.0", # noqa: S104
help="Host to bind to for SSE or Streamable HTTP transport (default: 0.0.0.0)",
)
@click.option(
"--path",
default="/mcp",
help="Path for Streamable HTTP transport (e.g., /mcp).",
)
@click.option(
"--confluence-url",
help="Confluence URL (e.g., https://your-domain.atlassian.net/wiki)",
)
@click.option("--confluence-username", help="Confluence username/email")
@click.option("--confluence-token", help="Confluence API token")
@click.option(
"--confluence-personal-token",
help="Confluence Personal Access Token (for Confluence Server/Data Center)",
)
@click.option(
"--confluence-ssl-verify/--no-confluence-ssl-verify",
default=True,
help="Verify SSL certificates for Confluence Server/Data Center (default: verify)",
)
@click.option(
"--confluence-spaces-filter",
help="Comma-separated list of Confluence space keys to filter search results",
)
@click.option(
"--jira-url",
help="Jira URL (e.g., https://your-domain.atlassian.net or https://jira.your-company.com)",
)
@click.option("--jira-username", help="Jira username/email (for Jira Cloud)")
@click.option("--jira-token", help="Jira API token (for Jira Cloud)")
@click.option(
"--jira-personal-token",
help="Jira Personal Access Token (for Jira Server/Data Center)",
)
@click.option(
"--jira-ssl-verify/--no-jira-ssl-verify",
default=True,
help="Verify SSL certificates for Jira Server/Data Center (default: verify)",
)
@click.option(
"--jira-projects-filter",
help="Comma-separated list of Jira project keys to filter search results",
)
@click.option(
"--read-only",
is_flag=True,
help="Run in read-only mode (disables all write operations)",
)
@click.option(
"--enabled-tools",
help="Comma-separated list of tools to enable (enables all if not specified)",
)
@click.option(
"--oauth-client-id",
help="OAuth 2.0 client ID for Atlassian Cloud",
)
@click.option(
"--oauth-client-secret",
help="OAuth 2.0 client secret for Atlassian Cloud",
)
@click.option(
"--oauth-redirect-uri",
help="OAuth 2.0 redirect URI for Atlassian Cloud",
)
@click.option(
"--oauth-scope",
help="OAuth 2.0 scopes (space-separated) for Atlassian Cloud",
)
@click.option(
"--oauth-cloud-id",
help="Atlassian Cloud ID for OAuth 2.0 authentication",
)
@click.option(
"--oauth-access-token",
help="Atlassian Cloud OAuth 2.0 access token (if you have your own you'd like to "
"use for the session.)",
)
def main(
verbose: int,
env_file: str | None,
oauth_setup: bool,
transport: str,
port: int,
host: str,
path: str | None,
confluence_url: str | None,
confluence_username: str | None,
confluence_token: str | None,
confluence_personal_token: str | None,
confluence_ssl_verify: bool,
confluence_spaces_filter: str | None,
jira_url: str | None,
jira_username: str | None,
jira_token: str | None,
jira_personal_token: str | None,
jira_ssl_verify: bool,
jira_projects_filter: str | None,
read_only: bool,
enabled_tools: str | None,
oauth_client_id: str | None,
oauth_client_secret: str | None,
oauth_redirect_uri: str | None,
oauth_scope: str | None,
oauth_cloud_id: str | None,
oauth_access_token: str | None,
) -> None:
"""MCP Atlassian Server - Jira and Confluence functionality for MCP
Supports both Atlassian Cloud and Jira Server/Data Center deployments.
Authentication methods supported:
- Username and API token (Cloud)
- Personal Access Token (Server/Data Center)
- OAuth 2.0 (Cloud only)
"""
# Logging level logic
if verbose == 1:
current_logging_level = logging.INFO
elif verbose >= 2: # -vv or more
current_logging_level = logging.DEBUG
else:
# Default to DEBUG if MCP_VERY_VERBOSE is set, else INFO if MCP_VERBOSE is set, else WARNING
if is_env_truthy("MCP_VERY_VERBOSE", "false"):
current_logging_level = logging.DEBUG
elif is_env_truthy("MCP_VERBOSE", "false"):
current_logging_level = logging.INFO
else:
current_logging_level = logging.WARNING
# Set up logging to STDOUT if MCP_LOGGING_STDOUT is set to true
logging_stream = sys.stdout if is_env_truthy("MCP_LOGGING_STDOUT") else sys.stderr
global logger
logger = setup_logging(current_logging_level, logging_stream)
logger.debug(f"Logging level set to: {logging.getLevelName(current_logging_level)}")
logger.debug(
f"Logging stream set to: {'stdout' if logging_stream is sys.stdout else 'stderr'}"
)
def was_option_provided(ctx: click.Context, param_name: str) -> bool:
return (
ctx.get_parameter_source(param_name)
!= click.core.ParameterSource.DEFAULT_MAP
and ctx.get_parameter_source(param_name)
!= click.core.ParameterSource.DEFAULT
)
if env_file:
logger.debug(f"Loading environment from file: {env_file}")
load_dotenv(env_file, override=True)
else:
logger.debug(
"Attempting to load environment from default .env file if it exists"
)
load_dotenv(override=True)
if oauth_setup:
logger.info("Starting OAuth 2.0 setup wizard")
try:
from .utils.oauth_setup import run_oauth_setup
sys.exit(run_oauth_setup())
except ImportError:
logger.error("Failed to import OAuth setup module.")
sys.exit(1)
click_ctx = click.get_current_context(silent=True)
# Transport precedence
final_transport = os.getenv("TRANSPORT", "stdio").lower()
if click_ctx and was_option_provided(click_ctx, "transport"):
final_transport = transport
if final_transport not in ["stdio", "sse", "streamable-http"]:
logger.warning(
f"Invalid transport '{final_transport}' from env/default, using 'stdio'."
)
final_transport = "stdio"
logger.debug(f"Final transport determined: {final_transport}")
# Port precedence
final_port = 8000
if os.getenv("PORT") and os.getenv("PORT").isdigit():
final_port = int(os.getenv("PORT"))
if click_ctx and was_option_provided(click_ctx, "port"):
final_port = port
logger.debug(f"Final port for HTTP transports: {final_port}")
# Host precedence
final_host = os.getenv("HOST", "0.0.0.0") # noqa: S104
if click_ctx and was_option_provided(click_ctx, "host"):
final_host = host
logger.debug(f"Final host for HTTP transports: {final_host}")
# Path precedence
final_path: str | None = os.getenv("STREAMABLE_HTTP_PATH", None)
if click_ctx and was_option_provided(click_ctx, "path"):
final_path = path
logger.debug(
f"Final path for Streamable HTTP: {final_path if final_path else 'FastMCP default'}"
)
# Set env vars for downstream config
if click_ctx and was_option_provided(click_ctx, "enabled_tools"):
os.environ["ENABLED_TOOLS"] = enabled_tools
if click_ctx and was_option_provided(click_ctx, "confluence_url"):
os.environ["CONFLUENCE_URL"] = confluence_url
if click_ctx and was_option_provided(click_ctx, "confluence_username"):
os.environ["CONFLUENCE_USERNAME"] = confluence_username
if click_ctx and was_option_provided(click_ctx, "confluence_token"):
os.environ["CONFLUENCE_API_TOKEN"] = confluence_token
if click_ctx and was_option_provided(click_ctx, "confluence_personal_token"):
os.environ["CONFLUENCE_PERSONAL_TOKEN"] = confluence_personal_token
if click_ctx and was_option_provided(click_ctx, "jira_url"):
os.environ["JIRA_URL"] = jira_url
if click_ctx and was_option_provided(click_ctx, "jira_username"):
os.environ["JIRA_USERNAME"] = jira_username
if click_ctx and was_option_provided(click_ctx, "jira_token"):
os.environ["JIRA_API_TOKEN"] = jira_token
if click_ctx and was_option_provided(click_ctx, "jira_personal_token"):
os.environ["JIRA_PERSONAL_TOKEN"] = jira_personal_token
if click_ctx and was_option_provided(click_ctx, "oauth_client_id"):
os.environ["ATLASSIAN_OAUTH_CLIENT_ID"] = oauth_client_id
if click_ctx and was_option_provided(click_ctx, "oauth_client_secret"):
os.environ["ATLASSIAN_OAUTH_CLIENT_SECRET"] = oauth_client_secret
if click_ctx and was_option_provided(click_ctx, "oauth_redirect_uri"):
os.environ["ATLASSIAN_OAUTH_REDIRECT_URI"] = oauth_redirect_uri
if click_ctx and was_option_provided(click_ctx, "oauth_scope"):
os.environ["ATLASSIAN_OAUTH_SCOPE"] = oauth_scope
if click_ctx and was_option_provided(click_ctx, "oauth_cloud_id"):
os.environ["ATLASSIAN_OAUTH_CLOUD_ID"] = oauth_cloud_id
if click_ctx and was_option_provided(click_ctx, "oauth_access_token"):
os.environ["ATLASSIAN_OAUTH_ACCESS_TOKEN"] = oauth_access_token
if click_ctx and was_option_provided(click_ctx, "read_only"):
os.environ["READ_ONLY_MODE"] = str(read_only).lower()
if click_ctx and was_option_provided(click_ctx, "confluence_ssl_verify"):
os.environ["CONFLUENCE_SSL_VERIFY"] = str(confluence_ssl_verify).lower()
if click_ctx and was_option_provided(click_ctx, "confluence_spaces_filter"):
os.environ["CONFLUENCE_SPACES_FILTER"] = confluence_spaces_filter
if click_ctx and was_option_provided(click_ctx, "jira_ssl_verify"):
os.environ["JIRA_SSL_VERIFY"] = str(jira_ssl_verify).lower()
if click_ctx and was_option_provided(click_ctx, "jira_projects_filter"):
os.environ["JIRA_PROJECTS_FILTER"] = jira_projects_filter
from mcp_atlassian.servers import main_mcp
run_kwargs = {
"transport": final_transport,
}
if final_transport == "stdio":
logger.info("Starting server with STDIO transport.")
elif final_transport in ["sse", "streamable-http"]:
run_kwargs["host"] = final_host
run_kwargs["port"] = final_port
run_kwargs["log_level"] = logging.getLevelName(current_logging_level).lower()
if final_path is not None:
run_kwargs["path"] = final_path
log_display_path = final_path
if log_display_path is None:
if final_transport == "sse":
log_display_path = main_mcp.settings.sse_path or "/sse"
else:
log_display_path = main_mcp.settings.streamable_http_path or "/mcp"
logger.info(
f"Starting server with {final_transport.upper()} transport on http://{final_host}:{final_port}{log_display_path}"
)
else:
logger.error(
f"Invalid transport type '{final_transport}' determined. Cannot start server."
)
sys.exit(1)
# Set up signal handlers for graceful shutdown
setup_signal_handlers()
# For STDIO transport, also handle EOF detection
if final_transport == "stdio":
logger.debug("STDIO transport detected, setting up stdin monitoring")
try:
logger.debug("Starting asyncio event loop...")
# For stdio transport, don't monitor stdin as MCP server handles it internally
# This prevents race conditions where both try to read from the same stdin
if final_transport == "stdio":
asyncio.run(main_mcp.run_async(**run_kwargs))
else:
# For HTTP transports (SSE, streamable-http), don't use stdin monitoring
# as it causes premature shutdown when the client closes stdin
# The server should only rely on OS signals for shutdown
logger.debug(
f"Running server for {final_transport} transport without stdin monitoring"
)
asyncio.run(main_mcp.run_async(**run_kwargs))
except (KeyboardInterrupt, SystemExit) as e:
logger.info(f"Server shutdown initiated: {type(e).__name__}")
except Exception as e:
logger.error(f"Server encountered an error: {e}", exc_info=True)
sys.exit(1)
finally:
ensure_clean_exit()
__all__ = ["main", "__version__"]
if __name__ == "__main__":
main()
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_formatting.py:
--------------------------------------------------------------------------------
```python
"""Tests for the Jira FormattingMixin."""
from unittest.mock import MagicMock, patch
import pytest
from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.formatting import FormattingMixin
from mcp_atlassian.preprocessing import JiraPreprocessor
@pytest.fixture
def formatting_mixin(jira_fetcher: JiraFetcher) -> FormattingMixin:
"""Fixture to create a FormattingMixin instance for testing."""
# Create the mixin without calling its __init__ to avoid config dependencies
mixin = jira_fetcher
# Set up necessary mocks
mixin.preprocessor = MagicMock(spec=JiraPreprocessor)
return mixin
def test_markdown_to_jira(formatting_mixin):
"""Test markdown_to_jira method with valid input."""
formatting_mixin.preprocessor.markdown_to_jira.return_value = "Converted text"
result = formatting_mixin.markdown_to_jira("# Markdown text")
assert result == "Converted text"
formatting_mixin.preprocessor.markdown_to_jira.assert_called_once_with(
"# Markdown text"
)
def test_markdown_to_jira_empty_input(formatting_mixin):
"""Test markdown_to_jira method with empty input."""
result = formatting_mixin.markdown_to_jira("")
assert result == ""
formatting_mixin.preprocessor.markdown_to_jira.assert_not_called()
def test_markdown_to_jira_exception(formatting_mixin):
"""Test markdown_to_jira method with exception."""
formatting_mixin.preprocessor.markdown_to_jira.side_effect = Exception(
"Conversion error"
)
result = formatting_mixin.markdown_to_jira("# Markdown text")
assert result == "# Markdown text" # Should return original text on error
formatting_mixin.preprocessor.markdown_to_jira.assert_called_once()
def test_format_issue_content_basic(formatting_mixin):
"""Test format_issue_content method with basic inputs."""
issue_key = "TEST-123"
issue = {
"fields": {
"summary": "Test issue",
"issuetype": {"name": "Bug"},
"status": {"name": "Open"},
}
}
description = "This is a test issue."
comments = []
created_date = "2023-01-01 12:00:00"
epic_info = {"epic_key": None, "epic_name": None}
result = formatting_mixin.format_issue_content(
issue_key, issue, description, comments, created_date, epic_info
)
# Check that the result contains all the basic information
assert "Issue: TEST-123" in result
assert "Title: Test issue" in result
assert "Type: Bug" in result
assert "Status: Open" in result
assert "Created: 2023-01-01 12:00:00" in result
assert "Description:" in result
assert "This is a test issue." in result
assert "Comments:" not in result # No comments
def test_format_issue_content_with_epic(formatting_mixin):
"""Test format_issue_content method with epic information."""
issue_key = "TEST-123"
issue = {
"fields": {
"summary": "Test issue",
"issuetype": {"name": "Bug"},
"status": {"name": "Open"},
}
}
description = "This is a test issue."
comments = []
created_date = "2023-01-01 12:00:00"
epic_info = {"epic_key": "EPIC-1", "epic_name": "Test Epic"}
result = formatting_mixin.format_issue_content(
issue_key, issue, description, comments, created_date, epic_info
)
# Check that the result contains the epic information
assert "Epic: EPIC-1 - Test Epic" in result
def test_format_issue_content_with_comments(formatting_mixin):
"""Test format_issue_content method with comments."""
issue_key = "TEST-123"
issue = {
"fields": {
"summary": "Test issue",
"issuetype": {"name": "Bug"},
"status": {"name": "Open"},
}
}
description = "This is a test issue."
comments = [
{"created": "2023-01-02", "author": "User1", "body": "Comment 1"},
{"created": "2023-01-03", "author": "User2", "body": "Comment 2"},
]
created_date = "2023-01-01 12:00:00"
epic_info = {"epic_key": None, "epic_name": None}
result = formatting_mixin.format_issue_content(
issue_key, issue, description, comments, created_date, epic_info
)
# Check that the result contains the comments
assert "Comments:" in result
assert "2023-01-02 - User1: Comment 1" in result
assert "2023-01-03 - User2: Comment 2" in result
def test_create_issue_metadata_basic(formatting_mixin):
"""Test create_issue_metadata method with basic inputs."""
issue_key = "TEST-123"
issue = {
"fields": {
"summary": "Test issue",
"issuetype": {"name": "Bug"},
"status": {"name": "Open"},
"project": {"key": "TEST", "name": "Test Project"},
}
}
comments = []
created_date = "2023-01-01 12:00:00"
epic_info = {"epic_key": None, "epic_name": None}
result = formatting_mixin.create_issue_metadata(
issue_key, issue, comments, created_date, epic_info
)
# Check that the result contains all the basic metadata
assert result["key"] == "TEST-123"
assert result["summary"] == "Test issue"
assert result["type"] == "Bug"
assert result["status"] == "Open"
assert result["created"] == "2023-01-01 12:00:00"
assert result["source"] == "jira"
assert result["project"] == "TEST"
assert result["project_name"] == "Test Project"
assert result["comment_count"] == 0
assert "epic_key" not in result
assert "epic_name" not in result
def test_create_issue_metadata_with_assignee_and_reporter(formatting_mixin):
"""Test create_issue_metadata method with assignee and reporter."""
issue_key = "TEST-123"
issue = {
"fields": {
"summary": "Test issue",
"issuetype": {"name": "Bug"},
"status": {"name": "Open"},
"assignee": {"displayName": "John Doe", "name": "jdoe"},
"reporter": {"displayName": "Jane Smith", "name": "jsmith"},
"project": {"key": "TEST", "name": "Test Project"},
}
}
comments = []
created_date = "2023-01-01 12:00:00"
epic_info = {"epic_key": None, "epic_name": None}
result = formatting_mixin.create_issue_metadata(
issue_key, issue, comments, created_date, epic_info
)
# Check that the result contains assignee and reporter
assert result["assignee"] == "John Doe"
assert result["reporter"] == "Jane Smith"
def test_create_issue_metadata_with_priority(formatting_mixin):
"""Test create_issue_metadata method with priority."""
issue_key = "TEST-123"
issue = {
"fields": {
"summary": "Test issue",
"issuetype": {"name": "Bug"},
"status": {"name": "Open"},
"priority": {"name": "High"},
"project": {"key": "TEST", "name": "Test Project"},
}
}
comments = []
created_date = "2023-01-01 12:00:00"
epic_info = {"epic_key": None, "epic_name": None}
result = formatting_mixin.create_issue_metadata(
issue_key, issue, comments, created_date, epic_info
)
# Check that the result contains priority
assert result["priority"] == "High"
def test_create_issue_metadata_with_epic(formatting_mixin):
"""Test create_issue_metadata method with epic information."""
issue_key = "TEST-123"
issue = {
"fields": {
"summary": "Test issue",
"issuetype": {"name": "Bug"},
"status": {"name": "Open"},
"project": {"key": "TEST", "name": "Test Project"},
}
}
comments = []
created_date = "2023-01-01 12:00:00"
epic_info = {"epic_key": "EPIC-1", "epic_name": "Test Epic"}
result = formatting_mixin.create_issue_metadata(
issue_key, issue, comments, created_date, epic_info
)
# Check that the result contains epic information
assert result["epic_key"] == "EPIC-1"
assert result["epic_name"] == "Test Epic"
def test_extract_epic_information_no_fields(formatting_mixin):
"""Test extract_epic_information method with issue having no fields."""
issue = {}
result = formatting_mixin.extract_epic_information(issue)
assert result == {"epic_key": None, "epic_name": None}
def test_extract_epic_information_with_field_ids(formatting_mixin):
"""Test extract_epic_information method with field IDs available."""
issue = {"fields": {"customfield_10001": "EPIC-1"}}
# Mock get_field_ids_to_epic method
field_ids = {"Epic Link": "customfield_10001", "Epic Name": "customfield_10002"}
formatting_mixin.get_field_ids_to_epic = MagicMock(return_value=field_ids)
# Mock get_issue method
epic_issue = {"fields": {"customfield_10002": "Test Epic"}}
formatting_mixin.get_issue = MagicMock(return_value=epic_issue)
result = formatting_mixin.extract_epic_information(issue)
assert result == {"epic_key": "EPIC-1", "epic_name": "Test Epic"}
formatting_mixin.get_field_ids_to_epic.assert_called_once()
formatting_mixin.get_issue.assert_called_once_with("EPIC-1")
def test_extract_epic_information_get_issue_exception(formatting_mixin):
"""Test extract_epic_information method with get_issue exception."""
issue = {"fields": {"customfield_10001": "EPIC-1"}}
# Mock get_field_ids_to_epic method
field_ids = {"Epic Link": "customfield_10001", "Epic Name": "customfield_10002"}
formatting_mixin.get_field_ids_to_epic = MagicMock(return_value=field_ids)
# Mock get_issue method to raise exception
formatting_mixin.get_issue = MagicMock(side_effect=Exception("API error"))
result = formatting_mixin.extract_epic_information(issue)
assert result == {"epic_key": "EPIC-1", "epic_name": None}
formatting_mixin.get_field_ids_to_epic.assert_called_once()
formatting_mixin.get_issue.assert_called_once()
def test_sanitize_html_valid(formatting_mixin):
"""Test sanitize_html method with valid HTML."""
html_content = "<p>This is <b>bold</b> text.</p>"
result = formatting_mixin.sanitize_html(html_content)
assert result == "This is bold text."
def test_sanitize_html_with_entities(formatting_mixin):
"""Test sanitize_html method with HTML entities."""
html_content = "<p>This & that</p>"
result = formatting_mixin.sanitize_html(html_content)
assert result == "This & that"
def test_sanitize_html_empty(formatting_mixin):
"""Test sanitize_html method with empty input."""
result = formatting_mixin.sanitize_html("")
assert result == ""
def test_sanitize_html_exception(formatting_mixin):
"""Test sanitize_html method with exception."""
# Mock re.sub to raise exception
with patch("re.sub", side_effect=Exception("Regex error")):
result = formatting_mixin.sanitize_html("<p>Test</p>")
assert result == "<p>Test</p>" # Should return original on error
def test_sanitize_transition_fields_basic(formatting_mixin):
"""Test sanitize_transition_fields method with basic fields."""
fields = {"summary": "Test issue", "description": "This is a test issue."}
result = formatting_mixin.sanitize_transition_fields(fields)
assert result == fields
def test_sanitize_transition_fields_with_assignee(formatting_mixin):
"""Test sanitize_transition_fields method with assignee field."""
fields = {"summary": "Test issue", "assignee": "jdoe"}
# Mock _get_account_id method
formatting_mixin._get_account_id = MagicMock(return_value="account-123")
result = formatting_mixin.sanitize_transition_fields(fields)
assert result["summary"] == "Test issue"
assert result["assignee"] == {"accountId": "account-123"}
formatting_mixin._get_account_id.assert_called_once_with("jdoe")
def test_sanitize_transition_fields_with_assignee_dict(formatting_mixin):
"""Test sanitize_transition_fields method with assignee as dictionary."""
fields = {"summary": "Test issue", "assignee": {"accountId": "account-123"}}
# Mock _get_account_id method
formatting_mixin._get_account_id = MagicMock()
result = formatting_mixin.sanitize_transition_fields(fields)
assert result["summary"] == "Test issue"
assert result["assignee"] == {"accountId": "account-123"}
formatting_mixin._get_account_id.assert_not_called()
def test_sanitize_transition_fields_with_reporter(formatting_mixin):
"""Test sanitize_transition_fields method with reporter field."""
fields = {"summary": "Test issue", "reporter": "jsmith"}
# Mock _get_account_id method
formatting_mixin._get_account_id = MagicMock(return_value="account-456")
result = formatting_mixin.sanitize_transition_fields(fields)
assert result["summary"] == "Test issue"
assert result["reporter"] == {"accountId": "account-456"}
formatting_mixin._get_account_id.assert_called_once_with("jsmith")
def test_sanitize_transition_fields_with_none_value(formatting_mixin):
"""Test sanitize_transition_fields method with None value."""
fields = {"summary": "Test issue", "assignee": None}
result = formatting_mixin.sanitize_transition_fields(fields)
assert result == {"summary": "Test issue"}
def test_add_comment_to_transition_data_with_comment(formatting_mixin):
"""Test add_comment_to_transition_data method with comment."""
transition_data = {"transition": {"id": "10"}}
comment = "This is a comment"
# Mock markdown_to_jira method
formatting_mixin.markdown_to_jira = MagicMock(return_value="Converted comment")
result = formatting_mixin.add_comment_to_transition_data(transition_data, comment)
assert result["transition"] == {"id": "10"}
assert result["update"]["comment"][0]["add"]["body"] == "Converted comment"
formatting_mixin.markdown_to_jira.assert_called_once_with("This is a comment")
def test_add_comment_to_transition_data_without_comment(formatting_mixin):
"""Test add_comment_to_transition_data method without comment."""
transition_data = {"transition": {"id": "10"}}
result = formatting_mixin.add_comment_to_transition_data(transition_data, None)
assert result == transition_data # Should return unmodified data
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/servers/main.py:
--------------------------------------------------------------------------------
```python
"""Main FastMCP server setup for Atlassian integration."""
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any, Literal, Optional
from cachetools import TTLCache
from fastmcp import FastMCP
from fastmcp.tools import Tool as FastMCPTool
from mcp.types import Tool as MCPTool
from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import JSONResponse
from mcp_atlassian.confluence import ConfluenceFetcher
from mcp_atlassian.confluence.config import ConfluenceConfig
from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.config import JiraConfig
from mcp_atlassian.utils.environment import get_available_services
from mcp_atlassian.utils.io import is_read_only_mode
from mcp_atlassian.utils.logging import mask_sensitive
from mcp_atlassian.utils.tools import get_enabled_tools, should_include_tool
from .confluence import confluence_mcp
from .context import MainAppContext
from .jira import jira_mcp
logger = logging.getLogger("mcp-atlassian.server.main")
async def health_check(request: Request) -> JSONResponse:
return JSONResponse({"status": "ok"})
@asynccontextmanager
async def main_lifespan(app: FastMCP[MainAppContext]) -> AsyncIterator[dict]:
logger.info("Main Atlassian MCP server lifespan starting...")
services = get_available_services()
read_only = is_read_only_mode()
enabled_tools = get_enabled_tools()
loaded_jira_config: JiraConfig | None = None
loaded_confluence_config: ConfluenceConfig | None = None
if services.get("jira"):
try:
jira_config = JiraConfig.from_env()
if jira_config.is_auth_configured():
loaded_jira_config = jira_config
logger.info(
"Jira configuration loaded and authentication is configured."
)
else:
logger.warning(
"Jira URL found, but authentication is not fully configured. Jira tools will be unavailable."
)
except Exception as e:
logger.error(f"Failed to load Jira configuration: {e}", exc_info=True)
if services.get("confluence"):
try:
confluence_config = ConfluenceConfig.from_env()
if confluence_config.is_auth_configured():
loaded_confluence_config = confluence_config
logger.info(
"Confluence configuration loaded and authentication is configured."
)
else:
logger.warning(
"Confluence URL found, but authentication is not fully configured. Confluence tools will be unavailable."
)
except Exception as e:
logger.error(f"Failed to load Confluence configuration: {e}", exc_info=True)
app_context = MainAppContext(
full_jira_config=loaded_jira_config,
full_confluence_config=loaded_confluence_config,
read_only=read_only,
enabled_tools=enabled_tools,
)
logger.info(f"Read-only mode: {'ENABLED' if read_only else 'DISABLED'}")
logger.info(f"Enabled tools filter: {enabled_tools or 'All tools enabled'}")
try:
yield {"app_lifespan_context": app_context}
except Exception as e:
logger.error(f"Error during lifespan: {e}", exc_info=True)
raise
finally:
logger.info("Main Atlassian MCP server lifespan shutting down...")
# Perform any necessary cleanup here
try:
# Close any open connections if needed
if loaded_jira_config:
logger.debug("Cleaning up Jira resources...")
if loaded_confluence_config:
logger.debug("Cleaning up Confluence resources...")
except Exception as e:
logger.error(f"Error during cleanup: {e}", exc_info=True)
logger.info("Main Atlassian MCP server lifespan shutdown complete.")
class AtlassianMCP(FastMCP[MainAppContext]):
"""Custom FastMCP server class for Atlassian integration with tool filtering."""
async def _mcp_list_tools(self) -> list[MCPTool]:
# Filter tools based on enabled_tools, read_only mode, and service configuration from the lifespan context.
req_context = self._mcp_server.request_context
if req_context is None or req_context.lifespan_context is None:
logger.warning(
"Lifespan context not available during _main_mcp_list_tools call."
)
return []
lifespan_ctx_dict = req_context.lifespan_context
app_lifespan_state: MainAppContext | None = (
lifespan_ctx_dict.get("app_lifespan_context")
if isinstance(lifespan_ctx_dict, dict)
else None
)
read_only = (
getattr(app_lifespan_state, "read_only", False)
if app_lifespan_state
else False
)
enabled_tools_filter = (
getattr(app_lifespan_state, "enabled_tools", None)
if app_lifespan_state
else None
)
logger.debug(
f"_main_mcp_list_tools: read_only={read_only}, enabled_tools_filter={enabled_tools_filter}"
)
all_tools: dict[str, FastMCPTool] = await self.get_tools()
logger.debug(
f"Aggregated {len(all_tools)} tools before filtering: {list(all_tools.keys())}"
)
filtered_tools: list[MCPTool] = []
for registered_name, tool_obj in all_tools.items():
tool_tags = tool_obj.tags
if not should_include_tool(registered_name, enabled_tools_filter):
logger.debug(f"Excluding tool '{registered_name}' (not enabled)")
continue
if tool_obj and read_only and "write" in tool_tags:
logger.debug(
f"Excluding tool '{registered_name}' due to read-only mode and 'write' tag"
)
continue
# Exclude Jira/Confluence tools if config is not fully authenticated
is_jira_tool = "jira" in tool_tags
is_confluence_tool = "confluence" in tool_tags
service_configured_and_available = True
if app_lifespan_state:
if is_jira_tool and not app_lifespan_state.full_jira_config:
logger.debug(
f"Excluding Jira tool '{registered_name}' as Jira configuration/authentication is incomplete."
)
service_configured_and_available = False
if is_confluence_tool and not app_lifespan_state.full_confluence_config:
logger.debug(
f"Excluding Confluence tool '{registered_name}' as Confluence configuration/authentication is incomplete."
)
service_configured_and_available = False
elif is_jira_tool or is_confluence_tool:
logger.warning(
f"Excluding tool '{registered_name}' as application context is unavailable to verify service configuration."
)
service_configured_and_available = False
if not service_configured_and_available:
continue
filtered_tools.append(tool_obj.to_mcp_tool(name=registered_name))
logger.debug(
f"_main_mcp_list_tools: Total tools after filtering: {len(filtered_tools)}"
)
return filtered_tools
def http_app(
self,
path: str | None = None,
middleware: list[Middleware] | None = None,
transport: Literal["streamable-http", "sse"] = "streamable-http",
) -> "Starlette":
user_token_mw = Middleware(UserTokenMiddleware, mcp_server_ref=self)
final_middleware_list = [user_token_mw]
if middleware:
final_middleware_list.extend(middleware)
app = super().http_app(
path=path, middleware=final_middleware_list, transport=transport
)
return app
token_validation_cache: TTLCache[
int, tuple[bool, str | None, JiraFetcher | None, ConfluenceFetcher | None]
] = TTLCache(maxsize=100, ttl=300)
class UserTokenMiddleware(BaseHTTPMiddleware):
"""Middleware to extract Atlassian user tokens/credentials from Authorization headers."""
def __init__(
self, app: Any, mcp_server_ref: Optional["AtlassianMCP"] = None
) -> None:
super().__init__(app)
self.mcp_server_ref = mcp_server_ref
if not self.mcp_server_ref:
logger.warning(
"UserTokenMiddleware initialized without mcp_server_ref. Path matching for MCP endpoint might fail if settings are needed."
)
async def dispatch(
self, request: Request, call_next: RequestResponseEndpoint
) -> JSONResponse:
logger.debug(
f"UserTokenMiddleware.dispatch: ENTERED for request path='{request.url.path}', method='{request.method}'"
)
mcp_server_instance = self.mcp_server_ref
if mcp_server_instance is None:
logger.debug(
"UserTokenMiddleware.dispatch: self.mcp_server_ref is None. Skipping MCP auth logic."
)
return await call_next(request)
mcp_path = mcp_server_instance.settings.streamable_http_path.rstrip("/")
request_path = request.url.path.rstrip("/")
logger.debug(
f"UserTokenMiddleware.dispatch: Comparing request_path='{request_path}' with mcp_path='{mcp_path}'. Request method='{request.method}'"
)
if request_path == mcp_path and request.method == "POST":
auth_header = request.headers.get("Authorization")
cloud_id_header = request.headers.get("X-Atlassian-Cloud-Id")
token_for_log = mask_sensitive(
auth_header.split(" ", 1)[1].strip()
if auth_header and " " in auth_header
else auth_header
)
logger.debug(
f"UserTokenMiddleware: Path='{request.url.path}', AuthHeader='{mask_sensitive(auth_header)}', ParsedToken(masked)='{token_for_log}', CloudId='{cloud_id_header}'"
)
# Extract and save cloudId if provided
if cloud_id_header and cloud_id_header.strip():
request.state.user_atlassian_cloud_id = cloud_id_header.strip()
logger.debug(
f"UserTokenMiddleware: Extracted cloudId from header: {cloud_id_header.strip()}"
)
else:
request.state.user_atlassian_cloud_id = None
logger.debug(
"UserTokenMiddleware: No cloudId header provided, will use global config"
)
# Check for mcp-session-id header for debugging
mcp_session_id = request.headers.get("mcp-session-id")
if mcp_session_id:
logger.debug(
f"UserTokenMiddleware: MCP-Session-ID header found: {mcp_session_id}"
)
if auth_header and auth_header.startswith("Bearer "):
token = auth_header.split(" ", 1)[1].strip()
if not token:
return JSONResponse(
{"error": "Unauthorized: Empty Bearer token"},
status_code=401,
)
logger.debug(
f"UserTokenMiddleware.dispatch: Bearer token extracted (masked): ...{mask_sensitive(token, 8)}"
)
request.state.user_atlassian_token = token
request.state.user_atlassian_auth_type = "oauth"
request.state.user_atlassian_email = None
logger.debug(
f"UserTokenMiddleware.dispatch: Set request.state (pre-validation): "
f"auth_type='{getattr(request.state, 'user_atlassian_auth_type', 'N/A')}', "
f"token_present={bool(getattr(request.state, 'user_atlassian_token', None))}"
)
elif auth_header and auth_header.startswith("Token "):
token = auth_header.split(" ", 1)[1].strip()
if not token:
return JSONResponse(
{"error": "Unauthorized: Empty Token (PAT)"},
status_code=401,
)
logger.debug(
f"UserTokenMiddleware.dispatch: PAT (Token scheme) extracted (masked): ...{mask_sensitive(token, 8)}"
)
request.state.user_atlassian_token = token
request.state.user_atlassian_auth_type = "pat"
request.state.user_atlassian_email = (
None # PATs don't carry email in the token itself
)
logger.debug(
"UserTokenMiddleware.dispatch: Set request.state for PAT auth."
)
elif auth_header:
logger.warning(
f"Unsupported Authorization type for {request.url.path}: {auth_header.split(' ', 1)[0] if ' ' in auth_header else 'UnknownType'}"
)
return JSONResponse(
{
"error": "Unauthorized: Only 'Bearer <OAuthToken>' or 'Token <PAT>' types are supported."
},
status_code=401,
)
else:
logger.debug(
f"No Authorization header provided for {request.url.path}. Will proceed with global/fallback server configuration if applicable."
)
response = await call_next(request)
logger.debug(
f"UserTokenMiddleware.dispatch: EXITED for request path='{request.url.path}'"
)
return response
main_mcp = AtlassianMCP(name="Atlassian MCP", lifespan=main_lifespan)
main_mcp.mount("jira", jira_mcp)
main_mcp.mount("confluence", confluence_mcp)
@main_mcp.custom_route("/healthz", methods=["GET"], include_in_schema=False)
async def _health_check_route(request: Request) -> JSONResponse:
return await health_check(request)
logger.info("Added /healthz endpoint for Kubernetes probes")
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/users.py:
--------------------------------------------------------------------------------
```python
"""Module for Jira user operations."""
import logging
import re
from typing import TYPE_CHECKING, TypeVar
import requests
from requests.exceptions import HTTPError
from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
from mcp_atlassian.models.jira.common import JiraUser
from .client import JiraClient
if TYPE_CHECKING:
from mcp_atlassian.models.jira.common import JiraUser
JiraUserType = TypeVar("JiraUserType", bound="JiraUser")
logger = logging.getLogger("mcp-jira")
class UsersMixin(JiraClient):
"""Mixin for Jira user operations."""
def get_current_user_account_id(self) -> str:
"""
Get the account ID of the current user.
Returns:
str: Account ID of the current user.
Raises:
Exception: If unable to get the current user's account ID.
"""
if getattr(self, "_current_user_account_id", None) is not None:
return self._current_user_account_id
try:
logger.debug(
"Calling self.jira.myself() to get current user details for account ID."
)
myself_data = self.jira.myself()
if not isinstance(myself_data, dict):
error_msg = "Failed to get user data: response was not a dictionary."
logger.error(
f"{error_msg} Response type: {type(myself_data)}, Response: {str(myself_data)[:200]}"
)
raise Exception(error_msg)
logger.debug(f"Received myself_data: {str(myself_data)[:500]}")
account_id = None
if isinstance(myself_data.get("accountId"), str):
account_id = myself_data["accountId"]
elif isinstance(myself_data.get("key"), str):
logger.info(
"Using 'key' instead of 'accountId' for Jira Data Center/Server"
)
account_id = myself_data["key"]
elif isinstance(myself_data.get("name"), str):
logger.info(
"Using 'name' instead of 'accountId' for Jira Data Center/Server"
)
account_id = myself_data["name"]
if account_id is None:
error_msg = f"Could not find accountId, key, or name in user data: {str(myself_data)[:200]}"
raise ValueError(error_msg)
self._current_user_account_id = account_id
return account_id
except HTTPError as http_err:
response_content = ""
if http_err.response is not None:
try:
response_content = http_err.response.text
except Exception:
response_content = "(could not decode response content)"
logger.error(
f"HTTPError getting current user account ID: {http_err}. Response: {response_content[:500]}"
)
error_msg = f"Unable to get current user account ID: {http_err}"
raise Exception(error_msg) from http_err
except Exception as e:
logger.error(f"Error getting current user account ID: {e}", exc_info=True)
error_msg = f"Unable to get current user account ID: {e}"
raise Exception(error_msg) from e
def _get_account_id(self, assignee: str) -> str:
"""
Get the account ID for a username or account ID.
Args:
assignee (str): Username or account ID.
Returns:
str: Account ID.
Raises:
ValueError: If the account ID could not be found.
"""
# If it looks like an account ID already, return it
if assignee.startswith("5") and len(assignee) >= 10:
return assignee
account_id = self._lookup_user_directly(assignee)
if account_id:
return account_id
account_id = self._lookup_user_by_permissions(assignee)
if account_id:
return account_id
error_msg = f"Could not find account ID for user: {assignee}"
raise ValueError(error_msg)
def _lookup_user_directly(self, username: str) -> str | None:
"""
Look up a user account ID directly.
Args:
username (str): Username to look up.
Returns:
Optional[str]: Account ID if found, None otherwise.
"""
try:
params = {}
if self.config.is_cloud:
params["query"] = username
else:
params["username"] = username
response = self.jira.user_find_by_user_string(**params, start=0, limit=1)
if not isinstance(response, list):
msg = f"Unexpected return value type from `jira.user_find_by_user_string`: {type(response)}"
logger.error(msg)
return None
for user in response:
if (
user.get("displayName", "").lower() == username.lower()
or user.get("name", "").lower() == username.lower()
or user.get("emailAddress", "").lower() == username.lower()
):
if self.config.is_cloud:
if "accountId" in user:
return user["accountId"]
else:
if "name" in user:
logger.info(
"Using 'name' for assignee field in Jira Data Center/Server"
)
return user["name"]
elif "key" in user:
logger.info(
"Using 'key' as fallback for assignee name in Jira Data Center/Server"
)
return user["key"]
return None
except Exception as e:
logger.info(f"Error looking up user directly: {str(e)}")
return None
def _lookup_user_by_permissions(self, username: str) -> str | None:
"""
Look up a user account ID by permissions.
Args:
username (str): Username to look up.
Returns:
Optional[str]: Account ID if found, None otherwise.
"""
try:
url = f"{self.config.url}/rest/api/2/user/permission/search"
params = {"query": username, "permissions": "BROWSE"}
auth = None
headers = {}
if self.config.auth_type == "pat":
headers["Authorization"] = f"Bearer {self.config.personal_token}"
else:
auth = (self.config.username or "", self.config.api_token or "")
response = requests.get(
url,
params=params,
auth=auth,
headers=headers,
verify=self.config.ssl_verify,
)
if response.status_code == 200:
data = response.json()
for user in data.get("users", []):
if self.config.is_cloud:
if "accountId" in user:
return user["accountId"]
else:
if "name" in user:
logger.info(
"Using 'name' for assignee field in Jira Data Center/Server"
)
return user["name"]
elif "key" in user:
logger.info(
"Using 'key' as fallback for assignee name in Jira Data Center/Server"
)
return user["key"]
return None
except Exception as e:
logger.info(f"Error looking up user by permissions: {str(e)}")
return None
def _determine_user_api_params(self, identifier: str) -> dict[str, str]:
"""
Determines the correct API parameter and value for the jira.user() call based on the identifier and instance type.
Args:
identifier (str): User identifier (accountId, username, key, or email).
Returns:
Dict[str, str]: A dictionary containing the single keyword argument for self.jira.user().
Raises:
ValueError: If a usable parameter cannot be determined.
"""
api_kwargs: dict[str, str] = {}
# Cloud: identifier is accountId
if self.config.is_cloud and (
re.match(r"^[0-9a-f]{24}$", identifier) or re.match(r"^\d+:\w+", identifier)
):
api_kwargs["account_id"] = identifier
logger.debug(f"Determined param: account_id='{identifier}' (Cloud)")
# Server/DC: username, key, or email
elif not self.config.is_cloud:
if "@" in identifier:
api_kwargs["username"] = identifier
logger.debug(
f"Determined param: username='{identifier}' (Server/DC email - might not work)"
)
elif "-" in identifier and any(c.isdigit() for c in identifier):
api_kwargs["key"] = identifier
logger.debug(f"Determined param: key='{identifier}' (Server/DC)")
else:
api_kwargs["username"] = identifier
logger.debug(f"Determined param: username='{identifier}' (Server/DC)")
# Cloud: identifier is email
elif self.config.is_cloud and "@" in identifier:
try:
resolved_id = self._lookup_user_directly(identifier)
if resolved_id and (
re.match(r"^[0-9a-f]{24}$", resolved_id)
or re.match(r"^\d+:\w+", resolved_id)
):
api_kwargs["account_id"] = resolved_id
logger.debug(
f"Resolved email '{identifier}' to accountId '{resolved_id}'. Determined param: account_id (Cloud)"
)
else:
raise ValueError(
f"Could not resolve email '{identifier}' to a valid account ID for Jira Cloud."
)
except Exception as e:
logger.warning(f"Failed to resolve email '{identifier}': {e}")
raise ValueError(
f"Could not resolve email '{identifier}' to a valid account ID for Jira Cloud."
) from e
# Cloud: identifier is not accountId or email, try to resolve
else:
logger.debug(
f"Identifier '{identifier}' on Cloud is not an account ID or email. Attempting resolution."
)
try:
account_id_resolved = self._get_account_id(identifier)
api_kwargs["account_id"] = account_id_resolved
logger.debug(
f"Resolved identifier '{identifier}' to accountId '{account_id_resolved}'. Determined param: account_id (Cloud)"
)
except ValueError as e:
logger.error(
f"Could not resolve identifier '{identifier}' to a usable format (accountId/username/key)."
)
raise ValueError(
f"Could not determine how to look up user '{identifier}'."
) from e
if not api_kwargs:
logger.error(
f"Logic failed to determine API parameters for identifier '{identifier}'"
)
raise ValueError(
f"Could not determine the correct parameter to use for identifier '{identifier}'."
)
return api_kwargs
def get_user_profile_by_identifier(self, identifier: str) -> "JiraUser":
"""
Retrieve Jira user profile information by identifier.
Args:
identifier (str): User identifier (accountId, username, key, or email).
Returns:
JiraUser: JiraUser model with profile information.
Raises:
ValueError: If the user cannot be found or identifier cannot be resolved.
MCPAtlassianAuthenticationError: If authentication fails.
Exception: For other API errors.
"""
api_kwargs = self._determine_user_api_params(identifier)
try:
logger.debug(f"Calling self.jira.user() with parameters: {api_kwargs}")
user_data = self.jira.user(**api_kwargs)
if not isinstance(user_data, dict):
logger.error(
f"User lookup for '{identifier}' returned unexpected type: {type(user_data)}. Data: {user_data}"
)
raise ValueError(f"User '{identifier}' not found or lookup failed.")
return JiraUser.from_api_response(user_data)
except HTTPError as http_err:
if http_err.response is not None:
response_text = http_err.response.text[:200]
status_code = http_err.response.status_code
if status_code == 404:
raise ValueError(f"User '{identifier}' not found.") from http_err
elif status_code in [401, 403]:
logger.error(
f"Authentication/Permission error for '{identifier}': {status_code}"
)
raise MCPAtlassianAuthenticationError(
f"Permission denied accessing user '{identifier}'."
) from http_err
else:
logger.error(
f"HTTP error {status_code} for '{identifier}': {http_err}. Response: {response_text}"
)
raise Exception(
f"API error getting user profile for '{identifier}': {http_err}"
) from http_err
else:
logger.error(
f"Network or unknown HTTP error (no response object) for '{identifier}': {http_err}"
)
raise Exception(
f"Network error getting user profile for '{identifier}': {http_err}"
) from http_err
except Exception as e:
logger.exception(
f"Unexpected error getting/processing user profile for '{identifier}':"
)
raise Exception(
f"Error processing user profile for '{identifier}': {str(e)}"
) from e
```
--------------------------------------------------------------------------------
/tests/unit/utils/test_oauth_setup.py:
--------------------------------------------------------------------------------
```python
"""Tests for the OAuth setup utilities."""
import json
from unittest.mock import MagicMock, patch
from urllib.parse import parse_qs, urlparse
import pytest
from mcp_atlassian.utils.oauth_setup import (
OAuthSetupArgs,
parse_redirect_uri,
run_oauth_flow,
run_oauth_setup,
)
from tests.utils.assertions import assert_config_contains
from tests.utils.base import BaseAuthTest
from tests.utils.mocks import MockEnvironment, MockOAuthServer
class TestCallbackHandlerLogic:
"""Tests for URL parsing logic."""
@pytest.mark.parametrize(
"path,expected_params",
[
(
"/callback?code=test-auth-code&state=test-state",
{"code": ["test-auth-code"], "state": ["test-state"]},
),
(
"/callback?error=access_denied&error_description=User+denied+access",
{"error": ["access_denied"]},
),
("/callback?state=test-state", {"state": ["test-state"]}),
("/callback", {}),
],
)
def test_url_parsing(self, path, expected_params):
"""Test URL parsing for various callback scenarios."""
query = urlparse(path).query
params = parse_qs(query)
for key, expected_values in expected_params.items():
assert key in params
assert params[key] == expected_values
class TestRedirectUriParsing:
"""Tests for redirect URI parsing functionality."""
@pytest.mark.parametrize(
"redirect_uri,expected_hostname,expected_port",
[
("http://localhost:8080/callback", "localhost", 8080),
("https://example.com:9443/callback", "example.com", 9443),
("http://localhost/callback", "localhost", 80),
("https://example.com/callback", "example.com", 443),
("http://127.0.0.1:3000/callback", "127.0.0.1", 3000),
("https://secure.domain.com:8443/auth", "secure.domain.com", 8443),
],
)
def test_parse_redirect_uri(self, redirect_uri, expected_hostname, expected_port):
"""Test redirect URI parsing for various formats."""
hostname, port = parse_redirect_uri(redirect_uri)
assert hostname == expected_hostname
assert port == expected_port
class TestOAuthFlow:
"""Tests for OAuth flow orchestration."""
@pytest.fixture(autouse=True)
def reset_oauth_state(self):
"""Reset OAuth global state before each test."""
import mcp_atlassian.utils.oauth_setup as oauth_module
oauth_module.authorization_code = None
oauth_module.authorization_state = None
oauth_module.callback_received = False
oauth_module.callback_error = None
def test_run_oauth_flow_success_localhost(self):
"""Test successful OAuth flow with localhost redirect."""
with MockOAuthServer.mock_oauth_flow() as mocks:
with (
patch(
"mcp_atlassian.utils.oauth_setup.OAuthConfig"
) as mock_oauth_config,
patch("mcp_atlassian.utils.oauth_setup.wait_for_callback") as mock_wait,
patch(
"mcp_atlassian.utils.oauth_setup.start_callback_server"
) as mock_start_server,
):
# Setup global state after callback
def setup_callback_state():
import mcp_atlassian.utils.oauth_setup as oauth_module
oauth_module.authorization_code = "test-auth-code"
oauth_module.authorization_state = "test-state-token"
return True
mock_wait.side_effect = setup_callback_state
mock_httpd = MagicMock()
mock_start_server.return_value = mock_httpd
# Setup OAuth config mock
mock_config = MagicMock()
mock_config.exchange_code_for_tokens.return_value = True
mock_config.client_id = "test-client-id"
mock_config.client_secret = "test-client-secret"
mock_config.redirect_uri = "http://localhost:8080/callback"
mock_config.scope = "read:jira-work"
mock_config.cloud_id = "test-cloud-id"
mock_config.access_token = "test-access-token"
mock_config.refresh_token = "test-refresh-token"
mock_oauth_config.return_value = mock_config
args = OAuthSetupArgs(
client_id="test-client-id",
client_secret="test-client-secret",
redirect_uri="http://localhost:8080/callback",
scope="read:jira-work",
)
result = run_oauth_flow(args)
assert result is True
mock_start_server.assert_called_once_with(8080)
mocks["browser"].assert_called_once()
mock_config.exchange_code_for_tokens.assert_called_once_with(
"test-auth-code"
)
mock_httpd.shutdown.assert_called_once()
def test_run_oauth_flow_success_external_redirect(self):
"""Test successful OAuth flow with external redirect URI."""
with MockOAuthServer.mock_oauth_flow() as mocks:
with (
patch(
"mcp_atlassian.utils.oauth_setup.OAuthConfig"
) as mock_oauth_config,
patch("mcp_atlassian.utils.oauth_setup.wait_for_callback") as mock_wait,
patch(
"mcp_atlassian.utils.oauth_setup.start_callback_server"
) as mock_start_server,
):
# Setup callback state
def setup_callback_state():
import mcp_atlassian.utils.oauth_setup as oauth_module
oauth_module.authorization_code = "test-auth-code"
oauth_module.authorization_state = "test-state-token"
return True
mock_wait.side_effect = setup_callback_state
mock_config = MagicMock()
mock_config.exchange_code_for_tokens.return_value = True
mock_config.client_id = "test-client-id"
mock_config.client_secret = "test-client-secret"
mock_config.redirect_uri = "https://example.com/callback"
mock_config.scope = "read:jira-work"
mock_config.cloud_id = "test-cloud-id"
mock_config.access_token = "test-access-token"
mock_config.refresh_token = "test-refresh-token"
mock_oauth_config.return_value = mock_config
args = OAuthSetupArgs(
client_id="test-client-id",
client_secret="test-client-secret",
redirect_uri="https://example.com/callback",
scope="read:jira-work",
)
result = run_oauth_flow(args)
assert result is True
mock_start_server.assert_not_called() # No local server for external redirect
mocks["browser"].assert_called_once()
mock_config.exchange_code_for_tokens.assert_called_once_with(
"test-auth-code"
)
def test_run_oauth_flow_server_start_failure(self):
"""Test OAuth flow when server fails to start."""
with MockOAuthServer.mock_oauth_flow() as mocks:
with patch(
"mcp_atlassian.utils.oauth_setup.start_callback_server"
) as mock_start_server:
mock_start_server.side_effect = OSError("Port already in use")
args = OAuthSetupArgs(
client_id="test-client-id",
client_secret="test-client-secret",
redirect_uri="http://localhost:8080/callback",
scope="read:jira-work",
)
result = run_oauth_flow(args)
assert result is False
mocks["browser"].assert_not_called()
@pytest.mark.parametrize(
"failure_condition,expected_result",
[
("timeout", False),
("state_mismatch", False),
("token_exchange_failure", False),
],
)
def test_run_oauth_flow_failures(self, failure_condition, expected_result):
"""Test OAuth flow failure scenarios."""
with MockOAuthServer.mock_oauth_flow() as mocks:
with (
patch(
"mcp_atlassian.utils.oauth_setup.OAuthConfig"
) as mock_oauth_config,
patch("mcp_atlassian.utils.oauth_setup.wait_for_callback") as mock_wait,
patch(
"mcp_atlassian.utils.oauth_setup.start_callback_server"
) as mock_start_server,
):
mock_httpd = MagicMock()
mock_start_server.return_value = mock_httpd
mock_config = MagicMock()
mock_oauth_config.return_value = mock_config
if failure_condition == "timeout":
mock_wait.return_value = False
elif failure_condition == "state_mismatch":
def setup_mismatched_state():
import mcp_atlassian.utils.oauth_setup as oauth_module
oauth_module.authorization_code = "test-auth-code"
oauth_module.authorization_state = "wrong-state"
return True
mock_wait.side_effect = setup_mismatched_state
elif failure_condition == "token_exchange_failure":
def setup_callback_state():
import mcp_atlassian.utils.oauth_setup as oauth_module
oauth_module.authorization_code = "test-auth-code"
oauth_module.authorization_state = "test-state-token"
return True
mock_wait.side_effect = setup_callback_state
mock_config.exchange_code_for_tokens.return_value = False
args = OAuthSetupArgs(
client_id="test-client-id",
client_secret="test-client-secret",
redirect_uri="http://localhost:8080/callback",
scope="read:jira-work",
)
result = run_oauth_flow(args)
assert result == expected_result
mock_httpd.shutdown.assert_called_once()
class TestInteractiveSetup(BaseAuthTest):
"""Tests for the interactive OAuth setup wizard."""
def test_run_oauth_setup_with_env_vars(self):
"""Test interactive setup using environment variables."""
with MockEnvironment.oauth_env() as env_vars:
with (
patch("builtins.input", side_effect=["", "", "", ""]),
patch(
"mcp_atlassian.utils.oauth_setup.run_oauth_flow", return_value=True
) as mock_flow,
):
result = run_oauth_setup()
assert result == 0
mock_flow.assert_called_once()
args = mock_flow.call_args[0][0]
assert_config_contains(
vars(args),
client_id=env_vars["ATLASSIAN_OAUTH_CLIENT_ID"],
client_secret=env_vars["ATLASSIAN_OAUTH_CLIENT_SECRET"],
)
@pytest.mark.parametrize(
"input_values,expected_result",
[
(
[
"user-client-id",
"user-secret",
"http://localhost:9000/callback",
"read:jira-work",
],
0,
),
(["", "client-secret", "", ""], 1), # Missing client ID
(["client-id", "", "", ""], 1), # Missing client secret
],
)
def test_run_oauth_setup_user_input(self, input_values, expected_result):
"""Test interactive setup with various user inputs."""
with MockEnvironment.clean_env():
with (
patch("builtins.input", side_effect=input_values),
patch(
"mcp_atlassian.utils.oauth_setup.run_oauth_flow", return_value=True
) as mock_flow,
):
result = run_oauth_setup()
assert result == expected_result
if expected_result == 0:
mock_flow.assert_called_once()
else:
mock_flow.assert_not_called()
def test_run_oauth_setup_flow_failure(self):
"""Test interactive setup when OAuth flow fails."""
with MockEnvironment.clean_env():
with (
patch(
"builtins.input", side_effect=["client-id", "client-secret", "", ""]
),
patch(
"mcp_atlassian.utils.oauth_setup.run_oauth_flow", return_value=False
),
):
result = run_oauth_setup()
assert result == 1
class TestOAuthSetupArgs:
"""Tests for the OAuthSetupArgs dataclass."""
def test_oauth_setup_args_creation(self):
"""Test OAuthSetupArgs dataclass creation."""
args = OAuthSetupArgs(
client_id="test-id",
client_secret="test-secret",
redirect_uri="http://localhost:8080/callback",
scope="read:jira-work",
)
expected_config = {
"client_id": "test-id",
"client_secret": "test-secret",
"redirect_uri": "http://localhost:8080/callback",
"scope": "read:jira-work",
}
assert_config_contains(vars(args), **expected_config)
class TestConfigurationGeneration:
"""Tests for configuration output functionality."""
def test_configuration_serialization(self):
"""Test JSON configuration serialization."""
test_config = {
"client_id": "test-id",
"client_secret": "test-secret",
"redirect_uri": "http://localhost:8080/callback",
"scope": "read:jira-work",
"cloud_id": "test-cloud-id",
}
json_str = json.dumps(test_config, indent=4)
assert "test-id" in json_str
assert "test-cloud-id" in json_str
# Verify it can be parsed back
parsed = json.loads(json_str)
assert_config_contains(parsed, **test_config)
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/v2_adapter.py:
--------------------------------------------------------------------------------
```python
"""Confluence REST API v2 adapter for OAuth authentication.
This module provides direct v2 API calls to handle the deprecated v1 endpoints
when using OAuth authentication. The v1 endpoints have been removed for OAuth
but still work for API token authentication.
"""
import logging
from typing import Any
import requests
from requests.exceptions import HTTPError
logger = logging.getLogger("mcp-atlassian")
class ConfluenceV2Adapter:
"""Adapter for Confluence REST API v2 operations when using OAuth."""
def __init__(self, session: requests.Session, base_url: str) -> None:
"""Initialize the v2 adapter.
Args:
session: Authenticated requests session (OAuth configured)
base_url: Base URL for the Confluence instance
"""
self.session = session
self.base_url = base_url
def _get_space_id(self, space_key: str) -> str:
"""Get space ID from space key using v2 API.
Args:
space_key: The space key to look up
Returns:
The space ID
Raises:
ValueError: If space not found or API error
"""
try:
# Use v2 spaces endpoint to get space ID
url = f"{self.base_url}/api/v2/spaces"
params = {"keys": space_key}
response = self.session.get(url, params=params)
response.raise_for_status()
data = response.json()
results = data.get("results", [])
if not results:
raise ValueError(f"Space with key '{space_key}' not found")
space_id = results[0].get("id")
if not space_id:
raise ValueError(f"No ID found for space '{space_key}'")
return space_id
except HTTPError as e:
logger.error(f"HTTP error getting space ID for '{space_key}': {e}")
raise ValueError(f"Failed to get space ID for '{space_key}': {e}") from e
except Exception as e:
logger.error(f"Error getting space ID for '{space_key}': {e}")
raise ValueError(f"Failed to get space ID for '{space_key}': {e}") from e
def create_page(
self,
space_key: str,
title: str,
body: str,
parent_id: str | None = None,
representation: str = "storage",
status: str = "current",
) -> dict[str, Any]:
"""Create a page using the v2 API.
Args:
space_key: The key of the space to create the page in
title: The title of the page
body: The content body in the specified representation
parent_id: Optional parent page ID
representation: Content representation format (default: "storage")
status: Page status (default: "current")
Returns:
The created page data from the API response
Raises:
ValueError: If page creation fails
"""
try:
# Get space ID from space key
space_id = self._get_space_id(space_key)
# Prepare request data for v2 API
data = {
"spaceId": space_id,
"status": status,
"title": title,
"body": {
"representation": representation,
"value": body,
},
}
# Add parent if specified
if parent_id:
data["parentId"] = parent_id
# Make the v2 API call
url = f"{self.base_url}/api/v2/pages"
response = self.session.post(url, json=data)
response.raise_for_status()
result = response.json()
logger.debug(f"Successfully created page '{title}' with v2 API")
# Convert v2 response to v1-compatible format for consistency
return self._convert_v2_to_v1_format(result, space_key)
except HTTPError as e:
logger.error(f"HTTP error creating page '{title}': {e}")
if e.response is not None:
logger.error(f"Response content: {e.response.text}")
raise ValueError(f"Failed to create page '{title}': {e}") from e
except Exception as e:
logger.error(f"Error creating page '{title}': {e}")
raise ValueError(f"Failed to create page '{title}': {e}") from e
def _get_page_version(self, page_id: str) -> int:
"""Get the current version number of a page.
Args:
page_id: The ID of the page
Returns:
The current version number
Raises:
ValueError: If page not found or API error
"""
try:
url = f"{self.base_url}/api/v2/pages/{page_id}"
params = {"body-format": "storage"}
response = self.session.get(url, params=params)
response.raise_for_status()
data = response.json()
version_number = data.get("version", {}).get("number")
if version_number is None:
raise ValueError(f"No version number found for page '{page_id}'")
return version_number
except HTTPError as e:
logger.error(f"HTTP error getting page version for '{page_id}': {e}")
raise ValueError(f"Failed to get page version for '{page_id}': {e}") from e
except Exception as e:
logger.error(f"Error getting page version for '{page_id}': {e}")
raise ValueError(f"Failed to get page version for '{page_id}': {e}") from e
def update_page(
self,
page_id: str,
title: str,
body: str,
representation: str = "storage",
version_comment: str = "",
status: str = "current",
) -> dict[str, Any]:
"""Update a page using the v2 API.
Args:
page_id: The ID of the page to update
title: The new title of the page
body: The new content body in the specified representation
representation: Content representation format (default: "storage")
version_comment: Optional comment for this version
status: Page status (default: "current")
Returns:
The updated page data from the API response
Raises:
ValueError: If page update fails
"""
try:
# Get current version and increment it
current_version = self._get_page_version(page_id)
new_version = current_version + 1
# Prepare request data for v2 API
data = {
"id": page_id,
"status": status,
"title": title,
"body": {
"representation": representation,
"value": body,
},
"version": {
"number": new_version,
},
}
# Add version comment if provided
if version_comment:
data["version"]["message"] = version_comment
# Make the v2 API call
url = f"{self.base_url}/api/v2/pages/{page_id}"
response = self.session.put(url, json=data)
response.raise_for_status()
result = response.json()
logger.debug(f"Successfully updated page '{title}' with v2 API")
# Convert v2 response to v1-compatible format for consistency
# For update, we need to extract space key from the result
space_id = result.get("spaceId")
space_key = self._get_space_key_from_id(space_id) if space_id else "unknown"
return self._convert_v2_to_v1_format(result, space_key)
except HTTPError as e:
logger.error(f"HTTP error updating page '{page_id}': {e}")
if e.response is not None:
logger.error(f"Response content: {e.response.text}")
raise ValueError(f"Failed to update page '{page_id}': {e}") from e
except Exception as e:
logger.error(f"Error updating page '{page_id}': {e}")
raise ValueError(f"Failed to update page '{page_id}': {e}") from e
def _get_space_key_from_id(self, space_id: str) -> str:
"""Get space key from space ID using v2 API.
Args:
space_id: The space ID to look up
Returns:
The space key
Raises:
ValueError: If space not found or API error
"""
try:
# Use v2 spaces endpoint to get space key
url = f"{self.base_url}/api/v2/spaces/{space_id}"
response = self.session.get(url)
response.raise_for_status()
data = response.json()
space_key = data.get("key")
if not space_key:
raise ValueError(f"No key found for space ID '{space_id}'")
return space_key
except HTTPError as e:
logger.error(f"HTTP error getting space key for ID '{space_id}': {e}")
# Return the space_id as fallback
return space_id
except Exception as e:
logger.error(f"Error getting space key for ID '{space_id}': {e}")
# Return the space_id as fallback
return space_id
def get_page(
self,
page_id: str,
expand: str | None = None,
) -> dict[str, Any]:
"""Get a page using the v2 API.
Args:
page_id: The ID of the page to retrieve
expand: Fields to expand in the response (not used in v2 API, for compatibility only)
Returns:
The page data from the API response in v1-compatible format
Raises:
ValueError: If page retrieval fails
"""
try:
# Make the v2 API call to get the page
url = f"{self.base_url}/api/v2/pages/{page_id}"
# Convert v1 expand parameters to v2 format
params = {"body-format": "storage"}
response = self.session.get(url, params=params)
response.raise_for_status()
v2_response = response.json()
logger.debug(f"Successfully retrieved page '{page_id}' with v2 API")
# Get space key from space ID
space_id = v2_response.get("spaceId")
space_key = self._get_space_key_from_id(space_id) if space_id else "unknown"
# Convert v2 response to v1-compatible format
v1_compatible = self._convert_v2_to_v1_format(v2_response, space_key)
# Add body.storage structure if body content exists
if "body" in v2_response and v2_response["body"].get("storage"):
storage_value = v2_response["body"]["storage"].get("value", "")
v1_compatible["body"] = {
"storage": {"value": storage_value, "representation": "storage"}
}
# Add space information with more details
if space_id:
v1_compatible["space"] = {
"key": space_key,
"id": space_id,
}
# Add version information
if "version" in v2_response:
v1_compatible["version"] = {
"number": v2_response["version"].get("number", 1)
}
return v1_compatible
except HTTPError as e:
logger.error(f"HTTP error getting page '{page_id}': {e}")
if e.response is not None:
logger.error(f"Response content: {e.response.text}")
raise ValueError(f"Failed to get page '{page_id}': {e}") from e
except Exception as e:
logger.error(f"Error getting page '{page_id}': {e}")
raise ValueError(f"Failed to get page '{page_id}': {e}") from e
def delete_page(self, page_id: str) -> bool:
"""Delete a page using the v2 API.
Args:
page_id: The ID of the page to delete
Returns:
True if the page was successfully deleted, False otherwise
Raises:
ValueError: If page deletion fails
"""
try:
# Make the v2 API call to delete the page
url = f"{self.base_url}/api/v2/pages/{page_id}"
response = self.session.delete(url)
response.raise_for_status()
logger.debug(f"Successfully deleted page '{page_id}' with v2 API")
# Check if status code indicates success (204 No Content is typical for deletes)
if response.status_code in [200, 204]:
return True
# If we get here, it's an unexpected success status
logger.warning(
f"Delete page returned unexpected status {response.status_code}"
)
return True
except HTTPError as e:
logger.error(f"HTTP error deleting page '{page_id}': {e}")
if e.response is not None:
logger.error(f"Response content: {e.response.text}")
raise ValueError(f"Failed to delete page '{page_id}': {e}") from e
except Exception as e:
logger.error(f"Error deleting page '{page_id}': {e}")
raise ValueError(f"Failed to delete page '{page_id}': {e}") from e
def _convert_v2_to_v1_format(
self, v2_response: dict[str, Any], space_key: str
) -> dict[str, Any]:
"""Convert v2 API response to v1-compatible format.
This ensures compatibility with existing code that expects v1 response format.
Args:
v2_response: The response from v2 API
space_key: The space key (needed since v2 response uses space ID)
Returns:
Response formatted like v1 API for compatibility
"""
# Map v2 response fields to v1 format
v1_compatible = {
"id": v2_response.get("id"),
"type": "page",
"status": v2_response.get("status"),
"title": v2_response.get("title"),
"space": {
"key": space_key,
"id": v2_response.get("spaceId"),
},
"version": {
"number": v2_response.get("version", {}).get("number", 1),
},
"_links": v2_response.get("_links", {}),
}
# Add body if present in v2 response
if "body" in v2_response:
v1_compatible["body"] = {
"storage": {
"value": v2_response["body"].get("storage", {}).get("value", ""),
"representation": "storage",
}
}
return v1_compatible
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_transitions.py:
--------------------------------------------------------------------------------
```python
"""Tests for the Jira Transitions mixin."""
from unittest.mock import MagicMock
import pytest
from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.transitions import TransitionsMixin
from mcp_atlassian.models.jira import (
JiraIssue,
JiraStatus,
JiraStatusCategory,
JiraTransition,
)
class TestTransitionsMixin:
"""Tests for the TransitionsMixin class."""
@pytest.fixture
def transitions_mixin(self, jira_fetcher: JiraFetcher) -> TransitionsMixin:
"""Create a TransitionsMixin instance with mocked dependencies."""
mixin = jira_fetcher
# Create a get_issue method to allow returning JiraIssue
mixin.get_issue = MagicMock(
return_value=JiraIssue(
id="12345",
key="TEST-123",
summary="Test Issue",
description="Issue content",
status=JiraStatus(
id="1",
name="Open",
category=JiraStatusCategory(
id=1, key="open", name="To Do", color_name="blue-gray"
),
),
)
)
# Set up mock for get_transitions_models
mock_transitions = [
JiraTransition(
id="10",
name="Start Progress",
to_status=JiraStatus(id="2", name="In Progress"),
)
]
mixin.get_transitions_models = MagicMock(return_value=mock_transitions)
return mixin
def test_get_available_transitions_list_format(
self, transitions_mixin: TransitionsMixin
):
"""Test get_available_transitions with list format response."""
# Setup mock response - list format
mock_transitions = [
{"id": "10", "name": "In Progress", "to_status": "In Progress"},
{"id": "11", "name": "Done", "status": "Done"},
]
transitions_mixin.jira.get_issue_transitions.return_value = mock_transitions
# Call the method
result = transitions_mixin.get_available_transitions("TEST-123")
# Verify
assert len(result) == 2
assert result[0]["id"] == "10"
assert result[0]["name"] == "In Progress"
assert result[0]["to_status"] == "In Progress"
assert result[1]["id"] == "11"
assert result[1]["name"] == "Done"
assert result[1]["to_status"] == "Done"
def test_get_available_transitions_empty_response(
self, transitions_mixin: TransitionsMixin
):
"""Test get_available_transitions with empty response."""
# Setup mock response - empty
transitions_mixin.jira.get_issue_transitions.return_value = {}
# Call the method
result = transitions_mixin.get_available_transitions("TEST-123")
# Verify
assert isinstance(result, list)
assert len(result) == 0
def test_get_available_transitions_invalid_format(
self, transitions_mixin: TransitionsMixin
):
"""Test get_available_transitions with invalid format response."""
# Setup mock response - invalid format
transitions_mixin.jira.get_issue_transitions.return_value = "invalid"
# Call the method
result = transitions_mixin.get_available_transitions("TEST-123")
# Verify
assert isinstance(result, list)
assert len(result) == 0
def test_get_available_transitions_with_error(
self, transitions_mixin: TransitionsMixin
):
"""Test get_available_transitions error handling."""
# Setup mock to raise exception
transitions_mixin.jira.get_issue_transitions.side_effect = Exception(
"Transition fetch error"
)
# Call the method and verify exception
with pytest.raises(
Exception, match="Error getting transitions: Transition fetch error"
):
transitions_mixin.get_available_transitions("TEST-123")
def test_transition_issue_basic(self, transitions_mixin: TransitionsMixin):
"""Test transition_issue with basic parameters."""
# Call the method
result = transitions_mixin.transition_issue("TEST-123", "10")
# Verify
transitions_mixin.jira.set_issue_status.assert_called_once_with(
issue_key="TEST-123", status_name="In Progress", fields=None, update=None
)
transitions_mixin.get_issue.assert_called_once_with("TEST-123")
assert isinstance(result, JiraIssue)
assert result.key == "TEST-123"
assert result.summary == "Test Issue"
assert result.description == "Issue content"
def test_transition_issue_with_int_id(self, transitions_mixin: TransitionsMixin):
"""Test transition_issue with int transition ID."""
# Call the method with int ID
transitions_mixin.transition_issue("TEST-123", 10)
# Verify status name is used instead of ID
transitions_mixin.jira.set_issue_status.assert_called_once_with(
issue_key="TEST-123", status_name="In Progress", fields=None, update=None
)
def test_transition_issue_with_fields(self, transitions_mixin: TransitionsMixin):
"""Test transition_issue with fields."""
# Mock _sanitize_transition_fields to return the fields
transitions_mixin._sanitize_transition_fields = MagicMock(
return_value={"summary": "Updated"}
)
# Call the method with fields
fields = {"summary": "Updated"}
transitions_mixin.transition_issue("TEST-123", "10", fields=fields)
# Verify fields were passed correctly
transitions_mixin.jira.set_issue_status.assert_called_once_with(
issue_key="TEST-123",
status_name="In Progress",
fields={"summary": "Updated"},
update=None,
)
def test_transition_issue_with_empty_sanitized_fields(
self, transitions_mixin: TransitionsMixin
):
"""Test transition_issue with empty sanitized fields."""
# Mock _sanitize_transition_fields to return empty dict
transitions_mixin._sanitize_transition_fields = MagicMock(return_value={})
# Call the method with fields that will be sanitized to empty
fields = {"invalid": "field"}
transitions_mixin.transition_issue("TEST-123", "10", fields=fields)
# Verify fields were passed as None
transitions_mixin.jira.set_issue_status.assert_called_once_with(
issue_key="TEST-123", status_name="In Progress", fields=None, update=None
)
def test_transition_issue_with_comment(self, transitions_mixin: TransitionsMixin):
"""Test transition_issue with comment."""
# Setup
comment = "Test comment"
# Define a side effect to record what's passed to _add_comment_to_transition_data
def add_comment_side_effect(transition_data, comment_text):
transition_data["update"] = {"comment": [{"add": {"body": comment_text}}]}
# Mock _add_comment_to_transition_data
transitions_mixin._add_comment_to_transition_data = MagicMock(
side_effect=add_comment_side_effect
)
# Call the method with comment
transitions_mixin.transition_issue("TEST-123", "10", comment=comment)
# Verify _add_comment_to_transition_data was called
transitions_mixin._add_comment_to_transition_data.assert_called_once()
# Verify set_issue_status was called with the right parameters
transitions_mixin.jira.set_issue_status.assert_called_once_with(
issue_key="TEST-123",
status_name="In Progress",
fields=None,
update={"comment": [{"add": {"body": comment}}]},
)
def test_transition_issue_with_error(self, transitions_mixin: TransitionsMixin):
"""Test transition_issue error handling."""
# Setup mock to raise exception
transitions_mixin.jira.set_issue_status.side_effect = Exception(
"Transition error"
)
# Call the method and verify exception
with pytest.raises(
ValueError,
match="Error transitioning issue TEST-123 with transition ID 10: Transition error",
):
transitions_mixin.transition_issue("TEST-123", "10")
def test_transition_issue_without_status_name(
self, transitions_mixin: TransitionsMixin
):
"""Test transition_issue when status name is not available."""
# Setup - create a transition without to_status
mock_transitions = [
JiraTransition(
id="10",
name="Start Progress",
to_status=None,
)
]
transitions_mixin.get_transitions_models = MagicMock(
return_value=mock_transitions
)
# Add mock for set_issue_status_by_transition_id
transitions_mixin.jira.set_issue_status_by_transition_id = MagicMock()
# Call the method
result = transitions_mixin.transition_issue("TEST-123", "10")
# Verify direct transition ID was used
transitions_mixin.jira.set_issue_status_by_transition_id.assert_called_once_with(
issue_key="TEST-123", transition_id=10
)
# Verify standard status call was not made
transitions_mixin.jira.set_issue_status.assert_not_called()
# Verify result
transitions_mixin.get_issue.assert_called_once_with("TEST-123")
assert isinstance(result, JiraIssue)
def test_normalize_transition_id(self, transitions_mixin: TransitionsMixin):
"""Test _normalize_transition_id with various input types."""
# Test with string
assert transitions_mixin._normalize_transition_id("10") == 10
# Test with non-digit string
assert transitions_mixin._normalize_transition_id("workflow") == "workflow"
# Test with int
assert transitions_mixin._normalize_transition_id(10) == 10
# Test with dict containing id
assert transitions_mixin._normalize_transition_id({"id": "10"}) == 10
# Test with dict containing int id
assert transitions_mixin._normalize_transition_id({"id": 10}) == 10
# Test with None
assert transitions_mixin._normalize_transition_id(None) == 0
def test_sanitize_transition_fields_basic(
self, transitions_mixin: TransitionsMixin
):
"""Test _sanitize_transition_fields with basic fields."""
# Simple fields
fields = {"resolution": {"name": "Fixed"}, "priority": {"name": "High"}}
result = transitions_mixin._sanitize_transition_fields(fields)
# Fields should be passed through unchanged
assert result == fields
def test_sanitize_transition_fields_with_none_values(
self, transitions_mixin: TransitionsMixin
):
"""Test _sanitize_transition_fields with None values."""
# Fields with None values
fields = {"resolution": {"name": "Fixed"}, "priority": None}
result = transitions_mixin._sanitize_transition_fields(fields)
# None values should be skipped
assert "priority" not in result
assert result["resolution"] == {"name": "Fixed"}
def test_sanitize_transition_fields_with_assignee_and_get_account_id(
self, transitions_mixin
):
"""Test _sanitize_transition_fields with assignee when _get_account_id is available."""
# Setup mock for _get_account_id
transitions_mixin._get_account_id = MagicMock(return_value="account-123")
# Fields with assignee
fields = {"assignee": "user.name"}
result = transitions_mixin._sanitize_transition_fields(fields)
# Assignee should be converted to account ID format
transitions_mixin._get_account_id.assert_called_once_with("user.name")
assert result["assignee"] == {"accountId": "account-123"}
def test_sanitize_transition_fields_with_assignee_error(
self, transitions_mixin: TransitionsMixin
):
"""Test _sanitize_transition_fields with assignee that causes error."""
# Setup mock for _get_account_id to raise exception
transitions_mixin._get_account_id = MagicMock(
side_effect=Exception("User not found")
)
# Fields with assignee
fields = {"assignee": "invalid.user", "resolution": {"name": "Fixed"}}
result = transitions_mixin._sanitize_transition_fields(fields)
# Assignee should be skipped due to error, resolution preserved
assert "assignee" not in result
assert result["resolution"] == {"name": "Fixed"}
def test_add_comment_to_transition_data_with_string(
self, transitions_mixin: TransitionsMixin
):
"""Test _add_comment_to_transition_data with string comment."""
# Prepare transition data
transition_data = {"transition": {"id": "10"}}
# Call the method
transitions_mixin._add_comment_to_transition_data(
transition_data, "Test comment"
)
# Verify
assert "update" in transition_data
assert "comment" in transition_data["update"]
assert len(transition_data["update"]["comment"]) == 1
assert transition_data["update"]["comment"][0]["add"]["body"] == "Test comment"
def test_add_comment_to_transition_data_with_non_string(
self, transitions_mixin: TransitionsMixin
):
"""Test _add_comment_to_transition_data with non-string comment."""
# Prepare transition data
transition_data = {"transition": {"id": "10"}}
# Call the method with int
transitions_mixin._add_comment_to_transition_data(transition_data, 123)
# Verify comment was converted to string
assert transition_data["update"]["comment"][0]["add"]["body"] == "123"
def test_add_comment_to_transition_data_with_markdown_to_jira(
self, transitions_mixin
):
"""Test _add_comment_to_transition_data with _markdown_to_jira method."""
# Add _markdown_to_jira method
transitions_mixin._markdown_to_jira = MagicMock(
return_value="Converted comment"
)
# Prepare transition data
transition_data = {"transition": {"id": "10"}}
# Call the method
transitions_mixin._add_comment_to_transition_data(
transition_data, "**Markdown** comment"
)
# Verify
transitions_mixin._markdown_to_jira.assert_called_once_with(
"**Markdown** comment"
)
assert (
transition_data["update"]["comment"][0]["add"]["body"]
== "Converted comment"
)
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/projects.py:
--------------------------------------------------------------------------------
```python
"""Module for Jira project operations."""
import logging
from typing import Any
from ..models import JiraProject
from ..models.jira.search import JiraSearchResult
from ..models.jira.version import JiraVersion
from .client import JiraClient
from .protocols import SearchOperationsProto
logger = logging.getLogger("mcp-jira")
class ProjectsMixin(JiraClient, SearchOperationsProto):
"""Mixin for Jira project operations.
This mixin provides methods for retrieving and working with Jira projects,
including project details, components, versions, and other project-related operations.
"""
def get_all_projects(self, include_archived: bool = False) -> list[dict[str, Any]]:
"""
Get all projects visible to the current user.
Args:
include_archived: Whether to include archived projects
Returns:
List of project data dictionaries
"""
try:
params = {}
if include_archived:
params["includeArchived"] = "true"
projects = self.jira.projects(included_archived=include_archived)
return projects if isinstance(projects, list) else []
except Exception as e:
logger.error(f"Error getting all projects: {str(e)}")
return []
def get_project(self, project_key: str) -> dict[str, Any] | None:
"""
Get project information by key.
Args:
project_key: The project key (e.g. 'PROJ')
Returns:
Project data or None if not found
"""
try:
project_data = self.jira.project(project_key)
if not isinstance(project_data, dict):
msg = f"Unexpected return value type from `jira.project`: {type(project_data)}"
logger.error(msg)
raise TypeError(msg)
return project_data
except Exception as e:
logger.warning(f"Error getting project {project_key}: {e}")
return None
def get_project_model(self, project_key: str) -> JiraProject | None:
"""
Get project information as a JiraProject model.
Args:
project_key: The project key (e.g. 'PROJ')
Returns:
JiraProject model or None if not found
"""
project_data = self.get_project(project_key)
if not project_data:
return None
return JiraProject.from_api_response(project_data)
def project_exists(self, project_key: str) -> bool:
"""
Check if a project exists.
Args:
project_key: The project key to check
Returns:
True if the project exists, False otherwise
"""
try:
project = self.get_project(project_key)
return project is not None
except Exception:
return False
def get_project_components(self, project_key: str) -> list[dict[str, Any]]:
"""
Get all components for a project.
Args:
project_key: The project key
Returns:
List of component data dictionaries
"""
try:
components = self.jira.get_project_components(key=project_key)
return components if isinstance(components, list) else []
except Exception as e:
logger.error(
f"Error getting components for project {project_key}: {str(e)}"
)
return []
def get_project_versions(self, project_key: str) -> list[dict[str, Any]]:
"""
Get all versions for a project.
Args:
project_key: The project key.
Returns:
List of version data dictionaries
"""
try:
raw_versions = self.jira.get_project_versions(key=project_key)
if not isinstance(raw_versions, list):
return []
versions: list[dict[str, Any]] = []
for v in raw_versions:
ver = JiraVersion.from_api_response(v)
versions.append(ver.to_simplified_dict())
return versions
except Exception as e:
logger.error(f"Error getting versions for project {project_key}: {str(e)}")
return []
def get_project_roles(self, project_key: str) -> dict[str, Any]:
"""
Get all roles for a project.
Args:
project_key: The project key
Returns:
Dictionary of role names mapped to role details
"""
try:
roles = self.jira.get_project_roles(project_key=project_key)
return roles if isinstance(roles, dict) else {}
except Exception as e:
logger.error(f"Error getting roles for project {project_key}: {str(e)}")
return {}
def get_project_role_members(
self, project_key: str, role_id: str
) -> list[dict[str, Any]]:
"""
Get members assigned to a specific role in a project.
Args:
project_key: The project key
role_id: The role ID
Returns:
List of role members
"""
try:
members = self.jira.get_project_actors_for_role_project(
project_key=project_key, role_id=role_id
)
# Extract the actors from the response
actors = []
if isinstance(members, dict) and "actors" in members:
actors = members.get("actors", [])
return actors
except Exception as e:
logger.error(
f"Error getting role members for project {project_key}, role {role_id}: {str(e)}"
)
return []
def get_project_permission_scheme(self, project_key: str) -> dict[str, Any] | None:
"""
Get the permission scheme for a project.
Args:
project_key: The project key
Returns:
Permission scheme data if found, None otherwise
"""
try:
scheme = self.jira.get_project_permission_scheme(
project_id_or_key=project_key
)
if not isinstance(scheme, dict):
msg = f"Unexpected return value type from `jira.get_project_permission_scheme`: {type(scheme)}"
logger.error(msg)
raise TypeError(msg)
return scheme
except Exception as e:
logger.error(
f"Error getting permission scheme for project {project_key}: {str(e)}"
)
return None
def get_project_notification_scheme(
self, project_key: str
) -> dict[str, Any] | None:
"""
Get the notification scheme for a project.
Args:
project_key: The project key
Returns:
Notification scheme data if found, None otherwise
"""
try:
scheme = self.jira.get_project_notification_scheme(
project_id_or_key=project_key
)
if not isinstance(scheme, dict):
msg = f"Unexpected return value type from `jira.get_project_notification_scheme`: {type(scheme)}"
logger.error(msg)
raise TypeError(msg)
return scheme
except Exception as e:
logger.error(
f"Error getting notification scheme for project {project_key}: {str(e)}"
)
return None
def get_project_issue_types(self, project_key: str) -> list[dict[str, Any]]:
"""
Get all issue types available for a project.
Args:
project_key: The project key
Returns:
List of issue type data dictionaries
"""
try:
meta = self.jira.issue_createmeta(project=project_key)
if not isinstance(meta, dict):
msg = f"Unexpected return value type from `jira.issue_createmeta`: {type(meta)}"
logger.error(msg)
raise TypeError(msg)
issue_types = []
# Extract issue types from createmeta response
if "projects" in meta and len(meta["projects"]) > 0:
project_data = meta["projects"][0]
if "issuetypes" in project_data:
issue_types = project_data["issuetypes"]
return issue_types
except Exception as e:
logger.error(
f"Error getting issue types for project {project_key}: {str(e)}"
)
return []
def get_project_issues_count(self, project_key: str) -> int:
"""
Get the total number of issues in a project.
Args:
project_key: The project key
Returns:
Count of issues in the project
"""
try:
# Use JQL to count issues in the project
jql = f'project = "{project_key}"'
result = self.jira.jql(jql=jql, fields="key", limit=1)
if not isinstance(result, dict):
msg = f"Unexpected return value type from `jira.jql`: {type(result)}"
logger.error(msg)
raise TypeError(msg)
# Extract total from the response
total = 0
if isinstance(result, dict) and "total" in result:
total = result.get("total", 0)
return total
except Exception as e:
logger.error(
f"Error getting issue count for project {project_key}: {str(e)}"
)
return 0
def get_project_issues(
self, project_key: str, start: int = 0, limit: int = 50
) -> JiraSearchResult:
"""
Get issues for a specific project.
Args:
project_key: The project key
start: Index of the first issue to return
limit: Maximum number of issues to return
Returns:
List of JiraIssue models representing the issues
"""
try:
# Use JQL to get issues in the project
jql = f'project = "{project_key}"'
return self.search_issues(jql, start=start, limit=limit)
except Exception as e:
logger.error(f"Error getting issues for project {project_key}: {str(e)}")
return JiraSearchResult(issues=[], total=0)
def get_project_keys(self) -> list[str]:
"""
Get all project keys.
Returns:
List of project keys
"""
try:
projects = self.get_all_projects()
project_keys: list[str] = []
for project in projects:
key = project.get("key")
if not isinstance(key, str):
msg = f"Unexpected return value type from `get_all_projects`: {type(key)}"
logger.error(msg)
raise TypeError(msg)
project_keys.append(key)
return project_keys
except Exception as e:
logger.error(f"Error getting project keys: {str(e)}")
return []
def get_project_leads(self) -> dict[str, str]:
"""
Get all project leads mapped to their projects.
Returns:
Dictionary mapping project keys to lead usernames
"""
try:
projects = self.get_all_projects()
leads = {}
for project in projects:
if "key" in project and "lead" in project:
key = project.get("key")
lead = project.get("lead", {})
# Handle different formats of lead information
lead_name = None
if isinstance(lead, dict):
lead_name = lead.get("name") or lead.get("displayName")
elif isinstance(lead, str):
lead_name = lead
if key and lead_name:
leads[key] = lead_name
return leads
except Exception as e:
logger.error(f"Error getting project leads: {str(e)}")
return {}
def get_user_accessible_projects(self, username: str) -> list[dict[str, Any]]:
"""
Get projects that a specific user can access.
Args:
username: The username to check access for
Returns:
List of accessible project data dictionaries
"""
try:
# This requires admin permissions
# For non-admins, a different approach might be needed
all_projects = self.get_all_projects()
accessible_projects = []
for project in all_projects:
project_key = project.get("key")
if not project_key:
continue
try:
# Check if user has browse permission for this project
browse_users = (
self.jira.get_users_with_browse_permission_to_a_project(
username=username, project_key=project_key, limit=1
)
)
# If the user is in the list, they have access
user_has_access = False
if isinstance(browse_users, list):
for user in browse_users:
if isinstance(user, dict) and user.get("name") == username:
user_has_access = True
break
if user_has_access:
accessible_projects.append(project)
except Exception:
# Skip projects that cause errors
continue
return accessible_projects
except Exception as e:
logger.error(
f"Error getting accessible projects for user {username}: {str(e)}"
)
return []
def create_project_version(
self,
project_key: str,
name: str,
start_date: str = None,
release_date: str = None,
description: str = None,
) -> dict[str, Any]:
"""
Create a new version in the specified Jira project.
Args:
project_key: The project key (e.g., 'PROJ')
name: The name of the version
start_date: The start date (YYYY-MM-DD, optional)
release_date: The release date (YYYY-MM-DD, optional)
description: Description of the version (optional)
Returns:
The created version object as returned by Jira
"""
return self.create_version(
project=project_key,
name=name,
start_date=start_date,
release_date=release_date,
description=description,
)
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/utils/oauth_setup.py:
--------------------------------------------------------------------------------
```python
"""
OAuth 2.0 Authorization Flow Helper for MCP Atlassian
This module helps with the OAuth 2.0 (3LO) authorization flow for Atlassian Cloud:
1. Opens a browser to the authorization URL
2. Starts a local server to receive the callback with the authorization code
3. Exchanges the authorization code for access and refresh tokens
4. Saves the tokens securely for later use by MCP Atlassian
"""
import http.server
import logging
import os
import socketserver
import threading
import time
import urllib.parse
import webbrowser
from dataclasses import dataclass
from ..utils.oauth import OAuthConfig
# Configure logging
logger = logging.getLogger("mcp-atlassian.oauth-setup")
# Global variables for callback handling
authorization_code = None
authorization_state = None
callback_received = False
callback_error = None
class CallbackHandler(http.server.BaseHTTPRequestHandler):
"""HTTP request handler for OAuth callback."""
def do_GET(self) -> None: # noqa: N802
"""Handle GET requests (OAuth callback)."""
global \
authorization_code, \
callback_received, \
callback_error, \
authorization_state
# Parse the query parameters from the URL
query = urllib.parse.urlparse(self.path).query
params = urllib.parse.parse_qs(query)
if "error" in params:
callback_error = params["error"][0]
callback_received = True
self._send_response(f"Authorization failed: {callback_error}")
return
if "code" in params:
authorization_code = params["code"][0]
if "state" in params:
authorization_state = params["state"][0]
callback_received = True
self._send_response(
"Authorization successful! You can close this window now."
)
else:
self._send_response(
"Invalid callback: Authorization code missing", status=400
)
def _send_response(self, message: str, status: int = 200) -> None:
"""Send response to the browser."""
self.send_response(status)
self.send_header("Content-type", "text/html")
self.end_headers()
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>Atlassian OAuth Authorization</title>
<style>
body {{
font-family: Arial, sans-serif;
text-align: center;
padding: 40px;
max-width: 600px;
margin: 0 auto;
}}
.message {{
padding: 20px;
border-radius: 5px;
margin-bottom: 20px;
}}
.success {{
background-color: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}}
.error {{
background-color: #f8d7da;
color: #721c24;
border: 1px solid #f5c6cb;
}}
.countdown {{
font-weight: bold;
font-size: 1.2em;
}}
</style>
</head>
<body>
<h1>Atlassian OAuth Authorization</h1>
<div class="message {"success" if status == 200 else "error"}">
<p>{message}</p>
</div>
<p>This window will automatically close in <span class="countdown">5</span> seconds...</p>
<button onclick="window.close()">Close Window Now</button>
<script>
// Countdown timer
var seconds = 5;
var countdown = document.querySelector('.countdown');
var timer = setInterval(function() {{
seconds--;
countdown.textContent = seconds;
if (seconds <= 0) {{
clearInterval(timer);
// Try multiple methods to close the window
window.close();
// If the above doesn't work (which is often the case with modern browsers)
try {{ window.open('', '_self').close(); }} catch (e) {{}}
}}
}}, 1000);
// Force close on success after 5.5 seconds as a fallback
setTimeout(function() {{
// If status is 200 (success), really try hard to close
if ({status} === 200) {{
window.open('about:blank', '_self');
window.close();
}}
}}, 5500);
</script>
</body>
</html>
"""
self.wfile.write(html.encode())
# Make the server quiet
def log_message(self, format: str, *args: str) -> None:
return
def start_callback_server(port: int) -> socketserver.TCPServer:
"""Start a local server to receive the OAuth callback."""
handler = CallbackHandler
httpd = socketserver.TCPServer(("", port), handler)
server_thread = threading.Thread(target=httpd.serve_forever)
server_thread.daemon = True
server_thread.start()
return httpd
def wait_for_callback(timeout: int = 300) -> bool:
"""Wait for the callback to be received."""
start_time = time.time()
while not callback_received and (time.time() - start_time) < timeout:
time.sleep(1)
if not callback_received:
logger.error(
f"Timed out waiting for authorization callback after {timeout} seconds"
)
return False
if callback_error:
logger.error(f"Authorization error: {callback_error}")
return False
return True
def parse_redirect_uri(redirect_uri: str) -> tuple[str, int]:
"""Parse the redirect URI to extract host and port."""
parsed = urllib.parse.urlparse(redirect_uri)
port = parsed.port or (443 if parsed.scheme == "https" else 80)
return parsed.hostname, port
@dataclass
class OAuthSetupArgs:
"""Arguments for the OAuth setup flow."""
client_id: str
client_secret: str
redirect_uri: str
scope: str
def run_oauth_flow(args: OAuthSetupArgs) -> bool:
"""Run the OAuth 2.0 authorization flow."""
# Reset global state (important for multiple runs)
global authorization_code, authorization_state, callback_received, callback_error
authorization_code = None
authorization_state = None
callback_received = False
callback_error = None
# Create OAuth configuration
oauth_config = OAuthConfig(
client_id=args.client_id,
client_secret=args.client_secret,
redirect_uri=args.redirect_uri,
scope=args.scope,
)
# Generate a random state for CSRF protection
import secrets
state = secrets.token_urlsafe(16)
# Start local callback server if using localhost
hostname, port = parse_redirect_uri(args.redirect_uri)
httpd = None
if hostname in ["localhost", "127.0.0.1"]:
logger.info(f"Starting local callback server on port {port}")
try:
httpd = start_callback_server(port)
except OSError as e:
logger.error(f"Failed to start callback server: {e}")
logger.error(f"Make sure port {port} is available and not in use")
return False
# Get the authorization URL
auth_url = oauth_config.get_authorization_url(state=state)
# Open the browser for authorization
logger.info(f"Opening browser for authorization at {auth_url}")
webbrowser.open(auth_url)
logger.info(
"If the browser doesn't open automatically, please visit this URL manually."
)
# Wait for the callback
if not wait_for_callback():
if httpd:
httpd.shutdown()
return False
# Verify state to prevent CSRF attacks
if authorization_state != state:
logger.error("State mismatch! Possible CSRF attack.")
if httpd:
httpd.shutdown()
return False
# Exchange the code for tokens
logger.info("Exchanging authorization code for tokens...")
if oauth_config.exchange_code_for_tokens(authorization_code):
logger.info("✅ OAuth authorization successful!")
logger.info(
f"Access token: {oauth_config.access_token[:10]}...{oauth_config.access_token[-5:]}"
)
logger.info(
f"Refresh token saved: {oauth_config.refresh_token[:5]}...{oauth_config.refresh_token[-3:]}"
)
if oauth_config.cloud_id:
logger.info(f"Cloud ID: {oauth_config.cloud_id}")
# Print environment variable information more clearly
logger.info("\n=== IMPORTANT: ENVIRONMENT VARIABLES ===")
logger.info(
"Your tokens have been securely stored in your system keyring and backup file."
)
logger.info(
"However, to use them in your application, you need these environment variables:"
)
logger.info("")
logger.info(
"Add the following to your .env file or set as environment variables:"
)
logger.info("------------------------------------------------------------")
logger.info(f"ATLASSIAN_OAUTH_CLIENT_ID={oauth_config.client_id}")
logger.info(f"ATLASSIAN_OAUTH_CLIENT_SECRET={oauth_config.client_secret}")
logger.info(f"ATLASSIAN_OAUTH_REDIRECT_URI={oauth_config.redirect_uri}")
logger.info(f"ATLASSIAN_OAUTH_SCOPE={oauth_config.scope}")
logger.info(f"ATLASSIAN_OAUTH_CLOUD_ID={oauth_config.cloud_id}")
logger.info("------------------------------------------------------------")
logger.info("")
logger.info(
"Note: The tokens themselves are not set as environment variables for security reasons."
)
logger.info(
"They are stored securely in your system keyring and will be loaded automatically."
)
logger.info(
f"Token storage location (backup): ~/.mcp-atlassian/oauth-{oauth_config.client_id}.json"
)
# Generate VS Code configuration JSON snippet
import json
vscode_config = {
"mcpServers": {
"mcp-atlassian": {
"command": "docker",
"args": [
"run",
"--rm",
"-i",
"-p",
"8080:8080",
"-e",
"CONFLUENCE_URL",
"-e",
"JIRA_URL",
"-e",
"ATLASSIAN_OAUTH_CLIENT_ID",
"-e",
"ATLASSIAN_OAUTH_CLIENT_SECRET",
"-e",
"ATLASSIAN_OAUTH_REDIRECT_URI",
"-e",
"ATLASSIAN_OAUTH_SCOPE",
"-e",
"ATLASSIAN_OAUTH_CLOUD_ID",
"ghcr.io/sooperset/mcp-atlassian:latest",
],
"env": {
"CONFLUENCE_URL": "https://your-company.atlassian.net/wiki",
"JIRA_URL": "https://your-company.atlassian.net",
"ATLASSIAN_OAUTH_CLIENT_ID": oauth_config.client_id,
"ATLASSIAN_OAUTH_CLIENT_SECRET": oauth_config.client_secret,
"ATLASSIAN_OAUTH_REDIRECT_URI": oauth_config.redirect_uri,
"ATLASSIAN_OAUTH_SCOPE": oauth_config.scope,
"ATLASSIAN_OAUTH_CLOUD_ID": oauth_config.cloud_id,
},
}
}
}
# Pretty print the VS Code configuration JSON
vscode_json = json.dumps(vscode_config, indent=4)
logger.info("\n=== VS CODE CONFIGURATION ===")
logger.info("Add the following to your VS Code settings.json file:")
logger.info("------------------------------------------------------------")
logger.info(vscode_json)
logger.info("------------------------------------------------------------")
logger.info(
"\nNote: If you already have an 'mcp' configuration in settings.json, merge this with your existing configuration."
)
else:
logger.error("Failed to obtain cloud ID!")
if httpd:
httpd.shutdown()
return True
else:
logger.error("Failed to exchange authorization code for tokens")
if httpd:
httpd.shutdown()
return False
def _prompt_for_input(prompt: str, env_var: str = None, is_secret: bool = False) -> str:
"""Prompt the user for input."""
value = os.getenv(env_var, "") if env_var else ""
if value:
if is_secret:
masked = (
value[:3] + "*" * (len(value) - 6) + value[-3:]
if len(value) > 6
else "****"
)
print(f"{prompt} [{masked}]: ", end="")
else:
print(f"{prompt} [{value}]: ", end="")
user_input = input()
return user_input if user_input else value
else:
print(f"{prompt}: ", end="")
return input()
def run_oauth_setup() -> int:
"""Run the OAuth 2.0 setup wizard interactively."""
print("\n=== Atlassian OAuth 2.0 Setup Wizard ===")
print(
"This wizard will guide you through setting up OAuth 2.0 authentication for MCP Atlassian."
)
print("\nYou need to have created an OAuth 2.0 app in your Atlassian account.")
print("You can create one at: https://developer.atlassian.com/console/myapps/")
print("\nPlease provide the following information:\n")
# Check for environment variables first
client_id = _prompt_for_input("OAuth Client ID", "ATLASSIAN_OAUTH_CLIENT_ID")
client_secret = _prompt_for_input(
"OAuth Client Secret", "ATLASSIAN_OAUTH_CLIENT_SECRET", is_secret=True
)
default_redirect = os.getenv(
"ATLASSIAN_OAUTH_REDIRECT_URI", "http://localhost:8080/callback"
)
redirect_uri = (
_prompt_for_input("OAuth Redirect URI", "ATLASSIAN_OAUTH_REDIRECT_URI")
or default_redirect
)
default_scope = os.getenv(
"ATLASSIAN_OAUTH_SCOPE",
"read:jira-work write:jira-work read:confluence-space.summary offline_access",
)
scope = (
_prompt_for_input("OAuth Scopes (space-separated)", "ATLASSIAN_OAUTH_SCOPE")
or default_scope
)
# Validate required arguments
if not client_id:
logger.error("OAuth Client ID is required")
return 1
if not client_secret:
logger.error("OAuth Client Secret is required")
return 1
# Run the OAuth flow
args = OAuthSetupArgs(
client_id=client_id,
client_secret=client_secret,
redirect_uri=redirect_uri,
scope=scope,
)
success = run_oauth_flow(args)
return 0 if success else 1
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_client_oauth.py:
--------------------------------------------------------------------------------
```python
"""Tests for the JiraClient with OAuth authentication."""
import os
from unittest.mock import MagicMock, PropertyMock, patch
import pytest
from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
from mcp_atlassian.jira.client import JiraClient
from mcp_atlassian.jira.config import JiraConfig
from mcp_atlassian.utils.oauth import BYOAccessTokenOAuthConfig, OAuthConfig
class TestJiraClientOAuth:
"""Tests for JiraClient with OAuth authentication."""
def test_init_with_oauth_config(self):
"""Test initializing the client with OAuth configuration."""
# Create a mock OAuth config with both access and refresh tokens
oauth_config = OAuthConfig(
client_id="test-client-id",
client_secret="test-client-secret",
redirect_uri="https://example.com/callback",
scope="read:jira-work write:jira-work",
cloud_id="test-cloud-id",
access_token="test-access-token",
refresh_token="test-refresh-token",
expires_at=9999999999.0, # Set a future expiry time
)
# Create a Jira config with OAuth
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="oauth",
oauth_config=oauth_config,
)
# Mock dependencies
with (
patch("mcp_atlassian.jira.client.Jira") as mock_jira,
patch(
"mcp_atlassian.jira.client.configure_oauth_session"
) as mock_configure_oauth,
patch(
"mcp_atlassian.jira.client.configure_ssl_verification"
) as mock_configure_ssl,
patch.object(
OAuthConfig,
"is_token_expired",
new_callable=PropertyMock,
return_value=False,
) as mock_is_expired,
patch.object(
oauth_config, "ensure_valid_token", return_value=True
) as mock_ensure_valid,
):
# Configure the mock to return success for OAuth configuration
mock_configure_oauth.return_value = True
# Initialize client
client = JiraClient(config=config)
# Verify OAuth session configuration was called
mock_configure_oauth.assert_called_once()
# Verify Jira was initialized with the expected parameters
mock_jira.assert_called_once()
jira_kwargs = mock_jira.call_args[1]
assert (
jira_kwargs["url"]
== f"https://api.atlassian.com/ex/jira/{oauth_config.cloud_id}"
)
assert "session" in jira_kwargs
assert jira_kwargs["cloud"] is True
# Verify SSL verification was configured
mock_configure_ssl.assert_called_once()
def test_init_with_oauth_missing_cloud_id(self):
"""Test initializing the client with OAuth but missing cloud_id."""
# Create a mock OAuth config without cloud_id
oauth_config = OAuthConfig(
client_id="test-client-id",
client_secret="test-client-secret",
redirect_uri="https://example.com/callback",
scope="read:jira-work write:jira-work",
# No cloud_id
access_token="test-access-token",
)
# Create a Jira config with OAuth
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="oauth",
oauth_config=oauth_config,
)
# Verify error is raised
with pytest.raises(
ValueError, match="OAuth authentication requires a valid cloud_id"
):
JiraClient(config=config)
def test_init_with_oauth_failed_session_config(self):
"""Test initializing the client with OAuth but failed session configuration."""
# Create a mock OAuth config
oauth_config = OAuthConfig(
client_id="test-client-id",
client_secret="test-client-secret",
redirect_uri="https://example.com/callback",
scope="read:jira-work write:jira-work",
cloud_id="test-cloud-id",
access_token="test-access-token",
refresh_token="test-refresh-token",
)
# Create a Jira config with OAuth
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="oauth",
oauth_config=oauth_config,
)
# Mock dependencies with OAuth configuration failure
with (
patch("mcp_atlassian.jira.client.Jira") as mock_jira,
# Patch where the function is imported, not where it's defined
patch(
"mcp_atlassian.jira.client.configure_oauth_session"
) as mock_configure_oauth,
patch(
"mcp_atlassian.jira.client.configure_ssl_verification"
) as mock_configure_ssl,
patch(
"mcp_atlassian.preprocessing.jira.JiraPreprocessor"
) as mock_preprocessor,
patch.object(
OAuthConfig,
"is_token_expired",
new_callable=PropertyMock,
return_value=False,
) as mock_is_expired,
patch.object(
oauth_config, "ensure_valid_token", return_value=True
) as mock_ensure_valid,
):
# Configure the mock to return failure for OAuth configuration
mock_configure_oauth.return_value = False
# Verify error is raised
with pytest.raises(
MCPAtlassianAuthenticationError,
match="Failed to configure OAuth session",
):
JiraClient(config=config)
def test_init_with_byo_access_token_oauth_config(self):
"""Test initializing the client with BYO Access Token OAuth configuration."""
# Create a mock BYO OAuth config
byo_oauth_config = BYOAccessTokenOAuthConfig(
cloud_id="test-cloud-id", access_token="my-byo-token"
)
# Create a Jira config with OAuth
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="oauth",
oauth_config=byo_oauth_config,
)
# Mock dependencies
with (
patch("mcp_atlassian.jira.client.Jira") as mock_jira,
patch(
"mcp_atlassian.jira.client.configure_oauth_session"
) as mock_configure_oauth,
patch(
"mcp_atlassian.jira.client.configure_ssl_verification"
) as mock_configure_ssl,
):
# Configure the mock to return success for OAuth configuration
mock_configure_oauth.return_value = True
# Initialize client
client = JiraClient(config=config)
# Verify OAuth session configuration was called
mock_configure_oauth.assert_called_once()
# Verify Jira was initialized with the expected parameters
mock_jira.assert_called_once()
jira_kwargs = mock_jira.call_args[1]
assert (
jira_kwargs["url"]
== f"https://api.atlassian.com/ex/jira/{byo_oauth_config.cloud_id}"
)
assert "session" in jira_kwargs
assert jira_kwargs["cloud"] is True
# Verify SSL verification was configured
mock_configure_ssl.assert_called_once()
def test_init_with_byo_oauth_missing_cloud_id(self):
"""Test initializing with BYO OAuth but missing cloud_id."""
# Create a mock BYO OAuth config with an empty cloud_id
byo_oauth_config = BYOAccessTokenOAuthConfig(
cloud_id="", access_token="my-byo-token"
)
# Create a Jira config with OAuth
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="oauth",
oauth_config=byo_oauth_config,
)
# Verify error is raised
with pytest.raises(
ValueError, match="OAuth authentication requires a valid cloud_id"
):
JiraClient(config=config)
def test_init_with_byo_oauth_failed_session_config(self):
"""Test init with BYO OAuth but failed session configuration."""
# Create a mock BYO OAuth config
byo_oauth_config = BYOAccessTokenOAuthConfig(
cloud_id="test-cloud-id", access_token="my_byo_token"
)
# Create a Jira config with OAuth
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="oauth",
oauth_config=byo_oauth_config,
)
# Mock dependencies with OAuth configuration failure
with (
patch("mcp_atlassian.jira.client.Jira"), # No need to assert mock_jira
patch(
"mcp_atlassian.jira.client.configure_oauth_session"
) as mock_configure_oauth,
patch("mcp_atlassian.jira.client.configure_ssl_verification"),
):
# Configure the mock to return failure for OAuth configuration
mock_configure_oauth.return_value = False
# Verify error is raised
with pytest.raises(
MCPAtlassianAuthenticationError,
match="Failed to configure OAuth session",
):
JiraClient(config=config)
def test_init_with_byo_oauth_empty_token_failed_session_config(self):
"""Test init with BYO OAuth, empty token, so session config fails."""
# Create a mock BYO OAuth config with an empty token
byo_oauth_config_empty_token = BYOAccessTokenOAuthConfig(
cloud_id="test-cloud-id",
access_token="", # Empty token
)
# Create a Jira config with OAuth
config = JiraConfig(
url="https://test.atlassian.net",
auth_type="oauth",
oauth_config=byo_oauth_config_empty_token,
)
# Mock dependencies - configure_oauth_session will be called with real logic
with (
patch("mcp_atlassian.jira.client.Jira"),
patch("mcp_atlassian.jira.client.configure_ssl_verification"),
# We want to test the actual behavior of configure_oauth_session here for empty token
):
# Verify error is raised
with pytest.raises(
MCPAtlassianAuthenticationError,
match="Failed to configure OAuth session",
):
JiraClient(config=config)
def test_from_env_with_oauth(self):
# Mock environment variables
env_vars = {
"JIRA_URL": "https://test.atlassian.net",
"JIRA_AUTH_TYPE": "oauth", # Add auth_type to env vars
"ATLASSIAN_OAUTH_CLIENT_ID": "env-client-id",
"ATLASSIAN_OAUTH_CLIENT_SECRET": "env-client-secret",
"ATLASSIAN_OAUTH_REDIRECT_URI": "https://example.com/callback",
"ATLASSIAN_OAUTH_SCOPE": "read:jira-work",
"ATLASSIAN_OAUTH_CLOUD_ID": "env-cloud-id",
}
# Mock OAuth config and token loading
mock_oauth_config = MagicMock()
mock_oauth_config.cloud_id = "env-cloud-id"
mock_oauth_config.access_token = "env-access-token"
mock_oauth_config.refresh_token = "env-refresh-token"
mock_oauth_config.expires_at = 9999999999.0
with (
patch.dict(os.environ, env_vars),
patch(
"mcp_atlassian.jira.config.get_oauth_config_from_env",
return_value=mock_oauth_config,
),
patch.object(
OAuthConfig,
"is_token_expired",
new_callable=PropertyMock,
return_value=False,
) as mock_is_expired_env,
patch.object(
mock_oauth_config, "ensure_valid_token", return_value=True
) as mock_ensure_valid_env,
patch("mcp_atlassian.jira.client.Jira") as mock_jira,
patch(
"mcp_atlassian.jira.client.configure_oauth_session", return_value=True
) as mock_configure_oauth,
patch(
"mcp_atlassian.jira.client.configure_ssl_verification"
) as mock_configure_ssl,
):
# Initialize client from environment
client = JiraClient()
# Verify client was initialized with OAuth
assert client.config.auth_type == "oauth"
assert client.config.oauth_config is mock_oauth_config
# Verify Jira was initialized correctly
mock_jira.assert_called_once()
jira_kwargs = mock_jira.call_args[1]
assert (
jira_kwargs["url"]
== f"https://api.atlassian.com/ex/jira/{mock_oauth_config.cloud_id}"
)
assert "session" in jira_kwargs
assert jira_kwargs["cloud"] is True
# Verify OAuth session was configured
mock_configure_oauth.assert_called_once()
def test_from_env_with_byo_token_oauth(self):
"""Test JiraClient.from_env() when BYO token OAuth config is found."""
env_vars = {
"JIRA_URL": "https://test.atlassian.net",
"JIRA_AUTH_TYPE": "oauth",
"ATLASSIAN_OAUTH_ACCESS_TOKEN": "env-byo-access-token",
"ATLASSIAN_OAUTH_CLOUD_ID": "env-byo-cloud-id",
# Ensure other standard OAuth env vars are not set or are ignored
"ATLASSIAN_OAUTH_CLIENT_ID": "",
"ATLASSIAN_OAUTH_CLIENT_SECRET": "",
}
# Mock BYO OAuth config
mock_byo_oauth_config = MagicMock(spec=BYOAccessTokenOAuthConfig)
mock_byo_oauth_config.cloud_id = "env-byo-cloud-id"
mock_byo_oauth_config.access_token = "env-byo-access-token"
# BYO config does not have refresh_token or expires_at in the same way
# and does not have is_token_expired or ensure_valid_token methods
with (
patch.dict(os.environ, env_vars),
patch(
"mcp_atlassian.jira.config.get_oauth_config_from_env",
return_value=mock_byo_oauth_config,
),
patch("mcp_atlassian.jira.client.Jira") as mock_jira,
patch(
"mcp_atlassian.jira.client.configure_oauth_session", return_value=True
) as mock_configure_oauth,
patch(
"mcp_atlassian.jira.client.configure_ssl_verification"
) as mock_configure_ssl,
):
client = JiraClient() # Initializes from env via JiraConfig.from_env()
assert client.config.auth_type == "oauth"
assert client.config.oauth_config is mock_byo_oauth_config
# Verify OAuth session configuration was called
mock_configure_oauth.assert_called_once()
mock_jira.assert_called_once()
jira_kwargs = mock_jira.call_args[1]
assert (
jira_kwargs["url"]
== f"https://api.atlassian.com/ex/jira/{mock_byo_oauth_config.cloud_id}"
)
mock_configure_ssl.assert_called_once()
def test_from_env_with_no_oauth_config_found(self):
"""Test JiraClient.from_env() when no OAuth config is found."""
env_vars = {
"JIRA_URL": "https://test.atlassian.net",
"JIRA_AUTH_TYPE": "oauth",
# No OAuth specific variables set
"ATLASSIAN_OAUTH_CLIENT_ID": "",
"ATLASSIAN_OAUTH_ACCESS_TOKEN": "",
# Explicitly clear basic auth credentials
"JIRA_USERNAME": "",
"JIRA_API_TOKEN": "",
}
with (
patch.dict(os.environ, env_vars, clear=True),
patch(
"mcp_atlassian.jira.config.get_oauth_config_from_env",
return_value=None, # Simulate no config found
),
):
with pytest.raises(
ValueError, # Adjusted to actual error raised by JiraConfig.from_env
match=r"Cloud authentication requires JIRA_USERNAME and JIRA_API_TOKEN, or OAuth configuration.*",
):
JiraClient()
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/transitions.py:
--------------------------------------------------------------------------------
```python
"""Module for Jira transition operations."""
import logging
from typing import Any
from requests.exceptions import HTTPError
from ..exceptions import MCPAtlassianAuthenticationError
from ..models import JiraIssue, JiraTransition
from .client import JiraClient
from .protocols import IssueOperationsProto, UsersOperationsProto
logger = logging.getLogger("mcp-jira")
class TransitionsMixin(JiraClient, IssueOperationsProto, UsersOperationsProto):
"""Mixin for Jira transition operations."""
def get_available_transitions(self, issue_key: str) -> list[dict[str, Any]]:
"""
Get the available status transitions for an issue.
Args:
issue_key: The issue key (e.g. 'PROJ-123')
Returns:
List of available transitions with id, name, and to status details
Raises:
MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
Exception: If there is an error getting transitions
"""
try:
transitions_data = self.jira.get_issue_transitions(issue_key)
result: list[dict[str, Any]] = []
for transition in transitions_data:
# Skip non-dict transitions
if not isinstance(transition, dict):
continue
# Extract the essential information
transition_info = {
"id": transition.get("id", ""),
"name": transition.get("name", ""),
}
# Handle "to" field in different formats
to_status = None
# Option 1: 'to' field with sub-fields
if "to" in transition and isinstance(transition["to"], dict):
to_status = transition["to"].get("name")
# Option 2: 'to_status' field directly
elif "to_status" in transition:
to_status = transition.get("to_status")
# Option 3: 'status' field directly (sometimes used in tests)
elif "status" in transition:
to_status = transition.get("status")
# Add to_status if found in any format
if to_status:
transition_info["to_status"] = to_status
result.append(transition_info)
return result
except HTTPError as http_err:
if http_err.response is not None and http_err.response.status_code in [
401,
403,
]:
error_msg = (
f"Authentication failed for Jira API ({http_err.response.status_code}). "
"Token may be expired or invalid. Please verify credentials."
)
logger.error(error_msg)
raise MCPAtlassianAuthenticationError(error_msg) from http_err
else:
logger.error(f"HTTP error during API call: {http_err}", exc_info=False)
raise http_err
except Exception as e:
error_msg = f"Error getting transitions for {issue_key}: {str(e)}"
logger.error(error_msg)
raise Exception(f"Error getting transitions: {str(e)}") from e
def get_transitions(self, issue_key: str) -> list[dict[str, Any]]:
"""
Get the raw transitions data for an issue.
Args:
issue_key: The issue key (e.g. 'PROJ-123')
Returns:
Raw transitions data from the API
"""
return self.jira.get_issue_transitions(issue_key)
def get_transitions_models(self, issue_key: str) -> list[JiraTransition]:
"""
Get the available status transitions for an issue as JiraTransition models.
Args:
issue_key: The issue key (e.g. 'PROJ-123')
Returns:
List of JiraTransition models
"""
transitions_data = self.get_transitions(issue_key)
result: list[JiraTransition] = []
for transition_data in transitions_data:
transition = JiraTransition.from_api_response(transition_data)
result.append(transition)
return result
def transition_issue(
self,
issue_key: str,
transition_id: str | int,
fields: dict[str, Any] | None = None,
comment: str | None = None,
) -> JiraIssue:
"""
Transition a Jira issue to a new status.
Args:
issue_key: The key of the issue to transition
transition_id: The ID of the transition to perform (integer preferred, string accepted)
fields: Optional fields to set during the transition
comment: Optional comment to add during the transition
Returns:
JiraIssue model representing the transitioned issue
Raises:
MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
ValueError: If there is an error transitioning the issue
"""
try:
# Normalize transition_id to an integer when possible, or string otherwise
normalized_transition_id = self._normalize_transition_id(transition_id)
# Validate that this is a valid transition ID
valid_transitions = self.get_transitions_models(issue_key)
valid_ids = [t.id for t in valid_transitions]
# Convert string IDs to integers for proper comparison if normalized_transition_id is an integer
if isinstance(normalized_transition_id, int):
valid_ids = [
int(id_val)
if isinstance(id_val, str) and id_val.isdigit()
else id_val
for id_val in valid_ids
]
# Check if the normalized_transition_id is in the list of valid IDs
id_to_check = normalized_transition_id
if id_to_check not in valid_ids:
available_transitions = ", ".join(
f"{t.id} ({t.name})" for t in valid_transitions
)
logger.warning(
f"Transition ID {id_to_check} not in available transitions: {available_transitions}"
)
# Continue anyway as Jira will validate
# Find the target status name corresponding to the transition ID
target_status_name = None
for transition in valid_transitions:
if str(transition.id) == str(normalized_transition_id):
if transition.to_status and transition.to_status.name:
target_status_name = transition.to_status.name
break
# Sanitize fields if provided
fields_for_api = None
if fields:
sanitized_fields = self._sanitize_transition_fields(fields)
if sanitized_fields:
fields_for_api = sanitized_fields
# Prepare update data for comments if provided
update_for_api = None
if comment:
# Create a temporary dict to hold the transition data
temp_transition_data = {}
self._add_comment_to_transition_data(temp_transition_data, comment)
update_for_api = temp_transition_data.get("update")
# Log the transition request for debugging
logger.info(
f"Transitioning issue {issue_key} with transition ID {normalized_transition_id}"
)
logger.debug(f"Fields: {fields_for_api}, Update: {update_for_api}")
# Attempt to transition the issue using the appropriate method
if target_status_name:
# If we have a status name, use set_issue_status
logger.info(f"Using status name '{target_status_name}' for transition")
self.jira.set_issue_status(
issue_key=issue_key,
status_name=target_status_name,
fields=fields_for_api,
update=update_for_api,
)
else:
# If no status name is found, try direct transition ID method
logger.info(f"Using direct transition ID {normalized_transition_id}")
# Convert to integer if it's a string that looks like an integer
if (
isinstance(normalized_transition_id, str)
and normalized_transition_id.isdigit()
):
normalized_transition_id = int(normalized_transition_id)
# Use set_issue_status_by_transition_id for direct ID transition
self.jira.set_issue_status_by_transition_id(
issue_key=issue_key, transition_id=normalized_transition_id
)
# Apply fields and comments separately if needed
if fields_for_api or update_for_api:
payload = {}
if fields_for_api:
payload["fields"] = fields_for_api
if update_for_api:
payload["update"] = update_for_api
if payload:
base_url = self.jira.resource_url("issue")
url = f"{base_url}/{issue_key}"
self.jira.put(url, data=payload)
# Return the updated issue
return self.get_issue(issue_key)
except HTTPError as http_err:
if http_err.response is not None and http_err.response.status_code in [
401,
403,
]:
error_msg = (
f"Authentication failed for Jira API ({http_err.response.status_code}). "
"Token may be expired or invalid. Please verify credentials."
)
logger.error(error_msg)
raise MCPAtlassianAuthenticationError(error_msg) from http_err
else:
logger.error(f"HTTP error during API call: {http_err}", exc_info=False)
raise http_err
except ValueError as e:
logger.error(f"Value error transitioning issue {issue_key}: {str(e)}")
raise
except Exception as e:
error_msg = (
f"Error transitioning issue {issue_key} with transition ID "
f"{transition_id}: {str(e)}"
)
logger.error(error_msg)
raise ValueError(error_msg) from e
def _normalize_transition_id(self, transition_id: str | int | dict) -> str | int:
"""
Normalize the transition ID to a common format.
Args:
transition_id: The transition ID, which can be a string, int, or dict
Returns:
The normalized transition ID as an integer when possible, or string otherwise
"""
logger.debug(
f"Normalizing transition_id: {transition_id}, type: {type(transition_id)}"
)
# Handle empty or None values
if transition_id is None:
logger.warning("Received None for transition_id, using default 0")
return 0
# Handle integer directly (preferred by the API)
if isinstance(transition_id, int):
return transition_id
# Handle string by converting to integer if it's numeric
if isinstance(transition_id, str):
if transition_id.isdigit():
return int(transition_id)
else:
# For non-numeric strings, keep as string for backward compatibility
return transition_id
# Handle dictionary case
if isinstance(transition_id, dict):
logger.warning(
f"Received dict for transition_id when string expected: {transition_id}"
)
# Try to extract ID from standard formats
for key in ["id", "ID", "transitionId", "transition_id"]:
if key in transition_id and transition_id[key] is not None:
value = transition_id[key]
if isinstance(value, str | int):
logger.warning(f"Using {key}={value} as transition ID")
# Try to convert to int if possible
if isinstance(value, int):
return value
elif isinstance(value, str) and value.isdigit():
return int(value)
else:
return str(value)
# If no standard key found, try to use any string or int value
for key, value in transition_id.items():
if value is not None and isinstance(value, str | int):
logger.warning(f"Using {key}={value} as transition ID from dict")
# Try to convert to int if possible
if isinstance(value, int):
return value
elif isinstance(value, str) and value.isdigit():
return int(value)
else:
return str(value)
# Last resort: try to use the first value
try:
first_value = next(iter(transition_id.values()))
if first_value is not None:
# Try to convert to int if possible
if isinstance(first_value, int):
return first_value
elif isinstance(first_value, str) and str(first_value).isdigit():
return int(first_value)
else:
return str(first_value)
except (StopIteration, AttributeError):
pass
# Nothing worked, return a default
logger.error(f"Could not extract valid transition ID from: {transition_id}")
return 0
# For any other type, convert to string with warning
logger.warning(
f"Unexpected type for transition_id: {type(transition_id)}, trying conversion"
)
try:
str_value = str(transition_id)
if str_value.isdigit():
return int(str_value)
else:
return str_value
except Exception as e:
logger.error(f"Failed to convert transition_id: {str(e)}")
return 0
def _sanitize_transition_fields(self, fields: dict[str, Any]) -> dict[str, Any]:
"""
Sanitize fields to ensure they're valid for the Jira API.
Args:
fields: Dictionary of fields to sanitize
Returns:
Dictionary of sanitized fields
"""
sanitized_fields: dict[str, Any] = {}
for key, value in fields.items():
# Skip None values
if value is None:
continue
# Handle special case for assignee
if key == "assignee" and isinstance(value, str):
try:
# Check if _get_account_id is available (from UsersMixin)
account_id = self._get_account_id(value)
sanitized_fields[key] = {"accountId": account_id}
except Exception as e: # noqa: BLE001 - Intentional fallback with logging
error_msg = f"Could not resolve assignee '{value}': {str(e)}"
logger.warning(error_msg)
# Skip this field
continue
else:
sanitized_fields[key] = value
return sanitized_fields
def _add_comment_to_transition_data(
self, transition_data: dict[str, Any], comment: str | int
) -> None:
"""
Add comment to transition data.
Args:
transition_data: The transition data dictionary to update
comment: The comment to add
"""
# Ensure comment is a string
if not isinstance(comment, str):
logger.warning(
f"Comment must be a string, converting from {type(comment)}: {comment}"
)
comment_str = str(comment)
else:
comment_str = comment
# Convert markdown to Jira format if _markdown_to_jira is available
jira_formatted_comment = comment_str
if hasattr(self, "_markdown_to_jira"):
jira_formatted_comment = self._markdown_to_jira(comment_str)
# Add to transition data
transition_data["update"] = {
"comment": [{"add": {"body": jira_formatted_comment}}]
}
```
--------------------------------------------------------------------------------
/tests/fixtures/confluence_mocks.py:
--------------------------------------------------------------------------------
```python
MOCK_CQL_SEARCH_RESPONSE = {
"results": [
{
"content": {
"id": "123456789",
"type": "page",
"status": "current",
"title": "2024-01-01: Team Progress Meeting 01",
"childTypes": {},
"macroRenderedOutput": {},
"restrictions": {},
"_expandable": {
"container": "",
"metadata": "",
"extensions": "",
"operations": "",
"children": "",
"history": "/rest/api/content/123456789/history",
"ancestors": "",
"body": "",
"version": "",
"descendants": "",
"space": "/rest/api/space/TEAM",
},
"_links": {
"webui": "/spaces/TEAM/pages/123456789/2024-01-01+Team+Progress+Meeting+01",
"self": "https://example.atlassian.net/wiki/rest/api/content/123456789",
"tinyui": "/x/ABC123",
},
},
"title": "2024-01-01: Team Progress Meeting 01",
"excerpt": "📅 Date\n2024-01-01\n👥 Participants\nJohn Smith\nJane Doe\nBob Wilson\n!-@123456",
"url": "/spaces/TEAM/pages/123456789/2024-01-01+Team+Progress+Meeting+01",
"resultGlobalContainer": {
"title": "Team Space",
"displayUrl": "/spaces/TEAM",
},
"breadcrumbs": [],
"entityType": "content",
"iconCssClass": "aui-icon content-type-page",
"lastModified": "2024-01-01T08:00:00.000Z",
"friendlyLastModified": "Jan 01, 2024",
"score": 0.0,
}
],
"start": 0,
"limit": 50,
"size": 1,
"totalSize": 1,
"cqlQuery": "parent = 123456789",
"searchDuration": 156,
"_links": {
"base": "https://example.atlassian.net/wiki",
"context": "/wiki",
"self": "https://example.atlassian.net/wiki/rest/api/search?cql=parent%3D123456789",
},
}
MOCK_PAGE_RESPONSE = {
"id": "987654321",
"type": "page",
"status": "current",
"title": "Example Meeting Notes",
"space": {
"id": 11111111,
"key": "PROJ",
"alias": "PROJ",
"name": "Project Space",
"type": "global",
"status": "current",
"_expandable": {
"settings": "/rest/api/space/PROJ/settings",
"metadata": "",
"operations": "",
"lookAndFeel": "/rest/api/settings/lookandfeel?spaceKey=PROJ",
"identifiers": "",
"permissions": "",
"roles": "",
"icon": "",
"description": "",
"theme": "/rest/api/space/PROJ/theme",
"history": "",
"homepage": "/rest/api/content/11111111",
},
"_links": {
"webui": "/spaces/PROJ",
"self": "https://example.atlassian.net/wiki/rest/api/space/PROJ",
},
},
"version": {
"by": {
"type": "known",
"accountId": "user123",
"accountType": "atlassian",
"email": "",
"publicName": "Example User (Unlicensed)",
"profilePicture": {
"path": "/wiki/aa-avatar/user123",
"width": 48,
"height": 48,
"isDefault": False,
},
"displayName": "Example User (Unlicensed)",
"isExternalCollaborator": False,
"isGuest": False,
"locale": "en_US",
"accountStatus": "active",
"_expandable": {"operations": "", "personalSpace": ""},
"_links": {
"self": "https://example.atlassian.net/wiki/rest/api/user?accountId=user123"
},
},
"when": "2024-01-01T09:00:00.000Z",
"friendlyWhen": "Jan 01, 2024",
"message": "",
"number": 1,
"minorEdit": False,
"ncsStepVersion": "1234",
"ncsStepVersionSource": "ncs-ack",
"confRev": "confluence$content$987654321.1",
"contentTypeModified": False,
"_expandable": {"collaborators": "", "content": "/rest/api/content/987654321"},
"_links": {
"self": "https://example.atlassian.net/wiki/rest/api/content/987654321/version/1"
},
},
"children": {
"attachment": {
"results": [
{
"id": "att105348",
"type": "attachment",
"status": "current",
"title": "random_geometric_image.svg",
"extensions": {"mediaType": "application/binary", "fileSize": 1098},
},
{
"id": "att9535345",
"type": "attachment",
"status": "current",
"title": "stockmaster-architecture.svg",
"extensions": {"mediaType": "application/binary", "fileSize": 6186},
},
]
}
},
"body": {
"storage": {
"value": '<h2><ac:emoticon ac:name="blue-star" /> Date</h2><p><time datetime="2024-01-01" /></p><h2><ac:emoticon ac:name="blue-star" /> Participants</h2><ul><li><p><ac:link><ri:user ri:account-id="user123" /></ac:link></p></li></ul><h2><ac:emoticon ac:name="blue-star" /> Goals</h2><ul><li><p>Example goal</p></li></ul>'
'<p><ac:structured-macro ac:name="profile">'
'<ac:parameter ac:name="user">'
'<ri:user ri:account-id="user123" />'
"</ac:parameter>"
"</ac:structured-macro></p>",
"representation": "storage",
"embeddedContent": [],
"_expandable": {"content": "/rest/api/content/987654321"},
},
"_expandable": {
"editor": "",
"atlas_doc_format": "",
"view": "",
"export_view": "",
"styled_view": "",
"dynamic": "",
"editor2": "",
"anonymous_export_view": "",
},
},
"macroRenderedOutput": {},
"extensions": {"position": 123456789},
"_expandable": {
"childTypes": "",
"container": "/rest/api/space/PROJ",
"schedulePublishInfo": "",
"metadata": "",
"operations": "",
"schedulePublishDate": "",
"children": "/rest/api/content/987654321/child",
"restrictions": "/rest/api/content/987654321/restriction/byOperation",
"history": "/rest/api/content/987654321/history",
"ancestors": "",
"descendants": "/rest/api/content/987654321/descendant",
},
"_links": {
"editui": "/pages/resumedraft.action?draftId=987654321",
"webui": "/spaces/PROJ/pages/987654321/Example+Meeting+Notes",
"edituiv2": "/spaces/PROJ/pages/edit-v2/987654321",
"context": "/wiki",
"self": "https://example.atlassian.net/wiki/rest/api/content/987654321",
"tinyui": "/x/AbCdE",
"collection": "/rest/api/content",
"base": "https://example.atlassian.net/wiki",
},
}
MOCK_COMMENTS_RESPONSE = {
"results": [
{
"id": "456789123",
"type": "comment",
"status": "current",
"title": "Re: Technical Design Document",
"version": {
"by": {
"type": "known",
"accountId": "712020:abc123-def456-ghi789",
"accountType": "atlassian",
"email": "",
"publicName": "John Doe",
"profilePicture": {
"path": "/wiki/aa-avatar/712020:abc123-def456-ghi789",
"width": 48,
"height": 48,
"isDefault": False,
},
"displayName": "John Doe",
"isExternalCollaborator": False,
"isGuest": False,
"locale": "en_US",
"accountStatus": "active",
"_expandable": {"operations": "", "personalSpace": ""},
"_links": {
"self": "https://company.atlassian.net/wiki/rest/api/user?accountId=712020:abc123-def456-ghi789"
},
},
"when": "2024-01-01T10:00:00.000Z",
"friendlyWhen": "Jan 1, 2024",
"message": "",
"number": 1,
"minorEdit": False,
"contentTypeModified": False,
"_expandable": {
"collaborators": "",
"content": "/rest/api/content/456789123",
},
"_links": {
"self": "https://company.atlassian.net/wiki/rest/api/content/456789123/version/1"
},
},
"macroRenderedOutput": {},
"body": {
"view": {
"value": "<p>Comment content here</p>",
"representation": "view",
"_expandable": {
"webresource": "",
"embeddedContent": "",
"mediaToken": "",
"content": "/rest/api/content/456789123",
},
},
"_expandable": {
"editor": "",
"atlas_doc_format": "",
"export_view": "",
"styled_view": "",
"dynamic": "",
"storage": "",
"editor2": "",
"anonymous_export_view": "",
},
},
"extensions": {
"location": "inline",
"_expandable": {"inlineProperties": "", "resolution": ""},
},
"_expandable": {
"childTypes": "",
"container": "/rest/api/content/123456789",
"schedulePublishInfo": "",
"metadata": "",
"operations": "",
"schedulePublishDate": "",
"children": "/rest/api/content/456789123/child",
"restrictions": "/rest/api/content/456789123/restriction/byOperation",
"history": "/rest/api/content/456789123/history",
"ancestors": "",
"descendants": "/rest/api/content/456789123/descendant",
"space": "/rest/api/space/TEAM",
},
"_links": {
"webui": "/spaces/TEAM/pages/123456789/Technical+Design+Document?focusedCommentId=456789123",
"self": "https://company.atlassian.net/wiki/rest/api/content/456789123",
},
}
]
}
MOCK_LABELS_RESPONSE = {
"results": [
{
"id": "456789123",
"prefix": "global",
"name": "meeting-notes",
"label": "meeting-notes",
},
{
"id": "456789124",
"prefix": "my",
"name": "important",
},
{
"id": "456789125",
"name": "test",
},
],
"start": 0,
"limit": 200,
"size": 3,
"_links": {
"self": "https://company.atlassian.net/wiki/rest/api/content/456789123",
"base": "https://company.atlassian.net",
"context": "",
},
}
MOCK_SPACES_RESPONSE = {
"results": [
{
"id": 11111111,
"key": "~user1",
"alias": "~user1",
"name": "User One",
"type": "personal",
"status": "archived",
"_expandable": {
"settings": "/rest/api/space/~user1/settings",
"metadata": "",
"operations": "",
"lookAndFeel": "/rest/api/settings/lookandfeel?spaceKey=~user1",
"identifiers": "",
"permissions": "",
"roles": "",
"icon": "",
"description": "",
"theme": "/rest/api/space/~user1/theme",
"history": "",
"homepage": "/rest/api/content/11111112",
},
"_links": {
"webui": "/spaces/~user1",
"self": "https://example.atlassian.net/wiki/rest/api/space/~user1",
},
},
{
"id": 22222222,
"key": "PROJ",
"alias": "PROJ",
"name": "Project Space",
"type": "global",
"status": "current",
"_expandable": {
"settings": "/rest/api/space/PROJ/settings",
"metadata": "",
"operations": "",
"lookAndFeel": "/rest/api/settings/lookandfeel?spaceKey=PROJ",
"identifiers": "",
"permissions": "",
"roles": "",
"icon": "",
"description": "",
"theme": "/rest/api/space/PROJ/theme",
"history": "",
"homepage": "/rest/api/content/22222223",
},
"_links": {
"webui": "/spaces/PROJ",
"self": "https://example.atlassian.net/wiki/rest/api/space/PROJ",
},
},
],
"start": 0,
"limit": 5,
"size": 2,
"_links": {
"base": "https://example.atlassian.net/wiki",
"context": "/wiki",
"next": "/rest/api/space?next=true&limit=5&start=5",
"self": "https://example.atlassian.net/wiki/rest/api/space",
},
}
MOCK_PAGES_FROM_SPACE_RESPONSE = [
{
"id": "123456789",
"type": "page",
"status": "current",
"title": "Sample Research Paper Title",
"macroRenderedOutput": {},
"body": {
"storage": {
"value": "<h3>[Date]</h3><p><em>Result</em>: Sample Result</p><p><em>Reviews</em>: </p><p>Sample content with various formatting</p>",
"representation": "storage",
"embeddedContent": [],
"_expandable": {"content": "/rest/api/content/123456789"},
},
"_expandable": {
"editor": "",
"atlas_doc_format": "",
"view": "",
"export_view": "",
"styled_view": "",
"dynamic": "",
"editor2": "",
"anonymous_export_view": "",
},
},
"extensions": {"position": 123456789},
"_expandable": {
"container": "/rest/api/space/PROJ",
"metadata": "",
"restrictions": "/rest/api/content/123456789/restriction/byOperation",
"history": "/rest/api/content/123456789/history",
"version": "",
"descendants": "/rest/api/content/123456789/descendant",
"space": "/rest/api/space/PROJ",
"childTypes": "",
"schedulePublishInfo": "",
"operations": "",
"schedulePublishDate": "",
"children": "/rest/api/content/123456789/child",
"ancestors": "",
},
"_links": {
"self": "https://example.atlassian.net/wiki/rest/api/content/123456789",
"tinyui": "/x/ABC123",
"editui": "/pages/resumedraft.action?draftId=123456789",
"webui": "/spaces/PROJ/pages/123456789/Sample+Research+Paper+Title",
"edituiv2": "/spaces/PROJ/pages/edit-v2/123456789",
},
},
{
"id": "987654321",
"type": "page",
"status": "current",
"title": "Example Meeting Notes",
"space": {
"key": "PROJ",
"name": "Project Space",
"type": "global",
"status": "current",
},
"macroRenderedOutput": {},
"body": {
"storage": {
"value": "<h2>Meeting Agenda</h2><p>Example content</p>",
"representation": "storage",
"embeddedContent": [],
"_expandable": {"content": "/rest/api/content/987654321"},
},
"_expandable": {
"editor": "",
"atlas_doc_format": "",
"view": "",
"export_view": "",
"styled_view": "",
"dynamic": "",
"editor2": "",
"anonymous_export_view": "",
},
},
"extensions": {"position": 987654321},
"_links": {
"webui": "/spaces/PROJ/pages/987654321/Example+Meeting+Notes",
"self": "https://example.atlassian.net/wiki/rest/api/content/987654321",
},
},
]
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/models/jira/common.py:
--------------------------------------------------------------------------------
```python
"""
Common Jira entity models.
This module provides Pydantic models for common Jira entities like users, statuses,
issue types, priorities, attachments, and time tracking.
"""
import logging
from datetime import datetime
from typing import Any
from pydantic import Field
from mcp_atlassian.utils import parse_date
from ..base import ApiModel, TimestampMixin
from ..constants import (
EMPTY_STRING,
JIRA_DEFAULT_ID,
NONE_VALUE,
UNASSIGNED,
UNKNOWN,
)
logger = logging.getLogger(__name__)
class JiraUser(ApiModel):
"""
Model representing a Jira user.
"""
account_id: str | None = None
display_name: str = UNASSIGNED
email: str | None = None
active: bool = True
avatar_url: str | None = None
time_zone: str | None = None
@classmethod
def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraUser":
"""
Create a JiraUser from a Jira API response.
Args:
data: The user data from the Jira API
Returns:
A JiraUser instance
"""
if not data:
return cls()
# Handle non-dictionary data by returning a default instance
if not isinstance(data, dict):
logger.debug("Received non-dictionary data, returning default instance")
return cls()
avatar_url = None
if avatars := data.get("avatarUrls"):
if isinstance(avatars, dict):
# Get the largest available avatar (48x48)
avatar_url = avatars.get("48x48")
else:
logger.debug(f"Unexpected avatar data format: {type(avatars)}")
return cls(
account_id=data.get("accountId"),
display_name=str(data.get("displayName", UNASSIGNED)),
email=data.get("emailAddress"),
active=bool(data.get("active", True)),
avatar_url=avatar_url,
time_zone=data.get("timeZone"),
)
def to_simplified_dict(self) -> dict[str, Any]:
"""Convert to simplified dictionary for API response."""
return {
"display_name": self.display_name,
"name": self.display_name, # Add name for backward compatibility
"email": self.email,
"avatar_url": self.avatar_url,
}
class JiraStatusCategory(ApiModel):
"""
Model representing a Jira status category.
"""
id: int = 0
key: str = EMPTY_STRING
name: str = UNKNOWN
color_name: str = EMPTY_STRING
@classmethod
def from_api_response(
cls, data: dict[str, Any], **kwargs: Any
) -> "JiraStatusCategory":
"""
Create a JiraStatusCategory from a Jira API response.
Args:
data: The status category data from the Jira API
Returns:
A JiraStatusCategory instance
"""
if not data:
return cls()
# Handle non-dictionary data by returning a default instance
if not isinstance(data, dict):
logger.debug("Received non-dictionary data, returning default instance")
return cls()
# Safely get and convert fields, handling potential type mismatches
id_value = data.get("id", 0)
try:
# Ensure id is an integer
id_value = int(id_value) if id_value is not None else 0
except (ValueError, TypeError):
id_value = 0
return cls(
id=id_value,
key=str(data.get("key", EMPTY_STRING)),
name=str(data.get("name", UNKNOWN)),
color_name=str(data.get("colorName", EMPTY_STRING)),
)
class JiraStatus(ApiModel):
"""
Model representing a Jira issue status.
"""
id: str = JIRA_DEFAULT_ID
name: str = UNKNOWN
description: str | None = None
icon_url: str | None = None
category: JiraStatusCategory | None = None
@classmethod
def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraStatus":
"""
Create a JiraStatus from a Jira API response.
Args:
data: The status data from the Jira API
Returns:
A JiraStatus instance
"""
if not data:
return cls()
# Handle non-dictionary data by returning a default instance
if not isinstance(data, dict):
logger.debug("Received non-dictionary data, returning default instance")
return cls()
category = None
category_data = data.get("statusCategory")
if category_data:
category = JiraStatusCategory.from_api_response(category_data)
# Ensure ID is a string (API sometimes returns integers)
status_id = data.get("id", JIRA_DEFAULT_ID)
if status_id is not None:
status_id = str(status_id)
return cls(
id=status_id,
name=str(data.get("name", UNKNOWN)),
description=data.get("description"),
icon_url=data.get("iconUrl"),
category=category,
)
def to_simplified_dict(self) -> dict[str, Any]:
"""Convert to simplified dictionary for API response."""
result = {
"name": self.name,
}
if self.category:
result["category"] = self.category.name
result["color"] = self.category.color_name
return result
class JiraIssueType(ApiModel):
"""
Model representing a Jira issue type.
"""
id: str = JIRA_DEFAULT_ID
name: str = UNKNOWN
description: str | None = None
icon_url: str | None = None
@classmethod
def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraIssueType":
"""
Create a JiraIssueType from a Jira API response.
Args:
data: The issue type data from the Jira API
Returns:
A JiraIssueType instance
"""
if not data:
return cls()
if not isinstance(data, dict):
logger.debug("Received non-dictionary data, returning default instance")
return cls()
issue_type_id = data.get("id", JIRA_DEFAULT_ID)
if issue_type_id is not None:
issue_type_id = str(issue_type_id)
return cls(
id=issue_type_id,
name=str(data.get("name", UNKNOWN)),
description=data.get("description"),
icon_url=data.get("iconUrl"),
)
def to_simplified_dict(self) -> dict[str, Any]:
"""Convert to simplified dictionary for API response."""
return {"name": self.name}
class JiraPriority(ApiModel):
"""
Model representing a Jira priority.
"""
id: str = JIRA_DEFAULT_ID
name: str = NONE_VALUE
description: str | None = None
icon_url: str | None = None
@classmethod
def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraPriority":
"""
Create a JiraPriority from a Jira API response.
Args:
data: The priority data from the Jira API
Returns:
A JiraPriority instance
"""
if not data:
return cls()
if not isinstance(data, dict):
logger.debug("Received non-dictionary data, returning default instance")
return cls()
priority_id = data.get("id", JIRA_DEFAULT_ID)
if priority_id is not None:
priority_id = str(priority_id)
return cls(
id=priority_id,
name=str(data.get("name", NONE_VALUE)),
description=data.get("description"),
icon_url=data.get("iconUrl"),
)
def to_simplified_dict(self) -> dict[str, Any]:
"""Convert to simplified dictionary for API response."""
return {"name": self.name}
class JiraAttachment(ApiModel):
"""
Model representing a Jira issue attachment.
This model contains information about files attached to Jira issues,
including the filename, size, content type, and download URL.
"""
id: str = JIRA_DEFAULT_ID
filename: str = EMPTY_STRING
size: int = 0
content_type: str | None = None
created: str = EMPTY_STRING
author: JiraUser | None = None
url: str | None = None
thumbnail_url: str | None = None
@classmethod
def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraAttachment":
"""
Create a JiraAttachment from a Jira API response.
Args:
data: The attachment data from the Jira API
Returns:
A JiraAttachment instance
"""
if not data:
return cls()
if not isinstance(data, dict):
logger.debug("Received non-dictionary data, returning default instance")
return cls()
author = None
author_data = data.get("author")
if author_data:
author = JiraUser.from_api_response(author_data)
attachment_id = data.get("id", JIRA_DEFAULT_ID)
if attachment_id is not None:
attachment_id = str(attachment_id)
size = data.get("size", 0)
try:
size = int(size) if size is not None else 0
except (ValueError, TypeError):
size = 0
return cls(
id=attachment_id,
filename=str(data.get("filename", EMPTY_STRING)),
size=size,
content_type=data.get("mimeType"),
created=str(data.get("created", EMPTY_STRING)),
author=author,
url=data.get("content"), # This is actually the download URL
thumbnail_url=data.get("thumbnail"),
)
def to_simplified_dict(self) -> dict[str, Any]:
"""Convert to simplified dictionary for API response."""
result = {
"filename": self.filename,
"size": self.size,
"url": self.url,
}
if self.content_type:
result["content_type"] = self.content_type
if self.author:
result["author"] = self.author.to_simplified_dict()
if self.thumbnail_url:
result["thumbnail_url"] = self.thumbnail_url
if self.created:
result["created"] = self.created
return result
class JiraTimetracking(ApiModel):
"""
Model for Jira timetracking information.
This represents time estimates and spent time.
"""
original_estimate: str | None = None
remaining_estimate: str | None = None
time_spent: str | None = None
original_estimate_seconds: int | None = None
remaining_estimate_seconds: int | None = None
time_spent_seconds: int | None = None
@classmethod
def from_api_response(
cls, data: dict[str, Any], **kwargs: Any
) -> "JiraTimetracking":
"""
Create a JiraTimetracking from a Jira API response.
Args:
data: The timetracking data from the Jira API
**kwargs: Additional arguments (not used)
Returns:
A JiraTimetracking instance
"""
if not data:
return cls()
if not isinstance(data, dict):
return cls()
return cls(
original_estimate=data.get("originalEstimate"),
remaining_estimate=data.get("remainingEstimate"),
time_spent=data.get("timeSpent"),
original_estimate_seconds=data.get("originalEstimateSeconds"),
remaining_estimate_seconds=data.get("remainingEstimateSeconds"),
time_spent_seconds=data.get("timeSpentSeconds"),
)
def to_simplified_dict(self) -> dict[str, str | int | None]:
"""Convert to simplified dictionary for API response."""
result = {}
if self.original_estimate:
result["original_estimate"] = self.original_estimate
if self.remaining_estimate:
result["remaining_estimate"] = self.remaining_estimate
if self.time_spent:
result["time_spent"] = self.time_spent
return result
class JiraResolution(ApiModel):
"""Model representing a Jira issue resolution."""
id: str = JIRA_DEFAULT_ID
name: str = UNKNOWN
description: str | None = None
@classmethod
def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraResolution":
"""Create a JiraResolution from a Jira API response."""
if not isinstance(data, dict):
return cls()
resolution_id = data.get("id", JIRA_DEFAULT_ID)
return cls(
id=str(resolution_id),
name=data.get("name", UNKNOWN),
description=data.get("description"),
)
def to_simplified_dict(self) -> dict[str, Any]:
"""Convert to simplified dictionary for API response."""
result = {"name": self.name}
if self.id != JIRA_DEFAULT_ID:
result["id"] = self.id
return result
class JiraChangelogItem(ApiModel):
"""
Model representing a single change item within a changelog entry.
Each change item represents a field that was modified, including
its previous and new values.
"""
field: str = EMPTY_STRING
fieldtype: str = EMPTY_STRING
from_string: str | None = None
to_string: str | None = None
from_id: str | None = None
to_id: str | None = None
@classmethod
def from_api_response(
cls, data: dict[str, Any], **kwargs: Any
) -> "JiraChangelogItem":
"""
Create a JiraChangeItem from a Jira API response.
Args:
data: The change item data from the Jira API
Returns:
A JiraChangeItem instance
"""
if not data or not isinstance(data, dict):
return cls()
return cls(
field=str(data.get("field", EMPTY_STRING)),
fieldtype=str(data.get("fieldtype", EMPTY_STRING)),
from_string=data.get("fromString"),
to_string=data.get("toString"),
from_id=data.get("from"),
to_id=data.get("to"),
)
def to_simplified_dict(self) -> dict[str, Any]:
"""Convert to simplified dictionary for API response."""
result = {
"field": self.field,
"fieldtype": self.fieldtype,
}
if self.from_string is not None:
result["from_string"] = self.from_string
if self.to_string is not None:
result["to_string"] = self.to_string
if self.from_id is not None:
result["from_id"] = self.from_id
if self.to_id is not None:
result["to_id"] = self.to_id
return result
class JiraChangelog(ApiModel, TimestampMixin):
"""
Model representing a Jira issue changelog entry.
A changelog entry represents a set of changes made to an issue at a specific time,
including who made the changes and what was changed.
"""
id: str = JIRA_DEFAULT_ID
author: JiraUser | None = None
created: datetime | None = None
items: list[JiraChangelogItem] = Field(default_factory=list)
@classmethod
def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraChangelog":
"""
Create a JiraChangelog from a Jira API response.
Args:
data: The changelog data from the Jira API
Returns:
A JiraChangelog instance
"""
if not data:
return cls()
# Handle non-dictionary data by returning a default instance
if not isinstance(data, dict):
logger.debug("Received non-dictionary data, returning default instance")
return cls()
# Extract author data
author = None
author_data = data.get("author")
if author_data:
author = JiraUser.from_api_response(author_data)
# Ensure ID is a string
changelog_id = data.get("id", JIRA_DEFAULT_ID)
if changelog_id is not None:
changelog_id = str(changelog_id)
# Process change items
items = []
items_data = data.get("items", [])
if isinstance(items_data, list):
for item_data in items_data:
item = JiraChangelogItem.from_api_response(item_data)
items.append(item)
# Process created date
created: datetime | None = None
created_data = data.get("created")
if created_data:
created = parse_date(created_data)
return cls(
id=changelog_id,
author=author,
created=created,
items=items,
)
def to_simplified_dict(self) -> dict[str, Any]:
"""Convert to simplified dictionary for API response."""
result: dict[str, Any] = {}
if self.items:
result["items"] = [item.to_simplified_dict() for item in self.items]
if self.author:
result["author"] = self.author.to_simplified_dict()
if self.created:
result["created"] = str(self.created)
return result
```