This is page 7 of 10. Use http://codebase.md/sooperset/mcp-atlassian?page={x} to view the full context.
# Directory Structure
```
├── .devcontainer
│ ├── devcontainer.json
│ ├── Dockerfile
│ ├── post-create.sh
│ └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ └── feature_request.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-publish.yml
│ ├── lint.yml
│ ├── publish.yml
│ ├── stale.yml
│ └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│ ├── oauth_authorize.py
│ └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│ └── mcp_atlassian
│ ├── __init__.py
│ ├── confluence
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── labels.py
│ │ ├── pages.py
│ │ ├── search.py
│ │ ├── spaces.py
│ │ ├── users.py
│ │ ├── utils.py
│ │ └── v2_adapter.py
│ ├── exceptions.py
│ ├── jira
│ │ ├── __init__.py
│ │ ├── attachments.py
│ │ ├── boards.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── epics.py
│ │ ├── fields.py
│ │ ├── formatting.py
│ │ ├── issues.py
│ │ ├── links.py
│ │ ├── projects.py
│ │ ├── protocols.py
│ │ ├── search.py
│ │ ├── sprints.py
│ │ ├── transitions.py
│ │ ├── users.py
│ │ └── worklog.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── comment.py
│ │ │ ├── common.py
│ │ │ ├── label.py
│ │ │ ├── page.py
│ │ │ ├── search.py
│ │ │ ├── space.py
│ │ │ └── user_search.py
│ │ ├── constants.py
│ │ └── jira
│ │ ├── __init__.py
│ │ ├── agile.py
│ │ ├── comment.py
│ │ ├── common.py
│ │ ├── issue.py
│ │ ├── link.py
│ │ ├── project.py
│ │ ├── search.py
│ │ ├── version.py
│ │ ├── workflow.py
│ │ └── worklog.py
│ ├── preprocessing
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence.py
│ │ └── jira.py
│ ├── servers
│ │ ├── __init__.py
│ │ ├── confluence.py
│ │ ├── context.py
│ │ ├── dependencies.py
│ │ ├── jira.py
│ │ └── main.py
│ └── utils
│ ├── __init__.py
│ ├── date.py
│ ├── decorators.py
│ ├── env.py
│ ├── environment.py
│ ├── io.py
│ ├── lifecycle.py
│ ├── logging.py
│ ├── oauth_setup.py
│ ├── oauth.py
│ ├── ssl.py
│ ├── tools.py
│ └── urls.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── fixtures
│ │ ├── __init__.py
│ │ ├── confluence_mocks.py
│ │ └── jira_mocks.py
│ ├── integration
│ │ ├── conftest.py
│ │ ├── README.md
│ │ ├── test_authentication.py
│ │ ├── test_content_processing.py
│ │ ├── test_cross_service.py
│ │ ├── test_mcp_protocol.py
│ │ ├── test_proxy.py
│ │ ├── test_real_api.py
│ │ ├── test_ssl_verification.py
│ │ ├── test_stdin_monitoring_fix.py
│ │ └── test_transport_lifecycle.py
│ ├── README.md
│ ├── test_preprocessing.py
│ ├── test_real_api_validation.py
│ ├── unit
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_labels.py
│ │ │ ├── test_pages.py
│ │ │ ├── test_search.py
│ │ │ ├── test_spaces.py
│ │ │ ├── test_users.py
│ │ │ ├── test_utils.py
│ │ │ └── test_v2_adapter.py
│ │ ├── jira
│ │ │ ├── conftest.py
│ │ │ ├── test_attachments.py
│ │ │ ├── test_boards.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_epics.py
│ │ │ ├── test_fields.py
│ │ │ ├── test_formatting.py
│ │ │ ├── test_issues_markdown.py
│ │ │ ├── test_issues.py
│ │ │ ├── test_links.py
│ │ │ ├── test_projects.py
│ │ │ ├── test_protocols.py
│ │ │ ├── test_search.py
│ │ │ ├── test_sprints.py
│ │ │ ├── test_transitions.py
│ │ │ ├── test_users.py
│ │ │ └── test_worklog.py
│ │ ├── models
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_base_models.py
│ │ │ ├── test_confluence_models.py
│ │ │ ├── test_constants.py
│ │ │ └── test_jira_models.py
│ │ ├── servers
│ │ │ ├── __init__.py
│ │ │ ├── test_confluence_server.py
│ │ │ ├── test_context.py
│ │ │ ├── test_dependencies.py
│ │ │ ├── test_jira_server.py
│ │ │ └── test_main_server.py
│ │ ├── test_exceptions.py
│ │ ├── test_main_transport_selection.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── test_custom_headers.py
│ │ ├── test_date.py
│ │ ├── test_decorators.py
│ │ ├── test_env.py
│ │ ├── test_environment.py
│ │ ├── test_io.py
│ │ ├── test_lifecycle.py
│ │ ├── test_logging.py
│ │ ├── test_masking.py
│ │ ├── test_oauth_setup.py
│ │ ├── test_oauth.py
│ │ ├── test_ssl.py
│ │ ├── test_tools.py
│ │ └── test_urls.py
│ └── utils
│ ├── __init__.py
│ ├── assertions.py
│ ├── base.py
│ ├── factories.py
│ └── mocks.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/tests/integration/test_content_processing.py:
--------------------------------------------------------------------------------
```python
"""Integration tests for content processing functionality.
These tests validate HTML ↔ Markdown conversion, macro handling,
special character preservation, and performance with large content.
"""
import time
from typing import Any
import pytest
from mcp_atlassian.preprocessing.confluence import ConfluencePreprocessor
from mcp_atlassian.preprocessing.jira import JiraPreprocessor
class MockConfluenceClient:
    """Stand-in Confluence client used by the tests for user lookups.

    Every lookup succeeds and echoes the requested identifier back inside
    the ``displayName`` value.
    """

    @staticmethod
    def _active_user(identifier: str) -> dict[str, Any]:
        """Build the canned user profile shared by both lookup methods."""
        profile: dict[str, Any] = {"displayName": f"User {identifier}"}
        profile["accountType"] = "atlassian"
        profile["accountStatus"] = "active"
        return profile

    def get_user_details_by_accountid(self, account_id: str) -> dict[str, Any]:
        """Return the canned profile for an account ID."""
        return self._active_user(account_id)

    def get_user_details_by_username(self, username: str) -> dict[str, Any]:
        """Return the canned profile for a username (Server/DC compatibility)."""
        return self._active_user(username)
@pytest.fixture
def jira_preprocessor():
    """Provide a JiraPreprocessor pointed at the example Atlassian base URL."""
    preprocessor = JiraPreprocessor(base_url="https://example.atlassian.net")
    return preprocessor
@pytest.fixture
def confluence_preprocessor():
    """Provide a ConfluencePreprocessor backed by the mock user-lookup client."""
    mock_client = MockConfluenceClient()
    preprocessor = ConfluencePreprocessor(
        base_url="https://example.atlassian.net",
        confluence_client=mock_client,
    )
    return preprocessor
@pytest.mark.integration
class TestJiraContentProcessing:
    """Integration tests for Jira content processing.

    Every test receives the ``jira_preprocessor`` fixture and drives it
    through ``jira_to_markdown``, ``markdown_to_jira`` and/or
    ``clean_jira_text``, then checks that recognisable fragments of the
    input are present in the output.
    """

    def test_jira_markdown_roundtrip_simple(self, jira_preprocessor):
        """Test simple Jira markup to Markdown and back."""
        jira_markup = """h1. Main Title
This is *bold* and _italic_ text.
* List item 1
* List item 2
# Numbered item 1
# Numbered item 2"""

        # Convert to Markdown
        markdown = jira_preprocessor.jira_to_markdown(jira_markup)

        # Convert back to Jira
        jira_result = jira_preprocessor.markdown_to_jira(markdown)

        # Verify key elements are preserved
        assert "h1. Main Title" in jira_result
        assert "*bold*" in jira_result
        assert "_italic_" in jira_result
        assert "* List item 1" in jira_result
        # Numbered lists in Jira are converted to "1." format in markdown,
        # so either spelling is accepted after the round trip.
        assert "1. Numbered item 1" in jira_result or "# Numbered item 1" in jira_result

    def test_jira_markdown_roundtrip_complex(self, jira_preprocessor):
        """Test complex Jira markup with code blocks, tables, and formatting."""
        jira_markup = """h1. Project Documentation
h2. Overview
This project uses *advanced* features with _emphasis_ and {{inline code}}.
h3. Code Example
{code:python}
def process_data(items):
'''Process a list of items.'''
for item in items:
print(f"Processing {item}")
return len(items)
{code}
h3. Table Example
||Header 1||Header 2||Header 3||
|Cell 1|Cell 2|Cell 3|
|Cell 4|Cell 5|Cell 6|
h3. Features
* Feature with *bold* text
** Nested feature with _italic_
*** Deep nested item
# First step
## Sub-step A
## Sub-step B
bq. This is a block quote with *formatting*
h3. Links and Images
[Jira Documentation|https://docs.atlassian.com/jira]
!image.png|alt=Screenshot!
h3. Special Formatting
+inserted text+
-deleted text-
^superscript^
~subscript~
??citation??
{noformat}
Raw text that should not be formatted
with preserved spacing
{noformat}
{quote}
This is a quoted section
with multiple lines
{quote}
{color:red}Red text{color}
{color:#0000FF}Blue text{color}"""

        # Convert to Markdown
        markdown = jira_preprocessor.jira_to_markdown(jira_markup)

        # Convert back to Jira
        jira_result = jira_preprocessor.markdown_to_jira(markdown)

        # Verify structure is preserved
        assert "h1. Project Documentation" in jira_result
        assert "h2. Overview" in jira_result
        assert "h3. Code Example" in jira_result

        # Verify code block
        assert "{code:python}" in jira_result
        assert "def process_data(items):" in jira_result
        assert "{code}" in jira_result

        # Verify table structure
        assert "||Header 1||Header 2||Header 3||" in jira_result
        assert "|Cell 1|Cell 2|Cell 3|" in jira_result

        # Verify lists (may be converted differently, so only text is checked)
        assert "Feature with" in jira_result and "bold" in jira_result
        assert "First step" in jira_result

        # Verify special formatting
        assert "+inserted text+" in jira_result
        assert "-deleted text-" in jira_result
        assert "^superscript^" in jira_result
        assert "~subscript~" in jira_result
        assert "??citation??" in jira_result

        # Verify links
        assert "Jira Documentation" in jira_result
        assert "https://docs.atlassian.com/jira" in jira_result

        # Verify color formatting (only the coloured text is checked)
        assert "Red text" in jira_result
        assert "Blue text" in jira_result

    def test_jira_user_mentions_processing(self, jira_preprocessor):
        """Test processing of Jira user mentions."""
        content = """h1. Team Update
[~accountid:12345] completed the task.
[~accountid:67890] is reviewing the changes.
See [PROJ-123|https://example.atlassian.net/browse/PROJ-123|smart-link] for details."""

        cleaned = jira_preprocessor.clean_jira_text(content)

        # User mentions should be processed
        assert "User:12345" in cleaned
        assert "User:67890" in cleaned

        # Smart link should be converted
        assert "[PROJ-123](https://example.atlassian.net/browse/PROJ-123)" in cleaned

    def test_jira_html_content_processing(self, jira_preprocessor):
        """Test processing of HTML content in Jira."""
        html_content = """<p>This is a <strong>test</strong> with <em>HTML</em> content.</p>
<ul>
<li>Item 1</li>
<li>Item 2 with <code>inline code</code></li>
</ul>
<blockquote>
<p>A quote with <strong>formatting</strong></p>
</blockquote>
<pre><code class="language-python">def hello():
print("Hello, World!")
</code></pre>"""

        cleaned = jira_preprocessor.clean_jira_text(html_content)

        # Verify HTML is converted to Markdown
        assert "**test**" in cleaned
        assert "*HTML*" in cleaned
        assert "`inline code`" in cleaned
        assert "def hello():" in cleaned

    def test_jira_nested_lists_preservation(self, jira_preprocessor):
        """Test preservation of nested list structures."""
        jira_markup = """* Level 1 item
** Level 2 item
*** Level 3 item
**** Level 4 item
** Another Level 2
* Back to Level 1
# Numbered Level 1
## Numbered Level 2
### Numbered Level 3
## Another Numbered Level 2
# Back to Numbered Level 1"""

        markdown = jira_preprocessor.jira_to_markdown(jira_markup)
        jira_result = jira_preprocessor.markdown_to_jira(markdown)

        # Verify nested structure is preserved (checking for presence of items)
        assert "Level 1 item" in jira_result
        assert "Level 2 item" in jira_result
        assert "Level 3 item" in jira_result
        assert "Level 4 item" in jira_result
        assert "Numbered Level 1" in jira_result
        assert "Numbered Level 2" in jira_result
        assert "Numbered Level 3" in jira_result

    def test_jira_special_characters_preservation(self, jira_preprocessor):
        """Test preservation of special characters and Unicode."""
        jira_markup = """h1. Special Characters Test
Unicode: α β γ δ ε ζ η θ
Emojis: 🚀 💻 ✅ ❌ 📝
Symbols: © ® ™ € £ ¥ § ¶
Special chars in code:
{code}
if (x > 0 && y < 10) {
return x & y | z ^ w;
}
{code}
Math: x² + y² = z²
Quotes: "curly quotes" and 'single quotes'
Dashes: em—dash and en–dash"""

        markdown = jira_preprocessor.jira_to_markdown(jira_markup)
        jira_result = jira_preprocessor.markdown_to_jira(markdown)

        # Verify Unicode preservation
        assert "α β γ δ ε ζ η θ" in jira_result
        assert "🚀 💻 ✅ ❌ 📝" in jira_result
        assert "© ® ™ € £ ¥ § ¶" in jira_result

        # Verify special characters in code
        assert "x > 0 && y < 10" in jira_result
        assert "x & y | z ^ w" in jira_result

        # Verify other special characters
        assert "x² + y² = z²" in jira_result
        assert '"curly quotes"' in jira_result
        assert "em—dash" in jira_result
        assert "en–dash" in jira_result

    def test_jira_large_content_performance(self, jira_preprocessor):
        """Test conversion performance with large content (>50KB)."""
        # Build 200 sections so the payload exceeds the size threshold
        # asserted below.
        large_content_parts = []
        for i in range(200):
            section = f"""h2. Section {i}
This is paragraph {i} with *bold* and _italic_ text.
* List item {i}.1
* List item {i}.2
* List item {i}.3
{{code:python}}
def function_{i}():
# Function {i} implementation
data = [{{"id": j, "value": j * {i}}} for j in range(100)]
return sum(item["value"] for item in data)
{{code}}
||Header A||Header B||Header C||
|Row {i} Cell 1|Row {i} Cell 2|Row {i} Cell 3|
"""
            large_content_parts.append(section)

        large_content = "\n".join(large_content_parts)
        content_size = len(large_content.encode("utf-8"))

        # Ensure content is reasonably large (adjust threshold for test)
        assert content_size > 50000  # 50KB is enough for performance testing

        # Time both directions with perf_counter: it is monotonic, so the
        # measured durations cannot be distorted by system clock adjustments.
        start_time = time.perf_counter()
        markdown = jira_preprocessor.jira_to_markdown(large_content)
        markdown_time = time.perf_counter() - start_time

        start_time = time.perf_counter()
        jira_result = jira_preprocessor.markdown_to_jira(markdown)
        jira_time = time.perf_counter() - start_time

        # Performance assertions (should complete in reasonable time)
        assert markdown_time < 10.0  # Should complete within 10 seconds
        assert jira_time < 10.0

        # Verify content integrity
        assert "Section 0" in jira_result
        assert "Section 199" in jira_result
        assert (
            "function" in jira_result
        )  # Function names might have escaped underscores

    def test_jira_edge_cases(self, jira_preprocessor):
        """Test edge cases in Jira content processing."""
        # Empty content
        assert jira_preprocessor.jira_to_markdown("") == ""
        assert jira_preprocessor.markdown_to_jira("") == ""
        assert jira_preprocessor.clean_jira_text("") == ""
        assert jira_preprocessor.clean_jira_text(None) == ""

        # Malformed markup
        malformed = "*unclosed bold _mixed italic*"
        result = jira_preprocessor.jira_to_markdown(malformed)
        assert "**unclosed bold" in result

        # Very long lines
        long_line = "x" * 10000
        result = jira_preprocessor.jira_to_markdown(long_line)
        assert len(result) >= 10000

        # Nested code blocks (should not process inner content)
        nested = """{code}
{code}
inner code
{code}
{code}"""
        result = jira_preprocessor.jira_to_markdown(nested)
        assert "inner code" in result
@pytest.mark.integration
class TestConfluenceContentProcessing:
    """Integration tests for Confluence content processing.

    Every test receives the ``confluence_preprocessor`` fixture and calls
    ``process_html_content`` (which is unpacked as an ``(html, markdown)``
    pair) and/or ``markdown_to_confluence_storage``.
    """

    def test_confluence_macro_preservation(self, confluence_preprocessor):
        """Test preservation of Confluence macros during processing."""
        html_with_macros = """<p>Page content with macros:</p>
<ac:structured-macro ac:name="info" ac:schema-version="1">
<ac:rich-text-body>
<p>This is an info panel with <strong>formatting</strong></p>
</ac:rich-text-body>
</ac:structured-macro>
<ac:structured-macro ac:name="code" ac:schema-version="1">
<ac:parameter ac:name="language">python</ac:parameter>
<ac:plain-text-body><![CDATA[
def process():
return "Hello, World!"
]]></ac:plain-text-body>
</ac:structured-macro>
<ac:structured-macro ac:name="toc">
<ac:parameter ac:name="maxLevel">3</ac:parameter>
</ac:structured-macro>
<ac:structured-macro ac:name="excerpt">
<ac:rich-text-body>
<p>This is an excerpt of the page.</p>
</ac:rich-text-body>
</ac:structured-macro>"""

        # Only the HTML half of the result is inspected here.
        processed_html, _ = confluence_preprocessor.process_html_content(
            html_with_macros
        )

        # Verify macros are preserved in HTML
        assert 'ac:structured-macro ac:name="info"' in processed_html
        assert 'ac:structured-macro ac:name="code"' in processed_html
        assert 'ac:structured-macro ac:name="toc"' in processed_html
        assert 'ac:structured-macro ac:name="excerpt"' in processed_html

        # Verify parameters are preserved
        assert 'ac:parameter ac:name="language">python' in processed_html
        assert 'ac:parameter ac:name="maxLevel">3' in processed_html

    def test_confluence_user_mentions_complex(self, confluence_preprocessor):
        """Test complex user mention scenarios in Confluence."""
        html_content = """<p>Multiple user mentions:</p>
<ac:link>
<ri:user ri:account-id="user123"/>
</ac:link>
<p>User with link body:</p>
<ac:link>
<ri:user ri:account-id="user456"/>
<ac:link-body>@Custom Name</ac:link-body>
</ac:link>
<ac:structured-macro ac:name="profile">
<ac:parameter ac:name="user">
<ri:user ri:account-id="user789"/>
</ac:parameter>
</ac:structured-macro>
<p>Server/DC user with userkey:</p>
<ac:structured-macro ac:name="profile">
<ac:parameter ac:name="user">
<ri:user ri:userkey="admin"/>
</ac:parameter>
</ac:structured-macro>"""

        _, processed_markdown = confluence_preprocessor.process_html_content(
            html_content
        )

        # Verify all user mentions are processed; the expected names follow
        # the "User <identifier>" display names produced by the mock client.
        assert "@User user123" in processed_markdown
        assert "@User user456" in processed_markdown
        assert "@User user789" in processed_markdown
        assert "@User admin" in processed_markdown

    def test_confluence_markdown_roundtrip(self, confluence_preprocessor):
        """Test Markdown to Confluence storage format and processing."""
        markdown_content = """# Main Title
## Introduction
This is a **bold** paragraph with *italic* text and `inline code`.
### Code Block
```python
def hello_world():
print("Hello, World!")
return True
```
### Lists
- Item 1
- Nested item 1.1
- Nested item 1.2
- Item 2
1. First step
2. Second step
1. Sub-step 2.1
2. Sub-step 2.2
### Table
| Header 1 | Header 2 | Header 3 |
|----------|----------|----------|
| Cell 1 | Cell 2 | Cell 3 |
| Cell 4 | Cell 5 | Cell 6 |
### Links and Images
[Confluence Documentation](https://confluence.atlassian.com/doc/)

### Blockquote
> This is a blockquote
> with multiple lines
### Horizontal Rule
---
### Special Characters
Unicode: α β γ δ ε ζ η θ
Emojis: 🚀 💻 ✅ ❌ 📝
Math: x² + y² = z²"""

        # Convert to Confluence storage format
        storage_format = confluence_preprocessor.markdown_to_confluence_storage(
            markdown_content
        )

        # Process the storage format
        _, processed_markdown = confluence_preprocessor.process_html_content(
            storage_format
        )

        # Verify key elements are preserved
        assert "Main Title" in processed_markdown
        assert "**bold**" in processed_markdown
        assert "*italic*" in processed_markdown
        assert "`inline code`" in processed_markdown

        # Verify code block (may have escaped underscores)
        assert (
            "hello_world" in processed_markdown or "hello\\_world" in processed_markdown
        )
        assert "Hello, World!" in processed_markdown

        # Verify lists
        assert "Item 1" in processed_markdown
        assert "Nested item 1.1" in processed_markdown
        assert "First step" in processed_markdown
        assert "Sub-step 2.1" in processed_markdown

        # Verify table (tables might be converted to HTML)
        assert "Header 1" in processed_markdown
        assert "Cell 1" in processed_markdown

        # Verify links
        assert "Confluence Documentation" in processed_markdown
        assert "https://confluence.atlassian.com/doc/" in processed_markdown

        # Verify special characters
        assert "α β γ δ ε ζ η θ" in processed_markdown
        assert "🚀 💻 ✅ ❌ 📝" in processed_markdown
        assert "x² + y² = z²" in processed_markdown

    def test_confluence_heading_anchor_control(self, confluence_preprocessor):
        """Test control over heading anchor generation."""
        markdown_with_headings = """# Main Title
Content under main title.
## Section One
Content in section one.
### Subsection 1.1
Details here.
## Section Two
More content."""

        # Test with anchors disabled (default)
        storage_no_anchors = confluence_preprocessor.markdown_to_confluence_storage(
            markdown_with_headings
        )
        assert 'id="main-title"' not in storage_no_anchors.lower()
        assert 'id="section-one"' not in storage_no_anchors.lower()

        # Test with anchors enabled
        storage_with_anchors = confluence_preprocessor.markdown_to_confluence_storage(
            markdown_with_headings, enable_heading_anchors=True
        )
        # Verify headings are still present (they may have anchor macros)
        assert "Main Title</h1>" in storage_with_anchors
        assert "Section One</h2>" in storage_with_anchors

    def test_confluence_large_content_performance(self, confluence_preprocessor):
        """Test processing performance with large Confluence content (>50KB)."""
        # Generate large content with various Confluence elements
        large_content_parts = []
        for i in range(50):
            section = f"""<h2>Section {i}</h2>
<p>This is paragraph {i} with <strong>bold</strong> and <em>italic</em> text.</p>
<ac:structured-macro ac:name="info">
<ac:rich-text-body>
<p>Info box {i} with important information.</p>
</ac:rich-text-body>
</ac:structured-macro>
<ul>
<li>List item {i}.1</li>
<li>List item {i}.2 with <code>inline code</code></li>
<li>List item {i}.3</li>
</ul>
<ac:structured-macro ac:name="code">
<ac:parameter ac:name="language">python</ac:parameter>
<ac:plain-text-body><![CDATA[
def function_{i}():
# Large function with many lines
data = []
for j in range(1000):
data.append({{
"id": j,
"value": j * {i},
"description": "Item " + str(j)
}})
result = sum(item["value"] for item in data)
return result
]]></ac:plain-text-body>
</ac:structured-macro>
<table>
<thead>
<tr>
<th>Header A</th>
<th>Header B</th>
<th>Header C</th>
</tr>
</thead>
<tbody>
<tr>
<td>Row {i} Cell 1</td>
<td>Row {i} Cell 2</td>
<td>Row {i} Cell 3</td>
</tr>
</tbody>
</table>
<ac:link>
<ri:user ri:account-id="user{i}"/>
</ac:link> completed this section.
"""
            large_content_parts.append(section)

        large_content = "\n".join(large_content_parts)
        content_size = len(large_content.encode("utf-8"))

        # Ensure content is reasonably large (adjust threshold for test)
        assert content_size > 50000  # 50KB is enough for performance testing

        # Time with perf_counter: it is monotonic, so the measured duration
        # cannot be distorted by system clock adjustments.
        start_time = time.perf_counter()
        _, processed_markdown = confluence_preprocessor.process_html_content(
            large_content
        )
        processing_time = time.perf_counter() - start_time

        # Performance assertion
        assert processing_time < 15.0  # Should complete within 15 seconds

        # Verify content integrity
        assert "Section 0" in processed_markdown
        assert "Section 49" in processed_markdown
        assert (
            "function" in processed_markdown
        )  # Function names might have escaped underscores
        assert "@User user10" in processed_markdown

    def test_confluence_nested_structures(self, confluence_preprocessor):
        """Test handling of deeply nested structures."""
        nested_html = """<div>
<h1>Top Level</h1>
<div>
<h2>Level 2</h2>
<div>
<h3>Level 3</h3>
<ul>
<li>Item 1
<ul>
<li>Nested 1.1
<ul>
<li>Deep nested 1.1.1</li>
<li>Deep nested 1.1.2</li>
</ul>
</li>
<li>Nested 1.2</li>
</ul>
</li>
<li>Item 2</li>
</ul>
<blockquote>
<p>Quote level 1</p>
<blockquote>
<p>Quote level 2</p>
<blockquote>
<p>Quote level 3</p>
</blockquote>
</blockquote>
</blockquote>
<table>
<tr>
<td>
<table>
<tr>
<td>Nested table cell</td>
</tr>
</table>
</td>
</tr>
</table>
</div>
</div>
</div>"""

        _, processed_markdown = confluence_preprocessor.process_html_content(
            nested_html
        )

        # Verify nested structures are preserved
        assert "Top Level" in processed_markdown
        assert "Level 2" in processed_markdown
        assert "Level 3" in processed_markdown
        assert "Deep nested 1.1.1" in processed_markdown
        assert "Quote level 1" in processed_markdown
        assert "Quote level 2" in processed_markdown
        assert "Quote level 3" in processed_markdown
        assert "Nested table cell" in processed_markdown

    def test_confluence_edge_cases(self, confluence_preprocessor):
        """Test edge cases in Confluence content processing."""
        # Empty content
        processed_html, processed_markdown = (
            confluence_preprocessor.process_html_content("")
        )
        assert processed_html == ""
        assert processed_markdown == ""

        # Malformed HTML
        malformed_html = "<p>Unclosed paragraph <strong>bold text</p>"
        _, processed_markdown = confluence_preprocessor.process_html_content(
            malformed_html
        )
        assert "Unclosed paragraph" in processed_markdown
        assert "bold text" in processed_markdown

        # HTML with CDATA sections
        cdata_html = """<div>
<![CDATA[This is raw CDATA content with <tags>]]>
</div>"""
        _, processed_markdown = confluence_preprocessor.process_html_content(
            cdata_html
        )
        assert "This is raw CDATA content" in processed_markdown

        # Very long single line
        long_line_html = f"<p>{'x' * 10000}</p>"
        _, processed_markdown = confluence_preprocessor.process_html_content(
            long_line_html
        )
        assert len(processed_markdown) >= 10000

    def test_confluence_special_html_entities(self, confluence_preprocessor):
        """Test handling of HTML entities and special characters."""
        # The input must contain entity *references* (not the decoded
        # characters) so that the decoding step is what is being exercised.
        html_with_entities = """<p>HTML entities: &lt; &gt; &amp; &quot; &apos;</p>
<p>Named entities: &copy; &reg; &trade; &euro;</p>
<p>Numeric entities: &#65; &#66; &#67; &#8364; &#128512;</p>
<p>Mixed: &lt;tag&gt; &amp;&amp; &quot;quoted&quot;</p>"""

        _, processed_markdown = confluence_preprocessor.process_html_content(
            html_with_entities
        )

        # Verify entities are properly decoded
        assert "<" in processed_markdown
        assert ">" in processed_markdown
        assert "&" in processed_markdown
        assert '"' in processed_markdown
        assert "©" in processed_markdown
        assert "®" in processed_markdown
        assert "€" in processed_markdown
        assert "😀" in processed_markdown  # Emoji from numeric entity
