# Directory Structure
```
├── .gitignore
├── LICENSE
├── pubmearch
│ ├── __init__.py
│ ├── analyzer.py
│ ├── pubmed_searcher.py
│ └── server.py
├── pyproject.toml
└── README.md
```
# Files
--------------------------------------------------------------------------------
/pubmearch/__init__.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | PubMed Analysis MCP Server Package
3 | """
```
--------------------------------------------------------------------------------
/pubmearch/analyzer.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python
2 | # -*- coding: utf-8 -*-
3 |
4 | """
5 | PubMed Analysis Module
6 |
7 | This module provides analysis functionality for PubMed search results,
8 | including research hotspots, trends, and publication statistics.
9 | """
10 |
11 | import os
12 | import re
13 | import json
14 | from datetime import datetime
15 | from collections import Counter, defaultdict
16 | from typing import Dict, List, Optional, Tuple, Any, Union
17 |
18 |
19 | class PubMedAnalyzer:
20 | """Class to analyze PubMed search results from text files."""
21 |
22 | def __init__(self, results_dir: str = "../results"):
23 | """
24 | Initialize the PubMed analyzer.
25 |
26 | Args:
27 | results_dir: Directory containing PubMed search result text files
28 | """
29 | self.results_dir = results_dir
30 |
31 | def parse_results_file(self, filepath: str) -> List[Dict[str, Any]]:
32 | """
33 | Parse a PubMed results file (txt or json) into structured data.
34 |
35 | Args:
36 | filepath: Path to the results file
37 |
38 | Returns:
39 | List of dictionaries containing structured article data
40 | """
41 | if not os.path.exists(filepath):
42 | raise FileNotFoundError(f"File not found: {filepath}")
43 |
44 | # Choose parsing method based on file extension
45 | if filepath.endswith('.json'):
46 | return self._parse_json_file(filepath)
47 | else:
48 | return self._parse_txt_file(filepath)
49 |
50 | def _parse_json_file(self, filepath: str) -> List[Dict[str, Any]]:
51 | """Parse a JSON results file."""
52 | with open(filepath, 'r', encoding='utf-8') as f:
53 | data = json.load(f)
54 | return data.get("articles", [])
55 |
56 | def _parse_txt_file(self, filepath: str) -> List[Dict[str, Any]]:
57 | """Parse a text results file."""
58 | articles = []
59 | current_article = None
60 | section = None
61 |
62 | with open(filepath, 'r', encoding='utf-8') as f:
63 | lines = f.readlines()
64 |
65 | i = 0
66 | while i < len(lines):
67 | line = lines[i].strip()
68 |
69 | # New article marker
70 |             if line.startswith("Article ") and i + 1 < len(lines) and "-" * 10 in lines[i+1]:
71 | if current_article:
72 | articles.append(current_article)
73 | current_article = {
74 | "title": "",
75 | "authors": [],
76 | "journal": "",
77 | "publication_date": "",
78 | "abstract": "",
79 | "keywords": [],
80 | "pmid": "",
81 | "doi": ""
82 | }
83 | section = None
84 |                 i += 1  # Advance to the separator; the trailing i += 1 below skips past it
85 |
86 | # Section headers
87 | elif line.startswith("Title: "):
88 | current_article["title"] = line[7:].strip()
89 | section = "title"
90 | elif line.startswith("Authors: "):
91 | authors_line = line[9:].strip()
92 | if authors_line != "N/A":
93 | current_article["authors"] = [a.strip() for a in authors_line.split(",")]
94 | section = None
95 | elif line.startswith("Journal: "):
96 | current_article["journal"] = line[9:].strip()
97 | section = None
98 | elif line.startswith("Publication Date: "):
99 | current_article["publication_date"] = line[18:].strip()
100 | section = None
101 | elif line == "Abstract:":
102 | section = "abstract"
103 | elif line.startswith("Keywords: "):
104 | keywords_line = line[10:].strip()
105 | current_article["keywords"] = [k.strip() for k in keywords_line.split(",")]
106 | section = None
107 | elif line.startswith("PMID: "):
108 | current_article["pmid"] = line[6:].strip()
109 | section = None
110 | elif line.startswith("DOI: "):
111 |                 current_article["doi"] = line[5:].strip().replace("https://doi.org/", "", 1)  # export_to_txt writes the DOI as a full URL
112 | section = None
113 | elif line.startswith("=" * 20):
114 | section = None
115 |
116 | # Content sections
117 | elif section == "abstract" and line and not line.startswith("Keywords: "):
118 | current_article["abstract"] += line + " "
119 |
120 | i += 1
121 |
122 | # Add the last article
123 | if current_article:
124 | articles.append(current_article)
125 |
126 | return articles
127 |
128 | def extract_publication_dates(self, articles: List[Dict[str, Any]]) -> List[Tuple[str, datetime]]:
129 | """
130 | Extract and parse publication dates from articles.
131 |
132 | Args:
133 | articles: List of article dictionaries
134 |
135 | Returns:
136 | List of tuples containing (article_title, publication_date)
137 | """
138 | publication_dates = []
139 |
140 | for article in articles:
141 | date_str = article.get("publication_date", "")
142 |
143 | # Try different formats
144 | parsed_date = None
145 |
146 | # Format: YYYY MMM
147 | if re.match(r"^\d{4} [A-Za-z]{3}$", date_str):
148 | try:
149 | parsed_date = datetime.strptime(date_str, "%Y %b")
150 | except ValueError:
151 | pass
152 |
153 | # Format: YYYY MMM DD
154 | elif re.match(r"^\d{4} [A-Za-z]{3} \d{1,2}$", date_str):
155 | try:
156 | parsed_date = datetime.strptime(date_str, "%Y %b %d")
157 | except ValueError:
158 | pass
159 |
160 | # Format: YYYY MMM-MMM
161 | elif re.match(r"^\d{4} [A-Za-z]{3}-[A-Za-z]{3}$", date_str):
162 | try:
163 | # Just use the first month
164 | month_part = date_str.split(" ")[1].split("-")[0]
165 | parsed_date = datetime.strptime(f"{date_str.split(' ')[0]} {month_part}", "%Y %b")
166 | except (ValueError, IndexError):
167 | pass
168 |
169 | # Format: YYYY
170 | elif re.match(r"^\d{4}$", date_str):
171 | try:
172 | parsed_date = datetime.strptime(date_str, "%Y")
173 | except ValueError:
174 | pass
175 |
176 | if parsed_date:
177 | publication_dates.append((article.get("title", ""), parsed_date))
178 |
179 | return publication_dates
180 |
181 | def analyze_research_keywords(self, articles: List[Dict[str, Any]], top_n: int = 20, include_trends: bool = True) -> Dict[str, Any]:
182 | """
183 | Analyze research hotspots and trends based on keyword frequencies.
184 |
185 | Args:
186 | articles: List of article dictionaries
187 | top_n: Number of top keywords to include
188 | include_trends: Bool indicating whether to include trend analysis, default True.
189 |
190 | Returns:
191 | Dictionary with analysis results
192 | """
193 | # Extract all keywords
194 | all_keywords = []
195 | for article in articles:
196 | all_keywords.extend(article.get("keywords", []))
197 |
198 | # Count keyword frequencies
199 | keyword_counts = Counter(all_keywords)
200 |
201 | # Get top keywords
202 | top_keywords = keyword_counts.most_common(top_n)
203 |
204 | # Organize articles by keyword
205 | keyword_articles = defaultdict(list)
206 | for article in articles:
207 | article_keywords = article.get("keywords", [])
208 | for kw in article_keywords:
209 | if kw in dict(top_keywords):
210 | keyword_articles[kw].append({
211 | "title": article.get("title", ""),
212 | "authors": article.get("authors", []),
213 | "journal": article.get("journal", ""),
214 | "publication_date": article.get("publication_date", ""),
215 | "pmid": article.get("pmid", ""),
216 | "doi": article.get("doi", "")
217 | })
218 |
219 | # Prepare results
220 | results = {
221 | "top_keywords": [{"keyword": kw, "count": count} for kw, count in top_keywords],
222 | "keyword_articles": {kw: articles for kw, articles in keyword_articles.items()}
223 | }
224 |
225 |         # Run trend analysis if requested
226 | if include_trends:
227 |             # Extract publication dates
228 | pub_dates = self.extract_publication_dates(articles)
229 |
230 |             # Group keyword occurrences by month
231 | monthly_keyword_counts = defaultdict(lambda: defaultdict(int))
232 |
233 | for article in articles:
234 | date_str = article.get("publication_date", "")
235 | article_keywords = article.get("keywords", [])
236 |
237 |                 # Find the parsed date for this article
238 | parsed_date = None
239 | for title, date in pub_dates:
240 | if title == article.get("title", ""):
241 | parsed_date = date
242 | break
243 |
244 | if parsed_date:
245 | month_key = parsed_date.strftime("%Y-%m")
246 | for kw in article_keywords:
247 | if kw in dict(top_keywords):
248 | monthly_keyword_counts[month_key][kw] += 1
249 |
250 |             # Sort months chronologically
251 | sorted_months = sorted(monthly_keyword_counts.keys())
252 |
253 |             # Prepare trend data
254 | trend_data = {
255 | "months": sorted_months,
256 | "keywords": [kw for kw, _ in top_keywords],
257 | "counts": []
258 | }
259 |
260 | for keyword, _ in top_keywords:
261 | keyword_trend = []
262 | for month in sorted_months:
263 | keyword_trend.append(monthly_keyword_counts[month][keyword])
264 | trend_data["counts"].append({
265 | "keyword": keyword,
266 | "monthly_counts": keyword_trend
267 | })
268 |
269 | results["trends"] = trend_data
270 |
271 | return results
272 |
273 | def analyze_publication_count(self, articles: List[Dict[str, Any]], months_per_period: int = 3) -> Dict[str, Any]:
274 | """
275 | Analyze publication counts over time.
276 |
277 | Args:
278 | articles: List of article dictionaries
279 | months_per_period: Number of months to group by
280 |
281 | Returns:
282 | Dictionary with publication count analysis
283 | """
284 | # Extract publication dates
285 | pub_dates = self.extract_publication_dates(articles)
286 |
287 | # Group by period
288 | period_counts = defaultdict(int)
289 |
290 | for _, date in pub_dates:
291 | # Calculate period key based on months_per_period
292 | year = date.year
293 | month = date.month
294 | period = (month - 1) // months_per_period
295 | period_key = f"{year}-P{period+1}" # 1-indexed periods
296 |
297 | period_counts[period_key] += 1
298 |
299 | # Sort periods chronologically
300 | sorted_periods = sorted(period_counts.keys())
301 |
302 | # Prepare result
303 | results = {
304 | "periods": sorted_periods,
305 | "counts": [period_counts[period] for period in sorted_periods],
306 | "months_per_period": months_per_period,
307 | "total_publications": len(pub_dates)
308 | }
309 |
310 | return results
311 |
312 | def generate_comprehensive_analysis(self, filepath: str, top_keywords: int = 20,
313 | months_per_period: int = 3) -> Dict[str, Any]:
314 | """
315 | Generate a comprehensive analysis of PubMed results from a file.
316 |
317 | Args:
318 | filepath: Path to the results text file
319 | top_keywords: Number of top keywords for hotspot analysis
320 | months_per_period: Number of months per period for publication count
321 |
322 | Returns:
323 | Dictionary with comprehensive analysis results
324 | """
325 | try:
326 | articles = self.parse_results_file(filepath)
327 |
328 | if not articles:
329 | return {"error": "No articles found in the file."}
330 |
331 | # Generate analysis components
332 | keyword_analysis = self.analyze_research_keywords(articles, top_keywords)
333 | pub_counts = self.analyze_publication_count(articles, months_per_period)
334 |
335 | # Combine results
336 | results = {
337 | "file_analyzed": os.path.basename(filepath),
338 | "analysis_timestamp": datetime.now().isoformat(),
339 | "article_count": len(articles),
340 | "keyword_analysis": keyword_analysis,
341 | "publication_counts": pub_counts
342 | }
343 |
344 | return results
345 |
346 | except Exception as e:
347 | return {"error": str(e)}
348 |
349 | def list_result_files(self) -> List[str]:
350 | """
351 | List all result files in the results directory.
352 |
353 | Returns:
354 | List of filenames
355 | """
356 | if not os.path.exists(self.results_dir):
357 | return []
358 |
359 |         return [f for f in os.listdir(self.results_dir) if f.endswith(('.json', '.txt'))]
360 |
```
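A minimal usage sketch of `PubMedAnalyzer` (the results path and filename below are illustrative; any file previously exported by `PubMedSearcher` works):

```python
from pubmearch.analyzer import PubMedAnalyzer

# Point the analyzer at the package's results directory (path is illustrative).
analyzer = PubMedAnalyzer(results_dir="pubmearch/results")

# Parse a previously exported results file (JSON preferred) and analyze keywords.
articles = analyzer.parse_results_file("pubmearch/results/pubmed_results_20240101_120000.json")
hotspots = analyzer.analyze_research_keywords(articles, top_n=10, include_trends=True)

for entry in hotspots["top_keywords"]:
    print(f"{entry['keyword']}: {entry['count']}")
```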
--------------------------------------------------------------------------------
/pubmearch/server.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python
2 | # -*- coding: utf-8 -*-
3 |
4 | """
5 | PubMed Analysis MCP Server
6 |
7 | This module implements an MCP server for analyzing PubMed search results,
8 | providing tools to identify research hotspots, trends, and publication statistics.
9 |
10 | Note:
11 |     - First, always use the search_pubmed tool to generate fresh results.
12 |     - Second, for result analysis, always use the JSON output files.
13 | """
14 |
15 | import os
16 | import sys
17 | import subprocess
18 | import json
19 | import logging
20 | import re
21 | from datetime import datetime
22 | from pathlib import Path
23 | from typing import Dict, List, Optional, Any, Union
24 |
25 | # Resolve the project root; it is added to sys.path and used for the log file path
26 | parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
27 | sys.path.append(parent_dir)
28 | from .pubmed_searcher import PubMedSearcher
29 | from .analyzer import PubMedAnalyzer
30 |
31 | # Import FastMCP
32 | from mcp.server.fastmcp import FastMCP, Context
33 |
34 | # Configure logging
35 | logging.basicConfig(
36 | level=logging.INFO,
37 | format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
38 | handlers=[
39 | logging.FileHandler(os.path.join(parent_dir, "pubmed_server.log")),
40 | logging.StreamHandler()
41 | ]
42 | )
43 | logger = logging.getLogger("pubmed-mcp-server")
44 |
45 | # make sure results directory exists
46 | now = datetime.now()
47 | time_string = now.strftime("%Y%m%d%H%M%S")
48 | results_dir = Path(__file__).resolve().parent / "results"
49 | os.makedirs(results_dir, exist_ok=True)
50 | logger.info(f"Results directory: {results_dir}")
51 |
52 | # Initialize analyzer
53 | analyzer = PubMedAnalyzer(results_dir=results_dir)
54 |
55 | # Initialize MCP server
56 | pubmearch = FastMCP(
57 | "PubMed Analyzer",
58 | description="MCP server for analyzing PubMed search results"
59 | )
60 |
61 | @pubmearch.tool()
62 | async def search_pubmed(
63 | advanced_search: str,
64 | start_date: Optional[str] = None,
65 | end_date: Optional[str] = None,
66 | max_results: int = 1000,
67 | output_filename: Optional[str] = None,
68 | ) -> Dict[str, Any]:
69 |     """Search PubMed with an advanced query and export the results as JSON and TXT files."""
70 |     try:
70 | logger.info(f"Starting PubMed search with query: {advanced_search}")
71 | NCBI_USER_EMAIL = os.getenv('NCBI_USER_EMAIL')
72 | NCBI_USER_API_KEY = os.getenv('NCBI_USER_API_KEY')
73 |
74 | if not NCBI_USER_EMAIL:
75 | logger.error("Email not provided and NCBI_USER_EMAIL environment variable not set")
76 | return {
77 | "success": False,
78 | "error": "Server configuration error: NCBI User Email is not set."
79 | }
80 |         logger.info(f"Using email address: {NCBI_USER_EMAIL}")
81 |
82 | if NCBI_USER_API_KEY:
83 |             logger.info("Using API key from environment.")
84 |         else:
85 |             logger.warning("NCBI_USER_API_KEY environment variable not found. Proceeding without API key.")
86 |
87 |         searcher = PubMedSearcher(email=NCBI_USER_EMAIL, api_key=NCBI_USER_API_KEY)
88 |
89 | # Create date range if dates are provided
90 |         # Note: The format of start_date and end_date is always YYYY/MM/DD
91 | date_range = None
92 | if start_date or end_date:
93 | # Validate date formats
94 | date_pattern = re.compile(r'^\d{4}/\d{2}/\d{2}$')
95 | if start_date and not date_pattern.match(start_date):
96 | raise ValueError(f"Invalid start_date format: {start_date}. Must be YYYY/MM/DD")
97 | if end_date and not date_pattern.match(end_date):
98 | raise ValueError(f"Invalid end_date format: {end_date}. Must be YYYY/MM/DD")
99 |
100 |         date_range = (start_date, end_date) if (start_date or end_date) else None  # allow partial ranges; the searcher fills in the missing end
101 |
102 | # Perform search
103 | records = searcher.search(
104 | advanced_search=advanced_search,
105 | date_range=date_range,
106 | max_results=max_results
107 | )
108 |
109 | if not records:
110 | logger.warning("No results found for the search criteria")
111 | return {
112 | "success": False,
113 | "error": "No results found for the given criteria."
114 | }
115 |
116 | # Export both TXT and JSON formats
117 | if not output_filename:
118 | base_filename = f"pubmed_results_{time_string}"
119 | json_filename = f"{base_filename}.json"
120 | txt_filename = f"{base_filename}.txt"
121 | else:
122 |             # Strip any existing extension and append a timestamp
123 | base_filename = output_filename.rsplit('.', 1)[0] + f"_{time_string}"
124 | json_filename = f"{base_filename}.json"
125 | txt_filename = f"{base_filename}.txt"
126 |
127 | # Export both formats
128 | json_path = os.path.abspath(searcher.export_to_json(records, json_filename))
129 | txt_path = os.path.abspath(searcher.export_to_txt(records, txt_filename))
130 |
131 | # Verify if files were saved successfully
132 | if not os.path.exists(json_path):
133 | logger.error(f"Failed to create JSON file at {json_path}")
134 | return {
135 | "success": False,
136 |                 "error": "Failed to save JSON results file."
137 | }
138 |
139 | logger.info(f"Successfully saved {len(records)} articles to JSON: {json_path}")
140 |
141 | return {
142 | "success": True,
143 | "message": f"Search completed successfully. Found {len(records)} articles.",
144 | "json_file": os.path.basename(json_path),
145 | "txt_file": os.path.basename(txt_path),
146 | "note": "JSON files are recommended for AI model analysis.",
147 | "article_count": len(records)
148 | }
149 |
150 | except ValueError as ve:
151 | logger.error(f"ValueError in search_pubmed: {str(ve)}", exc_info=True)
152 | return {"success": False, "error": str(ve)}
153 | except Exception as e:
154 | logger.error(f"Error in search_pubmed: {str(e)}", exc_info=True)
155 | return {
156 | "success": False,
157 | "error": f"Error during search: {str(e)}"
158 | }
159 |
160 | @pubmearch.tool()
161 | async def list_result_files() -> Dict[str, Any]:
162 | """Lists all available PubMed result files.
163 |
164 | Two types of files are returned:
165 | - JSON files (recommended): structured data, suitable for AI model analysis
166 | - TXT files (alternative): plain text format, for backward compatibility
167 | """
168 | try:
169 | logger.info(f"Listing result files in: {results_dir}")
170 |
171 | if not os.path.exists(results_dir):
172 | logger.warning(f"Results directory does not exist: {results_dir}")
173 | os.makedirs(results_dir, exist_ok=True)
174 | logger.info(f"Created results directory: {results_dir}")
175 | return {
176 | "success": True,
177 | "files": [],
178 | "count": 0,
179 | "directory": results_dir
180 | }
181 |
182 | # Get JSON and TXT files separately
183 | json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
184 | txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
185 |
186 | return {
187 | "success": True,
188 | "files": {
189 | "recommended": json_files, # JSON files (recommended)
190 | "alternative": txt_files # TXT files (alternative)
191 | },
192 | "count": len(json_files) + len(txt_files),
193 | "directory": results_dir,
194 | "note": "Always use JSON files first."
195 | }
196 | except Exception as e:
197 | logger.error(f"Error in list_result_files: {str(e)}", exc_info=True)
198 | return {
199 | "success": False,
200 | "error": str(e),
201 |             "directory": str(results_dir)
202 | }
203 |
204 | @pubmearch.tool()
205 | async def analyze_research_keywords(filename: str, top_n: int = 20, include_trends: bool = True) -> Dict[str, Any]:
206 |     """Analyze research hotspots and trends in a PubMed results file based on keywords.
207 |
208 | Note: It is recommended to use JSON format files for better analysis results.
209 |
210 | Args:
211 | filename: File name of results. (.json format is recommended)
212 | top_n: Return the top n hot keywords.
213 |         include_trends: Whether to include trend analysis. Default is True.
214 | """
215 | try:
216 | filepath = os.path.join(results_dir, filename)
217 | logger.info(f"Analyzing research keywords from file: {filepath}")
218 |
219 | # Check if the file exists
220 | if not os.path.exists(filepath):
221 | logger.error(f"File not found: {filepath}")
222 | # JSON first
223 | json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
224 | txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
225 | return {
226 | "success": False,
227 | "error": f"File not found: {filepath}",
228 | "available_files": {
229 | "recommended": json_files,
230 | "alternative": txt_files
231 | },
232 | "note": "Always use JSON files first."
233 | }
234 |
235 | # Parse the result file
236 | articles = analyzer.parse_results_file(filepath)
237 |
238 | if not articles:
239 | logger.warning(f"No articles found in file: {filepath}")
240 | return {
241 | "success": False,
242 | "error": "No articles found in the file."
243 | }
244 |
245 | # Analyze keywords
246 | analysis_results = analyzer.analyze_research_keywords(articles, top_n, include_trends)
247 |
248 | return {
249 | "success": True,
250 | "file_analyzed": filename,
251 | "article_count": len(articles),
252 | "keyword_analysis": analysis_results
253 | }
254 |
255 | except Exception as e:
256 | logger.error(f"Error in analyze_research_keywords: {str(e)}", exc_info=True)
257 | return {
258 | "success": False,
259 | "error": str(e)
260 | }
261 |
262 | @pubmearch.tool()
263 | async def analyze_publication_count(filename: str, months_per_period: int = 3) -> Dict[str, Any]:
264 | """Analyze publication counts over time from a PubMed results file.
265 |
266 | Note: It is recommended to use JSON format files for better analysis results.
267 |
268 | Args:
269 | filename: File name of results. (.json format is recommended)
270 | months_per_period: Number of months per analysis period
271 | """
272 | try:
273 | filepath = os.path.join(results_dir, filename)
274 | logger.info(f"Analyzing publication counts from file: {filepath}")
275 |
276 | # Check if the file exists
277 | if not os.path.exists(filepath):
278 | logger.error(f"File not found: {filepath}")
279 | json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
280 | txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
281 | return {
282 | "success": False,
283 | "error": f"File not found: {filepath}",
284 | "available_files": {
285 | "recommended": json_files,
286 | "alternative": txt_files
287 | },
288 | "note": "Always use JSON files first."
289 | }
290 |
291 | # Parse the result file
292 | articles = analyzer.parse_results_file(filepath)
293 |
294 | if not articles:
295 | logger.warning(f"No articles found in file: {filepath}")
296 | return {
297 | "success": False,
298 | "error": "No articles found in the file."
299 | }
300 |
301 | # Analyze publication counts
302 | pub_counts = analyzer.analyze_publication_count(articles, months_per_period)
303 |
304 | return {
305 | "success": True,
306 | "file_analyzed": filename,
307 | "article_count": len(articles),
308 | "publication_counts": pub_counts
309 | }
310 |
311 | except Exception as e:
312 | logger.error(f"Error in analyze_publication_count: {str(e)}", exc_info=True)
313 | return {
314 | "success": False,
315 | "error": str(e)
316 | }
317 |
318 | @pubmearch.tool()
319 | async def generate_comprehensive_analysis(
320 | filename: str,
321 | top_keywords: int = 20,
322 | months_per_period: int = 3
323 | ) -> Dict[str, Any]:
324 | """Generate a comprehensive analysis of a PubMed results file.
325 |
326 | Note: It is recommended to use JSON format files for better analysis results.
327 |
328 | Args:
329 | filename: File name of results. (.json format is recommended)
330 | top_keywords: Number of top keywords to analyze
331 | months_per_period: Number of months per analysis period
332 | """
333 | try:
334 | filepath = os.path.join(results_dir, filename)
335 | logger.info(f"Generating comprehensive analysis from file: {filepath}")
336 |
337 | # Check if the file exists
338 | if not os.path.exists(filepath):
339 | logger.error(f"File not found: {filepath}")
340 | json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
341 | txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
342 | return {
343 | "success": False,
344 | "error": f"File not found: {filepath}",
345 | "available_files": {
346 | "recommended": json_files,
347 | "alternative": txt_files
348 | },
349 | "note": "Always use JSON files first."
350 | }
351 |
352 | # Generate comprehensive analysis directly
353 | results = analyzer.generate_comprehensive_analysis(
354 | filepath,
355 | top_keywords=top_keywords,
356 | months_per_period=months_per_period
357 | )
358 |
359 | if "error" in results:
360 | logger.error(f"Error in analysis: {results['error']}")
361 | return {
362 | "success": False,
363 | "error": results["error"]
364 | }
365 |
366 | logger.info("Comprehensive analysis completed successfully")
367 | return {
368 | "success": True,
369 | "analysis": results
370 | }
371 |
372 | except Exception as e:
373 | logger.error(f"Error in generate_comprehensive_analysis: {str(e)}", exc_info=True)
374 | return {
375 | "success": False,
376 | "error": str(e)
377 | }
378 |
379 | if __name__ == "__main__":
380 | os.makedirs(results_dir, exist_ok=True)
381 | pubmearch.run()
```
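As a rough local smoke test, the tool coroutines can be exercised outside an MCP client, assuming the installed `mcp` SDK's `@tool()` decorator returns the original function (true for recent versions) and that valid NCBI credentials are available; the email, query, and dates below are placeholders:

```python
import asyncio
import os

# Placeholder credentials; replace with a real address or set NCBI_USER_EMAIL beforehand.
os.environ.setdefault("NCBI_USER_EMAIL", "you@example.org")

from pubmearch import server

async def smoke_test() -> None:
    # Call the tool coroutine directly; in normal use an MCP client invokes it instead.
    result = await server.search_pubmed(
        advanced_search='"machine learning"[Title/Abstract]',
        start_date="2024/01/01",
        end_date="2024/06/30",
        max_results=20,
    )
    print(result.get("message") or result.get("error"))

if __name__ == "__main__":
    asyncio.run(smoke_test())
```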
--------------------------------------------------------------------------------
/pubmearch/pubmed_searcher.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python
2 | # -*- coding: utf-8 -*-
3 |
4 | """
5 | PubMed Searcher Module
6 |
7 | This module provides functionality for searching PubMed and retrieving article data.
8 | """
9 |
10 | import os
11 | import re
12 | import time
13 | import json
14 | import logging
15 | from datetime import datetime
16 | from typing import List, Dict, Tuple, Optional, Any, Union
17 | from Bio import Entrez
18 | from pathlib import Path
19 |
20 |
21 | # Configure logging
22 | logger = logging.getLogger(__name__)
23 |
24 | class PubMedSearcher:
25 | """Class to search PubMed and retrieve article data."""
26 |
27 | def __init__(self, email: Optional[str] = None, results_dir: Optional[str] = None, api_key: Optional[str] = None):
28 | """
29 |         Initialize the PubMed searcher, using credentials from arguments or from .env.
30 |
31 | Args:
32 | email: Email address for Entrez. If None, use NCBI_USER_EMAIL from environment variables.
33 | results_dir: Optional custom results directory path
34 | api_key: API key for NCBI. If None, use NCBI_USER_API_KEY from environment variables.
35 | """
36 | # use NCBI_USER_EMAIL from .env if email is not provided
37 | self.email = email if email is not None else os.getenv('NCBI_USER_EMAIL')
38 | self.api_key = api_key if api_key is not None else os.getenv('NCBI_USER_API_KEY')
39 | if not self.email:
40 | raise ValueError("Email is required. Either pass it directly or set NCBI_USER_EMAIL in .env")
41 |
42 | # Set up Entrez
43 | Entrez.email = self.email
44 | Entrez.api_key = self.api_key
45 |
46 | # Use provided results directory or create default
47 | self.results_dir = Path(results_dir) if results_dir else Path(__file__).resolve().parent / "results"
48 | os.makedirs(self.results_dir, exist_ok=True)
49 | logger.info(f"Using results directory: {self.results_dir}")
50 |
51 | def search(self,
52 | advanced_search: str,
53 | date_range: Optional[Tuple[str, str]] = None,
54 | max_results: int = 1000) -> List[Dict[str, Any]]:
55 | """
56 | Search PubMed using advanced search syntax.
57 |
58 | Args:
59 | advanced_search: PubMed advanced search query
60 | date_range: Optional tuple of (start_date, end_date),
61 | date format is always YYYY/MM/DD
62 | max_results: Maximum number of results to retrieve
63 |
64 | Returns:
65 | List of article dictionaries
66 | """
67 | search_term = advanced_search
68 |
69 | # Add date range to query if provided
70 |         # Note: The format of start_date and end_date is always YYYY/MM/DD
71 | if date_range:
72 | start_date, end_date = date_range
73 | date_filter = ""
74 |
75 | # start_date
76 | if start_date:
77 | date_filter += f" AND ('{start_date}'[Date - Publication]"
78 | if end_date:
79 | date_filter += f" : '{end_date}'[Date - Publication]"
80 | date_filter += ")"
81 |             # If only end_date is given, use 1900/01/01 as the start date for an inclusive range
82 | elif end_date:
83 | date_filter += f" AND ('1900/01/01'[Date - Publication] : '{end_date}'[Date - Publication])"
84 |
85 | search_term += date_filter
86 |
87 | try:
88 | # Search PubMed
89 | logger.info(f"Searching PubMed with query: {search_term}")
90 | search_handle = Entrez.esearch(db="pubmed", term=search_term, retmax=max_results, usehistory="y")
91 | search_results = Entrez.read(search_handle)
92 | search_handle.close()
93 |
94 | webenv = search_results["WebEnv"]
95 | query_key = search_results["QueryKey"]
96 |
97 | # Get the count of results
98 | count = int(search_results["Count"])
99 | logger.info(f"Found {count} results, retrieving up to {max_results}")
100 |
101 | if count == 0:
102 | logger.warning("No results found")
103 | return []
104 |
105 | # Initialize an empty list to store articles
106 | articles = []
107 |
108 | # Fetch results in batches to avoid timeouts
109 | batch_size = 100
110 | for start in range(0, min(count, max_results), batch_size):
111 | end = min(count, start + batch_size, max_results)
112 | logger.info(f"Retrieving records {start+1} to {end}")
113 |
114 | try:
115 | # Fetch the records
116 | fetch_handle = Entrez.efetch(
117 | db="pubmed",
118 | retstart=start,
119 | retmax=batch_size,
120 | webenv=webenv,
121 | query_key=query_key,
122 | retmode="xml"
123 | )
124 |
125 | # Parse the records
126 | records = Entrez.read(fetch_handle)["PubmedArticle"]
127 | fetch_handle.close()
128 |
129 | # Process each record
130 | for record in records:
131 | article = self._parse_pubmed_record(record)
132 | articles.append(article)
133 |
134 | # Sleep to avoid overloading the NCBI server
135 | time.sleep(1)
136 |
137 | except Exception as e:
138 | logger.error(f"Error fetching batch {start+1} to {end}: {str(e)}")
139 | continue
140 |
141 | return articles
142 |
143 | except Exception as e:
144 | logger.error(f"Error searching PubMed: {str(e)}")
145 | return []
146 |
147 | def _parse_pubmed_record(self, record: Dict) -> Dict[str, Any]:
148 | """
149 | Parse a PubMed record into a structured article dictionary.
150 |
151 | Args:
152 | record: PubMed record from Entrez.read
153 |
154 | Returns:
155 | Dictionary containing structured article data
156 | """
157 | article_data = {}
158 |
159 | # Get MedlineCitation and Article
160 | medline_citation = record.get("MedlineCitation", {})
161 | article = medline_citation.get("Article", {})
162 |
163 | # Extract basic article information
164 | article_data["title"] = article.get("ArticleTitle", "")
165 |
166 | # Extract authors
167 | authors = []
168 | author_list = article.get("AuthorList", [])
169 | for author in author_list:
170 | if "LastName" in author and "ForeName" in author:
171 | authors.append(f"{author['LastName']} {author['ForeName']}")
172 | elif "LastName" in author and "Initials" in author:
173 | authors.append(f"{author['LastName']} {author['Initials']}")
174 | elif "LastName" in author:
175 | authors.append(author["LastName"])
176 | elif "CollectiveName" in author:
177 | authors.append(author["CollectiveName"])
178 | article_data["authors"] = authors
179 |
180 | # Extract journal information
181 | journal = article.get("Journal", {})
182 | article_data["journal"] = journal.get("Title", "")
183 |
184 | # Extract publication date
185 | pub_date = {}
186 | journal_issue = journal.get("JournalIssue", {})
187 | if "PubDate" in journal_issue:
188 | pub_date = journal_issue["PubDate"]
189 |
190 | pub_date_str = ""
191 | if "Year" in pub_date:
192 | pub_date_str = pub_date["Year"]
193 | if "Month" in pub_date:
194 | pub_date_str += f" {pub_date['Month']}"
195 | if "Day" in pub_date:
196 | pub_date_str += f" {pub_date['Day']}"
197 |
198 | article_data["publication_date"] = pub_date_str
199 |
200 | # Extract abstract
201 | abstract_text = ""
202 | if "Abstract" in article and "AbstractText" in article["Abstract"]:
203 | # Handle different abstract formats
204 | abstract_parts = article["Abstract"]["AbstractText"]
205 | if isinstance(abstract_parts, list):
206 | for part in abstract_parts:
207 | if isinstance(part, str):
208 | abstract_text += part + " "
209 | elif isinstance(part, dict) and "#text" in part:
210 | label = part.get("Label", "")
211 | text = part["#text"]
212 | if label:
213 | abstract_text += f"{label}: {text} "
214 | else:
215 | abstract_text += text + " "
216 | else:
217 | abstract_text = str(abstract_parts)
218 |
219 | article_data["abstract"] = abstract_text.strip()
220 |
221 | # Extract keywords
222 | keywords = []
223 | # MeSH headings
224 | mesh_headings = medline_citation.get("MeshHeadingList", [])
225 | for heading in mesh_headings:
226 | if "DescriptorName" in heading:
227 | descriptor = heading["DescriptorName"]
228 | if isinstance(descriptor, dict) and "content" in descriptor:
229 | keywords.append(descriptor["content"])
230 | elif isinstance(descriptor, str):
231 | keywords.append(descriptor)
232 |
233 | # Keywords from KeywordList
234 | keyword_lists = medline_citation.get("KeywordList", [])
235 | for keyword_list in keyword_lists:
236 | if isinstance(keyword_list, list):
237 | for keyword in keyword_list:
238 | if isinstance(keyword, str):
239 | keywords.append(keyword)
240 | elif isinstance(keyword, dict) and "content" in keyword:
241 | keywords.append(keyword["content"])
242 |
243 | article_data["keywords"] = keywords
244 |
245 | # Extract PMID
246 | pmid = medline_citation.get("PMID", "")
247 | if isinstance(pmid, dict) and "content" in pmid:
248 | article_data["pmid"] = pmid["content"]
249 | else:
250 | article_data["pmid"] = str(pmid)
251 |
252 |         # Extract DOI, iterating carefully over ArticleIdList element types
253 | doi = ""
254 | try:
255 | pubmed_data = record.get("PubmedData")
256 | if pubmed_data:
257 | article_id_list = pubmed_data.get("ArticleIdList")
258 | # Iterate through article_id_list if it exists and is iterable
259 | if article_id_list:
260 | try:
261 | for id_element in article_id_list:
262 | # Check if the element has attributes and the IdType is 'doi'
263 | # Handles Bio.Entrez.Parser.StringElement and similar objects
264 | if hasattr(id_element, 'attributes') and id_element.attributes.get('IdType') == 'doi':
265 | doi = str(id_element).strip() # Get the string value
266 | if doi: break # Found DOI, exit loop
267 | # Fallback check for plain dictionary structure (less common)
268 | elif isinstance(id_element, dict) and id_element.get('IdType') == 'doi':
269 | doi = id_element.get('content', '').strip() or id_element.get('#text', '').strip()
270 | if doi: break # Found DOI, exit loop
271 | except TypeError:
272 | # Handle cases where article_id_list might not be iterable (e.g., single element)
273 | # Check if the single element itself is the DOI
274 | if hasattr(article_id_list, 'attributes') and article_id_list.attributes.get('IdType') == 'doi':
275 | doi = str(article_id_list).strip()
276 |
277 | except Exception as e:
278 |             logger.warning(f"Error during DOI extraction for PMID {article_data.get('pmid', 'N/A')}: {e}")
279 | doi = "" # Reset DOI on error
280 |
281 | article_data["doi"] = doi
282 |
283 | return article_data
284 |
285 | def export_to_txt(self, articles: List[Dict[str, Any]], filename: Optional[str] = None) -> str:
286 | """
287 | Export articles to a formatted text file.
288 |
289 | Args:
290 | articles: List of article dictionaries
291 | filename: Optional output filename
292 |
293 | Returns:
294 | Path to the created file
295 | """
296 | if not filename:
297 | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
298 | filename = f"pubmed_results_{timestamp}.txt"
299 |
300 | filepath = os.path.join(self.results_dir, filename)
301 |
302 | with open(filepath, 'w', encoding='utf-8') as f:
303 | for i, article in enumerate(articles, 1):
304 | f.write(f"Article {i}\n")
305 | f.write("-" * 80 + "\n")
306 | f.write(f"Title: {article.get('title', '')}\n")
307 | f.write(f"Authors: {', '.join(article.get('authors', []))}\n")
308 | f.write(f"Journal: {article.get('journal', '')}\n")
309 | f.write(f"Publication Date: {article.get('publication_date', '')}\n")
310 | f.write(f"Abstract:\n{article.get('abstract', '')}\n")
311 | f.write(f"Keywords: {', '.join(article.get('keywords', []))}\n")
312 | f.write(f"PMID: {article.get('pmid', '')}\n")
313 | f.write(f"DOI: https://doi.org/{article.get('doi', '')}\n")
314 | f.write("=" * 80 + "\n\n")
315 |
316 | logger.info(f"Exported {len(articles)} articles to {filepath}")
317 | return filepath
318 |
319 | def export_to_json(self, articles: List[Dict[str, Any]], filename: Optional[str] = None) -> str:
320 | """
321 | Export articles to JSON format file.
322 |
323 | Args:
324 | articles: List of article dictionaries
325 | filename: Optional output filename
326 |
327 | Returns:
328 | Path to the created file
329 | """
330 | if not filename:
331 | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
332 | filename = f"pubmed_results_{timestamp}.json"
333 |
334 | filepath = os.path.join(self.results_dir, filename)
335 |
336 | with open(filepath, 'w', encoding='utf-8') as f:
337 | json.dump({
338 | "metadata": {
339 | "export_time": datetime.now().isoformat(),
340 | "article_count": len(articles)
341 | },
342 | "articles": articles
343 | }, f, ensure_ascii=False, indent=2)
344 |
345 | logger.info(f"Exported {len(articles)} articles to {filepath}")
346 | return filepath
347 |
```
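`PubMedSearcher` can also be used standalone; a minimal sketch, assuming Biopython is installed and that the email, query, and dates below (all placeholders) are replaced with real values:

```python
from pubmearch.pubmed_searcher import PubMedSearcher

# Email is required by NCBI; NCBI_USER_EMAIL / NCBI_USER_API_KEY from the environment are used as fallbacks.
searcher = PubMedSearcher(email="you@example.org")

# Illustrative advanced query; any valid PubMed search expression works.
records = searcher.search(
    advanced_search='"CRISPR"[Title/Abstract]',
    date_range=("2023/01/01", "2023/12/31"),
    max_results=50,
)

if records:
    json_path = searcher.export_to_json(records)
    txt_path = searcher.export_to_txt(records)
    print(f"Saved {len(records)} records to {json_path} and {txt_path}")
```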