This is page 7 of 13. Use http://codebase.md/sooperset/mcp-atlassian?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .devcontainer
│ ├── devcontainer.json
│ ├── Dockerfile
│ ├── post-create.sh
│ └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ └── feature_request.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-publish.yml
│ ├── lint.yml
│ ├── publish.yml
│ ├── stale.yml
│ └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│ ├── oauth_authorize.py
│ └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│ └── mcp_atlassian
│ ├── __init__.py
│ ├── confluence
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── labels.py
│ │ ├── pages.py
│ │ ├── search.py
│ │ ├── spaces.py
│ │ ├── users.py
│ │ ├── utils.py
│ │ └── v2_adapter.py
│ ├── exceptions.py
│ ├── jira
│ │ ├── __init__.py
│ │ ├── attachments.py
│ │ ├── boards.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── epics.py
│ │ ├── fields.py
│ │ ├── formatting.py
│ │ ├── issues.py
│ │ ├── links.py
│ │ ├── projects.py
│ │ ├── protocols.py
│ │ ├── search.py
│ │ ├── sprints.py
│ │ ├── transitions.py
│ │ ├── users.py
│ │ └── worklog.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── comment.py
│ │ │ ├── common.py
│ │ │ ├── label.py
│ │ │ ├── page.py
│ │ │ ├── search.py
│ │ │ ├── space.py
│ │ │ └── user_search.py
│ │ ├── constants.py
│ │ └── jira
│ │ ├── __init__.py
│ │ ├── agile.py
│ │ ├── comment.py
│ │ ├── common.py
│ │ ├── issue.py
│ │ ├── link.py
│ │ ├── project.py
│ │ ├── search.py
│ │ ├── version.py
│ │ ├── workflow.py
│ │ └── worklog.py
│ ├── preprocessing
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence.py
│ │ └── jira.py
│ ├── servers
│ │ ├── __init__.py
│ │ ├── confluence.py
│ │ ├── context.py
│ │ ├── dependencies.py
│ │ ├── jira.py
│ │ └── main.py
│ └── utils
│ ├── __init__.py
│ ├── date.py
│ ├── decorators.py
│ ├── env.py
│ ├── environment.py
│ ├── io.py
│ ├── lifecycle.py
│ ├── logging.py
│ ├── oauth_setup.py
│ ├── oauth.py
│ ├── ssl.py
│ ├── tools.py
│ └── urls.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── fixtures
│ │ ├── __init__.py
│ │ ├── confluence_mocks.py
│ │ └── jira_mocks.py
│ ├── integration
│ │ ├── conftest.py
│ │ ├── README.md
│ │ ├── test_authentication.py
│ │ ├── test_content_processing.py
│ │ ├── test_cross_service.py
│ │ ├── test_mcp_protocol.py
│ │ ├── test_proxy.py
│ │ ├── test_real_api.py
│ │ ├── test_ssl_verification.py
│ │ ├── test_stdin_monitoring_fix.py
│ │ └── test_transport_lifecycle.py
│ ├── README.md
│ ├── test_preprocessing.py
│ ├── test_real_api_validation.py
│ ├── unit
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_labels.py
│ │ │ ├── test_pages.py
│ │ │ ├── test_search.py
│ │ │ ├── test_spaces.py
│ │ │ ├── test_users.py
│ │ │ ├── test_utils.py
│ │ │ └── test_v2_adapter.py
│ │ ├── jira
│ │ │ ├── conftest.py
│ │ │ ├── test_attachments.py
│ │ │ ├── test_boards.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_epics.py
│ │ │ ├── test_fields.py
│ │ │ ├── test_formatting.py
│ │ │ ├── test_issues_markdown.py
│ │ │ ├── test_issues.py
│ │ │ ├── test_links.py
│ │ │ ├── test_projects.py
│ │ │ ├── test_protocols.py
│ │ │ ├── test_search.py
│ │ │ ├── test_sprints.py
│ │ │ ├── test_transitions.py
│ │ │ ├── test_users.py
│ │ │ └── test_worklog.py
│ │ ├── models
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_base_models.py
│ │ │ ├── test_confluence_models.py
│ │ │ ├── test_constants.py
│ │ │ └── test_jira_models.py
│ │ ├── servers
│ │ │ ├── __init__.py
│ │ │ ├── test_confluence_server.py
│ │ │ ├── test_context.py
│ │ │ ├── test_dependencies.py
│ │ │ ├── test_jira_server.py
│ │ │ └── test_main_server.py
│ │ ├── test_exceptions.py
│ │ ├── test_main_transport_selection.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── test_custom_headers.py
│ │ ├── test_date.py
│ │ ├── test_decorators.py
│ │ ├── test_env.py
│ │ ├── test_environment.py
│ │ ├── test_io.py
│ │ ├── test_lifecycle.py
│ │ ├── test_logging.py
│ │ ├── test_masking.py
│ │ ├── test_oauth_setup.py
│ │ ├── test_oauth.py
│ │ ├── test_ssl.py
│ │ ├── test_tools.py
│ │ └── test_urls.py
│ └── utils
│ ├── __init__.py
│ ├── assertions.py
│ ├── base.py
│ ├── factories.py
│ └── mocks.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/tests/integration/test_real_api.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Integration tests with real Atlassian APIs.
3 |
4 | These tests are skipped by default and only run with --integration --use-real-data flags.
5 | They require proper environment configuration and will create/modify real data.
6 | """
7 |
8 | import os
9 | import time
10 | import uuid
11 |
12 | import pytest
13 |
14 | from mcp_atlassian.confluence import ConfluenceFetcher
15 | from mcp_atlassian.confluence.config import ConfluenceConfig
16 | from mcp_atlassian.jira import JiraFetcher
17 | from mcp_atlassian.jira.config import JiraConfig
18 | from tests.utils.base import BaseAuthTest
19 |
20 |
@pytest.mark.integration
class TestRealJiraAPI(BaseAuthTest):
    """Real Jira API integration tests with cleanup.

    These tests run against a live Jira instance configured through
    environment variables (JIRA_URL, JIRA_TEST_PROJECT_KEY) and create,
    modify, and delete real issues.  They are skipped unless pytest is
    invoked with --use-real-data.
    """

    @pytest.fixture(autouse=True)
    def skip_without_real_data(self, request):
        """Skip these tests unless --use-real-data is provided."""
        if not request.config.getoption("--use-real-data", default=False):
            pytest.skip("Real API tests only run with --use-real-data flag")

    @pytest.fixture
    def jira_client(self):
        """Create real Jira client from environment.

        Skips the test (rather than failing) when JIRA_URL is absent.
        """
        if not os.getenv("JIRA_URL"):
            pytest.skip("JIRA_URL not set in environment")

        config = JiraConfig.from_env()
        return JiraFetcher(config=config)

    @pytest.fixture
    def test_project_key(self):
        """Get test project key from environment (defaults to "TEST")."""
        key = os.getenv("JIRA_TEST_PROJECT_KEY", "TEST")
        return key

    @pytest.fixture
    def created_issues(self):
        """Track created issues for cleanup.

        NOTE(review): the fixture performs no teardown itself — each test
        is responsible for deleting the issues it appends here.
        """
        issues = []
        yield issues
        # Cleanup will be done in individual tests

    def test_complete_issue_lifecycle(
        self, jira_client, test_project_key, created_issues
    ):
        """Test create, update, transition, and delete issue lifecycle."""
        # Create unique summary to avoid conflicts with concurrent runs
        unique_id = str(uuid.uuid4())[:8]
        summary = f"Integration Test Issue {unique_id}"

        # 1. Create issue
        issue_data = {
            "project": {"key": test_project_key},
            "summary": summary,
            "description": "This is an integration test issue that will be deleted",
            "issuetype": {"name": "Task"},
        }

        created_issue = jira_client.create_issue(**issue_data)
        created_issues.append(created_issue.key)

        assert created_issue.key.startswith(test_project_key)
        assert created_issue.fields.summary == summary

        # 2. Update issue
        update_data = {
            "summary": f"{summary} - Updated",
            "description": "Updated description",
        }

        updated_issue = jira_client.update_issue(
            issue_key=created_issue.key, **update_data
        )

        assert updated_issue.fields.summary == f"{summary} - Updated"

        # 3. Add comment
        comment = jira_client.add_comment(
            issue_key=created_issue.key, body="Test comment from integration test"
        )

        assert comment.body == "Test comment from integration test"

        # 4. Get available transitions
        transitions = jira_client.get_transitions(issue_key=created_issue.key)
        assert len(transitions) > 0

        # 5. Transition issue (if "Done" transition available)
        done_transition = next(
            (t for t in transitions if "done" in t.name.lower()), None
        )
        if done_transition:
            jira_client.transition_issue(
                issue_key=created_issue.key, transition_id=done_transition.id
            )

        # 6. Delete issue (deletion is part of the behavior under test)
        jira_client.delete_issue(issue_key=created_issue.key)
        created_issues.remove(created_issue.key)

        # Verify deletion: fetching a deleted issue must raise
        with pytest.raises(Exception):
            jira_client.get_issue(issue_key=created_issue.key)

    def test_attachment_upload_download(
        self, jira_client, test_project_key, created_issues, tmp_path
    ):
        """Test attachment upload and download flow."""
        # Create test issue to attach files to
        unique_id = str(uuid.uuid4())[:8]
        issue_data = {
            "project": {"key": test_project_key},
            "summary": f"Attachment Test {unique_id}",
            "issuetype": {"name": "Task"},
        }

        issue = jira_client.create_issue(**issue_data)
        created_issues.append(issue.key)

        try:
            # Create test file with unique content
            test_file = tmp_path / "test_attachment.txt"
            test_content = f"Test content {unique_id}"
            test_file.write_text(test_content)

            # Upload attachment
            with open(test_file, "rb") as f:
                attachments = jira_client.add_attachment(
                    issue_key=issue.key, filename="test_attachment.txt", data=f.read()
                )

            assert len(attachments) == 1
            attachment = attachments[0]
            assert attachment.filename == "test_attachment.txt"

            # Re-fetch the issue with its attachments expanded
            issue_with_attachments = jira_client.get_issue(
                issue_key=issue.key, expand="attachment"
            )

            assert len(issue_with_attachments.fields.attachment) == 1

        finally:
            # Cleanup even on assertion failure
            jira_client.delete_issue(issue_key=issue.key)
            created_issues.remove(issue.key)

    def test_jql_search_with_pagination(self, jira_client, test_project_key):
        """Test JQL search with pagination."""
        # Search for recent issues in the test project
        jql = f"project = {test_project_key} ORDER BY created DESC"

        # First page
        results_page1 = jira_client.search_issues(jql=jql, start_at=0, max_results=2)

        assert results_page1.total >= 0

        if results_page1.total > 2:
            # Second page
            results_page2 = jira_client.search_issues(
                jql=jql, start_at=2, max_results=2
            )

            # Pages must not overlap
            page1_keys = [i.key for i in results_page1.issues]
            page2_keys = [i.key for i in results_page2.issues]
            assert not set(page1_keys).intersection(set(page2_keys))

    def test_bulk_issue_creation(self, jira_client, test_project_key, created_issues):
        """Test creating multiple issues in bulk."""
        unique_id = str(uuid.uuid4())[:8]
        issues_data = []

        # Prepare 3 issues
        for i in range(3):
            issues_data.append(
                {
                    "project": {"key": test_project_key},
                    "summary": f"Bulk Test Issue {i + 1} - {unique_id}",
                    "issuetype": {"name": "Task"},
                }
            )

        # Create issues one by one, tracking them for cleanup
        created = []
        try:
            for issue_data in issues_data:
                issue = jira_client.create_issue(**issue_data)
                created.append(issue)
                created_issues.append(issue.key)

            assert len(created) == 3

            # Verify all created with the expected summaries
            for i, issue in enumerate(created):
                assert f"Bulk Test Issue {i + 1}" in issue.fields.summary

        finally:
            # Cleanup all created issues; best-effort, so one failed delete
            # does not prevent deleting the rest
            for issue in created:
                try:
                    jira_client.delete_issue(issue_key=issue.key)
                    created_issues.remove(issue.key)
                except Exception:
                    pass

    def test_rate_limiting_behavior(self, jira_client):
        """Test API rate limiting behavior with retries."""
        # Make multiple rapid requests
        start_time = time.time()

        for _i in range(5):
            try:
                jira_client.get_fields()
            except Exception as e:
                if "429" in str(e) or "rate limit" in str(e).lower():
                    # Rate limit hit - this is the expected scenario; end the test.
                    return
                # Fix: unrelated API failures previously fell through silently,
                # which hid real errors. Surface them as test failures instead.
                raise

        # If no rate limit hit, that's also fine
        elapsed = time.time() - start_time
        assert elapsed < 10  # Should complete quickly if no rate limiting
235 |
@pytest.mark.integration
class TestRealConfluenceAPI(BaseAuthTest):
    """Real Confluence API integration tests with cleanup.

    These tests run against a live Confluence instance configured through
    environment variables (CONFLUENCE_URL, CONFLUENCE_TEST_SPACE_KEY) and
    create, modify, and delete real pages.  They are skipped unless pytest
    is invoked with --use-real-data.
    """

    @pytest.fixture(autouse=True)
    def skip_without_real_data(self, request):
        """Skip these tests unless --use-real-data is provided."""
        if not request.config.getoption("--use-real-data", default=False):
            pytest.skip("Real API tests only run with --use-real-data flag")

    @pytest.fixture
    def confluence_client(self):
        """Create real Confluence client from environment.

        Skips the test (rather than failing) when CONFLUENCE_URL is absent.
        """
        if not os.getenv("CONFLUENCE_URL"):
            pytest.skip("CONFLUENCE_URL not set in environment")

        config = ConfluenceConfig.from_env()
        return ConfluenceFetcher(config=config)

    @pytest.fixture
    def test_space_key(self):
        """Get test space key from environment (defaults to "TEST")."""
        key = os.getenv("CONFLUENCE_TEST_SPACE_KEY", "TEST")
        return key

    @pytest.fixture
    def created_pages(self):
        """Track created pages for cleanup.

        NOTE(review): the fixture performs no teardown itself — each test
        is responsible for deleting the pages it appends here.
        """
        pages = []
        yield pages
        # Cleanup will be done in individual tests

    def test_page_lifecycle(self, confluence_client, test_space_key, created_pages):
        """Test create, update, and delete page lifecycle."""
        # Unique title avoids collisions with concurrent or previous runs
        unique_id = str(uuid.uuid4())[:8]
        title = f"Integration Test Page {unique_id}"

        # 1. Create page
        page = confluence_client.create_page(
            space_key=test_space_key,
            title=title,
            body="<p>This is an integration test page</p>",
        )
        created_pages.append(page.id)

        assert page.title == title
        assert page.space.key == test_space_key

        # 2. Update page (Confluence requires an incremented version number)
        updated_page = confluence_client.update_page(
            page_id=page.id,
            title=f"{title} - Updated",
            body="<p>Updated content</p>",
            version_number=page.version.number + 1,
        )

        assert updated_page.title == f"{title} - Updated"
        assert updated_page.version.number == page.version.number + 1

        # 3. Add comment
        comment = confluence_client.add_comment(
            page_id=page.id, body="Test comment from integration test"
        )

        assert "Test comment" in comment.body.storage.value

        # 4. Delete page (deletion is part of the behavior under test)
        confluence_client.delete_page(page_id=page.id)
        created_pages.remove(page.id)

        # Verify deletion: fetching a deleted page must raise
        with pytest.raises(Exception):
            confluence_client.get_page_by_id(page_id=page.id)

    def test_page_hierarchy(self, confluence_client, test_space_key, created_pages):
        """Test creating page hierarchy with parent-child relationships."""
        unique_id = str(uuid.uuid4())[:8]

        # Create parent page
        parent = confluence_client.create_page(
            space_key=test_space_key,
            title=f"Parent Page {unique_id}",
            body="<p>Parent content</p>",
        )
        created_pages.append(parent.id)

        try:
            # Create child page nested under the parent
            child = confluence_client.create_page(
                space_key=test_space_key,
                title=f"Child Page {unique_id}",
                body="<p>Child content</p>",
                parent_id=parent.id,
            )
            created_pages.append(child.id)

            # Get child pages and verify the relationship
            children = confluence_client.get_page_children(
                page_id=parent.id, expand="body.storage"
            )

            assert len(children.results) == 1
            assert children.results[0].id == child.id

            # Delete child first, then parent (order matters for hierarchies)
            confluence_client.delete_page(page_id=child.id)
            created_pages.remove(child.id)

        finally:
            # Cleanup parent even if child assertions failed
            confluence_client.delete_page(page_id=parent.id)
            created_pages.remove(parent.id)

    def test_cql_search(self, confluence_client, test_space_key):
        """Test CQL search functionality."""
        # Search for pages in the test space only
        cql = f'space = "{test_space_key}" and type = "page"'

        results = confluence_client.search_content(cql=cql, limit=5)

        assert results.size >= 0

        # Verify all results are from test space
        for result in results.results:
            if hasattr(result, "space"):
                assert result.space.key == test_space_key

    def test_attachment_handling(
        self, confluence_client, test_space_key, created_pages, tmp_path
    ):
        """Test attachment upload to Confluence page."""
        unique_id = str(uuid.uuid4())[:8]

        # Create page to attach files to
        page = confluence_client.create_page(
            space_key=test_space_key,
            title=f"Attachment Test Page {unique_id}",
            body="<p>Page with attachments</p>",
        )
        created_pages.append(page.id)

        try:
            # Create test file with unique content
            test_file = tmp_path / "confluence_test.txt"
            test_content = f"Confluence test content {unique_id}"
            test_file.write_text(test_content)

            # Upload attachment
            with open(test_file, "rb") as f:
                attachment = confluence_client.create_attachment(
                    page_id=page.id, filename="confluence_test.txt", data=f.read()
                )

            assert attachment.title == "confluence_test.txt"

            # Get page attachments and verify the upload is listed
            attachments = confluence_client.get_attachments(page_id=page.id)
            assert len(attachments.results) == 1
            assert attachments.results[0].title == "confluence_test.txt"

        finally:
            # Cleanup even on assertion failure
            confluence_client.delete_page(page_id=page.id)
            created_pages.remove(page.id)

    def test_large_content_handling(
        self, confluence_client, test_space_key, created_pages
    ):
        """Test handling of large content (>1MB)."""
        unique_id = str(uuid.uuid4())[:8]

        # Create large content (approximately 1MB: ~21 bytes * 10000 repeats)
        large_content = "<p>" + ("Large content block. " * 10000) + "</p>"

        # Create page with large content
        page = confluence_client.create_page(
            space_key=test_space_key,
            title=f"Large Content Test {unique_id}",
            body=large_content,
        )
        created_pages.append(page.id)

        try:
            # Retrieve and verify the stored body round-trips at size
            retrieved = confluence_client.get_page_by_id(
                page_id=page.id, expand="body.storage"
            )

            assert len(retrieved.body.storage.value) > 100000  # At least 100KB

        finally:
            # Cleanup even on assertion failure
            confluence_client.delete_page(page_id=page.id)
            created_pages.remove(page.id)
431 |
@pytest.mark.integration
class TestCrossServiceIntegration:
    """Test integration between Jira and Confluence services.

    Requires BOTH services to be configured (JIRA_URL and CONFLUENCE_URL);
    each client fixture skips independently when its URL is absent.  Only
    runs when pytest is invoked with --use-real-data.
    """

    @pytest.fixture(autouse=True)
    def skip_without_real_data(self, request):
        """Skip these tests unless --use-real-data is provided."""
        if not request.config.getoption("--use-real-data", default=False):
            pytest.skip("Real API tests only run with --use-real-data flag")

    @pytest.fixture
    def jira_client(self):
        """Create real Jira client from environment."""
        if not os.getenv("JIRA_URL"):
            pytest.skip("JIRA_URL not set in environment")

        config = JiraConfig.from_env()
        return JiraFetcher(config=config)

    @pytest.fixture
    def confluence_client(self):
        """Create real Confluence client from environment."""
        if not os.getenv("CONFLUENCE_URL"):
            pytest.skip("CONFLUENCE_URL not set in environment")

        config = ConfluenceConfig.from_env()
        return ConfluenceFetcher(config=config)

    @pytest.fixture
    def test_project_key(self):
        """Get test project key from environment (defaults to "TEST")."""
        return os.getenv("JIRA_TEST_PROJECT_KEY", "TEST")

    @pytest.fixture
    def test_space_key(self):
        """Get test space key from environment (defaults to "TEST")."""
        return os.getenv("CONFLUENCE_TEST_SPACE_KEY", "TEST")

    @pytest.fixture
    def created_issues(self):
        """Track created issues for cleanup (teardown done in tests)."""
        issues = []
        yield issues

    @pytest.fixture
    def created_pages(self):
        """Track created pages for cleanup (teardown done in tests)."""
        pages = []
        yield pages

    def test_jira_confluence_linking(
        self,
        jira_client,
        confluence_client,
        test_project_key,
        test_space_key,
        created_issues,
        created_pages,
    ):
        """Test linking between Jira issues and Confluence pages."""
        unique_id = str(uuid.uuid4())[:8]

        # Create Jira issue first; its key is embedded in the page body below
        issue = jira_client.create_issue(
            project={"key": test_project_key},
            summary=f"Linked Issue {unique_id}",
            issuetype={"name": "Task"},
        )
        created_issues.append(issue.key)

        # Create Confluence page with an HTML link to the Jira issue
        page_content = f'<p>Related to Jira issue: <a href="{jira_client.config.url}/browse/{issue.key}">{issue.key}</a></p>'

        page = confluence_client.create_page(
            space_key=test_space_key,
            title=f"Linked Page {unique_id}",
            body=page_content,
        )
        created_pages.append(page.id)

        try:
            # Add comment in Jira referencing the Confluence page URL
            confluence_url = (
                f"{confluence_client.config.url}/pages/viewpage.action?pageId={page.id}"
            )
            jira_client.add_comment(
                issue_key=issue.key,
                body=f"Documentation available at: {confluence_url}",
            )

            # Verify both sides contain the cross-references
            issue_comments = jira_client.get_comments(issue_key=issue.key)
            assert any(confluence_url in c.body for c in issue_comments.comments)

            retrieved_page = confluence_client.get_page_by_id(
                page_id=page.id, expand="body.storage"
            )
            assert issue.key in retrieved_page.body.storage.value

        finally:
            # Cleanup both artifacts even on assertion failure
            jira_client.delete_issue(issue_key=issue.key)
            created_issues.remove(issue.key)
            confluence_client.delete_page(page_id=page.id)
            created_pages.remove(page.id)
```
--------------------------------------------------------------------------------
/tests/unit/servers/test_confluence_server.py:
--------------------------------------------------------------------------------
```python
1 | """Unit tests for the Confluence FastMCP server."""
2 |
3 | import json
4 | import logging
5 | from collections.abc import AsyncGenerator
6 | from contextlib import asynccontextmanager
7 | from unittest.mock import AsyncMock, MagicMock, patch
8 |
9 | import pytest
10 | from fastmcp import Client, FastMCP
11 | from fastmcp.client import FastMCPTransport
12 | from starlette.requests import Request
13 |
14 | from src.mcp_atlassian.confluence import ConfluenceFetcher
15 | from src.mcp_atlassian.confluence.config import ConfluenceConfig
16 | from src.mcp_atlassian.models.confluence.page import ConfluencePage
17 | from src.mcp_atlassian.servers.context import MainAppContext
18 | from src.mcp_atlassian.servers.main import AtlassianMCP
19 | from src.mcp_atlassian.utils.oauth import OAuthConfig
20 |
21 | logger = logging.getLogger(__name__)
22 |
23 |
@pytest.fixture
def mock_confluence_fetcher():
    """Build a ConfluenceFetcher mock whose methods return canned payloads."""
    fetcher = MagicMock(spec=ConfluenceFetcher)

    # Shared page mock: every page-returning method hands back this object.
    page = MagicMock(spec=ConfluencePage)
    page.to_simplified_dict.return_value = {
        "id": "123456",
        "title": "Test Page Mock Title",
        "url": "https://example.atlassian.net/wiki/spaces/TEST/pages/123456/Test+Page",
        "content": {
            "value": "This is a test page content in Markdown",
            "format": "markdown",
        },
    }
    page.content = "This is a test page content in Markdown"

    fetcher.search.return_value = [page]
    fetcher.get_page_content.return_value = page
    fetcher.get_page_children.return_value = [page]
    fetcher.create_page.return_value = page
    fetcher.update_page.return_value = page
    fetcher.delete_page.return_value = True

    # Canned comment returned when listing page comments.
    page_comment = MagicMock()
    page_comment.to_simplified_dict.return_value = {
        "id": "789",
        "author": "Test User",
        "created": "2023-08-01T12:00:00.000Z",
        "body": "This is a test comment",
    }
    fetcher.get_page_comments.return_value = [page_comment]

    # Canned label returned from both label read and label add paths.
    label = MagicMock()
    label.to_simplified_dict.return_value = {"id": "lbl1", "name": "test-label"}
    fetcher.get_page_labels.return_value = [label]
    fetcher.add_page_label.return_value = [label]

    # Distinct comment object for the add_comment path.
    added_comment = MagicMock()
    added_comment.to_simplified_dict.return_value = {
        "id": "987",
        "author": "Test User",
        "created": "2023-08-01T13:00:00.000Z",
        "body": "This is a test comment added via API",
    }
    fetcher.add_comment.return_value = added_comment

    # Canned user-search hit for the search_user path.
    user_hit = MagicMock()
    user_hit.to_simplified_dict.return_value = {
        "entity_type": "user",
        "title": "First Last",
        "score": 0.0,
        "user": {
            "account_id": "a031248587011jasoidf9832jd8j1",
            "display_name": "First Last",
            "email": "[email protected]",
            "profile_picture": "/wiki/aa-avatar/a031248587011jasoidf9832jd8j1",
            "is_active": True,
        },
        "url": "/people/a031248587011jasoidf9832jd8j1",
        "last_modified": "2025-06-02T13:35:59.680Z",
        "excerpt": "",
    }
    fetcher.search_user.return_value = [user_hit]

    return fetcher
96 |
97 |
@pytest.fixture
def mock_base_confluence_config():
    """Base ConfluenceConfig for MainAppContext using OAuth (multi-user scenario)."""
    return ConfluenceConfig(
        url="https://mock.atlassian.net/wiki",
        auth_type="oauth",
        oauth_config=OAuthConfig(
            client_id="server_client_id",
            client_secret="server_client_secret",
            redirect_uri="http://localhost",
            scope="read:confluence",
            cloud_id="mock_cloud_id",
        ),
    )
113 |
114 |
@pytest.fixture
def test_confluence_mcp(mock_confluence_fetcher, mock_base_confluence_config):
    """Build a FastMCP test server with all Confluence tools registered."""

    # Tool functions are imported here, mirroring how confluence.py defines them.
    from src.mcp_atlassian.servers.confluence import (
        add_comment,
        add_label,
        create_page,
        delete_page,
        get_comments,
        get_labels,
        get_page,
        get_page_children,
        search,
        search_user,
        update_page,
    )

    @asynccontextmanager
    async def test_lifespan(app: FastMCP) -> AsyncGenerator[MainAppContext, None]:
        # Supply the mocked config to the app context for the server's lifetime.
        yield MainAppContext(
            full_confluence_config=mock_base_confluence_config, read_only=False
        )

    test_mcp = AtlassianMCP(
        "TestConfluence",
        description="Test Confluence MCP Server",
        lifespan=test_lifespan,
    )

    # Register every Confluence tool on a sub-MCP, then mount it.
    confluence_sub_mcp = FastMCP(name="TestConfluenceSubMCP")
    for tool_fn in (
        search,
        get_page,
        get_page_children,
        get_comments,
        add_comment,
        get_labels,
        add_label,
        create_page,
        update_page,
        delete_page,
        search_user,
    ):
        confluence_sub_mcp.tool()(tool_fn)

    test_mcp.mount("confluence", confluence_sub_mcp)

    return test_mcp
166 |
167 |
@pytest.fixture
def no_fetcher_test_confluence_mcp(mock_base_confluence_config):
    """Build a FastMCP test server used without a patched Confluence fetcher."""

    # Tool functions are imported here, mirroring how confluence.py defines them.
    from src.mcp_atlassian.servers.confluence import (
        add_comment,
        add_label,
        create_page,
        delete_page,
        get_comments,
        get_labels,
        get_page,
        get_page_children,
        search,
        search_user,
        update_page,
    )

    @asynccontextmanager
    async def no_fetcher_test_lifespan(
        app: FastMCP,
    ) -> AsyncGenerator[MainAppContext, None]:
        # Same context shape as the normal fixture; the "missing fetcher"
        # behavior comes from the client not patching get_confluence_fetcher.
        yield MainAppContext(
            full_confluence_config=mock_base_confluence_config, read_only=False
        )

    test_mcp = AtlassianMCP(
        "NoFetcherTestConfluence",
        description="No Fetcher Test Confluence MCP Server",
        lifespan=no_fetcher_test_lifespan,
    )

    # Register every Confluence tool on a sub-MCP, then mount it.
    sub_mcp = FastMCP(name="NoFetcherTestConfluenceSubMCP")
    for tool_fn in (
        search,
        get_page,
        get_page_children,
        get_comments,
        add_comment,
        get_labels,
        add_label,
        create_page,
        update_page,
        delete_page,
        search_user,
    ):
        sub_mcp.tool()(tool_fn)

    test_mcp.mount("confluence", sub_mcp)

    return test_mcp
221 |
222 |
@pytest.fixture
def mock_request():
    """Provide a mock Starlette Request carrying a mock .state attribute."""
    fake_request = MagicMock(spec=Request)
    fake_request.state = MagicMock()
    return fake_request
229 |
230 |
@pytest.fixture
async def client(test_confluence_mcp, mock_confluence_fetcher):
    """Yield a connected FastMCP client with the fetcher and request patched."""
    fetcher_patch = patch(
        "src.mcp_atlassian.servers.confluence.get_confluence_fetcher",
        AsyncMock(return_value=mock_confluence_fetcher),
    )
    request_patch = patch(
        "src.mcp_atlassian.servers.dependencies.get_http_request",
        MagicMock(spec=Request, state=MagicMock()),
    )
    with fetcher_patch, request_patch:
        async with Client(transport=FastMCPTransport(test_confluence_mcp)) as connected:
            yield connected
247 |
248 |
@pytest.fixture
async def no_fetcher_client_fixture(no_fetcher_test_confluence_mcp, mock_request):
    """Yield a connected client against the server with no fetcher patched in."""
    transport = FastMCPTransport(no_fetcher_test_confluence_mcp)
    async with Client(transport=transport) as connected:
        yield connected
257 |
258 |
@pytest.mark.anyio
async def test_search(client, mock_confluence_fetcher):
    """Verify the search tool forwards the query and returns mock results."""
    response = await client.call_tool("confluence_search", {"query": "test search"})

    # The fetcher should be invoked exactly once, with a siteSearch CQL clause
    # built from the raw query plus the default limit and no space filter.
    mock_confluence_fetcher.search.assert_called_once()
    call_args, call_kwargs = mock_confluence_fetcher.search.call_args
    assert 'siteSearch ~ "test search"' in call_args[0]
    assert call_kwargs.get("limit") == 10
    assert call_kwargs.get("spaces_filter") is None

    payload = json.loads(response[0].text)
    assert isinstance(payload, list)
    assert len(payload) > 0
    assert payload[0]["title"] == "Test Page Mock Title"
274 |
275 |
@pytest.mark.anyio
async def test_get_page(client, mock_confluence_fetcher):
    """Test the get_page tool with default parameters."""
    result = await client.call_tool("confluence_get_page", {"page_id": "123456"})

    mock_confluence_fetcher.get_page_content.assert_called_once_with(
        "123456", convert_to_markdown=True
    )

    payload = json.loads(result[0].text)
    assert "metadata" in payload
    meta = payload["metadata"]
    assert meta["title"] == "Test Page Mock Title"
    assert "content" in meta
    assert "value" in meta["content"]
    assert "This is a test page content" in meta["content"]["value"]
291 |
292 |
@pytest.mark.anyio
async def test_get_page_no_metadata(client, mock_confluence_fetcher):
    """Test get_page with metadata disabled."""
    result = await client.call_tool(
        "confluence_get_page", {"page_id": "123456", "include_metadata": False}
    )

    mock_confluence_fetcher.get_page_content.assert_called_once_with(
        "123456", convert_to_markdown=True
    )

    # With metadata off, content is returned at the top level
    payload = json.loads(result[0].text)
    assert "metadata" not in payload
    assert "content" in payload
    assert "This is a test page content" in payload["content"]["value"]
308 |
309 |
@pytest.mark.anyio
async def test_get_page_no_markdown(client, mock_confluence_fetcher):
    """Test get_page with HTML content format."""
    # Swap in a page mock that carries raw storage-format HTML
    html_page = MagicMock(spec=ConfluencePage)
    html_page.to_simplified_dict.return_value = {
        "id": "123456",
        "title": "Test Page HTML",
        "url": "https://example.com/html",
        "content": "<p>HTML Content</p>",
        "content_format": "storage",
    }
    html_page.content = "<p>HTML Content</p>"
    html_page.content_format = "storage"
    mock_confluence_fetcher.get_page_content.return_value = html_page

    result = await client.call_tool(
        "confluence_get_page", {"page_id": "123456", "convert_to_markdown": False}
    )

    mock_confluence_fetcher.get_page_content.assert_called_once_with(
        "123456", convert_to_markdown=False
    )

    payload = json.loads(result[0].text)
    assert "metadata" in payload
    meta = payload["metadata"]
    assert meta["title"] == "Test Page HTML"
    assert meta["content"] == "<p>HTML Content</p>"
    assert meta["content_format"] == "storage"
339 |
340 |
@pytest.mark.anyio
async def test_get_page_children(client, mock_confluence_fetcher):
    """Test the get_page_children tool."""
    result = await client.call_tool(
        "confluence_get_page_children", {"parent_id": "123456"}
    )

    mock_confluence_fetcher.get_page_children.assert_called_once()
    kwargs = mock_confluence_fetcher.get_page_children.call_args.kwargs
    assert kwargs["page_id"] == "123456"
    assert kwargs.get("start") == 0
    assert kwargs.get("limit") == 25
    assert kwargs.get("expand") == "version"

    payload = json.loads(result[0].text)
    assert "parent_id" in payload
    assert "results" in payload
    assert len(payload["results"]) > 0
    assert payload["results"][0]["title"] == "Test Page Mock Title"
360 |
361 |
@pytest.mark.anyio
async def test_get_comments(client, mock_confluence_fetcher):
    """Test retrieving page comments."""
    result = await client.call_tool("confluence_get_comments", {"page_id": "123456"})

    mock_confluence_fetcher.get_page_comments.assert_called_once_with("123456")

    payload = json.loads(result[0].text)
    assert isinstance(payload, list)
    assert len(payload) > 0
    assert payload[0]["author"] == "Test User"
373 |
374 |
@pytest.mark.anyio
async def test_add_comment(client, mock_confluence_fetcher):
    """Test adding a comment to a Confluence page."""
    result = await client.call_tool(
        "confluence_add_comment",
        {"page_id": "123456", "content": "Test comment content"},
    )

    mock_confluence_fetcher.add_comment.assert_called_once_with(
        page_id="123456", content="Test comment content"
    )

    payload = json.loads(result[0].text)
    assert isinstance(payload, dict)
    assert payload["success"] is True
    assert "comment" in payload
    comment = payload["comment"]
    assert comment["id"] == "987"
    assert comment["author"] == "Test User"
    assert comment["body"] == "This is a test comment added via API"
    assert comment["created"] == "2023-08-01T13:00:00.000Z"
395 |
396 |
@pytest.mark.anyio
async def test_get_labels(client, mock_confluence_fetcher):
    """Test retrieving page labels."""
    result = await client.call_tool("confluence_get_labels", {"page_id": "123456"})

    mock_confluence_fetcher.get_page_labels.assert_called_once_with("123456")

    payload = json.loads(result[0].text)
    assert isinstance(payload, list)
    assert payload[0]["name"] == "test-label"
405 |
406 |
@pytest.mark.anyio
async def test_add_label(client, mock_confluence_fetcher):
    """Test adding a label to a page."""
    result = await client.call_tool(
        "confluence_add_label", {"page_id": "123456", "name": "new-label"}
    )

    mock_confluence_fetcher.add_page_label.assert_called_once_with(
        "123456", "new-label"
    )

    # The tool returns the page's full label list after the addition
    payload = json.loads(result[0].text)
    assert isinstance(payload, list)
    assert payload[0]["name"] == "test-label"
419 |
420 |
@pytest.mark.anyio
async def test_search_user(client, mock_confluence_fetcher):
    """Test the search_user tool with CQL query."""
    result = await client.call_tool(
        "confluence_search_user", {"query": 'user.fullname ~ "First Last"', "limit": 10}
    )

    mock_confluence_fetcher.search_user.assert_called_once_with(
        'user.fullname ~ "First Last"', limit=10
    )

    payload = json.loads(result[0].text)
    assert isinstance(payload, list)
    assert len(payload) == 1
    entry = payload[0]
    assert entry["entity_type"] == "user"
    assert entry["title"] == "First Last"
    assert entry["user"]["account_id"] == "a031248587011jasoidf9832jd8j1"
    assert entry["user"]["display_name"] == "First Last"
439 |
440 |
@pytest.mark.anyio
async def test_create_page_with_numeric_parent_id(client, mock_confluence_fetcher):
    """Test creating a page with numeric parent_id (integer) - should convert to string."""
    result = await client.call_tool(
        "confluence_create_page",
        {
            "space_key": "TEST",
            "title": "Test Page",
            "content": "Test content",
            "parent_id": 123456789,  # Numeric ID as integer
        },
    )

    # The tool must coerce the integer parent_id to a string before delegating
    mock_confluence_fetcher.create_page.assert_called_once()
    kwargs = mock_confluence_fetcher.create_page.call_args.kwargs
    assert kwargs["parent_id"] == "123456789"
    assert kwargs["space_key"] == "TEST"
    assert kwargs["title"] == "Test Page"

    payload = json.loads(result[0].text)
    assert payload["message"] == "Page created successfully"
    assert payload["page"]["title"] == "Test Page Mock Title"
464 |
465 |
@pytest.mark.anyio
async def test_create_page_with_string_parent_id(client, mock_confluence_fetcher):
    """Test creating a page with string parent_id - should remain unchanged."""
    result = await client.call_tool(
        "confluence_create_page",
        {
            "space_key": "TEST",
            "title": "Test Page",
            "content": "Test content",
            "parent_id": "123456789",  # String ID
        },
    )

    # A string parent_id passes through untouched
    mock_confluence_fetcher.create_page.assert_called_once()
    kwargs = mock_confluence_fetcher.create_page.call_args.kwargs
    assert kwargs["parent_id"] == "123456789"
    assert kwargs["space_key"] == "TEST"
    assert kwargs["title"] == "Test Page"

    payload = json.loads(result[0].text)
    assert payload["message"] == "Page created successfully"
    assert payload["page"]["title"] == "Test Page Mock Title"
488 |
489 |
@pytest.mark.anyio
async def test_update_page_with_numeric_parent_id(client, mock_confluence_fetcher):
    """Test updating a page with numeric parent_id (integer) - should convert to string."""
    result = await client.call_tool(
        "confluence_update_page",
        {
            "page_id": "999999",
            "title": "Updated Page",
            "content": "Updated content",
            "parent_id": 123456789,  # Numeric ID as integer
        },
    )

    # The tool must coerce the integer parent_id to a string before delegating
    mock_confluence_fetcher.update_page.assert_called_once()
    kwargs = mock_confluence_fetcher.update_page.call_args.kwargs
    assert kwargs["parent_id"] == "123456789"
    assert kwargs["page_id"] == "999999"
    assert kwargs["title"] == "Updated Page"

    payload = json.loads(result[0].text)
    assert payload["message"] == "Page updated successfully"
    assert payload["page"]["title"] == "Test Page Mock Title"
512 |
513 |
@pytest.mark.anyio
async def test_update_page_with_string_parent_id(client, mock_confluence_fetcher):
    """Test updating a page with string parent_id - should remain unchanged."""
    result = await client.call_tool(
        "confluence_update_page",
        {
            "page_id": "999999",
            "title": "Updated Page",
            "content": "Updated content",
            "parent_id": "123456789",  # String ID
        },
    )

    # A string parent_id passes through untouched
    mock_confluence_fetcher.update_page.assert_called_once()
    kwargs = mock_confluence_fetcher.update_page.call_args.kwargs
    assert kwargs["parent_id"] == "123456789"
    assert kwargs["page_id"] == "999999"
    assert kwargs["title"] == "Updated Page"

    payload = json.loads(result[0].text)
    assert payload["message"] == "Page updated successfully"
    assert payload["page"]["title"] == "Test Page Mock Title"
536 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/fields.py:
--------------------------------------------------------------------------------
```python
1 | """Module for Jira field operations."""
2 |
3 | import logging
4 | from typing import Any
5 |
6 | from thefuzz import fuzz
7 |
8 | from .client import JiraClient
9 | from .protocols import EpicOperationsProto, UsersOperationsProto
10 |
11 | logger = logging.getLogger("mcp-jira")
12 |
13 |
class FieldsMixin(JiraClient, EpicOperationsProto, UsersOperationsProto):
    """Mixin for Jira field operations.

    This mixin provides methods for discovering, caching, and working with Jira fields.
    Field IDs in Jira are crucial for many operations since they can differ across
    different Jira instances, especially for custom fields.
    """

    # Lazily-built cache mapping lowercase field names -> field IDs.
    # Field IDs also map to themselves so either form can be looked up.
    _field_name_to_id_map: dict[str, str] | None = None

    def get_fields(self, refresh: bool = False) -> list[dict[str, Any]]:
        """
        Get all available fields from Jira.

        Args:
            refresh: When True, forces a refresh from the server instead of using cache

        Returns:
            List of field definitions, or an empty list on error
        """
        try:
            # Use cached field data if available and refresh is not requested
            if self._field_ids_cache is not None and not refresh:
                return self._field_ids_cache

            if refresh:
                # Invalidate the derived name map so it is rebuilt from fresh data
                self._field_name_to_id_map = None

            # Fetch fields from Jira API
            fields = self.jira.get_all_fields()
            if not isinstance(fields, list):
                msg = f"Unexpected return value type from `jira.get_all_fields`: {type(fields)}"
                logger.error(msg)
                raise TypeError(msg)

            # Cache the fields BEFORE regenerating the name map:
            # _generate_field_map() calls get_fields() again and must hit this
            # cache instead of triggering another server fetch.
            self._field_ids_cache = fields

            # Regenerate the name map upon fetching new fields
            self._generate_field_map(force_regenerate=True)

            # Log available fields for debugging
            self._log_available_fields(fields)

            return fields

        except Exception as e:
            logger.error(f"Error getting Jira fields: {str(e)}")
            return []

    def _generate_field_map(self, force_regenerate: bool = False) -> dict[str, str]:
        """Generate and cache a map of lowercase field names to field IDs.

        Args:
            force_regenerate: When True, rebuild the map even if one is cached.
                Note this rebuilds from the currently *cached* field list; it
                does not itself re-fetch from the server (use
                get_fields(refresh=True) for that).

        Returns:
            Mapping of lowercase field name (and raw field ID) to field ID.
        """
        if self._field_name_to_id_map is not None and not force_regenerate:
            return self._field_name_to_id_map

        # Ensure fields are loaded into cache first (uses cache when populated)
        fields = self.get_fields()
        if not fields:
            self._field_name_to_id_map = {}
            return {}

        name_map: dict[str, str] = {}
        id_map: dict[str, str] = {}  # Also map ID to ID for consistency
        for field in fields:
            field_id = field.get("id")
            field_name = field.get("name")
            if field_id:
                id_map[field_id] = field_id  # Map ID to itself
                if field_name:
                    # setdefault: first field wins on duplicate lowercase names
                    name_map.setdefault(field_name.lower(), field_id)

        # Combine maps, ensuring IDs can also be looked up directly
        self._field_name_to_id_map = name_map | id_map
        logger.debug(
            f"Generated/Updated field name map: {len(self._field_name_to_id_map)} entries"
        )
        return self._field_name_to_id_map

    def get_field_id(self, field_name: str, refresh: bool = False) -> str | None:
        """
        Get the ID for a specific field by name.

        Args:
            field_name: The name of the field to look for (case-insensitive)
            refresh: When True, forces a refresh from the server

        Returns:
            Field ID if found, None otherwise
        """
        try:
            if refresh:
                # Bug fix: honour the documented contract and actually re-fetch
                # field definitions from the server. Previously only the name
                # map was regenerated from the (possibly stale) cached field
                # list. get_fields(refresh=True) also rebuilds the name map.
                self.get_fields(refresh=True)

            # Ensure the map is generated/cached
            field_map = self._generate_field_map()
            if not field_map:
                logger.error("Field map could not be generated.")
                return None

            normalized_name = field_name.lower()
            if normalized_name in field_map:
                return field_map[normalized_name]
            # Fallback: Check if the input IS an ID (using original casing)
            elif field_name in field_map:  # Checks the id_map part
                return field_map[field_name]
            else:
                logger.warning(f"Field '{field_name}' not found in generated map.")
                return None

        except Exception as e:
            logger.error(f"Error getting field ID for '{field_name}': {str(e)}")
            return None

    def get_field_by_id(
        self, field_id: str, refresh: bool = False
    ) -> dict[str, Any] | None:
        """
        Get field definition by ID.

        Args:
            field_id: The ID of the field to look for
            refresh: When True, forces a refresh from the server

        Returns:
            Field definition if found, None otherwise
        """
        try:
            fields = self.get_fields(refresh=refresh)

            # Linear scan is fine: the field list is small and already cached
            for field in fields:
                if field.get("id") == field_id:
                    return field

            logger.warning(f"Field with ID '{field_id}' not found")
            return None

        except Exception as e:
            logger.error(f"Error getting field by ID '{field_id}': {str(e)}")
            return None

    def get_custom_fields(self, refresh: bool = False) -> list[dict[str, Any]]:
        """
        Get all custom fields.

        Args:
            refresh: When True, forces a refresh from the server

        Returns:
            List of custom field definitions
        """
        try:
            fields = self.get_fields(refresh=refresh)
            # Custom field IDs always carry the "customfield_" prefix
            custom_fields = [
                field
                for field in fields
                if field.get("id", "").startswith("customfield_")
            ]

            return custom_fields

        except Exception as e:
            logger.error(f"Error getting custom fields: {str(e)}")
            return []

    def get_required_fields(self, issue_type: str, project_key: str) -> dict[str, Any]:
        """
        Get required fields for creating an issue of a specific type in a project.

        Args:
            issue_type: The issue type (e.g., 'Bug', 'Story', 'Epic')
            project_key: The project key (e.g., 'PROJ')

        Returns:
            Dictionary mapping required field IDs to their definitions
            (empty dict when the issue type is unknown or on error)
        """
        # Initialize per-instance cache lazily; keyed by (project, issue type)
        if not hasattr(self, "_required_fields_cache"):
            self._required_fields_cache: dict[tuple[str, str], dict[str, Any]] = {}

        # Check cache first
        cache_key = (project_key, issue_type)
        if cache_key in self._required_fields_cache:
            logger.debug(
                f"Returning cached required fields for {issue_type} in {project_key}"
            )
            return self._required_fields_cache[cache_key]

        try:
            # Step 1: Get the ID for the given issue type name within the project
            if not hasattr(self, "get_project_issue_types"):
                logger.error(
                    "get_project_issue_types method not available. Cannot resolve issue type ID."
                )
                return {}

            all_issue_types = self.get_project_issue_types(project_key)
            issue_type_id = None
            for it in all_issue_types:
                # Match the issue type name case-insensitively
                if it.get("name", "").lower() == issue_type.lower():
                    issue_type_id = it.get("id")
                    break

            if not issue_type_id:
                logger.warning(
                    f"Issue type '{issue_type}' not found in project '{project_key}'"
                )
                return {}

            # Step 2: Call the correct API method to get field metadata
            meta = self.jira.issue_createmeta_fieldtypes(
                project=project_key, issue_type_id=issue_type_id
            )

            required_fields = {}
            # Step 3: Parse the response and extract required fields
            if isinstance(meta, dict) and "fields" in meta:
                if isinstance(meta["fields"], list):
                    for field_meta in meta["fields"]:
                        if isinstance(field_meta, dict) and field_meta.get(
                            "required", False
                        ):
                            field_id = field_meta.get("fieldId")
                            if field_id:
                                required_fields[field_id] = field_meta
                else:
                    logger.warning(
                        "Unexpected format for 'fields' in createmeta response."
                    )

            if not required_fields:
                logger.warning(
                    f"No required fields found for issue type '{issue_type}' "
                    f"in project '{project_key}'"
                )

            # Cache the result before returning (empty results are cached too,
            # avoiding repeated createmeta calls for the same combination)
            self._required_fields_cache[cache_key] = required_fields
            logger.debug(
                f"Cached required fields for {issue_type} in {project_key}: "
                f"{len(required_fields)} fields"
            )

            return required_fields

        except Exception as e:
            logger.error(
                f"Error getting required fields for issue type '{issue_type}' "
                f"in project '{project_key}': {str(e)}"
            )
            return {}

    def get_field_ids_to_epic(self) -> dict[str, str]:
        """
        Dynamically discover Jira field IDs relevant to Epic linking.
        This method queries the Jira API to find the correct custom field IDs
        for Epic-related fields, which can vary between different Jira instances.

        Returns:
            Dictionary mapping field names to their IDs
            (e.g., {'epic_link': 'customfield_10014', 'epic_name': 'customfield_10011'})
        """
        try:
            # Ensure field list and map are cached/generated
            self._generate_field_map()  # Generates map and ensures fields are cached

            # Get all fields (uses cache if available)
            fields = self.get_fields()
            if not fields:  # Check if get_fields failed or returned empty
                logger.error(
                    "Could not load field definitions for epic field discovery."
                )
                return {}

            field_ids = {}

            # Log the complete list of fields for debugging
            all_field_names = [field.get("name", "").lower() for field in fields]
            logger.debug(f"All field names: {all_field_names}")

            # Enhanced logging for debugging
            custom_fields = {
                field.get("id", ""): field.get("name", "")
                for field in fields
                if field.get("id", "").startswith("customfield_")
            }
            logger.debug(f"Custom fields: {custom_fields}")

            # Look for Epic-related fields - use multiple strategies to identify
            # them: display name, GreenHopper schema "custom" key, and the field
            # IDs commonly assigned in Jira Cloud.
            for field in fields:
                field_name = field.get("name", "").lower()
                original_name = field.get("name", "")
                field_id = field.get("id", "")
                field_schema = field.get("schema", {})
                field_custom = field_schema.get("custom", "")

                # Every named field is exposed verbatim as well
                if original_name and field_id:
                    field_ids[original_name] = field_id

                # Epic Link field - used to link issues to epics
                if (
                    field_name == "epic link"
                    or field_name == "epic"
                    or "epic link" in field_name
                    or field_custom == "com.pyxis.greenhopper.jira:gh-epic-link"
                    or field_id == "customfield_10014"
                ):  # Common in Jira Cloud
                    field_ids["epic_link"] = field_id
                    # For backward compatibility
                    field_ids["Epic Link"] = field_id
                    logger.debug(f"Found Epic Link field: {field_id} ({original_name})")

                # Epic Name field - used when creating epics
                elif (
                    field_name == "epic name"
                    or field_name == "epic title"
                    or "epic name" in field_name
                    or field_custom == "com.pyxis.greenhopper.jira:gh-epic-label"
                    or field_id == "customfield_10011"
                ):  # Common in Jira Cloud
                    field_ids["epic_name"] = field_id
                    # For backward compatibility
                    field_ids["Epic Name"] = field_id
                    logger.debug(f"Found Epic Name field: {field_id} ({original_name})")

                # Epic Status field
                elif (
                    field_name == "epic status"
                    or "epic status" in field_name
                    or field_custom == "com.pyxis.greenhopper.jira:gh-epic-status"
                ):
                    field_ids["epic_status"] = field_id
                    logger.debug(
                        f"Found Epic Status field: {field_id} ({original_name})"
                    )

                # Epic Color field
                elif (
                    field_name == "epic color"
                    or field_name == "epic colour"
                    or "epic color" in field_name
                    or "epic colour" in field_name
                    or field_custom == "com.pyxis.greenhopper.jira:gh-epic-color"
                ):
                    field_ids["epic_color"] = field_id
                    logger.debug(
                        f"Found Epic Color field: {field_id} ({original_name})"
                    )

                # Parent field - sometimes used instead of Epic Link
                elif (
                    field_name == "parent"
                    or field_name == "parent issue"
                    or "parent issue" in field_name
                ):
                    field_ids["parent"] = field_id
                    logger.debug(f"Found Parent field: {field_id} ({original_name})")

                # Try to detect any other fields that might be related to Epics
                elif "epic" in field_name and field_id.startswith("customfield_"):
                    key = f"epic_{field_name.replace(' ', '_').replace('-', '_')}"
                    field_ids[key] = field_id
                    logger.debug(
                        f"Found potential Epic-related field: {field_id} ({original_name})"
                    )

            # If we couldn't find certain key fields, try alternative approaches
            if "epic_name" not in field_ids or "epic_link" not in field_ids:
                logger.debug(
                    "Standard field search didn't find all Epic fields, trying alternative approaches"
                )
                self._try_discover_fields_from_existing_epic(field_ids)

            logger.debug(f"Discovered field IDs: {field_ids}")

            return field_ids

        except Exception as e:
            logger.error(f"Error discovering Jira field IDs: {str(e)}")
            # Return an empty dict as fallback
            return {}

    def _log_available_fields(self, fields: list[dict]) -> None:
        """
        Log available fields for debugging.

        Args:
            fields: List of field definitions
        """
        logger.debug("Available Jira fields:")
        for field in fields:
            field_id = field.get("id", "")
            name = field.get("name", "")
            field_type = field.get("schema", {}).get("type", "")
            logger.debug(f"{field_id}: {name} ({field_type})")

    def is_custom_field(self, field_id: str) -> bool:
        """
        Check if a field is a custom field.

        Args:
            field_id: The field ID to check

        Returns:
            True if it's a custom field, False otherwise
        """
        return field_id.startswith("customfield_")

    def format_field_value(self, field_id: str, value: Any) -> Any:
        """
        Format a field value based on its type for update operations.

        Different field types in Jira require different JSON formats when updating.
        This method helps format the value correctly for the specific field type.

        Args:
            field_id: The ID of the field
            value: The value to format

        Returns:
            Properly formatted value for the field
        """
        try:
            # Get field definition
            field = self.get_field_by_id(field_id)

            if not field:
                # For unknown fields, return value as-is
                return value

            field_type = field.get("schema", {}).get("type")

            # Format based on field type
            if field_type == "user":
                # Handle user fields. NOTE(review): this always emits the
                # {"accountId": ...} shape, which is the Cloud format; Server/DC
                # deployments typically expect {"name": ...} — confirm against
                # the target deployment type.
                if isinstance(value, str):
                    try:
                        account_id = self._get_account_id(value)
                        return {"accountId": account_id}
                    except Exception as e:
                        logger.warning(f"Could not resolve user '{value}': {str(e)}")
                        return value
                else:
                    return value

            elif field_type == "array":
                # Handle array fields - convert single value to list if needed
                if not isinstance(value, list):
                    return [value]
                return value

            elif field_type == "option":
                # Handle option fields - convert to {"value": value} format
                if isinstance(value, str):
                    return {"value": value}
                return value

            # For other types, return as-is
            return value

        except Exception as e:
            logger.warning(f"Error formatting field value for '{field_id}': {str(e)}")
            return value

    def search_fields(
        self, keyword: str, limit: int = 10, *, refresh: bool = False
    ) -> list[dict[str, Any]]:
        """
        Search fields using fuzzy matching.

        Args:
            keyword: The search keyword
            limit: Maximum number of results to return (default: 10)
            refresh: When True, forces a refresh from the server

        Returns:
            List of matching field definitions, sorted by relevance
        """
        try:
            # Get all fields
            fields = self.get_fields(refresh=refresh)

            # if keyword is empty, return `limit` fields
            if not keyword:
                return fields[:limit]

            def similarity(keyword: str, field: dict) -> int:
                """Calculate similarity score between keyword and field."""
                # Compare against every identifier a user might search by
                name_candidates = [
                    field.get("id", ""),
                    field.get("key", ""),
                    field.get("name", ""),
                    *field.get("clauseNames", []),
                ]

                # Calculate the fuzzy match score
                return max(
                    fuzz.partial_ratio(keyword.lower(), name.lower())
                    for name in name_candidates
                )

            # Sort by similarity
            sorted_fields = sorted(
                fields, key=lambda x: similarity(keyword, x), reverse=True
            )

            # Return the top limit results
            return sorted_fields[:limit]

        except Exception as e:
            logger.error(f"Error searching fields: {str(e)}")
            return []
527 |
```
--------------------------------------------------------------------------------
/tests/unit/confluence/test_search.py:
--------------------------------------------------------------------------------
```python
1 | """Unit tests for the SearchMixin class."""
2 |
3 | from unittest.mock import MagicMock, patch
4 |
5 | import pytest
6 | import requests
7 | from requests import HTTPError
8 |
9 | from mcp_atlassian.confluence.search import SearchMixin
10 | from mcp_atlassian.confluence.utils import quote_cql_identifier_if_needed
11 | from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
12 |
13 |
14 | class TestSearchMixin:
15 | """Tests for the SearchMixin class."""
16 |
17 | @pytest.fixture
18 | def search_mixin(self, confluence_client):
19 | """Create a SearchMixin instance for testing."""
20 | # SearchMixin inherits from ConfluenceClient, so we need to create it properly
21 | with patch(
22 | "mcp_atlassian.confluence.search.ConfluenceClient.__init__"
23 | ) as mock_init:
24 | mock_init.return_value = None
25 | mixin = SearchMixin()
26 | # Copy the necessary attributes from our mocked client
27 | mixin.confluence = confluence_client.confluence
28 | mixin.config = confluence_client.config
29 | mixin.preprocessor = confluence_client.preprocessor
30 | return mixin
31 |
32 | def test_search_success(self, search_mixin):
33 | """Test search with successful results."""
34 | # Prepare the mock
35 | search_mixin.confluence.cql.return_value = {
36 | "results": [
37 | {
38 | "content": {
39 | "id": "123456789",
40 | "title": "Test Page",
41 | "type": "page",
42 | "space": {"key": "SPACE", "name": "Test Space"},
43 | "version": {"number": 1},
44 | },
45 | "excerpt": "Test content excerpt",
46 | "url": "https://confluence.example.com/pages/123456789",
47 | }
48 | ]
49 | }
50 |
51 | # Mock the preprocessor to return processed content
52 | search_mixin.preprocessor.process_html_content.return_value = (
53 | "<p>Processed HTML</p>",
54 | "Processed content",
55 | )
56 |
57 | # Call the method
58 | result = search_mixin.search("test query")
59 |
60 | # Verify API call
61 | search_mixin.confluence.cql.assert_called_once_with(cql="test query", limit=10)
62 |
63 | # Verify result
64 | assert len(result) == 1
65 | assert result[0].id == "123456789"
66 | assert result[0].title == "Test Page"
67 | assert result[0].content == "Processed content"
68 |
69 | def test_search_with_empty_results(self, search_mixin):
70 | """Test handling of empty search results."""
71 | # Mock an empty result set
72 | search_mixin.confluence.cql.return_value = {"results": []}
73 |
74 | # Act
75 | results = search_mixin.search("empty query")
76 |
77 | # Assert
78 | assert isinstance(results, list)
79 | assert len(results) == 0
80 |
81 | def test_search_with_non_page_content(self, search_mixin):
82 | """Test handling of non-page content in search results."""
83 | # Mock search results with non-page content
84 | search_mixin.confluence.cql.return_value = {
85 | "results": [
86 | {
87 | "content": {"type": "blogpost", "id": "12345"},
88 | "title": "Blog Post",
89 | "excerpt": "This is a blog post",
90 | "url": "/pages/12345",
91 | "resultGlobalContainer": {"title": "TEST"},
92 | }
93 | ]
94 | }
95 |
96 | # Act
97 | results = search_mixin.search("blogpost query")
98 |
99 | # Assert
100 | assert isinstance(results, list)
101 | # The method should still handle them as pages since we're using models
102 | assert len(results) > 0
103 |
104 | def test_search_key_error(self, search_mixin):
105 | """Test handling of KeyError in search results."""
106 | # Mock a response missing required keys
107 | search_mixin.confluence.cql.return_value = {"incomplete": "data"}
108 |
109 | # Act
110 | results = search_mixin.search("invalid query")
111 |
112 | # Assert
113 | assert isinstance(results, list)
114 | assert len(results) == 0
115 |
116 | def test_search_request_exception(self, search_mixin):
117 | """Test handling of RequestException during search."""
118 | # Mock a network error
119 | search_mixin.confluence.cql.side_effect = requests.RequestException("API error")
120 |
121 | # Act
122 | results = search_mixin.search("error query")
123 |
124 | # Assert
125 | assert isinstance(results, list)
126 | assert len(results) == 0
127 |
128 | def test_search_value_error(self, search_mixin):
129 | """Test handling of ValueError during search."""
130 | # Mock a value error
131 | search_mixin.confluence.cql.side_effect = ValueError("Value error")
132 |
133 | # Act
134 | results = search_mixin.search("error query")
135 |
136 | # Assert
137 | assert isinstance(results, list)
138 | assert len(results) == 0
139 |
140 | def test_search_type_error(self, search_mixin):
141 | """Test handling of TypeError during search."""
142 | # Mock a type error
143 | search_mixin.confluence.cql.side_effect = TypeError("Type error")
144 |
145 | # Act
146 | results = search_mixin.search("error query")
147 |
148 | # Assert
149 | assert isinstance(results, list)
150 | assert len(results) == 0
151 |
    def test_search_with_spaces_filter(self, search_mixin):
        """Test searching with spaces filter from parameter.

        Covers three scenarios in sequence: a single space, a comma-separated
        list of spaces, and a query that already constrains the space (in which
        case no filter is appended). Each scenario re-checks the most recent
        CQL call via assert_called_with, so ordering matters here.
        """
        # Prepare the mock
        search_mixin.confluence.cql.return_value = {
            "results": [
                {
                    "content": {
                        "id": "123456789",
                        "title": "Test Page",
                        "type": "page",
                        "space": {"key": "SPACE", "name": "Test Space"},
                        "version": {"number": 1},
                    },
                    "excerpt": "Test content excerpt",
                    "url": "https://confluence.example.com/pages/123456789",
                }
            ]
        }

        # Mock the preprocessor
        search_mixin.preprocessor.process_html_content.return_value = (
            "<p>Processed HTML</p>",
            "Processed content",
        )

        # Test with single space filter
        result = search_mixin.search("test query", spaces_filter="DEV")

        # Verify space was properly quoted in the CQL query
        quoted_dev = quote_cql_identifier_if_needed("DEV")
        search_mixin.confluence.cql.assert_called_with(
            cql=f"(test query) AND (space = {quoted_dev})",
            limit=10,
        )
        assert len(result) == 1

        # Test with multiple spaces filter
        result = search_mixin.search("test query", spaces_filter="DEV,TEAM")

        # Verify spaces were properly quoted in the CQL query
        quoted_dev = quote_cql_identifier_if_needed("DEV")
        quoted_team = quote_cql_identifier_if_needed("TEAM")
        search_mixin.confluence.cql.assert_called_with(
            cql=f"(test query) AND (space = {quoted_dev} OR space = {quoted_team})",
            limit=10,
        )
        assert len(result) == 1

        # Test with filter when query already has space
        result = search_mixin.search('space = "EXISTING"', spaces_filter="DEV")
        search_mixin.confluence.cql.assert_called_with(
            cql='space = "EXISTING"',  # Should not add filter when space already exists
            limit=10,
        )
        assert len(result) == 1
207 |
    def test_search_with_config_spaces_filter(self, search_mixin):
        """Test search using spaces filter from config.

        First verifies that the config-level filter is applied when the caller
        passes none, then verifies that an explicit spaces_filter argument
        takes precedence over the config value. Scenario order matters because
        assert_called_with inspects only the most recent call.
        """
        # Prepare the mock
        search_mixin.confluence.cql.return_value = {
            "results": [
                {
                    "content": {
                        "id": "123456789",
                        "title": "Test Page",
                        "type": "page",
                        "space": {"key": "SPACE", "name": "Test Space"},
                        "version": {"number": 1},
                    },
                    "excerpt": "Test content excerpt",
                    "url": "https://confluence.example.com/pages/123456789",
                }
            ]
        }

        # Mock the preprocessor
        search_mixin.preprocessor.process_html_content.return_value = (
            "<p>Processed HTML</p>",
            "Processed content",
        )

        # Set config filter
        search_mixin.config.spaces_filter = "DEV,TEAM"

        # Test with config filter
        result = search_mixin.search("test query")

        # Verify spaces were properly quoted in the CQL query
        quoted_dev = quote_cql_identifier_if_needed("DEV")
        quoted_team = quote_cql_identifier_if_needed("TEAM")
        search_mixin.confluence.cql.assert_called_with(
            cql=f"(test query) AND (space = {quoted_dev} OR space = {quoted_team})",
            limit=10,
        )
        assert len(result) == 1

        # Test that explicit filter overrides config filter
        result = search_mixin.search("test query", spaces_filter="OVERRIDE")

        # Verify space was properly quoted in the CQL query
        quoted_override = quote_cql_identifier_if_needed("OVERRIDE")
        search_mixin.confluence.cql.assert_called_with(
            cql=f"(test query) AND (space = {quoted_override})",
            limit=10,
        )
        assert len(result) == 1
258 |
259 | def test_search_general_exception(self, search_mixin):
260 | """Test handling of general exceptions during search."""
261 | # Mock a general exception
262 | search_mixin.confluence.cql.side_effect = Exception("General error")
263 |
264 | # Act
265 | results = search_mixin.search("error query")
266 |
267 | # Assert
268 | assert isinstance(results, list)
269 | assert len(results) == 0
270 |
    def test_search_user_success(self, search_mixin):
        """Test search_user with successful results.

        Uses a realistic user-search payload (as returned by
        `rest/api/search/user` on Confluence Cloud) and checks both the
        outgoing request parameters and the mapping of the response into the
        result model's snake_case attributes.
        """
        # Prepare the mock response
        search_mixin.confluence.get.return_value = {
            "results": [
                {
                    "user": {
                        "type": "known",
                        "accountId": "1234asdf",
                        "accountType": "atlassian",
                        "email": "[email protected]",
                        "publicName": "First Last",
                        "displayName": "First Last",
                        "isExternalCollaborator": False,
                        "profilePicture": {
                            "path": "/wiki/aa-avatar/1234asdf",
                            "width": 48,
                            "height": 48,
                            "isDefault": False,
                        },
                    },
                    "title": "First Last",
                    "excerpt": "",
                    "url": "/people/1234asdf",
                    "entityType": "user",
                    "lastModified": "2025-06-02T13:35:59.680Z",
                    "score": 0.0,
                }
            ],
            "start": 0,
            "limit": 25,
            "size": 1,
            "totalSize": 1,
            "cqlQuery": "( user.fullname ~ 'First Last' )",
            "searchDuration": 115,
        }

        # Call the method
        result = search_mixin.search_user('user.fullname ~ "First Last"')

        # Verify API call
        search_mixin.confluence.get.assert_called_once_with(
            "rest/api/search/user",
            params={"cql": 'user.fullname ~ "First Last"', "limit": 10},
        )

        # Verify result: camelCase payload fields map to snake_case attributes
        assert len(result) == 1
        assert result[0].user.account_id == "1234asdf"
        assert result[0].user.display_name == "First Last"
        assert result[0].user.email == "[email protected]"
        assert result[0].title == "First Last"
        assert result[0].entity_type == "user"
324 |
325 | def test_search_user_with_empty_results(self, search_mixin):
326 | """Test search_user with empty results."""
327 | # Mock an empty result set
328 | search_mixin.confluence.get.return_value = {
329 | "results": [],
330 | "start": 0,
331 | "limit": 25,
332 | "size": 0,
333 | "totalSize": 0,
334 | "cqlQuery": 'user.fullname ~ "Nonexistent"',
335 | "searchDuration": 50,
336 | }
337 |
338 | # Act
339 | results = search_mixin.search_user('user.fullname ~ "Nonexistent"')
340 |
341 | # Assert
342 | assert isinstance(results, list)
343 | assert len(results) == 0
344 |
345 | def test_search_user_with_custom_limit(self, search_mixin):
346 | """Test search_user with custom limit."""
347 | # Prepare the mock response
348 | search_mixin.confluence.get.return_value = {
349 | "results": [],
350 | "start": 0,
351 | "limit": 5,
352 | "size": 0,
353 | "totalSize": 0,
354 | "cqlQuery": 'user.fullname ~ "Test"',
355 | "searchDuration": 30,
356 | }
357 |
358 | # Call with custom limit
359 | search_mixin.search_user('user.fullname ~ "Test"', limit=5)
360 |
361 | # Verify API call with correct limit
362 | search_mixin.confluence.get.assert_called_once_with(
363 | "rest/api/search/user", params={"cql": 'user.fullname ~ "Test"', "limit": 5}
364 | )
365 |
366 | @pytest.mark.parametrize(
367 | "exception_type,exception_args,expected_result",
368 | [
369 | (requests.RequestException, ("Network error",), []),
370 | (ValueError, ("Value error",), []),
371 | (TypeError, ("Type error",), []),
372 | (Exception, ("General error",), []),
373 | (KeyError, ("Missing key",), []),
374 | ],
375 | )
376 | def test_search_user_exception_handling(
377 | self, search_mixin, exception_type, exception_args, expected_result
378 | ):
379 | """Test search_user handling of various exceptions that return empty list."""
380 | # Mock the exception
381 | search_mixin.confluence.get.side_effect = exception_type(*exception_args)
382 |
383 | # Act
384 | results = search_mixin.search_user('user.fullname ~ "Test"')
385 |
386 | # Assert
387 | assert isinstance(results, list)
388 | assert results == expected_result
389 |
390 | @pytest.mark.parametrize(
391 | "status_code,exception_type",
392 | [
393 | (401, MCPAtlassianAuthenticationError),
394 | (403, MCPAtlassianAuthenticationError),
395 | ],
396 | )
397 | def test_search_user_http_auth_errors(
398 | self, search_mixin, status_code, exception_type
399 | ):
400 | """Test search_user handling of HTTP authentication errors."""
401 | # Mock HTTP error
402 | mock_response = MagicMock()
403 | mock_response.status_code = status_code
404 | http_error = HTTPError(f"HTTP {status_code}")
405 | http_error.response = mock_response
406 | search_mixin.confluence.get.side_effect = http_error
407 |
408 | # Act and assert
409 | with pytest.raises(exception_type):
410 | search_mixin.search_user('user.fullname ~ "Test"')
411 |
412 | def test_search_user_http_other_error(self, search_mixin):
413 | """Test search_user handling of other HTTP errors."""
414 | # Mock HTTP 500 error
415 | mock_response = MagicMock()
416 | mock_response.status_code = 500
417 | http_error = HTTPError("Internal Server Error")
418 | http_error.response = mock_response
419 | search_mixin.confluence.get.side_effect = http_error
420 |
421 | # Act and assert - should re-raise the HTTPError
422 | with pytest.raises(HTTPError):
423 | search_mixin.search_user('user.fullname ~ "Test"')
424 |
425 | @pytest.mark.parametrize(
426 | "mock_response,expected_length",
427 | [
428 | ({"incomplete": "data"}, 0), # KeyError case
429 | (None, 0), # None response case
430 | ({"results": []}, 0), # Empty results case
431 | ],
432 | )
433 | def test_search_user_edge_cases(self, search_mixin, mock_response, expected_length):
434 | """Test search_user handling of edge cases in API responses."""
435 | search_mixin.confluence.get.return_value = mock_response
436 |
437 | # Act
438 | results = search_mixin.search_user('user.fullname ~ "Test"')
439 |
440 | # Assert
441 | assert isinstance(results, list)
442 | assert len(results) == expected_length
443 |
444 | # You can also parametrize the regular search method exception tests:
445 | @pytest.mark.parametrize(
446 | "exception_type,exception_args,expected_result",
447 | [
448 | (requests.RequestException, ("API error",), []),
449 | (ValueError, ("Value error",), []),
450 | (TypeError, ("Type error",), []),
451 | (Exception, ("General error",), []),
452 | (KeyError, ("Missing key",), []),
453 | ],
454 | )
455 | def test_search_exception_handling(
456 | self, search_mixin, exception_type, exception_args, expected_result
457 | ):
458 | """Test search handling of various exceptions that return empty list."""
459 | # Mock the exception
460 | search_mixin.confluence.cql.side_effect = exception_type(*exception_args)
461 |
462 | # Act
463 | results = search_mixin.search("error query")
464 |
465 | # Assert
466 | assert isinstance(results, list)
467 | assert results == expected_result
468 |
469 | # Parametrize CQL query tests:
470 | @pytest.mark.parametrize(
471 | "query,limit,expected_params",
472 | [
473 | (
474 | 'user.fullname ~ "Test"',
475 | 10,
476 | {"cql": 'user.fullname ~ "Test"', "limit": 10},
477 | ),
478 | (
479 | 'user.email ~ "[email protected]"',
480 | 5,
481 | {"cql": 'user.email ~ "[email protected]"', "limit": 5},
482 | ),
483 | (
484 | 'user.fullname ~ "John" AND user.email ~ "@company.com"',
485 | 15,
486 | {
487 | "cql": 'user.fullname ~ "John" AND user.email ~ "@company.com"',
488 | "limit": 15,
489 | },
490 | ),
491 | ],
492 | )
493 | def test_search_user_api_parameters(
494 | self, search_mixin, query, limit, expected_params
495 | ):
496 | """Test that search_user calls the API with correct parameters."""
497 | # Mock successful response
498 | search_mixin.confluence.get.return_value = {
499 | "results": [],
500 | "start": 0,
501 | "limit": limit,
502 | "totalSize": 0,
503 | }
504 |
505 | # Act
506 | search_mixin.search_user(query, limit=limit)
507 |
508 | # Assert API was called with correct parameters
509 | search_mixin.confluence.get.assert_called_once_with(
510 | "rest/api/search/user", params=expected_params
511 | )
512 |
513 | def test_search_user_with_complex_cql_query(self, search_mixin):
514 | """Test search_user with complex CQL query containing operators."""
515 | # Mock successful response
516 | search_mixin.confluence.get.return_value = {
517 | "results": [],
518 | "start": 0,
519 | "limit": 10,
520 | "totalSize": 0,
521 | }
522 |
523 | complex_query = 'user.fullname ~ "John" AND user.email ~ "@company.com" OR user.displayName ~ "JD"'
524 |
525 | # Act
526 | search_mixin.search_user(complex_query)
527 |
528 | # Assert API was called with the exact query
529 | search_mixin.confluence.get.assert_called_once_with(
530 | "rest/api/search/user", params={"cql": complex_query, "limit": 10}
531 | )
532 |
533 | def test_search_user_result_processing(self, search_mixin):
534 | """Test that search_user properly processes and returns user search result objects."""
535 | # Mock response with user data
536 | search_mixin.confluence.get.return_value = {
537 | "results": [
538 | {
539 | "user": {
540 | "accountId": "test-account-id",
541 | "displayName": "Test User",
542 | "email": "[email protected]",
543 | "isExternalCollaborator": False,
544 | },
545 | "title": "Test User",
546 | "entityType": "user",
547 | "score": 1.5,
548 | }
549 | ],
550 | "start": 0,
551 | "limit": 10,
552 | "totalSize": 1,
553 | }
554 |
555 | # Act
556 | results = search_mixin.search_user('user.fullname ~ "Test User"')
557 |
558 | # Assert result structure
559 | assert len(results) == 1
560 | assert hasattr(results[0], "user")
561 | assert hasattr(results[0], "title")
562 | assert hasattr(results[0], "entity_type")
563 | assert results[0].user.account_id == "test-account-id"
564 | assert results[0].user.display_name == "Test User"
565 | assert results[0].title == "Test User"
566 | assert results[0].entity_type == "user"
567 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/utils/oauth.py:
--------------------------------------------------------------------------------
```python
1 | """OAuth 2.0 utilities for Atlassian Cloud authentication.
2 |
3 | This module provides utilities for OAuth 2.0 (3LO) authentication with Atlassian Cloud.
4 | It handles:
5 | - OAuth configuration
6 | - Token acquisition, storage, and refresh
7 | - Session configuration for API clients
8 | """
9 |
10 | import json
11 | import logging
12 | import os
13 | import pprint
14 | import time
15 | import urllib.parse
16 | from dataclasses import dataclass
17 | from pathlib import Path
18 | from typing import Any, Optional
19 |
20 | import keyring
21 | import requests
22 |
23 | # Configure logging
24 | logger = logging.getLogger("mcp-atlassian.oauth")
25 |
26 | # Constants
27 | TOKEN_URL = "https://auth.atlassian.com/oauth/token" # noqa: S105 - This is a public API endpoint URL, not a password
28 | AUTHORIZE_URL = "https://auth.atlassian.com/authorize"
29 | CLOUD_ID_URL = "https://api.atlassian.com/oauth/token/accessible-resources"
30 | TOKEN_EXPIRY_MARGIN = 300 # 5 minutes in seconds
31 | KEYRING_SERVICE_NAME = "mcp-atlassian-oauth"
32 |
33 |
@dataclass
class OAuthConfig:
    """OAuth 2.0 configuration for Atlassian Cloud.

    This class manages the OAuth configuration and tokens. It handles:
    - Authentication configuration (client credentials)
    - Token acquisition and refreshing
    - Token storage and retrieval
    - Cloud ID identification
    """

    client_id: str
    client_secret: str
    redirect_uri: str
    scope: str
    cloud_id: str | None = None
    refresh_token: str | None = None
    access_token: str | None = None
    expires_at: float | None = None

    @property
    def is_token_expired(self) -> bool:
        """Check if the access token is expired or will expire soon.

        Returns:
            True if the token is expired or will expire soon, False otherwise.
        """
        # If we don't have a token or expiry time, consider it expired
        if not self.access_token or not self.expires_at:
            return True

        # Consider the token expired if it will expire within the margin
        return time.time() + TOKEN_EXPIRY_MARGIN >= self.expires_at

    def get_authorization_url(self, state: str) -> str:
        """Get the authorization URL for the OAuth 2.0 flow.

        Args:
            state: Random state string for CSRF protection

        Returns:
            The authorization URL to redirect the user to.
        """
        params = {
            "audience": "api.atlassian.com",
            "client_id": self.client_id,
            "scope": self.scope,
            "redirect_uri": self.redirect_uri,
            "response_type": "code",
            "prompt": "consent",
            "state": state,
        }
        return f"{AUTHORIZE_URL}?{urllib.parse.urlencode(params)}"

    def exchange_code_for_tokens(self, code: str) -> bool:
        """Exchange the authorization code for access and refresh tokens.

        Args:
            code: The authorization code from the callback

        Returns:
            True if tokens were successfully acquired, False otherwise.
        """
        try:
            payload = {
                "grant_type": "authorization_code",
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "code": code,
                "redirect_uri": self.redirect_uri,
            }

            logger.info(f"Exchanging authorization code for tokens at {TOKEN_URL}")
            logger.debug(f"Token exchange payload: {pprint.pformat(payload)}")

            # Bounded timeout so a stalled token endpoint cannot hang the process
            response = requests.post(TOKEN_URL, data=payload, timeout=30)

            # Log more details about the response
            logger.debug(f"Token exchange response status: {response.status_code}")
            logger.debug(
                f"Token exchange response headers: {pprint.pformat(response.headers)}"
            )
            logger.debug(f"Token exchange response body: {response.text[:500]}...")

            if not response.ok:
                logger.error(
                    f"Token exchange failed with status {response.status_code}. Response: {response.text}"
                )
                return False

            # Parse the response
            token_data = response.json()

            # Check if required tokens are present
            if "access_token" not in token_data:
                logger.error(
                    f"Access token not found in response. Keys found: {list(token_data.keys())}"
                )
                return False

            if "refresh_token" not in token_data:
                logger.error(
                    "Refresh token not found in response. Ensure 'offline_access' scope is included. "
                    f"Keys found: {list(token_data.keys())}"
                )
                return False

            self.access_token = token_data["access_token"]
            self.refresh_token = token_data["refresh_token"]
            self.expires_at = time.time() + token_data["expires_in"]

            # Get the cloud ID using the access token
            self._get_cloud_id()

            # Save the tokens
            self._save_tokens()

            # Log success message with token details
            logger.info(
                f"✅ OAuth token exchange successful! Access token expires in {token_data['expires_in']}s."
            )
            logger.info(
                f"Access Token (partial): {self.access_token[:10]}...{self.access_token[-5:] if self.access_token else ''}"
            )
            logger.info(
                f"Refresh Token (partial): {self.refresh_token[:5]}...{self.refresh_token[-3:] if self.refresh_token else ''}"
            )
            if self.cloud_id:
                logger.info(f"Cloud ID successfully retrieved: {self.cloud_id}")
            else:
                logger.warning(
                    "Cloud ID was not retrieved after token exchange. Check accessible resources."
                )
            return True
        except requests.exceptions.RequestException as e:
            logger.error(f"Network error during token exchange: {e}", exc_info=True)
            return False
        except json.JSONDecodeError as e:
            logger.error(
                f"Failed to decode JSON response from token endpoint: {e}",
                exc_info=True,
            )
            logger.error(
                f"Response text that failed to parse: {response.text if 'response' in locals() else 'Response object not available'}"
            )
            return False
        except Exception as e:
            logger.error(f"Failed to exchange code for tokens: {e}")
            return False

    def refresh_access_token(self) -> bool:
        """Refresh the access token using the refresh token.

        Returns:
            True if the token was successfully refreshed, False otherwise.
        """
        if not self.refresh_token:
            logger.error("No refresh token available")
            return False

        try:
            payload = {
                "grant_type": "refresh_token",
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "refresh_token": self.refresh_token,
            }

            logger.debug("Refreshing access token...")
            # Bounded timeout so a stalled token endpoint cannot hang the process
            response = requests.post(TOKEN_URL, data=payload, timeout=30)
            response.raise_for_status()

            # Parse the response
            token_data = response.json()
            self.access_token = token_data["access_token"]
            # Refresh token might also be rotated
            if "refresh_token" in token_data:
                self.refresh_token = token_data["refresh_token"]
            self.expires_at = time.time() + token_data["expires_in"]

            # Save the tokens
            self._save_tokens()

            return True
        except Exception as e:
            logger.error(f"Failed to refresh access token: {e}")
            return False

    def ensure_valid_token(self) -> bool:
        """Ensure the access token is valid, refreshing if necessary.

        Returns:
            True if the token is valid (or was refreshed successfully), False otherwise.
        """
        if not self.is_token_expired:
            return True
        return self.refresh_access_token()

    def _get_cloud_id(self) -> None:
        """Get the cloud ID for the Atlassian instance.

        This method queries the accessible resources endpoint to get the cloud ID.
        The cloud ID is needed for API calls with OAuth.
        """
        if not self.access_token:
            logger.debug("No access token available to get cloud ID")
            return

        try:
            headers = {"Authorization": f"Bearer {self.access_token}"}
            # Bounded timeout so a stalled endpoint cannot hang the process
            response = requests.get(CLOUD_ID_URL, headers=headers, timeout=30)
            response.raise_for_status()

            resources = response.json()
            if resources and len(resources) > 0:
                # Use the first cloud site (most users have only one)
                # For users with multiple sites, they might need to specify which one to use
                self.cloud_id = resources[0]["id"]
                logger.debug(f"Found cloud ID: {self.cloud_id}")
            else:
                logger.warning("No Atlassian sites found in the response")
        except Exception as e:
            logger.error(f"Failed to get cloud ID: {e}")

    def _get_keyring_username(self) -> str:
        """Get the keyring username for storing tokens.

        The username is based on the client ID to allow multiple OAuth apps.

        Returns:
            A username string for keyring
        """
        return f"oauth-{self.client_id}"

    def _save_tokens(self) -> None:
        """Save the tokens securely using keyring for later use.

        This allows the tokens to be reused between runs without requiring
        the user to go through the authorization flow again.
        """
        try:
            username = self._get_keyring_username()

            # Store token data as JSON string in keyring
            token_data = {
                "refresh_token": self.refresh_token,
                "access_token": self.access_token,
                "expires_at": self.expires_at,
                "cloud_id": self.cloud_id,
            }

            # Store the token data in the system keyring
            keyring.set_password(KEYRING_SERVICE_NAME, username, json.dumps(token_data))

            logger.debug(f"Saved OAuth tokens to keyring for {username}")

            # Also maintain backwards compatibility with file storage
            # for environments where keyring might not work
            self._save_tokens_to_file(token_data)

        except Exception as e:
            logger.error(f"Failed to save tokens to keyring: {e}")
            # Fall back to file storage if keyring fails
            self._save_tokens_to_file()

    def _save_tokens_to_file(self, token_data: dict[str, Any] | None = None) -> None:
        """Save the tokens to a file as fallback storage.

        Args:
            token_data: Optional dict with token data. If not provided,
                will use the current object attributes.
        """
        try:
            # Create the directory if it doesn't exist
            token_dir = Path.home() / ".mcp-atlassian"
            token_dir.mkdir(exist_ok=True)

            # Save the tokens to a file
            token_path = token_dir / f"oauth-{self.client_id}.json"

            if token_data is None:
                token_data = {
                    "refresh_token": self.refresh_token,
                    "access_token": self.access_token,
                    "expires_at": self.expires_at,
                    "cloud_id": self.cloud_id,
                }

            with open(token_path, "w") as f:
                json.dump(token_data, f)

            logger.debug(f"Saved OAuth tokens to file {token_path} (fallback storage)")
        except Exception as e:
            logger.error(f"Failed to save tokens to file: {e}")

    @staticmethod
    def load_tokens(client_id: str) -> dict[str, Any]:
        """Load tokens securely from keyring.

        Args:
            client_id: The OAuth client ID

        Returns:
            Dict with the token data or empty dict if no tokens found
        """
        username = f"oauth-{client_id}"

        # Try to load tokens from keyring first
        try:
            token_json = keyring.get_password(KEYRING_SERVICE_NAME, username)
            if token_json:
                logger.debug(f"Loaded OAuth tokens from keyring for {username}")
                return json.loads(token_json)
        except Exception as e:
            logger.warning(
                f"Failed to load tokens from keyring: {e}. Trying file fallback."
            )

        # Fall back to loading from file if keyring fails or returns None
        return OAuthConfig._load_tokens_from_file(client_id)

    @staticmethod
    def _load_tokens_from_file(client_id: str) -> dict[str, Any]:
        """Load tokens from a file as fallback.

        Args:
            client_id: The OAuth client ID

        Returns:
            Dict with the token data or empty dict if no tokens found
        """
        token_path = Path.home() / ".mcp-atlassian" / f"oauth-{client_id}.json"

        if not token_path.exists():
            return {}

        try:
            with open(token_path) as f:
                token_data = json.load(f)
            logger.debug(
                f"Loaded OAuth tokens from file {token_path} (fallback storage)"
            )
            return token_data
        except Exception as e:
            logger.error(f"Failed to load tokens from file: {e}")
            return {}

    @classmethod
    def from_env(cls) -> Optional["OAuthConfig"]:
        """Create an OAuth configuration from environment variables.

        Returns:
            OAuthConfig instance or None if OAuth is not enabled
        """
        # Check if OAuth is explicitly enabled (allows minimal config)
        oauth_enabled = os.getenv("ATLASSIAN_OAUTH_ENABLE", "").lower() in (
            "true",
            "1",
            "yes",
        )

        # Check for required environment variables
        client_id = os.getenv("ATLASSIAN_OAUTH_CLIENT_ID")
        client_secret = os.getenv("ATLASSIAN_OAUTH_CLIENT_SECRET")
        redirect_uri = os.getenv("ATLASSIAN_OAUTH_REDIRECT_URI")
        scope = os.getenv("ATLASSIAN_OAUTH_SCOPE")

        # Full OAuth configuration (traditional mode)
        if all([client_id, client_secret, redirect_uri, scope]):
            # Create the OAuth configuration with full credentials
            config = cls(
                client_id=client_id,
                client_secret=client_secret,
                redirect_uri=redirect_uri,
                scope=scope,
                cloud_id=os.getenv("ATLASSIAN_OAUTH_CLOUD_ID"),
            )

            # Try to load existing tokens
            token_data = cls.load_tokens(client_id)
            if token_data:
                config.refresh_token = token_data.get("refresh_token")
                config.access_token = token_data.get("access_token")
                config.expires_at = token_data.get("expires_at")
                if not config.cloud_id and "cloud_id" in token_data:
                    config.cloud_id = token_data["cloud_id"]

            return config

        # Minimal OAuth configuration (user-provided tokens mode)
        elif oauth_enabled:
            # Create minimal config that works with user-provided tokens
            logger.info(
                "Creating minimal OAuth config for user-provided tokens (ATLASSIAN_OAUTH_ENABLE=true)"
            )
            return cls(
                client_id="",  # Will be provided by user tokens
                client_secret="",  # Not needed for user tokens
                redirect_uri="",  # Not needed for user tokens
                scope="",  # Will be determined by user token permissions
                cloud_id=os.getenv("ATLASSIAN_OAUTH_CLOUD_ID"),  # Optional fallback
            )

        # No OAuth configuration
        return None
439 |
440 |
@dataclass
class BYOAccessTokenOAuthConfig:
    """OAuth configuration built from a user-supplied access token.

    Used when the caller already has an Atlassian Cloud ID and a valid
    access token (e.g. service accounts or CI/CD pipelines) and wants to
    bypass the full OAuth 2.0 (3LO) flow entirely.

    Token refreshing is not supported in this mode.
    """

    cloud_id: str
    access_token: str
    refresh_token: None = None
    expires_at: None = None

    @classmethod
    def from_env(cls) -> Optional["BYOAccessTokenOAuthConfig"]:
        """Build the configuration from environment variables.

        Reads `ATLASSIAN_OAUTH_CLOUD_ID` and `ATLASSIAN_OAUTH_ACCESS_TOKEN`.

        Returns:
            A populated instance, or None when either variable is
            missing or empty.
        """
        cloud_id = os.getenv("ATLASSIAN_OAUTH_CLOUD_ID")
        access_token = os.getenv("ATLASSIAN_OAUTH_ACCESS_TOKEN")

        # Both values are required; anything falsy means "not configured".
        if cloud_id and access_token:
            return cls(cloud_id=cloud_id, access_token=access_token)
        return None
475 |
476 |
def get_oauth_config_from_env() -> OAuthConfig | BYOAccessTokenOAuthConfig | None:
    """Resolve whichever OAuth configuration the environment provides.

    A "Bring Your Own Access Token" configuration is attempted first; when
    its required variables are absent, the standard OAuth 2.0 (3LO)
    configuration is tried next.

    Returns:
        The first configuration that could be built from the environment,
        or None when neither variant is configured.
    """
    byo_config = BYOAccessTokenOAuthConfig.from_env()
    if byo_config is not None:
        return byo_config
    return OAuthConfig.from_env()
489 |
490 |
def configure_oauth_session(
    session: requests.Session, oauth_config: OAuthConfig | BYOAccessTokenOAuthConfig
) -> bool:
    """Configure a requests session with OAuth 2.0 authentication.

    This function ensures the access token is valid and adds it to the session headers.

    Branch order matters: a config holding an access token but no refresh
    token (the BYO case, or a minimal OAuthConfig) is used as-is; only
    refresh-capable configs proceed to token validation/refresh.

    Args:
        session: The requests session to configure
        oauth_config: The OAuth configuration to use

    Returns:
        True if the session was successfully configured, False otherwise
    """
    logger.debug(
        f"configure_oauth_session: Received OAuthConfig with "
        f"access_token_present={bool(oauth_config.access_token)}, "
        f"refresh_token_present={bool(oauth_config.refresh_token)}, "
        f"cloud_id='{oauth_config.cloud_id}'"
    )
    # If user provided only an access token (no refresh_token), use it directly
    if oauth_config.access_token and not oauth_config.refresh_token:
        logger.info(
            "configure_oauth_session: Using provided OAuth access token directly (no refresh_token)."
        )
        session.headers["Authorization"] = f"Bearer {oauth_config.access_token}"
        return True
    logger.debug("configure_oauth_session: Proceeding to ensure_valid_token.")
    # Otherwise, ensure we have a valid token (refresh if needed)
    # NOTE: a BYO config with a non-empty token was handled by the branch
    # above; reaching this check means its access token was empty, and a BYO
    # config cannot refresh, so fail fast.
    if isinstance(oauth_config, BYOAccessTokenOAuthConfig):
        logger.error(
            "configure_oauth_session: oauth access token configuration provided as empty string."
        )
        return False
    if not oauth_config.ensure_valid_token():
        logger.error(
            f"configure_oauth_session: ensure_valid_token returned False. "
            f"Token was expired: {oauth_config.is_token_expired}, "
            f"Refresh token present for attempt: {bool(oauth_config.refresh_token)}"
        )
        return False
    session.headers["Authorization"] = f"Bearer {oauth_config.access_token}"
    logger.info("Successfully configured OAuth session for Atlassian Cloud API")
    return True
535 |
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_fields.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the Jira Fields mixin."""
2 |
3 | from typing import Any
4 | from unittest.mock import MagicMock
5 |
6 | import pytest
7 |
8 | from mcp_atlassian.jira import JiraFetcher
9 | from mcp_atlassian.jira.fields import FieldsMixin
10 |
11 |
class TestFieldsMixin:
    """Tests for the FieldsMixin class.

    Field definitions are represented as the raw dicts returned by the Jira
    field API; most tests stub ``get_fields`` (or the underlying client) and
    assert on returned structures and on which client methods were called.
    """

    @pytest.fixture
    def fields_mixin(self, jira_fetcher: JiraFetcher) -> FieldsMixin:
        """Create a FieldsMixin instance with mocked dependencies."""
        mixin = jira_fetcher
        # Start every test with a cold cache so caching behavior is explicit.
        mixin._field_ids_cache = None
        return mixin

    @pytest.fixture
    def mock_fields(self):
        """Return mock field data: four system fields then three custom fields."""
        return [
            {"id": "summary", "name": "Summary", "schema": {"type": "string"}},
            {"id": "description", "name": "Description", "schema": {"type": "string"}},
            {"id": "status", "name": "Status", "schema": {"type": "status"}},
            {"id": "assignee", "name": "Assignee", "schema": {"type": "user"}},
            {
                "id": "customfield_10010",
                "name": "Epic Link",
                "schema": {
                    "type": "string",
                    "custom": "com.pyxis.greenhopper.jira:gh-epic-link",
                },
            },
            {
                "id": "customfield_10011",
                "name": "Epic Name",
                "schema": {
                    "type": "string",
                    "custom": "com.pyxis.greenhopper.jira:gh-epic-label",
                },
            },
            {
                "id": "customfield_10012",
                "name": "Story Points",
                "schema": {"type": "number"},
            },
        ]

    def test_get_field_ids_cache(self, fields_mixin: FieldsMixin, mock_fields):
        """Test get_fields uses cache when available."""
        # Set up the cache
        fields_mixin._field_ids_cache = mock_fields

        # Call the method
        result = fields_mixin.get_fields()

        # Verify cache was used — no API round-trip
        assert result == mock_fields
        fields_mixin.jira.get_all_fields.assert_not_called()

    def test_get_fields_refresh(self, fields_mixin: FieldsMixin, mock_fields):
        """Test get_fields refreshes data when requested."""
        # Set up the cache with stale data that refresh=True must discard
        fields_mixin._field_ids_cache = [{"id": "old_data", "name": "old data"}]

        # Mock the API response
        fields_mixin.jira.get_all_fields.return_value = mock_fields

        # Call the method with refresh=True
        result = fields_mixin.get_fields(refresh=True)

        # Verify API was called
        fields_mixin.jira.get_all_fields.assert_called_once()
        assert result == mock_fields
        # Verify cache was updated
        assert fields_mixin._field_ids_cache == mock_fields

    def test_get_fields_from_api(
        self, fields_mixin: FieldsMixin, mock_fields: list[dict[str, Any]]
    ):
        """Test get_fields fetches from API when no cache exists."""
        # Mock the API response
        fields_mixin.jira.get_all_fields.return_value = mock_fields

        # Call the method
        result = fields_mixin.get_fields()

        # Verify API was called
        fields_mixin.jira.get_all_fields.assert_called_once()
        assert result == mock_fields
        # Verify cache was created
        assert fields_mixin._field_ids_cache == mock_fields

    def test_get_fields_error(self, fields_mixin: FieldsMixin):
        """Test get_fields handles errors gracefully."""

        # Mock API error
        fields_mixin.jira.get_all_fields.side_effect = Exception("API error")

        # Call the method
        result = fields_mixin.get_fields()

        # Verify empty list is returned on error
        assert result == []

    def test_get_field_id_by_exact_match(self, fields_mixin: FieldsMixin, mock_fields):
        """Test get_field_id finds field by exact name match."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Call the method
        result = fields_mixin.get_field_id("Summary")

        # Verify the result
        assert result == "summary"

    def test_get_field_id_case_insensitive(
        self, fields_mixin: FieldsMixin, mock_fields
    ):
        """Test get_field_id is case-insensitive."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Call the method with different case
        result = fields_mixin.get_field_id("summary")

        # Verify the result
        assert result == "summary"

    def test_get_field_id_exact_match_case_insensitive(
        self, fields_mixin: FieldsMixin, mock_fields
    ):
        """Test get_field_id finds field by exact match (case-insensitive) using the map."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)
        # Ensure the map is generated based on the mock fields for this test
        fields_mixin._generate_field_map(force_regenerate=True)

        # Call the method with exact name (case-insensitive)
        result = fields_mixin.get_field_id("epic link")

        # Verify the result (should find Epic Link as first match)
        assert result == "customfield_10010"

    def test_get_field_id_not_found(self, fields_mixin: FieldsMixin, mock_fields):
        """Test get_field_id returns None when field not found."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Call the method with non-existent field
        result = fields_mixin.get_field_id("NonExistent")

        # Verify the result
        assert result is None

    def test_get_field_id_error(self, fields_mixin: FieldsMixin):
        """Test get_field_id handles errors gracefully."""
        # Make get_fields raise an exception
        fields_mixin.get_fields = MagicMock(
            side_effect=Exception("Error getting fields")
        )

        # Call the method
        result = fields_mixin.get_field_id("Summary")

        # Verify None is returned on error
        assert result is None

    def test_get_field_by_id(self, fields_mixin: FieldsMixin, mock_fields):
        """Test get_field_by_id retrieves field definition correctly."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Call the method
        result = fields_mixin.get_field_by_id("customfield_10012")

        # Verify the result
        assert result == mock_fields[6]  # The Story Points field
        assert result["name"] == "Story Points"

    def test_get_field_by_id_not_found(self, fields_mixin: FieldsMixin, mock_fields):
        """Test get_field_by_id returns None when field not found."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Call the method with non-existent ID
        result = fields_mixin.get_field_by_id("customfield_99999")

        # Verify the result
        assert result is None

    def test_get_custom_fields(self, fields_mixin: FieldsMixin, mock_fields):
        """Test get_custom_fields returns only custom fields."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Call the method
        result = fields_mixin.get_custom_fields()

        # Verify the result: only the three customfield_* entries survive
        assert len(result) == 3
        assert all(field["id"].startswith("customfield_") for field in result)
        assert result[0]["name"] == "Epic Link"
        assert result[1]["name"] == "Epic Name"
        assert result[2]["name"] == "Story Points"

    def test_get_required_fields(self, fields_mixin: FieldsMixin):
        """Test get_required_fields retrieves required fields correctly."""
        # Mock the response for get_project_issue_types
        mock_issue_types = [
            {"id": "10001", "name": "Bug"},
            {"id": "10002", "name": "Task"},
        ]
        fields_mixin.get_project_issue_types = MagicMock(return_value=mock_issue_types)

        # Mock the response for issue_createmeta_fieldtypes based on API docs
        mock_field_meta = {
            "fields": [
                {
                    "required": True,
                    "schema": {"type": "string", "system": "summary"},
                    "name": "Summary",
                    "fieldId": "summary",
                    "autoCompleteUrl": "",
                    "hasDefaultValue": False,
                    "operations": ["set"],
                    "allowedValues": [],
                },
                {
                    "required": False,
                    "schema": {"type": "string", "system": "description"},
                    "name": "Description",
                    "fieldId": "description",
                },
                {
                    "required": True,
                    "schema": {"type": "string", "custom": "some.custom.type"},
                    "name": "Epic Link",
                    "fieldId": "customfield_10010",
                },
            ]
        }
        fields_mixin.jira.issue_createmeta_fieldtypes.return_value = mock_field_meta

        # Call the method
        result = fields_mixin.get_required_fields("Bug", "TEST")

        # Verify the result: only required fields, keyed by fieldId
        assert len(result) == 2
        assert "summary" in result
        assert result["summary"]["required"] is True
        assert "customfield_10010" in result
        assert result["customfield_10010"]["required"] is True
        assert "description" not in result
        # Verify the correct API was called
        fields_mixin.get_project_issue_types.assert_called_once_with("TEST")
        fields_mixin.jira.issue_createmeta_fieldtypes.assert_called_once_with(
            project="TEST", issue_type_id="10001"
        )

    def test_get_required_fields_not_found(self, fields_mixin: FieldsMixin):
        """Test get_required_fields handles project/issue type not found."""
        # Scenario 1: Issue type not found in project
        mock_issue_types = [{"id": "10002", "name": "Task"}]  # "Bug" is missing
        fields_mixin.get_project_issue_types = MagicMock(return_value=mock_issue_types)
        fields_mixin.jira.issue_createmeta_fieldtypes = MagicMock()

        # Call the method
        result = fields_mixin.get_required_fields("Bug", "TEST")
        # Verify issue type lookup was attempted, but field meta was not called
        fields_mixin.get_project_issue_types.assert_called_once_with("TEST")
        fields_mixin.jira.issue_createmeta_fieldtypes.assert_not_called()

        # Verify the result
        assert result == {}

    def test_get_required_fields_error(self, fields_mixin: FieldsMixin):
        """Test get_required_fields handles errors gracefully."""
        # Mock the response for get_project_issue_types
        mock_issue_types = [
            {"id": "10001", "name": "Bug"},
        ]
        fields_mixin.get_project_issue_types = MagicMock(return_value=mock_issue_types)
        # Mock issue_createmeta_fieldtypes to raise an error
        fields_mixin.jira.issue_createmeta_fieldtypes.side_effect = Exception(
            "API error"
        )

        # Call the method
        result = fields_mixin.get_required_fields("Bug", "TEST")

        # Verify the result
        assert result == {}
        # Verify the correct API was called (which then raised the error)
        fields_mixin.jira.issue_createmeta_fieldtypes.assert_called_once_with(
            project="TEST", issue_type_id="10001"
        )

    def test_get_jira_field_ids_cached(self, fields_mixin: FieldsMixin):
        """Test get_field_ids_to_epic returns cached field IDs."""
        # Set up the cache
        fields_mixin._field_ids_cache = [
            {"id": "summary", "name": "Summary"},
            {"id": "description", "name": "Description"},
        ]

        # Call the method
        result = fields_mixin.get_field_ids_to_epic()

        # Verify the result: name -> id mapping built from the cache
        assert result == {
            "Summary": "summary",
            "Description": "description",
        }

    def test_get_jira_field_ids_from_fields(
        self, fields_mixin: FieldsMixin, mock_fields: list[dict]
    ):
        """Test get_field_ids_to_epic extracts field IDs from field definitions."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)
        # Ensure field map is generated
        fields_mixin._generate_field_map(force_regenerate=True)

        # Call the method
        result = fields_mixin.get_field_ids_to_epic()

        # Verify that epic-specific fields are properly identified
        # (both the snake_case aliases and the display names are mapped)
        assert "epic_link" in result
        assert "Epic Link" in result
        assert result["epic_link"] == "customfield_10010"
        assert "epic_name" in result
        assert "Epic Name" in result
        assert result["epic_name"] == "customfield_10011"

    def test_get_jira_field_ids_error(self, fields_mixin: FieldsMixin):
        """Test get_field_ids_to_epic handles errors gracefully."""
        # Ensure no cache exists
        fields_mixin._field_ids_cache = None

        # Make get_fields raise an exception
        fields_mixin.get_fields = MagicMock(
            side_effect=Exception("Error getting fields")
        )

        # Call the method
        result = fields_mixin.get_field_ids_to_epic()

        # Verify the result
        assert result == {}

    def test_is_custom_field(self, fields_mixin: FieldsMixin):
        """Test is_custom_field correctly identifies custom fields."""
        # Test with custom field
        assert fields_mixin.is_custom_field("customfield_10010") is True

        # Test with standard field
        assert fields_mixin.is_custom_field("summary") is False

    def test_format_field_value_user_field(
        self, fields_mixin: FieldsMixin, mock_fields
    ):
        """Test format_field_value formats user fields correctly."""
        # Set up the mocks
        fields_mixin.get_field_by_id = MagicMock(
            return_value=mock_fields[3]
        )  # The Assignee field
        fields_mixin._get_account_id = MagicMock(return_value="account123")

        # Call the method with a user field and string value
        result = fields_mixin.format_field_value("assignee", "johndoe")

        # Verify the result: string username resolved to an accountId payload
        assert result == {"accountId": "account123"}
        fields_mixin._get_account_id.assert_called_once_with("johndoe")

    # FIXME: The test covers impossible case.
    #
    # This test is failing because it assumes that the `_get_account_id`
    # method is unavailable. As default, `format_field_value` will return
    # `{"name": value}` for server/DC.
    #
    # However, in any case `JiraFetcher` always inherits from `UsersMixin`
    # and will therefore have the `_get_account_id` method available.
    #
    # That is to say, the `format_field_value` method will never return in
    # format `{"name": value}`.
    #
    # Further fixes are needed in the `FieldsMixin` class to support the case
    # for server/DC.
    #
    # See also:
    # https://github.com/sooperset/mcp-atlassian/blob/651c271e8aa76b469e9c67535669d93267ad5da6/src/mcp_atlassian/jira/fields.py#L279-L297

    # def test_format_field_value_user_field_no_account_id(
    #     self, fields_mixin: FieldsMixin, mock_fields
    # ):
    #     """Test format_field_value handles user fields without _get_account_id."""
    #     # Set up the mocks
    #     fields_mixin.get_field_by_id = MagicMock(
    #         return_value=mock_fields[3]
    #     )  # The Assignee field

    #     # Call the method with a user field and string value
    #     result = fields_mixin.format_field_value("assignee", "johndoe")

    #     # Verify the result - should use name for server/DC
    #     assert result == {"name": "johndoe"}

    def test_format_field_value_array_field(self, fields_mixin: FieldsMixin):
        """Test format_field_value formats array fields correctly."""
        # Set up the mocks
        mock_array_field = {
            "id": "labels",
            "name": "Labels",
            "schema": {"type": "array"},
        }
        fields_mixin.get_field_by_id = MagicMock(return_value=mock_array_field)

        # Test with single value (should convert to list)
        result = fields_mixin.format_field_value("labels", "bug")
        assert result == ["bug"]

        # Test with list value (should keep as list)
        result = fields_mixin.format_field_value("labels", ["bug", "feature"])
        assert result == ["bug", "feature"]

    def test_format_field_value_option_field(self, fields_mixin: FieldsMixin):
        """Test format_field_value formats option fields correctly."""
        # Set up the mocks
        mock_option_field = {
            "id": "priority",
            "name": "Priority",
            "schema": {"type": "option"},
        }
        fields_mixin.get_field_by_id = MagicMock(return_value=mock_option_field)

        # Test with string value
        result = fields_mixin.format_field_value("priority", "High")
        assert result == {"value": "High"}

        # Test with already formatted value (should pass through unchanged)
        already_formatted = {"value": "Medium"}
        result = fields_mixin.format_field_value("priority", already_formatted)
        assert result == already_formatted

    def test_format_field_value_unknown_field(self, fields_mixin: FieldsMixin):
        """Test format_field_value returns value as-is for unknown fields."""
        # Set up the mocks
        fields_mixin.get_field_by_id = MagicMock(return_value=None)

        # Call the method with unknown field
        test_value = "test value"
        result = fields_mixin.format_field_value("unknown", test_value)

        # Verify the value is returned as-is
        assert result == test_value

    def test_search_fields_empty_keyword(self, fields_mixin: FieldsMixin, mock_fields):
        """Test search_fields returns first N fields when keyword is empty."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Call with empty keyword and limit=3
        result = fields_mixin.search_fields("", limit=3)

        # Verify first 3 fields are returned
        assert len(result) == 3
        assert result == mock_fields[:3]

    def test_search_fields_exact_match(self, fields_mixin: FieldsMixin, mock_fields):
        """Test search_fields finds exact matches with high relevance."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Search for "Story Points"
        result = fields_mixin.search_fields("Story Points")

        # Verify Story Points field is first result
        assert len(result) > 0
        assert result[0]["name"] == "Story Points"
        assert result[0]["id"] == "customfield_10012"

    def test_search_fields_partial_match(self, fields_mixin: FieldsMixin, mock_fields):
        """Test search_fields finds partial matches."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Search for "Epic"
        result = fields_mixin.search_fields("Epic")

        # Verify Epic-related fields are in results
        epic_fields = [field["name"] for field in result[:2]]  # Top 2 results
        assert "Epic Link" in epic_fields
        assert "Epic Name" in epic_fields

    def test_search_fields_case_insensitive(
        self, fields_mixin: FieldsMixin, mock_fields
    ):
        """Test search_fields is case insensitive."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Search with different cases
        result_lower = fields_mixin.search_fields("story points")
        result_upper = fields_mixin.search_fields("STORY POINTS")
        result_mixed = fields_mixin.search_fields("Story Points")

        # Verify all searches find the same field
        assert len(result_lower) > 0
        assert len(result_upper) > 0
        assert len(result_mixed) > 0
        assert result_lower[0]["id"] == result_upper[0]["id"] == result_mixed[0]["id"]
        assert result_lower[0]["name"] == "Story Points"

    def test_search_fields_with_limit(self, fields_mixin: FieldsMixin, mock_fields):
        """Test search_fields respects the limit parameter."""
        # Set up the fields
        fields_mixin.get_fields = MagicMock(return_value=mock_fields)

        # Search with limit=2
        result = fields_mixin.search_fields("field", limit=2)

        # Verify only 2 results are returned
        assert len(result) == 2

    def test_search_fields_error(self, fields_mixin: FieldsMixin):
        """Test search_fields handles errors gracefully."""
        # Make get_fields raise an exception
        fields_mixin.get_fields = MagicMock(
            side_effect=Exception("Error getting fields")
        )

        # Call the method
        result = fields_mixin.search_fields("test")

        # Verify empty list is returned on error
        assert result == []