@pytest.mark.integration
class TestContentProcessingInteroperability:
    """Checks that the Jira and Confluence preprocessors agree on shared input."""

    def test_cross_platform_content_sharing(
        self, jira_preprocessor, confluence_preprocessor
    ):
        """Convert one Markdown document for both products and compare key text."""
        shared_markdown = """# Shared Documentation
## Overview
This content might be used in both Jira and Confluence.
### Key Features
- **Feature 1**: Description with *emphasis*
- **Feature 2**: Contains `code examples`
### Code Sample
```python
def shared_function():
return "Works in both platforms"
```
### Links
[Project Documentation](https://example.com/docs)
[PROJ-123](https://example.atlassian.net/browse/PROJ-123)
### Table
| Platform | Support |
|----------|---------|
| Jira | ✅ |
| Confluence | ✅ |"""

        # Same source document, two target formats.
        jira_markup = jira_preprocessor.markdown_to_jira(shared_markdown)
        confluence_storage = confluence_preprocessor.markdown_to_confluence_storage(
            shared_markdown
        )

        # Each fragment has to be present in both conversions.
        for fragment in ("Shared Documentation", "Feature 1", "shared_function"):
            assert fragment in jira_markup
            assert fragment in confluence_storage

    def test_unicode_consistency(self, jira_preprocessor, confluence_preprocessor):
        """Run the same Unicode text through both processors and compare."""
        unicode_content = """Unicode Test 🌍
Symbols: ™ © ® € £ ¥
Math: ∑ ∏ ∫ ∞ ≈ ≠ ≤ ≥
Greek: Α Β Γ Δ Ε Ζ Η Θ
Arrows: → ← ↑ ↓ ↔ ⇒ ⇐ ⇔
Box Drawing: ┌─┬─┐ │ ├─┼─┤ └─┴─┘
Emojis: 😀 😎 🚀 💻 ✅ ❌ ⚡ 🔥"""

        # Jira side: clean the raw text.
        jira_result = jira_preprocessor.clean_jira_text(unicode_content)

        # Confluence side: wrap the text in a paragraph and keep the Markdown half.
        wrapped = f"<p>{unicode_content}</p>"
        _html, confluence_result = confluence_preprocessor.process_html_content(
            wrapped
        )

        # One sample character per category must appear in both outputs.
        sample_chars = ("🌍", "™", "∑", "Α", "→", "┌", "😀", "🚀")
        for symbol in sample_chars:
            assert symbol in jira_result
            assert symbol in confluence_result

    def test_error_recovery(self, jira_preprocessor, confluence_preprocessor):
        """Exercise None input and heavily malformed input on both processors."""
        # None is accepted by the Jira cleaner and yields an empty string.
        cleaned_none = jira_preprocessor.clean_jira_text(None)
        assert cleaned_none == ""

        # The Confluence processor is expected to raise on None.
        with pytest.raises(Exception):
            confluence_preprocessor.process_html_content(None)

        # A string made only of markup delimiters.
        malformed_content = "<<<>>>&&&'''\"\"\"{{{{}}}}[[[[]]]]"

        # Jira output must be non-empty.
        jira_result = jira_preprocessor.clean_jira_text(malformed_content)
        assert len(jira_result) > 0

        # Confluence Markdown output must be non-empty as well.
        _html, confluence_result = confluence_preprocessor.process_html_content(
            malformed_content
        )
        assert len(confluence_result) > 0
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_projects.py:
--------------------------------------------------------------------------------
```python
"""Tests for the Jira ProjectsMixin."""
from typing import Any
from unittest.mock import MagicMock, call, patch
import pytest
from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.config import JiraConfig
from mcp_atlassian.jira.projects import ProjectsMixin
from mcp_atlassian.models.jira.issue import JiraIssue
from mcp_atlassian.models.jira.search import JiraSearchResult
@pytest.fixture
def mock_config():
    """Fixture to create a mock JiraConfig instance.

    Returns:
        A ``MagicMock`` constrained to the ``JiraConfig`` spec with URL,
        username, API token and auth type set to placeholder values.
    """
    config = MagicMock(spec=JiraConfig)
    config.url = "https://test.atlassian.net"
    # Placeholder credentials for the mocked configuration.
    config.username = "test@example.com"
    config.api_token = "test-token"
    config.auth_type = "pat"
    return config
@pytest.fixture
def projects_mixin(jira_fetcher: JiraFetcher) -> ProjectsMixin:
    """Expose the ``jira_fetcher`` fixture under a ProjectsMixin-typed name."""
    # The fetcher object itself is handed back unchanged.
    return jira_fetcher
@pytest.fixture
def mock_projects():
    """Provide two sample project dictionaries (PROJ1 and PROJ2)."""
    project_one = {
        "id": "10000",
        "key": "PROJ1",
        "name": "Project One",
        "lead": {"name": "user1", "displayName": "User One"},
    }
    project_two = {
        "id": "10001",
        "key": "PROJ2",
        "name": "Project Two",
        "lead": {"name": "user2", "displayName": "User Two"},
    }
    return [project_one, project_two]
@pytest.fixture
def mock_components():
    """Provide two sample project component dictionaries."""
    first = {"id": "10000", "name": "Component One"}
    second = {"id": "10001", "name": "Component Two"}
    return [first, second]
@pytest.fixture
def mock_versions():
    """Provide two sample project versions, one released and one not."""
    released = {"id": "10000", "name": "1.0", "released": True}
    unreleased = {"id": "10001", "name": "2.0", "released": False}
    return [released, unreleased]
@pytest.fixture
def mock_roles():
    """Provide a sample mapping of role name to role dictionary."""
    roles = {}
    roles["Administrator"] = {"id": "10000", "name": "Administrator"}
    roles["Developer"] = {"id": "10001", "name": "Developer"}
    return roles
@pytest.fixture
def mock_role_members():
    """Provide a sample role-members payload with two entries under "actors"."""
    actors = [
        {"id": "user1", "name": "user1", "displayName": "User One"},
        {"id": "user2", "name": "user2", "displayName": "User Two"},
    ]
    return {"actors": actors}
@pytest.fixture
def mock_issue_types():
    """Provide two sample issue type dictionaries (Bug and Task)."""
    bug = {"id": "10000", "name": "Bug", "description": "A bug"}
    task = {"id": "10001", "name": "Task", "description": "A task"}
    return [bug, task]
def test_get_all_projects(projects_mixin: ProjectsMixin, mock_projects: list[dict]):
    """get_all_projects returns the client's list and forwards the archive flag."""
    projects_api = projects_mixin.jira.projects
    projects_api.return_value = mock_projects

    # Default call: include_archived is not passed, so False is forwarded.
    default_result = projects_mixin.get_all_projects()
    assert default_result == mock_projects
    projects_api.assert_called_once_with(included_archived=False)

    # Start from a clean call history before the explicit-flag call.
    projects_api.reset_mock()
    projects_api.return_value = mock_projects

    archived_result = projects_mixin.get_all_projects(include_archived=True)
    assert archived_result == mock_projects
    projects_api.assert_called_once_with(included_archived=True)
def test_get_all_projects_exception(projects_mixin: ProjectsMixin):
    """get_all_projects returns an empty list when the client call raises."""
    projects_api = projects_mixin.jira.projects
    projects_api.side_effect = Exception("API error")

    outcome = projects_mixin.get_all_projects()

    assert outcome == []
    projects_api.assert_called_once()
def test_get_all_projects_non_list_response(projects_mixin: ProjectsMixin):
    """get_all_projects returns an empty list when the client returns a non-list."""
    projects_api = projects_mixin.jira.projects
    projects_api.return_value = "not a list"

    outcome = projects_mixin.get_all_projects()

    assert outcome == []
    projects_api.assert_called_once()
def test_get_project(projects_mixin: ProjectsMixin, mock_projects: list[dict]):
    """get_project returns whatever the client returns for the given key."""
    expected = mock_projects[0]
    project_api = projects_mixin.jira.project
    project_api.return_value = expected

    fetched = projects_mixin.get_project("PROJ1")

    assert fetched == expected
    project_api.assert_called_once_with("PROJ1")
def test_get_project_exception(projects_mixin: ProjectsMixin):
    """get_project returns None when the Jira client raises."""
    project_api = projects_mixin.jira.project
    project_api.side_effect = Exception("API error")

    assert projects_mixin.get_project("PROJ1") is None
    project_api.assert_called_once()
def test_get_project_issues(projects_mixin: ProjectsMixin):
    """get_project_issues calls search_issues with a quoted project clause."""
    # The method under test goes through search_issues (not jira.jql), which
    # is expected to hand back a JiraSearchResult.
    empty_page = JiraSearchResult(issues=[], total=0, start_at=0, max_results=50)
    search_stub = MagicMock(return_value=empty_page)
    projects_mixin.search_issues = search_stub

    outcome = projects_mixin.get_project_issues("TEST")

    search_stub.assert_called_once_with('project = "TEST"', start=0, limit=50)
    assert isinstance(outcome, JiraSearchResult)
    assert len(outcome.issues) == 0
def test_get_project_issues_with_start(projects_mixin: ProjectsMixin) -> None:
    """get_project_issues passes start/limit through and returns the search page."""
    project_key, start_index, page_size = "PROJ", 3, 5

    # Canned page whose pagination values mirror the request made below.
    canned_page = JiraSearchResult(
        issues=[JiraIssue(key="PROJ-2", summary="Issue 2", id="10002")],
        total=1,
        start_at=3,
        max_results=5,
    )
    search_stub = MagicMock(return_value=canned_page)
    projects_mixin.search_issues = search_stub

    page = projects_mixin.get_project_issues(
        project_key, start=start_index, limit=page_size
    )

    assert len(page.issues) == 1
    # The pagination metadata is whatever the stubbed search_issues returned.
    assert page.start_at == 3
    assert page.max_results == 5
    assert page.total == 1

    search_stub.assert_called_once_with(
        f'project = "{project_key}"', start=start_index, limit=page_size
    )
def test_project_exists(projects_mixin: ProjectsMixin, mock_projects: list[dict]):
    """project_exists is True for a returned payload and False for None."""
    project_api = projects_mixin.jira.project

    # Known project: the client returns a payload.
    project_api.return_value = mock_projects[0]
    assert projects_mixin.project_exists("PROJ1") is True
    project_api.assert_called_once_with("PROJ1")

    # Unknown project: the client returns None.
    project_api.reset_mock()
    project_api.return_value = None
    assert projects_mixin.project_exists("NONEXISTENT") is False
    project_api.assert_called_once()
def test_project_exists_exception(projects_mixin: ProjectsMixin):
    """project_exists reports False when the Jira client raises."""
    project_api = projects_mixin.jira.project
    project_api.side_effect = Exception("API error")

    assert projects_mixin.project_exists("PROJ1") is False
    project_api.assert_called_once()
def test_get_project_components(
    projects_mixin: ProjectsMixin, mock_components: list[dict]
):
    """get_project_components returns the client's component list unchanged."""
    components_api = projects_mixin.jira.get_project_components
    components_api.return_value = mock_components

    components = projects_mixin.get_project_components("PROJ1")

    assert components == mock_components
    components_api.assert_called_once_with(key="PROJ1")
def test_get_project_components_exception(projects_mixin: ProjectsMixin):
    """get_project_components yields an empty list when the client raises."""
    components_api = projects_mixin.jira.get_project_components
    components_api.side_effect = Exception("API error")

    assert projects_mixin.get_project_components("PROJ1") == []
    components_api.assert_called_once()
def test_get_project_components_non_list_response(projects_mixin: ProjectsMixin):
    """get_project_components yields an empty list for a non-list response."""
    components_api = projects_mixin.jira.get_project_components
    components_api.return_value = "not a list"

    assert projects_mixin.get_project_components("PROJ1") == []
    components_api.assert_called_once()
def test_get_project_versions(projects_mixin: ProjectsMixin, mock_versions: list[dict]):
    """get_project_versions returns simplified dicts built from the API payload."""
    versions_api = projects_mixin.jira.get_project_versions
    versions_api.return_value = mock_versions

    # Each simplified entry keeps id and name; released/archived fall back
    # to False when the raw version lacks them.
    expected = []
    for version in mock_versions:
        expected.append(
            {
                "id": version["id"],
                "name": version["name"],
                "released": version.get("released", False),
                "archived": version.get("archived", False),
            }
        )

    simplified = projects_mixin.get_project_versions("PROJ1")

    assert simplified == expected
    versions_api.assert_called_once_with(key="PROJ1")
def test_get_project_versions_exception(projects_mixin: ProjectsMixin):
    """get_project_versions yields an empty list when the client raises."""
    versions_api = projects_mixin.jira.get_project_versions
    versions_api.side_effect = Exception("API error")

    assert projects_mixin.get_project_versions("PROJ1") == []
    versions_api.assert_called_once_with(key="PROJ1")
def test_get_project_versions_non_list_response(projects_mixin: ProjectsMixin):
    """get_project_versions yields an empty list for a non-list response."""
    versions_api = projects_mixin.jira.get_project_versions
    versions_api.return_value = "not a list"

    assert projects_mixin.get_project_versions("PROJ1") == []
    versions_api.assert_called_once_with(key="PROJ1")
def test_get_project_roles(
    projects_mixin: ProjectsMixin, mock_roles: dict[str, dict[str, str]]
):
    """get_project_roles returns the client's role mapping unchanged."""
    roles_api = projects_mixin.jira.get_project_roles
    roles_api.return_value = mock_roles

    roles = projects_mixin.get_project_roles("PROJ1")

    assert roles == mock_roles
    roles_api.assert_called_once_with(project_key="PROJ1")
def test_get_project_roles_exception(projects_mixin: ProjectsMixin):
    """get_project_roles yields an empty dict when the client raises."""
    roles_api = projects_mixin.jira.get_project_roles
    roles_api.side_effect = Exception("API error")

    assert projects_mixin.get_project_roles("PROJ1") == {}
    roles_api.assert_called_once()
def test_get_project_roles_non_dict_response(projects_mixin: ProjectsMixin):
    """get_project_roles yields an empty dict for a non-dict response."""
    roles_api = projects_mixin.jira.get_project_roles
    roles_api.return_value = "not a dict"

    assert projects_mixin.get_project_roles("PROJ1") == {}
    roles_api.assert_called_once()
def test_get_project_role_members(
    projects_mixin: ProjectsMixin, mock_role_members: dict[str, list[dict[str, str]]]
):
    """get_project_role_members returns the "actors" list from the payload."""
    actors_api = projects_mixin.jira.get_project_actors_for_role_project
    actors_api.return_value = mock_role_members

    members = projects_mixin.get_project_role_members("PROJ1", "10001")

    assert members == mock_role_members["actors"]
    actors_api.assert_called_once_with(project_key="PROJ1", role_id="10001")
def test_get_project_role_members_exception(projects_mixin: ProjectsMixin):
    """get_project_role_members yields an empty list when the client raises."""
    actors_api = projects_mixin.jira.get_project_actors_for_role_project
    actors_api.side_effect = Exception("API error")

    assert projects_mixin.get_project_role_members("PROJ1", "10001") == []
    actors_api.assert_called_once()
def test_get_project_role_members_invalid_response(projects_mixin: ProjectsMixin):
    """get_project_role_members yields an empty list for malformed payloads.

    Two payload shapes are exercised: a dict without an "actors" key and a
    value that is not a dict at all. Each scenario also checks that the
    Jira client was consulted exactly once.
    """
    actors_api = projects_mixin.jira.get_project_actors_for_role_project

    # Response without actors
    actors_api.return_value = {}
    assert projects_mixin.get_project_role_members("PROJ1", "10001") == []
    actors_api.assert_called_once()

    # Non-dict response
    actors_api.reset_mock()
    actors_api.return_value = "not a dict"
    assert projects_mixin.get_project_role_members("PROJ1", "10001") == []
    # The mock was reset above, so this verifies the second scenario's call.
    actors_api.assert_called_once()
def test_get_project_permission_scheme(projects_mixin: ProjectsMixin):
    """get_project_permission_scheme returns the client's scheme unchanged."""
    expected_scheme = {"id": "10000", "name": "Default Permission Scheme"}
    scheme_api = projects_mixin.jira.get_project_permission_scheme
    scheme_api.return_value = expected_scheme

    scheme = projects_mixin.get_project_permission_scheme("PROJ1")

    assert scheme == expected_scheme
    scheme_api.assert_called_once_with(project_id_or_key="PROJ1")
def test_get_project_permission_scheme_exception(projects_mixin: ProjectsMixin):
    """get_project_permission_scheme returns None when the client raises."""
    scheme_api = projects_mixin.jira.get_project_permission_scheme
    scheme_api.side_effect = Exception("API error")

    assert projects_mixin.get_project_permission_scheme("PROJ1") is None
    scheme_api.assert_called_once()
def test_get_project_notification_scheme(projects_mixin: ProjectsMixin):
    """get_project_notification_scheme returns the client's scheme unchanged."""
    expected_scheme = {"id": "10000", "name": "Default Notification Scheme"}
    scheme_api = projects_mixin.jira.get_project_notification_scheme
    scheme_api.return_value = expected_scheme

    scheme = projects_mixin.get_project_notification_scheme("PROJ1")

    assert scheme == expected_scheme
    scheme_api.assert_called_once_with(project_id_or_key="PROJ1")
def test_get_project_notification_scheme_exception(projects_mixin: ProjectsMixin):
    """get_project_notification_scheme returns None when the client raises."""
    scheme_api = projects_mixin.jira.get_project_notification_scheme
    scheme_api.side_effect = Exception("API error")

    assert projects_mixin.get_project_notification_scheme("PROJ1") is None
    scheme_api.assert_called_once()
def test_get_project_issue_types(
    projects_mixin: ProjectsMixin, mock_issue_types: list[dict]
):
    """get_project_issue_types returns the issuetypes from the createmeta payload."""
    project_entry = {
        "key": "PROJ1",
        "name": "Project One",
        "issuetypes": mock_issue_types,
    }
    createmeta_api = projects_mixin.jira.issue_createmeta
    createmeta_api.return_value = {"projects": [project_entry]}

    issue_types = projects_mixin.get_project_issue_types("PROJ1")

    assert issue_types == mock_issue_types
    createmeta_api.assert_called_once_with(project="PROJ1")
def test_get_project_issue_types_empty_response(projects_mixin: ProjectsMixin):
    """get_project_issue_types yields an empty list when no issue types are present.

    Two payload shapes are exercised: an empty "projects" list and a project
    entry without an "issuetypes" field. Each scenario also checks that the
    Jira client was consulted exactly once.
    """
    createmeta_api = projects_mixin.jira.issue_createmeta

    # Empty projects list
    createmeta_api.return_value = {"projects": []}
    assert projects_mixin.get_project_issue_types("PROJ1") == []
    createmeta_api.assert_called_once()

    # No issuetypes field
    createmeta_api.reset_mock()
    createmeta_api.return_value = {
        "projects": [{"key": "PROJ1", "name": "Project One"}]
    }
    assert projects_mixin.get_project_issue_types("PROJ1") == []
    # The mock was reset above, so this verifies the second scenario's call.
    createmeta_api.assert_called_once()
def test_get_project_issue_types_exception(projects_mixin: ProjectsMixin):
    """get_project_issue_types yields an empty list when the client raises."""
    createmeta_api = projects_mixin.jira.issue_createmeta
    createmeta_api.side_effect = Exception("API error")

    assert projects_mixin.get_project_issue_types("PROJ1") == []
    createmeta_api.assert_called_once()
def test_get_project_issues_count(projects_mixin: ProjectsMixin):
    """get_project_issues_count returns the "total" from a one-row JQL query."""
    jql_api = projects_mixin.jira.jql
    jql_api.return_value = {"total": 42}

    count = projects_mixin.get_project_issues_count("PROJ1")

    assert count == 42
    jql_api.assert_called_once_with(jql='project = "PROJ1"', fields="key", limit=1)
def test_get_project_issues_count__project_with_reserved_keyword(
    projects_mixin: ProjectsMixin,
):
    """get_project_issues_count quotes a project key that is a JQL keyword ("AND")."""
    jql_api = projects_mixin.jira.jql
    jql_api.return_value = {"total": 42}

    count = projects_mixin.get_project_issues_count("AND")

    assert count == 42
    jql_api.assert_called_once_with(jql='project = "AND"', fields="key", limit=1)
def test_get_project_issues_count_invalid_response(projects_mixin: ProjectsMixin):
    """get_project_issues_count returns 0 for payloads without a usable total."""
    jql_api = projects_mixin.jira.jql

    # Dict lacking the "total" field.
    jql_api.return_value = {}
    assert projects_mixin.get_project_issues_count("PROJ1") == 0
    jql_api.assert_called_once()

    # Payload that is not a dict at all.
    jql_api.reset_mock()
    jql_api.return_value = "not a dict"
    assert projects_mixin.get_project_issues_count("PROJ1") == 0
    jql_api.assert_called_once()
def test_get_project_issues_count_exception(projects_mixin: ProjectsMixin):
    """get_project_issues_count returns 0 when the client raises."""
    jql_api = projects_mixin.jira.jql
    jql_api.side_effect = Exception("API error")

    assert projects_mixin.get_project_issues_count("PROJ1") == 0
    jql_api.assert_called_once()
def test_get_project_issues_with_search_mixin(projects_mixin: ProjectsMixin):
    """get_project_issues returns search_issues' value and never touches jira.jql."""
    canned = [MagicMock(), MagicMock()]
    search_stub = MagicMock(return_value=canned)
    projects_mixin.search_issues = search_stub

    outcome = projects_mixin.get_project_issues("PROJ1", start=10, limit=20)

    assert outcome == canned
    search_stub.assert_called_once_with('project = "PROJ1"', start=10, limit=20)
    projects_mixin.jira.jql.assert_not_called()
def test_get_project_issues_invalid_response(projects_mixin: ProjectsMixin):
    """get_project_issues yields an empty issue list for empty or failing searches.

    Scenario 1 stubs search_issues with an empty JiraSearchResult; scenario 2
    makes search_issues raise TypeError. Both expect ``result.issues == []``
    and exactly one call to search_issues.
    """
    # Scenario 1: the search succeeds but finds nothing.
    empty_page = JiraSearchResult(issues=[], total=0, start_at=0, max_results=50)
    projects_mixin.search_issues = MagicMock(return_value=empty_page)

    result = projects_mixin.get_project_issues("PROJ1")

    assert result.issues == []
    projects_mixin.search_issues.assert_called_once()

    # Scenario 2: search_issues raises. A brand-new mock is installed, so the
    # previous one does not need to be reset first.
    projects_mixin.search_issues = MagicMock(
        side_effect=TypeError("Not a JiraSearchResult")
    )

    result = projects_mixin.get_project_issues("PROJ1")

    assert result.issues == []
    projects_mixin.search_issues.assert_called_once()
def test_get_project_issues_exception(projects_mixin: ProjectsMixin):
    """get_project_issues yields an empty issue list when search_issues raises."""
    # Simulate an API error surfacing from the underlying search.
    failing_search = MagicMock(side_effect=Exception("API error"))
    projects_mixin.search_issues = failing_search

    outcome = projects_mixin.get_project_issues("PROJ1")

    assert outcome.issues == []
    # The search was attempted even though it failed.
    failing_search.assert_called_once()
def test_get_project_keys(
    projects_mixin: ProjectsMixin, mock_projects: list[dict[str, Any]]
):
    """get_project_keys lists the keys of the projects from get_all_projects."""
    with patch.object(
        projects_mixin, "get_all_projects", return_value=mock_projects
    ) as all_projects_stub:
        keys = projects_mixin.get_project_keys()

        assert keys == ["PROJ1", "PROJ2"]
        all_projects_stub.assert_called_once()
def test_get_project_keys_exception(projects_mixin: ProjectsMixin):
    """get_project_keys yields an empty list when get_all_projects raises."""
    with patch.object(
        projects_mixin, "get_all_projects", side_effect=Exception("Error")
    ) as all_projects_stub:
        assert projects_mixin.get_project_keys() == []
        all_projects_stub.assert_called_once()
def test_get_project_leads(
    projects_mixin: ProjectsMixin, mock_projects: list[dict[str, Any]]
):
    """get_project_leads maps each project key to its lead."""
    with patch.object(
        projects_mixin, "get_all_projects", return_value=mock_projects
    ) as all_projects_stub:
        leads = projects_mixin.get_project_leads()

        assert leads == {"PROJ1": "user1", "PROJ2": "user2"}
        all_projects_stub.assert_called_once()
def test_get_project_leads_with_different_lead_formats(
    projects_mixin: ProjectsMixin, mock_projects: list[dict[str, Any]]
):
    """get_project_leads copes with dict leads, string leads and missing leads."""
    lead_with_name = {"key": "PROJ1", "lead": {"name": "user1", "displayName": "User One"}}
    lead_display_only = {"key": "PROJ2", "lead": {"displayName": "User Two"}}
    lead_as_string = {"key": "PROJ3", "lead": "user3"}
    no_lead = {"key": "PROJ4"}
    mixed_projects = [lead_with_name, lead_display_only, lead_as_string, no_lead]

    with patch.object(
        projects_mixin, "get_all_projects", return_value=mixed_projects
    ) as all_projects_stub:
        leads = projects_mixin.get_project_leads()

        # PROJ4 has no lead, so it is absent from the mapping.
        assert leads == {"PROJ1": "user1", "PROJ2": "User Two", "PROJ3": "user3"}
        all_projects_stub.assert_called_once()
def test_get_project_leads_exception(projects_mixin: ProjectsMixin):
    """get_project_leads yields an empty dict when get_all_projects raises."""
    with patch.object(
        projects_mixin, "get_all_projects", side_effect=Exception("Error")
    ) as all_projects_stub:
        assert projects_mixin.get_project_leads() == {}
        all_projects_stub.assert_called_once()
def test_get_user_accessible_projects(
    projects_mixin: ProjectsMixin, mock_projects: list[dict[str, Any]]
):
    """get_user_accessible_projects keeps only projects the user can browse."""
    browse_api = projects_mixin.jira.get_users_with_browse_permission_to_a_project

    with patch.object(projects_mixin, "get_all_projects", return_value=mock_projects):
        # One permission lookup per project, in order:
        # PROJ1 -> user found, PROJ2 -> nobody found.
        browse_api.side_effect = [
            [{"name": "test_user"}],
            [],
        ]

        accessible = projects_mixin.get_user_accessible_projects("test_user")

        # Only PROJ1 survives the filter.
        assert len(accessible) == 1
        assert accessible[0]["key"] == "PROJ1"

        # Both projects were checked, each with a single-row lookup.
        assert browse_api.call_count == 2
        browse_api.assert_has_calls(
            [
                call(username="test_user", project_key=key, limit=1)
                for key in ("PROJ1", "PROJ2")
            ]
        )
def test_get_user_accessible_projects_with_permissions_exception(
    projects_mixin: ProjectsMixin, mock_projects: list[dict[str, Any]]
):
    """A failing permission lookup for one project does not drop the others."""
    browse_api = projects_mixin.jira.get_users_with_browse_permission_to_a_project

    with patch.object(projects_mixin, "get_all_projects", return_value=mock_projects):
        # PROJ1 lookup succeeds; PROJ2 lookup raises.
        browse_api.side_effect = [
            [{"name": "test_user"}],
            Exception("Permission error"),
        ]

        accessible = projects_mixin.get_user_accessible_projects("test_user")

        # PROJ2 is skipped because its lookup failed; PROJ1 remains.
        assert len(accessible) == 1
        assert accessible[0]["key"] == "PROJ1"
def test_get_user_accessible_projects_exception(projects_mixin: ProjectsMixin):
    """get_user_accessible_projects yields [] when listing projects fails."""
    browse_api = projects_mixin.jira.get_users_with_browse_permission_to_a_project

    with patch.object(
        projects_mixin, "get_all_projects", side_effect=Exception("Error")
    ) as all_projects_stub:
        assert projects_mixin.get_user_accessible_projects("test_user") == []
        all_projects_stub.assert_called_once()
        # No project list means no permission lookups were attempted.
        browse_api.assert_not_called()
def test_create_project_version_minimal(projects_mixin: ProjectsMixin) -> None:
    """create_project_version with only key and name passes None for the rest."""
    created = {"id": "201", "name": "v4.0"}

    with patch.object(
        projects_mixin, "create_version", return_value=created
    ) as create_version_stub:
        outcome = projects_mixin.create_project_version(
            project_key="PROJ2", name="v4.0"
        )

    assert outcome == created
    create_version_stub.assert_called_once_with(
        project="PROJ2",
        name="v4.0",
        start_date=None,
        release_date=None,
        description=None,
    )
def test_create_project_version_all_fields(projects_mixin: ProjectsMixin) -> None:
    """create_project_version forwards every optional field to create_version."""
    created = {
        "id": "202",
        "name": "v5.0",
        "description": "Release 5.0",
        "startDate": "2025-08-01",
        "releaseDate": "2025-08-15",
    }

    with patch.object(
        projects_mixin, "create_version", return_value=created
    ) as create_version_stub:
        outcome = projects_mixin.create_project_version(
            project_key="PROJ3",
            name="v5.0",
            start_date="2025-08-01",
            release_date="2025-08-15",
            description="Release 5.0",
        )

    assert outcome == created
    create_version_stub.assert_called_once_with(
        project="PROJ3",
        name="v5.0",
        start_date="2025-08-01",
        release_date="2025-08-15",
        description="Release 5.0",
    )
def test_create_project_version_error(projects_mixin: ProjectsMixin) -> None:
    """create_project_version lets an error from create_version propagate.

    The expected exception is pinned to the injected message so that an
    unrelated failure (for example a TypeError from a bad call) cannot make
    the test pass by accident.
    """
    with patch.object(
        projects_mixin, "create_version", side_effect=Exception("API failure")
    ):
        with pytest.raises(Exception, match="API failure"):
            projects_mixin.create_project_version("PROJ4", "v6.0")
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_search.py:
--------------------------------------------------------------------------------
```python
"""Tests for the Jira Search mixin."""
from unittest.mock import ANY, MagicMock
import pytest
import requests
from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.search import SearchMixin
from mcp_atlassian.models.jira import JiraIssue, JiraSearchResult
class TestSearchMixin:
"""Tests for the SearchMixin class."""
@pytest.fixture
def search_mixin(self, jira_fetcher: JiraFetcher) -> SearchMixin:
    """Return the shared fetcher prepared for SearchMixin tests."""
    # Stand-in for the text cleaner normally provided by another mixin:
    # truthy text passes through unchanged, falsy values become "".
    jira_fetcher._clean_text = MagicMock(side_effect=lambda text: text or "")

    # Default configuration: Server/DC (is_cloud=False), no project filter.
    settings = MagicMock()
    settings.is_cloud = False
    settings.projects_filter = None
    settings.url = "https://example.atlassian.net"
    jira_fetcher.config = settings

    return jira_fetcher
