# Directory Structure
```
├── .gitignore
├── LICENSE
├── pubmearch
│ ├── __init__.py
│ ├── analyzer.py
│ ├── pubmed_searcher.py
│ └── server.py
├── pyproject.toml
└── README.md
```
# Files
--------------------------------------------------------------------------------
/pubmearch/__init__.py:
--------------------------------------------------------------------------------
```python
"""
PubMed Analysis MCP Server Package
"""
```
--------------------------------------------------------------------------------
/pubmearch/analyzer.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
PubMed Analysis Module
This module provides analysis functionality for PubMed search results,
including research hotspots, trends, and publication statistics.
"""
import os
import re
import json
from datetime import datetime
from collections import Counter, defaultdict
from typing import Dict, List, Optional, Tuple, Any, Union
class PubMedAnalyzer:
"""Class to analyze PubMed search results from text files."""
def __init__(self, results_dir: str = "../results"):
"""
Initialize the PubMed analyzer.
Args:
results_dir: Directory containing PubMed search result text files
"""
self.results_dir = results_dir
def parse_results_file(self, filepath: str) -> List[Dict[str, Any]]:
"""
Parse a PubMed results file (txt or json) into structured data.
Args:
filepath: Path to the results file
Returns:
List of dictionaries containing structured article data
"""
if not os.path.exists(filepath):
raise FileNotFoundError(f"File not found: {filepath}")
# Choose parsing method based on file extension
if filepath.endswith('.json'):
return self._parse_json_file(filepath)
else:
return self._parse_txt_file(filepath)
def _parse_json_file(self, filepath: str) -> List[Dict[str, Any]]:
"""Parse a JSON results file."""
with open(filepath, 'r', encoding='utf-8') as f:
data = json.load(f)
return data.get("articles", [])
def _parse_txt_file(self, filepath: str) -> List[Dict[str, Any]]:
"""Parse a text results file."""
articles = []
current_article = None
section = None
with open(filepath, 'r', encoding='utf-8') as f:
lines = f.readlines()
i = 0
while i < len(lines):
line = lines[i].strip()
# New article marker
if line.startswith("Article ") and "-" * 10 in lines[i+1]:
if current_article:
articles.append(current_article)
current_article = {
"title": "",
"authors": [],
"journal": "",
"publication_date": "",
"abstract": "",
"keywords": [],
"pmid": "",
"doi": ""
}
section = None
                i += 1  # Skip to the separator line; the loop increment below advances to the Title line
# Section headers
elif line.startswith("Title: "):
current_article["title"] = line[7:].strip()
section = "title"
elif line.startswith("Authors: "):
authors_line = line[9:].strip()
if authors_line != "N/A":
current_article["authors"] = [a.strip() for a in authors_line.split(",")]
section = None
elif line.startswith("Journal: "):
current_article["journal"] = line[9:].strip()
section = None
elif line.startswith("Publication Date: "):
current_article["publication_date"] = line[18:].strip()
section = None
elif line == "Abstract:":
section = "abstract"
elif line.startswith("Keywords: "):
keywords_line = line[10:].strip()
current_article["keywords"] = [k.strip() for k in keywords_line.split(",")]
section = None
elif line.startswith("PMID: "):
current_article["pmid"] = line[6:].strip()
section = None
elif line.startswith("DOI: "):
current_article["doi"] = line[5:].strip()
section = None
elif line.startswith("=" * 20):
section = None
# Content sections
elif section == "abstract" and line and not line.startswith("Keywords: "):
current_article["abstract"] += line + " "
i += 1
# Add the last article
if current_article:
articles.append(current_article)
return articles
def extract_publication_dates(self, articles: List[Dict[str, Any]]) -> List[Tuple[str, datetime]]:
"""
Extract and parse publication dates from articles.
Args:
articles: List of article dictionaries
Returns:
List of tuples containing (article_title, publication_date)
"""
publication_dates = []
for article in articles:
date_str = article.get("publication_date", "")
# Try different formats
parsed_date = None
# Format: YYYY MMM
if re.match(r"^\d{4} [A-Za-z]{3}$", date_str):
try:
parsed_date = datetime.strptime(date_str, "%Y %b")
except ValueError:
pass
# Format: YYYY MMM DD
elif re.match(r"^\d{4} [A-Za-z]{3} \d{1,2}$", date_str):
try:
parsed_date = datetime.strptime(date_str, "%Y %b %d")
except ValueError:
pass
# Format: YYYY MMM-MMM
elif re.match(r"^\d{4} [A-Za-z]{3}-[A-Za-z]{3}$", date_str):
try:
# Just use the first month
month_part = date_str.split(" ")[1].split("-")[0]
parsed_date = datetime.strptime(f"{date_str.split(' ')[0]} {month_part}", "%Y %b")
except (ValueError, IndexError):
pass
# Format: YYYY
elif re.match(r"^\d{4}$", date_str):
try:
parsed_date = datetime.strptime(date_str, "%Y")
except ValueError:
pass
if parsed_date:
publication_dates.append((article.get("title", ""), parsed_date))
return publication_dates
def analyze_research_keywords(self, articles: List[Dict[str, Any]], top_n: int = 20, include_trends: bool = True) -> Dict[str, Any]:
"""
Analyze research hotspots and trends based on keyword frequencies.
Args:
articles: List of article dictionaries
top_n: Number of top keywords to include
include_trends: Bool indicating whether to include trend analysis, default True.
Returns:
Dictionary with analysis results
"""
# Extract all keywords
all_keywords = []
for article in articles:
all_keywords.extend(article.get("keywords", []))
# Count keyword frequencies
keyword_counts = Counter(all_keywords)
# Get top keywords
top_keywords = keyword_counts.most_common(top_n)
# Organize articles by keyword
keyword_articles = defaultdict(list)
for article in articles:
article_keywords = article.get("keywords", [])
for kw in article_keywords:
if kw in dict(top_keywords):
keyword_articles[kw].append({
"title": article.get("title", ""),
"authors": article.get("authors", []),
"journal": article.get("journal", ""),
"publication_date": article.get("publication_date", ""),
"pmid": article.get("pmid", ""),
"doi": article.get("doi", "")
})
# Prepare results
results = {
"top_keywords": [{"keyword": kw, "count": count} for kw, count in top_keywords],
"keyword_articles": {kw: articles for kw, articles in keyword_articles.items()}
}
        # If trend analysis is requested
if include_trends:
            # Extract publication dates
pub_dates = self.extract_publication_dates(articles)
            # Group keyword counts by month
monthly_keyword_counts = defaultdict(lambda: defaultdict(int))
for article in articles:
date_str = article.get("publication_date", "")
article_keywords = article.get("keywords", [])
                # Look up the parsed date for this article
parsed_date = None
for title, date in pub_dates:
if title == article.get("title", ""):
parsed_date = date
break
if parsed_date:
month_key = parsed_date.strftime("%Y-%m")
for kw in article_keywords:
if kw in dict(top_keywords):
monthly_keyword_counts[month_key][kw] += 1
            # Sort months chronologically
sorted_months = sorted(monthly_keyword_counts.keys())
            # Prepare trend data
trend_data = {
"months": sorted_months,
"keywords": [kw for kw, _ in top_keywords],
"counts": []
}
for keyword, _ in top_keywords:
keyword_trend = []
for month in sorted_months:
keyword_trend.append(monthly_keyword_counts[month][keyword])
trend_data["counts"].append({
"keyword": keyword,
"monthly_counts": keyword_trend
})
results["trends"] = trend_data
return results
def analyze_publication_count(self, articles: List[Dict[str, Any]], months_per_period: int = 3) -> Dict[str, Any]:
"""
Analyze publication counts over time.
Args:
articles: List of article dictionaries
months_per_period: Number of months to group by
Returns:
Dictionary with publication count analysis
"""
# Extract publication dates
pub_dates = self.extract_publication_dates(articles)
# Group by period
period_counts = defaultdict(int)
for _, date in pub_dates:
# Calculate period key based on months_per_period
year = date.year
month = date.month
period = (month - 1) // months_per_period
period_key = f"{year}-P{period+1}" # 1-indexed periods
period_counts[period_key] += 1
# Sort periods chronologically
sorted_periods = sorted(period_counts.keys())
# Prepare result
results = {
"periods": sorted_periods,
"counts": [period_counts[period] for period in sorted_periods],
"months_per_period": months_per_period,
"total_publications": len(pub_dates)
}
return results
def generate_comprehensive_analysis(self, filepath: str, top_keywords: int = 20,
months_per_period: int = 3) -> Dict[str, Any]:
"""
Generate a comprehensive analysis of PubMed results from a file.
Args:
filepath: Path to the results text file
top_keywords: Number of top keywords for hotspot analysis
months_per_period: Number of months per period for publication count
Returns:
Dictionary with comprehensive analysis results
"""
try:
articles = self.parse_results_file(filepath)
if not articles:
return {"error": "No articles found in the file."}
# Generate analysis components
keyword_analysis = self.analyze_research_keywords(articles, top_keywords)
pub_counts = self.analyze_publication_count(articles, months_per_period)
# Combine results
results = {
"file_analyzed": os.path.basename(filepath),
"analysis_timestamp": datetime.now().isoformat(),
"article_count": len(articles),
"keyword_analysis": keyword_analysis,
"publication_counts": pub_counts
}
return results
except Exception as e:
return {"error": str(e)}
def list_result_files(self) -> List[str]:
"""
List all result files in the results directory.
Returns:
List of filenames
"""
if not os.path.exists(self.results_dir):
return []
        return [f for f in os.listdir(self.results_dir) if f.endswith(('.txt', '.json'))]
```
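For reference, a minimal sketch of using `PubMedAnalyzer` directly, outside the MCP server. The results directory and filename below are hypothetical placeholders for an actual export:
```python
from pubmearch.analyzer import PubMedAnalyzer

# Hypothetical paths: point results_dir and the filename at a real export.
analyzer = PubMedAnalyzer(results_dir="results")
articles = analyzer.parse_results_file("results/pubmed_results_20240101000000.json")

# Top-10 keyword hotspots, with monthly trend data included.
keyword_report = analyzer.analyze_research_keywords(articles, top_n=10, include_trends=True)
for entry in keyword_report["top_keywords"]:
    print(entry["keyword"], entry["count"])

# Publication counts grouped into quarterly periods.
counts = analyzer.analyze_publication_count(articles, months_per_period=3)
print(counts["periods"], counts["counts"])
```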
--------------------------------------------------------------------------------
/pubmearch/server.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
PubMed Analysis MCP Server
This module implements an MCP server for analyzing PubMed search results,
providing tools to identify research hotspots, trends, and publication statistics.
Note:
- First, always use the search_pubmed tool to generate fresh results.
- Second, for result analysis, always prefer the JSON output files.
"""
import os
import sys
import json
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any, Union
# Parent directory of the package: kept on sys.path and used for the log file location
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
from .pubmed_searcher import PubMedSearcher
from .analyzer import PubMedAnalyzer
# Import FastMCP
from mcp.server.fastmcp import FastMCP, Context
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(os.path.join(parent_dir, "pubmed_server.log")),
logging.StreamHandler()
]
)
logger = logging.getLogger("pubmed-mcp-server")
# Make sure the results directory exists
results_dir = Path(__file__).resolve().parent / "results"
os.makedirs(results_dir, exist_ok=True)
logger.info(f"Results directory: {results_dir}")
# Initialize analyzer
analyzer = PubMedAnalyzer(results_dir=results_dir)
# Initialize MCP server
pubmearch = FastMCP(
"PubMed Analyzer",
description="MCP server for analyzing PubMed search results"
)
@pubmearch.tool()
async def search_pubmed(
advanced_search: str,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
max_results: int = 1000,
output_filename: Optional[str] = None,
) -> Dict[str, Any]:
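    """Search PubMed with an advanced query and export results to JSON and TXT files.

    Args:
        advanced_search: PubMed advanced search query string.
        start_date: Optional start date in YYYY/MM/DD format.
        end_date: Optional end date in YYYY/MM/DD format.
        max_results: Maximum number of articles to retrieve.
        output_filename: Optional base name for the result files (a timestamp is appended).
    """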
try:
logger.info(f"Starting PubMed search with query: {advanced_search}")
NCBI_USER_EMAIL = os.getenv('NCBI_USER_EMAIL')
NCBI_USER_API_KEY = os.getenv('NCBI_USER_API_KEY')
if not NCBI_USER_EMAIL:
logger.error("Email not provided and NCBI_USER_EMAIL environment variable not set")
return {
"success": False,
"error": "Server configuration error: NCBI User Email is not set."
}
logger.info(f"Use email address: {NCBI_USER_EMAIL}")
if NCBI_USER_API_KEY:
logger.info(f"Using API key from environment.")
else:
logger.warning(f"NCBI_USER_API_KEY environment variable not found. Proceeding without API key.")
searcher = PubMedSearcher(email = NCBI_USER_EMAIL, api_key = NCBI_USER_API_KEY)
        # Create a date range if dates are provided
        # Note: the format of start_date and end_date is always YYYY/MM/DD
date_range = None
if start_date or end_date:
# Validate date formats
date_pattern = re.compile(r'^\d{4}/\d{2}/\d{2}$')
if start_date and not date_pattern.match(start_date):
raise ValueError(f"Invalid start_date format: {start_date}. Must be YYYY/MM/DD")
if end_date and not date_pattern.match(end_date):
raise ValueError(f"Invalid end_date format: {end_date}. Must be YYYY/MM/DD")
date_range = (start_date, end_date) if start_date and end_date else None
# Perform search
records = searcher.search(
advanced_search=advanced_search,
date_range=date_range,
max_results=max_results
)
if not records:
logger.warning("No results found for the search criteria")
return {
"success": False,
"error": "No results found for the given criteria."
}
        # Export both TXT and JSON formats; timestamp the filenames per search so repeated searches do not overwrite earlier results
        time_string = datetime.now().strftime("%Y%m%d%H%M%S")
        if not output_filename:
base_filename = f"pubmed_results_{time_string}"
json_filename = f"{base_filename}.json"
txt_filename = f"{base_filename}.txt"
else:
# Remove any existing extension
base_filename = output_filename.rsplit('.', 1)[0] + f"_{time_string}"
json_filename = f"{base_filename}.json"
txt_filename = f"{base_filename}.txt"
# Export both formats
json_path = os.path.abspath(searcher.export_to_json(records, json_filename))
txt_path = os.path.abspath(searcher.export_to_txt(records, txt_filename))
# Verify if files were saved successfully
if not os.path.exists(json_path):
logger.error(f"Failed to create JSON file at {json_path}")
return {
"success": False,
"error": f"Failed to save JSON results file."
}
logger.info(f"Successfully saved {len(records)} articles to JSON: {json_path}")
return {
"success": True,
"message": f"Search completed successfully. Found {len(records)} articles.",
"json_file": os.path.basename(json_path),
"txt_file": os.path.basename(txt_path),
"note": "JSON files are recommended for AI model analysis.",
"article_count": len(records)
}
except ValueError as ve:
logger.error(f"ValueError in search_pubmed: {str(ve)}", exc_info=True)
return {"success": False, "error": str(ve)}
except Exception as e:
logger.error(f"Error in search_pubmed: {str(e)}", exc_info=True)
return {
"success": False,
"error": f"Error during search: {str(e)}"
}
@pubmearch.tool()
async def list_result_files() -> Dict[str, Any]:
"""Lists all available PubMed result files.
Two types of files are returned:
- JSON files (recommended): structured data, suitable for AI model analysis
- TXT files (alternative): plain text format, for backward compatibility
"""
try:
logger.info(f"Listing result files in: {results_dir}")
if not os.path.exists(results_dir):
logger.warning(f"Results directory does not exist: {results_dir}")
os.makedirs(results_dir, exist_ok=True)
logger.info(f"Created results directory: {results_dir}")
return {
"success": True,
"files": [],
"count": 0,
"directory": results_dir
}
# Get JSON and TXT files separately
json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
return {
"success": True,
"files": {
"recommended": json_files, # JSON files (recommended)
"alternative": txt_files # TXT files (alternative)
},
"count": len(json_files) + len(txt_files),
"directory": results_dir,
"note": "Always use JSON files first."
}
except Exception as e:
logger.error(f"Error in list_result_files: {str(e)}", exc_info=True)
return {
"success": False,
"error": str(e),
"directory": results_dir if 'results_dir' in locals() else "unknown"
}
@pubmearch.tool()
async def analyze_research_keywords(filename: str, top_n: int = 20, include_trends: bool = True) -> Dict[str, Any]:
"""Analyze the research hotspots and trends in PubMed result files according keywords.
Note: It is recommended to use JSON format files for better analysis results.
Args:
filename: File name of results. (.json format is recommended)
top_n: Return the top n hot keywords.
include_trends: Boolean value to determine whether to include trends analysis. Default is True.
"""
try:
filepath = os.path.join(results_dir, filename)
logger.info(f"Analyzing research keywords from file: {filepath}")
# Check if the file exists
if not os.path.exists(filepath):
logger.error(f"File not found: {filepath}")
# JSON first
json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
return {
"success": False,
"error": f"File not found: {filepath}",
"available_files": {
"recommended": json_files,
"alternative": txt_files
},
"note": "Always use JSON files first."
}
# Parse the result file
articles = analyzer.parse_results_file(filepath)
if not articles:
logger.warning(f"No articles found in file: {filepath}")
return {
"success": False,
"error": "No articles found in the file."
}
# Analyze keywords
analysis_results = analyzer.analyze_research_keywords(articles, top_n, include_trends)
return {
"success": True,
"file_analyzed": filename,
"article_count": len(articles),
"keyword_analysis": analysis_results
}
except Exception as e:
logger.error(f"Error in analyze_research_keywords: {str(e)}", exc_info=True)
return {
"success": False,
"error": str(e)
}
@pubmearch.tool()
async def analyze_publication_count(filename: str, months_per_period: int = 3) -> Dict[str, Any]:
"""Analyze publication counts over time from a PubMed results file.
Note: It is recommended to use JSON format files for better analysis results.
Args:
filename: File name of results. (.json format is recommended)
months_per_period: Number of months per analysis period
"""
try:
filepath = os.path.join(results_dir, filename)
logger.info(f"Analyzing publication counts from file: {filepath}")
# Check if the file exists
if not os.path.exists(filepath):
logger.error(f"File not found: {filepath}")
json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
return {
"success": False,
"error": f"File not found: {filepath}",
"available_files": {
"recommended": json_files,
"alternative": txt_files
},
"note": "Always use JSON files first."
}
# Parse the result file
articles = analyzer.parse_results_file(filepath)
if not articles:
logger.warning(f"No articles found in file: {filepath}")
return {
"success": False,
"error": "No articles found in the file."
}
# Analyze publication counts
pub_counts = analyzer.analyze_publication_count(articles, months_per_period)
return {
"success": True,
"file_analyzed": filename,
"article_count": len(articles),
"publication_counts": pub_counts
}
except Exception as e:
logger.error(f"Error in analyze_publication_count: {str(e)}", exc_info=True)
return {
"success": False,
"error": str(e)
}
@pubmearch.tool()
async def generate_comprehensive_analysis(
filename: str,
top_keywords: int = 20,
months_per_period: int = 3
) -> Dict[str, Any]:
"""Generate a comprehensive analysis of a PubMed results file.
Note: It is recommended to use JSON format files for better analysis results.
Args:
filename: File name of results. (.json format is recommended)
top_keywords: Number of top keywords to analyze
months_per_period: Number of months per analysis period
"""
try:
filepath = os.path.join(results_dir, filename)
logger.info(f"Generating comprehensive analysis from file: {filepath}")
# Check if the file exists
if not os.path.exists(filepath):
logger.error(f"File not found: {filepath}")
json_files = [f for f in os.listdir(results_dir) if f.endswith('.json')]
txt_files = [f for f in os.listdir(results_dir) if f.endswith('.txt')]
return {
"success": False,
"error": f"File not found: {filepath}",
"available_files": {
"recommended": json_files,
"alternative": txt_files
},
"note": "Always use JSON files first."
}
# Generate comprehensive analysis directly
results = analyzer.generate_comprehensive_analysis(
filepath,
top_keywords=top_keywords,
months_per_period=months_per_period
)
if "error" in results:
logger.error(f"Error in analysis: {results['error']}")
return {
"success": False,
"error": results["error"]
}
logger.info("Comprehensive analysis completed successfully")
return {
"success": True,
"analysis": results
}
except Exception as e:
logger.error(f"Error in generate_comprehensive_analysis: {str(e)}", exc_info=True)
return {
"success": False,
"error": str(e)
}
if __name__ == "__main__":
os.makedirs(results_dir, exist_ok=True)
pubmearch.run()
```
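A minimal client-side sketch of the workflow described in the server note (search first, then analyze the resulting JSON file), assuming the `mcp` Python SDK stdio client. The module invocation, email, query, and filename below are illustrative placeholders:
```python
import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Launch the server over stdio; the email and API key values are placeholders.
server = StdioServerParameters(
    command="python",
    args=["-m", "pubmearch.server"],
    env={"NCBI_USER_EMAIL": "you@example.com", "NCBI_USER_API_KEY": ""},
)

async def main():
    async with stdio_client(server) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            # Step 1: run a search; the tool response names the generated files.
            search = await session.call_tool(
                "search_pubmed",
                {"advanced_search": "(CRISPR[Title/Abstract])", "max_results": 100},
            )
            # Step 2: analyze the JSON file reported by the search result
            # (hypothetical filename shown here).
            report = await session.call_tool(
                "analyze_research_keywords",
                {"filename": "pubmed_results_20240101000000.json", "top_n": 10},
            )
            print(report)

asyncio.run(main())
```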
--------------------------------------------------------------------------------
/pubmearch/pubmed_searcher.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
PubMed Searcher Module
This module provides functionality for searching PubMed and retrieving article data.
"""
import os
import re
import time
import json
import logging
from datetime import datetime
from typing import List, Dict, Tuple, Optional, Any, Union
from Bio import Entrez
from pathlib import Path
# Configure logging
logger = logging.getLogger(__name__)
class PubMedSearcher:
"""Class to search PubMed and retrieve article data."""
def __init__(self, email: Optional[str] = None, results_dir: Optional[str] = None, api_key: Optional[str] = None):
"""
        Initialize the PubMed searcher (the email may come from NCBI_USER_EMAIL in .env).
Args:
email: Email address for Entrez. If None, use NCBI_USER_EMAIL from environment variables.
results_dir: Optional custom results directory path
api_key: API key for NCBI. If None, use NCBI_USER_API_KEY from environment variables.
"""
# use NCBI_USER_EMAIL from .env if email is not provided
self.email = email if email is not None else os.getenv('NCBI_USER_EMAIL')
self.api_key = api_key if api_key is not None else os.getenv('NCBI_USER_API_KEY')
if not self.email:
raise ValueError("Email is required. Either pass it directly or set NCBI_USER_EMAIL in .env")
# Set up Entrez
Entrez.email = self.email
Entrez.api_key = self.api_key
# Use provided results directory or create default
self.results_dir = Path(results_dir) if results_dir else Path(__file__).resolve().parent / "results"
os.makedirs(self.results_dir, exist_ok=True)
logger.info(f"Using results directory: {self.results_dir}")
def search(self,
advanced_search: str,
date_range: Optional[Tuple[str, str]] = None,
max_results: int = 1000) -> List[Dict[str, Any]]:
"""
Search PubMed using advanced search syntax.
Args:
advanced_search: PubMed advanced search query
date_range: Optional tuple of (start_date, end_date),
date format is always YYYY/MM/DD
max_results: Maximum number of results to retrieve
Returns:
List of article dictionaries
"""
search_term = advanced_search
# Add date range to query if provided
        # Note: the format of start_date and end_date is always YYYY/MM/DD
if date_range:
start_date, end_date = date_range
date_filter = ""
# start_date
if start_date:
date_filter += f" AND ('{start_date}'[Date - Publication]"
if end_date:
date_filter += f" : '{end_date}'[Date - Publication]"
date_filter += ")"
            # If only end_date is given, use 1900/01/01 as the start date for inclusion
elif end_date:
date_filter += f" AND ('1900/01/01'[Date - Publication] : '{end_date}'[Date - Publication])"
search_term += date_filter
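            # Example: date_range=("2023/01/01", "2023/12/31") appends
            #   AND ('2023/01/01'[Date - Publication] : '2023/12/31'[Date - Publication])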
try:
# Search PubMed
logger.info(f"Searching PubMed with query: {search_term}")
search_handle = Entrez.esearch(db="pubmed", term=search_term, retmax=max_results, usehistory="y")
search_results = Entrez.read(search_handle)
search_handle.close()
webenv = search_results["WebEnv"]
query_key = search_results["QueryKey"]
# Get the count of results
count = int(search_results["Count"])
logger.info(f"Found {count} results, retrieving up to {max_results}")
if count == 0:
logger.warning("No results found")
return []
# Initialize an empty list to store articles
articles = []
# Fetch results in batches to avoid timeouts
batch_size = 100
for start in range(0, min(count, max_results), batch_size):
end = min(count, start + batch_size, max_results)
logger.info(f"Retrieving records {start+1} to {end}")
try:
# Fetch the records
fetch_handle = Entrez.efetch(
db="pubmed",
retstart=start,
retmax=batch_size,
webenv=webenv,
query_key=query_key,
retmode="xml"
)
# Parse the records
records = Entrez.read(fetch_handle)["PubmedArticle"]
fetch_handle.close()
# Process each record
for record in records:
article = self._parse_pubmed_record(record)
articles.append(article)
# Sleep to avoid overloading the NCBI server
time.sleep(1)
except Exception as e:
logger.error(f"Error fetching batch {start+1} to {end}: {str(e)}")
continue
return articles
except Exception as e:
logger.error(f"Error searching PubMed: {str(e)}")
return []
def _parse_pubmed_record(self, record: Dict) -> Dict[str, Any]:
"""
Parse a PubMed record into a structured article dictionary.
Args:
record: PubMed record from Entrez.read
Returns:
Dictionary containing structured article data
"""
article_data = {}
# Get MedlineCitation and Article
medline_citation = record.get("MedlineCitation", {})
article = medline_citation.get("Article", {})
# Extract basic article information
article_data["title"] = article.get("ArticleTitle", "")
# Extract authors
authors = []
author_list = article.get("AuthorList", [])
for author in author_list:
if "LastName" in author and "ForeName" in author:
authors.append(f"{author['LastName']} {author['ForeName']}")
elif "LastName" in author and "Initials" in author:
authors.append(f"{author['LastName']} {author['Initials']}")
elif "LastName" in author:
authors.append(author["LastName"])
elif "CollectiveName" in author:
authors.append(author["CollectiveName"])
article_data["authors"] = authors
# Extract journal information
journal = article.get("Journal", {})
article_data["journal"] = journal.get("Title", "")
# Extract publication date
pub_date = {}
journal_issue = journal.get("JournalIssue", {})
if "PubDate" in journal_issue:
pub_date = journal_issue["PubDate"]
pub_date_str = ""
if "Year" in pub_date:
pub_date_str = pub_date["Year"]
if "Month" in pub_date:
pub_date_str += f" {pub_date['Month']}"
if "Day" in pub_date:
pub_date_str += f" {pub_date['Day']}"
article_data["publication_date"] = pub_date_str
# Extract abstract
abstract_text = ""
if "Abstract" in article and "AbstractText" in article["Abstract"]:
# Handle different abstract formats
abstract_parts = article["Abstract"]["AbstractText"]
if isinstance(abstract_parts, list):
for part in abstract_parts:
if isinstance(part, str):
abstract_text += part + " "
elif isinstance(part, dict) and "#text" in part:
label = part.get("Label", "")
text = part["#text"]
if label:
abstract_text += f"{label}: {text} "
else:
abstract_text += text + " "
else:
abstract_text = str(abstract_parts)
article_data["abstract"] = abstract_text.strip()
# Extract keywords
keywords = []
# MeSH headings
mesh_headings = medline_citation.get("MeshHeadingList", [])
for heading in mesh_headings:
if "DescriptorName" in heading:
descriptor = heading["DescriptorName"]
if isinstance(descriptor, dict) and "content" in descriptor:
keywords.append(descriptor["content"])
elif isinstance(descriptor, str):
keywords.append(descriptor)
# Keywords from KeywordList
keyword_lists = medline_citation.get("KeywordList", [])
for keyword_list in keyword_lists:
if isinstance(keyword_list, list):
for keyword in keyword_list:
if isinstance(keyword, str):
keywords.append(keyword)
elif isinstance(keyword, dict) and "content" in keyword:
keywords.append(keyword["content"])
article_data["keywords"] = keywords
# Extract PMID
pmid = medline_citation.get("PMID", "")
if isinstance(pmid, dict) and "content" in pmid:
article_data["pmid"] = pmid["content"]
else:
article_data["pmid"] = str(pmid)
        # Extract DOI by iterating carefully over the ArticleIdList
doi = ""
try:
pubmed_data = record.get("PubmedData")
if pubmed_data:
article_id_list = pubmed_data.get("ArticleIdList")
# Iterate through article_id_list if it exists and is iterable
if article_id_list:
try:
for id_element in article_id_list:
# Check if the element has attributes and the IdType is 'doi'
# Handles Bio.Entrez.Parser.StringElement and similar objects
if hasattr(id_element, 'attributes') and id_element.attributes.get('IdType') == 'doi':
doi = str(id_element).strip() # Get the string value
if doi: break # Found DOI, exit loop
# Fallback check for plain dictionary structure (less common)
elif isinstance(id_element, dict) and id_element.get('IdType') == 'doi':
doi = id_element.get('content', '').strip() or id_element.get('#text', '').strip()
if doi: break # Found DOI, exit loop
except TypeError:
# Handle cases where article_id_list might not be iterable (e.g., single element)
# Check if the single element itself is the DOI
if hasattr(article_id_list, 'attributes') and article_id_list.attributes.get('IdType') == 'doi':
doi = str(article_id_list).strip()
        except Exception as e:
            logger.warning(f"Error during DOI extraction for PMID {article_data.get('pmid', 'N/A')}: {e}")
            doi = ""  # Reset DOI on error
article_data["doi"] = doi
return article_data
def export_to_txt(self, articles: List[Dict[str, Any]], filename: Optional[str] = None) -> str:
"""
Export articles to a formatted text file.
Args:
articles: List of article dictionaries
filename: Optional output filename
Returns:
Path to the created file
"""
if not filename:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"pubmed_results_{timestamp}.txt"
filepath = os.path.join(self.results_dir, filename)
with open(filepath, 'w', encoding='utf-8') as f:
for i, article in enumerate(articles, 1):
f.write(f"Article {i}\n")
f.write("-" * 80 + "\n")
f.write(f"Title: {article.get('title', '')}\n")
f.write(f"Authors: {', '.join(article.get('authors', []))}\n")
f.write(f"Journal: {article.get('journal', '')}\n")
f.write(f"Publication Date: {article.get('publication_date', '')}\n")
f.write(f"Abstract:\n{article.get('abstract', '')}\n")
f.write(f"Keywords: {', '.join(article.get('keywords', []))}\n")
f.write(f"PMID: {article.get('pmid', '')}\n")
f.write(f"DOI: https://doi.org/{article.get('doi', '')}\n")
f.write("=" * 80 + "\n\n")
logger.info(f"Exported {len(articles)} articles to {filepath}")
return filepath
def export_to_json(self, articles: List[Dict[str, Any]], filename: Optional[str] = None) -> str:
"""
Export articles to JSON format file.
Args:
articles: List of article dictionaries
filename: Optional output filename
Returns:
Path to the created file
"""
if not filename:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"pubmed_results_{timestamp}.json"
filepath = os.path.join(self.results_dir, filename)
with open(filepath, 'w', encoding='utf-8') as f:
json.dump({
"metadata": {
"export_time": datetime.now().isoformat(),
"article_count": len(articles)
},
"articles": articles
}, f, ensure_ascii=False, indent=2)
logger.info(f"Exported {len(articles)} articles to {filepath}")
return filepath
```
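A minimal sketch of driving `PubMedSearcher` directly. The email, query, and date range are placeholders; NCBI requires a valid contact email (and an API key raises the rate limit):
```python
from pubmearch.pubmed_searcher import PubMedSearcher

# Placeholder credentials: supply a real contact email (and optionally an NCBI API key).
searcher = PubMedSearcher(email="you@example.com")

records = searcher.search(
    advanced_search="(machine learning[Title/Abstract]) AND (oncology[MeSH Terms])",
    date_range=("2023/01/01", "2023/12/31"),  # always YYYY/MM/DD
    max_results=200,
)

# Export to both formats; each method returns the path of the file it wrote.
json_path = searcher.export_to_json(records)
txt_path = searcher.export_to_txt(records)
print(f"Saved {len(records)} articles to {json_path} and {txt_path}")
```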