543 |
```
--------------------------------------------------------------------------------
/tests/unit/confluence/test_users.py:
--------------------------------------------------------------------------------
```python
1 | """Unit tests for the Confluence users module."""
2 |
3 | from unittest.mock import MagicMock, patch
4 |
5 | import pytest
6 | from requests.exceptions import HTTPError
7 |
8 | from mcp_atlassian.confluence.users import UsersMixin
9 | from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
10 |
11 |
12 | class TestUsersMixin:
13 | """Tests for the UsersMixin class."""
14 |
    @pytest.fixture
    def users_mixin(self, confluence_client):
        """Create a UsersMixin instance for testing.

        ``ConfluenceClient.__init__`` is patched out so no real HTTP client is
        built; the mocked API object and config are copied from the shared
        ``confluence_client`` fixture.
        """
        # UsersMixin inherits from ConfluenceClient, so we need to create it properly
        with patch(
            "mcp_atlassian.confluence.users.ConfluenceClient.__init__"
        ) as mock_init:
            mock_init.return_value = None
            mixin = UsersMixin()
            # Copy the necessary attributes from our mocked client
            mixin.confluence = confluence_client.confluence
            mixin.config = confluence_client.config
            return mixin
28 |
    # Mock user data for different scenarios
    @pytest.fixture
    def mock_user_data_cloud(self):
        """Mock user data for Confluence Cloud (accountId-based schema)."""
        return {
            "accountId": "5b10ac8d82e05b22cc7d4ef5",
            "accountType": "atlassian",
            "email": "[email protected]",
            "publicName": "Test User",
            "displayName": "Test User",
            "profilePicture": {
                "path": "/wiki/aa-avatar/5b10ac8d82e05b22cc7d4ef5",
                "width": 48,
                "height": 48,
                "isDefault": False,
            },
            "isExternalCollaborator": False,
            "accountStatus": "active",
        }
