This is page 2 of 14. Use http://codebase.md/stanfordnlp/dspy?lines=false&page={x} to view the full context. # Directory Structure ``` ├── .github │ ├── .internal_dspyai │ │ ├── internals │ │ │ ├── build-and-release.md │ │ │ └── release-checklist.md │ │ └── pyproject.toml │ ├── .tmp │ │ └── .generated-actions │ │ └── run-pypi-publish-in-docker-container │ │ └── action.yml │ ├── ISSUE_TEMPLATE │ │ ├── bug_report.yml │ │ └── feature_request.yml │ ├── PULL_REQUEST_TEMPLATE │ │ └── pull_request_template.md │ ├── workflow_scripts │ │ └── install_testpypi_pkg.sh │ └── workflows │ ├── build_and_release.yml │ ├── build_utils │ │ └── test_version.py │ ├── docs-push.yml │ ├── precommits_check.yml │ └── run_tests.yml ├── .gitignore ├── .pre-commit-config.yaml ├── CONTRIBUTING.md ├── docs │ ├── .gitignore │ ├── docs │ │ ├── api │ │ │ ├── adapters │ │ │ │ ├── Adapter.md │ │ │ │ ├── ChatAdapter.md │ │ │ │ ├── JSONAdapter.md │ │ │ │ └── TwoStepAdapter.md │ │ │ ├── evaluation │ │ │ │ ├── answer_exact_match.md │ │ │ │ ├── answer_passage_match.md │ │ │ │ ├── CompleteAndGrounded.md │ │ │ │ ├── Evaluate.md │ │ │ │ ├── EvaluationResult.md │ │ │ │ └── SemanticF1.md │ │ │ ├── experimental │ │ │ │ ├── Citations.md │ │ │ │ └── Document.md │ │ │ ├── index.md │ │ │ ├── models │ │ │ │ ├── Embedder.md │ │ │ │ └── LM.md │ │ │ ├── modules │ │ │ │ ├── BestOfN.md │ │ │ │ ├── ChainOfThought.md │ │ │ │ ├── CodeAct.md │ │ │ │ ├── Module.md │ │ │ │ ├── MultiChainComparison.md │ │ │ │ ├── Parallel.md │ │ │ │ ├── Predict.md │ │ │ │ ├── ProgramOfThought.md │ │ │ │ ├── ReAct.md │ │ │ │ └── Refine.md │ │ │ ├── optimizers │ │ │ │ ├── BetterTogether.md │ │ │ │ ├── BootstrapFewShot.md │ │ │ │ ├── BootstrapFewShotWithRandomSearch.md │ │ │ │ ├── BootstrapFinetune.md │ │ │ │ ├── BootstrapRS.md │ │ │ │ ├── COPRO.md │ │ │ │ ├── Ensemble.md │ │ │ │ ├── GEPA │ │ │ │ │ ├── GEPA_Advanced.md │ │ │ │ │ └── overview.md │ │ │ │ ├── InferRules.md │ │ │ │ ├── KNN.md │ │ │ │ ├── KNNFewShot.md │ │ │ │ ├── LabeledFewShot.md │ 
│ │ │ ├── MIPROv2.md │ │ │ │ └── SIMBA.md │ │ │ ├── primitives │ │ │ │ ├── Audio.md │ │ │ │ ├── Code.md │ │ │ │ ├── Example.md │ │ │ │ ├── History.md │ │ │ │ ├── Image.md │ │ │ │ ├── Prediction.md │ │ │ │ ├── Tool.md │ │ │ │ └── ToolCalls.md │ │ │ ├── signatures │ │ │ │ ├── InputField.md │ │ │ │ ├── OutputField.md │ │ │ │ └── Signature.md │ │ │ ├── tools │ │ │ │ ├── ColBERTv2.md │ │ │ │ ├── Embeddings.md │ │ │ │ └── PythonInterpreter.md │ │ │ └── utils │ │ │ ├── asyncify.md │ │ │ ├── configure_cache.md │ │ │ ├── disable_litellm_logging.md │ │ │ ├── disable_logging.md │ │ │ ├── enable_litellm_logging.md │ │ │ ├── enable_logging.md │ │ │ ├── inspect_history.md │ │ │ ├── load.md │ │ │ ├── StatusMessage.md │ │ │ ├── StatusMessageProvider.md │ │ │ ├── streamify.md │ │ │ └── StreamListener.md │ │ ├── cheatsheet.md │ │ ├── community │ │ │ ├── community-resources.md │ │ │ ├── how-to-contribute.md │ │ │ └── use-cases.md │ │ ├── deep-dive │ │ │ └── data-handling │ │ │ ├── built-in-datasets.md │ │ │ ├── examples.md │ │ │ ├── img │ │ │ │ └── data-loading.png │ │ │ └── loading-custom-data.md │ │ ├── faqs.md │ │ ├── index.md │ │ ├── js │ │ │ └── runllm-widget.js │ │ ├── learn │ │ │ ├── evaluation │ │ │ │ ├── data.md │ │ │ │ ├── metrics.md │ │ │ │ └── overview.md │ │ │ ├── figures │ │ │ │ ├── native_tool_call.png │ │ │ │ └── teleprompter-classes.png │ │ │ ├── index.md │ │ │ ├── optimization │ │ │ │ ├── optimizers.md │ │ │ │ └── overview.md │ │ │ └── programming │ │ │ ├── 7-assertions.md │ │ │ ├── adapters.md │ │ │ ├── language_models.md │ │ │ ├── mcp.md │ │ │ ├── modules.md │ │ │ ├── overview.md │ │ │ ├── signatures.md │ │ │ └── tools.md │ │ ├── production │ │ │ └── index.md │ │ ├── roadmap.md │ │ ├── static │ │ │ ├── .nojekyll │ │ │ └── img │ │ │ ├── dspy_logo.png │ │ │ ├── logo.png │ │ │ ├── mlflow-tracing-rag.png │ │ │ ├── modular.png │ │ │ ├── optimize.png │ │ │ ├── undraw_docusaurus_mountain.svg │ │ │ ├── undraw_docusaurus_react.svg │ │ │ ├── undraw_docusaurus_tree.svg │ │ │ 
└── universal_compatibility.png │ │ ├── stylesheets │ │ │ └── extra.css │ │ └── tutorials │ │ ├── agents │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-agent.png │ │ ├── ai_text_game │ │ │ └── index.md │ │ ├── async │ │ │ └── index.md │ │ ├── audio │ │ │ └── index.ipynb │ │ ├── build_ai_program │ │ │ └── index.md │ │ ├── cache │ │ │ └── index.md │ │ ├── classification │ │ │ └── index.md │ │ ├── classification_finetuning │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-classification.png │ │ ├── conversation_history │ │ │ └── index.md │ │ ├── core_development │ │ │ └── index.md │ │ ├── custom_module │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-custom-module.png │ │ ├── customer_service_agent │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-customer-service-agent.png │ │ ├── deployment │ │ │ ├── dspy_mlflow_ui.png │ │ │ └── index.md │ │ ├── email_extraction │ │ │ ├── index.md │ │ │ └── mlflow-tracing-email-extraction.png │ │ ├── entity_extraction │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-entity-extraction.png │ │ ├── games │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-agent.png │ │ ├── gepa_ai_program │ │ │ └── index.md │ │ ├── gepa_aime │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-aime.png │ │ │ └── mlflow-tracking-gepa-aime-optimization.png │ │ ├── gepa_facilitysupportanalyzer │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-support.png │ │ │ └── mlflow-tracking-gepa-support-optimization.png │ │ ├── gepa_papillon │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-papilon.png │ │ │ └── mlflow-tracking-gepa-papilon-optimization.png │ │ ├── image_generation_prompting │ │ │ └── index.ipynb │ │ ├── index.md │ │ ├── llms_txt_generation │ │ │ └── index.md │ │ ├── math │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-math.png │ │ ├── mcp │ │ │ └── index.md │ │ ├── mem0_react_agent │ │ │ └── index.md │ │ ├── multihop_search │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-multi-hop.png │ │ ├── observability │ │ │ ├── index.md │ │ │ ├── 
mlflow_trace_ui_navigation.gif │ │ │ ├── mlflow_trace_ui.png │ │ │ └── mlflow_trace_view.png │ │ ├── optimize_ai_program │ │ │ └── index.md │ │ ├── optimizer_tracking │ │ │ ├── child_run.png │ │ │ ├── experiment.png │ │ │ ├── index.md │ │ │ └── parent_run.png │ │ ├── output_refinement │ │ │ └── best-of-n-and-refine.md │ │ ├── papillon │ │ │ └── index.md │ │ ├── program_of_thought │ │ │ └── index.ipynb │ │ ├── rag │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-rag.png │ │ ├── real_world_examples │ │ │ └── index.md │ │ ├── rl_ai_program │ │ │ └── index.md │ │ ├── rl_multihop │ │ │ └── index.ipynb │ │ ├── rl_papillon │ │ │ └── index.ipynb │ │ ├── sample_code_generation │ │ │ └── index.md │ │ ├── saving │ │ │ └── index.md │ │ ├── streaming │ │ │ └── index.md │ │ ├── tool_use │ │ │ └── index.ipynb │ │ └── yahoo_finance_react │ │ └── index.md │ ├── mkdocs.yml │ ├── overrides │ │ ├── home.html │ │ ├── main.html │ │ └── partials │ │ └── tabs.html │ ├── Pipfile │ ├── Pipfile.lock │ ├── README.md │ ├── requirements.txt │ ├── scripts │ │ ├── generate_api_docs.py │ │ └── generate_api_summary.py │ └── vercel.json ├── dspy │ ├── __init__.py │ ├── __metadata__.py │ ├── adapters │ │ ├── __init__.py │ │ ├── baml_adapter.py │ │ ├── base.py │ │ ├── chat_adapter.py │ │ ├── json_adapter.py │ │ ├── two_step_adapter.py │ │ ├── types │ │ │ ├── __init__.py │ │ │ ├── audio.py │ │ │ ├── base_type.py │ │ │ ├── citation.py │ │ │ ├── code.py │ │ │ ├── document.py │ │ │ ├── history.py │ │ │ ├── image.py │ │ │ └── tool.py │ │ ├── utils.py │ │ └── xml_adapter.py │ ├── clients │ │ ├── __init__.py │ │ ├── base_lm.py │ │ ├── cache.py │ │ ├── databricks.py │ │ ├── embedding.py │ │ ├── lm_local_arbor.py │ │ ├── lm_local.py │ │ ├── lm.py │ │ ├── openai.py │ │ ├── provider.py │ │ └── utils_finetune.py │ ├── datasets │ │ ├── __init__.py │ │ ├── alfworld │ │ │ ├── __init__.py │ │ │ ├── alfworld.py │ │ │ └── base_config.yml │ │ ├── colors.py │ │ ├── dataloader.py │ │ ├── dataset.py │ │ ├── gsm8k.py │ │ ├── 
hotpotqa.py │ │ └── math.py │ ├── dsp │ │ ├── __init__.py │ │ ├── colbertv2.py │ │ └── utils │ │ ├── __init__.py │ │ ├── dpr.py │ │ ├── settings.py │ │ └── utils.py │ ├── evaluate │ │ ├── __init__.py │ │ ├── auto_evaluation.py │ │ ├── evaluate.py │ │ └── metrics.py │ ├── experimental │ │ └── __init__.py │ ├── predict │ │ ├── __init__.py │ │ ├── aggregation.py │ │ ├── avatar │ │ │ ├── __init__.py │ │ │ ├── avatar.py │ │ │ ├── models.py │ │ │ └── signatures.py │ │ ├── best_of_n.py │ │ ├── chain_of_thought.py │ │ ├── code_act.py │ │ ├── knn.py │ │ ├── multi_chain_comparison.py │ │ ├── parallel.py │ │ ├── parameter.py │ │ ├── predict.py │ │ ├── program_of_thought.py │ │ ├── react.py │ │ ├── refine.py │ │ └── retry.py │ ├── primitives │ │ ├── __init__.py │ │ ├── base_module.py │ │ ├── example.py │ │ ├── module.py │ │ ├── prediction.py │ │ ├── python_interpreter.py │ │ └── runner.js │ ├── propose │ │ ├── __init__.py │ │ ├── dataset_summary_generator.py │ │ ├── grounded_proposer.py │ │ ├── propose_base.py │ │ └── utils.py │ ├── retrievers │ │ ├── __init__.py │ │ ├── databricks_rm.py │ │ ├── embeddings.py │ │ ├── retrieve.py │ │ └── weaviate_rm.py │ ├── signatures │ │ ├── __init__.py │ │ ├── field.py │ │ ├── signature.py │ │ └── utils.py │ ├── streaming │ │ ├── __init__.py │ │ ├── messages.py │ │ ├── streamify.py │ │ └── streaming_listener.py │ ├── teleprompt │ │ ├── __init__.py │ │ ├── avatar_optimizer.py │ │ ├── bettertogether.py │ │ ├── bootstrap_finetune.py │ │ ├── bootstrap_trace.py │ │ ├── bootstrap.py │ │ ├── copro_optimizer.py │ │ ├── ensemble.py │ │ ├── gepa │ │ │ ├── __init__.py │ │ │ ├── gepa_utils.py │ │ │ ├── gepa.py │ │ │ └── instruction_proposal.py │ │ ├── grpo.py │ │ ├── infer_rules.py │ │ ├── knn_fewshot.py │ │ ├── mipro_optimizer_v2.py │ │ ├── random_search.py │ │ ├── signature_opt.py │ │ ├── simba_utils.py │ │ ├── simba.py │ │ ├── teleprompt_optuna.py │ │ ├── teleprompt.py │ │ ├── utils.py │ │ └── vanilla.py │ └── utils │ ├── __init__.py │ ├── 
annotation.py │ ├── asyncify.py │ ├── caching.py │ ├── callback.py │ ├── dummies.py │ ├── exceptions.py │ ├── hasher.py │ ├── inspect_history.py │ ├── langchain_tool.py │ ├── logging_utils.py │ ├── mcp.py │ ├── parallelizer.py │ ├── saving.py │ ├── syncify.py │ ├── unbatchify.py │ └── usage_tracker.py ├── LICENSE ├── pyproject.toml ├── README.md ├── tests │ ├── __init__.py │ ├── adapters │ │ ├── test_adapter_utils.py │ │ ├── test_baml_adapter.py │ │ ├── test_base_type.py │ │ ├── test_chat_adapter.py │ │ ├── test_citation.py │ │ ├── test_code.py │ │ ├── test_document.py │ │ ├── test_json_adapter.py │ │ ├── test_tool.py │ │ ├── test_two_step_adapter.py │ │ └── test_xml_adapter.py │ ├── callback │ │ └── test_callback.py │ ├── clients │ │ ├── test_cache.py │ │ ├── test_databricks.py │ │ ├── test_embedding.py │ │ ├── test_inspect_global_history.py │ │ └── test_lm.py │ ├── conftest.py │ ├── datasets │ │ └── test_dataset.py │ ├── docs │ │ └── test_mkdocs_links.py │ ├── evaluate │ │ ├── test_evaluate.py │ │ └── test_metrics.py │ ├── examples │ │ └── test_baleen.py │ ├── metadata │ │ └── test_metadata.py │ ├── predict │ │ ├── test_aggregation.py │ │ ├── test_best_of_n.py │ │ ├── test_chain_of_thought.py │ │ ├── test_code_act.py │ │ ├── test_knn.py │ │ ├── test_multi_chain_comparison.py │ │ ├── test_parallel.py │ │ ├── test_predict.py │ │ ├── test_program_of_thought.py │ │ ├── test_react.py │ │ ├── test_refine.py │ │ └── test_retry.py │ ├── primitives │ │ ├── resources │ │ │ └── saved_program.json │ │ ├── test_base_module.py │ │ ├── test_example.py │ │ ├── test_module.py │ │ └── test_python_interpreter.py │ ├── propose │ │ └── test_grounded_proposer.py │ ├── README.md │ ├── reliability │ │ ├── __init__.py │ │ ├── complex_types │ │ │ └── generated │ │ │ ├── test_many_types_1 │ │ │ │ ├── inputs │ │ │ │ │ ├── input1.json │ │ │ │ │ └── input2.json │ │ │ │ ├── program.py │ │ │ │ └── schema.json │ │ │ ├── test_nesting_1 │ │ │ │ ├── inputs │ │ │ │ │ ├── input1.json │ │ │ │ │ └── 
input2.json │ │ │ │ ├── program.py │ │ │ │ └── schema.json │ │ │ └── test_nesting_2 │ │ │ ├── inputs │ │ │ │ └── input1.json │ │ │ ├── program.py │ │ │ └── schema.json │ │ ├── conftest.py │ │ ├── generate │ │ │ ├── __init__.py │ │ │ ├── __main__.py │ │ │ └── utils.py │ │ ├── input_formats │ │ │ └── generated │ │ │ └── test_markdown_1 │ │ │ ├── inputs │ │ │ │ ├── input1.json │ │ │ │ └── input2.json │ │ │ ├── program.py │ │ │ └── schema.json │ │ ├── README.md │ │ ├── reliability_conf.yaml │ │ ├── test_generated.py │ │ ├── test_pydantic_models.py │ │ └── utils.py │ ├── retrievers │ │ └── test_embeddings.py │ ├── signatures │ │ ├── test_adapter_image.py │ │ ├── test_custom_types.py │ │ └── test_signature.py │ ├── streaming │ │ └── test_streaming.py │ ├── teleprompt │ │ ├── gepa_dummy_lm_custom_component_selector_custom_instruction_proposer.json │ │ ├── gepa_dummy_lm.json │ │ ├── test_bootstrap_finetune.py │ │ ├── test_bootstrap_trace.py │ │ ├── test_bootstrap.py │ │ ├── test_copro_optimizer.py │ │ ├── test_ensemble.py │ │ ├── test_finetune.py │ │ ├── test_gepa_instruction_proposer.py │ │ ├── test_gepa.py │ │ ├── test_grpo.py │ │ ├── test_knn_fewshot.py │ │ ├── test_random_search.py │ │ ├── test_teleprompt.py │ │ └── test_utils.py │ ├── test_utils │ │ ├── __init__.py │ │ └── server │ │ ├── __init__.py │ │ ├── litellm_server_config.yaml │ │ └── litellm_server.py │ └── utils │ ├── __init__.py │ ├── resources │ │ └── mcp_server.py │ ├── test_annotation.py │ ├── test_asyncify.py │ ├── test_exceptions.py │ ├── test_langchain_tool.py │ ├── test_mcp.py │ ├── test_parallelizer.py │ ├── test_saving.py │ ├── test_settings.py │ ├── test_syncify.py │ ├── test_unbatchify.py │ └── test_usage_tracker.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /tests/utils/test_exceptions.py: -------------------------------------------------------------------------------- ```python import dspy from dspy.utils.exceptions import 
AdapterParseError def test_adapter_parse_error_basic(): adapter_name = "ChatAdapter" signature = dspy.make_signature("question->answer1, answer2") lm_response = "[[ ## answer1 ## ]]\nanswer1" error = AdapterParseError(adapter_name=adapter_name, signature=signature, lm_response=lm_response) assert error.adapter_name == adapter_name assert error.signature == signature assert error.lm_response == lm_response error_message = str(error) assert error_message == ( "Adapter ChatAdapter failed to parse the LM response. \n\n" "LM Response: [[ ## answer1 ## ]]\nanswer1 \n\n" "Expected to find output fields in the LM response: [answer1, answer2] \n\n" ) def test_adapter_parse_error_with_message(): adapter_name = "ChatAdapter" signature = dspy.make_signature("question->answer1, answer2") lm_response = "[[ ## answer1 ## ]]\nanswer1" message = "Critical error, please fix!" error = AdapterParseError(adapter_name=adapter_name, signature=signature, lm_response=lm_response, message=message) assert error.adapter_name == adapter_name assert error.signature == signature assert error.lm_response == lm_response error_message = str(error) assert error_message == ( "Critical error, please fix!\n\n" "Adapter ChatAdapter failed to parse the LM response. \n\n" "LM Response: [[ ## answer1 ## ]]\nanswer1 \n\n" "Expected to find output fields in the LM response: [answer1, answer2] \n\n" ) def test_adapter_parse_error_with_parsed_result(): adapter_name = "ChatAdapter" signature = dspy.make_signature("question->answer1, answer2") lm_response = "[[ ## answer1 ## ]]\nanswer1" parsed_result = {"answer1": "value1"} error = AdapterParseError( adapter_name=adapter_name, signature=signature, lm_response=lm_response, parsed_result=parsed_result ) error_message = str(error) assert error_message == ( "Adapter ChatAdapter failed to parse the LM response. 
\n\n" "LM Response: [[ ## answer1 ## ]]\nanswer1 \n\n" "Expected to find output fields in the LM response: [answer1, answer2] \n\n" "Actual output fields parsed from the LM response: [answer1] \n\n" ) ``` -------------------------------------------------------------------------------- /tests/reliability/complex_types/generated/test_nesting_1/program.py: -------------------------------------------------------------------------------- ```python ### Input models ### from pydantic import BaseModel, Field class Level5(BaseModel): field1: str = Field(..., description="A string field at the deepest level") field2: float = Field(..., description="A numerical field at the deepest level") class Level4(BaseModel): level5: Level5 class Level3(BaseModel): level4: Level4 class Level2(BaseModel): level3: Level3 class Level1(BaseModel): level2: Level2 class ProgramInputs(BaseModel): level1: Level1 ### Output models ### from typing import List from pydantic import BaseModel, Field class ResultLevel5(BaseModel): outputField1: bool = Field(..., description="A boolean field indicating success or failure") outputField2: list[str] = Field(..., description="An array of strings representing messages") class ResultLevel4(BaseModel): resultLevel5: ResultLevel5 class ResultLevel3(BaseModel): resultLevel4: ResultLevel4 class ResultLevel2(BaseModel): resultLevel3: ResultLevel3 class ResultLevel1(BaseModel): resultLevel2: ResultLevel2 class ProgramOutputs(BaseModel): resultLevel1: ResultLevel1 ### Program definition ### import dspy class BaseSignature(dspy.Signature): """ The AI program is designed to process hierarchical data structures with multiple levels of nesting. The program will take a deeply nested input structure representing a complex dataset, perform specific transformations, validations, and computations, and then produce an equally complex nested output structure. 
# Build the program signature incrementally: start from the documented base
# signature and append one field per attribute of the pydantic I/O models.
program_signature = BaseSignature
for input_field_name, input_field in ProgramInputs.model_fields.items():
    program_signature = program_signature.append(
        name=input_field_name,
        field=dspy.InputField(description=input_field.description),
        type_=input_field.annotation,
    )
for output_field_name, output_field in ProgramOutputs.model_fields.items():
    program_signature = program_signature.append(
        name=output_field_name,
        # Use the output field's own description. The previous code read
        # `input_field.description`, i.e. the variable left over from the last
        # iteration of the input loop, which attached the wrong description
        # (and would raise NameError if ProgramInputs had no fields).
        field=dspy.OutputField(description=output_field.description),
        type_=output_field.annotation,
    )

# The runnable program is a plain Predict module over the assembled signature.
program = dspy.Predict(program_signature)
In contrast, the [dspy.GEPA](https://dspy.ai/tutorials/gepa_ai_program/) optimizer follows the more standard ML convention: maximize the training set size, while keeping the validation set just large enough to reflect the distribution of the downstream tasks (test set). After your first few optimization runs, you are either very happy with everything or you've made a lot of progress but you don't like something about the final program or the metric. At this point, go back to step 1 (Programming in DSPy) and revisit the major questions. Did you define your task well? Do you need to collect (or find online) more data for your problem? Do you want to update your metric? And do you want to use a more sophisticated optimizer? Do you need to consider advanced features like DSPy Assertions? Or, perhaps most importantly, do you want to add some more complexity or steps in your DSPy program itself? Do you want to use multiple optimizers in a sequence? Iterative development is key. DSPy gives you the pieces to do that incrementally: iterating on your data, your program structure, your metric, and your optimization steps. Optimizing complex LM programs is an entirely new paradigm that only exists in DSPy at the time of writing (update: there are now numerous DSPy extension frameworks, so this part is no longer true :-)), so naturally the norms around what to do are still emerging. If you need help, we recently created a [Discord server](https://discord.gg/XCGy2WDCQB) for the community. 
``` -------------------------------------------------------------------------------- /tests/utils/test_parallelizer.py: -------------------------------------------------------------------------------- ```python import time import pytest from dspy.utils.parallelizer import ParallelExecutor def test_worker_threads_independence(): def task(item): # Each thread maintains its own state by appending to a thread-local list return item * 2 data = [1, 2, 3, 4, 5] executor = ParallelExecutor(num_threads=3) results = executor.execute(task, data) assert results == [2, 4, 6, 8, 10] def test_parallel_execution_speed(): def task(item): time.sleep(0.1) # Simulate a time-consuming task return item data = [1, 2, 3, 4, 5] executor = ParallelExecutor(num_threads=5) start_time = time.time() executor.execute(task, data) end_time = time.time() assert end_time - start_time < len(data) def test_max_errors_handling(): def task(item): if item == 3: raise ValueError("Intentional error") return item data = [1, 2, 3, 4, 5] executor = ParallelExecutor(num_threads=3, max_errors=1) with pytest.raises(Exception, match="Execution cancelled due to errors or interruption."): executor.execute(task, data) def test_max_errors_not_met(): def task(item): if item == 3: raise ValueError("Intentional error") return item data = [1, 2, 3, 4, 5] executor = ParallelExecutor(num_threads=3, max_errors=2) # Ensure that the execution completes without crashing when max_errors is not met results = executor.execute(task, data) # Verify that the results exclude the failed task assert results == [1, 2, None, 4, 5] def test_parallel_executor_tracks_failed_indices_and_exceptions(): def task(item): if item == 3: raise ValueError("test error for 3") if item == 5: raise RuntimeError("test error for 5") return item data = [1, 2, 3, 4, 5] executor = ParallelExecutor(num_threads=3, max_errors=3) results = executor.execute(task, data) assert results == [1, 2, None, 4, None] assert sorted(executor.failed_indices) == [2, 4] assert 
def _convert_split(hf_split):
    """Turn one Hugging Face GSM8K split into a list of plain record dicts.

    Each raw answer is whitespace-tokenised; the second-to-last token must be
    the ``####`` marker, everything before it is kept as the gold reasoning and
    the last token (thousands separators removed) is the integer answer,
    stored as a string.
    """
    records = []
    for row in tqdm.tqdm(hf_split):
        question = row["question"]
        tokens = row["answer"].strip().split()
        assert tokens[-2] == "####"
        records.append(
            {
                "question": question,
                "gold_reasoning": " ".join(tokens[:-2]),
                "answer": str(int(tokens[-1].replace(",", ""))),
            }
        )
    return records


class GSM8K:
    """GSM8K loaded via ``datasets.load_dataset("gsm8k", "main")``.

    After construction ``train`` holds the first 200 shuffled training
    examples, ``dev`` the next 300, and ``test`` the whole shuffled test split;
    every item is a ``dspy.Example`` whose only input is ``question``.
    """

    def __init__(self):
        self.do_shuffle = False

        from datasets import load_dataset

        dataset = load_dataset("gsm8k", "main")
        official_train = _convert_split(dataset["train"])
        official_test = _convert_split(dataset["test"])

        # Each split is shuffled with its own freshly seeded generator so the
        # resulting order is reproducible.
        random.Random(0).shuffle(official_train)
        random.Random(0).shuffle(official_test)

        import dspy

        def as_examples(rows):
            return [dspy.Example(**row).with_inputs("question") for row in rows]

        self.train = as_examples(official_train[:200])
        self.dev = as_examples(official_train[200:500])
        self.test = as_examples(official_test[:])


def parse_integer_answer(answer, only_first_line=True):
    """Extract an integer from free-form answer text, or 0 if none is found.

    The last whitespace-separated token containing a digit is selected, the
    part after the first ``.`` is dropped, and the remaining digit characters
    are joined and converted with ``int``. When ``only_first_line`` is true,
    only the first line of the stripped text is considered.
    """
    try:
        text = answer.strip().split("\n")[0] if only_first_line else answer
        # Keep only the tokens that contain at least one digit; the last wins.
        numeric_tokens = [tok for tok in text.split() if any(ch.isdigit() for ch in tok)]
        integer_part = numeric_tokens[-1].split(".")[0]
        return int("".join(ch for ch in integer_part if ch.isdigit()))
    except (ValueError, IndexError):
        return 0


def gsm8k_metric(gold, pred, trace=None):
    """Return True when the integers parsed from both ``.answer`` values match."""
    expected = int(parse_integer_answer(str(gold.answer)))
    predicted = int(parse_integer_answer(str(pred.answer)))
    return expected == predicted
Learn how to configure and use DSPy's caching mechanisms effectively in different scenarios. ## Production Deployment ### [Deployment](../deployment/index.md) Learn to deploy DSPy applications in production environments. This tutorial covers multiple deployment strategies such as FastAPI and MLflow. ### [Streaming](../streaming/index.md) Implement real-time streaming capabilities in your DSPy applications. Learn how to handle streaming responses for better user experience in interactive applications. ### [Async](../async/index.md) Build asynchronous DSPy applications for improved performance and scalability. Learn async/await patterns and concurrent execution strategies for high-throughput systems. ## Monitoring and Optimization ### [Debugging & Observability](../observability/index.md) Master debugging and monitoring techniques for DSPy applications. Learn to use comprehensive logging, tracing, and error handling for production systems. ### [Tracking DSPy Optimizers](../optimizer_tracking/index.md) Learn to track and analyze optimizer performance and behavior. Understand how to monitor optimization processes and enhance the reproducibility of the optimization. ``` -------------------------------------------------------------------------------- /docs/docs/learn/programming/overview.md: -------------------------------------------------------------------------------- ```markdown --- sidebar_position: 1 --- # Programming in DSPy DSPy is a bet on _writing code instead of strings_. In other words, building the right control flow is crucial. Start by **defining your task**. What are the inputs to your system and what should your system produce as output? Is it a chatbot over your data or perhaps a code assistant? Or maybe a system for translation, for highlighting snippets from search results, or for generating reports with citations? Next, **define your initial pipeline**. Can your DSPy program just be a single module or do you need to break it down into a few steps? 
Do you need retrieval or other tools, like a calculator or a calendar API? Is there a typical workflow for solving your problem in multiple well-scoped steps, or do you want more open-ended tool use with agents for your task? Think about these but start simple, perhaps with just a single `dspy.ChainOfThought` module, then add complexity incrementally based on observations. As you do this, **craft and try a handful of examples** of the inputs to your program. Consider using a powerful LM at this point, or a couple of different LMs, just to understand what's possible. Record interesting (both easy and hard) examples you try. This will be useful when you are doing evaluation and optimization later. ??? "Beyond encouraging good design patterns, how does DSPy help here?" Conventional prompts couple your fundamental system architecture with incidental choices not portable to new LMs, objectives, or pipelines. A conventional prompt asks the LM to take some inputs and produce some outputs of certain types (a _signature_), formats the inputs in certain ways and requests outputs in a form it can parse accurately (an _adapter_), asks the LM to apply certain strategies like "thinking step by step" or using tools (a _module_'s logic), and relies on substantial trial-and-error to discover the right way to ask each LM to do this (a form of manual _optimization_). DSPy separates these concerns and automates the lower-level ones until you need to consider them. This allows you to write much shorter code, with much higher portability. For example, if you write a program using DSPy modules, you can swap the LM or its adapter without changing the rest of your logic. Or you can exchange one _module_, like `dspy.ChainOfThought`, with another, like `dspy.ProgramOfThought`, without modifying your signatures. When you're ready to use optimizers, the same program can have its prompts optimized or its LM weights fine-tuned. 
``` -------------------------------------------------------------------------------- /dspy/utils/usage_tracker.py: -------------------------------------------------------------------------------- ```python """Usage tracking utilities for DSPy.""" from collections import defaultdict from contextlib import contextmanager from typing import Any, Generator from dspy.dsp.utils.settings import settings class UsageTracker: """Tracks LM usage data within a context.""" def __init__(self): # Map of LM name to list of usage entries. For example: # { # "openai/gpt-4o-mini": [ # {"prompt_tokens": 100, "completion_tokens": 200}, # {"prompt_tokens": 300, "completion_tokens": 400}, # ], # } self.usage_data = defaultdict(list) def _flatten_usage_entry(self, usage_entry: dict[str, Any]) -> dict[str, Any]: result = dict(usage_entry) if completion_tokens_details := result.get("completion_tokens_details"): result["completion_tokens_details"] = dict(completion_tokens_details) if prompt_tokens_details := result.get("prompt_tokens_details"): result["prompt_tokens_details"] = dict(prompt_tokens_details) return result def _merge_usage_entries(self, usage_entry1: dict[str, Any] | None, usage_entry2: dict[str, Any] | None) -> dict[str, Any]: if usage_entry1 is None or len(usage_entry1) == 0: return dict(usage_entry2) if usage_entry2 is None or len(usage_entry2) == 0: return dict(usage_entry1) result = dict(usage_entry2) for k, v in usage_entry1.items(): current_v = result.get(k) if isinstance(v, dict) or isinstance(current_v, dict): result[k] = self._merge_usage_entries(current_v, v) else: result[k] = (current_v or 0) + (v or 0) return result def add_usage(self, lm: str, usage_entry: dict[str, Any]) -> None: """Add a usage entry to the tracker.""" if len(usage_entry) > 0: self.usage_data[lm].append(self._flatten_usage_entry(usage_entry)) def get_total_tokens(self) -> dict[str, dict[str, Any]]: """Calculate total tokens from all tracked usage.""" total_usage_by_lm = {} for lm, usage_entries in 
self.usage_data.items(): total_usage = {} for usage_entry in usage_entries: total_usage = self._merge_usage_entries(total_usage, usage_entry) total_usage_by_lm[lm] = total_usage return total_usage_by_lm @contextmanager def track_usage() -> Generator[UsageTracker, None, None]: """Context manager for tracking LM usage.""" tracker = UsageTracker() with settings.context(usage_tracker=tracker): yield tracker ``` -------------------------------------------------------------------------------- /dspy/teleprompt/signature_opt.py: -------------------------------------------------------------------------------- ```python from .copro_optimizer import COPRO """ =============================================================== DEPRECATED!!! PLEASE USE COPRO INSTEAD. =============================================================== USAGE SUGGESTIONS: The following code can be used to compile a optimized signature teleprompter, and evaluate it on an end task: teleprompter = SignatureOptimizer(prompt_model=prompt_model, metric=metric, breadth=BREADTH, depth=DEPTH, init_temperature=INIT_TEMPERATURE) kwargs = dict(num_threads=NUM_THREADS, display_progress=True, display_table=0) compiled_prompt_opt = teleprompter.compile(program.deepcopy(), devset=devset[:DEV_NUM], eval_kwargs=kwargs) eval_score = evaluate(compiled_prompt_opt, devset=evalset[:EVAL_NUM], **kwargs) Note that this teleprompter takes in the following parameters: * prompt_model: The model used for prompt generation. When unspecified, defaults to the model set in settings (ie. dspy.settings.configure(lm=task_model)). * metric: The task metric used for optimization. * breadth: The number of new prompts to generate at each iteration. Default=10. * depth: The number of times we should ask our prompt model to generate new prompts, with the history of the past prompts as input. Default=3. * init_temperature: The temperature used to generate new prompts. Higher roughly equals more creative. Default=1.4. 
class SignatureOptimizer(COPRO):
    """Deprecated alias for `COPRO`.

    Prints a deprecation warning on construction and forwards everything to `COPRO`.
    """

    def __init__(
        self,
        prompt_model=None,
        metric=None,
        breadth=10,
        depth=3,
        init_temperature=1.4,
        verbose=False,
        track_stats=False,
    ):
        # The message is wrapped in ANSI red and must end with the reset sequence (ESC[0m);
        # ending with ESC[31m again would leave all later terminal output red.
        print(
            "\u001b[31m[WARNING] SignatureOptimizer has been deprecated and replaced with COPRO. SignatureOptimizer will be removed in a future release. \u001b[0m",
        )
        super().__init__(prompt_model, metric, breadth, depth, init_temperature, verbose, track_stats)

    def compile(self, student, *, devset, eval_kwargs):
        """Forward to `COPRO.compile`, passing `devset` as its `trainset` argument."""
        return super().compile(student, trainset=devset, eval_kwargs=eval_kwargs)
While lighter on programming concepts, they focus on how to systematically improve your AI programs using DSPy optimizers, and showcase how the optimizers automatically improve program quality. - **DSPy Core Development**: These tutorials cover essential DSPy features and best practices. Learn how to implement key functionalities like streaming, caching, deployment, and monitoring in your DSPy applications. - Build AI Programs with DSPy - [Managing Conversation History](conversation_history/index.md) - [Building AI Agents with DSPy](customer_service_agent/index.ipynb) - [Building AI Applications by Customizing DSPy Modules](custom_module/index.ipynb) - [Retrieval-Augmented Generation (RAG)](rag/index.ipynb) - [Building RAG as Agent](agents/index.ipynb) - [Entity Extraction](entity_extraction/index.ipynb) - [Classification](classification/index.md) - [Multi-Hop RAG](multihop_search/index.ipynb) - [Privacy-Conscious Delegation](papillon/index.md) - [Program Of Thought](program_of_thought/index.ipynb) - [Image Generation Prompt Iteration](image_generation_prompting/index.ipynb) - [Audio](audio/index.ipynb) - Optimize AI Programs with DSPy - [Math Reasoning](math/index.ipynb) - [Classification Finetuning](classification_finetuning/index.ipynb) - [Advanced Tool Use](tool_use/index.ipynb) - [Finetuning Agents](games/index.ipynb) - Reflective Prompt Evolution with dspy.GEPA: - [Overview](gepa_ai_program/index.md) - [GEPA for AIME](gepa_aime/index.ipynb) - [GEPA for PAPILLON](gepa_papillon/index.ipynb) - [GEPA for Enterprise Classification Task](gepa_facilitysupportanalyzer/index.ipynb) - Tools, Development, and Deployment - [Use MCP in DSPy](mcp/index.md) - [Output Refinement](output_refinement/best-of-n-and-refine.md) - [Saving and Loading](saving/index.md) - [Cache](cache/index.md) - [Deployment](deployment/index.md) - [Debugging & Observability](observability/index.md) - [Tracking DSPy Optimizers](optimizer_tracking/index.md) - [Streaming](streaming/index.md) -
@pytest.fixture()
def litellm_test_server() -> tuple[str, str]:
    """
    Start a LiteLLM test server for a DSPy integration test case, and tear down the server
    when the test case completes.

    Yields:
        A `(server_url, server_log_file_path)` tuple: the base URL of the proxy and the path of
        the jsonlines file the server is told (via environment variable) to write request logs to.

    Raises:
        TimeoutError: If the server port does not accept connections in time.
    """
    with tempfile.TemporaryDirectory() as server_log_dir_path:
        # Create a server log file used to store request logs
        server_log_file_path = os.path.join(server_log_dir_path, "request_logs.jsonl")
        open(server_log_file_path, "a").close()

        port = _get_random_port()
        host = "127.0.0.1"
        print(f"Starting LiteLLM proxy server on port {port}")

        process = subprocess.Popen(
            ["litellm", "--host", host, "--port", str(port), "--config", _get_litellm_config_path()],
            # The per-test log path goes last so it wins over a value already present in the
            # caller's environment; otherwise the server could log somewhere other than the
            # path this fixture hands to the test.
            env={**os.environ, LITELLM_TEST_SERVER_LOG_FILE_PATH_ENV_VAR: server_log_file_path},
            text=True,
        )

        try:
            _wait_for_port(host=host, port=port)
            yield f"http://{host}:{port}", server_log_file_path
        finally:
            # Always reap the subprocess: on startup timeout, on any other startup error,
            # and at normal fixture teardown.
            process.kill()
            process.wait()
""" data = [] with open(server_log_file_path) as f: for line in f: data.append(json.loads(line)) return data def _get_litellm_config_path(): module_dir = os.path.dirname(os.path.abspath(__file__)) return os.path.join(module_dir, "litellm_server_config.yaml") def _get_random_port(): with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("", 0)) return s.getsockname()[1] def _wait_for_port(host, port, timeout=10): start_time = time.time() while time.time() - start_time < timeout: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: try: sock.connect((host, port)) return True except ConnectionRefusedError: time.sleep(0.5) # Wait briefly before trying again raise TimeoutError(f"Server on port {port} did not become ready within {timeout} seconds.") ``` -------------------------------------------------------------------------------- /tests/reliability/reliability_conf.yaml: -------------------------------------------------------------------------------- ```yaml adapter: chat model_list: # The model to use for judging the correctness of program # outputs throughout reliability test suites. 
We recommend using # a high quality model as the judge, such as OpenAI GPT-4o - model_name: "judge" litellm_params: # model: "<litellm_provider>/<litellm_model_name>" # api_key: "api key" # api_base: "<api_base>" - model_name: "gpt-4o" litellm_params: # model: "<litellm_provider>/<litellm_model_name>" # api_key: "api key" # api_base: "<api_base>" - model_name: "gpt-4o-mini" litellm_params: # model: "<litellm_provider>/<litellm_model_name>" # api_key: "api key" # api_base: "<api_base>" - model_name: "gpt-4-turbo" litellm_params: # model: "<litellm_provider>/<litellm_model_name>" # api_key: "api key" # api_base: "<api_base>" - model_name: "gpt-o1" litellm_params: # model: "<litellm_provider>/<litellm_model_name>" # api_key: "api key" # api_base: "<api_base>" - model_name: "gpt-o1-mini" litellm_params: # model: "<litellm_provider>/<litellm_model_name>" # api_key: "api key" # api_base: "<api_base>" - model_name: "claude-3.5-sonnet" litellm_params: # model: "<litellm_provider>/<litellm_model_name>" # api_key: "api key" # api_base: "<api_base>" - model_name: "claude-3.5-haiku" litellm_params: # model: "<litellm_provider>/<litellm_model_name>" # api_key: "api key" # api_base: "<api_base>" - model_name: "gemini-1.5-pro" litellm_params: # model: "<litellm_provider>/<litellm_model_name>" # api_key: "api key" # api_base: "<api_base>" - model_name: "gemini-1.5-flash" litellm_params: # model: "<litellm_provider>/<litellm_model_name>" # api_key: "api key" # api_base: "<api_base>" - model_name: "llama-3.1-405b-instruct" litellm_params: # model: "<litellm_provider>/<litellm_model_name>" # api_key: "api key" # api_base: "<api_base>" - model_name: "llama-3.1-70b-instruct" litellm_params: # model: "<litellm_provider>/<litellm_model_name>" # api_key: "api key" # api_base: "<api_base>" - model_name: "llama-3.1-8b-instruct" litellm_params: # model: "<litellm_provider>/<litellm_model_name>" # api_key: "api key" # api_base: "<api_base>" - model_name: "llama-3.2-3b-instruct" 
# Standard list of models that should be used for periodic DSPy reliability testing
# NOTE(review): reliability_conf.yaml declares a model named "gpt-o1", not "gpt-o1-preview";
# confirm which identifier is intended, since names are looked up in that configuration.
MODEL_LIST = [
    "gpt-4o",
    "gpt-4o-mini",
    "gpt-4-turbo",
    "gpt-o1-preview",
    "gpt-o1-mini",
    "claude-3.5-sonnet",
    "claude-3.5-haiku",
    "gemini-1.5-pro",
    "gemini-1.5-flash",
    "llama-3.1-405b-instruct",
    "llama-3.1-70b-instruct",
    "llama-3.1-8b-instruct",
    "llama-3.2-3b-instruct",
    "deepseek-r1",
]


def pytest_generate_tests(metafunc):
    """
    Hook that parametrizes every test requesting the `configure_model` fixture with one
    case per entry of `MODEL_LIST`.

    Each case is a `(model_name, is_known_failing)` pair, where the flag is True when the
    model name appears in the test function's `_known_failing_models` attribute (if any).
    The model name doubles as the test ID.
    """
    failing_models = getattr(metafunc.function, "_known_failing_models", [])
    if "configure_model" not in metafunc.fixturenames:
        return

    cases = []
    case_ids = []  # Custom IDs for display
    for model_name in MODEL_LIST:
        cases.append((model_name, model_name in failing_models))
        case_ids.append(f"{model_name}")
    metafunc.parametrize("configure_model", cases, indirect=True, ids=case_ids)
class KNNFewShot(Teleprompter):
    def __init__(self, k: int, trainset: list[Example], vectorizer: Embedder, **few_shot_bootstrap_args: Any):
        """
        KNNFewShot is an optimizer that uses an in-memory KNN retriever to find the k nearest neighbors
        in a trainset at test time. For each input example in a forward call, it identifies the k most
        similar examples from the trainset and attaches them as demonstrations to the student module.

        Args:
            k: The number of nearest neighbors to attach to the student model.
            trainset: The training set to use for few-shot prompting.
            vectorizer: The `Embedder` to use for vectorization
            **few_shot_bootstrap_args: Additional arguments for the `BootstrapFewShot` optimizer.

        Example:
            ```python
            import dspy
            from sentence_transformers import SentenceTransformer

            # Define a QA module with chain of thought
            qa = dspy.ChainOfThought("question -> answer")

            # Create a training dataset with examples
            trainset = [
                dspy.Example(question="What is the capital of France?", answer="Paris").with_inputs("question"),
                # ... more examples ...
            ]

            # Initialize KNNFewShot with a sentence transformer model
            knn_few_shot = KNNFewShot(
                k=3,
                trainset=trainset,
                vectorizer=dspy.Embedder(SentenceTransformer("all-MiniLM-L6-v2").encode)
            )

            # Compile the QA module with few-shot learning
            compiled_qa = knn_few_shot.compile(qa)

            # Use the compiled module (inputs must be passed as keyword arguments)
            result = compiled_qa(question="What is the capital of Belgium?")
            ```
        """
        self.KNN = KNN(k, trainset, vectorizer=vectorizer)
        self.few_shot_bootstrap_args = few_shot_bootstrap_args

    def compile(self, student, *, teacher=None):
        """Return a copy of `student` whose `forward` selects demonstrations per call.

        Args:
            student: The module to wrap; the returned object comes from `student.reset_copy()`.
            teacher: Optional teacher passed through to `BootstrapFewShot.compile`.

        Returns:
            The copy, with `forward` replaced by a function that accepts keyword arguments only.
        """
        student_copy = student.reset_copy()

        def forward_pass(_, **kwargs):
            # Retrieve the training examples nearest to this call's inputs.
            knn_trainset = self.KNN(**kwargs)
            # A fresh BootstrapFewShot is built and run on every forward call, using the
            # retrieved neighbours as its trainset. Note that it compiles the original
            # `student`, not `student_copy` (whose `forward` is this very function).
            few_shot_bootstrap = BootstrapFewShot(**self.few_shot_bootstrap_args)
            compiled_program = few_shot_bootstrap.compile(
                student,
                teacher=teacher,
                trainset=knn_trainset,
            )
            return compiled_program(**kwargs)

        # Bind forward_pass as an instance method so it shadows the copy's class-level forward.
        student_copy.forward = types.MethodType(forward_pass, student_copy)
        return student_copy
dspy.utils.dummies import DummyLM, DummyVectorizer def mock_example(question: str, answer: str) -> dspy.Example: """Creates a mock DSP example with specified question and answer.""" return dspy.Example(question=question, answer=answer).with_inputs("question") @pytest.fixture def setup_knn_few_shot() -> KNNFewShot: """Sets up a KNNFewShot instance for testing.""" trainset = [ mock_example("What is the capital of France?", "Paris"), mock_example("What is the largest ocean?", "Pacific"), mock_example("What is 2+2?", "4"), ] return KNNFewShot(k=2, trainset=trainset, vectorizer=dspy.Embedder(DummyVectorizer())) def test_knn_few_shot_initialization(setup_knn_few_shot): """Tests the KNNFewShot initialization.""" knn_few_shot = setup_knn_few_shot assert knn_few_shot.KNN.k == 2, "Incorrect k value for KNN" assert len(knn_few_shot.KNN.trainset) == 3, "Incorrect trainset size for KNN" class SimpleModule(dspy.Module): def __init__(self, signature): super().__init__() self.predictor = dspy.Predict(signature) def forward(self, *args, **kwargs): return self.predictor(**kwargs) def reset_copy(self): # Creates a new instance of SimpleModule with the same predictor return SimpleModule(self.predictor.signature) # TODO: Test not working yet def _test_knn_few_shot_compile(setup_knn_few_shot): """Tests the compile method of KNNFewShot with SimpleModule as student.""" student = SimpleModule("input -> output") teacher = SimpleModule("input -> output") # Assuming teacher uses the same module type # Setup DummyLM with a response for a query similar to one of the training examples lm = DummyLM(["Madrid", "10"]) dspy.settings.configure(lm=lm) # Responses for the capital of Spain and the result of 5+5) knn_few_shot = setup_knn_few_shot trainset = knn_few_shot.KNN.trainset compiled_student = knn_few_shot.compile(student, teacher=teacher, trainset=trainset, valset=None) assert len(compiled_student.predictor.demos) == 1 assert compiled_student.predictor.demos[0].input == trainset[0].input assert 
compiled_student.predictor.demos[0].output == trainset[0].output # Simulate a query that is similar to one of the training examples output = compiled_student.forward(input="What is the capital of Spain?").output # Validate that the output corresponds to one of the expected DummyLM responses # This assumes the compiled_student's forward method will execute the predictor with the given query assert output in ["Madrid", "10"], "The compiled student did not return the correct output based on the query" ``` -------------------------------------------------------------------------------- /dspy/utils/annotation.py: -------------------------------------------------------------------------------- ```python import inspect import re import types from typing import Callable, ParamSpec, TypeVar, overload P = ParamSpec("P") R = TypeVar("R") @overload def experimental(f: Callable[P, R], version: str | None = None) -> Callable[P, R]: ... @overload def experimental(f: None = None, version: str | None = None) -> Callable[[Callable[P, R]], Callable[P, R]]: ... def experimental( f: Callable[P, R] | None = None, version: str | None = None, ) -> Callable[[Callable[P, R]], Callable[P, R]]: """Decorator / decorator creator for marking APIs experimental in the docstring. Args: f: The function to be decorated. version: The version in which the API was introduced as experimental. The version is used to determine whether the API should be considered as stable or not when releasing a new version of DSPy. Returns: A decorator that adds a note to the docstring of the decorated API. 
""" if f: return _experimental(f, version) else: def decorator(f: Callable[P, R]) -> Callable[P, R]: return _experimental(f, version) return decorator def _experimental(api: Callable[P, R], version: str | None = None) -> Callable[P, R]: """Add experimental notice to the API's docstring.""" if inspect.isclass(api): api_type = "class" elif inspect.isfunction(api): api_type = "function" elif isinstance(api, property): api_type = "property" elif isinstance(api, types.MethodType): api_type = "method" else: api_type = str(type(api)) indent = _get_min_indent_of_docstring(api.__doc__) if api.__doc__ else "" version_text = f" (introduced in v{version})" if version else "" notice = ( indent + f"Experimental: This {api_type} may change or " f"be removed in a future release without warning{version_text}." ) if api_type == "property": api.__doc__ = api.__doc__ + "\n\n" + notice if api.__doc__ else notice else: if api.__doc__: api.__doc__ = notice + "\n\n" + api.__doc__ else: api.__doc__ = notice return api def _get_min_indent_of_docstring(docstring_str: str) -> str: """ Get the minimum indentation string of a docstring, based on the assumption that the closing triple quote for multiline comments must be on a new line. Note that based on ruff rule D209, the closing triple quote for multiline comments must be on a new line. Args: docstring_str: string with docstring Returns: Whitespace corresponding to the indent of a docstring. 
""" if not docstring_str or "\n" not in docstring_str: return "" match = re.match(r"^\s*", docstring_str.rsplit("\n", 1)[-1]) return match.group() if match else "" ``` -------------------------------------------------------------------------------- /tests/teleprompt/test_bootstrap_finetune.py: -------------------------------------------------------------------------------- ```python from unittest.mock import patch import dspy from dspy import Example from dspy.predict import Predict from dspy.teleprompt import BootstrapFinetune from dspy.utils.dummies import DummyLM # Define a simple metric function for testing def simple_metric(example, prediction, trace=None): return example.output == prediction.output examples = [ Example(input="What is the color of the sky?", output="blue").with_inputs("input"), Example(input="What does the fox say?", output="Ring-ding-ding-ding-dingeringeding!").with_inputs("input"), ] trainset = [examples[0]] def test_bootstrap_finetune_initialization(): """Test BootstrapFinetune initialization with various parameters.""" bootstrap = BootstrapFinetune(metric=simple_metric) assert bootstrap.metric == simple_metric, "Metric not correctly initialized" assert bootstrap.multitask == True, "Multitask should default to True" class SimpleModule(dspy.Module): def __init__(self, signature): super().__init__() self.predictor = Predict(signature) def forward(self, **kwargs): return self.predictor(**kwargs) def test_compile_with_predict_instances(): """Test BootstrapFinetune compilation with Predict instances.""" # Create SimpleModule instances for student and teacher student = SimpleModule("input -> output") teacher = SimpleModule("input -> output") lm = DummyLM([{"output": "blue"}, {"output": "Ring-ding-ding-ding-dingeringeding!"}]) dspy.settings.configure(lm=lm) # Set LM for both student and teacher student.set_lm(lm) teacher.set_lm(lm) bootstrap = BootstrapFinetune(metric=simple_metric) # Mock the fine-tuning process since DummyLM doesn't support it 
def _green(text: str, end: str = "\n"):
    """Wrap `text` (leading whitespace removed) in ANSI green and append `end`."""
    return "\x1b[32m" + str(text).lstrip() + "\x1b[0m" + end


def _red(text: str, end: str = "\n"):
    """Wrap `text` in ANSI red and append `end`."""
    return "\x1b[31m" + str(text) + "\x1b[0m" + end


def _blue(text: str, end: str = "\n"):
    """Wrap `text` in ANSI blue and append `end`."""
    return "\x1b[34m" + str(text) + "\x1b[0m" + end


def pretty_print_history(history, n: int = 1):
    """Prints the last n prompts and their completions.

    Args:
        history: Sequence of history items. Each item is a dict with an "outputs" list and
            either a non-empty "messages" list or a "prompt" string; "timestamp" is optional.
        n: Number of most recent items to print. Values of 0 or less print nothing.
    """
    # history[-0:] is the whole list, so a non-positive n must be handled explicitly.
    if n <= 0:
        return

    for item in history[-n:]:
        # Fall back to a single user message built from the raw prompt when no messages exist.
        messages = item.get("messages") or [{"role": "user", "content": item["prompt"]}]
        outputs = item["outputs"]
        timestamp = item.get("timestamp", "Unknown time")

        print("\n\n\n")
        print("\x1b[34m" + f"[{timestamp}]" + "\x1b[0m" + "\n")

        for msg in messages:
            print(_red(f"{msg['role'].capitalize()} message:"))
            content = msg["content"]
            if isinstance(content, str):
                print(content.strip())
            elif isinstance(content, list):
                for c in content:
                    if c["type"] == "text":
                        print(c["text"].strip())
                    elif c["type"] == "image_url":
                        url = c["image_url"].get("url", "")
                        if "base64" in url:
                            # Show only the data-URL prefix and the payload length, not the payload.
                            len_base64 = len(c["image_url"]["url"].split("base64,")[1])
                            image_str = (
                                f"<{c['image_url']['url'].split('base64,')[0]}base64,"
                                f"<IMAGE BASE 64 ENCODED({len_base64!s})>"
                            )
                        else:
                            image_str = f"<image_url: {c['image_url']['url']}>"
                        print(_blue(image_str.strip()))
                    elif c["type"] == "input_audio":
                        audio_format = c["input_audio"]["format"]
                        len_audio = len(c["input_audio"]["data"])
                        audio_str = f"<audio format='{audio_format}' base64-encoded, length={len_audio}>"
                        print(_blue(audio_str.strip()))
            print("\n")

        # Only the first completion is shown in full; an empty outputs list prints no response.
        first_output = outputs[0] if outputs else None
        if isinstance(first_output, dict):
            # "text" may be absent or empty, e.g. when the completion consists of tool calls only.
            text = first_output.get("text")
            if text:
                print(_red("Response:"))
                print(_green(text.strip()))

            tool_calls = first_output.get("tool_calls")
            if tool_calls:
                print(_red("Tool calls:"))
                for tool_call in tool_calls:
                    print(_green(f"{tool_call['function']['name']}: {tool_call['function']['arguments']}"))
        elif first_output is not None:
            print(_red("Response:"))
            print(_green(first_output.strip()))

        if len(outputs) > 1:
            choices_text = f" \t (and {len(outputs) - 1} other completions)"
            print(_red(choices_text, end=""))

        print("\n\n\n")
"Brussels"}]) dspy.settings.configure(lm=lm) module_call_count = [0] def count_calls(self, **kwargs): module_call_count[0] += 1 return self.predictor(**kwargs) reward_call_count = [0] def reward_fn(kwargs, pred: Prediction) -> float: reward_call_count[0] += 1 # The answer should always be one word. return 1.0 if len(pred.answer) == 1 else 0.0 predict = DummyModule("question -> answer", count_calls) refine = Refine(module=predict, N=3, reward_fn=reward_fn, threshold=1.0) result = refine(question="What is the capital of Belgium?") assert result.answer == "Brussels", "Result should be `Brussels`" assert reward_call_count[0] > 0, "Reward function should have been called" assert module_call_count[0] == 3, ( "Module should have been called exactly 3 times, but was called %d times" % module_call_count[0] ) def test_refine_module_default_fail_count(): lm = DummyLM([{"answer": "Brussels"}, {"answer": "City of Brussels"}, {"answer": "Brussels"}]) dspy.settings.configure(lm=lm) def always_raise(self, **kwargs): raise ValueError("Deliberately failing") predict = DummyModule("question -> answer", always_raise) refine = Refine(module=predict, N=3, reward_fn=lambda _, __: 1.0, threshold=0.0) with pytest.raises(ValueError): refine(question="What is the capital of Belgium?") def test_refine_module_custom_fail_count(): lm = DummyLM([{"answer": "Brussels"}, {"answer": "City of Brussels"}, {"answer": "Brussels"}]) dspy.settings.configure(lm=lm) module_call_count = [0] def raise_on_second_call(self, **kwargs): if module_call_count[0] < 2: module_call_count[0] += 1 raise ValueError("Deliberately failing") return self.predictor(**kwargs) predict = DummyModule("question -> answer", raise_on_second_call) refine = Refine(module=predict, N=3, reward_fn=lambda _, __: 1.0, threshold=0.0, fail_count=1) with pytest.raises(ValueError): refine(question="What is the capital of Belgium?") assert module_call_count[0] == 2, ( "Module should have been called exactly 2 times, but was called %d times" % 
def test_refine_forward_success_first_attempt():
    """Run BestOfN with N=3 and check the returned answer plus the module/reward call counts."""
    dummy_lm = DummyLM([{"answer": "Brussels"}, {"answer": "City of Brussels"}, {"answer": "Brussels"}])
    dspy.settings.configure(lm=dummy_lm)

    # Shared counters mutated by the closures below.
    calls = {"module": 0, "reward": 0}

    def count_calls(self, **kwargs):
        calls["module"] += 1
        return self.predictor(**kwargs)

    def reward_fn(kwargs, pred: Prediction) -> float:
        calls["reward"] += 1
        # NOTE(review): this compares the character length of the answer to 1 (not the number
        # of words), so none of the dummy answers above scores 1.0. That presumably is why the
        # module is expected to run all 3 times below - confirm against BestOfN.
        if len(pred.answer) == 1:
            return 1.0
        return 0.0

    wrapped_module = DummyModule("question -> answer", count_calls)
    best_of_n = BestOfN(module=wrapped_module, N=3, reward_fn=reward_fn, threshold=1.0)
    result = best_of_n(question="What is the capital of Belgium?")

    assert result.answer == "Brussels", "Result should be `Brussels`"
    assert calls["reward"] > 0, "Reward function should have been called"
    assert calls["module"] == 3, (
        "Module should have been called exactly 3 times, but was called %d times" % calls["module"]
    )
Input models ### from datetime import datetime from enum import Enum from typing import List, Tuple from pydantic import BaseModel, Field class EnumField(Enum): option1 = "option1" option2 = "option2" option3 = "option3" class LiteralField(Enum): literalValue = "literalValue" class ObjectField(BaseModel): subField1: str subField2: float class NestedObjectField(BaseModel): tupleField: Tuple[str, float] enumField: EnumField datetimeField: datetime literalField: LiteralField class ProgramInputs(BaseModel): tupleField: Tuple[str, float] enumField: EnumField datetimeField: datetime literalField: LiteralField objectField: ObjectField nestedObjectField: NestedObjectField ### Output models ### from datetime import datetime from enum import Enum from typing import List, Tuple, Union from pydantic import BaseModel, Field class ProcessedEnumField(Enum): option1 = "option1" option2 = "option2" option3 = "option3" class ProcessedLiteralField(Enum): literalValue = "literalValue" class ProcessedObjectField(BaseModel): subField1: str subField2: float additionalField: bool class EnumField(Enum): option1 = "option1" option2 = "option2" option3 = "option3" class LiteralField(Enum): literalValue = "literalValue" class ProcessedNestedObjectField(BaseModel): tupleField: Tuple[str, float] enumField: EnumField datetimeField: datetime literalField: LiteralField additionalField: bool class ProgramOutputs(BaseModel): processedTupleField: Tuple[str, float] processedEnumField: ProcessedEnumField processedDatetimeField: datetime processedLiteralField: ProcessedLiteralField processedObjectField: ProcessedObjectField processedNestedObjectField: ProcessedNestedObjectField ### Program definition ### import dspy class BaseSignature(dspy.Signature): """ The program is designed to process various data types including tuples, enums, datetime values, literals, objects, and nested objects containing these types. 
The program will accept inputs of these types, perform specified operations on them, and return the results. The operations could include validation, transformation, and extraction of information from these inputs. """ program_signature = BaseSignature for input_field_name, input_field in ProgramInputs.model_fields.items(): program_signature = program_signature.append( name=input_field_name, field=dspy.InputField(description=input_field.description), type_=input_field.annotation, ) for output_field_name, output_field in ProgramOutputs.model_fields.items(): program_signature = program_signature.append( name=output_field_name, field=dspy.OutputField(description=input_field.description), type_=output_field.annotation, ) program = dspy.Predict(program_signature) ``` -------------------------------------------------------------------------------- /tests/clients/test_databricks.py: -------------------------------------------------------------------------------- ```python """Test the Databricks finetuning and deployment. This test requires valid Databricks credentials, so it is skipped on github actions. Right now it is only used for manual testing. """ import pytest import dspy from dspy.clients.databricks import ( DatabricksProvider, TrainingJobDatabricks, _create_directory_in_databricks_unity_catalog, ) try: from databricks.sdk import WorkspaceClient WorkspaceClient() except (ImportError, Exception): # Skip the test if the Databricks SDK is not configured or credentials are not available. 
pytestmark = pytest.mark.skip(reason="Databricks SDK not configured or credentials not available") def test_create_directory_in_databricks_unity_catalog(): from databricks.sdk import WorkspaceClient w = WorkspaceClient() with pytest.raises( ValueError, match=( "Databricks Unity Catalog path must be in the format '/Volumes/<catalog>/<schema>/<volume>/...', " "but received: /badstring/whatever" ), ): _create_directory_in_databricks_unity_catalog(w, "/badstring/whatever") _create_directory_in_databricks_unity_catalog(w, "/Volumes/main/chenmoney/testing/dspy_testing") # Check that the directory was created successfully, otherwise `get_directory_metadata` will raise an exception. w.files.get_directory_metadata("/Volumes/main/chenmoney/testing/dspy_testing") def test_create_finetuning_job(): fake_training_data = [ { "messages": [ {"role": "user", "content": "Hello, how are you?"}, {"role": "assistant", "content": "I'm doing great, thank you!"}, ] }, { "messages": [ {"role": "user", "content": "What is the capital of France?"}, {"role": "assistant", "content": "Paris!"}, ] }, { "messages": [ {"role": "user", "content": "What is the capital of Germany?"}, {"role": "assistant", "content": "Berlin!"}, ] }, ] dspy.settings.experimental = True job = TrainingJobDatabricks() DatabricksProvider.finetune( job=job, model="meta-llama/Llama-3.2-1B", train_data=fake_training_data, data_format="chat", train_kwargs={ "train_data_path": "/Volumes/main/chenmoney/testing/dspy_testing", "register_to": "main.chenmoney.finetuned_model", "task_type": "CHAT_COMPLETION", "skip_deploy": True, }, ) assert job.finetuning_run.status.display_name is not None def test_deploy_finetuned_model(): dspy.settings.experimental = True model_to_deploy = "main.chenmoney.finetuned_model" DatabricksProvider.deploy_finetuned_model( model=model_to_deploy, data_format="chat", ) lm = dspy.LM(model="databricks/main_chenmoney_finetuned_model") lm("what is 2 + 2?") ``` 
-------------------------------------------------------------------------------- /dspy/predict/retry.py: -------------------------------------------------------------------------------- ```python # import copy # import dspy # from .predict import Predict # class Retry(Predict): # def __init__(self, module): # super().__init__(module.signature) # self.module = module # self.original_signature = module.signature # self.original_forward = module.forward # self.new_signature = self._create_new_signature(self.original_signature) # def _create_new_signature(self, signature): # # Add "Past" input fields for each output field # for key, value in signature.output_fields.items(): # actual_prefix = value.json_schema_extra["prefix"].split(":")[0] + ":" # signature = signature.append(f"past_{key}", dspy.InputField( # prefix="Previous " + actual_prefix, # desc=f"past {actual_prefix[:-1]} with errors", # format=value.json_schema_extra.get("format"), # )) # signature = signature.append("feedback", dspy.InputField( # prefix="Instructions:", # desc="Some instructions you must satisfy", # format=str, # )) # return signature # def forward(self, *, past_outputs, **kwargs): # # Take into account the possible new signature, as in TypedPredictor # new_signature = kwargs.pop("new_signature", None) # if new_signature: # self.original_signature = new_signature # self.new_signature = self._create_new_signature(self.original_signature) # # Convert the dict past_outputs={"answer": ...} to kwargs # # {past_answer=..., ...} # for key, value in past_outputs.items(): # past_key = f"past_{key}" # if past_key in self.new_signature.input_fields: # kwargs[past_key] = value # # Tell the wrapped module to use the new signature. # # Note: This only works if the wrapped module is a Predict or ChainOfThought. 
# kwargs["new_signature"] = self.new_signature # return self.original_forward(**kwargs) # def __call__(self, **kwargs): # copy.deepcopy(kwargs) # kwargs["_trace"] = False # kwargs.setdefault("demos", self.demos if self.demos is not None else []) # # perform backtracking # if dspy.settings.backtrack_to == self: # for key, value in dspy.settings.backtrack_to_args.items(): # kwargs.setdefault(key, value) # pred = self.forward(**kwargs) # else: # pred = self.module(**kwargs) # # now pop multiple reserved keys # # NOTE(shangyin) past_outputs seems not useful to include in demos, # # therefore dropped # for key in ["_trace", "demos", "signature", "new_signature", "config", "lm", "past_outputs"]: # kwargs.pop(key, None) # if dspy.settings.trace is not None: # trace = dspy.settings.trace # trace.append((self, {**kwargs}, pred)) # return pred ``` -------------------------------------------------------------------------------- /tests/primitives/test_example.py: -------------------------------------------------------------------------------- ```python import pytest import dspy from dspy import Example def test_example_initialization(): example = Example(a=1, b=2) assert example.a == 1 assert example.b == 2 def test_example_initialization_from_base(): base = Example(a=1, b=2) example = Example(base=base, c=3) assert example.a == 1 assert example.b == 2 assert example.c == 3 def test_example_initialization_from_dict(): base_dict = {"a": 1, "b": 2} example = Example(base=base_dict, c=3) assert example.a == 1 assert example.b == 2 assert example.c == 3 def test_example_set_get_item(): example = Example() example["a"] = 1 assert example["a"] == 1 def test_example_attribute_access(): example = Example(a=1) assert example.a == 1 example.a = 2 assert example.a == 2 def test_example_deletion(): example = Example(a=1, b=2) del example["a"] with pytest.raises(AttributeError): _ = example.a def test_example_len(): example = Example(a=1, b=2, dspy_hidden=3) assert len(example) == 2 def 
test_example_repr_str_img(): example = Example( img=dspy.Image(url="") ) assert ( repr(example) == "Example({'img': Image(url=data:image/gif;base64,<IMAGE_BASE_64_ENCODED(56)>)}) (input_keys=None)" ) assert ( str(example) == "Example({'img': Image(url=data:image/gif;base64,<IMAGE_BASE_64_ENCODED(56)>)}) (input_keys=None)" ) def test_example_repr_str(): example = Example(a=1) assert repr(example) == "Example({'a': 1}) (input_keys=None)" assert str(example) == "Example({'a': 1}) (input_keys=None)" def test_example_eq(): example1 = Example(a=1, b=2) example2 = Example(a=1, b=2) assert example1 == example2 assert example1 != "" def test_example_hash(): example1 = Example(a=1, b=2) example2 = Example(a=1, b=2) assert hash(example1) == hash(example2) def test_example_keys_values_items(): example = Example(a=1, b=2, dspy_hidden=3) assert set(example.keys()) == {"a", "b"} assert 1 in example.values() assert ("b", 2) in example.items() def test_example_get(): example = Example(a=1, b=2) assert example.get("a") == 1 assert example.get("c", "default") == "default" def test_example_with_inputs(): example = Example(a=1, b=2).with_inputs("a") assert example._input_keys == {"a"} def test_example_inputs_labels(): example = Example(a=1, b=2).with_inputs("a") inputs = example.inputs() assert inputs.toDict() == {"a": 1} labels = example.labels() assert labels.toDict() == {"b": 2} def test_example_copy_without(): example = Example(a=1, b=2) copied = example.copy(c=3) assert copied.a == 1 assert copied.c == 3 without_a = copied.without("a") with pytest.raises(AttributeError): _ = without_a.a def test_example_to_dict(): example = Example(a=1, b=2) assert example.toDict() == {"a": 1, "b": 2} ``` -------------------------------------------------------------------------------- /docs/docs/tutorials/build_ai_program/index.md: -------------------------------------------------------------------------------- ```markdown # Build AI Programs with DSPy This section contains hands-on tutorials that 
guide you through building production-ready AI applications using DSPy. Each tutorial demonstrates practical use cases and shows you how to leverage DSPy's modular programming approach to create robust, maintainable AI systems. ## Core Applications ### [Managing Conversation History](../conversation_history/index.md) Learn how to manage conversation history in DSPy applications. ### [Building AI Agents with DSPy](../customer_service_agent/index.ipynb) Learn to create intelligent agents that can handle complex customer service scenarios. This tutorial shows how to build agents that can understand context, maintain conversation state, and provide helpful responses. ### [Building AI Applications by Customizing DSPy Modules](../custom_module/index.ipynb) Discover how to create custom DSPy modules tailored to your specific needs. Learn the patterns for building reusable, composable components that can be shared across different applications. ## Retrieval-Augmented Generation (RAG) ### [Retrieval-Augmented Generation (RAG)](../rag/index.ipynb) Master the fundamentals of RAG systems with DSPy. Learn how to combine retrieval mechanisms with language models to build systems that can answer questions using external knowledge sources. ### [Building RAG as Agent](../agents/index.ipynb) Take RAG to the next level by building `ReAct` agent-based systems that can reason about when and how to retrieve information, making your RAG systems more intelligent and adaptive. ### [Multi-Hop RAG](../multihop_search/index.ipynb) Build sophisticated RAG systems that can perform multi-step reasoning across multiple information sources, perfect for complex research and analysis tasks. ## Specialized Use Cases ### [Entity Extraction](../entity_extraction/index.ipynb) Learn to build systems that can identify and extract specific entities from text, essential for information processing and data analysis applications. 
### [Classification](../classification/index.md) Build robust text classification systems using DSPy's modular approach with a topic classification example. ### [Privacy-Conscious Delegation](../papillon/index.md) Explore advanced techniques for building AI systems that respect privacy constraints while maintaining high performance by combining a small local model and an advanced external model. ## Advanced Reasoning ### [Program Of Thought](../program_of_thought/index.ipynb) Learn to build systems that can generate and execute code to solve complex problems, combining the power of language models with programmatic reasoning. ## Multimodal Applications ### [Image Generation Prompt Iteration](../image_generation_prompting/index.ipynb) Discover how to use DSPy to iteratively improve image generation prompts, creating better visual content through systematic optimization. ### [Audio](../audio/index.ipynb) Explore audio processing applications with DSPy, learning to build systems that can understand, process, and generate audio content. 
``` -------------------------------------------------------------------------------- /tests/predict/test_retry.py: -------------------------------------------------------------------------------- ```python # import functools # import pydantic # import dspy # from dspy.primitives.assertions import assert_transform_module, backtrack_handler # from dspy.utils import DummyLM # def test_retry_simple(): # predict = dspy.Predict("question -> answer") # retry_module = dspy.Retry(predict) # # Test Retry has created the correct new signature # for field in predict.signature.output_fields: # assert f"past_{field}" in retry_module.new_signature.input_fields # assert "feedback" in retry_module.new_signature.input_fields # lm = DummyLM([{"answer": "blue"}]) # dspy.settings.configure(lm=lm) # result = retry_module.forward( # question="What color is the sky?", # past_outputs={"answer": "red"}, # feedback="Try harder", # ) # assert result.answer == "blue" # def test_retry_forward_with_feedback(): # # First we make a mistake, then we fix it # lm = DummyLM([{"answer": "red"}, {"answer": "blue"}]) # dspy.settings.configure(lm=lm, trace=[]) # class SimpleModule(dspy.Module): # def __init__(self): # super().__init__() # self.predictor = dspy.Predict("question -> answer") # def forward(self, **kwargs): # result = self.predictor(**kwargs) # print(f"SimpleModule got {result.answer=}") # dspy.Suggest(result.answer == "blue", "Please think harder") # return result # program = SimpleModule() # program = assert_transform_module( # program.map_named_predictors(dspy.Retry), # functools.partial(backtrack_handler, max_backtracks=1), # ) # result = program(question="What color is the sky?") # assert result.answer == "blue" # # def test_retry_forward_with_typed_predictor(): # # # First we make a mistake, then we fix it # # lm = DummyLM([{"output": '{"answer":"red"}'}, {"output": '{"answer":"blue"}'}]) # # dspy.settings.configure(lm=lm, trace=[]) # # class AnswerQuestion(dspy.Signature): # # 
"""Answer questions with succinct responses.""" # # class Input(pydantic.BaseModel): # # question: str # # class Output(pydantic.BaseModel): # # answer: str # # input: Input = dspy.InputField() # # output: Output = dspy.OutputField() # # class QuestionAnswerer(dspy.Module): # # def __init__(self): # # super().__init__() # # self.answer_question = dspy.TypedPredictor(AnswerQuestion) # # def forward(self, **kwargs): # # result = self.answer_question(input=AnswerQuestion.Input(**kwargs)).output # # dspy.Suggest(result.answer == "blue", "Please think harder") # # return result # # program = QuestionAnswerer() # # program = assert_transform_module( # # program.map_named_predictors(dspy.Retry), # # functools.partial(backtrack_handler, max_backtracks=1), # # ) # # result = program(question="What color is the sky?") # # assert result.answer == "blue" ``` -------------------------------------------------------------------------------- /tests/utils/test_annotation.py: -------------------------------------------------------------------------------- ```python from dspy.utils.annotation import experimental def test_experimental_decorator_on_function(): @experimental def test_function(): """A test function.""" return "test" assert "Experimental: This function may change or be removed in a future release without warning." in test_function.__doc__ assert "A test function." in test_function.__doc__ assert test_function() == "test" def test_experimental_decorator_on_function_with_version(): @experimental(version="3.1.0") def test_function(): """A test function with version.""" return "versioned" assert "introduced in v3.1.0" in test_function.__doc__ assert "Experimental: This function may change or be removed in a future release without warning (introduced in v3.1.0)." in test_function.__doc__ assert "A test function with version." 
in test_function.__doc__ assert test_function() == "versioned" def test_experimental_decorator_on_class(): @experimental class TestClass: """A test class.""" def method(self): return "method" assert "Experimental: This class may change or be removed in a future release without warning." in TestClass.__doc__ assert "A test class." in TestClass.__doc__ instance = TestClass() assert instance.method() == "method" def test_experimental_decorator_on_class_with_version(): @experimental(version="2.5.0") class TestClass: """A test class with version.""" pass assert "introduced in v2.5.0" in TestClass.__doc__ assert "Experimental: This class may change or be removed in a future release without warning (introduced in v2.5.0)." in TestClass.__doc__ assert "A test class with version." in TestClass.__doc__ def test_experimental_decorator_without_docstring(): @experimental def test_function(): return "no_doc" assert test_function.__doc__ == "Experimental: This function may change or be removed in a future release without warning." assert test_function() == "no_doc" def test_experimental_decorator_without_docstring_with_version(): @experimental(version="1.0.0") def test_function(): return "no_doc_version" assert test_function.__doc__ == "Experimental: This function may change or be removed in a future release without warning (introduced in v1.0.0)." assert test_function() == "no_doc_version" def test_experimental_decorator_with_callable_syntax(): def test_function(): """A test function.""" return "callable" decorated = experimental(test_function) assert "Experimental:" in decorated.__doc__ assert "A test function." 
in decorated.__doc__ assert decorated() == "callable" def test_experimental_decorator_with_version_callable_syntax(): def test_function(): """A test function.""" return "callable_version" decorated = experimental(test_function, version="4.0.0") assert "introduced in v4.0.0" in decorated.__doc__ assert "Experimental:" in decorated.__doc__ assert decorated() == "callable_version" ``` -------------------------------------------------------------------------------- /tests/reliability/complex_types/generated/test_nesting_1/schema.json: -------------------------------------------------------------------------------- ```json { "description": "The AI program is designed to process hierarchical data structures with multiple levels of nesting. The program will take a deeply nested input structure representing a complex dataset, perform specific transformations, validations, and computations, and then produce an equally complex nested output structure. The program is suitable for applications that require detailed data processing, such as multi-level data aggregation, hierarchical data validation, and nested data transformation.", "properties": { "level1": { "properties": { "level2": { "properties": { "level3": { "properties": { "level4": { "properties": { "level5": { "properties": { "field1": { "description": "A string field at the deepest level", "type": "string" }, "field2": { "description": "A numerical field at the deepest level", "type": "number" } }, "required": ["field1", "field2"], "type": "object" } }, "required": ["level5"], "type": "object" } }, "required": ["level4"], "type": "object" } }, "required": ["level3"], "type": "object" } }, "required": ["level2"], "type": "object" }, "resultLevel1": { "properties": { "resultLevel2": { "properties": { "resultLevel3": { "properties": { "resultLevel4": { "properties": { "resultLevel5": { "properties": { "outputField1": { "description": "A boolean field indicating success or failure", "type": "boolean" }, "outputField2": { 
"description": "An array of strings representing messages", "items": { "type": "string" }, "type": "array" } }, "required": ["outputField1", "outputField2"], "type": "object" } }, "required": ["resultLevel5"], "type": "object" } }, "required": ["resultLevel4"], "type": "object" } }, "required": ["resultLevel3"], "type": "object" } }, "required": ["resultLevel2"], "type": "object" } }, "required": ["level1", "resultLevel1"], "type": "object" } ``` -------------------------------------------------------------------------------- /dspy/predict/parallel.py: -------------------------------------------------------------------------------- ```python import threading from typing import Any from dspy.dsp.utils.settings import settings from dspy.primitives.example import Example from dspy.utils.parallelizer import ParallelExecutor class Parallel: def __init__( self, num_threads: int | None = None, max_errors: int | None = None, access_examples: bool = True, return_failed_examples: bool = False, provide_traceback: bool | None = None, disable_progress_bar: bool = False, ): super().__init__() self.num_threads = num_threads or settings.num_threads self.max_errors = settings.max_errors if max_errors is None else max_errors self.access_examples = access_examples self.return_failed_examples = return_failed_examples self.provide_traceback = provide_traceback self.disable_progress_bar = disable_progress_bar self.error_count = 0 self.error_lock = threading.Lock() self.cancel_jobs = threading.Event() self.failed_examples = [] self.exceptions = [] def forward(self, exec_pairs: list[tuple[Any, Example]], num_threads: int | None = None) -> list[Any]: num_threads = num_threads if num_threads is not None else self.num_threads executor = ParallelExecutor( num_threads=num_threads, max_errors=self.max_errors, provide_traceback=self.provide_traceback, disable_progress_bar=self.disable_progress_bar, ) def process_pair(pair): result = None module, example = pair if isinstance(example, Example): if 
self.access_examples: result = module(**example.inputs()) else: result = module(example) elif isinstance(example, dict): result = module(**example) elif isinstance(example, list) and module.__class__.__name__ == "Parallel": result = module(example) elif isinstance(example, tuple): result = module(*example) else: raise ValueError( f"Invalid example type: {type(example)}, only supported types are Example, dict, list and tuple" ) return result # Execute the processing function over the execution pairs results = executor.execute(process_pair, exec_pairs) # Populate failed examples and exceptions from the executor if self.return_failed_examples: for failed_idx in executor.failed_indices: if failed_idx < len(exec_pairs): _, original_example = exec_pairs[failed_idx] self.failed_examples.append(original_example) if exception := executor.exceptions_map.get(failed_idx): self.exceptions.append(exception) return results, self.failed_examples, self.exceptions else: return results def __call__(self, *args: Any, **kwargs: Any) -> Any: return self.forward(*args, **kwargs) ``` -------------------------------------------------------------------------------- /tests/reliability/complex_types/generated/test_nesting_2/program.py: -------------------------------------------------------------------------------- ```python ### Input models ### from datetime import datetime from pydantic import BaseModel, Field class Details(BaseModel): value: str = Field(..., description="Customer's value category") age: int = Field(..., description="Customer's age") class Customer(BaseModel): customer_id: str = Field(..., description="Unique identifier for the customer") customer_type: bool = Field(..., description="Indicates if the customer is a premium member") details: Details class Details1(BaseModel): value: float = Field(..., description="Monetary value of the transaction") timestamp: datetime = Field(..., description="Timestamp of the transaction") class Transaction(BaseModel): transaction_id: str = 
Field(..., description="Unique identifier for the transaction") amount: float = Field(..., description="Transaction amount") details: Details1 class ProgramInputs(BaseModel): customer: Customer transaction: Transaction ### Output models ### from datetime import datetime from pydantic import BaseModel, Field class CustomerType(BaseModel): is_premium: bool = Field(..., description="Indicates if the customer is a premium member") category: str = Field(..., description="Customer's membership category") class CustomerSummary(BaseModel): customer_id: str = Field(..., description="Unique identifier for the customer") customer_type: CustomerType value: str = Field(..., description="Customer's value category") class Details(BaseModel): value: float = Field(..., description="Monetary value of the transaction") timestamp: datetime = Field(..., description="Timestamp of the transaction") class TransactionSummary(BaseModel): transaction_id: str = Field(..., description="Unique identifier for the transaction") total_amount: float = Field(..., description="Total transaction amount") details: Details class ProgramOutputs(BaseModel): customer_summary: CustomerSummary transaction_summary: TransactionSummary ### Program definition ### import dspy class BaseSignature(dspy.Signature): """ This AI program is designed to process complex datasets with multiple nested input fields and produce structured output fields. It can handle cases where nested fields have the same name but different types, ensuring that the data is accurately processed and transformed. The program is particularly useful for applications that require detailed data analysis, integration of multiple data sources, and handling of heterogeneous data types. 
""" program_signature = BaseSignature for input_field_name, input_field in ProgramInputs.model_fields.items(): program_signature = program_signature.append( name=input_field_name, field=dspy.InputField(description=input_field.description), type_=input_field.annotation, ) for output_field_name, output_field in ProgramOutputs.model_fields.items(): program_signature = program_signature.append( name=output_field_name, field=dspy.OutputField(description=input_field.description), type_=output_field.annotation, ) program = dspy.ChainOfThought(program_signature) ``` -------------------------------------------------------------------------------- /dspy/teleprompt/teleprompt_optuna.py: -------------------------------------------------------------------------------- ```python from dspy.evaluate.evaluate import Evaluate from dspy.teleprompt.teleprompt import Teleprompter from .bootstrap import BootstrapFewShot class BootstrapFewShotWithOptuna(Teleprompter): def __init__( self, metric, teacher_settings=None, max_bootstrapped_demos=4, max_labeled_demos=16, max_rounds=1, num_candidate_programs=16, num_threads=None, ): self.metric = metric self.teacher_settings = teacher_settings or {} self.max_rounds = max_rounds self.num_threads = num_threads self.min_num_samples = 1 self.max_num_samples = max_bootstrapped_demos self.num_candidate_sets = num_candidate_programs # self.max_num_traces = 1 + int(max_bootstrapped_demos / 2.0 * self.num_candidate_sets) # Semi-hacky way to get the parent class's _bootstrap function to stop early. 
# self.max_bootstrapped_demos = self.max_num_traces self.max_labeled_demos = max_labeled_demos print("Going to sample between", self.min_num_samples, "and", self.max_num_samples, "traces per predictor.") # print("Going to sample", self.max_num_traces, "traces in total.") print("Will attempt to train", self.num_candidate_sets, "candidate sets.") def objective(self, trial): program2 = self.student.reset_copy() for (name, compiled_predictor), (_, program2_predictor) in zip( self.compiled_teleprompter.named_predictors(), program2.named_predictors(), strict=False, ): all_demos = compiled_predictor.demos demo_index = trial.suggest_int(f"demo_index_for_{name}", 0, len(all_demos) - 1) selected_demo = dict(all_demos[demo_index]) program2_predictor.demos = [selected_demo] evaluate = Evaluate( devset=self.valset, metric=self.metric, num_threads=self.num_threads, display_table=False, display_progress=True, ) result = evaluate(program2) trial.set_user_attr("program", program2) return result.score def compile(self, student, *, teacher=None, max_demos, trainset, valset=None): import optuna self.trainset = trainset self.valset = valset or trainset self.student = student.reset_copy() self.teacher = teacher.deepcopy() if teacher is not None else student.reset_copy() teleprompter_optimize = BootstrapFewShot( metric=self.metric, max_bootstrapped_demos=max_demos, max_labeled_demos=self.max_labeled_demos, teacher_settings=self.teacher_settings, max_rounds=self.max_rounds, ) self.compiled_teleprompter = teleprompter_optimize.compile( self.student, teacher=self.teacher, trainset=self.trainset, ) study = optuna.create_study(direction="maximize") study.optimize(self.objective, n_trials=self.num_candidate_sets) best_program = study.trials[study.best_trial.number].user_attrs["program"] print("Best score:", study.best_value) print("Best program:", best_program) return best_program ``` -------------------------------------------------------------------------------- 
/tests/reliability/complex_types/generated/test_many_types_1/inputs/input2.json: -------------------------------------------------------------------------------- ```json { "assertions": [ "The 'processedTupleField' should be a tuple with exactly two elements: the first element being a string and the second element being a number.", "The 'processedEnumField' should be one of the predefined options: 'option1', 'option2', or 'option3'.", "The 'processedDatetimeField' should be a date-time.", "The 'processedLiteralField' should be the enum 'literalValue'.", "The 'processedObjectField' should be an object containing 'subField1' as a string, 'subField2' as a number, and an 'additionalField' as a boolean.", "The 'processedNestedObjectField' should be an object containing 'tupleField' as a tuple with a string and float, 'enumField' as one of the predefined options (option1, option2, or option3), 'datetimeField' as a 'date-time' object, 'literalField' as the string 'literalValue', and an 'additionalField' as a boolean." ], "input": { "datetimeField": "2023-10-01T12:00:00Z", "enumField": "option1", "literalField": "literalValue", "nestedObjectField": { "datetimeField": "2023-11-01T12:00:00Z", "enumField": "option2", "literalField": "literalValue", "tupleField": ["nestedString", 789] }, "objectField": { "subField1": "Patriotism is a feeling of love, devotion, and sense of attachment to one's country. This attachment can be a combination of many different feelings relating to one's homeland, including ethnic, cultural, political or historical aspects. It encompasses a set of concepts closely related to those of nationalism. In the context of patriotism, people may express their feelings in a variety of ways, including supporting their country's interests and policies, celebrating national holidays, and participating in civic activities. Patriotism often involves a sense of pride in one's country and a willingness to defend it against any threats. 
It can also include a commitment to improving the country and making it a better place for future generations. The concept of patriotism is often linked with the idea of national identity, which is the sense of a nation as a cohesive whole, as represented by distinctive traditions, culture, language, and politics. Patriots may feel a strong sense of loyalty and duty to their country, and they may take actions to support and protect it. However, it is important to note that patriotism can also be a complex and sometimes controversial concept. While it can inspire positive actions and a sense of community, it can also lead to exclusionary or aggressive behaviors if taken to an extreme. In some cases, excessive patriotism can result in nationalism, which can lead to conflicts with other nations or groups. Despite these potential issues, many people view patriotism as a positive force that can unite people and inspire them to work together for the common good. It can foster a sense of belonging and purpose, and it can motivate individuals to contribute to the well-being of their country. 
class DSPyTestModel(CustomLLM):
    """Mock LiteLLM custom provider used by the DSPy test server.

    The completion entry points append the incoming request to the server log
    file and then return a mocked response. The streaming entry points emit a
    single, final chunk and do not log the request.
    """

    def completion(self, *args, **kwargs) -> litellm.ModelResponse:
        """Log the request kwargs and return the mocked completion response."""
        _append_request_to_log_file(kwargs)
        return _get_mock_llm_response(kwargs)

    async def acompletion(self, *args, **kwargs) -> litellm.ModelResponse:
        """Async variant of `completion`; logs the request and returns the mock response."""
        _append_request_to_log_file(kwargs)
        return _get_mock_llm_response(kwargs)

    def streaming(self, *args, **kwargs) -> Iterator[GenericStreamingChunk]:
        """Yield a single finished streaming chunk.

        This is a generator so that the result matches the declared
        ``Iterator[GenericStreamingChunk]`` return type; returning the chunk
        dict itself would make consumers iterate over its keys.
        """
        generic_streaming_chunk: GenericStreamingChunk = {
            "finish_reason": "stop",
            "index": 0,
            "is_finished": True,
            "text": '{"output_text": "Hello!"}',
            "tool_use": None,
            "usage": {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0},
        }
        yield generic_streaming_chunk

    async def astreaming(self, *args, **kwargs) -> AsyncIterator[GenericStreamingChunk]:
        """Async variant of `streaming`; yields the same single finished chunk."""
        generic_streaming_chunk: GenericStreamingChunk = {
            "finish_reason": "stop",
            "index": 0,
            "is_finished": True,
            "text": '{"output_text": "Hello!"}',
            "tool_use": None,
            "usage": {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0},
        }
        yield generic_streaming_chunk
"total_tokens": 20}, mock_response="Hi!", ) def _throw_exception_based_on_content_if_applicable(request_kwargs): """ Throws an exception, for testing purposes, based on the content of the request message. """ model = request_kwargs["model"] content = request_kwargs["messages"][0]["content"] if "429" in content: raise litellm.RateLimitError(message="Rate limit exceeded", llm_provider=None, model=model) elif "504" in content: raise litellm.Timeout("Request timed out!", llm_provider=None, model=model) elif "400" in content: raise litellm.BadRequestError(message="Bad request", llm_provider=None, model=model) elif "401" in content: raise litellm.AuthenticationError(message="Authentication error", llm_provider=None, model=model) def _append_request_to_log_file(completion_kwargs): log_file_path = os.environ.get(LITELLM_TEST_SERVER_LOG_FILE_PATH_ENV_VAR) if log_file_path is None: raise ValueError( "Server logs file path is not defined! Please set the path using the" + f" {LITELLM_TEST_SERVER_LOG_FILE_PATH_ENV_VAR} environment variable." 
) with open(log_file_path, "a") as f: log_blob = ( { "model": completion_kwargs["model"], "messages": completion_kwargs["messages"], }, ) json.dump(log_blob, f) f.write("\n") dspy_test_model = DSPyTestModel() ``` -------------------------------------------------------------------------------- /docs/docs/api/modules/CodeAct.md: -------------------------------------------------------------------------------- ```markdown # dspy.CodeAct <!-- START_API_REF --> ::: dspy.CodeAct handler: python options: members: - __call__ - batch - deepcopy - dump_state - get_lm - inspect_history - load - load_state - map_named_predictors - named_parameters - named_predictors - named_sub_modules - parameters - predictors - reset_copy - save - set_lm show_source: true show_root_heading: true heading_level: 2 docstring_style: google show_root_full_path: true show_object_full_path: false separate_signature: false inherited_members: true <!-- END_API_REF --> # CodeAct CodeAct is a DSPy module that combines code generation with tool execution to solve problems. It generates Python code snippets that use provided tools and the Python standard library to accomplish tasks. ## Basic Usage Here's a simple example of using CodeAct: ```python import dspy from dspy.predict import CodeAct # Define a simple tool function def factorial(n: int) -> int: """Calculate the factorial of a number.""" if n == 1: return 1 return n * factorial(n-1) # Create a CodeAct instance act = CodeAct("n->factorial_result", tools=[factorial]) # Use the CodeAct instance result = act(n=5) print(result) # Will calculate factorial(5) = 120 ``` ## How It Works CodeAct operates in an iterative manner: 1. Takes input parameters and available tools 2. Generates Python code snippets that use these tools 3. Executes the code using a Python sandbox 4. Collects the output and determines if the task is complete 5. 
class HotPotQA(Dataset):
    """HotPotQA dataset built from the hard examples of the Hugging Face `hotpot_qa` fullwiki config.

    The official training split is filtered to hard examples, shuffled with a
    fixed seed, and split 75/25 into train and (optionally) an unofficial dev
    set. The official validation split becomes the test set.
    """

    def __init__(
        self,
        *args,
        only_hard_examples=True,
        keep_details="dev_titles",
        unofficial_dev=True,
        **kwargs,
    ) -> None:
        """
        Args:
            *args: Forwarded to `Dataset.__init__`.
            only_hard_examples: Must be truthy; easy examples are not supported.
            keep_details: `True` keeps id/question/answer/type/supporting_facts/context
                for training examples; `"dev_titles"` keeps question/answer and the
                gold titles (which are then removed from the train portion only);
                any other value keeps just question/answer.
            unofficial_dev: When truthy, the last 25% of the shuffled training
                examples become the dev set; otherwise the dev set is `None`.
            **kwargs: Forwarded to `Dataset.__init__`.
        """
        super().__init__(*args, **kwargs)
        assert only_hard_examples, (
            "Care must be taken when adding support for easy examples. "
            "Dev must be all hard to match official dev, but training can be flexible."
        )

        from datasets import load_dataset

        hf_official_train = load_dataset("hotpot_qa", "fullwiki", split="train")
        hf_official_dev = load_dataset("hotpot_qa", "fullwiki", split="validation")

        # The kept keys depend only on `keep_details`, so pick them once.
        if keep_details is True:
            keys = ["id", "question", "answer", "type", "supporting_facts", "context"]
        elif keep_details == "dev_titles":
            keys = ["question", "answer", "supporting_facts"]
        else:
            keys = ["question", "answer"]

        official_train = []
        for raw_example in hf_official_train:
            if raw_example["level"] == "hard":
                example = {k: raw_example[k] for k in keys}

                if "supporting_facts" in example:
                    example["gold_titles"] = set(example["supporting_facts"]["title"])
                    del example["supporting_facts"]

                official_train.append(example)

        # Fixed seed keeps the train/dev split reproducible.
        rng = random.Random(0)
        rng.shuffle(official_train)

        split_at = len(official_train) * 75 // 100
        self._train = official_train[:split_at]

        if unofficial_dev:
            self._dev = official_train[split_at:]
        else:
            self._dev = None

        # With "dev_titles", gold titles are kept for dev examples only.
        if keep_details == "dev_titles":
            for example in self._train:
                del example["gold_titles"]

        test = []
        for raw_example in hf_official_dev:
            assert raw_example["level"] == "hard"
            example = {k: raw_example[k] for k in ["id", "question", "answer", "type", "supporting_facts"]}
            if "supporting_facts" in example:
                example["gold_titles"] = set(example["supporting_facts"]["title"])
                del example["supporting_facts"]
            test.append(example)

        self._test = test
