This is page 8 of 14. Use http://codebase.md/stanfordnlp/dspy?page={x} to view the full context.

# Directory Structure

```
├── .github
│   ├── .internal_dspyai
│   │   ├── internals
│   │   │   ├── build-and-release.md
│   │   │   └── release-checklist.md
│   │   └── pyproject.toml
│   ├── .tmp
│   │   └── .generated-actions
│   │       └── run-pypi-publish-in-docker-container
│   │           └── action.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   └── feature_request.yml
│   ├── PULL_REQUEST_TEMPLATE
│   │   └── pull_request_template.md
│   ├── workflow_scripts
│   │   └── install_testpypi_pkg.sh
│   └── workflows
│       ├── build_and_release.yml
│       ├── build_utils
│       │   └── test_version.py
│       ├── docs-push.yml
│       ├── precommits_check.yml
│       └── run_tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── CONTRIBUTING.md
├── docs
│   ├── .gitignore
│   ├── docs
│   │   ├── api
│   │   │   ├── adapters
│   │   │   │   ├── Adapter.md
│   │   │   │   ├── ChatAdapter.md
│   │   │   │   ├── JSONAdapter.md
│   │   │   │   └── TwoStepAdapter.md
│   │   │   ├── evaluation
│   │   │   │   ├── answer_exact_match.md
│   │   │   │   ├── answer_passage_match.md
│   │   │   │   ├── CompleteAndGrounded.md
│   │   │   │   ├── Evaluate.md
│   │   │   │   ├── EvaluationResult.md
│   │   │   │   └── SemanticF1.md
│   │   │   ├── experimental
│   │   │   │   ├── Citations.md
│   │   │   │   └── Document.md
│   │   │   ├── index.md
│   │   │   ├── models
│   │   │   │   ├── Embedder.md
│   │   │   │   └── LM.md
│   │   │   ├── modules
│   │   │   │   ├── BestOfN.md
│   │   │   │   ├── ChainOfThought.md
│   │   │   │   ├── CodeAct.md
│   │   │   │   ├── Module.md
│   │   │   │   ├── MultiChainComparison.md
│   │   │   │   ├── Parallel.md
│   │   │   │   ├── Predict.md
│   │   │   │   ├── ProgramOfThought.md
│   │   │   │   ├── ReAct.md
│   │   │   │   └── Refine.md
│   │   │   ├── optimizers
│   │   │   │   ├── BetterTogether.md
│   │   │   │   ├── BootstrapFewShot.md
│   │   │   │   ├── BootstrapFewShotWithRandomSearch.md
│   │   │   │   ├── BootstrapFinetune.md
│   │   │   │   ├── BootstrapRS.md
│   │   │   │   ├── COPRO.md
│   │   │   │   ├── Ensemble.md
│   │   │   │   ├── GEPA
│   │   │   │   │   ├── GEPA_Advanced.md
│   │   │   │   │   └── overview.md
│   │   │   │   ├── InferRules.md
│   │   │   │   ├── KNN.md
│   │   │   │   ├── KNNFewShot.md
│   │   │   │   ├── LabeledFewShot.md
│   │   │   │   ├── MIPROv2.md
│   │   │   │   └── SIMBA.md
│   │   │   ├── primitives
│   │   │   │   ├── Audio.md
│   │   │   │   ├── Code.md
│   │   │   │   ├── Example.md
│   │   │   │   ├── History.md
│   │   │   │   ├── Image.md
│   │   │   │   ├── Prediction.md
│   │   │   │   ├── Tool.md
│   │   │   │   └── ToolCalls.md
│   │   │   ├── signatures
│   │   │   │   ├── InputField.md
│   │   │   │   ├── OutputField.md
│   │   │   │   └── Signature.md
│   │   │   ├── tools
│   │   │   │   ├── ColBERTv2.md
│   │   │   │   ├── Embeddings.md
│   │   │   │   └── PythonInterpreter.md
│   │   │   └── utils
│   │   │       ├── asyncify.md
│   │   │       ├── configure_cache.md
│   │   │       ├── disable_litellm_logging.md
│   │   │       ├── disable_logging.md
│   │   │       ├── enable_litellm_logging.md
│   │   │       ├── enable_logging.md
│   │   │       ├── inspect_history.md
│   │   │       ├── load.md
│   │   │       ├── StatusMessage.md
│   │   │       ├── StatusMessageProvider.md
│   │   │       ├── streamify.md
│   │   │       └── StreamListener.md
│   │   ├── cheatsheet.md
│   │   ├── community
│   │   │   ├── community-resources.md
│   │   │   ├── how-to-contribute.md
│   │   │   └── use-cases.md
│   │   ├── deep-dive
│   │   │   └── data-handling
│   │   │       ├── built-in-datasets.md
│   │   │       ├── examples.md
│   │   │       ├── img
│   │   │       │   └── data-loading.png
│   │   │       └── loading-custom-data.md
│   │   ├── faqs.md
│   │   ├── index.md
│   │   ├── js
│   │   │   └── runllm-widget.js
│   │   ├── learn
│   │   │   ├── evaluation
│   │   │   │   ├── data.md
│   │   │   │   ├── metrics.md
│   │   │   │   └── overview.md
│   │   │   ├── figures
│   │   │   │   ├── native_tool_call.png
│   │   │   │   └── teleprompter-classes.png
│   │   │   ├── index.md
│   │   │   ├── optimization
│   │   │   │   ├── optimizers.md
│   │   │   │   └── overview.md
│   │   │   └── programming
│   │   │       ├── 7-assertions.md
│   │   │       ├── adapters.md
│   │   │       ├── language_models.md
│   │   │       ├── mcp.md
│   │   │       ├── modules.md
│   │   │       ├── overview.md
│   │   │       ├── signatures.md
│   │   │       └── tools.md
│   │   ├── production
│   │   │   └── index.md
│   │   ├── roadmap.md
│   │   ├── static
│   │   │   ├── .nojekyll
│   │   │   └── img
│   │   │       ├── dspy_logo.png
│   │   │       ├── logo.png
│   │   │       ├── mlflow-tracing-rag.png
│   │   │       ├── modular.png
│   │   │       ├── optimize.png
│   │   │       ├── undraw_docusaurus_mountain.svg
│   │   │       ├── undraw_docusaurus_react.svg
│   │   │       ├── undraw_docusaurus_tree.svg
│   │   │       └── universal_compatibility.png
│   │   ├── stylesheets
│   │   │   └── extra.css
│   │   └── tutorials
│   │       ├── agents
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-agent.png
│   │       ├── ai_text_game
│   │       │   └── index.md
│   │       ├── async
│   │       │   └── index.md
│   │       ├── audio
│   │       │   └── index.ipynb
│   │       ├── build_ai_program
│   │       │   └── index.md
│   │       ├── cache
│   │       │   └── index.md
│   │       ├── classification
│   │       │   └── index.md
│   │       ├── classification_finetuning
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-classification.png
│   │       ├── conversation_history
│   │       │   └── index.md
│   │       ├── core_development
│   │       │   └── index.md
│   │       ├── custom_module
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-custom-module.png
│   │       ├── customer_service_agent
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-customer-service-agent.png
│   │       ├── deployment
│   │       │   ├── dspy_mlflow_ui.png
│   │       │   └── index.md
│   │       ├── email_extraction
│   │       │   ├── index.md
│   │       │   └── mlflow-tracing-email-extraction.png
│   │       ├── entity_extraction
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-entity-extraction.png
│   │       ├── games
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-agent.png
│   │       ├── gepa_ai_program
│   │       │   └── index.md
│   │       ├── gepa_aime
│   │       │   ├── index.ipynb
│   │       │   ├── mlflow-tracing-gepa-aime.png
│   │       │   └── mlflow-tracking-gepa-aime-optimization.png
│   │       ├── gepa_facilitysupportanalyzer
│   │       │   ├── index.ipynb
│   │       │   ├── mlflow-tracing-gepa-support.png
│   │       │   └── mlflow-tracking-gepa-support-optimization.png
│   │       ├── gepa_papillon
│   │       │   ├── index.ipynb
│   │       │   ├── mlflow-tracing-gepa-papilon.png
│   │       │   └── mlflow-tracking-gepa-papilon-optimization.png
│   │       ├── image_generation_prompting
│   │       │   └── index.ipynb
│   │       ├── index.md
│   │       ├── llms_txt_generation
│   │       │   └── index.md
│   │       ├── math
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-math.png
│   │       ├── mcp
│   │       │   └── index.md
│   │       ├── mem0_react_agent
│   │       │   └── index.md
│   │       ├── multihop_search
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-multi-hop.png
│   │       ├── observability
│   │       │   ├── index.md
│   │       │   ├── mlflow_trace_ui_navigation.gif
│   │       │   ├── mlflow_trace_ui.png
│   │       │   └── mlflow_trace_view.png
│   │       ├── optimize_ai_program
│   │       │   └── index.md
│   │       ├── optimizer_tracking
│   │       │   ├── child_run.png
│   │       │   ├── experiment.png
│   │       │   ├── index.md
│   │       │   └── parent_run.png
│   │       ├── output_refinement
│   │       │   └── best-of-n-and-refine.md
│   │       ├── papillon
│   │       │   └── index.md
│   │       ├── program_of_thought
│   │       │   └── index.ipynb
│   │       ├── rag
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-rag.png
│   │       ├── real_world_examples
│   │       │   └── index.md
│   │       ├── rl_ai_program
│   │       │   └── index.md
│   │       ├── rl_multihop
│   │       │   └── index.ipynb
│   │       ├── rl_papillon
│   │       │   └── index.ipynb
│   │       ├── sample_code_generation
│   │       │   └── index.md
│   │       ├── saving
│   │       │   └── index.md
│   │       ├── streaming
│   │       │   └── index.md
│   │       ├── tool_use
│   │       │   └── index.ipynb
│   │       └── yahoo_finance_react
│   │           └── index.md
│   ├── mkdocs.yml
│   ├── overrides
│   │   ├── home.html
│   │   ├── main.html
│   │   └── partials
│   │       └── tabs.html
│   ├── Pipfile
│   ├── Pipfile.lock
│   ├── README.md
│   ├── requirements.txt
│   ├── scripts
│   │   ├── generate_api_docs.py
│   │   └── generate_api_summary.py
│   └── vercel.json
├── dspy
│   ├── __init__.py
│   ├── __metadata__.py
│   ├── adapters
│   │   ├── __init__.py
│   │   ├── baml_adapter.py
│   │   ├── base.py
│   │   ├── chat_adapter.py
│   │   ├── json_adapter.py
│   │   ├── two_step_adapter.py
│   │   ├── types
│   │   │   ├── __init__.py
│   │   │   ├── audio.py
│   │   │   ├── base_type.py
│   │   │   ├── citation.py
│   │   │   ├── code.py
│   │   │   ├── document.py
│   │   │   ├── history.py
│   │   │   ├── image.py
│   │   │   └── tool.py
│   │   ├── utils.py
│   │   └── xml_adapter.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── base_lm.py
│   │   ├── cache.py
│   │   ├── databricks.py
│   │   ├── embedding.py
│   │   ├── lm_local_arbor.py
│   │   ├── lm_local.py
│   │   ├── lm.py
│   │   ├── openai.py
│   │   ├── provider.py
│   │   └── utils_finetune.py
│   ├── datasets
│   │   ├── __init__.py
│   │   ├── alfworld
│   │   │   ├── __init__.py
│   │   │   ├── alfworld.py
│   │   │   └── base_config.yml
│   │   ├── colors.py
│   │   ├── dataloader.py
│   │   ├── dataset.py
│   │   ├── gsm8k.py
│   │   ├── hotpotqa.py
│   │   └── math.py
│   ├── dsp
│   │   ├── __init__.py
│   │   ├── colbertv2.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── dpr.py
│   │       ├── settings.py
│   │       └── utils.py
│   ├── evaluate
│   │   ├── __init__.py
│   │   ├── auto_evaluation.py
│   │   ├── evaluate.py
│   │   └── metrics.py
│   ├── experimental
│   │   └── __init__.py
│   ├── predict
│   │   ├── __init__.py
│   │   ├── aggregation.py
│   │   ├── avatar
│   │   │   ├── __init__.py
│   │   │   ├── avatar.py
│   │   │   ├── models.py
│   │   │   └── signatures.py
│   │   ├── best_of_n.py
│   │   ├── chain_of_thought.py
│   │   ├── code_act.py
│   │   ├── knn.py
│   │   ├── multi_chain_comparison.py
│   │   ├── parallel.py
│   │   ├── parameter.py
│   │   ├── predict.py
│   │   ├── program_of_thought.py
│   │   ├── react.py
│   │   ├── refine.py
│   │   └── retry.py
│   ├── primitives
│   │   ├── __init__.py
│   │   ├── base_module.py
│   │   ├── example.py
│   │   ├── module.py
│   │   ├── prediction.py
│   │   ├── python_interpreter.py
│   │   └── runner.js
│   ├── propose
│   │   ├── __init__.py
│   │   ├── dataset_summary_generator.py
│   │   ├── grounded_proposer.py
│   │   ├── propose_base.py
│   │   └── utils.py
│   ├── retrievers
│   │   ├── __init__.py
│   │   ├── databricks_rm.py
│   │   ├── embeddings.py
│   │   ├── retrieve.py
│   │   └── weaviate_rm.py
│   ├── signatures
│   │   ├── __init__.py
│   │   ├── field.py
│   │   ├── signature.py
│   │   └── utils.py
│   ├── streaming
│   │   ├── __init__.py
│   │   ├── messages.py
│   │   ├── streamify.py
│   │   └── streaming_listener.py
│   ├── teleprompt
│   │   ├── __init__.py
│   │   ├── avatar_optimizer.py
│   │   ├── bettertogether.py
│   │   ├── bootstrap_finetune.py
│   │   ├── bootstrap_trace.py
│   │   ├── bootstrap.py
│   │   ├── copro_optimizer.py
│   │   ├── ensemble.py
│   │   ├── gepa
│   │   │   ├── __init__.py
│   │   │   ├── gepa_utils.py
│   │   │   ├── gepa.py
│   │   │   └── instruction_proposal.py
│   │   ├── grpo.py
│   │   ├── infer_rules.py
│   │   ├── knn_fewshot.py
│   │   ├── mipro_optimizer_v2.py
│   │   ├── random_search.py
│   │   ├── signature_opt.py
│   │   ├── simba_utils.py
│   │   ├── simba.py
│   │   ├── teleprompt_optuna.py
│   │   ├── teleprompt.py
│   │   ├── utils.py
│   │   └── vanilla.py
│   └── utils
│       ├── __init__.py
│       ├── annotation.py
│       ├── asyncify.py
│       ├── caching.py
│       ├── callback.py
│       ├── dummies.py
│       ├── exceptions.py
│       ├── hasher.py
│       ├── inspect_history.py
│       ├── langchain_tool.py
│       ├── logging_utils.py
│       ├── mcp.py
│       ├── parallelizer.py
│       ├── saving.py
│       ├── syncify.py
│       ├── unbatchify.py
│       └── usage_tracker.py
├── LICENSE
├── pyproject.toml
├── README.md
├── tests
│   ├── __init__.py
│   ├── adapters
│   │   ├── test_adapter_utils.py
│   │   ├── test_baml_adapter.py
│   │   ├── test_base_type.py
│   │   ├── test_chat_adapter.py
│   │   ├── test_citation.py
│   │   ├── test_code.py
│   │   ├── test_document.py
│   │   ├── test_json_adapter.py
│   │   ├── test_tool.py
│   │   ├── test_two_step_adapter.py
│   │   └── test_xml_adapter.py
│   ├── callback
│   │   └── test_callback.py
│   ├── clients
│   │   ├── test_cache.py
│   │   ├── test_databricks.py
│   │   ├── test_embedding.py
│   │   ├── test_inspect_global_history.py
│   │   └── test_lm.py
│   ├── conftest.py
│   ├── datasets
│   │   └── test_dataset.py
│   ├── docs
│   │   └── test_mkdocs_links.py
│   ├── evaluate
│   │   ├── test_evaluate.py
│   │   └── test_metrics.py
│   ├── examples
│   │   └── test_baleen.py
│   ├── metadata
│   │   └── test_metadata.py
│   ├── predict
│   │   ├── test_aggregation.py
│   │   ├── test_best_of_n.py
│   │   ├── test_chain_of_thought.py
│   │   ├── test_code_act.py
│   │   ├── test_knn.py
│   │   ├── test_multi_chain_comparison.py
│   │   ├── test_parallel.py
│   │   ├── test_predict.py
│   │   ├── test_program_of_thought.py
│   │   ├── test_react.py
│   │   ├── test_refine.py
│   │   └── test_retry.py
│   ├── primitives
│   │   ├── resources
│   │   │   └── saved_program.json
│   │   ├── test_base_module.py
│   │   ├── test_example.py
│   │   ├── test_module.py
│   │   └── test_python_interpreter.py
│   ├── propose
│   │   └── test_grounded_proposer.py
│   ├── README.md
│   ├── reliability
│   │   ├── __init__.py
│   │   ├── complex_types
│   │   │   └── generated
│   │   │       ├── test_many_types_1
│   │   │       │   ├── inputs
│   │   │       │   │   ├── input1.json
│   │   │       │   │   └── input2.json
│   │   │       │   ├── program.py
│   │   │       │   └── schema.json
│   │   │       ├── test_nesting_1
│   │   │       │   ├── inputs
│   │   │       │   │   ├── input1.json
│   │   │       │   │   └── input2.json
│   │   │       │   ├── program.py
│   │   │       │   └── schema.json
│   │   │       └── test_nesting_2
│   │   │           ├── inputs
│   │   │           │   └── input1.json
│   │   │           ├── program.py
│   │   │           └── schema.json
│   │   ├── conftest.py
│   │   ├── generate
│   │   │   ├── __init__.py
│   │   │   ├── __main__.py
│   │   │   └── utils.py
│   │   ├── input_formats
│   │   │   └── generated
│   │   │       └── test_markdown_1
│   │   │           ├── inputs
│   │   │           │   ├── input1.json
│   │   │           │   └── input2.json
│   │   │           ├── program.py
│   │   │           └── schema.json
│   │   ├── README.md
│   │   ├── reliability_conf.yaml
│   │   ├── test_generated.py
│   │   ├── test_pydantic_models.py
│   │   └── utils.py
│   ├── retrievers
│   │   └── test_embeddings.py
│   ├── signatures
│   │   ├── test_adapter_image.py
│   │   ├── test_custom_types.py
│   │   └── test_signature.py
│   ├── streaming
│   │   └── test_streaming.py
│   ├── teleprompt
│   │   ├── gepa_dummy_lm_custom_component_selector_custom_instruction_proposer.json
│   │   ├── gepa_dummy_lm.json
│   │   ├── test_bootstrap_finetune.py
│   │   ├── test_bootstrap_trace.py
│   │   ├── test_bootstrap.py
│   │   ├── test_copro_optimizer.py
│   │   ├── test_ensemble.py
│   │   ├── test_finetune.py
│   │   ├── test_gepa_instruction_proposer.py
│   │   ├── test_gepa.py
│   │   ├── test_grpo.py
│   │   ├── test_knn_fewshot.py
│   │   ├── test_random_search.py
│   │   ├── test_teleprompt.py
│   │   └── test_utils.py
│   ├── test_utils
│   │   ├── __init__.py
│   │   └── server
│   │       ├── __init__.py
│   │       ├── litellm_server_config.yaml
│   │       └── litellm_server.py
│   └── utils
│       ├── __init__.py
│       ├── resources
│       │   └── mcp_server.py
│       ├── test_annotation.py
│       ├── test_asyncify.py
│       ├── test_exceptions.py
│       ├── test_langchain_tool.py
│       ├── test_mcp.py
│       ├── test_parallelizer.py
│       ├── test_saving.py
│       ├── test_settings.py
│       ├── test_syncify.py
│       ├── test_unbatchify.py
│       └── test_usage_tracker.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/dspy/clients/databricks.py:
--------------------------------------------------------------------------------

```python
import logging
import os
import re
import time
from typing import TYPE_CHECKING, Any

import orjson
import requests

from dspy.clients.provider import Provider, TrainingJob
from dspy.clients.utils_finetune import TrainDataFormat, get_finetune_directory

if TYPE_CHECKING:
    from databricks.sdk import WorkspaceClient

logger = logging.getLogger(__name__)


class TrainingJobDatabricks(TrainingJob):
    def __init__(self, finetuning_run=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.finetuning_run = finetuning_run
        self.launch_started = False
        self.launch_completed = False
        self.endpoint_name = None

    def status(self):
        if not self.finetuning_run:
            return None
        try:
            from databricks.model_training import foundation_model as fm
        except ImportError:
            raise ImportError(
                "To use Databricks finetuning, please install the databricks_genai package via "
                "`pip install databricks_genai`."
            )
        run = fm.get(self.finetuning_run)
        return run.status


class DatabricksProvider(Provider):
    finetunable = True
    TrainingJob = TrainingJobDatabricks

    @staticmethod
    def is_provider_model(model: str) -> bool:
        # We don't automatically infer Databricks models because Databricks is not a proprietary model provider.
        return False

    @staticmethod
    def deploy_finetuned_model(
        model: str,
        data_format: TrainDataFormat | None = None,
        databricks_host: str | None = None,
        databricks_token: str | None = None,
        deploy_timeout: int = 900,
    ):
        workspace_client = _get_workspace_client()
        model_version = next(workspace_client.model_versions.list(model)).version

        # Allow users to override the host and token. This is useful on Databricks hosted runtime.
        databricks_host = databricks_host or workspace_client.config.host
        databricks_token = databricks_token or workspace_client.config.token

        headers = {"Context-Type": "text/json", "Authorization": f"Bearer {databricks_token}"}

        optimizable_info = requests.get(
            url=f"{databricks_host}/api/2.0/serving-endpoints/get-model-optimization-info/{model}/{model_version}",
            headers=headers,
        ).json()

        if "optimizable" not in optimizable_info or not optimizable_info["optimizable"]:
            raise ValueError(f"Model is not eligible for provisioned throughput: {optimizable_info}")

        chunk_size = optimizable_info["throughput_chunk_size"]

        # Minimum desired provisioned throughput
        min_provisioned_throughput = 0

        # Maximum desired provisioned throughput
        max_provisioned_throughput = chunk_size

        # Databricks serving endpoint names cannot contain ".".
        model_name = model.replace(".", "_")

        get_endpoint_response = requests.get(
            url=f"{databricks_host}/api/2.0/serving-endpoints/{model_name}", json={"name": model_name}, headers=headers
        )

        if get_endpoint_response.status_code == 200:
            logger.info(f"Serving endpoint {model_name} already exists, updating it instead of creating a new one.")
            # The serving endpoint already exists, we will update it instead of creating a new one.
            data = {
                "served_entities": [
                    {
                        "name": model_name,
                        "entity_name": model,
                        "entity_version": model_version,
                        "min_provisioned_throughput": min_provisioned_throughput,
                        "max_provisioned_throughput": max_provisioned_throughput,
                    }
                ]
            }

            response = requests.put(
                url=f"{databricks_host}/api/2.0/serving-endpoints/{model_name}/config",
                json=data,
                headers=headers,
            )
        else:
            logger.info(f"Creating serving endpoint {model_name} on Databricks model serving!")
            # Send the POST request to create the serving endpoint.
            data = {
                "name": model_name,
                "config": {
                    "served_entities": [
                        {
                            "name": model_name,
                            "entity_name": model,
                            "entity_version": model_version,
                            "min_provisioned_throughput": min_provisioned_throughput,
                            "max_provisioned_throughput": max_provisioned_throughput,
                        }
                    ]
                },
            }

            response = requests.post(url=f"{databricks_host}/api/2.0/serving-endpoints", json=data, headers=headers)

        if response.status_code == 200:
            logger.info(
                f"Successfully started creating/updating serving endpoint {model_name} on Databricks model serving!"
            )
        else:
            raise ValueError(f"Failed to create serving endpoint: {response.json()}.")

        logger.info(
            f"Waiting for serving endpoint {model_name} to be ready, this might take a few minutes... You can check "
            f"the status of the endpoint at {databricks_host}/ml/endpoints/{model_name}"
        )
        from openai import OpenAI

        client = OpenAI(
            api_key=databricks_token,
            base_url=f"{databricks_host}/serving-endpoints",
        )
        # Wait for the deployment to be ready.
        num_retries = deploy_timeout // 60
        for _ in range(num_retries):
            try:
                if data_format == TrainDataFormat.CHAT:
                    client.chat.completions.create(
                        messages=[{"role": "user", "content": "hi"}], model=model_name, max_tokens=1
                    )
                elif data_format == TrainDataFormat.COMPLETION:
                    client.completions.create(prompt="hi", model=model_name, max_tokens=1)
                logger.info(f"Databricks model serving endpoint {model_name} is ready!")
                return
            except Exception:
                time.sleep(60)

        raise ValueError(
            f"Failed to create serving endpoint {model_name} on Databricks model serving platform within "
            f"{deploy_timeout} seconds."
        )

    @staticmethod
    def finetune(
        job: TrainingJobDatabricks,
        model: str,
        train_data: list[dict[str, Any]],
        train_data_format: TrainDataFormat | str | None = "chat",
        train_kwargs: dict[str, Any] | None = None,
    ) -> str:
        if isinstance(train_data_format, str):
            if train_data_format == "chat":
                train_data_format = TrainDataFormat.CHAT
            elif train_data_format == "completion":
                train_data_format = TrainDataFormat.COMPLETION
            else:
                raise ValueError(
                    f"String `train_data_format` must be one of 'chat' or 'completion', but received: {train_data_format}."
                )

        if "train_data_path" not in train_kwargs:
            raise ValueError("The `train_data_path` must be provided to finetune on Databricks.")
        # Add the file name to the directory path.
        train_kwargs["train_data_path"] = DatabricksProvider.upload_data(
            train_data, train_kwargs["train_data_path"], train_data_format
        )

        try:
            from databricks.model_training import foundation_model as fm
        except ImportError:
            raise ImportError(
                "To use Databricks finetuning, please install the databricks_genai package via "
                "`pip install databricks_genai`."
            )

        if "register_to" not in train_kwargs:
            raise ValueError("The `register_to` must be provided to finetune on Databricks.")

        # Allow users to override the host and token. This is useful on Databricks hosted runtime.
        databricks_host = train_kwargs.pop("databricks_host", None)
        databricks_token = train_kwargs.pop("databricks_token", None)

        skip_deploy = train_kwargs.pop("skip_deploy", False)
        deploy_timeout = train_kwargs.pop("deploy_timeout", 900)

        logger.info("Starting finetuning on Databricks... this might take a few minutes to finish.")
        finetuning_run = fm.create(
            model=model,
            **train_kwargs,
        )

        job.run = finetuning_run

        # Wait for the finetuning run to be ready.
        while True:
            job.run = fm.get(job.run)
            if job.run.status.display_name == "Completed":
                logger.info("Finetuning run completed successfully!")
                break
            elif job.run.status.display_name == "Failed":
                raise ValueError(
                    f"Finetuning run failed with status: {job.run.status.display_name}. Please check the Databricks "
                    f"workspace for more details. Finetuning job's metadata: {job.run}."
                )
            else:
                time.sleep(60)

        if skip_deploy:
            return None

        job.launch_started = True
        model_to_deploy = train_kwargs.get("register_to")
        job.endpoint_name = model_to_deploy.replace(".", "_")
        DatabricksProvider.deploy_finetuned_model(
            model_to_deploy, train_data_format, databricks_host, databricks_token, deploy_timeout
        )
        job.launch_completed = True
        # The finetuned model name should be in the format: "databricks/<endpoint_name>".
        return f"databricks/{job.endpoint_name}"

    @staticmethod
    def upload_data(train_data: list[dict[str, Any]], databricks_unity_catalog_path: str, data_format: TrainDataFormat):
        logger.info("Uploading finetuning data to Databricks Unity Catalog...")
        file_path = _save_data_to_local_file(train_data, data_format)

        w = _get_workspace_client()
        _create_directory_in_databricks_unity_catalog(w, databricks_unity_catalog_path)

        try:
            with open(file_path, "rb") as f:
                target_path = os.path.join(databricks_unity_catalog_path, os.path.basename(file_path))
                w.files.upload(target_path, f, overwrite=True)
            logger.info("Successfully uploaded finetuning data to Databricks Unity Catalog!")
            return target_path
        except Exception as e:
            raise ValueError(f"Failed to upload finetuning data to Databricks Unity Catalog: {e}")


def _get_workspace_client() -> "WorkspaceClient":
    try:
        from databricks.sdk import WorkspaceClient
    except ImportError:
        raise ImportError(
            "To use Databricks finetuning, please install the databricks-sdk package via `pip install databricks-sdk`."
        )
    return WorkspaceClient()


def _create_directory_in_databricks_unity_catalog(w: "WorkspaceClient", databricks_unity_catalog_path: str):
    pattern = r"^/Volumes/(?P<catalog>[^/]+)/(?P<schema>[^/]+)/(?P<volume>[^/]+)(/[^/]+)+$"
    match = re.match(pattern, databricks_unity_catalog_path)
    if not match:
        raise ValueError(
            f"Databricks Unity Catalog path must be in the format '/Volumes/<catalog>/<schema>/<volume>/...', but "
            f"received: {databricks_unity_catalog_path}."
        )

    catalog = match.group("catalog")
    schema = match.group("schema")
    volume = match.group("volume")

    try:
        volume_path = f"{catalog}.{schema}.{volume}"
        w.volumes.read(volume_path)
    except Exception:
        raise ValueError(
            f"Databricks Unity Catalog volume does not exist: {volume_path}, please create it on the Databricks "
            "workspace."
        )

    try:
        w.files.get_directory_metadata(databricks_unity_catalog_path)
        logger.info(f"Directory {databricks_unity_catalog_path} already exists, skip creating it.")
    except Exception:
        # Create the directory if it doesn't exist, we don't raise an error because this is a common case.
        logger.info(f"Creating directory {databricks_unity_catalog_path} in Databricks Unity Catalog...")
        w.files.create_directory(databricks_unity_catalog_path)
        logger.info(f"Successfully created directory {databricks_unity_catalog_path} in Databricks Unity Catalog!")


def _save_data_to_local_file(train_data: list[dict[str, Any]], data_format: TrainDataFormat):
    import uuid

    file_name = f"finetuning_{uuid.uuid4()}.jsonl"

    finetune_dir = get_finetune_directory()
    file_path = os.path.join(finetune_dir, file_name)
    file_path = os.path.abspath(file_path)
    with open(file_path, "wb") as f:
        for item in train_data:
            if data_format == TrainDataFormat.CHAT:
                _validate_chat_data(item)
            elif data_format == TrainDataFormat.COMPLETION:
                _validate_completion_data(item)

            f.write(orjson.dumps(item) + b"\n")
    return file_path


def _validate_chat_data(data: dict[str, Any]):
    if "messages" not in data:
        raise ValueError(
            "Each finetuning data must be a dict with a 'messages' key when `task=CHAT_COMPLETION`, but "
            f"received: {data}"
        )

    if not isinstance(data["messages"], list):
        raise ValueError(
            "The value of the 'messages' key in each finetuning data must be a list of dicts with keys 'role' and "
            f"'content' when `task=CHAT_COMPLETION`, but received: {data['messages']}"
        )

    for message in data["messages"]:
        if "role" not in message:
            raise ValueError(f"Each message in the 'messages' list must contain a 'role' key, but received: {message}.")
        if "content" not in message:
            raise ValueError(
                f"Each message in the 'messages' list must contain a 'content' key, but received: {message}."
            )


def _validate_completion_data(data: dict[str, Any]):
    if "prompt" not in data:
        raise ValueError(
            "Each finetuning data must be a dict with a 'prompt' key when `task=INSTRUCTION_FINETUNE`, but "
            f"received: {data}"
        )
    if "response" not in data and "completion" not in data:
        raise ValueError(
            "Each finetuning data must be a dict with a 'response' or 'completion' key when "
            f"`task=INSTRUCTION_FINETUNE`, but received: {data}"
        )

```
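
A brief usage note, not part of the file above: a minimal, hedged sketch of calling the deployment helper directly, assuming a hypothetical Unity Catalog model name and that the optional Databricks dependencies are installed.

```python
# Hedged sketch with hypothetical names: deploy an already-registered finetuned
# model to a provisioned-throughput serving endpoint and wait until it is ready.
from dspy.clients.databricks import DatabricksProvider
from dspy.clients.utils_finetune import TrainDataFormat

DatabricksProvider.deploy_finetuned_model(
    model="main.default.my_finetuned_model",  # hypothetical <catalog>.<schema>.<model> name
    data_format=TrainDataFormat.CHAT,         # format used during finetuning
    deploy_timeout=900,                       # seconds to wait for the endpoint
)
```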

--------------------------------------------------------------------------------
/docs/mkdocs.yml:
--------------------------------------------------------------------------------

```yaml
site_name: DSPy
site_description: The framework for programming—rather than prompting—language models.
site_url: https://dspy.ai/

repo_url: https://github.com/stanfordnlp/dspy
repo_name: stanfordnlp/dspy

edit_uri: blob/main/docs/docs/
docs_dir: "docs/"

nav:
    - Get Started: index.md
    - Learn DSPy:
        - Learning DSPy: learn/index.md
        - DSPy Programming:
            - Programming Overview: learn/programming/overview.md
            - Language Models: learn/programming/language_models.md
            - Signatures: learn/programming/signatures.md
            - Modules: learn/programming/modules.md
            - Adapters: learn/programming/adapters.md
            - Tools: learn/programming/tools.md
            - MCP: learn/programming/mcp.md
        - DSPy Evaluation:
            - Evaluation Overview: learn/evaluation/overview.md
            - Data Handling: learn/evaluation/data.md
            - Metrics: learn/evaluation/metrics.md
        - DSPy Optimization:
            - Optimization Overview: learn/optimization/overview.md
            - Optimizers: learn/optimization/optimizers.md
    - Tutorials:
        - Tutorials Overview: tutorials/index.md
        - Build AI Programs with DSPy:
            - Overview: tutorials/build_ai_program/index.md
            - Managing Conversation History: tutorials/conversation_history/index.md
            - Building AI Agents with DSPy: tutorials/customer_service_agent/index.ipynb
            - Building AI Applications by Customizing DSPy Modules: tutorials/custom_module/index.ipynb
            - Retrieval-Augmented Generation (RAG): tutorials/rag/index.ipynb
            - Building RAG as Agent: tutorials/agents/index.ipynb
            - Entity Extraction: tutorials/entity_extraction/index.ipynb
            - Classification: tutorials/classification/index.md
            - Multi-Hop RAG: tutorials/multihop_search/index.ipynb
            - Privacy-Conscious Delegation: tutorials/papillon/index.md
            - Program Of Thought: tutorials/program_of_thought/index.ipynb
            - Image Generation Prompt iteration: tutorials/image_generation_prompting/index.ipynb
            - Audio: tutorials/audio/index.ipynb
        - Optimize AI Programs with DSPy:
            - Overview: tutorials/optimize_ai_program/index.md
            - Math Reasoning: tutorials/math/index.ipynb
            - Classification Finetuning: tutorials/classification_finetuning/index.ipynb
            - Advanced Tool Use: tutorials/tool_use/index.ipynb
            - Finetuning Agents: tutorials/games/index.ipynb
        - Reflective Prompt Evolution with dspy.GEPA:
            - Overview: tutorials/gepa_ai_program/index.md
            - GEPA for AIME (Math): tutorials/gepa_aime/index.ipynb
            - GEPA for Structured Information Extraction for Enterprise Tasks: tutorials/gepa_facilitysupportanalyzer/index.ipynb
            - GEPA for Privacy-Conscious Delegation: tutorials/gepa_papillon/index.ipynb
        - Experimental RL Optimization for DSPy:
            - Overview: tutorials/rl_ai_program/index.md
            - RL for Privacy-Conscious Delegation: tutorials/rl_papillon/index.ipynb
            - RL for Multi-Hop Research: tutorials/rl_multihop/index.ipynb
        - Tools, Development, and Deployment:
            - Overview: tutorials/core_development/index.md
            - Use MCP in DSPy: tutorials/mcp/index.md
            - Output Refinement: tutorials/output_refinement/best-of-n-and-refine.md
            - Saving and Loading: tutorials/saving/index.md
            - Cache: tutorials/cache/index.md
            - Deployment: tutorials/deployment/index.md
            - Debugging & Observability: tutorials/observability/index.md
            - Tracking DSPy Optimizers: tutorials/optimizer_tracking/index.md
            - Streaming: tutorials/streaming/index.md
            - Async: tutorials/async/index.md
        - Real-World Examples:
            - Overview: tutorials/real_world_examples/index.md
            - Generating llms.txt: tutorials/llms_txt_generation/index.md
            - Memory-Enabled ReAct Agents: tutorials/mem0_react_agent/index.md
            - Financial Analysis with Yahoo Finance: tutorials/yahoo_finance_react/index.md
            - Email Information Extraction: tutorials/email_extraction/index.md
            - Code Generation for Unfamiliar Libraries: tutorials/sample_code_generation/index.md
            - Building a Creative Text-Based AI Game: tutorials/ai_text_game/index.md
    - DSPy in Production: production/index.md
    - Community:
        - Community Resources: community/community-resources.md
        - Use Cases: community/use-cases.md
        - Contributing: community/how-to-contribute.md
    - FAQ:
        - FAQ: faqs.md
        - Cheatsheet: cheatsheet.md

    - API Reference:
        - API Reference: api/index.md
        - Adapters:
            - Adapter: api/adapters/Adapter.md
            - ChatAdapter: api/adapters/ChatAdapter.md
            - JSONAdapter: api/adapters/JSONAdapter.md
            - TwoStepAdapter: api/adapters/TwoStepAdapter.md
        - Evaluation:
            - CompleteAndGrounded: api/evaluation/CompleteAndGrounded.md
            - Evaluate: api/evaluation/Evaluate.md
            - EvaluationResult: api/evaluation/EvaluationResult.md
            - SemanticF1: api/evaluation/SemanticF1.md
            - answer_exact_match: api/evaluation/answer_exact_match.md
            - answer_passage_match: api/evaluation/answer_passage_match.md
        - Experimental:
            - Citations: api/experimental/Citations.md
            - Document: api/experimental/Document.md
        - Models:
            - Embedder: api/models/Embedder.md
            - LM: api/models/LM.md
        - Modules:
            - BestOfN: api/modules/BestOfN.md
            - ChainOfThought: api/modules/ChainOfThought.md
            - CodeAct: api/modules/CodeAct.md
            - Module: api/modules/Module.md
            - MultiChainComparison: api/modules/MultiChainComparison.md
            - Parallel: api/modules/Parallel.md
            - Predict: api/modules/Predict.md
            - ProgramOfThought: api/modules/ProgramOfThought.md
            - ReAct: api/modules/ReAct.md
            - Refine: api/modules/Refine.md
        - Optimizers:
            - GEPA:
                - 1. GEPA Overview: api/optimizers/GEPA/overview.md
                - 2. GEPA Advanced: api/optimizers/GEPA/GEPA_Advanced.md
            - BetterTogether: api/optimizers/BetterTogether.md
            - BootstrapFewShot: api/optimizers/BootstrapFewShot.md
            - BootstrapFewShotWithRandomSearch: api/optimizers/BootstrapFewShotWithRandomSearch.md
            - BootstrapFinetune: api/optimizers/BootstrapFinetune.md
            - BootstrapRS: api/optimizers/BootstrapRS.md
            - COPRO: api/optimizers/COPRO.md
            - Ensemble: api/optimizers/Ensemble.md
            - InferRules: api/optimizers/InferRules.md
            - KNN: api/optimizers/KNN.md
            - KNNFewShot: api/optimizers/KNNFewShot.md
            - LabeledFewShot: api/optimizers/LabeledFewShot.md
            - MIPROv2: api/optimizers/MIPROv2.md
            - SIMBA: api/optimizers/SIMBA.md
        - Primitives:
            - Audio: api/primitives/Audio.md
            - Code: api/primitives/Code.md
            - Example: api/primitives/Example.md
            - History: api/primitives/History.md
            - Image: api/primitives/Image.md
            - Prediction: api/primitives/Prediction.md
            - Tool: api/primitives/Tool.md
            - ToolCalls: api/primitives/ToolCalls.md
        - Signatures:
            - InputField: api/signatures/InputField.md
            - OutputField: api/signatures/OutputField.md
            - Signature: api/signatures/Signature.md
        - Tools:
            - ColBERTv2: api/tools/ColBERTv2.md
            - Embeddings: api/tools/Embeddings.md
            - PythonInterpreter: api/tools/PythonInterpreter.md
        - Utils:
            - StatusMessage: api/utils/StatusMessage.md
            - StatusMessageProvider: api/utils/StatusMessageProvider.md
            - StreamListener: api/utils/StreamListener.md
            - asyncify: api/utils/asyncify.md
            - configure_cache: api/utils/configure_cache.md
            - disable_litellm_logging: api/utils/disable_litellm_logging.md
            - disable_logging: api/utils/disable_logging.md
            - enable_litellm_logging: api/utils/enable_litellm_logging.md
            - enable_logging: api/utils/enable_logging.md
            - inspect_history: api/utils/inspect_history.md
            - load: api/utils/load.md
            - streamify: api/utils/streamify.md

theme:
    name: material
    custom_dir: overrides
    features:
        - navigation.tabs
        - navigation.path
        - navigation.indexes
        - navigation.expand
        - toc.follow
        - toc.integrate
        - navigation.top
        - search.suggest
        - search.highlight
        - content.tabs.link
        - content.code.annotate
        - content.code.copy
        - navigation.footer
        - content.action.edit
    language: en
    palette:
        - scheme: default
          toggle:
            icon: material/weather-night
            name: Switch to dark mode
          primary: white
          accent: black
        - scheme: slate
          toggle:
            icon: material/weather-sunny
            name: Switch to light mode
          primary: black
          accent: lime
    icon:
        repo: fontawesome/brands/git-alt
        edit: material/pencil
        view: material/eye
    logo: static/img/dspy_logo.png
    favicon: static/img/logo.png

extra_css:
    - stylesheets/extra.css

plugins:
    - social
    - search:
        lang: en
        separator: '[\s\-\.]+'
    - mkdocstrings:
        handlers:
            python:
                options:
                    docstring_style: google
                    show_source: true
                    show_root_heading: true
                    heading_level: 3
                    members_order: source
                    separate_signature: false
                    show_category_heading: true
                    show_symbol_type_heading: true
                    show_docstring_parameters: true
                    show_if_no_docstring: true
                    show_signature_annotations: true
                    unwrap_annotated: true
                    annotations_path: brief
                    docstring_section_style: table
                    merge_init_into_class: true
                    rendering:
                        show_if_no_docstring: true
                        show_warnings: false
                        html_meta: false
    - mkdocs-jupyter:
        ignore_h1_titles: true
    - redirects:
        redirect_maps:
            # Redirect /intro/ to the main page
            "intro/index.md": "index.md"
            "intro.md": "index.md"
            
            "deep-dive/optimizers/bootstrap-fewshot.md": "api/optimizers/BootstrapFewShot.md"
            "deep-dive/optimizers/bfrs.md": "api/optimizers/BootstrapFewShotWithRandomSearch.md"
            "deep-dive/optimizers/BootstrapFinetune.md": "api/optimizers/BootstrapFinetune.md"
            "deep-dive/optimizers/copro.md": "api/optimizers/COPRO.md"
            "deep-dive/optimizers/Ensemble.md": "api/optimizers/Ensemble.md"
            "deep-dive/optimizers/LabeledFewShot.md": "api/optimizers/LabeledFewShot.md"
            "deep-dive/optimizers/miprov2.md": "api/optimizers/MIPROv2.md"
            "api/optimizers/GEPA/index.md": "api/optimizers/GEPA/overview.md"

            "docs/quick-start/getting-started-01.md": "tutorials/rag/index.ipynb"
            "docs/quick-start/getting-started-02.md": "tutorials/rag/index.ipynb"
            "quick-start/getting-started-01.md": "tutorials/rag/index.ipynb"
            "quick-start/getting-started-02.md": "tutorials/rag/index.ipynb"
    - llmstxt:
        markdown_description: >
          DSPy is the framework for programming—rather than prompting—language models.
          DSPy unifies techniques for prompting, fine-tuning, reasoning, tool use, and evaluation of LMs.
          It provides a systematic approach to building AI applications through composable modules, 
          optimization techniques, and evaluation frameworks.
        sections:
          Getting Started:
            - index.md: DSPy overview and quick start guide
            - cheatsheet.md: DSPy cheatsheet for quick reference
          Core Concepts:
            - learn/programming/overview.md: Programming paradigm and philosophy
            - learn/programming/signatures.md: Signatures - declarative input/output specifications
            - learn/programming/modules.md: Modules - composable AI components
            - learn/programming/language_models.md: Language model interfaces and configuration
          Essential Tutorials:
            - tutorials/rag/index.ipynb: Retrieval-Augmented Generation (RAG) tutorial
            - tutorials/classification/index.md: Classification with DSPy
            - tutorials/agents/index.ipynb: Building AI agents with DSPy
          Optimization:
            - learn/optimization/overview.md: Optimization techniques overview
            - tutorials/optimize_ai_program/index.md: Guide to optimizing AI programs
            - api/optimizers/BootstrapFewShot.md: Bootstrap few-shot optimizer
          Key Modules API:
            - api/modules/Predict.md: Basic prediction module
            - api/modules/ChainOfThought.md: Chain of thought reasoning
            - api/modules/ReAct.md: ReAct agent module
          Core API Reference:
            - api/signatures/Signature.md: Signature system documentation
            - api/primitives/Example.md: Example primitive for training data
          Production:
            - tutorials/deployment/index.md: Production deployment guide
            - tutorials/observability/index.md: Debugging and observability

extra:
    social:
        - icon: fontawesome/brands/github
          link: https://github.com/stanfordnlp/dspy
        - icon: fontawesome/brands/discord
          link: https://discord.gg/XCGy2WDCQB

extra_javascript:
    - "js/runllm-widget.js"

markdown_extensions:
    - toc:
        permalink: true
        toc_depth: 3
    - pymdownx.tabbed:
        alternate_style: true
    - pymdownx.highlight:
        anchor_linenums: true
    - pymdownx.inlinehilite
    - pymdownx.snippets
    - admonition
    - pymdownx.arithmatex:
        generic: true
    - footnotes
    - pymdownx.details
    - pymdownx.superfences
    - pymdownx.mark
    - attr_list
    - md_in_html
    - pymdownx.emoji:
        emoji_index: !!python/name:material.extensions.emoji.twemoji
        emoji_generator: !!python/name:material.extensions.emoji.to_svg

copyright: |
    &copy; 2025 <a href="https://github.com/stanfordnlp"  target="_blank" rel="noopener">DSPy</a>
```

--------------------------------------------------------------------------------
/docs/docs/tutorials/mcp/index.md:
--------------------------------------------------------------------------------

```markdown
# Tutorial: Use MCP tools in DSPy

MCP, the Model Context Protocol, is an open protocol that standardizes how applications provide
context to LLMs. Although it adds some development overhead, MCP lets you share tools, resources,
and prompts with other developers regardless of the technical stack you are using. Likewise, you
can use tools built by other developers without rewriting any code.

In this guide, we will walk you through how to use MCP tools in DSPy. For demonstration purposes,
we will build an airline service agent that can help users book flights and modify or cancel
existing bookings. This will rely on an MCP server with custom tools, but it should be easy to generalize
to [MCP servers built by the community](https://modelcontextprotocol.io/examples).

??? "How to run this tutorial"
    This tutorial cannot be run in hosted IPython notebooks like Google Colab or Databricks notebooks.
    To run the code, follow this guide and run it on your local machine. The code has been tested
    on macOS and should work the same way in Linux environments.

## Install Dependencies

Before starting, let's install the required dependencies:

```shell
pip install -U "dspy[mcp]"
```

## MCP Server Setup

Let's first set up the MCP server for the airline agent, which contains:

- A set of databases
  - User database, storing user information.
  - Flight database, storing flight information.
  - Ticket database, storing customer tickets.
- A set of tools
  - fetch_flight_info: get flight information for specific dates.
  - fetch_itinerary: get information about booked itineraries.
  - book_itinerary: book a flight on behalf of the user.
  - modify_itinerary: modify an itinerary, either through flight changes or cancellation.
  - get_user_info: get user information.
  - file_ticket: file a backlog ticket for human assistance.

In your working directory, create a file `mcp_server.py`, and paste the following content into
it:

```python
import random
import string

from mcp.server.fastmcp import FastMCP
from pydantic import BaseModel

# Create an MCP server
mcp = FastMCP("Airline Agent")


class Date(BaseModel):
    # LLMs often struggle to produce a valid `datetime.datetime`, so we use a simple structured date instead.
    year: int
    month: int
    day: int
    hour: int


class UserProfile(BaseModel):
    user_id: str
    name: str
    email: str


class Flight(BaseModel):
    flight_id: str
    date_time: Date
    origin: str
    destination: str
    duration: float
    price: float


class Itinerary(BaseModel):
    confirmation_number: str
    user_profile: UserProfile
    flight: Flight


class Ticket(BaseModel):
    user_request: str
    user_profile: UserProfile


user_database = {
    "Adam": UserProfile(user_id="1", name="Adam", email="[email protected]"),
    "Bob": UserProfile(user_id="2", name="Bob", email="[email protected]"),
    "Chelsie": UserProfile(user_id="3", name="Chelsie", email="[email protected]"),
    "David": UserProfile(user_id="4", name="David", email="[email protected]"),
}

flight_database = {
    "DA123": Flight(
        flight_id="DA123",
        origin="SFO",
        destination="JFK",
        date_time=Date(year=2025, month=9, day=1, hour=1),
        duration=3,
        price=200,
    ),
    "DA125": Flight(
        flight_id="DA125",
        origin="SFO",
        destination="JFK",
        date_time=Date(year=2025, month=9, day=1, hour=7),
        duration=9,
        price=500,
    ),
    "DA456": Flight(
        flight_id="DA456",
        origin="SFO",
        destination="SNA",
        date_time=Date(year=2025, month=10, day=1, hour=1),
        duration=2,
        price=100,
    ),
    "DA460": Flight(
        flight_id="DA460",
        origin="SFO",
        destination="SNA",
        date_time=Date(year=2025, month=10, day=1, hour=9),
        duration=2,
        price=120,
    ),
}

itinery_database = {}
ticket_database = {}


@mcp.tool()
def fetch_flight_info(date: Date, origin: str, destination: str):
    """Fetch flight information from origin to destination on the given date"""
    flights = []

    for flight_id, flight in flight_database.items():
        if (
            flight.date_time.year == date.year
            and flight.date_time.month == date.month
            and flight.date_time.day == date.day
            and flight.origin == origin
            and flight.destination == destination
        ):
            flights.append(flight)
    return flights


@mcp.tool()
def fetch_itinerary(confirmation_number: str):
    """Fetch a booked itinerary information from database"""
    return itinery_database.get(confirmation_number)


@mcp.tool()
def pick_flight(flights: list[Flight]):
    """Pick up the best flight that matches users' request."""
    sorted_flights = sorted(
        flights,
        key=lambda x: (
            x.get("duration") if isinstance(x, dict) else x.duration,
            x.get("price") if isinstance(x, dict) else x.price,
        ),
    )
    return sorted_flights[0]


def generate_id(length=8):
    chars = string.ascii_lowercase + string.digits
    return "".join(random.choices(chars, k=length))


@mcp.tool()
def book_itinerary(flight: Flight, user_profile: UserProfile):
    """Book a flight on behalf of the user."""
    confirmation_number = generate_id()
    while confirmation_number in itinery_database:
        confirmation_number = generate_id()
    itinery_database[confirmation_number] = Itinerary(
        confirmation_number=confirmation_number,
        user_profile=user_profile,
        flight=flight,
    )
    return confirmation_number, itinery_database[confirmation_number]


@mcp.tool()
def cancel_itinerary(confirmation_number: str, user_profile: UserProfile):
    """Cancel an itinerary on behalf of the user."""
    if confirmation_number in itinery_database:
        del itinery_database[confirmation_number]
        return
    raise ValueError("Cannot find the itinerary, please check your confirmation number.")


@mcp.tool()
def get_user_info(name: str):
    """Fetch the user profile from database with given name."""
    return user_database.get(name)


@mcp.tool()
def file_ticket(user_request: str, user_profile: UserProfile):
    """File a customer support ticket if this is something the agent cannot handle."""
    ticket_id = generate_id(length=6)
    ticket_database[ticket_id] = Ticket(
        user_request=user_request,
        user_profile=user_profile,
    )
    return ticket_id


if __name__ == "__main__":
    mcp.run()
```

Before we start the server, let's take a look at the code.

We first create a `FastMCP` instance, which is a utility that helps quickly build an MCP server:

```python
mcp = FastMCP("Airline Agent")
```

Then we define our data structures, which in a real-world application would mirror the database schema, e.g.:

```python
class Flight(BaseModel):
    flight_id: str
    date_time: Date
    origin: str
    destination: str
    duration: float
    price: float
```

Following that, we initialize our database instances. In a real-world application, these would be connectors to
actual databases, but for simplicity, we just use dictionaries:

```python
user_database = {
    "Adam": UserProfile(user_id="1", name="Adam", email="[email protected]"),
    "Bob": UserProfile(user_id="2", name="Bob", email="[email protected]"),
    "Chelsie": UserProfile(user_id="3", name="Chelsie", email="[email protected]"),
    "David": UserProfile(user_id="4", name="David", email="[email protected]"),
}
```

The next step is to define the tools and mark them with `@mcp.tool()` so that they are discoverable by
MCP clients as MCP tools:

```python
@mcp.tool()
def fetch_flight_info(date: Date, origin: str, destination: str):
    """Fetch flight information from origin to destination on the given date"""
    flights = []

    for flight_id, flight in flight_database.items():
        if (
            flight.date_time.year == date.year
            and flight.date_time.month == date.month
            and flight.date_time.day == date.day
            and flight.origin == origin
            and flight.destination == destination
        ):
            flights.append(flight)
    return flights
```

The last step is spinning up the server:

```python
if __name__ == "__main__":
    mcp.run()
```

Now we have finished writing the server! Let's launch it:

```shell
python path_to_your_working_directory/mcp_server.py
```

## Write a DSPy Program That Utilizes Tools in MCP Server

Now that the server is running, let's build the actual airline service agent that uses the MCP
tools in our server to assist users. In your working directory, create a file named
`dspy_mcp_agent.py`, and add the code from the following steps to it.

### Gather Tools from MCP Servers

We first need to gather all available tools from the MCP server and make them
usable by DSPy. DSPy provides an API [`dspy.Tool`](https://dspy.ai/api/primitives/Tool/)
as the standard tool interface. Let's convert all the MCP tools to `dspy.Tool`.

We need to create an MCP client instance to communicate with the MCP server, fetch all available
tools, and convert them to `dspy.Tool` using the static method `from_mcp_tool`:

```python
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

# Create server parameters for stdio connection
server_params = StdioServerParameters(
    command="python",  # Executable
    args=["path_to_your_working_directory/mcp_server.py"],
    env=None,
)

async def run():
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize the connection
            await session.initialize()
            # List available tools
            tools = await session.list_tools()

            # Convert MCP tools to DSPy tools
            dspy_tools = []
            for tool in tools.tools:
                dspy_tools.append(dspy.Tool.from_mcp_tool(session, tool))

            print(len(dspy_tools))
            print(dspy_tools[0].args)

if __name__ == "__main__":
    import asyncio

    asyncio.run(run())
```

With the code above, we have successfully collected all available MCP tools and converted
them to DSPy tools.


### Build a DSPy Agent to Handle Customer Requests

Now we will use `dspy.ReAct` to build the agent that handles customer requests. `ReAct` stands
for "reasoning and acting": at each step, the LLM decides whether to call a tool or to wrap up the process.
If a tool is required, the LLM also decides which tool to call and what arguments to pass.

As usual, we need to create a `dspy.Signature` to define the input and output of our agent:

```python
import dspy

class DSPyAirlineCustomerService(dspy.Signature):
    """You are an airline customer service agent. You are given a list of tools to handle user requests. You should decide the right tool to use in order to fulfill users' requests."""

    user_request: str = dspy.InputField()
    process_result: str = dspy.OutputField(
        desc=(
            "Message that summarizes the process result, and the information users need, "
            "e.g., the confirmation_number if it's a flight booking request."
        )
    )
```

And choose an LM for our agent:

```python
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))
```

Then we create the ReAct agent by passing the tools and signature into the `dspy.ReAct` API. We can now
put together the complete code script:

```python
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

import dspy

# Create server parameters for stdio connection
server_params = StdioServerParameters(
    command="python",  # Executable
    args=["script_tmp/mcp_server.py"],  # Optional command line arguments
    env=None,  # Optional environment variables
)


class DSPyAirlineCustomerService(dspy.Signature):
    """You are an airline customer service agent. You are given a list of tools to handle user requests.
    You should decide the right tool to use in order to fulfill users' requests."""

    user_request: str = dspy.InputField()
    process_result: str = dspy.OutputField(
        desc=(
            "Message that summarizes the process result, and the information users need, "
            "e.g., the confirmation_number if it's a flight booking request."
        )
    )


dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))


async def run(user_request):
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            # Initialize the connection
            await session.initialize()
            # List available tools
            tools = await session.list_tools()

            # Convert MCP tools to DSPy tools
            dspy_tools = []
            for tool in tools.tools:
                dspy_tools.append(dspy.Tool.from_mcp_tool(session, tool))

            # Create the agent
            react = dspy.ReAct(DSPyAirlineCustomerService, tools=dspy_tools)

            result = await react.acall(user_request=user_request)
            print(result)


if __name__ == "__main__":
    import asyncio

    asyncio.run(run("please help me book a flight from SFO to JFK on 09/01/2025, my name is Adam"))
```

Note that we must call `react.acall` because MCP tools are async by default. Let's execute the script:

```shell
python path_to_your_working_directory/dspy_mcp_agent.py
```

You should see output similar to this:

```
Prediction(
    trajectory={'thought_0': 'I need to fetch flight information for Adam from SFO to JFK on 09/01/2025 to find available flights for booking.', 'tool_name_0': 'fetch_flight_info', 'tool_args_0': {'date': {'year': 2025, 'month': 9, 'day': 1, 'hour': 0}, 'origin': 'SFO', 'destination': 'JFK'}, 'observation_0': ['{"flight_id": "DA123", "date_time": {"year": 2025, "month": 9, "day": 1, "hour": 1}, "origin": "SFO", "destination": "JFK", "duration": 3.0, "price": 200.0}', '{"flight_id": "DA125", "date_time": {"year": 2025, "month": 9, "day": 1, "hour": 7}, "origin": "SFO", "destination": "JFK", "duration": 9.0, "price": 500.0}'], ..., 'tool_name_4': 'finish', 'tool_args_4': {}, 'observation_4': 'Completed.'},
    reasoning="I successfully booked a flight for Adam from SFO to JFK on 09/01/2025. I found two available flights, selected the more economical option (flight DA123 at 1 AM for $200), retrieved Adam's user profile, and completed the booking process. The confirmation number for the flight is 8h7clk3q.",
    process_result='Your flight from SFO to JFK on 09/01/2025 has been successfully booked. Your confirmation number is 8h7clk3q.'
)
```

The `trajectory` field contains the entire thinking and acting process. If you're curious about what's happening
under the hood, check out the [Observability Guide](https://dspy.ai/tutorials/observability/) to set up MLflow,
which visualizes every step happening inside `dspy.ReAct`!
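
If you'd like to inspect the trajectory programmatically rather than reading the raw dict, here is a minimal sketch. It assumes `result` is the `Prediction` returned by `react.acall` above and relies on the numbered keys (`thought_i`, `tool_name_i`, `tool_args_i`, `observation_i`) visible in the output; the `print_trajectory` helper is just for illustration:

```python
def print_trajectory(trajectory: dict) -> None:
    """Pretty-print a dspy.ReAct trajectory dict step by step."""
    step = 0
    while f"thought_{step}" in trajectory:
        print(f"--- Step {step} ---")
        print("thought:    ", trajectory[f"thought_{step}"])
        print("tool:       ", trajectory.get(f"tool_name_{step}"))
        print("tool args:  ", trajectory.get(f"tool_args_{step}"))
        print("observation:", trajectory.get(f"observation_{step}"))
        step += 1

# Inside `run`, after the agent call:
# print_trajectory(result.trajectory)
```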


## Conclusion

In this guide, we built an airline service agent that utilizes a custom MCP server and the `dspy.ReAct` module. In the context
of MCP support, DSPy provides a simple interface for interacting with MCP tools, giving you the flexibility to implement
any functionality you need.

```

--------------------------------------------------------------------------------
/docs/docs/learn/optimization/optimizers.md:
--------------------------------------------------------------------------------

```markdown
---
sidebar_position: 1
---

# DSPy Optimizers (formerly Teleprompters)


A **DSPy optimizer** is an algorithm that can tune the parameters of a DSPy program (i.e., the prompts and/or the LM weights) to maximize the metrics you specify, like accuracy.


A typical DSPy optimizer takes three things:

- Your **DSPy program**. This may be a single module (e.g., `dspy.Predict`) or a complex multi-module program.

- Your **metric**. This is a function that evaluates the output of your program, and assigns it a score (higher is better).

- A few **training inputs**. This may be very small (i.e., only 5 or 10 examples) and incomplete (only inputs to your program, without any labels).

If you happen to have a lot of data, DSPy can leverage that. But you can start small and get strong results.
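
To make those three ingredients concrete, here is a minimal, hedged sketch (the metric, the toy examples, and the variable names are placeholders for illustration, not part of the library):

```python
import dspy

# 1) A DSPy program: here, a single module with a simple signature.
program = dspy.ChainOfThought("question -> answer")

# 2) A metric: a function that scores a prediction against an example (higher is better).
def my_metric(example, prediction, trace=None) -> bool:
    return example.answer.lower() in prediction.answer.lower()

# 3) A handful of training inputs (labels are optional for some optimizers).
trainset = [
    dspy.Example(question="What is 2 + 2?", answer="4").with_inputs("question"),
    dspy.Example(question="What is the capital of France?", answer="Paris").with_inputs("question"),
]
```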

**Note:** Formerly called teleprompters. We are making an official name update, which will be reflected throughout the library and documentation.


## What does a DSPy Optimizer tune? How does it tune them?

Different optimizers in DSPy will tune your program's quality by **synthesizing good few-shot examples** for every module, like `dspy.BootstrapRS`,<sup>[1](https://arxiv.org/abs/2310.03714)</sup> **proposing and intelligently exploring better natural-language instructions** for every prompt, like `dspy.MIPROv2`,<sup>[2](https://arxiv.org/abs/2406.11695)</sup> and `dspy.GEPA`,<sup>[3](https://arxiv.org/abs/2507.19457)</sup> and **building datasets for your modules and using them to finetune the LM weights** in your system, like `dspy.BootstrapFinetune`.<sup>[4](https://arxiv.org/abs/2407.10930)</sup>

??? "What's an example of a DSPy optimizer? How do different optimizers work?"

    Take the `dspy.MIPROv2` optimizer as an example. First, MIPRO starts with the **bootstrapping stage**. It takes your program, which may be unoptimized at this point, and runs it many times across different inputs to collect traces of input/output behavior for each one of your modules. It filters these traces to keep only those that appear in trajectories scored highly by your metric. Second, MIPRO enters its **grounded proposal stage**. It previews your DSPy program's code, your data, and traces from running your program, and uses them to draft many potential instructions for every prompt in your program. Third, MIPRO launches the **discrete search stage**. It samples mini-batches from your training set, proposes a combination of instructions and traces to use for constructing every prompt in the pipeline, and evaluates the candidate program on the mini-batch. Using the resulting score, MIPRO updates a surrogate model that helps the proposals get better over time.

    One thing that makes DSPy optimizers so powerful is that they can be composed. You can run `dspy.MIPROv2` and use the produced program as an input to `dspy.MIPROv2` again or, say, to `dspy.BootstrapFinetune` to get better results. This is partly the essence of `dspy.BetterTogether`. Alternatively, you can run the optimizer and then extract the top-5 candidate programs and build a `dspy.Ensemble` of them. This allows you to scale _inference-time compute_ (e.g., ensembles) as well as DSPy's unique _pre-inference time compute_ (i.e., optimization budget) in highly systematic ways.
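
    As a hedged sketch of that composition pattern (assuming a `program`, a `my_metric` function, and a `trainset` like those sketched above; the names and the two-stage flow here are illustrative, not a prescribed recipe):

    ```python
    # Stage 1: optimize the prompts of the program.
    prompt_optimizer = dspy.MIPROv2(metric=my_metric, auto="light")
    prompt_optimized = prompt_optimizer.compile(program, trainset=trainset)

    # Stage 2: feed the prompt-optimized program into a weight optimizer.
    weight_optimizer = dspy.BootstrapFinetune(metric=my_metric)
    fully_optimized = weight_optimizer.compile(prompt_optimized, trainset=trainset)
    ```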



## What DSPy Optimizers are currently available?

Optimizers can be accessed via `from dspy.teleprompt import *`.

### Automatic Few-Shot Learning

These optimizers extend the signature by automatically generating and including **optimized** examples within the prompt sent to the model, implementing few-shot learning.

1. [**`LabeledFewShot`**](../../api/optimizers/LabeledFewShot.md): Simply constructs few-shot examples (demos) from provided labeled input and output data points.  Requires `k` (number of examples for the prompt) and `trainset` to randomly select `k` examples from.

2. [**`BootstrapFewShot`**](../../api/optimizers/BootstrapFewShot.md): Uses a `teacher` module (which defaults to your program) to generate complete demonstrations for every stage of your program, along with labeled examples in `trainset`. Parameters include `max_labeled_demos` (the number of demonstrations randomly selected from the `trainset`) and `max_bootstrapped_demos` (the number of additional examples generated by the `teacher`). The bootstrapping process employs the metric to validate demonstrations, including only those that pass the metric in the "compiled" prompt. Advanced: Supports using a `teacher` program that is a *different* DSPy program that has compatible structure, for harder tasks.

3. [**`BootstrapFewShotWithRandomSearch`**](../../api/optimizers/BootstrapFewShotWithRandomSearch.md): Applies `BootstrapFewShot` several times with random search over the generated demonstrations, and selects the best program found during optimization. Parameters mirror those of `BootstrapFewShot`, with the addition of `num_candidate_programs`, which specifies the number of random programs evaluated during optimization. The evaluated candidates include the uncompiled program, a `LabeledFewShot`-optimized program, a `BootstrapFewShot`-compiled program with unshuffled examples, and `num_candidate_programs` `BootstrapFewShot`-compiled programs with randomized example sets.

4. [**`KNNFewShot`**](../../api/optimizers/KNNFewShot.md): Uses the k-Nearest Neighbors algorithm to find the training examples closest to a given input example. These nearest-neighbor demonstrations are then used as the trainset for the `BootstrapFewShot` optimization process. A usage sketch of this few-shot compilation flow appears right after this list.
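
As promised above, here is a minimal, hedged sketch of the few-shot compilation flow, using the `BootstrapFewShot` parameters described in item 2 (the program, metric, and dataset names are the placeholders from the sketch earlier on this page):

```python
from dspy.teleprompt import BootstrapFewShot

# Bootstrap up to 4 self-generated demos per prompt and include up to 4 labeled demos.
fewshot_optimizer = BootstrapFewShot(
    metric=my_metric,
    max_bootstrapped_demos=4,
    max_labeled_demos=4,
)
compiled_program = fewshot_optimizer.compile(program, trainset=trainset)
```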


### Automatic Instruction Optimization

These optimizers produce optimized instructions for the prompt and, in the case of MIPROv2, can also optimize the set of few-shot demonstrations.

5. [**`COPRO`**](../../api/optimizers/COPRO.md): Generates and refines new instructions for each step, and optimizes them with coordinate ascent (hill-climbing using the metric function and the `trainset`). Parameters include `depth` which is the number of iterations of prompt improvement the optimizer runs over.

6. [**`MIPROv2`**](../../api/optimizers/MIPROv2.md): Generates instructions *and* few-shot examples in each step. The instruction generation is data-aware and demonstration-aware. Uses Bayesian Optimization to effectively search over the space of generation instructions/demonstrations across your modules.

7. [**`SIMBA`**](../../api/optimizers/SIMBA.md)

8. [**`GEPA`**](../../api/optimizers/GEPA/overview.md): Uses LMs to reflect on the DSPy program's trajectory, identify what worked and what didn't, and propose prompts that address the gaps. Additionally, GEPA can leverage domain-specific textual feedback to rapidly improve the DSPy program. Detailed tutorials on using GEPA are available at [dspy.GEPA Tutorials](../../tutorials/gepa_ai_program/index.md).

### Automatic Finetuning

This optimizer is used to fine-tune the underlying LLM(s).

9. [**`BootstrapFinetune`**](../../api/optimizers/BootstrapFinetune.md): Distills a prompt-based DSPy program into weight updates. The output is a DSPy program that has the same steps, but where each step is conducted by a finetuned model instead of a prompted LM. [See the classification fine-tuning tutorial](https://dspy.ai/tutorials/classification_finetuning/) for a complete example.


### Program Transformations

10. [**`Ensemble`**](../../api/optimizers/Ensemble.md): Ensembles a set of DSPy programs and either uses the full set or randomly samples a subset into a single program.


## Which optimizer should I use?

Ultimately, finding the right optimizer and the best configuration for your task requires experimentation. Success in DSPy is still an iterative process: getting the best performance on your task requires you to explore and iterate.

That being said, here's the general guidance on getting started:

- If you have **very few examples** (around 10), start with `BootstrapFewShot`.
- If you have **more data** (50 examples or more), try  `BootstrapFewShotWithRandomSearch`.
- If you prefer to do **instruction optimization only** (i.e. you want to keep your prompt 0-shot), use `MIPROv2` [configured for 0-shot optimization](../../api/optimizers/MIPROv2.md). 
- If you’re willing to use more inference calls to perform **longer optimization runs** (e.g. 40 trials or more), and have enough data (e.g. 200 examples or more to prevent overfitting) then try `MIPROv2`. 
- If you have been able to use one of these with a large LM (e.g., 7B parameters or above) and need a very **efficient program**, finetune a small LM for your task with `BootstrapFinetune`.

## How do I use an optimizer?

They all share this general interface, with some differences in the keyword arguments (hyperparameters). A full list can be found in the [API reference](../../api/optimizers/BetterTogether.md).

Let's see this with the most common one, `BootstrapFewShotWithRandomSearch`.

```python
from dspy.teleprompt import BootstrapFewShotWithRandomSearch

# Set up the optimizer: we want to "bootstrap" (i.e., self-generate) 8-shot examples of your program's steps.
# The optimizer will repeat this 10 times (plus some initial attempts) before selecting its best attempt on the devset.
config = dict(max_bootstrapped_demos=4, max_labeled_demos=4, num_candidate_programs=10, num_threads=4)

teleprompter = BootstrapFewShotWithRandomSearch(metric=YOUR_METRIC_HERE, **config)
optimized_program = teleprompter.compile(YOUR_PROGRAM_HERE, trainset=YOUR_TRAINSET_HERE)
```


!!! info "Getting Started III: Optimizing the LM prompts or weights in DSPy programs"
    A typical simple optimization run costs on the order of $2 USD and takes around ten minutes, but be careful when running optimizers with very large LMs or very large datasets.
    Optimizer runs can cost as little as a few cents or up to tens of dollars, depending on your LM, dataset, and configuration.
    
    === "Optimizing prompts for a ReAct agent"
        This is a minimal but fully runnable example of setting up a `dspy.ReAct` agent that answers questions via
        search from Wikipedia and then optimizing it using `dspy.MIPROv2` in the cheap `light` mode on 500
        question-answer pairs sampled from the `HotPotQA` dataset.

        ```python linenums="1"
        import dspy
        from dspy.datasets import HotPotQA

        dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))

        def search(query: str) -> list[str]:
            """Retrieves abstracts from Wikipedia."""
            results = dspy.ColBERTv2(url='http://20.102.90.50:2017/wiki17_abstracts')(query, k=3)
            return [x['text'] for x in results]

        trainset = [x.with_inputs('question') for x in HotPotQA(train_seed=2024, train_size=500).train]
        react = dspy.ReAct("question -> answer", tools=[search])

        tp = dspy.MIPROv2(metric=dspy.evaluate.answer_exact_match, auto="light", num_threads=24)
        optimized_react = tp.compile(react, trainset=trainset)
        ```

        An informal run similar to this on DSPy 2.5.29 raises ReAct's score from 24% to 51%.

    === "Optimizing prompts for RAG"
        Given a retrieval index to `search`, your favorite `dspy.LM`, and a small `trainset` of questions and ground-truth responses, the following code snippet can optimize your RAG system with long outputs against the built-in `dspy.SemanticF1` metric, which is implemented as a DSPy module.

        ```python linenums="1"
        class RAG(dspy.Module):
            def __init__(self, num_docs=5):
                self.num_docs = num_docs
                self.respond = dspy.ChainOfThought('context, question -> response')

            def forward(self, question):
                context = search(question, k=self.num_docs)   # not defined in this snippet, see link above
                return self.respond(context=context, question=question)

        tp = dspy.MIPROv2(metric=dspy.SemanticF1(), auto="medium", num_threads=24)
        optimized_rag = tp.compile(RAG(), trainset=trainset, max_bootstrapped_demos=2, max_labeled_demos=2)
        ```

        For a complete RAG example that you can run, start this [tutorial](../../tutorials/rag/index.ipynb). It improves the quality of a RAG system over a subset of StackExchange communities from 53% to 61%.

    === "Optimizing weights for Classification"
        <details><summary>Click to show dataset setup code.</summary>

        ```python linenums="1"
        import random
        from typing import Literal

        from datasets import load_dataset

        import dspy
        from dspy.datasets import DataLoader

        # Load the Banking77 dataset.
        CLASSES = load_dataset("PolyAI/banking77", split="train", trust_remote_code=True).features["label"].names
        kwargs = {"fields": ("text", "label"), "input_keys": ("text",), "split": "train", "trust_remote_code": True}

        # Load the first 2000 examples from the dataset, and assign a hint to each *training* example.
        trainset = [
            dspy.Example(x, hint=CLASSES[x.label], label=CLASSES[x.label]).with_inputs("text", "hint")
            for x in DataLoader().from_huggingface(dataset_name="PolyAI/banking77", **kwargs)[:2000]
        ]
        random.Random(0).shuffle(trainset)
        ```
        </details>

        ```python linenums="1"
        import dspy
        lm=dspy.LM('openai/gpt-4o-mini-2024-07-18')

        # Define the DSPy module for classification. It will use the hint at training time, if available.
        signature = dspy.Signature("text, hint -> label").with_updated_fields('label', type_=Literal[tuple(CLASSES)])
        classify = dspy.ChainOfThought(signature)
        classify.set_lm(lm)

        # Optimize via BootstrapFinetune.
        optimizer = dspy.BootstrapFinetune(metric=(lambda x, y, trace=None: x.label == y.label), num_threads=24)
        optimized = optimizer.compile(classify, trainset=trainset)

        optimized(text="What does a pending cash withdrawal mean?")
        
        # For a complete fine-tuning tutorial, see: https://dspy.ai/tutorials/classification_finetuning/
        ```

        **Possible Output (from the last line):**
        ```text
        Prediction(
            reasoning='A pending cash withdrawal indicates that a request to withdraw cash has been initiated but has not yet been completed or processed. This status means that the transaction is still in progress and the funds have not yet been deducted from the account or made available to the user.',
            label='pending_cash_withdrawal'
        )
        ```

        An informal run similar to this on DSPy 2.5.29 raises GPT-4o-mini's score from 66% to 87%.


## Saving and loading optimizer output

After running a program through an optimizer, it's useful to save it so that it can be loaded from a file later and used for inference. For this, use the `save` and `load` methods.

```python
optimized_program.save(YOUR_SAVE_PATH)
```

The resulting file is in plain-text JSON format. It contains all the parameters and steps in the source program. You can always read it and see what the optimizer generated.


To load a program from a file, you can instantiate an object from that class and then call the load method on it.

```python
loaded_program = YOUR_PROGRAM_CLASS()
loaded_program.load(path=YOUR_SAVE_PATH)
```
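
Recent DSPy versions can also save the whole program (architecture plus state), not just its parameters, so you don't need to re-instantiate the original class before loading. A hedged sketch, assuming your installed version supports the `save_program=True` flag and the `dspy.load` utility:

```python
# Save the full program to a directory.
optimized_program.save(YOUR_SAVE_DIR, save_program=True)

# Load it back directly, without constructing YOUR_PROGRAM_CLASS yourself.
loaded_program = dspy.load(YOUR_SAVE_DIR)
```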


```

--------------------------------------------------------------------------------
/dspy/evaluate/evaluate.py:
--------------------------------------------------------------------------------

```python
import csv
import importlib
import json
import logging
import types
from typing import TYPE_CHECKING, Any, Callable

if TYPE_CHECKING:
    import pandas as pd

import tqdm

import dspy
from dspy.primitives.prediction import Prediction
from dspy.utils.callback import with_callbacks
from dspy.utils.parallelizer import ParallelExecutor

try:
    from IPython.display import HTML
    from IPython.display import display as display

except ImportError:

    def display(obj: Any):
        """
        Display the specified Python object in the console.

        :param obj: The Python object to display.
        """
        print(obj)

    def HTML(x: str) -> str:  # noqa: N802
        """
        Obtain the HTML representation of the specified string.
        """
        # NB: This method exists purely for code compatibility with the IPython HTML() function in
        # environments where IPython is not available. In such environments where IPython is not
        # available, this method will simply return the input string.
        return x


# TODO: Counting failures and having a max_failure count. When that is exceeded (also just at the end),
# we print the number of failures, the first N examples that failed, and the first N exceptions raised.

logger = logging.getLogger(__name__)


class EvaluationResult(Prediction):
    """
    A class that represents the result of an evaluation.
    It is a subclass of `dspy.Prediction` that contains the following fields:

    - score: A float value (e.g., 67.30) representing the overall performance
    - results: A list of (example, prediction, score) tuples for each example in devset
    """

    def __init__(self, score: float, results: list[tuple["dspy.Example", "dspy.Example", Any]]):
        super().__init__(score=score, results=results)

    def __repr__(self):
        return f"EvaluationResult(score={self.score}, results=<list of {len(self.results)} results>)"


class Evaluate:
    """DSPy Evaluate class.

    This class is used to evaluate the performance of a DSPy program. Users need to provide an evaluation dataset and
    a metric function in order to use this class. This class supports parallel evaluation on the provided dataset.
    """

    def __init__(
        self,
        *,
        devset: list["dspy.Example"],
        metric: Callable | None = None,
        num_threads: int | None = None,
        display_progress: bool = False,
        display_table: bool | int = False,
        max_errors: int | None = None,
        provide_traceback: bool | None = None,
        failure_score: float = 0.0,
        save_as_csv: str | None = None,
        save_as_json: str | None = None,
        **kwargs,
    ):
        """
        Args:
            devset (list[dspy.Example]): the evaluation dataset.
            metric (Callable): The metric function to use for evaluation.
            num_threads (Optional[int]): The number of threads to use for parallel evaluation.
            display_progress (bool): Whether to display progress during evaluation.
            display_table (Union[bool, int]): Whether to display the evaluation results in a table.
                If a number is passed, the evaluation results will be truncated to that number before being displayed.
            max_errors (Optional[int]): The maximum number of errors to allow before
                stopping evaluation. If ``None``, inherits from ``dspy.settings.max_errors``.
            provide_traceback (Optional[bool]): Whether to provide traceback information during evaluation.
            failure_score (float): The default score to use if evaluation fails due to an exception.
            save_as_csv (Optional[str]): The file name where the csv will be saved.
            save_as_json (Optional[str]): The file name where the json will be saved.

        """
        self.devset = devset
        self.metric = metric
        self.num_threads = num_threads
        self.display_progress = display_progress
        self.display_table = display_table
        self.max_errors = max_errors
        self.provide_traceback = provide_traceback
        self.failure_score = failure_score
        self.save_as_csv = save_as_csv
        self.save_as_json = save_as_json

        if "return_outputs" in kwargs:
            raise ValueError("`return_outputs` is no longer supported. Results are always returned inside the `results` field of the `EvaluationResult` object.")

    @with_callbacks
    def __call__(
        self,
        program: "dspy.Module",
        metric: Callable | None = None,
        devset: list["dspy.Example"] | None = None,
        num_threads: int | None = None,
        display_progress: bool | None = None,
        display_table: bool | int | None = None,
        callback_metadata: dict[str, Any] | None = None,
        save_as_csv: str | None = None,
        save_as_json: str | None = None,
    ) -> EvaluationResult:
        """
        Args:
            program (dspy.Module): The DSPy program to evaluate.
            metric (Callable): The metric function to use for evaluation. If not provided, use `self.metric`.
            devset (list[dspy.Example]): The evaluation dataset. If not provided, use `self.devset`.
            num_threads (Optional[int]): The number of threads to use for parallel evaluation. If not provided, use
                `self.num_threads`.
            display_progress (bool): Whether to display progress during evaluation. If not provided, use
                `self.display_progress`.
            display_table (Union[bool, int]): Whether to display the evaluation results in a table. If not provided, use
                `self.display_table`. If a number is passed, the evaluation results will be truncated to that number before being displayed.
            callback_metadata (dict): Metadata to be used for evaluate callback handlers.

        Returns:
            The evaluation results are returned as a dspy.EvaluationResult object containing the following attributes:

            - score: A float percentage score (e.g., 67.30) representing overall performance

            - results: a list of (example, prediction, score) tuples for each example in devset
        """
        metric = metric if metric is not None else self.metric
        devset = devset if devset is not None else self.devset
        num_threads = num_threads if num_threads is not None else self.num_threads
        display_progress = display_progress if display_progress is not None else self.display_progress
        display_table = display_table if display_table is not None else self.display_table
        save_as_csv = save_as_csv if save_as_csv is not None else self.save_as_csv
        save_as_json = save_as_json if save_as_json is not None else self.save_as_json

        if callback_metadata:
            logger.debug(f"Evaluate is called with callback metadata: {callback_metadata}")

        tqdm.tqdm._instances.clear()

        executor = ParallelExecutor(
            num_threads=num_threads,
            disable_progress_bar=not display_progress,
            max_errors=(self.max_errors if self.max_errors is not None else dspy.settings.max_errors),
            provide_traceback=self.provide_traceback,
            compare_results=True,
        )

        def process_item(example):
            prediction = program(**example.inputs())
            score = metric(example, prediction)
            return prediction, score

        results = executor.execute(process_item, devset)
        assert len(devset) == len(results)

        results = [((dspy.Prediction(), self.failure_score) if r is None else r) for r in results]
        results = [(example, prediction, score) for example, (prediction, score) in zip(devset, results, strict=False)]
        ncorrect, ntotal = sum(score for *_, score in results), len(devset)

        logger.info(f"Average Metric: {ncorrect} / {ntotal} ({round(100 * ncorrect / ntotal, 1)}%)")

        if display_table:
            if importlib.util.find_spec("pandas") is not None:
                # Rename the 'correct' column to the name of the metric object
                metric_name = metric.__name__ if isinstance(metric, types.FunctionType) else metric.__class__.__name__
                # Construct a pandas DataFrame from the results
                result_df = self._construct_result_table(results, metric_name)

                self._display_result_table(result_df, display_table, metric_name)
            else:
                logger.warning("Skipping table display since `pandas` is not installed.")

        if save_as_csv:
            metric_name = (
                metric.__name__
                if isinstance(metric, types.FunctionType)
                else metric.__class__.__name__
            )
            data = self._prepare_results_output(results, metric_name)

            with open(save_as_csv, "w", newline="") as csvfile:
                fieldnames = data[0].keys()
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)

                writer.writeheader()
                for row in data:
                    writer.writerow(row)
        if save_as_json:
            metric_name = (
                metric.__name__
                if isinstance(metric, types.FunctionType)
                else metric.__class__.__name__
            )
            data = self._prepare_results_output(results, metric_name)
            with open(
                    save_as_json,
                    "w",
            ) as f:
                json.dump(data, f)

        return EvaluationResult(
            score=round(100 * ncorrect / ntotal, 2),
            results=results,
        )

    @staticmethod
    def _prepare_results_output(
            results: list[tuple["dspy.Example", "dspy.Example", Any]], metric_name: str
    ):
        return [
            (
                merge_dicts(example, prediction) | {metric_name: score}
                if prediction_is_dictlike(prediction)
                else dict(example) | {"prediction": prediction, metric_name: score}
            )
            for example, prediction, score in results
        ]

    def _construct_result_table(
        self, results: list[tuple["dspy.Example", "dspy.Example", Any]], metric_name: str
    ) -> "pd.DataFrame":
        """
        Construct a pandas DataFrame from the specified result list.
        Let's not try to change the name of this method as it may be patched by external tracing tools.

        Args:
            results: The list of results to construct the result DataFrame from.
            metric_name: The name of the metric used for evaluation.

        Returns:
            The constructed pandas DataFrame.
        """
        import pandas as pd

        data = self._prepare_results_output(results, metric_name)

        # Truncate every cell in the DataFrame (DataFrame.applymap was renamed to DataFrame.map in Pandas 2.1.0)
        result_df = pd.DataFrame(data)
        result_df = result_df.map(truncate_cell) if hasattr(result_df, "map") else result_df.applymap(truncate_cell)

        return result_df.rename(columns={"correct": metric_name})

    def _display_result_table(self, result_df: "pd.DataFrame", display_table: bool | int, metric_name: str):
        """
        Display the specified result DataFrame in a table format.

        Args:
            result_df: The result DataFrame to display.
            display_table: Whether to display the evaluation results in a table.
                If a number is passed, the evaluation results will be truncated to that number before being displayed.
            metric_name: The name of the metric used for evaluation.
        """
        if isinstance(display_table, bool):
            df_to_display = result_df.copy()
            truncated_rows = 0
        else:
            df_to_display = result_df.head(display_table).copy()
            truncated_rows = len(result_df) - display_table

        df_to_display = stylize_metric_name(df_to_display, metric_name)

        display_dataframe(df_to_display)

        if truncated_rows > 0:
            # Simplified message about the truncated rows
            message = f"""
            <div style='
                text-align: center;
                font-size: 16px;
                font-weight: bold;
                color: #555;
                margin: 10px 0;'>
                ... {truncated_rows} more rows not displayed ...
            </div>
            """
            display(HTML(message))


def prediction_is_dictlike(prediction):
    # Downstream logic for displaying dictionary-like predictions depends solely on the predictions
    # having a method called `items()` for iterating through key/value pairs
    return hasattr(prediction, "items") and callable(prediction.items)


def merge_dicts(d1, d2) -> dict:
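    """Merge an example dict and a prediction dict, prefixing keys present in both with `example_` / `pred_` to avoid collisions."""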
    merged = {}
    for k, v in d1.items():
        if k in d2:
            merged[f"example_{k}"] = v
        else:
            merged[k] = v

    for k, v in d2.items():
        if k in d1:
            merged[f"pred_{k}"] = v
        else:
            merged[k] = v

    return merged


def truncate_cell(content) -> str:
    """Truncate content of a cell to 25 words."""
    words = str(content).split()
    if len(words) > 25:
        return " ".join(words[:25]) + "..."
    return content


def stylize_metric_name(df: "pd.DataFrame", metric_name: str) -> "pd.DataFrame":
    """
    Stylize the cell contents of a pandas DataFrame corresponding to the specified metric name.

    :param df: The pandas DataFrame for which to stylize cell contents.
    :param metric_name: The name of the metric for which to stylize DataFrame cell contents.
    """
    def format_metric(x):
        if isinstance(x, float):
            return f"✔️ [{x:.3f}]"
        elif x is not None:
            return f"✔️ [{x}]"
        else:
            return ""
    df[metric_name] = df[metric_name].apply(format_metric)
    return df


def display_dataframe(df: "pd.DataFrame"):
    """
    Display the specified Pandas DataFrame in the console.

    :param df: The Pandas DataFrame to display.
    """
    import pandas as pd

    if is_in_ipython_notebook_environment():
        display(configure_dataframe_for_ipython_notebook_display(df))
    else:
        # Pretty print the DataFrame to the console
        with pd.option_context(
            "display.max_rows", None, "display.max_columns", None
        ):  # more options can be specified also
            print(df)


def configure_dataframe_for_ipython_notebook_display(df: "pd.DataFrame") -> "pd.DataFrame":
    """Set various pandas display options for DataFrame in an IPython notebook environment."""
    import pandas as pd

    pd.options.display.max_colwidth = 70
    return df


def is_in_ipython_notebook_environment():
    """
    Check if the current environment is an IPython notebook environment.

    :return: True if the current environment is an IPython notebook environment, False otherwise.
    """
    try:
        from IPython import get_ipython

        # This is a best-effort check to see if we are in an IPython notebook environment
        return "IPKernelApp" in getattr(get_ipython(), "config", {})
    except ImportError:
        return False


# FIXME: TODO: The merge_dicts stuff above is way too quick and dirty.
# TODO: the display_table can't handle False but can handle 0!
# Not sure how it works with True exactly, probably fails too.

```

--------------------------------------------------------------------------------
/dspy/teleprompt/utils.py:
--------------------------------------------------------------------------------

```python
import inspect
import logging
import math
import os
import random
import shutil
import sys

import numpy as np

try:
    from IPython.core.magics.code import extract_symbols
except ImportError:
    # Won't be able to read code from jupyter notebooks
    extract_symbols = None

import dspy
from dspy.teleprompt.bootstrap import BootstrapFewShot, LabeledFewShot

"""
This file consists of helper functions for our variety of optimizers.
"""

### OPTIMIZER TRAINING UTILS ###

logger = logging.getLogger(__name__)


def create_minibatch(trainset, batch_size=50, rng=None):
    """Create a minibatch from the trainset."""

    # Ensure batch_size isn't larger than the size of the dataset
    batch_size = min(batch_size, len(trainset))

    # If no RNG is provided, fall back to the global random instance
    rng = rng or random

    # Randomly sample indices for the mini-batch using the provided rng
    sampled_indices = rng.sample(range(len(trainset)), batch_size)

    # Create the mini-batch using the sampled indices
    minibatch = [trainset[i] for i in sampled_indices]

    return minibatch


def eval_candidate_program(batch_size, trainset, candidate_program, evaluate, rng=None):
    """Evaluate a candidate program on the trainset, using the specified batch size."""

    try:
        # Evaluate on the full trainset
        if batch_size >= len(trainset):
            return evaluate(candidate_program, devset=trainset, callback_metadata={"metric_key": "eval_full"})
        # Or evaluate on a minibatch
        else:
            return evaluate(
                candidate_program,
                devset=create_minibatch(trainset, batch_size, rng),
                callback_metadata={"metric_key": "eval_minibatch"}
            )
    except Exception:
        logger.error("An exception occurred during evaluation", exc_info=True)
        # TODO: Handle this better, as -ve scores are possible
        return dspy.Prediction(score=0.0, results=[])


def eval_candidate_program_with_pruning(
    trial,
    trial_logs,
    trainset,
    candidate_program,
    evaluate,
    trial_num,
    batch_size=100,
):
    """Evaluation of candidate_program with pruning implemented"""

    # Evaluate with the new prompts
    total_score = 0
    num_batches = math.ceil(len(trainset) / batch_size)
    total_eval_size = 0

    for i in range(num_batches):
        start_index = i * batch_size
        end_index = min((i + 1) * batch_size, len(trainset))
        split_trainset = trainset[start_index:end_index]
        split_score = evaluate(
            candidate_program,
            devset=split_trainset,
            display_table=0,
        )
        print(f"{i}st split score: {split_score}")
        total_eval_size += len(split_trainset)

        total_score += split_score * len(split_trainset)
        curr_weighted_avg_score = total_score / min((i + 1) * batch_size, len(trainset))
        print(f"curr average score: {curr_weighted_avg_score}")

        trial.report(curr_weighted_avg_score, i)

        # Handle pruning based on the intermediate value.
        if trial.should_prune():
            print("Trial pruned.")
            trial_logs[trial_num]["score"] = curr_weighted_avg_score
            trial_logs[trial_num]["num_eval_calls"] = total_eval_size
            trial_logs[trial_num]["pruned"] = True
            return curr_weighted_avg_score, trial_logs, total_eval_size, True

    print(f"Fully evaled score: {curr_weighted_avg_score}")
    score = curr_weighted_avg_score

    trial_logs[trial_num]["full_eval"] = False
    trial_logs[trial_num]["score"] = score
    trial_logs[trial_num]["pruned"] = False
    return score, trial_logs, total_eval_size, False


def get_program_with_highest_avg_score(param_score_dict, fully_evaled_param_combos):
    """Used as a helper function for bayesian + minibatching optimizers. Returns the program with the highest average score from the batches evaluated so far."""

    # Calculate the mean for each combination of categorical parameters, based on past trials
    results = []
    for key, values in param_score_dict.items():
        scores = np.array([v[0] for v in values])
        mean = np.average(scores)
        program = values[0][1]
        params = values[0][2]
        results.append((key, mean, program, params))

    # Sort results by the mean
    sorted_results = sorted(results, key=lambda x: x[1], reverse=True)

    # Find the combination with the highest mean, skip fully evaluated ones
    for combination in sorted_results:
        key, mean, program, params = combination

        if key in fully_evaled_param_combos:
            continue

        return program, mean, key, params

    # If no valid program is found, we return the last valid one that we found
    return program, mean, key, params


def calculate_last_n_proposed_quality(
    base_program,
    trial_logs,
    evaluate,
    trainset,
    devset,
    n,
):
    """
    Calculate the average and best quality of the last n programs proposed. This is useful for seeing if our proposals
    are actually 'improving' over time or not.
    """
    # Get the trials from the last n keys in trial logs
    last_n_trial_nums = list(trial_logs.keys())[-n:]

    # Calculate the average and best score of these trials
    # if num_eval_calls in the trial is less than the trainset, throw a not-implemented error for now
    total_train_score = 0
    best_train_score = 0
    total_dev_score = 0
    best_dev_score = 0
    for trial_num in last_n_trial_nums:
        full_eval = trial_logs[trial_num]["full_eval"]
        if not full_eval:
            raise NotImplementedError(
                "Still need to implement non full eval handling in calculate_last_n_proposed_quality",
            )
        train_score = trial_logs[trial_num]["score"]
        program = base_program.deepcopy()
        program.load(trial_logs[trial_num]["program_path"])

        dev_score = evaluate(program, devset=devset)

        total_train_score += train_score
        total_dev_score += dev_score
        if train_score > best_train_score:
            best_train_score = train_score
            best_dev_score = dev_score

    return best_train_score, total_train_score / n, best_dev_score, total_dev_score / n


### LOGGING UTILS ###


def get_task_model_history_for_full_example(
    candidate_program,
    task_model,
    devset,
    evaluate,
):
    """Get a full trace of the task model's history for a given candidate program."""
    _ = evaluate(candidate_program, devset=devset[:1])
    _ = task_model.inspect_history(n=len(candidate_program.predictors()))
    return task_model.inspect_history(n=len(candidate_program.predictors()))


def print_full_program(program):
    """Print out the program's instructions & prefixes for each module."""
    for i, predictor in enumerate(program.predictors()):
        print(f"Predictor {i}")
        print(f"i: {get_signature(predictor).instructions}")
        *_, last_field = get_signature(predictor).fields.values()
        print(f"p: {last_field.json_schema_extra['prefix']}")
    print("\n")


def save_candidate_program(program, log_dir, trial_num, note=None):
    """Save the candidate program to the log directory."""

    if log_dir is None:
        return None

    # Ensure the directory exists
    eval_programs_dir = os.path.join(log_dir, "evaluated_programs")
    os.makedirs(eval_programs_dir, exist_ok=True)

    # Define the save path for the program
    if note:
        save_path = os.path.join(eval_programs_dir, f"program_{trial_num}_{note}.json")
    else:
        save_path = os.path.join(eval_programs_dir, f"program_{trial_num}.json")

    # Save the program
    program.save(save_path)

    return save_path


def save_file_to_log_dir(source_file_path, log_dir):
    """Save a file to our log directory."""
    if log_dir is None:
        return
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    destination_file_path = os.path.join(log_dir, os.path.basename(source_file_path))

    # Copy the file
    shutil.copy(source_file_path, destination_file_path)


def setup_logging(log_dir):
    """Setup logger, which will log our print statements to a txt file at our log_dir for later viewing"""
    if log_dir is None:
        return
    # Create a logger
    logger = logging.getLogger()
    logger.setLevel(logging.WARNING)

    # Create a file handler that logs debug and higher level messages
    file_handler = logging.FileHandler(f"{log_dir}/logs.txt")
    file_handler.setLevel(logging.WARNING)
    file_formatter = logging.Formatter("%(asctime)s - %(message)s")
    file_handler.setFormatter(file_formatter)
    logger.addHandler(file_handler)

    # Create a console handler with a higher log level
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.WARNING)
    console_formatter = logging.Formatter("%(message)s")
    console_handler.setFormatter(console_formatter)
    logger.addHandler(console_handler)


def get_token_usage(model) -> tuple[int, int]:
    """
    Extract total input tokens and output tokens from a model's interaction history.
    Returns (total_input_tokens, total_output_tokens).
    """
    if not hasattr(model, "history"):
        return 0, 0

    input_tokens = []
    output_tokens = []
    for interaction in model.history:
        usage = interaction.get("usage", {})
        _input_tokens = usage.get("prompt_tokens", 0)
        _output_tokens = usage.get("completion_tokens", 0)
        input_tokens.append(_input_tokens)
        output_tokens.append(_output_tokens)

    total_input_tokens = int(np.sum(input_tokens))
    total_output_tokens = int(np.sum(output_tokens))

    return total_input_tokens, total_output_tokens


def log_token_usage(trial_logs, trial_num, model_dict):
    """
    Extract total input and output tokens used by each model and log to trial_logs[trial_num]["token_usage"].
    """

    token_usage_dict = {}

    for model_name, model in model_dict.items():
        in_tokens, out_tokens = get_token_usage(model)
        token_usage_dict[model_name] = {"total_input_tokens": in_tokens, "total_output_tokens": out_tokens}

    # Store token usage info in trial logs
    trial_logs[trial_num]["token_usage"] = token_usage_dict


### OTHER UTILS ###


def get_prompt_model(prompt_model):
    if prompt_model:
        return prompt_model
    else:
        return dspy.settings.lm


def get_signature(predictor):
    assert hasattr(predictor, "signature")
    return predictor.signature


def set_signature(predictor, updated_signature):
    assert hasattr(predictor, "signature")
    predictor.signature = updated_signature


def create_n_fewshot_demo_sets(
    student,
    num_candidate_sets,
    trainset,
    max_labeled_demos,
    max_bootstrapped_demos,
    metric,
    teacher_settings,
    max_errors=None,
    max_rounds=1,
    labeled_sample=True,
    min_num_samples=1,
    metric_threshold=None,
    teacher=None,
    include_non_bootstrapped=True,
    seed=0,
    rng=None,
):
    """
    This function is copied from random_search.py, and creates fewshot examples in the same way that random search does.
    This allows us to take advantage of using the same fewshot examples when we use the same random seed in our optimizers.
    """
    max_errors = dspy.settings.max_errors if max_errors is None else max_errors
    demo_candidates = {}

    # Account for confusing way this is set up, where we add in 3 more candidate sets to the N specified
    num_candidate_sets -= 3

    # Initialize demo_candidates dictionary
    for i, _ in enumerate(student.predictors()):
        demo_candidates[i] = []

    rng = rng or random.Random(seed)

    # Go through and create each candidate set
    for seed in range(-3, num_candidate_sets):
        print(f"Bootstrapping set {seed + 4}/{num_candidate_sets + 3}")

        trainset_copy = list(trainset)

        if seed == -3 and include_non_bootstrapped:
            # zero-shot
            program2 = student.reset_copy()

        elif seed == -2 and max_labeled_demos > 0 and include_non_bootstrapped:
            # labels only
            teleprompter = LabeledFewShot(k=max_labeled_demos)
            program2 = teleprompter.compile(
                student,
                trainset=trainset_copy,
                sample=labeled_sample,
            )

        elif seed == -1:
            # unshuffled few-shot
            program = BootstrapFewShot(
                metric=metric,
                max_errors=max_errors,
                max_bootstrapped_demos=max_bootstrapped_demos,
                max_labeled_demos=max_labeled_demos,
                teacher_settings=teacher_settings,
                max_rounds=max_rounds,
            )
            program2 = program.compile(student, teacher=teacher, trainset=trainset_copy)

        else:
            # shuffled few-shot
            rng.shuffle(trainset_copy)
            size = rng.randint(min_num_samples, max_bootstrapped_demos)

            teleprompter = BootstrapFewShot(
                metric=metric,
                max_errors=max_errors,
                metric_threshold=metric_threshold,
                max_bootstrapped_demos=size,
                max_labeled_demos=max_labeled_demos,
                teacher_settings=teacher_settings,
                max_rounds=max_rounds,
            )

            program2 = teleprompter.compile(
                student,
                teacher=teacher,
                trainset=trainset_copy,
            )

        for i, _ in enumerate(student.predictors()):
            demo_candidates[i].append(program2.predictors()[i].demos)

    return demo_candidates


def old_getfile(object):
    """Work out which source or compiled file an object was defined in."""
    if inspect.ismodule(object):
        if getattr(object, "__file__", None):
            return object.__file__
        raise TypeError(f"{object!r} is a built-in module")
    if inspect.isclass(object):
        if hasattr(object, "__module__"):
            module = sys.modules.get(object.__module__)
            if getattr(module, "__file__", None):
                return module.__file__
            if object.__module__ == "__main__":
                raise OSError("source code not available")
        raise TypeError(f"{object!r} is a built-in class")
    if inspect.ismethod(object):
        object = object.__func__
    if inspect.isfunction(object):
        object = object.__code__
    if inspect.istraceback(object):
        object = object.tb_frame
    if inspect.isframe(object):
        object = object.f_code
    if inspect.iscode(object):
        return object.co_filename
    raise TypeError(
        f"module, class, method, function, traceback, frame, or code object was expected, got {type(object).__name__}"
    )


def new_getfile(object):
    if not inspect.isclass(object):
        return old_getfile(object)

    # Lookup by parent module (as in current inspect)
    if hasattr(object, "__module__"):
        object_ = sys.modules.get(object.__module__)
        if hasattr(object_, "__file__"):
            return object_.__file__

    # If parent module is __main__, lookup by methods (NEW)
    for _, member in inspect.getmembers(object):
        if inspect.isfunction(member) and object.__qualname__ + "." + member.__name__ == member.__qualname__:
            return inspect.getfile(member)
    raise TypeError(f"Source for {object!r} not found")


inspect.getfile = new_getfile

```

--------------------------------------------------------------------------------
/docs/docs/cheatsheet.md:
--------------------------------------------------------------------------------

```markdown
---
sidebar_position: 999
---

# DSPy Cheatsheet

This page will contain snippets for frequent usage patterns.

## DSPy Programs

### Forcing fresh LM outputs

DSPy caches LM calls. Provide a unique ``rollout_id`` and set a non-zero
``temperature`` (e.g., 1.0) to bypass an existing cache entry while still caching
the new result:

```python
predict = dspy.Predict("question -> answer")
predict(question="1+1", config={"rollout_id": 1, "temperature": 1.0})
```

### dspy.Signature

```python
class BasicQA(dspy.Signature):
    """Answer questions with short factoid answers."""

    question: str = dspy.InputField()
    answer: str = dspy.OutputField(desc="often between 1 and 5 words")
```

### dspy.ChainOfThought

```python
generate_answer = dspy.ChainOfThought(BasicQA)

# Call the predictor on a particular input.
question = 'What is the color of the sky?'
pred = generate_answer(question=question)
```

### dspy.ProgramOfThought

```python
pot = dspy.ProgramOfThought(BasicQA)

question = 'Sarah has 5 apples. She buys 7 more apples from the store. How many apples does Sarah have now?'
result = pot(question=question)

print(f"Question: {question}")
print(f"Final Predicted Answer (after ProgramOfThought process): {result.answer}")
```

### dspy.ReAct

```python
react_module = dspy.ReAct(BasicQA)

question = 'Sarah has 5 apples. She buys 7 more apples from the store. How many apples does Sarah have now?'
result = react_module(question=question)

print(f"Question: {question}")
print(f"Final Predicted Answer (after ReAct process): {result.answer}")
```

### dspy.Retrieve

```python
colbertv2_wiki17_abstracts = dspy.ColBERTv2(url='http://20.102.90.50:2017/wiki17_abstracts')
dspy.settings.configure(rm=colbertv2_wiki17_abstracts)

#Define Retrieve Module
retriever = dspy.Retrieve(k=3)

query='When was the first FIFA World Cup held?'

# Call the retriever on a particular query.
topK_passages = retriever(query).passages

for idx, passage in enumerate(topK_passages):
    print(f'{idx+1}]', passage, '\n')
```

### dspy.CodeAct

```python
from dspy import CodeAct

def factorial(n):
    """Calculate factorial of n"""
    if n <= 1:
        return 1
    return n * factorial(n-1)

act = CodeAct("n->factorial", tools=[factorial])
result = act(n=5)
result # Returns 120
```

### dspy.Parallel

```python
import dspy

parallel = dspy.Parallel(num_threads=2)
predict = dspy.Predict("question -> answer")
result = parallel(
    [
        (predict, dspy.Example(question="1+1").with_inputs("question")),
        (predict, dspy.Example(question="2+2").with_inputs("question"))
    ]
)
result
```

## DSPy Metrics

### Function as Metric

To create a custom metric you can create a function that returns either a number or a boolean value:

```python
def parse_integer_answer(answer, only_first_line=True):
    try:
        if only_first_line:
            answer = answer.strip().split('\n')[0]

        # find the last token that has a number in it
        answer = [token for token in answer.split() if any(c.isdigit() for c in token)][-1]
        answer = answer.split('.')[0]
        answer = ''.join([c for c in answer if c.isdigit()])
        answer = int(answer)

    except (ValueError, IndexError):
        # print(answer)
        answer = 0

    return answer

# Metric Function
def gsm8k_metric(gold, pred, trace=None) -> bool:
    return int(parse_integer_answer(str(gold.answer))) == int(parse_integer_answer(str(pred.answer)))
```

### LLM as Judge

```python
class FactJudge(dspy.Signature):
    """Judge if the answer is factually correct based on the context."""

    context = dspy.InputField(desc="Context for the prediction")
    question = dspy.InputField(desc="Question to be answered")
    answer = dspy.InputField(desc="Answer for the question")
    factually_correct: bool = dspy.OutputField(desc="Is the answer factually correct based on the context?")

judge = dspy.ChainOfThought(FactJudge)

def factuality_metric(example, pred):
    factual = judge(context=example.context, question=example.question, answer=pred.answer)
    return factual.factually_correct
```

## DSPy Evaluation

```python
from dspy.evaluate import Evaluate

evaluate_program = Evaluate(devset=devset, metric=your_defined_metric, num_threads=NUM_THREADS, display_progress=True, display_table=num_rows_to_display)

evaluate_program(your_dspy_program)
```

## DSPy Optimizers

### LabeledFewShot

```python
from dspy.teleprompt import LabeledFewShot

labeled_fewshot_optimizer = LabeledFewShot(k=8)
your_dspy_program_compiled = labeled_fewshot_optimizer.compile(student = your_dspy_program, trainset=trainset)
```

### BootstrapFewShot

```python
from dspy.teleprompt import BootstrapFewShot

fewshot_optimizer = BootstrapFewShot(metric=your_defined_metric, max_bootstrapped_demos=4, max_labeled_demos=16, max_rounds=1, max_errors=10)

your_dspy_program_compiled = fewshot_optimizer.compile(student = your_dspy_program, trainset=trainset)
```

#### Using another LM for compilation, specifying in teacher_settings

```python
from dspy.teleprompt import BootstrapFewShot

fewshot_optimizer = BootstrapFewShot(metric=your_defined_metric, max_bootstrapped_demos=4, max_labeled_demos=16, max_rounds=1, max_errors=10, teacher_settings=dict(lm=gpt4))

your_dspy_program_compiled = fewshot_optimizer.compile(student = your_dspy_program, trainset=trainset)
```

#### Compiling a compiled program - bootstrapping a bootstrapped program

```python
your_dspy_program_compiledx2 = fewshot_optimizer.compile(
    your_dspy_program,
    teacher=your_dspy_program_compiled,
    trainset=trainset,
)
```

#### Saving/loading a compiled program

```python
save_path = './v1.json'
your_dspy_program_compiledx2.save(save_path)
```

```python
loaded_program = YourProgramClass()
loaded_program.load(path=save_path)
```

### BootstrapFewShotWithRandomSearch

Detailed documentation on BootstrapFewShotWithRandomSearch can be found [here](api/optimizers/BootstrapFewShotWithRandomSearch.md).

```python
from dspy.teleprompt import BootstrapFewShotWithRandomSearch

fewshot_optimizer = BootstrapFewShotWithRandomSearch(metric=your_defined_metric, max_bootstrapped_demos=2, num_candidate_programs=8, num_threads=NUM_THREADS)

your_dspy_program_compiled = fewshot_optimizer.compile(student = your_dspy_program, trainset=trainset, valset=devset)

```

Other custom configurations are similar to customizing the `BootstrapFewShot` optimizer.
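
For instance, a run that also bootstraps with a stronger teacher LM might look roughly like the sketch below; the `teacher_settings`, `max_labeled_demos`, and `max_rounds` arguments are assumed to mirror those of `BootstrapFewShot`, and the values shown are only illustrative.

```python
from dspy.teleprompt import BootstrapFewShotWithRandomSearch

# Illustrative sketch: reuse the BootstrapFewShot-style knobs while searching over
# `num_candidate_programs` randomly seeded candidate programs.
fewshot_optimizer = BootstrapFewShotWithRandomSearch(
    metric=your_defined_metric,
    max_bootstrapped_demos=2,
    max_labeled_demos=8,
    max_rounds=1,
    num_candidate_programs=8,
    num_threads=NUM_THREADS,
    teacher_settings=dict(lm=gpt4),  # assumes a `gpt4` LM object, as in the BootstrapFewShot example above
)

your_dspy_program_compiled = fewshot_optimizer.compile(student=your_dspy_program, trainset=trainset, valset=devset)
```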

### Ensemble

```python
from dspy.teleprompt import BootstrapFewShotWithRandomSearch
from dspy.teleprompt.ensemble import Ensemble

fewshot_optimizer = BootstrapFewShotWithRandomSearch(metric=your_defined_metric, max_bootstrapped_demos=2, num_candidate_programs=8, num_threads=NUM_THREADS)
your_dspy_program_compiled = fewshot_optimizer.compile(student = your_dspy_program, trainset=trainset, valset=devset)

ensemble_optimizer = Ensemble(reduce_fn=dspy.majority)
programs = [x[-1] for x in your_dspy_program_compiled.candidate_programs]
your_dspy_program_compiled_ensemble = ensemble_optimizer.compile(programs[:3])
```

### BootstrapFinetune

```python
from dspy.teleprompt import BootstrapFewShotWithRandomSearch, BootstrapFinetune

# Compile program on the current dspy.settings.lm
fewshot_optimizer = BootstrapFewShotWithRandomSearch(metric=your_defined_metric, max_bootstrapped_demos=2, num_threads=NUM_THREADS)
your_dspy_program_compiled = fewshot_optimizer.compile(your_dspy_program, trainset=trainset[:some_num], valset=trainset[some_num:])

# Configure the model to finetune
config = dict(target=model_to_finetune, epochs=2, bf16=True, bsize=6, accumsteps=2, lr=5e-5)

# Compile program with BootstrapFinetune
finetune_optimizer = BootstrapFinetune(metric=your_defined_metric)
finetune_program = finetune_optimizer.compile(your_dspy_program, trainset=some_new_dataset_for_finetuning_model, **config)

finetune_program = your_dspy_program

# Load the program and activate the finetuned model's parameters in the program before evaluation
ckpt_path = "saved_checkpoint_path_from_finetuning"
LM = dspy.HFModel(checkpoint=ckpt_path, model=model_to_finetune)

for p in finetune_program.predictors():
    p.lm = LM
    p.activated = False
```

### COPRO

Detailed documentation on COPRO can be found [here](api/optimizers/COPRO.md).

```python
from dspy.teleprompt import COPRO

eval_kwargs = dict(num_threads=16, display_progress=True, display_table=0)

copro_teleprompter = COPRO(prompt_model=model_to_generate_prompts, metric=your_defined_metric, breadth=num_new_prompts_generated, depth=times_to_generate_prompts, init_temperature=prompt_generation_temperature, verbose=False)

compiled_program_optimized_signature = copro_teleprompter.compile(your_dspy_program, trainset=trainset, eval_kwargs=eval_kwargs)
```

### MIPROv2

Note: detailed documentation can be found [here](api/optimizers/MIPROv2.md). `MIPROv2` is the latest extension of `MIPRO` which includes updates such as (1) improvements to instruction proposal and (2) more efficient search with minibatching.

#### Optimizing with MIPROv2

This shows how to perform an easy out-of-the-box run with `auto=light`, which configures many hyperparameters for you and performs a light optimization run. You can alternatively set `auto=medium` or `auto=heavy` to perform longer optimization runs. The more detailed `MIPROv2` documentation [here](api/optimizers/MIPROv2.md) also provides more information about how to set hyperparameters by hand; a rough hand-configured sketch appears after the example below.

```python
# Import the optimizer
from dspy.teleprompt import MIPROv2

# Initialize optimizer
teleprompter = MIPROv2(
    metric=gsm8k_metric,
    auto="light", # Can choose between light, medium, and heavy optimization runs
)

# Optimize program
print("Optimizing program with MIPROv2...")
optimized_program = teleprompter.compile(
    program.deepcopy(),
    trainset=trainset,
    max_bootstrapped_demos=3,
    max_labeled_demos=4,
)

# Save the optimized program for future use
optimized_program.save("mipro_optimized")

# Evaluate the optimized program
print("Evaluating optimized program...")
evaluate(optimized_program, devset=devset[:])
```
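
For reference, a hand-configured run (without `auto`) might look roughly like the sketch below. The parameter names (`num_candidates`, `init_temperature`, `num_trials`, `minibatch_size`) follow the `MIPROv2` documentation linked above; double-check them against your installed version.

```python
from dspy.teleprompt import MIPROv2

# Illustrative sketch: configure the search by hand instead of using `auto`.
teleprompter = MIPROv2(
    metric=gsm8k_metric,
    num_candidates=7,       # number of instruction/demo candidates to propose
    init_temperature=0.5,   # temperature for instruction proposal
)

optimized_program = teleprompter.compile(
    program.deepcopy(),
    trainset=trainset,
    num_trials=15,          # number of optimization trials over candidate combinations
    minibatch_size=25,      # evaluate candidates on minibatches for efficiency
    max_bootstrapped_demos=3,
    max_labeled_demos=4,
)
```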

#### Optimizing instructions only with MIPROv2 (0-Shot)

```python
# Import the optimizer
from dspy.teleprompt import MIPROv2

# Initialize optimizer
teleprompter = MIPROv2(
    metric=gsm8k_metric,
    auto="light", # Can choose between light, medium, and heavy optimization runs
)

# Optimize program
print("Optimizing program with MIPROv2...")
optimized_program = teleprompter.compile(
    program.deepcopy(),
    trainset=trainset,
    max_bootstrapped_demos=0,
    max_labeled_demos=0,
)

# Save the optimized program for future use
optimized_program.save("mipro_optimized")

# Evaluate the optimized program
print("Evaluating optimized program...")
evaluate(optimized_program, devset=devset[:])
```

### KNNFewShot

```python
from sentence_transformers import SentenceTransformer
from dspy import Embedder
from dspy.teleprompt import KNNFewShot
from dspy import ChainOfThought

knn_optimizer = KNNFewShot(k=3, trainset=trainset, vectorizer=Embedder(SentenceTransformer("all-MiniLM-L6-v2").encode))

qa_compiled = knn_optimizer.compile(student=ChainOfThought("question -> answer"))
```

### BootstrapFewShotWithOptuna

```python
from dspy.teleprompt import BootstrapFewShotWithOptuna

fewshot_optuna_optimizer = BootstrapFewShotWithOptuna(metric=your_defined_metric, max_bootstrapped_demos=2, num_candidate_programs=8, num_threads=NUM_THREADS)

your_dspy_program_compiled = fewshot_optuna_optimizer.compile(student=your_dspy_program, trainset=trainset, valset=devset)
```

Other custom configurations are similar to customizing the `dspy.BootstrapFewShot` optimizer.


### SIMBA

SIMBA, which stands for Stochastic Introspective Mini-Batch Ascent, is a prompt optimizer that accepts arbitrary DSPy programs and proceeds in a sequence of mini-batches seeking to make incremental improvements to the prompt instructions or few-shot examples.

```python
from dspy.teleprompt import SIMBA

simba = SIMBA(metric=your_defined_metric, max_steps=12, max_demos=10)

optimized_program = simba.compile(student=your_dspy_program, trainset=trainset)
```


## DSPy Tools and Utilities

### dspy.Tool

```python
import dspy

def search_web(query: str) -> str:
    """Search the web for information"""
    return f"Search results for: {query}"

tool = dspy.Tool(search_web)
result = tool(query="Python programming")
```

### dspy.streamify

```python
import dspy
import asyncio

predict = dspy.Predict("question->answer")

stream_predict = dspy.streamify(
    predict,
    stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
)

async def read_output_stream():
    output_stream = stream_predict(question="Why did a chicken cross the kitchen?")

    async for chunk in output_stream:
        print(chunk)

asyncio.run(read_output_stream())
```


### dspy.asyncify

```python
import asyncio

import dspy

dspy_program = dspy.ChainOfThought("question -> answer")
dspy_program = dspy.asyncify(dspy_program)

asyncio.run(dspy_program(question="What is DSPy"))
```


### Track Usage

```python
import dspy
dspy.settings.configure(track_usage=True)

result = dspy.ChainOfThought(BasicQA)(question="What is 2+2?")
print(f"Token usage: {result.get_lm_usage()}")
```

### dspy.configure_cache

```python
import dspy

# Configure cache settings
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)
```

## DSPy `Refine` and `BestOfN`

>`dspy.Suggest` and `dspy.Assert` are replaced by `dspy.Refine` and `dspy.BestOfN` in DSPy 2.6.

### BestOfN

Runs a module up to `N` times with different rollout IDs (bypassing cache) and returns the best prediction, as defined by the `reward_fn`, or the first prediction that passes the `threshold`.

```python
import dspy

qa = dspy.ChainOfThought("question -> answer")
def one_word_answer(args, pred):
    return 1.0 if len(pred.answer.split()) == 1 else 0.0
best_of_3 = dspy.BestOfN(module=qa, N=3, reward_fn=one_word_answer, threshold=1.0)
best_of_3(question="What is the capital of Belgium?").answer
# Brussels
```

### Refine

Refines a module by running it up to `N` times with different rollout IDs (bypassing cache) and returns the best prediction, as defined by the `reward_fn`, or the first prediction that passes the `threshold`. After each attempt (except the final one), `Refine` automatically generates detailed feedback about the module's performance and uses this feedback as hints for subsequent runs, creating an iterative refinement process.

```python
import dspy

qa = dspy.ChainOfThought("question -> answer")
def one_word_answer(args, pred):
    return 1.0 if len(pred.answer.split()) == 1 else 0.0
refine = dspy.Refine(module=qa, N=3, reward_fn=one_word_answer, threshold=1.0)
refine(question="What is the capital of Belgium?").answer
# Brussels
```

#### Error Handling

By default, `Refine` will try to run the module up to N times until the threshold is met. If the module encounters an error, it will keep going up to N failed attempts. You can change this behavior by setting `fail_count` to a smaller number than `N`.

```python
refine = dspy.Refine(module=qa, N=3, reward_fn=one_word_answer, threshold=1.0, fail_count=1)
...
refine(question="What is the capital of Belgium?")
# If we encounter just one failed attempt, the module will raise an error.
```

If you want the module to keep retrying through up to `N` failed attempts, set `fail_count` to `N`. This is the default behavior.

```python
refine = dspy.Refine(module=qa, N=3, reward_fn=one_word_answer, threshold=1.0, fail_count=3)
...
refine(question="What is the capital of Belgium?")
```

```

--------------------------------------------------------------------------------
/docs/docs/roadmap.md:
--------------------------------------------------------------------------------

```markdown
---
draft: true
---

!!! warning "This document is from Aug 2024. Since then, DSPy 2.5 and 2.6 were released, DSPy has grown considerably, and 3.0 is approaching! Content below is highly outdated."



# Roadmap Sketch: DSPy 2.5+

It’s been a year since DSPy evolved out of Demonstrate–Search–Predict (DSP), whose research started at Stanford NLP all the way back in February 2022. Thanks to 200 wonderful contributors, DSPy has introduced tens of thousands of people to building modular LM programs and optimizing their prompts and weights automatically. In this time, DSPy has grown to 160,000 monthly downloads and 16,000 stars on GitHub, becoming synonymous with prompt optimization in many circles and inspiring at least a half-dozen cool new libraries.

This document is an initial sketch of DSPy’s public roadmap for the next few weeks and months, as we work on DSPy 2.5 and plan for DSPy 3.0. Suggestions and open-source contributors are more than welcome: just open an issue or submit a pull request regarding the roadmap.



## Technical Objectives

The thesis of DSPy is that for LMs to be useful, we have to shift from ad-hoc prompting to new notions of programming LMs. Instead of relying on LMs gaining much more general or more compositional capabilities, we need to enable developers to iteratively explore their problems and build modular software that invokes LMs for well-scoped tasks. We need to enable that through modules and optimizers that isolate how they decompose their problems and describe their system's objectives from how their LMs are invoked or fine-tuned to maximize their objectives. DSPy's goal has been to develop (and to build the community and shared infrastructure for the collective development of) the abstractions, programming patterns, and optimizers toward this thesis.

To a first approximation, DSPy’s current user-facing language has the minimum number of appropriate abstractions that address the goals above: declarative signatures, define-by-run modules, and optimizers that can be composed quite powerfully. But there are several things we need to do better to realize our goals. The upcoming DSPy releases will have the following objectives.

1. Polishing the core functionality.
2. Developing more accurate, lower-cost optimizers.
3. Building end-to-end tutorials from DSPy’s ML workflow to deployment.
4. Shifting towards more interactive optimization & tracking.



## Team & Organization

DSPy is fairly unusual in its technical objectives, contributors, and audience. Though DSPy takes inspiration from PyTorch, a library for building and optimizing DNNs, there is one major difference: PyTorch was introduced well after DNNs were mature ML concepts, but DSPy seeks to establish and advance core LM Programs research: the framework is propelled by constant academic research from programming abstractions (like the original **Demonstrate–Search–Predict** concepts, DSPy **Signatures**, or **LM Assertions**) to NLP systems (like **STORM**, **PATH**, and **IReRa**) to prompt optimizers (like **MIPRO**) and RL (like **BetterTogether**), among many other related directions.

This research all composes into a concrete, practical library, thanks to dozens of industry contributors, many of whom are deploying apps in production using DSPy. Because of this, DSPy reaches not only grad students and ML engineers, but also many non-ML engineers, from early adopter SWEs to hobbyists exploring new ways of using LMs. The following team, with help from many folks in the OSS community, is working towards the objectives in this Roadmap.

**Project Lead:** Omar Khattab (Stanford & Databricks)

**Project Mentors:** Chris Potts (Stanford), Matei Zaharia (UC Berkeley & Databricks), Heather Miller (CMU & Two Sigma)

**Core Library:** Arnav Singhvi (Databricks & Stanford), Herumb Shandilya (Stanford), Hanna Moazam (Databricks), Sri Vardhamanan (Dashworks), Cyrus Nouroozi (Zenbase), Amir Mehr (Zenbase), Kyle Caverly (Modular), with special thanks to Keshav Santhanam (Stanford), Thomas Ahle (Normal Computing), Connor Shorten (Weaviate)

**Prompt Optimization:** Krista Opsahl-Ong (Stanford), Michael Ryan (Stanford), Josh Purtell (Basis), with special thanks to Eric Zhang (Stanford)

**Finetuning & RL:** Dilara Soylu (Stanford), Isaac Miller (Anyscale), Karel D'Oosterlinck (Ghent), with special thanks to Paridhi Masehswari (Stanford)

**PL Abstractions:** Shangyin Tan (UC Berkeley), Manish Shetty (UC Berkeley), Peter Zhong (CMU)

**Applications:** Jasper Xian (Waterloo), Saron Samuel (Stanford), Alberto Mancarella (Stanford), Faraz Khoubsirat (Waterloo), Saiful Haq (IIT-B), Ashutosh Sharma (UIUC)



## 1) Polishing the core functionality.

Over the next month, polishing is the main objective and likely the one to have the highest ROI on the experience of the average user. Conceptually, DSPy has an extremely small core. It’s nothing but (1) LMs, (2) Signatures & Modules, (3) Optimizers, and (4) Assertions. These concepts and their implementations evolved organically over the past couple of years. We are working now to consolidate what we’ve learned and refactor internally so that things “just work” out of the box for new users, who may not know all the tips-and-tricks just yet.

More concretely:

1. We want to increase the quality of zero-shot, off-the-shelf DSPy programs, i.e. those not yet compiled on custom data.
2. Wherever possible, DSPy should delegate lower-level internal complexity (like managing LMs and structured generation) to emerging lower-level libraries. When required, we may fork smaller libraries out of DSPy to support infrastructure pieces as their own projects.
3. DSPy should internally be more modular, and we need higher compatibility between internal components. Specifically, we need deeper and more native investment in (i) typed multi-field constraints, (ii) assertions, (iii) observability and experimental tracking, (iv) deployment of artifacts and related concerns like streaming and async, and (v) fine-tuning and serving open models.


### On LMs

As of DSPy 2.4, the library has approximately 20,000 lines of code and roughly another 10,000 lines of code for tests, examples, and documentation. Some of these are clearly necessary (e.g., DSPy optimizers) but others exist only because the LM space lacks the building blocks we need under the hood. Luckily, for LM interfaces, a very strong library now exists: LiteLLM, a library that unifies interfaces to various LM and embedding providers. We expect to reduce around 6000 LoCs of support for custom LMs and retrieval models by shifting a lot of that to LiteLLM.

Objectives in this space include improved caching, saving/loading of LMs, and support for streaming and async LM requests. Work here is currently led by Hanna Moazam and Sri Vardhamanan, building on a foundation by Cyrus Nouroozi, Amir Mehr, Kyle Caverly, and others.


### On Signatures & Modules

Traditionally, LMs offer text-in-text-out interfaces. Toward modular programming, DSPy introduced signatures for the first time (as DSP Templates in Jan 2023) as a way to structure the inputs and outputs of LM interactions. Standard prompts conflate interface (“what should the LM do?”) with implementation (“how do we tell it to do that?”). DSPy signatures isolate the former so we can infer and learn the latter from data — in the context of a bigger program. Today in the LM landscape, notions of "structured outputs" have evolved dramatically, thanks to constrained decoding and other improvements, and have become mainstream. What may be called "structured inputs" has yet to become mainstream outside of DSPy, but is just as crucial.

Objectives in this space include refining the abstractions and implementations of the first-class notion of LM Adapters in DSPy, as translators that sit between signatures and LM interfaces. While Optimizers adjust prompts through interactions with a user-supplied metric and data, Adapters are more concerned with building up interactions with LMs to account for, e.g., (i) non-plaintext LM interfaces like chat APIs, structured outputs, function calling, and multi-modal APIs, and (ii) languages beyond English or other forms of higher-level specialization. This has been explored in DSPy on and off in various forms, but we have started working on more fundamental approaches to this problem that will offer tangible improvements to most use cases. Work here is currently led by Omar Khattab.


### On Finetuning & Serving

In February 2023, DSPy introduced the notion of compiling to optimize the weights of an LM program. (To understand just how long ago that was in AI terms, this was before the Alpaca training project at Stanford had even started and a month before the first GPT-4 was released.) Since then, we have shown in October 2023 and, much more expansively, in July 2024, that the fine-tuning flavor of DSPy can deliver large gains for small LMs, especially when composed with prompt optimization.

Overall, though, most DSPy users in practice explore prompt optimization and not weight optimization and most of our examples do the same. The primary reason for a lot of this is infrastructure. Fine-tuning in the DSPy flavor is more than just training a model: ultimately, we need to bootstrap training data for several different modules in a program, train multiple models and handle model selection, and then load and plug in those models into the program's modules. Doing this robustly at the level of abstraction DSPy offers requires a level of resource management that is not generally supported by external existing tools. Major efforts in this regard are currently led by Dilara Soylu and Isaac Miller.


### On Optimizers & Assertions

This is a naturally major direction in the course of polishing. We will share more thoughts here after making more progress on the three angles above.



## 2) Developing more accurate, lower-cost optimizers.

A very large fraction of the research in DSPy focuses on optimizing the prompts and the weights of LM programs. In December 2022, we introduced the algorithm and abstractions behind BootstrapFewShot (as Demonstrate in DSP) and several of its variants. In February 2023, we introduced the core version of what later became BootstrapFinetune. In August 2023, we introduced new variations of both of these. In December 2023, we introduced the first couple of instruction optimizers into DSPy, CA-OPRO and early versions of MIPRO. These were again upgraded in March 2024. Fast forward to June and July 2024, we released MIPROv2 for prompt optimization and BetterTogether for fine-tuning the weights of LM programs.

We have been working towards a number of stronger optimizers. While we cannot share the internal details of research on new optimizers yet, we can outline the goals. A DSPy optimizer can be characterized via three angles:

1. Quality: How much quality can it deliver from various LMs? How effective does it need the zero-shot program to be in order to work well?
2. Cost: How many labeled (and unlabeled) inputs does it need? How many invocations of the program does it need? How expensive is the resulting optimized program at inference time?
3. Robustness: How well can it generalize to different unseen data points or distributions? How sensitive is it to mistakes of the metric or labels?

Over the next six months, our goal is to dramatically improve each angle of these _when the other two are held constant_.  Concretely, there are three directions here.

- Benchmarking: A key prerequisite here is work on benchmarking. On the team, Michael Ryan and Shangyin Tan are leading these efforts. More soon.

- Quality: The goal here is optimizers that extract, on average, 20% more on representative tasks than MIPROv2 and BetterTogether, under the usual conditions — like a few hundred inputs with labels and a good metric starting from a decent zero-shot program. Various efforts here are led by Dilara Soylu, Michael Ryan, Josh Purtell, Krista Opsahl-Ong, and Isaac Miller.

- Efficiency: The goal here is optimizers that match the current best scores from MIPROv2 and BetterTogether but under 1-2 challenges like: (i) starting from only 10-20 inputs with labels, (ii) starting with a weak zero-shot program that scores 0%, (iii) where significant misalignment exists between train/validation and test, or (iv) where the user supplies no metric but provides a very small number of output judgments.



## 3) Building end-to-end tutorials from DSPy’s ML workflow to deployment.

Using DSPy well for solving a new task is just doing good machine learning with LMs, but teaching this is hard. On the one hand, it's an iterative process: you make some initial choices, which will be sub-optimal, and then you refine them incrementally. It's highly exploratory: it's often the case that no one knows yet how to best solve a problem in a DSPy-esque way. On the other hand, DSPy offers many emerging lessons from several years of building LM systems, in which the design space, the data regime, and many other factors are new both to ML experts and to the very large fraction of users that have no ML experience.

Though current docs do address [a bunch of this](learn/index.md) in isolated ways, one thing we've learned is that we should separate teaching the core DSPy language (which is ultimately pretty small) from teaching the emerging ML workflow that works well in a DSPy-esque setting. As a natural extension of this, we need to place more emphasis on the steps prior to and after the explicit coding in DSPy, from data collection to deployment that serves and monitors the optimized DSPy program in practice. This is just starting, but efforts will be ramping up led by Omar Khattab, Isaac Miller, and Herumb Shandilya.


## 4) Shifting towards more interactive optimization & tracking.

Right now, a DSPy user has a few ways to observe and tweak the process of optimization. They can study the prompts before, during, and after optimization using methods like `inspect_history`, built-in logging, and/or the metadata returned by optimizers. Similarly, they can rely on `program.save` and `program.load` to potentially adjust the optimized prompts by hand. Alternatively, they can use one of the many powerful observability integrations — like from Phoenix Arize, LangWatch, or Weights & Biases Weave — to observe _in real time_ the process of optimization (e.g., scores, stack traces, successful & failed traces, and candidate prompts). DSPy encourages iterative engineering by adjusting the program, data, or metrics across optimization runs. For example, some optimizers allow “checkpointing” — e.g., if you optimize with BootstrapFewShotWithRandomSearch for 10 iterations then increase to 15 iterations, the first 10 will be loaded from cache.

While these can accomplish a lot of goals, there are two limitations that future versions of DSPy will seek to address.

1. In general, DSPy’s (i) observability, (ii) experimental tracking, (iii) cost management, and (iv) deployment of programs should become first-class concerns via integration with tools like MLFlow. We will share more plans addressing this for DSPy 2.6 in the next 1-2 months.

2. DSPy 3.0 will introduce new optimizers that prioritize ad-hoc, human-in-the-loop feedback. This is perhaps the only substantial paradigm shift we see as necessary in the foreseeable future in DSPy. It involves various research questions at the level of the abstractions, UI/HCI, and ML, so it is a longer-term goal that we will share more about in the next 3-4 months.



```

--------------------------------------------------------------------------------
/dspy/clients/lm_local.py:
--------------------------------------------------------------------------------

```python
import datetime
import logging
import random
import socket
import string
import subprocess
import threading
import time
from typing import TYPE_CHECKING, Any

import requests

from dspy.clients.provider import Provider, TrainingJob
from dspy.clients.utils_finetune import TrainDataFormat, save_data

if TYPE_CHECKING:
    from dspy.clients.lm import LM

logger = logging.getLogger(__name__)


class LocalProvider(Provider):
    def __init__(self):
        super().__init__()
        self.finetunable = True
        self.TrainingJob = TrainingJob

    @staticmethod
    def launch(lm: "LM", launch_kwargs: dict[str, Any] | None = None):
        try:
            import sglang  # noqa: F401
        except ImportError:
            raise ImportError(
                "For local model launching, please install sglang."
                "Navigate to https://docs.sglang.ai/start/install.html for the latest installation instructions!"
            )

        if hasattr(lm, "process"):
            logger.info("Server is already launched.")
            return

        launch_kwargs = launch_kwargs or lm.launch_kwargs

        import os

        model = lm.model
        if model.startswith("openai/"):
            model = model[7:]
        if model.startswith("local:"):
            model = model[6:]
        if model.startswith("huggingface/"):
            model = model[len("huggingface/") :]

        logger.info(f"Grabbing a free port to launch an SGLang server for model {model}")
        logger.info(f"We see that CUDA_VISIBLE_DEVICES is {os.environ.get('CUDA_VISIBLE_DEVICES', 'unset')}")
        port = get_free_port()
        timeout = launch_kwargs.get("timeout", 1800)
        command = f"python -m sglang.launch_server --model-path {model} --port {port} --host 0.0.0.0"

        # We will manually stream & capture logs.
        process = subprocess.Popen(
            command.replace("\\\n", " ").replace("\\", " ").split(),
            text=True,
            stdout=subprocess.PIPE,  # We'll read from pipe
            stderr=subprocess.STDOUT,  # Merge stderr into stdout
        )

        logger.info(f"SGLang server process started with PID {process.pid}.")

        # A threading.Event to control printing after the server is ready.
        stop_printing_event = threading.Event()
        # This buffer will store *all* lines (both before and after readiness).
        logs_buffer = []

        def _tail_process(proc, buffer, stop_event):
            while True:
                line = proc.stdout.readline()
                if not line and proc.poll() is not None:
                    # Process ended and no new line
                    break
                if line:
                    buffer.append(line)
                    # Print only if stop_event is not set
                    if not stop_event.is_set():
                        print(line, end="")

        # Start a background thread to read from the process continuously
        thread = threading.Thread(
            target=_tail_process,
            args=(process, logs_buffer, stop_printing_event),
            daemon=True,
        )
        thread.start()

        # Wait until the server is ready (or times out)
        base_url = f"http://localhost:{port}"
        try:
            wait_for_server(base_url, timeout=timeout)
        except TimeoutError:
            # If the server doesn't come up, we might want to kill it:
            process.kill()
            raise

        # Once server is ready, we tell the thread to stop printing further lines.
        stop_printing_event.set()

        # A convenience getter so the caller can see all logs so far (and future).
        def get_logs() -> str:
            # Join them all into a single string, or you might return a list
            return "".join(logs_buffer)

        # Let the user know server is up
        logger.info(f"Server ready on random port {port}! Logs are available via lm.get_logs() method on returned lm.")

        lm.kwargs["api_base"] = f"http://localhost:{port}/v1"
        lm.kwargs["api_key"] = "local"
        lm.get_logs = get_logs
        lm.process = process
        lm.thread = thread

    @staticmethod
    def kill(lm: "LM", launch_kwargs: dict[str, Any] | None = None):
        from sglang.utils import terminate_process

        if not hasattr(lm, "process"):
            logger.info("No running server to kill.")
            return
        # Ideally, the following happens atomically
        terminate_process(lm.process)
        lm.thread.join()
        del lm.process
        del lm.thread
        del lm.get_logs
        logger.info("Server killed.")

    @staticmethod
    def finetune(
        job: TrainingJob,
        model: str,
        train_data: list[dict[str, Any]],
        train_data_format: TrainDataFormat | None,
        train_kwargs: dict[str, Any] | None = None,
    ) -> str:
        if model.startswith("openai/"):
            model = model[7:]
        if model.startswith("local:"):
            model = model[6:]

        if train_data_format != TrainDataFormat.CHAT:
            raise ValueError("Only chat models are supported for local finetuning.")

        data_path = save_data(train_data)
        logger.info(f"Train data saved to {data_path}")
        output_dir = create_output_dir(model, data_path)

        default_train_kwargs = {
            "device": None,
            "use_peft": False,
            "num_train_epochs": 5,
            "per_device_train_batch_size": 1,
            "gradient_accumulation_steps": 8,
            "learning_rate": 1e-5,
            "max_seq_length": None,
            "packing": True,
            "bf16": True,
            "output_dir": output_dir,
        }
        train_kwargs = {**default_train_kwargs, **(train_kwargs or {})}
        output_dir = train_kwargs["output_dir"]  # user might have changed the output_dir

        logger.info(f"Starting local training, will save to {output_dir}")
        train_sft_locally(
            model_name=model,
            train_data=train_data,
            train_kwargs=train_kwargs,
        )

        logger.info("Training complete")
        return f"openai/local:{output_dir}"


def create_output_dir(model_name, data_path):
    model_str = model_name.replace("/", "-")
    time_str = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    rnd_str = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
    model_identifier = f"{rnd_str}_{model_str}_{time_str}"
    output_dir = data_path.replace(".jsonl", "_" + model_identifier)
    return output_dir


def train_sft_locally(model_name, train_data, train_kwargs):
    try:
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer
        from trl import SFTConfig, SFTTrainer, setup_chat_format
    except ImportError:
        raise ImportError(
            "For local finetuning, please install torch, transformers, and trl "
            "by running `pip install -U torch transformers accelerate trl peft`"
        )

    device = train_kwargs.get("device", None)
    if device is None:
        device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
    logger.info(f"Using device: {device}")

    model = AutoModelForCausalLM.from_pretrained(pretrained_model_name_or_path=model_name).to(device)
    tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path=model_name)

    # Set up the chat format; generally only for non-chat model variants, hence the try-except.
    try:
        model, tokenizer = setup_chat_format(model=model, tokenizer=tokenizer)
    except Exception:
        pass

    if tokenizer.pad_token_id is None:
        logger.info("Adding pad token to tokenizer")
        tokenizer.add_special_tokens({"pad_token": "[!#PAD#!]"})

    logger.info("Creating dataset")
    if "max_seq_length" not in train_kwargs:
        train_kwargs["max_seq_length"] = 4096
        logger.info(
            f"The 'train_kwargs' parameter didn't include a 'max_seq_length', defaulting to {train_kwargs['max_seq_length']}"
        )

    from datasets import Dataset

    hf_dataset = Dataset.from_list(train_data)

    def tokenize_function(example):
        return encode_sft_example(example, tokenizer, train_kwargs["max_seq_length"])  # noqa: F821

    tokenized_dataset = hf_dataset.map(tokenize_function, batched=False)
    tokenized_dataset.set_format(type="torch")
    tokenized_dataset = tokenized_dataset.filter(lambda example: (example["labels"] != -100).any())

    use_peft = train_kwargs.get("use_peft", False)
    peft_config = None

    if use_peft:
        from peft import LoraConfig

        rank_dimension = 32
        lora_alpha = 64
        lora_dropout = 0.05

        peft_config = LoraConfig(
            r=rank_dimension,
            lora_alpha=lora_alpha,
            lora_dropout=lora_dropout,
            bias="none",
            target_modules="all-linear",
            task_type="CAUSAL_LM",
        )

    sft_config = SFTConfig(
        output_dir=train_kwargs["output_dir"],
        num_train_epochs=train_kwargs["num_train_epochs"],
        per_device_train_batch_size=train_kwargs["per_device_train_batch_size"],
        gradient_accumulation_steps=train_kwargs["gradient_accumulation_steps"],
        learning_rate=train_kwargs["learning_rate"],
        max_grad_norm=2.0,  # note that the current SFTConfig default is 1.0
        logging_steps=20,
        warmup_ratio=0.03,
        lr_scheduler_type="constant",
        save_steps=10_000,
        bf16=train_kwargs["bf16"],
        max_seq_length=train_kwargs["max_seq_length"],
        packing=train_kwargs["packing"],
        dataset_kwargs={  # We need to pass dataset_kwargs because we are processing the dataset ourselves
            "add_special_tokens": False,  # Special tokens handled by template
            "append_concat_token": False,  # No additional separator needed
        },
    )

    logger.info("Starting training")
    trainer = SFTTrainer(
        model=model,
        args=sft_config,
        train_dataset=tokenized_dataset,
        peft_config=peft_config,
    )

    # Train!
    trainer.train()

    # Save the model!
    trainer.save_model()

    merge = True
    if use_peft and merge:
        from peft import AutoPeftModelForCausalLM

        # Load PEFT model on CPU
        model_ = AutoPeftModelForCausalLM.from_pretrained(
            pretrained_model_name_or_path=sft_config.output_dir,
            torch_dtype=torch.float16,
            low_cpu_mem_usage=True,
        )

        merged_model = model_.merge_and_unload()
        merged_model.save_pretrained(sft_config.output_dir, safe_serialization=True, max_shard_size="5GB")

    # Clean up!
    import gc

    del model
    del tokenizer
    del trainer
    gc.collect()
    torch.cuda.empty_cache()

    return sft_config.output_dir


def get_free_port() -> int:
    """
    Return a free TCP port on localhost.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("localhost", 0))
        return s.getsockname()[1]


def wait_for_server(base_url: str, timeout: int | None = None) -> None:
    """
    Wait for the server to be ready by polling the /v1/models endpoint.

    Args:
        base_url: The base URL of the server (e.g. http://localhost:1234)
        timeout: Maximum time to wait in seconds. None means wait forever.
    """
    start_time = time.time()
    while True:
        try:
            response = requests.get(
                f"{base_url}/v1/models",
                headers={"Authorization": "Bearer None"},
            )
            if response.status_code == 200:
                # A small extra sleep to ensure server is fully up.
                time.sleep(5)
                break

            if timeout and (time.time() - start_time) > timeout:
                raise TimeoutError("Server did not become ready within timeout period")
        except requests.exceptions.RequestException:
            # Server not up yet; check the timeout, then wait and retry.
            if timeout and (time.time() - start_time) > timeout:
                raise TimeoutError("Server did not become ready within timeout period")
            time.sleep(1)


def encode_sft_example(example, tokenizer, max_seq_length):
    """
    This function encodes a single example into a format that can be used for sft training.
    Here, we assume each example has a 'messages' field. Each message in it is a dict with 'role' and 'content' fields.
    We use the `apply_chat_template` function from the tokenizer to tokenize the messages and prepare the input and label tensors.

    Code obtained from the allenai/open-instruct repository: https://github.com/allenai/open-instruct/blob/4365dea3d1a6111e8b2712af06b22a4512a0df88/open_instruct/finetune.py
    """
    import torch

    messages = example["messages"]
    if len(messages) == 0:
        raise ValueError("messages field is empty.")
    input_ids = tokenizer.apply_chat_template(
        conversation=messages,
        tokenize=True,
        return_tensors="pt",
        padding=False,
        truncation=True,
        max_length=max_seq_length,
        add_generation_prompt=False,
    )
    labels = input_ids.clone()
    # mask the non-assistant part for avoiding loss
    for message_idx, message in enumerate(messages):
        if message["role"] != "assistant":
            # we calculate the start index of this non-assistant message
            if message_idx == 0:
                message_start_idx = 0
            else:
                message_start_idx = tokenizer.apply_chat_template(
                    conversation=messages[:message_idx],  # here marks the end of the previous messages
                    tokenize=True,
                    return_tensors="pt",
                    padding=False,
                    truncation=True,
                    max_length=max_seq_length,
                    add_generation_prompt=False,
                ).shape[1]
            # next, we calculate the end index of this non-assistant message
            if message_idx < len(messages) - 1 and messages[message_idx + 1]["role"] == "assistant":
                # for intermediate messages that follow with an assistant message, we need to
                # set `add_generation_prompt=True` to avoid the assistant generation prefix being included in the loss
                # (e.g., `<|assistant|>`)
                message_end_idx = tokenizer.apply_chat_template(
                    conversation=messages[: message_idx + 1],
                    tokenize=True,
                    return_tensors="pt",
                    padding=False,
                    truncation=True,
                    max_length=max_seq_length,
                    add_generation_prompt=True,
                ).shape[1]
            else:
                # for the last message or the message that doesn't follow with an assistant message,
                # we don't need to add the assistant generation prefix
                message_end_idx = tokenizer.apply_chat_template(
                    conversation=messages[: message_idx + 1],
                    tokenize=True,
                    return_tensors="pt",
                    padding=False,
                    truncation=True,
                    max_length=max_seq_length,
                    add_generation_prompt=False,
                ).shape[1]
            # set the label to -100 for the non-assistant part
            labels[:, message_start_idx:message_end_idx] = -100
            if max_seq_length and message_end_idx >= max_seq_length:
                break
    attention_mask = torch.ones_like(input_ids)
    return {
        "input_ids": input_ids.flatten(),
        "labels": labels.flatten(),
        "attention_mask": attention_mask.flatten(),
    }

```

--------------------------------------------------------------------------------
/dspy/streaming/streaming_listener.py:
--------------------------------------------------------------------------------

```python
import re
from collections import defaultdict
from queue import Queue
from typing import TYPE_CHECKING, Any

from litellm import ModelResponseStream

from dspy.adapters.chat_adapter import ChatAdapter
from dspy.adapters.json_adapter import JSONAdapter
from dspy.adapters.types import Type
from dspy.adapters.xml_adapter import XMLAdapter
from dspy.dsp.utils.settings import settings
from dspy.streaming.messages import StreamResponse

if TYPE_CHECKING:
    from dspy.primitives.module import Module

ADAPTER_SUPPORT_STREAMING = [ChatAdapter, XMLAdapter, JSONAdapter]


class StreamListener:
    """Class that listens to the stream to capture the streeaming of a specific output field of a predictor."""

    def __init__(
        self,
        signature_field_name: str,
        predict: Any = None,
        predict_name: str | None = None,
        allow_reuse: bool = False,
    ):
        """
        Args:
            signature_field_name: The name of the field to listen to.
            predict: The predictor to listen to. If None, when calling `streamify()` it will automatically look for
                the predictor that has the `signature_field_name` in its signature.
            predict_name: The name of the predictor to listen to. If None, when calling `streamify()` it will
                automatically look for the predictor that has the `signature_field_name` in its signature.
            allow_reuse: If True, the stream listener can be reused for multiple streams. Please note that this could
                hurt the performance because the same stream chunk is sent to multiple listeners.
        """
        self.signature_field_name = signature_field_name
        self.predict = predict
        self.predict_name = predict_name

        self.field_start_queue = []
        self.field_end_queue = Queue()
        self.stream_start = False
        self.stream_end = False
        self.cache_hit = False
        self.allow_reuse = allow_reuse

        self.adapter_identifiers = {
            "ChatAdapter": {
                "start_identifier": f"[[ ## {self.signature_field_name} ## ]]",
                "end_identifier": re.compile(r"\[\[ ## (\w+) ## \]\]"),
                "start_indicator": "[",
                "end_pattern_prefixes": ["[", "[[", "[[ ", "[[ #", "[[ ##"],
                "end_pattern_contains": "[[ ##",
            },
            "JSONAdapter": {
                "start_identifier": f'"{self.signature_field_name}":',
                "end_identifier": re.compile(r"\w*\"(,|\s*})"),
                "start_indicator": '"',
                "end_pattern_prefixes": ['"', '",', '" ', '"}'],
                "end_pattern_contains": None,
            },
            "XMLAdapter": {
                "start_identifier": f"<{self.signature_field_name}>",
                "end_identifier": re.compile(rf"</{self.signature_field_name}>"),
                "start_indicator": "<",
                "end_pattern_prefixes": ["<", "</"],
                "end_pattern_contains": "</",  # Any closing tag start
            },
        }

    def _buffered_message_end_with_start_identifier(self, concat_message: str, start_identifier: str) -> bool:
        for i in range(len(concat_message)):
            if start_identifier.startswith(concat_message[len(concat_message) - i - 1 :]):
                return True
        return False

    def _could_form_end_identifier(self, concat_message: str, adapter_name: str) -> bool:
        """Check if the buffered message could potentially form the end identifier.

        This prevents unnecessary buffering when the tokens clearly cannot form the end pattern.
        For example, if buffered message is "hello world" and end pattern is "[[ ## ... ## ]]",
        we know it cannot form the pattern, so we should yield immediately.

        Args:
            concat_message: The concatenated buffered message
            adapter_name: The name of the adapter being used

        Returns:
            True if the message could potentially form part of the end identifier
        """
        adapter_config = self.adapter_identifiers[adapter_name]
        end_pattern_prefixes = adapter_config.get("end_pattern_prefixes", [])
        end_pattern_contains = adapter_config.get("end_pattern_contains")

        # First check: does it end with a potential start of the pattern?
        if any(concat_message.endswith(prefix) for prefix in end_pattern_prefixes):
            return True

        # Second check: if there's a pattern marker, check if message contains it
        # This handles cases like "[[ ## com" where we have partial field name
        if end_pattern_contains and end_pattern_contains in concat_message:
            return True

        return False

    def receive(self, chunk: ModelResponseStream):
        adapter_name = settings.adapter.__class__.__name__ if settings.adapter else "ChatAdapter"
        if adapter_name not in self.adapter_identifiers:
            raise ValueError(
                f"Unsupported adapter for streaming: {adapter_name}, please use one of the following adapters: "
                f"{', '.join([a.__name__ for a in ADAPTER_SUPPORT_STREAMING])}"
            )
        start_identifier = self.adapter_identifiers[adapter_name]["start_identifier"]
        end_identifier = self.adapter_identifiers[adapter_name]["end_identifier"]
        start_indicator = self.adapter_identifiers[adapter_name]["start_indicator"]

        if self.stream_end:
            if self.allow_reuse:
                # Clear up the state for the next stream.
                self.stream_end = False
                self.cache_hit = False
                self.field_start_queue = []
                self.field_end_queue = Queue()
                self.stream_start = False
            else:
                return

        try:
            chunk_message = chunk.choices[0].delta.content
            if chunk_message is None:
                return
        except Exception:
            return

        # Handle custom streamable types
        if self._output_type and issubclass(self._output_type, Type) and self._output_type.is_streamable():
            if parsed_chunk := self._output_type.parse_stream_chunk(chunk):
                return StreamResponse(
                    self.predict_name,
                    self.signature_field_name,
                    parsed_chunk,
                    is_last_chunk=self.stream_end,
                )

        if chunk_message and start_identifier in chunk_message:
            # If the cache is hit, the chunk_message could be the full response. When it happens we can
            # directly end the stream listening. In some models like gemini, each stream chunk can be multiple
            # tokens, so it's possible that response only has one chunk, we also fall back to this logic.
            message_after_start_identifier = chunk_message[
                chunk_message.find(start_identifier) + len(start_identifier) :
            ]
            if re.search(end_identifier, message_after_start_identifier):
                self.cache_hit = True
                self.stream_start = True
                self.stream_end = True
                return

        if len(self.field_start_queue) == 0 and not self.stream_start and start_indicator in chunk_message:
            # We look for the pattern of start_identifier, i.e., "[[ ## {self.signature_field_name} ## ]]" for
            # ChatAdapter to identify the start of the stream of our target field. Once the start_indicator, i.e., "[["
            # for ChatAdapter, is found, we start checking the next tokens
            self.field_start_queue.append(chunk_message)
            return

        if len(self.field_start_queue) > 0 and not self.stream_start:
            # We keep appending the tokens to the queue until we have a full identifier or the concatenated
            # tokens no longer match our expected identifier.
            self.field_start_queue.append(chunk_message)
            concat_message = "".join(self.field_start_queue)

            if start_identifier in concat_message:
                # We have a full identifier, we can start the stream.
                self.stream_start = True
                self.field_start_queue = []
                # Keep the part after the start_identifier from the concat_message, we need to write it to the buffer.
                value_start_index = concat_message.find(start_identifier) + len(start_identifier)
                chunk_message = concat_message[value_start_index:].lstrip()
                if isinstance(settings.adapter, JSONAdapter) and chunk_message.startswith('"'):
                    # For JSONAdapter, we need to remove the leading ". We cannot do this with the start_identifier
                    # because there could be a few splitters between ':' and '"', e.g., '"name": "value"'.
                    chunk_message = chunk_message[1:]

            elif self._buffered_message_end_with_start_identifier(concat_message.strip(), start_identifier):
                # If the buffered message ends with part of the start_identifier, we keep looking for the
                # start_identifier from the token stream.
                return
            else:
                # Doesn't match the expected identifier, reset the queue.
                self.field_start_queue = []
                return

        if self.stream_start and chunk_message:
            # The stream is started, we keep returning the token until we see the start of the next field.
            token = None
            self.field_end_queue.put(chunk_message)

            concat_message = "".join(self.field_end_queue.queue).strip()
            if re.search(end_identifier, concat_message):
                # The next field is identified, we can end the stream and flush out all tokens in the buffer.
                self.stream_end = True
                token = self.flush()
                token = token.rstrip()  # Remove the trailing \n\n
            elif not self._could_form_end_identifier(concat_message, adapter_name):
                # Buffer cannot form end identifier, safe to flush out the tokens in the buffer.
                token = self.flush()
            elif self.field_end_queue.qsize() > 10:
                # Buffer could form end identifier, but we've exceeded max buffer size
                # Yield the oldest token to prevent unbounded buffering
                token = self.field_end_queue.get()

            if token:
                return StreamResponse(
                    self.predict_name,
                    self.signature_field_name,
                    token,
                    is_last_chunk=self.stream_end,
                )

    def flush(self) -> str:
        """Flush all tokens in the field end queue.

        This method is called to flush out the last few tokens when the stream is ended. These tokens
        are in the buffer because we don't directly yield the tokens received by the stream listener
        with the purpose to not yield the end_identifier tokens, e.g., "[[ ## ... ## ]]" for ChatAdapter.
        """
        last_tokens = "".join(self.field_end_queue.queue)
        self.field_end_queue = Queue()
        if isinstance(settings.adapter, JSONAdapter):
            match = re.search(r'",|"\s*}', last_tokens)
            if match:
                boundary_index = match.start()
            else:
                boundary_index = len(last_tokens)
            return last_tokens[:boundary_index]
        elif isinstance(settings.adapter, XMLAdapter):
            boundary_index = last_tokens.find(f"</{self.signature_field_name}>")
            if boundary_index == -1:
                boundary_index = len(last_tokens)
            return last_tokens[:boundary_index]
        elif isinstance(settings.adapter, ChatAdapter) or settings.adapter is None:
            boundary_index = last_tokens.find("[[")
            if boundary_index == -1:
                boundary_index = len(last_tokens)
            return last_tokens[:boundary_index]
        else:
            raise ValueError(
                f"Unsupported adapter for streaming: {settings.adapter}, please use one of the following adapters: "
                f"{', '.join([a.__name__ for a in ADAPTER_SUPPORT_STREAMING])}"
            )

    def finalize(self) -> StreamResponse | None:
        """Finalize the stream and flush any remaining buffered tokens.

        This should be called when the stream ends.
        It ensures no tokens are lost from the buffer and marks the final chunk appropriately.

        Returns:
            A StreamResponse with the remaining buffered tokens and is_last_chunk=True,
            or None if there are no buffered tokens or the stream hasn't started.
        """
        if self.stream_end or not self.stream_start:
            # Stream already ended or never started, nothing to finalize
            return None

        self.stream_end = True
        if self.field_end_queue.qsize() > 0:
            token = self.flush()
            if token:
                return StreamResponse(
                    self.predict_name,
                    self.signature_field_name,
                    token,
                    is_last_chunk=True,
                )
        return None

    @property
    def _output_type(self) -> type | None:
        try:
            return self.predict.signature.output_fields[self.signature_field_name].annotation
        except Exception:
            return None



def find_predictor_for_stream_listeners(program: "Module", stream_listeners: list[StreamListener]) -> dict[int, list[StreamListener]]:
    """Find the predictor for each stream listener.

    This is a utility function to automatically find the predictor for each stream listener. It is used when some
    listeners don't specify the predictor they want to listen to. If a listener's `signature_field_name` is not
    unique in the program, this function will raise an error.
    """
    predictors = program.named_predictors()

    field_name_to_named_predictor = {}
    for listener in stream_listeners:
        if listener.predict:
            continue
        field_name_to_named_predictor[listener.signature_field_name] = None

    for name, predictor in predictors:
        for field_name, field_info in predictor.signature.output_fields.items():
            if field_name not in field_name_to_named_predictor:
                continue

            if field_name_to_named_predictor[field_name] is not None:
                raise ValueError(
                    f"Signature field {field_name} is not unique in the program, cannot automatically determine which "
                    "predictor to use for streaming. Please specify the predictor to listen to."
                )

            if not _is_streamable(field_info.annotation):
                raise ValueError(
                    f"Stream listener can only be applied to string or subclass of `dspy.Type` that has `is_streamable() == True`, "
                    f"but your field {field_name} is of type {field_info.annotation}."
                )

            field_name_to_named_predictor[field_name] = (name, predictor)

    predict_id_to_listener = defaultdict(list)
    for listener in stream_listeners:
        if listener.predict:
            predict_id_to_listener[id(listener.predict)].append(listener)
            continue
        if listener.signature_field_name not in field_name_to_named_predictor:
            raise ValueError(
                f"Signature field {listener.signature_field_name} is not a field of any predictor in the program, "
                "cannot automatically determine which predictor to use for streaming. Please verify your field name or "
                "specify the predictor to listen to."
            )
        listener.predict_name, listener.predict = field_name_to_named_predictor[listener.signature_field_name]
        predict_id_to_listener[id(listener.predict)].append(listener)
    return predict_id_to_listener

def _is_streamable(field_type: type | None) -> bool:
    if field_type is None:
        return False
    if field_type is str:
        return True
    if issubclass(field_type, Type):
        return field_type.is_streamable()
    return False

```
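
A minimal usage sketch for the streaming utilities above. It assumes the top-level `dspy.streamify`, `dspy.streaming.StreamListener`, and `dspy.streaming.StreamResponse` exports, that `StreamResponse` exposes the token text as `chunk`, and that an LM has been configured; the signature and field name are illustrative.

```python
import asyncio
import dspy

# Hypothetical one-step program; the output field name "answer" is an assumption.
predict = dspy.Predict("question -> answer")

stream_predict = dspy.streamify(
    predict,
    stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")],
)

async def main():
    async for chunk in stream_predict(question="Why is the sky blue?"):
        if isinstance(chunk, dspy.streaming.StreamResponse):
            # Buffered tokens for the "answer" field, with end identifiers stripped by flush().
            print(chunk.chunk, end="", flush=True)
        elif isinstance(chunk, dspy.Prediction):
            print("\n[final]", chunk.answer)

# asyncio.run(main())  # requires e.g. dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))
```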

--------------------------------------------------------------------------------
/dspy/teleprompt/simba.py:
--------------------------------------------------------------------------------

```python
from __future__ import annotations

import logging
import random
from typing import Any, Callable

import numpy as np

import dspy
from dspy.teleprompt.simba_utils import append_a_demo, append_a_rule, prepare_models_for_resampling, wrap_program
from dspy.teleprompt.teleprompt import Teleprompter

logger = logging.getLogger(__name__)


class SIMBA(Teleprompter):
    """
    SIMBA (Stochastic Introspective Mini-Batch Ascent) optimizer for DSPy.
    
    SIMBA is a DSPy optimizer that uses the LLM to analyze its own performance and 
    generate improvement rules. It samples mini-batches, identifies challenging examples 
    with high output variability, then either creates self-reflective rules or adds 
    successful examples as demonstrations.
    
    For more details, see: https://dspy.ai/api/optimizers/SIMBA/
    """

    def __init__(
        self,
        *,
        metric: Callable[[dspy.Example, dict[str, Any]], float],
        bsize: int = 32,
        num_candidates: int = 6,
        max_steps: int = 8,
        max_demos: int = 4,
        prompt_model: dspy.LM | None = None,
        teacher_settings: dict | None = None,
        demo_input_field_maxlen: int = 100_000,
        num_threads: int | None = None,
        temperature_for_sampling: float = 0.2,
        temperature_for_candidates: float = 0.2,
    ) -> None:
        """
        Initializes SIMBA.

        Args:
            metric: A function that takes an Example and a prediction_dict
                as input and returns a float.
            bsize: Mini-batch size. Defaults to 32.
            num_candidates: Number of new candidate programs to produce
                per iteration. Defaults to 6.
            max_steps: Number of optimization steps to run. Defaults to 8.
            max_demos: Maximum number of demos a predictor can hold
                before dropping some. Defaults to 4.
            prompt_model: The model used to evolve the program. When `prompt_model` is `None`, the globally
                configured LM is used.
            teacher_settings: Settings for the teacher model. Defaults to None.
            demo_input_field_maxlen: Maximum number of characters to keep
                in an input field when building a new demo. Defaults to 100,000.
            num_threads: Number of threads for parallel execution.
                Defaults to None.
            temperature_for_sampling: Temperature used for picking
                programs during the trajectory-sampling step. Defaults to 0.2.
            temperature_for_candidates: Temperature used for picking
                the source program for building new candidates. Defaults to 0.2.
        """
        self.metric = metric
        self.bsize = bsize
        self.num_candidates = num_candidates
        self.max_steps = max_steps
        self.max_demos = max_demos
        self.prompt_model = prompt_model or dspy.settings.lm
        self.teacher_settings = teacher_settings
        self.demo_input_field_maxlen = demo_input_field_maxlen
        self.num_threads = num_threads

        self.temperature_for_sampling = temperature_for_sampling
        self.temperature_for_candidates = temperature_for_candidates

        if self.max_demos > 0:
            self.strategies = [append_a_demo(demo_input_field_maxlen), append_a_rule]
        else:
            self.strategies = [append_a_rule]

    def compile(
        self,
        student: dspy.Module,
        *,
        trainset: list[dspy.Example],
        seed: int = 0
    ) -> dspy.Module:
        """
        Compile and optimize the student module using SIMBA.
        
        Args:
            student: The module to optimize
            trainset: Training examples for optimization
            seed: Random seed for reproducibility
            
        Returns:
            The optimized module with candidate_programs and trial_logs attached
        """
        # Basic checks
        assert len(trainset) >= self.bsize, f"Trainset too small: {len(trainset)} < {self.bsize}"

        # Initialize RNG
        rng = random.Random(seed)
        rng_np = np.random.default_rng(seed)

        programs = []
        program_scores = {}
        next_program_idx = 0

        # Helper functions
        def calc_average_score(prog_idx: int) -> float:
            scores = program_scores.get(prog_idx, [])
            if not scores:
                return 0.0
            return sum(scores) / len(scores)

        def top_k_plus_baseline(k: int) -> list[int]:
            # Sort all programs by descending average score
            scored_programs = sorted(programs, key=lambda p: calc_average_score(p.simba_idx), reverse=True)
            top_k = [p.simba_idx for p in scored_programs[:k]]
            # Ensure baseline=0 is in there:
            if 0 not in top_k and len(top_k) > 0:
                top_k[-1] = 0
            return list(dict.fromkeys(top_k))

        def softmax_sample(rng_obj: random.Random, program_idxs: list[int], temperature: float) -> int:
            if not program_idxs:
                raise ValueError("No programs available for softmax sampling.")

            # Unnormalized weights
            scores = [calc_average_score(idx) for idx in program_idxs]
            exps = [np.exp(s / temperature) for s in scores]
            sum_exps = sum(exps)
            if sum_exps <= 0:
                # Fallback: uniform if all exps are zero
                return rng_obj.choice(program_idxs)

            # Weighted random choice
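            # Illustrative arithmetic (not from the source): with average scores [0.2, 0.8] and
            # temperature 0.2, the unnormalized weights are exp(1.0) ~= 2.72 and exp(4.0) ~= 54.6,
            # so the higher-scoring program is chosen with probability ~= 54.6 / 57.3 ~= 0.95;
            # larger temperatures flatten the distribution toward uniform sampling.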
            probs = [val / sum_exps for val in exps]
            return rng_obj.choices(program_idxs, weights=probs, k=1)[0]

        def register_new_program(prog: dspy.Module, score_list: list[float]) -> None:
            nonlocal next_program_idx
            next_program_idx += 1
            new_idx = next_program_idx
            prog.simba_idx = new_idx
            programs.append(prog)
            program_scores[new_idx] = score_list

        # Initialize the baseline program: index=0
        student = student.deepcopy()
        student.simba_idx = 0
        programs.append(student)
        program_scores[0] = []

        winning_programs = [student]

        # Data shuffling
        data_indices = list(range(len(trainset)))
        rng.shuffle(data_indices)
        instance_idx = 0

        # Parallel runner
        run_parallel = dspy.Parallel(access_examples=False, num_threads=self.num_threads)

        trial_logs = {}
        for batch_idx in range(self.max_steps):
            trial_logs[batch_idx] = {}

            logger.info(f"Starting batch {batch_idx+1} of {self.max_steps}.")

            # STEP 1: Get next batch
            if instance_idx + self.bsize > len(trainset):
                rng.shuffle(data_indices)
                instance_idx = 0

            batch_indices = data_indices[instance_idx : instance_idx + self.bsize]
            batch = [trainset[i] for i in batch_indices]
            instance_idx += self.bsize

            # We'll generate (program, model) pairs for the trajectory sampling.
            # Prepare distinct LMs (with different temperatures, etc.) from the baseline=programs[0].
            models = prepare_models_for_resampling(programs[0], self.num_candidates, self.teacher_settings)
            top_programs = top_k_plus_baseline(self.num_candidates)

            exec_pairs = []
            predictor2name = {}

            # For each model, for each example, pick a program from the pool via softmax
            for model in models:
                for example in batch:
                    chosen_prog_idx = softmax_sample(rng, top_programs, self.temperature_for_sampling)
                    candidate_system = programs[chosen_prog_idx].deepcopy()
                    candidate_system.set_lm(model)

                    for name, predictor in candidate_system.named_predictors():
                        predictor2name[id(predictor)] = name

                    # Use the special wrap that includes the 'example' in the output
                    wrapped_candidate_system = wrap_program(candidate_system, self.metric)
                    exec_pairs.append((wrapped_candidate_system, example))

            # STEP 2: Execute
            logger.info(f"Sampling program trajectories on {self.bsize} examples x {self.num_candidates} samples.")
            outputs = run_parallel(exec_pairs)
            assert len(outputs) == len(exec_pairs) == self.bsize * self.num_candidates

            # STEP 3: Sort the training buckets by (max-to-min gap, max score, and max-to-avg gap).
            buckets = []
            largest_max_to_avg_gap = float("-inf")
            batch_10th_percentile_score = np.percentile([float(o["score"]) for o in outputs], 10)
            batch_90th_percentile_score = np.percentile([float(o["score"]) for o in outputs], 90)

            # Outputs are laid out model-major (all examples for model 0, then model 1, ...), so the
            # results for example `idx` sit at positions idx, idx + bsize, idx + 2*bsize, ...
            # Each gathered bucket therefore has length num_candidates.
            for idx, _ in enumerate(batch):
                # gather all results for this example
                bucket = [outputs[i] for i in range(idx, len(outputs), self.bsize)]
                bucket.sort(key=lambda x: x["score"], reverse=True)

                max_score = float(bucket[0]["score"])
                min_score = float(bucket[-1]["score"])
                avg_score = sum(x["score"] for x in bucket) / len(bucket)
                max_to_min_gap = max_score - min_score
                max_to_avg_gap = max_score - avg_score
                if max_to_avg_gap > largest_max_to_avg_gap:
                    largest_max_to_avg_gap = max_to_avg_gap

                buckets.append((bucket, (max_to_min_gap, max_score, max_to_avg_gap)))

            # sort the buckets
            buckets.sort(key=lambda x: x[1], reverse=True)

            # Baseline for the batch is just the average of all runs
            all_scores_in_this_batch = [o["score"] for o in outputs]
            baseline_score = sum(all_scores_in_this_batch) / len(all_scores_in_this_batch)
            logger.info(f"Batch {batch_idx+1}: Baseline mini-batch score: {baseline_score}\n")

            # STEP 4: Build new candidate programs by applying a strategy to some top buckets.
            system_candidates = []
            for bucket_idx, (bucket, bucket_stats) in enumerate(buckets):
                max_to_min_gap, max_score, max_to_avg_gap = bucket_stats
                logger.info(
                    f"Batch {batch_idx+1}: Processing bucket #{bucket_idx+1}, with max score {max_score}, "
                    f"max-to-min gap {max_to_min_gap}, and max-to-avg gap {max_to_avg_gap}."
                )

                # pick source program
                src_prog_idx = softmax_sample(
                    rng, top_k_plus_baseline(self.num_candidates), self.temperature_for_candidates
                )
                system_candidate = programs[src_prog_idx].deepcopy()

                # Drop some demos from each predictor
                name2predictor = {}
                num_demos_list = []

                max_demos_tmp = self.max_demos if self.max_demos > 0 else 3

                for name, predictor in system_candidate.named_predictors():
                    name2predictor[name] = predictor
                    num_demos_list.append(len(predictor.demos))

                num_demos = max(num_demos_list) if num_demos_list else 0
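                # Drop a Poisson-distributed number of demos (rate = num_demos / max_demos_tmp),
                # forcing at least one drop once the largest demo set has reached max_demos_tmp.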
                num_demos_to_drop = max(rng_np.poisson(num_demos / max_demos_tmp), int(num_demos >= max_demos_tmp))
                num_demos_to_drop = min(num_demos_to_drop, num_demos)
                demos_to_drop = [rng.randrange(num_demos) for _ in range(num_demos_to_drop)]

                for _, predictor in name2predictor.items():
                    predictor.demos = [demo for idxd, demo in enumerate(predictor.demos) if idxd not in demos_to_drop]

                # Pick a strategy
                strategy = rng.choice(self.strategies)
                logger.info(
                    f"Batch {batch_idx+1}: Invoking strategy: {strategy.__name__}"
                    + (f", having dropped {num_demos_to_drop} demos per predictor" if num_demos_to_drop else "")
                )

                try:
                    strategy(
                        bucket,
                        system_candidate,
                        predictor2name=predictor2name,
                        name2predictor=name2predictor,
                        batch_10p_score=batch_10th_percentile_score,
                        batch_90p_score=batch_90th_percentile_score,
                        prompt_model=self.prompt_model,
                    )
                except Exception as e:
                    logger.error(f"Strategy failed with error: {e}")
                    continue

                system_candidates.append(system_candidate)
                logger.info("\n")

                if len(system_candidates) >= self.num_candidates + 1:
                    break

            # STEP 5: Evaluate these new system_candidates on the same mini-batch
            logger.info(f"Batch {batch_idx+1}: Evaluating {len(system_candidates)} programs on {self.bsize} examples.")

            exec_pairs = [(wrap_program(sys, self.metric), ex) for sys in system_candidates for ex in batch]
            outputs = run_parallel(exec_pairs)
            assert len(outputs) == len(exec_pairs) == len(system_candidates) * self.bsize

            # STEP 6: Compute average mini-batch scores for each new candidate
            candidate_scores = []
            for idx_cand, _ in enumerate(system_candidates):
                start = idx_cand * self.bsize
                end = (idx_cand + 1) * self.bsize
                sys_scores = [outputs[i]["score"] for i in range(start, end)]
                avg_sys_score = sum(sys_scores) / len(sys_scores)
                candidate_scores.append(avg_sys_score)

            logger.info(
                f"Scores after {batch_idx+1} batches: {candidate_scores}, "
                f"Best: {max(candidate_scores) if candidate_scores else 'N/A'}\n"
            )

            # STEP 7: Select the best among these new ones for "winning" record
            if candidate_scores:
                best_idx_among_candidates = candidate_scores.index(max(candidate_scores))
                best_program = system_candidates[best_idx_among_candidates]
                winning_programs.append(best_program.deepcopy())

            # STEP 8: Register all new candidate systems in our global pool
            for idx_cand, cand_sys in enumerate(system_candidates):
                start = idx_cand * self.bsize
                end = (idx_cand + 1) * self.bsize
                sys_scores = [outputs[i]["score"] for i in range(start, end)]
                register_new_program(cand_sys, sys_scores)

        M = len(winning_programs) - 1  # noqa: N806
        N = self.num_candidates + 1  # noqa: N806
        if M < 1:
            program_idxs = [0] * N
        else:
            program_idxs = [round(i * M / (N - 1)) for i in range(N)]

        program_idxs = list(dict.fromkeys(program_idxs))

        candidate_programs = [winning_programs[i].deepcopy() for i in program_idxs]
        logger.info(f"VALIDATION: Evaluating {len(candidate_programs)} programs on the full trainset.")
        exec_pairs = [(wrap_program(sys, self.metric), ex) for sys in candidate_programs for ex in trainset]
        outputs = run_parallel(exec_pairs)

        scores = []
        for idx_prog, _ in enumerate(candidate_programs):
            start = idx_prog * len(trainset)
            end = (idx_prog + 1) * len(trainset)
            sys_scores = [outputs[i]["score"] for i in range(start, end)]
            avg_score = sum(sys_scores) / len(sys_scores) if sys_scores else 0.0
            scores.append(avg_score)
            if idx_prog != 0:
                trial_logs[idx_prog - 1]["train_score"] = avg_score

        # Build sorted list of {"score", "program"} dicts
        assert len(scores) == len(candidate_programs)
        candidate_data = [{"score": s, "program": p} for s, p in zip(scores, candidate_programs, strict=False)]
        candidate_data.sort(key=lambda x: x["score"], reverse=True)

        best_idx = scores.index(max(scores)) if scores else 0
        best_program = candidate_programs[best_idx].deepcopy()
        logger.info(
            f"Final trainset scores: {scores}, Best: {max(scores) if scores else 'N/A'} "
            f"(at index {best_idx if scores else 'N/A'})\n\n\n"
        )

        # Attach sorted, scored candidates & logs
        best_program.candidate_programs = candidate_data
        best_program.trial_logs = trial_logs

        return best_program

```
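
A minimal, hedged sketch of invoking the optimizer above. The metric follows the constructor docstring (an `Example` plus the program's output); the toy `trainset`, the `question`/`answer` fields, and the tiny `bsize` are illustrative, and an LM is assumed to be configured (real runs use larger mini-batches and trainsets).

```python
import dspy

def accuracy(example, prediction):
    # Hypothetical metric: assumes the program produces an `answer` field.
    return float(getattr(prediction, "answer", None) == example.answer)

# Toy trainset; `compile` requires len(trainset) >= bsize.
trainset = [
    dspy.Example(question=q, answer=a).with_inputs("question")
    for q, a in [("2 + 2?", "4"), ("Capital of France?", "Paris")]
]

program = dspy.ChainOfThought("question -> answer")  # any dspy.Module works

optimizer = dspy.SIMBA(metric=accuracy, bsize=2, num_candidates=2, max_steps=2)
optimized = optimizer.compile(program, trainset=trainset, seed=0)

# The returned module carries the scored candidates and per-step logs:
# optimized.candidate_programs, optimized.trial_logs
```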

--------------------------------------------------------------------------------
/dspy/adapters/types/tool.py:
--------------------------------------------------------------------------------

```python
import asyncio
import inspect
from typing import TYPE_CHECKING, Any, Callable, get_origin, get_type_hints

import pydantic
from jsonschema import ValidationError, validate
from pydantic import BaseModel, TypeAdapter, create_model

from dspy.adapters.types.base_type import Type
from dspy.dsp.utils.settings import settings
from dspy.utils.callback import with_callbacks

if TYPE_CHECKING:
    import mcp
    from langchain.tools import BaseTool

_TYPE_MAPPING = {"string": str, "integer": int, "number": float, "boolean": bool, "array": list, "object": dict}


class Tool(Type):
    """Tool class.

    This class is used to simplify the creation of tools for tool calling (function calling) in LLMs. Only supports
    functions for now.
    """

    func: Callable
    name: str | None = None
    desc: str | None = None
    args: dict[str, Any] | None = None
    arg_types: dict[str, Any] | None = None
    arg_desc: dict[str, str] | None = None
    has_kwargs: bool = False

    def __init__(
        self,
        func: Callable,
        name: str | None = None,
        desc: str | None = None,
        args: dict[str, Any] | None = None,
        arg_types: dict[str, Any] | None = None,
        arg_desc: dict[str, str] | None = None,
    ):
        """Initialize the Tool class.

        Users can choose to specify the `name`, `desc`, `args`, and `arg_types`, or let the `dspy.Tool`
        automatically infer the values from the function. For values that are specified by the user, automatic inference
        will not be performed on them.

        Args:
            func (Callable): The actual function that is being wrapped by the tool.
            name (Optional[str], optional): The name of the tool. Defaults to None.
            desc (Optional[str], optional): The description of the tool. Defaults to None.
            args (Optional[dict[str, Any]], optional): The args and their schema of the tool, represented as a
                dictionary from arg name to arg's json schema. Defaults to None.
            arg_types (Optional[dict[str, Any]], optional): The argument types of the tool, represented as a dictionary
                from arg name to the type of the argument. Defaults to None.
            arg_desc (Optional[dict[str, str]], optional): Descriptions for each arg, represented as a
                dictionary from arg name to description string. Defaults to None.

        Example:

        ```python
        def foo(x: int, y: str = "hello"):
            return str(x) + y

        tool = Tool(foo)
        print(tool.args)
        # Expected output: {'x': {'type': 'integer'}, 'y': {'type': 'string', 'default': 'hello'}}
        ```
        """
        super().__init__(func=func, name=name, desc=desc, args=args, arg_types=arg_types, arg_desc=arg_desc)
        self._parse_function(func, arg_desc)

    def _parse_function(self, func: Callable, arg_desc: dict[str, str] | None = None):
        """Helper method that parses a function to extract the name, description, and args.

        This is a helper function that automatically infers the name, description, and args of the tool from the
        provided function. In order to make the inference work, the function must have valid type hints.
        """
        annotations_func = func if inspect.isfunction(func) or inspect.ismethod(func) else func.__call__
        name = getattr(func, "__name__", type(func).__name__)
        desc = getattr(func, "__doc__", None) or getattr(annotations_func, "__doc__", "")
        args = {}
        arg_types = {}

        # Use inspect.signature to get all arg names
        sig = inspect.signature(annotations_func)
        # Get available type hints
        available_hints = get_type_hints(annotations_func)
        # Build a dictionary of arg name -> type (defaulting to Any when missing)
        hints = {param_name: available_hints.get(param_name, Any) for param_name in sig.parameters.keys()}
        default_values = {param_name: sig.parameters[param_name].default for param_name in sig.parameters.keys()}

        # Process each argument's type to generate its JSON schema.
        for k, v in hints.items():
            arg_types[k] = v
            if k == "return":
                continue
            # Check if the type (or its origin) is a subclass of Pydantic's BaseModel
            origin = get_origin(v) or v
            if isinstance(origin, type) and issubclass(origin, BaseModel):
                # Get json schema, and replace $ref with the actual schema
                v_json_schema = _resolve_json_schema_reference(v.model_json_schema())
                args[k] = v_json_schema
            else:
                args[k] = _resolve_json_schema_reference(TypeAdapter(v).json_schema())
            if default_values[k] is not inspect.Parameter.empty:
                args[k]["default"] = default_values[k]
            if arg_desc and k in arg_desc:
                args[k]["description"] = arg_desc[k]

        self.name = self.name or name
        self.desc = self.desc or desc
        self.args = self.args if self.args is not None else args
        self.arg_types = self.arg_types if self.arg_types is not None else arg_types
        self.has_kwargs = any(param.kind == param.VAR_KEYWORD for param in sig.parameters.values())

    def _validate_and_parse_args(self, **kwargs):
        # Validate the args value comply to the json schema.
        for k, v in kwargs.items():
            if k not in self.args:
                if self.has_kwargs:
                    continue
                else:
                    raise ValueError(f"Arg {k} is not in the tool's args.")
            try:
                instance = v.model_dump() if hasattr(v, "model_dump") else v
                type_str = self.args[k].get("type")
                if type_str is not None and type_str != "Any":
                    validate(instance=instance, schema=self.args[k])
            except ValidationError as e:
                raise ValueError(f"Arg {k} is invalid: {e.message}") from e

        # Parse the args to the correct type.
        parsed_kwargs = {}
        for k, v in kwargs.items():
            if k in self.arg_types and self.arg_types[k] != Any:
                # Create a pydantic model wrapper with a dummy field `value` to parse the arg to the correct type.
                # This is specifically useful for handling nested Pydantic models like `list[list[MyPydanticModel]]`
                pydantic_wrapper = create_model("Wrapper", value=(self.arg_types[k], ...))
                parsed = pydantic_wrapper.model_validate({"value": v})
                parsed_kwargs[k] = parsed.value
            else:
                parsed_kwargs[k] = v
        return parsed_kwargs

    def format(self):
        return str(self)

    def format_as_litellm_function_call(self):
        return {
            "type": "function",
            "function": {
                "name": self.name,
                "description": self.desc,
                "parameters": {
                    "type": "object",
                    "properties": self.args,
                    "required": list(self.args.keys()),
                },
            },
        }

    def _run_async_in_sync(self, coroutine):
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(coroutine)

        return loop.run_until_complete(coroutine)

    @with_callbacks
    def __call__(self, **kwargs):
        parsed_kwargs = self._validate_and_parse_args(**kwargs)
        result = self.func(**parsed_kwargs)
        if asyncio.iscoroutine(result):
            if settings.allow_tool_async_sync_conversion:
                return self._run_async_in_sync(result)
            else:
                raise ValueError(
                    "You are calling `__call__` on an async tool, please use `acall` instead or set "
                    "`allow_async=True` to run the async tool in sync mode."
                )
        return result

    @with_callbacks
    async def acall(self, **kwargs):
        parsed_kwargs = self._validate_and_parse_args(**kwargs)
        result = self.func(**parsed_kwargs)
        if asyncio.iscoroutine(result):
            return await result
        else:
            # We should allow calling a sync tool in the async path.
            return result

    @classmethod
    def from_mcp_tool(cls, session: "mcp.ClientSession", tool: "mcp.types.Tool") -> "Tool":
        """
        Build a DSPy tool from an MCP tool and a ClientSession.

        Args:
            session: The MCP session to use.
            tool: The MCP tool to convert.

        Returns:
            A Tool object.
        """
        from dspy.utils.mcp import convert_mcp_tool

        return convert_mcp_tool(session, tool)

    @classmethod
    def from_langchain(cls, tool: "BaseTool") -> "Tool":
        """
        Build a DSPy tool from a LangChain tool.

        Args:
            tool: The LangChain tool to convert.

        Returns:
            A Tool object.

        Example:

        ```python
        import asyncio
        import dspy
        from langchain.tools import tool as lc_tool

        @lc_tool
        def add(x: int, y: int):
            "Add two numbers together."
            return x + y

        dspy_tool = dspy.Tool.from_langchain(add)

        async def run_tool():
            return await dspy_tool.acall(x=1, y=2)

        print(asyncio.run(run_tool()))
        # 3
        ```
        """
        from dspy.utils.langchain_tool import convert_langchain_tool

        return convert_langchain_tool(tool)

    def __repr__(self):
        return f"Tool(name={self.name}, desc={self.desc}, args={self.args})"

    def __str__(self):
        desc = f", whose description is <desc>{self.desc}</desc>.".replace("\n", "  ") if self.desc else "."
        arg_desc = f"It takes arguments {self.args}."
        return f"{self.name}{desc} {arg_desc}"


class ToolCalls(Type):
    class ToolCall(Type):
        name: str
        args: dict[str, Any]

        def format(self):
            return {
                "type": "function",
                "function": {
                    "name": self.name,
                    "arguments": self.args,
                },
            }

        def execute(self, functions: dict[str, Any] | list[Tool] | None = None) -> Any:
            """Execute this individual tool call and return its result.

            Args:
                functions: Functions to search for the tool. Can be:
                          - Dict mapping tool names to functions: {"tool_name": function}
                          - List of Tool objects: [Tool(function), ...]
                          - None: Will search in caller's locals and globals (automatic lookup)

            Returns:
                The result from executing this tool call.

            Raises:
                ValueError: If the tool function cannot be found.
                Exception: Any exception raised by the tool function.
            """
            func = None

            if functions is None:
                # Automatic lookup in caller's globals and locals
                frame = inspect.currentframe().f_back
                try:
                    caller_globals = frame.f_globals
                    caller_locals = frame.f_locals
                    func = caller_locals.get(self.name) or caller_globals.get(self.name)
                finally:
                    del frame

            elif isinstance(functions, dict):
                func = functions.get(self.name)
            elif isinstance(functions, list):
                for tool in functions:
                    if tool.name == self.name:
                        func = tool.func
                        break

            if func is None:
                raise ValueError(f"Tool function '{self.name}' not found. Please pass the tool functions to the `execute` method.")

            try:
                args = self.args or {}
                return func(**args)
            except Exception as e:
                raise RuntimeError(f"Error executing tool '{self.name}': {e}") from e

    tool_calls: list[ToolCall]

    @classmethod
    def from_dict_list(cls, tool_calls_dicts: list[dict[str, Any]]) -> "ToolCalls":
        """Convert a list of dictionaries to a ToolCalls instance.

        Args:
            tool_calls_dicts: A list of dictionaries, where each dictionary has 'name' and 'args' keys.

        Returns:
            A ToolCalls instance.

        Example:

            ```python
            tool_calls_dict = [
                {"name": "search", "args": {"query": "hello"}},
                {"name": "translate", "args": {"text": "world"}}
            ]
            tool_calls = ToolCalls.from_dict_list(tool_calls_dict)
            ```
        """
        tool_calls = [cls.ToolCall(**item) for item in tool_calls_dicts]
        return cls(tool_calls=tool_calls)

    @classmethod
    def description(cls) -> str:
        return (
            "Tool calls information, including the name of the tools and the arguments to be passed to it. "
            "Arguments must be provided in JSON format."
        )

    def format(self) -> dict[str, Any]:
        # The tool_calls field is compatible with OpenAI's tool calls schema.
        return {
            "tool_calls": [tool_call.format() for tool_call in self.tool_calls],
        }

    @pydantic.model_validator(mode="before")
    @classmethod
    def validate_input(cls, data: Any):
        if isinstance(data, cls):
            return data

        # Handle case where data is a list of dicts with "name" and "args" keys
        if isinstance(data, list) and all(
            isinstance(item, dict) and "name" in item and "args" in item for item in data
        ):
            return {"tool_calls": [cls.ToolCall(**item) for item in data]}
        # Handle case where data is a dict
        elif isinstance(data, dict):
            if "tool_calls" in data:
                # Handle case where data is a dict with "tool_calls" key
                tool_calls_data = data["tool_calls"]
                if isinstance(tool_calls_data, list):
                    return {
                        "tool_calls": [
                            cls.ToolCall(**item) if isinstance(item, dict) else item for item in tool_calls_data
                        ]
                    }
            elif "name" in data and "args" in data:
                # Handle case where data is a dict with "name" and "args" keys
                return {"tool_calls": [cls.ToolCall(**data)]}

        raise ValueError(f"Received invalid value for `dspy.ToolCalls`: {data}")


def _resolve_json_schema_reference(schema: dict) -> dict:
    """Recursively resolve json model schema, expanding all references."""

    # If there are no definitions to resolve, return the main schema
    if "$defs" not in schema and "definitions" not in schema:
        return schema

    def resolve_refs(obj: Any) -> Any:
        if not isinstance(obj, (dict, list)):
            return obj
        if isinstance(obj, dict):
            if "$ref" in obj:
                ref_path = obj["$ref"].split("/")[-1]
                return resolve_refs(schema["$defs"][ref_path])
            return {k: resolve_refs(v) for k, v in obj.items()}

        # Must be a list
        return [resolve_refs(item) for item in obj]

    # Resolve all references in the main schema
    resolved_schema = resolve_refs(schema)
    # Remove the $defs key as it's no longer needed
    resolved_schema.pop("$defs", None)
    return resolved_schema


def convert_input_schema_to_tool_args(
    schema: dict[str, Any],
) -> tuple[dict[str, Any], dict[str, Type], dict[str, str]]:
    """Convert an input json schema to tool arguments compatible with DSPy Tool.

    Args:
        schema: An input json schema describing the tool's input parameters

    Returns:
        A tuple of (args, arg_types, arg_desc) for DSPy Tool definition.
    """
    args, arg_types, arg_desc = {}, {}, {}
    properties = schema.get("properties", None)
    if properties is None:
        return args, arg_types, arg_desc

    required = schema.get("required", [])

    defs = schema.get("$defs", {})

    for name, prop in properties.items():
        if len(defs) > 0:
            prop = _resolve_json_schema_reference({"$defs": defs, **prop})
        args[name] = prop
        arg_types[name] = _TYPE_MAPPING.get(prop.get("type"), Any)
        arg_desc[name] = prop.get("description", "No description provided.")
        if name in required:
            arg_desc[name] += " (Required)"

    return args, arg_types, arg_desc

```
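
A small, hedged usage sketch for the classes above, assuming `Tool` and `ToolCalls` are exported at the package top level as `dspy.Tool` and `dspy.ToolCalls`; the weather tool and its arguments are made up for illustration.

```python
import dspy

def get_weather(city: str) -> str:
    """Return a canned weather string (hypothetical tool)."""
    return f"It is sunny in {city}."

tool = dspy.Tool(get_weather)
print(tool.args)           # {'city': {'type': 'string'}}
print(tool(city="Tokyo"))  # args are schema-validated, parsed, then dispatched to the function

# Replaying model-produced tool calls against a known set of tools:
calls = dspy.ToolCalls.from_dict_list([{"name": "get_weather", "args": {"city": "Paris"}}])
for call in calls.tool_calls:
    print(call.execute(functions=[tool]))  # looks the tool up by name and runs it
```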