# sooperset/mcp-atlassian
tokens: 46491/50000 8/194 files (page 7/13)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 7 of 13. Use http://codebase.md/sooperset/mcp-atlassian?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .devcontainer
│   ├── devcontainer.json
│   ├── Dockerfile
│   ├── post-create.sh
│   └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   └── feature_request.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── docker-publish.yml
│       ├── lint.yml
│       ├── publish.yml
│       ├── stale.yml
│       └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│   ├── oauth_authorize.py
│   └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│   └── mcp_atlassian
│       ├── __init__.py
│       ├── confluence
│       │   ├── __init__.py
│       │   ├── client.py
│       │   ├── comments.py
│       │   ├── config.py
│       │   ├── constants.py
│       │   ├── labels.py
│       │   ├── pages.py
│       │   ├── search.py
│       │   ├── spaces.py
│       │   ├── users.py
│       │   ├── utils.py
│       │   └── v2_adapter.py
│       ├── exceptions.py
│       ├── jira
│       │   ├── __init__.py
│       │   ├── attachments.py
│       │   ├── boards.py
│       │   ├── client.py
│       │   ├── comments.py
│       │   ├── config.py
│       │   ├── constants.py
│       │   ├── epics.py
│       │   ├── fields.py
│       │   ├── formatting.py
│       │   ├── issues.py
│       │   ├── links.py
│       │   ├── projects.py
│       │   ├── protocols.py
│       │   ├── search.py
│       │   ├── sprints.py
│       │   ├── transitions.py
│       │   ├── users.py
│       │   └── worklog.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── confluence
│       │   │   ├── __init__.py
│       │   │   ├── comment.py
│       │   │   ├── common.py
│       │   │   ├── label.py
│       │   │   ├── page.py
│       │   │   ├── search.py
│       │   │   ├── space.py
│       │   │   └── user_search.py
│       │   ├── constants.py
│       │   └── jira
│       │       ├── __init__.py
│       │       ├── agile.py
│       │       ├── comment.py
│       │       ├── common.py
│       │       ├── issue.py
│       │       ├── link.py
│       │       ├── project.py
│       │       ├── search.py
│       │       ├── version.py
│       │       ├── workflow.py
│       │       └── worklog.py
│       ├── preprocessing
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── confluence.py
│       │   └── jira.py
│       ├── servers
│       │   ├── __init__.py
│       │   ├── confluence.py
│       │   ├── context.py
│       │   ├── dependencies.py
│       │   ├── jira.py
│       │   └── main.py
│       └── utils
│           ├── __init__.py
│           ├── date.py
│           ├── decorators.py
│           ├── env.py
│           ├── environment.py
│           ├── io.py
│           ├── lifecycle.py
│           ├── logging.py
│           ├── oauth_setup.py
│           ├── oauth.py
│           ├── ssl.py
│           ├── tools.py
│           └── urls.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── fixtures
│   │   ├── __init__.py
│   │   ├── confluence_mocks.py
│   │   └── jira_mocks.py
│   ├── integration
│   │   ├── conftest.py
│   │   ├── README.md
│   │   ├── test_authentication.py
│   │   ├── test_content_processing.py
│   │   ├── test_cross_service.py
│   │   ├── test_mcp_protocol.py
│   │   ├── test_proxy.py
│   │   ├── test_real_api.py
│   │   ├── test_ssl_verification.py
│   │   ├── test_stdin_monitoring_fix.py
│   │   └── test_transport_lifecycle.py
│   ├── README.md
│   ├── test_preprocessing.py
│   ├── test_real_api_validation.py
│   ├── unit
│   │   ├── confluence
│   │   │   ├── __init__.py
│   │   │   ├── conftest.py
│   │   │   ├── test_client_oauth.py
│   │   │   ├── test_client.py
│   │   │   ├── test_comments.py
│   │   │   ├── test_config.py
│   │   │   ├── test_constants.py
│   │   │   ├── test_custom_headers.py
│   │   │   ├── test_labels.py
│   │   │   ├── test_pages.py
│   │   │   ├── test_search.py
│   │   │   ├── test_spaces.py
│   │   │   ├── test_users.py
│   │   │   ├── test_utils.py
│   │   │   └── test_v2_adapter.py
│   │   ├── jira
│   │   │   ├── conftest.py
│   │   │   ├── test_attachments.py
│   │   │   ├── test_boards.py
│   │   │   ├── test_client_oauth.py
│   │   │   ├── test_client.py
│   │   │   ├── test_comments.py
│   │   │   ├── test_config.py
│   │   │   ├── test_constants.py
│   │   │   ├── test_custom_headers.py
│   │   │   ├── test_epics.py
│   │   │   ├── test_fields.py
│   │   │   ├── test_formatting.py
│   │   │   ├── test_issues_markdown.py
│   │   │   ├── test_issues.py
│   │   │   ├── test_links.py
│   │   │   ├── test_projects.py
│   │   │   ├── test_protocols.py
│   │   │   ├── test_search.py
│   │   │   ├── test_sprints.py
│   │   │   ├── test_transitions.py
│   │   │   ├── test_users.py
│   │   │   └── test_worklog.py
│   │   ├── models
│   │   │   ├── __init__.py
│   │   │   ├── conftest.py
│   │   │   ├── test_base_models.py
│   │   │   ├── test_confluence_models.py
│   │   │   ├── test_constants.py
│   │   │   └── test_jira_models.py
│   │   ├── servers
│   │   │   ├── __init__.py
│   │   │   ├── test_confluence_server.py
│   │   │   ├── test_context.py
│   │   │   ├── test_dependencies.py
│   │   │   ├── test_jira_server.py
│   │   │   └── test_main_server.py
│   │   ├── test_exceptions.py
│   │   ├── test_main_transport_selection.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── test_custom_headers.py
│   │       ├── test_date.py
│   │       ├── test_decorators.py
│   │       ├── test_env.py
│   │       ├── test_environment.py
│   │       ├── test_io.py
│   │       ├── test_lifecycle.py
│   │       ├── test_logging.py
│   │       ├── test_masking.py
│   │       ├── test_oauth_setup.py
│   │       ├── test_oauth.py
│   │       ├── test_ssl.py
│   │       ├── test_tools.py
│   │       └── test_urls.py
│   └── utils
│       ├── __init__.py
│       ├── assertions.py
│       ├── base.py
│       ├── factories.py
│       └── mocks.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/tests/integration/test_real_api.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Integration tests with real Atlassian APIs.
  3 | 
  4 | These tests are skipped by default and only run with --integration --use-real-data flags.
  5 | They require proper environment configuration and will create/modify real data.
  6 | """
  7 | 
  8 | import os
  9 | import time
 10 | import uuid
 11 | 
 12 | import pytest
 13 | 
 14 | from mcp_atlassian.confluence import ConfluenceFetcher
 15 | from mcp_atlassian.confluence.config import ConfluenceConfig
 16 | from mcp_atlassian.jira import JiraFetcher
 17 | from mcp_atlassian.jira.config import JiraConfig
 18 | from tests.utils.base import BaseAuthTest
 19 | 
 20 | 
 21 | @pytest.mark.integration
 22 | class TestRealJiraAPI(BaseAuthTest):
 23 |     """Real Jira API integration tests with cleanup."""
 24 | 
 25 |     @pytest.fixture(autouse=True)
 26 |     def skip_without_real_data(self, request):
 27 |         """Skip these tests unless --use-real-data is provided."""
 28 |         if not request.config.getoption("--use-real-data", default=False):
 29 |             pytest.skip("Real API tests only run with --use-real-data flag")
 30 | 
 31 |     @pytest.fixture
 32 |     def jira_client(self):
 33 |         """Create real Jira client from environment."""
 34 |         if not os.getenv("JIRA_URL"):
 35 |             pytest.skip("JIRA_URL not set in environment")
 36 | 
 37 |         config = JiraConfig.from_env()
 38 |         return JiraFetcher(config=config)
 39 | 
 40 |     @pytest.fixture
 41 |     def test_project_key(self):
 42 |         """Get test project key from environment."""
 43 |         key = os.getenv("JIRA_TEST_PROJECT_KEY", "TEST")
 44 |         return key
 45 | 
 46 |     @pytest.fixture
 47 |     def created_issues(self):
 48 |         """Track created issues for cleanup."""
 49 |         issues = []
 50 |         yield issues
 51 |         # Cleanup will be done in individual tests
 52 | 
 53 |     def test_complete_issue_lifecycle(
 54 |         self, jira_client, test_project_key, created_issues
 55 |     ):
 56 |         """Test create, update, transition, and delete issue lifecycle."""
 57 |         # Create unique summary to avoid conflicts
 58 |         unique_id = str(uuid.uuid4())[:8]
 59 |         summary = f"Integration Test Issue {unique_id}"
 60 | 
 61 |         # 1. Create issue
 62 |         issue_data = {
 63 |             "project": {"key": test_project_key},
 64 |             "summary": summary,
 65 |             "description": "This is an integration test issue that will be deleted",
 66 |             "issuetype": {"name": "Task"},
 67 |         }
 68 | 
 69 |         created_issue = jira_client.create_issue(**issue_data)
 70 |         created_issues.append(created_issue.key)
 71 | 
 72 |         assert created_issue.key.startswith(test_project_key)
 73 |         assert created_issue.fields.summary == summary
 74 | 
 75 |         # 2. Update issue
 76 |         update_data = {
 77 |             "summary": f"{summary} - Updated",
 78 |             "description": "Updated description",
 79 |         }
 80 | 
 81 |         updated_issue = jira_client.update_issue(
 82 |             issue_key=created_issue.key, **update_data
 83 |         )
 84 | 
 85 |         assert updated_issue.fields.summary == f"{summary} - Updated"
 86 | 
 87 |         # 3. Add comment
 88 |         comment = jira_client.add_comment(
 89 |             issue_key=created_issue.key, body="Test comment from integration test"
 90 |         )
 91 | 
 92 |         assert comment.body == "Test comment from integration test"
 93 | 
 94 |         # 4. Get available transitions
 95 |         transitions = jira_client.get_transitions(issue_key=created_issue.key)
 96 |         assert len(transitions) > 0
 97 | 
 98 |         # 5. Transition issue (if "Done" transition available)
 99 |         done_transition = next(
100 |             (t for t in transitions if "done" in t.name.lower()), None
101 |         )
102 |         if done_transition:
103 |             jira_client.transition_issue(
104 |                 issue_key=created_issue.key, transition_id=done_transition.id
105 |             )
106 | 
107 |         # 6. Delete issue
108 |         jira_client.delete_issue(issue_key=created_issue.key)
109 |         created_issues.remove(created_issue.key)
110 | 
111 |         # Verify deletion
112 |         with pytest.raises(Exception):
113 |             jira_client.get_issue(issue_key=created_issue.key)
114 | 
115 |     def test_attachment_upload_download(
116 |         self, jira_client, test_project_key, created_issues, tmp_path
117 |     ):
118 |         """Test attachment upload and retrieval via the issue's attachment list."""
119 |         # Create test issue
120 |         unique_id = str(uuid.uuid4())[:8]
121 |         issue_data = {
122 |             "project": {"key": test_project_key},
123 |             "summary": f"Attachment Test {unique_id}",
124 |             "issuetype": {"name": "Task"},
125 |         }
126 | 
127 |         issue = jira_client.create_issue(**issue_data)
128 |         created_issues.append(issue.key)
129 | 
130 |         try:
131 |             # Create test file
132 |             test_file = tmp_path / "test_attachment.txt"
133 |             test_content = f"Test content {unique_id}"
134 |             test_file.write_text(test_content)
135 | 
136 |             # Upload attachment
137 |             with open(test_file, "rb") as f:
138 |                 attachments = jira_client.add_attachment(
139 |                     issue_key=issue.key, filename="test_attachment.txt", data=f.read()
140 |                 )
141 | 
142 |             assert len(attachments) == 1
143 |             attachment = attachments[0]
144 |             assert attachment.filename == "test_attachment.txt"
145 | 
146 |             # Get issue with attachments
147 |             issue_with_attachments = jira_client.get_issue(
148 |                 issue_key=issue.key, expand="attachment"
149 |             )
150 | 
151 |             assert len(issue_with_attachments.fields.attachment) == 1
152 | 
153 |         finally:
154 |             # Cleanup
155 |             jira_client.delete_issue(issue_key=issue.key)
156 |             created_issues.remove(issue.key)
157 | 
158 |     def test_jql_search_with_pagination(self, jira_client, test_project_key):
159 |         """Test JQL search with pagination."""
160 |         # Search for recent issues in test project
161 |         jql = f"project = {test_project_key} ORDER BY created DESC"
162 | 
163 |         # First page
164 |         results_page1 = jira_client.search_issues(jql=jql, start_at=0, max_results=2)
165 | 
166 |         assert results_page1.total >= 0
167 | 
168 |         if results_page1.total > 2:
169 |             # Second page
170 |             results_page2 = jira_client.search_issues(
171 |                 jql=jql, start_at=2, max_results=2
172 |             )
173 | 
174 |             # Ensure different issues
175 |             page1_keys = [i.key for i in results_page1.issues]
176 |             page2_keys = [i.key for i in results_page2.issues]
177 |             assert not set(page1_keys).intersection(set(page2_keys))
178 | 
179 |     def test_bulk_issue_creation(self, jira_client, test_project_key, created_issues):
180 |         """Test creating multiple issues in bulk."""
181 |         unique_id = str(uuid.uuid4())[:8]
182 |         issues_data = []
183 | 
184 |         # Prepare 3 issues
185 |         for i in range(3):
186 |             issues_data.append(
187 |                 {
188 |                     "project": {"key": test_project_key},
189 |                     "summary": f"Bulk Test Issue {i + 1} - {unique_id}",
190 |                     "issuetype": {"name": "Task"},
191 |                 }
192 |             )
193 | 
194 |         # Create issues
195 |         created = []
196 |         try:
197 |             for issue_data in issues_data:
198 |                 issue = jira_client.create_issue(**issue_data)
199 |                 created.append(issue)
200 |                 created_issues.append(issue.key)
201 | 
202 |             assert len(created) == 3
203 | 
204 |             # Verify all created
205 |             for i, issue in enumerate(created):
206 |                 assert f"Bulk Test Issue {i + 1}" in issue.fields.summary
207 | 
208 |         finally:
209 |             # Cleanup all created issues
210 |             for issue in created:
211 |                 try:
212 |                     jira_client.delete_issue(issue_key=issue.key)
213 |                     created_issues.remove(issue.key)
214 |                 except Exception:
215 |                     pass
216 | 
217 |     def test_rate_limiting_behavior(self, jira_client):
218 |         """Test that rapid consecutive requests either finish quickly or hit a rate limit."""
219 |         # Make multiple rapid requests
220 |         start_time = time.time()
221 | 
222 |         for _i in range(5):
223 |             try:
224 |                 jira_client.get_fields()
225 |             except Exception as e:
226 |                 if "429" in str(e) or "rate limit" in str(e).lower():
227 |                     # Rate limit hit - this is expected
228 |                     assert True
229 |                     return
230 | 
231 |         # If no rate limit hit, that's also fine
232 |         elapsed = time.time() - start_time
233 |         assert elapsed < 10  # Should complete quickly if no rate limiting
234 | 
235 | 
236 | @pytest.mark.integration
237 | class TestRealConfluenceAPI(BaseAuthTest):
238 |     """Real Confluence API integration tests with cleanup."""
239 | 
240 |     @pytest.fixture(autouse=True)
241 |     def skip_without_real_data(self, request):
242 |         """Skip these tests unless --use-real-data is provided."""
243 |         if not request.config.getoption("--use-real-data", default=False):
244 |             pytest.skip("Real API tests only run with --use-real-data flag")
245 | 
246 |     @pytest.fixture
247 |     def confluence_client(self):
248 |         """Create real Confluence client from environment."""
249 |         if not os.getenv("CONFLUENCE_URL"):
250 |             pytest.skip("CONFLUENCE_URL not set in environment")
251 | 
252 |         config = ConfluenceConfig.from_env()
253 |         return ConfluenceFetcher(config=config)
254 | 
255 |     @pytest.fixture
256 |     def test_space_key(self):
257 |         """Get test space key from environment."""
258 |         key = os.getenv("CONFLUENCE_TEST_SPACE_KEY", "TEST")
259 |         return key
260 | 
261 |     @pytest.fixture
262 |     def created_pages(self):
263 |         """Track created pages for cleanup."""
264 |         pages = []
265 |         yield pages
266 |         # Cleanup will be done in individual tests
267 | 
268 |     def test_page_lifecycle(self, confluence_client, test_space_key, created_pages):
269 |         """Test create, update, and delete page lifecycle."""
270 |         unique_id = str(uuid.uuid4())[:8]
271 |         title = f"Integration Test Page {unique_id}"
272 | 
273 |         # 1. Create page
274 |         page = confluence_client.create_page(
275 |             space_key=test_space_key,
276 |             title=title,
277 |             body="<p>This is an integration test page</p>",
278 |         )
279 |         created_pages.append(page.id)
280 | 
281 |         assert page.title == title
282 |         assert page.space.key == test_space_key
283 | 
284 |         # 2. Update page
285 |         updated_page = confluence_client.update_page(
286 |             page_id=page.id,
287 |             title=f"{title} - Updated",
288 |             body="<p>Updated content</p>",
289 |             version_number=page.version.number + 1,
290 |         )
291 | 
292 |         assert updated_page.title == f"{title} - Updated"
293 |         assert updated_page.version.number == page.version.number + 1
294 | 
295 |         # 3. Add comment
296 |         comment = confluence_client.add_comment(
297 |             page_id=page.id, body="Test comment from integration test"
298 |         )
299 | 
300 |         assert "Test comment" in comment.body.storage.value
301 | 
302 |         # 4. Delete page
303 |         confluence_client.delete_page(page_id=page.id)
304 |         created_pages.remove(page.id)
305 | 
306 |         # Verify deletion
307 |         with pytest.raises(Exception):
308 |             confluence_client.get_page_by_id(page_id=page.id)
309 | 
310 |     def test_page_hierarchy(self, confluence_client, test_space_key, created_pages):
311 |         """Test creating page hierarchy with parent-child relationships."""
312 |         unique_id = str(uuid.uuid4())[:8]
313 | 
314 |         # Create parent page
315 |         parent = confluence_client.create_page(
316 |             space_key=test_space_key,
317 |             title=f"Parent Page {unique_id}",
318 |             body="<p>Parent content</p>",
319 |         )
320 |         created_pages.append(parent.id)
321 | 
322 |         try:
323 |             # Create child page
324 |             child = confluence_client.create_page(
325 |                 space_key=test_space_key,
326 |                 title=f"Child Page {unique_id}",
327 |                 body="<p>Child content</p>",
328 |                 parent_id=parent.id,
329 |             )
330 |             created_pages.append(child.id)
331 | 
332 |             # Get child pages
333 |             children = confluence_client.get_page_children(
334 |                 page_id=parent.id, expand="body.storage"
335 |             )
336 | 
337 |             assert len(children.results) == 1
338 |             assert children.results[0].id == child.id
339 | 
340 |             # Delete child first, then parent
341 |             confluence_client.delete_page(page_id=child.id)
342 |             created_pages.remove(child.id)
343 | 
344 |         finally:
345 |             # Cleanup parent
346 |             confluence_client.delete_page(page_id=parent.id)
347 |             created_pages.remove(parent.id)
348 | 
349 |     def test_cql_search(self, confluence_client, test_space_key):
350 |         """Test CQL search functionality."""
351 |         # Search for pages in test space
352 |         cql = f'space = "{test_space_key}" and type = "page"'
353 | 
354 |         results = confluence_client.search_content(cql=cql, limit=5)
355 | 
356 |         assert results.size >= 0
357 | 
358 |         # Verify all results are from test space
359 |         for result in results.results:
360 |             if hasattr(result, "space"):
361 |                 assert result.space.key == test_space_key
362 | 
363 |     def test_attachment_handling(
364 |         self, confluence_client, test_space_key, created_pages, tmp_path
365 |     ):
366 |         """Test attachment upload to Confluence page."""
367 |         unique_id = str(uuid.uuid4())[:8]
368 | 
369 |         # Create page
370 |         page = confluence_client.create_page(
371 |             space_key=test_space_key,
372 |             title=f"Attachment Test Page {unique_id}",
373 |             body="<p>Page with attachments</p>",
374 |         )
375 |         created_pages.append(page.id)
376 | 
377 |         try:
378 |             # Create test file
379 |             test_file = tmp_path / "confluence_test.txt"
380 |             test_content = f"Confluence test content {unique_id}"
381 |             test_file.write_text(test_content)
382 | 
383 |             # Upload attachment
384 |             with open(test_file, "rb") as f:
385 |                 attachment = confluence_client.create_attachment(
386 |                     page_id=page.id, filename="confluence_test.txt", data=f.read()
387 |                 )
388 | 
389 |             assert attachment.title == "confluence_test.txt"
390 | 
391 |             # Get page attachments
392 |             attachments = confluence_client.get_attachments(page_id=page.id)
393 |             assert len(attachments.results) == 1
394 |             assert attachments.results[0].title == "confluence_test.txt"
395 | 
396 |         finally:
397 |             # Cleanup
398 |             confluence_client.delete_page(page_id=page.id)
399 |             created_pages.remove(page.id)
400 | 
401 |     def test_large_content_handling(
402 |         self, confluence_client, test_space_key, created_pages
403 |     ):
404 |         """Test handling of large content (roughly 200 KB)."""
405 |         unique_id = str(uuid.uuid4())[:8]
406 | 
407 |         # Create large content (21 characters x 10,000 repeats, approximately 210 KB)
408 |         large_content = "<p>" + ("Large content block. " * 10000) + "</p>"
409 | 
410 |         # Create page with large content
411 |         page = confluence_client.create_page(
412 |             space_key=test_space_key,
413 |             title=f"Large Content Test {unique_id}",
414 |             body=large_content,
415 |         )
416 |         created_pages.append(page.id)
417 | 
418 |         try:
419 |             # Retrieve and verify
420 |             retrieved = confluence_client.get_page_by_id(
421 |                 page_id=page.id, expand="body.storage"
422 |             )
423 | 
424 |             assert len(retrieved.body.storage.value) > 100000  # At least 100KB
425 | 
426 |         finally:
427 |             # Cleanup
428 |             confluence_client.delete_page(page_id=page.id)
429 |             created_pages.remove(page.id)
430 | 
431 | 
432 | @pytest.mark.integration
433 | class TestCrossServiceIntegration:
434 |     """Test integration between Jira and Confluence services."""
435 | 
436 |     @pytest.fixture(autouse=True)
437 |     def skip_without_real_data(self, request):
438 |         """Skip these tests unless --use-real-data is provided."""
439 |         if not request.config.getoption("--use-real-data", default=False):
440 |             pytest.skip("Real API tests only run with --use-real-data flag")
441 | 
442 |     @pytest.fixture
443 |     def jira_client(self):
444 |         """Create real Jira client from environment."""
445 |         if not os.getenv("JIRA_URL"):
446 |             pytest.skip("JIRA_URL not set in environment")
447 | 
448 |         config = JiraConfig.from_env()
449 |         return JiraFetcher(config=config)
450 | 
451 |     @pytest.fixture
452 |     def confluence_client(self):
453 |         """Create real Confluence client from environment."""
454 |         if not os.getenv("CONFLUENCE_URL"):
455 |             pytest.skip("CONFLUENCE_URL not set in environment")
456 | 
457 |         config = ConfluenceConfig.from_env()
458 |         return ConfluenceFetcher(config=config)
459 | 
460 |     @pytest.fixture
461 |     def test_project_key(self):
462 |         """Get test project key from environment."""
463 |         return os.getenv("JIRA_TEST_PROJECT_KEY", "TEST")
464 | 
465 |     @pytest.fixture
466 |     def test_space_key(self):
467 |         """Get test space key from environment."""
468 |         return os.getenv("CONFLUENCE_TEST_SPACE_KEY", "TEST")
469 | 
470 |     @pytest.fixture
471 |     def created_issues(self):
472 |         """Track created issues for cleanup."""
473 |         issues = []
474 |         yield issues
475 | 
476 |     @pytest.fixture
477 |     def created_pages(self):
478 |         """Track created pages for cleanup."""
479 |         pages = []
480 |         yield pages
481 | 
482 |     def test_jira_confluence_linking(
483 |         self,
484 |         jira_client,
485 |         confluence_client,
486 |         test_project_key,
487 |         test_space_key,
488 |         created_issues,
489 |         created_pages,
490 |     ):
491 |         """Test linking between Jira issues and Confluence pages."""
492 |         unique_id = str(uuid.uuid4())[:8]
493 | 
494 |         # Create Jira issue
495 |         issue = jira_client.create_issue(
496 |             project={"key": test_project_key},
497 |             summary=f"Linked Issue {unique_id}",
498 |             issuetype={"name": "Task"},
499 |         )
500 |         created_issues.append(issue.key)
501 | 
502 |         # Create Confluence page with Jira issue link
503 |         page_content = f'<p>Related to Jira issue: <a href="{jira_client.config.url}/browse/{issue.key}">{issue.key}</a></p>'
504 | 
505 |         page = confluence_client.create_page(
506 |             space_key=test_space_key,
507 |             title=f"Linked Page {unique_id}",
508 |             body=page_content,
509 |         )
510 |         created_pages.append(page.id)
511 | 
512 |         try:
513 |             # Add comment in Jira referencing Confluence page
514 |             confluence_url = (
515 |                 f"{confluence_client.config.url}/pages/viewpage.action?pageId={page.id}"
516 |             )
517 |             jira_client.add_comment(
518 |                 issue_key=issue.key,
519 |                 body=f"Documentation available at: {confluence_url}",
520 |             )
521 | 
522 |             # Verify both exist and contain cross-references
523 |             issue_comments = jira_client.get_comments(issue_key=issue.key)
524 |             assert any(confluence_url in c.body for c in issue_comments.comments)
525 | 
526 |             retrieved_page = confluence_client.get_page_by_id(
527 |                 page_id=page.id, expand="body.storage"
528 |             )
529 |             assert issue.key in retrieved_page.body.storage.value
530 | 
531 |         finally:
532 |             # Cleanup
533 |             jira_client.delete_issue(issue_key=issue.key)
534 |             created_issues.remove(issue.key)
535 |             confluence_client.delete_page(page_id=page.id)
536 |             created_pages.remove(page.id)
537 | 
```

--------------------------------------------------------------------------------
/tests/unit/servers/test_confluence_server.py:
--------------------------------------------------------------------------------

```python
  1 | """Unit tests for the Confluence FastMCP server."""
  2 | 
  3 | import json
  4 | import logging
  5 | from collections.abc import AsyncGenerator
  6 | from contextlib import asynccontextmanager
  7 | from unittest.mock import AsyncMock, MagicMock, patch
  8 | 
  9 | import pytest
 10 | from fastmcp import Client, FastMCP
 11 | from fastmcp.client import FastMCPTransport
 12 | from starlette.requests import Request
 13 | 
 14 | from src.mcp_atlassian.confluence import ConfluenceFetcher
 15 | from src.mcp_atlassian.confluence.config import ConfluenceConfig
 16 | from src.mcp_atlassian.models.confluence.page import ConfluencePage
 17 | from src.mcp_atlassian.servers.context import MainAppContext
 18 | from src.mcp_atlassian.servers.main import AtlassianMCP
 19 | from src.mcp_atlassian.utils.oauth import OAuthConfig
 20 | 
 21 | logger = logging.getLogger(__name__)
 22 | 
 23 | 
 24 | @pytest.fixture
 25 | def mock_confluence_fetcher():
 26 |     """Create a mocked ConfluenceFetcher instance for testing."""
 27 |     mock_fetcher = MagicMock(spec=ConfluenceFetcher)
 28 | 
 29 |     # Mock page for various methods
 30 |     mock_page = MagicMock(spec=ConfluencePage)
 31 |     mock_page.to_simplified_dict.return_value = {
 32 |         "id": "123456",
 33 |         "title": "Test Page Mock Title",
 34 |         "url": "https://example.atlassian.net/wiki/spaces/TEST/pages/123456/Test+Page",
 35 |         "content": {
 36 |             "value": "This is a test page content in Markdown",
 37 |             "format": "markdown",
 38 |         },
 39 |     }
 40 |     mock_page.content = "This is a test page content in Markdown"
 41 | 
 42 |     # Set up mock responses for each method
 43 |     mock_fetcher.search.return_value = [mock_page]
 44 |     mock_fetcher.get_page_content.return_value = mock_page
 45 |     mock_fetcher.get_page_children.return_value = [mock_page]
 46 |     mock_fetcher.create_page.return_value = mock_page
 47 |     mock_fetcher.update_page.return_value = mock_page
 48 |     mock_fetcher.delete_page.return_value = True
 49 | 
 50 |     # Mock comment
 51 |     mock_comment = MagicMock()
 52 |     mock_comment.to_simplified_dict.return_value = {
 53 |         "id": "789",
 54 |         "author": "Test User",
 55 |         "created": "2023-08-01T12:00:00.000Z",
 56 |         "body": "This is a test comment",
 57 |     }
 58 |     mock_fetcher.get_page_comments.return_value = [mock_comment]
 59 | 
 60 |     # Mock label
 61 |     mock_label = MagicMock()
 62 |     mock_label.to_simplified_dict.return_value = {"id": "lbl1", "name": "test-label"}
 63 |     mock_fetcher.get_page_labels.return_value = [mock_label]
 64 |     mock_fetcher.add_page_label.return_value = [mock_label]
 65 | 
 66 |     # Mock add_comment method
 67 |     mock_comment = MagicMock()
 68 |     mock_comment.to_simplified_dict.return_value = {
 69 |         "id": "987",
 70 |         "author": "Test User",
 71 |         "created": "2023-08-01T13:00:00.000Z",
 72 |         "body": "This is a test comment added via API",
 73 |     }
 74 |     mock_fetcher.add_comment.return_value = mock_comment
 75 | 
 76 |     # Mock search_user method
 77 |     mock_user_search_result = MagicMock()
 78 |     mock_user_search_result.to_simplified_dict.return_value = {
 79 |         "entity_type": "user",
 80 |         "title": "First Last",
 81 |         "score": 0.0,
 82 |         "user": {
 83 |             "account_id": "a031248587011jasoidf9832jd8j1",
 84 |             "display_name": "First Last",
 85 |             "email": "first.last@example.com",
 86 |             "profile_picture": "/wiki/aa-avatar/a031248587011jasoidf9832jd8j1",
 87 |             "is_active": True,
 88 |         },
 89 |         "url": "/people/a031248587011jasoidf9832jd8j1",
 90 |         "last_modified": "2025-06-02T13:35:59.680Z",
 91 |         "excerpt": "",
 92 |     }
 93 |     mock_fetcher.search_user.return_value = [mock_user_search_result]
 94 | 
 95 |     return mock_fetcher
 96 | 
 97 | 
 98 | @pytest.fixture
 99 | def mock_base_confluence_config():
