# Directory Structure ``` ├── .gitignore ├── LICENSE ├── pubmearch │ ├── __init__.py │ ├── analyzer.py │ ├── pubmed_searcher.py │ └── server.py ├── pyproject.toml └── README.md ``` # Files -------------------------------------------------------------------------------- /pubmearch/__init__.py: -------------------------------------------------------------------------------- ```python 1 | """ 2 | PubMed Analysis MCP Server Package 3 | """ ``` -------------------------------------------------------------------------------- /pubmearch/analyzer.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python 2 | # -*- coding: utf-8 -*- 3 | 4 | """ 5 | PubMed Analysis Module 6 | 7 | This module provides analysis functionality for PubMed search results, 8 | including research hotspots, trends, and publication statistics. 9 | """ 10 | 11 | import os 12 | import re 13 | import json 14 | from datetime import datetime 15 | from collections import Counter, defaultdict 16 | from typing import Dict, List, Optional, Tuple, Any, Union 17 | 18 | 19 | class PubMedAnalyzer: 20 | """Class to analyze PubMed search results from text files.""" 21 | 22 | def __init__(self, results_dir: str = "../results"): 23 | """ 24 | Initialize the PubMed analyzer. 25 | 26 | Args: 27 | results_dir: Directory containing PubMed search result text files 28 | """ 29 | self.results_dir = results_dir 30 | 31 | def parse_results_file(self, filepath: str) -> List[Dict[str, Any]]: 32 | """ 33 | Parse a PubMed results file (txt or json) into structured data. 34 | 35 | Args: 36 | filepath: Path to the results file 37 | 38 | Returns: 39 | List of dictionaries containing structured article data 40 | """ 41 | if not os.path.exists(filepath): 42 | raise FileNotFoundError(f"File not found: {filepath}") 43 | 44 | # Choose parsing method based on file extension 45 | if filepath.endswith('.json'): 46 | return self._parse_json_file(filepath) 47 | else: 48 | return self._parse_txt_file(filepath) 49 | 50 | def _parse_json_file(self, filepath: str) -> List[Dict[str, Any]]: 51 | """Parse a JSON results file.""" 52 | with open(filepath, 'r', encoding='utf-8') as f: 53 | data = json.load(f) 54 | return data.get("articles", []) 55 | 56 | def _parse_txt_file(self, filepath: str) -> List[Dict[str, Any]]: 57 | """Parse a text results file.""" 58 | articles = [] 59 | current_article = None 60 | section = None 61 | 62 | with open(filepath, 'r', encoding='utf-8') as f: 63 | lines = f.readlines() 64 | 65 | i = 0 66 | while i < len(lines): 67 | line = lines[i].strip() 68 | 69 | # New article marker 70 | if line.startswith("Article ") and "-" * 10 in lines[i+1]: 71 | if current_article: 72 | articles.append(current_article) 73 | current_article = { 74 | "title": "", 75 | "authors": [], 76 | "journal": "", 77 | "publication_date": "", 78 | "abstract": "", 79 | "keywords": [], 80 | "pmid": "", 81 | "doi": "" 82 | } 83 | section = None 84 | i += 2 # Skip the separator line 85 | 86 | # Section headers 87 | elif line.startswith("Title: "): 88 | current_article["title"] = line[7:].strip() 89 | section = "title" 90 | elif line.startswith("Authors: "): 91 | authors_line = line[9:].strip() 92 | if authors_line != "N/A": 93 | current_article["authors"] = [a.strip() for a in authors_line.split(",")] 94 | section = None 95 | elif line.startswith("Journal: "): 96 | current_article["journal"] = line[9:].strip() 97 | section = None 98 | elif line.startswith("Publication Date: "): 99 | current_article["publication_date"] = 
line[18:].strip() 100 | section = None 101 | elif line == "Abstract:": 102 | section = "abstract" 103 | elif line.startswith("Keywords: "): 104 | keywords_line = line[10:].strip() 105 | current_article["keywords"] = [k.strip() for k in keywords_line.split(",")] 106 | section = None 107 | elif line.startswith("PMID: "): 108 | current_article["pmid"] = line[6:].strip() 109 | section = None 110 | elif line.startswith("DOI: "): 111 | current_article["doi"] = line[5:].strip() 112 | section = None 113 | elif line.startswith("=" * 20): 114 | section = None 115 | 116 | # Content sections 117 | elif section == "abstract" and line and not line.startswith("Keywords: "): 118 | current_article["abstract"] += line + " " 119 | 120 | i += 1 121 | 122 | # Add the last article 123 | if current_article: 124 | articles.append(current_article) 125 | 126 | return articles 127 | 128 | def extract_publication_dates(self, articles: List[Dict[str, Any]]) -> List[Tuple[str, datetime]]: 129 | """ 130 | Extract and parse publication dates from articles. 131 | 132 | Args: 133 | articles: List of article dictionaries 134 | 135 | Returns: 136 | List of tuples containing (article_title, publication_date) 137 | """ 138 | publication_dates = [] 139 | 140 | for article in articles: 141 | date_str = article.get("publication_date", "") 142 | 143 | # Try different formats 144 | parsed_date = None 145 | 146 | # Format: YYYY MMM 147 | if re.match(r"^\d{4} [A-Za-z]{3}$", date_str): 148 | try: 149 | parsed_date = datetime.strptime(date_str, "%Y %b") 150 | except ValueError: 151 | pass 152 | 153 | # Format: YYYY MMM DD 154 | elif re.match(r"^\d{4} [A-Za-z]{3} \d{1,2}$", date_str): 155 | try: 156 | parsed_date = datetime.strptime(date_str, "%Y %b %d") 157 | except ValueError: 158 | pass 159 | 160 | # Format: YYYY MMM-MMM 161 | elif re.match(r"^\d{4} [A-Za-z]{3}-[A-Za-z]{3}$", date_str): 162 | try: 163 | # Just use the first month 164 | month_part = date_str.split(" ")[1].split("-")[0] 165 | parsed_date = datetime.strptime(f"{date_str.split(' ')[0]} {month_part}", "%Y %b") 166 | except (ValueError, IndexError): 167 | pass 168 | 169 | # Format: YYYY 170 | elif re.match(r"^\d{4}$", date_str): 171 | try: 172 | parsed_date = datetime.strptime(date_str, "%Y") 173 | except ValueError: 174 | pass 175 | 176 | if parsed_date: 177 | publication_dates.append((article.get("title", ""), parsed_date)) 178 | 179 | return publication_dates 180 | 181 | def analyze_research_keywords(self, articles: List[Dict[str, Any]], top_n: int = 20, include_trends: bool = True) -> Dict[str, Any]: 182 | """ 183 | Analyze research hotspots and trends based on keyword frequencies. 184 | 185 | Args: 186 | articles: List of article dictionaries 187 | top_n: Number of top keywords to include 188 | include_trends: Bool indicating whether to include trend analysis, default True. 
189 | 
190 |         Returns:
191 |             Dictionary with analysis results
192 |         """
193 |         # Extract all keywords
194 |         all_keywords = []
195 |         for article in articles:
196 |             all_keywords.extend(article.get("keywords", []))
197 | 
198 |         # Count keyword frequencies
199 |         keyword_counts = Counter(all_keywords)
200 | 
201 |         # Get top keywords
202 |         top_keywords = keyword_counts.most_common(top_n)
203 | 
204 |         # Organize articles by keyword
205 |         keyword_articles = defaultdict(list)
206 |         for article in articles:
207 |             article_keywords = article.get("keywords", [])
208 |             for kw in article_keywords:
209 |                 if kw in dict(top_keywords):
210 |                     keyword_articles[kw].append({
211 |                         "title": article.get("title", ""),
212 |                         "authors": article.get("authors", []),
213 |                         "journal": article.get("journal", ""),
214 |                         "publication_date": article.get("publication_date", ""),
215 |                         "pmid": article.get("pmid", ""),
216 |                         "doi": article.get("doi", "")
217 |                     })
218 | 
219 |         # Prepare results
220 |         results = {
221 |             "top_keywords": [{"keyword": kw, "count": count} for kw, count in top_keywords],
222 |             "keyword_articles": {kw: articles for kw, articles in keyword_articles.items()}
223 |         }
224 | 
225 |         # If trend analysis is requested
226 |         if include_trends:
227 |             # Extract publication dates
228 |             pub_dates = self.extract_publication_dates(articles)
229 | 
230 |             # Group keyword counts by month
231 |             monthly_keyword_counts = defaultdict(lambda: defaultdict(int))
232 | 
233 |             for article in articles:
234 |                 date_str = article.get("publication_date", "")
235 |                 article_keywords = article.get("keywords", [])
236 | 
237 |                 # Try to parse the publication date
238 |                 parsed_date = None
239 |                 for title, date in pub_dates:
240 |                     if title == article.get("title", ""):
241 |                         parsed_date = date
242 |                         break
243 | 
244 |                 if parsed_date:
245 |                     month_key = parsed_date.strftime("%Y-%m")
246 |                     for kw in article_keywords:
247 |                         if kw in dict(top_keywords):
248 |                             monthly_keyword_counts[month_key][kw] += 1
249 | 
250 |             # Sort month keys chronologically
251 |             sorted_months = sorted(monthly_keyword_counts.keys())
252 | 
253 |             # Prepare trend data
254 |             trend_data = {
255 |                 "months": sorted_months,
256 |                 "keywords": [kw for kw, _ in top_keywords],
257 |                 "counts": []
258 |             }
259 | 
260 |             for keyword, _ in top_keywords:
261 |                 keyword_trend = []
262 |                 for month in sorted_months:
263 |                     keyword_trend.append(monthly_keyword_counts[month][keyword])
264 |                 trend_data["counts"].append({
265 |                     "keyword": keyword,
266 |                     "monthly_counts": keyword_trend
267 |                 })
268 | 
269 |             results["trends"] = trend_data
270 | 
271 |         return results
272 | 
273 |     def analyze_publication_count(self, articles: List[Dict[str, Any]], months_per_period: int = 3) -> Dict[str, Any]:
274 |         """
275 |         Analyze publication counts over time. 
276 | 
277 |         Args:
278 |             articles: List of article dictionaries
279 |             months_per_period: Number of months to group by
280 | 
281 |         Returns:
282 |             Dictionary with publication count analysis
283 |         """
284 |         # Extract publication dates
285 |         pub_dates = self.extract_publication_dates(articles)
286 | 
287 |         # Group by period
288 |         period_counts = defaultdict(int)
289 | 
290 |         for _, date in pub_dates:
291 |             # Calculate period key based on months_per_period
292 |             year = date.year
293 |             month = date.month
294 |             period = (month - 1) // months_per_period
295 |             period_key = f"{year}-P{period+1}"  # 1-indexed periods
296 | 
297 |             period_counts[period_key] += 1
298 | 
299 |         # Sort periods chronologically
300 |         sorted_periods = sorted(period_counts.keys())
301 | 
302 |         # Prepare result
303 |         results = {
304 |             "periods": sorted_periods,
305 |             "counts": [period_counts[period] for period in sorted_periods],
306 |             "months_per_period": months_per_period,
307 |             "total_publications": len(pub_dates)
308 |         }
309 | 
310 |         return results
311 | 
312 |     def generate_comprehensive_analysis(self, filepath: str, top_keywords: int = 20,
313 |                                         months_per_period: int = 3) -> Dict[str, Any]:
314 |         """
315 |         Generate a comprehensive analysis of PubMed results from a file.
316 | 
317 |         Args:
318 |             filepath: Path to the results file (.json or .txt)
319 |             top_keywords: Number of top keywords for hotspot analysis
320 |             months_per_period: Number of months per period for publication count
321 | 
322 |         Returns:
323 |             Dictionary with comprehensive analysis results
324 |         """
325 |         try:
326 |             articles = self.parse_results_file(filepath)
327 | 
328 |             if not articles:
329 |                 return {"error": "No articles found in the file."}
330 | 
331 |             # Generate analysis components
332 |             keyword_analysis = self.analyze_research_keywords(articles, top_keywords)
333 |             pub_counts = self.analyze_publication_count(articles, months_per_period)
334 | 
335 |             # Combine results
336 |             results = {
337 |                 "file_analyzed": os.path.basename(filepath),
338 |                 "analysis_timestamp": datetime.now().isoformat(),
339 |                 "article_count": len(articles),
340 |                 "keyword_analysis": keyword_analysis,
341 |                 "publication_counts": pub_counts
342 |             }
343 | 
344 |             return results
345 | 
346 |         except Exception as e:
347 |             return {"error": str(e)}
348 | 
349 |     def list_result_files(self) -> List[str]:
350 |         """
351 |         List all result files in the results directory.
352 | 
353 |         Returns:
354 |             List of filenames
355 |         """
356 |         if not os.path.exists(self.results_dir):
357 |             return []
358 | 
359 |         return [f for f in os.listdir(self.results_dir) if f.endswith(('.txt', '.json'))]
360 | 
```

--------------------------------------------------------------------------------
/pubmearch/server.py:
--------------------------------------------------------------------------------

```python
1 | #!/usr/bin/env python
2 | # -*- coding: utf-8 -*-
3 | 
4 | """
5 | PubMed Analysis MCP Server
6 | 
7 | This module implements an MCP server for analyzing PubMed search results,
8 | providing tools to identify research hotspots, trends, and publication statistics.
9 | 
10 | Note:
11 |     - Firstly, always use the search_pubmed tool to generate new results.
12 |     - Secondly, for results analysis, always use JSON format files. 
13 | """ 14 | 15 | import os 16 | import sys 17 | import subprocess 18 | import json 19 | import logging 20 | import re 21 | from datetime import datetime 22 | from pathlib import Path 23 | from typing import Dict, List, Optional, Any, Union 24 | 25 | # Add parent directory to path to import PubMedSearcher from parent 26 | parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) 27 | sys.path.append(parent_dir) 28 | from .pubmed_searcher import PubMedSearcher 29 | from .analyzer import PubMedAnalyzer 30 | 31 | # Import FastMCP 32 | from mcp.server.fastmcp import FastMCP, Context 33 | 34 | # Configure logging 35 | logging.basicConfig( 36 | level=logging.INFO, 37 | format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", 38 | handlers=[ 39 | logging.FileHandler(os.path.join(parent_dir, "pubmed_server.log")), 40 | logging.StreamHandler() 41 | ] 42 | ) 43 | logger = logging.getLogger("pubmed-mcp-server") 44 | 45 | # make sure results directory exists 46 | now = datetime.now() 47 | time_string = now.strftime("%Y%m%d%H%M%S") 48 | results_dir = Path(__file__).resolve().parent / "results" 49 | os.makedirs(results_dir, exist_ok=True) 50 | logger.info(f"Results directory: {results_dir}") 51 | 52 | # Initialize analyzer 53 | analyzer = PubMedAnalyzer(results_dir=results_dir) 54 | 55 | # Initialize MCP server 56 | pubmearch = FastMCP( 57 | "PubMed Analyzer", 58 | description="MCP server for analyzing PubMed search results" 59 | ) 60 | 61 | @pubmearch.tool() 62 | async def search_pubmed( 63 | advanced_search: str, 64 | start_date: Optional[str] = None, 65 | end_date: Optional[str] = None, 66 | max_results: int = 1000, 67 | output_filename: Optional[str] = None, 68 | ) -> Dict[str, Any]: 69 | try: 70 | logger.info(f"Starting PubMed search with query: {advanced_search}") 71 | NCBI_USER_EMAIL = os.getenv('NCBI_USER_EMAIL') 72 | NCBI_USER_API_KEY = os.getenv('NCBI_USER_API_KEY') 73 | 74 | if not NCBI_USER_EMAIL: 75 | logger.error("Email not provided and NCBI_USER_EMAIL environment variable not set") 76 | return { 77 | "success": False, 78 | "error": "Server configuration error: NCBI User Email is not set." 79 | } 80 | logger.info(f"Use email address: {NCBI_USER_EMAIL}") 81 | 82 | if NCBI_USER_API_KEY: 83 | logger.info(f"Using API key from environment.") 84 | else: 85 | logger.warning(f"NCBI_USER_API_KEY environment variable not found. Proceeding without API key.") 86 | 87 | searcher = PubMedSearcher(email = NCBI_USER_EMAIL, api_key = NCBI_USER_API_KEY) 88 | 89 | # Create date range if dates are provided 90 | # Note: The formats of start_date and end_date is always YYYY/MM/DD 91 | date_range = None 92 | if start_date or end_date: 93 | # Validate date formats 94 | date_pattern = re.compile(r'^\d{4}/\d{2}/\d{2}$') 95 | if start_date and not date_pattern.match(start_date): 96 | raise ValueError(f"Invalid start_date format: {start_date}. Must be YYYY/MM/DD") 97 | if end_date and not date_pattern.match(end_date): 98 | raise ValueError(f"Invalid end_date format: {end_date}. Must be YYYY/MM/DD") 99 | 100 | date_range = (start_date, end_date) if start_date and end_date else None 101 | 102 | # Perform search 103 | records = searcher.search( 104 | advanced_search=advanced_search, 105 | date_range=date_range, 106 | max_results=max_results 107 | ) 108 | 109 | if not records: 110 | logger.warning("No results found for the search criteria") 111 | return { 112 | "success": False, 113 | "error": "No results found for the given criteria." 
114 |             }
115 | 
116 |         # Export both TXT and JSON formats
117 |         if not output_filename:
118 |             base_filename = f"pubmed_results_{time_string}"
119 |             json_filename = f"{base_filename}.json"
120 |             txt_filename = f"{base_filename}.txt"
121 |         else:
122 |             # Remove any existing extension
123 |             base_filename = output_filename.rsplit('.', 1)[0] + f"_{time_string}"
124 |             json_filename = f"{base_filename}.json"
125 |             txt_filename = f"{base_filename}.txt"
126 | 
127 |         # Export both formats
128 |         json_path = os.path.abspath(searcher.export_to_json(records, json_filename))
129 |         txt_path = os.path.abspath(searcher.export_to_txt(records, txt_filename))
130 | 
131 |         # Verify if files were saved successfully
132 |         if not os.path.exists(json_path):
133 |             logger.error(f"Failed to create JSON file at {json_path}")
134 |             return {
135 |                 "success": False,
136 |                 "error": "Failed to save JSON results file."
137 |             }
138 | 
139 |         logger.info(f"Successfully saved {len(records)} articles to JSON: {json_path}")
140 | 
141 |         return {
142 |             "success": True,
143 |             "message": f"Search completed successfully. Found {len(records)} articles.",
144 |             "json_file": os.path.basename(json_path),
145 |             "txt_file": os.path.basename(txt_path),
146 |             "note": "JSON files are recommended for AI model analysis.",
147 |             "article_count": len(records)
148 |         }
149 | 
150 |     except ValueError as ve:
151 |         logger.error(f"ValueError in search_pubmed: {str(ve)}", exc_info=True)
152 |         return {"success": False, "error": str(ve)}
153 |     except Exception as e:
154 |         logger.error(f"Error in search_pubmed: {str(e)}", exc_info=True)
155 |         return {
156 |             "success": False,
157 |             "error": f"Error during search: {str(e)}"
158 |         }
159 | 
160 | @pubmearch.tool()
161 | async def list_result_files() -> Dict[str, Any]:
162 |     """Lists all available PubMed result files.
163 | 
164 |     Two types of files are returned:
165 |     - JSON files (recommended): structured data, suitable for AI model analysis
166 |     - TXT files (alternative): plain text format, for backward compatibility
167 |     """
168 |     try:
169 |         logger.info(f"Listing result files in: {results_dir}")
170 | 
171 |         if not os.path.exists(results_dir):
172 |             logger.warning(f"Results directory does not exist: {results_dir}")
173 |             os.makedirs(results_dir, exist_ok=True)
174 |             logger.info(f"Created results directory: {results_dir}")
175 |             return {
176 |                 "success": True,
177 |                 "files": [],
178 |                 "count": 0,
179 |                 "directory": str(results_dir)
180 |             }
181 | 
182 |         # Get JSON and TXT files separately
183 |         json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
184 |         txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
185 | 
186 |         return {
187 |             "success": True,
188 |             "files": {
189 |                 "recommended": json_files,  # JSON files (recommended)
190 |                 "alternative": txt_files    # TXT files (alternative)
191 |             },
192 |             "count": len(json_files) + len(txt_files),
193 |             "directory": str(results_dir),
194 |             "note": "Always use JSON files first."
195 |         }
196 |     except Exception as e:
197 |         logger.error(f"Error in list_result_files: {str(e)}", exc_info=True)
198 |         return {
199 |             "success": False,
200 |             "error": str(e),
201 |             "directory": str(results_dir)
202 |         }
203 | 
204 | @pubmearch.tool()
205 | async def analyze_research_keywords(filename: str, top_n: int = 20, include_trends: bool = True) -> Dict[str, Any]:
206 |     """Analyze the research hotspots and trends in PubMed result files according to keywords. 
207 | 208 | Note: It is recommended to use JSON format files for better analysis results. 209 | 210 | Args: 211 | filename: File name of results. (.json format is recommended) 212 | top_n: Return the top n hot keywords. 213 | include_trends: Boolean value to determine whether to include trends analysis. Default is True. 214 | """ 215 | try: 216 | filepath = os.path.join(results_dir, filename) 217 | logger.info(f"Analyzing research keywords from file: {filepath}") 218 | 219 | # Check if the file exists 220 | if not os.path.exists(filepath): 221 | logger.error(f"File not found: {filepath}") 222 | # JSON first 223 | json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')] 224 | txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')] 225 | return { 226 | "success": False, 227 | "error": f"File not found: {filepath}", 228 | "available_files": { 229 | "recommended": json_files, 230 | "alternative": txt_files 231 | }, 232 | "note": "Always use JSON files first." 233 | } 234 | 235 | # Parse the result file 236 | articles = analyzer.parse_results_file(filepath) 237 | 238 | if not articles: 239 | logger.warning(f"No articles found in file: {filepath}") 240 | return { 241 | "success": False, 242 | "error": "No articles found in the file." 243 | } 244 | 245 | # Analyze keywords 246 | analysis_results = analyzer.analyze_research_keywords(articles, top_n, include_trends) 247 | 248 | return { 249 | "success": True, 250 | "file_analyzed": filename, 251 | "article_count": len(articles), 252 | "keyword_analysis": analysis_results 253 | } 254 | 255 | except Exception as e: 256 | logger.error(f"Error in analyze_research_keywords: {str(e)}", exc_info=True) 257 | return { 258 | "success": False, 259 | "error": str(e) 260 | } 261 | 262 | @pubmearch.tool() 263 | async def analyze_publication_count(filename: str, months_per_period: int = 3) -> Dict[str, Any]: 264 | """Analyze publication counts over time from a PubMed results file. 265 | 266 | Note: It is recommended to use JSON format files for better analysis results. 267 | 268 | Args: 269 | filename: File name of results. (.json format is recommended) 270 | months_per_period: Number of months per analysis period 271 | """ 272 | try: 273 | filepath = os.path.join(results_dir, filename) 274 | logger.info(f"Analyzing publication counts from file: {filepath}") 275 | 276 | # Check if the file exists 277 | if not os.path.exists(filepath): 278 | logger.error(f"File not found: {filepath}") 279 | json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')] 280 | txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')] 281 | return { 282 | "success": False, 283 | "error": f"File not found: {filepath}", 284 | "available_files": { 285 | "recommended": json_files, 286 | "alternative": txt_files 287 | }, 288 | "note": "Always use JSON files first." 289 | } 290 | 291 | # Parse the result file 292 | articles = analyzer.parse_results_file(filepath) 293 | 294 | if not articles: 295 | logger.warning(f"No articles found in file: {filepath}") 296 | return { 297 | "success": False, 298 | "error": "No articles found in the file." 
299 | } 300 | 301 | # Analyze publication counts 302 | pub_counts = analyzer.analyze_publication_count(articles, months_per_period) 303 | 304 | return { 305 | "success": True, 306 | "file_analyzed": filename, 307 | "article_count": len(articles), 308 | "publication_counts": pub_counts 309 | } 310 | 311 | except Exception as e: 312 | logger.error(f"Error in analyze_publication_count: {str(e)}", exc_info=True) 313 | return { 314 | "success": False, 315 | "error": str(e) 316 | } 317 | 318 | @pubmearch.tool() 319 | async def generate_comprehensive_analysis( 320 | filename: str, 321 | top_keywords: int = 20, 322 | months_per_period: int = 3 323 | ) -> Dict[str, Any]: 324 | """Generate a comprehensive analysis of a PubMed results file. 325 | 326 | Note: It is recommended to use JSON format files for better analysis results. 327 | 328 | Args: 329 | filename: File name of results. (.json format is recommended) 330 | top_keywords: Number of top keywords to analyze 331 | months_per_period: Number of months per analysis period 332 | """ 333 | try: 334 | filepath = os.path.join(results_dir, filename) 335 | logger.info(f"Generating comprehensive analysis from file: {filepath}") 336 | 337 | # Check if the file exists 338 | if not os.path.exists(filepath): 339 | logger.error(f"File not found: {filepath}") 340 | json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')] 341 | txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')] 342 | return { 343 | "success": False, 344 | "error": f"File not found: {filepath}", 345 | "available_files": { 346 | "recommended": json_files, 347 | "alternative": txt_files 348 | }, 349 | "note": "Always use JSON files first." 350 | } 351 | 352 | # Generate comprehensive analysis directly 353 | results = analyzer.generate_comprehensive_analysis( 354 | filepath, 355 | top_keywords=top_keywords, 356 | months_per_period=months_per_period 357 | ) 358 | 359 | if "error" in results: 360 | logger.error(f"Error in analysis: {results['error']}") 361 | return { 362 | "success": False, 363 | "error": results["error"] 364 | } 365 | 366 | logger.info("Comprehensive analysis completed successfully") 367 | return { 368 | "success": True, 369 | "analysis": results 370 | } 371 | 372 | except Exception as e: 373 | logger.error(f"Error in generate_comprehensive_analysis: {str(e)}", exc_info=True) 374 | return { 375 | "success": False, 376 | "error": str(e) 377 | } 378 | 379 | if __name__ == "__main__": 380 | os.makedirs(results_dir, exist_ok=True) 381 | pubmearch.run() ``` -------------------------------------------------------------------------------- /pubmearch/pubmed_searcher.py: -------------------------------------------------------------------------------- ```python 1 | #!/usr/bin/env python 2 | # -*- coding: utf-8 -*- 3 | 4 | """ 5 | PubMed Searcher Module 6 | 7 | This module provides functionality for searching PubMed and retrieving article data. 8 | """ 9 | 10 | import os 11 | import re 12 | import time 13 | import json 14 | import logging 15 | from datetime import datetime 16 | from typing import List, Dict, Tuple, Optional, Any, Union 17 | from Bio import Entrez 18 | from pathlib import Path 19 | 20 | 21 | # Configure logging 22 | logger = logging.getLogger(__name__) 23 | 24 | class PubMedSearcher: 25 | """Class to search PubMed and retrieve article data.""" 26 | 27 | def __init__(self, email: Optional[str] = None, results_dir: Optional[str] = None, api_key: Optional[str] = None): 28 | """ 29 | Initialize PubMed searcher with email address in .env. 
30 | 
31 |         Args:
32 |             email: Email address for Entrez. If None, use NCBI_USER_EMAIL from environment variables.
33 |             results_dir: Optional custom results directory path
34 |             api_key: API key for NCBI. If None, use NCBI_USER_API_KEY from environment variables.
35 |         """
36 |         # use NCBI_USER_EMAIL from .env if email is not provided
37 |         self.email = email if email is not None else os.getenv('NCBI_USER_EMAIL')
38 |         self.api_key = api_key if api_key is not None else os.getenv('NCBI_USER_API_KEY')
39 |         if not self.email:
40 |             raise ValueError("Email is required. Either pass it directly or set NCBI_USER_EMAIL in .env")
41 | 
42 |         # Set up Entrez
43 |         Entrez.email = self.email
44 |         Entrez.api_key = self.api_key
45 | 
46 |         # Use provided results directory or create default
47 |         self.results_dir = Path(results_dir) if results_dir else Path(__file__).resolve().parent / "results"
48 |         os.makedirs(self.results_dir, exist_ok=True)
49 |         logger.info(f"Using results directory: {self.results_dir}")
50 | 
51 |     def search(self,
52 |                advanced_search: str,
53 |                date_range: Optional[Tuple[str, str]] = None,
54 |                max_results: int = 1000) -> List[Dict[str, Any]]:
55 |         """
56 |         Search PubMed using advanced search syntax.
57 | 
58 |         Args:
59 |             advanced_search: PubMed advanced search query
60 |             date_range: Optional tuple of (start_date, end_date),
61 |                         date format is always YYYY/MM/DD
62 |             max_results: Maximum number of results to retrieve
63 | 
64 |         Returns:
65 |             List of article dictionaries
66 |         """
67 |         search_term = advanced_search
68 | 
69 |         # Add date range to query if provided
70 |         # Note: The format of start_date and end_date is always YYYY/MM/DD
71 |         if date_range:
72 |             start_date, end_date = date_range
73 |             date_filter = ""
74 | 
75 |             # start_date provided (optionally with an end_date)
76 |             if start_date:
77 |                 date_filter += f" AND ('{start_date}'[Date - Publication]"
78 |                 if end_date:
79 |                     date_filter += f" : '{end_date}'[Date - Publication]"
80 |                 date_filter += ")"
81 |             # if only end_date, set start_date to 1900/01/01 for inclusion
82 |             elif end_date:
83 |                 date_filter += f" AND ('1900/01/01'[Date - Publication] : '{end_date}'[Date - Publication])"
84 | 
85 |             search_term += date_filter
86 | 
87 |         try:
88 |             # Search PubMed
89 |             logger.info(f"Searching PubMed with query: {search_term}")
90 |             search_handle = Entrez.esearch(db="pubmed", term=search_term, retmax=max_results, usehistory="y")
91 |             search_results = Entrez.read(search_handle)
92 |             search_handle.close()
93 | 
94 |             webenv = search_results["WebEnv"]
95 |             query_key = search_results["QueryKey"]
96 | 
97 |             # Get the count of results
98 |             count = int(search_results["Count"])
99 |             logger.info(f"Found {count} results, retrieving up to {max_results}")
100 | 
101 |             if count == 0:
102 |                 logger.warning("No results found")
103 |                 return []
104 | 
105 |             # Initialize an empty list to store articles
106 |             articles = []
107 | 
108 |             # Fetch results in batches to avoid timeouts
109 |             batch_size = 100
110 |             for start in range(0, min(count, max_results), batch_size):
111 |                 end = min(count, start + batch_size, max_results)
112 |                 logger.info(f"Retrieving records {start+1} to {end}")
113 | 
114 |                 try:
115 |                     # Fetch the records
116 |                     fetch_handle = Entrez.efetch(
117 |                         db="pubmed",
118 |                         retstart=start,
119 |                         retmax=batch_size,
120 |                         webenv=webenv,
121 |                         query_key=query_key,
122 |                         retmode="xml"
123 |                     )
124 | 
125 |                     # Parse the records
126 |                     records = Entrez.read(fetch_handle)["PubmedArticle"]
127 |                     fetch_handle.close()
128 | 
129 |                     # Process each record
130 |                     for record in records:
131 |                         article = 
self._parse_pubmed_record(record) 132 | articles.append(article) 133 | 134 | # Sleep to avoid overloading the NCBI server 135 | time.sleep(1) 136 | 137 | except Exception as e: 138 | logger.error(f"Error fetching batch {start+1} to {end}: {str(e)}") 139 | continue 140 | 141 | return articles 142 | 143 | except Exception as e: 144 | logger.error(f"Error searching PubMed: {str(e)}") 145 | return [] 146 | 147 | def _parse_pubmed_record(self, record: Dict) -> Dict[str, Any]: 148 | """ 149 | Parse a PubMed record into a structured article dictionary. 150 | 151 | Args: 152 | record: PubMed record from Entrez.read 153 | 154 | Returns: 155 | Dictionary containing structured article data 156 | """ 157 | article_data = {} 158 | 159 | # Get MedlineCitation and Article 160 | medline_citation = record.get("MedlineCitation", {}) 161 | article = medline_citation.get("Article", {}) 162 | 163 | # Extract basic article information 164 | article_data["title"] = article.get("ArticleTitle", "") 165 | 166 | # Extract authors 167 | authors = [] 168 | author_list = article.get("AuthorList", []) 169 | for author in author_list: 170 | if "LastName" in author and "ForeName" in author: 171 | authors.append(f"{author['LastName']} {author['ForeName']}") 172 | elif "LastName" in author and "Initials" in author: 173 | authors.append(f"{author['LastName']} {author['Initials']}") 174 | elif "LastName" in author: 175 | authors.append(author["LastName"]) 176 | elif "CollectiveName" in author: 177 | authors.append(author["CollectiveName"]) 178 | article_data["authors"] = authors 179 | 180 | # Extract journal information 181 | journal = article.get("Journal", {}) 182 | article_data["journal"] = journal.get("Title", "") 183 | 184 | # Extract publication date 185 | pub_date = {} 186 | journal_issue = journal.get("JournalIssue", {}) 187 | if "PubDate" in journal_issue: 188 | pub_date = journal_issue["PubDate"] 189 | 190 | pub_date_str = "" 191 | if "Year" in pub_date: 192 | pub_date_str = pub_date["Year"] 193 | if "Month" in pub_date: 194 | pub_date_str += f" {pub_date['Month']}" 195 | if "Day" in pub_date: 196 | pub_date_str += f" {pub_date['Day']}" 197 | 198 | article_data["publication_date"] = pub_date_str 199 | 200 | # Extract abstract 201 | abstract_text = "" 202 | if "Abstract" in article and "AbstractText" in article["Abstract"]: 203 | # Handle different abstract formats 204 | abstract_parts = article["Abstract"]["AbstractText"] 205 | if isinstance(abstract_parts, list): 206 | for part in abstract_parts: 207 | if isinstance(part, str): 208 | abstract_text += part + " " 209 | elif isinstance(part, dict) and "#text" in part: 210 | label = part.get("Label", "") 211 | text = part["#text"] 212 | if label: 213 | abstract_text += f"{label}: {text} " 214 | else: 215 | abstract_text += text + " " 216 | else: 217 | abstract_text = str(abstract_parts) 218 | 219 | article_data["abstract"] = abstract_text.strip() 220 | 221 | # Extract keywords 222 | keywords = [] 223 | # MeSH headings 224 | mesh_headings = medline_citation.get("MeshHeadingList", []) 225 | for heading in mesh_headings: 226 | if "DescriptorName" in heading: 227 | descriptor = heading["DescriptorName"] 228 | if isinstance(descriptor, dict) and "content" in descriptor: 229 | keywords.append(descriptor["content"]) 230 | elif isinstance(descriptor, str): 231 | keywords.append(descriptor) 232 | 233 | # Keywords from KeywordList 234 | keyword_lists = medline_citation.get("KeywordList", []) 235 | for keyword_list in keyword_lists: 236 | if isinstance(keyword_list, list): 237 | 
for keyword in keyword_list: 238 | if isinstance(keyword, str): 239 | keywords.append(keyword) 240 | elif isinstance(keyword, dict) and "content" in keyword: 241 | keywords.append(keyword["content"]) 242 | 243 | article_data["keywords"] = keywords 244 | 245 | # Extract PMID 246 | pmid = medline_citation.get("PMID", "") 247 | if isinstance(pmid, dict) and "content" in pmid: 248 | article_data["pmid"] = pmid["content"] 249 | else: 250 | article_data["pmid"] = str(pmid) 251 | 252 | # Extract DOI - Final attempt with careful iteration 253 | doi = "" 254 | try: 255 | pubmed_data = record.get("PubmedData") 256 | if pubmed_data: 257 | article_id_list = pubmed_data.get("ArticleIdList") 258 | # Iterate through article_id_list if it exists and is iterable 259 | if article_id_list: 260 | try: 261 | for id_element in article_id_list: 262 | # Check if the element has attributes and the IdType is 'doi' 263 | # Handles Bio.Entrez.Parser.StringElement and similar objects 264 | if hasattr(id_element, 'attributes') and id_element.attributes.get('IdType') == 'doi': 265 | doi = str(id_element).strip() # Get the string value 266 | if doi: break # Found DOI, exit loop 267 | # Fallback check for plain dictionary structure (less common) 268 | elif isinstance(id_element, dict) and id_element.get('IdType') == 'doi': 269 | doi = id_element.get('content', '').strip() or id_element.get('#text', '').strip() 270 | if doi: break # Found DOI, exit loop 271 | except TypeError: 272 | # Handle cases where article_id_list might not be iterable (e.g., single element) 273 | # Check if the single element itself is the DOI 274 | if hasattr(article_id_list, 'attributes') and article_id_list.attributes.get('IdType') == 'doi': 275 | doi = str(article_id_list).strip() 276 | 277 | except Exception as e: 278 | print(f"Warning: Error during DOI extraction for PMID {article_data.get('pmid', 'N/A')}: {e}") 279 | doi = "" # Reset DOI on error 280 | 281 | article_data["doi"] = doi 282 | 283 | return article_data 284 | 285 | def export_to_txt(self, articles: List[Dict[str, Any]], filename: Optional[str] = None) -> str: 286 | """ 287 | Export articles to a formatted text file. 288 | 289 | Args: 290 | articles: List of article dictionaries 291 | filename: Optional output filename 292 | 293 | Returns: 294 | Path to the created file 295 | """ 296 | if not filename: 297 | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 298 | filename = f"pubmed_results_{timestamp}.txt" 299 | 300 | filepath = os.path.join(self.results_dir, filename) 301 | 302 | with open(filepath, 'w', encoding='utf-8') as f: 303 | for i, article in enumerate(articles, 1): 304 | f.write(f"Article {i}\n") 305 | f.write("-" * 80 + "\n") 306 | f.write(f"Title: {article.get('title', '')}\n") 307 | f.write(f"Authors: {', '.join(article.get('authors', []))}\n") 308 | f.write(f"Journal: {article.get('journal', '')}\n") 309 | f.write(f"Publication Date: {article.get('publication_date', '')}\n") 310 | f.write(f"Abstract:\n{article.get('abstract', '')}\n") 311 | f.write(f"Keywords: {', '.join(article.get('keywords', []))}\n") 312 | f.write(f"PMID: {article.get('pmid', '')}\n") 313 | f.write(f"DOI: https://doi.org/{article.get('doi', '')}\n") 314 | f.write("=" * 80 + "\n\n") 315 | 316 | logger.info(f"Exported {len(articles)} articles to {filepath}") 317 | return filepath 318 | 319 | def export_to_json(self, articles: List[Dict[str, Any]], filename: Optional[str] = None) -> str: 320 | """ 321 | Export articles to JSON format file. 
322 | 323 | Args: 324 | articles: List of article dictionaries 325 | filename: Optional output filename 326 | 327 | Returns: 328 | Path to the created file 329 | """ 330 | if not filename: 331 | timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") 332 | filename = f"pubmed_results_{timestamp}.json" 333 | 334 | filepath = os.path.join(self.results_dir, filename) 335 | 336 | with open(filepath, 'w', encoding='utf-8') as f: 337 | json.dump({ 338 | "metadata": { 339 | "export_time": datetime.now().isoformat(), 340 | "article_count": len(articles) 341 | }, 342 | "articles": articles 343 | }, f, ensure_ascii=False, indent=2) 344 | 345 | logger.info(f"Exported {len(articles)} articles to {filepath}") 346 | return filepath 347 | ```
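# Usage Example

The server module docstring describes the intended workflow: run a search first, then analyze the exported JSON results. The snippet below is a minimal sketch of that same workflow driven directly through `PubMedSearcher` and `PubMedAnalyzer`, bypassing the MCP server. It assumes the package is importable as `pubmearch`, that `NCBI_USER_EMAIL` (and optionally `NCBI_USER_API_KEY`) is set in the environment, and the query string and filenames are purely illustrative.

```python
import os

from pubmearch.pubmed_searcher import PubMedSearcher
from pubmearch.analyzer import PubMedAnalyzer

# The searcher falls back to NCBI_USER_EMAIL / NCBI_USER_API_KEY when not passed explicitly.
searcher = PubMedSearcher(email=os.getenv("NCBI_USER_EMAIL"))

# Step 1: search PubMed; date_range is optional and always uses YYYY/MM/DD.
records = searcher.search(
    advanced_search='"machine learning"[Title/Abstract]',  # example query
    date_range=("2023/01/01", "2024/12/31"),
    max_results=100,
)

# Step 2: export the results; JSON is the format recommended for analysis.
json_path = searcher.export_to_json(records, "example_results.json")
searcher.export_to_txt(records, "example_results.txt")

# Step 3: analyze the exported JSON file (keyword hotspots, trends, publication counts).
analyzer = PubMedAnalyzer(results_dir=str(searcher.results_dir))
report = analyzer.generate_comprehensive_analysis(json_path, top_keywords=10, months_per_period=3)
if "error" not in report:
    print(report["article_count"], "articles analyzed")
    print(report["publication_counts"]["periods"])
```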