This is page 6 of 14. Use http://codebase.md/stanfordnlp/dspy?page={x} to view the full context. # Directory Structure ``` ├── .github │ ├── .internal_dspyai │ │ ├── internals │ │ │ ├── build-and-release.md │ │ │ └── release-checklist.md │ │ └── pyproject.toml │ ├── .tmp │ │ └── .generated-actions │ │ └── run-pypi-publish-in-docker-container │ │ └── action.yml │ ├── ISSUE_TEMPLATE │ │ ├── bug_report.yml │ │ └── feature_request.yml │ ├── PULL_REQUEST_TEMPLATE │ │ └── pull_request_template.md │ ├── workflow_scripts │ │ └── install_testpypi_pkg.sh │ └── workflows │ ├── build_and_release.yml │ ├── build_utils │ │ └── test_version.py │ ├── docs-push.yml │ ├── precommits_check.yml │ └── run_tests.yml ├── .gitignore ├── .pre-commit-config.yaml ├── CONTRIBUTING.md ├── docs │ ├── .gitignore │ ├── docs │ │ ├── api │ │ │ ├── adapters │ │ │ │ ├── Adapter.md │ │ │ │ ├── ChatAdapter.md │ │ │ │ ├── JSONAdapter.md │ │ │ │ └── TwoStepAdapter.md │ │ │ ├── evaluation │ │ │ │ ├── answer_exact_match.md │ │ │ │ ├── answer_passage_match.md │ │ │ │ ├── CompleteAndGrounded.md │ │ │ │ ├── Evaluate.md │ │ │ │ ├── EvaluationResult.md │ │ │ │ └── SemanticF1.md │ │ │ ├── experimental │ │ │ │ ├── Citations.md │ │ │ │ └── Document.md │ │ │ ├── index.md │ │ │ ├── models │ │ │ │ ├── Embedder.md │ │ │ │ └── LM.md │ │ │ ├── modules │ │ │ │ ├── BestOfN.md │ │ │ │ ├── ChainOfThought.md │ │ │ │ ├── CodeAct.md │ │ │ │ ├── Module.md │ │ │ │ ├── MultiChainComparison.md │ │ │ │ ├── Parallel.md │ │ │ │ ├── Predict.md │ │ │ │ ├── ProgramOfThought.md │ │ │ │ ├── ReAct.md │ │ │ │ └── Refine.md │ │ │ ├── optimizers │ │ │ │ ├── BetterTogether.md │ │ │ │ ├── BootstrapFewShot.md │ │ │ │ ├── BootstrapFewShotWithRandomSearch.md │ │ │ │ ├── BootstrapFinetune.md │ │ │ │ ├── BootstrapRS.md │ │ │ │ ├── COPRO.md │ │ │ │ ├── Ensemble.md │ │ │ │ ├── GEPA │ │ │ │ │ ├── GEPA_Advanced.md │ │ │ │ │ └── overview.md │ │ │ │ ├── InferRules.md │ │ │ │ ├── KNN.md │ │ │ │ ├── KNNFewShot.md │ │ │ │ ├── LabeledFewShot.md │ │ │ │ ├── MIPROv2.md │ │ │ │ └── SIMBA.md │ │ │ ├── primitives │ │ │ │ ├── Audio.md │ │ │ │ ├── Code.md │ │ │ │ ├── Example.md │ │ │ │ ├── History.md │ │ │ │ ├── Image.md │ │ │ │ ├── Prediction.md │ │ │ │ ├── Tool.md │ │ │ │ └── ToolCalls.md │ │ │ ├── signatures │ │ │ │ ├── InputField.md │ │ │ │ ├── OutputField.md │ │ │ │ └── Signature.md │ │ │ ├── tools │ │ │ │ ├── ColBERTv2.md │ │ │ │ ├── Embeddings.md │ │ │ │ └── PythonInterpreter.md │ │ │ └── utils │ │ │ ├── asyncify.md │ │ │ ├── configure_cache.md │ │ │ ├── disable_litellm_logging.md │ │ │ ├── disable_logging.md │ │ │ ├── enable_litellm_logging.md │ │ │ ├── enable_logging.md │ │ │ ├── inspect_history.md │ │ │ ├── load.md │ │ │ ├── StatusMessage.md │ │ │ ├── StatusMessageProvider.md │ │ │ ├── streamify.md │ │ │ └── StreamListener.md │ │ ├── cheatsheet.md │ │ ├── community │ │ │ ├── community-resources.md │ │ │ ├── how-to-contribute.md │ │ │ └── use-cases.md │ │ ├── deep-dive │ │ │ └── data-handling │ │ │ ├── built-in-datasets.md │ │ │ ├── examples.md │ │ │ ├── img │ │ │ │ └── data-loading.png │ │ │ └── loading-custom-data.md │ │ ├── faqs.md │ │ ├── index.md │ │ ├── js │ │ │ └── runllm-widget.js │ │ ├── learn │ │ │ ├── evaluation │ │ │ │ ├── data.md │ │ │ │ ├── metrics.md │ │ │ │ └── overview.md │ │ │ ├── figures │ │ │ │ ├── native_tool_call.png │ │ │ │ └── teleprompter-classes.png │ │ │ ├── index.md │ │ │ ├── optimization │ │ │ │ ├── optimizers.md │ │ │ │ └── overview.md │ │ │ └── programming │ │ │ ├── 7-assertions.md │ │ │ ├── adapters.md │ │ │ ├── language_models.md │ │ │ ├── mcp.md │ 
│ │ ├── modules.md │ │ │ ├── overview.md │ │ │ ├── signatures.md │ │ │ └── tools.md │ │ ├── production │ │ │ └── index.md │ │ ├── roadmap.md │ │ ├── static │ │ │ ├── .nojekyll │ │ │ └── img │ │ │ ├── dspy_logo.png │ │ │ ├── logo.png │ │ │ ├── mlflow-tracing-rag.png │ │ │ ├── modular.png │ │ │ ├── optimize.png │ │ │ ├── undraw_docusaurus_mountain.svg │ │ │ ├── undraw_docusaurus_react.svg │ │ │ ├── undraw_docusaurus_tree.svg │ │ │ └── universal_compatibility.png │ │ ├── stylesheets │ │ │ └── extra.css │ │ └── tutorials │ │ ├── agents │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-agent.png │ │ ├── ai_text_game │ │ │ └── index.md │ │ ├── async │ │ │ └── index.md │ │ ├── audio │ │ │ └── index.ipynb │ │ ├── build_ai_program │ │ │ └── index.md │ │ ├── cache │ │ │ └── index.md │ │ ├── classification │ │ │ └── index.md │ │ ├── classification_finetuning │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-classification.png │ │ ├── conversation_history │ │ │ └── index.md │ │ ├── core_development │ │ │ └── index.md │ │ ├── custom_module │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-custom-module.png │ │ ├── customer_service_agent │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-customer-service-agent.png │ │ ├── deployment │ │ │ ├── dspy_mlflow_ui.png │ │ │ └── index.md │ │ ├── email_extraction │ │ │ ├── index.md │ │ │ └── mlflow-tracing-email-extraction.png │ │ ├── entity_extraction │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-entity-extraction.png │ │ ├── games │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-agent.png │ │ ├── gepa_ai_program │ │ │ └── index.md │ │ ├── gepa_aime │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-aime.png │ │ │ └── mlflow-tracking-gepa-aime-optimization.png │ │ ├── gepa_facilitysupportanalyzer │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-support.png │ │ │ └── mlflow-tracking-gepa-support-optimization.png │ │ ├── gepa_papillon │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-papilon.png │ │ │ └── mlflow-tracking-gepa-papilon-optimization.png │ │ ├── image_generation_prompting │ │ │ └── index.ipynb │ │ ├── index.md │ │ ├── llms_txt_generation │ │ │ └── index.md │ │ ├── math │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-math.png │ │ ├── mcp │ │ │ └── index.md │ │ ├── mem0_react_agent │ │ │ └── index.md │ │ ├── multihop_search │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-multi-hop.png │ │ ├── observability │ │ │ ├── index.md │ │ │ ├── mlflow_trace_ui_navigation.gif │ │ │ ├── mlflow_trace_ui.png │ │ │ └── mlflow_trace_view.png │ │ ├── optimize_ai_program │ │ │ └── index.md │ │ ├── optimizer_tracking │ │ │ ├── child_run.png │ │ │ ├── experiment.png │ │ │ ├── index.md │ │ │ └── parent_run.png │ │ ├── output_refinement │ │ │ └── best-of-n-and-refine.md │ │ ├── papillon │ │ │ └── index.md │ │ ├── program_of_thought │ │ │ └── index.ipynb │ │ ├── rag │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-rag.png │ │ ├── real_world_examples │ │ │ └── index.md │ │ ├── rl_ai_program │ │ │ └── index.md │ │ ├── rl_multihop │ │ │ └── index.ipynb │ │ ├── rl_papillon │ │ │ └── index.ipynb │ │ ├── sample_code_generation │ │ │ └── index.md │ │ ├── saving │ │ │ └── index.md │ │ ├── streaming │ │ │ └── index.md │ │ ├── tool_use │ │ │ └── index.ipynb │ │ └── yahoo_finance_react │ │ └── index.md │ ├── mkdocs.yml │ ├── overrides │ │ ├── home.html │ │ ├── main.html │ │ └── partials │ │ └── tabs.html │ ├── Pipfile │ ├── Pipfile.lock │ ├── README.md │ ├── requirements.txt │ ├── scripts │ │ ├── generate_api_docs.py │ │ └── generate_api_summary.py │ └── vercel.json ├── dspy │ ├── __init__.py │ 
├── __metadata__.py │ ├── adapters │ │ ├── __init__.py │ │ ├── baml_adapter.py │ │ ├── base.py │ │ ├── chat_adapter.py │ │ ├── json_adapter.py │ │ ├── two_step_adapter.py │ │ ├── types │ │ │ ├── __init__.py │ │ │ ├── audio.py │ │ │ ├── base_type.py │ │ │ ├── citation.py │ │ │ ├── code.py │ │ │ ├── document.py │ │ │ ├── history.py │ │ │ ├── image.py │ │ │ └── tool.py │ │ ├── utils.py │ │ └── xml_adapter.py │ ├── clients │ │ ├── __init__.py │ │ ├── base_lm.py │ │ ├── cache.py │ │ ├── databricks.py │ │ ├── embedding.py │ │ ├── lm_local_arbor.py │ │ ├── lm_local.py │ │ ├── lm.py │ │ ├── openai.py │ │ ├── provider.py │ │ └── utils_finetune.py │ ├── datasets │ │ ├── __init__.py │ │ ├── alfworld │ │ │ ├── __init__.py │ │ │ ├── alfworld.py │ │ │ └── base_config.yml │ │ ├── colors.py │ │ ├── dataloader.py │ │ ├── dataset.py │ │ ├── gsm8k.py │ │ ├── hotpotqa.py │ │ └── math.py │ ├── dsp │ │ ├── __init__.py │ │ ├── colbertv2.py │ │ └── utils │ │ ├── __init__.py │ │ ├── dpr.py │ │ ├── settings.py │ │ └── utils.py │ ├── evaluate │ │ ├── __init__.py │ │ ├── auto_evaluation.py │ │ ├── evaluate.py │ │ └── metrics.py │ ├── experimental │ │ └── __init__.py │ ├── predict │ │ ├── __init__.py │ │ ├── aggregation.py │ │ ├── avatar │ │ │ ├── __init__.py │ │ │ ├── avatar.py │ │ │ ├── models.py │ │ │ └── signatures.py │ │ ├── best_of_n.py │ │ ├── chain_of_thought.py │ │ ├── code_act.py │ │ ├── knn.py │ │ ├── multi_chain_comparison.py │ │ ├── parallel.py │ │ ├── parameter.py │ │ ├── predict.py │ │ ├── program_of_thought.py │ │ ├── react.py │ │ ├── refine.py │ │ └── retry.py │ ├── primitives │ │ ├── __init__.py │ │ ├── base_module.py │ │ ├── example.py │ │ ├── module.py │ │ ├── prediction.py │ │ ├── python_interpreter.py │ │ └── runner.js │ ├── propose │ │ ├── __init__.py │ │ ├── dataset_summary_generator.py │ │ ├── grounded_proposer.py │ │ ├── propose_base.py │ │ └── utils.py │ ├── retrievers │ │ ├── __init__.py │ │ ├── databricks_rm.py │ │ ├── embeddings.py │ │ ├── retrieve.py │ │ └── weaviate_rm.py │ ├── signatures │ │ ├── __init__.py │ │ ├── field.py │ │ ├── signature.py │ │ └── utils.py │ ├── streaming │ │ ├── __init__.py │ │ ├── messages.py │ │ ├── streamify.py │ │ └── streaming_listener.py │ ├── teleprompt │ │ ├── __init__.py │ │ ├── avatar_optimizer.py │ │ ├── bettertogether.py │ │ ├── bootstrap_finetune.py │ │ ├── bootstrap_trace.py │ │ ├── bootstrap.py │ │ ├── copro_optimizer.py │ │ ├── ensemble.py │ │ ├── gepa │ │ │ ├── __init__.py │ │ │ ├── gepa_utils.py │ │ │ ├── gepa.py │ │ │ └── instruction_proposal.py │ │ ├── grpo.py │ │ ├── infer_rules.py │ │ ├── knn_fewshot.py │ │ ├── mipro_optimizer_v2.py │ │ ├── random_search.py │ │ ├── signature_opt.py │ │ ├── simba_utils.py │ │ ├── simba.py │ │ ├── teleprompt_optuna.py │ │ ├── teleprompt.py │ │ ├── utils.py │ │ └── vanilla.py │ └── utils │ ├── __init__.py │ ├── annotation.py │ ├── asyncify.py │ ├── caching.py │ ├── callback.py │ ├── dummies.py │ ├── exceptions.py │ ├── hasher.py │ ├── inspect_history.py │ ├── langchain_tool.py │ ├── logging_utils.py │ ├── mcp.py │ ├── parallelizer.py │ ├── saving.py │ ├── syncify.py │ ├── unbatchify.py │ └── usage_tracker.py ├── LICENSE ├── pyproject.toml ├── README.md ├── tests │ ├── __init__.py │ ├── adapters │ │ ├── test_adapter_utils.py │ │ ├── test_baml_adapter.py │ │ ├── test_base_type.py │ │ ├── test_chat_adapter.py │ │ ├── test_citation.py │ │ ├── test_code.py │ │ ├── test_document.py │ │ ├── test_json_adapter.py │ │ ├── test_tool.py │ │ ├── test_two_step_adapter.py │ │ └── test_xml_adapter.py │ ├── callback │ │ └── 
test_callback.py │ ├── clients │ │ ├── test_cache.py │ │ ├── test_databricks.py │ │ ├── test_embedding.py │ │ ├── test_inspect_global_history.py │ │ └── test_lm.py │ ├── conftest.py │ ├── datasets │ │ └── test_dataset.py │ ├── docs │ │ └── test_mkdocs_links.py │ ├── evaluate │ │ ├── test_evaluate.py │ │ └── test_metrics.py │ ├── examples │ │ └── test_baleen.py │ ├── metadata │ │ └── test_metadata.py │ ├── predict │ │ ├── test_aggregation.py │ │ ├── test_best_of_n.py │ │ ├── test_chain_of_thought.py │ │ ├── test_code_act.py │ │ ├── test_knn.py │ │ ├── test_multi_chain_comparison.py │ │ ├── test_parallel.py │ │ ├── test_predict.py │ │ ├── test_program_of_thought.py │ │ ├── test_react.py │ │ ├── test_refine.py │ │ └── test_retry.py │ ├── primitives │ │ ├── resources │ │ │ └── saved_program.json │ │ ├── test_base_module.py │ │ ├── test_example.py │ │ ├── test_module.py │ │ └── test_python_interpreter.py │ ├── propose │ │ └── test_grounded_proposer.py │ ├── README.md │ ├── reliability │ │ ├── __init__.py │ │ ├── complex_types │ │ │ └── generated │ │ │ ├── test_many_types_1 │ │ │ │ ├── inputs │ │ │ │ │ ├── input1.json │ │ │ │ │ └── input2.json │ │ │ │ ├── program.py │ │ │ │ └── schema.json │ │ │ ├── test_nesting_1 │ │ │ │ ├── inputs │ │ │ │ │ ├── input1.json │ │ │ │ │ └── input2.json │ │ │ │ ├── program.py │ │ │ │ └── schema.json │ │ │ └── test_nesting_2 │ │ │ ├── inputs │ │ │ │ └── input1.json │ │ │ ├── program.py │ │ │ └── schema.json │ │ ├── conftest.py │ │ ├── generate │ │ │ ├── __init__.py │ │ │ ├── __main__.py │ │ │ └── utils.py │ │ ├── input_formats │ │ │ └── generated │ │ │ └── test_markdown_1 │ │ │ ├── inputs │ │ │ │ ├── input1.json │ │ │ │ └── input2.json │ │ │ ├── program.py │ │ │ └── schema.json │ │ ├── README.md │ │ ├── reliability_conf.yaml │ │ ├── test_generated.py │ │ ├── test_pydantic_models.py │ │ └── utils.py │ ├── retrievers │ │ └── test_embeddings.py │ ├── signatures │ │ ├── test_adapter_image.py │ │ ├── test_custom_types.py │ │ └── test_signature.py │ ├── streaming │ │ └── test_streaming.py │ ├── teleprompt │ │ ├── gepa_dummy_lm_custom_component_selector_custom_instruction_proposer.json │ │ ├── gepa_dummy_lm.json │ │ ├── test_bootstrap_finetune.py │ │ ├── test_bootstrap_trace.py │ │ ├── test_bootstrap.py │ │ ├── test_copro_optimizer.py │ │ ├── test_ensemble.py │ │ ├── test_finetune.py │ │ ├── test_gepa_instruction_proposer.py │ │ ├── test_gepa.py │ │ ├── test_grpo.py │ │ ├── test_knn_fewshot.py │ │ ├── test_random_search.py │ │ ├── test_teleprompt.py │ │ └── test_utils.py │ ├── test_utils │ │ ├── __init__.py │ │ └── server │ │ ├── __init__.py │ │ ├── litellm_server_config.yaml │ │ └── litellm_server.py │ └── utils │ ├── __init__.py │ ├── resources │ │ └── mcp_server.py │ ├── test_annotation.py │ ├── test_asyncify.py │ ├── test_exceptions.py │ ├── test_langchain_tool.py │ ├── test_mcp.py │ ├── test_parallelizer.py │ ├── test_saving.py │ ├── test_settings.py │ ├── test_syncify.py │ ├── test_unbatchify.py │ └── test_usage_tracker.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /docs/docs/tutorials/llms_txt_generation/index.md: -------------------------------------------------------------------------------- ```markdown # Generating llms.txt for Code Documentation with DSPy This tutorial demonstrates how to use DSPy to automatically generate an `llms.txt` file for the DSPy repository itself. The `llms.txt` standard provides LLM-friendly documentation that helps AI systems better understand codebases. 
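If you want to follow along and run the code end to end, the minimal setup below is assumed by the rest of this tutorial. The package list and placeholder values are assumptions inferred from the code used later on (`requests` for the GitHub API, an OpenAI model via `dspy.LM`); substitute your own keys and preferred provider.

```python
# Assumed prerequisites (adjust to your environment):
#   pip install -U dspy requests
import os

# Placeholder credentials -- replace with your real keys before running.
os.environ["OPENAI_API_KEY"] = "<your_openai_key>"          # used when configuring dspy.LM in Step 4
os.environ["GITHUB_ACCESS_TOKEN"] = "<your_github_token>"   # used for the GitHub API calls in Step 3
```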
## What is llms.txt? `llms.txt` is a proposed standard for providing structured, LLM-friendly documentation about a project. It typically includes: - Project overview and purpose - Key concepts and terminology - Architecture and structure - Usage examples - Important files and directories ## Building a DSPy Program for llms.txt Generation Let's create a DSPy program that analyzes a repository and generates comprehensive `llms.txt` documentation. ### Step 1: Define Our Signatures First, we'll define signatures for different aspects of documentation generation: ```python import dspy from typing import List class AnalyzeRepository(dspy.Signature): """Analyze a repository structure and identify key components.""" repo_url: str = dspy.InputField(desc="GitHub repository URL") file_tree: str = dspy.InputField(desc="Repository file structure") readme_content: str = dspy.InputField(desc="README.md content") project_purpose: str = dspy.OutputField(desc="Main purpose and goals of the project") key_concepts: list[str] = dspy.OutputField(desc="List of important concepts and terminology") architecture_overview: str = dspy.OutputField(desc="High-level architecture description") class AnalyzeCodeStructure(dspy.Signature): """Analyze code structure to identify important directories and files.""" file_tree: str = dspy.InputField(desc="Repository file structure") package_files: str = dspy.InputField(desc="Key package and configuration files") important_directories: list[str] = dspy.OutputField(desc="Key directories and their purposes") entry_points: list[str] = dspy.OutputField(desc="Main entry points and important files") development_info: str = dspy.OutputField(desc="Development setup and workflow information") class GenerateLLMsTxt(dspy.Signature): """Generate a comprehensive llms.txt file from analyzed repository information.""" project_purpose: str = dspy.InputField() key_concepts: list[str] = dspy.InputField() architecture_overview: str = dspy.InputField() important_directories: list[str] = dspy.InputField() entry_points: list[str] = dspy.InputField() development_info: str = dspy.InputField() usage_examples: str = dspy.InputField(desc="Common usage patterns and examples") llms_txt_content: str = dspy.OutputField(desc="Complete llms.txt file content following the standard format") ``` ### Step 2: Create the Repository Analyzer Module ```python class RepositoryAnalyzer(dspy.Module): def __init__(self): super().__init__() self.analyze_repo = dspy.ChainOfThought(AnalyzeRepository) self.analyze_structure = dspy.ChainOfThought(AnalyzeCodeStructure) self.generate_examples = dspy.ChainOfThought("repo_info -> usage_examples") self.generate_llms_txt = dspy.ChainOfThought(GenerateLLMsTxt) def forward(self, repo_url, file_tree, readme_content, package_files): # Analyze repository purpose and concepts repo_analysis = self.analyze_repo( repo_url=repo_url, file_tree=file_tree, readme_content=readme_content ) # Analyze code structure structure_analysis = self.analyze_structure( file_tree=file_tree, package_files=package_files ) # Generate usage examples usage_examples = self.generate_examples( repo_info=f"Purpose: {repo_analysis.project_purpose}\nConcepts: {repo_analysis.key_concepts}" ) # Generate final llms.txt llms_txt = self.generate_llms_txt( project_purpose=repo_analysis.project_purpose, key_concepts=repo_analysis.key_concepts, architecture_overview=repo_analysis.architecture_overview, important_directories=structure_analysis.important_directories, entry_points=structure_analysis.entry_points, 
development_info=structure_analysis.development_info, usage_examples=usage_examples.usage_examples ) return dspy.Prediction( llms_txt_content=llms_txt.llms_txt_content, analysis=repo_analysis, structure=structure_analysis ) ``` ### Step 3: Gather Repository Information Let's create helper functions to extract repository information: ```python import requests import os from pathlib import Path os.environ["GITHUB_ACCESS_TOKEN"] = "<your_access_token>" def get_github_file_tree(repo_url): """Get repository file structure from GitHub API.""" # Extract owner/repo from URL parts = repo_url.rstrip('/').split('/') owner, repo = parts[-2], parts[-1] api_url = f"https://api.github.com/repos/{owner}/{repo}/git/trees/main?recursive=1" response = requests.get(api_url, headers={ "Authorization": f"Bearer {os.environ.get('GITHUB_ACCESS_TOKEN')}" }) if response.status_code == 200: tree_data = response.json() file_paths = [item['path'] for item in tree_data['tree'] if item['type'] == 'blob'] return '\n'.join(sorted(file_paths)) else: raise Exception(f"Failed to fetch repository tree: {response.status_code}") def get_github_file_content(repo_url, file_path): """Get specific file content from GitHub.""" parts = repo_url.rstrip('/').split('/') owner, repo = parts[-2], parts[-1] api_url = f"https://api.github.com/repos/{owner}/{repo}/contents/{file_path}" response = requests.get(api_url, headers={ "Authorization": f"Bearer {os.environ.get('GITHUB_ACCESS_TOKEN')}" }) if response.status_code == 200: import base64 content = base64.b64decode(response.json()['content']).decode('utf-8') return content else: return f"Could not fetch {file_path}" def gather_repository_info(repo_url): """Gather all necessary repository information.""" file_tree = get_github_file_tree(repo_url) readme_content = get_github_file_content(repo_url, "README.md") # Get key package files package_files = [] for file_path in ["pyproject.toml", "setup.py", "requirements.txt", "package.json"]: try: content = get_github_file_content(repo_url, file_path) if "Could not fetch" not in content: package_files.append(f"=== {file_path} ===\n{content}") except: continue package_files_content = "\n\n".join(package_files) return file_tree, readme_content, package_files_content ``` ### Step 4: Configure DSPy and Generate llms.txt ```python def generate_llms_txt_for_dspy(): # Configure DSPy (use your preferred LM) lm = dspy.LM(model="gpt-4o-mini") dspy.configure(lm=lm) os.environ["OPENAI_API_KEY"] = "<YOUR OPENAI KEY>" # Initialize our analyzer analyzer = RepositoryAnalyzer() # Gather DSPy repository information repo_url = "https://github.com/stanfordnlp/dspy" file_tree, readme_content, package_files = gather_repository_info(repo_url) # Generate llms.txt result = analyzer( repo_url=repo_url, file_tree=file_tree, readme_content=readme_content, package_files=package_files ) return result # Run the generation if __name__ == "__main__": result = generate_llms_txt_for_dspy() # Save the generated llms.txt with open("llms.txt", "w") as f: f.write(result.llms_txt_content) print("Generated llms.txt file!") print("\nPreview:") print(result.llms_txt_content[:500] + "...") ``` ## Expected Output Structure The generated `llms.txt` for DSPy would follow this structure: ``` # DSPy: Programming Language Models ## Project Overview DSPy is a framework for programming—rather than prompting—language models... 
## Key Concepts - **Modules**: Building blocks for LM programs - **Signatures**: Input/output specifications - **Teleprompters**: Optimization algorithms - **Predictors**: Core reasoning components ## Architecture - `/dspy/`: Main package directory - `/adapters/`: Input/output format handlers - `/clients/`: LM client interfaces - `/predict/`: Core prediction modules - `/teleprompt/`: Optimization algorithms ## Usage Examples 1. **Building a Classifier**: Using DSPy, a user can define a modular classifier that takes in text data and categorizes it into predefined classes. The user can specify the classification logic declaratively, allowing for easy adjustments and optimizations. 2. **Creating a RAG Pipeline**: A developer can implement a retrieval-augmented generation pipeline that first retrieves relevant documents based on a query and then generates a coherent response using those documents. DSPy facilitates the integration of retrieval and generation components seamlessly. 3. **Optimizing Prompts**: Users can leverage DSPy to create a system that automatically optimizes prompts for language models based on performance metrics, improving the quality of responses over time without manual intervention. 4. **Implementing Agent Loops**: A user can design an agent loop that continuously interacts with users, learns from feedback, and refines its responses, showcasing the self-improving capabilities of the DSPy framework. 5. **Compositional Code**: Developers can write compositional code that allows different modules of the AI system to interact with each other, enabling complex workflows that can be easily modified and extended. ``` The resulting `llms.txt` file provides a comprehensive, LLM-friendly overview of the DSPy repository that can help other AI systems better understand and work with the codebase. ## Next Steps - Extend the program to analyze multiple repositories - Add support for different documentation formats - Create metrics for documentation quality assessment - Build a web interface for interactive repository analysis ``` -------------------------------------------------------------------------------- /docs/docs/learn/programming/language_models.md: -------------------------------------------------------------------------------- ```markdown --- sidebar_position: 2 --- # Language Models The first step in any DSPy code is to set up your language model. For example, you can configure OpenAI's GPT-4o-mini as your default LM as follows. ```python linenums="1" # Authenticate via `OPENAI_API_KEY` env: import os; os.environ['OPENAI_API_KEY'] = 'here' lm = dspy.LM('openai/gpt-4o-mini') dspy.configure(lm=lm) ``` !!! info "A few different LMs" === "OpenAI" You can authenticate by setting the `OPENAI_API_KEY` env variable or passing `api_key` below. ```python linenums="1" import dspy lm = dspy.LM('openai/gpt-4o-mini', api_key='YOUR_OPENAI_API_KEY') dspy.configure(lm=lm) ``` === "Gemini (AI Studio)" You can authenticate by setting the GEMINI_API_KEY env variable or passing `api_key` below. ```python linenums="1" import dspy lm = dspy.LM('gemini/gemini-2.5-pro-preview-03-25', api_key='GEMINI_API_KEY') dspy.configure(lm=lm) ``` === "Anthropic" You can authenticate by setting the ANTHROPIC_API_KEY env variable or passing `api_key` below. ```python linenums="1" import dspy lm = dspy.LM('anthropic/claude-3-opus-20240229', api_key='YOUR_ANTHROPIC_API_KEY') dspy.configure(lm=lm) ``` === "Databricks" If you're on the Databricks platform, authentication is automatic via their SDK. 
        If not, you can set the env variables `DATABRICKS_API_KEY` and `DATABRICKS_API_BASE`, or pass `api_key` and `api_base` below.

        ```python linenums="1"
        import dspy
        lm = dspy.LM('databricks/databricks-meta-llama-3-1-70b-instruct')
        dspy.configure(lm=lm)
        ```

    === "Local LMs on a GPU server"
        First, install [SGLang](https://sgl-project.github.io/start/install.html) and launch its server with your LM.

        ```bash
        > pip install "sglang[all]"
        > pip install flashinfer -i https://flashinfer.ai/whl/cu121/torch2.4/
        > CUDA_VISIBLE_DEVICES=0 python -m sglang.launch_server --port 7501 --model-path meta-llama/Meta-Llama-3-8B-Instruct
        ```

        Then, connect to it from your DSPy code as an OpenAI-compatible endpoint.

        ```python linenums="1"
        lm = dspy.LM("openai/meta-llama/Meta-Llama-3-8B-Instruct",
                     api_base="http://localhost:7501/v1",  # ensure this points to your port
                     api_key="", model_type='chat')
        dspy.configure(lm=lm)
        ```

    === "Local LMs on your laptop"
        First, install [Ollama](https://github.com/ollama/ollama) and launch its server with your LM.

        ```bash
        > curl -fsSL https://ollama.ai/install.sh | sh
        > ollama run llama3.2:1b
        ```

        Then, connect to it from your DSPy code.

        ```python linenums="1"
        import dspy
        lm = dspy.LM('ollama_chat/llama3.2', api_base='http://localhost:11434', api_key='')
        dspy.configure(lm=lm)
        ```

    === "Other providers"
        In DSPy, you can use any of the dozens of [LLM providers supported by LiteLLM](https://docs.litellm.ai/docs/providers). Simply follow their instructions for which `{PROVIDER}_API_KEY` to set and how to pass the `{provider_name}/{model_name}` to the constructor. Some examples:

        - `anyscale/mistralai/Mistral-7B-Instruct-v0.1`, with `ANYSCALE_API_KEY`
        - `together_ai/togethercomputer/llama-2-70b-chat`, with `TOGETHERAI_API_KEY`
        - `sagemaker/<your-endpoint-name>`, with `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `AWS_REGION_NAME`
        - `azure/<your_deployment_name>`, with `AZURE_API_KEY`, `AZURE_API_BASE`, `AZURE_API_VERSION`, and the optional `AZURE_AD_TOKEN` and `AZURE_API_TYPE` as environment variables. If you are initiating external models without setting environment variables, use the following: `lm = dspy.LM('azure/<your_deployment_name>', api_key='AZURE_API_KEY', api_base='AZURE_API_BASE', api_version='AZURE_API_VERSION')`

        If your provider offers an OpenAI-compatible endpoint, just add an `openai/` prefix to your full model name.

        ```python linenums="1"
        import dspy
        lm = dspy.LM('openai/your-model-name', api_key='PROVIDER_API_KEY', api_base='YOUR_PROVIDER_URL')
        dspy.configure(lm=lm)
        ```

        If you run into errors, please refer to the [LiteLLM Docs](https://docs.litellm.ai/docs/providers) to verify if you are using the same variable names/following the right procedure.

## Calling the LM directly.

It's easy to call the `lm` you configured above directly. This gives you a unified API and lets you benefit from utilities like automatic caching.

```python linenums="1"
lm("Say this is a test!", temperature=0.7)  # => ['This is a test!']
lm(messages=[{"role": "user", "content": "Say this is a test!"}])  # => ['This is a test!']
```

## Using the LM with DSPy modules.

Idiomatic DSPy involves using _modules_, which we discuss in the next guide.

```python linenums="1"
# Define a module (ChainOfThought) and assign it a signature (return an answer, given a question).
qa = dspy.ChainOfThought('question -> answer')

# Run with the default LM configured with `dspy.configure` above.
response = qa(question="How many floors are in the castle David Gregory inherited?")
print(response.answer)
```

**Possible Output:**
```text
The castle David Gregory inherited has 7 floors.
```

## Using multiple LMs.

You can change the default LM globally with `dspy.configure` or change it inside a block of code with `dspy.context`.

!!! tip

    Using `dspy.configure` and `dspy.context` is thread-safe!

```python linenums="1"
dspy.configure(lm=dspy.LM('openai/gpt-4o-mini'))
response = qa(question="How many floors are in the castle David Gregory inherited?")
print('GPT-4o-mini:', response.answer)

with dspy.context(lm=dspy.LM('openai/gpt-3.5-turbo')):
    response = qa(question="How many floors are in the castle David Gregory inherited?")
    print('GPT-3.5-turbo:', response.answer)
```

**Possible Output:**
```text
GPT-4o-mini: The number of floors in the castle David Gregory inherited cannot be determined with the information provided.
GPT-3.5-turbo: The castle David Gregory inherited has 7 floors.
```

## Configuring LM generation.

For any LM, you can configure any of the following attributes at initialization or in each subsequent call.

```python linenums="1"
gpt_4o_mini = dspy.LM('openai/gpt-4o-mini', temperature=0.9, max_tokens=3000, stop=None, cache=False)
```

By default LMs in DSPy are cached. If you repeat the same call, you will get the same outputs. But you can turn off caching by setting `cache=False`.

If you want to keep caching enabled but force a new request (for example, to obtain diverse outputs), pass a unique `rollout_id` and set a non-zero `temperature` in your call. DSPy hashes both the inputs and the `rollout_id` when looking up a cache entry, so different values force a new LM request while still caching future calls with the same inputs and `rollout_id`. The ID is also recorded in `lm.history`, which makes it easy to track or compare different rollouts during experiments. Changing only the `rollout_id` while keeping `temperature=0` will not affect the LM's output.

```python linenums="1"
lm("Say this is a test!", rollout_id=1, temperature=1.0)
```

You can pass these LM kwargs directly to DSPy modules as well. Supplying them at initialization sets the defaults for every call:

```python linenums="1"
predict = dspy.Predict("question -> answer", rollout_id=1, temperature=1.0)
```

To override them for a single invocation, provide a `config` dictionary when calling the module:

```python linenums="1"
predict = dspy.Predict("question -> answer")
predict(question="What is 1 + 52?", config={"rollout_id": 5, "temperature": 1.0})
```

In both cases, `rollout_id` is forwarded to the underlying LM, affects its caching behavior, and is stored alongside each response so you can replay or analyze specific rollouts later.

## Inspecting output and usage metadata.

Every LM object maintains the history of its interactions, including inputs, outputs, token usage (and $$$ cost), and metadata.

```python linenums="1"
len(lm.history)  # e.g., 3 calls to the LM
lm.history[-1].keys()  # access the last call to the LM, with all metadata
```

**Output:**
```text
dict_keys(['prompt', 'messages', 'kwargs', 'response', 'outputs', 'usage', 'cost', 'timestamp', 'uuid', 'model', 'response_model', 'model_type'])
```

## Using the Responses API

By default, DSPy calls language models (LMs) using LiteLLM's [Chat Completions API](https://docs.litellm.ai/docs/completion), which is suitable for most standard models and tasks.
However, some advanced models, such as OpenAI's reasoning models (e.g., `gpt-5` or other future models), may offer improved quality or additional features when accessed via the [Responses API](https://docs.litellm.ai/docs/response_api), which is supported in DSPy. **When should you use the Responses API?** - If you are working with models that support or require the `responses` endpoint (such as OpenAI's reasoning models). - When you want to leverage enhanced reasoning, multi-turn, or richer output capabilities provided by certain models. **How to enable the Responses API in DSPy:** To enable the Responses API, just set `model_type="responses"` when creating the `dspy.LM` instance. ```python import dspy # Configure DSPy to use the Responses API for your language model dspy.settings.configure( lm=dspy.LM( "openai/gpt-5-mini", model_type="responses", temperature=1.0, max_tokens=16000, ), ) ``` Please note that not all models or providers support the Responses API, check [LiteLLM's documentation](https://docs.litellm.ai/docs/response_api) for more details. ## Advanced: Building custom LMs and writing your own Adapters. Though rarely needed, you can write custom LMs by inheriting from `dspy.BaseLM`. Another advanced layer in the DSPy ecosystem is that of _adapters_, which sit between DSPy signatures and LMs. A future version of this guide will discuss these advanced features, though you likely don't need them. ``` -------------------------------------------------------------------------------- /dspy/adapters/chat_adapter.py: -------------------------------------------------------------------------------- ```python import re import textwrap from typing import Any, NamedTuple from litellm import ContextWindowExceededError from pydantic.fields import FieldInfo from dspy.adapters.base import Adapter from dspy.adapters.utils import ( format_field_value, get_annotation_name, get_field_description_string, parse_value, translate_field_type, ) from dspy.clients.lm import LM from dspy.signatures.signature import Signature from dspy.utils.exceptions import AdapterParseError field_header_pattern = re.compile(r"\[\[ ## (\w+) ## \]\]") class FieldInfoWithName(NamedTuple): name: str info: FieldInfo class ChatAdapter(Adapter): def __call__( self, lm: LM, lm_kwargs: dict[str, Any], signature: type[Signature], demos: list[dict[str, Any]], inputs: dict[str, Any], ) -> list[dict[str, Any]]: try: return super().__call__(lm, lm_kwargs, signature, demos, inputs) except Exception as e: # fallback to JSONAdapter from dspy.adapters.json_adapter import JSONAdapter if isinstance(e, ContextWindowExceededError) or isinstance(self, JSONAdapter): # On context window exceeded error or already using JSONAdapter, we don't want to retry with a different # adapter. raise e return JSONAdapter()(lm, lm_kwargs, signature, demos, inputs) async def acall( self, lm: LM, lm_kwargs: dict[str, Any], signature: type[Signature], demos: list[dict[str, Any]], inputs: dict[str, Any], ) -> list[dict[str, Any]]: try: return await super().acall(lm, lm_kwargs, signature, demos, inputs) except Exception as e: # fallback to JSONAdapter from dspy.adapters.json_adapter import JSONAdapter if isinstance(e, ContextWindowExceededError) or isinstance(self, JSONAdapter): # On context window exceeded error or already using JSONAdapter, we don't want to retry with a different # adapter. 
raise e return await JSONAdapter().acall(lm, lm_kwargs, signature, demos, inputs) def format_field_description(self, signature: type[Signature]) -> str: return ( f"Your input fields are:\n{get_field_description_string(signature.input_fields)}\n" f"Your output fields are:\n{get_field_description_string(signature.output_fields)}" ) def format_field_structure(self, signature: type[Signature]) -> str: """ `ChatAdapter` requires input and output fields to be in their own sections, with section header using markers `[[ ## field_name ## ]]`. An arbitrary field `completed` ([[ ## completed ## ]]) is added to the end of the output fields section to indicate the end of the output fields. """ parts = [] parts.append("All interactions will be structured in the following way, with the appropriate values filled in.") def format_signature_fields_for_instructions(fields: dict[str, FieldInfo]): return self.format_field_with_value( fields_with_values={ FieldInfoWithName(name=field_name, info=field_info): translate_field_type(field_name, field_info) for field_name, field_info in fields.items() }, ) parts.append(format_signature_fields_for_instructions(signature.input_fields)) parts.append(format_signature_fields_for_instructions(signature.output_fields)) parts.append("[[ ## completed ## ]]\n") return "\n\n".join(parts).strip() def format_task_description(self, signature: type[Signature]) -> str: instructions = textwrap.dedent(signature.instructions) objective = ("\n" + " " * 8).join([""] + instructions.splitlines()) return f"In adhering to this structure, your objective is: {objective}" def format_user_message_content( self, signature: type[Signature], inputs: dict[str, Any], prefix: str = "", suffix: str = "", main_request: bool = False, ) -> str: messages = [prefix] for k, v in signature.input_fields.items(): if k in inputs: value = inputs.get(k) formatted_field_value = format_field_value(field_info=v, value=value) messages.append(f"[[ ## {k} ## ]]\n{formatted_field_value}") if main_request: output_requirements = self.user_message_output_requirements(signature) if output_requirements is not None: messages.append(output_requirements) messages.append(suffix) return "\n\n".join(messages).strip() def user_message_output_requirements(self, signature: type[Signature]) -> str: """Returns a simplified format reminder for the language model. In chat-based interactions, language models may lose track of the required output format as the conversation context grows longer. This method generates a concise reminder of the expected output structure that can be included in user messages. Args: signature (Type[Signature]): The DSPy signature defining the expected input/output fields. Returns: str: A simplified description of the required output format. Note: This is a more lightweight version of `format_field_structure` specifically designed for inline reminders within chat messages. """ def type_info(v): if v.annotation is not str: return f" (must be formatted as a valid Python {get_annotation_name(v.annotation)})" else: return "" message = "Respond with the corresponding output fields, starting with the field " message += ", then ".join(f"`[[ ## {f} ## ]]`{type_info(v)}" for f, v in signature.output_fields.items()) message += ", and then ending with the marker for `[[ ## completed ## ]]`." 
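        # For example, for a signature with a single string output field `answer`, this renders as:
        # "Respond with the corresponding output fields, starting with the field `[[ ## answer ## ]]`,
        # and then ending with the marker for `[[ ## completed ## ]]`."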
return message def format_assistant_message_content( self, signature: type[Signature], outputs: dict[str, Any], missing_field_message=None, ) -> str: assistant_message_content = self.format_field_with_value( { FieldInfoWithName(name=k, info=v): outputs.get(k, missing_field_message) for k, v in signature.output_fields.items() }, ) assistant_message_content += "\n\n[[ ## completed ## ]]\n" return assistant_message_content def parse(self, signature: type[Signature], completion: str) -> dict[str, Any]: sections = [(None, [])] for line in completion.splitlines(): match = field_header_pattern.match(line.strip()) if match: # If the header pattern is found, split the rest of the line as content header = match.group(1) remaining_content = line[match.end() :].strip() sections.append((header, [remaining_content] if remaining_content else [])) else: sections[-1][1].append(line) sections = [(k, "\n".join(v).strip()) for k, v in sections] fields = {} for k, v in sections: if (k not in fields) and (k in signature.output_fields): try: fields[k] = parse_value(v, signature.output_fields[k].annotation) except Exception as e: raise AdapterParseError( adapter_name="ChatAdapter", signature=signature, lm_response=completion, message=f"Failed to parse field {k} with value {v} from the LM response. Error message: {e}", ) if fields.keys() != signature.output_fields.keys(): raise AdapterParseError( adapter_name="ChatAdapter", signature=signature, lm_response=completion, parsed_result=fields, ) return fields def format_field_with_value(self, fields_with_values: dict[FieldInfoWithName, Any]) -> str: """ Formats the values of the specified fields according to the field's DSPy type (input or output), annotation (e.g. str, int, etc.), and the type of the value itself. Joins the formatted values into a single string, which is is a multiline string if there are multiple fields. Args: fields_with_values: A dictionary mapping information about a field to its corresponding value. Returns: The joined formatted values of the fields, represented as a string """ output = [] for field, field_value in fields_with_values.items(): formatted_field_value = format_field_value(field_info=field.info, value=field_value) output.append(f"[[ ## {field.name} ## ]]\n{formatted_field_value}") return "\n\n".join(output).strip() def format_finetune_data( self, signature: type[Signature], demos: list[dict[str, Any]], inputs: dict[str, Any], outputs: dict[str, Any], ) -> dict[str, list[Any]]: """ Format the call data into finetuning data according to the OpenAI API specifications. For the chat adapter, this means formatting the data as a list of messages, where each message is a dictionary with a "role" and "content" key. The role can be "system", "user", or "assistant". Then, the messages are wrapped in a dictionary with a "messages" key. 
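        An illustrative example of the returned structure (contents abbreviated):

            {"messages": [
                {"role": "system", "content": "..."},
                {"role": "user", "content": "..."},
                {"role": "assistant", "content": "..."},
            ]}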
""" system_user_messages = self.format( # returns a list of dicts with the keys "role" and "content" signature=signature, demos=demos, inputs=inputs ) assistant_message_content = self.format_assistant_message_content( # returns a string, without the role signature=signature, outputs=outputs ) assistant_message = {"role": "assistant", "content": assistant_message_content} messages = system_user_messages + [assistant_message] return {"messages": messages} ``` -------------------------------------------------------------------------------- /docs/docs/tutorials/mem0_react_agent/index.md: -------------------------------------------------------------------------------- ```markdown # Building Memory-Enabled Agents with DSPy ReAct and Mem0 This tutorial demonstrates how to build intelligent conversational agents that can remember information across interactions using DSPy's ReAct framework combined with [Mem0](https://docs.mem0.ai/)'s memory capabilities. You'll learn to create agents that can store, retrieve, and use contextual information to provide personalized and coherent responses. ## What You'll Build By the end of this tutorial, you'll have a memory-enabled agent that can: - **Remember user preferences** and past conversations - **Store and retrieve factual information** about users and topics - **Use memory to inform decisions** and provide personalized responses - **Handle complex multi-turn conversations** with context awareness - **Manage different types of memories** (facts, preferences, experiences) ## Prerequisites - Basic understanding of DSPy and ReAct agents - Python 3.9+ installed - API keys for your preferred LLM provider ## Installation and Setup ```bash pip install dspy mem0ai ``` ## Step 1: Understanding Mem0 Integration Mem0 provides a memory layer that can store, search, and retrieve memories for AI agents. Let's start by understanding how to integrate it with DSPy: ```python import dspy from mem0 import Memory import os from typing import List, Dict, Any, Optional from datetime import datetime # Configure environment os.environ["OPENAI_API_KEY"] = "your-openai-api-key" # Initialize Mem0 memory system config = { "llm": { "provider": "openai", "config": { "model": "gpt-4o-mini", "temperature": 0.1 } }, "embedder": { "provider": "openai", "config": { "model": "text-embedding-3-small" } } } ``` ## Step 2: Create Memory-Aware Tools Let's create tools that can interact with the memory system: ```python import datetime class MemoryTools: """Tools for interacting with the Mem0 memory system.""" def __init__(self, memory: Memory): self.memory = memory def store_memory(self, content: str, user_id: str = "default_user") -> str: """Store information in memory.""" try: self.memory.add(content, user_id=user_id) return f"Stored memory: {content}" except Exception as e: return f"Error storing memory: {str(e)}" def search_memories(self, query: str, user_id: str = "default_user", limit: int = 5) -> str: """Search for relevant memories.""" try: results = self.memory.search(query, user_id=user_id, limit=limit) if not results: return "No relevant memories found." memory_text = "Relevant memories found:\n" for i, result in enumerate(results["results"]): memory_text += f"{i}. {result['memory']}\n" return memory_text except Exception as e: return f"Error searching memories: {str(e)}" def get_all_memories(self, user_id: str = "default_user") -> str: """Get all memories for a user.""" try: results = self.memory.get_all(user_id=user_id) if not results: return "No memories found for this user." 
memory_text = "All memories for user:\n" for i, result in enumerate(results["results"]): memory_text += f"{i}. {result['memory']}\n" return memory_text except Exception as e: return f"Error retrieving memories: {str(e)}" def update_memory(self, memory_id: str, new_content: str) -> str: """Update an existing memory.""" try: self.memory.update(memory_id, new_content) return f"Updated memory with new content: {new_content}" except Exception as e: return f"Error updating memory: {str(e)}" def delete_memory(self, memory_id: str) -> str: """Delete a specific memory.""" try: self.memory.delete(memory_id) return "Memory deleted successfully." except Exception as e: return f"Error deleting memory: {str(e)}" def get_current_time() -> str: """Get the current date and time.""" return datetime.now().strftime("%Y-%m-%d %H:%M:%S") ``` ## Step 3: Build the Memory-Enhanced ReAct Agent Now let's create our main ReAct agent that can use memory: ```python class MemoryQA(dspy.Signature): """ You're a helpful assistant and have access to memory method. Whenever you answer a user's input, remember to store the information in memory so that you can use it later. """ user_input: str = dspy.InputField() response: str = dspy.OutputField() class MemoryReActAgent(dspy.Module): """A ReAct agent enhanced with Mem0 memory capabilities.""" def __init__(self, memory: Memory): super().__init__() self.memory_tools = MemoryTools(memory) # Create tools list for ReAct self.tools = [ self.memory_tools.store_memory, self.memory_tools.search_memories, self.memory_tools.get_all_memories, get_current_time, self.set_reminder, self.get_preferences, self.update_preferences, ] # Initialize ReAct with our tools self.react = dspy.ReAct( signature=MemoryQA, tools=self.tools, max_iters=6 ) def forward(self, user_input: str): """Process user input with memory-aware reasoning.""" return self.react(user_input=user_input) def set_reminder(self, reminder_text: str, date_time: str = None, user_id: str = "default_user") -> str: """Set a reminder for the user.""" reminder = f"Reminder set for {date_time}: {reminder_text}" return self.memory_tools.store_memory( f"REMINDER: {reminder}", user_id=user_id ) def get_preferences(self, category: str = "general", user_id: str = "default_user") -> str: """Get user preferences for a specific category.""" query = f"user preferences {category}" return self.memory_tools.search_memories( query=query, user_id=user_id ) def update_preferences(self, category: str, preference: str, user_id: str = "default_user") -> str: """Update user preferences.""" preference_text = f"User preference for {category}: {preference}" return self.memory_tools.store_memory( preference_text, user_id=user_id ) ``` ## Step 4: Running the Memory-Enhanced Agent Let's create a simple interface to interact with our memory-enabled agent: ```python import time def run_memory_agent_demo(): """Demonstration of memory-enhanced ReAct agent.""" # Configure DSPy lm = dspy.LM(model='openai/gpt-4o-mini') dspy.configure(lm=lm) # Initialize memory system memory = Memory.from_config(config) # Create our agent agent = MemoryReActAgent(memory) # Sample conversation demonstrating memory capabilities print("🧠 Memory-Enhanced ReAct Agent Demo") print("=" * 50) conversations = [ "Hi, I'm Alice and I love Italian food, especially pasta carbonara.", "I'm Alice. I prefer to exercise in the morning around 7 AM.", "I'm Alice. What do you remember about my food preferences?", "I'm Alice. Set a reminder for me to go grocery shopping tomorrow.", "I'm Alice. 
What are my exercise preferences?", "I'm Alice. I also enjoy hiking on weekends.", "I'm Alice. What do you know about me so far?" ] for i, user_input in enumerate(conversations, 1): print(f"\n📝 User: {user_input}") try: response = agent(user_input=user_input) print(f"🤖 Agent: {response.response}") time.sleep(1) except Exception as e: print(f"❌ Error: {e}") # Run the demonstration if __name__ == "__main__": run_memory_agent_demo() ``` ## Expected Output ``` 🧠 Memory-Enhanced ReAct Agent Demo ================================================== 📝 User: Hi, I'm Alice and I love Italian food, especially pasta carbonara. 🤖 Agent: Hi Alice! It's great to meet you! Pasta carbonara is a delicious choice. Do you have a favorite place to get it, or do you enjoy making it at home? 📝 User: I'm Alice. I prefer to exercise in the morning around 7 AM. 🤖 Agent: Nice to meet you, Alice! I've noted that you prefer to exercise in the morning around 7 AM. If you have any questions or need tips related to your morning workouts, feel free to ask! 📝 User: I'm Alice. What do you remember about my food preferences? 🤖 Agent: I remember that you love Italian food, and your favorite dish is pasta carbonara! 📝 User: I'm Alice. Set a reminder for me to go grocery shopping tomorrow. 🤖 Agent: I have set a reminder for you, Alice, to go grocery shopping tomorrow. 📝 User: I'm Alice. What are my exercise preferences? 🤖 Agent: Your exercise preference is to work out in the morning around 7 AM. 📝 User: I'm Alice. I also enjoy hiking on weekends. 🤖 Agent: Nice to meet you, Alice! It's great to hear that you enjoy hiking on weekends. If you need any recommendations for trails or tips for your hikes, feel free to ask! 📝 User: I'm Alice. What do you know about me so far? 🤖 Agent: So far, I know that you enjoy hiking on weekends, prefer to exercise in the morning around 7 AM, and love Italian food, especially pasta carbonara. Additionally, you have a reminder set for grocery shopping on October 7, 2023. ``` ## Next Steps - **Implement memory persistence** with databases (PostgreSQL, MongoDB) - **Add memory categorization** and tagging for better organization - **Create memory expiration policies** for data management - **Build multi-user memory isolation** for production applications - **Add memory analytics** and insights - **Integrate with vector databases** for enhanced semantic search - **Implement memory compression** for long-term storage efficiency This tutorial demonstrates how DSPy's ReAct framework can be enhanced with Mem0's memory capabilities to create intelligent, context-aware agents that can learn and remember information across interactions, making them more useful for real-world applications. ``` -------------------------------------------------------------------------------- /dspy/adapters/utils.py: -------------------------------------------------------------------------------- ```python import ast import enum import inspect import json import types from collections.abc import Mapping from typing import Any, Literal, Union, get_args, get_origin import json_repair import pydantic from pydantic import TypeAdapter from pydantic.fields import FieldInfo from dspy.adapters.types.base_type import Type as DspyType from dspy.signatures.utils import get_dspy_field_type def serialize_for_json(value: Any) -> Any: """ Formats the specified value so that it can be serialized as a JSON string. Args: value: The value to format as a JSON string. Returns: The formatted value, which is serializable as a JSON string. 
""" # Attempt to format the value as a JSON-compatible object using pydantic, falling back to # a string representation of the value if that fails (e.g. if the value contains an object # that pydantic doesn't recognize or can't serialize) try: return TypeAdapter(type(value)).dump_python(value, mode="json") except Exception: return str(value) def format_field_value(field_info: FieldInfo, value: Any, assume_text=True) -> str | dict: """ Formats the value of the specified field according to the field's DSPy type (input or output), annotation (e.g. str, int, etc.), and the type of the value itself. Args: field_info: Information about the field, including its DSPy field type and annotation. value: The value of the field. Returns: The formatted value of the field, represented as a string. """ string_value = None if isinstance(value, list) and field_info.annotation is str: # If the field has no special type requirements, format it as a nice numbered list for the LM. string_value = _format_input_list_field_value(value) else: jsonable_value = serialize_for_json(value) if isinstance(jsonable_value, dict) or isinstance(jsonable_value, list): string_value = json.dumps(jsonable_value, ensure_ascii=False) else: # If the value is not a Python representation of a JSON object or Array # (e.g. the value is a JSON string), just use the string representation of the value # to avoid double-quoting the JSON string (which would hurt accuracy for certain # tasks, e.g. tasks that rely on computing string length) string_value = str(jsonable_value) if assume_text: return string_value else: return {"type": "text", "text": string_value} def _get_json_schema(field_type): def move_type_to_front(d): # Move the 'type' key to the front of the dictionary, recursively, for LLM readability/adherence. if isinstance(d, Mapping): return { k: move_type_to_front(v) for k, v in sorted(d.items(), key=lambda item: (item[0] != "type", item[0])) } elif isinstance(d, list): return [move_type_to_front(item) for item in d] return d schema = pydantic.TypeAdapter(field_type).json_schema() schema = move_type_to_front(schema) return schema def translate_field_type(field_name, field_info): field_type = field_info.annotation if get_dspy_field_type(field_info) == "input" or field_type is str: desc = "" elif field_type is bool: desc = "must be True or False" elif field_type in (int, float): desc = f"must be a single {field_type.__name__} value" elif inspect.isclass(field_type) and issubclass(field_type, enum.Enum): enum_vals = "; ".join(str(member.value) for member in field_type) desc = f"must be one of: {enum_vals}" elif hasattr(field_type, "__origin__") and field_type.__origin__ is Literal: desc = ( # Strongly encourage the LM to avoid choosing values that don't appear in the # literal or returning a value of the form 'Literal[<selected_value>]' f"must exactly match (no extra characters) one of: {'; '.join([str(x) for x in field_type.__args__])}" ) else: desc = f"must adhere to the JSON schema: {json.dumps(_get_json_schema(field_type), ensure_ascii=False)}" desc = (" " * 8) + f"# note: the value you produce {desc}" if desc else "" return f"{{{field_name}}}{desc}" def find_enum_member(enum, identifier): """ Finds the enum member corresponding to the specified identifier, which may be the enum member's name or value. Args: enum: The enum to search for the member. identifier: If the enum is explicitly-valued, this is the value of the enum member to find. If the enum is auto-valued, this is the name of the enum member to find. 
Returns: The enum member corresponding to the specified identifier. """ # Check if the identifier is a valid enum member value *before* checking if it's a valid enum # member name, since the identifier will be a value for explicitly-valued enums. This handles # the (rare) case where an enum member value is the same as another enum member's name in # an explicitly-valued enum for member in enum: if member.value == identifier: return member # If the identifier is not a valid enum member value, check if it's a valid enum member name, # since the identifier will be a member name for auto-valued enums if identifier in enum.__members__: return enum[identifier] raise ValueError(f"{identifier} is not a valid name or value for the enum {enum.__name__}") def parse_value(value, annotation): if annotation is str: return str(value) if isinstance(annotation, enum.EnumMeta): return find_enum_member(annotation, value) origin = get_origin(annotation) if origin is Literal: allowed = get_args(annotation) if value in allowed: return value if isinstance(value, str): v = value.strip() if v.startswith(("Literal[", "str[")) and v.endswith("]"): v = v[v.find("[") + 1 : -1] if len(v) > 1 and v[0] == v[-1] and v[0] in "\"'": v = v[1:-1] if v in allowed: return v raise ValueError(f"{value!r} is not one of {allowed!r}") if not isinstance(value, str): return TypeAdapter(annotation).validate_python(value) if origin in (Union, types.UnionType) and type(None) in get_args(annotation) and str in get_args(annotation): # Handle union annotations, e.g., `str | None`, `Optional[str]`, `Union[str, int, None]`, etc. return TypeAdapter(annotation).validate_python(value) candidate = json_repair.loads(value) # json_repair.loads returns "" on failure. if candidate == "" and value != "": try: candidate = ast.literal_eval(value) except (ValueError, SyntaxError): candidate = value try: return TypeAdapter(annotation).validate_python(candidate) except pydantic.ValidationError as e: if inspect.isclass(annotation) and issubclass(annotation, DspyType): try: # For dspy.Type, try parsing from the original value in case it has a custom parser return TypeAdapter(annotation).validate_python(value) except Exception: raise e raise def get_annotation_name(annotation): origin = get_origin(annotation) args = get_args(annotation) if origin is None: if hasattr(annotation, "__name__"): return annotation.__name__ else: return str(annotation) if origin is Literal: args_str = ", ".join( _quoted_string_for_literal_type_annotation(a) if isinstance(a, str) else get_annotation_name(a) for a in args ) return f"{get_annotation_name(origin)}[{args_str}]" else: args_str = ", ".join(get_annotation_name(a) for a in args) return f"{get_annotation_name(origin)}[{args_str}]" def get_field_description_string(fields: dict) -> str: field_descriptions = [] for idx, (k, v) in enumerate(fields.items()): field_message = f"{idx + 1}. 
`{k}`" field_message += f" ({get_annotation_name(v.annotation)})" desc = v.json_schema_extra["desc"] if v.json_schema_extra["desc"] != f"${{{k}}}" else "" custom_types = DspyType.extract_custom_type_from_annotation(v.annotation) for custom_type in custom_types: if len(custom_type.description()) > 0: desc += f"\n Type description of {get_annotation_name(custom_type)}: {custom_type.description()}" field_message += f": {desc}" field_message += ( f"\nConstraints: {v.json_schema_extra['constraints']}" if v.json_schema_extra.get("constraints") else "" ) field_descriptions.append(field_message) return "\n".join(field_descriptions).strip() def _format_input_list_field_value(value: list[Any]) -> str: """ Formats the value of an input field of type list[Any]. Args: value: The value of the list-type input field. Returns: A string representation of the input field's list value. """ if len(value) == 0: return "N/A" if len(value) == 1: return _format_blob(value[0]) return "\n".join([f"[{idx + 1}] {_format_blob(txt)}" for idx, txt in enumerate(value)]) def _format_blob(blob: str) -> str: """ Formats the specified text blobs so that an LM can parse it correctly within a list of multiple text blobs. Args: blob: The text blob to format. Returns: The formatted text blob. """ if "\n" not in blob and "«" not in blob and "»" not in blob: return f"«{blob}»" modified_blob = blob.replace("\n", "\n ") return f"«««\n {modified_blob}\n»»»" def _quoted_string_for_literal_type_annotation(s: str) -> str: """ Return the specified string quoted for inclusion in a literal type annotation. """ has_single = "'" in s has_double = '"' in s if has_single and not has_double: # Only single quotes => enclose in double quotes return f'"{s}"' elif has_double and not has_single: # Only double quotes => enclose in single quotes return f"'{s}'" elif has_single and has_double: # Both => enclose in single quotes; escape each single quote with \' escaped = s.replace("'", "\\'") return f"'{escaped}'" else: # Neither => enclose in single quotes return f"'{s}'" ``` -------------------------------------------------------------------------------- /dspy/clients/cache.py: -------------------------------------------------------------------------------- ```python import copy import inspect import logging import threading from functools import wraps from hashlib import sha256 from typing import Any import cloudpickle import orjson import pydantic from cachetools import LRUCache from diskcache import FanoutCache logger = logging.getLogger(__name__) class Cache: """DSPy Cache `Cache` provides 2 levels of caching (in the given order): 1. In-memory cache - implemented with cachetools.LRUCache 2. On-disk cache - implemented with diskcache.FanoutCache """ def __init__( self, enable_disk_cache: bool, enable_memory_cache: bool, disk_cache_dir: str, disk_size_limit_bytes: int | None = 1024 * 1024 * 10, memory_max_entries: int | None = 1000000, ): """ Args: enable_disk_cache: Whether to enable on-disk cache. enable_memory_cache: Whether to enable in-memory cache. disk_cache_dir: The directory where the disk cache is stored. disk_size_limit_bytes: The maximum size of the disk cache (in bytes). memory_max_entries: The maximum size of the in-memory cache (in number of items). 
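        Example:
            A minimal sketch of constructing a cache with both layers enabled
            (the directory path below is only a placeholder, not a library default):

                cache = Cache(
                    enable_disk_cache=True,
                    enable_memory_cache=True,
                    disk_cache_dir="/tmp/dspy_cache",
                )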
""" self.enable_disk_cache = enable_disk_cache self.enable_memory_cache = enable_memory_cache if self.enable_memory_cache: self.memory_cache = LRUCache(maxsize=memory_max_entries) else: self.memory_cache = {} if self.enable_disk_cache: self.disk_cache = FanoutCache( shards=16, timeout=10, directory=disk_cache_dir, size_limit=disk_size_limit_bytes, ) else: self.disk_cache = {} self._lock = threading.RLock() def __contains__(self, key: str) -> bool: """Check if a key is in the cache.""" return key in self.memory_cache or key in self.disk_cache def cache_key(self, request: dict[str, Any], ignored_args_for_cache_key: list[str] | None = None) -> str: """ Obtain a unique cache key for the given request dictionary by hashing its JSON representation. For request fields having types that are known to be JSON-incompatible, convert them to a JSON-serializable format before hashing. """ ignored_args_for_cache_key = ignored_args_for_cache_key or [] def transform_value(value): if isinstance(value, type) and issubclass(value, pydantic.BaseModel): return value.model_json_schema() elif isinstance(value, pydantic.BaseModel): return value.model_dump(mode="json") elif callable(value): # Try to get the source code of the callable if available import inspect try: # For regular functions, we can get the source code return f"<callable_source:{inspect.getsource(value)}>" except (TypeError, OSError): # For lambda functions or other callables where source isn't available, # use a string representation return f"<callable:{value.__name__ if hasattr(value, '__name__') else 'lambda'}>" elif isinstance(value, dict): return {k: transform_value(v) for k, v in value.items()} else: return value params = {k: transform_value(v) for k, v in request.items() if k not in ignored_args_for_cache_key} return sha256(orjson.dumps(params, option=orjson.OPT_SORT_KEYS)).hexdigest() def get(self, request: dict[str, Any], ignored_args_for_cache_key: list[str] | None = None) -> Any: if not self.enable_memory_cache and not self.enable_disk_cache: return None try: key = self.cache_key(request, ignored_args_for_cache_key) except Exception: logger.debug(f"Failed to generate cache key for request: {request}") return None if self.enable_memory_cache and key in self.memory_cache: with self._lock: response = self.memory_cache[key] elif self.enable_disk_cache and key in self.disk_cache: # Found on disk but not in memory cache, add to memory cache response = self.disk_cache[key] if self.enable_memory_cache: with self._lock: self.memory_cache[key] = response else: return None response = copy.deepcopy(response) if hasattr(response, "usage"): # Clear the usage data when cache is hit, because no LM call is made response.usage = {} response.cache_hit = True return response def put( self, request: dict[str, Any], value: Any, ignored_args_for_cache_key: list[str] | None = None, enable_memory_cache: bool = True, ) -> None: enable_memory_cache = self.enable_memory_cache and enable_memory_cache # Early return to avoid computing cache key if both memory and disk cache are disabled if not enable_memory_cache and not self.enable_disk_cache: return try: key = self.cache_key(request, ignored_args_for_cache_key) except Exception: logger.debug(f"Failed to generate cache key for request: {request}") return if enable_memory_cache: with self._lock: self.memory_cache[key] = value if self.enable_disk_cache: try: self.disk_cache[key] = value except Exception as e: # Disk cache writing can fail for different reasons, e.g. disk full or the `value` is not picklable. 
logger.debug(f"Failed to put value in disk cache: {value}, {e}") def reset_memory_cache(self) -> None: if not self.enable_memory_cache: return with self._lock: self.memory_cache.clear() def save_memory_cache(self, filepath: str) -> None: if not self.enable_memory_cache: return with self._lock: with open(filepath, "wb") as f: cloudpickle.dump(self.memory_cache, f) def load_memory_cache(self, filepath: str) -> None: if not self.enable_memory_cache: return with self._lock: with open(filepath, "rb") as f: self.memory_cache = cloudpickle.load(f) def request_cache( cache_arg_name: str | None = None, ignored_args_for_cache_key: list[str] | None = None, enable_memory_cache: bool = True, *, # everything after this is keyword-only maxsize: int | None = None, # legacy / no-op ): """ Decorator for applying caching to a function based on the request argument. Args: cache_arg_name: The name of the argument that contains the request. If not provided, the entire kwargs is used as the request. ignored_args_for_cache_key: A list of arguments to ignore when computing the cache key from the request. enable_memory_cache: Whether to enable in-memory cache at call time. If False, the memory cache will not be written to on new data. """ ignored_args_for_cache_key = ignored_args_for_cache_key or ["api_key", "api_base", "base_url"] # Deprecation notice if maxsize is not None: logger.warning( "[DEPRECATION] `maxsize` is deprecated and no longer does anything; " "the cache is now handled internally by `dspy.cache`. " "This parameter will be removed in a future release.", ) def decorator(fn): @wraps(fn) def process_request(args, kwargs): # Use fully qualified function name for uniqueness fn_identifier = f"{fn.__module__}.{fn.__qualname__}" # Create a modified request that includes the function identifier so that it's incorporated into the cache # key. Deep copy is required because litellm sometimes modifies the kwargs in place. if cache_arg_name: # When `cache_arg_name` is provided, use the value of the argument with this name as the request for # caching. modified_request = copy.deepcopy(kwargs[cache_arg_name]) else: # When `cache_arg_name` is not provided, use the entire kwargs as the request for caching. modified_request = copy.deepcopy(kwargs) for i, arg in enumerate(args): modified_request[f"positional_arg_{i}"] = arg modified_request["_fn_identifier"] = fn_identifier return modified_request @wraps(fn) def sync_wrapper(*args, **kwargs): import dspy cache = dspy.cache modified_request = process_request(args, kwargs) # Retrieve from cache if available cached_result = cache.get(modified_request, ignored_args_for_cache_key) if cached_result is not None: return cached_result # Otherwise, compute and store the result # Make a copy of the original request in case it's modified in place, e.g., deleting some fields original_request = copy.deepcopy(modified_request) result = fn(*args, **kwargs) # `enable_memory_cache` can be provided at call time to avoid indefinite growth. 
cache.put(original_request, result, ignored_args_for_cache_key, enable_memory_cache) return result @wraps(fn) async def async_wrapper(*args, **kwargs): import dspy cache = dspy.cache modified_request = process_request(args, kwargs) # Retrieve from cache if available cached_result = cache.get(modified_request, ignored_args_for_cache_key) if cached_result is not None: return cached_result # Otherwise, compute and store the result # Make a copy of the original request in case it's modified in place, e.g., deleting some fields original_request = copy.deepcopy(modified_request) result = await fn(*args, **kwargs) cache.put(original_request, result, ignored_args_for_cache_key, enable_memory_cache) return result if inspect.iscoroutinefunction(fn): return async_wrapper else: return sync_wrapper return decorator ``` -------------------------------------------------------------------------------- /docs/scripts/generate_api_docs.py: -------------------------------------------------------------------------------- ```python import importlib import inspect import pkgutil from pathlib import Path from typing import Any import dspy API_MAPPING = { "models": [ dspy.LM, dspy.Embedder, ], "primitives": [ dspy.Audio, dspy.Code, dspy.Example, dspy.Image, dspy.History, dspy.Prediction, dspy.Tool, dspy.ToolCalls, ], "signatures": [ dspy.Signature, dspy.InputField, dspy.OutputField, ], "adapters": [ dspy.Adapter, dspy.ChatAdapter, dspy.JSONAdapter, dspy.TwoStepAdapter, ], "modules": [ dspy.Module, dspy.Predict, dspy.ChainOfThought, dspy.ReAct, dspy.ProgramOfThought, dspy.MultiChainComparison, dspy.Parallel, dspy.BestOfN, dspy.Refine, ], "tools": [ dspy.ColBERTv2, dspy.retrievers.Embeddings, dspy.PythonInterpreter, ], "utils": [ dspy.inspect_history, dspy.load, dspy.asyncify, dspy.streamify, dspy.enable_logging, dspy.disable_logging, dspy.enable_litellm_logging, dspy.disable_litellm_logging, dspy.configure_cache, dspy.streaming.StatusMessageProvider, dspy.streaming.StatusMessage, dspy.streaming.StreamListener, ], "evaluation": [ dspy.Evaluate, dspy.evaluate.answer_exact_match, dspy.evaluate.answer_passage_match, dspy.evaluate.SemanticF1, dspy.evaluate.CompleteAndGrounded, dspy.evaluate.EvaluationResult, ], "optimizers": [ dspy.LabeledFewShot, dspy.BootstrapFewShot, dspy.BootstrapFewShotWithRandomSearch, dspy.MIPROv2, dspy.BetterTogether, dspy.BootstrapFinetune, dspy.COPRO, dspy.Ensemble, dspy.KNN, dspy.KNNFewShot, dspy.InferRules, dspy.GEPA, ], "experimental": [ dspy.experimental.Citations, dspy.experimental.Document, ], } LOCATION_OVERRIDES = { "docs/api/optimizers/GEPA.md": "docs/api/optimizers/GEPA/overview.md", } def should_document_method(obj): name = obj.__name__ # Exclude methods not defined in dspy, such as `model_dump_json` from pydantic. 
module = getattr(obj, "__module__", "") if not module or not module.startswith("dspy"): return False # Exclude private and dunder methods, but include `__call__` if name == "__call__" or not name.startswith("_"): return True return False def get_module_contents(module): """Get all public classes and functions from a module.""" contents_in_all = getattr(module, "__all__", None) contents = {} for name, obj in inspect.getmembers(module): if contents_in_all and name not in contents_in_all: continue if inspect.ismodule(obj) and obj.__name__.startswith(module.__name__) and not name.startswith("_"): contents[name] = obj elif ( (inspect.isclass(obj) or (inspect.isroutine(obj) and should_document_method(obj))) # classes or functions in experimental module are not located in dspy/experimental and (obj.__module__.startswith(module.__name__) or module.__name__.startswith("dspy.experimental")) and not name.startswith("_") ): contents[name] = obj return contents def get_public_methods(cls): """Returns a list of all public methods in a class.""" return [ name for name, member in inspect.getmembers( cls, predicate=lambda x: inspect.isroutine(x) and should_document_method(x) ) ] def generate_doc_page(name: str, module_path: str, obj: Any, is_root: bool = False) -> str: """Generate documentation page content for an object.""" members_config = "" if inspect.isclass(obj): methods = get_public_methods(obj) if methods: methods_list = "\n".join(f" - {method}" for method in methods) members_config = f""" members: {methods_list}""" # We need to put ::: at last to avoid unclosed div. See https://github.com/danielfrg/mkdocs-jupyter/issues/231 for more details. return f"""<!-- START_API_REF --> ::: {module_path}.{name} handler: python options:{members_config} show_source: true show_root_heading: true heading_level: 2 docstring_style: google show_root_full_path: true show_object_full_path: false separate_signature: false inherited_members: true ::: <!-- END_API_REF --> """ def get_api_category(obj): for category, objects in API_MAPPING.items(): if obj in objects: return category return None def read_existing_content(file_path: Path) -> tuple[str, str]: """Read existing file content and split into pre and post API reference sections. Returns: tuple[str, str]: (content_before_api_ref, content_after_api_ref) If file doesn't exist or no API ref section found, returns empty strings. 
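    Example:
        A hypothetical sketch (the path below is only a placeholder):

            pre_content, post_content = read_existing_content(Path("docs/api/modules/Predict.md"))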
""" if not file_path.exists(): return "", "" content = file_path.read_text() # Look for our specific API reference markers api_start_marker = "<!-- START_API_REF -->" api_end_marker = "<!-- END_API_REF -->" api_start = content.find(api_start_marker) if api_start == -1: # No API section found, treat all content as pre-content return content, "" api_end = content.find(api_end_marker) if api_end == -1: # Start marker found but no end marker - treat rest of file as post-content api_end = len(content) else: api_end = api_end + len(api_end_marker) return content[:api_start].rstrip(), content[api_end:].lstrip() def write_doc_file(file_path: Path, title: str, api_content: str): """Write documentation to file while preserving existing content.""" pre_content, post_content = read_existing_content(file_path) # If no pre-content exists, add the title if not pre_content: pre_content = f"# {title}\n" # Combine all sections full_content = f"{pre_content}\n\n{api_content}\n{post_content}".strip() + "\n" # Write the combined content file_path.write_text(full_content) def generate_md_docs(output_dir: Path, excluded_modules=None): """Generate documentation for all public classes and functions in the dspy package. Args: output_dir: The directory to write the documentation to, e.g. "docs/api" excluded_modules: A list of modules to exclude from documentation, e.g. ["dspy.dsp"] """ module = importlib.import_module("dspy") output_dir.mkdir(parents=True, exist_ok=True) init_contents = get_module_contents(module) objects_processed = {} # Generate docs for root-level objects, e.g. dspy.Predict, dspy.Example, etc. for name, obj in init_contents.items(): if inspect.ismodule(obj): continue category = get_api_category(obj) if category is None: # Skip if the object is not in the API mapping. continue page_content = generate_doc_page(name, "dspy", obj, is_root=True) file_path = output_dir / category / f"{name}.md" if file_path.as_posix() in LOCATION_OVERRIDES: file_path = Path(LOCATION_OVERRIDES[file_path.as_posix()]) write_doc_file(file_path, f"dspy.{name}", page_content) objects_processed[f"{obj.__module__}.{name}"] = obj for submodule in pkgutil.iter_modules(module.__path__, prefix=f"{module.__name__}."): submodule_name = submodule.name.split(".")[-1] # Skip if this is a private module or not in __init__.py if submodule_name.startswith("_") or submodule_name not in init_contents: continue generate_md_docs_submodule(submodule.name, output_dir, objects_processed, excluded_modules) def generate_md_docs_submodule(module_path: str, output_dir: Path, objects_processed=None, excluded_modules=None): """Recursively generate documentation for a submodule. We generate docs for all public classes and functions in the submodule, then recursively generate docs for all submodules within the submodule. Args: module_path: The path to the submodule, e.g. "dspy.predict" output_dir: The directory to write the documentation to, e.g. "docs/api/predict" objects_processed: A dictionary of objects that have already been processed, used to avoid redundant processing. excluded_modules: A list of modules to exclude from documentation, e.g. 
["dspy.dsp"] """ if excluded_modules and module_path in excluded_modules: return try: module = importlib.import_module(module_path) except ImportError: print(f"Skipping {module_path} due to import error") return init_contents = get_module_contents(module) for name, obj in init_contents.items(): if inspect.ismodule(obj): continue category = get_api_category(obj) if category is None: # Skip if the object is not in the API mapping. continue full_name = f"{obj.__module__}.{name}" if full_name not in objects_processed: # Only generate docs for objects that are not root-level objects. page_content = generate_doc_page(name, module_path, obj, is_root=False) file_path = output_dir / category / f"{name}.md" if file_path.as_posix() in LOCATION_OVERRIDES: file_path = Path(LOCATION_OVERRIDES[file_path.as_posix()]) write_doc_file(file_path, f"{module_path}.{name}", page_content) objects_processed[full_name] = obj for name, obj in init_contents.items(): if inspect.ismodule(obj): generate_md_docs_submodule(f"{module_path}.{name}", output_dir / name, objects_processed) def remove_empty_dirs(path: Path): """Recursively remove empty directories.""" for child in path.glob("*"): if child.is_dir(): remove_empty_dirs(child) if path.is_dir() and not any(path.iterdir()): path.rmdir() if __name__ == "__main__": api_dir = Path("docs/api") api_dir.mkdir(parents=True, exist_ok=True) # Create category directories if they don't exist for category in API_MAPPING.keys(): subpath = api_dir / category subpath.mkdir(parents=True, exist_ok=True) excluded_modules = ["dspy.dsp"] generate_md_docs(api_dir, excluded_modules=excluded_modules) # Clean up empty directories remove_empty_dirs(api_dir) ``` -------------------------------------------------------------------------------- /dspy/teleprompt/simba_utils.py: -------------------------------------------------------------------------------- ```python import inspect import logging import textwrap from typing import Callable import orjson import dspy from dspy.adapters.utils import get_field_description_string from dspy.signatures import InputField, OutputField logger = logging.getLogger(__name__) def prepare_models_for_resampling(program: dspy.Module, n: int, teacher_settings: dict | None = None): lm = program.get_lm() or dspy.settings.lm start_rollout_id = lm.kwargs.get("rollout_id", 0) rollout_ids = [start_rollout_id + i for i in range(n)] start_rollout_idx, models = 0, [] # If we have a teacher model, use this as the first model if teacher_settings: teacher_lm = teacher_settings.get("lm") or lm teacher_lm.kwargs["rollout_id"] = rollout_ids[start_rollout_idx] models.append(teacher_lm) start_rollout_idx += 1 # The rest of the models are just copies of the base model models.extend([lm.copy(rollout_id=r, temperature=1.0) for r in rollout_ids[start_rollout_idx:]]) return models def wrap_program(program: dspy.Module, metric: Callable): def wrapped_program(example): with dspy.context(trace=[]): prediction, trace, score = None, None, 0.0 try: prediction = program(**example.inputs()) except Exception as e: logger.warning(e) trace = dspy.settings.trace.copy() output = None score = 0.0 output_metadata = {} try: output = metric(example, prediction) if isinstance(output, (int, float)): score = output elif isinstance(output, dspy.Prediction): if not hasattr(output, "score"): raise ValueError("When `metric` returns a `dspy.Prediction`, it must contain a `score` field.") score = output.score # Extract fields from the output dspy.Prediction, excluding `score`` output_metadata = { k: v for k, v 
in output.items() if k != "score" } except Exception as e: logger.warning(e) return { "prediction": prediction, "trace": trace, "score": score, "example": example, "output_metadata": output_metadata } return wrapped_program def append_a_demo(demo_input_field_maxlen): def append_a_demo_(bucket, system, **kwargs): predictor2name, name2predictor = kwargs["predictor2name"], kwargs["name2predictor"] batch_10p_score = kwargs["batch_10p_score"] good = bucket[0] trace = good["trace"] name2demo = {} if good["score"] <= batch_10p_score: logger.info(f"Skipping appending a demo as good score {good['score']} is at or below the 10th percentile.") return False for step in trace: predictor, _inputs, _outputs = step for k, v in _inputs.items(): if demo_input_field_maxlen and len(str(v)) > demo_input_field_maxlen: _inputs[k] = f"{str(v)[:demo_input_field_maxlen]}\n\t\t... <TRUNCATED FOR BREVITY>" demo = dspy.Example(augmented=True, **_inputs, **_outputs) name = predictor2name[id(predictor)] name2demo[name] = demo # keep the last demo for each predictor for name, demo in name2demo.items(): predictor = name2predictor[name] predictor.demos.append(demo) logger.info(f"Added {len(name2demo)} demos (one each) across all predictors.") return True return append_a_demo_ def append_a_rule(bucket, system, **kwargs): predictor2name = kwargs["predictor2name"] batch_10p_score, batch_90p_score = kwargs["batch_10p_score"], kwargs["batch_90p_score"] prompt_model = kwargs["prompt_model"] or dspy.settings.lm module_names = [name for name, _ in system.named_predictors()] good, bad = bucket[0], bucket[-1] example = good["example"] if good["score"] <= batch_10p_score or bad["score"] >= batch_90p_score: logger.info(f"Skipping rule generation as good score {good['score']} is at or below the 10th percentile " f"*or* bad score {bad['score']} is at or above the 90th percentile.") return False if good["score"] <= bad["score"]: if good["score"] > batch_90p_score: bad["trace"] = [] bad["score"] = "N/A" bad["prediction"] = {"N/A": "Prediction not available"} else: good["trace"] = [] good["score"] = "N/A" good["prediction"] = {"N/A": "Prediction not available"} better_trajectory = [ {"module_name": predictor2name[id(p)], "inputs": i, "outputs": dict(o)} for p, i, o in good["trace"] ] worse_trajectory = [ {"module_name": predictor2name[id(p)], "inputs": i, "outputs": dict(o)} for p, i, o in bad["trace"] ] kwargs = { "program_code": inspect.getsource(system.__class__), "modules_defn": inspect_modules(system), "program_inputs": {**example.inputs()}, "oracle_metadata": {**example.labels()}, "better_program_trajectory": better_trajectory, "better_program_outputs": dict(good["prediction"]), "worse_program_trajectory": worse_trajectory, "worse_program_outputs": dict(bad["prediction"] or {}), "worse_reward_value": bad["score"], "better_reward_value": good["score"], "worse_reward_info": bad["output_metadata"], "better_reward_info": good["output_metadata"], "module_names": module_names, } kwargs = {k: v if isinstance(v, str) else orjson.dumps(recursive_mask(v), option=orjson.OPT_INDENT_2).decode() for k, v in kwargs.items()} with dspy.settings.context(trace=[], lm=prompt_model): advice_program = dspy.Predict(OfferFeedback) advice = advice_program(**kwargs).module_advice for name, predictor in system.named_predictors(): if name in advice: logger.info(f"Advice for {name}: {advice[name]}") instructions = predictor.signature.instructions + "\n\n" + advice[name] predictor.signature = predictor.signature.with_instructions(instructions) return True class 
OfferFeedback(dspy.Signature): """ You will be given two trajectories of an LLM-driven program's execution. Your goal is to help the program's modules build up experience on how to maximize the reward value assigned to the program's outputs if it were to receive similar inputs in the future. The module won't see its own history. It will rely on your advice balancing being concrete and being generalizable. In your advice: - Avoid boilerplate. Offer advice that would change the module's behavior for the better in the future. - Ensure that advice offered to a module M is specific to that M's specific sub-task, not the overall program. - Rely on contrasting the behavior of the worse trajectory against the better trajectory in making recommendations. - Ensure each unique module name appears exactly once as a key in the advice dictionary. """ program_code: str = InputField(desc="The code of the program that we are analyzing") modules_defn: str = InputField(desc="The definition of each module in the program, including its I/O") program_inputs: str = InputField(desc="The inputs to the program that we are analyzing") oracle_metadata: str = InputField(desc="Any (hidden) metadata about the training set instance we're analyzing") worse_program_trajectory: str = InputField( desc="The trajectory of the program's execution, showing each module's I/O" ) worse_program_outputs: str = InputField(desc="The outputs of the program that we are analyzing") worse_reward_value: float = InputField(desc="The reward value assigned to the program's outputs") worse_reward_info: str = InputField(desc="Additional information that might be helpful to understanding the assigned reward value.") better_program_trajectory: str = InputField( desc="The trajectory of the program's execution, showing each module's I/O" ) better_program_outputs: str = InputField(desc="The outputs of the program that we are analyzing") better_reward_value: float = InputField(desc="The reward value assigned to the program's outputs") better_reward_info: str = InputField(desc="Additional information that might be helpful to understanding the assigned reward value.") module_names: list[str] = InputField(desc="The names of the modules in the program, for which we seek advice") discussion: str = OutputField(desc="Discussing blame of where each module went wrong, if it did") module_advice: dict[str, str] = OutputField( desc="For each module, describe very concretely: If the module receives ${description of input or patterns " "therein}, then it should ${description of content, behavior, or strategies to adopt and/or others to avoid}. " "Basically, your advice be such that if the module has access to your tip, it would be much more likely to act " "like the successful trajectory rather than the lower-scoring trajectory." 
) def inspect_modules(program): separator = "-" * 80 output = [separator] for name, predictor in program.named_predictors(): signature = predictor.signature instructions = textwrap.dedent(signature.instructions) instructions = ("\n" + "\t" * 2).join([""] + instructions.splitlines()) output.append(f"Module {name}") output.append("\n\tInput Fields:") output.append(("\n" + "\t" * 2).join([""] + get_field_description_string(signature.input_fields).splitlines())) output.append("\tOutput Fields:") output.append(("\n" + "\t" * 2).join([""] + get_field_description_string(signature.output_fields).splitlines())) output.append(f"\tOriginal Instructions: {instructions}") output.append(separator) return "\n".join([o.strip("\n") for o in output]) def recursive_mask(o): # If the object is already serializable, return it. try: orjson.dumps(o) return o except (TypeError, orjson.JSONEncodeError): pass # If it's a dictionary, apply recursively to its values. if isinstance(o, dict): return {k: recursive_mask(v) for k, v in o.items()} # If it's a list, apply recursively. elif isinstance(o, list): return [recursive_mask(v) for v in o] # If it's a tuple, apply recursively. elif isinstance(o, tuple): return tuple(recursive_mask(v) for v in o) # Otherwise, replace it with a placeholder string (or use repr(o)). else: return f"<non-serializable: {type(o).__name__}>" ``` -------------------------------------------------------------------------------- /docs/docs/learn/programming/adapters.md: -------------------------------------------------------------------------------- ```markdown # Understanding DSPy Adapters ## What are Adapters? Adapters are the bridge between `dspy.Predict` and the actual Language Model (LM). When you call a DSPy module, the adapter takes your signature, user inputs, and other attributes like `demos` (few-shot examples) and converts them into multi-turn messages that get sent to the LM. The adapter system is responsible for: - Translating DSPy signatures into system messages that define the task and request/response structure. - Formatting input data according to the request structure outlined in DSPy signatures. - Parsing LM responses back into structured DSPy outputs, such as `dspy.Prediction` instances. - Managing conversation history and function calls. - Converting pre-built DSPy types into LM prompt messages, e.g., `dspy.Tool`, `dspy.Image`, etc. ## Configure Adapters You can use `dspy.configure(adapter=...)` to choose the adapter for the entire Python process, or `with dspy.context(adapter=...):` to only affect a certain namespace. If no adapter is specified in the DSPy workflow, each `dspy.Predict.__call__` defaults to using the `dspy.ChatAdapter`. Thus, the two code snippets below are equivalent: ```python import dspy dspy.configure(lm=dspy.LM("openai/gpt-4o-mini")) predict = dspy.Predict("question -> answer") result = predict(question="What is the capital of France?") ``` ```python import dspy dspy.configure( lm=dspy.LM("openai/gpt-4o-mini"), adapter=dspy.ChatAdapter(), # This is the default value ) predict = dspy.Predict("question -> answer") result = predict(question="What is the capital of France?") ``` ## Where Adapters Fit in the System The flow works as follows: 1. The user calls their DSPy agent, typically a `dspy.Module` with inputs. 2. The inner `dspy.Predict` is invoked to obtain the LM response. 3. `dspy.Predict` calls **Adapter.format()**, which converts its signature, inputs, and demos into multi-turn messages sent to the `dspy.LM`. 
`dspy.LM` is a thin wrapper around `litellm`, which communicates with the LM endpoint. 4. The LM receives the messages and generates a response. 5. **Adapter.parse()** converts the LM response into structured DSPy outputs, as specified in the signature. 6. The caller of `dspy.Predict` receives the parsed outputs. You can explicitly call `Adapter.format()` to view the messages sent to the LM. ```python # Simplified flow example signature = dspy.Signature("question -> answer") inputs = {"question": "What is 2+2?"} demos = [{"question": "What is 1+1?", "answer": "2"}] adapter = dspy.ChatAdapter() print(adapter.format(signature, demos, inputs)) ``` The output should resemble: ``` {'role': 'system', 'content': 'Your input fields are:\n1. `question` (str):\nYour output fields are:\n1. `answer` (str):\nAll interactions will be structured in the following way, with the appropriate values filled in.\n\n[[ ## question ## ]]\n{question}\n\n[[ ## answer ## ]]\n{answer}\n\n[[ ## completed ## ]]\nIn adhering to this structure, your objective is: \n Given the fields `question`, produce the fields `answer`.'} {'role': 'user', 'content': '[[ ## question ## ]]\nWhat is 1+1?'} {'role': 'assistant', 'content': '[[ ## answer ## ]]\n2\n\n[[ ## completed ## ]]\n'} {'role': 'user', 'content': '[[ ## question ## ]]\nWhat is 2+2?\n\nRespond with the corresponding output fields, starting with the field `[[ ## answer ## ]]`, and then ending with the marker for `[[ ## completed ## ]]`.'} ``` ## Types of Adapters DSPy offers several adapter types, each tailored for specific use cases: ### ChatAdapter **ChatAdapter** is the default adapter and works with all language models. It uses a field-based format with special markers. #### Format Structure ChatAdapter uses `[[ ## field_name ## ]]` markers to delineate fields. For fields of non-primitive Python types, it includes the JSON schema of the type. Below, we use `dspy.inspect_history()` to display the formatted messages by `dspy.ChatAdapter` clearly. ```python import dspy import pydantic dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"), adapter=dspy.ChatAdapter()) class ScienceNews(pydantic.BaseModel): text: str scientists_involved: list[str] class NewsQA(dspy.Signature): """Get news about the given science field""" science_field: str = dspy.InputField() year: int = dspy.InputField() num_of_outputs: int = dspy.InputField() news: list[ScienceNews] = dspy.OutputField(desc="science news") predict = dspy.Predict(NewsQA) predict(science_field="Computer Theory", year=2022, num_of_outputs=1) dspy.inspect_history() ``` The output looks like: ``` [2025-08-15T22:24:29.378666] System message: Your input fields are: 1. `science_field` (str): 2. `year` (int): 3. `num_of_outputs` (int): Your output fields are: 1. `news` (list[ScienceNews]): science news All interactions will be structured in the following way, with the appropriate values filled in. 
[[ ## science_field ## ]] {science_field} [[ ## year ## ]] {year} [[ ## num_of_outputs ## ]] {num_of_outputs} [[ ## news ## ]] {news} # note: the value you produce must adhere to the JSON schema: {"type": "array", "$defs": {"ScienceNews": {"type": "object", "properties": {"scientists_involved": {"type": "array", "items": {"type": "string"}, "title": "Scientists Involved"}, "text": {"type": "string", "title": "Text"}}, "required": ["text", "scientists_involved"], "title": "ScienceNews"}}, "items": {"$ref": "#/$defs/ScienceNews"}} [[ ## completed ## ]] In adhering to this structure, your objective is: Get news about the given science field User message: [[ ## science_field ## ]] Computer Theory [[ ## year ## ]] 2022 [[ ## num_of_outputs ## ]] 1 Respond with the corresponding output fields, starting with the field `[[ ## news ## ]]` (must be formatted as a valid Python list[ScienceNews]), and then ending with the marker for `[[ ## completed ## ]]`. Response: [[ ## news ## ]] [ { "scientists_involved": ["John Doe", "Jane Smith"], "text": "In 2022, researchers made significant advancements in quantum computing algorithms, demonstrating their potential to solve complex problems faster than classical computers. This breakthrough could revolutionize fields such as cryptography and optimization." } ] [[ ## completed ## ]] ``` !!! info "Practice: locate Signature information in the printed LM history" Try adjusting the signature, and observe how the changes are reflected in the printed LM message. Each field is preceded by a marker `[[ ## field_name ## ]]`. If an output field has non-primitive types, the instruction includes the type's JSON schema, and the output is formatted accordingly. Because the output field is structured as defined by ChatAdapter, it can be automatically parsed into structured data. #### When to Use ChatAdapter `ChatAdapter` offers the following advantages: - **Universal compatibility**: Works with all language models, though smaller models may generate responses that do not match the required format. - **Fallback protection**: If `ChatAdapter` fails, it automatically retries with `JSONAdapter`. In general, `ChatAdapter` is a reliable choice if you don't have specific requirements. #### When Not to Use ChatAdapter Avoid using `ChatAdapter` if you are: - **Latency sensitive**: `ChatAdapter` includes more boilerplate output tokens compared to other adapters, so if you're building a system sensitive to latency, consider using a different adapter. ### JSONAdapter **JSONAdapter** prompts the LM to return JSON data containing all output fields as specified in the signature. It is effective for models that support structured output via the `response_format` parameter, leveraging native JSON generation capabilities for more reliable parsing. 
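As with any adapter, you can enable `JSONAdapter` for the whole process with `dspy.configure(adapter=...)` or scope it to a single block with `dspy.context(adapter=...)`. A minimal sketch (the model name is only a placeholder):

```python
import dspy

with dspy.context(lm=dspy.LM("openai/gpt-4o-mini"), adapter=dspy.JSONAdapter()):
    predict = dspy.Predict("question -> answer")
    result = predict(question="What is the capital of France?")
```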
#### Format Structure The input part of the prompt formatted by `JSONAdapter` is similar to `ChatAdapter`, but the output part differs, as shown below: ```python import dspy import pydantic dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"), adapter=dspy.JSONAdapter()) class ScienceNews(pydantic.BaseModel): text: str scientists_involved: list[str] class NewsQA(dspy.Signature): """Get news about the given science field""" science_field: str = dspy.InputField() year: int = dspy.InputField() num_of_outputs: int = dspy.InputField() news: list[ScienceNews] = dspy.OutputField(desc="science news") predict = dspy.Predict(NewsQA) predict(science_field="Computer Theory", year=2022, num_of_outputs=1) dspy.inspect_history() ``` ``` System message: Your input fields are: 1. `science_field` (str): 2. `year` (int): 3. `num_of_outputs` (int): Your output fields are: 1. `news` (list[ScienceNews]): science news All interactions will be structured in the following way, with the appropriate values filled in. Inputs will have the following structure: [[ ## science_field ## ]] {science_field} [[ ## year ## ]] {year} [[ ## num_of_outputs ## ]] {num_of_outputs} Outputs will be a JSON object with the following fields. { "news": "{news} # note: the value you produce must adhere to the JSON schema: {\"type\": \"array\", \"$defs\": {\"ScienceNews\": {\"type\": \"object\", \"properties\": {\"scientists_involved\": {\"type\": \"array\", \"items\": {\"type\": \"string\"}, \"title\": \"Scientists Involved\"}, \"text\": {\"type\": \"string\", \"title\": \"Text\"}}, \"required\": [\"text\", \"scientists_involved\"], \"title\": \"ScienceNews\"}}, \"items\": {\"$ref\": \"#/$defs/ScienceNews\"}}" } In adhering to this structure, your objective is: Get news about the given science field User message: [[ ## science_field ## ]] Computer Theory [[ ## year ## ]] 2022 [[ ## num_of_outputs ## ]] 1 Respond with a JSON object in the following order of fields: `news` (must be formatted as a valid Python list[ScienceNews]). Response: { "news": [ { "text": "In 2022, researchers made significant advancements in quantum computing algorithms, demonstrating that quantum systems can outperform classical computers in specific tasks. This breakthrough could revolutionize fields such as cryptography and complex system simulations.", "scientists_involved": [ "Dr. Alice Smith", "Dr. Bob Johnson", "Dr. Carol Lee" ] } ] } ``` #### When to Use JSONAdapter `JSONAdapter` is good at: - **Structured output support**: When the model supports the `response_format` parameter. - **Low latency**: Minimal boilerplate in the LM response results in faster responses. #### When Not to Use JSONAdapter Avoid using `JSONAdapter` if you are: - Using a model that does not natively support structured output, such as a small open-source model hosted on Ollama. ## Summary Adapters are a crucial component of DSPy that bridge the gap between structured DSPy signatures and language model APIs. Understanding when and how to use different adapters will help you build more reliable and efficient DSPy programs. 
``` -------------------------------------------------------------------------------- /dspy/predict/react.py: -------------------------------------------------------------------------------- ```python import logging from typing import TYPE_CHECKING, Any, Callable, Literal from litellm import ContextWindowExceededError import dspy from dspy.adapters.types.tool import Tool from dspy.primitives.module import Module from dspy.signatures.signature import ensure_signature logger = logging.getLogger(__name__) if TYPE_CHECKING: from dspy.signatures.signature import Signature class ReAct(Module): def __init__(self, signature: type["Signature"], tools: list[Callable], max_iters: int = 10): """ ReAct stands for "Reasoning and Acting," a popular paradigm for building tool-using agents. In this approach, the language model is iteratively provided with a list of tools and has to reason about the current situation. The model decides whether to call a tool to gather more information or to finish the task based on its reasoning process. The DSPy version of ReAct is generalized to work over any signature, thanks to signature polymorphism. Args: signature: The signature of the module, which defines the input and output of the react module. tools (list[Callable]): A list of functions, callable objects, or `dspy.Tool` instances. max_iters (Optional[int]): The maximum number of iterations to run. Defaults to 10. Example: ```python def get_weather(city: str) -> str: return f"The weather in {city} is sunny." react = dspy.ReAct(signature="question->answer", tools=[get_weather]) pred = react(question="What is the weather in Tokyo?") ``` """ super().__init__() self.signature = signature = ensure_signature(signature) self.max_iters = max_iters tools = [t if isinstance(t, Tool) else Tool(t) for t in tools] tools = {tool.name: tool for tool in tools} inputs = ", ".join([f"`{k}`" for k in signature.input_fields.keys()]) outputs = ", ".join([f"`{k}`" for k in signature.output_fields.keys()]) instr = [f"{signature.instructions}\n"] if signature.instructions else [] instr.extend( [ f"You are an Agent. In each episode, you will be given the fields {inputs} as input. And you can see your past trajectory so far.", f"Your goal is to use one or more of the supplied tools to collect any necessary information for producing {outputs}.\n", "To do this, you will interleave next_thought, next_tool_name, and next_tool_args in each turn, and also when finishing the task.", "After each tool call, you receive a resulting observation, which gets appended to your trajectory.\n", "When writing next_thought, you may reason about the current situation and plan for future steps.", "When selecting the next_tool_name and its next_tool_args, the tool must be one of:\n", ] ) tools["finish"] = Tool( func=lambda: "Completed.", name="finish", desc=f"Marks the task as complete. That is, signals that all information for producing the outputs, i.e. 
{outputs}, are now available to be extracted.", args={}, ) for idx, tool in enumerate(tools.values()): instr.append(f"({idx + 1}) {tool}") instr.append("When providing `next_tool_args`, the value inside the field must be in JSON format") react_signature = ( dspy.Signature({**signature.input_fields}, "\n".join(instr)) .append("trajectory", dspy.InputField(), type_=str) .append("next_thought", dspy.OutputField(), type_=str) .append("next_tool_name", dspy.OutputField(), type_=Literal[tuple(tools.keys())]) .append("next_tool_args", dspy.OutputField(), type_=dict[str, Any]) ) fallback_signature = dspy.Signature( {**signature.input_fields, **signature.output_fields}, signature.instructions, ).append("trajectory", dspy.InputField(), type_=str) self.tools = tools self.react = dspy.Predict(react_signature) self.extract = dspy.ChainOfThought(fallback_signature) def _format_trajectory(self, trajectory: dict[str, Any]): adapter = dspy.settings.adapter or dspy.ChatAdapter() trajectory_signature = dspy.Signature(f"{', '.join(trajectory.keys())} -> x") return adapter.format_user_message_content(trajectory_signature, trajectory) def forward(self, **input_args): trajectory = {} max_iters = input_args.pop("max_iters", self.max_iters) for idx in range(max_iters): try: pred = self._call_with_potential_trajectory_truncation(self.react, trajectory, **input_args) except ValueError as err: logger.warning(f"Ending the trajectory: Agent failed to select a valid tool: {_fmt_exc(err)}") break trajectory[f"thought_{idx}"] = pred.next_thought trajectory[f"tool_name_{idx}"] = pred.next_tool_name trajectory[f"tool_args_{idx}"] = pred.next_tool_args try: trajectory[f"observation_{idx}"] = self.tools[pred.next_tool_name](**pred.next_tool_args) except Exception as err: trajectory[f"observation_{idx}"] = f"Execution error in {pred.next_tool_name}: {_fmt_exc(err)}" if pred.next_tool_name == "finish": break extract = self._call_with_potential_trajectory_truncation(self.extract, trajectory, **input_args) return dspy.Prediction(trajectory=trajectory, **extract) async def aforward(self, **input_args): trajectory = {} max_iters = input_args.pop("max_iters", self.max_iters) for idx in range(max_iters): try: pred = await self._async_call_with_potential_trajectory_truncation(self.react, trajectory, **input_args) except ValueError as err: logger.warning(f"Ending the trajectory: Agent failed to select a valid tool: {_fmt_exc(err)}") break trajectory[f"thought_{idx}"] = pred.next_thought trajectory[f"tool_name_{idx}"] = pred.next_tool_name trajectory[f"tool_args_{idx}"] = pred.next_tool_args try: trajectory[f"observation_{idx}"] = await self.tools[pred.next_tool_name].acall(**pred.next_tool_args) except Exception as err: trajectory[f"observation_{idx}"] = f"Execution error in {pred.next_tool_name}: {_fmt_exc(err)}" if pred.next_tool_name == "finish": break extract = await self._async_call_with_potential_trajectory_truncation(self.extract, trajectory, **input_args) return dspy.Prediction(trajectory=trajectory, **extract) def _call_with_potential_trajectory_truncation(self, module, trajectory, **input_args): for _ in range(3): try: return module( **input_args, trajectory=self._format_trajectory(trajectory), ) except ContextWindowExceededError: logger.warning("Trajectory exceeded the context window, truncating the oldest tool call information.") trajectory = self.truncate_trajectory(trajectory) async def _async_call_with_potential_trajectory_truncation(self, module, trajectory, **input_args): for _ in range(3): try: return await 
module.acall( **input_args, trajectory=self._format_trajectory(trajectory), ) except ContextWindowExceededError: logger.warning("Trajectory exceeded the context window, truncating the oldest tool call information.") trajectory = self.truncate_trajectory(trajectory) def truncate_trajectory(self, trajectory): """Truncates the trajectory so that it fits in the context window. Users can override this method to implement their own truncation logic. """ keys = list(trajectory.keys()) if len(keys) < 4: # Every tool call has 4 keys: thought, tool_name, tool_args, and observation. raise ValueError( "The trajectory is too long so your prompt exceeded the context window, but the trajectory cannot be " "truncated because it only has one tool call." ) for key in keys[:4]: trajectory.pop(key) return trajectory def _fmt_exc(err: BaseException, *, limit: int = 5) -> str: """ Return a one-string traceback summary. * `limit` - how many stack frames to keep (from the innermost outwards). """ import traceback return "\n" + "".join(traceback.format_exception(type(err), err, err.__traceback__, limit=limit)).strip() """ Thoughts and Planned Improvements for dspy.ReAct. TOPIC 01: How Trajectories are Formatted, or rather when they are formatted. Right now, both sub-modules are invoked with a `trajectory` argument, which is a string formatted in `forward`. Though the formatter uses a general adapter.format_fields, the tracing of DSPy only sees the string, not the formatting logic. What this means is that, in demonstrations, even if the user adjusts the adapter for a fixed program, the demos' format will not update accordingly, but the inference-time trajectories will. One way to fix this is to support `format=fn` in the dspy.InputField() for "trajectory" in the signatures. But this means that care must be taken that the adapter is accessed at `forward` runtime, not signature definition time. Another potential fix is to more natively support a "variadic" input field, where the input is a list of dictionaries, or a big dictionary, and have each adapter format it accordingly. Trajectories also affect meta-programming modules that view the trace later. It's inefficient O(n^2) to view the trace of every module repeating the prefix. TOPIC 03: Simplifying ReAct's __init__ by moving modular logic to the Tool class. * Handling exceptions and error messages. * More cleanly defining the "finish" tool, perhaps as a runtime-defined function? TOPIC 04: Default behavior when the trajectory gets too long. TOPIC 05: Adding more structure around how the instruction is formatted. * Concretely, it's now a string, so an optimizer can and does rewrite it freely. * An alternative would be to add more structure, such that a certain template is fixed but values are variable? TOPIC 06: Idiomatically allowing tools that maintain state across iterations, but not across different `forward` calls. * So the tool would be newly initialized at the start of each `forward` call, but maintain state across iterations. * This is pretty useful for allowing the agent to keep notes or count certain things, etc. 
""" ``` -------------------------------------------------------------------------------- /tests/adapters/test_xml_adapter.py: -------------------------------------------------------------------------------- ```python from unittest import mock import pydantic import pytest from litellm import Choices, Message, ModelResponse import dspy from dspy.adapters.chat_adapter import FieldInfoWithName from dspy.adapters.xml_adapter import XMLAdapter def test_xml_adapter_format_and_parse_basic(): class TestSignature(dspy.Signature): question: str = dspy.InputField() answer: str = dspy.OutputField() adapter = XMLAdapter() # Format output fields as XML fields_with_values = {FieldInfoWithName(name="answer", info=TestSignature.output_fields["answer"]): "Paris"} xml = adapter.format_field_with_value(fields_with_values) assert xml.strip() == "<answer>\nParis\n</answer>" # Parse XML output completion = "<answer>Paris</answer>" parsed = adapter.parse(TestSignature, completion) assert parsed == {"answer": "Paris"} def test_xml_adapter_parse_multiple_fields(): class TestSignature(dspy.Signature): question: str = dspy.InputField() answer: str = dspy.OutputField() explanation: str = dspy.OutputField() adapter = XMLAdapter() completion = """ <answer>Paris</answer> <explanation>The capital of France is Paris.</explanation> """ parsed = adapter.parse(TestSignature, completion) assert parsed == {"answer": "Paris", "explanation": "The capital of France is Paris."} def test_xml_adapter_parse_raises_on_missing_field(): class TestSignature(dspy.Signature): question: str = dspy.InputField() answer: str = dspy.OutputField() explanation: str = dspy.OutputField() adapter = XMLAdapter() completion = "<answer>Paris</answer>" with pytest.raises(dspy.utils.exceptions.AdapterParseError) as e: adapter.parse(TestSignature, completion) assert e.value.adapter_name == "XMLAdapter" assert e.value.signature == TestSignature assert e.value.lm_response == "<answer>Paris</answer>" assert "explanation" in str(e.value) def test_xml_adapter_parse_casts_types(): class TestSignature(dspy.Signature): number: int = dspy.OutputField() flag: bool = dspy.OutputField() adapter = XMLAdapter() completion = """ <number>42</number> <flag>true</flag> """ parsed = adapter.parse(TestSignature, completion) assert parsed == {"number": 42, "flag": True} def test_xml_adapter_parse_raises_on_type_error(): class TestSignature(dspy.Signature): number: int = dspy.OutputField() adapter = XMLAdapter() completion = "<number>not_a_number</number>" with pytest.raises(dspy.utils.exceptions.AdapterParseError) as e: adapter.parse(TestSignature, completion) assert "Failed to parse field" in str(e.value) def test_xml_adapter_format_and_parse_nested_model(): class InnerModel(pydantic.BaseModel): value: int label: str class TestSignature(dspy.Signature): question: str = dspy.InputField() result: InnerModel = dspy.OutputField() adapter = XMLAdapter() # Format output fields as XML fields_with_values = { FieldInfoWithName(name="result", info=TestSignature.output_fields["result"]): InnerModel(value=5, label="foo") } xml = adapter.format_field_with_value(fields_with_values) # The output will be a JSON string inside the XML tag assert xml.strip().startswith("<result>") assert '"value": 5' in xml assert '"label": "foo"' in xml assert xml.strip().endswith("</result>") # Parse XML output (should parse as string, not as model) completion = '<result>{"value": 5, "label": "foo"}</result>' parsed = adapter.parse(TestSignature, completion) # The parse_value helper will try to cast to 
InnerModel assert isinstance(parsed["result"], InnerModel) assert parsed["result"].value == 5 assert parsed["result"].label == "foo" def test_xml_adapter_format_and_parse_list_of_models(): class Item(pydantic.BaseModel): name: str score: float class TestSignature(dspy.Signature): items: list[Item] = dspy.OutputField() adapter = XMLAdapter() items = [Item(name="a", score=1.1), Item(name="b", score=2.2)] fields_with_values = {FieldInfoWithName(name="items", info=TestSignature.output_fields["items"]): items} xml = adapter.format_field_with_value(fields_with_values) assert xml.strip().startswith("<items>") assert '"name": "a"' in xml assert '"score": 2.2' in xml assert xml.strip().endswith("</items>") # Parse XML output import json completion = f"<items>{json.dumps([i.model_dump() for i in items])}</items>" parsed = adapter.parse(TestSignature, completion) assert isinstance(parsed["items"], list) assert all(isinstance(i, Item) for i in parsed["items"]) assert parsed["items"][0].name == "a" assert parsed["items"][1].score == 2.2 def test_xml_adapter_with_tool_like_output(): # XMLAdapter does not natively support tool calls, but we can test structured output class ToolCall(pydantic.BaseModel): name: str args: dict result: str class TestSignature(dspy.Signature): question: str = dspy.InputField() tool_calls: list[ToolCall] = dspy.OutputField() answer: str = dspy.OutputField() adapter = XMLAdapter() tool_calls = [ ToolCall(name="get_weather", args={"city": "Tokyo"}, result="Sunny"), ToolCall(name="get_population", args={"country": "Japan", "year": 2023}, result="125M"), ] fields_with_values = { FieldInfoWithName(name="tool_calls", info=TestSignature.output_fields["tool_calls"]): tool_calls, FieldInfoWithName( name="answer", info=TestSignature.output_fields["answer"] ): "The weather is Sunny. Population is 125M.", } xml = adapter.format_field_with_value(fields_with_values) assert xml.strip().startswith("<tool_calls>") assert '"name": "get_weather"' in xml assert '"result": "125M"' in xml assert xml.strip().endswith("</answer>") import json completion = ( f"<tool_calls>{json.dumps([tc.model_dump() for tc in tool_calls])}</tool_calls>" f"\n<answer>The weather is Sunny. Population is 125M.</answer>" ) parsed = adapter.parse(TestSignature, completion) assert isinstance(parsed["tool_calls"], list) assert parsed["tool_calls"][0].name == "get_weather" assert parsed["tool_calls"][1].result == "125M" assert parsed["answer"] == "The weather is Sunny. Population is 125M." 
def test_xml_adapter_formats_nested_images(): class ImageWrapper(pydantic.BaseModel): images: list[dspy.Image] tag: list[str] class MySignature(dspy.Signature): image: ImageWrapper = dspy.InputField() text: str = dspy.OutputField() image1 = dspy.Image(url="https://example.com/image1.jpg") image2 = dspy.Image(url="https://example.com/image2.jpg") image3 = dspy.Image(url="https://example.com/image3.jpg") image_wrapper = ImageWrapper(images=[image1, image2, image3], tag=["test", "example"]) demos = [ dspy.Example( image=image_wrapper, text="This is a test image", ), ] image_wrapper_2 = ImageWrapper(images=[dspy.Image(url="https://example.com/image4.jpg")], tag=["test", "example"]) adapter = dspy.XMLAdapter() messages = adapter.format(MySignature, demos, {"image": image_wrapper_2}) assert len(messages) == 4 # Image information in the few-shot example's user message expected_image1_content = {"type": "image_url", "image_url": {"url": "https://example.com/image1.jpg"}} expected_image2_content = {"type": "image_url", "image_url": {"url": "https://example.com/image2.jpg"}} expected_image3_content = {"type": "image_url", "image_url": {"url": "https://example.com/image3.jpg"}} assert expected_image1_content in messages[1]["content"] assert expected_image2_content in messages[1]["content"] assert expected_image3_content in messages[1]["content"] # The query image is formatted in the last user message assert {"type": "image_url", "image_url": {"url": "https://example.com/image4.jpg"}} in messages[-1]["content"] def test_xml_adapter_with_code(): # Test with code as input field class CodeAnalysis(dspy.Signature): """Analyze the time complexity of the code""" code: dspy.Code = dspy.InputField() result: str = dspy.OutputField() adapter = dspy.XMLAdapter() messages = adapter.format(CodeAnalysis, [], {"code": "print('Hello, world!')"}) assert len(messages) == 2 # The output field type description should be included in the system message even if the output field is nested assert dspy.Code.description() in messages[0]["content"] # The user message should include the question and the tools assert "print('Hello, world!')" in messages[1]["content"] # Test with code as output field class CodeGeneration(dspy.Signature): """Generate code to answer the question""" question: str = dspy.InputField() code: dspy.Code = dspy.OutputField() adapter = dspy.XMLAdapter() with mock.patch("litellm.completion") as mock_completion: mock_completion.return_value = ModelResponse( choices=[Choices(message=Message(content='<code>print("Hello, world!")</code>'))], model="openai/gpt-4o-mini", ) result = adapter( dspy.LM(model="openai/gpt-4o-mini", cache=False), {}, CodeGeneration, [], {"question": "Write a python program to print 'Hello, world!'"}, ) assert result[0]["code"].code == 'print("Hello, world!")' def test_xml_adapter_full_prompt(): class QA(dspy.Signature): query: str = dspy.InputField() context: str | None = dspy.InputField() answer: str = dspy.OutputField() adapter = dspy.XMLAdapter() messages = adapter.format(QA, [], {"query": "when was Marie Curie born"}) assert len(messages) == 2 assert messages[0]["role"] == "system" assert messages[1]["role"] == "user" expected_system = ( "Your input fields are:\n" "1. `query` (str): \n" "2. `context` (UnionType[str, NoneType]):\n" "Your output fields are:\n" "1. 
`answer` (str):\n" "All interactions will be structured in the following way, with the appropriate values filled in.\n\n" "<query>\n{query}\n</query>\n\n" "<context>\n{context}\n</context>\n\n" "<answer>\n{answer}\n</answer>\n" "In adhering to this structure, your objective is: \n" " Given the fields `query`, `context`, produce the fields `answer`." ) expected_user = ( "[[ ## query ## ]]\nwhen was Marie Curie born\n\n" "Respond with the corresponding output fields wrapped in XML tags `<answer>`." ) assert messages[0]["content"] == expected_system assert messages[1]["content"] == expected_user ``` -------------------------------------------------------------------------------- /dspy/primitives/base_module.py: -------------------------------------------------------------------------------- ```python import copy import logging from collections import deque from collections.abc import Generator from pathlib import Path import cloudpickle import orjson from dspy.utils.saving import get_dependency_versions # NOTE: Note: It's important (temporary decision) to maintain named_parameters that's different in behavior from # named_sub_modules for the time being. logger = logging.getLogger(__name__) class BaseModule: def __init__(self): pass def named_parameters(self): """ Unlike PyTorch, handles (non-recursive) lists of parameters too. """ import dspy from dspy.predict.parameter import Parameter visited = set() named_parameters = [] def add_parameter(param_name, param_value): if isinstance(param_value, Parameter): if id(param_value) not in visited: visited.add(id(param_value)) named_parameters.append((param_name, param_value)) elif isinstance(param_value, dspy.Module): # When a sub-module is pre-compiled, keep it frozen. if not getattr(param_value, "_compiled", False): for sub_name, param in param_value.named_parameters(): add_parameter(f"{param_name}.{sub_name}", param) if isinstance(self, Parameter): add_parameter("self", self) for name, value in self.__dict__.items(): if isinstance(value, Parameter): add_parameter(name, value) elif isinstance(value, dspy.Module): # When a sub-module is pre-compiled, keep it frozen. if not getattr(value, "_compiled", False): for sub_name, param in value.named_parameters(): add_parameter(f"{name}.{sub_name}", param) elif isinstance(value, (list, tuple)): for idx, item in enumerate(value): add_parameter(f"{name}[{idx}]", item) elif isinstance(value, dict): for key, item in value.items(): add_parameter(f"{name}['{key}']", item) return named_parameters def named_sub_modules(self, type_=None, skip_compiled=False) -> Generator[tuple[str, "BaseModule"], None, None]: """Find all sub-modules in the module, as well as their names. Say `self.children[4]['key'].sub_module` is a sub-module. Then the name will be `children[4]['key'].sub_module`. But if the sub-module is accessible at different paths, only one of the paths will be returned. 
""" if type_ is None: type_ = BaseModule queue = deque([("self", self)]) seen = {id(self)} def add_to_queue(name, item): if id(item) not in seen: seen.add(id(item)) queue.append((name, item)) while queue: name, item = queue.popleft() if isinstance(item, type_): yield name, item if isinstance(item, BaseModule): if skip_compiled and getattr(item, "_compiled", False): continue for sub_name, sub_item in item.__dict__.items(): add_to_queue(f"{name}.{sub_name}", sub_item) elif isinstance(item, (list, tuple)): for i, sub_item in enumerate(item): add_to_queue(f"{name}[{i}]", sub_item) elif isinstance(item, dict): for key, sub_item in item.items(): add_to_queue(f"{name}[{key}]", sub_item) def parameters(self): return [param for _, param in self.named_parameters()] def deepcopy(self): """Deep copy the module. This is a tweak to the default python deepcopy that only deep copies `self.parameters()`, and for other attributes, we just do the shallow copy. """ try: # If the instance itself is copyable, we can just deep copy it. # Otherwise we will have to create a new instance and copy over the attributes one by one. return copy.deepcopy(self) except Exception: pass # Create an empty instance. new_instance = self.__class__.__new__(self.__class__) # Set attribuetes of the copied instance. for attr, value in self.__dict__.items(): if isinstance(value, BaseModule): setattr(new_instance, attr, value.deepcopy()) else: try: # Try to deep copy the attribute setattr(new_instance, attr, copy.deepcopy(value)) except Exception: logging.warning( f"Failed to deep copy attribute '{attr}' of {self.__class__.__name__}, " "falling back to shallow copy or reference copy." ) try: # Fallback to shallow copy if deep copy fails setattr(new_instance, attr, copy.copy(value)) except Exception: # If even the shallow copy fails, we just copy over the reference. setattr(new_instance, attr, value) return new_instance def reset_copy(self): """Deep copy the module and reset all parameters.""" new_instance = self.deepcopy() for param in new_instance.parameters(): param.reset() return new_instance def dump_state(self, json_mode=True): return {name: param.dump_state(json_mode=json_mode) for name, param in self.named_parameters()} def load_state(self, state): for name, param in self.named_parameters(): param.load_state(state[name]) def save(self, path, save_program=False, modules_to_serialize=None): """Save the module. Save the module to a directory or a file. There are two modes: - `save_program=False`: Save only the state of the module to a json or pickle file, based on the value of the file extension. - `save_program=True`: Save the whole module to a directory via cloudpickle, which contains both the state and architecture of the model. If `save_program=True` and `modules_to_serialize` are provided, it will register those modules for serialization with cloudpickle's `register_pickle_by_value`. This causes cloudpickle to serialize the module by value rather than by reference, ensuring the module is fully preserved along with the saved program. This is useful when you have custom modules that need to be serialized alongside your program. If None, then no modules will be registered for serialization. We also save the dependency versions, so that the loaded model can check if there is a version mismatch on critical dependencies or DSPy version. Args: path (str): Path to the saved state file, which should be a .json or .pkl file when `save_program=False`, and a directory when `save_program=True`. 
save_program (bool): If True, save the whole module to a directory via cloudpickle, otherwise only save the state. modules_to_serialize (list): A list of modules to serialize with cloudpickle's `register_pickle_by_value`. If None, then no modules will be registered for serialization. """ metadata = {} metadata["dependency_versions"] = get_dependency_versions() path = Path(path) if save_program: if path.suffix: raise ValueError( f"`path` must point to a directory without a suffix when `save_program=True`, but received: {path}" ) if path.exists() and not path.is_dir(): raise NotADirectoryError(f"The path '{path}' exists but is not a directory.") if not path.exists(): # Create the directory (and any parent directories) path.mkdir(parents=True) try: modules_to_serialize = modules_to_serialize or [] for module in modules_to_serialize: cloudpickle.register_pickle_by_value(module) with open(path / "program.pkl", "wb") as f: cloudpickle.dump(self, f) except Exception as e: raise RuntimeError( f"Saving failed with error: {e}. Please remove the non-picklable attributes from your DSPy program, " "or consider using state-only saving by setting `save_program=False`." ) with open(path / "metadata.json", "wb") as f: f.write(orjson.dumps(metadata, option=orjson.OPT_INDENT_2 | orjson.OPT_APPEND_NEWLINE)) return if path.suffix == ".json": state = self.dump_state() state["metadata"] = metadata try: with open(path, "wb") as f: f.write(orjson.dumps(state, option=orjson.OPT_INDENT_2 | orjson.OPT_APPEND_NEWLINE)) except Exception as e: raise RuntimeError( f"Failed to save state to {path} with error: {e}. Your DSPy program may contain non " "json-serializable objects, please consider saving the state in .pkl by using `path` ending " "with `.pkl`, or saving the whole program by setting `save_program=True`." ) elif path.suffix == ".pkl": state = self.dump_state(json_mode=False) state["metadata"] = metadata with open(path, "wb") as f: cloudpickle.dump(state, f) else: raise ValueError(f"`path` must end with `.json` or `.pkl` when `save_program=False`, but received: {path}") def load(self, path): """Load the saved module. You may also want to check out dspy.load, if you want to load an entire program, not just the state for an existing program. Args: path (str): Path to the saved state file, which should be a .json or a .pkl file """ path = Path(path) if path.suffix == ".json": with open(path, "rb") as f: state = orjson.loads(f.read()) elif path.suffix == ".pkl": with open(path, "rb") as f: state = cloudpickle.load(f) else: raise ValueError(f"`path` must end with `.json` or `.pkl`, but received: {path}") dependency_versions = get_dependency_versions() saved_dependency_versions = state["metadata"]["dependency_versions"] for key, saved_version in saved_dependency_versions.items(): if dependency_versions[key] != saved_version: logger.warning( f"There is a mismatch of {key} version between saved model and current environment. " f"You saved with `{key}=={saved_version}`, but now you have " f"`{key}=={dependency_versions[key]}`. This might cause errors or performance downgrade " "on the loaded model, please consider loading the model in the same environment as the " "saving environment." 
) self.load_state(state) ``` -------------------------------------------------------------------------------- /tests/clients/test_cache.py: -------------------------------------------------------------------------------- ```python import os from dataclasses import dataclass from unittest.mock import patch import pydantic import pytest from cachetools import LRUCache from diskcache import FanoutCache from dspy.clients.cache import Cache @dataclass class DummyResponse: message: str usage: dict @pytest.fixture def cache_config(tmp_path): """Default cache configuration.""" return { "enable_disk_cache": True, "enable_memory_cache": True, "disk_cache_dir": str(tmp_path), "disk_size_limit_bytes": 1024 * 1024, # 1MB "memory_max_entries": 100, } @pytest.fixture def cache(cache_config): """Create a cache instance with the default configuration.""" return Cache(**cache_config) def test_initialization(tmp_path): """Test different cache initialization configurations.""" # Test memory-only cache memory_cache = Cache( enable_disk_cache=False, enable_memory_cache=True, disk_cache_dir="", disk_size_limit_bytes=0, memory_max_entries=50, ) assert isinstance(memory_cache.memory_cache, LRUCache) assert memory_cache.memory_cache.maxsize == 50 assert memory_cache.disk_cache == {} # Test disk-only cache disk_cache = Cache( enable_disk_cache=True, enable_memory_cache=False, disk_cache_dir=str(tmp_path), disk_size_limit_bytes=1024, memory_max_entries=0, ) assert isinstance(disk_cache.disk_cache, FanoutCache) assert disk_cache.memory_cache == {} # Test disabled cache disabled_cache = Cache( enable_disk_cache=False, enable_memory_cache=False, disk_cache_dir="", disk_size_limit_bytes=0, memory_max_entries=0, ) assert disabled_cache.memory_cache == {} assert disabled_cache.disk_cache == {} def test_cache_key_generation(cache): """Test cache key generation with different types of inputs.""" # Test with simple dictionary request = {"prompt": "Hello", "model": "openai/gpt-4o-mini", "temperature": 0.7} key = cache.cache_key(request) assert isinstance(key, str) assert len(key) == 64 # SHA-256 hash is 64 characters # Test with pydantic model class TestModel(pydantic.BaseModel): name: str value: int model = TestModel(name="test", value=42) request_with_model = {"data": model} key_with_model = cache.cache_key(request_with_model) assert isinstance(key_with_model, str) # Test with pydantic model class request_with_model_class = {"model_class": TestModel} key_with_model_class = cache.cache_key(request_with_model_class) assert isinstance(key_with_model_class, str) def test_put_and_get(cache): """Test putting and getting from cache.""" # Test putting and getting from memory cache request = {"prompt": "Hello", "model": "openai/gpt-4o-mini", "temperature": 0.7} value = DummyResponse(message="This is a test response", usage={"prompt_tokens": 10, "completion_tokens": 20}) cache.put(request, value) result = cache.get(request) assert result.message == value.message assert result.usage == {} # Test with disk cache # First, clear memory cache to ensure we're using disk cache cache.reset_memory_cache() # Get from disk cache result_from_disk = cache.get(request) assert result_from_disk.message == value.message assert result_from_disk.usage == {} # Verify it was also added back to memory cache assert cache.cache_key(request) in cache.memory_cache def test_cache_miss(cache): """Test getting a non-existent key.""" request = {"prompt": "Non-existent", "model": "gpt-4"} result = cache.get(request) assert result is None def 
test_cache_key_error_handling(cache):
    """Test error handling for unserializable objects."""
    # Test with a request that can't be serialized to JSON
    class UnserializableObject:
        pass

    request = {"data": UnserializableObject()}

    # Should not raise an exception
    result = cache.get(request)
    assert result is None

    # Should not raise an exception
    cache.put(request, "value")


def test_reset_memory_cache(cache):
    """Test resetting memory cache."""
    # Add some items to the memory cache
    requests = [{"prompt": f"Hello {i}", "model": "openai/gpt-4o-mini"} for i in range(5)]
    for i, req in enumerate(requests):
        cache.put(req, f"Response {i}")

    # Verify items are in memory cache
    for req in requests:
        key = cache.cache_key(req)
        assert key in cache.memory_cache

    # Reset memory cache
    cache.reset_memory_cache()

    # Verify memory cache is empty
    assert len(cache.memory_cache) == 0

    # But disk cache still has the items
    for req in requests:
        result = cache.get(req)
        assert result is not None


def test_save_and_load_memory_cache(cache, tmp_path):
    """Test saving and loading memory cache."""
    # Add some items to the memory cache
    requests = [{"prompt": f"Hello {i}", "model": "openai/gpt-4o-mini"} for i in range(5)]
    for i, req in enumerate(requests):
        cache.put(req, f"Response {i}")

    # Save memory cache to a temporary file
    temp_cache_file = tmp_path / "memory_cache.pkl"
    cache.save_memory_cache(str(temp_cache_file))

    # Create a new cache instance with disk cache disabled
    new_cache = Cache(
        enable_memory_cache=True,
        enable_disk_cache=False,
        disk_cache_dir=tmp_path / "disk_cache",
        disk_size_limit_bytes=0,
        memory_max_entries=100,
    )

    # Load the memory cache
    new_cache.load_memory_cache(str(temp_cache_file))

    # Verify items are in the new memory cache
    for req in requests:
        result = new_cache.get(req)
        assert result is not None
        assert result == f"Response {requests.index(req)}"


def test_request_cache_decorator(cache):
    """Test the request_cache decorator."""
    from dspy.clients.cache import request_cache

    # Mock the dspy.cache attribute
    with patch("dspy.cache", cache):
        # Define a test function
        @request_cache()
        def test_function(prompt, model):
            return f"Response for {prompt} with {model}"

        # First call should compute the result
        result1 = test_function(prompt="Hello", model="openai/gpt-4o-mini")
        assert result1 == "Response for Hello with openai/gpt-4o-mini"

        # Second call with same arguments should use cache
        with patch.object(cache, "get") as mock_get:
            mock_get.return_value = "Cached response"
            result2 = test_function(prompt="Hello", model="openai/gpt-4o-mini")
            assert result2 == "Cached response"
            mock_get.assert_called_once()

        # Call with different arguments should compute again
        result3 = test_function(prompt="Different", model="openai/gpt-4o-mini")
        assert result3 == "Response for Different with openai/gpt-4o-mini"


def test_request_cache_decorator_with_ignored_args_for_cache_key(cache):
    """Test the request_cache decorator with ignored_args_for_cache_key."""
    from dspy.clients.cache import request_cache

    # Mock the dspy.cache attribute
    with patch("dspy.cache", cache):
        # Define a test function
        @request_cache(ignored_args_for_cache_key=["model"])
        def test_function1(prompt, model):
            return f"Response for {prompt} with {model}"

        @request_cache()
        def test_function2(prompt, model):
            return f"Response for {prompt} with {model}"

        # First call should compute the result
        result1 = test_function1(prompt="Hello", model="openai/gpt-4o-mini")
        result2 = test_function1(prompt="Hello", model="openai/gpt-4o")
        # Because model arg is ignored, the second call should return the same result as
the first assert result1 == result2 result3 = test_function2(prompt="Hello", model="openai/gpt-4o-mini") result4 = test_function2(prompt="Hello", model="openai/gpt-4o") # Because model arg is not ignored, the second call should return a different result assert result3 != result4 @pytest.mark.asyncio async def test_request_cache_decorator_async(cache): """Test the request_cache decorator with async functions.""" from dspy.clients.cache import request_cache # Mock the dspy.cache attribute with patch("dspy.cache", cache): # Define a test function @request_cache() async def test_function(prompt, model): return f"Response for {prompt} with {model}" # First call should compute the result result1 = await test_function(prompt="Hello", model="openai/gpt-4o-mini") assert result1 == "Response for Hello with openai/gpt-4o-mini" # Second call with same arguments should use cache with patch.object(cache, "get") as mock_get: mock_get.return_value = "Cached response" result2 = await test_function(prompt="Hello", model="openai/gpt-4o-mini") assert result2 == "Cached response" mock_get.assert_called_once() # Call with different arguments should compute again result3 = await test_function(prompt="Different", model="openai/gpt-4o-mini") assert result3 == "Response for Different with openai/gpt-4o-mini" def test_cache_consistency_with_lm_call_modifies_the_request(cache): """Test that the cache is consistent with the LM call that modifies the request.""" from dspy.clients.cache import request_cache # Mock the dspy.cache attribute with patch("dspy.cache", cache): # Define a test function @request_cache() def test_function(**kwargs): del kwargs["field_to_delete"] return kwargs # First call should compute the result test_function(field_to_delete="delete", field_to_keep="keep") # The cache key should use the original request, not the modified one assert ( cache.get( { "field_to_keep": "keep", "_fn_identifier": f"{test_function.__module__}.{test_function.__qualname__}", } ) is None ) assert ( cache.get( { "field_to_keep": "keep", "field_to_delete": "delete", "_fn_identifier": f"{test_function.__module__}.{test_function.__qualname__}", } ) is not None ) def test_cache_fallback_on_restricted_environment(): """Test that DSPy gracefully falls back to memory-only cache when disk cache fails.""" old_env = os.environ.get("DSPY_CACHEDIR") try: # Set an invalid cache directory that can't be created os.environ["DSPY_CACHEDIR"] = "/dev/null/invalid_path" import dspy from dspy.clients import _get_dspy_cache dspy.cache = _get_dspy_cache() # Cache should work with memory-only fallback despite invalid disk path test_request = {"model": "test", "prompt": "hello"} dspy.cache.put(test_request, "fallback_result") result = dspy.cache.get(test_request) assert result == "fallback_result", "Memory cache fallback should work" finally: if old_env is None: os.environ.pop("DSPY_CACHEDIR", None) else: os.environ["DSPY_CACHEDIR"] = old_env ``` -------------------------------------------------------------------------------- /docs/docs/community/use-cases.md: -------------------------------------------------------------------------------- ```markdown # Use Cases We often get questions like “How are people using DSPy in practice?”, both in production and for research. This list was created to collect a few pointers and to encourage others in the community to add their own work below. This list is continuously growing. We regularly add new use cases and welcome community contributions. 
If you would like to add your product or research to this list, please submit a PR. ## A Few Company Use Cases | **Name** | **Use Cases** | |---|---| | **[JetBlue](https://www.jetblue.com/)** | Multiple chatbot use cases. [Blog](https://www.databricks.com/blog/optimizing-databricks-llm-pipelines-dspy) | | **[Replit](https://replit.com/)** | Synthesize diffs using code LLMs using a DSPy pipeline. [Blog](https://blog.replit.com/code-repair) | | **[Databricks](https://www.databricks.com/)** | Research, products, and customer solutions around LM Judges, RAG, classification, and other applications. [Blog](https://www.databricks.com/blog/dspy-databricks), [Blog II](https://www.databricks.com/customers/ddi) | | **[Sephora](https://www.sephora.com/)** | Undisclosed agent usecases; perspectives shared in [DAIS Session](https://www.youtube.com/watch?v=D2HurSldDkE). | | **[Zoro UK](https://www.zoro.co.uk/)** | E-commerce applications around structured shopping. [Portkey Session](https://www.youtube.com/watch?v=_vGKSc1tekE) | | **[VMware](https://www.vmware.com/)** | RAG and other prompt optimization applications. [Interview in The Register.](https://www.theregister.com/2024/02/22/prompt_engineering_ai_models/) [Business Insider.](https://www.businessinsider.com/chaptgpt-large-language-model-ai-prompt-engineering-automated-optimizer-2024-3) | | **[Haize Labs](https://www.haizelabs.com/)** | Automated red-teaming for LLMs. [Blog](https://blog.haizelabs.com/posts/dspy/) | | **[Plastic Labs](https://www.plasticlabs.ai/)** | R&D pipelines for Honcho. [Blog](https://blog.plasticlabs.ai/blog/User-State-is-State-of-the-Art) | | **[PingCAP](https://pingcap.com/)** | Building a knowledge graph. [Article](https://www.pingcap.com/article/building-a-graphrag-from-wikipedia-page-using-dspy-openai-and-tidb-vector-database/) | | **[Salomatic](https://langtrace.ai/blog/case-study-how-salomatic-used-langtrace-to-build-a-reliable-medical-report-generation-system)** | Enriching medical reports using DSPy. [Blog](https://langtrace.ai/blog/case-study-how-salomatic-used-langtrace-to-build-a-reliable-medical-report-generation-system) | | **[Truelaw](https://www.youtube.com/watch?v=O0F3RAWZNfM)** | How Truelaw builds bespoke LLM pipelines for law firms using DSPy. [Podcast](https://www.youtube.com/watch?v=O0F3RAWZNfM) | | **[STChealth](https://stchealth.com/)** | Using DSPy for entity resolution including human-readable rationale for decisions. | | **[Moody's](https://www.moodys.com/)** | Leveraging DSPy to optimize RAG systems, LLM-as-a-Judge, and agentic systems for financial workflows. | | **[Normal Computing](https://www.normalcomputing.com/)** | Translate specs from chip companies from English to intermediate formal languages | | **[Procure.FYI](https://www.procure.fyi/)** | Process messy, publicly available technology spending and pricing data via DSPy. | | **[RadiantLogic](https://www.radiantlogic.com/)** | AI Data Assistant. DSPy is used for the agent that routes the query, the context extraction module, the text-to-sql conversion engine, and the table summarization module. | | **[Raia](https://raiahealth.com/)** | Using DSPy for AI-powered Personal Healthcare Agents. | | **[Hyperlint](https://hyperlint.com)** | Uses DSPy to generate technical documentation. DSPy helps to fetch relevant information and synthesize that into tutorials. | | **[Starops](https://staropshq.com/) & [Saya](https://heysaya.ai/)** | Building research documents given a user's corpus. 
Generate prompts to create more articles from example articles. | | **[Tessel AI](https://tesselai.com/)** | Enhancing human-machine interaction with data use cases. | | **[Dicer.ai](https://dicer.ai/)** | Uses DSPy for marketing AI to get the most from their paid ads. | | **[Howie](https://howie.ai)** | Using DSPy to automate meeting scheduling through email. | | **[Isoform.ai](https://isoform.ai)** | Building custom integrations using DSPy. | | **[Trampoline AI](https://trampoline.ai)** | Uses DSPy to power their data-augmentation and LM pipelines. | | **[Pretrain](https://pretrain.com)** | Uses DSPy to automatically optimize AI performance towards user-defined tasks based on uploaded examples. | | **[Spindle AI](https://spindle.ai)** | Turns natural-language constrained optimization problems into solvable mathematical programs whose candidate solutions are scenarios. | | **[Infinitus](https://www.infinitus.ai/product/ai-agents/)** | Leverages DSPy to build and optimize healthcare AI agents | This list represents companies that have publicly shared their use cases or have provided permission to be included. It reflects a selection of the many industry applications of DSPy currently in production. ## A Few Papers Using DSPy | **Name** | **Description** | |---|---| | **[STORM](https://arxiv.org/abs/2402.14207)** | Writing Wikipedia-like Articles From Scratch. | | **[PATH](https://arxiv.org/abs/2406.11706)** | Prompts as Auto-Optimized Training Hyperparameters: Training Best-in-Class IR Models from Scratch with 10 Gold Labels | | **[WangLab @ MEDIQA](https://arxiv.org/abs/2404.14544)** | UofT's winning system at MEDIQA, outperforms the next best system by 20 points | | **[UMD's Suicide Detection System](https://arxiv.org/abs/2406.06608)** | Outperforms 20-hour expert human prompt engineering by 40% | | **[IReRa](https://arxiv.org/abs/2401.12178)** | Infer-Retrieve-Rank: Extreme Classification with > 10,000 Labels | | **[Unreasonably Effective Eccentric Prompts](https://arxiv.org/abs/2402.10949v2)** | General Prompt Optimization | | **[Palimpzest](https://arxiv.org/abs/2405.14696)** | A Declarative System for Optimizing AI Workloads | | **[AI Agents that Matter](https://arxiv.org/abs/2407.01502v1)** | Agent Efficiency Optimization | | **[EDEN](https://arxiv.org/abs/2406.17982v1)** | Empathetic Dialogues for English Learning: Uses adaptive empathetic feedback to improve student grit | | **[ECG-Chat](https://arxiv.org/pdf/2408.08849)** | Uses DSPy with GraphRAG for medical report generation | | **[DSPy Assertions](https://arxiv.org/abs/2312.13382)** | Various applications of imposing hard and soft constraints on LM outputs | | **[DSPy Guardrails](https://boxiyu.github.io/assets/pdf/DSPy_Guardrails.pdf)** | Reduce the attack success rate of CodeAttack, decreasing from 75% to 5% | | **[Co-STORM](https://arxiv.org/pdf/2408.15232)** | Collaborative STORM: Generate Wikipedia-like articles through collaborative discourse among users and multiple LM agents | | **[MedVAL](https://arxiv.org/abs/2507.03152)** | Expert-level validation of AI-generated medical text with scalable language models | | **[Neural @ ArchEHR-QA 2025](https://aclanthology.org/2025.bionlp-share.13.pdf)** | Runner up method at 2025 BioNLP Shared Task Workshop This list is regularly updated with new research publications using DSPy. 
## A Few Repositories (or other OSS examples) using DSPy | **Name** | **Description/Link** | |---|---| | **Stanford CS 224U Homework** | [Github](https://github.com/cgpotts/cs224u/blob/main/hw_openqa.ipynb) | | **STORM Report Generation (10,000 GitHub stars)** | [Github](https://github.com/stanford-oval/storm) | | **DSPy Redteaming** | [Github](https://github.com/haizelabs/dspy-redteam) | | **DSPy Theory of Mind** | [Github](https://github.com/plastic-labs/dspy-opentom) | | **Indic cross-lingual Natural Language Inference** | [Github](https://github.com/saifulhaq95/DSPy-Indic/blob/main/indicxlni.ipynb) | | **Optimizing LM for Text2SQL using DSPy** | [Github](https://github.com/jjovalle99/DSPy-Text2SQL) | | **DSPy PII Masking Demo by Eric Ness** | [Colab](https://colab.research.google.com/drive/1KZR1sGTp_RLWUJPAiK1FKPKI-Qn9neUm?usp=sharing) | | **DSPy on BIG-Bench Hard Example** | [Github](https://drchrislevy.github.io/posts/dspy/dspy.html) | | **Building a chess playing agent using DSPy** | [Github](https://medium.com/thoughts-on-machine-learning/building-a-chess-playing-agent-using-dspy-9b87c868f71e) | | **Ittia Research Fact Checking** | [Github](https://github.com/ittia-research/check) | | **Strategic Debate via Tree-of-Thought** | [Github](https://github.com/zbambergerNLP/strategic-debate-tot) | | **Sanskrit to English Translation App**| [Github](https://github.com/ganarajpr/sanskrit-translator-dspy) | | **DSPy for extracting features from PDFs on arXiv**| [Github](https://github.com/S1M0N38/dspy-arxiv) | | **DSPygen: DSPy in Ruby on Rails**| [Github](https://github.com/seanchatmangpt/dspygen) | | **DSPy Inspector**| [Github](https://github.com/Neoxelox/dspy-inspector) | | **DSPy with FastAPI**| [Github](https://github.com/diicellman/dspy-rag-fastapi) | | **DSPy for Indian Languages**| [Github](https://github.com/saifulhaq95/DSPy-Indic) | | **Hurricane: Blog Posts with Generative Feedback Loops!**| [Github](https://github.com/weaviate-tutorials/Hurricane) | | **RAG example using DSPy, Gradio, FastAPI, and Ollama**| [Github](https://github.com/diicellman/dspy-gradio-rag) | | **Synthetic Data Generation**| [Github](https://colab.research.google.com/drive/1CweVOu0qhTC0yOfW5QkLDRIKuAuWJKEr?usp=sharing) | | **Self Discover**| [Github](https://colab.research.google.com/drive/1GkAQKmw1XQgg5UNzzy8OncRe79V6pADB?usp=sharing) | | **MedVAL**| [Github](https://github.com/StanfordMIMI/MedVAL) | This list showcases some of the open-source projects and repositories using DSPy, with many more examples available in the community. 
## A Few Providers, Integrations, and related Blog Releases | **Name** | **Link** | |---|---| | **Databricks** | [Link](https://www.databricks.com/blog/dspy-databricks) | | **Zenbase** | [Link](https://zenbase.ai/) | | **LangWatch** | [Link](https://langwatch.ai/blog/introducing-dspy-visualizer) | | **Gradient** | [Link](https://gradient.ai/blog/achieving-gpt-4-level-performance-at-lower-cost-using-dspy) | | **Snowflake** | [Link](https://medium.com/snowflake/dspy-snowflake-140d6d947d73) | | **Langchain** | [Link](https://python.langchain.com/v0.2/docs/integrations/providers/dspy/) | | **Weaviate** | [Link](https://weaviate.io/blog/dspy-optimizers) | | **Qdrant** | [Link](https://qdrant.tech/documentation/frameworks/dspy/) | | **Weights & Biases Weave** | [Link](https://weave-docs.wandb.ai/guides/integrations/dspy/) | | **Milvus** | [Link](https://milvus.io/docs/integrate_with_dspy.md) | | **Neo4j** | [Link](https://neo4j.com/labs/genai-ecosystem/dspy/) | | **Lightning AI** | [Link](https://lightning.ai/lightning-ai/studios/dspy-programming-with-foundation-models) | | **Haystack** | [Link](https://towardsdatascience.com/automating-prompt-engineering-with-dspy-and-haystack-926a637a3f43) | | **Arize** | [Link](https://docs.arize.com/phoenix/tracing/integrations-tracing/dspy) | | **LlamaIndex** | [Link](https://github.com/stanfordnlp/dspy/blob/main/examples/llamaindex/dspy_llamaindex_rag.ipynb) | | **Langtrace** | [Link](https://docs.langtrace.ai/supported-integrations/llm-frameworks/dspy) | | **Langfuse** | [Link](https://langfuse.com/docs/integrations/dspy) | | **OpenLIT** | [Link](https://docs.openlit.io/latest/integrations/dspy) | | **Relevance AI** | [Link](https://relevanceai.com/blog/dspy-programming---not-prompting---language-models) | Credit: Some of these resources were originally compiled in the [Awesome DSPy](https://github.com/ganarajpr/awesome-dspy/tree/master) repo. ``` -------------------------------------------------------------------------------- /dspy/streaming/streamify.py: -------------------------------------------------------------------------------- ```python import asyncio import contextvars import logging import threading from asyncio import iscoroutinefunction from queue import Queue from typing import TYPE_CHECKING, Any, AsyncGenerator, Awaitable, Callable, Generator import litellm import orjson from anyio import create_memory_object_stream, create_task_group from anyio.streams.memory import MemoryObjectSendStream from litellm import ModelResponseStream from dspy.dsp.utils.settings import settings from dspy.primitives.prediction import Prediction from dspy.streaming.messages import StatusMessage, StatusMessageProvider, StatusStreamingCallback from dspy.streaming.streaming_listener import StreamListener, find_predictor_for_stream_listeners from dspy.utils.asyncify import asyncify logger = logging.getLogger(__name__) if TYPE_CHECKING: from dspy.primitives.module import Module def streamify( program: "Module", status_message_provider: StatusMessageProvider | None = None, stream_listeners: list[StreamListener] | None = None, include_final_prediction_in_output_stream: bool = True, is_async_program: bool = False, async_streaming: bool = True, ) -> Callable[[Any, Any], Awaitable[Any]]: """ Wrap a DSPy program so that it streams its outputs incrementally, rather than returning them all at once. 
It also provides status messages to the user to indicate the progress of the program, and users can implement their own status message provider to customize the status messages and what module to generate status messages for. Args: program: The DSPy program to wrap with streaming functionality. status_message_provider: A custom status message generator to use instead of the default one. Users can implement their own status message generator to customize the status messages and what module to generate status messages for. stream_listeners: A list of stream listeners to capture the streaming output of specific fields of sub predicts in the program. When provided, only the target fields in the target predict will be streamed to the user. include_final_prediction_in_output_stream: Whether to include the final prediction in the output stream, only useful when `stream_listeners` is provided. If `False`, the final prediction will not be included in the output stream. When the program hit cache, or no listeners captured anything, the final prediction will still be included in the output stream even if this is `False`. is_async_program: Whether the program is async. If `False`, the program will be wrapped with `asyncify`, otherwise the program will be called with `acall`. async_streaming: Whether to return an async generator or a sync generator. If `False`, the streaming will be converted to a sync generator. Returns: A function that takes the same arguments as the original program, but returns an async generator that yields the program's outputs incrementally. Example: ```python import asyncio import dspy dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini")) # Create the program and wrap it with streaming functionality program = dspy.streamify(dspy.Predict("q->a")) # Use the program with streaming output async def use_streaming(): output = program(q="Why did a chicken cross the kitchen?") return_value = None async for value in output: if isinstance(value, dspy.Prediction): return_value = value else: print(value) return return_value output = asyncio.run(use_streaming()) print(output) ``` Example with custom status message provider: ```python import asyncio import dspy dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini")) class MyStatusMessageProvider(StatusMessageProvider): def module_start_status_message(self, instance, inputs): return f"Predicting..." def tool_end_status_message(self, outputs): return f"Tool calling finished with output: {outputs}!" 
# Create the program and wrap it with streaming functionality program = dspy.streamify(dspy.Predict("q->a"), status_message_provider=MyStatusMessageProvider()) # Use the program with streaming output async def use_streaming(): output = program(q="Why did a chicken cross the kitchen?") return_value = None async for value in output: if isinstance(value, dspy.Prediction): return_value = value else: print(value) return return_value output = asyncio.run(use_streaming()) print(output) ``` Example with stream listeners: ```python import asyncio import dspy dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini", cache=False)) # Create the program and wrap it with streaming functionality predict = dspy.Predict("question->answer, reasoning") stream_listeners = [ dspy.streaming.StreamListener(signature_field_name="answer"), dspy.streaming.StreamListener(signature_field_name="reasoning"), ] stream_predict = dspy.streamify(predict, stream_listeners=stream_listeners) async def use_streaming(): output = stream_predict( question="why did a chicken cross the kitchen?", include_final_prediction_in_output_stream=False, ) return_value = None async for value in output: if isinstance(value, dspy.Prediction): return_value = value else: print(value) return return_value output = asyncio.run(use_streaming()) print(output) ``` You should see the streaming chunks (in the format of `dspy.streaming.StreamResponse`) in the console output. """ stream_listeners = stream_listeners or [] if len(stream_listeners) > 0: predict_id_to_listener = find_predictor_for_stream_listeners(program, stream_listeners) else: predict_id_to_listener = {} if is_async_program: program = program.acall elif not iscoroutinefunction(program): program = asyncify(program) callbacks = settings.callbacks status_streaming_callback = StatusStreamingCallback(status_message_provider) if not any(isinstance(c, StatusStreamingCallback) for c in callbacks): callbacks.append(status_streaming_callback) async def generator(args, kwargs, stream: MemoryObjectSendStream): with settings.context(send_stream=stream, callbacks=callbacks, stream_listeners=stream_listeners): prediction = await program(*args, **kwargs) await stream.send(prediction) async def async_streamer(*args, **kwargs): send_stream, receive_stream = create_memory_object_stream(16) async with create_task_group() as tg, send_stream, receive_stream: tg.start_soon(generator, args, kwargs, send_stream) async for value in receive_stream: if isinstance(value, ModelResponseStream): if len(predict_id_to_listener) == 0: # No listeners are configured, yield the chunk directly for backwards compatibility. yield value else: # We are receiving a chunk from the LM's response stream, delegate it to the listeners to # determine if we should yield a value to the user. for listener in predict_id_to_listener[value.predict_id]: # In some special cases such as Citation API, it is possible that multiple listeners # return values at the same time due to the chunk buffer of the listener. 
if output := listener.receive(value): yield output elif isinstance(value, StatusMessage): yield value elif isinstance(value, Prediction): # Flush remaining buffered tokens before yielding the Prediction instance for listener in stream_listeners: if final_chunk := listener.finalize(): yield final_chunk if include_final_prediction_in_output_stream: yield value elif ( len(stream_listeners) == 0 or any(listener.cache_hit for listener in stream_listeners) or not any(listener.stream_start for listener in stream_listeners) ): yield value return else: # This wildcard case allows for customized streaming behavior. # It is useful when a users have a custom LM which returns stream chunks in a custom format. # We let those chunks pass through to the user to handle them as needed. yield value if async_streaming: return async_streamer else: def sync_streamer(*args, **kwargs): output = async_streamer(*args, **kwargs) return apply_sync_streaming(output) return sync_streamer def apply_sync_streaming(async_generator: AsyncGenerator) -> Generator: """Convert the async streaming generator to a sync generator.""" queue = Queue() # Queue to hold items from the async generator stop_sentinel = object() # Sentinel to signal the generator is complete # To propagate prediction request ID context to the child thread context = contextvars.copy_context() def producer(): """Runs in a background thread to fetch items asynchronously.""" async def runner(): try: async for item in async_generator: queue.put(item) finally: # Signal completion queue.put(stop_sentinel) context.run(asyncio.run, runner()) # Start the producer in a background thread thread = threading.Thread(target=producer, daemon=True) thread.start() # Consume items from the queue while True: item = queue.get() # Block until an item is available if item is stop_sentinel: break yield item async def streaming_response(streamer: AsyncGenerator) -> AsyncGenerator: """ Convert a DSPy program output stream to an OpenAI-compatible output stream that can be used by a service as an API response to a streaming request. Args: streamer: An async generator that yields values from a DSPy program output stream. Returns: An async generator that yields OpenAI-compatible streaming response chunks. """ async for value in streamer: if isinstance(value, Prediction): data = {"prediction": dict(value.items(include_dspy=False))} yield f"data: {orjson.dumps(data).decode()}\n\n" elif isinstance(value, litellm.ModelResponseStream): data = {"chunk": value.json()} yield f"data: {orjson.dumps(data).decode()}\n\n" elif isinstance(value, str) and value.startswith("data:"): # The chunk value is an OpenAI-compatible streaming chunk value, # e.g. "data: {"finish_reason": "stop", "index": 0, "is_finished": True, ...}", # so yield it directly yield value else: raise ValueError(f"Unknown chunk value type: {value}") yield "data: [DONE]\n\n" ```
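The `streaming_response` helper above turns a DSPy program's output stream into OpenAI-style `data: ...` server-sent-event chunks, which is the shape a web service needs for a streaming API response. The following is a minimal sketch of wiring `streamify` and `streaming_response` together behind an HTTP endpoint; FastAPI and the endpoint, route, and model names are assumptions made for illustration and are not part of this repository.

```python
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

import dspy
from dspy.streaming.streamify import streaming_response

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))

app = FastAPI()

# Wrap the program once at startup; each request then gets its own async generator.
stream_qa = dspy.streamify(dspy.Predict("question -> answer"))


@app.get("/qa")
async def qa(question: str):
    # streaming_response converts the DSPy output stream into "data: ..." SSE chunks,
    # terminated by "data: [DONE]", so it can be returned directly as a streaming body.
    stream = stream_qa(question=question)
    return StreamingResponse(streaming_response(stream), media_type="text/event-stream")
```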