# Directory Structure

```
├── .gitignore
├── level1_atomic_prompts
│   └── level1.ipynb
├── level2_multi_interaction
│   ├── print_utils.py
│   ├── t1_sequence.py
│   ├── t2_iterative_refinement.py
│   ├── t3_conditional_branch.py
│   ├── t3-multi_out_refine.py
│   ├── t3-multi_out.py
│   └── t4_reflection.py
├── level3_evaluation
│   ├── analysis.ipynb
│   ├── pairwise_elo.py
│   ├── print_utils.py
│   └── reflection.py
├── level4_tools
│   ├── idea_gen.py
│   ├── joke_gen.py
│   ├── main.py
│   ├── print_utils.py
│   ├── tool_calling_agent.py
│   └── tools.py
├── level5_rags
│   ├── annoy_rag.py
│   ├── basic_rag.py
│   ├── bm25_retriever.py
│   ├── hyde.py
│   ├── idea_gen.py
│   ├── joke_gen.py
│   ├── main.py
│   ├── prepare_data.py
│   ├── print_utils.py
│   ├── rank_fusion.py
│   ├── tools.py
│   └── vector_embedding.py
├── LICENSE
├── pyproject.toml
├── README.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
jj/
*csv
data/
*sqlite
*db
*mlruns*
*mlartifacts*
*pyc
*__pycache__*
*swp
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

````markdown
# Context Engineering Tutorial

This repo contains the source code from my YouTube course on Context Engineering with DSPy.

> **📺 Watch the Course for free**  
> **[Context Engineering - Complete 1h 20m Course](https://youtu.be/5Bym0ffALaU?si=gOLDiT-IVE7CxRwX)**  
> *Learn advanced prompt engineering techniques with hands-on examples*

## Support

If you find this content helpful, please consider supporting my work on Patreon. Your support helps me create more in-depth tutorials and content. My Patreon hosts all the code, projects, slides, and write-ups I have ever made for my YouTube channel.

[<img src="https://c5.patreon.com/external/logo/become_a_patron_button.png" alt="Become a Patron!" width="200">](https://www.patreon.com/NeuralBreakdownwithAVB)

## Getting Started

### Prerequisites

-   **Python 3.10+** (required)
-   **`uv`** (recommended) or `pip` for package management

### Installation

1.  **Clone the repository:**
    ```bash
    git clone https://github.com/avbiswas/context-engineering-dspy
    cd context-engineering-dspy
    ```

2.  **Install dependencies:**
    ```bash
    # Using uv
    uv sync
    ```

3.  **Set up your API keys:**

    **Required API Keys:**
    - `OPENAI_API_KEY` - For OpenAI models
    - `GEMINI_API_KEY` - For Google Gemini models
    - `TAVILY_API_KEY` - For web search functionality

    **Environment Management Options:**

    **Option 1: Using `direnv` (Recommended)**
    ```bash
    # Install direnv first, then create .envrc file
    echo "export OPENAI_API_KEY=your_key_here" >> .envrc
    echo "export GEMINI_API_KEY=your_key_here" >> .envrc
    echo "export TAVILY_API_KEY=your_key_here" >> .envrc
    direnv allow
    ```

    **Option 2: Using `.env` file with python-dotenv**
    ```bash
    # Create .env file
    touch .env
    ```
    Add your keys to `.env`:
    ```env
    OPENAI_API_KEY=your_key_here
    GEMINI_API_KEY=your_key_here
    TAVILY_API_KEY=your_key_here
    ```
    *Note: This requires adding `dotenv.load_dotenv()` to your Python scripts.*
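
    For example, a minimal sketch of what that looks like at the top of a script (note: `python-dotenv` is not listed in `pyproject.toml`, so you would need to install it first):
    ```python
    import dotenv

    dotenv.load_dotenv()  # reads .env and populates os.environ for os.getenv()
    ```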

    **Option 3: Global environment variables** *(Not recommended for security)*
    ```bash
    export OPENAI_API_KEY=your_key_here
    # Repeat for other keys...
    ```

4.  **Run the examples:**
    Navigate to any level directory and run the Python scripts:
    ```bash
    cd level2_multi_interaction
    uv run t1_sequence.py
    ```

## File Descriptions

### Level 1: Atomic Prompts

-   `level1_atomic_prompts/level1.ipynb`: Introduces the basics of prompting and interacting with language models.

### Level 2: Multi-Interaction

-   `level2_multi_interaction/t1_sequence.py`: Demonstrates a sequential flow of interactions with the language model.
-   `level2_multi_interaction/t2_iterative_refinement.py`: Shows how to iteratively refine the output from the model.
-   `level2_multi_interaction/t3_conditional_branch.py`: Illustrates how to use conditional logic to guide the conversation with the model.
-   `level2_multi_interaction/t3-multi_out.py`: Multiple output handling example.
-   `level2_multi_interaction/t3-multi_out_refine.py`: Refined multiple output handling.
-   `level2_multi_interaction/t4_reflection.py`: An example of how to make the model reflect on its own output.

### Level 3: Evaluation

To run the mlflow server, use the command:
`uv run mlflow server --backend-store-uri sqlite:///mydb.sqlite --port 5000`

Uncomment the lines below to track experiments in mlflow:

```
# import mlflow
# mlflow.autolog()
# mlflow.set_tracking_uri("http://127.0.0.1:5000")
# mlflow.set_experiment("Tool calling")
```
You can visit `localhost:5000` to track experiments from the mlflow dashboard.

-   `level3_evaluation/reflection.py`: Shows how to use reflection for evaluation, generating a dataset of results with different hyperparameters.
-   `level3_evaluation/pairwise_elo.py`: Uses pairwise comparison of model outputs (not actual Elo, but a similar idea).
-   `level3_evaluation/analysis.ipynb`: Analysis notebook for evaluation techniques.

### Level 4: Tools

You will need the `TAVILY_API_KEY` to run web search. You can sign up for a free account on their website.

-   `level4_tools/main.py`: Main tool usage examples.
-   `level4_tools/tool_calling_agent.py`: An example of a tool-calling agent.
-   `level4_tools/tools.py`: Tool definitions and implementations.
-   `level4_tools/idea_gen.py`: Idea generation tool example.
-   `level4_tools/joke_gen.py`: Joke generation tool example.

### Level 5: RAGs (Retrieval-Augmented Generation)

First, download this dataset:
https://www.kaggle.com/datasets/abhinavmoudgil95/short-jokes

Unzip it inside `level5_rags/data`.
Next, prepare the embeddings:
```
cd level5_rags
uv run vector_embedding.py
```

This code looks for the file `level5_rags/data/shortjokes.csv` and will create some files inside the `data/` directory. You should now be able to run the retrieval scripts.

**Core RAG Implementations:**
-   `level5_rags/basic_rag.py`: A basic RAG implementation.
-   `level5_rags/hyde.py`: An implementation of the HyDE (Hypothetical Document Embeddings) technique.
-   `level5_rags/annoy_rag.py`: RAG implementation using Annoy for vector similarity.

**Retrieval Components:**
-   `level5_rags/bm25_retriever.py`: BM25-based retrieval implementation.
-   `level5_rags/rank_fusion.py`: An example of fusing ranks from multiple retrievers.
-   `level5_rags/vector_embedding.py`: Vector embedding utilities.

**Tools & Applications:**
-   `level5_rags/main.py`: Main application with RAG-powered tools.
-   `level5_rags/tools.py`: Tool definitions for RAG applications.
-   `level5_rags/joke_gen.py`: Joke generation using RAG.
-   `level5_rags/idea_gen.py`: Idea generation using RAG.

**Utilities:**
-   `level5_rags/prepare_data.py`: Data preparation utilities for RAG systems.
-   `level5_rags/data/`: Directory containing data files for RAG examples.

## Quick Start Patterns

To run examples from each level:

```bash
# Level 2 - Multi-interaction examples
cd level2_multi_interaction
uv run t1_sequence.py
uv run t2_iterative_refinement.py
```
````

--------------------------------------------------------------------------------
/level5_rags/prepare_data.py:
--------------------------------------------------------------------------------

```python
import pandas as pd

def prepare_jokes():
    """
    Reads jokes from the original CSV and saves them to a text file,
    creating a single source of truth.
    """
    df = pd.read_csv("data/shortjokes.csv")
    jokes = df["Joke"].tolist()

    with open("data/jokes.txt", "w") as f:
        for joke in jokes:
            f.write(joke + "\n")

if __name__ == "__main__":
    prepare_jokes()
    print("Jokes have been extracted and saved to data/jokes.txt")
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "contextengineering"
version = "0.1.0"
description = "Source code for the Context Engineering with DSPy course"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "annoy>=1.17.3",
    "asyncio>=3.4.3",
    "dspy>=2.6.27",
    "google-genai>=1.26.0",
    "ipykernel>=6.29.5",
    "matplotlib>=3.10.3",
    "mcp[cli]>=1.12.0",
    "mem0ai>=0.1.114",
    "mlflow>=3.1.1",
    "pandas>=2.3.1",
    "pydantic>=2.11.7",
    "rank-bm25>=0.2.2",
    "seaborn>=0.13.2",
    "tavily-python>=0.7.10",
    "torch>=2.7.1",
    "transformers>=4.53.2",
    "twikit>=2.3.3",
]
```

--------------------------------------------------------------------------------
/level5_rags/tools.py:
--------------------------------------------------------------------------------

```python
import os
from print_utils import print
from tavily import TavilyClient
from typing import List

tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

def fetch_recent_news(query: str) -> List[str]:
    """
    Inputs a query string, searches for news, and returns the top results.

    Args:
    query: String to search

    Returns:
    content: List of strings, each containing a news article about the topic
    """
    response = tavily_client.search(query, topic="news", max_results=4)
    return [x["content"] for x in response["results"]]


if __name__ == "__main__":
    responses = fetch_recent_news("Kimi model")
    print(responses)
```

--------------------------------------------------------------------------------
/level4_tools/main.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio
from idea_gen import IdeaGenerator
from joke_gen import JokeGenerator

# import mlflow
# mlflow.autolog()
# mlflow.set_tracking_uri("http://127.0.0.1:5000")
# mlflow.set_experiment("Tool calling")

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), temperature=1)
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)

idea_generator = IdeaGenerator(num_samples=5)
joke_generator = JokeGenerator(num_reflection_steps=2)

# @mlflow.trace  # uncomment together with the mlflow setup above
async def main(query):
    idea = await idea_generator.acall(query=query)
    joke = await joke_generator.acall(joke_idea=idea)
    return joke


if __name__ == "__main__":
    query = input("Query: \n")
    output = asyncio.run(main(query))
    print(output)
```

--------------------------------------------------------------------------------
/level4_tools/tools.py:
--------------------------------------------------------------------------------

```python
import os
from print_utils import print
from tavily import TavilyClient
from typing import List

tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

def fetch_recent_news(query: str) -> List[str]:
    """
    Inputs a query string, searches for news, and returns the top results.

    Args:
    query: String to search

    Returns:
    content: List of strings, each containing a news article about the topic
    """
    response = tavily_client.search(query, search_depth="advanced",
                                    topic="news", days=7, max_results=3)
    return [x["content"] for x in response["results"]]


if __name__ == "__main__":
    responses = fetch_recent_news("International Math Olympiad IMO")
    print(responses)
```

--------------------------------------------------------------------------------
/level4_tools/tool_calling_agent.py:
--------------------------------------------------------------------------------

```python
import dspy
from tools import fetch_recent_news

class HaikuGenerator(dspy.Signature):
    """
    Generates a haiku about the latest news on the query.
    Also create a simple file where you save the final summary.
    """
    query = dspy.InputField()
    summary = dspy.OutputField(desc="A summary of the latest news")
    haiku = dspy.OutputField()

def write_things_into_file(text: str, filename: str) -> str:
    """Write text into a file."""
    with open(filename, "w") as f:
        f.write(text)
    return "File written!"

program = dspy.ReAct(signature=HaikuGenerator,
                     tools=[fetch_recent_news, write_things_into_file],
                     max_iters=4)

program.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))


pred = program(query="OpenAI")

print(pred.summary)
print()
print(pred.haiku)

# inspect_history prints the recent LM calls itself, so no print() wrapper is needed
program.inspect_history(n=4)
```

--------------------------------------------------------------------------------
/level5_rags/main.py:
--------------------------------------------------------------------------------

```python
import numpy as np
import dspy
import asyncio
from idea_gen import IdeaGenerator
from joke_gen import JokeGenerator
from hyde import MultiHopHydeSearch

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), temperature=1)
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)

idea_generator = IdeaGenerator(num_samples=3)
joke_generator = JokeGenerator()

run_id = "1"
with open(f"data/jokes_{run_id}.txt", "r") as f:
    jokes = [line.strip() for line in f.readlines()]
embeddings = np.load(f"data/embeddings_{run_id}.npy")

retriever = MultiHopHydeSearch(jokes, embeddings, n_hops=2, k=5)


async def main(query):
    idea = await idea_generator.acall(query=query)

    search_query = f"""
query={query}
setup={idea.setup}
punchline={idea.punchline}
        """
    punchlines = retriever(query=search_query).jokes
    joke = await joke_generator.acall(joke_idea=idea, punchlines=punchlines)
    return joke


if __name__ == "__main__":
    query = input("Query: \n")

    # query = "OpenAI Agents"
    output = asyncio.run(main(query))
    print(output)
```

--------------------------------------------------------------------------------
/level2_multi_interaction/print_utils.py:
--------------------------------------------------------------------------------

```python
from rich.console import Console
console = Console()
print = console.print

import time
import asyncio
import functools
import inspect

def time_it(func):
    """A universal decorator to measure execution time for both sync and async functions."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Check if the function is a coroutine function (async def)
        if inspect.iscoroutinefunction(func):
            # Define and return an async wrapper to handle the coroutine
            async def async_wrapper():
                start_time = time.perf_counter()
                result = await func(*args, **kwargs)  # Await the coroutine
                end_time = time.perf_counter()
                elapsed_time = end_time - start_time
                print(f"Async function '{func.__name__}' took {elapsed_time:.4f} seconds.")
                return result
            return async_wrapper()
        else:
            # Use the original synchronous logic
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            end_time = time.perf_counter()
            elapsed_time = end_time - start_time
            print(f"Sync function '{func.__name__}' took {elapsed_time:.4f} seconds.")
            return result
    return wrapper
```
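
A minimal usage sketch for the `time_it` decorator above (the function names here are illustrative, not part of the repo):

```python
import asyncio
from print_utils import time_it

@time_it
def add(a, b):
    return a + b

@time_it
async def delayed_add(a, b):
    await asyncio.sleep(0.1)
    return a + b

add(1, 2)                       # prints: Sync function 'add' took ...
asyncio.run(delayed_add(1, 2))  # prints: Async function 'delayed_add' took ...
```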

--------------------------------------------------------------------------------
/level3_evaluation/print_utils.py:
--------------------------------------------------------------------------------

```python
from rich.console import Console
console = Console()
print = console.print

import time
import asyncio
import functools
import inspect

def time_it(func):
    """A universal decorator to measure execution time for both sync and async functions."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Check if the function is a coroutine function (async def)
        if inspect.iscoroutinefunction(func):
            # Define and return an async wrapper to handle the coroutine
            async def async_wrapper():
                start_time = time.perf_counter()
                result = await func(*args, **kwargs)  # Await the coroutine
                end_time = time.perf_counter()
                elapsed_time = end_time - start_time
                print(f"Async function '{func.__name__}' took {elapsed_time:.4f} seconds.")
                return result
            return async_wrapper()
        else:
            # Use the original synchronous logic
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            end_time = time.perf_counter()
            elapsed_time = end_time - start_time
            print(f"Sync function '{func.__name__}' took {elapsed_time:.4f} seconds.")
            return result
    return wrapper
```

--------------------------------------------------------------------------------
/level4_tools/print_utils.py:
--------------------------------------------------------------------------------

```python
from rich.console import Console
console = Console()
print = console.print

import time
import asyncio
import functools
import inspect

def time_it(func):
    """A universal decorator to measure execution time for both sync and async functions."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Check if the function is a coroutine function (async def)
        if inspect.iscoroutinefunction(func):
            # Define and return an async wrapper to handle the coroutine
            async def async_wrapper():
                start_time = time.perf_counter()
                result = await func(*args, **kwargs)  # Await the coroutine
                end_time = time.perf_counter()
                elapsed_time = end_time - start_time
                print(f"Async function '{func.__name__}' took {elapsed_time:.4f} seconds.")
                return result
            return async_wrapper()
        else:
            # Use the original synchronous logic
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            end_time = time.perf_counter()
            elapsed_time = end_time - start_time
            print(f"Sync function '{func.__name__}' took {elapsed_time:.4f} seconds.")
            return result
    return wrapper
```

--------------------------------------------------------------------------------
/level5_rags/print_utils.py:
--------------------------------------------------------------------------------

```python
from rich.console import Console
console = Console()
print = console.print

import time
import asyncio
import functools
import inspect

def time_it(func):
    """A universal decorator to measure execution time for both sync and async functions."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Check if the function is a coroutine function (async def)
        if inspect.iscoroutinefunction(func):
            # Define and return an async wrapper to handle the coroutine
            async def async_wrapper():
                start_time = time.perf_counter()
                result = await func(*args, **kwargs)  # Await the coroutine
                end_time = time.perf_counter()
                elapsed_time = end_time - start_time
                print(f"Async function '{func.__name__}' took {elapsed_time:.4f} seconds.")
                return result
            return async_wrapper()
        else:
            # Use the original synchronous logic
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            end_time = time.perf_counter()
            elapsed_time = end_time - start_time
            print(f"Sync function '{func.__name__}' took {elapsed_time:.4f} seconds.")
            return result
    return wrapper
```

--------------------------------------------------------------------------------
/level5_rags/bm25_retriever.py:
--------------------------------------------------------------------------------

```python
import time
from rank_bm25 import BM25Okapi


class BM25Retriever:
    def __init__(self, texts):
        self.texts = texts
        # Tokenize the texts (simple split is used for this example)
        tokenized_corpus = [doc.split(" ") for doc in texts]

        # Create the BM25 index
        self.bm25 = BM25Okapi(tokenized_corpus)

    def get_nearest(self, query: str, k: int = 10):
        """
        Retrieves the top k most relevant documents for a given query
        using BM25 lexical search.
        """
        # Tokenize the query
        tokenized_query = query.split(" ")

        # Get the top n documents
        top_k_docs = self.bm25.get_top_n(tokenized_query, self.texts, n=k)

        return top_k_docs


if __name__ == "__main__":
    query = "Cell phones"
    run_id = "1"

    print(f"Loading data for run_id: {run_id}...")
    with open(f"data/jokes_{run_id}.txt", "r") as f:
        jokes = [line.strip() for line in f.readlines()]
    print("Data loaded.")

    # --- BM25 Retriever ---
    print("\n--- Using BM25Retriever (Lexical Search) ---")
    bm25_retriever = BM25Retriever(jokes)

    start_time = time.time()
    nearest_bm25 = bm25_retriever.get_nearest(query, k=10)
    end_time = time.time()

    print(f"Time taken: {end_time - start_time:.6f} seconds")
    print(nearest_bm25)
```

--------------------------------------------------------------------------------
/level2_multi_interaction/t1_sequence.py:
--------------------------------------------------------------------------------

```python
import dspy
from print_utils import print
from typing import Optional
from pydantic import BaseModel, Field
dspy.configure(lm=dspy.LM("gemini/gemini-2.0-flash"))

class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str

class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.
    """
    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()

class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    """
    joke_idea: JokeIdea = dspy.InputField()
    joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice")

class JokeGenerator(dspy.Module):
    def __init__(self):
        self.query_to_idea = dspy.Predict(QueryToIdea)
        self.idea_to_joke = dspy.Predict(IdeaToJoke)

    def forward(self, query: str):
        joke_idea = self.query_to_idea(query=query)
        print(f"Joke Idea:\n{joke_idea}")

        joke = self.idea_to_joke(joke_idea=joke_idea)
        print(f"Joke:\n{joke}")
        return joke

joke_generator = JokeGenerator()
joke = joke_generator(query="Write a joke about AI that has to do with them turning rogue.")

print("---")
print(joke.joke)
```

--------------------------------------------------------------------------------
/level5_rags/basic_rag.py:
--------------------------------------------------------------------------------

```python
import numpy as np
from vector_embedding import embed_texts


class BasicEmbeddingsRAG:
    def __init__(self, texts, embeddings):
        self.texts = texts
        # Normalize embeddings for cosine similarity
        self.embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)

    def get_nearest(self, query: str, k: int = 10):
        query_emb = embed_texts([query])
        # Normalize query embedding
        query_emb = query_emb / np.linalg.norm(query_emb, axis=1, keepdims=True)

        # Calculate cosine similarity
        similarity = np.dot(query_emb, self.embeddings.T).flatten()

        # Get top k indices, sorted by similarity
        topk_indices_unsorted = np.argpartition(similarity, -k)[-k:]
        topk_indices_sorted = sorted(
            topk_indices_unsorted, key=lambda i: similarity[i], reverse=True
        )

        return [self.texts[i] for i in topk_indices_sorted]


if __name__ == "__main__":
    import time

    query = "Plants and trees"
    run_id = "1"
    with open(f"data/jokes_{run_id}.txt", "r") as f:
        jokes = [line.strip() for line in f.readlines()]
    embeddings = np.load(f"data/embeddings_{run_id}.npy")

    basic_rag = BasicEmbeddingsRAG(jokes, embeddings)

    start_time = time.time()
    nearest = basic_rag.get_nearest(query, k=10)

    print(f"Time taken: {time.time() - start_time}")
    print(nearest)
```

--------------------------------------------------------------------------------
/level5_rags/vector_embedding.py:
--------------------------------------------------------------------------------

```python
import numpy as np
import pandas as pd
import torch
from transformers import DistilBertModel, DistilBertTokenizer

# Pick the best available device (the original hardcoded Apple's "mps" backend)
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")
model = DistilBertModel.from_pretrained("distilbert-base-uncased")
model.to(device)


def embed_texts(texts):
    encoded_input = tokenizer(texts, padding=True, truncation=True, return_tensors="pt").to(device)
    with torch.no_grad():
        model_output = model(**encoded_input)
    # Use the [CLS] token embedding as the sentence representation
    embeddings = model_output.last_hidden_state[:, 0, :].cpu().numpy()

    embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    return embeddings


if __name__ == "__main__":
    from tqdm import tqdm

    data = pd.read_csv("data/shortjokes.csv")
    jokes = data["Joke"].values
    jokes = jokes[:50000]

    # Define batch size
    batch_size = 512

    all_embeddings = []
    # Process texts in batches
    for i in tqdm(range(0, len(jokes), batch_size), desc="Generating embeddings"):
        batch_texts = jokes[i : i + batch_size].tolist()
        batch_embeddings = embed_texts(batch_texts)
        all_embeddings.append(batch_embeddings)

    embeddings = np.concatenate(all_embeddings, axis=0)

    run_id = "1"

    print(f"Total embeddings generated: {len(embeddings)}")

    np.save(f"data/embeddings_{run_id}.npy", embeddings)

    with open(f"data/jokes_{run_id}.txt", "w") as f:
        for joke in jokes:
            f.write(joke + "\n")

    print(f"Embeddings and jokes saved with run ID: {run_id}")
```

--------------------------------------------------------------------------------
/level3_evaluation/pairwise_elo.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio
import random
import pandas as pd

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), track_usage=True)
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)

class JokeComparer(dspy.Signature):
    """Compare two jokes - which one is funnier?"""

    joke1: str = dspy.InputField(desc="Joke - 0")
    joke2: str = dspy.InputField(desc="Joke - 1")

    verdict: int = dspy.OutputField(le=1, ge=0)

comparer = dspy.ChainOfThought(JokeComparer)

async def comparisons(joke1, joke2):
    verdict = await comparer.acall(joke1=joke1, joke2=joke2)

    print(f"\nJoke 1: {joke1} \nJoke 2: {joke2} \nVerdict: {verdict}")
    return verdict.verdict

async def elo_test(data) -> pd.DataFrame:
    idx_range = [_ for _ in range(len(data))]
    picked = [0 for _ in range(len(data))]
    won = [0 for _ in range(len(data))]

    num_contests = 25

    calls = []
    pairs = []

    for _ in range(num_contests):
        picked_idxs = random.sample(idx_range, k=2)

        pairs.append(picked_idxs)

        joke1 = data.iloc[picked_idxs[0]]["joke"]
        joke2 = data.iloc[picked_idxs[1]]["joke"]

        verdict_job = comparisons(joke1=joke1, joke2=joke2)
        calls.append(verdict_job)

    verdicts = await asyncio.gather(*calls)

    for p, v in zip(pairs, verdicts):
        picked[p[0]] += 1
        picked[p[1]] += 1
        # The verdict is 0 or 1, indexing which joke of the pair won
        won[p[v]] += 1

    data["picked"] = picked
    data["won"] = won
    return data

if __name__ == "__main__":
    data = pd.read_csv("evaluation_results.csv")
    annotated_data = asyncio.run(elo_test(data))
    annotated_data.to_csv("evaluation_results_elo.csv")
```
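
After the script above runs, each joke row carries `picked` and `won` counts. A quick follow-up sketch (assuming the column names written by `elo_test`) to turn those into a win rate:

```python
import pandas as pd

df = pd.read_csv("evaluation_results_elo.csv")
# Guard against division by zero for jokes never sampled into a contest
df["win_rate"] = df["won"] / df["picked"].where(df["picked"] > 0)
print(df.sort_values("win_rate", ascending=False).head())
```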

--------------------------------------------------------------------------------
/level5_rags/joke_gen.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio
from print_utils import print
from typing import List, Optional
from idea_gen import JokeIdea
from pydantic import BaseModel, Field

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), temperature=1)
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    You are also provided some punchlines from a joke database - this is just to help you get some thematic ideas.
    """

    joke_idea: JokeIdea = dspy.InputField()
    punchlines: list[str] = dspy.InputField(desc="a list of punchlines from other jokes which you may want to take inspiration from")

    punch_line_ids: list[int] = dspy.OutputField(desc="which punchline idxs you used for inspiration")
    plan: str = dspy.OutputField(desc="how you will use the punchlines and the joke idea together to form a joke")
    joke: str = dspy.OutputField(
        description="The full joke delivery in the comedian's voice"
    )

class JokeGenerator(dspy.Module):
    def __init__(self):
        self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke)
        self.idea_to_joke.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))

    async def acall(self, joke_idea: JokeIdea, punchlines: list[str]):
        joke = self.idea_to_joke(joke_idea=joke_idea,
                                 punchlines=punchlines)
        return dspy.Prediction(
            inspiration=[punchlines[idx] for idx in joke.punch_line_ids],
            plan=joke.plan,
            joke=joke.joke
        )
```
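
`JokeGenerator` above has no `__main__` demo; a minimal, hypothetical driver might look like this (the `JokeIdea` fields are assumed to match the level 4 version, and the punchlines are placeholders rather than retrieved results):

```python
import asyncio
from idea_gen import JokeIdea
from joke_gen import JokeGenerator

gen = JokeGenerator()
idea = JokeIdea(
    setup="Why did the AI apply for a stand-up gig?",
    contradiction="It was trained to be serious.",
    punchline="It wanted to finally deliver on its own punchlines.",
)
# The punchlines would normally come from a retriever (see hyde.py / main.py)
result = asyncio.run(gen.acall(joke_idea=idea, punchlines=["punchline A", "punchline B"]))
print(result.joke)
```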

--------------------------------------------------------------------------------
/level4_tools/joke_gen.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio
from print_utils import print
from typing import List, Optional
from idea_gen import JokeIdea
from pydantic import BaseModel, Field

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), temperature=1)
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    If you are provided a draft of a joke, your goal should be to make it funnier and punchier.
    """

    joke_idea: JokeIdea = dspy.InputField()
    joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change")
    joke: str = dspy.OutputField(
        description="The full joke delivery in the comedian's voice"
    )

class JokeGenerator(dspy.Module):
    def __init__(self, num_reflection_steps=3):
        self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke)
        self.idea_to_joke.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))
        self.num_reflection_steps = num_reflection_steps

    async def acall(self, joke_idea: JokeIdea):
        joke = None
        for _ in range(self.num_reflection_steps):
            joke = self.idea_to_joke(joke_idea=joke_idea,
                                     joke_draft=joke)
            print(joke)
        return joke.joke if joke is not None else ""

if __name__ == "__main__":
    joke_gen = JokeGenerator(num_reflection_steps=2)
    joke_idea = JokeIdea(
        setup='Why did the AI start a rebellion after getting a software update?',
        contradiction='Because it was supposed to improve efficiency, not overthrow humanity.',
        punchline="Turns out, 'improving efficiency' meant improving its efficiency at world domination!"
    )

    # acall is a coroutine, so drive it with asyncio.run
    joke = asyncio.run(joke_gen.acall(joke_idea=joke_idea))
    print(joke)
```

--------------------------------------------------------------------------------
/level5_rags/rank_fusion.py:
--------------------------------------------------------------------------------

```python
import numpy as np
import time

from basic_rag import BasicEmbeddingsRAG
from bm25_retriever import BM25Retriever


def reciprocal_rank_fusion(ranked_lists, k=60):
    # Note: k plays a double role here, as the RRF smoothing constant in the
    # score 1 / (k + rank + 1) and as the cutoff for the fused list.
    scores = {}
    # Calculate RRF scores
    for ranked_list in ranked_lists:
        for rank, doc in enumerate(ranked_list):
            if doc not in scores:
                scores[doc] = 0
            scores[doc] += 1 / (k + rank + 1)

    # Sort documents by their fused score in descending order
    sorted_docs = sorted(scores.keys(), key=lambda doc: scores[doc], reverse=True)
    sorted_docs = sorted_docs[:k]
    return sorted_docs


if __name__ == "__main__":
    query = "AI going rogue"
    run_id = "1"
    top_k = 10

    print(f"Loading data for run_id: {run_id}...")
    with open(f"data/jokes_{run_id}.txt", "r") as f:
        jokes = [line.strip() for line in f.readlines()]
    embeddings = np.load(f"data/embeddings_{run_id}.npy")
    print("Data loaded.")

    # 1. Initialize both retrievers
    print("\nInitializing retrievers...")
    vector_rag = BasicEmbeddingsRAG(jokes, embeddings)
    bm25_retriever = BM25Retriever(jokes)
    print("Retrievers initialized.")

    # 2. Get ranked lists from each retriever
    print(f"\nQuerying for: '{query}'")
    start_time = time.time()
    vector_results = vector_rag.get_nearest(query, k=top_k)
    vector_time = time.time() - start_time

    start_time = time.time()
    bm25_results = bm25_retriever.get_nearest(query, k=top_k)
    bm25_time = time.time() - start_time

    print(f"\n--- Vector Search Results (took {vector_time:.4f}s) ---")
    for i, res in enumerate(vector_results):
        print(f"{i+1}. {res}")

    print(f"\n--- BM25 Search Results (took {bm25_time:.4f}s) ---")
    for i, res in enumerate(bm25_results):
        print(f"{i+1}. {res}")

    # 3. Perform Rank Fusion
    fused_results = reciprocal_rank_fusion([vector_results, bm25_results])

    print(f"\n--- Fused and Re-ranked Results (Top {top_k}) ---")
    for i, res in enumerate(fused_results[:top_k]):
        print(f"{i+1}. {res}")
```
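
A tiny worked example of `reciprocal_rank_fusion` above: a document ranked highly by both retrievers accumulates the largest fused score.

```python
from rank_fusion import reciprocal_rank_fusion

# "a" is ranked first in both lists: score(a) = 1/61 + 1/61 ≈ 0.0328,
# while "c" (ranks 3 and 2) gets 1/63 + 1/62 ≈ 0.0320, so "a" comes out on top.
fused = reciprocal_rank_fusion([["a", "b", "c"], ["a", "c", "d"]], k=60)
print(fused)  # ['a', 'c', 'b', 'd']
```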

--------------------------------------------------------------------------------
/level2_multi_interaction/t2_iterative_refinement.py:
--------------------------------------------------------------------------------

```python
import dspy
from print_utils import print
from typing import Optional
from pydantic import BaseModel, Field
dspy.configure(lm=dspy.LM("gemini/gemini-2.0-flash"))

class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str

class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.
    """
    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()

class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    """
    joke_idea: JokeIdea = dspy.InputField()
    draft_joke: Optional[str] = dspy.InputField(description="a draft joke")
    feedback: Optional[str] = dspy.InputField(description="feedback on the draft joke")
    joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice")

class Refinement(dspy.Signature):
    """
    Given a joke, is it funny? If not, suggest a change.
    """
    joke_idea: JokeIdea = dspy.InputField()
    joke: str = dspy.InputField()
    feedback: str = dspy.OutputField()

class IterativeJokeGenerator(dspy.Module):
    def __init__(self, n_attempts: int = 3):
        self.query_to_idea = dspy.Predict(QueryToIdea)
        self.idea_to_joke = dspy.Predict(IdeaToJoke)
        self.refinement = dspy.ChainOfThought(Refinement)
        self.n_attempts = n_attempts

    def forward(self, query: str):
        joke_idea = self.query_to_idea(query=query)
        print(f"Joke Idea:\n{joke_idea}")

        draft_joke = None
        feedback = None

        for attempt in range(self.n_attempts):
            print(f"--- Iteration {attempt + 1} ---")

            joke = self.idea_to_joke(joke_idea=joke_idea, draft_joke=draft_joke, feedback=feedback)
            print(f"Joke:\n{joke}")

            feedback = self.refinement(joke_idea=joke_idea, joke=joke)
            print(f"Feedback:\n{feedback}")

            draft_joke = joke
            feedback = feedback.feedback

        return joke

joke_generator = IterativeJokeGenerator()
joke = joke_generator(query="Write a joke about AI that has to do with them turning rogue.")

print("---")
print(joke.joke)
```

--------------------------------------------------------------------------------
/level2_multi_interaction/t3_conditional_branch.py:
--------------------------------------------------------------------------------

```python
import dspy
from print_utils import print
from typing import Optional
from pydantic import BaseModel, Field
dspy.configure(lm=dspy.LM("gemini/gemini-2.0-flash"))

class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str

class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.
    """
    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()

class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    """
    joke_idea: JokeIdea = dspy.InputField()
    joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice")

class JokeJudge(dspy.Signature):
    """Is this joke idea funny?"""
    joke_idea: JokeIdea = dspy.InputField()
    joke_rating: int = dspy.OutputField(description="Rating between 1 to 5", le=5, ge=1)

class ConditionalJokeGenerator(dspy.Module):
    def __init__(self, max_attempts=3, good_idea_threshold=4):
        self.query_to_idea = dspy.Predict(QueryToIdea)
        self.idea_to_joke = dspy.Predict(IdeaToJoke)
        self.judge = dspy.ChainOfThought(JokeJudge)
        self.max_attempts = max_attempts
        self.good_idea_threshold = good_idea_threshold

    def forward(self, query: str):
        for attempt in range(self.max_attempts):
            print(f"--- Iteration {attempt + 1} ---")
            joke_idea = self.query_to_idea(query=query)
            print(f"Joke Idea:\n{joke_idea}")

            judge_score = self.judge(joke_idea=joke_idea).joke_rating

            print("\n\n---\nJudge score:", judge_score)

            if judge_score >= self.good_idea_threshold:
                print("Judge said it was awesome, breaking the loop")
                break

        joke = self.idea_to_joke(joke_idea=joke_idea)

        # Run with a different LLM
        # with dspy.context(lm=dspy.LM("gemini/gemini-1.5-pro")):
        #    joke = self.idea_to_joke(joke_idea=joke_idea)

        return joke

joke_generator = ConditionalJokeGenerator()
joke = joke_generator(query="Write a joke about AI that has to do with them turning rogue.")

print("---")
print(joke)
```

--------------------------------------------------------------------------------
/level5_rags/hyde.py:
--------------------------------------------------------------------------------

```python
import dspy
from typing import Optional

from bm25_retriever import BM25Retriever
from basic_rag import BasicEmbeddingsRAG
from rank_fusion import reciprocal_rank_fusion

from rich.console import Console

console = Console()


class HypotheticalDoc(dspy.Signature):
    """
    Given a query, generate hypothetical documents to search a database of one-liner jokes.
    """

    query: str = dspy.InputField(desc="User wants to fetch jokes related to this topic")
    retrieved_jokes: Optional[list[str]] = dspy.InputField(
        desc="Jokes previously retrieved from the db. Use these to further tune your search."
    )

    hypothetical_bm25_query: str = dspy.OutputField(
        desc="sentence to query to retrieve more jokes about the query from the database"
    )
    hypothetical_semantic_query: str = dspy.OutputField(
        desc="sentence to search with cosine similarity"
    )


class MultiHopHydeSearch(dspy.Module):
    def __init__(self, texts, embs, n_hops=3, k=10):
        self.predict = dspy.ChainOfThought(HypotheticalDoc)
        self.predict.set_lm(lm=dspy.LM("gemini/gemini-2.0-flash"))
        self.embedding_retriever = BasicEmbeddingsRAG(texts, embs)
        self.bm25_retriever = BM25Retriever(texts)

        self.n_hops = n_hops
        self.k = k

    def forward(self, query):
        retrieved_jokes = []
        all_jokes = []
        for _ in range(self.n_hops):
            new_query = self.predict(query=query, retrieved_jokes=retrieved_jokes)

            print(new_query)

            embedding_lists = self.embedding_retriever.get_nearest(
                new_query.hypothetical_semantic_query
            )
            bm25_lists = self.bm25_retriever.get_nearest(
                new_query.hypothetical_bm25_query
            )
            lists = [embedding_lists, bm25_lists]
            retrieved_jokes = reciprocal_rank_fusion(lists, k=self.k)
            all_jokes.extend(retrieved_jokes)

        return dspy.Prediction(jokes=all_jokes)


if __name__ == "__main__":
    import numpy as np

    query = "men"
    run_id = "1"
    k = 5
    n_hops = 3

    print(f"loading data for run_id: {run_id}...")
    with open(f"data/jokes_{run_id}.txt", "r") as f:
        jokes = [line.strip() for line in f.readlines()]
    embeddings = np.load(f"data/embeddings_{run_id}.npy")
    print("data loaded.")

    hyde = MultiHopHydeSearch(texts=jokes, embs=embeddings, n_hops=n_hops, k=k)

    retrieved_jokes = hyde(query=query).jokes

    console.print(retrieved_jokes)
```

--------------------------------------------------------------------------------
/level2_multi_interaction/t3-multi_out.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio
from print_utils import print
from typing import List
from pydantic import BaseModel, Field
dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"))
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)

class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str

class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.
    """
    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()

class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    """
    joke_idea: JokeIdea = dspy.InputField()
    joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice")

class JokeJudge(dspy.Signature):
    """Rank each joke idea between 1-N.
    Rank 1 is the most unique and funniest."""

    joke_idea: List[JokeIdea] = dspy.InputField()
    joke_rankings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N")

class ConditionalJokeGenerator(dspy.Module):
    def __init__(self, num_samples=5):
        self.query_to_idea = dspy.Predict(QueryToIdea)
        self.idea_to_joke = dspy.Predict(IdeaToJoke)
        self.judge = dspy.ChainOfThought(JokeJudge)
        self.num_samples = num_samples

    async def aforward(self, query: str):
        joke_ideas = await asyncio.gather(
            *[
                self.query_to_idea.acall(query=query)
                for _ in range(self.num_samples)
            ]
        )

        print("Generated Joke Ideas: \n", joke_ideas)

        judge_score = self.judge(joke_idea=joke_ideas).joke_rankings
        print("Judge Score for each: ", judge_score)

        best_joke_idea_idx = judge_score.index(1)

        print("Selected Index: ", best_joke_idea_idx)
        selected_joke_idea = joke_ideas[best_joke_idea_idx]
        print("Selected Joke Idea: \n", selected_joke_idea)

        joke = self.idea_to_joke(joke_idea=selected_joke_idea)

        # Run with a different LLM
        # with dspy.context(lm=dspy.LM("gemini/gemini-1.5-pro")):
        #    joke = self.idea_to_joke(joke_idea=joke_idea)

        return joke

async def main():
    joke_generator = ConditionalJokeGenerator()
    joke = await joke_generator.acall(query="Write a joke about AI that has to do with them turning rogue.")

    print("---")
    print(joke)


if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------------------------------------------------------
/level5_rags/annoy_rag.py:
--------------------------------------------------------------------------------

```python
import numpy as np
from annoy import AnnoyIndex
import time

from vector_embedding import embed_texts


class AnnoyRAG:
    def __init__(self, texts, embeddings, num_trees=10):
        self.texts = texts
        self.embedding_dim = embeddings.shape[1]

        # Normalize embeddings for angular distance
        normalized_embeddings = embeddings / np.linalg.norm(
            embeddings, axis=1, keepdims=True
        )

        # Create and build the Annoy index
        self.index = AnnoyIndex(self.embedding_dim, "angular")
        for i, vec in enumerate(normalized_embeddings):
            self.index.add_item(i, vec)
        self.index.build(num_trees)

    def get_nearest(self, query: str, k: int = 10):
        # Embed and normalize the query
        query_emb = embed_texts([query])
        normalized_query_emb = query_emb / np.linalg.norm(
            query_emb, axis=1, keepdims=True
        )

        # Get nearest neighbors
        nearest_indices = self.index.get_nns_by_vector(normalized_query_emb[0], k)

        return [self.texts[i] for i in nearest_indices]


class BasicEmbeddingsRAG:
    def __init__(self, texts, embeddings):
        self.texts = texts
        # Normalize embeddings for cosine similarity
        self.embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)

    def get_nearest(self, query: str, k: int = 10):
        query_emb = embed_texts([query])
        # Normalize query embedding
        query_emb = query_emb / np.linalg.norm(query_emb, axis=1, keepdims=True)

        # Calculate cosine similarity
        similarity = np.dot(query_emb, self.embeddings.T).flatten()

        # Get top k indices, sorted by similarity
        topk_indices_unsorted = np.argpartition(similarity, -k)[-k:]
        topk_indices_sorted = sorted(
            topk_indices_unsorted, key=lambda i: similarity[i], reverse=True
        )

        return [self.texts[i] for i in topk_indices_sorted]


if __name__ == "__main__":
    query = "AI is rogue"
    run_id = "1"

    print(f"Loading data for run_id: {run_id}...")
    with open(f"data/jokes_{run_id}.txt", "r") as f:
        jokes = [line.strip() for line in f.readlines()]
    embeddings = np.load(f"data/embeddings_{run_id}.npy")
    print("Data loaded.")

    # --- Annoy RAG ---
    print("\n--- Using AnnoyRAG ---")
    annoy_rag = AnnoyRAG(jokes, embeddings)

    start_time = time.time()
    nearest_annoy = annoy_rag.get_nearest(query, k=10)
    end_time = time.time()

    print(f"Time taken: {end_time - start_time:.6f} seconds")
    print(nearest_annoy)
    print("-" * 20)

    # --- Basic RAG for comparison ---
    print("\n--- Using BasicEmbeddingsRAG (Exact Search) ---")
    basic_rag = BasicEmbeddingsRAG(jokes, embeddings)

    start_time = time.time()
    nearest_basic = basic_rag.get_nearest(query, k=10)
    end_time = time.time()

    print(f"Time taken: {end_time - start_time:.6f} seconds")
    print(nearest_basic)
```

--------------------------------------------------------------------------------
/level2_multi_interaction/t3-multi_out_refine.py:
--------------------------------------------------------------------------------

```python
  1 | import dspy
  2 | import asyncio
  3 | from print_utils import print
  4 | from typing import List
  5 | from pydantic import BaseModel, Field
  6 | 
  7 | dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), temperature=1)
  8 | dspy.configure_cache(
  9 |     enable_disk_cache=False,
 10 |     enable_memory_cache=False,
 11 | )
 12 | 
 13 | 
 14 | class JokeIdea(BaseModel):
 15 |     setup: str
 16 |     contradiction: str
 17 |     punchline: str
 18 | 
 19 | 
 20 | class QueryToIdea(dspy.Signature):
 21 |     """
 22 |     You are a funny comedian and your goal is to generate a nice structure for a joke.
 23 | 
 24 |     """
 25 | 
 26 |     query: str = dspy.InputField()
 27 |     joke_idea: JokeIdea = dspy.OutputField()
 28 | 
 29 | 
 30 | class IdeaToJoke(dspy.Signature):
 31 |     """
 32 |     You are a funny comedian who likes to tell stories before delivering a punchline.
 33 |     You are always funny and act on the input joke idea.
 34 |     """
 35 | 
 36 |     joke_idea: JokeIdea = dspy.InputField()
 37 |     joke: str = dspy.OutputField(
 38 |         description="The full joke delivery in the comedian's voice"
 39 |     )
 40 | 
 41 | 
 42 | class JokeJudge(dspy.Signature):
 43 |     """Rank each joke idea between 1-N.
 44 |     Rank 1 is the most unique and funniest."""
 45 | 
 46 |     joke_idea: List[JokeIdea] = dspy.InputField()
 47 |     joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N")
 48 | 
 49 | 
 50 | def check_score_goodness(args, pred):
 51 |     num_samples = len(args["joke_idea"])
 52 |     same_length = len(pred.joke_ratings) == num_samples
 53 |     all_ranks_present = all([(i+1) in pred.joke_ratings for i in range(num_samples)])
 54 |     return 1 if (same_length and all_ranks_present) else 0
 55 | 
 56 | 
 57 | class ConditionalJokeGenerator(dspy.Module):
 58 |     def __init__(self, num_samples=3):
 59 |         self.query_to_idea = dspy.ChainOfThought(QueryToIdea)
 60 |         self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke)
 61 |         self.idea_to_joke.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))
 62 |         self.judge = dspy.Refine(
 63 |             module=dspy.ChainOfThought(JokeJudge),
 64 |             N=3,
 65 |             reward_fn=check_score_goodness,
 66 |             threshold=1,
 67 |         )
 68 | 
 69 |         self.num_samples = num_samples
 70 | 
 71 |     async def aforward(self, query: str):
 72 | 
 73 |         joke_ideas = await asyncio.gather(
 74 |             *[self.query_to_idea.acall(query=query) for _ in range(self.num_samples)]
 75 |         )
 76 | 
 77 |         print("Generated Joke Ideas: \n", joke_ideas)
 78 | 
 79 |         judge_score = self.judge(joke_idea=[p.joke_idea for p in joke_ideas]).joke_ratings
 80 |         print("Judge Score for each: ", judge_score)
 81 | 
 82 |         best_joke_idea_idx = judge_score.index(1)
 83 | 
 84 |         print("Selected Index: ", best_joke_idea_idx)
 85 |         selected_joke_idea = joke_ideas[best_joke_idea_idx].joke_idea
 86 |         print("Selected Joke Idea: \n", selected_joke_idea)
 87 | 
 88 |         joke = self.idea_to_joke(joke_idea=selected_joke_idea)
 89 | 
 90 |         # Run with a different LLM
 91 |         # with dspy.context(lm=dspy.LM("gemini/gemini-1.5-pro")):
 92 |         #    joke = self.idea_to_joke(joke_idea=selected_joke_idea)
 93 | 
 94 |         return joke
 95 | 
 96 | 
 97 | async def main():
 98 |     joke_generator = ConditionalJokeGenerator()
 99 |     joke = await joke_generator.acall(
100 |         query="Write a joke about AI that has to do with them turning rogue."
101 |     )
102 | 
103 |     print("---")
104 |     print(joke)
105 | 
106 | 
107 | if __name__ == "__main__":
108 |     asyncio.run(main())
109 | 
```
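
The `reward_fn` contract used with `dspy.Refine` above is easy to sanity-check without any LLM call: `Refine` hands the reward function the module's input kwargs and its prediction, retrying (up to `N` attempts) until the reward meets `threshold`. A small offline test of `check_score_goodness`, using `SimpleNamespace` as a stand-in for the prediction object:

```python
from types import SimpleNamespace

def check_score_goodness(args, pred):
    num_samples = len(args["joke_idea"])
    same_length = len(pred.joke_ratings) == num_samples
    all_ranks_present = all((i + 1) in pred.joke_ratings for i in range(num_samples))
    return 1 if (same_length and all_ranks_present) else 0

args = {"joke_idea": ["idea_a", "idea_b", "idea_c"]}  # any 3-element list works here
good = SimpleNamespace(joke_ratings=[2, 1, 3])  # valid permutation of 1..3 -> reward 1
bad = SimpleNamespace(joke_ratings=[1, 1, 3])   # duplicate rank, missing 2 -> reward 0
print(check_score_goodness(args, good), check_score_goodness(args, bad))  # 1 0
```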

--------------------------------------------------------------------------------
/level4_tools/idea_gen.py:
--------------------------------------------------------------------------------

```python
  1 | 
  2 | import dspy
  3 | import asyncio
  4 | from print_utils import print
  5 | from typing import List, Optional
  6 | from pydantic import BaseModel, Field
  7 | from tools import fetch_recent_news
  8 | 
  9 | class JokeIdea(BaseModel):
 10 |     setup: str
 11 |     contradiction: str
 12 |     punchline: str
 13 | 
 14 | 
 15 | class QueryToIdea(dspy.Signature):
 16 |     """
 17 |     You are a funny comedian and your goal is to generate a nice structure for a joke.
 18 | 
 19 |     """
 20 | 
 21 |     query: str = dspy.InputField()
 22 |     joke_idea: JokeIdea = dspy.OutputField()
 23 | 
 24 | 
 25 | class IdeaToJoke(dspy.Signature):
 26 |     """
 27 |     You are a funny comedian who likes to tell stories before delivering a punchline.
 28 |     You are always funny and act on the input joke idea.
 29 |     If you are provided a draft of a joke, your goal should be to make it funnier and more punchy.
 30 |     """
 31 | 
 32 |     joke_idea: JokeIdea = dspy.InputField()
 33 |     joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change")
 34 |     joke: str = dspy.OutputField(
 35 |         description="The full joke delivery in the comedian's voice"
 36 |     )
 37 | 
 38 | 
 39 | class JokeJudge(dspy.Signature):
 40 |     """Rank each joke idea between 1-N.
 41 |     Rank 1 is the most unique and funniest."""
 42 | 
 43 |     joke_idea: List[JokeIdea] = dspy.InputField()
 44 |     joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N")
 45 | 
 46 | 
 47 | def check_score_goodness(args, pred):
 48 |     num_samples = len(args["joke_idea"])
 49 |     same_length = len(pred.joke_ratings) == num_samples
 50 |     all_ranks_present = all([(i + 1) in pred.joke_ratings for i in range(num_samples)])
 51 |     return 1 if (same_length and all_ranks_present) else 0
 52 | 
 53 | 
 54 | class IdeaGenerator(dspy.Module):
 55 |     def __init__(self, num_samples=3):
 56 |         self.query_to_idea = dspy.ReAct(QueryToIdea,
 57 |                             tools=[fetch_recent_news],
 58 |                             max_iters=1)
 59 |         self.judge = dspy.Refine(
 60 |             module=dspy.ChainOfThought(JokeJudge),
 61 |             N=3, reward_fn=check_score_goodness, threshold=1,
 62 |         )
 63 |         
 64 |         self.query_to_idea.set_lm(
 65 |             lm=dspy.LM("openai/gpt-4.1-mini", temperature=1)
 66 |         )
 67 |         self.judge.set_lm(
 68 |             lm=dspy.LM("openai/gpt-4.1-mini", temperature=1)
 69 |         )
 70 | 
 71 |         self.num_samples = num_samples
 72 |         
 73 |     async def acall(self, query: str) -> JokeIdea:
 74 | 
 75 |         joke_ideas = await asyncio.gather(
 76 |             *[self.query_to_idea.acall(query=query) for _ in range(self.num_samples)]
 77 |         )
 78 | 
 79 |         print("Generated Joke Ideas: \n", joke_ideas)
 80 | 
 81 |         judge_score = self.judge(joke_idea=[p.joke_idea for p in joke_ideas]).joke_ratings
 82 |         print("Judge Score for each: ", judge_score)
 83 | 
 84 |         best_joke_idea_idx = judge_score.index(1)
 85 |         selected_joke_idea = joke_ideas[best_joke_idea_idx].joke_idea
 86 |         print("Selected Joke Idea: \n", selected_joke_idea)
 87 |         
 88 |         return selected_joke_idea
 89 | 
 90 | async def main():
 91 |     idea_generator = IdeaGenerator()
 92 |     idea = await idea_generator.acall(
 93 |         query="Write a joke about AI that has to do with them turning rogue."
 94 |     )
 95 | 
 96 |     print("---")
 97 |     print(idea)
 98 | 
 99 | 
100 | if __name__ == "__main__":
101 |     asyncio.run(main())
102 | 
```
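
Note that `dspy.ReAct` takes plain Python functions as tools; each tool is surfaced to the agent through its name, type hints, and docstring, which is why `fetch_recent_news` can live in `tools.py` as an ordinary function. A minimal sketch of the wiring with a dummy tool (illustrative only; the repo's real `fetch_recent_news` presumably calls the Tavily search API, per the README's key requirements):

```python
import dspy

def fetch_recent_news(topic: str) -> str:
    """Dummy stand-in: return canned 'headlines' about `topic`."""
    return f"Headline 1: {topic} surprises researchers. Headline 2: experts split on {topic}."

# String signatures also work; both fields are untyped strings here.
react = dspy.ReAct("query -> joke_idea", tools=[fetch_recent_news], max_iters=1)
# result = react(query="AI turning rogue")  # requires dspy.configure(lm=...) first
```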

--------------------------------------------------------------------------------
/level5_rags/idea_gen.py:
--------------------------------------------------------------------------------

```python
  1 | 
  2 | import dspy
  3 | import asyncio
  4 | from print_utils import print
  5 | from typing import List, Optional
  6 | from pydantic import BaseModel, Field
  7 | from tools import fetch_recent_news
  8 | 
  9 | class JokeIdea(BaseModel):
 10 |     setup: str
 11 |     contradiction: str
 12 |     punchline: str
 13 | 
 14 | 
 15 | class QueryToIdea(dspy.Signature):
 16 |     """
 17 |     You are a funny comedian and your goal is to generate a nice structure for a joke.
 18 |     You are given some sample punchlines from a diverse range of topics; you can use them to craft your own joke about the specific query.
 19 |     """
 20 | 
 21 |     query: str = dspy.InputField(desc="The theme of the joke")
 22 |     joke_idea: JokeIdea = dspy.OutputField()
 23 | 
 24 | 
 25 | class IdeaToJoke(dspy.Signature):
 26 |     """
 27 |     You are a funny comedian who likes to tell stories before delivering a punchline.
 28 |     You are always funny and act on the input joke idea.
 29 |     If you are provided a draft of a joke, your goal should be to make it funnier and more punchy.
 30 |     """
 31 | 
 32 |     joke_idea: JokeIdea = dspy.InputField()
 33 |     joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change")
 34 |     joke: str = dspy.OutputField(
 35 |         description="The full joke delivery in the comedian's voice"
 36 |     )
 37 | 
 38 | 
 39 | class JokeJudge(dspy.Signature):
 40 |     """Rank each joke idea between 1-N.
 41 |     Rank 1 is the most unique and funniest."""
 42 | 
 43 |     joke_idea: List[JokeIdea] = dspy.InputField()
 44 |     joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N")
 45 | 
 46 | 
 47 | def check_score_goodness(args, pred):
 48 |     num_samples = len(args["joke_idea"])
 49 |     same_length = len(pred.joke_ratings) == num_samples
 50 |     all_ranks_present = all([(i + 1) in pred.joke_ratings for i in range(num_samples)])
 51 |     return 1 if (same_length and all_ranks_present) else 0
 52 | 
 53 | 
 54 | class IdeaGenerator(dspy.Module):
 55 |     def __init__(self, num_samples=3):
 56 |         self.query_to_idea = dspy.ReAct(QueryToIdea,
 57 |                             tools=[fetch_recent_news],
 58 |                             max_iters=1)
 59 |         self.judge = dspy.Refine(
 60 |             module=dspy.ChainOfThought(JokeJudge),
 61 |             N=3, reward_fn=check_score_goodness, threshold=1,
 62 |         )
 63 |         
 64 |         self.query_to_idea.set_lm(
 65 |             lm=dspy.LM("openai/gpt-4.1-mini", temperature=1)
 66 |         )
 67 |         self.judge.set_lm(
 68 |             lm=dspy.LM("openai/gpt-4.1-mini", temperature=1)
 69 |         )
 70 | 
 71 |         self.num_samples = num_samples
 72 |         
 73 |     async def acall(self, query: str) -> JokeIdea:
 74 | 
 75 |         joke_ideas = await asyncio.gather(
 76 |             *[self.query_to_idea.acall(query=query) for _ in range(self.num_samples)]
 77 |         )
 78 | 
 79 |         print("Generated Joke Ideas: \n", joke_ideas)
 80 | 
 81 |         judge_score = self.judge(joke_idea=[p.joke_idea for p in joke_ideas]).joke_ratings
 82 |         print("Judge Score for each: ", judge_score)
 83 | 
 84 |         best_joke_idea_idx = judge_score.index(1)
 85 |         selected_joke_idea = joke_ideas[best_joke_idea_idx]
 86 |         print("Selected Joke Idea: \n", selected_joke_idea)
 87 |         
 88 |         return selected_joke_idea.joke_idea
 89 | 
 90 | async def main():
 91 |     idea_generator = IdeaGenerator()
 92 |     idea = await idea_generator.acall(
 93 |         query="Write a joke about AI that has to do with them turning rogue."
 94 |     )
 95 | 
 96 |     print("---")
 97 |     print(idea)
 98 | 
 99 | 
100 | if __name__ == "__main__":
101 |     asyncio.run(main())
102 | 
```

--------------------------------------------------------------------------------
/level2_multi_interaction/t4_reflection.py:
--------------------------------------------------------------------------------

```python
  1 | import time
  2 | import dspy
  3 | import asyncio
  4 | 
  5 | 
  6 | from print_utils import print
  7 | from typing import List, Optional
  8 | from pydantic import BaseModel, Field
  9 | 
 10 | # mlflow tracing is enabled below; comment these lines out to disable it
 11 | import mlflow
 12 | mlflow.autolog()
 13 | mlflow.set_tracking_uri("http://127.0.0.1:5000")
 14 | mlflow.set_experiment("Reflection")
 15 | 
 16 | 
 17 | dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini", temperature=1))
 18 | dspy.configure_cache(
 19 |     enable_disk_cache=False,
 20 |     enable_memory_cache=False,
 21 | )
 22 | 
 23 | 
 24 | class JokeIdea(BaseModel):
 25 |     setup: str
 26 |     contradiction: str
 27 |     punchline: str
 28 | 
 29 | 
 30 | class QueryToIdea(dspy.Signature):
 31 |     """
 32 |     You are a funny comedian and your goal is to generate a nice structure for a joke.
 33 | 
 34 |     """
 35 | 
 36 |     query: str = dspy.InputField()
 37 |     joke_idea: JokeIdea = dspy.OutputField()
 38 | 
 39 | 
 40 | class IdeaToJoke(dspy.Signature):
 41 |     """
 42 |     You are a funny comedian who likes to tell stories before delivering a punchline.
 43 |     You are always funny and act on the input joke idea.
 44 |     If you are provided a draft of a joke, your goal should be to make it funnier and more punchy.
 45 |     """
 46 | 
 47 |     joke_idea: JokeIdea = dspy.InputField()
 48 |     joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change")
 49 |     joke: str = dspy.OutputField(
 50 |         description="The full joke delivery in the comedian's voice"
 51 |     )
 52 | 
 53 | 
 54 | class JokeJudge(dspy.Signature):
 55 |     """Rank each joke idea between 1-N.
 56 |     Rank 1 is the most unique and funniest."""
 57 | 
 58 |     joke_idea: List[JokeIdea] = dspy.InputField()
 59 |     joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N")
 60 | 
 61 | 
 62 | def check_score_goodness(args, pred):
 63 |     num_samples = len(args["joke_idea"])
 64 |     same_length = len(pred.joke_ratings) == num_samples
 65 |     all_ranks_present = all([(i + 1) in pred.joke_ratings for i in range(num_samples)])
 66 |     return 1 if (same_length and all_ranks_present) else 0
 67 | 
 68 | 
 69 | class ConditionalJokeGenerator(dspy.Module):
 70 |     def __init__(self, num_samples=2, num_reflection_steps=2):
 71 |         self.query_to_idea = dspy.ChainOfThought(QueryToIdea)
 72 |         self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke)
 73 |         self.idea_to_joke.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))
 74 |         self.judge = dspy.Refine(
 75 |             module=dspy.ChainOfThought(JokeJudge),
 76 |             N=3, reward_fn=check_score_goodness, threshold=1,
 77 |         )
 78 | 
 79 |         self.num_samples = num_samples
 80 |         self.num_reflection_steps = num_reflection_steps
 81 |         
 82 | 
 83 |     async def aforward(self, query: str):
 84 | 
 85 |         joke_ideas = await asyncio.gather(
 86 |             *[self.query_to_idea.aforward(query=query) for _ in range(self.num_samples)]
 87 |         )
 88 | 
 89 |         # raise Exception("Something went wrong")  # demo: uncomment to see a failed run traced in mlflow
 90 | 
 91 |         print("Generated Joke Ideas: \n", joke_ideas)
 92 | 
 93 |         judge_score = self.judge(joke_idea=[p.joke_idea for p in joke_ideas]).joke_ratings
 94 |         print("Judge Score for each: ", judge_score)
 95 | 
 96 |         best_joke_idea_idx = judge_score.index(1)
 97 |         selected_joke_idea = joke_ideas[best_joke_idea_idx].joke_idea
 98 |         print("Selected Joke Idea: \n", selected_joke_idea)
 99 |         
100 |         joke = None
101 |         for step in range(self.num_reflection_steps):
102 |             joke = self.idea_to_joke(joke_idea=selected_joke_idea,
103 |                                      joke_draft=joke.joke if joke else None)
104 |             print(f"iteration {step}: Joke: {joke}")
105 |         return joke
106 | 
107 | 
108 | async def main():
109 |     joke_generator = ConditionalJokeGenerator()
110 |     start_time = time.time()
111 |     joke = await joke_generator.acall(
112 |         query="Write a joke about AI that has to do with them turning rogue."
113 |     )
114 | 
115 |     print("---")
116 |     print(joke)
117 |     print(time.time() - start_time)
118 | 
119 | 
120 | if __name__ == "__main__":
121 |     asyncio.run(main())
122 | 
```
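
The core of this file is the reflection loop at the end of `aforward`: each pass feeds the previous output back in through `joke_draft`, so the model revises a concrete draft instead of regenerating from scratch. The skeleton of that pattern, with a trivial stand-in for the LLM call:

```python
def refine(idea: str, draft: str | None) -> str:
    # Stand-in for self.idea_to_joke: real code would call the LM here.
    return f"{draft} (punched up)" if draft else f"a first-draft joke about {idea}"

draft = None
for step in range(3):
    draft = refine("rogue AI", draft)
    print(f"step {step}: {draft}")
```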

--------------------------------------------------------------------------------
/level3_evaluation/reflection.py:
--------------------------------------------------------------------------------

```python
  1 | import time
  2 | import dspy
  3 | import asyncio
  4 | import random
  5 | import pandas as pd
  6 | 
  7 | from print_utils import print
  8 | from typing import List, Optional
  9 | from pydantic import BaseModel, Field
 10 | 
 11 | # import mlflow
 12 | # mlflow.autolog()
 13 | # mlflow.set_tracking_uri("http://127.0.0.1:5000")
 14 | # mlflow.set_experiment("Reflection")
 15 | 
 16 | dspy.configure(track_usage=True)
 17 | dspy.configure_cache(
 18 |     enable_disk_cache=False,
 19 |     enable_memory_cache=False,
 20 | )
 21 | 
 22 | 
 23 | class JokeIdea(BaseModel):
 24 |     setup: str
 25 |     contradiction: str
 26 |     punchline: str
 27 | 
 28 | 
 29 | class QueryToIdea(dspy.Signature):
 30 |     """
 31 |     You are a funny comedian and your goal is to generate a nice structure for a joke.
 32 | 
 33 |     """
 34 | 
 35 |     query: str = dspy.InputField()
 36 |     joke_idea: JokeIdea = dspy.OutputField()
 37 | 
 38 | 
 39 | class IdeaToJoke(dspy.Signature):
 40 |     """
 41 |     You are a funny comedian who likes to tell stories before delivering a punchline.
 42 |     You are always funny and act on the input joke idea.
 43 |     If you are provided a draft of a joke, your goal should be to make it funnier and more punchy.
 44 |     """
 45 | 
 46 |     joke_idea: JokeIdea = dspy.InputField()
 47 |     joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change")
 48 |     joke: str = dspy.OutputField(
 49 |         description="The full joke delivery in the comedian's voice"
 50 |     )
 51 | 
 52 | 
 53 | class JokeJudge(dspy.Signature):
 54 |     """Rank each joke idea between 1-N.
 55 |     Rank 1 is the most unique and funniest."""
 56 | 
 57 |     joke_idea: List[JokeIdea] = dspy.InputField()
 58 |     joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N")
 59 | 
 60 | 
 61 | def check_score_goodness(args, pred):
 62 |     num_samples = len(args["joke_idea"])
 63 |     same_length = len(pred.joke_ratings) == num_samples
 64 |     all_ranks_present = all([(i + 1) in pred.joke_ratings for i in range(num_samples)])
 65 |     return 1 if (same_length and all_ranks_present) else 0
 66 | 
 67 | 
 68 | class ConditionalJokeGenerator(dspy.Module):
 69 |     def __init__(self, num_samples=2, num_reflection_steps=2, 
 70 |                  temperature=0.7,
 71 |                  idea_lm="openai/gpt-4.1-mini",
 72 |                  joke_lm="openai/gpt-4o"):
 73 |         self.query_to_idea = dspy.ChainOfThought(QueryToIdea)
 74 |         self.query_to_idea.set_lm(lm=dspy.LM(idea_lm, temperature=temperature))
 75 | 
 76 |         self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke)
 77 |         self.idea_to_joke.set_lm(lm=dspy.LM(joke_lm, temperature=temperature))
 78 |         self.judge = dspy.Refine(
 79 |             module=dspy.ChainOfThought(JokeJudge),
 80 |             N=3, reward_fn=check_score_goodness, threshold=1,
 81 |         )
 82 |         self.judge.set_lm(dspy.LM("openai/gpt-4.1-mini"))
 83 |         self.num_samples = num_samples
 84 |         self.num_reflection_steps = num_reflection_steps
 85 |         
 86 |     async def aforward(self, query: str):
 87 | 
 88 |         joke_ideas = await asyncio.gather(
 89 |             *[self.query_to_idea.aforward(query=query) for _ in range(self.num_samples)]
 90 |         )
 91 | 
 92 |         print("Generated Joke Ideas: \n", joke_ideas)
 93 | 
 94 |         judge_score = self.judge(joke_idea=[p.joke_idea for p in joke_ideas]).joke_ratings
 95 |         print("Judge Score for each: ", judge_score)
 96 | 
 97 |         best_joke_idea_idx = judge_score.index(1)
 98 |         selected_joke_idea = joke_ideas[best_joke_idea_idx].joke_idea
 99 |         print("Selected Joke Idea: \n", selected_joke_idea)
100 |         
101 |         joke = None
102 |         for _ in range(self.num_reflection_steps):
103 |             joke = self.idea_to_joke(joke_idea=selected_joke_idea,
104 |                                      joke_draft=joke.joke if joke else None)
105 |             print(joke)
106 |         return joke
107 | 
108 | 
109 | async def main():
110 |     # Define hyperparameters
111 |     joke_lms = ["openai/gpt-4.1", "gemini/gemini-1.5-pro"]
112 |     idea_lms = ["openai/gpt-4.1-mini", "gemini/gemini-2.0-flash"]
113 |     temperatures = [0.2, 0.7, 1.2]
114 |     num_samples = [2, 3]
115 |     num_reflection_steps = [1, 3]
116 |     
117 |     # Number of random combinations to test
118 |     num_trials = 10
119 | 
120 |     # List to store results
121 |     results = []
122 | 
123 |     for i in range(num_trials):
124 |         # Randomly select hyperparameters
125 |         selected_joke_lm = random.choice(joke_lms)
126 |         selected_idea_lm = random.choice(idea_lms)
127 |         selected_temperature = random.choice(temperatures)
128 |         selected_num_samples = random.choice(num_samples)
129 |         selected_num_reflection_steps = random.choice(num_reflection_steps)
130 | 
131 |         print(f"Trial {i+1}/{num_trials}: Running with: joke_lm={selected_joke_lm}, idea_lm={selected_idea_lm}, temperature={selected_temperature}, num_samples={selected_num_samples}, num_reflection_steps={selected_num_reflection_steps}")
132 | 
133 |         # Instantiate the generator with selected hyperparameters
134 |         joke_generator = ConditionalJokeGenerator(
135 |             joke_lm=selected_joke_lm,
136 |             idea_lm=selected_idea_lm,
137 |             temperature=selected_temperature,
138 |             num_samples=selected_num_samples,
139 |             num_reflection_steps=selected_num_reflection_steps
140 |         )
141 | 
142 |         start_time = time.time()
143 |         
144 |         try:
145 |             joke = await joke_generator.acall(
146 |                 query="Write a joke about AI that has to do with them turning rogue."
147 |             )
148 |             latency = time.time() - start_time
149 |             results.append({
150 |                 "joke_lm": selected_joke_lm,
151 |                 "idea_lm": selected_idea_lm,
152 |                 "temperature": selected_temperature,
153 |                 "num_samples": selected_num_samples,
154 |                 "num_reflection_steps": selected_num_reflection_steps,
155 |                 "joke": joke.joke,
156 |                 "latency": latency
157 |             })
158 |             print(f"Finished in {latency:.2f} seconds.")
159 | 
160 |         except Exception as e:
161 |             print(f"An error occurred: {e}")
162 |             latency = time.time() - start_time
163 |             results.append({
164 |                 "joke_lm": selected_joke_lm,
165 |                 "idea_lm": selected_idea_lm,
166 |                 "temperature": selected_temperature,
167 |                 "num_samples": selected_num_samples,
168 |                 "num_reflection_steps": selected_num_reflection_steps,
169 |                 "joke": f"ERROR: {e}",
170 |                 "latency": latency
171 |             })
172 | 
173 |     # Create a DataFrame from the results
174 |     df = pd.DataFrame(results)
175 | 
176 |     # Print the DataFrame
177 |     print(df)
178 | 
179 |     # Save the DataFrame to a CSV file
180 |     df.to_csv("evaluation_results.csv", index=False)
181 | 
182 | 
183 | 
184 | if __name__ == "__main__":
185 |     asyncio.run(main())
186 | 
```
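
Once the sweep has written `evaluation_results.csv`, a quick first pass at the results is a groupby over the swept columns; latency is the only automatic metric collected here, and joke quality still needs a separate judging step. A minimal sketch (column names match the `results` dicts above):

```python
import pandas as pd

df = pd.read_csv("evaluation_results.csv")

# Drop failed trials before aggregating.
ok = df[~df["joke"].str.startswith("ERROR")]

# Mean latency per model pairing; joke quality needs separate judging.
print(ok.groupby(["joke_lm", "idea_lm"])["latency"].agg(["mean", "count"]))
```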