This is page 5 of 13. Use http://codebase.md/sooperset/mcp-atlassian?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .devcontainer
│ ├── devcontainer.json
│ ├── Dockerfile
│ ├── post-create.sh
│ └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ └── feature_request.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-publish.yml
│ ├── lint.yml
│ ├── publish.yml
│ ├── stale.yml
│ └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│ ├── oauth_authorize.py
│ └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│ └── mcp_atlassian
│ ├── __init__.py
│ ├── confluence
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── labels.py
│ │ ├── pages.py
│ │ ├── search.py
│ │ ├── spaces.py
│ │ ├── users.py
│ │ ├── utils.py
│ │ └── v2_adapter.py
│ ├── exceptions.py
│ ├── jira
│ │ ├── __init__.py
│ │ ├── attachments.py
│ │ ├── boards.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── epics.py
│ │ ├── fields.py
│ │ ├── formatting.py
│ │ ├── issues.py
│ │ ├── links.py
│ │ ├── projects.py
│ │ ├── protocols.py
│ │ ├── search.py
│ │ ├── sprints.py
│ │ ├── transitions.py
│ │ ├── users.py
│ │ └── worklog.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── comment.py
│ │ │ ├── common.py
│ │ │ ├── label.py
│ │ │ ├── page.py
│ │ │ ├── search.py
│ │ │ ├── space.py
│ │ │ └── user_search.py
│ │ ├── constants.py
│ │ └── jira
│ │ ├── __init__.py
│ │ ├── agile.py
│ │ ├── comment.py
│ │ ├── common.py
│ │ ├── issue.py
│ │ ├── link.py
│ │ ├── project.py
│ │ ├── search.py
│ │ ├── version.py
│ │ ├── workflow.py
│ │ └── worklog.py
│ ├── preprocessing
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence.py
│ │ └── jira.py
│ ├── servers
│ │ ├── __init__.py
│ │ ├── confluence.py
│ │ ├── context.py
│ │ ├── dependencies.py
│ │ ├── jira.py
│ │ └── main.py
│ └── utils
│ ├── __init__.py
│ ├── date.py
│ ├── decorators.py
│ ├── env.py
│ ├── environment.py
│ ├── io.py
│ ├── lifecycle.py
│ ├── logging.py
│ ├── oauth_setup.py
│ ├── oauth.py
│ ├── ssl.py
│ ├── tools.py
│ └── urls.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── fixtures
│ │ ├── __init__.py
│ │ ├── confluence_mocks.py
│ │ └── jira_mocks.py
│ ├── integration
│ │ ├── conftest.py
│ │ ├── README.md
│ │ ├── test_authentication.py
│ │ ├── test_content_processing.py
│ │ ├── test_cross_service.py
│ │ ├── test_mcp_protocol.py
│ │ ├── test_proxy.py
│ │ ├── test_real_api.py
│ │ ├── test_ssl_verification.py
│ │ ├── test_stdin_monitoring_fix.py
│ │ └── test_transport_lifecycle.py
│ ├── README.md
│ ├── test_preprocessing.py
│ ├── test_real_api_validation.py
│ ├── unit
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_labels.py
│ │ │ ├── test_pages.py
│ │ │ ├── test_search.py
│ │ │ ├── test_spaces.py
│ │ │ ├── test_users.py
│ │ │ ├── test_utils.py
│ │ │ └── test_v2_adapter.py
│ │ ├── jira
│ │ │ ├── conftest.py
│ │ │ ├── test_attachments.py
│ │ │ ├── test_boards.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_epics.py
│ │ │ ├── test_fields.py
│ │ │ ├── test_formatting.py
│ │ │ ├── test_issues_markdown.py
│ │ │ ├── test_issues.py
│ │ │ ├── test_links.py
│ │ │ ├── test_projects.py
│ │ │ ├── test_protocols.py
│ │ │ ├── test_search.py
│ │ │ ├── test_sprints.py
│ │ │ ├── test_transitions.py
│ │ │ ├── test_users.py
│ │ │ └── test_worklog.py
│ │ ├── models
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_base_models.py
│ │ │ ├── test_confluence_models.py
│ │ │ ├── test_constants.py
│ │ │ └── test_jira_models.py
│ │ ├── servers
│ │ │ ├── __init__.py
│ │ │ ├── test_confluence_server.py
│ │ │ ├── test_context.py
│ │ │ ├── test_dependencies.py
│ │ │ ├── test_jira_server.py
│ │ │ └── test_main_server.py
│ │ ├── test_exceptions.py
│ │ ├── test_main_transport_selection.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── test_custom_headers.py
│ │ ├── test_date.py
│ │ ├── test_decorators.py
│ │ ├── test_env.py
│ │ ├── test_environment.py
│ │ ├── test_io.py
│ │ ├── test_lifecycle.py
│ │ ├── test_logging.py
│ │ ├── test_masking.py
│ │ ├── test_oauth_setup.py
│ │ ├── test_oauth.py
│ │ ├── test_ssl.py
│ │ ├── test_tools.py
│ │ └── test_urls.py
│ └── utils
│ ├── __init__.py
│ ├── assertions.py
│ ├── base.py
│ ├── factories.py
│ └── mocks.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/src/mcp_atlassian/preprocessing/jira.py:
--------------------------------------------------------------------------------
```python
1 | """Jira-specific text preprocessing module."""
2 |
3 | import logging
4 | import re
5 | from typing import Any
6 |
7 | from .base import BasePreprocessor
8 |
9 | logger = logging.getLogger("mcp-atlassian")
10 |
11 |
12 | class JiraPreprocessor(BasePreprocessor):
13 | """Handles text preprocessing for Jira content."""
14 |
15 | def __init__(self, base_url: str = "", **kwargs: Any) -> None:
16 | """
17 | Initialize the Jira text preprocessor.
18 |
19 | Args:
20 | base_url: Base URL for Jira API
21 | **kwargs: Additional arguments for the base class
22 | """
23 | super().__init__(base_url=base_url, **kwargs)
24 |
25 | def clean_jira_text(self, text: str) -> str:
26 | """
27 | Clean Jira text content by:
28 | 1. Processing user mentions and links
29 | 2. Converting Jira markup to markdown
30 | 3. Converting HTML/wiki markup to markdown
31 | """
32 | if not text:
33 | return ""
34 |
35 | # Process user mentions
36 | mention_pattern = r"\[~accountid:(.*?)\]"
37 | text = self._process_mentions(text, mention_pattern)
38 |
39 | # Process Jira smart links
40 | text = self._process_smart_links(text)
41 |
42 | # First convert any Jira markup to Markdown
43 | text = self.jira_to_markdown(text)
44 |
45 | # Then convert any remaining HTML to markdown
46 | text = self._convert_html_to_markdown(text)
47 |
48 | return text.strip()
49 |
50 | def _process_mentions(self, text: str, pattern: str) -> str:
51 | """
52 | Process user mentions in text.
53 |
54 | Args:
55 | text: The text containing mentions
56 | pattern: Regular expression pattern to match mentions
57 |
58 | Returns:
59 | Text with mentions replaced with display names
60 | """
61 | mentions = re.findall(pattern, text)
62 | for account_id in mentions:
63 | try:
64 | # Note: This is a placeholder - actual user fetching should be injected
65 | display_name = f"User:{account_id}"
66 | text = text.replace(f"[~accountid:{account_id}]", display_name)
67 | except Exception as e:
68 | logger.error(f"Error processing mention for {account_id}: {str(e)}")
69 | return text
70 |
71 | def _process_smart_links(self, text: str) -> str:
72 | """Process Jira/Confluence smart links."""
73 | # Pattern matches: [text|url|smart-link]
74 | link_pattern = r"\[(.*?)\|(.*?)\|smart-link\]"
75 | matches = re.finditer(link_pattern, text)
76 |
77 | for match in matches:
78 | full_match = match.group(0)
79 | link_text = match.group(1)
80 | link_url = match.group(2)
81 |
82 | # Extract issue key if it's a Jira issue link
83 | issue_key_match = re.search(r"browse/([A-Z]+-\d+)", link_url)
84 | # Check if it's a Confluence wiki link
85 | confluence_match = re.search(
86 | r"wiki/spaces/.+?/pages/\d+/(.+?)(?:\?|$)", link_url
87 | )
88 |
89 | if issue_key_match:
90 | issue_key = issue_key_match.group(1)
91 | clean_url = f"{self.base_url}/browse/{issue_key}"
92 | text = text.replace(full_match, f"[{issue_key}]({clean_url})")
93 | elif confluence_match:
94 | url_title = confluence_match.group(1)
95 | readable_title = url_title.replace("+", " ")
96 | readable_title = re.sub(r"^[A-Z]+-\d+\s+", "", readable_title)
97 | text = text.replace(full_match, f"[{readable_title}]({link_url})")
98 | else:
99 | clean_url = link_url.split("?")[0]
100 | text = text.replace(full_match, f"[{link_text}]({clean_url})")
101 |
102 | return text
103 |
104 | def jira_to_markdown(self, input_text: str) -> str:
105 | """
106 | Convert Jira markup to Markdown format.
107 |
108 | Args:
109 | input_text: Text in Jira markup format
110 |
111 | Returns:
112 | Text in Markdown format
113 | """
114 | if not input_text:
115 | return ""
116 |
117 | # Block quotes
118 | output = re.sub(r"^bq\.(.*?)$", r"> \1\n", input_text, flags=re.MULTILINE)
119 |
120 | # Text formatting (bold, italic)
121 | output = re.sub(
122 | r"([*_])(.*?)\1",
123 | lambda match: ("**" if match.group(1) == "*" else "*")
124 | + match.group(2)
125 | + ("**" if match.group(1) == "*" else "*"),
126 | output,
127 | )
128 |
129 | # Multi-level numbered list
130 | output = re.sub(
131 | r"^((?:#|-|\+|\*)+) (.*)$",
132 | lambda match: self._convert_jira_list_to_markdown(match),
133 | output,
134 | flags=re.MULTILINE,
135 | )
136 |
137 | # Headers
138 | output = re.sub(
139 | r"^h([0-6])\.(.*)$",
140 | lambda match: "#" * int(match.group(1)) + match.group(2),
141 | output,
142 | flags=re.MULTILINE,
143 | )
144 |
145 | # Inline code
146 | output = re.sub(r"\{\{([^}]+)\}\}", r"`\1`", output)
147 |
148 | # Citation
149 | output = re.sub(r"\?\?((?:.[^?]|[^?].)+)\?\?", r"<cite>\1</cite>", output)
150 |
151 | # Inserted text
152 | output = re.sub(r"\+([^+]*)\+", r"<ins>\1</ins>", output)
153 |
154 | # Superscript
155 | output = re.sub(r"\^([^^]*)\^", r"<sup>\1</sup>", output)
156 |
157 | # Subscript
158 | output = re.sub(r"~([^~]*)~", r"<sub>\1</sub>", output)
159 |
160 | # Strikethrough
161 | output = re.sub(r"-([^-]*)-", r"-\1-", output)
162 |
163 | # Code blocks with optional language specification
164 | output = re.sub(
165 | r"\{code(?::([a-z]+))?\}([\s\S]*?)\{code\}",
166 | r"```\1\n\2\n```",
167 | output,
168 | flags=re.MULTILINE,
169 | )
170 |
171 | # No format
172 | output = re.sub(r"\{noformat\}([\s\S]*?)\{noformat\}", r"```\n\1\n```", output)
173 |
174 | # Quote blocks
175 | output = re.sub(
176 | r"\{quote\}([\s\S]*)\{quote\}",
177 | lambda match: "\n".join(
178 | [f"> {line}" for line in match.group(1).split("\n")]
179 | ),
180 | output,
181 | flags=re.MULTILINE,
182 | )
183 |
184 | # Images with alt text
185 | output = re.sub(
186 | r"!([^|\n\s]+)\|([^\n!]*)alt=([^\n!\,]+?)(,([^\n!]*))?!",
187 | r"",
188 | output,
189 | )
190 |
191 | # Images with other parameters (ignore them)
192 | output = re.sub(r"!([^|\n\s]+)\|([^\n!]*)!", r"", output)
193 |
194 | # Images without parameters
195 | output = re.sub(r"!([^\n\s!]+)!", r"", output)
196 |
197 | # Links
198 | output = re.sub(r"\[([^|]+)\|(.+?)\]", r"[\1](\2)", output)
199 | output = re.sub(r"\[(.+?)\]([^\(]+)", r"<\1>\2", output)
200 |
201 | # Colored text
202 | output = re.sub(
203 | r"\{color:([^}]+)\}([\s\S]*?)\{color\}",
204 | r"<span style=\"color:\1\">\2</span>",
205 | output,
206 | flags=re.MULTILINE,
207 | )
208 |
209 | # Convert Jira table headers (||) to markdown table format
210 | lines = output.split("\n")
211 | i = 0
212 | while i < len(lines):
213 | line = lines[i]
214 |
215 | if "||" in line:
216 | # Replace Jira table headers
217 | lines[i] = lines[i].replace("||", "|")
218 |
219 | # Add a separator line for markdown tables
220 | header_cells = lines[i].count("|") - 1
221 | if header_cells > 0:
222 | separator_line = "|" + "---|" * header_cells
223 | lines.insert(i + 1, separator_line)
224 | i += 1 # Skip the newly inserted line in next iteration
225 |
226 | i += 1
227 |
228 | # Rejoin the lines
229 | output = "\n".join(lines)
230 |
231 | return output
232 |
233 | def markdown_to_jira(self, input_text: str) -> str:
234 | """
235 | Convert Markdown syntax to Jira markup syntax.
236 |
237 | Args:
238 | input_text: Text in Markdown format
239 |
240 | Returns:
241 | Text in Jira markup format
242 | """
243 | if not input_text:
244 | return ""
245 |
246 | # Save code blocks to prevent recursive processing
247 | code_blocks = []
248 | inline_codes = []
249 |
250 | # Extract code blocks
251 | def save_code_block(match: re.Match) -> str:
252 | """
253 | Process and save a code block.
254 |
255 | Args:
256 | match: Regex match object containing the code block
257 |
258 | Returns:
259 | Jira-formatted code block
260 | """
261 | syntax = match.group(1) or ""
262 | content = match.group(2)
263 | code = "{code"
264 | if syntax:
265 | code += ":" + syntax
266 | code += "}" + content + "{code}"
267 | code_blocks.append(code)
268 | return str(code) # Ensure we return a string
269 |
270 | # Extract inline code
271 | def save_inline_code(match: re.Match) -> str:
272 | """
273 | Process and save inline code.
274 |
275 | Args:
276 | match: Regex match object containing the inline code
277 |
278 | Returns:
279 | Jira-formatted inline code
280 | """
281 | content = match.group(1)
282 | code = "{{" + content + "}}"
283 | inline_codes.append(code)
284 | return str(code) # Ensure we return a string
285 |
286 | # Save code sections temporarily
287 | output = re.sub(r"```(\w*)\n([\s\S]+?)```", save_code_block, input_text)
288 | output = re.sub(r"`([^`]+)`", save_inline_code, output)
289 |
290 | # Headers with = or - underlines
291 | output = re.sub(
292 | r"^(.*?)\n([=-])+$",
293 | lambda match: f"h{1 if match.group(2)[0] == '=' else 2}. {match.group(1)}",
294 | output,
295 | flags=re.MULTILINE,
296 | )
297 |
298 | # Headers with # prefix
299 | output = re.sub(
300 | r"^([#]+)(.*?)$",
301 | lambda match: f"h{len(match.group(1))}." + match.group(2),
302 | output,
303 | flags=re.MULTILINE,
304 | )
305 |
306 | # Bold and italic
307 | output = re.sub(
308 | r"([*_]+)(.*?)\1",
309 | lambda match: ("_" if len(match.group(1)) == 1 else "*")
310 | + match.group(2)
311 | + ("_" if len(match.group(1)) == 1 else "*"),
312 | output,
313 | )
314 |
315 | # Multi-level bulleted list
316 | output = re.sub(
317 | r"^(\s*)- (.*)$",
318 | lambda match: (
319 | "* " + match.group(2)
320 | if not match.group(1)
321 | else " " * (len(match.group(1)) // 2) + "* " + match.group(2)
322 | ),
323 | output,
324 | flags=re.MULTILINE,
325 | )
326 |
327 | # Multi-level numbered list
328 | output = re.sub(
329 | r"^(\s+)1\. (.*)$",
330 | lambda match: "#" * (int(len(match.group(1)) / 4) + 2)
331 | + " "
332 | + match.group(2),
333 | output,
334 | flags=re.MULTILINE,
335 | )
336 |
337 | # HTML formatting tags to Jira markup
338 | tag_map = {"cite": "??", "del": "-", "ins": "+", "sup": "^", "sub": "~"}
339 |
340 | for tag, replacement in tag_map.items():
341 | output = re.sub(
342 | rf"<{tag}>(.*?)<\/{tag}>", rf"{replacement}\1{replacement}", output
343 | )
344 |
345 | # Colored text
346 | output = re.sub(
347 | r"<span style=\"color:(#[^\"]+)\">([\s\S]*?)</span>",
348 | r"{color:\1}\2{color}",
349 | output,
350 | flags=re.MULTILINE,
351 | )
352 |
353 | # Strikethrough
354 | output = re.sub(r"~~(.*?)~~", r"-\1-", output)
355 |
356 | # Images without alt text
357 | output = re.sub(r"!\[\]\(([^)\n\s]+)\)", r"!\1!", output)
358 |
359 | # Images with alt text
360 | output = re.sub(r"!\[([^\]\n]+)\]\(([^)\n\s]+)\)", r"!\2|alt=\1!", output)
361 |
362 | # Links
363 | output = re.sub(r"\[([^\]]+)\]\(([^)]+)\)", r"[\1|\2]", output)
364 | output = re.sub(r"<([^>]+)>", r"[\1]", output)
365 |
366 | # Convert markdown tables to Jira table format
367 | lines = output.split("\n")
368 | i = 0
369 | while i < len(lines):
370 | if i < len(lines) - 1 and re.match(r"\|[-\s|]+\|", lines[i + 1]):
371 | # Convert header row to Jira format
372 | lines[i] = lines[i].replace("|", "||")
373 | # Remove the separator line
374 | lines.pop(i + 1)
375 | i += 1
376 |
377 | # Rejoin the lines
378 | output = "\n".join(lines)
379 |
380 | return output
381 |
382 | def _convert_jira_list_to_markdown(self, match: re.Match) -> str:
383 | """
384 | Helper method to convert Jira lists to Markdown format.
385 |
386 | Args:
387 | match: Regex match object containing the Jira list markup
388 |
389 | Returns:
390 | Markdown-formatted list item
391 | """
392 | jira_bullets = match.group(1)
393 | content = match.group(2)
394 |
395 | # Calculate indentation level based on number of symbols
396 | indent_level = len(jira_bullets) - 1
397 | indent = " " * (indent_level * 2)
398 |
399 | # Determine the marker based on the last character
400 | last_char = jira_bullets[-1]
401 | prefix = "1." if last_char == "#" else "-"
402 |
403 | return f"{indent}{prefix} {content}"
404 |
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_worklog.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the Jira Worklog mixin."""
2 |
3 | from unittest.mock import MagicMock
4 |
5 | import pytest
6 |
7 | from mcp_atlassian.jira.worklog import WorklogMixin
8 |
9 |
10 | class TestWorklogMixin:
11 | """Tests for the WorklogMixin class."""
12 |
13 | @pytest.fixture
14 | def worklog_mixin(self, jira_client):
15 | """Create a WorklogMixin instance with mocked dependencies."""
16 | mixin = WorklogMixin(config=jira_client.config)
17 | mixin.jira = jira_client.jira
18 |
19 | # Mock methods that are typically provided by other mixins
20 | mixin._clean_text = MagicMock(side_effect=lambda text: text if text else "")
21 |
22 | return mixin
23 |
24 | def test_parse_time_spent_with_seconds(self, worklog_mixin):
25 | """Test parsing time spent with seconds specification."""
26 | assert worklog_mixin._parse_time_spent("60s") == 60
27 | assert worklog_mixin._parse_time_spent("3600s") == 3600
28 |
29 | def test_parse_time_spent_with_minutes(self, worklog_mixin):
30 | """Test parsing time spent with minutes."""
31 | assert worklog_mixin._parse_time_spent("1m") == 60
32 | assert worklog_mixin._parse_time_spent("30m") == 1800
33 |
34 | def test_parse_time_spent_with_hours(self, worklog_mixin):
35 | """Test parsing time spent with hours."""
36 | assert worklog_mixin._parse_time_spent("1h") == 3600
37 | assert worklog_mixin._parse_time_spent("2h") == 7200
38 |
39 | def test_parse_time_spent_with_days(self, worklog_mixin):
40 | """Test parsing time spent with days."""
41 | assert worklog_mixin._parse_time_spent("1d") == 86400
42 | assert worklog_mixin._parse_time_spent("2d") == 172800
43 |
44 | def test_parse_time_spent_with_weeks(self, worklog_mixin):
45 | """Test parsing time spent with weeks."""
46 | assert worklog_mixin._parse_time_spent("1w") == 604800
47 | assert worklog_mixin._parse_time_spent("2w") == 1209600
48 |
49 | def test_parse_time_spent_with_mixed_units(self, worklog_mixin):
50 | """Test parsing time spent with mixed units."""
51 | assert worklog_mixin._parse_time_spent("1h 30m") == 5400
52 | assert worklog_mixin._parse_time_spent("1d 6h") == 108000
53 | assert worklog_mixin._parse_time_spent("1w 2d 3h 4m") == 788640
54 |
55 | def test_parse_time_spent_with_invalid_input(self, worklog_mixin):
56 | """Test parsing time spent with invalid input."""
57 | # Should default to 60 seconds
58 | assert worklog_mixin._parse_time_spent("invalid") == 60
59 |
60 | def test_parse_time_spent_with_numeric_input(self, worklog_mixin):
61 | """Test parsing time spent with numeric input."""
62 | assert worklog_mixin._parse_time_spent("60") == 60
63 | assert worklog_mixin._parse_time_spent("3600") == 3600
64 |
65 | def test_get_worklogs_basic(self, worklog_mixin):
66 | """Test basic functionality of get_worklogs."""
67 | # Setup mock response
68 | mock_result = {
69 | "worklogs": [
70 | {
71 | "id": "10001",
72 | "comment": "Work item 1",
73 | "created": "2024-01-01T10:00:00.000+0000",
74 | "updated": "2024-01-01T10:30:00.000+0000",
75 | "started": "2024-01-01T09:00:00.000+0000",
76 | "timeSpent": "1h",
77 | "timeSpentSeconds": 3600,
78 | "author": {"displayName": "Test User"},
79 | }
80 | ]
81 | }
82 | worklog_mixin.jira.issue_get_worklog.return_value = mock_result
83 |
84 | # Call the method
85 | result = worklog_mixin.get_worklogs("TEST-123")
86 |
87 | # Verify
88 | worklog_mixin.jira.issue_get_worklog.assert_called_once_with("TEST-123")
89 | assert len(result) == 1
90 | assert result[0]["id"] == "10001"
91 | assert result[0]["comment"] == "Work item 1"
92 | assert result[0]["timeSpent"] == "1h"
93 | assert result[0]["timeSpentSeconds"] == 3600
94 | assert result[0]["author"] == "Test User"
95 |
96 | def test_get_worklogs_with_multiple_entries(self, worklog_mixin):
97 | """Test get_worklogs with multiple worklog entries."""
98 | # Setup mock response with multiple entries
99 | mock_result = {
100 | "worklogs": [
101 | {
102 | "id": "10001",
103 | "comment": "Work item 1",
104 | "created": "2024-01-01T10:00:00.000+0000",
105 | "timeSpent": "1h",
106 | "timeSpentSeconds": 3600,
107 | "author": {"displayName": "User 1"},
108 | },
109 | {
110 | "id": "10002",
111 | "comment": "Work item 2",
112 | "created": "2024-01-02T10:00:00.000+0000",
113 | "timeSpent": "2h",
114 | "timeSpentSeconds": 7200,
115 | "author": {"displayName": "User 2"},
116 | },
117 | ]
118 | }
119 | worklog_mixin.jira.issue_get_worklog.return_value = mock_result
120 |
121 | # Call the method
122 | result = worklog_mixin.get_worklogs("TEST-123")
123 |
124 | # Verify
125 | assert len(result) == 2
126 | assert result[0]["id"] == "10001"
127 | assert result[1]["id"] == "10002"
128 | assert result[0]["timeSpentSeconds"] == 3600
129 | assert result[1]["timeSpentSeconds"] == 7200
130 |
131 | def test_get_worklogs_with_missing_fields(self, worklog_mixin):
132 | """Test get_worklogs with missing fields."""
133 | # Setup mock response with missing fields
134 | mock_result = {
135 | "worklogs": [
136 | {
137 | "id": "10001",
138 | # Missing comment
139 | "created": "2024-01-01T10:00:00.000+0000",
140 | # Missing other fields
141 | }
142 | ]
143 | }
144 | worklog_mixin.jira.issue_get_worklog.return_value = mock_result
145 |
146 | # Call the method
147 | result = worklog_mixin.get_worklogs("TEST-123")
148 |
149 | # Verify
150 | assert len(result) == 1
151 | assert result[0]["id"] == "10001"
152 | assert result[0]["comment"] == ""
153 | assert result[0]["timeSpent"] == ""
154 | assert result[0]["timeSpentSeconds"] == 0
155 | assert result[0]["author"] == "Unknown"
156 |
157 | def test_get_worklogs_with_empty_response(self, worklog_mixin):
158 | """Test get_worklogs with empty response."""
159 | # Setup mock response with no worklogs
160 | worklog_mixin.jira.issue_get_worklog.return_value = {}
161 |
162 | # Call the method
163 | result = worklog_mixin.get_worklogs("TEST-123")
164 |
165 | # Verify
166 | assert isinstance(result, list)
167 | assert len(result) == 0
168 |
169 | def test_get_worklogs_with_error(self, worklog_mixin):
170 | """Test get_worklogs error handling."""
171 | # Setup mock to raise exception
172 | worklog_mixin.jira.issue_get_worklog.side_effect = Exception(
173 | "Worklog fetch error"
174 | )
175 |
176 | # Call the method and verify exception
177 | with pytest.raises(
178 | Exception, match="Error getting worklogs: Worklog fetch error"
179 | ):
180 | worklog_mixin.get_worklogs("TEST-123")
181 |
182 | def test_add_worklog_basic(self, worklog_mixin):
183 | """Test basic functionality of add_worklog."""
184 | # Setup mock response
185 | mock_result = {
186 | "id": "10001",
187 | "comment": "Added work",
188 | "created": "2024-01-01T10:00:00.000+0000",
189 | "updated": "2024-01-01T10:00:00.000+0000",
190 | "started": "2024-01-01T09:00:00.000+0000",
191 | "timeSpent": "1h",
192 | "timeSpentSeconds": 3600,
193 | "author": {"displayName": "Test User"},
194 | }
195 | worklog_mixin.jira.post.return_value = mock_result
196 | worklog_mixin.jira.resource_url.return_value = (
197 | "https://jira.example.com/rest/api/2/issue"
198 | )
199 |
200 | # Call the method
201 | result = worklog_mixin.add_worklog("TEST-123", "1h", comment="Added work")
202 |
203 | # Verify
204 | worklog_mixin.jira.resource_url.assert_called_once_with("issue")
205 | worklog_mixin.jira.post.assert_called_once()
206 | assert result["id"] == "10001"
207 | assert result["comment"] == "Added work"
208 | assert result["timeSpent"] == "1h"
209 | assert result["timeSpentSeconds"] == 3600
210 | assert result["author"] == "Test User"
211 | assert result["original_estimate_updated"] is False
212 | assert result["remaining_estimate_updated"] is False
213 |
214 | def test_add_worklog_with_original_estimate(self, worklog_mixin):
215 | """Test add_worklog with original estimate update."""
216 | # Setup mocks
217 | mock_result = {
218 | "id": "10001",
219 | "timeSpent": "1h",
220 | "timeSpentSeconds": 3600,
221 | }
222 | worklog_mixin.jira.post.return_value = mock_result
223 | worklog_mixin.jira.resource_url.return_value = (
224 | "https://jira.example.com/rest/api/2/issue"
225 | )
226 |
227 | # Call the method
228 | result = worklog_mixin.add_worklog("TEST-123", "1h", original_estimate="4h")
229 |
230 | # Verify
231 | worklog_mixin.jira.edit_issue.assert_called_once_with(
232 | issue_id_or_key="TEST-123",
233 | fields={"timetracking": {"originalEstimate": "4h"}},
234 | )
235 | assert result["original_estimate_updated"] is True
236 |
237 | def test_add_worklog_with_remaining_estimate(self, worklog_mixin):
238 | """Test add_worklog with remaining estimate update."""
239 | # Setup mocks
240 | mock_result = {
241 | "id": "10001",
242 | "timeSpent": "1h",
243 | "timeSpentSeconds": 3600,
244 | }
245 | worklog_mixin.jira.post.return_value = mock_result
246 | worklog_mixin.jira.resource_url.return_value = (
247 | "https://jira.example.com/rest/api/2/issue"
248 | )
249 |
250 | # Call the method
251 | result = worklog_mixin.add_worklog("TEST-123", "1h", remaining_estimate="3h")
252 |
253 | # Verify post call has correct parameters
254 | call_args = worklog_mixin.jira.post.call_args
255 | assert call_args is not None
256 | args, kwargs = call_args
257 |
258 | # Check that adjustEstimate=new and newEstimate=3h are in params
259 | assert "params" in kwargs
260 | assert kwargs["params"]["adjustEstimate"] == "new"
261 | assert kwargs["params"]["newEstimate"] == "3h"
262 |
263 | assert result["remaining_estimate_updated"] is True
264 |
265 | def test_add_worklog_with_started_time(self, worklog_mixin):
266 | """Test add_worklog with started time."""
267 | # Setup mocks
268 | mock_result = {
269 | "id": "10001",
270 | "timeSpent": "1h",
271 | "timeSpentSeconds": 3600,
272 | }
273 | worklog_mixin.jira.post.return_value = mock_result
274 | worklog_mixin.jira.resource_url.return_value = (
275 | "https://jira.example.com/rest/api/2/issue"
276 | )
277 |
278 | # Setup started time
279 | started_time = "2024-01-01T09:00:00.000+0000"
280 |
281 | # Call the method
282 | worklog_mixin.add_worklog("TEST-123", "1h", started=started_time)
283 |
284 | # Verify worklog data contains started time
285 | call_args = worklog_mixin.jira.post.call_args
286 | assert call_args is not None
287 | args, kwargs = call_args
288 |
289 | assert "data" in kwargs
290 | assert kwargs["data"]["started"] == started_time
291 |
292 | def test_add_worklog_with_markdown_to_jira_available(self, worklog_mixin):
293 | """Test add_worklog with _markdown_to_jira conversion."""
294 | # Setup mocks
295 | mock_result = {
296 | "id": "10001",
297 | "timeSpent": "1h",
298 | "timeSpentSeconds": 3600,
299 | }
300 | worklog_mixin.jira.post.return_value = mock_result
301 | worklog_mixin.jira.resource_url.return_value = (
302 | "https://jira.example.com/rest/api/2/issue"
303 | )
304 |
305 | # Add _markdown_to_jira method
306 | worklog_mixin._markdown_to_jira = MagicMock(return_value="Converted comment")
307 |
308 | # Call the method
309 | worklog_mixin.add_worklog("TEST-123", "1h", comment="**Markdown** comment")
310 |
311 | # Verify _markdown_to_jira was called
312 | worklog_mixin._markdown_to_jira.assert_called_once_with("**Markdown** comment")
313 |
314 | # Verify converted comment was used
315 | call_args = worklog_mixin.jira.post.call_args
316 | assert call_args is not None
317 | args, kwargs = call_args
318 |
319 | assert "data" in kwargs
320 | assert kwargs["data"]["comment"] == "Converted comment"
321 |
322 | def test_add_worklog_with_error(self, worklog_mixin):
323 | """Test add_worklog error handling."""
324 | # Setup mock to raise exception
325 | worklog_mixin.jira.post.side_effect = Exception("Worklog add error")
326 | worklog_mixin.jira.resource_url.return_value = (
327 | "https://jira.example.com/rest/api/2/issue"
328 | )
329 |
330 | # Call the method and verify exception
331 | with pytest.raises(Exception, match="Error adding worklog: Worklog add error"):
332 | worklog_mixin.add_worklog("TEST-123", "1h")
333 |
334 | def test_add_worklog_with_original_estimate_error(self, worklog_mixin):
335 | """Test add_worklog with original estimate update error."""
336 | # Setup mocks
337 | mock_result = {
338 | "id": "10001",
339 | "timeSpent": "1h",
340 | "timeSpentSeconds": 3600,
341 | }
342 | worklog_mixin.jira.post.return_value = mock_result
343 | worklog_mixin.jira.resource_url.return_value = (
344 | "https://jira.example.com/rest/api/2/issue"
345 | )
346 |
347 | # Make edit_issue raise an exception
348 | worklog_mixin.jira.edit_issue.side_effect = Exception("Estimate update error")
349 |
350 | # Call the method - should continue despite estimate update error
351 | result = worklog_mixin.add_worklog("TEST-123", "1h", original_estimate="4h")
352 |
353 | # Verify post was still called (worklog added despite estimate error)
354 | worklog_mixin.jira.post.assert_called_once()
355 | assert result["original_estimate_updated"] is False
356 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/__init__.py:
--------------------------------------------------------------------------------
```python
1 | import asyncio
2 | import logging
3 | import os
4 | import sys
5 | from importlib.metadata import PackageNotFoundError, version
6 |
7 | import click
8 | from dotenv import load_dotenv
9 |
10 | from mcp_atlassian.utils.env import is_env_truthy
11 | from mcp_atlassian.utils.lifecycle import (
12 | ensure_clean_exit,
13 | setup_signal_handlers,
14 | )
15 | from mcp_atlassian.utils.logging import setup_logging
16 |
17 | try:
18 | __version__ = version("mcp-atlassian")
19 | except PackageNotFoundError:
20 | # package is not installed
21 | __version__ = "0.0.0"
22 |
23 | # Initialize logging with appropriate level
24 | logging_level = logging.WARNING
25 | if is_env_truthy("MCP_VERBOSE"):
26 | logging_level = logging.DEBUG
27 |
28 | # Set up logging to STDOUT if MCP_LOGGING_STDOUT is set to true
29 | logging_stream = sys.stdout if is_env_truthy("MCP_LOGGING_STDOUT") else sys.stderr
30 |
31 | # Set up logging using the utility function
32 | logger = setup_logging(logging_level, logging_stream)
33 |
34 |
35 | @click.version_option(__version__, prog_name="mcp-atlassian")
36 | @click.command()
37 | @click.option(
38 | "-v",
39 | "--verbose",
40 | count=True,
41 | help="Increase verbosity (can be used multiple times)",
42 | )
43 | @click.option(
44 | "--env-file", type=click.Path(exists=True, dir_okay=False), help="Path to .env file"
45 | )
46 | @click.option(
47 | "--oauth-setup",
48 | is_flag=True,
49 | help="Run OAuth 2.0 setup wizard for Atlassian Cloud",
50 | )
51 | @click.option(
52 | "--transport",
53 | type=click.Choice(["stdio", "sse", "streamable-http"]),
54 | default="stdio",
55 | help="Transport type (stdio, sse, or streamable-http)",
56 | )
57 | @click.option(
58 | "--port",
59 | default=8000,
60 | help="Port to listen on for SSE or Streamable HTTP transport",
61 | )
62 | @click.option(
63 | "--host",
64 | default="0.0.0.0", # noqa: S104
65 | help="Host to bind to for SSE or Streamable HTTP transport (default: 0.0.0.0)",
66 | )
67 | @click.option(
68 | "--path",
69 | default="/mcp",
70 | help="Path for Streamable HTTP transport (e.g., /mcp).",
71 | )
72 | @click.option(
73 | "--confluence-url",
74 | help="Confluence URL (e.g., https://your-domain.atlassian.net/wiki)",
75 | )
76 | @click.option("--confluence-username", help="Confluence username/email")
77 | @click.option("--confluence-token", help="Confluence API token")
78 | @click.option(
79 | "--confluence-personal-token",
80 | help="Confluence Personal Access Token (for Confluence Server/Data Center)",
81 | )
82 | @click.option(
83 | "--confluence-ssl-verify/--no-confluence-ssl-verify",
84 | default=True,
85 | help="Verify SSL certificates for Confluence Server/Data Center (default: verify)",
86 | )
87 | @click.option(
88 | "--confluence-spaces-filter",
89 | help="Comma-separated list of Confluence space keys to filter search results",
90 | )
91 | @click.option(
92 | "--jira-url",
93 | help="Jira URL (e.g., https://your-domain.atlassian.net or https://jira.your-company.com)",
94 | )
95 | @click.option("--jira-username", help="Jira username/email (for Jira Cloud)")
96 | @click.option("--jira-token", help="Jira API token (for Jira Cloud)")
97 | @click.option(
98 | "--jira-personal-token",
99 | help="Jira Personal Access Token (for Jira Server/Data Center)",
100 | )
101 | @click.option(
102 | "--jira-ssl-verify/--no-jira-ssl-verify",
103 | default=True,
104 | help="Verify SSL certificates for Jira Server/Data Center (default: verify)",
105 | )
106 | @click.option(
107 | "--jira-projects-filter",
108 | help="Comma-separated list of Jira project keys to filter search results",
109 | )
110 | @click.option(
111 | "--read-only",
112 | is_flag=True,
113 | help="Run in read-only mode (disables all write operations)",
114 | )
115 | @click.option(
116 | "--enabled-tools",
117 | help="Comma-separated list of tools to enable (enables all if not specified)",
118 | )
119 | @click.option(
120 | "--oauth-client-id",
121 | help="OAuth 2.0 client ID for Atlassian Cloud",
122 | )
123 | @click.option(
124 | "--oauth-client-secret",
125 | help="OAuth 2.0 client secret for Atlassian Cloud",
126 | )
127 | @click.option(
128 | "--oauth-redirect-uri",
129 | help="OAuth 2.0 redirect URI for Atlassian Cloud",
130 | )
131 | @click.option(
132 | "--oauth-scope",
133 | help="OAuth 2.0 scopes (space-separated) for Atlassian Cloud",
134 | )
135 | @click.option(
136 | "--oauth-cloud-id",
137 | help="Atlassian Cloud ID for OAuth 2.0 authentication",
138 | )
139 | @click.option(
140 | "--oauth-access-token",
141 | help="Atlassian Cloud OAuth 2.0 access token (if you have your own you'd like to "
142 | "use for the session.)",
143 | )
144 | def main(
145 | verbose: int,
146 | env_file: str | None,
147 | oauth_setup: bool,
148 | transport: str,
149 | port: int,
150 | host: str,
151 | path: str | None,
152 | confluence_url: str | None,
153 | confluence_username: str | None,
154 | confluence_token: str | None,
155 | confluence_personal_token: str | None,
156 | confluence_ssl_verify: bool,
157 | confluence_spaces_filter: str | None,
158 | jira_url: str | None,
159 | jira_username: str | None,
160 | jira_token: str | None,
161 | jira_personal_token: str | None,
162 | jira_ssl_verify: bool,
163 | jira_projects_filter: str | None,
164 | read_only: bool,
165 | enabled_tools: str | None,
166 | oauth_client_id: str | None,
167 | oauth_client_secret: str | None,
168 | oauth_redirect_uri: str | None,
169 | oauth_scope: str | None,
170 | oauth_cloud_id: str | None,
171 | oauth_access_token: str | None,
172 | ) -> None:
173 | """MCP Atlassian Server - Jira and Confluence functionality for MCP
174 |
175 | Supports both Atlassian Cloud and Jira Server/Data Center deployments.
176 | Authentication methods supported:
177 | - Username and API token (Cloud)
178 | - Personal Access Token (Server/Data Center)
179 | - OAuth 2.0 (Cloud only)
180 | """
181 | # Logging level logic
182 | if verbose == 1:
183 | current_logging_level = logging.INFO
184 | elif verbose >= 2: # -vv or more
185 | current_logging_level = logging.DEBUG
186 | else:
187 | # Default to DEBUG if MCP_VERY_VERBOSE is set, else INFO if MCP_VERBOSE is set, else WARNING
188 | if is_env_truthy("MCP_VERY_VERBOSE", "false"):
189 | current_logging_level = logging.DEBUG
190 | elif is_env_truthy("MCP_VERBOSE", "false"):
191 | current_logging_level = logging.INFO
192 | else:
193 | current_logging_level = logging.WARNING
194 |
195 | # Set up logging to STDOUT if MCP_LOGGING_STDOUT is set to true
196 | logging_stream = sys.stdout if is_env_truthy("MCP_LOGGING_STDOUT") else sys.stderr
197 |
198 | global logger
199 | logger = setup_logging(current_logging_level, logging_stream)
200 | logger.debug(f"Logging level set to: {logging.getLevelName(current_logging_level)}")
201 | logger.debug(
202 | f"Logging stream set to: {'stdout' if logging_stream is sys.stdout else 'stderr'}"
203 | )
204 |
205 | def was_option_provided(ctx: click.Context, param_name: str) -> bool:
206 | return (
207 | ctx.get_parameter_source(param_name)
208 | != click.core.ParameterSource.DEFAULT_MAP
209 | and ctx.get_parameter_source(param_name)
210 | != click.core.ParameterSource.DEFAULT
211 | )
212 |
213 | if env_file:
214 | logger.debug(f"Loading environment from file: {env_file}")
215 | load_dotenv(env_file, override=True)
216 | else:
217 | logger.debug(
218 | "Attempting to load environment from default .env file if it exists"
219 | )
220 | load_dotenv(override=True)
221 |
222 | if oauth_setup:
223 | logger.info("Starting OAuth 2.0 setup wizard")
224 | try:
225 | from .utils.oauth_setup import run_oauth_setup
226 |
227 | sys.exit(run_oauth_setup())
228 | except ImportError:
229 | logger.error("Failed to import OAuth setup module.")
230 | sys.exit(1)
231 |
232 | click_ctx = click.get_current_context(silent=True)
233 |
234 | # Transport precedence
235 | final_transport = os.getenv("TRANSPORT", "stdio").lower()
236 | if click_ctx and was_option_provided(click_ctx, "transport"):
237 | final_transport = transport
238 | if final_transport not in ["stdio", "sse", "streamable-http"]:
239 | logger.warning(
240 | f"Invalid transport '{final_transport}' from env/default, using 'stdio'."
241 | )
242 | final_transport = "stdio"
243 | logger.debug(f"Final transport determined: {final_transport}")
244 |
245 | # Port precedence
246 | final_port = 8000
247 | if os.getenv("PORT") and os.getenv("PORT").isdigit():
248 | final_port = int(os.getenv("PORT"))
249 | if click_ctx and was_option_provided(click_ctx, "port"):
250 | final_port = port
251 | logger.debug(f"Final port for HTTP transports: {final_port}")
252 |
253 | # Host precedence
254 | final_host = os.getenv("HOST", "0.0.0.0") # noqa: S104
255 | if click_ctx and was_option_provided(click_ctx, "host"):
256 | final_host = host
257 | logger.debug(f"Final host for HTTP transports: {final_host}")
258 |
259 | # Path precedence
260 | final_path: str | None = os.getenv("STREAMABLE_HTTP_PATH", None)
261 | if click_ctx and was_option_provided(click_ctx, "path"):
262 | final_path = path
263 | logger.debug(
264 | f"Final path for Streamable HTTP: {final_path if final_path else 'FastMCP default'}"
265 | )
266 |
267 | # Set env vars for downstream config
268 | if click_ctx and was_option_provided(click_ctx, "enabled_tools"):
269 | os.environ["ENABLED_TOOLS"] = enabled_tools
270 | if click_ctx and was_option_provided(click_ctx, "confluence_url"):
271 | os.environ["CONFLUENCE_URL"] = confluence_url
272 | if click_ctx and was_option_provided(click_ctx, "confluence_username"):
273 | os.environ["CONFLUENCE_USERNAME"] = confluence_username
274 | if click_ctx and was_option_provided(click_ctx, "confluence_token"):
275 | os.environ["CONFLUENCE_API_TOKEN"] = confluence_token
276 | if click_ctx and was_option_provided(click_ctx, "confluence_personal_token"):
277 | os.environ["CONFLUENCE_PERSONAL_TOKEN"] = confluence_personal_token
278 | if click_ctx and was_option_provided(click_ctx, "jira_url"):
279 | os.environ["JIRA_URL"] = jira_url
280 | if click_ctx and was_option_provided(click_ctx, "jira_username"):
281 | os.environ["JIRA_USERNAME"] = jira_username
282 | if click_ctx and was_option_provided(click_ctx, "jira_token"):
283 | os.environ["JIRA_API_TOKEN"] = jira_token
284 | if click_ctx and was_option_provided(click_ctx, "jira_personal_token"):
285 | os.environ["JIRA_PERSONAL_TOKEN"] = jira_personal_token
286 | if click_ctx and was_option_provided(click_ctx, "oauth_client_id"):
287 | os.environ["ATLASSIAN_OAUTH_CLIENT_ID"] = oauth_client_id
288 | if click_ctx and was_option_provided(click_ctx, "oauth_client_secret"):
289 | os.environ["ATLASSIAN_OAUTH_CLIENT_SECRET"] = oauth_client_secret
290 | if click_ctx and was_option_provided(click_ctx, "oauth_redirect_uri"):
291 | os.environ["ATLASSIAN_OAUTH_REDIRECT_URI"] = oauth_redirect_uri
292 | if click_ctx and was_option_provided(click_ctx, "oauth_scope"):
293 | os.environ["ATLASSIAN_OAUTH_SCOPE"] = oauth_scope
294 | if click_ctx and was_option_provided(click_ctx, "oauth_cloud_id"):
295 | os.environ["ATLASSIAN_OAUTH_CLOUD_ID"] = oauth_cloud_id
296 | if click_ctx and was_option_provided(click_ctx, "oauth_access_token"):
297 | os.environ["ATLASSIAN_OAUTH_ACCESS_TOKEN"] = oauth_access_token
298 | if click_ctx and was_option_provided(click_ctx, "read_only"):
299 | os.environ["READ_ONLY_MODE"] = str(read_only).lower()
300 | if click_ctx and was_option_provided(click_ctx, "confluence_ssl_verify"):
301 | os.environ["CONFLUENCE_SSL_VERIFY"] = str(confluence_ssl_verify).lower()
302 | if click_ctx and was_option_provided(click_ctx, "confluence_spaces_filter"):
303 | os.environ["CONFLUENCE_SPACES_FILTER"] = confluence_spaces_filter
304 | if click_ctx and was_option_provided(click_ctx, "jira_ssl_verify"):
305 | os.environ["JIRA_SSL_VERIFY"] = str(jira_ssl_verify).lower()
306 | if click_ctx and was_option_provided(click_ctx, "jira_projects_filter"):
307 | os.environ["JIRA_PROJECTS_FILTER"] = jira_projects_filter
308 |
309 | from mcp_atlassian.servers import main_mcp
310 |
311 | run_kwargs = {
312 | "transport": final_transport,
313 | }
314 |
315 | if final_transport == "stdio":
316 | logger.info("Starting server with STDIO transport.")
317 | elif final_transport in ["sse", "streamable-http"]:
318 | run_kwargs["host"] = final_host
319 | run_kwargs["port"] = final_port
320 | run_kwargs["log_level"] = logging.getLevelName(current_logging_level).lower()
321 |
322 | if final_path is not None:
323 | run_kwargs["path"] = final_path
324 |
325 | log_display_path = final_path
326 | if log_display_path is None:
327 | if final_transport == "sse":
328 | log_display_path = main_mcp.settings.sse_path or "/sse"
329 | else:
330 | log_display_path = main_mcp.settings.streamable_http_path or "/mcp"
331 |
332 | logger.info(
333 | f"Starting server with {final_transport.upper()} transport on http://{final_host}:{final_port}{log_display_path}"
334 | )
335 | else:
336 | logger.error(
337 | f"Invalid transport type '{final_transport}' determined. Cannot start server."
338 | )
339 | sys.exit(1)
340 |
341 | # Set up signal handlers for graceful shutdown
342 | setup_signal_handlers()
343 |
344 | # For STDIO transport, also handle EOF detection
345 | if final_transport == "stdio":
346 | logger.debug("STDIO transport detected, setting up stdin monitoring")
347 |
348 | try:
349 | logger.debug("Starting asyncio event loop...")
350 |
351 | # For stdio transport, don't monitor stdin as MCP server handles it internally
352 | # This prevents race conditions where both try to read from the same stdin
353 | if final_transport == "stdio":
354 | asyncio.run(main_mcp.run_async(**run_kwargs))
355 | else:
356 | # For HTTP transports (SSE, streamable-http), don't use stdin monitoring
357 | # as it causes premature shutdown when the client closes stdin
358 | # The server should only rely on OS signals for shutdown
359 | logger.debug(
360 | f"Running server for {final_transport} transport without stdin monitoring"
361 | )
362 | asyncio.run(main_mcp.run_async(**run_kwargs))
363 | except (KeyboardInterrupt, SystemExit) as e:
364 | logger.info(f"Server shutdown initiated: {type(e).__name__}")
365 | except Exception as e:
366 | logger.error(f"Server encountered an error: {e}", exc_info=True)
367 | sys.exit(1)
368 | finally:
369 | ensure_clean_exit()
370 |
371 |
372 | __all__ = ["main", "__version__"]
373 |
374 | if __name__ == "__main__":
375 | main()
376 |
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_formatting.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the Jira FormattingMixin."""
2 |
3 | from unittest.mock import MagicMock, patch
4 |
5 | import pytest
6 |
7 | from mcp_atlassian.jira import JiraFetcher
8 | from mcp_atlassian.jira.formatting import FormattingMixin
9 | from mcp_atlassian.preprocessing import JiraPreprocessor
10 |
11 |
12 | @pytest.fixture
13 | def formatting_mixin(jira_fetcher: JiraFetcher) -> FormattingMixin:
14 | """Fixture to create a FormattingMixin instance for testing."""
15 | # Create the mixin without calling its __init__ to avoid config dependencies
16 | mixin = jira_fetcher
17 | # Set up necessary mocks
18 | mixin.preprocessor = MagicMock(spec=JiraPreprocessor)
19 | return mixin
20 |
21 |
22 | def test_markdown_to_jira(formatting_mixin):
23 | """Test markdown_to_jira method with valid input."""
24 | formatting_mixin.preprocessor.markdown_to_jira.return_value = "Converted text"
25 |
26 | result = formatting_mixin.markdown_to_jira("# Markdown text")
27 |
28 | assert result == "Converted text"
29 | formatting_mixin.preprocessor.markdown_to_jira.assert_called_once_with(
30 | "# Markdown text"
31 | )
32 |
33 |
34 | def test_markdown_to_jira_empty_input(formatting_mixin):
35 | """Test markdown_to_jira method with empty input."""
36 | result = formatting_mixin.markdown_to_jira("")
37 |
38 | assert result == ""
39 | formatting_mixin.preprocessor.markdown_to_jira.assert_not_called()
40 |
41 |
42 | def test_markdown_to_jira_exception(formatting_mixin):
43 | """Test markdown_to_jira method with exception."""
44 | formatting_mixin.preprocessor.markdown_to_jira.side_effect = Exception(
45 | "Conversion error"
46 | )
47 |
48 | result = formatting_mixin.markdown_to_jira("# Markdown text")
49 |
50 | assert result == "# Markdown text" # Should return original text on error
51 | formatting_mixin.preprocessor.markdown_to_jira.assert_called_once()
52 |
53 |
54 | def test_format_issue_content_basic(formatting_mixin):
55 | """Test format_issue_content method with basic inputs."""
56 | issue_key = "TEST-123"
57 | issue = {
58 | "fields": {
59 | "summary": "Test issue",
60 | "issuetype": {"name": "Bug"},
61 | "status": {"name": "Open"},
62 | }
63 | }
64 | description = "This is a test issue."
65 | comments = []
66 | created_date = "2023-01-01 12:00:00"
67 | epic_info = {"epic_key": None, "epic_name": None}
68 |
69 | result = formatting_mixin.format_issue_content(
70 | issue_key, issue, description, comments, created_date, epic_info
71 | )
72 |
73 | # Check that the result contains all the basic information
74 | assert "Issue: TEST-123" in result
75 | assert "Title: Test issue" in result
76 | assert "Type: Bug" in result
77 | assert "Status: Open" in result
78 | assert "Created: 2023-01-01 12:00:00" in result
79 | assert "Description:" in result
80 | assert "This is a test issue." in result
81 | assert "Comments:" not in result # No comments
82 |
83 |
84 | def test_format_issue_content_with_epic(formatting_mixin):
85 | """Test format_issue_content method with epic information."""
86 | issue_key = "TEST-123"
87 | issue = {
88 | "fields": {
89 | "summary": "Test issue",
90 | "issuetype": {"name": "Bug"},
91 | "status": {"name": "Open"},
92 | }
93 | }
94 | description = "This is a test issue."
95 | comments = []
96 | created_date = "2023-01-01 12:00:00"
97 | epic_info = {"epic_key": "EPIC-1", "epic_name": "Test Epic"}
98 |
99 | result = formatting_mixin.format_issue_content(
100 | issue_key, issue, description, comments, created_date, epic_info
101 | )
102 |
103 | # Check that the result contains the epic information
104 | assert "Epic: EPIC-1 - Test Epic" in result
105 |
106 |
107 | def test_format_issue_content_with_comments(formatting_mixin):
108 | """Test format_issue_content method with comments."""
109 | issue_key = "TEST-123"
110 | issue = {
111 | "fields": {
112 | "summary": "Test issue",
113 | "issuetype": {"name": "Bug"},
114 | "status": {"name": "Open"},
115 | }
116 | }
117 | description = "This is a test issue."
118 | comments = [
119 | {"created": "2023-01-02", "author": "User1", "body": "Comment 1"},
120 | {"created": "2023-01-03", "author": "User2", "body": "Comment 2"},
121 | ]
122 | created_date = "2023-01-01 12:00:00"
123 | epic_info = {"epic_key": None, "epic_name": None}
124 |
125 | result = formatting_mixin.format_issue_content(
126 | issue_key, issue, description, comments, created_date, epic_info
127 | )
128 |
129 | # Check that the result contains the comments
130 | assert "Comments:" in result
131 | assert "2023-01-02 - User1: Comment 1" in result
132 | assert "2023-01-03 - User2: Comment 2" in result
133 |
134 |
135 | def test_create_issue_metadata_basic(formatting_mixin):
136 | """Test create_issue_metadata method with basic inputs."""
137 | issue_key = "TEST-123"
138 | issue = {
139 | "fields": {
140 | "summary": "Test issue",
141 | "issuetype": {"name": "Bug"},
142 | "status": {"name": "Open"},
143 | "project": {"key": "TEST", "name": "Test Project"},
144 | }
145 | }
146 | comments = []
147 | created_date = "2023-01-01 12:00:00"
148 | epic_info = {"epic_key": None, "epic_name": None}
149 |
150 | result = formatting_mixin.create_issue_metadata(
151 | issue_key, issue, comments, created_date, epic_info
152 | )
153 |
154 | # Check that the result contains all the basic metadata
155 | assert result["key"] == "TEST-123"
156 | assert result["summary"] == "Test issue"
157 | assert result["type"] == "Bug"
158 | assert result["status"] == "Open"
159 | assert result["created"] == "2023-01-01 12:00:00"
160 | assert result["source"] == "jira"
161 | assert result["project"] == "TEST"
162 | assert result["project_name"] == "Test Project"
163 | assert result["comment_count"] == 0
164 | assert "epic_key" not in result
165 | assert "epic_name" not in result
166 |
167 |
168 | def test_create_issue_metadata_with_assignee_and_reporter(formatting_mixin):
169 | """Test create_issue_metadata method with assignee and reporter."""
170 | issue_key = "TEST-123"
171 | issue = {
172 | "fields": {
173 | "summary": "Test issue",
174 | "issuetype": {"name": "Bug"},
175 | "status": {"name": "Open"},
176 | "assignee": {"displayName": "John Doe", "name": "jdoe"},
177 | "reporter": {"displayName": "Jane Smith", "name": "jsmith"},
178 | "project": {"key": "TEST", "name": "Test Project"},
179 | }
180 | }
181 | comments = []
182 | created_date = "2023-01-01 12:00:00"
183 | epic_info = {"epic_key": None, "epic_name": None}
184 |
185 | result = formatting_mixin.create_issue_metadata(
186 | issue_key, issue, comments, created_date, epic_info
187 | )
188 |
189 | # Check that the result contains assignee and reporter
190 | assert result["assignee"] == "John Doe"
191 | assert result["reporter"] == "Jane Smith"
192 |
193 |
194 | def test_create_issue_metadata_with_priority(formatting_mixin):
195 | """Test create_issue_metadata method with priority."""
196 | issue_key = "TEST-123"
197 | issue = {
198 | "fields": {
199 | "summary": "Test issue",
200 | "issuetype": {"name": "Bug"},
201 | "status": {"name": "Open"},
202 | "priority": {"name": "High"},
203 | "project": {"key": "TEST", "name": "Test Project"},
204 | }
205 | }
206 | comments = []
207 | created_date = "2023-01-01 12:00:00"
208 | epic_info = {"epic_key": None, "epic_name": None}
209 |
210 | result = formatting_mixin.create_issue_metadata(
211 | issue_key, issue, comments, created_date, epic_info
212 | )
213 |
214 | # Check that the result contains priority
215 | assert result["priority"] == "High"
216 |
217 |
218 | def test_create_issue_metadata_with_epic(formatting_mixin):
219 | """Test create_issue_metadata method with epic information."""
220 | issue_key = "TEST-123"
221 | issue = {
222 | "fields": {
223 | "summary": "Test issue",
224 | "issuetype": {"name": "Bug"},
225 | "status": {"name": "Open"},
226 | "project": {"key": "TEST", "name": "Test Project"},
227 | }
228 | }
229 | comments = []
230 | created_date = "2023-01-01 12:00:00"
231 | epic_info = {"epic_key": "EPIC-1", "epic_name": "Test Epic"}
232 |
233 | result = formatting_mixin.create_issue_metadata(
234 | issue_key, issue, comments, created_date, epic_info
235 | )
236 |
237 | # Check that the result contains epic information
238 | assert result["epic_key"] == "EPIC-1"
239 | assert result["epic_name"] == "Test Epic"
240 |
241 |
242 | def test_extract_epic_information_no_fields(formatting_mixin):
243 | """Test extract_epic_information method with issue having no fields."""
244 | issue = {}
245 |
246 | result = formatting_mixin.extract_epic_information(issue)
247 |
248 | assert result == {"epic_key": None, "epic_name": None}
249 |
250 |
251 | def test_extract_epic_information_with_field_ids(formatting_mixin):
252 | """Test extract_epic_information method with field IDs available."""
253 | issue = {"fields": {"customfield_10001": "EPIC-1"}}
254 |
255 | # Mock get_field_ids_to_epic method
256 | field_ids = {"Epic Link": "customfield_10001", "Epic Name": "customfield_10002"}
257 | formatting_mixin.get_field_ids_to_epic = MagicMock(return_value=field_ids)
258 |
259 | # Mock get_issue method
260 | epic_issue = {"fields": {"customfield_10002": "Test Epic"}}
261 | formatting_mixin.get_issue = MagicMock(return_value=epic_issue)
262 |
263 | result = formatting_mixin.extract_epic_information(issue)
264 |
265 | assert result == {"epic_key": "EPIC-1", "epic_name": "Test Epic"}
266 | formatting_mixin.get_field_ids_to_epic.assert_called_once()
267 | formatting_mixin.get_issue.assert_called_once_with("EPIC-1")
268 |
269 |
270 | def test_extract_epic_information_get_issue_exception(formatting_mixin):
271 | """Test extract_epic_information method with get_issue exception."""
272 | issue = {"fields": {"customfield_10001": "EPIC-1"}}
273 |
274 | # Mock get_field_ids_to_epic method
275 | field_ids = {"Epic Link": "customfield_10001", "Epic Name": "customfield_10002"}
276 | formatting_mixin.get_field_ids_to_epic = MagicMock(return_value=field_ids)
277 |
278 | # Mock get_issue method to raise exception
279 | formatting_mixin.get_issue = MagicMock(side_effect=Exception("API error"))
280 |
281 | result = formatting_mixin.extract_epic_information(issue)
282 |
283 | assert result == {"epic_key": "EPIC-1", "epic_name": None}
284 | formatting_mixin.get_field_ids_to_epic.assert_called_once()
285 | formatting_mixin.get_issue.assert_called_once()
286 |
287 |
288 | def test_sanitize_html_valid(formatting_mixin):
289 | """Test sanitize_html method with valid HTML."""
290 | html_content = "<p>This is <b>bold</b> text.</p>"
291 |
292 | result = formatting_mixin.sanitize_html(html_content)
293 |
294 | assert result == "This is bold text."
295 |
296 |
297 | def test_sanitize_html_with_entities(formatting_mixin):
298 | """Test sanitize_html method with HTML entities."""
299 | html_content = "<p>This & that</p>"
300 |
301 | result = formatting_mixin.sanitize_html(html_content)
302 |
303 | assert result == "This & that"
304 |
305 |
306 | def test_sanitize_html_empty(formatting_mixin):
307 | """Test sanitize_html method with empty input."""
308 | result = formatting_mixin.sanitize_html("")
309 |
310 | assert result == ""
311 |
312 |
313 | def test_sanitize_html_exception(formatting_mixin):
314 | """Test sanitize_html method with exception."""
315 | # Mock re.sub to raise exception
316 | with patch("re.sub", side_effect=Exception("Regex error")):
317 | result = formatting_mixin.sanitize_html("<p>Test</p>")
318 |
319 | assert result == "<p>Test</p>" # Should return original on error
320 |
321 |
322 | def test_sanitize_transition_fields_basic(formatting_mixin):
323 | """Test sanitize_transition_fields method with basic fields."""
324 | fields = {"summary": "Test issue", "description": "This is a test issue."}
325 |
326 | result = formatting_mixin.sanitize_transition_fields(fields)
327 |
328 | assert result == fields
329 |
330 |
331 | def test_sanitize_transition_fields_with_assignee(formatting_mixin):
332 | """Test sanitize_transition_fields method with assignee field."""
333 | fields = {"summary": "Test issue", "assignee": "jdoe"}
334 |
335 | # Mock _get_account_id method
336 | formatting_mixin._get_account_id = MagicMock(return_value="account-123")
337 |
338 | result = formatting_mixin.sanitize_transition_fields(fields)
339 |
340 | assert result["summary"] == "Test issue"
341 | assert result["assignee"] == {"accountId": "account-123"}
342 | formatting_mixin._get_account_id.assert_called_once_with("jdoe")
343 |
344 |
345 | def test_sanitize_transition_fields_with_assignee_dict(formatting_mixin):
346 | """Test sanitize_transition_fields method with assignee as dictionary."""
347 | fields = {"summary": "Test issue", "assignee": {"accountId": "account-123"}}
348 |
349 | # Mock _get_account_id method
350 | formatting_mixin._get_account_id = MagicMock()
351 |
352 | result = formatting_mixin.sanitize_transition_fields(fields)
353 |
354 | assert result["summary"] == "Test issue"
355 | assert result["assignee"] == {"accountId": "account-123"}
356 | formatting_mixin._get_account_id.assert_not_called()
357 |
358 |
359 | def test_sanitize_transition_fields_with_reporter(formatting_mixin):
360 | """Test sanitize_transition_fields method with reporter field."""
361 | fields = {"summary": "Test issue", "reporter": "jsmith"}
362 |
363 | # Mock _get_account_id method
364 | formatting_mixin._get_account_id = MagicMock(return_value="account-456")
365 |
366 | result = formatting_mixin.sanitize_transition_fields(fields)
367 |
368 | assert result["summary"] == "Test issue"
369 | assert result["reporter"] == {"accountId": "account-456"}
370 | formatting_mixin._get_account_id.assert_called_once_with("jsmith")
371 |
372 |
373 | def test_sanitize_transition_fields_with_none_value(formatting_mixin):
374 | """Test sanitize_transition_fields method with None value."""
375 | fields = {"summary": "Test issue", "assignee": None}
376 |
377 | result = formatting_mixin.sanitize_transition_fields(fields)
378 |
379 | assert result == {"summary": "Test issue"}
380 |
381 |
382 | def test_add_comment_to_transition_data_with_comment(formatting_mixin):
383 | """Test add_comment_to_transition_data method with comment."""
384 | transition_data = {"transition": {"id": "10"}}
385 | comment = "This is a comment"
386 |
387 | # Mock markdown_to_jira method
388 | formatting_mixin.markdown_to_jira = MagicMock(return_value="Converted comment")
389 |
390 | result = formatting_mixin.add_comment_to_transition_data(transition_data, comment)
391 |
392 | assert result["transition"] == {"id": "10"}
393 | assert result["update"]["comment"][0]["add"]["body"] == "Converted comment"
394 | formatting_mixin.markdown_to_jira.assert_called_once_with("This is a comment")
395 |
396 |
397 | def test_add_comment_to_transition_data_without_comment(formatting_mixin):
398 | """Test add_comment_to_transition_data method without comment."""
399 | transition_data = {"transition": {"id": "10"}}
400 |
401 | result = formatting_mixin.add_comment_to_transition_data(transition_data, None)
402 |
403 | assert result == transition_data # Should return unmodified data
404 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/servers/main.py:
--------------------------------------------------------------------------------
```python
1 | """Main FastMCP server setup for Atlassian integration."""
2 |
3 | import logging
4 | from collections.abc import AsyncIterator
5 | from contextlib import asynccontextmanager
6 | from typing import Any, Literal, Optional
7 |
8 | from cachetools import TTLCache
9 | from fastmcp import FastMCP
10 | from fastmcp.tools import Tool as FastMCPTool
11 | from mcp.types import Tool as MCPTool
12 | from starlette.applications import Starlette
13 | from starlette.middleware import Middleware
14 | from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
15 | from starlette.requests import Request
16 | from starlette.responses import JSONResponse
17 |
18 | from mcp_atlassian.confluence import ConfluenceFetcher
19 | from mcp_atlassian.confluence.config import ConfluenceConfig
20 | from mcp_atlassian.jira import JiraFetcher
21 | from mcp_atlassian.jira.config import JiraConfig
22 | from mcp_atlassian.utils.environment import get_available_services
23 | from mcp_atlassian.utils.io import is_read_only_mode
24 | from mcp_atlassian.utils.logging import mask_sensitive
25 | from mcp_atlassian.utils.tools import get_enabled_tools, should_include_tool
26 |
27 | from .confluence import confluence_mcp
28 | from .context import MainAppContext
29 | from .jira import jira_mcp
30 |
31 | logger = logging.getLogger("mcp-atlassian.server.main")
32 |
33 |
34 | async def health_check(request: Request) -> JSONResponse:
35 | return JSONResponse({"status": "ok"})
36 |
37 |
38 | @asynccontextmanager
39 | async def main_lifespan(app: FastMCP[MainAppContext]) -> AsyncIterator[dict]:
40 | logger.info("Main Atlassian MCP server lifespan starting...")
41 | services = get_available_services()
42 | read_only = is_read_only_mode()
43 | enabled_tools = get_enabled_tools()
44 |
45 | loaded_jira_config: JiraConfig | None = None
46 | loaded_confluence_config: ConfluenceConfig | None = None
47 |
48 | if services.get("jira"):
49 | try:
50 | jira_config = JiraConfig.from_env()
51 | if jira_config.is_auth_configured():
52 | loaded_jira_config = jira_config
53 | logger.info(
54 | "Jira configuration loaded and authentication is configured."
55 | )
56 | else:
57 | logger.warning(
58 | "Jira URL found, but authentication is not fully configured. Jira tools will be unavailable."
59 | )
60 | except Exception as e:
61 | logger.error(f"Failed to load Jira configuration: {e}", exc_info=True)
62 |
63 | if services.get("confluence"):
64 | try:
65 | confluence_config = ConfluenceConfig.from_env()
66 | if confluence_config.is_auth_configured():
67 | loaded_confluence_config = confluence_config
68 | logger.info(
69 | "Confluence configuration loaded and authentication is configured."
70 | )
71 | else:
72 | logger.warning(
73 | "Confluence URL found, but authentication is not fully configured. Confluence tools will be unavailable."
74 | )
75 | except Exception as e:
76 | logger.error(f"Failed to load Confluence configuration: {e}", exc_info=True)
77 |
78 | app_context = MainAppContext(
79 | full_jira_config=loaded_jira_config,
80 | full_confluence_config=loaded_confluence_config,
81 | read_only=read_only,
82 | enabled_tools=enabled_tools,
83 | )
84 | logger.info(f"Read-only mode: {'ENABLED' if read_only else 'DISABLED'}")
85 | logger.info(f"Enabled tools filter: {enabled_tools or 'All tools enabled'}")
86 |
87 | try:
88 | yield {"app_lifespan_context": app_context}
89 | except Exception as e:
90 | logger.error(f"Error during lifespan: {e}", exc_info=True)
91 | raise
92 | finally:
93 | logger.info("Main Atlassian MCP server lifespan shutting down...")
94 | # Perform any necessary cleanup here
95 | try:
96 | # Close any open connections if needed
97 | if loaded_jira_config:
98 | logger.debug("Cleaning up Jira resources...")
99 | if loaded_confluence_config:
100 | logger.debug("Cleaning up Confluence resources...")
101 | except Exception as e:
102 | logger.error(f"Error during cleanup: {e}", exc_info=True)
103 | logger.info("Main Atlassian MCP server lifespan shutdown complete.")
104 |
105 |
106 | class AtlassianMCP(FastMCP[MainAppContext]):
107 | """Custom FastMCP server class for Atlassian integration with tool filtering."""
108 |
109 | async def _mcp_list_tools(self) -> list[MCPTool]:
110 | # Filter tools based on enabled_tools, read_only mode, and service configuration from the lifespan context.
111 | req_context = self._mcp_server.request_context
112 | if req_context is None or req_context.lifespan_context is None:
113 | logger.warning(
114 | "Lifespan context not available during _main_mcp_list_tools call."
115 | )
116 | return []
117 |
118 | lifespan_ctx_dict = req_context.lifespan_context
119 | app_lifespan_state: MainAppContext | None = (
120 | lifespan_ctx_dict.get("app_lifespan_context")
121 | if isinstance(lifespan_ctx_dict, dict)
122 | else None
123 | )
124 | read_only = (
125 | getattr(app_lifespan_state, "read_only", False)
126 | if app_lifespan_state
127 | else False
128 | )
129 | enabled_tools_filter = (
130 | getattr(app_lifespan_state, "enabled_tools", None)
131 | if app_lifespan_state
132 | else None
133 | )
134 | logger.debug(
135 | f"_main_mcp_list_tools: read_only={read_only}, enabled_tools_filter={enabled_tools_filter}"
136 | )
137 |
138 | all_tools: dict[str, FastMCPTool] = await self.get_tools()
139 | logger.debug(
140 | f"Aggregated {len(all_tools)} tools before filtering: {list(all_tools.keys())}"
141 | )
142 |
143 | filtered_tools: list[MCPTool] = []
144 | for registered_name, tool_obj in all_tools.items():
145 | tool_tags = tool_obj.tags
146 |
147 | if not should_include_tool(registered_name, enabled_tools_filter):
148 | logger.debug(f"Excluding tool '{registered_name}' (not enabled)")
149 | continue
150 |
151 | if tool_obj and read_only and "write" in tool_tags:
152 | logger.debug(
153 | f"Excluding tool '{registered_name}' due to read-only mode and 'write' tag"
154 | )
155 | continue
156 |
157 | # Exclude Jira/Confluence tools if config is not fully authenticated
158 | is_jira_tool = "jira" in tool_tags
159 | is_confluence_tool = "confluence" in tool_tags
160 | service_configured_and_available = True
161 | if app_lifespan_state:
162 | if is_jira_tool and not app_lifespan_state.full_jira_config:
163 | logger.debug(
164 | f"Excluding Jira tool '{registered_name}' as Jira configuration/authentication is incomplete."
165 | )
166 | service_configured_and_available = False
167 | if is_confluence_tool and not app_lifespan_state.full_confluence_config:
168 | logger.debug(
169 | f"Excluding Confluence tool '{registered_name}' as Confluence configuration/authentication is incomplete."
170 | )
171 | service_configured_and_available = False
172 | elif is_jira_tool or is_confluence_tool:
173 | logger.warning(
174 | f"Excluding tool '{registered_name}' as application context is unavailable to verify service configuration."
175 | )
176 | service_configured_and_available = False
177 |
178 | if not service_configured_and_available:
179 | continue
180 |
181 | filtered_tools.append(tool_obj.to_mcp_tool(name=registered_name))
182 |
183 | logger.debug(
184 | f"_main_mcp_list_tools: Total tools after filtering: {len(filtered_tools)}"
185 | )
186 | return filtered_tools
187 |
188 | def http_app(
189 | self,
190 | path: str | None = None,
191 | middleware: list[Middleware] | None = None,
192 | transport: Literal["streamable-http", "sse"] = "streamable-http",
193 | ) -> "Starlette":
194 | user_token_mw = Middleware(UserTokenMiddleware, mcp_server_ref=self)
195 | final_middleware_list = [user_token_mw]
196 | if middleware:
197 | final_middleware_list.extend(middleware)
198 | app = super().http_app(
199 | path=path, middleware=final_middleware_list, transport=transport
200 | )
201 | return app
202 |
203 |
204 | token_validation_cache: TTLCache[
205 | int, tuple[bool, str | None, JiraFetcher | None, ConfluenceFetcher | None]
206 | ] = TTLCache(maxsize=100, ttl=300)
207 |
208 |
209 | class UserTokenMiddleware(BaseHTTPMiddleware):
210 | """Middleware to extract Atlassian user tokens/credentials from Authorization headers."""
211 |
212 | def __init__(
213 | self, app: Any, mcp_server_ref: Optional["AtlassianMCP"] = None
214 | ) -> None:
215 | super().__init__(app)
216 | self.mcp_server_ref = mcp_server_ref
217 | if not self.mcp_server_ref:
218 | logger.warning(
219 | "UserTokenMiddleware initialized without mcp_server_ref. Path matching for MCP endpoint might fail if settings are needed."
220 | )
221 |
222 | async def dispatch(
223 | self, request: Request, call_next: RequestResponseEndpoint
224 | ) -> JSONResponse:
225 | logger.debug(
226 | f"UserTokenMiddleware.dispatch: ENTERED for request path='{request.url.path}', method='{request.method}'"
227 | )
228 | mcp_server_instance = self.mcp_server_ref
229 | if mcp_server_instance is None:
230 | logger.debug(
231 | "UserTokenMiddleware.dispatch: self.mcp_server_ref is None. Skipping MCP auth logic."
232 | )
233 | return await call_next(request)
234 |
235 | mcp_path = mcp_server_instance.settings.streamable_http_path.rstrip("/")
236 | request_path = request.url.path.rstrip("/")
237 | logger.debug(
238 | f"UserTokenMiddleware.dispatch: Comparing request_path='{request_path}' with mcp_path='{mcp_path}'. Request method='{request.method}'"
239 | )
240 | if request_path == mcp_path and request.method == "POST":
241 | auth_header = request.headers.get("Authorization")
242 | cloud_id_header = request.headers.get("X-Atlassian-Cloud-Id")
243 |
244 | token_for_log = mask_sensitive(
245 | auth_header.split(" ", 1)[1].strip()
246 | if auth_header and " " in auth_header
247 | else auth_header
248 | )
249 | logger.debug(
250 | f"UserTokenMiddleware: Path='{request.url.path}', AuthHeader='{mask_sensitive(auth_header)}', ParsedToken(masked)='{token_for_log}', CloudId='{cloud_id_header}'"
251 | )
252 |
253 | # Extract and save cloudId if provided
254 | if cloud_id_header and cloud_id_header.strip():
255 | request.state.user_atlassian_cloud_id = cloud_id_header.strip()
256 | logger.debug(
257 | f"UserTokenMiddleware: Extracted cloudId from header: {cloud_id_header.strip()}"
258 | )
259 | else:
260 | request.state.user_atlassian_cloud_id = None
261 | logger.debug(
262 | "UserTokenMiddleware: No cloudId header provided, will use global config"
263 | )
264 |
265 | # Check for mcp-session-id header for debugging
266 | mcp_session_id = request.headers.get("mcp-session-id")
267 | if mcp_session_id:
268 | logger.debug(
269 | f"UserTokenMiddleware: MCP-Session-ID header found: {mcp_session_id}"
270 | )
271 | if auth_header and auth_header.startswith("Bearer "):
272 | token = auth_header.split(" ", 1)[1].strip()
273 | if not token:
274 | return JSONResponse(
275 | {"error": "Unauthorized: Empty Bearer token"},
276 | status_code=401,
277 | )
278 | logger.debug(
279 | f"UserTokenMiddleware.dispatch: Bearer token extracted (masked): ...{mask_sensitive(token, 8)}"
280 | )
281 | request.state.user_atlassian_token = token
282 | request.state.user_atlassian_auth_type = "oauth"
283 | request.state.user_atlassian_email = None
284 | logger.debug(
285 | f"UserTokenMiddleware.dispatch: Set request.state (pre-validation): "
286 | f"auth_type='{getattr(request.state, 'user_atlassian_auth_type', 'N/A')}', "
287 | f"token_present={bool(getattr(request.state, 'user_atlassian_token', None))}"
288 | )
289 | elif auth_header and auth_header.startswith("Token "):
290 | token = auth_header.split(" ", 1)[1].strip()
291 | if not token:
292 | return JSONResponse(
293 | {"error": "Unauthorized: Empty Token (PAT)"},
294 | status_code=401,
295 | )
296 | logger.debug(
297 | f"UserTokenMiddleware.dispatch: PAT (Token scheme) extracted (masked): ...{mask_sensitive(token, 8)}"
298 | )
299 | request.state.user_atlassian_token = token
300 | request.state.user_atlassian_auth_type = "pat"
301 | request.state.user_atlassian_email = (
302 | None # PATs don't carry email in the token itself
303 | )
304 | logger.debug(
305 | "UserTokenMiddleware.dispatch: Set request.state for PAT auth."
306 | )
307 | elif auth_header:
308 | logger.warning(
309 | f"Unsupported Authorization type for {request.url.path}: {auth_header.split(' ', 1)[0] if ' ' in auth_header else 'UnknownType'}"
310 | )
311 | return JSONResponse(
312 | {
313 | "error": "Unauthorized: Only 'Bearer <OAuthToken>' or 'Token <PAT>' types are supported."
314 | },
315 | status_code=401,
316 | )
317 | else:
318 | logger.debug(
319 | f"No Authorization header provided for {request.url.path}. Will proceed with global/fallback server configuration if applicable."
320 | )
321 | response = await call_next(request)
322 | logger.debug(
323 | f"UserTokenMiddleware.dispatch: EXITED for request path='{request.url.path}'"
324 | )
325 | return response
326 |
327 |
328 | main_mcp = AtlassianMCP(name="Atlassian MCP", lifespan=main_lifespan)
329 | main_mcp.mount("jira", jira_mcp)
330 | main_mcp.mount("confluence", confluence_mcp)
331 |
332 |
333 | @main_mcp.custom_route("/healthz", methods=["GET"], include_in_schema=False)
334 | async def _health_check_route(request: Request) -> JSONResponse:
335 | return await health_check(request)
336 |
337 |
338 | logger.info("Added /healthz endpoint for Kubernetes probes")
339 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/users.py:
--------------------------------------------------------------------------------
```python
1 | """Module for Jira user operations."""
2 |
3 | import logging
4 | import re
5 | from typing import TYPE_CHECKING, TypeVar
6 |
7 | import requests
8 | from requests.exceptions import HTTPError
9 |
10 | from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
11 | from mcp_atlassian.models.jira.common import JiraUser
12 |
13 | from .client import JiraClient
14 |
15 | if TYPE_CHECKING:
16 | from mcp_atlassian.models.jira.common import JiraUser
17 |
18 | JiraUserType = TypeVar("JiraUserType", bound="JiraUser")
19 |
20 | logger = logging.getLogger("mcp-jira")
21 |
22 |
23 | class UsersMixin(JiraClient):
24 | """Mixin for Jira user operations."""
25 |
26 | def get_current_user_account_id(self) -> str:
27 | """
28 | Get the account ID of the current user.
29 |
30 | Returns:
31 | str: Account ID of the current user.
32 |
33 | Raises:
34 | Exception: If unable to get the current user's account ID.
35 | """
36 | if getattr(self, "_current_user_account_id", None) is not None:
37 | return self._current_user_account_id
38 |
39 | try:
40 | logger.debug(
41 | "Calling self.jira.myself() to get current user details for account ID."
42 | )
43 | myself_data = self.jira.myself()
44 |
45 | if not isinstance(myself_data, dict):
46 | error_msg = "Failed to get user data: response was not a dictionary."
47 | logger.error(
48 | f"{error_msg} Response type: {type(myself_data)}, Response: {str(myself_data)[:200]}"
49 | )
50 | raise Exception(error_msg)
51 |
52 | logger.debug(f"Received myself_data: {str(myself_data)[:500]}")
53 |
54 | account_id = None
55 | if isinstance(myself_data.get("accountId"), str):
56 | account_id = myself_data["accountId"]
57 | elif isinstance(myself_data.get("key"), str):
58 | logger.info(
59 | "Using 'key' instead of 'accountId' for Jira Data Center/Server"
60 | )
61 | account_id = myself_data["key"]
62 | elif isinstance(myself_data.get("name"), str):
63 | logger.info(
64 | "Using 'name' instead of 'accountId' for Jira Data Center/Server"
65 | )
66 | account_id = myself_data["name"]
67 |
68 | if account_id is None:
69 | error_msg = f"Could not find accountId, key, or name in user data: {str(myself_data)[:200]}"
70 | raise ValueError(error_msg)
71 |
72 | self._current_user_account_id = account_id
73 | return account_id
74 | except HTTPError as http_err:
75 | response_content = ""
76 | if http_err.response is not None:
77 | try:
78 | response_content = http_err.response.text
79 | except Exception:
80 | response_content = "(could not decode response content)"
81 | logger.error(
82 | f"HTTPError getting current user account ID: {http_err}. Response: {response_content[:500]}"
83 | )
84 | error_msg = f"Unable to get current user account ID: {http_err}"
85 | raise Exception(error_msg) from http_err
86 | except Exception as e:
87 | logger.error(f"Error getting current user account ID: {e}", exc_info=True)
88 | error_msg = f"Unable to get current user account ID: {e}"
89 | raise Exception(error_msg) from e
90 |
91 | def _get_account_id(self, assignee: str) -> str:
92 | """
93 | Get the account ID for a username or account ID.
94 |
95 | Args:
96 | assignee (str): Username or account ID.
97 |
98 | Returns:
99 | str: Account ID.
100 |
101 | Raises:
102 | ValueError: If the account ID could not be found.
103 | """
104 | # If it looks like an account ID already, return it
105 | if assignee.startswith("5") and len(assignee) >= 10:
106 | return assignee
107 |
108 | account_id = self._lookup_user_directly(assignee)
109 | if account_id:
110 | return account_id
111 |
112 | account_id = self._lookup_user_by_permissions(assignee)
113 | if account_id:
114 | return account_id
115 |
116 | error_msg = f"Could not find account ID for user: {assignee}"
117 | raise ValueError(error_msg)
118 |
119 | def _lookup_user_directly(self, username: str) -> str | None:
120 | """
121 | Look up a user account ID directly.
122 |
123 | Args:
124 | username (str): Username to look up.
125 |
126 | Returns:
127 | Optional[str]: Account ID if found, None otherwise.
128 | """
129 | try:
130 | params = {}
131 | if self.config.is_cloud:
132 | params["query"] = username
133 | else:
134 | params["username"] = username
135 |
136 | response = self.jira.user_find_by_user_string(**params, start=0, limit=1)
137 | if not isinstance(response, list):
138 | msg = f"Unexpected return value type from `jira.user_find_by_user_string`: {type(response)}"
139 | logger.error(msg)
140 | return None
141 |
142 | for user in response:
143 | if (
144 | user.get("displayName", "").lower() == username.lower()
145 | or user.get("name", "").lower() == username.lower()
146 | or user.get("emailAddress", "").lower() == username.lower()
147 | ):
148 | if self.config.is_cloud:
149 | if "accountId" in user:
150 | return user["accountId"]
151 | else:
152 | if "name" in user:
153 | logger.info(
154 | "Using 'name' for assignee field in Jira Data Center/Server"
155 | )
156 | return user["name"]
157 | elif "key" in user:
158 | logger.info(
159 | "Using 'key' as fallback for assignee name in Jira Data Center/Server"
160 | )
161 | return user["key"]
162 | return None
163 | except Exception as e:
164 | logger.info(f"Error looking up user directly: {str(e)}")
165 | return None
166 |
167 | def _lookup_user_by_permissions(self, username: str) -> str | None:
168 | """
169 | Look up a user account ID by permissions.
170 |
171 | Args:
172 | username (str): Username to look up.
173 |
174 | Returns:
175 | Optional[str]: Account ID if found, None otherwise.
176 | """
177 | try:
178 | url = f"{self.config.url}/rest/api/2/user/permission/search"
179 | params = {"query": username, "permissions": "BROWSE"}
180 |
181 | auth = None
182 | headers = {}
183 | if self.config.auth_type == "pat":
184 | headers["Authorization"] = f"Bearer {self.config.personal_token}"
185 | else:
186 | auth = (self.config.username or "", self.config.api_token or "")
187 |
188 | response = requests.get(
189 | url,
190 | params=params,
191 | auth=auth,
192 | headers=headers,
193 | verify=self.config.ssl_verify,
194 | )
195 |
196 | if response.status_code == 200:
197 | data = response.json()
198 | for user in data.get("users", []):
199 | if self.config.is_cloud:
200 | if "accountId" in user:
201 | return user["accountId"]
202 | else:
203 | if "name" in user:
204 | logger.info(
205 | "Using 'name' for assignee field in Jira Data Center/Server"
206 | )
207 | return user["name"]
208 | elif "key" in user:
209 | logger.info(
210 | "Using 'key' as fallback for assignee name in Jira Data Center/Server"
211 | )
212 | return user["key"]
213 | return None
214 | except Exception as e:
215 | logger.info(f"Error looking up user by permissions: {str(e)}")
216 | return None
217 |
218 | def _determine_user_api_params(self, identifier: str) -> dict[str, str]:
219 | """
220 | Determines the correct API parameter and value for the jira.user() call based on the identifier and instance type.
221 |
222 | Args:
223 | identifier (str): User identifier (accountId, username, key, or email).
224 |
225 | Returns:
226 | Dict[str, str]: A dictionary containing the single keyword argument for self.jira.user().
227 |
228 | Raises:
229 | ValueError: If a usable parameter cannot be determined.
230 | """
231 | api_kwargs: dict[str, str] = {}
232 |
233 | # Cloud: identifier is accountId
234 | if self.config.is_cloud and (
235 | re.match(r"^[0-9a-f]{24}$", identifier) or re.match(r"^\d+:\w+", identifier)
236 | ):
237 | api_kwargs["account_id"] = identifier
238 | logger.debug(f"Determined param: account_id='{identifier}' (Cloud)")
239 | # Server/DC: username, key, or email
240 | elif not self.config.is_cloud:
241 | if "@" in identifier:
242 | api_kwargs["username"] = identifier
243 | logger.debug(
244 | f"Determined param: username='{identifier}' (Server/DC email - might not work)"
245 | )
246 | elif "-" in identifier and any(c.isdigit() for c in identifier):
247 | api_kwargs["key"] = identifier
248 | logger.debug(f"Determined param: key='{identifier}' (Server/DC)")
249 | else:
250 | api_kwargs["username"] = identifier
251 | logger.debug(f"Determined param: username='{identifier}' (Server/DC)")
252 | # Cloud: identifier is email
253 | elif self.config.is_cloud and "@" in identifier:
254 | try:
255 | resolved_id = self._lookup_user_directly(identifier)
256 | if resolved_id and (
257 | re.match(r"^[0-9a-f]{24}$", resolved_id)
258 | or re.match(r"^\d+:\w+", resolved_id)
259 | ):
260 | api_kwargs["account_id"] = resolved_id
261 | logger.debug(
262 | f"Resolved email '{identifier}' to accountId '{resolved_id}'. Determined param: account_id (Cloud)"
263 | )
264 | else:
265 | raise ValueError(
266 | f"Could not resolve email '{identifier}' to a valid account ID for Jira Cloud."
267 | )
268 | except Exception as e:
269 | logger.warning(f"Failed to resolve email '{identifier}': {e}")
270 | raise ValueError(
271 | f"Could not resolve email '{identifier}' to a valid account ID for Jira Cloud."
272 | ) from e
273 | # Cloud: identifier is not accountId or email, try to resolve
274 | else:
275 | logger.debug(
276 | f"Identifier '{identifier}' on Cloud is not an account ID or email. Attempting resolution."
277 | )
278 | try:
279 | account_id_resolved = self._get_account_id(identifier)
280 | api_kwargs["account_id"] = account_id_resolved
281 | logger.debug(
282 | f"Resolved identifier '{identifier}' to accountId '{account_id_resolved}'. Determined param: account_id (Cloud)"
283 | )
284 | except ValueError as e:
285 | logger.error(
286 | f"Could not resolve identifier '{identifier}' to a usable format (accountId/username/key)."
287 | )
288 | raise ValueError(
289 | f"Could not determine how to look up user '{identifier}'."
290 | ) from e
291 |
292 | if not api_kwargs:
293 | logger.error(
294 | f"Logic failed to determine API parameters for identifier '{identifier}'"
295 | )
296 | raise ValueError(
297 | f"Could not determine the correct parameter to use for identifier '{identifier}'."
298 | )
299 |
300 | return api_kwargs
301 |
302 | def get_user_profile_by_identifier(self, identifier: str) -> "JiraUser":
303 | """
304 | Retrieve Jira user profile information by identifier.
305 |
306 | Args:
307 | identifier (str): User identifier (accountId, username, key, or email).
308 |
309 | Returns:
310 | JiraUser: JiraUser model with profile information.
311 |
312 | Raises:
313 | ValueError: If the user cannot be found or identifier cannot be resolved.
314 | MCPAtlassianAuthenticationError: If authentication fails.
315 | Exception: For other API errors.
316 | """
317 | api_kwargs = self._determine_user_api_params(identifier)
318 |
319 | try:
320 | logger.debug(f"Calling self.jira.user() with parameters: {api_kwargs}")
321 | user_data = self.jira.user(**api_kwargs)
322 | if not isinstance(user_data, dict):
323 | logger.error(
324 | f"User lookup for '{identifier}' returned unexpected type: {type(user_data)}. Data: {user_data}"
325 | )
326 | raise ValueError(f"User '{identifier}' not found or lookup failed.")
327 | return JiraUser.from_api_response(user_data)
328 | except HTTPError as http_err:
329 | if http_err.response is not None:
330 | response_text = http_err.response.text[:200]
331 | status_code = http_err.response.status_code
332 | if status_code == 404:
333 | raise ValueError(f"User '{identifier}' not found.") from http_err
334 | elif status_code in [401, 403]:
335 | logger.error(
336 | f"Authentication/Permission error for '{identifier}': {status_code}"
337 | )
338 | raise MCPAtlassianAuthenticationError(
339 | f"Permission denied accessing user '{identifier}'."
340 | ) from http_err
341 | else:
342 | logger.error(
343 | f"HTTP error {status_code} for '{identifier}': {http_err}. Response: {response_text}"
344 | )
345 | raise Exception(
346 | f"API error getting user profile for '{identifier}': {http_err}"
347 | ) from http_err
348 | else:
349 | logger.error(
350 | f"Network or unknown HTTP error (no response object) for '{identifier}': {http_err}"
351 | )
352 | raise Exception(
353 | f"Network error getting user profile for '{identifier}': {http_err}"
354 | ) from http_err
355 | except Exception as e:
356 | logger.exception(
357 | f"Unexpected error getting/processing user profile for '{identifier}':"
358 | )
359 | raise Exception(
360 | f"Error processing user profile for '{identifier}': {str(e)}"
361 | ) from e
362 |
```
--------------------------------------------------------------------------------
/tests/unit/utils/test_oauth_setup.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the OAuth setup utilities."""
2 |
3 | import json
4 | from unittest.mock import MagicMock, patch
5 | from urllib.parse import parse_qs, urlparse
6 |
7 | import pytest
8 |
9 | from mcp_atlassian.utils.oauth_setup import (
10 | OAuthSetupArgs,
11 | parse_redirect_uri,
12 | run_oauth_flow,
13 | run_oauth_setup,
14 | )
15 | from tests.utils.assertions import assert_config_contains
16 | from tests.utils.base import BaseAuthTest
17 | from tests.utils.mocks import MockEnvironment, MockOAuthServer
18 |
19 |
20 | class TestCallbackHandlerLogic:
21 | """Tests for URL parsing logic."""
22 |
23 | @pytest.mark.parametrize(
24 | "path,expected_params",
25 | [
26 | (
27 | "/callback?code=test-auth-code&state=test-state",
28 | {"code": ["test-auth-code"], "state": ["test-state"]},
29 | ),
30 | (
31 | "/callback?error=access_denied&error_description=User+denied+access",
32 | {"error": ["access_denied"]},
33 | ),
34 | ("/callback?state=test-state", {"state": ["test-state"]}),
35 | ("/callback", {}),
36 | ],
37 | )
38 | def test_url_parsing(self, path, expected_params):
39 | """Test URL parsing for various callback scenarios."""
40 | query = urlparse(path).query
41 | params = parse_qs(query)
42 |
43 | for key, expected_values in expected_params.items():
44 | assert key in params
45 | assert params[key] == expected_values
46 |
47 |
48 | class TestRedirectUriParsing:
49 | """Tests for redirect URI parsing functionality."""
50 |
51 | @pytest.mark.parametrize(
52 | "redirect_uri,expected_hostname,expected_port",
53 | [
54 | ("http://localhost:8080/callback", "localhost", 8080),
55 | ("https://example.com:9443/callback", "example.com", 9443),
56 | ("http://localhost/callback", "localhost", 80),
57 | ("https://example.com/callback", "example.com", 443),
58 | ("http://127.0.0.1:3000/callback", "127.0.0.1", 3000),
59 | ("https://secure.domain.com:8443/auth", "secure.domain.com", 8443),
60 | ],
61 | )
62 | def test_parse_redirect_uri(self, redirect_uri, expected_hostname, expected_port):
63 | """Test redirect URI parsing for various formats."""
64 | hostname, port = parse_redirect_uri(redirect_uri)
65 | assert hostname == expected_hostname
66 | assert port == expected_port
67 |
68 |
69 | class TestOAuthFlow:
70 | """Tests for OAuth flow orchestration."""
71 |
72 | @pytest.fixture(autouse=True)
73 | def reset_oauth_state(self):
74 | """Reset OAuth global state before each test."""
75 | import mcp_atlassian.utils.oauth_setup as oauth_module
76 |
77 | oauth_module.authorization_code = None
78 | oauth_module.authorization_state = None
79 | oauth_module.callback_received = False
80 | oauth_module.callback_error = None
81 |
82 | def test_run_oauth_flow_success_localhost(self):
83 | """Test successful OAuth flow with localhost redirect."""
84 | with MockOAuthServer.mock_oauth_flow() as mocks:
85 | with (
86 | patch(
87 | "mcp_atlassian.utils.oauth_setup.OAuthConfig"
88 | ) as mock_oauth_config,
89 | patch("mcp_atlassian.utils.oauth_setup.wait_for_callback") as mock_wait,
90 | patch(
91 | "mcp_atlassian.utils.oauth_setup.start_callback_server"
92 | ) as mock_start_server,
93 | ):
94 | # Setup global state after callback
95 | def setup_callback_state():
96 | import mcp_atlassian.utils.oauth_setup as oauth_module
97 |
98 | oauth_module.authorization_code = "test-auth-code"
99 | oauth_module.authorization_state = "test-state-token"
100 | return True
101 |
102 | mock_wait.side_effect = setup_callback_state
103 | mock_httpd = MagicMock()
104 | mock_start_server.return_value = mock_httpd
105 |
106 | # Setup OAuth config mock
107 | mock_config = MagicMock()
108 | mock_config.exchange_code_for_tokens.return_value = True
109 | mock_config.client_id = "test-client-id"
110 | mock_config.client_secret = "test-client-secret"
111 | mock_config.redirect_uri = "http://localhost:8080/callback"
112 | mock_config.scope = "read:jira-work"
113 | mock_config.cloud_id = "test-cloud-id"
114 | mock_config.access_token = "test-access-token"
115 | mock_config.refresh_token = "test-refresh-token"
116 | mock_oauth_config.return_value = mock_config
117 |
118 | args = OAuthSetupArgs(
119 | client_id="test-client-id",
120 | client_secret="test-client-secret",
121 | redirect_uri="http://localhost:8080/callback",
122 | scope="read:jira-work",
123 | )
124 |
125 | result = run_oauth_flow(args)
126 |
127 | assert result is True
128 | mock_start_server.assert_called_once_with(8080)
129 | mocks["browser"].assert_called_once()
130 | mock_config.exchange_code_for_tokens.assert_called_once_with(
131 | "test-auth-code"
132 | )
133 | mock_httpd.shutdown.assert_called_once()
134 |
135 | def test_run_oauth_flow_success_external_redirect(self):
136 | """Test successful OAuth flow with external redirect URI."""
137 | with MockOAuthServer.mock_oauth_flow() as mocks:
138 | with (
139 | patch(
140 | "mcp_atlassian.utils.oauth_setup.OAuthConfig"
141 | ) as mock_oauth_config,
142 | patch("mcp_atlassian.utils.oauth_setup.wait_for_callback") as mock_wait,
143 | patch(
144 | "mcp_atlassian.utils.oauth_setup.start_callback_server"
145 | ) as mock_start_server,
146 | ):
147 | # Setup callback state
148 | def setup_callback_state():
149 | import mcp_atlassian.utils.oauth_setup as oauth_module
150 |
151 | oauth_module.authorization_code = "test-auth-code"
152 | oauth_module.authorization_state = "test-state-token"
153 | return True
154 |
155 | mock_wait.side_effect = setup_callback_state
156 |
157 | mock_config = MagicMock()
158 | mock_config.exchange_code_for_tokens.return_value = True
159 | mock_config.client_id = "test-client-id"
160 | mock_config.client_secret = "test-client-secret"
161 | mock_config.redirect_uri = "https://example.com/callback"
162 | mock_config.scope = "read:jira-work"
163 | mock_config.cloud_id = "test-cloud-id"
164 | mock_config.access_token = "test-access-token"
165 | mock_config.refresh_token = "test-refresh-token"
166 | mock_oauth_config.return_value = mock_config
167 |
168 | args = OAuthSetupArgs(
169 | client_id="test-client-id",
170 | client_secret="test-client-secret",
171 | redirect_uri="https://example.com/callback",
172 | scope="read:jira-work",
173 | )
174 |
175 | result = run_oauth_flow(args)
176 |
177 | assert result is True
178 | mock_start_server.assert_not_called() # No local server for external redirect
179 | mocks["browser"].assert_called_once()
180 | mock_config.exchange_code_for_tokens.assert_called_once_with(
181 | "test-auth-code"
182 | )
183 |
184 | def test_run_oauth_flow_server_start_failure(self):
185 | """Test OAuth flow when server fails to start."""
186 | with MockOAuthServer.mock_oauth_flow() as mocks:
187 | with patch(
188 | "mcp_atlassian.utils.oauth_setup.start_callback_server"
189 | ) as mock_start_server:
190 | mock_start_server.side_effect = OSError("Port already in use")
191 |
192 | args = OAuthSetupArgs(
193 | client_id="test-client-id",
194 | client_secret="test-client-secret",
195 | redirect_uri="http://localhost:8080/callback",
196 | scope="read:jira-work",
197 | )
198 |
199 | result = run_oauth_flow(args)
200 | assert result is False
201 | mocks["browser"].assert_not_called()
202 |
203 | @pytest.mark.parametrize(
204 | "failure_condition,expected_result",
205 | [
206 | ("timeout", False),
207 | ("state_mismatch", False),
208 | ("token_exchange_failure", False),
209 | ],
210 | )
211 | def test_run_oauth_flow_failures(self, failure_condition, expected_result):
212 | """Test OAuth flow failure scenarios."""
213 | with MockOAuthServer.mock_oauth_flow() as mocks:
214 | with (
215 | patch(
216 | "mcp_atlassian.utils.oauth_setup.OAuthConfig"
217 | ) as mock_oauth_config,
218 | patch("mcp_atlassian.utils.oauth_setup.wait_for_callback") as mock_wait,
219 | patch(
220 | "mcp_atlassian.utils.oauth_setup.start_callback_server"
221 | ) as mock_start_server,
222 | ):
223 | mock_httpd = MagicMock()
224 | mock_start_server.return_value = mock_httpd
225 | mock_config = MagicMock()
226 | mock_oauth_config.return_value = mock_config
227 |
228 | if failure_condition == "timeout":
229 | mock_wait.return_value = False
230 | elif failure_condition == "state_mismatch":
231 |
232 | def setup_mismatched_state():
233 | import mcp_atlassian.utils.oauth_setup as oauth_module
234 |
235 | oauth_module.authorization_code = "test-auth-code"
236 | oauth_module.authorization_state = "wrong-state"
237 | return True
238 |
239 | mock_wait.side_effect = setup_mismatched_state
240 | elif failure_condition == "token_exchange_failure":
241 |
242 | def setup_callback_state():
243 | import mcp_atlassian.utils.oauth_setup as oauth_module
244 |
245 | oauth_module.authorization_code = "test-auth-code"
246 | oauth_module.authorization_state = "test-state-token"
247 | return True
248 |
249 | mock_wait.side_effect = setup_callback_state
250 | mock_config.exchange_code_for_tokens.return_value = False
251 |
252 | args = OAuthSetupArgs(
253 | client_id="test-client-id",
254 | client_secret="test-client-secret",
255 | redirect_uri="http://localhost:8080/callback",
256 | scope="read:jira-work",
257 | )
258 |
259 | result = run_oauth_flow(args)
260 | assert result == expected_result
261 | mock_httpd.shutdown.assert_called_once()
262 |
263 |
264 | class TestInteractiveSetup(BaseAuthTest):
265 | """Tests for the interactive OAuth setup wizard."""
266 |
267 | def test_run_oauth_setup_with_env_vars(self):
268 | """Test interactive setup using environment variables."""
269 | with MockEnvironment.oauth_env() as env_vars:
270 | with (
271 | patch("builtins.input", side_effect=["", "", "", ""]),
272 | patch(
273 | "mcp_atlassian.utils.oauth_setup.run_oauth_flow", return_value=True
274 | ) as mock_flow,
275 | ):
276 | result = run_oauth_setup()
277 |
278 | assert result == 0
279 | mock_flow.assert_called_once()
280 | args = mock_flow.call_args[0][0]
281 | assert_config_contains(
282 | vars(args),
283 | client_id=env_vars["ATLASSIAN_OAUTH_CLIENT_ID"],
284 | client_secret=env_vars["ATLASSIAN_OAUTH_CLIENT_SECRET"],
285 | )
286 |
287 | @pytest.mark.parametrize(
288 | "input_values,expected_result",
289 | [
290 | (
291 | [
292 | "user-client-id",
293 | "user-secret",
294 | "http://localhost:9000/callback",
295 | "read:jira-work",
296 | ],
297 | 0,
298 | ),
299 | (["", "client-secret", "", ""], 1), # Missing client ID
300 | (["client-id", "", "", ""], 1), # Missing client secret
301 | ],
302 | )
303 | def test_run_oauth_setup_user_input(self, input_values, expected_result):
304 | """Test interactive setup with various user inputs."""
305 | with MockEnvironment.clean_env():
306 | with (
307 | patch("builtins.input", side_effect=input_values),
308 | patch(
309 | "mcp_atlassian.utils.oauth_setup.run_oauth_flow", return_value=True
310 | ) as mock_flow,
311 | ):
312 | result = run_oauth_setup()
313 | assert result == expected_result
314 |
315 | if expected_result == 0:
316 | mock_flow.assert_called_once()
317 | else:
318 | mock_flow.assert_not_called()
319 |
320 | def test_run_oauth_setup_flow_failure(self):
321 | """Test interactive setup when OAuth flow fails."""
322 | with MockEnvironment.clean_env():
323 | with (
324 | patch(
325 | "builtins.input", side_effect=["client-id", "client-secret", "", ""]
326 | ),
327 | patch(
328 | "mcp_atlassian.utils.oauth_setup.run_oauth_flow", return_value=False
329 | ),
330 | ):
331 | result = run_oauth_setup()
332 | assert result == 1
333 |
334 |
335 | class TestOAuthSetupArgs:
336 | """Tests for the OAuthSetupArgs dataclass."""
337 |
338 | def test_oauth_setup_args_creation(self):
339 | """Test OAuthSetupArgs dataclass creation."""
340 | args = OAuthSetupArgs(
341 | client_id="test-id",
342 | client_secret="test-secret",
343 | redirect_uri="http://localhost:8080/callback",
344 | scope="read:jira-work",
345 | )
346 |
347 | expected_config = {
348 | "client_id": "test-id",
349 | "client_secret": "test-secret",
350 | "redirect_uri": "http://localhost:8080/callback",
351 | "scope": "read:jira-work",
352 | }
353 | assert_config_contains(vars(args), **expected_config)
354 |
355 |
356 | class TestConfigurationGeneration:
357 | """Tests for configuration output functionality."""
358 |
359 | def test_configuration_serialization(self):
360 | """Test JSON configuration serialization."""
361 | test_config = {
362 | "client_id": "test-id",
363 | "client_secret": "test-secret",
364 | "redirect_uri": "http://localhost:8080/callback",
365 | "scope": "read:jira-work",
366 | "cloud_id": "test-cloud-id",
367 | }
368 |
369 | json_str = json.dumps(test_config, indent=4)
370 | assert "test-id" in json_str
371 | assert "test-cloud-id" in json_str
372 |
373 | # Verify it can be parsed back
374 | parsed = json.loads(json_str)
375 | assert_config_contains(parsed, **test_config)
376 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/v2_adapter.py:
--------------------------------------------------------------------------------
```python
1 | """Confluence REST API v2 adapter for OAuth authentication.
2 |
3 | This module provides direct v2 API calls to handle the deprecated v1 endpoints
4 | when using OAuth authentication. The v1 endpoints have been removed for OAuth
5 | but still work for API token authentication.
6 | """
7 |
8 | import logging
9 | from typing import Any
10 |
11 | import requests
12 | from requests.exceptions import HTTPError
13 |
14 | logger = logging.getLogger("mcp-atlassian")
15 |
16 |
17 | class ConfluenceV2Adapter:
18 | """Adapter for Confluence REST API v2 operations when using OAuth."""
19 |
20 | def __init__(self, session: requests.Session, base_url: str) -> None:
21 | """Initialize the v2 adapter.
22 |
23 | Args:
24 | session: Authenticated requests session (OAuth configured)
25 | base_url: Base URL for the Confluence instance
26 | """
27 | self.session = session
28 | self.base_url = base_url
29 |
30 | def _get_space_id(self, space_key: str) -> str:
31 | """Get space ID from space key using v2 API.
32 |
33 | Args:
34 | space_key: The space key to look up
35 |
36 | Returns:
37 | The space ID
38 |
39 | Raises:
40 | ValueError: If space not found or API error
41 | """
42 | try:
43 | # Use v2 spaces endpoint to get space ID
44 | url = f"{self.base_url}/api/v2/spaces"
45 | params = {"keys": space_key}
46 |
47 | response = self.session.get(url, params=params)
48 | response.raise_for_status()
49 |
50 | data = response.json()
51 | results = data.get("results", [])
52 |
53 | if not results:
54 | raise ValueError(f"Space with key '{space_key}' not found")
55 |
56 | space_id = results[0].get("id")
57 | if not space_id:
58 | raise ValueError(f"No ID found for space '{space_key}'")
59 |
60 | return space_id
61 |
62 | except HTTPError as e:
63 | logger.error(f"HTTP error getting space ID for '{space_key}': {e}")
64 | raise ValueError(f"Failed to get space ID for '{space_key}': {e}") from e
65 | except Exception as e:
66 | logger.error(f"Error getting space ID for '{space_key}': {e}")
67 | raise ValueError(f"Failed to get space ID for '{space_key}': {e}") from e
68 |
69 | def create_page(
70 | self,
71 | space_key: str,
72 | title: str,
73 | body: str,
74 | parent_id: str | None = None,
75 | representation: str = "storage",
76 | status: str = "current",
77 | ) -> dict[str, Any]:
78 | """Create a page using the v2 API.
79 |
80 | Args:
81 | space_key: The key of the space to create the page in
82 | title: The title of the page
83 | body: The content body in the specified representation
84 | parent_id: Optional parent page ID
85 | representation: Content representation format (default: "storage")
86 | status: Page status (default: "current")
87 |
88 | Returns:
89 | The created page data from the API response
90 |
91 | Raises:
92 | ValueError: If page creation fails
93 | """
94 | try:
95 | # Get space ID from space key
96 | space_id = self._get_space_id(space_key)
97 |
98 | # Prepare request data for v2 API
99 | data = {
100 | "spaceId": space_id,
101 | "status": status,
102 | "title": title,
103 | "body": {
104 | "representation": representation,
105 | "value": body,
106 | },
107 | }
108 |
109 | # Add parent if specified
110 | if parent_id:
111 | data["parentId"] = parent_id
112 |
113 | # Make the v2 API call
114 | url = f"{self.base_url}/api/v2/pages"
115 | response = self.session.post(url, json=data)
116 | response.raise_for_status()
117 |
118 | result = response.json()
119 | logger.debug(f"Successfully created page '{title}' with v2 API")
120 |
121 | # Convert v2 response to v1-compatible format for consistency
122 | return self._convert_v2_to_v1_format(result, space_key)
123 |
124 | except HTTPError as e:
125 | logger.error(f"HTTP error creating page '{title}': {e}")
126 | if e.response is not None:
127 | logger.error(f"Response content: {e.response.text}")
128 | raise ValueError(f"Failed to create page '{title}': {e}") from e
129 | except Exception as e:
130 | logger.error(f"Error creating page '{title}': {e}")
131 | raise ValueError(f"Failed to create page '{title}': {e}") from e
132 |
133 | def _get_page_version(self, page_id: str) -> int:
134 | """Get the current version number of a page.
135 |
136 | Args:
137 | page_id: The ID of the page
138 |
139 | Returns:
140 | The current version number
141 |
142 | Raises:
143 | ValueError: If page not found or API error
144 | """
145 | try:
146 | url = f"{self.base_url}/api/v2/pages/{page_id}"
147 | params = {"body-format": "storage"}
148 |
149 | response = self.session.get(url, params=params)
150 | response.raise_for_status()
151 |
152 | data = response.json()
153 | version_number = data.get("version", {}).get("number")
154 |
155 | if version_number is None:
156 | raise ValueError(f"No version number found for page '{page_id}'")
157 |
158 | return version_number
159 |
160 | except HTTPError as e:
161 | logger.error(f"HTTP error getting page version for '{page_id}': {e}")
162 | raise ValueError(f"Failed to get page version for '{page_id}': {e}") from e
163 | except Exception as e:
164 | logger.error(f"Error getting page version for '{page_id}': {e}")
165 | raise ValueError(f"Failed to get page version for '{page_id}': {e}") from e
166 |
167 | def update_page(
168 | self,
169 | page_id: str,
170 | title: str,
171 | body: str,
172 | representation: str = "storage",
173 | version_comment: str = "",
174 | status: str = "current",
175 | ) -> dict[str, Any]:
176 | """Update a page using the v2 API.
177 |
178 | Args:
179 | page_id: The ID of the page to update
180 | title: The new title of the page
181 | body: The new content body in the specified representation
182 | representation: Content representation format (default: "storage")
183 | version_comment: Optional comment for this version
184 | status: Page status (default: "current")
185 |
186 | Returns:
187 | The updated page data from the API response
188 |
189 | Raises:
190 | ValueError: If page update fails
191 | """
192 | try:
193 | # Get current version and increment it
194 | current_version = self._get_page_version(page_id)
195 | new_version = current_version + 1
196 |
197 | # Prepare request data for v2 API
198 | data = {
199 | "id": page_id,
200 | "status": status,
201 | "title": title,
202 | "body": {
203 | "representation": representation,
204 | "value": body,
205 | },
206 | "version": {
207 | "number": new_version,
208 | },
209 | }
210 |
211 | # Add version comment if provided
212 | if version_comment:
213 | data["version"]["message"] = version_comment
214 |
215 | # Make the v2 API call
216 | url = f"{self.base_url}/api/v2/pages/{page_id}"
217 | response = self.session.put(url, json=data)
218 | response.raise_for_status()
219 |
220 | result = response.json()
221 | logger.debug(f"Successfully updated page '{title}' with v2 API")
222 |
223 | # Convert v2 response to v1-compatible format for consistency
224 | # For update, we need to extract space key from the result
225 | space_id = result.get("spaceId")
226 | space_key = self._get_space_key_from_id(space_id) if space_id else "unknown"
227 |
228 | return self._convert_v2_to_v1_format(result, space_key)
229 |
230 | except HTTPError as e:
231 | logger.error(f"HTTP error updating page '{page_id}': {e}")
232 | if e.response is not None:
233 | logger.error(f"Response content: {e.response.text}")
234 | raise ValueError(f"Failed to update page '{page_id}': {e}") from e
235 | except Exception as e:
236 | logger.error(f"Error updating page '{page_id}': {e}")
237 | raise ValueError(f"Failed to update page '{page_id}': {e}") from e
238 |
239 | def _get_space_key_from_id(self, space_id: str) -> str:
240 | """Get space key from space ID using v2 API.
241 |
242 | Args:
243 | space_id: The space ID to look up
244 |
245 | Returns:
246 | The space key
247 |
248 | Raises:
249 | ValueError: If space not found or API error
250 | """
251 | try:
252 | # Use v2 spaces endpoint to get space key
253 | url = f"{self.base_url}/api/v2/spaces/{space_id}"
254 |
255 | response = self.session.get(url)
256 | response.raise_for_status()
257 |
258 | data = response.json()
259 | space_key = data.get("key")
260 |
261 | if not space_key:
262 | raise ValueError(f"No key found for space ID '{space_id}'")
263 |
264 | return space_key
265 |
266 | except HTTPError as e:
267 | logger.error(f"HTTP error getting space key for ID '{space_id}': {e}")
268 | # Return the space_id as fallback
269 | return space_id
270 | except Exception as e:
271 | logger.error(f"Error getting space key for ID '{space_id}': {e}")
272 | # Return the space_id as fallback
273 | return space_id
274 |
275 | def get_page(
276 | self,
277 | page_id: str,
278 | expand: str | None = None,
279 | ) -> dict[str, Any]:
280 | """Get a page using the v2 API.
281 |
282 | Args:
283 | page_id: The ID of the page to retrieve
284 | expand: Fields to expand in the response (not used in v2 API, for compatibility only)
285 |
286 | Returns:
287 | The page data from the API response in v1-compatible format
288 |
289 | Raises:
290 | ValueError: If page retrieval fails
291 | """
292 | try:
293 | # Make the v2 API call to get the page
294 | url = f"{self.base_url}/api/v2/pages/{page_id}"
295 |
296 | # Convert v1 expand parameters to v2 format
297 | params = {"body-format": "storage"}
298 |
299 | response = self.session.get(url, params=params)
300 | response.raise_for_status()
301 |
302 | v2_response = response.json()
303 | logger.debug(f"Successfully retrieved page '{page_id}' with v2 API")
304 |
305 | # Get space key from space ID
306 | space_id = v2_response.get("spaceId")
307 | space_key = self._get_space_key_from_id(space_id) if space_id else "unknown"
308 |
309 | # Convert v2 response to v1-compatible format
310 | v1_compatible = self._convert_v2_to_v1_format(v2_response, space_key)
311 |
312 | # Add body.storage structure if body content exists
313 | if "body" in v2_response and v2_response["body"].get("storage"):
314 | storage_value = v2_response["body"]["storage"].get("value", "")
315 | v1_compatible["body"] = {
316 | "storage": {"value": storage_value, "representation": "storage"}
317 | }
318 |
319 | # Add space information with more details
320 | if space_id:
321 | v1_compatible["space"] = {
322 | "key": space_key,
323 | "id": space_id,
324 | }
325 |
326 | # Add version information
327 | if "version" in v2_response:
328 | v1_compatible["version"] = {
329 | "number": v2_response["version"].get("number", 1)
330 | }
331 |
332 | return v1_compatible
333 |
334 | except HTTPError as e:
335 | logger.error(f"HTTP error getting page '{page_id}': {e}")
336 | if e.response is not None:
337 | logger.error(f"Response content: {e.response.text}")
338 | raise ValueError(f"Failed to get page '{page_id}': {e}") from e
339 | except Exception as e:
340 | logger.error(f"Error getting page '{page_id}': {e}")
341 | raise ValueError(f"Failed to get page '{page_id}': {e}") from e
342 |
343 | def delete_page(self, page_id: str) -> bool:
344 | """Delete a page using the v2 API.
345 |
346 | Args:
347 | page_id: The ID of the page to delete
348 |
349 | Returns:
350 | True if the page was successfully deleted, False otherwise
351 |
352 | Raises:
353 | ValueError: If page deletion fails
354 | """
355 | try:
356 | # Make the v2 API call to delete the page
357 | url = f"{self.base_url}/api/v2/pages/{page_id}"
358 | response = self.session.delete(url)
359 | response.raise_for_status()
360 |
361 | logger.debug(f"Successfully deleted page '{page_id}' with v2 API")
362 |
363 | # Check if status code indicates success (204 No Content is typical for deletes)
364 | if response.status_code in [200, 204]:
365 | return True
366 |
367 | # If we get here, it's an unexpected success status
368 | logger.warning(
369 | f"Delete page returned unexpected status {response.status_code}"
370 | )
371 | return True
372 |
373 | except HTTPError as e:
374 | logger.error(f"HTTP error deleting page '{page_id}': {e}")
375 | if e.response is not None:
376 | logger.error(f"Response content: {e.response.text}")
377 | raise ValueError(f"Failed to delete page '{page_id}': {e}") from e
378 | except Exception as e:
379 | logger.error(f"Error deleting page '{page_id}': {e}")
380 | raise ValueError(f"Failed to delete page '{page_id}': {e}") from e
381 |
382 | def _convert_v2_to_v1_format(
383 | self, v2_response: dict[str, Any], space_key: str
384 | ) -> dict[str, Any]:
385 | """Convert v2 API response to v1-compatible format.
386 |
387 | This ensures compatibility with existing code that expects v1 response format.
388 |
389 | Args:
390 | v2_response: The response from v2 API
391 | space_key: The space key (needed since v2 response uses space ID)
392 |
393 | Returns:
394 | Response formatted like v1 API for compatibility
395 | """
396 | # Map v2 response fields to v1 format
397 | v1_compatible = {
398 | "id": v2_response.get("id"),
399 | "type": "page",
400 | "status": v2_response.get("status"),
401 | "title": v2_response.get("title"),
402 | "space": {
403 | "key": space_key,
404 | "id": v2_response.get("spaceId"),
405 | },
406 | "version": {
407 | "number": v2_response.get("version", {}).get("number", 1),
408 | },
409 | "_links": v2_response.get("_links", {}),
410 | }
411 |
412 | # Add body if present in v2 response
413 | if "body" in v2_response:
414 | v1_compatible["body"] = {
415 | "storage": {
416 | "value": v2_response["body"].get("storage", {}).get("value", ""),
417 | "representation": "storage",
418 | }
419 | }
420 |
421 | return v1_compatible
422 |
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_transitions.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the Jira Transitions mixin."""
2 |
3 | from unittest.mock import MagicMock
4 |
5 | import pytest
6 |
7 | from mcp_atlassian.jira import JiraFetcher
8 | from mcp_atlassian.jira.transitions import TransitionsMixin
9 | from mcp_atlassian.models.jira import (
10 | JiraIssue,
11 | JiraStatus,
12 | JiraStatusCategory,
13 | JiraTransition,
14 | )
15 |
16 |
17 | class TestTransitionsMixin:
18 | """Tests for the TransitionsMixin class."""
19 |
20 | @pytest.fixture
21 | def transitions_mixin(self, jira_fetcher: JiraFetcher) -> TransitionsMixin:
22 | """Create a TransitionsMixin instance with mocked dependencies."""
23 | mixin = jira_fetcher
24 |
25 | # Create a get_issue method to allow returning JiraIssue
26 | mixin.get_issue = MagicMock(
27 | return_value=JiraIssue(
28 | id="12345",
29 | key="TEST-123",
30 | summary="Test Issue",
31 | description="Issue content",
32 | status=JiraStatus(
33 | id="1",
34 | name="Open",
35 | category=JiraStatusCategory(
36 | id=1, key="open", name="To Do", color_name="blue-gray"
37 | ),
38 | ),
39 | )
40 | )
41 |
42 | # Set up mock for get_transitions_models
43 | mock_transitions = [
44 | JiraTransition(
45 | id="10",
46 | name="Start Progress",
47 | to_status=JiraStatus(id="2", name="In Progress"),
48 | )
49 | ]
50 | mixin.get_transitions_models = MagicMock(return_value=mock_transitions)
51 |
52 | return mixin
53 |
54 | def test_get_available_transitions_list_format(
55 | self, transitions_mixin: TransitionsMixin
56 | ):
57 | """Test get_available_transitions with list format response."""
58 | # Setup mock response - list format
59 | mock_transitions = [
60 | {"id": "10", "name": "In Progress", "to_status": "In Progress"},
61 | {"id": "11", "name": "Done", "status": "Done"},
62 | ]
63 | transitions_mixin.jira.get_issue_transitions.return_value = mock_transitions
64 |
65 | # Call the method
66 | result = transitions_mixin.get_available_transitions("TEST-123")
67 |
68 | # Verify
69 | assert len(result) == 2
70 | assert result[0]["id"] == "10"
71 | assert result[0]["name"] == "In Progress"
72 | assert result[0]["to_status"] == "In Progress"
73 | assert result[1]["id"] == "11"
74 | assert result[1]["name"] == "Done"
75 | assert result[1]["to_status"] == "Done"
76 |
77 | def test_get_available_transitions_empty_response(
78 | self, transitions_mixin: TransitionsMixin
79 | ):
80 | """Test get_available_transitions with empty response."""
81 | # Setup mock response - empty
82 | transitions_mixin.jira.get_issue_transitions.return_value = {}
83 |
84 | # Call the method
85 | result = transitions_mixin.get_available_transitions("TEST-123")
86 |
87 | # Verify
88 | assert isinstance(result, list)
89 | assert len(result) == 0
90 |
91 | def test_get_available_transitions_invalid_format(
92 | self, transitions_mixin: TransitionsMixin
93 | ):
94 | """Test get_available_transitions with invalid format response."""
95 | # Setup mock response - invalid format
96 | transitions_mixin.jira.get_issue_transitions.return_value = "invalid"
97 |
98 | # Call the method
99 | result = transitions_mixin.get_available_transitions("TEST-123")
100 |
101 | # Verify
102 | assert isinstance(result, list)
103 | assert len(result) == 0
104 |
105 | def test_get_available_transitions_with_error(
106 | self, transitions_mixin: TransitionsMixin
107 | ):
108 | """Test get_available_transitions error handling."""
109 | # Setup mock to raise exception
110 | transitions_mixin.jira.get_issue_transitions.side_effect = Exception(
111 | "Transition fetch error"
112 | )
113 |
114 | # Call the method and verify exception
115 | with pytest.raises(
116 | Exception, match="Error getting transitions: Transition fetch error"
117 | ):
118 | transitions_mixin.get_available_transitions("TEST-123")
119 |
120 | def test_transition_issue_basic(self, transitions_mixin: TransitionsMixin):
121 | """Test transition_issue with basic parameters."""
122 | # Call the method
123 | result = transitions_mixin.transition_issue("TEST-123", "10")
124 |
125 | # Verify
126 | transitions_mixin.jira.set_issue_status.assert_called_once_with(
127 | issue_key="TEST-123", status_name="In Progress", fields=None, update=None
128 | )
129 | transitions_mixin.get_issue.assert_called_once_with("TEST-123")
130 | assert isinstance(result, JiraIssue)
131 | assert result.key == "TEST-123"
132 | assert result.summary == "Test Issue"
133 | assert result.description == "Issue content"
134 |
135 | def test_transition_issue_with_int_id(self, transitions_mixin: TransitionsMixin):
136 | """Test transition_issue with int transition ID."""
137 | # Call the method with int ID
138 | transitions_mixin.transition_issue("TEST-123", 10)
139 |
140 | # Verify status name is used instead of ID
141 | transitions_mixin.jira.set_issue_status.assert_called_once_with(
142 | issue_key="TEST-123", status_name="In Progress", fields=None, update=None
143 | )
144 |
145 | def test_transition_issue_with_fields(self, transitions_mixin: TransitionsMixin):
146 | """Test transition_issue with fields."""
147 | # Mock _sanitize_transition_fields to return the fields
148 | transitions_mixin._sanitize_transition_fields = MagicMock(
149 | return_value={"summary": "Updated"}
150 | )
151 |
152 | # Call the method with fields
153 | fields = {"summary": "Updated"}
154 | transitions_mixin.transition_issue("TEST-123", "10", fields=fields)
155 |
156 | # Verify fields were passed correctly
157 | transitions_mixin.jira.set_issue_status.assert_called_once_with(
158 | issue_key="TEST-123",
159 | status_name="In Progress",
160 | fields={"summary": "Updated"},
161 | update=None,
162 | )
163 |
164 | def test_transition_issue_with_empty_sanitized_fields(
165 | self, transitions_mixin: TransitionsMixin
166 | ):
167 | """Test transition_issue with empty sanitized fields."""
168 | # Mock _sanitize_transition_fields to return empty dict
169 | transitions_mixin._sanitize_transition_fields = MagicMock(return_value={})
170 |
171 | # Call the method with fields that will be sanitized to empty
172 | fields = {"invalid": "field"}
173 | transitions_mixin.transition_issue("TEST-123", "10", fields=fields)
174 |
175 | # Verify fields were passed as None
176 | transitions_mixin.jira.set_issue_status.assert_called_once_with(
177 | issue_key="TEST-123", status_name="In Progress", fields=None, update=None
178 | )
179 |
180 | def test_transition_issue_with_comment(self, transitions_mixin: TransitionsMixin):
181 | """Test transition_issue with comment."""
182 | # Setup
183 | comment = "Test comment"
184 |
185 | # Define a side effect to record what's passed to _add_comment_to_transition_data
186 | def add_comment_side_effect(transition_data, comment_text):
187 | transition_data["update"] = {"comment": [{"add": {"body": comment_text}}]}
188 |
189 | # Mock _add_comment_to_transition_data
190 | transitions_mixin._add_comment_to_transition_data = MagicMock(
191 | side_effect=add_comment_side_effect
192 | )
193 |
194 | # Call the method with comment
195 | transitions_mixin.transition_issue("TEST-123", "10", comment=comment)
196 |
197 | # Verify _add_comment_to_transition_data was called
198 | transitions_mixin._add_comment_to_transition_data.assert_called_once()
199 |
200 | # Verify set_issue_status was called with the right parameters
201 | transitions_mixin.jira.set_issue_status.assert_called_once_with(
202 | issue_key="TEST-123",
203 | status_name="In Progress",
204 | fields=None,
205 | update={"comment": [{"add": {"body": comment}}]},
206 | )
207 |
208 | def test_transition_issue_with_error(self, transitions_mixin: TransitionsMixin):
209 | """Test transition_issue error handling."""
210 | # Setup mock to raise exception
211 | transitions_mixin.jira.set_issue_status.side_effect = Exception(
212 | "Transition error"
213 | )
214 |
215 | # Call the method and verify exception
216 | with pytest.raises(
217 | ValueError,
218 | match="Error transitioning issue TEST-123 with transition ID 10: Transition error",
219 | ):
220 | transitions_mixin.transition_issue("TEST-123", "10")
221 |
222 | def test_transition_issue_without_status_name(
223 | self, transitions_mixin: TransitionsMixin
224 | ):
225 | """Test transition_issue when status name is not available."""
226 | # Setup - create a transition without to_status
227 | mock_transitions = [
228 | JiraTransition(
229 | id="10",
230 | name="Start Progress",
231 | to_status=None,
232 | )
233 | ]
234 | transitions_mixin.get_transitions_models = MagicMock(
235 | return_value=mock_transitions
236 | )
237 |
238 | # Add mock for set_issue_status_by_transition_id
239 | transitions_mixin.jira.set_issue_status_by_transition_id = MagicMock()
240 |
241 | # Call the method
242 | result = transitions_mixin.transition_issue("TEST-123", "10")
243 |
244 | # Verify direct transition ID was used
245 | transitions_mixin.jira.set_issue_status_by_transition_id.assert_called_once_with(
246 | issue_key="TEST-123", transition_id=10
247 | )
248 |
249 | # Verify standard status call was not made
250 | transitions_mixin.jira.set_issue_status.assert_not_called()
251 |
252 | # Verify result
253 | transitions_mixin.get_issue.assert_called_once_with("TEST-123")
254 | assert isinstance(result, JiraIssue)
255 |
256 | def test_normalize_transition_id(self, transitions_mixin: TransitionsMixin):
257 | """Test _normalize_transition_id with various input types."""
258 | # Test with string
259 | assert transitions_mixin._normalize_transition_id("10") == 10
260 |
261 | # Test with non-digit string
262 | assert transitions_mixin._normalize_transition_id("workflow") == "workflow"
263 |
264 | # Test with int
265 | assert transitions_mixin._normalize_transition_id(10) == 10
266 |
267 | # Test with dict containing id
268 | assert transitions_mixin._normalize_transition_id({"id": "10"}) == 10
269 |
270 | # Test with dict containing int id
271 | assert transitions_mixin._normalize_transition_id({"id": 10}) == 10
272 |
273 | # Test with None
274 | assert transitions_mixin._normalize_transition_id(None) == 0
275 |
276 | def test_sanitize_transition_fields_basic(
277 | self, transitions_mixin: TransitionsMixin
278 | ):
279 | """Test _sanitize_transition_fields with basic fields."""
280 | # Simple fields
281 | fields = {"resolution": {"name": "Fixed"}, "priority": {"name": "High"}}
282 |
283 | result = transitions_mixin._sanitize_transition_fields(fields)
284 |
285 | # Fields should be passed through unchanged
286 | assert result == fields
287 |
288 | def test_sanitize_transition_fields_with_none_values(
289 | self, transitions_mixin: TransitionsMixin
290 | ):
291 | """Test _sanitize_transition_fields with None values."""
292 | # Fields with None values
293 | fields = {"resolution": {"name": "Fixed"}, "priority": None}
294 |
295 | result = transitions_mixin._sanitize_transition_fields(fields)
296 |
297 | # None values should be skipped
298 | assert "priority" not in result
299 | assert result["resolution"] == {"name": "Fixed"}
300 |
301 | def test_sanitize_transition_fields_with_assignee_and_get_account_id(
302 | self, transitions_mixin
303 | ):
304 | """Test _sanitize_transition_fields with assignee when _get_account_id is available."""
305 | # Setup mock for _get_account_id
306 | transitions_mixin._get_account_id = MagicMock(return_value="account-123")
307 |
308 | # Fields with assignee
309 | fields = {"assignee": "user.name"}
310 |
311 | result = transitions_mixin._sanitize_transition_fields(fields)
312 |
313 | # Assignee should be converted to account ID format
314 | transitions_mixin._get_account_id.assert_called_once_with("user.name")
315 | assert result["assignee"] == {"accountId": "account-123"}
316 |
317 | def test_sanitize_transition_fields_with_assignee_error(
318 | self, transitions_mixin: TransitionsMixin
319 | ):
320 | """Test _sanitize_transition_fields with assignee that causes error."""
321 | # Setup mock for _get_account_id to raise exception
322 | transitions_mixin._get_account_id = MagicMock(
323 | side_effect=Exception("User not found")
324 | )
325 |
326 | # Fields with assignee
327 | fields = {"assignee": "invalid.user", "resolution": {"name": "Fixed"}}
328 |
329 | result = transitions_mixin._sanitize_transition_fields(fields)
330 |
331 | # Assignee should be skipped due to error, resolution preserved
332 | assert "assignee" not in result
333 | assert result["resolution"] == {"name": "Fixed"}
334 |
335 | def test_add_comment_to_transition_data_with_string(
336 | self, transitions_mixin: TransitionsMixin
337 | ):
338 | """Test _add_comment_to_transition_data with string comment."""
339 | # Prepare transition data
340 | transition_data = {"transition": {"id": "10"}}
341 |
342 | # Call the method
343 | transitions_mixin._add_comment_to_transition_data(
344 | transition_data, "Test comment"
345 | )
346 |
347 | # Verify
348 | assert "update" in transition_data
349 | assert "comment" in transition_data["update"]
350 | assert len(transition_data["update"]["comment"]) == 1
351 | assert transition_data["update"]["comment"][0]["add"]["body"] == "Test comment"
352 |
353 | def test_add_comment_to_transition_data_with_non_string(
354 | self, transitions_mixin: TransitionsMixin
355 | ):
356 | """Test _add_comment_to_transition_data with non-string comment."""
357 | # Prepare transition data
358 | transition_data = {"transition": {"id": "10"}}
359 |
360 | # Call the method with int
361 | transitions_mixin._add_comment_to_transition_data(transition_data, 123)
362 |
363 | # Verify comment was converted to string
364 | assert transition_data["update"]["comment"][0]["add"]["body"] == "123"
365 |
366 | def test_add_comment_to_transition_data_with_markdown_to_jira(
367 | self, transitions_mixin
368 | ):
369 | """Test _add_comment_to_transition_data with _markdown_to_jira method."""
370 | # Add _markdown_to_jira method
371 | transitions_mixin._markdown_to_jira = MagicMock(
372 | return_value="Converted comment"
373 | )
374 |
375 | # Prepare transition data
376 | transition_data = {"transition": {"id": "10"}}
377 |
378 | # Call the method
379 | transitions_mixin._add_comment_to_transition_data(
380 | transition_data, "**Markdown** comment"
381 | )
382 |
383 | # Verify
384 | transitions_mixin._markdown_to_jira.assert_called_once_with(
385 | "**Markdown** comment"
386 | )
387 | assert (
388 | transition_data["update"]["comment"][0]["add"]["body"]
389 | == "Converted comment"
390 | )
391 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/projects.py:
--------------------------------------------------------------------------------
```python
1 | """Module for Jira project operations."""
2 |
3 | import logging
4 | from typing import Any
5 |
6 | from ..models import JiraProject
7 | from ..models.jira.search import JiraSearchResult
8 | from ..models.jira.version import JiraVersion
9 | from .client import JiraClient
10 | from .protocols import SearchOperationsProto
11 |
12 | logger = logging.getLogger("mcp-jira")
13 |
14 |
15 | class ProjectsMixin(JiraClient, SearchOperationsProto):
16 | """Mixin for Jira project operations.
17 |
18 | This mixin provides methods for retrieving and working with Jira projects,
19 | including project details, components, versions, and other project-related operations.
20 | """
21 |
22 | def get_all_projects(self, include_archived: bool = False) -> list[dict[str, Any]]:
23 | """
24 | Get all projects visible to the current user.
25 |
26 | Args:
27 | include_archived: Whether to include archived projects
28 |
29 | Returns:
30 | List of project data dictionaries
31 | """
32 | try:
33 | params = {}
34 | if include_archived:
35 | params["includeArchived"] = "true"
36 |
37 | projects = self.jira.projects(included_archived=include_archived)
38 | return projects if isinstance(projects, list) else []
39 |
40 | except Exception as e:
41 | logger.error(f"Error getting all projects: {str(e)}")
42 | return []
43 |
44 | def get_project(self, project_key: str) -> dict[str, Any] | None:
45 | """
46 | Get project information by key.
47 |
48 | Args:
49 | project_key: The project key (e.g. 'PROJ')
50 |
51 | Returns:
52 | Project data or None if not found
53 | """
54 | try:
55 | project_data = self.jira.project(project_key)
56 | if not isinstance(project_data, dict):
57 | msg = f"Unexpected return value type from `jira.project`: {type(project_data)}"
58 | logger.error(msg)
59 | raise TypeError(msg)
60 | return project_data
61 | except Exception as e:
62 | logger.warning(f"Error getting project {project_key}: {e}")
63 | return None
64 |
65 | def get_project_model(self, project_key: str) -> JiraProject | None:
66 | """
67 | Get project information as a JiraProject model.
68 |
69 | Args:
70 | project_key: The project key (e.g. 'PROJ')
71 |
72 | Returns:
73 | JiraProject model or None if not found
74 | """
75 | project_data = self.get_project(project_key)
76 | if not project_data:
77 | return None
78 |
79 | return JiraProject.from_api_response(project_data)
80 |
81 | def project_exists(self, project_key: str) -> bool:
82 | """
83 | Check if a project exists.
84 |
85 | Args:
86 | project_key: The project key to check
87 |
88 | Returns:
89 | True if the project exists, False otherwise
90 | """
91 | try:
92 | project = self.get_project(project_key)
93 | return project is not None
94 |
95 | except Exception:
96 | return False
97 |
98 | def get_project_components(self, project_key: str) -> list[dict[str, Any]]:
99 | """
100 | Get all components for a project.
101 |
102 | Args:
103 | project_key: The project key
104 |
105 | Returns:
106 | List of component data dictionaries
107 | """
108 | try:
109 | components = self.jira.get_project_components(key=project_key)
110 | return components if isinstance(components, list) else []
111 |
112 | except Exception as e:
113 | logger.error(
114 | f"Error getting components for project {project_key}: {str(e)}"
115 | )
116 | return []
117 |
118 | def get_project_versions(self, project_key: str) -> list[dict[str, Any]]:
119 | """
120 | Get all versions for a project.
121 |
122 | Args:
123 | project_key: The project key.
124 |
125 | Returns:
126 | List of version data dictionaries
127 | """
128 | try:
129 | raw_versions = self.jira.get_project_versions(key=project_key)
130 | if not isinstance(raw_versions, list):
131 | return []
132 | versions: list[dict[str, Any]] = []
133 | for v in raw_versions:
134 | ver = JiraVersion.from_api_response(v)
135 | versions.append(ver.to_simplified_dict())
136 | return versions
137 | except Exception as e:
138 | logger.error(f"Error getting versions for project {project_key}: {str(e)}")
139 | return []
140 |
141 | def get_project_roles(self, project_key: str) -> dict[str, Any]:
142 | """
143 | Get all roles for a project.
144 |
145 | Args:
146 | project_key: The project key
147 |
148 | Returns:
149 | Dictionary of role names mapped to role details
150 | """
151 | try:
152 | roles = self.jira.get_project_roles(project_key=project_key)
153 | return roles if isinstance(roles, dict) else {}
154 |
155 | except Exception as e:
156 | logger.error(f"Error getting roles for project {project_key}: {str(e)}")
157 | return {}
158 |
159 | def get_project_role_members(
160 | self, project_key: str, role_id: str
161 | ) -> list[dict[str, Any]]:
162 | """
163 | Get members assigned to a specific role in a project.
164 |
165 | Args:
166 | project_key: The project key
167 | role_id: The role ID
168 |
169 | Returns:
170 | List of role members
171 | """
172 | try:
173 | members = self.jira.get_project_actors_for_role_project(
174 | project_key=project_key, role_id=role_id
175 | )
176 | # Extract the actors from the response
177 | actors = []
178 | if isinstance(members, dict) and "actors" in members:
179 | actors = members.get("actors", [])
180 | return actors
181 |
182 | except Exception as e:
183 | logger.error(
184 | f"Error getting role members for project {project_key}, role {role_id}: {str(e)}"
185 | )
186 | return []
187 |
188 | def get_project_permission_scheme(self, project_key: str) -> dict[str, Any] | None:
189 | """
190 | Get the permission scheme for a project.
191 |
192 | Args:
193 | project_key: The project key
194 |
195 | Returns:
196 | Permission scheme data if found, None otherwise
197 | """
198 | try:
199 | scheme = self.jira.get_project_permission_scheme(
200 | project_id_or_key=project_key
201 | )
202 | if not isinstance(scheme, dict):
203 | msg = f"Unexpected return value type from `jira.get_project_permission_scheme`: {type(scheme)}"
204 | logger.error(msg)
205 | raise TypeError(msg)
206 | return scheme
207 |
208 | except Exception as e:
209 | logger.error(
210 | f"Error getting permission scheme for project {project_key}: {str(e)}"
211 | )
212 | return None
213 |
214 | def get_project_notification_scheme(
215 | self, project_key: str
216 | ) -> dict[str, Any] | None:
217 | """
218 | Get the notification scheme for a project.
219 |
220 | Args:
221 | project_key: The project key
222 |
223 | Returns:
224 | Notification scheme data if found, None otherwise
225 | """
226 | try:
227 | scheme = self.jira.get_project_notification_scheme(
228 | project_id_or_key=project_key
229 | )
230 | if not isinstance(scheme, dict):
231 | msg = f"Unexpected return value type from `jira.get_project_notification_scheme`: {type(scheme)}"
232 | logger.error(msg)
233 | raise TypeError(msg)
234 | return scheme
235 |
236 | except Exception as e:
237 | logger.error(
238 | f"Error getting notification scheme for project {project_key}: {str(e)}"
239 | )
240 | return None
241 |
242 | def get_project_issue_types(self, project_key: str) -> list[dict[str, Any]]:
243 | """
244 | Get all issue types available for a project.
245 |
246 | Args:
247 | project_key: The project key
248 |
249 | Returns:
250 | List of issue type data dictionaries
251 | """
252 | try:
253 | meta = self.jira.issue_createmeta(project=project_key)
254 | if not isinstance(meta, dict):
255 | msg = f"Unexpected return value type from `jira.issue_createmeta`: {type(meta)}"
256 | logger.error(msg)
257 | raise TypeError(msg)
258 |
259 | issue_types = []
260 | # Extract issue types from createmeta response
261 | if "projects" in meta and len(meta["projects"]) > 0:
262 | project_data = meta["projects"][0]
263 | if "issuetypes" in project_data:
264 | issue_types = project_data["issuetypes"]
265 |
266 | return issue_types
267 |
268 | except Exception as e:
269 | logger.error(
270 | f"Error getting issue types for project {project_key}: {str(e)}"
271 | )
272 | return []
273 |
274 | def get_project_issues_count(self, project_key: str) -> int:
275 | """
276 | Get the total number of issues in a project.
277 |
278 | Args:
279 | project_key: The project key
280 |
281 | Returns:
282 | Count of issues in the project
283 | """
284 | try:
285 | # Use JQL to count issues in the project
286 | jql = f'project = "{project_key}"'
287 | result = self.jira.jql(jql=jql, fields="key", limit=1)
288 | if not isinstance(result, dict):
289 | msg = f"Unexpected return value type from `jira.jql`: {type(result)}"
290 | logger.error(msg)
291 | raise TypeError(msg)
292 |
293 | # Extract total from the response
294 | total = 0
295 | if isinstance(result, dict) and "total" in result:
296 | total = result.get("total", 0)
297 |
298 | return total
299 |
300 | except Exception as e:
301 | logger.error(
302 | f"Error getting issue count for project {project_key}: {str(e)}"
303 | )
304 | return 0
305 |
306 | def get_project_issues(
307 | self, project_key: str, start: int = 0, limit: int = 50
308 | ) -> JiraSearchResult:
309 | """
310 | Get issues for a specific project.
311 |
312 | Args:
313 | project_key: The project key
314 | start: Index of the first issue to return
315 | limit: Maximum number of issues to return
316 |
317 | Returns:
318 | List of JiraIssue models representing the issues
319 | """
320 | try:
321 | # Use JQL to get issues in the project
322 | jql = f'project = "{project_key}"'
323 |
324 | return self.search_issues(jql, start=start, limit=limit)
325 |
326 | except Exception as e:
327 | logger.error(f"Error getting issues for project {project_key}: {str(e)}")
328 | return JiraSearchResult(issues=[], total=0)
329 |
330 | def get_project_keys(self) -> list[str]:
331 | """
332 | Get all project keys.
333 |
334 | Returns:
335 | List of project keys
336 | """
337 | try:
338 | projects = self.get_all_projects()
339 | project_keys: list[str] = []
340 | for project in projects:
341 | key = project.get("key")
342 | if not isinstance(key, str):
343 | msg = f"Unexpected return value type from `get_all_projects`: {type(key)}"
344 | logger.error(msg)
345 | raise TypeError(msg)
346 | project_keys.append(key)
347 | return project_keys
348 |
349 | except Exception as e:
350 | logger.error(f"Error getting project keys: {str(e)}")
351 | return []
352 |
353 | def get_project_leads(self) -> dict[str, str]:
354 | """
355 | Get all project leads mapped to their projects.
356 |
357 | Returns:
358 | Dictionary mapping project keys to lead usernames
359 | """
360 | try:
361 | projects = self.get_all_projects()
362 | leads = {}
363 |
364 | for project in projects:
365 | if "key" in project and "lead" in project:
366 | key = project.get("key")
367 | lead = project.get("lead", {})
368 |
369 | # Handle different formats of lead information
370 | lead_name = None
371 | if isinstance(lead, dict):
372 | lead_name = lead.get("name") or lead.get("displayName")
373 | elif isinstance(lead, str):
374 | lead_name = lead
375 |
376 | if key and lead_name:
377 | leads[key] = lead_name
378 |
379 | return leads
380 |
381 | except Exception as e:
382 | logger.error(f"Error getting project leads: {str(e)}")
383 | return {}
384 |
385 | def get_user_accessible_projects(self, username: str) -> list[dict[str, Any]]:
386 | """
387 | Get projects that a specific user can access.
388 |
389 | Args:
390 | username: The username to check access for
391 |
392 | Returns:
393 | List of accessible project data dictionaries
394 | """
395 | try:
396 | # This requires admin permissions
397 | # For non-admins, a different approach might be needed
398 | all_projects = self.get_all_projects()
399 | accessible_projects = []
400 |
401 | for project in all_projects:
402 | project_key = project.get("key")
403 | if not project_key:
404 | continue
405 |
406 | try:
407 | # Check if user has browse permission for this project
408 | browse_users = (
409 | self.jira.get_users_with_browse_permission_to_a_project(
410 | username=username, project_key=project_key, limit=1
411 | )
412 | )
413 |
414 | # If the user is in the list, they have access
415 | user_has_access = False
416 | if isinstance(browse_users, list):
417 | for user in browse_users:
418 | if isinstance(user, dict) and user.get("name") == username:
419 | user_has_access = True
420 | break
421 |
422 | if user_has_access:
423 | accessible_projects.append(project)
424 |
425 | except Exception:
426 | # Skip projects that cause errors
427 | continue
428 |
429 | return accessible_projects
430 |
431 | except Exception as e:
432 | logger.error(
433 | f"Error getting accessible projects for user {username}: {str(e)}"
434 | )
435 | return []
436 |
437 | def create_project_version(
438 | self,
439 | project_key: str,
440 | name: str,
441 | start_date: str = None,
442 | release_date: str = None,
443 | description: str = None,
444 | ) -> dict[str, Any]:
445 | """
446 | Create a new version in the specified Jira project.
447 |
448 | Args:
449 | project_key: The project key (e.g., 'PROJ')
450 | name: The name of the version
451 | start_date: The start date (YYYY-MM-DD, optional)
452 | release_date: The release date (YYYY-MM-DD, optional)
453 | description: Description of the version (optional)
454 |
455 | Returns:
456 | The created version object as returned by Jira
457 | """
458 | return self.create_version(
459 | project=project_key,
460 | name=name,
461 | start_date=start_date,
462 | release_date=release_date,
463 | description=description,
464 | )
465 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/utils/oauth_setup.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | OAuth 2.0 Authorization Flow Helper for MCP Atlassian
3 |
4 | This module helps with the OAuth 2.0 (3LO) authorization flow for Atlassian Cloud:
5 | 1. Opens a browser to the authorization URL
6 | 2. Starts a local server to receive the callback with the authorization code
7 | 3. Exchanges the authorization code for access and refresh tokens
8 | 4. Saves the tokens securely for later use by MCP Atlassian
9 | """
10 |
11 | import http.server
12 | import logging
13 | import os
14 | import socketserver
15 | import threading
16 | import time
17 | import urllib.parse
18 | import webbrowser
19 | from dataclasses import dataclass
20 |
21 | from ..utils.oauth import OAuthConfig
22 |
23 | # Configure logging
24 | logger = logging.getLogger("mcp-atlassian.oauth-setup")
25 |
26 | # Global variables for callback handling
27 | authorization_code = None
28 | authorization_state = None
29 | callback_received = False
30 | callback_error = None
31 |
32 |
33 | class CallbackHandler(http.server.BaseHTTPRequestHandler):
34 | """HTTP request handler for OAuth callback."""
35 |
36 | def do_GET(self) -> None: # noqa: N802
37 | """Handle GET requests (OAuth callback)."""
38 | global \
39 | authorization_code, \
40 | callback_received, \
41 | callback_error, \
42 | authorization_state
43 |
44 | # Parse the query parameters from the URL
45 | query = urllib.parse.urlparse(self.path).query
46 | params = urllib.parse.parse_qs(query)
47 |
48 | if "error" in params:
49 | callback_error = params["error"][0]
50 | callback_received = True
51 | self._send_response(f"Authorization failed: {callback_error}")
52 | return
53 |
54 | if "code" in params:
55 | authorization_code = params["code"][0]
56 | if "state" in params:
57 | authorization_state = params["state"][0]
58 | callback_received = True
59 | self._send_response(
60 | "Authorization successful! You can close this window now."
61 | )
62 | else:
63 | self._send_response(
64 | "Invalid callback: Authorization code missing", status=400
65 | )
66 |
67 | def _send_response(self, message: str, status: int = 200) -> None:
68 | """Send response to the browser."""
69 | self.send_response(status)
70 | self.send_header("Content-type", "text/html")
71 | self.end_headers()
72 |
73 | html = f"""
74 | <!DOCTYPE html>
75 | <html>
76 | <head>
77 | <title>Atlassian OAuth Authorization</title>
78 | <style>
79 | body {{
80 | font-family: Arial, sans-serif;
81 | text-align: center;
82 | padding: 40px;
83 | max-width: 600px;
84 | margin: 0 auto;
85 | }}
86 | .message {{
87 | padding: 20px;
88 | border-radius: 5px;
89 | margin-bottom: 20px;
90 | }}
91 | .success {{
92 | background-color: #d4edda;
93 | color: #155724;
94 | border: 1px solid #c3e6cb;
95 | }}
96 | .error {{
97 | background-color: #f8d7da;
98 | color: #721c24;
99 | border: 1px solid #f5c6cb;
100 | }}
101 | .countdown {{
102 | font-weight: bold;
103 | font-size: 1.2em;
104 | }}
105 | </style>
106 | </head>
107 | <body>
108 | <h1>Atlassian OAuth Authorization</h1>
109 | <div class="message {"success" if status == 200 else "error"}">
110 | <p>{message}</p>
111 | </div>
112 | <p>This window will automatically close in <span class="countdown">5</span> seconds...</p>
113 | <button onclick="window.close()">Close Window Now</button>
114 | <script>
115 | // Countdown timer
116 | var seconds = 5;
117 | var countdown = document.querySelector('.countdown');
118 | var timer = setInterval(function() {{
119 | seconds--;
120 | countdown.textContent = seconds;
121 | if (seconds <= 0) {{
122 | clearInterval(timer);
123 | // Try multiple methods to close the window
124 | window.close();
125 | // If the above doesn't work (which is often the case with modern browsers)
126 | try {{ window.open('', '_self').close(); }} catch (e) {{}}
127 | }}
128 | }}, 1000);
129 |
130 | // Force close on success after 5.5 seconds as a fallback
131 | setTimeout(function() {{
132 | // If status is 200 (success), really try hard to close
133 | if ({status} === 200) {{
134 | window.open('about:blank', '_self');
135 | window.close();
136 | }}
137 | }}, 5500);
138 | </script>
139 | </body>
140 | </html>
141 | """
142 | self.wfile.write(html.encode())
143 |
144 | # Make the server quiet
145 | def log_message(self, format: str, *args: str) -> None:
146 | return
147 |
148 |
149 | def start_callback_server(port: int) -> socketserver.TCPServer:
150 | """Start a local server to receive the OAuth callback."""
151 | handler = CallbackHandler
152 | httpd = socketserver.TCPServer(("", port), handler)
153 | server_thread = threading.Thread(target=httpd.serve_forever)
154 | server_thread.daemon = True
155 | server_thread.start()
156 | return httpd
157 |
158 |
159 | def wait_for_callback(timeout: int = 300) -> bool:
160 | """Wait for the callback to be received."""
161 | start_time = time.time()
162 | while not callback_received and (time.time() - start_time) < timeout:
163 | time.sleep(1)
164 |
165 | if not callback_received:
166 | logger.error(
167 | f"Timed out waiting for authorization callback after {timeout} seconds"
168 | )
169 | return False
170 |
171 | if callback_error:
172 | logger.error(f"Authorization error: {callback_error}")
173 | return False
174 |
175 | return True
176 |
177 |
178 | def parse_redirect_uri(redirect_uri: str) -> tuple[str, int]:
179 | """Parse the redirect URI to extract host and port."""
180 | parsed = urllib.parse.urlparse(redirect_uri)
181 | port = parsed.port or (443 if parsed.scheme == "https" else 80)
182 | return parsed.hostname, port
183 |
184 |
185 | @dataclass
186 | class OAuthSetupArgs:
187 | """Arguments for the OAuth setup flow."""
188 |
189 | client_id: str
190 | client_secret: str
191 | redirect_uri: str
192 | scope: str
193 |
194 |
195 | def run_oauth_flow(args: OAuthSetupArgs) -> bool:
196 | """Run the OAuth 2.0 authorization flow."""
197 | # Reset global state (important for multiple runs)
198 | global authorization_code, authorization_state, callback_received, callback_error
199 | authorization_code = None
200 | authorization_state = None
201 | callback_received = False
202 | callback_error = None
203 |
204 | # Create OAuth configuration
205 | oauth_config = OAuthConfig(
206 | client_id=args.client_id,
207 | client_secret=args.client_secret,
208 | redirect_uri=args.redirect_uri,
209 | scope=args.scope,
210 | )
211 |
212 | # Generate a random state for CSRF protection
213 | import secrets
214 |
215 | state = secrets.token_urlsafe(16)
216 |
217 | # Start local callback server if using localhost
218 | hostname, port = parse_redirect_uri(args.redirect_uri)
219 | httpd = None
220 |
221 | if hostname in ["localhost", "127.0.0.1"]:
222 | logger.info(f"Starting local callback server on port {port}")
223 | try:
224 | httpd = start_callback_server(port)
225 | except OSError as e:
226 | logger.error(f"Failed to start callback server: {e}")
227 | logger.error(f"Make sure port {port} is available and not in use")
228 | return False
229 |
230 | # Get the authorization URL
231 | auth_url = oauth_config.get_authorization_url(state=state)
232 |
233 | # Open the browser for authorization
234 | logger.info(f"Opening browser for authorization at {auth_url}")
235 | webbrowser.open(auth_url)
236 | logger.info(
237 | "If the browser doesn't open automatically, please visit this URL manually."
238 | )
239 |
240 | # Wait for the callback
241 | if not wait_for_callback():
242 | if httpd:
243 | httpd.shutdown()
244 | return False
245 |
246 | # Verify state to prevent CSRF attacks
247 | if authorization_state != state:
248 | logger.error("State mismatch! Possible CSRF attack.")
249 | if httpd:
250 | httpd.shutdown()
251 | return False
252 |
253 | # Exchange the code for tokens
254 | logger.info("Exchanging authorization code for tokens...")
255 | if oauth_config.exchange_code_for_tokens(authorization_code):
256 | logger.info("✅ OAuth authorization successful!")
257 | logger.info(
258 | f"Access token: {oauth_config.access_token[:10]}...{oauth_config.access_token[-5:]}"
259 | )
260 | logger.info(
261 | f"Refresh token saved: {oauth_config.refresh_token[:5]}...{oauth_config.refresh_token[-3:]}"
262 | )
263 |
264 | if oauth_config.cloud_id:
265 | logger.info(f"Cloud ID: {oauth_config.cloud_id}")
266 |
267 | # Print environment variable information more clearly
268 | logger.info("\n=== IMPORTANT: ENVIRONMENT VARIABLES ===")
269 | logger.info(
270 | "Your tokens have been securely stored in your system keyring and backup file."
271 | )
272 | logger.info(
273 | "However, to use them in your application, you need these environment variables:"
274 | )
275 | logger.info("")
276 | logger.info(
277 | "Add the following to your .env file or set as environment variables:"
278 | )
279 | logger.info("------------------------------------------------------------")
280 | logger.info(f"ATLASSIAN_OAUTH_CLIENT_ID={oauth_config.client_id}")
281 | logger.info(f"ATLASSIAN_OAUTH_CLIENT_SECRET={oauth_config.client_secret}")
282 | logger.info(f"ATLASSIAN_OAUTH_REDIRECT_URI={oauth_config.redirect_uri}")
283 | logger.info(f"ATLASSIAN_OAUTH_SCOPE={oauth_config.scope}")
284 | logger.info(f"ATLASSIAN_OAUTH_CLOUD_ID={oauth_config.cloud_id}")
285 | logger.info("------------------------------------------------------------")
286 | logger.info("")
287 | logger.info(
288 | "Note: The tokens themselves are not set as environment variables for security reasons."
289 | )
290 | logger.info(
291 | "They are stored securely in your system keyring and will be loaded automatically."
292 | )
293 | logger.info(
294 | f"Token storage location (backup): ~/.mcp-atlassian/oauth-{oauth_config.client_id}.json"
295 | )
296 |
297 | # Generate VS Code configuration JSON snippet
298 | import json
299 |
300 | vscode_config = {
301 | "mcpServers": {
302 | "mcp-atlassian": {
303 | "command": "docker",
304 | "args": [
305 | "run",
306 | "--rm",
307 | "-i",
308 | "-p",
309 | "8080:8080",
310 | "-e",
311 | "CONFLUENCE_URL",
312 | "-e",
313 | "JIRA_URL",
314 | "-e",
315 | "ATLASSIAN_OAUTH_CLIENT_ID",
316 | "-e",
317 | "ATLASSIAN_OAUTH_CLIENT_SECRET",
318 | "-e",
319 | "ATLASSIAN_OAUTH_REDIRECT_URI",
320 | "-e",
321 | "ATLASSIAN_OAUTH_SCOPE",
322 | "-e",
323 | "ATLASSIAN_OAUTH_CLOUD_ID",
324 | "ghcr.io/sooperset/mcp-atlassian:latest",
325 | ],
326 | "env": {
327 | "CONFLUENCE_URL": "https://your-company.atlassian.net/wiki",
328 | "JIRA_URL": "https://your-company.atlassian.net",
329 | "ATLASSIAN_OAUTH_CLIENT_ID": oauth_config.client_id,
330 | "ATLASSIAN_OAUTH_CLIENT_SECRET": oauth_config.client_secret,
331 | "ATLASSIAN_OAUTH_REDIRECT_URI": oauth_config.redirect_uri,
332 | "ATLASSIAN_OAUTH_SCOPE": oauth_config.scope,
333 | "ATLASSIAN_OAUTH_CLOUD_ID": oauth_config.cloud_id,
334 | },
335 | }
336 | }
337 | }
338 |
339 | # Pretty print the VS Code configuration JSON
340 | vscode_json = json.dumps(vscode_config, indent=4)
341 |
342 | logger.info("\n=== VS CODE CONFIGURATION ===")
343 | logger.info("Add the following to your VS Code settings.json file:")
344 | logger.info("------------------------------------------------------------")
345 | logger.info(vscode_json)
346 | logger.info("------------------------------------------------------------")
347 | logger.info(
348 | "\nNote: If you already have an 'mcp' configuration in settings.json, merge this with your existing configuration."
349 | )
350 | else:
351 | logger.error("Failed to obtain cloud ID!")
352 |
353 | if httpd:
354 | httpd.shutdown()
355 | return True
356 | else:
357 | logger.error("Failed to exchange authorization code for tokens")
358 | if httpd:
359 | httpd.shutdown()
360 | return False
361 |
362 |
363 | def _prompt_for_input(prompt: str, env_var: str = None, is_secret: bool = False) -> str:
364 | """Prompt the user for input."""
365 | value = os.getenv(env_var, "") if env_var else ""
366 | if value:
367 | if is_secret:
368 | masked = (
369 | value[:3] + "*" * (len(value) - 6) + value[-3:]
370 | if len(value) > 6
371 | else "****"
372 | )
373 | print(f"{prompt} [{masked}]: ", end="")
374 | else:
375 | print(f"{prompt} [{value}]: ", end="")
376 | user_input = input()
377 | return user_input if user_input else value
378 | else:
379 | print(f"{prompt}: ", end="")
380 | return input()
381 |
382 |
383 | def run_oauth_setup() -> int:
384 | """Run the OAuth 2.0 setup wizard interactively."""
385 | print("\n=== Atlassian OAuth 2.0 Setup Wizard ===")
386 | print(
387 | "This wizard will guide you through setting up OAuth 2.0 authentication for MCP Atlassian."
388 | )
389 | print("\nYou need to have created an OAuth 2.0 app in your Atlassian account.")
390 | print("You can create one at: https://developer.atlassian.com/console/myapps/")
391 | print("\nPlease provide the following information:\n")
392 |
393 | # Check for environment variables first
394 | client_id = _prompt_for_input("OAuth Client ID", "ATLASSIAN_OAUTH_CLIENT_ID")
395 |
396 | client_secret = _prompt_for_input(
397 | "OAuth Client Secret", "ATLASSIAN_OAUTH_CLIENT_SECRET", is_secret=True
398 | )
399 |
400 | default_redirect = os.getenv(
401 | "ATLASSIAN_OAUTH_REDIRECT_URI", "http://localhost:8080/callback"
402 | )
403 | redirect_uri = (
404 | _prompt_for_input("OAuth Redirect URI", "ATLASSIAN_OAUTH_REDIRECT_URI")
405 | or default_redirect
406 | )
407 |
408 | default_scope = os.getenv(
409 | "ATLASSIAN_OAUTH_SCOPE",
410 | "read:jira-work write:jira-work read:confluence-space.summary offline_access",
411 | )
412 | scope = (
413 | _prompt_for_input("OAuth Scopes (space-separated)", "ATLASSIAN_OAUTH_SCOPE")
414 | or default_scope
415 | )
416 |
417 | # Validate required arguments
418 | if not client_id:
419 | logger.error("OAuth Client ID is required")
420 | return 1
421 | if not client_secret:
422 | logger.error("OAuth Client Secret is required")
423 | return 1
424 |
425 | # Run the OAuth flow
426 | args = OAuthSetupArgs(
427 | client_id=client_id,
428 | client_secret=client_secret,
429 | redirect_uri=redirect_uri,
430 | scope=scope,
431 | )
432 |
433 | success = run_oauth_flow(args)
434 | return 0 if success else 1
435 |
```