This is page 4 of 14. Use http://codebase.md/stanfordnlp/dspy?page={x} to view the full context. # Directory Structure ``` ├── .github │ ├── .internal_dspyai │ │ ├── internals │ │ │ ├── build-and-release.md │ │ │ └── release-checklist.md │ │ └── pyproject.toml │ ├── .tmp │ │ └── .generated-actions │ │ └── run-pypi-publish-in-docker-container │ │ └── action.yml │ ├── ISSUE_TEMPLATE │ │ ├── bug_report.yml │ │ └── feature_request.yml │ ├── PULL_REQUEST_TEMPLATE │ │ └── pull_request_template.md │ ├── workflow_scripts │ │ └── install_testpypi_pkg.sh │ └── workflows │ ├── build_and_release.yml │ ├── build_utils │ │ └── test_version.py │ ├── docs-push.yml │ ├── precommits_check.yml │ └── run_tests.yml ├── .gitignore ├── .pre-commit-config.yaml ├── CONTRIBUTING.md ├── docs │ ├── .gitignore │ ├── docs │ │ ├── api │ │ │ ├── adapters │ │ │ │ ├── Adapter.md │ │ │ │ ├── ChatAdapter.md │ │ │ │ ├── JSONAdapter.md │ │ │ │ └── TwoStepAdapter.md │ │ │ ├── evaluation │ │ │ │ ├── answer_exact_match.md │ │ │ │ ├── answer_passage_match.md │ │ │ │ ├── CompleteAndGrounded.md │ │ │ │ ├── Evaluate.md │ │ │ │ ├── EvaluationResult.md │ │ │ │ └── SemanticF1.md │ │ │ ├── experimental │ │ │ │ ├── Citations.md │ │ │ │ └── Document.md │ │ │ ├── index.md │ │ │ ├── models │ │ │ │ ├── Embedder.md │ │ │ │ └── LM.md │ │ │ ├── modules │ │ │ │ ├── BestOfN.md │ │ │ │ ├── ChainOfThought.md │ │ │ │ ├── CodeAct.md │ │ │ │ ├── Module.md │ │ │ │ ├── MultiChainComparison.md │ │ │ │ ├── Parallel.md │ │ │ │ ├── Predict.md │ │ │ │ ├── ProgramOfThought.md │ │ │ │ ├── ReAct.md │ │ │ │ └── Refine.md │ │ │ ├── optimizers │ │ │ │ ├── BetterTogether.md │ │ │ │ ├── BootstrapFewShot.md │ │ │ │ ├── BootstrapFewShotWithRandomSearch.md │ │ │ │ ├── BootstrapFinetune.md │ │ │ │ ├── BootstrapRS.md │ │ │ │ ├── COPRO.md │ │ │ │ ├── Ensemble.md │ │ │ │ ├── GEPA │ │ │ │ │ ├── GEPA_Advanced.md │ │ │ │ │ └── overview.md │ │ │ │ ├── InferRules.md │ │ │ │ ├── KNN.md │ │ │ │ ├── KNNFewShot.md │ │ │ │ ├── LabeledFewShot.md │ │ │ │ ├── MIPROv2.md │ │ │ │ └── SIMBA.md │ │ │ ├── primitives │ │ │ │ ├── Audio.md │ │ │ │ ├── Code.md │ │ │ │ ├── Example.md │ │ │ │ ├── History.md │ │ │ │ ├── Image.md │ │ │ │ ├── Prediction.md │ │ │ │ ├── Tool.md │ │ │ │ └── ToolCalls.md │ │ │ ├── signatures │ │ │ │ ├── InputField.md │ │ │ │ ├── OutputField.md │ │ │ │ └── Signature.md │ │ │ ├── tools │ │ │ │ ├── ColBERTv2.md │ │ │ │ ├── Embeddings.md │ │ │ │ └── PythonInterpreter.md │ │ │ └── utils │ │ │ ├── asyncify.md │ │ │ ├── configure_cache.md │ │ │ ├── disable_litellm_logging.md │ │ │ ├── disable_logging.md │ │ │ ├── enable_litellm_logging.md │ │ │ ├── enable_logging.md │ │ │ ├── inspect_history.md │ │ │ ├── load.md │ │ │ ├── StatusMessage.md │ │ │ ├── StatusMessageProvider.md │ │ │ ├── streamify.md │ │ │ └── StreamListener.md │ │ ├── cheatsheet.md │ │ ├── community │ │ │ ├── community-resources.md │ │ │ ├── how-to-contribute.md │ │ │ └── use-cases.md │ │ ├── deep-dive │ │ │ └── data-handling │ │ │ ├── built-in-datasets.md │ │ │ ├── examples.md │ │ │ ├── img │ │ │ │ └── data-loading.png │ │ │ └── loading-custom-data.md │ │ ├── faqs.md │ │ ├── index.md │ │ ├── js │ │ │ └── runllm-widget.js │ │ ├── learn │ │ │ ├── evaluation │ │ │ │ ├── data.md │ │ │ │ ├── metrics.md │ │ │ │ └── overview.md │ │ │ ├── figures │ │ │ │ ├── native_tool_call.png │ │ │ │ └── teleprompter-classes.png │ │ │ ├── index.md │ │ │ ├── optimization │ │ │ │ ├── optimizers.md │ │ │ │ └── overview.md │ │ │ └── programming │ │ │ ├── 7-assertions.md │ │ │ ├── adapters.md │ │ │ ├── language_models.md │ │ │ ├── mcp.md │ 
│ │ ├── modules.md │ │ │ ├── overview.md │ │ │ ├── signatures.md │ │ │ └── tools.md │ │ ├── production │ │ │ └── index.md │ │ ├── roadmap.md │ │ ├── static │ │ │ ├── .nojekyll │ │ │ └── img │ │ │ ├── dspy_logo.png │ │ │ ├── logo.png │ │ │ ├── mlflow-tracing-rag.png │ │ │ ├── modular.png │ │ │ ├── optimize.png │ │ │ ├── undraw_docusaurus_mountain.svg │ │ │ ├── undraw_docusaurus_react.svg │ │ │ ├── undraw_docusaurus_tree.svg │ │ │ └── universal_compatibility.png │ │ ├── stylesheets │ │ │ └── extra.css │ │ └── tutorials │ │ ├── agents │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-agent.png │ │ ├── ai_text_game │ │ │ └── index.md │ │ ├── async │ │ │ └── index.md │ │ ├── audio │ │ │ └── index.ipynb │ │ ├── build_ai_program │ │ │ └── index.md │ │ ├── cache │ │ │ └── index.md │ │ ├── classification │ │ │ └── index.md │ │ ├── classification_finetuning │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-classification.png │ │ ├── conversation_history │ │ │ └── index.md │ │ ├── core_development │ │ │ └── index.md │ │ ├── custom_module │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-custom-module.png │ │ ├── customer_service_agent │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-customer-service-agent.png │ │ ├── deployment │ │ │ ├── dspy_mlflow_ui.png │ │ │ └── index.md │ │ ├── email_extraction │ │ │ ├── index.md │ │ │ └── mlflow-tracing-email-extraction.png │ │ ├── entity_extraction │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-entity-extraction.png │ │ ├── games │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-agent.png │ │ ├── gepa_ai_program │ │ │ └── index.md │ │ ├── gepa_aime │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-aime.png │ │ │ └── mlflow-tracking-gepa-aime-optimization.png │ │ ├── gepa_facilitysupportanalyzer │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-support.png │ │ │ └── mlflow-tracking-gepa-support-optimization.png │ │ ├── gepa_papillon │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-papilon.png │ │ │ └── mlflow-tracking-gepa-papilon-optimization.png │ │ ├── image_generation_prompting │ │ │ └── index.ipynb │ │ ├── index.md │ │ ├── llms_txt_generation │ │ │ └── index.md │ │ ├── math │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-math.png │ │ ├── mcp │ │ │ └── index.md │ │ ├── mem0_react_agent │ │ │ └── index.md │ │ ├── multihop_search │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-multi-hop.png │ │ ├── observability │ │ │ ├── index.md │ │ │ ├── mlflow_trace_ui_navigation.gif │ │ │ ├── mlflow_trace_ui.png │ │ │ └── mlflow_trace_view.png │ │ ├── optimize_ai_program │ │ │ └── index.md │ │ ├── optimizer_tracking │ │ │ ├── child_run.png │ │ │ ├── experiment.png │ │ │ ├── index.md │ │ │ └── parent_run.png │ │ ├── output_refinement │ │ │ └── best-of-n-and-refine.md │ │ ├── papillon │ │ │ └── index.md │ │ ├── program_of_thought │ │ │ └── index.ipynb │ │ ├── rag │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-rag.png │ │ ├── real_world_examples │ │ │ └── index.md │ │ ├── rl_ai_program │ │ │ └── index.md │ │ ├── rl_multihop │ │ │ └── index.ipynb │ │ ├── rl_papillon │ │ │ └── index.ipynb │ │ ├── sample_code_generation │ │ │ └── index.md │ │ ├── saving │ │ │ └── index.md │ │ ├── streaming │ │ │ └── index.md │ │ ├── tool_use │ │ │ └── index.ipynb │ │ └── yahoo_finance_react │ │ └── index.md │ ├── mkdocs.yml │ ├── overrides │ │ ├── home.html │ │ ├── main.html │ │ └── partials │ │ └── tabs.html │ ├── Pipfile │ ├── Pipfile.lock │ ├── README.md │ ├── requirements.txt │ ├── scripts │ │ ├── generate_api_docs.py │ │ └── generate_api_summary.py │ └── vercel.json ├── dspy │ ├── __init__.py │ 
├── __metadata__.py │ ├── adapters │ │ ├── __init__.py │ │ ├── baml_adapter.py │ │ ├── base.py │ │ ├── chat_adapter.py │ │ ├── json_adapter.py │ │ ├── two_step_adapter.py │ │ ├── types │ │ │ ├── __init__.py │ │ │ ├── audio.py │ │ │ ├── base_type.py │ │ │ ├── citation.py │ │ │ ├── code.py │ │ │ ├── document.py │ │ │ ├── history.py │ │ │ ├── image.py │ │ │ └── tool.py │ │ ├── utils.py │ │ └── xml_adapter.py │ ├── clients │ │ ├── __init__.py │ │ ├── base_lm.py │ │ ├── cache.py │ │ ├── databricks.py │ │ ├── embedding.py │ │ ├── lm_local_arbor.py │ │ ├── lm_local.py │ │ ├── lm.py │ │ ├── openai.py │ │ ├── provider.py │ │ └── utils_finetune.py │ ├── datasets │ │ ├── __init__.py │ │ ├── alfworld │ │ │ ├── __init__.py │ │ │ ├── alfworld.py │ │ │ └── base_config.yml │ │ ├── colors.py │ │ ├── dataloader.py │ │ ├── dataset.py │ │ ├── gsm8k.py │ │ ├── hotpotqa.py │ │ └── math.py │ ├── dsp │ │ ├── __init__.py │ │ ├── colbertv2.py │ │ └── utils │ │ ├── __init__.py │ │ ├── dpr.py │ │ ├── settings.py │ │ └── utils.py │ ├── evaluate │ │ ├── __init__.py │ │ ├── auto_evaluation.py │ │ ├── evaluate.py │ │ └── metrics.py │ ├── experimental │ │ └── __init__.py │ ├── predict │ │ ├── __init__.py │ │ ├── aggregation.py │ │ ├── avatar │ │ │ ├── __init__.py │ │ │ ├── avatar.py │ │ │ ├── models.py │ │ │ └── signatures.py │ │ ├── best_of_n.py │ │ ├── chain_of_thought.py │ │ ├── code_act.py │ │ ├── knn.py │ │ ├── multi_chain_comparison.py │ │ ├── parallel.py │ │ ├── parameter.py │ │ ├── predict.py │ │ ├── program_of_thought.py │ │ ├── react.py │ │ ├── refine.py │ │ └── retry.py │ ├── primitives │ │ ├── __init__.py │ │ ├── base_module.py │ │ ├── example.py │ │ ├── module.py │ │ ├── prediction.py │ │ ├── python_interpreter.py │ │ └── runner.js │ ├── propose │ │ ├── __init__.py │ │ ├── dataset_summary_generator.py │ │ ├── grounded_proposer.py │ │ ├── propose_base.py │ │ └── utils.py │ ├── retrievers │ │ ├── __init__.py │ │ ├── databricks_rm.py │ │ ├── embeddings.py │ │ ├── retrieve.py │ │ └── weaviate_rm.py │ ├── signatures │ │ ├── __init__.py │ │ ├── field.py │ │ ├── signature.py │ │ └── utils.py │ ├── streaming │ │ ├── __init__.py │ │ ├── messages.py │ │ ├── streamify.py │ │ └── streaming_listener.py │ ├── teleprompt │ │ ├── __init__.py │ │ ├── avatar_optimizer.py │ │ ├── bettertogether.py │ │ ├── bootstrap_finetune.py │ │ ├── bootstrap_trace.py │ │ ├── bootstrap.py │ │ ├── copro_optimizer.py │ │ ├── ensemble.py │ │ ├── gepa │ │ │ ├── __init__.py │ │ │ ├── gepa_utils.py │ │ │ ├── gepa.py │ │ │ └── instruction_proposal.py │ │ ├── grpo.py │ │ ├── infer_rules.py │ │ ├── knn_fewshot.py │ │ ├── mipro_optimizer_v2.py │ │ ├── random_search.py │ │ ├── signature_opt.py │ │ ├── simba_utils.py │ │ ├── simba.py │ │ ├── teleprompt_optuna.py │ │ ├── teleprompt.py │ │ ├── utils.py │ │ └── vanilla.py │ └── utils │ ├── __init__.py │ ├── annotation.py │ ├── asyncify.py │ ├── caching.py │ ├── callback.py │ ├── dummies.py │ ├── exceptions.py │ ├── hasher.py │ ├── inspect_history.py │ ├── langchain_tool.py │ ├── logging_utils.py │ ├── mcp.py │ ├── parallelizer.py │ ├── saving.py │ ├── syncify.py │ ├── unbatchify.py │ └── usage_tracker.py ├── LICENSE ├── pyproject.toml ├── README.md ├── tests │ ├── __init__.py │ ├── adapters │ │ ├── test_adapter_utils.py │ │ ├── test_baml_adapter.py │ │ ├── test_base_type.py │ │ ├── test_chat_adapter.py │ │ ├── test_citation.py │ │ ├── test_code.py │ │ ├── test_document.py │ │ ├── test_json_adapter.py │ │ ├── test_tool.py │ │ ├── test_two_step_adapter.py │ │ └── test_xml_adapter.py │ ├── callback │ │ └── 
test_callback.py │ ├── clients │ │ ├── test_cache.py │ │ ├── test_databricks.py │ │ ├── test_embedding.py │ │ ├── test_inspect_global_history.py │ │ └── test_lm.py │ ├── conftest.py │ ├── datasets │ │ └── test_dataset.py │ ├── docs │ │ └── test_mkdocs_links.py │ ├── evaluate │ │ ├── test_evaluate.py │ │ └── test_metrics.py │ ├── examples │ │ └── test_baleen.py │ ├── metadata │ │ └── test_metadata.py │ ├── predict │ │ ├── test_aggregation.py │ │ ├── test_best_of_n.py │ │ ├── test_chain_of_thought.py │ │ ├── test_code_act.py │ │ ├── test_knn.py │ │ ├── test_multi_chain_comparison.py │ │ ├── test_parallel.py │ │ ├── test_predict.py │ │ ├── test_program_of_thought.py │ │ ├── test_react.py │ │ ├── test_refine.py │ │ └── test_retry.py │ ├── primitives │ │ ├── resources │ │ │ └── saved_program.json │ │ ├── test_base_module.py │ │ ├── test_example.py │ │ ├── test_module.py │ │ └── test_python_interpreter.py │ ├── propose │ │ └── test_grounded_proposer.py │ ├── README.md │ ├── reliability │ │ ├── __init__.py │ │ ├── complex_types │ │ │ └── generated │ │ │ ├── test_many_types_1 │ │ │ │ ├── inputs │ │ │ │ │ ├── input1.json │ │ │ │ │ └── input2.json │ │ │ │ ├── program.py │ │ │ │ └── schema.json │ │ │ ├── test_nesting_1 │ │ │ │ ├── inputs │ │ │ │ │ ├── input1.json │ │ │ │ │ └── input2.json │ │ │ │ ├── program.py │ │ │ │ └── schema.json │ │ │ └── test_nesting_2 │ │ │ ├── inputs │ │ │ │ └── input1.json │ │ │ ├── program.py │ │ │ └── schema.json │ │ ├── conftest.py │ │ ├── generate │ │ │ ├── __init__.py │ │ │ ├── __main__.py │ │ │ └── utils.py │ │ ├── input_formats │ │ │ └── generated │ │ │ └── test_markdown_1 │ │ │ ├── inputs │ │ │ │ ├── input1.json │ │ │ │ └── input2.json │ │ │ ├── program.py │ │ │ └── schema.json │ │ ├── README.md │ │ ├── reliability_conf.yaml │ │ ├── test_generated.py │ │ ├── test_pydantic_models.py │ │ └── utils.py │ ├── retrievers │ │ └── test_embeddings.py │ ├── signatures │ │ ├── test_adapter_image.py │ │ ├── test_custom_types.py │ │ └── test_signature.py │ ├── streaming │ │ └── test_streaming.py │ ├── teleprompt │ │ ├── gepa_dummy_lm_custom_component_selector_custom_instruction_proposer.json │ │ ├── gepa_dummy_lm.json │ │ ├── test_bootstrap_finetune.py │ │ ├── test_bootstrap_trace.py │ │ ├── test_bootstrap.py │ │ ├── test_copro_optimizer.py │ │ ├── test_ensemble.py │ │ ├── test_finetune.py │ │ ├── test_gepa_instruction_proposer.py │ │ ├── test_gepa.py │ │ ├── test_grpo.py │ │ ├── test_knn_fewshot.py │ │ ├── test_random_search.py │ │ ├── test_teleprompt.py │ │ └── test_utils.py │ ├── test_utils │ │ ├── __init__.py │ │ └── server │ │ ├── __init__.py │ │ ├── litellm_server_config.yaml │ │ └── litellm_server.py │ └── utils │ ├── __init__.py │ ├── resources │ │ └── mcp_server.py │ ├── test_annotation.py │ ├── test_asyncify.py │ ├── test_exceptions.py │ ├── test_langchain_tool.py │ ├── test_mcp.py │ ├── test_parallelizer.py │ ├── test_saving.py │ ├── test_settings.py │ ├── test_syncify.py │ ├── test_unbatchify.py │ └── test_usage_tracker.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /tests/adapters/test_citation.py: -------------------------------------------------------------------------------- ```python import pydantic import pytest import dspy from dspy.experimental import Citations def test_citation_validate_input(): citation = Citations.Citation( cited_text="The Earth orbits the Sun.", document_index=0, start_char_index=0, end_char_index=23, supported_text="The Earth orbits the Sun." 
) assert citation.cited_text == "The Earth orbits the Sun." assert citation.document_index == 0 assert citation.start_char_index == 0 assert citation.end_char_index == 23 assert citation.type == "char_location" assert citation.supported_text == "The Earth orbits the Sun." with pytest.raises(pydantic.ValidationError): Citations.Citation(cited_text="text") def test_citations_in_nested_type(): class Wrapper(pydantic.BaseModel): citations: Citations citation = Citations.Citation( cited_text="Hello, world!", document_index=0, start_char_index=0, end_char_index=13, supported_text="Hello, world!" ) citations = Citations(citations=[citation]) wrapper = Wrapper(citations=citations) assert wrapper.citations.citations[0].cited_text == "Hello, world!" def test_citation_with_all_fields(): citation = Citations.Citation( cited_text="Water boils at 100°C.", document_index=1, document_title="Physics Facts", start_char_index=10, end_char_index=31, supported_text="Water boils at 100°C." ) assert citation.cited_text == "Water boils at 100°C." assert citation.document_index == 1 assert citation.document_title == "Physics Facts" assert citation.start_char_index == 10 assert citation.end_char_index == 31 assert citation.supported_text == "Water boils at 100°C." def test_citation_format(): citation = Citations.Citation( cited_text="The sky is blue.", document_index=0, document_title="Weather Guide", start_char_index=5, end_char_index=21, supported_text="The sky is blue." ) formatted = citation.format() assert formatted["type"] == "char_location" assert formatted["cited_text"] == "The sky is blue." assert formatted["document_index"] == 0 assert formatted["document_title"] == "Weather Guide" assert formatted["start_char_index"] == 5 assert formatted["end_char_index"] == 21 assert formatted["supported_text"] == "The sky is blue." def test_citations_format(): citations = Citations(citations=[ Citations.Citation( cited_text="First citation", document_index=0, start_char_index=0, end_char_index=14, supported_text="First citation" ), Citations.Citation( cited_text="Second citation", document_index=1, document_title="Source", start_char_index=20, end_char_index=35, supported_text="Second citation" ) ]) formatted = citations.format() assert isinstance(formatted, list) assert len(formatted) == 2 assert formatted[0]["cited_text"] == "First citation" assert formatted[1]["cited_text"] == "Second citation" assert formatted[1]["document_title"] == "Source" def test_citations_from_dict_list(): citations_data = [ { "cited_text": "The sky is blue", "document_index": 0, "document_title": "Weather Guide", "start_char_index": 0, "end_char_index": 15, "supported_text": "The sky was blue yesterday." 
} ] citations = Citations.from_dict_list(citations_data) assert len(citations.citations) == 1 assert citations.citations[0].cited_text == "The sky is blue" assert citations.citations[0].document_title == "Weather Guide" def test_citations_postprocessing(): from dspy.adapters.chat_adapter import ChatAdapter from dspy.signatures.signature import Signature class CitationSignature(Signature): """Test signature with citations.""" question: str = dspy.InputField() answer: str = dspy.OutputField() citations: Citations = dspy.OutputField() adapter = ChatAdapter(native_response_types=[Citations]) outputs = [{ "text": "[[ ## answer ## ]]\nThe answer is blue.\n\n[[ ## citations ## ]]\n[]", "citations": [ { "cited_text": "The sky is blue", "document_index": 0, "document_title": "Weather Guide", "start_char_index": 10, "end_char_index": 25, "supported_text": "The sky is blue" } ] }] result = adapter._call_postprocess( CitationSignature.delete("citations"), CitationSignature, outputs, dspy.LM(model="claude-3-5-sonnet-20241022") ) assert len(result) == 1 assert "citations" in result[0] assert isinstance(result[0]["citations"], Citations) assert len(result[0]["citations"]) == 1 assert result[0]["citations"][0].cited_text == "The sky is blue" def test_citation_extraction_from_lm_response(): from unittest.mock import MagicMock mock_choice = MagicMock(message=MagicMock(provider_specific_fields={"citations": [[ { "type": "char_location", "cited_text": "The sky is blue", "document_index": 0, "document_title": "Weather Guide", "start_char_index": 10, "end_char_index": 25, "supported_text": "The sky is blue" } ]]})) lm = dspy.LM(model="test") citations = lm._extract_citations_from_response(mock_choice) assert citations is not None assert len(citations) == 1 assert citations[0]["cited_text"] == "The sky is blue" assert citations[0]["document_index"] == 0 assert citations[0]["document_title"] == "Weather Guide" assert citations[0]["start_char_index"] == 10 assert citations[0]["end_char_index"] == 25 assert citations[0]["supported_text"] == "The sky is blue" ``` -------------------------------------------------------------------------------- /dspy/primitives/prediction.py: -------------------------------------------------------------------------------- ```python from dspy.primitives.example import Example class Prediction(Example): """A prediction object that contains the output of a DSPy module. Prediction inherits from Example. To allow feedback-augmented scores, Prediction supports comparison operations (<, >, <=, >=) for Predictions with a `score` field. The comparison operations compare the 'score' values as floats. For equality comparison, Predictions are equal if their underlying data stores are equal (inherited from Example). Arithmetic operations (+, /, etc.) are also supported for Predictions with a 'score' field, operating on the score value. 
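    Example (illustrative):

        pred = Prediction(answer="Paris", score=0.5)
        pred > 0.25   # True: comparisons use pred's `score` field
        pred + 1.0    # 1.5: arithmetic also operates on `score`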
""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) del self._demos del self._input_keys self._completions = None self._lm_usage = None def get_lm_usage(self): return self._lm_usage def set_lm_usage(self, value): self._lm_usage = value @classmethod def from_completions(cls, list_or_dict, signature=None): obj = cls() obj._completions = Completions(list_or_dict, signature=signature) obj._store = {k: v[0] for k, v in obj._completions.items()} return obj def __repr__(self): store_repr = ",\n ".join(f"{k}={v!r}" for k, v in self._store.items()) if self._completions is None or len(self._completions) == 1: return f"Prediction(\n {store_repr}\n)" num_completions = len(self._completions) return f"Prediction(\n {store_repr},\n completions=Completions(...)\n) ({num_completions-1} completions omitted)" def __str__(self): return self.__repr__() def __float__(self): if "score" not in self._store: raise ValueError("Prediction object does not have a 'score' field to convert to float.") return float(self._store["score"]) def __add__(self, other): if isinstance(other, (float, int)): return self.__float__() + other elif isinstance(other, Prediction): return self.__float__() + float(other) raise TypeError(f"Unsupported type for addition: {type(other)}") def __radd__(self, other): if isinstance(other, (float, int)): return other + self.__float__() elif isinstance(other, Prediction): return float(other) + self.__float__() raise TypeError(f"Unsupported type for addition: {type(other)}") def __truediv__(self, other): if isinstance(other, (float, int)): return self.__float__() / other elif isinstance(other, Prediction): return self.__float__() / float(other) raise TypeError(f"Unsupported type for division: {type(other)}") def __rtruediv__(self, other): if isinstance(other, (float, int)): return other / self.__float__() elif isinstance(other, Prediction): return float(other) / self.__float__() raise TypeError(f"Unsupported type for division: {type(other)}") def __lt__(self, other): if isinstance(other, (float, int)): return self.__float__() < other elif isinstance(other, Prediction): return self.__float__() < float(other) raise TypeError(f"Unsupported type for comparison: {type(other)}") def __le__(self, other): if isinstance(other, (float, int)): return self.__float__() <= other elif isinstance(other, Prediction): return self.__float__() <= float(other) raise TypeError(f"Unsupported type for comparison: {type(other)}") def __gt__(self, other): if isinstance(other, (float, int)): return self.__float__() > other elif isinstance(other, Prediction): return self.__float__() > float(other) raise TypeError(f"Unsupported type for comparison: {type(other)}") def __ge__(self, other): if isinstance(other, (float, int)): return self.__float__() >= other elif isinstance(other, Prediction): return self.__float__() >= float(other) raise TypeError(f"Unsupported type for comparison: {type(other)}") @property def completions(self): return self._completions class Completions: def __init__(self, list_or_dict, signature=None): self.signature = signature if isinstance(list_or_dict, list): kwargs = {} for arg in list_or_dict: for k, v in arg.items(): kwargs.setdefault(k, []).append(v) else: kwargs = list_or_dict assert all(isinstance(v, list) for v in kwargs.values()), "All values must be lists" if kwargs: length = len(next(iter(kwargs.values()))) assert all(len(v) == length for v in kwargs.values()), "All lists must have the same length" self._completions = kwargs def items(self): return self._completions.items() 
def __getitem__(self, key): if isinstance(key, int): if key < 0 or key >= len(self): raise IndexError("Index out of range") return Prediction(**{k: v[key] for k, v in self._completions.items()}) return self._completions[key] def __getattr__(self, name): if name == "_completions": raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") if name in self._completions: return self._completions[name] raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'") def __len__(self): # Return the length of the list for one of the keys # It assumes all lists have the same length return len(next(iter(self._completions.values()))) def __contains__(self, key): return key in self._completions def __repr__(self): items_repr = ",\n ".join(f"{k}={v!r}" for k, v in self._completions.items()) return f"Completions(\n {items_repr}\n)" def __str__(self): # return str(self._completions) return self.__repr__() ``` -------------------------------------------------------------------------------- /tests/teleprompt/test_copro_optimizer.py: -------------------------------------------------------------------------------- ```python import dspy from dspy import Example from dspy.teleprompt.signature_opt import COPRO from dspy.utils.dummies import DummyLM # Define a simple metric function for testing def simple_metric(example, prediction): # Simplified metric for testing: true if prediction matches expected output return example.output == prediction.output # Example training and validation sets trainset = [ Example(input="Question: What is the color of the sky?", output="blue").with_inputs("input"), Example(input="Question: What does the fox say?", output="Ring-ding-ding-ding-dingeringeding!").with_inputs( "input" ), ] def test_signature_optimizer_initialization(): optimizer = COPRO(metric=simple_metric, breadth=2, depth=1, init_temperature=1.4) assert optimizer.metric == simple_metric, "Metric not correctly initialized" assert optimizer.breadth == 2, "Breadth not correctly initialized" assert optimizer.depth == 1, "Depth not correctly initialized" assert optimizer.init_temperature == 1.4, "Initial temperature not correctly initialized" class SimpleModule(dspy.Module): def __init__(self, signature): super().__init__() # COPRO doesn't work with dspy.Predict self.predictor = dspy.ChainOfThought(signature) def forward(self, **kwargs): return self.predictor(**kwargs) def test_signature_optimizer_optimization_process(): optimizer = COPRO(metric=simple_metric, breadth=2, depth=1, init_temperature=1.4) dspy.settings.configure( lm=DummyLM( [ { "proposed_instruction": "Optimized instruction 1", "proposed_prefix_for_output_field": "Optimized instruction 2", }, ] ) ) student = SimpleModule("input -> output") # Assuming the compile method of COPRO requires a student module, a development set, and evaluation kwargs optimized_student = optimizer.compile( student, trainset=trainset, eval_kwargs={"num_threads": 1, "display_progress": False} ) # Check that the optimized student has been modified from the original # This check can be more specific based on how the optimization modifies the student assert optimized_student is not student, "Optimization did not modify the student" # Further tests can be added to verify the specifics of the optimization process, # such as checking the instructions of the optimized student's predictors. 
def test_signature_optimizer_statistics_tracking(): optimizer = COPRO(metric=simple_metric, breadth=2, depth=1, init_temperature=1.4) optimizer.track_stats = True # Enable statistics tracking dspy.settings.configure( lm=DummyLM( [ { "proposed_instruction": "Optimized instruction 1", "proposed_prefix_for_output_field": "Optimized instruction 2", }, ] ) ) student = SimpleModule("input -> output") optimized_student = optimizer.compile( student, trainset=trainset, eval_kwargs={"num_threads": 1, "display_progress": False} ) # Verify that statistics have been tracked and attached to the optimized student assert hasattr(optimized_student, "total_calls"), "Total calls statistic not tracked" assert hasattr(optimized_student, "results_best"), "Best results statistics not tracked" # Assuming the setup_signature_optimizer fixture and simple_metric function are defined as before def test_optimization_and_output_verification(): lm = DummyLM( [ {"proposed_instruction": "Optimized Prompt", "proposed_prefix_for_output_field": "Optimized Prefix"}, {"reasoning": "france", "output": "Paris"}, {"reasoning": "france", "output": "Paris"}, {"reasoning": "france", "output": "Paris"}, {"reasoning": "france", "output": "Paris"}, {"reasoning": "france", "output": "Paris"}, {"reasoning": "france", "output": "Paris"}, {"reasoning": "france", "output": "Paris"}, ] ) dspy.settings.configure(lm=lm) optimizer = COPRO(metric=simple_metric, breadth=2, depth=1, init_temperature=1.4) student = SimpleModule("input -> output") # Compile the student with the optimizer optimized_student = optimizer.compile( student, trainset=trainset, eval_kwargs={"num_threads": 1, "display_progress": False} ) # Simulate calling the optimized student with a new input test_input = "What is the capital of France?" 
prediction = optimized_student(input=test_input) print(lm.get_convo(-1)) assert prediction.output == "Paris" def test_statistics_tracking_during_optimization(): dspy.settings.configure( lm=DummyLM( [ {"proposed_instruction": "Optimized Prompt", "proposed_prefix_for_output_field": "Optimized Prefix"}, ] ) ) optimizer = COPRO(metric=simple_metric, breadth=2, depth=1, init_temperature=1.4) optimizer.track_stats = True # Enable statistics tracking student = SimpleModule("input -> output") optimized_student = optimizer.compile( student, trainset=trainset, eval_kwargs={"num_threads": 1, "display_progress": False} ) # Verify that statistics have been tracked assert hasattr(optimized_student, "total_calls"), "Optimizer did not track total metric calls" assert optimized_student.total_calls > 0, "Optimizer reported no metric calls" # Check if the results_best and results_latest contain valid statistics assert "results_best" in optimized_student.__dict__, "Optimizer did not track the best results" assert "results_latest" in optimized_student.__dict__, "Optimizer did not track the latest results" assert len(optimized_student.results_best) > 0, "Optimizer did not properly populate the best results statistics" assert ( len(optimized_student.results_latest) > 0 ), "Optimizer did not properly populate the latest results statistics" # Additional detailed checks can be added here to verify the contents of the tracked statistics ``` -------------------------------------------------------------------------------- /docs/docs/learn/evaluation/metrics.md: -------------------------------------------------------------------------------- ```markdown --- sidebar_position: 5 --- # Metrics DSPy is a machine learning framework, so you must think about your **automatic metrics** for evaluation (to track your progress) and optimization (so DSPy can make your programs more effective). ## What is a metric and how do I define a metric for my task? A metric is just a function that will take examples from your data and the output of your system and return a score that quantifies how good the output is. What makes outputs from your system good or bad? For simple tasks, this could be just "accuracy" or "exact match" or "F1 score". This may be the case for simple classification or short-form QA tasks. However, for most applications, your system will output long-form outputs. There, your metric should probably be a smaller DSPy program that checks multiple properties of the output (quite possibly using AI feedback from LMs). Getting this right on the first try is unlikely, but you should start with something simple and iterate. ## Simple metrics A DSPy metric is just a function in Python that takes `example` (e.g., from your training or dev set) and the output `pred` from your DSPy program, and outputs a `float` (or `int` or `bool`) score. Your metric should also accept an optional third argument called `trace`. You can ignore this for a moment, but it will enable some powerful tricks if you want to use your metric for optimization. Here's a simple example of a metric that's comparing `example.answer` and `pred.answer`. This particular metric will return a `bool`. ```python def validate_answer(example, pred, trace=None): return example.answer.lower() == pred.answer.lower() ``` Some people find these utilities (built-in) convenient: - `dspy.evaluate.metrics.answer_exact_match` - `dspy.evaluate.metrics.answer_passage_match` Your metrics could be more complex, e.g. check for multiple properties. 
The metric below will return a `float` if `trace is None` (i.e., if it's used for evaluation or optimization), and will return a `bool` otherwise (i.e., if it's used to bootstrap demonstrations). ```python def validate_context_and_answer(example, pred, trace=None): # check the gold label and the predicted answer are the same answer_match = example.answer.lower() == pred.answer.lower() # check the predicted answer comes from one of the retrieved contexts context_match = any((pred.answer.lower() in c) for c in pred.context) if trace is None: # if we're doing evaluation or optimization return (answer_match + context_match) / 2.0 else: # if we're doing bootstrapping, i.e. self-generating good demonstrations of each step return answer_match and context_match ``` Defining a good metric is an iterative process, so doing some initial evaluations and looking at your data and outputs is key. ## Evaluation Once you have a metric, you can run evaluations in a simple Python loop. ```python scores = [] for x in devset: pred = program(**x.inputs()) score = metric(x, pred) scores.append(score) ``` If you need some utilities, you can also use the built-in `Evaluate` utility. It can help with things like parallel evaluation (multiple threads) or showing you a sample of inputs/outputs and the metric scores. ```python from dspy.evaluate import Evaluate # Set up the evaluator, which can be re-used in your code. evaluator = Evaluate(devset=YOUR_DEVSET, num_threads=1, display_progress=True, display_table=5) # Launch evaluation. evaluator(YOUR_PROGRAM, metric=YOUR_METRIC) ``` ## Intermediate: Using AI feedback for your metric For most applications, your system will output long-form outputs, so your metric should check multiple dimensions of the output using AI feedback from LMs. This simple signature could come in handy. ```python # Define the signature for automatic assessments. class Assess(dspy.Signature): """Assess the quality of a tweet along the specified dimension.""" assessed_text = dspy.InputField() assessment_question = dspy.InputField() assessment_answer: bool = dspy.OutputField() ``` For example, below is a simple metric that checks a generated tweet (1) answers a given question correctly and (2) whether it's also engaging. We also check that (3) `len(tweet) <= 280` characters. ```python def metric(gold, pred, trace=None): question, answer, tweet = gold.question, gold.answer, pred.output engaging = "Does the assessed text make for a self-contained, engaging tweet?" correct = f"The text should answer `{question}` with `{answer}`. Does the assessed text contain this answer?" correct = dspy.Predict(Assess)(assessed_text=tweet, assessment_question=correct) engaging = dspy.Predict(Assess)(assessed_text=tweet, assessment_question=engaging) correct, engaging = [m.assessment_answer for m in [correct, engaging]] score = (correct + engaging) if correct and (len(tweet) <= 280) else 0 if trace is not None: return score >= 2 return score / 2.0 ``` When compiling, `trace is not None`, and we want to be strict about judging things, so we will only return `True` if `score >= 2`. Otherwise, we return a score out of 1.0 (i.e., `score / 2.0`). ## Advanced: Using a DSPy program as your metric If your metric is itself a DSPy program, one of the most powerful ways to iterate is to compile (optimize) your metric itself. That's usually easy because the output of the metric is usually a simple value (e.g., a score out of 5) so the metric's metric is easy to define and optimize by collecting a few examples. 
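For instance, a minimal sketch of a metric that is itself a DSPy program might look like the following. The signature, its field names, and the assumption that `pred` exposes `context` and `answer` fields are illustrative placeholders for your own task, not DSPy built-ins.

```python
class FactJudge(dspy.Signature):
    """Judge whether the answer is factually supported by the retrieved context."""

    context = dspy.InputField(desc="Retrieved passages")
    answer = dspy.InputField(desc="Answer to verify")
    factually_correct: bool = dspy.OutputField()

judge = dspy.ChainOfThought(FactJudge)

def factuality_metric(example, pred, trace=None):
    verdict = judge(context=pred.context, answer=pred.answer)
    return int(verdict.factually_correct)
```

Because `judge` is an ordinary DSPy module, it can later be optimized against a handful of hand-labeled scores, just like any other program.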
### Advanced: Accessing the `trace` When your metric is used during evaluation runs, DSPy will not try to track the steps of your program. But during compiling (optimization), DSPy will trace your LM calls. The trace will contain inputs/outputs to each DSPy predictor and you can leverage that to validate intermediate steps for optimization. ```python def validate_hops(example, pred, trace=None): hops = [example.question] + [outputs.query for *_, outputs in trace if 'query' in outputs] if max([len(h) for h in hops]) > 100: return False if any(dspy.evaluate.answer_exact_match_str(hops[idx], hops[:idx], frac=0.8) for idx in range(2, len(hops))): return False return True ``` ``` -------------------------------------------------------------------------------- /tests/predict/test_parallel.py: -------------------------------------------------------------------------------- ```python import dspy from dspy.utils.dummies import DummyLM def test_parallel_module(): lm = DummyLM( [ {"output": "test output 1"}, {"output": "test output 2"}, {"output": "test output 3"}, {"output": "test output 4"}, {"output": "test output 5"}, ] ) dspy.settings.configure(lm=lm) class MyModule(dspy.Module): def __init__(self): super().__init__() self.predictor = dspy.Predict("input -> output") self.predictor2 = dspy.Predict("input -> output") self.parallel = dspy.Parallel(num_threads=2) def forward(self, input): return self.parallel( [ (self.predictor, input), (self.predictor2, input), (self.predictor, input), (self.predictor2, input), (self.predictor, input), ] ) output = MyModule()(dspy.Example(input="test input").with_inputs("input")) expected_outputs = {f"test output {i}" for i in range(1, 6)} assert {r.output for r in output} == expected_outputs def test_batch_module(): lm = DummyLM( [ {"output": "test output 1"}, {"output": "test output 2"}, {"output": "test output 3"}, {"output": "test output 4"}, {"output": "test output 5"}, ] ) res_lm = DummyLM( [ {"output": "test output 1", "reasoning": "test reasoning 1"}, {"output": "test output 2", "reasoning": "test reasoning 2"}, {"output": "test output 3", "reasoning": "test reasoning 3"}, {"output": "test output 4", "reasoning": "test reasoning 4"}, {"output": "test output 5", "reasoning": "test reasoning 5"}, ] ) class MyModule(dspy.Module): def __init__(self): super().__init__() self.predictor = dspy.Predict("input -> output") self.predictor2 = dspy.Predict("input -> output, reasoning") self.parallel = dspy.Parallel(num_threads=2) def forward(self, input): with dspy.context(lm=lm): res1 = self.predictor.batch([input] * 5) with dspy.context(lm=res_lm): res2 = self.predictor2.batch([input] * 5) return (res1, res2) result, reason_result = MyModule()(dspy.Example(input="test input").with_inputs("input")) # Check that we got all expected outputs without caring about order expected_outputs = {f"test output {i}" for i in range(1, 6)} assert {r.output for r in result} == expected_outputs assert {r.output for r in reason_result} == expected_outputs # Check that reasoning matches outputs for reason_result for r in reason_result: num = r.output.split()[-1] # get the number from "test output X" assert r.reasoning == f"test reasoning {num}" def test_nested_parallel_module(): lm = DummyLM( [ {"output": "test output 1"}, {"output": "test output 2"}, {"output": "test output 3"}, {"output": "test output 4"}, {"output": "test output 5"}, ] ) dspy.settings.configure(lm=lm) class MyModule(dspy.Module): def __init__(self): super().__init__() self.predictor = dspy.Predict("input -> output") 
self.predictor2 = dspy.Predict("input -> output") self.parallel = dspy.Parallel(num_threads=2) def forward(self, input): return self.parallel( [ (self.predictor, input), (self.predictor2, input), ( self.parallel, [ (self.predictor2, input), (self.predictor, input), ], ), ] ) output = MyModule()(dspy.Example(input="test input").with_inputs("input")) # For nested structure, check first two outputs and nested outputs separately assert {output[0].output, output[1].output} <= {f"test output {i}" for i in range(1, 5)} assert {output[2][0].output, output[2][1].output} <= {f"test output {i}" for i in range(1, 5)} all_outputs = {output[0].output, output[1].output, output[2][0].output, output[2][1].output} assert len(all_outputs) == 4 def test_nested_batch_method(): lm = DummyLM( [ {"output": "test output 1"}, {"output": "test output 2"}, {"output": "test output 3"}, {"output": "test output 4"}, {"output": "test output 5"}, ] ) dspy.settings.configure(lm=lm) class MyModule(dspy.Module): def __init__(self): super().__init__() self.predictor = dspy.Predict("input -> output") def forward(self, input): res = self.predictor.batch([dspy.Example(input=input).with_inputs("input")] * 2) return res result = MyModule().batch([dspy.Example(input="test input").with_inputs("input")] * 2) assert {result[0][0].output, result[0][1].output, result[1][0].output, result[1][1].output} == { "test output 1", "test output 2", "test output 3", "test output 4", } def test_batch_with_failed_examples(): class FailingModule(dspy.Module): def forward(self, value: int) -> str: if value == 42: raise ValueError("test error") return f"success-{value}" module = FailingModule() examples = [ dspy.Example(value=1).with_inputs("value"), dspy.Example(value=42).with_inputs("value"), # This will fail dspy.Example(value=3).with_inputs("value"), ] results, failed_examples, exceptions = module.batch( examples, return_failed_examples=True, provide_traceback=True, ) assert results == ["success-1", None, "success-3"] assert len(failed_examples) == 1 assert failed_examples[0].inputs()["value"] == 42 assert len(exceptions) == 1 assert isinstance(exceptions[0], ValueError) assert str(exceptions[0]) == "test error" ``` -------------------------------------------------------------------------------- /.github/workflows/build_and_release.yml: -------------------------------------------------------------------------------- ```yaml --- name: Publish Python 🐍 distributions 📦 to PyPI on: push: tags: - "*" jobs: extract-tag: runs-on: ubuntu-latest outputs: version: ${{ steps.extract_tag.outputs.tag }} steps: - uses: actions/checkout@v4 - id: extract_tag name: Extract tag name run: echo "tag=$(echo $GITHUB_REF | cut -d / -f 3)" >> "$GITHUB_OUTPUT" build-and-publish-test-pypi: needs: extract-tag runs-on: ubuntu-latest environment: name: pypi permissions: id-token: write # IMPORTANT: mandatory for trusted publishing contents: write steps: - uses: actions/checkout@v4 - name: Set up Python 3.11 uses: actions/setup-python@v3 with: python-version: "3.11" - name: Install dependencies run: python3 -m pip install --upgrade setuptools wheel twine build semver packaging - name: Get correct version for TestPyPI release id: check_version run: | VERSION=${{ needs.extract-tag.outputs.version }} PACKAGE_NAME="dspy-ai-test" echo "Checking if $VERSION for $PACKAGE_NAME exists on TestPyPI" NEW_VERSION=$(python3 .github/workflows/build_utils/test_version.py $PACKAGE_NAME $VERSION) echo "Version to be used for TestPyPI release: $NEW_VERSION" echo "version=$NEW_VERSION" >> 
"$GITHUB_OUTPUT" - name: Update version in pyproject.toml run: sed -i '/#replace_package_version_marker/{n;s/version="[^"]*"/version="${{ steps.check_version.outputs.version }}"/;}' pyproject.toml - name: Update package name in pyproject.toml run: sed -i '/#replace_package_name_marker/{n;s/name="[^"]*"/name="dspy-ai-test"/;}' pyproject.toml - name: Build a binary wheel run: python3 -m build # Test the locally built wheel - name: Create test environment run: python -m venv test_before_testpypi - name: Test package installation and functionality run: | source test_before_testpypi/bin/activate # Install the locally built wheel and testing dependencies pip install dist/*.whl pytest pytest-asyncio pytest tests/metadata/test_metadata.py tests/predict deactivate # Publish to test-PyPI - name: Publish distribution 📦 to test-PyPI uses: pypa/gh-action-pypi-publish@release/v1 # This requires a trusted publisher to be setup in pypi/testpypi with: repository-url: https://test.pypi.org/legacy/ # TODO: Add tests using dspy-ai-test build-and-publish-pypi: needs: [extract-tag, build-and-publish-test-pypi] # Only publish to PyPI if the repository owner is stanfordnlp if: github.repository_owner == 'stanfordnlp' runs-on: ubuntu-latest environment: name: pypi permissions: id-token: write # IMPORTANT: mandatory for trusted publishing contents: write steps: - uses: actions/checkout@v4 - name: Set up Python 3.11 uses: actions/setup-python@v3 with: python-version: "3.11" - name: Install dependencies run: python3 -m pip install --upgrade setuptools wheel twine build - name: Update version in pyproject.toml run: sed -i '/#replace_package_version_marker/{n;s/version *= *"[^"]*"/version="${{ needs.extract-tag.outputs.version }}"/;}' pyproject.toml - name: Update version in __metadata__.py run: sed -i '/#replace_package_version_marker/{n;s/__version__ *= *"[^"]*"/__version__="${{ needs.extract-tag.outputs.version }}"/;}' ./dspy/__metadata__.py # Publish to dspy - name: Update package name in pyproject.toml run: sed -i '/#replace_package_name_marker/{n;s/name *= *"[^"]*"/name="dspy"/;}' pyproject.toml - name: Update package name in metadata.py run: sed -i '/#replace_package_name_marker/{n;s/__name__ *= *"[^"]*"/__name__="dspy"/;}' ./dspy/__metadata__.py - name: Build a binary wheel run: python3 -m build # Test the locally built wheel before publishing to pypi - name: Create test environment run: python -m venv test_before_pypi - name: Test package installation and functionality run: | source test_before_pypi/bin/activate # Install the locally built wheel and testing dependencies pip install dist/*.whl pytest pytest-asyncio pytest tests/metadata/test_metadata.py tests/predict deactivate rm -r test_before_pypi - name: Publish distribution 📦 to PyPI (dspy) uses: pypa/gh-action-pypi-publish@release/v1 with: attestations: false # Publish to dspy-ai - name: Update package name in pyproject.toml run: sed -i '/#replace_package_name_marker/{n;s/name *= *"[^"]*"/name="dspy-ai"/;}' .github/.internal_dspyai/pyproject.toml - name: Update version for dspy-ai release run: sed -i '/#replace_package_version_marker/{n;s/version *= *"[^"]*"/version="${{ needs.extract-tag.outputs.version }}"/;}' .github/.internal_dspyai/pyproject.toml - name: Update dspy dependency for dspy-ai release run: | sed -i '/#replace_dspy_version_marker/{n;s/dspy>=[^"]*/dspy>=${{ needs.extract-tag.outputs.version }}/;}' .github/.internal_dspyai/pyproject.toml - name: Build a binary wheel (dspy-ai) run: python3 -m build .github/.internal_dspyai/ - name: Publish 
distribution 📦 to PyPI (dspy-ai) uses: pypa/gh-action-pypi-publish@release/v1 with: attestations: false packages-dir: .github/.internal_dspyai/dist/ - uses: stefanzweifel/git-auto-commit-action@v5 # auto commit changes to main with: commit_message: Update versions create_branch: true branch: release-${{ needs.extract-tag.outputs.version }} - name: Checkout main branch run: | git fetch origin git checkout main - name: Configure git user run: | git config --global user.email "[email protected]" git config --global user.name "Github Actions" - name: Merge release branch into main run: | git merge --no-ff release-${{ needs.extract-tag.outputs.version }} - name: Push changes to main run: | git push origin main ``` -------------------------------------------------------------------------------- /dspy/teleprompt/infer_rules.py: -------------------------------------------------------------------------------- ```python import logging import random import numpy as np import dspy from dspy.evaluate.evaluate import Evaluate from dspy.teleprompt import BootstrapFewShot logger = logging.getLogger(__name__) class InferRules(BootstrapFewShot): def __init__(self, num_candidates=10, num_rules=10, num_threads=None, teacher_settings=None, **kwargs): super().__init__(teacher_settings=teacher_settings, **kwargs) self.num_candidates = num_candidates self.num_rules = num_rules self.num_threads = num_threads self.rules_induction_program = RulesInductionProgram(num_rules, teacher_settings=teacher_settings) self.metric = kwargs.get("metric") self.max_errors = kwargs.get("max_errors") def compile(self, student, *, teacher=None, trainset, valset=None): if valset is None: train_size = int(0.5 * len(trainset)) trainset, valset = trainset[:train_size], trainset[train_size:] super().compile(student, teacher=teacher, trainset=trainset) original_program = self.student.deepcopy() all_predictors = [p for p in original_program.predictors() if hasattr(p, "signature")] instructions_list = [p.signature.instructions for p in all_predictors] best_score = -np.inf best_program = None for candidate_idx in range(self.num_candidates): candidate_program = original_program.deepcopy() candidate_predictors = [p for p in candidate_program.predictors() if hasattr(p, "signature")] for i, predictor in enumerate(candidate_predictors): predictor.signature.instructions = instructions_list[i] for i, predictor in enumerate(candidate_predictors): rules = self.induce_natural_language_rules(predictor, trainset) predictor.signature.instructions = instructions_list[i] self.update_program_instructions(predictor, rules) score = self.evaluate_program(candidate_program, valset) if score > best_score: best_score = score best_program = candidate_program logger.info(f"Evaluated Candidate {candidate_idx + 1} with score {score}. Current best score: {best_score}") logger.info(f"Final best score: {best_score}") return best_program def induce_natural_language_rules(self, predictor, trainset): demos = self.get_predictor_demos(trainset, predictor) signature = predictor.signature while True: examples_text = self.format_examples(demos, signature) try: return self.rules_induction_program(examples_text) except Exception as e: assert ( isinstance(e, ValueError) or e.__class__.__name__ == "BadRequestError" or "ContextWindowExceededError" in str(e) ) if len(demos) > 1: demos = demos[:-1] else: raise RuntimeError( "Failed to generate natural language rules since a single example couldn't fit in the model's " "context window." 
) from e def update_program_instructions(self, predictor, natural_language_rules): predictor.signature.instructions = ( f"{predictor.signature.instructions}\n\n" f"Please adhere to the following rules when making your prediction:\n{natural_language_rules}" ) def format_examples(self, demos, signature): examples_text = "" for demo in demos: input_fields = {k: v for k, v in demo.items() if k in signature.input_fields} output_fields = {k: v for k, v in demo.items() if k in signature.output_fields} input_text = "\n".join(f"{k}: {v}" for k, v in input_fields.items()) output_text = "\n".join(f"{k}: {v}" for k, v in output_fields.items()) examples_text += f"Input Fields:\n{input_text}\n\n=========\nOutput Fields:\n{output_text}\n\n" return examples_text def get_predictor_demos(self, trainset, predictor): # TODO: Consider how this handled "incomplete" demos. signature = predictor.signature return [ { key: value for key, value in example.items() if key in signature.input_fields or key in signature.output_fields } for example in trainset ] def evaluate_program(self, program, dataset): effective_max_errors = ( self.max_errors if self.max_errors is not None else dspy.settings.max_errors ) evaluate = Evaluate( devset=dataset, metric=self.metric, num_threads=self.num_threads, max_errors=effective_max_errors, display_table=False, display_progress=True, ) score = evaluate(program, metric=self.metric).score return score class RulesInductionProgram(dspy.Module): def __init__(self, num_rules, teacher_settings=None): super().__init__() class CustomRulesInduction(dspy.Signature): __doc__ = ( f"Given a set of examples, extract a list of {num_rules} concise and non-redundant natural language " "rules that provide clear guidance for performing the task. All rules should be actionable for a " "well-specified scope of examples of this general kind of task." ) examples_text = dspy.InputField(desc="Text containing examples") natural_language_rules = dspy.OutputField(desc="Induced natural language rules") self.rules_induction = dspy.ChainOfThought(CustomRulesInduction) self.teacher_settings = teacher_settings or {} self.rng = random.Random(0) def forward(self, examples_text): with dspy.settings.context(**self.teacher_settings): # Generate rules with a fresh rollout and non-zero temperature. lm = dspy.settings.lm.copy( rollout_id=self.rng.randint(0, 10**9), temperature=1.0 ) with dspy.settings.context(lm=lm): rules = self.rules_induction(examples_text=examples_text).natural_language_rules return rules.strip() ``` -------------------------------------------------------------------------------- /tests/primitives/test_python_interpreter.py: -------------------------------------------------------------------------------- ```python import os import random import shutil import pytest from dspy.primitives.python_interpreter import InterpreterError, PythonInterpreter # This test suite requires deno to be installed. 
Please install deno following https://docs.deno.com/runtime/getting_started/installation/ if shutil.which("deno") is None: pytest.skip(reason="Deno is not installed or not in PATH", allow_module_level=True) def test_execute_simple_code(): with PythonInterpreter() as interpreter: code = "print('Hello, World!')" result = interpreter.execute(code) assert result == "Hello, World!\n", "Simple print statement should return 'Hello World!\n'" def test_import(): with PythonInterpreter() as interpreter: code = "import math\nresult = math.sqrt(4)\nresult" result = interpreter.execute(code) assert result == 2, "Should be able to import and use math.sqrt" def test_user_variable_definitions(): with PythonInterpreter() as interpreter: code = "result = number + 1\nresult" result = interpreter.execute(code, variables={"number": 4}) assert result == 5, "User variable assignment should work" def test_failure_syntax_error(): with PythonInterpreter() as interpreter: code = "+++" with pytest.raises(SyntaxError, match="Invalid Python syntax"): interpreter.execute(code) def test_failure_zero_division(): with PythonInterpreter() as interpreter: code = "1+0/0" with pytest.raises(InterpreterError, match="ZeroDivisionError"): interpreter.execute(code) def test_exception_args(): with PythonInterpreter() as interpreter: token = random.randint(1, 10**9) code = f"raise ValueError({token})" with pytest.raises(InterpreterError, match=rf"ValueError: \[{token}\]"): interpreter.execute(code) def test_final_answer_trick(): with PythonInterpreter() as interpreter: token = random.randint(1, 10**9) code = f"final_answer('The result is', {token})" result = interpreter(code) # They should maintain the same order assert result == ["The result is", token], "The returned results are differ, `final_answer` trick doesn't work" def test_enable_env_vars_flag(): os.environ["FOO_TEST_ENV"] = "test_value" with PythonInterpreter(enable_env_vars=None) as interpreter: code = "import os\nresult = os.getenv('FOO_TEST_ENV')\nresult" result = interpreter.execute(code) assert result == "", "Environment variables should be inaccessible without allow-env" with PythonInterpreter(enable_env_vars=["FOO_TEST_ENV"]) as interpreter: code = "import os\nresult = os.getenv('FOO_TEST_ENV')\nresult" result = interpreter.execute(code) assert result == "test_value", "Environment variables should be accessible with allow-env" def test_read_file_access_control(tmp_path): testfile_path = tmp_path / "test_temp_file.txt" virtual_path = f"/sandbox/{testfile_path.name}" with open(testfile_path, "w") as f: f.write("test content") with PythonInterpreter(enable_read_paths=[str(testfile_path)]) as interpreter: code = ( f"with open({virtual_path!r}, 'r') as f:\n" f" data = f.read()\n" f"data" ) result = interpreter.execute(code) assert result == "test content", "Test file should be accessible with enable_read_paths and specified file" with PythonInterpreter(enable_read_paths=None) as interpreter: code = ( f"try:\n" f" with open({virtual_path!r}, 'r') as f:\n" f" data = f.read()\n" f"except Exception as e:\n" f" data = str(e)\n" f"data" ) result = interpreter.execute(code) assert ("PermissionDenied" in result or "denied" in result.lower() or "no such file" in result.lower()), "Test file should not be accessible without enable_read_paths" def test_enable_write_flag(tmp_path): testfile_path = tmp_path / "test_temp_output.txt" virtual_path = f"/sandbox/{testfile_path.name}" with PythonInterpreter(enable_write_paths=None) as interpreter: code = ( f"try:\n" f" with 
open({virtual_path!r}, 'w') as f:\n" f" f.write('blocked')\n" f" result = 'wrote'\n" f"except Exception as e:\n" f" result = str(e)\n" f"result" ) result = interpreter.execute(code) assert ("PermissionDenied" in result or "denied" in result.lower() or "no such file" in result.lower()), "Test file should not be writable without enable_write_paths" with PythonInterpreter(enable_write_paths=[str(testfile_path)]) as interpreter: code = ( f"with open({virtual_path!r}, 'w') as f:\n" f" f.write('allowed')\n" f"'ok'" ) result = interpreter.execute(code) assert result == "ok", "Test file should be writable with enable_write_paths" assert testfile_path.exists() with open(testfile_path) as f: assert f.read() == "allowed", "Test file outputs should match content written during execution" with open(testfile_path, "w") as f: f.write("original_content") with PythonInterpreter(enable_write_paths=[str(testfile_path)], sync_files=False) as interpreter: code = ( f"with open({virtual_path!r}, 'w') as f:\n" f" f.write('should_not_sync')\n" f"'done_no_sync'" ) result = interpreter.execute(code) assert result == "done_no_sync" with open(testfile_path) as f: assert f.read() == "original_content", "File should not be changed when sync_files is False" def test_enable_net_flag(): test_url = "https://example.com" with PythonInterpreter(enable_network_access=None) as interpreter: code = ( "import js\n" f"resp = await js.fetch({test_url!r})\n" "resp.status" ) with pytest.raises(InterpreterError, match="PythonError"): interpreter.execute(code) with PythonInterpreter(enable_network_access=["example.com"]) as interpreter: code = ( "import js\n" f"resp = await js.fetch({test_url!r})\n" "resp.status" ) result = interpreter.execute(code) assert int(result) == 200, "Network access is permitted with enable_network_access" ``` -------------------------------------------------------------------------------- /dspy/adapters/types/base_type.py: -------------------------------------------------------------------------------- ```python import json import re from typing import Any, Optional, get_args, get_origin import json_repair import pydantic from litellm import ModelResponseStream CUSTOM_TYPE_START_IDENTIFIER = "<<CUSTOM-TYPE-START-IDENTIFIER>>" CUSTOM_TYPE_END_IDENTIFIER = "<<CUSTOM-TYPE-END-IDENTIFIER>>" class Type(pydantic.BaseModel): """Base class to support creating custom types for DSPy signatures. This is the parent class of DSPy custom types, e.g, dspy.Image. Subclasses must implement the `format` method to return a list of dictionaries (same as the Array of content parts in the OpenAI API user message's content field). Example: ```python class Image(Type): url: str def format(self) -> list[dict[str, Any]]: return [{"type": "image_url", "image_url": {"url": self.url}}] ``` """ def format(self) -> list[dict[str, Any]] | str: raise NotImplementedError @classmethod def description(cls) -> str: """Description of the custom type""" return "" @classmethod def extract_custom_type_from_annotation(cls, annotation): """Extract all custom types from the annotation. This is used to extract all custom types from the annotation of a field, while the annotation can have arbitrary level of nesting. For example, we detect `Tool` is in `list[dict[str, Tool]]`. """ # Direct match. Nested type like `list[dict[str, Event]]` passes `isinstance(annotation, type)` in python 3.10 # while fails in python 3.11. To accommodate users using python 3.10, we need to capture the error and ignore it. 
try: if isinstance(annotation, type) and issubclass(annotation, cls): return [annotation] except TypeError: pass origin = get_origin(annotation) if origin is None: return [] result = [] # Recurse into all type args for arg in get_args(annotation): result.extend(cls.extract_custom_type_from_annotation(arg)) return result @pydantic.model_serializer() def serialize_model(self): formatted = self.format() if isinstance(formatted, list): return ( f"{CUSTOM_TYPE_START_IDENTIFIER}{json.dumps(formatted, ensure_ascii=False)}{CUSTOM_TYPE_END_IDENTIFIER}" ) return formatted @classmethod def is_streamable(cls) -> bool: """Whether the custom type is streamable.""" return False @classmethod def parse_stream_chunk(cls, chunk: ModelResponseStream) -> Optional["Type"]: """ Parse a stream chunk into the custom type. Args: chunk: A stream chunk. Returns: A custom type object or None if the chunk is not for this custom type. """ return None @classmethod def parse_lm_response(cls, response: str | dict[str, Any]) -> Optional["Type"]: """Parse a LM response into the custom type. Args: response: A LM response. Returns: A custom type object. """ return None def split_message_content_for_custom_types(messages: list[dict[str, Any]]) -> list[dict[str, Any]]: """Split user message content into a list of content blocks. This method splits each user message's content in the `messages` list to be a list of content block, so that the custom types like `dspy.Image` can be properly formatted for better quality. For example, the split content may look like below if the user message has a `dspy.Image` object: ``` [ {"type": "text", "text": "{text_before_image}"}, {"type": "image_url", "image_url": {"url": "{image_url}"}}, {"type": "text", "text": "{text_after_image}"}, ] ``` This is implemented by finding the `<<CUSTOM-TYPE-START-IDENTIFIER>>` and `<<CUSTOM-TYPE-END-IDENTIFIER>>` in the user message content and splitting the content around them. The `<<CUSTOM-TYPE-START-IDENTIFIER>>` and `<<CUSTOM-TYPE-END-IDENTIFIER>>` are the reserved identifiers for the custom types as in `dspy.Type`. Args: messages: a list of messages sent to the LM. The format is the same as [OpenAI API's messages format](https://platform.openai.com/docs/guides/chat-completions/response-format). Returns: A list of messages with the content split into a list of content blocks around custom types content. 
""" for message in messages: if message["role"] != "user": # Custom type messages are only in user messages continue pattern = rf"{CUSTOM_TYPE_START_IDENTIFIER}(.*?){CUSTOM_TYPE_END_IDENTIFIER}" result = [] last_end = 0 # DSPy adapter always formats user input into a string content before custom type splitting content: str = message["content"] for match in re.finditer(pattern, content, re.DOTALL): start, end = match.span() # Add text before the current block if start > last_end: result.append({"type": "text", "text": content[last_end:start]}) # Parse the JSON inside the block custom_type_content = match.group(1).strip() parsed = None for parse_fn in [json.loads, _parse_doubly_quoted_json, json_repair.loads]: try: parsed = parse_fn(custom_type_content) break except json.JSONDecodeError: continue if parsed: for custom_type_content in parsed: result.append(custom_type_content) else: # fallback to raw string if it's not valid JSON result.append({"type": "text", "text": custom_type_content}) last_end = end if last_end == 0: # No custom type found, return the original message continue # Add any remaining text after the last match if last_end < len(content): result.append({"type": "text", "text": content[last_end:]}) message["content"] = result return messages def _parse_doubly_quoted_json(json_str: str) -> Any: """ Parse a doubly quoted JSON string into a Python dict. `dspy.Type` can be json-encoded twice if included in either list or dict, e.g., `list[dspy.experimental.Document]` """ return json.loads(json.loads(f'"{json_str}"')) ``` -------------------------------------------------------------------------------- /docs/docs/tutorials/saving/index.md: -------------------------------------------------------------------------------- ```markdown # Tutorial: Saving and Loading your DSPy program This guide demonstrates how to save and load your DSPy program. At a high level, there are two ways to save your DSPy program: 1. Save the state of the program only, similar to weights-only saving in PyTorch. 2. Save the whole program, including both the architecture and the state, which is supported by `dspy>=2.6.0`. ## State-only Saving State represents the DSPy program's internal state, including the signature, demos (few-shot examples), and other information like the `lm` to use for each `dspy.Predict` in the program. It also includes configurable attributes of other DSPy modules like `k` for `dspy.retrievers.Retriever`. To save the state of a program, use the `save` method and set `save_program=False`. You can choose to save the state to a JSON file or a pickle file. We recommend saving the state to a JSON file because it is safer and readable. But sometimes your program contains non-serializable objects like `dspy.Image` or `datetime.datetime`, in which case you should save the state to a pickle file. 
Let's say we have compiled a program with some data, and we want to save the program for future usage: ```python import dspy from dspy.datasets.gsm8k import GSM8K, gsm8k_metric dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini")) gsm8k = GSM8K() gsm8k_trainset = gsm8k.train[:10] dspy_program = dspy.ChainOfThought("question -> answer") optimizer = dspy.BootstrapFewShot(metric=gsm8k_metric, max_bootstrapped_demos=4, max_labeled_demos=4, max_rounds=5) compiled_dspy_program = optimizer.compile(dspy_program, trainset=gsm8k_trainset) ``` To save the state of your program to json file: ```python compiled_dspy_program.save("./dspy_program/program.json", save_program=False) ``` To save the state of your program to a pickle file: ```python compiled_dspy_program.save("./dspy_program/program.pkl", save_program=False) ``` To load your saved state, you need to **recreate the same program**, then load the state using the `load` method. ```python loaded_dspy_program = dspy.ChainOfThought("question -> answer") # Recreate the same program. loaded_dspy_program.load("./dspy_program/program.json") assert len(compiled_dspy_program.demos) == len(loaded_dspy_program.demos) for original_demo, loaded_demo in zip(compiled_dspy_program.demos, loaded_dspy_program.demos): # Loaded demo is a dict, while the original demo is a dspy.Example. assert original_demo.toDict() == loaded_demo assert str(compiled_dspy_program.signature) == str(loaded_dspy_program.signature) ``` Or load the state from a pickle file: ```python loaded_dspy_program = dspy.ChainOfThought("question -> answer") # Recreate the same program. loaded_dspy_program.load("./dspy_program/program.pkl") assert len(compiled_dspy_program.demos) == len(loaded_dspy_program.demos) for original_demo, loaded_demo in zip(compiled_dspy_program.demos, loaded_dspy_program.demos): # Loaded demo is a dict, while the original demo is a dspy.Example. assert original_demo.toDict() == loaded_demo assert str(compiled_dspy_program.signature) == str(loaded_dspy_program.signature) ``` ## Whole Program Saving Starting from `dspy>=2.6.0`, DSPy supports saving the whole program, including the architecture and the state. This feature is powered by `cloudpickle`, which is a library for serializing and deserializing Python objects. To save the whole program, use the `save` method and set `save_program=True`, and specify a directory path to save the program instead of a file name. We require a directory path because we also save some metadata, e.g., the dependency versions along with the program itself. ```python compiled_dspy_program.save("./dspy_program/", save_program=True) ``` To load the saved program, directly use `dspy.load` method: ```python loaded_dspy_program = dspy.load("./dspy_program/") assert len(compiled_dspy_program.demos) == len(loaded_dspy_program.demos) for original_demo, loaded_demo in zip(compiled_dspy_program.demos, loaded_dspy_program.demos): # Loaded demo is a dict, while the original demo is a dspy.Example. assert original_demo.toDict() == loaded_demo assert str(compiled_dspy_program.signature) == str(loaded_dspy_program.signature) ``` With whole program saving, you don't need to recreate the program, but can directly load the architecture along with the state. You can pick the suitable saving approach based on your needs. ### Serializing Imported Modules When saving a program with `save_program=True`, you might need to include custom modules that your program depends on. 
This is necessary if your program depends on these modules, but at loading time these modules are not imported before calling `dspy.load`. You can specify which custom modules should be serialized with your program by passing them to the `modules_to_serialize` parameter when calling `save`. This ensures that any dependencies your program relies on are included during serialization and available when loading the program later. Under the hood this uses cloudpickle's `cloudpickle.register_pickle_by_value` function to register a module as picklable by value. When a module is registered this way, cloudpickle will serialize the module by value rather than by reference, ensuring that the module contents are preserved with the saved program. For example, if your program uses custom modules: ```python import dspy import my_custom_module compiled_dspy_program = dspy.ChainOfThought(my_custom_module.custom_signature) # Save the program with the custom module compiled_dspy_program.save( "./dspy_program/", save_program=True, modules_to_serialize=[my_custom_module] ) ``` This ensures that the required modules are properly serialized and available when loading the program later. Any number of modules can be passed to `modules_to_serialize`. If you don't specify `modules_to_serialize`, no additional modules will be registered for serialization. ## Backward Compatibility As of `dspy<3.0.0`, we don't guarantee the backward compatibility of the saved program. For example, if you save the program with `dspy==2.5.35`, at loading time please make sure to use the same version of DSPy to load the program, otherwise the program may not work as expected. Chances are that loading a saved file in a different version of DSPy will not raise an error, but the performance could be different from when the program was saved. Starting from `dspy>=3.0.0`, we will guarantee the backward compatibility of the saved program in major releases, i.e., programs saved in `dspy==3.0.0` should be loadable in `dspy==3.7.10`. ``` -------------------------------------------------------------------------------- /dspy/dsp/utils/settings.py: -------------------------------------------------------------------------------- ```python import asyncio import contextvars import copy import threading from contextlib import contextmanager from dspy.dsp.utils.utils import dotdict DEFAULT_CONFIG = dotdict( lm=None, adapter=None, rm=None, branch_idx=0, trace=[], callbacks=[], async_max_workers=8, send_stream=None, disable_history=False, track_usage=False, usage_tracker=None, caller_predict=None, caller_modules=None, stream_listeners=[], provide_traceback=False, # Whether to include traceback information in error logs. num_threads=8, # Number of threads to use for parallel processing. max_errors=10, # Maximum errors before halting operations. # If true, async tools can be called in sync mode by getting converted to sync. allow_tool_async_sync_conversion=False, max_history_size=10000, max_trace_size=10000, ) # Global base configuration and owner tracking main_thread_config = copy.deepcopy(DEFAULT_CONFIG) config_owner_thread_id = None config_owner_async_task = None # Global lock for settings configuration global_lock = threading.Lock() thread_local_overrides = contextvars.ContextVar("context_overrides", default=dotdict()) class Settings: """ A singleton class for DSPy configuration settings. Thread-safe global configuration. - 'configure' can be called by only one 'owner' thread (the first thread that calls it). 
- Other threads see the configured global values from 'main_thread_config'. - 'context' sets thread-local overrides. These overrides propagate to threads spawned inside that context block, when (and only when!) using a ParallelExecutor that copies overrides. 1. Only one unique thread (which can be any thread!) can call dspy.configure. 2. It affects a global state, visible to all. As a result, user threads work, but they shouldn't be mixed with concurrent changes to dspy.configure from the "main" thread. (TODO: In the future, add warnings: if there are near-in-time user-thread reads followed by .configure calls.) 3. Any thread can use dspy.context. It propagates to child threads created with DSPy primitives: Parallel, asyncify, etc. """ _instance = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance @property def lock(self): return global_lock def __getattr__(self, name): overrides = thread_local_overrides.get() if name in overrides: return overrides[name] elif name in main_thread_config: return main_thread_config[name] else: raise AttributeError(f"'Settings' object has no attribute '{name}'") def __setattr__(self, name, value): if name in ("_instance",): super().__setattr__(name, value) else: self.configure(**{name: value}) def __getitem__(self, key): return self.__getattr__(key) def __setitem__(self, key, value): self.__setattr__(key, value) def __contains__(self, key): overrides = thread_local_overrides.get() return key in overrides or key in main_thread_config def get(self, key, default=None): try: return self[key] except AttributeError: return default def copy(self): overrides = thread_local_overrides.get() return dotdict({**main_thread_config, **overrides}) @property def config(self): return self.copy() def _ensure_configure_allowed(self): global main_thread_config, config_owner_thread_id, config_owner_async_task current_thread_id = threading.get_ident() if config_owner_thread_id is None: # First `configure` call assigns the owner thread id. config_owner_thread_id = current_thread_id if config_owner_thread_id != current_thread_id: # Disallow a second `configure` calls from other threads. raise RuntimeError("dspy.settings can only be changed by the thread that initially configured it.") # Async task doesn't allow a second `configure` call, must use dspy.context(...) instead. is_async_task = False try: if asyncio.current_task() is not None: is_async_task = True except RuntimeError: # This exception (e.g., "no current task") means we are not in an async loop/task, # or asyncio module itself is not fully functional in this specific sub-thread context. is_async_task = False if not is_async_task: return if config_owner_async_task is None: # First `configure` call assigns the owner async task. config_owner_async_task = asyncio.current_task() return # We are in an async task. Now check for IPython and allow calling `configure` from IPython. in_ipython = False try: from IPython import get_ipython # get_ipython is a global injected by IPython environments. # We check its existence and type to be more robust. in_ipython = get_ipython() is not None except Exception: # If `IPython` is not installed or `get_ipython` failed, we are not in an IPython environment. in_ipython = False if not in_ipython and config_owner_async_task != asyncio.current_task(): raise RuntimeError( "dspy.settings.configure(...) can only be called from the same async task that called it first. Please " "use `dspy.context(...)` in other async tasks instead." 
) def configure(self, **kwargs): # If no exception is raised, the `configure` call is allowed. self._ensure_configure_allowed() # Update global config for k, v in kwargs.items(): main_thread_config[k] = v @contextmanager def context(self, **kwargs): """ Context manager for temporary configuration changes at the thread level. Does not affect global configuration. Changes only apply to the current thread. If threads are spawned inside this block using ParallelExecutor, they will inherit these overrides. """ original_overrides = thread_local_overrides.get().copy() new_overrides = dotdict({**main_thread_config, **original_overrides, **kwargs}) token = thread_local_overrides.set(new_overrides) try: yield finally: thread_local_overrides.reset(token) def __repr__(self): overrides = thread_local_overrides.get() combined_config = {**main_thread_config, **overrides} return repr(combined_config) settings = Settings() ``` -------------------------------------------------------------------------------- /dspy/teleprompt/random_search.py: -------------------------------------------------------------------------------- ```python import random import dspy from dspy.evaluate.evaluate import Evaluate from dspy.teleprompt.teleprompt import Teleprompter from .bootstrap import BootstrapFewShot from .vanilla import LabeledFewShot # TODO: Don't forget dealing with the raw demos. # TODO: Deal with the (pretty common) case of having a metric for filtering and a separate metric for eval. # The metric itself may tell though by the presence of trace. # TODO: This function should take a max_budget and max_teacher_budget. That's in the number of program calls. # In this case, max_student_budget is max_budget - max_teacher_budget. # For max_teacher_budget, this will just limit the total number of things we bootstrap. # This can end up implicitly defining the number of candidate programs (i.e., stop when runs out). Cap at 16. # For max_student_budget, this will be a more upfront calculation. # Right now, it can also just induce the number of candidate programs. Later, it could be used more interestingly # for selective early stopping. # Progressive elimination sounds about right: after 50 examples, drop bottom third, after 100, another third, etc. # until only 3--5 are left for the end. Could also be systematic and add (earlier) stopping based on error bounds. # In general, though, the early filtering is just saying: either there are some really bad ones, or some really really # good ones, or most things are pretty close. In all of these cases, dropping the bottom third is not going to hurt. 
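# Usage sketch: a minimal way to run this optimizer, assuming a `metric` callable and a
# `trainset` of dspy.Example objects are already defined (names here are placeholders):
#
#   optimizer = BootstrapFewShotWithRandomSearch(metric=metric, num_candidate_programs=8, num_threads=4)
#   compiled_program = optimizer.compile(student=my_program, trainset=trainset, valset=valset)
#
# compile() evaluates a zero-shot baseline (seed -3), a labeled-only program (seed -2), an
# unshuffled bootstrap (seed -1), and `num_candidate_programs` shuffled bootstrap runs, then
# returns the best-scoring program with all candidates attached as `candidate_programs`.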
class BootstrapFewShotWithRandomSearch(Teleprompter): def __init__( self, metric, teacher_settings=None, max_bootstrapped_demos=4, max_labeled_demos=16, max_rounds=1, num_candidate_programs=16, num_threads=None, max_errors=None, stop_at_score=None, metric_threshold=None, ): self.metric = metric self.teacher_settings = teacher_settings or {} self.max_rounds = max_rounds self.num_threads = num_threads self.stop_at_score = stop_at_score self.metric_threshold = metric_threshold self.min_num_samples = 1 self.max_num_samples = max_bootstrapped_demos self.max_errors = max_errors self.num_candidate_sets = num_candidate_programs self.max_labeled_demos = max_labeled_demos print(f"Going to sample between {self.min_num_samples} and {self.max_num_samples} traces per predictor.") print(f"Will attempt to bootstrap {self.num_candidate_sets} candidate sets.") def compile(self, student, *, teacher=None, trainset, valset=None, restrict=None, labeled_sample=True): self.trainset = trainset self.valset = valset or trainset # TODO: FIXME: Note this choice. effective_max_errors = self.max_errors if self.max_errors is not None else dspy.settings.max_errors scores = [] all_subscores = [] score_data = [] for seed in range(-3, self.num_candidate_sets): if (restrict is not None) and (seed not in restrict): continue trainset_copy = list(self.trainset) if seed == -3: # zero-shot program = student.reset_copy() elif seed == -2: # labels only teleprompter = LabeledFewShot(k=self.max_labeled_demos) program = teleprompter.compile(student, trainset=trainset_copy, sample=labeled_sample) elif seed == -1: # unshuffled few-shot optimizer = BootstrapFewShot( metric=self.metric, metric_threshold=self.metric_threshold, max_bootstrapped_demos=self.max_num_samples, max_labeled_demos=self.max_labeled_demos, teacher_settings=self.teacher_settings, max_rounds=self.max_rounds, max_errors=effective_max_errors, ) program = optimizer.compile(student, teacher=teacher, trainset=trainset_copy) else: assert seed >= 0, seed random.Random(seed).shuffle(trainset_copy) size = random.Random(seed).randint(self.min_num_samples, self.max_num_samples) optimizer = BootstrapFewShot( metric=self.metric, metric_threshold=self.metric_threshold, max_bootstrapped_demos=size, max_labeled_demos=self.max_labeled_demos, teacher_settings=self.teacher_settings, max_rounds=self.max_rounds, max_errors=effective_max_errors, ) program = optimizer.compile(student, teacher=teacher, trainset=trainset_copy) evaluate = Evaluate( devset=self.valset, metric=self.metric, num_threads=self.num_threads, max_errors=effective_max_errors, display_table=False, display_progress=True, ) result = evaluate(program) score, subscores = result.score, [output[2] for output in result.results] all_subscores.append(subscores) if len(scores) == 0 or score > max(scores): print("New best score:", score, "for seed", seed) best_program = program scores.append(score) print(f"Scores so far: {scores}") print(f"Best score so far: {max(scores)}") score_data.append({"score": score, "subscores": subscores, "seed": seed, "program": program}) if self.stop_at_score is not None and score >= self.stop_at_score: print(f"Stopping early because score {score} is >= stop_at_score {self.stop_at_score}") break # To best program, attach all program candidates in decreasing average score best_program.candidate_programs = score_data best_program.candidate_programs = sorted( best_program.candidate_programs, key=lambda x: x["score"], reverse=True ) print(f"{len(best_program.candidate_programs)} candidate programs found.") 
return best_program # sample between 4 and 10 examples from traces # TODO: FIXME: The max number of demos should be determined in part by the LM's tokenizer + max_length. # This does require executing the program, or at least the predictor. # # # # # # (Actually we can just combine the token counts of the traces, when formatted via signature/adapter). # Alternatively, we can keep track of the (zero-shot) number of tokens when we bootstrap. # As another option, we can just try a wide range and handle failures as penalties on the score. # The number "24" of traces to collect can also be affected. If we only need 3x10, some overlap is ok. # We can also consider having short_demos and long_demos. ``` -------------------------------------------------------------------------------- /tests/primitives/test_module.py: -------------------------------------------------------------------------------- ```python from pathlib import Path import dspy from dspy.primitives.module import Module, set_attribute_by_name # Adjust the import based on your file structure from dspy.utils import DummyLM class HopModule(dspy.Module): def __init__(self): super().__init__() self.predict1 = dspy.Predict("question -> query") self.predict2 = dspy.Predict("query -> answer") def forward(self, question): query = self.predict1(question=question).query return self.predict2(query=query) def test_module_initialization(): module = Module() assert module._compiled is False, "Module _compiled attribute should be False upon initialization" def test_named_predictors(): module = HopModule() named_preds = module.named_predictors() assert len(named_preds) == 2, "Should identify correct number of Predict instances" names, preds = zip(*named_preds, strict=False) assert "predict1" in names and "predict2" in names, "Named predictors should include 'predict1' and 'predict2'" def test_predictors(): module = HopModule() preds = module.predictors() assert len(preds) == 2, "Should return correct number of Predict instances" assert all(isinstance(p, dspy.Predict) for p in preds), "All returned items should be instances of PredictMock" def test_forward(): program = HopModule() dspy.settings.configure( lm=DummyLM( { "What is 1+1?": {"query": "let me check"}, "let me check": {"answer": "2"}, } ) ) result = program(question="What is 1+1?").answer assert result == "2" def test_nested_named_predictors(): class Hop2Module(dspy.Module): def __init__(self): super().__init__() self.hop = HopModule() module = Hop2Module() named_preds = module.named_predictors() assert len(named_preds) == 2 names, _preds = zip(*named_preds, strict=False) assert "hop.predict1" in names assert "hop.predict2" in names def test_empty_module(): module = Module() assert list(module.named_sub_modules()) == [("self", module)] def test_single_level(): module = Module() module.sub = Module() expected = [("self", module), ("self.sub", module.sub)] assert list(module.named_sub_modules()) == expected def test_multiple_levels(): module = Module() module.sub = Module() module.sub.subsub = Module() expected = [("self", module), ("self.sub", module.sub), ("self.sub.subsub", module.sub.subsub)] assert list(module.named_sub_modules()) == expected def test_multiple_sub_modules(): module = Module() module.sub1 = Module() module.sub2 = Module() expected = [("self", module), ("self.sub1", module.sub1), ("self.sub2", module.sub2)] assert sorted(module.named_sub_modules()) == sorted(expected) def test_non_base_module_attributes(): module = Module() module.sub = Module() module.not_a_sub = "Not a 
self" expected = [("self", module), ("self.sub", module.sub)] assert list(module.named_sub_modules()) == expected def test_complex_module_traversal(): root = Module() root.sub_module = Module() root.sub_module.nested_list = [Module(), {"key": Module()}] root.sub_module.nested_tuple = (Module(), [Module(), Module()]) expected_names = { "self", "self.sub_module", "self.sub_module.nested_list[0]", "self.sub_module.nested_list[1][key]", "self.sub_module.nested_tuple[0]", "self.sub_module.nested_tuple[1][0]", "self.sub_module.nested_tuple[1][1]", } found_names = {name for name, _ in root.named_sub_modules()} assert found_names == expected_names, ( f"Missing or extra modules found. Missing: {expected_names - found_names}, Extra: {found_names - expected_names}" ) def test_complex_module_traversal_with_same_module(): root = Module() root.sub_module = Module() root.sub_module.nested_list = [Module(), {"key": Module()}] same_module = Module() root.sub_module.nested_tuple = (Module(), [same_module, same_module]) expected_names = { "self", "self.sub_module", "self.sub_module.nested_list[0]", "self.sub_module.nested_list[1][key]", # NOTE: named_sub_modules allows recursive structures "self.sub_module.nested_tuple[0]", "self.sub_module.nested_tuple[1][0]", # NEW: named_sub_modules allows recursive structures, but named_parameters does not } found_names = {name for name, _ in root.named_sub_modules()} assert found_names == expected_names, ( f"Missing or extra modules found. Missing: {expected_names - found_names}, Extra: {found_names - expected_names}" ) def test_complex_module_set_attribute_by_name(): root = Module() root.sub_module = Module() root.sub_module.nested_list = [Module(), {"key": Module()}] same_module = Module() root.sub_module.nested_tuple = (Module(), [same_module, same_module]) set_attribute_by_name(root, "test_attrib", True) assert root.test_attrib is True set_attribute_by_name(root, "sub_module.test_attrib", True) assert root.sub_module.test_attrib is True set_attribute_by_name(root, "sub_module.nested_list[0].test_attrib", True) assert root.sub_module.nested_list[0].test_attrib is True set_attribute_by_name(root, "sub_module.nested_list[1]['key'].test_attrib", True) assert root.sub_module.nested_list[1]["key"].test_attrib is True set_attribute_by_name(root, "sub_module.nested_tuple[0].test_attrib", True) assert root.sub_module.nested_tuple[0].test_attrib is True set_attribute_by_name(root, "sub_module.nested_tuple[1][0].test_attrib", True) assert root.sub_module.nested_tuple[1][0].test_attrib is True assert root.sub_module.nested_tuple[1][1].test_attrib is True class DuplicateModule(Module): def __init__(self): super().__init__() self.p0 = dspy.Predict("question -> answer") self.p1 = self.p0 def test_named_parameters_duplicate_references(): module = DuplicateModule() # Only testing for whether exceptions are thrown or not # As Module.named_parameters() is recursive, this is mainly for catching infinite recursion module.named_parameters() def test_load_dspy_program_cross_version(): """ Test backward compatibility for loading a saved DSPy program. This test verifies that DSPy can load a program saved in version 3.0.1, ensuring compatibility with older versions. The saved state is located in 'test/primitives/resources/saved_program.json' and represents an optimized `dspy.ReAct` program. 
""" path = Path(__file__).parent / "resources" / "saved_program.json" loaded_react = dspy.ReAct("question->answer", tools=[]) loaded_react.load(path) assert ( "Imagine you are a detective racing against time to solve a high-profile" in loaded_react.react.signature.instructions ) assert "Given the very verbose fields `question`" in loaded_react.extract.predict.signature.instructions assert len(loaded_react.react.demos) == 2 assert len(loaded_react.extract.predict.demos) == 2 ``` -------------------------------------------------------------------------------- /dspy/dsp/utils/dpr.py: -------------------------------------------------------------------------------- ```python """ Source: DPR Implementation from Facebook Research https://github.com/facebookresearch/DPR/tree/master/dpr Original license: https://github.com/facebookresearch/DPR/blob/main/LICENSE """ import copy import logging import unicodedata import regex logger = logging.getLogger(__name__) class Tokens: """A class to represent a list of tokenized text.""" TEXT = 0 TEXT_WS = 1 SPAN = 2 POS = 3 LEMMA = 4 NER = 5 def __init__(self, data, annotators, opts=None): self.data = data self.annotators = annotators self.opts = opts or {} def __len__(self): """The number of tokens.""" return len(self.data) def slice(self, i=None, j=None): """Return a view of the list of tokens from [i, j).""" new_tokens = copy.copy(self) new_tokens.data = self.data[i:j] return new_tokens def untokenize(self): """Returns the original text (with whitespace reinserted).""" return "".join([t[self.TEXT_WS] for t in self.data]).strip() def words(self, uncased=False): """Returns a list of the text of each token Args: uncased: lower cases text """ if uncased: return [t[self.TEXT].lower() for t in self.data] else: return [t[self.TEXT] for t in self.data] def offsets(self): """Returns a list of [start, end) character offsets of each token.""" return [t[self.SPAN] for t in self.data] def pos(self): """Returns a list of part-of-speech tags of each token. Returns None if this annotation was not included. """ if "pos" not in self.annotators: return None return [t[self.POS] for t in self.data] def lemmas(self): """Returns a list of the lemmatized text of each token. Returns None if this annotation was not included. """ if "lemma" not in self.annotators: return None return [t[self.LEMMA] for t in self.data] def entities(self): """Returns a list of named-entity-recognition tags of each token. Returns None if this annotation was not included. """ if "ner" not in self.annotators: return None return [t[self.NER] for t in self.data] def ngrams(self, n=1, uncased=False, filter_fn=None, as_strings=True): """Returns a list of all ngrams from length 1 to n. 
Args: n: upper limit of ngram length uncased: lower cases text filter_fn: user function that takes in an ngram list and returns True or False to keep or not keep the ngram as_string: return the ngram as a string vs list """ def _skip(gram): if not filter_fn: return False return filter_fn(gram) words = self.words(uncased) ngrams = [ (s, e + 1) for s in range(len(words)) for e in range(s, min(s + n, len(words))) if not _skip(words[s : e + 1]) ] # Concatenate into strings if as_strings: ngrams = ["{}".format(" ".join(words[s:e])) for (s, e) in ngrams] return ngrams def entity_groups(self): """Group consecutive entity tokens with the same NER tag.""" entities = self.entities() if not entities: return None non_ent = self.opts.get("non_ent", "O") groups = [] idx = 0 while idx < len(entities): ner_tag = entities[idx] # Check for entity tag if ner_tag != non_ent: # Chomp the sequence start = idx while idx < len(entities) and entities[idx] == ner_tag: idx += 1 groups.append((self.slice(start, idx).untokenize(), ner_tag)) else: idx += 1 return groups class Tokenizer: """Base tokenizer class. Tokenizers implement tokenize, which should return a Tokens class. """ def tokenize(self, text): raise NotImplementedError def shutdown(self): pass def __del__(self): self.shutdown() class SimpleTokenizer(Tokenizer): ALPHA_NUM = r"[\p{L}\p{N}\p{M}]+" NON_WS = r"[^\p{Z}\p{C}]" def __init__(self, **kwargs): """ Args: annotators: None or empty set (only tokenizes). """ self._regexp = regex.compile( "(%s)|(%s)" % (self.ALPHA_NUM, self.NON_WS), flags=regex.IGNORECASE + regex.UNICODE + regex.MULTILINE, ) if len(kwargs.get("annotators", {})) > 0: logger.warning( "%s only tokenizes! Skipping annotators: %s", type(self).__name__, kwargs.get("annotators"), ) self.annotators = set() def tokenize(self, text): data = [] matches = list(self._regexp.finditer(text)) for i in range(len(matches)): # Get text token = matches[i].group() # Get whitespace span = matches[i].span() start_ws = span[0] if i + 1 < len(matches): end_ws = matches[i + 1].span()[0] else: end_ws = span[1] # Format data data.append( ( token, text[start_ws:end_ws], span, ) ) return Tokens(data, self.annotators) def has_answer(tokenized_answers, text): text = DPR_normalize(text) for single_answer in tokenized_answers: for i in range(0, len(text) - len(single_answer) + 1): if single_answer == text[i : i + len(single_answer)]: return True return False def locate_answers(tokenized_answers, text): """ Returns each occurrence of an answer as (offset, endpos) in terms of *characters*. 
""" tokenized_text = DPR_tokenize(text) occurrences = [] text_words, text_word_positions = tokenized_text.words(uncased=True), tokenized_text.offsets() answers_words = [ans.words(uncased=True) for ans in tokenized_answers] for single_answer in answers_words: for i in range(0, len(text_words) - len(single_answer) + 1): if single_answer == text_words[i : i + len(single_answer)]: (offset, _), (_, endpos) = text_word_positions[i], text_word_positions[i + len(single_answer) - 1] occurrences.append((offset, endpos)) return occurrences STokenizer = SimpleTokenizer() def DPR_tokenize(text): # noqa: N802 return STokenizer.tokenize(unicodedata.normalize("NFD", text)) def DPR_normalize(text): # noqa: N802 return DPR_tokenize(text).words(uncased=True) # Source: https://github.com/shmsw25/qa-hard-em/blob/master/prepro_util.py def strip_accents(text): """Strips accents from a piece of text.""" text = unicodedata.normalize("NFD", text) output = [] for char in text: cat = unicodedata.category(char) if cat == "Mn": continue output.append(char) return "".join(output) ``` -------------------------------------------------------------------------------- /dspy/propose/utils.py: -------------------------------------------------------------------------------- ```python import inspect import json import re import dspy try: from IPython.core.magics.code import extract_symbols except ImportError: # Won't be able to read code from jupyter notebooks extract_symbols = None from dspy.predict.parameter import Parameter from dspy.teleprompt.utils import get_signature, new_getfile def strip_prefix(text): pattern = r"^[\*\s]*(([\w\'\-]+\s+){0,4}[\w\'\-]+):\s*" modified_text = re.sub(pattern, "", text) return modified_text.strip('"') def create_instruction_set_history_string(base_program, trial_logs, top_n): program_history = [] for trial_num in trial_logs: trial = trial_logs[trial_num] if "program_path" in trial: trial_program = base_program.deepcopy() trial_program.load(trial["program_path"]) program_history.append({ "program": trial_program, "score": trial["score"], }) # Deduplicate program history based on the program's instruction set seen_programs = set() unique_program_history = [] for entry in program_history: program = entry["program"] instruction_set = get_program_instruction_set_string(program) if instruction_set not in seen_programs: seen_programs.add(instruction_set) unique_program_history.append(entry) # Get the top n programs from program history top_n_program_history = sorted(unique_program_history, key=lambda x: x["score"], reverse=True)[:top_n] top_n_program_history.reverse() # Create formatted string instruction_set_history_string = "" for entry in top_n_program_history: program = entry["program"] score = entry["score"] instruction_set = get_program_instruction_set_string(program) instruction_set_history_string += instruction_set + f" | Score: {score}\n\n" return instruction_set_history_string def parse_list_of_instructions(instruction_string): # Try to convert the string representation of a list to an actual list using JSON try: instructions = json.loads(instruction_string) return instructions except json.JSONDecodeError: pass # If JSON decoding fails, extract strings within quotes instructions = re.findall(r'"([^"]*)"', instruction_string) return instructions def get_program_instruction_set_string(program): instruction_list = [] for _, pred in enumerate(program.predictors()): pred_instructions = get_signature(pred).instructions instruction_list.append(f'"{pred_instructions}"') # Joining the list into a 
single string that looks like a list return f"[{', '.join(instruction_list)}]" def create_predictor_level_history_string(base_program, predictor_i, trial_logs, top_n): instruction_aggregate = {} instruction_history = [] # Load trial programs for trial_num in trial_logs: trial = trial_logs[trial_num] if "program_path" in trial: trial_program = base_program.deepcopy() trial_program.load(trial["program_path"]) instruction_history.append({ "program": trial_program, "score": trial["score"], }) # Aggregate scores for each instruction for history_item in instruction_history: predictor = history_item["program"].predictors()[predictor_i] instruction = get_signature(predictor).instructions score = history_item["score"] if instruction in instruction_aggregate: instruction_aggregate[instruction]["total_score"] += score instruction_aggregate[instruction]["count"] += 1 else: instruction_aggregate[instruction] = {"total_score": score, "count": 1} # Calculate average score for each instruction and prepare for sorting predictor_history = [] for instruction, data in instruction_aggregate.items(): average_score = data["total_score"] / data["count"] predictor_history.append((instruction, average_score)) # Deduplicate and sort by average score, then select top N seen_instructions = set() unique_predictor_history = [] for instruction, score in predictor_history: if instruction not in seen_instructions: seen_instructions.add(instruction) unique_predictor_history.append((instruction, score)) top_instructions = sorted(unique_predictor_history, key=lambda x: x[1], reverse=True)[:top_n] top_instructions.reverse() # Create formatted history string predictor_history_string = "" for instruction, score in top_instructions: predictor_history_string += instruction + f" | Score: {score}\n\n" return predictor_history_string def create_example_string(fields, example): # Building the output string output = [] for field_name, field_values in fields.items(): name = field_values.json_schema_extra["prefix"] # Determine the value from input_data or prediction_data value = example.get(field_name) # Construct the string for the current field field_str = f"{name} {value}" output.append(field_str) # Joining all the field strings return "\n".join(output) def get_dspy_source_code(module): header = [] base_code = "" # Don't get source code for Predict or ChainOfThought modules (NOTE we will need to extend this list as more DSPy.modules are added) # TODO: if type(module).__name__ not in ["Predict", "ChainOfThought", "ReAct"]: if not type(module).__name__ == "Predict" and not type(module).__name__ == "ChainOfThought": try: base_code = inspect.getsource(type(module)) except TypeError: obj = type(module) cell_code = "".join(inspect.linecache.getlines(new_getfile(obj))) class_code = extract_symbols(cell_code, obj.__name__)[0][0] base_code = str(class_code) completed_set = set() for attribute in module.__dict__.keys(): try: iterable = iter(getattr(module, attribute)) except TypeError: iterable = [getattr(module, attribute)] for item in iterable: # Skip items that are unhashable (like module history) try: hash(item) except TypeError: continue if isinstance(item, Parameter): if hasattr(item, "signature") and item.signature is not None and item.signature.__pydantic_parent_namespace__["signature_name"] + "_sig" not in completed_set: try: header.append(inspect.getsource(item.signature)) print(inspect.getsource(item.signature)) except (TypeError, OSError): header.append(str(item.signature)) 
completed_set.add(item.signature.__pydantic_parent_namespace__["signature_name"] + "_sig") if isinstance(item, dspy.Module): code = get_dspy_source_code(item).strip() if code not in completed_set: header.append(code) completed_set.add(code) completed_set.add(item) return "\n\n".join(header) + "\n\n" + base_code ``` -------------------------------------------------------------------------------- /docs/docs/api/optimizers/GEPA/overview.md: -------------------------------------------------------------------------------- ```markdown # dspy.GEPA: Reflective Prompt Optimizer **GEPA** (Genetic-Pareto) is a reflective optimizer proposed in "GEPA: Reflective Prompt Evolution Can Outperform Reinforcement Learning" (Agrawal et al., 2025, [arxiv:2507.19457](https://arxiv.org/abs/2507.19457)), that adaptively evolves _textual components_ (such as prompts) of arbitrary systems. In addition to scalar scores returned by metrics, users can also provide GEPA with a text feedback to guide the optimization process. Such textual feedback provides GEPA more visibility into why the system got the score that it did, and then GEPA can introspect to identify how to improve the score. This allows GEPA to propose high performing prompts in very few rollouts. <!-- START_API_REF --> ::: dspy.GEPA handler: python options: members: - auto_budget - compile - get_params show_source: true show_root_heading: true heading_level: 2 docstring_style: google show_root_full_path: true show_object_full_path: false separate_signature: false inherited_members: true ::: <!-- END_API_REF --> One of the key insights behind GEPA is its ability to leverage domain-specific textual feedback. Users should provide a feedback function as the GEPA metric, which has the following call signature: <!-- START_API_REF --> ::: dspy.teleprompt.gepa.gepa.GEPAFeedbackMetric handler: python options: members: - __call__ show_source: true show_root_heading: true heading_level: 2 docstring_style: google show_root_full_path: true show_object_full_path: false separate_signature: false inherited_members: true ::: <!-- END_API_REF --> When `track_stats=True`, GEPA returns detailed results about all of the proposed candidates, and metadata about the optimization run. The results are available in the `detailed_results` attribute of the optimized program returned by GEPA, and has the following type: <!-- START_API_REF --> ::: dspy.teleprompt.gepa.gepa.DspyGEPAResult handler: python options: show_source: true show_root_heading: true heading_level: 2 docstring_style: google show_root_full_path: true show_object_full_path: false separate_signature: false inherited_members: true ::: <!-- END_API_REF --> ## Usage Examples See GEPA usage tutorials in [GEPA Tutorials](../../../tutorials/gepa_ai_program/index.md). ### Inference-Time Search GEPA can act as a test-time/inference search mechanism. By setting your `valset` to your _evaluation batch_ and using `track_best_outputs=True`, GEPA produces for each batch element the highest-scoring outputs found during the evolutionary search. ```python gepa = dspy.GEPA(metric=metric, track_stats=True, ...) new_prog = gepa.compile(student, trainset=my_tasks, valset=my_tasks) highest_score_achieved_per_task = new_prog.detailed_results.highest_score_achieved_per_val_task best_outputs = new_prog.detailed_results.best_outputs_valset ``` ## How Does GEPA Work? ### 1. 
**Reflective Prompt Mutation**

GEPA uses LLMs to _reflect_ on structured execution traces (inputs, outputs, failures, feedback), targeting a chosen module and proposing a new instruction/program text tailored to real observed failures and rich textual/environmental feedback.

### 2. **Rich Textual Feedback as Optimization Signal**

GEPA can leverage _any_ textual feedback available—not just scalar rewards. This includes evaluation logs, code traces, failed parses, constraint violations, error message strings, or even isolated submodule-specific feedback. This allows actionable, domain-aware optimization.

### 3. **Pareto-based Candidate Selection**

Rather than evolving just the _best_ global candidate (which leads to local optima or stagnation), GEPA maintains a Pareto frontier: the set of candidates which achieve the highest score on at least one evaluation instance. In each iteration, the next candidate to mutate is sampled (with probability proportional to coverage) from this frontier, guaranteeing both exploration and robust retention of complementary strategies.

### Algorithm Summary

1. **Initialize** the candidate pool with the unoptimized program.
2. **Iterate**:
    - **Sample a candidate** (from the Pareto frontier).
    - **Sample a minibatch** from the train set.
    - **Collect execution traces + feedback** for module rollout on the minibatch.
    - **Select a module** of the candidate for targeted improvement.
    - **LLM Reflection:** Propose a new instruction/prompt for the targeted module using reflective meta-prompting and the gathered feedback.
    - **Roll out the new candidate** on the minibatch; **if improved, evaluate on the Pareto validation set**.
    - **Update the candidate pool/Pareto frontier.**
    - **[Optionally] System-aware merge/crossover**: Combine best-performing modules from distinct lineages.
3. **Continue** until the rollout or metric budget is exhausted.
4. **Return** the candidate with the best aggregate performance on validation.

## Implementing Feedback Metrics

A well-designed metric is central to GEPA's sample efficiency and learning signal richness. GEPA expects the metric to return a `dspy.Prediction(score=..., feedback=...)`.

GEPA leverages natural language traces from LLM-based workflows for optimization, preserving intermediate trajectories and errors in plain text rather than reducing them to numerical rewards. This mirrors human diagnostic processes, enabling clearer identification of system behaviors and bottlenecks.

Practical recipe for GEPA-friendly feedback:

- **Leverage Existing Artifacts**: Use logs, unit tests, evaluation scripts, and profiler outputs; surfacing these often suffices.
- **Decompose Outcomes**: Break scores into per-objective components (e.g., correctness, latency, cost, safety) and attribute errors to steps.
- **Expose Trajectories**: Label pipeline stages, reporting pass/fail with salient errors (e.g., in code generation pipelines).
- **Ground in Checks**: Employ automatic validators (unit tests, schemas, simulators) or LLM-as-a-judge for non-verifiable tasks (as in PUPA).
- **Prioritize Clarity**: Focus on error coverage and decision points over technical complexity.

### Examples

- **Document Retrieval** (e.g., HotpotQA): List correctly retrieved, incorrect, or missed documents, beyond mere Recall/F1 scores.
- **Multi-Objective Tasks** (e.g., PUPA): Decompose aggregate scores to reveal contributions from each objective, highlighting tradeoffs (e.g., quality vs. privacy).
- **Stacked Pipelines** (e.g., code generation: parse → compile → run → profile → evaluate): Expose stage-specific failures; natural-language traces often suffice for LLM self-correction. ## Custom Instruction Proposal For advanced customization of GEPA's instruction proposal mechanism, including custom instruction proposers and component selectors, see [Advanced Features](GEPA_Advanced.md). ## Further Reading - [GEPA Paper: arxiv:2507.19457](https://arxiv.org/abs/2507.19457) - [GEPA Github](https://github.com/gepa-ai/gepa) - This repository provides the core GEPA evolution pipeline used by `dspy.GEPA` optimizer. - [DSPy Tutorials](../../../tutorials/gepa_ai_program/index.md) ``` -------------------------------------------------------------------------------- /docs/docs/tutorials/cache/index.md: -------------------------------------------------------------------------------- ```markdown # Use and Customize DSPy Cache In this tutorial, we will explore the design of DSPy's caching mechanism and demonstrate how to effectively use and customize it. ## DSPy Cache Structure DSPy's caching system is architected in three distinct layers: 1. **In-memory cache**: Implemented using `cachetools.LRUCache`, this layer provides fast access to frequently used data. 2. **On-disk cache**: Leveraging `diskcache.FanoutCache`, this layer offers persistent storage for cached items. 3. **Prompt cache (Server-side cache)**: This layer is managed by the LLM service provider (e.g., OpenAI, Anthropic). While DSPy does not directly control the server-side prompt cache, it offers users the flexibility to enable, disable, and customize the in-memory and on-disk caches to suit their specific requirements. ## Using DSPy Cache By default, both in-memory and on-disk caching are automatically enabled in DSPy. No specific action is required to start using the cache. When a cache hit occurs, you will observe a significant reduction in the module call's execution time. Furthermore, if usage tracking is enabled, the usage metrics for a cached call will be `None`. Consider the following example: ```python import dspy import os import time os.environ["OPENAI_API_KEY"] = "{your_openai_key}" dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"), track_usage=True) predict = dspy.Predict("question->answer") start = time.time() result1 = predict(question="Who is the GOAT of basketball?") print(f"Time elapse: {time.time() - start: 2f}\n\nTotal usage: {result1.get_lm_usage()}") start = time.time() result2 = predict(question="Who is the GOAT of basketball?") print(f"Time elapse: {time.time() - start: 2f}\n\nTotal usage: {result2.get_lm_usage()}") ``` A sample output looks like: ``` Time elapse: 4.384113 Total usage: {'openai/gpt-4o-mini': {'completion_tokens': 97, 'prompt_tokens': 144, 'total_tokens': 241, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0, 'text_tokens': None}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0, 'text_tokens': None, 'image_tokens': None}}} Time elapse: 0.000529 Total usage: {} ``` ## Disabling/Enabling DSPy Cache There are scenarios where you might need to disable caching, either entirely or selectively for in-memory or on-disk caches. For instance: - You require different responses for identical LM requests. - You lack disk write permissions and need to disable the on-disk cache. - You have limited memory resources and wish to disable the in-memory cache. 

DSPy provides the `dspy.configure_cache()` utility function for this purpose. You can use the corresponding flags to control the enabled/disabled state of each cache type:

```python
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)
```

In addition, you can manage the capacity of the in-memory and on-disk caches:

```python
dspy.configure_cache(
    enable_disk_cache=True,
    enable_memory_cache=True,
    disk_size_limit_bytes=YOUR_DESIRED_VALUE,
    memory_max_entries=YOUR_DESIRED_VALUE,
)
```

Please note that `disk_size_limit_bytes` defines the maximum size in bytes for the on-disk cache, while `memory_max_entries` specifies the maximum number of entries for the in-memory cache.

## Understanding and Customizing the Cache

In specific situations, you might want to implement a custom cache, for example, to gain finer control over how cache keys are generated. By default, the cache key is derived from a hash of all request arguments sent to `litellm`, excluding credentials like `api_key`.

To create a custom cache, you need to subclass `dspy.clients.Cache` and override the relevant methods:

```python
class CustomCache(dspy.clients.Cache):

    def __init__(self, **kwargs):
        {write your own constructor}

    def cache_key(self, request: dict[str, Any], ignored_args_for_cache_key: Optional[list[str]] = None) -> str:
        {write your logic of computing cache key}

    def get(self, request: dict[str, Any], ignored_args_for_cache_key: Optional[list[str]] = None) -> Any:
        {write your cache read logic}

    def put(
        self,
        request: dict[str, Any],
        value: Any,
        ignored_args_for_cache_key: Optional[list[str]] = None,
        enable_memory_cache: bool = True,
    ) -> None:
        {write your cache write logic}
```

To ensure seamless integration with the rest of DSPy, it is recommended to implement your custom cache using the same method signatures as the base class, or at a minimum, include `**kwargs` in your method definitions to prevent runtime errors during cache read/write operations.

Once your custom cache class is defined, you can instruct DSPy to use it:

```python
dspy.cache = CustomCache()
```

Let's illustrate this with a practical example. Suppose we want the cache key computation to depend solely on the request message content, ignoring other parameters like the specific LM being called. We can create a custom cache as follows:

```python
class CustomCache(dspy.clients.Cache):

    def cache_key(self, request: dict[str, Any], ignored_args_for_cache_key: Optional[list[str]] = None) -> str:
        messages = request.get("messages", [])
        return sha256(ujson.dumps(messages, sort_keys=True).encode()).hexdigest()


dspy.cache = CustomCache(enable_disk_cache=True, enable_memory_cache=True, disk_cache_dir=dspy.clients.DISK_CACHE_DIR)
```

For comparison, consider executing the code below without the custom cache:

```python
import dspy
import os
import time

os.environ["OPENAI_API_KEY"] = "{your_openai_key}"
dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))

predict = dspy.Predict("question->answer")

start = time.time()
result1 = predict(question="Who is the GOAT of soccer?")
print(f"Time elapse: {time.time() - start: 2f}")

start = time.time()
with dspy.context(lm=dspy.LM("openai/gpt-4.1-mini")):
    result2 = predict(question="Who is the GOAT of soccer?")
    print(f"Time elapse: {time.time() - start: 2f}")
```

The time elapsed will indicate that the cache is not hit on the second call.
However, when using the custom cache: ```python import dspy import os import time from typing import Dict, Any, Optional import ujson from hashlib import sha256 os.environ["OPENAI_API_KEY"] = "{your_openai_key}" dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini")) class CustomCache(dspy.clients.Cache): def cache_key(self, request: dict[str, Any], ignored_args_for_cache_key: Optional[list[str]] = None) -> str: messages = request.get("messages", []) return sha256(ujson.dumps(messages, sort_keys=True).encode()).hexdigest() dspy.cache = CustomCache(enable_disk_cache=True, enable_memory_cache=True, disk_cache_dir=dspy.clients.DISK_CACHE_DIR) predict = dspy.Predict("question->answer") start = time.time() result1 = predict(question="Who is the GOAT of volleyball?") print(f"Time elapse: {time.time() - start: 2f}") start = time.time() with dspy.context(lm=dspy.LM("openai/gpt-4.1-mini")): result2 = predict(question="Who is the GOAT of volleyball?") print(f"Time elapse: {time.time() - start: 2f}") ``` You will observe that the cache is hit on the second call, demonstrating the effect of the custom cache key logic. ``` -------------------------------------------------------------------------------- /dspy/datasets/dataloader.py: -------------------------------------------------------------------------------- ```python import random from collections.abc import Mapping from typing import TYPE_CHECKING import dspy from dspy.datasets.dataset import Dataset if TYPE_CHECKING: import pandas as pd class DataLoader(Dataset): def __init__(self): pass def from_huggingface( self, dataset_name: str, *args, input_keys: tuple[str] = (), fields: tuple[str] | None = None, **kwargs, ) -> Mapping[str, list[dspy.Example]] | list[dspy.Example]: if fields and not isinstance(fields, tuple): raise ValueError("Invalid fields provided. Please provide a tuple of fields.") if not isinstance(input_keys, tuple): raise ValueError("Invalid input keys provided. 
Please provide a tuple of input keys.") from datasets import load_dataset dataset = load_dataset(dataset_name, *args, **kwargs) if isinstance(dataset, list) and isinstance(kwargs["split"], list): dataset = {split_name: dataset[idx] for idx, split_name in enumerate(kwargs["split"])} try: returned_split = {} for split_name in dataset.keys(): if fields: returned_split[split_name] = [ dspy.Example({field: row[field] for field in fields}).with_inputs(*input_keys) for row in dataset[split_name] ] else: returned_split[split_name] = [ dspy.Example({field: row[field] for field in row.keys()}).with_inputs(*input_keys) for row in dataset[split_name] ] return returned_split except AttributeError: if fields: return [ dspy.Example({field: row[field] for field in fields}).with_inputs(*input_keys) for row in dataset ] else: return [ dspy.Example({field: row[field] for field in row.keys()}).with_inputs(*input_keys) for row in dataset ] def from_csv( self, file_path: str, fields: list[str] | None = None, input_keys: tuple[str] = (), ) -> list[dspy.Example]: from datasets import load_dataset dataset = load_dataset("csv", data_files=file_path)["train"] if not fields: fields = list(dataset.features) return [dspy.Example({field: row[field] for field in fields}).with_inputs(*input_keys) for row in dataset] def from_pandas( self, df: "pd.DataFrame", fields: list[str] | None = None, input_keys: tuple[str] = (), ) -> list[dspy.Example]: if fields is None: fields = list(df.columns) return [ dspy.Example({field: row[field] for field in fields}).with_inputs(*input_keys) for _, row in df.iterrows() ] def from_json( self, file_path: str, fields: list[str] | None = None, input_keys: tuple[str] = (), ) -> list[dspy.Example]: from datasets import load_dataset dataset = load_dataset("json", data_files=file_path)["train"] if not fields: fields = list(dataset.features) return [dspy.Example({field: row[field] for field in fields}).with_inputs(*input_keys) for row in dataset] def from_parquet( self, file_path: str, fields: list[str] | None = None, input_keys: tuple[str] = (), ) -> list[dspy.Example]: from datasets import load_dataset dataset = load_dataset("parquet", data_files=file_path)["train"] if not fields: fields = list(dataset.features) return [dspy.Example({field: row[field] for field in fields}).with_inputs(*input_keys) for row in dataset] def from_rm(self, num_samples: int, fields: list[str], input_keys: list[str]) -> list[dspy.Example]: try: rm = dspy.settings.rm try: return [ dspy.Example({field: row[field] for field in fields}).with_inputs(*input_keys) for row in rm.get_objects(num_samples=num_samples, fields=fields) ] except AttributeError: raise ValueError( "Retrieval module does not support `get_objects`. Please use a different retrieval module." ) except AttributeError: raise ValueError( "Retrieval module not found. Please set a retrieval module using `dspy.settings.configure`." ) def sample( self, dataset: list[dspy.Example], n: int, *args, **kwargs, ) -> list[dspy.Example]: if not isinstance(dataset, list): raise ValueError( f"Invalid dataset provided of type {type(dataset)}. Please provide a list of `dspy.Example`s." 
) return random.sample(dataset, n, *args, **kwargs) def train_test_split( self, dataset: list[dspy.Example], train_size: int | float = 0.75, test_size: int | float | None = None, random_state: int | None = None, ) -> Mapping[str, list[dspy.Example]]: if random_state is not None: random.seed(random_state) dataset_shuffled = dataset.copy() random.shuffle(dataset_shuffled) if train_size is not None and isinstance(train_size, float) and (0 < train_size < 1): train_end = int(len(dataset_shuffled) * train_size) elif train_size is not None and isinstance(train_size, int): train_end = train_size else: raise ValueError( "Invalid `train_size`. Please provide a float between 0 and 1 to represent the proportion of the " "dataset to include in the train split or an int to represent the absolute number of samples to " f"include in the train split. Received `train_size`: {train_size}." ) if test_size is not None: if isinstance(test_size, float) and (0 < test_size < 1): test_end = int(len(dataset_shuffled) * test_size) elif isinstance(test_size, int): test_end = test_size else: raise ValueError( "Invalid `test_size`. Please provide a float between 0 and 1 to represent the proportion of the " "dataset to include in the test split or an int to represent the absolute number of samples to " f"include in the test split. Received `test_size`: {test_size}." ) if train_end + test_end > len(dataset_shuffled): raise ValueError( "`train_size` + `test_size` cannot exceed the total number of samples. Received " f"`train_size`: {train_end}, `test_size`: {test_end}, and `dataset_size`: {len(dataset_shuffled)}." ) else: test_end = len(dataset_shuffled) - train_end train_dataset = dataset_shuffled[:train_end] test_dataset = dataset_shuffled[train_end : train_end + test_end] return {"train": train_dataset, "test": test_dataset} ``` -------------------------------------------------------------------------------- /dspy/teleprompt/bettertogether.py: -------------------------------------------------------------------------------- ```python import logging import random from typing import Callable import dspy from dspy.primitives.example import Example from dspy.primitives.module import Module from dspy.teleprompt.bootstrap_finetune import ( BootstrapFinetune, all_predictors_have_lms, kill_lms, launch_lms, prepare_student, ) from dspy.teleprompt.random_search import BootstrapFewShotWithRandomSearch from dspy.teleprompt.teleprompt import Teleprompter logger = logging.getLogger(__name__) class BetterTogether(Teleprompter): STRAT_SEP = " -> " def __init__(self, metric: Callable, prompt_optimizer: Teleprompter | None = None, weight_optimizer: Teleprompter | None = None, seed: int | None = None, ): if not dspy.settings.experimental: raise ValueError("This is an experimental optimizer. Set `dspy.settings.experimental` to `True` to use it.") # TODO: Note that the BetterTogether optimizer is meaningful when # BootstrapFinetune uses a metric to filter the training data before # fine-tuning. However, one can also choose to run this optimizer with # a BootstrapFinetune without a metric, say, if there aren't labels # available for the training data. Should this be noted somewhere? # TODO: We should re-consider if the metric should be required. 
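# When no optimizers are passed in, default to BootstrapFewShotWithRandomSearch for prompts and
# BootstrapFinetune for weights; these are also the only optimizer types accepted by the check below.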
self.prompt_optimizer = prompt_optimizer if prompt_optimizer else BootstrapFewShotWithRandomSearch(metric=metric) self.weight_optimizer = weight_optimizer if weight_optimizer else BootstrapFinetune(metric=metric) is_supported_prompt = isinstance(self.prompt_optimizer, BootstrapFewShotWithRandomSearch) is_supported_weight = isinstance(self.weight_optimizer, BootstrapFinetune) if not is_supported_prompt or not is_supported_weight: raise ValueError( "The BetterTogether optimizer only supports the following optimizers for now: BootstrapFinetune, " "BootstrapFewShotWithRandomSearch." ) self.rng = random.Random(seed) def compile( self, student: Module, trainset: list[Example], strategy: str = "p -> w -> p", valset_ratio = 0.1, ) -> Module: # TODO: We could record acc on a different valset to pick the best # strategy within the provided strategy logger.info("Validating the strategy") parsed_strategy = strategy.lower().split(self.STRAT_SEP) if not all(s in ["p", "w"] for s in parsed_strategy): raise ValueError( f"The strategy should be a sequence of 'p' and 'w' separated by '{self.STRAT_SEP}', but " f"found: {strategy}" ) logger.info("Preparing the student program...") # TODO: Prepare student returns student.reset_copy(), which is what gets # optimized. We should make this clear in the doc comments. student = prepare_student(student) all_predictors_have_lms(student) # Make a shallow copy of the trainset, so that we don't change the order # of the examples in the original trainset trainset = trainset[:] logger.info("Compiling the student program...") student = self._run_strategies(parsed_strategy, student, trainset, valset_ratio) logger.info("BetterTogether has finished compiling the student program") return student def _run_strategies(self, parsed_strategy, student, trainset, valset_ratio) -> Module: # Keep track of all the partial strategies/programs in parsed_strategy # "" corresponds to the initial student program candidate_programs = [] candidate_programs.append(("", student)) launched_flag = False for ind, step_code in enumerate(parsed_strategy): current_strategy = self.STRAT_SEP.join(parsed_strategy[:ind + 1]) logger.info( f"\n########## Step {ind + 1} of {len(parsed_strategy)} - Strategy " f"'{current_strategy}' ##########" ) logger.info("Shuffling the trainset...") self.rng.shuffle(trainset) if not launched_flag: launch_lms(student) launched_flag = True # TODO: Should we reset or just deepcopy? How does resetting affect # the predictor LMs? 
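# Work on a deep copy at each step so that earlier entries in candidate_programs are not
# mutated by later optimization steps.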
student = student.deepcopy() student._compiled = False if step_code == "p": student = self._compile_prompt_optimizer(student, trainset, valset_ratio) elif step_code == "w": student = self._compile_weight_optimizer(student, trainset) launched_flag = False # Record the program corresponding to the current strategy candidate_programs.append((current_strategy, student)) if launched_flag: kill_lms(student) student.candidate_programs = candidate_programs return student def _compile_prompt_optimizer(self, student, trainset, valset_ratio) -> Module: logger.info("Preparing for prompt optimization...") # Sampling a validation set from the trainset for the prompt optimizer # We drop the hints for prompt optimization trainset = [x.with_inputs(*list(set(x.inputs().keys()) - {"hint"})) for x in trainset] num_val = int(valset_ratio * len(trainset)) prompt_valset = trainset[:num_val] prompt_trainset = trainset[num_val:] # TODO: To make this optimizer general, we need to ensure that all the # prompt optimizers are accepting a valset or encode a way to check if # a valset should be passed to an optimizer's compile method. # TODO: We should ensure that the prompt optimizers in DSPy respect the # predictor.lm attributes. In particular, # BootstrapFewShotWithRandomSearch seems to be resetting these. We are # manually re-setting the LMs here to circumvent this issue, but we # should consider addressing it in BFRS. logger.info("Compiling the prompt optimizer...") pred_lms = [pred.lm for pred in student.predictors()] student = self.prompt_optimizer.compile(student, trainset=prompt_trainset, valset=prompt_valset) for pred, lm in zip(student.predictors(), pred_lms, strict=False): pred.lm = lm return student def _compile_weight_optimizer(self, student, trainset) -> Module: logger.info("Preparing for weight optimization...") # Saving the LMs before compiling the weight optimizer original_lms = [pred.lm for pred in student.predictors()] # TODO: To make this optimizer general, we need to ensure that all the # prompt optimizers are accepting a valset or encode a way to check if # a valset should be passed to an optimizer's compile. logger.info("Compiling the weight optimizer...") student = self.weight_optimizer.compile(student, trainset=trainset) # Updating the train kwargs for the new LMs. This is needed because the # train_kwargs of the optimizer is configured for the original LMs. new_lms = [pred.lm for pred in student.predictors()] for original_lm, new_lm in zip(original_lms, new_lms, strict=False): original_params = self.weight_optimizer.train_kwargs[original_lm] self.weight_optimizer.train_kwargs[new_lm] = original_params return student ``` -------------------------------------------------------------------------------- /dspy/clients/embedding.py: -------------------------------------------------------------------------------- ```python from typing import Any, Callable import litellm import numpy as np from dspy.clients.cache import request_cache class Embedder: """DSPy embedding class. The class for computing embeddings for text inputs. This class provides a unified interface for both: 1. Hosted embedding models (e.g. OpenAI's text-embedding-3-small) via litellm integration 2. Custom embedding functions that you provide For hosted models, simply pass the model name as a string (e.g., "openai/text-embedding-3-small"). The class will use litellm to handle the API calls and caching. For custom embedding models, pass a callable function that: - Takes a list of strings as input. 
- Returns embeddings as either: - A 2D numpy array of float32 values - A 2D list of float32 values - Each row should represent one embedding vector Args: model: The embedding model to use. This can be either a string (representing the name of the hosted embedding model, must be an embedding model supported by litellm) or a callable that represents a custom embedding model. batch_size (int, optional): The default batch size for processing inputs in batches. Defaults to 200. caching (bool, optional): Whether to cache the embedding response when using a hosted model. Defaults to True. **kwargs: Additional default keyword arguments to pass to the embedding model. Examples: Example 1: Using a hosted model. ```python import dspy embedder = dspy.Embedder("openai/text-embedding-3-small", batch_size=100) embeddings = embedder(["hello", "world"]) assert embeddings.shape == (2, 1536) ``` Example 2: Using any local embedding model, e.g. from https://huggingface.co/models?library=sentence-transformers. ```python # pip install sentence_transformers import dspy from sentence_transformers import SentenceTransformer # Load an extremely efficient local model for retrieval model = SentenceTransformer("sentence-transformers/static-retrieval-mrl-en-v1", device="cpu") embedder = dspy.Embedder(model.encode) embeddings = embedder(["hello", "world"], batch_size=1) assert embeddings.shape == (2, 1024) ``` Example 3: Using a custom function. ```python import dspy import numpy as np def my_embedder(texts): return np.random.rand(len(texts), 10) embedder = dspy.Embedder(my_embedder) embeddings = embedder(["hello", "world"], batch_size=1) assert embeddings.shape == (2, 10) ``` """ def __init__(self, model: str | Callable, batch_size: int = 200, caching: bool = True, **kwargs: dict[str, Any]): self.model = model self.batch_size = batch_size self.caching = caching self.default_kwargs = kwargs def _preprocess(self, inputs, batch_size=None, caching=None, **kwargs): if isinstance(inputs, str): is_single_input = True inputs = [inputs] else: is_single_input = False if not all(isinstance(inp, str) for inp in inputs): raise ValueError("All inputs must be strings.") batch_size = batch_size or self.batch_size caching = caching or self.caching merged_kwargs = self.default_kwargs.copy() merged_kwargs.update(kwargs) input_batches = [] for i in range(0, len(inputs), batch_size): input_batches.append(inputs[i : i + batch_size]) return input_batches, caching, merged_kwargs, is_single_input def _postprocess(self, embeddings_list, is_single_input): embeddings = np.array(embeddings_list, dtype=np.float32) if is_single_input: return embeddings[0] else: return np.array(embeddings, dtype=np.float32) def __call__(self, inputs: str | list[str], batch_size: int | None = None, caching: bool | None = None, **kwargs: dict[str, Any]) -> np.ndarray: """Compute embeddings for the given inputs. Args: inputs: The inputs to compute embeddings for, can be a single string or a list of strings. batch_size (int, optional): The batch size for processing inputs. If None, defaults to the batch_size set during initialization. caching (bool, optional): Whether to cache the embedding response when using a hosted model. If None, defaults to the caching setting from initialization. kwargs: Additional keyword arguments to pass to the embedding model. These will override the default kwargs provided during initialization. Returns: numpy.ndarray: If the input is a single string, returns a 1D numpy array representing the embedding. 
If the input is a list of strings, returns a 2D numpy array of embeddings, one embedding per row. """ input_batches, caching, kwargs, is_single_input = self._preprocess(inputs, batch_size, caching, **kwargs) compute_embeddings = _cached_compute_embeddings if caching else _compute_embeddings embeddings_list = [] for batch in input_batches: embeddings_list.extend(compute_embeddings(self.model, batch, caching=caching, **kwargs)) return self._postprocess(embeddings_list, is_single_input) async def acall(self, inputs, batch_size=None, caching=None, **kwargs): input_batches, caching, kwargs, is_single_input = self._preprocess(inputs, batch_size, caching, **kwargs) embeddings_list = [] acompute_embeddings = _cached_acompute_embeddings if caching else _acompute_embeddings for batch in input_batches: embeddings_list.extend(await acompute_embeddings(self.model, batch, caching=caching, **kwargs)) return self._postprocess(embeddings_list, is_single_input) def _compute_embeddings(model, batch_inputs, caching=False, **kwargs): if isinstance(model, str): caching = caching and litellm.cache is not None embedding_response = litellm.embedding(model=model, input=batch_inputs, caching=caching, **kwargs) return [data["embedding"] for data in embedding_response.data] elif callable(model): return model(batch_inputs, **kwargs) else: raise ValueError(f"`model` in `dspy.Embedder` must be a string or a callable, but got {type(model)}.") @request_cache(ignored_args_for_cache_key=["api_key", "api_base", "base_url"]) def _cached_compute_embeddings(model, batch_inputs, caching=True, **kwargs): return _compute_embeddings(model, batch_inputs, caching=caching, **kwargs) async def _acompute_embeddings(model, batch_inputs, caching=False, **kwargs): if isinstance(model, str): caching = caching and litellm.cache is not None embedding_response = await litellm.aembedding(model=model, input=batch_inputs, caching=caching, **kwargs) return [data["embedding"] for data in embedding_response.data] elif callable(model): return model(batch_inputs, **kwargs) else: raise ValueError(f"`model` in `dspy.Embedder` must be a string or a callable, but got {type(model)}.") @request_cache(ignored_args_for_cache_key=["api_key", "api_base", "base_url"]) async def _cached_acompute_embeddings(model, batch_inputs, caching=True, **kwargs): return await _acompute_embeddings(model, batch_inputs, caching=caching, **kwargs) ``` -------------------------------------------------------------------------------- /dspy/utils/dummies.py: -------------------------------------------------------------------------------- ```python import random from collections import defaultdict from typing import Any import numpy as np from dspy.adapters.chat_adapter import FieldInfoWithName, field_header_pattern from dspy.clients.lm import LM from dspy.dsp.utils.utils import dotdict from dspy.signatures.field import OutputField from dspy.utils.callback import with_callbacks class DummyLM(LM): """Dummy language model for unit testing purposes. Three modes of operation: Mode 1: List of dictionaries If a list of dictionaries is provided, the dummy model will return the next dictionary in the list for each request, formatted according to the `format_field_with_value` function. 
Example: ``` lm = DummyLM([{"answer": "red"}, {"answer": "blue"}]) dspy.settings.configure(lm=lm) predictor("What color is the sky?") # Output: # [[## answer ##]] # red predictor("What color is the sky?") # Output: # [[## answer ##]] # blue ``` Mode 2: Dictionary of dictionaries If a dictionary of dictionaries is provided, the dummy model will return the value corresponding to the key which is contained with the final message of the prompt, formatted according to the `format_field_with_value` function from the chat adapter. ``` lm = DummyLM({"What color is the sky?": {"answer": "blue"}}) dspy.settings.configure(lm=lm) predictor("What color is the sky?") # Output: # [[## answer ##]] # blue ``` Mode 3: Follow examples If `follow_examples` is set to True, and the prompt contains an example input exactly equal to the prompt, the dummy model will return the output from that example. ``` lm = DummyLM([{"answer": "red"}], follow_examples=True) dspy.settings.configure(lm=lm) predictor("What color is the sky?, demos=dspy.Example(input="What color is the sky?", output="blue")) # Output: # [[## answer ##]] # blue ``` """ def __init__(self, answers: list[dict[str, Any]] | dict[str, dict[str, Any]], follow_examples: bool = False, adapter=None): super().__init__("dummy", "chat", 0.0, 1000, True) self.answers = answers if isinstance(answers, list): self.answers = iter(answers) self.follow_examples = follow_examples # Set adapter, defaulting to ChatAdapter if adapter is None: from dspy.adapters.chat_adapter import ChatAdapter adapter = ChatAdapter() self.adapter = adapter def _use_example(self, messages): # find all field names fields = defaultdict(int) for message in messages: if "content" in message: if ma := field_header_pattern.match(message["content"]): fields[message["content"][ma.start() : ma.end()]] += 1 # find the fields which are missing from the final turns max_count = max(fields.values()) output_fields = [field for field, count in fields.items() if count != max_count] # get the output from the last turn that has the output fields as headers final_input = messages[-1]["content"].split("\n\n")[0] for input, output in zip(reversed(messages[:-1]), reversed(messages), strict=False): if any(field in output["content"] for field in output_fields) and final_input in input["content"]: return output["content"] @with_callbacks def __call__(self, prompt=None, messages=None, **kwargs): def format_answer_fields(field_names_and_values: dict[str, Any]): fields_with_values = { FieldInfoWithName(name=field_name, info=OutputField()): value for field_name, value in field_names_and_values.items() } # The reason why DummyLM needs an adapter is because it needs to know which output format to mimic. # Normally LMs should not have any knowledge of an adapter, because the output format is defined in the prompt. adapter = self.adapter # Try to use role="assistant" if the adapter supports it (like JSONAdapter) try: return adapter.format_field_with_value(fields_with_values, role="assistant") except TypeError: # Fallback for adapters that don't support role parameter (like ChatAdapter) return adapter.format_field_with_value(fields_with_values) # Build the request. 
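# Produce one output per requested completion (kwargs["n"], default 1), using whichever of the
# three modes from the class docstring applies: follow_examples, a dict keyed by the final user
# message, or the next entry from the answer list ("No more responses" once exhausted).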
outputs = [] for _ in range(kwargs.get("n", 1)): messages = messages or [{"role": "user", "content": prompt}] kwargs = {**self.kwargs, **kwargs} if self.follow_examples: outputs.append(self._use_example(messages)) elif isinstance(self.answers, dict): outputs.append( next( (format_answer_fields(v) for k, v in self.answers.items() if k in messages[-1]["content"]), "No more responses", ) ) else: outputs.append(format_answer_fields(next(self.answers, {"answer": "No more responses"}))) # Logging, with removed api key & where `cost` is None on cache hit. kwargs = {k: v for k, v in kwargs.items() if not k.startswith("api_")} entry = {"prompt": prompt, "messages": messages, "kwargs": kwargs} entry = {**entry, "outputs": outputs, "usage": 0} entry = {**entry, "cost": 0} self.update_history(entry) return outputs async def acall(self, prompt=None, messages=None, **kwargs): return self.__call__(prompt=prompt, messages=messages, **kwargs) def get_convo(self, index): """Get the prompt + answer from the ith message.""" return self.history[index]["messages"], self.history[index]["outputs"] def dummy_rm(passages=()) -> callable: if not passages: def inner(query: str, *, k: int, **kwargs): raise ValueError("No passages defined") return inner max_length = max(map(len, passages)) + 100 vectorizer = DummyVectorizer(max_length) passage_vecs = vectorizer(passages) def inner(query: str, *, k: int, **kwargs): assert k <= len(passages) query_vec = vectorizer([query])[0] scores = passage_vecs @ query_vec largest_idx = (-scores).argsort()[:k] return [dotdict(long_text=passages[i]) for i in largest_idx] return inner class DummyVectorizer: """Simple vectorizer based on n-grams.""" def __init__(self, max_length=100, n_gram=2): self.max_length = max_length self.n_gram = n_gram self.P = 10**9 + 7 # A large prime number random.seed(123) self.coeffs = [random.randrange(1, self.P) for _ in range(n_gram)] def _hash(self, gram): """Hashes a string using a polynomial hash function.""" h = 1 for coeff, c in zip(self.coeffs, gram, strict=False): h = h * coeff + ord(c) h %= self.P return h % self.max_length def __call__(self, texts: list[str]) -> np.ndarray: vecs = [] for text in texts: grams = [text[i : i + self.n_gram] for i in range(len(text) - self.n_gram + 1)] vec = [0] * self.max_length for gram in grams: vec[self._hash(gram)] += 1 vecs.append(vec) vecs = np.array(vecs, dtype=np.float32) vecs -= np.mean(vecs, axis=1, keepdims=True) vecs /= np.linalg.norm(vecs, axis=1, keepdims=True) + 1e-10 # Added epsilon to avoid division by zero return vecs ``` -------------------------------------------------------------------------------- /tests/utils/test_settings.py: -------------------------------------------------------------------------------- ```python import asyncio import time from concurrent.futures import ThreadPoolExecutor from unittest import mock import pytest from litellm import Choices, Message, ModelResponse import dspy def test_basic_dspy_settings(): dspy.configure(lm=dspy.LM("openai/gpt-4o"), adapter=dspy.JSONAdapter(), callbacks=[lambda x: x]) assert dspy.settings.lm.model == "openai/gpt-4o" assert isinstance(dspy.settings.adapter, dspy.JSONAdapter) assert len(dspy.settings.callbacks) == 1 def test_forbid_configure_call_in_child_thread(): dspy.configure(lm=dspy.LM("openai/gpt-4o"), adapter=dspy.JSONAdapter(), callbacks=[lambda x: x]) def worker(): with pytest.raises(RuntimeError, match="Cannot call dspy.configure"): dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"), callbacks=[]) with ThreadPoolExecutor(max_workers=1) 
as executor: executor.submit(worker) def test_dspy_context(): dspy.configure(lm=dspy.LM("openai/gpt-4o"), adapter=dspy.JSONAdapter(), callbacks=[lambda x: x]) with dspy.context(lm=dspy.LM("openai/gpt-4o-mini"), callbacks=[]): assert dspy.settings.lm.model == "openai/gpt-4o-mini" assert len(dspy.settings.callbacks) == 0 assert dspy.settings.lm.model == "openai/gpt-4o" assert len(dspy.settings.callbacks) == 1 def test_dspy_context_parallel(): dspy.configure(lm=dspy.LM("openai/gpt-4o"), adapter=dspy.JSONAdapter(), callbacks=[lambda x: x]) def worker(i): with dspy.context(lm=dspy.LM("openai/gpt-4o-mini"), trace=[i], callbacks=[]): assert dspy.settings.lm.model == "openai/gpt-4o-mini" assert dspy.settings.trace == [i] assert len(dspy.settings.callbacks) == 0 with ThreadPoolExecutor(max_workers=5) as executor: executor.map(worker, range(3)) assert dspy.settings.lm.model == "openai/gpt-4o" assert len(dspy.settings.callbacks) == 1 def test_dspy_context_with_dspy_parallel(): dspy.configure(lm=dspy.LM("openai/gpt-4o", cache=False), adapter=dspy.ChatAdapter()) class MyModule(dspy.Module): def __init__(self): self.predict = dspy.Predict("question -> answer") def forward(self, question: str) -> str: lm = dspy.LM("openai/gpt-4o-mini", cache=False) if "France" in question else dspy.settings.lm with dspy.context(lm=lm): time.sleep(1) assert dspy.settings.lm.model == lm.model return self.predict(question=question) with mock.patch("litellm.completion") as mock_completion: mock_completion.return_value = ModelResponse( choices=[Choices(message=Message(content="[[ ## answer ## ]]\nParis"))], model="openai/gpt-4o-mini", ) module = MyModule() parallelizer = dspy.Parallel() input_pairs = [ (module, {"question": "What is the capital of France?"}), (module, {"question": "What is the capital of Germany?"}), ] parallelizer(input_pairs) # Verify mock was called correctly assert mock_completion.call_count == 2 for call_args in mock_completion.call_args_list: if "France" in call_args.kwargs["messages"][-1]["content"]: # France question uses gpt-4o-mini assert call_args.kwargs["model"] == "openai/gpt-4o-mini" else: # Germany question uses gpt-4o assert call_args.kwargs["model"] == "openai/gpt-4o" # The main thread is not affected by the context assert dspy.settings.lm.model == "openai/gpt-4o" @pytest.mark.asyncio async def test_dspy_context_with_async_task_group(): class MyModule(dspy.Module): def __init__(self): self.predict = dspy.Predict("question -> answer") async def aforward(self, question: str) -> str: lm = ( dspy.LM("openai/gpt-4o-mini", cache=False) if "France" in question else dspy.LM("openai/gpt-4o", cache=False) ) with dspy.context(lm=lm, trace=[]): await asyncio.sleep(1) assert dspy.settings.lm.model == lm.model result = await self.predict.acall(question=question) assert len(dspy.settings.trace) == 1 return result module = MyModule() with dspy.context(lm=dspy.LM("openai/gpt-4.1", cache=False), adapter=dspy.ChatAdapter()): with mock.patch("litellm.acompletion") as mock_completion: mock_completion.return_value = ModelResponse( choices=[Choices(message=Message(content="[[ ## answer ## ]]\nParis"))], model="openai/gpt-4o-mini", ) # Define the coroutines to be run coroutines = [ module.acall(question="What is the capital of France?"), module.acall(question="What is the capital of France?"), module.acall(question="What is the capital of Germany?"), module.acall(question="What is the capital of Germany?"), ] # Run them concurrently and gather results results = await asyncio.gather(*coroutines) assert 
results[0].answer == "Paris" assert results[1].answer == "Paris" assert results[2].answer == "Paris" assert results[3].answer == "Paris" # Verify mock was called correctly assert mock_completion.call_count == 4 # France question uses gpt-4o-mini assert mock_completion.call_args_list[0].kwargs["model"] == "openai/gpt-4o-mini" assert mock_completion.call_args_list[1].kwargs["model"] == "openai/gpt-4o-mini" # Germany question uses gpt-4o assert mock_completion.call_args_list[2].kwargs["model"] == "openai/gpt-4o" assert mock_completion.call_args_list[3].kwargs["model"] == "openai/gpt-4o" # The main thread is not affected by the context assert dspy.settings.lm.model == "openai/gpt-4.1" assert dspy.settings.trace == [] @pytest.mark.asyncio async def test_dspy_configure_allowance_async(): def bar1(): # `dspy.configure` is disallowed in different async tasks from the initial one. # In this case, foo1 (async) calls bar1 (sync), and bar1 uses the async task from foo1. with pytest.raises(RuntimeError) as e: dspy.configure(lm=dspy.LM("openai/gpt-4o")) assert "dspy.settings.configure(...) can only be called from the same async" in str(e.value) async def foo1(): bar1() await asyncio.sleep(0.1) async def foo2(): # `dspy.configure` is disallowed in different async tasks from the initial one. with pytest.raises(RuntimeError) as e: dspy.configure(lm=dspy.LM("openai/gpt-4o")) assert "dspy.settings.configure(...) can only be called from the same async" in str(e.value) await asyncio.sleep(0.1) async def foo3(): # `dspy.context` is allowed in different async tasks from the initial one. with dspy.context(lm=dspy.LM("openai/gpt-4o")): await asyncio.sleep(0.1) async def foo4(): # foo4 is directly invoked by the entry task, so it has the same async task as the entry task. dspy.configure(lm=dspy.LM("openai/gpt-4o")) await asyncio.sleep(0.1) # `dspy.configure` is allowed to be called multiple times in the same async task. 
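# These calls all run in the entry async task, so none of them should raise.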
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini")) dspy.configure(lm=dspy.LM("openai/gpt-4o")) dspy.configure(adapter=dspy.JSONAdapter()) await asyncio.gather(foo1(), foo2(), foo3()) foo4() ``` -------------------------------------------------------------------------------- /dspy/retrievers/embeddings.py: -------------------------------------------------------------------------------- ```python import json import os from typing import Any import numpy as np from dspy.utils.unbatchify import Unbatchify class Embeddings: def __init__( self, corpus: list[str], embedder, k: int = 5, callbacks: list[Any] | None = None, cache: bool = False, brute_force_threshold: int = 20_000, normalize: bool = True, ): assert cache is False, "Caching is not supported for embeddings-based retrievers" self.embedder = embedder self.k = k self.corpus = corpus self.normalize = normalize self.corpus_embeddings = self.embedder(self.corpus) self.corpus_embeddings = self._normalize(self.corpus_embeddings) if self.normalize else self.corpus_embeddings self.index = self._build_faiss() if len(corpus) >= brute_force_threshold else None self.search_fn = Unbatchify(self._batch_forward) def __call__(self, query: str): return self.forward(query) def forward(self, query: str): import dspy passages, indices = self.search_fn(query) return dspy.Prediction(passages=passages, indices=indices) def _batch_forward(self, queries: list[str]): q_embeds = self.embedder(queries) q_embeds = self._normalize(q_embeds) if self.normalize else q_embeds pids = self._faiss_search(q_embeds, self.k * 10) if self.index else None pids = np.tile(np.arange(len(self.corpus)), (len(queries), 1)) if pids is None else pids return self._rerank_and_predict(q_embeds, pids) def _build_faiss(self): nbytes = 32 partitions = int(2 * np.sqrt(len(self.corpus))) dim = self.corpus_embeddings.shape[1] try: import faiss except ImportError: raise ImportError("Please `pip install faiss-cpu` or increase `brute_force_threshold` to avoid FAISS.") quantizer = faiss.IndexFlatL2(dim) index = faiss.IndexIVFPQ(quantizer, dim, partitions, nbytes, 8) print( f"Training a {nbytes}-byte FAISS index with {partitions} partitions, based on " f"{len(self.corpus)} x {dim}-dim embeddings" ) index.train(self.corpus_embeddings) index.add(self.corpus_embeddings) index.nprobe = min(16, partitions) return index def _faiss_search(self, query_embeddings: np.ndarray, num_candidates: int): return self.index.search(query_embeddings, num_candidates)[1] def _rerank_and_predict(self, q_embeds: np.ndarray, candidate_indices: np.ndarray): candidate_embeddings = self.corpus_embeddings[candidate_indices] scores = np.einsum("qd,qkd->qk", q_embeds, candidate_embeddings) top_k_indices = np.argsort(-scores, axis=1)[:, : self.k] top_indices = candidate_indices[np.arange(len(q_embeds))[:, None], top_k_indices] return [([self.corpus[idx] for idx in indices], [idx for idx in indices]) for indices in top_indices] # noqa: C416 def _normalize(self, embeddings: np.ndarray): norms = np.linalg.norm(embeddings, axis=1, keepdims=True) return embeddings / np.maximum(norms, 1e-10) def save(self, path: str): """ Save the embeddings index to disk. This saves the corpus, embeddings, FAISS index (if present), and configuration to allow for fast loading without recomputing embeddings. 
Args: path: Directory path where the embeddings will be saved """ os.makedirs(path, exist_ok=True) # Save configuration and corpus config = { "k": self.k, "normalize": self.normalize, "corpus": self.corpus, "has_faiss_index": self.index is not None, } with open(os.path.join(path, "config.json"), "w") as f: json.dump(config, f, indent=2) # Save embeddings np.save(os.path.join(path, "corpus_embeddings.npy"), self.corpus_embeddings) # Save FAISS index if it exists if self.index is not None: try: import faiss faiss.write_index(self.index, os.path.join(path, "faiss_index.bin")) except ImportError: # If FAISS is not available, we can't save the index # but we can still save the embeddings for brute force search pass def load(self, path: str, embedder): """ Load the embeddings index from disk into the current instance. Args: path: Directory path where the embeddings were saved embedder: The embedder function to use for new queries Returns: self: Returns self for method chaining Raises: FileNotFoundError: If the save directory or required files don't exist ValueError: If the saved config is invalid or incompatible """ if not os.path.exists(path): raise FileNotFoundError(f"Save directory not found: {path}") config_path = os.path.join(path, "config.json") embeddings_path = os.path.join(path, "corpus_embeddings.npy") if not os.path.exists(config_path): raise FileNotFoundError(f"Config file not found: {config_path}") if not os.path.exists(embeddings_path): raise FileNotFoundError(f"Embeddings file not found: {embeddings_path}") # Load configuration and corpus with open(config_path) as f: config = json.load(f) # Validate required config fields required_fields = ["k", "normalize", "corpus", "has_faiss_index"] for field in required_fields: if field not in config: raise ValueError(f"Invalid config: missing required field '{field}'") # Restore configuration self.k = config["k"] self.normalize = config["normalize"] self.corpus = config["corpus"] self.embedder = embedder # Load embeddings self.corpus_embeddings = np.load(embeddings_path) # Load FAISS index if it was saved and FAISS is available faiss_index_path = os.path.join(path, "faiss_index.bin") if config["has_faiss_index"] and os.path.exists(faiss_index_path): try: import faiss self.index = faiss.read_index(faiss_index_path) except ImportError: # If FAISS is not available, fall back to brute force self.index = None else: self.index = None return self @classmethod def from_saved(cls, path: str, embedder): """ Create an Embeddings instance from a saved index. This is the recommended way to load saved embeddings as it creates a new instance without unnecessarily computing embeddings. 
Args: path: Directory path where the embeddings were saved embedder: The embedder function to use for new queries Returns: Embeddings instance loaded from disk Example: ```python # Save embeddings embeddings = Embeddings(corpus, embedder) embeddings.save("./saved_embeddings") # Load embeddings later loaded_embeddings = Embeddings.from_saved("./saved_embeddings", embedder) ``` """ # Create a minimal instance without triggering embedding computation instance = cls.__new__(cls) # Initialize the search function (required since we bypassed __init__) instance.search_fn = Unbatchify(instance._batch_forward) instance.load(path, embedder) return instance ``` -------------------------------------------------------------------------------- /dspy/datasets/alfworld/base_config.yml: -------------------------------------------------------------------------------- ```yaml dataset: data_path: '$ALFWORLD_DATA/json_2.1.1/train' eval_id_data_path: '$ALFWORLD_DATA/json_2.1.1/valid_seen' # null/None to disable eval_ood_data_path: '$ALFWORLD_DATA/json_2.1.1/valid_unseen' # null/None to disable num_train_games: -1 # max training games (<=0 indicates full dataset) num_eval_games: -1 # max evaluation games (<=0 indicates full dataset) logic: domain: '$ALFWORLD_DATA/logic/alfred.pddl' # PDDL domain file that defines the world dynamics grammar: '$ALFWORLD_DATA/logic/alfred.twl2' # Grammar file that defines the text feedbacks env: type: 'AlfredTWEnv' # 'AlfredTWEnv' or 'AlfredThorEnv' or 'AlfredHybrid' regen_game_files: False # check if game is solvable by expert and save to game.tw-pddl file domain_randomization: False # shuffle Textworld print order and object id nums task_types: [1, 2, 3, 4, 5, 6] # task-type ids: 1 - Pick & Place, 2 - Examine in Light, 3 - Clean & Place, 4 - Heat & Place, 5 - Cool & Place, 6 - Pick Two & Place expert_timeout_steps: 150 # max steps before timeout for expert to solve the task expert_type: "handcoded" # 'handcoded' or 'downward'. 
Note: the downward planner is very slow for real-time use goal_desc_human_anns_prob: 0.0 # prob of using human-annotated goal language instead of templated goals (1.0 indicates all human annotations from ALFRED) hybrid: start_eps: 100000 # starting episode of hybrid training, tw-only training upto this point thor_prob: 0.5 # prob of AlfredThorEnv during hybrid training eval_mode: "tw" # 'tw' or 'thor' - env used for evaluation during hybrid training thor: screen_width: 300 # width of THOR window screen_height: 300 # height of THOR window smooth_nav: False # smooth rotations, looks, and translations during navigation (very slow) save_frames_to_disk: False # save frame PNGs to disk (useful for making videos) save_frames_path: './videos/' # path to save frame PNGs controller: type: 'oracle' # 'oracle' or 'oracle_astar' or 'mrcnn' or 'mrcnn_astar' (aka BUTLER) debug: False load_receps: True # load receptacle locations from precomputed dict (if available) mask_rcnn: pretrained_model_path: '$ALFWORLD_DATA/detectors/mrcnn.pth' general: random_seed: 42 use_cuda: True # disable this when running on machine without cuda visdom: False # plot training/eval curves, run with visdom server task: 'alfred' training_method: 'dagger' # 'dqn' or 'dagger' save_path: './training/' # path to save pytorch models observation_pool_capacity: 3 # k-size queue, 0 indicates no observation hide_init_receptacles: False # remove initial observation containing navigable receptacles training: batch_size: 10 max_episode: 50000 smoothing_eps: 0.1 optimizer: learning_rate: 0.001 clip_grad_norm: 5 evaluate: run_eval: True batch_size: 10 env: type: "AlfredTWEnv" checkpoint: report_frequency: 1000 # report every N episode experiment_tag: 'test' # name of experiment load_pretrained: False # during test, enable this so that the agent load your pretrained model load_from_tag: 'not loading anything' # name of pre-trained model to load in save_path model: encoder_layers: 1 decoder_layers: 1 encoder_conv_num: 5 block_hidden_dim: 64 n_heads: 1 dropout: 0.1 block_dropout: 0.1 recurrent: True rl: action_space: "admissible" # 'admissible' (candidates from text engine) or 'generation' (seq2seq-style generation) or 'beam_search_choice' or 'exhaustive' (not working) max_target_length: 20 # max token length for seq2seq generation beam_width: 10 # 1 means greedy generate_top_k: 3 training: max_nb_steps_per_episode: 50 # terminate after this many steps learn_start_from_this_episode: 0 # delay updates until this episode target_net_update_frequency: 500 # sync target net with online net per this many epochs replay: accumulate_reward_from_final: True count_reward_lambda: 0.0 # 0 to disable novel_object_reward_lambda: 0.0 # 0 to disable discount_gamma_game_reward: 0.9 discount_gamma_count_reward: 0.5 discount_gamma_novel_object_reward: 0.5 replay_memory_capacity: 500000 # adjust this depending on your RAM size replay_memory_priority_fraction: 0.5 update_per_k_game_steps: 5 replay_batch_size: 64 multi_step: 3 replay_sample_history_length: 4 replay_sample_update_from: 2 epsilon_greedy: noisy_net: False # if this is true, then epsilon greedy is disabled epsilon_anneal_episodes: 1000 # -1 if not annealing epsilon_anneal_from: 0.3 epsilon_anneal_to: 0.1 dagger: action_space: "generation" # 'admissible' (candidates from text engine) or 'generation' (seq2seq-style generation) or 'exhaustive' (not working) max_target_length: 20 # max token length for seq2seq generation beam_width: 10 # 1 means greedy generate_top_k: 5 unstick_by_beam_search: False # use 
beam-search for failed actions, set True during evaluation training: max_nb_steps_per_episode: 50 # terminate after this many steps fraction_assist: fraction_assist_anneal_episodes: 50000 fraction_assist_anneal_from: 1.0 fraction_assist_anneal_to: 0.01 fraction_random: fraction_random_anneal_episodes: 0 fraction_random_anneal_from: 0.0 fraction_random_anneal_to: 0.0 replay: replay_memory_capacity: 500000 update_per_k_game_steps: 5 replay_batch_size: 64 replay_sample_history_length: 4 replay_sample_update_from: 2 vision_dagger: model_type: "resnet" # 'resnet' (whole image features) or 'maskrcnn_whole' (whole image MaskRCNN feats) or 'maskrcnn' (top k MaskRCNN detection feats) or 'no_vision' (zero vision input) resnet_fc_dim: 64 maskrcnn_top_k_boxes: 10 # top k box features use_exploration_frame_feats: False # append feats from initial exploration (memory intensive!) sequence_aggregation_method: "average" # 'sum' or 'average' or 'rnn' ``` -------------------------------------------------------------------------------- /dspy/adapters/types/citation.py: -------------------------------------------------------------------------------- ```python from typing import Any, Optional import pydantic from dspy.adapters.types.base_type import Type from dspy.utils.annotation import experimental @experimental(version="3.0.4") class Citations(Type): """Citations extracted from an LM response with source references. This type represents citations returned by language models that support citation extraction, particularly Anthropic's Citations API through LiteLLM. Citations include the quoted text and source information. Example: ```python import os import dspy from dspy.signatures import Signature from dspy.experimental import Citations, Document os.environ["ANTHROPIC_API_KEY"] = "YOUR_ANTHROPIC_API_KEY" class AnswerWithSources(Signature): '''Answer questions using provided documents with citations.''' documents: list[Document] = dspy.InputField() question: str = dspy.InputField() answer: str = dspy.OutputField() citations: Citations = dspy.OutputField() # Create documents to provide as sources docs = [ Document( data="The Earth orbits the Sun in an elliptical path.", title="Basic Astronomy Facts" ), Document( data="Water boils at 100°C at standard atmospheric pressure.", title="Physics Fundamentals", metadata={"author": "Dr. Smith", "year": 2023} ) ] # Use with a model that supports citations like Claude lm = dspy.LM("anthropic/claude-opus-4-1-20250805") predictor = dspy.Predict(AnswerWithSources) result = predictor(documents=docs, question="What temperature does water boil?", lm=lm) for citation in result.citations.citations: print(citation.format()) ``` """ class Citation(Type): """Individual citation with character location information.""" type: str = "char_location" cited_text: str document_index: int document_title: str | None = None start_char_index: int end_char_index: int supported_text: str | None = None def format(self) -> dict[str, Any]: """Format citation as dictionary for LM consumption. Returns: A dictionary in the format expected by citation APIs. 
""" citation_dict = { "type": self.type, "cited_text": self.cited_text, "document_index": self.document_index, "start_char_index": self.start_char_index, "end_char_index": self.end_char_index } if self.document_title: citation_dict["document_title"] = self.document_title if self.supported_text: citation_dict["supported_text"] = self.supported_text return citation_dict citations: list[Citation] @classmethod def from_dict_list(cls, citations_dicts: list[dict[str, Any]]) -> "Citations": """Convert a list of dictionaries to a Citations instance. Args: citations_dicts: A list of dictionaries, where each dictionary should have 'cited_text' key and 'document_index', 'start_char_index', 'end_char_index' keys. Returns: A Citations instance. Example: ```python citations_dict = [ { "cited_text": "The sky is blue", "document_index": 0, "document_title": "Weather Guide", "start_char_index": 0, "end_char_index": 15, "supported_text": "The sky was blue yesterday." } ] citations = Citations.from_dict_list(citations_dict) ``` """ citations = [cls.Citation(**item) for item in citations_dicts] return cls(citations=citations) @classmethod def description(cls) -> str: """Description of the citations type for use in prompts.""" return ( "Citations with quoted text and source references. " "Include the exact text being cited and information about its source." ) def format(self) -> list[dict[str, Any]]: """Format citations as a list of dictionaries.""" return [citation.format() for citation in self.citations] @pydantic.model_validator(mode="before") @classmethod def validate_input(cls, data: Any): if isinstance(data, cls): return data # Handle case where data is a list of dicts with citation info if isinstance(data, list) and all( isinstance(item, dict) and "cited_text" in item for item in data ): return {"citations": [cls.Citation(**item) for item in data]} # Handle case where data is a dict elif isinstance(data, dict): if "citations" in data: # Handle case where data is a dict with "citations" key citations_data = data["citations"] if isinstance(citations_data, list): return { "citations": [ cls.Citation(**item) if isinstance(item, dict) else item for item in citations_data ] } elif "cited_text" in data: # Handle case where data is a single citation dict return {"citations": [cls.Citation(**data)]} raise ValueError(f"Received invalid value for `Citations`: {data}") def __iter__(self): """Allow iteration over citations.""" return iter(self.citations) def __len__(self): """Return the number of citations.""" return len(self.citations) def __getitem__(self, index): """Allow indexing into citations.""" return self.citations[index] @classmethod def is_streamable(cls) -> bool: """Whether the Citations type is streamable.""" return True @classmethod def parse_stream_chunk(cls, chunk) -> Optional["Citations"]: """ Parse a stream chunk into Citations. Args: chunk: A stream chunk from the LM. Returns: A Citations object if the chunk contains citation data, None otherwise. """ try: # Check if the chunk has citation data in provider_specific_fields if hasattr(chunk, "choices") and chunk.choices: delta = chunk.choices[0].delta if hasattr(delta, "provider_specific_fields") and delta.provider_specific_fields: citation_data = delta.provider_specific_fields.get("citation") if citation_data: return cls.from_dict_list([citation_data]) except Exception: pass return None @classmethod def parse_lm_response(cls, response: str | dict[str, Any]) -> Optional["Citations"]: """Parse a LM response into Citations. 
Args: response: A LM response that may contain citation data. Returns: A Citations object if citation data is found, None otherwise. """ if isinstance(response, dict): # Check if the response contains citations in the expected format if "citations" in response: citations_data = response["citations"] if isinstance(citations_data, list): return cls.from_dict_list(citations_data) return None ``` -------------------------------------------------------------------------------- /docs/docs/tutorials/conversation_history/index.md: -------------------------------------------------------------------------------- ```markdown # Managing Conversation History Maintaining conversation history is a fundamental feature when building AI applications such as chatbots. While DSPy does not provide automatic conversation history management within `dspy.Module`, it offers the `dspy.History` utility to help you manage conversation history effectively. ## Using `dspy.History` to Manage Conversation History The `dspy.History` class can be used as an input field type, containing a `messages: list[dict[str, Any]]` attribute that stores the conversation history. Each entry in this list is a dictionary with keys corresponding to the fields defined in your signature. See the example below: ```python import dspy import os os.environ["OPENAI_API_KEY"] = "{your_openai_api_key}" dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini")) class QA(dspy.Signature): question: str = dspy.InputField() history: dspy.History = dspy.InputField() answer: str = dspy.OutputField() predict = dspy.Predict(QA) history = dspy.History(messages=[]) while True: question = input("Type your question, end conversation by typing 'finish': ") if question == "finish": break outputs = predict(question=question, history=history) print(f"\n{outputs.answer}\n") history.messages.append({"question": question, **outputs}) dspy.inspect_history() ``` There are two key steps when using the conversation history: - **Include a field of type `dspy.History` in your Signature.** - **Maintain a history instance at runtime, appending new conversation turns to it.** Each entry should include all relevant input and output field information. A sample run might look like this: ``` Type your question, end conversation by typing 'finish': do you know the competition between pytorch and tensorflow? Yes, there is a notable competition between PyTorch and TensorFlow, which are two of the most popular deep learning frameworks. PyTorch, developed by Facebook, is known for its dynamic computation graph, which allows for more flexibility and ease of use, especially in research settings. TensorFlow, developed by Google, initially used a static computation graph but has since introduced eager execution to improve usability. TensorFlow is often favored in production environments due to its scalability and deployment capabilities. Both frameworks have strong communities and extensive libraries, and the choice between them often depends on specific project requirements and personal preference. Type your question, end conversation by typing 'finish': which one won the battle? just tell me the result, don't include any reasoning, thanks! There is no definitive winner; both PyTorch and TensorFlow are widely used and have their own strengths. Type your question, end conversation by typing 'finish': finish [2025-07-11T16:35:57.592762] System message: Your input fields are: 1. `question` (str): 2. `history` (History): Your output fields are: 1. 
`answer` (str): All interactions will be structured in the following way, with the appropriate values filled in. [[ ## question ## ]] {question} [[ ## history ## ]] {history} [[ ## answer ## ]] {answer} [[ ## completed ## ]] In adhering to this structure, your objective is: Given the fields `question`, `history`, produce the fields `answer`. User message: [[ ## question ## ]] do you know the competition between pytorch and tensorflow? Assistant message: [[ ## answer ## ]] Yes, there is a notable competition between PyTorch and TensorFlow, which are two of the most popular deep learning frameworks. PyTorch, developed by Facebook, is known for its dynamic computation graph, which allows for more flexibility and ease of use, especially in research settings. TensorFlow, developed by Google, initially used a static computation graph but has since introduced eager execution to improve usability. TensorFlow is often favored in production environments due to its scalability and deployment capabilities. Both frameworks have strong communities and extensive libraries, and the choice between them often depends on specific project requirements and personal preference. [[ ## completed ## ]] User message: [[ ## question ## ]] which one won the battle? just tell me the result, don't include any reasoning, thanks! Respond with the corresponding output fields, starting with the field `[[ ## answer ## ]]`, and then ending with the marker for `[[ ## completed ## ]]`. Response: [[ ## answer ## ]] There is no definitive winner; both PyTorch and TensorFlow are widely used and have their own strengths. [[ ## completed ## ]] ``` Notice how each user input and assistant response is appended to the history, allowing the model to maintain context across turns. The actual prompt sent to the language model is a multi-turn message, as shown by the output of `dspy.inspect_history`. Each conversation turn is represented as a user message followed by an assistant message. ## History in Few-shot Examples You may notice that `history` does not appear in the input fields section of the prompt, even though it is listed as an input field (e.g., "2. `history` (History):" in the system message). This is intentional: when formatting few-shot examples that include conversation history, DSPy does not expand the history into multiple turns. Instead, to remain compatible with the OpenAI standard format, each few-shot example is represented as a single turn. For example: ``` import dspy dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini")) class QA(dspy.Signature): question: str = dspy.InputField() history: dspy.History = dspy.InputField() answer: str = dspy.OutputField() predict = dspy.Predict(QA) history = dspy.History(messages=[]) predict.demos.append( dspy.Example( question="What is the capital of France?", history=dspy.History( messages=[{"question": "What is the capital of Germany?", "answer": "The capital of Germany is Berlin."}] ), answer="The capital of France is Paris.", ) ) predict(question="What is the capital of America?", history=dspy.History(messages=[])) dspy.inspect_history() ``` The resulting history will look like this: ``` [2025-07-11T16:53:10.994111] System message: Your input fields are: 1. `question` (str): 2. `history` (History): Your output fields are: 1. `answer` (str): All interactions will be structured in the following way, with the appropriate values filled in. 
[[ ## question ## ]] {question} [[ ## history ## ]] {history} [[ ## answer ## ]] {answer} [[ ## completed ## ]] In adhering to this structure, your objective is: Given the fields `question`, `history`, produce the fields `answer`. User message: [[ ## question ## ]] What is the capital of France? [[ ## history ## ]] {"messages": [{"question": "What is the capital of Germany?", "answer": "The capital of Germany is Berlin."}]} Assistant message: [[ ## answer ## ]] The capital of France is Paris. [[ ## completed ## ]] User message: [[ ## question ## ]] What is the capital of Germany? Respond with the corresponding output fields, starting with the field `[[ ## answer ## ]]`, and then ending with the marker for `[[ ## completed ## ]]`. Response: [[ ## answer ## ]] The capital of Germany is Berlin. [[ ## completed ## ]] ``` As you can see, the few-shot example does not expand the conversation history into multiple turns. Instead, it represents the history as JSON data within its section: ``` [[ ## history ## ]] {"messages": [{"question": "What is the capital of Germany?", "answer": "The capital of Germany is Berlin."}]} ``` This approach ensures compatibility with standard prompt formats while still providing the model with relevant conversational context. ``` -------------------------------------------------------------------------------- /dspy/predict/program_of_thought.py: -------------------------------------------------------------------------------- ```python import json import logging import re import dspy from dspy.primitives.module import Module from dspy.primitives.python_interpreter import PythonInterpreter from dspy.signatures.signature import Signature, ensure_signature logger = logging.getLogger(__name__) class ProgramOfThought(Module): """ A DSPy module that runs Python programs to solve a problem. This module requires deno to be installed. Please install deno following https://docs.deno.com/runtime/getting_started/installation/ Example: ``` import dspy lm = dspy.LM('openai/gpt-4o-mini') dspy.configure(lm=lm) pot = dspy.ProgramOfThought("question -> answer") pot(question="what is 1+1?") ``` """ def __init__(self, signature: str | type[Signature], max_iters: int = 3, interpreter: PythonInterpreter | None = None): """ Args: signature: The signature of the module. max_iters: The maximum number of iterations to retry code generation and execution. interpreter: PythonInterpreter instance to use. If None, a new one is instantiated. """ super().__init__() self.signature = signature = ensure_signature(signature) self.max_iters = max_iters self.input_fields = signature.input_fields self.output_fields = signature.output_fields self.code_generate = dspy.ChainOfThought( dspy.Signature( self._generate_signature("generate").fields, self._generate_instruction("generate"), ), ) self.code_regenerate = dspy.ChainOfThought( dspy.Signature( self._generate_signature("regenerate").fields, self._generate_instruction("regenerate"), ), ) self.generate_answer = dspy.ChainOfThought( dspy.Signature( self._generate_signature("answer").fields, self._generate_instruction("answer"), ), ) # It will raises exception when dspy cannot find available deno instance by now. 
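# The same interpreter instance is reused across code-generation retries and shut down in forward().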
self.interpreter = interpreter or PythonInterpreter() def _generate_signature(self, mode): signature_dict = dict(self.input_fields) fields_for_mode = { "generate": { "generated_code": dspy.OutputField( prefix="Code:", desc="python code that answers the question", format=str, ), }, "regenerate": { "previous_code": dspy.InputField( prefix="Previous Code:", desc="previously-generated python code that errored", format=str, ), "error": dspy.InputField( prefix="Error:", desc="error message from previously-generated python code", ), "generated_code": dspy.OutputField( prefix="Code:", desc="python code that answers the question", format=str, ), }, "answer": { "final_generated_code": dspy.InputField( prefix="Code:", desc="python code that answers the question", format=str, ), "code_output": dspy.InputField( prefix="Code Output:", desc="output of previously-generated python code", ), } | self.signature.output_fields, } signature_dict.update(fields_for_mode[mode]) return dspy.Signature(signature_dict) def _generate_instruction(self, mode): mode_inputs = ", ".join( [f"`{field_name}`" for field_name in self._generate_signature(mode).input_fields], ) mode_outputs = ", ".join( [f"`{field_name}`" for field_name in self._generate_signature(mode).output_fields], ) final_outputs = ", ".join( [f"`{field_name}`" for field_name in self.output_fields], ) if mode == "generate": instr = [ f"You will be given {mode_inputs} and you will respond with {mode_outputs}.", f"Generating executable Python code that programmatically computes the correct {mode_outputs}.", "After you're done with the computation and think you have the answer, make sure to provide your answer by calling the preloaded function `final_answer()`.", f'You should structure your answer in a dict object, like {{"field_a": answer_a, ...}}, evaluates to the correct value mapping for {final_outputs}.', ] elif mode == "regenerate": instr = [ f"You are given {mode_inputs} due to an error in previous code.", "Your task is to correct the error and provide the new `generated_code`.", ] else: # mode == 'answer' instr = [ f"Given the final code {mode_inputs}, provide the final {mode_outputs}.", ] return "\n".join(instr) def _parse_code(self, code_data): code = code_data.get("generated_code", "").split("---", 1)[0].split("\n\n\n", 1)[0] code_match = re.search(r"```python[ \n](.*?)[ \n]```?", code, re.DOTALL) code_block = (code_match.group(1) if code_match else code).replace("\\n", "\n") if not code_block: return code, "Error: Empty code after parsing." if "\n" not in code_block and code_block.count("=") > 1: return code, "Error: Code format is not correct." lines = code_block.split("\n") last_line_match = re.match(r"^(\w+)\s*=", lines[-1].strip()) if last_line_match and len(lines) > 1: code_block += "\n" + last_line_match.group(1) else: code_block = re.sub( r"([a-zA-Z_]\w* *=.*?)(?=[a-zA-Z_]\w* *=)", r"\1\n", code_block, ) code_block = re.sub( r"([a-zA-Z_]\w* *=.*?)([a-zA-Z_]\w*)$", r"\1\n\2", code_block, ) return code_block, None def _execute_code(self, code): """ Execute the code using PythonInterpreter and return the output or error. """ if not code: return None, "Error: Empty code before execution." try: # Since it's more complex structure now, just blindly use json to represents all. 
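# A result that is not JSON-serializable raises here and is returned as an execution error below.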
output = json.dumps(self.interpreter.execute(code)) return output, None except Exception as e: return None, str(e) def forward(self, **kwargs): input_kwargs = {field_name: kwargs[field_name] for field_name in self.input_fields} code_data = self.code_generate(**input_kwargs) output = None code, error = self._parse_code(code_data) if not error: output, error = self._execute_code(code) hop = 1 # Retry code generation and execution until there is no error or max_iters is reached while error is not None: logger.error(f"Error in code execution: {error}") if hop == self.max_iters: self.interpreter.shutdown() raise RuntimeError(f"Max hops reached. Failed to run ProgramOfThought: {error}") input_kwargs.update({"previous_code": code, "error": error}) code_data = self.code_regenerate(**input_kwargs) code, error = self._parse_code(code_data) if not error: output, error = self._execute_code(code) hop += 1 input_kwargs.update({"final_generated_code": code, "code_output": output}) answer_gen_result = self.generate_answer(**input_kwargs) self.interpreter.shutdown() return answer_gen_result ```