This is page 10 of 14. Use http://codebase.md/stanfordnlp/dspy?page={x} to view the full context. # Directory Structure ``` ├── .github │ ├── .internal_dspyai │ │ ├── internals │ │ │ ├── build-and-release.md │ │ │ └── release-checklist.md │ │ └── pyproject.toml │ ├── .tmp │ │ └── .generated-actions │ │ └── run-pypi-publish-in-docker-container │ │ └── action.yml │ ├── ISSUE_TEMPLATE │ │ ├── bug_report.yml │ │ └── feature_request.yml │ ├── PULL_REQUEST_TEMPLATE │ │ └── pull_request_template.md │ ├── workflow_scripts │ │ └── install_testpypi_pkg.sh │ └── workflows │ ├── build_and_release.yml │ ├── build_utils │ │ └── test_version.py │ ├── docs-push.yml │ ├── precommits_check.yml │ └── run_tests.yml ├── .gitignore ├── .pre-commit-config.yaml ├── CONTRIBUTING.md ├── docs │ ├── .gitignore │ ├── docs │ │ ├── api │ │ │ ├── adapters │ │ │ │ ├── Adapter.md │ │ │ │ ├── ChatAdapter.md │ │ │ │ ├── JSONAdapter.md │ │ │ │ └── TwoStepAdapter.md │ │ │ ├── evaluation │ │ │ │ ├── answer_exact_match.md │ │ │ │ ├── answer_passage_match.md │ │ │ │ ├── CompleteAndGrounded.md │ │ │ │ ├── Evaluate.md │ │ │ │ ├── EvaluationResult.md │ │ │ │ └── SemanticF1.md │ │ │ ├── experimental │ │ │ │ ├── Citations.md │ │ │ │ └── Document.md │ │ │ ├── index.md │ │ │ ├── models │ │ │ │ ├── Embedder.md │ │ │ │ └── LM.md │ │ │ ├── modules │ │ │ │ ├── BestOfN.md │ │ │ │ ├── ChainOfThought.md │ │ │ │ ├── CodeAct.md │ │ │ │ ├── Module.md │ │ │ │ ├── MultiChainComparison.md │ │ │ │ ├── Parallel.md │ │ │ │ ├── Predict.md │ │ │ │ ├── ProgramOfThought.md │ │ │ │ ├── ReAct.md │ │ │ │ └── Refine.md │ │ │ ├── optimizers │ │ │ │ ├── BetterTogether.md │ │ │ │ ├── BootstrapFewShot.md │ │ │ │ ├── BootstrapFewShotWithRandomSearch.md │ │ │ │ ├── BootstrapFinetune.md │ │ │ │ ├── BootstrapRS.md │ │ │ │ ├── COPRO.md │ │ │ │ ├── Ensemble.md │ │ │ │ ├── GEPA │ │ │ │ │ ├── GEPA_Advanced.md │ │ │ │ │ └── overview.md │ │ │ │ ├── InferRules.md │ │ │ │ ├── KNN.md │ │ │ │ ├── KNNFewShot.md │ │ │ │ ├── LabeledFewShot.md │ │ │ │ ├── MIPROv2.md │ │ │ │ └── SIMBA.md │ │ │ ├── primitives │ │ │ │ ├── Audio.md │ │ │ │ ├── Code.md │ │ │ │ ├── Example.md │ │ │ │ ├── History.md │ │ │ │ ├── Image.md │ │ │ │ ├── Prediction.md │ │ │ │ ├── Tool.md │ │ │ │ └── ToolCalls.md │ │ │ ├── signatures │ │ │ │ ├── InputField.md │ │ │ │ ├── OutputField.md │ │ │ │ └── Signature.md │ │ │ ├── tools │ │ │ │ ├── ColBERTv2.md │ │ │ │ ├── Embeddings.md │ │ │ │ └── PythonInterpreter.md │ │ │ └── utils │ │ │ ├── asyncify.md │ │ │ ├── configure_cache.md │ │ │ ├── disable_litellm_logging.md │ │ │ ├── disable_logging.md │ │ │ ├── enable_litellm_logging.md │ │ │ ├── enable_logging.md │ │ │ ├── inspect_history.md │ │ │ ├── load.md │ │ │ ├── StatusMessage.md │ │ │ ├── StatusMessageProvider.md │ │ │ ├── streamify.md │ │ │ └── StreamListener.md │ │ ├── cheatsheet.md │ │ ├── community │ │ │ ├── community-resources.md │ │ │ ├── how-to-contribute.md │ │ │ └── use-cases.md │ │ ├── deep-dive │ │ │ └── data-handling │ │ │ ├── built-in-datasets.md │ │ │ ├── examples.md │ │ │ ├── img │ │ │ │ └── data-loading.png │ │ │ └── loading-custom-data.md │ │ ├── faqs.md │ │ ├── index.md │ │ ├── js │ │ │ └── runllm-widget.js │ │ ├── learn │ │ │ ├── evaluation │ │ │ │ ├── data.md │ │ │ │ ├── metrics.md │ │ │ │ └── overview.md │ │ │ ├── figures │ │ │ │ ├── native_tool_call.png │ │ │ │ └── teleprompter-classes.png │ │ │ ├── index.md │ │ │ ├── optimization │ │ │ │ ├── optimizers.md │ │ │ │ └── overview.md │ │ │ └── programming │ │ │ ├── 7-assertions.md │ │ │ ├── adapters.md │ │ │ ├── language_models.md │ │ │ ├── mcp.md │ 
│ │ ├── modules.md │ │ │ ├── overview.md │ │ │ ├── signatures.md │ │ │ └── tools.md │ │ ├── production │ │ │ └── index.md │ │ ├── roadmap.md │ │ ├── static │ │ │ ├── .nojekyll │ │ │ └── img │ │ │ ├── dspy_logo.png │ │ │ ├── logo.png │ │ │ ├── mlflow-tracing-rag.png │ │ │ ├── modular.png │ │ │ ├── optimize.png │ │ │ ├── undraw_docusaurus_mountain.svg │ │ │ ├── undraw_docusaurus_react.svg │ │ │ ├── undraw_docusaurus_tree.svg │ │ │ └── universal_compatibility.png │ │ ├── stylesheets │ │ │ └── extra.css │ │ └── tutorials │ │ ├── agents │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-agent.png │ │ ├── ai_text_game │ │ │ └── index.md │ │ ├── async │ │ │ └── index.md │ │ ├── audio │ │ │ └── index.ipynb │ │ ├── build_ai_program │ │ │ └── index.md │ │ ├── cache │ │ │ └── index.md │ │ ├── classification │ │ │ └── index.md │ │ ├── classification_finetuning │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-classification.png │ │ ├── conversation_history │ │ │ └── index.md │ │ ├── core_development │ │ │ └── index.md │ │ ├── custom_module │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-custom-module.png │ │ ├── customer_service_agent │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-customer-service-agent.png │ │ ├── deployment │ │ │ ├── dspy_mlflow_ui.png │ │ │ └── index.md │ │ ├── email_extraction │ │ │ ├── index.md │ │ │ └── mlflow-tracing-email-extraction.png │ │ ├── entity_extraction │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-entity-extraction.png │ │ ├── games │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-agent.png │ │ ├── gepa_ai_program │ │ │ └── index.md │ │ ├── gepa_aime │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-aime.png │ │ │ └── mlflow-tracking-gepa-aime-optimization.png │ │ ├── gepa_facilitysupportanalyzer │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-support.png │ │ │ └── mlflow-tracking-gepa-support-optimization.png │ │ ├── gepa_papillon │ │ │ ├── index.ipynb │ │ │ ├── mlflow-tracing-gepa-papilon.png │ │ │ └── mlflow-tracking-gepa-papilon-optimization.png │ │ ├── image_generation_prompting │ │ │ └── index.ipynb │ │ ├── index.md │ │ ├── llms_txt_generation │ │ │ └── index.md │ │ ├── math │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-math.png │ │ ├── mcp │ │ │ └── index.md │ │ ├── mem0_react_agent │ │ │ └── index.md │ │ ├── multihop_search │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-multi-hop.png │ │ ├── observability │ │ │ ├── index.md │ │ │ ├── mlflow_trace_ui_navigation.gif │ │ │ ├── mlflow_trace_ui.png │ │ │ └── mlflow_trace_view.png │ │ ├── optimize_ai_program │ │ │ └── index.md │ │ ├── optimizer_tracking │ │ │ ├── child_run.png │ │ │ ├── experiment.png │ │ │ ├── index.md │ │ │ └── parent_run.png │ │ ├── output_refinement │ │ │ └── best-of-n-and-refine.md │ │ ├── papillon │ │ │ └── index.md │ │ ├── program_of_thought │ │ │ └── index.ipynb │ │ ├── rag │ │ │ ├── index.ipynb │ │ │ └── mlflow-tracing-rag.png │ │ ├── real_world_examples │ │ │ └── index.md │ │ ├── rl_ai_program │ │ │ └── index.md │ │ ├── rl_multihop │ │ │ └── index.ipynb │ │ ├── rl_papillon │ │ │ └── index.ipynb │ │ ├── sample_code_generation │ │ │ └── index.md │ │ ├── saving │ │ │ └── index.md │ │ ├── streaming │ │ │ └── index.md │ │ ├── tool_use │ │ │ └── index.ipynb │ │ └── yahoo_finance_react │ │ └── index.md │ ├── mkdocs.yml │ ├── overrides │ │ ├── home.html │ │ ├── main.html │ │ └── partials │ │ └── tabs.html │ ├── Pipfile │ ├── Pipfile.lock │ ├── README.md │ ├── requirements.txt │ ├── scripts │ │ ├── generate_api_docs.py │ │ └── generate_api_summary.py │ └── vercel.json ├── dspy │ ├── __init__.py │ 
├── __metadata__.py │ ├── adapters │ │ ├── __init__.py │ │ ├── baml_adapter.py │ │ ├── base.py │ │ ├── chat_adapter.py │ │ ├── json_adapter.py │ │ ├── two_step_adapter.py │ │ ├── types │ │ │ ├── __init__.py │ │ │ ├── audio.py │ │ │ ├── base_type.py │ │ │ ├── citation.py │ │ │ ├── code.py │ │ │ ├── document.py │ │ │ ├── history.py │ │ │ ├── image.py │ │ │ └── tool.py │ │ ├── utils.py │ │ └── xml_adapter.py │ ├── clients │ │ ├── __init__.py │ │ ├── base_lm.py │ │ ├── cache.py │ │ ├── databricks.py │ │ ├── embedding.py │ │ ├── lm_local_arbor.py │ │ ├── lm_local.py │ │ ├── lm.py │ │ ├── openai.py │ │ ├── provider.py │ │ └── utils_finetune.py │ ├── datasets │ │ ├── __init__.py │ │ ├── alfworld │ │ │ ├── __init__.py │ │ │ ├── alfworld.py │ │ │ └── base_config.yml │ │ ├── colors.py │ │ ├── dataloader.py │ │ ├── dataset.py │ │ ├── gsm8k.py │ │ ├── hotpotqa.py │ │ └── math.py │ ├── dsp │ │ ├── __init__.py │ │ ├── colbertv2.py │ │ └── utils │ │ ├── __init__.py │ │ ├── dpr.py │ │ ├── settings.py │ │ └── utils.py │ ├── evaluate │ │ ├── __init__.py │ │ ├── auto_evaluation.py │ │ ├── evaluate.py │ │ └── metrics.py │ ├── experimental │ │ └── __init__.py │ ├── predict │ │ ├── __init__.py │ │ ├── aggregation.py │ │ ├── avatar │ │ │ ├── __init__.py │ │ │ ├── avatar.py │ │ │ ├── models.py │ │ │ └── signatures.py │ │ ├── best_of_n.py │ │ ├── chain_of_thought.py │ │ ├── code_act.py │ │ ├── knn.py │ │ ├── multi_chain_comparison.py │ │ ├── parallel.py │ │ ├── parameter.py │ │ ├── predict.py │ │ ├── program_of_thought.py │ │ ├── react.py │ │ ├── refine.py │ │ └── retry.py │ ├── primitives │ │ ├── __init__.py │ │ ├── base_module.py │ │ ├── example.py │ │ ├── module.py │ │ ├── prediction.py │ │ ├── python_interpreter.py │ │ └── runner.js │ ├── propose │ │ ├── __init__.py │ │ ├── dataset_summary_generator.py │ │ ├── grounded_proposer.py │ │ ├── propose_base.py │ │ └── utils.py │ ├── retrievers │ │ ├── __init__.py │ │ ├── databricks_rm.py │ │ ├── embeddings.py │ │ ├── retrieve.py │ │ └── weaviate_rm.py │ ├── signatures │ │ ├── __init__.py │ │ ├── field.py │ │ ├── signature.py │ │ └── utils.py │ ├── streaming │ │ ├── __init__.py │ │ ├── messages.py │ │ ├── streamify.py │ │ └── streaming_listener.py │ ├── teleprompt │ │ ├── __init__.py │ │ ├── avatar_optimizer.py │ │ ├── bettertogether.py │ │ ├── bootstrap_finetune.py │ │ ├── bootstrap_trace.py │ │ ├── bootstrap.py │ │ ├── copro_optimizer.py │ │ ├── ensemble.py │ │ ├── gepa │ │ │ ├── __init__.py │ │ │ ├── gepa_utils.py │ │ │ ├── gepa.py │ │ │ └── instruction_proposal.py │ │ ├── grpo.py │ │ ├── infer_rules.py │ │ ├── knn_fewshot.py │ │ ├── mipro_optimizer_v2.py │ │ ├── random_search.py │ │ ├── signature_opt.py │ │ ├── simba_utils.py │ │ ├── simba.py │ │ ├── teleprompt_optuna.py │ │ ├── teleprompt.py │ │ ├── utils.py │ │ └── vanilla.py │ └── utils │ ├── __init__.py │ ├── annotation.py │ ├── asyncify.py │ ├── caching.py │ ├── callback.py │ ├── dummies.py │ ├── exceptions.py │ ├── hasher.py │ ├── inspect_history.py │ ├── langchain_tool.py │ ├── logging_utils.py │ ├── mcp.py │ ├── parallelizer.py │ ├── saving.py │ ├── syncify.py │ ├── unbatchify.py │ └── usage_tracker.py ├── LICENSE ├── pyproject.toml ├── README.md ├── tests │ ├── __init__.py │ ├── adapters │ │ ├── test_adapter_utils.py │ │ ├── test_baml_adapter.py │ │ ├── test_base_type.py │ │ ├── test_chat_adapter.py │ │ ├── test_citation.py │ │ ├── test_code.py │ │ ├── test_document.py │ │ ├── test_json_adapter.py │ │ ├── test_tool.py │ │ ├── test_two_step_adapter.py │ │ └── test_xml_adapter.py │ ├── callback │ │ └── 
test_callback.py │ ├── clients │ │ ├── test_cache.py │ │ ├── test_databricks.py │ │ ├── test_embedding.py │ │ ├── test_inspect_global_history.py │ │ └── test_lm.py │ ├── conftest.py │ ├── datasets │ │ └── test_dataset.py │ ├── docs │ │ └── test_mkdocs_links.py │ ├── evaluate │ │ ├── test_evaluate.py │ │ └── test_metrics.py │ ├── examples │ │ └── test_baleen.py │ ├── metadata │ │ └── test_metadata.py │ ├── predict │ │ ├── test_aggregation.py │ │ ├── test_best_of_n.py │ │ ├── test_chain_of_thought.py │ │ ├── test_code_act.py │ │ ├── test_knn.py │ │ ├── test_multi_chain_comparison.py │ │ ├── test_parallel.py │ │ ├── test_predict.py │ │ ├── test_program_of_thought.py │ │ ├── test_react.py │ │ ├── test_refine.py │ │ └── test_retry.py │ ├── primitives │ │ ├── resources │ │ │ └── saved_program.json │ │ ├── test_base_module.py │ │ ├── test_example.py │ │ ├── test_module.py │ │ └── test_python_interpreter.py │ ├── propose │ │ └── test_grounded_proposer.py │ ├── README.md │ ├── reliability │ │ ├── __init__.py │ │ ├── complex_types │ │ │ └── generated │ │ │ ├── test_many_types_1 │ │ │ │ ├── inputs │ │ │ │ │ ├── input1.json │ │ │ │ │ └── input2.json │ │ │ │ ├── program.py │ │ │ │ └── schema.json │ │ │ ├── test_nesting_1 │ │ │ │ ├── inputs │ │ │ │ │ ├── input1.json │ │ │ │ │ └── input2.json │ │ │ │ ├── program.py │ │ │ │ └── schema.json │ │ │ └── test_nesting_2 │ │ │ ├── inputs │ │ │ │ └── input1.json │ │ │ ├── program.py │ │ │ └── schema.json │ │ ├── conftest.py │ │ ├── generate │ │ │ ├── __init__.py │ │ │ ├── __main__.py │ │ │ └── utils.py │ │ ├── input_formats │ │ │ └── generated │ │ │ └── test_markdown_1 │ │ │ ├── inputs │ │ │ │ ├── input1.json │ │ │ │ └── input2.json │ │ │ ├── program.py │ │ │ └── schema.json │ │ ├── README.md │ │ ├── reliability_conf.yaml │ │ ├── test_generated.py │ │ ├── test_pydantic_models.py │ │ └── utils.py │ ├── retrievers │ │ └── test_embeddings.py │ ├── signatures │ │ ├── test_adapter_image.py │ │ ├── test_custom_types.py │ │ └── test_signature.py │ ├── streaming │ │ └── test_streaming.py │ ├── teleprompt │ │ ├── gepa_dummy_lm_custom_component_selector_custom_instruction_proposer.json │ │ ├── gepa_dummy_lm.json │ │ ├── test_bootstrap_finetune.py │ │ ├── test_bootstrap_trace.py │ │ ├── test_bootstrap.py │ │ ├── test_copro_optimizer.py │ │ ├── test_ensemble.py │ │ ├── test_finetune.py │ │ ├── test_gepa_instruction_proposer.py │ │ ├── test_gepa.py │ │ ├── test_grpo.py │ │ ├── test_knn_fewshot.py │ │ ├── test_random_search.py │ │ ├── test_teleprompt.py │ │ └── test_utils.py │ ├── test_utils │ │ ├── __init__.py │ │ └── server │ │ ├── __init__.py │ │ ├── litellm_server_config.yaml │ │ └── litellm_server.py │ └── utils │ ├── __init__.py │ ├── resources │ │ └── mcp_server.py │ ├── test_annotation.py │ ├── test_asyncify.py │ ├── test_exceptions.py │ ├── test_langchain_tool.py │ ├── test_mcp.py │ ├── test_parallelizer.py │ ├── test_saving.py │ ├── test_settings.py │ ├── test_syncify.py │ ├── test_unbatchify.py │ └── test_usage_tracker.py └── uv.lock ``` # Files -------------------------------------------------------------------------------- /tests/primitives/test_base_module.py: -------------------------------------------------------------------------------- ```python import asyncio import logging import os import threading from unittest.mock import patch import pytest from litellm import Choices, Message, ModelResponse from litellm.types.utils import Usage import dspy from dspy.primitives.prediction import Prediction from dspy.utils.dummies import DummyLM def 
test_deepcopy_basic(): signature = dspy.Signature("q -> a") cot = dspy.ChainOfThought(signature) cot_copy = cot.deepcopy() assert len(cot.parameters()) == len(cot_copy.parameters()) # Parameters should be different objects with the same values. assert id(cot.parameters()[0]) != id(cot_copy.parameters()[0]) assert cot.parameters()[0].__dict__ == cot_copy.parameters()[0].__dict__ def test_deepcopy_with_uncopyable_modules(): class CustomClass(dspy.Module): def __init__(self): self.lock = threading.Lock() # Non-copyable object. self.cot = dspy.ChainOfThought(dspy.Signature("q -> a")) model = CustomClass() model_copy = model.deepcopy() assert len(model.parameters()) == len(model_copy.parameters()) # The lock should refer to the same object (shallow copy). assert id(model.lock) == id(model_copy.lock) # Parameters should be different objects with the same values. assert id(model.parameters()[0]) != id(model_copy.parameters()[0]) assert model.parameters()[0].__dict__ == model_copy.parameters()[0].__dict__ def test_deepcopy_with_nested_modules(): class CustomClass1(dspy.Module): def __init__(self): self.lock = threading.Lock() # Non-copyable object. self.cot = dspy.ChainOfThought(dspy.Signature("q -> a")) class CustomClass2(dspy.Module): def __init__(self): self.submodel = CustomClass1() model = CustomClass2() model_copy = model.deepcopy() assert len(model.parameters()) == len(model_copy.parameters()) # The lock should refer to the same object (shallow copy). assert id(model.submodel.lock) == id(model_copy.submodel.lock) # Parameters should be different objects with the same values. assert id(model.parameters()[0]) != id(model_copy.parameters()[0]) assert model.parameters()[0].__dict__ == model_copy.parameters()[0].__dict__ def test_save_and_load_with_json(tmp_path): model = dspy.ChainOfThought(dspy.Signature("q -> a")) model.predict.signature = model.predict.signature.with_instructions("You are a helpful assistant.") model.predict.demos = [ dspy.Example(q="What is the capital of France?", a="Paris", reasoning="n/a").with_inputs("q"), # Nested example dspy.Example( q=[ dspy.Example(q="What is the capital of France?"), dspy.Example(q="What is actually the capital of France?"), ], a="Paris", reasoning="n/a", ).with_inputs("q"), ] save_path = tmp_path / "model.json" model.save(save_path) new_model = dspy.ChainOfThought(dspy.Signature("q -> a")) new_model.load(save_path) assert str(new_model.predict.signature) == str(model.predict.signature) assert new_model.predict.demos[0] == model.predict.demos[0].toDict() assert new_model.predict.demos[1] == model.predict.demos[1].toDict() @pytest.mark.extra def test_save_and_load_with_pkl(tmp_path): import datetime # `datetime.date` is not JSON serializable, so we need to save with pickle. 
class MySignature(dspy.Signature): """Just a custom signature.""" current_date: datetime.date = dspy.InputField() target_date: datetime.date = dspy.InputField() date_diff: int = dspy.OutputField(desc="The difference in days between the current_date and the target_date") trainset = [ {"current_date": datetime.date(2024, 1, 1), "target_date": datetime.date(2024, 1, 2), "date_diff": 1}, {"current_date": datetime.date(2024, 1, 1), "target_date": datetime.date(2024, 1, 3), "date_diff": 2}, {"current_date": datetime.date(2024, 1, 1), "target_date": datetime.date(2024, 1, 4), "date_diff": 3}, {"current_date": datetime.date(2024, 1, 1), "target_date": datetime.date(2024, 1, 5), "date_diff": 4}, {"current_date": datetime.date(2024, 1, 1), "target_date": datetime.date(2024, 1, 6), "date_diff": 5}, ] trainset = [dspy.Example(**example).with_inputs("current_date", "target_date") for example in trainset] dspy.settings.configure( lm=DummyLM([{"date_diff": "1", "reasoning": "n/a"}, {"date_diff": "2", "reasoning": "n/a"}] * 10) ) cot = dspy.ChainOfThought(MySignature) cot(current_date=datetime.date(2024, 1, 1), target_date=datetime.date(2024, 1, 2)) def dummy_metric(example, pred, trace=None): return True optimizer = dspy.BootstrapFewShot(max_bootstrapped_demos=4, max_labeled_demos=4, max_rounds=5, metric=dummy_metric) compiled_cot = optimizer.compile(cot, trainset=trainset) compiled_cot.predict.signature = compiled_cot.predict.signature.with_instructions("You are a helpful assistant.") save_path = tmp_path / "program.pkl" compiled_cot.save(save_path) new_cot = dspy.ChainOfThought(MySignature) new_cot.load(save_path) assert str(new_cot.predict.signature) == str(compiled_cot.predict.signature) assert new_cot.predict.demos == compiled_cot.predict.demos def test_save_with_extra_modules(tmp_path): import sys # Create a temporary Python file with our custom module custom_module_path = tmp_path / "custom_module.py" with open(custom_module_path, "w") as f: f.write(""" import dspy class MyModule(dspy.Module): def __init__(self): self.cot = dspy.ChainOfThought(dspy.Signature("q -> a")) def forward(self, q): return self.cot(q=q) """) # Add the tmp_path to Python path so we can import the module sys.path.insert(0, str(tmp_path)) try: import custom_module cot = custom_module.MyModule() cot.save(tmp_path, save_program=True) # Remove the custom module from sys.modules to simulate it not being available sys.modules.pop("custom_module", None) # Also remove it from sys.path sys.path.remove(str(tmp_path)) del custom_module # Test the loading fails without using `modules_to_serialize` with pytest.raises(ModuleNotFoundError): dspy.load(tmp_path) sys.path.insert(0, str(tmp_path)) import custom_module cot.save( tmp_path, modules_to_serialize=[custom_module], save_program=True, ) # Remove the custom module from sys.modules to simulate it not being available sys.modules.pop("custom_module", None) # Also remove it from sys.path sys.path.remove(str(tmp_path)) del custom_module loaded_module = dspy.load(tmp_path) assert loaded_module.cot.predict.signature == cot.cot.predict.signature finally: # Only need to clean up sys.path if str(tmp_path) in sys.path: sys.path.remove(str(tmp_path)) def test_load_with_version_mismatch(tmp_path): from dspy.primitives.base_module import logger # Mock versions during save save_versions = {"python": "3.9", "dspy": "2.4.0", "cloudpickle": "2.0"} # Mock versions during load load_versions = {"python": "3.10", "dspy": "2.5.0", "cloudpickle": "2.1"} predict = dspy.Predict("question->answer") # Create a 
custom handler to capture log messages class ListHandler(logging.Handler): def __init__(self): super().__init__() self.messages = [] def emit(self, record): self.messages.append(record.getMessage()) # Add handler and set level handler = ListHandler() original_level = logger.level logger.addHandler(handler) logger.setLevel(logging.WARNING) try: save_path = tmp_path / "program.pkl" # Mock version during save with patch("dspy.primitives.base_module.get_dependency_versions", return_value=save_versions): predict.save(save_path) # Mock version during load with patch("dspy.primitives.base_module.get_dependency_versions", return_value=load_versions): loaded_predict = dspy.Predict("question->answer") loaded_predict.load(save_path) # Assert warnings were logged, and one warning for each mismatched dependency. assert len(handler.messages) == 3 for msg in handler.messages: assert "There is a mismatch of" in msg # Verify the model still loads correctly despite version mismatches assert isinstance(loaded_predict, dspy.Predict) assert str(predict.signature) == str(loaded_predict.signature) finally: # Clean up: restore original level and remove handler logger.setLevel(original_level) logger.removeHandler(handler) @pytest.mark.llm_call def test_single_module_call_with_usage_tracker(lm_for_test): dspy.settings.configure(lm=dspy.LM(lm_for_test, cache=False), track_usage=True) predict = dspy.ChainOfThought("question -> answer") output = predict(question="What is the capital of France?") lm_usage = output.get_lm_usage() assert len(lm_usage) == 1 assert lm_usage[lm_for_test]["prompt_tokens"] > 0 assert lm_usage[lm_for_test]["completion_tokens"] > 0 assert lm_usage[lm_for_test]["total_tokens"] > 0 # Test no usage being tracked when cache is enabled dspy.settings.configure(lm=dspy.LM(lm_for_test, cache=True), track_usage=True) for _ in range(2): output = predict(question="What is the capital of France?") assert len(output.get_lm_usage()) == 0 @pytest.mark.llm_call def test_multi_module_call_with_usage_tracker(lm_for_test): dspy.settings.configure(lm=dspy.LM(lm_for_test, cache=False), track_usage=True) class MyProgram(dspy.Module): def __init__(self): self.predict1 = dspy.ChainOfThought("question -> answer") self.predict2 = dspy.ChainOfThought("question, answer -> score") def __call__(self, question: str) -> Prediction: answer = self.predict1(question=question) score = self.predict2(question=question, answer=answer) return score program = MyProgram() output = program(question="What is the capital of France?") lm_usage = output.get_lm_usage() assert len(lm_usage) == 1 assert lm_usage[lm_for_test]["prompt_tokens"] > 0 assert lm_usage[lm_for_test]["prompt_tokens"] > 0 assert lm_usage[lm_for_test]["completion_tokens"] > 0 assert lm_usage[lm_for_test]["total_tokens"] > 0 # TODO: prepare second model for testing this unit test in ci @pytest.mark.skipif(not os.getenv("OPENAI_API_KEY"), reason="Skip the test if OPENAI_API_KEY is not set.") def test_usage_tracker_in_parallel(): class MyProgram(dspy.Module): def __init__(self, lm): self.lm = lm self.predict1 = dspy.ChainOfThought("question -> answer") self.predict2 = dspy.ChainOfThought("question, answer -> score") def __call__(self, question: str) -> Prediction: with dspy.settings.context(lm=self.lm): answer = self.predict1(question=question) score = self.predict2(question=question, answer=answer) return score dspy.settings.configure(track_usage=True) program1 = MyProgram(lm=dspy.LM("openai/gpt-4o-mini", cache=False)) program2 = MyProgram(lm=dspy.LM("openai/gpt-3.5-turbo", 
cache=False)) parallelizer = dspy.Parallel() results = parallelizer( [ (program1, {"question": "What is the meaning of life?"}), (program2, {"question": "why did a chicken cross the kitchen?"}), ] ) assert results[0].get_lm_usage() is not None assert results[1].get_lm_usage() is not None assert results[0].get_lm_usage().keys() == set(["openai/gpt-4o-mini"]) assert results[1].get_lm_usage().keys() == set(["openai/gpt-3.5-turbo"]) @pytest.mark.asyncio async def test_usage_tracker_async_parallel(): program = dspy.Predict("question -> answer") with patch("litellm.acompletion") as mock_completion: mock_completion.return_value = ModelResponse( choices=[Choices(message=Message(content="{'answer': 'Paris'}"))], usage=Usage( **{ "prompt_tokens": 1117, "completion_tokens": 46, "total_tokens": 1163, "prompt_tokens_details": {"cached_tokens": 0, "audio_tokens": 0}, "completion_tokens_details": { "reasoning_tokens": 0, "audio_tokens": 0, "accepted_prediction_tokens": 0, "rejected_prediction_tokens": 0, }, }, ), model="openai/gpt-4o-mini", ) coroutines = [ program.acall(question="What is the capital of France?"), program.acall(question="What is the capital of France?"), program.acall(question="What is the capital of France?"), program.acall(question="What is the capital of France?"), ] with dspy.settings.context( lm=dspy.LM("openai/gpt-4o-mini", cache=False), track_usage=True, adapter=dspy.JSONAdapter() ): results = await asyncio.gather(*coroutines) assert results[0].get_lm_usage() is not None assert results[1].get_lm_usage() is not None lm_usage0 = results[0].get_lm_usage()["openai/gpt-4o-mini"] lm_usage1 = results[1].get_lm_usage()["openai/gpt-4o-mini"] assert lm_usage0["prompt_tokens"] == 1117 assert lm_usage1["prompt_tokens"] == 1117 assert lm_usage0["completion_tokens"] == 46 assert lm_usage1["completion_tokens"] == 46 assert lm_usage0["total_tokens"] == 1163 assert lm_usage1["total_tokens"] == 1163 def test_usage_tracker_no_side_effect(): class MyProgram(dspy.Module): def __init__(self): self.predict = dspy.Predict("question -> answer") def forward(self, question: str, **kwargs) -> str: return self.predict(question=question).answer program = MyProgram() with dspy.context(lm=DummyLM([{"answer": "Paris"}]), track_usage=True): result = program(question="What is the capital of France?") assert result == "Paris" def test_module_history(): class MyProgram(dspy.Module): def __init__(self, **kwargs): super().__init__(**kwargs) self.cot = dspy.ChainOfThought("question -> answer") def forward(self, question: str, **kwargs) -> Prediction: return self.cot(question=question) with patch("litellm.completion") as mock_completion: mock_completion.return_value = ModelResponse( choices=[ Choices(message=Message(content="{'reasoning': 'Paris is the capital of France', 'answer': 'Paris'}")) ], model="openai/gpt-4o-mini", ) dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.JSONAdapter()) program = MyProgram() program(question="What is the capital of France?") # Second call only call the submodule. program.cot(question="What is the capital of France?") # The LM history entity exists in all the ancestor callers. assert len(program.history) == 1 assert len(program.cot.history) == 2 assert len(program.cot.predict.history) == 2 # The same history entity is shared across all the ancestor callers to reduce memory usage. 
assert id(program.history[0]) == id(program.cot.history[0]) assert program.history[0]["outputs"] == ["{'reasoning': 'Paris is the capital of France', 'answer': 'Paris'}"] dspy.settings.configure(disable_history=True) program(question="What is the capital of France?") # No history is recorded when history is disabled. assert len(program.history) == 1 assert len(program.cot.history) == 2 assert len(program.cot.predict.history) == 2 dspy.settings.configure(disable_history=False) program(question="What is the capital of France?") # History is recorded again when history is enabled. assert len(program.history) == 2 assert len(program.cot.history) == 3 assert len(program.cot.predict.history) == 3 def test_module_history_with_concurrency(): class MyProgram(dspy.Module): def __init__(self): super().__init__() self.cot = dspy.ChainOfThought("question -> answer") def forward(self, question: str, **kwargs) -> Prediction: return self.cot(question=question) with patch("litellm.completion") as mock_completion: mock_completion.return_value = ModelResponse( choices=[Choices(message=Message(content="{'reasoning': 'N/A', 'answer': 'Holy crab!'}"))], model="openai/gpt-4o-mini", ) dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.JSONAdapter()) program = MyProgram() parallelizer = dspy.Parallel() parallelizer( [ (program, {"question": "What is the meaning of life?"}), (program, {"question": "why did a chicken cross the kitchen?"}), ] ) assert len(program.history) == 2 assert len(program.cot.history) == 2 assert len(program.cot.predict.history) == 2 @pytest.mark.asyncio async def test_module_history_async(): class MyProgram(dspy.Module): def __init__(self, **kwargs): super().__init__(**kwargs) self.cot = dspy.ChainOfThought("question -> answer") async def aforward(self, question: str, **kwargs) -> Prediction: return await self.cot.acall(question=question) with patch("litellm.acompletion") as mock_completion: mock_completion.return_value = ModelResponse( choices=[ Choices(message=Message(content="{'reasoning': 'Paris is the capital of France', 'answer': 'Paris'}")) ], model="openai/gpt-4o-mini", ) program = MyProgram() with dspy.context(lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.JSONAdapter()): await program.acall(question="What is the capital of France?") # Second call only call the submodule. await program.cot.acall(question="What is the capital of France?") # The LM history entity exists in all the ancestor callers. assert len(program.history) == 1 assert len(program.cot.history) == 2 assert len(program.cot.predict.history) == 2 # The same history entity is shared across all the ancestor callers to reduce memory usage. assert id(program.history[0]) == id(program.cot.history[0]) assert program.history[0]["outputs"] == ["{'reasoning': 'Paris is the capital of France', 'answer': 'Paris'}"] with dspy.context( disable_history=True, lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.JSONAdapter() ): await program.acall(question="What is the capital of France?") # No history is recorded when history is disabled. assert len(program.history) == 1 assert len(program.cot.history) == 2 assert len(program.cot.predict.history) == 2 with dspy.context( disable_history=False, lm=dspy.LM("openai/gpt-4o-mini", cache=False), adapter=dspy.JSONAdapter() ): await program.acall(question="What is the capital of France?") # History is recorded again when history is enabled. 
assert len(program.history) == 2 assert len(program.cot.history) == 3 assert len(program.cot.predict.history) == 3 def test_forward_direct_call_warning(capsys): class TestModule(dspy.Module): def forward(self, x): return x module = TestModule() module.forward("test") captured = capsys.readouterr() assert "directly is discouraged" in captured.err def test_forward_through_call_no_warning(capsys): class TestModule(dspy.Module): def forward(self, x): return x module = TestModule() module(x="test") captured = capsys.readouterr() assert "directly is discouraged" not in captured.err ``` -------------------------------------------------------------------------------- /dspy/adapters/base.py: -------------------------------------------------------------------------------- ```python import logging from typing import TYPE_CHECKING, Any, get_origin import json_repair import litellm from dspy.adapters.types import History, Type from dspy.adapters.types.base_type import split_message_content_for_custom_types from dspy.adapters.types.tool import Tool, ToolCalls from dspy.experimental import Citations from dspy.signatures.signature import Signature from dspy.utils.callback import BaseCallback, with_callbacks logger = logging.getLogger(__name__) if TYPE_CHECKING: from dspy.clients.lm import LM _DEFAULT_NATIVE_RESPONSE_TYPES = [Citations] class Adapter: """Base Adapter class. The Adapter serves as the interface layer between DSPy module/signature and Language Models (LMs). It handles the complete transformation pipeline from DSPy inputs to LM calls and back to structured outputs. Key responsibilities: - Transform user inputs and signatures into properly formatted LM prompts, which also instructs the LM to format the response in a specific format. - Parse LM outputs into dictionaries matching the signature's output fields. - Enable/disable native LM features (function calling, citations, etc.) based on configuration. - Handle conversation history, few-shot examples, and custom type processing. The adapter pattern allows DSPy to work with different LM interfaces while maintaining a consistent programming model for users. """ def __init__( self, callbacks: list[BaseCallback] | None = None, use_native_function_calling: bool = False, native_response_types: list[type[Type]] | None = None, ): """ Args: callbacks: List of callback functions to execute during `format()` and `parse()` methods. Callbacks can be used for logging, monitoring, or custom processing. Defaults to None (empty list). use_native_function_calling: Whether to enable native function calling capabilities when the LM supports it. If True, the adapter will automatically configure function calling when input fields contain `dspy.Tool` or `list[dspy.Tool]` types. Defaults to False. native_response_types: List of output field types that should be handled by native LM features rather than adapter parsing. For example, `dspy.Citations` can be populated directly by citation APIs (e.g., Anthropic's citation feature). Defaults to `[Citations]`. 
""" self.callbacks = callbacks or [] self.use_native_function_calling = use_native_function_calling self.native_response_types = native_response_types or _DEFAULT_NATIVE_RESPONSE_TYPES def __init_subclass__(cls, **kwargs) -> None: super().__init_subclass__(**kwargs) # Decorate format() and parse() method with with_callbacks cls.format = with_callbacks(cls.format) cls.parse = with_callbacks(cls.parse) def _call_preprocess( self, lm: "LM", lm_kwargs: dict[str, Any], signature: type[Signature], inputs: dict[str, Any], ) -> type[Signature]: if self.use_native_function_calling: tool_call_input_field_name = self._get_tool_call_input_field_name(signature) tool_call_output_field_name = self._get_tool_call_output_field_name(signature) if tool_call_output_field_name and tool_call_input_field_name is None: raise ValueError( f"You provided an output field {tool_call_output_field_name} to receive the tool calls information, " "but did not provide any tools as the input. Please provide a list of tools as the input by adding an " "input field with type `list[dspy.Tool]`." ) if tool_call_output_field_name and litellm.supports_function_calling(model=lm.model): tools = inputs[tool_call_input_field_name] tools = tools if isinstance(tools, list) else [tools] litellm_tools = [] for tool in tools: litellm_tools.append(tool.format_as_litellm_function_call()) lm_kwargs["tools"] = litellm_tools signature_for_native_function_calling = signature.delete(tool_call_output_field_name) signature_for_native_function_calling = signature_for_native_function_calling.delete( tool_call_input_field_name ) return signature_for_native_function_calling # Handle custom types that use native response for name, field in signature.output_fields.items(): if ( isinstance(field.annotation, type) and issubclass(field.annotation, Type) and field.annotation in self.native_response_types ): signature = signature.delete(name) return signature def _call_postprocess( self, processed_signature: type[Signature], original_signature: type[Signature], outputs: list[dict[str, Any]], lm: "LM", ) -> list[dict[str, Any]]: values = [] tool_call_output_field_name = self._get_tool_call_output_field_name(original_signature) for output in outputs: output_logprobs = None tool_calls = None text = output if isinstance(output, dict): text = output["text"] output_logprobs = output.get("logprobs") tool_calls = output.get("tool_calls") if text: value = self.parse(processed_signature, text) for field_name in original_signature.output_fields.keys(): if field_name not in value: # We need to set the field not present in the processed signature to None for consistency. 
value[field_name] = None else: value = {} for field_name in original_signature.output_fields.keys(): value[field_name] = None if tool_calls and tool_call_output_field_name: tool_calls = [ { "name": v["function"]["name"], "args": json_repair.loads(v["function"]["arguments"]), } for v in tool_calls ] value[tool_call_output_field_name] = ToolCalls.from_dict_list(tool_calls) # Parse custom types that do not rely on the adapter parsing for name, field in original_signature.output_fields.items(): if ( isinstance(field.annotation, type) and issubclass(field.annotation, Type) and field.annotation in self.native_response_types ): value[name] = field.annotation.parse_lm_response(output) if output_logprobs: value["logprobs"] = output_logprobs values.append(value) return values def __call__( self, lm: "LM", lm_kwargs: dict[str, Any], signature: type[Signature], demos: list[dict[str, Any]], inputs: dict[str, Any], ) -> list[dict[str, Any]]: """ Execute the adapter pipeline: format inputs, call LM, and parse outputs. Args: lm: The Language Model instance to use for generation. Must be an instance of `dspy.BaseLM`. lm_kwargs: Additional keyword arguments to pass to the LM call (e.g., temperature, max_tokens). These are passed directly to the LM. signature: The DSPy signature associated with this LM call. demos: List of few-shot examples to include in the prompt. Each dictionary should contain keys matching the signature's input and output field names. Examples are formatted as user/assistant message pairs. inputs: The current input values for this call. Keys must match the signature's input field names. Returns: List of dictionaries representing parsed LM responses. Each dictionary contains keys matching the signature's output field names. For multiple generations (n > 1), returns multiple dictionaries. """ processed_signature = self._call_preprocess(lm, lm_kwargs, signature, inputs) inputs = self.format(processed_signature, demos, inputs) outputs = lm(messages=inputs, **lm_kwargs) return self._call_postprocess(processed_signature, signature, outputs, lm) async def acall( self, lm: "LM", lm_kwargs: dict[str, Any], signature: type[Signature], demos: list[dict[str, Any]], inputs: dict[str, Any], ) -> list[dict[str, Any]]: processed_signature = self._call_preprocess(lm, lm_kwargs, signature, inputs) inputs = self.format(processed_signature, demos, inputs) outputs = await lm.acall(messages=inputs, **lm_kwargs) return self._call_postprocess(processed_signature, signature, outputs, lm) def format( self, signature: type[Signature], demos: list[dict[str, Any]], inputs: dict[str, Any], ) -> list[dict[str, Any]]: """Format the input messages for the LM call. This method converts the DSPy structured input along with few-shot examples and conversation history into multiturn messages as expected by the LM. For custom adapters, this method can be overridden to customize the formatting of the input messages. In general, we recommend that the messages have the following structure: ``` [ {"role": "system", "content": system_message}, # Begin few-shot examples {"role": "user", "content": few_shot_example_1_input}, {"role": "assistant", "content": few_shot_example_1_output}, {"role": "user", "content": few_shot_example_2_input}, {"role": "assistant", "content": few_shot_example_2_output}, ... 
# End few-shot examples # Begin conversation history {"role": "user", "content": conversation_history_1_input}, {"role": "assistant", "content": conversation_history_1_output}, {"role": "user", "content": conversation_history_2_input}, {"role": "assistant", "content": conversation_history_2_output}, ... # End conversation history {"role": "user", "content": current_input}, ] And system message should contain the field description, field structure, and task description. ``` Args: signature: The DSPy signature for which to format the input messages. demos: A list of few-shot examples. inputs: The input arguments to the DSPy module. Returns: A list of multiturn messages as expected by the LM. """ inputs_copy = dict(inputs) # If the signature and inputs have conversation history, we need to format the conversation history and # remove the history field from the signature. history_field_name = self._get_history_field_name(signature) if history_field_name: # In order to format the conversation history, we need to remove the history field from the signature. signature_without_history = signature.delete(history_field_name) conversation_history = self.format_conversation_history( signature_without_history, history_field_name, inputs_copy, ) messages = [] system_message = ( f"{self.format_field_description(signature)}\n" f"{self.format_field_structure(signature)}\n" f"{self.format_task_description(signature)}" ) messages.append({"role": "system", "content": system_message}) messages.extend(self.format_demos(signature, demos)) if history_field_name: # Conversation history and current input content = self.format_user_message_content(signature_without_history, inputs_copy, main_request=True) messages.extend(conversation_history) messages.append({"role": "user", "content": content}) else: # Only current input content = self.format_user_message_content(signature, inputs_copy, main_request=True) messages.append({"role": "user", "content": content}) messages = split_message_content_for_custom_types(messages) return messages def format_field_description(self, signature: type[Signature]) -> str: """Format the field description for the system message. This method formats the field description for the system message. It should return a string that contains the field description for the input fields and the output fields. Args: signature: The DSPy signature for which to format the field description. Returns: A string that contains the field description for the input fields and the output fields. """ raise NotImplementedError def format_field_structure(self, signature: type[Signature]) -> str: """Format the field structure for the system message. This method formats the field structure for the system message. It should return a string that dictates the format the input fields should be provided to the LM, and the format the output fields will be in the response. Refer to the ChatAdapter and JsonAdapter for an example. Args: signature: The DSPy signature for which to format the field structure. """ raise NotImplementedError def format_task_description(self, signature: type[Signature]) -> str: """Format the task description for the system message. This method formats the task description for the system message. In most cases this is just a thin wrapper over `signature.instructions`. Args: signature: The DSPy signature of the DSpy module. Returns: A string that describes the task. 
""" raise NotImplementedError def format_user_message_content( self, signature: type[Signature], inputs: dict[str, Any], prefix: str = "", suffix: str = "", main_request: bool = False, ) -> str: """Format the user message content. This method formats the user message content, which can be used in formatting few-shot examples, conversation history, and the current input. Args: signature: The DSPy signature for which to format the user message content. inputs: The input arguments to the DSPy module. prefix: A prefix to the user message content. suffix: A suffix to the user message content. Returns: A string that contains the user message content. """ raise NotImplementedError def format_assistant_message_content( self, signature: type[Signature], outputs: dict[str, Any], missing_field_message: str | None = None, ) -> str: """Format the assistant message content. This method formats the assistant message content, which can be used in formatting few-shot examples, conversation history. Args: signature: The DSPy signature for which to format the assistant message content. outputs: The output fields to be formatted. missing_field_message: A message to be used when a field is missing. Returns: A string that contains the assistant message content. """ raise NotImplementedError def format_demos(self, signature: type[Signature], demos: list[dict[str, Any]]) -> list[dict[str, Any]]: """Format the few-shot examples. This method formats the few-shot examples as multiturn messages. Args: signature: The DSPy signature for which to format the few-shot examples. demos: A list of few-shot examples, each element is a dictionary with keys of the input and output fields of the signature. Returns: A list of multiturn messages. """ complete_demos = [] incomplete_demos = [] for demo in demos: # Check if all fields are present and not None is_complete = all(k in demo and demo[k] is not None for k in signature.fields) # Check if demo has at least one input and one output field has_input = any(k in demo for k in signature.input_fields) has_output = any(k in demo for k in signature.output_fields) if is_complete: complete_demos.append(demo) elif has_input and has_output: # We only keep incomplete demos that have at least one input and one output field incomplete_demos.append(demo) messages = [] incomplete_demo_prefix = "This is an example of the task, though some input or output fields are not supplied." for demo in incomplete_demos: messages.append( { "role": "user", "content": self.format_user_message_content(signature, demo, prefix=incomplete_demo_prefix), } ) messages.append( { "role": "assistant", "content": self.format_assistant_message_content( signature, demo, missing_field_message="Not supplied for this particular example. " ), } ) for demo in complete_demos: messages.append({"role": "user", "content": self.format_user_message_content(signature, demo)}) messages.append( { "role": "assistant", "content": self.format_assistant_message_content( signature, demo, missing_field_message="Not supplied for this conversation history message. 
" ), } ) return messages def _get_history_field_name(self, signature: type[Signature]) -> bool: for name, field in signature.input_fields.items(): if field.annotation == History: return name return None def _get_tool_call_input_field_name(self, signature: type[Signature]) -> bool: for name, field in signature.input_fields.items(): # Look for annotation `list[dspy.Tool]` or `dspy.Tool` origin = get_origin(field.annotation) if origin is list and field.annotation.__args__[0] == Tool: return name if field.annotation == Tool: return name return None def _get_tool_call_output_field_name(self, signature: type[Signature]) -> bool: for name, field in signature.output_fields.items(): if field.annotation == ToolCalls: return name return None def format_conversation_history( self, signature: type[Signature], history_field_name: str, inputs: dict[str, Any], ) -> list[dict[str, Any]]: """Format the conversation history. This method formats the conversation history and the current input as multiturn messages. Args: signature: The DSPy signature for which to format the conversation history. history_field_name: The name of the history field in the signature. inputs: The input arguments to the DSPy module. Returns: A list of multiturn messages. """ conversation_history = inputs[history_field_name].messages if history_field_name in inputs else None if conversation_history is None: return [] messages = [] for message in conversation_history: messages.append( { "role": "user", "content": self.format_user_message_content(signature, message), } ) messages.append( { "role": "assistant", "content": self.format_assistant_message_content(signature, message), } ) # Remove the history field from the inputs del inputs[history_field_name] return messages def parse(self, signature: type[Signature], completion: str) -> dict[str, Any]: """Parse the LM output into a dictionary of the output fields. This method parses the LM output into a dictionary of the output fields. Args: signature: The DSPy signature for which to parse the LM output. completion: The LM output to be parsed. Returns: A dictionary of the output fields. 
""" raise NotImplementedError ``` -------------------------------------------------------------------------------- /tests/signatures/test_signature.py: -------------------------------------------------------------------------------- ```python from types import UnionType from typing import Any, Optional, Union import pydantic import pytest import dspy from dspy import InputField, OutputField, Signature, infer_prefix from dspy.utils.dummies import DummyLM def test_field_types_and_custom_attributes(): class TestSignature(Signature): """Instructions""" input1: str = InputField() input2: int = InputField() output1: list[str] = OutputField() output2 = OutputField() assert TestSignature.instructions == "Instructions" assert TestSignature.input_fields["input1"].annotation == str assert TestSignature.input_fields["input2"].annotation == int assert TestSignature.output_fields["output1"].annotation == list[str] assert TestSignature.output_fields["output2"].annotation == str def test_no_input_output(): with pytest.raises(TypeError): class TestSignature(Signature): input1: str def test_no_input_output2(): with pytest.raises(TypeError): class TestSignature(Signature): input1: str = pydantic.Field() def test_all_fields_have_prefix(): class TestSignature(Signature): input = InputField(prefix="Modified:") output = OutputField() assert TestSignature.input_fields["input"].json_schema_extra["prefix"] == "Modified:" assert TestSignature.output_fields["output"].json_schema_extra["prefix"] == "Output:" def test_signature_parsing(): signature = Signature("input1, input2 -> output") assert "input1" in signature.input_fields assert "input2" in signature.input_fields assert "output" in signature.output_fields def test_with_signature(): signature1 = Signature("input1, input2 -> output") signature2 = signature1.with_instructions("This is a test") assert signature2.instructions == "This is a test" assert signature1 is not signature2, "The type should be immutable" def test_with_updated_field(): signature1 = Signature("input1, input2 -> output") signature2 = signature1.with_updated_fields("input1", prefix="Modified:") assert signature2.input_fields["input1"].json_schema_extra["prefix"] == "Modified:" assert signature1.input_fields["input1"].json_schema_extra["prefix"] == "Input 1:" assert signature1 is not signature2, "The type should be immutable" for key in signature1.fields.keys(): if key != "input1": assert signature1.fields[key].json_schema_extra == signature2.fields[key].json_schema_extra assert signature1.instructions == signature2.instructions def test_empty_signature(): with pytest.raises(ValueError): Signature("") def test_instructions_signature(): with pytest.raises(ValueError): Signature("") def test_signature_instructions(): sig1 = Signature("input1 -> output1", instructions="This is a test") assert sig1.instructions == "This is a test" sig2 = Signature("input1 -> output1", "This is a test") assert sig2.instructions == "This is a test" def test_signature_instructions_none(): sig1 = Signature("a, b -> c") assert sig1.instructions == "Given the fields `a`, `b`, produce the fields `c`." 
def test_signature_from_dict(): signature = Signature( {"input1": InputField(), "input2": InputField(), "output": OutputField()}) for k in ["input1", "input2", "output"]: assert k in signature.fields assert signature.fields[k].annotation == str def test_signature_equality(): sig1 = Signature("input1 -> output1") sig2 = Signature("input1 -> output1") assert sig1.equals(sig2) def test_signature_inequality(): sig1 = Signature("input1 -> output1") sig2 = Signature("input2 -> output2") assert not sig1.equals(sig2) def test_equality_format(): class TestSignature(Signature): input = InputField(format=lambda x: x) output = OutputField() assert TestSignature.equals(TestSignature) def test_signature_reverse(): sig = Signature("input1 -> output1") assert sig.signature == "input1 -> output1" def test_insert_field_at_various_positions(): class InitialSignature(Signature): input1: str = InputField() output1: int = OutputField() s1 = InitialSignature.prepend("new_input_start", InputField(), str) s2 = InitialSignature.append("new_input_end", InputField(), str) assert "new_input_start" == list(s1.input_fields.keys())[0] # noqa: RUF015 assert "new_input_end" == list(s2.input_fields.keys())[-1] s3 = InitialSignature.prepend("new_output_start", OutputField(), str) s4 = InitialSignature.append("new_output_end", OutputField(), str) assert "new_output_start" == list(s3.output_fields.keys())[0] # noqa: RUF015 assert "new_output_end" == list(s4.output_fields.keys())[-1] def test_order_preserved_with_mixed_annotations(): class ExampleSignature(dspy.Signature): text: str = dspy.InputField() output = dspy.OutputField() pass_evaluation: bool = dspy.OutputField() expected_order = ["text", "output", "pass_evaluation"] actual_order = list(ExampleSignature.fields.keys()) assert actual_order == expected_order def test_infer_prefix(): assert infer_prefix( "someAttributeName42IsCool") == "Some Attribute Name 42 Is Cool" assert infer_prefix("version2Update") == "Version 2 Update" assert infer_prefix("modelT45Enhanced") == "Model T 45 Enhanced" assert infer_prefix("someAttributeName") == "Some Attribute Name" assert infer_prefix("some_attribute_name") == "Some Attribute Name" assert infer_prefix("URLAddress") == "URL Address" assert infer_prefix("isHTTPSecure") == "Is HTTP Secure" assert infer_prefix("isHTTPSSecure123") == "Is HTTPS Secure 123" def test_insantiating(): sig = Signature("input -> output") assert issubclass(sig, Signature) assert sig.__name__ == "StringSignature" value = sig(input="test", output="test") assert isinstance(value, sig) def test_insantiating2(): class SubSignature(Signature): input = InputField() output = OutputField() assert issubclass(SubSignature, Signature) assert SubSignature.__name__ == "SubSignature" value = SubSignature(input="test", output="test") assert isinstance(value, SubSignature) def test_multiline_instructions(): lm = DummyLM([{"output": "short answer"}]) dspy.settings.configure(lm=lm) class MySignature(Signature): """First line Second line Third line""" output = OutputField() predictor = dspy.Predict(MySignature) assert predictor().output == "short answer" def test_dump_and_load_state(): class CustomSignature(dspy.Signature): """I am just an instruction.""" sentence = dspy.InputField(desc="I am an innocent input!") sentiment = dspy.OutputField() state = CustomSignature.dump_state() expected = { "instructions": "I am just an instruction.", "fields": [ { "prefix": "Sentence:", "description": "I am an innocent input!", }, { "prefix": "Sentiment:", "description": "${sentiment}", }, ], } 
assert state == expected class CustomSignature2(dspy.Signature): """I am a malicious instruction.""" sentence = dspy.InputField(desc="I am an malicious input!") sentiment = dspy.OutputField() assert CustomSignature2.dump_state() != expected # Overwrite the state with the state of CustomSignature. loaded_signature = CustomSignature2.load_state(state) assert loaded_signature.instructions == "I am just an instruction." # After `load_state`, the state should be the same as CustomSignature. assert loaded_signature.dump_state() == expected # CustomSignature2 should not have been modified. assert CustomSignature2.instructions == "I am a malicious instruction." assert CustomSignature2.fields["sentence"].json_schema_extra["desc"] == "I am an malicious input!" assert CustomSignature2.fields["sentiment"].json_schema_extra["prefix"] == "Sentiment:" def test_typed_signatures_basic_types(): sig = Signature("input1: int, input2: str -> output: float") assert "input1" in sig.input_fields assert sig.input_fields["input1"].annotation == int assert "input2" in sig.input_fields assert sig.input_fields["input2"].annotation == str assert "output" in sig.output_fields assert sig.output_fields["output"].annotation == float def test_typed_signatures_generics(): sig = Signature( "input_list: list[int], input_dict: dict[str, float] -> output_tuple: tuple[str, int]") assert "input_list" in sig.input_fields assert sig.input_fields["input_list"].annotation == list[int] assert "input_dict" in sig.input_fields assert sig.input_fields["input_dict"].annotation == dict[str, float] assert "output_tuple" in sig.output_fields assert sig.output_fields["output_tuple"].annotation == tuple[str, int] def test_typed_signatures_unions_and_optionals(): sig = Signature( "input_opt: Optional[str], input_union: Union[int, None] -> output_union: Union[int, str]") assert "input_opt" in sig.input_fields # Optional[str] is actually Union[str, None] # Depending on the environment, it might resolve to Union[str, None] or Optional[str], either is correct. 
# We'll just check for a Union containing str and NoneType: input_opt_annotation = sig.input_fields["input_opt"].annotation assert input_opt_annotation == Optional[str] or ( getattr(input_opt_annotation, "__origin__", None) is Union and str in input_opt_annotation.__args__ and type(None) in input_opt_annotation.__args__ ) assert "input_union" in sig.input_fields input_union_annotation = sig.input_fields["input_union"].annotation assert ( getattr(input_union_annotation, "__origin__", None) is Union and int in input_union_annotation.__args__ and type(None) in input_union_annotation.__args__ ) assert "output_union" in sig.output_fields output_union_annotation = sig.output_fields["output_union"].annotation assert ( getattr(output_union_annotation, "__origin__", None) is Union and int in output_union_annotation.__args__ and str in output_union_annotation.__args__ ) def test_typed_signatures_any(): sig = Signature("input_any: Any -> output_any: Any") assert "input_any" in sig.input_fields assert sig.input_fields["input_any"].annotation == Any assert "output_any" in sig.output_fields assert sig.output_fields["output_any"].annotation == Any def test_typed_signatures_nested(): sig = Signature( "input_nested: list[Union[str, int]] -> output_nested: Tuple[int, Optional[float], list[str]]") input_nested_ann = sig.input_fields["input_nested"].annotation assert getattr(input_nested_ann, "__origin__", None) is list assert len(input_nested_ann.__args__) == 1 union_arg = input_nested_ann.__args__[0] assert getattr(union_arg, "__origin__", None) is Union assert str in union_arg.__args__ and int in union_arg.__args__ output_nested_ann = sig.output_fields["output_nested"].annotation assert getattr(output_nested_ann, "__origin__", None) is tuple assert output_nested_ann.__args__[0] == int # The second arg is Optional[float], which is Union[float, None] second_arg = output_nested_ann.__args__[1] assert getattr(second_arg, "__origin__", None) is Union assert float in second_arg.__args__ and type(None) in second_arg.__args__ # The third arg is list[str] third_arg = output_nested_ann.__args__[2] assert getattr(third_arg, "__origin__", None) is list assert third_arg.__args__[0] == str def test_typed_signatures_from_dict(): fields = { "input_str_list": (list[str], InputField()), "input_dict_int": (dict[str, int], InputField()), "output_tup": (tuple[int, float], OutputField()), } sig = Signature(fields) assert "input_str_list" in sig.input_fields assert sig.input_fields["input_str_list"].annotation == list[str] assert "input_dict_int" in sig.input_fields assert sig.input_fields["input_dict_int"].annotation == dict[str, int] assert "output_tup" in sig.output_fields assert sig.output_fields["output_tup"].annotation == tuple[int, float] def test_typed_signatures_complex_combinations(): sig = Signature( "input_complex: dict[str, list[Optional[Tuple[int, str]]]] -> output_complex: Union[list[str], dict[str, Any]]" ) input_complex_ann = sig.input_fields["input_complex"].annotation assert getattr(input_complex_ann, "__origin__", None) is dict key_arg, value_arg = input_complex_ann.__args__ assert key_arg == str # value_arg: list[Optional[Tuple[int, str]]] assert getattr(value_arg, "__origin__", None) is list inner_union = value_arg.__args__[0] # inner_union should be Optional[Tuple[int, str]] # which is Union[Tuple[int, str], None] assert getattr(inner_union, "__origin__", None) is Union tuple_type = [t for t in inner_union.__args__ if t != type(None)][0] # noqa: RUF015 assert getattr(tuple_type, "__origin__", None) is 
tuple assert tuple_type.__args__ == (int, str) output_complex_ann = sig.output_fields["output_complex"].annotation assert getattr(output_complex_ann, "__origin__", None) is Union assert len(output_complex_ann.__args__) == 2 possible_args = set(output_complex_ann.__args__) # Expecting list[str] and dict[str, Any] # Because sets don't preserve order, just check membership. # Find the list[str] arg list_arg = next(a for a in possible_args if getattr( a, "__origin__", None) is list) dict_arg = next(a for a in possible_args if getattr( a, "__origin__", None) is dict) assert list_arg.__args__ == (str,) k, v = dict_arg.__args__ assert k == str and v == Any def test_make_signature_from_string(): sig = Signature( "input1: int, input2: dict[str, int] -> output1: list[str], output2: Union[int, str]") assert "input1" in sig.input_fields assert sig.input_fields["input1"].annotation == int assert "input2" in sig.input_fields assert sig.input_fields["input2"].annotation == dict[str, int] assert "output1" in sig.output_fields assert sig.output_fields["output1"].annotation == list[str] assert "output2" in sig.output_fields assert sig.output_fields["output2"].annotation == Union[int, str] def test_signature_field_with_constraints(): class MySignature(Signature): inputs: str = InputField() outputs1: str = OutputField(min_length=5, max_length=10) outputs2: int = OutputField(ge=5, le=10) assert "outputs1" in MySignature.output_fields output1_constraints = MySignature.output_fields["outputs1"].json_schema_extra["constraints"] assert "minimum length: 5" in output1_constraints assert "maximum length: 10" in output1_constraints assert "outputs2" in MySignature.output_fields output2_constraints = MySignature.output_fields["outputs2"].json_schema_extra["constraints"] assert "greater than or equal to: 5" in output2_constraints assert "less than or equal to: 10" in output2_constraints def test_basic_custom_type(): class CustomType(pydantic.BaseModel): value: str test_signature = dspy.Signature( "input: CustomType -> output: str", custom_types={"CustomType": CustomType} ) assert test_signature.input_fields["input"].annotation == CustomType lm = DummyLM([{"output": "processed"}]) dspy.settings.configure(lm=lm) custom_obj = CustomType(value="test") pred = dspy.Predict(test_signature)(input=custom_obj) assert pred.output == "processed" def test_custom_type_from_different_module(): from pathlib import Path test_signature = dspy.Signature("input: Path -> output: str") assert test_signature.input_fields["input"].annotation == Path lm = DummyLM([{"output": "/test/path"}]) dspy.settings.configure(lm=lm) path_obj = Path("/test/path") pred = dspy.Predict(test_signature)(input=path_obj) assert pred.output == "/test/path" def test_pep604_union_type_inline(): sig = Signature( "input1: str | None, input2: None | int -> output_union: int | str" ) # input1 and input2 test that both 'T | None' and 'None | T' are interpreted as Optional types, # regardless of the order of None in the union expression. 
assert "input1" in sig.input_fields input1_annotation = sig.input_fields["input1"].annotation assert input1_annotation == Optional[str] or ( getattr(input1_annotation, "__origin__", None) is Union and str in input1_annotation.__args__ and type(None) in input1_annotation.__args__ ) assert "input2" in sig.input_fields input2_annotation = sig.input_fields["input2"].annotation assert input2_annotation == Optional[int] or ( getattr(input2_annotation, "__origin__", None) is Union and int in input2_annotation.__args__ and type(None) in input2_annotation.__args__ ) assert "output_union" in sig.output_fields output_union_annotation = sig.output_fields["output_union"].annotation assert ( getattr(output_union_annotation, "__origin__", None) is Union and int in output_union_annotation.__args__ and str in output_union_annotation.__args__ ) def test_pep604_union_type_inline_equivalence(): sig1 = Signature("input: str | None -> output: int | str") sig2 = Signature("input: Optional[str] -> output: Union[int, str]") # PEP 604 union types in inline signatures should be equivalent to Optional and Union types assert sig1.equals(sig2) # Check that the annotations are equivalent assert sig1.input_fields["input"].annotation == sig2.input_fields["input"].annotation assert sig1.output_fields["output"].annotation == sig2.output_fields["output"].annotation def test_pep604_union_type_inline_nested(): sig = Signature( "input: str | (int | float) | None -> output: str" ) assert "input" in sig.input_fields input_annotation = sig.input_fields["input"].annotation # Check for the correct union: Union[str, int, float, NoneType] assert getattr(input_annotation, "__origin__", None) is Union assert set(input_annotation.__args__) == {str, int, float, type(None)} def test_pep604_union_type_class_nested(): class Sig1(Signature): input: str | (int | float) | None = InputField() output: str = OutputField() assert "input" in Sig1.input_fields input_annotation = Sig1.input_fields["input"].annotation # Check for the correct union: UnionType[str, int, float, NoneType] assert isinstance(input_annotation, UnionType) assert set(input_annotation.__args__) == {str, int, float, type(None)} def test_pep604_union_type_class_equivalence(): class Sig1(Signature): input: str | None = InputField() output: int | str = OutputField() class Sig2(Signature): input: str | None = InputField() output: Union[int, str] = OutputField() # noqa: UP007 # PEP 604 union types in class signatures should be equivalent to Optional and Union types assert Sig1.equals(Sig2) # Check that the annotations are equivalent assert Sig1.input_fields["input"].annotation == Sig2.input_fields["input"].annotation assert Sig1.output_fields["output"].annotation == Sig2.output_fields["output"].annotation # Check that the pep604 annotations are of type UnionType assert isinstance(Sig1.input_fields["input"].annotation, UnionType) assert isinstance(Sig1.output_fields["output"].annotation, UnionType) def test_pep604_union_type_insert(): class PEP604Signature(Signature): input: str | None = InputField() output: int | str = OutputField() # This test ensures that inserting a field into a signature with a PEP 604 UnionType works # Insert a new input field at the start NewSig = PEP604Signature.prepend("new_input", InputField(), float | int) assert "new_input" in NewSig.input_fields new_input_annotation = NewSig.input_fields["new_input"].annotation assert isinstance(new_input_annotation, UnionType) assert set(new_input_annotation.__args__) == {float, int} # The original union type field should 
still be present and correct input_annotation = NewSig.input_fields["input"].annotation output_annotation = NewSig.output_fields["output"].annotation assert isinstance(input_annotation, UnionType) assert str in input_annotation.__args__ and type(None) in input_annotation.__args__ assert isinstance(output_annotation, UnionType) assert set(output_annotation.__args__) == {int, str} def test_pep604_union_type_with_custom_types(): class CustomType(pydantic.BaseModel): value: str sig = Signature( "input: CustomType | None -> output: int | str", custom_types={"CustomType": CustomType} ) assert sig.input_fields["input"].annotation == Union[CustomType, None] assert sig.output_fields["output"].annotation == Union[int, str] lm = DummyLM([{"output": "processed"}]) dspy.settings.configure(lm=lm) custom_obj = CustomType(value="test") pred = dspy.Predict(sig)(input=custom_obj) assert pred.output == "processed" ``` -------------------------------------------------------------------------------- /dspy/clients/lm_local_arbor.py: -------------------------------------------------------------------------------- ```python import json import time from datetime import datetime from typing import TYPE_CHECKING, Any from urllib.parse import urljoin import openai import requests import dspy from dspy.clients.provider import Provider, ReinforceJob, TrainingJob from dspy.clients.utils_finetune import GRPOGroup, GRPOStatus, TrainDataFormat, TrainingStatus, save_data if TYPE_CHECKING: from dspy.clients.lm import LM class ArborTrainingJob(TrainingJob): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.provider_file_id = None self.provider_job_id = None def cancel(self): if ArborProvider.does_job_exist(self.provider_job_id): status = self.status() if ArborProvider.is_terminal_training_status(status): err_msg = "Jobs that are complete cannot be canceled." err_msg += f" Job with ID {self.provider_job_id} is done." 
raise Exception(err_msg) openai.fine_tuning.jobs.cancel(self.provider_job_id) self.provider_job_id = None if self.provider_file_id is not None: if ArborProvider.does_file_exist(self.provider_file_id): openai.files.delete(self.provider_file_id) self.provider_file_id = None super().cancel() def status(self) -> TrainingStatus: status = ArborProvider.get_training_status(self.provider_job_id) return status class ArborReinforceJob(ReinforceJob): DEFAULT_TRAIN_KWARGS = { # noqa: RUF012 "temperature": 1.0, "beta": 0.04, "num_iterations": 1, "per_device_train_batch_size": 8, "learning_rate": 1e-6, "gradient_accumulation_steps": 1, # This is false by default in TRL, but I think it makes sense to be true for us "gradient_checkpointing": True, "lr_scheduler_type": "constant_with_warmup", "warmup_steps": 10, "max_prompt_length": None, "max_completion_length": None, "gradient_checkpointing_kwargs": None, "bf16": False, "scale_rewards": True, "max_grad_norm": 1.0, "report_to": "none", "log_completions": True, "logging_steps": 10, # By default, none is the model's max context length "max_context_length": None, "lora": False, } def __init__(self, lm: "LM", train_kwargs: dict[str, Any]): # The teleprompter must ensure that this is set if "num_generations" not in train_kwargs: raise ValueError("num_generations must be set in the training kwargs") self.lm = lm self.train_kwargs = train_kwargs self.provider_job_id = None self.checkpoints = {} self.last_checkpoint = None def initialize(self): # TODO(GRPO Team): Set provider job ID num_generations = self.train_kwargs.get("num_generations") temperature = self.train_kwargs.get("temperature", self.DEFAULT_TRAIN_KWARGS["temperature"]) beta = self.train_kwargs.get("beta", self.DEFAULT_TRAIN_KWARGS["beta"]) num_iterations = self.train_kwargs.get("num_iterations", self.DEFAULT_TRAIN_KWARGS["num_iterations"]) per_device_train_batch_size = self.train_kwargs.get( "per_device_train_batch_size", self.DEFAULT_TRAIN_KWARGS["per_device_train_batch_size"] ) learning_rate = self.train_kwargs.get("learning_rate", self.DEFAULT_TRAIN_KWARGS["learning_rate"]) gradient_accumulation_steps = self.train_kwargs.get( "gradient_accumulation_steps", self.DEFAULT_TRAIN_KWARGS["gradient_accumulation_steps"] ) gradient_checkpointing = self.train_kwargs.get( "gradient_checkpointing", self.DEFAULT_TRAIN_KWARGS["gradient_checkpointing"] ) lr_scheduler_type = self.train_kwargs.get("lr_scheduler_type", self.DEFAULT_TRAIN_KWARGS["lr_scheduler_type"]) warmup_steps = self.train_kwargs.get("warmup_steps", self.DEFAULT_TRAIN_KWARGS["warmup_steps"]) max_prompt_length = self.train_kwargs.get("max_prompt_length", self.DEFAULT_TRAIN_KWARGS["max_prompt_length"]) max_completion_length = self.train_kwargs.get( "max_completion_length", self.DEFAULT_TRAIN_KWARGS["max_completion_length"] ) bf16 = self.train_kwargs.get("bf16", self.DEFAULT_TRAIN_KWARGS["bf16"]) scale_rewards = self.train_kwargs.get("scale_rewards", self.DEFAULT_TRAIN_KWARGS["scale_rewards"]) gradient_checkpointing_kwargs = self.train_kwargs.get( "gradient_checkpointing_kwargs", self.DEFAULT_TRAIN_KWARGS["gradient_checkpointing_kwargs"] ) max_grad_norm = self.train_kwargs.get("max_grad_norm", self.DEFAULT_TRAIN_KWARGS["max_grad_norm"]) report_to = self.train_kwargs.get("report_to", self.DEFAULT_TRAIN_KWARGS["report_to"]) log_completions = self.train_kwargs.get("log_completions", self.DEFAULT_TRAIN_KWARGS["log_completions"]) logging_steps = self.train_kwargs.get("logging_steps", self.DEFAULT_TRAIN_KWARGS["logging_steps"]) max_context_length = 
self.train_kwargs.get( "max_context_length", self.DEFAULT_TRAIN_KWARGS["max_context_length"] ) max_steps = self.train_kwargs.get("max_steps",500) # lora = self.train_kwargs.get("lora", self.DEFAULT_TRAIN_KWARGS["lora"]) api_base = self.lm.kwargs["api_base"] finetune_model = ArborProvider._remove_provider_prefix(self.lm.model) data = { "model": finetune_model, "trainer_config": { "num_generations": num_generations, "temperature": temperature, "beta": beta, "per_device_train_batch_size": per_device_train_batch_size, "learning_rate": learning_rate, "gradient_accumulation_steps": gradient_accumulation_steps, "gradient_checkpointing": gradient_checkpointing, "lr_scheduler_type": lr_scheduler_type, "warmup_steps": warmup_steps, "max_prompt_length": max_prompt_length, "max_completion_length": max_completion_length, "bf16": bf16, "scale_rewards": scale_rewards, "gradient_checkpointing_kwargs": gradient_checkpointing_kwargs, "max_grad_norm": max_grad_norm, "report_to": report_to, "log_completions": log_completions, "logging_steps": logging_steps, # "max_context_length": max_context_length, # "max_seq_len": max_context_length, "max_steps": max_steps, # "lora": lora, }, "inference_config": { "model": finetune_model, "max_context_length": max_context_length, }, "gpu_config": { "type": "multi", "multi": { "num_inference_gpus": 1, "num_training_gpus": 1, }, }, } url = f"{api_base}fine_tuning/grpo/initialize" headers = {"Content-Type": "application/json"} response = requests.post(url=url, headers=headers, json=data) print(json.dumps(response.json(), indent=2)) response.raise_for_status() response = response.json() self.lm.model = ArborProvider._add_provider_prefix(response["current_model"]) self.provider_job_id = response.get("job_id") def _run_grpo_step_one_group( self, train_group: GRPOGroup, train_data_format: TrainDataFormat | str | None = None ): # TODO: Check that the data follows the intended format api_base = self.lm.kwargs["api_base"] # api_key = self.lm.kwargs["api_key"] finetune_model = ArborProvider._remove_provider_prefix(self.lm.model) data = {"job_id": self.provider_job_id, "model": finetune_model, "batch": train_group["group"], "batch_id": train_group["batch_id"]} url = urljoin(api_base, "fine_tuning/grpo/step") headers = {"Content-Type": "application/json"} response = requests.post(url, headers=headers, json=data) assert response.status_code == 200, f"Failed to run a GRPO step: {response.text}" response = response.json() assert "current_model" in response, f"Response does not contain the next model ID to be used: {response}" current_model = response["current_model"] self.lm.model = ArborProvider._add_provider_prefix(current_model) def step(self, train_data: list[GRPOGroup], train_data_format: TrainDataFormat | str | None): # Note: TrainDataFormat specifies the format for the inner most dict. # Because we run GRPO at the group level, train_data will be a list of # groups, where each group is a list of GRPOChatData. Our teleprompters # ensure that we pass the right data format. # We can consider making this distinction clearer, e.g., by having two # different step methods or changing our smallets data format to be the # GRPO group. # TODO: Support step on the server side assert ( train_data_format == TrainDataFormat.GRPO_CHAT ), f"GRPO only supports the GRPO_CHAT data format. Got {train_data_format} instead." 
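        # Illustrative shape of `train_data`, inferred from the keys this class
        # actually reads in `_run_grpo_step_one_group` ("batch_id" and "group");
        # the exact fields inside each GRPOChatData entry are defined in
        # dspy.clients.utils_finetune and are not spelled out here:
        #
        # train_data = [
        #     {"batch_id": 0, "group": [<GRPOChatData>, <GRPOChatData>, ...]},
        #     {"batch_id": 1, "group": [<GRPOChatData>, ...]},
        # ]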
for group in train_data: self._run_grpo_step_one_group(group, train_data_format) def get_status(self) -> GRPOStatus: api_base = self.lm.kwargs["api_base"] url = f"{api_base}fine_tuning/grpo/status" headers = {"Content-Type": "application/json"} body = {"job_id": self.provider_job_id} response = requests.post(url, headers=headers, json=body) assert response.status_code == 200, f"Failed to get GRPO status: {response.text}" return GRPOStatus(**response.json()) def save_checkpoint(self, checkpoint_name: str, score: float | None = None): api_base = self.lm.kwargs["api_base"] url = urljoin(api_base, f"fine_tuning/grpo/{self.provider_job_id}/checkpoint") headers = {"Content-Type": "application/json"} body = {"job_id": self.provider_job_id, "checkpoint_name": checkpoint_name} response = requests.post(url, headers=headers, json=body) assert response.status_code == 200, f"Failed to save checkpoint: {response.text}" response = response.json() last_checkpoint = response["last_checkpoint"] checkpoints = response["checkpoints"] checkpoint_model_path = checkpoints[last_checkpoint] self.checkpoints[last_checkpoint] = { "model_path": checkpoint_model_path, "score": score, } self.last_checkpoint = last_checkpoint def terminate(self): api_base = self.lm.kwargs["api_base"] url = urljoin(api_base, f"fine_tuning/grpo/{self.provider_job_id}/terminate") headers = {"Content-Type": "application/json"} body = {"job_id": self.provider_job_id} response = requests.post(url, headers=headers, json=body) assert response.status_code == 200, f"Failed to terminate GRPO: {response.text}" response = response.json() current_model = response["current_model"] self.lm.model = ArborProvider._add_provider_prefix(current_model) def cancel(self): if ArborProvider.does_job_exist(self.provider_job_id): status = self.status() if ArborProvider.is_terminal_training_status(status): err_msg = "Jobs that are complete cannot be canceled." err_msg += f" Job with ID {self.provider_job_id} is done." raise Exception(err_msg) openai.fine_tuning.jobs.cancel(self.provider_job_id) self.provider_job_id = None def status(self) -> TrainingStatus: status = ArborProvider.get_training_status(self.provider_job_id) return status class ArborProvider(Provider): def __init__(self): super().__init__() self.finetunable = True self.reinforceable = True self.TrainingJob = ArborTrainingJob self.ReinforceJob = ArborReinforceJob @staticmethod def launch(lm: "LM", launch_kwargs: dict[str, Any] | None = None): model = ArborProvider._remove_provider_prefix(lm.model) api_base = lm.kwargs["api_base"] launch_kwargs = launch_kwargs or lm.launch_kwargs # Make request to launch endpoint response = requests.post(urljoin(api_base, "chat/launch"), json={"model": model, "launch_kwargs": launch_kwargs}) if response.status_code != 200: raise Exception(f"Failed to launch model. Status code: {response.status_code}, Response: {response.text}") print(f"Inference server for model {model} launched successfully") @staticmethod def kill(lm: "LM", launch_kwargs: dict[str, Any] | None = None): api_base = lm.kwargs["api_base"] response = requests.post( urljoin(api_base, "chat/kill"), ) if response.status_code != 200: raise Exception(f"Failed to kill model. 
Status code: {response.status_code}, Response: {response.text}") print("Inference killed successfully") @staticmethod def _remove_provider_prefix(model: str) -> str: if model.startswith("openai/"): model = model[7:] if model.startswith("arbor:"): model = model[6:] return model @staticmethod def _add_provider_prefix(model: str) -> str: if not model.startswith("openai/arbor:"): model = "openai/arbor:" + model return model @staticmethod def _get_arbor_base_api(): # TODO: We will delete this method once we start passing the LM object # to finetune. import dspy.settings as settings if not hasattr(settings, "arbor_api_base"): raise ValueError( "Arbor API base not set. Please set the `dspy.settings.arbor_api_base` to the URL for the Arbor server (e.g. 'http://localhost:8000/v1/')." ) return dspy.settings.arbor_api_base @staticmethod def finetune( job: ArborTrainingJob, model: str, train_data: list[dict[str, Any]], train_data_format: TrainDataFormat | None, train_kwargs: dict[str, Any] | None = None, ) -> str: # TODO: We want to re-factor finetune so that it takes in an LM. # Until then, we use the following to get the api information. The # following is a dummy call to ensure that dspy.settings.arbor_base_api # is set. ArborProvider._get_arbor_base_api() model = ArborProvider._remove_provider_prefix(model) print("[Arbor Provider] Validating the data format") ArborProvider.validate_data_format(train_data_format) print("[Arbor Provider] Saving the data to a file") data_path = save_data(train_data) print(f"[Arbor Provider] Data saved to {data_path}") print("[Arbor Provider] Uploading the data to the provider") provider_file_id = ArborProvider.upload_data(data_path) job.provider_file_id = provider_file_id print("[Arbor Provider] Starting remote training") provider_job_id = ArborProvider._start_remote_training( train_file_id=job.provider_file_id, model=model, train_kwargs=train_kwargs, ) job.provider_job_id = provider_job_id print(f"[Arbor Provider] Job started with the Arbor Job ID {provider_job_id}") print("[Arbor Provider] Waiting for training to complete") ArborProvider.wait_for_job(job, train_kwargs) print("[Arbor Provider] Attempting to retrieve the trained model") model = ArborProvider.get_trained_model(job) print(f"[Arbor Provider] Model retrieved: {model}") return ArborProvider._add_provider_prefix(model) @staticmethod def does_job_exist(job_id: str, training_kwargs: dict[str, Any]) -> bool: try: original_base_url = openai.base_url openai.base_url = ArborProvider._get_arbor_base_api() openai.fine_tuning.jobs.retrieve(job_id) openai.base_url = original_base_url return True except Exception: return False @staticmethod def does_file_exist(file_id: str, training_kwargs: dict[str, Any]) -> bool: try: original_base_url = openai.base_url openai.base_url = ArborProvider._get_arbor_base_api() openai.files.retrieve(file_id) openai.base_url = original_base_url return True except Exception: return False @staticmethod def is_terminal_training_status(status: TrainingStatus) -> bool: return status in [ TrainingStatus.succeeded, TrainingStatus.failed, TrainingStatus.cancelled, ] @staticmethod def get_training_status(job_id: str, training_kwargs: dict[str, Any]) -> TrainingStatus: provider_status_to_training_status = { "validating_files": TrainingStatus.pending, "queued": TrainingStatus.pending, "running": TrainingStatus.running, "succeeded": TrainingStatus.succeeded, "failed": TrainingStatus.failed, "cancelled": TrainingStatus.cancelled, "pending": TrainingStatus.pending, "pending_pause": 
TrainingStatus.pending, "pending_resume": TrainingStatus.pending, "paused": TrainingStatus.pending, "pending_cancel": TrainingStatus.pending, } if job_id is None: print("There is no active job.") return TrainingStatus.not_started err_msg = f"Job with ID {job_id} does not exist." assert ArborProvider.does_job_exist(job_id, training_kwargs), err_msg original_base_url = openai.base_url openai.base_url = ArborProvider._get_arbor_base_api() provider_job = openai.fine_tuning.jobs.retrieve(job_id) openai.base_url = original_base_url provider_status = provider_job.status status = provider_status_to_training_status[provider_status] return status @staticmethod def validate_data_format(data_format: TrainDataFormat): supported_data_formats = [ TrainDataFormat.CHAT, TrainDataFormat.COMPLETION, TrainDataFormat.GRPO_CHAT, ] if data_format not in supported_data_formats: err_msg = f"Arbor does not support the data format {data_format}." raise ValueError(err_msg) @staticmethod def upload_data(data_path: str, training_kwargs: dict[str, Any]) -> str: original_base_url = openai.base_url openai.base_url = ArborProvider._get_arbor_base_api() provider_file = openai.files.create( file=open(data_path, "rb"), purpose="fine-tune", ) openai.base_url = original_base_url return provider_file.id @staticmethod def _start_remote_training(train_file_id: str, model: str, train_kwargs: dict[str, Any]) -> str: train_kwargs = train_kwargs or {} original_base_url = openai.base_url openai.base_url = ArborProvider._get_arbor_base_api() provider_job = openai.fine_tuning.jobs.create( model=model, training_file=train_file_id, hyperparameters=train_kwargs, ) openai.base_url = original_base_url return provider_job.id @staticmethod def wait_for_job( job: TrainingJob, training_kwargs: dict[str, Any], poll_frequency: int = 20, ): done = False cur_event_id = None reported_estimated_time = False while not done: # Report estimated time if not already reported if not reported_estimated_time: original_base_url = openai.base_url openai.base_url = ArborProvider._get_arbor_base_api() remote_job = openai.fine_tuning.jobs.retrieve(job.provider_job_id) openai.base_url = original_base_url timestamp = remote_job.estimated_finish if timestamp: estimated_finish_dt = datetime.fromtimestamp(timestamp) delta_dt = estimated_finish_dt - datetime.now() print(f"[Arbor Provider] The Arbor estimated time remaining is: {delta_dt}") reported_estimated_time = True # Get new events original_base_url = openai.base_url openai.base_url = ArborProvider._get_arbor_base_api() page = openai.fine_tuning.jobs.list_events(fine_tuning_job_id=job.provider_job_id, limit=1) openai.base_url = original_base_url new_event = page.data[0] if page.data else None if new_event and new_event.id != cur_event_id: dt = datetime.fromtimestamp(new_event.created_at) print(f"[Arbor Provider] {dt} {new_event.message}") cur_event_id = new_event.id # Sleep and update the flag time.sleep(poll_frequency) done = ArborProvider.is_terminal_training_status(job.status()) @staticmethod def get_trained_model(job, training_kwargs: dict[str, Any]): status = job.status() if status != TrainingStatus.succeeded: err_msg = f"Job status is {status}." err_msg += f" Must be {TrainingStatus.succeeded} to retrieve model." 
raise Exception(err_msg) original_base_url = openai.base_url openai.base_url = ArborProvider._get_arbor_base_api() provider_job = openai.fine_tuning.jobs.retrieve(job.provider_job_id) openai.base_url = original_base_url finetuned_model = provider_job.fine_tuned_model return finetuned_model ``` -------------------------------------------------------------------------------- /tests/adapters/test_chat_adapter.py: -------------------------------------------------------------------------------- ```python from typing import Literal from unittest import mock import pydantic import pytest from litellm.utils import ChatCompletionMessageToolCall, Choices, Function, Message, ModelResponse import dspy @pytest.mark.parametrize( "input_literal, output_literal, input_value, expected_input_str, expected_output_str", [ # Scenario 1: double quotes escaped within strings ( Literal["one", "two", 'three"'], Literal["four", "five", 'six"'], "two", "Literal['one', 'two', 'three\"']", "Literal['four', 'five', 'six\"']", ), # Scenario 2: Single quotes inside strings ( Literal["she's here", "okay", "test"], Literal["done", "maybe'soon", "later"], "she's here", "Literal[\"she's here\", 'okay', 'test']", "Literal['done', \"maybe'soon\", 'later']", ), # Scenario 3: Strings containing both single and double quotes ( Literal["both\"and'", "another"], Literal["yet\"another'", "plain"], "another", "Literal['both\"and\\'', 'another']", "Literal['yet\"another\\'', 'plain']", ), # Scenario 4: No quotes at all (check the default) ( Literal["foo", "bar"], Literal["baz", "qux"], "foo", "Literal['foo', 'bar']", "Literal['baz', 'qux']", ), # Scenario 5: Mixed types ( Literal[1, "bar"], Literal[True, 3, "foo"], "bar", "Literal[1, 'bar']", "Literal[True, 3, 'foo']", ), ], ) def test_chat_adapter_quotes_literals_as_expected( input_literal, output_literal, input_value, expected_input_str, expected_output_str ): """ This test verifies that when we declare Literal fields with various mixes of single/double quotes, the generated content string includes those Literals exactly as we want them to appear (like IPython does). """ class TestSignature(dspy.Signature): input_text: input_literal = dspy.InputField() output_text: output_literal = dspy.OutputField() program = dspy.Predict(TestSignature) dspy.configure(lm=dspy.LM(model="openai/gpt-4o"), adapter=dspy.ChatAdapter()) with mock.patch("litellm.completion") as mock_completion: program(input_text=input_value) mock_completion.assert_called_once() _, call_kwargs = mock_completion.call_args content = call_kwargs["messages"][0]["content"] assert expected_input_str in content assert expected_output_str in content def test_chat_adapter_sync_call(): signature = dspy.make_signature("question->answer") adapter = dspy.ChatAdapter() lm = dspy.utils.DummyLM([{"answer": "Paris"}]) result = adapter(lm, {}, signature, [], {"question": "What is the capital of France?"}) assert result == [{"answer": "Paris"}] @pytest.mark.asyncio async def test_chat_adapter_async_call(): signature = dspy.make_signature("question->answer") adapter = dspy.ChatAdapter() lm = dspy.utils.DummyLM([{"answer": "Paris"}]) result = await adapter.acall(lm, {}, signature, [], {"question": "What is the capital of France?"}) assert result == [{"answer": "Paris"}] def test_chat_adapter_with_pydantic_models(): """ This test verifies that ChatAdapter can handle different input and output field types, both basic and nested. 
""" class DogClass(pydantic.BaseModel): dog_breeds: list[str] = pydantic.Field(description="List of the breeds of dogs") num_dogs: int = pydantic.Field(description="Number of dogs the owner has", ge=0, le=10) class PetOwner(pydantic.BaseModel): name: str = pydantic.Field(description="Name of the owner") num_pets: int = pydantic.Field(description="Amount of pets the owner has", ge=0, le=100) dogs: DogClass = pydantic.Field(description="Nested Pydantic class with dog specific information ") class Answer(pydantic.BaseModel): result: str analysis: str class TestSignature(dspy.Signature): owner: PetOwner = dspy.InputField() question: str = dspy.InputField() output: Answer = dspy.OutputField() dspy.configure(lm=dspy.LM(model="openai/gpt-4o"), adapter=dspy.ChatAdapter()) program = dspy.Predict(TestSignature) with mock.patch("litellm.completion") as mock_completion: program( owner=PetOwner(name="John", num_pets=5, dogs=DogClass(dog_breeds=["labrador", "chihuahua"], num_dogs=2)), question="How many non-dog pets does John have?", ) mock_completion.assert_called_once() _, call_kwargs = mock_completion.call_args system_content = call_kwargs["messages"][0]["content"] user_content = call_kwargs["messages"][1]["content"] assert "1. `owner` (PetOwner)" in system_content assert "2. `question` (str)" in system_content assert "1. `output` (Answer)" in system_content assert "name" in user_content assert "num_pets" in user_content assert "dogs" in user_content assert "dog_breeds" in user_content assert "num_dogs" in user_content assert "How many non-dog pets does John have?" in user_content def test_chat_adapter_signature_information(): """ This test ensures that the signature information sent to the LM follows an expected format. """ class TestSignature(dspy.Signature): input1: str = dspy.InputField(desc="String Input") input2: int = dspy.InputField(desc="Integer Input") output: str = dspy.OutputField(desc="String Output") dspy.configure(lm=dspy.LM(model="openai/gpt-4o"), adapter=dspy.ChatAdapter()) program = dspy.Predict(TestSignature) with mock.patch("litellm.completion") as mock_completion: program(input1="Test", input2=11) mock_completion.assert_called_once() _, call_kwargs = mock_completion.call_args assert len(call_kwargs["messages"]) == 2 assert call_kwargs["messages"][0]["role"] == "system" assert call_kwargs["messages"][1]["role"] == "user" system_content = call_kwargs["messages"][0]["content"] user_content = call_kwargs["messages"][1]["content"] assert "1. `input1` (str)" in system_content assert "2. `input2` (int)" in system_content assert "1. `output` (str)" in system_content assert "[[ ## input1 ## ]]\n{input1}" in system_content assert "[[ ## input2 ## ]]\n{input2}" in system_content assert "[[ ## output ## ]]\n{output}" in system_content assert "[[ ## completed ## ]]" in system_content assert "[[ ## input1 ## ]]" in user_content assert "[[ ## input2 ## ]]" in user_content assert "[[ ## output ## ]]" in user_content assert "[[ ## completed ## ]]" in user_content def test_chat_adapter_exception_raised_on_failure(): """ This test ensures that on an error, ChatAdapter raises an explicit exception. 
""" signature = dspy.make_signature("question->answer") adapter = dspy.ChatAdapter() invalid_completion = "{'output':'mismatched value'}" with pytest.raises(dspy.utils.exceptions.AdapterParseError, match="Adapter ChatAdapter failed to parse*"): adapter.parse(signature, invalid_completion) def test_chat_adapter_formats_image(): # Test basic image formatting image = dspy.Image(url="https://example.com/image.jpg") class MySignature(dspy.Signature): image: dspy.Image = dspy.InputField() text: str = dspy.OutputField() adapter = dspy.ChatAdapter() messages = adapter.format(MySignature, [], {"image": image}) assert len(messages) == 2 user_message_content = messages[1]["content"] assert user_message_content is not None # The message should have 3 chunks of types: text, image_url, text assert len(user_message_content) == 3 assert user_message_content[0]["type"] == "text" assert user_message_content[2]["type"] == "text" # Assert that the image is formatted correctly expected_image_content = {"type": "image_url", "image_url": {"url": "https://example.com/image.jpg"}} assert expected_image_content in user_message_content def test_chat_adapter_formats_image_with_few_shot_examples(): class MySignature(dspy.Signature): image: dspy.Image = dspy.InputField() text: str = dspy.OutputField() adapter = dspy.ChatAdapter() demos = [ dspy.Example( image=dspy.Image(url="https://example.com/image1.jpg"), text="This is a test image", ), dspy.Example( image=dspy.Image(url="https://example.com/image2.jpg"), text="This is another test image", ), ] messages = adapter.format(MySignature, demos, {"image": dspy.Image(url="https://example.com/image3.jpg")}) # 1 system message, 2 few shot examples (1 user and assistant message for each example), 1 user message assert len(messages) == 6 assert "[[ ## completed ## ]]\n" in messages[2]["content"] assert "[[ ## completed ## ]]\n" in messages[4]["content"] assert {"type": "image_url", "image_url": {"url": "https://example.com/image1.jpg"}} in messages[1]["content"] assert {"type": "image_url", "image_url": {"url": "https://example.com/image2.jpg"}} in messages[3]["content"] assert {"type": "image_url", "image_url": {"url": "https://example.com/image3.jpg"}} in messages[5]["content"] def test_chat_adapter_formats_image_with_nested_images(): class ImageWrapper(pydantic.BaseModel): images: list[dspy.Image] tag: list[str] class MySignature(dspy.Signature): image: ImageWrapper = dspy.InputField() text: str = dspy.OutputField() image1 = dspy.Image(url="https://example.com/image1.jpg") image2 = dspy.Image(url="https://example.com/image2.jpg") image3 = dspy.Image(url="https://example.com/image3.jpg") image_wrapper = ImageWrapper(images=[image1, image2, image3], tag=["test", "example"]) adapter = dspy.ChatAdapter() messages = adapter.format(MySignature, [], {"image": image_wrapper}) expected_image1_content = {"type": "image_url", "image_url": {"url": "https://example.com/image1.jpg"}} expected_image2_content = {"type": "image_url", "image_url": {"url": "https://example.com/image2.jpg"}} expected_image3_content = {"type": "image_url", "image_url": {"url": "https://example.com/image3.jpg"}} assert expected_image1_content in messages[1]["content"] assert expected_image2_content in messages[1]["content"] assert expected_image3_content in messages[1]["content"] def test_chat_adapter_formats_image_with_few_shot_examples_with_nested_images(): class ImageWrapper(pydantic.BaseModel): images: list[dspy.Image] tag: list[str] class MySignature(dspy.Signature): image: ImageWrapper = dspy.InputField() 
text: str = dspy.OutputField() image1 = dspy.Image(url="https://example.com/image1.jpg") image2 = dspy.Image(url="https://example.com/image2.jpg") image3 = dspy.Image(url="https://example.com/image3.jpg") image_wrapper = ImageWrapper(images=[image1, image2, image3], tag=["test", "example"]) demos = [ dspy.Example( image=image_wrapper, text="This is a test image", ), ] image_wrapper_2 = ImageWrapper(images=[dspy.Image(url="https://example.com/image4.jpg")], tag=["test", "example"]) adapter = dspy.ChatAdapter() messages = adapter.format(MySignature, demos, {"image": image_wrapper_2}) assert len(messages) == 4 # Image information in the few-shot example's user message expected_image1_content = {"type": "image_url", "image_url": {"url": "https://example.com/image1.jpg"}} expected_image2_content = {"type": "image_url", "image_url": {"url": "https://example.com/image2.jpg"}} expected_image3_content = {"type": "image_url", "image_url": {"url": "https://example.com/image3.jpg"}} assert expected_image1_content in messages[1]["content"] assert expected_image2_content in messages[1]["content"] assert expected_image3_content in messages[1]["content"] # The query image is formatted in the last user message assert {"type": "image_url", "image_url": {"url": "https://example.com/image4.jpg"}} in messages[-1]["content"] def test_chat_adapter_with_tool(): class MySignature(dspy.Signature): """Answer question with the help of the tools""" question: str = dspy.InputField() tools: list[dspy.Tool] = dspy.InputField() answer: str = dspy.OutputField() tool_calls: dspy.ToolCalls = dspy.OutputField() def get_weather(city: str) -> str: """Get the weather for a city""" return f"The weather in {city} is sunny" def get_population(country: str, year: int) -> str: """Get the population for a country""" return f"The population of {country} in {year} is 1000000" tools = [dspy.Tool(get_weather), dspy.Tool(get_population)] adapter = dspy.ChatAdapter() messages = adapter.format(MySignature, [], {"question": "What is the weather in Tokyo?", "tools": tools}) assert len(messages) == 2 # The output field type description should be included in the system message even if the output field is nested assert dspy.ToolCalls.description() in messages[0]["content"] # The user message should include the question and the tools assert "What is the weather in Tokyo?" 
in messages[1]["content"] assert "get_weather" in messages[1]["content"] assert "get_population" in messages[1]["content"] # Tool arguments format should be included in the user message assert "{'city': {'type': 'string'}}" in messages[1]["content"] assert "{'country': {'type': 'string'}, 'year': {'type': 'integer'}}" in messages[1]["content"] def test_chat_adapter_with_code(): # Test with code as input field class CodeAnalysis(dspy.Signature): """Analyze the time complexity of the code""" code: dspy.Code = dspy.InputField() result: str = dspy.OutputField() adapter = dspy.ChatAdapter() messages = adapter.format(CodeAnalysis, [], {"code": "print('Hello, world!')"}) assert len(messages) == 2 # The output field type description should be included in the system message even if the output field is nested assert dspy.Code.description() in messages[0]["content"] # The user message should include the question and the tools assert "print('Hello, world!')" in messages[1]["content"] # Test with code as output field class CodeGeneration(dspy.Signature): """Generate code to answer the question""" question: str = dspy.InputField() code: dspy.Code = dspy.OutputField() adapter = dspy.ChatAdapter() with mock.patch("litellm.completion") as mock_completion: mock_completion.return_value = ModelResponse( choices=[Choices(message=Message(content='[[ ## code ## ]]\nprint("Hello, world!")'))], model="openai/gpt-4o-mini", ) result = adapter( dspy.LM(model="openai/gpt-4o-mini", cache=False), {}, CodeGeneration, [], {"question": "Write a python program to print 'Hello, world!'"}, ) assert result[0]["code"].code == 'print("Hello, world!")' def test_chat_adapter_formats_conversation_history(): class MySignature(dspy.Signature): question: str = dspy.InputField() history: dspy.History = dspy.InputField() answer: str = dspy.OutputField() history = dspy.History( messages=[ {"question": "What is the capital of France?", "answer": "Paris"}, {"question": "What is the capital of Germany?", "answer": "Berlin"}, ] ) adapter = dspy.ChatAdapter() messages = adapter.format(MySignature, [], {"question": "What is the capital of France?", "history": history}) assert len(messages) == 6 assert messages[1]["content"] == "[[ ## question ## ]]\nWhat is the capital of France?" assert messages[2]["content"] == "[[ ## answer ## ]]\nParis\n\n[[ ## completed ## ]]\n" assert messages[3]["content"] == "[[ ## question ## ]]\nWhat is the capital of Germany?" 
assert messages[4]["content"] == "[[ ## answer ## ]]\nBerlin\n\n[[ ## completed ## ]]\n" def test_chat_adapter_fallback_to_json_adapter_on_exception(): signature = dspy.make_signature("question->answer") adapter = dspy.ChatAdapter() with mock.patch("litellm.completion") as mock_completion: # Mock returning a response compatible with JSONAdapter but not ChatAdapter mock_completion.return_value = ModelResponse( choices=[Choices(message=Message(content="{'answer': 'Paris'}"))], model="openai/gpt-4o-mini", ) lm = dspy.LM("openai/gpt-4o-mini", cache=False) with mock.patch("dspy.adapters.json_adapter.JSONAdapter.__call__") as mock_json_adapter_call: adapter(lm, {}, signature, [], {"question": "What is the capital of France?"}) mock_json_adapter_call.assert_called_once() # The parse should succeed result = adapter(lm, {}, signature, [], {"question": "What is the capital of France?"}) assert result == [{"answer": "Paris"}] @pytest.mark.asyncio async def test_chat_adapter_fallback_to_json_adapter_on_exception_async(): signature = dspy.make_signature("question->answer") adapter = dspy.ChatAdapter() with mock.patch("litellm.acompletion") as mock_completion: # Mock returning a response compatible with JSONAdapter but not ChatAdapter mock_completion.return_value = ModelResponse( choices=[Choices(message=Message(content="{'answer': 'Paris'}"))], model="openai/gpt-4o-mini", ) lm = dspy.LM("openai/gpt-4o-mini", cache=False) with mock.patch("dspy.adapters.json_adapter.JSONAdapter.acall") as mock_json_adapter_acall: await adapter.acall(lm, {}, signature, [], {"question": "What is the capital of France?"}) mock_json_adapter_acall.assert_called_once() # The parse should succeed result = await adapter.acall(lm, {}, signature, [], {"question": "What is the capital of France?"}) assert result == [{"answer": "Paris"}] def test_chat_adapter_toolcalls_native_function_calling(): class MySignature(dspy.Signature): question: str = dspy.InputField() tools: list[dspy.Tool] = dspy.InputField() answer: str = dspy.OutputField() tool_calls: dspy.ToolCalls = dspy.OutputField() def get_weather(city: str) -> str: return f"The weather in {city} is sunny" tools = [dspy.Tool(get_weather)] adapter = dspy.JSONAdapter(use_native_function_calling=True) # Case 1: Tool calls are present in the response, while content is None. with mock.patch("litellm.completion") as mock_completion: mock_completion.return_value = ModelResponse( choices=[ Choices( finish_reason="tool_calls", index=0, message=Message( content=None, role="assistant", tool_calls=[ ChatCompletionMessageToolCall( function=Function(arguments='{"city":"Paris"}', name="get_weather"), id="call_pQm8ajtSMxgA0nrzK2ivFmxG", type="function", ) ], ), ), ], model="openai/gpt-4o-mini", ) result = adapter( dspy.LM(model="openai/gpt-4o-mini", cache=False), {}, MySignature, [], {"question": "What is the weather in Paris?", "tools": tools}, ) assert result[0]["tool_calls"] == dspy.ToolCalls( tool_calls=[dspy.ToolCalls.ToolCall(name="get_weather", args={"city": "Paris"})] ) # `answer` is not present, so we set it to None assert result[0]["answer"] is None # Case 2: Tool calls are not present in the response, while content is present. 
with mock.patch("litellm.completion") as mock_completion: mock_completion.return_value = ModelResponse( choices=[Choices(message=Message(content="{'answer': 'Paris'}"))], model="openai/gpt-4o-mini", ) result = adapter( dspy.LM(model="openai/gpt-4o-mini", cache=False), {}, MySignature, [], {"question": "What is the weather in Paris?", "tools": tools}, ) assert result[0]["answer"] == "Paris" assert result[0]["tool_calls"] is None def test_chat_adapter_toolcalls_vague_match(): class MySignature(dspy.Signature): question: str = dspy.InputField() tools: list[dspy.Tool] = dspy.InputField() tool_calls: dspy.ToolCalls = dspy.OutputField() def get_weather(city: str) -> str: return f"The weather in {city} is sunny" tools = [dspy.Tool(get_weather)] adapter = dspy.ChatAdapter() with mock.patch("litellm.completion") as mock_completion: # Case 1: tool_calls field is a list of dicts mock_completion.return_value = ModelResponse( choices=[ Choices( message=Message( content="[[ ## tool_calls ## ]]\n[{'name': 'get_weather', 'args': {'city': 'Paris'}]" ) ) ], model="openai/gpt-4o-mini", ) result = adapter( dspy.LM(model="openai/gpt-4o-mini", cache=False), {}, MySignature, [], {"question": "What is the weather in Paris?", "tools": tools}, ) assert result[0]["tool_calls"] == dspy.ToolCalls( tool_calls=[dspy.ToolCalls.ToolCall(name="get_weather", args={"city": "Paris"})] ) with mock.patch("litellm.completion") as mock_completion: # Case 2: tool_calls field is a single dict with "name" and "args" keys mock_completion.return_value = ModelResponse( choices=[ Choices( message=Message( content="[[ ## tool_calls ## ]]\n{'name': 'get_weather', 'args': {'city': 'Paris'}}" ) ) ], model="openai/gpt-4o-mini", ) result = adapter( dspy.LM(model="openai/gpt-4o-mini", cache=False), {}, MySignature, [], {"question": "What is the weather in Paris?", "tools": tools}, ) assert result[0]["tool_calls"] == dspy.ToolCalls( tool_calls=[dspy.ToolCalls.ToolCall(name="get_weather", args={"city": "Paris"})] ) ``` -------------------------------------------------------------------------------- /docs/docs/tutorials/ai_text_game/index.md: -------------------------------------------------------------------------------- ```markdown # Building a Creative Text-Based AI Game with DSPy This tutorial demonstrates how to create an interactive text-based adventure game using DSPy's modular programming approach. You'll build a dynamic game where AI handles narrative generation, character interactions, and adaptive gameplay. 
## What You'll Build An intelligent text-based adventure game featuring: - Dynamic story generation and branching narratives - AI-powered character interactions and dialogue - Adaptive gameplay that responds to player choices - Inventory and character progression systems - Save/load game state functionality ## Setup ```bash pip install dspy rich typer ``` ## Step 1: Core Game Framework ```python import dspy import json from typing import Dict, List, Optional, Any from dataclasses import dataclass, field from enum import Enum import random from rich.console import Console from rich.panel import Panel from rich.text import Text import typer # Configure DSPy lm = dspy.LM(model='openai/gpt-4o-mini') dspy.configure(lm=lm) console = Console() class GameState(Enum): MENU = "menu" PLAYING = "playing" INVENTORY = "inventory" CHARACTER = "character" GAME_OVER = "game_over" @dataclass class Player: name: str health: int = 100 level: int = 1 experience: int = 0 inventory: list[str] = field(default_factory=list) skills: dict[str, int] = field(default_factory=lambda: { "strength": 10, "intelligence": 10, "charisma": 10, "stealth": 10 }) def add_item(self, item: str): self.inventory.append(item) console.print(f"[green]Added {item} to inventory![/green]") def remove_item(self, item: str) -> bool: if item in self.inventory: self.inventory.remove(item) return True return False def gain_experience(self, amount: int): self.experience += amount old_level = self.level self.level = 1 + (self.experience // 100) if self.level > old_level: console.print(f"[bold yellow]Level up! You are now level {self.level}![/bold yellow]") @dataclass class GameContext: current_location: str = "Village Square" story_progress: int = 0 visited_locations: list[str] = field(default_factory=list) npcs_met: list[str] = field(default_factory=list) completed_quests: list[str] = field(default_factory=list) game_flags: dict[str, bool] = field(default_factory=dict) def add_flag(self, flag: str, value: bool = True): self.game_flags[flag] = value def has_flag(self, flag: str) -> bool: return self.game_flags.get(flag, False) class GameEngine: def __init__(self): self.player = None self.context = GameContext() self.state = GameState.MENU self.running = True def save_game(self, filename: str = "savegame.json"): """Save current game state.""" save_data = { "player": { "name": self.player.name, "health": self.player.health, "level": self.player.level, "experience": self.player.experience, "inventory": self.player.inventory, "skills": self.player.skills }, "context": { "current_location": self.context.current_location, "story_progress": self.context.story_progress, "visited_locations": self.context.visited_locations, "npcs_met": self.context.npcs_met, "completed_quests": self.context.completed_quests, "game_flags": self.context.game_flags } } with open(filename, 'w') as f: json.dump(save_data, f, indent=2) console.print(f"[green]Game saved to {filename}![/green]") def load_game(self, filename: str = "savegame.json") -> bool: """Load game state from file.""" try: with open(filename, 'r') as f: save_data = json.load(f) # Reconstruct player player_data = save_data["player"] self.player = Player( name=player_data["name"], health=player_data["health"], level=player_data["level"], experience=player_data["experience"], inventory=player_data["inventory"], skills=player_data["skills"] ) # Reconstruct context context_data = save_data["context"] self.context = GameContext( current_location=context_data["current_location"], 
story_progress=context_data["story_progress"], visited_locations=context_data["visited_locations"], npcs_met=context_data["npcs_met"], completed_quests=context_data["completed_quests"], game_flags=context_data["game_flags"] ) console.print(f"[green]Game loaded from {filename}![/green]") return True except FileNotFoundError: console.print(f"[red]Save file {filename} not found![/red]") return False except Exception as e: console.print(f"[red]Error loading game: {e}![/red]") return False # Initialize game engine game = GameEngine() ``` ## Step 2: AI-Powered Story Generation ```python class StoryGenerator(dspy.Signature): """Generate dynamic story content based on current game state.""" location: str = dspy.InputField(desc="Current location") player_info: str = dspy.InputField(desc="Player information and stats") story_progress: int = dspy.InputField(desc="Current story progress level") recent_actions: str = dspy.InputField(desc="Player's recent actions") scene_description: str = dspy.OutputField(desc="Vivid description of current scene") available_actions: list[str] = dspy.OutputField(desc="List of possible player actions") npcs_present: list[str] = dspy.OutputField(desc="NPCs present in this location") items_available: list[str] = dspy.OutputField(desc="Items that can be found or interacted with") class DialogueGenerator(dspy.Signature): """Generate NPC dialogue and responses.""" npc_name: str = dspy.InputField(desc="Name and type of NPC") npc_personality: str = dspy.InputField(desc="NPC personality and background") player_input: str = dspy.InputField(desc="What the player said or did") context: str = dspy.InputField(desc="Current game context and history") npc_response: str = dspy.OutputField(desc="NPC's dialogue response") mood_change: str = dspy.OutputField(desc="How NPC's mood changed (positive/negative/neutral)") quest_offered: bool = dspy.OutputField(desc="Whether NPC offers a quest") information_revealed: str = dspy.OutputField(desc="Any important information shared") class ActionResolver(dspy.Signature): """Resolve player actions and determine outcomes.""" action: str = dspy.InputField(desc="Player's chosen action") player_stats: str = dspy.InputField(desc="Player's current stats and skills") context: str = dspy.InputField(desc="Current game context") difficulty: str = dspy.InputField(desc="Difficulty level of the action") success: bool = dspy.OutputField(desc="Whether the action succeeded") outcome_description: str = dspy.OutputField(desc="Description of what happened") stat_changes: dict[str, int] = dspy.OutputField(desc="Changes to player stats") items_gained: list[str] = dspy.OutputField(desc="Items gained from this action") experience_gained: int = dspy.OutputField(desc="Experience points gained") class GameAI(dspy.Module): """Main AI module for game logic and narrative.""" def __init__(self): super().__init__() self.story_gen = dspy.ChainOfThought(StoryGenerator) self.dialogue_gen = dspy.ChainOfThought(DialogueGenerator) self.action_resolver = dspy.ChainOfThought(ActionResolver) def generate_scene(self, player: Player, context: GameContext, recent_actions: str = "") -> Dict: """Generate current scene description and options.""" player_info = f"Level {player.level} {player.name}, Health: {player.health}, Skills: {player.skills}" scene = self.story_gen( location=context.current_location, player_info=player_info, story_progress=context.story_progress, recent_actions=recent_actions ) return { "description": scene.scene_description, "actions": scene.available_actions, "npcs": 
scene.npcs_present, "items": scene.items_available } def handle_dialogue(self, npc_name: str, player_input: str, context: GameContext) -> Dict: """Handle conversation with NPCs.""" # Create NPC personality based on name and context personality_map = { "Village Elder": "Wise, knowledgeable, speaks in riddles, has ancient knowledge", "Merchant": "Greedy but fair, loves to bargain, knows about valuable items", "Guard": "Dutiful, suspicious of strangers, follows rules strictly", "Thief": "Sneaky, untrustworthy, has information about hidden things", "Wizard": "Mysterious, powerful, speaks about magic and ancient forces" } personality = personality_map.get(npc_name, "Friendly villager with local knowledge") game_context = f"Location: {context.current_location}, Story progress: {context.story_progress}" response = self.dialogue_gen( npc_name=npc_name, npc_personality=personality, player_input=player_input, context=game_context ) return { "response": response.npc_response, "mood": response.mood_change, "quest": response.quest_offered, "info": response.information_revealed } def resolve_action(self, action: str, player: Player, context: GameContext) -> Dict: """Resolve player actions and determine outcomes.""" player_stats = f"Level {player.level}, Health {player.health}, Skills: {player.skills}" game_context = f"Location: {context.current_location}, Progress: {context.story_progress}" # Determine difficulty based on action type difficulty = "medium" if any(word in action.lower() for word in ["fight", "battle", "attack"]): difficulty = "hard" elif any(word in action.lower() for word in ["look", "examine", "talk"]): difficulty = "easy" result = self.action_resolver( action=action, player_stats=player_stats, context=game_context, difficulty=difficulty ) return { "success": result.success, "description": result.outcome_description, "stat_changes": result.stat_changes, "items": result.items_gained, "experience": result.experience_gained } # Initialize AI ai = GameAI() ``` ## Step 3: Game Interface and Interaction ```python def display_game_header(): """Display the game header.""" header = Text("🏰 MYSTIC REALM ADVENTURE 🏰", style="bold magenta") console.print(Panel(header, style="bright_blue")) def display_player_status(player: Player): """Display player status panel.""" status = f""" [bold]Name:[/bold] {player.name} [bold]Level:[/bold] {player.level} (XP: {player.experience}) [bold]Health:[/bold] {player.health}/100 [bold]Skills:[/bold] • Strength: {player.skills['strength']} • Intelligence: {player.skills['intelligence']} • Charisma: {player.skills['charisma']} • Stealth: {player.skills['stealth']} [bold]Inventory:[/bold] {len(player.inventory)} items """ console.print(Panel(status.strip(), title="Player Status", style="green")) def display_location(context: GameContext, scene: Dict): """Display current location and scene.""" location_panel = f""" [bold yellow]{context.current_location}[/bold yellow] {scene['description']} """ if scene['npcs']: location_panel += f"\n\n[bold]NPCs present:[/bold] {', '.join(scene['npcs'])}" if scene['items']: location_panel += f"\n[bold]Items visible:[/bold] {', '.join(scene['items'])}" console.print(Panel(location_panel.strip(), title="Current Location", style="cyan")) def display_actions(actions: list[str]): """Display available actions.""" action_text = "\n".join([f"{i+1}. 
{action}" for i, action in enumerate(actions)]) console.print(Panel(action_text, title="Available Actions", style="yellow")) def get_player_choice(max_choices: int) -> int: """Get player's choice with input validation.""" while True: try: choice = typer.prompt("Choose an action (number)") choice_num = int(choice) if 1 <= choice_num <= max_choices: return choice_num - 1 else: console.print(f"[red]Please enter a number between 1 and {max_choices}[/red]") except ValueError: console.print("[red]Please enter a valid number[/red]") def show_inventory(player: Player): """Display player inventory.""" if not player.inventory: console.print(Panel("Your inventory is empty.", title="Inventory", style="red")) else: items = "\n".join([f"• {item}" for item in player.inventory]) console.print(Panel(items, title="Inventory", style="green")) def main_menu(): """Display main menu and handle selection.""" console.clear() display_game_header() menu_options = [ "1. New Game", "2. Load Game", "3. How to Play", "4. Exit" ] menu_text = "\n".join(menu_options) console.print(Panel(menu_text, title="Main Menu", style="bright_blue")) choice = typer.prompt("Select an option") return choice def show_help(): """Display help information.""" help_text = """ [bold]How to Play:[/bold] • This is a text-based adventure game powered by AI • Make choices by selecting numbered options • Talk to NPCs to learn about the world and get quests • Explore different locations to find items and adventures • Your choices affect the story and character development • Use 'inventory' to check your items • Use 'status' to see your character info • Type 'save' to save your progress • Type 'quit' to return to main menu [bold]Tips:[/bold] • Different skills affect your success in various actions • NPCs remember your previous interactions • Explore thoroughly - there are hidden secrets! 
• Your reputation affects how NPCs treat you """ console.print(Panel(help_text.strip(), title="Game Help", style="blue")) typer.prompt("Press Enter to continue") ``` ## Step 4: Main Game Loop ```python def create_new_character(): """Create a new player character.""" console.clear() display_game_header() name = typer.prompt("Enter your character's name") # Character creation with skill point allocation console.print("\n[bold]Character Creation[/bold]") console.print("You have 10 extra skill points to distribute among your skills.") console.print("Base skills start at 10 each.\n") skills = {"strength": 10, "intelligence": 10, "charisma": 10, "stealth": 10} points_remaining = 10 for skill in skills.keys(): if points_remaining > 0: console.print(f"Points remaining: {points_remaining}") while True: try: points = int(typer.prompt(f"Points to add to {skill} (0-{points_remaining})")) if 0 <= points <= points_remaining: skills[skill] += points points_remaining -= points break else: console.print(f"[red]Enter a number between 0 and {points_remaining}[/red]") except ValueError: console.print("[red]Please enter a valid number[/red]") player = Player(name=name, skills=skills) console.print(f"\n[green]Welcome to Mystic Realm, {name}![/green]") return player def game_loop(): """Main game loop.""" recent_actions = "" while game.running and game.state == GameState.PLAYING: console.clear() display_game_header() # Generate current scene scene = ai.generate_scene(game.player, game.context, recent_actions) # Display game state display_player_status(game.player) display_location(game.context, scene) # Add standard actions all_actions = scene['actions'] + ["Check inventory", "Character status", "Save game", "Quit to menu"] display_actions(all_actions) # Get player choice choice_idx = get_player_choice(len(all_actions)) chosen_action = all_actions[choice_idx] # Handle special commands if chosen_action == "Check inventory": show_inventory(game.player) typer.prompt("Press Enter to continue") continue elif chosen_action == "Character status": display_player_status(game.player) typer.prompt("Press Enter to continue") continue elif chosen_action == "Save game": game.save_game() typer.prompt("Press Enter to continue") continue elif chosen_action == "Quit to menu": game.state = GameState.MENU break # Handle game actions if chosen_action in scene['actions']: # Check if it's dialogue with an NPC npc_target = None for npc in scene['npcs']: if npc.lower() in chosen_action.lower(): npc_target = npc break if npc_target: # Handle NPC interaction console.print(f"\n[bold]Talking to {npc_target}...[/bold]") dialogue = ai.handle_dialogue(npc_target, chosen_action, game.context) console.print(f"\n[italic]{npc_target}:[/italic] \"{dialogue['response']}\"") if dialogue['quest']: console.print(f"[yellow]💼 Quest opportunity detected![/yellow]") if dialogue['info']: console.print(f"[blue]ℹ️ {dialogue['info']}[/blue]") # Add NPC to met list if npc_target not in game.context.npcs_met: game.context.npcs_met.append(npc_target) recent_actions = f"Talked to {npc_target}: {chosen_action}" else: # Handle general action result = ai.resolve_action(chosen_action, game.player, game.context) console.print(f"\n{result['description']}") # Apply results if result['success']: console.print("[green]✅ Success![/green]") # Apply stat changes for stat, change in result['stat_changes'].items(): if stat in game.player.skills: game.player.skills[stat] += change if change > 0: console.print(f"[green]{stat.title()} increased by {change}![/green]") elif stat == 
"health": game.player.health = max(0, min(100, game.player.health + change)) if change > 0: console.print(f"[green]Health restored by {change}![/green]") elif change < 0: console.print(f"[red]Health decreased by {abs(change)}![/red]") # Add items for item in result['items']: game.player.add_item(item) # Give experience if result['experience'] > 0: game.player.gain_experience(result['experience']) # Update story progress game.context.story_progress += 1 else: console.print("[red]❌ The action didn't go as planned...[/red]") recent_actions = f"Attempted: {chosen_action}" # Check for game over conditions if game.player.health <= 0: console.print("\n[bold red]💀 You have died! Game Over![/bold red]") game.state = GameState.GAME_OVER break typer.prompt("\nPress Enter to continue") def main(): """Main game function.""" while game.running: if game.state == GameState.MENU: choice = main_menu() if choice == "1": game.player = create_new_character() game.context = GameContext() game.state = GameState.PLAYING console.print("\n[italic]Your adventure begins...[/italic]") typer.prompt("Press Enter to start") elif choice == "2": if game.load_game(): game.state = GameState.PLAYING typer.prompt("Press Enter to continue") elif choice == "3": show_help() elif choice == "4": game.running = False console.print("[bold]Thanks for playing! Goodbye![/bold]") elif game.state == GameState.PLAYING: game_loop() elif game.state == GameState.GAME_OVER: console.print("\n[bold]Game Over[/bold]") restart = typer.confirm("Would you like to return to the main menu?") if restart: game.state = GameState.MENU else: game.running = False if __name__ == "__main__": main() ``` ## Example Gameplay When you run the game, you'll experience: **Character Creation:** ``` 🏰 MYSTIC REALM ADVENTURE 🏰 Enter your character's name: Aria Character Creation You have 10 extra skill points to distribute among your skills. Base skills start at 10 each. Points remaining: 10 Points to add to strength (0-10): 2 Points to add to intelligence (0-8): 4 Points to add to charisma (0-4): 3 Points to add to stealth (0-1): 1 Welcome to Mystic Realm, Aria! ``` **Dynamic Scene Generation:** ``` ┌──────────── Current Location ────────────┐ │ Village Square │ │ │ │ You stand in the bustling heart of │ │ Willowbrook Village. The ancient stone │ │ fountain bubbles cheerfully as merchants │ │ hawk their wares and children play. A │ │ mysterious hooded figure lurks near the │ │ shadows of the old oak tree. │ │ │ │ NPCs present: Village Elder, Merchant │ │ Items visible: Strange Medallion, Herbs │ └──────────────────────────────────────────┘ ┌────────── Available Actions ─────────────┐ │ 1. Approach the hooded figure │ │ 2. Talk to the Village Elder │ │ 3. Browse the merchant's wares │ │ 4. Examine the strange medallion │ │ 5. Gather herbs near the fountain │ │ 6. Head to the forest path │ └───────────────────────────────────────────┘ ``` **AI-Generated Dialogue:** ``` Talking to Village Elder... Village Elder: "Ah, young traveler, I sense a great destiny surrounds you like morning mist. The ancient prophecy speaks of one who would come bearing the mark of courage. Tell me, have you noticed anything... unusual in your travels?" 💼 Quest opportunity detected! 
ℹ️ The Village Elder knows about an ancient prophecy that might involve you ``` ## Next Steps - **Combat System**: Add turn-based battles with strategy - **Magic System**: Spellcasting with resource management - **Multiplayer**: Network support for cooperative adventures - **Quest System**: Complex multi-step missions with branching outcomes - **World Building**: Procedurally generated locations and characters - **Audio**: Add sound effects and background music This tutorial demonstrates how DSPy's modular approach enables complex, interactive systems where AI handles creative content generation while maintaining consistent game logic and player agency. ``` -------------------------------------------------------------------------------- /docs/docs/index.md: -------------------------------------------------------------------------------- ```markdown --- sidebar_position: 1 hide: - navigation - toc --- { width="200", align=left } # _Programming_—not prompting—_LMs_ [](https://pepy.tech/projects/dspy) DSPy is a declarative framework for building modular AI software. It allows you to **iterate fast on structured code**, rather than brittle strings, and offers algorithms that **compile AI programs into effective prompts and weights** for your language models, whether you're building simple classifiers, sophisticated RAG pipelines, or Agent loops. Instead of wrangling prompts or training jobs, DSPy (Declarative Self-improving Python) enables you to **build AI software from natural-language modules** and to _generically compose them_ with different models, inference strategies, or learning algorithms. This makes AI software **more reliable, maintainable, and portable** across models and strategies. *tl;dr* Think of DSPy as a higher-level language for AI programming ([lecture](https://www.youtube.com/watch?v=JEMYuzrKLUw)), like the shift from assembly to C or pointer arithmetic to SQL. Meet the community, seek help, or start contributing via [GitHub](https://github.com/stanfordnlp/dspy) and [Discord](https://discord.gg/XCGy2WDCQB). <!-- Its abstractions make your AI software more reliable and maintainable, and allow it to become more portable as new models and learning techniques emerge. It's also just rather elegant! --> !!! info "Getting Started I: Install DSPy and set up your LM" ```bash > pip install -U dspy ``` === "OpenAI" You can authenticate by setting the `OPENAI_API_KEY` env variable or passing `api_key` below. ```python linenums="1" import dspy lm = dspy.LM("openai/gpt-4o-mini", api_key="YOUR_OPENAI_API_KEY") dspy.configure(lm=lm) ``` === "Anthropic" You can authenticate by setting the `ANTHROPIC_API_KEY` env variable or passing `api_key` below. ```python linenums="1" import dspy lm = dspy.LM("anthropic/claude-3-opus-20240229", api_key="YOUR_ANTHROPIC_API_KEY") dspy.configure(lm=lm) ``` === "Databricks" If you're on the Databricks platform, authentication is automatic via their SDK. If not, you can set the env variables `DATABRICKS_API_KEY` and `DATABRICKS_API_BASE`, or pass `api_key` and `api_base` below. ```python linenums="1" import dspy lm = dspy.LM( "databricks/databricks-llama-4-maverick", api_key="YOUR_DATABRICKS_ACCESS_TOKEN", api_base="YOUR_DATABRICKS_WORKSPACE_URL", # e.g.: https://dbc-64bf4923-e39e.cloud.databricks.com/serving-endpoints ) dspy.configure(lm=lm) ``` === "Gemini" You can authenticate by setting the `GEMINI_API_KEY` env variable or passing `api_key` below. 
```python linenums="1" import dspy lm = dspy.LM("gemini/gemini-2.5-flash", api_key="YOUR_GEMINI_API_KEY") dspy.configure(lm=lm) ``` === "Local LMs on your laptop" First, install [Ollama](https://github.com/ollama/ollama) and launch its server with your LM. ```bash > curl -fsSL https://ollama.ai/install.sh | sh > ollama run llama3.2:1b ``` Then, connect to it from your DSPy code. ```python linenums="1" import dspy lm = dspy.LM("ollama_chat/llama3.2:1b", api_base="http://localhost:11434", api_key="") dspy.configure(lm=lm) ``` === "Local LMs on a GPU server" First, install [SGLang](https://docs.sglang.ai/get_started/install.html) and launch its server with your LM. ```bash > pip install "sglang[all]" > pip install flashinfer -i https://flashinfer.ai/whl/cu121/torch2.4/ > CUDA_VISIBLE_DEVICES=0 python -m sglang.launch_server --port 7501 --model-path meta-llama/Llama-3.1-8B-Instruct ``` If you don't have access from Meta to download `meta-llama/Llama-3.1-8B-Instruct`, you can use `Qwen/Qwen2.5-7B-Instruct` instead. Next, connect to your local LM from your DSPy code as an `OpenAI`-compatible endpoint. ```python linenums="1" lm = dspy.LM("openai/meta-llama/Llama-3.1-8B-Instruct", api_base="http://localhost:7501/v1", # ensure this points to your port api_key="local", model_type="chat") dspy.configure(lm=lm) ``` === "Other providers" In DSPy, you can use any of the dozens of [LLM providers supported by LiteLLM](https://docs.litellm.ai/docs/providers). Simply follow their instructions for which `{PROVIDER}_API_KEY` to set and how to pass the `{provider_name}/{model_name}` string to the constructor. Some examples: - `anyscale/mistralai/Mistral-7B-Instruct-v0.1`, with `ANYSCALE_API_KEY` - `together_ai/togethercomputer/llama-2-70b-chat`, with `TOGETHERAI_API_KEY` - `sagemaker/<your-endpoint-name>`, with `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `AWS_REGION_NAME` - `azure/<your_deployment_name>`, with `AZURE_API_KEY`, `AZURE_API_BASE`, `AZURE_API_VERSION`, and the optional `AZURE_AD_TOKEN` and `AZURE_API_TYPE` If your provider offers an OpenAI-compatible endpoint, just add an `openai/` prefix to your full model name. ```python linenums="1" import dspy lm = dspy.LM("openai/your-model-name", api_key="PROVIDER_API_KEY", api_base="YOUR_PROVIDER_URL") dspy.configure(lm=lm) ``` ??? "Calling the LM directly." Idiomatic DSPy involves using _modules_, which we define in the rest of this page. However, it's still easy to call the `lm` you configured above directly. This gives you a unified API and lets you benefit from utilities like automatic caching. ```python linenums="1" lm("Say this is a test!", temperature=0.7) # => ['This is a test!'] lm(messages=[{"role": "user", "content": "Say this is a test!"}]) # => ['This is a test!'] ``` ## 1) **Modules** help you describe AI behavior as _code_, not strings. To build reliable AI systems, you must iterate fast. But maintaining prompts makes that hard: it forces you to tinker with strings or data _every time you change your LM, metrics, or pipeline_. Having built over a dozen best-in-class compound LM systems since 2020, we learned this the hard way—and so built DSPy to decouple AI system design from messy incidental choices about specific LMs or prompting strategies. DSPy shifts your focus from tinkering with prompt strings to **programming with structured and declarative natural-language modules**. For every AI component in your system, you specify input/output behavior as a _signature_ and select a _module_ to assign a strategy for invoking your LM. 
DSPy expands your signatures into prompts and parses your typed outputs, so you can compose different modules together into ergonomic, portable, and optimizable AI systems. !!! info "Getting Started II: Build DSPy modules for various tasks" Try the examples below after configuring your `lm` above. Adjust the fields to explore what tasks your LM can do well out of the box. Each tab below sets up a DSPy module, like `dspy.Predict`, `dspy.ChainOfThought`, or `dspy.ReAct`, with a task-specific _signature_. For example, `question -> answer: float` tells the module to take a question and to produce a `float` answer. === "Math" ```python linenums="1" math = dspy.ChainOfThought("question -> answer: float") math(question="Two dice are tossed. What is the probability that the sum equals two?") ``` **Possible Output:** ```text Prediction( reasoning='When two dice are tossed, each die has 6 faces, resulting in a total of 6 x 6 = 36 possible outcomes. The sum of the numbers on the two dice equals two only when both dice show a 1. This is just one specific outcome: (1, 1). Therefore, there is only 1 favorable outcome. The probability of the sum being two is the number of favorable outcomes divided by the total number of possible outcomes, which is 1/36.', answer=0.0277776 ) ``` === "RAG" ```python linenums="1" def search_wikipedia(query: str) -> list[str]: results = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts")(query, k=3) return [x["text"] for x in results] rag = dspy.ChainOfThought("context, question -> response") question = "What's the name of the castle that David Gregory inherited?" rag(context=search_wikipedia(question), question=question) ``` **Possible Output:** ```text Prediction( reasoning='The context provides information about David Gregory, a Scottish physician and inventor. It specifically mentions that he inherited Kinnairdy Castle in 1664. This detail directly answers the question about the name of the castle that David Gregory inherited.', response='Kinnairdy Castle' ) ``` === "Classification" ```python linenums="1" from typing import Literal class Classify(dspy.Signature): """Classify sentiment of a given sentence.""" sentence: str = dspy.InputField() sentiment: Literal["positive", "negative", "neutral"] = dspy.OutputField() confidence: float = dspy.OutputField() classify = dspy.Predict(Classify) classify(sentence="This book was super fun to read, though not the last chapter.") ``` **Possible Output:** ```text Prediction( sentiment='positive', confidence=0.75 ) ``` === "Information Extraction" ```python linenums="1" class ExtractInfo(dspy.Signature): """Extract structured information from text.""" text: str = dspy.InputField() title: str = dspy.OutputField() headings: list[str] = dspy.OutputField() entities: list[dict[str, str]] = dspy.OutputField(desc="a list of entities and their metadata") module = dspy.Predict(ExtractInfo) text = "Apple Inc. announced its latest iPhone 14 today." \ "The CEO, Tim Cook, highlighted its new features in a press release." response = module(text=text) print(response.title) print(response.headings) print(response.entities) ``` **Possible Output:** ```text Apple Inc. 
Announces iPhone 14 ['Introduction', "CEO's Statement", 'New Features'] [{'name': 'Apple Inc.', 'type': 'Organization'}, {'name': 'iPhone 14', 'type': 'Product'}, {'name': 'Tim Cook', 'type': 'Person'}] ``` === "Agents" ```python linenums="1" def evaluate_math(expression: str): return dspy.PythonInterpreter({}).execute(expression) def search_wikipedia(query: str): results = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts")(query, k=3) return [x["text"] for x in results] react = dspy.ReAct("question -> answer: float", tools=[evaluate_math, search_wikipedia]) pred = react(question="What is 9362158 divided by the year of birth of David Gregory of Kinnairdy castle?") print(pred.answer) ``` **Possible Output:** ```text 5761.328 ``` === "Multi-Stage Pipelines" ```python linenums="1" class Outline(dspy.Signature): """Outline a thorough overview of a topic.""" topic: str = dspy.InputField() title: str = dspy.OutputField() sections: list[str] = dspy.OutputField() section_subheadings: dict[str, list[str]] = dspy.OutputField(desc="mapping from section headings to subheadings") class DraftSection(dspy.Signature): """Draft a top-level section of an article.""" topic: str = dspy.InputField() section_heading: str = dspy.InputField() section_subheadings: list[str] = dspy.InputField() content: str = dspy.OutputField(desc="markdown-formatted section") class DraftArticle(dspy.Module): def __init__(self): self.build_outline = dspy.ChainOfThought(Outline) self.draft_section = dspy.ChainOfThought(DraftSection) def forward(self, topic): outline = self.build_outline(topic=topic) sections = [] for heading, subheadings in outline.section_subheadings.items(): section, subheadings = f"## {heading}", [f"### {subheading}" for subheading in subheadings] section = self.draft_section(topic=outline.title, section_heading=section, section_subheadings=subheadings) sections.append(section.content) return dspy.Prediction(title=outline.title, sections=sections) draft_article = DraftArticle() article = draft_article(topic="World Cup 2002") ``` **Possible Output:** A 1500-word article on the topic, e.g. ```text ## Qualification Process The qualification process for the 2002 FIFA World Cup involved a series of..... [shortened here for presentation]. ### UEFA Qualifiers The UEFA qualifiers involved 50 teams competing for 13..... [shortened here for presentation]. .... [rest of the article] ``` Note that DSPy makes it straightforward to optimize multi-stage modules like this. As long as you can evaluate the _final_ output of the system, every DSPy optimizer can tune all of the intermediate modules. ??? "Using DSPy in practice: from quick scripting to building sophisticated systems." Standard prompts conflate interface ("what should the LM do?") with implementation ("how do we tell it to do that?"). DSPy isolates the former as _signatures_ so we can infer the latter or learn it from data — in the context of a bigger program. Even before you start using optimizers, DSPy's modules allow you to script effective LM systems as ergonomic, portable _code_. Across many tasks and LMs, we maintain _signature test suites_ that assess the reliability of the built-in DSPy adapters. Adapters are the components that map signatures to prompts prior to optimization. If you find a task where a simple prompt consistently outperforms idiomatic DSPy for your LM, consider that a bug and [file an issue](https://github.com/stanfordnlp/dspy/issues). We'll use this to improve the built-in adapters. 
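As a small illustration of that split between interface and implementation, here is a minimal sketch (assuming you have already configured an `lm` as in Getting Started I above): define a signature inline, run a module against it, and then call `dspy.inspect_history` to see the exact prompt the adapter constructed.

```python linenums="1"
import dspy

# Interface: what the LM should do, expressed as a signature.
summarize = dspy.ChainOfThought("document -> summary")

# Implementation: the adapter expands this signature into a prompt at call time.
pred = summarize(document="DSPy separates what an LM should do from how it is prompted.")
print(pred.summary)

# Show the last prompt/response pair that the adapter actually sent to the LM.
dspy.inspect_history(n=1)
```

The history dump is only for inspection; your own code keeps working with the typed `Prediction` object returned by the module.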
## 2) **Optimizers** tune the prompts and weights of your AI modules. DSPy provides you with the tools to compile high-level code with natural language annotations into the low-level computations, prompts, or weight updates that align your LM with your program's structure and metrics. If you change your code or your metrics, you can simply re-compile accordingly. Given a few tens or hundreds of representative _inputs_ of your task and a _metric_ that can measure the quality of your system's outputs, you can use a DSPy optimizer. Different optimizers in DSPy work by **synthesizing good few-shot examples** for every module, like `dspy.BootstrapRS`,<sup>[1](https://arxiv.org/abs/2310.03714)</sup> **proposing and intelligently exploring better natural-language instructions** for every prompt, like [`dspy.GEPA`](https://dspy.ai/tutorials/gepa_ai_program/)<sup>[2](https://arxiv.org/abs/2507.19457)</sup>, `dspy.MIPROv2`,<sup>[3](https://arxiv.org/abs/2406.11695)</sup> and **building datasets for your modules and using them to finetune the LM weights** in your system, like `dspy.BootstrapFinetune`.<sup>[4](https://arxiv.org/abs/2407.10930)</sup> For detailed tutorials on running `dspy.GEPA`, please take a look at [dspy.GEPA tutorials](https://dspy.ai/tutorials/gepa_ai_program/). !!! info "Getting Started III: Optimizing the LM prompts or weights in DSPy programs" A typical simple optimization run costs on the order of $2 USD and takes around 20 minutes, but be careful when running optimizers with very large LMs or very large datasets. Optimization can cost as little as a few cents or up to tens of dollars, depending on your LM, dataset, and configuration. The examples below rely on Hugging Face's `datasets` package, which you can install with the command below. ```bash > pip install -U datasets ``` === "Optimizing prompts for a ReAct agent" This is a minimal but fully runnable example of setting up a `dspy.ReAct` agent that answers questions via search from Wikipedia and then optimizing it using `dspy.MIPROv2` in the cheap `light` mode on 500 question-answer pairs sampled from the `HotPotQA` dataset. ```python linenums="1" import dspy from dspy.datasets import HotPotQA dspy.configure(lm=dspy.LM("openai/gpt-4o-mini")) def search_wikipedia(query: str) -> list[str]: results = dspy.ColBERTv2(url="http://20.102.90.50:2017/wiki17_abstracts")(query, k=3) return [x["text"] for x in results] trainset = [x.with_inputs('question') for x in HotPotQA(train_seed=2024, train_size=500).train] react = dspy.ReAct("question -> answer", tools=[search_wikipedia]) tp = dspy.MIPROv2(metric=dspy.evaluate.answer_exact_match, auto="light", num_threads=24) optimized_react = tp.compile(react, trainset=trainset) ``` An informal run like this raises ReAct's score from 24% to 51% by teaching `gpt-4o-mini` more about the specifics of the task. === "Optimizing prompts for RAG" Given a retrieval index to `search`, your favorite `dspy.LM`, and a small `trainset` of questions and ground-truth responses, the following code snippet can optimize your RAG system with long outputs against the built-in `SemanticF1` metric, which is implemented as a DSPy module. 
```python linenums="1" class RAG(dspy.Module): def __init__(self, num_docs=5): self.num_docs = num_docs self.respond = dspy.ChainOfThought("context, question -> response") def forward(self, question): context = search(question, k=self.num_docs) # defined in tutorial linked below return self.respond(context=context, question=question) tp = dspy.MIPROv2(metric=dspy.evaluate.SemanticF1(decompositional=True), auto="medium", num_threads=24) optimized_rag = tp.compile(RAG(), trainset=trainset, max_bootstrapped_demos=2, max_labeled_demos=2) ``` For a complete RAG example that you can run, start this [tutorial](tutorials/rag/index.ipynb). It improves the quality of a RAG system over a subset of StackExchange communities by a 10% relative gain. === "Optimizing weights for Classification" <details><summary>Click to show dataset setup code.</summary> ```python linenums="1" import random from typing import Literal from datasets import load_dataset import dspy from dspy.datasets import DataLoader # Load the Banking77 dataset. CLASSES = load_dataset("PolyAI/banking77", split="train", trust_remote_code=True).features["label"].names kwargs = {"fields": ("text", "label"), "input_keys": ("text",), "split": "train", "trust_remote_code": True} # Load the first 2000 examples from the dataset, and assign a hint to each *training* example. trainset = [ dspy.Example(x, hint=CLASSES[x.label], label=CLASSES[x.label]).with_inputs("text", "hint") for x in DataLoader().from_huggingface(dataset_name="PolyAI/banking77", **kwargs)[:2000] ] random.Random(0).shuffle(trainset) ``` </details> ```python linenums="1" import dspy lm = dspy.LM('openai/gpt-4o-mini-2024-07-18') # Define the DSPy module for classification. It will use the hint at training time, if available. signature = dspy.Signature("text, hint -> label").with_updated_fields("label", type_=Literal[tuple(CLASSES)]) classify = dspy.ChainOfThought(signature) classify.set_lm(lm) # Optimize via BootstrapFinetune. optimizer = dspy.BootstrapFinetune(metric=(lambda x, y, trace=None: x.label == y.label), num_threads=24) optimized = optimizer.compile(classify, trainset=trainset) optimized(text="What does a pending cash withdrawal mean?") # For a complete fine-tuning tutorial, see: https://dspy.ai/tutorials/classification_finetuning/ ``` **Possible Output (from the last line):** ```text Prediction( reasoning='A pending cash withdrawal indicates that a request to withdraw cash has been initiated but has not yet been completed or processed. This status means that the transaction is still in progress and the funds have not yet been deducted from the account or made available to the user.', label='pending_cash_withdrawal' ) ``` An informal run similar to this on DSPy 2.5.29 raises GPT-4o-mini's score from 66% to 87%. ??? "What's an example of a DSPy optimizer? How do different optimizers work?" Take the `dspy.MIPROv2` optimizer as an example. First, MIPRO starts with the **bootstrapping stage**. It takes your program, which may be unoptimized at this point, and runs it many times across different inputs to collect traces of input/output behavior for each one of your modules. It filters these traces to keep only those that appear in trajectories scored highly by your metric. Second, MIPRO enters its **grounded proposal stage**. It previews your DSPy program's code, your data, and traces from running your program, and uses them to draft many potential instructions for every prompt in your program. Third, MIPRO launches the **discrete search stage**. 
It samples mini-batches from your training set, proposes a combination of instructions and traces to use for constructing every prompt in the pipeline, and evaluates the candidate program on the mini-batch. Using the resulting score, MIPRO updates a surrogate model that helps the proposals get better over time. One thing that makes DSPy optimizers so powerful is that they can be composed. You can run `dspy.MIPROv2` and use the produced program as an input to `dspy.MIPROv2` again or, say, to `dspy.BootstrapFinetune` to get better results. This is partly the essence of `dspy.BetterTogether`. Alternatively, you can run the optimizer and then extract the top-5 candidate programs and build a `dspy.Ensemble` of them. This allows you to scale _inference-time compute_ (e.g., ensembles) as well as DSPy's unique _pre-inference time compute_ (i.e., optimization budget) in highly systematic ways. <!-- Future: BootstrapRS or MIPRO on ??? with a local SGLang LM BootstrapFS on MATH with a tiny LM like Llama-3.2 with Ollama (maybe with a big teacher) --> ## 3) **DSPy's Ecosystem** advances open-source AI research. Compared to monolithic LMs, DSPy's modular paradigm enables a large community to improve the compositional architectures, inference-time strategies, and optimizers for LM programs in an open, distributed way. This gives DSPy users more control, helps them iterate much faster, and allows their programs to get better over time by applying the latest optimizers or modules. The DSPy research effort started at Stanford NLP in Feb 2022, building on what we had learned from developing early [compound LM systems](https://bair.berkeley.edu/blog/2024/02/18/compound-ai-systems/) like [ColBERT-QA](https://arxiv.org/abs/2007.00814), [Baleen](https://arxiv.org/abs/2101.00436), and [Hindsight](https://arxiv.org/abs/2110.07752). The first version was released as [DSP](https://arxiv.org/abs/2212.14024) in Dec 2022 and evolved by Oct 2023 into [DSPy](https://arxiv.org/abs/2310.03714). Thanks to [250 contributors](https://github.com/stanfordnlp/dspy/graphs/contributors), DSPy has introduced tens of thousands of people to building and optimizing modular LM programs. Since then, DSPy's community has produced a large body of work on optimizers, like [MIPROv2](https://arxiv.org/abs/2406.11695), [BetterTogether](https://arxiv.org/abs/2407.10930), and [LeReT](https://arxiv.org/abs/2410.23214), on program architectures, like [STORM](https://arxiv.org/abs/2402.14207), [IReRa](https://arxiv.org/abs/2401.12178), and [DSPy Assertions](https://arxiv.org/abs/2312.13382), and on successful applications to new problems, like [PAPILLON](https://arxiv.org/abs/2410.17127), [PATH](https://arxiv.org/abs/2406.11706), [WangLab@MEDIQA](https://arxiv.org/abs/2404.14544), [UMD's Prompting Case Study](https://arxiv.org/abs/2406.06608), and [Haize's Red-Teaming Program](https://blog.haizelabs.com/posts/dspy/), in addition to many open-source projects, production applications, and other [use cases](community/use-cases.md). ``` -------------------------------------------------------------------------------- /docs/docs/tutorials/sample_code_generation/index.md: -------------------------------------------------------------------------------- ```markdown # Automated Code Generation from Documentation with DSPy This tutorial demonstrates how to use DSPy to automatically fetch documentation from URLs and generate working code examples for any library. 
The system can analyze documentation websites, extract key concepts, and produce tailored code examples. ## What You'll Build A documentation-powered code generation system that: - Fetches and parses documentation from multiple URLs - Extracts API patterns, methods, and usage examples - Generates working code for specific use cases - Provides explanations and best practices - Works with any library's documentation ## Setup ```bash pip install dspy requests beautifulsoup4 html2text ``` ## Step 1: Documentation Fetching and Processing ```python import dspy import requests from bs4 import BeautifulSoup import html2text from typing import List, Dict, Any import json from urllib.parse import urljoin, urlparse import time # Configure DSPy lm = dspy.LM(model='openai/gpt-4o-mini') dspy.configure(lm=lm) class DocumentationFetcher: """Fetches and processes documentation from URLs.""" def __init__(self, max_retries=3, delay=1): self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' }) self.max_retries = max_retries self.delay = delay self.html_converter = html2text.HTML2Text() self.html_converter.ignore_links = False self.html_converter.ignore_images = True def fetch_url(self, url: str) -> dict[str, str]: """Fetch content from a single URL.""" for attempt in range(self.max_retries): try: print(f"📡 Fetching: {url} (attempt {attempt + 1})") response = self.session.get(url, timeout=10) response.raise_for_status() soup = BeautifulSoup(response.content, 'html.parser') # Remove script and style elements for script in soup(["script", "style", "nav", "footer", "header"]): script.decompose() # Convert to markdown for better LLM processing markdown_content = self.html_converter.handle(str(soup)) return { "url": url, "title": soup.title.string if soup.title else "No title", "content": markdown_content, "success": True } except Exception as e: print(f"❌ Error fetching {url}: {e}") if attempt < self.max_retries - 1: time.sleep(self.delay) else: return { "url": url, "title": "Failed to fetch", "content": f"Error: {str(e)}", "success": False } return {"url": url, "title": "Failed", "content": "", "success": False} def fetch_documentation(self, urls: list[str]) -> list[dict[str, str]]: """Fetch documentation from multiple URLs.""" results = [] for url in urls: result = self.fetch_url(url) results.append(result) time.sleep(self.delay) # Be respectful to servers return results class LibraryAnalyzer(dspy.Signature): """Analyze library documentation to understand core concepts and patterns.""" library_name: str = dspy.InputField(desc="Name of the library to analyze") documentation_content: str = dspy.InputField(desc="Combined documentation content") core_concepts: list[str] = dspy.OutputField(desc="Main concepts and components") common_patterns: list[str] = dspy.OutputField(desc="Common usage patterns") key_methods: list[str] = dspy.OutputField(desc="Important methods and functions") installation_info: str = dspy.OutputField(desc="Installation and setup information") code_examples: list[str] = dspy.OutputField(desc="Example code snippets found") class CodeGenerator(dspy.Signature): """Generate code examples for specific use cases using the target library.""" library_info: str = dspy.InputField(desc="Library concepts and patterns") use_case: str = dspy.InputField(desc="Specific use case to implement") requirements: str = dspy.InputField(desc="Additional requirements or constraints") code_example: str = dspy.OutputField(desc="Complete, 
working code example") explanation: str = dspy.OutputField(desc="Step-by-step explanation of the code") best_practices: list[str] = dspy.OutputField(desc="Best practices and tips") imports_needed: list[str] = dspy.OutputField(desc="Required imports and dependencies") class DocumentationLearningAgent(dspy.Module): """Agent that learns from documentation URLs and generates code examples.""" def __init__(self): super().__init__() self.fetcher = DocumentationFetcher() self.analyze_docs = dspy.ChainOfThought(LibraryAnalyzer) self.generate_code = dspy.ChainOfThought(CodeGenerator) self.refine_code = dspy.ChainOfThought( "code, feedback -> improved_code: str, changes_made: list[str]" ) def learn_from_urls(self, library_name: str, doc_urls: list[str]) -> Dict: """Learn about a library from its documentation URLs.""" print(f"📚 Learning about {library_name} from {len(doc_urls)} URLs...") # Fetch all documentation docs = self.fetcher.fetch_documentation(doc_urls) # Combine successful fetches combined_content = "\n\n---\n\n".join([ f"URL: {doc['url']}\nTitle: {doc['title']}\n\n{doc['content']}" for doc in docs if doc['success'] ]) if not combined_content: raise ValueError("No documentation could be fetched successfully") # Analyze combined documentation analysis = self.analyze_docs( library_name=library_name, documentation_content=combined_content ) return { "library": library_name, "source_urls": [doc['url'] for doc in docs if doc['success']], "core_concepts": analysis.core_concepts, "patterns": analysis.common_patterns, "methods": analysis.key_methods, "installation": analysis.installation_info, "examples": analysis.code_examples, "fetched_docs": docs } def generate_example(self, library_info: Dict, use_case: str, requirements: str = "") -> Dict: """Generate a code example for a specific use case.""" # Format library information for the generator info_text = f""" Library: {library_info['library']} Core Concepts: {', '.join(library_info['core_concepts'])} Common Patterns: {', '.join(library_info['patterns'])} Key Methods: {', '.join(library_info['methods'])} Installation: {library_info['installation']} Example Code Snippets: {'; '.join(library_info['examples'][:3])} # First 3 examples """ code_result = self.generate_code( library_info=info_text, use_case=use_case, requirements=requirements ) return { "code": code_result.code_example, "explanation": code_result.explanation, "best_practices": code_result.best_practices, "imports": code_result.imports_needed } # Initialize the learning agent agent = DocumentationLearningAgent() ``` ## Step 2: Learning from Documentation URLs ```python def learn_library_from_urls(library_name: str, documentation_urls: list[str]) -> Dict: """Learn about any library from its documentation URLs.""" try: library_info = agent.learn_from_urls(library_name, documentation_urls) print(f"\n🔍 Library Analysis Results for {library_name}:") print(f"Sources: {len(library_info['source_urls'])} successful fetches") print(f"Core Concepts: {library_info['core_concepts']}") print(f"Common Patterns: {library_info['patterns']}") print(f"Key Methods: {library_info['methods']}") print(f"Installation: {library_info['installation']}") print(f"Found {len(library_info['examples'])} code examples") return library_info except Exception as e: print(f"❌ Error learning library: {e}") raise # Example 1: Learn FastAPI from official documentation fastapi_urls = [ "https://fastapi.tiangolo.com/", "https://fastapi.tiangolo.com/tutorial/first-steps/", "https://fastapi.tiangolo.com/tutorial/path-params/", 
"https://fastapi.tiangolo.com/tutorial/query-params/" ] print("🚀 Learning FastAPI from official documentation...") fastapi_info = learn_library_from_urls("FastAPI", fastapi_urls) # Example 2: Learn a different library (you can replace with any library) streamlit_urls = [ "https://docs.streamlit.io/", "https://docs.streamlit.io/get-started", "https://docs.streamlit.io/develop/api-reference" ] print("\n\n📊 Learning Streamlit from official documentation...") streamlit_info = learn_library_from_urls("Streamlit", streamlit_urls) ``` ## Step 3: Generating Code Examples ```python def generate_examples_for_library(library_info: Dict, library_name: str): """Generate code examples for any library based on its documentation.""" # Define generic use cases that can apply to most libraries use_cases = [ { "name": "Basic Setup and Hello World", "description": f"Create a minimal working example with {library_name}", "requirements": "Include installation, imports, and basic usage" }, { "name": "Common Operations", "description": f"Demonstrate the most common {library_name} operations", "requirements": "Show typical workflow and best practices" }, { "name": "Advanced Usage", "description": f"Create a more complex example showcasing {library_name} capabilities", "requirements": "Include error handling and optimization" } ] generated_examples = [] print(f"\n🔧 Generating examples for {library_name}...") for use_case in use_cases: print(f"\n📝 {use_case['name']}") print(f"Description: {use_case['description']}") example = agent.generate_example( library_info=library_info, use_case=use_case['description'], requirements=use_case['requirements'] ) print("\n💻 Generated Code:") print("```python") print(example['code']) print("```") print("\n📦 Required Imports:") for imp in example['imports']: print(f" • {imp}") print("\n📝 Explanation:") print(example['explanation']) print("\n✅ Best Practices:") for practice in example['best_practices']: print(f" • {practice}") generated_examples.append({ "use_case": use_case['name'], "code": example['code'], "imports": example['imports'], "explanation": example['explanation'], "best_practices": example['best_practices'] }) print("-" * 80) return generated_examples # Generate examples for both libraries print("🎯 Generating FastAPI Examples:") fastapi_examples = generate_examples_for_library(fastapi_info, "FastAPI") print("\n\n🎯 Generating Streamlit Examples:") streamlit_examples = generate_examples_for_library(streamlit_info, "Streamlit") ``` ## Step 4: Interactive Library Learning Function ```python def learn_any_library(library_name: str, documentation_urls: list[str], use_cases: list[str] = None): """Learn any library from its documentation and generate examples.""" if use_cases is None: use_cases = [ "Basic setup and hello world example", "Common operations and workflows", "Advanced usage with best practices" ] print(f"🚀 Starting automated learning for {library_name}...") print(f"Documentation sources: {len(documentation_urls)} URLs") try: # Step 1: Learn from documentation library_info = agent.learn_from_urls(library_name, documentation_urls) # Step 2: Generate examples for each use case all_examples = [] for i, use_case in enumerate(use_cases, 1): print(f"\n📝 Generating example {i}/{len(use_cases)}: {use_case}") example = agent.generate_example( library_info=library_info, use_case=use_case, requirements="Include error handling, comments, and follow best practices" ) all_examples.append({ "use_case": use_case, "code": example['code'], "imports": example['imports'], "explanation": 
example['explanation'], "best_practices": example['best_practices'] }) return { "library_info": library_info, "examples": all_examples } except Exception as e: print(f"❌ Error learning {library_name}: {e}") return None def interactive_learning_session(): """Interactive session for learning libraries with user input.""" print("🎯 Welcome to the Interactive Library Learning System!") print("This system will help you learn any Python library from its documentation.\n") learned_libraries = {} while True: print("\n" + "="*60) print("🚀 LIBRARY LEARNING SESSION") print("="*60) # Get library name from user library_name = input("\n📚 Enter the library name you want to learn (or 'quit' to exit): ").strip() if library_name.lower() in ['quit', 'exit', 'q']: print("\n👋 Thanks for using the Interactive Library Learning System!") break if not library_name: print("❌ Please enter a valid library name.") continue # Get documentation URLs print(f"\n🔗 Enter documentation URLs for {library_name} (one per line, empty line to finish):") urls = [] while True: url = input(" URL: ").strip() if not url: break if not url.startswith(('http://', 'https://')): print(" ⚠️ Please enter a valid URL starting with http:// or https://") continue urls.append(url) if not urls: print("❌ No valid URLs provided. Skipping this library.") continue # Get custom use cases from user print(f"\n🎯 Define use cases for {library_name} (optional, press Enter for defaults):") print(" Default use cases will be: Basic setup, Common operations, Advanced usage") user_wants_custom = input(" Do you want to define custom use cases? (y/n): ").strip().lower() use_cases = None if user_wants_custom in ['y', 'yes']: print(" Enter your use cases (one per line, empty line to finish):") use_cases = [] while True: use_case = input(" Use case: ").strip() if not use_case: break use_cases.append(use_case) if not use_cases: print(" No custom use cases provided, using defaults.") use_cases = None # Learn the library print(f"\n🚀 Starting learning process for {library_name}...") result = learn_any_library(library_name, urls, use_cases) if result: learned_libraries[library_name] = result print(f"\n✅ Successfully learned {library_name}!") # Show summary print(f"\n📊 Learning Summary for {library_name}:") print(f" • Core concepts: {len(result['library_info']['core_concepts'])} identified") print(f" • Common patterns: {len(result['library_info']['patterns'])} found") print(f" • Examples generated: {len(result['examples'])}") # Ask if user wants to see examples show_examples = input(f"\n👀 Do you want to see the generated examples for {library_name}? (y/n): ").strip().lower() if show_examples in ['y', 'yes']: for i, example in enumerate(result['examples'], 1): print(f"\n{'─'*50}") print(f"📝 Example {i}: {example['use_case']}") print(f"{'─'*50}") print("\n💻 Generated Code:") print("```python") print(example['code']) print("```") print(f"\n📦 Required Imports:") for imp in example['imports']: print(f" • {imp}") print(f"\n📝 Explanation:") print(example['explanation']) print(f"\n✅ Best Practices:") for practice in example['best_practices']: print(f" • {practice}") # Ask if user wants to see the next example if i < len(result['examples']): continue_viewing = input(f"\nContinue to next example? (y/n): ").strip().lower() if continue_viewing not in ['y', 'yes']: break # Offer to save results save_results = input(f"\n💾 Save learning results for {library_name} to file? 
(y/n): ").strip().lower() if save_results in ['y', 'yes']: filename = input(f" Enter filename (default: {library_name.lower()}_learning.json): ").strip() if not filename: filename = f"{library_name.lower()}_learning.json" try: import json with open(filename, 'w') as f: json.dump(result, f, indent=2, default=str) print(f" ✅ Results saved to {filename}") except Exception as e: print(f" ❌ Error saving file: {e}") else: print(f"❌ Failed to learn {library_name}") # Ask if user wants to learn another library print(f"\n📚 Libraries learned so far: {list(learned_libraries.keys())}") continue_learning = input("\n🔄 Do you want to learn another library? (y/n): ").strip().lower() if continue_learning not in ['y', 'yes']: break # Final summary if learned_libraries: print(f"\n🎉 Session Summary:") print(f"Successfully learned {len(learned_libraries)} libraries:") for lib_name, info in learned_libraries.items(): print(f" • {lib_name}: {len(info['examples'])} examples generated") return learned_libraries # Example: Run interactive learning session if __name__ == "__main__": # Run interactive session learned_libraries = interactive_learning_session() ``` ## Example Output When you run the interactive learning system, you'll see: **Interactive Session Start:** ``` 🎯 Welcome to the Interactive Library Learning System! This system will help you learn any Python library from its documentation. ============================================================ 🚀 LIBRARY LEARNING SESSION ============================================================ 📚 Enter the library name you want to learn (or 'quit' to exit): FastAPI 🔗 Enter documentation URLs for FastAPI (one per line, empty line to finish): URL: https://fastapi.tiangolo.com/ URL: https://fastapi.tiangolo.com/tutorial/first-steps/ URL: https://fastapi.tiangolo.com/tutorial/path-params/ URL: 🎯 Define use cases for FastAPI (optional, press Enter for defaults): Default use cases will be: Basic setup, Common operations, Advanced usage Do you want to define custom use cases? (y/n): y Enter your use cases (one per line, empty line to finish): Use case: Create a REST API with authentication Use case: Build a file upload endpoint Use case: Add database integration with SQLAlchemy Use case: ``` **Documentation Processing:** ``` 🚀 Starting learning process for FastAPI... 🚀 Starting automated learning for FastAPI... Documentation sources: 3 URLs 📡 Fetching: https://fastapi.tiangolo.com/ (attempt 1) 📡 Fetching: https://fastapi.tiangolo.com/tutorial/first-steps/ (attempt 1) 📡 Fetching: https://fastapi.tiangolo.com/tutorial/path-params/ (attempt 1) 📚 Learning about FastAPI from 3 URLs... 🔍 Library Analysis Results for FastAPI: Sources: 3 successful fetches Core Concepts: ['FastAPI app', 'path operations', 'dependencies', 'request/response models'] Common Patterns: ['app = FastAPI()', 'decorator-based routing', 'Pydantic models'] Key Methods: ['FastAPI()', '@app.get()', '@app.post()', 'uvicorn.run()'] Installation: pip install fastapi uvicorn ``` **Code Generation:** ``` 📝 Generating example 1/3: Create a REST API with authentication ✅ Successfully learned FastAPI! 📊 Learning Summary for FastAPI: • Core concepts: 4 identified • Common patterns: 3 found • Examples generated: 3 👀 Do you want to see the generated examples for FastAPI? 
(y/n): y ────────────────────────────────────────────────── 📝 Example 1: Create a REST API with authentication ────────────────────────────────────────────────── 💻 Generated Code: from fastapi import FastAPI, Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials import uvicorn from typing import Dict import jwt from datetime import datetime, timedelta app = FastAPI(title="Authenticated API", version="1.0.0") security = HTTPBearer() # Secret key for JWT (use environment variable in production) SECRET_KEY = "your-secret-key-here" ALGORITHM = "HS256" def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): try: payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise HTTPException(status_code=401, detail="Invalid token") return username except jwt.PyJWTError: raise HTTPException(status_code=401, detail="Invalid token") @app.post("/login") async def login(username: str, password: str) -> dict[str, str]: # In production, verify against database if username == "admin" and password == "secret": token_data = {"sub": username, "exp": datetime.utcnow() + timedelta(hours=24)} token = jwt.encode(token_data, SECRET_KEY, algorithm=ALGORITHM) return {"access_token": token, "token_type": "bearer"} raise HTTPException(status_code=401, detail="Invalid credentials") @app.get("/protected") async def protected_route(current_user: str = Depends(verify_token)) -> dict[str, str]: return {"message": f"Hello {current_user}! This is a protected route."} if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=8000) 📦 Required Imports: • pip install fastapi uvicorn python-jose[cryptography] • from fastapi import FastAPI, Depends, HTTPException, status • from fastapi.security import HTTPBearer • import jwt 📝 Explanation: This example creates a FastAPI application with JWT-based authentication. It includes a login endpoint that returns a JWT token and a protected route that requires authentication... ✅ Best Practices: • Use environment variables for secret keys • Implement proper password hashing in production • Add token expiration and refresh logic • Include proper error handling Continue to next example? (y/n): n 💾 Save learning results for FastAPI to file? (y/n): y Enter filename (default: fastapi_learning.json): ✅ Results saved to fastapi_learning.json 📚 Libraries learned so far: ['FastAPI'] 🔄 Do you want to learn another library? (y/n): n 🎉 Session Summary: Successfully learned 1 libraries: • FastAPI: 3 examples generated ``` ## Next Steps - **GitHub Integration**: Learn from README files and example repositories - **Video Tutorial Processing**: Extract information from video documentation - **Community Examples**: Aggregate examples from Stack Overflow and forums - **Version Comparison**: Track API changes across library versions - **Testing Generation**: Automatically create unit tests for generated code - **Page Crawling**: Automatically crawl documentation pages to actively understand the usage This tutorial demonstrates how DSPy can automate the entire process of learning unfamiliar libraries from their documentation, making it valuable for rapid technology adoption and exploration. ```