This is page 10 of 14. Use http://codebase.md/stanfordnlp/dspy?page={x} to view the full context.

# Directory Structure

```
├── .github
│   ├── .internal_dspyai
│   │   ├── internals
│   │   │   ├── build-and-release.md
│   │   │   └── release-checklist.md
│   │   └── pyproject.toml
│   ├── .tmp
│   │   └── .generated-actions
│   │       └── run-pypi-publish-in-docker-container
│   │           └── action.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   └── feature_request.yml
│   ├── PULL_REQUEST_TEMPLATE
│   │   └── pull_request_template.md
│   ├── workflow_scripts
│   │   └── install_testpypi_pkg.sh
│   └── workflows
│       ├── build_and_release.yml
│       ├── build_utils
│       │   └── test_version.py
│       ├── docs-push.yml
│       ├── precommits_check.yml
│       └── run_tests.yml
├── .gitignore
├── .pre-commit-config.yaml
├── CONTRIBUTING.md
├── docs
│   ├── .gitignore
│   ├── docs
│   │   ├── api
│   │   │   ├── adapters
│   │   │   │   ├── Adapter.md
│   │   │   │   ├── ChatAdapter.md
│   │   │   │   ├── JSONAdapter.md
│   │   │   │   └── TwoStepAdapter.md
│   │   │   ├── evaluation
│   │   │   │   ├── answer_exact_match.md
│   │   │   │   ├── answer_passage_match.md
│   │   │   │   ├── CompleteAndGrounded.md
│   │   │   │   ├── Evaluate.md
│   │   │   │   ├── EvaluationResult.md
│   │   │   │   └── SemanticF1.md
│   │   │   ├── experimental
│   │   │   │   ├── Citations.md
│   │   │   │   └── Document.md
│   │   │   ├── index.md
│   │   │   ├── models
│   │   │   │   ├── Embedder.md
│   │   │   │   └── LM.md
│   │   │   ├── modules
│   │   │   │   ├── BestOfN.md
│   │   │   │   ├── ChainOfThought.md
│   │   │   │   ├── CodeAct.md
│   │   │   │   ├── Module.md
│   │   │   │   ├── MultiChainComparison.md
│   │   │   │   ├── Parallel.md
│   │   │   │   ├── Predict.md
│   │   │   │   ├── ProgramOfThought.md
│   │   │   │   ├── ReAct.md
│   │   │   │   └── Refine.md
│   │   │   ├── optimizers
│   │   │   │   ├── BetterTogether.md
│   │   │   │   ├── BootstrapFewShot.md
│   │   │   │   ├── BootstrapFewShotWithRandomSearch.md
│   │   │   │   ├── BootstrapFinetune.md
│   │   │   │   ├── BootstrapRS.md
│   │   │   │   ├── COPRO.md
│   │   │   │   ├── Ensemble.md
│   │   │   │   ├── GEPA
│   │   │   │   │   ├── GEPA_Advanced.md
│   │   │   │   │   └── overview.md
│   │   │   │   ├── InferRules.md
│   │   │   │   ├── KNN.md
│   │   │   │   ├── KNNFewShot.md
│   │   │   │   ├── LabeledFewShot.md
│   │   │   │   ├── MIPROv2.md
│   │   │   │   └── SIMBA.md
│   │   │   ├── primitives
│   │   │   │   ├── Audio.md
│   │   │   │   ├── Code.md
│   │   │   │   ├── Example.md
│   │   │   │   ├── History.md
│   │   │   │   ├── Image.md
│   │   │   │   ├── Prediction.md
│   │   │   │   ├── Tool.md
│   │   │   │   └── ToolCalls.md
│   │   │   ├── signatures
│   │   │   │   ├── InputField.md
│   │   │   │   ├── OutputField.md
│   │   │   │   └── Signature.md
│   │   │   ├── tools
│   │   │   │   ├── ColBERTv2.md
│   │   │   │   ├── Embeddings.md
│   │   │   │   └── PythonInterpreter.md
│   │   │   └── utils
│   │   │       ├── asyncify.md
│   │   │       ├── configure_cache.md
│   │   │       ├── disable_litellm_logging.md
│   │   │       ├── disable_logging.md
│   │   │       ├── enable_litellm_logging.md
│   │   │       ├── enable_logging.md
│   │   │       ├── inspect_history.md
│   │   │       ├── load.md
│   │   │       ├── StatusMessage.md
│   │   │       ├── StatusMessageProvider.md
│   │   │       ├── streamify.md
│   │   │       └── StreamListener.md
│   │   ├── cheatsheet.md
│   │   ├── community
│   │   │   ├── community-resources.md
│   │   │   ├── how-to-contribute.md
│   │   │   └── use-cases.md
│   │   ├── deep-dive
│   │   │   └── data-handling
│   │   │       ├── built-in-datasets.md
│   │   │       ├── examples.md
│   │   │       ├── img
│   │   │       │   └── data-loading.png
│   │   │       └── loading-custom-data.md
│   │   ├── faqs.md
│   │   ├── index.md
│   │   ├── js
│   │   │   └── runllm-widget.js
│   │   ├── learn
│   │   │   ├── evaluation
│   │   │   │   ├── data.md
│   │   │   │   ├── metrics.md
│   │   │   │   └── overview.md
│   │   │   ├── figures
│   │   │   │   ├── native_tool_call.png
│   │   │   │   └── teleprompter-classes.png
│   │   │   ├── index.md
│   │   │   ├── optimization
│   │   │   │   ├── optimizers.md
│   │   │   │   └── overview.md
│   │   │   └── programming
│   │   │       ├── 7-assertions.md
│   │   │       ├── adapters.md
│   │   │       ├── language_models.md
│   │   │       ├── mcp.md
│   │   │       ├── modules.md
│   │   │       ├── overview.md
│   │   │       ├── signatures.md
│   │   │       └── tools.md
│   │   ├── production
│   │   │   └── index.md
│   │   ├── roadmap.md
│   │   ├── static
│   │   │   ├── .nojekyll
│   │   │   └── img
│   │   │       ├── dspy_logo.png
│   │   │       ├── logo.png
│   │   │       ├── mlflow-tracing-rag.png
│   │   │       ├── modular.png
│   │   │       ├── optimize.png
│   │   │       ├── undraw_docusaurus_mountain.svg
│   │   │       ├── undraw_docusaurus_react.svg
│   │   │       ├── undraw_docusaurus_tree.svg
│   │   │       └── universal_compatibility.png
│   │   ├── stylesheets
│   │   │   └── extra.css
│   │   └── tutorials
│   │       ├── agents
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-agent.png
│   │       ├── ai_text_game
│   │       │   └── index.md
│   │       ├── async
│   │       │   └── index.md
│   │       ├── audio
│   │       │   └── index.ipynb
│   │       ├── build_ai_program
│   │       │   └── index.md
│   │       ├── cache
│   │       │   └── index.md
│   │       ├── classification
│   │       │   └── index.md
│   │       ├── classification_finetuning
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-classification.png
│   │       ├── conversation_history
│   │       │   └── index.md
│   │       ├── core_development
│   │       │   └── index.md
│   │       ├── custom_module
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-custom-module.png
│   │       ├── customer_service_agent
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-customer-service-agent.png
│   │       ├── deployment
│   │       │   ├── dspy_mlflow_ui.png
│   │       │   └── index.md
│   │       ├── email_extraction
│   │       │   ├── index.md
│   │       │   └── mlflow-tracing-email-extraction.png
│   │       ├── entity_extraction
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-entity-extraction.png
│   │       ├── games
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-agent.png
│   │       ├── gepa_ai_program
│   │       │   └── index.md
│   │       ├── gepa_aime
│   │       │   ├── index.ipynb
│   │       │   ├── mlflow-tracing-gepa-aime.png
│   │       │   └── mlflow-tracking-gepa-aime-optimization.png
│   │       ├── gepa_facilitysupportanalyzer
│   │       │   ├── index.ipynb
│   │       │   ├── mlflow-tracing-gepa-support.png
│   │       │   └── mlflow-tracking-gepa-support-optimization.png
│   │       ├── gepa_papillon
│   │       │   ├── index.ipynb
│   │       │   ├── mlflow-tracing-gepa-papilon.png
│   │       │   └── mlflow-tracking-gepa-papilon-optimization.png
│   │       ├── image_generation_prompting
│   │       │   └── index.ipynb
│   │       ├── index.md
│   │       ├── llms_txt_generation
│   │       │   └── index.md
│   │       ├── math
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-math.png
│   │       ├── mcp
│   │       │   └── index.md
│   │       ├── mem0_react_agent
│   │       │   └── index.md
│   │       ├── multihop_search
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-multi-hop.png
│   │       ├── observability
│   │       │   ├── index.md
│   │       │   ├── mlflow_trace_ui_navigation.gif
│   │       │   ├── mlflow_trace_ui.png
│   │       │   └── mlflow_trace_view.png
│   │       ├── optimize_ai_program
│   │       │   └── index.md
│   │       ├── optimizer_tracking
│   │       │   ├── child_run.png
│   │       │   ├── experiment.png
│   │       │   ├── index.md
│   │       │   └── parent_run.png
│   │       ├── output_refinement
│   │       │   └── best-of-n-and-refine.md
│   │       ├── papillon
│   │       │   └── index.md
│   │       ├── program_of_thought
│   │       │   └── index.ipynb
│   │       ├── rag
│   │       │   ├── index.ipynb
│   │       │   └── mlflow-tracing-rag.png
│   │       ├── real_world_examples
│   │       │   └── index.md
│   │       ├── rl_ai_program
│   │       │   └── index.md
│   │       ├── rl_multihop
│   │       │   └── index.ipynb
│   │       ├── rl_papillon
│   │       │   └── index.ipynb
│   │       ├── sample_code_generation
│   │       │   └── index.md
│   │       ├── saving
│   │       │   └── index.md
│   │       ├── streaming
│   │       │   └── index.md
│   │       ├── tool_use
│   │       │   └── index.ipynb
│   │       └── yahoo_finance_react
│   │           └── index.md
│   ├── mkdocs.yml
│   ├── overrides
│   │   ├── home.html
│   │   ├── main.html
│   │   └── partials
│   │       └── tabs.html
│   ├── Pipfile
│   ├── Pipfile.lock
│   ├── README.md
│   ├── requirements.txt
│   ├── scripts
│   │   ├── generate_api_docs.py
│   │   └── generate_api_summary.py
│   └── vercel.json
├── dspy
│   ├── __init__.py
│   ├── __metadata__.py
│   ├── adapters
│   │   ├── __init__.py
│   │   ├── baml_adapter.py
│   │   ├── base.py
│   │   ├── chat_adapter.py
│   │   ├── json_adapter.py
│   │   ├── two_step_adapter.py
│   │   ├── types
│   │   │   ├── __init__.py
│   │   │   ├── audio.py
│   │   │   ├── base_type.py
│   │   │   ├── citation.py
│   │   │   ├── code.py
│   │   │   ├── document.py
│   │   │   ├── history.py
│   │   │   ├── image.py
│   │   │   └── tool.py
│   │   ├── utils.py
│   │   └── xml_adapter.py
│   ├── clients
│   │   ├── __init__.py
│   │   ├── base_lm.py
│   │   ├── cache.py
│   │   ├── databricks.py
│   │   ├── embedding.py
│   │   ├── lm_local_arbor.py
│   │   ├── lm_local.py
│   │   ├── lm.py
│   │   ├── openai.py
│   │   ├── provider.py
│   │   └── utils_finetune.py
│   ├── datasets
│   │   ├── __init__.py
│   │   ├── alfworld
│   │   │   ├── __init__.py
│   │   │   ├── alfworld.py
│   │   │   └── base_config.yml
│   │   ├── colors.py
│   │   ├── dataloader.py
│   │   ├── dataset.py
│   │   ├── gsm8k.py
│   │   ├── hotpotqa.py
│   │   └── math.py
│   ├── dsp
│   │   ├── __init__.py
│   │   ├── colbertv2.py
│   │   └── utils
│   │       ├── __init__.py
│   │       ├── dpr.py
│   │       ├── settings.py
│   │       └── utils.py
│   ├── evaluate
│   │   ├── __init__.py
│   │   ├── auto_evaluation.py
│   │   ├── evaluate.py
│   │   └── metrics.py
│   ├── experimental
│   │   └── __init__.py
│   ├── predict
│   │   ├── __init__.py
│   │   ├── aggregation.py
│   │   ├── avatar
│   │   │   ├── __init__.py
│   │   │   ├── avatar.py
│   │   │   ├── models.py
│   │   │   └── signatures.py
│   │   ├── best_of_n.py
│   │   ├── chain_of_thought.py
│   │   ├── code_act.py
│   │   ├── knn.py
│   │   ├── multi_chain_comparison.py
│   │   ├── parallel.py
│   │   ├── parameter.py
│   │   ├── predict.py
│   │   ├── program_of_thought.py
│   │   ├── react.py
│   │   ├── refine.py
│   │   └── retry.py
│   ├── primitives
│   │   ├── __init__.py
│   │   ├── base_module.py
│   │   ├── example.py
│   │   ├── module.py
│   │   ├── prediction.py
│   │   ├── python_interpreter.py
│   │   └── runner.js
│   ├── propose
│   │   ├── __init__.py
│   │   ├── dataset_summary_generator.py
│   │   ├── grounded_proposer.py
│   │   ├── propose_base.py
│   │   └── utils.py
│   ├── retrievers
│   │   ├── __init__.py
│   │   ├── databricks_rm.py
│   │   ├── embeddings.py
│   │   ├── retrieve.py
│   │   └── weaviate_rm.py
│   ├── signatures
│   │   ├── __init__.py
│   │   ├── field.py
│   │   ├── signature.py
│   │   └── utils.py
│   ├── streaming
│   │   ├── __init__.py
│   │   ├── messages.py
│   │   ├── streamify.py
│   │   └── streaming_listener.py
│   ├── teleprompt
│   │   ├── __init__.py
│   │   ├── avatar_optimizer.py
│   │   ├── bettertogether.py
│   │   ├── bootstrap_finetune.py
│   │   ├── bootstrap_trace.py
│   │   ├── bootstrap.py
│   │   ├── copro_optimizer.py
│   │   ├── ensemble.py
│   │   ├── gepa
│   │   │   ├── __init__.py
│   │   │   ├── gepa_utils.py
│   │   │   ├── gepa.py
│   │   │   └── instruction_proposal.py
│   │   ├── grpo.py
│   │   ├── infer_rules.py
│   │   ├── knn_fewshot.py
│   │   ├── mipro_optimizer_v2.py
│   │   ├── random_search.py
│   │   ├── signature_opt.py
│   │   ├── simba_utils.py
│   │   ├── simba.py
│   │   ├── teleprompt_optuna.py
│   │   ├── teleprompt.py
│   │   ├── utils.py
│   │   └── vanilla.py
│   └── utils
│       ├── __init__.py
│       ├── annotation.py
│       ├── asyncify.py
│       ├── caching.py
│       ├── callback.py
│       ├── dummies.py
│       ├── exceptions.py
│       ├── hasher.py
│       ├── inspect_history.py
│       ├── langchain_tool.py
│       ├── logging_utils.py
│       ├── mcp.py
│       ├── parallelizer.py
│       ├── saving.py
│       ├── syncify.py
│       ├── unbatchify.py
│       └── usage_tracker.py
├── LICENSE
├── pyproject.toml
├── README.md
├── tests
│   ├── __init__.py
│   ├── adapters
│   │   ├── test_adapter_utils.py
│   │   ├── test_baml_adapter.py
│   │   ├── test_base_type.py
│   │   ├── test_chat_adapter.py
│   │   ├── test_citation.py
│   │   ├── test_code.py
│   │   ├── test_document.py
│   │   ├── test_json_adapter.py
│   │   ├── test_tool.py
│   │   ├── test_two_step_adapter.py
│   │   └── test_xml_adapter.py
│   ├── callback
│   │   └── test_callback.py
│   ├── clients
│   │   ├── test_cache.py
│   │   ├── test_databricks.py
│   │   ├── test_embedding.py
│   │   ├── test_inspect_global_history.py
│   │   └── test_lm.py
│   ├── conftest.py
│   ├── datasets
│   │   └── test_dataset.py
│   ├── docs
│   │   └── test_mkdocs_links.py
│   ├── evaluate
│   │   ├── test_evaluate.py
│   │   └── test_metrics.py
│   ├── examples
│   │   └── test_baleen.py
│   ├── metadata
│   │   └── test_metadata.py
│   ├── predict
│   │   ├── test_aggregation.py
│   │   ├── test_best_of_n.py
│   │   ├── test_chain_of_thought.py
│   │   ├── test_code_act.py
│   │   ├── test_knn.py
│   │   ├── test_multi_chain_comparison.py
│   │   ├── test_parallel.py
│   │   ├── test_predict.py
│   │   ├── test_program_of_thought.py
│   │   ├── test_react.py
│   │   ├── test_refine.py
│   │   └── test_retry.py
│   ├── primitives
│   │   ├── resources
│   │   │   └── saved_program.json
│   │   ├── test_base_module.py
│   │   ├── test_example.py
│   │   ├── test_module.py
│   │   └── test_python_interpreter.py
│   ├── propose
│   │   └── test_grounded_proposer.py
│   ├── README.md
│   ├── reliability
│   │   ├── __init__.py
│   │   ├── complex_types
│   │   │   └── generated
│   │   │       ├── test_many_types_1
│   │   │       │   ├── inputs
│   │   │       │   │   ├── input1.json
│   │   │       │   │   └── input2.json
│   │   │       │   ├── program.py
│   │   │       │   └── schema.json
│   │   │       ├── test_nesting_1
│   │   │       │   ├── inputs
│   │   │       │   │   ├── input1.json
│   │   │       │   │   └── input2.json
│   │   │       │   ├── program.py
│   │   │       │   └── schema.json
│   │   │       └── test_nesting_2
│   │   │           ├── inputs
│   │   │           │   └── input1.json
│   │   │           ├── program.py
│   │   │           └── schema.json
│   │   ├── conftest.py
│   │   ├── generate
│   │   │   ├── __init__.py
│   │   │   ├── __main__.py
│   │   │   └── utils.py
│   │   ├── input_formats
│   │   │   └── generated
│   │   │       └── test_markdown_1
│   │   │           ├── inputs
│   │   │           │   ├── input1.json
│   │   │           │   └── input2.json
│   │   │           ├── program.py
│   │   │           └── schema.json
│   │   ├── README.md
│   │   ├── reliability_conf.yaml
│   │   ├── test_generated.py
│   │   ├── test_pydantic_models.py
│   │   └── utils.py
│   ├── retrievers
│   │   └── test_embeddings.py
│   ├── signatures
│   │   ├── test_adapter_image.py
│   │   ├── test_custom_types.py
│   │   └── test_signature.py
│   ├── streaming
│   │   └── test_streaming.py
│   ├── teleprompt
│   │   ├── gepa_dummy_lm_custom_component_selector_custom_instruction_proposer.json
│   │   ├── gepa_dummy_lm.json
│   │   ├── test_bootstrap_finetune.py
│   │   ├── test_bootstrap_trace.py
│   │   ├── test_bootstrap.py
│   │   ├── test_copro_optimizer.py
│   │   ├── test_ensemble.py
│   │   ├── test_finetune.py
│   │   ├── test_gepa_instruction_proposer.py
│   │   ├── test_gepa.py
│   │   ├── test_grpo.py
│   │   ├── test_knn_fewshot.py
│   │   ├── test_random_search.py
│   │   ├── test_teleprompt.py
│   │   └── test_utils.py
│   ├── test_utils
│   │   ├── __init__.py
│   │   └── server
│   │       ├── __init__.py
│   │       ├── litellm_server_config.yaml
│   │       └── litellm_server.py
│   └── utils
│       ├── __init__.py
│       ├── resources
│       │   └── mcp_server.py
│       ├── test_annotation.py
│       ├── test_asyncify.py
│       ├── test_exceptions.py
│       ├── test_langchain_tool.py
│       ├── test_mcp.py
│       ├── test_parallelizer.py
│       ├── test_saving.py
│       ├── test_settings.py
│       ├── test_syncify.py
│       ├── test_unbatchify.py
│       └── test_usage_tracker.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/tests/primitives/test_base_module.py:
--------------------------------------------------------------------------------

```python
import asyncio
import logging
import os
import threading
from unittest.mock import patch

import pytest
from litellm import Choices, Message, ModelResponse
from litellm.types.utils import Usage

import dspy
from dspy.primitives.prediction import Prediction
from dspy.utils.dummies import DummyLM


def test_deepcopy_basic():
    signature = dspy.Signature("q -> a")
    cot = dspy.ChainOfThought(signature)
    cot_copy = cot.deepcopy()
    assert len(cot.parameters()) == len(cot_copy.parameters())
    # Parameters should be different objects with the same values.
    assert id(cot.parameters()[0]) != id(cot_copy.parameters()[0])
    assert cot.parameters()[0].__dict__ == cot_copy.parameters()[0].__dict__


def test_deepcopy_with_uncopyable_modules():
    class CustomClass(dspy.Module):
        def __init__(self):
            self.lock = threading.Lock()  # Non-copyable object.
            self.cot = dspy.ChainOfThought(dspy.Signature("q -> a"))

    model = CustomClass()
    model_copy = model.deepcopy()
    assert len(model.parameters()) == len(model_copy.parameters())
    # The lock should refer to the same object (shallow copy).
    assert id(model.lock) == id(model_copy.lock)
    # Parameters should be different objects with the same values.
    assert id(model.parameters()[0]) != id(model_copy.parameters()[0])
    assert model.parameters()[0].__dict__ == model_copy.parameters()[0].__dict__


def test_deepcopy_with_nested_modules():
    class CustomClass1(dspy.Module):
        def __init__(self):
            self.lock = threading.Lock()  # Non-copyable object.
            self.cot = dspy.ChainOfThought(dspy.Signature("q -> a"))

    class CustomClass2(dspy.Module):
        def __init__(self):
            self.submodel = CustomClass1()

    model = CustomClass2()
    model_copy = model.deepcopy()
    assert len(model.parameters()) == len(model_copy.parameters())
    # The lock should refer to the same object (shallow copy).
    assert id(model.submodel.lock) == id(model_copy.submodel.lock)
    # Parameters should be different objects with the same values.
    assert id(model.parameters()[0]) != id(model_copy.parameters()[0])
    assert model.parameters()[0].__dict__ == model_copy.parameters()[0].__dict__


def test_save_and_load_with_json(tmp_path):
    model = dspy.ChainOfThought(dspy.Signature("q -> a"))
    model.predict.signature = model.predict.signature.with_instructions("You are a helpful assistant.")
    model.predict.demos = [
        dspy.Example(q="What is the capital of France?", a="Paris", reasoning="n/a").with_inputs("q"),
        # Nested example
        dspy.Example(
            q=[
                dspy.Example(q="What is the capital of France?"),
                dspy.Example(q="What is actually the capital of France?"),
            ],
            a="Paris",
            reasoning="n/a",
        ).with_inputs("q"),
    ]
    save_path = tmp_path / "model.json"
    model.save(save_path)
    new_model = dspy.ChainOfThought(dspy.Signature("q -> a"))
    new_model.load(save_path)

    assert str(new_model.predict.signature) == str(model.predict.signature)
    assert new_model.predict.demos[0] == model.predict.demos[0].toDict()
    assert new_model.predict.demos[1] == model.predict.demos[1].toDict()


@pytest.mark.extra
def test_save_and_load_with_pkl(tmp_path):
    import datetime

    # `datetime.date` is not JSON-serializable, so we need to save with pickle.
    class MySignature(dspy.Signature):
        """Just a custom signature."""

        current_date: datetime.date = dspy.InputField()
        target_date: datetime.date = dspy.InputField()
        date_diff: int = dspy.OutputField(desc="The difference in days between the current_date and the target_date")

    trainset = [
        {"current_date": datetime.date(2024, 1, 1), "target_date": datetime.date(2024, 1, 2), "date_diff": 1},
        {"current_date": datetime.date(2024, 1, 1), "target_date": datetime.date(2024, 1, 3), "date_diff": 2},
        {"current_date": datetime.date(2024, 1, 1), "target_date": datetime.date(2024, 1, 4), "date_diff": 3},
        {"current_date": datetime.date(2024, 1, 1), "target_date": datetime.date(2024, 1, 5), "date_diff": 4},
        {"current_date": datetime.date(2024, 1, 1), "target_date": datetime.date(2024, 1, 6), "date_diff": 5},
    ]
    trainset = [dspy.Example(**example).with_inputs("current_date", "target_date") for example in trainset]

    dspy.settings.configure(
        lm=DummyLM([{"date_diff": "1", "reasoning": "n/a"}, {"date_diff": "2", "reasoning": "n/a"}] * 10)
    )

    cot = dspy.ChainOfThought(MySignature)
    cot(current_date=datetime.date(2024, 1, 1), target_date=datetime.date(2024, 1, 2))

    def dummy_metric(example, pred, trace=None):
        return True

    optimizer = dspy.BootstrapFewShot(max_bootstrapped_demos=4, max_labeled_demos=4, max_rounds=5, metric=dummy_metric)
    compiled_cot = optimizer.compile(cot, trainset=trainset)
    compiled_cot.predict.signature = compiled_cot.predict.signature.with_instructions("You are a helpful assistant.")

    save_path = tmp_path / "program.pkl"
    compiled_cot.save(save_path)

    new_cot = dspy.ChainOfThought(MySignature)
    new_cot.load(save_path)

    assert str(new_cot.predict.signature) == str(compiled_cot.predict.signature)
    assert new_cot.predict.demos == compiled_cot.predict.demos


def test_save_with_extra_modules(tmp_path):
    import sys

    # Create a temporary Python file with our custom module
    custom_module_path = tmp_path / "custom_module.py"
    with open(custom_module_path, "w") as f:
        f.write("""
import dspy

class MyModule(dspy.Module):
    def __init__(self):
        self.cot = dspy.ChainOfThought(dspy.Signature("q -> a"))

    def forward(self, q):
        return self.cot(q=q)
""")

    # Add the tmp_path to Python path so we can import the module
    sys.path.insert(0, str(tmp_path))
    try:
        import custom_module

        cot = custom_module.MyModule()

        cot.save(tmp_path, save_program=True)
        # Remove the custom module from sys.modules to simulate it not being available
        sys.modules.pop("custom_module", None)
        # Also remove it from sys.path
        sys.path.remove(str(tmp_path))
        del custom_module

        # Test that loading fails without using `modules_to_serialize`
        with pytest.raises(ModuleNotFoundError):
            dspy.load(tmp_path)

        sys.path.insert(0, str(tmp_path))
        import custom_module

        cot.save(
            tmp_path,
            modules_to_serialize=[custom_module],
            save_program=True,
        )

        # Remove the custom module from sys.modules to simulate it not being available
        sys.modules.pop("custom_module", None)
        # Also remove it from sys.path
        sys.path.remove(str(tmp_path))
        del custom_module

        loaded_module = dspy.load(tmp_path)
        assert loaded_module.cot.predict.signature == cot.cot.predict.signature

    finally:
        # Only need to clean up sys.path
        if str(tmp_path) in sys.path:
            sys.path.remove(str(tmp_path))


def test_load_with_version_mismatch(tmp_path):
    from dspy.primitives.base_module import logger

    # Mock versions during save
    save_versions = {"python": "3.9", "dspy": "2.4.0", "cloudpickle": "2.0"}

    # Mock versions during load
    load_versions = {"python": "3.10", "dspy": "2.5.0", "cloudpickle": "2.1"}

    predict = dspy.Predict("question->answer")

    # Create a custom handler to capture log messages
    class ListHandler(logging.Handler):
        def __init__(self):
            super().__init__()
            self.messages = []

        def emit(self, record):
            self.messages.append(record.getMessage())

    # Add handler and set level
    handler = ListHandler()
    original_level = logger.level
    logger.addHandler(handler)
    logger.setLevel(logging.WARNING)

    try:
        save_path = tmp_path / "program.pkl"
        # Mock version during save
        with patch("dspy.primitives.base_module.get_dependency_versions", return_value=save_versions):
            predict.save(save_path)

        # Mock version during load
        with patch("dspy.primitives.base_module.get_dependency_versions", return_value=load_versions):
            loaded_predict = dspy.Predict("question->answer")
            loaded_predict.load(save_path)

        # Assert that warnings were logged, one for each mismatched dependency.
        assert len(handler.messages) == 3

        for msg in handler.messages:
            assert "There is a mismatch of" in msg

        # Verify the model still loads correctly despite version mismatches
        assert isinstance(loaded_predict, dspy.Predict)
        assert str(predict.signature) == str(loaded_predict.signature)

    finally:
        # Clean up: restore original level and remove handler
        logger.setLevel(original_level)
        logger.removeHandler(handler)


@pytest.mark.llm_call
def test_single_module_call_with_usage_tracker(lm_for_test):
    dspy.settings.configure(lm=dspy.LM(lm_for_test, cache=False), track_usage=True)

    predict = dspy.ChainOfThought("question -> answer")
    output = predict(question="What is the capital of France?")

    lm_usage = output.get_lm_usage()
    assert len(lm_usage) == 1
    assert lm_usage[lm_for_test]["prompt_tokens"] > 0
    assert lm_usage[lm_for_test]["completion_tokens"] > 0
    assert lm_usage[lm_for_test]["total_tokens"] > 0

    # Test that no usage is tracked when the cache is enabled
    dspy.settings.configure(lm=dspy.LM(lm_for_test, cache=True), track_usage=True)
    for _ in range(2):
        output = predict(question="What is the capital of France?")

    assert len(output.get_lm_usage()) == 0


@pytest.mark.llm_call
def test_multi_module_call_with_usage_tracker(lm_for_test):
    dspy.settings.configure(lm=dspy.LM(lm_for_test, cache=False), track_usage=True)

    class MyProgram(dspy.Module):
        def __init__(self):
            self.predict1 = dspy.ChainOfThought("question -> answer")
            self.predict2 = dspy.ChainOfThought("question, answer -> score")

        def __call__(self, question: str) -> Prediction:
            answer = self.predict1(question=question)
            score = self.predict2(question=question, answer=answer)
            return score

    program = MyProgram()
    output = program(question="What is the capital of France?")

    lm_usage = output.get_lm_usage()
    assert len(lm_usage) == 1
    assert lm_usage[lm_for_test]["prompt_tokens"] > 0
    assert lm_usage[lm_for_test]["prompt_tokens"] > 0
    assert lm_usage[lm_for_test]["completion_tokens"] > 0
    assert lm_usage[lm_for_test]["total_tokens"] > 0


# TODO: prepare a second model for testing this unit test in CI
@pytest.mark.skipif(not os.getenv("OPENAI_API_KEY"), reason="Skip the test if OPENAI_API_KEY is not set.")
def test_usage_tracker_in_parallel():
    class MyProgram(dspy.Module):
        def __init__(self, lm):
            self.lm = lm
            self.predict1 = dspy.ChainOfThought("question -> answer")
            self.predict2 = dspy.ChainOfThought("question, answer -> score")

        def __call__(self, question: str) -> Prediction:
            with dspy.settings.context(lm=self.lm):
                answer = self.predict1(question=question)
                score = self.predict2(question=question, answer=answer)
                return score

    dspy.settings.configure(track_usage=True)
    program1 = MyProgram(lm=dspy.LM("openai/gpt-4o-mini", cache=False))
    program2 = MyProgram(lm=dspy.LM("openai/gpt-3.5-turbo", cache=False))

    parallelizer = dspy.Parallel()

    results = parallelizer(
        [
            (program1, {"question": "What is the meaning of life?"}),
            (program2, {"question": "why did a chicken cross the kitchen?"}),
        ]
    )

    assert results[0].get_lm_usage() is not None
    assert results[1].get_lm_usage() is not None

    assert results[0].get_lm_usage().keys() == set(["openai/gpt-4o-mini"])
    assert results[1].get_lm_usage().keys() == set(["openai/gpt-3.5-turbo"])


@pytest.mark.asyncio
async def test_usage_tracker_async_parallel():
    program = dspy.Predict("question -> answer")

    with patch("litellm.acompletion") as mock_completion:
        mock_completion.return_value = ModelResponse(
            choices=[Choices(message=Message(content="{'answer': 'Paris'}"))],
            usage=Usage(
                **{
                    "prompt_tokens": 1117,
                    "completion_tokens": 46,
                    "total_tokens": 1163,
                    "prompt_tokens_details": {"cached_tokens": 0, "audio_tokens": 0},
                    "completion_tokens_details": {
                        "reasoning_tokens": 0,
                        "audio_tokens": 0,
                        "accepted_prediction_tokens": 0,
                        "rejected_prediction_tokens": 0,
                    },
                },
            ),
            model="openai/gpt-4o-mini",
        )

        coroutines = [
            program.acall(question="What is the capital of France?"),
            program.acall(question="What is the capital of France?"),
            program.acall(question="What is the capital of France?"),
            program.acall(question="What is the capital of France?"),
        ]
        with dspy.settings.context(
            lm=dspy.LM("openai/gpt-4o-mini", cache=False), track_usage=True, adapter=dspy.JSONAdapter()
        ):
            results = await asyncio.gather(*coroutines)

        assert results[0].get_lm_usage() is not None
        assert results[1].get_lm_usage() is not None

        lm_usage0 = results[0].get_lm_usage()["openai/gpt-4o-mini"]
        lm_usage1 = results[1].get_lm_usage()["openai/gpt-4o-mini"]
        assert lm_usage0["prompt_tokens"] == 1117
        assert lm_usage1["prompt_tokens"] == 1117
        assert lm_usage0["completion_tokens"] == 46
        assert lm_usage1["completion_tokens"] == 46
        assert lm_usage0["total_tokens"] == 1163
        assert lm_usage1["total_tokens"] == 1163


def test_usage_tracker_no_side_effect():
    class MyProgram(dspy.Module):
        def __init__(self):
            self.predict = dspy.Predict("question -> answer")

        def forward(self, question: str, **kwargs) -> str:
            return self.predict(question=question).answer

    program = MyProgram()
    with dspy.context(lm=DummyLM([{"answer": "Paris"}]), track_usage=True):
        result = program(question="What is the capital of France?")
    assert result == "Paris"


def test_module_history():
    class MyProgram(dspy.Module):
        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            self.cot = dspy.ChainOfThought("question -> answer")

        def forward(self, question: str, **kwargs) -> Prediction:
            return self.cot(question=question)

    with patch("litellm.completion") as mock_completion:
        mock_completion.return_value = ModelResponse(
            choices=[
                Choices(message=Message(content="{'reasoning': 'Paris is the capital of France', 'answer': 'Paris'}"))
            ],
            model="openai/gpt-4o-mini",
        )
        dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.JSONAdapter())
        program = MyProgram()
        program(question="What is the capital of France?")

        # The second call only invokes the submodule.
        program.cot(question="What is the capital of France?")

        # The LM history entity exists in all the ancestor callers.
        assert len(program.history) == 1
        assert len(program.cot.history) == 2
        assert len(program.cot.predict.history) == 2

        # The same history entity is shared across all the ancestor callers to reduce memory usage.
        assert id(program.history[0]) == id(program.cot.history[0])

        assert program.history[0]["outputs"] == ["{'reasoning': 'Paris is the capital of France', 'answer': 'Paris'}"]

        dspy.settings.configure(disable_history=True)

        program(question="What is the capital of France?")
        # No history is recorded when history is disabled.
        assert len(program.history) == 1
        assert len(program.cot.history) == 2
        assert len(program.cot.predict.history) == 2

        dspy.settings.configure(disable_history=False)

        program(question="What is the capital of France?")
        # History is recorded again when history is enabled.
        assert len(program.history) == 2
        assert len(program.cot.history) == 3
        assert len(program.cot.predict.history) == 3


def test_module_history_with_concurrency():
    class MyProgram(dspy.Module):
        def __init__(self):
            super().__init__()
            self.cot = dspy.ChainOfThought("question -> answer")

        def forward(self, question: str, **kwargs) -> Prediction:
            return self.cot(question=question)

    with patch("litellm.completion") as mock_completion:
        mock_completion.return_value = ModelResponse(
            choices=[Choices(message=Message(content="{'reasoning': 'N/A', 'answer': 'Holy crab!'}"))],
            model="openai/gpt-4o-mini",
        )
        dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.JSONAdapter())
        program = MyProgram()

        parallelizer = dspy.Parallel()

        parallelizer(
            [
                (program, {"question": "What is the meaning of life?"}),
                (program, {"question": "why did a chicken cross the kitchen?"}),
            ]
        )
        assert len(program.history) == 2
        assert len(program.cot.history) == 2
        assert len(program.cot.predict.history) == 2


@pytest.mark.asyncio
async def test_module_history_async():
    class MyProgram(dspy.Module):
        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            self.cot = dspy.ChainOfThought("question -> answer")

        async def aforward(self, question: str, **kwargs) -> Prediction:
            return await self.cot.acall(question=question)

    with patch("litellm.acompletion") as mock_completion:
        mock_completion.return_value = ModelResponse(
            choices=[
                Choices(message=Message(content="{'reasoning': 'Paris is the capital of France', 'answer': 'Paris'}"))
            ],
            model="openai/gpt-4o-mini",
        )
        program = MyProgram()
        with dspy.context(lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.JSONAdapter()):
            await program.acall(question="What is the capital of France?")

            # The second call only invokes the submodule.
            await program.cot.acall(question="What is the capital of France?")

        # The LM history entity exists in all the ancestor callers.
        assert len(program.history) == 1
        assert len(program.cot.history) == 2
        assert len(program.cot.predict.history) == 2

        # The same history entity is shared across all the ancestor callers to reduce memory usage.
        assert id(program.history[0]) == id(program.cot.history[0])

        assert program.history[0]["outputs"] == ["{'reasoning': 'Paris is the capital of France', 'answer': 'Paris'}"]

        with dspy.context(
            disable_history=True, lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.JSONAdapter()
        ):
            await program.acall(question="What is the capital of France?")

        # No history is recorded when history is disabled.
        assert len(program.history) == 1
        assert len(program.cot.history) == 2
        assert len(program.cot.predict.history) == 2

        with dspy.context(
            disable_history=False, lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.JSONAdapter()
        ):
            await program.acall(question="What is the capital of France?")
        # History is recorded again when history is enabled.
        assert len(program.history) == 2
        assert len(program.cot.history) == 3
        assert len(program.cot.predict.history) == 3


def test_forward_direct_call_warning(capsys):
    class TestModule(dspy.Module):
        def forward(self, x):
            return x

    module = TestModule()
    module.forward("test")
    captured = capsys.readouterr()
    assert "directly is discouraged" in captured.err


def test_forward_through_call_no_warning(capsys):
    class TestModule(dspy.Module):
        def forward(self, x):
            return x

    module = TestModule()
    module(x="test")
    captured = capsys.readouterr()
    assert "directly is discouraged" not in captured.err

```
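
The tests above exercise DSPy's usage tracking through `dspy.settings.configure(track_usage=True)` and `Prediction.get_lm_usage()`. The sketch below restates that flow as application code; the model name and question are illustrative rather than part of the test suite.

```python
import dspy

# Enable per-prediction usage tracking (the model name is illustrative).
dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini", cache=False), track_usage=True)

qa = dspy.ChainOfThought("question -> answer")
prediction = qa(question="What is the capital of France?")

# get_lm_usage() returns a dict keyed by model name, with token counts such as
# prompt_tokens, completion_tokens, and total_tokens.
for model_name, usage in prediction.get_lm_usage().items():
    print(model_name, usage["prompt_tokens"], usage["completion_tokens"], usage["total_tokens"])
```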

--------------------------------------------------------------------------------
/dspy/adapters/base.py:
--------------------------------------------------------------------------------

```python
import logging
from typing import TYPE_CHECKING, Any, get_origin

import json_repair
import litellm

from dspy.adapters.types import History, Type
from dspy.adapters.types.base_type import split_message_content_for_custom_types
from dspy.adapters.types.tool import Tool, ToolCalls
from dspy.experimental import Citations
from dspy.signatures.signature import Signature
from dspy.utils.callback import BaseCallback, with_callbacks

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from dspy.clients.lm import LM

_DEFAULT_NATIVE_RESPONSE_TYPES = [Citations]


class Adapter:
    """Base Adapter class.

    The Adapter serves as the interface layer between DSPy modules/signatures and Language Models (LMs). It handles the
    complete transformation pipeline from DSPy inputs to LM calls and back to structured outputs.

    Key responsibilities:
        - Transform user inputs and signatures into properly formatted LM prompts, which also instruct the LM to produce
            its response in a specific format.
        - Parse LM outputs into dictionaries matching the signature's output fields.
        - Enable/disable native LM features (function calling, citations, etc.) based on configuration.
        - Handle conversation history, few-shot examples, and custom type processing.

    The adapter pattern allows DSPy to work with different LM interfaces while maintaining a consistent programming
    model for users.
    """

    def __init__(
        self,
        callbacks: list[BaseCallback] | None = None,
        use_native_function_calling: bool = False,
        native_response_types: list[type[Type]] | None = None,
    ):
        """
        Args:
            callbacks: List of callback functions to execute during `format()` and `parse()` methods. Callbacks can be
                used for logging, monitoring, or custom processing. Defaults to None (empty list).
            use_native_function_calling: Whether to enable native function calling capabilities when the LM supports it.
                If True, the adapter will automatically configure function calling when input fields contain `dspy.Tool`
                or `list[dspy.Tool]` types. Defaults to False.
            native_response_types: List of output field types that should be handled by native LM features rather than
                adapter parsing. For example, `dspy.Citations` can be populated directly by citation APIs
                (e.g., Anthropic's citation feature). Defaults to `[Citations]`.
        """
        self.callbacks = callbacks or []
        self.use_native_function_calling = use_native_function_calling
        self.native_response_types = native_response_types or _DEFAULT_NATIVE_RESPONSE_TYPES

    def __init_subclass__(cls, **kwargs) -> None:
        super().__init_subclass__(**kwargs)

        # Decorate format() and parse() method with with_callbacks
        cls.format = with_callbacks(cls.format)
        cls.parse = with_callbacks(cls.parse)

    def _call_preprocess(
        self,
        lm: "LM",
        lm_kwargs: dict[str, Any],
        signature: type[Signature],
        inputs: dict[str, Any],
    ) -> type[Signature]:
        if self.use_native_function_calling:
            tool_call_input_field_name = self._get_tool_call_input_field_name(signature)
            tool_call_output_field_name = self._get_tool_call_output_field_name(signature)

            if tool_call_output_field_name and tool_call_input_field_name is None:
                raise ValueError(
                    f"You provided an output field {tool_call_output_field_name} to receive the tool calls information, "
                    "but did not provide any tools as the input. Please provide a list of tools as the input by adding an "
                    "input field with type `list[dspy.Tool]`."
                )

            if tool_call_output_field_name and litellm.supports_function_calling(model=lm.model):
                tools = inputs[tool_call_input_field_name]
                tools = tools if isinstance(tools, list) else [tools]

                litellm_tools = []
                for tool in tools:
                    litellm_tools.append(tool.format_as_litellm_function_call())

                lm_kwargs["tools"] = litellm_tools

                signature_for_native_function_calling = signature.delete(tool_call_output_field_name)
                signature_for_native_function_calling = signature_for_native_function_calling.delete(
                    tool_call_input_field_name
                )

                return signature_for_native_function_calling

        # Handle custom types that use native response
        for name, field in signature.output_fields.items():
            if (
                isinstance(field.annotation, type)
                and issubclass(field.annotation, Type)
                and field.annotation in self.native_response_types
            ):
                signature = signature.delete(name)

        return signature

    def _call_postprocess(
        self,
        processed_signature: type[Signature],
        original_signature: type[Signature],
        outputs: list[dict[str, Any]],
        lm: "LM",
    ) -> list[dict[str, Any]]:
        values = []

        tool_call_output_field_name = self._get_tool_call_output_field_name(original_signature)

        for output in outputs:
            output_logprobs = None
            tool_calls = None
            text = output

            if isinstance(output, dict):
                text = output["text"]
                output_logprobs = output.get("logprobs")
                tool_calls = output.get("tool_calls")

            if text:
                value = self.parse(processed_signature, text)
                for field_name in original_signature.output_fields.keys():
                    if field_name not in value:
                        # We need to set the field not present in the processed signature to None for consistency.
                        value[field_name] = None
            else:
                value = {}
                for field_name in original_signature.output_fields.keys():
                    value[field_name] = None

            if tool_calls and tool_call_output_field_name:
                tool_calls = [
                    {
                        "name": v["function"]["name"],
                        "args": json_repair.loads(v["function"]["arguments"]),
                    }
                    for v in tool_calls
                ]
                value[tool_call_output_field_name] = ToolCalls.from_dict_list(tool_calls)

            # Parse custom types that do not rely on the adapter's parsing
            for name, field in original_signature.output_fields.items():
                if (
                    isinstance(field.annotation, type)
                    and issubclass(field.annotation, Type)
                    and field.annotation in self.native_response_types
                ):
                    value[name] = field.annotation.parse_lm_response(output)

            if output_logprobs:
                value["logprobs"] = output_logprobs

            values.append(value)

        return values

    def __call__(
        self,
        lm: "LM",
        lm_kwargs: dict[str, Any],
        signature: type[Signature],
        demos: list[dict[str, Any]],
        inputs: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """
        Execute the adapter pipeline: format inputs, call LM, and parse outputs.

        Args:
            lm: The Language Model instance to use for generation. Must be an instance of `dspy.BaseLM`.
            lm_kwargs: Additional keyword arguments to pass to the LM call (e.g., temperature, max_tokens). These are
                passed directly to the LM.
            signature: The DSPy signature associated with this LM call.
            demos: List of few-shot examples to include in the prompt. Each dictionary should contain keys matching the
                signature's input and output field names. Examples are formatted as user/assistant message pairs.
            inputs: The current input values for this call. Keys must match the signature's input field names.

        Returns:
            List of dictionaries representing parsed LM responses. Each dictionary contains keys matching the
            signature's output field names. For multiple generations (n > 1), returns multiple dictionaries.
        """
        processed_signature = self._call_preprocess(lm, lm_kwargs, signature, inputs)
        inputs = self.format(processed_signature, demos, inputs)

        outputs = lm(messages=inputs, **lm_kwargs)
        return self._call_postprocess(processed_signature, signature, outputs, lm)

    async def acall(
        self,
        lm: "LM",
        lm_kwargs: dict[str, Any],
        signature: type[Signature],
        demos: list[dict[str, Any]],
        inputs: dict[str, Any],
    ) -> list[dict[str, Any]]:
        processed_signature = self._call_preprocess(lm, lm_kwargs, signature, inputs)
        inputs = self.format(processed_signature, demos, inputs)

        outputs = await lm.acall(messages=inputs, **lm_kwargs)
        return self._call_postprocess(processed_signature, signature, outputs, lm)

    def format(
        self,
        signature: type[Signature],
        demos: list[dict[str, Any]],
        inputs: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Format the input messages for the LM call.

        This method converts the DSPy structured input along with few-shot examples and conversation history into
        multiturn messages as expected by the LM. For custom adapters, this method can be overridden to customize
        the formatting of the input messages.

        In general we recommend the messages to have the following structure:
        ```
        [
            {"role": "system", "content": system_message},
            # Begin few-shot examples
            {"role": "user", "content": few_shot_example_1_input},
            {"role": "assistant", "content": few_shot_example_1_output},
            {"role": "user", "content": few_shot_example_2_input},
            {"role": "assistant", "content": few_shot_example_2_output},
            ...
            # End few-shot examples
            # Begin conversation history
            {"role": "user", "content": conversation_history_1_input},
            {"role": "assistant", "content": conversation_history_1_output},
            {"role": "user", "content": conversation_history_2_input},
            {"role": "assistant", "content": conversation_history_2_output},
            ...
            # End conversation history
            {"role": "user", "content": current_input},
        ]
        ```

        The system message should contain the field description, field structure, and task description.


        Args:
            signature: The DSPy signature for which to format the input messages.
            demos: A list of few-shot examples.
            inputs: The input arguments to the DSPy module.

        Returns:
            A list of multiturn messages as expected by the LM.
        """
        inputs_copy = dict(inputs)

        # If the signature and inputs have conversation history, we need to format the conversation history and
        # remove the history field from the signature.
        history_field_name = self._get_history_field_name(signature)
        if history_field_name:
            # In order to format the conversation history, we need to remove the history field from the signature.
            signature_without_history = signature.delete(history_field_name)
            conversation_history = self.format_conversation_history(
                signature_without_history,
                history_field_name,
                inputs_copy,
            )

        messages = []
        system_message = (
            f"{self.format_field_description(signature)}\n"
            f"{self.format_field_structure(signature)}\n"
            f"{self.format_task_description(signature)}"
        )
        messages.append({"role": "system", "content": system_message})
        messages.extend(self.format_demos(signature, demos))
        if history_field_name:
            # Conversation history and current input
            content = self.format_user_message_content(signature_without_history, inputs_copy, main_request=True)
            messages.extend(conversation_history)
            messages.append({"role": "user", "content": content})
        else:
            # Only current input
            content = self.format_user_message_content(signature, inputs_copy, main_request=True)
            messages.append({"role": "user", "content": content})

        messages = split_message_content_for_custom_types(messages)
        return messages

    def format_field_description(self, signature: type[Signature]) -> str:
        """Format the field description for the system message.

        This method formats the field description for the system message. It should return a string that contains
        the field description for the input fields and the output fields.

        Args:
            signature: The DSPy signature for which to format the field description.

        Returns:
            A string that contains the field description for the input fields and the output fields.
        """
        raise NotImplementedError

    def format_field_structure(self, signature: type[Signature]) -> str:
        """Format the field structure for the system message.

        This method formats the field structure for the system message. It should return a string that dictates the
        format the input fields should be provided to the LM, and the format the output fields will be in the response.
        Refer to the ChatAdapter and JSONAdapter for examples.

        Args:
            signature: The DSPy signature for which to format the field structure.
        """
        raise NotImplementedError

    def format_task_description(self, signature: type[Signature]) -> str:
        """Format the task description for the system message.

        This method formats the task description for the system message. In most cases this is just a thin wrapper
        over `signature.instructions`.

        Args:
            signature: The DSPy signature of the DSPy module.

        Returns:
            A string that describes the task.
        """
        raise NotImplementedError

    def format_user_message_content(
        self,
        signature: type[Signature],
        inputs: dict[str, Any],
        prefix: str = "",
        suffix: str = "",
        main_request: bool = False,
    ) -> str:
        """Format the user message content.

        This method formats the user message content, which can be used in formatting few-shot examples, conversation
        history, and the current input.

        Args:
            signature: The DSPy signature for which to format the user message content.
            inputs: The input arguments to the DSPy module.
            prefix: A prefix to the user message content.
            suffix: A suffix to the user message content.
            main_request: Whether the content is the main request (the current input), as opposed to a few-shot
                example or a conversation history message.

        Returns:
            A string that contains the user message content.
        """
        raise NotImplementedError

    def format_assistant_message_content(
        self,
        signature: type[Signature],
        outputs: dict[str, Any],
        missing_field_message: str | None = None,
    ) -> str:
        """Format the assistant message content.

        This method formats the assistant message content, which can be used in formatting few-shot examples and
        conversation history.

        Args:
            signature: The DSPy signature for which to format the assistant message content.
            outputs: The output fields to be formatted.
            missing_field_message: A message to be used when a field is missing.

        Returns:
            A string that contains the assistant message content.
        """
        raise NotImplementedError

    def format_demos(self, signature: type[Signature], demos: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Format the few-shot examples.

        This method formats the few-shot examples as multiturn messages.

        Args:
            signature: The DSPy signature for which to format the few-shot examples.
            demos: A list of few-shot examples, each element is a dictionary with keys of the input and output fields of
                the signature.

        Returns:
            A list of multiturn messages.
        """
        complete_demos = []
        incomplete_demos = []

        for demo in demos:
            # Check if all fields are present and not None
            is_complete = all(k in demo and demo[k] is not None for k in signature.fields)

            # Check if demo has at least one input and one output field
            has_input = any(k in demo for k in signature.input_fields)
            has_output = any(k in demo for k in signature.output_fields)

            if is_complete:
                complete_demos.append(demo)
            elif has_input and has_output:
                # We only keep incomplete demos that have at least one input and one output field
                incomplete_demos.append(demo)

        messages = []

        incomplete_demo_prefix = "This is an example of the task, though some input or output fields are not supplied."
        for demo in incomplete_demos:
            messages.append(
                {
                    "role": "user",
                    "content": self.format_user_message_content(signature, demo, prefix=incomplete_demo_prefix),
                }
            )
            messages.append(
                {
                    "role": "assistant",
                    "content": self.format_assistant_message_content(
                        signature, demo, missing_field_message="Not supplied for this particular example. "
                    ),
                }
            )

        for demo in complete_demos:
            messages.append({"role": "user", "content": self.format_user_message_content(signature, demo)})
            messages.append(
                {
                    "role": "assistant",
                    "content": self.format_assistant_message_content(
                        signature, demo, missing_field_message="Not supplied for this conversation history message. "
                    ),
                }
            )

        return messages

    def _get_history_field_name(self, signature: type[Signature]) -> str | None:
        for name, field in signature.input_fields.items():
            if field.annotation == History:
                return name
        return None

    def _get_tool_call_input_field_name(self, signature: type[Signature]) -> str | None:
        for name, field in signature.input_fields.items():
            # Look for annotation `list[dspy.Tool]` or `dspy.Tool`
            origin = get_origin(field.annotation)
            if origin is list and field.annotation.__args__[0] == Tool:
                return name
            if field.annotation == Tool:
                return name
        return None

    def _get_tool_call_output_field_name(self, signature: type[Signature]) -> str | None:
        for name, field in signature.output_fields.items():
            if field.annotation == ToolCalls:
                return name
        return None

    def format_conversation_history(
        self,
        signature: type[Signature],
        history_field_name: str,
        inputs: dict[str, Any],
    ) -> list[dict[str, Any]]:
        """Format the conversation history.

        This method formats the conversation history and the current input as multiturn messages.

        Args:
            signature: The DSPy signature for which to format the conversation history.
            history_field_name: The name of the history field in the signature.
            inputs: The input arguments to the DSPy module.

        Returns:
            A list of multiturn messages.
        """
        conversation_history = inputs[history_field_name].messages if history_field_name in inputs else None

        if conversation_history is None:
            return []

        messages = []
        for message in conversation_history:
            messages.append(
                {
                    "role": "user",
                    "content": self.format_user_message_content(signature, message),
                }
            )
            messages.append(
                {
                    "role": "assistant",
                    "content": self.format_assistant_message_content(signature, message),
                }
            )

        # Remove the history field from the inputs
        del inputs[history_field_name]

        return messages

    def parse(self, signature: type[Signature], completion: str) -> dict[str, Any]:
        """Parse the LM output into a dictionary of the output fields.

        This method parses the LM output into a dictionary of the output fields.

        Args:
            signature: The DSPy signature for which to parse the LM output.
            completion: The LM output to be parsed.

        Returns:
            A dictionary of the output fields.
        """
        raise NotImplementedError

```
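
The abstract hooks above (`format_user_message_content`, `format_assistant_message_content`, `parse`) are implemented by concrete adapters such as `dspy.ChatAdapter`. Below is a minimal sketch of how `format_demos` and `parse` behave with that adapter; the signature, demo values, and completion string are illustrative, and the exact marker text may differ across versions.

```python
import dspy

class QA(dspy.Signature):
    question: str = dspy.InputField()
    answer: str = dspy.OutputField()

adapter = dspy.ChatAdapter()

demos = [
    {"question": "What is 2 + 2?", "answer": "4"},  # complete demo -> one user/assistant pair
    {"question": "Capital of France?"},             # input only, no output field -> dropped
]
messages = adapter.format_demos(QA, demos)
# `messages` alternates user/assistant content built from "[[ ## question ## ]]" /
# "[[ ## answer ## ]]" field markers, with each assistant turn ending in "[[ ## completed ## ]]".

# Parsing a raw completion back into the signature's output fields:
completion = "[[ ## answer ## ]]\n4\n\n[[ ## completed ## ]]"
print(adapter.parse(QA, completion))  # {"answer": "4"}
```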

--------------------------------------------------------------------------------
/tests/signatures/test_signature.py:
--------------------------------------------------------------------------------

```python
from types import UnionType
from typing import Any, Optional, Union

import pydantic
import pytest

import dspy
from dspy import InputField, OutputField, Signature, infer_prefix
from dspy.utils.dummies import DummyLM


def test_field_types_and_custom_attributes():
    class TestSignature(Signature):
        """Instructions"""

        input1: str = InputField()
        input2: int = InputField()
        output1: list[str] = OutputField()
        output2 = OutputField()

    assert TestSignature.instructions == "Instructions"
    assert TestSignature.input_fields["input1"].annotation == str
    assert TestSignature.input_fields["input2"].annotation == int
    assert TestSignature.output_fields["output1"].annotation == list[str]
    assert TestSignature.output_fields["output2"].annotation == str


def test_no_input_output():
    with pytest.raises(TypeError):

        class TestSignature(Signature):
            input1: str


def test_no_input_output2():
    with pytest.raises(TypeError):

        class TestSignature(Signature):
            input1: str = pydantic.Field()


def test_all_fields_have_prefix():
    class TestSignature(Signature):
        input = InputField(prefix="Modified:")
        output = OutputField()

    assert TestSignature.input_fields["input"].json_schema_extra["prefix"] == "Modified:"
    assert TestSignature.output_fields["output"].json_schema_extra["prefix"] == "Output:"


def test_signature_parsing():
    signature = Signature("input1, input2 -> output")
    assert "input1" in signature.input_fields
    assert "input2" in signature.input_fields
    assert "output" in signature.output_fields


def test_with_signature():
    signature1 = Signature("input1, input2 -> output")
    signature2 = signature1.with_instructions("This is a test")
    assert signature2.instructions == "This is a test"
    assert signature1 is not signature2, "The type should be immutable"


def test_with_updated_field():
    signature1 = Signature("input1, input2 -> output")
    signature2 = signature1.with_updated_fields("input1", prefix="Modified:")
    assert signature2.input_fields["input1"].json_schema_extra["prefix"] == "Modified:"
    assert signature1.input_fields["input1"].json_schema_extra["prefix"] == "Input 1:"
    assert signature1 is not signature2, "The type should be immutable"
    for key in signature1.fields.keys():
        if key != "input1":
            assert signature1.fields[key].json_schema_extra == signature2.fields[key].json_schema_extra
    assert signature1.instructions == signature2.instructions


def test_empty_signature():
    with pytest.raises(ValueError):
        Signature("")


def test_instructions_signature():
    with pytest.raises(ValueError):
        Signature("")


def test_signature_instructions():
    sig1 = Signature("input1 -> output1", instructions="This is a test")
    assert sig1.instructions == "This is a test"
    sig2 = Signature("input1 -> output1", "This is a test")
    assert sig2.instructions == "This is a test"


def test_signature_instructions_none():
    sig1 = Signature("a, b -> c")
    assert sig1.instructions == "Given the fields `a`, `b`, produce the fields `c`."


def test_signature_from_dict():
    signature = Signature(
        {"input1": InputField(), "input2": InputField(), "output": OutputField()})
    for k in ["input1", "input2", "output"]:
        assert k in signature.fields
        assert signature.fields[k].annotation == str


def test_signature_equality():
    sig1 = Signature("input1 -> output1")
    sig2 = Signature("input1 -> output1")
    assert sig1.equals(sig2)


def test_signature_inequality():
    sig1 = Signature("input1 -> output1")
    sig2 = Signature("input2 -> output2")
    assert not sig1.equals(sig2)


def test_equality_format():
    class TestSignature(Signature):
        input = InputField(format=lambda x: x)
        output = OutputField()

    assert TestSignature.equals(TestSignature)


def test_signature_reverse():
    sig = Signature("input1 -> output1")
    assert sig.signature == "input1 -> output1"


def test_insert_field_at_various_positions():
    class InitialSignature(Signature):
        input1: str = InputField()
        output1: int = OutputField()

    s1 = InitialSignature.prepend("new_input_start", InputField(), str)
    s2 = InitialSignature.append("new_input_end", InputField(), str)
    assert "new_input_start" == list(s1.input_fields.keys())[0]  # noqa: RUF015
    assert "new_input_end" == list(s2.input_fields.keys())[-1]

    s3 = InitialSignature.prepend("new_output_start", OutputField(), str)
    s4 = InitialSignature.append("new_output_end", OutputField(), str)
    assert "new_output_start" == list(s3.output_fields.keys())[0]  # noqa: RUF015
    assert "new_output_end" == list(s4.output_fields.keys())[-1]


def test_order_preserved_with_mixed_annotations():
    class ExampleSignature(dspy.Signature):
        text: str = dspy.InputField()
        output = dspy.OutputField()
        pass_evaluation: bool = dspy.OutputField()

    expected_order = ["text", "output", "pass_evaluation"]
    actual_order = list(ExampleSignature.fields.keys())
    assert actual_order == expected_order


def test_infer_prefix():
    assert infer_prefix(
        "someAttributeName42IsCool") == "Some Attribute Name 42 Is Cool"
    assert infer_prefix("version2Update") == "Version 2 Update"
    assert infer_prefix("modelT45Enhanced") == "Model T 45 Enhanced"
    assert infer_prefix("someAttributeName") == "Some Attribute Name"
    assert infer_prefix("some_attribute_name") == "Some Attribute Name"
    assert infer_prefix("URLAddress") == "URL Address"
    assert infer_prefix("isHTTPSecure") == "Is HTTP Secure"
    assert infer_prefix("isHTTPSSecure123") == "Is HTTPS Secure 123"


def test_instantiating():
    sig = Signature("input -> output")
    assert issubclass(sig, Signature)
    assert sig.__name__ == "StringSignature"
    value = sig(input="test", output="test")
    assert isinstance(value, sig)


def test_instantiating2():
    class SubSignature(Signature):
        input = InputField()
        output = OutputField()

    assert issubclass(SubSignature, Signature)
    assert SubSignature.__name__ == "SubSignature"
    value = SubSignature(input="test", output="test")
    assert isinstance(value, SubSignature)


def test_multiline_instructions():
    lm = DummyLM([{"output": "short answer"}])
    dspy.settings.configure(lm=lm)

    class MySignature(Signature):
        """First line
        Second line
            Third line"""

        output = OutputField()

    predictor = dspy.Predict(MySignature)
    assert predictor().output == "short answer"


def test_dump_and_load_state():
    class CustomSignature(dspy.Signature):
        """I am just an instruction."""

        sentence = dspy.InputField(desc="I am an innocent input!")
        sentiment = dspy.OutputField()

    state = CustomSignature.dump_state()
    expected = {
        "instructions": "I am just an instruction.",
        "fields": [
            {
                "prefix": "Sentence:",
                "description": "I am an innocent input!",
            },
            {
                "prefix": "Sentiment:",
                "description": "${sentiment}",
            },
        ],
    }
    assert state == expected

    class CustomSignature2(dspy.Signature):
        """I am a malicious instruction."""

        sentence = dspy.InputField(desc="I am a malicious input!")
        sentiment = dspy.OutputField()

    assert CustomSignature2.dump_state() != expected
    # Overwrite the state with the state of CustomSignature.
    loaded_signature = CustomSignature2.load_state(state)
    assert loaded_signature.instructions == "I am just an instruction."
    # After `load_state`, the state should be the same as CustomSignature.
    assert loaded_signature.dump_state() == expected
    # CustomSignature2 should not have been modified.
    assert CustomSignature2.instructions == "I am a malicious instruction."
    assert CustomSignature2.fields["sentence"].json_schema_extra["desc"] == "I am an malicious input!"
    assert CustomSignature2.fields["sentiment"].json_schema_extra["prefix"] == "Sentiment:"


def test_typed_signatures_basic_types():
    sig = Signature("input1: int, input2: str -> output: float")
    assert "input1" in sig.input_fields
    assert sig.input_fields["input1"].annotation == int
    assert "input2" in sig.input_fields
    assert sig.input_fields["input2"].annotation == str
    assert "output" in sig.output_fields
    assert sig.output_fields["output"].annotation == float


def test_typed_signatures_generics():
    sig = Signature(
        "input_list: list[int], input_dict: dict[str, float] -> output_tuple: tuple[str, int]")
    assert "input_list" in sig.input_fields
    assert sig.input_fields["input_list"].annotation == list[int]
    assert "input_dict" in sig.input_fields
    assert sig.input_fields["input_dict"].annotation == dict[str, float]
    assert "output_tuple" in sig.output_fields
    assert sig.output_fields["output_tuple"].annotation == tuple[str, int]


def test_typed_signatures_unions_and_optionals():
    sig = Signature(
        "input_opt: Optional[str], input_union: Union[int, None] -> output_union: Union[int, str]")
    assert "input_opt" in sig.input_fields
    # Optional[str] is actually Union[str, None]
    # Depending on the environment, it might resolve to Union[str, None] or Optional[str], either is correct.
    # We'll just check for a Union containing str and NoneType:
    input_opt_annotation = sig.input_fields["input_opt"].annotation
    assert input_opt_annotation == Optional[str] or (
        getattr(input_opt_annotation, "__origin__", None) is Union
        and str in input_opt_annotation.__args__
        and type(None) in input_opt_annotation.__args__
    )

    assert "input_union" in sig.input_fields
    input_union_annotation = sig.input_fields["input_union"].annotation
    assert (
        getattr(input_union_annotation, "__origin__", None) is Union
        and int in input_union_annotation.__args__
        and type(None) in input_union_annotation.__args__
    )

    assert "output_union" in sig.output_fields
    output_union_annotation = sig.output_fields["output_union"].annotation
    assert (
        getattr(output_union_annotation, "__origin__", None) is Union
        and int in output_union_annotation.__args__
        and str in output_union_annotation.__args__
    )


def test_typed_signatures_any():
    sig = Signature("input_any: Any -> output_any: Any")
    assert "input_any" in sig.input_fields
    assert sig.input_fields["input_any"].annotation == Any
    assert "output_any" in sig.output_fields
    assert sig.output_fields["output_any"].annotation == Any


def test_typed_signatures_nested():
    sig = Signature(
        "input_nested: list[Union[str, int]] -> output_nested: Tuple[int, Optional[float], list[str]]")
    input_nested_ann = sig.input_fields["input_nested"].annotation
    assert getattr(input_nested_ann, "__origin__", None) is list
    assert len(input_nested_ann.__args__) == 1
    union_arg = input_nested_ann.__args__[0]
    assert getattr(union_arg, "__origin__", None) is Union
    assert str in union_arg.__args__ and int in union_arg.__args__

    output_nested_ann = sig.output_fields["output_nested"].annotation
    assert getattr(output_nested_ann, "__origin__", None) is tuple
    assert output_nested_ann.__args__[0] == int
    # The second arg is Optional[float], which is Union[float, None]
    second_arg = output_nested_ann.__args__[1]
    assert getattr(second_arg, "__origin__", None) is Union
    assert float in second_arg.__args__ and type(None) in second_arg.__args__
    # The third arg is list[str]
    third_arg = output_nested_ann.__args__[2]
    assert getattr(third_arg, "__origin__", None) is list
    assert third_arg.__args__[0] == str


def test_typed_signatures_from_dict():
    fields = {
        "input_str_list": (list[str], InputField()),
        "input_dict_int": (dict[str, int], InputField()),
        "output_tup": (tuple[int, float], OutputField()),
    }
    sig = Signature(fields)
    assert "input_str_list" in sig.input_fields
    assert sig.input_fields["input_str_list"].annotation == list[str]
    assert "input_dict_int" in sig.input_fields
    assert sig.input_fields["input_dict_int"].annotation == dict[str, int]
    assert "output_tup" in sig.output_fields
    assert sig.output_fields["output_tup"].annotation == tuple[int, float]


def test_typed_signatures_complex_combinations():
    sig = Signature(
        "input_complex: dict[str, list[Optional[Tuple[int, str]]]] -> output_complex: Union[list[str], dict[str, Any]]"
    )
    input_complex_ann = sig.input_fields["input_complex"].annotation
    assert getattr(input_complex_ann, "__origin__", None) is dict
    key_arg, value_arg = input_complex_ann.__args__
    assert key_arg == str
    # value_arg: list[Optional[Tuple[int, str]]]
    assert getattr(value_arg, "__origin__", None) is list
    inner_union = value_arg.__args__[0]
    # inner_union should be Optional[Tuple[int, str]]
    # which is Union[Tuple[int, str], None]
    assert getattr(inner_union, "__origin__", None) is Union
    tuple_type = [t for t in inner_union.__args__ if t != type(None)][0]  # noqa: RUF015
    assert getattr(tuple_type, "__origin__", None) is tuple
    assert tuple_type.__args__ == (int, str)

    output_complex_ann = sig.output_fields["output_complex"].annotation
    assert getattr(output_complex_ann, "__origin__", None) is Union
    assert len(output_complex_ann.__args__) == 2
    possible_args = set(output_complex_ann.__args__)
    # Expecting list[str] and dict[str, Any]
    # Because sets don't preserve order, just check membership.
    # Find the list[str] arg
    list_arg = next(a for a in possible_args if getattr(
        a, "__origin__", None) is list)
    dict_arg = next(a for a in possible_args if getattr(
        a, "__origin__", None) is dict)
    assert list_arg.__args__ == (str,)
    k, v = dict_arg.__args__
    assert k == str and v == Any


def test_make_signature_from_string():
    sig = Signature(
        "input1: int, input2: dict[str, int] -> output1: list[str], output2: Union[int, str]")
    assert "input1" in sig.input_fields
    assert sig.input_fields["input1"].annotation == int
    assert "input2" in sig.input_fields
    assert sig.input_fields["input2"].annotation == dict[str, int]
    assert "output1" in sig.output_fields
    assert sig.output_fields["output1"].annotation == list[str]
    assert "output2" in sig.output_fields
    assert sig.output_fields["output2"].annotation == Union[int, str]


def test_signature_field_with_constraints():
    class MySignature(Signature):
        inputs: str = InputField()
        outputs1: str = OutputField(min_length=5, max_length=10)
        outputs2: int = OutputField(ge=5, le=10)

    assert "outputs1" in MySignature.output_fields
    output1_constraints = MySignature.output_fields["outputs1"].json_schema_extra["constraints"]

    assert "minimum length: 5" in output1_constraints
    assert "maximum length: 10" in output1_constraints

    assert "outputs2" in MySignature.output_fields
    output2_constraints = MySignature.output_fields["outputs2"].json_schema_extra["constraints"]
    assert "greater than or equal to: 5" in output2_constraints
    assert "less than or equal to: 10" in output2_constraints


def test_basic_custom_type():
    class CustomType(pydantic.BaseModel):
        value: str

    test_signature = dspy.Signature(
        "input: CustomType -> output: str",
        custom_types={"CustomType": CustomType}
    )

    assert test_signature.input_fields["input"].annotation == CustomType

    lm = DummyLM([{"output": "processed"}])
    dspy.settings.configure(lm=lm)

    custom_obj = CustomType(value="test")
    pred = dspy.Predict(test_signature)(input=custom_obj)
    assert pred.output == "processed"


def test_custom_type_from_different_module():
    from pathlib import Path

    test_signature = dspy.Signature("input: Path -> output: str")
    assert test_signature.input_fields["input"].annotation == Path

    lm = DummyLM([{"output": "/test/path"}])
    dspy.settings.configure(lm=lm)

    path_obj = Path("/test/path")
    pred = dspy.Predict(test_signature)(input=path_obj)
    assert pred.output == "/test/path"

def test_pep604_union_type_inline():
    sig = Signature(
        "input1: str | None, input2: None | int -> output_union: int | str"
    )

    # input1 and input2 test that both 'T | None' and 'None | T' are interpreted as Optional types,
    # regardless of the order of None in the union expression.

    assert "input1" in sig.input_fields
    input1_annotation = sig.input_fields["input1"].annotation
    assert input1_annotation == Optional[str] or (
        getattr(input1_annotation, "__origin__", None) is Union
        and str in input1_annotation.__args__
        and type(None) in input1_annotation.__args__
    )

    assert "input2" in sig.input_fields
    input2_annotation = sig.input_fields["input2"].annotation
    assert input2_annotation == Optional[int] or (
        getattr(input2_annotation, "__origin__", None) is Union
        and int in input2_annotation.__args__
        and type(None) in input2_annotation.__args__
    )

    assert "output_union" in sig.output_fields
    output_union_annotation = sig.output_fields["output_union"].annotation
    assert (
        getattr(output_union_annotation, "__origin__", None) is Union
        and int in output_union_annotation.__args__
        and str in output_union_annotation.__args__
    )


def test_pep604_union_type_inline_equivalence():
    sig1 = Signature("input: str | None -> output: int | str")
    sig2 = Signature("input: Optional[str] -> output: Union[int, str]")

    # PEP 604 union types in inline signatures should be equivalent to Optional and Union types
    assert sig1.equals(sig2)

    # Check that the annotations are equivalent
    assert sig1.input_fields["input"].annotation == sig2.input_fields["input"].annotation
    assert sig1.output_fields["output"].annotation == sig2.output_fields["output"].annotation


def test_pep604_union_type_inline_nested():
    sig = Signature(
        "input: str | (int | float) | None -> output: str"
    )
    assert "input" in sig.input_fields
    input_annotation = sig.input_fields["input"].annotation

    # Check for the correct union: Union[str, int, float, NoneType]
    assert getattr(input_annotation, "__origin__", None) is Union
    assert set(input_annotation.__args__) == {str, int, float, type(None)}


def test_pep604_union_type_class_nested():
    class Sig1(Signature):
        input: str | (int | float) | None = InputField()
        output: str = OutputField()

    assert "input" in Sig1.input_fields
    input_annotation = Sig1.input_fields["input"].annotation

    # Check for the correct union: UnionType[str, int, float, NoneType]
    assert isinstance(input_annotation, UnionType)
    assert set(input_annotation.__args__) == {str, int, float, type(None)}


def test_pep604_union_type_class_equivalence():
    class Sig1(Signature):
        input: str | None = InputField()
        output: int | str = OutputField()

    class Sig2(Signature):
        input: str | None = InputField()
        output: Union[int, str] = OutputField()  # noqa: UP007

    # PEP 604 union types in class signatures should be equivalent to Optional and Union types
    assert Sig1.equals(Sig2)

    # Check that the annotations are equivalent
    assert Sig1.input_fields["input"].annotation == Sig2.input_fields["input"].annotation
    assert Sig1.output_fields["output"].annotation == Sig2.output_fields["output"].annotation

    # Check that the pep604 annotations are of type UnionType
    assert isinstance(Sig1.input_fields["input"].annotation, UnionType)
    assert isinstance(Sig1.output_fields["output"].annotation, UnionType)


def test_pep604_union_type_insert():
    class PEP604Signature(Signature):
        input: str | None = InputField()
        output: int | str = OutputField()

    # This test ensures that inserting a field into a signature with a PEP 604 UnionType works

    # Insert a new input field at the start
    NewSig = PEP604Signature.prepend("new_input", InputField(), float | int)
    assert "new_input" in NewSig.input_fields

    new_input_annotation = NewSig.input_fields["new_input"].annotation
    assert isinstance(new_input_annotation, UnionType)
    assert set(new_input_annotation.__args__) == {float, int}

    # The original union type field should still be present and correct
    input_annotation = NewSig.input_fields["input"].annotation
    output_annotation = NewSig.output_fields["output"].annotation

    assert isinstance(input_annotation, UnionType)
    assert str in input_annotation.__args__ and type(None) in input_annotation.__args__

    assert isinstance(output_annotation, UnionType)
    assert set(output_annotation.__args__) == {int, str}


def test_pep604_union_type_with_custom_types():
    class CustomType(pydantic.BaseModel):
        value: str

    sig = Signature(
        "input: CustomType | None -> output: int | str",
        custom_types={"CustomType": CustomType}
    )

    assert sig.input_fields["input"].annotation == Union[CustomType, None]
    assert sig.output_fields["output"].annotation == Union[int, str]

    lm = DummyLM([{"output": "processed"}])
    dspy.settings.configure(lm=lm)

    custom_obj = CustomType(value="test")
    pred = dspy.Predict(sig)(input=custom_obj)
    assert pred.output == "processed"

```

--------------------------------------------------------------------------------
/dspy/clients/lm_local_arbor.py:
--------------------------------------------------------------------------------

```python
import json
import time
from datetime import datetime
from typing import TYPE_CHECKING, Any
from urllib.parse import urljoin

import openai
import requests

import dspy
from dspy.clients.provider import Provider, ReinforceJob, TrainingJob
from dspy.clients.utils_finetune import GRPOGroup, GRPOStatus, TrainDataFormat, TrainingStatus, save_data

if TYPE_CHECKING:
    from dspy.clients.lm import LM


class ArborTrainingJob(TrainingJob):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.provider_file_id = None
        self.provider_job_id = None

    def cancel(self):
        if ArborProvider.does_job_exist(self.provider_job_id):
            status = self.status()
            if ArborProvider.is_terminal_training_status(status):
                err_msg = "Jobs that are complete cannot be canceled."
                err_msg += f" Job with ID {self.provider_job_id} is done."
                raise Exception(err_msg)
            openai.fine_tuning.jobs.cancel(self.provider_job_id)
            self.provider_job_id = None

        if self.provider_file_id is not None:
            if ArborProvider.does_file_exist(self.provider_file_id):
                openai.files.delete(self.provider_file_id)
            self.provider_file_id = None

        super().cancel()

    def status(self) -> TrainingStatus:
        status = ArborProvider.get_training_status(self.provider_job_id)
        return status


class ArborReinforceJob(ReinforceJob):
    DEFAULT_TRAIN_KWARGS = {  # noqa: RUF012
        "temperature": 1.0,
        "beta": 0.04,
        "num_iterations": 1,
        "per_device_train_batch_size": 8,
        "learning_rate": 1e-6,
        "gradient_accumulation_steps": 1,
        # This is false by default in TRL, but I think it makes sense to be true for us
        "gradient_checkpointing": True,
        "lr_scheduler_type": "constant_with_warmup",
        "warmup_steps": 10,
        "max_prompt_length": None,
        "max_completion_length": None,
        "gradient_checkpointing_kwargs": None,
        "bf16": False,
        "scale_rewards": True,
        "max_grad_norm": 1.0,
        "report_to": "none",
        "log_completions": True,
        "logging_steps": 10,
        # By default (None), the model's max context length is used
        "max_context_length": None,
        "lora": False,
    }

    def __init__(self, lm: "LM", train_kwargs: dict[str, Any]):
        # The teleprompter must ensure that this is set
        if "num_generations" not in train_kwargs:
            raise ValueError("num_generations must be set in the training kwargs")

        self.lm = lm
        self.train_kwargs = train_kwargs
        self.provider_job_id = None
        self.checkpoints = {}
        self.last_checkpoint = None

    def initialize(self):
        # TODO(GRPO Team): Set provider job ID
        num_generations = self.train_kwargs.get("num_generations")
        temperature = self.train_kwargs.get("temperature", self.DEFAULT_TRAIN_KWARGS["temperature"])
        beta = self.train_kwargs.get("beta", self.DEFAULT_TRAIN_KWARGS["beta"])
        num_iterations = self.train_kwargs.get("num_iterations", self.DEFAULT_TRAIN_KWARGS["num_iterations"])
        per_device_train_batch_size = self.train_kwargs.get(
            "per_device_train_batch_size", self.DEFAULT_TRAIN_KWARGS["per_device_train_batch_size"]
        )
        learning_rate = self.train_kwargs.get("learning_rate", self.DEFAULT_TRAIN_KWARGS["learning_rate"])
        gradient_accumulation_steps = self.train_kwargs.get(
            "gradient_accumulation_steps", self.DEFAULT_TRAIN_KWARGS["gradient_accumulation_steps"]
        )
        gradient_checkpointing = self.train_kwargs.get(
            "gradient_checkpointing", self.DEFAULT_TRAIN_KWARGS["gradient_checkpointing"]
        )
        lr_scheduler_type = self.train_kwargs.get("lr_scheduler_type", self.DEFAULT_TRAIN_KWARGS["lr_scheduler_type"])
        warmup_steps = self.train_kwargs.get("warmup_steps", self.DEFAULT_TRAIN_KWARGS["warmup_steps"])
        max_prompt_length = self.train_kwargs.get("max_prompt_length", self.DEFAULT_TRAIN_KWARGS["max_prompt_length"])
        max_completion_length = self.train_kwargs.get(
            "max_completion_length", self.DEFAULT_TRAIN_KWARGS["max_completion_length"]
        )
        bf16 = self.train_kwargs.get("bf16", self.DEFAULT_TRAIN_KWARGS["bf16"])
        scale_rewards = self.train_kwargs.get("scale_rewards", self.DEFAULT_TRAIN_KWARGS["scale_rewards"])
        gradient_checkpointing_kwargs = self.train_kwargs.get(
            "gradient_checkpointing_kwargs", self.DEFAULT_TRAIN_KWARGS["gradient_checkpointing_kwargs"]
        )
        max_grad_norm = self.train_kwargs.get("max_grad_norm", self.DEFAULT_TRAIN_KWARGS["max_grad_norm"])
        report_to = self.train_kwargs.get("report_to", self.DEFAULT_TRAIN_KWARGS["report_to"])
        log_completions = self.train_kwargs.get("log_completions", self.DEFAULT_TRAIN_KWARGS["log_completions"])
        logging_steps = self.train_kwargs.get("logging_steps", self.DEFAULT_TRAIN_KWARGS["logging_steps"])
        max_context_length = self.train_kwargs.get(
            "max_context_length", self.DEFAULT_TRAIN_KWARGS["max_context_length"]
        )
        max_steps = self.train_kwargs.get("max_steps", 500)
        # lora = self.train_kwargs.get("lora", self.DEFAULT_TRAIN_KWARGS["lora"])
        api_base = self.lm.kwargs["api_base"]

        finetune_model = ArborProvider._remove_provider_prefix(self.lm.model)
        data = {
            "model": finetune_model,
            "trainer_config": {
                "num_generations": num_generations,
                "temperature": temperature,
                "beta": beta,
                "per_device_train_batch_size": per_device_train_batch_size,
                "learning_rate": learning_rate,
                "gradient_accumulation_steps": gradient_accumulation_steps,
                "gradient_checkpointing": gradient_checkpointing,
                "lr_scheduler_type": lr_scheduler_type,
                "warmup_steps": warmup_steps,
                "max_prompt_length": max_prompt_length,
                "max_completion_length": max_completion_length,
                "bf16": bf16,
                "scale_rewards": scale_rewards,
                "gradient_checkpointing_kwargs": gradient_checkpointing_kwargs,
                "max_grad_norm": max_grad_norm,
                "report_to": report_to,
                "log_completions": log_completions,
                "logging_steps": logging_steps,
                 # "max_context_length": max_context_length,
                # "max_seq_len": max_context_length,
                "max_steps": max_steps,
                # "lora": lora,
            },
            "inference_config": {
                "model": finetune_model,
                "max_context_length": max_context_length,
            },
            "gpu_config": {
                "type": "multi",
                "multi": {
                    "num_inference_gpus": 1,
                    "num_training_gpus": 1,
                },
            },
        }
        url = f"{api_base}fine_tuning/grpo/initialize"
        headers = {"Content-Type": "application/json"}
        response = requests.post(url=url, headers=headers, json=data)
        print(json.dumps(response.json(), indent=2))
        response.raise_for_status()
        response = response.json()
        self.lm.model = ArborProvider._add_provider_prefix(response["current_model"])
        self.provider_job_id = response.get("job_id")

    def _run_grpo_step_one_group(
        self, train_group: GRPOGroup, train_data_format: TrainDataFormat | str | None = None
    ):
        # TODO: Check that the data follows the intended format
        api_base = self.lm.kwargs["api_base"]
        # api_key = self.lm.kwargs["api_key"]

        finetune_model = ArborProvider._remove_provider_prefix(self.lm.model)
        data = {"job_id": self.provider_job_id, "model": finetune_model, "batch": train_group["group"], "batch_id": train_group["batch_id"]}
        url = urljoin(api_base, "fine_tuning/grpo/step")
        headers = {"Content-Type": "application/json"}
        response = requests.post(url, headers=headers, json=data)
        assert response.status_code == 200, f"Failed to run a GRPO step: {response.text}"
        response = response.json()
        assert "current_model" in response, f"Response does not contain the next model ID to be used: {response}"
        current_model = response["current_model"]
        self.lm.model = ArborProvider._add_provider_prefix(current_model)

    def step(self, train_data: list[GRPOGroup], train_data_format: TrainDataFormat | str | None):
        # Note: TrainDataFormat specifies the format for the innermost dict.
        # Because we run GRPO at the group level, train_data will be a list of
        # groups, where each group is a list of GRPOChatData. Our teleprompters
        # ensure that we pass the right data format.
        # We can consider making this distinction clearer, e.g., by having two
        # different step methods or changing our smallest data format to be the
        # GRPO group.
        # TODO: Support step on the server side
        assert (
            train_data_format == TrainDataFormat.GRPO_CHAT
        ), f"GRPO only supports the GRPO_CHAT data format. Got {train_data_format} instead."
        for group in train_data:
            self._run_grpo_step_one_group(group, train_data_format)

    def get_status(self) -> GRPOStatus:
        api_base = self.lm.kwargs["api_base"]
        url = f"{api_base}fine_tuning/grpo/status"
        headers = {"Content-Type": "application/json"}
        body = {"job_id": self.provider_job_id}
        response = requests.post(url, headers=headers, json=body)
        assert response.status_code == 200, f"Failed to get GRPO status: {response.text}"
        return GRPOStatus(**response.json())

    def save_checkpoint(self, checkpoint_name: str, score: float | None = None):
        api_base = self.lm.kwargs["api_base"]
        url = urljoin(api_base, f"fine_tuning/grpo/{self.provider_job_id}/checkpoint")
        headers = {"Content-Type": "application/json"}
        body = {"job_id": self.provider_job_id, "checkpoint_name": checkpoint_name}
        response = requests.post(url, headers=headers, json=body)
        assert response.status_code == 200, f"Failed to save checkpoint: {response.text}"
        response = response.json()

        last_checkpoint = response["last_checkpoint"]
        checkpoints = response["checkpoints"]
        checkpoint_model_path = checkpoints[last_checkpoint]
        self.checkpoints[last_checkpoint] = {
            "model_path": checkpoint_model_path,
            "score": score,
        }
        self.last_checkpoint = last_checkpoint

    def terminate(self):
        api_base = self.lm.kwargs["api_base"]

        url = urljoin(api_base, f"fine_tuning/grpo/{self.provider_job_id}/terminate")
        headers = {"Content-Type": "application/json"}
        body = {"job_id": self.provider_job_id}
        response = requests.post(url, headers=headers, json=body)
        assert response.status_code == 200, f"Failed to terminate GRPO: {response.text}"

        response = response.json()
        current_model = response["current_model"]
        self.lm.model = ArborProvider._add_provider_prefix(current_model)

    def cancel(self):
        if ArborProvider.does_job_exist(self.provider_job_id):
            status = self.status()
            if ArborProvider.is_terminal_training_status(status):
                err_msg = "Jobs that are complete cannot be canceled."
                err_msg += f" Job with ID {self.provider_job_id} is done."
                raise Exception(err_msg)
            openai.fine_tuning.jobs.cancel(self.provider_job_id)
            self.provider_job_id = None

    def status(self) -> TrainingStatus:
        status = ArborProvider.get_training_status(self.provider_job_id)
        return status


class ArborProvider(Provider):
    def __init__(self):
        super().__init__()
        self.finetunable = True
        self.reinforceable = True
        self.TrainingJob = ArborTrainingJob
        self.ReinforceJob = ArborReinforceJob

    @staticmethod
    def launch(lm: "LM", launch_kwargs: dict[str, Any] | None = None):
        model = ArborProvider._remove_provider_prefix(lm.model)

        api_base = lm.kwargs["api_base"]

        launch_kwargs = launch_kwargs or lm.launch_kwargs

        # Make request to launch endpoint
        response = requests.post(urljoin(api_base, "chat/launch"), json={"model": model, "launch_kwargs": launch_kwargs})

        if response.status_code != 200:
            raise Exception(f"Failed to launch model. Status code: {response.status_code}, Response: {response.text}")

        print(f"Inference server for model {model} launched successfully")

    @staticmethod
    def kill(lm: "LM", launch_kwargs: dict[str, Any] | None = None):
        api_base = lm.kwargs["api_base"]

        response = requests.post(
            urljoin(api_base, "chat/kill"),
        )

        if response.status_code != 200:
            raise Exception(f"Failed to kill model. Status code: {response.status_code}, Response: {response.text}")

        print("Inference killed successfully")

    @staticmethod
    def _remove_provider_prefix(model: str) -> str:
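        # e.g. "openai/arbor:Qwen2.5-0.5B" -> "Qwen2.5-0.5B" (model id is illustrative)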
        if model.startswith("openai/"):
            model = model[7:]
        if model.startswith("arbor:"):
            model = model[6:]
        return model

    @staticmethod
    def _add_provider_prefix(model: str) -> str:
        if not model.startswith("openai/arbor:"):
            model = "openai/arbor:" + model
        return model

    @staticmethod
    def _get_arbor_base_api():
        # TODO: We will delete this method once we start passing the LM object
        # to finetune.
        if not hasattr(dspy.settings, "arbor_api_base"):
            raise ValueError(
                "Arbor API base not set. Please set `dspy.settings.arbor_api_base` to the URL for the Arbor server (e.g. 'http://localhost:8000/v1/')."
            )
        return dspy.settings.arbor_api_base

    @staticmethod
    def finetune(
        job: ArborTrainingJob,
        model: str,
        train_data: list[dict[str, Any]],
        train_data_format: TrainDataFormat | None,
        train_kwargs: dict[str, Any] | None = None,
    ) -> str:
        # TODO: We want to refactor finetune so that it takes in an LM.
        # Until then, we use the following to get the API information. The
        # call below is a dummy call to ensure that dspy.settings.arbor_api_base
        # is set.
        ArborProvider._get_arbor_base_api()

        model = ArborProvider._remove_provider_prefix(model)

        print("[Arbor Provider] Validating the data format")
        ArborProvider.validate_data_format(train_data_format)

        print("[Arbor Provider] Saving the data to a file")
        data_path = save_data(train_data)
        print(f"[Arbor Provider] Data saved to {data_path}")

        print("[Arbor Provider] Uploading the data to the provider")
        provider_file_id = ArborProvider.upload_data(data_path)
        job.provider_file_id = provider_file_id

        print("[Arbor Provider] Starting remote training")
        provider_job_id = ArborProvider._start_remote_training(
            train_file_id=job.provider_file_id,
            model=model,
            train_kwargs=train_kwargs,
        )
        job.provider_job_id = provider_job_id
        print(f"[Arbor Provider] Job started with the Arbor Job ID {provider_job_id}")

        print("[Arbor Provider] Waiting for training to complete")
        ArborProvider.wait_for_job(job, train_kwargs)

        print("[Arbor Provider] Attempting to retrieve the trained model")
        model = ArborProvider.get_trained_model(job)
        print(f"[Arbor Provider] Model retrieved: {model}")

        return ArborProvider._add_provider_prefix(model)

    @staticmethod
    def does_job_exist(job_id: str, training_kwargs: dict[str, Any] | None = None) -> bool:
        try:
            original_base_url = openai.base_url
            openai.base_url = ArborProvider._get_arbor_base_api()
            openai.fine_tuning.jobs.retrieve(job_id)
            openai.base_url = original_base_url
            return True
        except Exception:
            return False

    @staticmethod
    def does_file_exist(file_id: str, training_kwargs: dict[str, Any] | None = None) -> bool:
        try:
            original_base_url = openai.base_url
            openai.base_url = ArborProvider._get_arbor_base_api()
            openai.files.retrieve(file_id)
            openai.base_url = original_base_url
            return True
        except Exception:
            return False

    @staticmethod
    def is_terminal_training_status(status: TrainingStatus) -> bool:
        return status in [
            TrainingStatus.succeeded,
            TrainingStatus.failed,
            TrainingStatus.cancelled,
        ]

    @staticmethod
    def get_training_status(job_id: str, training_kwargs: dict[str, Any] | None = None) -> TrainingStatus:
        provider_status_to_training_status = {
            "validating_files": TrainingStatus.pending,
            "queued": TrainingStatus.pending,
            "running": TrainingStatus.running,
            "succeeded": TrainingStatus.succeeded,
            "failed": TrainingStatus.failed,
            "cancelled": TrainingStatus.cancelled,
            "pending": TrainingStatus.pending,
            "pending_pause": TrainingStatus.pending,
            "pending_resume": TrainingStatus.pending,
            "paused": TrainingStatus.pending,
            "pending_cancel": TrainingStatus.pending,
        }

        if job_id is None:
            print("There is no active job.")
            return TrainingStatus.not_started

        err_msg = f"Job with ID {job_id} does not exist."
        assert ArborProvider.does_job_exist(job_id, training_kwargs), err_msg

        original_base_url = openai.base_url
        openai.base_url = ArborProvider._get_arbor_base_api()
        provider_job = openai.fine_tuning.jobs.retrieve(job_id)
        openai.base_url = original_base_url

        provider_status = provider_job.status
        status = provider_status_to_training_status[provider_status]

        return status

    @staticmethod
    def validate_data_format(data_format: TrainDataFormat):
        supported_data_formats = [
            TrainDataFormat.CHAT,
            TrainDataFormat.COMPLETION,
            TrainDataFormat.GRPO_CHAT,
        ]

        if data_format not in supported_data_formats:
            err_msg = f"Arbor does not support the data format {data_format}."
            raise ValueError(err_msg)

    @staticmethod
    def upload_data(data_path: str, training_kwargs: dict[str, Any] | None = None) -> str:
        original_base_url = openai.base_url
        openai.base_url = ArborProvider._get_arbor_base_api()
        provider_file = openai.files.create(
            file=open(data_path, "rb"),
            purpose="fine-tune",
        )
        openai.base_url = original_base_url

        return provider_file.id

    @staticmethod
    def _start_remote_training(train_file_id: str, model: str, train_kwargs: dict[str, Any]) -> str:
        train_kwargs = train_kwargs or {}
        original_base_url = openai.base_url
        openai.base_url = ArborProvider._get_arbor_base_api()
        provider_job = openai.fine_tuning.jobs.create(
            model=model,
            training_file=train_file_id,
            hyperparameters=train_kwargs,
        )
        openai.base_url = original_base_url
        return provider_job.id

    @staticmethod
    def wait_for_job(
        job: TrainingJob,
        training_kwargs: dict[str, Any],
        poll_frequency: int = 20,
    ):
        done = False
        cur_event_id = None
        reported_estimated_time = False
        while not done:
            # Report estimated time if not already reported
            if not reported_estimated_time:
                original_base_url = openai.base_url
                openai.base_url = ArborProvider._get_arbor_base_api()
                remote_job = openai.fine_tuning.jobs.retrieve(job.provider_job_id)
                openai.base_url = original_base_url

                timestamp = remote_job.estimated_finish
                if timestamp:
                    estimated_finish_dt = datetime.fromtimestamp(timestamp)
                    delta_dt = estimated_finish_dt - datetime.now()
                    print(f"[Arbor Provider] The Arbor estimated time remaining is: {delta_dt}")
                    reported_estimated_time = True

            # Get new events
            original_base_url = openai.base_url
            openai.base_url = ArborProvider._get_arbor_base_api()
            page = openai.fine_tuning.jobs.list_events(fine_tuning_job_id=job.provider_job_id, limit=1)
            openai.base_url = original_base_url

            new_event = page.data[0] if page.data else None
            if new_event and new_event.id != cur_event_id:
                dt = datetime.fromtimestamp(new_event.created_at)
                print(f"[Arbor Provider] {dt} {new_event.message}")
                cur_event_id = new_event.id

            # Sleep and update the flag
            time.sleep(poll_frequency)
            done = ArborProvider.is_terminal_training_status(job.status())

    @staticmethod
    def get_trained_model(job, training_kwargs: dict[str, Any] | None = None):
        status = job.status()
        if status != TrainingStatus.succeeded:
            err_msg = f"Job status is {status}."
            err_msg += f" Must be {TrainingStatus.succeeded} to retrieve model."
            raise Exception(err_msg)

        original_base_url = openai.base_url
        openai.base_url = ArborProvider._get_arbor_base_api()
        provider_job = openai.fine_tuning.jobs.retrieve(job.provider_job_id)
        openai.base_url = original_base_url

        finetuned_model = provider_job.fine_tuned_model
        return finetuned_model

```
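
For orientation, here is a minimal sketch of driving `ArborReinforceJob` end to end. It assumes a local Arbor server reachable at the given `api_base`; the model id and kwargs are illustrative assumptions, not a tested configuration.

```python
import dspy
from dspy.clients.lm_local_arbor import ArborReinforceJob

# Hypothetical Arbor-served model and server URL; adjust to your deployment.
lm = dspy.LM(
    model="openai/arbor:Qwen2.5-0.5B-Instruct",
    api_base="http://localhost:8000/v1/",  # trailing slash, matching the f-string URLs above
    api_key="arbor",
)

job = ArborReinforceJob(lm, train_kwargs={"num_generations": 8})  # num_generations is required
job.initialize()  # POSTs to {api_base}fine_tuning/grpo/initialize and updates lm.model

# For each batch of GRPO groups produced by the teleprompter:
#     job.step(train_data, TrainDataFormat.GRPO_CHAT)  # TrainDataFormat from dspy.clients.utils_finetune

status = job.get_status()  # queries fine_tuning/grpo/status
job.terminate()            # ends the run and points lm.model at the final model
```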

--------------------------------------------------------------------------------
/tests/adapters/test_chat_adapter.py:
--------------------------------------------------------------------------------

```python
from typing import Literal
from unittest import mock

import pydantic
import pytest
from litellm.utils import ChatCompletionMessageToolCall, Choices, Function, Message, ModelResponse

import dspy


@pytest.mark.parametrize(
    "input_literal, output_literal, input_value, expected_input_str, expected_output_str",
    [
        # Scenario 1: double quotes escaped within strings
        (
            Literal["one", "two", 'three"'],
            Literal["four", "five", 'six"'],
            "two",
            "Literal['one', 'two', 'three\"']",
            "Literal['four', 'five', 'six\"']",
        ),
        # Scenario 2: Single quotes inside strings
        (
            Literal["she's here", "okay", "test"],
            Literal["done", "maybe'soon", "later"],
            "she's here",
            "Literal[\"she's here\", 'okay', 'test']",
            "Literal['done', \"maybe'soon\", 'later']",
        ),
        # Scenario 3: Strings containing both single and double quotes
        (
            Literal["both\"and'", "another"],
            Literal["yet\"another'", "plain"],
            "another",
            "Literal['both\"and\\'', 'another']",
            "Literal['yet\"another\\'', 'plain']",
        ),
        # Scenario 4: No quotes at all (check the default)
        (
            Literal["foo", "bar"],
            Literal["baz", "qux"],
            "foo",
            "Literal['foo', 'bar']",
            "Literal['baz', 'qux']",
        ),
        # Scenario 5: Mixed types
        (
            Literal[1, "bar"],
            Literal[True, 3, "foo"],
            "bar",
            "Literal[1, 'bar']",
            "Literal[True, 3, 'foo']",
        ),
    ],
)
def test_chat_adapter_quotes_literals_as_expected(
    input_literal, output_literal, input_value, expected_input_str, expected_output_str
):
    """
    This test verifies that when we declare Literal fields with various mixes
    of single/double quotes, the generated content string includes those
    Literals exactly as we want them to appear (like IPython does).
    """

    class TestSignature(dspy.Signature):
        input_text: input_literal = dspy.InputField()
        output_text: output_literal = dspy.OutputField()

    program = dspy.Predict(TestSignature)

    dspy.configure(lm=dspy.LM(model="openai/gpt-4o"), adapter=dspy.ChatAdapter())

    with mock.patch("litellm.completion") as mock_completion:
        program(input_text=input_value)

    mock_completion.assert_called_once()
    _, call_kwargs = mock_completion.call_args
    content = call_kwargs["messages"][0]["content"]

    assert expected_input_str in content
    assert expected_output_str in content


def test_chat_adapter_sync_call():
    signature = dspy.make_signature("question->answer")
    adapter = dspy.ChatAdapter()
    lm = dspy.utils.DummyLM([{"answer": "Paris"}])
    result = adapter(lm, {}, signature, [], {"question": "What is the capital of France?"})
    assert result == [{"answer": "Paris"}]


@pytest.mark.asyncio
async def test_chat_adapter_async_call():
    signature = dspy.make_signature("question->answer")
    adapter = dspy.ChatAdapter()
    lm = dspy.utils.DummyLM([{"answer": "Paris"}])
    result = await adapter.acall(lm, {}, signature, [], {"question": "What is the capital of France?"})
    assert result == [{"answer": "Paris"}]


def test_chat_adapter_with_pydantic_models():
    """
    This test verifies that ChatAdapter can handle different input and output field types, both basic and nested.
    """

    class DogClass(pydantic.BaseModel):
        dog_breeds: list[str] = pydantic.Field(description="List of the breeds of dogs")
        num_dogs: int = pydantic.Field(description="Number of dogs the owner has", ge=0, le=10)

    class PetOwner(pydantic.BaseModel):
        name: str = pydantic.Field(description="Name of the owner")
        num_pets: int = pydantic.Field(description="Amount of pets the owner has", ge=0, le=100)
        dogs: DogClass = pydantic.Field(description="Nested Pydantic class with dog specific information ")

    class Answer(pydantic.BaseModel):
        result: str
        analysis: str

    class TestSignature(dspy.Signature):
        owner: PetOwner = dspy.InputField()
        question: str = dspy.InputField()
        output: Answer = dspy.OutputField()

    dspy.configure(lm=dspy.LM(model="openai/gpt-4o"), adapter=dspy.ChatAdapter())
    program = dspy.Predict(TestSignature)

    with mock.patch("litellm.completion") as mock_completion:
        program(
            owner=PetOwner(name="John", num_pets=5, dogs=DogClass(dog_breeds=["labrador", "chihuahua"], num_dogs=2)),
            question="How many non-dog pets does John have?",
        )

    mock_completion.assert_called_once()
    _, call_kwargs = mock_completion.call_args

    system_content = call_kwargs["messages"][0]["content"]
    user_content = call_kwargs["messages"][1]["content"]
    assert "1. `owner` (PetOwner)" in system_content
    assert "2. `question` (str)" in system_content
    assert "1. `output` (Answer)" in system_content

    assert "name" in user_content
    assert "num_pets" in user_content
    assert "dogs" in user_content
    assert "dog_breeds" in user_content
    assert "num_dogs" in user_content
    assert "How many non-dog pets does John have?" in user_content


def test_chat_adapter_signature_information():
    """
    This test ensures that the signature information sent to the LM follows an expected format.
    """

    class TestSignature(dspy.Signature):
        input1: str = dspy.InputField(desc="String Input")
        input2: int = dspy.InputField(desc="Integer Input")
        output: str = dspy.OutputField(desc="String Output")

    dspy.configure(lm=dspy.LM(model="openai/gpt-4o"), adapter=dspy.ChatAdapter())
    program = dspy.Predict(TestSignature)

    with mock.patch("litellm.completion") as mock_completion:
        program(input1="Test", input2=11)

    mock_completion.assert_called_once()
    _, call_kwargs = mock_completion.call_args

    assert len(call_kwargs["messages"]) == 2
    assert call_kwargs["messages"][0]["role"] == "system"
    assert call_kwargs["messages"][1]["role"] == "user"

    system_content = call_kwargs["messages"][0]["content"]
    user_content = call_kwargs["messages"][1]["content"]

    assert "1. `input1` (str)" in system_content
    assert "2. `input2` (int)" in system_content
    assert "1. `output` (str)" in system_content
    assert "[[ ## input1 ## ]]\n{input1}" in system_content
    assert "[[ ## input2 ## ]]\n{input2}" in system_content
    assert "[[ ## output ## ]]\n{output}" in system_content
    assert "[[ ## completed ## ]]" in system_content

    assert "[[ ## input1 ## ]]" in user_content
    assert "[[ ## input2 ## ]]" in user_content
    assert "[[ ## output ## ]]" in user_content
    assert "[[ ## completed ## ]]" in user_content


def test_chat_adapter_exception_raised_on_failure():
    """
    This test ensures that on an error, ChatAdapter raises an explicit exception.
    """
    signature = dspy.make_signature("question->answer")
    adapter = dspy.ChatAdapter()
    invalid_completion = "{'output':'mismatched value'}"
    with pytest.raises(dspy.utils.exceptions.AdapterParseError, match="Adapter ChatAdapter failed to parse*"):
        adapter.parse(signature, invalid_completion)


def test_chat_adapter_formats_image():
    # Test basic image formatting
    image = dspy.Image(url="https://example.com/image.jpg")

    class MySignature(dspy.Signature):
        image: dspy.Image = dspy.InputField()
        text: str = dspy.OutputField()

    adapter = dspy.ChatAdapter()
    messages = adapter.format(MySignature, [], {"image": image})

    assert len(messages) == 2
    user_message_content = messages[1]["content"]
    assert user_message_content is not None

    # The message should have 3 chunks of types: text, image_url, text
    assert len(user_message_content) == 3
    assert user_message_content[0]["type"] == "text"
    assert user_message_content[2]["type"] == "text"

    # Assert that the image is formatted correctly
    expected_image_content = {"type": "image_url", "image_url": {"url": "https://example.com/image.jpg"}}
    assert expected_image_content in user_message_content


def test_chat_adapter_formats_image_with_few_shot_examples():
    class MySignature(dspy.Signature):
        image: dspy.Image = dspy.InputField()
        text: str = dspy.OutputField()

    adapter = dspy.ChatAdapter()

    demos = [
        dspy.Example(
            image=dspy.Image(url="https://example.com/image1.jpg"),
            text="This is a test image",
        ),
        dspy.Example(
            image=dspy.Image(url="https://example.com/image2.jpg"),
            text="This is another test image",
        ),
    ]
    messages = adapter.format(MySignature, demos, {"image": dspy.Image(url="https://example.com/image3.jpg")})

    # 1 system message, 2 few shot examples (1 user and assistant message for each example), 1 user message
    assert len(messages) == 6

    assert "[[ ## completed ## ]]\n" in messages[2]["content"]
    assert "[[ ## completed ## ]]\n" in messages[4]["content"]

    assert {"type": "image_url", "image_url": {"url": "https://example.com/image1.jpg"}} in messages[1]["content"]
    assert {"type": "image_url", "image_url": {"url": "https://example.com/image2.jpg"}} in messages[3]["content"]
    assert {"type": "image_url", "image_url": {"url": "https://example.com/image3.jpg"}} in messages[5]["content"]


def test_chat_adapter_formats_image_with_nested_images():
    class ImageWrapper(pydantic.BaseModel):
        images: list[dspy.Image]
        tag: list[str]

    class MySignature(dspy.Signature):
        image: ImageWrapper = dspy.InputField()
        text: str = dspy.OutputField()

    image1 = dspy.Image(url="https://example.com/image1.jpg")
    image2 = dspy.Image(url="https://example.com/image2.jpg")
    image3 = dspy.Image(url="https://example.com/image3.jpg")

    image_wrapper = ImageWrapper(images=[image1, image2, image3], tag=["test", "example"])

    adapter = dspy.ChatAdapter()
    messages = adapter.format(MySignature, [], {"image": image_wrapper})

    expected_image1_content = {"type": "image_url", "image_url": {"url": "https://example.com/image1.jpg"}}
    expected_image2_content = {"type": "image_url", "image_url": {"url": "https://example.com/image2.jpg"}}
    expected_image3_content = {"type": "image_url", "image_url": {"url": "https://example.com/image3.jpg"}}

    assert expected_image1_content in messages[1]["content"]
    assert expected_image2_content in messages[1]["content"]
    assert expected_image3_content in messages[1]["content"]


def test_chat_adapter_formats_image_with_few_shot_examples_with_nested_images():
    class ImageWrapper(pydantic.BaseModel):
        images: list[dspy.Image]
        tag: list[str]

    class MySignature(dspy.Signature):
        image: ImageWrapper = dspy.InputField()
        text: str = dspy.OutputField()

    image1 = dspy.Image(url="https://example.com/image1.jpg")
    image2 = dspy.Image(url="https://example.com/image2.jpg")
    image3 = dspy.Image(url="https://example.com/image3.jpg")

    image_wrapper = ImageWrapper(images=[image1, image2, image3], tag=["test", "example"])
    demos = [
        dspy.Example(
            image=image_wrapper,
            text="This is a test image",
        ),
    ]

    image_wrapper_2 = ImageWrapper(images=[dspy.Image(url="https://example.com/image4.jpg")], tag=["test", "example"])
    adapter = dspy.ChatAdapter()
    messages = adapter.format(MySignature, demos, {"image": image_wrapper_2})

    assert len(messages) == 4

    # Image information in the few-shot example's user message
    expected_image1_content = {"type": "image_url", "image_url": {"url": "https://example.com/image1.jpg"}}
    expected_image2_content = {"type": "image_url", "image_url": {"url": "https://example.com/image2.jpg"}}
    expected_image3_content = {"type": "image_url", "image_url": {"url": "https://example.com/image3.jpg"}}
    assert expected_image1_content in messages[1]["content"]
    assert expected_image2_content in messages[1]["content"]
    assert expected_image3_content in messages[1]["content"]

    # The query image is formatted in the last user message
    assert {"type": "image_url", "image_url": {"url": "https://example.com/image4.jpg"}} in messages[-1]["content"]


def test_chat_adapter_with_tool():
    class MySignature(dspy.Signature):
        """Answer question with the help of the tools"""

        question: str = dspy.InputField()
        tools: list[dspy.Tool] = dspy.InputField()
        answer: str = dspy.OutputField()
        tool_calls: dspy.ToolCalls = dspy.OutputField()

    def get_weather(city: str) -> str:
        """Get the weather for a city"""
        return f"The weather in {city} is sunny"

    def get_population(country: str, year: int) -> str:
        """Get the population for a country"""
        return f"The population of {country} in {year} is 1000000"

    tools = [dspy.Tool(get_weather), dspy.Tool(get_population)]

    adapter = dspy.ChatAdapter()
    messages = adapter.format(MySignature, [], {"question": "What is the weather in Tokyo?", "tools": tools})

    assert len(messages) == 2

    # The `dspy.ToolCalls` type description should be included in the system message
    assert dspy.ToolCalls.description() in messages[0]["content"]

    # The user message should include the question and the tools
    assert "What is the weather in Tokyo?" in messages[1]["content"]
    assert "get_weather" in messages[1]["content"]
    assert "get_population" in messages[1]["content"]

    # Tool arguments format should be included in the user message
    assert "{'city': {'type': 'string'}}" in messages[1]["content"]
    assert "{'country': {'type': 'string'}, 'year': {'type': 'integer'}}" in messages[1]["content"]


def test_chat_adapter_with_code():
    # Test with code as input field
    class CodeAnalysis(dspy.Signature):
        """Analyze the time complexity of the code"""

        code: dspy.Code = dspy.InputField()
        result: str = dspy.OutputField()

    adapter = dspy.ChatAdapter()
    messages = adapter.format(CodeAnalysis, [], {"code": "print('Hello, world!')"})

    assert len(messages) == 2

    # The `dspy.Code` type description should be included in the system message
    assert dspy.Code.description() in messages[0]["content"]

    # The user message should include the code
    assert "print('Hello, world!')" in messages[1]["content"]

    # Test with code as output field
    class CodeGeneration(dspy.Signature):
        """Generate code to answer the question"""

        question: str = dspy.InputField()
        code: dspy.Code = dspy.OutputField()

    adapter = dspy.ChatAdapter()
    with mock.patch("litellm.completion") as mock_completion:
        mock_completion.return_value = ModelResponse(
            choices=[Choices(message=Message(content='[[ ## code ## ]]\nprint("Hello, world!")'))],
            model="openai/gpt-4o-mini",
        )
        result = adapter(
            dspy.LM(model="openai/gpt-4o-mini", cache=False),
            {},
            CodeGeneration,
            [],
            {"question": "Write a python program to print 'Hello, world!'"},
        )
        assert result[0]["code"].code == 'print("Hello, world!")'


def test_chat_adapter_formats_conversation_history():
    class MySignature(dspy.Signature):
        question: str = dspy.InputField()
        history: dspy.History = dspy.InputField()
        answer: str = dspy.OutputField()

    history = dspy.History(
        messages=[
            {"question": "What is the capital of France?", "answer": "Paris"},
            {"question": "What is the capital of Germany?", "answer": "Berlin"},
        ]
    )

    adapter = dspy.ChatAdapter()
    messages = adapter.format(MySignature, [], {"question": "What is the capital of France?", "history": history})

    assert len(messages) == 6
    assert messages[1]["content"] == "[[ ## question ## ]]\nWhat is the capital of France?"
    assert messages[2]["content"] == "[[ ## answer ## ]]\nParis\n\n[[ ## completed ## ]]\n"
    assert messages[3]["content"] == "[[ ## question ## ]]\nWhat is the capital of Germany?"
    assert messages[4]["content"] == "[[ ## answer ## ]]\nBerlin\n\n[[ ## completed ## ]]\n"


def test_chat_adapter_fallback_to_json_adapter_on_exception():
    signature = dspy.make_signature("question->answer")
    adapter = dspy.ChatAdapter()

    with mock.patch("litellm.completion") as mock_completion:
        # Mock returning a response compatible with JSONAdapter but not ChatAdapter
        mock_completion.return_value = ModelResponse(
            choices=[Choices(message=Message(content="{'answer': 'Paris'}"))],
            model="openai/gpt-4o-mini",
        )

        lm = dspy.LM("openai/gpt-4o-mini", cache=False)

        with mock.patch("dspy.adapters.json_adapter.JSONAdapter.__call__") as mock_json_adapter_call:
            adapter(lm, {}, signature, [], {"question": "What is the capital of France?"})
            mock_json_adapter_call.assert_called_once()

        # The parse should succeed
        result = adapter(lm, {}, signature, [], {"question": "What is the capital of France?"})
        assert result == [{"answer": "Paris"}]


@pytest.mark.asyncio
async def test_chat_adapter_fallback_to_json_adapter_on_exception_async():
    signature = dspy.make_signature("question->answer")
    adapter = dspy.ChatAdapter()

    with mock.patch("litellm.acompletion") as mock_completion:
        # Mock returning a response compatible with JSONAdapter but not ChatAdapter
        mock_completion.return_value = ModelResponse(
            choices=[Choices(message=Message(content="{'answer': 'Paris'}"))],
            model="openai/gpt-4o-mini",
        )

        lm = dspy.LM("openai/gpt-4o-mini", cache=False)

        with mock.patch("dspy.adapters.json_adapter.JSONAdapter.acall") as mock_json_adapter_acall:
            await adapter.acall(lm, {}, signature, [], {"question": "What is the capital of France?"})
            mock_json_adapter_acall.assert_called_once()

        # The parse should succeed
        result = await adapter.acall(lm, {}, signature, [], {"question": "What is the capital of France?"})
        assert result == [{"answer": "Paris"}]


def test_chat_adapter_toolcalls_native_function_calling():
    class MySignature(dspy.Signature):
        question: str = dspy.InputField()
        tools: list[dspy.Tool] = dspy.InputField()
        answer: str = dspy.OutputField()
        tool_calls: dspy.ToolCalls = dspy.OutputField()

    def get_weather(city: str) -> str:
        return f"The weather in {city} is sunny"

    tools = [dspy.Tool(get_weather)]

    adapter = dspy.JSONAdapter(use_native_function_calling=True)

    # Case 1: Tool calls are present in the response, while content is None.
    with mock.patch("litellm.completion") as mock_completion:
        mock_completion.return_value = ModelResponse(
            choices=[
                Choices(
                    finish_reason="tool_calls",
                    index=0,
                    message=Message(
                        content=None,
                        role="assistant",
                        tool_calls=[
                            ChatCompletionMessageToolCall(
                                function=Function(arguments='{"city":"Paris"}', name="get_weather"),
                                id="call_pQm8ajtSMxgA0nrzK2ivFmxG",
                                type="function",
                            )
                        ],
                    ),
                ),
            ],
            model="openai/gpt-4o-mini",
        )
        result = adapter(
            dspy.LM(model="openai/gpt-4o-mini", cache=False),
            {},
            MySignature,
            [],
            {"question": "What is the weather in Paris?", "tools": tools},
        )

        assert result[0]["tool_calls"] == dspy.ToolCalls(
            tool_calls=[dspy.ToolCalls.ToolCall(name="get_weather", args={"city": "Paris"})]
        )
        # `answer` is not present, so we set it to None
        assert result[0]["answer"] is None

    # Case 2: Tool calls are not present in the response, while content is present.
    with mock.patch("litellm.completion") as mock_completion:
        mock_completion.return_value = ModelResponse(
            choices=[Choices(message=Message(content="{'answer': 'Paris'}"))],
            model="openai/gpt-4o-mini",
        )
        result = adapter(
            dspy.LM(model="openai/gpt-4o-mini", cache=False),
            {},
            MySignature,
            [],
            {"question": "What is the weather in Paris?", "tools": tools},
        )
        assert result[0]["answer"] == "Paris"
        assert result[0]["tool_calls"] is None


def test_chat_adapter_toolcalls_vague_match():
    class MySignature(dspy.Signature):
        question: str = dspy.InputField()
        tools: list[dspy.Tool] = dspy.InputField()
        tool_calls: dspy.ToolCalls = dspy.OutputField()

    def get_weather(city: str) -> str:
        return f"The weather in {city} is sunny"

    tools = [dspy.Tool(get_weather)]

    adapter = dspy.ChatAdapter()

    with mock.patch("litellm.completion") as mock_completion:
        # Case 1: tool_calls field is a list of dicts
        mock_completion.return_value = ModelResponse(
            choices=[
                Choices(
                    message=Message(
                        content="[[ ## tool_calls ## ]]\n[{'name': 'get_weather', 'args': {'city': 'Paris'}]"
                    )
                )
            ],
            model="openai/gpt-4o-mini",
        )
        result = adapter(
            dspy.LM(model="openai/gpt-4o-mini", cache=False),
            {},
            MySignature,
            [],
            {"question": "What is the weather in Paris?", "tools": tools},
        )
        assert result[0]["tool_calls"] == dspy.ToolCalls(
            tool_calls=[dspy.ToolCalls.ToolCall(name="get_weather", args={"city": "Paris"})]
        )

    with mock.patch("litellm.completion") as mock_completion:
        # Case 2: tool_calls field is a single dict with "name" and "args" keys
        mock_completion.return_value = ModelResponse(
            choices=[
                Choices(
                    message=Message(
                        content="[[ ## tool_calls ## ]]\n{'name': 'get_weather', 'args': {'city': 'Paris'}}"
                    )
                )
            ],
            model="openai/gpt-4o-mini",
        )
        result = adapter(
            dspy.LM(model="openai/gpt-4o-mini", cache=False),
            {},
            MySignature,
            [],
            {"question": "What is the weather in Paris?", "tools": tools},
        )
        assert result[0]["tool_calls"] == dspy.ToolCalls(
            tool_calls=[dspy.ToolCalls.ToolCall(name="get_weather", args={"city": "Paris"})]
        )

```

--------------------------------------------------------------------------------
/docs/docs/tutorials/ai_text_game/index.md:
--------------------------------------------------------------------------------

```markdown
# Building a Creative Text-Based AI Game with DSPy

This tutorial demonstrates how to create an interactive text-based adventure game using DSPy's modular programming approach. You'll build a dynamic game where AI handles narrative generation, character interactions, and adaptive gameplay.

## What You'll Build

An intelligent text-based adventure game featuring:

- Dynamic story generation and branching narratives
- AI-powered character interactions and dialogue
- Adaptive gameplay that responds to player choices
- Inventory and character progression systems
- Save/load game state functionality

## Setup

```bash
pip install dspy rich typer
```

## Step 1: Core Game Framework

```python
import dspy
import json
from typing import Dict
from dataclasses import dataclass, field
from enum import Enum
import random
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
import typer

# Configure DSPy
lm = dspy.LM(model='openai/gpt-4o-mini')
dspy.configure(lm=lm)

console = Console()

class GameState(Enum):
    MENU = "menu"
    PLAYING = "playing"
    INVENTORY = "inventory"
    CHARACTER = "character"
    GAME_OVER = "game_over"

@dataclass
class Player:
    name: str
    health: int = 100
    level: int = 1
    experience: int = 0
    inventory: list[str] = field(default_factory=list)
    skills: dict[str, int] = field(default_factory=lambda: {
        "strength": 10,
        "intelligence": 10,
        "charisma": 10,
        "stealth": 10
    })
    
    def add_item(self, item: str):
        self.inventory.append(item)
        console.print(f"[green]Added {item} to inventory![/green]")
    
    def remove_item(self, item: str) -> bool:
        if item in self.inventory:
            self.inventory.remove(item)
            return True
        return False
    
    def gain_experience(self, amount: int):
        self.experience += amount
        old_level = self.level
        self.level = 1 + (self.experience // 100)
        if self.level > old_level:
            console.print(f"[bold yellow]Level up! You are now level {self.level}![/bold yellow]")

@dataclass
class GameContext:
    current_location: str = "Village Square"
    story_progress: int = 0
    visited_locations: list[str] = field(default_factory=list)
    npcs_met: list[str] = field(default_factory=list)
    completed_quests: list[str] = field(default_factory=list)
    game_flags: dict[str, bool] = field(default_factory=dict)
    
    def add_flag(self, flag: str, value: bool = True):
        self.game_flags[flag] = value
    
    def has_flag(self, flag: str) -> bool:
        return self.game_flags.get(flag, False)

class GameEngine:
    def __init__(self):
        self.player = None
        self.context = GameContext()
        self.state = GameState.MENU
        self.running = True
        
    def save_game(self, filename: str = "savegame.json"):
        """Save current game state."""
        save_data = {
            "player": {
                "name": self.player.name,
                "health": self.player.health,
                "level": self.player.level,
                "experience": self.player.experience,
                "inventory": self.player.inventory,
                "skills": self.player.skills
            },
            "context": {
                "current_location": self.context.current_location,
                "story_progress": self.context.story_progress,
                "visited_locations": self.context.visited_locations,
                "npcs_met": self.context.npcs_met,
                "completed_quests": self.context.completed_quests,
                "game_flags": self.context.game_flags
            }
        }
        
        with open(filename, 'w') as f:
            json.dump(save_data, f, indent=2)
        console.print(f"[green]Game saved to {filename}![/green]")
    
    def load_game(self, filename: str = "savegame.json") -> bool:
        """Load game state from file."""
        try:
            with open(filename, 'r') as f:
                save_data = json.load(f)
            
            # Reconstruct player
            player_data = save_data["player"]
            self.player = Player(
                name=player_data["name"],
                health=player_data["health"],
                level=player_data["level"],
                experience=player_data["experience"],
                inventory=player_data["inventory"],
                skills=player_data["skills"]
            )
            
            # Reconstruct context
            context_data = save_data["context"]
            self.context = GameContext(
                current_location=context_data["current_location"],
                story_progress=context_data["story_progress"],
                visited_locations=context_data["visited_locations"],
                npcs_met=context_data["npcs_met"],
                completed_quests=context_data["completed_quests"],
                game_flags=context_data["game_flags"]
            )
            
            console.print(f"[green]Game loaded from {filename}![/green]")
            return True
            
        except FileNotFoundError:
            console.print(f"[red]Save file {filename} not found![/red]")
            return False
        except Exception as e:
            console.print(f"[red]Error loading game: {e}![/red]")
            return False

# Initialize game engine
game = GameEngine()
```
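
You can sanity-check the persistence layer before adding any AI. The snippet below is a minimal sketch that only uses the classes defined above; the `test_save.json` filename is just an example:

```python
# Round-trip the save/load logic from Step 1.
engine = GameEngine()
engine.player = Player(name="Test Hero", health=80)
engine.player.add_item("Rusty Sword")
engine.context.current_location = "Old Mill"

engine.save_game("test_save.json")

restored = GameEngine()
assert restored.load_game("test_save.json")
assert restored.player.name == "Test Hero"
assert restored.context.current_location == "Old Mill"
```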

## Step 2: AI-Powered Story Generation

```python
class StoryGenerator(dspy.Signature):
    """Generate dynamic story content based on current game state."""
    location: str = dspy.InputField(desc="Current location")
    player_info: str = dspy.InputField(desc="Player information and stats")
    story_progress: int = dspy.InputField(desc="Current story progress level")
    recent_actions: str = dspy.InputField(desc="Player's recent actions")
    
    scene_description: str = dspy.OutputField(desc="Vivid description of current scene")
    available_actions: list[str] = dspy.OutputField(desc="List of possible player actions")
    npcs_present: list[str] = dspy.OutputField(desc="NPCs present in this location")
    items_available: list[str] = dspy.OutputField(desc="Items that can be found or interacted with")

class DialogueGenerator(dspy.Signature):
    """Generate NPC dialogue and responses."""
    npc_name: str = dspy.InputField(desc="Name and type of NPC")
    npc_personality: str = dspy.InputField(desc="NPC personality and background")
    player_input: str = dspy.InputField(desc="What the player said or did")
    context: str = dspy.InputField(desc="Current game context and history")
    
    npc_response: str = dspy.OutputField(desc="NPC's dialogue response")
    mood_change: str = dspy.OutputField(desc="How NPC's mood changed (positive/negative/neutral)")
    quest_offered: bool = dspy.OutputField(desc="Whether NPC offers a quest")
    information_revealed: str = dspy.OutputField(desc="Any important information shared")

class ActionResolver(dspy.Signature):
    """Resolve player actions and determine outcomes."""
    action: str = dspy.InputField(desc="Player's chosen action")
    player_stats: str = dspy.InputField(desc="Player's current stats and skills")
    context: str = dspy.InputField(desc="Current game context")
    difficulty: str = dspy.InputField(desc="Difficulty level of the action")
    
    success: bool = dspy.OutputField(desc="Whether the action succeeded")
    outcome_description: str = dspy.OutputField(desc="Description of what happened")
    stat_changes: dict[str, int] = dspy.OutputField(desc="Changes to player stats")
    items_gained: list[str] = dspy.OutputField(desc="Items gained from this action")
    experience_gained: int = dspy.OutputField(desc="Experience points gained")

class GameAI(dspy.Module):
    """Main AI module for game logic and narrative."""
    
    def __init__(self):
        super().__init__()
        self.story_gen = dspy.ChainOfThought(StoryGenerator)
        self.dialogue_gen = dspy.ChainOfThought(DialogueGenerator)
        self.action_resolver = dspy.ChainOfThought(ActionResolver)
    
    def generate_scene(self, player: Player, context: GameContext, recent_actions: str = "") -> Dict:
        """Generate current scene description and options."""
        
        player_info = f"Level {player.level} {player.name}, Health: {player.health}, Skills: {player.skills}"
        
        scene = self.story_gen(
            location=context.current_location,
            player_info=player_info,
            story_progress=context.story_progress,
            recent_actions=recent_actions
        )
        
        return {
            "description": scene.scene_description,
            "actions": scene.available_actions,
            "npcs": scene.npcs_present,
            "items": scene.items_available
        }
    
    def handle_dialogue(self, npc_name: str, player_input: str, context: GameContext) -> Dict:
        """Handle conversation with NPCs."""
        
        # Create NPC personality based on name and context
        personality_map = {
            "Village Elder": "Wise, knowledgeable, speaks in riddles, has ancient knowledge",
            "Merchant": "Greedy but fair, loves to bargain, knows about valuable items",
            "Guard": "Dutiful, suspicious of strangers, follows rules strictly",
            "Thief": "Sneaky, untrustworthy, has information about hidden things",
            "Wizard": "Mysterious, powerful, speaks about magic and ancient forces"
        }
        
        personality = personality_map.get(npc_name, "Friendly villager with local knowledge")
        game_context = f"Location: {context.current_location}, Story progress: {context.story_progress}"
        
        response = self.dialogue_gen(
            npc_name=npc_name,
            npc_personality=personality,
            player_input=player_input,
            context=game_context
        )
        
        return {
            "response": response.npc_response,
            "mood": response.mood_change,
            "quest": response.quest_offered,
            "info": response.information_revealed
        }
    
    def resolve_action(self, action: str, player: Player, context: GameContext) -> Dict:
        """Resolve player actions and determine outcomes."""
        
        player_stats = f"Level {player.level}, Health {player.health}, Skills: {player.skills}"
        game_context = f"Location: {context.current_location}, Progress: {context.story_progress}"
        
        # Determine difficulty based on action type
        difficulty = "medium"
        if any(word in action.lower() for word in ["fight", "battle", "attack"]):
            difficulty = "hard"
        elif any(word in action.lower() for word in ["look", "examine", "talk"]):
            difficulty = "easy"
        
        result = self.action_resolver(
            action=action,
            player_stats=player_stats,
            context=game_context,
            difficulty=difficulty
        )
        
        return {
            "success": result.success,
            "description": result.outcome_description,
            "stat_changes": result.stat_changes,
            "items": result.items_gained,
            "experience": result.experience_gained
        }

# Initialize AI
ai = GameAI()
```
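
Since `GameAI` is an ordinary DSPy module, you can try it in isolation before building the interface. A rough sketch, assuming the classes from Steps 1 and 2 and the LM configured earlier (actual output will vary by model):

```python
# Preview a scene outside the game loop.
player = Player(name="Aria")
context = GameContext()

scene = ai.generate_scene(player, context, recent_actions="Just arrived in town")
print(scene["description"])
print(scene["actions"])  # suggested player actions
print(scene["npcs"])     # NPCs placed in this location by the story generator
```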

## Step 3: Game Interface and Interaction

```python
def display_game_header():
    """Display the game header."""
    header = Text("🏰 MYSTIC REALM ADVENTURE 🏰", style="bold magenta")
    console.print(Panel(header, style="bright_blue"))

def display_player_status(player: Player):
    """Display player status panel."""
    status = f"""
[bold]Name:[/bold] {player.name}
[bold]Level:[/bold] {player.level} (XP: {player.experience})
[bold]Health:[/bold] {player.health}/100
[bold]Skills:[/bold]
  • Strength: {player.skills['strength']}
  • Intelligence: {player.skills['intelligence']}
  • Charisma: {player.skills['charisma']}
  • Stealth: {player.skills['stealth']}
[bold]Inventory:[/bold] {len(player.inventory)} items
    """
    console.print(Panel(status.strip(), title="Player Status", style="green"))

def display_location(context: GameContext, scene: Dict):
    """Display current location and scene."""
    location_panel = f"""
[bold yellow]{context.current_location}[/bold yellow]

{scene['description']}
    """
    
    if scene['npcs']:
        location_panel += f"\n\n[bold]NPCs present:[/bold] {', '.join(scene['npcs'])}"
    
    if scene['items']:
        location_panel += f"\n[bold]Items visible:[/bold] {', '.join(scene['items'])}"
    
    console.print(Panel(location_panel.strip(), title="Current Location", style="cyan"))

def display_actions(actions: list[str]):
    """Display available actions."""
    action_text = "\n".join([f"{i+1}. {action}" for i, action in enumerate(actions)])
    console.print(Panel(action_text, title="Available Actions", style="yellow"))

def get_player_choice(max_choices: int) -> int:
    """Get player's choice with input validation."""
    while True:
        try:
            choice = typer.prompt("Choose an action (number)")
            choice_num = int(choice)
            if 1 <= choice_num <= max_choices:
                return choice_num - 1
            else:
                console.print(f"[red]Please enter a number between 1 and {max_choices}[/red]")
        except ValueError:
            console.print("[red]Please enter a valid number[/red]")

def show_inventory(player: Player):
    """Display player inventory."""
    if not player.inventory:
        console.print(Panel("Your inventory is empty.", title="Inventory", style="red"))
    else:
        items = "\n".join([f"• {item}" for item in player.inventory])
        console.print(Panel(items, title="Inventory", style="green"))

def main_menu():
    """Display main menu and handle selection."""
    console.clear()
    display_game_header()
    
    menu_options = [
        "1. New Game",
        "2. Load Game", 
        "3. How to Play",
        "4. Exit"
    ]
    
    menu_text = "\n".join(menu_options)
    console.print(Panel(menu_text, title="Main Menu", style="bright_blue"))
    
    choice = typer.prompt("Select an option")
    return choice

def show_help():
    """Display help information."""
    help_text = """
[bold]How to Play:[/bold]

• This is a text-based adventure game powered by AI
• Make choices by selecting numbered options
• Talk to NPCs to learn about the world and get quests
• Explore different locations to find items and adventures
• Your choices affect the story and character development
• Use 'inventory' to check your items
• Use 'status' to see your character info
• Type 'save' to save your progress
• Type 'quit' to return to main menu

[bold]Tips:[/bold]
• Different skills affect your success in various actions
• NPCs remember your previous interactions
• Explore thoroughly - there are hidden secrets!
• Your reputation affects how NPCs treat you
    """
    console.print(Panel(help_text.strip(), title="Game Help", style="blue"))
    typer.prompt("Press Enter to continue")
```
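
The display helpers don't require an LM call, so you can preview the layout with a stubbed scene dictionary. A small sketch with made-up values:

```python
# Preview the Rich panels with hard-coded data instead of AI output.
player = Player(name="Aria")
context = GameContext()

stub_scene = {
    "description": "A quiet square with a mossy fountain.",
    "actions": ["Look around", "Talk to the Merchant"],
    "npcs": ["Merchant"],
    "items": ["Herbs"],
}

display_game_header()
display_player_status(player)
display_location(context, stub_scene)
display_actions(stub_scene["actions"])
```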

## Step 4: Main Game Loop

```python
def create_new_character():
    """Create a new player character."""
    console.clear()
    display_game_header()
    
    name = typer.prompt("Enter your character's name")
    
    # Character creation with skill point allocation
    console.print("\n[bold]Character Creation[/bold]")
    console.print("You have 10 extra skill points to distribute among your skills.")
    console.print("Base skills start at 10 each.\n")
    
    skills = {"strength": 10, "intelligence": 10, "charisma": 10, "stealth": 10}
    points_remaining = 10
    
    for skill in skills.keys():
        if points_remaining > 0:
            console.print(f"Points remaining: {points_remaining}")
            while True:
                try:
                    points = int(typer.prompt(f"Points to add to {skill} (0-{points_remaining})"))
                    if 0 <= points <= points_remaining:
                        skills[skill] += points
                        points_remaining -= points
                        break
                    else:
                        console.print(f"[red]Enter a number between 0 and {points_remaining}[/red]")
                except ValueError:
                    console.print("[red]Please enter a valid number[/red]")
    
    player = Player(name=name, skills=skills)
    console.print(f"\n[green]Welcome to Mystic Realm, {name}![/green]")
    return player

def game_loop():
    """Main game loop."""
    recent_actions = ""
    
    while game.running and game.state == GameState.PLAYING:
        console.clear()
        display_game_header()
        
        # Generate current scene
        scene = ai.generate_scene(game.player, game.context, recent_actions)
        
        # Display game state
        display_player_status(game.player)
        display_location(game.context, scene)
        
        # Add standard actions
        all_actions = scene['actions'] + ["Check inventory", "Character status", "Save game", "Quit to menu"]
        display_actions(all_actions)
        
        # Get player choice
        choice_idx = get_player_choice(len(all_actions))
        chosen_action = all_actions[choice_idx]
        
        # Handle special commands
        if chosen_action == "Check inventory":
            show_inventory(game.player)
            typer.prompt("Press Enter to continue")
            continue
        elif chosen_action == "Character status":
            display_player_status(game.player)
            typer.prompt("Press Enter to continue")
            continue
        elif chosen_action == "Save game":
            game.save_game()
            typer.prompt("Press Enter to continue")
            continue
        elif chosen_action == "Quit to menu":
            game.state = GameState.MENU
            break
        
        # Handle game actions
        if chosen_action in scene['actions']:
            # Check if it's dialogue with an NPC
            npc_target = None
            for npc in scene['npcs']:
                if npc.lower() in chosen_action.lower():
                    npc_target = npc
                    break
            
            if npc_target:
                # Handle NPC interaction
                console.print(f"\n[bold]Talking to {npc_target}...[/bold]")
                dialogue = ai.handle_dialogue(npc_target, chosen_action, game.context)
                
                console.print(f"\n[italic]{npc_target}:[/italic] \"{dialogue['response']}\"")
                
                if dialogue['quest']:
                    console.print(f"[yellow]💼 Quest opportunity detected![/yellow]")
                
                if dialogue['info']:
                    console.print(f"[blue]ℹ️  {dialogue['info']}[/blue]")
                    
                # Add NPC to met list
                if npc_target not in game.context.npcs_met:
                    game.context.npcs_met.append(npc_target)
                
                recent_actions = f"Talked to {npc_target}: {chosen_action}"
            else:
                # Handle general action
                result = ai.resolve_action(chosen_action, game.player, game.context)
                
                console.print(f"\n{result['description']}")
                
                # Apply results
                if result['success']:
                    console.print("[green]✅ Success![/green]")
                    
                    # Apply stat changes
                    for stat, change in result['stat_changes'].items():
                        if stat in game.player.skills:
                            game.player.skills[stat] += change
                            if change > 0:
                                console.print(f"[green]{stat.title()} increased by {change}![/green]")
                        elif stat == "health":
                            game.player.health = max(0, min(100, game.player.health + change))
                            if change > 0:
                                console.print(f"[green]Health restored by {change}![/green]")
                            elif change < 0:
                                console.print(f"[red]Health decreased by {abs(change)}![/red]")
                    
                    # Add items
                    for item in result['items']:
                        game.player.add_item(item)
                    
                    # Give experience
                    if result['experience'] > 0:
                        game.player.gain_experience(result['experience'])
                    
                    # Update story progress
                    game.context.story_progress += 1
                else:
                    console.print("[red]❌ The action didn't go as planned...[/red]")
                
                recent_actions = f"Attempted: {chosen_action}"
            
            # Check for game over conditions
            if game.player.health <= 0:
                console.print("\n[bold red]💀 You have died! Game Over![/bold red]")
                game.state = GameState.GAME_OVER
                break
            
            typer.prompt("\nPress Enter to continue")

def main():
    """Main game function."""
    while game.running:
        if game.state == GameState.MENU:
            choice = main_menu()
            
            if choice == "1":
                game.player = create_new_character()
                game.context = GameContext()
                game.state = GameState.PLAYING
                console.print("\n[italic]Your adventure begins...[/italic]")
                typer.prompt("Press Enter to start")
                
            elif choice == "2":
                if game.load_game():
                    game.state = GameState.PLAYING
                typer.prompt("Press Enter to continue")
                
            elif choice == "3":
                show_help()
                
            elif choice == "4":
                game.running = False
                console.print("[bold]Thanks for playing! Goodbye![/bold]")
            
        elif game.state == GameState.PLAYING:
            game_loop()
            
        elif game.state == GameState.GAME_OVER:
            console.print("\n[bold]Game Over[/bold]")
            restart = typer.confirm("Would you like to return to the main menu?")
            if restart:
                game.state = GameState.MENU
            else:
                game.running = False

if __name__ == "__main__":
    main()
```

## Example Gameplay

When you run the game, you'll experience:

**Character Creation:**
```
🏰 MYSTIC REALM ADVENTURE 🏰

Enter your character's name: Aria

Character Creation
You have 10 extra skill points to distribute among your skills.
Base skills start at 10 each.

Points remaining: 10
Points to add to strength (0-10): 2
Points to add to intelligence (0-8): 4
Points to add to charisma (0-4): 3
Points to add to stealth (0-1): 1

Welcome to Mystic Realm, Aria!
```

**Dynamic Scene Generation:**
```
┌──────────── Current Location ────────────┐
│ Village Square                           │
│                                          │
│ You stand in the bustling heart of       │
│ Willowbrook Village. The ancient stone   │
│ fountain bubbles cheerfully as merchants │
│ hawk their wares and children play. A    │
│ mysterious hooded figure lurks near the  │
│ shadows of the old oak tree.             │
│                                          │
│ NPCs present: Village Elder, Merchant    │
│ Items visible: Strange Medallion, Herbs  │
└──────────────────────────────────────────┘

┌────────── Available Actions ─────────────┐
│ 1. Approach the hooded figure            │
│ 2. Talk to the Village Elder             │
│ 3. Browse the merchant's wares           │
│ 4. Examine the strange medallion         │
│ 5. Gather herbs near the fountain        │
│ 6. Head to the forest path               │
└───────────────────────────────────────────┘
```

**AI-Generated Dialogue:**
```
Talking to Village Elder...

Village Elder: "Ah, young traveler, I sense a great destiny 
surrounds you like morning mist. The ancient prophecy speaks 
of one who would come bearing the mark of courage. Tell me, 
have you noticed anything... unusual in your travels?"

💼 Quest opportunity detected!
ℹ️ The Village Elder knows about an ancient prophecy that might involve you
```

## Next Steps

- **Combat System**: Add turn-based battles with strategy (see the sketch after this list)
- **Magic System**: Spellcasting with resource management
- **Multiplayer**: Network support for cooperative adventures
- **Quest System**: Complex multi-step missions with branching outcomes
- **World Building**: Procedurally generated locations and characters
- **Audio**: Add sound effects and background music
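
For instance, a combat system can start out as one more signature plugged into `GameAI`. The sketch below is a possible starting point rather than part of the tutorial code above; the field names are illustrative:

```python
import dspy

class CombatResolver(dspy.Signature):
    """Resolve one round of turn-based combat."""

    player_stats: str = dspy.InputField(desc="Player's level, health, and skills")
    enemy_description: str = dspy.InputField(desc="The enemy and its apparent strength")
    chosen_move: str = dspy.InputField(desc="The player's chosen combat move")

    narration: str = dspy.OutputField(desc="Description of the combat round")
    player_damage_taken: int = dspy.OutputField(desc="Damage dealt to the player this round")
    enemy_defeated: bool = dspy.OutputField(desc="Whether the enemy was defeated")

# Wired in like the other modules, e.g. inside GameAI.__init__:
#     self.combat_resolver = dspy.ChainOfThought(CombatResolver)
```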

This tutorial demonstrates how DSPy's modular approach enables complex, interactive systems where AI handles creative content generation while maintaining consistent game logic and player agency.

```

--------------------------------------------------------------------------------
/docs/docs/index.md:
--------------------------------------------------------------------------------

```markdown
---
sidebar_position: 1
hide:
  - navigation
  - toc

---

![DSPy](static/img/dspy_logo.png){ width="200", align=left }

# _Programming_—not prompting—_LMs_

[![PyPI Downloads](https://static.pepy.tech/badge/dspy/month)](https://pepy.tech/projects/dspy)

DSPy is a declarative framework for building modular AI software. It allows you to **iterate fast on structured code**, rather than brittle strings, and offers algorithms that **compile AI programs into effective prompts and weights** for your language models, whether you're building simple classifiers, sophisticated RAG pipelines, or Agent loops.

Instead of wrangling prompts or training jobs, DSPy (Declarative Self-improving Python) enables you to **build AI software from natural-language modules** and to _generically compose them_ with different models, inference strategies, or learning algorithms. This makes AI software **more reliable, maintainable, and portable** across models and strategies.

*tl;dr* Think of DSPy as a higher-level language for AI programming ([lecture](https://www.youtube.com/watch?v=JEMYuzrKLUw)), like the shift from assembly to C or pointer arithmetic to SQL. Meet the community, seek help, or start contributing via [GitHub](https://github.com/stanfordnlp/dspy) and [Discord](https://discord.gg/XCGy2WDCQB).

<!-- Its abstractions make your AI software more reliable and maintainable, and allow it to become more portable as new models and learning techniques emerge. It's also just rather elegant! -->

!!! info "Getting Started I: Install DSPy and set up your LM"

    ```bash
    > pip install -U dspy
    ```

    === "OpenAI"
        You can authenticate by setting the `OPENAI_API_KEY` env variable or passing `api_key` below.

        ```python linenums="1"
        import dspy
        lm = dspy.LM("openai/gpt-4o-mini", api_key="YOUR_OPENAI_API_KEY")
        dspy.configure(lm=lm)
        ```

    === "Anthropic"
        You can authenticate by setting the `ANTHROPIC_API_KEY` env variable or passing `api_key` below.

        ```python linenums="1"
        import dspy
        lm = dspy.LM("anthropic/claude-3-opus-20240229", api_key="YOUR_ANTHROPIC_API_KEY")
        dspy.configure(lm=lm)
        ```

    === "Databricks"
        If you're on the Databricks platform, authentication is automatic via their SDK. If not, you can set the env variables `DATABRICKS_API_KEY` and `DATABRICKS_API_BASE`, or pass `api_key` and `api_base` below.

        ```python linenums="1"
        import dspy
        lm = dspy.LM(
            "databricks/databricks-llama-4-maverick",
            api_key="YOUR_DATABRICKS_ACCESS_TOKEN",
            api_base="YOUR_DATABRICKS_WORKSPACE_URL",  # e.g.: https://dbc-64bf4923-e39e.cloud.databricks.com/serving-endpoints
        )
        dspy.configure(lm=lm)
        ```

    === "Gemini"
        You can authenticate by setting the `GEMINI_API_KEY` env variable or passing `api_key` below.

        ```python linenums="1"
        import dspy
        lm = dspy.LM("gemini/gemini-2.5-flash", api_key="YOUR_GEMINI_API_KEY")
        dspy.configure(lm=lm)
        ```

    === "Local LMs on your laptop"
          First, install [Ollama](https://github.com/ollama/ollama) and launch its server with your LM.

          ```bash
          > curl -fsSL https://ollama.ai/install.sh | sh
          > ollama run llama3.2:1b
          ```

          Then, connect to it from your DSPy code.

        ```python linenums="1"
        import dspy
        lm = dspy.LM("ollama_chat/llama3.2:1b", api_base="http://localhost:11434", api_key="")
        dspy.configure(lm=lm)
        ```

    === "Local LMs on a GPU server"
          First, install [SGLang](https://docs.sglang.ai/get_started/install.html) and launch its server with your LM.

          ```bash
          > pip install "sglang[all]"
          > pip install flashinfer -i https://flashinfer.ai/whl/cu121/torch2.4/ 

          > CUDA_VISIBLE_DEVICES=0 python -m sglang.launch_server --port 7501 --model-path meta-llama/Llama-3.1-8B-Instruct
          ```
        
        If you don't have access from Meta to download `meta-llama/Llama-3.1-8B-Instruct`, you can use `Qwen/Qwen2.5-7B-Instruct`, for example.

        Next, connect to your local LM from your DSPy code as an `OpenAI`-compatible endpoint.

          ```python linenums="1"
          lm = dspy.LM("openai/meta-llama/Llama-3.1-8B-Instruct",
                       api_base="http://localhost:7501/v1",  # ensure this points to your port
                       api_key="local", model_type="chat")
          dspy.configure(lm=lm)
          ```

    === "Other providers"
        In DSPy, you can use any of the dozens of [LLM providers supported by LiteLLM](https://docs.litellm.ai/docs/providers). Simply follow their instructions for which `{PROVIDER}_API_KEY` to set and how to pass `{provider_name}/{model_name}` to the constructor.

        Some examples:

        - `anyscale/mistralai/Mistral-7B-Instruct-v0.1`, with `ANYSCALE_API_KEY`
        - `together_ai/togethercomputer/llama-2-70b-chat`, with `TOGETHERAI_API_KEY`
        - `sagemaker/<your-endpoint-name>`, with `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `AWS_REGION_NAME`
        - `azure/<your_deployment_name>`, with `AZURE_API_KEY`, `AZURE_API_BASE`, `AZURE_API_VERSION`, and the optional `AZURE_AD_TOKEN` and `AZURE_API_TYPE`

        
        If your provider offers an OpenAI-compatible endpoint, just add an `openai/` prefix to your full model name.

        ```python linenums="1"
        import dspy
        lm = dspy.LM("openai/your-model-name", api_key="PROVIDER_API_KEY", api_base="YOUR_PROVIDER_URL")
        dspy.configure(lm=lm)
        ```

??? "Calling the LM directly."

     Idiomatic DSPy involves using _modules_, which we define in the rest of this page. However, it's still easy to call the `lm` you configured above directly. This gives you a unified API and lets you benefit from utilities like automatic caching.

     ```python linenums="1"       
     lm("Say this is a test!", temperature=0.7)  # => ['This is a test!']
     lm(messages=[{"role": "user", "content": "Say this is a test!"}])  # => ['This is a test!']
     ``` 


## 1) **Modules** help you describe AI behavior as _code_, not strings.

To build reliable AI systems, you must iterate fast. But maintaining prompts makes that hard: it forces you to tinker with strings or data _every time you change your LM, metrics, or pipeline_. Having built over a dozen best-in-class compound LM systems since 2020, we learned this the hard way—and so built DSPy to decouple AI system design from messy incidental choices about specific LMs or prompting strategies.

DSPy shifts your focus from tinkering with prompt strings to **programming with structured and declarative natural-language modules**. For every AI component in your system, you specify input/output behavior as a _signature_ and select a _module_ to assign a strategy for invoking your LM. DSPy expands your signatures into prompts and parses your typed outputs, so you can compose different modules together into ergonomic, portable, and optimizable AI systems.
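
To see exactly what prompt DSPy builds from a signature, you can inspect the most recent LM call. A quick sketch, assuming you've already configured an `lm` as in Getting Started I:

```python linenums="1"
import dspy

qa = dspy.Predict("question -> answer")
qa(question="What is the capital of France?")

dspy.inspect_history(n=1)  # prints the prompt built from the signature and the parsed response
```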


!!! info "Getting Started II: Build DSPy modules for various tasks"
    Try the examples below after configuring your `lm` above. Adjust the fields to explore what tasks your LM can do well out of the box. Each tab below sets up a DSPy module, like `dspy.Predict`, `dspy.ChainOfThought`, or `dspy.ReAct`, with a task-specific _signature_. For example, `question -> answer: float` tells the module to take a question and to produce a `float` answer.

    === "Math"

        ```python linenums="1"
        math = dspy.ChainOfThought("question -> answer: float")
        math(question="Two dice are tossed. What is the probability that the sum equals two?")
        ```
        
        **Possible Output:**
        ```text
        Prediction(
            reasoning='When two dice are tossed, each die has 6 faces, resulting in a total of 6 x 6 = 36 possible outcomes. The sum of the numbers on the two dice equals two only when both dice show a 1. This is just one specific outcome: (1, 1). Therefore, there is only 1 favorable outcome. The probability of the sum being two is the number of favorable outcomes divided by the total number of possible outcomes, which is 1/36.',
            answer=0.0277776
        )
        ```

    === "RAG"

        ```python linenums="1"       
        def search_wikipedia(query: str) -> list[str]:
            results = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts")(query, k=3)
            return [x["text"] for x in results]
        
        rag = dspy.ChainOfThought("context, question -> response")

        question = "What's the name of the castle that David Gregory inherited?"
        rag(context=search_wikipedia(question), question=question)
        ```
        
        **Possible Output:**
        ```text
        Prediction(
            reasoning='The context provides information about David Gregory, a Scottish physician and inventor. It specifically mentions that he inherited Kinnairdy Castle in 1664. This detail directly answers the question about the name of the castle that David Gregory inherited.',
            response='Kinnairdy Castle'
        )
        ```

    === "Classification"

        ```python linenums="1"
        from typing import Literal

        class Classify(dspy.Signature):
            """Classify sentiment of a given sentence."""
            
            sentence: str = dspy.InputField()
            sentiment: Literal["positive", "negative", "neutral"] = dspy.OutputField()
            confidence: float = dspy.OutputField()

        classify = dspy.Predict(Classify)
        classify(sentence="This book was super fun to read, though not the last chapter.")
        ```
        
        **Possible Output:**

        ```text
        Prediction(
            sentiment='positive',
            confidence=0.75
        )
        ```

    === "Information Extraction"

        ```python linenums="1"        
        class ExtractInfo(dspy.Signature):
            """Extract structured information from text."""
            
            text: str = dspy.InputField()
            title: str = dspy.OutputField()
            headings: list[str] = dspy.OutputField()
            entities: list[dict[str, str]] = dspy.OutputField(desc="a list of entities and their metadata")
        
        module = dspy.Predict(ExtractInfo)

        text = "Apple Inc. announced its latest iPhone 14 today." \
            "The CEO, Tim Cook, highlighted its new features in a press release."
        response = module(text=text)

        print(response.title)
        print(response.headings)
        print(response.entities)
        ```
        
        **Possible Output:**
        ```text
        Apple Inc. Announces iPhone 14
        ['Introduction', "CEO's Statement", 'New Features']
        [{'name': 'Apple Inc.', 'type': 'Organization'}, {'name': 'iPhone 14', 'type': 'Product'}, {'name': 'Tim Cook', 'type': 'Person'}]
        ```

    === "Agents"

        ```python linenums="1"       
        def evaluate_math(expression: str):
            return dspy.PythonInterpreter({}).execute(expression)

        def search_wikipedia(query: str):
            results = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts")(query, k=3)
            return [x["text"] for x in results]

        react = dspy.ReAct("question -> answer: float", tools=[evaluate_math, search_wikipedia])

        pred = react(question="What is 9362158 divided by the year of birth of David Gregory of Kinnairdy castle?")
        print(pred.answer)
        ```
        
        **Possible Output:**

        ```text
        5761.328
        ```
    
    === "Multi-Stage Pipelines"

        ```python linenums="1"       
        class Outline(dspy.Signature):
            """Outline a thorough overview of a topic."""
            
            topic: str = dspy.InputField()
            title: str = dspy.OutputField()
            sections: list[str] = dspy.OutputField()
            section_subheadings: dict[str, list[str]] = dspy.OutputField(desc="mapping from section headings to subheadings")

        class DraftSection(dspy.Signature):
            """Draft a top-level section of an article."""
            
            topic: str = dspy.InputField()
            section_heading: str = dspy.InputField()
            section_subheadings: list[str] = dspy.InputField()
            content: str = dspy.OutputField(desc="markdown-formatted section")

        class DraftArticle(dspy.Module):
            def __init__(self):
                self.build_outline = dspy.ChainOfThought(Outline)
                self.draft_section = dspy.ChainOfThought(DraftSection)

            def forward(self, topic):
                outline = self.build_outline(topic=topic)
                sections = []
                for heading, subheadings in outline.section_subheadings.items():
                    section, subheadings = f"## {heading}", [f"### {subheading}" for subheading in subheadings]
                    section = self.draft_section(topic=outline.title, section_heading=section, section_subheadings=subheadings)
                    sections.append(section.content)
                return dspy.Prediction(title=outline.title, sections=sections)

        draft_article = DraftArticle()
        article = draft_article(topic="World Cup 2002")
        ```
        
        **Possible Output:**

        A 1500-word article on the topic, e.g.

        ```text
        ## Qualification Process

        The qualification process for the 2002 FIFA World Cup involved a series of..... [shortened here for presentation].

        ### UEFA Qualifiers

        The UEFA qualifiers involved 50 teams competing for 13..... [shortened here for presentation].

        .... [rest of the article]
        ```

        Note that DSPy makes it straightforward to optimize multi-stage modules like this. As long as you can evaluate the _final_ output of the system, every DSPy optimizer can tune all of the intermediate modules.

??? "Using DSPy in practice: from quick scripting to building sophisticated systems."

    Standard prompts conflate interface ("what should the LM do?") with implementation ("how do we tell it to do that?"). DSPy isolates the former as _signatures_ so we can infer the latter or learn it from data — in the context of a bigger program.
    
    Even before you start using optimizers, DSPy's modules allow you to script effective LM systems as ergonomic, portable _code_. Across many tasks and LMs, we maintain _signature test suites_ that assess the reliability of the built-in DSPy adapters. Adapters are the components that map signatures to prompts prior to optimization. If you find a task where a simple prompt consistently outperforms idiomatic DSPy for your LM, consider that a bug and [file an issue](https://github.com/stanfordnlp/dspy/issues). We'll use this to improve the built-in adapters.


## 2) **Optimizers** tune the prompts and weights of your AI modules.

DSPy provides you with the tools to compile high-level code with natural language annotations into the low-level computations, prompts, or weight updates that align your LM with your program's structure and metrics. If you change your code or your metrics, you can simply re-compile accordingly.

Given a few tens or hundreds of representative _inputs_ of your task and a _metric_ that can measure the quality of your system's outputs, you can use a DSPy optimizer. Different optimizers in DSPy work by **synthesizing good few-shot examples** for every module, like `dspy.BootstrapRS`,<sup>[1](https://arxiv.org/abs/2310.03714)</sup> **proposing and intelligently exploring better natural-language instructions** for every prompt, like [`dspy.GEPA`](https://dspy.ai/tutorials/gepa_ai_program/)<sup>[2](https://arxiv.org/abs/2507.19457)</sup>, `dspy.MIPROv2`,<sup>[3](https://arxiv.org/abs/2406.11695)</sup> and **building datasets for your modules and using them to finetune the LM weights** in your system, like `dspy.BootstrapFinetune`.<sup>[4](https://arxiv.org/abs/2407.10930)</sup> For detailed tutorials on running `dspy.GEPA`, please take a look at [dspy.GEPA tutorials](https://dspy.ai/tutorials/gepa_ai_program/).


!!! info "Getting Started III: Optimizing the LM prompts or weights in DSPy programs"
    A typical simple optimization run costs on the order of $2 USD and takes around 20 minutes, but be careful when running optimizers with very large LMs or very large datasets. Depending on your LM, dataset, and configuration, a run can cost as little as a few cents or up to tens of dollars.

    The examples below rely on the Hugging Face `datasets` library, which you can install with the command below.

    ```bash
    > pip install -U datasets
    ```

    === "Optimizing prompts for a ReAct agent"
        This is a minimal but fully runnable example of setting up a `dspy.ReAct` agent that answers questions via
        search from Wikipedia and then optimizing it using `dspy.MIPROv2` in the cheap `light` mode on 500
        question-answer pairs sampled from the `HotPotQA` dataset.

        ```python linenums="1"
        import dspy
        from dspy.datasets import HotPotQA

        dspy.configure(lm=dspy.LM("openai/gpt-4o-mini"))

        def search_wikipedia(query: str) -> list[str]:
            results = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts")(query, k=3)
            return [x["text"] for x in results]

        trainset = [x.with_inputs('question') for x in HotPotQA(train_seed=2024, train_size=500).train]
        react = dspy.ReAct("question -> answer", tools=[search_wikipedia])

        tp = dspy.MIPROv2(metric=dspy.evaluate.answer_exact_match, auto="light", num_threads=24)
        optimized_react = tp.compile(react, trainset=trainset)
        ```

        An informal run like this raises ReAct's score from 24% to 51%, by teaching `gpt-4o-mini` more about the specifics of the task.

    === "Optimizing prompts for RAG"
        Given a retrieval index to `search`, your favorite `dspy.LM`, and a small `trainset` of questions and ground-truth responses, the following code snippet can optimize your RAG system with long outputs against the built-in `SemanticF1` metric, which is implemented as a DSPy module.

        ```python linenums="1"
        class RAG(dspy.Module):
            def __init__(self, num_docs=5):
                self.num_docs = num_docs
                self.respond = dspy.ChainOfThought("context, question -> response")

            def forward(self, question):
                context = search(question, k=self.num_docs)   # see the linked tutorial, or the stand-in sketch below
                return self.respond(context=context, question=question)

        tp = dspy.MIPROv2(metric=dspy.evaluate.SemanticF1(decompositional=True), auto="medium", num_threads=24)
        optimized_rag = tp.compile(RAG(), trainset=trainset, max_bootstrapped_demos=2, max_labeled_demos=2)
        ```
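
        The snippet above leaves `search` to the linked tutorial. If you want something runnable right away, a minimal stand-in is to query the same hosted ColBERTv2 Wikipedia index used in the ReAct tab; in practice, swap this for your own retrieval index:

        ```python linenums="1"
        def search(query: str, k: int = 5) -> list[str]:
            # Stand-in retriever: fetch k Wikipedia abstracts from the hosted ColBERTv2 index.
            results = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts")(query, k=k)
            return [x["text"] for x in results]
        ```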

        For a complete RAG example that you can run, start this [tutorial](tutorials/rag/index.ipynb). It improves the quality of a RAG system over a subset of StackExchange communities by 10% relative gain.

    === "Optimizing weights for Classification"
        <details><summary>Click to show dataset setup code.</summary>

        ```python linenums="1"
        import random
        from typing import Literal

        from datasets import load_dataset

        import dspy
        from dspy.datasets import DataLoader

        # Load the Banking77 dataset.
        CLASSES = load_dataset("PolyAI/banking77", split="train", trust_remote_code=True).features["label"].names
        kwargs = {"fields": ("text", "label"), "input_keys": ("text",), "split": "train", "trust_remote_code": True}

        # Load the first 2000 examples from the dataset, and assign a hint to each *training* example.
        trainset = [
            dspy.Example(x, hint=CLASSES[x.label], label=CLASSES[x.label]).with_inputs("text", "hint")
            for x in DataLoader().from_huggingface(dataset_name="PolyAI/banking77", **kwargs)[:2000]
        ]
        random.Random(0).shuffle(trainset)
        ```
        </details>

        ```python linenums="1"
        import dspy
        lm = dspy.LM('openai/gpt-4o-mini-2024-07-18')

        # Define the DSPy module for classification. It will use the hint at training time, if available.
        signature = dspy.Signature("text, hint -> label").with_updated_fields("label", type_=Literal[tuple(CLASSES)])
        classify = dspy.ChainOfThought(signature)
        classify.set_lm(lm)

        # Optimize via BootstrapFinetune.
        optimizer = dspy.BootstrapFinetune(metric=(lambda x, y, trace=None: x.label == y.label), num_threads=24)
        optimized = optimizer.compile(classify, trainset=trainset)

        optimized(text="What does a pending cash withdrawal mean?")
        
        # For a complete fine-tuning tutorial, see: https://dspy.ai/tutorials/classification_finetuning/
        ```

        **Possible Output (from the last line):**
        ```text
        Prediction(
            reasoning='A pending cash withdrawal indicates that a request to withdraw cash has been initiated but has not yet been completed or processed. This status means that the transaction is still in progress and the funds have not yet been deducted from the account or made available to the user.',
            label='pending_cash_withdrawal'
        )
        ```

        An informal run similar to this on DSPy 2.5.29 raises GPT-4o-mini's score from 66% to 87%.


??? "What's an example of a DSPy optimizer? How do different optimizers work?"

    Take the `dspy.MIPROv2` optimizer as an example. First, MIPRO starts with the **bootstrapping stage**. It takes your program, which may be unoptimized at this point, and runs it many times across different inputs to collect traces of input/output behavior for each one of your modules. It filters these traces to keep only those that appear in trajectories scored highly by your metric. Second, MIPRO enters its **grounded proposal stage**. It previews your DSPy program's code, your data, and traces from running your program, and uses them to draft many potential instructions for every prompt in your program. Third, MIPRO launches the **discrete search stage**. It samples mini-batches from your training set, proposes a combination of instructions and traces to use for constructing every prompt in the pipeline, and evaluates the candidate program on the mini-batch. Using the resulting score, MIPRO updates a surrogate model that helps the proposals get better over time.

    One thing that makes DSPy optimizers so powerful is that they can be composed. You can run `dspy.MIPROv2` and use the produced program as an input to `dspy.MIPROv2` again or, say, to `dspy.BootstrapFinetune` to get better results. This is partly the essence of `dspy.BetterTogether`. Alternatively, you can run the optimizer and then extract the top-5 candidate programs and build a `dspy.Ensemble` of them. This allows you to scale _inference-time compute_ (e.g., ensembles) as well as DSPy's unique _pre-inference time compute_ (i.e., optimization budget) in highly systematic ways.
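
    As a rough sketch of such a composition (assuming you already have a `program`, a `trainset`, and a `metric` defined; the exact settings below are illustrative):

    ```python linenums="1"
    # Prompt-optimize first, then use the result as the starting point for weight tuning.
    prompt_optimized = dspy.MIPROv2(metric=metric, auto="medium", num_threads=24).compile(program, trainset=trainset)
    weight_tuned = dspy.BootstrapFinetune(metric=metric, num_threads=24).compile(prompt_optimized, trainset=trainset)

    # Or combine several optimized candidates into an ensemble whose outputs are reduced by majority vote.
    ensembled = dspy.Ensemble(reduce_fn=dspy.majority).compile([prompt_optimized, weight_tuned])
    ```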



<!-- Future:
BootstrapRS or MIPRO on ??? with a local SGLang LM
BootstrapFS on MATH with a tiny LM like Llama-3.2 with Ollama (maybe with a big teacher) -->



## 3) **DSPy's Ecosystem** advances open-source AI research.

Compared to monolithic LMs, DSPy's modular paradigm enables a large community to improve the compositional architectures, inference-time strategies, and optimizers for LM programs in an open, distributed way. This gives DSPy users more control, helps them iterate much faster, and allows their programs to get better over time by applying the latest optimizers or modules.

The DSPy research effort started at Stanford NLP in Feb 2022, building on what we had learned from developing early [compound LM systems](https://bair.berkeley.edu/blog/2024/02/18/compound-ai-systems/) like [ColBERT-QA](https://arxiv.org/abs/2007.00814), [Baleen](https://arxiv.org/abs/2101.00436), and [Hindsight](https://arxiv.org/abs/2110.07752). The first version was released as [DSP](https://arxiv.org/abs/2212.14024) in Dec 2022 and evolved by Oct 2023 into [DSPy](https://arxiv.org/abs/2310.03714). Thanks to [250 contributors](https://github.com/stanfordnlp/dspy/graphs/contributors), DSPy has introduced tens of thousands of people to building and optimizing modular LM programs.

Since then, DSPy's community has produced a large body of work on optimizers, like [MIPROv2](https://arxiv.org/abs/2406.11695), [BetterTogether](https://arxiv.org/abs/2407.10930), and [LeReT](https://arxiv.org/abs/2410.23214), on program architectures, like [STORM](https://arxiv.org/abs/2402.14207), [IReRa](https://arxiv.org/abs/2401.12178), and [DSPy Assertions](https://arxiv.org/abs/2312.13382), and on successful applications to new problems, like [PAPILLON](https://arxiv.org/abs/2410.17127), [PATH](https://arxiv.org/abs/2406.11706), [WangLab@MEDIQA](https://arxiv.org/abs/2404.14544), [UMD's Prompting Case Study](https://arxiv.org/abs/2406.06608), and [Haize's Red-Teaming Program](https://blog.haizelabs.com/posts/dspy/), in addition to many open-source projects, production applications, and other [use cases](community/use-cases.md).

```

--------------------------------------------------------------------------------
/docs/docs/tutorials/sample_code_generation/index.md:
--------------------------------------------------------------------------------

```markdown
# Automated Code Generation from Documentation with DSPy

This tutorial demonstrates how to use DSPy to automatically fetch documentation from URLs and generate working code examples for any library. The system can analyze documentation websites, extract key concepts, and produce tailored code examples.

## What You'll Build

A documentation-powered code generation system that:

- Fetches and parses documentation from multiple URLs
- Extracts API patterns, methods, and usage examples  
- Generates working code for specific use cases
- Provides explanations and best practices
- Works with any library's documentation

## Setup

```bash
pip install dspy requests beautifulsoup4 html2text
```

## Step 1: Documentation Fetching and Processing

```python
import dspy
import requests
from bs4 import BeautifulSoup
import html2text
from typing import Any, Dict
import time

# Configure DSPy
lm = dspy.LM(model='openai/gpt-4o-mini')
dspy.configure(lm=lm)

class DocumentationFetcher:
    """Fetches and processes documentation from URLs."""
    
    def __init__(self, max_retries=3, delay=1):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        self.max_retries = max_retries
        self.delay = delay
        self.html_converter = html2text.HTML2Text()
        self.html_converter.ignore_links = False
        self.html_converter.ignore_images = True
    
    def fetch_url(self, url: str) -> dict[str, Any]:
        """Fetch content from a single URL."""
        for attempt in range(self.max_retries):
            try:
                print(f"📡 Fetching: {url} (attempt {attempt + 1})")
                response = self.session.get(url, timeout=10)
                response.raise_for_status()
                
                soup = BeautifulSoup(response.content, 'html.parser')
                
                # Remove script and style elements
                for script in soup(["script", "style", "nav", "footer", "header"]):
                    script.decompose()
                
                # Convert to markdown for better LLM processing
                markdown_content = self.html_converter.handle(str(soup))
                
                return {
                    "url": url,
                    "title": soup.title.string if soup.title else "No title",
                    "content": markdown_content,
                    "success": True
                }
                
            except Exception as e:
                print(f"❌ Error fetching {url}: {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.delay)
                else:
                    return {
                        "url": url,
                        "title": "Failed to fetch",
                        "content": f"Error: {str(e)}",
                        "success": False
                    }
        
        return {"url": url, "title": "Failed", "content": "", "success": False}
    
    def fetch_documentation(self, urls: list[str]) -> list[dict[str, Any]]:
        """Fetch documentation from multiple URLs."""
        results = []
        
        for url in urls:
            result = self.fetch_url(url)
            results.append(result)
            time.sleep(self.delay)  # Be respectful to servers
        
        return results

class LibraryAnalyzer(dspy.Signature):
    """Analyze library documentation to understand core concepts and patterns."""
    library_name: str = dspy.InputField(desc="Name of the library to analyze")
    documentation_content: str = dspy.InputField(desc="Combined documentation content")
    
    core_concepts: list[str] = dspy.OutputField(desc="Main concepts and components")
    common_patterns: list[str] = dspy.OutputField(desc="Common usage patterns")
    key_methods: list[str] = dspy.OutputField(desc="Important methods and functions")
    installation_info: str = dspy.OutputField(desc="Installation and setup information")
    code_examples: list[str] = dspy.OutputField(desc="Example code snippets found")

class CodeGenerator(dspy.Signature):
    """Generate code examples for specific use cases using the target library."""
    library_info: str = dspy.InputField(desc="Library concepts and patterns")
    use_case: str = dspy.InputField(desc="Specific use case to implement")
    requirements: str = dspy.InputField(desc="Additional requirements or constraints")
    
    code_example: str = dspy.OutputField(desc="Complete, working code example")
    explanation: str = dspy.OutputField(desc="Step-by-step explanation of the code")
    best_practices: list[str] = dspy.OutputField(desc="Best practices and tips")
    imports_needed: list[str] = dspy.OutputField(desc="Required imports and dependencies")

class DocumentationLearningAgent(dspy.Module):
    """Agent that learns from documentation URLs and generates code examples."""
    
    def __init__(self):
        super().__init__()
        self.fetcher = DocumentationFetcher()
        self.analyze_docs = dspy.ChainOfThought(LibraryAnalyzer)
        self.generate_code = dspy.ChainOfThought(CodeGenerator)
        self.refine_code = dspy.ChainOfThought(
            "code, feedback -> improved_code: str, changes_made: list[str]"
        )
    
    def learn_from_urls(self, library_name: str, doc_urls: list[str]) -> Dict:
        """Learn about a library from its documentation URLs."""
        
        print(f"📚 Learning about {library_name} from {len(doc_urls)} URLs...")
        
        # Fetch all documentation
        docs = self.fetcher.fetch_documentation(doc_urls)
        
        # Combine successful fetches
        combined_content = "\n\n---\n\n".join([
            f"URL: {doc['url']}\nTitle: {doc['title']}\n\n{doc['content']}"
            for doc in docs if doc['success']
        ])
        
        if not combined_content:
            raise ValueError("No documentation could be fetched successfully")
        
        # Analyze combined documentation
        analysis = self.analyze_docs(
            library_name=library_name,
            documentation_content=combined_content
        )
        
        return {
            "library": library_name,
            "source_urls": [doc['url'] for doc in docs if doc['success']],
            "core_concepts": analysis.core_concepts,
            "patterns": analysis.common_patterns,
            "methods": analysis.key_methods,
            "installation": analysis.installation_info,
            "examples": analysis.code_examples,
            "fetched_docs": docs
        }
    
    def generate_example(self, library_info: Dict, use_case: str, requirements: str = "") -> Dict:
        """Generate a code example for a specific use case."""
        
        # Format library information for the generator
        info_text = f"""
        Library: {library_info['library']}
        Core Concepts: {', '.join(library_info['core_concepts'])}
        Common Patterns: {', '.join(library_info['patterns'])}
        Key Methods: {', '.join(library_info['methods'])}
        Installation: {library_info['installation']}
        Example Code Snippets: {'; '.join(library_info['examples'][:3])}
        """
        
        code_result = self.generate_code(
            library_info=info_text,
            use_case=use_case,
            requirements=requirements
        )
        
        return {
            "code": code_result.code_example,
            "explanation": code_result.explanation,
            "best_practices": code_result.best_practices,
            "imports": code_result.imports_needed
        }

# Initialize the learning agent
agent = DocumentationLearningAgent()
```
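
Before running the full pipeline, it can help to sanity-check the fetcher on a single page. A quick smoke test (the URL here is just an example):

```python
# Verify that fetching and HTML-to-markdown conversion work before invoking the LM-based analysis.
doc = agent.fetcher.fetch_url("https://fastapi.tiangolo.com/")
print(doc["success"], "-", doc["title"])
print(doc["content"][:300])  # preview the converted markdown
```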

## Step 2: Learning from Documentation URLs

```python
def learn_library_from_urls(library_name: str, documentation_urls: list[str]) -> Dict:
    """Learn about any library from its documentation URLs."""
    
    try:
        library_info = agent.learn_from_urls(library_name, documentation_urls)
        
        print(f"\n🔍 Library Analysis Results for {library_name}:")
        print(f"Sources: {len(library_info['source_urls'])} successful fetches")
        print(f"Core Concepts: {library_info['core_concepts']}")
        print(f"Common Patterns: {library_info['patterns']}")
        print(f"Key Methods: {library_info['methods']}")
        print(f"Installation: {library_info['installation']}")
        print(f"Found {len(library_info['examples'])} code examples")
        
        return library_info
        
    except Exception as e:
        print(f"❌ Error learning library: {e}")
        raise

# Example 1: Learn FastAPI from official documentation
fastapi_urls = [
    "https://fastapi.tiangolo.com/",
    "https://fastapi.tiangolo.com/tutorial/first-steps/",
    "https://fastapi.tiangolo.com/tutorial/path-params/",
    "https://fastapi.tiangolo.com/tutorial/query-params/"
]

print("🚀 Learning FastAPI from official documentation...")
fastapi_info = learn_library_from_urls("FastAPI", fastapi_urls)

# Example 2: Learn a different library (you can replace with any library)
streamlit_urls = [
    "https://docs.streamlit.io/",
    "https://docs.streamlit.io/get-started",
    "https://docs.streamlit.io/develop/api-reference"
]

print("\n\n📊 Learning Streamlit from official documentation...")
streamlit_info = learn_library_from_urls("Streamlit", streamlit_urls)
```

## Step 3: Generating Code Examples

```python
def generate_examples_for_library(library_info: Dict, library_name: str):
    """Generate code examples for any library based on its documentation."""
    
    # Define generic use cases that can apply to most libraries
    use_cases = [
        {
            "name": "Basic Setup and Hello World",
            "description": f"Create a minimal working example with {library_name}",
            "requirements": "Include installation, imports, and basic usage"
        },
        {
            "name": "Common Operations",
            "description": f"Demonstrate the most common {library_name} operations",
            "requirements": "Show typical workflow and best practices"
        },
        {
            "name": "Advanced Usage",
            "description": f"Create a more complex example showcasing {library_name} capabilities",
            "requirements": "Include error handling and optimization"
        }
    ]
    
    generated_examples = []
    
    print(f"\n🔧 Generating examples for {library_name}...")
    
    for use_case in use_cases:
        print(f"\n📝 {use_case['name']}")
        print(f"Description: {use_case['description']}")
        
        example = agent.generate_example(
            library_info=library_info,
            use_case=use_case['description'],
            requirements=use_case['requirements']
        )
        
        print("\n💻 Generated Code:")
        print("```python")
        print(example['code'])
        print("```")
        
        print("\n📦 Required Imports:")
        for imp in example['imports']:
            print(f"  • {imp}")
        
        print("\n📝 Explanation:")
        print(example['explanation'])
        
        print("\n✅ Best Practices:")
        for practice in example['best_practices']:
            print(f"  • {practice}")
        
        generated_examples.append({
            "use_case": use_case['name'],
            "code": example['code'],
            "imports": example['imports'],
            "explanation": example['explanation'],
            "best_practices": example['best_practices']
        })
        
        print("-" * 80)
    
    return generated_examples

# Generate examples for both libraries
print("🎯 Generating FastAPI Examples:")
fastapi_examples = generate_examples_for_library(fastapi_info, "FastAPI")

print("\n\n🎯 Generating Streamlit Examples:")
streamlit_examples = generate_examples_for_library(streamlit_info, "Streamlit")
```

## Step 4: Interactive Library Learning Function

```python
def learn_any_library(library_name: str, documentation_urls: list[str], use_cases: list[str] | None = None):
    """Learn any library from its documentation and generate examples."""
    
    if use_cases is None:
        use_cases = [
            "Basic setup and hello world example",
            "Common operations and workflows",
            "Advanced usage with best practices"
        ]
    
    print(f"🚀 Starting automated learning for {library_name}...")
    print(f"Documentation sources: {len(documentation_urls)} URLs")
    
    try:
        # Step 1: Learn from documentation
        library_info = agent.learn_from_urls(library_name, documentation_urls)
        
        # Step 2: Generate examples for each use case
        all_examples = []
        
        for i, use_case in enumerate(use_cases, 1):
            print(f"\n📝 Generating example {i}/{len(use_cases)}: {use_case}")
            
            example = agent.generate_example(
                library_info=library_info,
                use_case=use_case,
                requirements="Include error handling, comments, and follow best practices"
            )
            
            all_examples.append({
                "use_case": use_case,
                "code": example['code'],
                "imports": example['imports'],
                "explanation": example['explanation'],
                "best_practices": example['best_practices']
            })
        
        return {
            "library_info": library_info,
            "examples": all_examples
        }
    
    except Exception as e:
        print(f"❌ Error learning {library_name}: {e}")
        return None

def interactive_learning_session():
    """Interactive session for learning libraries with user input."""
    
    print("🎯 Welcome to the Interactive Library Learning System!")
    print("This system will help you learn any Python library from its documentation.\n")
    
    learned_libraries = {}
    
    while True:
        print("\n" + "="*60)
        print("🚀 LIBRARY LEARNING SESSION")
        print("="*60)
        
        # Get library name from user
        library_name = input("\n📚 Enter the library name you want to learn (or 'quit' to exit): ").strip()
        
        if library_name.lower() in ['quit', 'exit', 'q']:
            print("\n👋 Thanks for using the Interactive Library Learning System!")
            break
        
        if not library_name:
            print("❌ Please enter a valid library name.")
            continue
        
        # Get documentation URLs
        print(f"\n🔗 Enter documentation URLs for {library_name} (one per line, empty line to finish):")
        urls = []
        while True:
            url = input("  URL: ").strip()
            if not url:
                break
            if not url.startswith(('http://', 'https://')):
                print("    ⚠️  Please enter a valid URL starting with http:// or https://")
                continue
            urls.append(url)
        
        if not urls:
            print("❌ No valid URLs provided. Skipping this library.")
            continue
        
        # Get custom use cases from user
        print(f"\n🎯 Define use cases for {library_name} (optional, press Enter for defaults):")
        print("   Default use cases will be: Basic setup, Common operations, Advanced usage")
        
        user_wants_custom = input("   Do you want to define custom use cases? (y/n): ").strip().lower()
        
        use_cases = None
        if user_wants_custom in ['y', 'yes']:
            print("   Enter your use cases (one per line, empty line to finish):")
            use_cases = []
            while True:
                use_case = input("     Use case: ").strip()
                if not use_case:
                    break
                use_cases.append(use_case)
            
            if not use_cases:
                print("   No custom use cases provided, using defaults.")
                use_cases = None
        
        # Learn the library
        print(f"\n🚀 Starting learning process for {library_name}...")
        result = learn_any_library(library_name, urls, use_cases)
        
        if result:
            learned_libraries[library_name] = result
            print(f"\n✅ Successfully learned {library_name}!")
            
            # Show summary
            print(f"\n📊 Learning Summary for {library_name}:")
            print(f"   • Core concepts: {len(result['library_info']['core_concepts'])} identified")
            print(f"   • Common patterns: {len(result['library_info']['patterns'])} found")
            print(f"   • Examples generated: {len(result['examples'])}")
            
            # Ask if user wants to see examples
            show_examples = input(f"\n👀 Do you want to see the generated examples for {library_name}? (y/n): ").strip().lower()
            
            if show_examples in ['y', 'yes']:
                for i, example in enumerate(result['examples'], 1):
                    print(f"\n{'─'*50}")
                    print(f"📝 Example {i}: {example['use_case']}")
                    print(f"{'─'*50}")
                    
                    print("\n💻 Generated Code:")
                    print("```python")
                    print(example['code'])
                    print("```")
                    
                    print(f"\n📦 Required Imports:")
                    for imp in example['imports']:
                        print(f"  • {imp}")
                    
                    print(f"\n📝 Explanation:")
                    print(example['explanation'])
                    
                    print(f"\n✅ Best Practices:")
                    for practice in example['best_practices']:
                        print(f"  • {practice}")
                    
                    # Ask if user wants to see the next example
                    if i < len(result['examples']):
                        continue_viewing = input(f"\nContinue to next example? (y/n): ").strip().lower()
                        if continue_viewing not in ['y', 'yes']:
                            break
            
            # Offer to save results
            save_results = input(f"\n💾 Save learning results for {library_name} to file? (y/n): ").strip().lower()
            
            if save_results in ['y', 'yes']:
                filename = input(f"   Enter filename (default: {library_name.lower()}_learning.json): ").strip()
                if not filename:
                    filename = f"{library_name.lower()}_learning.json"
                
                try:
                    import json
                    with open(filename, 'w') as f:
                        json.dump(result, f, indent=2, default=str)
                    print(f"   ✅ Results saved to {filename}")
                except Exception as e:
                    print(f"   ❌ Error saving file: {e}")
        
        else:
            print(f"❌ Failed to learn {library_name}")
        
        # Ask if user wants to learn another library
        print(f"\n📚 Libraries learned so far: {list(learned_libraries.keys())}")
        continue_learning = input("\n🔄 Do you want to learn another library? (y/n): ").strip().lower()
        
        if continue_learning not in ['y', 'yes']:
            break
    
    # Final summary
    if learned_libraries:
        print(f"\n🎉 Session Summary:")
        print(f"Successfully learned {len(learned_libraries)} libraries:")
        for lib_name, info in learned_libraries.items():
            print(f"  • {lib_name}: {len(info['examples'])} examples generated")
    
    return learned_libraries

# Example: Run interactive learning session
if __name__ == "__main__":
    # Run interactive session
    learned_libraries = interactive_learning_session()
```

## Example Output

When you run the interactive learning system, you'll see:

**Interactive Session Start:**
```
🎯 Welcome to the Interactive Library Learning System!
This system will help you learn any Python library from its documentation.

============================================================
🚀 LIBRARY LEARNING SESSION
============================================================

📚 Enter the library name you want to learn (or 'quit' to exit): FastAPI

🔗 Enter documentation URLs for FastAPI (one per line, empty line to finish):
  URL: https://fastapi.tiangolo.com/
  URL: https://fastapi.tiangolo.com/tutorial/first-steps/
  URL: https://fastapi.tiangolo.com/tutorial/path-params/
  URL: 

🎯 Define use cases for FastAPI (optional, press Enter for defaults):
   Default use cases will be: Basic setup, Common operations, Advanced usage
   Do you want to define custom use cases? (y/n): y
   Enter your use cases (one per line, empty line to finish):
     Use case: Create a REST API with authentication
     Use case: Build a file upload endpoint
     Use case: Add database integration with SQLAlchemy
     Use case: 
```

**Documentation Processing:**
```
🚀 Starting learning process for FastAPI...
🚀 Starting automated learning for FastAPI...
Documentation sources: 3 URLs
📡 Fetching: https://fastapi.tiangolo.com/ (attempt 1)
📡 Fetching: https://fastapi.tiangolo.com/tutorial/first-steps/ (attempt 1)
📡 Fetching: https://fastapi.tiangolo.com/tutorial/path-params/ (attempt 1)
📚 Learning about FastAPI from 3 URLs...

🔍 Library Analysis Results for FastAPI:
Sources: 3 successful fetches
Core Concepts: ['FastAPI app', 'path operations', 'dependencies', 'request/response models']
Common Patterns: ['app = FastAPI()', 'decorator-based routing', 'Pydantic models']
Key Methods: ['FastAPI()', '@app.get()', '@app.post()', 'uvicorn.run()']
Installation: pip install fastapi uvicorn
```

**Code Generation:**
```
📝 Generating example 1/3: Create a REST API with authentication

✅ Successfully learned FastAPI!

📊 Learning Summary for FastAPI:
   • Core concepts: 4 identified
   • Common patterns: 3 found
   • Examples generated: 3

👀 Do you want to see the generated examples for FastAPI? (y/n): y

──────────────────────────────────────────────────
📝 Example 1: Create a REST API with authentication
──────────────────────────────────────────────────

💻 Generated Code:
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import uvicorn
from typing import Dict
import jwt
from datetime import datetime, timedelta

app = FastAPI(title="Authenticated API", version="1.0.0")
security = HTTPBearer()

# Secret key for JWT (use environment variable in production)
SECRET_KEY = "your-secret-key-here"
ALGORITHM = "HS256"

def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    try:
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise HTTPException(status_code=401, detail="Invalid token")
        return username
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

@app.post("/login")
async def login(username: str, password: str) -> dict[str, str]:
    # In production, verify against database
    if username == "admin" and password == "secret":
        token_data = {"sub": username, "exp": datetime.utcnow() + timedelta(hours=24)}
        token = jwt.encode(token_data, SECRET_KEY, algorithm=ALGORITHM)
        return {"access_token": token, "token_type": "bearer"}
    raise HTTPException(status_code=401, detail="Invalid credentials")

@app.get("/protected")
async def protected_route(current_user: str = Depends(verify_token)) -> dict[str, str]:
    return {"message": f"Hello {current_user}! This is a protected route."}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

📦 Required Imports:
  • pip install fastapi uvicorn python-jose[cryptography]
  • from fastapi import FastAPI, Depends, HTTPException, status
  • from fastapi.security import HTTPBearer
  • import jwt

📝 Explanation:
This example creates a FastAPI application with JWT-based authentication. It includes a login endpoint that returns a JWT token and a protected route that requires authentication...

✅ Best Practices:
  • Use environment variables for secret keys
  • Implement proper password hashing in production
  • Add token expiration and refresh logic
  • Include proper error handling

Continue to next example? (y/n): n

💾 Save learning results for FastAPI to file? (y/n): y
   Enter filename (default: fastapi_learning.json): 
   ✅ Results saved to fastapi_learning.json

📚 Libraries learned so far: ['FastAPI']

🔄 Do you want to learn another library? (y/n): n

🎉 Session Summary:
Successfully learned 1 libraries:
  • FastAPI: 3 examples generated
```


## Next Steps

- **GitHub Integration**: Learn from README files and example repositories
- **Video Tutorial Processing**: Extract information from video documentation
- **Community Examples**: Aggregate examples from Stack Overflow and forums
- **Version Comparison**: Track API changes across library versions
- **Testing Generation**: Automatically create unit tests for generated code
- **Page Crawling**: Automatically crawl documentation pages to actively understand the usage

This tutorial demonstrates how DSPy can automate the entire process of learning unfamiliar libraries from their documentation, making it valuable for rapid technology adoption and exploration.

```