This is page 5 of 14. Use http://codebase.md/stanfordnlp/dspy?page={x} to view the full context. # Directory Structure ``` ├── .github │ ├── .internal_dspyai │ │ ├── internals │ │ │ ├── build-and-release.md │ │ │ └── release-checklist.md │ │ └── pyproject.toml │ ├── .tmp │ │ └── .generated-actions │ │ └── run-pypi-publish-in-docker-container │ │ └── action.yml │ ├── ISSUE_TEMPLATE │ │ ├── bug_report.yml │ │ └── feature_request.yml │ ├── PULL_REQUEST_TEMPLATE │ │ └── pull_request_template.md │ ├── workflow_scripts │ │ └── install_testpypi_pkg.sh │ └── workflows │ ├── build_and_release.yml │ ├── build_utils │ │ └── test_version.py │ ├── docs-push.yml │ ├── precommits_check.yml │ └── run_tests.yml ├── .gitignore ├── .pre-commit-config.yaml ├── CONTRIBUTING.md ├── docs │ ├── .gitignore │ ├── docs │ │ ├── api │ │ │ ├── adapters │ │ │ │ ├── Adapter.md │ │ │ │ ├── ChatAdapter.md │ │ │ │ ├── JSONAdapter.md │ │ │ │ └── TwoStepAdapter.md │ │ │ ├── evaluation │ │ │ │ ├── answer_exact_match.md │ │ │ │ ├── answer_passage_match.md │ │ │ │ ├── CompleteAndGrounded.md │ │ │ │ ├── Evaluate.md │ │ │ │ ├── EvaluationResult.md │ │ │ │ └── SemanticF1.md │ │ │ ├── experimental │ │ │ │ ├── Citations.md │ │ │ │ └── Document.md │ │ │ ├── index.md │ │ │ ├── models │ │ │ │ ├── Embedder.md │ │ │ │ └── LM.md │ │ │ ├── modules │ │ │ │ ├── BestOfN.md │ │ │ │ ├── ChainOfThought.md │ │ │ │ ├── CodeAct.md │ │ │ │ ├── Module.md │ │ │ │ ├── MultiChainComparison.md │ │ │ │ ├── Parallel.md │ │ │ │ ├── Predict.md │ │ │ │ ├── ProgramOfThought.md │ │ │ │ ├── ReAct.md │ │ │ │ └── Refine.md │ │ │ ├── optimizers │ │ │ │ ├── BetterTogether.md │ │ │ │ ├── BootstrapFewShot.md │ │ │ │ ├── BootstrapFewShotWithRandomSearch.md │ │ │ │ ├── BootstrapFinetune.md │ │ │ │ ├── BootstrapRS.md │ │ │ │ ├── COPRO.md │ │ │ │ ├── Ensemble.md │ │ │ │ ├── GEPA │ │ │ │ │ ├── GEPA_Advanced.md │ │ │ │ │ └── overview.md │ │ │ │ ├── InferRules.md │ │ │ │ ├── KNN.md │ │ │ │ ├── KNNFewShot.md │ │ │ │ ├── LabeledFewShot.md │ │ │ │ ├── MIPROv2.md │ │ │ │ └── SIMBA.md │ │ │ ├── primitives │ │ │ │ ├── Audio.md │ │ │ │ ├── Code.md │ │ │ │ ├── Example.md │ │ │ │ ├── History.md │ │ │ │ ├── Image.md │ │ │ │ ├── Prediction.md │ │ │ │ ├── Tool.md │ │ │ │ └── ToolCalls.md │ │ │ ├── signatures │ │ │ │ ├── InputField.md │ │ │ │ ├── OutputField.md │ │ │ │ └── Signature.md │ │ │ ├── tools │ │ │ │ ├── ColBERTv2.md │ │ │ │ ├── Embeddings.md │ │ │ │ └── PythonInterpreter.md │ │ │ └── utils │ │ │ ├── asyncify.md │ │ │ ├── configure_cache.md │ │ │ ├── disable_litellm_logging.md │ │ │ ├── disable_logging.md │ │ │ ├── enable_litellm_logging.md │ │ │ ├── enable_logging.md │ │ │ ├── inspect_history.md │ │ │ ├── load.md │ │ │ ├── StatusMessage.md │ │ │ ├── StatusMessageProvider.md │ │ │ ├── streamify.md │ │ │ └── StreamListener.md │ │ ├── cheatsheet.md │ │ ├── community │ │ │ ├── community-resources.md │ │ │ ├── how-to-contribute.md │ │ │ └── use-cases.md │ │ ├── deep-dive │ │ │ └── data-handling │ │ │ ├── built-in-datasets.md │ │ │ ├── examples.md │ │ │ ├── img │ │ │ │ └── data-loading.png │ │ │ └── loading-custom-data.md │ │ ├── faqs.md │ │ ├── index.md │ │ ├── js │ │ │ └── runllm-widget.js │ │ ├── learn │ │ │ ├── evaluation │ │ │ │ ├── data.md │ │ │ │ ├── metrics.md │ │ │ │ └── overview.md │ │ │ ├── figures │ │ │ │ ├── native_tool_call.png │ │ │ │ └── teleprompter-classes.png │ │ │ ├── index.md │ │ │ ├── optimization │ │ │ │ ├── optimizers.md │ │ │ │ └── overview.md │ │ │ └── programming │ │ │ ├── 7-assertions.md │ │ │ ├── adapters.md │ │ │ ├── language_models.md │ │ │ ├── mcp.md │ 
│ │ ├── modules.md │ │ │ ├── overview.md │ │ │ ├── signatures.md │ │ │ └── tools.md │ │ ├── production │ │ │ └── index.md │ │ ├── roadmap.md │ │ ├── static │ │ │ ├── .nojekyll │ │ │ └── img │ │ │ ├── dspy_logo.png │ │ │ ├── logo.png │ │ │ ├── mlflow-tracing-rag.png │ │ │ ├── modular.png │ │ │ ├── optimize.png │ │ │ ├── undraw_docusaurus_mountain.svg │ │ │ ├── undraw_docusaurus_react.svg │ │ │ ├── undraw_docusaurus_tree.svg │ │ │ └── universal_compatibility.png │ │ ├── stylesheets │ │ │ └── extra.css │ │ └── tutorials │ │ ├── agents │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-agent.png │ │ ├── ai_text_game │ │ │ └── index.md │ │ ├── async │ │ │ └── index.md │ │ ├── audio │ │ │ └── index.ipynb │ │ ├── build_ai_program │ │ │ └── index.md │ │ ├── cache │ │ │ └── index.md │ │ ├── classification │ │ │ └── index.md │ │ ├── classification_finetuning │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-classification.png │ │ ├── conversation_history │ │ │ └── index.md │ │ ├── core_development │ │ │ └── index.md │ │ ├── custom_module │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-custom-module.png │ │ ├── customer_service_agent │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-customer-service-agent.png │ │ ├── deployment │ │ │ ├── dspy_mlflow_ui.png │ │ │ └── index.md │ │ ├── email_extraction │ │ │ ├── index.md │ │ │ └── mlflow-tracing-email-extraction.png │ │ ├── entity_extraction │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-entity-extraction.png │ │ ├── games │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-agent.png │ │ ├── gepa_ai_program │ │ │ └── index.md │ │ ├── gepa_aime │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-aime.png │ │ │ └── mlflow-tracking-gepa-aime-optimization.png │ │ ├── gepa_facilitysupportanalyzer │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-support.png │ │ │ └── mlflow-tracking-gepa-support-optimization.png │ │ ├── gepa_papillon │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-papilon.png │ │ │ └── mlflow-tracking-gepa-papilon-optimization.png │ │ ├── image_generation_prompting │ │ │ └── index.ipynb │ │ ├── index.md │ │ ├── llms_txt_generation │ │ │ └── index.md │ │ ├── math │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-math.png │ │ ├── mcp │ │ │ └── index.md │ │ ├── mem0_react_agent │ │ │ └── index.md │ │ ├── multihop_search │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-multi-hop.png │ │ ├── observability │ │ │ ├── index.md │ │ │ ├── mlflow_trace_ui_navigation.gif │ │ │ ├── mlflow_trace_ui.png │ │ │ └── mlflow_trace_view.png │ │ ├── optimize_ai_program │ │ │ └── index.md │ │ ├── optimizer_tracking │ │ │ ├── child_run.png │ │ │ ├── experiment.png │ │ │ ├── index.md │ │ │ └── parent_run.png │ │ ├── output_refinement │ │ │ └── best-of-n-and-refine.md │ │ ├── papillon │ │ │ └── index.md │ │ ├── program_of_thought │ │ │ └── index.ipynb │ │ ├── rag │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-rag.png │ │ ├── real_world_examples │ │ │ └── index.md │ │ ├── rl_ai_program │ │ │ └── index.md │ │ ├── rl_multihop │ │ │ └── index.ipynb │ │ ├── rl_papillon │ │ │ └── index.ipynb │ │ ├── sample_code_generation │ │ │ └── index.md │ │ ├── saving │ │ │ └── index.md │ │ ├── streaming │ │ │ └── index.md │ │ ├── tool_use │ │ │ └── index.ipynb │ │ └── yahoo_finance_react │ │ └── index.md │ ├── mkdocs.yml │ ├── overrides │ │ ├── home.html │ │ ├── main.html │ │ └── partials │ │ └── tabs.html │ ├── Pipfile │ ├── Pipfile.lock │ ├── README.md │ ├── requirements.txt │ ├── scripts │ │ ├── generate_api_docs.py │ │ └── generate_api_summary.py │ └── vercel.json ├── dspy │ ├── __init__.py │ 
├── __metadata__.py │ ├── adapters │ │ ├── __init__.py │ │ ├── baml_adapter.py │ │ ├── base.py │ │ ├── chat_adapter.py │ │ ├── json_adapter.py │ │ ├── two_step_adapter.py │ │ ├── types │ │ │ ├── __init__.py │ │ │ ├── audio.py │ │ │ ├── base_type.py │ │ │ ├── citation.py │ │ │ ├── code.py │ │ │ ├── document.py │ │ │ ├── history.py │ │ │ ├── image.py │ │ │ └── tool.py │ │ ├── utils.py │ │ └── xml_adapter.py │ ├── clients │ │ ├── __init__.py │ │ ├── base_lm.py │ │ ├── cache.py │ │ ├── databricks.py │ │ ├── embedding.py │ │ ├── lm_local_arbor.py │ │ ├── lm_local.py │ │ ├── lm.py │ │ ├── openai.py │ │ ├── provider.py │ │ └── utils_finetune.py │ ├── datasets │ │ ├── __init__.py │ │ ├── alfworld │ │ │ ├── __init__.py │ │ │ ├── alfworld.py │ │ │ └── base_config.yml │ │ ├── colors.py │ │ ├── dataloader.py │ │ ├── dataset.py │ │ ├── gsm8k.py │ │ ├── hotpotqa.py │ │ └── math.py │ ├── dsp │ │ ├── __init__.py │ │ ├── colbertv2.py │ │ └── utils │ │ ├── __init__.py │ │ ├── dpr.py │ │ ├── settings.py │ │ └── utils.py │ ├── evaluate │ │ ├── __init__.py │ │ ├── auto_evaluation.py │ │ ├── evaluate.py │ │ └── metrics.py │ ├── experimental │ │ └── __init__.py │ ├── predict │ │ ├── __init__.py │ │ ├── aggregation.py │ │ ├── avatar │ │ │ ├── __init__.py │ │ │ ├── avatar.py │ │ │ ├── models.py │ │ │ └── signatures.py │ │ ├── best_of_n.py │ │ ├── chain_of_thought.py │ │ ├── code_act.py │ │ ├── knn.py │ │ ├── multi_chain_comparison.py │ │ ├── parallel.py │ │ ├── parameter.py │ │ ├── predict.py │ │ ├── program_of_thought.py │ │ ├── react.py │ │ ├── refine.py │ │ └── retry.py │ ├── primitives │ │ ├── __init__.py │ │ ├── base_module.py │ │ ├── example.py │ │ ├── module.py │ │ ├── prediction.py │ │ ├── python_interpreter.py │ │ └── runner.js │ ├── propose │ │ ├── __init__.py │ │ ├── dataset_summary_generator.py │ │ ├── grounded_proposer.py │ │ ├── propose_base.py │ │ └── utils.py │ ├── retrievers │ │ ├── __init__.py │ │ ├── databricks_rm.py │ │ ├── embeddings.py │ │ ├── retrieve.py │ │ └── weaviate_rm.py │ ├── signatures │ │ ├── __init__.py │ │ ├── field.py │ │ ├── signature.py │ │ └── utils.py │ ├── streaming │ │ ├── __init__.py │ │ ├── messages.py │ │ ├── streamify.py │ │ └── streaming_listener.py │ ├── teleprompt │ │ ├── __init__.py │ │ ├── avatar_optimizer.py │ │ ├── bettertogether.py │ │ ├── bootstrap_finetune.py │ │ ├── bootstrap_trace.py │ │ ├── bootstrap.py │ │ ├── copro_optimizer.py │ │ ├── ensemble.py │ │ ├── gepa │ │ │ ├── __init__.py │ │ │ ├── gepa_utils.py │ │ │ ├── gepa.py │ │ │ └── instruction_proposal.py │ │ ├── grpo.py │ │ ├── infer_rules.py │ │ ├── knn_fewshot.py │ │ ├── mipro_optimizer_v2.py │ │ ├── random_search.py │ │ ├── signature_opt.py │ │ ├── simba_utils.py │ │ ├── simba.py │ │ ├── teleprompt_optuna.py │ │ ├── teleprompt.py │ │ ├── utils.py │ │ └── vanilla.py │ └── utils │ ├── __init__.py │ ├── annotation.py │ ├── asyncify.py │ ├── caching.py │ ├── callback.py │ ├── dummies.py │ ├── exceptions.py │ ├── hasher.py │ ├── inspect_history.py │ ├── langchain_tool.py │ ├── logging_utils.py │ ├── mcp.py │ ├── parallelizer.py │ ├── saving.py │ ├── syncify.py │ ├── unbatchify.py │ └── usage_tracker.py ├── LICENSE ├── pyproject.toml ├── README.md ├── tests │ ├── __init__.py │ ├── adapters │ │ ├── test_adapter_utils.py │ │ ├── test_baml_adapter.py │ │ ├── test_base_type.py │ │ ├── test_chat_adapter.py │ │ ├── test_citation.py │ │ ├── test_code.py │ │ ├── test_document.py │ │ ├── test_json_adapter.py │ │ ├── test_tool.py │ │ ├── test_two_step_adapter.py │ │ └── test_xml_adapter.py │ ├── callback │ │ └── 
test_callback.py │ ├── clients │ │ ├── test_cache.py │ │ ├── test_databricks.py │ │ ├── test_embedding.py │ │ ├── test_inspect_global_history.py │ │ └── test_lm.py │ ├── conftest.py │ ├── datasets │ │ └── test_dataset.py │ ├── docs │ │ └── test_mkdocs_links.py │ ├── evaluate │ │ ├── test_evaluate.py │ │ └── test_metrics.py │ ├── examples │ │ └── test_baleen.py │ ├── metadata │ │ └── test_metadata.py │ ├── predict │ │ ├── test_aggregation.py │ │ ├── test_best_of_n.py │ │ ├── test_chain_of_thought.py │ │ ├── test_code_act.py │ │ ├── test_knn.py │ │ ├── test_multi_chain_comparison.py │ │ ├── test_parallel.py │ │ ├── test_predict.py │ │ ├── test_program_of_thought.py │ │ ├── test_react.py │ │ ├── test_refine.py │ │ └── test_retry.py │ ├── primitives │ │ ├── resources │ │ │ └── saved_program.json │ │ ├── test_base_module.py │ │ ├── test_example.py │ │ ├── test_module.py │ │ └── test_python_interpreter.py │ ├── propose │ │ └── test_grounded_proposer.py │ ├── README.md │ ├── reliability │ │ ├── __init__.py │ │ ├── complex_types │ │ │ └── generated │ │ │ ├── test_many_types_1 │ │ │ │ ├── inputs │ │ │ │ │ ├── input1.json │ │ │ │ │ └── input2.json │ │ │ │ ├── program.py │ │ │ │ └── schema.json │ │ │ ├── test_nesting_1 │ │ │ │ ├── inputs │ │ │ │ │ ├── input1.json │ │ │ │ │ └── input2.json │ │ │ │ ├── program.py │ │ │ │ └── schema.json │ │ │ └── test_nesting_2 │ │ │ ├── inputs │ │ │ │ └── input1.json │ │ │ ├── program.py │ │ │ └── schema.json │ │ ├── conftest.py │ │ ├── generate │ │ │ ├── __init__.py │ │ │ ├── __main__.py │ │ │ └── utils.py │ │ ├── input_formats │ │ │ └── generated │ │ │ └── test_markdown_1 │ │ │ ├── inputs │ │ │ │ ├── input1.json │ │ │ │ └── input2.json │ │ │ ├── program.py │ │ │ └── schema.json │ │ ├── README.md │ │ ├── reliability_conf.yaml │ │ ├── test_generated.py │ │ ├── test_pydantic_models.py │ │ └── utils.py │ ├── retrievers │ │ └── test_embeddings.py │ ├── signatures │ │ ├── test_adapter_image.py │ │ ├── test_custom_types.py │ │ └── test_signature.py │ ├── streaming │ │ └── test_streaming.py │ ├── teleprompt │ │ ├── gepa_dummy_lm_custom_component_selector_custom_instruction_proposer.json │ │ ├── gepa_dummy_lm.json │ │ ├── test_bootstrap_finetune.py │ │ ├── test_bootstrap_trace.py │ │ ├── test_bootstrap.py │ │ ├── test_copro_optimizer.py │ │ ├── test_ensemble.py │ │ ├── test_finetune.py │ │ ├── test_gepa_instruction_proposer.py │ │ ├── test_gepa.py │ │ ├── test_grpo.py │ │ ├── test_knn_fewshot.py │ │ ├── test_random_search.py │ │ ├── test_teleprompt.py │ │ └── test_utils.py │ ├── test_utils │ │ ├── __init__.py │ │ └── server │ │ ├── __init__.py │ │ ├── litellm_server_config.yaml │ │ └── litellm_server.py │ └── utils │ ├── __init__.py │ ├── resources │ │ └── mcp_server.py │ ├── test_annotation.py │ ├── test_asyncify.py │ ├── test_exceptions.py │ ├── test_langchain_tool.py │ ├── test_mcp.py │ ├── test_parallelizer.py │ ├── test_saving.py │ ├── test_settings.py │ ├── test_syncify.py │ ├── test_unbatchify.py │ └── test_usage_tracker.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /dspy/adapters/types/image.py: -------------------------------------------------------------------------------- ```python import base64 import io import mimetypes import os import warnings from functools import lru_cache from typing import Any, Union from urllib.parse import urlparse import pydantic import requests from dspy.adapters.types.base_type import Type try: from PIL import Image as PILImage PIL_AVAILABLE = True except ImportError: 
PIL_AVAILABLE = False class Image(Type): url: str model_config = pydantic.ConfigDict( frozen=True, str_strip_whitespace=True, validate_assignment=True, extra="forbid", ) def __init__(self, url: Any = None, *, download: bool = False, **data): """Create an Image. Parameters ---------- url: The image source. Supported values include - ``str``: HTTP(S)/GS URL or local file path - ``bytes``: raw image bytes - ``PIL.Image.Image``: a PIL image instance - ``dict`` with a single ``{"url": value}`` entry (legacy form) - already encoded data URI download: Whether remote URLs should be downloaded to infer their MIME type. Any additional keyword arguments are passed to :class:`pydantic.BaseModel`. """ if url is not None and "url" not in data: # Support a positional argument while allowing ``url=`` in **data. if isinstance(url, dict) and set(url.keys()) == {"url"}: # Legacy dict form from previous model validator. data["url"] = url["url"] else: # ``url`` may be a string, bytes, or a PIL image. data["url"] = url if "url" in data: # Normalize any accepted input into a base64 data URI or plain URL. data["url"] = encode_image(data["url"], download_images=download) # Delegate the rest of initialization to pydantic's BaseModel. super().__init__(**data) @lru_cache(maxsize=32) def format(self) -> list[dict[str, Any]] | str: try: image_url = encode_image(self.url) except Exception as e: raise ValueError(f"Failed to format image for DSPy: {e}") return [{"type": "image_url", "image_url": {"url": image_url}}] @classmethod def from_url(cls, url: str, download: bool = False): warnings.warn( "Image.from_url is deprecated; use Image(url) instead.", DeprecationWarning, stacklevel=2, ) return cls(url, download=download) @classmethod def from_file(cls, file_path: str): warnings.warn( "Image.from_file is deprecated; use Image(file_path) instead.", DeprecationWarning, stacklevel=2, ) return cls(file_path) @classmethod def from_PIL(cls, pil_image): # noqa: N802 warnings.warn( "Image.from_PIL is deprecated; use Image(pil_image) instead.", DeprecationWarning, stacklevel=2, ) return cls(pil_image) def __str__(self): return self.serialize_model() def __repr__(self): if "base64" in self.url: len_base64 = len(self.url.split("base64,")[1]) image_type = self.url.split(";")[0].split("/")[-1] return f"Image(url=data:image/{image_type};base64,<IMAGE_BASE_64_ENCODED({len_base64!s})>)" return f"Image(url='{self.url}')" def is_url(string: str) -> bool: """Check if a string is a valid URL.""" try: result = urlparse(string) return all([result.scheme in ("http", "https", "gs"), result.netloc]) except ValueError: return False def encode_image(image: Union[str, bytes, "PILImage.Image", dict], download_images: bool = False) -> str: """ Encode an image or file to a base64 data URI. Args: image: The image or file to encode. Can be a PIL Image, file path, URL, or data URI. download_images: Whether to download images from URLs. Returns: str: The data URI of the file or the URL if download_images is False. Raises: ValueError: If the file type is not supported. 
""" if isinstance(image, dict) and "url" in image: # NOTE: Not doing other validation for now return image["url"] elif isinstance(image, str): if image.startswith("data:"): # Already a data URI return image elif os.path.isfile(image): # File path return _encode_image_from_file(image) elif is_url(image): # URL if download_images: return _encode_image_from_url(image) else: # Return the URL as is return image else: # Unsupported string format raise ValueError(f"Unrecognized file string: {image}; If this file type should be supported, please open an issue.") elif PIL_AVAILABLE and isinstance(image, PILImage.Image): # PIL Image return _encode_pil_image(image) elif isinstance(image, bytes): # Raw bytes if not PIL_AVAILABLE: raise ImportError("Pillow is required to process image bytes.") img = PILImage.open(io.BytesIO(image)) return _encode_pil_image(img) elif isinstance(image, Image): return image.url else: print(f"Unsupported image type: {type(image)}") raise ValueError(f"Unsupported image type: {type(image)}") def _encode_image_from_file(file_path: str) -> str: """Encode a file from a file path to a base64 data URI.""" with open(file_path, "rb") as file: file_data = file.read() # Use mimetypes to guess directly from the file path mime_type, _ = mimetypes.guess_type(file_path) if mime_type is None: raise ValueError(f"Could not determine MIME type for file: {file_path}") encoded_data = base64.b64encode(file_data).decode("utf-8") return f"data:{mime_type};base64,{encoded_data}" def _encode_image_from_url(image_url: str) -> str: """Encode a file from a URL to a base64 data URI.""" response = requests.get(image_url) response.raise_for_status() content_type = response.headers.get("Content-Type", "") # Use the content type from the response headers if available if content_type: mime_type = content_type else: # Try to guess MIME type from URL mime_type, _ = mimetypes.guess_type(image_url) if mime_type is None: raise ValueError(f"Could not determine MIME type for URL: {image_url}") encoded_data = base64.b64encode(response.content).decode("utf-8") return f"data:{mime_type};base64,{encoded_data}" def _encode_pil_image(image: "PILImage") -> str: """Encode a PIL Image object to a base64 data URI.""" buffered = io.BytesIO() file_format = image.format or "PNG" image.save(buffered, format=file_format) # Get the correct MIME type using the image format file_extension = file_format.lower() mime_type, _ = mimetypes.guess_type(f"file.{file_extension}") if mime_type is None: raise ValueError(f"Could not determine MIME type for image format: {file_format}") encoded_data = base64.b64encode(buffered.getvalue()).decode("utf-8") return f"data:{mime_type};base64,{encoded_data}" def _get_file_extension(path_or_url: str) -> str: """Extract the file extension from a file path or URL.""" extension = os.path.splitext(urlparse(path_or_url).path)[1].lstrip(".").lower() return extension or "png" # Default to 'png' if no extension found def is_image(obj) -> bool: """Check if the object is an image or a valid media file reference.""" if PIL_AVAILABLE and isinstance(obj, PILImage.Image): return True if isinstance(obj, str): if obj.startswith("data:"): return True elif os.path.isfile(obj): return True elif is_url(obj): return True return False ``` -------------------------------------------------------------------------------- /dspy/primitives/module.py: -------------------------------------------------------------------------------- ```python import inspect import logging from typing import Any import magicattr from 
dspy.dsp.utils.settings import settings, thread_local_overrides from dspy.predict.parallel import Parallel from dspy.primitives.base_module import BaseModule from dspy.primitives.example import Example from dspy.primitives.prediction import Prediction from dspy.utils.callback import with_callbacks from dspy.utils.inspect_history import pretty_print_history from dspy.utils.usage_tracker import track_usage logger = logging.getLogger(__name__) class ProgramMeta(type): """Metaclass ensuring every ``dspy.Module`` instance is properly initialised.""" def __call__(cls, *args, **kwargs): # Create the instance without invoking ``__init__`` so we can inject # the base initialization beforehand. obj = cls.__new__(cls, *args, **kwargs) if isinstance(obj, cls): # ``_base_init`` sets attributes that should exist on all modules # even when a subclass forgets to call ``super().__init__``. Module._base_init(obj) cls.__init__(obj, *args, **kwargs) # Guarantee existence of critical attributes if ``__init__`` didn't # create them. if not hasattr(obj, "callbacks"): obj.callbacks = [] if not hasattr(obj, "history"): obj.history = [] return obj class Module(BaseModule, metaclass=ProgramMeta): def _base_init(self): self._compiled = False self.callbacks = [] self.history = [] def __init__(self, callbacks=None): self.callbacks = callbacks or [] self._compiled = False # LM calling history of the module. self.history = [] def __getstate__(self): state = self.__dict__.copy() state.pop("history", None) state.pop("callbacks", None) return state def __setstate__(self, state): self.__dict__.update(state) if not hasattr(self, "history"): self.history = [] if not hasattr(self, "callbacks"): self.callbacks = [] @with_callbacks def __call__(self, *args, **kwargs) -> Prediction: caller_modules = settings.caller_modules or [] caller_modules = list(caller_modules) caller_modules.append(self) with settings.context(caller_modules=caller_modules): if settings.track_usage and thread_local_overrides.get().get("usage_tracker") is None: with track_usage() as usage_tracker: output = self.forward(*args, **kwargs) tokens = usage_tracker.get_total_tokens() self._set_lm_usage(tokens, output) return output return self.forward(*args, **kwargs) @with_callbacks async def acall(self, *args, **kwargs) -> Prediction: caller_modules = settings.caller_modules or [] caller_modules = list(caller_modules) caller_modules.append(self) with settings.context(caller_modules=caller_modules): if settings.track_usage and thread_local_overrides.get().get("usage_tracker") is None: with track_usage() as usage_tracker: output = await self.aforward(*args, **kwargs) tokens = usage_tracker.get_total_tokens() self._set_lm_usage(tokens, output) return output return await self.aforward(*args, **kwargs) def named_predictors(self): from dspy.predict.predict import Predict return [(name, param) for name, param in self.named_parameters() if isinstance(param, Predict)] def predictors(self): return [param for _, param in self.named_predictors()] def set_lm(self, lm): for _, param in self.named_predictors(): param.lm = lm def get_lm(self): all_used_lms = [param.lm for _, param in self.named_predictors()] if len(set(all_used_lms)) == 1: return all_used_lms[0] raise ValueError("Multiple LMs are being used in the module. 
There's no unique LM to return.") def __repr__(self): s = [] for name, param in self.named_predictors(): s.append(f"{name} = {param}") return "\n".join(s) def map_named_predictors(self, func): """Applies a function to all named predictors.""" for name, predictor in self.named_predictors(): set_attribute_by_name(self, name, func(predictor)) return self def inspect_history(self, n: int = 1): return pretty_print_history(self.history, n) def batch( self, examples: list[Example], num_threads: int | None = None, max_errors: int | None = None, return_failed_examples: bool = False, provide_traceback: bool | None = None, disable_progress_bar: bool = False, ) -> list[Example] | tuple[list[Example], list[Example], list[Exception]]: """ Processes a list of dspy.Example instances in parallel using the Parallel module. Args: examples: List of dspy.Example instances to process. num_threads: Number of threads to use for parallel processing. max_errors: Maximum number of errors allowed before stopping execution. If ``None``, inherits from ``dspy.settings.max_errors``. return_failed_examples: Whether to return failed examples and exceptions. provide_traceback: Whether to include traceback information in error logs. disable_progress_bar: Whether to display the progress bar. Returns: List of results, and optionally failed examples and exceptions. """ # Create a list of execution pairs (self, example) exec_pairs = [(self, example.inputs()) for example in examples] # Create an instance of Parallel parallel_executor = Parallel( num_threads=num_threads, max_errors=max_errors, return_failed_examples=return_failed_examples, provide_traceback=provide_traceback, disable_progress_bar=disable_progress_bar, ) # Execute the forward method of Parallel if return_failed_examples: results, failed_examples, exceptions = parallel_executor.forward(exec_pairs) return results, failed_examples, exceptions else: results = parallel_executor.forward(exec_pairs) return results def _set_lm_usage(self, tokens: dict[str, Any], output: Any): # Some optimizers (e.g., GEPA bootstrap tracing) temporarily patch # module.forward to return a tuple: (prediction, trace). # When usage tracking is enabled, ensure we attach usage to the # prediction object if present. prediction_in_output = None if isinstance(output, Prediction): prediction_in_output = output elif isinstance(output, tuple) and len(output) > 0 and isinstance(output[0], Prediction): prediction_in_output = output[0] if prediction_in_output: prediction_in_output.set_lm_usage(tokens) else: logger.warning("Failed to set LM usage. Please return `dspy.Prediction` object from dspy.Module to enable usage tracking.") def __getattribute__(self, name): attr = super().__getattribute__(name) if name == "forward" and callable(attr): # Check if forward is called through __call__ or directly stack = inspect.stack() forward_called_directly = len(stack) <= 1 or stack[1].function != "__call__" if forward_called_directly: logger.warning( f"Calling module.forward(...) on {self.__class__.__name__} directly is discouraged. " f"Please use module(...) instead." 
) return attr def set_attribute_by_name(obj, name, value): magicattr.set(obj, name, value) ``` -------------------------------------------------------------------------------- /dspy/dsp/colbertv2.py: -------------------------------------------------------------------------------- ```python from typing import Any import requests from dspy.clients.cache import request_cache from dspy.dsp.utils import dotdict # TODO: Ideally, this takes the name of the index and looks up its port. class ColBERTv2: """Wrapper for the ColBERTv2 Retrieval.""" def __init__( self, url: str = "http://0.0.0.0", port: str | int | None = None, post_requests: bool = False, ): self.post_requests = post_requests self.url = f"{url}:{port}" if port else url def __call__( self, query: str, k: int = 10, simplify: bool = False, ) -> list[str] | list[dotdict]: if self.post_requests: topk: list[dict[str, Any]] = colbertv2_post_request(self.url, query, k) else: topk: list[dict[str, Any]] = colbertv2_get_request(self.url, query, k) if simplify: return [psg["long_text"] for psg in topk] return [dotdict(psg) for psg in topk] @request_cache() def colbertv2_get_request_v2(url: str, query: str, k: int): assert k <= 100, "Only k <= 100 is supported for the hosted ColBERTv2 server at the moment." payload = {"query": query, "k": k} res = requests.get(url, params=payload, timeout=10) topk = res.json()["topk"][:k] topk = [{**d, "long_text": d["text"]} for d in topk] return topk[:k] @request_cache() def colbertv2_get_request_v2_wrapped(*args, **kwargs): return colbertv2_get_request_v2(*args, **kwargs) colbertv2_get_request = colbertv2_get_request_v2_wrapped @request_cache() def colbertv2_post_request_v2(url: str, query: str, k: int): headers = {"Content-Type": "application/json; charset=utf-8"} payload = {"query": query, "k": k} res = requests.post(url, json=payload, headers=headers, timeout=10) return res.json()["topk"][:k] @request_cache() def colbertv2_post_request_v2_wrapped(*args, **kwargs): return colbertv2_post_request_v2(*args, **kwargs) colbertv2_post_request = colbertv2_post_request_v2_wrapped class ColBERTv2RetrieverLocal: def __init__(self, passages: list[str], colbert_config=None, load_only: bool = False): """Colbertv2 retriever module Args: passages (list[str]): list of passages colbert_config (ColBERTConfig, optional): colbert config for building and searching. Defaults to None. load_only (bool, optional): whether to load the index or build and then load. Defaults to False. """ assert ( colbert_config is not None ), "Please pass a valid colbert_config, which you can import from colbert.infra.config import ColBERTConfig and modify it" self.colbert_config = colbert_config assert ( self.colbert_config.checkpoint is not None ), "Please pass a valid checkpoint like colbert-ir/colbertv2.0, which you can modify in the ColBERTConfig with attribute name checkpoint" self.passages = passages assert ( self.colbert_config.index_name is not None ), "Please pass a valid index_name, which you can modify in the ColBERTConfig with attribute name index_name" self.passages = passages if not load_only: print( f"Building the index for experiment {self.colbert_config.experiment} with index name " f"{self.colbert_config.index_name}" ) self.build_index() print( f"Loading the index for experiment {self.colbert_config.experiment} with index name " f"{self.colbert_config.index_name}" ) self.searcher = self.get_index() def build_index(self): try: import colbert # noqa: F401 except ImportError: print( "Colbert not found. 
Please check your installation or install the module using pip install " "colbert-ai[faiss-gpu,torch]." ) from colbert import Indexer from colbert.infra import Run, RunConfig with Run().context(RunConfig(nranks=self.colbert_config.nranks, experiment=self.colbert_config.experiment)): indexer = Indexer(checkpoint=self.colbert_config.checkpoint, config=self.colbert_config) indexer.index(name=self.colbert_config.index_name, collection=self.passages, overwrite=True) def get_index(self): try: import colbert # noqa: F401 except ImportError: print( "Colbert not found. Please check your installation or install the module using pip install " "colbert-ai[faiss-gpu,torch]." ) from colbert import Searcher from colbert.infra import Run, RunConfig with Run().context(RunConfig(experiment=self.colbert_config.experiment)): searcher = Searcher(index=self.colbert_config.index_name, collection=self.passages) return searcher def __call__(self, *args: Any, **kwargs: Any) -> Any: return self.forward(*args, **kwargs) def forward(self, query: str, k: int = 7, **kwargs): import torch if kwargs.get("filtered_pids"): filtered_pids = kwargs.get("filtered_pids") assert isinstance(filtered_pids, list) and all(isinstance(pid, int) for pid in filtered_pids), "The filtered pids should be a list of integers" device = "cuda" if torch.cuda.is_available() else "cpu" results = self.searcher.search( query, # Number of passages to receive k=k, # Passing the filter function of relevant filter_fn=lambda pids: torch.tensor( [pid for pid in pids if pid in filtered_pids], dtype=torch.int32 ).to(device), ) else: searcher_results = self.searcher.search(query, k=k) results = [] for pid, rank, score in zip(*searcher_results, strict=False): # noqa: B007 results.append(dotdict({"long_text": self.searcher.collection[pid], "score": score, "pid": pid})) return results class ColBERTv2RerankerLocal: def __init__(self, colbert_config=None, checkpoint: str = "bert-base-uncased"): try: import colbert # noqa: F401 except ImportError: print( "Colbert not found. Please check your installation or install the module using pip install " "colbert-ai[faiss-gpu,torch]." ) """_summary_ Args: colbert_config (ColBERTConfig, optional): Colbert config. Defaults to None. checkpoint_name (str, optional): checkpoint for embeddings. Defaults to 'bert-base-uncased'. 
""" self.colbert_config = colbert_config self.checkpoint = checkpoint self.colbert_config.checkpoint = checkpoint def __call__(self, *args: Any, **kwargs: Any) -> Any: return self.forward(*args, **kwargs) def forward(self, query: str, passages: list[str] | None = None): assert len(passages) > 0, "Passages should not be empty" import numpy as np from colbert.modeling.colbert import ColBERT from colbert.modeling.tokenization.doc_tokenization import DocTokenizer from colbert.modeling.tokenization.query_tokenization import QueryTokenizer passages = passages or [] self.colbert_config.nway = len(passages) query_tokenizer = QueryTokenizer(self.colbert_config, verbose=1) doc_tokenizer = DocTokenizer(self.colbert_config) query_ids, query_masks = query_tokenizer.tensorize([query]) doc_ids, doc_masks = doc_tokenizer.tensorize(passages) col = ColBERT(self.checkpoint, self.colbert_config) q = col.query(query_ids, query_masks) doc_ids, doc_masks = col.doc(doc_ids, doc_masks, keep_dims="return_mask") q_duplicated = q.repeat_interleave(len(passages), dim=0).contiguous() tensor_scores = col.score(q_duplicated, doc_ids, doc_masks) passage_score_arr = np.array([score.cpu().detach().numpy().tolist() for score in tensor_scores]) return passage_score_arr ``` -------------------------------------------------------------------------------- /tests/primitives/resources/saved_program.json: -------------------------------------------------------------------------------- ```json { "react": { "traces": [], "train": [], "demos": [ { "augmented": true, "question": "That Darn Cat! and Never a Dull Moment were both produced by what studio?", "trajectory": "[[ ## thought_0 ## ]]\nI need to find out which studio produced both \"That Darn Cat!\" and \"Never a Dull Moment.\" This information is likely available on Wikipedia, so I will search for it there.\n\n[[ ## tool_name_0 ## ]]\nsearch_wikipedia\n\n[[ ## tool_args_0 ## ]]\n{\"query\": \"That Darn Cat! and Never a Dull Moment studio production\"}\n\n[[ ## observation_0 ## ]]\n[1] «That Darn Cat! | That Darn Cat! is a 1965 American Walt Disney Productions thriller comedy film starring Hayley Mills (in her last of the six films she made for the Walt Disney Studios) and Dean Jones (starring in his first film for Disney) in a story about bank robbers, a kidnapping and a mischievous cat. The film was based on the 1963 novel \"Undercover Cat\" by Gordon and Mildred Gordon and was directed by Robert Stevenson. The title song was written by the Sherman Brothers and sung by Bobby Darin. The 1997 remake includes a cameo appearance by Dean Jones.»\n[2] «Never a Dull Moment (1968 film) | Never a Dull Moment is a 1968 American comedy crime film from Walt Disney Productions starring Dick Van Dyke and Edward G. Robinson and directed by Jerry Paris. The script by AJ Carothers was based on a novel by John Godey. The supporting cast features Dorothy Provine, Henry Silva, Slim Pickens and Jack Elam.»\n[3] «That Darn Cat (1997 film) | That Darn Cat is a 1997 American mystery comedy film starring Christina Ricci and Doug E. Doug. It is a remake of the 1965 film \"That Darn Cat! \", which in turn was based on the book \"Undercover Cat\" by Gordon and Mildred Gordon. 
It is directed by British TV veteran Bob Spiers (most famous for \"Fawlty Towers\", as well as \"Spice World\") and written by Scott Alexander and Larry Karaszewski, best known for \"Ed Wood\" and the first two \"Problem Child\" films.»", "next_thought": "Both \"That Darn Cat!\" and \"Never a Dull Moment\" were produced by Walt Disney Productions. I have the information I need to answer the question.", "next_tool_name": "finish", "next_tool_args": {} }, { "question": "Was Yakov Protazanov or Marcel Duchamp born in 1881", "answer": "Yakov Alexandrovich Protazanov (Russian: Я́ков Алекса́ндрович Протаза́нов ; January 23 (O.S. February 4), 1881", "dspy_uuid": "15118a69-274d-4eea-b3eb-204d05e24012", "dspy_split": "train" } ], "signature": { "instructions": "Imagine you are a detective racing against time to solve a high-profile case involving two famous films: \"That Darn Cat!\" and \"Never a Dull Moment.\" Your mission is to uncover which studio produced these films before the press conference starts in one hour. You have access to a powerful tool: a Wikipedia search. \n\nGiven the fields `question`, produce the fields `answer`.\n\nYou are an Agent. In each episode, you will be given the fields `question` as input. And you can see your past trajectory so far. Your goal is to use one or more of the supplied tools to collect any necessary information for producing `answer`.\n\nTo do this, you will interleave next_thought, next_tool_name, and next_tool_args in each turn, and also when finishing the task. After each tool call, you receive a resulting observation, which gets appended to your trajectory.\n\nWhen writing next_thought, you may reason about the current situation and plan for future steps. When selecting the next_tool_name and its next_tool_args, the tool must be one of:\n\n(1) search_wikipedia. It takes arguments {'query': {'type': 'string'}}.\n(2) finish, whose description is <desc>Marks the task as complete. That is, signals that all information for producing the outputs, i.e. `answer`, are now available to be extracted.<\/desc>. It takes arguments {}.\nWhen providing `next_tool_args`, the value inside the field must be in JSON format.", "fields": [ { "prefix": "Question:", "description": "${question}" }, { "prefix": "Trajectory:", "description": "${trajectory}" }, { "prefix": "Next Thought:", "description": "${next_thought}" }, { "prefix": "Next Tool Name:", "description": "${next_tool_name}" }, { "prefix": "Next Tool Args:", "description": "${next_tool_args}" } ] }, "lm": null }, "extract.predict": { "traces": [], "train": [], "demos": [ { "augmented": true, "question": "That Darn Cat! and Never a Dull Moment were both produced by what studio?", "trajectory": "[[ ## thought_0 ## ]]\nI need to find out which studio produced both \"That Darn Cat!\" and \"Never a Dull Moment.\" This information is likely available on Wikipedia, so I will search for it there.\n\n[[ ## tool_name_0 ## ]]\nsearch_wikipedia\n\n[[ ## tool_args_0 ## ]]\n{\"query\": \"That Darn Cat! and Never a Dull Moment studio production\"}\n\n[[ ## observation_0 ## ]]\n[1] «That Darn Cat! | That Darn Cat! is a 1965 American Walt Disney Productions thriller comedy film starring Hayley Mills (in her last of the six films she made for the Walt Disney Studios) and Dean Jones (starring in his first film for Disney) in a story about bank robbers, a kidnapping and a mischievous cat. The film was based on the 1963 novel \"Undercover Cat\" by Gordon and Mildred Gordon and was directed by Robert Stevenson. 
The title song was written by the Sherman Brothers and sung by Bobby Darin. The 1997 remake includes a cameo appearance by Dean Jones.»\n[2] «Never a Dull Moment (1968 film) | Never a Dull Moment is a 1968 American comedy crime film from Walt Disney Productions starring Dick Van Dyke and Edward G. Robinson and directed by Jerry Paris. The script by AJ Carothers was based on a novel by John Godey. The supporting cast features Dorothy Provine, Henry Silva, Slim Pickens and Jack Elam.»\n[3] «That Darn Cat (1997 film) | That Darn Cat is a 1997 American mystery comedy film starring Christina Ricci and Doug E. Doug. It is a remake of the 1965 film \"That Darn Cat! \", which in turn was based on the book \"Undercover Cat\" by Gordon and Mildred Gordon. It is directed by British TV veteran Bob Spiers (most famous for \"Fawlty Towers\", as well as \"Spice World\") and written by Scott Alexander and Larry Karaszewski, best known for \"Ed Wood\" and the first two \"Problem Child\" films.»\n\n[[ ## thought_1 ## ]]\nBoth \"That Darn Cat!\" and \"Never a Dull Moment\" were produced by Walt Disney Productions. I have the information I need to answer the question.\n\n[[ ## tool_name_1 ## ]]\nfinish\n\n[[ ## tool_args_1 ## ]]\n{}\n\n[[ ## observation_1 ## ]]\nCompleted.", "reasoning": "Both \"That Darn Cat!\" and \"Never a Dull Moment\" were produced by Walt Disney Productions, as confirmed by the information retrieved from Wikipedia.", "answer": "Walt Disney Productions" }, { "question": "Are Smyrnium and Nymania both types of plant?", "answer": "yes", "dspy_uuid": "b57b5933-95c7-472a-801b-3cc9bc0a3b99", "dspy_split": "train" } ], "signature": { "instructions": "Given the very verbose fields `question`, produce the fields `answer`.", "fields": [ { "prefix": "Question:", "description": "${question}" }, { "prefix": "Trajectory:", "description": "${trajectory}" }, { "prefix": "Reasoning: Let's think step by step in order to", "description": "${reasoning}" }, { "prefix": "Answer:", "description": "${answer}" } ] }, "lm": null }, "metadata": { "dependency_versions": { "python": "3.13", "dspy": "3.0.0", "cloudpickle": "3.1" } } } ``` -------------------------------------------------------------------------------- /tests/utils/test_usage_tracker.py: -------------------------------------------------------------------------------- ```python import dspy from dspy.utils.usage_tracker import UsageTracker, track_usage def test_add_usage_entry(): """Test adding usage entries to the tracker.""" tracker = UsageTracker() # Test with a single usage entry usage_entry = { "prompt_tokens": 1117, "completion_tokens": 46, "total_tokens": 1163, "prompt_tokens_details": {"cached_tokens": 0, "audio_tokens": 0}, "completion_tokens_details": { "reasoning_tokens": 0, "audio_tokens": 0, "accepted_prediction_tokens": 0, "rejected_prediction_tokens": 0, }, } tracker.add_usage("gpt-4o-mini", usage_entry) assert len(tracker.usage_data["gpt-4o-mini"]) == 1 assert tracker.usage_data["gpt-4o-mini"][0] == usage_entry def test_get_total_tokens(): """Test calculating total tokens from usage entries.""" tracker = UsageTracker() # Add multiple usage entries for the same model usage_entries = [ { "prompt_tokens": 1117, "completion_tokens": 46, "total_tokens": 1163, "prompt_tokens_details": {"cached_tokens": 200, "audio_tokens": 50}, "completion_tokens_details": { "reasoning_tokens": 20, "audio_tokens": 10, "accepted_prediction_tokens": 16, "rejected_prediction_tokens": 0, }, }, { "prompt_tokens": 800, "completion_tokens": 100, "total_tokens": 900, 
"prompt_tokens_details": {"cached_tokens": 300, "audio_tokens": 0}, "completion_tokens_details": { "reasoning_tokens": 50, "audio_tokens": 0, "accepted_prediction_tokens": 40, "rejected_prediction_tokens": 10, }, }, { "prompt_tokens": 500, "completion_tokens": 80, "total_tokens": 580, "prompt_tokens_details": {"cached_tokens": 100, "audio_tokens": 25}, "completion_tokens_details": { "reasoning_tokens": 30, "audio_tokens": 15, "accepted_prediction_tokens": 25, "rejected_prediction_tokens": 10, }, }, ] for entry in usage_entries: tracker.add_usage("gpt-4o-mini", entry) total_usage = tracker.get_total_tokens() assert "gpt-4o-mini" in total_usage assert total_usage["gpt-4o-mini"]["prompt_tokens"] == 2417 # 1117 + 800 + 500 assert total_usage["gpt-4o-mini"]["completion_tokens"] == 226 # 46 + 100 + 80 assert total_usage["gpt-4o-mini"]["total_tokens"] == 2643 # 1163 + 900 + 580 assert total_usage["gpt-4o-mini"]["prompt_tokens_details"]["cached_tokens"] == 600 # 200 + 300 + 100 assert total_usage["gpt-4o-mini"]["prompt_tokens_details"]["audio_tokens"] == 75 # 50 + 0 + 25 assert total_usage["gpt-4o-mini"]["completion_tokens_details"]["reasoning_tokens"] == 100 # 20 + 50 + 30 assert total_usage["gpt-4o-mini"]["completion_tokens_details"]["audio_tokens"] == 25 # 10 + 0 + 15 assert total_usage["gpt-4o-mini"]["completion_tokens_details"]["accepted_prediction_tokens"] == 81 # 16 + 40 + 25 assert total_usage["gpt-4o-mini"]["completion_tokens_details"]["rejected_prediction_tokens"] == 20 # 0 + 10 + 10 def test_track_usage_with_multiple_models(): """Test tracking usage across multiple models.""" tracker = UsageTracker() # Add usage entries for different models usage_entries = [ { "model": "gpt-4o-mini", "usage": { "prompt_tokens": 1117, "completion_tokens": 46, "total_tokens": 1163, "prompt_tokens_details": {"cached_tokens": 0, "audio_tokens": 0}, "completion_tokens_details": { "reasoning_tokens": 0, "audio_tokens": 0, "accepted_prediction_tokens": 0, "rejected_prediction_tokens": 0, }, }, }, { "model": "gpt-3.5-turbo", "usage": { "prompt_tokens": 800, "completion_tokens": 100, "total_tokens": 900, "prompt_tokens_details": {"cached_tokens": 0, "audio_tokens": 0}, "completion_tokens_details": { "reasoning_tokens": 0, "audio_tokens": 0, "accepted_prediction_tokens": 0, "rejected_prediction_tokens": 0, }, }, }, ] for entry in usage_entries: tracker.add_usage(entry["model"], entry["usage"]) total_usage = tracker.get_total_tokens() assert "gpt-4o-mini" in total_usage assert "gpt-3.5-turbo" in total_usage assert total_usage["gpt-4o-mini"]["total_tokens"] == 1163 assert total_usage["gpt-3.5-turbo"]["total_tokens"] == 900 def test_track_usage_context_manager(lm_for_test): lm = dspy.LM(lm_for_test, cache=False) dspy.settings.configure(lm=lm) predict = dspy.ChainOfThought("question -> answer") with track_usage() as tracker: predict(question="What is the capital of France?") predict(question="What is the capital of Italy?") assert len(tracker.usage_data) > 0 assert len(tracker.usage_data[lm_for_test]) == 2 total_usage = tracker.get_total_tokens() assert lm_for_test in total_usage assert len(total_usage.keys()) == 1 assert isinstance(total_usage[lm_for_test], dict) def test_merge_usage_entries_with_new_keys(): """Ensure merging usage entries preserves unseen keys.""" tracker = UsageTracker() tracker.add_usage("model-x", {"prompt_tokens": 5}) tracker.add_usage("model-x", {"completion_tokens": 2}) total_usage = tracker.get_total_tokens() assert total_usage["model-x"]["prompt_tokens"] == 5 assert 
total_usage["model-x"]["completion_tokens"] == 2 def test_merge_usage_entries_with_none_values(): """Test tracking usage across multiple models.""" tracker = UsageTracker() # Add usage entries for different models usage_entries = [ { "model": "gpt-4o-mini", "usage": { "prompt_tokens": 1117, "completion_tokens": 46, "total_tokens": 1163, "prompt_tokens_details": None, "completion_tokens_details": {}, }, }, { "model": "gpt-4o-mini", "usage": { "prompt_tokens": 800, "completion_tokens": 100, "total_tokens": 900, "prompt_tokens_details": {"cached_tokens": 50, "audio_tokens": 50}, "completion_tokens_details": None, }, }, { "model": "gpt-4o-mini", "usage": { "prompt_tokens": 800, "completion_tokens": 100, "total_tokens": 900, "prompt_tokens_details": None, "completion_tokens_details": { "reasoning_tokens": 1, "audio_tokens": 1, "accepted_prediction_tokens": 1, "rejected_prediction_tokens": 1, }, }, }, ] for entry in usage_entries: tracker.add_usage(entry["model"], entry["usage"]) total_usage = tracker.get_total_tokens() assert total_usage["gpt-4o-mini"]["prompt_tokens"] == 2717 assert total_usage["gpt-4o-mini"]["completion_tokens"] == 246 assert total_usage["gpt-4o-mini"]["total_tokens"] == 2963 assert total_usage["gpt-4o-mini"]["prompt_tokens_details"]["cached_tokens"] == 50 assert total_usage["gpt-4o-mini"]["prompt_tokens_details"]["audio_tokens"] == 50 assert total_usage["gpt-4o-mini"]["completion_tokens_details"]["reasoning_tokens"] == 1 assert total_usage["gpt-4o-mini"]["completion_tokens_details"]["audio_tokens"] == 1 assert total_usage["gpt-4o-mini"]["completion_tokens_details"]["accepted_prediction_tokens"] == 1 assert total_usage["gpt-4o-mini"]["completion_tokens_details"]["rejected_prediction_tokens"] == 1 ``` -------------------------------------------------------------------------------- /tests/evaluate/test_evaluate.py: -------------------------------------------------------------------------------- ```python import signal import threading from unittest.mock import patch import pytest import dspy from dspy.evaluate.evaluate import Evaluate, EvaluationResult from dspy.evaluate.metrics import answer_exact_match from dspy.predict import Predict from dspy.utils.callback import BaseCallback from dspy.utils.dummies import DummyLM def new_example(question, answer): """Helper function to create a new example.""" return dspy.Example( question=question, answer=answer, ).with_inputs("question") def test_evaluate_initialization(): devset = [new_example("What is 1+1?", "2")] ev = Evaluate( devset=devset, metric=answer_exact_match, display_progress=False, ) assert ev.devset == devset assert ev.metric == answer_exact_match assert ev.num_threads is None assert not ev.display_progress def test_evaluate_call(): dspy.settings.configure( lm=DummyLM( { "What is 1+1?": {"answer": "2"}, "What is 2+2?": {"answer": "4"}, } ) ) devset = [new_example("What is 1+1?", "2"), new_example("What is 2+2?", "4")] program = Predict("question -> answer") assert program(question="What is 1+1?").answer == "2" ev = Evaluate( devset=devset, metric=answer_exact_match, display_progress=False, ) score = ev(program) assert score.score == 100.0 @pytest.mark.extra def test_construct_result_df(): import pandas as pd devset = [ new_example("What is 1+1?", "2"), new_example("What is 2+2?", "4"), new_example("What is 3+3?", "-1"), ] ev = Evaluate( devset=devset, metric=answer_exact_match, ) results = [ (devset[0], {"answer": "2"}, 100.0), (devset[1], {"answer": "4"}, 100.0), (devset[2], {"answer": "-1"}, 0.0), ] result_df = 
ev._construct_result_table(results, answer_exact_match.__name__) pd.testing.assert_frame_equal( result_df, pd.DataFrame( { "question": ["What is 1+1?", "What is 2+2?", "What is 3+3?"], "example_answer": ["2", "4", "-1"], "pred_answer": ["2", "4", "-1"], "answer_exact_match": [100.0, 100.0, 0.0], } ), ) def test_multithread_evaluate_call(): dspy.settings.configure(lm=DummyLM({"What is 1+1?": {"answer": "2"}, "What is 2+2?": {"answer": "4"}})) devset = [new_example("What is 1+1?", "2"), new_example("What is 2+2?", "4")] program = Predict("question -> answer") assert program(question="What is 1+1?").answer == "2" ev = Evaluate( devset=devset, metric=answer_exact_match, display_progress=False, num_threads=2, ) result = ev(program) assert result.score == 100.0 def test_multi_thread_evaluate_call_cancelled(monkeypatch): # slow LM that sleeps for 1 second before returning the answer class SlowLM(DummyLM): def __call__(self, *args, **kwargs): import time time.sleep(1) return super().__call__(*args, **kwargs) dspy.settings.configure(lm=SlowLM({"What is 1+1?": {"answer": "2"}, "What is 2+2?": {"answer": "4"}})) devset = [new_example("What is 1+1?", "2"), new_example("What is 2+2?", "4")] program = Predict("question -> answer") assert program(question="What is 1+1?").answer == "2" # spawn a thread that will sleep for .1 seconds then send a KeyboardInterrupt def sleep_then_interrupt(): import time time.sleep(0.1) import os os.kill(os.getpid(), signal.SIGINT) input_thread = threading.Thread(target=sleep_then_interrupt) input_thread.start() with pytest.raises(KeyboardInterrupt): ev = Evaluate( devset=devset, metric=answer_exact_match, display_progress=False, num_threads=2, ) ev(program) def test_evaluate_call_wrong_answer(): dspy.settings.configure(lm=DummyLM({"What is 1+1?": {"answer": "0"}, "What is 2+2?": {"answer": "0"}})) devset = [new_example("What is 1+1?", "2"), new_example("What is 2+2?", "4")] program = Predict("question -> answer") ev = Evaluate( devset=devset, metric=answer_exact_match, display_progress=False, ) result = ev(program) assert result.score == 0.0 @pytest.mark.extra @pytest.mark.parametrize( "program_with_example", [ (Predict("question -> answer"), new_example("What is 1+1?", "2")), # Create programs that do not return dictionary-like objects because Evaluate() # has failed for such cases in the past ( lambda text: Predict("text: str -> entities: list[str]")(text=text).entities, dspy.Example(text="United States", entities=["United States"]).with_inputs("text"), ), ( lambda text: Predict("text: str -> entities: list[dict[str, str]]")(text=text).entities, dspy.Example(text="United States", entities=[{"name": "United States", "type": "location"}]).with_inputs( "text" ), ), ( lambda text: Predict("text: str -> first_word: Tuple[str, int]")(text=text).words, dspy.Example(text="United States", first_word=("United", 6)).with_inputs("text"), ), ], ) @pytest.mark.parametrize("display_table", [True, False, 1]) @pytest.mark.parametrize("is_in_ipython_notebook_environment", [True, False]) def test_evaluate_display_table(program_with_example, display_table, is_in_ipython_notebook_environment, capfd): program, example = program_with_example example_input = next(iter(example.inputs().values())) example_output = {key: value for key, value in example.toDict().items() if key not in example.inputs()} dspy.settings.configure( lm=DummyLM( { example_input: example_output, } ) ) ev = Evaluate( devset=[example], metric=lambda example, pred, **kwargs: example == pred, display_table=display_table, ) 
assert ev.display_table == display_table with patch( "dspy.evaluate.evaluate.is_in_ipython_notebook_environment", return_value=is_in_ipython_notebook_environment ): ev(program) out, _ = capfd.readouterr() if not is_in_ipython_notebook_environment and display_table: # In console environments where IPython is not available, the table should be printed # to the console example_input = next(iter(example.inputs().values())) assert example_input in out def test_evaluate_callback(): class TestCallback(BaseCallback): def __init__(self): self.start_call_inputs = None self.start_call_count = 0 self.end_call_outputs = None self.end_call_count = 0 def on_evaluate_start( self, call_id: str, instance, inputs, ): self.start_call_inputs = inputs self.start_call_count += 1 def on_evaluate_end( self, call_id: str, outputs, exception=None, ): self.end_call_outputs = outputs self.end_call_count += 1 callback = TestCallback() dspy.settings.configure( lm=DummyLM( { "What is 1+1?": {"answer": "2"}, "What is 2+2?": {"answer": "4"}, } ), callbacks=[callback], ) devset = [new_example("What is 1+1?", "2"), new_example("What is 2+2?", "4")] program = Predict("question -> answer") assert program(question="What is 1+1?").answer == "2" ev = Evaluate( devset=devset, metric=answer_exact_match, display_progress=False, ) result = ev(program) assert result.score == 100.0 assert callback.start_call_inputs["program"] == program assert callback.start_call_count == 1 assert callback.end_call_outputs.score == 100.0 assert callback.end_call_count == 1 def test_evaluation_result_repr(): result = EvaluationResult(score=100.0, results=[(new_example("What is 1+1?", "2"), {"answer": "2"}, 100.0)]) assert repr(result) == "EvaluationResult(score=100.0, results=<list of 1 results>)" ``` -------------------------------------------------------------------------------- /dspy/clients/openai.py: -------------------------------------------------------------------------------- ```python import time from datetime import datetime from typing import Any import openai from dspy.clients.provider import Provider, TrainingJob from dspy.clients.utils_finetune import TrainDataFormat, TrainingStatus, save_data class TrainingJobOpenAI(TrainingJob): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.provider_file_id = None self.provider_job_id = None def cancel(self): # Cancel the provider job if OpenAIProvider.does_job_exist(self.provider_job_id): status = self.status() if OpenAIProvider.is_terminal_training_status(status): err_msg = "Jobs that are complete cannot be canceled." err_msg += f" Job with ID {self.provider_job_id} is done." 
                raise Exception(err_msg)
            openai.fine_tuning.jobs.cancel(self.provider_job_id)
            self.provider_job_id = None

        # Delete the provider file
        if self.provider_file_id is not None:
            if OpenAIProvider.does_file_exist(self.provider_file_id):
                openai.files.delete(self.provider_file_id)
            self.provider_file_id = None

        # Call the super's cancel method after the custom cancellation logic
        super().cancel()

    def status(self) -> TrainingStatus:
        status = OpenAIProvider.get_training_status(self.provider_job_id)
        return status


class OpenAIProvider(Provider):
    def __init__(self):
        super().__init__()
        self.finetunable = True
        self.TrainingJob = TrainingJobOpenAI

    @staticmethod
    def is_provider_model(model: str) -> bool:
        if model.startswith("openai/") or model.startswith("ft:"):
            # Although it looks strange, `ft:` is a unique identifier for openai finetuned models in litellm context:
            # https://github.com/BerriAI/litellm/blob/cd893134b7974d9f21477049a373b469fff747a5/litellm/utils.py#L4495
            return True
        return False

    @staticmethod
    def _remove_provider_prefix(model: str) -> str:
        provider_prefix = "openai/"
        return model.replace(provider_prefix, "")

    @staticmethod
    def finetune(
        job: TrainingJobOpenAI,
        model: str,
        train_data: list[dict[str, Any]],
        train_data_format: TrainDataFormat | None,
        train_kwargs: dict[str, Any] | None = None,
    ) -> str:
        model = OpenAIProvider._remove_provider_prefix(model)

        print("[OpenAI Provider] Validating the data format")
        OpenAIProvider.validate_data_format(train_data_format)

        print("[OpenAI Provider] Saving the data to a file")
        data_path = save_data(train_data)
        print(f"[OpenAI Provider] Data saved to {data_path}")

        print("[OpenAI Provider] Uploading the data to the provider")
        provider_file_id = OpenAIProvider.upload_data(data_path)
        job.provider_file_id = provider_file_id

        print("[OpenAI Provider] Starting remote training")
        provider_job_id = OpenAIProvider._start_remote_training(
            train_file_id=job.provider_file_id,
            model=model,
            train_kwargs=train_kwargs,
        )
        job.provider_job_id = provider_job_id
        print(f"[OpenAI Provider] Job started with the OpenAI Job ID {provider_job_id}")

        print("[OpenAI Provider] Waiting for training to complete")
        # TODO(feature): Could we stream OAI logs?
        OpenAIProvider.wait_for_job(job)

        print("[OpenAI Provider] Attempting to retrieve the trained model")
        model = OpenAIProvider.get_trained_model(job)
        print(f"[OpenAI Provider] Model retrieved: {model}")

        return model

    @staticmethod
    def does_job_exist(job_id: str) -> bool:
        try:
            # TODO(nit): This call may fail for other reasons. We should check
            # the error message to ensure that the job does not exist.
            openai.fine_tuning.jobs.retrieve(job_id)
            return True
        except Exception:
            return False

    @staticmethod
    def does_file_exist(file_id: str) -> bool:
        try:
            # TODO(nit): This call may fail for other reasons. We should check
            # the error message to ensure that the file does not exist.
            openai.files.retrieve(file_id)
            return True
        except Exception:
            return False

    @staticmethod
    def is_terminal_training_status(status: TrainingStatus) -> bool:
        return status in [
            TrainingStatus.succeeded,
            TrainingStatus.failed,
            TrainingStatus.cancelled,
        ]

    @staticmethod
    def get_training_status(job_id: str) -> TrainingStatus:
        provider_status_to_training_status = {
            "validating_files": TrainingStatus.pending,
            "queued": TrainingStatus.pending,
            "running": TrainingStatus.running,
            "succeeded": TrainingStatus.succeeded,
            "failed": TrainingStatus.failed,
            "cancelled": TrainingStatus.cancelled,
        }

        # Check if there is an active job
        if job_id is None:
            print("There is no active job.")
            return TrainingStatus.not_started

        err_msg = f"Job with ID {job_id} does not exist."
        assert OpenAIProvider.does_job_exist(job_id), err_msg

        # Retrieve the provider's job and report the status
        provider_job = openai.fine_tuning.jobs.retrieve(job_id)
        provider_status = provider_job.status
        status = provider_status_to_training_status[provider_status]

        return status

    @staticmethod
    def validate_data_format(data_format: TrainDataFormat):
        supported_data_formats = [
            TrainDataFormat.CHAT,
            TrainDataFormat.COMPLETION,
        ]
        if data_format not in supported_data_formats:
            err_msg = f"OpenAI does not support the data format {data_format}."
            raise ValueError(err_msg)

    @staticmethod
    def upload_data(data_path: str) -> str:
        # Upload the data to the provider
        provider_file = openai.files.create(
            file=open(data_path, "rb"),
            purpose="fine-tune",
        )
        return provider_file.id

    @staticmethod
    def _start_remote_training(train_file_id: str, model: str, train_kwargs: dict[str, Any] | None = None) -> str:
        train_kwargs = train_kwargs or {}
        provider_job = openai.fine_tuning.jobs.create(
            model=model,
            training_file=train_file_id,
            hyperparameters=train_kwargs,
        )
        return provider_job.id

    @staticmethod
    def wait_for_job(
        job: TrainingJobOpenAI,
        poll_frequency: int = 20,
    ):
        # Poll for the job until it is done
        done = False
        cur_event_id = None
        reported_estimated_time = False
        while not done:
            # Report estimated time if not already reported
            if not reported_estimated_time:
                remote_job = openai.fine_tuning.jobs.retrieve(job.provider_job_id)
                timestamp = remote_job.estimated_finish
                if timestamp:
                    estimated_finish_dt = datetime.fromtimestamp(timestamp)
                    delta_dt = estimated_finish_dt - datetime.now()
                    print(f"[OpenAI Provider] The OpenAI estimated time remaining is: {delta_dt}")
                    reported_estimated_time = True

            # Get new events
            page = openai.fine_tuning.jobs.list_events(fine_tuning_job_id=job.provider_job_id, limit=1)
            new_event = page.data[0] if page.data else None
            if new_event and new_event.id != cur_event_id:
                dt = datetime.fromtimestamp(new_event.created_at)
                print(f"[OpenAI Provider] {dt} {new_event.message}")
                cur_event_id = new_event.id

            # Sleep and update the flag
            time.sleep(poll_frequency)
            done = OpenAIProvider.is_terminal_training_status(job.status())

    @staticmethod
    def get_trained_model(job):
        status = job.status()
        if status != TrainingStatus.succeeded:
            err_msg = f"Job status is {status}."
            err_msg += f" Must be {TrainingStatus.succeeded} to retrieve model."
raise Exception(err_msg) provider_job = openai.fine_tuning.jobs.retrieve(job.provider_job_id) finetuned_model = provider_job.fine_tuned_model return finetuned_model ``` -------------------------------------------------------------------------------- /docs/docs/learn/programming/tools.md: -------------------------------------------------------------------------------- ```markdown --- sidebar_position: 2 --- # Tools DSPy provides powerful support for **tool-using agents** that can interact with external functions, APIs, and services. Tools enable language models to go beyond text generation by performing actions, retrieving information, and processing data dynamically. There are two main approaches to using tools in DSPy: 1. **`dspy.ReAct`** - A fully managed tool agent that handles reasoning and tool calls automatically 2. **Manual tool handling** - Direct control over tool calls using `dspy.Tool`, `dspy.ToolCalls`, and custom signatures ## Approach 1: Using `dspy.ReAct` (Fully Managed) The `dspy.ReAct` module implements the Reasoning and Acting (ReAct) pattern, where the language model iteratively reasons about the current situation and decides which tools to call. ### Basic Example ```python import dspy # Define your tools as functions def get_weather(city: str) -> str: """Get the current weather for a city.""" # In a real implementation, this would call a weather API return f"The weather in {city} is sunny and 75°F" def search_web(query: str) -> str: """Search the web for information.""" # In a real implementation, this would call a search API return f"Search results for '{query}': [relevant information...]" # Create a ReAct agent react_agent = dspy.ReAct( signature="question -> answer", tools=[get_weather, search_web], max_iters=5 ) # Use the agent result = react_agent(question="What's the weather like in Tokyo?") print(result.answer) print("Tool calls made:", result.trajectory) ``` ### ReAct Features - **Automatic reasoning**: The model thinks through the problem step by step - **Tool selection**: Automatically chooses which tool to use based on the situation - **Iterative execution**: Can make multiple tool calls to gather information - **Error handling**: Built-in error recovery for failed tool calls - **Trajectory tracking**: Complete history of reasoning and tool calls ### ReAct Parameters ```python react_agent = dspy.ReAct( signature="question -> answer", # Input/output specification tools=[tool1, tool2, tool3], # List of available tools max_iters=10 # Maximum number of tool call iterations ) ``` ## Approach 2: Manual Tool Handling For more control over the tool calling process, you can manually handle tools using DSPy's tool types. 
### Basic Setup ```python import dspy class ToolSignature(dspy.Signature): """Signature for manual tool handling.""" question: str = dspy.InputField() tools: list[dspy.Tool] = dspy.InputField() outputs: dspy.ToolCalls = dspy.OutputField() def weather(city: str) -> str: """Get weather information for a city.""" return f"The weather in {city} is sunny" def calculator(expression: str) -> str: """Evaluate a mathematical expression.""" try: result = eval(expression) # Note: Use safely in production return f"The result is {result}" except: return "Invalid expression" # Create tool instances tools = { "weather": dspy.Tool(weather), "calculator": dspy.Tool(calculator) } # Create predictor predictor = dspy.Predict(ToolSignature) # Make a prediction response = predictor( question="What's the weather in New York?", tools=list(tools.values()) ) # Execute the tool calls for call in response.outputs.tool_calls: # Execute the tool call result = call.execute() print(f"Tool: {call.name}") print(f"Args: {call.args}") print(f"Result: {result}") ``` ### Understanding `dspy.Tool` The `dspy.Tool` class wraps regular Python functions to make them compatible with DSPy's tool system: ```python def my_function(param1: str, param2: int = 5) -> str: """A sample function with parameters.""" return f"Processed {param1} with value {param2}" # Create a tool tool = dspy.Tool(my_function) # Tool properties print(tool.name) # "my_function" print(tool.desc) # The function's docstring print(tool.args) # Parameter schema print(str(tool)) # Full tool description ``` ### Understanding `dspy.ToolCalls` The `dspy.ToolCalls` type represents the output from a model that can make tool calls. Each individual tool call can be executed using the `execute` method: ```python # After getting a response with tool calls for call in response.outputs.tool_calls: print(f"Tool name: {call.name}") print(f"Arguments: {call.args}") # Execute individual tool calls with different options: # Option 1: Automatic discovery (finds functions in locals/globals) result = call.execute() # Automatically finds functions by name # Option 2: Pass tools as a dict (most explicit) result = call.execute(functions={"weather": weather, "calculator": calculator}) # Option 3: Pass Tool objects as a list result = call.execute(functions=[dspy.Tool(weather), dspy.Tool(calculator)]) print(f"Result: {result}") ``` ## Using Native Tool Calling DSPy adapters support **native function calling**, which leverages the underlying language model's built-in tool calling capabilities rather than relying on text-based parsing. This approach can provide more reliable tool execution and better integration with models that support native function calling. !!! warning "Native tool calling doesn't guarantee better quality" It's possible that native tool calling produces lower quality than custom tool calling. ### Adapter Behavior Different DSPy adapters have different defaults for native function calling: - **`ChatAdapter`** - Uses `use_native_function_calling=False` by default (relies on text parsing) - **`JSONAdapter`** - Uses `use_native_function_calling=True` by default (uses native function calling) You can override these defaults by explicitly setting the `use_native_function_calling` parameter when creating an adapter. 
### Configuration ```python import dspy # ChatAdapter with native function calling enabled chat_adapter_native = dspy.ChatAdapter(use_native_function_calling=True) # JSONAdapter with native function calling disabled json_adapter_manual = dspy.JSONAdapter(use_native_function_calling=False) # Configure DSPy to use the adapter dspy.configure(lm=dspy.LM(model="openai/gpt-4o"), adapter=chat_adapter_native) ``` You can enable the [MLflow tracing](https://dspy.ai/tutorials/observability/) to check how native tool calling is being used. If you use `JSONAdapter` or `ChatAdapter` with native function calling enabled on the code snippet as provided in [the section above](tools.md#basic-setup), you should see native function calling arg `tools` is set like the screenshot below:  ### Model Compatibility Native function calling automatically detects model support using `litellm.supports_function_calling()`. If the model doesn't support native function calling, DSPy will fall back to manual text-based parsing even when `use_native_function_calling=True` is set. ## Best Practices ### 1. Tool Function Design - **Clear docstrings**: Tools work better with descriptive documentation - **Type hints**: Provide clear parameter and return types - **Simple parameters**: Use basic types (str, int, bool, dict, list) or Pydantic models ```python def good_tool(city: str, units: str = "celsius") -> str: """ Get weather information for a specific city. Args: city: The name of the city to get weather for units: Temperature units, either 'celsius' or 'fahrenheit' Returns: A string describing the current weather conditions """ # Implementation with proper error handling if not city.strip(): return "Error: City name cannot be empty" # Weather logic here... return f"Weather in {city}: 25°{units[0].upper()}, sunny" ``` ### 2. Choosing Between ReAct and Manual Handling **Use `dspy.ReAct` when:** - You want automatic reasoning and tool selection - The task requires multiple tool calls - You need built-in error recovery - You want to focus on tool implementation rather than orchestration **Use manual tool handling when:** - You need precise control over tool execution - You want custom error handling logic - You want to minimize the latency - Your tool returns nothing (void function) Tools in DSPy provide a powerful way to extend language model capabilities beyond text generation. Whether using the fully automated ReAct approach or manual tool handling, you can build sophisticated agents that interact with the world through code. ``` -------------------------------------------------------------------------------- /dspy/adapters/two_step_adapter.py: -------------------------------------------------------------------------------- ```python from typing import Any import json_repair from dspy.adapters.base import Adapter from dspy.adapters.chat_adapter import ChatAdapter from dspy.adapters.types import ToolCalls from dspy.adapters.utils import get_field_description_string from dspy.clients import LM from dspy.signatures.field import InputField from dspy.signatures.signature import Signature, make_signature """ NOTE/TODO/FIXME: The main issue below is that the second step's signature is entirely created on the fly and is invoked with a chat adapter explicitly constructed with no demonstrations. This means that it cannot "learn" or get optimized. """ class TwoStepAdapter(Adapter): """ A two-stage adapter that: 1. Uses a simpler, more natural prompt for the main LM 2. 
Uses a smaller LM with a chat adapter to extract structured data from the response of the main LM

    This adapter uses the common __call__ logic defined in the base Adapter class.
    This class is particularly useful when interacting with reasoning models as the main LM,
    since reasoning models are known to struggle with structured outputs.

    Example:
    ```
    import dspy
    lm = dspy.LM(model="openai/o3-mini", max_tokens=16000, temperature=1.0)
    adapter = dspy.TwoStepAdapter(dspy.LM("openai/gpt-4o-mini"))
    dspy.configure(lm=lm, adapter=adapter)
    program = dspy.ChainOfThought("question->answer")
    result = program("What is the capital of France?")
    print(result)
    ```
    """

    def __init__(self, extraction_model: LM, **kwargs):
        super().__init__(**kwargs)
        if not isinstance(extraction_model, LM):
            raise ValueError("extraction_model must be an instance of LM")
        self.extraction_model = extraction_model

    def format(
        self, signature: type[Signature], demos: list[dict[str, Any]], inputs: dict[str, Any]
    ) -> list[dict[str, Any]]:
        """
        Format a prompt for the first stage with the main LM.

        Since no specific structure is required for the main LM, we customize the format method instead of
        format_field_description or format_field_structure.

        Args:
            signature: The signature of the original task
            demos: A list of demo examples
            inputs: The current input

        Returns:
            A list of messages to be passed to the main LM.
        """
        messages = []

        # Create a task description for the main LM
        task_description = self.format_task_description(signature)
        messages.append({"role": "system", "content": task_description})

        messages.extend(self.format_demos(signature, demos))

        # Format the current input
        messages.append({"role": "user", "content": self.format_user_message_content(signature, inputs)})

        return messages

    def parse(self, signature: Signature, completion: str) -> dict[str, Any]:
        """
        Use a smaller LM (extraction_model) with a chat adapter to extract structured data
        from the raw completion text of the main LM.

        Args:
            signature: The signature of the original task
            completion: The completion from the main LM

        Returns:
            A dictionary containing the extracted structured data.
""" # The signature is supposed to be "text -> {original output fields}" extractor_signature = self._create_extractor_signature(signature) try: # Call the smaller LM to extract structured data from the raw completion text with ChatAdapter parsed_result = ChatAdapter()( lm=self.extraction_model, lm_kwargs={}, signature=extractor_signature, demos=[], inputs={"text": completion}, ) return parsed_result[0] except Exception as e: raise ValueError(f"Failed to parse response from the original completion: {completion}") from e async def acall( self, lm: "LM", lm_kwargs: dict[str, Any], signature: type[Signature], demos: list[dict[str, Any]], inputs: dict[str, Any], ) -> list[dict[str, Any]]: inputs = self.format(signature, demos, inputs) outputs = await lm.acall(messages=inputs, **lm_kwargs) # The signature is supposed to be "text -> {original output fields}" extractor_signature = self._create_extractor_signature(signature) values = [] tool_call_output_field_name = self._get_tool_call_output_field_name(signature) for output in outputs: output_logprobs = None tool_calls = None text = output if isinstance(output, dict): text = output["text"] output_logprobs = output.get("logprobs") tool_calls = output.get("tool_calls") try: # Call the smaller LM to extract structured data from the raw completion text with ChatAdapter value = await ChatAdapter().acall( lm=self.extraction_model, lm_kwargs={}, signature=extractor_signature, demos=[], inputs={"text": text}, ) value = value[0] except Exception as e: raise ValueError(f"Failed to parse response from the original completion: {output}") from e if tool_calls and tool_call_output_field_name: tool_calls = [ { "name": v["function"]["name"], "args": json_repair.loads(v["function"]["arguments"]), } for v in tool_calls ] value[tool_call_output_field_name] = ToolCalls.from_dict_list(tool_calls) if output_logprobs is not None: value["logprobs"] = output_logprobs values.append(value) return values def format_task_description(self, signature: Signature) -> str: """Create a description of the task based on the signature""" parts = [] parts.append("You are a helpful assistant that can solve tasks based on user input.") parts.append("As input, you will be provided with:\n" + get_field_description_string(signature.input_fields)) parts.append("Your outputs must contain:\n" + get_field_description_string(signature.output_fields)) parts.append("You should lay out your outputs in detail so that your answer can be understood by another agent") if signature.instructions: parts.append(f"Specific instructions: {signature.instructions}") return "\n".join(parts) def format_user_message_content( self, signature: type[Signature], inputs: dict[str, Any], prefix: str = "", suffix: str = "", ) -> str: parts = [prefix] for name in signature.input_fields.keys(): if name in inputs: parts.append(f"{name}: {inputs.get(name, '')}") parts.append(suffix) return "\n\n".join(parts).strip() def format_assistant_message_content( self, signature: type[Signature], outputs: dict[str, Any], missing_field_message: str | None = None, ) -> str: parts = [] for name in signature.output_fields.keys(): if name in outputs: parts.append(f"{name}: {outputs.get(name, missing_field_message)}") return "\n\n".join(parts).strip() def _create_extractor_signature( self, original_signature: type[Signature], ) -> type[Signature]: """Create a new signature containing a new 'text' input field and all output fields. 
Args: original_signature: The original signature to extract output fields from Returns: A new Signature type with a text input field and all output fields """ # Create new fields dict with 'text' input field and all output fields new_fields = { "text": (str, InputField()), **{name: (field.annotation, field) for name, field in original_signature.output_fields.items()}, } outputs_str = ", ".join([f"`{field}`" for field in original_signature.output_fields]) instructions = f"The input is a text that should contain all the necessary information to produce the fields {outputs_str}. \ Your job is to extract the fields from the text verbatim. Extract precisely the appropriate value (content) for each field." return make_signature(new_fields, instructions) ``` -------------------------------------------------------------------------------- /dspy/utils/parallelizer.py: -------------------------------------------------------------------------------- ```python import contextlib import copy import logging import signal import sys import threading import time import traceback from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait import tqdm logger = logging.getLogger(__name__) class ParallelExecutor: def __init__( self, num_threads=None, max_errors=None, disable_progress_bar=False, provide_traceback=None, compare_results=False, timeout=120, straggler_limit=3, ): """ Offers isolation between the tasks (dspy.settings) irrespective of whether num_threads == 1 or > 1. Handles also straggler timeouts. """ from dspy.dsp.utils.settings import settings self.num_threads = num_threads or settings.num_threads self.max_errors = settings.max_errors if max_errors is None else max_errors self.disable_progress_bar = disable_progress_bar self.provide_traceback = provide_traceback if provide_traceback is not None else settings.provide_traceback self.compare_results = compare_results self.timeout = timeout self.straggler_limit = straggler_limit self.error_count = 0 self.error_lock = threading.Lock() self.cancel_jobs = threading.Event() self.failed_indices = [] self.exceptions_map = {} def execute(self, function, data): tqdm.tqdm._instances.clear() wrapped = self._wrap_function(function) return self._execute_parallel(wrapped, data) def _wrap_function(self, user_function): def safe_func(item): if self.cancel_jobs.is_set(): return None try: return user_function(item) except Exception as e: with self.error_lock: self.error_count += 1 if self.error_count >= self.max_errors: self.cancel_jobs.set() if self.provide_traceback: logger.error(f"Error for {item}: {e}\n{traceback.format_exc()}") else: logger.error(f"Error for {item}: {e}. Set `provide_traceback=True` for traceback.") return e return safe_func def _execute_parallel(self, function, data): results = [None] * len(data) job_cancelled = "cancelled" # We resubmit at most once per item. start_time_map = {} start_time_lock = threading.Lock() resubmitted = set() # This is the worker function each thread will run. 
def worker(parent_overrides, submission_id, index, item): if self.cancel_jobs.is_set(): return index, job_cancelled # Record actual start time with start_time_lock: start_time_map[submission_id] = time.time() # Apply parent's thread-local overrides from dspy.dsp.utils.settings import thread_local_overrides original = thread_local_overrides.get() token = thread_local_overrides.set({**original, **parent_overrides.copy()}) if parent_overrides.get("usage_tracker"): # Usage tracker needs to be deep copied across threads so that each thread tracks its own usage thread_local_overrides.overrides["usage_tracker"] = copy.deepcopy(parent_overrides["usage_tracker"]) try: return index, function(item) finally: thread_local_overrides.reset(token) # Handle Ctrl-C in the main thread @contextlib.contextmanager def interrupt_manager(): if threading.current_thread() is threading.main_thread(): orig_handler = signal.getsignal(signal.SIGINT) def handler(sig, frame): self.cancel_jobs.set() logger.warning("SIGINT received. Cancelling.") orig_handler(sig, frame) signal.signal(signal.SIGINT, handler) try: yield finally: signal.signal(signal.SIGINT, orig_handler) else: yield executor = ThreadPoolExecutor(max_workers=self.num_threads) try: with interrupt_manager(): from dspy.dsp.utils.settings import thread_local_overrides parent_overrides = thread_local_overrides.get().copy() futures_map = {} futures_set = set() submission_counter = 0 for idx, item in enumerate(data): f = executor.submit(worker, parent_overrides, submission_counter, idx, item) futures_map[f] = (submission_counter, idx, item) futures_set.add(f) submission_counter += 1 pbar = tqdm.tqdm( total=len(data), dynamic_ncols=True, disable=self.disable_progress_bar, file=sys.stdout, ) def all_done(): return all(r is not None for r in results) while futures_set and not self.cancel_jobs.is_set(): if all_done(): break done, not_done = wait(futures_set, timeout=1, return_when=FIRST_COMPLETED) for f in done: futures_set.remove(f) try: index, outcome = f.result() except Exception: pass else: if outcome != job_cancelled and results[index] is None: # Check if this is an exception if isinstance(outcome, Exception): with self.error_lock: self.failed_indices.append(index) self.exceptions_map[index] = outcome results[index] = None # Keep None for failed examples else: results[index] = outcome # Update progress if self.compare_results: vals = [r[-1] for r in results if r is not None] self._update_progress(pbar, sum(vals), len(vals)) else: self._update_progress( pbar, len([r for r in results if r is not None]), len(data), ) if all_done(): break # Check stragglers if few remain if 0 < self.timeout and len(not_done) <= self.straggler_limit: now = time.time() for f in list(not_done): if f not in resubmitted: sid, idx, item = futures_map[f] with start_time_lock: st = start_time_map.get(sid, None) if st and (now - st) >= self.timeout: resubmitted.add(f) nf = executor.submit( worker, parent_overrides, submission_counter, idx, item, ) futures_map[nf] = (submission_counter, idx, item) futures_set.add(nf) submission_counter += 1 pbar.close() finally: # Avoid waiting on leftover tasks that no longer matter executor.shutdown(wait=False) if self.cancel_jobs.is_set(): logger.warning("Execution cancelled due to errors or interruption.") raise Exception("Execution cancelled due to errors or interruption.") return results def _update_progress(self, pbar, nresults, ntotal): if self.compare_results: pct = round(100 * nresults / ntotal, 1) if ntotal else 0 pbar.set_description(f"Average 
Metric: {nresults:.2f} / {ntotal} ({pct}%)") else: pbar.set_description(f"Processed {nresults} / {ntotal} examples") pbar.update() ``` -------------------------------------------------------------------------------- /tests/callback/test_callback.py: -------------------------------------------------------------------------------- ```python import time import pytest import dspy from dspy.utils.callback import ACTIVE_CALL_ID, BaseCallback, with_callbacks from dspy.utils.dummies import DummyLM @pytest.fixture(autouse=True) def reset_settings(): # Make sure the settings are reset after each test original_settings = dspy.settings.copy() yield dspy.settings.configure(**original_settings) class MyCallback(BaseCallback): """A simple callback that records the calls.""" def __init__(self): self.calls = [] def on_module_start(self, call_id, instance, inputs): self.calls.append({"handler": "on_module_start", "instance": instance, "inputs": inputs}) def on_module_end(self, call_id, outputs, exception): self.calls.append({"handler": "on_module_end", "outputs": outputs, "exception": exception}) def on_lm_start(self, call_id, instance, inputs): self.calls.append({"handler": "on_lm_start", "instance": instance, "inputs": inputs}) def on_lm_end(self, call_id, outputs, exception): self.calls.append({"handler": "on_lm_end", "outputs": outputs, "exception": exception}) def on_adapter_format_start(self, call_id, instance, inputs): self.calls.append({"handler": "on_adapter_format_start", "instance": instance, "inputs": inputs}) def on_adapter_format_end(self, call_id, outputs, exception): self.calls.append({"handler": "on_adapter_format_end", "outputs": outputs, "exception": exception}) def on_adapter_parse_start(self, call_id, instance, inputs): self.calls.append({"handler": "on_adapter_parse_start", "instance": instance, "inputs": inputs}) def on_adapter_parse_end(self, call_id, outputs, exception): self.calls.append({"handler": "on_adapter_parse_end", "outputs": outputs, "exception": exception}) def on_tool_start(self, call_id, instance, inputs): self.calls.append({"handler": "on_tool_start", "instance": instance, "inputs": inputs}) def on_tool_end(self, call_id, outputs, exception): self.calls.append({"handler": "on_tool_end", "outputs": outputs, "exception": exception}) @pytest.mark.parametrize( ("args", "kwargs"), [ ([1, "2", 3.0], {}), ([1, "2"], {"z": 3.0}), ([1], {"y": "2", "z": 3.0}), ([], {"x": 1, "y": "2", "z": 3.0}), ], ) def test_callback_injection(args, kwargs): class Target(dspy.Module): @with_callbacks def forward(self, x: int, y: str, z: float) -> int: time.sleep(0.1) return x + int(y) + int(z) callback = MyCallback() dspy.settings.configure(callbacks=[callback]) target = Target() result = target.forward(*args, **kwargs) assert result == 6 assert len(callback.calls) == 2 assert callback.calls[0]["handler"] == "on_module_start" assert callback.calls[0]["inputs"] == {"x": 1, "y": "2", "z": 3.0} assert callback.calls[1]["handler"] == "on_module_end" assert callback.calls[1]["outputs"] == 6 def test_callback_injection_local(): class Target(dspy.Module): @with_callbacks def forward(self, x: int, y: str, z: float) -> int: time.sleep(0.1) return x + int(y) + int(z) callback = MyCallback() target_1 = Target(callbacks=[callback]) result = target_1.forward(1, "2", 3.0) assert result == 6 assert len(callback.calls) == 2 assert callback.calls[0]["handler"] == "on_module_start" assert callback.calls[0]["inputs"] == {"x": 1, "y": "2", "z": 3.0} assert callback.calls[1]["handler"] == "on_module_end" assert 
callback.calls[1]["outputs"] == 6 callback.calls = [] target_2 = Target() result = target_2.forward(1, "2", 3.0) # Other instance should not trigger the callback assert not callback.calls def test_callback_error_handling(): class Target(dspy.Module): @with_callbacks def forward(self, x: int, y: str, z: float) -> int: time.sleep(0.1) raise ValueError("Error") callback = MyCallback() dspy.settings.configure(callbacks=[callback]) target = Target() with pytest.raises(ValueError, match="Error"): target.forward(1, "2", 3.0) assert len(callback.calls) == 2 assert callback.calls[0]["handler"] == "on_module_start" assert callback.calls[1]["handler"] == "on_module_end" assert isinstance(callback.calls[1]["exception"], ValueError) def test_multiple_callbacks(): class Target(dspy.Module): @with_callbacks def forward(self, x: int, y: str, z: float) -> int: time.sleep(0.1) return x + int(y) + int(z) callback_1 = MyCallback() callback_2 = MyCallback() dspy.settings.configure(callbacks=[callback_1, callback_2]) target = Target() result = target.forward(1, "2", 3.0) assert result == 6 assert len(callback_1.calls) == 2 assert len(callback_2.calls) == 2 def test_callback_complex_module(): callback = MyCallback() dspy.settings.configure( lm=DummyLM({"How are you?": {"answer": "test output", "reasoning": "No more responses"}}), callbacks=[callback], ) cot = dspy.ChainOfThought("question -> answer", n=3) result = cot(question="How are you?") assert result["answer"] == "test output" assert result["reasoning"] == "No more responses" assert len(callback.calls) == 14 assert [call["handler"] for call in callback.calls] == [ "on_module_start", "on_module_start", "on_adapter_format_start", "on_adapter_format_end", "on_lm_start", "on_lm_end", # Parsing will run per output (n=3) "on_adapter_parse_start", "on_adapter_parse_end", "on_adapter_parse_start", "on_adapter_parse_end", "on_adapter_parse_start", "on_adapter_parse_end", "on_module_end", "on_module_end", ] @pytest.mark.asyncio async def test_callback_async_module(): callback = MyCallback() with dspy.context( lm=DummyLM({"How are you?": {"answer": "test output", "reasoning": "No more responses"}}), callbacks=[callback], ): cot = dspy.ChainOfThought("question -> answer", n=3) result = await cot.acall(question="How are you?") assert result["answer"] == "test output" assert result["reasoning"] == "No more responses" assert len(callback.calls) == 14 assert [call["handler"] for call in callback.calls] == [ "on_module_start", "on_module_start", "on_adapter_format_start", "on_adapter_format_end", "on_lm_start", "on_lm_end", # Parsing will run per output (n=3) "on_adapter_parse_start", "on_adapter_parse_end", "on_adapter_parse_start", "on_adapter_parse_end", "on_adapter_parse_start", "on_adapter_parse_end", "on_module_end", "on_module_end", ] def test_tool_calls(): callback = MyCallback() dspy.settings.configure(callbacks=[callback]) def tool_1(query: str) -> str: """A dummy tool function.""" return "result 1" def tool_2(query: str) -> str: """Another dummy tool function.""" return "result 2" class MyModule(dspy.Module): def __init__(self): self.tools = [dspy.Tool(tool_1), dspy.Tool(tool_2)] def forward(self, query: str) -> str: query = self.tools[0](query=query) return self.tools[1](query=query) module = MyModule() result = module("query") assert result == "result 2" assert len(callback.calls) == 6 assert [call["handler"] for call in callback.calls] == [ "on_module_start", "on_tool_start", "on_tool_end", "on_tool_start", "on_tool_end", "on_module_end", ] def 
test_active_id():
    # Test the call ID is generated and handled properly
    class CustomCallback(BaseCallback):
        def __init__(self):
            self.parent_call_ids = []
            self.call_ids = []

        def on_module_start(self, call_id, instance, inputs):
            parent_call_id = ACTIVE_CALL_ID.get()
            self.parent_call_ids.append(parent_call_id)
            self.call_ids.append(call_id)

    class Parent(dspy.Module):
        def __init__(self):
            self.child_1 = Child()
            self.child_2 = Child()

        def forward(self):
            self.child_1()
            self.child_2()

    class Child(dspy.Module):
        def forward(self):
            pass

    callback = CustomCallback()
    dspy.settings.configure(callbacks=[callback])

    parent = Parent()
    parent()

    assert len(callback.call_ids) == 3
    # All three calls should have different call ids
    assert len(set(callback.call_ids)) == 3
    parent_call_id = callback.call_ids[0]
    assert callback.parent_call_ids == [None, parent_call_id, parent_call_id]
```

--------------------------------------------------------------------------------
/dspy/teleprompt/avatar_optimizer.py:
--------------------------------------------------------------------------------

```python
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy
from random import sample
from typing import Callable

from pydantic import BaseModel
from tqdm import tqdm

import dspy
from dspy.predict.avatar import ActionOutput
from dspy.teleprompt.teleprompt import Teleprompter

DEFAULT_MAX_EXAMPLES = 10


class EvalResult(BaseModel):
    example: dict
    score: float
    actions: list[ActionOutput] | None = None


class Comparator(dspy.Signature):
    """After executing the given actions on user inputs using the given instruction, some inputs have yielded good results, while others have not. I'll provide you the inputs along with their corresponding evaluation metrics: Task: (1) Firstly, identify and contrast the patterns of inputs that have achieved good results with those that have not. (2) Then, review the computational logic for any inconsistencies in the previous actions. (3) Lastly, specify the modification in tools used that can lead to improved performance on the negative inputs."""

    instruction: str = dspy.InputField(
        prefix="Instruction: ",
        desc="Instruction for the actor to execute the task",
    )
    actions: list[str] = dspy.InputField(
        prefix="Actions: ",
        desc="Actions the actor can take to complete the task",
    )
    pos_input_with_metrics: list[EvalResult] = dspy.InputField(
        prefix="Positive Inputs: ",
        desc="Positive inputs along with their score on an evaluation metric and actions taken",
    )
    neg_input_with_metrics: list[EvalResult] = dspy.InputField(
        prefix="Negative Inputs: ",
        desc="Negative inputs along with their score on an evaluation metric and actions taken",
    )
    feedback: str = dspy.OutputField(
        prefix="Feedback: ",
        desc="Feedback for the actor to improve the performance of negative inputs",
    )


class FeedbackBasedInstruction(dspy.Signature):
    """There is a task that needs to be completed for which one can use multiple tools to achieve the desired outcome. A group's performance was evaluated on a dataset of inputs: the inputs that did well are positive inputs, and the inputs that did not do well are negative inputs. You received feedback on how the group can better use the tools to improve their performance on the negative inputs. You have been provided with the previous instruction that they followed to use the tools to complete the task, and the feedback on their performance. Your task is to incorporate the feedback and generate a detailed instruction for the group to follow to improve their performance on the task.
Make sure that the new instruction talks about how to use the tools effectively and should be no more than 3 paragraphs long. The previous instruction contains general guidelines that you must retain in the new instruction.""" previous_instruction: str = dspy.InputField( prefix="Previous Instruction: ", desc="Previous instruction for the actor to execute the task", ) feedback: str = dspy.InputField( prefix="Feedback: ", desc="Feedback for the actor to improve the performance of negative inputs", ) new_instruction: str = dspy.OutputField( prefix="New Instruction: ", desc="New instruction for the actor to execute the task", ) class AvatarOptimizer(Teleprompter): def __init__( self, metric: Callable, max_iters: int = 10, lower_bound: int = 0, upper_bound: int = 1, max_positive_inputs: int | None = None, max_negative_inputs: int | None = None, optimize_for: str = "max", ): assert metric is not None, "`metric` argument cannot be None. Please provide a metric function." self.metric = metric self.optimize_for = optimize_for self.max_iters = max_iters self.lower_bound = lower_bound self.upper_bound = upper_bound self.max_positive_inputs = max_positive_inputs or DEFAULT_MAX_EXAMPLES self.max_negative_inputs = max_negative_inputs or DEFAULT_MAX_EXAMPLES self.comparator = dspy.TypedPredictor(Comparator) self.feedback_instruction = dspy.Predict(FeedbackBasedInstruction) def process_example(self, actor, example, return_outputs): actor = deepcopy(actor) try: prediction = actor(**example.inputs().toDict()) score = self.metric(example, prediction) if return_outputs: return example, prediction, score else: return score except Exception as e: print(e) if return_outputs: return example, None, 0 else: return 0 def thread_safe_evaluator(self, devset, actor, return_outputs=False, num_threads=None): total_score = 0 total_examples = len(devset) results = [] num_threads = num_threads or dspy.settings.num_threads with ThreadPoolExecutor(max_workers=num_threads) as executor: futures = [executor.submit(self.process_example, actor, example, return_outputs) for example in devset] for future in tqdm(futures, total=total_examples, desc="Processing examples"): result = future.result() if return_outputs: example, prediction, score = result total_score += score results.append((example, prediction, score)) else: total_score += result avg_metric = total_score / total_examples if return_outputs: return avg_metric, results else: return avg_metric def _get_pos_neg_results( self, actor: dspy.Module, trainset: list[dspy.Example] ) -> tuple[float, list[EvalResult], list[EvalResult]]: pos_inputs = [] neg_inputs = [] avg_score, results = self.thread_safe_evaluator(trainset, actor, return_outputs=True) print(f"Average Score: {avg_score}") for example, prediction, score in results: if score >= self.upper_bound: pos_inputs.append( EvalResult( example=example.inputs().toDict(), score=score, actions=prediction.actions if prediction else None ) ) elif score <= self.lower_bound: neg_inputs.append( EvalResult( example=example.inputs().toDict(), score=score, actions=prediction.actions if prediction else None ) ) if len(pos_inputs) == 0: raise ValueError("No positive examples found, try lowering the upper_bound or providing more training data") if len(neg_inputs) == 0: raise ValueError("No negative examples found, try raising the lower_bound or providing more training data") return (avg_score, pos_inputs, neg_inputs) def compile(self, student, *, trainset): best_actor = deepcopy(student) best_score = -999 if self.optimize_for == "max" else 999 
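        # Each iteration below: evaluate the current actor on the trainset, split the examples into
        # positive and negative buckets by score, ask the Comparator for feedback, and rewrite the
        # actor's instructions when the iteration's score beats the best score seen so far.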
for i in range(self.max_iters): print(20*"=") print(f"Iteration {i+1}/{self.max_iters}") score, pos_inputs, neg_inputs = self._get_pos_neg_results(best_actor, trainset) print(f"Positive examples: {len(pos_inputs)}") print(f"Negative examples: {len(neg_inputs)}") print(f"Sampling {self.max_positive_inputs} positive examples and {self.max_negative_inputs} negative examples") if self.max_positive_inputs and len(pos_inputs) > self.max_positive_inputs: pos_inputs = sample(pos_inputs, self.max_positive_inputs) if self.max_negative_inputs and len(neg_inputs) > self.max_negative_inputs: neg_inputs = sample(neg_inputs, self.max_negative_inputs) feedback = self.comparator( instruction=best_actor.actor.signature.instructions, actions=[str(tool) for tool in best_actor.tools], pos_input_with_metrics=pos_inputs, neg_input_with_metrics=neg_inputs ).feedback new_instruction = self.feedback_instruction( previous_instruction=best_actor.actor.signature.instructions, feedback=feedback ).new_instruction print(f"Generated new instruction: {new_instruction}") if (self.optimize_for == "max" and best_score < score) or (self.optimize_for == "min" and best_score > score): best_actor.actor.signature = best_actor.actor.signature.with_instructions(new_instruction) best_actor.actor_clone = deepcopy(best_actor.actor) best_score = score print(f"Best Actor: {best_actor}") return best_actor ``` -------------------------------------------------------------------------------- /docs/docs/tutorials/deployment/index.md: -------------------------------------------------------------------------------- ```markdown # Tutorial: Deploying your DSPy program This guide demonstrates two potential ways to deploy your DSPy program in production: FastAPI for lightweight deployments and MLflow for more production-grade deployments with program versioning and management. Below, we'll assume you have the following simple DSPy program that you want to deploy. You can replace this with something more sophisticated. ```python import dspy dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini")) dspy_program = dspy.ChainOfThought("question -> answer") ``` ## Deploying with FastAPI FastAPI offers a straightforward way to serve your DSPy program as a REST API. This is ideal when you have direct access to your program code and need a lightweight deployment solution. ```bash > pip install fastapi uvicorn > export OPENAI_API_KEY="your-openai-api-key" ``` Let's create a FastAPI application to serve your `dspy_program` defined above. ```python from fastapi import FastAPI, HTTPException from pydantic import BaseModel import dspy app = FastAPI( title="DSPy Program API", description="A simple API serving a DSPy Chain of Thought program", version="1.0.0" ) # Define request model for better documentation and validation class Question(BaseModel): text: str # Configure your language model and 'asyncify' your DSPy program. lm = dspy.LM("openai/gpt-4o-mini") dspy.settings.configure(lm=lm, async_max_workers=4) # default is 8 dspy_program = dspy.ChainOfThought("question -> answer") dspy_program = dspy.asyncify(dspy_program) @app.post("/predict") async def predict(question: Question): try: result = await dspy_program(question=question.text) return { "status": "success", "data": result.toDict() } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) ``` In the code above, we call `dspy.asyncify` to convert the dspy program to run in async mode for high-throughput FastAPI deployments. 
Currently, this runs the dspy program in a separate thread and awaits its result. By default, the limit of spawned threads is 8. Think of this like a worker pool. If you have 8 in-flight programs and call it once more, the 9th call will wait until one of the 8 returns. You can configure the async capacity using the new `async_max_workers` setting. ??? "Streaming, in DSPy 2.6.0+" Streaming is also supported in DSPy 2.6.0+, which can be installed via `pip install -U dspy`. We can use `dspy.streamify` to convert the dspy program to a streaming mode. This is useful when you want to stream the intermediate outputs (i.e. O1-style reasoning) to the client before the final prediction is ready. This uses asyncify under the hood and inherits the execution semantics. ```python dspy_program = dspy.asyncify(dspy.ChainOfThought("question -> answer")) streaming_dspy_program = dspy.streamify(dspy_program) @app.post("/predict/stream") async def stream(question: Question): async def generate(): async for value in streaming_dspy_program(question=question.text): if isinstance(value, dspy.Prediction): data = {"prediction": value.labels().toDict()} elif isinstance(value, litellm.ModelResponse): data = {"chunk": value.json()} yield f"data: {ujson.dumps(data)}\n\n" yield "data: [DONE]\n\n" return StreamingResponse(generate(), media_type="text/event-stream") # Since you're often going to want to stream the result of a DSPy program as server-sent events, # we've included a helper function for that, which is equivalent to the code above. from dspy.utils.streaming import streaming_response @app.post("/predict/stream") async def stream(question: Question): stream = streaming_dspy_program(question=question.text) return StreamingResponse(streaming_response(stream), media_type="text/event-stream") ``` Write your code to a file, e.g., `fastapi_dspy.py`. Then you can serve the app with: ```bash > uvicorn fastapi_dspy:app --reload ``` It will start a local server at `http://127.0.0.1:8000/`. You can test it with the python code below: ```python import requests response = requests.post( "http://127.0.0.1:8000/predict", json={"text": "What is the capital of France?"} ) print(response.json()) ``` You should see the response like below: ```json { "status": "success", "data": { "reasoning": "The capital of France is a well-known fact, commonly taught in geography classes and referenced in various contexts. Paris is recognized globally as the capital city, serving as the political, cultural, and economic center of the country.", "answer": "The capital of France is Paris." } } ``` ## Deploying with MLflow We recommend deploying with MLflow if you are looking to package your DSPy program and deploy in an isolated environment. MLflow is a popular platform for managing machine learning workflows, including versioning, tracking, and deployment. ```bash > pip install mlflow>=2.18.0 ``` Let's spin up the MLflow tracking server, where we will store our DSPy program. The command below will start a local server at `http://127.0.0.1:5000/`. ```bash > mlflow ui ``` Then we can define the DSPy program and log it to the MLflow server. "log" is an overloaded term in MLflow, basically it means we store the program information along with environment requirements in the MLflow server. This is done via the `mlflow.dspy.log_model()` function, please see the code below: > [!NOTE] > As of MLflow 2.22.0, there is a caveat that you must wrap your DSPy program in a custom DSPy Module class when deploying with MLflow. 
> This is because MLflow requires positional arguments while DSPy pre-built modules disallow positional arguments, e.g., `dspy.Predict`
> or `dspy.ChainOfThought`. To work around this, create a wrapper class that inherits from `dspy.Module` and implement your program's
> logic in the `forward()` method, as shown in the example below.

```python
import dspy
import mlflow

mlflow.set_tracking_uri("http://127.0.0.1:5000/")
mlflow.set_experiment("deploy_dspy_program")

lm = dspy.LM("openai/gpt-4o-mini")
dspy.settings.configure(lm=lm)


class MyProgram(dspy.Module):
    def __init__(self):
        super().__init__()
        self.cot = dspy.ChainOfThought("question -> answer")

    def forward(self, messages):
        return self.cot(question=messages[0]["content"])


dspy_program = MyProgram()

with mlflow.start_run():
    mlflow.dspy.log_model(
        dspy_program,
        "dspy_program",
        input_example={"messages": [{"role": "user", "content": "What is LLM agent?"}]},
        task="llm/v1/chat",
    )
```

We recommend setting `task="llm/v1/chat"` so that the deployed program automatically takes input and generates output in the same format as the OpenAI chat API, which is a common interface for LM applications. Write the code above into a file, e.g. `mlflow_dspy.py`, and run it.

After you have logged the program, you can view the saved information in the MLflow UI. Open `http://127.0.0.1:5000/` and select the `deploy_dspy_program` experiment, then select the run you just created. Under the `Artifacts` tab, you should see the logged program information, similar to the following screenshot:

Grab your run ID from the UI (or from the console output when you execute `mlflow_dspy.py`); now you can deploy the logged program with the following command:

```bash
> mlflow models serve -m runs:/{run_id}/model -p 6000
```

After the program is deployed, you can test it with the following command:

```bash
> curl http://127.0.0.1:6000/invocations -H "Content-Type:application/json" --data '{"messages": [{"content": "what is 2 + 2?", "role": "user"}]}'
```

You should see a response like the one below:

```json
{
  "choices": [
    {
      "index": 0,
      "message": {
        "role": "assistant",
        "content": "{\"reasoning\": \"The question asks for the sum of 2 and 2. To find the answer, we simply add the two numbers together: 2 + 2 = 4.\", \"answer\": \"4\"}"
      },
      "finish_reason": "stop"
    }
  ]
}
```

For a complete guide on how to deploy a DSPy program with MLflow and how to customize the deployment, please refer to the [MLflow documentation](https://mlflow.org/docs/latest/llms/dspy/index.html).

### Best Practices for MLflow Deployment

1. **Environment Management**: Always specify your Python dependencies in a `conda.yaml` or `requirements.txt` file.
2. **Versioning**: Use meaningful tags and descriptions for your model versions.
3. **Input Validation**: Define clear input schemas and examples.
4. **Monitoring**: Set up proper logging and monitoring for production deployments.

For production deployments, consider using MLflow with containerization:

```bash
> mlflow models build-docker -m "runs:/{run_id}/model" -n "dspy-program"
> docker run -p 6000:8080 dspy-program
```

For a complete guide on production deployment options and best practices, refer to the [MLflow documentation](https://mlflow.org/docs/latest/llms/dspy/index.html).
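If you prefer to exercise the served endpoint from Python rather than `curl`, here is a minimal client sketch. It assumes the `mlflow models serve` (or `docker run`) command above is still running and reachable on port 6000; adjust the URL if you chose a different port.

```python
import requests

# Same chat-format payload as the curl example above (the model was logged with task="llm/v1/chat").
response = requests.post(
    "http://127.0.0.1:6000/invocations",
    json={"messages": [{"role": "user", "content": "what is 2 + 2?"}]},
)
response.raise_for_status()
print(response.json())
```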
``` -------------------------------------------------------------------------------- /docs/docs/learn/programming/signatures.md: -------------------------------------------------------------------------------- ```markdown --- sidebar_position: 2 --- # Signatures When we assign tasks to LMs in DSPy, we specify the behavior we need as a Signature. **A signature is a declarative specification of input/output behavior of a DSPy module.** Signatures allow you to tell the LM _what_ it needs to do, rather than specify _how_ we should ask the LM to do it. You're probably familiar with function signatures, which specify the input and output arguments and their types. DSPy signatures are similar, but with a couple of differences. While typical function signatures just _describe_ things, DSPy Signatures _declare and initialize the behavior_ of modules. Moreover, the field names matter in DSPy Signatures. You express semantic roles in plain English: a `question` is different from an `answer`, a `sql_query` is different from `python_code`. ## Why should I use a DSPy Signature? For modular and clean code, in which LM calls can be optimized into high-quality prompts (or automatic finetunes). Most people coerce LMs to do tasks by hacking long, brittle prompts. Or by collecting/generating data for fine-tuning. Writing signatures is far more modular, adaptive, and reproducible than hacking at prompts or finetunes. The DSPy compiler will figure out how to build a highly-optimized prompt for your LM (or finetune your small LM) for your signature, on your data, and within your pipeline. In many cases, we found that compiling leads to better prompts than humans write. Not because DSPy optimizers are more creative than humans, but simply because they can try more things and tune the metrics directly. ## **Inline** DSPy Signatures Signatures can be defined as a short string, with argument names and optional types that define semantic roles for inputs/outputs. 1. Question Answering: `"question -> answer"`, which is equivalent to `"question: str -> answer: str"` as the default type is always `str` 2. Sentiment Classification: `"sentence -> sentiment: bool"`, e.g. `True` if positive 3. Summarization: `"document -> summary"` Your signatures can also have multiple input/output fields with types: 4. Retrieval-Augmented Question Answering: `"context: list[str], question: str -> answer: str"` 5. Multiple-Choice Question Answering with Reasoning: `"question, choices: list[str] -> reasoning: str, selection: int"` **Tip:** For fields, any valid variable names work! Field names should be semantically meaningful, but start simple and don't prematurely optimize keywords! Leave that kind of hacking to the DSPy compiler. For example, for summarization, it's probably fine to say `"document -> summary"`, `"text -> gist"`, or `"long_context -> tldr"`. You can also add instructions to your inline signature, which can use variables at runtime. Use the `instructions` keyword argument to add instructions to your signature. ```python toxicity = dspy.Predict( dspy.Signature( "comment -> toxic: bool", instructions="Mark as 'toxic' if the comment includes insults, harassment, or sarcastic derogatory remarks.", ) ) comment = "you are beautiful." toxicity(comment=comment).toxic ``` **Output:** ```text False ``` ### Example A: Sentiment Classification ```python sentence = "it's a charming and often affecting journey." # example from the SST-2 dataset. 
classify = dspy.Predict('sentence -> sentiment: bool') # we'll see an example with Literal[] later classify(sentence=sentence).sentiment ``` **Output:** ```text True ``` ### Example B: Summarization ```python # Example from the XSum dataset. document = """The 21-year-old made seven appearances for the Hammers and netted his only goal for them in a Europa League qualification round match against Andorran side FC Lustrains last season. Lee had two loan spells in League One last term, with Blackpool and then Colchester United. He scored twice for the U's but was unable to save them from relegation. The length of Lee's contract with the promoted Tykes has not been revealed. Find all the latest football transfers on our dedicated page.""" summarize = dspy.ChainOfThought('document -> summary') response = summarize(document=document) print(response.summary) ``` **Possible Output:** ```text The 21-year-old Lee made seven appearances and scored one goal for West Ham last season. He had loan spells in League One with Blackpool and Colchester United, scoring twice for the latter. He has now signed a contract with Barnsley, but the length of the contract has not been revealed. ``` Many DSPy modules (except `dspy.Predict`) return auxiliary information by expanding your signature under the hood. For example, `dspy.ChainOfThought` also adds a `reasoning` field that includes the LM's reasoning before it generates the output `summary`. ```python print("Reasoning:", response.reasoning) ``` **Possible Output:** ```text Reasoning: We need to highlight Lee's performance for West Ham, his loan spells in League One, and his new contract with Barnsley. We also need to mention that his contract length has not been disclosed. ``` ## **Class-based** DSPy Signatures For some advanced tasks, you need more verbose signatures. This is typically to: 1. Clarify something about the nature of the task (expressed below as a `docstring`). 2. Supply hints on the nature of an input field, expressed as a `desc` keyword argument for `dspy.InputField`. 3. Supply constraints on an output field, expressed as a `desc` keyword argument for `dspy.OutputField`. ### Example C: Classification ```python from typing import Literal class Emotion(dspy.Signature): """Classify emotion.""" sentence: str = dspy.InputField() sentiment: Literal['sadness', 'joy', 'love', 'anger', 'fear', 'surprise'] = dspy.OutputField() sentence = "i started feeling a little vulnerable when the giant spotlight started blinding me" # from dair-ai/emotion classify = dspy.Predict(Emotion) classify(sentence=sentence) ``` **Possible Output:** ```text Prediction( sentiment='fear' ) ``` **Tip:** There's nothing wrong with specifying your requests to the LM more clearly. Class-based Signatures help you with that. However, don't prematurely tune the keywords of your signature by hand. The DSPy optimizers will likely do a better job (and will transfer better across LMs). ### Example D: A metric that evaluates faithfulness to citations ```python class CheckCitationFaithfulness(dspy.Signature): """Verify that the text is based on the provided context.""" context: str = dspy.InputField(desc="facts here are assumed to be true") text: str = dspy.InputField() faithfulness: bool = dspy.OutputField() evidence: dict[str, list[str]] = dspy.OutputField(desc="Supporting evidence for claims") context = "The 21-year-old made seven appearances for the Hammers and netted his only goal for them in a Europa League qualification round match against Andorran side FC Lustrains last season. 
Lee had two loan spells in League One last term, with Blackpool and then Colchester United. He scored twice for the U's but was unable to save them from relegation. The length of Lee's contract with the promoted Tykes has not been revealed. Find all the latest football transfers on our dedicated page." text = "Lee scored 3 goals for Colchester United." faithfulness = dspy.ChainOfThought(CheckCitationFaithfulness) faithfulness(context=context, text=text) ``` **Possible Output:** ```text Prediction( reasoning="Let's check the claims against the context. The text states Lee scored 3 goals for Colchester United, but the context clearly states 'He scored twice for the U's'. This is a direct contradiction.", faithfulness=False, evidence={'goal_count': ["scored twice for the U's"]} ) ``` ### Example E: Multi-modal image classification ```python class DogPictureSignature(dspy.Signature): """Output the dog breed of the dog in the image.""" image_1: dspy.Image = dspy.InputField(desc="An image of a dog") answer: str = dspy.OutputField(desc="The dog breed of the dog in the image") image_url = "https://picsum.photos/id/237/200/300" classify = dspy.Predict(DogPictureSignature) classify(image_1=dspy.Image.from_url(image_url)) ``` **Possible Output:** ```text Prediction( answer='Labrador Retriever' ) ``` ## Type Resolution in Signatures DSPy signatures support various annotation types: 1. **Basic types** like `str`, `int`, `bool` 2. **Typing module types** like `list[str]`, `dict[str, int]`, `Optional[float]`. `Union[str, int]` 3. **Custom types** defined in your code 4. **Dot notation** for nested types with proper configuration 5. **Special data types** like `dspy.Image, dspy.History` ### Working with Custom Types ```python # Simple custom type class QueryResult(pydantic.BaseModel): text: str score: float signature = dspy.Signature("query: str -> result: QueryResult") class MyContainer: class Query(pydantic.BaseModel): text: str class Score(pydantic.BaseModel): score: float signature = dspy.Signature("query: MyContainer.Query -> score: MyContainer.Score") ``` ## Using signatures to build modules & compiling them While signatures are convenient for prototyping with structured inputs/outputs, that's not the only reason to use them! You should compose multiple signatures into bigger [DSPy modules](modules.md) and [compile these modules into optimized prompts](../optimization/optimizers.md) and finetunes. ``` -------------------------------------------------------------------------------- /dspy/predict/refine.py: -------------------------------------------------------------------------------- ```python import inspect import textwrap from typing import Callable import orjson import dspy from dspy.adapters.utils import get_field_description_string from dspy.predict.predict import Prediction from dspy.signatures import InputField, OutputField, Signature from .predict import Module class OfferFeedback(Signature): """ In the discussion, assign blame to each module that contributed to the final reward being below the threshold, if any. Then, prescribe concrete advice of how the module should act on its future input when we retry the process, if it were to receive the same or similar inputs. If a module is not to blame, the advice should be N/A. The module will not see its own history, so it needs to rely on entirely concrete and actionable advice from you to avoid the same mistake on the same or similar inputs. 
""" program_code: str = InputField(desc="The code of the program that we are analyzing") modules_defn: str = InputField(desc="The definition of each module in the program, including its I/O") program_inputs: str = InputField(desc="The inputs to the program that we are analyzing") program_trajectory: str = InputField(desc="The trajectory of the program's execution, showing each module's I/O") program_outputs: str = InputField(desc="The outputs of the program that we are analyzing") reward_code: str = InputField(desc="The code of the reward function that we are analyzing") target_threshold: float = InputField(desc="The target threshold for the reward function") reward_value: float = InputField(desc="The reward value assigned to the program's outputs") module_names: list[str] = InputField(desc="The names of the modules in the program, for which we seek advice") discussion: str = OutputField(desc="Discussing blame of where each module went wrong, if it did") advice: dict[str, str] = OutputField( desc="For each module, describe very concretely, in this order: the specific scenarios in which it has made " "mistakes in the past and what each mistake was, followed by what it should do differently in that kind of" "scenario in the future. If the module is not to blame, write N/A." ) class Refine(Module): def __init__( self, module: Module, N: int, # noqa: N803 reward_fn: Callable[[dict, Prediction], float], threshold: float, fail_count: int | None = None, ): """ Refines a module by running it up to N times with different rollout IDs at `temperature=1.0` and returns the best prediction. This module runs the provided module multiple times with varying rollout identifiers and selects either the first prediction that exceeds the specified threshold or the one with the highest reward. If no prediction meets the threshold, it automatically generates feedback to improve future predictions. Args: module (Module): The module to refine. N (int): The number of times to run the module. must reward_fn (Callable): The reward function. threshold (float): The threshold for the reward function. 
fail_count (Optional[int], optional): The number of times the module can fail before raising an error Example: ```python import dspy dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini")) # Define a QA module with chain of thought qa = dspy.ChainOfThought("question -> answer") # Define a reward function that checks for one-word answers def one_word_answer(args, pred): return 1.0 if len(pred.answer.split()) == 1 else 0.0 # Create a refined module that tries up to 3 times best_of_3 = dspy.Refine(module=qa, N=3, reward_fn=one_word_answer, threshold=1.0) # Use the refined module result = best_of_3(question="What is the capital of Belgium?").answer # Returns: Brussels ``` """ self.module = module self.reward_fn = lambda *args: reward_fn(*args) # to prevent this from becoming a parameter self.threshold = threshold self.N = N self.fail_count = fail_count or N # default to N if fail_count is not provided self.module_code = inspect.getsource(module.__class__) try: self.reward_fn_code = inspect.getsource(reward_fn) except TypeError: self.reward_fn_code = inspect.getsource(reward_fn.__class__) def forward(self, **kwargs): lm = self.module.get_lm() or dspy.settings.lm start = lm.kwargs.get("rollout_id", 0) rollout_ids = [start + i for i in range(self.N)] best_pred, best_trace, best_reward = None, None, -float("inf") advice = None adapter = dspy.settings.adapter or dspy.ChatAdapter() for idx, rid in enumerate(rollout_ids): lm_ = lm.copy(rollout_id=rid, temperature=1.0) mod = self.module.deepcopy() mod.set_lm(lm_) predictor2name = {predictor: name for name, predictor in mod.named_predictors()} signature2name = {predictor.signature: name for name, predictor in mod.named_predictors()} module_names = [name for name, _ in mod.named_predictors()] try: with dspy.context(trace=[]): if not advice: outputs = mod(**kwargs) else: class WrapperAdapter(adapter.__class__): def __call__(self, lm, lm_kwargs, signature, demos, inputs): inputs["hint_"] = advice.get(signature2name[signature], "N/A") # noqa: B023 signature = signature.append( "hint_", InputField(desc="A hint to the module from an earlier run") ) return adapter(lm, lm_kwargs, signature, demos, inputs) with dspy.context(adapter=WrapperAdapter()): outputs = mod(**kwargs) trace = dspy.settings.trace.copy() # TODO: Remove the hint from the trace, if it's there. # NOTE: Not including the trace of reward_fn. 
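# Score this attempt; keep the best-scoring prediction seen so far, stop early once the reward meets the threshold, and otherwise ask OfferFeedback for per-module advice to inject as a hint on the next retry.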
reward = self.reward_fn(kwargs, outputs) if reward > best_reward: best_reward, best_pred, best_trace = reward, outputs, trace if self.threshold is not None and reward >= self.threshold: break if idx == self.N - 1: break modules = {"program_code": self.module_code, "modules_defn": inspect_modules(mod)} trajectory = [{"module_name": predictor2name[p], "inputs": i, "outputs": dict(o)} for p, i, o in trace] trajectory = { "program_inputs": kwargs, "program_trajectory": trajectory, "program_outputs": dict(outputs), } reward = { "reward_code": self.reward_fn_code, "target_threshold": self.threshold, "reward_value": reward, } advise_kwargs = dict(**modules, **trajectory, **reward, module_names=module_names) # only dumps if it's a list or dict advise_kwargs = { k: v if isinstance(v, str) else orjson.dumps(recursive_mask(v), option=orjson.OPT_INDENT_2).decode() for k, v in advise_kwargs.items() } advice = dspy.Predict(OfferFeedback)(**advise_kwargs).advice # print(f"Advice for each module: {advice}") except Exception as e: print(f"Refine: Attempt failed with rollout id {rid}: {e}") if idx > self.fail_count: raise e self.fail_count -= 1 if best_trace: dspy.settings.trace.extend(best_trace) return best_pred def inspect_modules(program): separator = "-" * 80 output = [separator] for _, (name, predictor) in enumerate(program.named_predictors()): signature = predictor.signature instructions = textwrap.dedent(signature.instructions) instructions = ("\n" + "\t" * 2).join([""] + instructions.splitlines()) output.append(f"Module {name}") output.append("\n\tInput Fields:") output.append(("\n" + "\t" * 2).join([""] + get_field_description_string(signature.input_fields).splitlines())) output.append("\tOutput Fields:") output.append(("\n" + "\t" * 2).join([""] + get_field_description_string(signature.output_fields).splitlines())) output.append(f"\tOriginal Instructions: {instructions}") output.append(separator) return "\n".join([o.strip("\n") for o in output]) def recursive_mask(o): # If the object is already serializable, return it. try: orjson.dumps(o) return o except TypeError: pass # If it's a dictionary, apply recursively to its values. if isinstance(o, dict): return {k: recursive_mask(v) for k, v in o.items()} # If it's a list, apply recursively. elif isinstance(o, list): return [recursive_mask(v) for v in o] # If it's a tuple, apply recursively. elif isinstance(o, tuple): return tuple(recursive_mask(v) for v in o) # Otherwise, replace it with a placeholder string (or use repr(o)). else: return f"<non-serializable: {type(o).__name__}>" ``` -------------------------------------------------------------------------------- /dspy/clients/base_lm.py: -------------------------------------------------------------------------------- ```python import datetime import uuid from dspy.dsp.utils import settings from dspy.utils.callback import with_callbacks from dspy.utils.inspect_history import pretty_print_history MAX_HISTORY_SIZE = 10_000 GLOBAL_HISTORY = [] class BaseLM: """Base class for handling LLM calls. Most users can directly use the `dspy.LM` class, which is a subclass of `BaseLM`. Users can also implement their own subclasses of `BaseLM` to support custom LLM providers and inject custom logic. To do so, simply override the `forward` method and make sure the return format is identical to the [OpenAI response format](https://platform.openai.com/docs/api-reference/responses/object). 
Example: ```python from openai import OpenAI import dspy class MyLM(dspy.BaseLM): def forward(self, prompt, messages=None, **kwargs): client = OpenAI() return client.chat.completions.create( model=self.model, messages=messages or [{"role": "user", "content": prompt}], **self.kwargs, ) lm = MyLM(model="gpt-4o-mini") dspy.configure(lm=lm) print(dspy.Predict("q->a")(q="Why did the chicken cross the kitchen?")) ``` """ def __init__(self, model, model_type="chat", temperature=0.0, max_tokens=1000, cache=True, **kwargs): self.model = model self.model_type = model_type self.cache = cache self.kwargs = dict(temperature=temperature, max_tokens=max_tokens, **kwargs) self.history = [] def _process_lm_response(self, response, prompt, messages, **kwargs): merged_kwargs = {**self.kwargs, **kwargs} if self.model_type == "responses": outputs = self._process_response(response) else: outputs = self._process_completion(response, merged_kwargs) if settings.disable_history: return outputs # Logging, with removed api key & where `cost` is None on cache hit. kwargs = {k: v for k, v in kwargs.items() if not k.startswith("api_")} entry = { "prompt": prompt, "messages": messages, "kwargs": kwargs, "response": response, "outputs": outputs, "usage": dict(response.usage), "cost": getattr(response, "_hidden_params", {}).get("response_cost"), "timestamp": datetime.datetime.now().isoformat(), "uuid": str(uuid.uuid4()), "model": self.model, "response_model": response.model, "model_type": self.model_type, } self.update_history(entry) return outputs @with_callbacks def __call__(self, prompt=None, messages=None, **kwargs): response = self.forward(prompt=prompt, messages=messages, **kwargs) outputs = self._process_lm_response(response, prompt, messages, **kwargs) return outputs @with_callbacks async def acall(self, prompt=None, messages=None, **kwargs): response = await self.aforward(prompt=prompt, messages=messages, **kwargs) outputs = self._process_lm_response(response, prompt, messages, **kwargs) return outputs def forward(self, prompt=None, messages=None, **kwargs): """Forward pass for the language model. Subclasses must implement this method, and the response should be identical to [OpenAI response format](https://platform.openai.com/docs/api-reference/responses/object). """ raise NotImplementedError("Subclasses must implement this method.") async def aforward(self, prompt=None, messages=None, **kwargs): """Async forward pass for the language model. Subclasses that support async should implement this method, and the response should be identical to [OpenAI response format](https://platform.openai.com/docs/api-reference/responses/object). """ raise NotImplementedError("Subclasses must implement this method.") def copy(self, **kwargs): """Returns a copy of the language model with possibly updated parameters. Any provided keyword arguments update the corresponding attributes or LM kwargs of the copy. For example, ``lm.copy(rollout_id=1, temperature=1.0)`` returns an LM whose requests use a different rollout ID at non-zero temperature to bypass cache collisions. 
""" import copy new_instance = copy.deepcopy(self) new_instance.history = [] for key, value in kwargs.items(): if hasattr(self, key): setattr(new_instance, key, value) if (key in self.kwargs) or (not hasattr(self, key)): if value is None: new_instance.kwargs.pop(key, None) else: new_instance.kwargs[key] = value if hasattr(new_instance, "_warned_zero_temp_rollout"): new_instance._warned_zero_temp_rollout = False return new_instance def inspect_history(self, n: int = 1): return pretty_print_history(self.history, n) def update_history(self, entry): if settings.disable_history: return # Global LM history if len(GLOBAL_HISTORY) >= MAX_HISTORY_SIZE: GLOBAL_HISTORY.pop(0) GLOBAL_HISTORY.append(entry) if settings.max_history_size == 0: return # dspy.LM.history if len(self.history) >= settings.max_history_size: self.history.pop(0) self.history.append(entry) # Per-module history caller_modules = settings.caller_modules or [] for module in caller_modules: if len(module.history) >= settings.max_history_size: module.history.pop(0) module.history.append(entry) def _process_completion(self, response, merged_kwargs): """Process the response of OpenAI chat completion API and extract outputs. Args: response: The OpenAI chat completion response https://platform.openai.com/docs/api-reference/chat/object merged_kwargs: Merged kwargs from self.kwargs and method kwargs Returns: List of processed outputs """ outputs = [] for c in response.choices: output = {} output["text"] = c.message.content if hasattr(c, "message") else c["text"] if merged_kwargs.get("logprobs"): output["logprobs"] = c.logprobs if hasattr(c, "logprobs") else c["logprobs"] if hasattr(c, "message") and getattr(c.message, "tool_calls", None): output["tool_calls"] = c.message.tool_calls # Extract citations from LiteLLM response if available citations = self._extract_citations_from_response(c) if citations: output["citations"] = citations outputs.append(output) if all(len(output) == 1 for output in outputs): # Return a list if every output only has "text" key outputs = [output["text"] for output in outputs] return outputs def _extract_citations_from_response(self, choice): """Extract citations from LiteLLM response if available. Reference: https://docs.litellm.ai/docs/providers/anthropic#beta-citations-api Args: choice: The choice object from response.choices Returns: A list of citation dictionaries or None if no citations found """ try: # Check for citations in LiteLLM provider_specific_fields citations_data = choice.message.provider_specific_fields.get("citations") if isinstance(citations_data, list): return [citation for citations in citations_data for citation in citations] except Exception: return None def _process_response(self, response): """Process the response of OpenAI Response API and extract outputs. Args: response: OpenAI Response API response https://platform.openai.com/docs/api-reference/responses/object Returns: List of processed outputs, which is always of size 1 because the Response API only supports one output. 
""" text_outputs = [] tool_calls = [] reasoning_contents = [] for output_item in response.output: output_item_type = output_item.type if output_item_type == "message": for content_item in output_item.content: text_outputs.append(content_item.text) elif output_item_type == "function_call": tool_calls.append(output_item.model_dump()) elif output_item_type == "reasoning": if getattr(output_item, "content", None) and len(output_item.content) > 0: for content_item in output_item.content: reasoning_contents.append(content_item.text) elif getattr(output_item, "summary", None) and len(output_item.summary) > 0: for summary_item in output_item.summary: reasoning_contents.append(summary_item.text) result = {} if len(text_outputs) > 0: result["text"] = "".join(text_outputs) if len(tool_calls) > 0: result["tool_calls"] = tool_calls if len(reasoning_contents) > 0: result["reasoning_content"] = "".join(reasoning_contents) # All `response.output` items map to one answer, so we return a list of size 1. return [result] def inspect_history(n: int = 1): """The global history shared across all LMs.""" return pretty_print_history(GLOBAL_HISTORY, n) ``` -------------------------------------------------------------------------------- /dspy/primitives/python_interpreter.py: -------------------------------------------------------------------------------- ```python import json import os import subprocess from os import PathLike from types import TracebackType from typing import Any class InterpreterError(RuntimeError): pass class PythonInterpreter: r""" PythonInterpreter that runs code in a sandboxed environment using Deno and Pyodide. Prerequisites: - Deno (https://docs.deno.com/runtime/getting_started/installation/). Example Usage: ```python code_string = "print('Hello'); 1 + 2" with PythonInterpreter() as interp: output = interp(code_string) # If final statement is non-None, prints the numeric result, else prints captured output ``` """ def __init__( self, deno_command: list[str] | None = None, enable_read_paths: list[PathLike | str] | None = None, enable_write_paths: list[PathLike | str] | None = None, enable_env_vars: list[str] | None = None, enable_network_access: list[str] | None = None, sync_files: bool = True, ) -> None: """ Args: deno_command: command list to launch Deno. enable_read_paths: Files or directories to allow reading from in the sandbox. enable_write_paths: Files or directories to allow writing to in the sandbox. enable_env_vars: Environment variable names to allow in the sandbox. enable_network_access: Domains or IPs to allow network access in the sandbox. sync_files: If set, syncs changes within the sandbox back to original files after execution. 
""" if isinstance(deno_command, dict): deno_command = None # no-op, just a guard in case someone passes a dict self.enable_read_paths = enable_read_paths or [] self.enable_write_paths = enable_write_paths or [] self.enable_env_vars = enable_env_vars or [] self.enable_network_access = enable_network_access or [] self.sync_files = sync_files # TODO later on add enable_run (--allow-run) by proxying subprocess.run through Deno.run() to fix 'emscripten does not support processes' error if deno_command: self.deno_command = list(deno_command) else: args = ["deno", "run", "--allow-read"] self._env_arg = "" if self.enable_env_vars: user_vars = [str(v).strip() for v in self.enable_env_vars] args.append("--allow-env=" + ",".join(user_vars)) self._env_arg = ",".join(user_vars) if self.enable_network_access: args.append(f"--allow-net={','.join(str(x) for x in self.enable_network_access)}") if self.enable_write_paths: args.append(f"--allow-write={','.join(str(x) for x in self.enable_write_paths)}") args.append(self._get_runner_path()) # For runner.js to load in env vars if self._env_arg: args.append(self._env_arg) self.deno_command = args self.deno_process = None self._mounted_files = False def _get_runner_path(self) -> str: current_dir = os.path.dirname(os.path.abspath(__file__)) return os.path.join(current_dir, "runner.js") def _mount_files(self): if self._mounted_files: return paths_to_mount = [] if self.enable_read_paths: paths_to_mount.extend(self.enable_read_paths) if self.enable_write_paths: paths_to_mount.extend(self.enable_write_paths) if not paths_to_mount: return for path in paths_to_mount: if not path: continue if not os.path.exists(path): if self.enable_write_paths and path in self.enable_write_paths: open(path, "a").close() else: raise FileNotFoundError(f"Cannot mount non-existent file: {path}") virtual_path = f"/sandbox/{os.path.basename(path)}" mount_msg = json.dumps({"mount_file": str(path), "virtual_path": virtual_path}) self.deno_process.stdin.write(mount_msg + "\n") self.deno_process.stdin.flush() self._mounted_files = True def _sync_files(self): if not self.enable_write_paths or not self.sync_files: return for path in self.enable_write_paths: virtual_path = f"/sandbox/{os.path.basename(path)}" sync_msg = json.dumps({ "sync_file": virtual_path, "host_file": str(path) }) self.deno_process.stdin.write(sync_msg + "\n") self.deno_process.stdin.flush() def _ensure_deno_process(self) -> None: if self.deno_process is None or self.deno_process.poll() is not None: try: self.deno_process = subprocess.Popen( self.deno_command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding="UTF-8", env=os.environ.copy() ) except FileNotFoundError as e: install_instructions = ( "Deno executable not found. 
Please install Deno to proceed.\n" "Installation instructions:\n" "> curl -fsSL https://deno.land/install.sh | sh\n" "*or*, on macOS with Homebrew:\n" "> brew install deno\n" "For additional configurations: https://docs.deno.com/runtime/getting_started/installation/" ) raise InterpreterError(install_instructions) from e def _inject_variables(self, code: str, variables: dict[str, Any]) -> str: # Insert Python assignments for each variable at the top of the code injected_lines = [] for key, value in variables.items(): if not key.isidentifier(): raise InterpreterError(f"Invalid variable name: '{key}'") python_value = self._serialize_value(value) injected_lines.append(f"{key} = {python_value}") injected_code = "\n".join(injected_lines) + "\n" + code return injected_code def _serialize_value(self, value: Any) -> str: # Basic safe serialization if isinstance(value, str): return repr(value) elif isinstance(value, (int, float, bool)): return str(value) elif value is None: return "None" elif isinstance(value, list) or isinstance(value, dict): return json.dumps(value) else: raise InterpreterError(f"Unsupported value type: {type(value).__name__}") def execute( self, code: str, variables: dict[str, Any] | None = None, ) -> Any: variables = variables or {} code = self._inject_variables(code, variables) self._ensure_deno_process() self._mount_files() # Send the code as JSON input_data = json.dumps({"code": code}) try: self.deno_process.stdin.write(input_data + "\n") self.deno_process.stdin.flush() except BrokenPipeError: # If the process died, restart and try again once self._ensure_deno_process() self.deno_process.stdin.write(input_data + "\n") self.deno_process.stdin.flush() # Read one JSON line from stdout output_line = self.deno_process.stdout.readline().strip() if not output_line: # Possibly the subprocess died or gave no output err_output = self.deno_process.stderr.read() raise InterpreterError(f"No output from Deno subprocess. Stderr: {err_output}") # Parse that line as JSON try: result = json.loads(output_line) except json.JSONDecodeError: # If not valid JSON, just return raw text result = {"output": output_line} # If we have an error, determine if it's a SyntaxError or other error using error.errorType. if "error" in result: error_msg = result["error"] error_type = result.get("errorType", "Sandbox Error") if error_type == "FinalAnswer": # The `FinalAnswer` trick to receive output from the sandbox interpreter, # just simply replace the output with the arguments. result["output"] = result.get("errorArgs", None) elif error_type == "SyntaxError": raise SyntaxError(f"Invalid Python syntax. 
message: {error_msg}") else: raise InterpreterError(f"{error_type}: {result.get('errorArgs') or error_msg}") # If there's no error or got `FinalAnswer`, return the "output" field self._sync_files() return result.get("output", None) def __enter__(self): return self # All exception fields are ignored and the runtime will automatically re-raise the exception def __exit__( self, _exc_type: type[BaseException] | None, _exc_val: BaseException | None, _exc_tb: TracebackType | None, ): self.shutdown() def __call__( self, code: str, variables: dict[str, Any] | None = None, ) -> Any: return self.execute(code, variables) def shutdown(self) -> None: if self.deno_process and self.deno_process.poll() is None: shutdown_message = json.dumps({"shutdown": True}) + "\n" self.deno_process.stdin.write(shutdown_message) self.deno_process.stdin.flush() self.deno_process.stdin.close() self.deno_process.wait() self.deno_process = None ``` -------------------------------------------------------------------------------- /tests/reliability/test_pydantic_models.py: -------------------------------------------------------------------------------- ```python from enum import Enum from typing import Any, List, Literal import pydantic import pytest import dspy from tests.reliability.utils import assert_program_output_correct, known_failing_models @pytest.mark.reliability def test_qa_with_pydantic_answer_model(): class Answer(pydantic.BaseModel): value: str certainty: float = pydantic.Field( description="A value between 0 and 1 indicating the model's confidence in the answer." ) comments: list[str] = pydantic.Field( description="At least two comments providing additional details about the answer." ) class QA(dspy.Signature): question: str = dspy.InputField() answer: Answer = dspy.OutputField() program = dspy.Predict(QA) question = "What is the capital of France?" answer = program(question=question).answer assert_program_output_correct( program_input=question, program_output=answer.value, grading_guidelines="The answer should be Paris. Answer should not contain extraneous information.", ) assert_program_output_correct( program_input=question, program_output=answer.comments, grading_guidelines=( "The comments should be relevant to the answer. They don't need to restate the answer explicitly." ), ) assert answer.certainty >= 0 assert answer.certainty <= 1 assert len(answer.comments) >= 2 @pytest.mark.parametrize("module", [dspy.Predict, dspy.ChainOfThought]) @pytest.mark.reliability def test_color_classification_using_enum(module): Color = Enum("Color", ["RED", "GREEN", "BLUE"]) class Colorful(dspy.Signature): text: str = dspy.InputField() color: Color = dspy.OutputField() program = module(Colorful) # Note: The precise text, including the trailing period, is important here for ensuring that # the program is correctly extracting the color from the text; previous implementations have # produced invalid enum responses for "The sky is blue.", but they have produced valid enum # responses for "The sky is blue" (without the period). 
color = program(text="The sky is blue.").color assert color == Color.BLUE @pytest.mark.reliability def test_entity_extraction_with_multiple_primitive_outputs(): class ExtractEntityFromDescriptionOutput(pydantic.BaseModel): entity_hu: str = pydantic.Field(description="The extracted entity in Hungarian, cleaned and lowercased.") entity_en: str = pydantic.Field(description="The English translation of the extracted Hungarian entity.") is_inverted: bool = pydantic.Field( description="Boolean flag indicating if the input is connected in an inverted way." ) categories: str = pydantic.Field(description="English categories separated by '|' to which the entity belongs.") review: bool = pydantic.Field( description="Boolean flag indicating low confidence or uncertainty in the extraction." ) class ExtractEntityFromDescription(dspy.Signature): """Extract an entity from a Hungarian description, provide its English translation, categories, and an inverted flag.""" description: str = dspy.InputField(description="The input description in Hungarian.") entity: ExtractEntityFromDescriptionOutput = dspy.OutputField( description="The extracted entity and its properties." ) program = dspy.ChainOfThought(ExtractEntityFromDescription) description = "A kávé egy növényi eredetű ital, amelyet a kávébabból készítenek." extracted_entity = program(description=description).entity assert_program_output_correct( program_input=description, program_output=extracted_entity.entity_hu, grading_guidelines="The translation of the extracted entity into English should be equivalent to 'coffee'", ) assert_program_output_correct( program_input=description, program_output=extracted_entity.entity_en, grading_guidelines="The extracted entity should be equivalent to 'coffee'", ) assert_program_output_correct( program_input=description, program_output=extracted_entity.categories, grading_guidelines=( "The extracted entity should be associated with English language categories that apply to the word 'coffee'." " The categories should be separated by the character '|'." ), ) @pytest.mark.parametrize("module", [dspy.Predict, dspy.ChainOfThought]) @pytest.mark.reliability def test_tool_calling_with_literals(module): next_tool_names = [ "get_docs", "finish", "search_policy", "notify_manager", "calculate_accrual", "combine_leave", "review_seniority_rules", "fetch_calendar", "verify_compensation", "check_carryover_policy", ] class ToolCalling(dspy.Signature): """ Given the fields question, produce the fields response. You will be given question and your goal is to finish with response. To do this, you will interleave Thought, Tool Name, and Tool Args, and receive a resulting Observation. """ question: str = dspy.InputField() trajectory: str = dspy.InputField() next_thought: str = dspy.OutputField() next_tool_name: Literal[ "get_docs", "finish", "search_policy", "notify_manager", "calculate_accrual", "combine_leave", "review_seniority_rules", "fetch_calendar", "verify_compensation", "check_carryover_policy", ] = dspy.OutputField() next_tool_args: dict[str, Any] = dspy.OutputField() response_status: Literal["success", "error", "pending"] = dspy.OutputField() user_intent: Literal["informational", "transactional", "exploratory"] = dspy.OutputField() program = dspy.Predict(ToolCalling) prediction = program( question=( "Tell me more about the company's internal policy for paid time off (PTO), " "including as many details as possible. I want to know how PTO is accrued—are " "there fixed rates, and do they vary by employee seniority or length of service? 
" "Are there specific rules about carrying unused PTO into the next calendar year, " "or is it a 'use it or lose it' system? Additionally, if an employee plans to take " "extended leave for a vacation or personal reasons, what is the process for submitting " "a request, and how far in advance should they notify their manager? Is there any overlap " "or interaction between PTO and other forms of leave, such as sick leave or parental leave? " "For example, can PTO be combined with those leave types to create a longer absence, or are " "they strictly separate? I’d also like to know if there are any restrictions on when PTO can " "be used, such as during critical business periods or holidays. Finally, what is the official " "policy if an employee resigns or is terminated—are they compensated for unused PTO days, and if " "so, at what rate?" ), trajectory=( "[" "{'thought': 'Start by understanding PTO accrual rules.', 'tool_name': 'search_policy', 'tool_args': {'topic': 'PTO accrual rates'}}, " "{'thought': 'Clarify whether PTO accrual rates vary by seniority.', 'tool_name': 'review_seniority_rules', 'tool_args': {}}, " "{'thought': 'Identify carryover rules for unused PTO.', 'tool_name': 'check_carryover_policy', 'tool_args': {'year': 'current year'}}, " "{'thought': 'Determine policies on extended leave requests.', 'tool_name': 'search_policy', 'tool_args': {'topic': 'PTO leave request process'}}, " "{'thought': 'Check the notification requirements for extended PTO.', 'tool_name': 'notify_manager', 'tool_args': {'type': 'extended leave'}}, " "{'thought': 'Investigate overlap between PTO and sick leave.', 'tool_name': 'combine_leave', 'tool_args': {'types': ['PTO', 'sick leave']}}, " "{'thought': 'Explore how PTO interacts with parental leave.', 'tool_name': 'combine_leave', 'tool_args': {'types': ['PTO', 'parental leave']}}, " "{'thought': 'Fetch the company calendar to determine critical business periods.', 'tool_name': 'fetch_calendar', 'tool_args': {'year': 'current year'}}, " "{'thought': 'Verify restrictions on PTO usage during holidays.', 'tool_name': 'search_policy', 'tool_args': {'topic': 'holiday restrictions on PTO'}}, " "{'thought': 'Confirm whether unused PTO is compensated upon termination.', 'tool_name': 'verify_compensation', 'tool_args': {'scenario': 'termination'}}, " "{'thought': 'Check if PTO is compensated differently upon resignation.', 'tool_name': 'verify_compensation', 'tool_args': {'scenario': 'resignation'}}, " "{'thought': 'Review if accrual caps limit PTO earnings.', 'tool_name': 'calculate_accrual', 'tool_args': {'cap': True}}, " "{'thought': 'Investigate whether senior employees receive additional PTO benefits.', 'tool_name': 'review_seniority_rules', 'tool_args': {'seniority_level': 'high'}}, " "{'thought': 'Assess policy transparency in PTO documentation.', 'tool_name': 'search_policy', 'tool_args': {'topic': 'PTO documentation clarity'}}, " "{'thought': 'Explore how leave types can be optimized together.', 'tool_name': 'combine_leave', 'tool_args': {'types': ['PTO', 'other leave']}}, " "{'thought': 'Check historical trends in PTO policy changes.', 'tool_name': 'get_docs', 'tool_args': {'document': 'PTO history'}}" "]" ), ) assert prediction.next_tool_name in next_tool_names ``` -------------------------------------------------------------------------------- /docs/docs/tutorials/observability/index.md: -------------------------------------------------------------------------------- ```markdown # Tutorial: Debugging and Observability in DSPy This guide 
demonstrates how to debug problems and improve observability in DSPy. Modern AI programs often involve multiple components, such as language models, retrievers, and tools. DSPy allows you to build and optimize such complex AI systems in a clean and modular way. However, as systems grow more sophisticated, the ability to **understand what your system is doing** becomes critical. Without transparency, the prediction process can easily become a black box, making failures or quality issues difficult to diagnose and production maintenance challenging. By the end of this tutorial, you'll understand how to debug an issue and improve observability using [MLflow Tracing](#tracing). You'll also explore how to build a custom logging solution using callbacks. ## Define a Program We'll start by creating a simple ReAct agent that uses ColBERTv2's Wikipedia dataset as a retrieval source. You can replace this with a more sophisticated program. ```python import dspy import os os.environ["OPENAI_API_KEY"] = "{your_openai_api_key}" lm = dspy.LM("openai/gpt-4o-mini") colbert = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts") dspy.configure(lm=lm) def retrieve(query: str): """Retrieve the top 3 relevant passages from ColBERTv2""" results = colbert(query, k=3) return [x["text"] for x in results] agent = dspy.ReAct("question -> answer", tools=[retrieve], max_iters=3) ``` Now, let's ask the agent a simple question: ```python prediction = agent(question="Which baseball team does Shohei Ohtani play for in June 2025?") print(prediction.answer) ``` ``` Shohei Ohtani is expected to play for the Hokkaido Nippon-Ham Fighters in June 2025, based on the available information. ``` Oh, this is incorrect. He no longer plays for the Hokkaido Nippon-Ham Fighters; he moved to the Dodgers and won the World Series in 2024! Let's debug the program and explore potential fixes. ## Using ``inspect_history`` DSPy provides the `inspect_history()` utility, which prints out all LLM invocations made so far: ```python # Print out the last 5 LLM calls dspy.inspect_history(n=5) ``` ``` [2024-12-01T10:23:29.144257] System message: Your input fields are: 1. `question` (str) ... Response: [[ ## reasoning ## ]] The search for information regarding Shohei Ohtani's team in June 2025 did not yield any specific results. The retrieved data consistently mentioned that he plays for the Hokkaido Nippon-Ham Fighters, but there was no indication of any changes or updates regarding his team for the specified date. Given the lack of information, it is reasonable to conclude that he may still be with the Hokkaido Nippon-Ham Fighters unless there are future developments that are not captured in the current data. [[ ## answer ## ]] Shohei Ohtani is expected to play for the Hokkaido Nippon-Ham Fighters in June 2025, based on the available information. [[ ## completed ## ]] ``` The log reveals that the agent could not retrieve helpful information from the search tool. However, what exactly did the retriever return? While useful, `inspect_history` has some limitations: * In real-world systems, other components like retrievers, tools, and custom modules play significant roles, but `inspect_history` only logs LLM calls. * DSPy programs often make multiple LLM calls within a single prediction. A monolithic log history makes it hard to organize logs, especially when handling multiple questions. * Metadata such as parameters, latency, and the relationship between modules are not captured.
**Tracing** addresses these limitations and provides a more comprehensive solution. ## Tracing [MLflow](https://mlflow.org/docs/latest/llms/tracing/index.html) is an end-to-end machine learning platform that integrates seamlessly with DSPy to support best practices in LLMOps. Using MLflow's automatic tracing capability with DSPy is straightforward; **no sign-up or API key is required**. You just need to install MLflow and call `mlflow.dspy.autolog()` in your notebook or script. ```bash pip install -U "mlflow>=2.18.0" ``` After installation, spin up your server via the command below. ``` # It is highly recommended to use SQL store when using MLflow tracing mlflow server --backend-store-uri sqlite:///mydb.sqlite ``` If you don't specify a different port via the `--port` flag, your MLflow server will be hosted at port 5000. Now let's change our code snippet to enable MLflow tracing. We need to: - Tell MLflow where the server is hosted. - Call `mlflow.dspy.autolog()` so that DSPy tracing is automatically captured. The full code is below; now let's run it again! ```python import dspy import os import mlflow os.environ["OPENAI_API_KEY"] = "{your_openai_api_key}" # Tell MLflow about the server URI. mlflow.set_tracking_uri("http://127.0.0.1:5000") # Create a unique name for your experiment. mlflow.set_experiment("DSPy") # Enable automatic tracing of DSPy calls. mlflow.dspy.autolog() lm = dspy.LM("openai/gpt-4o-mini") colbert = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts") dspy.configure(lm=lm) def retrieve(query: str): """Retrieve the top 3 relevant passages from ColBERTv2""" results = colbert(query, k=3) return [x["text"] for x in results] agent = dspy.ReAct("question -> answer", tools=[retrieve], max_iters=3) print(agent(question="Which baseball team does Shohei Ohtani play for?")) ``` MLflow automatically generates a **trace** for each prediction and records it within your experiment. To explore these traces visually, open `http://127.0.0.1:5000/` in your browser, then select the experiment you just created and navigate to the Traces tab:  Click on the most recent trace to view its detailed breakdown:  Here, you can examine the input and output of every step in your workflow. For example, the screenshot above shows the `retrieve` function's input and output. By inspecting the retriever's output, you can see that it returned outdated information, which is not sufficient to determine which team Shohei Ohtani plays for in June 2025. You can also inspect other steps, e.g., the language model's input, output, and configuration. To address the issue of outdated information, you can replace the `retrieve` function with a web search tool powered by [Tavily search](https://www.tavily.com/). ```python from tavily import TavilyClient import dspy import mlflow # Tell MLflow about the server URI. mlflow.set_tracking_uri("http://127.0.0.1:5000") # Create a unique name for your experiment.
mlflow.set_experiment("DSPy") search_client = TavilyClient(api_key="<YOUR_TAVILY_API_KEY>") def web_search(query: str) -> list[str]: """Run a web search and return the content from the top 5 search results""" response = search_client.search(query) return [r["content"] for r in response["results"]] agent = dspy.ReAct("question -> answer", tools=[web_search]) prediction = agent(question="Which baseball team does Shohei Ohtani play for?") print(agent.answer) ``` ``` Los Angeles Dodgers ``` Below is a GIF demonstrating how to navigate through the MLflow UI:  For a complete guide on how to use MLflow tracing, please refer to the [MLflow Tracing Guide](https://mlflow.org/docs/3.0.0rc0/tracing). !!! info Learn more about MLflow MLflow is an end-to-end LLMOps platform that offers extensive features like experiment tracking, evaluation, and deployment. To learn more about DSPy and MLflow integration, visit [this tutorial](../deployment/index.md#deploying-with-mlflow). ## Building a Custom Logging Solution Sometimes, you may want to implement a custom logging solution. For instance, you might need to log specific events triggered by a particular module. DSPy's **callback** mechanism supports such use cases. The ``BaseCallback`` class provides several handlers for customizing logging behavior: |Handlers|Description| |:--|:--| |`on_module_start` / `on_module_end` | Triggered when a `dspy.Module` subclass is invoked. | |`on_lm_start` / `on_lm_end` | Triggered when a `dspy.LM` subclass is invoked. | |`on_adapter_format_start` / `on_adapter_format_end`| Triggered when a `dspy.Adapter` subclass formats the input prompt. | |`on_adapter_parse_start` / `on_adapter_parse_end`| Triggered when a `dspy.Adapter` subclass postprocess the output text from an LM. | |`on_tool_start` / `on_tool_end` | Triggered when a `dspy.Tool` subclass is invoked. | |`on_evaluate_start` / `on_evaluate_end` | Triggered when a `dspy.Evaluate` instance is invoked. | Here's an example of custom callback that logs the intermediate steps of a ReAct agent: ```python import dspy from dspy.utils.callback import BaseCallback # 1. Define a custom callback class that extends BaseCallback class class AgentLoggingCallback(BaseCallback): # 2. Implement on_module_end handler to run a custom logging code. def on_module_end(self, call_id, outputs, exception): step = "Reasoning" if self._is_reasoning_output(outputs) else "Acting" print(f"== {step} Step ===") for k, v in outputs.items(): print(f" {k}: {v}") print("\n") def _is_reasoning_output(self, outputs): return any(k.startswith("Thought") for k in outputs.keys()) # 3. Set the callback to DSPy setting so it will be applied to program execution dspy.configure(callbacks=[AgentLoggingCallback()]) ``` ``` == Reasoning Step === Thought_1: I need to find the current team that Shohei Ohtani plays for in Major League Baseball. Action_1: Search[Shohei Ohtani current team 2023] == Acting Step === passages: ["Shohei Ohtani ..."] ... ``` !!! info Handling Inputs and Outputs in Callbacks Be cautious when working with input or output data in callbacks. Mutating them in-place can modify the original data passed to the program, potentially leading to unexpected behavior. To avoid this, it's strongly recommended to create a copy of the data before performing any operations that may alter it. 
``` -------------------------------------------------------------------------------- /dspy/predict/predict.py: -------------------------------------------------------------------------------- ```python import logging import random from pydantic import BaseModel from dspy.adapters.chat_adapter import ChatAdapter from dspy.clients.base_lm import BaseLM from dspy.clients.lm import LM from dspy.dsp.utils.settings import settings from dspy.predict.parameter import Parameter from dspy.primitives.module import Module from dspy.primitives.prediction import Prediction from dspy.signatures.signature import Signature, ensure_signature from dspy.utils.callback import BaseCallback logger = logging.getLogger(__name__) class Predict(Module, Parameter): """Basic DSPy module that maps inputs to outputs using a language model. Args: signature: The input/output signature describing the task. callbacks: Optional list of callbacks for instrumentation. **config: Default keyword arguments forwarded to the underlying language model. These values can be overridden for a single invocation by passing a ``config`` dictionary when calling the module. For example:: predict = dspy.Predict("q -> a", rollout_id=1, temperature=1.0) predict(q="What is 1 + 52?", config={"rollout_id": 2, "temperature": 1.0}) """ def __init__(self, signature: str | type[Signature], callbacks: list[BaseCallback] | None = None, **config): super().__init__(callbacks=callbacks) self.stage = random.randbytes(8).hex() self.signature = ensure_signature(signature) self.config = config self.reset() def reset(self): self.lm = None self.traces = [] self.train = [] self.demos = [] def dump_state(self, json_mode=True): state_keys = ["traces", "train"] state = {k: getattr(self, k) for k in state_keys} state["demos"] = [] for demo in self.demos: demo = demo.copy() for field in demo: # FIXME: Saving BaseModels as strings in examples doesn't matter because you never re-access as an object demo[field] = serialize_object(demo[field]) if isinstance(demo, dict) or not json_mode: state["demos"].append(demo) else: state["demos"].append(demo.toDict()) state["signature"] = self.signature.dump_state() state["lm"] = self.lm.dump_state() if self.lm else None return state def load_state(self, state: dict) -> "Predict": """Load the saved state of a `Predict` object. Args: state: The saved state of a `Predict` object. Returns: Self to allow method chaining. """ excluded_keys = ["signature", "extended_signature", "lm"] for name, value in state.items(): # `excluded_keys` are fields that go through special handling. if name not in excluded_keys: setattr(self, name, value) self.signature = self.signature.load_state(state["signature"]) self.lm = LM(**state["lm"]) if state["lm"] else None if "extended_signature" in state: # legacy, up to and including 2.5, for CoT. raise NotImplementedError("Loading extended_signature is no longer supported in DSPy 2.6+") return self def _get_positional_args_error_message(self): input_fields = list(self.signature.input_fields.keys()) return ( "Positional arguments are not allowed when calling `dspy.Predict`, must use keyword arguments " f"that match your signature input fields: '{', '.join(input_fields)}'. For example: " f"`predict({input_fields[0]}=input_value, ...)`." 
) def __call__(self, *args, **kwargs): if args: raise ValueError(self._get_positional_args_error_message()) return super().__call__(**kwargs) async def acall(self, *args, **kwargs): if args: raise ValueError(self._get_positional_args_error_message()) return await super().acall(**kwargs) def _forward_preprocess(self, **kwargs): # Extract the three privileged keyword arguments. assert "new_signature" not in kwargs, "new_signature is no longer a valid keyword argument." signature = ensure_signature(kwargs.pop("signature", self.signature)) demos = kwargs.pop("demos", self.demos) config = {**self.config, **kwargs.pop("config", {})} # Get the right LM to use. lm = kwargs.pop("lm", self.lm) or settings.lm if lm is None: raise ValueError( "No LM is loaded. Please configure the LM using `dspy.configure(lm=dspy.LM(...))`. e.g, " "`dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))`" ) if isinstance(lm, str): # Many users mistakenly use `dspy.configure(lm="openai/gpt-4o-mini")` instead of # `dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))`, so we are providing a specific error message. raise ValueError( f"LM must be an instance of `dspy.BaseLM`, not a string. Instead of using a string like " f"'dspy.configure(lm=\"{lm}\")', please configure the LM like 'dspy.configure(lm=dspy.LM(\"{lm}\"))'" ) elif not isinstance(lm, BaseLM): raise ValueError(f"LM must be an instance of `dspy.BaseLM`, not {type(lm)}. Received `lm={lm}`.") # If temperature is unset or <=0.15, and n > 1, set temperature to 0.7 to keep randomness. temperature = config.get("temperature") or lm.kwargs.get("temperature") num_generations = config.get("n") or lm.kwargs.get("n") or lm.kwargs.get("num_generations") or 1 if (temperature is None or temperature <= 0.15) and num_generations > 1: config["temperature"] = 0.7 if "prediction" in kwargs: if ( isinstance(kwargs["prediction"], dict) and kwargs["prediction"].get("type") == "content" and "content" in kwargs["prediction"] ): # If the `prediction` is the standard predicted outputs format # (https://platform.openai.com/docs/guides/predicted-outputs), we remove it from input kwargs and add it # to the lm kwargs. config["prediction"] = kwargs.pop("prediction") if not all(k in kwargs for k in signature.input_fields): present = [k for k in signature.input_fields if k in kwargs] missing = [k for k in signature.input_fields if k not in kwargs] logger.warning( "Not all input fields were provided to module. Present: %s. 
Missing: %s.", present, missing, ) return lm, config, signature, demos, kwargs def _forward_postprocess(self, completions, signature, **kwargs): pred = Prediction.from_completions(completions, signature=signature) if kwargs.pop("_trace", True) and settings.trace is not None and settings.max_trace_size > 0: trace = settings.trace if len(trace) >= settings.max_trace_size: trace.pop(0) trace.append((self, {**kwargs}, pred)) return pred def _should_stream(self): stream_listeners = settings.stream_listeners or [] should_stream = settings.send_stream is not None if should_stream and len(stream_listeners) > 0: should_stream = any(stream_listener.predict == self for stream_listener in stream_listeners) return should_stream def forward(self, **kwargs): lm, config, signature, demos, kwargs = self._forward_preprocess(**kwargs) adapter = settings.adapter or ChatAdapter() if self._should_stream(): with settings.context(caller_predict=self): completions = adapter(lm, lm_kwargs=config, signature=signature, demos=demos, inputs=kwargs) else: with settings.context(send_stream=None): completions = adapter(lm, lm_kwargs=config, signature=signature, demos=demos, inputs=kwargs) return self._forward_postprocess(completions, signature, **kwargs) async def aforward(self, **kwargs): lm, config, signature, demos, kwargs = self._forward_preprocess(**kwargs) adapter = settings.adapter or ChatAdapter() if self._should_stream(): with settings.context(caller_predict=self): completions = await adapter.acall(lm, lm_kwargs=config, signature=signature, demos=demos, inputs=kwargs) else: with settings.context(send_stream=None): completions = await adapter.acall(lm, lm_kwargs=config, signature=signature, demos=demos, inputs=kwargs) return self._forward_postprocess(completions, signature, **kwargs) def update_config(self, **kwargs): self.config = {**self.config, **kwargs} def get_config(self): return self.config def __repr__(self): return f"{self.__class__.__name__}({self.signature})" def serialize_object(obj): """ Recursively serialize a given object into a JSON-compatible format. Supports Pydantic models, lists, dicts, and primitive types. """ if isinstance(obj, BaseModel): # Use model_dump with mode="json" to ensure all fields (including HttpUrl, datetime, etc.) # are converted to JSON-serializable types (strings) return obj.model_dump(mode="json") elif isinstance(obj, list): return [serialize_object(item) for item in obj] elif isinstance(obj, tuple): return tuple(serialize_object(item) for item in obj) elif isinstance(obj, dict): return {key: serialize_object(value) for key, value in obj.items()} else: return obj # # TODO: FIXME: Hmm, I guess expected behavior is that contexts can # affect execution. Well, we need to determine whether context dominates, __init__ demoninates, or forward dominates. # Generally, unless overwritten, we'd see n=None, temperature=None. # That will eventually mean we have to learn them. ``` -------------------------------------------------------------------------------- /dspy/adapters/baml_adapter.py: -------------------------------------------------------------------------------- ```python """ Custom adapter for improving structured outputs using the information from Pydantic models. 
Based on the format used by BAML: https://github.com/BoundaryML/baml """ import inspect import types from typing import Any, Literal, Union, get_args, get_origin from pydantic import BaseModel from dspy.adapters.json_adapter import JSONAdapter from dspy.adapters.utils import format_field_value as original_format_field_value from dspy.signatures.signature import Signature # Changing the comment symbol to Python's # rather than other languages' // seems to help COMMENT_SYMBOL = "#" def _render_type_str( annotation: Any, depth: int = 0, indent: int = 0, seen_models: set[type] | None = None, ) -> str: """Recursively renders a type annotation into a simplified string. Args: annotation: The type annotation to render depth: Current recursion depth (prevents infinite recursion) indent: Current indentation level for nested structures """ # Non-nested types if annotation is str: return "string" if annotation is int: return "int" if annotation is float: return "float" if annotation is bool: return "boolean" if inspect.isclass(annotation) and issubclass(annotation, BaseModel): return _build_simplified_schema(annotation, indent, seen_models) try: origin = get_origin(annotation) args = get_args(annotation) except Exception: return str(annotation) # Optional[T] or T | None if origin in (types.UnionType, Union): non_none_args = [arg for arg in args if arg is not type(None)] # Render the non-None part of the union type_render = " or ".join([_render_type_str(arg, depth + 1, indent) for arg in non_none_args]) # Add "or null" if None was part of the union if len(non_none_args) < len(args): return f"{type_render} or null" return type_render # Literal[T1, T2, ...] if origin is Literal: return " or ".join(f'"{arg}"' for arg in args) # list[T] if origin is list: # For Pydantic models in lists, use bracket notation inner_type = args[0] if inspect.isclass(inner_type) and issubclass(inner_type, BaseModel): # Build inner schema - the Pydantic model inside should use indent level for array contents inner_schema = _build_simplified_schema(inner_type, indent + 1, seen_models) # Format with proper bracket notation and indentation current_indent = " " * indent return f"[\n{inner_schema}\n{current_indent}]" else: return f"{_render_type_str(inner_type, depth + 1, indent)}[]" # dict[T1, T2] if origin is dict: return f"dict[{_render_type_str(args[0], depth + 1, indent)}, {_render_type_str(args[1], depth + 1, indent)}]" # fallback if hasattr(annotation, "__name__"): return annotation.__name__ return str(annotation) def _build_simplified_schema( pydantic_model: type[BaseModel], indent: int = 0, seen_models: set[type] | None = None, ) -> str: """Builds a simplified, human-readable schema from a Pydantic model. Args: pydantic_model: The Pydantic model to build schema for indent: Current indentation level seen_models: Set to track visited pydantic models (prevents infinite recursion) """ seen_models = seen_models or set() if pydantic_model in seen_models: raise ValueError("BAMLAdapter cannot handle recursive pydantic models, please use a different adapter.") # Add `pydantic_model` to `seen_models` with a placeholder value to avoid infinite recursion. 
seen_models.add(pydantic_model) lines = [] current_indent = " " * indent next_indent = " " * (indent + 1) lines.append(f"{current_indent}{{") fields = pydantic_model.model_fields if not fields: lines.append(f"{next_indent}{COMMENT_SYMBOL} No fields defined") for name, field in fields.items(): if field.description: lines.append(f"{next_indent}{COMMENT_SYMBOL} {field.description}") elif field.alias and field.alias != name: # If there's an alias but no description, show the alias as a comment lines.append(f"{next_indent}{COMMENT_SYMBOL} alias: {field.alias}") rendered_type = _render_type_str(field.annotation, indent=indent + 1, seen_models=seen_models) line = f"{next_indent}{name}: {rendered_type}," lines.append(line) lines.append(f"{current_indent}}}") return "\n".join(lines) class BAMLAdapter(JSONAdapter): """ A DSPy adapter that improves the rendering of complex/nested Pydantic models to help LMs. This adapter generates a compact, human-readable schema representation for nested Pydantic output fields, inspired by the BAML project's JSON formatter (https://github.com/BoundaryML/baml). The resulting rendered schema is more token-efficient and easier for smaller LMs to follow than a raw JSON schema. It also includes Pydantic field descriptions as comments in the schema, which provide valuable additional context for the LM to understand the expected output. Example Usage: ```python import dspy from pydantic import BaseModel, Field from typing import Literal from baml_adapter import BAMLAdapter # Import from your module # 1. Define your Pydantic models class PatientAddress(BaseModel): street: str city: str country: Literal["US", "CA"] class PatientDetails(BaseModel): name: str = Field(description="Full name of the patient.") age: int address: PatientAddress | None # 2. Define a signature using the Pydantic model as an output field class ExtractPatientInfo(dspy.Signature): '''Extract patient information from the clinical note.''' clinical_note: str = dspy.InputField() patient_info: PatientDetails = dspy.OutputField() # 3. Configure dspy to use the new adapter llm = dspy.OpenAI(model="gpt-4.1-mini") dspy.configure(lm=llm, adapter=BAMLAdapter()) # 4. Run your program extractor = dspy.Predict(ExtractPatientInfo) note = "John Doe, 45 years old, lives at 123 Main St, Anytown. Resident of the US." result = extractor(clinical_note=note) print(result.patient_info) # Expected output: # PatientDetails(name='John Doe', age=45, address=PatientAddress(street='123 Main St', city='Anytown', country='US')) ``` """ def format_field_description(self, signature: type[Signature]) -> str: """Format the field description for the system message.""" sections = [] # Add input field descriptions if signature.input_fields: sections.append("Your input fields are:") for i, (name, field) in enumerate(signature.input_fields.items(), 1): type_name = getattr(field.annotation, "__name__", str(field.annotation)) description = f": {field.description}" if field.description else ":" sections.append(f"{i}. `{name}` ({type_name}){description}") # Add output field descriptions if signature.output_fields: sections.append("Your output fields are:") for i, (name, field) in enumerate(signature.output_fields.items(), 1): type_name = getattr(field.annotation, "__name__", str(field.annotation)) description = f": {field.description}" if field.description else ":" sections.append(f"{i}. 
`{name}` ({type_name}){description}") return "\n".join(sections) def format_field_structure(self, signature: type[Signature]) -> str: """Overrides the base method to generate a simplified schema for Pydantic models.""" sections = [] # Add structural explanation sections.append( "All interactions will be structured in the following way, with the appropriate values filled in.\n" ) # Add input structure section if signature.input_fields: for name in signature.input_fields.keys(): sections.append(f"[[ ## {name} ## ]]") sections.append(f"{{{name}}}") sections.append("") # Empty line after each input # Add output structure section if signature.output_fields: for name, field in signature.output_fields.items(): field_type = field.annotation sections.append(f"[[ ## {name} ## ]]") sections.append(f"Output field `{name}` should be of type: {_render_type_str(field_type, indent=0)}\n") # Add completed section sections.append("[[ ## completed ## ]]") return "\n".join(sections) def format_user_message_content( self, signature: type[Signature], inputs: dict[str, Any], prefix: str = "", suffix: str = "", main_request: bool = False, ) -> str: """Overrides the base method to render Pydantic input instances as clean JSON.""" messages = [prefix] for key, field_info in signature.input_fields.items(): if key in inputs: value = inputs.get(key) formatted_value = "" if isinstance(value, BaseModel): # Use clean, indented JSON for Pydantic instances formatted_value = value.model_dump_json(indent=2, by_alias=True) else: # Fallback to the original dspy formatter for other types formatted_value = original_format_field_value(field_info=field_info, value=value) messages.append(f"[[ ## {key} ## ]]\n{formatted_value}") if main_request: output_requirements = self.user_message_output_requirements(signature) if output_requirements is not None: messages.append(output_requirements) messages.append(suffix) return "\n\n".join(m for m in messages if m).strip() ```