@pytest.fixture
def mock_issues_response(self) -> dict:
    """Build a one-issue Jira search payload used by the search tests."""
    issue_fields = {
        "summary": "Test issue",
        "issuetype": {"name": "Bug"},
        "status": {"name": "Open"},
        "description": "Test description",
        "created": "2024-01-01T10:00:00.000+0000",
        "updated": "2024-01-01T11:00:00.000+0000",
    }
    issue = {"id": "10001", "key": "TEST-123", "fields": issue_fields}
    return {
        "issues": [issue],
        "total": 1,
        "startAt": 0,
        "maxResults": 50,
    }
@pytest.mark.parametrize(
    "is_cloud, expected_method_name",
    [
        (True, "enhanced_jql_get_list_of_tickets"),  # Cloud scenario
        (False, "jql"),  # Server/DC scenario
    ],
)
def test_search_issues_calls_correct_method(
    self,
    search_mixin: SearchMixin,
    mock_issues_response,
    is_cloud,
    expected_method_name,
):
    """search_issues uses the Cloud or Server/DC client method per config.is_cloud."""
    # Arrange: configure the deployment flavour; no project filter here.
    config = search_mixin.config
    config.is_cloud = is_cloud
    config.projects_filter = None
    config.url = "https://test.example.com"  # Model creation needs this

    # Arrange: both client methods get a canned response so either could run.
    search_mixin.jira.enhanced_jql_get_list_of_tickets = MagicMock(
        return_value=mock_issues_response["issues"]
    )
    search_mixin.jira.jql = MagicMock(return_value=mock_issues_response)

    # The method that must stay untouched is whichever one is not expected.
    if expected_method_name == "enhanced_jql_get_list_of_tickets":
        other_method_name = "jql"
    else:
        other_method_name = "enhanced_jql_get_list_of_tickets"

    # Act
    jql_query = "project = TEST"
    result = search_mixin.search_issues(jql_query, limit=10, start=0)

    # Assert: a populated search result came back.
    assert isinstance(result, JiraSearchResult)
    assert len(result.issues) > 0

    # Assert: the expected method got the query; `start` is only passed on
    # Server/DC.
    expected_kwargs = {"limit": 10, "expand": None}
    if not is_cloud:
        expected_kwargs["start"] = 0
    getattr(search_mixin.jira, expected_method_name).assert_called_once_with(
        jql_query, fields=ANY, **expected_kwargs
    )

    # Assert: the other method was never used.
    getattr(search_mixin.jira, other_method_name).assert_not_called()
def test_search_issues_basic(self, search_mixin: SearchMixin):
    """search_issues turns a one-issue JQL payload into a JiraSearchResult.

    Checks the arguments passed to ``jira.jql`` (default start/limit, no
    expand), the pagination metadata on the result, and the fields of the
    parsed issue.
    """
    # Setup mock response
    mock_issues = {
        "issues": [
            {
                "id": "10001",
                "key": "TEST-123",
                "fields": {
                    "summary": "Test issue",
                    "issuetype": {"name": "Bug"},
                    "status": {"name": "Open"},
                    "description": "Issue description",
                    "created": "2024-01-01T10:00:00.000+0000",
                    "updated": "2024-01-01T11:00:00.000+0000",
                    "priority": {"name": "High"},
                },
            }
        ],
        "total": 1,
        "startAt": 0,
        "maxResults": 50,
    }
    search_mixin.jira.jql.return_value = mock_issues

    # Call the method
    result = search_mixin.search_issues("project = TEST")

    # Verify the client call
    search_mixin.jira.jql.assert_called_once_with(
        "project = TEST",
        fields=ANY,
        start=0,
        limit=50,
        expand=None,
    )

    # Verify the result envelope
    assert isinstance(result, JiraSearchResult)
    assert len(result.issues) == 1
    assert all(isinstance(issue, JiraIssue) for issue in result.issues)
    assert result.total == 1
    assert result.start_at == 0
    assert result.max_results == 50

    # Verify the parsed issue
    issue = result.issues[0]
    assert issue.key == "TEST-123"
    assert issue.summary == "Test issue"
    assert issue.description == "Issue description"
    assert issue.status is not None
    assert issue.status.name == "Open"
    assert issue.issue_type is not None
    assert issue.issue_type.name == "Bug"
    assert issue.priority is not None
    assert issue.priority.name == "High"
def test_search_issues_with_empty_description(self, search_mixin: SearchMixin):
    """search_issues keeps a None description as None on the parsed issue."""
    # Setup mock response
    mock_issues = {
        "issues": [
            {
                "id": "10001",
                "key": "TEST-123",
                "fields": {
                    "summary": "Test issue",
                    "issuetype": {"name": "Bug"},
                    "status": {"name": "Open"},
                    "description": None,
                    "created": "2024-01-01T10:00:00.000+0000",
                    "updated": "2024-01-01T11:00:00.000+0000",
                },
            }
        ],
        "total": 1,
        "startAt": 0,
        "maxResults": 50,
    }
    search_mixin.jira.jql.return_value = mock_issues

    # Call the method
    result = search_mixin.search_issues("project = TEST")

    # Verify results
    assert len(result.issues) == 1
    issue = result.issues[0]
    assert isinstance(issue, JiraIssue)
    assert issue.key == "TEST-123"
    assert issue.description is None
    assert issue.summary == "Test issue"
def test_search_issues_with_missing_fields(self, search_mixin: SearchMixin):
    """search_issues tolerates issues that lack issuetype and status."""
    # Only key and summary are supplied; issuetype, status, etc. are absent.
    sparse_issue = {"key": "TEST-123", "fields": {"summary": "Test issue"}}
    search_mixin.jira.jql.return_value = {
        "issues": [sparse_issue],
        "total": 1,
        "startAt": 0,
        "maxResults": 50,
    }

    found = search_mixin.search_issues("project = TEST")

    assert len(found.issues) == 1
    parsed = found.issues[0]
    assert isinstance(parsed, JiraIssue)
    assert parsed.key == "TEST-123"
    assert parsed.summary == "Test issue"
    assert parsed.status is None
    assert parsed.issue_type is None
def test_search_issues_with_empty_results(self, search_mixin: SearchMixin):
    """search_issues returns an empty result with total == -1 when "total" is absent."""
    # The payload carries an empty issue list and no pagination fields.
    search_mixin.jira.jql.return_value = {"issues": []}

    found = search_mixin.search_issues("project = NONEXISTENT")

    assert isinstance(found, JiraSearchResult)
    assert len(found.issues) == 0
    assert found.total == -1
def test_search_issues_with_error(self, search_mixin: SearchMixin):
    """search_issues raises with an "Error searching issues" message on API failure."""
    jql_api = search_mixin.jira.jql
    jql_api.side_effect = Exception("API Error")

    with pytest.raises(Exception, match="Error searching issues"):
        search_mixin.search_issues("project = TEST")
def test_search_issues_with_projects_filter(self, search_mixin: SearchMixin):
    """search_issues appends a project clause built from projects_filter."""
    issue = {
        "id": "10001",
        "key": "TEST-123",
        "fields": {
            "summary": "Test issue",
            "issuetype": {"name": "Bug"},
            "status": {"name": "Open"},
        },
    }
    jql_api = search_mixin.jira.jql
    jql_api.return_value = {
        "issues": [issue],
        "total": 1,
        "startAt": 0,
        "maxResults": 50,
    }
    search_mixin.config.url = "https://example.atlassian.net"

    # (projects_filter, JQL expected to reach the client): a single key uses
    # "=", several keys use "IN".
    scenarios = [
        ("TEST", "(text ~ 'test') AND project = \"TEST\""),
        ("TEST,DEV", '(text ~ \'test\') AND project IN ("TEST", "DEV")'),
    ]
    for projects_filter, expected_jql in scenarios:
        found = search_mixin.search_issues(
            "text ~ 'test'", projects_filter=projects_filter
        )
        jql_api.assert_called_with(
            expected_jql,
            fields=ANY,
            start=0,
            limit=50,
            expand=None,
        )
        assert len(found.issues) == 1
        assert found.total == 1
def test_search_issues_with_config_projects_filter(self, search_mixin: SearchMixin):
    """search_issues uses config.projects_filter unless an explicit filter is given."""
    issue = {
        "id": "10001",
        "key": "TEST-123",
        "fields": {
            "summary": "Test issue",
            "issuetype": {"name": "Bug"},
            "status": {"name": "Open"},
        },
    }
    jql_api = search_mixin.jira.jql
    jql_api.return_value = {
        "issues": [issue],
        "total": 1,
        "startAt": 0,
        "maxResults": 50,
    }
    search_mixin.config.url = "https://example.atlassian.net"
    search_mixin.config.projects_filter = "TEST,DEV"

    # No explicit filter: the configured one is applied.
    found = search_mixin.search_issues("text ~ 'test'")
    jql_api.assert_called_with(
        '(text ~ \'test\') AND project IN ("TEST", "DEV")',
        fields=ANY,
        start=0,
        limit=50,
        expand=None,
    )
    assert len(found.issues) == 1
    assert found.total == 1

    # Explicit filters replace the configured one (single key, then several).
    overrides = [
        ("OVERRIDE", "(text ~ 'test') AND project = \"OVERRIDE\""),
        ("OVER1,OVER2", '(text ~ \'test\') AND project IN ("OVER1", "OVER2")'),
    ]
    for projects_filter, expected_jql in overrides:
        found = search_mixin.search_issues(
            "text ~ 'test'", projects_filter=projects_filter
        )
        jql_api.assert_called_with(
            expected_jql,
            fields=ANY,
            start=0,
            limit=50,
            expand=None,
        )
        assert len(found.issues) == 1
        assert found.total == 1