100 |     """Create a mock base ConfluenceConfig for MainAppContext using OAuth for multi-user scenario."""
101 |     mock_oauth_config = OAuthConfig(
102 |         client_id="server_client_id",
103 |         client_secret="server_client_secret",
104 |         redirect_uri="http://localhost",
105 |         scope="read:confluence",
106 |         cloud_id="mock_cloud_id",
107 |     )
108 |     return ConfluenceConfig(
109 |         url="https://mock.atlassian.net/wiki",
110 |         auth_type="oauth",
111 |         oauth_config=mock_oauth_config,
112 |     )
113 | 
114 | 
@pytest.fixture
def test_confluence_mcp(mock_confluence_fetcher, mock_base_confluence_config):
    """Build an AtlassianMCP test server with the Confluence tools mounted under "confluence".

    ``mock_confluence_fetcher`` is requested as a fixture dependency but is not
    referenced in this body.
    """

    # Tool callables are imported inside the fixture, mirroring confluence.py
    from src.mcp_atlassian.servers.confluence import (
        add_comment,
        add_label,
        create_page,
        delete_page,
        get_comments,
        get_labels,
        get_page,
        get_page_children,
        search,
        search_user,
        update_page,
    )

    @asynccontextmanager
    async def test_lifespan(app: FastMCP) -> AsyncGenerator[MainAppContext, None]:
        # Nothing to set up or tear down: just expose a writable app context.
        app_context = MainAppContext(
            full_confluence_config=mock_base_confluence_config, read_only=False
        )
        yield app_context

    test_mcp = AtlassianMCP(
        "TestConfluence",
        description="Test Confluence MCP Server",
        lifespan=test_lifespan,
    )

    # Register every tool on a dedicated sub-server, in a fixed order
    confluence_sub_mcp = FastMCP(name="TestConfluenceSubMCP")
    tool_functions = (
        search,
        get_page,
        get_page_children,
        get_comments,
        add_comment,
        get_labels,
        add_label,
        create_page,
        update_page,
        delete_page,
        search_user,
    )
    for tool_fn in tool_functions:
        confluence_sub_mcp.tool()(tool_fn)

    test_mcp.mount("confluence", confluence_sub_mcp)

    return test_mcp
166 | 
167 | 
@pytest.fixture
def no_fetcher_test_confluence_mcp(mock_base_confluence_config):
    """Build an AtlassianMCP test server intended for the missing-Confluence-fetcher scenario."""

    # Tool callables are imported inside the fixture, mirroring confluence.py
    from src.mcp_atlassian.servers.confluence import (
        add_comment,
        add_label,
        create_page,
        delete_page,
        get_comments,
        get_labels,
        get_page,
        get_page_children,
        search,
        search_user,
        update_page,
    )

    @asynccontextmanager
    async def no_fetcher_test_lifespan(
        app: FastMCP,
    ) -> AsyncGenerator[MainAppContext, None]:
        # Nothing to set up or tear down: just expose a writable app context.
        app_context = MainAppContext(
            full_confluence_config=mock_base_confluence_config, read_only=False
        )
        yield app_context

    test_mcp = AtlassianMCP(
        "NoFetcherTestConfluence",
        description="No Fetcher Test Confluence MCP Server",
        lifespan=no_fetcher_test_lifespan,
    )

    # Register every tool on a dedicated sub-server, in a fixed order
    confluence_sub_mcp = FastMCP(name="NoFetcherTestConfluenceSubMCP")
    tool_functions = (
        search,
        get_page,
        get_page_children,
        get_comments,
        add_comment,
        get_labels,
        add_label,
        create_page,
        update_page,
        delete_page,
        search_user,
    )
    for tool_fn in tool_functions:
        confluence_sub_mcp.tool()(tool_fn)

    test_mcp.mount("confluence", confluence_sub_mcp)

    return test_mcp
221 | 
222 | 
@pytest.fixture
def mock_request():
    """Return a MagicMock spec'd as a Starlette Request, with ``state`` set to a MagicMock."""
    # Keyword arguments to MagicMock are applied as attributes on the new mock.
    return MagicMock(spec=Request, state=MagicMock())
229 | 
230 | 
@pytest.fixture
async def client(test_confluence_mcp, mock_confluence_fetcher):
    """Yield a connected FastMCP client while the fetcher and HTTP-request lookups are patched."""
    # Tools resolve their fetcher through this coroutine; hand back the mock instead.
    fetcher_patch = patch(
        "src.mcp_atlassian.servers.confluence.get_confluence_fetcher",
        AsyncMock(return_value=mock_confluence_fetcher),
    )
    request_patch = patch(
        "src.mcp_atlassian.servers.dependencies.get_http_request",
        MagicMock(spec=Request, state=MagicMock()),
    )

    # Both patches stay active for as long as the client is in use.
    with fetcher_patch, request_patch:
        transport = FastMCPTransport(test_confluence_mcp)
        async with Client(transport=transport) as connected_client:
            yield connected_client
247 | 
248 | 
@pytest.fixture
async def no_fetcher_client_fixture(no_fetcher_test_confluence_mcp, mock_request):
    """Yield a connected client for the server built without any fetcher patching.

    ``mock_request`` is requested as a fixture dependency but is not referenced
    in this body.
    """
    transport = FastMCPTransport(no_fetcher_test_confluence_mcp)
    async with Client(transport=transport) as connected:
        yield connected
257 | 
258 | 
@pytest.mark.anyio
async def test_search(client, mock_confluence_fetcher):
    """A plain query reaches the fetcher as a siteSearch CQL clause with default limit/filter."""
    response = await client.call_tool("confluence_search", {"query": "test search"})

    search_mock = mock_confluence_fetcher.search
    search_mock.assert_called_once()
    positional, keyword = search_mock.call_args
    assert 'siteSearch ~ "test search"' in positional[0]
    assert keyword.get("limit") == 10
    assert keyword.get("spaces_filter") is None

    payload = json.loads(response[0].text)
    assert isinstance(payload, list)
    assert len(payload) > 0
    first_hit = payload[0]
    assert first_hit["title"] == "Test Page Mock Title"
274 | 
275 | 
@pytest.mark.anyio
async def test_get_page(client, mock_confluence_fetcher):
    """get_page with defaults converts to markdown and nests the page under "metadata"."""
    response = await client.call_tool("confluence_get_page", {"page_id": "123456"})

    mock_confluence_fetcher.get_page_content.assert_called_once_with(
        "123456", convert_to_markdown=True
    )

    payload = json.loads(response[0].text)
    assert "metadata" in payload
    metadata = payload["metadata"]
    assert metadata["title"] == "Test Page Mock Title"
    assert "content" in metadata
    assert "value" in metadata["content"]
    assert "This is a test page content" in metadata["content"]["value"]
291 | 
292 | 
@pytest.mark.anyio
async def test_get_page_no_metadata(client, mock_confluence_fetcher):
    """With include_metadata=False the response carries "content" and no "metadata" key."""
    tool_args = {"page_id": "123456", "include_metadata": False}
    response = await client.call_tool("confluence_get_page", tool_args)

    mock_confluence_fetcher.get_page_content.assert_called_once_with(
        "123456", convert_to_markdown=True
    )

    payload = json.loads(response[0].text)
    assert "metadata" not in payload
    assert "content" in payload
    assert "This is a test page content" in payload["content"]["value"]
308 | 
309 | 
@pytest.mark.anyio
async def test_get_page_no_markdown(client, mock_confluence_fetcher):
    """With convert_to_markdown=False the flag is forwarded and storage-format content returned."""
    html_body = "<p>HTML Content</p>"
    simplified_page = {
        "id": "123456",
        "title": "Test Page HTML",
        "url": "https://example.com/html",
        "content": "<p>HTML Content</p>",
        "content_format": "storage",
    }

    # Replace the fixture's default page with one carrying raw storage-format HTML
    html_page = MagicMock(spec=ConfluencePage)
    html_page.to_simplified_dict.return_value = simplified_page
    html_page.content = html_body
    html_page.content_format = "storage"
    mock_confluence_fetcher.get_page_content.return_value = html_page

    tool_args = {"page_id": "123456", "convert_to_markdown": False}
    response = await client.call_tool("confluence_get_page", tool_args)

    mock_confluence_fetcher.get_page_content.assert_called_once_with(
        "123456", convert_to_markdown=False
    )

    payload = json.loads(response[0].text)
    assert "metadata" in payload
    metadata = payload["metadata"]
    assert metadata["title"] == "Test Page HTML"
    assert metadata["content"] == "<p>HTML Content</p>"
    assert metadata["content_format"] == "storage"
339 | 
340 | 
@pytest.mark.anyio
async def test_get_page_children(client, mock_confluence_fetcher):
    """get_page_children forwards the parent ID with default paging and expand settings."""
    tool_args = {"parent_id": "123456"}
    response = await client.call_tool("confluence_get_page_children", tool_args)

    children_mock = mock_confluence_fetcher.get_page_children
    children_mock.assert_called_once()
    passed = children_mock.call_args.kwargs
    assert passed["page_id"] == "123456"
    assert passed.get("start") == 0
    assert passed.get("limit") == 25
    assert passed.get("expand") == "version"

    payload = json.loads(response[0].text)
    assert "parent_id" in payload
    assert "results" in payload
    children = payload["results"]
    assert len(children) > 0
    assert children[0]["title"] == "Test Page Mock Title"
360 | 
361 | 
@pytest.mark.anyio
async def test_get_comments(client, mock_confluence_fetcher):
    """get_comments passes the page ID through and returns a non-empty list of comments."""
    tool_args = {"page_id": "123456"}
    response = await client.call_tool("confluence_get_comments", tool_args)

    mock_confluence_fetcher.get_page_comments.assert_called_once_with("123456")

    comments = json.loads(response[0].text)
    assert isinstance(comments, list)
    assert len(comments) > 0
    assert comments[0]["author"] == "Test User"
373 | 
374 | 
@pytest.mark.anyio
async def test_add_comment(client, mock_confluence_fetcher):
    """add_comment forwards page ID and content, and wraps the new comment in a success payload."""
    tool_args = {"page_id": "123456", "content": "Test comment content"}
    response = await client.call_tool("confluence_add_comment", tool_args)

    mock_confluence_fetcher.add_comment.assert_called_once_with(
        page_id="123456", content="Test comment content"
    )

    payload = json.loads(response[0].text)
    assert isinstance(payload, dict)
    assert payload["success"] is True
    assert "comment" in payload
    comment = payload["comment"]
    assert comment["id"] == "987"
    assert comment["author"] == "Test User"
    assert comment["body"] == "This is a test comment added via API"
    assert comment["created"] == "2023-08-01T13:00:00.000Z"
395 | 
396 | 
@pytest.mark.anyio
async def test_get_labels(client, mock_confluence_fetcher):
    """get_labels passes the page ID through and returns the labels as a list."""
    tool_args = {"page_id": "123456"}
    response = await client.call_tool("confluence_get_labels", tool_args)

    mock_confluence_fetcher.get_page_labels.assert_called_once_with("123456")

    labels = json.loads(response[0].text)
    assert isinstance(labels, list)
    assert labels[0]["name"] == "test-label"
405 | 
406 | 
@pytest.mark.anyio
async def test_add_label(client, mock_confluence_fetcher):
    """add_label forwards page ID and label name positionally and returns the label list."""
    tool_args = {"page_id": "123456", "name": "new-label"}
    response = await client.call_tool("confluence_add_label", tool_args)

    mock_confluence_fetcher.add_page_label.assert_called_once_with(
        "123456", "new-label"
    )

    labels = json.loads(response[0].text)
    assert isinstance(labels, list)
    # The mocked fetcher always answers with its canned label
    assert labels[0]["name"] == "test-label"
419 | 
420 | 
@pytest.mark.anyio
async def test_search_user(client, mock_confluence_fetcher):
    """search_user forwards the CQL query and limit, and returns the simplified user hit."""
    cql = 'user.fullname ~ "First Last"'
    response = await client.call_tool(
        "confluence_search_user", {"query": cql, "limit": 10}
    )

    mock_confluence_fetcher.search_user.assert_called_once_with(
        'user.fullname ~ "First Last"', limit=10
    )

    hits = json.loads(response[0].text)
    assert isinstance(hits, list)
    assert len(hits) == 1
    hit = hits[0]
    assert hit["entity_type"] == "user"
    assert hit["title"] == "First Last"
    assert hit["user"]["account_id"] == "a031248587011jasoidf9832jd8j1"
    assert hit["user"]["display_name"] == "First Last"
439 | 
440 | 
@pytest.mark.anyio
async def test_create_page_with_numeric_parent_id(client, mock_confluence_fetcher):
    """An integer parent_id passed to create_page reaches the fetcher as a string."""
    tool_args = {
        "space_key": "TEST",
        "title": "Test Page",
        "content": "Test content",
        "parent_id": 123456789,  # deliberately an int, not a str
    }
    response = await client.call_tool("confluence_create_page", tool_args)

    create_mock = mock_confluence_fetcher.create_page
    create_mock.assert_called_once()
    passed = create_mock.call_args.kwargs
    assert passed["parent_id"] == "123456789"  # converted to str
    assert passed["space_key"] == "TEST"
    assert passed["title"] == "Test Page"

    payload = json.loads(response[0].text)
    assert payload["message"] == "Page created successfully"
    assert payload["page"]["title"] == "Test Page Mock Title"
