# Directory Structure

```
├── .env.example
├── .gitignore
├── mcp_server.py
├── multi_agents
│   ├── __init__.py
│   ├── agent.py
│   ├── agents
│   │   ├── __init__.py
│   │   ├── editor.py
│   │   ├── human.py
│   │   ├── orchestrator.py
│   │   ├── publisher.py
│   │   ├── researcher.py
│   │   ├── reviewer.py
│   │   ├── reviser.py
│   │   ├── utils
│   │   │   ├── __init__.py
│   │   │   ├── file_formats.py
│   │   │   ├── llms.py
│   │   │   ├── pdf_styles.css
│   │   │   ├── utils.py
│   │   │   └── views.py
│   │   └── writer.py
│   ├── langgraph.json
│   ├── main.py
│   ├── memory
│   │   ├── __init__.py
│   │   ├── draft.py
│   │   └── research.py
│   ├── package.json
│   ├── README.md
│   ├── requirements.txt
│   └── task.json
├── plan.md
├── README.md
└── utils
    └── enum.py
```

# Files

--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------

```
LANGSMITH_TRACING=true
LANGSMITH_ENDPOINT="https://api.smith.langchain.com"
LANGSMITH_API_KEY="yourkey"
LANGSMITH_PROJECT="your-project"
OPENAI_API_KEY=yourkey
TAVILY_API_KEY=yourkey
DOC_PATH=./my-docs

# NEXT_PUBLIC_GPTR_API_URL=http://0.0.0.0:8000  # Defaults to localhost:8000 if not set

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
#  Usually these files are written by a python script from a template
#  before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
#   For a library or package, you might want to ignore these files since the code is
#   intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
#   However, in case of collaboration, if having platform-specific dependencies or dependencies
#   having no cross-platform support, pipenv may install dependencies that don't work, or not
#   install all needed dependencies.
#Pipfile.lock

# UV
#   Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#uv.lock

# poetry
#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
#   This is especially recommended for binary packages to ensure reproducibility, and is more
#   commonly ignored for libraries.
#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
#   in version control.
#   https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
#  and can be added to the global gitignore or merged into this file.  For a more nuclear
#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

# PyPI configuration file
.pypirc

# Other
outputs/
.DS_Store
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Deep Research MCP

This repository provides a multi-agent research framework using Python and MCP (Model Context Protocol). The default entrypoint is `mcp_server.py`, which sets up a FastMCP server named **Deep Research** and exposes a tool named `deep_research`.

## Setup

1. **Clone the Repository:**
   ```bash
   git clone https://github.com/yourusername/deep-research-mcp.git
   cd deep-research-mcp
   ```

2. **Create/Populate Your `.env` File:**
   ```bash
   cp .env.example .env
   # Then edit the new .env file to fill in your secrets and environment variables
   # For example:
   # OPENAI_API_KEY=sk-123-yourkey
   # Additional environment variables can be placed here
   ```

3. **Install Dependencies:**
   ```bash
   pip install -r multi_agents/requirements.txt
   ```

4. **Configure Claude Desktop:** Edit your `claude_desktop_config.json` file to include the following:
    ```json
    {
        "mcpServers": {
            "deep-research-mcp": {
                "command": "path/to/your/python/interpreter",
                "args": [
                    "/path/to/this/project/deep-research-mcp/mcp_server.py"
                ]
            }
        }
    }
    ```

5. **Run the MCP Server:**
   ```bash
   python mcp_server.py
   ```
   This starts the FastMCP tool server locally. From here, any MCP-compatible client or the CLI can invoke the `deep_research` tool.
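
   As a rough illustration, an MCP client could also launch and call this server over stdio using the official `mcp` Python SDK. The snippet below is a hypothetical sketch (not part of this repository); the interpreter command and script path are placeholders:
   ```python
   # Hypothetical client-side sketch using the mcp Python SDK (not included in this repo).
   import asyncio

   from mcp import ClientSession, StdioServerParameters
   from mcp.client.stdio import stdio_client

   async def main():
       # Placeholder command/args: point these at your Python interpreter and mcp_server.py.
       server = StdioServerParameters(command="python", args=["mcp_server.py"])
       async with stdio_client(server) as (read, write):
           async with ClientSession(read, write) as session:
               await session.initialize()
               result = await session.call_tool(
                   "deep_research",
                   arguments={"query": "Is AI in a hype cycle?", "tone": "objective"},
               )
               print(result)

   asyncio.run(main())
   ```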

## Project Overview

- **multi_agents**  
  - **agents**: Contains the various AI agents (ResearchAgent, EditorAgent, etc.).  
  - **memory**: Typed dictionaries to store research and draft states.  
  - **main.py**: Core logic to load tasks and orchestrate agents.  
  - **README.md**: Additional instructions on usage, file output settings, etc.
- **mcp_server.py**: Main FastMCP server file (entrypoint).
- **utils**: Shared functions and enums used across the codebase.
- **.gitignore**, **requirements.txt**, etc.: Standard setup files.

Below is the structure of the `multi_agents/` directory in tree form for reference:

```
multi_agents/
│
├─ README.md
│   └─ (Documentation on file output vs. direct return)
│
├─ agents/
│   ├─ __init__.py
│   ├─ researcher.py
│   ├─ editor.py
│   ├─ writer.py
│   ├─ publisher.py
│   └─ ... (other agents)
│
├─ memory/
│   ├─ __init__.py
│   ├─ draft.py
│   └─ research.py
│
├─ main.py
├─ __init__.py
└─ requirements.txt
```
```

--------------------------------------------------------------------------------
/multi_agents/README.md:
--------------------------------------------------------------------------------

```markdown
# LangGraph x GPT Researcher
[LangGraph](https://python.langchain.com/docs/langgraph) is a library for building stateful, multi-actor applications with LLMs. 
This example uses LangGraph to automate the process of conducting in-depth research on any given topic.
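
For readers new to LangGraph, here is a minimal, self-contained sketch of how typed state and agent nodes are wired into a runnable graph. It is illustrative only; this project's real workflow is built in `multi_agents/agents/orchestrator.py`:

```python
from typing import TypedDict

from langgraph.graph import StateGraph, END


class State(TypedDict):
    query: str
    report: str


def research(state: State) -> dict:
    # Placeholder node: a real agent would call an LLM or GPT Researcher here.
    return {"report": f"Findings about: {state['query']}"}


workflow = StateGraph(State)
workflow.add_node("research", research)
workflow.set_entry_point("research")
workflow.add_edge("research", END)

graph = workflow.compile()
print(graph.invoke({"query": "Is AI in a hype cycle?", "report": ""}))
```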

## Use case
By using LangGraph, the depth and quality of the research process can be significantly improved by leveraging multiple agents with specialized skills.
Inspired by the recent [STORM](https://arxiv.org/abs/2402.14207) paper, this example showcases how a team of AI agents can work together to conduct research on a given topic, from planning to publication.

An average run generates a 5-6 page research report in multiple formats such as PDF, Docx, and Markdown.

Please note: the multi-agent flow uses the same model configuration as GPT Researcher, but only the SMART_LLM is used for the time being. Please refer to the [LLM config pages](https://docs.gptr.dev/docs/gpt-researcher/llms/llms) for details.

## The Multi Agent Team
The research team is made up of 8 agents:
- **Human** - The human in the loop that oversees the process and provides feedback to the agents.
- **Chief Editor** - Oversees the research process and manages the team. This is the "master" agent that coordinates the other agents using Langgraph.
- **Researcher** (gpt-researcher) - A specialized autonomous agent that conducts in-depth research on a given topic.
- **Editor** - Responsible for planning the research outline and structure.
- **Reviewer** - Validates the correctness of the research results given a set of criteria.
- **Revisor** - Revises the research results based on the feedback from the reviewer.
- **Writer** - Responsible for compiling and writing the final report.
- **Publisher** - Responsible for publishing the final report in various formats.

## How it works
Generally, the process is based on the following stages: 
1. Planning stage
2. Data collection and analysis
3. Review and revision
4. Writing and submission
5. Publication

### Architecture
<div align="center">
<img align="center" height="600" src="https://github.com/user-attachments/assets/ef561295-05f4-40a8-a57d-8178be687b18">
</div>
<br clear="all"/>

### Steps
More specifically (as seen in the architecture diagram), the process is as follows:
- Browser (gpt-researcher) - Browses the internet for initial research based on the given research task.
- Editor - Plans the report outline and structure based on the initial research.
- For each outline topic (in parallel):
  - Researcher (gpt-researcher) - Runs in-depth research on the subtopics and writes a draft.
  - Reviewer - Validates the correctness of the draft given a set of criteria and provides feedback.
  - Revisor - Revises the draft until it is satisfactory based on the reviewer feedback (see the sketch after this list).
- Writer - Compiles and writes the final report including an introduction, conclusion and references section from the given research findings.
- Publisher - Publishes the final report in multiple formats such as PDF, Docx, Markdown, etc.
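
The per-topic review loop described above can be expressed with LangGraph conditional edges. The following is a minimal sketch, illustrative only and not the project's actual graph; the real agents are implemented in `multi_agents/agents/`:

```python
from typing import Optional, TypedDict

from langgraph.graph import StateGraph, END


class DraftState(TypedDict):
    draft: str
    review: Optional[str]


def reviewer(state: DraftState) -> dict:
    # Placeholder: the real ReviewerAgent asks an LLM to check the draft against guidelines.
    return {"review": None if "revised" in state["draft"] else "Please add supporting sources."}


def revisor(state: DraftState) -> dict:
    # Placeholder: the real ReviserAgent rewrites the draft based on the review notes.
    return {"draft": state["draft"] + " (revised)"}


workflow = StateGraph(DraftState)
workflow.add_node("reviewer", reviewer)
workflow.add_node("revisor", revisor)
workflow.set_entry_point("reviewer")
# Accept when the reviewer returns no notes; otherwise loop back through the revisor.
workflow.add_conditional_edges(
    "reviewer",
    lambda state: "accept" if state["review"] is None else "revise",
    {"accept": END, "revise": "revisor"},
)
workflow.add_edge("revisor", "reviewer")

graph = workflow.compile()
print(graph.invoke({"draft": "Initial draft", "review": None}))
```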

## How to run
1. Install required packages:
    ```bash
    pip install -r requirements.txt
    ```
2. Update env variables; see the [GPT-Researcher docs](https://docs.gptr.dev/docs/gpt-researcher/llms/llms) for more details.

3. Run the application:
    ```bash
    python main.py
    ```

## Usage
To change the research query and customize the report, edit the `task.json` file in the main directory.
#### Task.json contains the following fields:
- `query` - The research query or task.
- `model` - The OpenAI LLM to use for the agents.
- `max_sections` - The maximum number of sections in the report. Each section is a subtopic of the research query.
- `include_human_feedback` - If true, the user can provide feedback to the agents. If false, the agents will work autonomously.
- `publish_formats` - The formats to publish the report in. The reports will be written in the `output` directory.
- `source` - The location from which to conduct the research. Options: `web` or `local`. For local, please add `DOC_PATH` env var.
- `follow_guidelines` - If true, the research report will follow the guidelines below. It will take longer to complete. If false, the report will be generated faster but may not follow the guidelines.
- `guidelines` - A list of guidelines that the report must follow.
- `verbose` - If true, the application will print detailed logs to the console.

#### For example:
```json
{
  "query": "Is AI in a hype cycle?",
  "model": "gpt-4o",
  "max_sections": 3, 
  "publish_formats": { 
    "markdown": true,
    "pdf": true,
    "docx": true
  },
  "include_human_feedback": false,
  "source": "web",
  "follow_guidelines": true,
  "guidelines": [
    "The report MUST fully answer the original question",
    "The report MUST be written in apa format",
    "The report MUST be written in english"
  ],
  "verbose": true
}
```

## To Deploy

```shell
pip install langgraph-cli
langgraph up
```

From there, see documentation [here](https://github.com/langchain-ai/langgraph-example) on how to use the streaming and async endpoints, as well as the playground.


# Multi Agents Research Tool

## Configuration
- `write_reports_to_files` (default: False): Controls whether research reports are written to the filesystem
  - When False: Results are only returned in memory/via API
  - When True: Results are also written as files (PDF, DOCX, MD) to the output directory

## Usage Examples

### In-Memory Only (Default)
```python
result = await run_research_task(query="What is quantum computing?")
print(result)  # Results returned directly, no files written
```

### With File Output
```python
result = await run_research_task(
    query="What is quantum computing?",
    write_reports_to_files=True
)  # Results returned AND written to files
```

### Via MCP
The deep_research tool supports the same options:
```json
{
  "query": "What is quantum computing?",
  "tone": "objective",
  "write_reports_to_files": false
}
```
```

File writing is completely optional: by default, the MCP server works without any file system access.

# Multi Agents Research Tool

## Features
- Configurable file output (disabled by default)
- Direct chat output support
- MCP integration

## Configuration
- `write_to_files`: Boolean flag to control file output (default: False)
  - When True: Generates PDF, DOCX, and Markdown files in outputs directory
  - When False: Returns results directly without file system access

## Usage
```python
# Direct output (default)
result = await run_research_task(query="your query")

# With file output
result = await run_research_task(query="your query", write_to_files=True)
```

## MCP Integration
The deep_research tool returns results directly in chat without file system access.
```

Because file writing is disabled by default, the MCP integration never attempts to write files unless file output is explicitly enabled.

```

--------------------------------------------------------------------------------
/multi_agents/agents/utils/__init__.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/multi_agents/requirements.txt:
--------------------------------------------------------------------------------

```
langgraph
gpt_researcher
langgraph-cli
python-dotenv
weasyprint
json5
loguru

```

--------------------------------------------------------------------------------
/multi_agents/memory/__init__.py:
--------------------------------------------------------------------------------

```python
from .draft import DraftState
from .research import ResearchState

__all__ = [
    "DraftState",
    "ResearchState"
]
```

--------------------------------------------------------------------------------
/multi_agents/langgraph.json:
--------------------------------------------------------------------------------

```json
{
  "python_version": "3.11",
  "dependencies": [
    "."
  ],
  "graphs": {
    "agent": "./agent.py:graph"
  },
  "env": ".env"
}
```

--------------------------------------------------------------------------------
/multi_agents/memory/draft.py:
--------------------------------------------------------------------------------

```python
from typing import TypedDict, List, Annotated
import operator


class DraftState(TypedDict):
    task: dict
    topic: str
    draft: dict
    review: str
    revision_notes: str
```

--------------------------------------------------------------------------------
/multi_agents/package.json:
--------------------------------------------------------------------------------

```json
{
  "name": "simple_js_test",
  "version": "1.0.0",
  "description": "",
  "main": "server.js",
  "type": "module",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "@langchain/langgraph-sdk": "^0.0.1-rc.13"
  }
}

```

--------------------------------------------------------------------------------
/multi_agents/memory/research.py:
--------------------------------------------------------------------------------

```python
from typing import TypedDict, List, Annotated
import operator


class ResearchState(TypedDict):
    task: dict
    initial_research: str
    sections: List[str]
    research_data: List[dict]
    human_feedback: str
    # Report layout
    title: str
    headers: dict
    date: str
    table_of_contents: str
    introduction: str
    conclusion: str
    sources: List[str]
    report: str



```

--------------------------------------------------------------------------------
/multi_agents/agents/utils/views.py:
--------------------------------------------------------------------------------

```python
from colorama import Fore, Style
from enum import Enum


class AgentColor(Enum):
    RESEARCHER = Fore.LIGHTBLUE_EX
    EDITOR = Fore.YELLOW
    WRITER = Fore.LIGHTGREEN_EX
    PUBLISHER = Fore.MAGENTA
    REVIEWER = Fore.CYAN
    REVISOR = Fore.LIGHTWHITE_EX
    MASTER = Fore.LIGHTYELLOW_EX


def print_agent_output(output:str, agent: str="RESEARCHER"):
    print(f"{AgentColor[agent].value}{agent}: {output}{Style.RESET_ALL}")
```

--------------------------------------------------------------------------------
/multi_agents/__init__.py:
--------------------------------------------------------------------------------

```python
# multi_agents/__init__.py

from .agents import (
    ResearchAgent,
    WriterAgent,
    PublisherAgent,
    ReviserAgent,
    ReviewerAgent,
    EditorAgent,
    ChiefEditorAgent
)
from .memory import (
    DraftState,
    ResearchState
)

__all__ = [
    "ResearchAgent",
    "WriterAgent",
    "PublisherAgent",
    "ReviserAgent",
    "ReviewerAgent",
    "EditorAgent",
    "ChiefEditorAgent",
    "DraftState",
    "ResearchState"
]
```

--------------------------------------------------------------------------------
/multi_agents/task.json:
--------------------------------------------------------------------------------

```json
{
  "query": "Is AI in a hype cycle?",
  "max_sections": 3,
  "publish_formats": {
    "markdown": true,
    "pdf": true,
    "docx": true
  },
  "include_human_feedback": false,
  "follow_guidelines": false,
  "model": "gpt-4o",
  "guidelines": [
    "The report MUST be written in APA format",
    "Each sub section MUST include supporting sources using hyperlinks. If none exist, erase the sub section or rewrite it to be a part of the previous section",
    "The report MUST be written in spanish"
  ],
  "verbose": true
}
```

--------------------------------------------------------------------------------
/multi_agents/agents/__init__.py:
--------------------------------------------------------------------------------

```python
from .researcher import ResearchAgent
from .writer import WriterAgent
from .publisher import PublisherAgent
from .reviser import ReviserAgent
from .reviewer import ReviewerAgent
from .editor import EditorAgent
from .human import HumanAgent

# Below import should remain last since it imports all of the above
from .orchestrator import ChiefEditorAgent

__all__ = [
    "ChiefEditorAgent",
    "ResearchAgent",
    "WriterAgent",
    "EditorAgent",
    "PublisherAgent",
    "ReviserAgent",
    "ReviewerAgent",
    "HumanAgent"
]

```

--------------------------------------------------------------------------------
/multi_agents/agent.py:
--------------------------------------------------------------------------------

```python
from multi_agents.agents import ChiefEditorAgent

chief_editor = ChiefEditorAgent({
  "query": "Is AI in a hype cycle?",
  "max_sections": 3,
  "follow_guidelines": False,
  "model": "gpt-4o",
  "guidelines": [
    "The report MUST be written in APA format",
    "Each sub section MUST include supporting sources using hyperlinks. If none exist, erase the sub section or rewrite it to be a part of the previous section",
    "The report MUST be written in spanish"
  ],
  "verbose": False
}, websocket=None, stream_output=None)
graph = chief_editor.init_research_team()
graph = graph.compile()
```

--------------------------------------------------------------------------------
/multi_agents/agents/utils/utils.py:
--------------------------------------------------------------------------------

```python
import re

def sanitize_filename(filename: str) -> str:
    """
    Sanitize a given filename by replacing characters that are invalid 
    in Windows file paths with an underscore ('_').

    This function ensures that the filename is compatible with all 
    operating systems by removing or replacing characters that are 
    not allowed in Windows file paths. Specifically, it replaces 
    the following characters: < > : " / \\ | ? *

    Parameters:
    filename (str): The original filename to be sanitized.

    Returns:
    str: The sanitized filename with invalid characters replaced by an underscore.
    
    Examples:
    >>> sanitize_filename('invalid:file/name*example?.txt')
    'invalid_file_name_example_.txt'
    
    >>> sanitize_filename('valid_filename.txt')
    'valid_filename.txt'
    """
    return re.sub(r'[<>:"/\\|?*]', '_', filename)

```

--------------------------------------------------------------------------------
/multi_agents/agents/utils/pdf_styles.css:
--------------------------------------------------------------------------------

```css
body {
    font-family: 'Libre Baskerville', serif;
    font-size: 12pt; /* standard size for academic papers */
    line-height: 1.6; /* for readability */
    color: #333; /* softer on the eyes than black */
    background-color: #fff; /* white background */
    margin: 0;
    padding: 0;
}

h1, h2, h3, h4, h5, h6 {
    font-family: 'Libre Baskerville', serif;
    color: #000; /* darker than the body text */
    margin-top: 1em; /* space above headers */
}

h1 {
    font-size: 2em; /* make h1 twice the size of the body text */
}

h2 {
    font-size: 1.5em;
}

/* Add some space between paragraphs */
p {
    margin-bottom: 1em;
}

/* Style for blockquotes, often used in academic papers */
blockquote {
    font-style: italic;
    margin: 1em 0;
    padding: 1em;
    background-color: #f9f9f9; /* a light grey background */
}

/* You might want to style tables, figures, etc. too */
table {
    border-collapse: collapse;
    width: 100%;
}

table, th, td {
    border: 1px solid #ddd;
    text-align: left;
    padding: 8px;
}

th {
    background-color: #f2f2f2;
    color: black;
}
```

--------------------------------------------------------------------------------
/multi_agents/agents/utils/llms.py:
--------------------------------------------------------------------------------

```python
import json5 as json
import json_repair
from langchain_community.adapters.openai import convert_openai_messages

from gpt_researcher.config.config import Config
from gpt_researcher.utils.llm import create_chat_completion

from loguru import logger


async def call_model(
    prompt: list,
    model: str,
    response_format: str = None,
):

    optional_params = {}
    if response_format == "json":
        optional_params = {"response_format": {"type": "json_object"}}

    cfg = Config()
    lc_messages = convert_openai_messages(prompt)

    try:
        response = await create_chat_completion(
            model=model,
            messages=lc_messages,
            temperature=0,
            llm_provider=cfg.smart_llm_provider,
            llm_kwargs=cfg.llm_kwargs,
            # cost_callback=cost_callback,
        )

        if response_format == "json":
            try:
                cleaned_json_string = response.strip("```json\n")
                return json.loads(cleaned_json_string)
            except Exception as e:
                print("⚠️ Error in reading JSON, attempting to repair JSON")
                logger.error(
                    f"Error in reading JSON, attempting to repair reponse: {response}"
                )
                return json_repair.loads(response)
        else:
            return response

    except Exception as e:
        print("⚠️ Error in calling model")
        logger.error(f"Error in calling model: {e}")

```

--------------------------------------------------------------------------------
/multi_agents/main.py:
--------------------------------------------------------------------------------

```python
from dotenv import load_dotenv
import sys
import os
import uuid
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from multi_agents.agents import ChiefEditorAgent
import asyncio
import json
from gpt_researcher.utils.enum import Tone

# Run with LangSmith if API key is set
if os.environ.get("LANGCHAIN_API_KEY"):
    os.environ["LANGCHAIN_TRACING_V2"] = "true"
load_dotenv()

def open_task():
    # Get the directory of the current script
    current_dir = os.path.dirname(os.path.abspath(__file__))
    # Construct the absolute path to task.json
    task_json_path = os.path.join(current_dir, 'task.json')
    
    with open(task_json_path, 'r') as f:
        task = json.load(f)

    if not task:
        raise Exception("No task found. Please ensure a valid task.json file is present in the multi_agents directory and contains the necessary task information.")

    return task

async def run_research_task(query, websocket=None, stream_output=None, tone=Tone.Objective, headers=None):
    task = open_task()
    task["query"] = query

    chief_editor = ChiefEditorAgent(task, websocket, stream_output, tone, headers)
    research_report = await chief_editor.run_research_task()

    if websocket and stream_output:
        await stream_output("logs", "research_report", research_report, websocket)

    return research_report

async def main():
    task = open_task()

    chief_editor = ChiefEditorAgent(task)
    research_report = await chief_editor.run_research_task(task_id=uuid.uuid4())

    return research_report

if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------------------------------------------------------
/mcp_server.py:
--------------------------------------------------------------------------------

```python
from mcp.server.fastmcp import FastMCP, Context
from typing import Dict, Any, Optional
from gpt_researcher.utils.enum import Tone
import asyncio
import os
from dotenv import load_dotenv
from multi_agents.main import run_research_task

# Load environment variables
load_dotenv()

# Create an MCP server named "Deep Research"
mcp = FastMCP("Deep Research")

@mcp.tool()
async def deep_research(
    query: str, 
    tone: str = "objective",
    ctx: Context = None
) -> Dict[Any, Any]:
    """
    Perform deep research on a given query using multiple AI agents.
    
    Args:
        query: The research question or topic to investigate
        tone: Research tone; should match a Tone enum name, e.g. objective, formal, analytical, critical, or optimistic
        ctx: MCP context for progress reporting
        
    Returns:
        A dictionary containing the research results and analysis
    """
    try:
        # Convert tone string to enum
        tone_enum = getattr(Tone, tone.capitalize(), Tone.Objective)
        
        # Define a stream output handler that reports progress via MCP
        async def stream_output(type: str, key: str, value: Any, _):
            if ctx:
                if type == "logs":
                    ctx.info(f"stream_output logs:{key}: {value}")
                elif type == "progress":
                    await ctx.report_progress(value, 100)

        # Run the research task
        research_report = await run_research_task(
            query=query,
            websocket=None,  # We're not using websockets in MCP
            stream_output=stream_output,
            tone=tone_enum
        )

        return {
            "status": "success",
            "query": query,
            "tone": tone,
            "report": research_report
        }

    except Exception as e:
        return {
            "status": "error",
            "error": str(e)
        }

if __name__ == "__main__":
    # Run the server
    mcp.run() 
```

--------------------------------------------------------------------------------
/utils/enum.py:
--------------------------------------------------------------------------------

```python
from enum import Enum

class ReportType(Enum):
    ResearchReport = "research_report"
    ResourceReport = "resource_report"
    OutlineReport = "outline_report"
    CustomReport = "custom_report"
    DetailedReport = "detailed_report"
    SubtopicReport = "subtopic_report"


class ReportSource(Enum):
    Web = "web"
    Local = "local"
    Azure = "azure"
    LangChainDocuments = "langchain_documents"
    LangChainVectorStore = "langchain_vectorstore"
    Static = "static"
    Hybrid = "hybrid"


class Tone(Enum):
    Objective = "Objective (impartial and unbiased presentation of facts and findings)"
    Formal = "Formal (adheres to academic standards with sophisticated language and structure)"
    Analytical = (
        "Analytical (critical evaluation and detailed examination of data and theories)"
    )
    Persuasive = (
        "Persuasive (convincing the audience of a particular viewpoint or argument)"
    )
    Informative = (
        "Informative (providing clear and comprehensive information on a topic)"
    )
    Explanatory = "Explanatory (clarifying complex concepts and processes)"
    Descriptive = (
        "Descriptive (detailed depiction of phenomena, experiments, or case studies)"
    )
    Critical = "Critical (judging the validity and relevance of the research and its conclusions)"
    Comparative = "Comparative (juxtaposing different theories, data, or methods to highlight differences and similarities)"
    Speculative = "Speculative (exploring hypotheses and potential implications or future research directions)"
    Reflective = "Reflective (considering the research process and personal insights or experiences)"
    Narrative = (
        "Narrative (telling a story to illustrate research findings or methodologies)"
    )
    Humorous = "Humorous (light-hearted and engaging, usually to make the content more relatable)"
    Optimistic = "Optimistic (highlighting positive findings and potential benefits)"
    Pessimistic = (
        "Pessimistic (focusing on limitations, challenges, or negative outcomes)"
    )

```

--------------------------------------------------------------------------------
/multi_agents/agents/human.py:
--------------------------------------------------------------------------------

```python
import json


class HumanAgent:
    def __init__(self, websocket=None, stream_output=None, headers=None):
        self.websocket = websocket
        self.stream_output = stream_output
        self.headers = headers or {}

    async def review_plan(self, research_state: dict):
        print(f"HumanAgent websocket: {self.websocket}")
        print(f"HumanAgent stream_output: {self.stream_output}")
        task = research_state.get("task")
        layout = research_state.get("sections")

        user_feedback = None

        if task.get("include_human_feedback"):
            # Stream response to the user if a websocket is provided (such as from web app)
            if self.websocket and self.stream_output:
                try:
                    await self.stream_output(
                        "human_feedback",
                        "request",
                        f"Any feedback on this plan of topics to research? {layout}? If not, please reply with 'no'.",
                        self.websocket,
                    )
                    # because websocket is wrapped inside a CustomLogsHandler in websocket_manager
                    response = await self.websocket.websocket.receive_text()
                    print(f"Received response: {response}", flush=True)
                    response_data = json.loads(response)
                    if response_data.get("type") == "human_feedback":
                        user_feedback = response_data.get("content")
                    else:
                        print(
                            f"Unexpected response type: {response_data.get('type')}",
                            flush=True,
                        )
                except Exception as e:
                    print(f"Error receiving human feedback: {e}", flush=True)
            # Otherwise, prompt the user for feedback in the console
            else:
                user_feedback = input(
                    f"Any feedback on this plan? {layout}? If not, please reply with 'no'.\n>> "
                )

        if user_feedback and "no" in user_feedback.strip().lower():
            user_feedback = None

        print(f"User feedback before return: {user_feedback}")

        return {"human_feedback": user_feedback}

```

--------------------------------------------------------------------------------
/multi_agents/agents/publisher.py:
--------------------------------------------------------------------------------

```python
from .utils.file_formats import \
    write_md_to_pdf, \
    write_md_to_word, \
    write_text_to_md

from .utils.views import print_agent_output


class PublisherAgent:
    def __init__(self, output_dir: str, websocket=None, stream_output=None, headers=None):
        self.websocket = websocket
        self.stream_output = stream_output
        self.output_dir = output_dir
        self.headers = headers or {}
        
    async def publish_research_report(self, research_state: dict, publish_formats: dict):
        layout = self.generate_layout(research_state)
        if self.output_dir:
            await self.write_report_by_formats(layout, publish_formats)

        return layout

    def generate_layout(self, research_state: dict):
        sections = '\n\n'.join(f"{value}"
                                 for subheader in research_state.get("research_data")
                                 for key, value in subheader.items())
        references = '\n'.join(f"{reference}" for reference in research_state.get("sources"))
        headers = research_state.get("headers")
        layout = f"""# {headers.get('title')}
#### {headers.get("date")}: {research_state.get('date')}

## {headers.get("introduction")}
{research_state.get('introduction')}

## {headers.get("table_of_contents")}
{research_state.get('table_of_contents')}

{sections}

## {headers.get("conclusion")}
{research_state.get('conclusion')}

## {headers.get("references")}
{references}
"""
        return layout

    async def write_report_by_formats(self, layout:str, publish_formats: dict):
        if publish_formats.get("pdf"):
            await write_md_to_pdf(layout, self.output_dir)
        if publish_formats.get("docx"):
            await write_md_to_word(layout, self.output_dir)
        if publish_formats.get("markdown"):
            await write_text_to_md(layout, self.output_dir)

    async def run(self, research_state: dict):
        task = research_state.get("task")
        publish_formats = task.get("publish_formats")
        if self.websocket and self.stream_output:
            await self.stream_output("logs", "publishing", f"Publishing final research report based on retrieved data...", self.websocket)
        else:
            print_agent_output(output="Publishing final research report based on retrieved data...", agent="PUBLISHER")
        final_research_report = await self.publish_research_report(research_state, publish_formats)
        return {"report": final_research_report}

```

--------------------------------------------------------------------------------
/multi_agents/agents/reviser.py:
--------------------------------------------------------------------------------

```python
from .utils.views import print_agent_output
from .utils.llms import call_model
import json

sample_revision_notes = """
{
  "draft": { 
    draft title: The revised draft that you are submitting for review 
  },
  "revision_notes": Your message to the reviewer about the changes you made to the draft based on their feedback
}
"""


class ReviserAgent:
    def __init__(self, websocket=None, stream_output=None, headers=None):
        self.websocket = websocket
        self.stream_output = stream_output
        self.headers = headers or {}

    async def revise_draft(self, draft_state: dict):
        """
        Review a draft article
        :param draft_state:
        :return:
        """
        review = draft_state.get("review")
        task = draft_state.get("task")
        draft_report = draft_state.get("draft")
        prompt = [
            {
                "role": "system",
                "content": "You are an expert writer. Your goal is to revise drafts based on reviewer notes.",
            },
            {
                "role": "user",
                "content": f"""Draft:\n{draft_report}" + "Reviewer's notes:\n{review}\n\n
You have been tasked by your reviewer with revising the following draft, which was written by a non-expert.
If you decide to follow the reviewer's notes, please write a new draft and make sure to address all of the points they raised.
Please keep all other aspects of the draft the same.
You MUST return nothing but a JSON in the following format:
{sample_revision_notes}
""",
            },
        ]

        response = await call_model(
            prompt,
            model=task.get("model"),
            response_format="json",
        )
        return response

    async def run(self, draft_state: dict):
        print_agent_output(f"Rewriting draft based on feedback...", agent="REVISOR")
        revision = await self.revise_draft(draft_state)

        if draft_state.get("task").get("verbose"):
            if self.websocket and self.stream_output:
                await self.stream_output(
                    "logs",
                    "revision_notes",
                    f"Revision notes: {revision.get('revision_notes')}",
                    self.websocket,
                )
            else:
                print_agent_output(
                    f"Revision notes: {revision.get('revision_notes')}", agent="REVISOR"
                )

        return {
            "draft": revision.get("draft"),
            "revision_notes": revision.get("revision_notes"),
        }

```

--------------------------------------------------------------------------------
/multi_agents/agents/utils/file_formats.py:
--------------------------------------------------------------------------------

```python
import aiofiles
import urllib.parse
import uuid
import mistune
import os

async def write_to_file(filename: str, text: str) -> None:
    """Asynchronously write text to a file in UTF-8 encoding.

    Args:
        filename (str): The filename to write to.
        text (str): The text to write.
    """
    # Convert text to UTF-8, replacing any problematic characters
    text_utf8 = text.encode('utf-8', errors='replace').decode('utf-8')

    async with aiofiles.open(filename, "w", encoding='utf-8') as file:
        await file.write(text_utf8)


async def write_text_to_md(text: str, path: str) -> str:
    """Writes text to a Markdown file and returns the file path.

    Args:
        text (str): Text to write to the Markdown file.

    Returns:
        str: The file path of the generated Markdown file.
    """
    task = uuid.uuid4().hex
    file_path = f"{path}/{task}.md"
    await write_to_file(file_path, text)
    print(f"Report written to {file_path}")
    return file_path


async def write_md_to_pdf(text: str, path: str) -> str:
    """Converts Markdown text to a PDF file and returns the file path.

    Args:
        text (str): Markdown text to convert.

    Returns:
        str: The encoded file path of the generated PDF.
    """
    task = uuid.uuid4().hex
    file_path = f"{path}/{task}.pdf"

    try:
        # Get the directory of the current file
        current_dir = os.path.dirname(os.path.abspath(__file__))
        css_path = os.path.join(current_dir, "pdf_styles.css")
        
        # Moved imports to inner function to avoid known import errors with gobject-2.0
        from md2pdf.core import md2pdf
        md2pdf(file_path,
               md_content=text,
               css_file_path=css_path,
               base_url=None)
        print(f"Report written to {file_path}")
    except Exception as e:
        print(f"Error in converting Markdown to PDF: {e}")
        return ""

    encoded_file_path = urllib.parse.quote(file_path)
    return encoded_file_path


async def write_md_to_word(text: str, path: str) -> str:
    """Converts Markdown text to a DOCX file and returns the file path.

    Args:
        text (str): Markdown text to convert.

    Returns:
        str: The encoded file path of the generated DOCX.
    """
    task = uuid.uuid4().hex
    file_path = f"{path}/{task}.docx"

    try:
        from htmldocx import HtmlToDocx
        from docx import Document
        # Convert report markdown to HTML
        html = mistune.html(text)
        # Create a document object
        doc = Document()
        # Convert the html generated from the report to document format
        HtmlToDocx().add_html_to_document(html, doc)

        # Saving the docx document to file_path
        doc.save(file_path)

        print(f"Report written to {file_path}")

        encoded_file_path = urllib.parse.quote(file_path)
        return encoded_file_path

    except Exception as e:
        print(f"Error in converting Markdown to DOCX: {e}")
        return ""

```

--------------------------------------------------------------------------------
/multi_agents/agents/reviewer.py:
--------------------------------------------------------------------------------

```python
from .utils.views import print_agent_output
from .utils.llms import call_model

TEMPLATE = """You are an expert research article reviewer. \
Your goal is to review research drafts and provide feedback to the reviser only based on specific guidelines. \
"""


class ReviewerAgent:
    def __init__(self, websocket=None, stream_output=None, headers=None):
        self.websocket = websocket
        self.stream_output = stream_output
        self.headers = headers or {}

    async def review_draft(self, draft_state: dict):
        """
        Review a draft article
        :param draft_state:
        :return:
        """
        task = draft_state.get("task")
        guidelines = "- ".join(guideline for guideline in task.get("guidelines"))
        revision_notes = draft_state.get("revision_notes")

        revise_prompt = f"""The reviser has already revised the draft based on your previous review notes with the following feedback:
{revision_notes}\n
Please provide additional feedback ONLY if critical since the reviser has already made changes based on your previous feedback.
If you think the article is sufficient or that non critical revisions are required, please aim to return None.
"""

        review_prompt = f"""You have been tasked with reviewing the draft which was written by a non-expert based on specific guidelines.
Please accept the draft if it is good enough to publish, or send it for revision, along with your notes to guide the revision.
If not all of the guideline criteria are met, you should send appropriate revision notes.
If the draft meets all the guidelines, please return None.
{revise_prompt if revision_notes else ""}

Guidelines: {guidelines}\nDraft: {draft_state.get("draft")}\n
"""
        prompt = [
            {"role": "system", "content": TEMPLATE},
            {"role": "user", "content": review_prompt},
        ]

        response = await call_model(prompt, model=task.get("model"))

        if task.get("verbose"):
            if self.websocket and self.stream_output:
                await self.stream_output(
                    "logs",
                    "review_feedback",
                    f"Review feedback is: {response}...",
                    self.websocket,
                )
            else:
                print_agent_output(
                    f"Review feedback is: {response}...", agent="REVIEWER"
                )

        if "None" in response:
            return None
        return response

    async def run(self, draft_state: dict):
        task = draft_state.get("task")
        guidelines = task.get("guidelines")
        to_follow_guidelines = task.get("follow_guidelines")
        review = None
        if to_follow_guidelines:
            print_agent_output(f"Reviewing draft...", agent="REVIEWER")

            if task.get("verbose"):
                print_agent_output(
                    f"Following guidelines {guidelines}...", agent="REVIEWER"
                )

            review = await self.review_draft(draft_state)
        else:
            print_agent_output(f"Ignoring guidelines...", agent="REVIEWER")
        return {"review": review}

```

--------------------------------------------------------------------------------
/multi_agents/agents/researcher.py:
--------------------------------------------------------------------------------

```python
from gpt_researcher import GPTResearcher
from colorama import Fore, Style
from .utils.views import print_agent_output


class ResearchAgent:
    def __init__(self, websocket=None, stream_output=None, tone=None, headers=None):
        self.websocket = websocket
        self.stream_output = stream_output
        self.headers = headers or {}
        self.tone = tone

    async def research(self, query: str, research_report: str = "research_report",
                       parent_query: str = "", verbose=True, source="web", tone=None, headers=None):
        # Initialize the researcher
        researcher = GPTResearcher(query=query, report_type=research_report, parent_query=parent_query,
                                   verbose=verbose, report_source=source, tone=tone, websocket=self.websocket, headers=self.headers)
        # Conduct research on the given query
        await researcher.conduct_research()
        # Write the report
        report = await researcher.write_report()

        return report

    async def run_subtopic_research(self, parent_query: str, subtopic: str, verbose: bool = True, source="web", headers=None):
        try:
            report = await self.research(parent_query=parent_query, query=subtopic,
                                         research_report="subtopic_report", verbose=verbose, source=source, tone=self.tone, headers=None)
        except Exception as e:
            print(f"{Fore.RED}Error in researching topic {subtopic}: {e}{Style.RESET_ALL}")
            report = None
        return {subtopic: report}

    async def run_initial_research(self, research_state: dict):
        task = research_state.get("task")
        query = task.get("query")
        source = task.get("source", "web")

        if self.websocket and self.stream_output:
            await self.stream_output("logs", "initial_research", f"Running initial research on the following query: {query}", self.websocket)
        else:
            print_agent_output(f"Running initial research on the following query: {query}", agent="RESEARCHER")
        return {"task": task, "initial_research": await self.research(query=query, verbose=task.get("verbose"),
                                                                      source=source, tone=self.tone, headers=self.headers)}

    async def run_depth_research(self, draft_state: dict):
        task = draft_state.get("task")
        topic = draft_state.get("topic")
        parent_query = task.get("query")
        source = task.get("source", "web")
        verbose = task.get("verbose")
        if self.websocket and self.stream_output:
            await self.stream_output("logs", "depth_research", f"Running in depth research on the following report topic: {topic}", self.websocket)
        else:
            print_agent_output(f"Running in depth research on the following report topic: {topic}", agent="RESEARCHER")
        research_draft = await self.run_subtopic_research(parent_query=parent_query, subtopic=topic,
                                                          verbose=verbose, source=source, headers=self.headers)
        return {"draft": research_draft}
```

--------------------------------------------------------------------------------
/multi_agents/agents/orchestrator.py:
--------------------------------------------------------------------------------

```python
import os
import time
import datetime
from langgraph.graph import StateGraph, END
# from langgraph.checkpoint.memory import MemorySaver
from .utils.views import print_agent_output
from ..memory.research import ResearchState
from .utils.utils import sanitize_filename

# Import agent classes
from . import \
    WriterAgent, \
    EditorAgent, \
    PublisherAgent, \
    ResearchAgent, \
    HumanAgent


class ChiefEditorAgent:
    """Agent responsible for managing and coordinating editing tasks."""

    def __init__(self, task: dict, websocket=None, stream_output=None, tone=None, headers=None, write_to_files: bool = False):
        self.task = task
        self.websocket = websocket
        self.stream_output = stream_output
        self.headers = headers or {}
        self.tone = tone
        self.task_id = self._generate_task_id()
        self.output_dir = self._create_output_directory() if write_to_files else None

    def _generate_task_id(self):
        # Currently time based, but can be any unique identifier
        return int(time.time())

    def _create_output_directory(self):
        output_dir = "./outputs/" + \
            sanitize_filename(
                f"run_{self.task_id}_{self.task.get('query')[0:40]}")

        os.makedirs(output_dir, exist_ok=True)
        return output_dir

    def _initialize_agents(self):
        return {
            "writer": WriterAgent(self.websocket, self.stream_output, self.headers),
            "editor": EditorAgent(self.websocket, self.stream_output, self.headers),
            "research": ResearchAgent(self.websocket, self.stream_output, self.tone, self.headers),
            "publisher": PublisherAgent(self.output_dir, self.websocket, self.stream_output, self.headers),
            "human": HumanAgent(self.websocket, self.stream_output, self.headers)
        }

    def _create_workflow(self, agents):
        workflow = StateGraph(ResearchState)

        # Add nodes for each agent
        workflow.add_node("browser", agents["research"].run_initial_research)
        workflow.add_node("planner", agents["editor"].plan_research)
        workflow.add_node("researcher", agents["editor"].run_parallel_research)
        workflow.add_node("writer", agents["writer"].run)
        workflow.add_node("publisher", agents["publisher"].run)
        workflow.add_node("human", agents["human"].review_plan)

        # Add edges
        self._add_workflow_edges(workflow)

        return workflow

    def _add_workflow_edges(self, workflow):
        workflow.add_edge('browser', 'planner')
        workflow.add_edge('planner', 'human')
        workflow.add_edge('researcher', 'writer')
        workflow.add_edge('writer', 'publisher')
        workflow.set_entry_point("browser")
        workflow.add_edge('publisher', END)

        # Add human in the loop
        workflow.add_conditional_edges(
            'human',
            lambda review: "accept" if review['human_feedback'] is None else "revise",
            {"accept": "researcher", "revise": "planner"}
        )

    def init_research_team(self):
        """Initialize and create a workflow for the research team."""
        agents = self._initialize_agents()
        return self._create_workflow(agents)

    async def _log_research_start(self):
        message = f"Starting the research process for query '{self.task.get('query')}'..."
        if self.websocket and self.stream_output:
            await self.stream_output("logs", "starting_research", message, self.websocket)
        else:
            print_agent_output(message, "MASTER")

    async def run_research_task(self, task_id=None):
        """
        Run a research task with the initialized research team.

        Args:
            task_id (optional): The ID of the task to run.

        Returns:
            The result of the research task.
        """
        research_team = self.init_research_team()
        chain = research_team.compile()

        await self._log_research_start()

        config = {
            "configurable": {
                "thread_id": task_id,
                "thread_ts": datetime.datetime.utcnow()
            }
        }

        result = await chain.ainvoke({"task": self.task}, config=config)
        return result

```

--------------------------------------------------------------------------------
/multi_agents/agents/writer.py:
--------------------------------------------------------------------------------

```python
from datetime import datetime
import json5 as json
from .utils.views import print_agent_output
from .utils.llms import call_model

sample_json = """
{
  "table_of_contents": A table of contents in markdown syntax (using '-') based on the research headers and subheaders,
  "introduction": An indepth introduction to the topic in markdown syntax and hyperlink references to relevant sources,
  "conclusion": A conclusion to the entire research based on all research data in markdown syntax and hyperlink references to relevant sources,
  "sources": A list with strings of all used source links in the entire research data in markdown syntax and apa citation format. For example: ['-  Title, year, Author [source url](source)', ...]
}
"""


class WriterAgent:
    def __init__(self, websocket=None, stream_output=None, headers=None):
        self.websocket = websocket
        self.stream_output = stream_output
        self.headers = headers

    def get_headers(self, research_state: dict):
        return {
            "title": research_state.get("title"),
            "date": "Date",
            "introduction": "Introduction",
            "table_of_contents": "Table of Contents",
            "conclusion": "Conclusion",
            "references": "References",
        }

    async def write_sections(self, research_state: dict):
        query = research_state.get("title")
        data = research_state.get("research_data")
        task = research_state.get("task")
        follow_guidelines = task.get("follow_guidelines")
        guidelines = task.get("guidelines")

        prompt = [
            {
                "role": "system",
                "content": "You are a research writer. Your sole purpose is to write a well-written "
                "research reports about a "
                "topic based on research findings and information.\n ",
            },
            {
                "role": "user",
                "content": f"Today's date is {datetime.now().strftime('%d/%m/%Y')}\n."
                f"Query or Topic: {query}\n"
                f"Research data: {str(data)}\n"
                f"Your task is to write an in depth, well written and detailed "
                f"introduction and conclusion to the research report based on the provided research data. "
                f"Do not include headers in the results.\n"
                f"You MUST include any relevant sources to the introduction and conclusion as markdown hyperlinks -"
                f"For example: 'This is a sample text. ([url website](url))'\n\n"
                f"{f'You must follow the guidelines provided: {guidelines}' if follow_guidelines else ''}\n"
                f"You MUST return nothing but a JSON in the following format (without json markdown):\n"
                f"{sample_json}\n\n",
            },
        ]

        response = await call_model(
            prompt,
            task.get("model"),
            response_format="json",
        )
        return response

    async def revise_headers(self, task: dict, headers: dict):
        prompt = [
            {
                "role": "system",
                "content": """You are a research writer. 
Your sole purpose is to revise the headers data based on the given guidelines.""",
            },
            {
                "role": "user",
                "content": f"""Your task is to revise the given headers JSON based on the guidelines given.
You are to follow the guidelines but the values should be in simple strings, ignoring all markdown syntax.
You must return nothing but a JSON in the same format as given in headers data.
Guidelines: {task.get("guidelines")}\n
Headers Data: {headers}\n
""",
            },
        ]

        response = await call_model(
            prompt,
            task.get("model"),
            response_format="json",
        )
        return {"headers": response}

    async def run(self, research_state: dict):
        if self.websocket and self.stream_output:
            await self.stream_output(
                "logs",
                "writing_report",
                f"Writing final research report based on research data...",
                self.websocket,
            )
        else:
            print_agent_output(
                f"Writing final research report based on research data...",
                agent="WRITER",
            )

        research_layout_content = await self.write_sections(research_state)

        if research_state.get("task").get("verbose"):
            if self.websocket and self.stream_output:
                research_layout_content_str = json.dumps(
                    research_layout_content, indent=2
                )
                await self.stream_output(
                    "logs",
                    "research_layout_content",
                    research_layout_content_str,
                    self.websocket,
                )
            else:
                print_agent_output(research_layout_content, agent="WRITER")

        headers = self.get_headers(research_state)
        if research_state.get("task").get("follow_guidelines"):
            if self.websocket and self.stream_output:
                await self.stream_output(
                    "logs",
                    "rewriting_layout",
                    "Rewriting layout based on guidelines...",
                    self.websocket,
                )
            else:
                print_agent_output(
                    "Rewriting layout based on guidelines...", agent="WRITER"
                )
            headers = await self.revise_headers(
                task=research_state.get("task"), headers=headers
            )
            headers = headers.get("headers")

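        # Merge the generated sections (intro, conclusion, TOC, sources) with the
        # possibly revised header labels for the downstream publisher step.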
        return {**research_layout_content, "headers": headers}

```

--------------------------------------------------------------------------------
/plan.md:
--------------------------------------------------------------------------------

```markdown
- [ ] Create a new file in the same directory as your multi_agents/main.py. For example, call it mcp_server.py. In that file, put the following code:

```python
# mcp_server.py
import os
import asyncio
import mcp.types as types
from mcp.server import Server
from mcp.server.stdio import stdio_server
from typing import Any

# Assume multi_agents/main.py has an async function called main() 
# that provides deep research functionality. We'll expose it as a "tool."

# We'll define a tool that calls multi_agents' functionality
# in a simplified way. Adjust argument signatures to match your real code.

def run_deep_research_tool(query: str) -> str:
    """
    Synchronous wrapper for demonstration.
    Replace with the appropriate call to your multi_agents.main() code.
    """
    # Add the real logic or call to multi_agents here.
    # For example:
    # results = asyncio.run(multi_agents.main(query))
    # return results
    return f"Stubbed result for query: {query}"

app = Server("research-tool-server")

@app.list_tools()
async def list_tools() -> list[types.Tool]:
    return [
        types.Tool(
            name="deep_research",
            description="Perform deep research using the multi_agents script",
            inputSchema={
                "type": "object",
                "properties": {
                    "query": {"type": "string"},
                },
                "required": ["query"]
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    if name == "deep_research":
        query = arguments["query"]
        result = run_deep_research_tool(query)
        return [types.TextContent(type="text", text=result)]
    raise ValueError(f"Tool not found: {name}")

# Optional: define resources, prompts, or other capabilities as needed.

async def main():
    # Standard input/output server
    async with stdio_server() as streams:
        await app.run(
            streams[0],
            streams[1],
            app.create_initialization_options()
        )

if __name__ == "__main__":
    asyncio.run(main())
```

- [ ] Adjust multi_agents/README.md (or create a new one if it doesn’t exist) to describe the new file you just created and its purpose. For example:

```markdown
# multi_agents

## Overview
This directory contains tools to perform deep research using multi-agent architectures.

## Files
- main.py  
  Entry point for the deep research tool, handling orchestration and environment loading.  
- mcp_server.py  
  MCP server implementation that exposes the research tool as a "deep_research" tool to external MCP clients.

## Usage
You can run the MCP server by calling:
```bash
python mcp_server.py
```
Then an MCP client can discover and invoke "deep_research" as a tool.
```

- [ ] Update the parent directory’s README.md (if one exists and is relevant to the entire project) to mention the multi_agents folder’s new MCP capabilities. For example:

```markdown
# Project Root

## Overview
This project includes a deep research tool in the `multi_agents` folder. It also exposes an MCP server so that external MCP clients can invoke research functionality.

## Directory structure
- multi_agents
  - README.md (details about multi_agents usage)
  - main.py (core logic for deep research)
  - mcp_server.py (MCP server exposing the deep research tool)

## Running the MCP Server
To start the MCP server, just run:
```bash
cd multi_agents
python mcp_server.py
```

```

- [ ] Ensure that your Python environment includes the mcp library (e.g., pip install mcp). If your multi_agents/main.py relies on additional dependencies, include installation steps for those in the README.md files as well.

- [ ] Confirm that when you run python mcp_server.py, the server starts and waits on stdio. Test from any MCP client (e.g., the Inspector or a custom client) to verify that the deep_research tool is discoverable and operational.
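
If you prefer a scripted check over the Inspector, the sketch below shows a minimal stdio client. It assumes the `mcp` Python SDK's `ClientSession`, `StdioServerParameters`, and `stdio_client` helpers, whose exact module paths may differ between SDK versions; the query text is only an example.

```python
# test_client.py - a minimal, hedged sketch of an MCP stdio client.
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main():
    # Launch mcp_server.py as a subprocess and talk to it over stdio.
    server_params = StdioServerParameters(command="python", args=["mcp_server.py"])
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            tools = await session.list_tools()
            print("Discovered tools:", [tool.name for tool in tools.tools])

            result = await session.call_tool(
                "deep_research", arguments={"query": "state of quantum error correction"}
            )
            print(result)


if __name__ == "__main__":
    asyncio.run(main())
```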

That’s it! Once these steps are complete, you’ll have a functional MCP server exposing your “deep research” functionality from multi_agents/main.py via a neat “deep_research” tool.

--

Based on the error and your request, I'll outline a plan to refactor the MCP server to return research results directly to the chat instead of writing to files.

# Refactoring Plan for MCP Deep Research Server

## 1. Modify mcp_server.py
- Remove file system outputs
- Modify the tool to return structured results directly
- Add proper typing for the response format

```python:mcp_server.py
# Modify the deep_research tool definition to specify structured output
@app.list_tools()
async def list_tools() -> list[types.Tool]:
    return [
        types.Tool(
            name="deep_research",
            description="Perform deep research using multi-agent architecture",
            inputSchema={
                "type": "object",
                "properties": {
                    "tone": {"type": "string"},
                    "query": {"type": "string"},
                },
                "required": ["tone", "query"]
            }
        )
    ]

@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    if name == "deep_research":
        # Get research results directly without file output
        results = await run_deep_research(arguments["tone"], arguments["query"])
        
        # Format results as text content
        return [types.TextContent(
            type="text",
            text=results
        )]
    raise ValueError(f"Tool not found: {name}")
```

## 2. Modify multi_agents/main.py
```python:multi_agents/main.py
# Replace file writing with direct return
async def main(tone: str, query: str) -> str:
    # ... existing setup code ...
    
    # Instead of writing to files, collect output in a string
    output = []
    
    # Modify research steps to append to output instead of writing files
    for step in research_steps:
        result = await step.execute()
        output.append(result)
    
    # Return formatted results
    return "\n\n".join(output)
```

## 3. Update README.md
```markdown:multi_agents/README.md
# Multi Agents Research Tool

## Updates
- Now returns research results directly to chat
- No longer writes to filesystem
- Structured output format for better readability

## Usage via MCP
The deep_research tool now returns results directly in the chat interface:

Input:
- tone: The desired tone for the response
- query: The research question to investigate

Output:
- Formatted research results returned directly in chat
```

## Implementation Steps:

1. Remove all file writing operations from multi_agents/main.py
2. Modify the agent execution flow to collect results in memory
3. Update the MCP server to properly format and return the results (see the bridge sketch after this list)
4. Test the changes using Claude Desktop to verify direct output
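
A possible shape for the bridge mentioned in step 3, connecting the MCP handler to the refactored `multi_agents/main.py`. This is a sketch only: `run_deep_research` is a helper name introduced here, the import path assumes `mcp_server.py` can import the `multi_agents` package, and it relies on `main(tone, query)` returning the report text as in step 2.

```python
# mcp_server.py (excerpt) - hypothetical bridge to the refactored main().
# Adjust the import path if mcp_server.py lives elsewhere in your layout.
from multi_agents.main import main as research_main


async def run_deep_research(tone: str, query: str) -> str:
    """Run the multi-agent pipeline and return the report text."""
    return await research_main(tone=tone, query=query)
```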

## Testing:
1. Run the MCP server
2. Use Claude Desktop to make a research query
3. Verify results appear directly in chat without file system errors
4. Test different query types and response formats

## Error Handling:
- Add proper error handling for memory constraints
- Include progress updates during long-running research
- Format errors as readable messages in chat
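
One way these error-handling points could look inside the `call_tool` handler (a sketch under the assumptions above; the specific exception types and message wording are placeholders):

```python
@app.call_tool()
async def call_tool(name: str, arguments: dict[str, Any]) -> list[types.TextContent]:
    if name != "deep_research":
        raise ValueError(f"Tool not found: {name}")
    try:
        results = await run_deep_research(arguments["tone"], arguments["query"])
        return [types.TextContent(type="text", text=results)]
    except MemoryError:
        return [types.TextContent(
            type="text",
            text="Research aborted: ran out of memory while collecting results.",
        )]
    except Exception as exc:
        # Surface the failure as readable text instead of crashing the server.
        return [types.TextContent(type="text", text=f"Research failed: {exc}")]
```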

```

--------------------------------------------------------------------------------
/multi_agents/agents/editor.py:
--------------------------------------------------------------------------------

```python
from datetime import datetime
import asyncio
from typing import Any, Dict, List, Optional

from langgraph.graph import StateGraph, END

from .utils.views import print_agent_output
from .utils.llms import call_model
from ..memory.draft import DraftState
from . import ResearchAgent, ReviewerAgent, ReviserAgent


class EditorAgent:
    """Agent responsible for editing and managing code."""

    def __init__(self, websocket=None, stream_output=None, headers=None):
        self.websocket = websocket
        self.stream_output = stream_output
        self.headers = headers or {}

    async def plan_research(self, research_state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Plan the research outline based on initial research and task parameters.

        :param research_state: Dictionary containing research state information
        :return: Dictionary with title, date, and planned sections
        """
        initial_research = research_state.get("initial_research")
        task = research_state.get("task")
        include_human_feedback = task.get("include_human_feedback")
        human_feedback = research_state.get("human_feedback")
        max_sections = task.get("max_sections")

        prompt = self._create_planning_prompt(
            initial_research, include_human_feedback, human_feedback, max_sections
        )

        print_agent_output(
            "Planning an outline layout based on initial research...", agent="EDITOR"
        )
        plan = await call_model(
            prompt=prompt,
            model=task.get("model"),
            response_format="json",
        )

        # Restrict to exactly one section
        sections = plan.get("sections", [])
        if len(sections) > 1:
            sections = sections[:1]

        return {
            "title": plan.get("title"),
            "date": plan.get("date"),
            "sections": sections,
        }

    async def run_parallel_research(self, research_state: Dict[str, Any]) -> Dict[str, List[str]]:
        """
        Execute parallel research tasks for each section.

        :param research_state: Dictionary containing research state information
        :return: Dictionary with research results
        """
        agents = self._initialize_agents()
        workflow = self._create_workflow()
        chain = workflow.compile()

        queries = research_state.get("sections")
        title = research_state.get("title")

        self._log_parallel_research(queries)

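        # Launch one draft sub-workflow per section header and gather results concurrently.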
        final_drafts = [
            chain.ainvoke(self._create_task_input(
                research_state, query, title))
            for query in queries
        ]
        research_results = [
            result["draft"] for result in await asyncio.gather(*final_drafts)
        ]

        return {"research_data": research_results}

    def _create_planning_prompt(self, initial_research: str, include_human_feedback: bool,
                                human_feedback: Optional[str], max_sections: int) -> List[Dict[str, str]]:
        """Create the prompt for research planning."""
        return [
            {
                "role": "system",
                "content": "You are a research editor. Your goal is to oversee the research project "
                           "from inception to completion. Your main task is to plan the article section "
                           "layout based on an initial research summary.\n ",
            },
            {
                "role": "user",
                "content": self._format_planning_instructions(initial_research, include_human_feedback,
                                                              human_feedback, max_sections),
            },
        ]

    def _format_planning_instructions(self, initial_research: str, include_human_feedback: bool,
                                      human_feedback: Optional[str], max_sections: int) -> str:
        """Format the instructions for research planning."""
        today = datetime.now().strftime('%d/%m/%Y')
        feedback_instruction = (
            f"Human feedback: {human_feedback}. You must plan the sections based on the human feedback."
            if include_human_feedback and human_feedback and human_feedback != 'no'
            else ''
        )

        return f"""Today's date is {today}
                   Research summary report: '{initial_research}'
                   {feedback_instruction}
                   \nYour task is to generate an outline of section headers for the research project
                   based on the research summary report above.
                   You must generate a maximum of {max_sections} section headers.
                   You must focus ONLY on related research topics for subheaders and do NOT include introduction, conclusion and references.
                   You must return nothing but a JSON with the fields 'title' (str) and 
                   'sections' (maximum {max_sections} section headers) with the following structure:
                   '{{title: string research title, date: today's date, 
                   sections: ['section header 1', 'section header 2', 'section header 3' ...]}}'."""

    def _initialize_agents(self) -> Dict[str, Any]:
        """Initialize the research, reviewer, and reviser agents."""
        return {
            "research": ResearchAgent(self.websocket, self.stream_output, self.headers),
            "reviewer": ReviewerAgent(self.websocket, self.stream_output, self.headers),
            "reviser": ReviserAgent(self.websocket, self.stream_output, self.headers),
        }

    def _create_workflow(self) -> StateGraph:
        """Create the workflow for the research process."""
        agents = self._initialize_agents()
        workflow = StateGraph(DraftState)

        workflow.add_node("researcher", agents["research"].run_depth_research)
        workflow.add_node("reviewer", agents["reviewer"].run)
        workflow.add_node("reviser", agents["reviser"].run)

        workflow.set_entry_point("researcher")
        workflow.add_edge("researcher", "reviewer")
        workflow.add_edge("reviser", "reviewer")
        workflow.add_conditional_edges(
            "reviewer",
            lambda draft: "accept" if draft["review"] is None else "revise",
            {"accept": END, "revise": "reviser"},
        )

        return workflow

    def _log_parallel_research(self, queries: List[str]) -> None:
        """Log the start of parallel research tasks."""
        if self.websocket and self.stream_output:
            asyncio.create_task(self.stream_output(
                "logs",
                "parallel_research",
                f"Running parallel research for the following queries: {queries}",
                self.websocket,
            ))
        else:
            print_agent_output(
                f"Running the following research tasks in parallel: {queries}...",
                agent="EDITOR",
            )

    def _create_task_input(self, research_state: Dict[str, Any], query: str, title: str) -> Dict[str, Any]:
        """Create the input for a single research task."""
        return {
            "task": research_state.get("task"),
            "topic": query,
            "title": title,
            "headers": self.headers,
        }

```