def test_search_issues_with_fields_parameter(self, search_mixin: SearchMixin):
    """search_issues forwards an explicit fields list, including custom fields.

    Also checks that the simplified dict of the parsed issue exposes the
    requested fields alongside id and key.
    """
    # Setup mock response with a custom field
    mock_issues = {
        "issues": [
            {
                "id": "10001",
                "key": "TEST-123",
                "fields": {
                    "summary": "Test issue with custom field",
                    "assignee": {
                        "displayName": "Test User",
                        # NOTE(review): this address looks mangled by e-mail
                        # obfuscation in the export; confirm the real value.
                        "emailAddress": "[email protected]",
                        "active": True,
                    },
                    "customfield_10049": "Custom value",
                    "issuetype": {"name": "Bug"},
                    "status": {"name": "Open"},
                    "description": "Issue description",
                    "created": "2024-01-01T10:00:00.000+0000",
                    "updated": "2024-01-01T11:00:00.000+0000",
                    "priority": {"name": "High"},
                },
            }
        ],
        "total": 1,
        "startAt": 0,
        "maxResults": 50,
    }
    search_mixin.jira.jql.return_value = mock_issues
    search_mixin.config.url = "https://example.atlassian.net"

    # Call the method with specific fields
    result = search_mixin.search_issues(
        "project = TEST", fields="summary,assignee,customfield_10049"
    )

    # Verify the JQL call includes the fields parameter
    search_mixin.jira.jql.assert_called_once_with(
        "project = TEST",
        fields="summary,assignee,customfield_10049",
        start=0,
        limit=50,
        expand=None,
    )

    # Verify results
    assert isinstance(result, JiraSearchResult)
    assert len(result.issues) == 1
    issue = result.issues[0]

    # Convert to simplified dict to check field filtering
    simplified = issue.to_simplified_dict()

    # id and key are expected in addition to the requested fields
    assert "id" in simplified
    assert "key" in simplified
    assert "summary" in simplified
    assert "assignee" in simplified
    assert "customfield_10049" in simplified
    assert simplified["customfield_10049"] == {"value": "Custom value"}
    assert simplified["assignee"]["display_name"] == "Test User"
def test_get_board_issues(self, search_mixin: SearchMixin):
    """Test that get_board_issues turns the board API payload into models."""
    mock_issues = {
        "issues": [
            {
                "id": "10001",
                "key": "TEST-123",
                "fields": {
                    "summary": "Test issue",
                    "issuetype": {"name": "Bug"},
                    "status": {"name": "Open"},
                    "description": "Issue description",
                    "created": "2024-01-01T10:00:00.000+0000",
                    "updated": "2024-01-01T11:00:00.000+0000",
                    "priority": {"name": "High"},
                },
            }
        ],
        "total": 1,
        "startAt": 0,
        "maxResults": 50,
    }
    search_mixin.jira.get_issues_for_board.return_value = mock_issues

    # Call the method
    result = search_mixin.get_board_issues("1000", jql="", limit=20)

    # Verify the result envelope and its paging metadata
    assert isinstance(result, JiraSearchResult)
    assert len(result.issues) == 1
    assert all(isinstance(issue, JiraIssue) for issue in result.issues)
    assert result.total == 1
    assert result.start_at == 0
    assert result.max_results == 50

    # Check the first issue
    issue = result.issues[0]
    assert issue.key == "TEST-123"
    assert issue.summary == "Test issue"
    assert issue.description == "Issue description"
    # Guard against None before dereferencing nested models
    assert issue.status is not None
    assert issue.status.name == "Open"
    assert issue.issue_type is not None
    assert issue.issue_type.name == "Bug"
    assert issue.priority is not None
    assert issue.priority.name == "High"
def test_get_board_issues_exception(self, search_mixin: SearchMixin):
    """Test that a generic board API failure surfaces with its message."""
    search_mixin.jira.get_issues_for_board.side_effect = Exception("API Error")

    with pytest.raises(Exception) as exc_info:
        search_mixin.get_board_issues("1000", jql="", limit=20)

    # Checked after the block: anything following the raising call inside
    # the `with` body would never execute.
    assert "API Error" in str(exc_info.value)
def test_get_board_issues_http_error(self, search_mixin: SearchMixin):
    """Test that an HTTP error's response content appears in the raised error."""
    search_mixin.jira.get_issues_for_board.side_effect = requests.HTTPError(
        response=MagicMock(content="API Error content")
    )

    with pytest.raises(Exception) as exc_info:
        search_mixin.get_board_issues("1000", jql="", limit=20)

    # Checked after the block: anything following the raising call inside
    # the `with` body would never execute.
    assert "API Error content" in str(exc_info.value)
def test_get_sprint_issues(self, search_mixin: SearchMixin):
    """Test that get_sprint_issues turns the sprint API payload into models."""
    raw_issue = {
        "id": "10001",
        "key": "TEST-123",
        "fields": {
            "summary": "Test issue",
            "issuetype": {"name": "Bug"},
            "status": {"name": "Open"},
            "description": "Issue description",
            "created": "2024-01-01T10:00:00.000+0000",
            "updated": "2024-01-01T11:00:00.000+0000",
            "priority": {"name": "High"},
        },
    }
    search_mixin.jira.get_sprint_issues.return_value = {
        "issues": [raw_issue],
        "total": 1,
        "startAt": 0,
        "maxResults": 50,
    }

    search_result = search_mixin.get_sprint_issues("10001")

    # Result envelope and paging metadata
    assert isinstance(search_result, JiraSearchResult)
    assert len(search_result.issues) == 1
    for entry in search_result.issues:
        assert isinstance(entry, JiraIssue)
    assert search_result.total == 1
    assert search_result.start_at == 0
    assert search_result.max_results == 50

    # Contents of the single returned issue
    first = search_result.issues[0]
    assert first.key == "TEST-123"
    assert first.summary == "Test issue"
    assert first.description == "Issue description"
    assert first.status is not None
    assert first.status.name == "Open"
    assert first.issue_type is not None
    assert first.issue_type.name == "Bug"
    assert first.priority is not None
    assert first.priority.name == "High"
def test_get_sprint_issues_exception(self, search_mixin: SearchMixin):
    """Test that a generic sprint API failure surfaces with its message."""
    search_mixin.jira.get_sprint_issues.side_effect = Exception("API Error")

    with pytest.raises(Exception) as exc_info:
        search_mixin.get_sprint_issues("10001")

    # Checked after the block: anything following the raising call inside
    # the `with` body would never execute.
    assert "API Error" in str(exc_info.value)
def test_get_sprint_issues_http_error(self, search_mixin: SearchMixin):
    """Test that an HTTP error's response content appears in the raised error."""
    search_mixin.jira.get_sprint_issues.side_effect = requests.HTTPError(
        response=MagicMock(content="API Error content")
    )

    with pytest.raises(Exception) as exc_info:
        search_mixin.get_sprint_issues("10001")

    # Checked after the block: anything following the raising call inside
    # the `with` body would never execute.
    assert "API Error content" in str(exc_info.value)
@pytest.mark.parametrize("is_cloud", [True, False])
def test_search_issues_with_projects_filter_jql_construction(
    self, search_mixin: SearchMixin, mock_issues_response, is_cloud
):
    """Test the JQL sent to the API when a projects_filter argument is passed."""
    search_mixin.config.is_cloud = is_cloud
    # No config-level filter here; only the call argument is under test
    search_mixin.config.projects_filter = None
    search_mixin.config.url = "https://test.example.com"

    # Stub both search entry points and pick the one matching is_cloud
    enhanced_mock = MagicMock(return_value=mock_issues_response["issues"])
    jql_mock = MagicMock(return_value=mock_issues_response)
    search_mixin.jira.enhanced_jql_get_list_of_tickets = enhanced_mock
    search_mixin.jira.jql = jql_mock
    api_method_mock = enhanced_mock if is_cloud else jql_mock

    # Only the Server/DC call is expected to carry a `start` keyword
    expected_kwargs = {"fields": ANY, "limit": ANY, "expand": ANY}
    if not is_cloud:
        expected_kwargs["start"] = ANY

    # (input JQL, projects_filter, JQL expected at the API)
    scenarios = [
        # Single project filter
        ("text ~ 'test'", "TEST", "(text ~ 'test') AND project = \"TEST\""),
        # Multiple projects filter
        (
            "text ~ 'test'",
            "TEST, DEV",
            '(text ~ \'test\') AND project IN ("TEST", "DEV")',
        ),
        # JQL that already names a project is passed through untouched
        ("project = OTHER", "TEST", "project = OTHER"),
    ]
    for jql, projects_filter, expected_jql in scenarios:
        api_method_mock.reset_mock()
        search_mixin.search_issues(jql, projects_filter=projects_filter)
        api_method_mock.assert_called_with(expected_jql, **expected_kwargs)
@pytest.mark.parametrize("is_cloud", [True, False])
def test_search_issues_with_config_projects_filter_jql_construction(
    self, search_mixin: SearchMixin, mock_issues_response, is_cloud
):
    """Test the JQL sent to the API when config.projects_filter is set."""
    search_mixin.config.is_cloud = is_cloud
    search_mixin.config.projects_filter = "CONF1,CONF2"
    search_mixin.config.url = "https://test.example.com"

    # Stub both search entry points and pick the one matching is_cloud
    enhanced_mock = MagicMock(return_value=mock_issues_response["issues"])
    jql_mock = MagicMock(return_value=mock_issues_response)
    search_mixin.jira.enhanced_jql_get_list_of_tickets = enhanced_mock
    search_mixin.jira.jql = jql_mock
    api_method_mock = enhanced_mock if is_cloud else jql_mock

    # Only the Server/DC call is expected to carry a `start` keyword
    expected_kwargs = {"fields": ANY, "limit": ANY, "expand": ANY}
    if not is_cloud:
        expected_kwargs["start"] = ANY

    # Without an explicit argument the config filter is applied
    search_mixin.search_issues("text ~ 'test'")
    api_method_mock.assert_called_with(
        '(text ~ \'test\') AND project IN ("CONF1", "CONF2")', **expected_kwargs
    )

    api_method_mock.reset_mock()

    # An explicit argument takes the place of the config filter
    search_mixin.search_issues("text ~ 'test'", projects_filter="OVERRIDE")
    api_method_mock.assert_called_with(
        "(text ~ 'test') AND project = \"OVERRIDE\"", **expected_kwargs
    )
@pytest.mark.parametrize("is_cloud", [True, False])
def test_search_issues_with_empty_jql_and_projects_filter(
    self, search_mixin: SearchMixin, mock_issues_response, is_cloud
):
    """Test that an empty or None JQL yields only the project clause, without AND."""
    search_mixin.config.is_cloud = is_cloud
    search_mixin.config.projects_filter = None
    search_mixin.config.url = "https://test.example.com"

    # Stub both search entry points and pick the one matching is_cloud
    enhanced_mock = MagicMock(return_value=mock_issues_response["issues"])
    jql_mock = MagicMock(return_value=mock_issues_response)
    search_mixin.jira.enhanced_jql_get_list_of_tickets = enhanced_mock
    search_mixin.jira.jql = jql_mock
    api_method_mock = enhanced_mock if is_cloud else jql_mock

    # Only the Server/DC call is expected to carry a `start` keyword
    expected_kwargs = {"fields": ANY, "limit": ANY, "expand": ANY}
    if not is_cloud:
        expected_kwargs["start"] = ANY

    # Empty-string JQL: single project first, then multiple projects
    empty_jql_cases = (
        ("PROJ1", 'project = "PROJ1"'),
        ("PROJ1,PROJ2", 'project IN ("PROJ1", "PROJ2")'),
    )
    for projects_filter, expected_jql in empty_jql_cases:
        search_mixin.search_issues("", projects_filter=projects_filter)
        api_method_mock.assert_called_with(expected_jql, **expected_kwargs)
        api_method_mock.reset_mock()

    # None JQL with a projects filter
    result = search_mixin.search_issues(None, projects_filter="PROJ1")
    api_method_mock.assert_called_with('project = "PROJ1"', **expected_kwargs)
    assert isinstance(result, JiraSearchResult)
