This is page 8 of 14. Use http://codebase.md/stanfordnlp/dspy?page={x} to view the full context. # Directory Structure ``` ├── .github │ ├── .internal_dspyai │ │ ├── internals │ │ │ ├── build-and-release.md │ │ │ └── release-checklist.md │ │ └── pyproject.toml │ ├── .tmp │ │ └── .generated-actions │ │ └── run-pypi-publish-in-docker-container │ │ └── action.yml │ ├── ISSUE_TEMPLATE │ │ ├── bug_report.yml │ │ └── feature_request.yml │ ├── PULL_REQUEST_TEMPLATE │ │ └── pull_request_template.md │ ├── workflow_scripts │ │ └── install_testpypi_pkg.sh │ └── workflows │ ├── build_and_release.yml │ ├── build_utils │ │ └── test_version.py │ ├── docs-push.yml │ ├── precommits_check.yml │ └── run_tests.yml ├── .gitignore ├── .pre-commit-config.yaml ├── CONTRIBUTING.md ├── docs │ ├── .gitignore │ ├── docs │ │ ├── api │ │ │ ├── adapters │ │ │ │ ├── Adapter.md │ │ │ │ ├── ChatAdapter.md │ │ │ │ ├── JSONAdapter.md │ │ │ │ └── TwoStepAdapter.md │ │ │ ├── evaluation │ │ │ │ ├── answer_exact_match.md │ │ │ │ ├── answer_passage_match.md │ │ │ │ ├── CompleteAndGrounded.md │ │ │ │ ├── Evaluate.md │ │ │ │ ├── EvaluationResult.md │ │ │ │ └── SemanticF1.md │ │ │ ├── experimental │ │ │ │ ├── Citations.md │ │ │ │ └── Document.md │ │ │ ├── index.md │ │ │ ├── models │ │ │ │ ├── Embedder.md │ │ │ │ └── LM.md │ │ │ ├── modules │ │ │ │ ├── BestOfN.md │ │ │ │ ├── ChainOfThought.md │ │ │ │ ├── CodeAct.md │ │ │ │ ├── Module.md │ │ │ │ ├── MultiChainComparison.md │ │ │ │ ├── Parallel.md │ │ │ │ ├── Predict.md │ │ │ │ ├── ProgramOfThought.md │ │ │ │ ├── ReAct.md │ │ │ │ └── Refine.md │ │ │ ├── optimizers │ │ │ │ ├── BetterTogether.md │ │ │ │ ├── BootstrapFewShot.md │ │ │ │ ├── BootstrapFewShotWithRandomSearch.md │ │ │ │ ├── BootstrapFinetune.md │ │ │ │ ├── BootstrapRS.md │ │ │ │ ├── COPRO.md │ │ │ │ ├── Ensemble.md │ │ │ │ ├── GEPA │ │ │ │ │ ├── GEPA_Advanced.md │ │ │ │ │ └── overview.md │ │ │ │ ├── InferRules.md │ │ │ │ ├── KNN.md │ │ │ │ ├── KNNFewShot.md │ │ │ │ ├── LabeledFewShot.md │ │ │ │ ├── MIPROv2.md │ │ │ │ └── SIMBA.md │ │ │ ├── primitives │ │ │ │ ├── Audio.md │ │ │ │ ├── Code.md │ │ │ │ ├── Example.md │ │ │ │ ├── History.md │ │ │ │ ├── Image.md │ │ │ │ ├── Prediction.md │ │ │ │ ├── Tool.md │ │ │ │ └── ToolCalls.md │ │ │ ├── signatures │ │ │ │ ├── InputField.md │ │ │ │ ├── OutputField.md │ │ │ │ └── Signature.md │ │ │ ├── tools │ │ │ │ ├── ColBERTv2.md │ │ │ │ ├── Embeddings.md │ │ │ │ └── PythonInterpreter.md │ │ │ └── utils │ │ │ ├── asyncify.md │ │ │ ├── configure_cache.md │ │ │ ├── disable_litellm_logging.md │ │ │ ├── disable_logging.md │ │ │ ├── enable_litellm_logging.md │ │ │ ├── enable_logging.md │ │ │ ├── inspect_history.md │ │ │ ├── load.md │ │ │ ├── StatusMessage.md │ │ │ ├── StatusMessageProvider.md │ │ │ ├── streamify.md │ │ │ └── StreamListener.md │ │ ├── cheatsheet.md │ │ ├── community │ │ │ ├── community-resources.md │ │ │ ├── how-to-contribute.md │ │ │ └── use-cases.md │ │ ├── deep-dive │ │ │ └── data-handling │ │ │ ├── built-in-datasets.md │ │ │ ├── examples.md │ │ │ ├── img │ │ │ │ └── data-loading.png │ │ │ └── loading-custom-data.md │ │ ├── faqs.md │ │ ├── index.md │ │ ├── js │ │ │ └── runllm-widget.js │ │ ├── learn │ │ │ ├── evaluation │ │ │ │ ├── data.md │ │ │ │ ├── metrics.md │ │ │ │ └── overview.md │ │ │ ├── figures │ │ │ │ ├── native_tool_call.png │ │ │ │ └── teleprompter-classes.png │ │ │ ├── index.md │ │ │ ├── optimization │ │ │ │ ├── optimizers.md │ │ │ │ └── overview.md │ │ │ └── programming │ │ │ ├── 7-assertions.md │ │ │ ├── adapters.md │ │ │ ├── language_models.md │ │ │ ├── mcp.md │ 
│ │ ├── modules.md │ │ │ ├── overview.md │ │ │ ├── signatures.md │ │ │ └── tools.md │ │ ├── production │ │ │ └── index.md │ │ ├── roadmap.md │ │ ├── static │ │ │ ├── .nojekyll │ │ │ └── img │ │ │ ├── dspy_logo.png │ │ │ ├── logo.png │ │ │ ├── mlflow-tracing-rag.png │ │ │ ├── modular.png │ │ │ ├── optimize.png │ │ │ ├── undraw_docusaurus_mountain.svg │ │ │ ├── undraw_docusaurus_react.svg │ │ │ ├── undraw_docusaurus_tree.svg │ │ │ └── universal_compatibility.png │ │ ├── stylesheets │ │ │ └── extra.css │ │ └── tutorials │ │ ├── agents │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-agent.png │ │ ├── ai_text_game │ │ │ └── index.md │ │ ├── async │ │ │ └── index.md │ │ ├── audio │ │ │ └── index.ipynb │ │ ├── build_ai_program │ │ │ └── index.md │ │ ├── cache │ │ │ └── index.md │ │ ├── classification │ │ │ └── index.md │ │ ├── classification_finetuning │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-classification.png │ │ ├── conversation_history │ │ │ └── index.md │ │ ├── core_development │ │ │ └── index.md │ │ ├── custom_module │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-custom-module.png │ │ ├── customer_service_agent │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-customer-service-agent.png │ │ ├── deployment │ │ │ ├── dspy_mlflow_ui.png │ │ │ └── index.md │ │ ├── email_extraction │ │ │ ├── index.md │ │ │ └── mlflow-tracing-email-extraction.png │ │ ├── entity_extraction │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-entity-extraction.png │ │ ├── games │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-agent.png │ │ ├── gepa_ai_program │ │ │ └── index.md │ │ ├── gepa_aime │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-aime.png │ │ │ └── mlflow-tracking-gepa-aime-optimization.png │ │ ├── gepa_facilitysupportanalyzer │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-support.png │ │ │ └── mlflow-tracking-gepa-support-optimization.png │ │ ├── gepa_papillon │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-papilon.png │ │ │ └── mlflow-tracking-gepa-papilon-optimization.png │ │ ├── image_generation_prompting │ │ │ └── index.ipynb │ │ ├── index.md │ │ ├── llms_txt_generation │ │ │ └── index.md │ │ ├── math │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-math.png │ │ ├── mcp │ │ │ └── index.md │ │ ├── mem0_react_agent │ │ │ └── index.md │ │ ├── multihop_search │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-multi-hop.png │ │ ├── observability │ │ │ ├── index.md │ │ │ ├── mlflow_trace_ui_navigation.gif │ │ │ ├── mlflow_trace_ui.png │ │ │ └── mlflow_trace_view.png │ │ ├── optimize_ai_program │ │ │ └── index.md │ │ ├── optimizer_tracking │ │ │ ├── child_run.png │ │ │ ├── experiment.png │ │ │ ├── index.md │ │ │ └── parent_run.png │ │ ├── output_refinement │ │ │ └── best-of-n-and-refine.md │ │ ├── papillon │ │ │ └── index.md │ │ ├── program_of_thought │ │ │ └── index.ipynb │ │ ├── rag │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-rag.png │ │ ├── real_world_examples │ │ │ └── index.md │ │ ├── rl_ai_program │ │ │ └── index.md │ │ ├── rl_multihop │ │ │ └── index.ipynb │ │ ├── rl_papillon │ │ │ └── index.ipynb │ │ ├── sample_code_generation │ │ │ └── index.md │ │ ├── saving │ │ │ └── index.md │ │ ├── streaming │ │ │ └── index.md │ │ ├── tool_use │ │ │ └── index.ipynb │ │ └── yahoo_finance_react │ │ └── index.md │ ├── mkdocs.yml │ ├── overrides │ │ ├── home.html │ │ ├── main.html │ │ └── partials │ │ └── tabs.html │ ├── Pipfile │ ├── Pipfile.lock │ ├── README.md │ ├── requirements.txt │ ├── scripts │ │ ├── generate_api_docs.py │ │ └── generate_api_summary.py │ └── vercel.json ├── dspy │ ├── __init__.py │ 
├── __metadata__.py │ ├── adapters │ │ ├── __init__.py │ │ ├── baml_adapter.py │ │ ├── base.py │ │ ├── chat_adapter.py │ │ ├── json_adapter.py │ │ ├── two_step_adapter.py │ │ ├── types │ │ │ ├── __init__.py │ │ │ ├── audio.py │ │ │ ├── base_type.py │ │ │ ├── citation.py │ │ │ ├── code.py │ │ │ ├── document.py │ │ │ ├── history.py │ │ │ ├── image.py │ │ │ └── tool.py │ │ ├── utils.py │ │ └── xml_adapter.py │ ├── clients │ │ ├── __init__.py │ │ ├── base_lm.py │ │ ├── cache.py │ │ ├── databricks.py │ │ ├── embedding.py │ │ ├── lm_local_arbor.py │ │ ├── lm_local.py │ │ ├── lm.py │ │ ├── openai.py │ │ ├── provider.py │ │ └── utils_finetune.py │ ├── datasets │ │ ├── __init__.py │ │ ├── alfworld │ │ │ ├── __init__.py │ │ │ ├── alfworld.py │ │ │ └── base_config.yml │ │ ├── colors.py │ │ ├── dataloader.py │ │ ├── dataset.py │ │ ├── gsm8k.py │ │ ├── hotpotqa.py │ │ └── math.py │ ├── dsp │ │ ├── __init__.py │ │ ├── colbertv2.py │ │ └── utils │ │ ├── __init__.py │ │ ├── dpr.py │ │ ├── settings.py │ │ └── utils.py │ ├── evaluate │ │ ├── __init__.py │ │ ├── auto_evaluation.py │ │ ├── evaluate.py │ │ └── metrics.py │ ├── experimental │ │ └── __init__.py │ ├── predict │ │ ├── __init__.py │ │ ├── aggregation.py │ │ ├── avatar │ │ │ ├── __init__.py │ │ │ ├── avatar.py │ │ │ ├── models.py │ │ │ └── signatures.py │ │ ├── best_of_n.py │ │ ├── chain_of_thought.py │ │ ├── code_act.py │ │ ├── knn.py │ │ ├── multi_chain_comparison.py │ │ ├── parallel.py │ │ ├── parameter.py │ │ ├── predict.py │ │ ├── program_of_thought.py │ │ ├── react.py │ │ ├── refine.py │ │ └── retry.py │ ├── primitives │ │ ├── __init__.py │ │ ├── base_module.py │ │ ├── example.py │ │ ├── module.py │ │ ├── prediction.py │ │ ├── python_interpreter.py │ │ └── runner.js │ ├── propose │ │ ├── __init__.py │ │ ├── dataset_summary_generator.py │ │ ├── grounded_proposer.py │ │ ├── propose_base.py │ │ └── utils.py │ ├── retrievers │ │ ├── __init__.py │ │ ├── databricks_rm.py │ │ ├── embeddings.py │ │ ├── retrieve.py │ │ └── weaviate_rm.py │ ├── signatures │ │ ├── __init__.py │ │ ├── field.py │ │ ├── signature.py │ │ └── utils.py │ ├── streaming │ │ ├── __init__.py │ │ ├── messages.py │ │ ├── streamify.py │ │ └── streaming_listener.py │ ├── teleprompt │ │ ├── __init__.py │ │ ├── avatar_optimizer.py │ │ ├── bettertogether.py │ │ ├── bootstrap_finetune.py │ │ ├── bootstrap_trace.py │ │ ├── bootstrap.py │ │ ├── copro_optimizer.py │ │ ├── ensemble.py │ │ ├── gepa │ │ │ ├── __init__.py │ │ │ ├── gepa_utils.py │ │ │ ├── gepa.py │ │ │ └── instruction_proposal.py │ │ ├── grpo.py │ │ ├── infer_rules.py │ │ ├── knn_fewshot.py │ │ ├── mipro_optimizer_v2.py │ │ ├── random_search.py │ │ ├── signature_opt.py │ │ ├── simba_utils.py │ │ ├── simba.py │ │ ├── teleprompt_optuna.py │ │ ├── teleprompt.py │ │ ├── utils.py │ │ └── vanilla.py │ └── utils │ ├── __init__.py │ ├── annotation.py │ ├── asyncify.py │ ├── caching.py │ ├── callback.py │ ├── dummies.py │ ├── exceptions.py │ ├── hasher.py │ ├── inspect_history.py │ ├── langchain_tool.py │ ├── logging_utils.py │ ├── mcp.py │ ├── parallelizer.py │ ├── saving.py │ ├── syncify.py │ ├── unbatchify.py │ └── usage_tracker.py ├── LICENSE ├── pyproject.toml ├── README.md ├── tests │ ├── __init__.py │ ├── adapters │ │ ├── test_adapter_utils.py │ │ ├── test_baml_adapter.py │ │ ├── test_base_type.py │ │ ├── test_chat_adapter.py │ │ ├── test_citation.py │ │ ├── test_code.py │ │ ├── test_document.py │ │ ├── test_json_adapter.py │ │ ├── test_tool.py │ │ ├── test_two_step_adapter.py │ │ └── test_xml_adapter.py │ ├── callback │ │ └── 
test_callback.py │ ├── clients │ │ ├── test_cache.py │ │ ├── test_databricks.py │ │ ├── test_embedding.py │ │ ├── test_inspect_global_history.py │ │ └── test_lm.py │ ├── conftest.py │ ├── datasets │ │ └── test_dataset.py │ ├── docs │ │ └── test_mkdocs_links.py │ ├── evaluate │ │ ├── test_evaluate.py │ │ └── test_metrics.py │ ├── examples │ │ └── test_baleen.py │ ├── metadata │ │ └── test_metadata.py │ ├── predict │ │ ├── test_aggregation.py │ │ ├── test_best_of_n.py │ │ ├── test_chain_of_thought.py │ │ ├── test_code_act.py │ │ ├── test_knn.py │ │ ├── test_multi_chain_comparison.py │ │ ├── test_parallel.py │ │ ├── test_predict.py │ │ ├── test_program_of_thought.py │ │ ├── test_react.py │ │ ├── test_refine.py │ │ └── test_retry.py │ ├── primitives │ │ ├── resources │ │ │ └── saved_program.json │ │ ├── test_base_module.py │ │ ├── test_example.py │ │ ├── test_module.py │ │ └── test_python_interpreter.py │ ├── propose │ │ └── test_grounded_proposer.py │ ├── README.md │ ├── reliability │ │ ├── __init__.py │ │ ├── complex_types │ │ │ └── generated │ │ │ ├── test_many_types_1 │ │ │ │ ├── inputs │ │ │ │ │ ├── input1.json │ │ │ │ │ └── input2.json │ │ │ │ ├── program.py │ │ │ │ └── schema.json │ │ │ ├── test_nesting_1 │ │ │ │ ├── inputs │ │ │ │ │ ├── input1.json │ │ │ │ │ └── input2.json │ │ │ │ ├── program.py │ │ │ │ └── schema.json │ │ │ └── test_nesting_2 │ │ │ ├── inputs │ │ │ │ └── input1.json │ │ │ ├── program.py │ │ │ └── schema.json │ │ ├── conftest.py │ │ ├── generate │ │ │ ├── __init__.py │ │ │ ├── __main__.py │ │ │ └── utils.py │ │ ├── input_formats │ │ │ └── generated │ │ │ └── test_markdown_1 │ │ │ ├── inputs │ │ │ │ ├── input1.json │ │ │ │ └── input2.json │ │ │ ├── program.py │ │ │ └── schema.json │ │ ├── README.md │ │ ├── reliability_conf.yaml │ │ ├── test_generated.py │ │ ├── test_pydantic_models.py │ │ └── utils.py │ ├── retrievers │ │ └── test_embeddings.py │ ├── signatures │ │ ├── test_adapter_image.py │ │ ├── test_custom_types.py │ │ └── test_signature.py │ ├── streaming │ │ └── test_streaming.py │ ├── teleprompt │ │ ├── gepa_dummy_lm_custom_component_selector_custom_instruction_proposer.json │ │ ├── gepa_dummy_lm.json │ │ ├── test_bootstrap_finetune.py │ │ ├── test_bootstrap_trace.py │ │ ├── test_bootstrap.py │ │ ├── test_copro_optimizer.py │ │ ├── test_ensemble.py │ │ ├── test_finetune.py │ │ ├── test_gepa_instruction_proposer.py │ │ ├── test_gepa.py │ │ ├── test_grpo.py │ │ ├── test_knn_fewshot.py │ │ ├── test_random_search.py │ │ ├── test_teleprompt.py │ │ └── test_utils.py │ ├── test_utils │ │ ├── __init__.py │ │ └── server │ │ ├── __init__.py │ │ ├── litellm_server_config.yaml │ │ └── litellm_server.py │ └── utils │ ├── __init__.py │ ├── resources │ │ └── mcp_server.py │ ├── test_annotation.py │ ├── test_asyncify.py │ ├── test_exceptions.py │ ├── test_langchain_tool.py │ ├── test_mcp.py │ ├── test_parallelizer.py │ ├── test_saving.py │ ├── test_settings.py │ ├── test_syncify.py │ ├── test_unbatchify.py │ └── test_usage_tracker.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /dspy/clients/databricks.py: -------------------------------------------------------------------------------- ```python import logging import os import re import time from typing import TYPE_CHECKING, Any import orjson import requests from dspy.clients.provider import Provider, TrainingJob from dspy.clients.utils_finetune import TrainDataFormat, get_finetune_directory if TYPE_CHECKING: from databricks.sdk import WorkspaceClient logger = 
logging.getLogger(__name__) class TrainingJobDatabricks(TrainingJob): def __init__(self, finetuning_run=None, *args, **kwargs): super().__init__(*args, **kwargs) self.finetuning_run = finetuning_run self.launch_started = False self.launch_completed = False self.endpoint_name = None def status(self): if not self.finetuning_run: return None try: from databricks.model_training import foundation_model as fm except ImportError: raise ImportError( "To use Databricks finetuning, please install the databricks_genai package via " "`pip install databricks_genai`." ) run = fm.get(self.finetuning_run) return run.status class DatabricksProvider(Provider): finetunable = True TrainingJob = TrainingJobDatabricks @staticmethod def is_provider_model(model: str) -> bool: # We don't automatically infer Databricks models because Databricks is not a proprietary model provider. return False @staticmethod def deploy_finetuned_model( model: str, data_format: TrainDataFormat | None = None, databricks_host: str | None = None, databricks_token: str | None = None, deploy_timeout: int = 900, ): workspace_client = _get_workspace_client() model_version = next(workspace_client.model_versions.list(model)).version # Allow users to override the host and token. This is useful on Databricks hosted runtime. databricks_host = databricks_host or workspace_client.config.host databricks_token = databricks_token or workspace_client.config.token headers = {"Context-Type": "text/json", "Authorization": f"Bearer {databricks_token}"} optimizable_info = requests.get( url=f"{databricks_host}/api/2.0/serving-endpoints/get-model-optimization-info/{model}/{model_version}", headers=headers, ).json() if "optimizable" not in optimizable_info or not optimizable_info["optimizable"]: raise ValueError(f"Model is not eligible for provisioned throughput: {optimizable_info}") chunk_size = optimizable_info["throughput_chunk_size"] # Minimum desired provisioned throughput min_provisioned_throughput = 0 # Maximum desired provisioned throughput max_provisioned_throughput = chunk_size # Databricks serving endpoint names cannot contain ".". model_name = model.replace(".", "_") get_endpoint_response = requests.get( url=f"{databricks_host}/api/2.0/serving-endpoints/{model_name}", json={"name": model_name}, headers=headers ) if get_endpoint_response.status_code == 200: logger.info(f"Serving endpoint {model_name} already exists, updating it instead of creating a new one.") # The serving endpoint already exists, we will update it instead of creating a new one. data = { "served_entities": [ { "name": model_name, "entity_name": model, "entity_version": model_version, "min_provisioned_throughput": min_provisioned_throughput, "max_provisioned_throughput": max_provisioned_throughput, } ] } response = requests.put( url=f"{databricks_host}/api/2.0/serving-endpoints/{model_name}/config", json=data, headers=headers, ) else: logger.info(f"Creating serving endpoint {model_name} on Databricks model serving!") # Send the POST request to create the serving endpoint. data = { "name": model_name, "config": { "served_entities": [ { "name": model_name, "entity_name": model, "entity_version": model_version, "min_provisioned_throughput": min_provisioned_throughput, "max_provisioned_throughput": max_provisioned_throughput, } ] }, } response = requests.post(url=f"{databricks_host}/api/2.0/serving-endpoints", json=data, headers=headers) if response.status_code == 200: logger.info( f"Successfully started creating/updating serving endpoint {model_name} on Databricks model serving!" 
) else: raise ValueError(f"Failed to create serving endpoint: {response.json()}.") logger.info( f"Waiting for serving endpoint {model_name} to be ready, this might take a few minutes... You can check " f"the status of the endpoint at {databricks_host}/ml/endpoints/{model_name}" ) from openai import OpenAI client = OpenAI( api_key=databricks_token, base_url=f"{databricks_host}/serving-endpoints", ) # Wait for the deployment to be ready. num_retries = deploy_timeout // 60 for _ in range(num_retries): try: if data_format == TrainDataFormat.CHAT: client.chat.completions.create( messages=[{"role": "user", "content": "hi"}], model=model_name, max_tokens=1 ) elif data_format == TrainDataFormat.COMPLETION: client.completions.create(prompt="hi", model=model_name, max_tokens=1) logger.info(f"Databricks model serving endpoint {model_name} is ready!") return except Exception: time.sleep(60) raise ValueError( f"Failed to create serving endpoint {model_name} on Databricks model serving platform within " f"{deploy_timeout} seconds." ) @staticmethod def finetune( job: TrainingJobDatabricks, model: str, train_data: list[dict[str, Any]], train_data_format: TrainDataFormat | str | None = "chat", train_kwargs: dict[str, Any] | None = None, ) -> str: if isinstance(train_data_format, str): if train_data_format == "chat": train_data_format = TrainDataFormat.CHAT elif train_data_format == "completion": train_data_format = TrainDataFormat.COMPLETION else: raise ValueError( f"String `train_data_format` must be one of 'chat' or 'completion', but received: {train_data_format}." ) if "train_data_path" not in train_kwargs: raise ValueError("The `train_data_path` must be provided to finetune on Databricks.") # Add the file name to the directory path. train_kwargs["train_data_path"] = DatabricksProvider.upload_data( train_data, train_kwargs["train_data_path"], train_data_format ) try: from databricks.model_training import foundation_model as fm except ImportError: raise ImportError( "To use Databricks finetuning, please install the databricks_genai package via " "`pip install databricks_genai`." ) if "register_to" not in train_kwargs: raise ValueError("The `register_to` must be provided to finetune on Databricks.") # Allow users to override the host and token. This is useful on Databricks hosted runtime. databricks_host = train_kwargs.pop("databricks_host", None) databricks_token = train_kwargs.pop("databricks_token", None) skip_deploy = train_kwargs.pop("skip_deploy", False) deploy_timeout = train_kwargs.pop("deploy_timeout", 900) logger.info("Starting finetuning on Databricks... this might take a few minutes to finish.") finetuning_run = fm.create( model=model, **train_kwargs, ) job.run = finetuning_run # Wait for the finetuning run to be ready. while True: job.run = fm.get(job.run) if job.run.status.display_name == "Completed": logger.info("Finetuning run completed successfully!") break elif job.run.status.display_name == "Failed": raise ValueError( f"Finetuning run failed with status: {job.run.status.display_name}. Please check the Databricks " f"workspace for more details. Finetuning job's metadata: {job.run}." 
) else: time.sleep(60) if skip_deploy: return None job.launch_started = True model_to_deploy = train_kwargs.get("register_to") job.endpoint_name = model_to_deploy.replace(".", "_") DatabricksProvider.deploy_finetuned_model( model_to_deploy, train_data_format, databricks_host, databricks_token, deploy_timeout ) job.launch_completed = True # The finetuned model name should be in the format: "databricks/<endpoint_name>". return f"databricks/{job.endpoint_name}" @staticmethod def upload_data(train_data: list[dict[str, Any]], databricks_unity_catalog_path: str, data_format: TrainDataFormat): logger.info("Uploading finetuning data to Databricks Unity Catalog...") file_path = _save_data_to_local_file(train_data, data_format) w = _get_workspace_client() _create_directory_in_databricks_unity_catalog(w, databricks_unity_catalog_path) try: with open(file_path, "rb") as f: target_path = os.path.join(databricks_unity_catalog_path, os.path.basename(file_path)) w.files.upload(target_path, f, overwrite=True) logger.info("Successfully uploaded finetuning data to Databricks Unity Catalog!") return target_path except Exception as e: raise ValueError(f"Failed to upload finetuning data to Databricks Unity Catalog: {e}") def _get_workspace_client() -> "WorkspaceClient": try: from databricks.sdk import WorkspaceClient except ImportError: raise ImportError( "To use Databricks finetuning, please install the databricks-sdk package via `pip install databricks-sdk`." ) return WorkspaceClient() def _create_directory_in_databricks_unity_catalog(w: "WorkspaceClient", databricks_unity_catalog_path: str): pattern = r"^/Volumes/(?P<catalog>[^/]+)/(?P<schema>[^/]+)/(?P<volume>[^/]+)(/[^/]+)+$" match = re.match(pattern, databricks_unity_catalog_path) if not match: raise ValueError( f"Databricks Unity Catalog path must be in the format '/Volumes/<catalog>/<schema>/<volume>/...', but " f"received: {databricks_unity_catalog_path}." ) catalog = match.group("catalog") schema = match.group("schema") volume = match.group("volume") try: volume_path = f"{catalog}.{schema}.{volume}" w.volumes.read(volume_path) except Exception: raise ValueError( f"Databricks Unity Catalog volume does not exist: {volume_path}, please create it on the Databricks " "workspace." ) try: w.files.get_directory_metadata(databricks_unity_catalog_path) logger.info(f"Directory {databricks_unity_catalog_path} already exists, skip creating it.") except Exception: # Create the directory if it doesn't exist, we don't raise an error because this is a common case. 
logger.info(f"Creating directory {databricks_unity_catalog_path} in Databricks Unity Catalog...") w.files.create_directory(databricks_unity_catalog_path) logger.info(f"Successfully created directory {databricks_unity_catalog_path} in Databricks Unity Catalog!") def _save_data_to_local_file(train_data: list[dict[str, Any]], data_format: TrainDataFormat): import uuid file_name = f"finetuning_{uuid.uuid4()}.jsonl" finetune_dir = get_finetune_directory() file_path = os.path.join(finetune_dir, file_name) file_path = os.path.abspath(file_path) with open(file_path, "wb") as f: for item in train_data: if data_format == TrainDataFormat.CHAT: _validate_chat_data(item) elif data_format == TrainDataFormat.COMPLETION: _validate_completion_data(item) f.write(orjson.dumps(item) + b"\n") return file_path def _validate_chat_data(data: dict[str, Any]): if "messages" not in data: raise ValueError( "Each finetuning data must be a dict with a 'messages' key when `task=CHAT_COMPLETION`, but " f"received: {data}" ) if not isinstance(data["messages"], list): raise ValueError( "The value of the 'messages' key in each finetuning data must be a list of dicts with keys 'role' and " f"'content' when `task=CHAT_COMPLETION`, but received: {data['messages']}" ) for message in data["messages"]: if "role" not in message: raise ValueError(f"Each message in the 'messages' list must contain a 'role' key, but received: {message}.") if "content" not in message: raise ValueError( f"Each message in the 'messages' list must contain a 'content' key, but received: {message}." ) def _validate_completion_data(data: dict[str, Any]): if "prompt" not in data: raise ValueError( "Each finetuning data must be a dict with a 'prompt' key when `task=INSTRUCTION_FINETUNE`, but " f"received: {data}" ) if "response" not in data and "completion" not in data: raise ValueError( "Each finetuning data must be a dict with a 'response' or 'completion' key when " f"`task=INSTRUCTION_FINETUNE`, but received: {data}" ) ``` -------------------------------------------------------------------------------- /docs/mkdocs.yml: -------------------------------------------------------------------------------- ```yaml site_name: DSPy site_description: The framework for programming—rather than prompting—language models. 
site_url: https://dspy.ai/ repo_url: https://github.com/stanfordnlp/dspy repo_name: stanfordnlp/dspy edit_uri: blob/main/docs/docs/ docs_dir: "docs/" nav: - Get Started: index.md - Learn DSPy: - Learning DSPy: learn/index.md - DSPy Programming: - Programming Overview: learn/programming/overview.md - Language Models: learn/programming/language_models.md - Signatures: learn/programming/signatures.md - Modules: learn/programming/modules.md - Adapters: learn/programming/adapters.md - Tools: learn/programming/tools.md - MCP: learn/programming/mcp.md - DSPy Evaluation: - Evaluation Overview: learn/evaluation/overview.md - Data Handling: learn/evaluation/data.md - Metrics: learn/evaluation/metrics.md - DSPy Optimization: - Optimization Overview: learn/optimization/overview.md - Optimizers: learn/optimization/optimizers.md - Tutorials: - Tutorials Overview: tutorials/index.md - Build AI Programs with DSPy: - Overview: tutorials/build_ai_program/index.md - Managing Conversation History: tutorials/conversation_history/index.md - Building AI Agents with DSPy: tutorials/customer_service_agent/index.ipynb - Building AI Applications by Customizing DSPy Modules: tutorials/custom_module/index.ipynb - Retrieval-Augmented Generation (RAG): tutorials/rag/index.ipynb - Building RAG as Agent: tutorials/agents/index.ipynb - Entity Extraction: tutorials/entity_extraction/index.ipynb - Classification: tutorials/classification/index.md - Multi-Hop RAG: tutorials/multihop_search/index.ipynb - Privacy-Conscious Delegation: tutorials/papillon/index.md - Program Of Thought: tutorials/program_of_thought/index.ipynb - Image Generation Prompt iteration: tutorials/image_generation_prompting/index.ipynb - Audio: tutorials/audio/index.ipynb - Optimize AI Programs with DSPy: - Overview: tutorials/optimize_ai_program/index.md - Math Reasoning: tutorials/math/index.ipynb - Classification Finetuning: tutorials/classification_finetuning/index.ipynb - Advanced Tool Use: tutorials/tool_use/index.ipynb - Finetuning Agents: tutorials/games/index.ipynb - Reflective Prompt Evolution with dspy.GEPA: - Overview: tutorials/gepa_ai_program/index.md - GEPA for AIME (Math): tutorials/gepa_aime/index.ipynb - GEPA for Structured Information Extraction for Enterprise Tasks: tutorials/gepa_facilitysupportanalyzer/index.ipynb - GEPA for Privacy-Conscious Delegation: tutorials/gepa_papillon/index.ipynb - Experimental RL Optimization for DSPy: - Overview: tutorials/rl_ai_program/index.md - RL for Privacy-Conscious Delegation: tutorials/rl_papillon/index.ipynb - RL for Multi-Hop Research: tutorials/rl_multihop/index.ipynb - Tools, Development, and Deployment: - Overview: tutorials/core_development/index.md - Use MCP in DSPy: tutorials/mcp/index.md - Output Refinement: tutorials/output_refinement/best-of-n-and-refine.md - Saving and Loading: tutorials/saving/index.md - Cache: tutorials/cache/index.md - Deployment: tutorials/deployment/index.md - Debugging & Observability: tutorials/observability/index.md - Tracking DSPy Optimizers: tutorials/optimizer_tracking/index.md - Streaming: tutorials/streaming/index.md - Async: tutorials/async/index.md - Real-World Examples: - Overview: tutorials/real_world_examples/index.md - Generating llms.txt: tutorials/llms_txt_generation/index.md - Memory-Enabled ReAct Agents: tutorials/mem0_react_agent/index.md - Financial Analysis with Yahoo Finance: tutorials/yahoo_finance_react/index.md - Email Information Extraction: tutorials/email_extraction/index.md - Code Generation for Unfamiliar Libraries: 
tutorials/sample_code_generation/index.md - Building a Creative Text-Based AI Game: tutorials/ai_text_game/index.md - DSPy in Production: production/index.md - Community: - Community Resources: community/community-resources.md - Use Cases: community/use-cases.md - Contributing: community/how-to-contribute.md - FAQ: - FAQ: faqs.md - Cheatsheet: cheatsheet.md - API Reference: - API Reference: api/index.md - Adapters: - Adapter: api/adapters/Adapter.md - ChatAdapter: api/adapters/ChatAdapter.md - JSONAdapter: api/adapters/JSONAdapter.md - TwoStepAdapter: api/adapters/TwoStepAdapter.md - Evaluation: - CompleteAndGrounded: api/evaluation/CompleteAndGrounded.md - Evaluate: api/evaluation/Evaluate.md - EvaluationResult: api/evaluation/EvaluationResult.md - SemanticF1: api/evaluation/SemanticF1.md - answer_exact_match: api/evaluation/answer_exact_match.md - answer_passage_match: api/evaluation/answer_passage_match.md - Experimental: - Citations: api/experimental/Citations.md - Document: api/experimental/Document.md - Models: - Embedder: api/models/Embedder.md - LM: api/models/LM.md - Modules: - BestOfN: api/modules/BestOfN.md - ChainOfThought: api/modules/ChainOfThought.md - CodeAct: api/modules/CodeAct.md - Module: api/modules/Module.md - MultiChainComparison: api/modules/MultiChainComparison.md - Parallel: api/modules/Parallel.md - Predict: api/modules/Predict.md - ProgramOfThought: api/modules/ProgramOfThought.md - ReAct: api/modules/ReAct.md - Refine: api/modules/Refine.md - Optimizers: - GEPA: - 1. GEPA Overview: api/optimizers/GEPA/overview.md - 2. GEPA Advanced: api/optimizers/GEPA/GEPA_Advanced.md - BetterTogether: api/optimizers/BetterTogether.md - BootstrapFewShot: api/optimizers/BootstrapFewShot.md - BootstrapFewShotWithRandomSearch: api/optimizers/BootstrapFewShotWithRandomSearch.md - BootstrapFinetune: api/optimizers/BootstrapFinetune.md - BootstrapRS: api/optimizers/BootstrapRS.md - COPRO: api/optimizers/COPRO.md - Ensemble: api/optimizers/Ensemble.md - InferRules: api/optimizers/InferRules.md - KNN: api/optimizers/KNN.md - KNNFewShot: api/optimizers/KNNFewShot.md - LabeledFewShot: api/optimizers/LabeledFewShot.md - MIPROv2: api/optimizers/MIPROv2.md - SIMBA: api/optimizers/SIMBA.md - Primitives: - Audio: api/primitives/Audio.md - Code: api/primitives/Code.md - Example: api/primitives/Example.md - History: api/primitives/History.md - Image: api/primitives/Image.md - Prediction: api/primitives/Prediction.md - Tool: api/primitives/Tool.md - ToolCalls: api/primitives/ToolCalls.md - Signatures: - InputField: api/signatures/InputField.md - OutputField: api/signatures/OutputField.md - Signature: api/signatures/Signature.md - Tools: - ColBERTv2: api/tools/ColBERTv2.md - Embeddings: api/tools/Embeddings.md - PythonInterpreter: api/tools/PythonInterpreter.md - Utils: - StatusMessage: api/utils/StatusMessage.md - StatusMessageProvider: api/utils/StatusMessageProvider.md - StreamListener: api/utils/StreamListener.md - asyncify: api/utils/asyncify.md - configure_cache: api/utils/configure_cache.md - disable_litellm_logging: api/utils/disable_litellm_logging.md - disable_logging: api/utils/disable_logging.md - enable_litellm_logging: api/utils/enable_litellm_logging.md - enable_logging: api/utils/enable_logging.md - inspect_history: api/utils/inspect_history.md - load: api/utils/load.md - streamify: api/utils/streamify.md theme: name: material custom_dir: overrides features: - navigation.tabs - navigation.path - navigation.indexes - navigation.expand - toc.follow - toc.integrate - navigation.top 
- search.suggest - search.highlight - content.tabs.link - content.code.annotation - content.code.copy - navigation.footer - content.action.edit language: en palette: - scheme: default toggle: icon: material/weather-night name: Switch to dark mode primary: white accent: black - scheme: slate toggle: icon: material/weather-sunny name: Switch to light mode primary: black accent: lime icon: repo: fontawesome/brands/git-alt edit: material/pencil view: material/eye logo: static/img/dspy_logo.png favicon: static/img/logo.png extra_css: - stylesheets/extra.css plugins: - social - search: lang: en separator: '[\s\-\.]+' - mkdocstrings: handlers: python: options: docstring_style: google show_source: true show_root_heading: true heading_level: 3 members_order: source separate_signature: false show_category_heading: true show_symbol_type_heading: true show_docstring_parameters: true show_if_no_docstring: true show_signature_annotations: true unwrap_annotated: true annotations_path: brief docstring_section_style: table merge_init_into_class: true rendering: show_if_no_docstring: true show_warnings: false html_meta: false - mkdocs-jupyter: ignore_h1_titles: true - redirects: redirect_maps: # Redirect /intro/ to the main page "intro/index.md": "index.md" "intro.md": "index.md" "deep-dive/optimizers/bootstrap-fewshot.md": "api/optimizers/BootstrapFewShot.md" "deep-dive/optimizers/bfrs.md": "api/optimizers/BootstrapFewShotWithRandomSearch.md" "deep-dive/optimizers/BootstrapFinetune.md": "api/optimizers/BootstrapFinetune.md" "deep-dive/optimizers/copro.md": "api/optimizers/COPRO.md" "deep-dive/optimizers/Ensemble.md": "api/optimizers/Ensemble.md" "deep-dive/optimizers/LabeledFewShot.md": "api/optimizers/LabeledFewShot.md" "deep-dive/optimizers/miprov2.md": "api/optimizers/MIPROv2.md" "api/optimizers/GEPA/index.md": "api/optimizers/GEPA/overview.md" "docs/quick-start/getting-started-01.md": "tutorials/rag/index.ipynb" "docs/quick-start/getting-started-02.md": "tutorials/rag/index.ipynb" "quick-start/getting-started-01.md": "tutorials/rag/index.ipynb" "quick-start/getting-started-02.md": "tutorials/rag/index.ipynb" - llmstxt: markdown_description: > DSPy is the framework for programming—rather than prompting—language models. DSPy unifies techniques for prompting, fine-tuning, reasoning, tool use, and evaluation of LMs. It provides a systematic approach to building AI applications through composable modules, optimization techniques, and evaluation frameworks. 
sections: Getting Started: - index.md: DSPy overview and quick start guide - cheatsheet.md: DSPy cheatsheet for quick reference Core Concepts: - learn/programming/overview.md: Programming paradigm and philosophy - learn/programming/signatures.md: Signatures - declarative input/output specifications - learn/programming/modules.md: Modules - composable AI components - learn/programming/language_models.md: Language model interfaces and configuration Essential Tutorials: - tutorials/rag/index.ipynb: Retrieval-Augmented Generation (RAG) tutorial - tutorials/classification/index.md: Classification with DSPy - tutorials/agents/index.ipynb: Building AI agents with DSPy Optimization: - learn/optimization/overview.md: Optimization techniques overview - tutorials/optimize_ai_program/index.md: Guide to optimizing AI programs - api/optimizers/BootstrapFewShot.md: Bootstrap few-shot optimizer Key Modules API: - api/modules/Predict.md: Basic prediction module - api/modules/ChainOfThought.md: Chain of thought reasoning - api/modules/ReAct.md: ReAct agent module Core API Reference: - api/signatures/Signature.md: Signature system documentation - api/primitives/Example.md: Example primitive for training data Production: - tutorials/deployment/index.md: Production deployment guide - tutorials/observability/index.md: Debugging and observability extra: social: - icon: fontawesome/brands/github link: https://github.com/stanfordnlp/dspy - icon: fontawesome/brands/discord link: https://discord.gg/XCGy2WDCQB extra_javascript: - "js/runllm-widget.js" markdown_extensions: - toc: permalink: true toc_depth: 3 - pymdownx.tabbed: alternate_style: true - pymdownx.highlight: anchor_linenums: true - pymdownx.inlinehilite - pymdownx.snippets - admonition - pymdownx.arithmatex: generic: true - footnotes - pymdownx.details - pymdownx.superfences - pymdownx.mark - attr_list - md_in_html - pymdownx.emoji: emoji_index: !!python/name:material.extensions.emoji.twemoji emoji_generator: !!python/name:material.extensions.emoji.to_svg copyright: | © 2025 <a href="https://github.com/stanfordnlp" target="_blank" rel="noopener">DSPy</a> ``` -------------------------------------------------------------------------------- /docs/docs/tutorials/mcp/index.md: -------------------------------------------------------------------------------- ```markdown # Tutorial: Use MCP tools in DSPy MCP, standing for Model Context Protocol, is an open protocol that standardizes how applications provide context to LLMs. Despite some development overhead, MCP offers a valuable opportunity to share tools, resources, and prompts with other developers regardless of the technical stack you are using. Likewise, you can use the tools built by other developers without rewriting code. In this guide, we will walk you through how to use MCP tools in DSPy. For demonstration purposes, we will build an airline service agent that can help users book flights and modify or cancel existing bookings. This will rely on an MCP server with custom tools, but it should be easy to generalize to [MCP servers built by the community](https://modelcontextprotocol.io/examples). ??? "How to run this tutorial" This tutorial cannot be run in hosted IPython notebooks like Google Colab or Databricks notebooks. To run the code, you will need to follow the guide to write code on your local device. The code is tested on macOS and should work the same way in Linux environments. 
## Install Dependencies Before starting, let's install the required dependencies: ```shell pip install -U "dspy[mcp]" ``` ## MCP Server Setup Let's first set up the MCP server for the airline agent, which contains: - A set of databases - User database, storing user information. - Flight database, storing flight information. - Ticket database, storing customer tickets. - A set of tools - fetch_flight_info: get flight information for specific dates. - fetch_itinerary: get information about booked itineraries. - book_itinerary: book a flight on behalf of the user. - modify_itinerary: modify an itinerary, either through flight changes or cancellation. - get_user_info: get user information. - file_ticket: file a backlog ticket for human assistance. In your working directory, create a file `mcp_server.py`, and paste the following content into it: ```python import random import string from mcp.server.fastmcp import FastMCP from pydantic import BaseModel # Create an MCP server mcp = FastMCP("Airline Agent") class Date(BaseModel): # Somehow LLM is bad at specifying `datetime.datetime` year: int month: int day: int hour: int class UserProfile(BaseModel): user_id: str name: str email: str class Flight(BaseModel): flight_id: str date_time: Date origin: str destination: str duration: float price: float class Itinerary(BaseModel): confirmation_number: str user_profile: UserProfile flight: Flight class Ticket(BaseModel): user_request: str user_profile: UserProfile user_database = { "Adam": UserProfile(user_id="1", name="Adam", email="[email protected]"), "Bob": UserProfile(user_id="2", name="Bob", email="[email protected]"), "Chelsie": UserProfile(user_id="3", name="Chelsie", email="[email protected]"), "David": UserProfile(user_id="4", name="David", email="[email protected]"), } flight_database = { "DA123": Flight( flight_id="DA123", origin="SFO", destination="JFK", date_time=Date(year=2025, month=9, day=1, hour=1), duration=3, price=200, ), "DA125": Flight( flight_id="DA125", origin="SFO", destination="JFK", date_time=Date(year=2025, month=9, day=1, hour=7), duration=9, price=500, ), "DA456": Flight( flight_id="DA456", origin="SFO", destination="SNA", date_time=Date(year=2025, month=10, day=1, hour=1), duration=2, price=100, ), "DA460": Flight( flight_id="DA460", origin="SFO", destination="SNA", date_time=Date(year=2025, month=10, day=1, hour=9), duration=2, price=120, ), } itinery_database = {} ticket_database = {} @mcp.tool() def fetch_flight_info(date: Date, origin: str, destination: str): """Fetch flight information from origin to destination on the given date""" flights = [] for flight_id, flight in flight_database.items(): if ( flight.date_time.year == date.year and flight.date_time.month == date.month and flight.date_time.day == date.day and flight.origin == origin and flight.destination == destination ): flights.append(flight) return flights @mcp.tool() def fetch_itinerary(confirmation_number: str): """Fetch a booked itinerary information from database""" return itinery_database.get(confirmation_number) @mcp.tool() def pick_flight(flights: list[Flight]): """Pick up the best flight that matches users' request.""" sorted_flights = sorted( flights, key=lambda x: ( x.get("duration") if isinstance(x, dict) else x.duration, x.get("price") if isinstance(x, dict) else x.price, ), ) return sorted_flights[0] def generate_id(length=8): chars = string.ascii_lowercase + string.digits return "".join(random.choices(chars, k=length)) @mcp.tool() def book_itinerary(flight: Flight, user_profile: UserProfile): 
"""Book a flight on behalf of the user.""" confirmation_number = generate_id() while confirmation_number in itinery_database: confirmation_number = generate_id() itinery_database[confirmation_number] = Itinerary( confirmation_number=confirmation_number, user_profile=user_profile, flight=flight, ) return confirmation_number, itinery_database[confirmation_number] @mcp.tool() def cancel_itinerary(confirmation_number: str, user_profile: UserProfile): """Cancel an itinerary on behalf of the user.""" if confirmation_number in itinery_database: del itinery_database[confirmation_number] return raise ValueError("Cannot find the itinerary, please check your confirmation number.") @mcp.tool() def get_user_info(name: str): """Fetch the user profile from database with given name.""" return user_database.get(name) @mcp.tool() def file_ticket(user_request: str, user_profile: UserProfile): """File a customer support ticket if this is something the agent cannot handle.""" ticket_id = generate_id(length=6) ticket_database[ticket_id] = Ticket( user_request=user_request, user_profile=user_profile, ) return ticket_id if __name__ == "__main__": mcp.run() ``` Before we start the server, let's take a look at the code. We first create a `FastMCP` instance, which is a utility that helps quickly build an MCP server: ```python mcp = FastMCP("Airline Agent") ``` Then we define our data structures, which in a real-world application would be the database schema, e.g.: ```python class Flight(BaseModel): flight_id: str date_time: Date origin: str destination: str duration: float price: float ``` Following that, we initialize our database instances. In a real-world application, these would be connectors to actual databases, but for simplicity, we just use dictionaries: ```python user_database = { "Adam": UserProfile(user_id="1", name="Adam", email="[email protected]"), "Bob": UserProfile(user_id="2", name="Bob", email="[email protected]"), "Chelsie": UserProfile(user_id="3", name="Chelsie", email="[email protected]"), "David": UserProfile(user_id="4", name="David", email="[email protected]"), } ``` The next step is to define the tools and mark them with `@mcp.tool()` so that they are discoverable by MCP clients as MCP tools: ```python @mcp.tool() def fetch_flight_info(date: Date, origin: str, destination: str): """Fetch flight information from origin to destination on the given date""" flights = [] for flight_id, flight in flight_database.items(): if ( flight.date_time.year == date.year and flight.date_time.month == date.month and flight.date_time.day == date.day and flight.origin == origin and flight.destination == destination ): flights.append(flight) return flights ``` The last step is spinning up the server: ```python if __name__ == "__main__": mcp.run() ``` Now we have finished writing the server! Let's launch it: ```shell python path_to_your_working_directory/mcp_server.py ``` ## Write a DSPy Program That Utilizes Tools in MCP Server Now that the server is running, let's build the actual airline service agent which utilizes the MCP tools in our server to assist users. In your working directory, create a file named `dspy_mcp_agent.py`, and follow the guide to add code to it. ### Gather Tools from MCP Servers We first need to gather all available tools from the MCP server and make them usable by DSPy. DSPy provides an API [`dspy.Tool`](https://dspy.ai/api/primitives/Tool/) as the standard tool interface. Let's convert all the MCP tools to `dspy.Tool`. 
We need to create an MCP client instance to communicate with the MCP server, fetch all available tools, and convert them to `dspy.Tool` using the static method `from_mcp_tool`: ```python from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client # Create server parameters for stdio connection server_params = StdioServerParameters( command="python", # Executable args=["path_to_your_working_directory/mcp_server.py"], env=None, ) async def run(): async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: # Initialize the connection await session.initialize() # List available tools tools = await session.list_tools() # Convert MCP tools to DSPy tools dspy_tools = [] for tool in tools.tools: dspy_tools.append(dspy.Tool.from_mcp_tool(session, tool)) print(len(dspy_tools)) print(dspy_tools[0].args) if __name__ == "__main__": import asyncio asyncio.run(run()) ``` With the code above, we have successfully collected all available MCP tools and converted them to DSPy tools. ### Build a DSPy Agent to Handle Customer Requests Now we will use `dspy.ReAct` to build the agent for handling customer requests. `ReAct` stands for "reasoning and acting," which asks the LLM to decide whether to call a tool or wrap up the process. If a tool is required, the LLM takes responsibility for deciding which tool to call and providing the appropriate arguments. As usual, we need to create a `dspy.Signature` to define the input and output of our agent: ```python import dspy class DSPyAirlineCustomerService(dspy.Signature): """You are an airline customer service agent. You are given a list of tools to handle user requests. You should decide the right tool to use in order to fulfill users' requests.""" user_request: str = dspy.InputField() process_result: str = dspy.OutputField( desc=( "Message that summarizes the process result, and the information users need, " "e.g., the confirmation_number if it's a flight booking request." ) ) ``` And choose an LM for our agent: ```python dspy.configure(lm=dspy.LM("openai/gpt-4o-mini")) ``` Then we create the ReAct agent by passing the tools and signature into the `dspy.ReAct` API. We can now put together the complete code script: ```python from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client import dspy # Create server parameters for stdio connection server_params = StdioServerParameters( command="python", # Executable args=["script_tmp/mcp_server.py"], # Optional command line arguments env=None, # Optional environment variables ) class DSPyAirlineCustomerService(dspy.Signature): """You are an airline customer service agent. You are given a list of tools to handle user requests. You should decide the right tool to use in order to fulfill users' requests.""" user_request: str = dspy.InputField() process_result: str = dspy.OutputField( desc=( "Message that summarizes the process result, and the information users need, " "e.g., the confirmation_number if it's a flight booking request." 
) ) dspy.configure(lm=dspy.LM("openai/gpt-4o-mini")) async def run(user_request): async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: # Initialize the connection await session.initialize() # List available tools tools = await session.list_tools() # Convert MCP tools to DSPy tools dspy_tools = [] for tool in tools.tools: dspy_tools.append(dspy.Tool.from_mcp_tool(session, tool)) # Create the agent react = dspy.ReAct(DSPyAirlineCustomerService, tools=dspy_tools) result = await react.acall(user_request=user_request) print(result) if __name__ == "__main__": import asyncio asyncio.run(run("please help me book a flight from SFO to JFK on 09/01/2025, my name is Adam")) ``` Note that we must call `react.acall` because MCP tools are async by default. Let's execute the script: ```shell python path_to_your_working_directory/dspy_mcp_agent.py ``` You should see output similar to this: ``` Prediction( trajectory={'thought_0': 'I need to fetch flight information for Adam from SFO to JFK on 09/01/2025 to find available flights for booking.', 'tool_name_0': 'fetch_flight_info', 'tool_args_0': {'date': {'year': 2025, 'month': 9, 'day': 1, 'hour': 0}, 'origin': 'SFO', 'destination': 'JFK'}, 'observation_0': ['{"flight_id": "DA123", "date_time": {"year": 2025, "month": 9, "day": 1, "hour": 1}, "origin": "SFO", "destination": "JFK", "duration": 3.0, "price": 200.0}', '{"flight_id": "DA125", "date_time": {"year": 2025, "month": 9, "day": 1, "hour": 7}, "origin": "SFO", "destination": "JFK", "duration": 9.0, "price": 500.0}'], ..., 'tool_name_4': 'finish', 'tool_args_4': {}, 'observation_4': 'Completed.'}, reasoning="I successfully booked a flight for Adam from SFO to JFK on 09/01/2025. I found two available flights, selected the more economical option (flight DA123 at 1 AM for $200), retrieved Adam's user profile, and completed the booking process. The confirmation number for the flight is 8h7clk3q.", process_result='Your flight from SFO to JFK on 09/01/2025 has been successfully booked. Your confirmation number is 8h7clk3q.' ) ``` The `trajectory` field contains the entire thinking and acting process. If you're curious about what's happening under the hood, check out the [Observability Guide](https://dspy.ai/tutorials/observability/) to set up MLflow, which visualizes every step happening inside `dspy.ReAct`! ## Conclusion In this guide, we built an airline service agent that utilizes a custom MCP server and the `dspy.ReAct` module. In the context of MCP support, DSPy provides a simple interface for interacting with MCP tools, giving you the flexibility to implement any functionality you need. ``` -------------------------------------------------------------------------------- /docs/docs/learn/optimization/optimizers.md: -------------------------------------------------------------------------------- ```markdown --- sidebar_position: 1 --- # DSPy Optimizers (formerly Teleprompters) A **DSPy optimizer** is an algorithm that can tune the parameters of a DSPy program (i.e., the prompts and/or the LM weights) to maximize the metrics you specify, like accuracy. A typical DSPy optimizer takes three things: - Your **DSPy program**. This may be a single module (e.g., `dspy.Predict`) or a complex multi-module program. - Your **metric**. This is a function that evaluates the output of your program, and assigns it a score (higher is better). - A few **training inputs**. 
This may be very small (i.e., only 5 or 10 examples) and incomplete (only inputs to your program, without any labels). If you happen to have a lot of data, DSPy can leverage that. But you can start small and get strong results.

**Note:** Formerly called teleprompters. We are making an official name update, which will be reflected throughout the library and documentation.

## What does a DSPy Optimizer tune? How does it tune them?

Different optimizers in DSPy will tune your program's quality by **synthesizing good few-shot examples** for every module, like `dspy.BootstrapRS`,<sup>[1](https://arxiv.org/abs/2310.03714)</sup> **proposing and intelligently exploring better natural-language instructions** for every prompt, like `dspy.MIPROv2`,<sup>[2](https://arxiv.org/abs/2406.11695)</sup> and `dspy.GEPA`,<sup>[3](https://arxiv.org/abs/2507.19457)</sup> and **building datasets for your modules and using them to finetune the LM weights** in your system, like `dspy.BootstrapFinetune`.<sup>[4](https://arxiv.org/abs/2407.10930)</sup>

??? "What's an example of a DSPy optimizer? How do different optimizers work?"

    Take the `dspy.MIPROv2` optimizer as an example. First, MIPRO starts with the **bootstrapping stage**. It takes your program, which may be unoptimized at this point, and runs it many times across different inputs to collect traces of input/output behavior for each one of your modules. It filters these traces to keep only those that appear in trajectories scored highly by your metric.

    Second, MIPRO enters its **grounded proposal stage**. It previews your DSPy program's code, your data, and traces from running your program, and uses them to draft many potential instructions for every prompt in your program.

    Third, MIPRO launches the **discrete search stage**. It samples mini-batches from your training set, proposes a combination of instructions and traces to use for constructing every prompt in the pipeline, and evaluates the candidate program on the mini-batch. Using the resulting score, MIPRO updates a surrogate model that helps the proposals get better over time.

One thing that makes DSPy optimizers so powerful is that they can be composed. You can run `dspy.MIPROv2` and use the produced program as an input to `dspy.MIPROv2` again or, say, to `dspy.BootstrapFinetune` to get better results. This is partly the essence of `dspy.BetterTogether`. Alternatively, you can run the optimizer and then extract the top-5 candidate programs and build a `dspy.Ensemble` of them. This allows you to scale _inference-time compute_ (e.g., ensembles) as well as DSPy's unique _pre-inference time compute_ (i.e., optimization budget) in highly systematic ways.

## What DSPy Optimizers are currently available?

Optimizers can be accessed via `from dspy.teleprompt import *`.

### Automatic Few-Shot Learning

These optimizers extend the signature by automatically generating and including **optimized** examples within the prompt sent to the model, implementing few-shot learning.

1. [**`LabeledFewShot`**](../../api/optimizers/LabeledFewShot.md): Simply constructs few-shot examples (demos) from provided labeled input and output data points. Requires `k` (the number of examples for the prompt) and a `trainset` to randomly select the `k` examples from.

2. [**`BootstrapFewShot`**](../../api/optimizers/BootstrapFewShot.md): Uses a `teacher` module (which defaults to your program) to generate complete demonstrations for every stage of your program, along with labeled examples in `trainset`. Parameters include `max_labeled_demos` (the number of demonstrations randomly selected from the `trainset`) and `max_bootstrapped_demos` (the number of additional examples generated by the `teacher`). The bootstrapping process employs the metric to validate demonstrations, including only those that pass the metric in the "compiled" prompt. Advanced: supports using a `teacher` program that is a *different* DSPy program with a compatible structure, for harder tasks.

3. [**`BootstrapFewShotWithRandomSearch`**](../../api/optimizers/BootstrapFewShotWithRandomSearch.md): Applies `BootstrapFewShot` several times with random search over generated demonstrations, and selects the best program over the optimization. Parameters mirror those of `BootstrapFewShot`, with the addition of `num_candidate_programs`, which specifies the number of random programs evaluated over the optimization, including candidates of the uncompiled program, the `LabeledFewShot` optimized program, the `BootstrapFewShot` compiled program with unshuffled examples, and `num_candidate_programs` of `BootstrapFewShot` compiled programs with randomized example sets.

4. [**`KNNFewShot`**](../../api/optimizers/KNNFewShot.md): Uses the k-Nearest Neighbors algorithm to find the nearest training example demonstrations for a given input example. These nearest-neighbor demonstrations are then used as the trainset for the `BootstrapFewShot` optimization process.

### Automatic Instruction Optimization

These optimizers produce optimal instructions for the prompt and, in the case of MIPROv2, can also optimize the set of few-shot demonstrations.

5. [**`COPRO`**](../../api/optimizers/COPRO.md): Generates and refines new instructions for each step, and optimizes them with coordinate ascent (hill-climbing using the metric function and the `trainset`). Parameters include `depth`, the number of iterations of prompt improvement the optimizer runs.

6. [**`MIPROv2`**](../../api/optimizers/MIPROv2.md): Generates instructions *and* few-shot examples in each step. The instruction generation is data-aware and demonstration-aware. Uses Bayesian Optimization to effectively search over the space of generated instructions/demonstrations across your modules.

7. [**`SIMBA`**](../../api/optimizers/SIMBA.md): Stochastic Introspective Mini-Batch Ascent. Samples mini-batches, uses an LM to reflect on the differences between higher- and lower-scoring trajectories, and proposes new instructions or demonstrations based on that analysis.

8. [**`GEPA`**](../../api/optimizers/GEPA/overview.md): Uses LMs to reflect on the DSPy program's trajectory, to identify what worked, what didn't, and to propose prompts addressing the gaps. Additionally, GEPA can leverage domain-specific textual feedback to rapidly improve the DSPy program. Detailed tutorials on using GEPA are available at [dspy.GEPA Tutorials](../../tutorials/gepa_ai_program/index.md).

### Automatic Finetuning

This optimizer is used to fine-tune the underlying LLM(s).

9. [**`BootstrapFinetune`**](../../api/optimizers/BootstrapFinetune.md): Distills a prompt-based DSPy program into weight updates. The output is a DSPy program that has the same steps, but where each step is conducted by a finetuned model instead of a prompted LM. [See the classification fine-tuning tutorial](https://dspy.ai/tutorials/classification_finetuning/) for a complete example.

### Program Transformations

10. [**`Ensemble`**](../../api/optimizers/Ensemble.md): Ensembles a set of DSPy programs into a single program, using either the full set or a randomly sampled subset.

## Which optimizer should I use?

Ultimately, finding the right optimizer and the best configuration for your task will require experimentation.
Success in DSPy is still an iterative process - getting the best performance on your task will require you to explore and iterate. That being said, here's the general guidance on getting started: - If you have **very few examples** (around 10), start with `BootstrapFewShot`. - If you have **more data** (50 examples or more), try `BootstrapFewShotWithRandomSearch`. - If you prefer to do **instruction optimization only** (i.e. you want to keep your prompt 0-shot), use `MIPROv2` [configured for 0-shot optimization](../../api/optimizers/MIPROv2.md). - If you’re willing to use more inference calls to perform **longer optimization runs** (e.g. 40 trials or more), and have enough data (e.g. 200 examples or more to prevent overfitting) then try `MIPROv2`. - If you have been able to use one of these with a large LM (e.g., 7B parameters or above) and need a very **efficient program**, finetune a small LM for your task with `BootstrapFinetune`. ## How do I use an optimizer? They all share this general interface, with some differences in the keyword arguments (hyperparameters). A full list can be found in the [API reference](../../api/optimizers/BetterTogether.md). Let's see this with the most common one, `BootstrapFewShotWithRandomSearch`. ```python from dspy.teleprompt import BootstrapFewShotWithRandomSearch # Set up the optimizer: we want to "bootstrap" (i.e., self-generate) 8-shot examples of your program's steps. # The optimizer will repeat this 10 times (plus some initial attempts) before selecting its best attempt on the devset. config = dict(max_bootstrapped_demos=4, max_labeled_demos=4, num_candidate_programs=10, num_threads=4) teleprompter = BootstrapFewShotWithRandomSearch(metric=YOUR_METRIC_HERE, **config) optimized_program = teleprompter.compile(YOUR_PROGRAM_HERE, trainset=YOUR_TRAINSET_HERE) ``` !!! info "Getting Started III: Optimizing the LM prompts or weights in DSPy programs" A typical simple optimization run costs on the order of $2 USD and takes around ten minutes, but be careful when running optimizers with very large LMs or very large datasets. Optimizer runs can cost as little as a few cents or up to tens of dollars, depending on your LM, dataset, and configuration. === "Optimizing prompts for a ReAct agent" This is a minimal but fully runnable example of setting up a `dspy.ReAct` agent that answers questions via search from Wikipedia and then optimizing it using `dspy.MIPROv2` in the cheap `light` mode on 500 question-answer pairs sampled from the `HotPotQA` dataset. ```python linenums="1" import dspy from dspy.datasets import HotPotQA dspy.configure(lm=dspy.LM('openai/gpt-4o-mini')) def search(query: str) -> list[str]: """Retrieves abstracts from Wikipedia.""" results = dspy.ColBERTv2(url='http://20.102.90.50:2017/wiki17_abstracts')(query, k=3) return [x['text'] for x in results] trainset = [x.with_inputs('question') for x in HotPotQA(train_seed=2024, train_size=500).train] react = dspy.ReAct("question -> answer", tools=[search]) tp = dspy.MIPROv2(metric=dspy.evaluate.answer_exact_match, auto="light", num_threads=24) optimized_react = tp.compile(react, trainset=trainset) ``` An informal run similar to this on DSPy 2.5.29 raises ReAct's score from 24% to 51%. === "Optimizing prompts for RAG" Given a retrieval index to `search`, your favorite `dspy.LM`, and a small `trainset` of questions and ground-truth responses, the following code snippet can optimize your RAG system with long outputs against the built-in `dspy.SemanticF1` metric, which is implemented as a DSPy module. 
```python linenums="1" class RAG(dspy.Module): def __init__(self, num_docs=5): self.num_docs = num_docs self.respond = dspy.ChainOfThought('context, question -> response') def forward(self, question): context = search(question, k=self.num_docs) # not defined in this snippet, see link above return self.respond(context=context, question=question) tp = dspy.MIPROv2(metric=dspy.SemanticF1(), auto="medium", num_threads=24) optimized_rag = tp.compile(RAG(), trainset=trainset, max_bootstrapped_demos=2, max_labeled_demos=2) ``` For a complete RAG example that you can run, start this [tutorial](../../tutorials/rag/index.ipynb). It improves the quality of a RAG system over a subset of StackExchange communities from 53% to 61%. === "Optimizing weights for Classification" <details><summary>Click to show dataset setup code.</summary> ```python linenums="1" import random from typing import Literal from datasets import load_dataset import dspy from dspy.datasets import DataLoader # Load the Banking77 dataset. CLASSES = load_dataset("PolyAI/banking77", split="train", trust_remote_code=True).features["label"].names kwargs = {"fields": ("text", "label"), "input_keys": ("text",), "split": "train", "trust_remote_code": True} # Load the first 2000 examples from the dataset, and assign a hint to each *training* example. trainset = [ dspy.Example(x, hint=CLASSES[x.label], label=CLASSES[x.label]).with_inputs("text", "hint") for x in DataLoader().from_huggingface(dataset_name="PolyAI/banking77", **kwargs)[:2000] ] random.Random(0).shuffle(trainset) ``` </details> ```python linenums="1" import dspy lm=dspy.LM('openai/gpt-4o-mini-2024-07-18') # Define the DSPy module for classification. It will use the hint at training time, if available. signature = dspy.Signature("text, hint -> label").with_updated_fields('label', type_=Literal[tuple(CLASSES)]) classify = dspy.ChainOfThought(signature) classify.set_lm(lm) # Optimize via BootstrapFinetune. optimizer = dspy.BootstrapFinetune(metric=(lambda x, y, trace=None: x.label == y.label), num_threads=24) optimized = optimizer.compile(classify, trainset=trainset) optimized(text="What does a pending cash withdrawal mean?") # For a complete fine-tuning tutorial, see: https://dspy.ai/tutorials/classification_finetuning/ ``` **Possible Output (from the last line):** ```text Prediction( reasoning='A pending cash withdrawal indicates that a request to withdraw cash has been initiated but has not yet been completed or processed. This status means that the transaction is still in progress and the funds have not yet been deducted from the account or made available to the user.', label='pending_cash_withdrawal' ) ``` An informal run similar to this on DSPy 2.5.29 raises GPT-4o-mini's score 66% to 87%. ## Saving and loading optimizer output After running a program through an optimizer, it's useful to also save it. At a later point, a program can be loaded from a file and used for inference. For this, the `load` and `save` methods can be used. ```python optimized_program.save(YOUR_SAVE_PATH) ``` The resulting file is in plain-text JSON format. It contains all the parameters and steps in the source program. You can always read it and see what the optimizer generated. To load a program from a file, you can instantiate an object from that class and then call the load method on it. 
```python loaded_program = YOUR_PROGRAM_CLASS() loaded_program.load(path=YOUR_SAVE_PATH) ``` ``` -------------------------------------------------------------------------------- /dspy/evaluate/evaluate.py: -------------------------------------------------------------------------------- ```python import csv import importlib import json import logging import types from typing import TYPE_CHECKING, Any, Callable if TYPE_CHECKING: import pandas as pd import tqdm import dspy from dspy.primitives.prediction import Prediction from dspy.utils.callback import with_callbacks from dspy.utils.parallelizer import ParallelExecutor try: from IPython.display import HTML from IPython.display import display as display except ImportError: def display(obj: Any): """ Display the specified Python object in the console. :param obj: The Python object to display. """ print(obj) def HTML(x: str) -> str: # noqa: N802 """ Obtain the HTML representation of the specified string. """ # NB: This method exists purely for code compatibility with the IPython HTML() function in # environments where IPython is not available. In such environments where IPython is not # available, this method will simply return the input string. return x # TODO: Counting failures and having a max_failure count. When that is exceeded (also just at the end), # we print the number of failures, the first N examples that failed, and the first N exceptions raised. logger = logging.getLogger(__name__) class EvaluationResult(Prediction): """ A class that represents the result of an evaluation. It is a subclass of `dspy.Prediction` that contains the following fields - score: An float value (e.g., 67.30) representing the overall performance - results: a list of (example, prediction, score) tuples for each example in devset """ def __init__(self, score: float, results: list[tuple["dspy.Example", "dspy.Example", Any]]): super().__init__(score=score, results=results) def __repr__(self): return f"EvaluationResult(score={self.score}, results=<list of {len(self.results)} results>)" class Evaluate: """DSPy Evaluate class. This class is used to evaluate the performance of a DSPy program. Users need to provide a evaluation dataset and a metric function in order to use this class. This class supports parallel evaluation on the provided dataset. """ def __init__( self, *, devset: list["dspy.Example"], metric: Callable | None = None, num_threads: int | None = None, display_progress: bool = False, display_table: bool | int = False, max_errors: int | None = None, provide_traceback: bool | None = None, failure_score: float = 0.0, save_as_csv: str | None = None, save_as_json: str | None = None, **kwargs, ): """ Args: devset (list[dspy.Example]): the evaluation dataset. metric (Callable): The metric function to use for evaluation. num_threads (Optional[int]): The number of threads to use for parallel evaluation. display_progress (bool): Whether to display progress during evaluation. display_table (Union[bool, int]): Whether to display the evaluation results in a table. If a number is passed, the evaluation results will be truncated to that number before displayed. max_errors (Optional[int]): The maximum number of errors to allow before stopping evaluation. If ``None``, inherits from ``dspy.settings.max_errors``. provide_traceback (Optional[bool]): Whether to provide traceback information during evaluation. failure_score (float): The default score to use if evaluation fails due to an exception. save_as_csv (Optional[str]): The file name where the csv will be saved. 
save_as_json (Optional[str]): The file name where the json will be saved. """ self.devset = devset self.metric = metric self.num_threads = num_threads self.display_progress = display_progress self.display_table = display_table self.max_errors = max_errors self.provide_traceback = provide_traceback self.failure_score = failure_score self.save_as_csv = save_as_csv self.save_as_json = save_as_json if "return_outputs" in kwargs: raise ValueError("`return_outputs` is no longer supported. Results are always returned inside the `results` field of the `EvaluationResult` object.") @with_callbacks def __call__( self, program: "dspy.Module", metric: Callable | None = None, devset: list["dspy.Example"] | None = None, num_threads: int | None = None, display_progress: bool | None = None, display_table: bool | int | None = None, callback_metadata: dict[str, Any] | None = None, save_as_csv: str | None = None, save_as_json: str | None = None, ) -> EvaluationResult: """ Args: program (dspy.Module): The DSPy program to evaluate. metric (Callable): The metric function to use for evaluation. if not provided, use `self.metric`. devset (list[dspy.Example]): the evaluation dataset. if not provided, use `self.devset`. num_threads (Optional[int]): The number of threads to use for parallel evaluation. if not provided, use `self.num_threads`. display_progress (bool): Whether to display progress during evaluation. if not provided, use `self.display_progress`. display_table (Union[bool, int]): Whether to display the evaluation results in a table. if not provided, use `self.display_table`. If a number is passed, the evaluation results will be truncated to that number before displayed. callback_metadata (dict): Metadata to be used for evaluate callback handlers. Returns: The evaluation results are returned as a dspy.EvaluationResult object containing the following attributes: - score: A float percentage score (e.g., 67.30) representing overall performance - results: a list of (example, prediction, score) tuples for each example in devset """ metric = metric if metric is not None else self.metric devset = devset if devset is not None else self.devset num_threads = num_threads if num_threads is not None else self.num_threads display_progress = display_progress if display_progress is not None else self.display_progress display_table = display_table if display_table is not None else self.display_table save_as_csv = save_as_csv if save_as_csv is not None else self.save_as_csv save_as_json = save_as_json if save_as_json is not None else self.save_as_json if callback_metadata: logger.debug(f"Evaluate is called with callback metadata: {callback_metadata}") tqdm.tqdm._instances.clear() executor = ParallelExecutor( num_threads=num_threads, disable_progress_bar=not display_progress, max_errors=(self.max_errors if self.max_errors is not None else dspy.settings.max_errors), provide_traceback=self.provide_traceback, compare_results=True, ) def process_item(example): prediction = program(**example.inputs()) score = metric(example, prediction) return prediction, score results = executor.execute(process_item, devset) assert len(devset) == len(results) results = [((dspy.Prediction(), self.failure_score) if r is None else r) for r in results] results = [(example, prediction, score) for example, (prediction, score) in zip(devset, results, strict=False)] ncorrect, ntotal = sum(score for *_, score in results), len(devset) logger.info(f"Average Metric: {ncorrect} / {ntotal} ({round(100 * ncorrect / ntotal, 1)}%)") if display_table: if 
importlib.util.find_spec("pandas") is not None: # Rename the 'correct' column to the name of the metric object metric_name = metric.__name__ if isinstance(metric, types.FunctionType) else metric.__class__.__name__ # Construct a pandas DataFrame from the results result_df = self._construct_result_table(results, metric_name) self._display_result_table(result_df, display_table, metric_name) else: logger.warning("Skipping table display since `pandas` is not installed.") if save_as_csv: metric_name = ( metric.__name__ if isinstance(metric, types.FunctionType) else metric.__class__.__name__ ) data = self._prepare_results_output(results, metric_name) with open(save_as_csv, "w", newline="") as csvfile: fieldnames = data[0].keys() writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() for row in data: writer.writerow(row) if save_as_json: metric_name = ( metric.__name__ if isinstance(metric, types.FunctionType) else metric.__class__.__name__ ) data = self._prepare_results_output(results, metric_name) with open( save_as_json, "w", ) as f: json.dump(data, f) return EvaluationResult( score=round(100 * ncorrect / ntotal, 2), results=results, ) @staticmethod def _prepare_results_output( results: list[tuple["dspy.Example", "dspy.Example", Any]], metric_name: str ): return [ ( merge_dicts(example, prediction) | {metric_name: score} if prediction_is_dictlike(prediction) else dict(example) | {"prediction": prediction, metric_name: score} ) for example, prediction, score in results ] def _construct_result_table( self, results: list[tuple["dspy.Example", "dspy.Example", Any]], metric_name: str ) -> "pd.DataFrame": """ Construct a pandas DataFrame from the specified result list. Let's not try to change the name of this method as it may be patched by external tracing tools. Args: results: The list of results to construct the result DataFrame from. metric_name: The name of the metric used for evaluation. Returns: The constructed pandas DataFrame. """ import pandas as pd data = self._prepare_results_output(results, metric_name) # Truncate every cell in the DataFrame (DataFrame.applymap was renamed to DataFrame.map in Pandas 2.1.0) result_df = pd.DataFrame(data) result_df = result_df.map(truncate_cell) if hasattr(result_df, "map") else result_df.applymap(truncate_cell) return result_df.rename(columns={"correct": metric_name}) def _display_result_table(self, result_df: "pd.DataFrame", display_table: bool | int, metric_name: str): """ Display the specified result DataFrame in a table format. Args: result_df: The result DataFrame to display. display_table: Whether to display the evaluation results in a table. If a number is passed, the evaluation results will be truncated to that number before displayed. metric_name: The name of the metric used for evaluation. """ if isinstance(display_table, bool): df_to_display = result_df.copy() truncated_rows = 0 else: df_to_display = result_df.head(display_table).copy() truncated_rows = len(result_df) - display_table df_to_display = stylize_metric_name(df_to_display, metric_name) display_dataframe(df_to_display) if truncated_rows > 0: # Simplified message about the truncated rows message = f""" <div style=' text-align: center; font-size: 16px; font-weight: bold; color: #555; margin: 10px 0;'> ... {truncated_rows} more rows not displayed ... 
</div> """ display(HTML(message)) def prediction_is_dictlike(prediction): # Downstream logic for displaying dictionary-like predictions depends solely on the predictions # having a method called `items()` for iterating through key/value pairs return hasattr(prediction, "items") and callable(prediction.items) def merge_dicts(d1, d2) -> dict: merged = {} for k, v in d1.items(): if k in d2: merged[f"example_{k}"] = v else: merged[k] = v for k, v in d2.items(): if k in d1: merged[f"pred_{k}"] = v else: merged[k] = v return merged def truncate_cell(content) -> str: """Truncate content of a cell to 25 words.""" words = str(content).split() if len(words) > 25: return " ".join(words[:25]) + "..." return content def stylize_metric_name(df: "pd.DataFrame", metric_name: str) -> "pd.DataFrame": """ Stylize the cell contents of a pandas DataFrame corresponding to the specified metric name. :param df: The pandas DataFrame for which to stylize cell contents. :param metric_name: The name of the metric for which to stylize DataFrame cell contents. """ def format_metric(x): if isinstance(x, float): return f"✔️ [{x:.3f}]" elif x is not None: return f"✔️ [{x}]" else: return "" df[metric_name] = df[metric_name].apply(format_metric) return df def display_dataframe(df: "pd.DataFrame"): """ Display the specified Pandas DataFrame in the console. :param df: The Pandas DataFrame to display. """ import pandas as pd if is_in_ipython_notebook_environment(): display(configure_dataframe_for_ipython_notebook_display(df)) else: # Pretty print the DataFrame to the console with pd.option_context( "display.max_rows", None, "display.max_columns", None ): # more options can be specified also print(df) def configure_dataframe_for_ipython_notebook_display(df: "pd.DataFrame") -> "pd.DataFrame": """Set various pandas display options for DataFrame in an IPython notebook environment.""" import pandas as pd pd.options.display.max_colwidth = 70 return df def is_in_ipython_notebook_environment(): """ Check if the current environment is an IPython notebook environment. :return: True if the current environment is an IPython notebook environment, False otherwise. """ try: from IPython import get_ipython # This is a best-effort check to see if we are in an IPython notebook environment return "IPKernelApp" in getattr(get_ipython(), "config", {}) except ImportError: return False # FIXME: TODO: The merge_dicts stuff above is way too quick and dirty. # TODO: the display_table can't handle False but can handle 0! # Not sure how it works with True exactly, probably fails too. ``` -------------------------------------------------------------------------------- /dspy/teleprompt/utils.py: -------------------------------------------------------------------------------- ```python import inspect import logging import math import os import random import shutil import sys import numpy as np try: from IPython.core.magics.code import extract_symbols except ImportError: # Won't be able to read code from jupyter notebooks extract_symbols = None import dspy from dspy.teleprompt.bootstrap import BootstrapFewShot, LabeledFewShot """ This file consists of helper functions for our variety of optimizers. 
""" ### OPTIMIZER TRAINING UTILS ### logger = logging.getLogger(__name__) def create_minibatch(trainset, batch_size=50, rng=None): """Create a minibatch from the trainset.""" # Ensure batch_size isn't larger than the size of the dataset batch_size = min(batch_size, len(trainset)) # If no RNG is provided, fall back to the global random instance rng = rng or random # Randomly sample indices for the mini-batch using the provided rng sampled_indices = rng.sample(range(len(trainset)), batch_size) # Create the mini-batch using the sampled indices minibatch = [trainset[i] for i in sampled_indices] return minibatch def eval_candidate_program(batch_size, trainset, candidate_program, evaluate, rng=None): """Evaluate a candidate program on the trainset, using the specified batch size.""" try: # Evaluate on the full trainset if batch_size >= len(trainset): return evaluate(candidate_program, devset=trainset, callback_metadata={"metric_key": "eval_full"}) # Or evaluate on a minibatch else: return evaluate( candidate_program, devset=create_minibatch(trainset, batch_size, rng), callback_metadata={"metric_key": "eval_minibatch"} ) except Exception: logger.error("An exception occurred during evaluation", exc_info=True) # TODO: Handle this better, as -ve scores are possible return dspy.Prediction(score=0.0, results=[]) def eval_candidate_program_with_pruning( trial, trial_logs, trainset, candidate_program, evaluate, trial_num, batch_size=100, ): """Evaluation of candidate_program with pruning implemented""" # Evaluate with the new prompts total_score = 0 num_batches = math.ceil(len(trainset) / batch_size) total_eval_size = 0 for i in range(num_batches): start_index = i * batch_size end_index = min((i + 1) * batch_size, len(trainset)) split_trainset = trainset[start_index:end_index] split_score = evaluate( candidate_program, devset=split_trainset, display_table=0, ) print(f"{i}st split score: {split_score}") total_eval_size += len(split_trainset) total_score += split_score * len(split_trainset) curr_weighted_avg_score = total_score / min((i + 1) * batch_size, len(trainset)) print(f"curr average score: {curr_weighted_avg_score}") trial.report(curr_weighted_avg_score, i) # Handle pruning based on the intermediate value. if trial.should_prune(): print("Trial pruned.") trial_logs[trial_num]["score"] = curr_weighted_avg_score trial_logs[trial_num]["num_eval_calls"] = total_eval_size trial_logs[trial_num]["pruned"] = True return curr_weighted_avg_score, trial_logs, total_eval_size, True print(f"Fully evaled score: {curr_weighted_avg_score}") score = curr_weighted_avg_score trial_logs[trial_num]["full_eval"] = False trial_logs[trial_num]["score"] = score trial_logs[trial_num]["pruned"] = False return score, trial_logs, total_eval_size, False def get_program_with_highest_avg_score(param_score_dict, fully_evaled_param_combos): """Used as a helper function for bayesian + minibatching optimizers. 
Returns the program with the highest average score from the batches evaluated so far.""" # Calculate the mean for each combination of categorical parameters, based on past trials results = [] for key, values in param_score_dict.items(): scores = np.array([v[0] for v in values]) mean = np.average(scores) program = values[0][1] params = values[0][2] results.append((key, mean, program, params)) # Sort results by the mean sorted_results = sorted(results, key=lambda x: x[1], reverse=True) # Find the combination with the highest mean, skip fully evaluated ones for combination in sorted_results: key, mean, program, params = combination if key in fully_evaled_param_combos: continue return program, mean, key, params # If no valid program is found, we return the last valid one that we found return program, mean, key, params def calculate_last_n_proposed_quality( base_program, trial_logs, evaluate, trainset, devset, n, ): """ Calculate the average and best quality of the last n programs proposed. This is useful for seeing if our proposals are actually 'improving' overtime or not. """ # Get the trials from the last n keys in trial logs last_n_trial_nums = list(trial_logs.keys())[-n:] # Calculate the average and best score of these trials # if num_eval_calls in the trial is less than the trainset, throw a not-implemented error for now total_train_score = 0 best_train_score = 0 total_dev_score = 0 best_dev_score = 0 for trial_num in last_n_trial_nums: full_eval = trial_logs[trial_num]["full_eval"] if not full_eval: raise NotImplementedError( "Still need to implement non full eval handling in calculate_last_n_proposed_quality", ) train_score = trial_logs[trial_num]["score"] program = base_program.deepcopy() program.load(trial_logs[trial_num]["program_path"]) dev_score = evaluate(program, devset=devset) total_train_score += train_score total_dev_score += dev_score if train_score > best_train_score: best_train_score = train_score best_dev_score = dev_score return best_train_score, total_train_score / n, best_dev_score, total_dev_score / n ### LOGGING UTILS ### def get_task_model_history_for_full_example( candidate_program, task_model, devset, evaluate, ): """Get a full trace of the task model's history for a given candidate program.""" _ = evaluate(candidate_program, devset=devset[:1]) _ = task_model.inspect_history(n=len(candidate_program.predictors())) return task_model.inspect_history(n=len(candidate_program.predictors())) def print_full_program(program): """Print out the program's instructions & prefixes for each module.""" for i, predictor in enumerate(program.predictors()): print(f"Predictor {i}") print(f"i: {get_signature(predictor).instructions}") *_, last_field = get_signature(predictor).fields.values() print(f"p: {last_field.json_schema_extra['prefix']}") print("\n") def save_candidate_program(program, log_dir, trial_num, note=None): """Save the candidate program to the log directory.""" if log_dir is None: return None # Ensure the directory exists eval_programs_dir = os.path.join(log_dir, "evaluated_programs") os.makedirs(eval_programs_dir, exist_ok=True) # Define the save path for the program if note: save_path = os.path.join(eval_programs_dir, f"program_{trial_num}_{note}.json") else: save_path = os.path.join(eval_programs_dir, f"program_{trial_num}.json") # Save the program program.save(save_path) return save_path def save_file_to_log_dir(source_file_path, log_dir): if log_dir is None: return """Save a file to our log directory""" if not os.path.exists(log_dir): os.makedirs(log_dir) 
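# Place the copy under log_dir using the source file's original basename.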
destination_file_path = os.path.join(log_dir, os.path.basename(source_file_path)) # Copy the file shutil.copy(source_file_path, destination_file_path) def setup_logging(log_dir): """Setup logger, which will log our print statements to a txt file at our log_dir for later viewing""" if log_dir is None: return # Create a logger logger = logging.getLogger() logger.setLevel(logging.WARNING) # Create a file handler that logs debug and higher level messages file_handler = logging.FileHandler(f"{log_dir}/logs.txt") file_handler.setLevel(logging.WARNING) file_formatter = logging.Formatter("%(asctime)s - %(message)s") file_handler.setFormatter(file_formatter) logger.addHandler(file_handler) # Create a console handler with a higher log level console_handler = logging.StreamHandler() console_handler.setLevel(logging.WARNING) console_formatter = logging.Formatter("%(message)s") console_handler.setFormatter(console_formatter) logger.addHandler(console_handler) def get_token_usage(model) -> tuple[int, int]: """ Extract total input tokens and output tokens from a model's interaction history. Returns (total_input_tokens, total_output_tokens). """ if not hasattr(model, "history"): return 0, 0 input_tokens = [] output_tokens = [] for interaction in model.history: usage = interaction.get("usage", {}) _input_tokens = usage.get("prompt_tokens", 0) _output_tokens = usage.get("completion_tokens", 0) input_tokens.append(_input_tokens) output_tokens.append(_output_tokens) total_input_tokens = int(np.sum(input_tokens)) total_output_tokens = int(np.sum(output_tokens)) return total_input_tokens, total_output_tokens def log_token_usage(trial_logs, trial_num, model_dict): """ Extract total input and output tokens used by each model and log to trial_logs[trial_num]["token_usage"]. """ token_usage_dict = {} for model_name, model in model_dict.items(): in_tokens, out_tokens = get_token_usage(model) token_usage_dict[model_name] = {"total_input_tokens": in_tokens, "total_output_tokens": out_tokens} # Store token usage info in trial logs trial_logs[trial_num]["token_usage"] = token_usage_dict ### OTHER UTILS ### def get_prompt_model(prompt_model): if prompt_model: return prompt_model else: return dspy.settings.lm def get_signature(predictor): assert hasattr(predictor, "signature") return predictor.signature def set_signature(predictor, updated_signature): assert hasattr(predictor, "signature") predictor.signature = updated_signature def create_n_fewshot_demo_sets( student, num_candidate_sets, trainset, max_labeled_demos, max_bootstrapped_demos, metric, teacher_settings, max_errors=None, max_rounds=1, labeled_sample=True, min_num_samples=1, metric_threshold=None, teacher=None, include_non_bootstrapped=True, seed=0, rng=None, ): """ This function is copied from random_search.py, and creates fewshot examples in the same way that random search does. This allows us to take advantage of using the same fewshot examples when we use the same random seed in our optimizers. 
""" max_errors = dspy.settings.max_errors if max_errors is None else max_errors demo_candidates = {} # Account for confusing way this is set up, where we add in 3 more candidate sets to the N specified num_candidate_sets -= 3 # Initialize demo_candidates dictionary for i, _ in enumerate(student.predictors()): demo_candidates[i] = [] rng = rng or random.Random(seed) # Go through and create each candidate set for seed in range(-3, num_candidate_sets): print(f"Bootstrapping set {seed + 4}/{num_candidate_sets + 3}") trainset_copy = list(trainset) if seed == -3 and include_non_bootstrapped: # zero-shot program2 = student.reset_copy() elif seed == -2 and max_labeled_demos > 0 and include_non_bootstrapped: # labels only teleprompter = LabeledFewShot(k=max_labeled_demos) program2 = teleprompter.compile( student, trainset=trainset_copy, sample=labeled_sample, ) elif seed == -1: # unshuffled few-shot program = BootstrapFewShot( metric=metric, max_errors=max_errors, max_bootstrapped_demos=max_bootstrapped_demos, max_labeled_demos=max_labeled_demos, teacher_settings=teacher_settings, max_rounds=max_rounds, ) program2 = program.compile(student, teacher=teacher, trainset=trainset_copy) else: # shuffled few-shot rng.shuffle(trainset_copy) size = rng.randint(min_num_samples, max_bootstrapped_demos) teleprompter = BootstrapFewShot( metric=metric, max_errors=max_errors, metric_threshold=metric_threshold, max_bootstrapped_demos=size, max_labeled_demos=max_labeled_demos, teacher_settings=teacher_settings, max_rounds=max_rounds, ) program2 = teleprompter.compile( student, teacher=teacher, trainset=trainset_copy, ) for i, _ in enumerate(student.predictors()): demo_candidates[i].append(program2.predictors()[i].demos) return demo_candidates def old_getfile(object): """Work out which source or compiled file an object was defined in.""" if inspect.ismodule(object): if getattr(object, "__file__", None): return object.__file__ raise TypeError(f"{object!r} is a built-in module") if inspect.isclass(object): if hasattr(object, "__module__"): module = sys.modules.get(object.__module__) if getattr(module, "__file__", None): return module.__file__ if object.__module__ == "__main__": raise OSError("source code not available") raise TypeError(f"{object!r} is a built-in class") if inspect.ismethod(object): object = object.__func__ if inspect.isfunction(object): object = object.__code__ if inspect.istraceback(object): object = object.tb_frame if inspect.isframe(object): object = object.f_code if inspect.iscode(object): return object.co_filename raise TypeError( f"module, class, method, function, traceback, frame, or code object was expected, got {type(object).__name__}" ) def new_getfile(object): if not inspect.isclass(object): return old_getfile(object) # Lookup by parent module (as in current inspect) if hasattr(object, "__module__"): object_ = sys.modules.get(object.__module__) if hasattr(object_, "__file__"): return object_.__file__ # If parent module is __main__, lookup by methods (NEW) for _, member in inspect.getmembers(object): if inspect.isfunction(member) and object.__qualname__ + "." 
+ member.__name__ == member.__qualname__: return inspect.getfile(member) raise TypeError(f"Source for {object!r} not found") inspect.getfile = new_getfile ``` -------------------------------------------------------------------------------- /docs/docs/cheatsheet.md: -------------------------------------------------------------------------------- ```markdown --- sidebar_position: 999 --- # DSPy Cheatsheet This page will contain snippets for frequent usage patterns. ## DSPy Programs ### Forcing fresh LM outputs DSPy caches LM calls. Provide a unique ``rollout_id`` and set a non-zero ``temperature`` (e.g., 1.0) to bypass an existing cache entry while still caching the new result: ```python predict = dspy.Predict("question -> answer") predict(question="1+1", config={"rollout_id": 1, "temperature": 1.0}) ``` ### dspy.Signature ```python class BasicQA(dspy.Signature): """Answer questions with short factoid answers.""" question: str = dspy.InputField() answer: str = dspy.OutputField(desc="often between 1 and 5 words") ``` ### dspy.ChainOfThought ```python generate_answer = dspy.ChainOfThought(BasicQA) # Call the predictor on a particular input alongside a hint. question='What is the color of the sky?' pred = generate_answer(question=question) ``` ### dspy.ProgramOfThought ```python pot = dspy.ProgramOfThought(BasicQA) question = 'Sarah has 5 apples. She buys 7 more apples from the store. How many apples does Sarah have now?' result = pot(question=question) print(f"Question: {question}") print(f"Final Predicted Answer (after ProgramOfThought process): {result.answer}") ``` ### dspy.ReAct ```python react_module = dspy.ReAct(BasicQA) question = 'Sarah has 5 apples. She buys 7 more apples from the store. How many apples does Sarah have now?' result = react_module(question=question) print(f"Question: {question}") print(f"Final Predicted Answer (after ReAct process): {result.answer}") ``` ### dspy.Retrieve ```python colbertv2_wiki17_abstracts = dspy.ColBERTv2(url='http://20.102.90.50:2017/wiki17_abstracts') dspy.settings.configure(rm=colbertv2_wiki17_abstracts) #Define Retrieve Module retriever = dspy.Retrieve(k=3) query='When was the first FIFA World Cup held?' # Call the retriever on a particular query. 
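# The call returns a dspy.Prediction; its `passages` attribute holds the top-k retrieved passage texts.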
topK_passages = retriever(query).passages for idx, passage in enumerate(topK_passages): print(f'{idx+1}]', passage, '\n') ``` ### dspy.CodeAct ```python from dspy import CodeAct def factorial(n): """Calculate factorial of n""" if n == 1: return 1 return n * factorial(n-1) act = CodeAct("n->factorial", tools=[factorial]) result = act(n=5) result # Returns 120 ``` ### dspy.Parallel ```python import dspy parallel = dspy.Parallel(num_threads=2) predict = dspy.Predict("question -> answer") result = parallel( [ (predict, dspy.Example(question="1+1").with_inputs("question")), (predict, dspy.Example(question="2+2").with_inputs("question")) ] ) result ``` ## DSPy Metrics ### Function as Metric To create a custom metric you can create a function that returns either a number or a boolean value: ```python def parse_integer_answer(answer, only_first_line=True): try: if only_first_line: answer = answer.strip().split('\n')[0] # find the last token that has a number in it answer = [token for token in answer.split() if any(c.isdigit() for c in token)][-1] answer = answer.split('.')[0] answer = ''.join([c for c in answer if c.isdigit()]) answer = int(answer) except (ValueError, IndexError): # print(answer) answer = 0 return answer # Metric Function def gsm8k_metric(gold, pred, trace=None) -> int: return int(parse_integer_answer(str(gold.answer))) == int(parse_integer_answer(str(pred.answer))) ``` ### LLM as Judge ```python class FactJudge(dspy.Signature): """Judge if the answer is factually correct based on the context.""" context = dspy.InputField(desc="Context for the prediction") question = dspy.InputField(desc="Question to be answered") answer = dspy.InputField(desc="Answer for the question") factually_correct: bool = dspy.OutputField(desc="Is the answer factually correct based on the context?") judge = dspy.ChainOfThought(FactJudge) def factuality_metric(example, pred): factual = judge(context=example.context, question=example.question, answer=pred.answer) return factual.factually_correct ``` ## DSPy Evaluation ```python from dspy.evaluate import Evaluate evaluate_program = Evaluate(devset=devset, metric=your_defined_metric, num_threads=NUM_THREADS, display_progress=True, display_table=num_rows_to_display) evaluate_program(your_dspy_program) ``` ## DSPy Optimizers ### LabeledFewShot ```python from dspy.teleprompt import LabeledFewShot labeled_fewshot_optimizer = LabeledFewShot(k=8) your_dspy_program_compiled = labeled_fewshot_optimizer.compile(student = your_dspy_program, trainset=trainset) ``` ### BootstrapFewShot ```python from dspy.teleprompt import BootstrapFewShot fewshot_optimizer = BootstrapFewShot(metric=your_defined_metric, max_bootstrapped_demos=4, max_labeled_demos=16, max_rounds=1, max_errors=10) your_dspy_program_compiled = fewshot_optimizer.compile(student = your_dspy_program, trainset=trainset) ``` #### Using another LM for compilation, specifying in teacher_settings ```python from dspy.teleprompt import BootstrapFewShot fewshot_optimizer = BootstrapFewShot(metric=your_defined_metric, max_bootstrapped_demos=4, max_labeled_demos=16, max_rounds=1, max_errors=10, teacher_settings=dict(lm=gpt4)) your_dspy_program_compiled = fewshot_optimizer.compile(student = your_dspy_program, trainset=trainset) ``` #### Compiling a compiled program - bootstrapping a bootstrapped program ```python your_dspy_program_compiledx2 = teleprompter.compile( your_dspy_program, teacher=your_dspy_program_compiled, trainset=trainset, ) ``` #### Saving/loading a compiled program ```python save_path = './v1.json' 
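# `save` writes a plain-text JSON file with the program's parameters (demos, instructions), which you can inspect directly.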
your_dspy_program_compiledx2.save(save_path) ``` ```python loaded_program = YourProgramClass() loaded_program.load(path=save_path) ``` ### BootstrapFewShotWithRandomSearch Detailed documentation on BootstrapFewShotWithRandomSearch can be found [here](api/optimizers/BootstrapFewShot.md). ```python from dspy.teleprompt import BootstrapFewShotWithRandomSearch fewshot_optimizer = BootstrapFewShotWithRandomSearch(metric=your_defined_metric, max_bootstrapped_demos=2, num_candidate_programs=8, num_threads=NUM_THREADS) your_dspy_program_compiled = fewshot_optimizer.compile(student = your_dspy_program, trainset=trainset, valset=devset) ``` Other custom configurations are similar to customizing the `BootstrapFewShot` optimizer. ### Ensemble ```python from dspy.teleprompt import BootstrapFewShotWithRandomSearch from dspy.teleprompt.ensemble import Ensemble fewshot_optimizer = BootstrapFewShotWithRandomSearch(metric=your_defined_metric, max_bootstrapped_demos=2, num_candidate_programs=8, num_threads=NUM_THREADS) your_dspy_program_compiled = fewshot_optimizer.compile(student = your_dspy_program, trainset=trainset, valset=devset) ensemble_optimizer = Ensemble(reduce_fn=dspy.majority) programs = [x[-1] for x in your_dspy_program_compiled.candidate_programs] your_dspy_program_compiled_ensemble = ensemble_optimizer.compile(programs[:3]) ``` ### BootstrapFinetune ```python from dspy.teleprompt import BootstrapFewShotWithRandomSearch, BootstrapFinetune #Compile program on current dspy.settings.lm fewshot_optimizer = BootstrapFewShotWithRandomSearch(metric=your_defined_metric, max_bootstrapped_demos=2, num_threads=NUM_THREADS) your_dspy_program_compiled = tp.compile(your_dspy_program, trainset=trainset[:some_num], valset=trainset[some_num:]) #Configure model to finetune config = dict(target=model_to_finetune, epochs=2, bf16=True, bsize=6, accumsteps=2, lr=5e-5) #Compile program on BootstrapFinetune finetune_optimizer = BootstrapFinetune(metric=your_defined_metric) finetune_program = finetune_optimizer.compile(your_dspy_program, trainset=some_new_dataset_for_finetuning_model, **config) finetune_program = your_dspy_program #Load program and activate model's parameters in program before evaluation ckpt_path = "saved_checkpoint_path_from_finetuning" LM = dspy.HFModel(checkpoint=ckpt_path, model=model_to_finetune) for p in finetune_program.predictors(): p.lm = LM p.activated = False ``` ### COPRO Detailed documentation on COPRO can be found [here](api/optimizers/COPRO.md). ```python from dspy.teleprompt import COPRO eval_kwargs = dict(num_threads=16, display_progress=True, display_table=0) copro_teleprompter = COPRO(prompt_model=model_to_generate_prompts, metric=your_defined_metric, breadth=num_new_prompts_generated, depth=times_to_generate_prompts, init_temperature=prompt_generation_temperature, verbose=False) compiled_program_optimized_signature = copro_teleprompter.compile(your_dspy_program, trainset=trainset, eval_kwargs=eval_kwargs) ``` ### MIPROv2 Note: detailed documentation can be found [here](api/optimizers/MIPROv2.md). `MIPROv2` is the latest extension of `MIPRO` which includes updates such as (1) improvements to instruction proposal and (2) more efficient search with minibatching. #### Optimizing with MIPROv2 This shows how to perform an easy out-of-the box run with `auto=light`, which configures many hyperparameters for you and performs a light optimization run. You can alternatively set `auto=medium` or `auto=heavy` to perform longer optimization runs. 
The more detailed `MIPROv2` documentation [here](api/optimizers/MIPROv2.md) also provides more information about how to set hyperparameters by hand. ```python # Import the optimizer from dspy.teleprompt import MIPROv2 # Initialize optimizer teleprompter = MIPROv2( metric=gsm8k_metric, auto="light", # Can choose between light, medium, and heavy optimization runs ) # Optimize program print(f"Optimizing program with MIPRO...") optimized_program = teleprompter.compile( program.deepcopy(), trainset=trainset, max_bootstrapped_demos=3, max_labeled_demos=4, ) # Save optimize program for future use optimized_program.save(f"mipro_optimized") # Evaluate optimized program print(f"Evaluate optimized program...") evaluate(optimized_program, devset=devset[:]) ``` #### Optimizing instructions only with MIPROv2 (0-Shot) ```python # Import the optimizer from dspy.teleprompt import MIPROv2 # Initialize optimizer teleprompter = MIPROv2( metric=gsm8k_metric, auto="light", # Can choose between light, medium, and heavy optimization runs ) # Optimize program print(f"Optimizing program with MIPRO...") optimized_program = teleprompter.compile( program.deepcopy(), trainset=trainset, max_bootstrapped_demos=0, max_labeled_demos=0, ) # Save optimize program for future use optimized_program.save(f"mipro_optimized") # Evaluate optimized program print(f"Evaluate optimized program...") evaluate(optimized_program, devset=devset[:]) ``` ### KNNFewShot ```python from sentence_transformers import SentenceTransformer from dspy import Embedder from dspy.teleprompt import KNNFewShot from dspy import ChainOfThought knn_optimizer = KNNFewShot(k=3, trainset=trainset, vectorizer=Embedder(SentenceTransformer("all-MiniLM-L6-v2").encode)) qa_compiled = knn_optimizer.compile(student=ChainOfThought("question -> answer")) ``` ### BootstrapFewShotWithOptuna ```python from dspy.teleprompt import BootstrapFewShotWithOptuna fewshot_optuna_optimizer = BootstrapFewShotWithOptuna(metric=your_defined_metric, max_bootstrapped_demos=2, num_candidate_programs=8, num_threads=NUM_THREADS) your_dspy_program_compiled = fewshot_optuna_optimizer.compile(student=your_dspy_program, trainset=trainset, valset=devset) ``` Other custom configurations are similar to customizing the `dspy.BootstrapFewShot` optimizer. ### SIMBA SIMBA, which stands for Stochastic Introspective Mini-Batch Ascent, is a prompt optimizer that accepts arbitrary DSPy programs and proceeds in a sequence of mini-batches seeking to make incremental improvements to the prompt instructions or few-shot examples. 
```python from dspy.teleprompt import SIMBA simba = SIMBA(metric=your_defined_metric, max_steps=12, max_demos=10) optimized_program = simba.compile(student=your_dspy_program, trainset=trainset) ``` ## DSPy Tools and Utilities ### dspy.Tool ```python import dspy def search_web(query: str) -> str: """Search the web for information""" return f"Search results for: {query}" tool = dspy.Tool(search_web) result = tool(query="Python programming") ``` ### dspy.streamify ```python import dspy import asyncio predict = dspy.Predict("question->answer") stream_predict = dspy.streamify( predict, stream_listeners=[dspy.streaming.StreamListener(signature_field_name="answer")], ) async def read_output_stream(): output_stream = stream_predict(question="Why did a chicken cross the kitchen?") async for chunk in output_stream: print(chunk) asyncio.run(read_output_stream()) ``` ### dspy.asyncify ```python import dspy dspy_program = dspy.ChainOfThought("question -> answer") dspy_program = dspy.asyncify(dspy_program) asyncio.run(dspy_program(question="What is DSPy")) ``` ### Track Usage ```python import dspy dspy.settings.configure(track_usage=True) result = dspy.ChainOfThought(BasicQA)(question="What is 2+2?") print(f"Token usage: {result.get_lm_usage()}") ``` ### dspy.configure_cache ```python import dspy # Configure cache settings dspy.configure_cache( enable_disk_cache=False, enable_memory_cache=False, ) ``` ## DSPy `Refine` and `BestofN` >`dspy.Suggest` and `dspy.Assert` are replaced by `dspy.Refine` and `dspy.BestofN` in DSPy 2.6. ### BestofN Runs a module up to `N` times with different rollout IDs (bypassing cache) and returns the best prediction, as defined by the `reward_fn`, or the first prediction that passes the `threshold`. ```python import dspy qa = dspy.ChainOfThought("question -> answer") def one_word_answer(args, pred): return 1.0 if len(pred.answer) == 1 else 0.0 best_of_3 = dspy.BestOfN(module=qa, N=3, reward_fn=one_word_answer, threshold=1.0) best_of_3(question="What is the capital of Belgium?").answer # Brussels ``` ### Refine Refines a module by running it up to `N` times with different rollout IDs (bypassing cache) and returns the best prediction, as defined by the `reward_fn`, or the first prediction that passes the `threshold`. After each attempt (except the final one), `Refine` automatically generates detailed feedback about the module's performance and uses this feedback as hints for subsequent runs, creating an iterative refinement process. ```python import dspy qa = dspy.ChainOfThought("question -> answer") def one_word_answer(args, pred): return 1.0 if len(pred.answer) == 1 else 0.0 best_of_3 = dspy.Refine(module=qa, N=3, reward_fn=one_word_answer, threshold=1.0) best_of_3(question="What is the capital of Belgium?").answer # Brussels ``` #### Error Handling By default, `Refine` will try to run the module up to N times until the threshold is met. If the module encounters an error, it will keep going up to N failed attempts. You can change this behavior by setting `fail_count` to a smaller number than `N`. ```python refine = dspy.Refine(module=qa, N=3, reward_fn=one_word_answer, threshold=1.0, fail_count=1) ... refine(question="What is the capital of Belgium?") # If we encounter just one failed attempt, the module will raise an error. ``` If you want to run the module up to N times without any error handling, you can set `fail_count` to `N`. This is the default behavior. ```python refine = dspy.Refine(module=qa, N=3, reward_fn=one_word_answer, threshold=1.0, fail_count=3) ... 
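# With fail_count equal to N (the default), all N attempts may fail before an error is raised.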
refine(question="What is the capital of Belgium?") ``` ``` -------------------------------------------------------------------------------- /docs/docs/roadmap.md: -------------------------------------------------------------------------------- ```markdown --- draft: true --- !!! warning "This document is from Aug 2024. Since then, DSPy 2.5 and 2.6 were released, DSPy has grown considerably, and 3.0 is approaching! Content below is highly outdated." # Roadmap Sketch: DSPy 2.5+ It’s been a year since DSPy evolved out of Demonstrate–Search–Predict (DSP), whose research started at Stanford NLP all the way back in February 2022. Thanks to 200 wonderful contributors, DSPy has introduced tens of thousands of people to building modular LM programs and optimizing their prompts and weights automatically. In this time, DSPy has grown to 160,000 monthly downloads and 16,000 stars on GitHub, becoming synonymous with prompt optimization in many circles and inspiring at least a half-dozen cool new libraries. This document is an initial sketch of DSPy’s public roadmap for the next few weeks and months, as we work on DSPy 2.5 and plan for DSPy 3.0. Suggestions and open-source contributors are more than welcome: just open an issue or submit a pull request regarding the roadmap. ## Technical Objectives The thesis of DSPy is that for LMs to be useful, we have to shift from ad-hoc prompting to new notions of programming LMs. Instead of relying on LMs gaining much more general or more compositional capabilities, we need to enable developers to iteratively explore their problems and build modular software that invokes LMs for well-scoped tasks. We need to enable that through modules and optimizers that isolate how they decompose their problems and describe their system's objectives from how their LMs are invoked or fine-tuned to maximize their objectives. DSPy's goal has been to develop (and to build the community and shared infrastructure for the collective development of) the abstractions, programming patterns, and optimizers toward this thesis. To a first approximation, DSPy’s current user-facing language has the minimum number of appropriate abstractions that address the goals above: declarative signatures, define-by-run modules, and optimizers that can be composed quite powerfully. But there are several things we need to do better to realize our goals. The upcoming DSPy releases will have the following objectives. 1. Polishing the core functionality. 2. Developing more accurate, lower-cost optimizers. 3. Building end-to-end tutorials from DSPy’s ML workflow to deployment. 4. Shifting towards more interactive optimization & tracking. ## Team & Organization DSPy is fairly unusual in its technical objectives, contributors, and audience. Though DSPy takes inspiration from PyTorch, a library for building and optimizing DNNs, there is one major difference: PyTorch was introduced well after DNNs were mature ML concepts, but DSPy seeks to establish and advance core LM Programs research: the framework is propelled by constant academic research from programming abstractions (like the original **Demonstrate–Search–Predict** concepts, DSPy **Signatures**, or **LM Assertions**) to NLP systems (like **STORM**, **PATH**, and **IReRa**) to prompt optimizers (like **MIPRO**) and RL (like **BetterTogether**), among many other related directions. This research all composes into a concrete, practical library, thanks to dozens of industry contributors, many of whom are deploying apps in production using DSPy. 
Because of this, DSPy reaches not only grad students and ML engineers, but also many non-ML engineers, from early adopter SWEs to hobbyists exploring new ways of using LMs. The following team, with help from many folks in the OSS community, is working towards the objectives in this Roadmap. **Project Lead:** Omar Khattab (Stanford & Databricks) **Project Mentors:** Chris Potts (Stanford), Matei Zaharia (UC Berkeley & Databricks), Heather Miller (CMU & Two Sigma) **Core Library:** Arnav Singhvi (Databricks & Stanford), Herumb Shandilya (Stanford), Hanna Moazam (Databricks), Sri Vardhamanan (Dashworks), Cyrus Nouroozi (Zenbase), Amir Mehr (Zenbase), Kyle Caverly (Modular), with special thanks to Keshav Santhanam (Stanford), Thomas Ahle (Normal Computing), Connor Shorten (Weaviate) **Prompt Optimization:** Krista Opsahl-Ong (Stanford), Michael Ryan (Stanford), Josh Purtell (Basis), with special thanks to Eric Zhang (Stanford) **Finetuning & RL:** Dilara Soylu (Stanford), Isaac Miller (Anyscale), Karel D'Oosterlinck (Ghent), with special thanks to Paridhi Masehswari (Stanford) **PL Abstractions:** Shangyin Tan (UC Berkeley), Manish Shetty (UC Berkeley), Peter Zhong (CMU) **Applications:** Jasper Xian (Waterloo), Saron Samuel (Stanford), Alberto Mancarella (Stanford), Faraz Khoubsirat (Waterloo), Saiful Haq (IIT-B), Ashutosh Sharma (UIUC) ## 1) Polishing the core functionality. Over the next month, polishing is the main objective and likely the one to have the highest ROI on the experience of the average user. Conceptually, DSPy has an extremely small core. It's nothing but (1) LMs, (2) Signatures & Modules, (3) Optimizers, and (4) Assertions. These concepts and their implementations evolved organically over the past couple of years. We are working now to consolidate what we've learned and refactor internally so that things "just work" out of the box for new users, who may not know all the tips-and-tricks just yet. More concretely: 1. We want to increase the quality of zero-shot, off-the-shelf DSPy programs, i.e. those not yet compiled on custom data. 2. Wherever possible, DSPy should delegate lower-level internal complexity (like managing LMs and structured generation) to emerging lower-level libraries. When required, we may fork smaller libraries out of DSPy to support infrastructure pieces as their own projects. 3. DSPy should internally be more modular, and we need higher compatibility between internal components. Specifically, we need deeper and more native investment in (i) typed multi-field constraints, (ii) assertions, (iii) observability and experimental tracking, (iv) deployment of artifacts and related concerns like streaming and async, and (v) fine-tuning and serving open models. ### On LMs As of DSPy 2.4, the library has approximately 20,000 lines of code and roughly another 10,000 lines of code for tests, examples, and documentation. Some of these are clearly necessary (e.g., DSPy optimizers) but others exist only because the LM space lacks the building blocks we need under the hood. Luckily, for LM interfaces, a very strong library now exists: LiteLLM, which unifies interfaces to various LM and embedding providers. We expect to reduce around 6,000 lines of code for custom LMs and retrieval models by shifting a lot of that to LiteLLM. Objectives in this space include improved caching, saving/loading of LMs, and support for streaming and async LM requests. 
Work here is currently led by Hanna Moazam and Sri Vardhamanan, building on a foundation by Cyrus Nouroozi, Amir Mehr, Kyle Caverly, and others. ### On Signatures & Modules Traditionally, LMs offer text-in-text-out interfaces. Toward modular programming, DSPy introduced signatures for the first time (as DSP Templates in Jan 2023) as a way to structure the inputs and outputs of LM interactions. Standard prompts conflate interface ("what should the LM do?") with implementation ("how do we tell it to do that?"). DSPy signatures isolate the former so we can infer and learn the latter from data — in the context of a bigger program. Today in the LM landscape, notions of "structured outputs" have evolved dramatically, thanks to constrained decoding and other improvements, and have become mainstream. What may be called "structured inputs" has yet to become mainstream outside of DSPy, but it is just as crucial. Objectives in this space include refining the abstractions and implementations of LM Adapters as a first-class notion in DSPy: translators that sit between signatures and LM interfaces. While Optimizers adjust prompts through interactions with a user-supplied metric and data, Adapters are more concerned with building up interactions with LMs to account for, e.g., (i) non-plaintext LM interfaces like chat APIs, structured outputs, function calling, and multi-modal APIs, or (ii) languages beyond English and other forms of higher-level specialization. This has been explored in DSPy on and off in various forms, but we have started working on more fundamental approaches to this problem that will offer tangible improvements to most use-cases. Work here is currently led by Omar Khattab. ### On Finetuning & Serving In February 2023, DSPy introduced the notion of compiling to optimize the weights of an LM program. (To understand just how long ago that was in AI terms, this was before the Alpaca training project at Stanford had even started and a month before the first GPT-4 was released.) Since then, we have shown in October 2023 and, much more expansively, in July 2024, that the fine-tuning flavor of DSPy can deliver large gains for small LMs, especially when composed with prompt optimization. Overall, though, most DSPy users in practice explore prompt optimization and not weight optimization, and most of our examples do the same. The primary reason for this is infrastructure. Fine-tuning in the DSPy flavor is more than just training a model: ultimately, we need to bootstrap training data for several different modules in a program, train multiple models and handle model selection, and then load and plug those models into the program's modules. Doing this robustly at the level of abstraction DSPy offers requires a level of resource management that is not generally supported by existing external tools. Major efforts in this regard are currently led by Dilara Soylu and Isaac Miller. 
In August 2023, we introduced new variations of both of these. In December 2023, we introduced the first couple of instruction optimizers into DSPy, CA-OPRO and early versions of MIPRO. These were again upgraded in March 2024. Fast forward to June and July 2024: we released MIPROv2 for prompt optimization and BetterTogether for fine-tuning the weights of LM programs.

We have been working towards a number of stronger optimizers. While we cannot share the internal details of research on new optimizers yet, we can outline the goals. A DSPy optimizer can be characterized along three angles:

1. Quality: How much quality can it deliver from various LMs? How effective does it need the zero-shot program to be in order to work well?
2. Cost: How many labeled (and unlabeled) inputs does it need? How many invocations of the program does it need? How expensive is the resulting optimized program at inference time?
3. Robustness: How well can it generalize to different unseen data points or distributions? How sensitive is it to mistakes of the metric or labels?

Over the next six months, our goal is to dramatically improve each of these angles _when the other two are held constant_. Concretely, there are three directions here.

- Benchmarking: A key prerequisite here is work on benchmarking. On the team, Michael Ryan and Shangyin Tan are leading these efforts. More soon.

- Quality: The goal here is optimizers that extract, on average, 20% more on representative tasks than MIPROv2 and BetterTogether, under the usual conditions, like a few hundred inputs with labels, a good metric, and a decent zero-shot program to start from. Various efforts here are led by Dilara Soylu, Michael Ryan, Josh Purtell, Krista Opsahl-Ong, and Isaac Miller.

- Efficiency: The goal here is optimizers that match the current best scores from MIPROv2 and BetterTogether but under 1-2 challenges like: (i) starting from only 10-20 inputs with labels, (ii) starting with a weak zero-shot program that scores 0%, (iii) where significant misalignment exists between train/validation and test, or (iv) where the user supplies no metric but provides a very small number of output judgments.

## 3) Building end-to-end tutorials from DSPy's ML workflow to deployment.

Using DSPy well for solving a new task is just doing good machine learning with LMs, but teaching this is hard. On the one hand, it's an iterative process: you make some initial choices, which will be sub-optimal, and then you refine them incrementally. It's highly exploratory: it's often the case that no one knows yet how to best solve a problem in a DSPy-esque way. On the other hand, DSPy offers many emerging lessons from several years of building LM systems, in which the design space, the data regime, and many other factors are new both to ML experts and to the very large fraction of users who have no ML experience.

Though current docs do address [a bunch of this](learn/index.md) in isolated ways, one thing we've learned is that we should separate teaching the core DSPy language (which is ultimately pretty small) from teaching the emerging ML workflow that works well in a DSPy-esque setting. As a natural extension of this, we need to place more emphasis on the steps prior to and after the explicit coding in DSPy, from data collection to deployment that serves and monitors the optimized DSPy program in practice. This is just starting, but efforts will be ramping up, led by Omar Khattab, Isaac Miller, and Herumb Shandilya.

## 4) Shifting towards more interactive optimization & tracking.
Right now, a DSPy user has a few ways to observe and tweak the process of optimization. They can study the prompts before, during, and after optimization using methods like `inspect_history`, built-in logging, and/or the metadata returned by optimizers. Similarly, they can rely on `program.save` and `program.load` to potentially adjust the optimized prompts by hand. Alternatively, they can use one of the many powerful observability integrations (like those from Phoenix Arize, LangWatch, or Weights & Biases Weave) to observe _in real time_ the process of optimization (e.g., scores, stack traces, successful & failed traces, and candidate prompts). DSPy encourages iterative engineering by adjusting the program, data, or metrics across optimization runs. For example, some optimizers allow "checkpointing": e.g., if you optimize with BootstrapFewShotWithRandomSearch for 10 iterations and then increase to 15 iterations, the first 10 will be loaded from cache.

While these can accomplish a lot of goals, there are two limitations that future versions of DSPy will seek to address.

1. In general, DSPy's (i) observability, (ii) experiment tracking, (iii) cost management, and (iv) deployment of programs should become first-class concerns via integration with tools like MLflow. We will share more plans addressing this for DSPy 2.6 in the next 1-2 months.

2. DSPy 3.0 will introduce new optimizers that prioritize ad-hoc, human-in-the-loop feedback. This is perhaps the only substantial paradigm shift we see as necessary in the foreseeable future in DSPy. It involves various research questions at the level of the abstractions, UI/HCI, and ML, so it is a longer-term goal that we will share more about in the next 3-4 months.
```

--------------------------------------------------------------------------------
/dspy/clients/lm_local.py:
--------------------------------------------------------------------------------

```python
import datetime
import logging
import random
import socket
import string
import subprocess
import threading
import time
from typing import TYPE_CHECKING, Any

import requests

from dspy.clients.provider import Provider, TrainingJob
from dspy.clients.utils_finetune import TrainDataFormat, save_data

if TYPE_CHECKING:
    from dspy.clients.lm import LM

logger = logging.getLogger(__name__)


class LocalProvider(Provider):
    def __init__(self):
        super().__init__()
        self.finetunable = True
        self.TrainingJob = TrainingJob

    @staticmethod
    def launch(lm: "LM", launch_kwargs: dict[str, Any] | None = None):
        try:
            import sglang  # noqa: F401
        except ImportError:
            raise ImportError(
                "For local model launching, please install sglang."
                "Navigate to https://docs.sglang.ai/start/install.html for the latest installation instructions!"
            )

        if hasattr(lm, "process"):
            logger.info("Server is already launched.")
            return

        launch_kwargs = launch_kwargs or lm.launch_kwargs

        import os

        model = lm.model
        if model.startswith("openai/"):
            model = model[7:]
        if model.startswith("local:"):
            model = model[6:]
        if model.startswith("huggingface/"):
            model = model[len("huggingface/") :]

        logger.info(f"Grabbing a free port to launch an SGLang server for model {model}")
        logger.info(f"We see that CUDA_VISIBLE_DEVICES is {os.environ.get('CUDA_VISIBLE_DEVICES', 'unset')}")
        port = get_free_port()
        timeout = launch_kwargs.get("timeout", 1800)
        command = f"python -m sglang.launch_server --model-path {model} --port {port} --host 0.0.0.0"

        # We will manually stream & capture logs.
process = subprocess.Popen( command.replace("\\\n", " ").replace("\\", " ").split(), text=True, stdout=subprocess.PIPE, # We'll read from pipe stderr=subprocess.STDOUT, # Merge stderr into stdout ) # A threading.Event to control printing after the server is ready. # This will store *all* lines (both before and after readiness). logger.info(f"SGLang server process started with PID {process.pid}.") stop_printing_event = threading.Event() logs_buffer = [] def _tail_process(proc, buffer, stop_event): while True: line = proc.stdout.readline() if not line and proc.poll() is not None: # Process ended and no new line break if line: buffer.append(line) # Print only if stop_event is not set if not stop_event.is_set(): print(line, end="") # Start a background thread to read from the process continuously thread = threading.Thread( target=_tail_process, args=(process, logs_buffer, stop_printing_event), daemon=True, ) thread.start() # Wait until the server is ready (or times out) base_url = f"http://localhost:{port}" try: wait_for_server(base_url, timeout=timeout) except TimeoutError: # If the server doesn't come up, we might want to kill it: process.kill() raise # Once server is ready, we tell the thread to stop printing further lines. stop_printing_event.set() # A convenience getter so the caller can see all logs so far (and future). def get_logs() -> str: # Join them all into a single string, or you might return a list return "".join(logs_buffer) # Let the user know server is up logger.info(f"Server ready on random port {port}! Logs are available via lm.get_logs() method on returned lm.") lm.kwargs["api_base"] = f"http://localhost:{port}/v1" lm.kwargs["api_key"] = "local" lm.get_logs = get_logs lm.process = process lm.thread = thread @staticmethod def kill(lm: "LM", launch_kwargs: dict[str, Any] | None = None): from sglang.utils import terminate_process if not hasattr(lm, "process"): logger.info("No running server to kill.") return # Ideally, the following happens atomically terminate_process(lm.process) lm.thread.join() del lm.process del lm.thread del lm.get_logs logger.info("Server killed.") @staticmethod def finetune( job: TrainingJob, model: str, train_data: list[dict[str, Any]], train_data_format: TrainDataFormat | None, train_kwargs: dict[str, Any] | None = None, ) -> str: if model.startswith("openai/"): model = model[7:] if model.startswith("local:"): model = model[6:] if train_data_format != TrainDataFormat.CHAT: raise ValueError("Only chat models are supported for local finetuning.") data_path = save_data(train_data) logger.info(f"Train data saved to {data_path}") output_dir = create_output_dir(model, data_path) default_train_kwargs = { "device": None, "use_peft": False, "num_train_epochs": 5, "per_device_train_batch_size": 1, "gradient_accumulation_steps": 8, "learning_rate": 1e-5, "max_seq_length": None, "packing": True, "bf16": True, "output_dir": output_dir, } train_kwargs = {**default_train_kwargs, **(train_kwargs or {})} output_dir = train_kwargs["output_dir"] # user might have changed the output_dir logger.info(f"Starting local training, will save to {output_dir}") train_sft_locally( model_name=model, train_data=train_data, train_kwargs=train_kwargs, ) logger.info("Training complete") return f"openai/local:{output_dir}" def create_output_dir(model_name, data_path): model_str = model_name.replace("/", "-") time_str = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") rnd_str = "".join(random.choices(string.ascii_lowercase + string.digits, k=6)) model_identifier = 
f"{rnd_str}_{model_str}_{time_str}" output_dir = data_path.replace(".jsonl", "_" + model_identifier) return output_dir def train_sft_locally(model_name, train_data, train_kwargs): try: import torch from transformers import AutoModelForCausalLM, AutoTokenizer from trl import SFTConfig, SFTTrainer, setup_chat_format except ImportError: raise ImportError( "For local finetuning, please install torch, transformers, and trl " "by running `pip install -U torch transformers accelerate trl peft`" ) device = train_kwargs.get("device", None) if device is None: device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu" logger.info(f"Using device: {device}") model = AutoModelForCausalLM.from_pretrained(pretrained_model_name_or_path=model_name).to(device) tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path=model_name) # Set up the chat format; generally only for non-chat model variants, hence the try-except. try: model, tokenizer = setup_chat_format(model=model, tokenizer=tokenizer) except Exception: pass if tokenizer.pad_token_id is None: logger.info("Adding pad token to tokenizer") tokenizer.add_special_tokens({"pad_token": "[!#PAD#!]"}) logger.info("Creating dataset") if "max_seq_length" not in train_kwargs: train_kwargs["max_seq_length"] = 4096 logger.info( f"The 'train_kwargs' parameter didn't include a 'max_seq_length', defaulting to {train_kwargs['max_seq_length']}" ) from datasets import Dataset hf_dataset = Dataset.from_list(train_data) def tokenize_function(example): return encode_sft_example(example, tokenizer, train_kwargs["max_seq_length"]) # noqa: F821 tokenized_dataset = hf_dataset.map(tokenize_function, batched=False) tokenized_dataset.set_format(type="torch") tokenized_dataset = tokenized_dataset.filter(lambda example: (example["labels"] != -100).any()) use_peft = train_kwargs.get("use_peft", False) peft_config = None if use_peft: from peft import LoraConfig rank_dimension = 32 lora_alpha = 64 lora_dropout = 0.05 peft_config = LoraConfig( r=rank_dimension, lora_alpha=lora_alpha, lora_dropout=lora_dropout, bias="none", target_modules="all-linear", task_type="CAUSAL_LM", ) sft_config = SFTConfig( output_dir=train_kwargs["output_dir"], num_train_epochs=train_kwargs["num_train_epochs"], per_device_train_batch_size=train_kwargs["per_device_train_batch_size"], gradient_accumulation_steps=train_kwargs["gradient_accumulation_steps"], learning_rate=train_kwargs["learning_rate"], max_grad_norm=2.0, # note that the current SFTConfig default is 1.0 logging_steps=20, warmup_ratio=0.03, lr_scheduler_type="constant", save_steps=10_000, bf16=train_kwargs["bf16"], max_seq_length=train_kwargs["max_seq_length"], packing=train_kwargs["packing"], dataset_kwargs={ # We need to pass dataset_kwargs because we are processing the dataset ourselves "add_special_tokens": False, # Special tokens handled by template "append_concat_token": False, # No additional separator needed }, ) logger.info("Starting training") trainer = SFTTrainer( model=model, args=sft_config, train_dataset=tokenized_dataset, peft_config=peft_config, ) # Train! trainer.train() # Save the model! 
trainer.save_model() merge = True if use_peft and merge: from peft import AutoPeftModelForCausalLM # Load PEFT model on CPU model_ = AutoPeftModelForCausalLM.from_pretrained( pretrained_model_name_or_path=sft_config.output_dir, torch_dtype=torch.float16, low_cpu_mem_usage=True, ) merged_model = model_.merge_and_unload() merged_model.save_pretrained(sft_config.output_dir, safe_serialization=True, max_shard_size="5GB") # Clean up! import gc del model del tokenizer del trainer gc.collect() torch.cuda.empty_cache() return sft_config.output_dir def get_free_port() -> int: """ Return a free TCP port on localhost. """ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("localhost", 0)) return s.getsockname()[1] def wait_for_server(base_url: str, timeout: int | None = None) -> None: """ Wait for the server to be ready by polling the /v1/models endpoint. Args: base_url: The base URL of the server (e.g. http://localhost:1234) timeout: Maximum time to wait in seconds. None means wait forever. """ start_time = time.time() while True: try: response = requests.get( f"{base_url}/v1/models", headers={"Authorization": "Bearer None"}, ) if response.status_code == 200: # A small extra sleep to ensure server is fully up. time.sleep(5) break if timeout and (time.time() - start_time) > timeout: raise TimeoutError("Server did not become ready within timeout period") except requests.exceptions.RequestException: # Server not up yet, wait and retry time.sleep(1) def encode_sft_example(example, tokenizer, max_seq_length): """ This function encodes a single example into a format that can be used for sft training. Here, we assume each example has a 'messages' field. Each message in it is a dict with 'role' and 'content' fields. We use the `apply_chat_template` function from the tokenizer to tokenize the messages and prepare the input and label tensors. 
Code obtained from the allenai/open-instruct repository: https://github.com/allenai/open-instruct/blob/4365dea3d1a6111e8b2712af06b22a4512a0df88/open_instruct/finetune.py """ import torch messages = example["messages"] if len(messages) == 0: raise ValueError("messages field is empty.") input_ids = tokenizer.apply_chat_template( conversation=messages, tokenize=True, return_tensors="pt", padding=False, truncation=True, max_length=max_seq_length, add_generation_prompt=False, ) labels = input_ids.clone() # mask the non-assistant part for avoiding loss for message_idx, message in enumerate(messages): if message["role"] != "assistant": # we calculate the start index of this non-assistant message if message_idx == 0: message_start_idx = 0 else: message_start_idx = tokenizer.apply_chat_template( conversation=messages[:message_idx], # here marks the end of the previous messages tokenize=True, return_tensors="pt", padding=False, truncation=True, max_length=max_seq_length, add_generation_prompt=False, ).shape[1] # next, we calculate the end index of this non-assistant message if message_idx < len(messages) - 1 and messages[message_idx + 1]["role"] == "assistant": # for intermediate messages that follow with an assistant message, we need to # set `add_generation_prompt=True` to avoid the assistant generation prefix being included in the loss # (e.g., `<|assistant|>`) message_end_idx = tokenizer.apply_chat_template( conversation=messages[: message_idx + 1], tokenize=True, return_tensors="pt", padding=False, truncation=True, max_length=max_seq_length, add_generation_prompt=True, ).shape[1] else: # for the last message or the message that doesn't follow with an assistant message, # we don't need to add the assistant generation prefix message_end_idx = tokenizer.apply_chat_template( conversation=messages[: message_idx + 1], tokenize=True, return_tensors="pt", padding=False, truncation=True, max_length=max_seq_length, add_generation_prompt=False, ).shape[1] # set the label to -100 for the non-assistant part labels[:, message_start_idx:message_end_idx] = -100 if max_seq_length and message_end_idx >= max_seq_length: break attention_mask = torch.ones_like(input_ids) return { "input_ids": input_ids.flatten(), "labels": labels.flatten(), "attention_mask": attention_mask.flatten(), } ``` -------------------------------------------------------------------------------- /dspy/streaming/streaming_listener.py: -------------------------------------------------------------------------------- ```python import re from collections import defaultdict from queue import Queue from typing import TYPE_CHECKING, Any from litellm import ModelResponseStream from dspy.adapters.chat_adapter import ChatAdapter from dspy.adapters.json_adapter import JSONAdapter from dspy.adapters.types import Type from dspy.adapters.xml_adapter import XMLAdapter from dspy.dsp.utils.settings import settings from dspy.streaming.messages import StreamResponse if TYPE_CHECKING: from dspy.primitives.module import Module ADAPTER_SUPPORT_STREAMING = [ChatAdapter, XMLAdapter, JSONAdapter] class StreamListener: """Class that listens to the stream to capture the streeaming of a specific output field of a predictor.""" def __init__( self, signature_field_name: str, predict: Any = None, predict_name: str | None = None, allow_reuse: bool = False, ): """ Args: signature_field_name: The name of the field to listen to. predict: The predictor to listen to. 
If None, when calling `streamify()` it will automatically look for the predictor that has the `signature_field_name` in its signature. predict_name: The name of the predictor to listen to. If None, when calling `streamify()` it will automatically look for the predictor that has the `signature_field_name` in its signature. allow_reuse: If True, the stream listener can be reused for multiple streams. Please note that this could hurt the performance because the same stream chunk is sent to multiple listeners. """ self.signature_field_name = signature_field_name self.predict = predict self.predict_name = predict_name self.field_start_queue = [] self.field_end_queue = Queue() self.stream_start = False self.stream_end = False self.cache_hit = False self.allow_reuse = allow_reuse self.adapter_identifiers = { "ChatAdapter": { "start_identifier": f"[[ ## {self.signature_field_name} ## ]]", "end_identifier": re.compile(r"\[\[ ## (\w+) ## \]\]"), "start_indicator": "[", "end_pattern_prefixes": ["[", "[[", "[[ ", "[[ #", "[[ ##"], "end_pattern_contains": "[[ ##", }, "JSONAdapter": { "start_identifier": f'"{self.signature_field_name}":', "end_identifier": re.compile(r"\w*\"(,|\s*})"), "start_indicator": '"', "end_pattern_prefixes": ['"', '",', '" ', '"}'], "end_pattern_contains": None, }, "XMLAdapter": { "start_identifier": f"<{self.signature_field_name}>", "end_identifier": re.compile(rf"</{self.signature_field_name}>"), "start_indicator": "<", "end_pattern_prefixes": ["<", "</"], "end_pattern_contains": "</", # Any closing tag start }, } def _buffered_message_end_with_start_identifier(self, concat_message: str, start_identifier: str) -> str: for i in range(len(concat_message)): if start_identifier.startswith(concat_message[len(concat_message) - i - 1 :]): return True return False def _could_form_end_identifier(self, concat_message: str, adapter_name: str) -> bool: """Check if the buffered message could potentially form the end identifier. This prevents unnecessary buffering when the tokens clearly cannot form the end pattern. For example, if buffered message is "hello world" and end pattern is "[[ ## ... ## ]]", we know it cannot form the pattern, so we should yield immediately. Args: concat_message: The concatenated buffered message adapter_name: The name of the adapter being used Returns: True if the message could potentially form part of the end identifier """ adapter_config = self.adapter_identifiers[adapter_name] end_pattern_prefixes = adapter_config.get("end_pattern_prefixes", []) end_pattern_contains = adapter_config.get("end_pattern_contains") # First check: does it end with a potential start of the pattern? 
if any(concat_message.endswith(prefix) for prefix in end_pattern_prefixes): return True # Second check: if there's a pattern marker, check if message contains it # This handles cases like "[[ ## com" where we have partial field name if end_pattern_contains and end_pattern_contains in concat_message: return True return False def receive(self, chunk: ModelResponseStream): adapter_name = settings.adapter.__class__.__name__ if settings.adapter else "ChatAdapter" if adapter_name not in self.adapter_identifiers: raise ValueError( f"Unsupported adapter for streaming: {adapter_name}, please use one of the following adapters: " f"{', '.join([a.__name__ for a in ADAPTER_SUPPORT_STREAMING])}" ) start_identifier = self.adapter_identifiers[adapter_name]["start_identifier"] end_identifier = self.adapter_identifiers[adapter_name]["end_identifier"] start_indicator = self.adapter_identifiers[adapter_name]["start_indicator"] if self.stream_end: if self.allow_reuse: # Clear up the state for the next stream. self.stream_end = False self.cache_hit = False self.field_start_queue = [] self.field_end_queue = Queue() self.stream_start = False else: return try: chunk_message = chunk.choices[0].delta.content if chunk_message is None: return except Exception: return # Handle custom streamable types if self._output_type and issubclass(self._output_type, Type) and self._output_type.is_streamable(): if parsed_chunk := self._output_type.parse_stream_chunk(chunk): return StreamResponse( self.predict_name, self.signature_field_name, parsed_chunk, is_last_chunk=self.stream_end, ) if chunk_message and start_identifier in chunk_message: # If the cache is hit, the chunk_message could be the full response. When it happens we can # directly end the stream listening. In some models like gemini, each stream chunk can be multiple # tokens, so it's possible that response only has one chunk, we also fall back to this logic. message_after_start_identifier = chunk_message[ chunk_message.find(start_identifier) + len(start_identifier) : ] if re.search(end_identifier, message_after_start_identifier): self.cache_hit = True self.stream_start = True self.stream_end = True return if len(self.field_start_queue) == 0 and not self.stream_start and start_indicator in chunk_message: # We look for the pattern of start_identifier, i.e., "[[ ## {self.signature_field_name} ## ]]" for # ChatAdapter to identify the start of the stream of our target field. Once the start_indicator, i.e., "[[" # for ChatAdapter, is found, we start checking the next tokens self.field_start_queue.append(chunk_message) return if len(self.field_start_queue) > 0 and not self.stream_start: # We keep appending the tokens to the queue until we have a full identifier or the concanated # tokens no longer match our expected identifier. self.field_start_queue.append(chunk_message) concat_message = "".join(self.field_start_queue) if start_identifier in concat_message: # We have a full identifier, we can start the stream. self.stream_start = True self.field_start_queue = [] # Keep the part after the start_identifier from the concat_message, we need to write it to the buffer. value_start_index = concat_message.find(start_identifier) + len(start_identifier) chunk_message = concat_message[value_start_index:].lstrip() if isinstance(settings.adapter, JSONAdapter) and chunk_message.startswith('"'): # For JSONAdapter, we need to remove the leading ". We cannot do this with the start_identifier # because there could be a few splitters between ':' and '"', e.g., '"name": "value"'. 
chunk_message = chunk_message[1:] elif self._buffered_message_end_with_start_identifier(concat_message.strip(), start_identifier): # If the buffered message ends with part of the start_identifier, we keep looking for the # start_identifier from the token stream. return else: # Doesn't match the expected identifier, reset the queue. self.field_start_queue = [] return if self.stream_start and chunk_message: # The stream is started, we keep returning the token until we see the start of the next field. token = None self.field_end_queue.put(chunk_message) concat_message = "".join(self.field_end_queue.queue).strip() if re.search(end_identifier, concat_message): # The next field is identified, we can end the stream and flush out all tokens in the buffer. self.stream_end = True token = self.flush() token = token.rstrip() # Remove the trailing \n\n elif not self._could_form_end_identifier(concat_message, adapter_name): # Buffer cannot form end identifier, safe to flush out the tokens in the buffer. token = self.flush() elif self.field_end_queue.qsize() > 10: # Buffer could form end identifier, but we've exceeded max buffer size # Yield the oldest token to prevent unbounded buffering token = self.field_end_queue.get() if token: return StreamResponse( self.predict_name, self.signature_field_name, token, is_last_chunk=self.stream_end, ) def flush(self) -> str: """Flush all tokens in the field end queue. This method is called to flush out the last a few tokens when the stream is ended. These tokens are in the buffer because we don't directly yield the tokens received by the stream listener with the purpose to not yield the end_identifier tokens, e.g., "[[ ## ... ## ]]" for ChatAdapter. """ last_tokens = "".join(self.field_end_queue.queue) self.field_end_queue = Queue() if isinstance(settings.adapter, JSONAdapter): match = re.search(r'",|"\s*}', last_tokens) if match: boundary_index = match.start() else: boundary_index = len(last_tokens) return last_tokens[:boundary_index] elif isinstance(settings.adapter, XMLAdapter): boundary_index = last_tokens.find(f"</{self.signature_field_name}>") if boundary_index == -1: boundary_index = len(last_tokens) return last_tokens[:boundary_index] elif isinstance(settings.adapter, ChatAdapter) or settings.adapter is None: boundary_index = last_tokens.find("[[") if boundary_index == -1: boundary_index = len(last_tokens) return last_tokens[:boundary_index] else: raise ValueError( f"Unsupported adapter for streaming: {settings.adapter}, please use one of the following adapters: " f"{', '.join([a.__name__ for a in ADAPTER_SUPPORT_STREAMING])}" ) def finalize(self) -> StreamResponse | None: """Finalize the stream and flush any remaining buffered tokens. This should be called when the stream ends. It ensures no tokens are lost from the buffer and marks the final chunk appropriately. Returns: A StreamResponse with the remaining buffered tokens and is_last_chunk=True, or None if there are no buffered tokens or the stream hasn't started. 
""" if self.stream_end or not self.stream_start: # Stream already ended or never started, nothing to finalize return None self.stream_end = True if self.field_end_queue.qsize() > 0: token = self.flush() if token: return StreamResponse( self.predict_name, self.signature_field_name, token, is_last_chunk=True, ) return None @property def _output_type(self) -> type | None: try: return self.predict.signature.output_fields[self.signature_field_name].annotation except Exception: return None def find_predictor_for_stream_listeners(program: "Module", stream_listeners: list[StreamListener]) -> dict[int, list[StreamListener]]: """Find the predictor for each stream listener. This is a utility function to automatically find the predictor for each stream listener. It is used when some listeners don't specify the predictor they want to listen to. If a listener's `signature_field_name` is not unique in the program, this function will raise an error. """ predictors = program.named_predictors() field_name_to_named_predictor = {} for listener in stream_listeners: if listener.predict: continue field_name_to_named_predictor[listener.signature_field_name] = None for name, predictor in predictors: for field_name, field_info in predictor.signature.output_fields.items(): if field_name not in field_name_to_named_predictor: continue if field_name_to_named_predictor[field_name] is not None: raise ValueError( f"Signature field {field_name} is not unique in the program, cannot automatically determine which " "predictor to use for streaming. Please specify the predictor to listen to." ) if not _is_streamable(field_info.annotation): raise ValueError( f"Stream listener can only be applied to string or subclass of `dspy.Type` that has `is_streamable() == True`, " f"but your field {field_name} is of type {field_info.annotation}." ) field_name_to_named_predictor[field_name] = (name, predictor) predict_id_to_listener = defaultdict(list) for listener in stream_listeners: if listener.predict: predict_id_to_listener[id(listener.predict)].append(listener) continue if listener.signature_field_name not in field_name_to_named_predictor: raise ValueError( f"Signature field {listener.signature_field_name} is not a field of any predictor in the program, " "cannot automatically determine which predictor to use for streaming. Please verify your field name or " "specify the predictor to listen to." ) listener.predict_name, listener.predict = field_name_to_named_predictor[listener.signature_field_name] predict_id_to_listener[id(listener.predict)].append(listener) return predict_id_to_listener def _is_streamable(field_type: type | None) -> bool: if field_type is None: return False if field_type is str: return True if issubclass(field_type, Type): return field_type.is_streamable() return False ``` -------------------------------------------------------------------------------- /dspy/teleprompt/simba.py: -------------------------------------------------------------------------------- ```python from __future__ import annotations import logging import random from typing import Any, Callable import numpy as np import dspy from dspy.teleprompt.simba_utils import append_a_demo, append_a_rule, prepare_models_for_resampling, wrap_program from dspy.teleprompt.teleprompt import Teleprompter logger = logging.getLogger(__name__) class SIMBA(Teleprompter): """ SIMBA (Stochastic Introspective Mini-Batch Ascent) optimizer for DSPy. SIMBA is a DSPy optimizer that uses the LLM to analyze its own performance and generate improvement rules. 
It samples mini-batches, identifies challenging examples with high output variability, then either creates self-reflective rules or adds successful examples as demonstrations. For more details, see: https://dspy.ai/api/optimizers/SIMBA/ """ def __init__( self, *, metric: Callable[[dspy.Example, dict[str, Any]], float], bsize: int = 32, num_candidates: int = 6, max_steps: int = 8, max_demos: int = 4, prompt_model: dspy.LM | None = None, teacher_settings: dict | None = None, demo_input_field_maxlen: int = 100_000, num_threads: int | None = None, temperature_for_sampling: float = 0.2, temperature_for_candidates: float = 0.2, ) -> None: """ Initializes SIMBA. Args: metric: A function that takes an Example and a prediction_dict as input and returns a float. bsize: Mini-batch size. Defaults to 32. num_candidates: Number of new candidate programs to produce per iteration. Defaults to 6. max_steps: Number of optimization steps to run. Defaults to 8. max_demos: Maximum number of demos a predictor can hold before dropping some. Defaults to 4. prompt_model: The model to use to evolve the program. When `prompt_model is None`, the globally configured lm is used. teacher_settings: Settings for the teacher model. Defaults to None. demo_input_field_maxlen: Maximum number of characters to keep in an input field when building a new demo. Defaults to 100,000. num_threads: Number of threads for parallel execution. Defaults to None. temperature_for_sampling: Temperature used for picking programs during the trajectory-sampling step. Defaults to 0.2. temperature_for_candidates: Temperature used for picking the source program for building new candidates. Defaults to 0.2. """ self.metric = metric self.bsize = bsize self.num_candidates = num_candidates self.max_steps = max_steps self.max_demos = max_demos self.prompt_model = prompt_model or dspy.settings.lm self.teacher_settings = teacher_settings self.demo_input_field_maxlen = demo_input_field_maxlen self.num_threads = num_threads self.temperature_for_sampling = temperature_for_sampling self.temperature_for_candidates = temperature_for_candidates if self.max_demos > 0: self.strategies = [append_a_demo(demo_input_field_maxlen), append_a_rule] else: self.strategies = [append_a_rule] def compile( self, student: dspy.Module, *, trainset: list[dspy.Example], seed: int = 0 ) -> dspy.Module: """ Compile and optimize the student module using SIMBA. 
Args: student: The module to optimize trainset: Training examples for optimization seed: Random seed for reproducibility Returns: The optimized module with candidate_programs and trial_logs attached """ # Basic checks assert len(trainset) >= self.bsize, f"Trainset too small: {len(trainset)} < {self.bsize}" # Initialize RNG rng = random.Random(seed) rng_np = np.random.default_rng(seed) programs = [] program_scores = {} next_program_idx = 0 # Helper functions def calc_average_score(prog_idx: int) -> float: scores = program_scores.get(prog_idx, []) if not scores: return 0.0 return sum(scores) / len(scores) def top_k_plus_baseline(k: int) -> list[int]: # Sort all programs by descending average score scored_programs = sorted(programs, key=lambda p: calc_average_score(p.simba_idx), reverse=True) top_k = [p.simba_idx for p in scored_programs[:k]] # Ensure baseline=0 is in there: if 0 not in top_k and len(top_k) > 0: top_k[-1] = 0 return list(dict.fromkeys(top_k)) def softmax_sample(rng_obj: random.Random, program_idxs: list[int], temperature: float) -> int: if not program_idxs: raise ValueError("No programs available for softmax sampling.") # Unnormalized weights scores = [calc_average_score(idx) for idx in program_idxs] exps = [np.exp(s / temperature) for s in scores] sum_exps = sum(exps) if sum_exps <= 0: # Fallback: uniform if all exps are zero return rng_obj.choice(program_idxs) # Weighted random choice probs = [val / sum_exps for val in exps] return rng_obj.choices(program_idxs, weights=probs, k=1)[0] def register_new_program(prog: dspy.Module, score_list: list[float]) -> None: nonlocal next_program_idx next_program_idx += 1 new_idx = next_program_idx prog.simba_idx = new_idx programs.append(prog) program_scores[new_idx] = score_list # Initialize the baseline program: index=0 student = student.deepcopy() student.simba_idx = 0 programs.append(student) program_scores[0] = [] winning_programs = [student] # Data shuffling data_indices = list(range(len(trainset))) rng.shuffle(data_indices) instance_idx = 0 # Parallel runner run_parallel = dspy.Parallel(access_examples=False, num_threads=self.num_threads) trial_logs = {} for batch_idx in range(self.max_steps): trial_logs[batch_idx] = {} logger.info(f"Starting batch {batch_idx+1} of {self.max_steps}.") # STEP 1: Get next batch if instance_idx + self.bsize > len(trainset): rng.shuffle(data_indices) instance_idx = 0 batch_indices = data_indices[instance_idx : instance_idx + self.bsize] batch = [trainset[i] for i in batch_indices] instance_idx += self.bsize # We'll generate (program, model) pairs for the trajectory sampling. # Prepare distinct LMs (with different temperatures, etc.) from the baseline=programs[0]. 
models = prepare_models_for_resampling(programs[0], self.num_candidates, self.teacher_settings) top_programs = top_k_plus_baseline(self.num_candidates) exec_pairs = [] predictor2name = {} # For each model, for each example, pick a program from the pool via softmax for model in models: for example in batch: chosen_prog_idx = softmax_sample(rng, top_programs, self.temperature_for_sampling) candidate_system = programs[chosen_prog_idx].deepcopy() candidate_system.set_lm(model) for name, predictor in candidate_system.named_predictors(): predictor2name[id(predictor)] = name # Use the special wrap that includes the 'example' in the output wrapped_candidate_system = wrap_program(candidate_system, self.metric) exec_pairs.append((wrapped_candidate_system, example)) # STEP 2: Execute logger.info(f"Sampling program trajectories on {self.bsize} examples x {self.num_candidates} samples.") outputs = run_parallel(exec_pairs) assert len(outputs) == len(exec_pairs) == self.bsize * self.num_candidates # STEP 3: Sort the training buckets by (max-to-min gap, max score, and max-to-avg gap). buckets = [] largest_max_to_avg_gap = float("-inf") batch_10th_percentile_score = np.percentile([float(o["score"]) for o in outputs], 10) batch_90th_percentile_score = np.percentile([float(o["score"]) for o in outputs], 90) # We'll chunk `outputs` by example index, each chunk has length = num_candidates for idx, _ in enumerate(batch): # gather all results for this example bucket = [outputs[i] for i in range(idx, len(outputs), self.bsize)] bucket.sort(key=lambda x: x["score"], reverse=True) max_score = float(bucket[0]["score"]) min_score = float(bucket[-1]["score"]) avg_score = sum(x["score"] for x in bucket) / len(bucket) max_to_min_gap = max_score - min_score max_to_avg_gap = max_score - avg_score if max_to_avg_gap > largest_max_to_avg_gap: largest_max_to_avg_gap = max_to_avg_gap buckets.append((bucket, (max_to_min_gap, max_score, max_to_avg_gap))) # sort the buckets buckets.sort(key=lambda x: x[1], reverse=True) # Baseline for the batch is just the average of all runs all_scores_in_this_batch = [o["score"] for o in outputs] baseline_score = sum(all_scores_in_this_batch) / len(all_scores_in_this_batch) logger.info(f"Batch {batch_idx+1}: Baseline mini-batch score: {baseline_score}\n") # STEP 4: Build new candidate programs by applying a strategy to some top buckets. system_candidates = [] for bucket_idx, (bucket, bucket_stats) in enumerate(buckets): max_to_min_gap, max_score, max_to_avg_gap = bucket_stats logger.info( f"Batch {batch_idx+1}: Processing bucket #{bucket_idx+1}, with max score {max_score}, " f"max-to-min gap {max_to_min_gap}, and max-to-avg gap {max_to_avg_gap}." 
) # pick source program src_prog_idx = softmax_sample( rng, top_k_plus_baseline(self.num_candidates), self.temperature_for_candidates ) system_candidate = programs[src_prog_idx].deepcopy() # Drop some demos from each predictor name2predictor = {} num_demos_list = [] max_demos_tmp = self.max_demos if self.max_demos > 0 else 3 for name, predictor in system_candidate.named_predictors(): name2predictor[name] = predictor num_demos_list.append(len(predictor.demos)) num_demos = max(num_demos_list) if num_demos_list else 0 num_demos_to_drop = max(rng_np.poisson(num_demos / max_demos_tmp), int(num_demos >= max_demos_tmp)) num_demos_to_drop = min(num_demos_to_drop, num_demos) demos_to_drop = [rng.randrange(num_demos) for _ in range(num_demos_to_drop)] for _, predictor in name2predictor.items(): predictor.demos = [demo for idxd, demo in enumerate(predictor.demos) if idxd not in demos_to_drop] # Pick a strategy strategy = rng.choice(self.strategies) logger.info( f"Batch {batch_idx+1}: Invoking strategy: {strategy.__name__}" + (f", having dropped {num_demos_to_drop} demos per predictor" if num_demos_to_drop else "") ) try: strategy( bucket, system_candidate, predictor2name=predictor2name, name2predictor=name2predictor, batch_10p_score=batch_10th_percentile_score, batch_90p_score=batch_90th_percentile_score, prompt_model=self.prompt_model, ) except Exception as e: logger.error(f"Strategy failed with error: {e}") continue system_candidates.append(system_candidate) logger.info("\n") if len(system_candidates) >= self.num_candidates + 1: break # STEP 5: Evaluate these new system_candidates on the same mini-batch logger.info(f"Batch {batch_idx+1}: Evaluating {len(system_candidates)} programs on {self.bsize} examples.") exec_pairs = [(wrap_program(sys, self.metric), ex) for sys in system_candidates for ex in batch] outputs = run_parallel(exec_pairs) assert len(outputs) == len(exec_pairs) == len(system_candidates) * self.bsize # STEP 6: Compute average mini-batch scores for each new candidate candidate_scores = [] for idx_cand, _ in enumerate(system_candidates): start = idx_cand * self.bsize end = (idx_cand + 1) * self.bsize sys_scores = [outputs[i]["score"] for i in range(start, end)] avg_sys_score = sum(sys_scores) / len(sys_scores) candidate_scores.append(avg_sys_score) logger.info( f"Scores after {batch_idx+1} batches: {candidate_scores}, " f"Best: {max(candidate_scores) if candidate_scores else 'N/A'}\n" ) # STEP 7: Select the best among these new ones for "winning" record if candidate_scores: best_idx_among_candidates = candidate_scores.index(max(candidate_scores)) best_program = system_candidates[best_idx_among_candidates] winning_programs.append(best_program.deepcopy()) # STEP 8: Register all new candidate systems in our global pool for idx_cand, cand_sys in enumerate(system_candidates): start = idx_cand * self.bsize end = (idx_cand + 1) * self.bsize sys_scores = [outputs[i]["score"] for i in range(start, end)] register_new_program(cand_sys, sys_scores) M = len(winning_programs) - 1 # noqa: N806 N = self.num_candidates + 1 # noqa: N806 if M < 1: program_idxs = [0] * N else: program_idxs = [round(i * M / (N - 1)) for i in range(N)] program_idxs = list(dict.fromkeys(program_idxs)) candidate_programs = [winning_programs[i].deepcopy() for i in program_idxs] logger.info(f"VALIDATION: Evaluating {len(candidate_programs)} programs on the full trainset.") exec_pairs = [(wrap_program(sys, self.metric), ex) for sys in candidate_programs for ex in trainset] outputs = run_parallel(exec_pairs) scores = [] for 
idx_prog, _ in enumerate(candidate_programs): start = idx_prog * len(trainset) end = (idx_prog + 1) * len(trainset) sys_scores = [outputs[i]["score"] for i in range(start, end)] avg_score = sum(sys_scores) / len(sys_scores) if sys_scores else 0.0 scores.append(avg_score) if idx_prog != 0: trial_logs[idx_prog - 1]["train_score"] = avg_score # Build sorted list of {"score", "program"} dicts assert len(scores) == len(candidate_programs) candidate_data = [{"score": s, "program": p} for s, p in zip(scores, candidate_programs, strict=False)] candidate_data.sort(key=lambda x: x["score"], reverse=True) best_idx = scores.index(max(scores)) if scores else 0 best_program = candidate_programs[best_idx].deepcopy() logger.info( f"Final trainset scores: {scores}, Best: {max(scores) if scores else 'N/A'} " f"(at index {best_idx if scores else 'N/A'})\n\n\n" ) # Attach sorted, scored candidates & logs best_program.candidate_programs = candidate_data best_program.trial_logs = trial_logs return best_program ``` -------------------------------------------------------------------------------- /dspy/adapters/types/tool.py: -------------------------------------------------------------------------------- ```python import asyncio import inspect from typing import TYPE_CHECKING, Any, Callable, get_origin, get_type_hints import pydantic from jsonschema import ValidationError, validate from pydantic import BaseModel, TypeAdapter, create_model from dspy.adapters.types.base_type import Type from dspy.dsp.utils.settings import settings from dspy.utils.callback import with_callbacks if TYPE_CHECKING: import mcp from langchain.tools import BaseTool _TYPE_MAPPING = {"string": str, "integer": int, "number": float, "boolean": bool, "array": list, "object": dict} class Tool(Type): """Tool class. This class is used to simplify the creation of tools for tool calling (function calling) in LLMs. Only supports functions for now. """ func: Callable name: str | None = None desc: str | None = None args: dict[str, Any] | None = None arg_types: dict[str, Any] | None = None arg_desc: dict[str, str] | None = None has_kwargs: bool = False def __init__( self, func: Callable, name: str | None = None, desc: str | None = None, args: dict[str, Any] | None = None, arg_types: dict[str, Any] | None = None, arg_desc: dict[str, str] | None = None, ): """Initialize the Tool class. Users can choose to specify the `name`, `desc`, `args`, and `arg_types`, or let the `dspy.Tool` automatically infer the values from the function. For values that are specified by the user, automatic inference will not be performed on them. Args: func (Callable): The actual function that is being wrapped by the tool. name (Optional[str], optional): The name of the tool. Defaults to None. desc (Optional[str], optional): The description of the tool. Defaults to None. args (Optional[dict[str, Any]], optional): The args and their schema of the tool, represented as a dictionary from arg name to arg's json schema. Defaults to None. arg_types (Optional[dict[str, Any]], optional): The argument types of the tool, represented as a dictionary from arg name to the type of the argument. Defaults to None. arg_desc (Optional[dict[str, str]], optional): Descriptions for each arg, represented as a dictionary from arg name to description string. Defaults to None. 
Example: ```python def foo(x: int, y: str = "hello"): return str(x) + y tool = Tool(foo) print(tool.args) # Expected output: {'x': {'type': 'integer'}, 'y': {'type': 'string', 'default': 'hello'}} ``` """ super().__init__(func=func, name=name, desc=desc, args=args, arg_types=arg_types, arg_desc=arg_desc) self._parse_function(func, arg_desc) def _parse_function(self, func: Callable, arg_desc: dict[str, str] | None = None): """Helper method that parses a function to extract the name, description, and args. This is a helper function that automatically infers the name, description, and args of the tool from the provided function. In order to make the inference work, the function must have valid type hints. """ annotations_func = func if inspect.isfunction(func) or inspect.ismethod(func) else func.__call__ name = getattr(func, "__name__", type(func).__name__) desc = getattr(func, "__doc__", None) or getattr(annotations_func, "__doc__", "") args = {} arg_types = {} # Use inspect.signature to get all arg names sig = inspect.signature(annotations_func) # Get available type hints available_hints = get_type_hints(annotations_func) # Build a dictionary of arg name -> type (defaulting to Any when missing) hints = {param_name: available_hints.get(param_name, Any) for param_name in sig.parameters.keys()} default_values = {param_name: sig.parameters[param_name].default for param_name in sig.parameters.keys()} # Process each argument's type to generate its JSON schema. for k, v in hints.items(): arg_types[k] = v if k == "return": continue # Check if the type (or its origin) is a subclass of Pydantic's BaseModel origin = get_origin(v) or v if isinstance(origin, type) and issubclass(origin, BaseModel): # Get json schema, and replace $ref with the actual schema v_json_schema = _resolve_json_schema_reference(v.model_json_schema()) args[k] = v_json_schema else: args[k] = _resolve_json_schema_reference(TypeAdapter(v).json_schema()) if default_values[k] is not inspect.Parameter.empty: args[k]["default"] = default_values[k] if arg_desc and k in arg_desc: args[k]["description"] = arg_desc[k] self.name = self.name or name self.desc = self.desc or desc self.args = self.args if self.args is not None else args self.arg_types = self.arg_types if self.arg_types is not None else arg_types self.has_kwargs = any(param.kind == param.VAR_KEYWORD for param in sig.parameters.values()) def _validate_and_parse_args(self, **kwargs): # Validate the args value comply to the json schema. for k, v in kwargs.items(): if k not in self.args: if self.has_kwargs: continue else: raise ValueError(f"Arg {k} is not in the tool's args.") try: instance = v.model_dump() if hasattr(v, "model_dump") else v type_str = self.args[k].get("type") if type_str is not None and type_str != "Any": validate(instance=instance, schema=self.args[k]) except ValidationError as e: raise ValueError(f"Arg {k} is invalid: {e.message}") # Parse the args to the correct type. parsed_kwargs = {} for k, v in kwargs.items(): if k in self.arg_types and self.arg_types[k] != Any: # Create a pydantic model wrapper with a dummy field `value` to parse the arg to the correct type. 
# This is specifically useful for handling nested Pydantic models like `list[list[MyPydanticModel]]` pydantic_wrapper = create_model("Wrapper", value=(self.arg_types[k], ...)) parsed = pydantic_wrapper.model_validate({"value": v}) parsed_kwargs[k] = parsed.value else: parsed_kwargs[k] = v return parsed_kwargs def format(self): return str(self) def format_as_litellm_function_call(self): return { "type": "function", "function": { "name": self.name, "description": self.desc, "parameters": { "type": "object", "properties": self.args, "required": list(self.args.keys()), }, }, } def _run_async_in_sync(self, coroutine): try: loop = asyncio.get_running_loop() except RuntimeError: return asyncio.run(coroutine) return loop.run_until_complete(coroutine) @with_callbacks def __call__(self, **kwargs): parsed_kwargs = self._validate_and_parse_args(**kwargs) result = self.func(**parsed_kwargs) if asyncio.iscoroutine(result): if settings.allow_tool_async_sync_conversion: return self._run_async_in_sync(result) else: raise ValueError( "You are calling `__call__` on an async tool, please use `acall` instead or set " "`allow_async=True` to run the async tool in sync mode." ) return result @with_callbacks async def acall(self, **kwargs): parsed_kwargs = self._validate_and_parse_args(**kwargs) result = self.func(**parsed_kwargs) if asyncio.iscoroutine(result): return await result else: # We should allow calling a sync tool in the async path. return result @classmethod def from_mcp_tool(cls, session: "mcp.ClientSession", tool: "mcp.types.Tool") -> "Tool": """ Build a DSPy tool from an MCP tool and a ClientSession. Args: session: The MCP session to use. tool: The MCP tool to convert. Returns: A Tool object. """ from dspy.utils.mcp import convert_mcp_tool return convert_mcp_tool(session, tool) @classmethod def from_langchain(cls, tool: "BaseTool") -> "Tool": """ Build a DSPy tool from a LangChain tool. Args: tool: The LangChain tool to convert. Returns: A Tool object. Example: ```python import asyncio import dspy from langchain.tools import tool as lc_tool @lc_tool def add(x: int, y: int): "Add two numbers together." return x + y dspy_tool = dspy.Tool.from_langchain(add) async def run_tool(): return await dspy_tool.acall(x=1, y=2) print(asyncio.run(run_tool())) # 3 ``` """ from dspy.utils.langchain_tool import convert_langchain_tool return convert_langchain_tool(tool) def __repr__(self): return f"Tool(name={self.name}, desc={self.desc}, args={self.args})" def __str__(self): desc = f", whose description is <desc>{self.desc}</desc>.".replace("\n", " ") if self.desc else "." arg_desc = f"It takes arguments {self.args}." return f"{self.name}{desc} {arg_desc}" class ToolCalls(Type): class ToolCall(Type): name: str args: dict[str, Any] def format(self): return { "type": "function", "function": { "name": self.name, "arguments": self.args, }, } def execute(self, functions: dict[str, Any] | list[Tool] | None = None) -> Any: """Execute this individual tool call and return its result. Args: functions: Functions to search for the tool. Can be: - Dict mapping tool names to functions: {"tool_name": function} - List of Tool objects: [Tool(function), ...] - None: Will search in caller's locals and globals (automatic lookup) Returns: The result from executing this tool call. Raises: ValueError: If the tool function cannot be found. Exception: Any exception raised by the tool function. 
""" func = None if functions is None: # Automatic lookup in caller's globals and locals frame = inspect.currentframe().f_back try: caller_globals = frame.f_globals caller_locals = frame.f_locals func = caller_locals.get(self.name) or caller_globals.get(self.name) finally: del frame elif isinstance(functions, dict): func = functions.get(self.name) elif isinstance(functions, list): for tool in functions: if tool.name == self.name: func = tool.func break if func is None: raise ValueError(f"Tool function '{self.name}' not found. Please pass the tool functions to the `execute` method.") try: args = self.args or {} return func(**args) except Exception as e: raise RuntimeError(f"Error executing tool '{self.name}': {e}") from e tool_calls: list[ToolCall] @classmethod def from_dict_list(cls, tool_calls_dicts: list[dict[str, Any]]) -> "ToolCalls": """Convert a list of dictionaries to a ToolCalls instance. Args: dict_list: A list of dictionaries, where each dictionary should have 'name' and 'args' keys. Returns: A ToolCalls instance. Example: ```python tool_calls_dict = [ {"name": "search", "args": {"query": "hello"}}, {"name": "translate", "args": {"text": "world"}} ] tool_calls = ToolCalls.from_dict_list(tool_calls_dict) ``` """ tool_calls = [cls.ToolCall(**item) for item in tool_calls_dicts] return cls(tool_calls=tool_calls) @classmethod def description(cls) -> str: return ( "Tool calls information, including the name of the tools and the arguments to be passed to it. " "Arguments must be provided in JSON format." ) def format(self) -> list[dict[str, Any]]: # The tool_call field is compatible with OpenAI's tool calls schema. return { "tool_calls": [tool_call.format() for tool_call in self.tool_calls], } @pydantic.model_validator(mode="before") @classmethod def validate_input(cls, data: Any): if isinstance(data, cls): return data # Handle case where data is a list of dicts with "name" and "args" keys if isinstance(data, list) and all( isinstance(item, dict) and "name" in item and "args" in item for item in data ): return {"tool_calls": [cls.ToolCall(**item) for item in data]} # Handle case where data is a dict elif isinstance(data, dict): if "tool_calls" in data: # Handle case where data is a dict with "tool_calls" key tool_calls_data = data["tool_calls"] if isinstance(tool_calls_data, list): return { "tool_calls": [ cls.ToolCall(**item) if isinstance(item, dict) else item for item in tool_calls_data ] } elif "name" in data and "args" in data: # Handle case where data is a dict with "name" and "args" keys return {"tool_calls": [cls.ToolCall(**data)]} raise ValueError(f"Received invalid value for `dspy.ToolCalls`: {data}") def _resolve_json_schema_reference(schema: dict) -> dict: """Recursively resolve json model schema, expanding all references.""" # If there are no definitions to resolve, return the main schema if "$defs" not in schema and "definitions" not in schema: return schema def resolve_refs(obj: Any) -> Any: if not isinstance(obj, (dict, list)): return obj if isinstance(obj, dict): if "$ref" in obj: ref_path = obj["$ref"].split("/")[-1] return resolve_refs(schema["$defs"][ref_path]) return {k: resolve_refs(v) for k, v in obj.items()} # Must be a list return [resolve_refs(item) for item in obj] # Resolve all references in the main schema resolved_schema = resolve_refs(schema) # Remove the $defs key as it's no longer needed resolved_schema.pop("$defs", None) return resolved_schema def convert_input_schema_to_tool_args( schema: dict[str, Any], ) -> tuple[dict[str, Any], dict[str, Type], 
dict[str, str]]: """Convert an input json schema to tool arguments compatible with DSPy Tool. Args: schema: An input json schema describing the tool's input parameters Returns: A tuple of (args, arg_types, arg_desc) for DSPy Tool definition. """ args, arg_types, arg_desc = {}, {}, {} properties = schema.get("properties", None) if properties is None: return args, arg_types, arg_desc required = schema.get("required", []) defs = schema.get("$defs", {}) for name, prop in properties.items(): if len(defs) > 0: prop = _resolve_json_schema_reference({"$defs": defs, **prop}) args[name] = prop arg_types[name] = _TYPE_MAPPING.get(prop.get("type"), Any) arg_desc[name] = prop.get("description", "No description provided.") if name in required: arg_desc[name] += " (Required)" return args, arg_types, arg_desc ```