48 |
    @pytest.fixture
    def mock_user_data_server(self):
        """Mock user data for Confluence Server/DC (username/userKey schema)."""
        return {
            "username": "testuser",
            "userKey": "testuser-key-12345",
            "displayName": "Test User",
            "fullName": "Test User Full Name",
            "email": "[email protected]",
            "status": "active",
        }
60 |
    @pytest.fixture
    def mock_user_data_with_status(self):
        """Mock Cloud user data including the ``status`` expansion field."""
        return {
            "accountId": "5b10ac8d82e05b22cc7d4ef5",
            "accountType": "atlassian",
            "email": "[email protected]",
            "publicName": "Test User",
            "displayName": "Test User",
            "accountStatus": "active",
            "status": "Active",  # Expanded status field
        }
73 |
    @pytest.fixture
    def mock_current_user_data(self):
        """Mock current user data for get_current_user_info (rest/api/user/current)."""
        return {
            "accountId": "5b10ac8d82e05b22cc7d4ef5",
            "type": "known",
            "accountType": "atlassian",
            "email": "[email protected]",
            "publicName": "Current User",
            "displayName": "Current User",
            "profilePicture": {
                "path": "/wiki/aa-avatar/5b10ac8d82e05b22cc7d4ef5",
                "width": 48,
                "height": 48,
                "isDefault": False,
            },
            "isExternalCollaborator": False,
            "isGuest": False,
            "locale": "en_US",
            "accountStatus": "active",
        }
