This is page 4 of 14. Use http://codebase.md/stanfordnlp/dspy?page={x} to view the full context.

# Directory Structure

```
├── .github
│   ├── .internal_dspyai
│   │   ├── internals
│   │   │   ├── build-and-release.md
│   │   │   └── release-checklist.md
│   │   └── pyproject.toml
│   ├── .tmp
│   │   └── .generated-actions
│   │       └── run-pypi-publish-in-docker-container
│   │           └── action.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   └── feature_request.yml
│   ├── PULL_REQUEST_TEMPLATE
│   │   └── pull_request_template.md
│   ├── workflow_scripts
│   │   └── install_testpypi_pkg.sh
│   └── workflows
│       ├── build_and_release.yml
│       ├── build_utils
│       │   └── test_version.py
│       ├── docs-push.yml
│       ├── precommits_check.yml
│       └── run_tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── CONTRIBUTING.md
├── docs
│   ├── .gitignore
│   ├── docs
│   │   ├── api
│   │   │   ├── adapters
│   │   │   │   ├── Adapter.md
│   │   │   │   ├── ChatAdapter.md
│   │   │   │   ├── JSONAdapter.md
│   │   │   │   └── TwoStepAdapter.md
│   │   │   ├── evaluation
│   │   │   │   ├── answer_exact_match.md
│   │   │   │   ├── answer_passage_match.md
│   │   │   │   ├── CompleteAndGrounded.md
│   │   │   │   ├── Evaluate.md
│   │   │   │   ├── EvaluationResult.md
│   │   │   │   └── SemanticF1.md
│   │   │   ├── experimental
│   │   │   │   ├── Citations.md
│   │   │   │   └── Document.md
│   │   │   ├── index.md
│   │   │   ├── models
│   │   │   │   ├── Embedder.md
│   │   │   │   └── LM.md
│   │   │   ├── modules
│   │   │   │   ├── BestOfN.md
│   │   │   │   ├── ChainOfThought.md
│   │   │   │   ├── CodeAct.md
│   │   │   │   ├── Module.md
│   │   │   │   ├── MultiChainComparison.md
│   │   │   │   ├── Parallel.md
│   │   │   │   ├── Predict.md
│   │   │   │   ├── ProgramOfThought.md
│   │   │   │   ├── ReAct.md
│   │   │   │   └── Refine.md
│   │   │   ├── optimizers
│   │   │   │   ├── BetterTogether.md
│   │   │   │   ├── BootstrapFewShot.md
│   │   │   │   ├── BootstrapFewShotWithRandomSearch.md
│   │   │   │   ├── BootstrapFinetune.md
│   │   │   │   ├── BootstrapRS.md
│   │   │   │   ├── COPRO.md
│   │   │   │   ├── Ensemble.md
│   │   │   │   ├── GEPA
│   │   │   │   │   ├── GEPA_Advanced.md
│   │   │   │   │   └── overview.md
│   │   │   │   ├── InferRules.md
│   │   │   │   ├── KNN.md
│   │   │   │   ├── KNNFewShot.md
│   │   │   │   ├── LabeledFewShot.md
│   │   │   │   ├── MIPROv2.md
│   │   │   │   └── SIMBA.md
│   │   │   ├── primitives
│   │   │   │   ├── Audio.md
│   │   │   │   ├── Code.md
│   │   │   │   ├── Example.md
│   │   │   │   ├── History.md
│   │   │   │   ├── Image.md
│   │   │   │   ├── Prediction.md
│   │   │   │   ├── Tool.md
│   │   │   │   └── ToolCalls.md
│   │   │   ├── signatures
│   │   │   │   ├── InputField.md
│   │   │   │   ├── OutputField.md
│   │   │   │   └── Signature.md
│   │   │   ├── tools
│   │   │   │   ├── ColBERTv2.md
│   │   │   │   ├── Embeddings.md
│   │   │   │   └── PythonInterpreter.md
│   │   │   └── utils
│   │   │       ├── asyncify.md
│   │   │       ├── configure_cache.md
│   │   │       ├── disable_litellm_logging.md
│   │   │       ├── disable_logging.md
│   │   │       ├── enable_litellm_logging.md
│   │   │       ├── enable_logging.md
│   │   │       ├── inspect_history.md
│   │   │       ├── load.md
│   │   │       ├── StatusMessage.md
│   │   │       ├── StatusMessageProvider.md
│   │   │       ├── streamify.md
│   │   │       └── StreamListener.md
│   │   ├── cheatsheet.md
│   │   ├── community
│   │   │   ├── community-resources.md
│   │   │   ├── how-to-contribute.md
│   │   │   └── use-cases.md
│   │   ├── deep-dive
│   │   │   └── data-handling
│   │   │       ├── built-in-datasets.md
│   │   │       ├── examples.md
│   │   │       ├── img
│   │   │       │   └── data-loading.png
│   │   │       └── loading-custom-data.md
│   │   ├── faqs.md
│   │   ├── index.md
│   │   ├── js
│   │   │   └── runllm-widget.js
│   │   ├── learn
│   │   │   ├── evaluation
│   │   │   │   ├── data.md
│   │   │   │   ├── metrics.md
│   │   │   │   └── overview.md
│   │   │   ├── figures
│   │   │   │   ├── native_tool_call.png
│   │   │   │   └── teleprompter-classes.png
│   │   │   ├── index.md
│   │   │   ├── optimization
│   │   │   │   ├── optimizers.md
│   │   │   │   └── overview.md
│   │   │   └── programming
│   │   │       ├── 7-assertions.md
│   │   │       ├── adapters.md
│   │   │       ├── language_models.md
│   │   │       ├── mcp.md
│   │   │       ├── modules.md
│   │   │       ├── overview.md
│   │   │       ├── signatures.md
│   │   │       └── tools.md
│   │   ├── production
│   │   │   └── index.md
│   │   ├── roadmap.md
│   │   ├── static
│   │   │   ├── .nojekyll
│   │   │   └── img
│   │   │       ├── dspy_logo.png
│   │   │       ├── logo.png
│   │   │       ├── mlflow-tracing-rag.png
│   │   │       ├── modular.png
│   │   │       ├── optimize.png
│   │   │       ├── undraw_docusaurus_mountain.svg
│   │   │       ├── undraw_docusaurus_react.svg
│   │   │       ├── undraw_docusaurus_tree.svg
│   │   │       └── universal_compatibility.png
│   │   ├── stylesheets
│   │   │   └── extra.css
│   │   └── tutorials
│   │       ├── agents
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-agent.png
│   │       ├── ai_text_game
│   │       │   └── index.md
│   │       ├── async
│   │       │   └── index.md
│   │       ├── audio
│   │       │   └── index.ipynb
│   │       ├── build_ai_program
│   │       │   └── index.md
│   │       ├── cache
│   │       │   └── index.md
│   │       ├── classification
│   │       │   └── index.md
│   │       ├── classification_finetuning
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-classification.png
│   │       ├── conversation_history
│   │       │   └── index.md
│   │       ├── core_development
│   │       │   └── index.md
│   │       ├── custom_module
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-custom-module.png
│   │       ├── customer_service_agent
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-customer-service-agent.png
│   │       ├── deployment
│   │       │   ├── dspy_mlflow_ui.png
│   │       │   └── index.md
│   │       ├── email_extraction
│   │       │   ├── index.md
│   │       │   └── mlflow-tracing-email-extraction.png
│   │       ├── entity_extraction
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-entity-extraction.png
│   │       ├── games
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-agent.png
│   │       ├── gepa_ai_program
│   │       │   └── index.md
│   │       ├── gepa_aime
│   │       │   ├── index.ipynb
│   │       │   ├── mlflow-tracing-gepa-aime.png
│   │       │   └── mlflow-tracking-gepa-aime-optimization.png
│   │       ├── gepa_facilitysupportanalyzer
│   │       │   ├── index.ipynb
│   │       │   ├── mlflow-tracing-gepa-support.png
│   │       │   └── mlflow-tracking-gepa-support-optimization.png
│   │       ├── gepa_papillon
│   │       │   ├── index.ipynb
│   │       │   ├── mlflow-tracing-gepa-papilon.png
│   │       │   └── mlflow-tracking-gepa-papilon-optimization.png
│   │       ├── image_generation_prompting
│   │       │   └── index.ipynb
│   │       ├── index.md
│   │       ├── llms_txt_generation
│   │       │   └── index.md
│   │       ├── math
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-math.png
│   │       ├── mcp
│   │       │   └── index.md
│   │       ├── mem0_react_agent
│   │       │   └── index.md
│   │       ├── multihop_search
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-multi-hop.png
│   │       ├── observability
│   │       │   ├── index.md
│   │       │   ├── mlflow_trace_ui_navigation.gif
│   │       │   ├── mlflow_trace_ui.png
│   │       │   └── mlflow_trace_view.png
│   │       ├── optimize_ai_program
│   │       │   └── index.md
│   │       ├── optimizer_tracking
│   │       │   ├── child_run.png
│   │       │   ├── experiment.png
│   │       │   ├── index.md
│   │       │   └── parent_run.png
│   │       ├── output_refinement
│   │       │   └── best-of-n-and-refine.md
│   │       ├── papillon
│   │       │   └── index.md
│   │       ├── program_of_thought
│   │       │   └── index.ipynb
│   │       ├── rag
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-rag.png
│   │       ├── real_world_examples
│   │       │   └── index.md
│   │       ├── rl_ai_program
│   │       │   └── index.md
│   │       ├── rl_multihop
│   │       │   └── index.ipynb
│   │       ├── rl_papillon
│   │       │   └── index.ipynb
│   │       ├── sample_code_generation
│   │       │   └── index.md
│   │       ├── saving
│   │       │   └── index.md
│   │       ├── streaming
│   │       │   └── index.md
│   │       ├── tool_use
│   │       │   └── index.ipynb
│   │       └── yahoo_finance_react
│   │           └── index.md
│   ├── mkdocs.yml
│   ├── overrides
│   │   ├── home.html
│   │   ├── main.html
│   │   └── partials
│   │       └── tabs.html
│   ├── Pipfile
│   ├── Pipfile.lock
│   ├── README.md
│   ├── requirements.txt
│   ├── scripts
│   │   ├── generate_api_docs.py
│   │   └── generate_api_summary.py
│   └── vercel.json
├── dspy
│   ├── __init__.py
│   ├── __metadata__.py
│   ├── adapters
│   │   ├── __init__.py
│   │   ├── baml_adapter.py
│   │   ├── base.py
│   │   ├── chat_adapter.py
│   │   ├── json_adapter.py
│   │   ├── two_step_adapter.py
│   │   ├── types
│   │   │   ├── __init__.py
│   │   │   ├── audio.py
│   │   │   ├── base_type.py
│   │   │   ├── citation.py
│   │   │   ├── code.py
│   │   │   ├── document.py
│   │   │   ├── history.py
│   │   │   ├── image.py
│   │   │   └── tool.py
│   │   ├── utils.py
│   │   └── xml_adapter.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── base_lm.py
│   │   ├── cache.py
│   │   ├── databricks.py
│   │   ├── embedding.py
│   │   ├── lm_local_arbor.py
│   │   ├── lm_local.py
│   │   ├── lm.py
│   │   ├── openai.py
│   │   ├── provider.py
│   │   └── utils_finetune.py
│   ├── datasets
│   │   ├── __init__.py
│   │   ├── alfworld
│   │   │   ├── __init__.py
│   │   │   ├── alfworld.py
│   │   │   └── base_config.yml
│   │   ├── colors.py
│   │   ├── dataloader.py
│   │   ├── dataset.py
│   │   ├── gsm8k.py
│   │   ├── hotpotqa.py
│   │   └── math.py
│   ├── dsp
│   │   ├── __init__.py
│   │   ├── colbertv2.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── dpr.py
│   │       ├── settings.py
│   │       └── utils.py
│   ├── evaluate
│   │   ├── __init__.py
│   │   ├── auto_evaluation.py
│   │   ├── evaluate.py
│   │   └── metrics.py
│   ├── experimental
│   │   └── __init__.py
│   ├── predict
│   │   ├── __init__.py
│   │   ├── aggregation.py
│   │   ├── avatar
│   │   │   ├── __init__.py
│   │   │   ├── avatar.py
│   │   │   ├── models.py
│   │   │   └── signatures.py
│   │   ├── best_of_n.py
│   │   ├── chain_of_thought.py
│   │   ├── code_act.py
│   │   ├── knn.py
│   │   ├── multi_chain_comparison.py
│   │   ├── parallel.py
│   │   ├── parameter.py
│   │   ├── predict.py
│   │   ├── program_of_thought.py
│   │   ├── react.py
│   │   ├── refine.py
│   │   └── retry.py
│   ├── primitives
│   │   ├── __init__.py
│   │   ├── base_module.py
│   │   ├── example.py
│   │   ├── module.py
│   │   ├── prediction.py
│   │   ├── python_interpreter.py
│   │   └── runner.js
│   ├── propose
│   │   ├── __init__.py
│   │   ├── dataset_summary_generator.py
│   │   ├── grounded_proposer.py
│   │   ├── propose_base.py
│   │   └── utils.py
│   ├── retrievers
│   │   ├── __init__.py
│   │   ├── databricks_rm.py
│   │   ├── embeddings.py
│   │   ├── retrieve.py
│   │   └── weaviate_rm.py
│   ├── signatures
│   │   ├── __init__.py
│   │   ├── field.py
│   │   ├── signature.py
│   │   └── utils.py
│   ├── streaming
│   │   ├── __init__.py
│   │   ├── messages.py
│   │   ├── streamify.py
│   │   └── streaming_listener.py
│   ├── teleprompt
│   │   ├── __init__.py
│   │   ├── avatar_optimizer.py
│   │   ├── bettertogether.py
│   │   ├── bootstrap_finetune.py
│   │   ├── bootstrap_trace.py
│   │   ├── bootstrap.py
│   │   ├── copro_optimizer.py
│   │   ├── ensemble.py
│   │   ├── gepa
│   │   │   ├── __init__.py
│   │   │   ├── gepa_utils.py
│   │   │   ├── gepa.py
│   │   │   └── instruction_proposal.py
│   │   ├── grpo.py
│   │   ├── infer_rules.py
│   │   ├── knn_fewshot.py
│   │   ├── mipro_optimizer_v2.py
│   │   ├── random_search.py
│   │   ├── signature_opt.py
│   │   ├── simba_utils.py
│   │   ├── simba.py
│   │   ├── teleprompt_optuna.py
│   │   ├── teleprompt.py
│   │   ├── utils.py
│   │   └── vanilla.py
│   └── utils
│       ├── __init__.py
│       ├── annotation.py
│       ├── asyncify.py
│       ├── caching.py
│       ├── callback.py
│       ├── dummies.py
│       ├── exceptions.py
│       ├── hasher.py
│       ├── inspect_history.py
│       ├── langchain_tool.py
│       ├── logging_utils.py
│       ├── mcp.py
│       ├── parallelizer.py
│       ├── saving.py
│       ├── syncify.py
│       ├── unbatchify.py
│       └── usage_tracker.py
├── LICENSE
├── pyproject.toml
├── README.md
├── tests
│   ├── __init__.py
│   ├── adapters
│   │   ├── test_adapter_utils.py
│   │   ├── test_baml_adapter.py
│   │   ├── test_base_type.py
│   │   ├── test_chat_adapter.py
│   │   ├── test_citation.py
│   │   ├── test_code.py
│   │   ├── test_document.py
│   │   ├── test_json_adapter.py
│   │   ├── test_tool.py
│   │   ├── test_two_step_adapter.py
│   │   └── test_xml_adapter.py
│   ├── callback
│   │   └── test_callback.py
│   ├── clients
│   │   ├── test_cache.py
│   │   ├── test_databricks.py
│   │   ├── test_embedding.py
│   │   ├── test_inspect_global_history.py
│   │   └── test_lm.py
│   ├── conftest.py
│   ├── datasets
│   │   └── test_dataset.py
│   ├── docs
│   │   └── test_mkdocs_links.py
│   ├── evaluate
│   │   ├── test_evaluate.py
│   │   └── test_metrics.py
│   ├── examples
│   │   └── test_baleen.py
│   ├── metadata
│   │   └── test_metadata.py
│   ├── predict
│   │   ├── test_aggregation.py
│   │   ├── test_best_of_n.py
│   │   ├── test_chain_of_thought.py
│   │   ├── test_code_act.py
│   │   ├── test_knn.py
│   │   ├── test_multi_chain_comparison.py
│   │   ├── test_parallel.py
│   │   ├── test_predict.py
│   │   ├── test_program_of_thought.py
│   │   ├── test_react.py
│   │   ├── test_refine.py
│   │   └── test_retry.py
│   ├── primitives
│   │   ├── resources
│   │   │   └── saved_program.json
│   │   ├── test_base_module.py
│   │   ├── test_example.py
│   │   ├── test_module.py
│   │   └── test_python_interpreter.py
│   ├── propose
│   │   └── test_grounded_proposer.py
│   ├── README.md
│   ├── reliability
│   │   ├── __init__.py
│   │   ├── complex_types
│   │   │   └── generated
│   │   │       ├── test_many_types_1
│   │   │       │   ├── inputs
│   │   │       │   │   ├── input1.json
│   │   │       │   │   └── input2.json
│   │   │       │   ├── program.py
│   │   │       │   └── schema.json
│   │   │       ├── test_nesting_1
│   │   │       │   ├── inputs
│   │   │       │   │   ├── input1.json
│   │   │       │   │   └── input2.json
│   │   │       │   ├── program.py
│   │   │       │   └── schema.json
│   │   │       └── test_nesting_2
│   │   │           ├── inputs
│   │   │           │   └── input1.json
│   │   │           ├── program.py
│   │   │           └── schema.json
│   │   ├── conftest.py
│   │   ├── generate
│   │   │   ├── __init__.py
│   │   │   ├── __main__.py
│   │   │   └── utils.py
│   │   ├── input_formats
│   │   │   └── generated
│   │   │       └── test_markdown_1
│   │   │           ├── inputs
│   │   │           │   ├── input1.json
│   │   │           │   └── input2.json
│   │   │           ├── program.py
│   │   │           └── schema.json
│   │   ├── README.md
│   │   ├── reliability_conf.yaml
│   │   ├── test_generated.py
│   │   ├── test_pydantic_models.py
│   │   └── utils.py
│   ├── retrievers
│   │   └── test_embeddings.py
│   ├── signatures
│   │   ├── test_adapter_image.py
│   │   ├── test_custom_types.py
│   │   └── test_signature.py
│   ├── streaming
│   │   └── test_streaming.py
│   ├── teleprompt
│   │   ├── gepa_dummy_lm_custom_component_selector_custom_instruction_proposer.json
│   │   ├── gepa_dummy_lm.json
│   │   ├── test_bootstrap_finetune.py
│   │   ├── test_bootstrap_trace.py
│   │   ├── test_bootstrap.py
│   │   ├── test_copro_optimizer.py
│   │   ├── test_ensemble.py
│   │   ├── test_finetune.py
│   │   ├── test_gepa_instruction_proposer.py
│   │   ├── test_gepa.py
│   │   ├── test_grpo.py
│   │   ├── test_knn_fewshot.py
│   │   ├── test_random_search.py
│   │   ├── test_teleprompt.py
│   │   └── test_utils.py
│   ├── test_utils
│   │   ├── __init__.py
│   │   └── server
│   │       ├── __init__.py
│   │       ├── litellm_server_config.yaml
│   │       └── litellm_server.py
│   └── utils
│       ├── __init__.py
│       ├── resources
│       │   └── mcp_server.py
│       ├── test_annotation.py
│       ├── test_asyncify.py
│       ├── test_exceptions.py
│       ├── test_langchain_tool.py
│       ├── test_mcp.py
│       ├── test_parallelizer.py
│       ├── test_saving.py
│       ├── test_settings.py
│       ├── test_syncify.py
│       ├── test_unbatchify.py
│       └── test_usage_tracker.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/tests/adapters/test_citation.py:
--------------------------------------------------------------------------------

```python
import pydantic
import pytest

import dspy
from dspy.experimental import Citations


def test_citation_validate_input():
    citation = Citations.Citation(
        cited_text="The Earth orbits the Sun.",
        document_index=0,
        start_char_index=0,
        end_char_index=23,
        supported_text="The Earth orbits the Sun."
    )
    assert citation.cited_text == "The Earth orbits the Sun."
    assert citation.document_index == 0
    assert citation.start_char_index == 0
    assert citation.end_char_index == 23
    assert citation.type == "char_location"
    assert citation.supported_text == "The Earth orbits the Sun."

    with pytest.raises(pydantic.ValidationError):
        Citations.Citation(cited_text="text")


def test_citations_in_nested_type():
    class Wrapper(pydantic.BaseModel):
        citations: Citations

    citation = Citations.Citation(
        cited_text="Hello, world!",
        document_index=0,
        start_char_index=0,
        end_char_index=13,
        supported_text="Hello, world!"
    )
    citations = Citations(citations=[citation])
    wrapper = Wrapper(citations=citations)
    assert wrapper.citations.citations[0].cited_text == "Hello, world!"


def test_citation_with_all_fields():
    citation = Citations.Citation(
        cited_text="Water boils at 100°C.",
        document_index=1,
        document_title="Physics Facts",
        start_char_index=10,
        end_char_index=31,
        supported_text="Water boils at 100°C."
    )
    assert citation.cited_text == "Water boils at 100°C."
    assert citation.document_index == 1
    assert citation.document_title == "Physics Facts"
    assert citation.start_char_index == 10
    assert citation.end_char_index == 31
    assert citation.supported_text == "Water boils at 100°C."


def test_citation_format():
    citation = Citations.Citation(
        cited_text="The sky is blue.",
        document_index=0,
        document_title="Weather Guide",
        start_char_index=5,
        end_char_index=21,
        supported_text="The sky is blue."
    )

    formatted = citation.format()

    assert formatted["type"] == "char_location"
    assert formatted["cited_text"] == "The sky is blue."
    assert formatted["document_index"] == 0
    assert formatted["document_title"] == "Weather Guide"
    assert formatted["start_char_index"] == 5
    assert formatted["end_char_index"] == 21
    assert formatted["supported_text"] == "The sky is blue."


def test_citations_format():
    citations = Citations(citations=[
        Citations.Citation(
            cited_text="First citation",
            document_index=0,
            start_char_index=0,
            end_char_index=14,
            supported_text="First citation"
        ),
        Citations.Citation(
            cited_text="Second citation",
            document_index=1,
            document_title="Source",
            start_char_index=20,
            end_char_index=35,
            supported_text="Second citation"
        )
    ])

    formatted = citations.format()

    assert isinstance(formatted, list)
    assert len(formatted) == 2
    assert formatted[0]["cited_text"] == "First citation"
    assert formatted[1]["cited_text"] == "Second citation"
    assert formatted[1]["document_title"] == "Source"


def test_citations_from_dict_list():
    citations_data = [
        {
            "cited_text": "The sky is blue",
            "document_index": 0,
            "document_title": "Weather Guide",
            "start_char_index": 0,
            "end_char_index": 15,
            "supported_text": "The sky was blue yesterday."
        }
    ]

    citations = Citations.from_dict_list(citations_data)

    assert len(citations.citations) == 1
    assert citations.citations[0].cited_text == "The sky is blue"
    assert citations.citations[0].document_title == "Weather Guide"


def test_citations_postprocessing():
    from dspy.adapters.chat_adapter import ChatAdapter
    from dspy.signatures.signature import Signature

    class CitationSignature(Signature):
        """Test signature with citations."""
        question: str = dspy.InputField()
        answer: str = dspy.OutputField()
        citations: Citations = dspy.OutputField()

    adapter = ChatAdapter(native_response_types=[Citations])

    outputs = [{
        "text": "[[ ## answer ## ]]\nThe answer is blue.\n\n[[ ## citations ## ]]\n[]",
        "citations": [
            {
                "cited_text": "The sky is blue",
                "document_index": 0,
                "document_title": "Weather Guide",
                "start_char_index": 10,
                "end_char_index": 25,
                "supported_text": "The sky is blue"
            }
        ]
    }]

    result = adapter._call_postprocess(
        CitationSignature.delete("citations"),
        CitationSignature,
        outputs,
        dspy.LM(model="claude-3-5-sonnet-20241022")
    )

    assert len(result) == 1
    assert "citations" in result[0]
    assert isinstance(result[0]["citations"], Citations)
    assert len(result[0]["citations"]) == 1
    assert result[0]["citations"][0].cited_text == "The sky is blue"


def test_citation_extraction_from_lm_response():
    from unittest.mock import MagicMock

    mock_choice = MagicMock(message=MagicMock(provider_specific_fields={"citations": [[
        {
            "type": "char_location",
            "cited_text": "The sky is blue",
            "document_index": 0,
            "document_title": "Weather Guide",
            "start_char_index": 10,
            "end_char_index": 25,
            "supported_text": "The sky is blue"
        }
    ]]}))

    lm = dspy.LM(model="test")
    citations = lm._extract_citations_from_response(mock_choice)

    assert citations is not None
    assert len(citations) == 1
    assert citations[0]["cited_text"] == "The sky is blue"
    assert citations[0]["document_index"] == 0
    assert citations[0]["document_title"] == "Weather Guide"
    assert citations[0]["start_char_index"] == 10
    assert citations[0]["end_char_index"] == 25
    assert citations[0]["supported_text"] == "The sky is blue"

```

--------------------------------------------------------------------------------
/dspy/primitives/prediction.py:
--------------------------------------------------------------------------------

```python
from dspy.primitives.example import Example


class Prediction(Example):
    """A prediction object that contains the output of a DSPy module.
    
    Prediction inherits from Example.
    
    To allow feedback-augmented scores, Prediction supports comparison operations
    (<, >, <=, >=) for Predictions with a `score` field. The comparison operations
    compare the 'score' values as floats. For equality comparison, Predictions are equal
    if their underlying data stores are equal (inherited from Example).
    
    Arithmetic operations (+, /, etc.) are also supported for Predictions with a 'score'
    field, operating on the score value.
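
    Example (illustrative; assumes a Prediction with a numeric `score` field):
        >>> pred = Prediction(answer="Paris", score=0.8)
        >>> float(pred)
        0.8
        >>> pred > 0.5
        True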
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        del self._demos
        del self._input_keys

        self._completions = None
        self._lm_usage = None

    def get_lm_usage(self):
        return self._lm_usage

    def set_lm_usage(self, value):
        self._lm_usage = value

    @classmethod
    def from_completions(cls, list_or_dict, signature=None):
        obj = cls()
        obj._completions = Completions(list_or_dict, signature=signature)
        obj._store = {k: v[0] for k, v in obj._completions.items()}

        return obj

    def __repr__(self):
        store_repr = ",\n    ".join(f"{k}={v!r}" for k, v in self._store.items())

        if self._completions is None or len(self._completions) == 1:
            return f"Prediction(\n    {store_repr}\n)"

        num_completions = len(self._completions)
        return f"Prediction(\n    {store_repr},\n    completions=Completions(...)\n) ({num_completions-1} completions omitted)"

    def __str__(self):
        return self.__repr__()

    def __float__(self):
        if "score" not in self._store:
            raise ValueError("Prediction object does not have a 'score' field to convert to float.")
        return float(self._store["score"])

    def __add__(self, other):
        if isinstance(other, (float, int)):
            return self.__float__() + other
        elif isinstance(other, Prediction):
            return self.__float__() + float(other)
        raise TypeError(f"Unsupported type for addition: {type(other)}")

    def __radd__(self, other):
        if isinstance(other, (float, int)):
            return other + self.__float__()
        elif isinstance(other, Prediction):
            return float(other) + self.__float__()
        raise TypeError(f"Unsupported type for addition: {type(other)}")

    def __truediv__(self, other):
        if isinstance(other, (float, int)):
            return self.__float__() / other
        elif isinstance(other, Prediction):
            return self.__float__() / float(other)
        raise TypeError(f"Unsupported type for division: {type(other)}")

    def __rtruediv__(self, other):
        if isinstance(other, (float, int)):
            return other / self.__float__()
        elif isinstance(other, Prediction):
            return float(other) / self.__float__()
        raise TypeError(f"Unsupported type for division: {type(other)}")

    def __lt__(self, other):
        if isinstance(other, (float, int)):
            return self.__float__() < other
        elif isinstance(other, Prediction):
            return self.__float__() < float(other)
        raise TypeError(f"Unsupported type for comparison: {type(other)}")

    def __le__(self, other):
        if isinstance(other, (float, int)):
            return self.__float__() <= other
        elif isinstance(other, Prediction):
            return self.__float__() <= float(other)
        raise TypeError(f"Unsupported type for comparison: {type(other)}")

    def __gt__(self, other):
        if isinstance(other, (float, int)):
            return self.__float__() > other
        elif isinstance(other, Prediction):
            return self.__float__() > float(other)
        raise TypeError(f"Unsupported type for comparison: {type(other)}")

    def __ge__(self, other):
        if isinstance(other, (float, int)):
            return self.__float__() >= other
        elif isinstance(other, Prediction):
            return self.__float__() >= float(other)
        raise TypeError(f"Unsupported type for comparison: {type(other)}")

    @property
    def completions(self):
        return self._completions


class Completions:
    def __init__(self, list_or_dict, signature=None):
        self.signature = signature

        if isinstance(list_or_dict, list):
            kwargs = {}
            for arg in list_or_dict:
                for k, v in arg.items():
                    kwargs.setdefault(k, []).append(v)
        else:
            kwargs = list_or_dict

        assert all(isinstance(v, list) for v in kwargs.values()), "All values must be lists"

        if kwargs:
            length = len(next(iter(kwargs.values())))
            assert all(len(v) == length for v in kwargs.values()), "All lists must have the same length"

        self._completions = kwargs

    def items(self):
        return self._completions.items()

    def __getitem__(self, key):
        if isinstance(key, int):
            if key < 0 or key >= len(self):
                raise IndexError("Index out of range")

            return Prediction(**{k: v[key] for k, v in self._completions.items()})

        return self._completions[key]

    def __getattr__(self, name):
        if name == "_completions":
            raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
        if name in self._completions:
            return self._completions[name]

        raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")

    def __len__(self):
        # Return the length of the list for one of the keys
        # It assumes all lists have the same length
        return len(next(iter(self._completions.values())))

    def __contains__(self, key):
        return key in self._completions

    def __repr__(self):
        items_repr = ",\n    ".join(f"{k}={v!r}" for k, v in self._completions.items())
        return f"Completions(\n    {items_repr}\n)"

    def __str__(self):
        # return str(self._completions)
        return self.__repr__()

```

--------------------------------------------------------------------------------
/tests/teleprompt/test_copro_optimizer.py:
--------------------------------------------------------------------------------

```python
import dspy
from dspy import Example
from dspy.teleprompt.signature_opt import COPRO
from dspy.utils.dummies import DummyLM


# Define a simple metric function for testing
def simple_metric(example, prediction):
    # Simplified metric for testing: true if prediction matches expected output
    return example.output == prediction.output


# Example training and validation sets
trainset = [
    Example(input="Question: What is the color of the sky?", output="blue").with_inputs("input"),
    Example(input="Question: What does the fox say?", output="Ring-ding-ding-ding-dingeringeding!").with_inputs(
        "input"
    ),
]


def test_signature_optimizer_initialization():
    optimizer = COPRO(metric=simple_metric, breadth=2, depth=1, init_temperature=1.4)
    assert optimizer.metric == simple_metric, "Metric not correctly initialized"
    assert optimizer.breadth == 2, "Breadth not correctly initialized"
    assert optimizer.depth == 1, "Depth not correctly initialized"
    assert optimizer.init_temperature == 1.4, "Initial temperature not correctly initialized"


class SimpleModule(dspy.Module):
    def __init__(self, signature):
        super().__init__()
        # COPRO doesn't work with dspy.Predict
        self.predictor = dspy.ChainOfThought(signature)

    def forward(self, **kwargs):
        return self.predictor(**kwargs)


def test_signature_optimizer_optimization_process():
    optimizer = COPRO(metric=simple_metric, breadth=2, depth=1, init_temperature=1.4)
    dspy.settings.configure(
        lm=DummyLM(
            [
                {
                    "proposed_instruction": "Optimized instruction 1",
                    "proposed_prefix_for_output_field": "Optimized instruction 2",
                },
            ]
        )
    )

    student = SimpleModule("input -> output")

    # Assuming the compile method of COPRO requires a student module, a development set, and evaluation kwargs
    optimized_student = optimizer.compile(
        student, trainset=trainset, eval_kwargs={"num_threads": 1, "display_progress": False}
    )

    # Check that the optimized student has been modified from the original
    # This check can be more specific based on how the optimization modifies the student
    assert optimized_student is not student, "Optimization did not modify the student"

    # Further tests can be added to verify the specifics of the optimization process,
    # such as checking the instructions of the optimized student's predictors.


def test_signature_optimizer_statistics_tracking():
    optimizer = COPRO(metric=simple_metric, breadth=2, depth=1, init_temperature=1.4)
    optimizer.track_stats = True  # Enable statistics tracking

    dspy.settings.configure(
        lm=DummyLM(
            [
                {
                    "proposed_instruction": "Optimized instruction 1",
                    "proposed_prefix_for_output_field": "Optimized instruction 2",
                },
            ]
        )
    )
    student = SimpleModule("input -> output")
    optimized_student = optimizer.compile(
        student, trainset=trainset, eval_kwargs={"num_threads": 1, "display_progress": False}
    )

    # Verify that statistics have been tracked and attached to the optimized student
    assert hasattr(optimized_student, "total_calls"), "Total calls statistic not tracked"
    assert hasattr(optimized_student, "results_best"), "Best results statistics not tracked"


# Assuming the setup_signature_optimizer fixture and simple_metric function are defined as before


def test_optimization_and_output_verification():
    lm = DummyLM(
        [
            {"proposed_instruction": "Optimized Prompt", "proposed_prefix_for_output_field": "Optimized Prefix"},
            {"reasoning": "france", "output": "Paris"},
            {"reasoning": "france", "output": "Paris"},
            {"reasoning": "france", "output": "Paris"},
            {"reasoning": "france", "output": "Paris"},
            {"reasoning": "france", "output": "Paris"},
            {"reasoning": "france", "output": "Paris"},
            {"reasoning": "france", "output": "Paris"},
        ]
    )
    dspy.settings.configure(lm=lm)
    optimizer = COPRO(metric=simple_metric, breadth=2, depth=1, init_temperature=1.4)

    student = SimpleModule("input -> output")

    # Compile the student with the optimizer
    optimized_student = optimizer.compile(
        student, trainset=trainset, eval_kwargs={"num_threads": 1, "display_progress": False}
    )

    # Simulate calling the optimized student with a new input
    test_input = "What is the capital of France?"
    prediction = optimized_student(input=test_input)

    print(lm.get_convo(-1))

    assert prediction.output == "Paris"


def test_statistics_tracking_during_optimization():
    dspy.settings.configure(
        lm=DummyLM(
            [
                {"proposed_instruction": "Optimized Prompt", "proposed_prefix_for_output_field": "Optimized Prefix"},
            ]
        )
    )

    optimizer = COPRO(metric=simple_metric, breadth=2, depth=1, init_temperature=1.4)
    optimizer.track_stats = True  # Enable statistics tracking

    student = SimpleModule("input -> output")
    optimized_student = optimizer.compile(
        student, trainset=trainset, eval_kwargs={"num_threads": 1, "display_progress": False}
    )

    # Verify that statistics have been tracked
    assert hasattr(optimized_student, "total_calls"), "Optimizer did not track total metric calls"
    assert optimized_student.total_calls > 0, "Optimizer reported no metric calls"

    # Check if the results_best and results_latest contain valid statistics
    assert "results_best" in optimized_student.__dict__, "Optimizer did not track the best results"
    assert "results_latest" in optimized_student.__dict__, "Optimizer did not track the latest results"
    assert len(optimized_student.results_best) > 0, "Optimizer did not properly populate the best results statistics"
    assert (
        len(optimized_student.results_latest) > 0
    ), "Optimizer did not properly populate the latest results statistics"

    # Additional detailed checks can be added here to verify the contents of the tracked statistics

```

--------------------------------------------------------------------------------
/docs/docs/learn/evaluation/metrics.md:
--------------------------------------------------------------------------------

```markdown
---
sidebar_position: 5
---

# Metrics

DSPy is a machine learning framework, so you must think about your **automatic metrics** for evaluation (to track your progress) and optimization (so DSPy can make your programs more effective).


## What is a metric and how do I define a metric for my task?

A metric is just a function that will take examples from your data and the output of your system and return a score that quantifies how good the output is. What makes outputs from your system good or bad? 

For simple tasks, this could be just "accuracy" or "exact match" or "F1 score". This may be the case for simple classification or short-form QA tasks.

However, for most applications, your system will produce long-form outputs. There, your metric should probably be a smaller DSPy program that checks multiple properties of the output (quite possibly using AI feedback from LMs).

Getting this right on the first try is unlikely, but you should start with something simple and iterate. 


## Simple metrics

A DSPy metric is just a function in Python that takes `example` (e.g., from your training or dev set) and the output `pred` from your DSPy program, and outputs a `float` (or `int` or `bool`) score.

Your metric should also accept an optional third argument called `trace`. You can ignore this for a moment, but it will enable some powerful tricks if you want to use your metric for optimization.

Here's a simple example of a metric that compares `example.answer` and `pred.answer`. This particular metric returns a `bool`.

```python
def validate_answer(example, pred, trace=None):
    return example.answer.lower() == pred.answer.lower()
```

Some people find these built-in utilities convenient (see the short sketch after the list):

- `dspy.evaluate.metrics.answer_exact_match`
- `dspy.evaluate.metrics.answer_passage_match`
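
For instance, here is a minimal sketch of calling one of them, assuming it follows the same `(example, pred, trace=None)` convention as the custom metrics on this page:

```python
import dspy
from dspy.evaluate.metrics import answer_exact_match

example = dspy.Example(question="What is the capital of France?", answer="Paris").with_inputs("question")
pred = dspy.Prediction(answer="Paris")

print(answer_exact_match(example, pred))  # True when pred.answer matches example.answer
```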

Your metrics could be more complex, e.g. check for multiple properties. The metric below will return a `float` if `trace is None` (i.e., if it's used for evaluation or optimization), and will return a `bool` otherwise (i.e., if it's used to bootstrap demonstrations).

```python
def validate_context_and_answer(example, pred, trace=None):
    # check the gold label and the predicted answer are the same
    answer_match = example.answer.lower() == pred.answer.lower()

    # check the predicted answer comes from one of the retrieved contexts
    context_match = any((pred.answer.lower() in c) for c in pred.context)

    if trace is None: # if we're doing evaluation or optimization
        return (answer_match + context_match) / 2.0
    else: # if we're doing bootstrapping, i.e. self-generating good demonstrations of each step
        return answer_match and context_match
```

Defining a good metric is an iterative process, so doing some initial evaluations and looking at your data and outputs is key.


## Evaluation

Once you have a metric, you can run evaluations in a simple Python loop.

```python
scores = []
for x in devset:
    pred = program(**x.inputs())
    score = metric(x, pred)
    scores.append(score)
```

If you need some utilities, you can also use the built-in `Evaluate` utility. It can help with things like parallel evaluation (multiple threads) or showing you a sample of inputs/outputs and the metric scores.

```python
from dspy.evaluate import Evaluate

# Set up the evaluator, which can be re-used in your code.
evaluator = Evaluate(devset=YOUR_DEVSET, num_threads=1, display_progress=True, display_table=5)

# Launch evaluation.
evaluator(YOUR_PROGRAM, metric=YOUR_METRIC)
```


## Intermediate: Using AI feedback for your metric

For most applications, your system will produce long-form outputs, so your metric should check multiple dimensions of the output using AI feedback from LMs.

This simple signature could come in handy.

```python
# Define the signature for automatic assessments.
class Assess(dspy.Signature):
    """Assess the quality of a tweet along the specified dimension."""

    assessed_text = dspy.InputField()
    assessment_question = dspy.InputField()
    assessment_answer: bool = dspy.OutputField()
```

For example, below is a simple metric that checks whether a generated tweet (1) answers a given question correctly and (2) is engaging. We also check that (3) `len(tweet) <= 280` characters.

```python
def metric(gold, pred, trace=None):
    question, answer, tweet = gold.question, gold.answer, pred.output

    engaging = "Does the assessed text make for a self-contained, engaging tweet?"
    correct = f"The text should answer `{question}` with `{answer}`. Does the assessed text contain this answer?"
    
    correct =  dspy.Predict(Assess)(assessed_text=tweet, assessment_question=correct)
    engaging = dspy.Predict(Assess)(assessed_text=tweet, assessment_question=engaging)

    correct, engaging = [m.assessment_answer for m in [correct, engaging]]
    score = (correct + engaging) if correct and (len(tweet) <= 280) else 0

    if trace is not None: return score >= 2
    return score / 2.0
```

When compiling, `trace is not None`, and we want to be strict about judging things, so we will only return `True` if `score >= 2`. Otherwise, we return a score out of 1.0 (i.e., `score / 2.0`).


## Advanced: Using a DSPy program as your metric

If your metric is itself a DSPy program, one of the most powerful ways to iterate is to compile (optimize) the metric itself. That's usually easy because the metric's output is typically a simple value (e.g., a score out of 5), so the metric's own metric is easy to define and optimize by collecting a few examples.
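
As a minimal sketch of this idea (the `RateAnswer` signature and `ProgramMetric` module below are illustrative names, not an API DSPy ships):

```python
import dspy

class RateAnswer(dspy.Signature):
    """Rate how well the answer addresses the question, from 1 to 5."""

    question = dspy.InputField()
    answer = dspy.InputField()
    rating: int = dspy.OutputField()

class ProgramMetric(dspy.Module):
    def __init__(self):
        super().__init__()
        self.rate = dspy.ChainOfThought(RateAnswer)

    def forward(self, example, pred, trace=None):
        # Use an LM judge to score the prediction, then map the 1-5 rating to [0, 1].
        rating = self.rate(question=example.question, answer=pred.answer).rating
        return rating / 5.0
```

Because this metric is itself a DSPy program, it can be compiled like any other: collect a handful of (example, prediction) pairs with human-assigned scores, define a simple "metric for the metric" (e.g., agreement with the human score), and optimize `ProgramMetric` with one of the optimizers.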



### Advanced: Accessing the `trace`

When your metric is used during evaluation runs, DSPy will not try to track the steps of your program.

But during compiling (optimization), DSPy will trace your LM calls. The trace will contain inputs/outputs to each DSPy predictor and you can leverage that to validate intermediate steps for optimization.


```python
def validate_hops(example, pred, trace=None):
    hops = [example.question] + [outputs.query for *_, outputs in trace if 'query' in outputs]

    if max([len(h) for h in hops]) > 100: return False
    if any(dspy.evaluate.answer_exact_match_str(hops[idx], hops[:idx], frac=0.8) for idx in range(2, len(hops))): return False

    return True
```

```

--------------------------------------------------------------------------------
/tests/predict/test_parallel.py:
--------------------------------------------------------------------------------

```python
import dspy
from dspy.utils.dummies import DummyLM


def test_parallel_module():
    lm = DummyLM(
        [
            {"output": "test output 1"},
            {"output": "test output 2"},
            {"output": "test output 3"},
            {"output": "test output 4"},
            {"output": "test output 5"},
        ]
    )
    dspy.settings.configure(lm=lm)

    class MyModule(dspy.Module):
        def __init__(self):
            super().__init__()
            self.predictor = dspy.Predict("input -> output")
            self.predictor2 = dspy.Predict("input -> output")

            self.parallel = dspy.Parallel(num_threads=2)

        def forward(self, input):
            return self.parallel(
                [
                    (self.predictor, input),
                    (self.predictor2, input),
                    (self.predictor, input),
                    (self.predictor2, input),
                    (self.predictor, input),
                ]
            )

    output = MyModule()(dspy.Example(input="test input").with_inputs("input"))

    expected_outputs = {f"test output {i}" for i in range(1, 6)}
    assert {r.output for r in output} == expected_outputs


def test_batch_module():
    lm = DummyLM(
        [
            {"output": "test output 1"},
            {"output": "test output 2"},
            {"output": "test output 3"},
            {"output": "test output 4"},
            {"output": "test output 5"},
        ]
    )
    res_lm = DummyLM(
        [
            {"output": "test output 1", "reasoning": "test reasoning 1"},
            {"output": "test output 2", "reasoning": "test reasoning 2"},
            {"output": "test output 3", "reasoning": "test reasoning 3"},
            {"output": "test output 4", "reasoning": "test reasoning 4"},
            {"output": "test output 5", "reasoning": "test reasoning 5"},
        ]
    )

    class MyModule(dspy.Module):
        def __init__(self):
            super().__init__()
            self.predictor = dspy.Predict("input -> output")
            self.predictor2 = dspy.Predict("input -> output, reasoning")

            self.parallel = dspy.Parallel(num_threads=2)

        def forward(self, input):
            with dspy.context(lm=lm):
                res1 = self.predictor.batch([input] * 5)

            with dspy.context(lm=res_lm):
                res2 = self.predictor2.batch([input] * 5)

            return (res1, res2)

    result, reason_result = MyModule()(dspy.Example(input="test input").with_inputs("input"))

    # Check that we got all expected outputs without caring about order
    expected_outputs = {f"test output {i}" for i in range(1, 6)}
    assert {r.output for r in result} == expected_outputs
    assert {r.output for r in reason_result} == expected_outputs

    # Check that reasoning matches outputs for reason_result
    for r in reason_result:
        num = r.output.split()[-1]  # get the number from "test output X"
        assert r.reasoning == f"test reasoning {num}"


def test_nested_parallel_module():
    lm = DummyLM(
        [
            {"output": "test output 1"},
            {"output": "test output 2"},
            {"output": "test output 3"},
            {"output": "test output 4"},
            {"output": "test output 5"},
        ]
    )
    dspy.settings.configure(lm=lm)

    class MyModule(dspy.Module):
        def __init__(self):
            super().__init__()
            self.predictor = dspy.Predict("input -> output")
            self.predictor2 = dspy.Predict("input -> output")

            self.parallel = dspy.Parallel(num_threads=2)

        def forward(self, input):
            return self.parallel(
                [
                    (self.predictor, input),
                    (self.predictor2, input),
                    (
                        self.parallel,
                        [
                            (self.predictor2, input),
                            (self.predictor, input),
                        ],
                    ),
                ]
            )

    output = MyModule()(dspy.Example(input="test input").with_inputs("input"))

    # For nested structure, check first two outputs and nested outputs separately
    assert {output[0].output, output[1].output} <= {f"test output {i}" for i in range(1, 5)}
    assert {output[2][0].output, output[2][1].output} <= {f"test output {i}" for i in range(1, 5)}
    all_outputs = {output[0].output, output[1].output, output[2][0].output, output[2][1].output}
    assert len(all_outputs) == 4


def test_nested_batch_method():
    lm = DummyLM(
        [
            {"output": "test output 1"},
            {"output": "test output 2"},
            {"output": "test output 3"},
            {"output": "test output 4"},
            {"output": "test output 5"},
        ]
    )
    dspy.settings.configure(lm=lm)

    class MyModule(dspy.Module):
        def __init__(self):
            super().__init__()
            self.predictor = dspy.Predict("input -> output")

        def forward(self, input):
            res = self.predictor.batch([dspy.Example(input=input).with_inputs("input")] * 2)

            return res

    result = MyModule().batch([dspy.Example(input="test input").with_inputs("input")] * 2)

    assert {result[0][0].output, result[0][1].output, result[1][0].output, result[1][1].output} == {
        "test output 1",
        "test output 2",
        "test output 3",
        "test output 4",
    }


def test_batch_with_failed_examples():
    class FailingModule(dspy.Module):
        def forward(self, value: int) -> str:
            if value == 42:
                raise ValueError("test error")
            return f"success-{value}"

    module = FailingModule()

    examples = [
        dspy.Example(value=1).with_inputs("value"),
        dspy.Example(value=42).with_inputs("value"),  # This will fail
        dspy.Example(value=3).with_inputs("value"),
    ]

    results, failed_examples, exceptions = module.batch(
        examples,
        return_failed_examples=True,
        provide_traceback=True,
    )

    assert results == ["success-1", None, "success-3"]

    assert len(failed_examples) == 1
    assert failed_examples[0].inputs()["value"] == 42

    assert len(exceptions) == 1
    assert isinstance(exceptions[0], ValueError)
    assert str(exceptions[0]) == "test error"

```

--------------------------------------------------------------------------------
/.github/workflows/build_and_release.yml:
--------------------------------------------------------------------------------

```yaml
---
name: Publish Python 🐍 distributions 📦 to PyPI
on:
  push:
    tags:
      - "*"
jobs:
  extract-tag:
    runs-on: ubuntu-latest
    outputs:
      version: ${{ steps.extract_tag.outputs.tag }}
    steps:
      - uses: actions/checkout@v4
      - id: extract_tag
        name: Extract tag name
        run: echo "tag=$(echo $GITHUB_REF | cut -d / -f 3)" >> "$GITHUB_OUTPUT"

  build-and-publish-test-pypi:
    needs: extract-tag
    runs-on: ubuntu-latest
    environment:
      name: pypi
    permissions:
      id-token: write # IMPORTANT: mandatory for trusted publishing
      contents: write
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python 3.11
        uses: actions/setup-python@v3
        with:
          python-version: "3.11"
      - name: Install dependencies
        run: python3 -m pip install --upgrade setuptools wheel twine build semver packaging
      - name: Get correct version for TestPyPI release
        id: check_version
        run: |
          VERSION=${{ needs.extract-tag.outputs.version }}  
          PACKAGE_NAME="dspy-ai-test"
          echo "Checking if $VERSION for $PACKAGE_NAME exists on TestPyPI"  
          NEW_VERSION=$(python3 .github/workflows/build_utils/test_version.py $PACKAGE_NAME $VERSION)  
          echo "Version to be used for TestPyPI release: $NEW_VERSION"  
          echo "version=$NEW_VERSION" >> "$GITHUB_OUTPUT"
      - name: Update version in pyproject.toml
        run: sed -i '/#replace_package_version_marker/{n;s/version="[^"]*"/version="${{ steps.check_version.outputs.version }}"/;}' pyproject.toml
      - name: Update package name in pyproject.toml
        run: sed -i '/#replace_package_name_marker/{n;s/name="[^"]*"/name="dspy-ai-test"/;}' pyproject.toml
      - name: Build a binary wheel
        run: python3 -m build
      # Test the locally built wheel
      - name: Create test environment
        run: python -m venv test_before_testpypi
      - name: Test package installation and functionality
        run: |
          source test_before_testpypi/bin/activate
          # Install the locally built wheel and testing dependencies
          pip install dist/*.whl pytest pytest-asyncio
          pytest tests/metadata/test_metadata.py tests/predict
          deactivate
      # Publish to test-PyPI
      - name: Publish distribution 📦 to test-PyPI
        uses: pypa/gh-action-pypi-publish@release/v1 # This requires a trusted publisher to be setup in pypi/testpypi
        with:
          repository-url: https://test.pypi.org/legacy/

  # TODO: Add tests using dspy-ai-test

  build-and-publish-pypi:
    needs: [extract-tag, build-and-publish-test-pypi]
    # Only publish to PyPI if the repository owner is stanfordnlp
    if: github.repository_owner == 'stanfordnlp'
    runs-on: ubuntu-latest
    environment:
      name: pypi
    permissions:
      id-token: write # IMPORTANT: mandatory for trusted publishing
      contents: write
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python 3.11
        uses: actions/setup-python@v3
        with:
          python-version: "3.11"
      - name: Install dependencies
        run: python3 -m pip install --upgrade setuptools wheel twine build
      - name: Update version in pyproject.toml
        run: sed -i '/#replace_package_version_marker/{n;s/version *= *"[^"]*"/version="${{ needs.extract-tag.outputs.version }}"/;}' pyproject.toml
      - name: Update version in __metadata__.py
        run: sed -i '/#replace_package_version_marker/{n;s/__version__ *= *"[^"]*"/__version__="${{ needs.extract-tag.outputs.version }}"/;}' ./dspy/__metadata__.py
      # Publish to dspy
      - name: Update package name in pyproject.toml
        run: sed -i '/#replace_package_name_marker/{n;s/name *= *"[^"]*"/name="dspy"/;}' pyproject.toml
      - name: Update package name in metadata.py
        run: sed -i '/#replace_package_name_marker/{n;s/__name__ *= *"[^"]*"/__name__="dspy"/;}' ./dspy/__metadata__.py
      - name: Build a binary wheel
        run: python3 -m build
      # Test the locally built wheel before publishing to pypi
      - name: Create test environment
        run: python -m venv test_before_pypi
      - name: Test package installation and functionality
        run: |
          source test_before_pypi/bin/activate
          # Install the locally built wheel and testing dependencies
          pip install dist/*.whl pytest pytest-asyncio
          pytest tests/metadata/test_metadata.py tests/predict
          deactivate
          rm -r test_before_pypi
      - name: Publish distribution 📦 to PyPI (dspy)
        uses: pypa/gh-action-pypi-publish@release/v1
        with:
          attestations: false
      # Publish to dspy-ai
      - name: Update package name in pyproject.toml
        run: sed -i '/#replace_package_name_marker/{n;s/name *= *"[^"]*"/name="dspy-ai"/;}' .github/.internal_dspyai/pyproject.toml
      - name: Update version for dspy-ai release
        run: sed -i '/#replace_package_version_marker/{n;s/version *= *"[^"]*"/version="${{ needs.extract-tag.outputs.version }}"/;}' .github/.internal_dspyai/pyproject.toml
      - name: Update dspy dependency for dspy-ai release
        run: |
          sed -i '/#replace_dspy_version_marker/{n;s/dspy>=[^"]*/dspy>=${{ needs.extract-tag.outputs.version }}/;}' .github/.internal_dspyai/pyproject.toml
      - name: Build a binary wheel (dspy-ai)
        run: python3 -m build .github/.internal_dspyai/
      - name: Publish distribution 📦 to PyPI (dspy-ai)
        uses: pypa/gh-action-pypi-publish@release/v1
        with:
          attestations: false
          packages-dir: .github/.internal_dspyai/dist/
      - uses: stefanzweifel/git-auto-commit-action@v5 # auto commit changes to main
        with:
          commit_message: Update versions
          create_branch: true
          branch: release-${{ needs.extract-tag.outputs.version }}
      - name: Checkout main branch
        run: |
          git fetch origin
          git checkout main
      - name: Configure git user
        run: |
          git config --global user.email "[email protected]"
          git config --global user.name "Github Actions"
      - name: Merge release branch into main
        run: |
          git merge --no-ff release-${{ needs.extract-tag.outputs.version }}
      - name: Push changes to main
        run: |
          git push origin main

```

--------------------------------------------------------------------------------
/dspy/teleprompt/infer_rules.py:
--------------------------------------------------------------------------------

```python
import logging
import random

import numpy as np

import dspy
from dspy.evaluate.evaluate import Evaluate
from dspy.teleprompt import BootstrapFewShot

logger = logging.getLogger(__name__)


class InferRules(BootstrapFewShot):
    def __init__(self, num_candidates=10, num_rules=10, num_threads=None, teacher_settings=None, **kwargs):
        super().__init__(teacher_settings=teacher_settings, **kwargs)

        self.num_candidates = num_candidates
        self.num_rules = num_rules
        self.num_threads = num_threads
        self.rules_induction_program = RulesInductionProgram(num_rules, teacher_settings=teacher_settings)
        self.metric = kwargs.get("metric")
        self.max_errors = kwargs.get("max_errors")

    def compile(self, student, *, teacher=None, trainset, valset=None):
        if valset is None:
            train_size = int(0.5 * len(trainset))
            trainset, valset = trainset[:train_size], trainset[train_size:]

        super().compile(student, teacher=teacher, trainset=trainset)

        original_program = self.student.deepcopy()
        all_predictors = [p for p in original_program.predictors() if hasattr(p, "signature")]
        instructions_list = [p.signature.instructions for p in all_predictors]

        best_score = -np.inf
        best_program = None

        for candidate_idx in range(self.num_candidates):
            candidate_program = original_program.deepcopy()
            candidate_predictors = [p for p in candidate_program.predictors() if hasattr(p, "signature")]

            for i, predictor in enumerate(candidate_predictors):
                predictor.signature.instructions = instructions_list[i]

            for i, predictor in enumerate(candidate_predictors):
                rules = self.induce_natural_language_rules(predictor, trainset)
                predictor.signature.instructions = instructions_list[i]
                self.update_program_instructions(predictor, rules)

            score = self.evaluate_program(candidate_program, valset)

            if score > best_score:
                best_score = score
                best_program = candidate_program

            logger.info(f"Evaluated Candidate {candidate_idx + 1} with score {score}. Current best score: {best_score}")

        logger.info(f"Final best score: {best_score}")

        return best_program

    def induce_natural_language_rules(self, predictor, trainset):
        demos = self.get_predictor_demos(trainset, predictor)
        signature = predictor.signature
        while True:
            examples_text = self.format_examples(demos, signature)
            try:
                return self.rules_induction_program(examples_text)
            except Exception as e:
                assert (
                    isinstance(e, ValueError)
                    or e.__class__.__name__ == "BadRequestError"
                    or "ContextWindowExceededError" in str(e)
                )
                if len(demos) > 1:
                    demos = demos[:-1]
                else:
                    raise RuntimeError(
                        "Failed to generate natural language rules since a single example couldn't fit in the model's "
                        "context window."
                    ) from e

    def update_program_instructions(self, predictor, natural_language_rules):
        predictor.signature.instructions = (
            f"{predictor.signature.instructions}\n\n"
            f"Please adhere to the following rules when making your prediction:\n{natural_language_rules}"
        )

    def format_examples(self, demos, signature):
        examples_text = ""
        for demo in demos:
            input_fields = {k: v for k, v in demo.items() if k in signature.input_fields}
            output_fields = {k: v for k, v in demo.items() if k in signature.output_fields}
            input_text = "\n".join(f"{k}: {v}" for k, v in input_fields.items())
            output_text = "\n".join(f"{k}: {v}" for k, v in output_fields.items())
            examples_text += f"Input Fields:\n{input_text}\n\n=========\nOutput Fields:\n{output_text}\n\n"
        return examples_text

    def get_predictor_demos(self, trainset, predictor):
        # TODO: Consider how this handles "incomplete" demos.
        signature = predictor.signature
        return [
            {
                key: value
                for key, value in example.items()
                if key in signature.input_fields or key in signature.output_fields
            }
            for example in trainset
        ]

    def evaluate_program(self, program, dataset):
        effective_max_errors = (
            self.max_errors if self.max_errors is not None else dspy.settings.max_errors
        )
        evaluate = Evaluate(
            devset=dataset,
            metric=self.metric,
            num_threads=self.num_threads,
            max_errors=effective_max_errors,
            display_table=False,
            display_progress=True,
        )
        score = evaluate(program, metric=self.metric).score
        return score


class RulesInductionProgram(dspy.Module):
    def __init__(self, num_rules, teacher_settings=None):
        super().__init__()

        class CustomRulesInduction(dspy.Signature):
            __doc__ = (
                f"Given a set of examples, extract a list of {num_rules} concise and non-redundant natural language "
                "rules that provide clear guidance for performing the task. All rules should be actionable for a "
                "well-specified scope of examples of this general kind of task."
            )
            examples_text = dspy.InputField(desc="Text containing examples")
            natural_language_rules = dspy.OutputField(desc="Induced natural language rules")

        self.rules_induction = dspy.ChainOfThought(CustomRulesInduction)
        self.teacher_settings = teacher_settings or {}
        self.rng = random.Random(0)

    def forward(self, examples_text):
        with dspy.settings.context(**self.teacher_settings):
            # Generate rules with a fresh rollout and non-zero temperature.
            lm = dspy.settings.lm.copy(
                rollout_id=self.rng.randint(0, 10**9), temperature=1.0
            )
            with dspy.settings.context(lm=lm):
                rules = self.rules_induction(examples_text=examples_text).natural_language_rules

        return rules.strip()

```
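
For context, a hedged usage sketch for `InferRules`; the model name, metric, and dataset are placeholders rather than anything defined in this file:

```python
import dspy
from dspy.teleprompt.infer_rules import InferRules

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))  # assumed model name

def exact_match(example, prediction, trace=None):
    # Placeholder metric: compare the gold answer against the predicted answer.
    return example.answer == prediction.answer

program = dspy.ChainOfThought("question -> answer")
optimizer = InferRules(num_candidates=3, num_rules=5, metric=exact_match)

# `trainset` would be a list of dspy.Example objects with `question` and `answer` fields;
# when no valset is given, half of it is used for bootstrapping and half for validation.
# optimized_program = optimizer.compile(program, trainset=trainset)
```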

--------------------------------------------------------------------------------
/tests/primitives/test_python_interpreter.py:
--------------------------------------------------------------------------------

```python
import os
import random
import shutil

import pytest

from dspy.primitives.python_interpreter import InterpreterError, PythonInterpreter

# This test suite requires deno to be installed. Please install deno following https://docs.deno.com/runtime/getting_started/installation/
if shutil.which("deno") is None:
    pytest.skip(reason="Deno is not installed or not in PATH", allow_module_level=True)


def test_execute_simple_code():
    with PythonInterpreter() as interpreter:
        code = "print('Hello, World!')"
        result = interpreter.execute(code)
        assert result == "Hello, World!\n", "Simple print statement should return 'Hello World!\n'"


def test_import():
    with PythonInterpreter() as interpreter:
        code = "import math\nresult = math.sqrt(4)\nresult"
        result = interpreter.execute(code)
        assert result == 2, "Should be able to import and use math.sqrt"


def test_user_variable_definitions():
    with PythonInterpreter() as interpreter:
        code = "result = number + 1\nresult"
        result = interpreter.execute(code, variables={"number": 4})
        assert result == 5, "User variable assignment should work"


def test_failure_syntax_error():
    with PythonInterpreter() as interpreter:
        code = "+++"
        with pytest.raises(SyntaxError, match="Invalid Python syntax"):
            interpreter.execute(code)


def test_failure_zero_division():
    with PythonInterpreter() as interpreter:
        code = "1+0/0"
        with pytest.raises(InterpreterError, match="ZeroDivisionError"):
            interpreter.execute(code)


def test_exception_args():
    with PythonInterpreter() as interpreter:
        token = random.randint(1, 10**9)
        code = f"raise ValueError({token})"
        with pytest.raises(InterpreterError, match=rf"ValueError: \[{token}\]"):
            interpreter.execute(code)


def test_final_answer_trick():
    with PythonInterpreter() as interpreter:
        token = random.randint(1, 10**9)
        code = f"final_answer('The result is', {token})"
        result = interpreter(code)

        # They should maintain the same order
        assert result == ["The result is", token], "The returned results are differ, `final_answer` trick doesn't work"

def test_enable_env_vars_flag():
    os.environ["FOO_TEST_ENV"] = "test_value"

    with PythonInterpreter(enable_env_vars=None) as interpreter:
        code = "import os\nresult = os.getenv('FOO_TEST_ENV')\nresult"
        result = interpreter.execute(code)
        assert result == "", "Environment variables should be inaccessible without allow-env"

    with PythonInterpreter(enable_env_vars=["FOO_TEST_ENV"]) as interpreter:
        code = "import os\nresult = os.getenv('FOO_TEST_ENV')\nresult"
        result = interpreter.execute(code)
        assert result == "test_value", "Environment variables should be accessible with allow-env"



def test_read_file_access_control(tmp_path):
    testfile_path = tmp_path / "test_temp_file.txt"
    virtual_path = f"/sandbox/{testfile_path.name}"
    with open(testfile_path, "w") as f:
        f.write("test content")

    with PythonInterpreter(enable_read_paths=[str(testfile_path)]) as interpreter:
        code = (
            f"with open({virtual_path!r}, 'r') as f:\n"
            f"    data = f.read()\n"
            f"data"
        )
        result = interpreter.execute(code)
        assert result == "test content", "Test file should be accessible with enable_read_paths and specified file"

    with PythonInterpreter(enable_read_paths=None) as interpreter:
        code = (
            f"try:\n"
            f"    with open({virtual_path!r}, 'r') as f:\n"
            f"        data = f.read()\n"
            f"except Exception as e:\n"
            f"    data = str(e)\n"
            f"data"
        )
        result = interpreter.execute(code)
        assert ("PermissionDenied" in result or "denied" in result.lower() or "no such file" in result.lower()), "Test file should not be accessible without enable_read_paths"

def test_enable_write_flag(tmp_path):
    testfile_path = tmp_path / "test_temp_output.txt"
    virtual_path = f"/sandbox/{testfile_path.name}"

    with PythonInterpreter(enable_write_paths=None) as interpreter:
        code = (
            f"try:\n"
            f"    with open({virtual_path!r}, 'w') as f:\n"
            f"        f.write('blocked')\n"
            f"    result = 'wrote'\n"
            f"except Exception as e:\n"
            f"    result = str(e)\n"
            f"result"
        )
        result = interpreter.execute(code)
        assert ("PermissionDenied" in result or "denied" in result.lower() or "no such file" in result.lower()), "Test file should not be writable without enable_write_paths"

    with PythonInterpreter(enable_write_paths=[str(testfile_path)]) as interpreter:
        code = (
            f"with open({virtual_path!r}, 'w') as f:\n"
            f"    f.write('allowed')\n"
            f"'ok'"
        )
        result = interpreter.execute(code)
        assert result == "ok", "Test file should be writable with enable_write_paths"
    assert testfile_path.exists()
    with open(testfile_path) as f:
        assert f.read() == "allowed", "Test file outputs should match content written during execution"

    with open(testfile_path, "w") as f:
        f.write("original_content")
    with PythonInterpreter(enable_write_paths=[str(testfile_path)], sync_files=False) as interpreter:
        code = (
            f"with open({virtual_path!r}, 'w') as f:\n"
            f"    f.write('should_not_sync')\n"
            f"'done_no_sync'"
        )
        result = interpreter.execute(code)
        assert result == "done_no_sync"
    with open(testfile_path) as f:
        assert f.read() == "original_content", "File should not be changed when sync_files is False"



def test_enable_net_flag():
    test_url = "https://example.com"

    with PythonInterpreter(enable_network_access=None) as interpreter:
        code = (
            "import js\n"
            f"resp = await js.fetch({test_url!r})\n"
            "resp.status"
        )
        with pytest.raises(InterpreterError, match="PythonError"):
            interpreter.execute(code)

    with PythonInterpreter(enable_network_access=["example.com"]) as interpreter:
        code = (
            "import js\n"
            f"resp = await js.fetch({test_url!r})\n"
            "resp.status"
        )
        result = interpreter.execute(code)
        assert int(result) == 200, "Network access is permitted with enable_network_access"

```
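
For reference, a small standalone sketch of the sandbox API exercised by these tests; it assumes `deno` is installed, as noted at the top of the file:

```python
from dspy.primitives.python_interpreter import PythonInterpreter

with PythonInterpreter() as interpreter:
    # The value of the last expression is returned, and injected variables are available by name.
    print(interpreter.execute("result = number * 2\nresult", variables={"number": 21}))  # 42

# Capabilities are opt-in, mirroring the flags covered by the tests above.
with PythonInterpreter(enable_env_vars=["HOME"], enable_network_access=["example.com"]) as interpreter:
    print(interpreter.execute("import os\nos.getenv('HOME')"))
```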

--------------------------------------------------------------------------------
/dspy/adapters/types/base_type.py:
--------------------------------------------------------------------------------

```python
import json
import re
from typing import Any, Optional, get_args, get_origin

import json_repair
import pydantic
from litellm import ModelResponseStream

CUSTOM_TYPE_START_IDENTIFIER = "<<CUSTOM-TYPE-START-IDENTIFIER>>"
CUSTOM_TYPE_END_IDENTIFIER = "<<CUSTOM-TYPE-END-IDENTIFIER>>"


class Type(pydantic.BaseModel):
    """Base class to support creating custom types for DSPy signatures.

    This is the parent class of DSPy custom types, e.g., dspy.Image. Subclasses must implement the `format` method to
    return a list of dictionaries (same as the Array of content parts in the OpenAI API user message's content field).

    Example:

        ```python
        class Image(Type):
            url: str

            def format(self) -> list[dict[str, Any]]:
                return [{"type": "image_url", "image_url": {"url": self.url}}]
        ```
    """

    def format(self) -> list[dict[str, Any]] | str:
        raise NotImplementedError

    @classmethod
    def description(cls) -> str:
        """Description of the custom type"""
        return ""

    @classmethod
    def extract_custom_type_from_annotation(cls, annotation):
        """Extract all custom types from the annotation.

        This is used to extract all custom types from the annotation of a field, while the annotation can
        have arbitrary level of nesting. For example, we detect `Tool` is in `list[dict[str, Tool]]`.
        """
        # Direct match. A nested type like `list[dict[str, Event]]` passes `isinstance(annotation, type)` in Python 3.10
        # but fails in Python 3.11. To accommodate users on Python 3.10, we capture the error and ignore it.
        try:
            if isinstance(annotation, type) and issubclass(annotation, cls):
                return [annotation]
        except TypeError:
            pass

        origin = get_origin(annotation)
        if origin is None:
            return []

        result = []
        # Recurse into all type args
        for arg in get_args(annotation):
            result.extend(cls.extract_custom_type_from_annotation(arg))

        return result

    @pydantic.model_serializer()
    def serialize_model(self):
        formatted = self.format()
        if isinstance(formatted, list):
            return (
                f"{CUSTOM_TYPE_START_IDENTIFIER}{json.dumps(formatted, ensure_ascii=False)}{CUSTOM_TYPE_END_IDENTIFIER}"
            )
        return formatted

    @classmethod
    def is_streamable(cls) -> bool:
        """Whether the custom type is streamable."""
        return False

    @classmethod
    def parse_stream_chunk(cls, chunk: ModelResponseStream) -> Optional["Type"]:
        """
        Parse a stream chunk into the custom type.

        Args:
            chunk: A stream chunk.

        Returns:
            A custom type object or None if the chunk is not for this custom type.
        """
        return None


    @classmethod
    def parse_lm_response(cls, response: str | dict[str, Any]) -> Optional["Type"]:
        """Parse a LM response into the custom type.

        Args:
            response: A LM response.

        Returns:
            A custom type object.
        """
        return None

def split_message_content_for_custom_types(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Split user message content into a list of content blocks.

    This method splits each user message's content in the `messages` list to be a list of content block, so that
    the custom types like `dspy.Image` can be properly formatted for better quality. For example, the split content
    may look like below if the user message has a `dspy.Image` object:

    ```
    [
        {"type": "text", "text": "{text_before_image}"},
        {"type": "image_url", "image_url": {"url": "{image_url}"}},
        {"type": "text", "text": "{text_after_image}"},
    ]
    ```

    This is implemented by finding the `<<CUSTOM-TYPE-START-IDENTIFIER>>` and `<<CUSTOM-TYPE-END-IDENTIFIER>>`
    in the user message content and splitting the content around them. The `<<CUSTOM-TYPE-START-IDENTIFIER>>`
    and `<<CUSTOM-TYPE-END-IDENTIFIER>>` are the reserved identifiers for the custom types as in `dspy.Type`.

    Args:
        messages: a list of messages sent to the LM. The format is the same as [OpenAI API's messages
            format](https://platform.openai.com/docs/guides/chat-completions/response-format).

    Returns:
        A list of messages with the content split into a list of content blocks around custom types content.
    """
    for message in messages:
        if message["role"] != "user":
            # Custom type messages are only in user messages
            continue

        pattern = rf"{CUSTOM_TYPE_START_IDENTIFIER}(.*?){CUSTOM_TYPE_END_IDENTIFIER}"
        result = []
        last_end = 0
        # DSPy adapter always formats user input into a string content before custom type splitting
        content: str = message["content"]

        for match in re.finditer(pattern, content, re.DOTALL):
            start, end = match.span()

            # Add text before the current block
            if start > last_end:
                result.append({"type": "text", "text": content[last_end:start]})

            # Parse the JSON inside the block
            custom_type_content = match.group(1).strip()
            parsed = None

            for parse_fn in [json.loads, _parse_doubly_quoted_json, json_repair.loads]:
                try:
                    parsed = parse_fn(custom_type_content)
                    break
                except json.JSONDecodeError:
                    continue

            if parsed:
                for custom_type_content in parsed:
                    result.append(custom_type_content)
            else:
                # fallback to raw string if it's not valid JSON
                result.append({"type": "text", "text": custom_type_content})

            last_end = end

        if last_end == 0:
            # No custom type found, return the original message
            continue

        # Add any remaining text after the last match
        if last_end < len(content):
            result.append({"type": "text", "text": content[last_end:]})

        message["content"] = result

    return messages


def _parse_doubly_quoted_json(json_str: str) -> Any:
    """
    Parse a doubly quoted JSON string into a Python dict.
    `dspy.Type` can be JSON-encoded twice if included in a list or dict, e.g., `list[dspy.experimental.Document]`
    """
    return json.loads(json.loads(f'"{json_str}"'))

```
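
A hedged end-to-end sketch of how the pieces above fit together: a custom `Type` serializes itself between the reserved identifiers, and `split_message_content_for_custom_types` expands it back into OpenAI-style content blocks. The `Image` class mirrors the docstring example, the URL is illustrative, and the sketch assumes `model_dump()` routes through the `serialize_model` serializer defined above:

```python
from typing import Any

from dspy.adapters.types.base_type import Type, split_message_content_for_custom_types


class Image(Type):
    url: str

    def format(self) -> list[dict[str, Any]]:
        return [{"type": "image_url", "image_url": {"url": self.url}}]


image = Image(url="https://example.com/cat.png")
# model_dump() wraps the formatted JSON in the reserved start/end identifiers.
content = f"Describe this picture: {image.model_dump()}"
messages = split_message_content_for_custom_types([{"role": "user", "content": content}])
# messages[0]["content"] is now a list of content blocks, roughly:
# [{"type": "text", "text": "Describe this picture: "},
#  {"type": "image_url", "image_url": {"url": "https://example.com/cat.png"}}]
```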

--------------------------------------------------------------------------------
/docs/docs/tutorials/saving/index.md:
--------------------------------------------------------------------------------

```markdown
# Tutorial: Saving and Loading your DSPy program

This guide demonstrates how to save and load your DSPy program. At a high level, there are two ways to save your DSPy program:

1. Save the state of the program only, similar to weights-only saving in PyTorch.
2. Save the whole program, including both the architecture and the state, which is supported by `dspy>=2.6.0`.

## State-only Saving

State represents the DSPy program's internal state, including the signature, demos (few-shot examples), and other information like
the `lm` to use for each `dspy.Predict` in the program. It also includes configurable attributes of other DSPy modules like
`k` for `dspy.retrievers.Retriever`. To save the state of a program, use the `save` method and set `save_program=False`. You can
choose to save the state to a JSON file or a pickle file. We recommend saving the state to a JSON file because it is safer and
human-readable. However, if your program contains objects that cannot be serialized to JSON, such as `dspy.Image` or
`datetime.datetime`, you should save the state to a pickle file instead.

Let's say we have compiled a program with some data, and we want to save the program for future usage:

```python
import dspy
from dspy.datasets.gsm8k import GSM8K, gsm8k_metric

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))

gsm8k = GSM8K()
gsm8k_trainset = gsm8k.train[:10]
dspy_program = dspy.ChainOfThought("question -> answer")

optimizer = dspy.BootstrapFewShot(metric=gsm8k_metric, max_bootstrapped_demos=4, max_labeled_demos=4, max_rounds=5)
compiled_dspy_program = optimizer.compile(dspy_program, trainset=gsm8k_trainset)
```

To save the state of your program to a JSON file:

```python
compiled_dspy_program.save("./dspy_program/program.json", save_program=False)
```

To save the state of your program to a pickle file:

```python
compiled_dspy_program.save("./dspy_program/program.pkl", save_program=False)
```

To load your saved state, you need to **recreate the same program**, then load the state using the `load` method.

```python
loaded_dspy_program = dspy.ChainOfThought("question -> answer") # Recreate the same program.
loaded_dspy_program.load("./dspy_program/program.json")

assert len(compiled_dspy_program.demos) == len(loaded_dspy_program.demos)
for original_demo, loaded_demo in zip(compiled_dspy_program.demos, loaded_dspy_program.demos):
    # Loaded demo is a dict, while the original demo is a dspy.Example.
    assert original_demo.toDict() == loaded_demo
assert str(compiled_dspy_program.signature) == str(loaded_dspy_program.signature)
```

Or load the state from a pickle file:

```python
loaded_dspy_program = dspy.ChainOfThought("question -> answer") # Recreate the same program.
loaded_dspy_program.load("./dspy_program/program.pkl")

assert len(compiled_dspy_program.demos) == len(loaded_dspy_program.demos)
for original_demo, loaded_demo in zip(compiled_dspy_program.demos, loaded_dspy_program.demos):
    # Loaded demo is a dict, while the original demo is a dspy.Example.
    assert original_demo.toDict() == loaded_demo
assert str(compiled_dspy_program.signature) == str(loaded_dspy_program.signature)
```

## Whole Program Saving

Starting from `dspy>=2.6.0`, DSPy supports saving the whole program, including the architecture and the state. This feature
is powered by `cloudpickle`, which is a library for serializing and deserializing Python objects.

To save the whole program, use the `save` method and set `save_program=True`, and specify a directory path to save the program
instead of a file name. We require a directory path because we also save some metadata, e.g., the dependency versions along
with the program itself.

```python
compiled_dspy_program.save("./dspy_program/", save_program=True)
```

To load the saved program, use the `dspy.load` method directly:

```python
loaded_dspy_program = dspy.load("./dspy_program/")

assert len(compiled_dspy_program.demos) == len(loaded_dspy_program.demos)
for original_demo, loaded_demo in zip(compiled_dspy_program.demos, loaded_dspy_program.demos):
    # Loaded demo is a dict, while the original demo is a dspy.Example.
    assert original_demo.toDict() == loaded_demo
assert str(compiled_dspy_program.signature) == str(loaded_dspy_program.signature)
```

With whole program saving, you don't need to recreate the program, but can directly load the architecture along with the state.
You can pick the suitable saving approach based on your needs.

### Serializing Imported Modules

When saving a program with `save_program=True`, you might need to include custom modules that your program depends on. This is
necessary when those modules are not imported (and may not even be importable) in the environment where you later call `dspy.load`.

You can specify which custom modules should be serialized with your program by passing them to the `modules_to_serialize`
parameter when calling `save`. This ensures that any dependencies your program relies on are included during serialization and
available when loading the program later.

Under the hood this uses cloudpickle's `cloudpickle.register_pickle_by_value` function to register a module as picklable by value.
When a module is registered this way, cloudpickle will serialize the module by value rather than by reference, ensuring that the
module contents are preserved with the saved program.

For example, if your program uses custom modules:

```python
import dspy
import my_custom_module

compiled_dspy_program = dspy.ChainOfThought(my_custom_module.custom_signature)

# Save the program with the custom module
compiled_dspy_program.save(
    "./dspy_program/",
    save_program=True,
    modules_to_serialize=[my_custom_module]
)
```

This ensures that the required modules are properly serialized and available when loading the program later. Any number of
modules can be passed to `modules_to_serialize`. If you don't specify `modules_to_serialize`, no additional modules will be
registered for serialization.
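
For instance, in a fresh process where `my_custom_module` is not importable, loading still works because the module was serialized by value along with the program:

```python
# Run in a separate environment where `my_custom_module` is not installed.
import dspy

loaded_program = dspy.load("./dspy_program/")
```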

## Backward Compatibility

For `dspy<3.0.0`, we don't guarantee backward compatibility of saved programs. For example, if you save a program with `dspy==2.5.35`,
please load it with the same version of DSPy, otherwise the program may not work as expected. Loading a saved file with a different
version of DSPy often won't raise an error, but the resulting performance can differ from when the program was saved.
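
A simple precaution is to record the DSPy version used at save time and match it at load time, e.g. (assuming `dspy.__version__` is exposed by your installation):

```python
import dspy

print(dspy.__version__)  # record this next to the saved program, and install the same version before loading
```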

Starting from `dspy>=3.0.0`, we will guarantee backward compatibility of saved programs within a major release, i.e., programs saved with `dspy==3.0.0`
should be loadable with `dspy==3.7.10`.

```

--------------------------------------------------------------------------------
/dspy/dsp/utils/settings.py:
--------------------------------------------------------------------------------

```python
import asyncio
import contextvars
import copy
import threading
from contextlib import contextmanager

from dspy.dsp.utils.utils import dotdict

DEFAULT_CONFIG = dotdict(
    lm=None,
    adapter=None,
    rm=None,
    branch_idx=0,
    trace=[],
    callbacks=[],
    async_max_workers=8,
    send_stream=None,
    disable_history=False,
    track_usage=False,
    usage_tracker=None,
    caller_predict=None,
    caller_modules=None,
    stream_listeners=[],
    provide_traceback=False,  # Whether to include traceback information in error logs.
    num_threads=8,  # Number of threads to use for parallel processing.
    max_errors=10,  # Maximum errors before halting operations.
    # If true, async tools can be called in sync mode by getting converted to sync.
    allow_tool_async_sync_conversion=False,
    max_history_size=10000,
    max_trace_size=10000,
)

# Global base configuration and owner tracking
main_thread_config = copy.deepcopy(DEFAULT_CONFIG)
config_owner_thread_id = None
config_owner_async_task = None

# Global lock for settings configuration
global_lock = threading.Lock()

thread_local_overrides = contextvars.ContextVar("context_overrides", default=dotdict())


class Settings:
    """
    A singleton class for DSPy configuration settings.
    Thread-safe global configuration.
    - 'configure' can be called by only one 'owner' thread (the first thread that calls it).
    - Other threads see the configured global values from 'main_thread_config'.
    - 'context' sets thread-local overrides. These overrides propagate to threads spawned
      inside that context block, when (and only when!) using a ParallelExecutor that copies overrides.

      1. Only one unique thread (which can be any thread!) can call dspy.configure.
      2. It affects global state, visible to all threads. As a result, user threads work, but their reads shouldn't be
         mixed with concurrent dspy.configure calls from the owner ("main") thread.
         (TODO: In the future, add warnings: if there are near-in-time user-thread reads followed by .configure calls.)
      3. Any thread can use dspy.context. It propagates to child threads created with DSPy primitives: Parallel, asyncify, etc.
    """

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @property
    def lock(self):
        return global_lock

    def __getattr__(self, name):
        overrides = thread_local_overrides.get()
        if name in overrides:
            return overrides[name]
        elif name in main_thread_config:
            return main_thread_config[name]
        else:
            raise AttributeError(f"'Settings' object has no attribute '{name}'")

    def __setattr__(self, name, value):
        if name in ("_instance",):
            super().__setattr__(name, value)
        else:
            self.configure(**{name: value})

    def __getitem__(self, key):
        return self.__getattr__(key)

    def __setitem__(self, key, value):
        self.__setattr__(key, value)

    def __contains__(self, key):
        overrides = thread_local_overrides.get()
        return key in overrides or key in main_thread_config

    def get(self, key, default=None):
        try:
            return self[key]
        except AttributeError:
            return default

    def copy(self):
        overrides = thread_local_overrides.get()
        return dotdict({**main_thread_config, **overrides})

    @property
    def config(self):
        return self.copy()

    def _ensure_configure_allowed(self):
        global main_thread_config, config_owner_thread_id, config_owner_async_task
        current_thread_id = threading.get_ident()

        if config_owner_thread_id is None:
            # First `configure` call assigns the owner thread id.
            config_owner_thread_id = current_thread_id

        if config_owner_thread_id != current_thread_id:
            # Disallow `configure` calls from any thread other than the owner thread.
            raise RuntimeError("dspy.settings can only be changed by the thread that initially configured it.")

        # In async tasks, only the owning task may call `configure` again; other tasks must use dspy.context(...) instead.
        is_async_task = False
        try:
            if asyncio.current_task() is not None:
                is_async_task = True
        except RuntimeError:
            # This exception (e.g., "no current task") means we are not in an async loop/task,
            # or asyncio module itself is not fully functional in this specific sub-thread context.
            is_async_task = False

        if not is_async_task:
            return

        if config_owner_async_task is None:
            # First `configure` call assigns the owner async task.
            config_owner_async_task = asyncio.current_task()
            return

        # We are in an async task. Now check for IPython and allow calling `configure` from IPython.
        in_ipython = False
        try:
            from IPython import get_ipython

            # get_ipython is a global injected by IPython environments.
            # We check its existence and type to be more robust.
            in_ipython = get_ipython() is not None
        except Exception:
            # If `IPython` is not installed or `get_ipython` failed, we are not in an IPython environment.
            in_ipython = False

        if not in_ipython and config_owner_async_task != asyncio.current_task():
            raise RuntimeError(
                "dspy.settings.configure(...) can only be called from the same async task that called it first. Please "
                "use `dspy.context(...)` in other async tasks instead."
            )

    def configure(self, **kwargs):
        # If no exception is raised, the `configure` call is allowed.
        self._ensure_configure_allowed()

        # Update global config
        for k, v in kwargs.items():
            main_thread_config[k] = v

    @contextmanager
    def context(self, **kwargs):
        """
        Context manager for temporary configuration changes at the thread level.
        Does not affect global configuration. Changes only apply to the current thread.
        If threads are spawned inside this block using ParallelExecutor, they will inherit these overrides.
        """

        original_overrides = thread_local_overrides.get().copy()
        new_overrides = dotdict({**main_thread_config, **original_overrides, **kwargs})
        token = thread_local_overrides.set(new_overrides)

        try:
            yield
        finally:
            thread_local_overrides.reset(token)

    def __repr__(self):
        overrides = thread_local_overrides.get()
        combined_config = {**main_thread_config, **overrides}
        return repr(combined_config)


settings = Settings()

```
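
A hedged sketch of the usage pattern described in the docstring above: `configure` mutates the global config from the single owner thread, while `context` applies temporary, thread-local overrides. The model names are placeholders:

```python
import dspy

# Global configuration; only the first thread that calls `configure` may change it later.
dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))

# Temporary, thread-local override; the global config is untouched after the block exits.
with dspy.settings.context(lm=dspy.LM("openai/gpt-4o"), track_usage=True):
    answer = dspy.Predict("question -> answer")(question="What is DSPy?")  # uses the override

print(dspy.settings.lm)  # back to the globally configured LM
```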

--------------------------------------------------------------------------------
/dspy/teleprompt/random_search.py:
--------------------------------------------------------------------------------

```python
import random

import dspy
from dspy.evaluate.evaluate import Evaluate
from dspy.teleprompt.teleprompt import Teleprompter

from .bootstrap import BootstrapFewShot
from .vanilla import LabeledFewShot

# TODO: Don't forget dealing with the raw demos.
# TODO: Deal with the (pretty common) case of having a metric for filtering and a separate metric for eval.
# The metric itself may tell though by the presence of trace.

# TODO: This function should take a max_budget and max_teacher_budget. That's in the number of program calls.
# In this case, max_student_budget is max_budget - max_teacher_budget.
# For max_teacher_budget, this will just limit the total number of things we bootstrap.
# This can end up implicitly defining the number of candidate programs (i.e., stop when runs out). Cap at 16.
# For max_student_budget, this will be a more upfront calculation.
# Right now, it can also just induce the number of candidate programs. Later, it could be used more interestingly
# for selective early stopping.
# Progressive elimination sounds about right: after 50 examples, drop bottom third, after 100, another third, etc.
# until only 3--5 are left for the end. Could also be systematic and add (earlier) stopping based on error bounds.
# In general, though, the early filtering is just saying: either there are some really bad ones, or some really really
# good ones, or most things are pretty close. In all of these cases, dropping the bottom third is not going to hurt.


class BootstrapFewShotWithRandomSearch(Teleprompter):
    def __init__(
        self,
        metric,
        teacher_settings=None,
        max_bootstrapped_demos=4,
        max_labeled_demos=16,
        max_rounds=1,
        num_candidate_programs=16,
        num_threads=None,
        max_errors=None,
        stop_at_score=None,
        metric_threshold=None,
    ):
        self.metric = metric
        self.teacher_settings = teacher_settings or {}
        self.max_rounds = max_rounds

        self.num_threads = num_threads
        self.stop_at_score = stop_at_score
        self.metric_threshold = metric_threshold
        self.min_num_samples = 1
        self.max_num_samples = max_bootstrapped_demos
        self.max_errors = max_errors
        self.num_candidate_sets = num_candidate_programs
        self.max_labeled_demos = max_labeled_demos

        print(f"Going to sample between {self.min_num_samples} and {self.max_num_samples} traces per predictor.")
        print(f"Will attempt to bootstrap {self.num_candidate_sets} candidate sets.")

    def compile(self, student, *, teacher=None, trainset, valset=None, restrict=None, labeled_sample=True):
        self.trainset = trainset
        self.valset = valset or trainset  # TODO: FIXME: Note this choice.

        effective_max_errors = self.max_errors if self.max_errors is not None else dspy.settings.max_errors

        scores = []
        all_subscores = []
        score_data = []

        for seed in range(-3, self.num_candidate_sets):
            if (restrict is not None) and (seed not in restrict):
                continue

            trainset_copy = list(self.trainset)

            if seed == -3:
                # zero-shot
                program = student.reset_copy()

            elif seed == -2:
                # labels only
                teleprompter = LabeledFewShot(k=self.max_labeled_demos)
                program = teleprompter.compile(student, trainset=trainset_copy, sample=labeled_sample)

            elif seed == -1:
                # unshuffled few-shot
                optimizer = BootstrapFewShot(
                    metric=self.metric,
                    metric_threshold=self.metric_threshold,
                    max_bootstrapped_demos=self.max_num_samples,
                    max_labeled_demos=self.max_labeled_demos,
                    teacher_settings=self.teacher_settings,
                    max_rounds=self.max_rounds,
                    max_errors=effective_max_errors,
                )
                program = optimizer.compile(student, teacher=teacher, trainset=trainset_copy)

            else:
                assert seed >= 0, seed

                random.Random(seed).shuffle(trainset_copy)
                size = random.Random(seed).randint(self.min_num_samples, self.max_num_samples)

                optimizer = BootstrapFewShot(
                    metric=self.metric,
                    metric_threshold=self.metric_threshold,
                    max_bootstrapped_demos=size,
                    max_labeled_demos=self.max_labeled_demos,
                    teacher_settings=self.teacher_settings,
                    max_rounds=self.max_rounds,
                    max_errors=effective_max_errors,
                )

                program = optimizer.compile(student, teacher=teacher, trainset=trainset_copy)

            evaluate = Evaluate(
                devset=self.valset,
                metric=self.metric,
                num_threads=self.num_threads,
                max_errors=effective_max_errors,
                display_table=False,
                display_progress=True,
            )

            result = evaluate(program)

            score, subscores = result.score, [output[2] for output in result.results]

            all_subscores.append(subscores)

            if len(scores) == 0 or score > max(scores):
                print("New best score:", score, "for seed", seed)
                best_program = program

            scores.append(score)
            print(f"Scores so far: {scores}")
            print(f"Best score so far: {max(scores)}")

            score_data.append({"score": score, "subscores": subscores, "seed": seed, "program": program})

            if self.stop_at_score is not None and score >= self.stop_at_score:
                print(f"Stopping early because score {score} is >= stop_at_score {self.stop_at_score}")
                break

        # Attach all candidate programs to the best program, sorted in decreasing score order
        best_program.candidate_programs = score_data
        best_program.candidate_programs = sorted(
            best_program.candidate_programs, key=lambda x: x["score"], reverse=True
        )

        print(f"{len(best_program.candidate_programs)} candidate programs found.")

        return best_program


# sample between 4 and 10 examples from traces
# TODO: FIXME: The max number of demos should be determined in part by the LM's tokenizer + max_length.
# This does require executing the program, or at least the predictor.
# # # # # # (Actually we can just combine the token counts of the traces, when formatted via signature/adapter).
# Alternatively, we can keep track of the (zero-shot) number of tokens when we bootstrap.
# As another option, we can just try a wide range and handle failures as penalties on the score.
# The number "24" of traces to collect can also be affected. If we only need 3x10, some overlap is ok.
# We can also consider having short_demos and long_demos.

```
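
A hedged usage sketch for `BootstrapFewShotWithRandomSearch`; the metric and datasets are placeholders:

```python
import dspy
from dspy.teleprompt import BootstrapFewShotWithRandomSearch

def answer_match(example, prediction, trace=None):
    # Placeholder metric comparing gold and predicted answers.
    return example.answer == prediction.answer

optimizer = BootstrapFewShotWithRandomSearch(
    metric=answer_match,
    max_bootstrapped_demos=4,
    num_candidate_programs=8,
    num_threads=4,
)

# `trainset`/`valset` would be lists of dspy.Example objects; valset defaults to trainset if omitted.
# best = optimizer.compile(dspy.ChainOfThought("question -> answer"), trainset=trainset, valset=valset)
# best.candidate_programs then holds every evaluated candidate, sorted by decreasing score.
```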

--------------------------------------------------------------------------------
/tests/primitives/test_module.py:
--------------------------------------------------------------------------------

```python
from pathlib import Path

import dspy
from dspy.primitives.module import Module, set_attribute_by_name  # Adjust the import based on your file structure
from dspy.utils import DummyLM


class HopModule(dspy.Module):
    def __init__(self):
        super().__init__()
        self.predict1 = dspy.Predict("question -> query")
        self.predict2 = dspy.Predict("query -> answer")

    def forward(self, question):
        query = self.predict1(question=question).query
        return self.predict2(query=query)


def test_module_initialization():
    module = Module()
    assert module._compiled is False, "Module _compiled attribute should be False upon initialization"


def test_named_predictors():
    module = HopModule()
    named_preds = module.named_predictors()
    assert len(named_preds) == 2, "Should identify correct number of Predict instances"
    names, preds = zip(*named_preds, strict=False)
    assert "predict1" in names and "predict2" in names, "Named predictors should include 'predict1' and 'predict2'"


def test_predictors():
    module = HopModule()
    preds = module.predictors()
    assert len(preds) == 2, "Should return correct number of Predict instances"
    assert all(isinstance(p, dspy.Predict) for p in preds), "All returned items should be instances of dspy.Predict"


def test_forward():
    program = HopModule()
    dspy.settings.configure(
        lm=DummyLM(
            {
                "What is 1+1?": {"query": "let me check"},
                "let me check": {"answer": "2"},
            }
        )
    )
    result = program(question="What is 1+1?").answer
    assert result == "2"


def test_nested_named_predictors():
    class Hop2Module(dspy.Module):
        def __init__(self):
            super().__init__()
            self.hop = HopModule()

    module = Hop2Module()
    named_preds = module.named_predictors()
    assert len(named_preds) == 2
    names, _preds = zip(*named_preds, strict=False)
    assert "hop.predict1" in names
    assert "hop.predict2" in names


def test_empty_module():
    module = Module()
    assert list(module.named_sub_modules()) == [("self", module)]


def test_single_level():
    module = Module()
    module.sub = Module()
    expected = [("self", module), ("self.sub", module.sub)]
    assert list(module.named_sub_modules()) == expected


def test_multiple_levels():
    module = Module()
    module.sub = Module()
    module.sub.subsub = Module()
    expected = [("self", module), ("self.sub", module.sub), ("self.sub.subsub", module.sub.subsub)]
    assert list(module.named_sub_modules()) == expected


def test_multiple_sub_modules():
    module = Module()
    module.sub1 = Module()
    module.sub2 = Module()
    expected = [("self", module), ("self.sub1", module.sub1), ("self.sub2", module.sub2)]
    assert sorted(module.named_sub_modules()) == sorted(expected)


def test_non_base_module_attributes():
    module = Module()
    module.sub = Module()
    module.not_a_sub = "Not a self"
    expected = [("self", module), ("self.sub", module.sub)]
    assert list(module.named_sub_modules()) == expected


def test_complex_module_traversal():
    root = Module()
    root.sub_module = Module()
    root.sub_module.nested_list = [Module(), {"key": Module()}]
    root.sub_module.nested_tuple = (Module(), [Module(), Module()])
    expected_names = {
        "self",
        "self.sub_module",
        "self.sub_module.nested_list[0]",
        "self.sub_module.nested_list[1][key]",
        "self.sub_module.nested_tuple[0]",
        "self.sub_module.nested_tuple[1][0]",
        "self.sub_module.nested_tuple[1][1]",
    }
    found_names = {name for name, _ in root.named_sub_modules()}

    assert found_names == expected_names, (
        f"Missing or extra modules found. Missing: {expected_names - found_names}, Extra: {found_names - expected_names}"
    )


def test_complex_module_traversal_with_same_module():
    root = Module()
    root.sub_module = Module()
    root.sub_module.nested_list = [Module(), {"key": Module()}]
    same_module = Module()
    root.sub_module.nested_tuple = (Module(), [same_module, same_module])
    expected_names = {
        "self",
        "self.sub_module",
        "self.sub_module.nested_list[0]",
        "self.sub_module.nested_list[1][key]",  # NOTE: named_sub_modules allows recursive structures
        "self.sub_module.nested_tuple[0]",
        "self.sub_module.nested_tuple[1][0]",  # NEW: named_sub_modules allows recursive structures, but named_parameters does not
    }
    found_names = {name for name, _ in root.named_sub_modules()}

    assert found_names == expected_names, (
        f"Missing or extra modules found. Missing: {expected_names - found_names}, Extra: {found_names - expected_names}"
    )


def test_complex_module_set_attribute_by_name():
    root = Module()
    root.sub_module = Module()
    root.sub_module.nested_list = [Module(), {"key": Module()}]
    same_module = Module()
    root.sub_module.nested_tuple = (Module(), [same_module, same_module])

    set_attribute_by_name(root, "test_attrib", True)
    assert root.test_attrib is True
    set_attribute_by_name(root, "sub_module.test_attrib", True)
    assert root.sub_module.test_attrib is True
    set_attribute_by_name(root, "sub_module.nested_list[0].test_attrib", True)
    assert root.sub_module.nested_list[0].test_attrib is True
    set_attribute_by_name(root, "sub_module.nested_list[1]['key'].test_attrib", True)
    assert root.sub_module.nested_list[1]["key"].test_attrib is True
    set_attribute_by_name(root, "sub_module.nested_tuple[0].test_attrib", True)
    assert root.sub_module.nested_tuple[0].test_attrib is True
    set_attribute_by_name(root, "sub_module.nested_tuple[1][0].test_attrib", True)
    assert root.sub_module.nested_tuple[1][0].test_attrib is True
    assert root.sub_module.nested_tuple[1][1].test_attrib is True


class DuplicateModule(Module):
    def __init__(self):
        super().__init__()
        self.p0 = dspy.Predict("question -> answer")
        self.p1 = self.p0


def test_named_parameters_duplicate_references():
    module = DuplicateModule()
    # Only testing for whether exceptions are thrown or not
    # As Module.named_parameters() is recursive, this is mainly for catching infinite recursion
    module.named_parameters()


def test_load_dspy_program_cross_version():
    """
    Test backward compatibility for loading a saved DSPy program.

    This test verifies that DSPy can load a program saved in version 3.0.1, ensuring compatibility with older versions.
    The saved state is located in 'tests/primitives/resources/saved_program.json' and represents an optimized
    `dspy.ReAct` program.
    """
    path = Path(__file__).parent / "resources" / "saved_program.json"
    loaded_react = dspy.ReAct("question->answer", tools=[])
    loaded_react.load(path)
    assert (
        "Imagine you are a detective racing against time to solve a high-profile"
        in loaded_react.react.signature.instructions
    )
    assert "Given the very verbose fields `question`" in loaded_react.extract.predict.signature.instructions

    assert len(loaded_react.react.demos) == 2
    assert len(loaded_react.extract.predict.demos) == 2

```

--------------------------------------------------------------------------------
/dspy/dsp/utils/dpr.py:
--------------------------------------------------------------------------------

```python
"""
Source: DPR Implementation from Facebook Research
https://github.com/facebookresearch/DPR/tree/master/dpr
Original license: https://github.com/facebookresearch/DPR/blob/main/LICENSE
"""

import copy
import logging
import unicodedata

import regex

logger = logging.getLogger(__name__)


class Tokens:
    """A class to represent a list of tokenized text."""

    TEXT = 0
    TEXT_WS = 1
    SPAN = 2
    POS = 3
    LEMMA = 4
    NER = 5

    def __init__(self, data, annotators, opts=None):
        self.data = data
        self.annotators = annotators
        self.opts = opts or {}

    def __len__(self):
        """The number of tokens."""
        return len(self.data)

    def slice(self, i=None, j=None):
        """Return a view of the list of tokens from [i, j)."""
        new_tokens = copy.copy(self)
        new_tokens.data = self.data[i:j]
        return new_tokens

    def untokenize(self):
        """Returns the original text (with whitespace reinserted)."""
        return "".join([t[self.TEXT_WS] for t in self.data]).strip()

    def words(self, uncased=False):
        """Returns a list of the text of each token

        Args:
            uncased: lower cases text
        """
        if uncased:
            return [t[self.TEXT].lower() for t in self.data]
        else:
            return [t[self.TEXT] for t in self.data]

    def offsets(self):
        """Returns a list of [start, end) character offsets of each token."""
        return [t[self.SPAN] for t in self.data]

    def pos(self):
        """Returns a list of part-of-speech tags of each token.
        Returns None if this annotation was not included.
        """
        if "pos" not in self.annotators:
            return None
        return [t[self.POS] for t in self.data]

    def lemmas(self):
        """Returns a list of the lemmatized text of each token.
        Returns None if this annotation was not included.
        """
        if "lemma" not in self.annotators:
            return None
        return [t[self.LEMMA] for t in self.data]

    def entities(self):
        """Returns a list of named-entity-recognition tags of each token.
        Returns None if this annotation was not included.
        """
        if "ner" not in self.annotators:
            return None
        return [t[self.NER] for t in self.data]

    def ngrams(self, n=1, uncased=False, filter_fn=None, as_strings=True):
        """Returns a list of all ngrams from length 1 to n.

        Args:
            n: upper limit of ngram length
            uncased: lower cases text
            filter_fn: user function that takes in an ngram list and returns
              True or False to keep or not keep the ngram
            as_strings: return each ngram as a string instead of a list
        """

        def _skip(gram):
            if not filter_fn:
                return False
            return filter_fn(gram)

        words = self.words(uncased)
        ngrams = [
            (s, e + 1)
            for s in range(len(words))
            for e in range(s, min(s + n, len(words)))
            if not _skip(words[s : e + 1])
        ]

        # Concatenate into strings
        if as_strings:
            ngrams = ["{}".format(" ".join(words[s:e])) for (s, e) in ngrams]

        return ngrams

    def entity_groups(self):
        """Group consecutive entity tokens with the same NER tag."""
        entities = self.entities()
        if not entities:
            return None
        non_ent = self.opts.get("non_ent", "O")
        groups = []
        idx = 0
        while idx < len(entities):
            ner_tag = entities[idx]
            # Check for entity tag
            if ner_tag != non_ent:
                # Chomp the sequence
                start = idx
                while idx < len(entities) and entities[idx] == ner_tag:
                    idx += 1
                groups.append((self.slice(start, idx).untokenize(), ner_tag))
            else:
                idx += 1
        return groups


class Tokenizer:
    """Base tokenizer class.
    Tokenizers implement tokenize, which should return a Tokens class.
    """

    def tokenize(self, text):
        raise NotImplementedError

    def shutdown(self):
        pass

    def __del__(self):
        self.shutdown()


class SimpleTokenizer(Tokenizer):
    ALPHA_NUM = r"[\p{L}\p{N}\p{M}]+"
    NON_WS = r"[^\p{Z}\p{C}]"

    def __init__(self, **kwargs):
        """
        Args:
            annotators: None or empty set (only tokenizes).
        """
        self._regexp = regex.compile(
            "(%s)|(%s)" % (self.ALPHA_NUM, self.NON_WS),
            flags=regex.IGNORECASE + regex.UNICODE + regex.MULTILINE,
        )
        if len(kwargs.get("annotators", {})) > 0:
            logger.warning(
                "%s only tokenizes! Skipping annotators: %s",
                type(self).__name__,
                kwargs.get("annotators"),
            )
        self.annotators = set()

    def tokenize(self, text):
        data = []
        matches = list(self._regexp.finditer(text))
        for i in range(len(matches)):
            # Get text
            token = matches[i].group()

            # Get whitespace
            span = matches[i].span()
            start_ws = span[0]
            if i + 1 < len(matches):
                end_ws = matches[i + 1].span()[0]
            else:
                end_ws = span[1]

            # Format data
            data.append(
                (
                    token,
                    text[start_ws:end_ws],
                    span,
                )
            )
        return Tokens(data, self.annotators)


def has_answer(tokenized_answers, text):
    text = DPR_normalize(text)

    for single_answer in tokenized_answers:
        for i in range(0, len(text) - len(single_answer) + 1):
            if single_answer == text[i : i + len(single_answer)]:
                return True

    return False


def locate_answers(tokenized_answers, text):
    """
    Returns each occurrence of an answer as (offset, endpos) in terms of *characters*.
    """
    tokenized_text = DPR_tokenize(text)
    occurrences = []

    text_words, text_word_positions = tokenized_text.words(uncased=True), tokenized_text.offsets()
    answers_words = [ans.words(uncased=True) for ans in tokenized_answers]

    for single_answer in answers_words:
        for i in range(0, len(text_words) - len(single_answer) + 1):
            if single_answer == text_words[i : i + len(single_answer)]:
                (offset, _), (_, endpos) = text_word_positions[i], text_word_positions[i + len(single_answer) - 1]
                occurrences.append((offset, endpos))

    return occurrences


STokenizer = SimpleTokenizer()


def DPR_tokenize(text):  # noqa: N802
    return STokenizer.tokenize(unicodedata.normalize("NFD", text))


def DPR_normalize(text):  # noqa: N802
    return DPR_tokenize(text).words(uncased=True)


# Source: https://github.com/shmsw25/qa-hard-em/blob/master/prepro_util.py
def strip_accents(text):
    """Strips accents from a piece of text."""
    text = unicodedata.normalize("NFD", text)
    output = []
    for char in text:
        cat = unicodedata.category(char)
        if cat == "Mn":
            continue
        output.append(char)
    return "".join(output)

```
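
A small usage sketch for the helpers above; the sentence and answer string are illustrative:

```python
from dspy.dsp.utils.dpr import DPR_normalize, DPR_tokenize, has_answer, locate_answers

text = "President Barack Obama was born in Hawaii."

# has_answer expects answers already normalized to lowercased word lists.
print(has_answer([DPR_normalize("Barack Obama")], text))  # True

# locate_answers expects tokenized answers and returns character (offset, endpos) spans.
print(locate_answers([DPR_tokenize("Barack Obama")], text))  # [(10, 22)], the span of "Barack Obama"
```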

--------------------------------------------------------------------------------
/dspy/propose/utils.py:
--------------------------------------------------------------------------------

```python
import inspect
import json
import re

import dspy

try:
    from IPython.core.magics.code import extract_symbols
except ImportError:
    # Won't be able to read code from jupyter notebooks
    extract_symbols = None

from dspy.predict.parameter import Parameter
from dspy.teleprompt.utils import get_signature, new_getfile


def strip_prefix(text):
    pattern = r"^[\*\s]*(([\w\'\-]+\s+){0,4}[\w\'\-]+):\s*"
    modified_text = re.sub(pattern, "", text)
    return modified_text.strip('"')

def create_instruction_set_history_string(base_program, trial_logs, top_n):
    program_history = []
    for trial_num in trial_logs:
        trial = trial_logs[trial_num]
        if "program_path" in trial:
            trial_program = base_program.deepcopy()
            trial_program.load(trial["program_path"])
            program_history.append({
                "program": trial_program,
                "score": trial["score"],
            })

    # Deduplicate program history based on the program's instruction set
    seen_programs = set()
    unique_program_history = []
    for entry in program_history:
        program = entry["program"]
        instruction_set = get_program_instruction_set_string(program)
        if instruction_set not in seen_programs:
            seen_programs.add(instruction_set)
            unique_program_history.append(entry)

    # Get the top n programs from program history
    top_n_program_history = sorted(unique_program_history, key=lambda x: x["score"], reverse=True)[:top_n]
    top_n_program_history.reverse()

    # Create formatted string
    instruction_set_history_string = ""
    for entry in top_n_program_history:
        program = entry["program"]
        score = entry["score"]
        instruction_set = get_program_instruction_set_string(program)
        instruction_set_history_string += instruction_set + f" | Score: {score}\n\n"

    return instruction_set_history_string

def parse_list_of_instructions(instruction_string):
    # Try to convert the string representation of a list to an actual list using JSON
    try:
        instructions = json.loads(instruction_string)
        return instructions
    except json.JSONDecodeError:
        pass

    # If JSON decoding fails, extract strings within quotes
    instructions = re.findall(r'"([^"]*)"', instruction_string)
    return instructions

def get_program_instruction_set_string(program):
    instruction_list = []
    for _, pred in enumerate(program.predictors()):
        pred_instructions = get_signature(pred).instructions
        instruction_list.append(f'"{pred_instructions}"')
    # Joining the list into a single string that looks like a list
    return f"[{', '.join(instruction_list)}]"

def create_predictor_level_history_string(base_program, predictor_i, trial_logs, top_n):
    instruction_aggregate = {}
    instruction_history = []

    # Load trial programs
    for trial_num in trial_logs:
        trial = trial_logs[trial_num]
        if "program_path" in trial:
            trial_program = base_program.deepcopy()
            trial_program.load(trial["program_path"])
            instruction_history.append({
                "program": trial_program,
                "score": trial["score"],
            })

    # Aggregate scores for each instruction
    for history_item in instruction_history:
        predictor = history_item["program"].predictors()[predictor_i]
        instruction = get_signature(predictor).instructions
        score = history_item["score"]

        if instruction in instruction_aggregate:
            instruction_aggregate[instruction]["total_score"] += score
            instruction_aggregate[instruction]["count"] += 1
        else:
            instruction_aggregate[instruction] = {"total_score": score, "count": 1}

    # Calculate average score for each instruction and prepare for sorting
    predictor_history = []
    for instruction, data in instruction_aggregate.items():
        average_score = data["total_score"] / data["count"]
        predictor_history.append((instruction, average_score))

    # Deduplicate and sort by average score, then select top N
    seen_instructions = set()
    unique_predictor_history = []
    for instruction, score in predictor_history:
        if instruction not in seen_instructions:
            seen_instructions.add(instruction)
            unique_predictor_history.append((instruction, score))

    top_instructions = sorted(unique_predictor_history, key=lambda x: x[1], reverse=True)[:top_n]
    top_instructions.reverse()

    # Create formatted history string
    predictor_history_string = ""
    for instruction, score in top_instructions:
        predictor_history_string += instruction + f" | Score: {score}\n\n"

    return predictor_history_string

def create_example_string(fields, example):

    # Building the output string
    output = []
    for field_name, field_values in fields.items():
        name = field_values.json_schema_extra["prefix"]

        # Determine the value from input_data or prediction_data
        value = example.get(field_name)

        # Construct the string for the current field
        field_str = f"{name} {value}"
        output.append(field_str)

    # Joining all the field strings
    return "\n".join(output)

def get_dspy_source_code(module):
    header = []
    base_code = ""

    # Don't get source code for Predict or ChainOfThought modules (NOTE we will need to extend this list as more DSPy.modules are added)
    # TODO: if type(module).__name__ not in ["Predict", "ChainOfThought", "ReAct"]:
    if not type(module).__name__ == "Predict" and not type(module).__name__ == "ChainOfThought":
        try:
            base_code = inspect.getsource(type(module))
        except TypeError:
            obj = type(module)
            cell_code = "".join(inspect.linecache.getlines(new_getfile(obj)))
            class_code = extract_symbols(cell_code, obj.__name__)[0][0]
            base_code = str(class_code)

    completed_set = set()
    for attribute in module.__dict__.keys():
        try:
            iterable = iter(getattr(module, attribute))
        except TypeError:
            iterable = [getattr(module, attribute)]

        for item in iterable:
            # Skip items that are unhashable (like module history)
            try:
                hash(item)
            except TypeError:
                continue
            if isinstance(item, Parameter):
                if hasattr(item, "signature") and item.signature is not None and item.signature.__pydantic_parent_namespace__["signature_name"] + "_sig" not in completed_set:
                    try:
                        header.append(inspect.getsource(item.signature))
                        print(inspect.getsource(item.signature))
                    except (TypeError, OSError):
                        header.append(str(item.signature))
                    completed_set.add(item.signature.__pydantic_parent_namespace__["signature_name"] + "_sig")
            if isinstance(item, dspy.Module):
                code = get_dspy_source_code(item).strip()
                if code not in completed_set:
                    header.append(code)
                    completed_set.add(code)
            completed_set.add(item)

    return "\n\n".join(header) + "\n\n" + base_code

```

--------------------------------------------------------------------------------
/docs/docs/api/optimizers/GEPA/overview.md:
--------------------------------------------------------------------------------

```markdown
# dspy.GEPA: Reflective Prompt Optimizer

**GEPA** (Genetic-Pareto) is a reflective optimizer proposed in "GEPA: Reflective Prompt Evolution Can Outperform Reinforcement Learning" (Agrawal et al., 2025, [arxiv:2507.19457](https://arxiv.org/abs/2507.19457)), which adaptively evolves _textual components_ (such as prompts) of arbitrary systems. In addition to the scalar scores returned by metrics, users can also provide GEPA with textual feedback to guide the optimization process. Such feedback gives GEPA more visibility into why the system received the score that it did, and GEPA can then introspect on this to identify how to improve the score. This allows GEPA to propose high-performing prompts in very few rollouts.

<!-- START_API_REF -->
::: dspy.GEPA
    handler: python
    options:
        members:
            - auto_budget
            - compile
            - get_params
        show_source: true
        show_root_heading: true
        heading_level: 2
        docstring_style: google
        show_root_full_path: true
        show_object_full_path: false
        separate_signature: false
        inherited_members: true
:::
<!-- END_API_REF -->

One of the key insights behind GEPA is its ability to leverage domain-specific textual feedback. Users should provide a feedback function as the GEPA metric, which has the following call signature:
<!-- START_API_REF -->
::: dspy.teleprompt.gepa.gepa.GEPAFeedbackMetric
    handler: python
    options:
        members:
            - __call__
        show_source: true
        show_root_heading: true
        heading_level: 2
        docstring_style: google
        show_root_full_path: true
        show_object_full_path: false
        separate_signature: false
        inherited_members: true
:::
<!-- END_API_REF -->

When `track_stats=True`, GEPA returns detailed results about all of the proposed candidates, along with metadata about the optimization run. These results are available in the `detailed_results` attribute of the optimized program returned by GEPA and have the following type:
<!-- START_API_REF -->
::: dspy.teleprompt.gepa.gepa.DspyGEPAResult
    handler: python
    options:
        show_source: true
        show_root_heading: true
        heading_level: 2
        docstring_style: google
        show_root_full_path: true
        show_object_full_path: false
        separate_signature: false
        inherited_members: true
:::
<!-- END_API_REF -->

## Usage Examples

See GEPA usage tutorials in [GEPA Tutorials](../../../tutorials/gepa_ai_program/index.md).
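
For orientation, a minimal compile call is sketched below. Treat it as a sketch rather than a complete recipe: it assumes you already have a DSPy `program`, example lists `trainset`/`valset`, and a feedback `metric` of the kind described later on this page, and the `auto="light"` budget preset is just one way to bound the optimization budget.

```python
import dspy

# Sketch only: `metric`, `program`, `trainset`, and `valset` are assumed to exist.
gepa = dspy.GEPA(metric=metric, auto="light", track_stats=True)
optimized_program = gepa.compile(program, trainset=trainset, valset=valset)
```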

### Inference-Time Search

GEPA can act as a test-time/inference search mechanism. By setting your `valset` to your _evaluation batch_ and using `track_best_outputs=True`, GEPA produces for each batch element the highest-scoring outputs found during the evolutionary search.

```python
gepa = dspy.GEPA(metric=metric, track_stats=True, ...)
new_prog = gepa.compile(student, trainset=my_tasks, valset=my_tasks)
highest_score_achieved_per_task = new_prog.detailed_results.highest_score_achieved_per_val_task
best_outputs = new_prog.detailed_results.best_outputs_valset
```

## How Does GEPA Work?

### 1. **Reflective Prompt Mutation**

GEPA uses LLMs to _reflect_ on structured execution traces (inputs, outputs, failures, feedback), targeting a chosen module and proposing a new instruction/program text tailored to real observed failures and rich textual/environmental feedback.

### 2. **Rich Textual Feedback as Optimization Signal**

GEPA can leverage _any_ textual feedback available—not just scalar rewards. This includes evaluation logs, code traces, failed parses, constraint violations, error message strings, or even isolated submodule-specific feedback. This allows actionable, domain-aware optimization. 

### 3. **Pareto-based Candidate Selection**

Rather than evolving just the _best_ global candidate (which leads to local optima or stagnation), GEPA maintains a Pareto frontier: the set of candidates which achieve the highest score on at least one evaluation instance. In each iteration, the next candidate to mutate is sampled (with probability proportional to coverage) from this frontier, guaranteeing both exploration and robust retention of complementary strategies.
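
To make the selection rule concrete, here is a small illustrative sketch (under simplifying assumptions; it is not the actual `dspy.GEPA` implementation): keep every candidate that achieves the top score on at least one validation instance, then sample the next candidate to mutate with probability proportional to how many instances it wins. Ties are ignored for brevity.

```python
import random


def sample_candidate_to_mutate(scores):
    """Illustrative Pareto-based selection. `scores[candidate][i]` is the score on instance i."""
    num_instances = len(next(iter(scores.values())))

    # Count how many validation instances each candidate "wins" (achieves the top score on).
    coverage = {candidate: 0 for candidate in scores}
    for i in range(num_instances):
        best = max(scores, key=lambda candidate: scores[candidate][i])
        coverage[best] += 1

    # The Pareto frontier keeps every candidate that wins at least one instance.
    frontier = [candidate for candidate, wins in coverage.items() if wins > 0]

    # Sample the next candidate to mutate with probability proportional to its coverage.
    return random.choices(frontier, weights=[coverage[c] for c in frontier], k=1)[0]
```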

### Algorithm Summary

1. **Initialize** the candidate pool with the unoptimized program.
2. **Iterate**:
   - **Sample a candidate** (from Pareto frontier).
   - **Sample a minibatch** from the train set.
   - **Collect execution traces + feedbacks** for module rollout on minibatch.
   - **Select a module** of the candidate for targeted improvement.
   - **LLM Reflection:** Propose a new instruction/prompt for the targeted module using reflective meta-prompting and the gathered feedback.
   - **Roll out the new candidate** on the minibatch; **if improved, evaluate on the Pareto validation set**.
   - **Update the candidate pool/Pareto frontier.**
   - **[Optionally] System-aware merge/crossover**: Combine best-performing modules from distinct lineages.
3. **Continue** until rollout or metric budget is exhausted. 
4. **Return** candidate with best aggregate performance on validation.

## Implementing Feedback Metrics

A well-designed metric is central to GEPA's sample efficiency and the richness of its learning signal. GEPA expects the metric to return a `dspy.Prediction(score=..., feedback=...)`. GEPA leverages natural-language traces from LLM-based workflows for optimization, preserving intermediate trajectories and errors in plain text rather than reducing them to numerical rewards. This mirrors human diagnostic processes, enabling clearer identification of system behaviors and bottlenecks.

Practical Recipe for GEPA-Friendly Feedback:

- **Leverage Existing Artifacts**: Use logs, unit tests, evaluation scripts, and profiler outputs; surfacing these often suffices.
- **Decompose Outcomes**: Break scores into per-objective components (e.g., correctness, latency, cost, safety) and attribute errors to steps.
- **Expose Trajectories**: Label pipeline stages, reporting pass/fail with salient errors (e.g., in code generation pipelines).
- **Ground in Checks**: Employ automatic validators (unit tests, schemas, simulators) or LLM-as-a-judge for non-verifiable tasks (as in PUPA).
- **Prioritize Clarity**: Focus on error coverage and decision points over technical complexity.

### Examples

- **Document Retrieval** (e.g., HotpotQA): List correctly retrieved, incorrect, or missed documents, beyond mere Recall/F1 scores.
- **Multi-Objective Tasks** (e.g., PUPA): Decompose aggregate scores to reveal contributions from each objective, highlighting tradeoffs (e.g., quality vs. privacy).
- **Stacked Pipelines** (e.g., code generation: parse → compile → run → profile → evaluate): Expose stage-specific failures; natural-language traces often suffice for LLM self-correction.
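
Putting the document-retrieval example above into code, a GEPA-friendly metric might look like the sketch below. The field names `gold_titles` and `retrieved_titles` are assumptions made for illustration, and the parameter list mirrors the `GEPAFeedbackMetric` protocol referenced above; the essential part is returning `dspy.Prediction(score=..., feedback=...)` whose feedback names what was retrieved correctly, what was missed, and what was irrelevant.

```python
import dspy


def retrieval_feedback_metric(gold, pred, trace=None, pred_name=None, pred_trace=None):
    """Sketch of a feedback metric for document retrieval (field names are illustrative)."""
    gold_docs = set(gold.gold_titles)        # assumed field on the gold example
    retrieved = set(pred.retrieved_titles)   # assumed field on the program's prediction

    hits = gold_docs & retrieved
    missed = gold_docs - retrieved
    spurious = retrieved - gold_docs

    score = len(hits) / len(gold_docs) if gold_docs else 0.0
    feedback = (
        f"Correctly retrieved: {sorted(hits)}. "
        f"Missed gold documents: {sorted(missed)}. "
        f"Irrelevant documents: {sorted(spurious)}."
    )
    return dspy.Prediction(score=score, feedback=feedback)
```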

## Custom Instruction Proposal

For advanced customization of GEPA's instruction proposal mechanism, including custom instruction proposers and component selectors, see [Advanced Features](GEPA_Advanced.md).

## Further Reading

- [GEPA Paper: arxiv:2507.19457](https://arxiv.org/abs/2507.19457)
- [GEPA GitHub](https://github.com/gepa-ai/gepa) - This repository provides the core GEPA evolution pipeline used by the `dspy.GEPA` optimizer.
- [DSPy Tutorials](../../../tutorials/gepa_ai_program/index.md)

```

--------------------------------------------------------------------------------
/docs/docs/tutorials/cache/index.md:
--------------------------------------------------------------------------------

```markdown
# Use and Customize DSPy Cache

In this tutorial, we will explore the design of DSPy's caching mechanism and demonstrate how to effectively use and customize it.

## DSPy Cache Structure

DSPy's caching system is architected in three distinct layers:

1.  **In-memory cache**: Implemented using `cachetools.LRUCache`, this layer provides fast access to frequently used data.
2.  **On-disk cache**: Leveraging `diskcache.FanoutCache`, this layer offers persistent storage for cached items.
3.  **Prompt cache (Server-side cache)**: This layer is managed by the LLM service provider (e.g., OpenAI, Anthropic).

While DSPy does not directly control the server-side prompt cache, it offers users the flexibility to enable, disable, and customize the in-memory and on-disk caches to suit their specific requirements.

## Using DSPy Cache

By default, both in-memory and on-disk caching are automatically enabled in DSPy. No specific action is required to start using the cache. When a cache hit occurs, you will observe a significant reduction in the module call's execution time. Furthermore, if usage tracking is enabled, the usage metrics for a cached call will be empty.

Consider the following example:

```python
import dspy
import os
import time

os.environ["OPENAI_API_KEY"] = "{your_openai_key}"

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"), track_usage=True)

predict = dspy.Predict("question->answer")

start = time.time()
result1 = predict(question="Who is the GOAT of basketball?")
print(f"Time elapse: {time.time() - start: 2f}\n\nTotal usage: {result1.get_lm_usage()}")

start = time.time()
result2 = predict(question="Who is the GOAT of basketball?")
print(f"Time elapse: {time.time() - start: 2f}\n\nTotal usage: {result2.get_lm_usage()}")
```

A sample output looks like:

```
Time elapse:  4.384113
Total usage: {'openai/gpt-4o-mini': {'completion_tokens': 97, 'prompt_tokens': 144, 'total_tokens': 241, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0, 'text_tokens': None}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0, 'text_tokens': None, 'image_tokens': None}}}

Time elapse:  0.000529
Total usage: {}
```

## Disabling/Enabling DSPy Cache

There are scenarios where you might need to disable caching, either entirely or selectively for in-memory or on-disk caches. For instance:

- You require different responses for identical LM requests.
- You lack disk write permissions and need to disable the on-disk cache.
- You have limited memory resources and wish to disable the in-memory cache.

DSPy provides the `dspy.configure_cache()` utility function for this purpose. You can use the corresponding flags to control the enabled/disabled state of each cache type:

```python
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)
```

In addition, you can manage the capacity of the in-memory and on-disk caches:

```python
dspy.configure_cache(
    enable_disk_cache=True,
    enable_memory_cache=True,
    disk_size_limit_bytes=YOUR_DESIRED_VALUE,
    memory_max_entries=YOUR_DESIRED_VALUE,
)
```

Please note that `disk_size_limit_bytes` defines the maximum size in bytes for the on-disk cache, while `memory_max_entries` specifies the maximum number of entries for the in-memory cache.

## Understanding and Customizing the Cache

In specific situations, you might want to implement a custom cache, for example, to gain finer control over how cache keys are generated. By default, the cache key is derived from a hash of all request arguments sent to `litellm`, excluding credentials like `api_key`.

To create a custom cache, you need to subclass `dspy.clients.Cache` and override the relevant methods:

```python
class CustomCache(dspy.clients.Cache):
    def __init__(self, **kwargs):
        {write your own constructor}

    def cache_key(self, request: dict[str, Any], ignored_args_for_cache_key: Optional[list[str]] = None) -> str:
        {write your logic of computing cache key}

    def get(self, request: dict[str, Any], ignored_args_for_cache_key: Optional[list[str]] = None) -> Any:
        {write your cache read logic}

    def put(
        self,
        request: dict[str, Any],
        value: Any,
        ignored_args_for_cache_key: Optional[list[str]] = None,
        enable_memory_cache: bool = True,
    ) -> None:
        {write your cache write logic}
```

To ensure seamless integration with the rest of DSPy, it is recommended to implement your custom cache using the same method signatures as the base class, or at a minimum, include `**kwargs` in your method definitions to prevent runtime errors during cache read/write operations.

Once your custom cache class is defined, you can instruct DSPy to use it:

```python
dspy.cache = CustomCache()
```

Let's illustrate this with a practical example. Suppose we want the cache key computation to depend solely on the request message content, ignoring other parameters like the specific LM being called. We can create a custom cache as follows:

```python
class CustomCache(dspy.clients.Cache):

    def cache_key(self, request: dict[str, Any], ignored_args_for_cache_key: Optional[list[str]] = None) -> str:
        messages = request.get("messages", [])
        return sha256(ujson.dumps(messages, sort_keys=True).encode()).hexdigest()

dspy.cache = CustomCache(enable_disk_cache=True, enable_memory_cache=True, disk_cache_dir=dspy.clients.DISK_CACHE_DIR)
```

For comparison, consider executing the code below without the custom cache:

```python
import dspy
import os
import time

os.environ["OPENAI_API_KEY"] = "{your_openai_key}"

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))

predict = dspy.Predict("question->answer")

start = time.time()
result1 = predict(question="Who is the GOAT of soccer?")
print(f"Time elapse: {time.time() - start: 2f}")

start = time.time()
with dspy.context(lm=dspy.LM("openai/gpt-4.1-mini")):
    result2 = predict(question="Who is the GOAT of soccer?")
print(f"Time elapse: {time.time() - start: 2f}")
```

The time elapsed will indicate that the cache is not hit on the second call. However, when using the custom cache:

```python
import dspy
import os
import time
from typing import Dict, Any, Optional
import ujson
from hashlib import sha256

os.environ["OPENAI_API_KEY"] = "{your_openai_key}"

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))

class CustomCache(dspy.clients.Cache):

    def cache_key(self, request: dict[str, Any], ignored_args_for_cache_key: Optional[list[str]] = None) -> str:
        messages = request.get("messages", [])
        return sha256(ujson.dumps(messages, sort_keys=True).encode()).hexdigest()

dspy.cache = CustomCache(enable_disk_cache=True, enable_memory_cache=True, disk_cache_dir=dspy.clients.DISK_CACHE_DIR)

predict = dspy.Predict("question->answer")

start = time.time()
result1 = predict(question="Who is the GOAT of volleyball?")
print(f"Time elapse: {time.time() - start: 2f}")

start = time.time()
with dspy.context(lm=dspy.LM("openai/gpt-4.1-mini")):
    result2 = predict(question="Who is the GOAT of volleyball?")
print(f"Time elapse: {time.time() - start: 2f}")
```

You will observe that the cache is hit on the second call, demonstrating the effect of the custom cache key logic.
```

--------------------------------------------------------------------------------
/dspy/datasets/dataloader.py:
--------------------------------------------------------------------------------

```python
import random
from collections.abc import Mapping
from typing import TYPE_CHECKING

import dspy
from dspy.datasets.dataset import Dataset

if TYPE_CHECKING:
    import pandas as pd


class DataLoader(Dataset):
    def __init__(self):
        pass

    def from_huggingface(
        self,
        dataset_name: str,
        *args,
        input_keys: tuple[str] = (),
        fields: tuple[str] | None = None,
        **kwargs,
    ) -> Mapping[str, list[dspy.Example]] | list[dspy.Example]:
        if fields and not isinstance(fields, tuple):
            raise ValueError("Invalid fields provided. Please provide a tuple of fields.")

        if not isinstance(input_keys, tuple):
            raise ValueError("Invalid input keys provided. Please provide a tuple of input keys.")

        from datasets import load_dataset

        dataset = load_dataset(dataset_name, *args, **kwargs)

        if isinstance(dataset, list) and isinstance(kwargs["split"], list):
            dataset = {split_name: dataset[idx] for idx, split_name in enumerate(kwargs["split"])}

        try:
            returned_split = {}
            for split_name in dataset.keys():
                if fields:
                    returned_split[split_name] = [
                        dspy.Example({field: row[field] for field in fields}).with_inputs(*input_keys)
                        for row in dataset[split_name]
                    ]
                else:
                    returned_split[split_name] = [
                        dspy.Example({field: row[field] for field in row.keys()}).with_inputs(*input_keys)
                        for row in dataset[split_name]
                    ]

            return returned_split
        except AttributeError:
            if fields:
                return [
                    dspy.Example({field: row[field] for field in fields}).with_inputs(*input_keys) for row in dataset
                ]
            else:
                return [
                    dspy.Example({field: row[field] for field in row.keys()}).with_inputs(*input_keys)
                    for row in dataset
                ]

    def from_csv(
        self,
        file_path: str,
        fields: list[str] | None = None,
        input_keys: tuple[str] = (),
    ) -> list[dspy.Example]:
        from datasets import load_dataset

        dataset = load_dataset("csv", data_files=file_path)["train"]

        if not fields:
            fields = list(dataset.features)

        return [dspy.Example({field: row[field] for field in fields}).with_inputs(*input_keys) for row in dataset]

    def from_pandas(
        self,
        df: "pd.DataFrame",
        fields: list[str] | None = None,
        input_keys: tuple[str] = (),
    ) -> list[dspy.Example]:
        if fields is None:
            fields = list(df.columns)

        return [
            dspy.Example({field: row[field] for field in fields}).with_inputs(*input_keys) for _, row in df.iterrows()
        ]

    def from_json(
        self,
        file_path: str,
        fields: list[str] | None = None,
        input_keys: tuple[str] = (),
    ) -> list[dspy.Example]:
        from datasets import load_dataset

        dataset = load_dataset("json", data_files=file_path)["train"]

        if not fields:
            fields = list(dataset.features)

        return [dspy.Example({field: row[field] for field in fields}).with_inputs(*input_keys) for row in dataset]

    def from_parquet(
        self,
        file_path: str,
        fields: list[str] | None = None,
        input_keys: tuple[str] = (),
    ) -> list[dspy.Example]:
        from datasets import load_dataset

        dataset = load_dataset("parquet", data_files=file_path)["train"]

        if not fields:
            fields = list(dataset.features)

        return [dspy.Example({field: row[field] for field in fields}).with_inputs(*input_keys) for row in dataset]

    def from_rm(self, num_samples: int, fields: list[str], input_keys: list[str]) -> list[dspy.Example]:
        try:
            rm = dspy.settings.rm
            try:
                return [
                    dspy.Example({field: row[field] for field in fields}).with_inputs(*input_keys)
                    for row in rm.get_objects(num_samples=num_samples, fields=fields)
                ]
            except AttributeError:
                raise ValueError(
                    "Retrieval module does not support `get_objects`. Please use a different retrieval module."
                )
        except AttributeError:
            raise ValueError(
                "Retrieval module not found. Please set a retrieval module using `dspy.settings.configure`."
            )

    def sample(
        self,
        dataset: list[dspy.Example],
        n: int,
        *args,
        **kwargs,
    ) -> list[dspy.Example]:
        if not isinstance(dataset, list):
            raise ValueError(
                f"Invalid dataset provided of type {type(dataset)}. Please provide a list of `dspy.Example`s."
            )

        return random.sample(dataset, n, *args, **kwargs)

    def train_test_split(
        self,
        dataset: list[dspy.Example],
        train_size: int | float = 0.75,
        test_size: int | float | None = None,
        random_state: int | None = None,
    ) -> Mapping[str, list[dspy.Example]]:
        if random_state is not None:
            random.seed(random_state)

        dataset_shuffled = dataset.copy()
        random.shuffle(dataset_shuffled)

        if train_size is not None and isinstance(train_size, float) and (0 < train_size < 1):
            train_end = int(len(dataset_shuffled) * train_size)
        elif train_size is not None and isinstance(train_size, int):
            train_end = train_size
        else:
            raise ValueError(
                "Invalid `train_size`. Please provide a float between 0 and 1 to represent the proportion of the "
                "dataset to include in the train split or an int to represent the absolute number of samples to "
                f"include in the train split. Received `train_size`: {train_size}."
            )

        if test_size is not None:
            if isinstance(test_size, float) and (0 < test_size < 1):
                test_end = int(len(dataset_shuffled) * test_size)
            elif isinstance(test_size, int):
                test_end = test_size
            else:
                raise ValueError(
                    "Invalid `test_size`. Please provide a float between 0 and 1 to represent the proportion of the "
                    "dataset to include in the test split or an int to represent the absolute number of samples to "
                    f"include in the test split. Received `test_size`: {test_size}."
                )
            if train_end + test_end > len(dataset_shuffled):
                raise ValueError(
                    "`train_size` + `test_size` cannot exceed the total number of samples. Received "
                    f"`train_size`: {train_end}, `test_size`: {test_end}, and `dataset_size`: {len(dataset_shuffled)}."
                )
        else:
            test_end = len(dataset_shuffled) - train_end

        train_dataset = dataset_shuffled[:train_end]
        test_dataset = dataset_shuffled[train_end : train_end + test_end]

        return {"train": train_dataset, "test": test_dataset}

```

--------------------------------------------------------------------------------
/dspy/teleprompt/bettertogether.py:
--------------------------------------------------------------------------------

```python
import logging
import random
from typing import Callable

import dspy
from dspy.primitives.example import Example
from dspy.primitives.module import Module
from dspy.teleprompt.bootstrap_finetune import (
    BootstrapFinetune,
    all_predictors_have_lms,
    kill_lms,
    launch_lms,
    prepare_student,
)
from dspy.teleprompt.random_search import BootstrapFewShotWithRandomSearch
from dspy.teleprompt.teleprompt import Teleprompter

logger = logging.getLogger(__name__)


class BetterTogether(Teleprompter):

    STRAT_SEP = " -> "

    def __init__(self,
        metric: Callable,
        prompt_optimizer: Teleprompter | None = None,
        weight_optimizer: Teleprompter | None = None,
        seed: int | None = None,
      ):
        if not dspy.settings.experimental:
            raise ValueError("This is an experimental optimizer. Set `dspy.settings.experimental` to `True` to use it.")

        # TODO: Note that the BetterTogether optimizer is meaningful when
        # BootstrapFinetune uses a metric to filter the training data before
        # fine-tuning. However, one can also choose to run this optimizer with
        # a BootstrapFinetune without a metric, say, if there aren't labels
        # available for the training data. Should this be noted somewhere?
        # TODO: We should re-consider if the metric should be required.
        self.prompt_optimizer = prompt_optimizer if prompt_optimizer else BootstrapFewShotWithRandomSearch(metric=metric)
        self.weight_optimizer = weight_optimizer if weight_optimizer else BootstrapFinetune(metric=metric)

        is_supported_prompt = isinstance(self.prompt_optimizer, BootstrapFewShotWithRandomSearch)
        is_supported_weight = isinstance(self.weight_optimizer, BootstrapFinetune)
        if not is_supported_prompt or not is_supported_weight:
            raise ValueError(
                "The BetterTogether optimizer only supports the following optimizers for now: BootstrapFinetune, "
                "BootstrapFewShotWithRandomSearch."
            )

        self.rng = random.Random(seed)

    def compile(
        self,
        student: Module,
        trainset: list[Example],
        strategy: str = "p -> w -> p",
        valset_ratio = 0.1,
    ) -> Module:
        # TODO: We could record acc on a different valset to pick the best
        # strategy within the provided strategy
        logger.info("Validating the strategy")
        parsed_strategy = strategy.lower().split(self.STRAT_SEP)

        if not all(s in ["p", "w"] for s in parsed_strategy):
            raise ValueError(
                f"The strategy should be a sequence of 'p' and 'w' separated by '{self.STRAT_SEP}', but "
                f"found: {strategy}"
            )

        logger.info("Preparing the student program...")
        # TODO: Prepare student returns student.reset_copy(), which is what gets
        # optimized. We should make this clear in the doc comments.
        student = prepare_student(student)
        all_predictors_have_lms(student)

        # Make a shallow copy of the trainset, so that we don't change the order
        # of the examples in the original trainset
        trainset = trainset[:]
        logger.info("Compiling the student program...")
        student = self._run_strategies(parsed_strategy, student, trainset, valset_ratio)

        logger.info("BetterTogether has finished compiling the student program")
        return student

    def _run_strategies(self, parsed_strategy, student, trainset, valset_ratio) -> Module:
        # Keep track of all the partial strategies/programs in parsed_strategy
        # "" corresponds to the initial student program
        candidate_programs = []
        candidate_programs.append(("", student))
        launched_flag = False

        for ind, step_code in enumerate(parsed_strategy):
            current_strategy = self.STRAT_SEP.join(parsed_strategy[:ind + 1])
            logger.info(
                f"\n########## Step {ind + 1} of {len(parsed_strategy)} - Strategy "
                f"'{current_strategy}' ##########"
            )

            logger.info("Shuffling the trainset...")
            self.rng.shuffle(trainset)
            if not launched_flag:
                launch_lms(student)
                launched_flag = True

            # TODO: Should we reset or just deepcopy? How does resetting affect
            # the predictor LMs?
            student = student.deepcopy()
            student._compiled = False
            if step_code == "p":
                student = self._compile_prompt_optimizer(student, trainset, valset_ratio)
            elif step_code == "w":
                student = self._compile_weight_optimizer(student, trainset)
                launched_flag = False

            # Record the program corresponding to the current strategy
            candidate_programs.append((current_strategy, student))

        if launched_flag:
            kill_lms(student)

        student.candidate_programs = candidate_programs
        return student

    def _compile_prompt_optimizer(self, student, trainset, valset_ratio) -> Module:
        logger.info("Preparing for prompt optimization...")

        # Sampling a validation set from the trainset for the prompt optimizer
        # We drop the hints for prompt optimization
        trainset = [x.with_inputs(*list(set(x.inputs().keys()) - {"hint"})) for x in trainset]
        num_val = int(valset_ratio * len(trainset))
        prompt_valset = trainset[:num_val]
        prompt_trainset = trainset[num_val:]

        # TODO: To make this optimizer general, we need to ensure that all the
        # prompt optimizers are accepting a valset or encode a way to check if
        # a valset should be passed to an optimizer's compile method.
        # TODO: We should ensure that the prompt optimizers in DSPy respect the
        # predictor.lm attributes. In particular,
        # BootstrapFewShotWithRandomSearch seems to be resetting these. We are
        # manually re-setting the LMs here to circumvent this issue, but we
        # should consider addressing it in BFRS.
        logger.info("Compiling the prompt optimizer...")
        pred_lms = [pred.lm for pred in student.predictors()]
        student = self.prompt_optimizer.compile(student, trainset=prompt_trainset, valset=prompt_valset)
        for pred, lm in zip(student.predictors(), pred_lms, strict=False):
            pred.lm = lm

        return student

    def _compile_weight_optimizer(self, student, trainset) -> Module:
        logger.info("Preparing for weight optimization...")

        # Saving the LMs before compiling the weight optimizer
        original_lms = [pred.lm for pred in student.predictors()]

        # TODO: To make this optimizer general, we need to ensure that all the
        # prompt optimizers are accepting a valset or encode a way to check if
        # a valset should be passed to an optimizer's compile.
        logger.info("Compiling the weight optimizer...")
        student = self.weight_optimizer.compile(student, trainset=trainset)

        # Updating the train kwargs for the new LMs. This is needed because the
        # train_kwargs of the optimizer is configured for the original LMs.
        new_lms = [pred.lm for pred in student.predictors()]
        for original_lm, new_lm in zip(original_lms, new_lms, strict=False):
            original_params = self.weight_optimizer.train_kwargs[original_lm]
            self.weight_optimizer.train_kwargs[new_lm] = original_params

        return student

```

--------------------------------------------------------------------------------
/dspy/clients/embedding.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Callable

import litellm
import numpy as np

from dspy.clients.cache import request_cache


class Embedder:
    """DSPy embedding class.

    The class for computing embeddings for text inputs. This class provides a unified interface for both:

    1. Hosted embedding models (e.g. OpenAI's text-embedding-3-small) via litellm integration
    2. Custom embedding functions that you provide

    For hosted models, simply pass the model name as a string (e.g., "openai/text-embedding-3-small"). The class will use
    litellm to handle the API calls and caching.

    For custom embedding models, pass a callable function that:
    - Takes a list of strings as input.
    - Returns embeddings as either:
        - A 2D numpy array of float32 values
        - A 2D list of float32 values
    - Each row should represent one embedding vector

    Args:
        model: The embedding model to use. This can be either a string (representing the name of the hosted embedding
            model, must be an embedding model supported by litellm) or a callable that represents a custom embedding
            model.
        batch_size (int, optional): The default batch size for processing inputs in batches. Defaults to 200.
        caching (bool, optional): Whether to cache the embedding response when using a hosted model. Defaults to True.
        **kwargs: Additional default keyword arguments to pass to the embedding model.

    Examples:
        Example 1: Using a hosted model.

        ```python
        import dspy

        embedder = dspy.Embedder("openai/text-embedding-3-small", batch_size=100)
        embeddings = embedder(["hello", "world"])

        assert embeddings.shape == (2, 1536)
        ```

        Example 2: Using any local embedding model, e.g. from https://huggingface.co/models?library=sentence-transformers.

        ```python
        # pip install sentence_transformers
        import dspy
        from sentence_transformers import SentenceTransformer

        # Load an extremely efficient local model for retrieval
        model = SentenceTransformer("sentence-transformers/static-retrieval-mrl-en-v1", device="cpu")

        embedder = dspy.Embedder(model.encode)
        embeddings = embedder(["hello", "world"], batch_size=1)

        assert embeddings.shape == (2, 1024)
        ```

        Example 3: Using a custom function.

        ```python
        import dspy
        import numpy as np

        def my_embedder(texts):
            return np.random.rand(len(texts), 10)

        embedder = dspy.Embedder(my_embedder)
        embeddings = embedder(["hello", "world"], batch_size=1)

        assert embeddings.shape == (2, 10)
        ```
    """

    def __init__(self, model: str | Callable, batch_size: int = 200, caching: bool = True, **kwargs: dict[str, Any]):
        self.model = model
        self.batch_size = batch_size
        self.caching = caching
        self.default_kwargs = kwargs

    def _preprocess(self, inputs, batch_size=None, caching=None, **kwargs):
        if isinstance(inputs, str):
            is_single_input = True
            inputs = [inputs]
        else:
            is_single_input = False

        if not all(isinstance(inp, str) for inp in inputs):
            raise ValueError("All inputs must be strings.")

        batch_size = batch_size or self.batch_size
        caching = caching or self.caching
        merged_kwargs = self.default_kwargs.copy()
        merged_kwargs.update(kwargs)

        input_batches = []
        for i in range(0, len(inputs), batch_size):
            input_batches.append(inputs[i : i + batch_size])

        return input_batches, caching, merged_kwargs, is_single_input

    def _postprocess(self, embeddings_list, is_single_input):
        embeddings = np.array(embeddings_list, dtype=np.float32)
        if is_single_input:
            return embeddings[0]
        else:
            return np.array(embeddings, dtype=np.float32)

    def __call__(self, inputs: str | list[str], batch_size: int | None = None, caching: bool | None = None, **kwargs: dict[str, Any]) -> np.ndarray:
        """Compute embeddings for the given inputs.

        Args:
            inputs: The inputs to compute embeddings for, can be a single string or a list of strings.
            batch_size (int, optional): The batch size for processing inputs. If None, defaults to the batch_size set
                during initialization.
            caching (bool, optional): Whether to cache the embedding response when using a hosted model. If None,
                defaults to the caching setting from initialization.
            kwargs: Additional keyword arguments to pass to the embedding model. These will override the default
                kwargs provided during initialization.

        Returns:
            numpy.ndarray: If the input is a single string, returns a 1D numpy array representing the embedding.
            If the input is a list of strings, returns a 2D numpy array of embeddings, one embedding per row.
        """
        input_batches, caching, kwargs, is_single_input = self._preprocess(inputs, batch_size, caching, **kwargs)

        compute_embeddings = _cached_compute_embeddings if caching else _compute_embeddings

        embeddings_list = []

        for batch in input_batches:
            embeddings_list.extend(compute_embeddings(self.model, batch, caching=caching, **kwargs))
        return self._postprocess(embeddings_list, is_single_input)

    async def acall(self, inputs, batch_size=None, caching=None, **kwargs):
        input_batches, caching, kwargs, is_single_input = self._preprocess(inputs, batch_size, caching, **kwargs)

        embeddings_list = []
        acompute_embeddings = _cached_acompute_embeddings if caching else _acompute_embeddings

        for batch in input_batches:
            embeddings_list.extend(await acompute_embeddings(self.model, batch, caching=caching, **kwargs))
        return self._postprocess(embeddings_list, is_single_input)


def _compute_embeddings(model, batch_inputs, caching=False, **kwargs):
    if isinstance(model, str):
        caching = caching and litellm.cache is not None
        embedding_response = litellm.embedding(model=model, input=batch_inputs, caching=caching, **kwargs)
        return [data["embedding"] for data in embedding_response.data]
    elif callable(model):
        return model(batch_inputs, **kwargs)
    else:
        raise ValueError(f"`model` in `dspy.Embedder` must be a string or a callable, but got {type(model)}.")


@request_cache(ignored_args_for_cache_key=["api_key", "api_base", "base_url"])
def _cached_compute_embeddings(model, batch_inputs, caching=True, **kwargs):
    return _compute_embeddings(model, batch_inputs, caching=caching, **kwargs)


async def _acompute_embeddings(model, batch_inputs, caching=False, **kwargs):
    if isinstance(model, str):
        caching = caching and litellm.cache is not None
        embedding_response = await litellm.aembedding(model=model, input=batch_inputs, caching=caching, **kwargs)
        return [data["embedding"] for data in embedding_response.data]
    elif callable(model):
        return model(batch_inputs, **kwargs)
    else:
        raise ValueError(f"`model` in `dspy.Embedder` must be a string or a callable, but got {type(model)}.")


@request_cache(ignored_args_for_cache_key=["api_key", "api_base", "base_url"])
async def _cached_acompute_embeddings(model, batch_inputs, caching=True, **kwargs):
    return await _acompute_embeddings(model, batch_inputs, caching=caching, **kwargs)

```

--------------------------------------------------------------------------------
/dspy/utils/dummies.py:
--------------------------------------------------------------------------------

```python
import random
from collections import defaultdict
from typing import Any

import numpy as np

from dspy.adapters.chat_adapter import FieldInfoWithName, field_header_pattern
from dspy.clients.lm import LM
from dspy.dsp.utils.utils import dotdict
from dspy.signatures.field import OutputField
from dspy.utils.callback import with_callbacks


class DummyLM(LM):
    """Dummy language model for unit testing purposes.

    Three modes of operation:

    Mode 1: List of dictionaries

    If a list of dictionaries is provided, the dummy model will return the next dictionary
    in the list for each request, formatted according to the `format_field_with_value` function.

    Example:

    ```
    lm = DummyLM([{"answer": "red"}, {"answer": "blue"}])
    dspy.settings.configure(lm=lm)
    predictor("What color is the sky?")
    # Output:
    # [[## answer ##]]
    # red
    predictor("What color is the sky?")
    # Output:
    # [[## answer ##]]
    # blue
    ```

    Mode 2: Dictionary of dictionaries

    If a dictionary of dictionaries is provided, the dummy model will return the value
    corresponding to the key that is contained within the final message of the prompt,
    formatted according to the `format_field_with_value` function from the chat adapter.

    ```
    lm = DummyLM({"What color is the sky?": {"answer": "blue"}})
    dspy.settings.configure(lm=lm)
    predictor("What color is the sky?")
    # Output:
    # [[## answer ##]]
    # blue
    ```

    Mode 3: Follow examples

    If `follow_examples` is set to True and the prompt contains an example whose input matches the prompt,
    the dummy model will return the output from that example.

    ```
    lm = DummyLM([{"answer": "red"}], follow_examples=True)
    dspy.settings.configure(lm=lm)
    predictor("What color is the sky?, demos=dspy.Example(input="What color is the sky?", output="blue"))
    # Output:
    # [[## answer ##]]
    # blue
    ```

    """

    def __init__(self, answers: list[dict[str, Any]] | dict[str, dict[str, Any]], follow_examples: bool = False, adapter=None):
        super().__init__("dummy", "chat", 0.0, 1000, True)
        self.answers = answers
        if isinstance(answers, list):
            self.answers = iter(answers)
        self.follow_examples = follow_examples

        # Set adapter, defaulting to ChatAdapter
        if adapter is None:
            from dspy.adapters.chat_adapter import ChatAdapter
            adapter = ChatAdapter()
        self.adapter = adapter

    def _use_example(self, messages):
        # find all field names
        fields = defaultdict(int)
        for message in messages:
            if "content" in message:
                if ma := field_header_pattern.match(message["content"]):
                    fields[message["content"][ma.start() : ma.end()]] += 1
        # find the fields which are missing from the final turns
        max_count = max(fields.values())
        output_fields = [field for field, count in fields.items() if count != max_count]

        # get the output from the last turn that has the output fields as headers
        final_input = messages[-1]["content"].split("\n\n")[0]
        for input, output in zip(reversed(messages[:-1]), reversed(messages), strict=False):
            if any(field in output["content"] for field in output_fields) and final_input in input["content"]:
                return output["content"]

    @with_callbacks
    def __call__(self, prompt=None, messages=None, **kwargs):
        def format_answer_fields(field_names_and_values: dict[str, Any]):
            fields_with_values = {
                FieldInfoWithName(name=field_name, info=OutputField()): value
                for field_name, value in field_names_and_values.items()
            }
            # The reason why DummyLM needs an adapter is because it needs to know which output format to mimic.
            # Normally LMs should not have any knowledge of an adapter, because the output format is defined in the prompt.
            adapter = self.adapter

            # Try to use role="assistant" if the adapter supports it (like JSONAdapter)
            try:
                return adapter.format_field_with_value(fields_with_values, role="assistant")
            except TypeError:
                # Fallback for adapters that don't support role parameter (like ChatAdapter)
                return adapter.format_field_with_value(fields_with_values)

        # Build the request.
        outputs = []
        for _ in range(kwargs.get("n", 1)):
            messages = messages or [{"role": "user", "content": prompt}]
            kwargs = {**self.kwargs, **kwargs}

            if self.follow_examples:
                outputs.append(self._use_example(messages))
            elif isinstance(self.answers, dict):
                outputs.append(
                    next(
                        (format_answer_fields(v) for k, v in self.answers.items() if k in messages[-1]["content"]),
                        "No more responses",
                    )
                )
            else:
                outputs.append(format_answer_fields(next(self.answers, {"answer": "No more responses"})))

            # Logging, with removed api key & where `cost` is None on cache hit.
            kwargs = {k: v for k, v in kwargs.items() if not k.startswith("api_")}
            entry = {"prompt": prompt, "messages": messages, "kwargs": kwargs}
            entry = {**entry, "outputs": outputs, "usage": 0}
            entry = {**entry, "cost": 0}
            self.update_history(entry)

        return outputs

    async def acall(self, prompt=None, messages=None, **kwargs):
        return self.__call__(prompt=prompt, messages=messages, **kwargs)

    def get_convo(self, index):
        """Get the prompt + answer from the ith message."""
        return self.history[index]["messages"], self.history[index]["outputs"]


def dummy_rm(passages=()) -> callable:
    if not passages:

        def inner(query: str, *, k: int, **kwargs):
            raise ValueError("No passages defined")

        return inner
    max_length = max(map(len, passages)) + 100
    vectorizer = DummyVectorizer(max_length)
    passage_vecs = vectorizer(passages)

    def inner(query: str, *, k: int, **kwargs):
        assert k <= len(passages)
        query_vec = vectorizer([query])[0]
        scores = passage_vecs @ query_vec
        largest_idx = (-scores).argsort()[:k]

        return [dotdict(long_text=passages[i]) for i in largest_idx]

    return inner


class DummyVectorizer:
    """Simple vectorizer based on n-grams."""

    def __init__(self, max_length=100, n_gram=2):
        self.max_length = max_length
        self.n_gram = n_gram
        self.P = 10**9 + 7  # A large prime number
        random.seed(123)
        self.coeffs = [random.randrange(1, self.P) for _ in range(n_gram)]

    def _hash(self, gram):
        """Hashes a string using a polynomial hash function."""
        h = 1
        for coeff, c in zip(self.coeffs, gram, strict=False):
            h = h * coeff + ord(c)
            h %= self.P
        return h % self.max_length

    def __call__(self, texts: list[str]) -> np.ndarray:
        vecs = []
        for text in texts:
            grams = [text[i : i + self.n_gram] for i in range(len(text) - self.n_gram + 1)]
            vec = [0] * self.max_length
            for gram in grams:
                vec[self._hash(gram)] += 1
            vecs.append(vec)

        vecs = np.array(vecs, dtype=np.float32)
        vecs -= np.mean(vecs, axis=1, keepdims=True)
        vecs /= np.linalg.norm(vecs, axis=1, keepdims=True) + 1e-10  # Added epsilon to avoid division by zero
        return vecs

```

--------------------------------------------------------------------------------
/tests/utils/test_settings.py:
--------------------------------------------------------------------------------

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from unittest import mock

import pytest
from litellm import Choices, Message, ModelResponse

import dspy


def test_basic_dspy_settings():
    dspy.configure(lm=dspy.LM("openai/gpt-4o"), adapter=dspy.JSONAdapter(), callbacks=[lambda x: x])
    assert dspy.settings.lm.model == "openai/gpt-4o"
    assert isinstance(dspy.settings.adapter, dspy.JSONAdapter)
    assert len(dspy.settings.callbacks) == 1


def test_forbid_configure_call_in_child_thread():
    dspy.configure(lm=dspy.LM("openai/gpt-4o"), adapter=dspy.JSONAdapter(), callbacks=[lambda x: x])

    def worker():
        with pytest.raises(RuntimeError, match="Cannot call dspy.configure"):
            dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"), callbacks=[])

    with ThreadPoolExecutor(max_workers=1) as executor:
        executor.submit(worker)


def test_dspy_context():
    dspy.configure(lm=dspy.LM("openai/gpt-4o"), adapter=dspy.JSONAdapter(), callbacks=[lambda x: x])
    with dspy.context(lm=dspy.LM("openai/gpt-4o-mini"), callbacks=[]):
        assert dspy.settings.lm.model == "openai/gpt-4o-mini"
        assert len(dspy.settings.callbacks) == 0

    assert dspy.settings.lm.model == "openai/gpt-4o"
    assert len(dspy.settings.callbacks) == 1


def test_dspy_context_parallel():
    dspy.configure(lm=dspy.LM("openai/gpt-4o"), adapter=dspy.JSONAdapter(), callbacks=[lambda x: x])

    def worker(i):
        with dspy.context(lm=dspy.LM("openai/gpt-4o-mini"), trace=[i], callbacks=[]):
            assert dspy.settings.lm.model == "openai/gpt-4o-mini"
            assert dspy.settings.trace == [i]
            assert len(dspy.settings.callbacks) == 0

    with ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(worker, range(3))

    assert dspy.settings.lm.model == "openai/gpt-4o"
    assert len(dspy.settings.callbacks) == 1


def test_dspy_context_with_dspy_parallel():
    dspy.configure(lm=dspy.LM("openai/gpt-4o", cache=False), adapter=dspy.ChatAdapter())

    class MyModule(dspy.Module):
        def __init__(self):
            self.predict = dspy.Predict("question -> answer")

        def forward(self, question: str) -> str:
            lm = dspy.LM("openai/gpt-4o-mini", cache=False) if "France" in question else dspy.settings.lm
            with dspy.context(lm=lm):
                time.sleep(1)
                assert dspy.settings.lm.model == lm.model
                return self.predict(question=question)

    with mock.patch("litellm.completion") as mock_completion:
        mock_completion.return_value = ModelResponse(
            choices=[Choices(message=Message(content="[[ ## answer ## ]]\nParis"))],
            model="openai/gpt-4o-mini",
        )

        module = MyModule()
        parallelizer = dspy.Parallel()
        input_pairs = [
            (module, {"question": "What is the capital of France?"}),
            (module, {"question": "What is the capital of Germany?"}),
        ]
        parallelizer(input_pairs)

        # Verify mock was called correctly
        assert mock_completion.call_count == 2
        for call_args in mock_completion.call_args_list:
            if "France" in call_args.kwargs["messages"][-1]["content"]:
                # France question uses gpt-4o-mini
                assert call_args.kwargs["model"] == "openai/gpt-4o-mini"
            else:
                # Germany question uses gpt-4o
                assert call_args.kwargs["model"] == "openai/gpt-4o"

        # The main thread is not affected by the context
        assert dspy.settings.lm.model == "openai/gpt-4o"


@pytest.mark.asyncio
async def test_dspy_context_with_async_task_group():
    class MyModule(dspy.Module):
        def __init__(self):
            self.predict = dspy.Predict("question -> answer")

        async def aforward(self, question: str) -> str:
            lm = (
                dspy.LM("openai/gpt-4o-mini", cache=False)
                if "France" in question
                else dspy.LM("openai/gpt-4o", cache=False)
            )
            with dspy.context(lm=lm, trace=[]):
                await asyncio.sleep(1)
                assert dspy.settings.lm.model == lm.model
                result = await self.predict.acall(question=question)
                assert len(dspy.settings.trace) == 1
                return result

    module = MyModule()

    with dspy.context(lm=dspy.LM("openai/gpt-4.1", cache=False), adapter=dspy.ChatAdapter()):
        with mock.patch("litellm.acompletion") as mock_completion:
            mock_completion.return_value = ModelResponse(
                choices=[Choices(message=Message(content="[[ ## answer ## ]]\nParis"))],
                model="openai/gpt-4o-mini",
            )

            # Define the coroutines to be run
            coroutines = [
                module.acall(question="What is the capital of France?"),
                module.acall(question="What is the capital of France?"),
                module.acall(question="What is the capital of Germany?"),
                module.acall(question="What is the capital of Germany?"),
            ]

            # Run them concurrently and gather results
            results = await asyncio.gather(*coroutines)

        assert results[0].answer == "Paris"
        assert results[1].answer == "Paris"
        assert results[2].answer == "Paris"
        assert results[3].answer == "Paris"

        # Verify mock was called correctly
        assert mock_completion.call_count == 4
        # France question uses gpt-4o-mini
        assert mock_completion.call_args_list[0].kwargs["model"] == "openai/gpt-4o-mini"
        assert mock_completion.call_args_list[1].kwargs["model"] == "openai/gpt-4o-mini"
        # Germany question uses gpt-4o
        assert mock_completion.call_args_list[2].kwargs["model"] == "openai/gpt-4o"
        assert mock_completion.call_args_list[3].kwargs["model"] == "openai/gpt-4o"

        # The main thread is not affected by the context
        assert dspy.settings.lm.model == "openai/gpt-4.1"
        assert dspy.settings.trace == []


@pytest.mark.asyncio
async def test_dspy_configure_allowance_async():
    def bar1():
        # `dspy.configure` is disallowed in different async tasks from the initial one.
        # In this case, foo1 (async) calls bar1 (sync), and bar1 uses the async task from foo1.
        with pytest.raises(RuntimeError) as e:
            dspy.configure(lm=dspy.LM("openai/gpt-4o"))
        assert "dspy.settings.configure(...) can only be called from the same async" in str(e.value)

    async def foo1():
        bar1()
        await asyncio.sleep(0.1)

    async def foo2():
        # `dspy.configure` is disallowed in different async tasks from the initial one.
        with pytest.raises(RuntimeError) as e:
            dspy.configure(lm=dspy.LM("openai/gpt-4o"))
        assert "dspy.settings.configure(...) can only be called from the same async" in str(e.value)
        await asyncio.sleep(0.1)

    async def foo3():
        # `dspy.context` is allowed in different async tasks from the initial one.
        with dspy.context(lm=dspy.LM("openai/gpt-4o")):
            await asyncio.sleep(0.1)

    async def foo4():
        # foo4 is directly invoked by the entry task, so it has the same async task as the entry task.
        dspy.configure(lm=dspy.LM("openai/gpt-4o"))
        await asyncio.sleep(0.1)

    # `dspy.configure` is allowed to be called multiple times in the same async task.
    dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))
    dspy.configure(lm=dspy.LM("openai/gpt-4o"))
    dspy.configure(adapter=dspy.JSONAdapter())

    await asyncio.gather(foo1(), foo2(), foo3())

    await foo4()

```
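
The tests above pin down the intended contract: `dspy.configure(...)` sets process-wide defaults from the entry thread or async task, while `dspy.context(...)` applies thread- and task-local overrides that are restored on exit. Below is a minimal usage sketch of that contract, assuming an OpenAI API key is available for the `dspy.LM` calls:

```python
import dspy

# Global defaults: call once from the entry thread / entry async task.
dspy.configure(lm=dspy.LM("openai/gpt-4o"))

predict = dspy.Predict("question -> answer")

# Thread- and task-local override; the global LM is restored when the block exits.
with dspy.context(lm=dspy.LM("openai/gpt-4o-mini")):
    print(predict(question="What is the capital of France?").answer)

# Back on the globally configured LM here.
print(predict(question="What is the capital of Germany?").answer)
```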

--------------------------------------------------------------------------------
/dspy/retrievers/embeddings.py:
--------------------------------------------------------------------------------

```python
import json
import os
from typing import Any

import numpy as np

from dspy.utils.unbatchify import Unbatchify


class Embeddings:
    def __init__(
        self,
        corpus: list[str],
        embedder,
        k: int = 5,
        callbacks: list[Any] | None = None,
        cache: bool = False,
        brute_force_threshold: int = 20_000,
        normalize: bool = True,
    ):
        assert cache is False, "Caching is not supported for embeddings-based retrievers"

        self.embedder = embedder
        self.k = k
        self.corpus = corpus
        self.normalize = normalize

        self.corpus_embeddings = self.embedder(self.corpus)
        self.corpus_embeddings = self._normalize(self.corpus_embeddings) if self.normalize else self.corpus_embeddings

        self.index = self._build_faiss() if len(corpus) >= brute_force_threshold else None
        self.search_fn = Unbatchify(self._batch_forward)

    def __call__(self, query: str):
        return self.forward(query)

    def forward(self, query: str):
        import dspy

        passages, indices = self.search_fn(query)
        return dspy.Prediction(passages=passages, indices=indices)

    def _batch_forward(self, queries: list[str]):
        q_embeds = self.embedder(queries)
        q_embeds = self._normalize(q_embeds) if self.normalize else q_embeds

        pids = self._faiss_search(q_embeds, self.k * 10) if self.index else None
        pids = np.tile(np.arange(len(self.corpus)), (len(queries), 1)) if pids is None else pids

        return self._rerank_and_predict(q_embeds, pids)

    def _build_faiss(self):
        nbytes = 32
        partitions = int(2 * np.sqrt(len(self.corpus)))
        dim = self.corpus_embeddings.shape[1]

        try:
            import faiss
        except ImportError:
            raise ImportError("Please `pip install faiss-cpu` or increase `brute_force_threshold` to avoid FAISS.")

        quantizer = faiss.IndexFlatL2(dim)
        index = faiss.IndexIVFPQ(quantizer, dim, partitions, nbytes, 8)

        print(
            f"Training a {nbytes}-byte FAISS index with {partitions} partitions, based on "
            f"{len(self.corpus)} x {dim}-dim embeddings"
        )
        index.train(self.corpus_embeddings)
        index.add(self.corpus_embeddings)
        index.nprobe = min(16, partitions)

        return index

    def _faiss_search(self, query_embeddings: np.ndarray, num_candidates: int):
        return self.index.search(query_embeddings, num_candidates)[1]

    def _rerank_and_predict(self, q_embeds: np.ndarray, candidate_indices: np.ndarray):
        candidate_embeddings = self.corpus_embeddings[candidate_indices]
        scores = np.einsum("qd,qkd->qk", q_embeds, candidate_embeddings)

        top_k_indices = np.argsort(-scores, axis=1)[:, : self.k]
        top_indices = candidate_indices[np.arange(len(q_embeds))[:, None], top_k_indices]

        return [([self.corpus[idx] for idx in indices], [idx for idx in indices]) for indices in top_indices]  # noqa: C416

    def _normalize(self, embeddings: np.ndarray):
        norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
        return embeddings / np.maximum(norms, 1e-10)

    def save(self, path: str):
        """
        Save the embeddings index to disk.

        This saves the corpus, embeddings, FAISS index (if present), and configuration
        to allow for fast loading without recomputing embeddings.

        Args:
            path: Directory path where the embeddings will be saved
        """
        os.makedirs(path, exist_ok=True)

        # Save configuration and corpus
        config = {
            "k": self.k,
            "normalize": self.normalize,
            "corpus": self.corpus,
            "has_faiss_index": self.index is not None,
        }

        with open(os.path.join(path, "config.json"), "w") as f:
            json.dump(config, f, indent=2)

        # Save embeddings
        np.save(os.path.join(path, "corpus_embeddings.npy"), self.corpus_embeddings)

        # Save FAISS index if it exists
        if self.index is not None:
            try:
                import faiss
                faiss.write_index(self.index, os.path.join(path, "faiss_index.bin"))
            except ImportError:
                # If FAISS is not available, we can't save the index
                # but we can still save the embeddings for brute force search
                pass

    def load(self, path: str, embedder):
        """
        Load the embeddings index from disk into the current instance.

        Args:
            path: Directory path where the embeddings were saved
            embedder: The embedder function to use for new queries

        Returns:
            self: Returns self for method chaining

        Raises:
            FileNotFoundError: If the save directory or required files don't exist
            ValueError: If the saved config is invalid or incompatible
        """
        if not os.path.exists(path):
            raise FileNotFoundError(f"Save directory not found: {path}")

        config_path = os.path.join(path, "config.json")
        embeddings_path = os.path.join(path, "corpus_embeddings.npy")

        if not os.path.exists(config_path):
            raise FileNotFoundError(f"Config file not found: {config_path}")
        if not os.path.exists(embeddings_path):
            raise FileNotFoundError(f"Embeddings file not found: {embeddings_path}")

        # Load configuration and corpus
        with open(config_path) as f:
            config = json.load(f)

        # Validate required config fields
        required_fields = ["k", "normalize", "corpus", "has_faiss_index"]
        for field in required_fields:
            if field not in config:
                raise ValueError(f"Invalid config: missing required field '{field}'")

        # Restore configuration
        self.k = config["k"]
        self.normalize = config["normalize"]
        self.corpus = config["corpus"]
        self.embedder = embedder

        # Load embeddings
        self.corpus_embeddings = np.load(embeddings_path)

        # Load FAISS index if it was saved and FAISS is available
        faiss_index_path = os.path.join(path, "faiss_index.bin")
        if config["has_faiss_index"] and os.path.exists(faiss_index_path):
            try:
                import faiss
                self.index = faiss.read_index(faiss_index_path)
            except ImportError:
                # If FAISS is not available, fall back to brute force
                self.index = None
        else:
            self.index = None

        return self

    @classmethod
    def from_saved(cls, path: str, embedder):
        """
        Create an Embeddings instance from a saved index.

        This is the recommended way to load saved embeddings as it creates a new
        instance without unnecessarily computing embeddings.

        Args:
            path: Directory path where the embeddings were saved
            embedder: The embedder function to use for new queries

        Returns:
            Embeddings instance loaded from disk

        Example:
            ```python
            # Save embeddings
            embeddings = Embeddings(corpus, embedder)
            embeddings.save("./saved_embeddings")

            # Load embeddings later
            loaded_embeddings = Embeddings.from_saved("./saved_embeddings", embedder)
            ```
        """
        # Create a minimal instance without triggering embedding computation
        instance = cls.__new__(cls)
        # Initialize the search function (required since we bypassed __init__)
        instance.search_fn = Unbatchify(instance._batch_forward)
        instance.load(path, embedder)
        return instance

```
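
A minimal usage sketch for the retriever above, assuming `dspy.Embedder` is backed by an OpenAI embedding model (the model name below is illustrative):

```python
import dspy
from dspy.retrievers.embeddings import Embeddings

# Illustrative embedding model; any dspy.Embedder-compatible callable works.
embedder = dspy.Embedder("openai/text-embedding-3-small")

corpus = [
    "The Earth orbits the Sun in an elliptical path.",
    "Water boils at 100°C at standard atmospheric pressure.",
]

retriever = Embeddings(corpus, embedder, k=1)
print(retriever("At what temperature does water boil?").passages)

# Persist the corpus, embeddings, and FAISS index (if one was built) ...
retriever.save("./saved_embeddings")

# ... then reload later without recomputing corpus embeddings.
reloaded = Embeddings.from_saved("./saved_embeddings", embedder)
```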

--------------------------------------------------------------------------------
/dspy/datasets/alfworld/base_config.yml:
--------------------------------------------------------------------------------

```yaml
dataset:
  data_path: '$ALFWORLD_DATA/json_2.1.1/train'
  eval_id_data_path: '$ALFWORLD_DATA/json_2.1.1/valid_seen'    # null/None to disable
  eval_ood_data_path: '$ALFWORLD_DATA/json_2.1.1/valid_unseen' # null/None to disable
  num_train_games: -1                                          # max training games (<=0 indicates full dataset)
  num_eval_games: -1                                           # max evaluation games (<=0 indicates full dataset)

logic:
  domain: '$ALFWORLD_DATA/logic/alfred.pddl'                   # PDDL domain file that defines the world dynamics
  grammar: '$ALFWORLD_DATA/logic/alfred.twl2'                  # Grammar file that defines the text feedback

env:
  type: 'AlfredTWEnv'                                          # 'AlfredTWEnv' or 'AlfredThorEnv' or 'AlfredHybrid'
  regen_game_files: False                                      # check if game is solvable by expert and save to game.tw-pddl file
  domain_randomization: False                                  # shuffle Textworld print order and object id nums
  task_types: [1, 2, 3, 4, 5, 6]                               # task-type ids: 1 - Pick & Place, 2 - Examine in Light, 3 - Clean & Place, 4 - Heat & Place, 5 - Cool & Place, 6 - Pick Two & Place
  expert_timeout_steps: 150                                    # max steps before timeout for expert to solve the task
  expert_type: "handcoded"                                     # 'handcoded' or 'downward'. Note: the downward planner is very slow for real-time use
  goal_desc_human_anns_prob: 0.0                               # prob of using human-annotated goal language instead of templated goals (1.0 indicates all human annotations from ALFRED)

  hybrid:
    start_eps: 100000                                          # starting episode of hybrid training, tw-only training up to this point
    thor_prob: 0.5                                             # prob of AlfredThorEnv during hybrid training
    eval_mode: "tw"                                            # 'tw' or 'thor' - env used for evaluation during hybrid training

  thor:
    screen_width: 300                                          # width of THOR window
    screen_height: 300                                         # height of THOR window
    smooth_nav: False                                          # smooth rotations, looks, and translations during navigation (very slow)
    save_frames_to_disk: False                                 # save frame PNGs to disk (useful for making videos)
    save_frames_path: './videos/'                              # path to save frame PNGs

controller:
  type: 'oracle'                                               # 'oracle' or 'oracle_astar' or 'mrcnn' or 'mrcnn_astar' (aka BUTLER)
  debug: False
  load_receps: True                                            # load receptacle locations from precomputed dict (if available)

mask_rcnn:
  pretrained_model_path: '$ALFWORLD_DATA/detectors/mrcnn.pth'

general:
  random_seed: 42
  use_cuda: True                                               # disable this when running on machine without cuda
  visdom: False                                                # plot training/eval curves, run with visdom server
  task: 'alfred'
  training_method: 'dagger'                                    # 'dqn' or 'dagger'
  save_path: './training/'                                     # path to save pytorch models
  observation_pool_capacity: 3                                 # k-size queue, 0 indicates no observation
  hide_init_receptacles: False                                 # remove initial observation containing navigable receptacles

  training:
    batch_size: 10
    max_episode: 50000
    smoothing_eps: 0.1
    optimizer:
      learning_rate: 0.001
      clip_grad_norm: 5

  evaluate:
    run_eval: True
    batch_size: 10
    env:
      type: "AlfredTWEnv"

  checkpoint:
    report_frequency: 1000                                    # report every N episode
    experiment_tag: 'test'                                    # name of experiment
    load_pretrained: False                                    # during test, enable this so that the agent load your pretrained model
    load_from_tag: 'not loading anything'                     # name of pre-trained model to load in save_path

  model:
    encoder_layers: 1
    decoder_layers: 1
    encoder_conv_num: 5
    block_hidden_dim: 64
    n_heads: 1
    dropout: 0.1
    block_dropout: 0.1
    recurrent: True

rl:
  action_space: "admissible"                                  # 'admissible' (candidates from text engine) or 'generation' (seq2seq-style generation) or 'beam_search_choice' or 'exhaustive' (not working)
  max_target_length: 20                                       # max token length for seq2seq generation
  beam_width: 10                                              # 1 means greedy
  generate_top_k: 3

  training:
    max_nb_steps_per_episode: 50                              # terminate after this many steps
    learn_start_from_this_episode: 0                          # delay updates until this episode
    target_net_update_frequency: 500                          # sync target net with online net per this many epochs

  replay:
    accumulate_reward_from_final: True
    count_reward_lambda: 0.0                                  # 0 to disable
    novel_object_reward_lambda: 0.0                           # 0 to disable
    discount_gamma_game_reward: 0.9
    discount_gamma_count_reward: 0.5
    discount_gamma_novel_object_reward: 0.5
    replay_memory_capacity: 500000                            # adjust this depending on your RAM size
    replay_memory_priority_fraction: 0.5
    update_per_k_game_steps: 5
    replay_batch_size: 64
    multi_step: 3
    replay_sample_history_length: 4
    replay_sample_update_from: 2

  epsilon_greedy:
    noisy_net: False                                          # if this is true, then epsilon greedy is disabled
    epsilon_anneal_episodes: 1000                             # -1 if not annealing
    epsilon_anneal_from: 0.3
    epsilon_anneal_to: 0.1

dagger:
  action_space: "generation"                                  # 'admissible' (candidates from text engine) or 'generation' (seq2seq-style generation) or 'exhaustive' (not working)
  max_target_length: 20                                       # max token length for seq2seq generation
  beam_width: 10                                              # 1 means greedy
  generate_top_k: 5
  unstick_by_beam_search: False                               # use beam-search for failed actions, set True during evaluation

  training:
    max_nb_steps_per_episode: 50                              # terminate after this many steps

  fraction_assist:
    fraction_assist_anneal_episodes: 50000
    fraction_assist_anneal_from: 1.0
    fraction_assist_anneal_to: 0.01

  fraction_random:
    fraction_random_anneal_episodes: 0
    fraction_random_anneal_from: 0.0
    fraction_random_anneal_to: 0.0

  replay:
    replay_memory_capacity: 500000
    update_per_k_game_steps: 5
    replay_batch_size: 64
    replay_sample_history_length: 4
    replay_sample_update_from: 2

vision_dagger:
  model_type: "resnet"                                        # 'resnet' (whole image features) or 'maskrcnn_whole' (whole image MaskRCNN feats) or 'maskrcnn' (top k MaskRCNN detection feats) or 'no_vision' (zero vision input)
  resnet_fc_dim: 64
  maskrcnn_top_k_boxes: 10                                    # top k box features
  use_exploration_frame_feats: False                          # append feats from initial exploration (memory intensive!)
  sequence_aggregation_method: "average"                      # 'sum' or 'average' or 'rnn'
```
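
The paths in this config are written against the `$ALFWORLD_DATA` environment variable. Below is a minimal sketch of loading the file and expanding those variables; the helper name is hypothetical and PyYAML is assumed to be installed:

```python
import os
import yaml  # assumes PyYAML is installed


def load_alfworld_config(path: str) -> dict:
    """Hypothetical helper: load base_config.yml and expand $ALFWORLD_DATA-style paths."""
    with open(path) as f:
        config = yaml.safe_load(f)

    def expand(value):
        # Recursively expand environment variables in string values.
        if isinstance(value, str):
            return os.path.expandvars(value)
        if isinstance(value, dict):
            return {k: expand(v) for k, v in value.items()}
        if isinstance(value, list):
            return [expand(v) for v in value]
        return value

    return expand(config)


config = load_alfworld_config("base_config.yml")
print(config["dataset"]["data_path"])  # e.g. /path/to/alfworld/json_2.1.1/train
```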

--------------------------------------------------------------------------------
/dspy/adapters/types/citation.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Optional

import pydantic

from dspy.adapters.types.base_type import Type
from dspy.utils.annotation import experimental


@experimental(version="3.0.4")
class Citations(Type):
    """Citations extracted from an LM response with source references.

    This type represents citations returned by language models that support
    citation extraction, particularly Anthropic's Citations API through LiteLLM.
    Citations include the quoted text and source information.

    Example:
        ```python
        import os
        import dspy
        from dspy.signatures import Signature
        from dspy.experimental import Citations, Document
        os.environ["ANTHROPIC_API_KEY"] = "YOUR_ANTHROPIC_API_KEY"

        class AnswerWithSources(Signature):
            '''Answer questions using provided documents with citations.'''
            documents: list[Document] = dspy.InputField()
            question: str = dspy.InputField()
            answer: str = dspy.OutputField()
            citations: Citations = dspy.OutputField()

        # Create documents to provide as sources
        docs = [
            Document(
                data="The Earth orbits the Sun in an elliptical path.",
                title="Basic Astronomy Facts"
            ),
            Document(
                data="Water boils at 100°C at standard atmospheric pressure.",
                title="Physics Fundamentals",
                metadata={"author": "Dr. Smith", "year": 2023}
            )
        ]

        # Use with a model that supports citations like Claude
        lm = dspy.LM("anthropic/claude-opus-4-1-20250805")
        predictor = dspy.Predict(AnswerWithSources)
        result = predictor(documents=docs, question="What temperature does water boil?", lm=lm)

        for citation in result.citations.citations:
            print(citation.format())
        ```
    """

    class Citation(Type):
        """Individual citation with character location information."""
        type: str = "char_location"
        cited_text: str
        document_index: int
        document_title: str | None = None
        start_char_index: int
        end_char_index: int
        supported_text: str | None = None

        def format(self) -> dict[str, Any]:
            """Format citation as dictionary for LM consumption.

            Returns:
                A dictionary in the format expected by citation APIs.
            """
            citation_dict = {
                "type": self.type,
                "cited_text": self.cited_text,
                "document_index": self.document_index,
                "start_char_index": self.start_char_index,
                "end_char_index": self.end_char_index
            }

            if self.document_title:
                citation_dict["document_title"] = self.document_title

            if self.supported_text:
                citation_dict["supported_text"] = self.supported_text

            return citation_dict

    citations: list[Citation]

    @classmethod
    def from_dict_list(cls, citations_dicts: list[dict[str, Any]]) -> "Citations":
        """Convert a list of dictionaries to a Citations instance.

        Args:
            citations_dicts: A list of dictionaries, where each dictionary should have 'cited_text' key
                and 'document_index', 'start_char_index', 'end_char_index' keys.

        Returns:
            A Citations instance.

        Example:
            ```python
            citations_dict = [
                {
                    "cited_text": "The sky is blue",
                    "document_index": 0,
                    "document_title": "Weather Guide",
                    "start_char_index": 0,
                    "end_char_index": 15,
                    "supported_text": "The sky was blue yesterday."
                }
            ]
            citations = Citations.from_dict_list(citations_dict)
            ```
        """
        citations = [cls.Citation(**item) for item in citations_dicts]
        return cls(citations=citations)

    @classmethod
    def description(cls) -> str:
        """Description of the citations type for use in prompts."""
        return (
            "Citations with quoted text and source references. "
            "Include the exact text being cited and information about its source."
        )

    def format(self) -> list[dict[str, Any]]:
        """Format citations as a list of dictionaries."""
        return [citation.format() for citation in self.citations]

    @pydantic.model_validator(mode="before")
    @classmethod
    def validate_input(cls, data: Any):
        if isinstance(data, cls):
            return data

        # Handle case where data is a list of dicts with citation info
        if isinstance(data, list) and all(
            isinstance(item, dict) and "cited_text" in item for item in data
        ):
            return {"citations": [cls.Citation(**item) for item in data]}

        # Handle case where data is a dict
        elif isinstance(data, dict):
            if "citations" in data:
                # Handle case where data is a dict with "citations" key
                citations_data = data["citations"]
                if isinstance(citations_data, list):
                    return {
                        "citations": [
                            cls.Citation(**item) if isinstance(item, dict) else item
                            for item in citations_data
                        ]
                    }
            elif "cited_text" in data:
                # Handle case where data is a single citation dict
                return {"citations": [cls.Citation(**data)]}

        raise ValueError(f"Received invalid value for `Citations`: {data}")

    def __iter__(self):
        """Allow iteration over citations."""
        return iter(self.citations)

    def __len__(self):
        """Return the number of citations."""
        return len(self.citations)

    def __getitem__(self, index):
        """Allow indexing into citations."""
        return self.citations[index]

    @classmethod
    def is_streamable(cls) -> bool:
        """Whether the Citations type is streamable."""
        return True

    @classmethod
    def parse_stream_chunk(cls, chunk) -> Optional["Citations"]:
        """
        Parse a stream chunk into Citations.

        Args:
            chunk: A stream chunk from the LM.

        Returns:
            A Citations object if the chunk contains citation data, None otherwise.
        """
        try:
            # Check if the chunk has citation data in provider_specific_fields
            if hasattr(chunk, "choices") and chunk.choices:
                delta = chunk.choices[0].delta
                if hasattr(delta, "provider_specific_fields") and delta.provider_specific_fields:
                    citation_data = delta.provider_specific_fields.get("citation")
                    if citation_data:
                        return cls.from_dict_list([citation_data])
        except Exception:
            pass
        return None

    @classmethod
    def parse_lm_response(cls, response: str | dict[str, Any]) -> Optional["Citations"]:
        """Parse a LM response into Citations.

        Args:
            response: A LM response that may contain citation data.

        Returns:
            A Citations object if citation data is found, None otherwise.
        """
        if isinstance(response, dict):
            # Check if the response contains citations in the expected format
            if "citations" in response:
                citations_data = response["citations"]
                if isinstance(citations_data, list):
                    return cls.from_dict_list(citations_data)

        return None

```
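
A minimal sketch of constructing `Citations` directly from dictionaries and exercising the list-like helpers defined above (`from_dict_list`, `format`, `__len__`, `__getitem__`, `__iter__`):

```python
from dspy.experimental import Citations  # export path per the class docstring above

raw = [
    {
        "cited_text": "The sky is blue",
        "document_index": 0,
        "document_title": "Weather Guide",
        "start_char_index": 0,
        "end_char_index": 15,
    }
]

citations = Citations.from_dict_list(raw)

print(len(citations))            # 1, via __len__
print(citations[0].cited_text)   # "The sky is blue", via __getitem__
for citation in citations:       # iteration via __iter__
    print(citation.format())     # dict in the format expected by citation APIs
```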

--------------------------------------------------------------------------------
/docs/docs/tutorials/conversation_history/index.md:
--------------------------------------------------------------------------------

```markdown
# Managing Conversation History

Conversation history management is a fundamental requirement when building AI applications such as chatbots. While DSPy does not provide automatic conversation history management within `dspy.Module`, it offers the `dspy.History` utility to help you manage conversation history effectively.

## Using `dspy.History` to Manage Conversation History

The `dspy.History` class can be used as an input field type, containing a `messages: list[dict[str, Any]]` attribute that stores the conversation history. Each entry in this list is a dictionary with keys corresponding to the fields defined in your signature. See the example below:

```python
import dspy
import os

os.environ["OPENAI_API_KEY"] = "{your_openai_api_key}"

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))

class QA(dspy.Signature):
    question: str = dspy.InputField()
    history: dspy.History = dspy.InputField()
    answer: str = dspy.OutputField()

predict = dspy.Predict(QA)
history = dspy.History(messages=[])

while True:
    question = input("Type your question, end conversation by typing 'finish': ")
    if question == "finish":
        break
    outputs = predict(question=question, history=history)
    print(f"\n{outputs.answer}\n")
    history.messages.append({"question": question, **outputs})

dspy.inspect_history()
```

There are two key steps when using the conversation history:

- **Include a field of type `dspy.History` in your Signature.**
- **Maintain a history instance at runtime, appending new conversation turns to it.** Each entry should include all relevant input and output field information.

A sample run might look like this:

```
Type your question, end conversation by typing 'finish': do you know the competition between pytorch and tensorflow?

Yes, there is a notable competition between PyTorch and TensorFlow, which are two of the most popular deep learning frameworks. PyTorch, developed by Facebook, is known for its dynamic computation graph, which allows for more flexibility and ease of use, especially in research settings. TensorFlow, developed by Google, initially used a static computation graph but has since introduced eager execution to improve usability. TensorFlow is often favored in production environments due to its scalability and deployment capabilities. Both frameworks have strong communities and extensive libraries, and the choice between them often depends on specific project requirements and personal preference.

Type your question, end conversation by typing 'finish': which one won the battle? just tell me the result, don't include any reasoning, thanks!

There is no definitive winner; both PyTorch and TensorFlow are widely used and have their own strengths.
Type your question, end conversation by typing 'finish': finish




[2025-07-11T16:35:57.592762]

System message:

Your input fields are:
1. `question` (str): 
2. `history` (History):
Your output fields are:
1. `answer` (str):
All interactions will be structured in the following way, with the appropriate values filled in.

[[ ## question ## ]]
{question}

[[ ## history ## ]]
{history}

[[ ## answer ## ]]
{answer}

[[ ## completed ## ]]
In adhering to this structure, your objective is: 
        Given the fields `question`, `history`, produce the fields `answer`.


User message:

[[ ## question ## ]]
do you know the competition between pytorch and tensorflow?


Assistant message:

[[ ## answer ## ]]
Yes, there is a notable competition between PyTorch and TensorFlow, which are two of the most popular deep learning frameworks. PyTorch, developed by Facebook, is known for its dynamic computation graph, which allows for more flexibility and ease of use, especially in research settings. TensorFlow, developed by Google, initially used a static computation graph but has since introduced eager execution to improve usability. TensorFlow is often favored in production environments due to its scalability and deployment capabilities. Both frameworks have strong communities and extensive libraries, and the choice between them often depends on specific project requirements and personal preference.

[[ ## completed ## ]]


User message:

[[ ## question ## ]]
which one won the battle? just tell me the result, don't include any reasoning, thanks!

Respond with the corresponding output fields, starting with the field `[[ ## answer ## ]]`, and then ending with the marker for `[[ ## completed ## ]]`.


Response:

[[ ## answer ## ]]
There is no definitive winner; both PyTorch and TensorFlow are widely used and have their own strengths.

[[ ## completed ## ]]
```

Notice how each user input and assistant response is appended to the history, allowing the model to maintain context across turns.

The actual prompt sent to the language model is a multi-turn message, as shown by the output of `dspy.inspect_history`. Each conversation turn is represented as a user message followed by an assistant message.

## History in Few-shot Examples

You may notice that in the prompt above, `history` is not rendered as its own `[[ ## history ## ]]` section in the user messages, even though it is declared as an input field (e.g., "2. `history` (History):" in the system message); the live conversation history is expanded into multiple turns instead. Few-shot examples are handled differently: when a demo includes conversation history, DSPy does not expand that history into multiple turns. Instead, to remain compatible with the standard OpenAI message format, each few-shot example is represented as a single turn.

For example:

```python
import dspy

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))


class QA(dspy.Signature):
    question: str = dspy.InputField()
    history: dspy.History = dspy.InputField()
    answer: str = dspy.OutputField()


predict = dspy.Predict(QA)
history = dspy.History(messages=[])

predict.demos.append(
    dspy.Example(
        question="What is the capital of France?",
        history=dspy.History(
            messages=[{"question": "What is the capital of Germany?", "answer": "The capital of Germany is Berlin."}]
        ),
        answer="The capital of France is Paris.",
    )
)

predict(question="What is the capital of America?", history=dspy.History(messages=[]))
dspy.inspect_history()
```

The resulting history will look like this:

```
[2025-07-11T16:53:10.994111]

System message:

Your input fields are:
1. `question` (str): 
2. `history` (History):
Your output fields are:
1. `answer` (str):
All interactions will be structured in the following way, with the appropriate values filled in.

[[ ## question ## ]]
{question}

[[ ## history ## ]]
{history}

[[ ## answer ## ]]
{answer}

[[ ## completed ## ]]
In adhering to this structure, your objective is: 
        Given the fields `question`, `history`, produce the fields `answer`.


User message:

[[ ## question ## ]]
What is the capital of France?

[[ ## history ## ]]
{"messages": [{"question": "What is the capital of Germany?", "answer": "The capital of Germany is Berlin."}]}


Assistant message:

[[ ## answer ## ]]
The capital of France is Paris.

[[ ## completed ## ]]


User message:

[[ ## question ## ]]
What is the capital of Germany?

Respond with the corresponding output fields, starting with the field `[[ ## answer ## ]]`, and then ending with the marker for `[[ ## completed ## ]]`.


Response:

[[ ## answer ## ]]
The capital of Germany is Berlin.

[[ ## completed ## ]]
```

As you can see, the few-shot example does not expand the conversation history into multiple turns. Instead, it represents the history as JSON data within its section:

```
[[ ## history ## ]]
{"messages": [{"question": "What is the capital of Germany?", "answer": "The capital of Germany is Berlin."}]}
```

This approach ensures compatibility with standard prompt formats while still providing the model with relevant conversational context.


```

--------------------------------------------------------------------------------
/dspy/predict/program_of_thought.py:
--------------------------------------------------------------------------------

```python
import json
import logging
import re

import dspy
from dspy.primitives.module import Module
from dspy.primitives.python_interpreter import PythonInterpreter
from dspy.signatures.signature import Signature, ensure_signature

logger = logging.getLogger(__name__)


class ProgramOfThought(Module):
    """
    A DSPy module that runs Python programs to solve a problem.
    This module requires Deno to be installed. Please install Deno by following https://docs.deno.com/runtime/getting_started/installation/

    Example:
    ```
    import dspy

    lm = dspy.LM('openai/gpt-4o-mini')
    dspy.configure(lm=lm)
    pot = dspy.ProgramOfThought("question -> answer")
    pot(question="what is 1+1?")
    ```
    """

    def __init__(self, signature: str | type[Signature], max_iters: int = 3, interpreter: PythonInterpreter | None = None):
        """
        Args:
            signature: The signature of the module.
            max_iters: The maximum number of iterations to retry code generation and execution.
            interpreter: PythonInterpreter instance to use. If None, a new one is instantiated.
        """
        super().__init__()
        self.signature = signature = ensure_signature(signature)
        self.max_iters = max_iters

        self.input_fields = signature.input_fields
        self.output_fields = signature.output_fields

        self.code_generate = dspy.ChainOfThought(
            dspy.Signature(
                self._generate_signature("generate").fields,
                self._generate_instruction("generate"),
            ),
        )
        self.code_regenerate = dspy.ChainOfThought(
            dspy.Signature(
                self._generate_signature("regenerate").fields,
                self._generate_instruction("regenerate"),
            ),
        )
        self.generate_answer = dspy.ChainOfThought(
            dspy.Signature(
                self._generate_signature("answer").fields,
                self._generate_instruction("answer"),
            ),
        )
        # This will raise an exception if dspy cannot find an available Deno instance at this point.
        self.interpreter = interpreter or PythonInterpreter()

    def _generate_signature(self, mode):
        signature_dict = dict(self.input_fields)
        fields_for_mode = {
            "generate": {
                "generated_code": dspy.OutputField(
                    prefix="Code:",
                    desc="python code that answers the question",
                    format=str,
                ),
            },
            "regenerate": {
                "previous_code": dspy.InputField(
                    prefix="Previous Code:",
                    desc="previously-generated python code that errored",
                    format=str,
                ),
                "error": dspy.InputField(
                    prefix="Error:",
                    desc="error message from previously-generated python code",
                ),
                "generated_code": dspy.OutputField(
                    prefix="Code:",
                    desc="python code that answers the question",
                    format=str,
                ),
            },
            "answer": {
                "final_generated_code": dspy.InputField(
                    prefix="Code:",
                    desc="python code that answers the question",
                    format=str,
                ),
                "code_output": dspy.InputField(
                    prefix="Code Output:",
                    desc="output of previously-generated python code",
                ),
            }
            | self.signature.output_fields,
        }
        signature_dict.update(fields_for_mode[mode])
        return dspy.Signature(signature_dict)

    def _generate_instruction(self, mode):
        mode_inputs = ", ".join(
            [f"`{field_name}`" for field_name in self._generate_signature(mode).input_fields],
        )
        mode_outputs = ", ".join(
            [f"`{field_name}`" for field_name in self._generate_signature(mode).output_fields],
        )
        final_outputs = ", ".join(
            [f"`{field_name}`" for field_name in self.output_fields],
        )
        if mode == "generate":
            instr = [
                f"You will be given {mode_inputs} and you will respond with {mode_outputs}.",
                f"Generate executable Python code that programmatically computes the correct {mode_outputs}.",
                "After you're done with the computation and think you have the answer, make sure to provide your answer by calling the preloaded function `final_answer()`.",
                f'You should structure your answer as a dict object, like {{"field_a": answer_a, ...}}, which evaluates to the correct value mapping for {final_outputs}.',
            ]
        elif mode == "regenerate":
            instr = [
                f"You are given {mode_inputs} due to an error in previous code.",
                "Your task is to correct the error and provide the new `generated_code`.",
            ]
        else:  # mode == 'answer'
            instr = [
                f"Given the final code {mode_inputs}, provide the final {mode_outputs}.",
            ]

        return "\n".join(instr)

    def _parse_code(self, code_data):
        code = code_data.get("generated_code", "").split("---", 1)[0].split("\n\n\n", 1)[0]
        code_match = re.search(r"```python[ \n](.*?)[ \n]```?", code, re.DOTALL)
        code_block = (code_match.group(1) if code_match else code).replace("\\n", "\n")
        if not code_block:
            return code, "Error: Empty code after parsing."
        if "\n" not in code_block and code_block.count("=") > 1:
            return code, "Error: Code format is not correct."
        lines = code_block.split("\n")
        last_line_match = re.match(r"^(\w+)\s*=", lines[-1].strip())
        if last_line_match and len(lines) > 1:
            code_block += "\n" + last_line_match.group(1)
        else:
            code_block = re.sub(
                r"([a-zA-Z_]\w* *=.*?)(?=[a-zA-Z_]\w* *=)",
                r"\1\n",
                code_block,
            )
            code_block = re.sub(
                r"([a-zA-Z_]\w* *=.*?)([a-zA-Z_]\w*)$",
                r"\1\n\2",
                code_block,
            )
        return code_block, None

    def _execute_code(self, code):
        """
        Execute the code using PythonInterpreter and return the output or error.
        """
        if not code:
            return None, "Error: Empty code before execution."

        try:
            # The result can be a complex structure now, so serialize it all to JSON.
            output = json.dumps(self.interpreter.execute(code))
            return output, None
        except Exception as e:
            return None, str(e)

    def forward(self, **kwargs):
        input_kwargs = {field_name: kwargs[field_name] for field_name in self.input_fields}
        code_data = self.code_generate(**input_kwargs)
        output = None
        code, error = self._parse_code(code_data)
        if not error:
            output, error = self._execute_code(code)
        hop = 1
        # Retry code generation and execution until there is no error or max_iters is reached
        while error is not None:
            logger.error(f"Error in code execution: {error}")
            if hop == self.max_iters:
                self.interpreter.shutdown()
                raise RuntimeError(f"Max hops reached. Failed to run ProgramOfThought: {error}")
            input_kwargs.update({"previous_code": code, "error": error})
            code_data = self.code_regenerate(**input_kwargs)
            code, error = self._parse_code(code_data)
            if not error:
                output, error = self._execute_code(code)
            hop += 1
        input_kwargs.update({"final_generated_code": code, "code_output": output})
        answer_gen_result = self.generate_answer(**input_kwargs)
        self.interpreter.shutdown()
        return answer_gen_result

```
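
A minimal sketch of configuring the module above with a custom `max_iters`; it assumes Deno is installed and an OpenAI API key is set in the environment:

```python
import dspy

# Assumes Deno is installed (required by the sandboxed PythonInterpreter)
# and that an OpenAI API key is available in the environment.
dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

# Allow up to 5 attempts at generating and executing code before raising.
pot = dspy.ProgramOfThought("question -> answer", max_iters=5)
result = pot(question="What is the sum of the first 10 positive integers?")
print(result.answer)
```
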
Page 4/14