class BestOfN(Module):
    def __init__(
        self,
        module: Module,
        N: int,  # noqa: N803
        reward_fn: Callable[[dict, Prediction], float],
        threshold: float,
        fail_count: int | None = None,
    ):
        """
        Runs a module up to `N` times with different rollout IDs at `temperature=1.0` and
        returns the best prediction out of `N` attempts or the first prediction that passes the
        `threshold`.

        Args:
            module (Module): The module to run.
            N (int): The number of times to run the module.
            reward_fn (Callable[[dict, Prediction], float]): The reward function which takes in the args passed to the module, the resulting prediction, and returns a scalar reward.
            threshold (float): The threshold for the reward function.
            fail_count (Optional[int], optional): The number of times the module can fail before raising an error. Defaults to N if not provided.
                An explicit `0` means the first failure is raised.

        Example:
            ```python
            import dspy

            dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))

            # Define a QA module with chain of thought
            qa = dspy.ChainOfThought("question -> answer")

            # Define a reward function that checks for one-word answers
            def one_word_answer(args, pred):
                return 1.0 if len(pred.answer.split()) == 1 else 0.0

            # Create a refined module that tries up to 3 times
            best_of_3 = dspy.BestOfN(module=qa, N=3, reward_fn=one_word_answer, threshold=1.0)

            # Use the refined module
            result = best_of_3(question="What is the capital of Belgium?").answer
            # Returns: Brussels
            ```
        """
        self.module = module
        self.reward_fn = lambda *args: reward_fn(*args)  # to prevent this from becoming a parameter
        self.threshold = threshold
        self.N = N
        # Only fall back to N when fail_count is omitted; `fail_count or N` would
        # silently turn an explicit 0 into N.
        self.fail_count = N if fail_count is None else fail_count

    def forward(self, **kwargs):
        """Run the wrapped module up to `N` times and return the highest-reward prediction.

        Each attempt uses a deep copy of the module with a copy of the LM set to a
        distinct rollout id and `temperature=1.0`. The loop stops early once a
        reward reaches `threshold`. Failed attempts are printed and tolerated
        until more than `fail_count` attempts have failed within this call, at
        which point the latest exception is re-raised.

        Returns:
            The best prediction seen, or `None` if no attempt produced one.
        """
        lm = self.module.get_lm() or dspy.settings.lm
        start = lm.kwargs.get("rollout_id", 0)
        rollout_ids = [start + i for i in range(self.N)]
        best_pred, best_trace, best_reward = None, None, -float("inf")

        # Count failures per call; mutating self.fail_count would shrink the
        # budget for every later call on the same instance.
        failures = 0

        for idx, rid in enumerate(rollout_ids):
            lm_ = lm.copy(rollout_id=rid, temperature=1.0)
            mod = self.module.deepcopy()
            mod.set_lm(lm_)

            try:
                with dspy.context(trace=[]):
                    pred = mod(**kwargs)
                    trace = dspy.settings.trace.copy()

                    # NOTE: Not including the trace of reward_fn.
                    reward = self.reward_fn(kwargs, pred)

                if reward > best_reward:
                    best_reward, best_pred, best_trace = reward, pred, trace

                if reward >= self.threshold:
                    break

            except Exception as e:
                print(f"BestOfN: Attempt {idx + 1} failed with rollout id {rid}: {e}")
                failures += 1
                if failures > self.fail_count:
                    raise

        if best_trace:
            dspy.settings.trace.extend(best_trace)
        return best_pred
