This is page 3 of 13. To view the full context, open http://codebase.md/sooperset/mcp-atlassian?lines=true&page={x}, replacing {x} with the desired page number (1-13).
# Directory Structure
```
├── .devcontainer
│ ├── devcontainer.json
│ ├── Dockerfile
│ ├── post-create.sh
│ └── post-start.sh
├── .dockerignore
├── .env.example
├── .github
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ └── feature_request.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── docker-publish.yml
│ ├── lint.yml
│ ├── publish.yml
│ ├── stale.yml
│ └── tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── AGENTS.md
├── CONTRIBUTING.md
├── Dockerfile
├── LICENSE
├── pyproject.toml
├── README.md
├── scripts
│ ├── oauth_authorize.py
│ └── test_with_real_data.sh
├── SECURITY.md
├── smithery.yaml
├── src
│ └── mcp_atlassian
│ ├── __init__.py
│ ├── confluence
│ │ ├── __init__.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── labels.py
│ │ ├── pages.py
│ │ ├── search.py
│ │ ├── spaces.py
│ │ ├── users.py
│ │ ├── utils.py
│ │ └── v2_adapter.py
│ ├── exceptions.py
│ ├── jira
│ │ ├── __init__.py
│ │ ├── attachments.py
│ │ ├── boards.py
│ │ ├── client.py
│ │ ├── comments.py
│ │ ├── config.py
│ │ ├── constants.py
│ │ ├── epics.py
│ │ ├── fields.py
│ │ ├── formatting.py
│ │ ├── issues.py
│ │ ├── links.py
│ │ ├── projects.py
│ │ ├── protocols.py
│ │ ├── search.py
│ │ ├── sprints.py
│ │ ├── transitions.py
│ │ ├── users.py
│ │ └── worklog.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── comment.py
│ │ │ ├── common.py
│ │ │ ├── label.py
│ │ │ ├── page.py
│ │ │ ├── search.py
│ │ │ ├── space.py
│ │ │ └── user_search.py
│ │ ├── constants.py
│ │ └── jira
│ │ ├── __init__.py
│ │ ├── agile.py
│ │ ├── comment.py
│ │ ├── common.py
│ │ ├── issue.py
│ │ ├── link.py
│ │ ├── project.py
│ │ ├── search.py
│ │ ├── version.py
│ │ ├── workflow.py
│ │ └── worklog.py
│ ├── preprocessing
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── confluence.py
│ │ └── jira.py
│ ├── servers
│ │ ├── __init__.py
│ │ ├── confluence.py
│ │ ├── context.py
│ │ ├── dependencies.py
│ │ ├── jira.py
│ │ └── main.py
│ └── utils
│ ├── __init__.py
│ ├── date.py
│ ├── decorators.py
│ ├── env.py
│ ├── environment.py
│ ├── io.py
│ ├── lifecycle.py
│ ├── logging.py
│ ├── oauth_setup.py
│ ├── oauth.py
│ ├── ssl.py
│ ├── tools.py
│ └── urls.py
├── tests
│ ├── __init__.py
│ ├── conftest.py
│ ├── fixtures
│ │ ├── __init__.py
│ │ ├── confluence_mocks.py
│ │ └── jira_mocks.py
│ ├── integration
│ │ ├── conftest.py
│ │ ├── README.md
│ │ ├── test_authentication.py
│ │ ├── test_content_processing.py
│ │ ├── test_cross_service.py
│ │ ├── test_mcp_protocol.py
│ │ ├── test_proxy.py
│ │ ├── test_real_api.py
│ │ ├── test_ssl_verification.py
│ │ ├── test_stdin_monitoring_fix.py
│ │ └── test_transport_lifecycle.py
│ ├── README.md
│ ├── test_preprocessing.py
│ ├── test_real_api_validation.py
│ ├── unit
│ │ ├── confluence
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_labels.py
│ │ │ ├── test_pages.py
│ │ │ ├── test_search.py
│ │ │ ├── test_spaces.py
│ │ │ ├── test_users.py
│ │ │ ├── test_utils.py
│ │ │ └── test_v2_adapter.py
│ │ ├── jira
│ │ │ ├── conftest.py
│ │ │ ├── test_attachments.py
│ │ │ ├── test_boards.py
│ │ │ ├── test_client_oauth.py
│ │ │ ├── test_client.py
│ │ │ ├── test_comments.py
│ │ │ ├── test_config.py
│ │ │ ├── test_constants.py
│ │ │ ├── test_custom_headers.py
│ │ │ ├── test_epics.py
│ │ │ ├── test_fields.py
│ │ │ ├── test_formatting.py
│ │ │ ├── test_issues_markdown.py
│ │ │ ├── test_issues.py
│ │ │ ├── test_links.py
│ │ │ ├── test_projects.py
│ │ │ ├── test_protocols.py
│ │ │ ├── test_search.py
│ │ │ ├── test_sprints.py
│ │ │ ├── test_transitions.py
│ │ │ ├── test_users.py
│ │ │ └── test_worklog.py
│ │ ├── models
│ │ │ ├── __init__.py
│ │ │ ├── conftest.py
│ │ │ ├── test_base_models.py
│ │ │ ├── test_confluence_models.py
│ │ │ ├── test_constants.py
│ │ │ └── test_jira_models.py
│ │ ├── servers
│ │ │ ├── __init__.py
│ │ │ ├── test_confluence_server.py
│ │ │ ├── test_context.py
│ │ │ ├── test_dependencies.py
│ │ │ ├── test_jira_server.py
│ │ │ └── test_main_server.py
│ │ ├── test_exceptions.py
│ │ ├── test_main_transport_selection.py
│ │ └── utils
│ │ ├── __init__.py
│ │ ├── test_custom_headers.py
│ │ ├── test_date.py
│ │ ├── test_decorators.py
│ │ ├── test_env.py
│ │ ├── test_environment.py
│ │ ├── test_io.py
│ │ ├── test_lifecycle.py
│ │ ├── test_logging.py
│ │ ├── test_masking.py
│ │ ├── test_oauth_setup.py
│ │ ├── test_oauth.py
│ │ ├── test_ssl.py
│ │ ├── test_tools.py
│ │ └── test_urls.py
│ └── utils
│ ├── __init__.py
│ ├── assertions.py
│ ├── base.py
│ ├── factories.py
│ └── mocks.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/comments.py:
--------------------------------------------------------------------------------
```python
1 | """Module for Confluence comment operations."""
2 |
3 | import logging
4 |
5 | import requests
6 |
7 | from ..models.confluence import ConfluenceComment
8 | from .client import ConfluenceClient
9 |
10 | logger = logging.getLogger("mcp-atlassian")
11 |
12 |
class CommentsMixin(ConfluenceClient):
    """Mixin for Confluence comment operations."""

    @staticmethod
    def _with_body_view_value(data: dict, value: str) -> dict:
        """Return a shallow copy of ``data`` with ``body.view.value`` set.

        The nested ``body``/``view`` dictionaries are created when missing so
        partial API responses do not raise. Note the copy is shallow, so any
        pre-existing nested dictionaries are shared with ``data``.

        Args:
            data: The raw comment/response dictionary from the API
            value: The processed content to store under body.view.value

        Returns:
            A copy of ``data`` with the processed content in place
        """
        modified = data.copy()
        if "body" not in modified:
            modified["body"] = {}
        if "view" not in modified["body"]:
            modified["body"]["view"] = {}
        modified["body"]["view"]["value"] = value
        return modified

    def get_page_comments(
        self, page_id: str, *, return_markdown: bool = True
    ) -> list[ConfluenceComment]:
        """
        Get all comments for a specific page.

        Args:
            page_id: The ID of the page to get comments from
            return_markdown: When True, returns content in markdown format,
                otherwise returns raw HTML (keyword-only)

        Returns:
            List of ConfluenceComment models containing comment content and
            metadata; an empty list on any error
        """
        try:
            # Get page info to extract the space key used by the preprocessor.
            page = self.confluence.get_page_by_id(page_id=page_id, expand="space")
            space_key = page.get("space", {}).get("key", "")

            # Get comments with their rendered bodies and version metadata.
            comments_response = self.confluence.get_page_comments(
                content_id=page_id, expand="body.view.value,version", depth="all"
            )

            comment_models = []
            for comment_data in comments_response.get("results", []):
                body = comment_data["body"]["view"]["value"]
                processed_html, processed_markdown = (
                    self.preprocessor.process_html_content(
                        body, space_key=space_key, confluence_client=self.confluence
                    )
                )

                # Swap in the processed content in the requested format.
                modified_comment_data = self._with_body_view_value(
                    comment_data,
                    processed_markdown if return_markdown else processed_html,
                )

                comment_models.append(
                    ConfluenceComment.from_api_response(
                        modified_comment_data,
                        base_url=self.config.url,
                    )
                )

            return comment_models

        except KeyError as e:
            logger.error(f"Missing key in comment data: {str(e)}")
            return []
        except requests.RequestException as e:
            logger.error(f"Network error when fetching comments: {str(e)}")
            return []
        except (ValueError, TypeError) as e:
            logger.error(f"Error processing comment data: {str(e)}")
            return []
        except Exception as e:  # noqa: BLE001 - Intentional fallback with full logging
            logger.error(f"Unexpected error fetching comments: {str(e)}")
            logger.debug("Full exception details for comments:", exc_info=True)
            return []

    def add_comment(self, page_id: str, content: str) -> ConfluenceComment | None:
        """
        Add a comment to a Confluence page.

        Args:
            page_id: The ID of the page to add the comment to
            content: The content of the comment (Confluence storage format, or
                markdown which is converted automatically)

        Returns:
            ConfluenceComment object if comment was added successfully, None otherwise
        """
        try:
            # Get page info to extract the space key used by the preprocessor.
            page = self.confluence.get_page_by_id(page_id=page_id, expand="space")
            space_key = page.get("space", {}).get("key", "")

            # The atlassian-python-api expects Confluence storage format; if
            # the content does not look like HTML/XML, treat it as markdown.
            if not content.strip().startswith("<"):
                content = self.preprocessor.markdown_to_confluence_storage(content)

            # Add the comment via the Confluence API.
            response = self.confluence.add_comment(page_id, content)

            if not response:
                logger.error("Failed to add comment: empty response")
                return None

            # Process the returned body so the model carries markdown content,
            # consistent with get_page_comments' default.
            processed_html, processed_markdown = self.preprocessor.process_html_content(
                response.get("body", {}).get("view", {}).get("value", ""),
                space_key=space_key,
                confluence_client=self.confluence,
            )

            modified_response = self._with_body_view_value(
                response, processed_markdown
            )

            return ConfluenceComment.from_api_response(
                modified_response,
                base_url=self.config.url,
            )

        except requests.RequestException as e:
            logger.error(f"Network error when adding comment: {str(e)}")
            return None
        except (ValueError, TypeError, KeyError) as e:
            logger.error(f"Error processing comment data: {str(e)}")
            return None
        except Exception as e:  # noqa: BLE001 - Intentional fallback with full logging
            logger.error(f"Unexpected error adding comment: {str(e)}")
            logger.debug("Full exception details for adding comment:", exc_info=True)
            return None
150 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/protocols.py:
--------------------------------------------------------------------------------
```python
1 | """Module for Jira protocol definitions."""
2 |
3 | from abc import abstractmethod
4 | from typing import Any, Protocol, runtime_checkable
5 |
6 | from ..models.jira import JiraIssue
7 | from ..models.jira.search import JiraSearchResult
8 |
9 |
class AttachmentsOperationsProto(Protocol):
    """Protocol defining attachments operations interface.

    Structural interface only; concrete behavior is supplied by the
    implementing mixin.
    """

    @abstractmethod
    def upload_attachments(
        self, issue_key: str, file_paths: list[str]
    ) -> dict[str, Any]:
        """
        Upload multiple attachments to a Jira issue.

        Args:
            issue_key: The Jira issue key (e.g., 'PROJ-123')
            file_paths: List of paths to files to upload

        Returns:
            A dictionary with upload results (exact shape is defined by the
            implementing mixin)
        """
27 |
28 |
class IssueOperationsProto(Protocol):
    """Protocol defining issue operations interface."""

    @abstractmethod
    def get_issue(
        self,
        issue_key: str,
        expand: str | None = None,
        comment_limit: int | str | None = 10,
        fields: str
        | list[str]
        | tuple[str, ...]
        | set[str]
        | None = "summary,description,status,assignee,reporter,labels,priority,created,updated,issuetype",
        properties: str | list[str] | None = None,
        update_history: bool = True,
    ) -> JiraIssue:
        """Get a Jira issue by key.

        Args:
            issue_key: The Jira issue key (e.g., 'PROJ-123')
            expand: Optional expansion string passed through to the API
            comment_limit: Comment limit (int, numeric string, or None)
            fields: Fields to return (comma-separated string, a collection of
                field names, or None); defaults to a common summary field set
            properties: Issue properties to return (string, list, or None)
            update_history: Whether to update history (semantics defined by
                the implementing mixin)

        Returns:
            JiraIssue model for the requested issue
        """
47 |
48 |
class SearchOperationsProto(Protocol):
    """Protocol defining search operations interface."""

    @abstractmethod
    def search_issues(
        self,
        jql: str,
        fields: str
        | list[str]
        | tuple[str, ...]
        | set[str]
        | None = "summary,description,status,assignee,reporter,labels,priority,created,updated,issuetype",
        start: int = 0,
        limit: int = 50,
        expand: str | None = None,
        projects_filter: str | None = None,
    ) -> JiraSearchResult:
        """Search for issues using JQL.

        Args:
            jql: JQL query string
            fields: Fields to return (comma-separated string, a collection of
                field names, or None); defaults to a common summary field set
            start: Pagination start index
            limit: Maximum number of issues to return
            expand: Optional expansion string passed through to the API
            projects_filter: Optional project filter (semantics defined by the
                implementing mixin)

        Returns:
            JiraSearchResult containing the matching issues
        """
67 |
68 |
class EpicOperationsProto(Protocol):
    """Protocol defining epic operations interface."""

    @abstractmethod
    def update_epic_fields(self, issue_key: str, kwargs: dict[str, Any]) -> JiraIssue:
        """
        Update Epic-specific fields after Epic creation.

        This method implements the second step of the two-step Epic creation process,
        applying Epic-specific fields that may be rejected during initial creation
        due to screen configuration restrictions.

        Args:
            issue_key: The key of the created Epic
            kwargs: Dictionary containing special keys with Epic field information

        Returns:
            JiraIssue: The updated Epic

        Raises:
            Exception: If there is an error updating the Epic fields
        """

    @abstractmethod
    def prepare_epic_fields(
        self,
        fields: dict[str, Any],
        summary: str,
        kwargs: dict[str, Any],
        # Fixed annotation: the default is None, so the type must be optional.
        project_key: str | None = None,
    ) -> None:
        """
        Prepare epic-specific fields for issue creation.

        Args:
            fields: The fields dictionary to update
            summary: The issue summary that can be used as a default epic name
            kwargs: Additional fields from the user
            project_key: Optional project key (usage defined by the
                implementing mixin)
        """

    @abstractmethod
    def _try_discover_fields_from_existing_epic(
        self, field_ids: dict[str, str]
    ) -> None:
        """
        Try to discover Epic fields from existing epics in the Jira instance.

        This is a fallback method used when standard field discovery doesn't find
        all the necessary Epic-related fields. It searches for an existing Epic and
        analyzes its field structure to identify Epic fields dynamically.

        Args:
            field_ids: Dictionary of field IDs to update with discovered fields
        """
123 |
124 |
class FieldsOperationsProto(Protocol):
    """Protocol defining fields operations interface."""

    @abstractmethod
    def _generate_field_map(self, force_regenerate: bool = False) -> dict[str, str]:
        """
        Generates and caches a map of lowercase field names to field IDs.

        Args:
            force_regenerate: If True, forces regeneration even if cache exists.

        Returns:
            A dictionary mapping lowercase field names and field IDs to actual field IDs.
        """

    @abstractmethod
    def get_field_by_id(
        self, field_id: str, refresh: bool = False
    ) -> dict[str, Any] | None:
        """
        Get field definition by ID.

        Args:
            field_id: The field ID to look up.
            refresh: If True, refresh before lookup (presumably re-fetches
                cached field data -- confirm in the implementing mixin).

        Returns:
            The field definition dictionary, or None.
        """

    @abstractmethod
    def get_field_ids_to_epic(self) -> dict[str, str]:
        """
        Dynamically discover Jira field IDs relevant to Epic linking.
        This method queries the Jira API to find the correct custom field IDs
        for Epic-related fields, which can vary between different Jira instances.

        Returns:
            Dictionary mapping field names to their IDs
            (e.g., {'epic_link': 'customfield_10014', 'epic_name': 'customfield_10011'})
        """

    @abstractmethod
    def get_required_fields(self, issue_type: str, project_key: str) -> dict[str, Any]:
        """
        Get required fields for creating an issue of a specific type in a project.

        Args:
            issue_type: The issue type (e.g., 'Bug', 'Story', 'Epic')
            project_key: The project key (e.g., 'PROJ')

        Returns:
            Dictionary mapping required field names to their definitions
        """
172 |
173 |
@runtime_checkable
class ProjectsOperationsProto(Protocol):
    """Protocol defining project operations interface.

    Decorated with ``runtime_checkable`` so implementations can be checked
    with ``isinstance`` (structural, method-presence check only).
    """

    @abstractmethod
    def get_project_issue_types(self, project_key: str) -> list[dict[str, Any]]:
        """
        Get all issue types available for a project.

        Args:
            project_key: The project key (e.g., 'PROJ')

        Returns:
            List of issue type data dictionaries
        """
189 |
190 |
@runtime_checkable
class UsersOperationsProto(Protocol):
    """Protocol defining user operations interface.

    Decorated with ``runtime_checkable`` so implementations can be checked
    with ``isinstance`` (structural, method-presence check only).
    """

    @abstractmethod
    def _get_account_id(self, assignee: str) -> str:
        """Get the account ID for a username.

        Args:
            assignee: Username or account ID

        Returns:
            Account ID

        Raises:
            ValueError: If the account ID could not be found
        """
208 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/sprints.py:
--------------------------------------------------------------------------------
```python
1 | """Module for Jira sprints operations."""
2 |
3 | import datetime
4 | import logging
5 | from typing import Any
6 |
7 | import requests
8 |
9 | from ..models.jira import JiraSprint
10 | from ..utils import parse_date
11 | from .client import JiraClient
12 |
13 | logger = logging.getLogger("mcp-jira")
14 |
15 |
class SprintsMixin(JiraClient):
    """Mixin for Jira sprints operations."""

    def get_all_sprints_from_board(
        self, board_id: str, state: str | None = None, start: int = 0, limit: int = 50
    ) -> list[dict[str, Any]]:
        """
        Get all sprints from a board.

        Args:
            board_id: Board ID
            state: Sprint state (e.g., active, future, closed); if None, return
                sprints in all states
            start: Start index
            limit: Maximum number of sprints to return

        Returns:
            List of sprint dictionaries; an empty list on error or when the
            API returns an unexpected payload
        """
        try:
            sprints = self.jira.get_all_sprints_from_board(
                board_id=board_id,
                state=state,
                start=start,
                limit=limit,
            )
            # Defensive: error paths may yield a non-dict payload.
            return sprints.get("values", []) if isinstance(sprints, dict) else []
        except requests.HTTPError as e:
            logger.error(
                f"Error getting all sprints from board: {str(e.response.content)}"
            )
            return []
        except Exception as e:
            logger.error(f"Error getting all sprints from board: {str(e)}")
            return []

    def get_all_sprints_from_board_model(
        self, board_id: str, state: str | None = None, start: int = 0, limit: int = 50
    ) -> list[JiraSprint]:
        """
        Get all sprints as JiraSprint from a board.

        Args:
            board_id: Board ID
            state: Sprint state (e.g., active, future, closed); if None, return
                sprints in all states
            start: Start index
            limit: Maximum number of sprints to return

        Returns:
            List of JiraSprint models (empty on error, mirroring
            get_all_sprints_from_board)
        """
        sprints = self.get_all_sprints_from_board(
            board_id=board_id,
            state=state,
            start=start,
            limit=limit,
        )
        return [JiraSprint.from_api_response(sprint) for sprint in sprints]

    def update_sprint(
        self,
        sprint_id: str,
        sprint_name: str | None,
        state: str | None,
        start_date: str | None,
        end_date: str | None,
        goal: str | None,
    ) -> JiraSprint | None:
        """
        Update a sprint.

        Args:
            sprint_id: Sprint ID (required)
            sprint_name: New name for the sprint (optional)
            state: New state for the sprint (future|active|closed - optional)
            start_date: New start date for the sprint (optional)
            end_date: New end date for the sprint (optional)
            goal: New goal for the sprint (optional)

        Returns:
            Updated sprint as a JiraSprint model, or None on invalid input
            or API error
        """
        # Validate inputs up front, before building the payload.
        if not sprint_id:
            logger.warning("Sprint ID is required.")
            return None
        if state and state not in ("future", "active", "closed"):
            logger.warning("Invalid state. Valid states are: future, active, closed.")
            return None

        data: dict[str, Any] = {}
        if sprint_name:
            data["name"] = sprint_name
        if state:
            data["state"] = state
        if start_date:
            data["startDate"] = start_date
        if end_date:
            data["endDate"] = end_date
        if goal:
            data["goal"] = goal

        try:
            updated_sprint = self.jira.update_partially_sprint(
                sprint_id=sprint_id,
                data=data,
            )

            if not isinstance(updated_sprint, dict):
                msg = f"Unexpected return value type from `SprintMixin.update_sprint`: {type(updated_sprint)}"
                logger.error(msg)
                raise TypeError(msg)

            return JiraSprint.from_api_response(updated_sprint)
        except requests.HTTPError as e:
            logger.error(f"Error updating sprint: {str(e.response.content)}")
            return None
        except Exception as e:
            logger.error(f"Error updating sprint: {str(e)}")
            return None

    def create_sprint(
        self,
        board_id: str,
        sprint_name: str,
        start_date: str,
        end_date: str,
        goal: str | None = None,
    ) -> JiraSprint:
        """
        Create a new sprint.

        Args:
            board_id: Board ID
            sprint_name: Sprint name
            start_date: Start date in ISO format (required; must parse and not
                be in the past)
            end_date: End date in ISO format (must be after start_date when
                provided)
            goal: Sprint goal

        Returns:
            Created sprint as a JiraSprint model

        Raises:
            ValueError: If the dates are missing, unparsable, or inconsistent
            TypeError: If the API returns an unexpected payload type
            requests.HTTPError: If the API call fails
        """

        if not start_date:
            raise ValueError("Start date is required.")

        # validate start date format
        parsed_start_date = parse_date(start_date)

        if parsed_start_date is None:
            # Fixed message: the date was supplied but could not be parsed.
            raise ValueError("Invalid start date format.")

        # validate start date is not in the past
        # NOTE(review): assumes parse_date returns a timezone-aware datetime;
        # a naive result compared against an aware "now" would raise TypeError.
        if parsed_start_date < datetime.datetime.now(datetime.timezone.utc):
            raise ValueError("Start date cannot be in the past.")

        # validate end date format and ordering
        if end_date:
            parsed_end_date = parse_date(end_date)
            if parsed_end_date is not None and parsed_start_date >= parsed_end_date:
                raise ValueError("Start date must be before end date.")

        try:
            sprint = self.jira.create_sprint(
                name=sprint_name,
                board_id=board_id,
                start_date=start_date,
                end_date=end_date,
                goal=goal,
            )

            logger.info(f"Sprint created: {sprint}")

            if not isinstance(sprint, dict):
                msg = f"Unexpected return value type from `SprintMixin.create_sprint`: {type(sprint)}"
                logger.error(msg)
                raise TypeError(msg)

            return JiraSprint.from_api_response(sprint)

        except requests.HTTPError as e:
            logger.error(f"Error creating sprint: {str(e.response.content)}")
            raise
        except Exception as e:
            logger.error(f"Error creating sprint: {str(e)}")
            raise