464 | 
465 | 
@pytest.mark.anyio
async def test_create_page_with_string_parent_id(client, mock_confluence_fetcher):
    """A string parent_id passed to create_page reaches the fetcher unchanged."""
    tool_args = {
        "space_key": "TEST",
        "title": "Test Page",
        "content": "Test content",
        "parent_id": "123456789",  # already a str
    }
    response = await client.call_tool("confluence_create_page", tool_args)

    create_mock = mock_confluence_fetcher.create_page
    create_mock.assert_called_once()
    passed = create_mock.call_args.kwargs
    assert passed["parent_id"] == "123456789"  # still a str
    assert passed["space_key"] == "TEST"
    assert passed["title"] == "Test Page"

    payload = json.loads(response[0].text)
    assert payload["message"] == "Page created successfully"
    assert payload["page"]["title"] == "Test Page Mock Title"
488 | 
489 | 
@pytest.mark.anyio
async def test_update_page_with_numeric_parent_id(client, mock_confluence_fetcher):
    """An integer parent_id passed to update_page reaches the fetcher as a string."""
    tool_args = {
        "page_id": "999999",
        "title": "Updated Page",
        "content": "Updated content",
        "parent_id": 123456789,  # deliberately an int, not a str
    }
    response = await client.call_tool("confluence_update_page", tool_args)

    update_mock = mock_confluence_fetcher.update_page
    update_mock.assert_called_once()
    passed = update_mock.call_args.kwargs
    assert passed["parent_id"] == "123456789"  # converted to str
    assert passed["page_id"] == "999999"
    assert passed["title"] == "Updated Page"

    payload = json.loads(response[0].text)
    assert payload["message"] == "Page updated successfully"
    assert payload["page"]["title"] == "Test Page Mock Title"
512 | 
513 | 
@pytest.mark.anyio
async def test_update_page_with_string_parent_id(client, mock_confluence_fetcher):
    """A string parent_id passed to update_page reaches the fetcher unchanged."""
    tool_args = {
        "page_id": "999999",
        "title": "Updated Page",
        "content": "Updated content",
        "parent_id": "123456789",  # already a str
    }
    response = await client.call_tool("confluence_update_page", tool_args)

    update_mock = mock_confluence_fetcher.update_page
    update_mock.assert_called_once()
    passed = update_mock.call_args.kwargs
    assert passed["parent_id"] == "123456789"  # still a str
    assert passed["page_id"] == "999999"
    assert passed["title"] == "Updated Page"

    payload = json.loads(response[0].text)
    assert payload["message"] == "Page updated successfully"
    assert payload["page"]["title"] == "Test Page Mock Title"
536 | 
```

--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/fields.py:
--------------------------------------------------------------------------------

```python
  1 | """Module for Jira field operations."""
  2 | 
  3 | import logging
  4 | from typing import Any
  5 | 
  6 | from thefuzz import fuzz
  7 | 
  8 | from .client import JiraClient
  9 | from .protocols import EpicOperationsProto, UsersOperationsProto
 10 | 
 11 | logger = logging.getLogger("mcp-jira")
 12 | 
 13 | 
 14 | class FieldsMixin(JiraClient, EpicOperationsProto, UsersOperationsProto):
 15 |     """Mixin for Jira field operations.
 16 | 
 17 |     This mixin provides methods for discovering, caching, and working with Jira fields.
 18 |     Field IDs in Jira are crucial for many operations since they can differ across
 19 |     different Jira instances, especially for custom fields.
 20 |     """
 21 | 
 22 |     _field_name_to_id_map: dict[str, str] | None = None  # Cache for name -> id mapping
 23 | 
 24 |     def get_fields(self, refresh: bool = False) -> list[dict[str, Any]]:
 25 |         """
 26 |         Get all available fields from Jira.
 27 | 
 28 |         Args:
 29 |             refresh: When True, forces a refresh from the server instead of using cache
 30 | 
 31 |         Returns:
 32 |             List of field definitions
 33 |         """
 34 |         try:
 35 |             # Use cached field data if available and refresh is not requested
 36 |             if self._field_ids_cache is not None and not refresh:
 37 |                 return self._field_ids_cache
 38 | 
 39 |             if refresh:
 40 |                 self._field_name_to_id_map = (
 41 |                     None  # Clear name map cache if refreshing fields
 42 |                 )
 43 | 
 44 |             # Fetch fields from Jira API
 45 |             fields = self.jira.get_all_fields()
 46 |             if not isinstance(fields, list):
 47 |                 msg = f"Unexpected return value type from `jira.get_all_fields`: {type(fields)}"
 48 |                 logger.error(msg)
 49 |                 raise TypeError(msg)
 50 | 
 51 |             # Cache the fields
 52 |             self._field_ids_cache = fields
 53 | 
 54 |             # Regenerate the name map upon fetching new fields
 55 |             self._generate_field_map(force_regenerate=True)
 56 | 
 57 |             # Log available fields for debugging
 58 |             self._log_available_fields(fields)
 59 | 
 60 |             return fields
 61 | 
 62 |         except Exception as e:
 63 |             logger.error(f"Error getting Jira fields: {str(e)}")
 64 |             return []
 65 | 
 66 |     def _generate_field_map(self, force_regenerate: bool = False) -> dict[str, str]:
 67 |         """Generates and caches a map of lowercase field names to field IDs."""
 68 |         if self._field_name_to_id_map is not None and not force_regenerate:
 69 |             return self._field_name_to_id_map
 70 | 
 71 |         # Ensure fields are loaded into cache first
 72 |         fields = (
 73 |             self.get_fields()
 74 |         )  # Uses cache if available unless force_regenerate was True
 75 |         if not fields:
 76 |             self._field_name_to_id_map = {}
 77 |             return {}
 78 | 
 79 |         name_map: dict[str, str] = {}
 80 |         id_map: dict[str, str] = {}  # Also map ID to ID for consistency
 81 |         for field in fields:
 82 |             field_id = field.get("id")
 83 |             field_name = field.get("name")
 84 |             if field_id:
 85 |                 id_map[field_id] = field_id  # Map ID to itself
 86 |                 if field_name:
 87 |                     # Store lowercase name -> ID. Handle potential name collisions if necessary.
 88 |                     name_map.setdefault(field_name.lower(), field_id)
 89 | 
 90 |         # Combine maps, ensuring IDs can also be looked up directly
 91 |         self._field_name_to_id_map = name_map | id_map
 92 |         logger.debug(
 93 |             f"Generated/Updated field name map: {len(self._field_name_to_id_map)} entries"
 94 |         )
 95 |         return self._field_name_to_id_map
 96 | 
 97 |     def get_field_id(self, field_name: str, refresh: bool = False) -> str | None:
 98 |         """
 99 |         Get the ID for a specific field by name.
100 | 
101 |         Args:
102 |             field_name: The name of the field to look for (case-insensitive)
103 |             refresh: When True, forces a refresh from the server
104 | 
105 |         Returns:
106 |             Field ID if found, None otherwise
107 |         """
108 |         try:
109 |             # Ensure the map is generated/cached
110 |             field_map = self._generate_field_map(force_regenerate=refresh)
111 |             if not field_map:
112 |                 logger.error("Field map could not be generated.")
113 |                 return None
114 | 
115 |             normalized_name = field_name.lower()
116 |             if normalized_name in field_map:
117 |                 return field_map[normalized_name]
118 |             # Fallback: Check if the input IS an ID (using original casing)
119 |             elif field_name in field_map:  # Checks the id_map part
120 |                 return field_map[field_name]
121 |             else:
122 |                 logger.warning(f"Field '{field_name}' not found in generated map.")
123 |                 return None
124 | 
125 |         except Exception as e:
126 |             logger.error(f"Error getting field ID for '{field_name}': {str(e)}")
127 |             return None
128 | 
129 |     def get_field_by_id(
130 |         self, field_id: str, refresh: bool = False
131 |     ) -> dict[str, Any] | None:
132 |         """
133 |         Get field definition by ID.
134 | 
135 |         Args:
136 |             field_id: The ID of the field to look for
137 |             refresh: When True, forces a refresh from the server
138 | 
139 |         Returns:
140 |             Field definition if found, None otherwise
141 |         """
142 |         try:
143 |             fields = self.get_fields(refresh=refresh)
144 | 
145 |             for field in fields:
146 |                 if field.get("id") == field_id:
147 |                     return field
148 | 
149 |             logger.warning(f"Field with ID '{field_id}' not found")
150 |             return None
151 | 
152 |         except Exception as e:
153 |             logger.error(f"Error getting field by ID '{field_id}': {str(e)}")
154 |             return None
155 | 
156 |     def get_custom_fields(self, refresh: bool = False) -> list[dict[str, Any]]:
157 |         """
158 |         Get all custom fields.
159 | 
160 |         Args:
161 |             refresh: When True, forces a refresh from the server
162 | 
163 |         Returns:
164 |             List of custom field definitions
165 |         """
166 |         try:
167 |             fields = self.get_fields(refresh=refresh)
168 |             custom_fields = [
169 |                 field
170 |                 for field in fields
171 |                 if field.get("id", "").startswith("customfield_")
172 |             ]
173 | 
174 |             return custom_fields
175 | 
176 |         except Exception as e:
177 |             logger.error(f"Error getting custom fields: {str(e)}")
178 |             return []
179 | 
180 |     def get_required_fields(self, issue_type: str, project_key: str) -> dict[str, Any]:
181 |         """
182 |         Get required fields for creating an issue of a specific type in a project.
183 | 
184 |         Args:
185 |             issue_type: The issue type (e.g., 'Bug', 'Story', 'Epic')
186 |             project_key: The project key (e.g., 'PROJ')
187 | 
188 |         Returns:
189 |             Dictionary mapping required field names to their definitions
190 |         """
191 |         # Initialize cache if it doesn't exist
192 |         if not hasattr(self, "_required_fields_cache"):
193 |             self._required_fields_cache = {}
194 | 
195 |         # Check cache first
196 |         cache_key = (project_key, issue_type)
197 |         if cache_key in self._required_fields_cache:
198 |             logger.debug(
199 |                 f"Returning cached required fields for {issue_type} in {project_key}"
200 |             )
201 |             return self._required_fields_cache[cache_key]
202 | 
203 |         try:
204 |             # Step 1: Get the ID for the given issue type name within the project
205 |             if not hasattr(self, "get_project_issue_types"):
206 |                 logger.error(
207 |                     "get_project_issue_types method not available. Cannot resolve issue type ID."
208 |                 )
209 |                 return {}
210 | 
211 |             all_issue_types = self.get_project_issue_types(project_key)
212 |             issue_type_id = None
213 |             for it in all_issue_types:
214 |                 if it.get("name", "").lower() == issue_type.lower():
215 |                     issue_type_id = it.get("id")
216 |                     break
217 | 
218 |             if not issue_type_id:
219 |                 logger.warning(
220 |                     f"Issue type '{issue_type}' not found in project '{project_key}'"
221 |                 )
222 |                 return {}
223 | 
224 |             # Step 2: Call the correct API method to get field metadata
225 |             meta = self.jira.issue_createmeta_fieldtypes(
226 |                 project=project_key, issue_type_id=issue_type_id
227 |             )
228 | 
229 |             required_fields = {}
230 |             # Step 3: Parse the response and extract required fields
231 |             if isinstance(meta, dict) and "fields" in meta:
232 |                 if isinstance(meta["fields"], list):
233 |                     for field_meta in meta["fields"]:
234 |                         if isinstance(field_meta, dict) and field_meta.get(
235 |                             "required", False
236 |                         ):
237 |                             field_id = field_meta.get("fieldId")
238 |                             if field_id:
239 |                                 required_fields[field_id] = field_meta
240 |                 else:
241 |                     logger.warning(
242 |                         "Unexpected format for 'fields' in createmeta response."
243 |                     )
244 | 
245 |             if not required_fields:
246 |                 logger.warning(
247 |                     f"No required fields found for issue type '{issue_type}' "
248 |                     f"in project '{project_key}'"
249 |                 )
250 | 
251 |             # Cache the result before returning
252 |             self._required_fields_cache[cache_key] = required_fields
253 |             logger.debug(
254 |                 f"Cached required fields for {issue_type} in {project_key}: "
255 |                 f"{len(required_fields)} fields"
256 |             )
257 | 
258 |             return required_fields
259 | 
260 |         except Exception as e:
261 |             logger.error(
262 |                 f"Error getting required fields for issue type '{issue_type}' "
263 |                 f"in project '{project_key}': {str(e)}"
264 |             )
265 |             return {}
266 | 
267 |     def get_field_ids_to_epic(self) -> dict[str, str]:
268 |         """
269 |         Dynamically discover Jira field IDs relevant to Epic linking.
270 |         This method queries the Jira API to find the correct custom field IDs
271 |         for Epic-related fields, which can vary between different Jira instances.
272 | 
273 |         Returns:
274 |             Dictionary mapping field names to their IDs
275 |             (e.g., {'epic_link': 'customfield_10014', 'epic_name': 'customfield_10011'})
276 |         """
277 |         try:
278 |             # Ensure field list and map are cached/generated
279 |             self._generate_field_map()  # Generates map and ensures fields are cached
280 | 
281 |             # Get all fields (uses cache if available)
282 |             fields = self.get_fields()
283 |             if not fields:  # Check if get_fields failed or returned empty
284 |                 logger.error(
285 |                     "Could not load field definitions for epic field discovery."
286 |                 )
287 |                 return {}
288 | 
289 |             field_ids = {}
290 | 
291 |             # Log the complete list of fields for debugging
292 |             all_field_names = [field.get("name", "").lower() for field in fields]
293 |             logger.debug(f"All field names: {all_field_names}")
294 | 
295 |             # Enhanced logging for debugging
296 |             custom_fields = {
297 |                 field.get("id", ""): field.get("name", "")
298 |                 for field in fields
299 |                 if field.get("id", "").startswith("customfield_")
300 |             }
301 |             logger.debug(f"Custom fields: {custom_fields}")
302 | 
303 |             # Look for Epic-related fields - use multiple strategies to identify them
304 |             for field in fields:
305 |                 field_name = field.get("name", "").lower()
306 |                 original_name = field.get("name", "")
307 |                 field_id = field.get("id", "")
308 |                 field_schema = field.get("schema", {})
309 |                 field_custom = field_schema.get("custom", "")
310 | 
311 |                 if original_name and field_id:
312 |                     field_ids[original_name] = field_id
313 | 
314 |                 # Epic Link field - used to link issues to epics
315 |                 if (
316 |                     field_name == "epic link"
317 |                     or field_name == "epic"
318 |                     or "epic link" in field_name
319 |                     or field_custom == "com.pyxis.greenhopper.jira:gh-epic-link"
320 |                     or field_id == "customfield_10014"
321 |                 ):  # Common in Jira Cloud
322 |                     field_ids["epic_link"] = field_id
323 |                     # For backward compatibility
324 |                     field_ids["Epic Link"] = field_id
325 |                     logger.debug(f"Found Epic Link field: {field_id} ({original_name})")
326 | 
327 |                 # Epic Name field - used when creating epics
328 |                 elif (
329 |                     field_name == "epic name"
330 |                     or field_name == "epic title"
331 |                     or "epic name" in field_name
332 |                     or field_custom == "com.pyxis.greenhopper.jira:gh-epic-label"
333 |                     or field_id == "customfield_10011"
334 |                 ):  # Common in Jira Cloud
335 |                     field_ids["epic_name"] = field_id
336 |                     # For backward compatibility
337 |                     field_ids["Epic Name"] = field_id
338 |                     logger.debug(f"Found Epic Name field: {field_id} ({original_name})")
339 | 
340 |                 # Epic Status field
341 |                 elif (
342 |                     field_name == "epic status"
343 |                     or "epic status" in field_name
344 |                     or field_custom == "com.pyxis.greenhopper.jira:gh-epic-status"
345 |                 ):
346 |                     field_ids["epic_status"] = field_id
347 |                     logger.debug(
348 |                         f"Found Epic Status field: {field_id} ({original_name})"
349 |                     )
350 | 
351 |                 # Epic Color field
352 |                 elif (
353 |                     field_name == "epic color"
354 |                     or field_name == "epic colour"
355 |                     or "epic color" in field_name
356 |                     or "epic colour" in field_name
357 |                     or field_custom == "com.pyxis.greenhopper.jira:gh-epic-color"
358 |                 ):
359 |                     field_ids["epic_color"] = field_id
360 |                     logger.debug(
361 |                         f"Found Epic Color field: {field_id} ({original_name})"
362 |                     )
363 | 
364 |                 # Parent field - sometimes used instead of Epic Link
365 |                 elif (
366 |                     field_name == "parent"
367 |                     or field_name == "parent issue"
368 |                     or "parent issue" in field_name
369 |                 ):
370 |                     field_ids["parent"] = field_id
371 |                     logger.debug(f"Found Parent field: {field_id} ({original_name})")
372 | 
373 |                 # Try to detect any other fields that might be related to Epics
374 |                 elif "epic" in field_name and field_id.startswith("customfield_"):
375 |                     key = f"epic_{field_name.replace(' ', '_').replace('-', '_')}"
376 |                     field_ids[key] = field_id
377 |                     logger.debug(
378 |                         f"Found potential Epic-related field: {field_id} ({original_name})"
379 |                     )
380 | 
381 |             # If we couldn't find certain key fields, try alternative approaches
382 |             if "epic_name" not in field_ids or "epic_link" not in field_ids:
383 |                 logger.debug(
384 |                     "Standard field search didn't find all Epic fields, trying alternative approaches"
385 |                 )
386 |                 self._try_discover_fields_from_existing_epic(field_ids)
387 | 
388 |             logger.debug(f"Discovered field IDs: {field_ids}")
389 | 
390 |             return field_ids
391 | 
392 |         except Exception as e:
393 |             logger.error(f"Error discovering Jira field IDs: {str(e)}")
394 |             # Return an empty dict as fallback
395 |             return {}
396 | 
397 |     def _log_available_fields(self, fields: list[dict]) -> None:
398 |         """
399 |         Log available fields for debugging.
400 | 
401 |         Args:
402 |             fields: List of field definitions
403 |         """
404 |         logger.debug("Available Jira fields:")
405 |         for field in fields:
406 |             field_id = field.get("id", "")
407 |             name = field.get("name", "")
408 |             field_type = field.get("schema", {}).get("type", "")
409 |             logger.debug(f"{field_id}: {name} ({field_type})")
410 | 
411 |     def is_custom_field(self, field_id: str) -> bool:
412 |         """
413 |         Check if a field is a custom field.
414 | 
415 |         Args:
416 |             field_id: The field ID to check
417 | 
418 |         Returns:
419 |             True if it's a custom field, False otherwise
420 |         """
421 |         return field_id.startswith("customfield_")
422 | 
423 |     def format_field_value(self, field_id: str, value: Any) -> Any:
424 |         """
425 |         Format a field value based on its type for update operations.
426 | 
427 |         Different field types in Jira require different JSON formats when updating.
428 |         This method helps format the value correctly for the specific field type.
429 | 
430 |         Args:
431 |             field_id: The ID of the field
432 |             value: The value to format
433 | 
434 |         Returns:
435 |             Properly formatted value for the field
436 |         """
437 |         try:
438 |             # Get field definition
439 |             field = self.get_field_by_id(field_id)
440 | 
441 |             if not field:
442 |                 # For unknown fields, return value as-is
443 |                 return value
444 | 
445 |             field_type = field.get("schema", {}).get("type")
446 | 
447 |             # Format based on field type
448 |             if field_type == "user":
449 |                 # Handle user fields - need accountId for cloud or name for server
450 |                 if isinstance(value, str):
451 |                     try:
452 |                         account_id = self._get_account_id(value)
453 |                         return {"accountId": account_id}
454 |                     except Exception as e:
455 |                         logger.warning(f"Could not resolve user '{value}': {str(e)}")
456 |                         return value
457 |                 else:
458 |                     return value
459 | 
460 |             elif field_type == "array":
461 |                 # Handle array fields - convert single value to list if needed
462 |                 if not isinstance(value, list):
463 |                     return [value]
464 |                 return value
465 | 
466 |             elif field_type == "option":
467 |                 # Handle option fields - convert to {"value": value} format
468 |                 if isinstance(value, str):
469 |                     return {"value": value}
470 |                 return value
471 | 
472 |             # For other types, return as-is
473 |             return value
474 | 
475 |         except Exception as e:
476 |             logger.warning(f"Error formatting field value for '{field_id}': {str(e)}")
477 |             return value
478 | 
479 |     def search_fields(
480 |         self, keyword: str, limit: int = 10, *, refresh: bool = False
481 |     ) -> list[dict[str, Any]]:
482 |         """
483 |         Search fields using fuzzy matching.
484 | 
485 |         Args:
486 |             keyword: The search keyword
487 |             limit: Maximum number of results to return (default: 10)
488 |             refresh: When True, forces a refresh from the server
489 | 
490 |         Returns:
491 |             List of matching field definitions, sorted by relevance
492 |         """
493 |         try:
494 |             # Get all fields
495 |             fields = self.get_fields(refresh=refresh)
496 | 
497 |             # if keyword is empty, return `limit` fields
498 |             if not keyword:
499 |                 return fields[:limit]
500 | 
501 |             def similarity(keyword: str, field: dict) -> int:
502 |                 """Calculate similarity score between keyword and field."""
503 |                 name_candidates = [
504 |                     field.get("id", ""),
505 |                     field.get("key", ""),
506 |                     field.get("name", ""),
507 |                     *field.get("clauseNames", []),
508 |                 ]
509 | 
510 |                 # Calculate the fuzzy match score
511 |                 return max(
512 |                     fuzz.partial_ratio(keyword.lower(), name.lower())
513 |                     for name in name_candidates
514 |                 )
515 | 
516 |             # Sort by similarity
517 |             sorted_fields = sorted(
518 |                 fields, key=lambda x: similarity(keyword, x), reverse=True
519 |             )
520 | 
521 |             # Return the top limit results
522 |             return sorted_fields[:limit]
523 | 
524 |         except Exception as e:
525 |             logger.error(f"Error searching fields: {str(e)}")
526 |             return []
527 | 
```

--------------------------------------------------------------------------------
/tests/unit/confluence/test_search.py:
--------------------------------------------------------------------------------

```python
  1 | """Unit tests for the SearchMixin class."""
  2 | 
  3 | from unittest.mock import MagicMock, patch
  4 | 
  5 | import pytest
  6 | import requests
  7 | from requests import HTTPError
  8 | 
  9 | from mcp_atlassian.confluence.search import SearchMixin
 10 | from mcp_atlassian.confluence.utils import quote_cql_identifier_if_needed
 11 | from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
 12 | 
 13 | 
 14 | class TestSearchMixin:
 15 |     """Tests for the SearchMixin class."""
 16 | 
 17 |     @pytest.fixture
 18 |     def search_mixin(self, confluence_client):
 19 |         """Create a SearchMixin instance for testing."""
 20 |         # SearchMixin inherits from ConfluenceClient, so we need to create it properly
 21 |         with patch(
 22 |             "mcp_atlassian.confluence.search.ConfluenceClient.__init__"
 23 |         ) as mock_init:
 24 |             mock_init.return_value = None
 25 |             mixin = SearchMixin()
 26 |             # Copy the necessary attributes from our mocked client
 27 |             mixin.confluence = confluence_client.confluence
 28 |             mixin.config = confluence_client.config
 29 |             mixin.preprocessor = confluence_client.preprocessor
 30 |             return mixin
 31 | 
 32 |     def test_search_success(self, search_mixin):
 33 |         """Test search with successful results."""
 34 |         # Prepare the mock
 35 |         search_mixin.confluence.cql.return_value = {
 36 |             "results": [
 37 |                 {
 38 |                     "content": {
 39 |                         "id": "123456789",
 40 |                         "title": "Test Page",
 41 |                         "type": "page",
 42 |                         "space": {"key": "SPACE", "name": "Test Space"},
 43 |                         "version": {"number": 1},
 44 |                     },
 45 |                     "excerpt": "Test content excerpt",
 46 |                     "url": "https://confluence.example.com/pages/123456789",
 47 |                 }
 48 |             ]
 49 |         }
 50 | 
 51 |         # Mock the preprocessor to return processed content
 52 |         search_mixin.preprocessor.process_html_content.return_value = (
 53 |             "<p>Processed HTML</p>",
 54 |             "Processed content",
 55 |         )
 56 | 
 57 |         # Call the method
 58 |         result = search_mixin.search("test query")
 59 | 
 60 |         # Verify API call
 61 |         search_mixin.confluence.cql.assert_called_once_with(cql="test query", limit=10)
 62 | 
 63 |         # Verify result
 64 |         assert len(result) == 1
 65 |         assert result[0].id == "123456789"
 66 |         assert result[0].title == "Test Page"
 67 |         assert result[0].content == "Processed content"
 68 | 
 69 |     def test_search_with_empty_results(self, search_mixin):
 70 |         """Test handling of empty search results."""
 71 |         # Mock an empty result set
 72 |         search_mixin.confluence.cql.return_value = {"results": []}
 73 | 
 74 |         # Act
 75 |         results = search_mixin.search("empty query")
 76 | 
 77 |         # Assert
 78 |         assert isinstance(results, list)
 79 |         assert len(results) == 0
 80 | 
 81 |     def test_search_with_non_page_content(self, search_mixin):
 82 |         """Test handling of non-page content in search results."""
 83 |         # Mock search results with non-page content
 84 |         search_mixin.confluence.cql.return_value = {
 85 |             "results": [
 86 |                 {
 87 |                     "content": {"type": "blogpost", "id": "12345"},
 88 |                     "title": "Blog Post",
 89 |                     "excerpt": "This is a blog post",
 90 |                     "url": "/pages/12345",
 91 |                     "resultGlobalContainer": {"title": "TEST"},
 92 |                 }
 93 |             ]
 94 |         }
 95 | 
 96 |         # Act
 97 |         results = search_mixin.search("blogpost query")
 98 | 
 99 |         # Assert