class ReinforceJob:
    """Base class describing the interface of a reinforcement-learning training job.

    Concrete providers are expected to override the methods below; every one of
    them raises `NotImplementedError` here.
    NOTE(review): the class does not derive from `abc.ABC`, so `@abstractmethod`
    does not by itself prevent instantiation — confirm whether that is intended.
    """

    def __init__(
        self,
        lm: "LM",
        train_kwargs: dict[str, Any] | None = None,
        gpu_config: MultiGPUConfig | None = None,
    ):
        """
        Args:
            lm: The language model the job operates on.
            train_kwargs: Optional training options; an empty dict is used when omitted.
            gpu_config: GPU allocation for the job. When omitted, a fresh
                `MultiGPUConfig(num_inference_gpus=1, num_training_gpus=1)` is
                created for this instance, so jobs never share one default object.
        """
        if gpu_config is None:
            gpu_config = MultiGPUConfig(num_inference_gpus=1, num_training_gpus=1)
        self.lm = lm
        self.train_kwargs = train_kwargs or {}
        self.gpu_config = gpu_config
        self.checkpoints = {}
        self.last_checkpoint = None

    @abstractmethod
    def initialize(self):
        raise NotImplementedError

    @abstractmethod
    def step(self, train_data: list[dict[str, Any]], train_data_format: TrainDataFormat | str | None = None):
        raise NotImplementedError

    @abstractmethod
    def terminate(self):
        raise NotImplementedError

    @abstractmethod
    def update_model(self):
        raise NotImplementedError

    @abstractmethod
    def save_checkpoint(self, checkpoint_name: str):
        raise NotImplementedError

    def cancel(self):
        raise NotImplementedError

    def status(self):
        raise NotImplementedError
