This is page 4 of 13. Use http://codebase.md/sooperset/mcp-atlassian?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .devcontainer
│ ├── devcontainer.json
│ ├── Dockerfile
│ ├── post-create.sh
│ └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ └── feature_request.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-publish.yml
│ ├── lint.yml
│ ├── publish.yml
│ ├── stale.yml
│ └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│ ├── oauth_authorize.py
│ └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│ └── mcp_atlassian
│ ├── __init__.py
│ ├── confluence
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── labels.py
│ │ ├── pages.py
│ │ ├── search.py
│ │ ├── spaces.py
│ │ ├── users.py
│ │ ├── utils.py
│ │ └── v2_adapter.py
│ ├── exceptions.py
│ ├── jira
│ │ ├── __init__.py
│ │ ├── attachments.py
│ │ ├── boards.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── epics.py
│ │ ├── fields.py
│ │ ├── formatting.py
│ │ ├── issues.py
│ │ ├── links.py
│ │ ├── projects.py
│ │ ├── protocols.py
│ │ ├── search.py
│ │ ├── sprints.py
│ │ ├── transitions.py
│ │ ├── users.py
│ │ └── worklog.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── comment.py
│ │ │ ├── common.py
│ │ │ ├── label.py
│ │ │ ├── page.py
│ │ │ ├── search.py
│ │ │ ├── space.py
│ │ │ └── user_search.py
│ │ ├── constants.py
│ │ └── jira
│ │ ├── __init__.py
│ │ ├── agile.py
│ │ ├── comment.py
│ │ ├── common.py
│ │ ├── issue.py
│ │ ├── link.py
│ │ ├── project.py
│ │ ├── search.py
│ │ ├── version.py
│ │ ├── workflow.py
│ │ └── worklog.py
│ ├── preprocessing
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence.py
│ │ └── jira.py
│ ├── servers
│ │ ├── __init__.py
│ │ ├── confluence.py
│ │ ├── context.py
│ │ ├── dependencies.py
│ │ ├── jira.py
│ │ └── main.py
│ └── utils
│ ├── __init__.py
│ ├── date.py
│ ├── decorators.py
│ ├── env.py
│ ├── environment.py
│ ├── io.py
│ ├── lifecycle.py
│ ├── logging.py
│ ├── oauth_setup.py
│ ├── oauth.py
│ ├── ssl.py
│ ├── tools.py
│ └── urls.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── fixtures
│ │ ├── __init__.py
│ │ ├── confluence_mocks.py
│ │ └── jira_mocks.py
│ ├── integration
│ │ ├── conftest.py
│ │ ├── README.md
│ │ ├── test_authentication.py
│ │ ├── test_content_processing.py
│ │ ├── test_cross_service.py
│ │ ├── test_mcp_protocol.py
│ │ ├── test_proxy.py
│ │ ├── test_real_api.py
│ │ ├── test_ssl_verification.py
│ │ ├── test_stdin_monitoring_fix.py
│ │ └── test_transport_lifecycle.py
│ ├── README.md
│ ├── test_preprocessing.py
│ ├── test_real_api_validation.py
│ ├── unit
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_labels.py
│ │ │ ├── test_pages.py
│ │ │ ├── test_search.py
│ │ │ ├── test_spaces.py
│ │ │ ├── test_users.py
│ │ │ ├── test_utils.py
│ │ │ └── test_v2_adapter.py
│ │ ├── jira
│ │ │ ├── conftest.py
│ │ │ ├── test_attachments.py
│ │ │ ├── test_boards.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_epics.py
│ │ │ ├── test_fields.py
│ │ │ ├── test_formatting.py
│ │ │ ├── test_issues_markdown.py
│ │ │ ├── test_issues.py
│ │ │ ├── test_links.py
│ │ │ ├── test_projects.py
│ │ │ ├── test_protocols.py
│ │ │ ├── test_search.py
│ │ │ ├── test_sprints.py
│ │ │ ├── test_transitions.py
│ │ │ ├── test_users.py
│ │ │ └── test_worklog.py
│ │ ├── models
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_base_models.py
│ │ │ ├── test_confluence_models.py
│ │ │ ├── test_constants.py
│ │ │ └── test_jira_models.py
│ │ ├── servers
│ │ │ ├── __init__.py
│ │ │ ├── test_confluence_server.py
│ │ │ ├── test_context.py
│ │ │ ├── test_dependencies.py
│ │ │ ├── test_jira_server.py
│ │ │ └── test_main_server.py
│ │ ├── test_exceptions.py
│ │ ├── test_main_transport_selection.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── test_custom_headers.py
│ │ ├── test_date.py
│ │ ├── test_decorators.py
│ │ ├── test_env.py
│ │ ├── test_environment.py
│ │ ├── test_io.py
│ │ ├── test_lifecycle.py
│ │ ├── test_logging.py
│ │ ├── test_masking.py
│ │ ├── test_oauth_setup.py
│ │ ├── test_oauth.py
│ │ ├── test_ssl.py
│ │ ├── test_tools.py
│ │ └── test_urls.py
│ └── utils
│ ├── __init__.py
│ ├── assertions.py
│ ├── base.py
│ ├── factories.py
│ └── mocks.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/tests/unit/jira/test_client.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the Jira client module."""
2 |
3 | import os
4 | from copy import deepcopy
5 | from typing import Literal
6 | from unittest.mock import MagicMock, call, patch
7 |
8 | import pytest
9 |
10 | from mcp_atlassian.jira.client import JiraClient
11 | from mcp_atlassian.jira.config import JiraConfig
12 |
13 |
14 | class DeepcopyMock(MagicMock):
15 | """A Mock that creates a deep copy of its arguments before storing them."""
16 |
17 | def __call__(self, /, *args, **kwargs):
18 | args = deepcopy(args)
19 | kwargs = deepcopy(kwargs)
20 | return super().__call__(*args, **kwargs)
21 |
22 |
23 | def test_init_with_basic_auth():
24 | """Test initializing the client with basic auth configuration."""
25 | with (
26 | patch("mcp_atlassian.jira.client.Jira") as mock_jira,
27 | patch(
28 | "mcp_atlassian.jira.client.configure_ssl_verification"
29 | ) as mock_configure_ssl,
30 | ):
31 | config = JiraConfig(
32 | url="https://test.atlassian.net",
33 | auth_type="basic",
34 | username="test_username",
35 | api_token="test_token",
36 | )
37 |
38 | client = JiraClient(config=config)
39 |
40 | # Verify Jira was initialized correctly
41 | mock_jira.assert_called_once_with(
42 | url="https://test.atlassian.net",
43 | username="test_username",
44 | password="test_token",
45 | cloud=True,
46 | verify_ssl=True,
47 | )
48 |
49 | # Verify SSL verification was configured
50 | mock_configure_ssl.assert_called_once_with(
51 | service_name="Jira",
52 | url="https://test.atlassian.net",
53 | session=mock_jira.return_value._session,
54 | ssl_verify=True,
55 | )
56 |
57 | assert client.config == config
58 | assert client._field_ids_cache is None
59 | assert client._current_user_account_id is None
60 |
61 |
62 | def test_init_with_token_auth():
63 | """Test initializing the client with token auth configuration."""
64 | with (
65 | patch("mcp_atlassian.jira.client.Jira") as mock_jira,
66 | patch(
67 | "mcp_atlassian.jira.client.configure_ssl_verification"
68 | ) as mock_configure_ssl,
69 | ):
70 | config = JiraConfig(
71 | url="https://jira.example.com",
72 | auth_type="pat",
73 | personal_token="test_personal_token",
74 | ssl_verify=False,
75 | )
76 |
77 | client = JiraClient(config=config)
78 |
79 | # Verify Jira was initialized correctly
80 | mock_jira.assert_called_once_with(
81 | url="https://jira.example.com",
82 | token="test_personal_token",
83 | cloud=False,
84 | verify_ssl=False,
85 | )
86 |
87 | # Verify SSL verification was configured with ssl_verify=False
88 | mock_configure_ssl.assert_called_once_with(
89 | service_name="Jira",
90 | url="https://jira.example.com",
91 | session=mock_jira.return_value._session,
92 | ssl_verify=False,
93 | )
94 |
95 | assert client.config == config
96 |
97 |
98 | def test_init_from_env():
99 | """Test initializing the client from environment variables."""
100 | with (
101 | patch("mcp_atlassian.jira.config.JiraConfig.from_env") as mock_from_env,
102 | patch("mcp_atlassian.jira.client.Jira") as mock_jira,
103 | patch("mcp_atlassian.jira.client.configure_ssl_verification"),
104 | ):
105 | mock_config = MagicMock()
106 | mock_config.auth_type = "basic" # needed for the if condition
107 | mock_from_env.return_value = mock_config
108 |
109 | client = JiraClient()
110 |
111 | mock_from_env.assert_called_once()
112 | assert client.config == mock_config
113 |
114 |
115 | def test_clean_text():
116 | """Test the _clean_text method."""
117 | with (
118 | patch("mcp_atlassian.jira.client.Jira"),
119 | patch("mcp_atlassian.jira.client.configure_ssl_verification"),
120 | ):
121 | client = JiraClient(
122 | config=JiraConfig(
123 | url="https://test.atlassian.net",
124 | auth_type="basic",
125 | username="test_username",
126 | api_token="test_token",
127 | )
128 | )
129 |
130 | # Test with HTML
131 | assert client._clean_text("<p>Test content</p>") == "Test content"
132 |
133 | # Test with empty string
134 | assert client._clean_text("") == ""
135 |
136 | # Test with spaces and newlines
137 | assert client._clean_text(" \n Test with spaces \n ") == "Test with spaces"
138 |
139 |
140 | def _test_get_paged(method: Literal["get", "post"]):
141 | """Test the get_paged method."""
142 | with (
143 | patch(
144 | "mcp_atlassian.jira.client.Jira.get", new_callable=DeepcopyMock
145 | ) as mock_get,
146 | patch(
147 | "mcp_atlassian.jira.client.Jira.post", new_callable=DeepcopyMock
148 | ) as mock_post,
149 | patch("mcp_atlassian.jira.client.configure_ssl_verification"),
150 | ):
151 | config = JiraConfig(
152 | url="https://test.atlassian.net",
153 | auth_type="basic",
154 | username="test_username",
155 | api_token="test_token",
156 | )
157 | client = JiraClient(config=config)
158 |
159 | # Mock paged responses
160 | mock_responses = [
161 | {"data": "page1", "nextPageToken": "token1"},
162 | {"data": "page2", "nextPageToken": "token2"},
163 | {"data": "page3"}, # Last page does not have nextPageToken
164 | ]
165 |
166 | # Create mock method with side effect to return responses in sequence
167 | if method == "get":
168 | mock_get.side_effect = mock_responses
169 | mock_post.side_effect = RuntimeError("This should not be called")
170 | else:
171 | mock_post.side_effect = mock_responses
172 | mock_get.side_effect = RuntimeError("This should not be called")
173 |
174 | # Run the method
175 | params = {"initial": "params"}
176 | results = client.get_paged(method, "/test/url", params)
177 |
178 | # Verify the results
179 | assert results == mock_responses
180 |
181 | # Verify call parameters
182 | if method == "get":
183 | expected_calls = [
184 | call(path="/test/url", params={"initial": "params"}, absolute=False),
185 | call(
186 | path="/test/url",
187 | params={"initial": "params", "nextPageToken": "token1"},
188 | absolute=False,
189 | ),
190 | call(
191 | path="/test/url",
192 | params={"initial": "params", "nextPageToken": "token2"},
193 | absolute=False,
194 | ),
195 | ]
196 | assert mock_get.call_args_list == expected_calls
197 | else:
198 | expected_calls = [
199 | call(path="/test/url", json={"initial": "params"}, absolute=False),
200 | call(
201 | path="/test/url",
202 | json={"initial": "params", "nextPageToken": "token1"},
203 | absolute=False,
204 | ),
205 | call(
206 | path="/test/url",
207 | json={"initial": "params", "nextPageToken": "token2"},
208 | absolute=False,
209 | ),
210 | ]
211 | assert mock_post.call_args_list == expected_calls
212 |
213 |
214 | def test_get_paged_get():
215 | """Test the get_paged method for GET requests."""
216 | _test_get_paged("get")
217 |
218 |
219 | def test_get_paged_post():
220 | """Test the get_paged method for POST requests."""
221 | _test_get_paged("post")
222 |
223 |
224 | def test_get_paged_without_cloud():
225 | """Test the get_paged method without cloud."""
226 | with patch("mcp_atlassian.jira.client.configure_ssl_verification"):
227 | config = JiraConfig(
228 | url="https://jira.example.com",
229 | auth_type="pat",
230 | personal_token="test_token",
231 | )
232 | client = JiraClient(config=config)
233 | with pytest.raises(
234 | ValueError,
235 | match="Paged requests are only available for Jira Cloud platform",
236 | ):
237 | client.get_paged("get", "/test/url")
238 |
239 |
240 | def test_init_sets_proxies_and_no_proxy(monkeypatch):
241 | """Test that JiraClient sets session proxies and NO_PROXY env var from config."""
242 | # Patch Jira and its _session
243 | mock_jira = MagicMock()
244 | mock_session = MagicMock()
245 | mock_session.proxies = {} # Use a real dict for proxies
246 | mock_jira._session = mock_session
247 | monkeypatch.setattr("mcp_atlassian.jira.client.Jira", lambda **kwargs: mock_jira)
248 | monkeypatch.setattr(
249 | "mcp_atlassian.jira.client.configure_ssl_verification", lambda **kwargs: None
250 | )
251 |
252 | # Patch environment
253 | monkeypatch.setenv("NO_PROXY", "")
254 |
255 | config = JiraConfig(
256 | url="https://test.atlassian.net",
257 | auth_type="basic",
258 | username="user",
259 | api_token="pat",
260 | http_proxy="http://proxy:8080",
261 | https_proxy="https://proxy:8443",
262 | socks_proxy="socks5://user:pass@proxy:1080",
263 | no_proxy="localhost,127.0.0.1",
264 | )
265 | client = JiraClient(config=config)
266 | assert mock_session.proxies["http"] == "http://proxy:8080"
267 | assert mock_session.proxies["https"] == "https://proxy:8443"
268 | assert mock_session.proxies["socks"] == "socks5://user:pass@proxy:1080"
269 | assert os.environ["NO_PROXY"] == "localhost,127.0.0.1"
270 |
271 |
272 | def test_init_no_proxies(monkeypatch):
273 | """Test that JiraClient does not set proxies if not configured."""
274 | # Patch Jira and its _session
275 | mock_jira = MagicMock()
276 | mock_session = MagicMock()
277 | mock_session.proxies = {} # Use a real dict for proxies
278 | mock_jira._session = mock_session
279 | monkeypatch.setattr("mcp_atlassian.jira.client.Jira", lambda **kwargs: mock_jira)
280 | monkeypatch.setattr(
281 | "mcp_atlassian.jira.client.configure_ssl_verification", lambda **kwargs: None
282 | )
283 |
284 | config = JiraConfig(
285 | url="https://test.atlassian.net",
286 | auth_type="basic",
287 | username="user",
288 | api_token="pat",
289 | )
290 | client = JiraClient(config=config)
291 | assert mock_session.proxies == {}
292 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/attachments.py:
--------------------------------------------------------------------------------
```python
1 | """Attachment operations for Jira API."""
2 |
3 | import logging
4 | import os
5 | from pathlib import Path
6 | from typing import Any
7 |
8 | from ..models.jira import JiraAttachment
9 | from .client import JiraClient
10 | from .protocols import AttachmentsOperationsProto
11 |
12 | # Configure logging
13 | logger = logging.getLogger("mcp-jira")
14 |
15 |
16 | class AttachmentsMixin(JiraClient, AttachmentsOperationsProto):
17 | """Mixin for Jira attachment operations."""
18 |
19 | def download_attachment(self, url: str, target_path: str) -> bool:
20 | """
21 | Download a Jira attachment to the specified path.
22 |
23 | Args:
24 | url: The URL of the attachment to download
25 | target_path: The path where the attachment should be saved
26 |
27 | Returns:
28 | True if successful, False otherwise
29 | """
30 | if not url:
31 | logger.error("No URL provided for attachment download")
32 | return False
33 |
34 | try:
35 | # Convert to absolute path if relative
36 | if not os.path.isabs(target_path):
37 | target_path = os.path.abspath(target_path)
38 |
39 | logger.info(f"Downloading attachment from {url} to {target_path}")
40 |
41 | # Create the directory if it doesn't exist
42 | os.makedirs(os.path.dirname(target_path), exist_ok=True)
43 |
44 | # Use the Jira session to download the file
45 | response = self.jira._session.get(url, stream=True)
46 | response.raise_for_status()
47 |
48 | # Write the file to disk
49 | with open(target_path, "wb") as f:
50 | for chunk in response.iter_content(chunk_size=8192):
51 | f.write(chunk)
52 |
53 | # Verify the file was created
54 | if os.path.exists(target_path):
55 | file_size = os.path.getsize(target_path)
56 | logger.info(
57 | f"Successfully downloaded attachment to {target_path} (size: {file_size} bytes)"
58 | )
59 | return True
60 | else:
61 | logger.error(f"File was not created at {target_path}")
62 | return False
63 |
64 | except Exception as e:
65 | logger.error(f"Error downloading attachment: {str(e)}")
66 | return False
67 |
68 | def download_issue_attachments(
69 | self, issue_key: str, target_dir: str
70 | ) -> dict[str, Any]:
71 | """
72 | Download all attachments for a Jira issue.
73 |
74 | Args:
75 | issue_key: The Jira issue key (e.g., 'PROJ-123')
76 | target_dir: The directory where attachments should be saved
77 |
78 | Returns:
79 | A dictionary with download results
80 | """
81 | # Convert to absolute path if relative
82 | if not os.path.isabs(target_dir):
83 | target_dir = os.path.abspath(target_dir)
84 |
85 | logger.info(
86 | f"Downloading attachments for {issue_key} to directory: {target_dir}"
87 | )
88 |
89 | # Create the target directory if it doesn't exist
90 | target_path = Path(target_dir)
91 | target_path.mkdir(parents=True, exist_ok=True)
92 |
93 | # Get the issue with attachments
94 | logger.info(f"Fetching issue {issue_key} with attachments")
95 | issue_data = self.jira.issue(issue_key, fields="attachment")
96 |
97 | if not isinstance(issue_data, dict):
98 | msg = f"Unexpected return value type from `jira.issue`: {type(issue_data)}"
99 | logger.error(msg)
100 | raise TypeError(msg)
101 |
102 | if "fields" not in issue_data:
103 | logger.error(f"Could not retrieve issue {issue_key}")
104 | return {"success": False, "error": f"Could not retrieve issue {issue_key}"}
105 |
106 | # Process attachments
107 | attachments = []
108 | results = []
109 |
110 | # Extract attachments from the API response
111 | attachment_data = issue_data.get("fields", {}).get("attachment", [])
112 |
113 | if not attachment_data:
114 | return {
115 | "success": True,
116 | "message": f"No attachments found for issue {issue_key}",
117 | "downloaded": [],
118 | "failed": [],
119 | }
120 |
121 | # Create JiraAttachment objects for each attachment
122 | for attachment in attachment_data:
123 | if isinstance(attachment, dict):
124 | attachments.append(JiraAttachment.from_api_response(attachment))
125 |
126 | # Download each attachment
127 | downloaded = []
128 | failed = []
129 |
130 | for attachment in attachments:
131 | if not attachment.url:
132 | logger.warning(f"No URL for attachment {attachment.filename}")
133 | failed.append(
134 | {"filename": attachment.filename, "error": "No URL available"}
135 | )
136 | continue
137 |
138 | # Create a safe filename
139 | safe_filename = Path(attachment.filename).name
140 | file_path = target_path / safe_filename
141 |
142 | # Download the attachment
143 | success = self.download_attachment(attachment.url, str(file_path))
144 |
145 | if success:
146 | downloaded.append(
147 | {
148 | "filename": attachment.filename,
149 | "path": str(file_path),
150 | "size": attachment.size,
151 | }
152 | )
153 | else:
154 | failed.append(
155 | {"filename": attachment.filename, "error": "Download failed"}
156 | )
157 |
158 | return {
159 | "success": True,
160 | "issue_key": issue_key,
161 | "total": len(attachments),
162 | "downloaded": downloaded,
163 | "failed": failed,
164 | }
165 |
166 | def upload_attachment(self, issue_key: str, file_path: str) -> dict[str, Any]:
167 | """
168 | Upload a single attachment to a Jira issue.
169 |
170 | Args:
171 | issue_key: The Jira issue key (e.g., 'PROJ-123')
172 | file_path: The path to the file to upload
173 |
174 | Returns:
175 | A dictionary with upload result information
176 | """
177 | if not issue_key:
178 | logger.error("No issue key provided for attachment upload")
179 | return {"success": False, "error": "No issue key provided"}
180 |
181 | if not file_path:
182 | logger.error("No file path provided for attachment upload")
183 | return {"success": False, "error": "No file path provided"}
184 |
185 | try:
186 | # Convert to absolute path if relative
187 | if not os.path.isabs(file_path):
188 | file_path = os.path.abspath(file_path)
189 |
190 | # Check if file exists
191 | if not os.path.exists(file_path):
192 | logger.error(f"File not found: {file_path}")
193 | return {"success": False, "error": f"File not found: {file_path}"}
194 |
195 | logger.info(f"Uploading attachment from {file_path} to issue {issue_key}")
196 |
197 | # Use the Jira API to upload the file
198 | filename = os.path.basename(file_path)
199 | with open(file_path, "rb") as file:
200 | attachment = self.jira.add_attachment(
201 | issue_key=issue_key, filename=file_path
202 | )
203 |
204 | if attachment:
205 | file_size = os.path.getsize(file_path)
206 | logger.info(
207 | f"Successfully uploaded attachment {filename} to {issue_key} (size: {file_size} bytes)"
208 | )
209 | return {
210 | "success": True,
211 | "issue_key": issue_key,
212 | "filename": filename,
213 | "size": file_size,
214 | "id": attachment.get("id")
215 | if isinstance(attachment, dict)
216 | else None,
217 | }
218 | else:
219 | logger.error(f"Failed to upload attachment {filename} to {issue_key}")
220 | return {
221 | "success": False,
222 | "error": f"Failed to upload attachment {filename} to {issue_key}",
223 | }
224 |
225 | except Exception as e:
226 | error_msg = str(e)
227 | logger.error(f"Error uploading attachment: {error_msg}")
228 | return {"success": False, "error": error_msg}
229 |
230 | def upload_attachments(
231 | self, issue_key: str, file_paths: list[str]
232 | ) -> dict[str, Any]:
233 | """
234 | Upload multiple attachments to a Jira issue.
235 |
236 | Args:
237 | issue_key: The Jira issue key (e.g., 'PROJ-123')
238 | file_paths: List of paths to files to upload
239 |
240 | Returns:
241 | A dictionary with upload results
242 | """
243 | if not issue_key:
244 | logger.error("No issue key provided for attachment upload")
245 | return {"success": False, "error": "No issue key provided"}
246 |
247 | if not file_paths:
248 | logger.error("No file paths provided for attachment upload")
249 | return {"success": False, "error": "No file paths provided"}
250 |
251 | logger.info(f"Uploading {len(file_paths)} attachments to issue {issue_key}")
252 |
253 | # Upload each attachment
254 | uploaded = []
255 | failed = []
256 |
257 | for file_path in file_paths:
258 | result = self.upload_attachment(issue_key, file_path)
259 |
260 | if result.get("success"):
261 | uploaded.append(
262 | {
263 | "filename": result.get("filename"),
264 | "size": result.get("size"),
265 | "id": result.get("id"),
266 | }
267 | )
268 | else:
269 | failed.append(
270 | {
271 | "filename": os.path.basename(file_path),
272 | "error": result.get("error"),
273 | }
274 | )
275 |
276 | return {
277 | "success": True,
278 | "issue_key": issue_key,
279 | "total": len(file_paths),
280 | "uploaded": uploaded,
281 | "failed": failed,
282 | }
283 |
```
--------------------------------------------------------------------------------
/tests/unit/utils/test_environment.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the environment utilities module."""
2 |
3 | import logging
4 |
5 | import pytest
6 |
7 | from mcp_atlassian.utils.environment import get_available_services
8 | from tests.utils.assertions import assert_log_contains
9 | from tests.utils.mocks import MockEnvironment
10 |
11 |
12 | @pytest.fixture(autouse=True)
13 | def setup_logger():
14 | """Ensure logger is set to INFO level for capturing log messages."""
15 | logger = logging.getLogger("mcp-atlassian.utils.environment")
16 | original_level = logger.level
17 | logger.setLevel(logging.INFO)
18 | yield
19 | logger.setLevel(original_level)
20 |
21 |
22 | @pytest.fixture
23 | def env_scenarios():
24 | """Environment configuration scenarios for testing."""
25 | return {
26 | "oauth_cloud": {
27 | "CONFLUENCE_URL": "https://company.atlassian.net",
28 | "JIRA_URL": "https://company.atlassian.net",
29 | "ATLASSIAN_OAUTH_CLIENT_ID": "client_id",
30 | "ATLASSIAN_OAUTH_CLIENT_SECRET": "client_secret",
31 | "ATLASSIAN_OAUTH_REDIRECT_URI": "http://localhost:8080/callback",
32 | "ATLASSIAN_OAUTH_SCOPE": "read:jira-user",
33 | "ATLASSIAN_OAUTH_CLOUD_ID": "cloud_id",
34 | },
35 | "basic_auth_cloud": {
36 | "CONFLUENCE_URL": "https://company.atlassian.net",
37 | "CONFLUENCE_USERNAME": "[email protected]",
38 | "CONFLUENCE_API_TOKEN": "api_token",
39 | "JIRA_URL": "https://company.atlassian.net",
40 | "JIRA_USERNAME": "[email protected]",
41 | "JIRA_API_TOKEN": "api_token",
42 | },
43 | "pat_server": {
44 | "CONFLUENCE_URL": "https://confluence.company.com",
45 | "CONFLUENCE_PERSONAL_TOKEN": "pat_token",
46 | "JIRA_URL": "https://jira.company.com",
47 | "JIRA_PERSONAL_TOKEN": "pat_token",
48 | },
49 | "basic_auth_server": {
50 | "CONFLUENCE_URL": "https://confluence.company.com",
51 | "CONFLUENCE_USERNAME": "admin",
52 | "CONFLUENCE_API_TOKEN": "password",
53 | "JIRA_URL": "https://jira.company.com",
54 | "JIRA_USERNAME": "admin",
55 | "JIRA_API_TOKEN": "password",
56 | },
57 | }
58 |
59 |
60 | def _assert_service_availability(result, confluence_expected, jira_expected):
61 | """Helper to assert service availability."""
62 | assert result == {"confluence": confluence_expected, "jira": jira_expected}
63 |
64 |
65 | def _assert_authentication_logs(caplog, auth_type, services):
66 | """Helper to assert authentication log messages."""
67 | log_patterns = {
68 | "oauth": "OAuth 2.0 (3LO) authentication (Cloud-only features)",
69 | "cloud_basic": "Cloud Basic Authentication (API Token)",
70 | "server": "Server/Data Center authentication (PAT or Basic Auth)",
71 | "not_configured": "is not configured or required environment variables are missing",
72 | }
73 |
74 | for service in services:
75 | service_name = service.title()
76 | if auth_type == "not_configured":
77 | assert_log_contains(
78 | caplog, "INFO", f"{service_name} {log_patterns[auth_type]}"
79 | )
80 | else:
81 | assert_log_contains(
82 | caplog, "INFO", f"Using {service_name} {log_patterns[auth_type]}"
83 | )
84 |
85 |
86 | class TestGetAvailableServices:
87 | """Test cases for get_available_services function."""
88 |
89 | def test_no_services_configured(self, caplog):
90 | """Test that no services are available when no environment variables are set."""
91 | with MockEnvironment.clean_env():
92 | result = get_available_services()
93 | _assert_service_availability(
94 | result, confluence_expected=False, jira_expected=False
95 | )
96 | _assert_authentication_logs(
97 | caplog, "not_configured", ["confluence", "jira"]
98 | )
99 |
100 | @pytest.mark.parametrize(
101 | "scenario,expected_confluence,expected_jira",
102 | [
103 | ("oauth_cloud", True, True),
104 | ("basic_auth_cloud", True, True),
105 | ("pat_server", True, True),
106 | ("basic_auth_server", True, True),
107 | ],
108 | )
109 | def test_valid_authentication_scenarios(
110 | self, env_scenarios, scenario, expected_confluence, expected_jira, caplog
111 | ):
112 | """Test various valid authentication scenarios."""
113 | with MockEnvironment.clean_env():
114 | for key, value in env_scenarios[scenario].items():
115 | import os
116 |
117 | os.environ[key] = value
118 |
119 | result = get_available_services()
120 | _assert_service_availability(
121 | result,
122 | confluence_expected=expected_confluence,
123 | jira_expected=expected_jira,
124 | )
125 |
126 | # Verify appropriate log messages based on scenario
127 | if scenario == "oauth_cloud":
128 | _assert_authentication_logs(caplog, "oauth", ["confluence", "jira"])
129 | elif scenario == "basic_auth_cloud":
130 | _assert_authentication_logs(
131 | caplog, "cloud_basic", ["confluence", "jira"]
132 | )
133 | elif scenario in ["pat_server", "basic_auth_server"]:
134 | _assert_authentication_logs(caplog, "server", ["confluence", "jira"])
135 |
136 | @pytest.mark.parametrize(
137 | "missing_oauth_var",
138 | [
139 | "ATLASSIAN_OAUTH_CLIENT_ID",
140 | "ATLASSIAN_OAUTH_CLIENT_SECRET",
141 | "ATLASSIAN_OAUTH_REDIRECT_URI",
142 | "ATLASSIAN_OAUTH_SCOPE",
143 | "ATLASSIAN_OAUTH_CLOUD_ID",
144 | ],
145 | )
146 | def test_oauth_missing_required_vars(
147 | self, env_scenarios, missing_oauth_var, caplog
148 | ):
149 | """Test that OAuth fails when any required variable is missing."""
150 | with MockEnvironment.clean_env():
151 | oauth_config = env_scenarios["oauth_cloud"]
152 | # Remove one required OAuth variable
153 | del oauth_config[missing_oauth_var]
154 |
155 | for key, value in oauth_config.items():
156 | import os
157 |
158 | os.environ[key] = value
159 |
160 | result = get_available_services()
161 | _assert_service_availability(
162 | result, confluence_expected=False, jira_expected=False
163 | )
164 |
165 | @pytest.mark.parametrize(
166 | "missing_basic_vars,service",
167 | [
168 | (["CONFLUENCE_USERNAME", "JIRA_USERNAME"], "username"),
169 | (["CONFLUENCE_API_TOKEN", "JIRA_API_TOKEN"], "token"),
170 | ],
171 | )
172 | def test_basic_auth_missing_credentials(
173 | self, env_scenarios, missing_basic_vars, service
174 | ):
175 | """Test that basic auth fails when credentials are missing."""
176 | with MockEnvironment.clean_env():
177 | basic_config = env_scenarios["basic_auth_cloud"].copy()
178 |
179 | # Remove required variables
180 | for var in missing_basic_vars:
181 | del basic_config[var]
182 |
183 | for key, value in basic_config.items():
184 | import os
185 |
186 | os.environ[key] = value
187 |
188 | result = get_available_services()
189 | _assert_service_availability(
190 | result, confluence_expected=False, jira_expected=False
191 | )
192 |
193 | def test_oauth_precedence_over_basic_auth(self, env_scenarios, caplog):
194 | """Test that OAuth takes precedence over Basic Auth."""
195 | with MockEnvironment.clean_env():
196 | # Set both OAuth and Basic Auth variables
197 | combined_config = {
198 | **env_scenarios["oauth_cloud"],
199 | **env_scenarios["basic_auth_cloud"],
200 | }
201 |
202 | for key, value in combined_config.items():
203 | import os
204 |
205 | os.environ[key] = value
206 |
207 | result = get_available_services()
208 | _assert_service_availability(
209 | result, confluence_expected=True, jira_expected=True
210 | )
211 |
212 | # Should use OAuth, not Basic Auth
213 | _assert_authentication_logs(caplog, "oauth", ["confluence", "jira"])
214 | assert "Basic Authentication" not in caplog.text
215 |
216 | def test_mixed_service_configuration(self, caplog):
217 | """Test mixed configurations where only one service is configured."""
218 | with MockEnvironment.clean_env():
219 | import os
220 |
221 | os.environ["CONFLUENCE_URL"] = "https://company.atlassian.net"
222 | os.environ["CONFLUENCE_USERNAME"] = "[email protected]"
223 | os.environ["CONFLUENCE_API_TOKEN"] = "api_token"
224 |
225 | result = get_available_services()
226 | _assert_service_availability(
227 | result, confluence_expected=True, jira_expected=False
228 | )
229 |
230 | _assert_authentication_logs(caplog, "cloud_basic", ["confluence"])
231 | _assert_authentication_logs(caplog, "not_configured", ["jira"])
232 |
233 | def test_return_value_structure(self):
234 | """Test that the return value has the correct structure."""
235 | with MockEnvironment.clean_env():
236 | result = get_available_services()
237 |
238 | assert isinstance(result, dict)
239 | assert set(result.keys()) == {"confluence", "jira"}
240 | assert all(isinstance(v, bool) for v in result.values())
241 |
242 | @pytest.mark.parametrize(
243 | "invalid_vars",
244 | [
245 | {"CONFLUENCE_URL": "", "JIRA_URL": ""}, # Empty strings
246 | {"confluence_url": "https://test.com"}, # Wrong case
247 | ],
248 | )
249 | def test_invalid_environment_variables(self, invalid_vars, caplog):
250 | """Test behavior with invalid environment variables."""
251 | with MockEnvironment.clean_env():
252 | for key, value in invalid_vars.items():
253 | import os
254 |
255 | os.environ[key] = value
256 |
257 | result = get_available_services()
258 | _assert_service_availability(
259 | result, confluence_expected=False, jira_expected=False
260 | )
261 | _assert_authentication_logs(
262 | caplog, "not_configured", ["confluence", "jira"]
263 | )
264 |
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_comments.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the Jira Comments mixin."""
2 |
3 | from unittest.mock import MagicMock, Mock
4 |
5 | import pytest
6 |
7 | from mcp_atlassian.jira.comments import CommentsMixin
8 |
9 |
10 | class TestCommentsMixin:
11 | """Tests for the CommentsMixin class."""
12 |
13 | @pytest.fixture
14 | def comments_mixin(self, jira_client):
15 | """Create a CommentsMixin instance with mocked dependencies."""
16 | mixin = CommentsMixin(config=jira_client.config)
17 | mixin.jira = jira_client.jira
18 |
19 | # Set up a mock preprocessor with markdown_to_jira method
20 | mixin.preprocessor = Mock()
21 | mixin.preprocessor.markdown_to_jira = Mock(
22 | return_value="*This* is _Jira_ formatted"
23 | )
24 |
25 | # Mock the clean_text method
26 | mixin._clean_text = Mock(side_effect=lambda x: x)
27 |
28 | return mixin
29 |
30 | def test_get_issue_comments_basic(self, comments_mixin):
31 | """Test get_issue_comments with basic data."""
32 | # Setup mock response
33 | comments_mixin.jira.issue_get_comments.return_value = {
34 | "comments": [
35 | {
36 | "id": "10001",
37 | "body": "This is a comment",
38 | "created": "2024-01-01T10:00:00.000+0000",
39 | "updated": "2024-01-01T11:00:00.000+0000",
40 | "author": {"displayName": "John Doe"},
41 | }
42 | ]
43 | }
44 |
45 | # Call the method
46 | result = comments_mixin.get_issue_comments("TEST-123")
47 |
48 | # Verify
49 | comments_mixin.jira.issue_get_comments.assert_called_once_with("TEST-123")
50 | assert len(result) == 1
51 | assert result[0]["id"] == "10001"
52 | assert result[0]["body"] == "This is a comment"
53 | assert result[0]["created"] == "2024-01-01 10:00:00+00:00" # Parsed date
54 | assert result[0]["author"] == "John Doe"
55 |
56 | def test_get_issue_comments_with_limit(self, comments_mixin):
57 | """Test get_issue_comments with limit parameter."""
58 | # Setup mock response with multiple comments
59 | comments_mixin.jira.issue_get_comments.return_value = {
60 | "comments": [
61 | {
62 | "id": "10001",
63 | "body": "First comment",
64 | "created": "2024-01-01T10:00:00.000+0000",
65 | "author": {"displayName": "John Doe"},
66 | },
67 | {
68 | "id": "10002",
69 | "body": "Second comment",
70 | "created": "2024-01-02T10:00:00.000+0000",
71 | "author": {"displayName": "Jane Smith"},
72 | },
73 | {
74 | "id": "10003",
75 | "body": "Third comment",
76 | "created": "2024-01-03T10:00:00.000+0000",
77 | "author": {"displayName": "Bob Johnson"},
78 | },
79 | ]
80 | }
81 |
82 | # Call the method with limit=2
83 | result = comments_mixin.get_issue_comments("TEST-123", limit=2)
84 |
85 | # Verify
86 | comments_mixin.jira.issue_get_comments.assert_called_once_with("TEST-123")
87 | assert len(result) == 2 # Only 2 comments should be returned
88 | assert result[0]["id"] == "10001"
89 | assert result[1]["id"] == "10002"
90 | # Third comment shouldn't be included due to limit
91 |
92 | def test_get_issue_comments_with_missing_fields(self, comments_mixin):
93 | """Test get_issue_comments with missing fields in the response."""
94 | # Setup mock response with missing fields
95 | comments_mixin.jira.issue_get_comments.return_value = {
96 | "comments": [
97 | {
98 | "id": "10001",
99 | # Missing body field
100 | "created": "2024-01-01T10:00:00.000+0000",
101 | # Missing author field
102 | },
103 | {
104 | # Missing id field
105 | "body": "Second comment",
106 | # Missing created field
107 | "author": {}, # Empty author object
108 | },
109 | {
110 | "id": "10003",
111 | "body": "Third comment",
112 | "created": "2024-01-03T10:00:00.000+0000",
113 | "author": {"name": "user123"}, # Using name instead of displayName
114 | },
115 | ]
116 | }
117 |
118 | # Call the method
119 | result = comments_mixin.get_issue_comments("TEST-123")
120 |
121 | # Verify
122 | assert len(result) == 3
123 | assert result[0]["id"] == "10001"
124 | assert result[0]["body"] == "" # Should default to empty string
125 | assert result[0]["author"] == "Unknown" # Should default to Unknown
126 |
127 | assert (
128 | "id" not in result[1] or not result[1]["id"]
129 | ) # Should be missing or empty
130 | assert result[1]["author"] == "Unknown" # Should default to Unknown
131 |
132 | assert (
133 | result[2]["author"] == "Unknown"
134 | ) # Should use Unknown when only name is available
135 |
136 | def test_get_issue_comments_with_empty_response(self, comments_mixin):
137 | """Test get_issue_comments with an empty response."""
138 | # Setup mock response with no comments
139 | comments_mixin.jira.issue_get_comments.return_value = {"comments": []}
140 |
141 | # Call the method
142 | result = comments_mixin.get_issue_comments("TEST-123")
143 |
144 | # Verify
145 | assert len(result) == 0 # Should return an empty list
146 |
147 | def test_get_issue_comments_with_error(self, comments_mixin):
148 | """Test get_issue_comments with an error response."""
149 | # Setup mock to raise exception
150 | comments_mixin.jira.issue_get_comments.side_effect = Exception("API Error")
151 |
152 | # Verify it raises the wrapped exception
153 | with pytest.raises(Exception, match="Error getting comments"):
154 | comments_mixin.get_issue_comments("TEST-123")
155 |
156 | def test_add_comment_basic(self, comments_mixin):
157 | """Test add_comment with basic data."""
158 | # Setup mock response
159 | comments_mixin.jira.issue_add_comment.return_value = {
160 | "id": "10001",
161 | "body": "This is a comment",
162 | "created": "2024-01-01T10:00:00.000+0000",
163 | "author": {"displayName": "John Doe"},
164 | }
165 |
166 | # Call the method
167 | result = comments_mixin.add_comment("TEST-123", "Test comment")
168 |
169 | # Verify
170 | comments_mixin.preprocessor.markdown_to_jira.assert_called_once_with(
171 | "Test comment"
172 | )
173 | comments_mixin.jira.issue_add_comment.assert_called_once_with(
174 | "TEST-123", "*This* is _Jira_ formatted"
175 | )
176 | assert result["id"] == "10001"
177 | assert result["body"] == "This is a comment"
178 | assert result["created"] == "2024-01-01 10:00:00+00:00" # Parsed date
179 | assert result["author"] == "John Doe"
180 |
181 | def test_add_comment_with_markdown_conversion(self, comments_mixin):
182 | """Test add_comment with markdown conversion."""
183 | # Setup mock response
184 | comments_mixin.jira.issue_add_comment.return_value = {
185 | "id": "10001",
186 | "body": "*This* is _Jira_ formatted",
187 | "created": "2024-01-01T10:00:00.000+0000",
188 | "author": {"displayName": "John Doe"},
189 | }
190 |
191 | # Create a complex markdown comment
192 | markdown_comment = """
193 | # Heading 1
194 |
195 | This is a paragraph with **bold** and *italic* text.
196 |
197 | - List item 1
198 | - List item 2
199 |
200 | ```python
201 | def hello():
202 | print("Hello world")
203 | ```
204 | """
205 |
206 | # Call the method
207 | result = comments_mixin.add_comment("TEST-123", markdown_comment)
208 |
209 | # Verify
210 | comments_mixin.preprocessor.markdown_to_jira.assert_called_once_with(
211 | markdown_comment
212 | )
213 | comments_mixin.jira.issue_add_comment.assert_called_once_with(
214 | "TEST-123", "*This* is _Jira_ formatted"
215 | )
216 | assert result["body"] == "*This* is _Jira_ formatted"
217 |
218 | def test_add_comment_with_empty_comment(self, comments_mixin):
219 | """Test add_comment with an empty comment."""
220 | # Setup mock response
221 | comments_mixin.jira.issue_add_comment.return_value = {
222 | "id": "10001",
223 | "body": "",
224 | "created": "2024-01-01T10:00:00.000+0000",
225 | "author": {"displayName": "John Doe"},
226 | }
227 |
228 | # Call the method with empty comment
229 | result = comments_mixin.add_comment("TEST-123", "")
230 |
231 | # Verify - for empty comments, markdown_to_jira should NOT be called as per implementation
232 | comments_mixin.preprocessor.markdown_to_jira.assert_not_called()
233 | comments_mixin.jira.issue_add_comment.assert_called_once_with("TEST-123", "")
234 | assert result["body"] == ""
235 |
236 | def test_add_comment_with_error(self, comments_mixin):
237 | """Test add_comment with an error response."""
238 | # Setup mock to raise exception
239 | comments_mixin.jira.issue_add_comment.side_effect = Exception("API Error")
240 |
241 | # Verify it raises the wrapped exception
242 | with pytest.raises(Exception, match="Error adding comment"):
243 | comments_mixin.add_comment("TEST-123", "Test comment")
244 |
245 | def test_markdown_to_jira(self, comments_mixin):
246 | """Test markdown to Jira conversion."""
247 | # Setup - need to replace the mock entirely
248 | comments_mixin.preprocessor.markdown_to_jira = MagicMock(
249 | return_value="Jira text"
250 | )
251 |
252 | # Call the method
253 | result = comments_mixin._markdown_to_jira("Markdown text")
254 |
255 | # Verify
256 | assert result == "Jira text"
257 | comments_mixin.preprocessor.markdown_to_jira.assert_called_once_with(
258 | "Markdown text"
259 | )
260 |
261 | def test_markdown_to_jira_with_empty_text(self, comments_mixin):
262 | """Test markdown to Jira conversion with empty text."""
263 | # Call the method with empty text
264 | result = comments_mixin._markdown_to_jira("")
265 |
266 | # Verify
267 | assert result == ""
268 | # The preprocessor should not be called with empty text
269 | comments_mixin.preprocessor.markdown_to_jira.assert_not_called()
270 |
```
--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Root pytest configuration file for MCP Atlassian tests.
3 |
4 | This module provides session-scoped fixtures and utilities that are shared
5 | across all test modules. It integrates with the new test utilities framework
6 | to provide efficient, reusable test fixtures.
7 | """
8 |
9 | import pytest
10 |
11 | from tests.utils.factories import (
12 | AuthConfigFactory,
13 | ConfluencePageFactory,
14 | ErrorResponseFactory,
15 | JiraIssueFactory,
16 | )
17 | from tests.utils.mocks import MockAtlassianClient, MockEnvironment
18 |
19 |
20 | def pytest_addoption(parser):
21 | """Add command-line options for tests."""
22 | parser.addoption(
23 | "--use-real-data",
24 | action="store_true",
25 | default=False,
26 | help="Run tests that use real API data (requires env vars)",
27 | )
28 |
29 |
30 | # ============================================================================
31 | # Session-Scoped Configuration Fixtures
32 | # ============================================================================
33 |
34 |
35 | @pytest.fixture(scope="session")
36 | def session_auth_configs():
37 | """
38 | Session-scoped fixture providing authentication configuration templates.
39 |
40 | This fixture is computed once per test session and provides standard
41 | authentication configurations for OAuth and basic auth scenarios.
42 |
43 | Returns:
44 | Dict[str, Dict[str, str]]: Authentication configuration templates
45 | """
46 | return {
47 | "oauth": AuthConfigFactory.create_oauth_config(),
48 | "basic_auth": AuthConfigFactory.create_basic_auth_config(),
49 | "jira_basic": {
50 | "url": "https://test.atlassian.net",
51 | "username": "[email protected]",
52 | "api_token": "test-jira-token",
53 | },
54 | "confluence_basic": {
55 | "url": "https://test.atlassian.net/wiki",
56 | "username": "[email protected]",
57 | "api_token": "test-confluence-token",
58 | },
59 | }
60 |
61 |
62 | @pytest.fixture(scope="session")
63 | def session_mock_data():
64 | """
65 | Session-scoped fixture providing mock data templates.
66 |
67 | This fixture creates mock data templates once per session to avoid
68 | recreating expensive mock objects for every test.
69 |
70 | Returns:
71 | Dict[str, Any]: Mock data templates for various API responses
72 | """
73 | return {
74 | "jira_issue": JiraIssueFactory.create(),
75 | "jira_issue_minimal": JiraIssueFactory.create_minimal(),
76 | "confluence_page": ConfluencePageFactory.create(),
77 | "api_error": ErrorResponseFactory.create_api_error(),
78 | "auth_error": ErrorResponseFactory.create_auth_error(),
79 | "jira_search_results": {
80 | "issues": [
81 | JiraIssueFactory.create("TEST-1"),
82 | JiraIssueFactory.create("TEST-2"),
83 | JiraIssueFactory.create("TEST-3"),
84 | ],
85 | "total": 3,
86 | "startAt": 0,
87 | "maxResults": 50,
88 | },
89 | }
90 |
91 |
92 | # ============================================================================
93 | # Environment and Configuration Fixtures
94 | # ============================================================================
95 |
96 |
97 | @pytest.fixture
98 | def clean_environment():
99 | """
100 | Fixture that provides a clean environment with no auth variables.
101 |
102 | This is useful for testing error conditions and configuration validation.
103 | """
104 | with MockEnvironment.clean_env() as env:
105 | yield env
106 |
107 |
108 | @pytest.fixture
109 | def oauth_environment():
110 | """
111 | Fixture that provides a complete OAuth environment setup.
112 |
113 | This sets up all necessary OAuth environment variables for testing
114 | OAuth-based authentication flows.
115 | """
116 | with MockEnvironment.oauth_env() as env:
117 | yield env
118 |
119 |
120 | @pytest.fixture
121 | def basic_auth_environment():
122 | """
123 | Fixture that provides basic authentication environment setup.
124 |
125 | This sets up username/token authentication for both Jira and Confluence.
126 | """
127 | with MockEnvironment.basic_auth_env() as env:
128 | yield env
129 |
130 |
131 | # ============================================================================
132 | # Factory-Based Fixtures
133 | # ============================================================================
134 |
135 |
136 | @pytest.fixture
137 | def make_jira_issue():
138 | """
139 | Factory fixture for creating Jira issues with customizable properties.
140 |
141 | Returns:
142 | Callable: Factory function that creates Jira issue data
143 |
144 | Example:
145 | def test_issue_creation(make_jira_issue):
146 | issue = make_jira_issue(key="CUSTOM-123",
147 | fields={"priority": {"name": "High"}})
148 | assert issue["key"] == "CUSTOM-123"
149 | """
150 | return JiraIssueFactory.create
151 |
152 |
153 | @pytest.fixture
154 | def make_confluence_page():
155 | """
156 | Factory fixture for creating Confluence pages with customizable properties.
157 |
158 | Returns:
159 | Callable: Factory function that creates Confluence page data
160 |
161 | Example:
162 | def test_page_creation(make_confluence_page):
163 | page = make_confluence_page(title="Custom Page",
164 | space={"key": "CUSTOM"})
165 | assert page["title"] == "Custom Page"
166 | """
167 | return ConfluencePageFactory.create
168 |
169 |
170 | @pytest.fixture
171 | def make_auth_config():
172 | """
173 | Factory fixture for creating authentication configurations.
174 |
175 | Returns:
176 | Dict[str, Callable]: Factory functions for different auth types
177 |
178 | Example:
179 | def test_oauth_config(make_auth_config):
180 | config = make_auth_config["oauth"](client_id="custom-id")
181 | assert config["client_id"] == "custom-id"
182 | """
183 | return {
184 | "oauth": AuthConfigFactory.create_oauth_config,
185 | "basic": AuthConfigFactory.create_basic_auth_config,
186 | }
187 |
188 |
189 | @pytest.fixture
190 | def make_api_error():
191 | """
192 | Factory fixture for creating API error responses.
193 |
194 | Returns:
195 | Callable: Factory function that creates error response data
196 |
197 | Example:
198 | def test_error_handling(make_api_error):
199 | error = make_api_error(status_code=404, message="Not Found")
200 | assert error["status"] == 404
201 | """
202 | return ErrorResponseFactory.create_api_error
203 |
204 |
205 | # ============================================================================
206 | # Mock Client Fixtures
207 | # ============================================================================
208 |
209 |
210 | @pytest.fixture
211 | def mock_jira_client():
212 | """
213 | Fixture providing a pre-configured mock Jira client.
214 |
215 | The client comes with sensible defaults for common operations
216 | but can be customized per test as needed.
217 |
218 | Returns:
219 | MagicMock: Configured mock Jira client
220 | """
221 | return MockAtlassianClient.create_jira_client()
222 |
223 |
224 | @pytest.fixture
225 | def mock_confluence_client():
226 | """
227 | Fixture providing a pre-configured mock Confluence client.
228 |
229 | The client comes with sensible defaults for common operations
230 | but can be customized per test as needed.
231 |
232 | Returns:
233 | MagicMock: Configured mock Confluence client
234 | """
235 | return MockAtlassianClient.create_confluence_client()
236 |
237 |
238 | # ============================================================================
239 | # Compatibility Fixtures (maintain backward compatibility)
240 | # ============================================================================
241 |
242 |
243 | @pytest.fixture
244 | def use_real_jira_data(request):
245 | """
246 | Check if real Jira data tests should be run.
247 |
248 | This will be True if the --use-real-data flag is passed to pytest.
249 |
250 | Note: This fixture is maintained for backward compatibility.
251 | """
252 | return request.config.getoption("--use-real-data")
253 |
254 |
255 | @pytest.fixture
256 | def use_real_confluence_data(request):
257 | """
258 | Check if real Confluence data tests should be run.
259 |
260 | This will be True if the --use-real-data flag is passed to pytest.
261 |
262 | Note: This fixture is maintained for backward compatibility.
263 | """
264 | return request.config.getoption("--use-real-data")
265 |
266 |
267 | # ============================================================================
268 | # Advanced Environment Utilities
269 | # ============================================================================
270 |
271 |
272 | @pytest.fixture
273 | def env_var_manager():
274 | """
275 | Fixture providing utilities for managing environment variables in tests.
276 |
277 | Returns:
278 | MockEnvironment: Environment management utilities
279 |
280 | Example:
281 | def test_with_custom_env(env_var_manager):
282 | with env_var_manager.oauth_env():
283 | # Test OAuth functionality
284 | pass
285 | """
286 | return MockEnvironment
287 |
288 |
289 | @pytest.fixture
290 | def parametrized_auth_env(request):
291 | """
292 | Parametrized fixture for testing with different authentication environments.
293 |
294 | This fixture can be used with pytest.mark.parametrize to test the same
295 | functionality with different authentication setups.
296 |
297 | Example:
298 | @pytest.mark.parametrize("parametrized_auth_env",
299 | ["oauth", "basic_auth"], indirect=True)
300 | def test_auth_scenarios(parametrized_auth_env):
301 | # Test will run once for OAuth and once for basic auth
302 | pass
303 | """
304 | auth_type = request.param
305 |
306 | if auth_type == "oauth":
307 | with MockEnvironment.oauth_env() as env:
308 | yield env
309 | elif auth_type == "basic_auth":
310 | with MockEnvironment.basic_auth_env() as env:
311 | yield env
312 | elif auth_type == "clean":
313 | with MockEnvironment.clean_env() as env:
314 | yield env
315 | else:
316 | raise ValueError(f"Unknown auth type: {auth_type}")
317 |
318 |
319 | # ============================================================================
320 | # Session Validation and Health Checks
321 | # ============================================================================
322 |
323 |
324 | @pytest.fixture(scope="session", autouse=True)
325 | def validate_test_environment():
326 | """
327 | Session-scoped fixture that validates the test environment setup.
328 |
329 | This fixture runs automatically and ensures that the test environment
330 | is properly configured for running the test suite.
331 | """
332 | # Validate that test utilities are importable
333 | try:
334 | import importlib.util
335 |
336 | # Check if modules can be imported
337 | for module_name in [
338 | "tests.fixtures.confluence_mocks",
339 | "tests.fixtures.jira_mocks",
340 | "tests.utils.base",
341 | "tests.utils.factories",
342 | "tests.utils.mocks",
343 | ]:
344 | spec = importlib.util.find_spec(module_name)
345 | if spec is None:
346 | pytest.fail(f"Failed to find module: {module_name}")
347 | except ImportError as e:
348 | pytest.fail(f"Failed to import test utilities: {e}")
349 |
350 | # Log session start
351 | print("\n🧪 Starting MCP Atlassian test session with enhanced fixtures")
352 |
353 | yield
354 |
355 | # Log session end
356 | print("\n✅ Completed MCP Atlassian test session")
357 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/formatting.py:
--------------------------------------------------------------------------------
```python
1 | """Module for Jira content formatting utilities."""
2 |
3 | import html
4 | import logging
5 | import re
6 | from typing import Any
7 |
8 | from ..preprocessing.jira import JiraPreprocessor
9 | from .client import JiraClient
10 | from .protocols import (
11 | EpicOperationsProto,
12 | FieldsOperationsProto,
13 | IssueOperationsProto,
14 | UsersOperationsProto,
15 | )
16 |
17 | logger = logging.getLogger("mcp-jira")
18 |
19 |
20 | class FormattingMixin(
21 | JiraClient,
22 | EpicOperationsProto,
23 | FieldsOperationsProto,
24 | IssueOperationsProto,
25 | UsersOperationsProto,
26 | ):
27 | """Mixin for Jira content formatting operations.
28 |
29 | This mixin provides utilities for converting between different formats,
30 | formatting issue content for display, parsing dates, and sanitizing content.
31 | """
32 |
33 | def __init__(self, *args: Any, **kwargs: Any) -> None:
34 | """Initialize the FormattingMixin.
35 |
36 | Args:
37 | *args: Positional arguments for the JiraClient
38 | **kwargs: Keyword arguments for the JiraClient
39 | """
40 | super().__init__(*args, **kwargs)
41 |
42 | # Use the JiraPreprocessor with the base URL from the client
43 | base_url = ""
44 | if hasattr(self, "config") and hasattr(self.config, "url"):
45 | base_url = self.config.url
46 | self.preprocessor = JiraPreprocessor(base_url=base_url)
47 |
48 | def markdown_to_jira(self, markdown_text: str) -> str:
49 | """
50 | Convert Markdown syntax to Jira markup syntax.
51 |
52 | This method uses the TextPreprocessor implementation for consistent
53 | conversion between Markdown and Jira markup.
54 |
55 | Args:
56 | markdown_text: Text in Markdown format
57 |
58 | Returns:
59 | Text in Jira markup format
60 | """
61 | if not markdown_text:
62 | return ""
63 |
64 | try:
65 | # Use the existing preprocessor
66 | return self.preprocessor.markdown_to_jira(markdown_text)
67 |
68 | except Exception as e:
69 | logger.warning(f"Error converting markdown to Jira format: {str(e)}")
70 | # Return the original text if conversion fails
71 | return markdown_text
72 |
73 | def format_issue_content(
74 | self,
75 | issue_key: str,
76 | issue: dict[str, Any],
77 | description: str,
78 | comments: list[dict[str, Any]],
79 | created_date: str,
80 | epic_info: dict[str, str | None],
81 | ) -> str:
82 | """
83 | Format the issue content for display.
84 |
85 | Args:
86 | issue_key: The issue key
87 | issue: The issue data from Jira
88 | description: Processed description text
89 | comments: List of comment dictionaries
90 | created_date: Formatted created date
91 | epic_info: Dictionary with epic_key and epic_name
92 |
93 | Returns:
94 | Formatted content string
95 | """
96 | # Basic issue information
97 | content = f"""Issue: {issue_key}
98 | Title: {issue["fields"].get("summary", "")}
99 | Type: {issue["fields"]["issuetype"]["name"]}
100 | Status: {issue["fields"]["status"]["name"]}
101 | Created: {created_date}
102 | """
103 |
104 | # Add Epic information if available
105 | if epic_info.get("epic_key"):
106 | content += f"Epic: {epic_info['epic_key']}"
107 | if epic_info.get("epic_name"):
108 | content += f" - {epic_info['epic_name']}"
109 | content += "\n"
110 |
111 | content += f"""
112 | Description:
113 | {description}
114 | """
115 | # Add comments if present
116 | if comments:
117 | content += "\nComments:\n" + "\n".join(
118 | [f"{c['created']} - {c['author']}: {c['body']}" for c in comments]
119 | )
120 |
121 | return content
122 |
123 | def create_issue_metadata(
124 | self,
125 | issue_key: str,
126 | issue: dict[str, Any],
127 | comments: list[dict[str, Any]],
128 | created_date: str,
129 | epic_info: dict[str, str | None],
130 | ) -> dict[str, Any]:
131 | """
132 | Create metadata for the issue document.
133 |
134 | Args:
135 | issue_key: The issue key
136 | issue: The issue data from Jira
137 | comments: List of comment dictionaries
138 | created_date: Formatted created date
139 | epic_info: Dictionary with epic_key and epic_name
140 |
141 | Returns:
142 | Metadata dictionary
143 | """
144 | # Extract fields
145 | fields = issue.get("fields", {})
146 |
147 | # Basic metadata
148 | metadata = {
149 | "key": issue_key,
150 | "summary": fields.get("summary", ""),
151 | "type": fields.get("issuetype", {}).get("name", ""),
152 | "status": fields.get("status", {}).get("name", ""),
153 | "created": created_date,
154 | "source": "jira",
155 | }
156 |
157 | # Add assignee if present
158 | if fields.get("assignee"):
159 | metadata["assignee"] = fields["assignee"].get(
160 | "displayName", fields["assignee"].get("name", "")
161 | )
162 |
163 | # Add reporter if present
164 | if fields.get("reporter"):
165 | metadata["reporter"] = fields["reporter"].get(
166 | "displayName", fields["reporter"].get("name", "")
167 | )
168 |
169 | # Add priority if present
170 | if fields.get("priority"):
171 | metadata["priority"] = fields["priority"].get("name", "")
172 |
173 | # Add Epic information to metadata if available
174 | if epic_info.get("epic_key"):
175 | metadata["epic_key"] = epic_info["epic_key"]
176 | if epic_info.get("epic_name"):
177 | metadata["epic_name"] = epic_info["epic_name"]
178 |
179 | # Add project information
180 | if fields.get("project"):
181 | metadata["project"] = fields["project"].get("key", "")
182 | metadata["project_name"] = fields["project"].get("name", "")
183 |
184 | # Add comment count
185 | metadata["comment_count"] = len(comments)
186 |
187 | return metadata
188 |
189 | def extract_epic_information(
190 | self, issue: dict[str, Any]
191 | ) -> dict[str, None] | dict[str, str]:
192 | """
193 | Extract epic information from issue data.
194 |
195 | Args:
196 | issue: Issue data dictionary
197 |
198 | Returns:
199 | Dictionary containing epic_key and epic_name (or None if not found)
200 | """
201 | epic_info = {"epic_key": None, "epic_name": None}
202 |
203 | # Check if the issue has fields
204 | if "fields" not in issue:
205 | return epic_info
206 |
207 | fields = issue["fields"]
208 |
209 | # Try to get the epic link from issue
210 | # (requires the correct field ID which varies across instances)
211 | # Use the field discovery mechanism if available
212 | try:
213 | field_ids = self.get_field_ids_to_epic()
214 |
215 | # Get the epic link field ID
216 | epic_link_field = field_ids.get("Epic Link")
217 | if (
218 | epic_link_field
219 | and epic_link_field in fields
220 | and fields[epic_link_field]
221 | ):
222 | epic_info["epic_key"] = fields[epic_link_field]
223 |
224 | # If the issue is linked to an epic, try to get the epic name
225 | if epic_info["epic_key"] and hasattr(self, "get_issue"):
226 | try:
227 | epic_issue = self.get_issue(epic_info["epic_key"])
228 | epic_fields = epic_issue.get("fields", {})
229 |
230 | # Get the epic name field ID
231 | epic_name_field = field_ids.get("Epic Name")
232 | if epic_name_field and epic_name_field in epic_fields:
233 | epic_info["epic_name"] = epic_fields[epic_name_field]
234 |
235 | except Exception as e:
236 | logger.warning(f"Error getting epic details: {str(e)}")
237 |
238 | except Exception as e:
239 | logger.warning(f"Error extracting epic information: {str(e)}")
240 |
241 | return epic_info
242 |
243 | def sanitize_html(self, html_content: str) -> str:
244 | """
245 | Sanitize HTML content by removing HTML tags.
246 |
247 | Args:
248 | html_content: HTML content to sanitize
249 |
250 | Returns:
251 | Plaintext content with HTML tags removed
252 | """
253 | if not html_content:
254 | return ""
255 |
256 | try:
257 | # Remove HTML tags
258 | plain_text = re.sub(r"<[^>]+>", "", html_content)
259 | # Decode HTML entities
260 | plain_text = html.unescape(plain_text)
261 | # Normalize whitespace
262 | plain_text = re.sub(r"\s+", " ", plain_text).strip()
263 |
264 | return plain_text
265 |
266 | except Exception as e:
267 | logger.warning(f"Error sanitizing HTML: {str(e)}")
268 | return html_content
269 |
270 | def sanitize_transition_fields(self, fields: dict[str, Any]) -> dict[str, Any]:
271 | """
272 | Sanitize fields to ensure they're valid for the Jira API.
273 |
274 | This is used for transition data to properly format field values.
275 |
276 | Args:
277 | fields: Dictionary of fields to sanitize
278 |
279 | Returns:
280 | Dictionary of sanitized fields
281 | """
282 | sanitized_fields = {}
283 |
284 | for key, value in fields.items():
285 | # Skip empty values
286 | if value is None:
287 | continue
288 |
289 | # Handle assignee field specially
290 | if key in ["assignee", "reporter"]:
291 | # If the value is already a dictionary, use it as is
292 | if isinstance(value, dict) and "accountId" in value:
293 | sanitized_fields[key] = value
294 | else:
295 | # Otherwise, look up the account ID
296 | if not isinstance(value, str):
297 | logger.warning(f"Invalid assignee value: {value}")
298 | continue
299 |
300 | try:
301 | account_id = self._get_account_id(value)
302 | if account_id:
303 | sanitized_fields[key] = {"accountId": account_id}
304 | except Exception as e:
305 | logger.warning(
306 | f"Error getting account ID for {value}: {str(e)}"
307 | )
308 | # All other fields pass through as is
309 | else:
310 | sanitized_fields[key] = value
311 |
312 | return sanitized_fields
313 |
314 | def add_comment_to_transition_data(
315 | self, transition_data: dict[str, Any], comment: str | None
316 | ) -> dict[str, Any]:
317 | """
318 | Add a comment to transition data.
319 |
320 | Args:
321 | transition_data: Transition data dictionary
322 | comment: Comment text (in Markdown format) or None
323 |
324 | Returns:
325 | Updated transition data
326 | """
327 | if not comment:
328 | return transition_data
329 |
330 | # Convert markdown to Jira format
331 | jira_formatted_comment = self.markdown_to_jira(comment)
332 |
333 | # Add the comment to the transition data
334 | transition_data["update"] = {
335 | "comment": [{"add": {"body": jira_formatted_comment}}]
336 | }
337 |
338 | return transition_data
339 |
```
--------------------------------------------------------------------------------
/tests/integration/test_transport_lifecycle.py:
--------------------------------------------------------------------------------
```python
1 | """Integration tests for transport lifecycle behavior.
2 |
3 | These tests ensure that:
4 | 1. No stdin monitoring is used (preventing issues #519 and #524)
5 | 2. Stdio transport doesn't conflict with MCP server's internal stdio handling
6 | 3. All transports use direct execution
7 | 4. Docker scenarios work correctly
8 | """
9 |
10 | import asyncio
11 | from io import StringIO
12 | from unittest.mock import AsyncMock, MagicMock, patch
13 |
14 | import pytest
15 |
16 | from mcp_atlassian import main
17 | from mcp_atlassian.utils.lifecycle import _shutdown_event
18 |
19 |
20 | @pytest.mark.integration
21 | class TestTransportLifecycleBehavior:
22 | """Test transport lifecycle behavior to prevent regression of issues #519 and #524."""
23 |
24 | def setup_method(self):
25 | """Reset state before each test."""
26 | _shutdown_event.clear()
27 |
28 | def test_all_transports_use_direct_execution(self):
29 | """Verify all transports use direct execution without stdin monitoring.
30 |
31 | This is a regression test to ensure stdin monitoring is never reintroduced,
32 | which caused both issue #519 (stdio conflicts) and #524 (HTTP session termination).
33 | """
34 | transports_to_test = ["stdio", "sse", "streamable-http"]
35 |
36 | for transport in transports_to_test:
37 | with patch("asyncio.run") as mock_asyncio_run:
38 | with patch.dict("os.environ", {"TRANSPORT": transport}, clear=False):
39 | with (
40 | patch(
41 | "mcp_atlassian.servers.main.AtlassianMCP"
42 | ) as mock_server_class,
43 | patch("click.core.Context") as mock_click_ctx,
44 | ):
45 | # Setup mocks
46 | mock_server = MagicMock()
47 | mock_server.run_async = AsyncMock()
48 | mock_server_class.return_value = mock_server
49 |
50 | # Mock CLI context
51 | mock_ctx_instance = MagicMock()
52 | mock_ctx_instance.obj = {
53 | "transport": transport,
54 | "port": None,
55 | "host": None,
56 | "path": None,
57 | }
58 | mock_click_ctx.return_value = mock_ctx_instance
59 |
60 | # Execute main
61 | with patch("sys.argv", ["mcp-atlassian"]):
62 | try:
63 | main()
64 | except SystemExit:
65 | pass
66 |
67 | # Verify direct execution for all transports
68 | assert mock_asyncio_run.called, (
69 | f"asyncio.run not called for {transport}"
70 | )
71 | called_coro = mock_asyncio_run.call_args[0][0]
72 |
73 | # Ensure NO stdin monitoring wrapper is used
74 | coro_str = str(called_coro)
75 | assert "run_with_stdio_monitoring" not in coro_str, (
76 | f"{transport} should not use stdin monitoring"
77 | )
78 | assert "run_async" in coro_str or hasattr(
79 | called_coro, "cr_code"
80 | ), f"{transport} should use direct run_async execution"
81 |
82 | @pytest.mark.anyio
83 | async def test_stdio_no_race_condition(self):
84 | """Test that stdio transport doesn't create race condition with MCP server.
85 |
86 | After the fix, stdin monitoring has been removed completely, so there's
87 | no possibility of race conditions between components trying to read stdin.
88 | """
89 | # Create a mock stdin that tracks reads
90 | read_count = 0
91 |
92 | class MockStdin:
93 | def __init__(self):
94 | self.closed = False
95 | self._read_lock = asyncio.Lock()
96 |
97 | async def readline(self):
98 | nonlocal read_count
99 |
100 | async with self._read_lock:
101 | if self.closed:
102 | raise ValueError("I/O operation on closed file")
103 |
104 | read_count += 1
105 | return b"" # EOF
106 |
107 | mock_stdin = MockStdin()
108 |
109 | # Mock the server coroutine that reads stdin
110 | async def mock_server_with_stdio(**kwargs):
111 | """Simulates MCP server reading from stdin."""
112 | # MCP server would normally read stdin here
113 | await mock_stdin.readline()
114 | return "completed"
115 |
116 | # Test direct server execution (current behavior)
117 | with patch("sys.stdin", mock_stdin):
118 | # Run server directly without any stdin monitoring
119 | result = await mock_server_with_stdio()
120 |
121 | # Should only have one read - from the MCP server itself
122 | assert read_count == 1
123 | assert result == "completed"
124 |
125 | def test_main_function_transport_logic(self):
126 | """Test the main function's transport determination logic."""
127 | test_cases = [
128 | # (cli_transport, env_transport, expected_final_transport)
129 | ("stdio", None, "stdio"),
130 | ("sse", None, "sse"),
131 | (None, "stdio", "stdio"),
132 | (None, "sse", "sse"),
133 | ("stdio", "sse", "stdio"), # CLI overrides env
134 | ]
135 |
136 | for cli_transport, env_transport, _expected_transport in test_cases:
137 | with patch("asyncio.run") as mock_asyncio_run:
138 | env_vars = {}
139 | if env_transport:
140 | env_vars["TRANSPORT"] = env_transport
141 |
142 | with patch.dict("os.environ", env_vars, clear=False):
143 | with (
144 | patch(
145 | "mcp_atlassian.servers.main.AtlassianMCP"
146 | ) as mock_server_class,
147 | patch("click.core.Context") as mock_click_ctx,
148 | ):
149 | # Setup mocks
150 | mock_server = MagicMock()
151 | mock_server.run_async = AsyncMock()
152 | mock_server_class.return_value = mock_server
153 |
154 | # Mock CLI context
155 | mock_ctx_instance = MagicMock()
156 | mock_ctx_instance.obj = {
157 | "transport": cli_transport,
158 | "port": None,
159 | "host": None,
160 | "path": None,
161 | }
162 | mock_click_ctx.return_value = mock_ctx_instance
163 |
164 | # Run main
165 | with patch("sys.argv", ["mcp-atlassian"]):
166 | try:
167 | main()
168 | except SystemExit:
169 | pass
170 |
171 | # Verify asyncio.run was called
172 | assert mock_asyncio_run.called
173 |
174 | # All transports now run directly without stdin monitoring
175 | called_coro = mock_asyncio_run.call_args[0][0]
176 | # Should always call run_async directly
177 | assert hasattr(called_coro, "cr_code") or "run_async" in str(
178 | called_coro
179 | )
180 |
181 | @pytest.mark.anyio
182 | async def test_shutdown_event_handling(self):
183 | """Test that shutdown events are handled correctly for all transports."""
184 | # Pre-set shutdown event
185 | _shutdown_event.set()
186 |
187 | async def mock_server(**kwargs):
188 | # Should run even with shutdown event set
189 | return "completed"
190 |
191 | # Server runs directly now
192 | result = await mock_server()
193 |
194 | # Server should complete normally
195 | assert result == "completed"
196 |
197 | def test_docker_stdio_scenario(self):
198 | """Test the specific Docker stdio scenario that caused the bug.
199 |
200 | This simulates running in Docker with -i flag where stdin is available
201 | but both components trying to read it causes conflicts.
202 | """
203 | with patch("asyncio.run") as mock_asyncio_run:
204 | # Simulate Docker environment variables
205 | docker_env = {
206 | "TRANSPORT": "stdio",
207 | "JIRA_URL": "https://example.atlassian.net",
208 | "JIRA_USERNAME": "[email protected]",
209 | "JIRA_API_TOKEN": "token",
210 | }
211 |
212 | with patch.dict("os.environ", docker_env, clear=False):
213 | with (
214 | patch(
215 | "mcp_atlassian.servers.main.AtlassianMCP"
216 | ) as mock_server_class,
217 | patch("sys.stdin", StringIO()), # Simulate available stdin
218 | ):
219 | # Setup mock server
220 | mock_server = MagicMock()
221 | mock_server.run_async = AsyncMock()
222 | mock_server_class.return_value = mock_server
223 |
224 | # Simulate Docker container startup
225 | with patch("sys.argv", ["mcp-atlassian"]):
226 | try:
227 | main()
228 | except SystemExit:
229 | pass
230 |
231 | # Verify stdio transport doesn't use lifecycle monitoring
232 | assert mock_asyncio_run.called
233 | called_coro = mock_asyncio_run.call_args[0][0]
234 |
235 | # All transports now use run_async directly
236 | assert hasattr(called_coro, "cr_code") or "run_async" in str(
237 | called_coro
238 | )
239 |
240 |
241 | @pytest.mark.integration
242 | class TestRegressionPrevention:
243 | """Tests to prevent regression of specific issues."""
244 |
245 | def test_no_stdin_monitoring_in_codebase(self):
246 | """Ensure stdin monitoring is not reintroduced in the codebase.
247 |
248 | This is a safeguard against reintroducing the flawed stdin monitoring
249 | that caused issues #519 and #524.
250 | """
251 | # Check that the problematic function doesn't exist
252 | from mcp_atlassian.utils import lifecycle
253 |
254 | assert not hasattr(lifecycle, "run_with_stdio_monitoring"), (
255 | "run_with_stdio_monitoring should not exist in lifecycle module"
256 | )
257 |
258 | def test_signal_handlers_are_setup(self):
259 | """Verify signal handlers are properly configured."""
260 | with patch("mcp_atlassian.setup_signal_handlers") as mock_setup:
261 | with patch("asyncio.run"):
262 | with patch("mcp_atlassian.servers.main.AtlassianMCP"):
263 | with patch("sys.argv", ["mcp-atlassian"]):
264 | try:
265 | main()
266 | except SystemExit:
267 | pass
268 |
269 | # Signal handlers should always be set up
270 | mock_setup.assert_called_once()
271 |
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_sprints.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the Jira SprintMixin"""
2 |
3 | from unittest.mock import MagicMock
4 |
5 | import pytest
6 | import requests
7 |
8 | from mcp_atlassian.jira import JiraConfig
9 | from mcp_atlassian.jira.sprints import SprintsMixin
10 | from mcp_atlassian.models.jira import JiraSprint
11 |
12 |
13 | @pytest.fixture
14 | def mock_config():
15 | """Fixture to create a mock JiraConfig instance."""
16 | config = MagicMock(spec=JiraConfig)
17 | config.url = "https://test.atlassian.net"
18 | config.username = "[email protected]"
19 | config.api_token = "test-token"
20 | config.auth_type = "pat"
21 | return config
22 |
23 |
24 | @pytest.fixture
25 | def sprints_mixin(mock_config):
26 | """Fixture to create a SprintsMixin instance for testing."""
27 | mixin = SprintsMixin(config=mock_config)
28 | mixin.jira = MagicMock()
29 |
30 | return mixin
31 |
32 |
33 | @pytest.fixture
34 | def mock_sprints():
35 | """Fixture to return mock boards data."""
36 | return {
37 | "maxResults": 50,
38 | "startAt": 0,
39 | "isLast": True,
40 | "values": [
41 | {
42 | "id": 10000,
43 | "self": "https://test.atlassian.net/rest/agile/1.0/sprint/10000",
44 | "state": "closed",
45 | "name": "Sprint 0",
46 | "startDate": "2099-05-06T02:00:00.000Z",
47 | "endDate": "2100-05-17T10:00:00.000Z",
48 | "completeDate": "2024-05-20T05:17:24.302Z",
49 | "activatedDate": "2024-05-07T01:22:45.128Z",
50 | "originBoardId": 1000,
51 | "goal": "",
52 | "synced": False,
53 | "autoStartStop": False,
54 | },
55 | {
56 | "id": 10001,
57 | "self": "https://test.atlassian.net/rest/agile/1.0/sprint/10001",
58 | "state": "active",
59 | "name": "Sprint 1",
60 | "startDate": "2099-03-24T06:13:00.000Z",
61 | "endDate": "2100-04-07T06:13:00.000Z",
62 | "activatedDate": "2025-03-24T06:13:20.729Z",
63 | "originBoardId": 1000,
64 | "goal": "",
65 | "synced": False,
66 | "autoStartStop": False,
67 | },
68 | {
69 | "id": 10002,
70 | "self": "https://test.atlassian.net/rest/agile/1.0/sprint/10002",
71 | "state": "future",
72 | "name": "Sprint 2",
73 | "originBoardId": 1000,
74 | "synced": False,
75 | "autoStartStop": False,
76 | },
77 | ],
78 | }
79 |
80 |
81 | def test_get_all_sprints_from_board(sprints_mixin, mock_sprints):
82 | """Test get_all_sprints_from_board method."""
83 | sprints_mixin.jira.get_all_sprints_from_board.return_value = mock_sprints
84 |
85 | result = sprints_mixin.get_all_sprints_from_board("1000")
86 | assert result == mock_sprints["values"]
87 |
88 |
89 | def test_get_all_sprints_from_board_exception(sprints_mixin):
90 | """Test get_all_sprints_from_board method with exception."""
91 | sprints_mixin.jira.get_all_sprints_from_board.side_effect = Exception("API Error")
92 |
93 | result = sprints_mixin.get_all_sprints_from_board("1000")
94 | assert result == []
95 | sprints_mixin.jira.get_all_sprints_from_board.assert_called_once()
96 |
97 |
98 | def test_get_all_sprints_from_board_http_error(sprints_mixin):
99 | """Test get_all_sprints_from_board method with HTTPError."""
100 | sprints_mixin.jira.get_all_sprints_from_board.side_effect = requests.HTTPError(
101 | response=MagicMock(content="API Error content")
102 | )
103 |
104 | result = sprints_mixin.get_all_sprints_from_board("1000")
105 | assert result == []
106 | sprints_mixin.jira.get_all_sprints_from_board.assert_called_once()
107 |
108 |
109 | def test_get_all_sprints_from_board_non_dict_response(sprints_mixin):
110 | """Test get_all_sprints_from_board method with non-list response."""
111 | sprints_mixin.jira.get_all_sprints_from_board.return_value = "not a dict"
112 |
113 | result = sprints_mixin.get_all_sprints_from_board("1000")
114 | assert result == []
115 | sprints_mixin.jira.get_all_sprints_from_board.assert_called_once()
116 |
117 |
118 | def test_get_all_sprints_from_board_model(sprints_mixin, mock_sprints):
119 | sprints_mixin.jira.get_all_sprints_from_board.return_value = mock_sprints
120 |
121 | result = sprints_mixin.get_all_sprints_from_board_model(board_id="1000", state=None)
122 | assert result == [
123 | JiraSprint.from_api_response(value) for value in mock_sprints["values"]
124 | ]
125 |
126 |
127 | def test_create_sprint(sprints_mixin, mock_sprints):
128 | """Test create_sprint method."""
129 | sprints_mixin.jira.create_sprint.return_value = mock_sprints["values"][1]
130 |
131 | result = sprints_mixin.create_sprint(
132 | sprint_name="Sprint 1",
133 | board_id="10001",
134 | start_date="2099-05-01T00:00:00.000Z",
135 | end_date="2100-05-01T00:00:00.000Z",
136 | goal="Your goal",
137 | )
138 | assert result == JiraSprint.from_api_response(mock_sprints["values"][1])
139 |
140 |
141 | def test_create_sprint_http_exception(sprints_mixin, mock_sprints):
142 | """Test create_sprint method."""
143 | sprints_mixin.jira.create_sprint.side_effect = requests.HTTPError(
144 | response=MagicMock(content="API Error content")
145 | )
146 |
147 | with pytest.raises(requests.HTTPError):
148 | sprints_mixin.create_sprint(
149 | sprint_name="Sprint 1",
150 | board_id="10001",
151 | start_date="2099-05-01T00:00:00.000Z",
152 | end_date="2100-05-15T00:00:00.000Z",
153 | goal="Your goal",
154 | )
155 |
156 |
157 | def test_create_sprint_exception(sprints_mixin, mock_sprints):
158 | """Test create_sprint method throws general Exception."""
159 | sprints_mixin.jira.create_sprint.side_effect = Exception
160 |
161 | with pytest.raises(Exception):
162 | sprints_mixin.create_sprint(
163 | sprint_name="Sprint 1",
164 | board_id="10001",
165 | start_date="2099-05-01T00:00:00.000Z",
166 | end_date="2100-05-15T00:00:00.000Z",
167 | goal="Your goal",
168 | )
169 |
170 |
171 | def test_create_sprint_test_missing_startdate(sprints_mixin, mock_sprints):
172 | """Test create_sprint method."""
173 | sprints_mixin.jira.create_sprint.return_value = mock_sprints["values"][1]
174 |
175 | with pytest.raises(ValueError) as excinfo:
176 | sprints_mixin.create_sprint(
177 | sprint_name="Sprint 1",
178 | board_id="10001",
179 | start_date="",
180 | end_date="2100-05-15T00:00:00.000Z",
181 | goal="Your goal",
182 | )
183 | assert str(excinfo.value) == "Start date is required."
184 |
185 |
186 | def test_create_sprint_test_invalid_startdate(sprints_mixin, mock_sprints):
187 | """Test create_sprint method."""
188 | sprints_mixin.jira.create_sprint.return_value = mock_sprints["values"][1]
189 |
190 | with pytest.raises(ValueError):
191 | sprints_mixin.create_sprint(
192 | sprint_name="Sprint 1",
193 | board_id="10001",
194 | start_date="IAMNOTADATE!",
195 | end_date="2100-05-15T00:00:00.000Z",
196 | goal="Your goal",
197 | )
198 |
199 |
200 | def test_create_sprint_test_no_enddate(sprints_mixin, mock_sprints):
201 | """Test create_sprint method."""
202 | sprints_mixin.jira.create_sprint.return_value = mock_sprints["values"][1]
203 |
204 | result = sprints_mixin.create_sprint(
205 | sprint_name="Sprint 1",
206 | board_id="10001",
207 | start_date="2099-05-15T00:00:00.000Z",
208 | end_date=None,
209 | goal="Your goal",
210 | )
211 | assert result == JiraSprint.from_api_response(mock_sprints["values"][1])
212 |
213 |
214 | def test_create_sprint_test_invalid_enddate(sprints_mixin, mock_sprints):
215 | """Test create_sprint method."""
216 | sprints_mixin.jira.create_sprint.return_value = mock_sprints["values"][1]
217 |
218 | with pytest.raises(ValueError):
219 | sprints_mixin.create_sprint(
220 | sprint_name="Sprint 1",
221 | board_id="10001",
222 | start_date="2099-05-15T00:00:00.000Z",
223 | end_date="IAMNOTADATE!",
224 | goal="Your goal",
225 | )
226 |
227 |
228 | def test_create_sprint_test_startdate_after_enddate(sprints_mixin, mock_sprints):
229 | """Test create_sprint method."""
230 | sprints_mixin.jira.create_sprint.return_value = mock_sprints["values"][1]
231 |
232 | with pytest.raises(ValueError, match="Start date must be before end date."):
233 | sprints_mixin.create_sprint(
234 | sprint_name="Sprint 1",
235 | board_id="10001",
236 | start_date="2100-05-15T00:00:00.000Z",
237 | end_date="2099-05-15T00:00:00.000Z",
238 | goal="Your goal",
239 | )
240 |
241 |
242 | def test_update_sprint_success(sprints_mixin, mock_sprints):
243 | """Test update_sprint method with valid data."""
244 | mock_updated_sprint = mock_sprints["values"][0]
245 | sprints_mixin.jira.update_partially_sprint.return_value = mock_updated_sprint
246 |
247 | result = sprints_mixin.update_sprint(
248 | sprint_id="10000",
249 | sprint_name="Updated Sprint Name",
250 | state="active",
251 | start_date="2024-05-01T00:00:00.000Z",
252 | end_date="2024-05-15T00:00:00.000Z",
253 | goal="Updated goal",
254 | )
255 |
256 | assert result == JiraSprint.from_api_response(mock_updated_sprint)
257 | sprints_mixin.jira.update_partially_sprint.assert_called_once_with(
258 | sprint_id="10000",
259 | data={
260 | "name": "Updated Sprint Name",
261 | "state": "active",
262 | "startDate": "2024-05-01T00:00:00.000Z",
263 | "endDate": "2024-05-15T00:00:00.000Z",
264 | "goal": "Updated goal",
265 | },
266 | )
267 |
268 |
269 | def test_update_sprint_invalid_state(sprints_mixin):
270 | """Test update_sprint method with invalid state."""
271 | result = sprints_mixin.update_sprint(
272 | sprint_id="10000",
273 | sprint_name="Updated Sprint Name",
274 | state="invalid_state",
275 | start_date=None,
276 | end_date=None,
277 | goal=None,
278 | )
279 |
280 | assert result is None
281 | sprints_mixin.jira.update_partially_sprint.assert_not_called()
282 |
283 |
284 | def test_update_sprint_missing_sprint_id(sprints_mixin):
285 | """Test update_sprint method with missing sprint_id."""
286 | result = sprints_mixin.update_sprint(
287 | sprint_id=None,
288 | sprint_name="Updated Sprint Name",
289 | state="active",
290 | start_date=None,
291 | end_date=None,
292 | goal=None,
293 | )
294 |
295 | assert result is None
296 | sprints_mixin.jira.update_partially_sprint.assert_not_called()
297 |
298 |
299 | def test_update_sprint_http_error(sprints_mixin):
300 | """Test update_sprint method with HTTPError."""
301 | sprints_mixin.jira.update_partially_sprint.side_effect = requests.HTTPError(
302 | response=MagicMock(content="API Error content")
303 | )
304 |
305 | result = sprints_mixin.update_sprint(
306 | sprint_id="10000",
307 | sprint_name="Updated Sprint Name",
308 | state="active",
309 | start_date=None,
310 | end_date=None,
311 | goal=None,
312 | )
313 |
314 | assert result is None
315 | sprints_mixin.jira.update_partially_sprint.assert_called_once()
316 |
317 |
318 | def test_update_sprint_exception(sprints_mixin):
319 | """Test update_sprint method with a generic exception."""
320 | sprints_mixin.jira.update_partially_sprint.side_effect = Exception(
321 | "Unexpected Error"
322 | )
323 |
324 | result = sprints_mixin.update_sprint(
325 | sprint_id="10000",
326 | sprint_name="Updated Sprint Name",
327 | state="active",
328 | start_date=None,
329 | end_date=None,
330 | goal=None,
331 | )
332 |
333 | assert result is None
334 | sprints_mixin.jira.update_partially_sprint.assert_called_once()
335 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/links.py:
--------------------------------------------------------------------------------
```python
1 | """Module for Jira issue link operations."""
2 |
3 | import logging
4 | from typing import Any
5 |
6 | from requests.exceptions import HTTPError
7 |
8 | from ..exceptions import MCPAtlassianAuthenticationError
9 | from ..models.jira import JiraIssueLinkType
10 | from .client import JiraClient
11 |
12 | logger = logging.getLogger("mcp-jira")
13 |
14 |
15 | class LinksMixin(JiraClient):
16 | """Mixin for Jira issue link operations."""
17 |
18 | def get_issue_link_types(self) -> list[JiraIssueLinkType]:
19 | """
20 | Get all available issue link types.
21 |
22 | Returns:
23 | List of JiraIssueLinkType objects
24 |
25 | Raises:
26 | MCPAtlassianAuthenticationError: If authentication fails with the Jira API
27 | (401/403)
28 | Exception: If there is an error retrieving issue link types
29 | """
30 | try:
31 | link_types_response = self.jira.get("rest/api/2/issueLinkType")
32 | if not isinstance(link_types_response, dict):
33 | msg = f"Unexpected return value type from `jira.get`: {type(link_types_response)}"
34 | logger.error(msg)
35 | raise TypeError(msg)
36 |
37 | link_types_data = link_types_response.get("issueLinkTypes", [])
38 |
39 | link_types = [
40 | JiraIssueLinkType.from_api_response(link_type)
41 | for link_type in link_types_data
42 | ]
43 |
44 | return link_types
45 |
46 | except HTTPError as http_err:
47 | if http_err.response is not None and http_err.response.status_code in [
48 | 401,
49 | 403,
50 | ]:
51 | error_msg = (
52 | f"Authentication failed for Jira API "
53 | f"({http_err.response.status_code}). "
54 | "Token may be expired or invalid. Please verify credentials."
55 | )
56 | logger.error(error_msg)
57 | raise MCPAtlassianAuthenticationError(error_msg) from http_err
58 | else:
59 | logger.error(f"HTTP error during API call: {http_err}", exc_info=True)
60 | raise Exception(
61 | f"Error getting issue link types: {http_err}"
62 | ) from http_err
63 | except Exception as e:
64 | error_msg = str(e)
65 | logger.error(f"Error getting issue link types: {error_msg}", exc_info=True)
66 | raise Exception(f"Error getting issue link types: {error_msg}") from e
67 |
68 | def create_issue_link(self, data: dict[str, Any]) -> dict[str, Any]:
69 | """
70 | Create a link between two issues.
71 |
72 | Args:
73 | data: A dictionary containing the link data with the following structure:
74 | {
75 | "type": {"name": "Duplicate" }, # Link type name (e.g., "Duplicate", "Blocks", "Relates to")
76 | "inwardIssue": { "key": "ISSUE-1"}, # The issue that is the source of the link
77 | "outwardIssue": {"key": "ISSUE-2"}, # The issue that is the target of the link
78 | "comment": { # Optional comment to add to the link
79 | "body": "Linked related issue!",
80 | "visibility": { # Optional visibility settings
81 | "type": "group",
82 | "value": "jira-software-users"
83 | }
84 | }
85 | }
86 |
87 | Returns:
88 | Dictionary with the created link information
89 |
90 | Raises:
91 | ValueError: If required fields are missing
92 | MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
93 | Exception: If there is an error creating the issue link
94 | """
95 | # Validate required fields
96 | if not data.get("type"):
97 | raise ValueError("Link type is required")
98 | if not data.get("inwardIssue") or not data["inwardIssue"].get("key"):
99 | raise ValueError("Inward issue key is required")
100 | if not data.get("outwardIssue") or not data["outwardIssue"].get("key"):
101 | raise ValueError("Outward issue key is required")
102 |
103 | try:
104 | # Create the issue link
105 | self.jira.create_issue_link(data)
106 |
107 | # Return a response with the link information
108 | response = {
109 | "success": True,
110 | "message": f"Link created between {data['inwardIssue']['key']} and {data['outwardIssue']['key']}",
111 | "link_type": data["type"]["name"],
112 | "inward_issue": data["inwardIssue"]["key"],
113 | "outward_issue": data["outwardIssue"]["key"],
114 | }
115 |
116 | return response
117 |
118 | except HTTPError as http_err:
119 | if http_err.response is not None and http_err.response.status_code in [
120 | 401,
121 | 403,
122 | ]:
123 | error_msg = (
124 | f"Authentication failed for Jira API "
125 | f"({http_err.response.status_code}). "
126 | "Token may be expired or invalid. Please verify credentials."
127 | )
128 | logger.error(error_msg)
129 | raise MCPAtlassianAuthenticationError(error_msg) from http_err
130 | else:
131 | logger.error(f"HTTP error during API call: {http_err}", exc_info=True)
132 | raise Exception(f"Error creating issue link: {http_err}") from http_err
133 | except Exception as e:
134 | error_msg = str(e)
135 | logger.error(f"Error creating issue link: {error_msg}", exc_info=True)
136 | raise Exception(f"Error creating issue link: {error_msg}") from e
137 |
138 | def create_remote_issue_link(
139 | self, issue_key: str, link_data: dict[str, Any]
140 | ) -> dict[str, Any]:
141 | """
142 | Create a remote issue link (web link or Confluence link) for an issue.
143 |
144 | Args:
145 | issue_key: The key of the issue to add the link to (e.g., 'PROJ-123')
146 | link_data: A dictionary containing the remote link data with the following structure:
147 | {
148 | "object": {
149 | "url": "https://example.com/page", # The URL to link to
150 | "title": "Example Page", # The title/name of the link
151 | "summary": "Optional description of the link", # Optional description
152 | "icon": { # Optional icon configuration
153 | "url16x16": "https://example.com/icon16.png",
154 | "title": "Icon Title"
155 | }
156 | },
157 | "relationship": "causes" # Optional relationship description
158 | }
159 |
160 | Returns:
161 | Dictionary with the created remote link information
162 |
163 | Raises:
164 | ValueError: If required fields are missing
165 | MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
166 | Exception: If there is an error creating the remote issue link
167 | """
168 | # Validate required fields
169 | if not issue_key:
170 | raise ValueError("Issue key is required")
171 | if not link_data.get("object"):
172 | raise ValueError("Link object is required")
173 | if not link_data["object"].get("url"):
174 | raise ValueError("URL is required in link object")
175 | if not link_data["object"].get("title"):
176 | raise ValueError("Title is required in link object")
177 |
178 | try:
179 | # Create the remote issue link using the Jira API
180 | endpoint = f"rest/api/3/issue/{issue_key}/remotelink"
181 | response = self.jira.post(endpoint, json=link_data)
182 |
183 | # Return a response with the link information
184 | result = {
185 | "success": True,
186 | "message": f"Remote link created for issue {issue_key}",
187 | "issue_key": issue_key,
188 | "link_title": link_data["object"]["title"],
189 | "link_url": link_data["object"]["url"],
190 | "relationship": link_data.get("relationship", ""),
191 | }
192 |
193 | return result
194 |
195 | except HTTPError as http_err:
196 | if http_err.response is not None and http_err.response.status_code in [
197 | 401,
198 | 403,
199 | ]:
200 | error_msg = (
201 | f"Authentication failed for Jira API "
202 | f"({http_err.response.status_code}). "
203 | "Token may be expired or invalid. Please verify credentials."
204 | )
205 | logger.error(error_msg)
206 | raise MCPAtlassianAuthenticationError(error_msg) from http_err
207 | else:
208 | logger.error(f"HTTP error during API call: {http_err}", exc_info=True)
209 | raise Exception(
210 | f"Error creating remote issue link: {http_err}"
211 | ) from http_err
212 | except Exception as e:
213 | error_msg = str(e)
214 | logger.error(
215 | f"Error creating remote issue link: {error_msg}", exc_info=True
216 | )
217 | raise Exception(f"Error creating remote issue link: {error_msg}") from e
218 |
219 | def remove_issue_link(self, link_id: str) -> dict[str, Any]:
220 | """
221 | Remove a link between two issues.
222 |
223 | Args:
224 | link_id: The ID of the link to remove
225 |
226 | Returns:
227 | Dictionary with the result of the operation
228 |
229 | Raises:
230 | ValueError: If link_id is empty
231 | MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
232 | Exception: If there is an error removing the issue link
233 | """
234 | # Validate input
235 | if not link_id:
236 | raise ValueError("Link ID is required")
237 |
238 | try:
239 | # Remove the issue link
240 | self.jira.remove_issue_link(link_id)
241 |
242 | # Return a response indicating success
243 | response = {
244 | "success": True,
245 | "message": f"Link with ID {link_id} has been removed",
246 | "link_id": link_id,
247 | }
248 |
249 | return response
250 |
251 | except HTTPError as http_err:
252 | if http_err.response is not None and http_err.response.status_code in [
253 | 401,
254 | 403,
255 | ]:
256 | error_msg = (
257 | f"Authentication failed for Jira API "
258 | f"({http_err.response.status_code}). "
259 | "Token may be expired or invalid. Please verify credentials."
260 | )
261 | logger.error(error_msg)
262 | raise MCPAtlassianAuthenticationError(error_msg) from http_err
263 | else:
264 | logger.error(f"HTTP error during API call: {http_err}", exc_info=True)
265 | raise Exception(f"Error removing issue link: {http_err}") from http_err
266 | except Exception as e:
267 | error_msg = str(e)
268 | logger.error(f"Error removing issue link: {error_msg}", exc_info=True)
269 | raise Exception(f"Error removing issue link: {error_msg}") from e
270 |
```
--------------------------------------------------------------------------------
/tests/unit/servers/test_context.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the server context module."""
2 |
3 | import pytest
4 |
5 | from mcp_atlassian.confluence.config import ConfluenceConfig
6 | from mcp_atlassian.jira.config import JiraConfig
7 | from mcp_atlassian.servers.context import MainAppContext
8 |
9 |
10 | class TestMainAppContext:
11 | """Tests for the MainAppContext dataclass."""
12 |
13 | def test_initialization_with_defaults(self):
14 | """Test MainAppContext initialization with default values."""
15 | context = MainAppContext()
16 |
17 | assert context.full_jira_config is None
18 | assert context.full_confluence_config is None
19 | assert context.read_only is False
20 | assert context.enabled_tools is None
21 |
22 | def test_initialization_with_all_parameters(self):
23 | """Test MainAppContext initialization with all parameters provided."""
24 | # Arrange
25 | jira_config = JiraConfig(
26 | url="https://example.atlassian.net",
27 | auth_type="basic",
28 | username="[email protected]",
29 | api_token="test_token",
30 | )
31 | confluence_config = ConfluenceConfig(
32 | url="https://example.atlassian.net/wiki",
33 | auth_type="basic",
34 | username="[email protected]",
35 | api_token="test_token",
36 | )
37 | enabled_tools = ["jira_get_issue", "confluence_get_page"]
38 |
39 | # Act
40 | context = MainAppContext(
41 | full_jira_config=jira_config,
42 | full_confluence_config=confluence_config,
43 | read_only=True,
44 | enabled_tools=enabled_tools,
45 | )
46 |
47 | # Assert
48 | assert context.full_jira_config is jira_config
49 | assert context.full_confluence_config is confluence_config
50 | assert context.read_only is True
51 | assert context.enabled_tools == enabled_tools
52 |
53 | def test_initialization_with_partial_parameters(self):
54 | """Test MainAppContext initialization with some parameters provided."""
55 | # Arrange
56 | jira_config = JiraConfig(
57 | url="https://example.atlassian.net",
58 | auth_type="pat",
59 | personal_token="test_personal_token",
60 | )
61 |
62 | # Act
63 | context = MainAppContext(full_jira_config=jira_config, read_only=True)
64 |
65 | # Assert
66 | assert context.full_jira_config is jira_config
67 | assert context.full_confluence_config is None
68 | assert context.read_only is True
69 | assert context.enabled_tools is None
70 |
71 | def test_frozen_dataclass_behavior(self):
72 | """Test that MainAppContext is frozen and immutable."""
73 | # Arrange
74 | context = MainAppContext(read_only=False)
75 |
76 | # Act & Assert - should raise FrozenInstanceError when trying to modify
77 | with pytest.raises(AttributeError):
78 | context.read_only = True
79 |
80 | with pytest.raises(AttributeError):
81 | context.full_jira_config = JiraConfig(
82 | url="https://test.com",
83 | auth_type="basic",
84 | username="test",
85 | api_token="token",
86 | )
87 |
88 | def test_type_hint_compliance_jira_config(self):
89 | """Test type hint compliance for JiraConfig field."""
90 | # Test with None
91 | context = MainAppContext(full_jira_config=None)
92 | assert context.full_jira_config is None
93 |
94 | # Test with valid JiraConfig
95 | jira_config = JiraConfig(
96 | url="https://jira.example.com", auth_type="pat", personal_token="test_token"
97 | )
98 | context = MainAppContext(full_jira_config=jira_config)
99 | assert isinstance(context.full_jira_config, JiraConfig)
100 | assert context.full_jira_config.url == "https://jira.example.com"
101 |
102 | def test_type_hint_compliance_confluence_config(self):
103 | """Test type hint compliance for ConfluenceConfig field."""
104 | # Test with None
105 | context = MainAppContext(full_confluence_config=None)
106 | assert context.full_confluence_config is None
107 |
108 | # Test with valid ConfluenceConfig
109 | confluence_config = ConfluenceConfig(
110 | url="https://confluence.example.com",
111 | auth_type="pat",
112 | username="[email protected]",
113 | api_token="test_token",
114 | )
115 | context = MainAppContext(full_confluence_config=confluence_config)
116 | assert isinstance(context.full_confluence_config, ConfluenceConfig)
117 | assert context.full_confluence_config.url == "https://confluence.example.com"
118 |
119 | def test_enabled_tools_field_validation(self):
120 | """Test enabled_tools field validation and default handling."""
121 | # Test with None (default)
122 | context = MainAppContext()
123 | assert context.enabled_tools is None
124 |
125 | # Test with empty list
126 | context = MainAppContext(enabled_tools=[])
127 | assert context.enabled_tools == []
128 |
129 | # Test with list of strings
130 | tools = ["jira_create_issue", "confluence_create_page", "jira_search_issues"]
131 | context = MainAppContext(enabled_tools=tools)
132 | assert context.enabled_tools == tools
133 | assert len(context.enabled_tools) == 3
134 |
135 | def test_read_only_field_validation(self):
136 | """Test read_only field validation and default handling."""
137 | # Test default value
138 | context = MainAppContext()
139 | assert context.read_only is False
140 | assert isinstance(context.read_only, bool)
141 |
142 | # Test explicit True
143 | context = MainAppContext(read_only=True)
144 | assert context.read_only is True
145 | assert isinstance(context.read_only, bool)
146 |
147 | # Test explicit False
148 | context = MainAppContext(read_only=False)
149 | assert context.read_only is False
150 | assert isinstance(context.read_only, bool)
151 |
152 | def test_string_representation(self):
153 | """Test the string representation of MainAppContext."""
154 | # Test with default values
155 | context = MainAppContext()
156 | str_repr = str(context)
157 |
158 | assert "MainAppContext" in str_repr
159 | assert "full_jira_config=None" in str_repr
160 | assert "full_confluence_config=None" in str_repr
161 | assert "read_only=False" in str_repr
162 | assert "enabled_tools=None" in str_repr
163 |
164 | # Test with values provided
165 | jira_config = JiraConfig(
166 | url="https://test.atlassian.net",
167 | auth_type="basic",
168 | username="test",
169 | api_token="token",
170 | )
171 | context = MainAppContext(
172 | full_jira_config=jira_config,
173 | read_only=True,
174 | enabled_tools=["tool1", "tool2"],
175 | )
176 | str_repr = str(context)
177 |
178 | assert "MainAppContext" in str_repr
179 | assert "read_only=True" in str_repr
180 | assert "enabled_tools=['tool1', 'tool2']" in str_repr
181 |
182 | def test_equality_comparison(self):
183 | """Test equality comparison between MainAppContext instances."""
184 | # Test identical instances
185 | context1 = MainAppContext()
186 | context2 = MainAppContext()
187 | assert context1 == context2
188 |
189 | # Test instances with same values
190 | jira_config = JiraConfig(
191 | url="https://test.atlassian.net",
192 | auth_type="basic",
193 | username="test",
194 | api_token="token",
195 | )
196 | context1 = MainAppContext(
197 | full_jira_config=jira_config, read_only=True, enabled_tools=["tool1"]
198 | )
199 | context2 = MainAppContext(
200 | full_jira_config=jira_config, read_only=True, enabled_tools=["tool1"]
201 | )
202 | assert context1 == context2
203 |
204 | # Test instances with different values
205 | context3 = MainAppContext(read_only=False)
206 | context4 = MainAppContext(read_only=True)
207 | assert context3 != context4
208 |
209 | # Test with different configs
210 | different_jira_config = JiraConfig(
211 | url="https://different.atlassian.net",
212 | auth_type="basic",
213 | username="different",
214 | api_token="different_token",
215 | )
216 | context5 = MainAppContext(full_jira_config=jira_config)
217 | context6 = MainAppContext(full_jira_config=different_jira_config)
218 | assert context5 != context6
219 |
220 | def test_hash_behavior(self):
221 | """Test hash behavior for MainAppContext instances."""
222 | # Test that instances with only hashable fields (None configs, no enabled_tools) can be hashed
223 | context1 = MainAppContext(read_only=True)
224 | context2 = MainAppContext(read_only=True)
225 | assert hash(context1) == hash(context2)
226 |
227 | # Test instances with different hashable values
228 | context3 = MainAppContext(read_only=False)
229 | context4 = MainAppContext(read_only=True)
230 | contexts_dict = {context3: "value3", context4: "value4"}
231 | assert len(contexts_dict) == 2
232 |
233 | # Test that instances with unhashable fields raise TypeError
234 | jira_config = JiraConfig(
235 | url="https://test.atlassian.net",
236 | auth_type="basic",
237 | username="test",
238 | api_token="token",
239 | )
240 | context_with_config = MainAppContext(full_jira_config=jira_config)
241 | with pytest.raises(TypeError, match="unhashable type"):
242 | hash(context_with_config)
243 |
244 | # Test that instances with list fields raise TypeError
245 | context_with_list = MainAppContext(enabled_tools=["tool1", "tool2"])
246 | with pytest.raises(TypeError, match="unhashable type"):
247 | hash(context_with_list)
248 |
249 | def test_field_access_edge_cases(self):
250 | """Test edge cases for field access."""
251 | # Test accessing fields on empty context
252 | context = MainAppContext()
253 |
254 | # All fields should be accessible
255 | assert hasattr(context, "full_jira_config")
256 | assert hasattr(context, "full_confluence_config")
257 | assert hasattr(context, "read_only")
258 | assert hasattr(context, "enabled_tools")
259 |
260 | # Test that we can't access non-existent fields
261 | assert not hasattr(context, "non_existent_field")
262 |
263 | def test_with_both_configs_different_auth_types(self):
264 | """Test MainAppContext with both Jira and Confluence configs using different auth types."""
265 | # Arrange
266 | jira_config = JiraConfig(
267 | url="https://company.atlassian.net",
268 | auth_type="basic",
269 | username="[email protected]",
270 | api_token="jira_token",
271 | )
272 | confluence_config = ConfluenceConfig(
273 | url="https://company.atlassian.net/wiki",
274 | auth_type="oauth",
275 | oauth_config=None, # Simplified for test
276 | )
277 |
278 | # Act
279 | context = MainAppContext(
280 | full_jira_config=jira_config,
281 | full_confluence_config=confluence_config,
282 | read_only=True,
283 | enabled_tools=[
284 | "jira_get_issue",
285 | "confluence_get_page",
286 | "jira_create_issue",
287 | ],
288 | )
289 |
290 | # Assert
291 | assert context.full_jira_config.auth_type == "basic"
292 | assert context.full_confluence_config.auth_type == "oauth"
293 | assert context.read_only is True
294 | assert len(context.enabled_tools) == 3
295 | assert "jira_get_issue" in context.enabled_tools
296 | assert "confluence_get_page" in context.enabled_tools
297 | assert "jira_create_issue" in context.enabled_tools
298 |
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_issues_markdown.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for markdown conversion in Jira issue operations."""
2 |
3 | from unittest.mock import MagicMock, Mock
4 |
5 | import pytest
6 |
7 | from mcp_atlassian.jira import JiraFetcher
8 | from mcp_atlassian.jira.issues import IssuesMixin
9 |
10 |
11 | class TestIssuesMarkdownConversion:
12 | """Tests for markdown to Jira conversion in issue operations."""
13 |
14 | @pytest.fixture
15 | def issues_mixin(self, jira_fetcher: JiraFetcher) -> IssuesMixin:
16 | """Create an IssuesMixin instance with mocked dependencies."""
17 | mixin = jira_fetcher
18 |
19 | # Mock the markdown conversion method
20 | mixin._markdown_to_jira = Mock(side_effect=lambda x: f"[CONVERTED] {x}")
21 |
22 | # Add other mock methods
23 | mixin._get_account_id = MagicMock(return_value="test-account-id")
24 |
25 | return mixin
26 |
27 | def test_create_issue_converts_markdown_description(
28 | self, issues_mixin: IssuesMixin
29 | ):
30 | """Test that create_issue converts markdown description to Jira format."""
31 | # Mock create_issue response
32 | create_response = {"key": "TEST-123"}
33 | issues_mixin.jira.create_issue.return_value = create_response
34 |
35 | # Mock get_issue response
36 | issue_data = {
37 | "id": "12345",
38 | "key": "TEST-123",
39 | "fields": {
40 | "summary": "Test Issue",
41 | "description": "[CONVERTED] # Markdown Description",
42 | "status": {"name": "Open"},
43 | "issuetype": {"name": "Bug"},
44 | },
45 | }
46 | issues_mixin.jira.get_issue.return_value = issue_data
47 |
48 | # Create issue with markdown description
49 | markdown_description = "# Markdown Description\n\nThis is **bold** text."
50 | issue = issues_mixin.create_issue(
51 | project_key="TEST",
52 | summary="Test Issue",
53 | issue_type="Bug",
54 | description=markdown_description,
55 | )
56 |
57 | # Verify markdown conversion was called
58 | issues_mixin._markdown_to_jira.assert_called_once_with(markdown_description)
59 |
60 | # Verify the converted description was passed to API
61 | expected_fields = {
62 | "project": {"key": "TEST"},
63 | "summary": "Test Issue",
64 | "issuetype": {"name": "Bug"},
65 | "description": f"[CONVERTED] {markdown_description}",
66 | }
67 | issues_mixin.jira.create_issue.assert_called_once_with(fields=expected_fields)
68 |
69 | # Verify result
70 | assert issue.key == "TEST-123"
71 |
72 | def test_create_issue_with_empty_description(self, issues_mixin: IssuesMixin):
73 | """Test that create_issue handles empty description correctly."""
74 | # Mock create_issue response
75 | create_response = {"key": "TEST-123"}
76 | issues_mixin.jira.create_issue.return_value = create_response
77 |
78 | # Mock get_issue response
79 | issue_data = {
80 | "id": "12345",
81 | "key": "TEST-123",
82 | "fields": {
83 | "summary": "Test Issue",
84 | "status": {"name": "Open"},
85 | "issuetype": {"name": "Bug"},
86 | },
87 | }
88 | issues_mixin.jira.get_issue.return_value = issue_data
89 |
90 | # Create issue without description
91 | issue = issues_mixin.create_issue(
92 | project_key="TEST",
93 | summary="Test Issue",
94 | issue_type="Bug",
95 | description="",
96 | )
97 |
98 | # Verify markdown conversion was not called (empty string)
99 | issues_mixin._markdown_to_jira.assert_not_called()
100 |
101 | # Verify no description field was added
102 | call_args = issues_mixin.jira.create_issue.call_args[1]
103 | assert "description" not in call_args["fields"]
104 |
105 | # Verify result
106 | assert issue.key == "TEST-123"
107 |
108 | def test_update_issue_converts_markdown_in_fields(self, issues_mixin: IssuesMixin):
109 | """Test that update_issue converts markdown description when passed in fields dict."""
110 | # Mock the issue data for get_issue
111 | issue_data = {
112 | "id": "12345",
113 | "key": "TEST-123",
114 | "fields": {
115 | "summary": "Updated Issue",
116 | "description": "[CONVERTED] # Updated Description",
117 | "status": {"name": "In Progress"},
118 | "issuetype": {"name": "Bug"},
119 | },
120 | }
121 | issues_mixin.jira.get_issue.return_value = issue_data
122 |
123 | # Update issue with markdown description in fields
124 | markdown_description = "# Updated Description\n\nThis is *italic* text."
125 | issue = issues_mixin.update_issue(
126 | issue_key="TEST-123",
127 | fields={"description": markdown_description, "summary": "Updated Issue"},
128 | )
129 |
130 | # Verify markdown conversion was called
131 | issues_mixin._markdown_to_jira.assert_called_once_with(markdown_description)
132 |
133 | # Verify the converted description was passed to API
134 | issues_mixin.jira.update_issue.assert_called_once_with(
135 | issue_key="TEST-123",
136 | update={
137 | "fields": {
138 | "description": f"[CONVERTED] {markdown_description}",
139 | "summary": "Updated Issue",
140 | }
141 | },
142 | )
143 |
144 | # Verify result
145 | assert issue.key == "TEST-123"
146 |
147 | def test_update_issue_converts_markdown_in_kwargs(self, issues_mixin: IssuesMixin):
148 | """Test that update_issue converts markdown description when passed as kwarg."""
149 | # Mock the issue data for get_issue
150 | issue_data = {
151 | "id": "12345",
152 | "key": "TEST-123",
153 | "fields": {
154 | "summary": "Test Issue",
155 | "description": "[CONVERTED] ## Updated via kwargs",
156 | "status": {"name": "In Progress"},
157 | "issuetype": {"name": "Bug"},
158 | },
159 | }
160 | issues_mixin.jira.get_issue.return_value = issue_data
161 |
162 | # Update issue with markdown description as kwarg
163 | markdown_description = (
164 | "## Updated via kwargs\n\nWith a [link](http://example.com)"
165 | )
166 | issue = issues_mixin.update_issue(
167 | issue_key="TEST-123", description=markdown_description
168 | )
169 |
170 | # Verify markdown conversion was called
171 | issues_mixin._markdown_to_jira.assert_called_once_with(markdown_description)
172 |
173 | # Verify the converted description was passed to API
174 | issues_mixin.jira.update_issue.assert_called_once_with(
175 | issue_key="TEST-123",
176 | update={"fields": {"description": f"[CONVERTED] {markdown_description}"}},
177 | )
178 |
179 | # Verify result
180 | assert issue.key == "TEST-123"
181 |
182 | def test_update_issue_with_multiple_fields_including_description(
183 | self, issues_mixin: IssuesMixin
184 | ):
185 | """Test update_issue with multiple fields including description."""
186 | # Mock the issue data for get_issue
187 | issue_data = {
188 | "id": "12345",
189 | "key": "TEST-123",
190 | "fields": {
191 | "summary": "Updated Summary",
192 | "description": "[CONVERTED] Updated description",
193 | "status": {"name": "In Progress"},
194 | "issuetype": {"name": "Bug"},
195 | "priority": {"name": "High"},
196 | },
197 | }
198 | issues_mixin.jira.get_issue.return_value = issue_data
199 |
200 | # Update issue with multiple fields
201 | markdown_description = "Updated description with **emphasis**"
202 | issue = issues_mixin.update_issue(
203 | issue_key="TEST-123",
204 | fields={
205 | "summary": "Updated Summary",
206 | "priority": {"name": "High"},
207 | },
208 | description=markdown_description, # As kwarg
209 | )
210 |
211 | # Verify markdown conversion was called
212 | issues_mixin._markdown_to_jira.assert_called_once_with(markdown_description)
213 |
214 | # Verify all fields were updated correctly
215 | expected_fields = {
216 | "summary": "Updated Summary",
217 | "priority": {"name": "High"},
218 | "description": f"[CONVERTED] {markdown_description}",
219 | }
220 | issues_mixin.jira.update_issue.assert_called_once_with(
221 | issue_key="TEST-123", update={"fields": expected_fields}
222 | )
223 |
224 | # Verify result
225 | assert issue.key == "TEST-123"
226 |
227 | def test_markdown_conversion_preserves_none_values(self, issues_mixin: IssuesMixin):
228 | """Test that None descriptions are not converted."""
229 | # Reset the mock to check for actual None handling
230 | issues_mixin._markdown_to_jira = Mock(
231 | side_effect=lambda x: f"[CONVERTED] {x}" if x else ""
232 | )
233 |
234 | # Mock create response
235 | create_response = {"key": "TEST-123"}
236 | issues_mixin.jira.create_issue.return_value = create_response
237 |
238 | # Mock get_issue response
239 | issues_mixin.jira.get_issue.return_value = {
240 | "id": "12345",
241 | "key": "TEST-123",
242 | "fields": {"summary": "Test", "issuetype": {"name": "Task"}},
243 | }
244 |
245 | # Create issue with None description (shouldn't add description field)
246 | issues_mixin.create_issue(
247 | project_key="TEST",
248 | summary="Test Issue",
249 | issue_type="Task",
250 | # description not provided (defaults to "")
251 | )
252 |
253 | # Verify markdown conversion was not called
254 | issues_mixin._markdown_to_jira.assert_not_called()
255 |
256 | # Verify no description field was added
257 | call_args = issues_mixin.jira.create_issue.call_args[1]
258 | assert "description" not in call_args["fields"]
259 |
260 | def test_create_issue_with_markdown_in_additional_fields(
261 | self, issues_mixin: IssuesMixin
262 | ):
263 | """Test that descriptions in additional_fields are NOT converted (edge case)."""
264 | # Mock field map for additional fields processing
265 | issues_mixin._generate_field_map = Mock(
266 | return_value={"mydescription": "customfield_10001"}
267 | )
268 | issues_mixin.get_field_by_id = Mock(
269 | return_value={"name": "MyDescription", "schema": {"type": "string"}}
270 | )
271 |
272 | # Mock create response
273 | create_response = {"key": "TEST-123"}
274 | issues_mixin.jira.create_issue.return_value = create_response
275 | issues_mixin.jira.get_issue.return_value = {
276 | "id": "12345",
277 | "key": "TEST-123",
278 | "fields": {"summary": "Test", "issuetype": {"name": "Task"}},
279 | }
280 |
281 | # Create issue with a custom field that happens to be named 'description'
282 | # This should NOT be converted as it's a different field
283 | issues_mixin.create_issue(
284 | project_key="TEST",
285 | summary="Test Issue",
286 | issue_type="Task",
287 | description="# Main Description", # This SHOULD be converted
288 | mydescription="# Custom Field Description", # This should NOT be converted
289 | )
290 |
291 | # Verify the main description was converted
292 | calls = issues_mixin._markdown_to_jira.call_args_list
293 | assert len(calls) == 1
294 | assert calls[0][0][0] == "# Main Description"
295 |
296 | # Verify fields
297 | create_call = issues_mixin.jira.create_issue.call_args[1]["fields"]
298 | assert create_call["description"] == "[CONVERTED] # Main Description"
299 | # Custom field should not be converted
300 | assert "customfield_10001" in create_call
301 | assert create_call["customfield_10001"] == "# Custom Field Description"
302 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/client.py:
--------------------------------------------------------------------------------
```python
1 | """Base client module for Jira API interactions."""
2 |
3 | import logging
4 | import os
5 | from typing import Any, Literal
6 |
7 | from atlassian import Jira
8 | from requests import Session
9 |
10 | from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
11 | from mcp_atlassian.preprocessing import JiraPreprocessor
12 | from mcp_atlassian.utils.logging import (
13 | get_masked_session_headers,
14 | log_config_param,
15 | mask_sensitive,
16 | )
17 | from mcp_atlassian.utils.oauth import configure_oauth_session
18 | from mcp_atlassian.utils.ssl import configure_ssl_verification
19 |
20 | from .config import JiraConfig
21 |
22 | # Configure logging
23 | logger = logging.getLogger("mcp-jira")
24 |
25 |
26 | class JiraClient:
27 | """Base client for Jira API interactions."""
28 |
29 | _field_ids_cache: list[dict[str, Any]] | None
30 | _current_user_account_id: str | None
31 |
32 | config: JiraConfig
33 | preprocessor: JiraPreprocessor
34 |
35 | def __init__(self, config: JiraConfig | None = None) -> None:
36 | """Initialize the Jira client with configuration options.
37 |
38 | Args:
39 | config: Optional configuration object (will use env vars if not provided)
40 |
41 | Raises:
42 | ValueError: If configuration is invalid or required credentials are missing
43 | MCPAtlassianAuthenticationError: If OAuth authentication fails
44 | """
45 | # Load configuration from environment variables if not provided
46 | self.config = config or JiraConfig.from_env()
47 |
48 | # Initialize the Jira client based on auth type
49 | if self.config.auth_type == "oauth":
50 | if not self.config.oauth_config or not self.config.oauth_config.cloud_id:
51 | error_msg = "OAuth authentication requires a valid cloud_id"
52 | raise ValueError(error_msg)
53 |
54 | # Create a session for OAuth
55 | session = Session()
56 |
57 | # Configure the session with OAuth authentication
58 | if not configure_oauth_session(session, self.config.oauth_config):
59 | error_msg = "Failed to configure OAuth session"
60 | raise MCPAtlassianAuthenticationError(error_msg)
61 |
62 | # The Jira API URL with OAuth is different
63 | api_url = (
64 | f"https://api.atlassian.com/ex/jira/{self.config.oauth_config.cloud_id}"
65 | )
66 |
67 | # Initialize Jira with the session
68 | self.jira = Jira(
69 | url=api_url,
70 | session=session,
71 | cloud=True, # OAuth is only for Cloud
72 | verify_ssl=self.config.ssl_verify,
73 | )
74 | elif self.config.auth_type == "pat":
75 | logger.debug(
76 | f"Initializing Jira client with Token (PAT) auth. "
77 | f"URL: {self.config.url}, "
78 | f"Token (masked): {mask_sensitive(str(self.config.personal_token))}"
79 | )
80 | self.jira = Jira(
81 | url=self.config.url,
82 | token=self.config.personal_token,
83 | cloud=self.config.is_cloud,
84 | verify_ssl=self.config.ssl_verify,
85 | )
86 | else: # basic auth
87 | logger.debug(
88 | f"Initializing Jira client with Basic auth. "
89 | f"URL: {self.config.url}, Username: {self.config.username}, "
90 | f"API Token present: {bool(self.config.api_token)}, "
91 | f"Is Cloud: {self.config.is_cloud}"
92 | )
93 | self.jira = Jira(
94 | url=self.config.url,
95 | username=self.config.username,
96 | password=self.config.api_token,
97 | cloud=self.config.is_cloud,
98 | verify_ssl=self.config.ssl_verify,
99 | )
100 | logger.debug(
101 | f"Jira client initialized. Session headers (Authorization masked): "
102 | f"{get_masked_session_headers(dict(self.jira._session.headers))}"
103 | )
104 |
105 | # Configure SSL verification using the shared utility
106 | configure_ssl_verification(
107 | service_name="Jira",
108 | url=self.config.url,
109 | session=self.jira._session,
110 | ssl_verify=self.config.ssl_verify,
111 | )
112 |
113 | # Proxy configuration
114 | proxies = {}
115 | if self.config.http_proxy:
116 | proxies["http"] = self.config.http_proxy
117 | if self.config.https_proxy:
118 | proxies["https"] = self.config.https_proxy
119 | if self.config.socks_proxy:
120 | proxies["socks"] = self.config.socks_proxy
121 | if proxies:
122 | self.jira._session.proxies.update(proxies)
123 | for k, v in proxies.items():
124 | log_config_param(
125 | logger, "Jira", f"{k.upper()}_PROXY", v, sensitive=True
126 | )
127 | if self.config.no_proxy and isinstance(self.config.no_proxy, str):
128 | os.environ["NO_PROXY"] = self.config.no_proxy
129 | log_config_param(logger, "Jira", "NO_PROXY", self.config.no_proxy)
130 |
131 | # Apply custom headers if configured
132 | if self.config.custom_headers:
133 | self._apply_custom_headers()
134 |
135 | # Initialize the text preprocessor for text processing capabilities
136 | self.preprocessor = JiraPreprocessor(base_url=self.config.url)
137 | self._field_ids_cache = None
138 | self._current_user_account_id = None
139 |
140 | # Test authentication during initialization (in debug mode only)
141 | if logger.isEnabledFor(logging.DEBUG):
142 | try:
143 | self._validate_authentication()
144 | except MCPAtlassianAuthenticationError:
145 | logger.warning(
146 | "Authentication validation failed during client initialization - "
147 | "continuing anyway"
148 | )
149 |
150 | def _validate_authentication(self) -> None:
151 | """Validate authentication by making a simple API call."""
152 | try:
153 | logger.debug(
154 | "Testing Jira authentication by retrieving current user info..."
155 | )
156 | current_user = self.jira.myself()
157 | if current_user:
158 | logger.info(
159 | f"Jira authentication successful. "
160 | f"Current user: {current_user.get('displayName', 'Unknown')} "
161 | f"({current_user.get('emailAddress', 'No email')})"
162 | )
163 | else:
164 | logger.warning(
165 | "Jira authentication test returned empty user info - "
166 | "this may indicate an issue"
167 | )
168 | except Exception as e:
169 | error_msg = f"Jira authentication validation failed: {e}"
170 | logger.error(error_msg)
171 | logger.debug(
172 | f"Authentication headers during failure: "
173 | f"{get_masked_session_headers(dict(self.jira._session.headers))}"
174 | )
175 | raise MCPAtlassianAuthenticationError(error_msg) from e
176 |
177 | def _apply_custom_headers(self) -> None:
178 | """Apply custom headers to the Jira session."""
179 | if not self.config.custom_headers:
180 | return
181 |
182 | logger.debug(
183 | f"Applying {len(self.config.custom_headers)} custom headers to Jira session"
184 | )
185 | for header_name, header_value in self.config.custom_headers.items():
186 | self.jira._session.headers[header_name] = header_value
187 | logger.debug(f"Applied custom header: {header_name}")
188 |
189 | def _clean_text(self, text: str) -> str:
190 | """Clean text content by:
191 | 1. Processing user mentions and links
192 | 2. Converting HTML/wiki markup to markdown
193 |
194 | Args:
195 | text: Text to clean
196 |
197 | Returns:
198 | Cleaned text
199 | """
200 | if not text:
201 | return ""
202 |
203 | # Otherwise create a temporary one
204 | _ = self.config.url if hasattr(self, "config") else ""
205 | return self.preprocessor.clean_jira_text(text)
206 |
207 | def _markdown_to_jira(self, markdown_text: str) -> str:
208 | """
209 | Convert Markdown syntax to Jira markup syntax.
210 |
211 | Args:
212 | markdown_text: Text in Markdown format
213 |
214 | Returns:
215 | Text in Jira markup format
216 | """
217 | if not markdown_text:
218 | return ""
219 |
220 | # Use the shared preprocessor if available
221 | if hasattr(self, "preprocessor"):
222 | return self.preprocessor.markdown_to_jira(markdown_text)
223 |
224 | # Otherwise create a temporary one
225 | _ = self.config.url if hasattr(self, "config") else ""
226 | return self.preprocessor.markdown_to_jira(markdown_text)
227 |
228 | def get_paged(
229 | self,
230 | method: Literal["get", "post"],
231 | url: str,
232 | params_or_json: dict | None = None,
233 | *,
234 | absolute: bool = False,
235 | ) -> list[dict]:
236 | """
237 | Repeatly fetch paged data from Jira API using `nextPageToken` to paginate.
238 |
239 | Args:
240 | method: The HTTP method to use
241 | url: The URL to retrieve data from
242 | params_or_json: Optional query parameters or JSON data to send
243 | absolute: Whether to use absolute URL
244 |
245 | Returns:
246 | List of requested json data
247 |
248 | Raises:
249 | ValueError: If using paged request on non-cloud Jira
250 | """
251 |
252 | if not self.config.is_cloud:
253 | raise ValueError(
254 | "Paged requests are only available for Jira Cloud platform"
255 | )
256 |
257 | all_results: list[dict] = []
258 | current_data = params_or_json or {}
259 |
260 | while True:
261 | if method == "get":
262 | api_result = self.jira.get(
263 | path=url, params=current_data, absolute=absolute
264 | )
265 | else:
266 | api_result = self.jira.post(
267 | path=url, json=current_data, absolute=absolute
268 | )
269 |
270 | if not isinstance(api_result, dict):
271 | error_message = f"API result is not a dictionary: {api_result}"
272 | logger.error(error_message)
273 | raise ValueError(error_message)
274 |
275 | # Extract values from response
276 | all_results.append(api_result)
277 |
278 | # Check if this is the last page
279 | if "nextPageToken" not in api_result:
280 | break
281 |
282 | # Update for next iteration
283 | current_data["nextPageToken"] = api_result["nextPageToken"]
284 |
285 | return all_results
286 |
287 | def create_version(
288 | self,
289 | project: str,
290 | name: str,
291 | start_date: str = None,
292 | release_date: str = None,
293 | description: str = None,
294 | ) -> dict[str, Any]:
295 | """
296 | Create a new version in a Jira project.
297 |
298 | Args:
299 | project: The project key (e.g., 'PROJ')
300 | name: The name of the version
301 | start_date: The start date (YYYY-MM-DD, optional)
302 | release_date: The release date (YYYY-MM-DD, optional)
303 | description: Description of the version (optional)
304 |
305 | Returns:
306 | The created version object as returned by Jira
307 | """
308 | payload = {"project": project, "name": name}
309 | if start_date:
310 | payload["startDate"] = start_date
311 | if release_date:
312 | payload["releaseDate"] = release_date
313 | if description:
314 | payload["description"] = description
315 | logger.info(f"Creating Jira version: {payload}")
316 | result = self.jira.post("/rest/api/3/version", json=payload)
317 | if not isinstance(result, dict):
318 | error_message = f"Unexpected response from Jira API: {result}"
319 | raise ValueError(error_message)
320 | return result
321 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/search.py:
--------------------------------------------------------------------------------
```python
1 | """Module for Jira search operations."""
2 |
3 | import logging
4 |
5 | import requests
6 | from requests.exceptions import HTTPError
7 |
8 | from ..exceptions import MCPAtlassianAuthenticationError
9 | from ..models.jira import JiraSearchResult
10 | from .client import JiraClient
11 | from .constants import DEFAULT_READ_JIRA_FIELDS
12 | from .protocols import IssueOperationsProto
13 |
14 | logger = logging.getLogger("mcp-jira")
15 |
16 |
17 | class SearchMixin(JiraClient, IssueOperationsProto):
18 | """Mixin for Jira search operations."""
19 |
20 | def search_issues(
21 | self,
22 | jql: str,
23 | fields: list[str] | tuple[str, ...] | set[str] | str | None = None,
24 | start: int = 0,
25 | limit: int = 50,
26 | expand: str | None = None,
27 | projects_filter: str | None = None,
28 | ) -> JiraSearchResult:
29 | """
30 | Search for issues using JQL (Jira Query Language).
31 |
32 | Args:
33 | jql: JQL query string
34 | fields: Fields to return (comma-separated string, list, tuple, set, or "*all")
35 | start: Starting index if number of issues is greater than the limit
36 | Note: This parameter is ignored in Cloud environments and results will always
37 | start from the first page.
38 | limit: Maximum issues to return
39 | expand: Optional items to expand (comma-separated)
40 | projects_filter: Optional comma-separated list of project keys to filter by, overrides config
41 |
42 | Returns:
43 | JiraSearchResult object containing issues and metadata (total, start_at, max_results)
44 |
45 | Raises:
46 | MCPAtlassianAuthenticationError: If authentication fails with the Jira API (401/403)
47 | Exception: If there is an error searching for issues
48 | """
49 | try:
50 | # Use projects_filter parameter if provided, otherwise fall back to config
51 | filter_to_use = projects_filter or self.config.projects_filter
52 |
53 | # Apply projects filter if present
54 | if filter_to_use:
55 | # Split projects filter by commas and handle possible whitespace
56 | projects = [p.strip() for p in filter_to_use.split(",")]
57 |
58 | # Build the project filter query part
59 | if len(projects) == 1:
60 | project_query = f'project = "{projects[0]}"'
61 | else:
62 | quoted_projects = [f'"{p}"' for p in projects]
63 | projects_list = ", ".join(quoted_projects)
64 | project_query = f"project IN ({projects_list})"
65 |
66 | # Add the project filter to existing query
67 | if not jql:
68 | # Empty JQL - just use project filter
69 | jql = project_query
70 | elif jql.strip().upper().startswith("ORDER BY"):
71 | # JQL starts with ORDER BY - prepend project filter
72 | jql = f"{project_query} {jql}"
73 | elif "project = " not in jql and "project IN" not in jql:
74 | # Only add if not already filtering by project
75 | jql = f"({jql}) AND {project_query}"
76 |
77 | logger.info(f"Applied projects filter to query: {jql}")
78 |
79 | # Convert fields to proper format if it's a list/tuple/set
80 | fields_param: str | None
81 | if fields is None: # Use default if None
82 | fields_param = ",".join(DEFAULT_READ_JIRA_FIELDS)
83 | elif isinstance(fields, list | tuple | set):
84 | fields_param = ",".join(fields)
85 | else:
86 | fields_param = fields
87 |
88 | if self.config.is_cloud:
89 | actual_total = -1
90 | try:
91 | # Call 1: Get metadata (including total) using standard search API
92 | metadata_params = {"jql": jql, "maxResults": 0}
93 | metadata_response = self.jira.get(
94 | self.jira.resource_url("search"), params=metadata_params
95 | )
96 |
97 | if (
98 | isinstance(metadata_response, dict)
99 | and "total" in metadata_response
100 | ):
101 | try:
102 | actual_total = int(metadata_response["total"])
103 | except (ValueError, TypeError):
104 | logger.warning(
105 | f"Could not parse 'total' from metadata response for JQL: {jql}. Received: {metadata_response.get('total')}"
106 | )
107 | else:
108 | logger.warning(
109 | f"Could not retrieve total count from metadata response for JQL: {jql}. Response type: {type(metadata_response)}"
110 | )
111 | except Exception as meta_err:
112 | logger.error(
113 | f"Error fetching metadata for JQL '{jql}': {str(meta_err)}"
114 | )
115 |
116 | # Call 2: Get the actual issues using the enhanced method
117 | issues_response_list = self.jira.enhanced_jql_get_list_of_tickets(
118 | jql, fields=fields_param, limit=limit, expand=expand
119 | )
120 |
121 | if not isinstance(issues_response_list, list):
122 | msg = f"Unexpected return value type from `jira.enhanced_jql_get_list_of_tickets`: {type(issues_response_list)}"
123 | logger.error(msg)
124 | raise TypeError(msg)
125 |
126 | response_dict_for_model = {
127 | "issues": issues_response_list,
128 | "total": actual_total,
129 | }
130 |
131 | search_result = JiraSearchResult.from_api_response(
132 | response_dict_for_model,
133 | base_url=self.config.url,
134 | requested_fields=fields_param,
135 | )
136 |
137 | # Return the full search result object
138 | return search_result
139 | else:
140 | limit = min(limit, 50)
141 | response = self.jira.jql(
142 | jql, fields=fields_param, start=start, limit=limit, expand=expand
143 | )
144 | if not isinstance(response, dict):
145 | msg = f"Unexpected return value type from `jira.jql`: {type(response)}"
146 | logger.error(msg)
147 | raise TypeError(msg)
148 |
149 | # Convert the response to a search result model
150 | search_result = JiraSearchResult.from_api_response(
151 | response, base_url=self.config.url, requested_fields=fields_param
152 | )
153 |
154 | # Return the full search result object
155 | return search_result
156 |
157 | except HTTPError as http_err:
158 | if http_err.response is not None and http_err.response.status_code in [
159 | 401,
160 | 403,
161 | ]:
162 | error_msg = (
163 | f"Authentication failed for Jira API ({http_err.response.status_code}). "
164 | "Token may be expired or invalid. Please verify credentials."
165 | )
166 | logger.error(error_msg)
167 | raise MCPAtlassianAuthenticationError(error_msg) from http_err
168 | else:
169 | logger.error(f"HTTP error during API call: {http_err}", exc_info=False)
170 | raise http_err
171 | except Exception as e:
172 | logger.error(f"Error searching issues with JQL '{jql}': {str(e)}")
173 | raise Exception(f"Error searching issues: {str(e)}") from e
174 |
175 | def get_board_issues(
176 | self,
177 | board_id: str,
178 | jql: str,
179 | fields: str | None = None,
180 | start: int = 0,
181 | limit: int = 50,
182 | expand: str | None = None,
183 | ) -> JiraSearchResult:
184 | """
185 | Get all issues linked to a specific board.
186 |
187 | Args:
188 | board_id: The ID of the board
189 | jql: JQL query string
190 | fields: Fields to return (comma-separated string or "*all")
191 | start: Starting index
192 | limit: Maximum issues to return
193 | expand: Optional items to expand (comma-separated)
194 |
195 | Returns:
196 | JiraSearchResult object containing board issues and metadata
197 |
198 | Raises:
199 | Exception: If there is an error getting board issues
200 | """
201 | try:
202 | # Determine fields_param
203 | fields_param = fields
204 | if fields_param is None:
205 | fields_param = ",".join(DEFAULT_READ_JIRA_FIELDS)
206 |
207 | response = self.jira.get_issues_for_board(
208 | board_id=board_id,
209 | jql=jql,
210 | fields=fields_param,
211 | start=start,
212 | limit=limit,
213 | expand=expand,
214 | )
215 | if not isinstance(response, dict):
216 | msg = f"Unexpected return value type from `jira.get_issues_for_board`: {type(response)}"
217 | logger.error(msg)
218 | raise TypeError(msg)
219 |
220 | # Convert the response to a search result model
221 | search_result = JiraSearchResult.from_api_response(
222 | response, base_url=self.config.url, requested_fields=fields_param
223 | )
224 | return search_result
225 | except requests.HTTPError as e:
226 | logger.error(
227 | f"Error searching issues for board with JQL '{board_id}': {str(e.response.content)}"
228 | )
229 | raise Exception(
230 | f"Error searching issues for board with JQL: {str(e.response.content)}"
231 | ) from e
232 | except Exception as e:
233 | logger.error(f"Error searching issues for board with JQL '{jql}': {str(e)}")
234 | raise Exception(
235 | f"Error searching issues for board with JQL {str(e)}"
236 | ) from e
237 |
238 | def get_sprint_issues(
239 | self,
240 | sprint_id: str,
241 | fields: str | None = None,
242 | start: int = 0,
243 | limit: int = 50,
244 | ) -> JiraSearchResult:
245 | """
246 | Get all issues linked to a specific sprint.
247 |
248 | Args:
249 | sprint_id: The ID of the sprint
250 | fields: Fields to return (comma-separated string or "*all")
251 | start: Starting index
252 | limit: Maximum issues to return
253 |
254 | Returns:
255 | JiraSearchResult object containing sprint issues and metadata
256 |
257 | Raises:
258 | Exception: If there is an error getting board issues
259 | """
260 | try:
261 | # Determine fields_param
262 | fields_param = fields
263 | if fields_param is None:
264 | fields_param = ",".join(DEFAULT_READ_JIRA_FIELDS)
265 |
266 | response = self.jira.get_sprint_issues(
267 | sprint_id=sprint_id,
268 | start=start,
269 | limit=limit,
270 | )
271 | if not isinstance(response, dict):
272 | msg = f"Unexpected return value type from `jira.get_sprint_issues`: {type(response)}"
273 | logger.error(msg)
274 | raise TypeError(msg)
275 |
276 | # Convert the response to a search result model
277 | search_result = JiraSearchResult.from_api_response(
278 | response, base_url=self.config.url, requested_fields=fields_param
279 | )
280 | return search_result
281 | except requests.HTTPError as e:
282 | logger.error(
283 | f"Error searching issues for sprint '{sprint_id}': {str(e.response.content)}"
284 | )
285 | raise Exception(
286 | f"Error searching issues for sprint: {str(e.response.content)}"
287 | ) from e
288 | except Exception as e:
289 | logger.error(f"Error searching issues for sprint: {sprint_id}': {str(e)}")
290 | raise Exception(f"Error searching issues for sprint: {str(e)}") from e
291 |
```
--------------------------------------------------------------------------------
/tests/integration/test_proxy.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Integration tests for proxy handling in Jira and Confluence clients (mocked requests).
3 | """
4 |
5 | import os
6 | from unittest.mock import MagicMock, patch
7 |
8 | import pytest
9 | from requests.exceptions import ProxyError
10 |
11 | from mcp_atlassian.confluence.client import ConfluenceClient
12 | from mcp_atlassian.confluence.config import ConfluenceConfig
13 | from mcp_atlassian.jira.client import JiraClient
14 | from mcp_atlassian.jira.config import JiraConfig
15 | from tests.utils.base import BaseAuthTest
16 | from tests.utils.mocks import MockEnvironment
17 |
18 |
19 | @pytest.mark.integration
20 | def test_jira_client_passes_proxies_to_requests(monkeypatch):
21 | """Test that JiraClient passes proxies to requests.Session.request."""
22 | mock_jira = MagicMock()
23 | mock_session = MagicMock()
24 | # Create a proper proxies dictionary that can be updated
25 | mock_session.proxies = {}
26 | mock_jira._session = mock_session
27 | monkeypatch.setattr("mcp_atlassian.jira.client.Jira", lambda **kwargs: mock_jira)
28 | monkeypatch.setattr(
29 | "mcp_atlassian.jira.client.configure_ssl_verification", lambda **kwargs: None
30 | )
31 | config = JiraConfig(
32 | url="https://test.atlassian.net",
33 | auth_type="basic",
34 | username="user",
35 | api_token="pat",
36 | http_proxy="http://proxy:8080",
37 | https_proxy="https://proxy:8443",
38 | socks_proxy="socks5://user:pass@proxy:1080",
39 | no_proxy="localhost,127.0.0.1",
40 | )
41 | client = JiraClient(config=config)
42 | # Simulate a request
43 | client.jira._session.request(
44 | "GET", "https://test.atlassian.net/rest/api/2/issue/TEST-1"
45 | )
46 | assert mock_session.proxies["http"] == "http://proxy:8080"
47 | assert mock_session.proxies["https"] == "https://proxy:8443"
48 | assert mock_session.proxies["socks"] == "socks5://user:pass@proxy:1080"
49 |
50 |
51 | @pytest.mark.integration
52 | def test_confluence_client_passes_proxies_to_requests(monkeypatch):
53 | """Test that ConfluenceClient passes proxies to requests.Session.request."""
54 | mock_confluence = MagicMock()
55 | mock_session = MagicMock()
56 | # Create a proper proxies dictionary that can be updated
57 | mock_session.proxies = {}
58 | mock_confluence._session = mock_session
59 | monkeypatch.setattr(
60 | "mcp_atlassian.confluence.client.Confluence", lambda **kwargs: mock_confluence
61 | )
62 | monkeypatch.setattr(
63 | "mcp_atlassian.confluence.client.configure_ssl_verification",
64 | lambda **kwargs: None,
65 | )
66 | monkeypatch.setattr(
67 | "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor",
68 | lambda **kwargs: MagicMock(),
69 | )
70 | config = ConfluenceConfig(
71 | url="https://test.atlassian.net/wiki",
72 | auth_type="basic",
73 | username="user",
74 | api_token="pat",
75 | http_proxy="http://proxy:8080",
76 | https_proxy="https://proxy:8443",
77 | socks_proxy="socks5://user:pass@proxy:1080",
78 | no_proxy="localhost,127.0.0.1",
79 | )
80 | client = ConfluenceClient(config=config)
81 | # Simulate a request
82 | client.confluence._session.request(
83 | "GET", "https://test.atlassian.net/wiki/rest/api/content/123"
84 | )
85 | assert mock_session.proxies["http"] == "http://proxy:8080"
86 | assert mock_session.proxies["https"] == "https://proxy:8443"
87 | assert mock_session.proxies["socks"] == "socks5://user:pass@proxy:1080"
88 |
89 |
90 | @pytest.mark.integration
91 | def test_jira_client_no_proxy_env(monkeypatch):
92 | """Test that JiraClient sets NO_PROXY env var and requests to excluded hosts bypass proxy."""
93 | mock_jira = MagicMock()
94 | mock_session = MagicMock()
95 | mock_jira._session = mock_session
96 | monkeypatch.setattr("mcp_atlassian.jira.client.Jira", lambda **kwargs: mock_jira)
97 | monkeypatch.setattr(
98 | "mcp_atlassian.jira.client.configure_ssl_verification", lambda **kwargs: None
99 | )
100 | monkeypatch.setenv("NO_PROXY", "")
101 | config = JiraConfig(
102 | url="https://test.atlassian.net",
103 | auth_type="basic",
104 | username="user",
105 | api_token="pat",
106 | http_proxy="http://proxy:8080",
107 | no_proxy="localhost,127.0.0.1",
108 | )
109 | client = JiraClient(config=config)
110 | assert os.environ["NO_PROXY"] == "localhost,127.0.0.1"
111 |
112 |
113 | class TestProxyConfigurationEnhanced(BaseAuthTest):
114 | """Enhanced proxy configuration tests using test utilities."""
115 |
116 | @pytest.mark.integration
117 | def test_proxy_configuration_from_environment(self):
118 | """Test proxy configuration loaded from environment variables."""
119 | with MockEnvironment.basic_auth_env() as env_vars:
120 | # Set proxy environment variables in os.environ directly
121 | proxy_vars = {
122 | "HTTP_PROXY": "http://proxy.company.com:8080",
123 | "HTTPS_PROXY": "https://proxy.company.com:8443",
124 | "NO_PROXY": "*.internal.com,localhost",
125 | }
126 |
127 | # Patch environment with proxy settings
128 | with patch.dict(os.environ, proxy_vars):
129 | # Jira should pick up proxy settings
130 | jira_config = JiraConfig.from_env()
131 | assert jira_config.http_proxy == "http://proxy.company.com:8080"
132 | assert jira_config.https_proxy == "https://proxy.company.com:8443"
133 | assert jira_config.no_proxy == "*.internal.com,localhost"
134 |
135 | # Confluence should pick up proxy settings
136 | confluence_config = ConfluenceConfig.from_env()
137 | assert confluence_config.http_proxy == "http://proxy.company.com:8080"
138 | assert confluence_config.https_proxy == "https://proxy.company.com:8443"
139 | assert confluence_config.no_proxy == "*.internal.com,localhost"
140 |
141 | @pytest.mark.integration
142 | def test_proxy_authentication_in_url(self):
143 | """Test proxy URLs with authentication credentials."""
144 | config = JiraConfig(
145 | url="https://test.atlassian.net",
146 | auth_type="basic",
147 | username="user",
148 | api_token="token",
149 | http_proxy="http://proxyuser:[email protected]:8080",
150 | https_proxy="https://proxyuser:[email protected]:8443",
151 | )
152 |
153 | # Verify proxy URLs contain authentication
154 | assert "proxyuser:proxypass" in config.http_proxy
155 | assert "proxyuser:proxypass" in config.https_proxy
156 |
157 | @pytest.mark.integration
158 | def test_socks_proxy_configuration(self, monkeypatch):
159 | """Test SOCKS proxy configuration for both services."""
160 | mock_jira = MagicMock()
161 | mock_session = MagicMock()
162 | # Create a proper proxies dictionary that can be updated
163 | mock_session.proxies = {}
164 | mock_jira._session = mock_session
165 | monkeypatch.setattr(
166 | "mcp_atlassian.jira.client.Jira", lambda **kwargs: mock_jira
167 | )
168 | monkeypatch.setattr(
169 | "mcp_atlassian.jira.client.configure_ssl_verification",
170 | lambda **kwargs: None,
171 | )
172 |
173 | # Test SOCKS5 proxy
174 | config = JiraConfig(
175 | url="https://test.atlassian.net",
176 | auth_type="basic",
177 | username="user",
178 | api_token="token",
179 | socks_proxy="socks5://socksuser:[email protected]:1080",
180 | )
181 |
182 | client = JiraClient(config=config)
183 | assert (
184 | mock_session.proxies["socks"]
185 | == "socks5://socksuser:[email protected]:1080"
186 | )
187 |
188 | @pytest.mark.integration
189 | def test_proxy_bypass_for_internal_domains(self, monkeypatch):
190 | """Test that requests to NO_PROXY domains bypass the proxy."""
191 | # Set up environment
192 | monkeypatch.setenv("NO_PROXY", "*.internal.com,localhost,127.0.0.1")
193 |
194 | config = JiraConfig(
195 | url="https://jira.internal.com", # Internal domain
196 | auth_type="basic",
197 | username="user",
198 | api_token="token",
199 | http_proxy="http://proxy.company.com:8080",
200 | no_proxy="*.internal.com,localhost,127.0.0.1",
201 | )
202 |
203 | # Verify NO_PROXY is set in environment
204 | assert os.environ["NO_PROXY"] == "*.internal.com,localhost,127.0.0.1"
205 | assert "internal.com" in config.no_proxy
206 |
207 | @pytest.mark.integration
208 | def test_proxy_error_handling(self, monkeypatch):
209 | """Test proper error handling when proxy connection fails."""
210 | # Mock to simulate proxy connection failure
211 | mock_jira = MagicMock()
212 | mock_jira.side_effect = ProxyError("Unable to connect to proxy")
213 | monkeypatch.setattr("mcp_atlassian.jira.client.Jira", mock_jira)
214 |
215 | config = JiraConfig(
216 | url="https://test.atlassian.net",
217 | auth_type="basic",
218 | username="user",
219 | api_token="token",
220 | http_proxy="http://unreachable.proxy.com:8080",
221 | )
222 |
223 | # Creating client should raise proxy error
224 | with pytest.raises(ProxyError, match="Unable to connect to proxy"):
225 | JiraClient(config=config)
226 |
227 | @pytest.mark.integration
228 | def test_proxy_configuration_precedence(self):
229 | """Test that explicit proxy config takes precedence over environment."""
230 | with patch.dict(
231 | os.environ,
232 | {
233 | "HTTP_PROXY": "http://env.proxy.com:8080",
234 | "HTTPS_PROXY": "https://env.proxy.com:8443",
235 | },
236 | ):
237 | # Explicit configuration should override environment
238 | config = JiraConfig(
239 | url="https://test.atlassian.net",
240 | auth_type="basic",
241 | username="user",
242 | api_token="token",
243 | http_proxy="http://explicit.proxy.com:8080",
244 | https_proxy="https://explicit.proxy.com:8443",
245 | )
246 |
247 | assert config.http_proxy == "http://explicit.proxy.com:8080"
248 | assert config.https_proxy == "https://explicit.proxy.com:8443"
249 |
250 | @pytest.mark.integration
251 | def test_mixed_proxy_and_ssl_configuration(self, monkeypatch):
252 | """Test proxy configuration works correctly with SSL verification disabled."""
253 | mock_confluence = MagicMock()
254 | mock_session = MagicMock()
255 | # Create a proper proxies dictionary that can be updated
256 | mock_session.proxies = {}
257 | mock_confluence._session = mock_session
258 | monkeypatch.setattr(
259 | "mcp_atlassian.confluence.client.Confluence",
260 | lambda **kwargs: mock_confluence,
261 | )
262 | monkeypatch.setattr(
263 | "mcp_atlassian.confluence.client.configure_ssl_verification",
264 | lambda **kwargs: None,
265 | )
266 | monkeypatch.setattr(
267 | "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor",
268 | lambda **kwargs: MagicMock(),
269 | )
270 |
271 | # Configure with both proxy and SSL disabled
272 | config = ConfluenceConfig(
273 | url="https://test.atlassian.net/wiki",
274 | auth_type="basic",
275 | username="user",
276 | api_token="token",
277 | http_proxy="http://proxy.company.com:8080",
278 | ssl_verify=False,
279 | )
280 |
281 | client = ConfluenceClient(config=config)
282 |
283 | # Both proxy and SSL settings should be applied
284 | assert mock_session.proxies["http"] == "http://proxy.company.com:8080"
285 | assert config.ssl_verify is False
286 |
287 | @pytest.mark.integration
288 | def test_proxy_with_oauth_configuration(self):
289 | """Test proxy configuration works with OAuth authentication."""
290 | with MockEnvironment.oauth_env() as env_vars:
291 | # Add proxy configuration to env_vars directly, then patch os.environ
292 | proxy_vars = {
293 | "HTTP_PROXY": "http://proxy.company.com:8080",
294 | "HTTPS_PROXY": "https://proxy.company.com:8443",
295 | "NO_PROXY": "localhost,127.0.0.1",
296 | }
297 |
298 | # Merge with OAuth env vars
299 | all_vars = {**env_vars, **proxy_vars}
300 |
301 | # Use patch.dict to ensure environment variables are set
302 | with patch.dict(os.environ, all_vars):
303 | # OAuth should still respect proxy settings
304 | assert os.environ.get("HTTP_PROXY") == "http://proxy.company.com:8080"
305 | assert os.environ.get("HTTPS_PROXY") == "https://proxy.company.com:8443"
306 | assert os.environ.get("NO_PROXY") == "localhost,127.0.0.1"
307 |
```
--------------------------------------------------------------------------------
/scripts/oauth_authorize.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python
2 | """
3 | OAuth 2.0 Authorization Flow Helper for MCP Atlassian
4 |
5 | This script helps with the OAuth 2.0 (3LO) authorization flow for Atlassian Cloud:
6 | 1. Opens a browser to the authorization URL
7 | 2. Starts a local server to receive the callback with the authorization code
8 | 3. Exchanges the authorization code for access and refresh tokens
9 | 4. Saves the tokens for later use by MCP Atlassian
10 |
11 | Usage:
12 | python oauth_authorize.py --client-id YOUR_CLIENT_ID --client-secret YOUR_CLIENT_SECRET
13 | --redirect-uri http://localhost:8080/callback
14 | --scope "read:jira-work write:jira-work read:confluence-space.summary offline_access"
15 |
16 | IMPORTANT: The 'offline_access' scope is required for refresh tokens to work properly.
17 | Without this scope, tokens will expire quickly and authentication will fail.
18 |
19 | Environment variables can also be used:
20 | - ATLASSIAN_OAUTH_CLIENT_ID
21 | - ATLASSIAN_OAUTH_CLIENT_SECRET
22 | - ATLASSIAN_OAUTH_REDIRECT_URI
23 | - ATLASSIAN_OAUTH_SCOPE
24 | """
25 |
26 | import argparse
27 | import http.server
28 | import logging
29 | import os
30 | import secrets
31 | import socketserver
32 | import sys
33 | import threading
34 | import time
35 | import urllib.parse
36 | import webbrowser
37 |
38 | # Add the parent directory to the path so we can import the package
39 | sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
40 |
41 | from src.mcp_atlassian.utils.oauth import OAuthConfig
42 |
43 | # Configure logging (basicConfig should be called only once, ideally at the very start)
44 | # Adding lineno for better debugging.
45 | logging.basicConfig(
46 | level=logging.DEBUG,
47 | format="%(asctime)s - %(levelname)s - %(name)s - %(lineno)d - %(message)s",
48 | force=True,
49 | )
50 |
51 |
52 | logger = logging.getLogger("oauth-authorize")
53 | logger.setLevel(logging.DEBUG)
54 | logging.getLogger("mcp-atlassian.oauth").setLevel(logging.DEBUG)
55 |
56 | # Global variables for callback handling
57 | authorization_code = None
58 | received_state = None
59 | callback_received = False
60 | callback_error = None
61 |
62 |
63 | class CallbackHandler(http.server.BaseHTTPRequestHandler):
64 | """HTTP request handler for OAuth callback."""
65 |
66 | def do_GET(self) -> None: # noqa: N802
67 | """Handle GET requests (OAuth callback and favicon)."""
68 | global authorization_code, callback_received, callback_error, received_state
69 |
70 | parsed_path = urllib.parse.urlparse(self.path)
71 | logger.debug(f"CallbackHandler received GET request for: {self.path}")
72 |
73 | # Ignore favicon requests politely
74 | if parsed_path.path == "/favicon.ico":
75 | self.send_error(404, "File not found")
76 | logger.debug("CallbackHandler: Ignored /favicon.ico request.")
77 | return
78 |
79 | # Process only /callback path
80 | if parsed_path.path != "/callback":
81 | self.send_error(404, "Not Found: Only /callback is supported.")
82 | logger.warning(
83 | f"CallbackHandler: Received request for unexpected path: {parsed_path.path}"
84 | )
85 | return
86 |
87 | # Parse the query parameters from the URL
88 | query = parsed_path.query
89 | params = urllib.parse.parse_qs(query)
90 |
91 | if "error" in params:
92 | callback_error = params["error"][0]
93 | callback_received = True
94 | logger.error(f"Authorization error from callback: {callback_error}")
95 | self._send_response(f"Authorization failed: {callback_error}")
96 | return
97 |
98 | if "code" in params:
99 | authorization_code = params["code"][0]
100 | if "state" in params:
101 | received_state = params["state"][0]
102 | callback_received = True
103 | logger.info(
104 | "Authorization code and state received successfully via callback."
105 | )
106 | self._send_response(
107 | "Authorization successful! You can close this window now."
108 | )
109 | else:
110 | logger.error("Invalid callback: 'code' or 'error' parameter missing.")
111 | self._send_response(
112 | "Invalid callback: Authorization code missing", status=400
113 | )
114 |
115 | def _send_response(self, message: str, status: int = 200) -> None:
116 | """Send response to the browser."""
117 | self.send_response(status)
118 | self.send_header("Content-type", "text/html")
119 | self.end_headers()
120 |
121 | html = f"""
122 | <!DOCTYPE html>
123 | <html>
124 | <head>
125 | <title>Atlassian OAuth Authorization</title>
126 | <style>
127 | body {{
128 | font-family: Arial, sans-serif;
129 | text-align: center;
130 | padding: 40px;
131 | max-width: 600px;
132 | margin: 0 auto;
133 | }}
134 | .message {{
135 | padding: 20px;
136 | border-radius: 5px;
137 | margin-bottom: 20px;
138 | }}
139 | .success {{
140 | background-color: #d4edda;
141 | color: #155724;
142 | border: 1px solid #c3e6cb;
143 | }}
144 | .error {{
145 | background-color: #f8d7da;
146 | color: #721c24;
147 | border: 1px solid #f5c6cb;
148 | }}
149 | </style>
150 | </head>
151 | <body>
152 | <h1>Atlassian OAuth Authorization</h1>
153 | <div class="message {"success" if status == 200 else "error"}">
154 | <p>{message}</p>
155 | </div>
156 | <p>This window will automatically close in 5 seconds...</p>
157 | <script>
158 | setTimeout(function() {{
159 | window.close();
160 | }}, 5000);
161 | </script>
162 | </body>
163 | </html>
164 | """
165 | self.wfile.write(html.encode())
166 |
167 | # Make the server quiet
168 | def log_message(self, format: str, *args: str) -> None:
169 | return
170 |
171 |
172 | def start_callback_server(port: int) -> socketserver.TCPServer:
173 | """Start a local server to receive the OAuth callback."""
174 | handler = CallbackHandler
175 | httpd = socketserver.TCPServer(("", port), handler)
176 | server_thread = threading.Thread(target=httpd.serve_forever)
177 | server_thread.daemon = True
178 | server_thread.start()
179 | return httpd
180 |
181 |
182 | def wait_for_callback(timeout: int = 300) -> bool:
183 | """Wait for the callback to be received."""
184 | start_time = time.time()
185 | while not callback_received and (time.time() - start_time) < timeout:
186 | time.sleep(1)
187 |
188 | if not callback_received:
189 | logger.error(
190 | f"Timed out waiting for authorization callback after {timeout} seconds"
191 | )
192 | return False
193 |
194 | if callback_error:
195 | logger.error(f"Authorization error: {callback_error}")
196 | return False
197 |
198 | return True
199 |
200 |
201 | def parse_redirect_uri(redirect_uri: str) -> tuple[str, int]:
202 | """Parse the redirect URI to extract host and port."""
203 | parsed = urllib.parse.urlparse(redirect_uri)
204 | port = parsed.port or (443 if parsed.scheme == "https" else 80)
205 | return parsed.hostname, port
206 |
207 |
208 | def run_oauth_flow(args: argparse.Namespace) -> bool:
209 | """Run the OAuth 2.0 authorization flow."""
210 | # Create OAuth configuration
211 | oauth_config = OAuthConfig(
212 | client_id=args.client_id,
213 | client_secret=args.client_secret,
214 | redirect_uri=args.redirect_uri,
215 | scope=args.scope,
216 | )
217 |
218 | # Generate a random state for CSRF protection
219 | state = secrets.token_urlsafe(16)
220 |
221 | # Start local callback server if using localhost
222 | hostname, port = parse_redirect_uri(args.redirect_uri)
223 | httpd = None
224 |
225 | if hostname and hostname.lower() in ["localhost", "127.0.0.1"]:
226 | logger.info(f"Attempting to start local callback server on {hostname}:{port}")
227 | try:
228 | httpd = start_callback_server(port)
229 | except OSError as e:
230 | logger.error(f"Failed to start callback server: {e}")
231 | logger.error(f"Make sure port {port} is available and not in use")
232 | return False
233 |
234 | # Get the authorization URL
235 | auth_url = oauth_config.get_authorization_url(state=state)
236 |
237 | # Open the browser for authorization
238 | logger.info(f"Opening browser for authorization at {auth_url}")
239 | webbrowser.open(auth_url)
240 | logger.info(
241 | "If the browser doesn't open automatically, please visit this URL manually."
242 | )
243 |
244 | # Wait for the callback
245 | if not wait_for_callback():
246 | if httpd:
247 | httpd.shutdown()
248 | return False
249 |
250 | # Verify state to prevent CSRF attacks
251 | if received_state != state:
252 | logger.error(
253 | f"State mismatch! Possible CSRF attack. Expected: {state}, Received: {received_state}"
254 | )
255 | if httpd:
256 | httpd.shutdown()
257 | return False
258 | logger.info("CSRF state verified successfully.")
259 |
260 | # Exchange the code for tokens
261 | logger.info("Exchanging authorization code for tokens...")
262 | if not authorization_code:
263 | logger.error("Authorization code is missing, cannot exchange for tokens.")
264 | if httpd:
265 | httpd.shutdown()
266 | return False
267 |
268 | if oauth_config.exchange_code_for_tokens(authorization_code):
269 | logger.info("🎉 OAuth authorization flow completed successfully!")
270 |
271 | if oauth_config.cloud_id:
272 | logger.info(f"Retrieved Cloud ID: {oauth_config.cloud_id}")
273 | logger.info(
274 | "\n💡 Tip: Add/update the following in your .env file or environment variables:"
275 | )
276 | logger.info(f"ATLASSIAN_OAUTH_CLIENT_ID={oauth_config.client_id}")
277 | logger.info(f"ATLASSIAN_OAUTH_CLIENT_SECRET={oauth_config.client_secret}")
278 | logger.info(f"ATLASSIAN_OAUTH_REDIRECT_URI={oauth_config.redirect_uri}")
279 | logger.info(f"ATLASSIAN_OAUTH_SCOPE={oauth_config.scope}")
280 | logger.info(f"ATLASSIAN_OAUTH_CLOUD_ID={oauth_config.cloud_id}")
281 | else:
282 | logger.warning(
283 | "Cloud ID could not be obtained. Some API calls might require it."
284 | )
285 |
286 | if httpd:
287 | httpd.shutdown()
288 | return True
289 | else:
290 | logger.error("Failed to exchange authorization code for tokens")
291 | if httpd:
292 | httpd.shutdown()
293 | return False
294 |
295 |
296 | def main() -> int:
297 | """Main entry point."""
298 | parser = argparse.ArgumentParser(
299 | description="OAuth 2.0 Authorization Flow Helper for MCP Atlassian"
300 | )
301 | parser.add_argument("--client-id", help="OAuth Client ID")
302 | parser.add_argument("--client-secret", help="OAuth Client Secret")
303 | parser.add_argument(
304 | "--redirect-uri",
305 | help="OAuth Redirect URI (e.g., http://localhost:8080/callback)",
306 | )
307 | parser.add_argument("--scope", help="OAuth Scope (space-separated)")
308 |
309 | args = parser.parse_args()
310 |
311 | # Check for environment variables if arguments are not provided
312 | if not args.client_id:
313 | args.client_id = os.getenv("ATLASSIAN_OAUTH_CLIENT_ID")
314 | if not args.client_secret:
315 | args.client_secret = os.getenv("ATLASSIAN_OAUTH_CLIENT_SECRET")
316 | if not args.redirect_uri:
317 | args.redirect_uri = os.getenv("ATLASSIAN_OAUTH_REDIRECT_URI")
318 | if not args.scope:
319 | args.scope = os.getenv("ATLASSIAN_OAUTH_SCOPE")
320 |
321 | # Validate required arguments
322 | missing = []
323 | if not args.client_id:
324 | missing.append("client-id")
325 | if not args.client_secret:
326 | missing.append("client-secret")
327 | if not args.redirect_uri:
328 | missing.append("redirect-uri")
329 | if not args.scope:
330 | missing.append("scope")
331 |
332 | if missing:
333 | logger.error(f"Missing required arguments: {', '.join(missing)}")
334 | parser.print_help()
335 | return 1
336 |
337 | # Check for offline_access scope
338 | if args.scope and "offline_access" not in args.scope.split():
339 | logger.warning("\n⚠️ WARNING: The 'offline_access' scope is missing!")
340 | logger.warning(
341 | "Without this scope, refresh tokens will not be issued and authentication will fail when tokens expire."
342 | )
343 | logger.warning("Consider adding 'offline_access' to your scope string.")
344 | proceed = input("Do you want to proceed anyway? (y/n): ")
345 | if proceed.lower() != "y":
346 | return 1
347 |
348 | success = run_oauth_flow(args)
349 | return 0 if success else 1
350 |
351 |
352 | if __name__ == "__main__":
353 | sys.exit(main())
354 |
```