100 |         assert isinstance(results, list)
101 |         # The method should still handle them as pages since we're using models
102 |         assert len(results) > 0
103 | 
104 |     def test_search_key_error(self, search_mixin):
105 |         """Test handling of KeyError in search results."""
106 |         # Mock a response missing required keys
107 |         search_mixin.confluence.cql.return_value = {"incomplete": "data"}
108 | 
109 |         # Act
110 |         results = search_mixin.search("invalid query")
111 | 
112 |         # Assert
113 |         assert isinstance(results, list)
114 |         assert len(results) == 0
115 | 
116 |     def test_search_request_exception(self, search_mixin):
117 |         """Test handling of RequestException during search."""
118 |         # Mock a network error
119 |         search_mixin.confluence.cql.side_effect = requests.RequestException("API error")
120 | 
121 |         # Act
122 |         results = search_mixin.search("error query")
123 | 
124 |         # Assert
125 |         assert isinstance(results, list)
126 |         assert len(results) == 0
127 | 
128 |     def test_search_value_error(self, search_mixin):
129 |         """Test handling of ValueError during search."""
130 |         # Mock a value error
131 |         search_mixin.confluence.cql.side_effect = ValueError("Value error")
132 | 
133 |         # Act
134 |         results = search_mixin.search("error query")
135 | 
136 |         # Assert
137 |         assert isinstance(results, list)
138 |         assert len(results) == 0
139 | 
140 |     def test_search_type_error(self, search_mixin):
141 |         """Test handling of TypeError during search."""
142 |         # Mock a type error
143 |         search_mixin.confluence.cql.side_effect = TypeError("Type error")
144 | 
145 |         # Act
146 |         results = search_mixin.search("error query")
147 | 
148 |         # Assert
149 |         assert isinstance(results, list)
150 |         assert len(results) == 0
151 | 
152 |     def test_search_with_spaces_filter(self, search_mixin):
153 |         """Test searching with spaces filter from parameter."""
154 |         # Prepare the mock
155 |         search_mixin.confluence.cql.return_value = {
156 |             "results": [
157 |                 {
158 |                     "content": {
159 |                         "id": "123456789",
160 |                         "title": "Test Page",
161 |                         "type": "page",
162 |                         "space": {"key": "SPACE", "name": "Test Space"},
163 |                         "version": {"number": 1},
164 |                     },
165 |                     "excerpt": "Test content excerpt",
166 |                     "url": "https://confluence.example.com/pages/123456789",
167 |                 }
168 |             ]
169 |         }
170 | 
171 |         # Mock the preprocessor
172 |         search_mixin.preprocessor.process_html_content.return_value = (
173 |             "<p>Processed HTML</p>",
174 |             "Processed content",
175 |         )
176 | 
177 |         # Test with single space filter
178 |         result = search_mixin.search("test query", spaces_filter="DEV")
179 | 
180 |         # Verify space was properly quoted in the CQL query
181 |         quoted_dev = quote_cql_identifier_if_needed("DEV")
182 |         search_mixin.confluence.cql.assert_called_with(
183 |             cql=f"(test query) AND (space = {quoted_dev})",
184 |             limit=10,
185 |         )
186 |         assert len(result) == 1
187 | 
188 |         # Test with multiple spaces filter
189 |         result = search_mixin.search("test query", spaces_filter="DEV,TEAM")
190 | 
191 |         # Verify spaces were properly quoted in the CQL query
192 |         quoted_dev = quote_cql_identifier_if_needed("DEV")
193 |         quoted_team = quote_cql_identifier_if_needed("TEAM")
194 |         search_mixin.confluence.cql.assert_called_with(
195 |             cql=f"(test query) AND (space = {quoted_dev} OR space = {quoted_team})",
196 |             limit=10,
197 |         )
198 |         assert len(result) == 1
199 | 
200 |         # Test with filter when query already has space
201 |         result = search_mixin.search('space = "EXISTING"', spaces_filter="DEV")
202 |         search_mixin.confluence.cql.assert_called_with(
203 |             cql='space = "EXISTING"',  # Should not add filter when space already exists
204 |             limit=10,
205 |         )
206 |         assert len(result) == 1
207 | 
208 |     def test_search_with_config_spaces_filter(self, search_mixin):
209 |         """Test search using spaces filter from config."""
210 |         # Prepare the mock
211 |         search_mixin.confluence.cql.return_value = {
212 |             "results": [
213 |                 {
214 |                     "content": {
215 |                         "id": "123456789",
216 |                         "title": "Test Page",
217 |                         "type": "page",
218 |                         "space": {"key": "SPACE", "name": "Test Space"},
219 |                         "version": {"number": 1},
220 |                     },
221 |                     "excerpt": "Test content excerpt",
222 |                     "url": "https://confluence.example.com/pages/123456789",
223 |                 }
224 |             ]
225 |         }
226 | 
227 |         # Mock the preprocessor
228 |         search_mixin.preprocessor.process_html_content.return_value = (
229 |             "<p>Processed HTML</p>",
230 |             "Processed content",
231 |         )
232 | 
233 |         # Set config filter
234 |         search_mixin.config.spaces_filter = "DEV,TEAM"
235 | 
236 |         # Test with config filter
237 |         result = search_mixin.search("test query")
238 | 
239 |         # Verify spaces were properly quoted in the CQL query
240 |         quoted_dev = quote_cql_identifier_if_needed("DEV")
241 |         quoted_team = quote_cql_identifier_if_needed("TEAM")
242 |         search_mixin.confluence.cql.assert_called_with(
243 |             cql=f"(test query) AND (space = {quoted_dev} OR space = {quoted_team})",
244 |             limit=10,
245 |         )
246 |         assert len(result) == 1
247 | 
248 |         # Test that explicit filter overrides config filter
249 |         result = search_mixin.search("test query", spaces_filter="OVERRIDE")
250 | 
251 |         # Verify space was properly quoted in the CQL query
252 |         quoted_override = quote_cql_identifier_if_needed("OVERRIDE")
253 |         search_mixin.confluence.cql.assert_called_with(
254 |             cql=f"(test query) AND (space = {quoted_override})",
255 |             limit=10,
256 |         )
257 |         assert len(result) == 1
258 | 
259 |     def test_search_general_exception(self, search_mixin):
260 |         """Test handling of general exceptions during search."""
261 |         # Mock a general exception
262 |         search_mixin.confluence.cql.side_effect = Exception("General error")
263 | 
264 |         # Act
265 |         results = search_mixin.search("error query")
266 | 
267 |         # Assert
268 |         assert isinstance(results, list)
269 |         assert len(results) == 0
270 | 
271 |     def test_search_user_success(self, search_mixin):
272 |         """Test search_user with successful results."""
273 |         # Prepare the mock response
274 |         search_mixin.confluence.get.return_value = {
275 |             "results": [
276 |                 {
277 |                     "user": {
278 |                         "type": "known",
279 |                         "accountId": "1234asdf",
280 |                         "accountType": "atlassian",
281 |                         "email": "[email protected]",
282 |                         "publicName": "First Last",
283 |                         "displayName": "First Last",
284 |                         "isExternalCollaborator": False,
285 |                         "profilePicture": {
286 |                             "path": "/wiki/aa-avatar/1234asdf",
287 |                             "width": 48,
288 |                             "height": 48,
289 |                             "isDefault": False,
290 |                         },
291 |                     },
292 |                     "title": "First Last",
293 |                     "excerpt": "",
294 |                     "url": "/people/1234asdf",
295 |                     "entityType": "user",
296 |                     "lastModified": "2025-06-02T13:35:59.680Z",
297 |                     "score": 0.0,
298 |                 }
299 |             ],
300 |             "start": 0,
301 |             "limit": 25,
302 |             "size": 1,
303 |             "totalSize": 1,
304 |             "cqlQuery": "( user.fullname ~ 'First Last' )",
305 |             "searchDuration": 115,
306 |         }
307 | 
308 |         # Call the method
309 |         result = search_mixin.search_user('user.fullname ~ "First Last"')
310 | 
311 |         # Verify API call
312 |         search_mixin.confluence.get.assert_called_once_with(
313 |             "rest/api/search/user",
314 |             params={"cql": 'user.fullname ~ "First Last"', "limit": 10},
315 |         )
316 | 
317 |         # Verify result
318 |         assert len(result) == 1
319 |         assert result[0].user.account_id == "1234asdf"
320 |         assert result[0].user.display_name == "First Last"
321 |         assert result[0].user.email == "[email protected]"
322 |         assert result[0].title == "First Last"
323 |         assert result[0].entity_type == "user"
324 | 
325 |     def test_search_user_with_empty_results(self, search_mixin):
326 |         """Test search_user with empty results."""
327 |         # Mock an empty result set
328 |         search_mixin.confluence.get.return_value = {
329 |             "results": [],
330 |             "start": 0,
331 |             "limit": 25,
332 |             "size": 0,
333 |             "totalSize": 0,
334 |             "cqlQuery": 'user.fullname ~ "Nonexistent"',
335 |             "searchDuration": 50,
336 |         }
337 | 
338 |         # Act
339 |         results = search_mixin.search_user('user.fullname ~ "Nonexistent"')
340 | 
341 |         # Assert
342 |         assert isinstance(results, list)
343 |         assert len(results) == 0
344 | 
345 |     def test_search_user_with_custom_limit(self, search_mixin):
346 |         """Test search_user with custom limit."""
347 |         # Prepare the mock response
348 |         search_mixin.confluence.get.return_value = {
349 |             "results": [],
350 |             "start": 0,
351 |             "limit": 5,
352 |             "size": 0,
353 |             "totalSize": 0,
354 |             "cqlQuery": 'user.fullname ~ "Test"',
355 |             "searchDuration": 30,
356 |         }
357 | 
358 |         # Call with custom limit
359 |         search_mixin.search_user('user.fullname ~ "Test"', limit=5)
360 | 
361 |         # Verify API call with correct limit
362 |         search_mixin.confluence.get.assert_called_once_with(
363 |             "rest/api/search/user", params={"cql": 'user.fullname ~ "Test"', "limit": 5}
364 |         )
365 | 
366 |     @pytest.mark.parametrize(
367 |         "exception_type,exception_args,expected_result",
368 |         [
369 |             (requests.RequestException, ("Network error",), []),
370 |             (ValueError, ("Value error",), []),
371 |             (TypeError, ("Type error",), []),
372 |             (Exception, ("General error",), []),
373 |             (KeyError, ("Missing key",), []),
374 |         ],
375 |     )
376 |     def test_search_user_exception_handling(
377 |         self, search_mixin, exception_type, exception_args, expected_result
378 |     ):
379 |         """Test search_user handling of various exceptions that return empty list."""
380 |         # Mock the exception
381 |         search_mixin.confluence.get.side_effect = exception_type(*exception_args)
382 | 
383 |         # Act
384 |         results = search_mixin.search_user('user.fullname ~ "Test"')
385 | 
386 |         # Assert
387 |         assert isinstance(results, list)
388 |         assert results == expected_result
389 | 
390 |     @pytest.mark.parametrize(
391 |         "status_code,exception_type",
392 |         [
393 |             (401, MCPAtlassianAuthenticationError),
394 |             (403, MCPAtlassianAuthenticationError),
395 |         ],
396 |     )
397 |     def test_search_user_http_auth_errors(
398 |         self, search_mixin, status_code, exception_type
399 |     ):
400 |         """Test search_user handling of HTTP authentication errors."""
401 |         # Mock HTTP error
402 |         mock_response = MagicMock()
403 |         mock_response.status_code = status_code
404 |         http_error = HTTPError(f"HTTP {status_code}")
405 |         http_error.response = mock_response
406 |         search_mixin.confluence.get.side_effect = http_error
407 | 
408 |         # Act and assert
409 |         with pytest.raises(exception_type):
410 |             search_mixin.search_user('user.fullname ~ "Test"')
411 | 
412 |     def test_search_user_http_other_error(self, search_mixin):
413 |         """Test search_user handling of other HTTP errors."""
414 |         # Mock HTTP 500 error
415 |         mock_response = MagicMock()
416 |         mock_response.status_code = 500
417 |         http_error = HTTPError("Internal Server Error")
418 |         http_error.response = mock_response
419 |         search_mixin.confluence.get.side_effect = http_error
420 | 
421 |         # Act and assert - should re-raise the HTTPError
422 |         with pytest.raises(HTTPError):
423 |             search_mixin.search_user('user.fullname ~ "Test"')
424 | 
425 |     @pytest.mark.parametrize(
426 |         "mock_response,expected_length",
427 |         [
428 |             ({"incomplete": "data"}, 0),  # KeyError case
429 |             (None, 0),  # None response case
430 |             ({"results": []}, 0),  # Empty results case
431 |         ],
432 |     )
433 |     def test_search_user_edge_cases(self, search_mixin, mock_response, expected_length):
434 |         """Test search_user handling of edge cases in API responses."""
435 |         search_mixin.confluence.get.return_value = mock_response
436 | 
437 |         # Act
438 |         results = search_mixin.search_user('user.fullname ~ "Test"')
439 | 
440 |         # Assert
441 |         assert isinstance(results, list)
442 |         assert len(results) == expected_length
443 | 
444 |     # You can also parametrize the regular search method exception tests:
445 |     @pytest.mark.parametrize(
446 |         "exception_type,exception_args,expected_result",
447 |         [
448 |             (requests.RequestException, ("API error",), []),
449 |             (ValueError, ("Value error",), []),
450 |             (TypeError, ("Type error",), []),
451 |             (Exception, ("General error",), []),
452 |             (KeyError, ("Missing key",), []),
453 |         ],
454 |     )
455 |     def test_search_exception_handling(
456 |         self, search_mixin, exception_type, exception_args, expected_result
457 |     ):
458 |         """Test search handling of various exceptions that return empty list."""
459 |         # Mock the exception
460 |         search_mixin.confluence.cql.side_effect = exception_type(*exception_args)
461 | 
462 |         # Act
463 |         results = search_mixin.search("error query")
464 | 
465 |         # Assert
466 |         assert isinstance(results, list)
467 |         assert results == expected_result
468 | 
469 |     # Parametrize CQL query tests:
470 |     @pytest.mark.parametrize(
471 |         "query,limit,expected_params",
472 |         [
473 |             (
474 |                 'user.fullname ~ "Test"',
475 |                 10,
476 |                 {"cql": 'user.fullname ~ "Test"', "limit": 10},
477 |             ),
478 |             (
479 |                 'user.email ~ "[email protected]"',
480 |                 5,
481 |                 {"cql": 'user.email ~ "[email protected]"', "limit": 5},
482 |             ),
483 |             (
484 |                 'user.fullname ~ "John" AND user.email ~ "@company.com"',
485 |                 15,
486 |                 {
487 |                     "cql": 'user.fullname ~ "John" AND user.email ~ "@company.com"',
488 |                     "limit": 15,
489 |                 },
490 |             ),
491 |         ],
492 |     )
493 |     def test_search_user_api_parameters(
494 |         self, search_mixin, query, limit, expected_params
495 |     ):
496 |         """Test that search_user calls the API with correct parameters."""
497 |         # Mock successful response
498 |         search_mixin.confluence.get.return_value = {
499 |             "results": [],
500 |             "start": 0,
501 |             "limit": limit,
502 |             "totalSize": 0,
503 |         }
504 | 
505 |         # Act
506 |         search_mixin.search_user(query, limit=limit)
507 | 
508 |         # Assert API was called with correct parameters
509 |         search_mixin.confluence.get.assert_called_once_with(
510 |             "rest/api/search/user", params=expected_params
511 |         )
512 | 
513 |     def test_search_user_with_complex_cql_query(self, search_mixin):
514 |         """Test search_user with complex CQL query containing operators."""
515 |         # Mock successful response
516 |         search_mixin.confluence.get.return_value = {
517 |             "results": [],
518 |             "start": 0,
519 |             "limit": 10,
520 |             "totalSize": 0,
521 |         }
522 | 
523 |         complex_query = 'user.fullname ~ "John" AND user.email ~ "@company.com" OR user.displayName ~ "JD"'
524 | 
525 |         # Act
526 |         search_mixin.search_user(complex_query)
527 | 
528 |         # Assert API was called with the exact query
529 |         search_mixin.confluence.get.assert_called_once_with(
530 |             "rest/api/search/user", params={"cql": complex_query, "limit": 10}
531 |         )
532 | 
533 |     def test_search_user_result_processing(self, search_mixin):
534 |         """Test that search_user properly processes and returns user search result objects."""
535 |         # Mock response with user data
536 |         search_mixin.confluence.get.return_value = {
537 |             "results": [
538 |                 {
539 |                     "user": {
540 |                         "accountId": "test-account-id",
541 |                         "displayName": "Test User",
542 |                         "email": "[email protected]",
543 |                         "isExternalCollaborator": False,
544 |                     },
545 |                     "title": "Test User",
546 |                     "entityType": "user",
547 |                     "score": 1.5,
548 |                 }
549 |             ],
550 |             "start": 0,
551 |             "limit": 10,
552 |             "totalSize": 1,
553 |         }
554 | 
555 |         # Act
556 |         results = search_mixin.search_user('user.fullname ~ "Test User"')
557 | 
558 |         # Assert result structure
559 |         assert len(results) == 1
560 |         assert hasattr(results[0], "user")
561 |         assert hasattr(results[0], "title")
562 |         assert hasattr(results[0], "entity_type")
563 |         assert results[0].user.account_id == "test-account-id"
564 |         assert results[0].user.display_name == "Test User"
565 |         assert results[0].title == "Test User"
566 |         assert results[0].entity_type == "user"
567 | 
```

--------------------------------------------------------------------------------
/src/mcp_atlassian/utils/oauth.py:
--------------------------------------------------------------------------------

```python
  1 | """OAuth 2.0 utilities for Atlassian Cloud authentication.
  2 | 
  3 | This module provides utilities for OAuth 2.0 (3LO) authentication with Atlassian Cloud.
  4 | It handles:
  5 | - OAuth configuration
  6 | - Token acquisition, storage, and refresh
  7 | - Session configuration for API clients
  8 | """
  9 | 
 10 | import json
 11 | import logging
 12 | import os
 13 | import pprint
 14 | import time
 15 | import urllib.parse
 16 | from dataclasses import dataclass
 17 | from pathlib import Path
 18 | from typing import Any, Optional
 19 | 
 20 | import keyring
 21 | import requests
 22 | 