pass @staticmethod def finetune( job: TrainingJob, model: str, train_data: list[dict[str, Any]], train_data_format: TrainDataFormat | str | None, train_kwargs: dict[str, Any] | None = None, ) -> str: raise NotImplementedError ``` -------------------------------------------------------------------------------- /dspy/datasets/colors.py: -------------------------------------------------------------------------------- ```python import random from dspy.datasets.dataset import Dataset ### A bunch of colors, originally from matplotlib all_colors = [ "alice blue", "dodger blue", "light sky blue", "deep sky blue", "sky blue", "steel blue", "light steel blue", "medium blue", "navy blue", "blue", "royal blue", "cadet blue", "cornflower blue", "medium slate blue", "slate blue", "dark slate blue", "powder blue", "turquoise", "dark turquoise", "medium turquoise", "pale turquoise", "light sea green", "medium sea green", "sea green", "forest green", "green yellow", "lime green", "dark green", "green", "lime", "chartreuse", "lawn green", "yellow green", "olive green", "dark olive green", "medium spring green", "spring green", "medium aquamarine", "aquamarine", "aqua", "cyan", "dark cyan", "teal", "medium orchid", "dark orchid", "orchid", "blue violet", "violet", "dark violet", "plum", "thistle", "magenta", "fuchsia", "dark magenta", "medium purple", "purple", "rebecca purple", "dark red", "fire brick", "indian red", "light coral", "dark salmon", "light salmon", "salmon", "red", "crimson", "tomato", "coral", "orange red", "dark orange", "orange", "yellow", "gold", "light goldenrod yellow", "pale goldenrod", "goldenrod", "dark goldenrod", "beige", "moccasin", "blanched almond", "navajo white", "antique white", "bisque", "burlywood", "dark khaki", "khaki", "tan", "wheat", "snow", "floral white", "old lace", "ivory", "linen", "seashell", "honeydew", "mint cream", "azure", "lavender", "ghost white", "white smoke", "gainsboro", "light gray", "silver", "dark gray", "gray", "dim gray", 
class Unbatchify:
    """Expose a batch-processing function as a blocking single-item callable.

    Calls are queued and handed to a background worker thread, which groups
    them into batches, runs `batch_fn`, and delivers each output to its caller.
    """

    def __init__(
        self, batch_fn: Callable[[list[Any]], list[Any]], max_batch_size: int = 32, max_wait_time: float = 0.1
    ):
        """
        Initializes the Unbatchify.

        Args:
            batch_fn: The batch-processing function that accepts a list of inputs and returns a list of outputs.
            max_batch_size: The maximum number of items to include in a batch.
            max_wait_time: The maximum time (in seconds) to wait for batch to fill before processing.
        """

        self.batch_fn = batch_fn
        self.max_batch_size = max_batch_size
        self.max_wait_time = max_wait_time
        self.input_queue = queue.Queue()
        self.stop_event = threading.Event()

        self.worker_thread = threading.Thread(target=self._worker)
        self.worker_thread.daemon = True  # Ensures thread exits when main program exits
        self.worker_thread.start()

    def __call__(self, input_item: Any) -> Any:
        """
        Thread-safe function that accepts a single input and returns the corresponding output.

        Args:
            input_item: The single input item to process.

        Returns:
            The output corresponding to the input_item after processing through batch_fn.

        Raises:
            RuntimeError: If the instance has already been closed.
            Exception: Whatever exception `batch_fn` raised for the batch containing this item.
        """
        if self.stop_event.is_set():
            # Nothing would ever consume the queue again, so fail fast instead of blocking forever.
            raise RuntimeError("Unbatchify is closed")

        future = Future()
        self.input_queue.put((input_item, future))
        return future.result()

    def _worker(self):
        """
        Worker thread that batches inputs and processes them using batch_fn.
        """
        while not self.stop_event.is_set():
            batch = []
            futures = []
            start_time = time.time()
            while len(batch) < self.max_batch_size and (time.time() - start_time) < self.max_wait_time:
                try:
                    input_item, future = self.input_queue.get(timeout=self.max_wait_time)
                    batch.append(input_item)
                    futures.append(future)
                except queue.Empty:
                    break

            if batch:
                try:
                    # Materialize first so a failure while producing outputs
                    # happens before any future has been resolved.
                    outputs = list(self.batch_fn(batch))
                except Exception as e:
                    for future in futures:
                        future.set_exception(e)
                else:
                    for output, future in zip(outputs, futures, strict=False):
                        future.set_result(output)
                    # Too few outputs: fail the unmatched callers rather than
                    # leaving them blocked on a future that is never resolved.
                    for future in futures[len(outputs) :]:
                        future.set_exception(
                            RuntimeError(
                                f"batch_fn returned {len(outputs)} outputs for a batch of {len(futures)} inputs"
                            )
                        )
            else:
                time.sleep(0.01)

        # Clean up remaining items when stopping
        while True:
            try:
                _, future = self.input_queue.get_nowait()
                future.set_exception(RuntimeError("Unbatchify is closed"))
            except queue.Empty:
                break

        print("Worker thread has been terminated.")

    def close(self):
        """
        Stops the worker thread and cleans up resources.
        """
        if not self.stop_event.is_set():
            self.stop_event.set()
            self.worker_thread.join()

    def __enter__(self):
        """
        Enables use as a context manager.
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Ensures resources are cleaned up when exiting context.
        """
        self.close()

    def __del__(self):
        """
        Ensures the worker thread is terminated when the object is garbage collected.
        """
        self.close()
