# Directory Structure ``` ├── .gitignore ├── LICENSE ├── pubmearch │ ├── __init__.py │ ├── analyzer.py │ ├── pubmed_searcher.py │ └── server.py ├── pyproject.toml └── README.md ``` # Files -------------------------------------------------------------------------------- /pubmearch/__init__.py: -------------------------------------------------------------------------------- ```python """ PubMed Analysis MCP Server Package """ ``` -------------------------------------------------------------------------------- /pubmearch/analyzer.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python # -*- coding: utf-8 -*- """ PubMed Analysis Module This module provides analysis functionality for PubMed search results, including research hotspots, trends, and publication statistics. """ import os import re import json from datetime import datetime from collections import Counter, defaultdict from typing import Dict, List, Optional, Tuple, Any, Union class PubMedAnalyzer: """Class to analyze PubMed search results from result files (txt or JSON).""" def __init__(self, results_dir: str = "../results"): """ Initialize the PubMed analyzer. Args: results_dir: Directory containing PubMed search result files """ self.results_dir = results_dir def parse_results_file(self, filepath: str) -> List[Dict[str, Any]]: """ Parse a PubMed results file (txt or json) into structured data. Args: filepath: Path to the results file Returns: List of dictionaries containing structured article data """ if not os.path.exists(filepath): raise FileNotFoundError(f"File not found: {filepath}") # Choose parsing method based on file extension if filepath.endswith('.json'): return self._parse_json_file(filepath) else: return self._parse_txt_file(filepath) def _parse_json_file(self, filepath: str) -> List[Dict[str, Any]]: """Parse a JSON results file.""" with open(filepath, 'r', encoding='utf-8') as f: data = json.load(f) return data.get("articles", []) def _parse_txt_file(self, filepath: str) -> List[Dict[str, Any]]: """Parse a text results file.""" articles = [] current_article = None section = None with open(filepath, 'r', encoding='utf-8') as f: lines = f.readlines() i = 0 while i < len(lines): line = lines[i].strip() # New article marker if line.startswith("Article ") and i + 1 < len(lines) and "-" * 10 in lines[i+1]: if current_article: articles.append(current_article) current_article = { "title": "", "authors": [], "journal": "", "publication_date": "", "abstract": "", "keywords": [], "pmid": "", "doi": "" } section = None i += 1 # Move to the separator line; the loop's final i += 1 then advances past it to the Title line # Section headers elif line.startswith("Title: "): current_article["title"] = line[7:].strip() section = "title" elif line.startswith("Authors: "): authors_line = line[9:].strip() if authors_line != "N/A": current_article["authors"] = [a.strip() for a in authors_line.split(",")] section = None elif line.startswith("Journal: "): current_article["journal"] = line[9:].strip() section = None elif line.startswith("Publication Date: "): current_article["publication_date"] = line[18:].strip() section = None elif line == "Abstract:": section = "abstract" elif line.startswith("Keywords: "): keywords_line = line[10:].strip() current_article["keywords"] = [k.strip() for k in keywords_line.split(",")] section = None elif line.startswith("PMID: "): current_article["pmid"] = line[6:].strip() section = None elif line.startswith("DOI: "): current_article["doi"] = line[5:].strip() section = None elif line.startswith("=" * 20): section = None # Content sections elif section
== "abstract" and line and not line.startswith("Keywords: "): current_article["abstract"] += line + " " i += 1 # Add the last article if current_article: articles.append(current_article) return articles def extract_publication_dates(self, articles: List[Dict[str, Any]]) -> List[Tuple[str, datetime]]: """ Extract and parse publication dates from articles. Args: articles: List of article dictionaries Returns: List of tuples containing (article_title, publication_date) """ publication_dates = [] for article in articles: date_str = article.get("publication_date", "") # Try different formats parsed_date = None # Format: YYYY MMM if re.match(r"^\d{4} [A-Za-z]{3}$", date_str): try: parsed_date = datetime.strptime(date_str, "%Y %b") except ValueError: pass # Format: YYYY MMM DD elif re.match(r"^\d{4} [A-Za-z]{3} \d{1,2}$", date_str): try: parsed_date = datetime.strptime(date_str, "%Y %b %d") except ValueError: pass # Format: YYYY MMM-MMM elif re.match(r"^\d{4} [A-Za-z]{3}-[A-Za-z]{3}$", date_str): try: # Just use the first month month_part = date_str.split(" ")[1].split("-")[0] parsed_date = datetime.strptime(f"{date_str.split(' ')[0]} {month_part}", "%Y %b") except (ValueError, IndexError): pass # Format: YYYY elif re.match(r"^\d{4}$", date_str): try: parsed_date = datetime.strptime(date_str, "%Y") except ValueError: pass if parsed_date: publication_dates.append((article.get("title", ""), parsed_date)) return publication_dates def analyze_research_keywords(self, articles: List[Dict[str, Any]], top_n: int = 20, include_trends: bool = True) -> Dict[str, Any]: """ Analyze research hotspots and trends based on keyword frequencies. Args: articles: List of article dictionaries top_n: Number of top keywords to include include_trends: Bool indicating whether to include trend analysis, default True. 
Returns: Dictionary with analysis results """ # Extract all keywords all_keywords = [] for article in articles: all_keywords.extend(article.get("keywords", [])) # Count keyword frequencies keyword_counts = Counter(all_keywords) # Get top keywords top_keywords = keyword_counts.most_common(top_n) # Organize articles by keyword keyword_articles = defaultdict(list) for article in articles: article_keywords = article.get("keywords", []) for kw in article_keywords: if kw in dict(top_keywords): keyword_articles[kw].append({ "title": article.get("title", ""), "authors": article.get("authors", []), "journal": article.get("journal", ""), "publication_date": article.get("publication_date", ""), "pmid": article.get("pmid", ""), "doi": article.get("doi", "") }) # Prepare results results = { "top_keywords": [{"keyword": kw, "count": count} for kw, count in top_keywords], "keyword_articles": {kw: articles for kw, articles in keyword_articles.items()} } # Include trend analysis if requested if include_trends: # Extract publication dates pub_dates = self.extract_publication_dates(articles) # Group keyword counts by month monthly_keyword_counts = defaultdict(lambda: defaultdict(int)) for article in articles: date_str = article.get("publication_date", "") article_keywords = article.get("keywords", []) # Look up the parsed date for this article parsed_date = None for title, date in pub_dates: if title == article.get("title", ""): parsed_date = date break if parsed_date: month_key = parsed_date.strftime("%Y-%m") for kw in article_keywords: if kw in dict(top_keywords): monthly_keyword_counts[month_key][kw] += 1 # Sort months chronologically sorted_months = sorted(monthly_keyword_counts.keys()) # Prepare trend data trend_data = { "months": sorted_months, "keywords": [kw for kw, _ in top_keywords], "counts": [] } for keyword, _ in top_keywords: keyword_trend = [] for month in sorted_months: keyword_trend.append(monthly_keyword_counts[month][keyword]) trend_data["counts"].append({ "keyword": keyword, "monthly_counts": keyword_trend }) results["trends"] = trend_data return results def analyze_publication_count(self, articles: List[Dict[str, Any]], months_per_period: int = 3) -> Dict[str, Any]: """ Analyze publication counts over time. Args: articles: List of article dictionaries months_per_period: Number of months to group by Returns: Dictionary with publication count analysis """ # Extract publication dates pub_dates = self.extract_publication_dates(articles) # Group by period period_counts = defaultdict(int) for _, date in pub_dates: # Calculate period key based on months_per_period year = date.year month = date.month period = (month - 1) // months_per_period period_key = f"{year}-P{period+1}" # 1-indexed periods period_counts[period_key] += 1 # Sort periods chronologically sorted_periods = sorted(period_counts.keys()) # Prepare result results = { "periods": sorted_periods, "counts": [period_counts[period] for period in sorted_periods], "months_per_period": months_per_period, "total_publications": len(pub_dates) } return results def generate_comprehensive_analysis(self, filepath: str, top_keywords: int = 20, months_per_period: int = 3) -> Dict[str, Any]: """ Generate a comprehensive analysis of PubMed results from a file.
Args: filepath: Path to the results file (.txt or .json) top_keywords: Number of top keywords for hotspot analysis months_per_period: Number of months per period for publication count Returns: Dictionary with comprehensive analysis results """ try: articles = self.parse_results_file(filepath) if not articles: return {"error": "No articles found in the file."} # Generate analysis components keyword_analysis = self.analyze_research_keywords(articles, top_keywords) pub_counts = self.analyze_publication_count(articles, months_per_period) # Combine results results = { "file_analyzed": os.path.basename(filepath), "analysis_timestamp": datetime.now().isoformat(), "article_count": len(articles), "keyword_analysis": keyword_analysis, "publication_counts": pub_counts } return results except Exception as e: return {"error": str(e)} def list_result_files(self) -> List[str]: """ List all result files in the results directory. Returns: List of filenames """ if not os.path.exists(self.results_dir): return [] # Include both JSON and TXT result files return [f for f in os.listdir(self.results_dir) if f.endswith(('.json', '.txt'))] ``` -------------------------------------------------------------------------------- /pubmearch/server.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python # -*- coding: utf-8 -*- """ PubMed Analysis MCP Server This module implements an MCP server for analyzing PubMed search results, providing tools to identify research hotspots, trends, and publication statistics. Note: - First, always use the search_pubmed tool to generate new result files. - Second, for result analysis, always prefer the JSON files. """ import os import sys import subprocess import json import logging import re from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Any, Union # Add parent directory to path to import PubMedSearcher from parent parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) sys.path.append(parent_dir) from .pubmed_searcher import PubMedSearcher from .analyzer import PubMedAnalyzer # Import FastMCP from mcp.server.fastmcp import FastMCP, Context # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler(os.path.join(parent_dir, "pubmed_server.log")), logging.StreamHandler() ] ) logger = logging.getLogger("pubmed-mcp-server") # make sure results directory exists now = datetime.now() time_string = now.strftime("%Y%m%d%H%M%S") results_dir = Path(__file__).resolve().parent / "results" os.makedirs(results_dir, exist_ok=True) logger.info(f"Results directory: {results_dir}") # Initialize analyzer analyzer = PubMedAnalyzer(results_dir=results_dir) # Initialize MCP server pubmearch = FastMCP( "PubMed Analyzer", description="MCP server for analyzing PubMed search results" ) @pubmearch.tool() async def search_pubmed( advanced_search: str, start_date: Optional[str] = None, end_date: Optional[str] = None, max_results: int = 1000, output_filename: Optional[str] = None, ) -> Dict[str, Any]: """Search PubMed with an advanced query and export the results to JSON and TXT files. Args: advanced_search: PubMed advanced search query. start_date: Optional start date in YYYY/MM/DD format. end_date: Optional end date in YYYY/MM/DD format. max_results: Maximum number of records to retrieve. output_filename: Optional base name for the result files. """ try: logger.info(f"Starting PubMed search with query: {advanced_search}") NCBI_USER_EMAIL = os.getenv('NCBI_USER_EMAIL') NCBI_USER_API_KEY = os.getenv('NCBI_USER_API_KEY') if not NCBI_USER_EMAIL: logger.error("Email not provided and NCBI_USER_EMAIL environment variable not set") return { "success": False, "error": "Server configuration error: NCBI User Email is not set."
} logger.info(f"Use email address: {NCBI_USER_EMAIL}") if NCBI_USER_API_KEY: logger.info(f"Using API key from environment.") else: logger.warning(f"NCBI_USER_API_KEY environment variable not found. Proceeding without API key.") searcher = PubMedSearcher(email = NCBI_USER_EMAIL, api_key = NCBI_USER_API_KEY) # Create date range if dates are provided # Note: The formats of start_date and end_date is always YYYY/MM/DD date_range = None if start_date or end_date: # Validate date formats date_pattern = re.compile(r'^\d{4}/\d{2}/\d{2}$') if start_date and not date_pattern.match(start_date): raise ValueError(f"Invalid start_date format: {start_date}. Must be YYYY/MM/DD") if end_date and not date_pattern.match(end_date): raise ValueError(f"Invalid end_date format: {end_date}. Must be YYYY/MM/DD") date_range = (start_date, end_date) if start_date and end_date else None # Perform search records = searcher.search( advanced_search=advanced_search, date_range=date_range, max_results=max_results ) if not records: logger.warning("No results found for the search criteria") return { "success": False, "error": "No results found for the given criteria." } # Export both TXT and JSON formats if not output_filename: base_filename = f"pubmed_results_{time_string}" json_filename = f"{base_filename}.json" txt_filename = f"{base_filename}.txt" else: # Remove any existing extension base_filename = output_filename.rsplit('.', 1)[0] + f"_{time_string}" json_filename = f"{base_filename}.json" txt_filename = f"{base_filename}.txt" # Export both formats json_path = os.path.abspath(searcher.export_to_json(records, json_filename)) txt_path = os.path.abspath(searcher.export_to_txt(records, txt_filename)) # Verify if files were saved successfully if not os.path.exists(json_path): logger.error(f"Failed to create JSON file at {json_path}") return { "success": False, "error": f"Failed to save JSON results file." } logger.info(f"Successfully saved {len(records)} articles to JSON: {json_path}") return { "success": True, "message": f"Search completed successfully. Found {len(records)} articles.", "json_file": os.path.basename(json_path), "txt_file": os.path.basename(txt_path), "note": "JSON files are recommended for AI model analysis.", "article_count": len(records) } except ValueError as ve: logger.error(f"ValueError in search_pubmed: {str(ve)}", exc_info=True) return {"success": False, "error": str(ve)} except Exception as e: logger.error(f"Error in search_pubmed: {str(e)}", exc_info=True) return { "success": False, "error": f"Error during search: {str(e)}" } @pubmearch.tool() async def list_result_files() -> Dict[str, Any]: """Lists all available PubMed result files. 
Two types of files are returned: - JSON files (recommended): structured data, suitable for AI model analysis - TXT files (alternative): plain text format, for backward compatibility """ try: logger.info(f"Listing result files in: {results_dir}") if not os.path.exists(results_dir): logger.warning(f"Results directory does not exist: {results_dir}") os.makedirs(results_dir, exist_ok=True) logger.info(f"Created results directory: {results_dir}") return { "success": True, "files": [], "count": 0, "directory": results_dir } # Get JSON and TXT files separately json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')] txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')] return { "success": True, "files": { "recommended": json_files, # JSON files (recommended) "alternative": txt_files # TXT files (alternative) }, "count": len(json_files) + len(txt_files), "directory": results_dir, "note": "Always use JSON files first." } except Exception as e: logger.error(f"Error in list_result_files: {str(e)}", exc_info=True) return { "success": False, "error": str(e), "directory": str(results_dir) } @pubmearch.tool() async def analyze_research_keywords(filename: str, top_n: int = 20, include_trends: bool = True) -> Dict[str, Any]: """Analyze research hotspots and trends in a PubMed results file based on keywords. Note: It is recommended to use JSON format files for better analysis results. Args: filename: File name of results. (.json format is recommended) top_n: Number of top keywords to return. include_trends: Whether to include trend analysis. Default is True. """ try: filepath = os.path.join(results_dir, filename) logger.info(f"Analyzing research keywords from file: {filepath}") # Check if the file exists if not os.path.exists(filepath): logger.error(f"File not found: {filepath}") # JSON first json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')] txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')] return { "success": False, "error": f"File not found: {filepath}", "available_files": { "recommended": json_files, "alternative": txt_files }, "note": "Always use JSON files first." } # Parse the result file articles = analyzer.parse_results_file(filepath) if not articles: logger.warning(f"No articles found in file: {filepath}") return { "success": False, "error": "No articles found in the file." } # Analyze keywords analysis_results = analyzer.analyze_research_keywords(articles, top_n, include_trends) return { "success": True, "file_analyzed": filename, "article_count": len(articles), "keyword_analysis": analysis_results } except Exception as e: logger.error(f"Error in analyze_research_keywords: {str(e)}", exc_info=True) return { "success": False, "error": str(e) } @pubmearch.tool() async def analyze_publication_count(filename: str, months_per_period: int = 3) -> Dict[str, Any]: """Analyze publication counts over time from a PubMed results file. Note: It is recommended to use JSON format files for better analysis results. Args: filename: File name of results.
(.json format is recommended) months_per_period: Number of months per analysis period """ try: filepath = os.path.join(results_dir, filename) logger.info(f"Analyzing publication counts from file: {filepath}") # Check if the file exists if not os.path.exists(filepath): logger.error(f"File not found: {filepath}") json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')] txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')] return { "success": False, "error": f"File not found: {filepath}", "available_files": { "recommended": json_files, "alternative": txt_files }, "note": "Always use JSON files first." } # Parse the result file articles = analyzer.parse_results_file(filepath) if not articles: logger.warning(f"No articles found in file: {filepath}") return { "success": False, "error": "No articles found in the file." } # Analyze publication counts pub_counts = analyzer.analyze_publication_count(articles, months_per_period) return { "success": True, "file_analyzed": filename, "article_count": len(articles), "publication_counts": pub_counts } except Exception as e: logger.error(f"Error in analyze_publication_count: {str(e)}", exc_info=True) return { "success": False, "error": str(e) } @pubmearch.tool() async def generate_comprehensive_analysis( filename: str, top_keywords: int = 20, months_per_period: int = 3 ) -> Dict[str, Any]: """Generate a comprehensive analysis of a PubMed results file. Note: It is recommended to use JSON format files for better analysis results. Args: filename: File name of results. (.json format is recommended) top_keywords: Number of top keywords to analyze months_per_period: Number of months per analysis period """ try: filepath = os.path.join(results_dir, filename) logger.info(f"Generating comprehensive analysis from file: {filepath}") # Check if the file exists if not os.path.exists(filepath): logger.error(f"File not found: {filepath}") json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')] txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')] return { "success": False, "error": f"File not found: {filepath}", "available_files": { "recommended": json_files, "alternative": txt_files }, "note": "Always use JSON files first." } # Generate comprehensive analysis directly results = analyzer.generate_comprehensive_analysis( filepath, top_keywords=top_keywords, months_per_period=months_per_period ) if "error" in results: logger.error(f"Error in analysis: {results['error']}") return { "success": False, "error": results["error"] } logger.info("Comprehensive analysis completed successfully") return { "success": True, "analysis": results } except Exception as e: logger.error(f"Error in generate_comprehensive_analysis: {str(e)}", exc_info=True) return { "success": False, "error": str(e) } if __name__ == "__main__": os.makedirs(results_dir, exist_ok=True) pubmearch.run() ``` -------------------------------------------------------------------------------- /pubmearch/pubmed_searcher.py: -------------------------------------------------------------------------------- ```python #!/usr/bin/env python # -*- coding: utf-8 -*- """ PubMed Searcher Module This module provides functionality for searching PubMed and retrieving article data. 
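Illustrative usage (a minimal sketch; it assumes NCBI_USER_EMAIL is set in the environment and uses a made-up query):

    searcher = PubMedSearcher()
    records = searcher.search('"CRISPR"[Title/Abstract]', max_results=100)
    searcher.export_to_json(records)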
""" import os import re import time import json import logging from datetime import datetime from typing import List, Dict, Tuple, Optional, Any, Union from Bio import Entrez from pathlib import Path # Configure logging logger = logging.getLogger(__name__) class PubMedSearcher: """Class to search PubMed and retrieve article data.""" def __init__(self, email: Optional[str] = None, results_dir: Optional[str] = None, api_key: Optional[str] = None): """ Initialize PubMed searcher with email address in .env. Args: email: Email address for Entrez. If None, use NCBI_USER_EMAIL from environment variables. results_dir: Optional custom results directory path api_key: API key for NCBI. If None, use NCBI_USER_API_KEY from environment variables. """ # use NCBI_USER_EMAIL from .env if email is not provided self.email = email if email is not None else os.getenv('NCBI_USER_EMAIL') self.api_key = api_key if api_key is not None else os.getenv('NCBI_USER_API_KEY') if not self.email: raise ValueError("Email is required. Either pass it directly or set NCBI_USER_EMAIL in .env") # Set up Entrez Entrez.email = self.email Entrez.api_key = self.api_key # Use provided results directory or create default self.results_dir = Path(results_dir) if results_dir else Path(__file__).resolve().parent / "results" os.makedirs(self.results_dir, exist_ok=True) logger.info(f"Using results directory: {self.results_dir}") def search(self, advanced_search: str, date_range: Optional[Tuple[str, str]] = None, max_results: int = 1000) -> List[Dict[str, Any]]: """ Search PubMed using advanced search syntax. Args: advanced_search: PubMed advanced search query date_range: Optional tuple of (start_date, end_date), date format is always YYYY/MM/DD max_results: Maximum number of results to retrieve Returns: List of article dictionaries """ search_term = advanced_search # Add date range to query if provided # Note: The formats of start_date and end_date is always YYYY/MM/DD if date_range: start_date, end_date = date_range date_filter = "" # start_date if start_date: date_filter += f" AND ('{start_date}'[Date - Publication]" if end_date: date_filter += f" : '{end_date}'[Date - Publication]" date_filter += ")" # if only end_date, set start_date to 1900/01/01 for inclusio elif end_date: date_filter += f" AND ('1900/01/01'[Date - Publication] : '{end_date}'[Date - Publication])" search_term += date_filter try: # Search PubMed logger.info(f"Searching PubMed with query: {search_term}") search_handle = Entrez.esearch(db="pubmed", term=search_term, retmax=max_results, usehistory="y") search_results = Entrez.read(search_handle) search_handle.close() webenv = search_results["WebEnv"] query_key = search_results["QueryKey"] # Get the count of results count = int(search_results["Count"]) logger.info(f"Found {count} results, retrieving up to {max_results}") if count == 0: logger.warning("No results found") return [] # Initialize an empty list to store articles articles = [] # Fetch results in batches to avoid timeouts batch_size = 100 for start in range(0, min(count, max_results), batch_size): end = min(count, start + batch_size, max_results) logger.info(f"Retrieving records {start+1} to {end}") try: # Fetch the records fetch_handle = Entrez.efetch( db="pubmed", retstart=start, retmax=batch_size, webenv=webenv, query_key=query_key, retmode="xml" ) # Parse the records records = Entrez.read(fetch_handle)["PubmedArticle"] fetch_handle.close() # Process each record for record in records: article = self._parse_pubmed_record(record) articles.append(article) # 
Sleep to avoid overloading the NCBI server time.sleep(1) except Exception as e: logger.error(f"Error fetching batch {start+1} to {end}: {str(e)}") continue return articles except Exception as e: logger.error(f"Error searching PubMed: {str(e)}") return [] def _parse_pubmed_record(self, record: Dict) -> Dict[str, Any]: """ Parse a PubMed record into a structured article dictionary. Args: record: PubMed record from Entrez.read Returns: Dictionary containing structured article data """ article_data = {} # Get MedlineCitation and Article medline_citation = record.get("MedlineCitation", {}) article = medline_citation.get("Article", {}) # Extract basic article information article_data["title"] = article.get("ArticleTitle", "") # Extract authors authors = [] author_list = article.get("AuthorList", []) for author in author_list: if "LastName" in author and "ForeName" in author: authors.append(f"{author['LastName']} {author['ForeName']}") elif "LastName" in author and "Initials" in author: authors.append(f"{author['LastName']} {author['Initials']}") elif "LastName" in author: authors.append(author["LastName"]) elif "CollectiveName" in author: authors.append(author["CollectiveName"]) article_data["authors"] = authors # Extract journal information journal = article.get("Journal", {}) article_data["journal"] = journal.get("Title", "") # Extract publication date pub_date = {} journal_issue = journal.get("JournalIssue", {}) if "PubDate" in journal_issue: pub_date = journal_issue["PubDate"] pub_date_str = "" if "Year" in pub_date: pub_date_str = pub_date["Year"] if "Month" in pub_date: pub_date_str += f" {pub_date['Month']}" if "Day" in pub_date: pub_date_str += f" {pub_date['Day']}" article_data["publication_date"] = pub_date_str # Extract abstract abstract_text = "" if "Abstract" in article and "AbstractText" in article["Abstract"]: # Handle different abstract formats abstract_parts = article["Abstract"]["AbstractText"] if isinstance(abstract_parts, list): for part in abstract_parts: if isinstance(part, str): abstract_text += part + " " elif isinstance(part, dict) and "#text" in part: label = part.get("Label", "") text = part["#text"] if label: abstract_text += f"{label}: {text} " else: abstract_text += text + " " else: abstract_text = str(abstract_parts) article_data["abstract"] = abstract_text.strip() # Extract keywords keywords = [] # MeSH headings mesh_headings = medline_citation.get("MeshHeadingList", []) for heading in mesh_headings: if "DescriptorName" in heading: descriptor = heading["DescriptorName"] if isinstance(descriptor, dict) and "content" in descriptor: keywords.append(descriptor["content"]) elif isinstance(descriptor, str): keywords.append(descriptor) # Keywords from KeywordList keyword_lists = medline_citation.get("KeywordList", []) for keyword_list in keyword_lists: if isinstance(keyword_list, list): for keyword in keyword_list: if isinstance(keyword, str): keywords.append(keyword) elif isinstance(keyword, dict) and "content" in keyword: keywords.append(keyword["content"]) article_data["keywords"] = keywords # Extract PMID pmid = medline_citation.get("PMID", "") if isinstance(pmid, dict) and "content" in pmid: article_data["pmid"] = pmid["content"] else: article_data["pmid"] = str(pmid) # Extract DOI - Final attempt with careful iteration doi = "" try: pubmed_data = record.get("PubmedData") if pubmed_data: article_id_list = pubmed_data.get("ArticleIdList") # Iterate through article_id_list if it exists and is iterable if article_id_list: try: for id_element in article_id_list: # Check 
if the element has attributes and the IdType is 'doi' # Handles Bio.Entrez.Parser.StringElement and similar objects if hasattr(id_element, 'attributes') and id_element.attributes.get('IdType') == 'doi': doi = str(id_element).strip() # Get the string value if doi: break # Found DOI, exit loop # Fallback check for plain dictionary structure (less common) elif isinstance(id_element, dict) and id_element.get('IdType') == 'doi': doi = id_element.get('content', '').strip() or id_element.get('#text', '').strip() if doi: break # Found DOI, exit loop except TypeError: # Handle cases where article_id_list might not be iterable (e.g., single element) # Check if the single element itself is the DOI if hasattr(article_id_list, 'attributes') and article_id_list.attributes.get('IdType') == 'doi': doi = str(article_id_list).strip() except Exception as e: print(f"Warning: Error during DOI extraction for PMID {article_data.get('pmid', 'N/A')}: {e}") doi = "" # Reset DOI on error article_data["doi"] = doi return article_data def export_to_txt(self, articles: List[Dict[str, Any]], filename: Optional[str] = None) -> str: """ Export articles to a formatted text file. Args: articles: List of article dictionaries filename: Optional output filename Returns: Path to the created file """ if not filename: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"pubmed_results_{timestamp}.txt" filepath = os.path.join(self.results_dir, filename) with open(filepath, 'w', encoding='utf-8') as f: for i, article in enumerate(articles, 1): f.write(f"Article {i}\n") f.write("-" * 80 + "\n") f.write(f"Title: {article.get('title', '')}\n") f.write(f"Authors: {', '.join(article.get('authors', []))}\n") f.write(f"Journal: {article.get('journal', '')}\n") f.write(f"Publication Date: {article.get('publication_date', '')}\n") f.write(f"Abstract:\n{article.get('abstract', '')}\n") f.write(f"Keywords: {', '.join(article.get('keywords', []))}\n") f.write(f"PMID: {article.get('pmid', '')}\n") f.write(f"DOI: https://doi.org/{article.get('doi', '')}\n") f.write("=" * 80 + "\n\n") logger.info(f"Exported {len(articles)} articles to {filepath}") return filepath def export_to_json(self, articles: List[Dict[str, Any]], filename: Optional[str] = None) -> str: """ Export articles to JSON format file. Args: articles: List of article dictionaries filename: Optional output filename Returns: Path to the created file """ if not filename: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"pubmed_results_{timestamp}.json" filepath = os.path.join(self.results_dir, filename) with open(filepath, 'w', encoding='utf-8') as f: json.dump({ "metadata": { "export_time": datetime.now().isoformat(), "article_count": len(articles) }, "articles": articles }, f, ensure_ascii=False, indent=2) logger.info(f"Exported {len(articles)} articles to {filepath}") return filepath ```
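# Usage Example

The following is a minimal illustrative sketch of how the searcher and analyzer might be driven directly, outside the MCP server. It assumes `NCBI_USER_EMAIL` (and optionally `NCBI_USER_API_KEY`) are set in the environment; the query string, date range, and counts below are placeholders, not part of the package.

```python
from pubmearch.pubmed_searcher import PubMedSearcher
from pubmearch.analyzer import PubMedAnalyzer

# Reads NCBI_USER_EMAIL / NCBI_USER_API_KEY from the environment.
searcher = PubMedSearcher()

# Hypothetical query; date_range uses the YYYY/MM/DD format expected by search().
records = searcher.search(
    advanced_search='("machine learning"[Title/Abstract]) AND ("radiology"[MeSH Terms])',
    date_range=("2023/01/01", "2024/12/31"),
    max_results=200,
)

# Export both formats; the analysis tools prefer the JSON file.
json_path = searcher.export_to_json(records)
txt_path = searcher.export_to_txt(records)

# Point the analyzer at the searcher's results directory and analyze the JSON export.
analyzer = PubMedAnalyzer(results_dir=str(searcher.results_dir))
report = analyzer.generate_comprehensive_analysis(json_path, top_keywords=10, months_per_period=3)
print(report.get("error") or f"Analyzed {report['article_count']} articles")
```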