# Module-level logger; the name is a fixed string rather than __name__
logger = logging.getLogger("mcp-atlassian.oauth")

# Constants
# Endpoint that receives the authorization-code and refresh-token POST requests
TOKEN_URL = "https://auth.atlassian.com/oauth/token"  # noqa: S105 - This is a public API endpoint URL, not a password
# Base of the URL built by OAuthConfig.get_authorization_url
AUTHORIZE_URL = "https://auth.atlassian.com/authorize"
# Queried with the bearer token to discover the cloud ID (first listed resource is used)
CLOUD_ID_URL = "https://api.atlassian.com/oauth/token/accessible-resources"
# A token expiring within this many seconds is already treated as expired
TOKEN_EXPIRY_MARGIN = 300  # 5 minutes in seconds
# Service name under which token JSON is stored in / read from the system keyring
KEYRING_SERVICE_NAME = "mcp-atlassian-oauth"
 32 | 
 33 | 
 34 | @dataclass
 35 | class OAuthConfig:
 36 |     """OAuth 2.0 configuration for Atlassian Cloud.
 37 | 
 38 |     This class manages the OAuth configuration and tokens. It handles:
 39 |     - Authentication configuration (client credentials)
 40 |     - Token acquisition and refreshing
 41 |     - Token storage and retrieval
 42 |     - Cloud ID identification
 43 |     """
 44 | 
 45 |     client_id: str
 46 |     client_secret: str
 47 |     redirect_uri: str
 48 |     scope: str
 49 |     cloud_id: str | None = None
 50 |     refresh_token: str | None = None
 51 |     access_token: str | None = None
 52 |     expires_at: float | None = None
 53 | 
 54 |     @property
 55 |     def is_token_expired(self) -> bool:
 56 |         """Check if the access token is expired or will expire soon.
 57 | 
 58 |         Returns:
 59 |             True if the token is expired or will expire soon, False otherwise.
 60 |         """
 61 |         # If we don't have a token or expiry time, consider it expired
 62 |         if not self.access_token or not self.expires_at:
 63 |             return True
 64 | 
 65 |         # Consider the token expired if it will expire within the margin
 66 |         return time.time() + TOKEN_EXPIRY_MARGIN >= self.expires_at
 67 | 
 68 |     def get_authorization_url(self, state: str) -> str:
 69 |         """Get the authorization URL for the OAuth 2.0 flow.
 70 | 
 71 |         Args:
 72 |             state: Random state string for CSRF protection
 73 | 
 74 |         Returns:
 75 |             The authorization URL to redirect the user to.
 76 |         """
 77 |         params = {
 78 |             "audience": "api.atlassian.com",
 79 |             "client_id": self.client_id,
 80 |             "scope": self.scope,
 81 |             "redirect_uri": self.redirect_uri,
 82 |             "response_type": "code",
 83 |             "prompt": "consent",
 84 |             "state": state,
 85 |         }
 86 |         return f"{AUTHORIZE_URL}?{urllib.parse.urlencode(params)}"
 87 | 
 88 |     def exchange_code_for_tokens(self, code: str) -> bool:
 89 |         """Exchange the authorization code for access and refresh tokens.
 90 | 
 91 |         Args:
 92 |             code: The authorization code from the callback
 93 | 
 94 |         Returns:
 95 |             True if tokens were successfully acquired, False otherwise.
 96 |         """
 97 |         try:
 98 |             payload = {
 99 |                 "grant_type": "authorization_code",
100 |                 "client_id": self.client_id,
101 |                 "client_secret": self.client_secret,
102 |                 "code": code,
103 |                 "redirect_uri": self.redirect_uri,
104 |             }
105 | 
106 |             logger.info(f"Exchanging authorization code for tokens at {TOKEN_URL}")
107 |             logger.debug(f"Token exchange payload: {pprint.pformat(payload)}")
108 | 
109 |             response = requests.post(TOKEN_URL, data=payload)
110 | 
111 |             # Log more details about the response
112 |             logger.debug(f"Token exchange response status: {response.status_code}")
113 |             logger.debug(
114 |                 f"Token exchange response headers: {pprint.pformat(response.headers)}"
115 |             )
116 |             logger.debug(f"Token exchange response body: {response.text[:500]}...")
117 | 
118 |             if not response.ok:
119 |                 logger.error(
120 |                     f"Token exchange failed with status {response.status_code}. Response: {response.text}"
121 |                 )
122 |                 return False
123 | 
124 |             # Parse the response
125 |             token_data = response.json()
126 | 
127 |             # Check if required tokens are present
128 |             if "access_token" not in token_data:
129 |                 logger.error(
130 |                     f"Access token not found in response. Keys found: {list(token_data.keys())}"
131 |                 )
132 |                 return False
133 | 
134 |             if "refresh_token" not in token_data:
135 |                 logger.error(
136 |                     "Refresh token not found in response. Ensure 'offline_access' scope is included. "
137 |                     f"Keys found: {list(token_data.keys())}"
138 |                 )
139 |                 return False
140 | 
141 |             self.access_token = token_data["access_token"]
142 |             self.refresh_token = token_data["refresh_token"]
143 |             self.expires_at = time.time() + token_data["expires_in"]
144 | 
145 |             # Get the cloud ID using the access token
146 |             self._get_cloud_id()
147 | 
148 |             # Save the tokens
149 |             self._save_tokens()
150 | 
151 |             # Log success message with token details
152 |             logger.info(
153 |                 f"✅ OAuth token exchange successful! Access token expires in {token_data['expires_in']}s."
154 |             )
155 |             logger.info(
156 |                 f"Access Token (partial): {self.access_token[:10]}...{self.access_token[-5:] if self.access_token else ''}"
157 |             )
158 |             logger.info(
159 |                 f"Refresh Token (partial): {self.refresh_token[:5]}...{self.refresh_token[-3:] if self.refresh_token else ''}"
160 |             )
161 |             if self.cloud_id:
162 |                 logger.info(f"Cloud ID successfully retrieved: {self.cloud_id}")
163 |             else:
164 |                 logger.warning(
165 |                     "Cloud ID was not retrieved after token exchange. Check accessible resources."
166 |                 )
167 |             return True
168 |         except requests.exceptions.RequestException as e:
169 |             logger.error(f"Network error during token exchange: {e}", exc_info=True)
170 |             return False
171 |         except json.JSONDecodeError as e:
172 |             logger.error(
173 |                 f"Failed to decode JSON response from token endpoint: {e}",
174 |                 exc_info=True,
175 |             )
176 |             logger.error(
177 |                 f"Response text that failed to parse: {response.text if 'response' in locals() else 'Response object not available'}"
178 |             )
179 |             return False
180 |         except Exception as e:
181 |             logger.error(f"Failed to exchange code for tokens: {e}")
182 |             return False
183 | 
184 |     def refresh_access_token(self) -> bool:
185 |         """Refresh the access token using the refresh token.
186 | 
187 |         Returns:
188 |             True if the token was successfully refreshed, False otherwise.
189 |         """
190 |         if not self.refresh_token:
191 |             logger.error("No refresh token available")
192 |             return False
193 | 
194 |         try:
195 |             payload = {
196 |                 "grant_type": "refresh_token",
197 |                 "client_id": self.client_id,
198 |                 "client_secret": self.client_secret,
199 |                 "refresh_token": self.refresh_token,
200 |             }
201 | 
202 |             logger.debug("Refreshing access token...")
203 |             response = requests.post(TOKEN_URL, data=payload)
204 |             response.raise_for_status()
205 | 
206 |             # Parse the response
207 |             token_data = response.json()
208 |             self.access_token = token_data["access_token"]
209 |             # Refresh token might also be rotated
210 |             if "refresh_token" in token_data:
211 |                 self.refresh_token = token_data["refresh_token"]
212 |             self.expires_at = time.time() + token_data["expires_in"]
213 | 
214 |             # Save the tokens
215 |             self._save_tokens()
216 | 
217 |             return True
218 |         except Exception as e:
219 |             logger.error(f"Failed to refresh access token: {e}")
220 |             return False
221 | 
222 |     def ensure_valid_token(self) -> bool:
223 |         """Ensure the access token is valid, refreshing if necessary.
224 | 
225 |         Returns:
226 |             True if the token is valid (or was refreshed successfully), False otherwise.
227 |         """
228 |         if not self.is_token_expired:
229 |             return True
230 |         return self.refresh_access_token()
231 | 
232 |     def _get_cloud_id(self) -> None:
233 |         """Get the cloud ID for the Atlassian instance.
234 | 
235 |         This method queries the accessible resources endpoint to get the cloud ID.
236 |         The cloud ID is needed for API calls with OAuth.
237 |         """
238 |         if not self.access_token:
239 |             logger.debug("No access token available to get cloud ID")
240 |             return
241 | 
242 |         try:
243 |             headers = {"Authorization": f"Bearer {self.access_token}"}
244 |             response = requests.get(CLOUD_ID_URL, headers=headers)
245 |             response.raise_for_status()
246 | 
247 |             resources = response.json()
248 |             if resources and len(resources) > 0:
249 |                 # Use the first cloud site (most users have only one)
250 |                 # For users with multiple sites, they might need to specify which one to use
251 |                 self.cloud_id = resources[0]["id"]
252 |                 logger.debug(f"Found cloud ID: {self.cloud_id}")
253 |             else:
254 |                 logger.warning("No Atlassian sites found in the response")
255 |         except Exception as e:
256 |             logger.error(f"Failed to get cloud ID: {e}")
257 | 
258 |     def _get_keyring_username(self) -> str:
259 |         """Get the keyring username for storing tokens.
260 | 
261 |         The username is based on the client ID to allow multiple OAuth apps.
262 | 
263 |         Returns:
264 |             A username string for keyring
265 |         """
266 |         return f"oauth-{self.client_id}"
267 | 
268 |     def _save_tokens(self) -> None:
269 |         """Save the tokens securely using keyring for later use.
270 | 
271 |         This allows the tokens to be reused between runs without requiring
272 |         the user to go through the authorization flow again.
273 |         """
274 |         try:
275 |             username = self._get_keyring_username()
276 | 
277 |             # Store token data as JSON string in keyring
278 |             token_data = {
279 |                 "refresh_token": self.refresh_token,
280 |                 "access_token": self.access_token,
281 |                 "expires_at": self.expires_at,
282 |                 "cloud_id": self.cloud_id,
283 |             }
284 | 
285 |             # Store the token data in the system keyring
286 |             keyring.set_password(KEYRING_SERVICE_NAME, username, json.dumps(token_data))
287 | 
288 |             logger.debug(f"Saved OAuth tokens to keyring for {username}")
289 | 
290 |             # Also maintain backwards compatibility with file storage
291 |             # for environments where keyring might not work
292 |             self._save_tokens_to_file(token_data)
293 | 
294 |         except Exception as e:
295 |             logger.error(f"Failed to save tokens to keyring: {e}")
296 |             # Fall back to file storage if keyring fails
297 |             self._save_tokens_to_file()
298 | 
299 |     def _save_tokens_to_file(self, token_data: dict = None) -> None:
300 |         """Save the tokens to a file as fallback storage.
301 | 
302 |         Args:
303 |             token_data: Optional dict with token data. If not provided,
304 |                         will use the current object attributes.
305 |         """
306 |         try:
307 |             # Create the directory if it doesn't exist
308 |             token_dir = Path.home() / ".mcp-atlassian"
309 |             token_dir.mkdir(exist_ok=True)
310 | 
311 |             # Save the tokens to a file
312 |             token_path = token_dir / f"oauth-{self.client_id}.json"
313 | 
314 |             if token_data is None:
315 |                 token_data = {
316 |                     "refresh_token": self.refresh_token,
317 |                     "access_token": self.access_token,
318 |                     "expires_at": self.expires_at,
319 |                     "cloud_id": self.cloud_id,
320 |                 }
321 | 
322 |             with open(token_path, "w") as f:
323 |                 json.dump(token_data, f)
324 | 
325 |             logger.debug(f"Saved OAuth tokens to file {token_path} (fallback storage)")
326 |         except Exception as e:
327 |             logger.error(f"Failed to save tokens to file: {e}")
328 | 
329 |     @staticmethod
330 |     def load_tokens(client_id: str) -> dict[str, Any]:
331 |         """Load tokens securely from keyring.
332 | 
333 |         Args:
334 |             client_id: The OAuth client ID
335 | 
336 |         Returns:
337 |             Dict with the token data or empty dict if no tokens found
338 |         """
339 |         username = f"oauth-{client_id}"
340 | 
341 |         # Try to load tokens from keyring first
342 |         try:
343 |             token_json = keyring.get_password(KEYRING_SERVICE_NAME, username)
344 |             if token_json:
345 |                 logger.debug(f"Loaded OAuth tokens from keyring for {username}")
346 |                 return json.loads(token_json)
347 |         except Exception as e:
348 |             logger.warning(
349 |                 f"Failed to load tokens from keyring: {e}. Trying file fallback."
350 |             )
351 | 
352 |         # Fall back to loading from file if keyring fails or returns None
353 |         return OAuthConfig._load_tokens_from_file(client_id)
354 | 
355 |     @staticmethod
356 |     def _load_tokens_from_file(client_id: str) -> dict[str, Any]:
357 |         """Load tokens from a file as fallback.
358 | 
359 |         Args:
360 |             client_id: The OAuth client ID
361 | 
362 |         Returns:
363 |             Dict with the token data or empty dict if no tokens found
364 |         """
365 |         token_path = Path.home() / ".mcp-atlassian" / f"oauth-{client_id}.json"
366 | 
367 |         if not token_path.exists():
368 |             return {}
369 | 
370 |         try:
371 |             with open(token_path) as f:
372 |                 token_data = json.load(f)
373 |                 logger.debug(
374 |                     f"Loaded OAuth tokens from file {token_path} (fallback storage)"
375 |                 )
376 |                 return token_data
377 |         except Exception as e:
378 |             logger.error(f"Failed to load tokens from file: {e}")
379 |             return {}
380 | 
381 |     @classmethod
382 |     def from_env(cls) -> Optional["OAuthConfig"]:
383 |         """Create an OAuth configuration from environment variables.
384 | 
385 |         Returns:
386 |             OAuthConfig instance or None if OAuth is not enabled
387 |         """
388 |         # Check if OAuth is explicitly enabled (allows minimal config)
389 |         oauth_enabled = os.getenv("ATLASSIAN_OAUTH_ENABLE", "").lower() in (
390 |             "true",
391 |             "1",
392 |             "yes",
393 |         )
394 | 
395 |         # Check for required environment variables
396 |         client_id = os.getenv("ATLASSIAN_OAUTH_CLIENT_ID")
397 |         client_secret = os.getenv("ATLASSIAN_OAUTH_CLIENT_SECRET")
398 |         redirect_uri = os.getenv("ATLASSIAN_OAUTH_REDIRECT_URI")
399 |         scope = os.getenv("ATLASSIAN_OAUTH_SCOPE")
400 | 
401 |         # Full OAuth configuration (traditional mode)
402 |         if all([client_id, client_secret, redirect_uri, scope]):
403 |             # Create the OAuth configuration with full credentials
404 |             config = cls(
405 |                 client_id=client_id,
406 |                 client_secret=client_secret,
407 |                 redirect_uri=redirect_uri,
408 |                 scope=scope,
409 |                 cloud_id=os.getenv("ATLASSIAN_OAUTH_CLOUD_ID"),
410 |             )
411 | 
412 |             # Try to load existing tokens
413 |             token_data = cls.load_tokens(client_id)
414 |             if token_data:
415 |                 config.refresh_token = token_data.get("refresh_token")
416 |                 config.access_token = token_data.get("access_token")
417 |                 config.expires_at = token_data.get("expires_at")
418 |                 if not config.cloud_id and "cloud_id" in token_data:
419 |                     config.cloud_id = token_data["cloud_id"]
420 | 
421 |             return config
422 | 
423 |         # Minimal OAuth configuration (user-provided tokens mode)
424 |         elif oauth_enabled:
425 |             # Create minimal config that works with user-provided tokens
426 |             logger.info(
427 |                 "Creating minimal OAuth config for user-provided tokens (ATLASSIAN_OAUTH_ENABLE=true)"
428 |             )
429 |             return cls(
430 |                 client_id="",  # Will be provided by user tokens
431 |                 client_secret="",  # Not needed for user tokens
432 |                 redirect_uri="",  # Not needed for user tokens
433 |                 scope="",  # Will be determined by user token permissions
434 |                 cloud_id=os.getenv("ATLASSIAN_OAUTH_CLOUD_ID"),  # Optional fallback
435 |             )
436 | 
437 |         # No OAuth configuration
438 |         return None
439 | 
440 | 
@dataclass
class BYOAccessTokenOAuthConfig:
    """OAuth settings for a caller-supplied ("bring your own") access token.

    Used when the Atlassian Cloud ID and an access token are handed in
    directly, so the interactive OAuth 2.0 (3LO) flow is skipped entirely.
    Typical for service accounts or CI/CD pipelines that already hold a token.

    Token refreshing is not supported: ``refresh_token`` and ``expires_at``
    exist only as always-None placeholders.
    """

    cloud_id: str
    access_token: str
    refresh_token: None = None
    expires_at: None = None

    @classmethod
    def from_env(cls) -> Optional["BYOAccessTokenOAuthConfig"]:
        """Build the configuration from the process environment.

        Both `ATLASSIAN_OAUTH_CLOUD_ID` and `ATLASSIAN_OAUTH_ACCESS_TOKEN`
        must be set to non-empty values.

        Returns:
            A BYOAccessTokenOAuthConfig, or None when either variable is
            missing or empty.
        """
        env_cloud_id = os.getenv("ATLASSIAN_OAUTH_CLOUD_ID")
        env_access_token = os.getenv("ATLASSIAN_OAUTH_ACCESS_TOKEN")

        if env_cloud_id and env_access_token:
            return cls(cloud_id=env_cloud_id, access_token=env_access_token)
        return None
475 | 
476 | 
def get_oauth_config_from_env() -> OAuthConfig | BYOAccessTokenOAuthConfig | None:
    """Get the appropriate OAuth configuration from environment variables.

    The "Bring Your Own Access Token" configuration
    (BYOAccessTokenOAuthConfig) takes precedence: it is tried first. Only
    when it is unavailable does this function fall back to the standard
    OAuth configuration (OAuthConfig).

    Returns:
        An instance of BYOAccessTokenOAuthConfig or OAuthConfig if environment
        variables are set for either, otherwise None.
    """
    byo_config = BYOAccessTokenOAuthConfig.from_env()
    if byo_config:
        return byo_config
    return OAuthConfig.from_env()
489 | 
490 | 
def configure_oauth_session(
    session: requests.Session, oauth_config: OAuthConfig | BYOAccessTokenOAuthConfig
) -> bool:
    """Attach an OAuth 2.0 bearer token to a requests session.

    A config holding an access token but no refresh token is used as-is.
    Otherwise the token is validated (and refreshed if needed) before the
    ``Authorization`` header is set.

    Args:
        session: The requests session to configure
        oauth_config: The OAuth configuration to use

    Returns:
        True if the session was successfully configured, False otherwise
    """
    access_present = bool(oauth_config.access_token)
    refresh_present = bool(oauth_config.refresh_token)
    logger.debug(
        "configure_oauth_session: Received OAuthConfig with "
        f"access_token_present={access_present}, "
        f"refresh_token_present={refresh_present}, "
        f"cloud_id='{oauth_config.cloud_id}'"
    )

    # Access token without a refresh token: nothing to refresh, use it directly
    if access_present and not refresh_present:
        logger.info(
            "configure_oauth_session: Using provided OAuth access token directly (no refresh_token)."
        )
        session.headers["Authorization"] = f"Bearer {oauth_config.access_token}"
        return True

    logger.debug("configure_oauth_session: Proceeding to ensure_valid_token.")

    # A BYO config that reaches this point has no usable access token and
    # cannot refresh one
    if isinstance(oauth_config, BYOAccessTokenOAuthConfig):
        logger.error(
            "configure_oauth_session: oauth access token configuration provided as empty string."
        )
        return False

    # Standard config: make sure the token is valid, refreshing if required
    if oauth_config.ensure_valid_token():
        session.headers["Authorization"] = f"Bearer {oauth_config.access_token}"
        logger.info("Successfully configured OAuth session for Atlassian Cloud API")
        return True

    logger.error(
        f"configure_oauth_session: ensure_valid_token returned False. "
        f"Token was expired: {oauth_config.is_token_expired}, "
        f"Refresh token present for attempt: {bool(oauth_config.refresh_token)}"
    )
    return False