95 |
    def test_get_user_details_by_accountid_success(
        self, users_mixin, mock_user_data_cloud
    ):
        """Test successfully getting user details by account ID (Cloud path)."""
        # Arrange
        account_id = "5b10ac8d82e05b22cc7d4ef5"
        users_mixin.confluence.get_user_details_by_accountid.return_value = (
            mock_user_data_cloud
        )

        # Act
        result = users_mixin.get_user_details_by_accountid(account_id)

        # Assert — expand defaults to None and is passed positionally
        users_mixin.confluence.get_user_details_by_accountid.assert_called_once_with(
            account_id, None
        )
        assert result == mock_user_data_cloud
        assert result["accountId"] == account_id
        assert result["displayName"] == "Test User"
116 |
    def test_get_user_details_by_accountid_with_expand(
        self, users_mixin, mock_user_data_with_status
    ):
        """Test getting user details by account ID with status expansion."""
        # Arrange
        account_id = "5b10ac8d82e05b22cc7d4ef5"
        expand = "status"
        users_mixin.confluence.get_user_details_by_accountid.return_value = (
            mock_user_data_with_status
        )

        # Act
        result = users_mixin.get_user_details_by_accountid(account_id, expand=expand)

        # Assert — the expand value is forwarded to the client
        users_mixin.confluence.get_user_details_by_accountid.assert_called_once_with(
            account_id, expand
        )
        assert result == mock_user_data_with_status
        assert result["status"] == "Active"
        assert result["accountStatus"] == "active"