class OldField:
    """A more ergonomic datatype that infers prefix and desc if omitted."""

    def __init__(self, *, prefix=None, desc=None, input, format=None):
        # `input` is required by the signature but is not stored on the instance.
        self.prefix = prefix  # This can be None initially and set later
        self.desc = desc
        self.format = format

    def finalize(self, key, inferred_prefix):
        """Set the prefix if it's not provided explicitly."""
        if self.prefix is None:
            self.prefix = inferred_prefix + ":"

        if self.desc is None:
            # Default description is a `${key}` placeholder.
            self.desc = f"${{{key}}}"

    def __repr__(self):
        return f"{self.__class__.__name__}(prefix={self.prefix}, desc={self.desc})"

    def __eq__(self, __value: object) -> bool:
        """Compare by instance attributes; unrelated types are never equal.

        Returning `NotImplemented` for non-`OldField` operands avoids an
        `AttributeError` when the other object has no `__dict__` (e.g. `None`).
        """
        if not isinstance(__value, OldField):
            return NotImplemented
        return self.__dict__ == __value.__dict__
def configure_cache(
    enable_disk_cache: bool | None = True,
    enable_memory_cache: bool | None = True,
    disk_cache_dir: str | None = DISK_CACHE_DIR,
    disk_size_limit_bytes: int | None = DISK_CACHE_LIMIT,
    memory_max_entries: int | None = 1000000,
):
    """Build a new DSPy cache from the given settings and install it as `dspy.cache`.

    Args:
        enable_disk_cache: Whether to enable on-disk cache.
        enable_memory_cache: Whether to enable in-memory cache.
        disk_cache_dir: The directory to store the on-disk cache.
        disk_size_limit_bytes: The size limit of the on-disk cache.
        memory_max_entries: The maximum number of entries in the in-memory cache.
    """
    cache_settings = (
        enable_disk_cache,
        enable_memory_cache,
        disk_cache_dir,
        disk_size_limit_bytes,
        memory_max_entries,
    )
    replacement_cache = Cache(*cache_settings)

    # Imported here, after the cache has been built, as in the original flow.
    import dspy

    # Rebind the package-level reference so it points at the new cache.
    dspy.cache = replacement_cache
