# Directory Structure

```
├── .gitignore
├── LICENSE
├── pubmearch
│   ├── __init__.py
│   ├── analyzer.py
│   ├── pubmed_searcher.py
│   └── server.py
├── pyproject.toml
└── README.md
```

# Files

--------------------------------------------------------------------------------
/pubmearch/__init__.py:
--------------------------------------------------------------------------------

```python
1 | """
2 | PubMed Analysis MCP Server Package
3 | """ 
```

--------------------------------------------------------------------------------
/pubmearch/analyzer.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python
  2 | # -*- coding: utf-8 -*-
  3 | 
  4 | """
  5 | PubMed Analysis Module
  6 | 
  7 | This module provides analysis functionality for PubMed search results, 
  8 | including research hotspots, trends, and publication statistics.
  9 | """
 10 | 
 11 | import os
 12 | import re
 13 | import json
 14 | from datetime import datetime
 15 | from collections import Counter, defaultdict
 16 | from typing import Dict, List, Optional, Tuple, Any, Union
 17 | 
 18 | 
 19 | class PubMedAnalyzer:
 20 |     """Class to analyze PubMed search results from text files."""
 21 |     
 22 |     def __init__(self, results_dir: str = "../results"):
 23 |         """
 24 |         Initialize the PubMed analyzer.
 25 |         
 26 |         Args:
 27 |             results_dir: Directory containing PubMed search result text files
 28 |         """
 29 |         self.results_dir = results_dir
 30 |         
 31 |     def parse_results_file(self, filepath: str) -> List[Dict[str, Any]]:
 32 |         """
 33 |         Parse a PubMed results file (txt or json) into structured data.
 34 |         
 35 |         Args:
 36 |             filepath: Path to the results file
 37 |             
 38 |         Returns:
 39 |             List of dictionaries containing structured article data
 40 |         """
 41 |         if not os.path.exists(filepath):
 42 |             raise FileNotFoundError(f"File not found: {filepath}")
 43 |         
 44 |         # Choose parsing method based on file extension
 45 |         if filepath.endswith('.json'):
 46 |             return self._parse_json_file(filepath)
 47 |         else:
 48 |             return self._parse_txt_file(filepath)
 49 | 
 50 |     def _parse_json_file(self, filepath: str) -> List[Dict[str, Any]]:
 51 |         """Parse a JSON results file."""
 52 |         with open(filepath, 'r', encoding='utf-8') as f:
 53 |             data = json.load(f)
 54 |             return data.get("articles", [])
 55 | 
 56 |     def _parse_txt_file(self, filepath: str) -> List[Dict[str, Any]]:
 57 |         """Parse a text results file."""
 58 |         articles = []
 59 |         current_article = None
 60 |         section = None
 61 |         
 62 |         with open(filepath, 'r', encoding='utf-8') as f:
 63 |             lines = f.readlines()
 64 |             
 65 |             i = 0
 66 |             while i < len(lines):
 67 |                 line = lines[i].strip()
 68 |                 
 69 |                 # New article marker
 70 |                 if line.startswith("Article ") and i + 1 < len(lines) and "-" * 10 in lines[i+1]:
 71 |                     if current_article:
 72 |                         articles.append(current_article)
 73 |                     current_article = {
 74 |                         "title": "",
 75 |                         "authors": [],
 76 |                         "journal": "",
 77 |                         "publication_date": "",
 78 |                         "abstract": "",
 79 |                         "keywords": [],
 80 |                         "pmid": "",
 81 |                         "doi": ""
 82 |                     }
 83 |                     section = None
 84 |                     i += 1  # Advance to the separator; the loop increment below skips past it
 85 |                 
 86 |                 # Section headers
 87 |                 elif line.startswith("Title: "):
 88 |                     current_article["title"] = line[7:].strip()
 89 |                     section = "title"
 90 |                 elif line.startswith("Authors: "):
 91 |                     authors_line = line[9:].strip()
 92 |                     if authors_line != "N/A":
 93 |                         current_article["authors"] = [a.strip() for a in authors_line.split(",")]
 94 |                     section = None
 95 |                 elif line.startswith("Journal: "):
 96 |                     current_article["journal"] = line[9:].strip()
 97 |                     section = None
 98 |                 elif line.startswith("Publication Date: "):
 99 |                     current_article["publication_date"] = line[18:].strip()
100 |                     section = None
101 |                 elif line == "Abstract:":
102 |                     section = "abstract"
103 |                 elif line.startswith("Keywords: "):
104 |                     keywords_line = line[10:].strip()
105 |                     current_article["keywords"] = [k.strip() for k in keywords_line.split(",")]
106 |                     section = None
107 |                 elif line.startswith("PMID: "):
108 |                     current_article["pmid"] = line[6:].strip()
109 |                     section = None
110 |                 elif line.startswith("DOI: "):
111 |                     current_article["doi"] = line[5:].strip()
112 |                     section = None
113 |                 elif line.startswith("=" * 20):
114 |                     section = None
115 |                 
116 |                 # Content sections
117 |                 elif section == "abstract" and line and not line.startswith("Keywords: "):
118 |                     current_article["abstract"] += line + " "
119 |                 
120 |                 i += 1
121 |             
122 |             # Add the last article
123 |             if current_article:
124 |                 articles.append(current_article)
125 |         
126 |         return articles
127 | 
128 |     def extract_publication_dates(self, articles: List[Dict[str, Any]]) -> List[Tuple[str, datetime]]:
129 |         """
130 |         Extract and parse publication dates from articles.
131 |         
132 |         Args:
133 |             articles: List of article dictionaries
134 |             
135 |         Returns:
136 |             List of tuples containing (article_title, publication_date)
137 |         """
138 |         publication_dates = []
139 |         
140 |         for article in articles:
141 |             date_str = article.get("publication_date", "")
142 |             
143 |             # Try different formats
144 |             parsed_date = None
145 |             
146 |             # Format: YYYY MMM
147 |             if re.match(r"^\d{4} [A-Za-z]{3}$", date_str):
148 |                 try:
149 |                     parsed_date = datetime.strptime(date_str, "%Y %b")
150 |                 except ValueError:
151 |                     pass
152 |             
153 |             # Format: YYYY MMM DD
154 |             elif re.match(r"^\d{4} [A-Za-z]{3} \d{1,2}$", date_str):
155 |                 try:
156 |                     parsed_date = datetime.strptime(date_str, "%Y %b %d")
157 |                 except ValueError:
158 |                     pass
159 |             
160 |             # Format: YYYY MMM-MMM
161 |             elif re.match(r"^\d{4} [A-Za-z]{3}-[A-Za-z]{3}$", date_str):
162 |                 try:
163 |                     # Just use the first month
164 |                     month_part = date_str.split(" ")[1].split("-")[0]
165 |                     parsed_date = datetime.strptime(f"{date_str.split(' ')[0]} {month_part}", "%Y %b")
166 |                 except (ValueError, IndexError):
167 |                     pass
168 |             
169 |             # Format: YYYY
170 |             elif re.match(r"^\d{4}$", date_str):
171 |                 try:
172 |                     parsed_date = datetime.strptime(date_str, "%Y")
173 |                 except ValueError:
174 |                     pass
175 |             
176 |             if parsed_date:
177 |                 publication_dates.append((article.get("title", ""), parsed_date))
178 |         
179 |         return publication_dates
180 |     
181 |     def analyze_research_keywords(self, articles: List[Dict[str, Any]], top_n: int = 20, include_trends: bool = True) -> Dict[str, Any]:
182 |         """
183 |         Analyze research hotspots and trends based on keyword frequencies.
184 |         
185 |         Args:
186 |             articles: List of article dictionaries
187 |             top_n: Number of top keywords to include
188 |             include_trends: Bool indicating whether to include trend analysis, default True.
189 |             
190 |         Returns:
191 |             Dictionary with analysis results
192 |         """
193 |         # Extract all keywords
194 |         all_keywords = []
195 |         for article in articles:
196 |             all_keywords.extend(article.get("keywords", []))
197 |         
198 |         # Count keyword frequencies
199 |         keyword_counts = Counter(all_keywords)
200 |         
201 |         # Get top keywords
202 |         top_keywords = keyword_counts.most_common(top_n)
203 |         
204 |         # Organize articles by keyword
205 |         keyword_articles = defaultdict(list)
206 |         for article in articles:
207 |             article_keywords = article.get("keywords", [])
208 |             for kw in article_keywords:
209 |                 if kw in dict(top_keywords):
210 |                     keyword_articles[kw].append({
211 |                         "title": article.get("title", ""),
212 |                         "authors": article.get("authors", []),
213 |                         "journal": article.get("journal", ""),
214 |                         "publication_date": article.get("publication_date", ""),
215 |                         "pmid": article.get("pmid", ""),
216 |                         "doi": article.get("doi", "")  
217 |                     })
218 |         
219 |         # Prepare results
220 |         results = {
221 |             "top_keywords": [{"keyword": kw, "count": count} for kw, count in top_keywords],
222 |             "keyword_articles": {kw: articles for kw, articles in keyword_articles.items()}
223 |         }
224 |         
225 |         # If trend analysis is requested
226 |         if include_trends:
227 |             # Extract publication dates
228 |             pub_dates = self.extract_publication_dates(articles)
229 |             
230 |             # Group keyword counts by month
231 |             monthly_keyword_counts = defaultdict(lambda: defaultdict(int))
232 |             
233 |             for article in articles:
234 |                 date_str = article.get("publication_date", "")
235 |                 article_keywords = article.get("keywords", [])
236 |                 
237 |                 # Look up the parsed publication date for this article
238 |                 parsed_date = None
239 |                 for title, date in pub_dates:
240 |                     if title == article.get("title", ""):
241 |                         parsed_date = date
242 |                         break
243 |                 
244 |                 if parsed_date:
245 |                     month_key = parsed_date.strftime("%Y-%m")
246 |                     for kw in article_keywords:
247 |                         if kw in dict(top_keywords):
248 |                             monthly_keyword_counts[month_key][kw] += 1
249 |             
250 |             # Sort months chronologically
251 |             sorted_months = sorted(monthly_keyword_counts.keys())
252 |             
253 |             # Prepare trend data
254 |             trend_data = {
255 |                 "months": sorted_months,
256 |                 "keywords": [kw for kw, _ in top_keywords],
257 |                 "counts": []
258 |             }
259 |             
260 |             for keyword, _ in top_keywords:
261 |                 keyword_trend = []
262 |                 for month in sorted_months:
263 |                     keyword_trend.append(monthly_keyword_counts[month][keyword])
264 |                 trend_data["counts"].append({
265 |                     "keyword": keyword,
266 |                     "monthly_counts": keyword_trend
267 |                 })
268 |             
269 |             results["trends"] = trend_data
270 |         
271 |         return results
272 |     
273 |     def analyze_publication_count(self, articles: List[Dict[str, Any]], months_per_period: int = 3) -> Dict[str, Any]:
274 |         """
275 |         Analyze publication counts over time.
276 |         
277 |         Args:
278 |             articles: List of article dictionaries
279 |             months_per_period: Number of months to group by
280 |             
281 |         Returns:
282 |             Dictionary with publication count analysis
283 |         """
284 |         # Extract publication dates
285 |         pub_dates = self.extract_publication_dates(articles)
286 |         
287 |         # Group by period
288 |         period_counts = defaultdict(int)
289 |         
290 |         for _, date in pub_dates:
291 |             # Calculate period key based on months_per_period
292 |             year = date.year
293 |             month = date.month
294 |             period = (month - 1) // months_per_period
295 |             period_key = f"{year}-P{period+1}"  # 1-indexed periods
296 |             
297 |             period_counts[period_key] += 1
298 |         
299 |         # Sort periods chronologically
300 |         sorted_periods = sorted(period_counts.keys())
301 |         
302 |         # Prepare result
303 |         results = {
304 |             "periods": sorted_periods,
305 |             "counts": [period_counts[period] for period in sorted_periods],
306 |             "months_per_period": months_per_period,
307 |             "total_publications": len(pub_dates)
308 |         }
309 |         
310 |         return results
311 |     
312 |     def generate_comprehensive_analysis(self, filepath: str, top_keywords: int = 20,
313 |                                      months_per_period: int = 3) -> Dict[str, Any]:
314 |         """
315 |         Generate a comprehensive analysis of PubMed results from a file.
316 |         
317 |         Args:
318 |             filepath: Path to the results text file
319 |             top_keywords: Number of top keywords for hotspot analysis
320 |             months_per_period: Number of months per period for publication count
321 |             
322 |         Returns:
323 |             Dictionary with comprehensive analysis results
324 |         """
325 |         try:
326 |             articles = self.parse_results_file(filepath)
327 |             
328 |             if not articles:
329 |                 return {"error": "No articles found in the file."}
330 |             
331 |             # Generate analysis components
332 |             keyword_analysis = self.analyze_research_keywords(articles, top_keywords)
333 |             pub_counts = self.analyze_publication_count(articles, months_per_period)
334 |             
335 |             # Combine results
336 |             results = {
337 |                 "file_analyzed": os.path.basename(filepath),
338 |                 "analysis_timestamp": datetime.now().isoformat(),
339 |                 "article_count": len(articles),
340 |                 "keyword_analysis": keyword_analysis,
341 |                 "publication_counts": pub_counts
342 |             }
343 |             
344 |             return results
345 |             
346 |         except Exception as e:
347 |             return {"error": str(e)}
348 |     
349 |     def list_result_files(self) -> List[str]:
350 |         """
351 |         List all result files in the results directory.
352 |         
353 |         Returns:
354 |             List of filenames
355 |         """
356 |         if not os.path.exists(self.results_dir):
357 |             return []
358 |         
359 |         return [f for f in os.listdir(self.results_dir) if f.endswith(('.json', '.txt'))]
360 | 
```
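
A minimal usage sketch of `PubMedAnalyzer` (the results directory and file name below are hypothetical; any file produced by `PubMedSearcher.export_to_json` or `export_to_txt` works):

```python
# Hedged sketch: the file name and directory are placeholders, not files shipped with the repo.
from pubmearch.analyzer import PubMedAnalyzer

analyzer = PubMedAnalyzer(results_dir="results")

# Parse a previously exported results file (JSON is preferred, TXT also works).
articles = analyzer.parse_results_file("results/pubmed_results_20250101_120000.json")

# Top 10 keywords plus per-month trend counts.
keyword_report = analyzer.analyze_research_keywords(articles, top_n=10, include_trends=True)

# Publication counts grouped into 3-month periods.
count_report = analyzer.analyze_publication_count(articles, months_per_period=3)

print(keyword_report["top_keywords"][:3])
print(count_report["periods"], count_report["counts"])
```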

--------------------------------------------------------------------------------
/pubmearch/server.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python
  2 | # -*- coding: utf-8 -*-
  3 | 
  4 | """
  5 | PubMed Analysis MCP Server
  6 | 
  7 | This module implements an MCP server for analyzing PubMed search results,
  8 | providing tools to identify research hotspots, trends, and publication statistics.
  9 | 
 10 | Note:
 11 |     - First, always use the search_pubmed tool to generate fresh result files.
 12 |     - Second, for result analysis, always prefer the JSON files.
 13 | """
 14 | 
 15 | import os
 16 | import sys
 17 | import subprocess
 18 | import json
 19 | import logging
 20 | import re
 21 | from datetime import datetime
 22 | from pathlib import Path
 23 | from typing import Dict, List, Optional, Any, Union
 24 | 
 25 | # Parent directory of the package; kept on sys.path for compatibility and used for the log file path
 26 | parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
 27 | sys.path.append(parent_dir)
 28 | from .pubmed_searcher import PubMedSearcher
 29 | from .analyzer import PubMedAnalyzer
 30 | 
 31 | # Import FastMCP
 32 | from mcp.server.fastmcp import FastMCP, Context
 33 | 
 34 | # Configure logging
 35 | logging.basicConfig(
 36 |     level=logging.INFO,
 37 |     format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
 38 |     handlers=[
 39 |         logging.FileHandler(os.path.join(parent_dir, "pubmed_server.log")),
 40 |         logging.StreamHandler()
 41 |     ]
 42 | )
 43 | logger = logging.getLogger("pubmed-mcp-server")
 44 | 
 45 | # Timestamp for default result filenames; make sure the results directory exists
 46 | now = datetime.now()
 47 | time_string = now.strftime("%Y%m%d%H%M%S")
 48 | results_dir = Path(__file__).resolve().parent / "results"
 49 | os.makedirs(results_dir, exist_ok=True)
 50 | logger.info(f"Results directory: {results_dir}")
 51 | 
 52 | # Initialize analyzer
 53 | analyzer = PubMedAnalyzer(results_dir=results_dir)
 54 | 
 55 | # Initialize MCP server
 56 | pubmearch = FastMCP(
 57 |     "PubMed Analyzer",
 58 |     description="MCP server for analyzing PubMed search results"
 59 | )
 60 | 
 61 | @pubmearch.tool()
 62 | async def search_pubmed(
 63 |     advanced_search: str,
 64 |     start_date: Optional[str] = None,
 65 |     end_date: Optional[str] = None,
 66 |     max_results: int = 1000,
 67 |     output_filename: Optional[str] = None,
 68 | ) -> Dict[str, Any]:
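    |     """Search PubMed with an advanced query and export the results.
    | 
    |     Args:
    |         advanced_search: PubMed advanced search query string.
    |         start_date: Optional start date in YYYY/MM/DD format.
    |         end_date: Optional end date in YYYY/MM/DD format.
    |         max_results: Maximum number of articles to retrieve.
    |         output_filename: Optional base name for the exported result files.
    | 
    |     Returns:
    |         Dict describing the exported JSON/TXT files, or an error message.
    |     """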
 69 |     try:
 70 |         logger.info(f"Starting PubMed search with query: {advanced_search}")
 71 |         NCBI_USER_EMAIL = os.getenv('NCBI_USER_EMAIL')
 72 |         NCBI_USER_API_KEY = os.getenv('NCBI_USER_API_KEY')
 73 | 
 74 |         if not NCBI_USER_EMAIL:
 75 |             logger.error("NCBI_USER_EMAIL environment variable not set")
 76 |             return {
 77 |                 "success": False,
 78 |                 "error": "Server configuration error: NCBI User Email is not set."
 79 |             }
 80 |         logger.info(f"Using email address: {NCBI_USER_EMAIL}")
 81 |         
 82 |         if NCBI_USER_API_KEY:
 83 |             logger.info("Using API key from environment.")
 84 |         else:
 85 |             logger.warning("NCBI_USER_API_KEY environment variable not found. Proceeding without API key.")
 86 |         
 87 |         searcher = PubMedSearcher(email = NCBI_USER_EMAIL, api_key = NCBI_USER_API_KEY)
 88 |         
 89 |         # Create date range if dates are provided
 90 |         # Note: start_date and end_date are always in YYYY/MM/DD format
 91 |         date_range = None
 92 |         if start_date or end_date:
 93 |             # Validate date formats
 94 |             date_pattern = re.compile(r'^\d{4}/\d{2}/\d{2}$')
 95 |             if start_date and not date_pattern.match(start_date):
 96 |                 raise ValueError(f"Invalid start_date format: {start_date}. Must be YYYY/MM/DD")
 97 |             if end_date and not date_pattern.match(end_date):
 98 |                 raise ValueError(f"Invalid end_date format: {end_date}. Must be YYYY/MM/DD")
 99 |             
100 |             date_range = (start_date, end_date)  # either bound may be None; the searcher handles open-ended ranges
101 |         
102 |         # Perform search
103 |         records = searcher.search(
104 |             advanced_search=advanced_search,
105 |             date_range=date_range,
106 |             max_results=max_results
107 |         )
108 |         
109 |         if not records:
110 |             logger.warning("No results found for the search criteria")
111 |             return {
112 |                 "success": False,
113 |                 "error": "No results found for the given criteria."
114 |             }
115 |         
116 |         # Export both TXT and JSON formats
117 |         if not output_filename:
118 |             base_filename = f"pubmed_results_{time_string}"
119 |             json_filename = f"{base_filename}.json"
120 |             txt_filename = f"{base_filename}.txt"
121 |         else:
122 |             # Strip any existing extension and append a timestamp
123 |             base_filename = output_filename.rsplit('.', 1)[0] + f"_{time_string}"
124 |             json_filename = f"{base_filename}.json"
125 |             txt_filename = f"{base_filename}.txt"
126 |         
127 |         # Export both formats
128 |         json_path = os.path.abspath(searcher.export_to_json(records, json_filename))
129 |         txt_path = os.path.abspath(searcher.export_to_txt(records, txt_filename))
130 |         
131 |         # Verify if files were saved successfully
132 |         if not os.path.exists(json_path):
133 |             logger.error(f"Failed to create JSON file at {json_path}")
134 |             return {
135 |                 "success": False,
136 |                 "error": f"Failed to save JSON results file."
137 |             }
138 |             
139 |         logger.info(f"Successfully saved {len(records)} articles to JSON: {json_path}")
140 |         
141 |         return {
142 |             "success": True,
143 |             "message": f"Search completed successfully. Found {len(records)} articles.",
144 |             "json_file": os.path.basename(json_path),
145 |             "txt_file": os.path.basename(txt_path),
146 |             "note": "JSON files are recommended for AI model analysis.",
147 |             "article_count": len(records)
148 |         }
149 |         
150 |     except ValueError as ve: 
151 |         logger.error(f"ValueError in search_pubmed: {str(ve)}", exc_info=True)
152 |         return {"success": False, "error": str(ve)}
153 |     except Exception as e:
154 |         logger.error(f"Error in search_pubmed: {str(e)}", exc_info=True)
155 |         return {
156 |             "success": False,
157 |             "error": f"Error during search: {str(e)}"
158 |         }
159 | 
160 | @pubmearch.tool()
161 | async def list_result_files() -> Dict[str, Any]:
162 |     """Lists all available PubMed result files.
163 | 
164 |     Two types of files are returned:
165 |     - JSON files (recommended): structured data, suitable for AI model analysis
166 |     - TXT files (alternative): plain text format, for backward compatibility
167 |     """
168 |     try:
169 |         logger.info(f"Listing result files in: {results_dir}")
170 |         
171 |         if not os.path.exists(results_dir):
172 |             logger.warning(f"Results directory does not exist: {results_dir}")
173 |             os.makedirs(results_dir, exist_ok=True)
174 |             logger.info(f"Created results directory: {results_dir}")
175 |             return {
176 |                 "success": True,
177 |                 "files": [],
178 |                 "count": 0,
179 |                 "directory": str(results_dir)
180 |             }
181 |         
182 |         # Get JSON and TXT files separately
183 |         json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
184 |         txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
185 |         
186 |         return {
187 |             "success": True,
188 |             "files": {
189 |                 "recommended": json_files,  # JSON files (recommended)
190 |                 "alternative": txt_files    # TXT files (alternative)
191 |             },
192 |             "count": len(json_files) + len(txt_files),
193 |             "directory": str(results_dir),
194 |             "note": "Always use JSON files first."
195 |         }
196 |     except Exception as e:
197 |         logger.error(f"Error in list_result_files: {str(e)}", exc_info=True)
198 |         return {
199 |             "success": False,
200 |             "error": str(e),
201 |             "directory": str(results_dir)
202 |         }
203 | 
204 | @pubmearch.tool()
205 | async def analyze_research_keywords(filename: str, top_n: int = 20, include_trends: bool = True) -> Dict[str, Any]:
206 |     """Analyze research hotspots and trends in a PubMed results file according to keywords.
207 |     
208 |     Note: It is recommended to use JSON format files for better analysis results.
209 |     
210 |     Args:
211 |         filename: File name of results. (.json format is recommended)
212 |         top_n: Return the top n hot keywords.
213 |         include_trends: Boolean value to determine whether to include trends analysis. Default is True.
214 |     """
215 |     try:
216 |         filepath = os.path.join(results_dir, filename)
217 |         logger.info(f"Analyzing research keywords from file: {filepath}")
218 |         
219 |         # Check if the file exists
220 |         if not os.path.exists(filepath):
221 |             logger.error(f"File not found: {filepath}")
222 |             # JSON first
223 |             json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
224 |             txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
225 |             return {
226 |                 "success": False,
227 |                 "error": f"File not found: {filepath}",
228 |                 "available_files": {
229 |                     "recommended": json_files,
230 |                     "alternative": txt_files
231 |                 },
232 |                 "note": "Always use JSON files first."
233 |             }
234 |         
235 |         # Parse the result file
236 |         articles = analyzer.parse_results_file(filepath)
237 |         
238 |         if not articles:
239 |             logger.warning(f"No articles found in file: {filepath}")
240 |             return {
241 |                 "success": False,
242 |                 "error": "No articles found in the file."
243 |             }
244 |         
245 |         # Analyze keywords
246 |         analysis_results = analyzer.analyze_research_keywords(articles, top_n, include_trends)
247 |         
248 |         return {
249 |             "success": True,
250 |             "file_analyzed": filename,
251 |             "article_count": len(articles),
252 |             "keyword_analysis": analysis_results
253 |         }
254 |         
255 |     except Exception as e:
256 |         logger.error(f"Error in analyze_research_keywords: {str(e)}", exc_info=True)
257 |         return {
258 |             "success": False,
259 |             "error": str(e)
260 |         }
261 | 
262 | @pubmearch.tool()
263 | async def analyze_publication_count(filename: str, months_per_period: int = 3) -> Dict[str, Any]:
264 |     """Analyze publication counts over time from a PubMed results file.
265 |     
266 |     Note: It is recommended to use JSON format files for better analysis results.
267 |     
268 |     Args:
269 |         filename: File name of results. (.json format is recommended)
270 |         months_per_period: Number of months per analysis period
271 |     """
272 |     try:
273 |         filepath = os.path.join(results_dir, filename)
274 |         logger.info(f"Analyzing publication counts from file: {filepath}")
275 |         
276 |         # Check if the file exists
277 |         if not os.path.exists(filepath):
278 |             logger.error(f"File not found: {filepath}")
279 |             json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
280 |             txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
281 |             return {
282 |                 "success": False,
283 |                 "error": f"File not found: {filepath}",
284 |                 "available_files": {
285 |                     "recommended": json_files,
286 |                     "alternative": txt_files
287 |                 },
288 |                 "note": "Always use JSON files first."
289 |             }
290 |         
291 |         # Parse the result file
292 |         articles = analyzer.parse_results_file(filepath)
293 |         
294 |         if not articles:
295 |             logger.warning(f"No articles found in file: {filepath}")
296 |             return {
297 |                 "success": False,
298 |                 "error": "No articles found in the file."
299 |             }
300 |         
301 |         # Analyze publication counts
302 |         pub_counts = analyzer.analyze_publication_count(articles, months_per_period)
303 |         
304 |         return {
305 |             "success": True,
306 |             "file_analyzed": filename,
307 |             "article_count": len(articles),
308 |             "publication_counts": pub_counts
309 |         }
310 |         
311 |     except Exception as e:
312 |         logger.error(f"Error in analyze_publication_count: {str(e)}", exc_info=True)
313 |         return {
314 |             "success": False,
315 |             "error": str(e)
316 |         }
317 | 
318 | @pubmearch.tool()
319 | async def generate_comprehensive_analysis(
320 |     filename: str,
321 |     top_keywords: int = 20,
322 |     months_per_period: int = 3
323 | ) -> Dict[str, Any]:
324 |     """Generate a comprehensive analysis of a PubMed results file.
325 |     
326 |     Note: It is recommended to use JSON format files for better analysis results.
327 |     
328 |     Args:
329 |         filename: File name of results. (.json format is recommended)
330 |         top_keywords: Number of top keywords to analyze
331 |         months_per_period: Number of months per analysis period
332 |     """
333 |     try:
334 |         filepath = os.path.join(results_dir, filename)
335 |         logger.info(f"Generating comprehensive analysis from file: {filepath}")
336 |         
337 |         # Check if the file exists
338 |         if not os.path.exists(filepath):
339 |             logger.error(f"File not found: {filepath}")
340 |             json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
341 |             txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
342 |             return {
343 |                 "success": False,
344 |                 "error": f"File not found: {filepath}",
345 |                 "available_files": {
346 |                     "recommended": json_files,
347 |                     "alternative": txt_files
348 |                 },
349 |                 "note": "Always use JSON files first."
350 |             }
351 |         
352 |         # Generate comprehensive analysis directly
353 |         results = analyzer.generate_comprehensive_analysis(
354 |             filepath,
355 |             top_keywords=top_keywords,
356 |             months_per_period=months_per_period
357 |         )
358 |         
359 |         if "error" in results:
360 |             logger.error(f"Error in analysis: {results['error']}")
361 |             return {
362 |                 "success": False,
363 |                 "error": results["error"]
364 |             }
365 |         
366 |         logger.info("Comprehensive analysis completed successfully")
367 |         return {
368 |             "success": True,
369 |             "analysis": results
370 |         }
371 |         
372 |     except Exception as e:
373 |         logger.error(f"Error in generate_comprehensive_analysis: {str(e)}", exc_info=True)
374 |         return {
375 |             "success": False,
376 |             "error": str(e)
377 |         }
378 | 
379 | if __name__ == "__main__":
380 |     os.makedirs(results_dir, exist_ok=True)
381 |     pubmearch.run()
```
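
A minimal launch sketch, assuming the package is importable and the `mcp` Python SDK is installed; the email value is a placeholder:

```python
# Hedged sketch: NCBI_USER_EMAIL is a placeholder; set your own address before running.
import os

os.environ.setdefault("NCBI_USER_EMAIL", "you@example.org")   # required by search_pubmed
# os.environ.setdefault("NCBI_USER_API_KEY", "...")           # optional, raises NCBI rate limits

from pubmearch.server import pubmearch

if __name__ == "__main__":
    pubmearch.run()  # FastMCP serves the tools over stdio by default
```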

--------------------------------------------------------------------------------
/pubmearch/pubmed_searcher.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python
  2 | # -*- coding: utf-8 -*-
  3 | 
  4 | """
  5 | PubMed Searcher Module
  6 | 
  7 | This module provides functionality for searching PubMed and retrieving article data.
  8 | """
  9 | 
 10 | import os
 11 | import re
 12 | import time
 13 | import json
 14 | import logging
 15 | from datetime import datetime
 16 | from typing import List, Dict, Tuple, Optional, Any, Union
 17 | from Bio import Entrez
 18 | from pathlib import Path
 19 | 
 20 | 
 21 | # Configure logging
 22 | logger = logging.getLogger(__name__)
 23 | 
 24 | class PubMedSearcher:
 25 |     """Class to search PubMed and retrieve article data."""
 26 |     
 27 |     def __init__(self, email: Optional[str] = None, results_dir: Optional[str] = None, api_key: Optional[str] = None):
 28 |         """
 29 |         Initialize the PubMed searcher with an email address (or NCBI_USER_EMAIL from the environment).
 30 |         
 31 |         Args:
 32 |             email: Email address for Entrez. If None, use NCBI_USER_EMAIL from environment variables.
 33 |             results_dir: Optional custom results directory path
 34 |             api_key: API key for NCBI. If None, use NCBI_USER_API_KEY from environment variables.
 35 |         """
 36 |         # use NCBI_USER_EMAIL from .env if email is not provided
 37 |         self.email = email if email is not None else os.getenv('NCBI_USER_EMAIL')
 38 |         self.api_key = api_key if api_key is not None else os.getenv('NCBI_USER_API_KEY')
 39 |         if not self.email:
 40 |             raise ValueError("Email is required. Either pass it directly or set NCBI_USER_EMAIL in .env")
 41 |         
 42 |         # Set up Entrez
 43 |         Entrez.email = self.email
 44 |         Entrez.api_key = self.api_key
 45 |         
 46 |         # Use provided results directory or create default
 47 |         self.results_dir = Path(results_dir) if results_dir else Path(__file__).resolve().parent / "results"
 48 |         os.makedirs(self.results_dir, exist_ok=True)
 49 |         logger.info(f"Using results directory: {self.results_dir}")
 50 |     
 51 |     def search(self, 
 52 |               advanced_search: str, 
 53 |               date_range: Optional[Tuple[str, str]] = None,
 54 |               max_results: int = 1000) -> List[Dict[str, Any]]:
 55 |         """
 56 |         Search PubMed using advanced search syntax.
 57 |         
 58 |         Args:
 59 |             advanced_search: PubMed advanced search query
 60 |             date_range: Optional tuple of (start_date, end_date), 
 61 |                         date format is always YYYY/MM/DD
 62 |             max_results: Maximum number of results to retrieve
 63 |             
 64 |         Returns:
 65 |             List of article dictionaries
 66 |         """
 67 |         search_term = advanced_search
 68 |         
 69 |         # Add date range to query if provided
 70 |         # Note: The formats of start_date and end_date is always YYYY/MM/DD
 71 |         if date_range:
 72 |             start_date, end_date = date_range
 73 |             date_filter = ""
 74 |             
 75 |             # start_date
 76 |             if start_date:
 77 |                 date_filter += f" AND ('{start_date}'[Date - Publication]"
 78 |                 if end_date:
 79 |                     date_filter += f" : '{end_date}'[Date - Publication]"
 80 |                 date_filter += ")"
 81 |             # if only end_date is given, use 1900/01/01 as the start date for an inclusive range
 82 |             elif end_date:
 83 |                 date_filter += f" AND ('1900/01/01'[Date - Publication] : '{end_date}'[Date - Publication])"
 84 |             
 85 |             search_term += date_filter
 86 |         
 87 |         try:
 88 |             # Search PubMed
 89 |             logger.info(f"Searching PubMed with query: {search_term}")
 90 |             search_handle = Entrez.esearch(db="pubmed", term=search_term, retmax=max_results, usehistory="y")
 91 |             search_results = Entrez.read(search_handle)
 92 |             search_handle.close()
 93 |             
 94 |             webenv = search_results["WebEnv"]
 95 |             query_key = search_results["QueryKey"]
 96 |             
 97 |             # Get the count of results
 98 |             count = int(search_results["Count"])
 99 |             logger.info(f"Found {count} results, retrieving up to {max_results}")
100 |             
101 |             if count == 0:
102 |                 logger.warning("No results found")
103 |                 return []
104 |             
105 |             # Initialize an empty list to store articles
106 |             articles = []
107 |             
108 |             # Fetch results in batches to avoid timeouts
109 |             batch_size = 100
110 |             for start in range(0, min(count, max_results), batch_size):
111 |                 end = min(count, start + batch_size, max_results)
112 |                 logger.info(f"Retrieving records {start+1} to {end}")
113 |                 
114 |                 try:
115 |                     # Fetch the records
116 |                     fetch_handle = Entrez.efetch(
117 |                         db="pubmed", 
118 |                         retstart=start, 
119 |                         retmax=batch_size,
120 |                         webenv=webenv,
121 |                         query_key=query_key,
122 |                         retmode="xml"
123 |                     )
124 |                     
125 |                     # Parse the records
126 |                     records = Entrez.read(fetch_handle)["PubmedArticle"]
127 |                     fetch_handle.close()
128 |                     
129 |                     # Process each record
130 |                     for record in records:
131 |                         article = self._parse_pubmed_record(record)
132 |                         articles.append(article)
133 |                     
134 |                     # Sleep to avoid overloading the NCBI server
135 |                     time.sleep(1)
136 |                     
137 |                 except Exception as e:
138 |                     logger.error(f"Error fetching batch {start+1} to {end}: {str(e)}")
139 |                     continue
140 |             
141 |             return articles
142 |             
143 |         except Exception as e:
144 |             logger.error(f"Error searching PubMed: {str(e)}")
145 |             return []
146 |     
147 |     def _parse_pubmed_record(self, record: Dict) -> Dict[str, Any]:
148 |         """
149 |         Parse a PubMed record into a structured article dictionary.
150 |         
151 |         Args:
152 |             record: PubMed record from Entrez.read
153 |             
154 |         Returns:
155 |             Dictionary containing structured article data
156 |         """
157 |         article_data = {}
158 |         
159 |         # Get MedlineCitation and Article
160 |         medline_citation = record.get("MedlineCitation", {})
161 |         article = medline_citation.get("Article", {})
162 |         
163 |         # Extract basic article information
164 |         article_data["title"] = article.get("ArticleTitle", "")
165 |         
166 |         # Extract authors
167 |         authors = []
168 |         author_list = article.get("AuthorList", [])
169 |         for author in author_list:
170 |             if "LastName" in author and "ForeName" in author:
171 |                 authors.append(f"{author['LastName']} {author['ForeName']}")
172 |             elif "LastName" in author and "Initials" in author:
173 |                 authors.append(f"{author['LastName']} {author['Initials']}")
174 |             elif "LastName" in author:
175 |                 authors.append(author["LastName"])
176 |             elif "CollectiveName" in author:
177 |                 authors.append(author["CollectiveName"])
178 |         article_data["authors"] = authors
179 |         
180 |         # Extract journal information
181 |         journal = article.get("Journal", {})
182 |         article_data["journal"] = journal.get("Title", "")
183 |         
184 |         # Extract publication date
185 |         pub_date = {}
186 |         journal_issue = journal.get("JournalIssue", {})
187 |         if "PubDate" in journal_issue:
188 |             pub_date = journal_issue["PubDate"]
189 |         
190 |         pub_date_str = ""
191 |         if "Year" in pub_date:
192 |             pub_date_str = pub_date["Year"]
193 |             if "Month" in pub_date:
194 |                 pub_date_str += f" {pub_date['Month']}"
195 |                 if "Day" in pub_date:
196 |                     pub_date_str += f" {pub_date['Day']}"
197 |                     
198 |         article_data["publication_date"] = pub_date_str
199 |         
200 |         # Extract abstract
201 |         abstract_text = ""
202 |         if "Abstract" in article and "AbstractText" in article["Abstract"]:
203 |             # Handle different abstract formats
204 |             abstract_parts = article["Abstract"]["AbstractText"]
205 |             if isinstance(abstract_parts, list):
206 |                 for part in abstract_parts:
207 |                     if isinstance(part, str):
208 |                         abstract_text += part + " "
209 |                     elif isinstance(part, dict) and "#text" in part:
210 |                         label = part.get("Label", "")
211 |                         text = part["#text"]
212 |                         if label:
213 |                             abstract_text += f"{label}: {text} "
214 |                         else:
215 |                             abstract_text += text + " "
216 |             else:
217 |                 abstract_text = str(abstract_parts)
218 |         
219 |         article_data["abstract"] = abstract_text.strip()
220 |         
221 |         # Extract keywords
222 |         keywords = []
223 |         # MeSH headings
224 |         mesh_headings = medline_citation.get("MeshHeadingList", [])
225 |         for heading in mesh_headings:
226 |             if "DescriptorName" in heading:
227 |                 descriptor = heading["DescriptorName"]
228 |                 if isinstance(descriptor, dict) and "content" in descriptor:
229 |                     keywords.append(descriptor["content"])
230 |                 elif isinstance(descriptor, str):
231 |                     keywords.append(descriptor)
232 |         
233 |         # Keywords from KeywordList
234 |         keyword_lists = medline_citation.get("KeywordList", [])
235 |         for keyword_list in keyword_lists:
236 |             if isinstance(keyword_list, list):
237 |                 for keyword in keyword_list:
238 |                     if isinstance(keyword, str):
239 |                         keywords.append(keyword)
240 |                     elif isinstance(keyword, dict) and "content" in keyword:
241 |                         keywords.append(keyword["content"])
242 |         
243 |         article_data["keywords"] = keywords
244 |         
245 |         # Extract PMID
246 |         pmid = medline_citation.get("PMID", "")
247 |         if isinstance(pmid, dict) and "content" in pmid:
248 |             article_data["pmid"] = pmid["content"]
249 |         else:
250 |             article_data["pmid"] = str(pmid)
251 |         
252 |         # Extract DOI by iterating carefully over ArticleIdList
253 |         doi = ""
254 |         try:
255 |             pubmed_data = record.get("PubmedData")
256 |             if pubmed_data:
257 |                 article_id_list = pubmed_data.get("ArticleIdList")
258 |                 # Iterate through article_id_list if it exists and is iterable
259 |                 if article_id_list:
260 |                     try:
261 |                         for id_element in article_id_list:
262 |                             # Check if the element has attributes and the IdType is 'doi'
263 |                             # Handles Bio.Entrez.Parser.StringElement and similar objects
264 |                             if hasattr(id_element, 'attributes') and id_element.attributes.get('IdType') == 'doi':
265 |                                 doi = str(id_element).strip() # Get the string value
266 |                                 if doi: break # Found DOI, exit loop
267 |                             # Fallback check for plain dictionary structure (less common)
268 |                             elif isinstance(id_element, dict) and id_element.get('IdType') == 'doi':
269 |                                 doi = id_element.get('content', '').strip() or id_element.get('#text', '').strip()
270 |                                 if doi: break # Found DOI, exit loop
271 |                     except TypeError:
272 |                         # Handle cases where article_id_list might not be iterable (e.g., single element)
273 |                         # Check if the single element itself is the DOI
274 |                         if hasattr(article_id_list, 'attributes') and article_id_list.attributes.get('IdType') == 'doi':
275 |                             doi = str(article_id_list).strip()
276 | 
277 |         except Exception as e:
278 |             logger.warning(f"Error during DOI extraction for PMID {article_data.get('pmid', 'N/A')}: {e}")
279 |             doi = "" # Reset DOI on error
280 |         
281 |         article_data["doi"] = doi
282 |         
283 |         return article_data
284 |     
285 |     def export_to_txt(self, articles: List[Dict[str, Any]], filename: Optional[str] = None) -> str:
286 |         """
287 |         Export articles to a formatted text file.
288 |         
289 |         Args:
290 |             articles: List of article dictionaries
291 |             filename: Optional output filename
292 |             
293 |         Returns:
294 |             Path to the created file
295 |         """
296 |         if not filename:
297 |             timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
298 |             filename = f"pubmed_results_{timestamp}.txt"
299 |         
300 |         filepath = os.path.join(self.results_dir, filename)
301 |         
302 |         with open(filepath, 'w', encoding='utf-8') as f:
303 |             for i, article in enumerate(articles, 1):
304 |                 f.write(f"Article {i}\n")
305 |                 f.write("-" * 80 + "\n")
306 |                 f.write(f"Title: {article.get('title', '')}\n")
307 |                 f.write(f"Authors: {', '.join(article.get('authors', []))}\n")
308 |                 f.write(f"Journal: {article.get('journal', '')}\n")
309 |                 f.write(f"Publication Date: {article.get('publication_date', '')}\n")
310 |                 f.write(f"Abstract:\n{article.get('abstract', '')}\n")
311 |                 f.write(f"Keywords: {', '.join(article.get('keywords', []))}\n")
312 |                 f.write(f"PMID: {article.get('pmid', '')}\n")
313 |                 f.write(f"DOI: https://doi.org/{article.get('doi', '')}\n")
314 |                 f.write("=" * 80 + "\n\n")
315 |         
316 |         logger.info(f"Exported {len(articles)} articles to {filepath}")
317 |         return filepath
318 |     
319 |     def export_to_json(self, articles: List[Dict[str, Any]], filename: Optional[str] = None) -> str:
320 |         """
321 |         Export articles to JSON format file.
322 |         
323 |         Args:
324 |             articles: List of article dictionaries
325 |             filename: Optional output filename
326 |             
327 |         Returns:
328 |             Path to the created file
329 |         """
330 |         if not filename:
331 |             timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
332 |             filename = f"pubmed_results_{timestamp}.json"
333 |         
334 |         filepath = os.path.join(self.results_dir, filename)
335 |         
336 |         with open(filepath, 'w', encoding='utf-8') as f:
337 |             json.dump({
338 |                 "metadata": {
339 |                     "export_time": datetime.now().isoformat(),
340 |                     "article_count": len(articles)
341 |                 },
342 |                 "articles": articles
343 |             }, f, ensure_ascii=False, indent=2)
344 |         
345 |         logger.info(f"Exported {len(articles)} articles to {filepath}")
346 |         return filepath
347 | 
```
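
A minimal sketch of using `PubMedSearcher` directly; the query, date range, and email address below are illustrative only, and the email may instead come from `NCBI_USER_EMAIL` in the environment:

```python
# Hedged sketch: query, dates, and email address are illustrative placeholders.
from pubmearch.pubmed_searcher import PubMedSearcher

searcher = PubMedSearcher(email="you@example.org")

records = searcher.search(
    advanced_search='("machine learning"[Title/Abstract]) AND ("neoplasms"[MeSH Terms])',
    date_range=("2023/01/01", "2023/12/31"),  # always YYYY/MM/DD
    max_results=200,
)

json_path = searcher.export_to_json(records)  # timestamped file in the results directory
txt_path = searcher.export_to_txt(records)
print(f"Saved {len(records)} articles to {json_path} and {txt_path}")
```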