138 |
139 | def test_get_user_details_by_accountid_invalid_account_id(self, users_mixin):
140 | """Test getting user details with invalid account ID."""
141 | # Arrange
142 | invalid_account_id = "invalid-account-id"
143 | users_mixin.confluence.get_user_details_by_accountid.side_effect = Exception(
144 | "User not found"
145 | )
146 |
147 | # Act/Assert
148 | with pytest.raises(Exception, match="User not found"):
149 | users_mixin.get_user_details_by_accountid(invalid_account_id)
150 |
    def test_get_user_details_by_username_success(
        self, users_mixin, mock_user_data_server
    ):
        """Test successfully getting user details by username (Server/DC path)."""
        # Arrange
        username = "testuser"
        users_mixin.confluence.get_user_details_by_username.return_value = (
            mock_user_data_server
        )

        # Act
        result = users_mixin.get_user_details_by_username(username)

        # Assert — expand defaults to None and is passed positionally
        users_mixin.confluence.get_user_details_by_username.assert_called_once_with(
            username, None
        )
        assert result == mock_user_data_server
        assert result["username"] == username
        assert result["displayName"] == "Test User"
171 |
    def test_get_user_details_by_username_with_expand(
        self, users_mixin, mock_user_data_server
    ):
        """Test getting user details by username with status expansion."""
        # Arrange — copy the fixture so the shared dict is not mutated
        username = "testuser"
        expand = "status"
        mock_data_with_status = mock_user_data_server.copy()
        mock_data_with_status["status"] = "Active"
        users_mixin.confluence.get_user_details_by_username.return_value = (
            mock_data_with_status
        )

        # Act
        result = users_mixin.get_user_details_by_username(username, expand=expand)

        # Assert
        users_mixin.confluence.get_user_details_by_username.assert_called_once_with(
            username, expand
        )
        assert result == mock_data_with_status
        assert result["status"] == "Active"
