This is page 3 of 14. Use http://codebase.md/stanfordnlp/dspy?page={x} to view the full context. # Directory Structure ``` ├── .github │ ├── .internal_dspyai │ │ ├── internals │ │ │ ├── build-and-release.md │ │ │ └── release-checklist.md │ │ └── pyproject.toml │ ├── .tmp │ │ └── .generated-actions │ │ └── run-pypi-publish-in-docker-container │ │ └── action.yml │ ├── ISSUE_TEMPLATE │ │ ├── bug_report.yml │ │ └── feature_request.yml │ ├── PULL_REQUEST_TEMPLATE │ │ └── pull_request_template.md │ ├── workflow_scripts │ │ └── install_testpypi_pkg.sh │ └── workflows │ ├── build_and_release.yml │ ├── build_utils │ │ └── test_version.py │ ├── docs-push.yml │ ├── precommits_check.yml │ └── run_tests.yml ├── .gitignore ├── .pre-commit-config.yaml ├── CONTRIBUTING.md ├── docs │ ├── .gitignore │ ├── docs │ │ ├── api │ │ │ ├── adapters │ │ │ │ ├── Adapter.md │ │ │ │ ├── ChatAdapter.md │ │ │ │ ├── JSONAdapter.md │ │ │ │ └── TwoStepAdapter.md │ │ │ ├── evaluation │ │ │ │ ├── answer_exact_match.md │ │ │ │ ├── answer_passage_match.md │ │ │ │ ├── CompleteAndGrounded.md │ │ │ │ ├── Evaluate.md │ │ │ │ ├── EvaluationResult.md │ │ │ │ └── SemanticF1.md │ │ │ ├── experimental │ │ │ │ ├── Citations.md │ │ │ │ └── Document.md │ │ │ ├── index.md │ │ │ ├── models │ │ │ │ ├── Embedder.md │ │ │ │ └── LM.md │ │ │ ├── modules │ │ │ │ ├── BestOfN.md │ │ │ │ ├── ChainOfThought.md │ │ │ │ ├── CodeAct.md │ │ │ │ ├── Module.md │ │ │ │ ├── MultiChainComparison.md │ │ │ │ ├── Parallel.md │ │ │ │ ├── Predict.md │ │ │ │ ├── ProgramOfThought.md │ │ │ │ ├── ReAct.md │ │ │ │ └── Refine.md │ │ │ ├── optimizers │ │ │ │ ├── BetterTogether.md │ │ │ │ ├── BootstrapFewShot.md │ │ │ │ ├── BootstrapFewShotWithRandomSearch.md │ │ │ │ ├── BootstrapFinetune.md │ │ │ │ ├── BootstrapRS.md │ │ │ │ ├── COPRO.md │ │ │ │ ├── Ensemble.md │ │ │ │ ├── GEPA │ │ │ │ │ ├── GEPA_Advanced.md │ │ │ │ │ └── overview.md │ │ │ │ ├── InferRules.md │ │ │ │ ├── KNN.md │ │ │ │ ├── KNNFewShot.md │ │ │ │ ├── LabeledFewShot.md │ │ │ │ ├── MIPROv2.md │ │ │ │ └── SIMBA.md │ │ │ ├── primitives │ │ │ │ ├── Audio.md │ │ │ │ ├── Code.md │ │ │ │ ├── Example.md │ │ │ │ ├── History.md │ │ │ │ ├── Image.md │ │ │ │ ├── Prediction.md │ │ │ │ ├── Tool.md │ │ │ │ └── ToolCalls.md │ │ │ ├── signatures │ │ │ │ ├── InputField.md │ │ │ │ ├── OutputField.md │ │ │ │ └── Signature.md │ │ │ ├── tools │ │ │ │ ├── ColBERTv2.md │ │ │ │ ├── Embeddings.md │ │ │ │ └── PythonInterpreter.md │ │ │ └── utils │ │ │ ├── asyncify.md │ │ │ ├── configure_cache.md │ │ │ ├── disable_litellm_logging.md │ │ │ ├── disable_logging.md │ │ │ ├── enable_litellm_logging.md │ │ │ ├── enable_logging.md │ │ │ ├── inspect_history.md │ │ │ ├── load.md │ │ │ ├── StatusMessage.md │ │ │ ├── StatusMessageProvider.md │ │ │ ├── streamify.md │ │ │ └── StreamListener.md │ │ ├── cheatsheet.md │ │ ├── community │ │ │ ├── community-resources.md │ │ │ ├── how-to-contribute.md │ │ │ └── use-cases.md │ │ ├── deep-dive │ │ │ └── data-handling │ │ │ ├── built-in-datasets.md │ │ │ ├── examples.md │ │ │ ├── img │ │ │ │ └── data-loading.png │ │ │ └── loading-custom-data.md │ │ ├── faqs.md │ │ ├── index.md │ │ ├── js │ │ │ └── runllm-widget.js │ │ ├── learn │ │ │ ├── evaluation │ │ │ │ ├── data.md │ │ │ │ ├── metrics.md │ │ │ │ └── overview.md │ │ │ ├── figures │ │ │ │ ├── native_tool_call.png │ │ │ │ └── teleprompter-classes.png │ │ │ ├── index.md │ │ │ ├── optimization │ │ │ │ ├── optimizers.md │ │ │ │ └── overview.md │ │ │ └── programming │ │ │ ├── 7-assertions.md │ │ │ ├── adapters.md │ │ │ ├── language_models.md │ │ │ ├── mcp.md │ 
│ │ ├── modules.md │ │ │ ├── overview.md │ │ │ ├── signatures.md │ │ │ └── tools.md │ │ ├── production │ │ │ └── index.md │ │ ├── roadmap.md │ │ ├── static │ │ │ ├── .nojekyll │ │ │ └── img │ │ │ ├── dspy_logo.png │ │ │ ├── logo.png │ │ │ ├── mlflow-tracing-rag.png │ │ │ ├── modular.png │ │ │ ├── optimize.png │ │ │ ├── undraw_docusaurus_mountain.svg │ │ │ ├── undraw_docusaurus_react.svg │ │ │ ├── undraw_docusaurus_tree.svg │ │ │ └── universal_compatibility.png │ │ ├── stylesheets │ │ │ └── extra.css │ │ └── tutorials │ │ ├── agents │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-agent.png │ │ ├── ai_text_game │ │ │ └── index.md │ │ ├── async │ │ │ └── index.md │ │ ├── audio │ │ │ └── index.ipynb │ │ ├── build_ai_program │ │ │ └── index.md │ │ ├── cache │ │ │ └── index.md │ │ ├── classification │ │ │ └── index.md │ │ ├── classification_finetuning │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-classification.png │ │ ├── conversation_history │ │ │ └── index.md │ │ ├── core_development │ │ │ └── index.md │ │ ├── custom_module │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-custom-module.png │ │ ├── customer_service_agent │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-customer-service-agent.png │ │ ├── deployment │ │ │ ├── dspy_mlflow_ui.png │ │ │ └── index.md │ │ ├── email_extraction │ │ │ ├── index.md │ │ │ └── mlflow-tracing-email-extraction.png │ │ ├── entity_extraction │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-entity-extraction.png │ │ ├── games │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-agent.png │ │ ├── gepa_ai_program │ │ │ └── index.md │ │ ├── gepa_aime │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-aime.png │ │ │ └── mlflow-tracking-gepa-aime-optimization.png │ │ ├── gepa_facilitysupportanalyzer │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-support.png │ │ │ └── mlflow-tracking-gepa-support-optimization.png │ │ ├── gepa_papillon │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-papilon.png │ │ │ └── mlflow-tracking-gepa-papilon-optimization.png │ │ ├── image_generation_prompting │ │ │ └── index.ipynb │ │ ├── index.md │ │ ├── llms_txt_generation │ │ │ └── index.md │ │ ├── math │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-math.png │ │ ├── mcp │ │ │ └── index.md │ │ ├── mem0_react_agent │ │ │ └── index.md │ │ ├── multihop_search │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-multi-hop.png │ │ ├── observability │ │ │ ├── index.md │ │ │ ├── mlflow_trace_ui_navigation.gif │ │ │ ├── mlflow_trace_ui.png │ │ │ └── mlflow_trace_view.png │ │ ├── optimize_ai_program │ │ │ └── index.md │ │ ├── optimizer_tracking │ │ │ ├── child_run.png │ │ │ ├── experiment.png │ │ │ ├── index.md │ │ │ └── parent_run.png │ │ ├── output_refinement │ │ │ └── best-of-n-and-refine.md │ │ ├── papillon │ │ │ └── index.md │ │ ├── program_of_thought │ │ │ └── index.ipynb │ │ ├── rag │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-rag.png │ │ ├── real_world_examples │ │ │ └── index.md │ │ ├── rl_ai_program │ │ │ └── index.md │ │ ├── rl_multihop │ │ │ └── index.ipynb │ │ ├── rl_papillon │ │ │ └── index.ipynb │ │ ├── sample_code_generation │ │ │ └── index.md │ │ ├── saving │ │ │ └── index.md │ │ ├── streaming │ │ │ └── index.md │ │ ├── tool_use │ │ │ └── index.ipynb │ │ └── yahoo_finance_react │ │ └── index.md │ ├── mkdocs.yml │ ├── overrides │ │ ├── home.html │ │ ├── main.html │ │ └── partials │ │ └── tabs.html │ ├── Pipfile │ ├── Pipfile.lock │ ├── README.md │ ├── requirements.txt │ ├── scripts │ │ ├── generate_api_docs.py │ │ └── generate_api_summary.py │ └── vercel.json ├── dspy │ ├── __init__.py │ 
├── __metadata__.py │ ├── adapters │ │ ├── __init__.py │ │ ├── baml_adapter.py │ │ ├── base.py │ │ ├── chat_adapter.py │ │ ├── json_adapter.py │ │ ├── two_step_adapter.py │ │ ├── types │ │ │ ├── __init__.py │ │ │ ├── audio.py │ │ │ ├── base_type.py │ │ │ ├── citation.py │ │ │ ├── code.py │ │ │ ├── document.py │ │ │ ├── history.py │ │ │ ├── image.py │ │ │ └── tool.py │ │ ├── utils.py │ │ └── xml_adapter.py │ ├── clients │ │ ├── __init__.py │ │ ├── base_lm.py │ │ ├── cache.py │ │ ├── databricks.py │ │ ├── embedding.py │ │ ├── lm_local_arbor.py │ │ ├── lm_local.py │ │ ├── lm.py │ │ ├── openai.py │ │ ├── provider.py │ │ └── utils_finetune.py │ ├── datasets │ │ ├── __init__.py │ │ ├── alfworld │ │ │ ├── __init__.py │ │ │ ├── alfworld.py │ │ │ └── base_config.yml │ │ ├── colors.py │ │ ├── dataloader.py │ │ ├── dataset.py │ │ ├── gsm8k.py │ │ ├── hotpotqa.py │ │ └── math.py │ ├── dsp │ │ ├── __init__.py │ │ ├── colbertv2.py │ │ └── utils │ │ ├── __init__.py │ │ ├── dpr.py │ │ ├── settings.py │ │ └── utils.py │ ├── evaluate │ │ ├── __init__.py │ │ ├── auto_evaluation.py │ │ ├── evaluate.py │ │ └── metrics.py │ ├── experimental │ │ └── __init__.py │ ├── predict │ │ ├── __init__.py │ │ ├── aggregation.py │ │ ├── avatar │ │ │ ├── __init__.py │ │ │ ├── avatar.py │ │ │ ├── models.py │ │ │ └── signatures.py │ │ ├── best_of_n.py │ │ ├── chain_of_thought.py │ │ ├── code_act.py │ │ ├── knn.py │ │ ├── multi_chain_comparison.py │ │ ├── parallel.py │ │ ├── parameter.py │ │ ├── predict.py │ │ ├── program_of_thought.py │ │ ├── react.py │ │ ├── refine.py │ │ └── retry.py │ ├── primitives │ │ ├── __init__.py │ │ ├── base_module.py │ │ ├── example.py │ │ ├── module.py │ │ ├── prediction.py │ │ ├── python_interpreter.py │ │ └── runner.js │ ├── propose │ │ ├── __init__.py │ │ ├── dataset_summary_generator.py │ │ ├── grounded_proposer.py │ │ ├── propose_base.py │ │ └── utils.py │ ├── retrievers │ │ ├── __init__.py │ │ ├── databricks_rm.py │ │ ├── embeddings.py │ │ ├── retrieve.py │ │ └── weaviate_rm.py │ ├── signatures │ │ ├── __init__.py │ │ ├── field.py │ │ ├── signature.py │ │ └── utils.py │ ├── streaming │ │ ├── __init__.py │ │ ├── messages.py │ │ ├── streamify.py │ │ └── streaming_listener.py │ ├── teleprompt │ │ ├── __init__.py │ │ ├── avatar_optimizer.py │ │ ├── bettertogether.py │ │ ├── bootstrap_finetune.py │ │ ├── bootstrap_trace.py │ │ ├── bootstrap.py │ │ ├── copro_optimizer.py │ │ ├── ensemble.py │ │ ├── gepa │ │ │ ├── __init__.py │ │ │ ├── gepa_utils.py │ │ │ ├── gepa.py │ │ │ └── instruction_proposal.py │ │ ├── grpo.py │ │ ├── infer_rules.py │ │ ├── knn_fewshot.py │ │ ├── mipro_optimizer_v2.py │ │ ├── random_search.py │ │ ├── signature_opt.py │ │ ├── simba_utils.py │ │ ├── simba.py │ │ ├── teleprompt_optuna.py │ │ ├── teleprompt.py │ │ ├── utils.py │ │ └── vanilla.py │ └── utils │ ├── __init__.py │ ├── annotation.py │ ├── asyncify.py │ ├── caching.py │ ├── callback.py │ ├── dummies.py │ ├── exceptions.py │ ├── hasher.py │ ├── inspect_history.py │ ├── langchain_tool.py │ ├── logging_utils.py │ ├── mcp.py │ ├── parallelizer.py │ ├── saving.py │ ├── syncify.py │ ├── unbatchify.py │ └── usage_tracker.py ├── LICENSE ├── pyproject.toml ├── README.md ├── tests │ ├── __init__.py │ ├── adapters │ │ ├── test_adapter_utils.py │ │ ├── test_baml_adapter.py │ │ ├── test_base_type.py │ │ ├── test_chat_adapter.py │ │ ├── test_citation.py │ │ ├── test_code.py │ │ ├── test_document.py │ │ ├── test_json_adapter.py │ │ ├── test_tool.py │ │ ├── test_two_step_adapter.py │ │ └── test_xml_adapter.py │ ├── callback │ │ └── 
test_callback.py │ ├── clients │ │ ├── test_cache.py │ │ ├── test_databricks.py │ │ ├── test_embedding.py │ │ ├── test_inspect_global_history.py │ │ └── test_lm.py │ ├── conftest.py │ ├── datasets │ │ └── test_dataset.py │ ├── docs │ │ └── test_mkdocs_links.py │ ├── evaluate │ │ ├── test_evaluate.py │ │ └── test_metrics.py │ ├── examples │ │ └── test_baleen.py │ ├── metadata │ │ └── test_metadata.py │ ├── predict │ │ ├── test_aggregation.py │ │ ├── test_best_of_n.py │ │ ├── test_chain_of_thought.py │ │ ├── test_code_act.py │ │ ├── test_knn.py │ │ ├── test_multi_chain_comparison.py │ │ ├── test_parallel.py │ │ ├── test_predict.py │ │ ├── test_program_of_thought.py │ │ ├── test_react.py │ │ ├── test_refine.py │ │ └── test_retry.py │ ├── primitives │ │ ├── resources │ │ │ └── saved_program.json │ │ ├── test_base_module.py │ │ ├── test_example.py │ │ ├── test_module.py │ │ └── test_python_interpreter.py │ ├── propose │ │ └── test_grounded_proposer.py │ ├── README.md │ ├── reliability │ │ ├── __init__.py │ │ ├── complex_types │ │ │ └── generated │ │ │ ├── test_many_types_1 │ │ │ │ ├── inputs │ │ │ │ │ ├── input1.json │ │ │ │ │ └── input2.json │ │ │ │ ├── program.py │ │ │ │ └── schema.json │ │ │ ├── test_nesting_1 │ │ │ │ ├── inputs │ │ │ │ │ ├── input1.json │ │ │ │ │ └── input2.json │ │ │ │ ├── program.py │ │ │ │ └── schema.json │ │ │ └── test_nesting_2 │ │ │ ├── inputs │ │ │ │ └── input1.json │ │ │ ├── program.py │ │ │ └── schema.json │ │ ├── conftest.py │ │ ├── generate │ │ │ ├── __init__.py │ │ │ ├── __main__.py │ │ │ └── utils.py │ │ ├── input_formats │ │ │ └── generated │ │ │ └── test_markdown_1 │ │ │ ├── inputs │ │ │ │ ├── input1.json │ │ │ │ └── input2.json │ │ │ ├── program.py │ │ │ └── schema.json │ │ ├── README.md │ │ ├── reliability_conf.yaml │ │ ├── test_generated.py │ │ ├── test_pydantic_models.py │ │ └── utils.py │ ├── retrievers │ │ └── test_embeddings.py │ ├── signatures │ │ ├── test_adapter_image.py │ │ ├── test_custom_types.py │ │ └── test_signature.py │ ├── streaming │ │ └── test_streaming.py │ ├── teleprompt │ │ ├── gepa_dummy_lm_custom_component_selector_custom_instruction_proposer.json │ │ ├── gepa_dummy_lm.json │ │ ├── test_bootstrap_finetune.py │ │ ├── test_bootstrap_trace.py │ │ ├── test_bootstrap.py │ │ ├── test_copro_optimizer.py │ │ ├── test_ensemble.py │ │ ├── test_finetune.py │ │ ├── test_gepa_instruction_proposer.py │ │ ├── test_gepa.py │ │ ├── test_grpo.py │ │ ├── test_knn_fewshot.py │ │ ├── test_random_search.py │ │ ├── test_teleprompt.py │ │ └── test_utils.py │ ├── test_utils │ │ ├── __init__.py │ │ └── server │ │ ├── __init__.py │ │ ├── litellm_server_config.yaml │ │ └── litellm_server.py │ └── utils │ ├── __init__.py │ ├── resources │ │ └── mcp_server.py │ ├── test_annotation.py │ ├── test_asyncify.py │ ├── test_exceptions.py │ ├── test_langchain_tool.py │ ├── test_mcp.py │ ├── test_parallelizer.py │ ├── test_saving.py │ ├── test_settings.py │ ├── test_syncify.py │ ├── test_unbatchify.py │ └── test_usage_tracker.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /.github/.internal_dspyai/internals/build-and-release.md: -------------------------------------------------------------------------------- ```markdown # Build & Release Workflow Implementation The [build_and_release](https://github.com/stanfordnlp/dspy/blob/main/.github/workflows/build_and_release.yml) workflow automates deployments of dspy-ai to pypi. 
For a guide to triggering a release using the workflow, refer to [release checklist](release-checklist.md). ## Overview At a high level, the workflow works as follows: 1. Maintainer of the repo pushes a tag following [semver](https://semver.org/) versioning for the new release. 2. This triggers the GitHub action, which extracts the tag (the version). 3. Builds and publishes a release on [test-pypi](https://test.pypi.org/project/dspy-ai-test/) 4. Uses the test-pypi release to run build_utils/tests/intro.py with the new release as an integration test. Note intro.py is a copy of the intro notebook. 5. Assuming the test runs successfully, it pushes a release to [pypi](https://pypi.org/project/dspy-ai/). If not, the user can delete the tag, make the fixes and then push the tag again. Versioning for multiple releases to test-pypi with the same tag version is taken care of by the workflow by appending a pre-release identifier, so the user only needs to consider the version for pypi. 6. (Currently manual) the user creates a release and includes release notes, as described in docs/docs/release-checklist.md ## Implementation Details The workflow executes a series of jobs in sequence: - extract-tag - build-and-publish-test-pypi - test-intro-script - build-and-publish-pypi #### extract-tag Extracts the tag pushed to the commit. This tag is expected to be the version of the new deployment. #### build-and-publish-test-pypi Builds and publishes the package to test-pypi. 1. Determines the version that should be deployed to test-pypi. There may be an existing deployment with the version specified by the tag in the case that a deployment failed and the maintainer made some changes and pushed the same tag again (which is the intended usage). The following logic is implemented in [test_version.py](https://github.com/stanfordnlp/dspy/blob/main/build_utils/test_version.py): 1. Load the releases on test-pypi 1. Check if there is a release matching our current tag 1. If not, create a release with the current tag 1. If it exists, load the latest published version (this will either be the version with the tag itself, or the tag + a pre-release version). In either case, increment the pre-release version. 1. Updates the version placeholder in [setup.py](https://github.com/stanfordnlp/dspy/blob/main/setup.py) to the version obtained in step 1. 1. Updates the version placeholder in [pyproject.toml](https://github.com/stanfordnlp/dspy/blob/main/pyproject.toml) to the version obtained in step 1. 1. Updates the package name placeholder in [setup.py](https://github.com/stanfordnlp/dspy/blob/main/setup.py) to `dspy-ai-test`* 1. Updates the package name placeholder in [pyproject.toml](https://github.com/stanfordnlp/dspy/blob/main/pyproject.toml) to `dspy-ai-test`* 1. Builds the binary wheel 1. Publishes the package to test-pypi. #### test-intro-script Runs the pytest containing the intro script as an integration test using the package published to test-pypi. This is a validation step before publishing to pypi. 1. Uses a loop to install the version just published to test-pypi, as sometimes there is a race condition between the package becoming available for installation and this job executing. 2. Runs the test to ensure the package is working as expected. 3. If this fails, the workflow fails and the maintainer needs to make a fix and delete and then recreate the tag. #### build-and-publish-pypi Builds and publishes the package to pypi. 1.
Updates the version placeholder in [setup.py](https://github.com/stanfordnlp/dspy/blob/main/setup.py) to the version obtained in step 1. 1. Updates the version placeholder in [pyproject.toml](https://github.com/stanfordnlp/dspy/blob/main/pyproject.toml) to the version obtained in step 1. 1. Updates the package name placeholder in [setup.py](https://github.com/stanfordnlp/dspy/blob/main/setup.py) to `dspy-ai`* 1. Updates the package name placeholder in [pyproject.toml](https://github.com/stanfordnlp/dspy/blob/main/pyproject.toml) to `dspy-ai`* 1. Builds the binary wheel 1. Publishes the package to pypi. \* The package name is updated by the workflow to allow the same files to be used to build both the pypi and test-pypi packages. ``` -------------------------------------------------------------------------------- /docs/docs/community/community-resources.md: -------------------------------------------------------------------------------- ```markdown # Resources This is the list of tutorials and blog posts on DSPy. If you would like to add your own tutorial, please make a PR. ## A Few Blogs & Videos on using DSPy ### Blogs | **Name** | **Link** | |---|---| | **Why I bet on DSPy** | [Blog](https://blog.isaacbmiller.com/posts/dspy) | | **Not Your Average Prompt Engineering** | [Blog](https://jina.ai/news/dspy-not-your-average-prompt-engineering/) | | **Why I'm excited about DSPy** | [Blog](https://substack.stephen.so/p/why-im-excited-about-dspy) | | **Achieving GPT-4 Performance at Lower Cost** | [Link](https://gradient.ai/blog/achieving-gpt-4-level-performance-at-lower-cost-using-dspy) | | **Prompt engineering is a task best left to AI models** | [Link](https://www.theregister.com/2024/02/22/prompt_engineering_ai_models/) | | **What makes DSPy a valuable framework for developing complex language model pipelines?** | [Link](https://medium.com/@sujathamudadla1213/what-makes-dspy-a-valuable-framework-for-developing-complex-language-model-pipelines-edfa5b4bcf9b) | | **DSPy: A new framework to program your foundation models just by prompting** | [Link](https://www.linkedin.com/pulse/dspy-new-framework-program-your-foundation-models-just-prompting-lli4c/) | | **Intro to DSPy: Goodbye Prompting, Hello Programming** | [Link](https://medium.com/towards-data-science/intro-to-dspy-goodbye-prompting-hello-programming-4ca1c6ce3eb9) | | **DSPyGen: Revolutionizing AI** | [Link](https://www.linkedin.com/pulse/launch-alert-dspygen-20242252-revolutionizing-ai-sean-chatman--g9f1c/) | | **Building an AI Assistant with DSPy** | [Link](https://www.linkedin.com/pulse/building-ai-assistant-dspy-valliappa-lakshmanan-vgnsc/) | | **Building Self-improving Agents in Production with DSPy** | [Link](https://relevanceai.com/blog/building-self-improving-agentic-systems-in-production-with-dspy) | ### Videos | **Name** | **Link** | |---|---| | **DSPy Explained! 
(60K views)** | [Link](https://www.youtube.com/watch?v=41EfOY0Ldkc) | | **DSPy Intro from Sephora (25K views)** | [Link](https://www.youtube.com/watch?v=D2HurSldDkE) | | **Structured Outputs with DSPy** | [Link](https://www.youtube.com/watch?v=tVw3CwrN5-8) | | **DSPy and ColBERT - Weaviate Podcast** | [Link](https://www.youtube.com/watch?v=CDung1LnLbY) | | **SBTB23 DSPy** | [Link](https://www.youtube.com/watch?v=Dt3H2ninoeY) | | **Optimization with DSPy and LangChain** | [Link](https://www.youtube.com/watch?v=4EXOmWeqXRc) | | **Automated Prompt Engineering + Visualization** | [Link](https://www.youtube.com/watch?v=eAZ2LtJ6D5k) | | **Transforming LM Calls into Pipelines** | [Link](https://www.youtube.com/watch?v=NoaDWKHdkHg) | | **NeurIPS Hacker Cup: DSPy for Code Gen** | [Link](https://www.youtube.com/watch?v=gpe-rtJN8z8) | | **MIPRO and DSPy - Weaviate Podcast** | [Link](https://www.youtube.com/watch?v=skMH3DOV_UQ) | | **Getting Started with RAG in DSPy** | [Link](https://www.youtube.com/watch?v=CEuUG4Umfxs) | | **Adding Depth to DSPy Programs** | [Link](https://www.youtube.com/watch?v=0c7Ksd6BG88) | | **Programming Foundation Models with DSPy** | [Link](https://www.youtube.com/watch?v=Y94tw4eDHW0) | | **DSPy End-to-End: SF Meetup** | [Link](https://www.youtube.com/watch?v=Y81DoFmt-2U) | | **Monitoring & Tracing DSPy with Langtrace** | [Link](https://langtrace.ai/blog/announcing-dspy-support-in-langtrace) | | **Teaching chat models to solve chess puzzles using DSPy + Finetuning** | [Link](https://raw.sh/posts/chess_puzzles) | | **Build Self-Improving AI Agents with DSPy (No Code)** | [Link](https://www.youtube.com/watch?v=UY8OsMlV21Y) | | **DSPy 3.0 and DSPy at Databricks** | [Link](https://www.youtube.com/watch?v=grIuzesOwwU) | | **Context Engineering with DSPy** | [Link](https://www.youtube.com/watch?v=1I9PoXzvWcs) | ### Slides | **Name** | **Link** | |---|---| | **Context Engineering with DSPy** | [Link](https://docs.google.com/presentation/d/1ydssF387l1LsJ14z41_HUqsJwU77tKZJNGnAWPsw-1I/edit?usp=sharing) | ### Podcasts Weaviate has a directory of 10 amazing notebooks and 6 podcasts! Huge shoutout to them for the massive support ❤️. See the [Weaviate DSPy directory](https://weaviate.io/developers/weaviate/more-resources/dspy). This list represents a curated selection of DSPy resources. We continuously add new content as it becomes available in the community. Credit: Some of these resources were originally compiled in the [Awesome DSPy](https://github.com/ganarajpr/awesome-dspy/tree/master) repo. ``` -------------------------------------------------------------------------------- /dspy/datasets/alfworld/alfworld.py: -------------------------------------------------------------------------------- ```python import os import queue import random def env_worker(inq, outq): """ Worker process: creates a single AlfredTWEnv instance, handles 'init' (with task idx) and 'step' (with action). """ try: import io from contextlib import redirect_stderr, redirect_stdout import alfworld.agents.environment as environment import yaml except ImportError: raise ImportError( "alfworld is not installed. " "Please install it via `pip install alfworld==0.3.5` then run `alfworld-download`." 
) buf = io.StringIO() base_dir = os.path.dirname(os.path.abspath(__file__)) config_path = os.path.join(base_dir, "base_config.yml") with open(config_path) as f: config = yaml.safe_load(f) with redirect_stdout(buf), redirect_stderr(buf): base_env = environment.AlfredTWEnv(config, train_eval="train") env = None while True: cmd, data = inq.get() if cmd == "init": env = base_env.init_env(batch_size=1) env.skip(data) task_def, info = env.reset() outq.put((task_def[0], info)) elif cmd == "step": obs, rew, done, info = env.step([data]) outq.put((obs, rew, done, info)) elif cmd == "close": outq.put("CLOSED") break else: outq.put("UNKNOWN_CMD") class EnvPool: """ Pool of processes, each with a unique env_worker. Acquire a worker using a context manager for safe usage: with pool.session() as sess: sess.init(5) # init with idx=5 obs, rew, done, info = sess.step("go north") ... """ def __init__(self, size=2): self.size = size self.workers = [] self.available = queue.Queue() try: import multiprocess as mp except ImportError: raise ImportError("multiprocess is not installed. " "Please install it via `pip install multiprocess`.") # Must call set_start_method('spawn') here, before creating any processes try: mp.set_start_method("spawn", force=True) except RuntimeError: # If it's already set, ignore pass ctx = mp.get_context("spawn") for i in range(size): inq = ctx.Queue() outq = ctx.Queue() p = ctx.Process(target=env_worker, args=(inq, outq), daemon=True) p.start() self.workers.append((inq, outq, p)) self.available.put(i) def _acquire(self): wid = self.available.get() return wid, self.workers[wid][0], self.workers[wid][1] def _release(self, wid): self.available.put(wid) def close_all(self): """Close all processes in the pool.""" while not self.available.empty(): wid = self.available.get() inq, outq, proc = self.workers[wid] inq.put(("close", None)) outq.get() # Wait 'CLOSED' inq.close() outq.close() proc.join() def session(self): """Context manager that acquires/releases a single worker.""" return _EnvSession(self) class _EnvSession: """ A context manager that acquires a worker from the pool, provides .init(idx) and .step(action), then releases the worker. 
""" def __init__(self, pool: EnvPool): self.pool = pool self.wid = None self.inq = None self.outq = None def __enter__(self): self.wid, self.inq, self.outq = self.pool._acquire() return self def __exit__(self, exc_type, exc_val, exc_tb): self.pool._release(self.wid) def init(self, idx): self.inq.put(("init", idx)) return self.outq.get() # (task_def, info) def step(self, action): self.inq.put(("step", action)) return self.outq.get() # (obs, rew, done, info) class AlfWorld: def __init__(self, max_threads=20): self.POOL = EnvPool(size=max_threads) import dspy dataset = [dspy.Example(idx=idx).with_inputs("idx") for idx in range(3500)] random.Random(0).shuffle(dataset) trainset, devset = dataset[:3000], dataset[-500:] assert len(trainset) + len(devset) <= len(dataset) self.trainset = trainset self.devset = devset def __del__(self): self.POOL.close_all() ``` -------------------------------------------------------------------------------- /tests/retrievers/test_embeddings.py: -------------------------------------------------------------------------------- ```python import os import tempfile from concurrent.futures import ThreadPoolExecutor import numpy as np import pytest from dspy.retrievers.embeddings import Embeddings def dummy_corpus(): return [ "The cat sat on the mat.", "The dog barked at the mailman.", "Birds fly in the sky.", ] def dummy_embedder(texts): embeddings = [] for text in texts: if "cat" in text: embeddings.append(np.array([1, 0, 0], dtype=np.float32)) elif "dog" in text: embeddings.append(np.array([0, 1, 0], dtype=np.float32)) else: embeddings.append(np.array([0, 0, 1], dtype=np.float32)) return np.stack(embeddings) def test_embeddings_basic_search(): corpus = dummy_corpus() embedder = dummy_embedder retriever = Embeddings(corpus=corpus, embedder=embedder, k=1) query = "I saw a dog running." result = retriever(query) assert hasattr(result, "passages") assert hasattr(result, "indices") assert isinstance(result.passages, list) assert isinstance(result.indices, list) assert len(result.passages) == 1 assert len(result.indices) == 1 assert result.passages[0] == "The dog barked at the mailman." def test_embeddings_multithreaded_search(): corpus = dummy_corpus() embedder = dummy_embedder retriever = Embeddings(corpus=corpus, embedder=embedder, k=1) queries = [ ("A cat is sitting on the mat.", "The cat sat on the mat."), ("My dog is awesome!", "The dog barked at the mailman."), ("Birds flying high.", "Birds fly in the sky."), ] * 10 def worker(query_text, expected_passage): result = retriever(query_text) assert result.passages[0] == expected_passage return result.passages[0] with ThreadPoolExecutor(max_workers=10) as executor: futures = [executor.submit(worker, q, expected) for q, expected in queries] # Results will be in original order results = [f.result() for f in futures] assert results[0] == "The cat sat on the mat." assert results[1] == "The dog barked at the mailman." assert results[2] == "Birds fly in the sky." 
def test_embeddings_save_load(): corpus = dummy_corpus() embedder = dummy_embedder original_retriever = Embeddings(corpus=corpus, embedder=embedder, k=2, normalize=False, brute_force_threshold=1000) with tempfile.TemporaryDirectory() as temp_dir: save_path = os.path.join(temp_dir, "test_embeddings") # Save original original_retriever.save(save_path) # Verify files were created assert os.path.exists(os.path.join(save_path, "config.json")) assert os.path.exists(os.path.join(save_path, "corpus_embeddings.npy")) assert not os.path.exists(os.path.join(save_path, "faiss_index.bin")) # No FAISS for small corpus # Load into new instance new_retriever = Embeddings(corpus=["dummy"], embedder=embedder, k=1, normalize=True, brute_force_threshold=500) new_retriever.load(save_path, embedder) # Verify configuration was loaded correctly assert new_retriever.corpus == corpus assert new_retriever.k == 2 assert new_retriever.normalize is False assert new_retriever.embedder == embedder assert new_retriever.index is None # Verify search results are preserved query = "cat sitting" original_result = original_retriever(query) loaded_result = new_retriever(query) assert loaded_result.passages == original_result.passages assert loaded_result.indices == original_result.indices def test_embeddings_from_saved(): corpus = dummy_corpus() embedder = dummy_embedder original_retriever = Embeddings(corpus=corpus, embedder=embedder, k=3, normalize=True, brute_force_threshold=1000) with tempfile.TemporaryDirectory() as temp_dir: save_path = os.path.join(temp_dir, "test_embeddings") original_retriever.save(save_path) loaded_retriever = Embeddings.from_saved(save_path, embedder) assert loaded_retriever.k == original_retriever.k assert loaded_retriever.normalize == original_retriever.normalize assert loaded_retriever.corpus == original_retriever.corpus def test_embeddings_load_nonexistent_path(): with pytest.raises((FileNotFoundError, OSError)): Embeddings.from_saved("/nonexistent/path", dummy_embedder) ``` -------------------------------------------------------------------------------- /docs/docs/tutorials/output_refinement/best-of-n-and-refine.md: -------------------------------------------------------------------------------- ```markdown # Output Refinement: BestOfN and Refine Both `BestOfN` and `Refine` are DSPy modules designed to improve the reliability and quality of predictions by making multiple `LM` calls with different rollout IDs to bypass caching. Both modules stop when they have reached `N` attempts or when the `reward_fn` returns a reward above the `threshold`. ## BestOfN `BestOfN` is a module that runs the provided module multiple times (up to `N`) with different rollout IDs. It returns either the first prediction that passes a specified threshold or the one with the highest reward if none meets the threshold. ### Basic Usage Let's say we wanted to have the best chance of getting a one-word answer from the model. We could use `BestOfN` to try multiple rollout IDs and return the best result. ```python import dspy def one_word_answer(args, pred: dspy.Prediction) -> float: return 1.0 if len(pred.answer.split()) == 1 else 0.0 best_of_3 = dspy.BestOfN( module=dspy.ChainOfThought("question -> answer"), N=3, reward_fn=one_word_answer, threshold=1.0 ) result = best_of_3(question="What is the capital of Belgium?") print(result.answer) # Brussels ``` ### Error Handling By default, if the module encounters an error during an attempt, it will continue trying until it reaches `N` attempts.
You can adjust this behavior with the `fail_count` parameter: ```python best_of_3 = dspy.BestOfN( module=qa, N=3, reward_fn=one_word_answer, threshold=1.0, fail_count=1 ) best_of_3(question="What is the capital of Belgium?") # raises an error after the first failure ``` ## Refine `Refine` extends the functionality of `BestOfN` by adding an automatic feedback loop. After each unsuccessful attempt (except the final one), it automatically generates detailed feedback about the module's performance and uses this feedback as hints for subsequent runs. ### Basic Usage ```python import dspy def one_word_answer(args, pred: dspy.Prediction) -> float: return 1.0 if len(pred.answer.split()) == 1 else 0.0 refine = dspy.Refine( module=dspy.ChainOfThought("question -> answer"), N=3, reward_fn=one_word_answer, threshold=1.0 ) result = refine(question="What is the capital of Belgium?") print(result.answer) # Brussels ``` ### Error Handling Like `BestOfN`, `Refine` will try up to `N` times by default, even if errors occur. You can control this with the `fail_count` parameter: ```python # Stop after the first error refine = dspy.Refine( module=qa, N=3, reward_fn=one_word_answer, threshold=1.0, fail_count=1 ) ``` ## Comparison: BestOfN vs. Refine Both modules serve similar purposes but differ in their approach: - `BestOfN` simply tries different rollout IDs and selects the best resulting prediction as defined by the `reward_fn`. - `Refine` adds a feedback loop, using the LM to generate detailed feedback about the module's own performance based on the previous prediction and the code in the `reward_fn`. This feedback is then used as hints for subsequent runs. ## Practical Examples ### Ensuring Factual Correctness ```python import dspy class FactualityJudge(dspy.Signature): """Determine if a statement is factually accurate.""" statement: str = dspy.InputField() is_factual: bool = dspy.OutputField() factuality_judge = dspy.ChainOfThought(FactualityJudge) def factuality_reward(args, pred: dspy.Prediction) -> float: statement = pred.answer result = factuality_judge(statement=statement) return 1.0 if result.is_factual else 0.0 refined_qa = dspy.Refine( module=dspy.ChainOfThought("question -> answer"), N=3, reward_fn=factuality_reward, threshold=1.0 ) result = refined_qa(question="Tell me about Belgium's capital city.") print(result.answer) ``` ### Summarization - Controlling Response Length ```python import dspy def ideal_length_reward(args, pred: dspy.Prediction) -> float: """ Reward the summary for being close to 75 words with a tapering off for longer summaries. """ word_count = len(pred.summary.split()) distance = abs(word_count - 75) return max(0.0, 1.0 - (distance / 125)) optimized_summarizer = dspy.BestOfN( module=dspy.ChainOfThought("text -> summary"), N=50, reward_fn=ideal_length_reward, threshold=0.9 ) result = optimized_summarizer( text="[Long text to summarize...]" ) print(result.summary) ``` ## Migration from `dspy.Suggest` and `dspy.Assert` `BestOfN` and `Refine` are the replacements for `dspy.Suggest` and `dspy.Assert` as of DSPy 2.6. ``` -------------------------------------------------------------------------------- /tests/predict/test_program_of_thought.py: -------------------------------------------------------------------------------- ```python import shutil from unittest.mock import patch import pytest import dspy from dspy import ProgramOfThought, Signature from dspy.utils import DummyLM # This test suite requires deno to be installed.
Please install deno following https://docs.deno.com/runtime/getting_started/installation/ is_deno_available = shutil.which("deno") is not None class BasicQA(Signature): question = dspy.InputField() answer = dspy.OutputField(desc="often between 1 and 5 words") @pytest.mark.skipif(not is_deno_available, reason="Deno is not installed or not in PATH") def test_pot_code_generation(): lm = DummyLM( [ { "reasoning": "Reason_A", "generated_code": "```python\nresult = 1+1\nfinal_answer({'answer': result})\n```", }, {"reasoning": "Reason_B", "answer": "2"}, ] ) dspy.settings.configure(lm=lm) pot = ProgramOfThought(BasicQA) res = pot(question="What is 1+1?") assert res.answer == "2" assert pot.interpreter.deno_process is None # This test ensures the old finetuned saved models still work @pytest.mark.skipif(not is_deno_available, reason="Deno is not installed or not in PATH") def test_old_style_pot(): lm = DummyLM( [ {"reasoning": "Reason_A", "generated_code": "```python\nresult = 1+1\n```"}, {"reasoning": "Reason_B", "answer": "2"}, ] ) dspy.settings.configure(lm=lm) pot = ProgramOfThought(BasicQA) res = pot(question="What is 1+1?") assert res.answer == "2" assert pot.interpreter.deno_process is None class ExtremumFinder(Signature): input_list = dspy.InputField() maximum = dspy.OutputField(desc="The maximum of the given numbers") minimum = dspy.OutputField(desc="The minimum of the given numbers") @pytest.mark.skipif(not is_deno_available, reason="Deno is not installed or not in PATH") def test_pot_support_multiple_fields(): lm = DummyLM( [ { "reasoning": "Reason_A", "generated_code": "```python\nmaximum = 6\nminimum = 2\nfinal_answer({'maximum': maximum, 'minimum': minimum})\n```", }, {"reasoning": "Reason_B", "maximum": "6", "minimum": "2"}, ] ) dspy.settings.configure(lm=lm) pot = ProgramOfThought(ExtremumFinder) res = pot(input_list="2, 3, 5, 6") assert res.maximum == "6" assert res.minimum == "2" assert pot.interpreter.deno_process is None @pytest.mark.skipif(not is_deno_available, reason="Deno is not installed or not in PATH") def test_pot_code_generation_with_one_error(): lm = DummyLM( [ { "reasoning": "Reason_A", "generated_code": "```python\nresult = 1+0/0\nfinal_answer({'answer': result})\n```", }, { "reasoning": "Reason_B", "generated_code": "```python\nresult = 1+1\nfinal_answer({'answer': result})\n```", }, {"reasoning": "Reason_C", "answer": "2"}, ] ) dspy.settings.configure(lm=lm) pot = ProgramOfThought(BasicQA) res = pot(question="What is 1+1?") assert res.answer == "2" assert pot.interpreter.deno_process is None @pytest.mark.skipif(not is_deno_available, reason="Deno is not installed or not in PATH") def test_pot_code_generation_persistent_errors(): max_iters = 3 lm = DummyLM( [ { "reasoning": "Reason_A", "generated_code": "```python\nresult = 1+0/0\nfinal_answer({'answer': result})\n```", }, ] * max_iters ) dspy.settings.configure(lm=lm) pot = ProgramOfThought(BasicQA, max_iters=max_iters) with pytest.raises(RuntimeError, match="Max hops reached. Failed to run ProgramOfThought: ZeroDivisionError:"): pot(question="What is 1+1?") assert pot.interpreter.deno_process is None def test_pot_code_parse_error(): max_iters = 3 lm = DummyLM( [ {"reasoning": "Reason_A", "generated_code": "```python\ninvalid=python=code\n```"}, ] * max_iters ) dspy.settings.configure(lm=lm) pot = ProgramOfThought(BasicQA, max_iters=max_iters) with ( patch("dspy.predict.program_of_thought.ProgramOfThought._execute_code") as mock_execute_code, pytest.raises( RuntimeError, match="Max hops reached. 
Failed to run ProgramOfThought: Error: Code format is not correct." ), ): pot(question="What is 1+1?") mock_execute_code.assert_not_called() ``` -------------------------------------------------------------------------------- /tests/examples/test_baleen.py: -------------------------------------------------------------------------------- ```python import dspy import dspy.evaluate from dspy.datasets import HotPotQA from dspy.dsp.utils import deduplicate from dspy.evaluate.evaluate import Evaluate from dspy.teleprompt.bootstrap import BootstrapFewShot class GenerateAnswer(dspy.Signature): """Answer questions with short factoid answers.""" context = dspy.InputField(desc="may contain relevant facts") question = dspy.InputField() answer = dspy.OutputField(desc="often between 1 and 5 words") class GenerateSearchQuery(dspy.Signature): """Write a simple search query that will help answer a complex question.""" context = dspy.InputField(desc="may contain relevant facts") question = dspy.InputField() query = dspy.OutputField() class SimplifiedBaleen(dspy.Module): def __init__(self, passages_per_hop=3, max_hops=2): super().__init__() self.generate_query = [dspy.ChainOfThought(GenerateSearchQuery) for _ in range(max_hops)] self.retrieve = dspy.Retrieve(k=passages_per_hop) self.generate_answer = dspy.ChainOfThought(GenerateAnswer) self.max_hops = max_hops def forward(self, question): context = [] for hop in range(self.max_hops): query = self.generate_query[hop](context=context, question=question).query passages = self.retrieve(query).passages context = deduplicate(context + passages) pred = self.generate_answer(context=context, question=question) return dspy.Prediction(context=context, answer=pred.answer) def load_hotpotqa(): # Load the dataset. dataset = HotPotQA(train_seed=1, train_size=20, eval_seed=2023, dev_size=50, test_size=0) # Tell DSPy that the 'question' field is the input. Any other fields are labels and/or metadata. trainset = [x.with_inputs("question") for x in dataset.train] devset = [x.with_inputs("question") for x in dataset.dev] return trainset, devset # @pytest.mark.slow_test # TODO: Find a way to make this test run without openai def _test_baleen(): lm = dspy.OpenAI(model="gpt-3.5-turbo") rm = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts") dspy.settings.configure(lm=lm, rm=rm) # Ask any question you like to this simple RAG program. my_question = "How many storeys are in the castle that David Gregory inherited?" # Get the prediction. This contains `pred.context` and `pred.answer`. 
uncompiled_baleen = SimplifiedBaleen() # uncompiled (i.e., zero-shot) program pred = uncompiled_baleen(my_question) assert pred.answer == "five" def validate_context_and_answer_and_hops(example, pred, trace=None): if not dspy.evaluate.answer_exact_match(example, pred): return False if not dspy.evaluate.answer_passage_match(example, pred): return False hops = [example.question] + [outputs.query for *_, outputs in trace if "query" in outputs] if max([len(h) for h in hops]) > 100: return False if any(dspy.evaluate.answer_exact_match_str(hops[idx], hops[:idx], frac=0.8) for idx in range(2, len(hops))): return False return True def gold_passages_retrieved(example, pred, trace=None): gold_titles = set(map(dspy.evaluate.normalize_text, example["gold_titles"])) found_titles = set(map(dspy.evaluate.normalize_text, [c.split(" | ")[0] for c in pred.context])) return gold_titles.issubset(found_titles) # @pytest.mark.slow_test # TODO: Find a way to make this test run without the slow hotpotqa dataset def _test_compiled_baleen(): trainset, devset = load_hotpotqa() lm = dspy.OpenAI(model="gpt-3.5-turbo") rm = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts") dspy.settings.configure(lm=lm, rm=rm) uncompiled_baleen = SimplifiedBaleen() # uncompiled (i.e., zero-shot) program teleprompter = BootstrapFewShot(metric=validate_context_and_answer_and_hops) compiled_baleen = teleprompter.compile( SimplifiedBaleen(), teacher=SimplifiedBaleen(passages_per_hop=2), trainset=trainset, ) evaluate_on_hotpotqa = Evaluate(devset=devset, num_threads=1, display_progress=True, display_table=5) uncompiled_baleen_retrieval_score = evaluate_on_hotpotqa( uncompiled_baleen, metric=gold_passages_retrieved, display=False ) # assert uncompiled_baleen_retrieval_score / 100 == 18 / 50 compiled_baleen_retrieval_score = evaluate_on_hotpotqa(compiled_baleen, metric=gold_passages_retrieved) # assert compiled_baleen_retrieval_score / 100 == 27 / 50 assert uncompiled_baleen_retrieval_score < compiled_baleen_retrieval_score ``` -------------------------------------------------------------------------------- /tests/utils/test_saving.py: -------------------------------------------------------------------------------- ```python import logging from unittest.mock import patch import pytest import dspy from dspy.utils import DummyLM def test_save_predict(tmp_path): predict = dspy.Predict("question->answer") predict.save(tmp_path, save_program=True) assert (tmp_path / "metadata.json").exists() assert (tmp_path / "program.pkl").exists() loaded_predict = dspy.load(tmp_path) assert isinstance(loaded_predict, dspy.Predict) assert predict.signature == loaded_predict.signature def test_save_custom_model(tmp_path): class CustomModel(dspy.Module): def __init__(self): self.cot1 = dspy.ChainOfThought("question->refined_question") self.cot2 = dspy.ChainOfThought("refined_question->answer") model = CustomModel() model.save(tmp_path, save_program=True) loaded_model = dspy.load(tmp_path) assert isinstance(loaded_model, CustomModel) assert len(model.predictors()) == len(loaded_model.predictors()) for predictor, loaded_predictor in zip(model.predictors(), loaded_model.predictors(), strict=False): assert predictor.signature == loaded_predictor.signature def test_save_model_with_custom_signature(tmp_path): import datetime class MySignature(dspy.Signature): """Just a custom signature.""" current_date: datetime.date = dspy.InputField() target_date: datetime.date = dspy.InputField() date_diff: int = dspy.OutputField(desc="The difference in days between the 
current_date and the target_date") predict = dspy.Predict(MySignature) predict.signature = predict.signature.with_instructions("You are a helpful assistant.") predict.save(tmp_path, save_program=True) loaded_predict = dspy.load(tmp_path) assert isinstance(loaded_predict, dspy.Predict) assert predict.signature == loaded_predict.signature @pytest.mark.extra def test_save_compiled_model(tmp_path): predict = dspy.Predict("question->answer") dspy.settings.configure(lm=DummyLM([{"answer": "blue"}, {"answer": "white"}] * 10)) trainset = [ {"question": "What is the color of the sky?", "answer": "blue"}, {"question": "What is the color of the ocean?", "answer": "blue"}, {"question": "What is the color of the milk?", "answer": "white"}, {"question": "What is the color of the coffee?", "answer": "black"}, ] trainset = [dspy.Example(**example).with_inputs("question") for example in trainset] def dummy_metric(example, pred, trace=None): return True optimizer = dspy.BootstrapFewShot(max_bootstrapped_demos=4, max_labeled_demos=4, max_rounds=5, metric=dummy_metric) compiled_predict = optimizer.compile(predict, trainset=trainset) compiled_predict.save(tmp_path, save_program=True) loaded_predict = dspy.load(tmp_path) assert compiled_predict.demos == loaded_predict.demos assert compiled_predict.signature == loaded_predict.signature def test_load_with_version_mismatch(tmp_path): from dspy.utils.saving import logger # Mock versions during save save_versions = {"python": "3.9", "dspy": "2.4.0", "cloudpickle": "2.0"} # Mock versions during load load_versions = {"python": "3.10", "dspy": "2.5.0", "cloudpickle": "2.1"} predict = dspy.Predict("question->answer") # Create a custom handler to capture log messages class ListHandler(logging.Handler): def __init__(self): super().__init__() self.messages = [] def emit(self, record): self.messages.append(record.getMessage()) # Add handler and set level handler = ListHandler() original_level = logger.level logger.addHandler(handler) logger.setLevel(logging.WARNING) try: # Mock version during save with patch("dspy.primitives.base_module.get_dependency_versions", return_value=save_versions): predict.save(tmp_path, save_program=True) # Mock version during load with patch("dspy.utils.saving.get_dependency_versions", return_value=load_versions): loaded_predict = dspy.load(tmp_path) # Assert warnings were logged, and one warning for each mismatched dependency. assert len(handler.messages) == 3 for msg in handler.messages: assert "There is a mismatch of" in msg # Verify the model still loads correctly despite version mismatches assert isinstance(loaded_predict, dspy.Predict) assert predict.signature == loaded_predict.signature finally: # Clean up: restore original level and remove handler logger.setLevel(original_level) logger.removeHandler(handler) ``` -------------------------------------------------------------------------------- /docs/docs/learn/programming/mcp.md: -------------------------------------------------------------------------------- ```markdown --- sidebar_position: 3 --- # Model Context Protocol (MCP) The [Model Context Protocol (MCP)](https://modelcontextprotocol.io/) is an open protocol that standardizes how applications provide context to language models. DSPy supports MCP, allowing you to use tools from any MCP server with DSPy agents. ## Installation Install DSPy with MCP support: ```bash pip install -U "dspy[mcp]" ``` ## Overview MCP enables you to: - **Use standardized tools** - Connect to any MCP-compatible server. 
- **Share tools across stacks** - Use the same tools across different frameworks. - **Simplify integration** - Convert MCP tools to DSPy tools with one line. DSPy does not handle MCP server connections directly. You can use client interfaces of the `mcp` library to establish the connection and pass `mcp.ClientSession` to `dspy.Tool.from_mcp_tool` in order to convert mcp tools into DSPy tools. ## Using MCP with DSPy ### 1. HTTP Server (Remote) For remote MCP servers over HTTP, use the streamable HTTP transport: ```python import asyncio import dspy from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client async def main(): # Connect to HTTP MCP server async with streamablehttp_client("http://localhost:8000/mcp") as (read, write): async with ClientSession(read, write) as session: # Initialize the session await session.initialize() # List and convert tools response = await session.list_tools() dspy_tools = [ dspy.Tool.from_mcp_tool(session, tool) for tool in response.tools ] # Create and use ReAct agent class TaskSignature(dspy.Signature): task: str = dspy.InputField() result: str = dspy.OutputField() react_agent = dspy.ReAct( signature=TaskSignature, tools=dspy_tools, max_iters=5 ) result = await react_agent.acall(task="Check the weather in Tokyo") print(result.result) asyncio.run(main()) ``` ### 2. Stdio Server (Local Process) The most common way to use MCP is with a local server process communicating via stdio: ```python import asyncio import dspy from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client async def main(): # Configure the stdio server server_params = StdioServerParameters( command="python", # Command to run args=["path/to/your/mcp_server.py"], # Server script path env=None, # Optional environment variables ) # Connect to the server async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: # Initialize the session await session.initialize() # List available tools response = await session.list_tools() # Convert MCP tools to DSPy tools dspy_tools = [ dspy.Tool.from_mcp_tool(session, tool) for tool in response.tools ] # Create a ReAct agent with the tools class QuestionAnswer(dspy.Signature): """Answer questions using available tools.""" question: str = dspy.InputField() answer: str = dspy.OutputField() react_agent = dspy.ReAct( signature=QuestionAnswer, tools=dspy_tools, max_iters=5 ) # Use the agent result = await react_agent.acall( question="What is 25 + 17?" ) print(result.answer) # Run the async function asyncio.run(main()) ``` ## Tool Conversion DSPy automatically handles the conversion from MCP tools to DSPy tools: ```python # MCP tool from session mcp_tool = response.tools[0] # Convert to DSPy tool dspy_tool = dspy.Tool.from_mcp_tool(session, mcp_tool) # The DSPy tool preserves: # - Tool name and description # - Parameter schemas and types # - Argument descriptions # - Async execution support # Use it like any DSPy tool result = await dspy_tool.acall(param1="value", param2=123) ``` ## Learn More - [MCP Official Documentation](https://modelcontextprotocol.io/) - [MCP Python SDK](https://github.com/modelcontextprotocol/python-sdk) - [DSPy MCP Tutorial](https://dspy.ai/tutorials/mcp/) - [DSPy Tools Documentation](./tools.md) MCP integration in DSPy makes it easy to use standardized tools from any MCP server, enabling powerful agent capabilities with minimal setup. 
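## Example: A Minimal MCP Server

The examples above assume an MCP server is already available (for example at `path/to/your/mcp_server.py` or `http://localhost:8000/mcp`). As a rough sketch (not part of DSPy itself), a small stdio server can be written with the `FastMCP` helper from the `mcp` Python SDK; the filename and the `add` tool below are illustrative placeholders:

```python
# minimal_mcp_server.py -- illustrative sketch of a stdio MCP server
# built with the FastMCP helper from the `mcp` Python SDK.
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("demo-tools")

@mcp.tool()
def add(a: int, b: int) -> int:
    """Add two numbers."""
    return a + b

if __name__ == "__main__":
    # Serve over stdio so the StdioServerParameters example above can launch it.
    mcp.run(transport="stdio")
```

Pointing `StdioServerParameters(command="python", args=["minimal_mcp_server.py"])` at this file lets the ReAct agent from the stdio example call `add` like any other converted tool.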
``` -------------------------------------------------------------------------------- /.github/workflows/run_tests.yml: -------------------------------------------------------------------------------- ```yaml name: Lint, Test, and Build on: push: branches: - main pull_request: types: [opened, synchronize, reopened] jobs: fix: name: Check Ruff Fix runs-on: ubuntu-latest permissions: contents: write pull-requests: write steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v5 with: python-version: "3.11" - name: Install uv with caching uses: astral-sh/setup-uv@v5 with: enable-cache: true cache-dependency-glob: | **/pyproject.toml **/uv.lock - name: Create and activate virtual environment run: | uv venv .venv echo "${{ github.workspace }}/.venv/bin" >> $GITHUB_PATH - name: Install dependencies run: uv sync --dev -p .venv --extra dev - name: Ruff Check run: | ruff check --fix-only --diff --exit-non-zero-on-fix || ( echo "" echo "❌ Ruff found issues that can be fixed automatically." echo "💡 To fix them locally, run:" echo "" echo " pre-commit run --all-files" echo "" echo "Then commit and push the changes." exit 1 ) test: name: Run Tests runs-on: ubuntu-latest strategy: matrix: python-version: ["3.10", "3.11", "3.12", "3.13"] steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install Deno run: | curl -fsSL https://deno.land/install.sh | sh echo "${HOME}/.deno/bin" >> $GITHUB_PATH - name: Verify Deno installation run: deno --version - name: Install uv with caching uses: astral-sh/setup-uv@v5 with: enable-cache: true cache-dependency-glob: | **/pyproject.toml **/uv.lock - name: Create and activate virtual environment run: | uv venv .venv echo "${{ github.workspace }}/.venv/bin" >> $GITHUB_PATH - name: Install dependencies run: | uv sync --dev -p .venv --extra dev uv pip list - name: Run lint with tests uses: chartboost/ruff-action@v1 with: args: check --fix-only - name: Run tests with pytest run: uv run -p .venv pytest -vv tests/ - name: Install optional dependencies run: uv sync -p .venv --extra dev --extra test_extras - name: Run extra tests run: uv run -p .venv pytest tests/ -m extra --extra llm_call_test: name: Run Tests with Real LM runs-on: ubuntu-latest services: ollama: image: ollama/ollama:latest ports: - 11434:11434 steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v5 with: python-version: 3.11 - name: Install uv with caching uses: astral-sh/setup-uv@v5 with: enable-cache: true cache-dependency-glob: | **/pyproject.toml **/uv.lock - name: Create and activate virtual environment run: | uv venv .venv echo "${{ github.workspace }}/.venv/bin" >> $GITHUB_PATH - name: Install dependencies run: | uv sync --dev -p .venv --extra dev uv pip list - name: Pull LLM run: | timeout 60 bash -c 'until curl -f http://localhost:11434/api/version; do sleep 2; done' curl -X POST http://localhost:11434/api/pull \ -H "Content-Type: application/json" \ -d '{"name": "llama3.2:3b"}' echo "LM_FOR_TEST=ollama/llama3.2:3b" >> $GITHUB_ENV - name: Run tests run: uv run -p .venv pytest -m llm_call --llm_call -vv --durations=5 tests/ build_package: name: Build Package runs-on: ubuntu-latest strategy: matrix: python-version: ["3.10", "3.11", "3.12", "3.13"] steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install uv with caching uses: astral-sh/setup-uv@v5 with: enable-cache: true cache-dependency-glob: | **/pyproject.toml **/uv.lock - name: Create 
and activate virtual environment run: | uv venv .venv echo "${{ github.workspace }}/.venv/bin" >> $GITHUB_PATH - name: Install dependencies run: uv sync --dev -p .venv --extra dev - name: Build run: uv run -p .venv python -m build - name: Install built package run: uv pip install dist/*.whl -p .venv - name: Test import dspy run: uv run -p .venv python -c "import dspy" ``` -------------------------------------------------------------------------------- /dspy/evaluate/auto_evaluation.py: -------------------------------------------------------------------------------- ```python from dspy.predict.chain_of_thought import ChainOfThought from dspy.primitives import Module from dspy.signatures import InputField, OutputField, Signature class SemanticRecallPrecision(Signature): """ Compare a system's response to the ground truth to compute its recall and precision. If asked to reason, enumerate key ideas in each response, and whether they are present in the other response. """ question: str = InputField() ground_truth: str = InputField() system_response: str = InputField() recall: float = OutputField(desc="fraction (out of 1.0) of ground truth covered by the system response") precision: float = OutputField(desc="fraction (out of 1.0) of system response covered by the ground truth") class DecompositionalSemanticRecallPrecision(Signature): """ Compare a system's response to the ground truth to compute recall and precision of key ideas. You will first enumerate key ideas in each response, discuss their overlap, and then report recall and precision. """ question: str = InputField() ground_truth: str = InputField() system_response: str = InputField() ground_truth_key_ideas: str = OutputField(desc="enumeration of key ideas in the ground truth") system_response_key_ideas: str = OutputField(desc="enumeration of key ideas in the system response") discussion: str = OutputField(desc="discussion of the overlap between ground truth and system response") recall: float = OutputField(desc="fraction (out of 1.0) of ground truth covered by the system response") precision: float = OutputField(desc="fraction (out of 1.0) of system response covered by the ground truth") def f1_score(precision, recall): precision, recall = max(0.0, min(1.0, precision)), max(0.0, min(1.0, recall)) return 0.0 if precision + recall == 0 else 2 * (precision * recall) / (precision + recall) class SemanticF1(Module): def __init__(self, threshold=0.66, decompositional=False): self.threshold = threshold if decompositional: self.module = ChainOfThought(DecompositionalSemanticRecallPrecision) else: self.module = ChainOfThought(SemanticRecallPrecision) def forward(self, example, pred, trace=None): scores = self.module(question=example.question, ground_truth=example.response, system_response=pred.response) score = f1_score(scores.precision, scores.recall) return score if trace is None else score >= self.threshold ########### class AnswerCompleteness(Signature): """ Estimate the completeness of a system's responses, against the ground truth. You will first enumerate key ideas in each response, discuss their overlap, and then report completeness. 
""" question: str = InputField() ground_truth: str = InputField() system_response: str = InputField() ground_truth_key_ideas: str = OutputField(desc="enumeration of key ideas in the ground truth") system_response_key_ideas: str = OutputField(desc="enumeration of key ideas in the system response") discussion: str = OutputField(desc="discussion of the overlap between ground truth and system response") completeness: float = OutputField(desc="fraction (out of 1.0) of ground truth covered by the system response") class AnswerGroundedness(Signature): """ Estimate the groundedness of a system's responses, against real retrieved documents written by people. You will first enumerate whatever non-trivial or check-worthy claims are made in the system response, and then discuss the extent to which some or all of them can be deduced from the retrieved context and basic commonsense. """ question: str = InputField() retrieved_context: str = InputField() system_response: str = InputField() system_response_claims: str = OutputField(desc="enumeration of non-trivial or check-worthy claims in the system response") discussion: str = OutputField(desc="discussion of how supported the claims are by the retrieved context") groundedness: float = OutputField(desc="fraction (out of 1.0) of system response supported by the retrieved context") class CompleteAndGrounded(Module): def __init__(self, threshold=0.66): self.threshold = threshold self.completeness_module = ChainOfThought(AnswerCompleteness) self.groundedness_module = ChainOfThought(AnswerGroundedness) def forward(self, example, pred, trace=None): completeness = self.completeness_module(question=example.question, ground_truth=example.response, system_response=pred.response) groundedness = self.groundedness_module(question=example.question, retrieved_context=pred.context, system_response=pred.response) score = f1_score(groundedness.groundedness, completeness.completeness) return score if trace is None else score >= self.threshold ``` -------------------------------------------------------------------------------- /tests/predict/test_code_act.py: -------------------------------------------------------------------------------- ```python import shutil import pytest import dspy from dspy import Signature from dspy.predict import CodeAct from dspy.utils import DummyLM # This test suite requires deno to be installed. 
Please install deno following https://docs.deno.com/runtime/getting_started/installation/ is_deno_available = shutil.which("deno") is not None skip_if_deno_not_available = pytest.mark.skipif( not is_deno_available, reason="Deno is not installed or not in PATH" ) class BasicQA(Signature): question = dspy.InputField() answer = dspy.OutputField(desc="often between 1 and 5 words") def add(a: float, b: float) -> float: "add two numbers" return a + b @skip_if_deno_not_available def test_codeact_code_generation(): lm = DummyLM( [ { "reasoning": "Reason_A", "generated_code": "```python\nresult = add(1,1)\nprint(result)\n```", "finished": True, }, {"reasoning": "Reason_B", "answer": "2"}, ] ) dspy.settings.configure(lm=lm) program = CodeAct(BasicQA, tools=[add]) res = program(question="What is 1+1?") assert res.answer == "2" assert res.trajectory == { "code_output_0": '"2\\n"', "generated_code_0": "result = add(1,1)\nprint(result)", } assert program.interpreter.deno_process is None class ExtremumFinder(Signature): input_list = dspy.InputField() maximum = dspy.OutputField(desc="The maximum of the given numbers") minimum = dspy.OutputField(desc="The minimum of the given numbers") def extract_maximum_minimum(input_list: str) -> dict[str, float]: numbers = list(map(float, input_list.split(","))) return {"maximum": max(numbers), "minimum": min(numbers)} @skip_if_deno_not_available def test_codeact_support_multiple_fields(): lm = DummyLM( [ { "reasoning": "Reason_A", "generated_code": "```python\nresult = extract_maximum_minimum('2, 3, 5, 6')\nprint(result)\n```", "finished": True, }, {"reasoning": "Reason_B", "maximum": "6", "minimum": "2"}, ] ) dspy.settings.configure(lm=lm) program = CodeAct(ExtremumFinder, tools=[extract_maximum_minimum]) res = program(input_list="2, 3, 5, 6") assert res.maximum == "6" assert res.minimum == "2" assert res.trajectory == { "code_output_0": '"{\'maximum\': 6.0, \'minimum\': 2.0}\\n"', "generated_code_0": "result = extract_maximum_minimum('2, 3, 5, 6')\nprint(result)", } assert program.interpreter.deno_process is None @skip_if_deno_not_available def test_codeact_code_parse_failure(): lm = DummyLM( [ { "reasoning": "Reason_A", "generated_code": "```python\nparse(error\n```", "finished": False, }, { "reasoning": "Reason_A", "generated_code": "```python\nresult = add(1,1)\nprint(result)\n```", "finished": True, }, {"reasoning": "Reason_B", "answer": "2"}, ] ) dspy.settings.configure(lm=lm) program = CodeAct(BasicQA, tools=[add]) res = program(question="What is 1+1?") assert res.answer == "2" assert res.trajectory == { "generated_code_0": "parse(error", "observation_0": "Failed to execute the generated code: Invalid Python syntax. 
message: ", "generated_code_1": "result = add(1,1)\nprint(result)", "code_output_1": '"2\\n"', } assert program.interpreter.deno_process is None @skip_if_deno_not_available def test_codeact_code_execution_failure(): lm = DummyLM( [ { "reasoning": "Reason_A", "generated_code": "```python\nunknown+1\n```", "finished": False, }, { "reasoning": "Reason_A", "generated_code": "```python\nresult = add(1,1)\nprint(result)\n```", "finished": True, }, {"reasoning": "Reason_B", "answer": "2"}, ] ) dspy.settings.configure(lm=lm) program = CodeAct(BasicQA, tools=[add]) res = program(question="What is 1+1?") assert res.answer == "2" assert res.trajectory == { "generated_code_0": "unknown+1", "observation_0": 'Failed to execute the generated code: NameError: ["name \'unknown\' is not defined"]', "generated_code_1": "result = add(1,1)\nprint(result)", "code_output_1": '"2\\n"', } assert program.interpreter.deno_process is None class CustomTool: def __call__(self, a: float, b: float) -> float: return a + b @skip_if_deno_not_available def test_codeact_tool_validation(): with pytest.raises(ValueError, match="CodeAct only accepts functions and not callable objects."): CodeAct(BasicQA, tools=[CustomTool()]) ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml [build-system] requires = ["setuptools>=77.0.1"] build-backend = "setuptools.build_meta" [project] # Do not add spaces around the '=' sign for any of the fields # preceded by a marker comment as it affects the publish workflow. #replace_package_name_marker name="dspy" #replace_package_version_marker version="3.0.4b1" description = "DSPy" readme = "README.md" authors = [{ name = "Omar Khattab", email = "[email protected]" }] license = {file = "LICENSE"} requires-python = ">=3.10, <3.14" classifiers = [ "Development Status :: 3 - Alpha", "Intended Audience :: Science/Research", "Operating System :: POSIX :: Linux", "Programming Language :: Python :: 3" ] dependencies = [ "backoff>=2.2", "joblib~=1.3", "openai>=0.28.1", "regex>=2023.10.3", "orjson>=3.9.0", "tqdm>=4.66.1", "requests>=2.31.0", "optuna>=3.4.0", "pydantic>=2.0", "magicattr>=0.1.6", "litellm>=1.64.0", "diskcache>=5.6.0", "json-repair>=0.30.0", "tenacity>=8.2.3", "anyio", "asyncer==0.0.8", "cachetools>=5.5.0", "cloudpickle>=3.0.0", "rich>=13.7.1", "pillow>=10.1.0", "numpy>=1.26.0", "xxhash>=3.5.0", "gepa[dspy]==0.0.17", ] [project.optional-dependencies] anthropic = ["anthropic>=0.18.0,<1.0.0"] weaviate = ["weaviate-client~=4.5.4"] mcp = ["mcp; python_version >= '3.10'"] langchain = ["langchain_core"] dev = [ "pytest>=6.2.5", "pytest-mock>=3.12.0", "pytest-asyncio>=0.26.0", "ruff>=0.3.0", "pre-commit>=3.7.0", "pillow>=10.1.0", "datamodel_code_generator>=0.26.3", "build>=1.0.3", "litellm>=1.64.0; sys_platform == 'win32'", "litellm[proxy]>=1.64.0; sys_platform != 'win32'", ] test_extras = [ "mcp; python_version >= '3.10'", "datasets>=2.14.6", "pandas>=2.1.1", "optuna>=3.4.0", "langchain_core", ] [tool.setuptools.packages.find] where = ["."] include = ["dspy", "dspy.*"] exclude = ["tests", "tests.*"] [tool.setuptools.package-data] dspy = ["primitives/*.js"] [project.urls] homepage = "https://github.com/stanfordnlp/dspy" [tool.coverage.run] branch = true omit = [ "*/__init__.py", "*/test_*.py", "*/tests/*.py", "*/conftest.py", "*/venv/*", "*/virtualenv/*", "*/.venv/*", "*/.virtualenv/*", "*/env/*", "*/.env/*", "*/setup.py", ] [tool.coverage.report] exclude_lines = [ 
"pragma: no cover", "def __repr__", "if self.debug:", "raise AssertionError", "raise NotImplementedError", "if __name__ == '__main__':", "logger", "try", "except", "^\\s*self\\.\\w+(:\\s*[^=]+)?\\s*=.*$", "continue", ] [tool.pytest.ini_options] filterwarnings = [ # litellm uses deprecated pydantic config classes sometimes. # The issue has been fixed repeatedly, but still keeps showing up. # For examples, see litellm PRs #6903, #7300, #8096, #9372, and #12528. "ignore:.+class-based `config` is deprecated, use ConfigDict:DeprecationWarning", ] [tool.ruff] include = ["dspy/**/*.py", "tests/**/*.py"] exclude = [ "dspy/__metadata__.py", "tests/reliability/*.py", ] line-length = 120 indent-width = 4 target-version = "py310" [tool.ruff.lint] select = [ "E", # pycodestyle errors "W", # pycodestyle warnings "F", # pyflakes "I", # isort "C", # flake8-comprehensions "B", # flake8-bugbear "UP", # pyupgrade "N", # pep8-naming "RUF", # ruff-specific rules "Q", # flake8-quotes ] ignore = [ "B027", # Allow non-abstract empty methods in abstract base classes "FBT003",# Allow boolean positional values in function calls "C901", # Ignore complexity checking "E501", # Ignore line length errors (handled by formatter) "UP035", # Allow python typing modules "RUF005", # Allow using + operator to concatenate collections "B904", # Allow raise custom exceptions in except blocks "F403", # Allow wildcard imports "E721", # Allow using == to compare with type "UP031", # Allow percent format "RUF022", # Allow unsorted __all__ value ] # Allow fix for all enabled rules (when `--fix`) is provided. fixable = ["ALL"] unfixable = [] [tool.ruff.format] docstring-code-format = false quote-style = "double" indent-style = "space" skip-magic-trailing-comma = false line-ending = "auto" [tool.ruff.lint.isort] known-first-party = ["dspy"] [tool.ruff.lint.flake8-tidy-imports] ban-relative-imports = "all" [tool.ruff.lint.per-file-ignores] "tests/**/*.py" = [ "S101", # Allow assert statements in tests "TID252", # Allow relative imports in tests "ARG001", # Allow unused arguments in tests (like pytest fixtures) ] "__init__.py" = ["F401"] # Init files can be empty "dspy/__init__.py" = [ "I001", # Allow unsorted or unformatted imports (isort) "E402", # Allow imports not at the top of the file (needed for certain __init__ patterns) "F405", # Allow undefined names from wildcard imports (common in __init__ files) ] ``` -------------------------------------------------------------------------------- /tests/teleprompt/test_bootstrap.py: -------------------------------------------------------------------------------- ```python import pytest import dspy from dspy import Example from dspy.predict import Predict from dspy.teleprompt import BootstrapFewShot from dspy.utils.dummies import DummyLM # Define a simple metric function for testing def simple_metric(example, prediction, trace=None): # Simplified metric for testing: true if prediction matches expected output return example.output == prediction.output examples = [ Example(input="What is the color of the sky?", output="blue").with_inputs("input"), Example(input="What does the fox say?", output="Ring-ding-ding-ding-dingeringeding!"), ] trainset = [examples[0]] valset = [examples[1]] def test_bootstrap_initialization(): # Initialize BootstrapFewShot with a dummy metric and minimal setup bootstrap = BootstrapFewShot(metric=simple_metric, max_bootstrapped_demos=1, max_labeled_demos=1) assert bootstrap.metric == simple_metric, "Metric not correctly initialized" class SimpleModule(dspy.Module): def 
__init__(self, signature): super().__init__() self.predictor = Predict(signature) def forward(self, **kwargs): return self.predictor(**kwargs) def test_compile_with_predict_instances(): # Create Predict instances for student and teacher # Note that dspy.Predict is not itself a module, so we can't use it directly here student = SimpleModule("input -> output") teacher = SimpleModule("input -> output") lm = DummyLM(["Initial thoughts", "Finish[blue]"]) dspy.settings.configure(lm=lm) # Initialize BootstrapFewShot and compile the student bootstrap = BootstrapFewShot(metric=simple_metric, max_bootstrapped_demos=1, max_labeled_demos=1) compiled_student = bootstrap.compile(student, teacher=teacher, trainset=trainset) assert compiled_student is not None, "Failed to compile student" assert hasattr(compiled_student, "_compiled") and compiled_student._compiled, "Student compilation flag not set" def test_bootstrap_effectiveness(): # This test verifies if the bootstrapping process improves the student's predictions student = SimpleModule("input -> output") teacher = SimpleModule("input -> output") lm = DummyLM([{"output": "blue"}, {"output": "Ring-ding-ding-ding-dingeringeding!"}], follow_examples=True) dspy.settings.configure(lm=lm, trace=[]) bootstrap = BootstrapFewShot(metric=simple_metric, max_bootstrapped_demos=1, max_labeled_demos=1) compiled_student = bootstrap.compile(student, teacher=teacher, trainset=trainset) # Check that the compiled student has the correct demos assert len(compiled_student.predictor.demos) == 1 assert compiled_student.predictor.demos[0].input == trainset[0].input assert compiled_student.predictor.demos[0].output == trainset[0].output # Test the compiled student's prediction. # We are using a DummyLM with follow_examples=True, which means that # even though it would normally reply with "Ring-ding-ding-ding-dingeringeding!" # on the second output, if it seems an example that perfectly matches the # prompt, it will use that instead. That is why we expect "blue" here. 
prediction = compiled_student(input=trainset[0].input) assert prediction.output == trainset[0].output def test_error_handling_during_bootstrap(): """ Test to verify error handling during the bootstrapping process """ class BuggyModule(dspy.Module): def __init__(self, signature): super().__init__() self.predictor = Predict(signature) def forward(self, **kwargs): raise RuntimeError("Simulated error") student = SimpleModule("input -> output") teacher = BuggyModule("input -> output") # Setup DummyLM to simulate an error scenario lm = DummyLM( [ {"output": "Initial thoughts"}, # Simulate initial teacher's prediction ] ) dspy.settings.configure(lm=lm) bootstrap = BootstrapFewShot( metric=simple_metric, max_bootstrapped_demos=1, max_labeled_demos=1, max_errors=1, ) with pytest.raises(RuntimeError, match="Simulated error"): bootstrap.compile(student, teacher=teacher, trainset=trainset) def test_validation_set_usage(): """ Test to ensure the validation set is correctly used during bootstrapping """ student = SimpleModule("input -> output") teacher = SimpleModule("input -> output") lm = DummyLM( [ {"output": "Initial thoughts"}, {"output": "Finish[blue]"}, # Expected output for both training and validation ] ) dspy.settings.configure(lm=lm) bootstrap = BootstrapFewShot(metric=simple_metric, max_bootstrapped_demos=1, max_labeled_demos=1) compiled_student = bootstrap.compile(student, teacher=teacher, trainset=trainset) # Check that validation examples are part of student's demos after compilation assert len(compiled_student.predictor.demos) >= len(valset), "Validation set not used in compiled student demos" ``` -------------------------------------------------------------------------------- /dspy/adapters/types/audio.py: -------------------------------------------------------------------------------- ```python import base64 import io import mimetypes import os from typing import Any, Union import pydantic import requests from dspy.adapters.types.base_type import Type try: import soundfile as sf SF_AVAILABLE = True except ImportError: SF_AVAILABLE = False class Audio(Type): data: str audio_format: str model_config = pydantic.ConfigDict( frozen=True, extra="forbid", ) def format(self) -> list[dict[str, Any]]: try: data = self.data except Exception as e: raise ValueError(f"Failed to format audio for DSPy: {e}") return [{ "type": "input_audio", "input_audio": { "data": data, "format": self.audio_format } }] @pydantic.model_validator(mode="before") @classmethod def validate_input(cls, values: Any) -> Any: """ Validate input for Audio, expecting 'data' and 'audio_format' keys in dictionary. """ if isinstance(values, cls): return {"data": values.data, "audio_format": values.audio_format} return encode_audio(values) @classmethod def from_url(cls, url: str) -> "Audio": """ Download an audio file from URL and encode it as base64. """ response = requests.get(url) response.raise_for_status() mime_type = response.headers.get("Content-Type", "audio/wav") if not mime_type.startswith("audio/"): raise ValueError(f"Unsupported MIME type for audio: {mime_type}") audio_format = mime_type.split("/")[1] encoded_data = base64.b64encode(response.content).decode("utf-8") return cls(data=encoded_data, audio_format=audio_format) @classmethod def from_file(cls, file_path: str) -> "Audio": """ Read local audio file and encode it as base64. 
""" if not os.path.isfile(file_path): raise ValueError(f"File not found: {file_path}") mime_type, _ = mimetypes.guess_type(file_path) if not mime_type or not mime_type.startswith("audio/"): raise ValueError(f"Unsupported MIME type for audio: {mime_type}") with open(file_path, "rb") as file: file_data = file.read() audio_format = mime_type.split("/")[1] encoded_data = base64.b64encode(file_data).decode("utf-8") return cls(data=encoded_data, audio_format=audio_format) @classmethod def from_array( cls, array: Any, sampling_rate: int, format: str = "wav" ) -> "Audio": """ Process numpy-like array and encode it as base64. Uses sampling rate and audio format for encoding. """ if not SF_AVAILABLE: raise ImportError("soundfile is required to process audio arrays.") byte_buffer = io.BytesIO() sf.write( byte_buffer, array, sampling_rate, format=format.upper(), subtype="PCM_16", ) encoded_data = base64.b64encode(byte_buffer.getvalue()).decode("utf-8") return cls(data=encoded_data, audio_format=format) def __str__(self) -> str: return self.serialize_model() def __repr__(self) -> str: length = len(self.data) return f"Audio(data=<AUDIO_BASE_64_ENCODED({length})>, audio_format='{self.audio_format}')" def encode_audio(audio: Union[str, bytes, dict, "Audio", Any], sampling_rate: int = 16000, format: str = "wav") -> dict: """ Encode audio to a dict with 'data' and 'audio_format'. Accepts: local file path, URL, data URI, dict, Audio instance, numpy array, or bytes (with known format). """ if isinstance(audio, dict) and "data" in audio and "audio_format" in audio: return audio elif isinstance(audio, Audio): return {"data": audio.data, "audio_format": audio.audio_format} elif isinstance(audio, str) and audio.startswith("data:audio/"): try: header, b64data = audio.split(",", 1) mime = header.split(";")[0].split(":")[1] audio_format = mime.split("/")[1] return {"data": b64data, "audio_format": audio_format} except Exception as e: raise ValueError(f"Malformed audio data URI: {e}") elif isinstance(audio, str) and os.path.isfile(audio): a = Audio.from_file(audio) return {"data": a.data, "audio_format": a.audio_format} elif isinstance(audio, str) and audio.startswith("http"): a = Audio.from_url(audio) return {"data": a.data, "audio_format": a.audio_format} elif SF_AVAILABLE and hasattr(audio, "shape"): a = Audio.from_array(audio, sampling_rate=sampling_rate, format=format) return {"data": a.data, "audio_format": a.audio_format} elif isinstance(audio, bytes): encoded = base64.b64encode(audio).decode("utf-8") return {"data": encoded, "audio_format": format} else: raise ValueError(f"Unsupported type for encode_audio: {type(audio)}") ``` -------------------------------------------------------------------------------- /dspy/propose/dataset_summary_generator.py: -------------------------------------------------------------------------------- ```python import re import dspy from dspy.propose.utils import strip_prefix class ObservationSummarizer(dspy.Signature): ("""Given a series of observations I have made about my dataset, please summarize them into a brief 2-3 sentence summary which highlights only the most important details.""") observations = dspy.InputField(desc="Observations I have made about my dataset") summary = dspy.OutputField(desc="Two to Three sentence summary of only the most significant highlights of my observations") class DatasetDescriptor(dspy.Signature): ("""Given several examples from a dataset please write observations about trends that hold for most or all of the samples. 
""" """Some areas you may consider in your observations: topics, content, syntax, conciseness, etc. """ """It will be useful to make an educated guess as to the nature of the task this dataset will enable. Don't be afraid to be creative""") examples = dspy.InputField(desc="Sample data points from the dataset") observations = dspy.OutputField(desc="Somethings that holds true for most or all of the data you observed") class DatasetDescriptorWithPriorObservations(dspy.Signature): ("""Given several examples from a dataset please write observations about trends that hold for most or all of the samples. """ """I will also provide you with a few observations I have already made. Please add your own observations or if you feel the observations are comprehensive say 'COMPLETE' """ """Some areas you may consider in your observations: topics, content, syntax, conciceness, etc. """ """It will be useful to make an educated guess as to the nature of the task this dataset will enable. Don't be afraid to be creative""") examples = dspy.InputField(desc="Sample data points from the dataset") prior_observations = dspy.InputField(desc="Some prior observations I made about the data") observations = dspy.OutputField(desc="Somethings that holds true for most or all of the data you observed or COMPLETE if you have nothing to add") def order_input_keys_in_string(unordered_repr): # Regex pattern to match the input keys structure pattern = r"input_keys=\{([^\}]+)\}" # Function to reorder keys def reorder_keys(match): # Extracting the keys from the match keys_str = match.group(1) # Splitting the keys, stripping extra spaces, and sorting them keys = sorted(key.strip() for key in keys_str.split(",")) # Formatting the sorted keys back into the expected structure return f"input_keys={{{', '.join(keys)}}}" # Using re.sub to find all matches of the pattern and replace them using the reorder_keys function ordered_repr = re.sub(pattern, reorder_keys, unordered_repr) return ordered_repr def create_dataset_summary(trainset, view_data_batch_size, prompt_model, log_file=None, verbose=False): if verbose: print("\nBootstrapping dataset summary (this will be used to generate instructions)...") upper_lim = min(len(trainset), view_data_batch_size) prompt_model = prompt_model if prompt_model else dspy.settings.lm with dspy.settings.context(lm=prompt_model): observation = dspy.Predict(DatasetDescriptor, n=1, temperature=1.0)(examples=order_input_keys_in_string(trainset[0:upper_lim].__repr__())) observations = observation["observations"] if log_file: log_file.write("PRODUCING DATASET SUMMARY\n") skips = 0 try: max_calls = 10 calls = 0 for b in range(view_data_batch_size, len(trainset), view_data_batch_size): calls+=1 if calls >= max_calls: break if verbose: print(f"b: {b}") upper_lim = min(len(trainset), b+view_data_batch_size) with dspy.settings.context(lm=prompt_model): output = dspy.Predict(DatasetDescriptorWithPriorObservations, n=1, temperature=1.0)(prior_observations=observations, examples=order_input_keys_in_string(trainset[b:upper_lim].__repr__())) if len(output["observations"]) >= 8 and output["observations"][:8].upper() == "COMPLETE": skips += 1 if skips >= 5: break continue observations += output["observations"] if log_file: log_file.write(f"observations {observations}\n") except Exception as e: if verbose: print(f"e {e}. 
using observations from past round for a summary.") if prompt_model: with dspy.settings.context(lm=prompt_model): summary = dspy.Predict(ObservationSummarizer, n=1, temperature=1.0)(observations=observations) else: summary = dspy.Predict(ObservationSummarizer, n=1, temperature=1.0)(observations=observations) if verbose: print(f"summary: {summary}") if log_file: log_file.write(f"summary: {summary}\n") if verbose: print(f"\nGenerated summary: {strip_prefix(summary.summary)}\n") return strip_prefix(summary.summary) ``` -------------------------------------------------------------------------------- /docs/docs/tutorials/optimizer_tracking/index.md: -------------------------------------------------------------------------------- ```markdown # Tracking DSPy Optimizers with MLflow This tutorial demonstrates how to use MLflow to track and analyze your DSPy optimization process. MLflow's built-in integration for DSPy provides traceability and debuggability for your DSPy optimization experience. It allows you to understand the intermediate trials during the optimization, store the optimized program and its results, and provides observability into your program execution. Through the autologging capability, MLflow tracks the following information: * **Optimizer Parameters** * Number of few-shot examples * Number of candidates * Other configuration settings * **Program States** * Initial instructions and few-shot examples * Optimized instructions and few-shot examples * Intermediate instructions and few-shot examples during optimization * **Datasets** * Training data used * Evaluation data used * **Performance Progression** * Overall metric progression * Performance at each evaluation step * **Traces** * Program execution traces * Model responses * Intermediate prompts ## Getting Started ### 1. Install MLflow First, install MLflow (version 2.21.1 or later): ```bash pip install mlflow>=2.21.1 ``` ### 2. Start MLflow Tracking Server Let's spin up the MLflow tracking server with the following command. This will start a local server at `http://127.0.0.1:5000/`: ```bash # It is highly recommended to use SQL store when using MLflow tracing mlflow server --backend-store-uri sqlite:///mydb.sqlite ``` ### 3. Enable Autologging Configure MLflow to track your DSPy optimization: ```python import mlflow import dspy # Enable autologging with all features mlflow.dspy.autolog( log_compiles=True, # Track optimization process log_evals=True, # Track evaluation results log_traces_from_compile=True # Track program traces during optimization ) # Configure MLflow tracking mlflow.set_tracking_uri("http://localhost:5000") # Use local MLflow server mlflow.set_experiment("DSPy-Optimization") ``` ### 4. Optimizing Your Program Here's a complete example showing how to track the optimization of a math problem solver: ```python import dspy from dspy.datasets.gsm8k import GSM8K, gsm8k_metric # Configure your language model lm = dspy.LM(model="openai/gpt-4o") dspy.configure(lm=lm) # Load dataset gsm8k = GSM8K() trainset, devset = gsm8k.train, gsm8k.dev # Define your program program = dspy.ChainOfThought("question -> answer") # Create and run optimizer with tracking teleprompter = dspy.teleprompt.MIPROv2( metric=gsm8k_metric, auto="light", ) # The optimization process will be automatically tracked optimized_program = teleprompter.compile( program, trainset=trainset, ) ``` ### 5. Viewing Results Once your optimization is complete, you can analyze the results through MLflow's UI. Let's walk through how to explore your optimization runs. 
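If you prefer to start from code rather than the browser, the same information can be pulled programmatically. The following is a minimal sketch, assuming the local tracking URI and the `DSPy-Optimization` experiment name configured earlier; exact DataFrame column names may vary slightly across MLflow versions.

```python
import mlflow

mlflow.set_tracking_uri("http://localhost:5000")

# All runs in the experiment, returned as a pandas DataFrame.
runs = mlflow.search_runs(experiment_names=["DSPy-Optimization"])

# Child runs carry their parent's run_id in the `mlflow.parentRunId` tag;
# the column only appears once nested runs have been logged.
parent_col = "tags.mlflow.parentRunId"
if parent_col in runs.columns:
    parents = runs[runs[parent_col].isna()]
    children = runs[runs[parent_col].notna()]
    print(f"{len(parents)} optimization run(s), {len(children)} candidate program(s)")

# Drill into a single run's logged parameters and metrics.
run = mlflow.get_run(runs.iloc[0]["run_id"])
print(run.data.params)
print(run.data.metrics)
```

The UI walkthrough below covers the same ground interactively.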
#### Step 1: Access the MLflow UI Navigate to `http://localhost:5000` in your web browser to access the MLflow tracking server UI. #### Step 2: Understanding the Experiment Structure When you open the experiment page, you'll see a hierarchical view of your optimization process. The parent run represents your overall optimization process, while the child runs show each intermediate version of your program that was created during optimization.  #### Step 3: Analyzing the Parent Run Clicking on the parent run reveals the big picture of your optimization process. You'll find detailed information about your optimizer's configuration parameters and how your evaluation metrics progressed over time. The parent run also stores your final optimized program, including the instructions, signature definitions, and few-shot examples that were used. Additionally, you can review the training data that was used during the optimization process.  #### Step 4: Examining Child Runs Each child run provides a detailed snapshot of a specific optimization attempt. When you select a child run from the experiment page, you can explore several aspects of that particular intermediate program. On the run parameter tab or artifact tab, you can review the instructions and few-shot examples used for the intermediate program. One of the most powerful features is the Traces tab, which provides a step-by-step view of your program's execution. Here you can understand exactly how your DSPy program processes inputs and generates outputs.  ### 6. Loading Models for Inference You can load the optimized program directly from the MLflow tracking server for inference: ```python model_path = mlflow.artifacts.download_artifacts("mlflow-artifacts:/path/to/best_model.json") program.load(model_path) ``` ## Troubleshooting - If traces aren't appearing, ensure `log_traces_from_compile=True` - For large datasets, consider setting `log_traces_from_compile=False` to avoid memory issues - Use `mlflow.get_run(run_id)` to programmatically access MLflow run data For more features, explore the [MLflow Documentation](https://mlflow.org/docs/latest/llms/dspy). ``` -------------------------------------------------------------------------------- /docs/docs/learn/evaluation/data.md: -------------------------------------------------------------------------------- ```markdown --- sidebar_position: 5 --- # Data DSPy is a machine learning framework, so working in it involves training sets, development sets, and test sets. For each example in your data, we distinguish typically between three types of values: the inputs, the intermediate labels, and the final label. You can use DSPy effectively without any intermediate or final labels, but you will need at least a few example inputs. ## DSPy `Example` objects The core data type for data in DSPy is `Example`. You will use **Examples** to represent items in your training set and test set. DSPy **Examples** are similar to Python `dict`s but have a few useful utilities. Your DSPy modules will return values of the type `Prediction`, which is a special sub-class of `Example`. When you use DSPy, you will do a lot of evaluation and optimization runs. Your individual datapoints will be of type `Example`: ```python qa_pair = dspy.Example(question="This is a question?", answer="This is an answer.") print(qa_pair) print(qa_pair.question) print(qa_pair.answer) ``` **Output:** ```text Example({'question': 'This is a question?', 'answer': 'This is an answer.'}) (input_keys=None) This is a question? This is an answer. 
``` Examples can have any field keys and any value types, though usually values are strings. ```text object = Example(field1=value1, field2=value2, field3=value3, ...) ``` You can now express your training set for example as: ```python trainset = [dspy.Example(report="LONG REPORT 1", summary="short summary 1"), ...] ``` ### Specifying Input Keys In traditional ML, there are separated "inputs" and "labels". In DSPy, the `Example` objects have a `with_inputs()` method, which can mark specific fields as inputs. (The rest are just metadata or labels.) ```python # Single Input. print(qa_pair.with_inputs("question")) # Multiple Inputs; be careful about marking your labels as inputs unless you mean it. print(qa_pair.with_inputs("question", "answer")) ``` Values can be accessed using the `.`(dot) operator. You can access the value of key `name` in defined object `Example(name="John Doe", job="sleep")` through `object.name`. To access or exclude certain keys, use `inputs()` and `labels()` methods to return new Example objects containing only input or non-input keys, respectively. ```python article_summary = dspy.Example(article= "This is an article.", summary= "This is a summary.").with_inputs("article") input_key_only = article_summary.inputs() non_input_key_only = article_summary.labels() print("Example object with Input fields only:", input_key_only) print("Example object with Non-Input fields only:", non_input_key_only) ``` **Output** ``` Example object with Input fields only: Example({'article': 'This is an article.'}) (input_keys={'article'}) Example object with Non-Input fields only: Example({'summary': 'This is a summary.'}) (input_keys=None) ``` <!-- ## Loading Dataset from sources One of the most convenient way to import datasets in DSPy is by using `DataLoader`. The first step is to declare an object, this object can then be used to call utilities to load datasets in different formats: ```python from dspy.datasets import DataLoader dl = DataLoader() ``` For most dataset formats, it's quite straightforward you pass the file path to the corresponding method of the format and you'll get the list of `Example` for the dataset in return: ```python import pandas as pd csv_dataset = dl.from_csv( "sample_dataset.csv", fields=("instruction", "context", "response"), input_keys=("instruction", "context") ) json_dataset = dl.from_json( "sample_dataset.json", fields=("instruction", "context", "response"), input_keys=("instruction", "context") ) parquet_dataset = dl.from_parquet( "sample_dataset.parquet", fields=("instruction", "context", "response"), input_keys=("instruction", "context") ) pandas_dataset = dl.from_pandas( pd.read_csv("sample_dataset.csv"), # DataFrame fields=("instruction", "context", "response"), input_keys=("instruction", "context") ) ``` These are some formats that `DataLoader` supports to load from file directly. In the backend, most of these methods leverage the `load_dataset` method from `datasets` library to load these formats. But when working with text data you often use HuggingFace datasets, in order to import HF datasets in list of `Example` format we can use `from_huggingface` method: ```python blog_alpaca = dl.from_huggingface( "intertwine-expel/expel-blog", input_keys=("title",) ) ``` You can access the splits of the dataset by accessing the corresponding key: ```python train_split = blog_alpaca['train'] # Since this is the only split in the dataset we can split this into # train and test split ourselves by slicing or sampling 75 rows from the train # split for testing. 
testset = train_split[:75] trainset = train_split[75:] ``` The way you load a HuggingFace dataset using `load_dataset` is exactly how you load data it via `from_huggingface` as well. This includes passing specific splits, subsplits, read instructions, etc. For code snippets, you can refer to the [cheatsheet snippets](/cheatsheet/#dspy-dataloaders) for loading from HF. --> ``` -------------------------------------------------------------------------------- /dspy/evaluate/metrics.py: -------------------------------------------------------------------------------- ```python # TODO: This should move internally. Same for passage_match. dspy.metrics.answer_exact_match, dspy.metrics.answer_passage_match import re import string import unicodedata from collections import Counter from dspy.dsp.utils.utils import print_message def EM(prediction, answers_list): # noqa: N802 assert isinstance(answers_list, list) return max(em_score(prediction, ans) for ans in answers_list) def F1(prediction, answers_list): # noqa: N802 assert isinstance(answers_list, list) return max(f1_score(prediction, ans) for ans in answers_list) def HotPotF1(prediction, answers_list): # noqa: N802 assert isinstance(answers_list, list) return max(hotpot_f1_score(prediction, ans) for ans in answers_list) def normalize_text(s): s = unicodedata.normalize("NFD", s) def remove_articles(text): return re.sub(r"\b(a|an|the)\b", " ", text) def white_space_fix(text): return " ".join(text.split()) def remove_punc(text): exclude = set(string.punctuation) return "".join(ch for ch in text if ch not in exclude) def lower(text): return text.lower() return white_space_fix(remove_articles(remove_punc(lower(s)))) def em_score(prediction, ground_truth): return normalize_text(prediction) == normalize_text(ground_truth) # See: https://github.com/hotpotqa/hotpot/blob/master/hotpot_evaluate_v1.py # See: https://rajpurkar.github.io/SQuAD-explorer/ under Evaluation Script # See: QReCC's def f1_score(prediction, ground_truth): prediction_tokens = normalize_text(prediction).split() ground_truth_tokens = normalize_text(ground_truth).split() common = Counter(prediction_tokens) & Counter(ground_truth_tokens) num_same = sum(common.values()) if len(prediction_tokens) == len(ground_truth_tokens) == 0: # Unlike most tasks, QReCC and SQuAD-2.0 assign 1.0 in this edge case. We don't for uniformity. 
print_message("\n#> F1 Metric: Rare edge case of len(prediction_tokens) == len(ground_truth_tokens) == 0.\n") if num_same == 0: return 0 precision = 1.0 * num_same / len(prediction_tokens) recall = 1.0 * num_same / len(ground_truth_tokens) f1 = (2 * precision * recall) / (precision + recall) return f1 def hotpot_f1_score(prediction, ground_truth): normalized_prediction = normalize_text(prediction) normalized_ground_truth = normalize_text(ground_truth) if normalized_prediction in ["yes", "no", "noanswer"] and normalized_prediction != normalized_ground_truth: return 0 if normalized_ground_truth in ["yes", "no", "noanswer"] and normalized_prediction != normalized_ground_truth: return 0 prediction_tokens = normalized_prediction.split() ground_truth_tokens = normalized_ground_truth.split() common = Counter(prediction_tokens) & Counter(ground_truth_tokens) num_same = sum(common.values()) if num_same == 0: return 0 precision = 1.0 * num_same / len(prediction_tokens) recall = 1.0 * num_same / len(ground_truth_tokens) f1 = (2 * precision * recall) / (precision + recall) return f1 def precision_score(prediction, ground_truth): prediction_tokens = normalize_text(prediction).split() ground_truth_tokens = normalize_text(ground_truth).split() common = Counter(prediction_tokens) & Counter(ground_truth_tokens) num_same = sum(common.values()) if len(prediction_tokens) == len(ground_truth_tokens) == 0: # Unlike most tasks, QReCC and SQuAD-2.0 assign 1.0 in this edge case. We don't for uniformity. print_message("\n#> Precision Metric: Rare edge case of len(prediction_tokens) == len(ground_truth_tokens) == 0.\n") if num_same == 0: return 0 precision = 1.0 * num_same / len(prediction_tokens) return precision def _passage_match(passages: list[str], answers: list[str]) -> bool: """Returns True if any of the passages contains the answer.""" from dspy.dsp.utils import DPR_normalize, has_answer def passage_has_answers(passage: str, answers: list[str]) -> bool: """Returns True if the passage contains the answer.""" return has_answer( tokenized_answers=[DPR_normalize(normalize_text(ans)) for ans in answers], text=normalize_text(passage), ) return any(passage_has_answers(psg, answers) for psg in passages) def _answer_match(prediction, answers, frac=1.0): """Returns True if the prediction matches any of the answers.""" if frac >= 1.0: return EM(prediction, answers) return F1(prediction, answers) >= frac def answer_exact_match(example, pred, trace=None, frac=1.0): if isinstance(example.answer, str): return _answer_match(pred.answer, [example.answer], frac=frac) elif isinstance(example.answer, list): return _answer_match(pred.answer, example.answer, frac=frac) raise ValueError(f"Invalid answer type: {type(example.answer)}") def answer_passage_match(example, pred, trace=None): if isinstance(example.answer, str): return _passage_match(pred.context, [example.answer]) elif isinstance(example.answer, list): return _passage_match(pred.context, example.answer) raise ValueError(f"Invalid answer type: {type(example.answer)}") ``` -------------------------------------------------------------------------------- /dspy/clients/utils_finetune.py: -------------------------------------------------------------------------------- ```python import os from enum import Enum from typing import Any, Literal, TypedDict import orjson import dspy from dspy.adapters.base import Adapter from dspy.utils.caching import DSPY_CACHEDIR class TrainingStatus(str, Enum): not_started = "not_started" pending = "pending" running = "running" succeeded = 
"succeeded" failed = "failed" cancelled = "cancelled" class TrainDataFormat(str, Enum): CHAT = "chat" COMPLETION = "completion" GRPO_CHAT = "grpo_chat" class Message(TypedDict): role: Literal["user"] | Literal["assistant"] | Literal["system"] content: str class MessageAssistant(TypedDict): role: Literal["assistant"] content: str class GRPOChatData(TypedDict): messages: list[Message] completion: MessageAssistant reward: float GRPOGroup = list[GRPOChatData] class MultiGPUConfig(TypedDict): # Number of GPUs to use for inference num_inference_gpus: int # Number of GPUs to use for training num_training_gpus: int def infer_data_format(adapter: Adapter) -> str: if isinstance(adapter, dspy.ChatAdapter): return TrainDataFormat.CHAT raise ValueError(f"Could not infer the data format for: {adapter}") def get_finetune_directory() -> str: default_finetunedir = os.path.join(DSPY_CACHEDIR, "finetune") finetune_dir = os.environ.get("DSPY_FINETUNEDIR") or default_finetunedir finetune_dir = os.path.abspath(finetune_dir) os.makedirs(finetune_dir, exist_ok=True) return finetune_dir def write_lines(file_path, data): with open(file_path, "wb") as f: for item in data: f.write(orjson.dumps(item) + b"\n") def save_data( data: list[dict[str, Any]], ) -> str: from dspy.utils.hasher import Hasher # Assign a unique name to the file based on the data hash hash = Hasher.hash(data) file_name = f"{hash}.jsonl" finetune_dir = get_finetune_directory() file_path = os.path.join(finetune_dir, file_name) file_path = os.path.abspath(file_path) with open(file_path, "wb") as f: for item in data: f.write(orjson.dumps(item) + b"\n") return file_path def validate_data_format( data: list[dict[str, Any]], data_format: TrainDataFormat, ): find_err_funcs = { TrainDataFormat.CHAT: find_data_error_chat, TrainDataFormat.COMPLETION: find_data_errors_completion, } err = f"Data format {data_format} is not supported." assert data_format in find_err_funcs, err find_err_func = find_err_funcs[data_format] if not isinstance(data, list): err = f"Data is not a list. Found data type: {type(data)}" raise ValueError(err) data_dict_errors = [] for ind, data_dict in enumerate(data): err = f"Not a dictionary -- found data type: {type(data_dict)}" if isinstance(data_dict, dict): err = find_err_func(data_dict) if err: err_dict = {"index": ind, "error": err} data_dict_errors.append(err_dict) if data_dict_errors: finetune_dir = get_finetune_directory() log_path = os.path.join(finetune_dir, "data_format_errors.log") log_path = os.path.abspath(log_path) write_lines(log_path, data_dict_errors) err = f"Data format errors found. For more details, see the log file: {log_path}" raise ValueError(err) def find_data_errors_completion(data_dict: dict[str, str]) -> str | None: keys = ["prompt", "completion"] assert isinstance(data_dict, dict) expected_keys = sorted(keys) found_keys = sorted(data_dict.keys()) if set(expected_keys) != set(found_keys): return f"Expected Keys: {expected_keys}; Found Keys: {found_keys}" for key in keys: if not isinstance(data_dict[key], str): return f"Expected `{key}` to be of type `str`. 
Found: {type(data_dict[key])}" # Following functions are modified from the OpenAI cookbook: # https://cookbook.openai.com/examples/chat_finetuning_data_prep def find_data_error_chat(messages: dict[str, Any]) -> str | None: assert isinstance(messages, dict) expected_keys = ["messages"] found_keys = sorted(messages.keys()) if set(expected_keys) != set(found_keys): return f"Expected Keys: {expected_keys}; Found Keys: {found_keys}" if not isinstance(messages["messages"], list): return f"The value of the `messages` key should be a list instance. Found: {type(messages['messages'])}" for ind, message in enumerate(messages["messages"]): err = find_data_error_chat_message(message) if err: return f"Error in message at index {ind}: {err}" def find_data_error_chat_message(message: dict[str, Any]) -> str | None: assert isinstance(message, dict) message_keys = sorted(["role", "content"]) found_keys = sorted(message.keys()) if set(message_keys) != set(found_keys): return f"Expected Keys: {message_keys}; Found Keys: {found_keys}" expected_roles = sorted(["assistant", "system", "user"]) found_role = message["role"] if found_role not in expected_roles: return f"Expected Roles: {expected_roles}; Found Role: {found_role}" if not isinstance(message["content"], str): return f"Expected Content Type: `str`; Found Content Type: {type(message['content'])}" ``` -------------------------------------------------------------------------------- /docs/docs/stylesheets/extra.css: -------------------------------------------------------------------------------- ```css /* Custom styles for logo */ .md-logo, .md-logo img { width: auto !important; height: 1.5rem !important; padding: 0 !important; margin: 0 !important; } .md-logo img { object-fit: contain !important; } /* Adjust header to accommodate logo */ .md-header__inner { padding: 0.5rem !important; } /* Responsive adjustments */ @media screen and (min-width: 76.25em) { .md-logo { height: 1.8rem !important; /* Reduced from 2.2rem */ } .md-logo img { height: 1.8rem !important; /* Reduced from 2.2rem */ } } /* Dark mode specific adjustments */ [data-md-color-scheme="slate"] .md-logo img { filter: brightness(0.9); } /* Default max-width */ .md-content { max-width: 980px; margin: 0 auto; } /* Adjust width when a sidebar or TOC is present */ @media (min-width: 1700px) { /* Increase width when there's enough space (like on desktop or tablet landscape) */ .md-content { min-width: 980px; } } /* Justified text for main content */ .md-content__inner p { text-align: justify; } /* Left-aligned text for grid cards */ .md-content__inner .grid.cards p { text-align: left; } /* Base styling for the output area */ .jp-Cell-outputWrapper .jp-OutputArea-output pre { max-height: 300px; overflow-y: auto; padding: 10px 15px; border-radius: 5px; font-family: monospace; font-size: 0.9em; } /* Light mode specific styling */ :root { --output-bg-light: #fafafa; --output-border-light: #ddd; --output-text-light: #333; } body[data-md-color-scheme="default"] .jp-Cell-outputWrapper .jp-OutputArea-output pre { background-color: var(--output-bg-light); border: 1px solid var(--output-border-light); color: var(--output-text-light); box-shadow: 0px 1px 4px rgba(0, 0, 0, 0.1); } /* Dark mode specific styling */ :root { --output-bg-dark: #2e2e2e; --output-border-dark: #555; --output-text-dark: #e0e0e0; } body[data-md-color-scheme="slate"] .jp-Cell-outputWrapper .jp-OutputArea-output pre { background-color: var(--output-bg-dark); border: 1px solid var(--output-border-dark); color: var(--output-text-dark); 
box-shadow: 0px 1px 4px rgba(255, 255, 255, 0.1); } /* Set a fixed width for the sidebar */ .md-sidebar { width: 235px; } /* Adjust search bar position */ .md-search { margin-left: auto; padding-right: 0; } /* If you need to adjust the width of the search bar */ .md-search__inner { width: 13rem; } /* Adjust repository button position and alignment */ .md-header__source { margin-left: 1rem; margin-right: 0; text-align: right; /* Keep right alignment for container */ } .md-header__source .md-source { justify-content: flex-start; /* Change to flex-start to align text to left */ width: auto; /* Allow element to shrink to content */ min-width: 0; /* Remove minimum width constraint */ } .md-header__source .md-source__icon { order: 2; /* Keep icon on the right */ margin-left: 0.5rem; margin-right: 0; } .md-header__source .md-source__repository { order: 1; /* Keep text on the left */ text-align: left; /* Ensure text is left-aligned */ } h2.doc-heading { font-size: 1rem; font-weight: 700; } /* Add more spacing between API sections */ .doc-heading { margin-top: 1em; border-top: 1px solid var(--md-default-fg-color--lightest); font-size: 0.85rem; } /* Make method names more prominent */ .doc-method, .doc-function { background-color: var(--md-code-bg-color); padding: 0.1em; margin: 0.5em 0; border-radius: 4px; } /* Make class documentation stand out */ .doc-class { padding: 1em; margin: 1em 0; border-left: 4px solid var(--md-primary-fg-color); background-color: var(--md-code-bg-color); } /* Style for type labels */ .doc-label { font-size: 0.8em; padding: 0.2em 0.6em; border-radius: 4px; background-color: var(--md-code-bg-color); display: inline-block; margin: 0.2em 0; font-weight: 400; text-transform: none; /* Prevent uppercase transformation */ color: var(--md-code-fg-color); } /* Add indentation and visual cues for nested navigation items */ .md-nav__item .md-nav__item { padding-left: 0.3rem; border-left: 1px solid var(--md-primary-fg-color--light); margin-left: 0.2rem; } /* Add some spacing between items */ .md-nav__item { margin: 0.3em 0; /* Reduced from 0.4em */ } /* Optional: add hover effect */ .md-nav__item .md-nav__item:hover { border-left-color: var(--md-primary-fg-color); } /* Enhance code examples in documentation */ .highlight { background-color: #f8f9fa; border: 1px solid #e9ecef; border-radius: 6px; margin: 1.5em 0; padding: 1em; box-shadow: 0 2px 4px rgba(0, 0, 0, 0.05); } /* Dark mode adjustments */ [data-md-color-scheme="slate"] .highlight { background-color: #2b2b2b; border-color: #404040; } /* Add subtle left border for visual interest */ .highlight pre { margin: 0; } /* Ensure code is readable */ .highlight code { font-family: "SFMono-Regular", Consolas, "Liberation Mono", Menlo, monospace; font-size: 0.75em; } .highlight .linenos { font-size: 0.75em; } /* Copy button styling */ .highlight .md-clipboard { color: var(--md-default-fg-color--lighter); } .highlight .md-clipboard:hover { color: var(--md-accent-fg-color); } ``` -------------------------------------------------------------------------------- /docs/docs/tutorials/yahoo_finance_react/index.md: -------------------------------------------------------------------------------- ```markdown # Financial Analysis with DSPy ReAct and Yahoo Finance News This tutorial shows how to build a financial analysis agent using DSPy ReAct with [LangChain's Yahoo Finance News tool](https://python.langchain.com/docs/integrations/tools/yahoo_finance_news/) for real-time market analysis. 
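Before wiring the tool into an agent, it can help to see what the underlying LangChain tool returns on its own. The snippet below is a minimal standalone check, assuming `langchain-community` and `yfinance` are already installed (see the Setup section below); the tool takes a ticker symbol and returns recent news as plain text.

```python
from langchain_community.tools.yahoo_finance_news import YahooFinanceNewsTool

# The tool accepts a company ticker and returns recent Yahoo Finance news items.
news_tool = YahooFinanceNewsTool()
print(news_tool.invoke("AAPL"))
```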
## What You'll Build A financial agent that fetches news, analyzes sentiment, and provides investment insights. ## Setup ```bash pip install dspy langchain langchain-community yfinance ``` ## Step 1: Convert LangChain Tool to DSPy ```python import dspy from langchain_community.tools.yahoo_finance_news import YahooFinanceNewsTool from dspy.adapters.types.tool import Tool import json import yfinance as yf # Configure DSPy lm = dspy.LM(model='openai/gpt-4o-mini') dspy.configure(lm=lm) # Convert LangChain Yahoo Finance tool to DSPy yahoo_finance_tool = YahooFinanceNewsTool() finance_news_tool = Tool.from_langchain(yahoo_finance_tool) ``` ## Step 2: Create Supporting Financial Tools ```python def get_stock_price(ticker: str) -> str: """Get current stock price and basic info.""" try: stock = yf.Ticker(ticker) info = stock.info hist = stock.history(period="1d") if hist.empty: return f"Could not retrieve data for {ticker}" current_price = hist['Close'].iloc[-1] prev_close = info.get('previousClose', current_price) change_pct = ((current_price - prev_close) / prev_close * 100) if prev_close else 0 result = { "ticker": ticker, "price": round(current_price, 2), "change_percent": round(change_pct, 2), "company": info.get('longName', ticker) } return json.dumps(result) except Exception as e: return f"Error: {str(e)}" def compare_stocks(tickers: str) -> str: """Compare multiple stocks (comma-separated).""" try: ticker_list = [t.strip().upper() for t in tickers.split(',')] comparison = [] for ticker in ticker_list: stock = yf.Ticker(ticker) info = stock.info hist = stock.history(period="1d") if not hist.empty: current_price = hist['Close'].iloc[-1] prev_close = info.get('previousClose', current_price) change_pct = ((current_price - prev_close) / prev_close * 100) if prev_close else 0 comparison.append({ "ticker": ticker, "price": round(current_price, 2), "change_percent": round(change_pct, 2) }) return json.dumps(comparison) except Exception as e: return f"Error: {str(e)}" ``` ## Step 3: Build the Financial ReAct Agent ```python class FinancialAnalysisAgent(dspy.Module): """ReAct agent for financial analysis using Yahoo Finance data.""" def __init__(self): super().__init__() # Combine all tools self.tools = [ finance_news_tool, # LangChain Yahoo Finance News get_stock_price, compare_stocks ] # Initialize ReAct self.react = dspy.ReAct( signature="financial_query -> analysis_response", tools=self.tools, max_iters=6 ) def forward(self, financial_query: str): return self.react(financial_query=financial_query) ``` ## Step 4: Run Financial Analysis ```python def run_financial_demo(): """Demo of the financial analysis agent.""" # Initialize agent agent = FinancialAnalysisAgent() # Example queries queries = [ "What's the latest news about Apple (AAPL) and how might it affect the stock price?", "Compare AAPL, GOOGL, and MSFT performance", "Find recent Tesla news and analyze sentiment" ] for query in queries: print(f"Query: {query}") response = agent(financial_query=query) print(f"Analysis: {response.analysis_response}") print("-" * 50) # Run the demo if __name__ == "__main__": run_financial_demo() ``` ## Example Output When you run the agent with a query like "What's the latest news about Apple?", it will: 1. Use the Yahoo Finance News tool to fetch recent Apple news 2. Get current stock price data 3. 
Analyze the information and provide insights **Sample Response:** ``` Analysis: Given the current price of Apple (AAPL) at $196.58 and the slight increase of 0.48%, it appears that the stock is performing steadily in the market. However, the inability to access the latest news means that any significant developments that could influence investor sentiment and stock price are unknown. Investors should keep an eye on upcoming announcements or market trends that could impact Apple's performance, especially in comparison to other tech stocks like Microsoft (MSFT), which is also showing a positive trend. ``` ## Key Benefits - **Tool Integration**: Seamlessly combine LangChain tools with DSPy ReAct - **Real-time Data**: Access current market data and news - **Extensible**: Easy to add more financial analysis tools - **Intelligent Reasoning**: ReAct framework provides step-by-step analysis This tutorial shows how DSPy's ReAct framework works with LangChain's financial tools to create intelligent market analysis agents. ``` -------------------------------------------------------------------------------- /dspy/teleprompt/bootstrap_trace.py: -------------------------------------------------------------------------------- ```python import logging from dataclasses import dataclass from types import MethodType from typing import Any, Callable, TypedDict import dspy from dspy.evaluate.evaluate import Evaluate from dspy.primitives.example import Example from dspy.primitives.module import Module from dspy.primitives.prediction import Prediction from dspy.utils.exceptions import AdapterParseError logger = logging.getLogger(__name__) @dataclass class FailedPrediction: completion_text: str format_reward: float | None = None class TraceData(TypedDict): example_ind: int example: Example prediction: Prediction trace: list[tuple[Any, dict[str, Any], Prediction]] score: float | None def bootstrap_trace_data( program: Module, dataset: list[Example], metric: Callable | None = None, num_threads: int | None = None, raise_on_error=True, capture_failed_parses=False, failure_score: float = 0, format_failure_score: float = -1, log_format_failures: bool = False, callback_metadata: dict[str, Any] | None = None, ) -> list[TraceData]: # Return a list of dicts with the following keys: example_ind, example, prediction, trace, and score # (if metric != None) evaluator = Evaluate( devset=dataset, num_threads=num_threads, display_progress=True, provide_traceback=False, # TODO(check with team) max_errors=len(dataset) * 10, # TODO(check with team) failure_score=failure_score, ) def wrapped_metric(example, prediction, trace=None): prediction, _ = prediction if isinstance(prediction, FailedPrediction): return prediction.format_reward or format_failure_score return metric(example, prediction, trace) if metric else True # Use `object.__getattribute__` to bypass the custom hook `Module.__getattribute__` so that we avoid # the warning that `forward` is not accessed through `__call__`. 
original_forward = object.__getattribute__(program, "forward") def patched_forward(program_to_use: Module, **kwargs): with dspy.context(trace=[]): try: return original_forward(**kwargs), dspy.settings.trace.copy() except AdapterParseError as e: completion_str = e.lm_response parsed_result = e.parsed_result failed_signature = e.signature failed_inputs = kwargs present = list(parsed_result.keys()) if parsed_result else None expected = list(failed_signature.output_fields.keys()) found_pred = None for pred in program_to_use.predictors(): if pred.signature == failed_signature: found_pred = pred break if found_pred is None: raise ValueError(f"Failed to find the predictor for the failed signature: {failed_signature}") trace = dspy.settings.trace.copy() # Trace is Tuple[signature, inputs, prediction outputs] if present: failed_pred = FailedPrediction( completion_text=completion_str, format_reward=format_failure_score + (failure_score - format_failure_score) * (present / expected), ) else: failed_pred = FailedPrediction(completion_text=completion_str, format_reward=format_failure_score) trace.append( ( found_pred, failed_inputs, failed_pred, ) ) if log_format_failures: logging.warning( "Failed to parse output for example. This is likely due to the LLM response not following " "the adapter's formatting." ) return failed_pred, trace program.forward = MethodType(patched_forward, program) try: results = evaluator( program, metric=wrapped_metric, callback_metadata=callback_metadata, ).results finally: program.forward = original_forward data = [] for example_ind, (example, prediction, score) in enumerate(results): try: prediction, trace = prediction except ValueError as ve: # TODO(GRPO Team): Often during GRPO bootstrapping, the LLM response does not follow dspy formatting. This # leads to a value error. To reproduce this issue, try Qwen/Qwen2.5-Coder-0.5B-Instruct with MATH dataset. # Proposal(Lakshya): We should capture the incorrectly-formatted LLM response, and store it in the trace, # and pass it to in the GRPO group with a high-negative user-configurable score. logger.warning( "Failed to unpack prediction and trace. This is likely due to the LLM response not following " "dspy formatting." ) if raise_on_error: raise ve else: continue data_dict = {"example": example, "prediction": prediction, "trace": trace, "example_ind": example_ind} if metric: data_dict["score"] = score data.append(data_dict) return data ``` -------------------------------------------------------------------------------- /dspy/predict/code_act.py: -------------------------------------------------------------------------------- ```python import inspect import logging from typing import Callable import dspy from dspy.adapters.types.tool import Tool from dspy.predict.program_of_thought import ProgramOfThought from dspy.predict.react import ReAct from dspy.primitives.python_interpreter import PythonInterpreter from dspy.signatures.signature import Signature, ensure_signature logger = logging.getLogger(__name__) class CodeAct(ReAct, ProgramOfThought): """ CodeAct is a module that utilizes the Code Interpreter and predefined tools to solve the problem. """ def __init__(self, signature: str | type[Signature], tools: list[Callable], max_iters: int = 5, interpreter: PythonInterpreter | None = None): """ Initializes the CodeAct class with the specified model, temperature, and max tokens. Args: signature (Union[str, Type[Signature]]): The signature of the module. tools (list[Callable]): The tool callables to be used. 
CodeAct only accepts functions and not callable objects. max_iters (int): The maximum number of iterations to generate the answer. interpreter: PythonInterpreter instance to use. If None, a new one is instantiated. Example: ```python from dspy.predict import CodeAct def factorial(n): if n == 1: return 1 return n * factorial(n-1) act = CodeAct("n->factorial", tools=[factorial]) act(n=5) # 120 ``` """ self.signature = ensure_signature(signature) self.max_iters = max_iters self.history = [] tools = [t if isinstance(t, Tool) else Tool(t) for t in tools] if any( not inspect.isfunction(tool.func) for tool in tools ): raise ValueError("CodeAct only accepts functions and not callable objects.") tools = {tool.name: tool for tool in tools} instructions = self._build_instructions(self.signature, tools) codeact_signature = ( dspy.Signature({**self.signature.input_fields}, "\n".join(instructions)) .append("trajectory", dspy.InputField(), type_=str) .append("generated_code", dspy.OutputField(desc="Python code that when executed, produces output relevant to answering the question"), type_=str) .append("finished", dspy.OutputField(desc="a boolean flag to determine if the process is done"), type_=bool) ) extract_signature = dspy.Signature( {**self.signature.input_fields, **self.signature.output_fields}, self.signature.instructions, ).append("trajectory", dspy.InputField(), type_=str) self.tools: dict[str, Tool] = tools self.codeact = dspy.Predict(codeact_signature) self.extractor = dspy.ChainOfThought(extract_signature) # It will raises exception when dspy cannot find available deno instance by now. self.interpreter = interpreter or PythonInterpreter() def _build_instructions(self, signature, tools): instructions = [f"{signature.instructions}\n"] if signature.instructions else [] inputs = ", ".join([f"`{k}`" for k in signature.input_fields.keys()]) outputs = ", ".join([f"`{k}`" for k in signature.output_fields.keys()]) instructions.append( f"You are an intelligent agent. For each episode, you will receive the fields {inputs} as input.\n" f"Your goal is to generate executable Python code that collects any necessary information for producing {outputs}.\n" "For each iteration, you will generate a code snippet that either solves the task or progresses towards the solution.\n" "Ensure any output you wish to extract from the code is printed to the console. 
The code should be enclosed in a fenced code block.\n" f"When all information for producing the outputs ({outputs}) are available to be extracted, mark `finished=True` besides the final Python code.\n" "You have access to the Python Standard Library and the following functions:" ) for idx, tool in enumerate(tools.values()): instructions.append(f"({idx + 1}) {tool}") return instructions def forward(self, **kwargs): # Define the tool functions in the interpreter for tool in self.tools.values(): self.interpreter(inspect.getsource(tool.func)) trajectory = {} max_iters = kwargs.pop("max_iters", self.max_iters) for idx in range(max_iters): code_data = self.codeact(trajectory=trajectory, **kwargs) output = None code, error = self._parse_code(code_data) if error: trajectory[f"observation_{idx}"] = f"Failed to parse the generated code: {error}" continue trajectory[f"generated_code_{idx}"] = code output, error = self._execute_code(code) if not error: trajectory[f"code_output_{idx}"] = output else: trajectory[f"observation_{idx}"] = f"Failed to execute the generated code: {error}" if code_data.finished: break extract = self._call_with_potential_trajectory_truncation(self.extractor, trajectory, **kwargs) self.interpreter.shutdown() return dspy.Prediction(trajectory=trajectory, **extract) ``` -------------------------------------------------------------------------------- /tests/teleprompt/test_bootstrap_trace.py: -------------------------------------------------------------------------------- ```python from typing import Any from unittest import mock from litellm import Choices, Message, ModelResponse import dspy from dspy.primitives.example import Example from dspy.teleprompt.bootstrap_trace import FailedPrediction, bootstrap_trace_data def test_bootstrap_trace_data(): """Test bootstrap_trace_data function with single dspy.Predict program.""" # Define signature for string -> int conversion class StringToIntSignature(dspy.Signature): """Convert a string number to integer""" text: str = dspy.InputField() number: int = dspy.OutputField() # Create program with single dspy.Predict program = dspy.Predict(StringToIntSignature) # Create dummy dataset of size 5 dataset = [ Example(text="one", number=1).with_inputs("text"), Example(text="two", number=2).with_inputs("text"), Example(text="three", number=3).with_inputs("text"), Example(text="four", number=4).with_inputs("text"), Example(text="five", number=5).with_inputs("text"), ] # Define exact match metric def exact_match_metric(example, prediction, trace=None): return example.number == prediction.number # Configure dspy dspy.configure(lm=dspy.LM(model="openai/gpt-4o-mini", cache=False), adapter=dspy.JSONAdapter()) # Mock litellm completion responses # 4 successful responses and 1 that will trigger AdapterParseError successful_responses = [ ModelResponse( choices=[Choices(message=Message(content='```json\n{"number": 1}\n```'))], model="openai/gpt-4o-mini", ), ModelResponse( choices=[Choices(message=Message(content='```json\n{"number": 2}\n```'))], model="openai/gpt-4o-mini", ), ModelResponse( choices=[Choices(message=Message(content='```json\n{"number": 3}\n```'))], model="openai/gpt-4o-mini", ), ModelResponse( choices=[Choices(message=Message(content='```json\n{"number": 4}\n```'))], model="openai/gpt-4o-mini", ), ] # Create a side effect that will trigger AdapterParseError on the 3rd call (index 2) def completion_side_effect(*args, **kwargs): call_count = completion_side_effect.call_count completion_side_effect.call_count += 1 if call_count == 5: # Third 
call (0-indexed) # Return malformed response that will cause AdapterParseError return ModelResponse( choices=[Choices(message=Message(content="This is an invalid JSON!"))], model="openai/gpt-4o-mini", ) else: return successful_responses[call_count] completion_side_effect.call_count = 0 with mock.patch("litellm.completion", side_effect=completion_side_effect): # Call bootstrap_trace_data results = bootstrap_trace_data( program=program, dataset=dataset, metric=exact_match_metric, raise_on_error=False, capture_failed_parses=True, ) # Verify results assert len(results) == 5, f"Expected 5 results, got {len(results)}" # Count successful and failed predictions successful_count = 0 failed_count = 0 for result in results: assert "example" in result assert "prediction" in result assert "trace" in result assert "example_ind" in result assert "score" in result if isinstance(result["prediction"], FailedPrediction): failed_count += 1 # Verify failed prediction structure assert hasattr(result["prediction"], "completion_text") assert hasattr(result["prediction"], "format_reward") assert result["prediction"].completion_text == "This is an invalid JSON!" else: successful_count += 1 # Verify successful prediction structure assert hasattr(result["prediction"], "number") # Verify we have the expected number of successful and failed bootstrapping assert successful_count == 4, f"Expected 4 successful predictions, got {successful_count}" assert failed_count == 1, f"Expected 1 failed prediction, got {failed_count}" # Verify that traces are present for result in results: assert len(result["trace"]) > 0, "Trace should not be empty" # Each trace entry should be a tuple of (predictor, inputs, prediction) for trace_entry in result["trace"]: assert len(trace_entry) == 3, "Trace entry should have 3 elements" def test_bootstrap_trace_data_passes_callback_metadata(monkeypatch): from dspy.teleprompt import bootstrap_trace as bootstrap_trace_module class DummyProgram(dspy.Module): def forward(self, **kwargs): # pragma: no cover - stub forward return dspy.Prediction() captured_metadata: dict[str, Any] = {} class DummyEvaluate: def __init__(self, *args, **kwargs): pass def __call__(self, *args, callback_metadata=None, **kwargs): captured_metadata["value"] = callback_metadata class _Result: results: list[Any] = [] return _Result() monkeypatch.setattr(bootstrap_trace_module, "Evaluate", DummyEvaluate) bootstrap_trace_module.bootstrap_trace_data( program=DummyProgram(), dataset=[], callback_metadata={"disable_logging": True}, ) assert captured_metadata["value"] == {"disable_logging": True} ``` -------------------------------------------------------------------------------- /tests/reliability/utils.py: -------------------------------------------------------------------------------- ```python import os from contextlib import contextmanager from functools import lru_cache, wraps from typing import Any, Dict, List, Union import pydantic import pytest import yaml import dspy JUDGE_MODEL_NAME = "judge" def assert_program_output_correct( program_input: Any, program_output: Any, grading_guidelines: Union[str, list[str]], ): """ With the help of an LLM judge, assert that the specified output of a DSPy program is correct, according to the specified grading guidelines. Args: program_input: The input to a DSPy program. program_output: The output from the DSPy program. grading_guidelines: The grading guidelines for judging the correctness of the program output. 
""" if not isinstance(grading_guidelines, list): grading_guidelines = [grading_guidelines] with judge_dspy_configuration(): for guideline_entry in grading_guidelines: judge_response = _get_judge_program()( program_input=str(program_input), program_output=str(program_output), guidelines=guideline_entry, ).judge_response assert judge_response.correct, f"Output: {program_output}. Reason incorrect: {judge_response.justification}" def known_failing_models(models: list[str]): """ Decorator to allow specific test cases to fail for certain models. This is useful when a model is known to be unable to perform a specific task (e.g. output formatting with complex schemas) to the required standard. Args: models: List of model names for which the test case is allowed to fail. """ def decorator(test_func): test_func._known_failing_models = models @wraps(test_func) def wrapper(*args, **kwargs): return test_func(*args, **kwargs) return wrapper return decorator @contextmanager def judge_dspy_configuration(**extra_judge_config): """ Context manager to temporarily configure the DSPy to use the the judge model from `reliability_conf.yaml`. Args: extra_judge_config: Extra configuration parameters to apply on top of the judge model configuration from `reliability_conf.yaml`. """ module_dir = os.path.dirname(os.path.abspath(__file__)) conf_path = os.path.join(module_dir, "reliability_conf.yaml") reliability_conf = parse_reliability_conf_yaml(conf_path) adapter = get_adapter(reliability_conf) judge_params = reliability_conf.models.get(JUDGE_MODEL_NAME) if judge_params is None: raise ValueError(f"No LiteLLM configuration found for judge model: {JUDGE_MODEL_NAME}") with dspy.settings.context(lm=dspy.LM(**judge_params, **extra_judge_config), adapter=adapter): yield def _get_judge_program(): class JudgeResponse(pydantic.BaseModel): correct: bool = pydantic.Field("Whether or not the judge output is correct") justification: str = pydantic.Field("Justification for the correctness of the judge output") class JudgeSignature(dspy.Signature): """ Given the input and output of an AI program, determine whether the output is correct, according to the provided guidelines. Only consider the guidelines when determining correctness. Outputs often look like Python objects. Analyze these objects very carefully to make sure you don't miss certain fields or values. """ program_input: str = dspy.InputField(description="The input to an AI program / model that is being judged") program_output: str = dspy.InputField( description="The resulting output from the AI program / model that is being judged" ) guidelines: str = dspy.InputField( description=( "Grading guidelines for judging the correctness of the program output." " If the output satisfies the guidelines, the judge will return correct=True." 
) ) judge_response: JudgeResponse = dspy.OutputField() return dspy.Predict(JudgeSignature) class ReliabilityTestConf(pydantic.BaseModel): adapter: str models: dict[str, Any] @lru_cache(maxsize=None) def parse_reliability_conf_yaml(conf_file_path: str) -> ReliabilityTestConf: try: with open(conf_file_path, "r") as file: conf = yaml.safe_load(file) model_dict = {} for conf_entry in conf["model_list"]: model_name = conf_entry.get("model_name") if model_name is None: raise ValueError("Model name missing in reliability_conf.yaml") litellm_params = conf_entry.get("litellm_params") if litellm_params is not None: model_dict[model_name] = litellm_params adapter = conf.get("adapter") if adapter is None: raise ValueError("No adapter configuration found in reliability_conf.yaml") return ReliabilityTestConf(adapter=adapter, models=model_dict) except Exception as e: raise ValueError(f"Error parsing LiteLLM configuration file: {conf_file_path}") from e def get_adapter(reliability_conf: ReliabilityTestConf) -> dspy.Adapter: if reliability_conf.adapter.lower() == "chat": return dspy.ChatAdapter() elif reliability_conf.adapter.lower() == "json": return dspy.JSONAdapter() else: raise ValueError(f"Unknown adapter specification '{reliability_conf.adapter}' in reliability_conf.yaml") ``` -------------------------------------------------------------------------------- /dspy/dsp/utils/utils.py: -------------------------------------------------------------------------------- ```python import copy import datetime import itertools import os from collections import defaultdict import tqdm def print_message(*s, condition=True, pad=False, sep=None): s = " ".join([str(x) for x in s]) msg = "[{}] {}".format(datetime.datetime.now().strftime("%b %d, %H:%M:%S"), s) if condition: msg = msg if not pad else f"\n{msg}\n" print(msg, flush=True, sep=sep) return msg def timestamp(daydir=False): format_str = f"%Y-%m{'/' if daydir else '-'}%d{'/' if daydir else '_'}%H.%M.%S" result = datetime.datetime.now().strftime(format_str) return result def file_tqdm(file): print(f"#> Reading {file.name}") with tqdm.tqdm( total=os.path.getsize(file.name) / 1024.0 / 1024.0, unit="MiB", ) as pbar: for line in file: yield line pbar.update(len(line) / 1024.0 / 1024.0) pbar.close() def create_directory(path): if os.path.exists(path): print("\n") print_message("#> Note: Output directory", path, "already exists\n\n") else: print("\n") print_message("#> Creating directory", path, "\n\n") os.makedirs(path) def deduplicate(seq: list[str]) -> list[str]: """ From Raymond Hettinger https://twitter.com/raymondh/status/944125570534621185 Since Python 3.6 Dict are ordered Benchmark: https://gist.github.com/peterbe/67b9e40af60a1d5bcb1cfb4b2937b088 """ return list(dict.fromkeys(seq)) def batch(group, bsize, provide_offset=False): offset = 0 while offset < len(group): batch_data = group[offset : offset + bsize] yield ((offset, batch_data) if provide_offset else batch_data) offset += len(batch_data) return class dotdict(dict): # noqa: N801 def __getattr__(self, key): if key.startswith("__") and key.endswith("__"): return super().__getattr__(key) try: return self[key] except KeyError: raise AttributeError(f"'{type(self).__name__}' object has no attribute '{key}'") def __setattr__(self, key, value): if key.startswith("__") and key.endswith("__"): super().__setattr__(key, value) else: self[key] = value def __delattr__(self, key): if key.startswith("__") and key.endswith("__"): super().__delattr__(key) else: del self[key] def __deepcopy__(self, memo): # Use the 
default dict copying method to avoid infinite recursion. return dotdict(copy.deepcopy(dict(self), memo)) class dotdict_lax(dict): # noqa: N801 __getattr__ = dict.get __setattr__ = dict.__setitem__ __delattr__ = dict.__delitem__ def flatten(data_list): result = [] for child_list in data_list: result += child_list return result def zipstar(data_list, lazy=False): """ A much faster A, B, C = zip(*[(a, b, c), (a, b, c), ...]) May return lists or tuples. """ if len(data_list) == 0: return data_list width = len(data_list[0]) if width < 100: return [[elem[idx] for elem in data_list] for idx in range(width)] zipped_data = zip(*data_list, strict=False) return zipped_data if lazy else list(zipped_data) def zip_first(list1, list2): length = len(list1) if type(list1) in [tuple, list] else None zipped_data = list(zip(list1, list2, strict=False)) assert length in [None, len(zipped_data)], "zip_first() failure: length differs!" return zipped_data def int_or_float(val): if "." in val: return float(val) return int(val) def groupby_first_item(lst): groups = defaultdict(list) for first, *rest in lst: rest = rest[0] if len(rest) == 1 else rest groups[first].append(rest) return groups def process_grouped_by_first_item(lst): """ Requires items in list to already be grouped by first item. """ groups = defaultdict(list) started = False last_group = None for first, *rest in lst: rest = rest[0] if len(rest) == 1 else rest if started and first != last_group: yield (last_group, groups[last_group]) assert first not in groups, f"{first} seen earlier --- violates precondition." groups[first].append(rest) last_group = first started = True return groups def grouper(iterable, n, fillvalue=None): """ Collect data into fixed-length chunks or blocks Example: grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx" Source: https://docs.python.org/3/library/itertools.html#itertools-recipes """ args = [iter(iterable)] * n return itertools.zip_longest(*args, fillvalue=fillvalue) def lengths2offsets(lengths): offset = 0 for length in lengths: yield (offset, offset + length) offset += length return # see https://stackoverflow.com/a/45187287 class NullContextManager: def __init__(self, dummy_resource=None): self.dummy_resource = dummy_resource def __enter__(self): return self.dummy_resource def __exit__(self, *args): pass def load_batch_backgrounds(args, qids): if args.qid2backgrounds is None: return None qbackgrounds = [] for qid in qids: back = args.qid2backgrounds[qid] if len(back) and isinstance(back[0], int): x = [args.collection[pid] for pid in back] else: x = [args.collectionX.get(pid, "") for pid in back] x = " [SEP] ".join(x) qbackgrounds.append(x) return qbackgrounds ``` -------------------------------------------------------------------------------- /docs/docs/deep-dive/data-handling/loading-custom-data.md: -------------------------------------------------------------------------------- ```markdown --- sidebar_position: 3 --- !!! warning "This page is outdated and may not be fully accurate in DSPy 2.5" # Creating a Custom Dataset We've seen how to work with with `Example` objects and use the `HotPotQA` class to load the HuggingFace HotPotQA dataset as a list of `Example` objects. But in production, such structured datasets are rare. Instead, you'll find yourself working on a custom dataset and might question: how do I create my own dataset or what format should it be? In DSPy, your dataset is a list of `Examples`, which we can accomplish in two ways: * **Recommended: The Pythonic Way:** Using native python utility and logic. 
* **Advanced: Using DSPy's `Dataset` class**

## Recommended: The Pythonic Way

To create a list of `Example` objects, we can simply load data from the source and formulate it into a Python list. Let's load an example CSV `sample.csv` that contains 3 fields (**context**, **question**, and **answer**) via Pandas. From there, we can construct our data list.

```python
import pandas as pd

df = pd.read_csv("sample.csv")
print(df.shape)
```

**Output:**

```text
(1000, 3)
```

```python
dataset = []

for context, question, answer in df.values:
    dataset.append(dspy.Example(context=context, question=question, answer=answer).with_inputs("context", "question"))

print(dataset[:3])
```

**Output:**

```python
[Example({'context': nan, 'question': 'Which is a species of fish? Tope or Rope', 'answer': 'Tope'}) (input_keys={'question', 'context'}), Example({'context': nan, 'question': 'Why can camels survive for long without water?', 'answer': 'Camels use the fat in their humps to keep them filled with energy and hydration for long periods of time.'}) (input_keys={'question', 'context'}), Example({'context': nan, 'question': "Alice's parents have three daughters: Amy, Jessy, and what’s the name of the third daughter?", 'answer': 'The name of the third daughter is Alice'}) (input_keys={'question', 'context'})]
```

While this is fairly simple, let's take a look at how the same loading looks the DSPythonic way, using the `Dataset` class.

## Advanced: Using DSPy's `Dataset` class (Optional)

Let's take advantage of the `Dataset` class we defined in the previous article to accomplish the preprocessing:

* Load the data from the CSV into a DataFrame.
* Split the data into train, dev, and test splits.
* Populate the `_train`, `_dev`, and `_test` class attributes. Note that each of these attributes should be a list of dictionaries, or an iterator over mappings (like a HuggingFace Dataset), for the base class to work with them.

This is all done through the `__init__` method, which is the only method we have to implement.

```python
import pandas as pd
from dspy.datasets.dataset import Dataset

class CSVDataset(Dataset):
    def __init__(self, file_path, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        df = pd.read_csv(file_path)
        self._train = df.iloc[0:700].to_dict(orient='records')
        self._dev = df.iloc[700:].to_dict(orient='records')

dataset = CSVDataset("sample.csv")
print(dataset.train[:3])
```

**Output:**

```text
[Example({'context': nan, 'question': 'Which is a species of fish? Tope or Rope', 'answer': 'Tope'}) (input_keys={'question', 'context'}), Example({'context': nan, 'question': 'Why can camels survive for long without water?', 'answer': 'Camels use the fat in their humps to keep them filled with energy and hydration for long periods of time.'}) (input_keys={'question', 'context'}), Example({'context': nan, 'question': "Alice's parents have three daughters: Amy, Jessy, and what’s the name of the third daughter?", 'answer': 'The name of the third daughter is Alice'}) (input_keys={'question', 'context'})]
```

Let's understand the code step by step:

* It inherits the base `Dataset` class from DSPy, which provides all the useful data loading and processing functionality.
* We load the CSV data into a DataFrame.
* We take the **train** split, i.e. the first 700 rows of the DataFrame, convert it to a list of dicts with `to_dict(orient='records')`, and assign it to `self._train`.
* We take the **dev** split, i.e. the remaining 300 rows, convert it to a list of dicts in the same way, and assign it to `self._dev`.
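The same pattern extends to other sources, with only the loading step changing. As a rough sketch (not part of the DSPy docs' sample data; the `JSONLDataset` name and `data.jsonl` filename are made up for illustration), a JSON Lines file with the same three fields could be wrapped like this:

```python
import json

from dspy.datasets.dataset import Dataset

class JSONLDataset(Dataset):
    def __init__(self, file_path, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        # Each line of the file is one JSON record with context/question/answer keys.
        with open(file_path) as f:
            records = [json.loads(line) for line in f]

        # As with the CSV version, the splits are simply lists of dicts.
        self._train = records[:700]
        self._dev = records[700:]

dataset = JSONLDataset("data.jsonl")
```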
Using the Dataset base class now makes loading custom datasets incredibly easy and avoids having to write all that boilerplate code ourselves for every new dataset. !!! caution We did not populate `_test` attribute in the above code, which is fine and won't cause any unnecessary error as such. However it'll give you an error if you try to access the test split. ```python dataset.test[:5] ``` **** ```text --------------------------------------------------------------------------- AttributeError Traceback (most recent call last) <ipython-input-59-5202f6de3c7b> in <cell line: 1>() ----> 1 dataset.test[:5] /usr/local/lib/python3.10/dist-packages/dspy/datasets/dataset.py in test(self) 51 def test(self): 52 if not hasattr(self, '_test_'): ---> 53 self._test_ = self._shuffle_and_sample('test', self._test, self.test_size, self.test_seed) 54 55 return self._test_ AttributeError: 'CSVDataset' object has no attribute '_test' ``` To prevent that you'll just need to make sure `_test` is not `None` and populated with the appropriate data. You can override the methods in `Dataset` class to customize your class even more. In summary, the Dataset base class provides a simplistic way to load and preprocess custom datasets with minimal code! ``` -------------------------------------------------------------------------------- /dspy/predict/avatar/avatar.py: -------------------------------------------------------------------------------- ```python from copy import deepcopy from pydantic.fields import FieldInfo import dspy from dspy.predict.avatar.models import Action, ActionOutput, Tool from dspy.predict.avatar.signatures import Actor from dspy.signatures.signature import ensure_signature def get_number_with_suffix(number: int) -> str: if number == 1: return "1st" elif number == 2: return "2nd" elif number == 3: return "3rd" else: return f"{number}th" class Avatar(dspy.Module): def __init__( self, signature, tools, max_iters=3, verbose=False, ): self.signature = ensure_signature(signature) self.input_fields = self.signature.input_fields self.output_fields = self.signature.output_fields self.finish_tool = Tool( tool=None, name="Finish", desc="returns the final output and finishes the task", ) self.tools = tools + [self.finish_tool] self.actor_signature = Actor for field in list(self.input_fields.keys())[::-1]: self.actor_signature = self.actor_signature.append( field, self._get_field(self.input_fields[field]), type_=self.input_fields[field].annotation, ) self.verbose = verbose self.max_iters = max_iters self.actor = dspy.TypedPredictor(self.actor_signature) self.actor_clone = deepcopy(self.actor) def _get_field(self, field_info: FieldInfo): if field_info.json_schema_extra["__dspy_field_type"] == "input": return dspy.InputField( prefix=field_info.json_schema_extra["prefix"], desc=field_info.json_schema_extra["desc"], format=field_info.json_schema_extra["format"] if "format" in field_info.json_schema_extra else None, ) elif field_info.json_schema_extra["__dspy_field_type"] == "output": return dspy.OutputField( prefix=field_info.json_schema_extra["prefix"], desc=field_info.json_schema_extra["desc"], format=field_info.json_schema_extra["format"] if "format" in field_info.json_schema_extra else None, ) else: raise ValueError(f"Unknown field type: {field_info.json_schema_extra['__dspy_field_type']}") def _update_signature(self, idx: int, omit_action: bool = False): self.actor.signature = self.actor.signature.with_updated_fields( f"action_{idx}", Action, __dspy_field_type="input" ) self.actor.signature = 
self.actor.signature.append( f"result_{idx}", dspy.InputField( prefix=f"Result {idx}:", desc=f"{get_number_with_suffix(idx)} result", type_=str, ), ) if omit_action: for field in list(self.output_fields.keys()): self.actor.signature = self.actor.signature.append( field, self._get_field(self.output_fields[field]), type_=self.output_fields[field].annotation, ) else: self.actor.signature = self.actor.signature.append( f"action_{idx+1}", dspy.OutputField( prefix=f"Action {idx+1}:", desc=f"{get_number_with_suffix(idx+1)} action to taken", ), ) self.actor.signature = self.actor.signature.with_updated_fields( f"action_{idx+1}", Action, ) def _call_tool(self, tool_name: str, tool_input_query: str) -> str: for tool in self.tools: if tool.name == tool_name: return tool.tool.run(tool_input_query) def forward(self, **kwargs): if self.verbose: print("Starting the task...") args = { "goal": self.signature.__doc__, "tools": [tool.name for tool in self.tools], } for key in self.input_fields.keys(): if key in kwargs: args[key] = kwargs[key] idx = 1 tool_name = None action_results: list[ActionOutput] = [] max_iters = None if "max_iters" not in kwargs else kwargs["max_iters"] while tool_name != "Finish" and (max_iters > 0 if max_iters else True): actor_output = self.actor(**args) action = getattr(actor_output, f"action_{idx}") tool_name = action.tool_name tool_input_query = action.tool_input_query if self.verbose: print(f"Action {idx}: {tool_name} ({tool_input_query})") if tool_name != "Finish": tool_output = self._call_tool(tool_name, tool_input_query) action_results.append( ActionOutput(tool_name=tool_name, tool_input_query=tool_input_query, tool_output=tool_output) ) self._update_signature(idx) args[f"action_{idx}"] = action args[f"result_{idx}"] = tool_output else: self._update_signature(idx, omit_action=True) args[f"action_{idx}"] = action args[f"result_{idx}"] = "Gathered all information needed to finish the task." break idx += 1 if max_iters: max_iters -= 1 final_answer = self.actor(**args) self.actor = deepcopy(self.actor_clone) return dspy.Prediction( **{key: getattr(final_answer, key) for key in self.output_fields.keys()}, actions=action_results, ) ``` -------------------------------------------------------------------------------- /dspy/primitives/runner.js: -------------------------------------------------------------------------------- ```javascript // Adapted from "Simon Willison’s TILs" (https://til.simonwillison.net/deno/pyodide-sandbox) import pyodideModule from "npm:pyodide/pyodide.js"; import { readLines } from "https://deno.land/[email protected]/io/mod.ts"; const pyodide = await pyodideModule.loadPyodide(); try { const env_vars = (Deno.args[0] ?? 
"").split(",").filter(Boolean); for (const key of env_vars) { const val = Deno.env.get(key); if (val !== undefined) { pyodide.runPython(` import os os.environ[${JSON.stringify(key)}] = ${JSON.stringify(val)} `); } } } catch (e) { console.error("Error setting environment variables in Pyodide:", e); } for await (const line of readLines(Deno.stdin)) { let input; try { input = JSON.parse(line); } catch (error) { console.log(JSON.stringify({ error: "Invalid JSON input: " + error.message, errorType: "ValueError" })); continue; } if (input.mount_file) { const hostPath = input.mount_file; const virtualPath = input.virtual_path || hostPath; try { const contents = await Deno.readFile(hostPath); const dirs = virtualPath.split('/').slice(1, -1); let cur = ''; for (const d of dirs) { cur += '/' + d; try { pyodide.FS.mkdir(cur); } catch (e) { if (!(e && e.message && e.message.includes('File exists'))) { console.log("[DEBUG] Error creating directory in Pyodide file system:", cur, "|", e.message); } } } pyodide.FS.writeFile(virtualPath, contents); } catch (e) { console.log(JSON.stringify({error: "Failed to mount file: " + e.message})); } continue; } if (input.sync_file) { const virtualPath = input.sync_file; const hostPath = input.host_file || virtualPath; try { const contents = pyodide.FS.readFile(virtualPath); await Deno.writeFile(hostPath, contents); } catch (e) { console.log("[DEBUG] Failed to sync file:", hostPath, "|", e.message); } continue; } // Expecting an object like { "code": "...", ... } if (typeof input !== 'object' || input === null) { console.log(JSON.stringify({ error: "Input is not a JSON object", errorType: "ValueError" })); continue; } // Check for shutdown if (input.shutdown) { break; } const code = input.code || ""; // Wrap execution in a try/catch so we can handle syntax errors, etc. try { await pyodide.loadPackagesFromImports(code); // 1. Temporarily override stdout/stderr so we can capture prints. pyodide.runPython(` import sys import io # Keep references to the old stdout/stderr so we can restore them later old_stdout = sys.stdout old_stderr = sys.stderr # New "file-like" buffers buf_stdout = io.StringIO() buf_stderr = io.StringIO() sys.stdout = buf_stdout sys.stderr = buf_stderr `); // 2. Setup proper exception arguments extractor and FinalAnswer bridge // The idea is borrowed from `smolagents` that uses the exception to simulate non-local exit pyodide.runPython(` import json def last_exception_args(): return json.dumps(sys.last_exc.args) if sys.last_exc else None class FinalAnswer(Exception): pass def final_answer(*args): raise FinalAnswer(*args) `); // 3. Run the user's code asynchronously const result = await pyodide.runPythonAsync(code); // 4. Retrieve captured stdout/stderr const capturedStdout = pyodide.runPython("buf_stdout.getvalue()"); const capturedStderr = pyodide.runPython("buf_stderr.getvalue()"); // 5. Restore original stdout/stderr pyodide.runPython(` sys.stdout = old_stdout sys.stderr = old_stderr `); // 6. Build our output object according to the rules: // - If result is None (or Python "None" => JS null), output all prints // - Else output the result only // Note: `None` in Python becomes `null` in JS. 
let output; if (result === null || result === undefined) { // The final statement was None or no return => deliver printed output // If you want to combine capturedStderr as well, you can append it // But here we'll just do stdout for clarity output = capturedStdout; // If there's something in stderr, you might want to include that or log it // output += capturedStderr; } else { // If the code returned a real value, just return that try { output = result.toJs(); } catch (e) { output = result; } } console.log(JSON.stringify({ output })); } catch (error) { // We have an error => check if it's a SyntaxError or something else // The Python error class name is stored in error.type: https://pyodide.org/en/stable/usage/api/js-api.html#pyodide.ffi.PythonError const errorType = error.type || "Error"; // error.message is mostly blank. const errorMessage = (error.message || "").trim(); // The arguments of the exception are stored in sys.last_exc.args, // which is always helpful but pyodide don't extract them for us. // Use a bridge function to get them. let errorArgs = []; if (errorType !== "SyntaxError") { // Only python exceptions have args. const last_exception_args = pyodide.globals.get("last_exception_args"); // Regarding https://pyodide.org/en/stable/usage/type-conversions.html#type-translations-errors, // we do a additional `json.dumps` and `JSON.parse` on the values, to avoid the possible memory leak. errorArgs = JSON.parse(last_exception_args()) || []; } console.log(JSON.stringify({ error: errorMessage, errorArgs: errorArgs, errorType: errorType })); } } ``` -------------------------------------------------------------------------------- /dspy/streaming/messages.py: -------------------------------------------------------------------------------- ```python import asyncio import concurrent.futures from dataclasses import dataclass from typing import Any from asyncer import syncify from dspy.dsp.utils.settings import settings from dspy.utils.callback import BaseCallback @dataclass class StreamResponse: predict_name: str signature_field_name: str chunk: str is_last_chunk: bool @dataclass class StatusMessage: """Dataclass that wraps a status message for status streaming.""" message: str def sync_send_to_stream(stream, message): """Send message to stream in a sync context, regardless of event loop state.""" async def _send(): await stream.send(message) try: asyncio.get_running_loop() # If we're in an event loop, offload to a new thread with its own event loop def run_in_new_loop(): new_loop = asyncio.new_event_loop() asyncio.set_event_loop(new_loop) try: return new_loop.run_until_complete(_send()) finally: new_loop.close() with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(run_in_new_loop) return future.result() except RuntimeError: # Not in an event loop, safe to use a new event loop in this thread return syncify(_send)() class StatusMessageProvider: """Provides customizable status message streaming for DSPy programs. This class serves as a base for creating custom status message providers. Users can subclass and override its methods to define specific status messages for different stages of program execution, each method must return a string. Example: ```python class MyStatusMessageProvider(StatusMessageProvider): def lm_start_status_message(self, instance, inputs): return f"Calling LM with inputs {inputs}..." def module_end_status_message(self, outputs): return f"Module finished with output: {outputs}!" 
program = dspy.streamify(dspy.Predict("q->a"), status_message_provider=MyStatusMessageProvider()) ``` """ def tool_start_status_message(self, instance: Any, inputs: dict[str, Any]): """Status message before a `dspy.Tool` is called.""" return f"Calling tool {instance.name}..." def tool_end_status_message(self, outputs: Any): """Status message after a `dspy.Tool` is called.""" return "Tool calling finished! Querying the LLM with tool calling results..." def module_start_status_message(self, instance: Any, inputs: dict[str, Any]): """Status message before a `dspy.Module` or `dspy.Predict` is called.""" pass def module_end_status_message(self, outputs: Any): """Status message after a `dspy.Module` or `dspy.Predict` is called.""" pass def lm_start_status_message(self, instance: Any, inputs: dict[str, Any]): """Status message before a `dspy.LM` is called.""" pass def lm_end_status_message(self, outputs: Any): """Status message after a `dspy.LM` is called.""" pass class StatusStreamingCallback(BaseCallback): def __init__(self, status_message_provider: StatusMessageProvider | None = None): self.status_message_provider = status_message_provider or StatusMessageProvider() def on_tool_start( self, call_id: str, instance: Any, inputs: dict[str, Any], ): stream = settings.send_stream if stream is None or instance.name == "finish": return status_message = self.status_message_provider.tool_start_status_message(instance, inputs) if status_message: sync_send_to_stream(stream, StatusMessage(status_message)) def on_tool_end( self, call_id: str, outputs: dict[str, Any] | None, exception: Exception | None = None, ): stream = settings.send_stream if stream is None or outputs == "Completed.": return status_message = self.status_message_provider.tool_end_status_message(outputs) if status_message: sync_send_to_stream(stream, StatusMessage(status_message)) def on_lm_start( self, call_id: str, instance: Any, inputs: dict[str, Any], ): stream = settings.send_stream if stream is None: return status_message = self.status_message_provider.lm_start_status_message(instance, inputs) if status_message: sync_send_to_stream(stream, StatusMessage(status_message)) def on_lm_end( self, call_id: str, outputs: dict[str, Any] | None, exception: Exception | None = None, ): stream = settings.send_stream if stream is None: return status_message = self.status_message_provider.lm_end_status_message(outputs) if status_message: sync_send_to_stream(stream, StatusMessage(status_message)) def on_module_start( self, call_id: str, instance: Any, inputs: dict[str, Any], ): stream = settings.send_stream if stream is None: return status_message = self.status_message_provider.module_start_status_message(instance, inputs) if status_message: sync_send_to_stream(stream, StatusMessage(status_message)) def on_module_end( self, call_id: str, outputs: dict[str, Any] | None, exception: Exception | None = None, ): stream = settings.send_stream if stream is None: return status_message = self.status_message_provider.module_end_status_message(outputs) if status_message: sync_send_to_stream(stream, StatusMessage(status_message)) ``` -------------------------------------------------------------------------------- /dspy/retrievers/weaviate_rm.py: -------------------------------------------------------------------------------- ```python import dspy from dspy.dsp.utils import dotdict from dspy.primitives.prediction import Prediction try: from uuid import uuid4 import weaviate from weaviate.util import get_valid_uuid except ImportError as err: raise ImportError( 
"The 'weaviate' extra is required to use WeaviateRM. Install it with `pip install dspy-ai[weaviate]`", ) from err class WeaviateRM(dspy.Retrieve): """A retrieval module that uses Weaviate to return the top passages for a given query. Assumes that a Weaviate collection has been created and populated with the following payload: - content: The text of the passage Args: weaviate_collection_name (str): The name of the Weaviate collection. weaviate_client (WeaviateClient): An instance of the Weaviate client. k (int, optional): The default number of top passages to retrieve. Default to 3. tenant_id (str, optional): The tenant to retrieve objects from. Examples: Below is a code snippet that shows how to use Weaviate as the default retriever: ```python import weaviate llm = dspy.Cohere(model="command-r-plus", api_key=api_key) weaviate_client = weaviate.connect_to_[local, wcs, custom, embedded]("your-path-here") retriever_model = WeaviateRM("my_collection_name", weaviate_client=weaviate_client) dspy.settings.configure(lm=llm, rm=retriever_model) retrieve = dspy.Retrieve(k=1) topK_passages = retrieve("what are the stages in planning, sanctioning and execution of public works").passages ``` Below is a code snippet that shows how to use Weaviate in the forward() function of a module ```python self.retrieve = WeaviateRM("my_collection_name", weaviate_client=weaviate_client, k=num_passages) ``` """ def __init__( self, weaviate_collection_name: str, weaviate_client: weaviate.WeaviateClient | weaviate.Client, weaviate_collection_text_key: str | None = "content", k: int = 3, tenant_id: str | None = None, ): self._weaviate_collection_name = weaviate_collection_name self._weaviate_client = weaviate_client self._weaviate_collection = self._weaviate_client.collections.get(self._weaviate_collection_name) self._weaviate_collection_text_key = weaviate_collection_text_key self._tenant_id = tenant_id # Check the type of weaviate_client (this is added to support v3 and v4) if hasattr(weaviate_client, "collections"): self._client_type = "WeaviateClient" elif hasattr(weaviate_client, "query"): self._client_type = "Client" else: raise ValueError("Unsupported Weaviate client type") super().__init__(k=k) def forward(self, query_or_queries: str | list[str], k: int | None = None, **kwargs) -> Prediction: """Search with Weaviate for self.k top passages for query or queries. Args: query_or_queries (Union[str, list[str]]): The query or queries to search for. k (Optional[int]): The number of top passages to retrieve. Defaults to self.k. kwargs : Returns: dspy.Prediction: An object containing the retrieved passages. 
""" k = k if k is not None else self.k queries = [query_or_queries] if isinstance(query_or_queries, str) else query_or_queries queries = [q for q in queries if q] passages, parsed_results = [], [] tenant = kwargs.pop("tenant_id", self._tenant_id) for query in queries: if self._client_type == "WeaviateClient": if tenant: results = self._weaviate_collection.query.with_tenant(tenant).hybrid(query=query, limit=k, **kwargs) else: results = self._weaviate_collection.query.hybrid(query=query, limit=k, **kwargs) parsed_results = [result.properties[self._weaviate_collection_text_key] for result in results.objects] elif self._client_type == "Client": q = self._weaviate_client.query.get( self._weaviate_collection_name, [self._weaviate_collection_text_key] ) if tenant: q = q.with_tenant(tenant) results = q.with_hybrid(query=query).with_limit(k).do() results = results["data"]["Get"][self._weaviate_collection_name] parsed_results = [result[self._weaviate_collection_text_key] for result in results] passages.extend(dotdict({"long_text": d}) for d in parsed_results) return passages def get_objects(self, num_samples: int, fields: list[str]) -> list[dict]: """Get objects from Weaviate using the cursor API.""" if self._client_type == "WeaviateClient": objects = [] counter = 0 for item in self._weaviate_collection.iterator(): # TODO: add tenancy scoping if counter >= num_samples: break new_object = {} for key in item.properties.keys(): if key in fields: new_object[key] = item.properties[key] objects.append(new_object) counter += 1 return objects else: raise ValueError("`get_objects` is not supported for the v3 Weaviate Python client, please upgrade to v4.") def insert(self, new_object_properties: dict): if self._client_type == "WeaviateClient": self._weaviate_collection.data.insert( properties=new_object_properties, uuid=get_valid_uuid(uuid4()) ) # TODO: add tenancy scoping else: raise AttributeError("`insert` is not supported for the v3 Weaviate Python client, please upgrade to v4.") ``` -------------------------------------------------------------------------------- /docs/docs/deep-dive/data-handling/built-in-datasets.md: -------------------------------------------------------------------------------- ```markdown --- sidebar_position: 2 --- !!! warning "This page is outdated and may not be fully accurate in DSPy 2.5" # Utilizing Built-in Datasets It's easy to use your own data in DSPy: a dataset is just a list of `Example` objects. Using DSPy well involves being able to find and re-purpose existing datasets for your own pipelines in new ways; DSPy makes this a particularly powerful strategy. For convenience, DSPy currently also provides support for the following dataset out of the box: * **HotPotQA** (multi-hop question answering) * **GSM8k** (math questions) * **Color** (basic dataset of colors) ## Loading HotPotQA HotPotQA is which is a collection of question-answer pairs. 
```python from dspy.datasets import HotPotQA dataset = HotPotQA(train_seed=1, train_size=5, eval_seed=2023, dev_size=50, test_size=0) print(dataset.train) ``` **Output:** ```text [Example({'question': 'At My Window was released by which American singer-songwriter?', 'answer': 'John Townes Van Zandt'}) (input_keys=None), Example({'question': 'which American actor was Candace Kita guest starred with ', 'answer': 'Bill Murray'}) (input_keys=None), Example({'question': 'Which of these publications was most recently published, Who Put the Bomp or Self?', 'answer': 'Self'}) (input_keys=None), Example({'question': 'The Victorians - Their Story In Pictures is a documentary series written by an author born in what year?', 'answer': '1950'}) (input_keys=None), Example({'question': 'Which magazine has published articles by Scott Shaw, Tae Kwon Do Times or Southwest Art?', 'answer': 'Tae Kwon Do Times'}) (input_keys=None)] ``` We just loaded trainset (5 examples) and devset (50 examples). Each example in our training set contains just a question and its (human-annotated) answer. As you can see, it is loaded as a list of `Example` objects. However, one thing to note is that it doesn't set the input keys implicitly, so that is something that we'll need to do!! ```python trainset = [x.with_inputs('question') for x in dataset.train] devset = [x.with_inputs('question') for x in dataset.dev] print(trainset) ``` **Output:** ```text [Example({'question': 'At My Window was released by which American singer-songwriter?', 'answer': 'John Townes Van Zandt'}) (input_keys={'question'}), Example({'question': 'which American actor was Candace Kita guest starred with ', 'answer': 'Bill Murray'}) (input_keys={'question'}), Example({'question': 'Which of these publications was most recently published, Who Put the Bomp or Self?', 'answer': 'Self'}) (input_keys={'question'}), Example({'question': 'The Victorians - Their Story In Pictures is a documentary series written by an author born in what year?', 'answer': '1950'}) (input_keys={'question'}), Example({'question': 'Which magazine has published articles by Scott Shaw, Tae Kwon Do Times or Southwest Art?', 'answer': 'Tae Kwon Do Times'}) (input_keys={'question'})] ``` DSPy typically requires very minimal labeling. Whereas your pipeline may involve six or seven complex steps, you only need labels for the initial question and the final answer. DSPy will bootstrap any intermediate labels needed to support your pipeline. If you change your pipeline in any way, the data bootstrapped will change accordingly! ## Advanced: Inside DSPy's `Dataset` class (Optional) We've seen how you can use `HotPotQA` dataset class and load the HotPotQA dataset, but how does it actually work? The `HotPotQA` class inherits from the `Dataset` class, which takes care of the conversion of the data loaded from a source into train-test-dev split, all of which are *list of examples*. In the `HotPotQA` class, you only implement the `__init__` method, where you populate the splits from HuggingFace into the variables `_train`, `_test` and `_dev`. The rest of the process is handled by methods in the `Dataset` class.  But how do the methods of the `Dataset` class convert the data from HuggingFace? Let's take a deep breath and think step by step...pun intended. 
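Before walking through those methods, here is a rough sketch of what such an `__init__` boils down to (a simplified illustration, not the actual `HotPotQA` source; the real class does additional filtering and field selection, and the `"fullwiki"` configuration is only an assumption here):

```python
from datasets import load_dataset

from dspy.datasets.dataset import Dataset

class SimpleHotPotQA(Dataset):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        # Download the official splits from HuggingFace.
        hf_dataset = load_dataset("hotpot_qa", "fullwiki")

        # Keep only the fields we need; each split becomes a list of plain dicts.
        self._train = [
            {"question": x["question"], "answer": x["answer"]}
            for x in hf_dataset["train"]
        ]
        self._dev = [
            {"question": x["question"], "answer": x["answer"]}
            for x in hf_dataset["validation"]
        ]
```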
We saw above that the splits are accessed via `.train`, `.dev`, and `.test`, so let's take a look at how the `train` property is implemented:

```python
@property
def train(self):
    if not hasattr(self, '_train_'):
        self._train_ = self._shuffle_and_sample('train', self._train, self.train_size, self.train_seed)

    return self._train_
```

As you can see, `train` is a property, not a regular method. It first checks whether the `_train_` attribute exists. If not, it calls `_shuffle_and_sample()` on `self._train`, which holds the data loaded from HuggingFace. Let's look at the `_shuffle_and_sample()` method:

```python
def _shuffle_and_sample(self, split, data, size, seed=0):
    data = list(data)
    base_rng = random.Random(seed)

    if self.do_shuffle:
        base_rng.shuffle(data)

    data = data[:size]
    output = []

    for example in data:
        output.append(Example(**example, dspy_uuid=str(uuid.uuid4()), dspy_split=split))

    return output
```

The `_shuffle_and_sample()` method does three things:

* It shuffles the data if `self.do_shuffle` is True.
* It then takes a sample of size `size` from the shuffled data.
* Finally, it loops through the sampled data and converts each element into an `Example` object. Along with the example data itself, each `Example` carries a unique ID and the name of its split.

Converting the raw examples into `Example` objects allows the `Dataset` class to process them in a standardized way later. For example, the collate method used by the PyTorch DataLoader expects each item to be an `Example`.

To summarize, the `Dataset` class handles all the necessary data processing and provides a simple API to access the different splits. This differs from dataset classes like `HotPotQA`, which only need to define how the raw data is loaded.
```

--------------------------------------------------------------------------------
/tests/adapters/test_two_step_adapter.py:
--------------------------------------------------------------------------------

```python
from unittest import mock

import pytest

import dspy


def test_two_step_adapter_call():
    class TestSignature(dspy.Signature):
        question: str = dspy.InputField(desc="The math question to solve")
        solution: str = dspy.OutputField(desc="Step by step solution")
        answer: float = dspy.OutputField(desc="The final numerical answer")

    program = dspy.Predict(TestSignature)

    mock_main_lm = mock.MagicMock(spec=dspy.LM)
    mock_main_lm.return_value = ["text from main LM"]
    mock_main_lm.kwargs = {"temperature": 1.0}

    mock_extraction_lm = mock.MagicMock(spec=dspy.LM)
    mock_extraction_lm.return_value = [
        """
        [[ ## solution ## ]]
        result

        [[ ## answer ## ]]
        12

        [[ ## completed ## ]]
        """
    ]
    mock_extraction_lm.kwargs = {"temperature": 1.0}
    mock_extraction_lm.model = "openai/gpt-4o"

    dspy.configure(lm=mock_main_lm, adapter=dspy.TwoStepAdapter(extraction_model=mock_extraction_lm))

    result = program(question="What is 5 + 7?")

    assert result.answer == 12

    # main LM call
    mock_main_lm.assert_called_once()
    _, call_kwargs = mock_main_lm.call_args
    assert len(call_kwargs["messages"]) == 2

    # assert first message
    assert call_kwargs["messages"][0]["role"] == "system"
    content = call_kwargs["messages"][0]["content"]
    assert "1. `question` (str)" in content
    assert "1. `solution` (str)" in content
    assert "2. `answer` (float)" in content

    # assert second message
    assert call_kwargs["messages"][1]["role"] == "user"
    content = call_kwargs["messages"][1]["content"]
    assert "question:" in content.lower()
    assert "What is 5 + 7?"
in content # extraction LM call mock_extraction_lm.assert_called_once() _, call_kwargs = mock_extraction_lm.call_args assert len(call_kwargs["messages"]) == 2 # assert first message assert call_kwargs["messages"][0]["role"] == "system" content = call_kwargs["messages"][0]["content"] assert "`text` (str)" in content assert "`solution` (str)" in content assert "`answer` (float)" in content # assert second message assert call_kwargs["messages"][1]["role"] == "user" content = call_kwargs["messages"][1]["content"] assert "text from main LM" in content @pytest.mark.asyncio async def test_two_step_adapter_async_call(): class TestSignature(dspy.Signature): question: str = dspy.InputField(desc="The math question to solve") solution: str = dspy.OutputField(desc="Step by step solution") answer: float = dspy.OutputField(desc="The final numerical answer") program = dspy.Predict(TestSignature) mock_main_lm = mock.MagicMock(spec=dspy.LM) mock_main_lm.acall.return_value = ["text from main LM"] mock_main_lm.kwargs = {"temperature": 1.0} mock_extraction_lm = mock.MagicMock(spec=dspy.LM) mock_extraction_lm.acall.return_value = [ """ [[ ## solution ## ]] result [[ ## answer ## ]] 12 [[ ## completed ## ]] """ ] mock_extraction_lm.kwargs = {"temperature": 1.0} mock_extraction_lm.model = "openai/gpt-4o" with dspy.context(lm=mock_main_lm, adapter=dspy.TwoStepAdapter(extraction_model=mock_extraction_lm)): result = await program.acall(question="What is 5 + 7?") assert result.answer == 12 # main LM call mock_main_lm.acall.assert_called_once() _, call_kwargs = mock_main_lm.acall.call_args assert len(call_kwargs["messages"]) == 2 # assert first message assert call_kwargs["messages"][0]["role"] == "system" content = call_kwargs["messages"][0]["content"] assert "1. `question` (str)" in content assert "1. `solution` (str)" in content assert "2. `answer` (float)" in content # assert second message assert call_kwargs["messages"][1]["role"] == "user" content = call_kwargs["messages"][1]["content"] assert "question:" in content.lower() assert "What is 5 + 7?" 
in content # extraction LM call mock_extraction_lm.acall.assert_called_once() _, call_kwargs = mock_extraction_lm.acall.call_args assert len(call_kwargs["messages"]) == 2 # assert first message assert call_kwargs["messages"][0]["role"] == "system" content = call_kwargs["messages"][0]["content"] assert "`text` (str)" in content assert "`solution` (str)" in content assert "`answer` (float)" in content # assert second message assert call_kwargs["messages"][1]["role"] == "user" content = call_kwargs["messages"][1]["content"] assert "text from main LM" in content def test_two_step_adapter_parse(): class ComplexSignature(dspy.Signature): input_text: str = dspy.InputField() tags: list[str] = dspy.OutputField(desc="List of relevant tags") confidence: float = dspy.OutputField(desc="Confidence score") first_response = "main LM response" mock_extraction_lm = mock.MagicMock(spec=dspy.LM) mock_extraction_lm.return_value = [ """ { "tags": ["AI", "deep learning", "neural networks"], "confidence": 0.87 } """ ] mock_extraction_lm.kwargs = {"temperature": 1.0} mock_extraction_lm.model = "openai/gpt-4o" adapter = dspy.TwoStepAdapter(mock_extraction_lm) dspy.configure(adapter=adapter, lm=mock_extraction_lm) result = adapter.parse(ComplexSignature, first_response) assert result["tags"] == ["AI", "deep learning", "neural networks"] assert result["confidence"] == 0.87 def test_two_step_adapter_parse_errors(): class TestSignature(dspy.Signature): question: str = dspy.InputField() answer: str = dspy.OutputField() first_response = "main LM response" mock_extraction_lm = mock.MagicMock(spec=dspy.LM) mock_extraction_lm.return_value = ["invalid response"] mock_extraction_lm.kwargs = {"temperature": 1.0} mock_extraction_lm.model = "openai/gpt-4o" adapter = dspy.TwoStepAdapter(mock_extraction_lm) with pytest.raises(ValueError, match="Failed to parse response"): adapter.parse(TestSignature, first_response) ```