@pytest.mark.parametrize("is_cloud", [True, False])
def test_search_issues_with_order_by_and_projects_filter(
    self, search_mixin: SearchMixin, mock_issues_response, is_cloud
):
    """Test that a JQL consisting only of ORDER BY gets the project clause prepended."""
    search_mixin.config.is_cloud = is_cloud
    search_mixin.config.projects_filter = None
    search_mixin.config.url = "https://test.example.com"

    # Stub both search entry points and pick the one matching is_cloud
    enhanced_mock = MagicMock(return_value=mock_issues_response["issues"])
    jql_mock = MagicMock(return_value=mock_issues_response)
    search_mixin.jira.enhanced_jql_get_list_of_tickets = enhanced_mock
    search_mixin.jira.jql = jql_mock
    api_method_mock = enhanced_mock if is_cloud else jql_mock

    # Only the Server/DC call is expected to carry a `start` keyword
    expected_kwargs = {"fields": ANY, "limit": ANY, "expand": ANY}
    if not is_cloud:
        expected_kwargs["start"] = ANY

    # (input JQL, projects_filter, JQL expected at the API)
    scenarios = [
        # ORDER BY with single project
        (
            "ORDER BY created DESC",
            "PROJ1",
            'project = "PROJ1" ORDER BY created DESC',
        ),
        # ORDER BY with multiple projects
        (
            "ORDER BY created DESC",
            "PROJ1,PROJ2",
            'project IN ("PROJ1", "PROJ2") ORDER BY created DESC',
        ),
        # Case insensitive ORDER BY
        (
            "order by updated ASC",
            "PROJ1",
            'project = "PROJ1" order by updated ASC',
        ),
        # ORDER BY with extra spaces
        (
            " ORDER BY priority DESC ",
            "PROJ1",
            'project = "PROJ1" ORDER BY priority DESC ',
        ),
    ]
    for jql, projects_filter, expected_jql in scenarios:
        api_method_mock.reset_mock()
        search_mixin.search_issues(jql, projects_filter=projects_filter)
        api_method_mock.assert_called_with(expected_jql, **expected_kwargs)
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_attachments.py:
--------------------------------------------------------------------------------
```python
"""Tests for the Jira attachments module."""
from unittest.mock import MagicMock, mock_open, patch
import pytest
from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.attachments import AttachmentsMixin
# Test scenarios for AttachmentsMixin
#
# 1. Single Attachment Download (download_attachment method):
# - Success case: Downloads attachment correctly with proper HTTP response
# - Path handling: Converts relative path to absolute path
# - Error cases:
# - No URL provided
# - HTTP error during download
# - File write error
# - File not created after write operation
#
# 2. Issue Attachments Download (download_issue_attachments method):
# - Success case: Downloads all attachments for an issue
# - Path handling: Converts relative target directory to absolute path
# - Edge cases:
# - Issue has no attachments
# - Issue not found
# - Issue has no fields
# - Some attachments fail to download
# - Attachment has missing URL
#
# 3. Single Attachment Upload (upload_attachment method):
# - Success case: Uploads file correctly
# - Path handling: Converts relative file path to absolute path
# - Error cases:
# - No issue key provided
# - No file path provided
# - File not found
# - API error during upload
# - No response from API
#
# 4. Multiple Attachments Upload (upload_attachments method):
# - Success case: Uploads multiple files correctly
# - Partial success: Some files upload successfully, others fail
# - Error cases:
# - Empty list of file paths
# - No issue key provided
class TestAttachmentsMixin:
"""Tests for the AttachmentsMixin class."""
@pytest.fixture
def attachments_mixin(self, jira_fetcher: JiraFetcher) -> AttachmentsMixin:
    """Return the shared fetcher with its Jira client and HTTP session mocked."""
    # Swap the client for a mock, then give it an explicit mock session
    jira_fetcher.jira = MagicMock()
    jira_fetcher.jira._session = MagicMock()
    return jira_fetcher
def test_download_attachment_success(self, attachments_mixin: AttachmentsMixin):
    """Test that a successful download writes the response chunks to the file."""
    # Response that yields a single content chunk and no HTTP error
    http_response = MagicMock()
    http_response.iter_content.return_value = [b"test content"]
    http_response.raise_for_status = MagicMock()
    attachments_mixin.jira._session.get.return_value = http_response

    with (
        patch("builtins.open", mock_open()) as mock_file,
        patch("os.path.exists", return_value=True),
        # 12 == length of "test content"
        patch("os.path.getsize", return_value=12),
        patch("os.makedirs") as mock_makedirs,
    ):
        outcome = attachments_mixin.download_attachment(
            "https://test.url/attachment", "/tmp/test_file.txt"
        )

    assert outcome is True
    attachments_mixin.jira._session.get.assert_called_once_with(
        "https://test.url/attachment", stream=True
    )
    mock_file.assert_called_once_with("/tmp/test_file.txt", "wb")
    mock_file().write.assert_called_once_with(b"test content")
    mock_makedirs.assert_called_once()
def test_download_attachment_relative_path(
    self, attachments_mixin: AttachmentsMixin
):
    """Test that a relative target path is resolved to an absolute one."""
    # Mock the response
    mock_response = MagicMock()
    mock_response.iter_content.return_value = [b"test content"]
    mock_response.raise_for_status = MagicMock()
    attachments_mixin.jira._session.get.return_value = mock_response

    # Mock file operations and os.path.abspath. os.makedirs is patched
    # without binding a name because nothing is asserted on it.
    with (
        patch("builtins.open", mock_open()) as mock_file,
        patch("os.path.exists") as mock_exists,
        patch("os.path.getsize") as mock_getsize,
        patch("os.makedirs"),
        patch("os.path.abspath") as mock_abspath,
        patch("os.path.isabs") as mock_isabs,
    ):
        mock_exists.return_value = True
        mock_getsize.return_value = 12
        mock_isabs.return_value = False
        mock_abspath.return_value = "/absolute/path/test_file.txt"

        # Call the method with a relative path
        result = attachments_mixin.download_attachment(
            "https://test.url/attachment", "test_file.txt"
        )

    # Assertions
    assert result is True
    mock_isabs.assert_called_once_with("test_file.txt")
    mock_abspath.assert_called_once_with("test_file.txt")
    mock_file.assert_called_once_with("/absolute/path/test_file.txt", "wb")
def test_download_attachment_no_url(self, attachments_mixin: AttachmentsMixin):
    """Test that an empty URL makes download_attachment return False."""
    empty_url = ""
    outcome = attachments_mixin.download_attachment(empty_url, "/tmp/test_file.txt")
    assert outcome is False
def test_download_attachment_http_error(self, attachments_mixin: AttachmentsMixin):
    """Test that a failing raise_for_status makes the download return False."""
    # Response whose status check raises
    failing_response = MagicMock()
    failing_response.raise_for_status.side_effect = Exception("HTTP Error")
    attachments_mixin.jira._session.get.return_value = failing_response

    outcome = attachments_mixin.download_attachment(
        "https://test.url/attachment", "/tmp/test_file.txt"
    )

    assert outcome is False
def test_download_attachment_file_write_error(
    self, attachments_mixin: AttachmentsMixin
):
    """Test that an OSError while writing makes the download return False."""
    # Mock the response
    mock_response = MagicMock()
    mock_response.iter_content.return_value = [b"test content"]
    mock_response.raise_for_status = MagicMock()
    attachments_mixin.jira._session.get.return_value = mock_response

    # os.makedirs is patched without binding a name because nothing is
    # asserted on it.
    with (
        patch("builtins.open", mock_open()) as mock_file,
        patch("os.makedirs"),
    ):
        # Make the file handle's write() fail
        mock_file().write.side_effect = OSError("Write error")

        result = attachments_mixin.download_attachment(
            "https://test.url/attachment", "/tmp/test_file.txt"
        )

    assert result is False
def test_download_attachment_file_not_created(
    self, attachments_mixin: AttachmentsMixin
):
    """Test that the download returns False when the file is absent afterwards."""
    # Mock the response
    mock_response = MagicMock()
    mock_response.iter_content.return_value = [b"test content"]
    mock_response.raise_for_status = MagicMock()
    attachments_mixin.jira._session.get.return_value = mock_response

    # open() and os.makedirs are patched without binding names because
    # only the os.path.exists answer matters to this test.
    with (
        patch("builtins.open", mock_open()),
        patch("os.path.exists") as mock_exists,
        patch("os.makedirs"),
    ):
        mock_exists.return_value = False  # File doesn't exist after write

        result = attachments_mixin.download_attachment(
            "https://test.url/attachment", "/tmp/test_file.txt"
        )

    assert result is False
def test_download_issue_attachments_success(
    self, attachments_mixin: AttachmentsMixin
):
    """Test that every attachment on an issue is downloaded and reported."""
    # (filename, url, size) for each attachment on the issue
    attachment_specs = [
        ("test1.txt", "https://test.url/attachment1", 100),
        ("test2.txt", "https://test.url/attachment2", 200),
    ]

    # Raw issue payload returned by the Jira client
    attachments_mixin.jira.issue.return_value = {
        "fields": {
            "attachment": [
                {"filename": filename, "content": url, "size": size}
                for filename, url, size in attachment_specs
            ]
        }
    }

    # Model objects that JiraAttachment.from_api_response will hand back, in order
    attachment_models = [
        MagicMock(filename=filename, url=url, size=size)
        for filename, url, size in attachment_specs
    ]

    with (
        patch.object(
            attachments_mixin, "download_attachment", return_value=True
        ) as mock_download,
        patch("pathlib.Path.mkdir") as mock_mkdir,
        patch(
            "mcp_atlassian.models.jira.JiraAttachment.from_api_response",
            side_effect=attachment_models,
        ),
    ):
        report = attachments_mixin.download_issue_attachments(
            "TEST-123", "/tmp/attachments"
        )

    assert report["success"] is True
    assert len(report["downloaded"]) == 2
    assert len(report["failed"]) == 0
    assert report["total"] == 2
    assert report["issue_key"] == "TEST-123"
    assert mock_download.call_count == 2
    mock_mkdir.assert_called_once()
def test_download_issue_attachments_relative_path(
    self, attachments_mixin: AttachmentsMixin
):
    """Test that a relative target directory is resolved to an absolute one."""
    # Mock the issue data
    mock_issue = {
        "fields": {
            "attachment": [
                {
                    "filename": "test1.txt",
                    "content": "https://test.url/attachment1",
                    "size": 100,
                }
            ]
        }
    }
    attachments_mixin.jira.issue.return_value = mock_issue

    # Mock attachment
    mock_attachment = MagicMock()
    mock_attachment.filename = "test1.txt"
    mock_attachment.url = "https://test.url/attachment1"
    mock_attachment.size = 100

    # Only the isabs/abspath mocks are inspected, so the other patches are
    # applied without binding names.
    with (
        patch.object(attachments_mixin, "download_attachment", return_value=True),
        patch("pathlib.Path.mkdir"),
        patch(
            "mcp_atlassian.models.jira.JiraAttachment.from_api_response",
            return_value=mock_attachment,
        ),
        patch("os.path.isabs") as mock_isabs,
        patch("os.path.abspath") as mock_abspath,
    ):
        mock_isabs.return_value = False
        mock_abspath.return_value = "/absolute/path/attachments"

        result = attachments_mixin.download_issue_attachments(
            "TEST-123", "attachments"
        )

    # Assertions
    assert result["success"] is True
    mock_isabs.assert_called_once_with("attachments")
    mock_abspath.assert_called_once_with("attachments")
def test_download_issue_attachments_no_attachments(
    self, attachments_mixin: AttachmentsMixin
):
    """Test the result reported for an issue whose attachment list is empty."""
    # Issue payload with an empty attachment list
    attachments_mixin.jira.issue.return_value = {"fields": {"attachment": []}}

    with patch("pathlib.Path.mkdir") as mock_mkdir:
        report = attachments_mixin.download_issue_attachments(
            "TEST-123", "/tmp/attachments"
        )

    assert report["success"] is True
    assert "No attachments found" in report["message"]
    assert len(report["downloaded"]) == 0
    assert len(report["failed"]) == 0
    mock_mkdir.assert_called_once()
def test_download_issue_attachments_issue_not_found(
    self, attachments_mixin: AttachmentsMixin
):
    """Test that a None issue payload raises TypeError with the expected message."""
    import re

    attachments_mixin.jira.issue.return_value = None

    # `match` is applied as a regular expression, so escape the literal
    # message to keep "." and other metacharacters from matching loosely.
    expected_message = (
        "Unexpected return value type from `jira.issue`: <class 'NoneType'>"
    )
    with pytest.raises(TypeError, match=re.escape(expected_message)):
        attachments_mixin.download_issue_attachments("TEST-123", "/tmp/attachments")
def test_download_issue_attachments_no_fields(
    self, attachments_mixin: AttachmentsMixin
):
    """Test the failure reported when the issue payload lacks a 'fields' key."""
    # Empty payload: the 'fields' key is missing entirely
    attachments_mixin.jira.issue.return_value = {}

    report = attachments_mixin.download_issue_attachments(
        "TEST-123", "/tmp/attachments"
    )

    assert report["success"] is False
    assert "Could not retrieve issue" in report["error"]
def test_download_issue_attachments_some_failures(
    self, attachments_mixin: AttachmentsMixin
):
    """Test the result when one attachment downloads and another fails."""
    # Mock the issue data
    mock_issue = {
        "fields": {
            "attachment": [
                {
                    "filename": "test1.txt",
                    "content": "https://test.url/attachment1",
                    "size": 100,
                },
                {
                    "filename": "test2.txt",
                    "content": "https://test.url/attachment2",
                    "size": 200,
                },
            ]
        }
    }
    attachments_mixin.jira.issue.return_value = mock_issue

    # Mock attachments
    mock_attachment1 = MagicMock()
    mock_attachment1.filename = "test1.txt"
    mock_attachment1.url = "https://test.url/attachment1"
    mock_attachment1.size = 100

    mock_attachment2 = MagicMock()
    mock_attachment2.filename = "test2.txt"
    mock_attachment2.url = "https://test.url/attachment2"
    mock_attachment2.size = 200

    # download_attachment succeeds for the first attachment and fails for
    # the second. Path.mkdir is patched without binding a name because
    # nothing is asserted on it.
    with (
        patch.object(
            attachments_mixin, "download_attachment", side_effect=[True, False]
        ) as mock_download,
        patch("pathlib.Path.mkdir"),
        patch(
            "mcp_atlassian.models.jira.JiraAttachment.from_api_response",
            side_effect=[mock_attachment1, mock_attachment2],
        ),
    ):
        result = attachments_mixin.download_issue_attachments(
            "TEST-123", "/tmp/attachments"
        )

    # Overall success stays True even though one download failed
    assert result["success"] is True
    assert len(result["downloaded"]) == 1
    assert len(result["failed"]) == 1
    assert result["downloaded"][0]["filename"] == "test1.txt"
    assert result["failed"][0]["filename"] == "test2.txt"
    assert mock_download.call_count == 2
def test_download_issue_attachments_missing_url(
    self, attachments_mixin: AttachmentsMixin
):
    """Test that an attachment without a URL is reported as failed."""
    # Mock the issue data
    mock_issue = {
        "fields": {
            "attachment": [
                {
                    "filename": "test1.txt",
                    "content": "https://test.url/attachment1",
                    "size": 100,
                }
            ]
        }
    }
    attachments_mixin.jira.issue.return_value = mock_issue

    # Mock attachment with no URL
    mock_attachment = MagicMock()
    mock_attachment.filename = "test1.txt"
    mock_attachment.url = None  # No URL
    mock_attachment.size = 100

    # Path.mkdir is patched without binding a name because nothing is
    # asserted on it.
    with (
        patch("pathlib.Path.mkdir"),
        patch(
            "mcp_atlassian.models.jira.JiraAttachment.from_api_response",
            return_value=mock_attachment,
        ),
    ):
        result = attachments_mixin.download_issue_attachments(
            "TEST-123", "/tmp/attachments"
        )

    # Assertions
    assert result["success"] is True
    assert len(result["downloaded"]) == 0
    assert len(result["failed"]) == 1
    assert result["failed"][0]["filename"] == "test1.txt"
    assert "No URL available" in result["failed"][0]["error"]
# Tests for upload_attachment method
def test_upload_attachment_success(self, attachments_mixin: AttachmentsMixin):
    """Test the result and API call for a successful single-file upload."""
    absolute_path = "/absolute/path/test_file.txt"

    # Response returned by the Jira client's add_attachment
    attachments_mixin.jira.add_attachment.return_value = {
        "id": "12345",
        "filename": "test_file.txt",
        "size": 100,
    }

    # Patch the os.path helpers and open() with canned answers
    with (
        patch("os.path.exists", return_value=True),
        patch("os.path.getsize", return_value=100),
        patch("os.path.isabs", return_value=True),
        patch("os.path.abspath", return_value=absolute_path),
        patch("os.path.basename", return_value="test_file.txt"),
        patch("builtins.open", mock_open(read_data=b"test content")),
    ):
        outcome = attachments_mixin.upload_attachment("TEST-123", absolute_path)

    assert outcome["success"] is True
    assert outcome["issue_key"] == "TEST-123"
    assert outcome["filename"] == "test_file.txt"
    assert outcome["size"] == 100
    assert outcome["id"] == "12345"
    attachments_mixin.jira.add_attachment.assert_called_once_with(
        issue_key="TEST-123", filename=absolute_path
    )
def test_upload_attachment_relative_path(self, attachments_mixin: AttachmentsMixin):
    """Test that a relative file path is resolved before being uploaded."""
    relative_path = "test_file.txt"
    resolved_path = "/absolute/path/test_file.txt"

    # Response returned by the Jira client's add_attachment
    attachments_mixin.jira.add_attachment.return_value = {
        "id": "12345",
        "filename": "test_file.txt",
        "size": 100,
    }

    # Patch the os.path helpers and open(); isabs reports a relative path
    with (
        patch("os.path.exists", return_value=True),
        patch("os.path.getsize", return_value=100),
        patch("os.path.isabs", return_value=False) as mock_isabs,
        patch("os.path.abspath", return_value=resolved_path) as mock_abspath,
        patch("os.path.basename", return_value="test_file.txt"),
        patch("builtins.open", mock_open(read_data=b"test content")),
    ):
        outcome = attachments_mixin.upload_attachment("TEST-123", relative_path)

    assert outcome["success"] is True
    mock_isabs.assert_called_once_with(relative_path)
    mock_abspath.assert_called_once_with(relative_path)
    attachments_mixin.jira.add_attachment.assert_called_once_with(
        issue_key="TEST-123", filename=resolved_path
    )
def test_upload_attachment_no_issue_key(self, attachments_mixin: AttachmentsMixin):
    """Test that an empty issue key is rejected without calling the API."""
    outcome = attachments_mixin.upload_attachment("", "/path/to/file.txt")

    assert outcome["success"] is False
    assert "No issue key provided" in outcome["error"]
    attachments_mixin.jira.add_attachment.assert_not_called()
def test_upload_attachment_no_file_path(self, attachments_mixin: AttachmentsMixin):
    """Test that an empty file path is rejected without calling the API."""
    outcome = attachments_mixin.upload_attachment("TEST-123", "")

    assert outcome["success"] is False
    assert "No file path provided" in outcome["error"]
    attachments_mixin.jira.add_attachment.assert_not_called()
def test_upload_attachment_file_not_found(
    self, attachments_mixin: AttachmentsMixin
):
    """Test that a path reported as non-existent is rejected without an API call."""
    absolute_path = "/absolute/path/test_file.txt"

    # os.path.exists reports the file as missing
    with (
        patch("os.path.exists", return_value=False),
        patch("os.path.isabs", return_value=True),
        patch("os.path.abspath", return_value=absolute_path),
        patch("builtins.open", mock_open(read_data=b"test content")),
    ):
        outcome = attachments_mixin.upload_attachment("TEST-123", absolute_path)

    assert outcome["success"] is False
    assert "File not found" in outcome["error"]
    attachments_mixin.jira.add_attachment.assert_not_called()
def test_upload_attachment_api_error(self, attachments_mixin: AttachmentsMixin):
    """Test that an exception from add_attachment is reported as a failure."""
    absolute_path = "/absolute/path/test_file.txt"

    # The Jira client raises when asked to add the attachment
    attachments_mixin.jira.add_attachment.side_effect = Exception("API Error")

    # Patch the os.path helpers and open() with canned answers
    with (
        patch("os.path.exists", return_value=True),
        patch("os.path.isabs", return_value=True),
        patch("os.path.abspath", return_value=absolute_path),
        patch("os.path.basename", return_value="test_file.txt"),
        patch("builtins.open", mock_open(read_data=b"test content")),
    ):
        outcome = attachments_mixin.upload_attachment("TEST-123", absolute_path)

    assert outcome["success"] is False
    assert "API Error" in outcome["error"]
def test_upload_attachment_no_response(self, attachments_mixin: AttachmentsMixin):
    """Test that a None response from add_attachment is reported as a failure."""
    absolute_path = "/absolute/path/test_file.txt"

    # The Jira client returns nothing for the upload
    attachments_mixin.jira.add_attachment.return_value = None

    # Patch the os.path helpers and open() with canned answers
    with (
        patch("os.path.exists", return_value=True),
        patch("os.path.isabs", return_value=True),
        patch("os.path.abspath", return_value=absolute_path),
        patch("os.path.basename", return_value="test_file.txt"),
        patch("builtins.open", mock_open(read_data=b"test content")),
    ):
        outcome = attachments_mixin.upload_attachment("TEST-123", absolute_path)

    assert outcome["success"] is False
    assert "Failed to upload attachment" in outcome["error"]
# Tests for upload_attachments method
def test_upload_attachments_success(self, attachments_mixin: AttachmentsMixin):
    """Test that uploading several files aggregates every per-file success."""
    extensions = ["txt", "pdf", "jpg"]
    file_paths = [
        "/path/to/file1.txt",
        "/path/to/file2.pdf",
        "/path/to/file3.jpg",
    ]

    # One successful per-file result for each path, numbered from 1
    per_file_results = []
    for number, ext in enumerate(extensions, start=1):
        per_file_results.append(
            {
                "success": True,
                "issue_key": "TEST-123",
                "filename": f"file{number}.{ext}",
                "size": 100 * number,
                "id": f"id{number}",
            }
        )

    with patch.object(
        attachments_mixin, "upload_attachment", side_effect=per_file_results
    ) as mock_upload:
        summary = attachments_mixin.upload_attachments("TEST-123", file_paths)

    # Aggregate fields
    assert summary["success"] is True
    assert summary["issue_key"] == "TEST-123"
    assert summary["total"] == 3
    assert len(summary["uploaded"]) == 3
    assert len(summary["failed"]) == 0

    # upload_attachment was invoked once per path
    assert mock_upload.call_count == 3
    for path in file_paths:
        mock_upload.assert_any_call("TEST-123", path)

    # Per-file details come back in input order
    uploaded = summary["uploaded"]
    assert uploaded[0]["filename"] == "file1.txt"
    assert uploaded[1]["filename"] == "file2.pdf"
    assert uploaded[2]["filename"] == "file3.jpg"
    assert uploaded[0]["size"] == 100
    assert uploaded[1]["size"] == 200
    assert uploaded[2]["size"] == 300
    assert uploaded[0]["id"] == "id1"
    assert uploaded[1]["id"] == "id2"
    assert uploaded[2]["id"] == "id3"
def test_upload_attachments_mixed_results(
    self, attachments_mixin: AttachmentsMixin
):
    """Test that successes and failures are split into separate result lists."""
    file_paths = [
        "/path/to/file1.txt",  # Will succeed
        "/path/to/file2.pdf",  # Will fail
        "/path/to/file3.jpg",  # Will succeed
    ]

    # Per-file results handed back by the patched upload_attachment, in order
    first_ok = {
        "success": True,
        "issue_key": "TEST-123",
        "filename": "file1.txt",
        "size": 100,
        "id": "id1",
    }
    second_failed = {"success": False, "error": "File not found: /path/to/file2.pdf"}
    third_ok = {
        "success": True,
        "issue_key": "TEST-123",
        "filename": "file3.jpg",
        "size": 300,
        "id": "id3",
    }

    with patch.object(
        attachments_mixin,
        "upload_attachment",
        side_effect=[first_ok, second_failed, third_ok],
    ) as mock_upload:
        summary = attachments_mixin.upload_attachments("TEST-123", file_paths)

    # Overall success is True even with partial failures
    assert summary["success"] is True
    assert summary["issue_key"] == "TEST-123"
    assert summary["total"] == 3
    assert len(summary["uploaded"]) == 2
    assert len(summary["failed"]) == 1

    # upload_attachment was invoked once per path
    assert mock_upload.call_count == 3

    # Successful uploads keep their input order
    uploaded = summary["uploaded"]
    assert uploaded[0]["filename"] == "file1.txt"
    assert uploaded[1]["filename"] == "file3.jpg"
    assert uploaded[0]["size"] == 100
    assert uploaded[1]["size"] == 300
    assert uploaded[0]["id"] == "id1"
    assert uploaded[1]["id"] == "id3"

    # The failed entry names the file and carries the error text
    failure = summary["failed"][0]
    assert failure["filename"] == "file2.pdf"
    assert "File not found" in failure["error"]
def test_upload_attachments_empty_list(self, attachments_mixin: AttachmentsMixin):
    """An empty list of paths yields a failure result with an explanation."""
    no_paths = []
    outcome = attachments_mixin.upload_attachments("TEST-123", no_paths)

    assert outcome["success"] is False
    assert "No file paths provided" in outcome["error"]
def test_upload_attachments_no_issue_key(self, attachments_mixin: AttachmentsMixin):
    """A blank issue key yields a failure result with an explanation."""
    blank_key = ""
    outcome = attachments_mixin.upload_attachments(blank_key, ["/path/to/file.txt"])

    assert outcome["success"] is False
    assert "No issue key provided" in outcome["error"]
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_epics.py:
--------------------------------------------------------------------------------
```python
"""Tests for the Jira Epics mixin."""
from unittest.mock import MagicMock, call
import pytest
from mcp_atlassian.jira import JiraFetcher
from mcp_atlassian.jira.epics import EpicsMixin
from mcp_atlassian.models.jira import JiraIssue
class TestEpicsMixin:
"""Tests for the EpicsMixin class."""
@pytest.fixture
def epics_mixin(self, jira_fetcher: JiraFetcher) -> EpicsMixin:
    """Return the shared fetcher with get_issue and search_issues stubbed out."""
    stub_issue = JiraIssue(
        id="12345",
        key="TEST-123",
        summary="Test Issue",
        description="Issue content",
    )
    stub_children = [
        JiraIssue(key="TEST-456", summary="Issue 1"),
        JiraIssue(key="TEST-789", summary="Issue 2"),
    ]

    # get_issue supplies the model returned by linking operations.
    jira_fetcher.get_issue = MagicMock(return_value=stub_issue)
    # search_issues backs get_epic_issues unless a test overrides it.
    jira_fetcher.search_issues = MagicMock(return_value=stub_children)
    return jira_fetcher
def test_try_discover_fields_from_existing_epic(self, epics_mixin: EpicsMixin):
    """An existing epic returned by JQL lets the missing epic_name id be found."""
    # epic_name is deliberately absent so discovery has something to do.
    known_ids = {"epic_link": "customfield_10014"}

    existing_epic = {
        "key": "EPIC-123",
        "fields": {
            "issuetype": {"name": "Epic"},
            "summary": "Test Epic",
            # The field expected to be recognised as epic_name.
            "customfield_10011": "Epic Name Value",
        },
    }
    epics_mixin.jira.jql.return_value = {"issues": [existing_epic]}

    epics_mixin._try_discover_fields_from_existing_epic(known_ids)

    assert "epic_name" in known_ids
    assert known_ids["epic_name"] == "customfield_10011"
def test_try_discover_fields_from_existing_epic_no_epics(
    self, epics_mixin: EpicsMixin
):
    """With no epics in the search response the mapping stays empty."""
    known_ids = {}
    epics_mixin.jira.jql.return_value = {"issues": []}

    epics_mixin._try_discover_fields_from_existing_epic(known_ids)

    # Nothing to learn from, so nothing may be added.
    assert not known_ids
def test_try_discover_fields_from_existing_epic_with_both_fields(
    self, epics_mixin: EpicsMixin
):
    """When both ids are already known, no JQL search is issued."""
    complete_ids = {
        "epic_link": "customfield_10014",
        "epic_name": "customfield_10011",
    }

    epics_mixin._try_discover_fields_from_existing_epic(complete_ids)

    epics_mixin.jira.jql.assert_not_called()
def test_prepare_epic_fields_basic(self, epics_mixin: EpicsMixin):
    """Without user input, the summary is the epic name and the colour is green."""
    epics_mixin.get_field_ids_to_epic = MagicMock(
        return_value={
            "epic_name": "customfield_10011",
            "epic_color": "customfield_10010",
        }
    )
    issue_fields = {}
    extra = {}

    epics_mixin.prepare_epic_fields(issue_fields, "Test Epic", extra)

    # Epic values are parked in kwargs under "__epic_*" keys (two-step
    # creation) rather than being written into the fields dict.
    expected_deferred = {
        "__epic_name_value": "Test Epic",
        "__epic_name_field": "customfield_10011",
        "__epic_color_value": "green",
        "__epic_color_field": "customfield_10010",
    }
    for key, value in expected_deferred.items():
        assert extra[key] == value

    assert issue_fields == {}
def test_prepare_epic_fields_with_user_values(self, epics_mixin: EpicsMixin):
    """User-supplied epic name and colour override the defaults."""
    epics_mixin.get_field_ids_to_epic = MagicMock(
        return_value={
            "epic_name": "customfield_10011",
            "epic_color": "customfield_10010",
        }
    )
    issue_fields = {}
    extra = {"epic_name": "Custom Epic Name", "epic_color": "blue"}

    epics_mixin.prepare_epic_fields(issue_fields, "Test Epic", extra)

    # The supplied values end up under the "__epic_*" keys.
    expected_deferred = {
        "__epic_name_value": "Custom Epic Name",
        "__epic_name_field": "customfield_10011",
        "__epic_color_value": "blue",
        "__epic_color_field": "customfield_10010",
    }
    for key, value in expected_deferred.items():
        assert extra[key] == value

    # The plain user keys are consumed.
    for consumed_key in ("epic_name", "epic_color"):
        assert consumed_key not in extra

    assert issue_fields == {}
def test_prepare_epic_fields_missing_epic_name(self, epics_mixin: EpicsMixin):
    """If no epic_name field id is known, only the colour is prepared."""
    epics_mixin.get_field_ids_to_epic = MagicMock(
        return_value={"epic_color": "customfield_10010"}
    )
    issue_fields = {}
    extra = {}

    epics_mixin.prepare_epic_fields(issue_fields, "Test Epic", extra)

    # No name entries may be present.
    for absent_key in ("__epic_name_value", "__epic_name_field"):
        assert absent_key not in extra

    # The colour is still stored with its default value.
    assert extra["__epic_color_value"] == "green"
    assert extra["__epic_color_field"] == "customfield_10010"

    assert issue_fields == {}
def test_prepare_epic_fields_with_error(self, epics_mixin: EpicsMixin):
    """A failing field-id lookup does not propagate and leaves fields untouched."""
    failing_lookup = MagicMock(side_effect=Exception("Field error"))
    epics_mixin.get_field_ids_to_epic = failing_lookup

    issue_fields = {}
    epics_mixin.prepare_epic_fields(issue_fields, "Test Epic", {})

    # Nothing was written despite the error.
    assert issue_fields == {}
    # The failing lookup was attempted exactly once.
    epics_mixin.get_field_ids_to_epic.assert_called_once()
def test_prepare_epic_fields_with_non_standard_ids(self, epics_mixin: EpicsMixin):
    """Unusual custom field ids are carried through for defaults and user values."""
    unusual_ids = {
        "epic_name": "customfield_54321",
        "epic_color": "customfield_98765",
    }
    epics_mixin.get_field_ids_to_epic = MagicMock(return_value=unusual_ids)

    # Round 1: defaults only.
    issue_fields = {}
    extra = {}
    epics_mixin.prepare_epic_fields(issue_fields, "Test Epic", extra)

    assert extra["__epic_name_value"] == "Test Epic"
    assert extra["__epic_name_field"] == "customfield_54321"
    assert extra["__epic_color_value"] == "green"
    assert extra["__epic_color_field"] == "customfield_98765"
    assert issue_fields == {}

    # Round 2: explicit user values, starting from fresh containers.
    issue_fields = {}
    extra = {"epic_name": "Custom Name", "epic_color": "blue"}
    epics_mixin.prepare_epic_fields(issue_fields, "Test Epic", extra)

    assert extra["__epic_name_value"] == "Custom Name"
    assert extra["__epic_name_field"] == "customfield_54321"
    assert extra["__epic_color_value"] == "blue"
    assert extra["__epic_color_field"] == "customfield_98765"

    # The plain user keys are consumed.
    assert "epic_name" not in extra
    assert "epic_color" not in extra
    assert issue_fields == {}
def test_prepare_epic_fields_with_required_epic_name(self, epics_mixin: EpicsMixin):
    """A required Epic Name goes straight into fields; the colour is deferred."""
    epics_mixin.get_field_ids_to_epic = MagicMock(
        return_value={
            "epic_name": "customfield_10011",
            "epic_color": "customfield_10012",
        }
    )
    # Report Epic Name as a required field for the project.
    required_metadata = {
        "customfield_10011": {
            "fieldId": "customfield_10011",
            "required": True,
            "name": "Epic Name",
        }
    }
    epics_mixin.get_required_fields = MagicMock(return_value=required_metadata)

    issue_fields = {}
    extra = {"epic_name": "My Epic Name", "epic_color": "blue"}
    epics_mixin.prepare_epic_fields(issue_fields, "Test Epic", extra, "TEST")

    # Required field: present for the initial create call.
    assert issue_fields["customfield_10011"] == "My Epic Name"

    # Optional field: kept for the post-creation update.
    assert extra["__epic_color_field"] == "customfield_10012"
    assert extra["__epic_color_value"] == "blue"

    epics_mixin.get_required_fields.assert_called_once_with("Epic", "TEST")
def test_prepare_epic_fields_with_optional_epic_name(self, epics_mixin: EpicsMixin):
    """With no required fields, both name and colour are deferred."""
    epics_mixin.get_field_ids_to_epic = MagicMock(
        return_value={
            "epic_name": "customfield_10011",
            "epic_color": "customfield_10012",
        }
    )
    # An empty mapping means nothing is required.
    epics_mixin.get_required_fields = MagicMock(return_value={})

    issue_fields = {}
    extra = {"epic_name": "My Epic Name", "epic_color": "green"}
    epics_mixin.prepare_epic_fields(issue_fields, "Test Epic", extra, "TEST")

    # The name is not part of the initial create payload.
    assert "customfield_10011" not in issue_fields

    expected_deferred = {
        "__epic_name_field": "customfield_10011",
        "__epic_name_value": "My Epic Name",
        "__epic_color_field": "customfield_10012",
        "__epic_color_value": "green",
    }
    for key, value in expected_deferred.items():
        assert extra[key] == value