194 |
195 | def test_get_user_details_by_username_invalid_username(self, users_mixin):
196 | """Test getting user details with invalid username."""
197 | # Arrange
198 | invalid_username = "nonexistent-user"
199 | users_mixin.confluence.get_user_details_by_username.side_effect = Exception(
200 | "User not found"
201 | )
202 |
203 | # Act/Assert
204 | with pytest.raises(Exception, match="User not found"):
205 | users_mixin.get_user_details_by_username(invalid_username)
206 |
    def test_get_user_details_by_username_server_dc_pattern(
        self, users_mixin, mock_user_data_server
    ):
        """Test that username lookup follows Server/DC patterns."""
        # Arrange
        username = "[email protected]"  # Email-like username common in DC
        users_mixin.confluence.get_user_details_by_username.return_value = (
            mock_user_data_server
        )

        # Act
        result = users_mixin.get_user_details_by_username(username)

        # Assert — email-like usernames are forwarded verbatim
        users_mixin.confluence.get_user_details_by_username.assert_called_once_with(
            username, None
        )
        assert result == mock_user_data_server
225 |
    def test_get_current_user_info_success(self, users_mixin, mock_current_user_data):
        """Test successfully getting current user info."""
        # Arrange
        users_mixin.confluence.get.return_value = mock_current_user_data

        # Assert — the mixin hits the current-user endpoint directly
        result = users_mixin.get_current_user_info()

        # Assert
        users_mixin.confluence.get.assert_called_once_with("rest/api/user/current")
        assert result == mock_current_user_data
        assert result["accountId"] == "5b10ac8d82e05b22cc7d4ef5"
        assert result["displayName"] == "Current User"