def configure_litellm_logging(level: str = "ERROR"):
    """Configure LiteLLM logging to the specified level.

    Args:
        level: Name of a standard `logging` level (e.g. "DEBUG", "ERROR").
            The name is matched case-insensitively.

    Raises:
        ValueError: If `level` does not name a numeric `logging` level.
    """
    # Litellm uses a global logger called `verbose_logger` to control all loggings.
    from litellm._logging import verbose_logger

    # Upper-case first: a lowercase name such as "debug" would otherwise resolve
    # to the `logging.debug` function rather than to the numeric level.
    numeric_logging_level = getattr(logging, str(level).upper(), None)
    if not isinstance(numeric_logging_level, int):
        raise ValueError(f"Unknown logging level: {level!r}")

    verbose_logger.setLevel(numeric_logging_level)
    for h in verbose_logger.handlers:
        h.setLevel(numeric_logging_level)
def test_parse_value_json_repair():
    # Test cases where json_repair is needed
    assert parse_value('{"key": "value"}', dict) == {"key": "value"}

    # Test ast.literal_eval fallback
    assert parse_value("{'key': 'value'}", dict) == {"key": "value"}

    # A string that is neither JSON nor a Python literal cannot be coerced to `dict`:
    # parse_value is expected to raise here, not to hand back the original string.
    malformed = "not json or literal"
    with pytest.raises(Exception):
        parse_value(malformed, dict)