198 |
```
--------------------------------------------------------------------------------
/tests/unit/utils/test_ssl.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the SSL utilities module."""
2 |
3 | import ssl
4 | from unittest.mock import MagicMock, patch
5 |
6 | from requests.adapters import HTTPAdapter
7 | from requests.sessions import Session
8 |
9 | from mcp_atlassian.utils.ssl import SSLIgnoreAdapter, configure_ssl_verification
10 |
11 |
def test_ssl_ignore_adapter_cert_verify():
    """Test that SSLIgnoreAdapter overrides cert verification."""
    adapter = SSLIgnoreAdapter()
    mock_conn = MagicMock()
    target_url = "https://example.com"

    # Intercept the parent implementation to observe the forwarded arguments.
    with patch.object(HTTPAdapter, "cert_verify") as parent_cert_verify:
        # verify=True is requested, but the adapter must force verify=False.
        adapter.cert_verify(mock_conn, target_url, verify=True, cert=None)

        parent_cert_verify.assert_called_once_with(
            mock_conn, target_url, verify=False, cert=None
        )
31 |
32 |
def test_ssl_ignore_adapter_init_poolmanager():
    """Test that SSLIgnoreAdapter properly initializes the connection pool with SSL verification disabled."""
    adapter = SSLIgnoreAdapter()
    fake_pool = MagicMock()

    with patch("ssl.create_default_context") as mock_create_context, patch(
        "mcp_atlassian.utils.ssl.PoolManager", return_value=fake_pool
    ) as pool_manager_cls:
        weakened_context = MagicMock()
        mock_create_context.return_value = weakened_context

        adapter.init_poolmanager(5, 10, block=True)

        # A default context is created, then stripped of verification.
        mock_create_context.assert_called_once()
        assert weakened_context.check_hostname is False
        assert weakened_context.verify_mode == ssl.CERT_NONE

        # The pool must receive the weakened context plus the sizing args.
        pool_manager_cls.assert_called_once()
        _, kwargs = pool_manager_cls.call_args
        assert kwargs["num_pools"] == 5
        assert kwargs["maxsize"] == 10
        assert kwargs["block"] is True
        assert kwargs["ssl_context"] == weakened_context
65 |
66 |
def test_configure_ssl_verification_disabled():
    """Test configure_ssl_verification when SSL verification is disabled."""
    # Arrange
    service_name = "TestService"
    url = "https://test.example.com/path"
    session = MagicMock()  # Use MagicMock instead of actual Session
    ssl_verify = False

    # Patch the logger to keep output quiet; the mock itself is unused, so
    # do not bind it (was an unused `as mock_logger` variable).
    with patch("mcp_atlassian.utils.ssl.logger"):
        with patch("mcp_atlassian.utils.ssl.SSLIgnoreAdapter") as mock_adapter_class:
            mock_adapter = MagicMock()
            mock_adapter_class.return_value = mock_adapter

            # Act
            configure_ssl_verification(service_name, url, session, ssl_verify)

            # Assert: one adapter instance, mounted for both schemes of the domain
            mock_adapter_class.assert_called_once()
            assert session.mount.call_count == 2
            session.mount.assert_any_call("https://test.example.com", mock_adapter)
            session.mount.assert_any_call("http://test.example.com", mock_adapter)
90 |
91 |
def test_configure_ssl_verification_enabled():
    """Test configure_ssl_verification when SSL verification is enabled."""
    mock_session = MagicMock()  # Mocked session keeps the test isolated

    with patch("mcp_atlassian.utils.ssl.SSLIgnoreAdapter") as adapter_cls:
        configure_ssl_verification(
            "TestService", "https://test.example.com/path", mock_session, True
        )

        # With verification enabled, no adapter is created or mounted.
        adapter_cls.assert_not_called()
        assert mock_session.mount.call_count == 0
107 |
108 |
def test_configure_ssl_verification_enabled_with_real_session():
    """Test SSL verification configuration when verification is enabled using a real Session."""
    real_session = Session()
    adapters_before = len(real_session.adapters)

    configure_ssl_verification(
        service_name="Test",
        url="https://example.com",
        session=real_session,
        ssl_verify=True,
    )

    # Enabled verification must leave the session's adapters untouched.
    assert len(real_session.adapters) == adapters_before
124 |
125 |
def test_configure_ssl_verification_disabled_with_real_session():
    """Test SSL verification configuration when verification is disabled using a real Session."""
    session = Session()
    original_adapters_count = len(session.adapters)

    # Patch the logger to keep output quiet; the mock itself is unused, so
    # do not bind it (was an unused `as mock_logger` variable).
    with patch("mcp_atlassian.utils.ssl.logger"):
        # Configure with SSL verification disabled
        configure_ssl_verification(
            service_name="Test",
            url="https://example.com",
            session=session,
            ssl_verify=False,
        )

    # Custom adapters should be mounted for both http and https of the domain.
    assert len(session.adapters) == original_adapters_count + 2
    assert "https://example.com" in session.adapters
    assert "http://example.com" in session.adapters
    assert isinstance(session.adapters["https://example.com"], SSLIgnoreAdapter)
    assert isinstance(session.adapters["http://example.com"], SSLIgnoreAdapter)
147 |
148 |
def test_ssl_ignore_adapter():
    """Test the SSLIgnoreAdapter overrides the cert_verify method."""
    adapter = SSLIgnoreAdapter()
    conn = MagicMock()
    url = "https://example.com"

    # Whatever verify flag is requested, the parent must receive False.
    for requested_verify in (True, False):
        with patch.object(HTTPAdapter, "cert_verify") as parent_verify:
            adapter.cert_verify(conn, url, verify=requested_verify, cert=None)
            parent_verify.assert_called_once_with(conn, url, verify=False, cert=None)
166 |
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_config.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the Jira config module."""
2 |
3 | import os
4 | from unittest.mock import patch
5 |
6 | import pytest
7 |
8 | from mcp_atlassian.jira.config import JiraConfig
9 |
10 |
def test_from_env_basic_auth():
    """Test that from_env correctly loads basic auth configuration."""
    env = {
        "JIRA_URL": "https://test.atlassian.net",
        "JIRA_USERNAME": "test_username",
        "JIRA_API_TOKEN": "test_token",
    }
    with patch.dict(os.environ, env, clear=True):
        config = JiraConfig.from_env()
        # Cloud URL plus username/token should resolve to basic auth.
        assert config.url == "https://test.atlassian.net"
        assert config.auth_type == "basic"
        assert config.username == "test_username"
        assert config.api_token == "test_token"
        assert config.personal_token is None
        assert config.ssl_verify is True
29 |
30 |
def test_from_env_token_auth():
    """Test that from_env correctly loads token auth configuration."""
    env = {
        "JIRA_URL": "https://jira.example.com",
        "JIRA_PERSONAL_TOKEN": "test_personal_token",
        "JIRA_SSL_VERIFY": "false",
    }
    with patch.dict(os.environ, env, clear=True):
        config = JiraConfig.from_env()
        # Server URL plus personal token should resolve to PAT auth.
        assert config.url == "https://jira.example.com"
        assert config.auth_type == "pat"
        assert config.username is None
        assert config.api_token is None
        assert config.personal_token == "test_personal_token"
        assert config.ssl_verify is False
49 |
50 |
def test_from_env_missing_url():
    """Test that from_env raises ValueError when URL is missing."""
    # Use patch.dict for automatic save/restore of the environment, consistent
    # with the other tests in this module (was a manual copy/clear/update).
    with patch.dict(os.environ, {}, clear=True):
        with pytest.raises(
            ValueError, match="Missing required JIRA_URL environment variable"
        ):
            JiraConfig.from_env()
64 |
65 |
def test_from_env_missing_cloud_auth():
    """Test that from_env raises ValueError when cloud auth credentials are missing."""
    # A cloud URL with no credentials at all.
    cloud_only_env = {"JIRA_URL": "https://test.atlassian.net"}
    with patch.dict(os.environ, cloud_only_env, clear=True):
        with pytest.raises(
            ValueError,
            match="Cloud authentication requires JIRA_USERNAME and JIRA_API_TOKEN",
        ):
            JiraConfig.from_env()
80 |
81 |
def test_from_env_missing_server_auth():
    """Test that from_env raises ValueError when server auth credentials are missing."""
    # A server/Data Center URL with no credentials at all.
    server_only_env = {"JIRA_URL": "https://jira.example.com"}
    with patch.dict(os.environ, server_only_env, clear=True):
        with pytest.raises(
            ValueError,
            match="Server/Data Center authentication requires JIRA_PERSONAL_TOKEN",
        ):
            JiraConfig.from_env()
96 |
97 |
def test_is_cloud():
    """Test that is_cloud property returns correct value."""
    # Table of (constructor kwargs, expected is_cloud) covering an
    # atlassian.net Cloud URL, a Server hostname, and localhost variants.
    cases = [
        (
            {
                "url": "https://example.atlassian.net",
                "auth_type": "basic",
                "username": "test",
                "api_token": "test",
            },
            True,
        ),
        (
            {
                "url": "https://jira.example.com",
                "auth_type": "pat",
                "personal_token": "test",
            },
            False,
        ),
        (
            {
                "url": "http://localhost:8080",
                "auth_type": "pat",
                "personal_token": "test",
            },
            False,
        ),
        (
            {
                "url": "http://127.0.0.1:8080",
                "auth_type": "pat",
                "personal_token": "test",
            },
            False,
        ),
    ]

    for kwargs, expected in cases:
        config = JiraConfig(**kwargs)
        assert config.is_cloud is expected
140 |
141 |
def test_from_env_proxy_settings():
    """Test that from_env correctly loads proxy environment variables."""
    credentials = {
        "JIRA_URL": "https://test.atlassian.net",
        "JIRA_USERNAME": "test_username",
        "JIRA_API_TOKEN": "test_token",
    }

    # Case 1: global proxy variables are picked up.
    global_proxy_env = {
        **credentials,
        "HTTP_PROXY": "http://proxy.example.com:8080",
        "HTTPS_PROXY": "https://proxy.example.com:8443",
        "SOCKS_PROXY": "socks5://user:[email protected]:1080",
        "NO_PROXY": "localhost,127.0.0.1",
    }
    with patch.dict(os.environ, global_proxy_env, clear=True):
        config = JiraConfig.from_env()
        assert config.http_proxy == "http://proxy.example.com:8080"
        assert config.https_proxy == "https://proxy.example.com:8443"
        assert config.socks_proxy == "socks5://user:[email protected]:1080"
        assert config.no_proxy == "localhost,127.0.0.1"

    # Case 2: JIRA_-prefixed variables provide service-specific overrides.
    jira_proxy_env = {
        **credentials,
        "JIRA_HTTP_PROXY": "http://jira-proxy.example.com:8080",
        "JIRA_HTTPS_PROXY": "https://jira-proxy.example.com:8443",
        "JIRA_SOCKS_PROXY": "socks5://user:[email protected]:1080",
        "JIRA_NO_PROXY": "localhost,127.0.0.1,.internal.example.com",
    }
    with patch.dict(os.environ, jira_proxy_env, clear=True):
        config = JiraConfig.from_env()
        assert config.http_proxy == "http://jira-proxy.example.com:8080"
        assert config.https_proxy == "https://jira-proxy.example.com:8443"
        assert config.socks_proxy == "socks5://user:[email protected]:1080"
        assert config.no_proxy == "localhost,127.0.0.1,.internal.example.com"
182 |
183 |
def test_is_cloud_oauth_with_cloud_id():
    """Test that is_cloud returns True for OAuth with cloud_id regardless of URL."""
    from mcp_atlassian.utils.oauth import BYOAccessTokenOAuthConfig

    oauth_config = BYOAccessTokenOAuthConfig(
        cloud_id="test-cloud-id", access_token="test-token"
    )

    # With a cloud_id present, the deployment is Cloud no matter what the
    # URL looks like: None (Multi-Cloud OAuth mode) or a Server-like host.
    for url in (None, "https://jira.example.com"):
        config = JiraConfig(
            url=url,
            auth_type="oauth",
            oauth_config=oauth_config,
        )
        assert config.is_cloud is True
206 |
```
--------------------------------------------------------------------------------
/tests/utils/mocks.py:
--------------------------------------------------------------------------------
```python
1 | """Reusable mock utilities and fixtures for MCP Atlassian tests."""
2 |
3 | import os
4 | from contextlib import contextmanager
5 | from typing import Any
6 | from unittest.mock import MagicMock, patch
7 |
8 | from .factories import AuthConfigFactory, ConfluencePageFactory, JiraIssueFactory
9 |
10 |
11 | class MockEnvironment:
12 |     """Utility for mocking environment variables."""
13 | 
14 |     @staticmethod
15 |     @contextmanager
16 |     def oauth_env():
17 |         """Context manager for OAuth environment variables."""
18 |         oauth_vars = AuthConfigFactory.create_oauth_config()
19 |         env_vars = {
20 |             "ATLASSIAN_OAUTH_CLIENT_ID": oauth_vars["client_id"],
21 |             "ATLASSIAN_OAUTH_CLIENT_SECRET": oauth_vars["client_secret"],
22 |             "ATLASSIAN_OAUTH_REDIRECT_URI": oauth_vars["redirect_uri"],
23 |             "ATLASSIAN_OAUTH_SCOPE": oauth_vars["scope"],
24 |             "ATLASSIAN_OAUTH_CLOUD_ID": oauth_vars["cloud_id"],
25 |         }
26 |         with patch.dict(os.environ, env_vars, clear=False):  # clear=False: overlay on top of the real environment
27 |             yield env_vars
28 | 
29 |     @staticmethod
30 |     @contextmanager
31 |     def basic_auth_env():
32 |         """Context manager for basic auth environment variables."""
33 |         auth_config = AuthConfigFactory.create_basic_auth_config()
34 |         env_vars = {
35 |             "JIRA_URL": auth_config["url"],
36 |             "JIRA_USERNAME": auth_config["username"],
37 |             "JIRA_API_TOKEN": auth_config["api_token"],
38 |             "CONFLUENCE_URL": f"{auth_config['url']}/wiki",  # Confluence reuses the same credentials under /wiki
39 |             "CONFLUENCE_USERNAME": auth_config["username"],
40 |             "CONFLUENCE_API_TOKEN": auth_config["api_token"],
41 |         }
42 |         with patch.dict(os.environ, env_vars, clear=False):
43 |             yield env_vars
44 | 
45 |     @staticmethod
46 |     @contextmanager
47 |     def clean_env():
48 |         """Context manager with no authentication environment variables."""
49 |         auth_vars = [  # every auth-related variable that must be absent inside the context
50 |             "JIRA_URL",
51 |             "JIRA_USERNAME",
52 |             "JIRA_API_TOKEN",
53 |             "CONFLUENCE_URL",
54 |             "CONFLUENCE_USERNAME",
55 |             "CONFLUENCE_API_TOKEN",
56 |             "ATLASSIAN_OAUTH_CLIENT_ID",
57 |             "ATLASSIAN_OAUTH_CLIENT_SECRET",
58 |             "ATLASSIAN_OAUTH_REDIRECT_URI",
59 |             "ATLASSIAN_OAUTH_SCOPE",
60 |             "ATLASSIAN_OAUTH_CLOUD_ID",
61 |             "ATLASSIAN_OAUTH_ENABLE",
62 |         ]
63 | 
64 |         # Remove auth vars from environment
65 |         with patch.dict(os.environ, {}, clear=False) as env_dict:  # patch.dict yields the patched mapping (os.environ); removals below are undone on exit
66 |             for var in auth_vars:
67 |                 env_dict.pop(var, None)
68 |             yield env_dict
69 |
70 |
class MockAtlassianClient:
    """Factory helpers that build pre-configured mock Atlassian clients."""

    @staticmethod
    def create_jira_client(**response_overrides):
        """Create a mock Jira client with common responses.

        Keyword overrides replace the canned response for the mock method
        of the same name (issue, search_issues, projects, fields).
        """
        canned = {
            "issue": JiraIssueFactory.create(),
            "search_issues": {
                "issues": [
                    JiraIssueFactory.create("TEST-1"),
                    JiraIssueFactory.create("TEST-2"),
                ],
                "total": 2,
            },
            "projects": [{"key": "TEST", "name": "Test Project"}],
            "fields": [{"id": "summary", "name": "Summary"}],
        }
        canned.update(response_overrides)

        client = MagicMock()
        # Wire each known method's return value from the merged responses.
        for method in ("issue", "search_issues", "projects", "fields"):
            getattr(client, method).return_value = canned[method]
        return client

    @staticmethod
    def create_confluence_client(**response_overrides):
        """Create a mock Confluence client with common responses.

        Keyword overrides replace the canned response for the mock method
        of the same name.
        """
        canned = {
            "get_page_by_id": ConfluencePageFactory.create(),
            "get_all_pages_from_space": {
                "results": [
                    ConfluencePageFactory.create("123"),
                    ConfluencePageFactory.create("456"),
                ]
            },
            "get_all_spaces": {"results": [{"key": "TEST", "name": "Test Space"}]},
        }
        canned.update(response_overrides)

        client = MagicMock()
        for method in (
            "get_page_by_id",
            "get_all_pages_from_space",
            "get_all_spaces",
        ):
            getattr(client, method).return_value = canned[method]
        return client
132 |
133 |
class MockOAuthServer:
    """Utility for mocking OAuth server interactions."""

    @staticmethod
    @contextmanager
    def mock_oauth_flow():
        """Context manager for mocking complete OAuth flow.

        Yields a dict exposing the HTTP server patch, the server instance
        mock, the browser-open patch, and the state-token patch.
        """
        with patch("http.server.HTTPServer") as server_patch:
            with patch("webbrowser.open") as browser_patch:
                with patch("secrets.token_urlsafe") as token_patch:
                    # Deterministic state token and a reusable server mock.
                    token_patch.return_value = "test-state-token"
                    server_instance = MagicMock()
                    server_patch.return_value = server_instance

                    yield {
                        "server": server_patch,
                        "server_instance": server_instance,
                        "browser": browser_patch,
                        "token": token_patch,
                    }
157 |
158 |
159 | class MockFastMCP:
160 | """Utility for mocking FastMCP components."""
161 |
162 | @staticmethod
163 | def create_request(state_data: dict[str, Any] | None = None):
164 | """Create a mock FastMCP request."""
165 | request = MagicMock()
166 | request.state = MagicMock()
167 |
168 | if state_data:
169 | for key, value in state_data.items():
170 | setattr(request.state, key, value)
171 |
172 | return request
173 |
174 | @staticmethod
175 | def create_context():
176 | """Create a mock FastMCP context."""
177 | return MagicMock()
178 |
179 |
class MockPreprocessor:
    """Utility for mocking content preprocessors."""

    @staticmethod
    def create_html_to_markdown():
        """Create a mock HTML to Markdown preprocessor."""
        mock = MagicMock()
        mock.process.return_value = "# Markdown Content"
        return mock

    @staticmethod
    def create_markdown_to_html():
        """Create a mock Markdown to HTML preprocessor."""
        mock = MagicMock()
        mock.process.return_value = "<h1>HTML Content</h1>"
        return mock
196 |
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_links.py:
--------------------------------------------------------------------------------
```python
1 | from unittest.mock import MagicMock, Mock, patch
2 |
3 | import pytest
4 | from requests.exceptions import HTTPError
5 |
6 | from mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
7 | from mcp_atlassian.jira.links import LinksMixin
8 | from mcp_atlassian.models.jira import JiraIssueLinkType
9 |
10 |
11 | class TestLinksMixin:  # Unit tests for LinksMixin: happy paths, input validation, and auth-error handling
12 |     @pytest.fixture
13 |     def links_mixin(self, mock_config, mock_atlassian_jira):
14 |         mixin = LinksMixin(config=mock_config)
15 |         mixin.jira = mock_atlassian_jira  # inject the mocked low-level Jira client
16 |         return mixin
17 | 
18 |     def test_get_issue_link_types_success(self, links_mixin):
19 |         """Test successful retrieval of issue link types."""
20 |         mock_response = {
21 |             "issueLinkTypes": [
22 |                 {
23 |                     "id": "10000",
24 |                     "name": "Blocks",
25 |                     "inward": "is blocked by",
26 |                     "outward": "blocks",
27 |                 },
28 |                 {
29 |                     "id": "10001",
30 |                     "name": "Duplicate",
31 |                     "inward": "is duplicated by",
32 |                     "outward": "duplicates",
33 |                 },
34 |             ]
35 |         }
36 |         links_mixin.jira.get.return_value = mock_response
37 | 
38 |         def fake_from_api_response(data):
39 |             mock = MagicMock()
40 |             mock.name = data["name"]  # assigned after creation: "name" is special-cased in the MagicMock constructor
41 |             return mock
42 | 
43 |         with patch.object(
44 |             JiraIssueLinkType, "from_api_response", side_effect=fake_from_api_response
45 |         ):  # stub model conversion; only .name is inspected below
46 |             result = links_mixin.get_issue_link_types()
47 | 
48 |         assert len(result) == 2
49 |         assert result[0].name == "Blocks"
50 |         assert result[1].name == "Duplicate"
51 |         links_mixin.jira.get.assert_called_once_with("rest/api/2/issueLinkType")
52 | 
53 |     def test_get_issue_link_types_authentication_error(self, links_mixin):
54 |         links_mixin.jira.get.side_effect = HTTPError(response=Mock(status_code=401))  # 401 must be translated to the MCP auth error
55 | 
56 |         with pytest.raises(MCPAtlassianAuthenticationError):
57 |             links_mixin.get_issue_link_types()
58 | 
59 |     def test_get_issue_link_types_generic_error(self, links_mixin):
60 |         links_mixin.jira.get.side_effect = Exception("Unexpected error")
61 | 
62 |         with pytest.raises(Exception, match="Unexpected error"):
63 |             links_mixin.get_issue_link_types()
64 | 
65 |     def test_create_issue_link_success(self, links_mixin):
66 |         data = {
67 |             "type": {"name": "Blocks"},
68 |             "inwardIssue": {"key": "PROJ-123"},
69 |             "outwardIssue": {"key": "PROJ-456"},
70 |         }
71 | 
72 |         response = links_mixin.create_issue_link(data)
73 | 
74 |         assert response["success"] is True
75 |         assert "Link created between PROJ-123 and PROJ-456" in response["message"]
76 |         links_mixin.jira.create_issue_link.assert_called_once_with(data)
77 | 
78 |     def test_create_issue_link_missing_type(self, links_mixin):
79 |         data = {
80 |             "inwardIssue": {"key": "PROJ-123"},
81 |             "outwardIssue": {"key": "PROJ-456"},
82 |         }
83 | 
84 |         with pytest.raises(ValueError, match="Link type is required"):
85 |             links_mixin.create_issue_link(data)
86 | 
87 |     def test_create_issue_link_authentication_error(self, links_mixin):
88 |         data = {
89 |             "type": {"name": "Blocks"},
90 |             "inwardIssue": {"key": "PROJ-123"},
91 |             "outwardIssue": {"key": "PROJ-456"},
92 |         }
93 |         links_mixin.jira.create_issue_link.side_effect = HTTPError(
94 |             response=Mock(status_code=401)
95 |         )
96 | 
97 |         with pytest.raises(MCPAtlassianAuthenticationError):
98 |             links_mixin.create_issue_link(data)
99 | 
100 |     def test_create_remote_issue_link_success(self, links_mixin):
101 |         issue_key = "PROJ-123"
102 |         link_data = {
103 |             "object": {
104 |                 "url": "https://example.com/page",
105 |                 "title": "Example Page",
106 |                 "summary": "A test page",
107 |             },
108 |             "relationship": "documentation",
109 |         }
110 | 
111 |         response = links_mixin.create_remote_issue_link(issue_key, link_data)
112 | 
113 |         assert response["success"] is True
114 |         assert response["issue_key"] == issue_key
115 |         assert response["link_title"] == "Example Page"
116 |         assert response["link_url"] == "https://example.com/page"
117 |         assert response["relationship"] == "documentation"
118 |         links_mixin.jira.post.assert_called_once_with(
119 |             "rest/api/3/issue/PROJ-123/remotelink", json=link_data
120 |         )
121 | 
122 |     def test_create_remote_issue_link_missing_issue_key(self, links_mixin):
123 |         link_data = {
124 |             "object": {"url": "https://example.com/page", "title": "Example Page"}
125 |         }
126 | 
127 |         with pytest.raises(ValueError, match="Issue key is required"):
128 |             links_mixin.create_remote_issue_link("", link_data)
129 | 
130 |     def test_create_remote_issue_link_missing_object(self, links_mixin):
131 |         issue_key = "PROJ-123"
132 |         link_data = {"relationship": "documentation"}
133 | 
134 |         with pytest.raises(ValueError, match="Link object is required"):
135 |             links_mixin.create_remote_issue_link(issue_key, link_data)
136 | 
137 |     def test_create_remote_issue_link_missing_url(self, links_mixin):
138 |         issue_key = "PROJ-123"
139 |         link_data = {"object": {"title": "Example Page"}}
140 | 
141 |         with pytest.raises(ValueError, match="URL is required in link object"):
142 |             links_mixin.create_remote_issue_link(issue_key, link_data)
143 | 
144 |     def test_create_remote_issue_link_missing_title(self, links_mixin):
145 |         issue_key = "PROJ-123"
146 |         link_data = {"object": {"url": "https://example.com/page"}}
147 | 
148 |         with pytest.raises(ValueError, match="Title is required in link object"):
149 |             links_mixin.create_remote_issue_link(issue_key, link_data)
150 | 
151 |     def test_create_remote_issue_link_authentication_error(self, links_mixin):
152 |         issue_key = "PROJ-123"
153 |         link_data = {
154 |             "object": {"url": "https://example.com/page", "title": "Example Page"}
155 |         }
156 |         links_mixin.jira.post.side_effect = HTTPError(response=Mock(status_code=401))
157 | 
158 |         with pytest.raises(MCPAtlassianAuthenticationError):
159 |             links_mixin.create_remote_issue_link(issue_key, link_data)
160 | 
161 |     def test_remove_issue_link_success(self, links_mixin):
162 |         link_id = "10000"
163 | 
164 |         response = links_mixin.remove_issue_link(link_id)
165 | 
166 |         assert response["success"] is True
167 |         assert f"Link with ID {link_id} has been removed" in response["message"]
168 |         links_mixin.jira.remove_issue_link.assert_called_once_with(link_id)
169 | 
170 |     def test_remove_issue_link_empty_id(self, links_mixin):
171 |         with pytest.raises(ValueError, match="Link ID is required"):
172 |             links_mixin.remove_issue_link("")
173 | 
174 |     def test_remove_issue_link_authentication_error(self, links_mixin):
175 |         link_id = "10000"
176 |         links_mixin.jira.remove_issue_link.side_effect = HTTPError(
177 |             response=Mock(status_code=401)
178 |         )
179 | 
180 |         with pytest.raises(MCPAtlassianAuthenticationError):
181 |             links_mixin.remove_issue_link(link_id)
182 |
```
--------------------------------------------------------------------------------
/tests/unit/jira/test_protocols.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for Jira protocol definitions."""
2 |
3 | import inspect
4 | from typing import Any, get_type_hints
5 |
6 | from mcp_atlassian.jira.protocols import (
7 | AttachmentsOperationsProto,
8 | UsersOperationsProto,
9 | )
10 | from mcp_atlassian.models.jira import JiraIssue
11 | from mcp_atlassian.models.jira.search import JiraSearchResult
12 |
13 |
14 | class TestProtocolCompliance:
15 |     """Tests for protocol compliance checking."""
16 | 
17 |     def test_compliant_attachments_implementation(self):
18 |         """Test compliant attachments implementation."""
19 | 
20 |         class CompliantAttachments:
21 |             def upload_attachments(
22 |                 self, issue_key: str, file_paths: list[str]
23 |             ) -> dict[str, Any]:
24 |                 return {"uploaded": len(file_paths)}
25 | 
26 |         instance = CompliantAttachments()
27 |         assert hasattr(instance, "upload_attachments")
28 |         assert callable(instance.upload_attachments)
29 | 
30 |     def test_compliant_issue_implementation(self):
31 |         """Test compliant issue implementation."""
32 | 
33 |         class CompliantIssues:
34 |             def get_issue(
35 |                 self,
36 |                 issue_key: str,
37 |                 expand: str | None = None,
38 |                 comment_limit: int | str | None = 10,
39 |                 fields: str | list[str] | tuple[str, ...] | set[str] | None = (
40 |                     "summary,description,status,assignee,reporter,labels,"
41 |                     "priority,created,updated,issuetype"
42 |                 ),
43 |                 properties: str | list[str] | None = None,
44 |                 *,
45 |                 update_history: bool = True,
46 |             ) -> JiraIssue:
47 |                 return JiraIssue(id="123", key=issue_key, summary="Test Issue")
48 | 
49 |         instance = CompliantIssues()
50 |         assert hasattr(instance, "get_issue")
51 |         result = instance.get_issue("TEST-1")
52 |         assert isinstance(result, JiraIssue)
53 |         assert result.key == "TEST-1"
54 | 
55 |     def test_compliant_search_implementation(self):
56 |         """Test compliant search implementation."""
57 | 
58 |         class CompliantSearch:
59 |             def search_issues(
60 |                 self,
61 |                 jql: str,
62 |                 fields: str | list[str] | tuple[str, ...] | set[str] | None = (
63 |                     "summary,description,status,assignee,reporter,labels,"
64 |                     "priority,created,updated,issuetype"
65 |                 ),
66 |                 start: int = 0,
67 |                 limit: int = 50,
68 |                 expand: str | None = None,
69 |                 projects_filter: str | None = None,
70 |             ) -> JiraSearchResult:
71 |                 return JiraSearchResult(
72 |                     total=0, start_at=start, max_results=limit, issues=[]
73 |                 )
74 | 
75 |         instance = CompliantSearch()
76 |         result = instance.search_issues("project = TEST")
77 |         assert isinstance(result, JiraSearchResult)
78 | 
79 |     def test_runtime_checkable_users_protocol(self):
80 |         """Test runtime checking for UsersOperationsProto."""
81 | 
82 |         class CompliantUsers:
83 |             def _get_account_id(self, assignee: str) -> str:
84 |                 return f"account-id-for-{assignee}"
85 | 
86 |         class NonCompliantUsers:
87 |             pass
88 | 
89 |         compliant_instance = CompliantUsers()
90 |         non_compliant_instance = NonCompliantUsers()
91 | 
92 |         # Runtime checkable only checks method existence, not signatures or types
93 |         assert isinstance(compliant_instance, UsersOperationsProto)
94 |         assert not isinstance(non_compliant_instance, UsersOperationsProto)
95 |
96 |
97 | class TestProtocolContractValidation:
98 |     """Tests for validating protocol contract compliance."""
99 | 
100 |     def test_method_signature_validation(self):
101 |         """Test method signature validation helper."""
102 | 
103 |         def validate_method_signature(protocol_class, method_name: str, implementation):
104 |             """Validate implementation method signature matches protocol."""
105 |             protocol_method = getattr(protocol_class, method_name)
106 |             impl_method = getattr(implementation, method_name)
107 | 
108 |             protocol_sig = inspect.signature(protocol_method)
109 |             impl_sig = inspect.signature(impl_method)
110 | 
111 |             # Compare parameter names (excluding 'self'); order matters, defaults do not
112 |             protocol_params = [p for p in protocol_sig.parameters.keys() if p != "self"]
113 |             impl_params = [p for p in impl_sig.parameters.keys() if p != "self"]
114 | 
115 |             return protocol_params == impl_params
116 | 
117 |         class TestImplementation:
118 |             def upload_attachments(
119 |                 self, issue_key: str, file_paths: list[str]
120 |             ) -> dict[str, Any]:
121 |                 return {}
122 | 
123 |         impl = TestImplementation()
124 |         assert validate_method_signature(
125 |             AttachmentsOperationsProto, "upload_attachments", impl
126 |         )
127 | 
128 |     def test_type_hint_validation(self):
129 |         """Test type hint compliance validation."""
130 | 
131 |         def validate_type_hints(protocol_class, method_name: str, implementation):
132 |             """Validate type hints match between protocol and implementation."""
133 |             protocol_method = getattr(protocol_class, method_name)
134 |             impl_method = getattr(implementation, method_name)
135 | 
136 |             protocol_hints = get_type_hints(protocol_method)
137 |             impl_hints = get_type_hints(impl_method)
138 | 
139 |             # Check return type only; parameter hints are not compared here
140 |             return protocol_hints.get("return") == impl_hints.get("return")
141 | 
142 |         class TypeCompliantImplementation:
143 |             def upload_attachments(
144 |                 self, issue_key: str, file_paths: list[str]
145 |             ) -> dict[str, Any]:
146 |                 return {}
147 | 
148 |         impl = TypeCompliantImplementation()
149 |         assert validate_type_hints(
150 |             AttachmentsOperationsProto, "upload_attachments", impl
151 |         )
152 | 
153 |     def test_structural_compliance_check(self):
154 |         """Test structural typing validation."""
155 | 
156 |         def check_structural_compliance(instance, protocol_class):
157 |             """Check if instance structurally complies with protocol."""
158 |             abstract_methods = []
159 |             for attr_name in dir(protocol_class):
160 |                 if not attr_name.startswith("__"):
161 |                     attr = getattr(protocol_class, attr_name, None)
162 |                     if (
163 |                         callable(attr)
164 |                         and hasattr(attr, "__isabstractmethod__")
165 |                         and attr.__isabstractmethod__
166 |                     ):  # collect only the protocol's abstract (required) methods
167 |                         abstract_methods.append(attr_name)
168 | 
169 |             # Check if instance has all required methods
170 |             for method_name in abstract_methods:
171 |                 if not hasattr(instance, method_name):
172 |                     return False
173 |                 if not callable(getattr(instance, method_name)):
174 |                     return False
175 |             return True
176 | 
177 |         class CompliantImplementation:
178 |             def upload_attachments(
179 |                 self, issue_key: str, file_paths: list[str]
180 |             ) -> dict[str, Any]:
181 |                 return {}
182 | 
183 |         class NonCompliantImplementation:
184 |             def some_other_method(self):
185 |                 pass
186 | 
187 |         compliant = CompliantImplementation()
188 |         non_compliant = NonCompliantImplementation()
189 | 
190 |         assert check_structural_compliance(compliant, AttachmentsOperationsProto)
191 |         assert not check_structural_compliance(
192 |             non_compliant, AttachmentsOperationsProto
193 |         )
194 |
```
--------------------------------------------------------------------------------
/tests/unit/utils/test_custom_headers.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for custom headers parsing functionality."""
2 |
3 | from mcp_atlassian.utils.env import get_custom_headers
4 |
5 |
6 | class TestParseCustomHeaders:
7 |     """Test the parse_custom_headers function."""
8 | 
9 |     def test_empty_input(self, monkeypatch):
10 |         """Test parse_custom_headers with empty/None inputs."""
11 |         # Test unset environment variable
12 |         monkeypatch.delenv("TEST_HEADERS", raising=False)
13 |         assert get_custom_headers("TEST_HEADERS") == {}
14 | 
15 |         # Test empty string
16 |         monkeypatch.setenv("TEST_HEADERS", "")
17 |         assert get_custom_headers("TEST_HEADERS") == {}
18 | 
19 |         # Test whitespace only
20 |         monkeypatch.setenv("TEST_HEADERS", "   ")
21 |         assert get_custom_headers("TEST_HEADERS") == {}
22 | 
23 |         monkeypatch.setenv("TEST_HEADERS", "\t\n")
24 |         assert get_custom_headers("TEST_HEADERS") == {}
25 | 
26 |     def test_single_header(self, monkeypatch):
27 |         """Test parsing a single header."""
28 |         monkeypatch.setenv("TEST_HEADERS", "X-Custom=value123")
29 |         result = get_custom_headers("TEST_HEADERS")
30 |         assert result == {"X-Custom": "value123"}
31 | 
32 |         # Test with spaces around key and value
33 |         monkeypatch.setenv("TEST_HEADERS", "  X-Spaced  =  value with spaces  ")
34 |         result = get_custom_headers("TEST_HEADERS")
35 |         assert result == {"X-Spaced": "value with spaces"}
36 | 
37 |     def test_multiple_headers(self, monkeypatch):
38 |         """Test parsing multiple comma-separated headers."""
39 |         monkeypatch.setenv("TEST_HEADERS", "X-Corp-Auth=token123,X-Dept=engineering")
40 |         result = get_custom_headers("TEST_HEADERS")
41 |         expected = {"X-Corp-Auth": "token123", "X-Dept": "engineering"}
42 |         assert result == expected
43 | 
44 |     def test_headers_with_spaces(self, monkeypatch):
45 |         """Test parsing headers with various spacing."""
46 |         monkeypatch.setenv("TEST_HEADERS", " X-Key = value , X-Another = value2 ")
47 |         result = get_custom_headers("TEST_HEADERS")
48 |         expected = {"X-Key": "value", "X-Another": "value2"}
49 |         assert result == expected
50 | 
51 |     def test_value_with_equals_signs(self, monkeypatch):
52 |         """Test parsing headers where values contain equals signs."""
53 |         monkeypatch.setenv("TEST_HEADERS", "X-Token=abc=def=123")  # only the first '=' splits key from value
54 |         result = get_custom_headers("TEST_HEADERS")
55 |         assert result == {"X-Token": "abc=def=123"}
56 | 
57 |         # Multiple headers with equals in values
58 |         monkeypatch.setenv(
59 |             "TEST_HEADERS", "X-Token=abc=def,X-URL=https://api.example.com/v1?key=value"
60 |         )
61 |         result = get_custom_headers("TEST_HEADERS")
62 |         expected = {
63 |             "X-Token": "abc=def",
64 |             "X-URL": "https://api.example.com/v1?key=value",
65 |         }
66 |         assert result == expected
67 | 
68 |     def test_malformed_headers(self, monkeypatch):
69 |         """Test handling of malformed header strings."""
70 |         # Header without equals sign - should be skipped
71 |         monkeypatch.setenv("TEST_HEADERS", "invalid-header-format")
72 |         result = get_custom_headers("TEST_HEADERS")
73 |         assert result == {}
74 | 
75 |         # Mix of valid and invalid headers
76 |         monkeypatch.setenv(
77 |             "TEST_HEADERS", "X-Valid=value,invalid-header,X-Another=value2"
78 |         )
79 |         result = get_custom_headers("TEST_HEADERS")
80 |         expected = {"X-Valid": "value", "X-Another": "value2"}
81 |         assert result == expected
82 | 
83 |     def test_empty_key_or_value(self, monkeypatch):
84 |         """Test handling of empty keys or values."""
85 |         # Empty key - should be skipped
86 |         monkeypatch.setenv("TEST_HEADERS", "=value")
87 |         result = get_custom_headers("TEST_HEADERS")
88 |         assert result == {}
89 | 
90 |         # Empty value - should be included
91 |         monkeypatch.setenv("TEST_HEADERS", "X-Empty=")
92 |         result = get_custom_headers("TEST_HEADERS")
93 |         assert result == {"X-Empty": ""}
94 | 
95 |         # Whitespace-only key - should be skipped
96 |         monkeypatch.setenv("TEST_HEADERS", "  =value")
97 |         result = get_custom_headers("TEST_HEADERS")
98 |         assert result == {}
99 | 
100 |         # Mix of empty and valid
101 |         monkeypatch.setenv("TEST_HEADERS", "=empty_key,X-Valid=value, =another_empty")
102 |         result = get_custom_headers("TEST_HEADERS")
103 |         assert result == {"X-Valid": "value"}
104 | 
105 |     def test_special_characters_in_values(self, monkeypatch):
106 |         """Test headers with special characters in values."""
107 |         monkeypatch.setenv("TEST_HEADERS", "X-Special=!@#$%^&*()_+-[]{}|;':\"/<>?")
108 |         result = get_custom_headers("TEST_HEADERS")
109 |         assert result == {"X-Special": "!@#$%^&*()_+-[]{}|;':\"/<>?"}
110 | 
111 |     def test_unicode_characters(self, monkeypatch):
112 |         """Test headers with unicode characters."""
113 |         monkeypatch.setenv("TEST_HEADERS", "X-Unicode=café,X-Emoji=🚀")
114 |         result = get_custom_headers("TEST_HEADERS")
115 |         expected = {"X-Unicode": "café", "X-Emoji": "🚀"}
116 |         assert result == expected
117 | 
118 |     def test_empty_pairs_in_list(self, monkeypatch):
119 |         """Test handling of empty pairs in comma-separated list."""
120 |         # Empty pairs should be skipped
121 |         monkeypatch.setenv("TEST_HEADERS", "X-First=value1,,X-Second=value2,")
122 |         result = get_custom_headers("TEST_HEADERS")
123 |         expected = {"X-First": "value1", "X-Second": "value2"}
124 |         assert result == expected
125 | 
126 |         # Only commas
127 |         monkeypatch.setenv("TEST_HEADERS", ",,,")
128 |         result = get_custom_headers("TEST_HEADERS")
129 |         assert result == {}
130 | 
131 |     def test_complex_real_world_example(self, monkeypatch):
132 |         """Test a complex real-world example."""
133 |         headers_string = (
134 |             "Authorization=Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9,"
135 |             "X-API-Key=sk-1234567890abcdef,"
136 |             "X-Request-ID=req_123456789,"
137 |             "X-Custom-Header=value with spaces and = signs,"
138 |             "User-Agent=MyApp/1.0 (Custom Agent)"
139 |         )
140 | 
141 |         monkeypatch.setenv("TEST_HEADERS", headers_string)
142 |         result = get_custom_headers("TEST_HEADERS")
143 |         expected = {
144 |             "Authorization": "Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9",
145 |             "X-API-Key": "sk-1234567890abcdef",
146 |             "X-Request-ID": "req_123456789",
147 |             "X-Custom-Header": "value with spaces and = signs",
148 |             "User-Agent": "MyApp/1.0 (Custom Agent)",
149 |         }
150 |         assert result == expected
151 | 
152 |     def test_case_sensitive_keys(self, monkeypatch):
153 |         """Test that header keys are case-sensitive."""
154 |         monkeypatch.setenv(
155 |             "TEST_HEADERS", "x-lower=value1,X-UPPER=value2,X-Mixed=value3"
156 |         )
157 |         result = get_custom_headers("TEST_HEADERS")
158 |         expected = {"x-lower": "value1", "X-UPPER": "value2", "X-Mixed": "value3"}
159 |         assert result == expected
160 | 
161 |     def test_duplicate_keys(self, monkeypatch):
162 |         """Test behavior with duplicate keys - later values should override."""
163 |         monkeypatch.setenv("TEST_HEADERS", "X-Duplicate=first,X-Duplicate=second")
164 |         result = get_custom_headers("TEST_HEADERS")
165 |         assert result == {"X-Duplicate": "second"}
166 | 
167 |     def test_newlines_and_tabs_in_input(self, monkeypatch):
168 |         """Test handling of newlines and tabs in input."""
169 |         # These should be treated as part of values, not separators
170 |         monkeypatch.setenv(
171 |             "TEST_HEADERS", "X-Multi=line1\nline2,X-Tab=value\twith\ttabs"
172 |         )
173 |         result = get_custom_headers("TEST_HEADERS")
174 |         expected = {"X-Multi": "line1\nline2", "X-Tab": "value\twith\ttabs"}
175 |         assert result == expected
176 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/client.py:
--------------------------------------------------------------------------------
```python
1 | """Base client module for Confluence API interactions."""
2 |
3 | import logging
4 | import os
5 |
6 | from atlassian import Confluence
7 | from requests import Session
8 |
9 | from ..exceptions import MCPAtlassianAuthenticationError
10 | from ..utils.logging import get_masked_session_headers, log_config_param, mask_sensitive
11 | from ..utils.oauth import configure_oauth_session
12 | from ..utils.ssl import configure_ssl_verification
13 | from .config import ConfluenceConfig
14 |
15 | # Configure logging
16 | logger = logging.getLogger("mcp-atlassian")
17 |
18 |
class ConfluenceClient:
    """Base client for Confluence API interactions.

    Wraps `atlassian.Confluence`, choosing the auth scheme (OAuth 2.0, PAT,
    or basic auth) from the supplied or environment-loaded ConfluenceConfig,
    then applies SSL verification, proxy, and custom-header settings to the
    underlying HTTP session.
    """

    def __init__(self, config: ConfluenceConfig | None = None) -> None:
        """Initialize the Confluence client with given or environment config.

        Args:
            config: Configuration for Confluence client. If None, will load from
                environment.

        Raises:
            ValueError: If configuration is invalid or environment variables are missing
            MCPAtlassianAuthenticationError: If OAuth authentication fails
        """
        self.config = config or ConfluenceConfig.from_env()

        # Initialize the Confluence client based on auth type
        if self.config.auth_type == "oauth":
            # OAuth requests are routed through the api.atlassian.com gateway,
            # which requires the tenant's cloud_id.
            if not self.config.oauth_config or not self.config.oauth_config.cloud_id:
                error_msg = "OAuth authentication requires a valid cloud_id"
                raise ValueError(error_msg)

            # Create a session for OAuth
            session = Session()

            # Configure the session with OAuth authentication
            # (configure_oauth_session returns False on failure)
            if not configure_oauth_session(session, self.config.oauth_config):
                error_msg = "Failed to configure OAuth session"
                raise MCPAtlassianAuthenticationError(error_msg)

            # The Confluence API URL with OAuth is different
            api_url = f"https://api.atlassian.com/ex/confluence/{self.config.oauth_config.cloud_id}"

            # Initialize Confluence with the session
            self.confluence = Confluence(
                url=api_url,
                session=session,
                cloud=True,  # OAuth is only for Cloud
                verify_ssl=self.config.ssl_verify,
            )
        elif self.config.auth_type == "pat":
            # Personal Access Token auth (Server/Data Center).
            logger.debug(
                f"Initializing Confluence client with Token (PAT) auth. "
                f"URL: {self.config.url}, "
                f"Token (masked): {mask_sensitive(str(self.config.personal_token))}"
            )
            self.confluence = Confluence(
                url=self.config.url,
                token=self.config.personal_token,
                cloud=self.config.is_cloud,
                verify_ssl=self.config.ssl_verify,
            )
        else:  # basic auth
            logger.debug(
                f"Initializing Confluence client with Basic auth. "
                f"URL: {self.config.url}, Username: {self.config.username}, "
                f"API Token present: {bool(self.config.api_token)}, "
                f"Is Cloud: {self.config.is_cloud}"
            )
            self.confluence = Confluence(
                url=self.config.url,
                username=self.config.username,
                password=self.config.api_token,  # API token is used as password
                cloud=self.config.is_cloud,
                verify_ssl=self.config.ssl_verify,
            )
            logger.debug(
                f"Confluence client initialized. "
                f"Session headers (Authorization masked): "
                f"{get_masked_session_headers(dict(self.confluence._session.headers))}"
            )

        # Configure SSL verification using the shared utility.
        # NOTE: `_session` is a private attribute of atlassian-python-api's
        # client; this relies on that library's internals.
        configure_ssl_verification(
            service_name="Confluence",
            url=self.config.url,
            session=self.confluence._session,
            ssl_verify=self.config.ssl_verify,
        )

        # Proxy configuration: per-scheme proxies applied to the session.
        proxies: dict[str, str] = {}
        if self.config.http_proxy:
            proxies["http"] = self.config.http_proxy
        if self.config.https_proxy:
            proxies["https"] = self.config.https_proxy
        if self.config.socks_proxy:
            proxies["socks"] = self.config.socks_proxy
        if proxies:
            self.confluence._session.proxies.update(proxies)
            for k, v in proxies.items():
                log_config_param(
                    logger, "Confluence", f"{k.upper()}_PROXY", v, sensitive=True
                )
        if self.config.no_proxy and isinstance(self.config.no_proxy, str):
            # NOTE: mutates the process-wide environment, so NO_PROXY affects
            # every HTTP client in this process, not just this session.
            os.environ["NO_PROXY"] = self.config.no_proxy
            log_config_param(logger, "Confluence", "NO_PROXY", self.config.no_proxy)

        # Apply custom headers if configured
        if self.config.custom_headers:
            self._apply_custom_headers()

        # Import here to avoid circular imports
        from ..preprocessing.confluence import ConfluencePreprocessor

        self.preprocessor = ConfluencePreprocessor(base_url=self.config.url)

        # Test authentication during initialization (in debug mode only).
        # A failure is logged but deliberately does not abort construction.
        if logger.isEnabledFor(logging.DEBUG):
            try:
                self._validate_authentication()
            except MCPAtlassianAuthenticationError:
                logger.warning(
                    "Authentication validation failed during client initialization - "
                    "continuing anyway"
                )

    def _validate_authentication(self) -> None:
        """Validate authentication by making a simple API call.

        Raises:
            MCPAtlassianAuthenticationError: If the probe API call fails for
                any reason (the original exception is chained as the cause).
        """
        try:
            logger.debug(
                "Testing Confluence authentication by making a simple API call..."
            )
            # Make a simple API call to test authentication
            spaces = self.confluence.get_all_spaces(start=0, limit=1)
            if spaces is not None:
                logger.info(
                    f"Confluence authentication successful. "
                    f"API call returned {len(spaces.get('results', []))} spaces."
                )
            else:
                logger.warning(
                    "Confluence authentication test returned None - "
                    "this may indicate an issue"
                )
        except Exception as e:
            error_msg = f"Confluence authentication validation failed: {e}"
            logger.error(error_msg)
            logger.debug(
                f"Authentication headers during failure: "
                f"{get_masked_session_headers(dict(self.confluence._session.headers))}"
            )
            raise MCPAtlassianAuthenticationError(error_msg) from e

    def _apply_custom_headers(self) -> None:
        """Apply custom headers to the Confluence session.

        Existing session headers with the same name are overwritten; header
        values are never logged (only the names, at debug level).
        """
        if not self.config.custom_headers:
            return

        logger.debug(
            f"Applying {len(self.config.custom_headers)} custom headers to Confluence session"
        )
        for header_name, header_value in self.config.custom_headers.items():
            self.confluence._session.headers[header_name] = header_value
            logger.debug(f"Applied custom header: {header_name}")

    def _process_html_content(
        self, html_content: str, space_key: str
    ) -> tuple[str, str]:
        """Process HTML content into both HTML and markdown formats.

        Args:
            html_content: Raw HTML content from Confluence
            space_key: The key of the space containing the content

        Returns:
            Tuple of (processed_html, processed_markdown)
        """
        return self.preprocessor.process_html_content(
            html_content, space_key, self.confluence
        )
190 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/config.py:
--------------------------------------------------------------------------------
```python
1 | """Configuration module for Jira API interactions."""
2 |
3 | import logging
4 | import os
5 | from dataclasses import dataclass
6 | from typing import Literal
7 |
8 | from ..utils.env import get_custom_headers, is_env_ssl_verify
9 | from ..utils.oauth import (
10 | BYOAccessTokenOAuthConfig,
11 | OAuthConfig,
12 | get_oauth_config_from_env,
13 | )
14 | from ..utils.urls import is_atlassian_cloud_url
15 |
16 |
@dataclass
class JiraConfig:
    """Jira API configuration.

    Handles authentication for Jira Cloud and Server/Data Center:
    - Cloud: username/API token (basic auth) or OAuth 2.0 (3LO)
    - Server/DC: personal access token or basic auth
    """

    url: str  # Base URL for Jira (may be unset in Multi-Cloud OAuth mode)
    auth_type: Literal["basic", "pat", "oauth"]  # Authentication type
    username: str | None = None  # Email or username (Cloud)
    api_token: str | None = None  # API token (Cloud)
    personal_token: str | None = None  # Personal access token (Server/DC)
    oauth_config: OAuthConfig | BYOAccessTokenOAuthConfig | None = None
    ssl_verify: bool = True  # Whether to verify SSL certificates
    projects_filter: str | None = None  # List of project keys to filter searches
    http_proxy: str | None = None  # HTTP proxy URL
    https_proxy: str | None = None  # HTTPS proxy URL
    no_proxy: str | None = None  # Comma-separated list of hosts to bypass proxy
    socks_proxy: str | None = None  # SOCKS proxy URL (optional)
    custom_headers: dict[str, str] | None = None  # Custom HTTP headers

    @property
    def is_cloud(self) -> bool:
        """Check if this is a cloud instance.

        Returns:
            True if this is a cloud instance (atlassian.net), False otherwise.
            Localhost URLs are always considered non-cloud (Server/Data Center).
        """
        # Multi-Cloud OAuth mode: URL might be None, but we use api.atlassian.com
        if (
            self.auth_type == "oauth"
            and self.oauth_config
            and self.oauth_config.cloud_id
        ):
            # OAuth with cloud_id uses api.atlassian.com which is always Cloud
            return True

        # For other auth types, check the URL
        return is_atlassian_cloud_url(self.url) if self.url else False

    @property
    def verify_ssl(self) -> bool:
        """Compatibility property for old code.

        Returns:
            The ssl_verify value
        """
        return self.ssl_verify

    @classmethod
    def from_env(cls) -> "JiraConfig":
        """Create configuration from environment variables.

        Returns:
            JiraConfig with values from environment variables

        Raises:
            ValueError: If required environment variables are missing or invalid
        """
        url = os.getenv("JIRA_URL")
        # JIRA_URL may be omitted only when Multi-Cloud OAuth mode is enabled.
        # NOTE(review): any non-empty ATLASSIAN_OAUTH_ENABLE value (even
        # "false") suppresses this error - confirm that is intended.
        if not url and not os.getenv("ATLASSIAN_OAUTH_ENABLE"):
            error_msg = "Missing required JIRA_URL environment variable"
            raise ValueError(error_msg)

        # Determine authentication type based on available environment variables
        username = os.getenv("JIRA_USERNAME")
        api_token = os.getenv("JIRA_API_TOKEN")
        personal_token = os.getenv("JIRA_PERSONAL_TOKEN")

        # Check for OAuth configuration
        oauth_config = get_oauth_config_from_env()
        auth_type = None

        # Guard the None/empty URL case (Multi-Cloud OAuth mode) instead of
        # passing None into the URL check - consistent with `is_cloud` above.
        is_cloud = is_atlassian_cloud_url(url) if url else False

        if oauth_config:
            # OAuth is available - could be full config or minimal config for user-provided tokens
            auth_type = "oauth"
        elif is_cloud:
            if username and api_token:
                auth_type = "basic"
            else:
                error_msg = "Cloud authentication requires JIRA_USERNAME and JIRA_API_TOKEN, or OAuth configuration (set ATLASSIAN_OAUTH_ENABLE=true for user-provided tokens)"
                raise ValueError(error_msg)
        else:  # Server/Data Center
            if personal_token:
                auth_type = "pat"
            elif username and api_token:
                # Allow basic auth for Server/DC too
                auth_type = "basic"
            else:
                error_msg = "Server/Data Center authentication requires JIRA_PERSONAL_TOKEN or JIRA_USERNAME and JIRA_API_TOKEN"
                raise ValueError(error_msg)

        # SSL verification (for Server/DC)
        ssl_verify = is_env_ssl_verify("JIRA_SSL_VERIFY")

        # Get the projects filter if provided
        projects_filter = os.getenv("JIRA_PROJECTS_FILTER")

        # Proxy settings: service-specific vars take precedence over globals
        http_proxy = os.getenv("JIRA_HTTP_PROXY", os.getenv("HTTP_PROXY"))
        https_proxy = os.getenv("JIRA_HTTPS_PROXY", os.getenv("HTTPS_PROXY"))
        no_proxy = os.getenv("JIRA_NO_PROXY", os.getenv("NO_PROXY"))
        socks_proxy = os.getenv("JIRA_SOCKS_PROXY", os.getenv("SOCKS_PROXY"))

        # Custom headers - service-specific only
        custom_headers = get_custom_headers("JIRA_CUSTOM_HEADERS")

        return cls(
            url=url,
            auth_type=auth_type,
            username=username,
            api_token=api_token,
            personal_token=personal_token,
            oauth_config=oauth_config,
            ssl_verify=ssl_verify,
            projects_filter=projects_filter,
            http_proxy=http_proxy,
            https_proxy=https_proxy,
            no_proxy=no_proxy,
            socks_proxy=socks_proxy,
            custom_headers=custom_headers,
        )

    def is_auth_configured(self) -> bool:
        """Check if the current authentication configuration is complete and valid for making API calls.

        Returns:
            bool: True if authentication is fully configured, False otherwise.
        """
        logger = logging.getLogger("mcp-atlassian.jira.config")
        if self.auth_type == "oauth":
            # Handle different OAuth configuration types
            if self.oauth_config:
                # Full OAuth configuration (traditional mode)
                if isinstance(self.oauth_config, OAuthConfig):
                    if (
                        self.oauth_config.client_id
                        and self.oauth_config.client_secret
                        and self.oauth_config.redirect_uri
                        and self.oauth_config.scope
                        and self.oauth_config.cloud_id
                    ):
                        return True
                    # Minimal OAuth configuration (user-provided tokens mode)
                    # This is valid if we have oauth_config but missing client credentials
                    # In this case, we expect authentication to come from user-provided headers
                    elif (
                        not self.oauth_config.client_id
                        and not self.oauth_config.client_secret
                    ):
                        logger.debug(
                            "Minimal OAuth config detected - expecting user-provided tokens via headers"
                        )
                        return True
                # Bring Your Own Access Token mode
                elif isinstance(self.oauth_config, BYOAccessTokenOAuthConfig):
                    if self.oauth_config.cloud_id and self.oauth_config.access_token:
                        return True

            # Partial configuration is invalid
            logger.warning("Incomplete OAuth configuration detected")
            return False
        elif self.auth_type == "pat":
            return bool(self.personal_token)
        elif self.auth_type == "basic":
            return bool(self.username and self.api_token)
        logger.warning(
            f"Unknown or unsupported auth_type: {self.auth_type} in JiraConfig"
        )
        return False
193 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/confluence/config.py:
--------------------------------------------------------------------------------
```python
1 | """Configuration module for the Confluence client."""
2 |
3 | import logging
4 | import os
5 | from dataclasses import dataclass
6 | from typing import Literal
7 |
8 | from ..utils.env import get_custom_headers, is_env_ssl_verify
9 | from ..utils.oauth import (
10 | BYOAccessTokenOAuthConfig,
11 | OAuthConfig,
12 | get_oauth_config_from_env,
13 | )
14 | from ..utils.urls import is_atlassian_cloud_url
15 |
16 |
@dataclass
class ConfluenceConfig:
    """Confluence API configuration.

    Handles authentication for Confluence Cloud and Server/Data Center:
    - Cloud: username/API token (basic auth) or OAuth 2.0 (3LO)
    - Server/DC: personal access token or basic auth
    """

    url: str  # Base URL for Confluence (may be unset in Multi-Cloud OAuth mode)
    auth_type: Literal["basic", "pat", "oauth"]  # Authentication type
    username: str | None = None  # Email or username
    api_token: str | None = None  # API token used as password
    personal_token: str | None = None  # Personal access token (Server/DC)
    oauth_config: OAuthConfig | BYOAccessTokenOAuthConfig | None = None
    ssl_verify: bool = True  # Whether to verify SSL certificates
    spaces_filter: str | None = None  # List of space keys to filter searches
    http_proxy: str | None = None  # HTTP proxy URL
    https_proxy: str | None = None  # HTTPS proxy URL
    no_proxy: str | None = None  # Comma-separated list of hosts to bypass proxy
    socks_proxy: str | None = None  # SOCKS proxy URL (optional)
    custom_headers: dict[str, str] | None = None  # Custom HTTP headers

    @property
    def is_cloud(self) -> bool:
        """Check if this is a cloud instance.

        Returns:
            True if this is a cloud instance (atlassian.net), False otherwise.
            Localhost URLs are always considered non-cloud (Server/Data Center).
        """
        # Multi-Cloud OAuth mode: URL might be None, but we use api.atlassian.com
        if (
            self.auth_type == "oauth"
            and self.oauth_config
            and self.oauth_config.cloud_id
        ):
            # OAuth with cloud_id uses api.atlassian.com which is always Cloud
            return True

        # For other auth types, check the URL
        return is_atlassian_cloud_url(self.url) if self.url else False

    @property
    def verify_ssl(self) -> bool:
        """Compatibility property for old code.

        Returns:
            The ssl_verify value
        """
        return self.ssl_verify

    @classmethod
    def from_env(cls) -> "ConfluenceConfig":
        """Create configuration from environment variables.

        Returns:
            ConfluenceConfig with values from environment variables

        Raises:
            ValueError: If any required environment variable is missing
        """
        url = os.getenv("CONFLUENCE_URL")
        # CONFLUENCE_URL may be omitted only in Multi-Cloud OAuth mode.
        # NOTE(review): any non-empty ATLASSIAN_OAUTH_ENABLE value (even
        # "false") suppresses this error - confirm that is intended.
        if not url and not os.getenv("ATLASSIAN_OAUTH_ENABLE"):
            error_msg = "Missing required CONFLUENCE_URL environment variable"
            raise ValueError(error_msg)

        # Determine authentication type based on available environment variables
        username = os.getenv("CONFLUENCE_USERNAME")
        api_token = os.getenv("CONFLUENCE_API_TOKEN")
        personal_token = os.getenv("CONFLUENCE_PERSONAL_TOKEN")

        # Check for OAuth configuration
        oauth_config = get_oauth_config_from_env()
        auth_type = None

        # Guard the None/empty URL case (Multi-Cloud OAuth mode) instead of
        # passing None into the URL check - consistent with `is_cloud` above.
        is_cloud = is_atlassian_cloud_url(url) if url else False

        if oauth_config:
            # OAuth is available - could be full config or minimal config for user-provided tokens
            auth_type = "oauth"
        elif is_cloud:
            if username and api_token:
                auth_type = "basic"
            else:
                error_msg = "Cloud authentication requires CONFLUENCE_USERNAME and CONFLUENCE_API_TOKEN, or OAuth configuration (set ATLASSIAN_OAUTH_ENABLE=true for user-provided tokens)"
                raise ValueError(error_msg)
        else:  # Server/Data Center
            if personal_token:
                auth_type = "pat"
            elif username and api_token:
                # Allow basic auth for Server/DC too
                auth_type = "basic"
            else:
                error_msg = "Server/Data Center authentication requires CONFLUENCE_PERSONAL_TOKEN or CONFLUENCE_USERNAME and CONFLUENCE_API_TOKEN"
                raise ValueError(error_msg)

        # SSL verification (for Server/DC)
        ssl_verify = is_env_ssl_verify("CONFLUENCE_SSL_VERIFY")

        # Get the spaces filter if provided
        spaces_filter = os.getenv("CONFLUENCE_SPACES_FILTER")

        # Proxy settings: service-specific vars take precedence over globals
        http_proxy = os.getenv("CONFLUENCE_HTTP_PROXY", os.getenv("HTTP_PROXY"))
        https_proxy = os.getenv("CONFLUENCE_HTTPS_PROXY", os.getenv("HTTPS_PROXY"))
        no_proxy = os.getenv("CONFLUENCE_NO_PROXY", os.getenv("NO_PROXY"))
        socks_proxy = os.getenv("CONFLUENCE_SOCKS_PROXY", os.getenv("SOCKS_PROXY"))

        # Custom headers - service-specific only
        custom_headers = get_custom_headers("CONFLUENCE_CUSTOM_HEADERS")

        return cls(
            url=url,
            auth_type=auth_type,
            username=username,
            api_token=api_token,
            personal_token=personal_token,
            oauth_config=oauth_config,
            ssl_verify=ssl_verify,
            spaces_filter=spaces_filter,
            http_proxy=http_proxy,
            https_proxy=https_proxy,
            no_proxy=no_proxy,
            socks_proxy=socks_proxy,
            custom_headers=custom_headers,
        )

    def is_auth_configured(self) -> bool:
        """Check if the current authentication configuration is complete and valid for making API calls.

        Returns:
            bool: True if authentication is fully configured, False otherwise.
        """
        logger = logging.getLogger("mcp-atlassian.confluence.config")
        if self.auth_type == "oauth":
            # Handle different OAuth configuration types
            if self.oauth_config:
                # Full OAuth configuration (traditional mode)
                if isinstance(self.oauth_config, OAuthConfig):
                    if (
                        self.oauth_config.client_id
                        and self.oauth_config.client_secret
                        and self.oauth_config.redirect_uri
                        and self.oauth_config.scope
                        and self.oauth_config.cloud_id
                    ):
                        return True
                    # Minimal OAuth configuration (user-provided tokens mode)
                    # This is valid if we have oauth_config but missing client credentials
                    # In this case, we expect authentication to come from user-provided headers
                    elif (
                        not self.oauth_config.client_id
                        and not self.oauth_config.client_secret
                    ):
                        logger.debug(
                            "Minimal OAuth config detected - expecting user-provided tokens via headers"
                        )
                        return True
                # Bring Your Own Access Token mode
                elif isinstance(self.oauth_config, BYOAccessTokenOAuthConfig):
                    if self.oauth_config.cloud_id and self.oauth_config.access_token:
                        return True

            # Partial configuration is invalid
            logger.warning("Incomplete OAuth configuration detected")
            return False
        elif self.auth_type == "pat":
            return bool(self.personal_token)
        elif self.auth_type == "basic":
            return bool(self.username and self.api_token)
        logger.warning(
            f"Unknown or unsupported auth_type: {self.auth_type} in ConfluenceConfig"
        )
        return False
193 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/models/jira/link.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Jira issue link models.
3 |
4 | This module provides Pydantic models for Jira issue links and link types.
5 | """
6 |
7 | import logging
8 | from typing import Any
9 |
10 | from ..base import ApiModel
11 | from ..constants import EMPTY_STRING, JIRA_DEFAULT_ID, UNKNOWN
12 | from .common import JiraIssueType, JiraPriority, JiraStatus
13 |
14 | logger = logging.getLogger(__name__)
15 |
16 |
class JiraIssueLinkType(ApiModel):
    """
    Model representing a Jira issue link type.
    """

    id: str = JIRA_DEFAULT_ID
    name: str = UNKNOWN
    inward: str = EMPTY_STRING
    outward: str = EMPTY_STRING
    self_url: str | None = None

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "JiraIssueLinkType":
        """
        Create a JiraIssueLinkType from a Jira API response.

        Args:
            data: The issue link type data from the Jira API

        Returns:
            A JiraIssueLinkType instance
        """
        if not data:
            return cls()
        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        # Normalize the id to a string, leaving an explicit None untouched.
        raw_id = data.get("id", JIRA_DEFAULT_ID)
        return cls(
            id=str(raw_id) if raw_id is not None else raw_id,
            name=str(data.get("name", UNKNOWN)),
            inward=str(data.get("inward", EMPTY_STRING)),
            outward=str(data.get("outward", EMPTY_STRING)),
            self_url=data.get("self"),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        simplified: dict[str, Any] = {
            "id": self.id,
            "name": self.name,
            "inward": self.inward,
            "outward": self.outward,
        }
        if self.self_url:
            simplified["self"] = self.self_url
        return simplified
73 |
74 |
class JiraLinkedIssueFields(ApiModel):
    """
    Model representing the fields of a linked issue.
    """

    summary: str = EMPTY_STRING
    status: JiraStatus | None = None
    priority: JiraPriority | None = None
    issuetype: JiraIssueType | None = None

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "JiraLinkedIssueFields":
        """
        Create a JiraLinkedIssueFields from a Jira API response.

        Args:
            data: The linked issue fields data from the Jira API

        Returns:
            A JiraLinkedIssueFields instance
        """
        if not data:
            return cls()
        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        def _nested(model: type, key: str) -> Any:
            # Parse a nested payload only when it is present and truthy.
            payload = data.get(key)
            return model.from_api_response(payload) if payload else None

        return cls(
            summary=str(data.get("summary", EMPTY_STRING)),
            status=_nested(JiraStatus, "status"),
            priority=_nested(JiraPriority, "priority"),
            issuetype=_nested(JiraIssueType, "issuetype"),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        simplified: dict[str, Any] = {"summary": self.summary}
        # Include nested models only when they are set.
        for attr in ("status", "priority", "issuetype"):
            value = getattr(self, attr)
            if value:
                simplified[attr] = value.to_simplified_dict()
        return simplified
146 |
147 |
class JiraLinkedIssue(ApiModel):
    """
    Model representing a linked issue in Jira.
    """

    id: str = JIRA_DEFAULT_ID
    key: str = EMPTY_STRING
    self_url: str | None = None
    fields: JiraLinkedIssueFields | None = None

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "JiraLinkedIssue":
        """
        Create a JiraLinkedIssue from a Jira API response.

        Args:
            data: The linked issue data from the Jira API

        Returns:
            A JiraLinkedIssue instance
        """
        if not data:
            return cls()
        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        raw_fields = data.get("fields")
        # Normalize the id to a string, leaving an explicit None untouched.
        raw_id = data.get("id", JIRA_DEFAULT_ID)
        return cls(
            id=str(raw_id) if raw_id is not None else raw_id,
            key=str(data.get("key", EMPTY_STRING)),
            self_url=data.get("self"),
            fields=(
                JiraLinkedIssueFields.from_api_response(raw_fields)
                if raw_fields
                else None
            ),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        simplified: dict[str, Any] = {"id": self.id, "key": self.key}
        if self.self_url:
            simplified["self"] = self.self_url
        if self.fields:
            simplified["fields"] = self.fields.to_simplified_dict()
        return simplified
210 |
211 |
class JiraIssueLink(ApiModel):
    """
    Model representing a link between two Jira issues.
    """

    id: str = JIRA_DEFAULT_ID
    type: JiraIssueLinkType | None = None
    inward_issue: JiraLinkedIssue | None = None
    outward_issue: JiraLinkedIssue | None = None

    @classmethod
    def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "JiraIssueLink":
        """
        Create a JiraIssueLink from a Jira API response.

        Args:
            data: The issue link data from the Jira API

        Returns:
            A JiraIssueLink instance
        """
        if not data:
            return cls()
        if not isinstance(data, dict):
            logger.debug("Received non-dictionary data, returning default instance")
            return cls()

        def _nested(model: type, key: str) -> Any:
            # Parse a nested payload only when it is present and truthy.
            payload = data.get(key)
            return model.from_api_response(payload) if payload else None

        # Normalize the id to a string, leaving an explicit None untouched.
        raw_id = data.get("id", JIRA_DEFAULT_ID)
        return cls(
            id=str(raw_id) if raw_id is not None else raw_id,
            type=_nested(JiraIssueLinkType, "type"),
            inward_issue=_nested(JiraLinkedIssue, "inwardIssue"),
            outward_issue=_nested(JiraLinkedIssue, "outwardIssue"),
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        simplified: dict[str, Any] = {"id": self.id}
        if self.type:
            simplified["type"] = self.type.to_simplified_dict()
        if self.inward_issue:
            simplified["inward_issue"] = self.inward_issue.to_simplified_dict()
        if self.outward_issue:
            simplified["outward_issue"] = self.outward_issue.to_simplified_dict()
        return simplified
286 |
```
--------------------------------------------------------------------------------
/tests/unit/test_exceptions.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the exceptions module.
3 | """
4 |
5 | import pickle
6 |
7 | import pytest
8 |
9 | from src.mcp_atlassian.exceptions import MCPAtlassianAuthenticationError
10 |
11 |
12 | class TestMCPAtlassianAuthenticationError:
13 | """Tests for the MCPAtlassianAuthenticationError exception class."""
14 |
15 | def test_instantiation_without_message(self):
16 | """Test creating exception without a message."""
17 | error = MCPAtlassianAuthenticationError()
18 |
19 | assert isinstance(error, MCPAtlassianAuthenticationError)
20 | assert isinstance(error, Exception)
21 | assert str(error) == ""
22 | assert error.args == ()
23 |
24 | def test_instantiation_with_message(self):
25 | """Test creating exception with a message."""
26 | message = "Authentication failed"
27 | error = MCPAtlassianAuthenticationError(message)
28 |
29 | assert isinstance(error, MCPAtlassianAuthenticationError)
30 | assert isinstance(error, Exception)
31 | assert str(error) == message
32 | assert error.args == (message,)
33 |
34 | def test_instantiation_with_multiple_args(self):
35 | """Test creating exception with multiple arguments."""
36 | message = "Authentication failed"
37 | code = 401
38 | error = MCPAtlassianAuthenticationError(message, code)
39 |
40 | assert isinstance(error, MCPAtlassianAuthenticationError)
41 | assert isinstance(error, Exception)
42 | # When multiple args are present, str() returns tuple representation
43 | assert str(error) == "('Authentication failed', 401)"
44 | assert error.args == (message, code)
45 |
46 | def test_inheritance_hierarchy(self):
47 | """Test that the exception properly inherits from Exception."""
48 | error = MCPAtlassianAuthenticationError("test")
49 |
50 | assert isinstance(error, MCPAtlassianAuthenticationError)
51 | assert isinstance(error, Exception)
52 | assert isinstance(error, BaseException)
53 | assert issubclass(MCPAtlassianAuthenticationError, Exception)
54 | assert issubclass(MCPAtlassianAuthenticationError, BaseException)
55 |
56 | def test_string_representation(self):
57 | """Test string representation of the exception."""
58 | # Empty message
59 | error = MCPAtlassianAuthenticationError()
60 | assert str(error) == ""
61 | assert repr(error) == "MCPAtlassianAuthenticationError()"
62 |
63 | # With message
64 | message = "Invalid credentials provided"
65 | error = MCPAtlassianAuthenticationError(message)
66 | assert str(error) == message
67 | assert repr(error) == f"MCPAtlassianAuthenticationError('{message}')"
68 |
69 | # With multiple args
70 | error = MCPAtlassianAuthenticationError("Auth failed", 403)
71 | assert str(error) == "('Auth failed', 403)"
72 | assert repr(error) == "MCPAtlassianAuthenticationError('Auth failed', 403)"
73 |
74 | def test_exception_raising_and_catching(self):
75 | """Test raising and catching the exception."""
76 | message = "401 Unauthorized"
77 |
78 | with pytest.raises(MCPAtlassianAuthenticationError) as exc_info:
79 | raise MCPAtlassianAuthenticationError(message)
80 |
81 | assert str(exc_info.value) == message
82 | assert exc_info.value.args == (message,)
83 |
84 | def test_exception_catching_as_base_exception(self):
85 | """Test that the exception can be caught as base Exception."""
86 | message = "403 Forbidden"
87 |
88 | with pytest.raises(Exception) as exc_info:
89 | raise MCPAtlassianAuthenticationError(message)
90 |
91 | assert isinstance(exc_info.value, MCPAtlassianAuthenticationError)
92 | assert str(exc_info.value) == message
93 |
94 | def test_exception_chaining_with_cause(self):
95 | """Test exception chaining using 'raise from' syntax."""
96 | original_error = ValueError("Invalid token format")
97 | auth_message = "Authentication failed due to invalid token"
98 |
99 | with pytest.raises(MCPAtlassianAuthenticationError) as exc_info:
100 | try:
101 | raise original_error
102 | except ValueError as e:
103 | raise MCPAtlassianAuthenticationError(auth_message) from e
104 |
105 | assert str(exc_info.value) == auth_message
106 | assert exc_info.value.__cause__ is original_error
107 | # Context is still preserved even with explicit 'raise from'
108 | assert exc_info.value.__context__ is original_error
109 |
110 | def test_exception_chaining_with_context(self):
111 | """Test implicit exception chaining (context preservation)."""
112 | original_error = ConnectionError("Network timeout")
113 | auth_message = "Authentication failed"
114 |
115 | with pytest.raises(MCPAtlassianAuthenticationError) as exc_info:
116 | try:
117 | raise original_error
118 | except ConnectionError:
119 | raise MCPAtlassianAuthenticationError(auth_message) from None
120 |
121 | assert str(exc_info.value) == auth_message
122 | assert exc_info.value.__context__ is original_error
123 | assert exc_info.value.__cause__ is None
124 |
125 | def test_exception_suppressed_context(self):
126 | """Test exception with suppressed context."""
127 | original_error = RuntimeError("Some runtime error")
128 | auth_message = "Authentication failed"
129 |
130 | with pytest.raises(MCPAtlassianAuthenticationError) as exc_info:
131 | try:
132 | raise original_error
133 | except RuntimeError:
134 | error = MCPAtlassianAuthenticationError(auth_message)
135 | error.__suppress_context__ = True
136 | raise error from None
137 |
138 | assert str(exc_info.value) == auth_message
139 | assert exc_info.value.__suppress_context__ is True
140 |
141 | def test_serialization_with_pickle(self):
142 | """Test that the exception can be pickled and unpickled."""
143 | message = "Authentication error for serialization test"
144 | original_error = MCPAtlassianAuthenticationError(message)
145 |
146 | # Serialize
147 | pickled_data = pickle.dumps(original_error)
148 |
149 | # Deserialize
150 | unpickled_error = pickle.loads(pickled_data)
151 |
152 | assert isinstance(unpickled_error, MCPAtlassianAuthenticationError)
153 | assert str(unpickled_error) == message
154 | assert unpickled_error.args == original_error.args
155 |
156 | def test_exception_attributes_access(self):
157 | """Test accessing exception attributes."""
158 | message = "Test message"
159 | error = MCPAtlassianAuthenticationError(message)
160 |
161 | # Test standard exception attributes
162 | assert hasattr(error, "args")
163 | assert hasattr(error, "__traceback__")
164 | assert hasattr(error, "__cause__")
165 | assert hasattr(error, "__context__")
166 | assert hasattr(error, "__suppress_context__")
167 |
168 | # Test docstring access
169 | expected_doc = "Raised when Atlassian API authentication fails (401/403)."
170 | assert error.__doc__ == expected_doc
171 |
172 | def test_exception_equality(self):
173 | """Test exception equality comparison."""
174 | message = "Same message"
175 | error1 = MCPAtlassianAuthenticationError(message)
176 | error2 = MCPAtlassianAuthenticationError(message)
177 | error3 = MCPAtlassianAuthenticationError("Different message")
178 |
179 | # Exceptions with same args should have same args but different identity
180 | assert error1.args == error2.args
181 | assert error1 is not error2
182 | assert error1.args != error3.args
183 |
184 | def test_realistic_authentication_scenarios(self):
185 | """Test realistic authentication error scenarios."""
186 | # 401 Unauthorized
187 | msg_401 = "401 Unauthorized: Invalid API token"
188 | error_401 = MCPAtlassianAuthenticationError(msg_401)
189 | assert "401" in str(error_401)
190 | assert "Invalid API token" in str(error_401)
191 |
192 | # 403 Forbidden
193 | msg_403 = "403 Forbidden: Insufficient permissions"
194 | error_403 = MCPAtlassianAuthenticationError(msg_403)
195 | assert "403" in str(error_403)
196 | assert "Insufficient permissions" in str(error_403)
197 |
198 | # OAuth token expired
199 | oauth_error = MCPAtlassianAuthenticationError("OAuth token has expired")
200 | assert "OAuth" in str(oauth_error)
201 | assert "expired" in str(oauth_error)
202 |
```
--------------------------------------------------------------------------------
/tests/unit/utils/test_env.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for environment variable utility functions."""
2 |
3 | from mcp_atlassian.utils.env import (
4 | is_env_extended_truthy,
5 | is_env_ssl_verify,
6 | is_env_truthy,
7 | )
8 |
9 |
class TestIsEnvTruthy:
    """Tests for the is_env_truthy helper."""

    def test_standard_truthy_values(self, monkeypatch):
        """'true', '1' and 'yes' are truthy regardless of letter case."""
        for base in ("true", "1", "yes"):
            for variant in (base, base.upper(), base.capitalize()):
                monkeypatch.setenv("TEST_VAR", variant)
                assert is_env_truthy("TEST_VAR") is True

    def test_standard_falsy_values(self, monkeypatch):
        """Anything outside the standard truthy set is falsy."""
        for candidate in ("false", "0", "no", "", "invalid", "y", "on"):
            monkeypatch.setenv("TEST_VAR", candidate)
            assert is_env_truthy("TEST_VAR") is False

    def test_unset_variable_with_default(self, monkeypatch):
        """An unset variable is evaluated against the supplied default."""
        monkeypatch.delenv("TEST_VAR", raising=False)

        # No default supplied -> empty string -> falsy.
        assert is_env_truthy("TEST_VAR") is False

        for default in ("true", "1", "yes"):
            assert is_env_truthy("TEST_VAR", default) is True
        for default in ("false", "0"):
            assert is_env_truthy("TEST_VAR", default) is False

    def test_empty_string_environment_variable(self, monkeypatch):
        """An explicitly empty value is falsy."""
        monkeypatch.setenv("TEST_VAR", "")
        assert is_env_truthy("TEST_VAR") is False
59 |
60 |
class TestIsEnvExtendedTruthy:
    """Tests for the is_env_extended_truthy helper."""

    def test_extended_truthy_values(self, monkeypatch):
        """The extended set adds 'y' and 'on', matched case-insensitively."""
        for base in ("true", "1", "yes", "y", "on"):
            for variant in (base, base.upper(), base.capitalize()):
                monkeypatch.setenv("TEST_VAR", variant)
                assert is_env_extended_truthy("TEST_VAR") is True

    def test_extended_falsy_values(self, monkeypatch):
        """Values outside the extended truthy set are falsy."""
        for candidate in ("false", "0", "no", "", "invalid", "off"):
            monkeypatch.setenv("TEST_VAR", candidate)
            assert is_env_extended_truthy("TEST_VAR") is False

    def test_extended_vs_standard_difference(self, monkeypatch):
        """'y' and 'on' are truthy only for the extended variant."""
        for candidate in ("y", "on"):
            monkeypatch.setenv("TEST_VAR", candidate)
            assert is_env_extended_truthy("TEST_VAR") is True
            assert is_env_truthy("TEST_VAR") is False

    def test_unset_variable_with_default(self, monkeypatch):
        """An unset variable is evaluated against the supplied default."""
        monkeypatch.delenv("TEST_VAR", raising=False)

        # No default supplied -> empty string -> falsy.
        assert is_env_extended_truthy("TEST_VAR") is False

        for default in ("true", "y", "on"):
            assert is_env_extended_truthy("TEST_VAR", default) is True
        assert is_env_extended_truthy("TEST_VAR", "false") is False
115 |
116 |
class TestIsEnvSslVerify:
    """Tests for the is_env_ssl_verify helper."""

    def test_ssl_verify_default_true(self, monkeypatch):
        """SSL verification defaults to enabled when the variable is unset."""
        monkeypatch.delenv("TEST_VAR", raising=False)
        assert is_env_ssl_verify("TEST_VAR") is True

    def test_ssl_verify_explicit_false_values(self, monkeypatch):
        """Only explicit false values disable verification, in any case."""
        for base in ("false", "0", "no"):
            for variant in (base, base.upper(), base.capitalize()):
                monkeypatch.setenv("TEST_VAR", variant)
                assert is_env_ssl_verify("TEST_VAR") is False

    def test_ssl_verify_truthy_and_other_values(self, monkeypatch):
        """Any value outside the false set keeps verification on."""
        candidates = (
            "true",
            "1",
            "yes",
            "y",
            "on",
            "enable",
            "enabled",
            "anything",
        )
        for candidate in candidates:
            monkeypatch.setenv("TEST_VAR", candidate)
            assert is_env_ssl_verify("TEST_VAR") is True

    def test_ssl_verify_custom_default(self, monkeypatch):
        """Unset variables fall back to the caller-provided default."""
        monkeypatch.delenv("TEST_VAR", raising=False)

        assert is_env_ssl_verify("TEST_VAR", "true") is True
        assert is_env_ssl_verify("TEST_VAR", "false") is False
        # A non-false default also keeps verification on.
        assert is_env_ssl_verify("TEST_VAR", "anything") is True

    def test_ssl_verify_empty_string(self, monkeypatch):
        """An empty value is not an explicit false, so verification stays on."""
        monkeypatch.setenv("TEST_VAR", "")
        assert is_env_ssl_verify("TEST_VAR") is True
169 |
170 |
class TestEdgeCases:
    """Test edge cases and special scenarios shared by all three env helpers."""

    def test_whitespace_handling(self, monkeypatch):
        """Values are matched exactly; surrounding whitespace is not stripped."""
        monkeypatch.setenv("TEST_VAR", " true ")
        assert is_env_truthy("TEST_VAR") is False
        assert is_env_extended_truthy("TEST_VAR") is False

        monkeypatch.setenv("TEST_VAR", " false ")
        assert is_env_ssl_verify("TEST_VAR") is True  # Not an exact false value

    def test_special_characters(self, monkeypatch):
        """Values containing extra punctuation never match the truthy sets."""
        for value in ("true!", "@yes", "1.0", "y,", "on;"):
            monkeypatch.setenv("TEST_VAR", value)
            assert is_env_truthy("TEST_VAR") is False
            assert is_env_extended_truthy("TEST_VAR") is False
            assert is_env_ssl_verify("TEST_VAR") is True  # Not in false values

    def test_unicode_values(self, monkeypatch):
        """Unicode look-alikes do not match the ASCII truthy sets."""
        for value in ("truë", "yés", "1️⃣"):
            monkeypatch.setenv("TEST_VAR", value)
            assert is_env_truthy("TEST_VAR") is False
            assert is_env_extended_truthy("TEST_VAR") is False
            assert is_env_ssl_verify("TEST_VAR") is True  # Not in false values

    def test_numeric_string_edge_cases(self, monkeypatch):
        """Only the exact string '1' is truthy; other numeric strings are not."""
        # NOTE: the original version special-cased "01" with an if/else whose
        # two branches asserted exactly the same thing; the dead branch has
        # been collapsed into a single loop with identical assertions.
        for value in ("01", "1.0", "10", "-1", "2"):
            monkeypatch.setenv("TEST_VAR", value)
            assert is_env_truthy("TEST_VAR") is False
            assert is_env_extended_truthy("TEST_VAR") is False
            assert is_env_ssl_verify("TEST_VAR") is True  # Not in false values
218 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/jira/worklog.py:
--------------------------------------------------------------------------------
```python
1 | """Module for Jira worklog operations."""
2 |
3 | import logging
4 | import re
5 | from typing import Any
6 |
7 | from ..models import JiraWorklog
8 | from ..utils import parse_date
9 | from .client import JiraClient
10 |
11 | logger = logging.getLogger("mcp-jira")
12 |
13 |
class WorklogMixin(JiraClient):
    """Mixin for Jira worklog operations."""

    def _parse_time_spent(self, time_spent: str) -> int:
        """
        Parse a time-spent string into seconds.

        Args:
            time_spent: Time spent string (e.g. '1h 30m', '1d', '90s')

        Returns:
            Time spent in seconds. Falls back to 60 seconds only when the
            string contains no recognizable time component at all.
        """
        # Base case for direct specification in seconds, e.g. "90s"
        if time_spent.endswith("s"):
            try:
                return int(time_spent[:-1])
            except ValueError:
                pass

        time_units = {
            "w": 7 * 24 * 60 * 60,  # weeks to seconds
            "d": 24 * 60 * 60,  # days to seconds
            "h": 60 * 60,  # hours to seconds
            "m": 60,  # minutes to seconds
        }

        # Find time components like 1w, 2d, 3h, 4m anywhere in the string
        matches = re.findall(r"(\d+)([wdhm])", time_spent)

        if not matches:
            # Nothing parseable as a unit expression: try the raw value as a
            # bare number (float first so "90.0" also works).
            try:
                return int(float(time_spent))
            except ValueError:
                # If all else fails, default to 60 seconds (1 minute)
                logger.warning(
                    f"Could not parse time: {time_spent}, defaulting to 60 seconds"
                )
                return 60

        # BUGFIX: previously the fallback triggered whenever the total was 0,
        # so an explicit zero like "0m" was silently turned into 60 seconds.
        # Branching on `matches` preserves an intentional zero.
        return sum(int(value) * time_units[unit] for value, unit in matches)

    def add_worklog(
        self,
        issue_key: str,
        time_spent: str,
        comment: str | None = None,
        started: str | None = None,
        original_estimate: str | None = None,
        remaining_estimate: str | None = None,
    ) -> dict[str, Any]:
        """
        Add a worklog entry to a Jira issue.

        Args:
            issue_key: The issue key (e.g. 'PROJ-123')
            time_spent: Time spent (e.g. '1h 30m', '3h', '1d')
            comment: Optional comment for the worklog
            started: Optional ISO8601 date time string for when work began
            original_estimate: Optional new value for the original estimate
            remaining_estimate: Optional new value for the remaining estimate

        Returns:
            Response data if successful

        Raises:
            Exception: If there's an error adding the worklog
        """
        try:
            # Convert time_spent string to seconds
            time_spent_seconds = self._parse_time_spent(time_spent)

            # Convert Markdown comment to Jira format if provided
            if comment:
                # Check if _markdown_to_jira is available (from CommentsMixin)
                if hasattr(self, "_markdown_to_jira"):
                    comment = self._markdown_to_jira(comment)

            # Step 1: Update original estimate if provided (separate API call)
            original_estimate_updated = False
            if original_estimate:
                try:
                    fields = {"timetracking": {"originalEstimate": original_estimate}}
                    self.jira.edit_issue(issue_id_or_key=issue_key, fields=fields)
                    original_estimate_updated = True
                    logger.info(f"Updated original estimate for issue {issue_key}")
                except Exception as e:  # noqa: BLE001 - Intentional fallback with logging
                    logger.error(
                        f"Failed to update original estimate for issue {issue_key}: "
                        f"{str(e)}"
                    )
                    # Continue with worklog creation even if estimate update fails

            # Step 2: Prepare worklog data
            worklog_data: dict[str, Any] = {"timeSpentSeconds": time_spent_seconds}
            if comment:
                worklog_data["comment"] = comment
            if started:
                worklog_data["started"] = started

            # Step 3: Prepare query parameters for remaining estimate
            params: dict[str, Any] = {}
            remaining_estimate_updated = False
            if remaining_estimate:
                params["adjustEstimate"] = "new"
                params["newEstimate"] = remaining_estimate
                remaining_estimate_updated = True

            # Step 4: Add the worklog with remaining estimate adjustment
            base_url = self.jira.resource_url("issue")
            url = f"{base_url}/{issue_key}/worklog"

            result = self.jira.post(url, data=worklog_data, params=params)
            if not isinstance(result, dict):
                msg = f"Unexpected return value type from `jira.post`: {type(result)}"
                logger.error(msg)
                raise TypeError(msg)

            # Format and return the result
            return {
                "id": result.get("id"),
                "comment": self._clean_text(result.get("comment", "")),
                "created": str(parse_date(result.get("created", ""))),
                "updated": str(parse_date(result.get("updated", ""))),
                "started": str(parse_date(result.get("started", ""))),
                "timeSpent": result.get("timeSpent", ""),
                "timeSpentSeconds": result.get("timeSpentSeconds", 0),
                "author": result.get("author", {}).get("displayName", "Unknown"),
                "original_estimate_updated": original_estimate_updated,
                "remaining_estimate_updated": remaining_estimate_updated,
            }
        except Exception as e:
            logger.error(f"Error adding worklog to issue {issue_key}: {str(e)}")
            raise Exception(f"Error adding worklog: {str(e)}") from e

    def get_worklog(self, issue_key: str) -> dict[str, Any]:
        """
        Get the worklog data for an issue.

        Args:
            issue_key: The issue key (e.g. 'PROJ-123')

        Returns:
            Raw worklog data from the API; an empty worklog list on error
            (best-effort, errors are logged rather than raised).
        """
        try:
            return self.jira.worklog(issue_key)  # type: ignore[attr-defined]
        except Exception as e:
            logger.warning(f"Error getting worklog for {issue_key}: {e}")
            return {"worklogs": []}

    def get_worklog_models(self, issue_key: str) -> list[JiraWorklog]:
        """
        Get all worklog entries for an issue as JiraWorklog models.

        Args:
            issue_key: The issue key (e.g. 'PROJ-123')

        Returns:
            List of JiraWorklog models (empty if the issue has no worklogs)
        """
        worklog_data = self.get_worklog(issue_key)
        result: list[JiraWorklog] = []

        if "worklogs" in worklog_data and worklog_data["worklogs"]:
            for log_data in worklog_data["worklogs"]:
                worklog = JiraWorklog.from_api_response(log_data)
                result.append(worklog)

        return result

    def get_worklogs(self, issue_key: str) -> list[dict[str, Any]]:
        """
        Get all worklog entries for an issue.

        Args:
            issue_key: The issue key (e.g. 'PROJ-123')

        Returns:
            List of simplified worklog entry dicts

        Raises:
            Exception: If there's an error getting the worklogs
        """
        try:
            result = self.jira.issue_get_worklog(issue_key)
            if not isinstance(result, dict):
                msg = f"Unexpected return value type from `jira.issue_get_worklog`: {type(result)}"
                logger.error(msg)
                raise TypeError(msg)

            # Flatten each raw API worklog into a stable, simplified shape
            worklogs = []
            for worklog in result.get("worklogs", []):
                worklogs.append(
                    {
                        "id": worklog.get("id"),
                        "comment": self._clean_text(worklog.get("comment", "")),
                        "created": str(parse_date(worklog.get("created", ""))),
                        "updated": str(parse_date(worklog.get("updated", ""))),
                        "started": str(parse_date(worklog.get("started", ""))),
                        "timeSpent": worklog.get("timeSpent", ""),
                        "timeSpentSeconds": worklog.get("timeSpentSeconds", 0),
                        "author": worklog.get("author", {}).get(
                            "displayName", "Unknown"
                        ),
                    }
                )

            return worklogs
        except Exception as e:
            logger.error(f"Error getting worklogs for issue {issue_key}: {str(e)}")
            raise Exception(f"Error getting worklogs: {str(e)}") from e
235 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/models/confluence/page.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Confluence page models.
3 | This module provides Pydantic models for Confluence pages and their versions.
4 | """
5 |
6 | import logging
7 | import warnings
8 | from typing import Any
9 |
10 | from pydantic import Field
11 |
12 | from ..base import ApiModel, TimestampMixin
13 | from ..constants import (
14 | CONFLUENCE_DEFAULT_ID,
15 | EMPTY_STRING,
16 | )
17 |
18 | # Import other necessary models using relative imports
19 | from .common import ConfluenceAttachment, ConfluenceUser
20 | from .space import ConfluenceSpace
21 |
22 | logger = logging.getLogger(__name__)
23 |
24 |
class ConfluenceVersion(ApiModel, TimestampMixin):
    """
    Model representing a Confluence page version.
    """

    number: int = 0
    when: str = EMPTY_STRING
    message: str | None = None
    by: ConfluenceUser | None = None

    @classmethod
    def from_api_response(
        cls, data: dict[str, Any], **kwargs: Any
    ) -> "ConfluenceVersion":
        """
        Build a ConfluenceVersion from raw Confluence API data.

        Args:
            data: The version payload returned by the Confluence API

        Returns:
            A populated ConfluenceVersion, or a default instance when the
            payload is empty
        """
        if not data:
            return cls()

        author_data = data.get("by")
        author = ConfluenceUser.from_api_response(author_data) if author_data else None

        return cls(
            number=data.get("number", 0),
            when=data.get("when", EMPTY_STRING),
            message=data.get("message"),
            by=author,
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Return a compact dictionary view of this version."""
        simplified: dict[str, Any] = {
            "number": self.number,
            "when": self.format_timestamp(self.when),
        }

        # Optional fields are included only when present.
        if self.message:
            simplified["message"] = self.message
        if self.by:
            simplified["by"] = self.by.display_name

        return simplified
73 |
74 |
class ConfluencePage(ApiModel, TimestampMixin):
    """
    Model representing a Confluence page.

    This model includes the content, metadata, and version information
    for a Confluence page.
    """

    # Core identity fields
    id: str = CONFLUENCE_DEFAULT_ID
    title: str = EMPTY_STRING
    type: str = "page"  # "page", "blogpost", etc.
    status: str = "current"
    space: ConfluenceSpace | None = None
    # Body content plus the representation it was fetched in
    content: str = EMPTY_STRING
    content_format: str = "view"  # "view", "storage", etc.
    created: str = EMPTY_STRING
    updated: str = EMPTY_STRING
    author: ConfluenceUser | None = None
    version: ConfluenceVersion | None = None
    # Ancestors/children are kept as the raw API dict structures
    ancestors: list[dict[str, Any]] = Field(default_factory=list)
    children: dict[str, Any] = Field(default_factory=dict)
    attachments: list[ConfluenceAttachment] = Field(default_factory=list)
    url: str | None = None

    @property
    def page_content(self) -> str:
        """
        Alias for content to maintain compatibility with tests.

        Deprecated: Use content instead.
        """
        warnings.warn(
            "The 'page_content' property is deprecated. Use 'content' instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self.content

    @classmethod
    def from_api_response(cls, data: dict[str, Any], **kwargs: Any) -> "ConfluencePage":
        """
        Create a ConfluencePage from a Confluence API response.

        Args:
            data: The page data from the Confluence API
            **kwargs: Additional keyword arguments
                base_url: Base URL for constructing page URLs
                include_body: Whether to include body content
                content_override: Override the content value
                content_format: Override the content format
                is_cloud: Whether this is a cloud instance (affects URL format)

        Returns:
            A ConfluencePage instance (a default instance when data is empty)
        """
        if not data:
            return cls()

        # Extract space information first to ensure it's available for URL construction
        space_data = data.get("space", {})
        if not space_data:
            # Try to extract space info from _expandable if available
            if expandable := data.get("_expandable", {}):
                if space_path := expandable.get("space"):
                    # Extract space key from REST API path
                    if space_path.startswith("/rest/api/space/"):
                        space_key = space_path.split("/rest/api/space/")[1]
                        # Synthesize a minimal space record; only the key is real
                        space_data = {"key": space_key, "name": f"Space {space_key}"}

        # Create space model
        space = ConfluenceSpace.from_api_response(space_data)

        # Extract content based on format or use override if provided
        content = EMPTY_STRING
        content_format = kwargs.get("content_format", "view")
        include_body = kwargs.get("include_body", True)

        # Allow content override to be provided directly
        if content_override := kwargs.get("content_override"):
            content = content_override
        elif include_body and "body" in data:
            body = data.get("body", {})
            if content_format in body:
                content = body.get(content_format, {}).get("value", EMPTY_STRING)

        # Adjust content_format if convert_to_markdown is False and content is processed HTML
        convert_to_markdown = kwargs.get("convert_to_markdown", True)
        if not convert_to_markdown:
            content_format = "html"

        # Process author/creator
        author = None
        if author_data := data.get("author"):
            author = ConfluenceUser.from_api_response(author_data)

        # Process version
        version = None
        if version_data := data.get("version"):
            version = ConfluenceVersion.from_api_response(version_data)

        # Process attachments (nested under children.attachment.results)
        attachments = []
        if (
            attachments_data := data.get("children", {})
            .get("attachment", {})
            .get("results", [])
        ):
            attachments = [
                ConfluenceAttachment.from_api_response(attachment)
                for attachment in attachments_data
            ]

        # Process metadata timestamps
        created = EMPTY_STRING
        updated = EMPTY_STRING

        if history := data.get("history"):
            created = history.get("createdDate", EMPTY_STRING)
            updated = history.get("lastUpdated", {}).get("when", EMPTY_STRING)

        # Fall back to version date if no history is available
        if not updated and version and version.when:
            updated = version.when

        # Construct URL if base_url is provided
        url = None
        if base_url := kwargs.get("base_url"):
            page_id = data.get("id")

            # Use different URL format based on whether it's cloud or server
            is_cloud = kwargs.get("is_cloud", False)
            if is_cloud:
                # Cloud format: {base_url}/spaces/{space_key}/pages/{page_id}
                space_key = space.key if space and space.key else "unknown"
                url = f"{base_url}/spaces/{space_key}/pages/{page_id}"
            else:
                # Server format: {base_url}/pages/viewpage.action?pageId={page_id}
                url = f"{base_url}/pages/viewpage.action?pageId={page_id}"

        return cls(
            id=str(data.get("id", CONFLUENCE_DEFAULT_ID)),
            title=data.get("title", EMPTY_STRING),
            type=data.get("type", "page"),
            status=data.get("status", "current"),
            space=space,
            content=content,
            content_format=content_format,
            created=created,
            updated=updated,
            author=author,
            version=version,
            ancestors=data.get("ancestors", []),
            children=data.get("children", {}),
            attachments=attachments,
            url=url,
        )

    def to_simplified_dict(self) -> dict[str, Any]:
        """Convert to simplified dictionary for API response."""
        result: dict[str, Any] = {
            "id": self.id,
            "title": self.title,
            "type": self.type,
            "created": self.format_timestamp(self.created),
            "updated": self.format_timestamp(self.updated),
            "url": self.url,
        }

        # Add space information if available
        if self.space:
            result["space"] = {"key": self.space.key, "name": self.space.name}

        # Add author information if available
        if self.author:
            result["author"] = self.author.display_name

        # Add version information if available
        if self.version:
            result["version"] = self.version.number

        # Add attachments if available (always present as a list, possibly empty)
        result["attachments"] = [
            attachment.to_simplified_dict() for attachment in self.attachments
        ]

        # Add content if it's not empty
        if self.content and self.content_format:
            result["content"] = {"value": self.content, "format": self.content_format}

        # Add ancestors if there are any (ancestors lacking an "id" are skipped)
        if self.ancestors:
            result["ancestors"] = [
                {"id": a.get("id"), "title": a.get("title")}
                for a in self.ancestors
                if "id" in a
            ]

        return result
```
--------------------------------------------------------------------------------
/tests/unit/confluence/test_client.py:
--------------------------------------------------------------------------------
```python
1 | """Unit tests for the ConfluenceClient class."""
2 |
3 | import os
4 | from unittest.mock import MagicMock, patch
5 |
6 | from mcp_atlassian.confluence import ConfluenceFetcher
7 | from mcp_atlassian.confluence.client import ConfluenceClient
8 | from mcp_atlassian.confluence.config import ConfluenceConfig
9 |
10 |
def test_init_with_basic_auth():
    """Basic-auth config wires up the Confluence SDK, preprocessor and SSL."""
    config = ConfluenceConfig(
        url="https://test.atlassian.net/wiki",
        auth_type="basic",
        username="test_user",
        api_token="test_token",
    )

    # Patch the SDK client, the preprocessor, and SSL configuration.
    with (
        patch("mcp_atlassian.confluence.client.Confluence") as mock_confluence,
        patch(
            "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"
        ) as mock_preprocessor,
        patch(
            "mcp_atlassian.confluence.client.configure_ssl_verification"
        ) as mock_configure_ssl,
    ):
        client = ConfluenceClient(config=config)

        # The SDK client must be constructed from the basic-auth config.
        mock_confluence.assert_called_once_with(
            url="https://test.atlassian.net/wiki",
            username="test_user",
            password="test_token",
            cloud=True,
            verify_ssl=True,
        )
        assert client.config == config
        assert client.confluence == mock_confluence.return_value
        assert client.preprocessor == mock_preprocessor.return_value

        # SSL verification must be configured against the SDK session.
        mock_configure_ssl.assert_called_once_with(
            service_name="Confluence",
            url="https://test.atlassian.net/wiki",
            session=mock_confluence.return_value._session,
            ssl_verify=True,
        )
53 |
54 |
def test_init_with_token_auth():
    """PAT config builds a server (non-cloud) client with SSL verify off."""
    config = ConfluenceConfig(
        url="https://confluence.example.com",
        auth_type="pat",
        personal_token="test_personal_token",
        ssl_verify=False,
    )

    # Patch the SDK client, the preprocessor, and SSL configuration.
    with (
        patch("mcp_atlassian.confluence.client.Confluence") as mock_confluence,
        patch(
            "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"
        ) as mock_preprocessor,
        patch(
            "mcp_atlassian.confluence.client.configure_ssl_verification"
        ) as mock_configure_ssl,
    ):
        client = ConfluenceClient(config=config)

        # The SDK client must be constructed with token auth for server mode.
        mock_confluence.assert_called_once_with(
            url="https://confluence.example.com",
            token="test_personal_token",
            cloud=False,
            verify_ssl=False,
        )
        assert client.config == config
        assert client.confluence == mock_confluence.return_value
        assert client.preprocessor == mock_preprocessor.return_value

        # SSL configuration must propagate the disabled verification flag.
        mock_configure_ssl.assert_called_once_with(
            service_name="Confluence",
            url="https://confluence.example.com",
            session=mock_confluence.return_value._session,
            ssl_verify=False,
        )
96 |
97 |
def test_init_from_env():
    """Test initializing the client from environment variables."""
    with (
        patch(
            "mcp_atlassian.confluence.config.ConfluenceConfig.from_env"
        ) as from_env_mock,
        patch("mcp_atlassian.confluence.client.Confluence"),
        patch("mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"),
        patch("mcp_atlassian.confluence.client.configure_ssl_verification"),
    ):
        env_config = MagicMock()
        from_env_mock.return_value = env_config

        # Constructing without an explicit config must fall back to from_env()
        client = ConfluenceClient()

        from_env_mock.assert_called_once()
        assert client.config == env_config
118 |
119 |
def test_process_html_content():
    """Test the _process_html_content method."""
    with (
        patch("mcp_atlassian.confluence.client.ConfluenceConfig.from_env"),
        patch("mcp_atlassian.confluence.client.Confluence"),
        patch(
            "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"
        ) as preprocessor_cls,
        patch("mcp_atlassian.confluence.client.configure_ssl_verification"),
    ):
        # The preprocessor returns an (html, markdown) pair
        processor = preprocessor_cls.return_value
        processor.process_html_content.return_value = (
            "<p>HTML</p>",
            "Markdown",
        )

        client = ConfluenceClient()

        # Exercise the delegation wrapper
        html, markdown = client._process_html_content("<p>Test</p>", "TEST")

        # The wrapper must forward the content, space key, and API client
        processor.process_html_content.assert_called_once_with(
            "<p>Test</p>", "TEST", client.confluence
        )
        assert html == "<p>HTML</p>"
        assert markdown == "Markdown"
148 |
149 |
def test_get_user_details_by_accountid():
    """Test the get_user_details_by_accountid method."""
    with (
        patch("mcp_atlassian.confluence.client.ConfluenceConfig.from_env"),
        patch("mcp_atlassian.confluence.client.Confluence") as confluence_cls,
        patch("mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor"),
        patch("mcp_atlassian.confluence.client.configure_ssl_verification"),
    ):
        api = confluence_cls.return_value
        api.get_user_details_by_accountid.return_value = {
            "displayName": "Test User",
            "accountId": "123456",
            "emailAddress": "[email protected]",
            "active": True,
        }

        fetcher = ConfluenceFetcher()

        # First call: no expand parameter
        user_details = fetcher.get_user_details_by_accountid("123456")

        # The underlying API must be called positionally with (id, None)
        api.get_user_details_by_accountid.assert_called_once_with(
            "123456", None
        )
        assert user_details["displayName"] == "Test User"
        assert user_details["accountId"] == "123456"
        assert user_details["emailAddress"] == "[email protected]"
        assert user_details["active"] is True

        # Second call: the expand parameter must be forwarded verbatim
        api.get_user_details_by_accountid.reset_mock()
        api.get_user_details_by_accountid.return_value = {
            "displayName": "Test User",
            "accountId": "123456",
            "status": "active",
        }

        user_details = fetcher.get_user_details_by_accountid("123456", expand="status")

        api.get_user_details_by_accountid.assert_called_once_with(
            "123456", "status"
        )
        assert user_details["status"] == "active"
195 |
196 |
def test_init_sets_proxies_and_no_proxy(monkeypatch):
    """Test that ConfluenceClient sets session proxies and NO_PROXY env var from config.

    The client is constructed purely for its side effects on the mocked
    session and the process environment.
    """
    # Patch Confluence and its _session
    mock_confluence = MagicMock()
    mock_session = MagicMock()
    mock_session.proxies = {}  # Use a real dict for proxies
    mock_confluence._session = mock_session
    monkeypatch.setattr(
        "mcp_atlassian.confluence.client.Confluence", lambda **kwargs: mock_confluence
    )
    monkeypatch.setattr(
        "mcp_atlassian.confluence.client.configure_ssl_verification",
        lambda **kwargs: None,
    )
    monkeypatch.setattr(
        "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor",
        lambda **kwargs: MagicMock(),
    )

    # Patch environment so the NO_PROXY assertion starts from a known state
    monkeypatch.setenv("NO_PROXY", "")

    config = ConfluenceConfig(
        url="https://test.atlassian.net/wiki",
        auth_type="basic",
        username="user",
        api_token="token",
        http_proxy="http://proxy:8080",
        https_proxy="https://proxy:8443",
        socks_proxy="socks5://user:pass@proxy:1080",
        no_proxy="localhost,127.0.0.1",
    )
    # Fix: drop the unused `client` local (ruff F841); construction alone
    # triggers the proxy configuration under test.
    ConfluenceClient(config=config)
    assert mock_session.proxies["http"] == "http://proxy:8080"
    assert mock_session.proxies["https"] == "https://proxy:8443"
    assert mock_session.proxies["socks"] == "socks5://user:pass@proxy:1080"
    assert os.environ["NO_PROXY"] == "localhost,127.0.0.1"
234 |
235 |
def test_init_no_proxies(monkeypatch):
    """Test that ConfluenceClient does not set proxies if not configured."""
    # Patch Confluence and its _session
    mock_confluence = MagicMock()
    mock_session = MagicMock()
    mock_session.proxies = {}  # Use a real dict for proxies
    mock_confluence._session = mock_session
    monkeypatch.setattr(
        "mcp_atlassian.confluence.client.Confluence", lambda **kwargs: mock_confluence
    )
    monkeypatch.setattr(
        "mcp_atlassian.confluence.client.configure_ssl_verification",
        lambda **kwargs: None,
    )
    monkeypatch.setattr(
        "mcp_atlassian.preprocessing.confluence.ConfluencePreprocessor",
        lambda **kwargs: MagicMock(),
    )

    config = ConfluenceConfig(
        url="https://test.atlassian.net/wiki",
        auth_type="basic",
        username="user",
        api_token="token",
    )
    # Fix: drop the unused `client` local (ruff F841); constructing the
    # client is enough to exercise the proxy-setup code path.
    ConfluenceClient(config=config)
    # No proxy settings in the config -> the session's proxies stay untouched.
    assert mock_session.proxies == {}
263 |
```
--------------------------------------------------------------------------------
/src/mcp_atlassian/preprocessing/base.py:
--------------------------------------------------------------------------------
```python
1 | """Base preprocessing module."""
2 |
3 | import logging
4 | import re
5 | import warnings
6 | from typing import Any, Protocol
7 |
8 | from bs4 import BeautifulSoup, Tag
9 | from markdownify import markdownify as md
10 |
11 | logger = logging.getLogger("mcp-atlassian")
12 |
13 |
class ConfluenceClient(Protocol):
    """Structural protocol for the minimal Confluence client surface.

    The preprocessor only needs user-lookup capability, so any object that
    provides these two methods can be passed in for mention resolution.
    """

    def get_user_details_by_accountid(self, account_id: str) -> dict[str, Any]:
        """Get user details by account ID (Confluence Cloud identifier)."""
        ...

    def get_user_details_by_username(self, username: str) -> dict[str, Any]:
        """Get user details by username (for Server/DC compatibility)."""
        ...
24 |
25 |
class BasePreprocessor:
    """Base class for text preprocessing operations.

    Converts Confluence HTML into an (HTML, Markdown) pair, resolving user
    mentions and user-profile macros to readable ``@DisplayName`` text along
    the way.
    """

    def __init__(self, base_url: str = "") -> None:
        """
        Initialize the base text preprocessor.

        Args:
            base_url: Base URL for API server
        """
        # Normalize: stored without a trailing slash ("" when not provided).
        self.base_url = base_url.rstrip("/") if base_url else ""

    def process_html_content(
        self,
        html_content: str,
        space_key: str = "",
        confluence_client: ConfluenceClient | None = None,
    ) -> tuple[str, str]:
        """
        Process HTML content to replace user refs and page links.

        Args:
            html_content: The HTML content to process
            space_key: Optional space key for context (not used by this base
                implementation; kept for subclass/caller compatibility)
            confluence_client: Optional Confluence client for user lookups

        Returns:
            Tuple of (processed_html, processed_markdown)

        Raises:
            Exception: any error during parsing/processing is logged and
                re-raised unchanged.
        """
        try:
            # Parse the HTML content
            soup = BeautifulSoup(html_content, "html.parser")

            # Process user mentions (both helpers mutate the soup in place)
            self._process_user_mentions_in_soup(soup, confluence_client)
            self._process_user_profile_macros_in_soup(soup, confluence_client)

            # Convert to string and markdown
            processed_html = str(soup)
            processed_markdown = md(processed_html)

            return processed_html, processed_markdown

        except Exception as e:
            logger.error(f"Error in process_html_content: {str(e)}")
            raise

    def _process_user_mentions_in_soup(
        self, soup: BeautifulSoup, confluence_client: ConfluenceClient | None = None
    ) -> None:
        """
        Process user mentions in BeautifulSoup object.

        Args:
            soup: BeautifulSoup object containing HTML
            confluence_client: Optional Confluence client for user lookups
        """
        # Find all ac:link elements that might contain user mentions
        user_mentions = soup.find_all("ac:link")

        for user_element in user_mentions:
            user_ref = user_element.find("ri:user")
            if user_ref and user_ref.get("ri:account-id"):
                # Case 1: Direct user reference without link-body
                account_id = user_ref.get("ri:account-id")
                if isinstance(account_id, str):
                    self._replace_user_mention(
                        user_element, account_id, confluence_client
                    )
                    continue

            # Case 2: User reference with link-body containing @
            # (only reached when Case 1 did not replace this element)
            link_body = user_element.find("ac:link-body")
            if link_body and "@" in link_body.get_text(strip=True):
                user_ref = user_element.find("ri:user")
                if user_ref and user_ref.get("ri:account-id"):
                    account_id = user_ref.get("ri:account-id")
                    if isinstance(account_id, str):
                        self._replace_user_mention(
                            user_element, account_id, confluence_client
                        )

    def _process_user_profile_macros_in_soup(
        self, soup: BeautifulSoup, confluence_client: ConfluenceClient | None = None
    ) -> None:
        """
        Process Confluence User Profile macros in BeautifulSoup object.
        Replaces <ac:structured-macro ac:name="profile">...</ac:structured-macro>
        with the user's display name, typically formatted as @DisplayName.

        Args:
            soup: BeautifulSoup object containing HTML
            confluence_client: Optional Confluence client for user lookups
        """
        profile_macros = soup.find_all(
            "ac:structured-macro", attrs={"ac:name": "profile"}
        )

        for macro_element in profile_macros:
            user_param = macro_element.find("ac:parameter", attrs={"ac:name": "user"})
            if not user_param:
                logger.debug(
                    "User profile macro found without a 'user' parameter. Replacing with placeholder."
                )
                # replace_with swaps the whole macro node for plain text
                macro_element.replace_with("[User Profile Macro (Malformed)]")
                continue

            user_ref = user_param.find("ri:user")
            if not user_ref:
                logger.debug(
                    "User profile macro's 'user' parameter found without 'ri:user' tag. Replacing with placeholder."
                )
                macro_element.replace_with("[User Profile Macro (Malformed)]")
                continue

            account_id = user_ref.get("ri:account-id")
            userkey = user_ref.get("ri:userkey")  # Fallback for Confluence Server/DC

            user_identifier_for_log = account_id or userkey
            display_name = None

            if confluence_client and user_identifier_for_log:
                try:
                    if account_id and isinstance(account_id, str):
                        user_details = confluence_client.get_user_details_by_accountid(
                            account_id
                        )
                        display_name = user_details.get("displayName")
                    elif userkey and isinstance(userkey, str):
                        # For Confluence Server/DC, userkey might be the username
                        user_details = confluence_client.get_user_details_by_username(
                            userkey
                        )
                        display_name = user_details.get("displayName")
                except Exception as e:
                    # Lookup failures are non-fatal: fall through to the
                    # placeholder text below.
                    logger.warning(
                        f"Error fetching user details for profile macro (user: {user_identifier_for_log}): {e}"
                    )
            elif not confluence_client:
                logger.warning(
                    "Confluence client not available for User Profile Macro processing."
                )

            if display_name:
                replacement_text = f"@{display_name}"
                macro_element.replace_with(replacement_text)
            else:
                # No display name resolved: keep whatever identifier we have
                # so the output still hints at who was referenced.
                fallback_identifier = (
                    user_identifier_for_log
                    if user_identifier_for_log
                    else "unknown_user"
                )
                fallback_text = f"[User Profile: {fallback_identifier}]"
                macro_element.replace_with(fallback_text)
                logger.debug(f"Using fallback for user profile macro: {fallback_text}")

    def _replace_user_mention(
        self,
        user_element: Tag,
        account_id: str,
        confluence_client: ConfluenceClient | None = None,
    ) -> None:
        """
        Replace a user mention with the user's display name.

        Args:
            user_element: The HTML element containing the user mention
            account_id: The user's account ID
            confluence_client: Optional Confluence client for user lookups
        """
        try:
            # Only attempt to get user details if we have a valid confluence client
            if confluence_client is not None:
                user_details = confluence_client.get_user_details_by_accountid(
                    account_id
                )
                display_name = user_details.get("displayName", "")
                if display_name:
                    new_text = f"@{display_name}"
                    user_element.replace_with(new_text)
                    return
            # If we don't have a confluence client or couldn't get user details,
            # use fallback
            self._use_fallback_user_mention(user_element, account_id)
        except Exception as e:
            # Broad catch: a failed user lookup must never abort processing
            # of the whole page.
            logger.warning(f"Error processing user mention: {str(e)}")
            self._use_fallback_user_mention(user_element, account_id)

    def _use_fallback_user_mention(self, user_element: Tag, account_id: str) -> None:
        """
        Replace user mention with a fallback when the API call fails.

        Args:
            user_element: The HTML element containing the user mention
            account_id: The user's account ID
        """
        # Fallback: just use the account ID
        new_text = f"@user_{account_id}"
        user_element.replace_with(new_text)

    def _convert_html_to_markdown(self, text: str) -> str:
        """Convert HTML content to markdown if needed.

        Returns *text* unchanged when it contains no HTML-like tags, or when
        conversion fails (the error is logged).
        """
        if re.search(r"<[^>]+>", text):
            try:
                with warnings.catch_warnings():
                    # Suppress parser UserWarnings emitted by BeautifulSoup
                    # during conversion.
                    warnings.filterwarnings("ignore", category=UserWarning)
                    # Wrap in a <div> so there is a single container whose
                    # inner HTML can be extracted reliably.
                    soup = BeautifulSoup(f"<div>{text}</div>", "html.parser")
                    html = str(soup.div.decode_contents()) if soup.div else text
                    text = md(html)
            except Exception as e:
                logger.warning(f"Error converting HTML to markdown: {str(e)}")
        return text
238 |
```
--------------------------------------------------------------------------------
/tests/unit/confluence/test_comments.py:
--------------------------------------------------------------------------------
```python
1 | """Unit tests for the CommentsMixin class."""
2 |
3 | from unittest.mock import patch
4 |
5 | import pytest
6 | import requests
7 |
8 | from mcp_atlassian.confluence.comments import CommentsMixin
9 |
10 |
class TestCommentsMixin:
    """Tests for the CommentsMixin class.

    Covers reading page comments (markdown and HTML forms, plus error
    handling) and adding comments (markdown input, HTML input, API errors,
    empty responses).
    """

    @pytest.fixture
    def comments_mixin(self, confluence_client):
        """Create a CommentsMixin instance for testing.

        The ``confluence_client`` fixture comes from the tests' conftest
        (not visible in this file) and supplies pre-mocked ``confluence``,
        ``config`` and ``preprocessor`` attributes.
        """
        # CommentsMixin inherits from ConfluenceClient, so we need to create it properly
        with patch(
            "mcp_atlassian.confluence.comments.ConfluenceClient.__init__"
        ) as mock_init:
            mock_init.return_value = None
            mixin = CommentsMixin()
            # Copy the necessary attributes from our mocked client
            mixin.confluence = confluence_client.confluence
            mixin.config = confluence_client.config
            mixin.preprocessor = confluence_client.preprocessor
            return mixin

    def test_get_page_comments_success(self, comments_mixin):
        """Test get_page_comments with success response."""
        # Setup
        page_id = "12345"
        # Configure the mock to return a successful response
        comments_mixin.confluence.get_page_comments.return_value = {
            "results": [
                {
                    "id": "12345",
                    "body": {"view": {"value": "<p>Comment content here</p>"}},
                    "version": {"number": 1},
                    "author": {"displayName": "John Doe"},
                }
            ]
        }

        # Mock preprocessor
        comments_mixin.preprocessor.process_html_content.return_value = (
            "<p>Processed HTML</p>",
            "Processed Markdown",
        )

        # Call the method
        result = comments_mixin.get_page_comments(page_id)

        # Verify: the REST call carries the expand/depth options and the
        # returned comment body is the markdown half of the processed pair
        comments_mixin.confluence.get_page_comments.assert_called_once_with(
            content_id=page_id, expand="body.view.value,version", depth="all"
        )
        assert len(result) == 1
        assert result[0].body == "Processed Markdown"

    def test_get_page_comments_with_html(self, comments_mixin):
        """Test get_page_comments with HTML output instead of markdown."""
        # Setup
        page_id = "12345"
        comments_mixin.confluence.get_page_comments.return_value = {
            "results": [
                {
                    "id": "12345",
                    "body": {"view": {"value": "<p>Comment content here</p>"}},
                    "version": {"number": 1},
                    "author": {"displayName": "John Doe"},
                }
            ]
        }

        # Mock the HTML processing
        comments_mixin.preprocessor.process_html_content.return_value = (
            "<p>Processed HTML</p>",
            "Processed markdown",
        )

        # Call the method with return_markdown=False to get the HTML half
        result = comments_mixin.get_page_comments(page_id, return_markdown=False)

        # Verify result
        assert len(result) == 1
        comment = result[0]
        assert comment.body == "<p>Processed HTML</p>"

    def test_get_page_comments_api_error(self, comments_mixin):
        """Test handling of API errors."""
        # Mock the API to raise an exception
        comments_mixin.confluence.get_page_comments.side_effect = (
            requests.RequestException("API error")
        )

        # Act
        result = comments_mixin.get_page_comments("987654321")

        # Assert
        assert isinstance(result, list)
        assert len(result) == 0  # Empty list on error

    def test_get_page_comments_key_error(self, comments_mixin):
        """Test handling of missing keys in API response."""
        # Mock the response to be missing expected keys
        comments_mixin.confluence.get_page_comments.return_value = {"invalid": "data"}

        # Act
        result = comments_mixin.get_page_comments("987654321")

        # Assert
        assert isinstance(result, list)
        assert len(result) == 0  # Empty list on error

    def test_get_page_comments_value_error(self, comments_mixin):
        """Test handling of unexpected data types."""
        # Cause a value error by returning a string where a dict is expected
        comments_mixin.confluence.get_page_by_id.return_value = "invalid"

        # Act
        result = comments_mixin.get_page_comments("987654321")

        # Assert
        assert isinstance(result, list)
        assert len(result) == 0  # Empty list on error

    def test_get_page_comments_with_empty_results(self, comments_mixin):
        """Test handling of empty results."""
        # Mock empty results
        comments_mixin.confluence.get_page_comments.return_value = {"results": []}

        # Act
        result = comments_mixin.get_page_comments("987654321")

        # Assert
        assert isinstance(result, list)
        assert len(result) == 0  # Empty list with no comments

    def test_add_comment_success(self, comments_mixin):
        """Test adding a comment with success response."""
        # Setup
        page_id = "12345"
        content = "This is a test comment"

        # Mock the page retrieval
        comments_mixin.confluence.get_page_by_id.return_value = {
            "space": {"key": "TEST"}
        }

        # Mock the preprocessor's conversion method (markdown -> storage HTML)
        comments_mixin.preprocessor.markdown_to_confluence_storage.return_value = (
            "<p>This is a test comment</p>"
        )

        # Configure the mock to return a successful response
        comments_mixin.confluence.add_comment.return_value = {
            "id": "98765",
            "body": {"view": {"value": "<p>This is a test comment</p>"}},
            "version": {"number": 1},
            "author": {"displayName": "Test User"},
        }

        # Mock the HTML processing
        comments_mixin.preprocessor.process_html_content.return_value = (
            "<p>This is a test comment</p>",
            "This is a test comment",
        )

        # Call the method
        result = comments_mixin.add_comment(page_id, content)

        # Verify: the converted (storage-format) body is what gets posted
        comments_mixin.confluence.add_comment.assert_called_once_with(
            page_id, "<p>This is a test comment</p>"
        )
        assert result is not None
        assert result.id == "98765"
        assert result.body == "This is a test comment"

    def test_add_comment_with_html_content(self, comments_mixin):
        """Test adding a comment with HTML content."""
        # Setup
        page_id = "12345"
        content = "<p>This is an <strong>HTML</strong> comment</p>"

        # Mock the page retrieval
        comments_mixin.confluence.get_page_by_id.return_value = {
            "space": {"key": "TEST"}
        }

        # Configure the mock to return a successful response
        comments_mixin.confluence.add_comment.return_value = {
            "id": "98765",
            "body": {
                "view": {"value": "<p>This is an <strong>HTML</strong> comment</p>"}
            },
            "version": {"number": 1},
            "author": {"displayName": "Test User"},
        }

        # Mock the HTML processing
        comments_mixin.preprocessor.process_html_content.return_value = (
            "<p>This is an <strong>HTML</strong> comment</p>",
            "This is an **HTML** comment",
        )

        # Call the method
        result = comments_mixin.add_comment(page_id, content)

        # Verify - should not call markdown conversion since content is already HTML
        comments_mixin.preprocessor.markdown_to_confluence_storage.assert_not_called()
        comments_mixin.confluence.add_comment.assert_called_once_with(page_id, content)
        assert result is not None
        assert result.body == "This is an **HTML** comment"

    def test_add_comment_api_error(self, comments_mixin):
        """Test handling of API errors when adding a comment."""
        # Setup
        page_id = "12345"
        content = "This is a test comment"

        # Mock the page retrieval
        comments_mixin.confluence.get_page_by_id.return_value = {
            "space": {"key": "TEST"}
        }

        # Mock the preprocessor's conversion method
        comments_mixin.preprocessor.markdown_to_confluence_storage.return_value = (
            "<p>This is a test comment</p>"
        )

        # Mock the API to raise an exception
        comments_mixin.confluence.add_comment.side_effect = requests.RequestException(
            "API error"
        )

        # Call the method
        result = comments_mixin.add_comment(page_id, content)

        # Verify: errors are swallowed and surfaced as None
        assert result is None

    def test_add_comment_empty_response(self, comments_mixin):
        """Test handling of empty API response when adding a comment."""
        # Setup
        page_id = "12345"
        content = "This is a test comment"

        # Mock the page retrieval
        comments_mixin.confluence.get_page_by_id.return_value = {
            "space": {"key": "TEST"}
        }

        # Mock the preprocessor's conversion method
        comments_mixin.preprocessor.markdown_to_confluence_storage.return_value = (
            "<p>This is a test comment</p>"
        )

        # Configure the mock to return an empty response
        comments_mixin.confluence.add_comment.return_value = None

        # Call the method
        result = comments_mixin.add_comment(page_id, content)

        # Verify: an empty API response also yields None
        assert result is None
268 |
```
--------------------------------------------------------------------------------
/tests/integration/test_ssl_verification.py:
--------------------------------------------------------------------------------
```python
1 | """Integration tests for SSL verification functionality."""
2 |
3 | import os
4 | from unittest.mock import MagicMock, patch
5 |
6 | import pytest
7 | from requests.exceptions import SSLError
8 | from requests.sessions import Session
9 |
10 | from mcp_atlassian.confluence.config import ConfluenceConfig
11 | from mcp_atlassian.jira.client import JiraClient
12 | from mcp_atlassian.jira.config import JiraConfig
13 | from mcp_atlassian.utils.ssl import SSLIgnoreAdapter, configure_ssl_verification
14 | from tests.utils.base import BaseAuthTest
15 | from tests.utils.mocks import MockEnvironment
16 |
17 |
@pytest.mark.integration
def test_configure_ssl_verification_with_real_confluence_url():
    """Test SSL verification configuration with real Confluence URL from environment."""
    url = os.getenv("CONFLUENCE_URL")
    if not url:
        pytest.skip("CONFLUENCE_URL not set in environment")

    # A real requests session so adapter mounting can be observed
    session = Session()
    adapters_before = len(session.adapters)

    # Force SSL verification off for this test via the environment
    with patch.dict(os.environ, {"CONFLUENCE_SSL_VERIFY": "false"}):
        configure_ssl_verification(
            service_name="Confluence",
            url=url,
            session=session,
            ssl_verify=False,
        )

    # The host portion of the URL, without scheme or path
    domain = url.split("://")[1].split("/")[0]

    # Exactly two new mounts (http + https), both ignore adapters
    assert len(session.adapters) == adapters_before + 2
    for scheme in ("https", "http"):
        mount = f"{scheme}://{domain}"
        assert mount in session.adapters
        assert isinstance(session.adapters[mount], SSLIgnoreAdapter)
49 |
50 |
51 | class TestSSLVerificationEnhanced(BaseAuthTest):
52 | """Enhanced SSL verification tests using test utilities."""
53 |
54 | @pytest.mark.integration
55 | def test_ssl_verification_enabled_by_default(self):
56 | """Test that SSL verification is enabled by default."""
57 | with MockEnvironment.basic_auth_env():
58 | # For Jira
59 | jira_config = JiraConfig.from_env()
60 | assert jira_config.ssl_verify is True
61 |
62 | # For Confluence
63 | confluence_config = ConfluenceConfig.from_env()
64 | assert confluence_config.ssl_verify is True
65 |
66 | @pytest.mark.integration
67 | def test_ssl_verification_disabled_via_env(self):
68 | """Test SSL verification can be disabled via environment variables."""
69 | with MockEnvironment.basic_auth_env() as env_vars:
70 | env_vars["JIRA_SSL_VERIFY"] = "false"
71 | env_vars["CONFLUENCE_SSL_VERIFY"] = "false"
72 |
73 | # For Jira - need to reload config after env change
74 | with patch.dict(os.environ, env_vars):
75 | jira_config = JiraConfig.from_env()
76 | assert jira_config.ssl_verify is False
77 |
78 | # For Confluence
79 | confluence_config = ConfluenceConfig.from_env()
80 | assert confluence_config.ssl_verify is False
81 |
82 | @pytest.mark.integration
83 | def test_ssl_adapter_mounting_for_multiple_domains(self):
84 | """Test SSL adapters are correctly mounted for multiple domains."""
85 | session = Session()
86 |
87 | # Configure for multiple domains
88 | urls = [
89 | "https://domain1.atlassian.net",
90 | "https://domain2.atlassian.net/wiki",
91 | "https://custom.domain.com/jira",
92 | ]
93 |
94 | for url in urls:
95 | configure_ssl_verification(
96 | service_name="Test", url=url, session=session, ssl_verify=False
97 | )
98 |
99 | # Verify all domains have SSL adapters
100 | assert "https://domain1.atlassian.net" in session.adapters
101 | assert "https://domain2.atlassian.net" in session.adapters
102 | assert "https://custom.domain.com" in session.adapters
103 |
104 | @pytest.mark.integration
105 | def test_ssl_error_handling_with_invalid_cert(self, monkeypatch):
106 | """Test SSL error handling when certificate validation fails."""
107 | # Mock the Jira class to simulate SSL error
108 | mock_jira = MagicMock()
109 | mock_jira.side_effect = SSLError("Certificate verification failed")
110 | monkeypatch.setattr("mcp_atlassian.jira.client.Jira", mock_jira)
111 |
112 | with MockEnvironment.basic_auth_env():
113 | config = JiraConfig.from_env()
114 | config.ssl_verify = True # Ensure SSL verification is on
115 |
116 | # Creating client should raise SSL error
117 | with pytest.raises(SSLError, match="Certificate verification failed"):
118 | JiraClient(config=config)
119 |
120 | @pytest.mark.integration
121 | def test_ssl_verification_with_custom_ca_bundle(self):
122 | """Test SSL verification with custom CA bundle path."""
123 | with MockEnvironment.basic_auth_env() as env_vars:
124 | # Set custom CA bundle path
125 | custom_ca_path = "/path/to/custom/ca-bundle.crt"
126 | env_vars["JIRA_SSL_VERIFY"] = custom_ca_path
127 | env_vars["CONFLUENCE_SSL_VERIFY"] = custom_ca_path
128 |
129 | # For Jira - need to reload config after env change
130 | with patch.dict(os.environ, env_vars):
131 | jira_config = JiraConfig.from_env()
132 | # Note: Current implementation only supports boolean ssl_verify
133 | # Custom CA bundle paths are not supported in the config parsing
134 | assert (
135 | jira_config.ssl_verify is True
136 | ) # Any non-false value becomes True
137 |
138 | # For Confluence
139 | confluence_config = ConfluenceConfig.from_env()
140 | assert (
141 | confluence_config.ssl_verify is True
142 | ) # Any non-false value becomes True
143 |
144 | @pytest.mark.integration
145 | def test_ssl_adapter_not_mounted_when_verification_enabled(self):
146 | """Test that SSL adapters are not mounted when verification is enabled."""
147 | session = Session()
148 | original_adapter_count = len(session.adapters)
149 |
150 | # Configure with SSL verification enabled
151 | configure_ssl_verification(
152 | service_name="Jira",
153 | url="https://test.atlassian.net",
154 | session=session,
155 | ssl_verify=True, # SSL verification enabled
156 | )
157 |
158 | # No additional adapters should be mounted
159 | assert len(session.adapters) == original_adapter_count
160 | assert "https://test.atlassian.net" not in session.adapters
161 |
162 | @pytest.mark.integration
163 | def test_ssl_configuration_persistence_across_requests(self):
164 | """Test SSL configuration persists across multiple requests."""
165 | session = Session()
166 |
167 | # Configure SSL for a domain
168 | configure_ssl_verification(
169 | service_name="Jira",
170 | url="https://test.atlassian.net",
171 | session=session,
172 | ssl_verify=False,  # disabled -> an SSLIgnoreAdapter should be mounted for the domain
173 | )
174 |
175 | # Get the adapter
176 | adapter = session.adapters.get("https://test.atlassian.net")
177 | assert isinstance(adapter, SSLIgnoreAdapter)
178 |
179 | # Configure again - should not create duplicate adapters
180 | configure_ssl_verification(
181 | service_name="Jira",
182 | url="https://test.atlassian.net",
183 | session=session,
184 | ssl_verify=False,
185 | )
186 |
187 | # Should still have an SSLIgnoreAdapter present
188 | # NOTE(review): only the adapter *type* is asserted here; adapter identity and
189 | # total adapter count are not checked, so a re-mount replacing the instance would pass.
190 | new_adapter = session.adapters.get("https://test.atlassian.net")
191 | assert isinstance(new_adapter, SSLIgnoreAdapter)
190 |
191 | @pytest.mark.integration
192 | def test_ssl_verification_with_oauth_configuration(self):
193 | """Test SSL verification works correctly with OAuth configuration."""
194 | with MockEnvironment.oauth_env() as env_vars:
195 | # Add SSL configuration
196 | env_vars["JIRA_SSL_VERIFY"] = "false"
197 | env_vars["CONFLUENCE_SSL_VERIFY"] = "false"
198 |
199 | # OAuth config should still respect SSL settings
200 | # Need to reload config after env change
201 | with patch.dict(os.environ, env_vars):
202 | # Note: OAuth flow would need additional setup, but we're testing config only
203 | # NOTE(review): no JiraConfig/ConfluenceConfig is built here — only env-var
204 | # propagation is asserted, not that the configs actually honor these values.
205 | assert os.environ.get("JIRA_SSL_VERIFY") == "false"
206 | assert os.environ.get("CONFLUENCE_SSL_VERIFY") == "false"
205 |
206 |
207 | @pytest.mark.integration
208 | def test_configure_ssl_verification_with_real_jira_url():
209 | """Test SSL verification configuration with real Jira URL from environment."""
210 | # Get the URL from the environment
211 | url = os.getenv("JIRA_URL")
212 | if not url:
213 | pytest.skip("JIRA_URL not set in environment")
214 |
215 | # Create a real session
216 | session = Session()
217 | original_adapters_count = len(session.adapters)  # requests pre-mounts http:// and https:// defaults
218 |
219 | # Mock the SSL_VERIFY value to be False for this test
220 | with patch.dict(os.environ, {"JIRA_SSL_VERIFY": "false"}):
221 | # Configure SSL verification - explicitly pass ssl_verify=False
222 | configure_ssl_verification(
223 | service_name="Jira",
224 | url=url,
225 | session=session,
226 | ssl_verify=False,
227 | )
228 |
229 | # Extract domain from URL (remove protocol and path)
230 | # assumes JIRA_URL includes a scheme ("://"); raises IndexError otherwise — TODO confirm
231 | domain = url.split("://")[1].split("/")[0]
232 |
233 | # Verify the adapters are mounted correctly
234 | assert len(session.adapters) == original_adapters_count + 2  # one mount each for https:// and http:// variants
235 | assert f"https://{domain}" in session.adapters
236 | assert f"http://{domain}" in session.adapters
237 | assert isinstance(session.adapters[f"https://{domain}"], SSLIgnoreAdapter)
238 | assert isinstance(session.adapters[f"http://{domain}"], SSLIgnoreAdapter)
238 |
```