239 |
    def test_get_current_user_info_returns_non_dict(self, users_mixin):
        """Test get_current_user_info when API returns non-dict data."""
        # Arrange — a plain string is not valid user JSON
        users_mixin.confluence.get.return_value = "Invalid response"

        # Act/Assert
        with pytest.raises(
            MCPAtlassianAuthenticationError,
            match="Confluence token validation failed: Did not receive valid JSON user data",
        ):
            users_mixin.get_current_user_info()

        users_mixin.confluence.get.assert_called_once_with("rest/api/user/current")
253 |
254 | def test_get_current_user_info_returns_none(self, users_mixin):
255 | """Test get_current_user_info when API returns None."""
256 | # Arrange
257 | users_mixin.confluence.get.return_value = None
258 |
259 | # Act/Assert
260 | with pytest.raises(
261 | MCPAtlassianAuthenticationError,
262 | match="Confluence token validation failed: Did not receive valid JSON user data",
263 | ):
264 | users_mixin.get_current_user_info()
265 |
    def test_get_current_user_info_http_error_401(self, users_mixin):
        """Test get_current_user_info with 401 authentication error."""
        # Arrange — an HTTPError carrying a 401 response
        mock_response = MagicMock()
        mock_response.status_code = 401
        http_error = HTTPError(response=mock_response)
        users_mixin.confluence.get.side_effect = http_error

        # Act/Assert — 401 is surfaced with the status code in the message
        with pytest.raises(
            MCPAtlassianAuthenticationError,
            match="Confluence token validation failed: 401 from /rest/api/user/current",
        ):
            users_mixin.get_current_user_info()
280 |
    def test_get_current_user_info_http_error_403(self, users_mixin):
        """Test get_current_user_info with 403 forbidden error."""
        # Arrange — an HTTPError carrying a 403 response
        mock_response = MagicMock()
        mock_response.status_code = 403
        http_error = HTTPError(response=mock_response)
        users_mixin.confluence.get.side_effect = http_error

        # Act/Assert — 403 is surfaced with the status code in the message
        with pytest.raises(
            MCPAtlassianAuthenticationError,
            match="Confluence token validation failed: 403 from /rest/api/user/current",
        ):
            users_mixin.get_current_user_info()