def format(self) -> list[dict[str, Any]]:
    """Render this document as a single citation-enabled content block.

    Returns:
        A one-element list holding the document block. `title` and `context`
        are added to the block only when they are non-empty.
    """
    source = {"type": "text", "media_type": self.media_type, "data": self.data}
    block = {"type": "document", "source": source, "citations": {"enabled": True}}

    # Optional metadata, appended in a fixed order and skipped when unset or empty.
    for optional_key in ("title", "context"):
        optional_value = getattr(self, optional_key)
        if optional_value:
            block[optional_key] = optional_value

    return [block]
) @pydantic.model_validator(mode="before") @classmethod def validate_input(cls, data: Any): if isinstance(data, cls): return data # Handle case where data is just a string (data only) if isinstance(data, str): return {"data": data} # Handle case where data is a dict elif isinstance(data, dict): return data raise ValueError(f"Received invalid value for `Document`: {data}") def __str__(self) -> str: """String representation showing title and content length.""" title_part = f"'{self.title}': " if self.title else "" return f"Document({title_part}{len(self.data)} chars)" ``` -------------------------------------------------------------------------------- /dspy/adapters/types/code.py: -------------------------------------------------------------------------------- ```python import re from typing import Any, ClassVar import pydantic from pydantic import create_model from dspy.adapters.types.base_type import Type class Code(Type): """Code type in DSPy. This type is useful for code generation and code analysis. 
Example 1: dspy.Code as output type in code generation: ```python import dspy dspy.configure(lm=dspy.LM("openai/gpt-4o-mini")) class CodeGeneration(dspy.Signature): '''Generate python code to answer the question.''' question: str = dspy.InputField(description="The question to answer") code: dspy.Code["java"] = dspy.OutputField(description="The code to execute") predict = dspy.Predict(CodeGeneration) result = predict(question="Given an array, find if any of the two numbers sum up to 10") print(result.code) ``` Example 2: dspy.Code as input type in code analysis: ```python import dspy import inspect dspy.configure(lm=dspy.LM("openai/gpt-4o-mini")) class CodeAnalysis(dspy.Signature): '''Analyze the time complexity of the function.''' code: dspy.Code["python"] = dspy.InputField(description="The function to analyze") result: str = dspy.OutputField(description="The time complexity of the function") predict = dspy.Predict(CodeAnalysis) def sleepsort(x): import time for i in x: time.sleep(i) print(i) result = predict(code=inspect.getsource(sleepsort)) print(result.result) ``` """ code: str language: ClassVar[str] = "python" def format(self): return f"{self.code}" @pydantic.model_serializer() def serialize_model(self): """Override to bypass the <<CUSTOM-TYPE-START-IDENTIFIER>> and <<CUSTOM-TYPE-END-IDENTIFIER>> tags.""" return self.format() @classmethod def description(cls) -> str: return ( "Code represented in a string, specified in the `code` field. If this is an output field, the code " "field should follow the markdown code block format, e.g. 
\n```python\n{code}\n``` or \n```cpp\n{code}\n```" f"\nProgramming language: {cls.language}" ) @pydantic.model_validator(mode="before") @classmethod def validate_input(cls, data: Any): if isinstance(data, cls): return data if isinstance(data, str): return {"code": _filter_code(data)} if isinstance(data, dict): if "code" not in data: raise ValueError("`code` field is required for `dspy.Code`") if not isinstance(data["code"], str): raise ValueError(f"`code` field must be a string, but received type: {type(data['code'])}") return {"code": _filter_code(data["code"])} raise ValueError(f"Received invalid value for `dspy.Code`: {data}") def _filter_code(code: str) -> str: """Extract code from markdown code blocks, stripping any language identifier.""" # Case 1: format like: # ```python # {code_block} # ``` regex_pattern = r"```(?:[^\n]*)\n(.*?)```" match = re.search(regex_pattern, code, re.DOTALL) if match: return match.group(1).strip() # Case 2: ```<code>``` (no language, single-line) regex_pattern_simple = r"```(.*?)```" match = re.search(regex_pattern_simple, code, re.DOTALL) if match: return match.group(1).strip() # Fallback case return code # Patch __class_getitem__ directly on the class to support dspy.Code["python"] syntax def _code_class_getitem(cls, language): code_with_language_cls = create_model(f"{cls.__name__}_{language}", __base__=cls) code_with_language_cls.language = language return code_with_language_cls Code.__class_getitem__ = classmethod(_code_class_getitem) ``` -------------------------------------------------------------------------------- /docs/docs/deep-dive/data-handling/examples.md: -------------------------------------------------------------------------------- ```markdown --- sidebar_position: 1 --- !!! warning "This page is outdated and may not be fully accurate in DSPy 2.5" # Examples in DSPy Working in DSPy involves training sets, development sets, and test sets. 
This is like traditional ML, but you usually need far fewer labels (or zero labels) to use DSPy effectively. The core data type for data in DSPy is `Example`. You will use **Examples** to represent items in your training set and test set. DSPy **Examples** are similar to Python `dict`s but have a few useful utilities. Your DSPy modules will return values of the type `Prediction`, which is a special sub-class of `Example`. ## Creating an `Example` When you use DSPy, you will do a lot of evaluation and optimization runs. Your individual datapoints will be of type `Example`: ```python qa_pair = dspy.Example(question="This is a question?", answer="This is an answer.") print(qa_pair) print(qa_pair.question) print(qa_pair.answer) ``` **Output:** ```text Example({'question': 'This is a question?', 'answer': 'This is an answer.'}) (input_keys=None) This is a question? This is an answer. ``` Examples can have any field keys and any value types, though usually values are strings. ```text object = Example(field1=value1, field2=value2, field3=value3, ...) ``` ## Specifying Input Keys In traditional ML, there are separated "inputs" and "labels". In DSPy, the `Example` objects have a `with_inputs()` method, which can mark specific fields as inputs. (The rest are just metadata or labels.) ```python # Single Input. print(qa_pair.with_inputs("question")) # Multiple Inputs; be careful about marking your labels as inputs unless you mean it. print(qa_pair.with_inputs("question", "answer")) ``` This flexibility allows for customized tailoring of the `Example` object for different DSPy scenarios. When you call `with_inputs()`, you get a new copy of the example. The original object is kept unchanged. ## Element Access and Updation Values can be accessed using the `.`(dot) operator. You can access the value of key `name` in defined object `Example(name="John Doe", job="sleep")` through `object.name`. 
To access or exclude certain keys, use `inputs()` and `labels()` methods to return new Example objects containing only input or non-input keys, respectively. ```python article_summary = dspy.Example(article= "This is an article.", summary= "This is a summary.").with_inputs("article") input_key_only = article_summary.inputs() non_input_key_only = article_summary.labels() print("Example object with Input fields only:", input_key_only) print("Example object with Non-Input fields only:", non_input_key_only) ``` **Output** ``` Example object with Input fields only: Example({'article': 'This is an article.'}) (input_keys=None) Example object with Non-Input fields only: Example({'summary': 'This is a summary.'}) (input_keys=None) ``` To exclude keys, use `without()`: ```python article_summary = dspy.Example(context="This is an article.", question="This is a question?", answer="This is an answer.", rationale= "This is a rationale.").with_inputs("context", "question") print("Example object without answer & rationale keys:", article_summary.without("answer", "rationale")) ``` **Output** ``` Example object without answer & rationale keys: Example({'context': 'This is an article.', 'question': 'This is a question?'}) (input_keys=None) ``` Updating values is simply assigning a new value using the `.` operator. ```python article_summary.context = "new context" ``` ## Iterating over Example Iteration in the `Example` class also functions like a dictionary, supporting methods like `keys()`, `values()`, etc: ```python for k, v in article_summary.items(): print(f"{k} = {v}") ``` **Output** ```text context = This is an article. question = This is a question? answer = This is an answer. rationale = This is a rationale. 
``` ``` -------------------------------------------------------------------------------- /tests/reliability/complex_types/generated/test_nesting_2/schema.json: -------------------------------------------------------------------------------- ```json { "description": "This AI program is designed to process complex datasets with multiple nested input fields and produce structured output fields. It can handle cases where nested fields have the same name but different types, ensuring that the data is accurately processed and transformed. The program is particularly useful for applications that require detailed data analysis, integration of multiple data sources, and handling of heterogeneous data types.", "properties": { "customer": { "properties": { "customer_id": { "description": "Unique identifier for the customer", "type": "string" }, "customer_type": { "description": "Indicates if the customer is a premium member", "type": "boolean" }, "details": { "properties": { "age": { "description": "Customer's age", "type": "integer" }, "value": { "description": "Customer's value category", "type": "string" } }, "required": ["value", "age"], "type": "object" } }, "required": ["customer_id", "customer_type", "details"], "type": "object" }, "customer_summary": { "properties": { "customer_id": { "description": "Unique identifier for the customer", "type": "string" }, "customer_type": { "properties": { "category": { "description": "Customer's membership category", "type": "string" }, "is_premium": { "description": "Indicates if the customer is a premium member", "type": "boolean" } }, "required": ["is_premium", "category"], "type": "object" }, "value": { "description": "Customer's value category", "type": "string" } }, "required": ["customer_id", "customer_type", "value"], "type": "object" }, "transaction": { "properties": { "amount": { "description": "Transaction amount", "type": "number" }, "details": { "properties": { "timestamp": { "description": "Timestamp of the 
transaction", "format": "date-time", "type": "string" }, "value": { "description": "Monetary value of the transaction", "type": "number" } }, "required": ["value", "timestamp"], "type": "object" }, "transaction_id": { "description": "Unique identifier for the transaction", "type": "string" } }, "required": ["transaction_id", "amount", "details"], "type": "object" }, "transaction_summary": { "properties": { "details": { "properties": { "timestamp": { "description": "Timestamp of the transaction", "format": "date-time", "type": "string" }, "value": { "description": "Monetary value of the transaction", "type": "number" } }, "required": ["value", "timestamp"], "type": "object" }, "total_amount": { "description": "Total transaction amount", "type": "number" }, "transaction_id": { "description": "Unique identifier for the transaction", "type": "string" } }, "required": ["transaction_id", "total_amount", "details"], "type": "object" } }, "required": [ "customer", "transaction", "customer_summary", "transaction_summary" ], "type": "object" } ``` -------------------------------------------------------------------------------- /tests/clients/test_embedding.py: -------------------------------------------------------------------------------- ```python from unittest.mock import patch import numpy as np import pytest import dspy from dspy.clients.embedding import Embedder # Mock response format similar to litellm's embedding response. 
class MockEmbeddingResponse:
    """Stand-in for an embedding response, shaped like the one litellm returns."""

    def __init__(self, embeddings):
        # One {"embedding": ...} record per input vector, in input order.
        self.data = [dict(embedding=vector) for vector in embeddings]
        # All token counters are fixed at zero.
        self.usage = dict.fromkeys(("prompt_tokens", "completion_tokens", "total_tokens"), 0)
        self.model = "mock_model"
        self.object = "list"
self.call_count += 1 return expected_embeddings embedding_fn = EmbeddingFn() # Create embedding instance with callable embedding = Embedder(embedding_fn) result = embedding(inputs) assert embedding_fn.call_count == 1 np.testing.assert_allclose(result, expected_embeddings) result = embedding(inputs) # The second call should be cached. assert embedding_fn.call_count == 1 np.testing.assert_allclose(result, expected_embeddings) def test_invalid_model_type(): # Test that invalid model type raises ValueError with pytest.raises(ValueError): embedding = Embedder(123) # Invalid model type embedding(["test"]) @pytest.mark.asyncio async def test_async_embedding(): model = "text-embedding-ada-002" inputs = ["hello", "world"] mock_embeddings = [ [0.1, 0.2, 0.3], # embedding for "hello" [0.4, 0.5, 0.6], # embedding for "world" ] with patch("litellm.aembedding") as mock_litellm: # Configure mock to return proper response format. mock_litellm.return_value = MockEmbeddingResponse(mock_embeddings) # Create embedding instance and call it. embedding = Embedder(model, caching=False) result = await embedding.acall(inputs) # Verify litellm was called with correct parameters. 
class Dataset:
    """Base class exposing lazily built, optionally shuffled `train`, `dev` and `test` splits.

    The raw splits are read from `self._train`, `self._dev` and `self._test`, which this class
    does not set itself (presumably subclasses assign them -- confirm). Each raw item must be a
    mapping, since it is expanded into a `dspy.Example`.

    Args:
        train_seed: Seed used to shuffle the train split.
        train_size: Number of train examples to keep (`None` keeps all).
        eval_seed: Seed used to shuffle both the dev and the test split.
        dev_size: Number of dev examples to keep (`None` keeps all).
        test_size: Number of test examples to keep (`None` keeps all).
        input_keys: Field names marked as inputs on every produced example.
    """

    def __init__(self, train_seed=0, train_size=None, eval_seed=0, dev_size=None, test_size=None, input_keys=None):
        self.train_size = train_size
        self.train_seed = train_seed
        self.dev_size = dev_size
        self.dev_seed = eval_seed
        self.test_size = test_size
        self.test_seed = eval_seed
        self.input_keys = input_keys or []

        self.do_shuffle = True

        self.name = self.__class__.__name__

    def reset_seeds(self, train_seed=None, train_size=None, eval_seed=None, dev_size=None, test_size=None):
        """Override seeds/sizes and drop the cached splits so they are rebuilt on next access.

        Arguments left as `None` keep their current value. An explicit `0` is honored: the
        check is `is not None`, not truthiness, so a zero seed or size is not silently ignored.
        """
        if train_size is not None:
            self.train_size = train_size
        if train_seed is not None:
            self.train_seed = train_seed
        if dev_size is not None:
            self.dev_size = dev_size
        if test_size is not None:
            self.test_size = test_size
        if eval_seed is not None:
            # One eval seed drives both evaluation splits, as in __init__.
            self.dev_seed = eval_seed
            self.test_seed = eval_seed

        # Invalidate the memoized splits.
        for cached_attr in ("_train_", "_dev_", "_test_"):
            if hasattr(self, cached_attr):
                delattr(self, cached_attr)

    @property
    def train(self):
        """Train split, built on first access and cached until `reset_seeds` is called."""
        if not hasattr(self, "_train_"):
            self._train_ = self._shuffle_and_sample("train", self._train, self.train_size, self.train_seed)

        return self._train_

    @property
    def dev(self):
        """Dev split, built on first access and cached until `reset_seeds` is called."""
        if not hasattr(self, "_dev_"):
            self._dev_ = self._shuffle_and_sample("dev", self._dev, self.dev_size, self.dev_seed)

        return self._dev_

    @property
    def test(self):
        """Test split, built on first access and cached until `reset_seeds` is called."""
        if not hasattr(self, "_test_"):
            self._test_ = self._shuffle_and_sample("test", self._test, self.test_size, self.test_seed)

        return self._test_

    def _shuffle_and_sample(self, split, data, size, seed=0):
        """Shuffle `data` with `seed` (if `do_shuffle`), keep the first `size` items, wrap them as Examples.

        Every example gets a fresh `dspy_uuid` and a `dspy_split` field naming its split.
        """
        data = list(data)

        # Shuffle the data irrespective of the requested size.
        base_rng = random.Random(seed)

        if self.do_shuffle:
            base_rng.shuffle(data)

        # `size=None` slices to the end, i.e. keeps everything.
        data = data[:size]
        output = []

        for example in data:
            example_obj = Example(**example, dspy_uuid=str(uuid.uuid4()), dspy_split=split)
            if self.input_keys:
                example_obj = example_obj.with_inputs(*self.input_keys)
            output.append(example_obj)
        # TODO: NOTE: Ideally we use these uuids for dedup internally, for demos and internal train/val splits.
        # Now, some tasks (like convQA and Colors) have overlapping examples. Here, we should allow the user to give us
        # a uuid field that would respect this in some way. This means that we need a more refined concept that
        # uuid (each example is unique) and more like a group_uuid.

        return output

    @classmethod
    def prepare_by_seed(
        cls,
        train_seeds=None,
        train_size=16,
        dev_size=1000,
        divide_eval_per_seed=True,
        eval_seed=2023,
        **kwargs,
    ):
        """Build one train set per seed plus matching eval sets taken from a single dev split.

        Args:
            train_seeds: Seeds to build train sets for (defaults to `[1, 2, 3, 4, 5]`).
            train_size: Required size of every train set.
            dev_size: Size of the dev split the eval sets are sliced from.
            divide_eval_per_seed: If true, each seed gets a disjoint `dev_size // len(train_seeds)`
                slice of the dev split; otherwise every seed gets the first `dev_size` examples.
            eval_seed: Seed used to shuffle the dev split.
            **kwargs: Extra constructor arguments forwarded to `cls`.

        Returns:
            A `dotdict` with `train_sets` and `eval_sets`, two lists aligned with `train_seeds`.
        """
        train_seeds = train_seeds or [1, 2, 3, 4, 5]
        data_args = dotdict(train_size=train_size, eval_seed=eval_seed, dev_size=dev_size, test_size=0, **kwargs)
        dataset = cls(**data_args)

        eval_set = dataset.dev
        eval_sets, train_sets = [], []

        examples_per_seed = dev_size // len(train_seeds) if divide_eval_per_seed else dev_size
        eval_offset = 0

        for train_seed in train_seeds:
            # Pass only what `reset_seeds` accepts: forwarding the constructor-only **kwargs
            # would raise TypeError for any subclass-specific option.
            dataset.reset_seeds(
                train_seed=train_seed,
                train_size=train_size,
                eval_seed=eval_seed,
                dev_size=dev_size,
                test_size=0,
            )

            eval_sets.append(eval_set[eval_offset : eval_offset + examples_per_seed])
            train_sets.append(dataset.train)

            assert len(eval_sets[-1]) == examples_per_seed, len(eval_sets[-1])
            assert len(train_sets[-1]) == train_size, len(train_sets[-1])

            if divide_eval_per_seed:
                eval_offset += examples_per_seed

        return dotdict(train_sets=train_sets, eval_sets=eval_sets)
class XMLAdapter(ChatAdapter):
    """Chat adapter whose fields are exchanged as `<field_name>...</field_name>` blocks."""

    def __init__(self, callbacks: list[BaseCallback] | None = None):
        super().__init__(callbacks)
        # `</\1>` forces the closing tag to repeat the opening tag's name; DOTALL lets the
        # content span several lines.
        self.field_pattern = re.compile(r"<(?P<name>\w+)>((?P<content>.*?))</\1>", re.DOTALL)

    def format_field_with_value(self, fields_with_values: dict[FieldInfoWithName, Any]) -> str:
        """Render each field as an XML-tagged block, with blocks separated by a blank line."""
        blocks = [
            f"<{field.name}>\n{format_field_value(field_info=field.info, value=value)}\n</{field.name}>"
            for field, value in fields_with_values.items()
        ]
        return "\n\n".join(blocks).strip()

    def format_field_structure(self, signature: type[Signature]) -> str:
        """
        XMLAdapter requires input and output fields to be wrapped in XML tags like `<field_name>`.
        """

        def render_placeholders(fields: dict[str, FieldInfo]) -> str:
            # Each field's "value" is its translated type description rather than real data.
            placeholders = {
                FieldInfoWithName(name=name, info=info): translate_field_type(name, info)
                for name, info in fields.items()
            }
            return self.format_field_with_value(fields_with_values=placeholders)

        sections = [
            "All interactions will be structured in the following way, with the appropriate values filled in.",
            render_placeholders(signature.input_fields),
            render_placeholders(signature.output_fields),
        ]
        return "\n\n".join(sections).strip()

    def format_assistant_message_content(
        self,
        signature: type[Signature],
        outputs: dict[str, Any],
        missing_field_message=None,
    ) -> str:
        """Render the output fields as XML blocks, using `missing_field_message` for absent ones."""
        rendered_outputs = {
            FieldInfoWithName(name=name, info=info): outputs.get(name, missing_field_message)
            for name, info in signature.output_fields.items()
        }
        return self.format_field_with_value(rendered_outputs)

    def user_message_output_requirements(self, signature: type[Signature]) -> str:
        """Build the sentence telling the LM which XML-tagged output fields to produce, in order."""
        tag_list = ", then ".join(f"`<{name}>`" for name in signature.output_fields)
        return f"Respond with the corresponding output fields wrapped in XML tags {tag_list}."

    def parse(self, signature: type[Signature], completion: str) -> dict[str, Any]:
        """Extract and cast every output field from `completion`.

        Only the first occurrence of each output field's tag is used; tags that are not
        output fields are ignored.

        Raises:
            AdapterParseError: If a value cannot be cast, or if the parsed field names do
                not match the signature's output fields.
        """
        raw_values = {}
        for tag in self.field_pattern.finditer(completion):
            tag_name = tag.group("name")
            if tag_name in signature.output_fields and tag_name not in raw_values:
                raw_values[tag_name] = tag.group("content").strip()

        # Cast values using base class parse_value helper
        fields = {
            name: self._parse_field_value(signature.output_fields[name], text, completion, signature)
            for name, text in raw_values.items()
        }

        if fields.keys() == signature.output_fields.keys():
            return fields

        from dspy.utils.exceptions import AdapterParseError

        raise AdapterParseError(
            adapter_name="XMLAdapter",
            signature=signature,
            lm_response=completion,
            parsed_result=fields,
        )

    def _parse_field_value(self, field_info, raw, completion, signature):
        """Cast `raw` to the field's annotation, converting any failure into `AdapterParseError`."""
        from dspy.adapters.utils import parse_value

        try:
            return parse_value(raw, field_info.annotation)
        except Exception as e:
            from dspy.utils.exceptions import AdapterParseError

            raise AdapterParseError(
                adapter_name="XMLAdapter",
                signature=signature,
                lm_response=completion,
                message=f"Failed to parse field {field_info} with value {raw}: {e}",
            )
def read_mkdocs_sections(filename: str = "mkdocs.yml"):
    """Read an mkdocs config and split it around its `nav:` section.

    Args:
        filename: Path of the mkdocs YAML file to read.

    Returns:
        A tuple `(pre_nav, nav_content, post_theme)` of lists of lines:
        `pre_nav` is everything up to and including the `nav:` line, `nav_content` is the
        nav body with any existing `- API Reference:` entry (and its nested lines) removed,
        and `post_theme` is everything from the `theme:` line onward.

    Raises:
        ValueError: If the file has no `nav:` line followed by a `theme:` line. Without this
            check the `-1` sentinels would be used as slice indices and the caller would
            write a corrupted file back.
    """
    with open(filename, "r") as f:
        lines = f.readlines()

    nav_start = -1
    theme_start = -1

    # Find section boundaries; scanning stops at the first `theme:` line.
    for i, line in enumerate(lines):
        if line.strip() == "nav:":
            nav_start = i
        elif line.strip() == "theme:":
            theme_start = i
            break

    if nav_start < 0 or theme_start < 0:
        raise ValueError(f"{filename} must contain a 'nav:' line followed by a 'theme:' line")

    # Split content into sections
    pre_nav = lines[: nav_start + 1]  # Include the 'nav:' line
    nav_content = []
    post_theme = lines[theme_start:]  # Start from 'theme:' line

    # Extract nav content excluding API Reference
    i = nav_start + 1
    while i < theme_start:
        line = lines[i]
        if line.strip() == "- API Reference:":
            # Skip this line and all indented lines that follow
            i += 1
            while i < theme_start and (not lines[i].strip() or lines[i].startswith(" " * 8)):
                i += 1
        else:
            nav_content.append(line)
            i += 1

    return pre_nav, nav_content, post_theme
class Example:
    """A dict-like record whose fields are reachable as attributes or as items.

    Field values live in an internal `_store` dict. Keys starting with `dspy_` are treated as
    internal bookkeeping: they are hidden from `len()`, `repr()` and, unless
    `include_dspy=True`, from `keys()`, `values()` and `items()`. Plain iteration and
    `toDict()` still include them.

    Args:
        base: Optional `Example` (of this type) or `dict` to copy the initial fields from.
        **kwargs: Additional fields; they override same-named fields from `base`.
    """

    def __init__(self, base=None, **kwargs):
        # Internal storage and other attributes
        self._store = {}
        self._demos = []
        self._input_keys = None

        # Initialize from a base Example if provided
        if base and isinstance(base, type(self)):
            self._store = base._store.copy()

        # Initialize from a dict if provided
        elif base and isinstance(base, dict):
            self._store = base.copy()

        # Update with provided kwargs
        self._store.update(kwargs)

    def __getattr__(self, key):
        # Only reached when normal attribute lookup fails. Dunder names are rejected
        # outright and never looked up in the store.
        if key.startswith("__") and key.endswith("__"):
            raise AttributeError
        if key in self._store:
            return self._store[key]
        raise AttributeError(f"'{type(self).__name__}' object has no attribute '{key}'")

    def __setattr__(self, key, value):
        # Underscore-prefixed names and names defined on the class are real attributes;
        # everything else is stored as a field.
        if key.startswith("_") or key in dir(self.__class__):
            super().__setattr__(key, value)
        else:
            self._store[key] = value

    def __getitem__(self, key):
        return self._store[key]

    def __setitem__(self, key, value):
        self._store[key] = value

    def __delitem__(self, key):
        del self._store[key]

    def __contains__(self, key):
        return key in self._store

    def __len__(self):
        """Number of fields, not counting internal `dspy_` keys."""
        return len([k for k in self._store if not k.startswith("dspy_")])

    def __repr__(self):
        d = {k: v for k, v in self._store.items() if not k.startswith("dspy_")}
        return f"Example({d})" + f" (input_keys={self._input_keys})"

    def __str__(self):
        return self.__repr__()

    def __eq__(self, other):
        return isinstance(other, Example) and self._store == other._store

    def __hash__(self):
        # Dict equality ignores insertion order, so the hash must as well; otherwise two
        # Examples that compare equal could hash differently. As before, this raises
        # TypeError when any field value is unhashable.
        return hash(frozenset(self._store.items()))

    def keys(self, include_dspy=False):
        """List of field names; `dspy_` keys are included only if `include_dspy` is true."""
        return [k for k in self._store.keys() if not k.startswith("dspy_") or include_dspy]

    def values(self, include_dspy=False):
        """List of field values; `dspy_` fields are included only if `include_dspy` is true."""
        return [v for k, v in self._store.items() if not k.startswith("dspy_") or include_dspy]

    def items(self, include_dspy=False):
        """List of `(name, value)` pairs; `dspy_` fields are included only if `include_dspy` is true."""
        return [(k, v) for k, v in self._store.items() if not k.startswith("dspy_") or include_dspy]

    def get(self, key, default=None):
        return self._store.get(key, default)

    def with_inputs(self, *keys):
        """Return a copy of this example with `keys` marked as its input fields."""
        copied = self.copy()
        copied._input_keys = set(keys)
        return copied

    def inputs(self):
        """Return a new example holding only the input fields.

        Raises:
            ValueError: If no input keys were set via `with_inputs()`.
        """
        if self._input_keys is None:
            raise ValueError("Inputs have not been set for this example. Use `example.with_inputs()` to set them.")

        # return items that are in input_keys
        d = {key: self._store[key] for key in self._store if key in self._input_keys}
        new_instance = type(self)(base=d)
        new_instance._input_keys = self._input_keys  # Preserve input_keys in new instance
        return new_instance

    def labels(self):
        """Return a new example holding only the non-input fields (its input keys are unset)."""
        # return items that are NOT in input_keys
        input_keys = self.inputs().keys()
        d = {key: self._store[key] for key in self._store if key not in input_keys}
        return type(self)(d)

    def __iter__(self):
        # Iterates over a snapshot of all keys, including `dspy_` ones.
        return iter(dict(self._store))

    def copy(self, **kwargs):
        """Return a new example with the same fields, updated by `kwargs`.

        Only the field store is carried over: the copy's input keys start out unset.
        """
        return type(self)(base=self, **kwargs)

    def without(self, *keys):
        """Return a copy with `keys` removed; raises `KeyError` if a key is not present."""
        copied = self.copy()
        for key in keys:
            del copied[key]
        return copied

    def toDict(self):  # noqa: N802
        """Return the fields as a plain dict, recursively converting nested values.

        Values exposing `toDict` are converted through it; lists and dicts are converted
        element-wise; everything else is returned as-is.
        """

        def convert_to_serializable(value):
            if hasattr(value, "toDict"):
                return value.toDict()
            elif isinstance(value, list):
                return [convert_to_serializable(item) for item in value]
            elif isinstance(value, dict):
                return {k: convert_to_serializable(v) for k, v in value.items()}
            else:
                return value

        serializable_store = {}
        for k, v in self._store.items():
            serializable_store[k] = convert_to_serializable(v)

        return serializable_store