535 | 
```

--------------------------------------------------------------------------------
/tests/unit/jira/test_fields.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for the Jira Fields mixin."""
  2 | 
  3 | from typing import Any
  4 | from unittest.mock import MagicMock
  5 | 
  6 | import pytest
  7 | 
  8 | from mcp_atlassian.jira import JiraFetcher
  9 | from mcp_atlassian.jira.fields import FieldsMixin
 10 | 
 11 | 
 12 | class TestFieldsMixin:
 13 |     """Tests for the FieldsMixin class."""
 14 | 
 15 |     @pytest.fixture
 16 |     def fields_mixin(self, jira_fetcher: JiraFetcher) -> FieldsMixin:
 17 |         """Create a FieldsMixin instance with mocked dependencies."""
 18 |         mixin = jira_fetcher
 19 |         mixin._field_ids_cache = None
 20 |         return mixin
 21 | 
 22 |     @pytest.fixture
 23 |     def mock_fields(self):
 24 |         """Return mock field data."""
 25 |         return [
 26 |             {"id": "summary", "name": "Summary", "schema": {"type": "string"}},
 27 |             {"id": "description", "name": "Description", "schema": {"type": "string"}},
 28 |             {"id": "status", "name": "Status", "schema": {"type": "status"}},
 29 |             {"id": "assignee", "name": "Assignee", "schema": {"type": "user"}},
 30 |             {
 31 |                 "id": "customfield_10010",
 32 |                 "name": "Epic Link",
 33 |                 "schema": {
 34 |                     "type": "string",
 35 |                     "custom": "com.pyxis.greenhopper.jira:gh-epic-link",
 36 |                 },
 37 |             },
 38 |             {
 39 |                 "id": "customfield_10011",
 40 |                 "name": "Epic Name",
 41 |                 "schema": {
 42 |                     "type": "string",
 43 |                     "custom": "com.pyxis.greenhopper.jira:gh-epic-label",
 44 |                 },
 45 |             },
 46 |             {
 47 |                 "id": "customfield_10012",
 48 |                 "name": "Story Points",
 49 |                 "schema": {"type": "number"},
 50 |             },
 51 |         ]
 52 | 
 53 |     def test_get_field_ids_cache(self, fields_mixin: FieldsMixin, mock_fields):
 54 |         """Test get_fields uses cache when available."""
 55 |         # Set up the cache
 56 |         fields_mixin._field_ids_cache = mock_fields
 57 | 
 58 |         # Call the method
 59 |         result = fields_mixin.get_fields()
 60 | 
 61 |         # Verify cache was used
 62 |         assert result == mock_fields
 63 |         fields_mixin.jira.get_all_fields.assert_not_called()
 64 | 
 65 |     def test_get_fields_refresh(self, fields_mixin: FieldsMixin, mock_fields):
 66 |         """Test get_fields refreshes data when requested."""
 67 |         # Set up the cache
 68 |         fields_mixin._field_ids_cache = [{"id": "old_data", "name": "old data"}]
 69 | 
 70 |         # Mock the API response
 71 |         fields_mixin.jira.get_all_fields.return_value = mock_fields
 72 | 
 73 |         # Call the method with refresh=True
 74 |         result = fields_mixin.get_fields(refresh=True)
 75 | 
 76 |         # Verify API was called
 77 |         fields_mixin.jira.get_all_fields.assert_called_once()
 78 |         assert result == mock_fields
 79 |         # Verify cache was updated
 80 |         assert fields_mixin._field_ids_cache == mock_fields
 81 | 
 82 |     def test_get_fields_from_api(
 83 |         self, fields_mixin: FieldsMixin, mock_fields: list[dict[str, Any]]
 84 |     ):
 85 |         """Test get_fields fetches from API when no cache exists."""
 86 |         # Mock the API response
 87 |         fields_mixin.jira.get_all_fields.return_value = mock_fields
 88 | 
 89 |         # Call the method
 90 |         result = fields_mixin.get_fields()
 91 | 
 92 |         # Verify API was called
 93 |         fields_mixin.jira.get_all_fields.assert_called_once()
 94 |         assert result == mock_fields
 95 |         # Verify cache was created
 96 |         assert fields_mixin._field_ids_cache == mock_fields
 97 | 
 98 |     def test_get_fields_error(self, fields_mixin: FieldsMixin):
 99 |         """Test get_fields handles errors gracefully."""
100 | 
101 |         # Mock API error
102 |         fields_mixin.jira.get_all_fields.side_effect = Exception("API error")
103 | 
104 |         # Call the method
105 |         result = fields_mixin.get_fields()
106 | 
107 |         # Verify empty list is returned on error
108 |         assert result == []
109 | 
110 |     def test_get_field_id_by_exact_match(self, fields_mixin: FieldsMixin, mock_fields):
111 |         """Test get_field_id finds field by exact name match."""
112 |         # Set up the fields
113 |         fields_mixin.get_fields = MagicMock(return_value=mock_fields)
114 | 
115 |         # Call the method
116 |         result = fields_mixin.get_field_id("Summary")
117 | 
118 |         # Verify the result
119 |         assert result == "summary"
120 | 
121 |     def test_get_field_id_case_insensitive(
122 |         self, fields_mixin: FieldsMixin, mock_fields
123 |     ):
124 |         """Test get_field_id is case-insensitive."""
125 |         # Set up the fields
126 |         fields_mixin.get_fields = MagicMock(return_value=mock_fields)
127 | 
128 |         # Call the method with different case
129 |         result = fields_mixin.get_field_id("summary")
130 | 
131 |         # Verify the result
132 |         assert result == "summary"
133 | 
134 |     def test_get_field_id_exact_match_case_insensitive(
135 |         self, fields_mixin: FieldsMixin, mock_fields
136 |     ):
137 |         """Test get_field_id finds field by exact match (case-insensitive) using the map."""
138 |         # Set up the fields
139 |         fields_mixin.get_fields = MagicMock(return_value=mock_fields)
140 |         # Ensure the map is generated based on the mock fields for this test
141 |         fields_mixin._generate_field_map(force_regenerate=True)
142 | 
143 |         # Call the method with exact name (case-insensitive)
144 |         result = fields_mixin.get_field_id("epic link")
145 | 
146 |         # Verify the result (should find Epic Link as first match)
147 |         assert result == "customfield_10010"
148 | 
149 |     def test_get_field_id_not_found(self, fields_mixin: FieldsMixin, mock_fields):
150 |         """Test get_field_id returns None when field not found."""
151 |         # Set up the fields
152 |         fields_mixin.get_fields = MagicMock(return_value=mock_fields)
153 | 
154 |         # Call the method with non-existent field
155 |         result = fields_mixin.get_field_id("NonExistent")
156 | 
157 |         # Verify the result
158 |         assert result is None
159 | 
160 |     def test_get_field_id_error(self, fields_mixin: FieldsMixin):
161 |         """Test get_field_id handles errors gracefully."""
162 |         # Make get_fields raise an exception
163 |         fields_mixin.get_fields = MagicMock(
164 |             side_effect=Exception("Error getting fields")
165 |         )
166 | 
167 |         # Call the method
168 |         result = fields_mixin.get_field_id("Summary")
169 | 
170 |         # Verify None is returned on error
171 |         assert result is None
172 | 
173 |     def test_get_field_by_id(self, fields_mixin: FieldsMixin, mock_fields):
174 |         """Test get_field_by_id retrieves field definition correctly."""
175 |         # Set up the fields
176 |         fields_mixin.get_fields = MagicMock(return_value=mock_fields)
177 | 
178 |         # Call the method
179 |         result = fields_mixin.get_field_by_id("customfield_10012")
180 | 
181 |         # Verify the result
182 |         assert result == mock_fields[6]  # The Story Points field
183 |         assert result["name"] == "Story Points"
184 | 
185 |     def test_get_field_by_id_not_found(self, fields_mixin: FieldsMixin, mock_fields):
186 |         """Test get_field_by_id returns None when field not found."""
187 |         # Set up the fields
188 |         fields_mixin.get_fields = MagicMock(return_value=mock_fields)
189 | 
190 |         # Call the method with non-existent ID
191 |         result = fields_mixin.get_field_by_id("customfield_99999")
192 | 
193 |         # Verify the result
194 |         assert result is None
195 | 
196 |     def test_get_custom_fields(self, fields_mixin: FieldsMixin, mock_fields):
197 |         """Test get_custom_fields returns only custom fields."""
198 |         # Set up the fields
199 |         fields_mixin.get_fields = MagicMock(return_value=mock_fields)
200 | 
201 |         # Call the method
202 |         result = fields_mixin.get_custom_fields()
203 | 
204 |         # Verify the result
205 |         assert len(result) == 3
206 |         assert all(field["id"].startswith("customfield_") for field in result)
207 |         assert result[0]["name"] == "Epic Link"
208 |         assert result[1]["name"] == "Epic Name"
209 |         assert result[2]["name"] == "Story Points"
210 | 
211 |     def test_get_required_fields(self, fields_mixin: FieldsMixin):
212 |         """Test get_required_fields retrieves required fields correctly."""
213 |         # Mock the response for get_project_issue_types
214 |         mock_issue_types = [
215 |             {"id": "10001", "name": "Bug"},
216 |             {"id": "10002", "name": "Task"},
217 |         ]
218 |         fields_mixin.get_project_issue_types = MagicMock(return_value=mock_issue_types)
219 | 
220 |         # Mock the response for issue_createmeta_fieldtypes based on API docs
221 |         mock_field_meta = {
222 |             "fields": [
223 |                 {
224 |                     "required": True,
225 |                     "schema": {"type": "string", "system": "summary"},
226 |                     "name": "Summary",
227 |                     "fieldId": "summary",
228 |                     "autoCompleteUrl": "",
229 |                     "hasDefaultValue": False,
230 |                     "operations": ["set"],
231 |                     "allowedValues": [],
232 |                 },
233 |                 {
234 |                     "required": False,
235 |                     "schema": {"type": "string", "system": "description"},
236 |                     "name": "Description",
237 |                     "fieldId": "description",
238 |                 },
239 |                 {
240 |                     "required": True,
241 |                     "schema": {"type": "string", "custom": "some.custom.type"},
242 |                     "name": "Epic Link",
243 |                     "fieldId": "customfield_10010",
244 |                 },
245 |             ]
246 |         }
247 |         fields_mixin.jira.issue_createmeta_fieldtypes.return_value = mock_field_meta
248 | 
249 |         # Call the method
250 |         result = fields_mixin.get_required_fields("Bug", "TEST")
251 | 
252 |         # Verify the result
253 |         assert len(result) == 2
254 |         assert "summary" in result
255 |         assert result["summary"]["required"] is True
256 |         assert "customfield_10010" in result
257 |         assert result["customfield_10010"]["required"] is True
258 |         assert "description" not in result
259 |         # Verify the correct API was called
260 |         fields_mixin.get_project_issue_types.assert_called_once_with("TEST")
261 |         fields_mixin.jira.issue_createmeta_fieldtypes.assert_called_once_with(
262 |             project="TEST", issue_type_id="10001"
263 |         )
264 | 
265 |     def test_get_required_fields_not_found(self, fields_mixin: FieldsMixin):
266 |         """Test get_required_fields handles project/issue type not found."""
267 |         # Scenario 1: Issue type not found in project
268 |         mock_issue_types = [{"id": "10002", "name": "Task"}]  # "Bug" is missing
269 |         fields_mixin.get_project_issue_types = MagicMock(return_value=mock_issue_types)
270 |         fields_mixin.jira.issue_createmeta_fieldtypes = MagicMock()
271 | 
272 |         # Call the method
273 |         result = fields_mixin.get_required_fields("Bug", "TEST")
274 |         # Verify issue type lookup was attempted, but field meta was not called
275 |         fields_mixin.get_project_issue_types.assert_called_once_with("TEST")
276 |         fields_mixin.jira.issue_createmeta_fieldtypes.assert_not_called()
277 | 
278 |         # Verify the result
279 |         assert result == {}
280 | 
281 |     def test_get_required_fields_error(self, fields_mixin: FieldsMixin):
282 |         """Test get_required_fields handles errors gracefully."""
283 |         # Mock the response for get_project_issue_types
284 |         mock_issue_types = [
285 |             {"id": "10001", "name": "Bug"},
286 |         ]
287 |         fields_mixin.get_project_issue_types = MagicMock(return_value=mock_issue_types)
288 |         # Mock issue_createmeta_fieldtypes to raise an error
289 |         fields_mixin.jira.issue_createmeta_fieldtypes.side_effect = Exception(
290 |             "API error"
291 |         )
292 | 
293 |         # Call the method
294 |         result = fields_mixin.get_required_fields("Bug", "TEST")
295 | 
296 |         # Verify the result
297 |         assert result == {}
298 |         # Verify the correct API was called (which then raised the error)
299 |         fields_mixin.jira.issue_createmeta_fieldtypes.assert_called_once_with(
300 |             project="TEST", issue_type_id="10001"
301 |         )
302 | 
303 |     def test_get_jira_field_ids_cached(self, fields_mixin: FieldsMixin):
304 |         """Test get_field_ids_to_epic returns cached field IDs."""
305 |         # Set up the cache
306 |         fields_mixin._field_ids_cache = [
307 |             {"id": "summary", "name": "Summary"},
308 |             {"id": "description", "name": "Description"},
309 |         ]
310 | 
311 |         # Call the method
312 |         result = fields_mixin.get_field_ids_to_epic()
313 | 
314 |         # Verify the result
315 |         assert result == {
316 |             "Summary": "summary",
317 |             "Description": "description",
318 |         }
319 | 
320 |     def test_get_jira_field_ids_from_fields(
321 |         self, fields_mixin: FieldsMixin, mock_fields: list[dict]
322 |     ):
323 |         """Test get_field_ids_to_epic extracts field IDs from field definitions."""
324 |         # Set up the fields
325 |         fields_mixin.get_fields = MagicMock(return_value=mock_fields)
326 |         # Ensure field map is generated
327 |         fields_mixin._generate_field_map(force_regenerate=True)
328 | 
329 |         # Call the method
330 |         result = fields_mixin.get_field_ids_to_epic()
331 | 
332 |         # Verify that epic-specific fields are properly identified
333 |         assert "epic_link" in result
334 |         assert "Epic Link" in result
335 |         assert result["epic_link"] == "customfield_10010"
336 |         assert "epic_name" in result
337 |         assert "Epic Name" in result
338 |         assert result["epic_name"] == "customfield_10011"
339 | 
340 |     def test_get_jira_field_ids_error(self, fields_mixin: FieldsMixin):
341 |         """Test get_field_ids_to_epic handles errors gracefully."""
342 |         # Ensure no cache exists
343 |         fields_mixin._field_ids_cache = None
344 | 
345 |         # Make get_fields raise an exception
346 |         fields_mixin.get_fields = MagicMock(
347 |             side_effect=Exception("Error getting fields")
348 |         )
349 | 
350 |         # Call the method
351 |         result = fields_mixin.get_field_ids_to_epic()
352 | 
353 |         # Verify the result
354 |         assert result == {}
355 | 
356 |     def test_is_custom_field(self, fields_mixin: FieldsMixin):
357 |         """Test is_custom_field correctly identifies custom fields."""
358 |         # Test with custom field
359 |         assert fields_mixin.is_custom_field("customfield_10010") is True
360 | 
361 |         # Test with standard field
362 |         assert fields_mixin.is_custom_field("summary") is False
363 | 
364 |     def test_format_field_value_user_field(
365 |         self, fields_mixin: FieldsMixin, mock_fields
366 |     ):
367 |         """Test format_field_value formats user fields correctly."""
368 |         # Set up the mocks
369 |         fields_mixin.get_field_by_id = MagicMock(
370 |             return_value=mock_fields[3]
371 |         )  # The Assignee field
372 |         fields_mixin._get_account_id = MagicMock(return_value="account123")
373 | 
374 |         # Call the method with a user field and string value
375 |         result = fields_mixin.format_field_value("assignee", "johndoe")
376 | 
377 |         # Verify the result
378 |         assert result == {"accountId": "account123"}
379 |         fields_mixin._get_account_id.assert_called_once_with("johndoe")
380 | 
381 |     # FIXME: The test below covers an impossible case.
382 |     #
383 |     # This test fails because it assumes that the `_get_account_id`
384 |     # method is unavailable. By default, `format_field_value` will return
385 |     # `{"name": value}` for server/DC.
386 |     #
387 |     # However, `JiraFetcher` always inherits from `UsersMixin` and will
388 |     # therefore always have the `_get_account_id` method available.
389 |     #
390 |     # That is to say, the `format_field_value` method will never return the
391 |     # `{"name": value}` format.
392 |     #
393 |     # Further fixes are needed in the `FieldsMixin` class to support the case
394 |     # for server/DC.
395 |     #
396 |     # See also:
397 |     # https://github.com/sooperset/mcp-atlassian/blob/651c271e8aa76b469e9c67535669d93267ad5da6/src/mcp_atlassian/jira/fields.py#L279-L297
398 | 
399 |     # def test_format_field_value_user_field_no_account_id(
400 |     #     self, fields_mixin: FieldsMixin, mock_fields
401 |     # ):
402 |     #     """Test format_field_value handles user fields without _get_account_id."""
403 |     #     # Set up the mocks
404 |     #     fields_mixin.get_field_by_id = MagicMock(
405 |     #         return_value=mock_fields[3]
406 |     #     )  # The Assignee field
407 | 
408 |     #     # Call the method with a user field and string value
409 |     #     result = fields_mixin.format_field_value("assignee", "johndoe")
410 | 
411 |     #     # Verify the result - should use name for server/DC
412 |     #     assert result == {"name": "johndoe"}
413 | 
414 |     def test_format_field_value_array_field(self, fields_mixin: FieldsMixin):
415 |         """Test format_field_value formats array fields correctly."""
416 |         # Set up the mocks
417 |         mock_array_field = {
418 |             "id": "labels",
419 |             "name": "Labels",
420 |             "schema": {"type": "array"},
421 |         }
422 |         fields_mixin.get_field_by_id = MagicMock(return_value=mock_array_field)
423 | 
424 |         # Test with single value (should convert to list)
425 |         result = fields_mixin.format_field_value("labels", "bug")
426 |         assert result == ["bug"]
427 | 
428 |         # Test with list value (should keep as list)
429 |         result = fields_mixin.format_field_value("labels", ["bug", "feature"])
430 |         assert result == ["bug", "feature"]
431 | 
432 |     def test_format_field_value_option_field(self, fields_mixin: FieldsMixin):
433 |         """Test format_field_value formats option fields correctly."""
434 |         # Set up the mocks
435 |         mock_option_field = {
436 |             "id": "priority",
437 |             "name": "Priority",
438 |             "schema": {"type": "option"},
439 |         }
440 |         fields_mixin.get_field_by_id = MagicMock(return_value=mock_option_field)
441 | 
442 |         # Test with string value
443 |         result = fields_mixin.format_field_value("priority", "High")
444 |         assert result == {"value": "High"}
445 | 
446 |         # Test with already formatted value
447 |         already_formatted = {"value": "Medium"}
448 |         result = fields_mixin.format_field_value("priority", already_formatted)
449 |         assert result == already_formatted
450 | 
451 |     def test_format_field_value_unknown_field(self, fields_mixin: FieldsMixin):
452 |         """Test format_field_value returns value as-is for unknown fields."""
453 |         # Set up the mocks
454 |         fields_mixin.get_field_by_id = MagicMock(return_value=None)
455 | 
456 |         # Call the method with unknown field
457 |         test_value = "test value"
458 |         result = fields_mixin.format_field_value("unknown", test_value)
459 | 
460 |         # Verify the value is returned as-is
461 |         assert result == test_value
462 | 
463 |     def test_search_fields_empty_keyword(self, fields_mixin: FieldsMixin, mock_fields):
464 |         """Test search_fields returns first N fields when keyword is empty."""
465 |         # Set up the fields
466 |         fields_mixin.get_fields = MagicMock(return_value=mock_fields)
467 | 
468 |         # Call with empty keyword and limit=3
469 |         result = fields_mixin.search_fields("", limit=3)
470 | 
471 |         # Verify first 3 fields are returned
472 |         assert len(result) == 3
473 |         assert result == mock_fields[:3]
474 | 
475 |     def test_search_fields_exact_match(self, fields_mixin: FieldsMixin, mock_fields):
476 |         """Test search_fields finds exact matches with high relevance."""
477 |         # Set up the fields
478 |         fields_mixin.get_fields = MagicMock(return_value=mock_fields)
479 | 
480 |         # Search for "Story Points"
481 |         result = fields_mixin.search_fields("Story Points")
482 | 
483 |         # Verify Story Points field is first result
484 |         assert len(result) > 0
485 |         assert result[0]["name"] == "Story Points"
486 |         assert result[0]["id"] == "customfield_10012"
487 | 
488 |     def test_search_fields_partial_match(self, fields_mixin: FieldsMixin, mock_fields):
489 |         """Test search_fields finds partial matches."""
490 |         # Set up the fields
491 |         fields_mixin.get_fields = MagicMock(return_value=mock_fields)
492 | 
493 |         # Search for "Epic"
494 |         result = fields_mixin.search_fields("Epic")
495 | 
496 |         # Verify Epic-related fields are in results
497 |         epic_fields = [field["name"] for field in result[:2]]  # Top 2 results
498 |         assert "Epic Link" in epic_fields
499 |         assert "Epic Name" in epic_fields
500 | 
501 |     def test_search_fields_case_insensitive(
502 |         self, fields_mixin: FieldsMixin, mock_fields
503 |     ):
504 |         """Test search_fields is case insensitive."""
505 |         # Set up the fields
506 |         fields_mixin.get_fields = MagicMock(return_value=mock_fields)
507 | 
508 |         # Search with different cases
509 |         result_lower = fields_mixin.search_fields("story points")
510 |         result_upper = fields_mixin.search_fields("STORY POINTS")
511 |         result_mixed = fields_mixin.search_fields("Story Points")
512 | 
513 |         # Verify all searches find the same field
514 |         assert len(result_lower) > 0
515 |         assert len(result_upper) > 0
516 |         assert len(result_mixed) > 0
517 |         assert result_lower[0]["id"] == result_upper[0]["id"] == result_mixed[0]["id"]
518 |         assert result_lower[0]["name"] == "Story Points"
519 | 
520 |     def test_search_fields_with_limit(self, fields_mixin: FieldsMixin, mock_fields):
521 |         """Test search_fields respects the limit parameter."""
522 |         # Set up the fields
523 |         fields_mixin.get_fields = MagicMock(return_value=mock_fields)
524 | 
525 |         # Search with limit=2
526 |         result = fields_mixin.search_fields("field", limit=2)
527 | 
528 |         # Verify only 2 results are returned
529 |         assert len(result) == 2
530 | 
531 |     def test_search_fields_error(self, fields_mixin: FieldsMixin):
532 |         """Test search_fields handles errors gracefully."""
533 |         # Make get_fields raise an exception
534 |         fields_mixin.get_fields = MagicMock(
535 |             side_effect=Exception("Error getting fields")
536 |         )
537 | 
538 |         # Call the method
539 |         result = fields_mixin.search_fields("test")
540 | 
541 |         # Verify empty list is returned on error
542 |         assert result == []
543 | 
```

--------------------------------------------------------------------------------
/tests/unit/confluence/test_users.py:
--------------------------------------------------------------------------------