def test_prepare_epic_fields_mixed_required_optional(self, epics_mixin: EpicsMixin):
    """Required epic fields go into fields while optional ones are deferred."""
    epics_mixin.get_field_ids_to_epic = MagicMock(
        return_value={
            "epic_name": "customfield_10011",
            "epic_color": "customfield_10012",
            "epic_start_date": "customfield_10013",
        }
    )
    # Name and start date are required; colour is not.
    required_ids = ("customfield_10011", "customfield_10013")
    epics_mixin.get_required_fields = MagicMock(
        return_value={
            field_id: {"fieldId": field_id, "required": True}
            for field_id in required_ids
        }
    )

    issue_fields = {}
    extra = {
        "epic_name": "My Epic Name",
        "epic_color": "purple",
        "epic_start_date": "2024-01-01",
    }
    epics_mixin.prepare_epic_fields(issue_fields, "Test Epic", extra, "TEST")

    # Required values are present for the initial create call.
    assert issue_fields["customfield_10011"] == "My Epic Name"
    assert issue_fields["customfield_10013"] == "2024-01-01"

    # The optional colour waits for the post-creation update.
    assert "customfield_10012" not in issue_fields
    assert extra["__epic_color_field"] == "customfield_10012"
    assert extra["__epic_color_value"] == "purple"
def test_prepare_epic_fields_no_project_key(self, epics_mixin: EpicsMixin):
    """Without a project key, required fields are not queried and all is deferred."""
    epics_mixin.get_field_ids_to_epic = MagicMock(
        return_value={
            "epic_name": "customfield_10011",
            "epic_color": "customfield_10012",
        }
    )
    # Plain mock, used only to prove it is never called.
    epics_mixin.get_required_fields = MagicMock()

    issue_fields = {}
    extra = {"epic_name": "My Epic Name", "epic_color": "red"}
    epics_mixin.prepare_epic_fields(issue_fields, "Test Epic", extra, None)

    # Fallback behaviour: nothing goes into the create payload.
    for field_id in ("customfield_10011", "customfield_10012"):
        assert field_id not in issue_fields

    expected_deferred = {
        "__epic_name_field": "customfield_10011",
        "__epic_name_value": "My Epic Name",
        "__epic_color_field": "customfield_10012",
        "__epic_color_value": "red",
    }
    for key, value in expected_deferred.items():
        assert extra[key] == value

    epics_mixin.get_required_fields.assert_not_called()
def test_prepare_epic_fields_get_required_fields_error(
    self, epics_mixin: EpicsMixin
):
    """If get_required_fields raises, every epic field is deferred."""
    epics_mixin.get_field_ids_to_epic = MagicMock(
        return_value={
            "epic_name": "customfield_10011",
            "epic_color": "customfield_10012",
        }
    )
    epics_mixin.get_required_fields = MagicMock(side_effect=Exception("API error"))

    issue_fields = {}
    extra = {"epic_name": "My Epic Name", "epic_color": "yellow"}
    epics_mixin.prepare_epic_fields(issue_fields, "Test Epic", extra, "TEST")

    # Fallback behaviour: nothing goes into the create payload.
    for field_id in ("customfield_10011", "customfield_10012"):
        assert field_id not in issue_fields

    expected_deferred = {
        "__epic_name_field": "customfield_10011",
        "__epic_name_value": "My Epic Name",
        "__epic_color_field": "customfield_10012",
        "__epic_color_value": "yellow",
    }
    for key, value in expected_deferred.items():
        assert extra[key] == value
def test_prepare_epic_fields_no_get_required_fields_method(
    self, epics_mixin: EpicsMixin
):
    """Test Epic field preparation when get_required_fields method doesn't exist.

    The missing method is simulated by temporarily replacing the built-in
    ``hasattr`` so that it reports ``get_required_fields`` as absent.
    """
    import builtins

    # Mock get_field_ids_to_epic to return field IDs
    epics_mixin.get_field_ids_to_epic = MagicMock(
        return_value={
            "epic_name": "customfield_10011",
            "epic_color": "customfield_10012",
        }
    )

    original_hasattr = builtins.hasattr

    def mock_hasattr(obj, attr):
        # Hide only get_required_fields; every other lookup is delegated.
        if attr == "get_required_fields":
            return False
        return original_hasattr(obj, attr)

    fields = {}
    kwargs = {"epic_name": "My Epic Name", "epic_color": "orange"}

    # The patch is process-wide, so restore it in ``finally``: otherwise an
    # exception from the call under test would leave ``hasattr`` replaced
    # for every test that runs afterwards.
    builtins.hasattr = mock_hasattr
    try:
        # Call prepare_epic_fields with project_key
        epics_mixin.prepare_epic_fields(fields, "Test Epic", kwargs, "TEST")
    finally:
        builtins.hasattr = original_hasattr

    # Assert it falls back to storing all fields for post-creation update
    assert "customfield_10011" not in fields
    assert "customfield_10012" not in fields
    assert kwargs["__epic_name_field"] == "customfield_10011"
    assert kwargs["__epic_name_value"] == "My Epic Name"
    assert kwargs["__epic_color_field"] == "customfield_10012"
    assert kwargs["__epic_color_value"] == "orange"
def test_dynamic_epic_field_discovery(self, epics_mixin: EpicsMixin):
    """Test prepare_epic_fields when the field IDs come from the lookup helpers.

    ``_get_epic_name_field_id`` and ``_get_epic_color_field_id`` are stubbed
    to return the IDs that pattern matching is expected to produce, so this
    test checks how prepare_epic_fields stores those IDs rather than the
    pattern matching itself.
    """
    # Mock get_field_ids_to_epic with no standard epic-related keys
    epics_mixin.get_field_ids_to_epic = MagicMock(
        return_value={
            "random_field": "customfield_12345",
            "some_other_field": "customfield_67890",
            "Epic-FieldName": "customfield_11111",  # Stand-in for the epic name field
            "epic_colour_field": "customfield_22222",  # Stand-in for the epic color field
        }
    )

    fields = {}
    kwargs = {}

    original_get_name = epics_mixin._get_epic_name_field_id
    original_get_color = epics_mixin._get_epic_color_field_id
    epics_mixin._get_epic_name_field_id = MagicMock(
        return_value="customfield_11111"
    )
    epics_mixin._get_epic_color_field_id = MagicMock(
        return_value="customfield_22222"
    )

    # Restore the real lookups in ``finally`` so a failing assertion cannot
    # leave the stubs installed on an object other tests may share.
    try:
        epics_mixin.prepare_epic_fields(fields, "Test Epic Name", kwargs)

        # Verify the fields were stored in kwargs
        assert kwargs["__epic_name_value"] == "Test Epic Name"
        assert kwargs["__epic_name_field"] == "customfield_11111"
        assert kwargs["__epic_color_value"] == "green"
        assert kwargs["__epic_color_field"] == "customfield_22222"

        # Verify fields dict remains empty
        assert fields == {}
    finally:
        epics_mixin._get_epic_name_field_id = original_get_name
        epics_mixin._get_epic_color_field_id = original_get_color
def test_link_issue_to_epic_success(self, epics_mixin: EpicsMixin):
    """Linking falls back to the epic_link field after the parent update fails."""
    issue_payload = {"key": "TEST-123"}
    epic_payload = {
        "key": "EPIC-456",
        "fields": {"issuetype": {"name": "Epic"}},
    }
    # The raw client returns the issue first, then the epic.
    epics_mixin.jira.get_issue.side_effect = [issue_payload, epic_payload]

    # The high-level get_issue supplies the model returned to the caller.
    epics_mixin.get_issue = MagicMock(
        return_value=JiraIssue(key="TEST-123", id="123456")
    )
    epics_mixin.get_field_ids_to_epic = MagicMock(
        return_value={"epic_link": "customfield_10014"}
    )
    # First update (parent) raises, second update (epic_link) succeeds.
    epics_mixin.jira.update_issue.side_effect = [
        Exception("Parent field error"),
        None,
    ]

    linked = epics_mixin.link_issue_to_epic("TEST-123", "EPIC-456")

    update_spy = epics_mixin.jira.update_issue
    assert update_spy.call_count == 2

    parent_attempt = call(
        issue_key="TEST-123", update={"fields": {"parent": {"key": "EPIC-456"}}}
    )
    epic_link_attempt = call(
        issue_key="TEST-123", update={"fields": {"customfield_10014": "EPIC-456"}}
    )
    assert update_spy.call_args_list[0] == parent_attempt
    assert update_spy.call_args_list[1] == epic_link_attempt

    # The refreshed issue is fetched once and returned.
    epics_mixin.get_issue.assert_called_once_with("TEST-123")
    assert isinstance(linked, JiraIssue)
    assert linked.key == "TEST-123"
def test_link_issue_to_epic_parent_field_success(self, epics_mixin: EpicsMixin):
    """When the parent-field update works, it is the only update issued."""
    issue_payload = {"key": "TEST-123"}
    epic_payload = {
        "key": "EPIC-456",
        "fields": {"issuetype": {"name": "Epic"}},
    }
    epics_mixin.jira.get_issue.side_effect = [issue_payload, epic_payload]

    epics_mixin.get_issue = MagicMock(
        return_value=JiraIssue(key="TEST-123", id="123456")
    )
    # No epic_link id is known, leaving the parent field as the route.
    epics_mixin.get_field_ids_to_epic = MagicMock(return_value={})
    epics_mixin.jira.update_issue.return_value = None

    linked = epics_mixin.link_issue_to_epic("TEST-123", "EPIC-456")

    expected_update = {"fields": {"parent": {"key": "EPIC-456"}}}
    epics_mixin.jira.update_issue.assert_called_once_with(
        issue_key="TEST-123", update=expected_update
    )

    assert isinstance(linked, JiraIssue)
    assert linked.key == "TEST-123"
def test_link_issue_to_epic_not_epic(self, epics_mixin: EpicsMixin):
    """Linking to a target whose issue type is not Epic raises ValueError."""
    issue_payload = {"key": "TEST-123"}
    # The would-be epic is really a Task.
    task_payload = {
        "key": "TEST-456",
        "fields": {"issuetype": {"name": "Task"}},
    }
    epics_mixin.jira.get_issue.side_effect = [issue_payload, task_payload]

    expected_message = "Error linking issue to epic: TEST-456 is not an Epic"
    with pytest.raises(ValueError, match=expected_message):
        epics_mixin.link_issue_to_epic("TEST-123", "TEST-456")
def test_link_issue_to_epic_all_methods_fail(self, epics_mixin: EpicsMixin):
    """If every linking strategy raises, a ValueError is raised."""
    issue_payload = {"key": "TEST-123"}
    epic_payload = {
        "key": "EPIC-456",
        "fields": {"issuetype": {"name": "Epic"}},
    }
    epics_mixin.jira.get_issue.side_effect = [issue_payload, epic_payload]

    # No epic link field ids are available.
    epics_mixin.get_field_ids_to_epic = MagicMock(return_value={})

    # Field updates and issue-link creation both fail.
    epics_mixin.jira.update_issue.side_effect = Exception("Update failed")
    epics_mixin.jira.create_issue_link.side_effect = Exception("Link failed")

    expected_message = "Could not link issue TEST-123 to epic EPIC-456."
    with pytest.raises(ValueError, match=expected_message):
        epics_mixin.link_issue_to_epic("TEST-123", "EPIC-456")
def test_link_issue_to_epic_api_error(self, epics_mixin: EpicsMixin):
    """An error while fetching the epic surfaces with a linking-error prefix."""
    # The issue lookup succeeds; the epic lookup raises.
    epics_mixin.jira.get_issue.side_effect = [
        {"key": "TEST-123"},
        Exception("API error"),
    ]

    expected_message = "Error linking issue to epic: API error"
    with pytest.raises(Exception, match=expected_message):
        epics_mixin.link_issue_to_epic("TEST-123", "EPIC-456")
def test_get_epic_issues_success(self, epics_mixin):
    """get_epic_issues returns the issues found and forwards start/limit."""
    epics_mixin.jira.get_issue.return_value = {
        "key": "EPIC-123",
        "fields": {"issuetype": {"name": "Epic"}},
    }
    epics_mixin.get_field_ids_to_epic = MagicMock(
        return_value={"epic_link": "customfield_10014"}
    )

    class FakeSearchResult:
        """Minimal stand-in exposing only an ``issues`` attribute."""

        def __init__(self, issues):
            self.issues = issues

    children = [
        JiraIssue(key="TEST-456", summary="Issue 1"),
        JiraIssue(key="TEST-789", summary="Issue 2"),
    ]
    epics_mixin.search_issues = MagicMock(return_value=FakeSearchResult(children))

    found = epics_mixin.get_epic_issues("EPIC-123", start=5, limit=10)

    # Exactly one search, using the issueFunction form of the JQL.
    epics_mixin.search_issues.assert_called_once()
    recorded = epics_mixin.search_issues.call_args
    assert 'issueFunction in issuesScopedToEpic("EPIC-123")' in recorded.args[0]

    # Paging arguments are passed through as keywords.
    assert recorded.kwargs.get("start") == 5
    assert recorded.kwargs.get("limit") == 10

    assert len(found) == 2
    assert found[0].key == "TEST-456"
    assert found[1].key == "TEST-789"
def test_get_epic_issues_not_epic(self, epics_mixin):
    """Asking for the children of a non-epic issue raises ValueError."""
    task_payload = {
        "key": "TEST-123",
        "fields": {"issuetype": {"name": "Task"}},
    }
    epics_mixin.jira.get_issue.return_value = task_payload

    expected_message = "Issue TEST-123 is not an Epic, it is a Task"
    with pytest.raises(ValueError, match=expected_message):
        epics_mixin.get_epic_issues("TEST-123")
def test_get_epic_issues_no_results(self, epics_mixin):
    """An epic without matching issues produces an empty list."""
    epics_mixin.jira.get_issue.return_value = {
        "key": "EPIC-123",
        "fields": {"issuetype": {"name": "Epic"}},
    }
    epics_mixin.get_field_ids_to_epic = MagicMock(
        return_value={"epic_link": "customfield_10014"}
    )
    # Every search comes back empty.
    epics_mixin.search_issues = MagicMock(return_value=[])

    found = epics_mixin.get_epic_issues("EPIC-123")

    assert isinstance(found, list)
    assert not found
def test_get_epic_issues_fallback_jql(self, epics_mixin):
    """When the issueFunction query is empty, the epic-link query supplies results."""
    epics_mixin.jira.get_issue.return_value = {
        "key": "EPIC-123",
        "fields": {"issuetype": {"name": "Epic"}},
    }
    epics_mixin.get_field_ids_to_epic = MagicMock(
        return_value={"epic_link": "customfield_10014", "parent": "parent"}
    )

    class FakeSearchResult:
        """Search-result stand-in whose truthiness and length follow its issues."""

        def __init__(self, issues: list[JiraIssue]):
            self.issues = issues

        def __bool__(self):
            return bool(self.issues)

        def __len__(self):
            return len(self.issues)

    def fake_search(jql, **kwargs):
        # The issueFunction query yields nothing.
        if "issueFunction" in jql:
            return FakeSearchResult([])
        # The custom-field query yields two children.
        if "customfield_10014" in jql:
            children = [
                JiraIssue(key="CHILD-1", summary="Child 1"),
                JiraIssue(key="CHILD-2", summary="Child 2"),
            ]
            return FakeSearchResult(children)
        # Any other query is unexpected in this scenario.
        msg = f"Unexpected JQL query as {jql}"
        raise KeyError(msg)

    epics_mixin.search_issues = MagicMock(side_effect=fake_search)

    found = epics_mixin.get_epic_issues("EPIC-123", start=3, limit=10)

    # The results come from the second (fallback) query.
    assert len(found) == 2
    assert found[0].key == "CHILD-1"
    assert found[1].key == "CHILD-2"

    # At least the failed first query and the fallback were issued.
    assert epics_mixin.search_issues.call_count >= 2

    # The most recent call carried the paging arguments.
    final_kwargs = epics_mixin.search_issues.call_args.kwargs
    assert final_kwargs.get("start") == 3
    assert final_kwargs.get("limit") == 10