auto_sig.input_fields["input"].annotation == CustomType def test_type_alias_for_nested_types(): """Test using type aliases for nested types.""" class Container: class NestedType(pydantic.BaseModel): value: str NestedType = Container.NestedType alias_sig = Signature("input: str -> output: NestedType") assert alias_sig.output_fields["output"].annotation == Container.NestedType class Container2: class Query(pydantic.BaseModel): text: str class Score(pydantic.BaseModel): score: float signature = dspy.Signature("query: Container2.Query -> score: Container2.Score") assert signature.output_fields["score"].annotation == Container2.Score class GlobalCustomType(pydantic.BaseModel): """A type defined at module level for testing module-level resolution.""" value: str notes: str = "" def test_module_level_type_resolution(): """Test resolution of types defined at module level.""" # Module-level types can be auto-resolved sig = Signature("name: str -> result: GlobalCustomType") assert sig.output_fields["result"].annotation == GlobalCustomType # Create module-level nested class for testing class OuterContainer: class InnerType(pydantic.BaseModel): name: str value: int def test_recommended_patterns(): """Test recommended patterns for working with custom types in signatures.""" # PATTERN 1: Local type with auto-resolution class LocalType(pydantic.BaseModel): value: str sig1 = Signature("input: str -> output: LocalType") assert sig1.output_fields["output"].annotation == LocalType # PATTERN 2: Module-level type with auto-resolution sig2 = Signature("input: str -> output: GlobalCustomType") assert sig2.output_fields["output"].annotation == GlobalCustomType # PATTERN 3: Nested type with dot notation sig3 = Signature("input: str -> output: OuterContainer.InnerType") assert sig3.output_fields["output"].annotation == OuterContainer.InnerType # PATTERN 4: Nested type using alias InnerTypeAlias = OuterContainer.InnerType sig4 = Signature("input: str -> output: InnerTypeAlias") assert 
sig4.output_fields["output"].annotation == InnerTypeAlias # PATTERN 5: Nested type with dot notation sig5 = Signature("input: str -> output: OuterContainer.InnerType") assert sig5.output_fields["output"].annotation == OuterContainer.InnerType def test_expected_failure(): # InnerType DNE when not OuterContainer.InnerTypes, so this type shouldnt be resolved with pytest.raises(ValueError): Signature("input: str -> output: InnerType") def test_module_type_resolution(): class TestModule(dspy.Module): def __init__(self): super().__init__() self.predict = dspy.Predict("input: str -> output: OuterContainer.InnerType") def predict(self, input: str) -> str: return input module = TestModule() sig = module.predict.signature assert sig.output_fields["output"].annotation == OuterContainer.InnerType def test_basic_custom_type_resolution(): class CustomType(pydantic.BaseModel): value: str sig = Signature("input: CustomType -> output: str", custom_types={"CustomType": CustomType}) assert sig.input_fields["input"].annotation == CustomType sig = Signature("input: CustomType -> output: str") assert sig.input_fields["input"].annotation == CustomType ``` -------------------------------------------------------------------------------- /docs/overrides/home.html: -------------------------------------------------------------------------------- ```html {% extends "base.html" %} {% block content %} <style> .md-main__inner .md-grid { padding: 0; margin: 0; } .content-container { max-width: 100%; margin: 0; padding: 0; } .hero { text-align: center; padding: 4rem 2rem; margin: 0; background-color: #f5f6f77a; color: white; } .hero-logo { max-width: 15rem; height: auto; margin: 0 auto; } .hero-subtitle { font-size: 1.2rem; margin: 1.5rem 0; color: #e2e8f0; } .cta-button { display: inline-block; padding: 0.75rem 1.5rem; background-color: transparent; color: black; text-decoration: none; border-radius: 0.375rem; font-weight: 600; border: 2px solid black; transition: all 0.3s ease; } 
.cta-button:hover { background-color: white; color: black; border: 2px solid white; } .features-section { padding: 4rem 2rem; } .features-title { text-align: center; font-size: 2rem; font-weight: 700; margin-bottom: 3rem; color: #1a202c; } .features-grid { display: grid; grid-template-columns: repeat(auto-fit, minmax(300px, 1fr)); gap: 3rem; max-width: 1200px; margin: 0 auto; } .feature-card { text-align: center; padding: 1.5rem; } .feature-image { width: 10rem; height: auto; margin: 0 auto 1.5rem; } .feature-title { font-size: 1.25rem; font-weight: 700; margin-bottom: 1rem; color: #2d3748; } .feature-description { color: #4a5568; line-height: 1.5; } @media (max-width: 768px) { .hero { padding: 3rem 1rem; } .hero-logo { max-width: 10rem; } .features-grid { grid-template-columns: 1fr; gap: 2rem; } .feature-card { padding: 1rem; } } </style> <div class="content-container"> <div class="hero"> <img src="{{ 'static/img/dspy_logo.png' | url }}" alt="DSPy Logo" class="hero-logo"> <p class="hero-subtitle">Programming—not prompting—Language Models</p> <a href="{{ 'quick-start/getting-started-1' | url }}" class="cta-button">Get Started with DSPy</a> </div> <div class="features-section"> <h2 class="features-title">The Way of DSPy</h2> <div class="features-grid"> <div class="feature-card"> <img src="{{ 'static/img/optimize.png' | url }}" alt="Systematic Optimization" class="feature-image"> <h3 class="feature-title">Systematic Optimization</h3> <p class="feature-description">Choose from a range of optimizers to enhance your program. 
Whether it's generating refined instructions, or fine-tuning weights, DSPy's optimizers are engineered to maximize efficiency and effectiveness.</p> </div> <div class="feature-card"> <img src="{{ 'static/img/modular.png' | url }}" alt="Modular Approach" class="feature-image"> <h3 class="feature-title">Modular Approach</h3> <p class="feature-description">With DSPy, you can build your system using predefined modules, replacing intricate prompting techniques with straightforward, effective solutions.</p> </div> <div class="feature-card"> <img src="{{ 'static/img/universal_compatibility.png' | url }}" alt="Cross-LM Compatibility" class="feature-image"> <h3 class="feature-title">Cross-LM Compatibility</h3> <p class="feature-description">Whether you're working with powerhouse models like GPT-3.5 or GPT-4, or local models such as T5-base or Llama2-13b, DSPy seamlessly integrates and enhances their performance in your system.</p> </div> </div> </div> </div> {% endblock %} ``` -------------------------------------------------------------------------------- /docs/docs/api/optimizers/MIPROv2.md: -------------------------------------------------------------------------------- ```markdown # dspy.MIPROv2 `MIPROv2` (<u>M</u>ultiprompt <u>I</u>nstruction <u>PR</u>oposal <u>O</u>ptimizer Version 2) is a prompt optimizer capable of optimizing both instructions and few-shot examples jointly. It does this by bootstrapping few-shot example candidates, proposing instructions grounded in different dynamics of the task, and finding an optimized combination of these options using Bayesian Optimization. It can be used for optimizing few-shot examples & instructions jointly, or just instructions for 0-shot optimization.
<!-- START_API_REF --> ::: dspy.MIPROv2 handler: python options: members: - compile - get_params show_source: true show_root_heading: true heading_level: 2 docstring_style: google show_root_full_path: true show_object_full_path: false separate_signature: false inherited_members: true ::: <!-- END_API_REF --> ## Example Usage The program below shows optimizing a math program with MIPROv2 ```python import dspy from dspy.datasets.gsm8k import GSM8K, gsm8k_metric # Import the optimizer from dspy.teleprompt import MIPROv2 # Initialize the LM lm = dspy.LM('openai/gpt-4o-mini', api_key='YOUR_OPENAI_API_KEY') dspy.configure(lm=lm) # Initialize optimizer teleprompter = MIPROv2( metric=gsm8k_metric, auto="medium", # Can choose between light, medium, and heavy optimization runs ) # Optimize program print(f"Optimizing program with MIPROv2...") gsm8k = GSM8K() optimized_program = teleprompter.compile( dspy.ChainOfThought("question -> answer"), trainset=gsm8k.train, ) # Save the optimized program for future use optimized_program.save(f"optimized.json") ``` ## How `MIPROv2` works At a high level, `MIPROv2` works by creating both few-shot examples and new instructions for each predictor in your LM program, and then searching over these using Bayesian Optimization to find the best combination of these variables for your program. If you want a visual explanation check out this [twitter thread](https://x.com/michaelryan207/status/1804189184988713065). These steps are broken down in more detail below: 1) **Bootstrap Few-Shot Examples**: Randomly samples examples from your training set, and runs them through your LM program. If the output from the program is correct for this example, it is kept as a valid few-shot example candidate. Otherwise, we try another example until we've curated the specified number of few-shot example candidates.
This step creates `num_candidates` sets of `max_bootstrapped_demos` bootstrapped examples and `max_labeled_demos` basic examples sampled from the training set. 2) **Propose Instruction Candidates**. The instruction proposer includes (1) a generated summary of properties of the training dataset, (2) a generated summary of your LM program's code and the specific predictor that an instruction is being generated for, (3) the previously bootstrapped few-shot examples to show reference inputs / outputs for a given predictor and (4) a randomly sampled tip for generation (e.g. "be creative", "be concise", etc.) to help explore the feature space of potential instructions. This context is provided to a `prompt_model`, which writes high-quality instruction candidates. 3) **Find an Optimized Combination of Few-Shot Examples & Instructions**. Finally, we use Bayesian Optimization to choose which combinations of instructions and demonstrations work best for each predictor in our program. This works by running a series of `num_trials` trials, where a new set of prompts is evaluated over our validation set at each trial. The new set of prompts is only evaluated on a minibatch of size `minibatch_size` at each trial (when `minibatch`=`True`). The best averaging set of prompts is then evaluated on the full validation set every `minibatch_full_eval_steps`. At the end of the optimization process, the LM program with the set of prompts that performed best on the full validation set is returned. For those interested in more details, more information on `MIPROv2` along with a study on `MIPROv2` compared with other DSPy optimizers can be found in [this paper](https://arxiv.org/abs/2406.11695).
``` -------------------------------------------------------------------------------- /tests/utils/test_mcp.py: -------------------------------------------------------------------------------- ```python import asyncio import importlib import pytest from dspy.utils.mcp import convert_mcp_tool if importlib.util.find_spec("mcp") is None: pytest.skip(reason="mcp is not installed", allow_module_level=True) @pytest.mark.asyncio @pytest.mark.extra async def test_convert_mcp_tool(): from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client server_params = StdioServerParameters( command="python", args=["tests/utils/resources/mcp_server.py"], env=None, ) async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: await asyncio.wait_for(session.initialize(), timeout=5) response = await session.list_tools() # Check add add_tool = convert_mcp_tool(session, response.tools[0]) assert add_tool.name == "add" assert add_tool.desc == "Add two numbers" assert add_tool.args == {"a": {"title": "A", "type": "integer"}, "b": {"title": "B", "type": "integer"}} assert add_tool.arg_types == {"a": int, "b": int} assert add_tool.arg_desc == { "a": "No description provided. (Required)", "b": "No description provided. (Required)", } assert await add_tool.acall(a=1, b=2) == "3" # Check hello hello_tool = convert_mcp_tool(session, response.tools[1]) assert hello_tool.name == "hello" assert hello_tool.desc == "Greet people" assert hello_tool.args == {"names": {"title": "Names", "type": "array", "items": {"type": "string"}}} assert hello_tool.arg_types == {"names": list} assert hello_tool.arg_desc == {"names": "No description provided. 
(Required)"} assert await hello_tool.acall(names=["Bob", "Tom"]) == ["Hello, Bob!", "Hello, Tom!"] # Check error handling error_tool = convert_mcp_tool(session, response.tools[2]) assert error_tool.name == "wrong_tool" assert error_tool.desc == "This tool raises an error" with pytest.raises( RuntimeError, match="Failed to call a MCP tool: Error executing tool wrong_tool: error!" ): await error_tool.acall() # Check nested Pydantic arg nested_pydantic_tool = convert_mcp_tool(session, response.tools[3]) assert nested_pydantic_tool.name == "get_account_name" assert nested_pydantic_tool.desc == "This extracts the name from account" assert nested_pydantic_tool.args == { "account": { "title": "Account", "type": "object", "required": ["profile", "account_id"], "properties": { "profile": { "title": "Profile", "type": "object", "properties": { "name": {"title": "Name", "type": "string"}, "age": {"title": "Age", "type": "integer"}, }, "required": ["name", "age"], }, "account_id": {"title": "Account Id", "type": "string"}, }, } } account_in_json = { "profile": { "name": "Bob", "age": 20, }, "account_id": "123", } result = await nested_pydantic_tool.acall(account=account_in_json) assert result == "Bob" # Check no input parameter current_datetime tool current_datetime_tool = convert_mcp_tool(session, response.tools[4]) assert current_datetime_tool.name == "current_datetime" assert current_datetime_tool.desc == "Get the current datetime" assert current_datetime_tool.args == {} assert current_datetime_tool.arg_types == {} assert current_datetime_tool.arg_desc == {} assert await current_datetime_tool.acall() == "2025-07-23T09:10:10.0+00:00" ``` -------------------------------------------------------------------------------- /docs/docs/tutorials/async/index.md: -------------------------------------------------------------------------------- ```markdown # Async DSPy Programming DSPy provides native support for asynchronous programming, allowing you to build more efficient and 
scalable applications. This guide will walk you through how to leverage async capabilities in DSPy, covering both built-in modules and custom implementations. ## Why Use Async in DSPy? Asynchronous programming in DSPy offers several benefits: - Improved performance through concurrent operations - Better resource utilization - Reduced waiting time for I/O-bound operations - Enhanced scalability for handling multiple requests ## When Should I Use Sync or Async? Choosing between synchronous and asynchronous programming in DSPy depends on your specific use case. Here's a guide to help you make the right choice: Use Synchronous Programming When: - You're exploring or prototyping new ideas - You're conducting research or experiments - You're building small to medium-sized applications - You need simpler, more straightforward code - You want easier debugging and error tracking Use Asynchronous Programming When: - You're building a high-throughput service (high QPS) - You're working with tools that only support async operations - You need to handle multiple concurrent requests efficiently - You're building a production service that requires high scalability ### Important Considerations While async programming offers performance benefits, it comes with some trade-offs: - More complex error handling and debugging - Potential for subtle, hard-to-track bugs - More complex code structure - Different code between IPython environments (Colab, JupyterLab, Databricks notebooks, ...) and the standard Python runtime. We recommend starting with synchronous programming for most development scenarios and switching to async only when you have a clear need for its benefits. This approach allows you to focus on the core logic of your application before dealing with the additional complexity of async programming. ## Using Built-in Modules Asynchronously Most DSPy built-in modules support asynchronous operations through the `acall()` method.
This method maintains the same interface as the synchronous `__call__` method but operates asynchronously. Here's a basic example using `dspy.Predict`: ```python import dspy import asyncio import os os.environ["OPENAI_API_KEY"] = "your_api_key" dspy.configure(lm=dspy.LM("openai/gpt-4o-mini")) predict = dspy.Predict("question->answer") async def main(): # Use acall() for async execution output = await predict.acall(question="why did a chicken cross the kitchen?") print(output) asyncio.run(main()) ``` ### Working with Async Tools DSPy's `Tool` class seamlessly integrates with async functions. When you provide an async function to `dspy.Tool`, you can execute it using `acall()`. This is particularly useful for I/O-bound operations or when working with external services. ```python import asyncio import dspy import os os.environ["OPENAI_API_KEY"] = "your_api_key" async def foo(x): # Simulate an async operation await asyncio.sleep(0.1) print(f"I get: {x}") # Create a tool from the async function tool = dspy.Tool(foo) async def main(): # Execute the tool asynchronously await tool.acall(x=2) asyncio.run(main()) ``` Note: When using `dspy.ReAct` with tools, calling `acall()` on the ReAct instance will automatically execute all tools asynchronously using their `acall()` methods. ## Creating Custom Async DSPy Modules To create your own async DSPy module, implement the `aforward()` method instead of `forward()`. This method should contain your module's async logic. 
Here's an example of a custom module that chains two async operations: ```python import dspy import asyncio import os os.environ["OPENAI_API_KEY"] = "your_api_key" dspy.configure(lm=dspy.LM("openai/gpt-4o-mini")) class MyModule(dspy.Module): def __init__(self): self.predict1 = dspy.ChainOfThought("question->answer") self.predict2 = dspy.ChainOfThought("answer->simplified_answer") async def aforward(self, question, **kwargs): # Execute predictions sequentially but asynchronously answer = await self.predict1.acall(question=question) return await self.predict2.acall(answer=answer) async def main(): mod = MyModule() result = await mod.acall(question="Why did a chicken cross the kitchen?") print(result) asyncio.run(main()) ``` ``` -------------------------------------------------------------------------------- /tests/reliability/complex_types/generated/test_many_types_1/schema.json: -------------------------------------------------------------------------------- ```json { "description": "The program is designed to process various data types including tuples, enums, datetime values, literals, objects, and nested objects containing these types. The program will accept inputs of these types, perform specified operations on them, and return the results. 
The operations could include validation, transformation, and extraction of information from these inputs.", "properties": { "datetimeField": { "desc": null, "format": "date-time", "prefix": "Datetime Field:", "type": "string" }, "enumField": { "enum": ["option1", "option2", "option3"], "type": "string" }, "literalField": { "const": "literalValue", "enum": ["literalValue"], "type": "string" }, "nestedObjectField": { "properties": { "datetimeField": { "format": "date-time", "type": "string" }, "enumField": { "enum": ["option1", "option2", "option3"], "type": "string" }, "literalField": { "const": "literalValue", "enum": ["literalValue"], "type": "string" }, "tupleField": { "items": { "anyOf": [ { "type": "string" }, { "type": "number" } ] }, "maxItems": 2, "minItems": 2, "type": "array" } }, "required": ["tupleField", "enumField", "datetimeField", "literalField"], "type": "object" }, "objectField": { "properties": { "subField1": { "type": "string" }, "subField2": { "type": "number" } }, "required": ["subField1", "subField2"], "type": "object" }, "processedDatetimeField": { "desc": null, "format": "date-time", "prefix": "Processed Datetime Field:", "type": "string" }, "processedEnumField": { "enum": ["option1", "option2", "option3"], "type": "string" }, "processedLiteralField": { "const": "literalValue", "enum": ["literalValue"], "type": "string" }, "processedNestedObjectField": { "properties": { "additionalField": { "type": "boolean" }, "datetimeField": { "format": "date-time", "type": "string" }, "enumField": { "enum": ["option1", "option2", "option3"], "type": "string" }, "literalField": { "const": "literalValue", "enum": ["literalValue"], "type": "string" }, "tupleField": { "items": { "anyOf": [ { "type": "string" }, { "type": "number" } ] }, "maxItems": 2, "minItems": 2, "type": "array" } }, "required": [ "tupleField", "enumField", "datetimeField", "literalField", "additionalField" ], "type": "object" }, "processedObjectField": { "properties": { 
"additionalField": { "type": "boolean" }, "subField1": { "type": "string" }, "subField2": { "type": "number" } }, "required": ["subField1", "subField2", "additionalField"], "type": "object" }, "processedTupleField": { "desc": null, "items": { "anyOf": [ { "type": "string" }, { "type": "number" } ] }, "prefix": "Processed Tuple Field:", "type": "array" }, "tupleField": { "desc": null, "items": { "anyOf": [ { "type": "string" }, { "type": "number" } ] }, "prefix": "Tuple Field:", "type": "array" } }, "required": [ "tupleField", "enumField", "datetimeField", "literalField", "objectField", "nestedObjectField", "processedTupleField", "processedEnumField", "processedDatetimeField", "processedLiteralField", "processedObjectField", "processedNestedObjectField" ], "type": "object" } ```