295 |
296 | def test_get_current_user_info_http_error_other(self, users_mixin):
297 | """Test get_current_user_info with other HTTP error codes."""
298 | # Arrange
299 | mock_response = MagicMock()
300 | mock_response.status_code = 500
301 | http_error = HTTPError(response=mock_response)
302 | users_mixin.confluence.get.side_effect = http_error
303 |
304 | # Act/Assert
305 | with pytest.raises(
306 | MCPAtlassianAuthenticationError,
307 | match="Confluence token validation failed with HTTPError",
308 | ):
309 | users_mixin.get_current_user_info()
310 |
311 | def test_get_current_user_info_http_error_no_response(self, users_mixin):
312 | """Test get_current_user_info with HTTPError but no response object."""
313 | # Arrange
314 | http_error = HTTPError()
315 | http_error.response = None
316 | users_mixin.confluence.get.side_effect = http_error
317 |
318 | # Act/Assert
319 | with pytest.raises(
320 | MCPAtlassianAuthenticationError,
321 | match="Confluence token validation failed with HTTPError",
322 | ):
323 | users_mixin.get_current_user_info()
324 |
325 | def test_get_current_user_info_generic_exception(self, users_mixin):
326 | """Test get_current_user_info with generic exception."""
327 | # Arrange
328 | users_mixin.confluence.get.side_effect = ConnectionError("Network error")
329 |
330 | # Act/Assert
331 | with pytest.raises(
332 | MCPAtlassianAuthenticationError,
333 | match="Confluence token validation failed: Network error",
334 | ):
335 | users_mixin.get_current_user_info()
336 |
337 | @pytest.mark.parametrize(
338 | "expand_param",
339 | [
340 | None,
341 | "status",
342 | "", # Empty string
343 | ],
344 | )
345 | def test_get_user_details_by_accountid_expand_parameter_handling(
346 | self, users_mixin, mock_user_data_cloud, expand_param
347 | ):
348 | """Test that expand parameter is properly handled for account ID lookup."""
349 | # Arrange
350 | account_id = "5b10ac8d82e05b22cc7d4ef5"
351 | expected_data = mock_user_data_cloud.copy()
352 | if expand_param == "status":
353 | expected_data["status"] = "Active"
354 |
355 | users_mixin.confluence.get_user_details_by_accountid.return_value = (
356 | expected_data
357 | )
358 |
359 | # Act
360 | result = users_mixin.get_user_details_by_accountid(account_id, expand_param)
361 |
362 | # Assert
363 | users_mixin.confluence.get_user_details_by_accountid.assert_called_once_with(
364 | account_id, expand_param
365 | )
366 | assert result == expected_data
367 |
368 | @pytest.mark.parametrize(
369 | "expand_param",
370 | [
371 | None,
372 | "status",
373 | "", # Empty string
374 | ],
375 | )
376 | def test_get_user_details_by_username_expand_parameter_handling(
377 | self, users_mixin, mock_user_data_server, expand_param
378 | ):
379 | """Test that expand parameter is properly handled for username lookup."""
380 | # Arrange
381 | username = "testuser"
382 | expected_data = mock_user_data_server.copy()
383 | if expand_param == "status":
384 | expected_data["status"] = "Active"
385 |
386 | users_mixin.confluence.get_user_details_by_username.return_value = expected_data
387 |
388 | # Act
389 | result = users_mixin.get_user_details_by_username(username, expand_param)
390 |
391 | # Assert
392 | users_mixin.confluence.get_user_details_by_username.assert_called_once_with(
393 | username, expand_param
394 | )
395 | assert result == expected_data
396 |
397 | def test_users_mixin_inheritance(self, users_mixin):
398 | """Test that UsersMixin properly inherits from ConfluenceClient."""
399 | # Verify that UsersMixin is indeed a ConfluenceClient
400 | from mcp_atlassian.confluence.client import ConfluenceClient
401 |
402 | assert isinstance(users_mixin, ConfluenceClient)
403 |
404 | # Verify it has the expected attributes from ConfluenceClient
405 | assert hasattr(users_mixin, "confluence")
406 | assert hasattr(users_mixin, "config")
407 |
408 | def test_users_mixin_has_required_methods(self):
409 | """Test that UsersMixin has all required methods."""
410 | # Verify the mixin has the expected methods
411 | assert hasattr(UsersMixin, "get_user_details_by_accountid")
412 | assert hasattr(UsersMixin, "get_user_details_by_username")
413 | assert hasattr(UsersMixin, "get_current_user_info")
414 |
415 | # Verify method signatures
416 | import inspect
417 |
418 | # Check get_user_details_by_accountid signature
419 | sig = inspect.signature(UsersMixin.get_user_details_by_accountid)
420 | params = list(sig.parameters.keys())
421 | assert "self" in params
422 | assert "account_id" in params
423 | assert "expand" in params
424 | assert sig.parameters["expand"].default is None
425 |
426 | # Check get_user_details_by_username signature
427 | sig = inspect.signature(UsersMixin.get_user_details_by_username)
428 | params = list(sig.parameters.keys())
429 | assert "self" in params
430 | assert "username" in params
431 | assert "expand" in params
432 | assert sig.parameters["expand"].default is None
433 |
434 | # Check get_current_user_info signature
435 | sig = inspect.signature(UsersMixin.get_current_user_info)
436 | params = list(sig.parameters.keys())
437 | assert "self" in params
438 | assert len(params) == 1 # Only self parameter
439 |
440 | def test_user_permission_scenarios(self, users_mixin):
441 | """Test various permission error scenarios."""
442 | # Test 401 Unauthorized
443 | mock_response_401 = MagicMock()
444 | mock_response_401.status_code = 401
445 | http_error_401 = HTTPError(response=mock_response_401)
446 | users_mixin.confluence.get_user_details_by_accountid.side_effect = (
447 | http_error_401
448 | )
449 |
450 | with pytest.raises(Exception): # Should propagate the original exception
451 | users_mixin.get_user_details_by_accountid("test-account-id")
452 |
453 | # Test 403 Forbidden
454 | mock_response_403 = MagicMock()
455 | mock_response_403.status_code = 403
456 | http_error_403 = HTTPError(response=mock_response_403)
457 | users_mixin.confluence.get_user_details_by_username.side_effect = http_error_403
458 |
459 | with pytest.raises(Exception): # Should propagate the original exception
460 | users_mixin.get_user_details_by_username("testuser")
461 |
462 | def test_cloud_vs_server_authentication_patterns(self, users_mixin):
463 | """Test that different authentication patterns work for Cloud vs Server/DC."""
464 | # Mock Cloud response (account ID based)
465 | cloud_user_data = {
466 | "accountId": "5b10ac8d82e05b22cc7d4ef5",
467 | "accountType": "atlassian",
468 | "displayName": "Cloud User",
469 | "accountStatus": "active",
470 | }
471 |
472 | # Mock Server/DC response (username based)
473 | server_user_data = {
474 | "username": "serveruser",
475 | "userKey": "serveruser-key-12345",
476 | "displayName": "Server User",
477 | "status": "active",
478 | }
479 |
480 | # Test Cloud pattern
481 | users_mixin.confluence.get_user_details_by_accountid.return_value = (
482 | cloud_user_data
483 | )
484 | cloud_result = users_mixin.get_user_details_by_accountid(
485 | "5b10ac8d82e05b22cc7d4ef5"
486 | )
487 | assert cloud_result["accountId"] == "5b10ac8d82e05b22cc7d4ef5"
488 | assert "accountType" in cloud_result
489 |
490 | # Test Server/DC pattern
491 | users_mixin.confluence.get_user_details_by_username.return_value = (
492 | server_user_data
493 | )
494 | server_result = users_mixin.get_user_details_by_username("serveruser")
495 | assert server_result["username"] == "serveruser"
496 | assert "userKey" in server_result
497 |
498 | def test_response_data_validation_and_transformation(
499 | self, users_mixin, mock_user_data_cloud
500 | ):
501 | """Test that response data is properly validated and returned as-is."""
502 | # Arrange
503 | account_id = "5b10ac8d82e05b22cc7d4ef5"
504 | users_mixin.confluence.get_user_details_by_accountid.return_value = (
505 | mock_user_data_cloud
506 | )
507 |
508 | # Act
509 | result = users_mixin.get_user_details_by_accountid(account_id)
510 |
511 | # Assert - should return the data exactly as received from the API
512 | assert result is mock_user_data_cloud # Same object reference
513 | assert isinstance(result, dict)
514 | assert all(
515 | key in result
516 | for key in ["accountId", "displayName", "email", "accountStatus"]
517 | )
518 |
519 | def test_deactivated_user_status_handling(self, users_mixin):
520 | """Test handling of deactivated users with status expansion."""
521 | # Arrange
522 | deactivated_user_data = {
523 | "accountId": "5b10ac8d82e05b22cc7d4ef5",
524 | "displayName": "Deactivated User",
525 | "accountStatus": "inactive",
526 | "status": "Deactivated", # Expanded status
527 | }
528 | users_mixin.confluence.get_user_details_by_accountid.return_value = (
529 | deactivated_user_data
530 | )
531 |
532 | # Act
533 | result = users_mixin.get_user_details_by_accountid(
534 | "5b10ac8d82e05b22cc7d4ef5", expand="status"
535 | )
536 |
537 | # Assert
538 | assert result["accountStatus"] == "inactive"
539 | assert result["status"] == "Deactivated"
540 | users_mixin.confluence.get_user_details_by_accountid.assert_called_once_with(
541 | "5b10ac8d82e05b22cc7d4ef5", "status"
542 | )
543 |
544 | def test_method_delegation_to_confluence_client(
545 | self, users_mixin, mock_current_user_data
546 | ):
547 | """Test that methods properly delegate to the underlying confluence client."""
548 | # Test that the methods are thin wrappers around confluence client methods
549 | account_id = "test-account-id"
550 | username = "testuser"
551 | expand = "status"
552 |
553 | # Test account ID method delegation
554 | users_mixin.get_user_details_by_accountid(account_id, expand)
555 | users_mixin.confluence.get_user_details_by_accountid.assert_called_with(
556 | account_id, expand
557 | )
558 |
559 | # Test username method delegation
560 | users_mixin.get_user_details_by_username(username, expand)
561 | users_mixin.confluence.get_user_details_by_username.assert_called_with(
562 | username, expand
563 | )
564 |
565 | # Test current user method delegation - need to mock the return value
566 | users_mixin.confluence.get.return_value = mock_current_user_data
567 | users_mixin.get_current_user_info()
568 | users_mixin.confluence.get.assert_called_with("rest/api/user/current")
569 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/pages.py:
--------------------------------------------------------------------------------
```python
1 | """Module for Confluence page operations."""
2 |
3 | import logging
4 |
5 | import requests
6 | from requests.exceptions import HTTPError
7 |
8 | from ..exceptions import MCPAtlassianAuthenticationError
9 | from ..models.confluence import ConfluencePage
10 | from .client import ConfluenceClient
11 | from .v2_adapter import ConfluenceV2Adapter
12 |
13 | logger = logging.getLogger("mcp-atlassian")
14 |
15 |
class PagesMixin(ConfluenceClient):
    """Mixin for Confluence page operations."""

    @property
    def _v2_adapter(self) -> ConfluenceV2Adapter | None:
        """Get v2 API adapter for OAuth authentication.

        Returns:
            ConfluenceV2Adapter instance if OAuth is configured, None otherwise
        """
        if self.config.auth_type == "oauth" and self.config.is_cloud:
            return ConfluenceV2Adapter(
                session=self.confluence._session, base_url=self.confluence.url
            )
        return None

    def _raise_for_http_error(self, http_err: HTTPError) -> None:
        """Log and re-raise an HTTPError from a Confluence API call.

        Centralizes the 401/403 translation that was previously duplicated
        across several methods.

        Args:
            http_err: The HTTPError raised by the underlying API call

        Raises:
            MCPAtlassianAuthenticationError: If the status code is 401 or 403
            HTTPError: Re-raised unchanged for any other status code
        """
        if http_err.response is not None and http_err.response.status_code in [
            401,
            403,
        ]:
            error_msg = (
                f"Authentication failed for Confluence API ({http_err.response.status_code}). "
                "Token may be expired or invalid. Please verify credentials."
            )
            logger.error(error_msg)
            raise MCPAtlassianAuthenticationError(error_msg) from http_err
        logger.error(f"HTTP error during API call: {http_err}", exc_info=False)
        raise http_err

    def _prepare_body(
        self,
        body: str,
        *,
        is_markdown: bool,
        content_representation: str | None,
        enable_heading_anchors: bool,
    ) -> tuple[str, str]:
        """Normalize page body content for the Confluence API.

        Shared by create_page and update_page so both use identical
        markdown-conversion semantics.

        Args:
            body: Raw content (markdown, wiki markup, or storage format)
            is_markdown: Whether the body is markdown and must be converted
            content_representation: Representation when not markdown
                ('wiki' or 'storage'); falls back to 'storage'
            enable_heading_anchors: Enable heading anchors during conversion

        Returns:
            Tuple of (final_body, representation) ready for the API call
        """
        if is_markdown:
            # Convert markdown to Confluence storage format
            final_body = self.preprocessor.markdown_to_confluence_storage(
                body, enable_heading_anchors=enable_heading_anchors
            )
            return final_body, "storage"
        # Use body as-is with the specified representation
        return body, content_representation or "storage"

    def _build_page_model(
        self, page: dict, space_key: str, *, convert_to_markdown: bool
    ) -> ConfluencePage:
        """Process a raw page payload into a ConfluencePage model.

        Args:
            page: Raw page data including body.storage content
            space_key: Key of the space the page belongs to
            convert_to_markdown: When True the model body is markdown,
                otherwise processed HTML

        Returns:
            ConfluencePage model with processed body content
        """
        content = page["body"]["storage"]["value"]
        processed_html, processed_markdown = self.preprocessor.process_html_content(
            content, space_key=space_key, confluence_client=self.confluence
        )

        # Use the appropriate content format based on the convert_to_markdown flag
        page_content = processed_markdown if convert_to_markdown else processed_html

        return ConfluencePage.from_api_response(
            page,
            base_url=self.config.url,
            include_body=True,
            # Override content with our processed version
            content_override=page_content,
            content_format="storage" if not convert_to_markdown else "markdown",
            is_cloud=self.config.is_cloud,
        )

    def get_page_content(
        self, page_id: str, *, convert_to_markdown: bool = True
    ) -> ConfluencePage:
        """
        Get content of a specific page.

        Args:
            page_id: The ID of the page to retrieve
            convert_to_markdown: When True, returns content in markdown format,
                otherwise returns raw HTML (keyword-only)

        Returns:
            ConfluencePage model containing the page content and metadata

        Raises:
            MCPAtlassianAuthenticationError: If authentication fails with the Confluence API (401/403)
            Exception: If there is an error retrieving the page
        """
        try:
            # Use v2 API for OAuth authentication, v1 API for token/basic auth
            v2_adapter = self._v2_adapter
            if v2_adapter:
                logger.debug(
                    f"Using v2 API for OAuth authentication to get page '{page_id}'"
                )
                page = v2_adapter.get_page(
                    page_id=page_id,
                    expand="body.storage,version,space,children.attachment",
                )
            else:
                logger.debug(
                    f"Using v1 API for token/basic authentication to get page '{page_id}'"
                )
                page = self.confluence.get_page_by_id(
                    page_id=page_id,
                    expand="body.storage,version,space,children.attachment",
                )

            space_key = page.get("space", {}).get("key", "")
            return self._build_page_model(
                page, space_key, convert_to_markdown=convert_to_markdown
            )
        except HTTPError as http_err:
            # Translates 401/403 into MCPAtlassianAuthenticationError,
            # re-raises any other HTTPError.
            self._raise_for_http_error(http_err)
            raise  # defensive: _raise_for_http_error always raises
        except Exception as e:
            logger.error(
                f"Error retrieving page content for page ID {page_id}: {str(e)}"
            )
            raise Exception(f"Error retrieving page content: {str(e)}") from e

    def get_page_ancestors(self, page_id: str) -> list[ConfluencePage]:
        """
        Get ancestors (parent pages) of a specific page.

        Args:
            page_id: The ID of the page to get ancestors for

        Returns:
            List of ConfluencePage models representing the ancestors in hierarchical order
            (immediate parent first, root ancestor last)

        Raises:
            MCPAtlassianAuthenticationError: If authentication fails with the Confluence API (401/403)
        """
        try:
            # Use the Atlassian Python API to get ancestors
            ancestors = self.confluence.get_page_ancestors(page_id)

            # Build lightweight models without fetching page bodies
            return [
                ConfluencePage.from_api_response(
                    ancestor,
                    base_url=self.config.url,
                    include_body=False,
                )
                for ancestor in ancestors
            ]
        except HTTPError as http_err:
            # Translates 401/403 into MCPAtlassianAuthenticationError,
            # re-raises any other HTTPError.
            self._raise_for_http_error(http_err)
            raise  # defensive: _raise_for_http_error always raises
        except Exception as e:
            # Non-HTTP failures degrade gracefully to an empty list
            logger.error(f"Error fetching ancestors for page {page_id}: {str(e)}")
            logger.debug("Full exception details:", exc_info=True)
            return []

    def get_page_by_title(
        self, space_key: str, title: str, *, convert_to_markdown: bool = True
    ) -> ConfluencePage | None:
        """
        Get a specific page by its title from a Confluence space.

        Args:
            space_key: The key of the space containing the page
            title: The title of the page to retrieve
            convert_to_markdown: When True, returns content in markdown format,
                otherwise returns raw HTML (keyword-only)

        Returns:
            ConfluencePage model containing the page content and metadata, or None if not found
        """
        try:
            # Directly try to find the page by title
            page = self.confluence.get_page_by_title(
                space=space_key, title=title, expand="body.storage,version"
            )

            if not page:
                logger.warning(
                    f"Page '{title}' not found in space '{space_key}'. "
                    f"The space may be invalid, the page may not exist, or permissions may be insufficient."
                )
                return None

            return self._build_page_model(
                page, space_key, convert_to_markdown=convert_to_markdown
            )

        except KeyError as e:
            logger.error(f"Missing key in page data: {str(e)}")
            return None
        except requests.RequestException as e:
            logger.error(f"Network error when fetching page: {str(e)}")
            return None
        except (ValueError, TypeError) as e:
            logger.error(f"Error processing page data: {str(e)}")
            return None
        except Exception as e:  # noqa: BLE001 - Intentional fallback with full logging
            logger.error(f"Unexpected error fetching page: {str(e)}")
            # Log the full traceback at debug level for troubleshooting
            logger.debug("Full exception details:", exc_info=True)
            return None

    def get_space_pages(
        self,
        space_key: str,
        start: int = 0,
        limit: int = 10,
        *,
        convert_to_markdown: bool = True,
    ) -> list[ConfluencePage]:
        """
        Get all pages from a specific space.

        Args:
            space_key: The key of the space to get pages from
            start: The starting index for pagination
            limit: Maximum number of pages to return
            convert_to_markdown: When True, returns content in markdown format,
                otherwise returns raw HTML (keyword-only)

        Returns:
            List of ConfluencePage models containing page content and metadata
        """
        pages = self.confluence.get_all_pages_from_space(
            space=space_key, start=start, limit=limit, expand="body.storage"
        )

        page_models = []
        for page in pages:
            # Ensure space information is included before building the model
            if "space" not in page:
                page["space"] = {
                    "key": space_key,
                    "name": space_key,  # Use space_key as name if not available
                }

            page_models.append(
                self._build_page_model(
                    page, space_key, convert_to_markdown=convert_to_markdown
                )
            )

        return page_models

    def create_page(
        self,
        space_key: str,
        title: str,
        body: str,
        parent_id: str | None = None,
        *,
        is_markdown: bool = True,
        enable_heading_anchors: bool = False,
        content_representation: str | None = None,
    ) -> ConfluencePage:
        """
        Create a new page in a Confluence space.

        Args:
            space_key: The key of the space to create the page in
            title: The title of the new page
            body: The content of the page (markdown, wiki markup, or storage format)
            parent_id: Optional ID of a parent page
            is_markdown: Whether the body content is in markdown format (default: True, keyword-only)
            enable_heading_anchors: Whether to enable automatic heading anchor generation (default: False, keyword-only)
            content_representation: Content format when is_markdown=False ('wiki' or 'storage', keyword-only)

        Returns:
            ConfluencePage model containing the new page's data

        Raises:
            Exception: If there is an error creating the page
        """
        try:
            final_body, representation = self._prepare_body(
                body,
                is_markdown=is_markdown,
                content_representation=content_representation,
                enable_heading_anchors=enable_heading_anchors,
            )

            # Use v2 API for OAuth authentication, v1 API for token/basic auth
            v2_adapter = self._v2_adapter
            if v2_adapter:
                logger.debug(
                    f"Using v2 API for OAuth authentication to create page '{title}'"
                )
                result = v2_adapter.create_page(
                    space_key=space_key,
                    title=title,
                    body=final_body,
                    parent_id=parent_id,
                    representation=representation,
                )
            else:
                logger.debug(
                    f"Using v1 API for token/basic authentication to create page '{title}'"
                )
                result = self.confluence.create_page(
                    space=space_key,
                    title=title,
                    body=final_body,
                    parent_id=parent_id,
                    representation=representation,
                )

            # The create response may be sparse; re-fetch for the full model
            page_id = result.get("id")
            if not page_id:
                raise ValueError("Create page response did not contain an ID")

            return self.get_page_content(page_id)
        except Exception as e:
            logger.error(
                f"Error creating page '{title}' in space {space_key}: {str(e)}"
            )
            raise Exception(
                f"Failed to create page '{title}' in space {space_key}: {str(e)}"
            ) from e

    def update_page(
        self,
        page_id: str,
        title: str,
        body: str,
        *,
        is_minor_edit: bool = False,
        version_comment: str = "",
        is_markdown: bool = True,
        parent_id: str | None = None,
        enable_heading_anchors: bool = False,
        content_representation: str | None = None,
    ) -> ConfluencePage:
        """
        Update an existing page in Confluence.

        Args:
            page_id: The ID of the page to update
            title: The new title of the page
            body: The new content of the page (markdown, wiki markup, or storage format)
            is_minor_edit: Whether this is a minor edit (keyword-only)
            version_comment: Optional comment for this version (keyword-only)
            is_markdown: Whether the body content is in markdown format (default: True, keyword-only)
            parent_id: Optional new parent page ID (keyword-only)
            enable_heading_anchors: Whether to enable automatic heading anchor generation (default: False, keyword-only)
            content_representation: Content format when is_markdown=False ('wiki' or 'storage', keyword-only)

        Returns:
            ConfluencePage model containing the updated page's data

        Raises:
            Exception: If there is an error updating the page
        """
        try:
            final_body, representation = self._prepare_body(
                body,
                is_markdown=is_markdown,
                content_representation=content_representation,
                enable_heading_anchors=enable_heading_anchors,
            )

            logger.debug(f"Updating page {page_id} with title '{title}'")

            # Use v2 API for OAuth authentication, v1 API for token/basic auth
            v2_adapter = self._v2_adapter
            if v2_adapter:
                logger.debug(
                    f"Using v2 API for OAuth authentication to update page '{page_id}'"
                )
                # NOTE(review): the v2 path does not forward parent_id or
                # is_minor_edit — confirm whether the v2 adapter supports them.
                response = v2_adapter.update_page(
                    page_id=page_id,
                    title=title,
                    body=final_body,
                    representation=representation,
                    version_comment=version_comment,
                )
            else:
                logger.debug(
                    f"Using v1 API for token/basic authentication to update page '{page_id}'"
                )
                update_kwargs = {
                    "page_id": page_id,
                    "title": title,
                    "body": final_body,
                    "type": "page",
                    "representation": representation,
                    "minor_edit": is_minor_edit,
                    "version_comment": version_comment,
                    "always_update": True,
                }
                if parent_id:
                    update_kwargs["parent_id"] = parent_id

                self.confluence.update_page(**update_kwargs)

            # After update, refresh the page data
            return self.get_page_content(page_id)
        except Exception as e:
            logger.error(f"Error updating page {page_id}: {str(e)}")
            raise Exception(f"Failed to update page {page_id}: {str(e)}") from e

    def get_page_children(
        self,
        page_id: str,
        start: int = 0,
        limit: int = 25,
        expand: str = "version",
        *,
        convert_to_markdown: bool = True,
    ) -> list[ConfluencePage]:
        """
        Get child pages of a specific Confluence page.

        Args:
            page_id: The ID of the parent page
            start: The starting index for pagination
            limit: Maximum number of child pages to return
            expand: Fields to expand in the response
            convert_to_markdown: When True, returns content in markdown format,
                otherwise returns raw HTML (keyword-only)

        Returns:
            List of ConfluencePage models containing the child pages
            (empty list on any error)
        """
        try:
            # Use the Atlassian Python API's get_page_child_by_type method
            results = self.confluence.get_page_child_by_type(
                page_id=page_id, type="page", start=start, limit=limit, expand=expand
            )

            # Handle both pagination modes: dict-with-results or a plain list
            if isinstance(results, dict) and "results" in results:
                child_pages = results.get("results", [])
            else:
                child_pages = results or []

            # Get space key from the first result if available
            space_key = ""
            if child_pages and "space" in child_pages[0]:
                space_key = child_pages[0].get("space", {}).get("key", "")

            page_models = []
            for page in child_pages:
                # Only process content if the "body" field was expanded
                content_override = None
                if "body" in page and convert_to_markdown:
                    content = page.get("body", {}).get("storage", {}).get("value", "")
                    if content:
                        _, processed_markdown = self.preprocessor.process_html_content(
                            content,
                            space_key=space_key,
                            confluence_client=self.confluence,
                        )
                        content_override = processed_markdown

                page_models.append(
                    ConfluencePage.from_api_response(
                        page,
                        base_url=self.config.url,
                        include_body=True,
                        content_override=content_override,
                        content_format="markdown" if convert_to_markdown else "storage",
                    )
                )

            return page_models

        except Exception as e:
            # Degrade gracefully: callers get an empty list instead of an error
            logger.error(f"Error fetching child pages for page {page_id}: {str(e)}")
            logger.debug("Full exception details:", exc_info=True)
            return []

    def delete_page(self, page_id: str) -> bool:
        """
        Delete a Confluence page by its ID.

        Args:
            page_id: The ID of the page to delete

        Returns:
            Boolean indicating success (True) or failure (False)

        Raises:
            Exception: If there is an error deleting the page
        """
        try:
            logger.debug(f"Deleting page {page_id}")

            # Use v2 API for OAuth authentication, v1 API for token/basic auth
            v2_adapter = self._v2_adapter
            if v2_adapter:
                logger.debug(
                    f"Using v2 API for OAuth authentication to delete page '{page_id}'"
                )
                return v2_adapter.delete_page(page_id=page_id)

            logger.debug(
                f"Using v1 API for token/basic authentication to delete page '{page_id}'"
            )
            response = self.confluence.remove_page(page_id=page_id)

            # remove_page returns the raw REST response; a successful DELETE
            # is typically HTTP 200 or 204 (No Content).
            if isinstance(response, requests.Response):
                success = 200 <= response.status_code < 300
                logger.debug(
                    f"Delete page {page_id} returned status code {response.status_code}"
                )
                return success

            # Non-Response results (True, None, ...): no exception was raised,
            # so treat the deletion as successful rather than guessing failure.
            return True

        except Exception as e:
            logger.error(f"Error deleting page {page_id}: {str(e)}")
            raise Exception(f"Failed to delete page {page_id}: {str(e)}") from e
572 |
```