This is page 7 of 14. Use http://codebase.md/stanfordnlp/dspy?page={x} to view the full context. # Directory Structure ``` ├── .github │ ├── .internal_dspyai │ │ ├── internals │ │ │ ├── build-and-release.md │ │ │ └── release-checklist.md │ │ └── pyproject.toml │ ├── .tmp │ │ └── .generated-actions │ │ └── run-pypi-publish-in-docker-container │ │ └── action.yml │ ├── ISSUE_TEMPLATE │ │ ├── bug_report.yml │ │ └── feature_request.yml │ ├── PULL_REQUEST_TEMPLATE │ │ └── pull_request_template.md │ ├── workflow_scripts │ │ └── install_testpypi_pkg.sh │ └── workflows │ ├── build_and_release.yml │ ├── build_utils │ │ └── test_version.py │ ├── docs-push.yml │ ├── precommits_check.yml │ └── run_tests.yml ├── .gitignore ├── .pre-commit-config.yaml ├── CONTRIBUTING.md ├── docs │ ├── .gitignore │ ├── docs │ │ ├── api │ │ │ ├── adapters │ │ │ │ ├── Adapter.md │ │ │ │ ├── ChatAdapter.md │ │ │ │ ├── JSONAdapter.md │ │ │ │ └── TwoStepAdapter.md │ │ │ ├── evaluation │ │ │ │ ├── answer_exact_match.md │ │ │ │ ├── answer_passage_match.md │ │ │ │ ├── CompleteAndGrounded.md │ │ │ │ ├── Evaluate.md │ │ │ │ ├── EvaluationResult.md │ │ │ │ └── SemanticF1.md │ │ │ ├── experimental │ │ │ │ ├── Citations.md │ │ │ │ └── Document.md │ │ │ ├── index.md │ │ │ ├── models │ │ │ │ ├── Embedder.md │ │ │ │ └── LM.md │ │ │ ├── modules │ │ │ │ ├── BestOfN.md │ │ │ │ ├── ChainOfThought.md │ │ │ │ ├── CodeAct.md │ │ │ │ ├── Module.md │ │ │ │ ├── MultiChainComparison.md │ │ │ │ ├── Parallel.md │ │ │ │ ├── Predict.md │ │ │ │ ├── ProgramOfThought.md │ │ │ │ ├── ReAct.md │ │ │ │ └── Refine.md │ │ │ ├── optimizers │ │ │ │ ├── BetterTogether.md │ │ │ │ ├── BootstrapFewShot.md │ │ │ │ ├── BootstrapFewShotWithRandomSearch.md │ │ │ │ ├── BootstrapFinetune.md │ │ │ │ ├── BootstrapRS.md │ │ │ │ ├── COPRO.md │ │ │ │ ├── Ensemble.md │ │ │ │ ├── GEPA │ │ │ │ │ ├── GEPA_Advanced.md │ │ │ │ │ └── overview.md │ │ │ │ ├── InferRules.md │ │ │ │ ├── KNN.md │ │ │ │ ├── KNNFewShot.md │ │ │ │ ├── LabeledFewShot.md │ │ │ │ ├── MIPROv2.md │ │ │ │ └── SIMBA.md │ │ │ ├── primitives │ │ │ │ ├── Audio.md │ │ │ │ ├── Code.md │ │ │ │ ├── Example.md │ │ │ │ ├── History.md │ │ │ │ ├── Image.md │ │ │ │ ├── Prediction.md │ │ │ │ ├── Tool.md │ │ │ │ └── ToolCalls.md │ │ │ ├── signatures │ │ │ │ ├── InputField.md │ │ │ │ ├── OutputField.md │ │ │ │ └── Signature.md │ │ │ ├── tools │ │ │ │ ├── ColBERTv2.md │ │ │ │ ├── Embeddings.md │ │ │ │ └── PythonInterpreter.md │ │ │ └── utils │ │ │ ├── asyncify.md │ │ │ ├── configure_cache.md │ │ │ ├── disable_litellm_logging.md │ │ │ ├── disable_logging.md │ │ │ ├── enable_litellm_logging.md │ │ │ ├── enable_logging.md │ │ │ ├── inspect_history.md │ │ │ ├── load.md │ │ │ ├── StatusMessage.md │ │ │ ├── StatusMessageProvider.md │ │ │ ├── streamify.md │ │ │ └── StreamListener.md │ │ ├── cheatsheet.md │ │ ├── community │ │ │ ├── community-resources.md │ │ │ ├── how-to-contribute.md │ │ │ └── use-cases.md │ │ ├── deep-dive │ │ │ └── data-handling │ │ │ ├── built-in-datasets.md │ │ │ ├── examples.md │ │ │ ├── img │ │ │ │ └── data-loading.png │ │ │ └── loading-custom-data.md │ │ ├── faqs.md │ │ ├── index.md │ │ ├── js │ │ │ └── runllm-widget.js │ │ ├── learn │ │ │ ├── evaluation │ │ │ │ ├── data.md │ │ │ │ ├── metrics.md │ │ │ │ └── overview.md │ │ │ ├── figures │ │ │ │ ├── native_tool_call.png │ │ │ │ └── teleprompter-classes.png │ │ │ ├── index.md │ │ │ ├── optimization │ │ │ │ ├── optimizers.md │ │ │ │ └── overview.md │ │ │ └── programming │ │ │ ├── 7-assertions.md │ │ │ ├── adapters.md │ │ │ ├── language_models.md │ │ │ ├── mcp.md │ 
│ │ ├── modules.md │ │ │ ├── overview.md │ │ │ ├── signatures.md │ │ │ └── tools.md │ │ ├── production │ │ │ └── index.md │ │ ├── roadmap.md │ │ ├── static │ │ │ ├── .nojekyll │ │ │ └── img │ │ │ ├── dspy_logo.png │ │ │ ├── logo.png │ │ │ ├── mlflow-tracing-rag.png │ │ │ ├── modular.png │ │ │ ├── optimize.png │ │ │ ├── undraw_docusaurus_mountain.svg │ │ │ ├── undraw_docusaurus_react.svg │ │ │ ├── undraw_docusaurus_tree.svg │ │ │ └── universal_compatibility.png │ │ ├── stylesheets │ │ │ └── extra.css │ │ └── tutorials │ │ ├── agents │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-agent.png │ │ ├── ai_text_game │ │ │ └── index.md │ │ ├── async │ │ │ └── index.md │ │ ├── audio │ │ │ └── index.ipynb │ │ ├── build_ai_program │ │ │ └── index.md │ │ ├── cache │ │ │ └── index.md │ │ ├── classification │ │ │ └── index.md │ │ ├── classification_finetuning │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-classification.png │ │ ├── conversation_history │ │ │ └── index.md │ │ ├── core_development │ │ │ └── index.md │ │ ├── custom_module │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-custom-module.png │ │ ├── customer_service_agent │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-customer-service-agent.png │ │ ├── deployment │ │ │ ├── dspy_mlflow_ui.png │ │ │ └── index.md │ │ ├── email_extraction │ │ │ ├── index.md │ │ │ └── mlflow-tracing-email-extraction.png │ │ ├── entity_extraction │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-entity-extraction.png │ │ ├── games │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-agent.png │ │ ├── gepa_ai_program │ │ │ └── index.md │ │ ├── gepa_aime │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-aime.png │ │ │ └── mlflow-tracking-gepa-aime-optimization.png │ │ ├── gepa_facilitysupportanalyzer │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-support.png │ │ │ └── mlflow-tracking-gepa-support-optimization.png │ │ ├── gepa_papillon │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-papilon.png │ │ │ └── mlflow-tracking-gepa-papilon-optimization.png │ │ ├── image_generation_prompting │ │ │ └── index.ipynb │ │ ├── index.md │ │ ├── llms_txt_generation │ │ │ └── index.md │ │ ├── math │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-math.png │ │ ├── mcp │ │ │ └── index.md │ │ ├── mem0_react_agent │ │ │ └── index.md │ │ ├── multihop_search │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-multi-hop.png │ │ ├── observability │ │ │ ├── index.md │ │ │ ├── mlflow_trace_ui_navigation.gif │ │ │ ├── mlflow_trace_ui.png │ │ │ └── mlflow_trace_view.png │ │ ├── optimize_ai_program │ │ │ └── index.md │ │ ├── optimizer_tracking │ │ │ ├── child_run.png │ │ │ ├── experiment.png │ │ │ ├── index.md │ │ │ └── parent_run.png │ │ ├── output_refinement │ │ │ └── best-of-n-and-refine.md │ │ ├── papillon │ │ │ └── index.md │ │ ├── program_of_thought │ │ │ └── index.ipynb │ │ ├── rag │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-rag.png │ │ ├── real_world_examples │ │ │ └── index.md │ │ ├── rl_ai_program │ │ │ └── index.md │ │ ├── rl_multihop │ │ │ └── index.ipynb │ │ ├── rl_papillon │ │ │ └── index.ipynb │ │ ├── sample_code_generation │ │ │ └── index.md │ │ ├── saving │ │ │ └── index.md │ │ ├── streaming │ │ │ └── index.md │ │ ├── tool_use │ │ │ └── index.ipynb │ │ └── yahoo_finance_react │ │ └── index.md │ ├── mkdocs.yml │ ├── overrides │ │ ├── home.html │ │ ├── main.html │ │ └── partials │ │ └── tabs.html │ ├── Pipfile │ ├── Pipfile.lock │ ├── README.md │ ├── requirements.txt │ ├── scripts │ │ ├── generate_api_docs.py │ │ └── generate_api_summary.py │ └── vercel.json ├── dspy │ ├── __init__.py │ 
├── __metadata__.py │ ├── adapters │ │ ├── __init__.py │ │ ├── baml_adapter.py │ │ ├── base.py │ │ ├── chat_adapter.py │ │ ├── json_adapter.py │ │ ├── two_step_adapter.py │ │ ├── types │ │ │ ├── __init__.py │ │ │ ├── audio.py │ │ │ ├── base_type.py │ │ │ ├── citation.py │ │ │ ├── code.py │ │ │ ├── document.py │ │ │ ├── history.py │ │ │ ├── image.py │ │ │ └── tool.py │ │ ├── utils.py │ │ └── xml_adapter.py │ ├── clients │ │ ├── __init__.py │ │ ├── base_lm.py │ │ ├── cache.py │ │ ├── databricks.py │ │ ├── embedding.py │ │ ├── lm_local_arbor.py │ │ ├── lm_local.py │ │ ├── lm.py │ │ ├── openai.py │ │ ├── provider.py │ │ └── utils_finetune.py │ ├── datasets │ │ ├── __init__.py │ │ ├── alfworld │ │ │ ├── __init__.py │ │ │ ├── alfworld.py │ │ │ └── base_config.yml │ │ ├── colors.py │ │ ├── dataloader.py │ │ ├── dataset.py │ │ ├── gsm8k.py │ │ ├── hotpotqa.py │ │ └── math.py │ ├── dsp │ │ ├── __init__.py │ │ ├── colbertv2.py │ │ └── utils │ │ ├── __init__.py │ │ ├── dpr.py │ │ ├── settings.py │ │ └── utils.py │ ├── evaluate │ │ ├── __init__.py │ │ ├── auto_evaluation.py │ │ ├── evaluate.py │ │ └── metrics.py │ ├── experimental │ │ └── __init__.py │ ├── predict │ │ ├── __init__.py │ │ ├── aggregation.py │ │ ├── avatar │ │ │ ├── __init__.py │ │ │ ├── avatar.py │ │ │ ├── models.py │ │ │ └── signatures.py │ │ ├── best_of_n.py │ │ ├── chain_of_thought.py │ │ ├── code_act.py │ │ ├── knn.py │ │ ├── multi_chain_comparison.py │ │ ├── parallel.py │ │ ├── parameter.py │ │ ├── predict.py │ │ ├── program_of_thought.py │ │ ├── react.py │ │ ├── refine.py │ │ └── retry.py │ ├── primitives │ │ ├── __init__.py │ │ ├── base_module.py │ │ ├── example.py │ │ ├── module.py │ │ ├── prediction.py │ │ ├── python_interpreter.py │ │ └── runner.js │ ├── propose │ │ ├── __init__.py │ │ ├── dataset_summary_generator.py │ │ ├── grounded_proposer.py │ │ ├── propose_base.py │ │ └── utils.py │ ├── retrievers │ │ ├── __init__.py │ │ ├── databricks_rm.py │ │ ├── embeddings.py │ │ ├── retrieve.py │ │ └── weaviate_rm.py │ ├── signatures │ │ ├── __init__.py │ │ ├── field.py │ │ ├── signature.py │ │ └── utils.py │ ├── streaming │ │ ├── __init__.py │ │ ├── messages.py │ │ ├── streamify.py │ │ └── streaming_listener.py │ ├── teleprompt │ │ ├── __init__.py │ │ ├── avatar_optimizer.py │ │ ├── bettertogether.py │ │ ├── bootstrap_finetune.py │ │ ├── bootstrap_trace.py │ │ ├── bootstrap.py │ │ ├── copro_optimizer.py │ │ ├── ensemble.py │ │ ├── gepa │ │ │ ├── __init__.py │ │ │ ├── gepa_utils.py │ │ │ ├── gepa.py │ │ │ └── instruction_proposal.py │ │ ├── grpo.py │ │ ├── infer_rules.py │ │ ├── knn_fewshot.py │ │ ├── mipro_optimizer_v2.py │ │ ├── random_search.py │ │ ├── signature_opt.py │ │ ├── simba_utils.py │ │ ├── simba.py │ │ ├── teleprompt_optuna.py │ │ ├── teleprompt.py │ │ ├── utils.py │ │ └── vanilla.py │ └── utils │ ├── __init__.py │ ├── annotation.py │ ├── asyncify.py │ ├── caching.py │ ├── callback.py │ ├── dummies.py │ ├── exceptions.py │ ├── hasher.py │ ├── inspect_history.py │ ├── langchain_tool.py │ ├── logging_utils.py │ ├── mcp.py │ ├── parallelizer.py │ ├── saving.py │ ├── syncify.py │ ├── unbatchify.py │ └── usage_tracker.py ├── LICENSE ├── pyproject.toml ├── README.md ├── tests │ ├── __init__.py │ ├── adapters │ │ ├── test_adapter_utils.py │ │ ├── test_baml_adapter.py │ │ ├── test_base_type.py │ │ ├── test_chat_adapter.py │ │ ├── test_citation.py │ │ ├── test_code.py │ │ ├── test_document.py │ │ ├── test_json_adapter.py │ │ ├── test_tool.py │ │ ├── test_two_step_adapter.py │ │ └── test_xml_adapter.py │ ├── callback │ │ └── 
test_callback.py │ ├── clients │ │ ├── test_cache.py │ │ ├── test_databricks.py │ │ ├── test_embedding.py │ │ ├── test_inspect_global_history.py │ │ └── test_lm.py │ ├── conftest.py │ ├── datasets │ │ └── test_dataset.py │ ├── docs │ │ └── test_mkdocs_links.py │ ├── evaluate │ │ ├── test_evaluate.py │ │ └── test_metrics.py │ ├── examples │ │ └── test_baleen.py │ ├── metadata │ │ └── test_metadata.py │ ├── predict │ │ ├── test_aggregation.py │ │ ├── test_best_of_n.py │ │ ├── test_chain_of_thought.py │ │ ├── test_code_act.py │ │ ├── test_knn.py │ │ ├── test_multi_chain_comparison.py │ │ ├── test_parallel.py │ │ ├── test_predict.py │ │ ├── test_program_of_thought.py │ │ ├── test_react.py │ │ ├── test_refine.py │ │ └── test_retry.py │ ├── primitives │ │ ├── resources │ │ │ └── saved_program.json │ │ ├── test_base_module.py │ │ ├── test_example.py │ │ ├── test_module.py │ │ └── test_python_interpreter.py │ ├── propose │ │ └── test_grounded_proposer.py │ ├── README.md │ ├── reliability │ │ ├── __init__.py │ │ ├── complex_types │ │ │ └── generated │ │ │ ├── test_many_types_1 │ │ │ │ ├── inputs │ │ │ │ │ ├── input1.json │ │ │ │ │ └── input2.json │ │ │ │ ├── program.py │ │ │ │ └── schema.json │ │ │ ├── test_nesting_1 │ │ │ │ ├── inputs │ │ │ │ │ ├── input1.json │ │ │ │ │ └── input2.json │ │ │ │ ├── program.py │ │ │ │ └── schema.json │ │ │ └── test_nesting_2 │ │ │ ├── inputs │ │ │ │ └── input1.json │ │ │ ├── program.py │ │ │ └── schema.json │ │ ├── conftest.py │ │ ├── generate │ │ │ ├── __init__.py │ │ │ ├── __main__.py │ │ │ └── utils.py │ │ ├── input_formats │ │ │ └── generated │ │ │ └── test_markdown_1 │ │ │ ├── inputs │ │ │ │ ├── input1.json │ │ │ │ └── input2.json │ │ │ ├── program.py │ │ │ └── schema.json │ │ ├── README.md │ │ ├── reliability_conf.yaml │ │ ├── test_generated.py │ │ ├── test_pydantic_models.py │ │ └── utils.py │ ├── retrievers │ │ └── test_embeddings.py │ ├── signatures │ │ ├── test_adapter_image.py │ │ ├── test_custom_types.py │ │ └── test_signature.py │ ├── streaming │ │ └── test_streaming.py │ ├── teleprompt │ │ ├── gepa_dummy_lm_custom_component_selector_custom_instruction_proposer.json │ │ ├── gepa_dummy_lm.json │ │ ├── test_bootstrap_finetune.py │ │ ├── test_bootstrap_trace.py │ │ ├── test_bootstrap.py │ │ ├── test_copro_optimizer.py │ │ ├── test_ensemble.py │ │ ├── test_finetune.py │ │ ├── test_gepa_instruction_proposer.py │ │ ├── test_gepa.py │ │ ├── test_grpo.py │ │ ├── test_knn_fewshot.py │ │ ├── test_random_search.py │ │ ├── test_teleprompt.py │ │ └── test_utils.py │ ├── test_utils │ │ ├── __init__.py │ │ └── server │ │ ├── __init__.py │ │ ├── litellm_server_config.yaml │ │ └── litellm_server.py │ └── utils │ ├── __init__.py │ ├── resources │ │ └── mcp_server.py │ ├── test_annotation.py │ ├── test_asyncify.py │ ├── test_exceptions.py │ ├── test_langchain_tool.py │ ├── test_mcp.py │ ├── test_parallelizer.py │ ├── test_saving.py │ ├── test_settings.py │ ├── test_syncify.py │ ├── test_unbatchify.py │ └── test_usage_tracker.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /docs/docs/static/img/undraw_docusaurus_tree.svg: -------------------------------------------------------------------------------- ``` <svg xmlns="http://www.w3.org/2000/svg" width="1129" height="663" viewBox="0 0 1129 663"> <title>Focus on What Matters</title> <circle cx="321" cy="321" r="321" fill="#f2f2f2" /> <ellipse cx="559" cy="635.49998" rx="514" ry="27.50002" fill="#3f3d56" /> <ellipse cx="558" cy="627" rx="460" ry="22" opacity="0.2" /> 
<rect x="131" y="152.5" width="840" height="50" fill="#3f3d56" /> <path d="M166.5,727.3299A21.67009,21.67009,0,0,0,188.1701,749H984.8299A21.67009,21.67009,0,0,0,1006.5,727.3299V296h-840Z" transform="translate(-35.5 -118.5)" fill="#3f3d56" /> <path d="M984.8299,236H188.1701A21.67009,21.67009,0,0,0,166.5,257.6701V296h840V257.6701A21.67009,21.67009,0,0,0,984.8299,236Z" transform="translate(-35.5 -118.5)" fill="#3f3d56" /> <path d="M984.8299,236H188.1701A21.67009,21.67009,0,0,0,166.5,257.6701V296h840V257.6701A21.67009,21.67009,0,0,0,984.8299,236Z" transform="translate(-35.5 -118.5)" opacity="0.2" /> <circle cx="181" cy="147.5" r="13" fill="#3f3d56" /> <circle cx="217" cy="147.5" r="13" fill="#3f3d56" /> <circle cx="253" cy="147.5" r="13" fill="#3f3d56" /> <rect x="168" y="213.5" width="337" height="386" rx="5.33505" fill="#606060" /> <rect x="603" y="272.5" width="284" height="22" rx="5.47638" fill="#2e8555" /> <rect x="537" y="352.5" width="416" height="15" rx="5.47638" fill="#2e8555" /> <rect x="537" y="396.5" width="416" height="15" rx="5.47638" fill="#2e8555" /> <rect x="537" y="440.5" width="416" height="15" rx="5.47638" fill="#2e8555" /> <rect x="537" y="484.5" width="416" height="15" rx="5.47638" fill="#2e8555" /> <rect x="865" y="552.5" width="88" height="26" rx="7.02756" fill="#3ecc5f" /> <path d="M1088.60287,624.61594a30.11371,30.11371,0,0,0,3.98291-15.266c0-13.79652-8.54358-24.98081-19.08256-24.98081s-19.08256,11.18429-19.08256,24.98081a30.11411,30.11411,0,0,0,3.98291,15.266,31.248,31.248,0,0,0,0,30.53213,31.248,31.248,0,0,0,0,30.53208,31.248,31.248,0,0,0,0,30.53208,30.11408,30.11408,0,0,0-3.98291,15.266c0,13.79652,8.54353,24.98081,19.08256,24.98081s19.08256-11.18429,19.08256-24.98081a30.11368,30.11368,0,0,0-3.98291-15.266,31.248,31.248,0,0,0,0-30.53208,31.248,31.248,0,0,0,0-30.53208,31.248,31.248,0,0,0,0-30.53213Z" transform="translate(-35.5 -118.5)" fill="#3f3d56" /> <ellipse cx="1038.00321" cy="460.31783" rx="19.08256" ry="24.9808" fill="#3f3d56" /> <ellipse cx="1038.00321" cy="429.78574" rx="19.08256" ry="24.9808" fill="#3f3d56" /> <path d="M1144.93871,339.34489a91.61081,91.61081,0,0,0,7.10658-10.46092l-50.141-8.23491,54.22885.4033a91.566,91.566,0,0,0,1.74556-72.42605l-72.75449,37.74139,67.09658-49.32086a91.41255,91.41255,0,1,0-150.971,102.29805,91.45842,91.45842,0,0,0-10.42451,16.66946l65.0866,33.81447-69.40046-23.292a91.46011,91.46011,0,0,0,14.73837,85.83669,91.40575,91.40575,0,1,0,143.68892,0,91.41808,91.41808,0,0,0,0-113.02862Z" transform="translate(-35.5 -118.5)" fill="#3ecc5f" fill-rule="evenodd" /> <path d="M981.6885,395.8592a91.01343,91.01343,0,0,0,19.56129,56.51431,91.40575,91.40575,0,1,0,143.68892,0C1157.18982,436.82067,981.6885,385.60008,981.6885,395.8592Z" transform="translate(-35.5 -118.5)" opacity="0.1" /> <path d="M365.62,461.43628H477.094v45.12043H365.62Z" transform="translate(-35.5 -118.5)" fill="#fff" fill-rule="evenodd" /> <path d="M264.76252,608.74122a26.50931,26.50931,0,0,1-22.96231-13.27072,26.50976,26.50976,0,0,0,22.96231,39.81215H291.304V608.74122Z" transform="translate(-35.5 -118.5)" fill="#3ecc5f" fill-rule="evenodd" /> <path 
d="M384.17242,468.57061l92.92155-5.80726V449.49263a26.54091,26.54091,0,0,0-26.54143-26.54143H331.1161l-3.31768-5.74622a3.83043,3.83043,0,0,0-6.63536,0l-3.31768,5.74622-3.31767-5.74622a3.83043,3.83043,0,0,0-6.63536,0l-3.31768,5.74622L301.257,417.205a3.83043,3.83043,0,0,0-6.63536,0L291.304,422.9512c-.02919,0-.05573.004-.08625.004l-5.49674-5.49541a3.8293,3.8293,0,0,0-6.4071,1.71723l-1.81676,6.77338L270.607,424.1031a3.82993,3.82993,0,0,0-4.6912,4.69253l1.84463,6.89148-6.77072,1.81411a3.8315,3.8315,0,0,0-1.71988,6.40975l5.49673,5.49673c0,.02787-.004.05574-.004.08493l-5.74622,3.31768a3.83043,3.83043,0,0,0,0,6.63536l5.74621,3.31768L259.0163,466.081a3.83043,3.83043,0,0,0,0,6.63536l5.74622,3.31768-5.74622,3.31767a3.83043,3.83043,0,0,0,0,6.63536l5.74622,3.31768-5.74622,3.31768a3.83043,3.83043,0,0,0,0,6.63536l5.74622,3.31768-5.74622,3.31767a3.83043,3.83043,0,0,0,0,6.63536l5.74622,3.31768-5.74622,3.31768a3.83043,3.83043,0,0,0,0,6.63536l5.74622,3.31768-5.74622,3.31768a3.83042,3.83042,0,0,0,0,6.63535l5.74622,3.31768-5.74622,3.31768a3.83043,3.83043,0,0,0,0,6.63536l5.74622,3.31768L259.0163,558.976a3.83042,3.83042,0,0,0,0,6.63535l5.74622,3.31768-5.74622,3.31768a3.83043,3.83043,0,0,0,0,6.63536l5.74622,3.31768-5.74622,3.31768a3.83042,3.83042,0,0,0,0,6.63535l5.74622,3.31768-5.74622,3.31768a3.83043,3.83043,0,0,0,0,6.63536l5.74622,3.31768A26.54091,26.54091,0,0,0,291.304,635.28265H450.55254A26.5409,26.5409,0,0,0,477.094,608.74122V502.5755l-92.92155-5.80727a14.12639,14.12639,0,0,1,0-28.19762" transform="translate(-35.5 -118.5)" fill="#3ecc5f" fill-rule="evenodd" /> <path d="M424.01111,635.28265h39.81214V582.19979H424.01111Z" transform="translate(-35.5 -118.5)" fill="#3ecc5f" fill-rule="evenodd" /> <path d="M490.36468,602.10586a6.60242,6.60242,0,0,0-.848.08493c-.05042-.19906-.09821-.39945-.15393-.59852A6.62668,6.62668,0,1,0,482.80568,590.21q-.2203-.22491-.44457-.44589a6.62391,6.62391,0,1,0-11.39689-6.56369c-.1964-.05575-.39414-.10218-.59056-.15262a6.63957,6.63957,0,1,0-13.10086,0c-.1964.05042-.39414.09687-.59056.15262a6.62767,6.62767,0,1,0-11.39688,6.56369,26.52754,26.52754,0,1,0,44.23127,25.52756,6.6211,6.6211,0,1,0,.848-13.18579" transform="translate(-35.5 -118.5)" fill="#44d860" fill-rule="evenodd" /> <path d="M437.28182,555.65836H477.094V529.11693H437.28182Z" transform="translate(-35.5 -118.5)" fill="#3ecc5f" fill-rule="evenodd" /> <path d="M490.36468,545.70532a3.31768,3.31768,0,0,0,0-6.63536,3.41133,3.41133,0,0,0-.42333.04247c-.02655-.09953-.04911-.19907-.077-.29859a3.319,3.319,0,0,0-1.278-6.37923,3.28174,3.28174,0,0,0-2.00122.68742q-.10947-.11346-.22294-.22295a3.282,3.282,0,0,0,.67149-1.98265,3.31768,3.31768,0,0,0-6.37-1.2992,13.27078,13.27078,0,1,0,0,25.54082,3.31768,3.31768,0,0,0,6.37-1.2992,3.282,3.282,0,0,0-.67149-1.98265q.11347-.10947.22294-.22294a3.28174,3.28174,0,0,0,2.00122.68742,3.31768,3.31768,0,0,0,1.278-6.37923c.02786-.0982.05042-.19907.077-.29859a3.41325,3.41325,0,0,0,.42333.04246" transform="translate(-35.5 -118.5)" fill="#44d860" fill-rule="evenodd" /> <path d="M317.84538,466.081a3.31768,3.31768,0,0,1-3.31767-3.31768,9.953,9.953,0,1,0-19.90608,0,3.31768,3.31768,0,1,1-6.63535,0,16.58839,16.58839,0,1,1,33.17678,0,3.31768,3.31768,0,0,1-3.31768,3.31768" transform="translate(-35.5 -118.5)" fill-rule="evenodd" /> <path d="M370.92825,635.28265h79.62429A26.5409,26.5409,0,0,0,477.094,608.74122v-92.895H397.46968a26.54091,26.54091,0,0,0-26.54143,26.54143Z" transform="translate(-35.5 -118.5)" fill="#ffff50" fill-rule="evenodd" /> <path 
d="M457.21444,556.98543H390.80778a1.32707,1.32707,0,0,1,0-2.65414h66.40666a1.32707,1.32707,0,0,1,0,2.65414m0,26.54143H390.80778a1.32707,1.32707,0,1,1,0-2.65414h66.40666a1.32707,1.32707,0,0,1,0,2.65414m0,26.54143H390.80778a1.32707,1.32707,0,1,1,0-2.65414h66.40666a1.32707,1.32707,0,0,1,0,2.65414m0-66.10674H390.80778a1.32707,1.32707,0,0,1,0-2.65414h66.40666a1.32707,1.32707,0,0,1,0,2.65414m0,26.29459H390.80778a1.32707,1.32707,0,0,1,0-2.65414h66.40666a1.32707,1.32707,0,0,1,0,2.65414m0,26.54143H390.80778a1.32707,1.32707,0,0,1,0-2.65414h66.40666a1.32707,1.32707,0,0,1,0,2.65414M477.094,474.19076c-.01592,0-.0292-.008-.04512-.00663-4.10064.13934-6.04083,4.24132-7.75274,7.86024-1.78623,3.78215-3.16771,6.24122-5.43171,6.16691-2.50685-.09024-3.94007-2.92222-5.45825-5.91874-1.74377-3.44243-3.73438-7.34667-7.91333-7.20069-4.04227.138-5.98907,3.70784-7.70631,6.857-1.82738,3.35484-3.07084,5.39455-5.46887,5.30033-2.55727-.09289-3.91619-2.39536-5.48877-5.06013-1.75306-2.96733-3.77951-6.30359-7.8775-6.18946-3.97326.13669-5.92537,3.16507-7.64791,5.83912-1.82207,2.82666-3.09872,4.5492-5.52725,4.447-2.61832-.09289-3.9706-2.00388-5.53522-4.21611-1.757-2.4856-3.737-5.299-7.82308-5.16231-3.88567.13271-5.83779,2.61434-7.559,4.80135-1.635,2.07555-2.9116,3.71846-5.61218,3.615a1.32793,1.32793,0,1,0-.09555,2.65414c4.00377.134,6.03154-2.38873,7.79257-4.6275,1.562-1.9853,2.91027-3.69855,5.56441-3.78879,2.55594-.10882,3.75429,1.47968,5.56707,4.04093,1.7212,2.43385,3.67465,5.19416,7.60545,5.33616,4.11789.138,6.09921-2.93946,7.8536-5.66261,1.56861-2.43385,2.92221-4.53461,5.50734-4.62352,2.37944-.08892,3.67466,1.79154,5.50072,4.885,1.72121,2.91557,3.67069,6.21865,7.67977,6.36463,4.14709.14332,6.14965-3.47693,7.89475-6.68181,1.51155-2.77092,2.93814-5.38791,5.46621-5.4755,2.37944-.05573,3.62025,2.11668,5.45558,5.74622,1.71459,3.388,3.65875,7.22591,7.73019,7.37321l.22429.004c4.06614,0,5.99571-4.08074,7.70364-7.68905,1.51154-3.19825,2.94211-6.21069,5.3972-6.33411Z" transform="translate(-35.5 -118.5)" fill-rule="evenodd" /> <path d="M344.38682,635.28265h53.08286V582.19979H344.38682Z" transform="translate(-35.5 -118.5)" fill="#3ecc5f" fill-rule="evenodd" /> <path d="M424.01111,602.10586a6.60242,6.60242,0,0,0-.848.08493c-.05042-.19906-.09821-.39945-.15394-.59852A6.62667,6.62667,0,1,0,416.45211,590.21q-.2203-.22491-.44458-.44589a6.62391,6.62391,0,1,0-11.39689-6.56369c-.1964-.05575-.39413-.10218-.59054-.15262a6.63957,6.63957,0,1,0-13.10084,0c-.19641.05042-.39414.09687-.59055.15262a6.62767,6.62767,0,1,0-11.39689,6.56369,26.52755,26.52755,0,1,0,44.2313,25.52756,6.6211,6.6211,0,1,0,.848-13.18579" transform="translate(-35.5 -118.5)" fill="#44d860" fill-rule="evenodd" /> <path d="M344.38682,555.65836h53.08286V529.11693H344.38682Z" transform="translate(-35.5 -118.5)" fill="#3ecc5f" fill-rule="evenodd" /> <path d="M410.74039,545.70532a3.31768,3.31768,0,1,0,0-6.63536,3.41133,3.41133,0,0,0-.42333.04247c-.02655-.09953-.04911-.19907-.077-.29859a3.319,3.319,0,0,0-1.278-6.37923,3.28174,3.28174,0,0,0-2.00122.68742q-.10947-.11346-.22294-.22295a3.282,3.282,0,0,0,.67149-1.98265,3.31768,3.31768,0,0,0-6.37-1.2992,13.27078,13.27078,0,1,0,0,25.54082,3.31768,3.31768,0,0,0,6.37-1.2992,3.282,3.282,0,0,0-.67149-1.98265q.11347-.10947.22294-.22294a3.28174,3.28174,0,0,0,2.00122.68742,3.31768,3.31768,0,0,0,1.278-6.37923c.02786-.0982.05042-.19907.077-.29859a3.41325,3.41325,0,0,0,.42333.04246" transform="translate(-35.5 -118.5)" fill="#44d860" fill-rule="evenodd" /> <path 
d="M424.01111,447.8338a3.60349,3.60349,0,0,1-.65028-.06636,3.34415,3.34415,0,0,1-.62372-.18579,3.44679,3.44679,0,0,1-.572-.30522,5.02708,5.02708,0,0,1-.50429-.4114,3.88726,3.88726,0,0,1-.41007-.50428,3.27532,3.27532,0,0,1-.55737-1.84463,3.60248,3.60248,0,0,1,.06636-.65027,3.82638,3.82638,0,0,1,.18447-.62373,3.48858,3.48858,0,0,1,.30656-.57064,3.197,3.197,0,0,1,.91436-.91568,3.44685,3.44685,0,0,1,.572-.30523,3.344,3.344,0,0,1,.62372-.18578,3.06907,3.06907,0,0,1,1.30053,0,3.22332,3.22332,0,0,1,1.19436.491,5.02835,5.02835,0,0,1,.50429.41139,4.8801,4.8801,0,0,1,.41139.50429,3.38246,3.38246,0,0,1,.30522.57064,3.47806,3.47806,0,0,1,.25215,1.274A3.36394,3.36394,0,0,1,426.36,446.865a5.02708,5.02708,0,0,1-.50429.4114,3.3057,3.3057,0,0,1-1.84463.55737m26.54143-1.65884a3.38754,3.38754,0,0,1-2.35024-.96877,5.04185,5.04185,0,0,1-.41007-.50428,3.27532,3.27532,0,0,1-.55737-1.84463,3.38659,3.38659,0,0,1,.96744-2.34892,5.02559,5.02559,0,0,1,.50429-.41139,3.44685,3.44685,0,0,1,.572-.30523,3.3432,3.3432,0,0,1,.62373-.18579,3.06952,3.06952,0,0,1,1.30052,0,3.22356,3.22356,0,0,1,1.19436.491,5.02559,5.02559,0,0,1,.50429.41139,3.38792,3.38792,0,0,1,.96876,2.34892,3.72635,3.72635,0,0,1-.06636.65026,3.37387,3.37387,0,0,1-.18579.62373,4.71469,4.71469,0,0,1-.30522.57064,4.8801,4.8801,0,0,1-.41139.50429,5.02559,5.02559,0,0,1-.50429.41139,3.30547,3.30547,0,0,1-1.84463.55737" transform="translate(-35.5 -118.5)" fill-rule="evenodd" /> </svg> ``` -------------------------------------------------------------------------------- /dspy/teleprompt/bootstrap.py: -------------------------------------------------------------------------------- ```python import logging import random import threading import tqdm import dspy from dspy.teleprompt.teleprompt import Teleprompter from .vanilla import LabeledFewShot # TODO: metrics should return an object with __bool__ basically, but fine if they're more complex. # They can also be sortable. # TODO: Switch here from dspy.dsp.Example to dspy.Example. Right now, it's okay because it's internal only (predictors). # NOTE: Notice the places where we don't shuffle examples. I do like that this one doesn't shuffle. # Other ones that consider options may want to use both unshuffled and then shuffle a few times, when # considering candidates. # TODO: the max_rounds via branch_idx to get past the cache, not just temperature. # In principle, we can also sample multiple outputs from the final generation step # (or even each step, in case the validation function just wants *one* thing that works, but nah) # and try them all. Having a pretty solid guess on the "final step" of each example isn't hard by the second round, # in the sense that we have the trace from the first round. (Yes it may change but that's an edge case that # won't hurt our "best effort" guarantees.) # TODO: When this bootstraps for another teleprompter like finetune, we want all demos we gather. # But when it's for direct use we may want to sample ONE demo per predictor--example pair. # This is important for "multi-use" modules. # TODO: Add baselines=[...] logger = logging.getLogger(__name__) class BootstrapFewShot(Teleprompter): def __init__( self, metric=None, metric_threshold=None, teacher_settings: dict | None = None, max_bootstrapped_demos=4, max_labeled_demos=16, max_rounds=1, max_errors=None, ): """A Teleprompter class that composes a set of demos/examples to go into a predictor's prompt. These demos come from a combination of labeled examples in the training set, and bootstrapped demos. 
Each bootstrap round copies the LM with a new ``rollout_id`` at ``temperature=1.0`` to bypass caches and gather diverse traces. Args: metric (Callable): A function that compares an expected value and predicted value, outputting the result of that comparison. metric_threshold (float, optional): If the metric yields a numerical value, then check it against this threshold when deciding whether or not to accept a bootstrap example. Defaults to None. teacher_settings (dict, optional): Settings for the `teacher` model. Defaults to None. max_bootstrapped_demos (int): Maximum number of bootstrapped demonstrations to include. Defaults to 4. max_labeled_demos (int): Maximum number of labeled demonstrations to include. Defaults to 16. max_rounds (int): Number of iterations to attempt generating the required bootstrap examples. If unsuccessful after `max_rounds`, the program ends. Defaults to 1. max_errors (Optional[int]): Maximum number of errors until program ends. If ``None``, inherits from ``dspy.settings.max_errors``. """ self.metric = metric self.metric_threshold = metric_threshold self.teacher_settings = {} if teacher_settings is None else teacher_settings self.max_bootstrapped_demos = max_bootstrapped_demos self.max_labeled_demos = max_labeled_demos self.max_rounds = max_rounds self.max_errors = max_errors self.error_count = 0 self.error_lock = threading.Lock() def compile(self, student, *, teacher=None, trainset): self.trainset = trainset self._prepare_student_and_teacher(student, teacher) self._prepare_predictor_mappings() self._bootstrap() self.student = self._train() self.student._compiled = True return self.student def _prepare_student_and_teacher(self, student, teacher): self.student = student.reset_copy() # NOTE: behavior change on Oct 28, 2024. Deep copy instead of reset copy for the student-as-teacher. self.teacher = teacher.deepcopy() if teacher is not None else student.deepcopy() assert getattr(self.student, "_compiled", False) is False, "Student must be uncompiled." if self.max_labeled_demos and getattr(self.teacher, "_compiled", False) is False: teleprompter = LabeledFewShot(k=self.max_labeled_demos) self.teacher = teleprompter.compile(self.teacher.reset_copy(), trainset=self.trainset) def _prepare_predictor_mappings(self): name2predictor, predictor2name = {}, {} student, teacher = self.student, self.teacher assert len(student.predictors()) == len( teacher.predictors(), ), "Student and teacher must have the same number of predictors." for (name1, predictor1), (name2, predictor2) in zip( student.named_predictors(), teacher.named_predictors(), strict=False ): assert name1 == name2, "Student and teacher must have the same program structure." if hasattr(predictor1.signature, "equals"): assert predictor1.signature.equals( predictor2.signature, ), ( f"Student and teacher must have the same signatures. " f"{type(predictor1.signature)} != {type(predictor2.signature)}" ) else: # fallback in case if .equals is not implemented (e.g. dsp.Prompt) assert predictor1.signature == predictor2.signature, ( f"Student and teacher must have the same signatures. " f"{type(predictor1.signature)} != {type(predictor2.signature)}" ) assert id(predictor1) != id(predictor2), "Student and teacher must be different objects." 
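            # Map each predictor's object id (for both the student's and the teacher's copy)
            # to its shared name, so trace steps gathered during bootstrapping can be
            # attributed back to the correct student predictor.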
name2predictor[name1] = None # dict(student=predictor1, teacher=predictor2) predictor2name[id(predictor1)] = name1 # FIXME(shangyint): This is an ugly hack to bind traces of # retry.module to retry # if isinstance(predictor1, Retry): # predictor2name[id(predictor1.module)] = name1 predictor2name[id(predictor2)] = name2 self.name2predictor = name2predictor self.predictor2name = predictor2name def _bootstrap(self, *, max_bootstraps=None): max_bootstraps = max_bootstraps or self.max_bootstrapped_demos bootstrap_attempts = 0 bootstrapped = {} self.name2traces = {name: [] for name in self.name2predictor} for example_idx, example in enumerate(tqdm.tqdm(self.trainset)): if len(bootstrapped) >= max_bootstraps: break for round_idx in range(self.max_rounds): bootstrap_attempts += 1 if self._bootstrap_one_example(example, round_idx): bootstrapped[example_idx] = True break print( f"Bootstrapped {len(bootstrapped)} full traces after {example_idx} examples " f"for up to {self.max_rounds} rounds, amounting to {bootstrap_attempts} attempts." ) # Unbootstrapped training examples self.validation = [x for idx, x in enumerate(self.trainset) if idx not in bootstrapped] random.Random(0).shuffle(self.validation) self.validation = self.validation # NOTE: Can't yet use evaluate because we need to trace *per example* # evaluate = Evaluate(program=self.teacher, metric=self.metric, num_threads=12) # score = evaluate(self.metric, display_table=False, display_progress=True) def _bootstrap_one_example(self, example, round_idx=0): name2traces = {} teacher = self.teacher predictor_cache = {} try: with dspy.settings.context(trace=[], **self.teacher_settings): lm = dspy.settings.lm # Use a fresh rollout with temperature=1.0 to bypass caches. lm = lm.copy(rollout_id=round_idx, temperature=1.0) if round_idx > 0 else lm new_settings = {"lm": lm} if round_idx > 0 else {} with dspy.settings.context(**new_settings): for name, predictor in teacher.named_predictors(): predictor_cache[name] = predictor.demos predictor.demos = [x for x in predictor.demos if x != example] prediction = teacher(**example.inputs()) trace = dspy.settings.trace for name, predictor in teacher.named_predictors(): predictor.demos = predictor_cache[name] if self.metric: metric_val = self.metric(example, prediction, trace) if self.metric_threshold: success = metric_val >= self.metric_threshold else: success = metric_val else: success = True except Exception as e: success = False with self.error_lock: self.error_count += 1 current_error_count = self.error_count effective_max_errors = self.max_errors if self.max_errors is not None else dspy.settings.max_errors if current_error_count >= effective_max_errors: raise e logger.error(f"Failed to run or to evaluate example {example} with {self.metric} due to {e}.") if success: for step in trace: predictor, inputs, outputs = step demo = dspy.Example(augmented=True, **inputs, **outputs) try: predictor_name = self.predictor2name[id(predictor)] except KeyError: continue # FIXME: ! # # TODO: Look closer into this. It's a bit tricky to reproduce. # print(f"Failed to find predictor {predictor} in {self.predictor2name}.") # print( # "Are you doing this in a notebook (Jupyter)? 
This might be caused by redefining values by rerunning cells.", # ) # print("Try restarting the notebook, or open an issue.") # raise KeyError( # f"Failed to find predictor {id(predictor)} {predictor} in {self.predictor2name}.", # ) from e name2traces[predictor_name] = name2traces.get(predictor_name, []) name2traces[predictor_name].append(demo) # Update the traces for name, demos in name2traces.items(): # If there are multiple traces for the same predictor in the sample example, # sample 50/50 from the first N-1 traces or the last trace. if len(demos) > 1: from dspy.utils.hasher import Hasher rng = random.Random(Hasher.hash(tuple(demos))) demos = [rng.choice(demos[:-1]) if rng.random() < 0.5 else demos[-1]] self.name2traces[name].extend(demos) return success def _train(self): rng = random.Random(0) raw_demos = self.validation for name, predictor in self.student.named_predictors(): augmented_demos = self.name2traces[name][: self.max_bootstrapped_demos] sample_size = min(self.max_labeled_demos - len(augmented_demos), len(raw_demos)) sample_size = max(0, sample_size) raw_demos = rng.sample(raw_demos, sample_size) predictor.demos = augmented_demos + raw_demos return self.student ``` -------------------------------------------------------------------------------- /docs/docs/tutorials/email_extraction/index.md: -------------------------------------------------------------------------------- ```markdown # Extracting Information from Emails with DSPy This tutorial demonstrates how to build an intelligent email processing system using DSPy. We'll create a system that can automatically extract key information from various types of emails, classify their intent, and structure the data for further processing. ## What You'll Build By the end of this tutorial, you'll have a DSPy-powered email processing system that can: - **Classify email types** (order confirmation, support request, meeting invitation, etc.) - **Extract key entities** (dates, amounts, product names, contact info) - **Determine urgency levels** and required actions - **Structure extracted data** into consistent formats - **Handle multiple email formats** robustly ## Prerequisites - Basic understanding of DSPy modules and signatures - Python 3.9+ installed - OpenAI API key (or access to another supported LLM) ## Installation and Setup ```bash pip install dspy ``` <details> <summary>Recommended: Set up MLflow Tracing to understand what's happening under the hood.</summary> ### MLflow DSPy Integration <a href="https://mlflow.org/">MLflow</a> is an LLMOps tool that natively integrates with DSPy and offer explainability and experiment tracking. In this tutorial, you can use MLflow to visualize prompts and optimization progress as traces to understand the DSPy's behavior better. You can set up MLflow easily by following the four steps below.  1. Install MLflow ```bash %pip install mlflow>=3.0.0 ``` 2. Start MLflow UI in a separate terminal ```bash mlflow ui --port 5000 --backend-store-uri sqlite:///mlruns.db ``` 3. Connect the notebook to MLflow ```python import mlflow mlflow.set_tracking_uri("http://localhost:5000") mlflow.set_experiment("DSPy") ``` 4. Enabling tracing. ```python mlflow.dspy.autolog() ``` To learn more about the integration, visit [MLflow DSPy Documentation](https://mlflow.org/docs/latest/llms/dspy/index.html) as well. 
</details> ## Step 1: Define Our Data Structures First, let's define the types of information we want to extract from emails: ```python import dspy from typing import List, Optional, Literal from datetime import datetime from pydantic import BaseModel from enum import Enum class EmailType(str, Enum): ORDER_CONFIRMATION = "order_confirmation" SUPPORT_REQUEST = "support_request" MEETING_INVITATION = "meeting_invitation" NEWSLETTER = "newsletter" PROMOTIONAL = "promotional" INVOICE = "invoice" SHIPPING_NOTIFICATION = "shipping_notification" OTHER = "other" class UrgencyLevel(str, Enum): LOW = "low" MEDIUM = "medium" HIGH = "high" CRITICAL = "critical" class ExtractedEntity(BaseModel): entity_type: str value: str confidence: float ``` ## Step 2: Create DSPy Signatures Now let's define the signatures for our email processing pipeline: ```python class ClassifyEmail(dspy.Signature): """Classify the type and urgency of an email based on its content.""" email_subject: str = dspy.InputField(desc="The subject line of the email") email_body: str = dspy.InputField(desc="The main content of the email") sender: str = dspy.InputField(desc="Email sender information") email_type: EmailType = dspy.OutputField(desc="The classified type of email") urgency: UrgencyLevel = dspy.OutputField(desc="The urgency level of the email") reasoning: str = dspy.OutputField(desc="Brief explanation of the classification") class ExtractEntities(dspy.Signature): """Extract key entities and information from email content.""" email_content: str = dspy.InputField(desc="The full email content including subject and body") email_type: EmailType = dspy.InputField(desc="The classified type of email") key_entities: list[ExtractedEntity] = dspy.OutputField(desc="List of extracted entities with type, value, and confidence") financial_amount: Optional[float] = dspy.OutputField(desc="Any monetary amounts found (e.g., '$99.99')") important_dates: list[str] = dspy.OutputField(desc="List of important dates found in the email") contact_info: list[str] = dspy.OutputField(desc="Relevant contact information extracted") class GenerateActionItems(dspy.Signature): """Determine what actions are needed based on the email content and extracted information.""" email_type: EmailType = dspy.InputField() urgency: UrgencyLevel = dspy.InputField() email_summary: str = dspy.InputField(desc="Brief summary of the email content") extracted_entities: list[ExtractedEntity] = dspy.InputField(desc="Key entities found in the email") action_required: bool = dspy.OutputField(desc="Whether any action is required") action_items: list[str] = dspy.OutputField(desc="List of specific actions needed") deadline: Optional[str] = dspy.OutputField(desc="Deadline for action if applicable") priority_score: int = dspy.OutputField(desc="Priority score from 1-10") class SummarizeEmail(dspy.Signature): """Create a concise summary of the email content.""" email_subject: str = dspy.InputField() email_body: str = dspy.InputField() key_entities: list[ExtractedEntity] = dspy.InputField() summary: str = dspy.OutputField(desc="A 2-3 sentence summary of the email's main points") ``` ## Step 3: Build the Email Processing Module Now let's create our main email processing module: ```python class EmailProcessor(dspy.Module): """A comprehensive email processing system using DSPy.""" def __init__(self): super().__init__() # Initialize our processing components self.classifier = dspy.ChainOfThought(ClassifyEmail) self.entity_extractor = dspy.ChainOfThought(ExtractEntities) self.action_generator = 
dspy.ChainOfThought(GenerateActionItems) self.summarizer = dspy.ChainOfThought(SummarizeEmail) def forward(self, email_subject: str, email_body: str, sender: str = ""): """Process an email and extract structured information.""" # Step 1: Classify the email classification = self.classifier( email_subject=email_subject, email_body=email_body, sender=sender ) # Step 2: Extract entities full_content = f"Subject: {email_subject}\n\nFrom: {sender}\n\n{email_body}" entities = self.entity_extractor( email_content=full_content, email_type=classification.email_type ) # Step 3: Generate summary summary = self.summarizer( email_subject=email_subject, email_body=email_body, key_entities=entities.key_entities ) # Step 4: Determine actions actions = self.action_generator( email_type=classification.email_type, urgency=classification.urgency, email_summary=summary.summary, extracted_entities=entities.key_entities ) # Step 5: Structure the results return dspy.Prediction( email_type=classification.email_type, urgency=classification.urgency, summary=summary.summary, key_entities=entities.key_entities, financial_amount=entities.financial_amount, important_dates=entities.important_dates, action_required=actions.action_required, action_items=actions.action_items, deadline=actions.deadline, priority_score=actions.priority_score, reasoning=classification.reasoning, contact_info=entities.contact_info ) ``` ## Step 4: Running the Email Processing System Let's create a simple function to test our email processing system: ```python import os def run_email_processing_demo(): """Demonstration of the email processing system.""" # Configure DSPy lm = dspy.LM(model='openai/gpt-4o-mini') dspy.configure(lm=lm) os.environ["OPENAI_API_KEY"] = "<YOUR OPENAI KEY>" # Create our email processor processor = EmailProcessor() # Sample emails for testing sample_emails = [ { "subject": "Order Confirmation #12345 - Your MacBook Pro is on the way!", "body": """Dear John Smith, Thank you for your order! We're excited to confirm that your order #12345 has been processed. Order Details: - MacBook Pro 14-inch (Space Gray) - Order Total: $2,399.00 - Estimated Delivery: December 15, 2024 - Tracking Number: 1Z999AA1234567890 If you have any questions, please contact our support team at [email protected]. Best regards, TechStore Team""", "sender": "[email protected]" }, { "subject": "URGENT: Server Outage - Immediate Action Required", "body": """Hi DevOps Team, We're experiencing a critical server outage affecting our production environment. Impact: All users unable to access the platform Started: 2:30 PM EST Please join the emergency call immediately: +1-555-123-4567 This is our highest priority. Thanks, Site Reliability Team""", "sender": "[email protected]" }, { "subject": "Meeting Invitation: Q4 Planning Session", "body": """Hello team, You're invited to our Q4 planning session. When: Friday, December 20, 2024 at 2:00 PM - 4:00 PM EST Where: Conference Room A Please confirm your attendance by December 18th. 
Best, Sarah Johnson""", "sender": "[email protected]" } ] # Process each email and display results print("🚀 Email Processing Demo") print("=" * 50) for i, email in enumerate(sample_emails): print(f"\n📧 EMAIL {i+1}: {email['subject'][:50]}...") # Process the email result = processor( email_subject=email["subject"], email_body=email["body"], sender=email["sender"] ) # Display key results print(f" 📊 Type: {result.email_type}") print(f" 🚨 Urgency: {result.urgency}") print(f" 📝 Summary: {result.summary}") if result.financial_amount: print(f" 💰 Amount: ${result.financial_amount:,.2f}") if result.action_required: print(f" ✅ Action Required: Yes") if result.deadline: print(f" ⏰ Deadline: {result.deadline}") else: print(f" ✅ Action Required: No") # Run the demo if __name__ == "__main__": run_email_processing_demo() ``` ## Expected Output ``` 🚀 Email Processing Demo ================================================== 📧 EMAIL 1: Order Confirmation #12345 - Your MacBook Pro is on... 📊 Type: order_confirmation 🚨 Urgency: low 📝 Summary: The email confirms John Smith's order #12345 for a MacBook Pro 14-inch in Space Gray, totaling $2,399.00, with an estimated delivery date of December 15, 2024. It includes a tracking number and contact information for customer support. 💰 Amount: $2,399.00 ✅ Action Required: No 📧 EMAIL 2: URGENT: Server Outage - Immediate Action Required... 📊 Type: other 🚨 Urgency: critical 📝 Summary: The Site Reliability Team has reported a critical server outage that began at 2:30 PM EST, preventing all users from accessing the platform. They have requested the DevOps Team to join an emergency call immediately to address the issue. ✅ Action Required: Yes ⏰ Deadline: Immediately 📧 EMAIL 3: Meeting Invitation: Q4 Planning Session... 📊 Type: meeting_invitation 🚨 Urgency: medium 📝 Summary: Sarah Johnson has invited the team to a Q4 planning session on December 20, 2024, from 2:00 PM to 4:00 PM EST in Conference Room A. Attendees are asked to confirm their participation by December 18th. ✅ Action Required: Yes ⏰ Deadline: December 18th ``` ## Next Steps - **Add more email types** and refine classification (newsletter, promotional, etc.) - **Add integration** with email providers (Gmail API, Outlook, IMAP) - **Experiment with different LLMs** and optimization strategies - **Add multilingual support** for international email processing - **Optimization** for increasing the performance of your program ``` -------------------------------------------------------------------------------- /docs/docs/faqs.md: -------------------------------------------------------------------------------- ```markdown --- sidebar_position: 998 --- !!! warning "This page is outdated and may not be fully accurate in DSPy 2.5 and 2.6" # FAQs ## Is DSPy right for me? DSPy vs. other frameworks The **DSPy** philosophy and abstraction differ significantly from other libraries and frameworks, so it's usually straightforward to decide when **DSPy** is (or isn't) the right framework for your usecase. If you're a NLP/AI researcher (or a practitioner exploring new pipelines or new tasks), the answer is generally an invariable **yes**. If you're a practitioner doing other things, please read on. **DSPy vs. thin wrappers for prompts (OpenAI API, MiniChain, basic templating)** In other words: _Why can't I just write my prompts directly as string templates?_ Well, for extremely simple settings, this _might_ work just fine. (If you're familiar with neural networks, this is like expressing a tiny two-layer NN as a Python for-loop. 
It kinda works.) However, when you need higher quality (or manageable cost), then you need to iteratively explore multi-stage decomposition, improved prompting, data bootstrapping, careful finetuning, retrieval augmentation, and/or using smaller (or cheaper, or local) models. The true expressive power of building with foundation models lies in the interactions between these pieces. But every time you change one piece, you likely break (or weaken) multiple other components.

**DSPy** cleanly abstracts away (_and_ powerfully optimizes) the parts of these interactions that are external to your actual system design. It lets you focus on designing the module-level interactions: the _same program_ expressed in 10 or 20 lines of **DSPy** can easily be compiled into multi-stage instructions for `GPT-4`, detailed prompts for `Llama2-13b`, or finetunes for `T5-base`. Oh, and you wouldn't need to maintain long, brittle, model-specific strings at the core of your project anymore.

**DSPy vs. application development libraries like LangChain, LlamaIndex**

LangChain and LlamaIndex target high-level application development; they offer _batteries-included_, pre-built application modules that plug in with your data or configuration. If you'd be happy to use a generic, off-the-shelf prompt for question answering over PDFs or standard text-to-SQL, you will find a rich ecosystem in these libraries.

**DSPy** doesn't internally contain hand-crafted prompts that target specific applications. Instead, **DSPy** introduces a small set of much more powerful and general-purpose modules _that can learn to prompt (or finetune) your LM within your pipeline on your data_. When you change your data, make tweaks to your program's control flow, or change your target LM, the **DSPy compiler** can map your program into a new set of prompts (or finetunes) that are optimized specifically for this pipeline.

Because of this, you may find that **DSPy** obtains the highest quality for your task, with the least effort, provided you're willing to implement (or extend) your own short program. In short, **DSPy** is for when you need a lightweight but automatically-optimizing programming model — not a library of predefined prompts and integrations.

If you're familiar with neural networks: This is like the difference between PyTorch (i.e., representing **DSPy**) and HuggingFace Transformers (i.e., representing the higher-level libraries).

**DSPy vs. generation control libraries like Guidance, LMQL, RELM, Outlines**

These are all exciting new libraries for controlling the individual completions of LMs, e.g., if you want to enforce JSON output schema or constrain sampling to a particular regular expression. This is very useful in many settings, but it's generally focused on low-level, structured control of a single LM call. It doesn't help ensure the JSON (or structured output) you get is going to be correct or useful for your task.

In contrast, **DSPy** automatically optimizes the prompts in your programs to align them with various task needs, which may also include producing valid structured outputs. That said, we are considering allowing **Signatures** in **DSPy** to express regex-like constraints that are implemented by these libraries.

## Basic Usage

**How should I use DSPy for my task?** We wrote an [eight-step guide](learn/index.md) on this. In short, using DSPy is an iterative process.
You first define your task and the metrics you want to maximize, and prepare a few example inputs — typically without labels (or only with labels for the final outputs, if your metric requires them). Then, you build your pipeline by selecting built-in layers (`modules`) to use, giving each layer a `signature` (input/output spec), and then calling your modules freely in your Python code. Lastly, you use a DSPy `optimizer` to compile your code into high-quality instructions, automatic few-shot examples, or updated LM weights for your LM.

**How do I convert my complex prompt into a DSPy pipeline?** See the same answer above.

**What do DSPy optimizers tune?** Or, _what does compiling actually do?_ Each optimizer is different, but they all seek to maximize a metric on your program by updating prompts or LM weights. Current DSPy `optimizers` can inspect your data, simulate traces through your program to generate good/bad examples of each step, propose or refine instructions for each step based on past results, finetune the weights of your LM on self-generated examples, or combine several of these to improve quality or cut cost. We'd love to merge new optimizers that explore a richer space: most manual steps you currently go through for prompt engineering, "synthetic data" generation, or self-improvement can probably be generalized into a DSPy optimizer that acts on arbitrary LM programs.

Other FAQs. We welcome PRs to add formal answers to each of these here. You will find the answer in existing issues, tutorials, or the papers for all or most of these.

- **How do I get multiple outputs?** You can specify multiple output fields. For the short-form signature, you can list multiple outputs as comma separated values, following the "->" indicator (e.g. "inputs -> output1, output2"). For the long-form signature, you can include multiple `dspy.OutputField`s.

- **How do I define my own metrics? Can metrics return a float?** You can define metrics simply as Python functions that process model generations and evaluate them based on user-defined requirements. Metrics can compare existing data (e.g. gold labels) to model predictions, or they can be used to assess various components of an output using validation feedback from LMs (e.g. LLMs-as-Judges). Metrics can return `bool`, `int`, and `float` scores. Check out the official [Metrics docs](learn/evaluation/metrics.md) to learn more about defining custom metrics and advanced evaluations using AI feedback and/or DSPy programs. A minimal metric sketch appears after this list.

- **How expensive or slow is compiling?** To reflect compiling metrics, we highlight an experiment for reference, compiling a program using the [BootstrapFewShotWithRandomSearch](api/optimizers/BootstrapFewShotWithRandomSearch.md) optimizer on the `gpt-3.5-turbo-1106` model over 7 candidate programs and 10 threads. We report that compiling this program takes around 6 minutes with 3200 API calls, 2.7 million input tokens and 156,000 output tokens, for a total cost of $3 USD (at the current pricing of the OpenAI model). Compiling with DSPy `optimizers` will naturally incur additional LM calls, but we substantiate this overhead with minimalistic executions with the goal of maximizing performance. This invites avenues to enhance performance of smaller models by compiling DSPy programs with larger models to learn enhanced behavior during compile-time and propagate such behavior to the tested smaller model during inference-time.
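To make the metric answer above concrete, here is a minimal sketch. The signature, the example data, and the model name are illustrative assumptions rather than prescribed choices; the key point is that a metric is just a Python function taking `(example, prediction, trace=None)` and returning a `bool`, `int`, or `float`.

```python
import dspy

# Assumed model; any LM supported by dspy.LM works here.
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

def validate_answer(example, prediction, trace=None):
    # Boolean metric; optimizers also pass the program trace during bootstrapping.
    return example.answer.lower() == prediction.answer.strip().lower()

# Hypothetical one-example trainset and program for illustration.
qa = dspy.ChainOfThought("question -> answer")
trainset = [dspy.Example(question="What is 2 + 2?", answer="4").with_inputs("question")]

optimizer = dspy.BootstrapFewShot(metric=validate_answer, max_bootstrapped_demos=4)
compiled_qa = optimizer.compile(qa, trainset=trainset)
```

The same function can also be passed to `dspy.Evaluate` to score a program over a dev set.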
## Deployment or Reproducibility Concerns

- **How do I save a checkpoint of my compiled program?** Here is an example of saving/loading a compiled module:

```python
cot_compiled = teleprompter.compile(CoT(), trainset=trainset, valset=devset)

# Saving
cot_compiled.save('compiled_cot_gsm8k.json')

# Loading:
cot = CoT()
cot.load('compiled_cot_gsm8k.json')
```

- **How do I export for deployment?** Exporting DSPy programs is simply saving them as highlighted above!

- **How do I search my own data?** Open source libraries such as [RAGatouille](https://github.com/bclavie/ragatouille) enable you to search your own data through advanced retrieval models like ColBERT, with tools to embed and index documents. Feel free to integrate such libraries to create searchable datasets while developing your DSPy programs!

- **How do I turn off the cache? How do I export the cache?** From v2.5, you can turn off the cache by setting the `cache` parameter in `dspy.LM` to `False`:

```python
dspy.LM('openai/gpt-4o-mini', cache=False)
```

Your local cache will be saved to the global env directory `os.environ["DSP_CACHEDIR"]` or, for notebooks, `os.environ["DSP_NOTEBOOK_CACHEDIR"]`. You can usually set the cachedir to `os.path.join(repo_path, 'cache')` and export this cache from here:

```python
os.environ["DSP_NOTEBOOK_CACHEDIR"] = os.path.join(os.getcwd(), 'cache')
```

!!! warning "Important"
    `DSP_CACHEDIR` is responsible for old clients (including dspy.OpenAI, dspy.ColBERTv2, etc.) and `DSPY_CACHEDIR` is responsible for the new dspy.LM client. In AWS Lambda deployments, you should disable both DSP_\* and DSPY_\*.

## Advanced Usage

- **How do I parallelize?** You can parallelize DSPy programs during both compilation and evaluation by specifying multiple thread settings in the respective DSPy `optimizers` or within the `dspy.Evaluate` utility function. A short sketch of parallel evaluation appears after this list.

- **How do I freeze a module?** Modules can be frozen by setting their `._compiled` attribute to `True`, indicating the module has gone through optimizer compilation and should not have its parameters adjusted. This is handled internally in optimizers such as `dspy.BootstrapFewShot`, where the student program is ensured to be frozen before the teacher propagates the gathered few-shot demonstrations in the bootstrapping process.

- **How do I use DSPy assertions?**

    a) **How to Add Assertions to Your Program**:

    - **Define Constraints**: Use `dspy.Assert` and/or `dspy.Suggest` to define constraints within your DSPy program. These are based on boolean validation checks for the outcomes you want to enforce, which can simply be Python functions to validate the model outputs.
    - **Integrating Assertions**: Keep your assertion statements following a model generation (hint: following a module layer).

    b) **How to Activate the Assertions**:

    1. **Using `assert_transform_module`**: Wrap your DSPy module with assertions using the `assert_transform_module` function, along with a `backtrack_handler`. This function transforms your program to include internal assertion backtracking and retry logic, which can be customized as well: `program_with_assertions = assert_transform_module(ProgramWithAssertions(), backtrack_handler)`
    2. **Activate Assertions**: Directly call `activate_assertions` on your DSPy program with assertions: `program_with_assertions = ProgramWithAssertions().activate_assertions()`

    **Note**: To use Assertions properly, you must **activate** a DSPy program that includes `dspy.Assert` or `dspy.Suggest` statements from either of the methods above.
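Illustrating the parallelization answer above, here is a minimal sketch of multi-threaded evaluation. The program, metric, dev set, and model name are assumptions made for the example; the point is `dspy.Evaluate` and its `num_threads` argument.

```python
import dspy

# Assumed model; substitute any configured LM.
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

program = dspy.ChainOfThought("question -> answer")

devset = [
    dspy.Example(question="What color is the clear daytime sky?", answer="blue").with_inputs("question"),
    dspy.Example(question="How many legs does a spider have?", answer="8").with_inputs("question"),
]

def contains_answer(example, prediction, trace=None):
    return example.answer.lower() in prediction.answer.lower()

# num_threads runs dev set examples concurrently; optimizers expose similar thread settings.
evaluator = dspy.Evaluate(devset=devset, metric=contains_answer, num_threads=8, display_progress=True)
score = evaluator(program)  # aggregate metric over the dev set (return type varies slightly across DSPy versions)
```

DSPy also ships simple built-in metrics, such as `dspy.evaluate.answer_exact_match`, that follow the same calling convention.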
## Errors - **How do I deal with "context too long" errors?** If you're dealing with "context too long" errors in DSPy, you're likely using DSPy optimizers to include demonstrations within your prompt, and this is exceeding your current context window. Try reducing these parameters (e.g. `max_bootstrapped_demos` and `max_labeled_demos`). You can also reduce the number of retrieved passages/docs/embeddings to ensure your prompt fits within your model's context length. A more general fix is simply increasing the `max_tokens` specified in the LM request (e.g. `lm = dspy.OpenAI(model=..., max_tokens=...)`). ## Set Verbose Level DSPy utilizes the [logging library](https://docs.python.org/3/library/logging.html) to print logs. If you want to debug your DSPy code, set the logging level to DEBUG with the example code below. ```python import logging logging.getLogger("dspy").setLevel(logging.DEBUG) ``` Alternatively, if you want to reduce the amount of logs, set the logging level to WARNING or ERROR. ```python import logging logging.getLogger("dspy").setLevel(logging.WARNING) ``` ``` -------------------------------------------------------------------------------- /docs/docs/learn/programming/modules.md: -------------------------------------------------------------------------------- ```markdown --- sidebar_position: 3 --- # Modules A **DSPy module** is a building block for programs that use LMs. - Each built-in module abstracts a **prompting technique** (like chain of thought or ReAct). Crucially, they are generalized to handle any signature. - A DSPy module has **learnable parameters** (i.e., the little pieces comprising the prompt and the LM weights) and can be invoked (called) to process inputs and return outputs. - Multiple modules can be composed into bigger modules (programs). DSPy modules are inspired directly by NN modules in PyTorch, but applied to LM programs. ## How do I use a built-in module, like `dspy.Predict` or `dspy.ChainOfThought`? Let's start with the most fundamental module, `dspy.Predict`. Internally, all other DSPy modules are built using `dspy.Predict`. We'll assume you are already at least a little familiar with [DSPy signatures](signatures.md), which are declarative specs for defining the behavior of any module we use in DSPy. To use a module, we first **declare** it by giving it a signature. Then we **call** the module with the input arguments, and extract the output fields! ```python sentence = "it's a charming and often affecting journey." # example from the SST-2 dataset. # 1) Declare with a signature. classify = dspy.Predict('sentence -> sentiment: bool') # 2) Call with input argument(s). response = classify(sentence=sentence) # 3) Access the output. print(response.sentiment) ``` **Output:** ```text True ``` When we declare a module, we can pass configuration keys to it. Below, we'll pass `n=5` to request five completions. We can also pass `temperature` or `max_len`, etc. Let's use `dspy.ChainOfThought`. In many cases, simply swapping `dspy.ChainOfThought` in place of `dspy.Predict` improves quality. ```python question = "What's something great about the ColBERT retrieval model?" # 1) Declare with a signature, and pass some config. classify = dspy.ChainOfThought('question -> answer', n=5) # 2) Call with input argument. response = classify(question=question) # 3) Access the outputs.
response.completions.answer ``` **Possible Output:** ```text ['One great thing about the ColBERT retrieval model is its superior efficiency and effectiveness compared to other models.', 'Its ability to efficiently retrieve relevant information from large document collections.', 'One great thing about the ColBERT retrieval model is its superior performance compared to other models and its efficient use of pre-trained language models.', 'One great thing about the ColBERT retrieval model is its superior efficiency and accuracy compared to other models.', 'One great thing about the ColBERT retrieval model is its ability to incorporate user feedback and support complex queries.'] ``` Let's discuss the output object here. The `dspy.ChainOfThought` module will generally inject a `reasoning` before the output field(s) of your signature. Let's inspect the (first) reasoning and answer! ```python print(f"Reasoning: {response.reasoning}") print(f"Answer: {response.answer}") ``` **Possible Output:** ```text Reasoning: We can consider the fact that ColBERT has shown to outperform other state-of-the-art retrieval models in terms of efficiency and effectiveness. It uses contextualized embeddings and performs document retrieval in a way that is both accurate and scalable. Answer: One great thing about the ColBERT retrieval model is its superior efficiency and effectiveness compared to other models. ``` This is accessible whether we request one or many completions. We can also access the different completions as a list of `Prediction`s or as several lists, one for each field. ```python response.completions[3].reasoning == response.completions.reasoning[3] ``` **Output:** ```text True ``` ## What other DSPy modules are there? How can I use them? The others are very similar. They mainly change the internal behavior with which your signature is implemented! 1. **`dspy.Predict`**: Basic predictor. Does not modify the signature. Handles the key forms of learning (i.e., storing the instructions and demonstrations and updates to the LM). 2. **`dspy.ChainOfThought`**: Teaches the LM to think step-by-step before committing to the signature's response. 3. **`dspy.ProgramOfThought`**: Teaches the LM to output code, whose execution results will dictate the response. 4. **`dspy.ReAct`**: An agent that can use tools to implement the given signature. 5. **`dspy.MultiChainComparison`**: Can compare multiple outputs from `ChainOfThought` to produce a final prediction. We also have some function-style modules: 6. **`dspy.majority`**: Can do basic voting to return the most popular response from a set of predictions. !!! info "A few examples of DSPy modules on simple tasks." Try the examples below after configuring your `lm`. Adjust the fields to explore what tasks your LM can do well out of the box. === "Math" ```python linenums="1" math = dspy.ChainOfThought("question -> answer: float") math(question="Two dice are tossed. What is the probability that the sum equals two?") ``` **Possible Output:** ```text Prediction( reasoning='When two dice are tossed, each die has 6 faces, resulting in a total of 6 x 6 = 36 possible outcomes. The sum of the numbers on the two dice equals two only when both dice show a 1. This is just one specific outcome: (1, 1). Therefore, there is only 1 favorable outcome. 
The probability of the sum being two is the number of favorable outcomes divided by the total number of possible outcomes, which is 1/36.', answer=0.0277776 ) ``` === "Retrieval-Augmented Generation" ```python linenums="1" def search(query: str) -> list[str]: """Retrieves abstracts from Wikipedia.""" results = dspy.ColBERTv2(url='http://20.102.90.50:2017/wiki17_abstracts')(query, k=3) return [x['text'] for x in results] rag = dspy.ChainOfThought('context, question -> response') question = "What's the name of the castle that David Gregory inherited?" rag(context=search(question), question=question) ``` **Possible Output:** ```text Prediction( reasoning='The context provides information about David Gregory, a Scottish physician and inventor. It specifically mentions that he inherited Kinnairdy Castle in 1664. This detail directly answers the question about the name of the castle that David Gregory inherited.', response='Kinnairdy Castle' ) ``` === "Classification" ```python linenums="1" from typing import Literal class Classify(dspy.Signature): """Classify sentiment of a given sentence.""" sentence: str = dspy.InputField() sentiment: Literal['positive', 'negative', 'neutral'] = dspy.OutputField() confidence: float = dspy.OutputField() classify = dspy.Predict(Classify) classify(sentence="This book was super fun to read, though not the last chapter.") ``` **Possible Output:** ```text Prediction( sentiment='positive', confidence=0.75 ) ``` === "Information Extraction" ```python linenums="1" text = "Apple Inc. announced its latest iPhone 14 today. The CEO, Tim Cook, highlighted its new features in a press release." module = dspy.Predict("text -> title, headings: list[str], entities_and_metadata: list[dict[str, str]]") response = module(text=text) print(response.title) print(response.headings) print(response.entities_and_metadata) ``` **Possible Output:** ```text Apple Unveils iPhone 14 ['Introduction', 'Key Features', "CEO's Statement"] [{'entity': 'Apple Inc.', 'type': 'Organization'}, {'entity': 'iPhone 14', 'type': 'Product'}, {'entity': 'Tim Cook', 'type': 'Person'}] ``` === "Agents" ```python linenums="1" def evaluate_math(expression: str) -> float: return dspy.PythonInterpreter({}).execute(expression) def search_wikipedia(query: str) -> list[str]: results = dspy.ColBERTv2(url='http://20.102.90.50:2017/wiki17_abstracts')(query, k=3) return [x['text'] for x in results] react = dspy.ReAct("question -> answer: float", tools=[evaluate_math, search_wikipedia]) pred = react(question="What is 9362158 divided by the year of birth of David Gregory of Kinnairdy castle?") print(pred.answer) ``` **Possible Output:** ```text 5761.328 ``` ## How do I compose multiple modules into a bigger program? DSPy is just Python code that uses modules in any control flow you like, with a little magic internally at `compile` time to trace your LM calls. What this means is that you can just call the modules freely. See tutorials like [multi-hop search](https://dspy.ai/tutorials/multihop_search/), whose module is reproduced below as an example.
```python linenums="1" class Hop(dspy.Module): def __init__(self, num_docs=10, num_hops=4): self.num_docs, self.num_hops = num_docs, num_hops self.generate_query = dspy.ChainOfThought('claim, notes -> query') self.append_notes = dspy.ChainOfThought('claim, notes, context -> new_notes: list[str], titles: list[str]') def forward(self, claim: str) -> list[str]: notes = [] titles = [] for _ in range(self.num_hops): query = self.generate_query(claim=claim, notes=notes).query context = search(query, k=self.num_docs) prediction = self.append_notes(claim=claim, notes=notes, context=context) notes.extend(prediction.new_notes) titles.extend(prediction.titles) return dspy.Prediction(notes=notes, titles=list(set(titles))) ``` Then you can create a instance of the custom module class `Hop`, then invoke it by the `__call__` method: ``` hop = Hop() print(hop(claim="Stephen Curry is the best 3 pointer shooter ever in the human history")) ``` ## How do I track LM usage? !!! note "Version Requirement" LM usage tracking is available in DSPy version 2.6.16 and later. DSPy provides built-in tracking of language model usage across all module calls. To enable tracking: ```python dspy.settings.configure(track_usage=True) ``` Once enabled, you can access usage statistics from any `dspy.Prediction` object: ```python usage = prediction_instance.get_lm_usage() ``` The usage data is returned as a dictionary that maps each language model name to its usage statistics. Here's a complete example: ```python import dspy # Configure DSPy with tracking enabled dspy.settings.configure( lm=dspy.LM("openai/gpt-4o-mini", cache=False), track_usage=True ) # Define a simple program that makes multiple LM calls class MyProgram(dspy.Module): def __init__(self): self.predict1 = dspy.ChainOfThought("question -> answer") self.predict2 = dspy.ChainOfThought("question, answer -> score") def __call__(self, question: str) -> str: answer = self.predict1(question=question) score = self.predict2(question=question, answer=answer) return score # Run the program and check usage program = MyProgram() output = program(question="What is the capital of France?") print(output.get_lm_usage()) ``` This will output usage statistics like: ```python { 'openai/gpt-4o-mini': { 'completion_tokens': 61, 'prompt_tokens': 260, 'total_tokens': 321, 'completion_tokens_details': { 'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0, 'text_tokens': None }, 'prompt_tokens_details': { 'audio_tokens': 0, 'cached_tokens': 0, 'text_tokens': None, 'image_tokens': None } } } ``` When using DSPy's caching features (either in-memory or on-disk via litellm), cached responses won't count toward usage statistics. 
For example: ```python # Enable caching dspy.settings.configure( lm=dspy.LM("openai/gpt-4o-mini", cache=True), track_usage=True ) program = MyProgram() # First call - will show usage statistics output = program(question="What is the capital of Zambia?") print(output.get_lm_usage()) # Shows token usage # Second call - same question, will use cache output = program(question="What is the capital of Zambia?") print(output.get_lm_usage()) # Shows empty dict: {} ``` ``` -------------------------------------------------------------------------------- /dspy/adapters/json_adapter.py: -------------------------------------------------------------------------------- ```python import json import logging from typing import Any, get_origin import json_repair import litellm import pydantic import regex from pydantic.fields import FieldInfo from dspy.adapters.chat_adapter import ChatAdapter, FieldInfoWithName from dspy.adapters.types.tool import ToolCalls from dspy.adapters.utils import ( format_field_value, get_annotation_name, parse_value, serialize_for_json, translate_field_type, ) from dspy.clients.lm import LM from dspy.signatures.signature import Signature, SignatureMeta from dspy.utils.callback import BaseCallback from dspy.utils.exceptions import AdapterParseError logger = logging.getLogger(__name__) def _has_open_ended_mapping(signature: SignatureMeta) -> bool: """ Check whether any output field in the signature has an open-ended mapping type, such as dict[str, Any]. Structured Outputs require explicit properties, so such fields are incompatible. """ for field in signature.output_fields.values(): annotation = field.annotation if get_origin(annotation) is dict: return True return False class JSONAdapter(ChatAdapter): def __init__(self, callbacks: list[BaseCallback] | None = None, use_native_function_calling: bool = True): # JSONAdapter uses native function calling by default. super().__init__(callbacks=callbacks, use_native_function_calling=use_native_function_calling) def _json_adapter_call_common(self, lm, lm_kwargs, signature, demos, inputs, call_fn): """Common call logic to be used for both sync and async calls.""" provider = lm.model.split("/", 1)[0] or "openai" params = litellm.get_supported_openai_params(model=lm.model, custom_llm_provider=provider) if not params or "response_format" not in params: return call_fn(lm, lm_kwargs, signature, demos, inputs) has_tool_calls = any(field.annotation == ToolCalls for field in signature.output_fields.values()) # Some models support json mode but not structured outputs # Follows guidance from: https://docs.litellm.ai/docs/completion/json_mode#check-model-support supports_structured_outputs = litellm.supports_response_schema(model=lm.model, custom_llm_provider=provider) if _has_open_ended_mapping(signature) or (not self.use_native_function_calling and has_tool_calls) or not supports_structured_outputs: # We found that structured output mode doesn't work well with dspy.ToolCalls as output field. # So we fall back to json mode if native function calling is disabled and ToolCalls is present. 
lm_kwargs["response_format"] = {"type": "json_object"} return call_fn(lm, lm_kwargs, signature, demos, inputs) def __call__( self, lm: LM, lm_kwargs: dict[str, Any], signature: type[Signature], demos: list[dict[str, Any]], inputs: dict[str, Any], ) -> list[dict[str, Any]]: result = self._json_adapter_call_common(lm, lm_kwargs, signature, demos, inputs, super().__call__) if result: return result try: structured_output_model = _get_structured_outputs_response_format( signature, self.use_native_function_calling ) lm_kwargs["response_format"] = structured_output_model return super().__call__(lm, lm_kwargs, signature, demos, inputs) except Exception: logger.warning("Failed to use structured output format, falling back to JSON mode.") lm_kwargs["response_format"] = {"type": "json_object"} return super().__call__(lm, lm_kwargs, signature, demos, inputs) async def acall( self, lm: LM, lm_kwargs: dict[str, Any], signature: type[Signature], demos: list[dict[str, Any]], inputs: dict[str, Any], ) -> list[dict[str, Any]]: result = self._json_adapter_call_common(lm, lm_kwargs, signature, demos, inputs, super().acall) if result: return await result try: structured_output_model = _get_structured_outputs_response_format(signature) lm_kwargs["response_format"] = structured_output_model return await super().acall(lm, lm_kwargs, signature, demos, inputs) except Exception: logger.warning("Failed to use structured output format, falling back to JSON mode.") lm_kwargs["response_format"] = {"type": "json_object"} return await super().acall(lm, lm_kwargs, signature, demos, inputs) def format_field_structure(self, signature: type[Signature]) -> str: parts = [] parts.append("All interactions will be structured in the following way, with the appropriate values filled in.") def format_signature_fields_for_instructions(fields: dict[str, FieldInfo], role: str): return self.format_field_with_value( fields_with_values={ FieldInfoWithName(name=field_name, info=field_info): translate_field_type(field_name, field_info) for field_name, field_info in fields.items() }, role=role, ) parts.append("Inputs will have the following structure:") parts.append(format_signature_fields_for_instructions(signature.input_fields, role="user")) parts.append("Outputs will be a JSON object with the following fields.") parts.append(format_signature_fields_for_instructions(signature.output_fields, role="assistant")) return "\n\n".join(parts).strip() def user_message_output_requirements(self, signature: type[Signature]) -> str: def type_info(v): return ( f" (must be formatted as a valid Python {get_annotation_name(v.annotation)})" if v.annotation is not str else "" ) message = "Respond with a JSON object in the following order of fields: " message += ", then ".join(f"`{f}`{type_info(v)}" for f, v in signature.output_fields.items()) message += "." 
return message def format_assistant_message_content( self, signature: type[Signature], outputs: dict[str, Any], missing_field_message=None, ) -> str: fields_with_values = { FieldInfoWithName(name=k, info=v): outputs.get(k, missing_field_message) for k, v in signature.output_fields.items() } return self.format_field_with_value(fields_with_values, role="assistant") def parse(self, signature: type[Signature], completion: str) -> dict[str, Any]: pattern = r"\{(?:[^{}]|(?R))*\}" match = regex.search(pattern, completion, regex.DOTALL) if match: completion = match.group(0) fields = json_repair.loads(completion) if not isinstance(fields, dict): raise AdapterParseError( adapter_name="JSONAdapter", signature=signature, lm_response=completion, message="LM response cannot be serialized to a JSON object.", ) fields = {k: v for k, v in fields.items() if k in signature.output_fields} # Attempt to cast each value to type signature.output_fields[k].annotation. for k, v in fields.items(): if k in signature.output_fields: fields[k] = parse_value(v, signature.output_fields[k].annotation) if fields.keys() != signature.output_fields.keys(): raise AdapterParseError( adapter_name="JSONAdapter", signature=signature, lm_response=completion, parsed_result=fields, ) return fields def format_field_with_value(self, fields_with_values: dict[FieldInfoWithName, Any], role: str = "user") -> str: """ Formats the values of the specified fields according to the field's DSPy type (input or output), annotation (e.g. str, int, etc.), and the type of the value itself. Joins the formatted values into a single string, which is a multiline string if there are multiple fields. Args: fields_with_values: A dictionary mapping information about a field to its corresponding value. Returns: The joined formatted values of the fields, represented as a string. """ if role == "user": output = [] for field, field_value in fields_with_values.items(): formatted_field_value = format_field_value(field_info=field.info, value=field_value) output.append(f"[[ ## {field.name} ## ]]\n{formatted_field_value}") return "\n\n".join(output).strip() else: d = fields_with_values.items() d = {k.name: v for k, v in d} return json.dumps(serialize_for_json(d), indent=2) def format_finetune_data( self, signature: type[Signature], demos: list[dict[str, Any]], inputs: dict[str, Any], outputs: dict[str, Any] ) -> dict[str, list[Any]]: # TODO: implement format_finetune_data method in JSONAdapter raise NotImplementedError def _get_structured_outputs_response_format( signature: SignatureMeta, use_native_function_calling: bool = True, ) -> type[pydantic.BaseModel]: """ Builds a Pydantic model from a DSPy signature's output_fields and ensures the generated JSON schema is compatible with OpenAI Structured Outputs (all objects have a "required" key listing every property, and additionalProperties is always false). IMPORTANT: If any field's annotation is an open-ended mapping (e.g. dict[str, Any]), then a structured schema cannot be generated since all properties must be explicitly declared. In that case, an exception is raised so that the caller can fall back to using a plain "json_object" response_format. """ # Although we've already performed an early check, we keep this here as a final guard. for name, field in signature.output_fields.items(): annotation = field.annotation if get_origin(annotation) is dict: raise ValueError( f"Field '{name}' has an open-ended mapping type which is not supported by Structured Outputs." 
) fields = {} for name, field in signature.output_fields.items(): annotation = field.annotation if use_native_function_calling and annotation == ToolCalls: # Skip ToolCalls field if native function calling is enabled. continue default = field.default if hasattr(field, "default") else ... fields[name] = (annotation, default) # Build the model with extra fields forbidden. pydantic_model = pydantic.create_model( "DSPyProgramOutputs", __config__=pydantic.ConfigDict(extra="forbid"), **fields, ) # Generate the initial schema. schema = pydantic_model.model_json_schema() # Remove any DSPy-specific metadata. for prop in schema.get("properties", {}).values(): prop.pop("json_schema_extra", None) def enforce_required(schema_part: dict): """ Recursively ensure that: - for any object schema, a "required" key is added with all property names (or [] if no properties) - additionalProperties is set to False regardless of the previous value. - the same enforcement is run for nested arrays and definitions. """ if schema_part.get("type") == "object": props = schema_part.get("properties") if props is not None: # For objects with explicitly declared properties: schema_part["required"] = list(props.keys()) schema_part["additionalProperties"] = False for sub_schema in props.values(): if isinstance(sub_schema, dict): enforce_required(sub_schema) else: # For objects with no properties (should not happen normally but a fallback). schema_part["properties"] = {} schema_part["required"] = [] schema_part["additionalProperties"] = False if schema_part.get("type") == "array" and isinstance(schema_part.get("items"), dict): enforce_required(schema_part["items"]) # Also enforce in any nested definitions / $defs. for key in ("$defs", "definitions"): if key in schema_part: for def_schema in schema_part[key].values(): enforce_required(def_schema) enforce_required(schema) # Override the model's JSON schema generation to return our precomputed schema. 
pydantic_model.model_json_schema = lambda *args, **kwargs: schema return pydantic_model ``` -------------------------------------------------------------------------------- /tests/teleprompt/test_gepa_instruction_proposer.py: -------------------------------------------------------------------------------- ```python from dataclasses import dataclass from typing import Any import dspy from dspy.teleprompt.gepa import instruction_proposal from dspy.utils.dummies import DummyLM def count_messages_with_image_url_pattern(messages): """Helper to count image URLs in messages - borrowed from image adapter tests""" pattern = {"type": "image_url", "image_url": {"url": lambda x: isinstance(x, str)}} try: def check_pattern(obj, pattern): if isinstance(pattern, dict): if not isinstance(obj, dict): return False return all(k in obj and check_pattern(obj[k], v) for k, v in pattern.items()) if callable(pattern): return pattern(obj) return obj == pattern def count_patterns(obj, pattern): count = 0 if check_pattern(obj, pattern): count += 1 if isinstance(obj, dict): count += sum(count_patterns(v, pattern) for v in obj.values()) if isinstance(obj, (list, tuple)): count += sum(count_patterns(v, pattern) for v in obj) return count return count_patterns(messages, pattern) except Exception: return 0 @dataclass class ImagesInHistory: has_structured_images: bool has_text_serialized_images: bool def check_images_in_history(history: list[Any]) -> ImagesInHistory: def check_text_serialized(item: Any) -> bool: if isinstance(item, list): return any(check_text_serialized(i) for i in item) if isinstance(item, dict): return any(check_text_serialized(i) for i in item.values()) if isinstance(item, str): return "CUSTOM-TYPE-START-IDENTIFIER" in item return False has_structured_images = False for call in history: if call.get("messages"): image_count = count_messages_with_image_url_pattern(call["messages"]) if image_count > 0: has_structured_images = True break return ImagesInHistory( has_structured_images=has_structured_images, has_text_serialized_images=any(check_text_serialized(i) for i in history), ) def test_reflection_lm_gets_structured_images(): """ Verify reflection LM receives structured image messages, not serialized text. 
""" student = dspy.Predict("image: dspy.Image -> label: str") image = dspy.Image("https://example.com/test.jpg") example = dspy.Example(image=image, label="dog").with_inputs("image") reflection_lm = DummyLM( [ {"improved_instruction": "Better instruction"}, {"improved_instruction": "Enhanced visual analysis instruction"}, {"improved_instruction": "Focus on key features"}, {"improved_instruction": "Analyze visual patterns systematically"}, {"improved_instruction": "Consider distinctive visual elements"}, {"improved_instruction": "Enhance recognition accuracy"}, {"improved_instruction": "Improve classification methodology"}, ] ) lm = DummyLM( [ {"label": "cat"}, {"label": "dog"}, {"label": "animal"}, {"label": "pet"}, {"label": "feline"}, {"label": "canine"}, {"label": "mammal"}, {"label": "creature"}, {"label": "species"}, {"label": "domestic"}, {"label": "wild"}, {"label": "carnivore"}, {"label": "herbivore"}, {"label": "quadruped"}, {"label": "vertebrate"}, ] ) dspy.settings.configure(lm=lm) gepa = dspy.GEPA( metric=lambda gold, pred, trace=None, pred_name=None, pred_trace=None: 0.3, max_metric_calls=2, reflection_lm=reflection_lm, instruction_proposer=instruction_proposal.MultiModalInstructionProposer(), ) gepa.compile(student, trainset=[example], valset=[example]) assert len(lm.history) > 0, "LM should have been called" assert len(reflection_lm.history) > 0, "Reflection LM should have been called" images_in_history = check_images_in_history(reflection_lm.history) assert images_in_history.has_structured_images, "Reflection LM should have received structured images" assert not images_in_history.has_text_serialized_images, "Reflection LM received serialized images in prompts" def test_custom_proposer_without_reflection_lm(): """Test that custom instruction proposers can work without reflection_lm when using updated GEPA core.""" # External reflection LM managed by the custom proposer external_reflection_lm = DummyLM( [ {"improved_instruction": "External LM response"}, {"improved_instruction": "Enhanced instruction"}, {"improved_instruction": "Better guidance"}, {"improved_instruction": "Optimized instruction"}, {"improved_instruction": "Refined approach"}, ] ) class ProposerWithExternalLM: def __call__(self, candidate, reflective_dataset, components_to_update): # This proposer manages its own external reflection LM with dspy.context(lm=external_reflection_lm): # Use external LM for reflection (optional - could be any custom logic) external_reflection_lm([{"role": "user", "content": "Improve this instruction"}]) return {name: f"Externally-improved: {candidate[name]}" for name in components_to_update} student = dspy.Predict("text -> label") example = dspy.Example(text="test input", label="test").with_inputs("text") # Use a robust dummy LM with enough responses for optimization steps lm = DummyLM( [ {"label": "test"}, {"label": "result"}, {"label": "output"}, {"label": "response"}, {"label": "classification"}, {"label": "prediction"}, {"label": "category"}, {"label": "type"}, {"label": "class"}, {"label": "group"}, {"label": "kind"}, {"label": "variant"}, {"label": "form"}, {"label": "style"}, {"label": "mode"}, ] ) dspy.settings.configure(lm=lm) # Test the full flexibility: no reflection_lm provided to GEPA at all! 
# The updated GEPA core library now allows this when using custom proposers gepa = dspy.GEPA( metric=lambda gold, pred, trace=None, pred_name=None, pred_trace=None: 0.7, # Score to trigger optimization max_metric_calls=5, # More calls to allow proper optimization reflection_lm=None, # No reflection_lm provided - this now works! instruction_proposer=ProposerWithExternalLM(), ) result = gepa.compile(student, trainset=[example], valset=[example]) assert result is not None assert len(lm.history) > 0, "Main LM should have been called" assert len(external_reflection_lm.history) > 0, "External reflection LM should have been called by custom proposer" def test_image_serialization_into_strings(): """ Test that demonstrates the image serialization problem when calling lm directly with serialized image data. """ class InstructionProposerCallingLMDirectly: def __call__( self, candidate: dict[str, str], reflective_dataset: dict[str, list[dict[str, Any]]], components_to_update: list[str], ) -> dict[str, str]: updated_components = {} for component_name in components_to_update: if component_name not in candidate or component_name not in reflective_dataset: continue current_instruction = candidate[component_name] component_data = reflective_dataset[component_name] feedback_analysis = "Feedback analysis:\n" for i, example in enumerate(component_data): feedback_analysis += f"Example {i + 1}:\n" # Non ideal approach: extract and serialize image objects directly inputs = example.get("Inputs", {}) for key, value in inputs.items(): feedback_analysis += f" {key}: {value}\n" outputs = example.get("Generated Outputs", {}) feedback = example.get("Feedback", "") feedback_analysis += f" Outputs: {outputs}\n" feedback_analysis += f" Feedback: {feedback}\n\n" context_lm = dspy.settings.lm messages = [ {"role": "system", "content": "You are an instruction improvement assistant."}, { "role": "user", "content": f"Current instruction: {current_instruction}\n\nFeedback: {feedback_analysis}\n\nProvide an improved instruction:", }, ] result = context_lm(messages=messages) updated_components[component_name] = result[0] return updated_components direct_lm_call_proposer = InstructionProposerCallingLMDirectly() student = dspy.Predict("image -> label") image = dspy.Image("https://picsum.photos/id/237/200/300") examples = [ dspy.Example(image=image, label="cat").with_inputs("image"), dspy.Example(image=image, label="animal").with_inputs("image"), ] lm = DummyLM( [ {"label": "cat"}, {"label": "dog"}, {"label": "animal"}, {"label": "pet"}, {"label": "feline"}, {"label": "mammal"}, {"label": "creature"}, {"label": "species"}, {"label": "domestic"}, {"label": "wild"}, {"label": "carnivore"}, {"label": "herbivore"}, ] ) dspy.settings.configure(lm=lm) reflection_lm = DummyLM( [ {"improved_instruction": "Be more specific about image analysis"}, {"improved_instruction": "Focus on visual features when classifying"}, {"improved_instruction": "Consider contextual clues in the image"}, {"improved_instruction": "Analyze shape, color, and texture patterns"}, {"improved_instruction": "Look for distinguishing characteristics"}, ] ) gepa = dspy.GEPA( metric=lambda gold, pred, trace=None, pred_name=None, pred_trace=None: 0.3, max_metric_calls=5, reflection_lm=reflection_lm, instruction_proposer=direct_lm_call_proposer, ) gepa.compile(student, trainset=examples, valset=examples) assert len(lm.history) > 0, "LM should have been called" assert len(reflection_lm.history) > 0, "Reflection LM should have been called" images_in_history = 
check_images_in_history(reflection_lm.history) assert images_in_history.has_text_serialized_images, ( "Expected to find serialized images (CUSTOM-TYPE-START-IDENTIFIER)" ) def test_default_proposer(): student = dspy.Predict("image -> label") image = dspy.Image("https://picsum.photos/id/237/200/300") examples = [ dspy.Example(image=image, label="cat").with_inputs("image"), dspy.Example(image=image, label="animal").with_inputs("image"), ] lm = DummyLM( [ {"label": "cat"}, {"label": "dog"}, {"label": "animal"}, {"label": "pet"}, {"label": "feline"}, {"label": "mammal"}, {"label": "creature"}, {"label": "species"}, {"label": "domestic"}, {"label": "wild"}, {"label": "carnivore"}, {"label": "herbivore"}, ] ) dspy.settings.configure(lm=lm) reflection_lm = DummyLM( [ {"improved_instruction": "Be more specific about image analysis"}, {"improved_instruction": "Focus on visual features when classifying"}, {"improved_instruction": "Consider contextual clues in the image"}, {"improved_instruction": "Analyze shape, color, and texture patterns"}, {"improved_instruction": "Look for distinguishing characteristics"}, ] ) gepa = dspy.GEPA( metric=lambda gold, pred, trace=None, pred_name=None, pred_trace=None: 0.3, max_metric_calls=5, reflection_lm=reflection_lm, ) gepa.compile(student, trainset=examples, valset=examples) assert len(lm.history) > 0, "LM should have been called" assert len(reflection_lm.history) > 0, "Reflection LM should have been called" images_in_history = check_images_in_history(reflection_lm.history) assert images_in_history.has_text_serialized_images, ( "Expected to find serialized images (CUSTOM-TYPE-START-IDENTIFIER)" ) ``` -------------------------------------------------------------------------------- /dspy/teleprompt/bootstrap_finetune.py: -------------------------------------------------------------------------------- ```python import logging from collections import defaultdict from typing import Any, Callable import dspy from dspy.adapters.base import Adapter from dspy.adapters.chat_adapter import ChatAdapter from dspy.clients.lm import LM from dspy.clients.utils_finetune import infer_data_format from dspy.dsp.utils.settings import settings from dspy.predict.predict import Predict from dspy.primitives.example import Example from dspy.primitives.module import Module from dspy.teleprompt.bootstrap_trace import bootstrap_trace_data from dspy.teleprompt.teleprompt import Teleprompter logger = logging.getLogger(__name__) class FinetuneTeleprompter(Teleprompter): def __init__( self, train_kwargs: dict[str, Any] | dict[LM, dict[str, Any]] | None = None, ): self.train_kwargs: dict[LM, Any] = self.convert_to_lm_dict(train_kwargs or {}) @staticmethod def convert_to_lm_dict(arg) -> dict[LM, Any]: non_empty_dict = arg and isinstance(arg, dict) if non_empty_dict and all(isinstance(k, LM) for k in arg.keys()): return arg # Default to using the same value for all LMs return defaultdict(lambda: arg) class BootstrapFinetune(FinetuneTeleprompter): def __init__( self, metric: Callable | None = None, multitask: bool = True, train_kwargs: dict[str, Any] | dict[LM, dict[str, Any]] | None = None, adapter: Adapter | dict[LM, Adapter] | None = None, exclude_demos: bool = False, num_threads: int | None = None, ): # TODO(feature): Inputs train_kwargs (a dict with string keys) and # adapter (Adapter) can depend on the LM they are used with. We are # takingthese as parameters for the time being. 
However, they can be # attached to LMs themselves -- an LM could know which adapter it should # be used with along with the train_kwargs. This will lead the only # required argument for LM.finetune() to be the train dataset. super().__init__(train_kwargs=train_kwargs) self.metric = metric self.multitask = multitask self.adapter: dict[LM, Adapter] = self.convert_to_lm_dict(adapter) self.exclude_demos = exclude_demos self.num_threads = num_threads def compile( self, student: Module, trainset: list[Example], teacher: Module | list[Module] | None = None ) -> Module: # TODO: Print statements can be converted to logger.info if we ensure # that the default DSPy logger logs info level messages in notebook # environments. logger.info("Preparing the student and teacher programs...") all_predictors_have_lms(student) logger.info("Bootstrapping data...") trace_data = [] teachers = teacher if isinstance(teacher, list) else [teacher] teachers = [prepare_teacher(student, t) for t in teachers] num_threads = self.num_threads or dspy.settings.num_threads for t in teachers: trace_data += bootstrap_trace_data(program=t, dataset=trainset, metric=self.metric, num_threads=num_threads) logger.info("Preparing the train data...") key_to_data = {} for pred_ind, pred in enumerate(student.predictors()): data_pred_ind = None if self.multitask else pred_ind if pred.lm is None: raise ValueError( f"Predictor {pred_ind} does not have an LM assigned. " f"Please ensure the module's predictors have their LM set before fine-tuning. " f"You can set it using: your_module.set_lm(your_lm)" ) training_key = (pred.lm, data_pred_ind) if training_key not in key_to_data: train_data, data_format = self._prepare_finetune_data( trace_data=trace_data, lm=pred.lm, pred_ind=data_pred_ind ) logger.info(f"Using {len(train_data)} data points for fine-tuning the model: {pred.lm.model}") finetune_kwargs = { "lm": pred.lm, "train_data": train_data, "train_data_format": data_format, "train_kwargs": self.train_kwargs[pred.lm], } key_to_data[training_key] = finetune_kwargs logger.info("Starting LM fine-tuning...") # TODO(feature): We could run batches of fine-tuning jobs in sequence # to avoid exceeding the number of threads. if len(key_to_data) > num_threads: raise ValueError( "BootstrapFinetune requires `num_threads` to be bigger than or equal to the number of fine-tuning " f"jobs. There are {len(key_to_data)} fine-tuning jobs to start, but the number of threads is: " f"{num_threads}! If the `multitask` flag is set to False, the number of fine-tuning jobs will " "be equal to the number of predictors in the student program. If the `multitask` flag is set to True, " "the number of fine-tuning jobs will be equal to: 1 if there is only a context LM, or the number of " "unique LMs attached to the predictors in the student program. In any case, the number of fine-tuning " "jobs will be less than or equal to the number of predictors." ) logger.info(f"{len(key_to_data)} fine-tuning job(s) to start") key_to_lm = self.finetune_lms(key_to_data) logger.info("Updating the student program with the fine-tuned LMs...") for pred_ind, pred in enumerate(student.predictors()): data_pred_ind = None if self.multitask else pred_ind training_key = (pred.lm, data_pred_ind) finetuned_lm = key_to_lm[training_key] if isinstance(finetuned_lm, Exception): raise RuntimeError(f"Finetuned LM for predictor {pred_ind} failed.") from finetuned_lm pred.lm = finetuned_lm # TODO: What should the correct behavior be here? 
Should # BootstrapFinetune modify the prompt demos according to the # train data? pred.demos = [] if self.exclude_demos else pred.demos logger.info("BootstrapFinetune has finished compiling the student program") student._compiled = True return student @staticmethod def finetune_lms(finetune_dict) -> dict[Any, LM]: num_jobs = len(finetune_dict) logger.info(f"Starting {num_jobs} fine-tuning job(s)...") # TODO(nit) Pass an identifier to the job so that we can tell the logs # coming from different fine-tune threads. key_to_job = {} for key, finetune_kwargs in finetune_dict.items(): lm: LM = finetune_kwargs.pop("lm") # TODO: The following line is a hack. We should re-think how to free # up resources for fine-tuning. This might mean introducing a new # provider method (e.g. prepare_for_finetune) that can be called # before fine-tuning is started. logger.info( "Calling lm.kill() on the LM to be fine-tuned to free up resources. This won't have any effect if the " "LM is not running." ) lm.kill() key_to_job[key] = lm.finetune(**finetune_kwargs) key_to_lm = {} for ind, (key, job) in enumerate(key_to_job.items()): result = job.result() if isinstance(result, Exception): raise result key_to_lm[key] = result job.thread.join() logger.info(f"Job {ind + 1}/{num_jobs} is done") return key_to_lm def _prepare_finetune_data(self, trace_data: list[dict[str, Any]], lm: LM, pred_ind: int | None = None): # TODO(nit) Log dataset details/size; make logs nicer if self.metric: logger.info(f"Collected data for {len(trace_data)} examples") trace_data = [d for d in trace_data if d["score"]] logger.info(f"After filtering with the metric, {len(trace_data)} examples remain") data = [] adapter = self.adapter[lm] or settings.adapter or ChatAdapter() data_format = infer_data_format(adapter) for item in trace_data: for pred_ind, _ in enumerate(item["trace"]): include_data = pred_ind is None or pred_ind == pred_ind if include_data: call_data = build_call_data_from_trace( trace=item["trace"], pred_ind=pred_ind, adapter=adapter, exclude_demos=self.exclude_demos ) data.append(call_data) import random random.Random(0).shuffle(data) return data, data_format # Note: Shared below are useful functions for preparing student/teacher programs # Similar methods are implemented separately and used by other DSPy # teleprompters. These can be moved to shared locations. def build_call_data_from_trace( trace: list[dict], pred_ind: int, adapter: Adapter, exclude_demos: bool = False, ) -> dict[str, list[dict[str, Any]]]: # Find data that's relevant to the predictor pred, inputs, outputs = trace[pred_ind] # assuming that the order is kept demos = [] if exclude_demos else pred.demos call_data = adapter.format_finetune_data( signature=pred.signature, demos=demos, inputs=inputs, outputs=outputs, ) return call_data # # TODO(PR) check with team # def bootstrap_trace_data_one_example( # example: Example, # program: Program, # metric: Optional[Callable] = None # ) -> dict[str, Any]: # # Return a dict with the following keys: # # example, prediction, trace, and score (if metric != None) # with dspy.context(trace=[]): # prediction = program(**example.inputs()) # trace = dspy.settings.trace # score = metric(example, prediction, trace) if metric else None # data_dict = dict( # example=example, # prediction=prediction, # trace=trace, # ) # if metric: # data_dict["score"] = score # return data_dict # Note: Shared below are useful functions for preparing student/teacher programs # Similar methods are implemented separately and used by other DSPy # teleprompters. 
These can be moved to shared locations. def all_predictors_have_lms(program: Module) -> bool: """Return True if all predictors in the program have an LM set.""" return all(pred.lm for pred in program.predictors()) def copy_program_with_lms(program: Module) -> Module: pred_lms = [pred.lm for pred in program.predictors()] program = program.deepcopy() for ind, pred in enumerate(program.predictors()): pred.lm = pred_lms[ind] return program def prepare_student(student: Module) -> Module: if getattr(student, "_compiled", False): raise ValueError("The student program should not be compiled.") # TODO: Should we use reset_copy here? How would it affect the student # program's predictor LMs, if they are set? # TODO: Should there be a deepcopy here? # student = student.deepcopy() return student def prepare_teacher(student: Module, teacher: Module | None = None) -> Module: if teacher is None: return student # Ensuring that the student and teacher are are structurally equivalent assert_structural_equivalency(student, teacher) # Ensuring that the student and teacher programs do not share predictors assert_no_shared_predictor(student, teacher) return teacher def assert_structural_equivalency(program1: object, program2: object): assert isinstance(program1, Module) assert isinstance(program2, Module) num1 = len(program1.predictors()) num2 = len(program2.predictors()) err = f"Structurally equivalent programs must have the the number of predictors. The number of predictors for the two modules do not match: {num1} != {num2}" assert num1 == num2, err pzip = zip(program1.named_predictors(), program2.named_predictors(), strict=False) for ind, ((name1, pred1), (name2, pred2)) in enumerate(pzip): err = f"Program predictor names must match at corresponding indices for structural equivalency. 
The predictor names for the programs do not match at index {ind}: '{name1}' != '{name2}'" assert name1 == name2, err assert isinstance(pred1, Predict) assert isinstance(pred2, Predict) def assert_no_shared_predictor(program1: Module, program2: Module): id_to_name1 = {id(p): n for n, p in program1.named_predictors()} id_to_name2 = {id(p): n for n, p in program2.named_predictors()} shared_ids = set(id_to_name1.keys()) & set(id_to_name2.keys()) pred_names = ", ".join(id_to_name1[id] for id in shared_ids) err = f"The programs share the following predictor(s) with each other: {pred_names}" assert not shared_ids, err def get_unique_lms(program: Module) -> list[LM]: lms = [pred.lm for pred in program.predictors()] return list(set(lms)) def launch_lms(program: Module): lms = get_unique_lms(program) for lm in lms: lm.launch() def kill_lms(program: Module): lms = get_unique_lms(program) for lm in lms: lm.kill() ``` -------------------------------------------------------------------------------- /tests/predict/test_react.py: -------------------------------------------------------------------------------- ```python import re import litellm import pytest from pydantic import BaseModel import dspy from dspy.utils.dummies import DummyLM @pytest.mark.extra def test_tool_observation_preserves_custom_type(): pytest.importorskip("PIL.Image") from PIL import Image captured_calls = [] class SpyChatAdapter(dspy.ChatAdapter): def format_user_message_content(self, signature, inputs, *args, **kwargs): captured_calls.append((signature, dict(inputs))) return super().format_user_message_content(signature, inputs, *args, **kwargs) def make_images(): return dspy.Image("https://example.com/test.png"), dspy.Image(Image.new("RGB", (100, 100), "red")) adapter = SpyChatAdapter() lm = DummyLM( [ { "next_thought": "I should call the image tool.", "next_tool_name": "make_images", "next_tool_args": {}, }, { "next_thought": "I now have the image so I can finish.", "next_tool_name": "finish", "next_tool_args": {}, }, {"reasoning": "image ready", "answer": "done"}, ], adapter=adapter, ) dspy.settings.configure(lm=lm, adapter=adapter) react = dspy.ReAct("question -> answer", tools=[make_images]) react(question="Draw me something red") sigs_with_obs = [sig for sig, inputs in captured_calls if "observation_0" in inputs] assert sigs_with_obs, "Expected ReAct to format a trajectory containing observation_0" observation_content = lm.history[1]["messages"][1]["content"] assert sum(1 for part in observation_content if isinstance(part, dict) and part.get("type") == "image_url") == 2 def test_tool_calling_with_pydantic_args(): class CalendarEvent(BaseModel): name: str date: str participants: dict[str, str] def write_invitation_letter(participant_name: str, event_info: CalendarEvent): if participant_name not in event_info.participants: return None return f"It's my honor to invite {participant_name} to event {event_info.name} on {event_info.date}" class InvitationSignature(dspy.Signature): participant_name: str = dspy.InputField(desc="The name of the participant to invite") event_info: CalendarEvent = dspy.InputField(desc="The information about the event") invitation_letter: str = dspy.OutputField(desc="The invitation letter to be sent to the participant") react = dspy.ReAct(InvitationSignature, tools=[write_invitation_letter]) lm = DummyLM( [ { "next_thought": "I need to write an invitation letter for Alice to the Science Fair event.", "next_tool_name": "write_invitation_letter", "next_tool_args": { "participant_name": "Alice", "event_info": 
{ "name": "Science Fair", "date": "Friday", "participants": {"Alice": "female", "Bob": "male"}, }, }, }, { "next_thought": ( "I have successfully written the invitation letter for Alice to the Science Fair. Now " "I can finish the task." ), "next_tool_name": "finish", "next_tool_args": {}, }, { "reasoning": "This is a very rigorous reasoning process, trust me bro!", "invitation_letter": "It's my honor to invite Alice to the Science Fair event on Friday.", }, ] ) dspy.settings.configure(lm=lm) outputs = react( participant_name="Alice", event_info=CalendarEvent( name="Science Fair", date="Friday", participants={"Alice": "female", "Bob": "male"}, ), ) assert outputs.invitation_letter == "It's my honor to invite Alice to the Science Fair event on Friday." expected_trajectory = { "thought_0": "I need to write an invitation letter for Alice to the Science Fair event.", "tool_name_0": "write_invitation_letter", "tool_args_0": { "participant_name": "Alice", "event_info": { "name": "Science Fair", "date": "Friday", "participants": {"Alice": "female", "Bob": "male"}, }, }, "observation_0": "It's my honor to invite Alice to event Science Fair on Friday", "thought_1": "I have successfully written the invitation letter for Alice to the Science Fair. Now I can finish the task.", "tool_name_1": "finish", "tool_args_1": {}, "observation_1": "Completed.", } assert outputs.trajectory == expected_trajectory def test_tool_calling_without_typehint(): def foo(a, b): """Add two numbers.""" return a + b react = dspy.ReAct("a, b -> c:int", tools=[foo]) lm = DummyLM( [ {"next_thought": "I need to add two numbers.", "next_tool_name": "foo", "next_tool_args": {"a": 1, "b": 2}}, {"next_thought": "I have the sum, now I can finish.", "next_tool_name": "finish", "next_tool_args": {}}, {"reasoning": "I added the numbers successfully", "c": 3}, ] ) dspy.settings.configure(lm=lm) outputs = react(a=1, b=2) expected_trajectory = { "thought_0": "I need to add two numbers.", "tool_name_0": "foo", "tool_args_0": { "a": 1, "b": 2, }, "observation_0": 3, "thought_1": "I have the sum, now I can finish.", "tool_name_1": "finish", "tool_args_1": {}, "observation_1": "Completed.", } assert outputs.trajectory == expected_trajectory def test_trajectory_truncation(): # Create a simple tool for testing def echo(text: str) -> str: return f"Echoed: {text}" # Create ReAct instance with our echo tool react = dspy.ReAct("input_text -> output_text", tools=[echo]) # Mock react.react to simulate multiple tool calls call_count = 0 def mock_react(**kwargs): nonlocal call_count call_count += 1 if call_count < 3: # First 2 calls use the echo tool return dspy.Prediction( next_thought=f"Thought {call_count}", next_tool_name="echo", next_tool_args={"text": f"Text {call_count}"}, ) elif call_count == 3: # The 3rd call raises context window exceeded error raise litellm.ContextWindowExceededError("Context window exceeded", "dummy_model", "dummy_provider") else: # The 4th call finishes return dspy.Prediction(next_thought="Final thought", next_tool_name="finish", next_tool_args={}) react.react = mock_react react.extract = lambda **kwargs: dspy.Prediction(output_text="Final output") # Call forward and get the result result = react(input_text="test input") # Verify that older entries in the trajectory were truncated assert "thought_0" not in result.trajectory assert "thought_2" in result.trajectory assert result.output_text == "Final output" def test_error_retry(): # --- a tiny tool that always fails ------------------------------------- def foo(a, b): raise 
Exception("tool error") # --- program under test ------------------------------------------------- react = dspy.ReAct("a, b -> c:int", tools=[foo]) lm = DummyLM( [ { "next_thought": "I need to add two numbers.", "next_tool_name": "foo", "next_tool_args": {"a": 1, "b": 2}, }, { "next_thought": "I need to add two numbers.", "next_tool_name": "foo", "next_tool_args": {"a": 1, "b": 2}, }, # (The model *would* succeed on the 3rd turn, but max_iters=2 stops earlier.) {"reasoning": "I added the numbers successfully", "c": 3}, ] ) dspy.settings.configure(lm=lm) outputs = react(a=1, b=2, max_iters=2) traj = outputs.trajectory # --- exact-match checks (thoughts + tool calls) ------------------------- control_expected = { "thought_0": "I need to add two numbers.", "tool_name_0": "foo", "tool_args_0": {"a": 1, "b": 2}, "thought_1": "I need to add two numbers.", "tool_name_1": "foo", "tool_args_1": {"a": 1, "b": 2}, } for k, v in control_expected.items(): assert traj[k] == v, f"{k} mismatch" # --- flexible checks for observations ---------------------------------- # We only care that each observation mentions our error string; we ignore # any extra traceback detail or differing prefixes. for i in range(2): obs = traj[f"observation_{i}"] assert re.search(r"\btool error\b", obs), f"unexpected observation_{i!r}: {obs}" @pytest.mark.asyncio async def test_async_tool_calling_with_pydantic_args(): class CalendarEvent(BaseModel): name: str date: str participants: dict[str, str] async def write_invitation_letter(participant_name: str, event_info: CalendarEvent): if participant_name not in event_info.participants: return None return f"It's my honor to invite {participant_name} to event {event_info.name} on {event_info.date}" class InvitationSignature(dspy.Signature): participant_name: str = dspy.InputField(desc="The name of the participant to invite") event_info: CalendarEvent = dspy.InputField(desc="The information about the event") invitation_letter: str = dspy.OutputField(desc="The invitation letter to be sent to the participant") react = dspy.ReAct(InvitationSignature, tools=[write_invitation_letter]) lm = DummyLM( [ { "next_thought": "I need to write an invitation letter for Alice to the Science Fair event.", "next_tool_name": "write_invitation_letter", "next_tool_args": { "participant_name": "Alice", "event_info": { "name": "Science Fair", "date": "Friday", "participants": {"Alice": "female", "Bob": "male"}, }, }, }, { "next_thought": ( "I have successfully written the invitation letter for Alice to the Science Fair. Now " "I can finish the task." ), "next_tool_name": "finish", "next_tool_args": {}, }, { "reasoning": "This is a very rigorous reasoning process, trust me bro!", "invitation_letter": "It's my honor to invite Alice to the Science Fair event on Friday.", }, ] ) with dspy.context(lm=lm): outputs = await react.acall( participant_name="Alice", event_info=CalendarEvent( name="Science Fair", date="Friday", participants={"Alice": "female", "Bob": "male"}, ), ) assert outputs.invitation_letter == "It's my honor to invite Alice to the Science Fair event on Friday." 
expected_trajectory = { "thought_0": "I need to write an invitation letter for Alice to the Science Fair event.", "tool_name_0": "write_invitation_letter", "tool_args_0": { "participant_name": "Alice", "event_info": { "name": "Science Fair", "date": "Friday", "participants": {"Alice": "female", "Bob": "male"}, }, }, "observation_0": "It's my honor to invite Alice to event Science Fair on Friday", "thought_1": "I have successfully written the invitation letter for Alice to the Science Fair. Now I can finish the task.", "tool_name_1": "finish", "tool_args_1": {}, "observation_1": "Completed.", } assert outputs.trajectory == expected_trajectory @pytest.mark.asyncio async def test_async_error_retry(): # A tiny tool that always fails async def foo(a, b): raise Exception("tool error") react = dspy.ReAct("a, b -> c:int", tools=[foo]) lm = DummyLM( [ { "next_thought": "I need to add two numbers.", "next_tool_name": "foo", "next_tool_args": {"a": 1, "b": 2}, }, { "next_thought": "I need to add two numbers.", "next_tool_name": "foo", "next_tool_args": {"a": 1, "b": 2}, }, # (The model *would* succeed on the 3rd turn, but max_iters=2 stops earlier.) {"reasoning": "I added the numbers successfully", "c": 3}, ] ) with dspy.context(lm=lm): outputs = await react.acall(a=1, b=2, max_iters=2) traj = outputs.trajectory # Exact-match checks (thoughts + tool calls) control_expected = { "thought_0": "I need to add two numbers.", "tool_name_0": "foo", "tool_args_0": {"a": 1, "b": 2}, "thought_1": "I need to add two numbers.", "tool_name_1": "foo", "tool_args_1": {"a": 1, "b": 2}, } for k, v in control_expected.items(): assert traj[k] == v, f"{k} mismatch" # Flexible checks for observations # We only care that each observation mentions our error string; we ignore # any extra traceback detail or differing prefixes. for i in range(2): obs = traj[f"observation_{i}"] assert re.search(r"\btool error\b", obs), f"unexpected observation_{i!r}: {obs}" ``` -------------------------------------------------------------------------------- /dspy/utils/callback.py: -------------------------------------------------------------------------------- ```python import functools import inspect import logging import uuid from contextvars import ContextVar from typing import Any, Callable import dspy ACTIVE_CALL_ID = ContextVar("active_call_id", default=None) logger = logging.getLogger(__name__) class BaseCallback: """A base class for defining callback handlers for DSPy components. To use a callback, subclass this class and implement the desired handlers. Each handler will be called at the appropriate time before/after the execution of the corresponding component. For example, if you want to print a message before and after an LM is called, implement `the on_llm_start` and `on_lm_end` handler. Users can set the callback globally using `dspy.settings.configure` or locally by passing it to the component constructor. Example 1: Set a global callback using `dspy.settings.configure`. 
``` import dspy from dspy.utils.callback import BaseCallback class LoggingCallback(BaseCallback): def on_lm_start(self, call_id, instance, inputs): print(f"LM is called with inputs: {inputs}") def on_lm_end(self, call_id, outputs, exception): print(f"LM is finished with outputs: {outputs}") dspy.settings.configure( callbacks=[LoggingCallback()] ) cot = dspy.ChainOfThought("question -> answer") cot(question="What is the meaning of life?") # > LM is called with inputs: {'question': 'What is the meaning of life?'} # > LM is finished with outputs: {'answer': '42'} ``` Example 2: Set a local callback by passing it to the component constructor. ``` lm_1 = dspy.LM("gpt-3.5-turbo", callbacks=[LoggingCallback()]) lm_1(question="What is the meaning of life?") # > LM is called with inputs: {'question': 'What is the meaning of life?'} # > LM is finished with outputs: {'answer': '42'} lm_2 = dspy.LM("gpt-3.5-turbo") lm_2(question="What is the meaning of life?") # No logging here because only `lm_1` has the callback set. ``` """ def on_module_start( self, call_id: str, instance: Any, inputs: dict[str, Any], ): """A handler triggered when forward() method of a module (subclass of dspy.Module) is called. Args: call_id: A unique identifier for the call. Can be used to connect start/end handlers. instance: The Module instance. inputs: The inputs to the module's forward() method. Each arguments is stored as a key-value pair in a dictionary. """ pass def on_module_end( self, call_id: str, outputs: Any | None, exception: Exception | None = None, ): """A handler triggered after forward() method of a module (subclass of dspy.Module) is executed. Args: call_id: A unique identifier for the call. Can be used to connect start/end handlers. outputs: The outputs of the module's forward() method. If the method is interrupted by an exception, this will be None. exception: If an exception is raised during the execution, it will be stored here. """ pass def on_lm_start( self, call_id: str, instance: Any, inputs: dict[str, Any], ): """A handler triggered when __call__ method of dspy.LM instance is called. Args: call_id: A unique identifier for the call. Can be used to connect start/end handlers. instance: The LM instance. inputs: The inputs to the LM's __call__ method. Each arguments is stored as a key-value pair in a dictionary. """ pass def on_lm_end( self, call_id: str, outputs: dict[str, Any] | None, exception: Exception | None = None, ): """A handler triggered after __call__ method of dspy.LM instance is executed. Args: call_id: A unique identifier for the call. Can be used to connect start/end handlers. outputs: The outputs of the LM's __call__ method. If the method is interrupted by an exception, this will be None. exception: If an exception is raised during the execution, it will be stored here. """ pass def on_adapter_format_start( self, call_id: str, instance: Any, inputs: dict[str, Any], ): """A handler triggered when format() method of an adapter (subclass of dspy.Adapter) is called. Args: call_id: A unique identifier for the call. Can be used to connect start/end handlers. instance: The Adapter instance. inputs: The inputs to the Adapter's format() method. Each arguments is stored as a key-value pair in a dictionary. """ pass def on_adapter_format_end( self, call_id: str, outputs: dict[str, Any] | None, exception: Exception | None = None, ): """A handler triggered after format() method of an adapter (subclass of dspy.Adapter) is called.. Args: call_id: A unique identifier for the call. 
Can be used to connect start/end handlers. outputs: The outputs of the Adapter's format() method. If the method is interrupted by an exception, this will be None. exception: If an exception is raised during the execution, it will be stored here. """ pass def on_adapter_parse_start( self, call_id: str, instance: Any, inputs: dict[str, Any], ): """A handler triggered when parse() method of an adapter (subclass of dspy.Adapter) is called. Args: call_id: A unique identifier for the call. Can be used to connect start/end handlers. instance: The Adapter instance. inputs: The inputs to the Adapter's parse() method. Each arguments is stored as a key-value pair in a dictionary. """ pass def on_adapter_parse_end( self, call_id: str, outputs: dict[str, Any] | None, exception: Exception | None = None, ): """A handler triggered after parse() method of an adapter (subclass of dspy.Adapter) is called. Args: call_id: A unique identifier for the call. Can be used to connect start/end handlers. outputs: The outputs of the Adapter's parse() method. If the method is interrupted by an exception, this will be None. exception: If an exception is raised during the execution, it will be stored here. """ pass def on_tool_start( self, call_id: str, instance: Any, inputs: dict[str, Any], ): """A handler triggered when a tool is called. Args: call_id: A unique identifier for the call. Can be used to connect start/end handlers. instance: The Tool instance. inputs: The inputs to the Tool's __call__ method. Each arguments is stored as a key-value pair in a dictionary. """ pass def on_tool_end( self, call_id: str, outputs: dict[str, Any] | None, exception: Exception | None = None, ): """A handler triggered after a tool is executed. Args: call_id: A unique identifier for the call. Can be used to connect start/end handlers. outputs: The outputs of the Tool's __call__ method. If the method is interrupted by an exception, this will be None. exception: If an exception is raised during the execution, it will be stored here. """ pass def on_evaluate_start( self, call_id: str, instance: Any, inputs: dict[str, Any], ): """A handler triggered when evaluation is started. Args: call_id: A unique identifier for the call. Can be used to connect start/end handlers. instance: The Evaluate instance. inputs: The inputs to the Evaluate's __call__ method. Each arguments is stored as a key-value pair in a dictionary. """ pass def on_evaluate_end( self, call_id: str, outputs: Any | None, exception: Exception | None = None, ): """A handler triggered after evaluation is executed. Args: call_id: A unique identifier for the call. Can be used to connect start/end handlers. outputs: The outputs of the Evaluate's __call__ method. If the method is interrupted by an exception, this will be None. exception: If an exception is raised during the execution, it will be stored here. 
""" pass def with_callbacks(fn): """Decorator to add callback functionality to instance methods.""" def _execute_start_callbacks(instance, fn, call_id, callbacks, args, kwargs): """Execute all start callbacks for a function call.""" inputs = inspect.getcallargs(fn, instance, *args, **kwargs) if "self" in inputs: inputs.pop("self") elif "instance" in inputs: inputs.pop("instance") for callback in callbacks: try: _get_on_start_handler(callback, instance, fn)(call_id=call_id, instance=instance, inputs=inputs) except Exception as e: logger.warning(f"Error when calling callback {callback}: {e}") def _execute_end_callbacks(instance, fn, call_id, results, exception, callbacks): """Execute all end callbacks for a function call.""" for callback in callbacks: try: _get_on_end_handler(callback, instance, fn)( call_id=call_id, outputs=results, exception=exception, ) except Exception as e: logger.warning(f"Error when applying callback {callback}'s end handler on function {fn.__name__}: {e}.") def _get_active_callbacks(instance): """Get combined global and instance-level callbacks.""" return dspy.settings.get("callbacks", []) + getattr(instance, "callbacks", []) if inspect.iscoroutinefunction(fn): @functools.wraps(fn) async def async_wrapper(instance, *args, **kwargs): callbacks = _get_active_callbacks(instance) if not callbacks: return await fn(instance, *args, **kwargs) call_id = uuid.uuid4().hex _execute_start_callbacks(instance, fn, call_id, callbacks, args, kwargs) # Active ID must be set right before the function is called, not before calling the callbacks. parent_call_id = ACTIVE_CALL_ID.get() ACTIVE_CALL_ID.set(call_id) results = None exception = None try: results = await fn(instance, *args, **kwargs) return results except Exception as e: exception = e raise exception finally: ACTIVE_CALL_ID.set(parent_call_id) _execute_end_callbacks(instance, fn, call_id, results, exception, callbacks) return async_wrapper else: @functools.wraps(fn) def sync_wrapper(instance, *args, **kwargs): callbacks = _get_active_callbacks(instance) if not callbacks: return fn(instance, *args, **kwargs) call_id = uuid.uuid4().hex _execute_start_callbacks(instance, fn, call_id, callbacks, args, kwargs) # Active ID must be set right before the function is called, not before calling the callbacks. parent_call_id = ACTIVE_CALL_ID.get() ACTIVE_CALL_ID.set(call_id) results = None exception = None try: results = fn(instance, *args, **kwargs) return results except Exception as e: exception = e raise exception finally: ACTIVE_CALL_ID.set(parent_call_id) _execute_end_callbacks(instance, fn, call_id, results, exception, callbacks) return sync_wrapper def _get_on_start_handler(callback: BaseCallback, instance: Any, fn: Callable) -> Callable: """Selects the appropriate on_start handler of the callback based on the instance and function name.""" if isinstance(instance, dspy.LM): return callback.on_lm_start elif isinstance(instance, dspy.Evaluate): return callback.on_evaluate_start if isinstance(instance, dspy.Adapter): if fn.__name__ == "format": return callback.on_adapter_format_start elif fn.__name__ == "parse": return callback.on_adapter_parse_start else: raise ValueError(f"Unsupported adapter method for using callback: {fn.__name__}.") if isinstance(instance, dspy.Tool): return callback.on_tool_start # We treat everything else as a module. 
return callback.on_module_start def _get_on_end_handler(callback: BaseCallback, instance: Any, fn: Callable) -> Callable: """Selects the appropriate on_end handler of the callback based on the instance and function name.""" if isinstance(instance, (dspy.LM)): return callback.on_lm_end elif isinstance(instance, dspy.Evaluate): return callback.on_evaluate_end if isinstance(instance, (dspy.Adapter)): if fn.__name__ == "format": return callback.on_adapter_format_end elif fn.__name__ == "parse": return callback.on_adapter_parse_end else: raise ValueError(f"Unsupported adapter method for using callback: {fn.__name__}.") if isinstance(instance, dspy.Tool): return callback.on_tool_end # We treat everything else as a module. return callback.on_module_end ``` -------------------------------------------------------------------------------- /dspy/teleprompt/gepa/gepa_utils.py: -------------------------------------------------------------------------------- ```python import logging import random from typing import Any, Callable, Protocol, TypedDict from gepa import EvaluationBatch, GEPAAdapter from gepa.core.adapter import ProposalFn import dspy from dspy.adapters.chat_adapter import ChatAdapter from dspy.adapters.types import History from dspy.adapters.types.base_type import Type from dspy.evaluate import Evaluate from dspy.primitives import Example, Prediction from dspy.teleprompt.bootstrap_trace import TraceData logger = logging.getLogger(__name__) class LoggerAdapter: def __init__(self, logger: logging.Logger): self.logger = logger def log(self, x: str): self.logger.info(x) DSPyTrace = list[tuple[Any, dict[str, Any], Prediction]] class ReflectiveExample(TypedDict): """ Structure of individual examples in the reflective dataset. Each example contains the predictor inputs, generated outputs, and feedback from evaluation. """ Inputs: dict[str, Any] # Predictor inputs (may include str, dspy.Image, etc.) Generated_Outputs: dict[str, Any] | str # Success: dict with output fields, Failure: error message string Feedback: str # Always a string - from metric function or parsing error message class ScoreWithFeedback(Prediction): score: float feedback: str class PredictorFeedbackFn(Protocol): def __call__( predictor_output: dict[str, Any], predictor_inputs: dict[str, Any], module_inputs: Example, module_outputs: Prediction, captured_trace: DSPyTrace, ) -> ScoreWithFeedback: """ This function is used to provide feedback to a specific predictor. The function is called with the following arguments: - predictor_output: The output of the predictor. - predictor_inputs: The inputs to the predictor. - module_inputs: The inputs to the whole program --- `Example`. - module_outputs: The outputs of the whole program --- `Prediction`. - captured_trace: The trace of the module's execution. # Shape of trace is: [predictor_invocation_idx -> Tuple[Predictor, PredictorInputs, Prediction]] # Each trace is a tuple of (Predictor, PredictorInputs, Prediction) The function should return a `ScoreWithFeedback` object. The feedback is a string that is used to guide the evolution of the predictor. """ ... 
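# Illustrative sketch of a PredictorFeedbackFn conforming to the protocol above.
# The `answer` field and the exact-match scoring are example assumptions only;
# real feedback functions can also inspect `captured_trace` to produce richer,
# predictor-specific guidance.
def example_exact_match_feedback(
    predictor_output: dict[str, Any],
    predictor_inputs: dict[str, Any],
    module_inputs: Example,
    module_outputs: Prediction,
    captured_trace: DSPyTrace,
) -> ScoreWithFeedback:
    gold = str(getattr(module_inputs, "answer", "")).strip().lower()
    pred = str(getattr(module_outputs, "answer", "")).strip().lower()
    score = 1.0 if gold and gold == pred else 0.0
    feedback = (
        "The predicted answer matches the gold answer."
        if score == 1.0
        else f"The predicted answer '{pred}' does not match the gold answer '{gold}'."
    )
    # ScoreWithFeedback is a Prediction, so keyword construction and item access
    # (fb["score"], fb["feedback"]) both work at the call site.
    return ScoreWithFeedback(score=score, feedback=feedback)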
class DspyAdapter(GEPAAdapter[Example, TraceData, Prediction]): def __init__( self, student_module, metric_fn: Callable, feedback_map: dict[str, Callable], failure_score=0.0, num_threads: int | None = None, add_format_failure_as_feedback: bool = False, rng: random.Random | None = None, reflection_lm=None, custom_instruction_proposer: "ProposalFn | None" = None, warn_on_score_mismatch: bool = True ): self.student = student_module self.metric_fn = metric_fn self.feedback_map = feedback_map self.failure_score = failure_score self.num_threads = num_threads self.add_format_failure_as_feedback = add_format_failure_as_feedback self.rng = rng or random.Random(0) self.reflection_lm = reflection_lm self.custom_instruction_proposer = custom_instruction_proposer self.warn_on_score_mismatch = warn_on_score_mismatch if self.custom_instruction_proposer is not None: # We are only overriding the propose_new_texts method when a custom # instruction proposer is provided. Otherwise, we use the GEPA # default propose_new_texts. def custom_propose_new_texts( candidate: dict[str, str], reflective_dataset: dict[str, list[dict[str, Any]]], components_to_update: list[str] ) -> dict[str, str]: if self.reflection_lm is not None: with dspy.context(lm=self.reflection_lm): return self.custom_instruction_proposer( candidate=candidate, reflective_dataset=reflective_dataset, components_to_update=components_to_update ) else: return self.custom_instruction_proposer( candidate=candidate, reflective_dataset=reflective_dataset, components_to_update=components_to_update ) self.propose_new_texts = custom_propose_new_texts # Cache predictor names/signatures self.named_predictors = list(self.student.named_predictors()) def build_program(self, candidate: dict[str, str]): new_prog = self.student.deepcopy() for name, pred in new_prog.named_predictors(): if name in candidate: pred.signature = pred.signature.with_instructions(candidate[name]) return new_prog def evaluate(self, batch, candidate, capture_traces=False): program = self.build_program(candidate) if capture_traces: # bootstrap_trace_data-like flow with trace capture from dspy.teleprompt import bootstrap_trace as bootstrap_trace_module eval_callback_metadata = {"disable_logging": True} trajs = bootstrap_trace_module.bootstrap_trace_data( program=program, dataset=batch, metric=self.metric_fn, num_threads=self.num_threads, raise_on_error=False, capture_failed_parses=True, failure_score=self.failure_score, format_failure_score=self.failure_score, callback_metadata=eval_callback_metadata, ) scores = [] outputs = [] for t in trajs: outputs.append(t["prediction"]) if hasattr(t["prediction"], "__class__") and t.get("score") is None: scores.append(self.failure_score) else: score = t["score"] if hasattr(score, "score"): score = score["score"] scores.append(score) return EvaluationBatch(outputs=outputs, scores=scores, trajectories=trajs) else: evaluator = Evaluate( devset=batch, metric=self.metric_fn, num_threads=self.num_threads, return_all_scores=True, failure_score=self.failure_score, provide_traceback=True, max_errors=len(batch) * 100 ) res = evaluator(program) outputs = [r[1] for r in res.results] scores = [r[2] for r in res.results] scores = [s["score"] if hasattr(s, "score") else s for s in scores] return EvaluationBatch(outputs=outputs, scores=scores, trajectories=None) def make_reflective_dataset(self, candidate, eval_batch, components_to_update) -> dict[str, list[ReflectiveExample]]: from dspy.teleprompt.bootstrap_trace import FailedPrediction program = 
self.build_program(candidate) ret_d: dict[str, list[ReflectiveExample]] = {} for pred_name in components_to_update: module = None for name, m in program.named_predictors(): if name == pred_name: module = m break assert module is not None items: list[ReflectiveExample] = [] for data in eval_batch.trajectories or []: trace = data["trace"] example = data["example"] prediction = data["prediction"] module_score = data["score"] if hasattr(module_score, "score"): module_score = module_score["score"] trace_instances = [t for t in trace if t[0].signature.equals(module.signature)] if not self.add_format_failure_as_feedback: trace_instances = [t for t in trace_instances if not isinstance(t[2], FailedPrediction)] if len(trace_instances) == 0: continue selected = None for t in trace_instances: if isinstance(t[2], FailedPrediction): selected = t break if selected is None: if isinstance(prediction, FailedPrediction): continue selected = self.rng.choice(trace_instances) inputs = selected[1] outputs = selected[2] new_inputs = {} new_outputs = {} contains_history = False history_key_name = None for input_key, input_val in inputs.items(): if isinstance(input_val, History): contains_history = True assert history_key_name is None history_key_name = input_key if contains_history: s = "```json\n" for i, message in enumerate(inputs[history_key_name].messages): s += f" {i}: {message}\n" s += "```" new_inputs["Context"] = s for input_key, input_val in inputs.items(): if contains_history and input_key == history_key_name: continue if isinstance(input_val, Type) and self.custom_instruction_proposer is not None: # Keep original object - will be properly formatted when sent to reflection LM new_inputs[input_key] = input_val else: new_inputs[input_key] = str(input_val) if isinstance(outputs, FailedPrediction): s = "Couldn't parse the output as per the expected output format. The model's raw response was:\n" s += "```\n" s += outputs.completion_text + "\n" s += "```\n\n" new_outputs = s else: for output_key, output_val in outputs.items(): new_outputs[output_key] = str(output_val) d = {"Inputs": new_inputs, "Generated Outputs": new_outputs} if isinstance(outputs, FailedPrediction): adapter = ChatAdapter() structure_instruction = "" for dd in adapter.format(module.signature, [], {}): structure_instruction += dd["role"] + ": " + dd["content"] + "\n" d["Feedback"] = "Your output failed to parse. Follow this structure:\n" + structure_instruction # d['score'] = self.failure_score else: feedback_fn = self.feedback_map[pred_name] fb = feedback_fn( predictor_output=outputs, predictor_inputs=inputs, module_inputs=example, module_outputs=prediction, captured_trace=trace, ) d["Feedback"] = fb["feedback"] if fb["score"] != module_score: if self.warn_on_score_mismatch: logger.warning("The score returned by the metric with pred_name is different from the overall metric score. This can indicate 2 things: Either the metric is non-deterministic (e.g., LLM-as-judge, Semantic score, etc.) or the metric returned a score specific to pred_name that differs from the module level score. Currently, GEPA does not support predictor level scoring (support coming soon), and only requires a feedback text to be provided, which can be specific to the predictor or program level. GEPA will ignore the differing score returned, and instead use module level score. You can safely ignore this warning if using a semantic metric, however, if this mismatch is caused due to predictor scoring, please return module-level scores. 
To disable this warning, set warn_on_score_mismatch=False.") self.warn_on_score_mismatch = False fb["score"] = module_score items.append(d) if len(items) == 0: # raise Exception(f"No valid predictions found for module {module.signature}.") continue ret_d[pred_name] = items if len(ret_d) == 0: raise Exception("No valid predictions found for any module.") return ret_d # TODO: The current DSPyAdapter implementation uses the GEPA default propose_new_texts. # We can potentially override this, to use the instruction proposal similar to MIPROv2. # def propose_new_texts( # self, # candidate: Dict[str, str], # reflective_dataset: Dict[str, List[Dict[str, Any]]], # components_to_update: List[str] # ) -> Dict[str, str]: # if self.adapter.propose_new_texts is not None: # return self.adapter.propose_new_texts(candidate, reflective_dataset, components_to_update) # from .instruction_proposal import InstructionProposalSignature # new_texts: Dict[str, str] = {} # for name in components_to_update: # base_instruction = candidate[name] # dataset_with_feedback = reflective_dataset[name] # new_texts[name] = InstructionProposalSignature.run( # lm=self.reflection_lm, # input_dict={ # "current_instruction_doc": base_instruction, # "dataset_with_feedback": dataset_with_feedback # } # )['new_instruction'] # return new_texts ``` -------------------------------------------------------------------------------- /dspy/teleprompt/gepa/instruction_proposal.py: -------------------------------------------------------------------------------- ```python from typing import Any from gepa.core.adapter import ProposalFn import dspy from dspy.adapters.types.base_type import Type from dspy.teleprompt.gepa.gepa_utils import ReflectiveExample class GenerateEnhancedMultimodalInstructionFromFeedback(dspy.Signature): """I provided an assistant with instructions to perform a task involving visual content, but the assistant's performance needs improvement based on the examples and feedback below. Your task is to write a better instruction for the assistant that addresses the specific issues identified in the feedback, with particular attention to how visual and textual information should be analyzed and integrated. ## Analysis Steps: 1. **Read the inputs carefully** and identify both the visual and textual input formats, understanding how they work together 2. **Read all the assistant responses and corresponding feedback** to understand what went wrong with visual analysis, text processing, or their integration 3. **Identify visual analysis patterns** - what visual features, relationships, or details are important for this task 4. **Identify domain-specific knowledge** about both visual and textual aspects, as this information may not be available to the assistant in the future 5. **Look for successful visual-textual integration strategies** and include these patterns in the instruction 6. 
**Address specific visual analysis issues** mentioned in the feedback ## Instruction Requirements: - **Clear task definition** explaining how to process both visual and textual inputs - **Visual analysis guidance** specific to this task (what to look for, how to describe, what features matter) - **Integration strategies** for combining visual observations with textual information - **Domain-specific knowledge** about visual concepts, terminology, or relationships - **Error prevention guidance** for common visual analysis mistakes shown in the feedback - **Precise, actionable language** for both visual and textual processing Focus on creating an instruction that helps the assistant properly analyze visual content, integrate it with textual information, and avoid the specific visual analysis mistakes shown in the examples.""" current_instruction = dspy.InputField( desc="The current instruction that was provided to the assistant to perform the multimodal task" ) examples_with_feedback = dspy.InputField( desc="Task examples with visual content showing inputs, assistant outputs, and feedback. " "Pay special attention to feedback about visual analysis accuracy, visual-textual integration, " "and any domain-specific visual knowledge that the assistant missed." ) improved_instruction = dspy.OutputField( desc="A better instruction for the assistant that addresses visual analysis issues, provides " "clear guidance on how to process and integrate visual and textual information, includes " "necessary visual domain knowledge, and prevents the visual analysis mistakes shown in the examples." ) class SingleComponentMultiModalProposer(dspy.Module): """ dspy.Module for proposing improved instructions based on feedback. """ def __init__(self): super().__init__() self.propose_instruction = dspy.Predict(GenerateEnhancedMultimodalInstructionFromFeedback) def forward(self, current_instruction: str, reflective_dataset: list[ReflectiveExample]) -> str: """ Generate an improved instruction based on current instruction and feedback examples. Args: current_instruction: The current instruction that needs improvement reflective_dataset: List of examples with inputs, outputs, and feedback May contain dspy.Image objects in inputs Returns: str: Improved instruction text """ # Format examples with enhanced pattern recognition formatted_examples, image_map = self._format_examples_with_pattern_analysis(reflective_dataset) # Build kwargs for the prediction call predict_kwargs = { "current_instruction": current_instruction, "examples_with_feedback": formatted_examples, } # Create a rich multimodal examples_with_feedback that includes both text and images predict_kwargs["examples_with_feedback"] = self._create_multimodal_examples(formatted_examples, image_map) # Use current dspy LM settings (GEPA will pass reflection_lm via context) result = self.propose_instruction(**predict_kwargs) return result.improved_instruction def _format_examples_with_pattern_analysis( self, reflective_dataset: list[ReflectiveExample] ) -> tuple[str, dict[int, list[Type]]]: """ Format examples with pattern analysis and feedback categorization. 
Returns: tuple: (formatted_text_with_patterns, image_map) """ # First, use the existing proven formatting approach formatted_examples, image_map = self._format_examples_for_instruction_generation(reflective_dataset) # Enhanced analysis: categorize feedback patterns feedback_analysis = self._analyze_feedback_patterns(reflective_dataset) # Add pattern analysis to the formatted examples if feedback_analysis["summary"]: pattern_summary = self._create_pattern_summary(feedback_analysis) enhanced_examples = f"{pattern_summary}\n\n{formatted_examples}" return enhanced_examples, image_map return formatted_examples, image_map def _analyze_feedback_patterns(self, reflective_dataset: list[ReflectiveExample]) -> dict[str, Any]: """ Analyze feedback patterns to provide better context for instruction generation. Categorizes feedback into: - Error patterns: Common mistakes and their types - Success patterns: What worked well and should be preserved/emphasized - Domain knowledge gaps: Missing information that should be included - Task-specific guidance: Specific requirements or edge cases """ analysis = { "error_patterns": [], "success_patterns": [], "domain_knowledge_gaps": [], "task_specific_guidance": [], "summary": "", } # Simple pattern recognition - could be enhanced further for example in reflective_dataset: feedback = example.get("Feedback", "").lower() # Identify error patterns if any(error_word in feedback for error_word in ["incorrect", "wrong", "error", "failed", "missing"]): analysis["error_patterns"].append(feedback) # Identify success patterns if any( success_word in feedback for success_word in ["correct", "good", "accurate", "well", "successfully"] ): analysis["success_patterns"].append(feedback) # Identify domain knowledge needs if any( knowledge_word in feedback for knowledge_word in ["should know", "domain", "specific", "context", "background"] ): analysis["domain_knowledge_gaps"].append(feedback) # Create summary if patterns were found if any(analysis[key] for key in ["error_patterns", "success_patterns", "domain_knowledge_gaps"]): analysis["summary"] = ( f"Patterns identified: {len(analysis['error_patterns'])} error(s), {len(analysis['success_patterns'])} success(es), {len(analysis['domain_knowledge_gaps'])} knowledge gap(s)" ) return analysis def _create_pattern_summary(self, feedback_analysis: dict[str, Any]) -> str: """Create a summary of feedback patterns to help guide instruction generation.""" summary_parts = ["## Feedback Pattern Analysis\n"] if feedback_analysis["error_patterns"]: summary_parts.append(f"**Common Issues Found ({len(feedback_analysis['error_patterns'])} examples):**") summary_parts.append("Focus on preventing these types of mistakes in the new instruction.\n") if feedback_analysis["success_patterns"]: summary_parts.append( f"**Successful Approaches Found ({len(feedback_analysis['success_patterns'])} examples):**" ) summary_parts.append("Build on these successful strategies in the new instruction.\n") if feedback_analysis["domain_knowledge_gaps"]: summary_parts.append( f"**Domain Knowledge Needs Identified ({len(feedback_analysis['domain_knowledge_gaps'])} examples):**" ) summary_parts.append("Include this specialized knowledge in the new instruction.\n") return "\n".join(summary_parts) def _format_examples_for_instruction_generation( self, reflective_dataset: list[ReflectiveExample] ) -> tuple[str, dict[int, list[Type]]]: """ Format examples using GEPA's markdown structure while preserving image objects. 
Returns: tuple: (formatted_text, image_map) where image_map maps example_index -> list[images] """ def render_value_with_images(value, level=3, example_images=None): if example_images is None: example_images = [] if isinstance(value, Type): image_idx = len(example_images) + 1 example_images.append(value) return f"[IMAGE-{image_idx} - see visual content]\n\n" elif isinstance(value, dict): s = "" for k, v in value.items(): s += f"{'#' * level} {k}\n" s += render_value_with_images(v, min(level + 1, 6), example_images) if not value: s += "\n" return s elif isinstance(value, (list, tuple)): s = "" for i, item in enumerate(value): s += f"{'#' * level} Item {i + 1}\n" s += render_value_with_images(item, min(level + 1, 6), example_images) if not value: s += "\n" return s else: return f"{str(value).strip()}\n\n" def convert_sample_to_markdown_with_images(sample, example_num): example_images = [] s = f"# Example {example_num}\n" for key, val in sample.items(): s += f"## {key}\n" s += render_value_with_images(val, level=3, example_images=example_images) return s, example_images formatted_parts = [] image_map = {} for i, example_data in enumerate(reflective_dataset): formatted_example, example_images = convert_sample_to_markdown_with_images(example_data, i + 1) formatted_parts.append(formatted_example) if example_images: image_map[i] = example_images formatted_text = "\n\n".join(formatted_parts) if image_map: total_images = sum(len(imgs) for imgs in image_map.values()) formatted_text = ( f"The examples below include visual content ({total_images} images total). " "Please analyze both the text and visual elements when suggesting improvements.\n\n" + formatted_text ) return formatted_text, image_map def _create_multimodal_examples(self, formatted_text: str, image_map: dict[int, list[Type]]) -> Any: """ Create a multimodal input that contains both text and images for the reflection LM. Args: formatted_text: The formatted text with image placeholders image_map: Dictionary mapping example_index -> list[images] for structured access """ if not image_map: return formatted_text # Collect all images from all examples all_images = [] for example_images in image_map.values(): all_images.extend(example_images) multimodal_content = [formatted_text] multimodal_content.extend(all_images) return multimodal_content class MultiModalInstructionProposer(ProposalFn): """GEPA-compatible multimodal instruction proposer. This class handles multimodal inputs (like dspy.Image) during GEPA optimization by using a single-component proposer for each component that needs to be updated. """ def __init__(self): self.single_proposer = SingleComponentMultiModalProposer() def __call__( self, candidate: dict[str, str], reflective_dataset: dict[str, list[ReflectiveExample]], components_to_update: list[str], ) -> dict[str, str]: """GEPA-compatible proposal function. Args: candidate: Current component name -> instruction mapping reflective_dataset: Component name -> list of reflective examples components_to_update: List of component names to update Returns: dict: Component name -> new instruction mapping """ updated_components = {} for component_name in components_to_update: if component_name in candidate and component_name in reflective_dataset: current_instruction = candidate[component_name] component_reflective_data = reflective_dataset[component_name] # Call the single-instruction proposer. 
# # In the future, proposals could consider multiple components' instructions, # instead of just the current instruction, for more holistic instruction proposals. new_instruction = self.single_proposer( current_instruction=current_instruction, reflective_dataset=component_reflective_data ) updated_components[component_name] = new_instruction return updated_components ``` -------------------------------------------------------------------------------- /docs/docs/learn/programming/7-assertions.md: -------------------------------------------------------------------------------- ```markdown # DSPy Assertions !!! warning "Assertions are deprecated and NOT supported. Please use the `dspy.Refine` module (or `dspy.BestOfN`) instead." The content below is deprecated and is scheduled to be removed. ## Introduction Language models (LMs) have transformed how we interact with machine learning, offering vast capabilities in natural language understanding and generation. However, ensuring these models adhere to domain-specific constraints remains a challenge. Despite the growth of techniques like fine-tuning or “prompt engineering”, these approaches are extremely tedious and rely on heavy, manual hand-waving to guide the LMs in adhering to specific constraints. Even DSPy's modular approach to programming prompting pipelines lacks mechanisms to effectively and automatically enforce these constraints. To address this, we introduce DSPy Assertions, a feature within the DSPy framework designed to automate the enforcement of computational constraints on LMs. DSPy Assertions empower developers to guide LMs towards desired outcomes with minimal manual intervention, enhancing the reliability, predictability, and correctness of LM outputs. ### dspy.Assert and dspy.Suggest API We introduce two primary constructs within DSPy Assertions: - **`dspy.Assert`**: - **Parameters**: - `constraint (bool)`: Outcome of a Python-defined boolean validation check. - `msg (Optional[str])`: User-defined error message providing feedback or correction guidance. - `backtrack (Optional[module])`: Specifies the target module for retry attempts upon constraint failure. The default backtracking module is the last module before the assertion. - **Behavior**: Initiates a retry upon failure, dynamically adjusting the pipeline's execution. If failures persist, it halts execution and raises a `dspy.AssertionError`. - **`dspy.Suggest`**: - **Parameters**: Similar to `dspy.Assert`. - **Behavior**: Encourages self-refinement through retries without enforcing hard stops. Logs failures after the maximum backtracking attempts and continues execution. - **dspy.Assert vs. Python Assertions**: Unlike conventional Python `assert` statements that terminate the program upon failure, `dspy.Assert` employs a sophisticated retry mechanism, allowing the pipeline to adjust. Specifically, when a constraint is not met: - Backtracking Mechanism: An under-the-hood backtracking is initiated, offering the model a chance to self-refine and proceed, which is done through signature modification. - Dynamic Signature Modification: internally modifying your DSPy program’s Signature by adding the following fields: - Past Output: your model's past output that did not pass the validation_fn - Instruction: your user-defined feedback message on what went wrong and what could be fixed If the error continues past the `max_backtracking_attempts`, then `dspy.Assert` will halt the pipeline execution, alerting you with a `dspy.AssertionError`.
This ensures your program doesn't continue executing with “bad” LM behavior and immediately highlights sample failure outputs for user assessment. - **dspy.Suggest vs. dspy.Assert**: `dspy.Suggest` on the other hand offers a softer approach. It maintains the same retry backtracking as `dspy.Assert` but instead serves as a gentle nudger. If the model outputs cannot pass the model constraints after the `max_backtracking_attempts`, `dspy.Suggest` will log the persistent failure and continue execution of the program on the rest of the data. This ensures the LM pipeline works in a "best-effort" manner without halting execution. - **`dspy.Suggest`** statements are best utilized as "helpers" during the evaluation phase, offering guidance and potential corrections without halting the pipeline. - **`dspy.Assert`** statements are recommended during the development stage as "checkers" to ensure the LM behaves as expected, providing a robust mechanism for identifying and addressing errors early in the development cycle. ## Use Case: Including Assertions in DSPy Programs We start with using an example of a multi-hop QA SimplifiedBaleen pipeline as defined in the intro walkthrough. ```python class SimplifiedBaleen(dspy.Module): def __init__(self, passages_per_hop=2, max_hops=2): super().__init__() self.generate_query = [dspy.ChainOfThought(GenerateSearchQuery) for _ in range(max_hops)] self.retrieve = dspy.Retrieve(k=passages_per_hop) self.generate_answer = dspy.ChainOfThought(GenerateAnswer) self.max_hops = max_hops def forward(self, question): context = [] prev_queries = [question] for hop in range(self.max_hops): query = self.generate_query[hop](context=context, question=question).query prev_queries.append(query) passages = self.retrieve(query).passages context = deduplicate(context + passages) pred = self.generate_answer(context=context, question=question) pred = dspy.Prediction(context=context, answer=pred.answer) return pred baleen = SimplifiedBaleen() baleen(question = "Which award did Gary Zukav's first book receive?") ``` To include DSPy Assertions, we simply define our validation functions and declare our assertions following the respective model generation. For this use case, suppose we want to impose the following constraints: 1. Length - each query should be less than 100 characters 2. Uniqueness - each generated query should differ from previously-generated queries. We can define these validation checks as boolean functions: ```python #simplistic boolean check for query length len(query) <= 100 #Python function for validating distinct queries def validate_query_distinction_local(previous_queries, query): """check if query is distinct from previous queries""" if previous_queries == []: return True if dspy.evaluate.answer_exact_match_str(query, previous_queries, frac=0.8): return False return True ``` We can declare these validation checks through `dspy.Suggest` statements (as we want to test the program in a best-effort demonstration). We want to keep these after the query generation `query = self.generate_query[hop](context=context, question=question).query`. 
```python dspy.Suggest( len(query) <= 100, "Query should be short and less than 100 characters", target_module=self.generate_query ) dspy.Suggest( validate_query_distinction_local(prev_queries, query), "Query should be distinct from: " + "; ".join(f"{i+1}) {q}" for i, q in enumerate(prev_queries)), target_module=self.generate_query ) ``` It is recommended to define a program with assertions separately than your original program if you are doing comparative evaluation for the effect of assertions. If not, feel free to set Assertions away! Let's take a look at how the SimplifiedBaleen program will look with Assertions included: ```python class SimplifiedBaleenAssertions(dspy.Module): def __init__(self, passages_per_hop=2, max_hops=2): super().__init__() self.generate_query = [dspy.ChainOfThought(GenerateSearchQuery) for _ in range(max_hops)] self.retrieve = dspy.Retrieve(k=passages_per_hop) self.generate_answer = dspy.ChainOfThought(GenerateAnswer) self.max_hops = max_hops def forward(self, question): context = [] prev_queries = [question] for hop in range(self.max_hops): query = self.generate_query[hop](context=context, question=question).query dspy.Suggest( len(query) <= 100, "Query should be short and less than 100 characters", target_module=self.generate_query ) dspy.Suggest( validate_query_distinction_local(prev_queries, query), "Query should be distinct from: " + "; ".join(f"{i+1}) {q}" for i, q in enumerate(prev_queries)), target_module=self.generate_query ) prev_queries.append(query) passages = self.retrieve(query).passages context = deduplicate(context + passages) if all_queries_distinct(prev_queries): self.passed_suggestions += 1 pred = self.generate_answer(context=context, question=question) pred = dspy.Prediction(context=context, answer=pred.answer) return pred ``` Now calling programs with DSPy Assertions requires one last step, and that is transforming the program to wrap it with internal assertions backtracking and Retry logic. ```python from dspy.primitives.assertions import assert_transform_module, backtrack_handler baleen_with_assertions = assert_transform_module(SimplifiedBaleenAssertions(), backtrack_handler) # backtrack_handler is parameterized over a few settings for the backtracking mechanism # To change the number of max retry attempts, you can do baleen_with_assertions_retry_once = assert_transform_module(SimplifiedBaleenAssertions(), functools.partial(backtrack_handler, max_backtracks=1)) ``` Alternatively, you can also directly call `activate_assertions` on the program with `dspy.Assert/Suggest` statements using the default backtracking mechanism (`max_backtracks=2`): ```python baleen_with_assertions = SimplifiedBaleenAssertions().activate_assertions() ``` Now let's take a look at the internal LM backtracking by inspecting the history of the LM query generations. Here we see that when a query fails to pass the validation check of being less than 100 characters, its internal `GenerateSearchQuery` signature is dynamically modified during the backtracking+Retry process to include the past query and the corresponding user-defined instruction: `"Query should be short and less than 100 characters"`. ```text Write a simple search query that will help answer a complex question. --- Follow the following format. Context: may contain relevant facts Question: ${question} Reasoning: Let's think step by step in order to ${produce the query}. We ... 
Query: ${query} --- Context: [1] «Kerry Condon | Kerry Condon (born 4 January 1983) is [...]» [2] «Corona Riccardo | Corona Riccardo (c. 1878October 15, 1917) was [...]» Question: Who acted in the shot film The Shore and is also the youngest actress ever to play Ophelia in a Royal Shakespeare Company production of "Hamlet." ? Reasoning: Let's think step by step in order to find the answer to this question. First, we need to identify the actress who played Ophelia in a Royal Shakespeare Company production of "Hamlet." Then, we need to find out if this actress also acted in the short film "The Shore." Query: "actress who played Ophelia in Royal Shakespeare Company production of Hamlet" + "actress in short film The Shore" Write a simple search query that will help answer a complex question. --- Follow the following format. Context: may contain relevant facts Question: ${question} Past Query: past output with errors Instructions: Some instructions you must satisfy Query: ${query} --- Context: [1] «Kerry Condon | Kerry Condon (born 4 January 1983) is an Irish television and film actress, best known for her role as Octavia of the Julii in the HBO/BBC series "Rome," as Stacey Ehrmantraut in AMC's "Better Call Saul" and as the voice of F.R.I.D.A.Y. in various films in the Marvel Cinematic Universe. She is also the youngest actress ever to play Ophelia in a Royal Shakespeare Company production of "Hamlet."» [2] «Corona Riccardo | Corona Riccardo (c. 1878October 15, 1917) was an Italian born American actress who had a brief Broadway stage career before leaving to become a wife and mother. Born in Naples she came to acting in 1894 playing a Mexican girl in a play at the Empire Theatre. Wilson Barrett engaged her for a role in his play "The Sign of the Cross" which he took on tour of the United States. Riccardo played the role of Ancaria and later played Berenice in the same play. Robert B. Mantell in 1898 who struck by her beauty also cast her in two Shakespeare plays, "Romeo and Juliet" and "Othello". Author Lewis Strang writing in 1899 said Riccardo was the most promising actress in America at the time. Towards the end of 1898 Mantell chose her for another Shakespeare part, Ophelia im Hamlet. Afterwards she was due to join Augustin Daly's Theatre Company but Daly died in 1899. In 1899 she gained her biggest fame by playing Iras in the first stage production of Ben-Hur.» Question: Who acted in the shot film The Shore and is also the youngest actress ever to play Ophelia in a Royal Shakespeare Company production of "Hamlet." ? Past Query: "actress who played Ophelia in Royal Shakespeare Company production of Hamlet" + "actress in short film The Shore" Instructions: Query should be short and less than 100 characters Query: "actress Ophelia RSC Hamlet" + "actress The Shore" ``` ## Assertion-Driven Optimizations DSPy Assertions work with optimizations that DSPy offers, particularly with `BootstrapFewShotWithRandomSearch`, including the following settings: - Compilation with Assertions This includes assertion-driven example bootstrapping and counterexample bootstrapping during compilation. The teacher model for bootstrapping few-shot demonstrations can make use of DSPy Assertions to offer robust bootstrapped examples for the student model to learn from during inference. In this setting, the student model does not perform assertion aware optimizations (backtracking and retry) during inference. 
- Compilation + Inference with Assertions This includes assertion-driven optimizations in both compilation and inference. Now the teacher model offers assertion-driven examples, but the student can further optimize with assertions of its own during inference time. ```python teleprompter = BootstrapFewShotWithRandomSearch( metric=validate_context_and_answer_and_hops, max_bootstrapped_demos=max_bootstrapped_demos, num_candidate_programs=6, ) # Compilation with Assertions compiled_with_assertions_baleen = teleprompter.compile(student=baleen, teacher=baleen_with_assertions, trainset=trainset, valset=devset) # Compilation + Inference with Assertions compiled_baleen_with_assertions = teleprompter.compile(student=baleen_with_assertions, teacher=baleen_with_assertions, trainset=trainset, valset=devset) ``` ```
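Since the deprecation note at the top of this document points to `dspy.Refine` as the supported replacement, here is a minimal sketch of how the query-length constraint above might be expressed without assertions. It assumes `dspy.Refine`'s `(module, N, reward_fn, threshold)` interface and uses a simplified stand-in signature rather than the full Baleen pipeline, so treat it as an illustration, not a drop-in port.

```python
import dspy

# Any configured LM works here; the model name is only an example.
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

# Simplified stand-in for the query-generation step in the pipeline above.
generate_query = dspy.ChainOfThought("context, question -> query")

# Reward the same behavior dspy.Suggest nudged toward: queries under 100 characters.
def short_query_reward(args, pred) -> float:
    return 1.0 if len(pred.query) <= 100 else 0.0

# Run the module up to N times, returning the first prediction whose reward
# meets the threshold (falling back to the best attempt otherwise).
refined_generate_query = dspy.Refine(
    module=generate_query,
    N=3,
    reward_fn=short_query_reward,
    threshold=1.0,
)

result = refined_generate_query(
    context=[],
    question="Which award did Gary Zukav's first book receive?",
)
print(result.query)
```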