```python
  1 | """Unit tests for the Confluence users module."""
  2 | 
  3 | from unittest.mock import MagicMock, patch
  4 | 
  5 | import pytest
  6 | from requests.exceptions import HTTPError
  7 | 
  8 | from mcp_atlassian.confluence.users import UsersMixin
  9 | from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
 10 | 
 11 | 
 12 | class TestUsersMixin:
 13 |     """Tests for the UsersMixin class."""
 14 | 
 15 |     @pytest.fixture
 16 |     def users_mixin(self, confluence_client):
 17 |         """Create a UsersMixin instance for testing."""
 18 |         # UsersMixin inherits from ConfluenceClient, so we need to create it properly
 19 |         with patch(
 20 |             "mcp_atlassian.confluence.users.ConfluenceClient.__init__"
 21 |         ) as mock_init:
 22 |             mock_init.return_value = None
 23 |             mixin = UsersMixin()
 24 |             # Copy the necessary attributes from our mocked client
 25 |             mixin.confluence = confluence_client.confluence
 26 |             mixin.config = confluence_client.config
 27 |             return mixin
 28 | 
 29 |     # Mock user data for different scenarios
 30 |     @pytest.fixture
 31 |     def mock_user_data_cloud(self):
 32 |         """Mock user data for Confluence Cloud."""
 33 |         return {
 34 |             "accountId": "5b10ac8d82e05b22cc7d4ef5",
 35 |             "accountType": "atlassian",
 36 |             "email": "test.user@example.com",
 37 |             "publicName": "Test User",
 38 |             "displayName": "Test User",
 39 |             "profilePicture": {
 40 |                 "path": "/wiki/aa-avatar/5b10ac8d82e05b22cc7d4ef5",
 41 |                 "width": 48,
 42 |                 "height": 48,
 43 |                 "isDefault": False,
 44 |             },
 45 |             "isExternalCollaborator": False,
 46 |             "accountStatus": "active",
 47 |         }
 48 | 
 49 |     @pytest.fixture
 50 |     def mock_user_data_server(self):
 51 |         """Mock user data for Confluence Server/DC."""
 52 |         return {
 53 |             "username": "testuser",
 54 |             "userKey": "testuser-key-12345",
 55 |             "displayName": "Test User",
 56 |             "fullName": "Test User Full Name",
 57 |             "email": "test.user@example.com",
 58 |             "status": "active",
 59 |         }
 60 | 
 61 |     @pytest.fixture
 62 |     def mock_user_data_with_status(self):
 63 |         """Mock user data with status expansion."""
 64 |         return {
 65 |             "accountId": "5b10ac8d82e05b22cc7d4ef5",
 66 |             "accountType": "atlassian",
 67 |             "email": "test.user@example.com",
 68 |             "publicName": "Test User",
 69 |             "displayName": "Test User",
 70 |             "accountStatus": "active",
 71 |             "status": "Active",  # Expanded status field
 72 |         }
 73 | 
 74 |     @pytest.fixture
 75 |     def mock_current_user_data(self):
 76 |         """Mock current user data for get_current_user_info."""
 77 |         return {
 78 |             "accountId": "5b10ac8d82e05b22cc7d4ef5",
 79 |             "type": "known",
 80 |             "accountType": "atlassian",
 81 |             "email": "current.user@example.com",
 82 |             "publicName": "Current User",
 83 |             "displayName": "Current User",
 84 |             "profilePicture": {
 85 |                 "path": "/wiki/aa-avatar/5b10ac8d82e05b22cc7d4ef5",
 86 |                 "width": 48,
 87 |                 "height": 48,
 88 |                 "isDefault": False,
 89 |             },
 90 |             "isExternalCollaborator": False,
 91 |             "isGuest": False,
 92 |             "locale": "en_US",
 93 |             "accountStatus": "active",
 94 |         }
 95 | 
 96 |     def test_get_user_details_by_accountid_success(
 97 |         self, users_mixin, mock_user_data_cloud
 98 |     ):
 99 |         """Test successfully getting user details by account ID."""
100 |         # Arrange
101 |         account_id = "5b10ac8d82e05b22cc7d4ef5"
102 |         users_mixin.confluence.get_user_details_by_accountid.return_value = (
103 |             mock_user_data_cloud
104 |         )
105 | 
106 |         # Act
107 |         result = users_mixin.get_user_details_by_accountid(account_id)
108 | 
109 |         # Assert
110 |         users_mixin.confluence.get_user_details_by_accountid.assert_called_once_with(
111 |             account_id, None
112 |         )
113 |         assert result == mock_user_data_cloud
114 |         assert result["accountId"] == account_id
115 |         assert result["displayName"] == "Test User"
116 | 
117 |     def test_get_user_details_by_accountid_with_expand(
118 |         self, users_mixin, mock_user_data_with_status
119 |     ):
120 |         """Test getting user details by account ID with status expansion."""
121 |         # Arrange
122 |         account_id = "5b10ac8d82e05b22cc7d4ef5"
123 |         expand = "status"
124 |         users_mixin.confluence.get_user_details_by_accountid.return_value = (
125 |             mock_user_data_with_status
126 |         )
127 | 
128 |         # Act
129 |         result = users_mixin.get_user_details_by_accountid(account_id, expand=expand)
130 | 
131 |         # Assert
132 |         users_mixin.confluence.get_user_details_by_accountid.assert_called_once_with(
133 |             account_id, expand
134 |         )
135 |         assert result == mock_user_data_with_status
136 |         assert result["status"] == "Active"
137 |         assert result["accountStatus"] == "active"
138 | 
139 |     def test_get_user_details_by_accountid_invalid_account_id(self, users_mixin):
140 |         """Test getting user details with invalid account ID."""
141 |         # Arrange
142 |         invalid_account_id = "invalid-account-id"
143 |         users_mixin.confluence.get_user_details_by_accountid.side_effect = Exception(
144 |             "User not found"
145 |         )
146 | 
147 |         # Act/Assert
148 |         with pytest.raises(Exception, match="User not found"):
149 |             users_mixin.get_user_details_by_accountid(invalid_account_id)
150 | 
151 |     def test_get_user_details_by_username_success(
152 |         self, users_mixin, mock_user_data_server
153 |     ):
154 |         """Test successfully getting user details by username."""
155 |         # Arrange
156 |         username = "testuser"
157 |         users_mixin.confluence.get_user_details_by_username.return_value = (
158 |             mock_user_data_server
159 |         )
160 | 
161 |         # Act
162 |         result = users_mixin.get_user_details_by_username(username)
163 | 
164 |         # Assert
165 |         users_mixin.confluence.get_user_details_by_username.assert_called_once_with(
166 |             username, None
167 |         )
168 |         assert result == mock_user_data_server
169 |         assert result["username"] == username
170 |         assert result["displayName"] == "Test User"
171 | 
172 |     def test_get_user_details_by_username_with_expand(
173 |         self, users_mixin, mock_user_data_server
174 |     ):
175 |         """Test getting user details by username with status expansion."""
176 |         # Arrange
177 |         username = "testuser"
178 |         expand = "status"
179 |         mock_data_with_status = mock_user_data_server.copy()
180 |         mock_data_with_status["status"] = "Active"
181 |         users_mixin.confluence.get_user_details_by_username.return_value = (
182 |             mock_data_with_status
183 |         )
184 | 
185 |         # Act
186 |         result = users_mixin.get_user_details_by_username(username, expand=expand)
187 | 
188 |         # Assert
189 |         users_mixin.confluence.get_user_details_by_username.assert_called_once_with(
190 |             username, expand
191 |         )
192 |         assert result == mock_data_with_status
193 |         assert result["status"] == "Active"
194 | 
195 |     def test_get_user_details_by_username_invalid_username(self, users_mixin):
196 |         """Test getting user details with invalid username."""
197 |         # Arrange
198 |         invalid_username = "nonexistent-user"
199 |         users_mixin.confluence.get_user_details_by_username.side_effect = Exception(
200 |             "User not found"
201 |         )
202 | 
203 |         # Act/Assert
204 |         with pytest.raises(Exception, match="User not found"):
205 |             users_mixin.get_user_details_by_username(invalid_username)
206 | 
207 |     def test_get_user_details_by_username_server_dc_pattern(
208 |         self, users_mixin, mock_user_data_server
209 |     ):
210 |         """Test that username lookup follows Server/DC patterns."""
211 |         # Arrange
212 |         username = "test.user@example.com"  # Email-like username common in DC
213 |         users_mixin.confluence.get_user_details_by_username.return_value = (
214 |             mock_user_data_server
215 |         )
216 | 
217 |         # Act
218 |         result = users_mixin.get_user_details_by_username(username)
219 | 
220 |         # Assert
221 |         users_mixin.confluence.get_user_details_by_username.assert_called_once_with(
222 |             username, None
223 |         )
224 |         assert result == mock_user_data_server
225 | 
226 |     def test_get_current_user_info_success(self, users_mixin, mock_current_user_data):
227 |         """Test successfully getting current user info."""
228 |         # Arrange
229 |         users_mixin.confluence.get.return_value = mock_current_user_data
230 | 
231 |         # Act
232 |         result = users_mixin.get_current_user_info()
233 | 
234 |         # Assert
235 |         users_mixin.confluence.get.assert_called_once_with("rest/api/user/current")
236 |         assert result == mock_current_user_data
237 |         assert result["accountId"] == "5b10ac8d82e05b22cc7d4ef5"
238 |         assert result["displayName"] == "Current User"
239 | 
240 |     def test_get_current_user_info_returns_non_dict(self, users_mixin):
241 |         """Test get_current_user_info when API returns non-dict data."""
242 |         # Arrange
243 |         users_mixin.confluence.get.return_value = "Invalid response"
244 | 
245 |         # Act/Assert
246 |         with pytest.raises(
247 |             MCPAtlassianAuthenticationError,
248 |             match="Confluence token validation failed: Did not receive valid JSON user data",
249 |         ):
250 |             users_mixin.get_current_user_info()
251 | 
252 |         users_mixin.confluence.get.assert_called_once_with("rest/api/user/current")
253 | 
254 |     def test_get_current_user_info_returns_none(self, users_mixin):
255 |         """Test get_current_user_info when API returns None."""
256 |         # Arrange
257 |         users_mixin.confluence.get.return_value = None
258 | 
259 |         # Act/Assert
260 |         with pytest.raises(
261 |             MCPAtlassianAuthenticationError,
262 |             match="Confluence token validation failed: Did not receive valid JSON user data",
263 |         ):
264 |             users_mixin.get_current_user_info()
265 | 
266 |     def test_get_current_user_info_http_error_401(self, users_mixin):
267 |         """Test get_current_user_info with 401 authentication error."""
268 |         # Arrange
269 |         mock_response = MagicMock()
270 |         mock_response.status_code = 401
271 |         http_error = HTTPError(response=mock_response)
272 |         users_mixin.confluence.get.side_effect = http_error
273 | 
274 |         # Act/Assert
275 |         with pytest.raises(
276 |             MCPAtlassianAuthenticationError,
277 |             match="Confluence token validation failed: 401 from /rest/api/user/current",
278 |         ):
279 |             users_mixin.get_current_user_info()
280 | 
281 |     def test_get_current_user_info_http_error_403(self, users_mixin):
282 |         """Test get_current_user_info with 403 forbidden error."""
283 |         # Arrange
284 |         mock_response = MagicMock()
285 |         mock_response.status_code = 403
286 |         http_error = HTTPError(response=mock_response)
287 |         users_mixin.confluence.get.side_effect = http_error
288 | 
289 |         # Act/Assert
290 |         with pytest.raises(
291 |             MCPAtlassianAuthenticationError,
292 |             match="Confluence token validation failed: 403 from /rest/api/user/current",
293 |         ):
294 |             users_mixin.get_current_user_info()
295 | 
296 |     def test_get_current_user_info_http_error_other(self, users_mixin):
297 |         """Test get_current_user_info with other HTTP error codes."""
298 |         # Arrange
299 |         mock_response = MagicMock()
300 |         mock_response.status_code = 500
301 |         http_error = HTTPError(response=mock_response)
302 |         users_mixin.confluence.get.side_effect = http_error
303 | 
304 |         # Act/Assert
305 |         with pytest.raises(
306 |             MCPAtlassianAuthenticationError,
307 |             match="Confluence token validation failed with HTTPError",
308 |         ):
309 |             users_mixin.get_current_user_info()
310 | 
311 |     def test_get_current_user_info_http_error_no_response(self, users_mixin):
312 |         """Test get_current_user_info with HTTPError but no response object."""
313 |         # Arrange
314 |         http_error = HTTPError()
315 |         http_error.response = None
316 |         users_mixin.confluence.get.side_effect = http_error
317 | 
318 |         # Act/Assert
319 |         with pytest.raises(
320 |             MCPAtlassianAuthenticationError,
321 |             match="Confluence token validation failed with HTTPError",
322 |         ):
323 |             users_mixin.get_current_user_info()
324 | 
325 |     def test_get_current_user_info_generic_exception(self, users_mixin):
326 |         """Test get_current_user_info with generic exception."""
327 |         # Arrange
328 |         users_mixin.confluence.get.side_effect = ConnectionError("Network error")
329 | 
330 |         # Act/Assert
331 |         with pytest.raises(
332 |             MCPAtlassianAuthenticationError,
333 |             match="Confluence token validation failed: Network error",
334 |         ):
335 |             users_mixin.get_current_user_info()
336 | 
337 |     @pytest.mark.parametrize(
338 |         "expand_param",
339 |         [
340 |             None,
341 |             "status",
342 |             "",  # Empty string
343 |         ],
344 |     )
345 |     def test_get_user_details_by_accountid_expand_parameter_handling(
346 |         self, users_mixin, mock_user_data_cloud, expand_param
347 |     ):
348 |         """Test that expand parameter is properly handled for account ID lookup."""
349 |         # Arrange
350 |         account_id = "5b10ac8d82e05b22cc7d4ef5"
351 |         expected_data = mock_user_data_cloud.copy()
352 |         if expand_param == "status":
353 |             expected_data["status"] = "Active"
354 | 
355 |         users_mixin.confluence.get_user_details_by_accountid.return_value = (
356 |             expected_data
357 |         )
358 | 
359 |         # Act
360 |         result = users_mixin.get_user_details_by_accountid(account_id, expand_param)
361 | 
362 |         # Assert
363 |         users_mixin.confluence.get_user_details_by_accountid.assert_called_once_with(
364 |             account_id, expand_param
365 |         )
366 |         assert result == expected_data
367 | 
368 |     @pytest.mark.parametrize(
369 |         "expand_param",
370 |         [
371 |             None,
372 |             "status",
373 |             "",  # Empty string
374 |         ],
375 |     )
376 |     def test_get_user_details_by_username_expand_parameter_handling(
377 |         self, users_mixin, mock_user_data_server, expand_param
378 |     ):
379 |         """Test that expand parameter is properly handled for username lookup."""
380 |         # Arrange
381 |         username = "testuser"
382 |         expected_data = mock_user_data_server.copy()
383 |         if expand_param == "status":
384 |             expected_data["status"] = "Active"
385 | 
386 |         users_mixin.confluence.get_user_details_by_username.return_value = expected_data
387 | 
388 |         # Act
389 |         result = users_mixin.get_user_details_by_username(username, expand_param)
390 | 
391 |         # Assert
392 |         users_mixin.confluence.get_user_details_by_username.assert_called_once_with(
393 |             username, expand_param
394 |         )
395 |         assert result == expected_data
396 | 
397 |     def test_users_mixin_inheritance(self, users_mixin):
398 |         """Test that UsersMixin properly inherits from ConfluenceClient."""
399 |         # Verify that UsersMixin is indeed a ConfluenceClient
400 |         from mcp_atlassian.confluence.client import ConfluenceClient
401 | 
402 |         assert isinstance(users_mixin, ConfluenceClient)
403 | 
404 |         # Verify it has the expected attributes from ConfluenceClient
405 |         assert hasattr(users_mixin, "confluence")
406 |         assert hasattr(users_mixin, "config")
407 | 
408 |     def test_users_mixin_has_required_methods(self):
409 |         """Test that UsersMixin has all required methods."""
410 |         # Verify the mixin has the expected methods
411 |         assert hasattr(UsersMixin, "get_user_details_by_accountid")
412 |         assert hasattr(UsersMixin, "get_user_details_by_username")
413 |         assert hasattr(UsersMixin, "get_current_user_info")
414 | 
415 |         # Verify method signatures
416 |         import inspect
417 | 
418 |         # Check get_user_details_by_accountid signature
419 |         sig = inspect.signature(UsersMixin.get_user_details_by_accountid)
420 |         params = list(sig.parameters.keys())
421 |         assert "self" in params
422 |         assert "account_id" in params
423 |         assert "expand" in params
424 |         assert sig.parameters["expand"].default is None
425 | 
426 |         # Check get_user_details_by_username signature
427 |         sig = inspect.signature(UsersMixin.get_user_details_by_username)
428 |         params = list(sig.parameters.keys())
429 |         assert "self" in params
430 |         assert "username" in params
431 |         assert "expand" in params
432 |         assert sig.parameters["expand"].default is None
433 | 
434 |         # Check get_current_user_info signature
435 |         sig = inspect.signature(UsersMixin.get_current_user_info)
436 |         params = list(sig.parameters.keys())
437 |         assert "self" in params
438 |         assert len(params) == 1  # Only self parameter
439 | 
440 |     def test_user_permission_scenarios(self, users_mixin):
441 |         """Test various permission error scenarios."""
442 |         # Test 401 Unauthorized
443 |         mock_response_401 = MagicMock()
444 |         mock_response_401.status_code = 401
445 |         http_error_401 = HTTPError(response=mock_response_401)
446 |         users_mixin.confluence.get_user_details_by_accountid.side_effect = (
447 |             http_error_401
448 |         )
449 | 
450 |         with pytest.raises(Exception):  # Should propagate the original exception
451 |             users_mixin.get_user_details_by_accountid("test-account-id")
452 | 
453 |         # Test 403 Forbidden
454 |         mock_response_403 = MagicMock()
455 |         mock_response_403.status_code = 403
456 |         http_error_403 = HTTPError(response=mock_response_403)
457 |         users_mixin.confluence.get_user_details_by_username.side_effect = http_error_403
458 | 
459 |         with pytest.raises(Exception):  # Should propagate the original exception
460 |             users_mixin.get_user_details_by_username("testuser")
461 | 
462 |     def test_cloud_vs_server_authentication_patterns(self, users_mixin):
463 |         """Test that different authentication patterns work for Cloud vs Server/DC."""
464 |         # Mock Cloud response (account ID based)
465 |         cloud_user_data = {
466 |             "accountId": "5b10ac8d82e05b22cc7d4ef5",
467 |             "accountType": "atlassian",
468 |             "displayName": "Cloud User",
469 |             "accountStatus": "active",
470 |         }
471 | 
472 |         # Mock Server/DC response (username based)
473 |         server_user_data = {
474 |             "username": "serveruser",
475 |             "userKey": "serveruser-key-12345",
476 |             "displayName": "Server User",
477 |             "status": "active",
478 |         }
479 | 
480 |         # Test Cloud pattern
481 |         users_mixin.confluence.get_user_details_by_accountid.return_value = (
482 |             cloud_user_data
483 |         )
484 |         cloud_result = users_mixin.get_user_details_by_accountid(
485 |             "5b10ac8d82e05b22cc7d4ef5"
486 |         )
487 |         assert cloud_result["accountId"] == "5b10ac8d82e05b22cc7d4ef5"
488 |         assert "accountType" in cloud_result
489 | 
490 |         # Test Server/DC pattern
491 |         users_mixin.confluence.get_user_details_by_username.return_value = (
492 |             server_user_data
493 |         )
494 |         server_result = users_mixin.get_user_details_by_username("serveruser")
495 |         assert server_result["username"] == "serveruser"
496 |         assert "userKey" in server_result
497 | 
498 |     def test_response_data_validation_and_transformation(
499 |         self, users_mixin, mock_user_data_cloud
500 |     ):
501 |         """Test that response data is properly validated and returned as-is."""
502 |         # Arrange
503 |         account_id = "5b10ac8d82e05b22cc7d4ef5"
504 |         users_mixin.confluence.get_user_details_by_accountid.return_value = (
505 |             mock_user_data_cloud
506 |         )
507 | 
508 |         # Act
509 |         result = users_mixin.get_user_details_by_accountid(account_id)
510 | 
511 |         # Assert - should return the data exactly as received from the API
512 |         assert result is mock_user_data_cloud  # Same object reference
513 |         assert isinstance(result, dict)
514 |         assert all(
515 |             key in result
516 |             for key in ["accountId", "displayName", "email", "accountStatus"]
517 |         )
518 | 
519 |     def test_deactivated_user_status_handling(self, users_mixin):
520 |         """Test handling of deactivated users with status expansion."""
521 |         # Arrange
522 |         deactivated_user_data = {
523 |             "accountId": "5b10ac8d82e05b22cc7d4ef5",
524 |             "displayName": "Deactivated User",
525 |             "accountStatus": "inactive",
526 |             "status": "Deactivated",  # Expanded status
527 |         }
528 |         users_mixin.confluence.get_user_details_by_accountid.return_value = (
529 |             deactivated_user_data
530 |         )
531 | 
532 |         # Act
533 |         result = users_mixin.get_user_details_by_accountid(
534 |             "5b10ac8d82e05b22cc7d4ef5", expand="status"
535 |         )
536 | 
537 |         # Assert
538 |         assert result["accountStatus"] == "inactive"
539 |         assert result["status"] == "Deactivated"
540 |         users_mixin.confluence.get_user_details_by_accountid.assert_called_once_with(
541 |             "5b10ac8d82e05b22cc7d4ef5", "status"
542 |         )
543 | 
544 |     def test_method_delegation_to_confluence_client(
545 |         self, users_mixin, mock_current_user_data
546 |     ):
547 |         """Test that methods properly delegate to the underlying confluence client."""
548 |         # Test that the methods are thin wrappers around confluence client methods
549 |         account_id = "test-account-id"
550 |         username = "testuser"
551 |         expand = "status"
552 | 
553 |         # Test account ID method delegation
554 |         users_mixin.get_user_details_by_accountid(account_id, expand)
555 |         users_mixin.confluence.get_user_details_by_accountid.assert_called_with(
556 |             account_id, expand
557 |         )
558 | 
559 |         # Test username method delegation
560 |         users_mixin.get_user_details_by_username(username, expand)
561 |         users_mixin.confluence.get_user_details_by_username.assert_called_with(
562 |             username, expand
563 |         )
564 | 
565 |         # Test current user method delegation - need to mock the return value
566 |         users_mixin.confluence.get.return_value = mock_current_user_data
567 |         users_mixin.get_current_user_info()
568 |         users_mixin.confluence.get.assert_called_with("rest/api/user/current")
569 | 
```

