# Directory Structure

```
├── .gitignore
├── LICENSE
├── pubmearch
│   ├── __init__.py
│   ├── analyzer.py
│   ├── pubmed_searcher.py
│   └── server.py
├── pyproject.toml
└── README.md
```

# Files

--------------------------------------------------------------------------------
/pubmearch/__init__.py:
--------------------------------------------------------------------------------

```python
"""
PubMed Analysis MCP Server Package
""" 
```

--------------------------------------------------------------------------------
/pubmearch/analyzer.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
PubMed Analysis Module

This module provides analysis functionality for PubMed search results, 
including research hotspots, trends, and publication statistics.
"""

import os
import re
import json
from datetime import datetime
from collections import Counter, defaultdict
from typing import Dict, List, Optional, Tuple, Any, Union


class PubMedAnalyzer:
    """Class to analyze PubMed search results from text files."""
    
    def __init__(self, results_dir: str = "../results"):
        """
        Initialize the PubMed analyzer.
        
        Args:
            results_dir: Directory containing PubMed search result text files
        """
        self.results_dir = results_dir
        
    def parse_results_file(self, filepath: str) -> List[Dict[str, Any]]:
        """
        Parse a PubMed results file (txt or json) into structured data.
        
        Args:
            filepath: Path to the results file
            
        Returns:
            List of dictionaries containing structured article data
        """
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"File not found: {filepath}")
        
        # Choose parsing method based on file extension
        if filepath.endswith('.json'):
            return self._parse_json_file(filepath)
        else:
            return self._parse_txt_file(filepath)

    def _parse_json_file(self, filepath: str) -> List[Dict[str, Any]]:
        """Parse a JSON results file."""
        with open(filepath, 'r', encoding='utf-8') as f:
            data = json.load(f)
            return data.get("articles", [])

    def _parse_txt_file(self, filepath: str) -> List[Dict[str, Any]]:
        """Parse a text results file."""
        articles = []
        current_article = None
        section = None
        
        with open(filepath, 'r', encoding='utf-8') as f:
            lines = f.readlines()
            
            i = 0
            while i < len(lines):
                line = lines[i].strip()
                
                # New article marker
                if line.startswith("Article ") and "-" * 10 in lines[i+1]:
                    if current_article:
                        articles.append(current_article)
                    current_article = {
                        "title": "",
                        "authors": [],
                        "journal": "",
                        "publication_date": "",
                        "abstract": "",
                        "keywords": [],
                        "pmid": "",
                        "doi": ""
                    }
                    section = None
                    i += 1  # Move onto the separator line; the loop increment below skips past it
                
                # Section headers
                elif line.startswith("Title: "):
                    current_article["title"] = line[7:].strip()
                    section = "title"
                elif line.startswith("Authors: "):
                    authors_line = line[9:].strip()
                    if authors_line != "N/A":
                        current_article["authors"] = [a.strip() for a in authors_line.split(",")]
                    section = None
                elif line.startswith("Journal: "):
                    current_article["journal"] = line[9:].strip()
                    section = None
                elif line.startswith("Publication Date: "):
                    current_article["publication_date"] = line[18:].strip()
                    section = None
                elif line == "Abstract:":
                    section = "abstract"
                elif line.startswith("Keywords: "):
                    keywords_line = line[10:].strip()
                    current_article["keywords"] = [k.strip() for k in keywords_line.split(",")]
                    section = None
                elif line.startswith("PMID: "):
                    current_article["pmid"] = line[6:].strip()
                    section = None
                elif line.startswith("DOI: "):
                    current_article["doi"] = line[5:].strip()
                    section = None
                elif line.startswith("=" * 20):
                    section = None
                
                # Content sections
                elif section == "abstract" and line and not line.startswith("Keywords: "):
                    current_article["abstract"] += line + " "
                
                i += 1
            
            # Add the last article
            if current_article:
                articles.append(current_article)
        
        return articles

    def extract_publication_dates(self, articles: List[Dict[str, Any]]) -> List[Tuple[str, datetime]]:
        """
        Extract and parse publication dates from articles.
        
        Args:
            articles: List of article dictionaries
            
        Returns:
            List of tuples containing (article_title, publication_date)
        """
        publication_dates = []
        
        for article in articles:
            date_str = article.get("publication_date", "")
            
            # Try different formats
            parsed_date = None
            
            # Format: YYYY MMM
            if re.match(r"^\d{4} [A-Za-z]{3}$", date_str):
                try:
                    parsed_date = datetime.strptime(date_str, "%Y %b")
                except ValueError:
                    pass
            
            # Format: YYYY MMM DD
            elif re.match(r"^\d{4} [A-Za-z]{3} \d{1,2}$", date_str):
                try:
                    parsed_date = datetime.strptime(date_str, "%Y %b %d")
                except ValueError:
                    pass
            
            # Format: YYYY MMM-MMM
            elif re.match(r"^\d{4} [A-Za-z]{3}-[A-Za-z]{3}$", date_str):
                try:
                    # Just use the first month
                    month_part = date_str.split(" ")[1].split("-")[0]
                    parsed_date = datetime.strptime(f"{date_str.split(' ')[0]} {month_part}", "%Y %b")
                except (ValueError, IndexError):
                    pass
            
            # Format: YYYY
            elif re.match(r"^\d{4}$", date_str):
                try:
                    parsed_date = datetime.strptime(date_str, "%Y")
                except ValueError:
                    pass
            
            if parsed_date:
                publication_dates.append((article.get("title", ""), parsed_date))
        
        return publication_dates
    
    def analyze_research_keywords(self, articles: List[Dict[str, Any]], top_n: int = 20, include_trends: bool = True) -> Dict[str, Any]:
        """
        Analyze research hotspots and trends based on keyword frequencies.
        
        Args:
            articles: List of article dictionaries
            top_n: Number of top keywords to include
            include_trends: Bool indicating whether to include trend analysis, default True.
            
        Returns:
            Dictionary with analysis results
        """
        # Extract all keywords
        all_keywords = []
        for article in articles:
            all_keywords.extend(article.get("keywords", []))
        
        # Count keyword frequencies
        keyword_counts = Counter(all_keywords)
        
        # Get top keywords
        top_keywords = keyword_counts.most_common(top_n)
        
        # Organize articles by keyword
        keyword_articles = defaultdict(list)
        for article in articles:
            article_keywords = article.get("keywords", [])
            for kw in article_keywords:
                if kw in dict(top_keywords):
                    keyword_articles[kw].append({
                        "title": article.get("title", ""),
                        "authors": article.get("authors", []),
                        "journal": article.get("journal", ""),
                        "publication_date": article.get("publication_date", ""),
                        "pmid": article.get("pmid", ""),
                        "doi": article.get("doi", "")  
                    })
        
        # Prepare results
        results = {
            "top_keywords": [{"keyword": kw, "count": count} for kw, count in top_keywords],
            "keyword_articles": {kw: articles for kw, articles in keyword_articles.items()}
        }
        
        # Include trend analysis if requested
        if include_trends:
            # Extract publication dates
            pub_dates = self.extract_publication_dates(articles)
            
            # Group top-keyword counts by month
            monthly_keyword_counts = defaultdict(lambda: defaultdict(int))
            
            for article in articles:
                date_str = article.get("publication_date", "")
                article_keywords = article.get("keywords", [])
                
                # Look up the parsed date for this article
                parsed_date = None
                for title, date in pub_dates:
                    if title == article.get("title", ""):
                        parsed_date = date
                        break
                
                if parsed_date:
                    month_key = parsed_date.strftime("%Y-%m")
                    for kw in article_keywords:
                        if kw in dict(top_keywords):
                            monthly_keyword_counts[month_key][kw] += 1
            
            # Sort months chronologically
            sorted_months = sorted(monthly_keyword_counts.keys())
            
            # Prepare the trend data structure
            trend_data = {
                "months": sorted_months,
                "keywords": [kw for kw, _ in top_keywords],
                "counts": []
            }
            
            for keyword, _ in top_keywords:
                keyword_trend = []
                for month in sorted_months:
                    keyword_trend.append(monthly_keyword_counts[month][keyword])
                trend_data["counts"].append({
                    "keyword": keyword,
                    "monthly_counts": keyword_trend
                })
            
            results["trends"] = trend_data
        
        return results
    
    def analyze_publication_count(self, articles: List[Dict[str, Any]], months_per_period: int = 3) -> Dict[str, Any]:
        """
        Analyze publication counts over time.
        
        Args:
            articles: List of article dictionaries
            months_per_period: Number of months to group by
            
        Returns:
            Dictionary with publication count analysis
        """
        # Extract publication dates
        pub_dates = self.extract_publication_dates(articles)
        
        # Group by period
        period_counts = defaultdict(int)
        
        for _, date in pub_dates:
            # Calculate period key based on months_per_period
            year = date.year
            month = date.month
            period = (month - 1) // months_per_period
            period_key = f"{year}-P{period+1}"  # 1-indexed periods
            
            period_counts[period_key] += 1
        
        # Sort periods chronologically
        sorted_periods = sorted(period_counts.keys())
        
        # Prepare result
        results = {
            "periods": sorted_periods,
            "counts": [period_counts[period] for period in sorted_periods],
            "months_per_period": months_per_period,
            "total_publications": len(pub_dates)
        }
        
        return results
    
    def generate_comprehensive_analysis(self, filepath: str, top_keywords: int = 20,
                                     months_per_period: int = 3) -> Dict[str, Any]:
        """
        Generate a comprehensive analysis of PubMed results from a file.
        
        Args:
            filepath: Path to the results text file
            top_keywords: Number of top keywords for hotspot analysis
            months_per_period: Number of months per period for publication count
            
        Returns:
            Dictionary with comprehensive analysis results
        """
        try:
            articles = self.parse_results_file(filepath)
            
            if not articles:
                return {"error": "No articles found in the file."}
            
            # Generate analysis components
            keyword_analysis = self.analyze_research_keywords(articles, top_keywords)
            pub_counts = self.analyze_publication_count(articles, months_per_period)
            
            # Combine results
            results = {
                "file_analyzed": os.path.basename(filepath),
                "analysis_timestamp": datetime.now().isoformat(),
                "article_count": len(articles),
                "keyword_analysis": keyword_analysis,
                "publication_counts": pub_counts
            }
            
            return results
            
        except Exception as e:
            return {"error": str(e)}
    
    def list_result_files(self) -> List[str]:
        """
        List all result files in the results directory.
        
        Returns:
            List of filenames
        """
        if not os.path.exists(self.results_dir):
            return []
        
        return [f for f in os.listdir(self.results_dir) if f.endswith(('.txt', '.json'))]

```
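
A minimal usage sketch for `PubMedAnalyzer` (the results directory and filename below are illustrative assumptions, not files shipped with the repo):

```python
# Sketch: parse an exported results file and run the keyword / count analyses.
from pubmearch.analyzer import PubMedAnalyzer

analyzer = PubMedAnalyzer(results_dir="results")  # assumed local results directory
articles = analyzer.parse_results_file("results/pubmed_results_20240101000000.json")

# Top 10 keywords, with per-month trend data included
keyword_report = analyzer.analyze_research_keywords(articles, top_n=10, include_trends=True)
for entry in keyword_report["top_keywords"]:
    print(entry["keyword"], entry["count"])

# Publication counts grouped into 6-month periods
count_report = analyzer.analyze_publication_count(articles, months_per_period=6)
print(count_report["periods"], count_report["counts"])
```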

--------------------------------------------------------------------------------
/pubmearch/server.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
PubMed Analysis MCP Server

This module implements an MCP server for analyzing PubMed search results,
providing tools to identify research hotspots, trends, and publication statistics.

Note:
    - First, always use the search_pubmed tool to generate fresh results.
    - Second, always prefer the JSON result files when analyzing results.
"""

import os
import sys
import subprocess
import json
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Union

# Resolve the package's parent directory; it is used for the log file path below
# and kept on sys.path so the package also works when run directly from source.
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
from .pubmed_searcher import PubMedSearcher
from .analyzer import PubMedAnalyzer

# Import FastMCP
from mcp.server.fastmcp import FastMCP, Context

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler(os.path.join(parent_dir, "pubmed_server.log")),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("pubmed-mcp-server")

# make sure results directory exists
now = datetime.now()
time_string = now.strftime("%Y%m%d%H%M%S")
results_dir = Path(__file__).resolve().parent / "results"
os.makedirs(results_dir, exist_ok=True)
logger.info(f"Results directory: {results_dir}")

# Initialize analyzer
analyzer = PubMedAnalyzer(results_dir=results_dir)

# Initialize MCP server
pubmearch = FastMCP(
    "PubMed Analyzer",
    description="MCP server for analyzing PubMed search results"
)

@pubmearch.tool()
async def search_pubmed(
    advanced_search: str,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    max_results: int = 1000,
    output_filename: Optional[str] = None,
) -> Dict[str, Any]:
    """Search PubMed with an advanced query and export the results to JSON and TXT files.

    Args:
        advanced_search: PubMed advanced search query string.
        start_date: Optional start date in YYYY/MM/DD format.
        end_date: Optional end date in YYYY/MM/DD format.
        max_results: Maximum number of articles to retrieve.
        output_filename: Optional base name for the exported result files.
    """
    try:
        logger.info(f"Starting PubMed search with query: {advanced_search}")
        NCBI_USER_EMAIL = os.getenv('NCBI_USER_EMAIL')
        NCBI_USER_API_KEY = os.getenv('NCBI_USER_API_KEY')

        if not NCBI_USER_EMAIL:
            logger.error("Email not provided and NCBI_USER_EMAIL environment variable not set")
            return {
                "success": False,
                "error": "Server configuration error: NCBI User Email is not set."
            }
        logger.info(f"Use email address: {NCBI_USER_EMAIL}")
        
        if NCBI_USER_API_KEY:
            logger.info(f"Using API key from environment.")
        else:
            logger.warning(f"NCBI_USER_API_KEY environment variable not found. Proceeding without API key.")
        
        searcher = PubMedSearcher(email=NCBI_USER_EMAIL, api_key=NCBI_USER_API_KEY)
        
        # Create a date range if dates are provided
        # Note: start_date and end_date are always in YYYY/MM/DD format
        date_range = None
        if start_date or end_date:
            # Validate date formats
            date_pattern = re.compile(r'^\d{4}/\d{2}/\d{2}$')
            if start_date and not date_pattern.match(start_date):
                raise ValueError(f"Invalid start_date format: {start_date}. Must be YYYY/MM/DD")
            if end_date and not date_pattern.match(end_date):
                raise ValueError(f"Invalid end_date format: {end_date}. Must be YYYY/MM/DD")
            
            date_range = (start_date, end_date)  # the searcher handles an open-ended range
        
        # Perform search
        records = searcher.search(
            advanced_search=advanced_search,
            date_range=date_range,
            max_results=max_results
        )
        
        if not records:
            logger.warning("No results found for the search criteria")
            return {
                "success": False,
                "error": "No results found for the given criteria."
            }
        
        # Export both TXT and JSON formats
        if not output_filename:
            base_filename = f"pubmed_results_{time_string}"
            json_filename = f"{base_filename}.json"
            txt_filename = f"{base_filename}.txt"
        else:
            # Remove any existing extension
            base_filename = output_filename.rsplit('.', 1)[0] + f"_{time_string}"
            json_filename = f"{base_filename}.json"
            txt_filename = f"{base_filename}.txt"
        
        # Export both formats
        json_path = os.path.abspath(searcher.export_to_json(records, json_filename))
        txt_path = os.path.abspath(searcher.export_to_txt(records, txt_filename))
        
        # Verify if files were saved successfully
        if not os.path.exists(json_path):
            logger.error(f"Failed to create JSON file at {json_path}")
            return {
                "success": False,
                "error": f"Failed to save JSON results file."
            }
            
        logger.info(f"Successfully saved {len(records)} articles to JSON: {json_path}")
        
        return {
            "success": True,
            "message": f"Search completed successfully. Found {len(records)} articles.",
            "json_file": os.path.basename(json_path),
            "txt_file": os.path.basename(txt_path),
            "note": "JSON files are recommended for AI model analysis.",
            "article_count": len(records)
        }
        
    except ValueError as ve: 
        logger.error(f"ValueError in search_pubmed: {str(ve)}", exc_info=True)
        return {"success": False, "error": str(ve)}
    except Exception as e:
        logger.error(f"Error in search_pubmed: {str(e)}", exc_info=True)
        return {
            "success": False,
            "error": f"Error during search: {str(e)}"
        }

@pubmearch.tool()
async def list_result_files() -> Dict[str, Any]:
    """Lists all available PubMed result files.

    Two types of files are returned:
    - JSON files (recommended): structured data, suitable for AI model analysis
    - TXT files (alternative): plain text format, for backward compatibility
    """
    try:
        logger.info(f"Listing result files in: {results_dir}")
        
        if not os.path.exists(results_dir):
            logger.warning(f"Results directory does not exist: {results_dir}")
            os.makedirs(results_dir, exist_ok=True)
            logger.info(f"Created results directory: {results_dir}")
            return {
                "success": True,
                "files": [],
                "count": 0,
                "directory": results_dir
            }
        
        # Get JSON and TXT files separately
        json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
        txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
        
        return {
            "success": True,
            "files": {
                "recommended": json_files,  # JSON files (recommended)
                "alternative": txt_files    # TXT files (alternative)
            },
            "count": len(json_files) + len(txt_files),
            "directory": results_dir,
            "note": "Always use JSON files first."
        }
    except Exception as e:
        logger.error(f"Error in list_result_files: {str(e)}", exc_info=True)
        return {
            "success": False,
            "error": str(e),
            "directory": results_dir if 'results_dir' in locals() else "unknown"
        }

@pubmearch.tool()
async def analyze_research_keywords(filename: str, top_n: int = 20, include_trends: bool = True) -> Dict[str, Any]:
    """Analyze the research hotspots and trends in PubMed result files according keywords.
    
    Note: It is recommended to use JSON format files for better analysis results.
    
    Args:
        filename: File name of results. (.json format is recommended)
        top_n: Number of top keywords to return.
        include_trends: Whether to include trend analysis. Default is True.
    """
    try:
        filepath = os.path.join(results_dir, filename)
        logger.info(f"Analyzing research keywords from file: {filepath}")
        
        # Check if the file exists
        if not os.path.exists(filepath):
            logger.error(f"File not found: {filepath}")
            # JSON first
            json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
            txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
            return {
                "success": False,
                "error": f"File not found: {filepath}",
                "available_files": {
                    "recommended": json_files,
                    "alternative": txt_files
                },
                "note": "Always use JSON files first."
            }
        
        # Parse the result file
        articles = analyzer.parse_results_file(filepath)
        
        if not articles:
            logger.warning(f"No articles found in file: {filepath}")
            return {
                "success": False,
                "error": "No articles found in the file."
            }
        
        # Analyze keywords
        analysis_results = analyzer.analyze_research_keywords(articles, top_n, include_trends)
        
        return {
            "success": True,
            "file_analyzed": filename,
            "article_count": len(articles),
            "keyword_analysis": analysis_results
        }
        
    except Exception as e:
        logger.error(f"Error in analyze_research_keywords: {str(e)}", exc_info=True)
        return {
            "success": False,
            "error": str(e)
        }

@pubmearch.tool()
async def analyze_publication_count(filename: str, months_per_period: int = 3) -> Dict[str, Any]:
    """Analyze publication counts over time from a PubMed results file.
    
    Note: It is recommended to use JSON format files for better analysis results.
    
    Args:
        filename: File name of results. (.json format is recommended)
        months_per_period: Number of months per analysis period
    """
    try:
        filepath = os.path.join(results_dir, filename)
        logger.info(f"Analyzing publication counts from file: {filepath}")
        
        # Check if the file exists
        if not os.path.exists(filepath):
            logger.error(f"File not found: {filepath}")
            json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
            txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
            return {
                "success": False,
                "error": f"File not found: {filepath}",
                "available_files": {
                    "recommended": json_files,
                    "alternative": txt_files
                },
                "note": "Always use JSON files first."
            }
        
        # Parse the result file
        articles = analyzer.parse_results_file(filepath)
        
        if not articles:
            logger.warning(f"No articles found in file: {filepath}")
            return {
                "success": False,
                "error": "No articles found in the file."
            }
        
        # Analyze publication counts
        pub_counts = analyzer.analyze_publication_count(articles, months_per_period)
        
        return {
            "success": True,
            "file_analyzed": filename,
            "article_count": len(articles),
            "publication_counts": pub_counts
        }
        
    except Exception as e:
        logger.error(f"Error in analyze_publication_count: {str(e)}", exc_info=True)
        return {
            "success": False,
            "error": str(e)
        }

@pubmearch.tool()
async def generate_comprehensive_analysis(
    filename: str,
    top_keywords: int = 20,
    months_per_period: int = 3
) -> Dict[str, Any]:
    """Generate a comprehensive analysis of a PubMed results file.
    
    Note: It is recommended to use JSON format files for better analysis results.
    
    Args:
        filename: File name of results. (.json format is recommended)
        top_keywords: Number of top keywords to analyze
        months_per_period: Number of months per analysis period
    """
    try:
        filepath = os.path.join(results_dir, filename)
        logger.info(f"Generating comprehensive analysis from file: {filepath}")
        
        # Check if the file exists
        if not os.path.exists(filepath):
            logger.error(f"File not found: {filepath}")
            json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
            txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
            return {
                "success": False,
                "error": f"File not found: {filepath}",
                "available_files": {
                    "recommended": json_files,
                    "alternative": txt_files
                },
                "note": "Always use JSON files first."
            }
        
        # Generate comprehensive analysis directly
        results = analyzer.generate_comprehensive_analysis(
            filepath,
            top_keywords=top_keywords,
            months_per_period=months_per_period
        )
        
        if "error" in results:
            logger.error(f"Error in analysis: {results['error']}")
            return {
                "success": False,
                "error": results["error"]
            }
        
        logger.info("Comprehensive analysis completed successfully")
        return {
            "success": True,
            "analysis": results
        }
        
    except Exception as e:
        logger.error(f"Error in generate_comprehensive_analysis: {str(e)}", exc_info=True)
        return {
            "success": False,
            "error": str(e)
        }

if __name__ == "__main__":
    os.makedirs(results_dir, exist_ok=True)
    pubmearch.run()
```
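
The tools are registered as async FastMCP coroutines; in normal use they are invoked by an MCP client, but for a quick local smoke test they can be awaited directly. A hedged sketch, assuming `NCBI_USER_EMAIL` (and optionally `NCBI_USER_API_KEY`) is set in the environment and using a purely illustrative query:

```python
# Sketch: call the tool coroutines directly, outside of an MCP client.
import asyncio
from pubmearch.server import search_pubmed, analyze_research_keywords

async def main():
    search_result = await search_pubmed(
        advanced_search='("machine learning"[Title/Abstract]) AND (radiology[MeSH Terms])',
        start_date="2023/01/01",
        end_date="2023/12/31",
        max_results=100,
    )
    print(search_result)
    if search_result.get("success"):
        report = await analyze_research_keywords(search_result["json_file"], top_n=15)
        print(report)

asyncio.run(main())
```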

--------------------------------------------------------------------------------
/pubmearch/pubmed_searcher.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
PubMed Searcher Module

This module provides functionality for searching PubMed and retrieving article data.
"""

import os
import re
import time
import json
import logging
from datetime import datetime
from typing import List, Dict, Tuple, Optional, Any, Union
from Bio import Entrez
from pathlib import Path


# Configure logging
logger = logging.getLogger(__name__)

class PubMedSearcher:
    """Class to search PubMed and retrieve article data."""
    
    def __init__(self, email: Optional[str] = None, results_dir: Optional[str] = None, api_key: Optional[str] = None):
        """
        Initialize the PubMed searcher with an email address passed directly or taken from .env.
        
        Args:
            email: Email address for Entrez. If None, use NCBI_USER_EMAIL from environment variables.
            results_dir: Optional custom results directory path
            api_key: API key for NCBI. If None, use NCBI_USER_API_KEY from environment variables.
        """
        # use NCBI_USER_EMAIL from .env if email is not provided
        self.email = email if email is not None else os.getenv('NCBI_USER_EMAIL')
        self.api_key = api_key if api_key is not None else os.getenv('NCBI_USER_API_KEY')
        if not self.email:
            raise ValueError("Email is required. Either pass it directly or set NCBI_USER_EMAIL in .env")
        
        # Set up Entrez
        Entrez.email = self.email
        Entrez.api_key = self.api_key
        
        # Use provided results directory or create default
        self.results_dir = Path(results_dir) if results_dir else Path(__file__).resolve().parent / "results"
        os.makedirs(self.results_dir, exist_ok=True)
        logger.info(f"Using results directory: {self.results_dir}")
    
    def search(self, 
              advanced_search: str, 
              date_range: Optional[Tuple[str, str]] = None,
              max_results: int = 1000) -> List[Dict[str, Any]]:
        """
        Search PubMed using advanced search syntax.
        
        Args:
            advanced_search: PubMed advanced search query
            date_range: Optional tuple of (start_date, end_date), 
                        date format is always YYYY/MM/DD
            max_results: Maximum number of results to retrieve
            
        Returns:
            List of article dictionaries
        """
        search_term = advanced_search
        
        # Add date range to query if provided
        # Note: start_date and end_date are always in YYYY/MM/DD format
        if date_range:
            start_date, end_date = date_range
            date_filter = ""
            
            # start_date
            if start_date:
                date_filter += f" AND ('{start_date}'[Date - Publication]"
                if end_date:
                    date_filter += f" : '{end_date}'[Date - Publication]"
                date_filter += ")"
            # If only end_date is given, use 1900/01/01 as the start date so the range is inclusive
            elif end_date:
                date_filter += f" AND ('1900/01/01'[Date - Publication] : '{end_date}'[Date - Publication])"
            
            search_term += date_filter
        
        try:
            # Search PubMed
            logger.info(f"Searching PubMed with query: {search_term}")
            search_handle = Entrez.esearch(db="pubmed", term=search_term, retmax=max_results, usehistory="y")
            search_results = Entrez.read(search_handle)
            search_handle.close()
            
            webenv = search_results["WebEnv"]
            query_key = search_results["QueryKey"]
            
            # Get the count of results
            count = int(search_results["Count"])
            logger.info(f"Found {count} results, retrieving up to {max_results}")
            
            if count == 0:
                logger.warning("No results found")
                return []
            
            # Initialize an empty list to store articles
            articles = []
            
            # Fetch results in batches to avoid timeouts
            batch_size = 100
            for start in range(0, min(count, max_results), batch_size):
                end = min(count, start + batch_size, max_results)
                logger.info(f"Retrieving records {start+1} to {end}")
                
                try:
                    # Fetch the records
                    fetch_handle = Entrez.efetch(
                        db="pubmed", 
                        retstart=start, 
                        retmax=batch_size,
                        webenv=webenv,
                        query_key=query_key,
                        retmode="xml"
                    )
                    
                    # Parse the records
                    records = Entrez.read(fetch_handle)["PubmedArticle"]
                    fetch_handle.close()
                    
                    # Process each record
                    for record in records:
                        article = self._parse_pubmed_record(record)
                        articles.append(article)
                    
                    # Sleep to avoid overloading the NCBI server
                    time.sleep(1)
                    
                except Exception as e:
                    logger.error(f"Error fetching batch {start+1} to {end}: {str(e)}")
                    continue
            
            return articles
            
        except Exception as e:
            logger.error(f"Error searching PubMed: {str(e)}")
            return []
    
    def _parse_pubmed_record(self, record: Dict) -> Dict[str, Any]:
        """
        Parse a PubMed record into a structured article dictionary.
        
        Args:
            record: PubMed record from Entrez.read
            
        Returns:
            Dictionary containing structured article data
        """
        article_data = {}
        
        # Get MedlineCitation and Article
        medline_citation = record.get("MedlineCitation", {})
        article = medline_citation.get("Article", {})
        
        # Extract basic article information
        article_data["title"] = article.get("ArticleTitle", "")
        
        # Extract authors
        authors = []
        author_list = article.get("AuthorList", [])
        for author in author_list:
            if "LastName" in author and "ForeName" in author:
                authors.append(f"{author['LastName']} {author['ForeName']}")
            elif "LastName" in author and "Initials" in author:
                authors.append(f"{author['LastName']} {author['Initials']}")
            elif "LastName" in author:
                authors.append(author["LastName"])
            elif "CollectiveName" in author:
                authors.append(author["CollectiveName"])
        article_data["authors"] = authors
        
        # Extract journal information
        journal = article.get("Journal", {})
        article_data["journal"] = journal.get("Title", "")
        
        # Extract publication date
        pub_date = {}
        journal_issue = journal.get("JournalIssue", {})
        if "PubDate" in journal_issue:
            pub_date = journal_issue["PubDate"]
        
        pub_date_str = ""
        if "Year" in pub_date:
            pub_date_str = pub_date["Year"]
            if "Month" in pub_date:
                pub_date_str += f" {pub_date['Month']}"
                if "Day" in pub_date:
                    pub_date_str += f" {pub_date['Day']}"
                    
        article_data["publication_date"] = pub_date_str
        
        # Extract abstract
        abstract_text = ""
        if "Abstract" in article and "AbstractText" in article["Abstract"]:
            # Handle different abstract formats
            abstract_parts = article["Abstract"]["AbstractText"]
            if isinstance(abstract_parts, list):
                for part in abstract_parts:
                    if isinstance(part, str):
                        abstract_text += part + " "
                    elif isinstance(part, dict) and "#text" in part:
                        label = part.get("Label", "")
                        text = part["#text"]
                        if label:
                            abstract_text += f"{label}: {text} "
                        else:
                            abstract_text += text + " "
            else:
                abstract_text = str(abstract_parts)
        
        article_data["abstract"] = abstract_text.strip()
        
        # Extract keywords
        keywords = []
        # MeSH headings
        mesh_headings = medline_citation.get("MeshHeadingList", [])
        for heading in mesh_headings:
            if "DescriptorName" in heading:
                descriptor = heading["DescriptorName"]
                if isinstance(descriptor, dict) and "content" in descriptor:
                    keywords.append(descriptor["content"])
                elif isinstance(descriptor, str):
                    keywords.append(descriptor)
        
        # Keywords from KeywordList
        keyword_lists = medline_citation.get("KeywordList", [])
        for keyword_list in keyword_lists:
            if isinstance(keyword_list, list):
                for keyword in keyword_list:
                    if isinstance(keyword, str):
                        keywords.append(keyword)
                    elif isinstance(keyword, dict) and "content" in keyword:
                        keywords.append(keyword["content"])
        
        article_data["keywords"] = keywords
        
        # Extract PMID
        pmid = medline_citation.get("PMID", "")
        if isinstance(pmid, dict) and "content" in pmid:
            article_data["pmid"] = pmid["content"]
        else:
            article_data["pmid"] = str(pmid)
        
        # Extract DOI by iterating carefully over ArticleIdList
        doi = ""
        try:
            pubmed_data = record.get("PubmedData")
            if pubmed_data:
                article_id_list = pubmed_data.get("ArticleIdList")
                # Iterate through article_id_list if it exists and is iterable
                if article_id_list:
                    try:
                        for id_element in article_id_list:
                            # Check if the element has attributes and the IdType is 'doi'
                            # Handles Bio.Entrez.Parser.StringElement and similar objects
                            if hasattr(id_element, 'attributes') and id_element.attributes.get('IdType') == 'doi':
                                doi = str(id_element).strip() # Get the string value
                                if doi: break # Found DOI, exit loop
                            # Fallback check for plain dictionary structure (less common)
                            elif isinstance(id_element, dict) and id_element.get('IdType') == 'doi':
                                doi = id_element.get('content', '').strip() or id_element.get('#text', '').strip()
                                if doi: break # Found DOI, exit loop
                    except TypeError:
                        # Handle cases where article_id_list might not be iterable (e.g., single element)
                        # Check if the single element itself is the DOI
                        if hasattr(article_id_list, 'attributes') and article_id_list.attributes.get('IdType') == 'doi':
                            doi = str(article_id_list).strip()

        except Exception as e:
            print(f"Warning: Error during DOI extraction for PMID {article_data.get('pmid', 'N/A')}: {e}")
            doi = "" # Reset DOI on error
        
        article_data["doi"] = doi
        
        return article_data
    
    def export_to_txt(self, articles: List[Dict[str, Any]], filename: Optional[str] = None) -> str:
        """
        Export articles to a formatted text file.
        
        Args:
            articles: List of article dictionaries
            filename: Optional output filename
            
        Returns:
            Path to the created file
        """
        if not filename:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"pubmed_results_{timestamp}.txt"
        
        filepath = os.path.join(self.results_dir, filename)
        
        with open(filepath, 'w', encoding='utf-8') as f:
            for i, article in enumerate(articles, 1):
                f.write(f"Article {i}\n")
                f.write("-" * 80 + "\n")
                f.write(f"Title: {article.get('title', '')}\n")
                f.write(f"Authors: {', '.join(article.get('authors', []))}\n")
                f.write(f"Journal: {article.get('journal', '')}\n")
                f.write(f"Publication Date: {article.get('publication_date', '')}\n")
                f.write(f"Abstract:\n{article.get('abstract', '')}\n")
                f.write(f"Keywords: {', '.join(article.get('keywords', []))}\n")
                f.write(f"PMID: {article.get('pmid', '')}\n")
                f.write(f"DOI: https://doi.org/{article.get('doi', '')}\n")
                f.write("=" * 80 + "\n\n")
        
        logger.info(f"Exported {len(articles)} articles to {filepath}")
        return filepath
    
    def export_to_json(self, articles: List[Dict[str, Any]], filename: Optional[str] = None) -> str:
        """
        Export articles to JSON format file.
        
        Args:
            articles: List of article dictionaries
            filename: Optional output filename
            
        Returns:
            Path to the created file
        """
        if not filename:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"pubmed_results_{timestamp}.json"
        
        filepath = os.path.join(self.results_dir, filename)
        
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump({
                "metadata": {
                    "export_time": datetime.now().isoformat(),
                    "article_count": len(articles)
                },
                "articles": articles
            }, f, ensure_ascii=False, indent=2)
        
        logger.info(f"Exported {len(articles)} articles to {filepath}")
        return filepath

```
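
A minimal usage sketch for `PubMedSearcher` (the email address and query below are placeholders; a real contact email is required, either passed in or set via `NCBI_USER_EMAIL`):

```python
# Sketch: search PubMed directly and export the results in both formats.
from pubmearch.pubmed_searcher import PubMedSearcher

searcher = PubMedSearcher(email="you@example.org")  # placeholder email
records = searcher.search(
    advanced_search='"CRISPR"[Title/Abstract]',
    date_range=("2024/01/01", "2024/06/30"),  # YYYY/MM/DD, as the module expects
    max_results=50,
)

json_path = searcher.export_to_json(records)
txt_path = searcher.export_to_txt(records)
print(f"Saved {len(records)} articles to {json_path} and {txt_path}")
```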