def test_get_epic_issues_api_error(self, epics_mixin: EpicsMixin):
    """A failure fetching the epic surfaces with an epic-issues error prefix."""
    epics_mixin.jira.get_issue.side_effect = Exception("API error")

    expected_message = "Error getting epic issues: API error"
    with pytest.raises(Exception, match=expected_message):
        epics_mixin.get_epic_issues("EPIC-123")
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/models/jira/issue.py:
--------------------------------------------------------------------------------
```python
"""
Jira issue models.
This module provides Pydantic models for Jira issues.
"""
import logging
import re
from typing import Any, Literal
from pydantic import Field
from ..base import ApiModel, TimestampMixin
from ..constants import (
EMPTY_STRING,
JIRA_DEFAULT_ID,
JIRA_DEFAULT_KEY,
)
from .comment import JiraComment
from .common import (
JiraAttachment,
JiraChangelog,
JiraIssueType,
JiraPriority,
JiraResolution,
JiraStatus,
JiraTimetracking,
JiraUser,
)
from .link import JiraIssueLink
from .project import JiraProject
logger = logging.getLogger(__name__)
# Extended epic field name patterns to support more variations.
# NOTE(review): presumably these lists are what gets passed to
# JiraIssue._find_custom_field_in_api_response (the call site is not visible
# here; confirm). That helper lowercases each pattern, strips "_", "-" and
# whitespace, and then does plain substring matching against the normalized
# field name. Entries containing regex syntax (such as "\s*" or "[._-]?") are
# therefore compared literally there, and only the metacharacter-free entries
# act as ordinary name fragments.
EPIC_NAME_PATTERNS = [
    r"epic\s*name",
    r"epic[._-]?name",
    r"epicname",
]
# Same conventions as EPIC_NAME_PATTERNS, for the field that links an issue
# to its epic; includes "Parent Link" spellings.
EPIC_LINK_PATTERNS = [
    r"epic\s*link",
    r"epic[._-]?link",
    r"Parent Link",
    r"parent\s*link",
    r"epiclink",
]
class JiraIssue(ApiModel, TimestampMixin):
"""
Model representing a Jira issue.
This is a comprehensive model containing all the common fields
for Jira issues and related metadata.
"""
id: str = JIRA_DEFAULT_ID
key: str = JIRA_DEFAULT_KEY
summary: str = EMPTY_STRING
description: str | None = None
created: str = EMPTY_STRING
updated: str = EMPTY_STRING
status: JiraStatus | None = None
issue_type: JiraIssueType | None = None
priority: JiraPriority | None = None
assignee: JiraUser | None = None
reporter: JiraUser | None = None
labels: list[str] = Field(default_factory=list)
components: list[str] = Field(default_factory=list)
comments: list[JiraComment] = Field(default_factory=list)
attachments: list[JiraAttachment] = Field(default_factory=list)
timetracking: JiraTimetracking | None = None
url: str | None = None
epic_key: str | None = None
epic_name: str | None = None
fix_versions: list[str] = Field(default_factory=list)
custom_fields: dict[str, Any] = Field(default_factory=dict)
requested_fields: Literal["*all"] | list[str] | None = None
project: JiraProject | None = None
resolution: JiraResolution | None = None
duedate: str | None = None
resolutiondate: str | None = None
parent: dict | None = None
subtasks: list[dict] = Field(default_factory=list)
security: dict | None = None
worklog: dict | None = None
changelogs: list[JiraChangelog] = Field(default_factory=list)
issuelinks: list[JiraIssueLink] = Field(default_factory=list)
def __getattribute__(self, name: str) -> Any:
    """
    Resolve attributes normally, falling back to ``custom_fields`` entries.

    Lets a custom field be read as ``issue.<field name>`` when no regular
    attribute of that name exists.

    Args:
        name: The attribute name to access

    Returns:
        The attribute value or custom field value

    Raises:
        AttributeError: If neither an attribute nor a custom field matches
    """
    try:
        return super().__getattribute__(name)
    except AttributeError:
        # Sentinel for "no custom field", so a stored None is still returned.
        absent = object()
        try:
            extras = super().__getattribute__("custom_fields")
            value = extras[name] if name in extras else absent
        except AttributeError:
            value = absent
        if value is not absent:
            return value
        # Nothing matched: propagate the original AttributeError.
        raise
@property
def page_content(self) -> str | None:
"""
Get the page content from the description.
This is a convenience property for treating Jira issues as documentation pages.
Returns:
The description text or None
"""
# Return description without modification for now
# In the future, we could parse ADF content here
return self.description
@staticmethod
def _find_custom_field_in_api_response(
fields: dict[str, Any], name_patterns: list[str]
) -> Any:
"""
Find a custom field by name patterns in the raw API response.
Used during object creation from API response to extract fields
before the JiraIssue object is instantiated.
Args:
fields: The fields dictionary from the Jira API
name_patterns: List of field name patterns to search for
Returns:
The custom field value or None
"""
if not fields or not isinstance(fields, dict):
return None
# Normalize all patterns for easier matching
normalized_patterns = []
for pattern in name_patterns:
norm_pattern = pattern.lower()
norm_pattern = re.sub(r"[_\-\s]", "", norm_pattern)
normalized_patterns.append(norm_pattern)
custom_field_id = None
# Check if fields has a names fields
names_dict = fields.get("names", {})
if isinstance(names_dict, dict):
for field_id, field_name in names_dict.items():
field_name_norm = re.sub(r"[_\-\s]", "", field_name.lower())
for norm_pattern in normalized_patterns:
if norm_pattern in field_name_norm:
custom_field_id = field_id
break
if custom_field_id:
break
else:
logger.debug("No names dict found in fields", exc_info=True)
# Look at field metadata if name method didn't work
if not custom_field_id:
schema = fields.get("schema", {})
if schema and isinstance(schema, dict) and "fields" in schema:
schema_fields = schema["fields"]
if isinstance(schema_fields, dict):
for field_id, field_info in schema_fields.items():
if not field_id.startswith("customfield_"):
continue
if isinstance(field_info, dict) and "name" in field_info:
field_name = field_info["name"].lower()
field_name_norm = re.sub(r"[_\-\s]", "", field_name)
for norm_pattern in normalized_patterns:
if norm_pattern in field_name_norm:
custom_field_id = field_id
break
if custom_field_id:
break
# Try direct matching of field IDs for common epic fields
if not custom_field_id:
has_epic_link_pattern = any("epiclink" in p for p in normalized_patterns)
has_epic_name_pattern = any("epicname" in p for p in normalized_patterns)
if has_epic_link_pattern:
for field_id in fields:
if field_id.startswith("customfield_") and field_id.endswith("14"):
custom_field_id = field_id
break
elif has_epic_name_pattern:
for field_id in fields:
if field_id.startswith("customfield_") and field_id.endswith("11"):
custom_field_id = field_id
break
# Last attempt - look through all custom fields for names in their values
if not custom_field_id:
for field_id, field_value in fields.items():
if not field_id.startswith("customfield_"):
continue
field_name = None
if isinstance(field_value, dict) and "name" in field_value:
field_name = field_value.get("name", "").lower()
elif isinstance(field_value, dict) and "key" in field_value:
field_name = field_value.get("key", "").lower()
if not field_name:
continue
field_name_norm = re.sub(r"[_\-\s]", "", field_name)
for norm_pattern in normalized_patterns:
if norm_pattern in field_name_norm:
custom_field_id = field_id
break
if custom_field_id:
break
if custom_field_id and custom_field_id in fields:
return fields[custom_field_id]
return None
@classmethod
def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraIssue":
    """
    Create a JiraIssue from a Jira API response.

    Several parts of the payload are type-checked and fall back to an empty
    default (``None``, ``[]`` or ``{}``) when they do not have the expected
    shape, so a partially malformed response still yields an issue.

    Args:
        data: The issue data from the Jira API. The top-level ``id``, ``key``,
            ``self``, ``fields``, ``names`` and ``changelog`` entries are read.
        **kwargs: Additional arguments. Only ``requested_fields`` is read here:
            ``"*all"``, a list of field names, or a comma-separated string
            (which is split and stripped into a list).

    Returns:
        A JiraIssue instance (a default instance if ``data`` is empty or is
        not a dictionary)
    """
    if not data:
        return cls()

    # Handle non-dictionary data by returning a default instance
    if not isinstance(data, dict):
        logger.debug("Received non-dictionary data, returning default instance")
        return cls()

    fields = data.get("fields", {})
    if not isinstance(fields, dict):
        fields = {}

    # Get required simple fields
    issue_id = str(data.get("id", JIRA_DEFAULT_ID))
    key = str(data.get("key", JIRA_DEFAULT_KEY))
    summary = str(fields.get("summary", EMPTY_STRING))
    description = fields.get("description")

    # Timestamps
    created = str(fields.get("created", EMPTY_STRING))
    updated = str(fields.get("updated", EMPTY_STRING))

    # Extract assignee data
    assignee = None
    assignee_data = fields.get("assignee")
    if assignee_data:
        assignee = JiraUser.from_api_response(assignee_data)

    # Extract reporter data
    reporter = None
    reporter_data = fields.get("reporter")
    if reporter_data:
        reporter = JiraUser.from_api_response(reporter_data)

    # Extract status data
    status = None
    status_data = fields.get("status")
    if status_data:
        status = JiraStatus.from_api_response(status_data)

    # Extract issue type data
    issue_type = None
    issue_type_data = fields.get("issuetype")
    if issue_type_data:
        issue_type = JiraIssueType.from_api_response(issue_type_data)

    # Extract priority data
    priority = None
    priority_data = fields.get("priority")
    if priority_data:
        priority = JiraPriority.from_api_response(priority_data)

    # Extract project data
    project = None
    project_data = fields.get("project")
    if isinstance(project_data, dict):
        project = JiraProject.from_api_response(project_data)

    resolution = None
    resolution_data = fields.get("resolution")
    if isinstance(resolution_data, dict):
        resolution = JiraResolution.from_api_response(resolution_data)

    # Plain values that are only kept when they have the expected type
    duedate_raw = fields.get("duedate")
    duedate = duedate_raw if isinstance(duedate_raw, str) else None
    resolutiondate_raw = fields.get("resolutiondate")
    resolutiondate = (
        resolutiondate_raw if isinstance(resolutiondate_raw, str) else None
    )
    parent_raw = fields.get("parent")
    parent = parent_raw if isinstance(parent_raw, dict) else None

    # Ensure subtasks is a list of dicts
    subtasks_raw = fields.get("subtasks", [])
    subtasks = (
        [st for st in subtasks_raw if isinstance(st, dict)]
        if isinstance(subtasks_raw, list)
        else []
    )

    security_raw = fields.get("security")
    security = security_raw if isinstance(security_raw, dict) else None
    worklog_raw = fields.get("worklog")
    worklog = worklog_raw if isinstance(worklog_raw, dict) else None

    # Lists of strings
    labels = []
    if labels_data := fields.get("labels"):
        if isinstance(labels_data, list):
            labels = [str(label) for label in labels_data if label]

    components = []
    if components_data := fields.get("components"):
        if isinstance(components_data, list):
            components = [
                str(comp.get("name", "")) if isinstance(comp, dict) else str(comp)
                for comp in components_data
                if comp
            ]

    fix_versions = []
    if fix_versions_data := fields.get("fixVersions"):
        if isinstance(fix_versions_data, list):
            fix_versions = [
                str(version.get("name", ""))
                if isinstance(version, dict)
                else str(version)
                for version in fix_versions_data
                if version
            ]

    # Handling comments
    comments = []
    comments_field = fields.get("comment", {})
    if isinstance(comments_field, dict) and "comments" in comments_field:
        comments_data = comments_field["comments"]
        if isinstance(comments_data, list):
            comments = [
                JiraComment.from_api_response(comment)
                for comment in comments_data
                if comment
            ]

    # Handling changelogs (read from the top-level payload, not from fields)
    changelogs = []
    changelogs_data = data.get("changelog", {})
    if isinstance(changelogs_data, dict):
        histories = changelogs_data.get("histories")
        # A null or otherwise non-list "histories" is treated as "no changelog"
        # instead of raising while iterating it.
        if isinstance(histories, list):
            changelogs = [
                JiraChangelog.from_api_response(history) for history in histories
            ]

    # Handling attachments
    attachments = []
    attachments_data = fields.get("attachment", [])
    if isinstance(attachments_data, list):
        attachments = [
            JiraAttachment.from_api_response(attachment)
            for attachment in attachments_data
            if attachment
        ]

    # Timetracking
    timetracking = None
    timetracking_data = fields.get("timetracking")
    if timetracking_data:
        timetracking = JiraTimetracking.from_api_response(timetracking_data)

    # URL
    url = data.get("self")  # API URL for the issue

    # Try to find epic fields (varies by Jira instance)
    epic_key = None
    epic_name = None

    # Check for "Epic Link" field
    epic_link = cls._find_custom_field_in_api_response(
        fields, ["epic link", "parent epic"]
    )
    if isinstance(epic_link, str):
        epic_key = epic_link

    # Check for "Epic Name" field
    epic_name_value = cls._find_custom_field_in_api_response(fields, ["epic name"])
    if isinstance(epic_name_value, str):
        epic_name = epic_name_value

    # Store custom fields as {"value": raw_value, "name": human_readable_name};
    # "name" is only present when the payload's "names" map has an entry.
    fields_name_map = data.get("names")
    if not isinstance(fields_name_map, dict):
        # "names" may be absent, null or malformed; treat all as "no names"
        # rather than failing on the lookup below.
        fields_name_map = {}
    custom_fields = {}
    for orig_field_id, orig_field_value in fields.items():
        if orig_field_id.startswith("customfield_"):
            value_obj_to_store = {"value": orig_field_value}
            human_readable_name = fields_name_map.get(orig_field_id)
            if human_readable_name:
                value_obj_to_store["name"] = human_readable_name
            custom_fields[orig_field_id] = value_obj_to_store

    # Handle requested_fields parameter
    requested_fields_param = kwargs.get("requested_fields")

    # Convert string requested_fields to list (except "*all"),
    # stripping whitespace from each field name
    if isinstance(requested_fields_param, str) and requested_fields_param != "*all":
        requested_fields_param = [
            field.strip() for field in requested_fields_param.split(",")
        ]

    # Create the issue instance with all the extracted data
    return cls(
        id=issue_id,
        key=key,
        summary=summary,
        description=description,
        created=created,
        updated=updated,
        status=status,
        issue_type=issue_type,
        priority=priority,
        assignee=assignee,
        reporter=reporter,
        project=project,
        resolution=resolution,
        duedate=duedate,
        resolutiondate=resolutiondate,
        parent=parent,
        subtasks=subtasks,
        security=security,
        worklog=worklog,
        labels=labels,
        components=components,
        comments=comments,
        attachments=attachments,
        timetracking=timetracking,
        url=url,
        epic_key=epic_key,
        epic_name=epic_name,
        fix_versions=fix_versions,
        custom_fields=custom_fields,
        requested_fields=requested_fields_param,
        changelogs=changelogs,
        issuelinks=cls._extract_issue_links(fields),
    )
def to_simplified_dict(self) -> dict[str, Any]:
    """Convert to simplified dictionary for API response.

    ``id`` and ``key`` are always emitted. Every other standard field is
    emitted when it is requested (``requested_fields`` is ``"*all"``, is not
    a list, or contains the field name) and, except for ``summary`` and
    ``assignee``, only when its value is truthy. Changelogs are emitted
    whenever present.

    Custom fields are emitted under their ``customfield_*`` id as
    ``{"value": ..., "name": ...}`` (``name`` only when known):

    * all of them when ``requested_fields == "*all"``;
    * when ``requested_fields`` is a list, each entry is resolved, in order,
      as an exact custom field id, a case-insensitive custom field name, or
      a ``cf_<n>`` shorthand for ``customfield_<n>``.

    Returns:
        The simplified dictionary with ``None`` values removed.
    """
    requested = self.requested_fields
    result: dict[str, Any] = {
        "id": self.id,
        "key": self.key,
    }

    # Helper to check if a field should be included
    def should_include_field(field_name: str) -> bool:
        return (
            requested == "*all"
            or not isinstance(requested, list)
            or field_name in requested
        )

    def add_plain(*attr_names: str) -> None:
        """Copy truthy, requested attributes into the result unchanged."""
        for attr_name in attr_names:
            value = getattr(self, attr_name)
            if value and should_include_field(attr_name):
                result[attr_name] = value

    def add_nested(*attr_names: str) -> None:
        """Add truthy, requested model attributes via their own simplifier."""
        for attr_name in attr_names:
            model = getattr(self, attr_name)
            if model and should_include_field(attr_name):
                result[attr_name] = model.to_simplified_dict()

    def add_custom_field(field_id: str) -> None:
        """Emit one stored custom field under its internal id."""
        field_data_obj = self.custom_fields[field_id]
        output_value_obj = {
            "value": self._process_custom_field_value(field_data_obj.get("value"))
        }
        if "name" in field_data_obj:
            output_value_obj["name"] = field_data_obj["name"]
        result[field_id] = output_value_obj

    # The call order below fixes the key order of the output dictionary.
    if should_include_field("summary"):
        result["summary"] = self.summary
    add_plain("url", "description")
    add_nested("status", "issue_type", "priority", "project", "resolution")
    add_plain(
        "duedate", "resolutiondate", "parent", "subtasks", "security", "worklog"
    )

    # Assignee is always reported when requested, even if nobody is assigned
    if should_include_field("assignee"):
        if self.assignee:
            result["assignee"] = self.assignee.to_simplified_dict()
        else:
            result["assignee"] = {"display_name": "Unassigned"}

    add_nested("reporter")
    add_plain("labels", "components", "fix_versions", "epic_key", "epic_name")
    add_nested("timetracking")
    add_plain("created", "updated")

    # Comments/attachments are requested by their singular field name
    # ("comment"/"attachment") but emitted under the plural key.
    if self.comments and should_include_field("comment"):
        result["comments"] = [
            comment.to_simplified_dict() for comment in self.comments
        ]
    if self.attachments and should_include_field("attachment"):
        result["attachments"] = [
            attachment.to_simplified_dict() for attachment in self.attachments
        ]

    # Not use should_include_field since you won't get changelogs
    # if you don't ask for them
    if self.changelogs:
        result["changelogs"] = [
            changelog.to_simplified_dict() for changelog in self.changelogs
        ]

    if self.issuelinks and should_include_field("issuelinks"):
        result["issuelinks"] = [
            link.to_simplified_dict() for link in self.issuelinks
        ]

    # Process custom fields
    if self.custom_fields:
        if requested == "*all":
            for internal_id in self.custom_fields:
                add_custom_field(internal_id)
        elif isinstance(requested, list):
            for requested_key_or_name in requested:
                # Empty entries (e.g. from a trailing comma) and non-string
                # entries cannot identify a custom field. Skipping them also
                # prevents "" from "matching" a field that has no name.
                if not isinstance(requested_key_or_name, str):
                    continue
                if not requested_key_or_name:
                    continue

                # 1. Exact internal id
                if (
                    requested_key_or_name.startswith("customfield_")
                    and requested_key_or_name in self.custom_fields
                ):
                    add_custom_field(requested_key_or_name)
                    continue

                # 2. Case-insensitive human-readable name (first match wins)
                wanted_name = requested_key_or_name.lower()
                matched_id = None
                for internal_id, field_data_obj in self.custom_fields.items():
                    stored_name = field_data_obj.get("name")
                    if (
                        isinstance(stored_name, str)
                        and stored_name.lower() == wanted_name
                    ):
                        matched_id = internal_id
                        break

                # 3. "cf_<n>" shorthand for "customfield_<n>"
                if matched_id is None and requested_key_or_name.startswith("cf_"):
                    full_id = "customfield_" + requested_key_or_name[3:]
                    if full_id in self.custom_fields:
                        matched_id = full_id

                if matched_id is not None:
                    add_custom_field(matched_id)

    return {k: v for k, v in result.items() if v is not None}
def _process_custom_field_value(self, field_value: Any) -> Any:
"""
Process a custom field value for simplified dict output.
Args:
field_value: The value to process
Returns:
Processed value suitable for API response
"""
if field_value is None or isinstance(field_value, str | int | float | bool):
return field_value
if isinstance(field_value, dict):
# For single-select, user pickers, etc., try to extract 'value' or 'name'
if "value" in field_value:
return field_value["value"]
elif "name" in field_value:
return field_value["name"]
return field_value
if isinstance(field_value, list):
return [self._process_custom_field_value(item) for item in field_value]
return str(field_value)
def _find_custom_field_in_issue(
    self, name: str, pattern: bool = False
) -> tuple[str | None, Any]:
    """
    Find a custom field by name or pattern in an instantiated JiraIssue.

    Used by instance methods like _get_epic_name and _get_epic_link
    to search through the custom_fields dictionary of an existing issue.

    Lookup order:

    1. a ``"names"`` mapping (field id -> field name) stored inside
       ``custom_fields``, if there is one;
    2. the ``"name"`` entry of each ``customfield_*`` value that is a dict;
    3. when ``pattern`` is True only, the field ids themselves.

    Names that are not strings are ignored instead of raising.

    Args:
        name: The name to search for (a regular expression when ``pattern``
            is True)
        pattern: If True, use case-insensitive regex search; otherwise the
            names are compared case-insensitively for equality

    Returns:
        A tuple of (field_id, field_value) or (None, None) if not found.
        For matches from step 2 or 3, ``field_value`` is the entry stored in
        ``custom_fields`` as-is.
    """
    if not self.custom_fields:
        return None, None

    def name_matches(candidate: Any) -> bool:
        """Return True if ``candidate`` is a string matching ``name``."""
        if not isinstance(candidate, str):
            return False
        if pattern:
            return re.search(name, candidate, re.IGNORECASE) is not None
        return candidate.lower() == name.lower()

    # Check for a names mapping stored alongside the custom fields
    names_dict = self.custom_fields.get("names", {})
    if isinstance(names_dict, dict):
        for field_id, field_name in names_dict.items():
            if name_matches(field_name):
                return field_id, self.custom_fields.get(field_id)
    else:
        # No exception is being handled here, so no exc_info is attached
        logger.debug("No names dict found in custom fields")

    # Check field metadata for name (custom fields usually have a name)
    for field_id, field_value in self.custom_fields.items():
        if not field_id.startswith("customfield_"):
            continue
        if not isinstance(field_value, dict):
            continue
        field_name = field_value.get("name")
        if field_name and name_matches(field_name):
            return field_id, field_value

    # Fallback: Directly look for keys that match the pattern
    if pattern:
        for field_id, field_value in self.custom_fields.items():
            if re.search(name, field_id, re.IGNORECASE):
                return field_id, field_value

    return None, None
def _get_epic_name(self) -> str | None:
    """Return the epic name found in the custom fields, or None.

    Each entry of EPIC_NAME_PATTERNS is tried in order as a regex; the first
    pattern that yields a field with a truthy value decides the result.
    """
    for candidate_pattern in EPIC_NAME_PATTERNS:
        matched_id, matched_value = self._find_custom_field_in_issue(
            candidate_pattern, pattern=True
        )
        if not (matched_id and matched_value):
            continue
        if not isinstance(matched_value, dict):
            return str(matched_value)
        # Dict values carry the text under "value", falling back to "name"
        return matched_value.get("value") or matched_value.get("name")
    return None
def _get_epic_link(self) -> str | None:
    """Return the epic link found in the custom fields, or None.

    Each entry of EPIC_LINK_PATTERNS is tried in order as a regex; the first
    pattern that yields a field with a truthy value decides the result.
    """
    for candidate_pattern in EPIC_LINK_PATTERNS:
        matched_id, matched_value = self._find_custom_field_in_issue(
            candidate_pattern, pattern=True
        )
        if not (matched_id and matched_value):
            continue
        if not isinstance(matched_value, dict):
            return str(matched_value)
        # Dict values carry the link under "key", falling back to "value"
        return matched_value.get("key") or matched_value.get("value")
    return None
@staticmethod
def _extract_issue_links(fields: dict[str, Any]) -> list[JiraIssueLink]:
"""
Extract issue links from fields.
Args:
fields: The fields dictionary from the Jira API
Returns:
List of JiraIssueLink objects
"""
if not fields or not isinstance(fields, dict):
return []
issuelinks_data = fields.get("issuelinks", [])
if not isinstance(issuelinks_data, list):
return []
return [
JiraIssueLink.from_api_response(link_data)
for link_data in issuelinks_data
if link_data
]
```
--------------------------------------------------------------------------------
/tests/unit/servers/test_dependencies.py:
--------------------------------------------------------------------------------
```python
"""Unit tests for server dependencies module."""
from __future__ import annotations
from unittest.mock import MagicMock, patch
import pytest
from mcp_atlassian.confluence import ConfluenceConfig, ConfluenceFetcher
from mcp_atlassian.jira import JiraConfig, JiraFetcher
from mcp_atlassian.servers.context import MainAppContext
from mcp_atlassian.servers.dependencies import (
_create_user_config_for_fetcher,
get_confluence_fetcher,
get_jira_fetcher,
)
from mcp_atlassian.utils.oauth import OAuthConfig
from tests.utils.assertions import assert_mock_called_with_partial
from tests.utils.factories import AuthConfigFactory
from tests.utils.mocks import MockFastMCP
# Configure pytest for async tests: a module-level ``pytestmark`` applies the
# ``anyio`` marker to every test in this file, including the ``async def``
# tests below.
pytestmark = pytest.mark.anyio
@pytest.fixture
def config_factory():
    """Provide a helper object that builds the config objects used by the tests.

    The returned object exposes ``create_jira_config``,
    ``create_confluence_config``, ``create_oauth_config`` and
    ``create_app_context``.
    """

    def connection_settings(auth_type):
        """Settings shared by the Jira and Confluence configs for ``auth_type``."""
        settings = {
            "url": "https://test.atlassian.net",
            "auth_type": auth_type,
            "ssl_verify": True,
            "http_proxy": None,
            "https_proxy": None,
            "no_proxy": None,
            "socks_proxy": None,
        }
        if auth_type == "basic":
            settings["username"] = "test_username"
            settings["api_token"] = "test_token"
        elif auth_type == "oauth":
            settings["oauth_config"] = ConfigFactory.create_oauth_config()
        elif auth_type == "pat":
            settings["personal_token"] = "test_pat_token"
        return settings

    class ConfigFactory:
        @staticmethod
        def create_jira_config(auth_type="basic", **overrides):
            """Create a JiraConfig instance; ``overrides`` win over defaults."""
            settings = connection_settings(auth_type)
            settings["projects_filter"] = ["TEST"]
            settings.update(overrides)
            return JiraConfig(**settings)

        @staticmethod
        def create_confluence_config(auth_type="basic", **overrides):
            """Create a ConfluenceConfig instance; ``overrides`` win over defaults."""
            settings = connection_settings(auth_type)
            settings["spaces_filter"] = ["TEST"]
            settings.update(overrides)
            return ConfluenceConfig(**settings)

        @staticmethod
        def create_oauth_config(**overrides):
            """Create an OAuthConfig instance from AuthConfigFactory data."""
            oauth_data = AuthConfigFactory.create_oauth_config(**overrides)
            copied_keys = (
                "client_id",
                "client_secret",
                "redirect_uri",
                "scope",
                "cloud_id",
                "access_token",
                "refresh_token",
            )
            copied = {name: oauth_data[name] for name in copied_keys}
            return OAuthConfig(**copied, expires_at=9999999999.0)

        @staticmethod
        def create_app_context(jira_config=None, confluence_config=None, **overrides):
            """Create a MainAppContext instance; missing configs are built here."""
            context_kwargs = {
                "full_jira_config": jira_config or ConfigFactory.create_jira_config(),
                "full_confluence_config": confluence_config
                or ConfigFactory.create_confluence_config(),
                "read_only": False,
                "enabled_tools": ["jira_get_issue", "confluence_get_page"],
            }
            context_kwargs.update(overrides)
            return MainAppContext(**context_kwargs)

    return ConfigFactory()
@pytest.fixture
def mock_context():
    """Provide the mock Context built by ``MockFastMCP.create_context``."""
    context = MockFastMCP.create_context()
    return context
@pytest.fixture
def mock_request():
    """Provide the mock Request built by ``MockFastMCP.create_request``."""
    request = MockFastMCP.create_request()
    return request
@pytest.fixture
def auth_scenarios():
    """Per-user authentication scenarios, keyed by auth type."""
    oauth_scenario = {
        "auth_type": "oauth",
        "token": "user-oauth-token",
        "email": "[email protected]",
        "credential_key": "oauth_access_token",
    }
    pat_scenario = {
        "auth_type": "pat",
        "token": "user-pat-token",
        "email": "[email protected]",
        "credential_key": "personal_access_token",
    }
    return {"oauth": oauth_scenario, "pat": pat_scenario}
def _create_user_credentials(auth_type, token, email="[email protected]"):
"""Helper to create user credentials for testing."""
credentials = {"user_email_context": email}
if auth_type == "oauth":
credentials["oauth_access_token"] = token
elif auth_type == "pat":
credentials["personal_access_token"] = token
return credentials
def _assert_config_attributes(
    config, expected_type, expected_auth_type, expected_token=None
):
    """Assert that ``config`` has the given type, auth type and credentials.

    For "oauth" the token must be on ``oauth_config.access_token`` and the
    basic/PAT credentials must be unset; for "pat" the token must be on
    ``personal_token`` and every other credential must be unset. Other auth
    types only get the type and auth_type checks.
    """
    assert isinstance(config, expected_type)
    assert config.auth_type == expected_auth_type

    if expected_auth_type == "oauth":
        assert config.oauth_config is not None
        assert config.oauth_config.access_token == expected_token
        assert config.username == "[email protected]"
        for unset_attr in ("api_token", "personal_token"):
            assert getattr(config, unset_attr) is None
        return

    if expected_auth_type != "pat":
        return

    assert config.personal_token == expected_token
    for unset_attr in ("username", "api_token", "oauth_config"):
        assert getattr(config, unset_attr) is None
class TestCreateUserConfigForFetcher:
    """Tests for _create_user_config_for_fetcher function."""

    @pytest.mark.parametrize(
        "config_type,auth_type,token",
        [
            ("jira", "oauth", "user-oauth-token"),
            ("jira", "pat", "user-pat-token"),
            ("confluence", "oauth", "user-oauth-token"),
            ("confluence", "pat", "user-pat-token"),
        ],
    )
    def test_create_user_config_success(
        self, config_factory, config_type, auth_type, token
    ):
        """User configs are built for each service/auth-type combination."""
        for_jira = config_type == "jira"
        build_base = (
            config_factory.create_jira_config
            if for_jira
            else config_factory.create_confluence_config
        )
        expected_type = JiraConfig if for_jira else ConfluenceConfig
        filter_attr = "projects_filter" if for_jira else "spaces_filter"

        user_config = _create_user_config_for_fetcher(
            base_config=build_base(auth_type=auth_type),
            auth_type=auth_type,
            credentials=_create_user_credentials(auth_type, token),
        )

        _assert_config_attributes(user_config, expected_type, auth_type, token)
        # The service-specific filter of the base config must be carried over
        assert getattr(user_config, filter_attr) == ["TEST"]

    def test_oauth_auth_type_minimal_config_success(self):
        """OAuth works with a minimal base config (user-provided tokens mode)."""
        # Minimal global OAuth config: every credential field is empty
        empty_oauth = OAuthConfig(
            client_id="",
            client_secret="",
            redirect_uri="",
            scope="",
            cloud_id="",
        )
        global_config = JiraConfig(
            url="https://base.atlassian.net",
            auth_type="oauth",
            oauth_config=empty_oauth,
        )

        # The user supplies both the access token and the cloud_id
        user_config = _create_user_config_for_fetcher(
            base_config=global_config,
            auth_type="oauth",
            credentials={"oauth_access_token": "user-access-token"},
            cloud_id="user-cloud-id",
        )

        assert isinstance(user_config, JiraConfig)
        assert user_config.auth_type == "oauth"
        assert user_config.oauth_config is not None
        user_oauth = user_config.oauth_config
        assert user_oauth.access_token == "user-access-token"
        assert user_oauth.cloud_id == "user-cloud-id"
        # The empty client credentials of the minimal config are preserved
        assert user_oauth.client_id == ""
        assert user_oauth.client_secret == ""

    def test_multi_tenant_config_isolation(self):
        """User configs are completely isolated from each other."""
        shared_oauth = OAuthConfig(
            client_id="", client_secret="", redirect_uri="", scope="", cloud_id=""
        )
        shared_config = JiraConfig(
            url="https://base.atlassian.net",
            auth_type="oauth",
            oauth_config=shared_oauth,
        )

        def build_tenant_config(access_token, cloud_id):
            return _create_user_config_for_fetcher(
                base_config=shared_config,
                auth_type="oauth",
                credentials={"oauth_access_token": access_token},
                cloud_id=cloud_id,
            )

        first_tenant = build_tenant_config("tenant1-token", "tenant1-cloud-id")
        second_tenant = build_tenant_config("tenant2-token", "tenant2-cloud-id")

        # Mutate the first tenant's config after both have been created
        first_tenant.oauth_config.access_token = "modified-tenant1-token"
        first_tenant.oauth_config.cloud_id = "modified-tenant1-cloud-id"

        # The second tenant is untouched
        assert second_tenant.oauth_config.access_token == "tenant2-token"
        assert second_tenant.oauth_config.cloud_id == "tenant2-cloud-id"
        # The shared base config is untouched
        assert shared_oauth.access_token is None
        assert shared_oauth.cloud_id == ""
        # The first tenant carries the modifications
        assert first_tenant.oauth_config.access_token == "modified-tenant1-token"
        assert first_tenant.oauth_config.cloud_id == "modified-tenant1-cloud-id"

    @pytest.mark.parametrize(
        "auth_type,missing_credential,expected_error",
        [
            (
                "oauth",
                "oauth_access_token",
                "OAuth access token missing in credentials",
            ),
            ("pat", "personal_access_token", "PAT missing in credentials"),
        ],
    )
    def test_missing_credentials(
        self, config_factory, auth_type, missing_credential, expected_error
    ):
        """A ValueError is raised when the token for the auth type is absent."""
        # Credentials carry only the email, never the token under test
        email_only = {"user_email_context": "[email protected]"}
        global_config = config_factory.create_jira_config(auth_type=auth_type)

        with pytest.raises(ValueError, match=expected_error):
            _create_user_config_for_fetcher(
                base_config=global_config,
                auth_type=auth_type,
                credentials=email_only,
            )

    def test_unsupported_auth_type(self, config_factory):
        """A ValueError is raised for an unknown auth type."""
        email_only = {"user_email_context": "[email protected]"}
        global_config = config_factory.create_jira_config()

        with pytest.raises(ValueError, match="Unsupported auth_type 'invalid'"):
            _create_user_config_for_fetcher(
                base_config=global_config,
                auth_type="invalid",
                credentials=email_only,
            )

    def test_missing_oauth_config(self, config_factory):
        """A ValueError is raised for oauth when the base config has no OAuth config."""
        # A basic-auth base config is built without an oauth_config
        basic_config = config_factory.create_jira_config(auth_type="basic")
        oauth_credentials = _create_user_credentials("oauth", "user-oauth-token")

        with pytest.raises(ValueError, match="Global OAuth config.*is missing"):
            _create_user_config_for_fetcher(
                base_config=basic_config,
                auth_type="oauth",
                credentials=oauth_credentials,
            )

    def test_unsupported_base_config_type(self):
        """A TypeError is raised when the base config is of an unknown class."""

        class UnsupportedConfig:
            def __init__(self):
                self.url = "https://test.atlassian.net"
                self.ssl_verify = True
                self.http_proxy = self.https_proxy = None
                self.no_proxy = self.socks_proxy = None

        pat_credentials = _create_user_credentials("pat", "test-token")

        with pytest.raises(TypeError, match="Unsupported base_config type"):
            _create_user_config_for_fetcher(
                base_config=UnsupportedConfig(),
                auth_type="pat",
                credentials=pat_credentials,
            )
def _setup_mock_request_state(mock_request, auth_scenario=None, cached_fetcher=None):
    """Populate ``mock_request.state`` for a test.

    With a truthy ``cached_fetcher`` only the two fetcher slots are set (to
    that fetcher). Otherwise both slots are cleared and the
    ``user_atlassian_*`` attributes are filled from ``auth_scenario``, or set
    to None when no scenario is given.
    """
    state = mock_request.state
    if cached_fetcher:
        state.jira_fetcher = state.confluence_fetcher = cached_fetcher
        return

    state.jira_fetcher = state.confluence_fetcher = None
    for scenario_key in ("auth_type", "token", "email"):
        value = auth_scenario[scenario_key] if auth_scenario else None
        setattr(state, f"user_atlassian_{scenario_key}", value)
def _setup_mock_context(mock_context, app_context):
    """Install ``app_context`` as the lifespan context of ``mock_context``.

    The app context is stored under the ``app_lifespan_context`` key of
    ``mock_context.request_context.lifespan_context``.
    """
    lifespan_state = {"app_lifespan_context": app_context}
    mock_context.request_context.lifespan_context = lifespan_state
def _create_mock_fetcher(fetcher_class, validation_return=None, validation_error=None):
    """Create a ``MagicMock`` fetcher whose validation call is pre-configured.

    The validation call is ``get_current_user_account_id`` for JiraFetcher
    and ``get_current_user_info`` for ConfluenceFetcher. It raises
    ``validation_error`` when one is given; otherwise it returns
    ``validation_return`` or a built-in default. Other fetcher classes get
    an unconfigured mock.
    """
    fetcher = MagicMock(spec=fetcher_class)

    if fetcher_class == JiraFetcher:
        validation_call = fetcher.get_current_user_account_id
        default_return = "test-account-id"
    elif fetcher_class == ConfluenceFetcher:
        validation_call = fetcher.get_current_user_info
        default_return = {
            "email": "[email protected]",
            "displayName": "Test User",
        }
    else:
        return fetcher

    if validation_error:
        validation_call.side_effect = validation_error
    else:
        validation_call.return_value = validation_return or default_return
    return fetcher
class TestGetJiraFetcher:
"""Tests for get_jira_fetcher function."""
@patch("mcp_atlassian.servers.dependencies.get_http_request")
@patch("mcp_atlassian.servers.dependencies.JiraFetcher")
async def test_cached_fetcher_returned(
    self, mock_jira_fetcher_class, mock_get_http_request, mock_context, mock_request
):
    """A JiraFetcher already cached on the request state is returned as-is."""
    existing_fetcher = MagicMock(spec=JiraFetcher)
    mock_get_http_request.return_value = mock_request
    _setup_mock_request_state(mock_request, cached_fetcher=existing_fetcher)

    fetched = await get_jira_fetcher(mock_context)

    assert fetched == existing_fetcher
    # No new fetcher may be constructed when a cached one exists
    mock_jira_fetcher_class.assert_not_called()
@pytest.mark.parametrize("scenario_key", ["oauth", "pat"])
@patch("mcp_atlassian.servers.dependencies.get_http_request")
@patch("mcp_atlassian.servers.dependencies.JiraFetcher")
async def test_user_specific_fetcher_creation(
    self,
    mock_jira_fetcher_class,
    mock_get_http_request,
    mock_context,
    mock_request,
    config_factory,
    auth_scenarios,
    scenario_key,
):
    """A user-specific JiraFetcher is created for each supported auth type."""
    scenario = auth_scenarios[scenario_key]
    user_auth_type = scenario["auth_type"]

    # Request state carries the user's credentials
    _setup_mock_request_state(mock_request, scenario)
    mock_get_http_request.return_value = mock_request

    # Lifespan context carries global configs of the same auth type
    app_context = config_factory.create_app_context(
        config_factory.create_jira_config(auth_type=user_auth_type),
        config_factory.create_confluence_config(auth_type=user_auth_type),
    )
    _setup_mock_context(mock_context, app_context)

    # The patched JiraFetcher class hands out a validating mock fetcher
    created_fetcher = _create_mock_fetcher(JiraFetcher)
    mock_jira_fetcher_class.return_value = created_fetcher

    fetched = await get_jira_fetcher(mock_context)

    assert fetched == created_fetcher
    assert mock_request.state.jira_fetcher == created_fetcher
    mock_jira_fetcher_class.assert_called_once()

    # Verify the config passed to JiraFetcher
    passed_config = mock_jira_fetcher_class.call_args.kwargs["config"]
    assert passed_config.auth_type == user_auth_type
    if user_auth_type == "oauth":
        assert passed_config.oauth_config.access_token == scenario["token"]
    elif user_auth_type == "pat":
        assert passed_config.personal_token == scenario["token"]
@patch("mcp_atlassian.servers.dependencies.get_http_request")
@patch("mcp_atlassian.servers.dependencies.JiraFetcher")
async def test_global_fallback_scenarios(
    self,
    mock_jira_fetcher_class,
    mock_get_http_request,
    mock_context,
    mock_request,
    config_factory,
):
    """Test fallback to global JiraFetcher in various scenarios.

    Covers an HTTP request without user credentials and the absence of any
    HTTP context (``get_http_request`` raising ``RuntimeError``). In both
    cases the fetcher must be built from the global Jira config.
    """
    test_scenarios = [
        {"name": "no_user_token", "setup_http": True, "user_auth": None},
        {"name": "no_http_context", "setup_http": False, "user_auth": None},
    ]

    for scenario in test_scenarios:
        # Setup request state
        if scenario["setup_http"]:
            _setup_mock_request_state(mock_request)
            mock_get_http_request.return_value = mock_request
        else:
            mock_get_http_request.side_effect = RuntimeError("No HTTP context")

        # Setup context
        app_context = config_factory.create_app_context()
        _setup_mock_context(mock_context, app_context)

        # Setup mock fetcher
        mock_fetcher = _create_mock_fetcher(JiraFetcher)
        mock_jira_fetcher_class.return_value = mock_fetcher

        result = await get_jira_fetcher(mock_context)

        # The scenario name is attached so a failure identifies the scenario
        assert result == mock_fetcher, scenario["name"]
        assert_mock_called_with_partial(
            mock_jira_fetcher_class, config=app_context.full_jira_config
        )

        # Reset mocks for next iteration. A plain reset_mock() keeps the
        # configured return_value/side_effect, so they are cleared explicitly
        # to keep each scenario independent of scenario order.
        mock_jira_fetcher_class.reset_mock()
        mock_get_http_request.reset_mock(return_value=True, side_effect=True)
@pytest.mark.parametrize(
    "error_scenario,expected_error_match",
    [
        ("missing_global_config", "Jira client \\(fetcher\\) not available"),
        ("empty_user_token", "User Atlassian token found in state but is empty"),
        ("validation_failure", "Invalid user Jira token or configuration"),
        (
            "missing_lifespan_context",
            "Jira global configuration.*is not available from lifespan context",
        ),
    ],
)
@patch("mcp_atlassian.servers.dependencies.get_http_request")
@patch("mcp_atlassian.servers.dependencies.JiraFetcher")
async def test_error_scenarios(
    self,
    mock_jira_fetcher_class,
    mock_get_http_request,
    mock_context,
    mock_request,
    config_factory,
    auth_scenarios,
    error_scenario,
    expected_error_match,
):
    """Check that each failure mode of get_jira_fetcher raises ValueError.

    ``error_scenario`` selects how the mocks are arranged before the call;
    ``expected_error_match`` is the regex the raised ``ValueError`` message
    must match.
    """
    if error_scenario == "missing_global_config":
        # No HTTP request is available and the lifespan context is empty.
        mock_get_http_request.side_effect = RuntimeError("No HTTP context")
        mock_context.request_context.lifespan_context = {}
    elif error_scenario == "empty_user_token":
        # Work on a copy so the shared fixture entry keeps its real token.
        blank_token_auth = auth_scenarios["oauth"].copy()
        blank_token_auth["token"] = ""  # Empty token
        _setup_mock_request_state(mock_request, blank_token_auth)
        mock_get_http_request.return_value = mock_request
        _setup_mock_context(mock_context, config_factory.create_app_context())
    elif error_scenario == "validation_failure":
        _setup_mock_request_state(mock_request, auth_scenarios["pat"])
        mock_get_http_request.return_value = mock_request
        _setup_mock_context(mock_context, config_factory.create_app_context())
        # The fetcher handed back by the patched class is built with a
        # validation error so that validating the user token fails.
        failing_fetcher = _create_mock_fetcher(
            JiraFetcher, validation_error=Exception("Invalid token")
        )
        mock_jira_fetcher_class.return_value = failing_fetcher
    elif error_scenario == "missing_lifespan_context":
        # Request state carries OAuth auth, but the lifespan context is empty.
        _setup_mock_request_state(mock_request, auth_scenarios["oauth"])
        mock_get_http_request.return_value = mock_request
        mock_context.request_context.lifespan_context = {}

    with pytest.raises(ValueError, match=expected_error_match):
        await get_jira_fetcher(mock_context)
class TestGetConfluenceFetcher:
    """Tests for get_confluence_fetcher function."""

    @patch("mcp_atlassian.servers.dependencies.get_http_request")
    @patch("mcp_atlassian.servers.dependencies.ConfluenceFetcher")
    async def test_cached_fetcher_returned(
        self,
        mock_confluence_fetcher_class,
        mock_get_http_request,
        mock_context,
        mock_request,
    ):
        """Test returning cached ConfluenceFetcher from request state.

        When the request state is prepared with a cached fetcher, that exact
        object must be returned and the ConfluenceFetcher class must not be
        called.
        """
        cached_fetcher = MagicMock(spec=ConfluenceFetcher)
        _setup_mock_request_state(mock_request, cached_fetcher=cached_fetcher)
        mock_get_http_request.return_value = mock_request

        result = await get_confluence_fetcher(mock_context)

        assert result == cached_fetcher
        mock_confluence_fetcher_class.assert_not_called()

    @pytest.mark.parametrize("scenario_key", ["oauth", "pat"])
    @patch("mcp_atlassian.servers.dependencies.get_http_request")
    @patch("mcp_atlassian.servers.dependencies.ConfluenceFetcher")
    async def test_user_specific_fetcher_creation(
        self,
        mock_confluence_fetcher_class,
        mock_get_http_request,
        mock_context,
        mock_request,
        config_factory,
        auth_scenarios,
        scenario_key,
    ):
        """Test creating user-specific ConfluenceFetcher with different auth types.

        For each auth scenario the created fetcher must be returned, stored
        on ``mock_request.state.confluence_fetcher``, and built exactly once
        from a config carrying the scenario's auth type and token.
        """
        scenario = auth_scenarios[scenario_key]

        # Setup request state
        _setup_mock_request_state(mock_request, scenario)
        mock_get_http_request.return_value = mock_request

        # Setup context
        jira_config = config_factory.create_jira_config(auth_type=scenario["auth_type"])
        confluence_config = config_factory.create_confluence_config(
            auth_type=scenario["auth_type"]
        )
        app_context = config_factory.create_app_context(jira_config, confluence_config)
        _setup_mock_context(mock_context, app_context)

        # Setup mock fetcher
        mock_fetcher = _create_mock_fetcher(ConfluenceFetcher)
        mock_confluence_fetcher_class.return_value = mock_fetcher

        result = await get_confluence_fetcher(mock_context)

        assert result == mock_fetcher
        assert mock_request.state.confluence_fetcher == mock_fetcher
        mock_confluence_fetcher_class.assert_called_once()

        # Verify the config passed to ConfluenceFetcher
        called_config = mock_confluence_fetcher_class.call_args[1]["config"]
        assert called_config.auth_type == scenario["auth_type"]
        if scenario["auth_type"] == "oauth":
            assert called_config.oauth_config.access_token == scenario["token"]
        elif scenario["auth_type"] == "pat":
            assert called_config.personal_token == scenario["token"]

    @patch("mcp_atlassian.servers.dependencies.get_http_request")
    @patch("mcp_atlassian.servers.dependencies.ConfluenceFetcher")
    async def test_global_fallback_scenarios(
        self,
        mock_confluence_fetcher_class,
        mock_get_http_request,
        mock_context,
        mock_request,
        config_factory,
    ):
        """Test fallback to global ConfluenceFetcher in various scenarios.

        Two situations are exercised in turn: an HTTP request whose state is
        prepared without any auth scenario, and no HTTP request at all
        (``get_http_request`` raising ``RuntimeError``). In both cases the
        fetcher class is expected to be called with
        ``app_context.full_confluence_config``.
        """
        # Test both HTTP context without user token and non-HTTP context
        test_scenarios = [
            {"name": "no_user_token", "setup_http": True, "user_auth": None},
            {"name": "no_http_context", "setup_http": False, "user_auth": None},
        ]
        for scenario in test_scenarios:
            # Setup request state
            if scenario["setup_http"]:
                _setup_mock_request_state(mock_request)
                mock_get_http_request.return_value = mock_request
            else:
                mock_get_http_request.side_effect = RuntimeError("No HTTP context")

            # Setup context
            app_context = config_factory.create_app_context()
            _setup_mock_context(mock_context, app_context)

            # Setup mock fetcher
            mock_fetcher = _create_mock_fetcher(ConfluenceFetcher)
            mock_confluence_fetcher_class.return_value = mock_fetcher

            result = await get_confluence_fetcher(mock_context)

            # The scenario name is the assertion message so a failure says
            # which of the looped cases broke.
            assert result == mock_fetcher, scenario["name"]
            assert_mock_called_with_partial(
                mock_confluence_fetcher_class, config=app_context.full_confluence_config
            )

            # Reset mocks for next iteration. A bare reset_mock() keeps any
            # configured return_value/side_effect, so clear them explicitly;
            # otherwise the RuntimeError side effect would leak into whichever
            # scenario runs next and the loop would depend on scenario order.
            mock_confluence_fetcher_class.reset_mock()
            mock_get_http_request.reset_mock(return_value=True, side_effect=True)

    @pytest.mark.parametrize(
        "email_scenario,expected_email",
        [
            ("derive_email", "[email protected]"),
            ("preserve_existing", "[email protected]"),
        ],
    )
    @patch("mcp_atlassian.servers.dependencies.get_http_request")
    @patch("mcp_atlassian.servers.dependencies.ConfluenceFetcher")
    async def test_email_derivation_behavior(
        self,
        mock_confluence_fetcher_class,
        mock_get_http_request,
        mock_context,
        mock_request,
        config_factory,
        auth_scenarios,
        email_scenario,
        expected_email,
    ):
        """Test email derivation behavior in different scenarios.

        ``derive_email`` starts with no email in the auth scenario, while
        ``preserve_existing`` starts with one already set. In both cases the
        mock fetcher's validation result carries an email, and afterwards
        ``mock_request.state.user_atlassian_email`` must equal
        ``expected_email``.
        """
        # Copy so the shared fixture entry is not mutated by this test.
        scenario = auth_scenarios["pat"].copy()
        if email_scenario == "derive_email":
            scenario["email"] = None  # No existing email
            user_info_email = "[email protected]"
        else:  # preserve_existing
            scenario["email"] = "[email protected]"
            user_info_email = "[email protected]"

        # Setup request state
        _setup_mock_request_state(mock_request, scenario)
        mock_get_http_request.return_value = mock_request

        # Setup context
        app_context = config_factory.create_app_context()
        _setup_mock_context(mock_context, app_context)

        # Setup mock fetcher with specific user info
        mock_fetcher = _create_mock_fetcher(
            ConfluenceFetcher,
            validation_return={
                "email": user_info_email,
                "displayName": "Test User",
            },
        )
        mock_confluence_fetcher_class.return_value = mock_fetcher

        result = await get_confluence_fetcher(mock_context)

        assert result == mock_fetcher
        assert mock_request.state.confluence_fetcher == mock_fetcher
        assert mock_request.state.user_atlassian_email == expected_email

    @pytest.mark.parametrize(
        "error_scenario,expected_error_match",
        [
            ("missing_global_config", "Confluence client \\(fetcher\\) not available"),
            ("empty_user_token", "User Atlassian token found in state but is empty"),
            ("validation_failure", "Invalid user Confluence token or configuration"),
            (
                "missing_lifespan_context",
                "Confluence global configuration.*is not available from lifespan context",
            ),
        ],
    )
    @patch("mcp_atlassian.servers.dependencies.get_http_request")
    @patch("mcp_atlassian.servers.dependencies.ConfluenceFetcher")
    async def test_error_scenarios(
        self,
        mock_confluence_fetcher_class,
        mock_get_http_request,
        mock_context,
        mock_request,
        config_factory,
        auth_scenarios,
        error_scenario,
        expected_error_match,
    ):
        """Test various error scenarios.

        ``error_scenario`` selects how the mocks are arranged before the
        call; ``expected_error_match`` is the regex the raised ``ValueError``
        message must match.
        """
        if error_scenario == "missing_global_config":
            # No HTTP request is available and the lifespan context is empty.
            mock_get_http_request.side_effect = RuntimeError("No HTTP context")
            mock_context.request_context.lifespan_context = {}
        elif error_scenario == "empty_user_token":
            # Copy so the shared fixture entry keeps its real token.
            scenario = auth_scenarios["oauth"].copy()
            scenario["token"] = ""  # Empty token
            _setup_mock_request_state(mock_request, scenario)
            mock_get_http_request.return_value = mock_request
            app_context = config_factory.create_app_context()
            _setup_mock_context(mock_context, app_context)
        elif error_scenario == "validation_failure":
            scenario = auth_scenarios["pat"]
            _setup_mock_request_state(mock_request, scenario)
            mock_get_http_request.return_value = mock_request
            app_context = config_factory.create_app_context()
            _setup_mock_context(mock_context, app_context)
            # Setup mock fetcher to fail validation
            mock_fetcher = _create_mock_fetcher(
                ConfluenceFetcher, validation_error=Exception("Invalid token")
            )
            mock_confluence_fetcher_class.return_value = mock_fetcher
        elif error_scenario == "missing_lifespan_context":
            # Request state carries OAuth auth, but the lifespan context is empty.
            scenario = auth_scenarios["oauth"]
            _setup_mock_request_state(mock_request, scenario)
            mock_get_http_request.return_value = mock_request
            mock_context.request_context.lifespan_context = {}

        with pytest.raises(ValueError, match=expected_error_match):
            await get_confluence_fetcher(mock_context)
```