--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/pages.py:
--------------------------------------------------------------------------------

```python
  1 | """Module for Confluence page operations."""
  2 | 
  3 | import logging
  4 | 
  5 | import requests
  6 | from requests.exceptions import HTTPError
  7 | 
  8 | from ..exceptions import MCPAtlassianAuthenticationError
  9 | from ..models.confluence import ConfluencePage
 10 | from .client import ConfluenceClient
 11 | from .v2_adapter import ConfluenceV2Adapter
 12 | 
 13 | logger = logging.getLogger("mcp-atlassian")
 14 | 
 15 | 
 16 | class PagesMixin(ConfluenceClient):
 17 |     """Mixin for Confluence page operations."""
 18 | 
 19 |     @property
 20 |     def _v2_adapter(self) -> ConfluenceV2Adapter | None:
 21 |         """Get v2 API adapter for OAuth authentication.
 22 | 
 23 |         Returns:
 24 |             ConfluenceV2Adapter instance if OAuth is configured, None otherwise
 25 |         """
 26 |         if self.config.auth_type == "oauth" and self.config.is_cloud:
 27 |             return ConfluenceV2Adapter(
 28 |                 session=self.confluence._session, base_url=self.confluence.url
 29 |             )
 30 |         return None
 31 | 
 32 |     def get_page_content(
 33 |         self, page_id: str, *, convert_to_markdown: bool = True
 34 |     ) -> ConfluencePage:
 35 |         """
 36 |         Get content of a specific page.
 37 | 
 38 |         Args:
 39 |             page_id: The ID of the page to retrieve
 40 |             convert_to_markdown: When True, returns content in markdown format,
 41 |                                otherwise returns raw HTML (keyword-only)
 42 | 
 43 |         Returns:
 44 |             ConfluencePage model containing the page content and metadata
 45 | 
 46 |         Raises:
 47 |             MCPAtlassianAuthenticationError: If authentication fails with the Confluence API (401/403)
 48 |             Exception: If there is an error retrieving the page
 49 |         """
 50 |         try:
 51 |             # Use v2 API for OAuth authentication, v1 API for token/basic auth
 52 |             v2_adapter = self._v2_adapter
 53 |             if v2_adapter:
 54 |                 logger.debug(
 55 |                     f"Using v2 API for OAuth authentication to get page '{page_id}'"
 56 |                 )
 57 |                 page = v2_adapter.get_page(
 58 |                     page_id=page_id,
 59 |                     expand="body.storage,version,space,children.attachment",
 60 |                 )
 61 |             else:
 62 |                 logger.debug(
 63 |                     f"Using v1 API for token/basic authentication to get page '{page_id}'"
 64 |                 )
 65 |                 page = self.confluence.get_page_by_id(
 66 |                     page_id=page_id,
 67 |                     expand="body.storage,version,space,children.attachment",
 68 |                 )
 69 | 
 70 |             space_key = page.get("space", {}).get("key", "")
 71 |             content = page["body"]["storage"]["value"]
 72 |             processed_html, processed_markdown = self.preprocessor.process_html_content(
 73 |                 content, space_key=space_key, confluence_client=self.confluence
 74 |             )
 75 | 
 76 |             # Use the appropriate content format based on the convert_to_markdown flag
 77 |             page_content = processed_markdown if convert_to_markdown else processed_html
 78 | 
 79 |             # Create and return the ConfluencePage model
 80 |             return ConfluencePage.from_api_response(
 81 |                 page,
 82 |                 base_url=self.config.url,
 83 |                 include_body=True,
 84 |                 # Override content with our processed version
 85 |                 content_override=page_content,
 86 |                 content_format="storage" if not convert_to_markdown else "markdown",
 87 |                 is_cloud=self.config.is_cloud,
 88 |             )
 89 |         except HTTPError as http_err:
 90 |             if http_err.response is not None and http_err.response.status_code in [
 91 |                 401,
 92 |                 403,
 93 |             ]:
 94 |                 error_msg = (
 95 |                     f"Authentication failed for Confluence API ({http_err.response.status_code}). "
 96 |                     "Token may be expired or invalid. Please verify credentials."
 97 |                 )
 98 |                 logger.error(error_msg)
 99 |                 raise MCPAtlassianAuthenticationError(error_msg) from http_err
100 |             else:
101 |                 logger.error(f"HTTP error during API call: {http_err}", exc_info=False)
102 |                 raise http_err
103 |         except Exception as e:
104 |             logger.error(
105 |                 f"Error retrieving page content for page ID {page_id}: {str(e)}"
106 |             )
107 |             raise Exception(f"Error retrieving page content: {str(e)}") from e
108 | 
109 |     def get_page_ancestors(self, page_id: str) -> list[ConfluencePage]:
110 |         """
111 |         Get ancestors (parent pages) of a specific page.
112 | 
113 |         Args:
114 |             page_id: The ID of the page to get ancestors for
115 | 
116 |         Returns:
117 |             List of ConfluencePage models representing the ancestors in hierarchical order
118 |                 (immediate parent first, root ancestor last)
119 | 
120 |         Raises:
121 |             MCPAtlassianAuthenticationError: If authentication fails with the Confluence API (401/403)
122 |         """
123 |         try:
124 |             # Use the Atlassian Python API to get ancestors
125 |             ancestors = self.confluence.get_page_ancestors(page_id)
126 | 
127 |             # Process each ancestor
128 |             ancestor_models = []
129 |             for ancestor in ancestors:
130 |                 # Create the page model without fetching content
131 |                 page_model = ConfluencePage.from_api_response(
132 |                     ancestor,
133 |                     base_url=self.config.url,
134 |                     include_body=False,
135 |                 )
136 |                 ancestor_models.append(page_model)
137 | 
138 |             return ancestor_models
139 |         except HTTPError as http_err:
140 |             if http_err.response is not None and http_err.response.status_code in [
141 |                 401,
142 |                 403,
143 |             ]:
144 |                 error_msg = (
145 |                     f"Authentication failed for Confluence API ({http_err.response.status_code}). "
146 |                     "Token may be expired or invalid. Please verify credentials."
147 |                 )
148 |                 logger.error(error_msg)
149 |                 raise MCPAtlassianAuthenticationError(error_msg) from http_err
150 |             else:
151 |                 logger.error(f"HTTP error during API call: {http_err}", exc_info=False)
152 |                 raise http_err
153 |         except Exception as e:
154 |             logger.error(f"Error fetching ancestors for page {page_id}: {str(e)}")
155 |             logger.debug("Full exception details:", exc_info=True)
156 |             return []
157 | 
158 |     def get_page_by_title(
159 |         self, space_key: str, title: str, *, convert_to_markdown: bool = True
160 |     ) -> ConfluencePage | None:
161 |         """
162 |         Get a specific page by its title from a Confluence space.
163 | 
164 |         Args:
165 |             space_key: The key of the space containing the page
166 |             title: The title of the page to retrieve
167 |             convert_to_markdown: When True, returns content in markdown format,
168 |                                otherwise returns raw HTML (keyword-only)
169 | 
170 |         Returns:
171 |             ConfluencePage model containing the page content and metadata, or None if not found
172 |         """
173 |         try:
174 |             # Directly try to find the page by title
175 |             page = self.confluence.get_page_by_title(
176 |                 space=space_key, title=title, expand="body.storage,version"
177 |             )
178 | 
179 |             if not page:
180 |                 logger.warning(
181 |                     f"Page '{title}' not found in space '{space_key}'. "
182 |                     f"The space may be invalid, the page may not exist, or permissions may be insufficient."
183 |                 )
184 |                 return None
185 | 
186 |             content = page["body"]["storage"]["value"]
187 |             processed_html, processed_markdown = self.preprocessor.process_html_content(
188 |                 content, space_key=space_key, confluence_client=self.confluence
189 |             )
190 | 
191 |             # Use the appropriate content format based on the convert_to_markdown flag
192 |             page_content = processed_markdown if convert_to_markdown else processed_html
193 | 
194 |             # Create and return the ConfluencePage model
195 |             return ConfluencePage.from_api_response(
196 |                 page,
197 |                 base_url=self.config.url,
198 |                 include_body=True,
199 |                 # Override content with our processed version
200 |                 content_override=page_content,
201 |                 content_format="storage" if not convert_to_markdown else "markdown",
202 |                 is_cloud=self.config.is_cloud,
203 |             )
204 | 
205 |         except KeyError as e:
206 |             logger.error(f"Missing key in page data: {str(e)}")
207 |             return None
208 |         except requests.RequestException as e:
209 |             logger.error(f"Network error when fetching page: {str(e)}")
210 |             return None
211 |         except (ValueError, TypeError) as e:
212 |             logger.error(f"Error processing page data: {str(e)}")
213 |             return None
214 |         except Exception as e:  # noqa: BLE001 - Intentional fallback with full logging
215 |             logger.error(f"Unexpected error fetching page: {str(e)}")
216 |             # Log the full traceback at debug level for troubleshooting
217 |             logger.debug("Full exception details:", exc_info=True)
218 |             return None
219 | 
220 |     def get_space_pages(
221 |         self,
222 |         space_key: str,
223 |         start: int = 0,
224 |         limit: int = 10,
225 |         *,
226 |         convert_to_markdown: bool = True,
227 |     ) -> list[ConfluencePage]:
228 |         """
229 |         Get all pages from a specific space.
230 | 
231 |         Args:
232 |             space_key: The key of the space to get pages from
233 |             start: The starting index for pagination
234 |             limit: Maximum number of pages to return
235 |             convert_to_markdown: When True, returns content in markdown format,
236 |                                otherwise returns raw HTML (keyword-only)
237 | 
238 |         Returns:
239 |             List of ConfluencePage models containing page content and metadata
240 |         """
241 |         pages = self.confluence.get_all_pages_from_space(
242 |             space=space_key, start=start, limit=limit, expand="body.storage"
243 |         )
244 | 
245 |         page_models = []
246 |         for page in pages:
247 |             content = page["body"]["storage"]["value"]
248 |             processed_html, processed_markdown = self.preprocessor.process_html_content(
249 |                 content, space_key=space_key, confluence_client=self.confluence
250 |             )
251 | 
252 |             # Use the appropriate content format based on the convert_to_markdown flag
253 |             page_content = processed_markdown if convert_to_markdown else processed_html
254 | 
255 |             # Ensure space information is included
256 |             if "space" not in page:
257 |                 page["space"] = {
258 |                     "key": space_key,
259 |                     "name": space_key,  # Use space_key as name if not available
260 |                 }
261 | 
262 |             # Create the ConfluencePage model
263 |             page_model = ConfluencePage.from_api_response(
264 |                 page,
265 |                 base_url=self.config.url,
266 |                 include_body=True,
267 |                 # Override content with our processed version
268 |                 content_override=page_content,
269 |                 content_format="storage" if not convert_to_markdown else "markdown",
270 |                 is_cloud=self.config.is_cloud,
271 |             )
272 | 
273 |             page_models.append(page_model)
274 | 
275 |         return page_models
276 | 
277 |     def create_page(
278 |         self,
279 |         space_key: str,
280 |         title: str,
281 |         body: str,
282 |         parent_id: str | None = None,
283 |         *,
284 |         is_markdown: bool = True,
285 |         enable_heading_anchors: bool = False,
286 |         content_representation: str | None = None,
287 |     ) -> ConfluencePage:
288 |         """
289 |         Create a new page in a Confluence space.
290 | 
291 |         Args:
292 |             space_key: The key of the space to create the page in
293 |             title: The title of the new page
294 |             body: The content of the page (markdown, wiki markup, or storage format)
295 |             parent_id: Optional ID of a parent page
296 |             is_markdown: Whether the body content is in markdown format (default: True, keyword-only)
297 |             enable_heading_anchors: Whether to enable automatic heading anchor generation (default: False, keyword-only)
298 |             content_representation: Content format when is_markdown=False ('wiki' or 'storage', keyword-only)
299 | 
300 |         Returns:
301 |             ConfluencePage model containing the new page's data
302 | 
303 |         Raises:
304 |             Exception: If there is an error creating the page
305 |         """
306 |         try:
307 |             # Determine body and representation based on content type
308 |             if is_markdown:
309 |                 # Convert markdown to Confluence storage format
310 |                 final_body = self.preprocessor.markdown_to_confluence_storage(
311 |                     body, enable_heading_anchors=enable_heading_anchors
312 |                 )
313 |                 representation = "storage"
314 |             else:
315 |                 # Use body as-is with specified representation
316 |                 final_body = body
317 |                 representation = content_representation or "storage"
318 | 
319 |             # Use v2 API for OAuth authentication, v1 API for token/basic auth
320 |             v2_adapter = self._v2_adapter
321 |             if v2_adapter:
322 |                 logger.debug(
323 |                     f"Using v2 API for OAuth authentication to create page '{title}'"
324 |                 )
325 |                 result = v2_adapter.create_page(
326 |                     space_key=space_key,
327 |                     title=title,
328 |                     body=final_body,
329 |                     parent_id=parent_id,
330 |                     representation=representation,
331 |                 )
332 |             else:
333 |                 logger.debug(
334 |                     f"Using v1 API for token/basic authentication to create page '{title}'"
335 |                 )
336 |                 result = self.confluence.create_page(
337 |                     space=space_key,
338 |                     title=title,
339 |                     body=final_body,
340 |                     parent_id=parent_id,
341 |                     representation=representation,
342 |                 )
343 | 
344 |             # Get the new page content
345 |             page_id = result.get("id")
346 |             if not page_id:
347 |                 raise ValueError("Create page response did not contain an ID")
348 | 
349 |             return self.get_page_content(page_id)
350 |         except Exception as e:
351 |             logger.error(
352 |                 f"Error creating page '{title}' in space {space_key}: {str(e)}"
353 |             )
354 |             raise Exception(
355 |                 f"Failed to create page '{title}' in space {space_key}: {str(e)}"
356 |             ) from e
357 | 
358 |     def update_page(
359 |         self,
360 |         page_id: str,
361 |         title: str,
362 |         body: str,
363 |         *,
364 |         is_minor_edit: bool = False,
365 |         version_comment: str = "",
366 |         is_markdown: bool = True,
367 |         parent_id: str | None = None,
368 |         enable_heading_anchors: bool = False,
369 |         content_representation: str | None = None,
370 |     ) -> ConfluencePage:
371 |         """
372 |         Update an existing page in Confluence.
373 | 
374 |         Args:
375 |             page_id: The ID of the page to update
376 |             title: The new title of the page
377 |             body: The new content of the page (markdown, wiki markup, or storage format)
378 |             is_minor_edit: Whether this is a minor edit (keyword-only)
379 |             version_comment: Optional comment for this version (keyword-only)
380 |             is_markdown: Whether the body content is in markdown format (default: True, keyword-only)
381 |             parent_id: Optional new parent page ID (keyword-only)
382 |             enable_heading_anchors: Whether to enable automatic heading anchor generation (default: False, keyword-only)
383 |             content_representation: Content format when is_markdown=False ('wiki' or 'storage', keyword-only)
384 | 
385 |         Returns:
386 |             ConfluencePage model containing the updated page's data
387 | 
388 |         Raises:
389 |             Exception: If there is an error updating the page
390 |         """
391 |         try:
392 |             # Determine body and representation based on content type
393 |             if is_markdown:
394 |                 # Convert markdown to Confluence storage format
395 |                 final_body = self.preprocessor.markdown_to_confluence_storage(
396 |                     body, enable_heading_anchors=enable_heading_anchors
397 |                 )
398 |                 representation = "storage"
399 |             else:
400 |                 # Use body as-is with specified representation
401 |                 final_body = body
402 |                 representation = content_representation or "storage"
403 | 
404 |             logger.debug(f"Updating page {page_id} with title '{title}'")
405 | 
406 |             # Use v2 API for OAuth authentication, v1 API for token/basic auth
407 |             v2_adapter = self._v2_adapter
408 |             if v2_adapter:
409 |                 logger.debug(
410 |                     f"Using v2 API for OAuth authentication to update page '{page_id}'"
411 |                 )
412 |                 response = v2_adapter.update_page(
413 |                     page_id=page_id,
414 |                     title=title,
415 |                     body=final_body,
416 |                     representation=representation,
417 |                     version_comment=version_comment,
418 |                 )
419 |             else:
420 |                 logger.debug(
421 |                     f"Using v1 API for token/basic authentication to update page '{page_id}'"
422 |                 )
423 |                 update_kwargs = {
424 |                     "page_id": page_id,
425 |                     "title": title,
426 |                     "body": final_body,
427 |                     "type": "page",
428 |                     "representation": representation,
429 |                     "minor_edit": is_minor_edit,
430 |                     "version_comment": version_comment,
431 |                     "always_update": True,
432 |                 }
433 |                 if parent_id:
434 |                     update_kwargs["parent_id"] = parent_id
435 | 
436 |                 self.confluence.update_page(**update_kwargs)
437 | 
438 |             # After update, refresh the page data
439 |             return self.get_page_content(page_id)
440 |         except Exception as e:
441 |             logger.error(f"Error updating page {page_id}: {str(e)}")
442 |             raise Exception(f"Failed to update page {page_id}: {str(e)}") from e
443 | 
444 |     def get_page_children(
445 |         self,
446 |         page_id: str,
447 |         start: int = 0,
448 |         limit: int = 25,
449 |         expand: str = "version",
450 |         *,
451 |         convert_to_markdown: bool = True,
452 |     ) -> list[ConfluencePage]:
453 |         """
454 |         Get child pages of a specific Confluence page.
455 | 
456 |         Args:
457 |             page_id: The ID of the parent page
458 |             start: The starting index for pagination
459 |             limit: Maximum number of child pages to return
460 |             expand: Fields to expand in the response
461 |             convert_to_markdown: When True, returns content in markdown format,
462 |                                otherwise returns raw HTML (keyword-only)
463 | 
464 |         Returns:
465 |             List of ConfluencePage models containing the child pages
466 |         """
467 |         try:
468 |             # Use the Atlassian Python API's get_page_child_by_type method
469 |             results = self.confluence.get_page_child_by_type(
470 |                 page_id=page_id, type="page", start=start, limit=limit, expand=expand
471 |             )
472 | 
473 |             # Process results
474 |             page_models = []
475 | 
476 |             # Handle both pagination modes
477 |             if isinstance(results, dict) and "results" in results:
478 |                 child_pages = results.get("results", [])
479 |             else:
480 |                 child_pages = results or []
481 | 
482 |             space_key = ""
483 | 
484 |             # Get space key from the first result if available
485 |             if child_pages and "space" in child_pages[0]:
486 |                 space_key = child_pages[0].get("space", {}).get("key", "")
487 | 
488 |             # Process each child page
489 |             for page in child_pages:
490 |                 # Only process content if we have "body" expanded
491 |                 content_override = None
492 |                 if "body" in page and convert_to_markdown:
493 |                     content = page.get("body", {}).get("storage", {}).get("value", "")
494 |                     if content:
495 |                         _, processed_markdown = self.preprocessor.process_html_content(
496 |                             content,
497 |                             space_key=space_key,
498 |                             confluence_client=self.confluence,
499 |                         )
500 |                         content_override = processed_markdown
501 | 
502 |                 # Create the page model
503 |                 page_model = ConfluencePage.from_api_response(
504 |                     page,
505 |                     base_url=self.config.url,
506 |                     include_body=True,
507 |                     content_override=content_override,
508 |                     content_format="markdown" if convert_to_markdown else "storage",
509 |                 )
510 | 
511 |                 page_models.append(page_model)
512 | 
513 |             return page_models
514 | 
515 |         except Exception as e:
516 |             logger.error(f"Error fetching child pages for page {page_id}: {str(e)}")
517 |             logger.debug("Full exception details:", exc_info=True)
518 |             return []
519 | 
520 |     def delete_page(self, page_id: str) -> bool:
521 |         """
522 |         Delete a Confluence page by its ID.
523 | 
524 |         Args:
525 |             page_id: The ID of the page to delete
526 | 
527 |         Returns:
528 |             Boolean indicating success (True) or failure (False)
529 | 
530 |         Raises:
531 |             Exception: If there is an error deleting the page
532 |         """
533 |         try:
534 |             logger.debug(f"Deleting page {page_id}")
535 | 
536 |             # Use v2 API for OAuth authentication, v1 API for token/basic auth
537 |             v2_adapter = self._v2_adapter
538 |             if v2_adapter:
539 |                 logger.debug(
540 |                     f"Using v2 API for OAuth authentication to delete page '{page_id}'"
541 |                 )
542 |                 return v2_adapter.delete_page(page_id=page_id)
543 |             else:
544 |                 logger.debug(
545 |                     f"Using v1 API for token/basic authentication to delete page '{page_id}'"
546 |                 )
547 |                 response = self.confluence.remove_page(page_id=page_id)
548 | 
549 |                 # The Atlassian library's remove_page returns the raw response from
550 |                 # the REST API call. For a successful deletion, we should get a
551 |                 # response object, but it might be empty (HTTP 204 No Content).
552 |                 # For REST DELETE operations, a success typically returns 204 or 200
553 | 
554 |                 # Check if we got a response object
555 |                 if isinstance(response, requests.Response):
556 |                     # Check if status code indicates success (2xx)
557 |                     success = 200 <= response.status_code < 300
558 |                     logger.debug(
559 |                         f"Delete page {page_id} returned status code {response.status_code}"
560 |                     )
561 |                     return success
562 |                 # If it's not a response object but truthy (like True), consider it a success
563 |                 elif response:
564 |                     return True
565 |                 # Default to true since no exception was raised
566 |                 # This is safer than returning false when we don't know what happened
567 |                 return True
568 | 
569 |         except Exception as e:
570 |             logger.error(f"Error deleting page {page_id}: {str(e)}")
571 |             raise Exception(f"Failed to delete page {page_id}: {str(e)}") from e
572 | 
```
Page 7/13 · First · Prev · Next · Last