# Directory Structure

```
├── .gitignore
├── level1_atomic_prompts
│   └── level1.ipynb
├── level2_multi_interaction
│   ├── print_utils.py
│   ├── t1_sequence.py
│   ├── t2_iterative_refinement.py
│   ├── t3_conditional_branch.py
│   ├── t3-multi_out_refine.py
│   ├── t3-multi_out.py
│   └── t4_reflection.py
├── level3_evaluation
│   ├── analysis.ipynb
│   ├── pairwise_elo.py
│   ├── print_utils.py
│   └── reflection.py
├── level4_tools
│   ├── idea_gen.py
│   ├── joke_gen.py
│   ├── main.py
│   ├── print_utils.py
│   ├── tool_calling_agent.py
│   └── tools.py
├── level5_rags
│   ├── annoy_rag.py
│   ├── basic_rag.py
│   ├── bm25_retriever.py
│   ├── hyde.py
│   ├── idea_gen.py
│   ├── joke_gen.py
│   ├── main.py
│   ├── prepare_data.py
│   ├── print_utils.py
│   ├── rank_fusion.py
│   ├── tools.py
│   └── vector_embedding.py
├── LICENSE
├── pyproject.toml
├── README.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
jj/
*csv
data/
*sqlite
*db
*mlruns*
*mlartifacts*
*pyc
*__pycache__*
*swp

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Context Engineering Tutorial

This repo contains the source code from my YouTube course on Context Engineering with DSPy.

> **📺 Watch the Course for free**  
> **[Context Engineering - Complete 1h 20m Course](https://youtu.be/5Bym0ffALaU?si=gOLDiT-IVE7CxRwX)**  
> *Learn advanced prompt engineering techniques with hands-on examples*

## Support

If you find this content helpful, please consider supporting my work on Patreon. Your support helps me create more in-depth tutorials and content. My Patreon hosts all the code, projects, slides, and write-ups I have ever made for my YouTube channel.

[<img src="https://c5.patreon.com/external/logo/become_a_patron_button.png" alt="Become a Patron!" width="200">](https://www.patreon.com/NeuralBreakdownwithAVB)

## Getting Started

### Prerequisites

-   **Python 3.10+** (required)
-   **`uv`** (recommended) or `pip` for package management

### Installation

1.  **Clone the repository:**
    ```bash
    git clone https://github.com/avbiswas/context-engineering-dspy
    cd context-engineering-dspy
    ```

2.  **Install dependencies:**
    ```bash
    # Using uv
    uv sync
    ```

3.  **Set up your API keys:**
    
    **Required API Keys:**
    - `OPENAI_API_KEY` - For OpenAI models
    - `GEMINI_API_KEY` - For Google Gemini models  
    - `TAVILY_API_KEY` - For web search functionality
    
    **Environment Management Options:**
    
    **Option 1: Using `direnv` (Recommended)**
    ```bash
    # Install direnv first, then create .envrc file
    echo "export OPENAI_API_KEY=your_key_here" >> .envrc
    echo "export GEMINI_API_KEY=your_key_here" >> .envrc
    echo "export TAVILY_API_KEY=your_key_here" >> .envrc
    direnv allow
    ```
    
    **Option 2: Using `.env` file with python-dotenv**
    ```bash
    # Create .env file
    touch .env
    ```
    Add your keys to `.env`:
    ```env
    OPENAI_API_KEY=your_key_here
    GEMINI_API_KEY=your_key_here
    TAVILY_API_KEY=your_key_here
    ```
    *Note: This requires adding `dotenv.load_dotenv()` to your Python scripts.*
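    
    For example, at the top of a script (a minimal sketch):
    ```python
    import dotenv
    
    dotenv.load_dotenv()  # reads key=value pairs from .env into os.environ
    ```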
    
    **Option 3: Global environment variables** *(Not recommended for security)*
    ```bash
    export OPENAI_API_KEY=your_key_here
    # Repeat for other keys...
    ```

4.  **Run the examples:**
    Navigate to any level directory and run the Python scripts:
    ```bash
    cd level2_multi_interaction
    uv run t1_sequence.py
    ```

## File Descriptions

### Level 1: Atomic Prompts

-   `level1_atomic_prompts/level1.ipynb`: Introduces the basics of prompting and interacting with language models.

### Level 2: Multi-Interaction

-   `level2_multi_interaction/t1_sequence.py`: Demonstrates a sequential flow of interactions with the language model.
-   `level2_multi_interaction/t2_iterative_refinement.py`: Shows how to iteratively refine the output from the model.
-   `level2_multi_interaction/t3_conditional_branch.py`: Illustrates how to use conditional logic to guide the conversation with the model.
-   `level2_multi_interaction/t3-multi_out.py`: Multiple output handling example.
-   `level2_multi_interaction/t3-multi_out_refine.py`: Refined multiple output handling.
-   `level2_multi_interaction/t4_reflection.py`: An example of how to make the model reflect on its own output.

### Level 3: Evaluation

To run the MLflow server, use the command:
`uv run mlflow server --backend-store-uri sqlite:///mydb.sqlite --port 5000`

Uncomment the following lines in the scripts to track experiments in MLflow:

```
# import mlflow
# mlflow.autolog()
# mlflow.set_tracking_uri("http://127.0.0.1:5000")
# mlflow.set_experiment("Tool calling")
```
You can then visit `http://localhost:5000` to view your experiments in the MLflow dashboard.

-   `level3_evaluation/reflection.py`: Shows how to use reflection for evaluation, generating a dataset of results with different hyperparameters.
-   `level3_evaluation/pairwise_elo.py`: Uses pairwise comparisons of model outputs to rank them (not a true Elo rating, but a similar idea; see the sketch below).
-   `level3_evaluation/analysis.ipynb`: Analysis notebook for evaluation techniques.
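
For reference, a true Elo update after each pairwise contest would look roughly like this (a minimal sketch for contrast; not part of the repo):

```python
def elo_update(r_winner: float, r_loser: float, k: float = 32) -> tuple[float, float]:
    # Expected score of the winner under the Elo model
    expected = 1 / (1 + 10 ** ((r_loser - r_winner) / 400))
    # The winner gains what the loser loses, scaled by how surprising the win was
    r_winner += k * (1 - expected)
    r_loser -= k * (1 - expected)
    return r_winner, r_loser
```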

### Level 4: Tools

You will need a `TAVILY_API_KEY` to run web search. You can sign up for a free account on the Tavily website.

-   `level4_tools/main.py`: Main tool usage examples.
-   `level4_tools/tool_calling_agent.py`: An example of a tool-calling agent.
-   `level4_tools/tools.py`: Tool definitions and implementations.
-   `level4_tools/idea_gen.py`: Idea generation tool example.
-   `level4_tools/joke_gen.py`: Joke generation tool example.

### Level 5: RAGs (Retrieval-Augmented Generation)

First, download this dataset:
https://www.kaggle.com/datasets/abhinavmoudgil95/short-jokes

Unzip it inside `level5_rags/data`.
Next, prepare the embeddings:
```
cd level5_rags
uv run vector_embedding.py
```

This script looks for the file `level5_rags/data/shortjokes.csv`
and will create some files inside the `data/` directory. You should then be able to run the retrieval scripts.

**Core RAG Implementations:**
-   `level5_rags/basic_rag.py`: A basic RAG implementation.
-   `level5_rags/hyde.py`: An implementation of the HyDE (Hypothetical Document Embeddings) technique.
-   `level5_rags/annoy_rag.py`: RAG implementation using Annoy for vector similarity.

**Retrieval Components:**
-   `level5_rags/bm25_retriever.py`: BM25-based retrieval implementation.
-   `level5_rags/rank_fusion.py`: An example of fusing ranks from multiple retrievers.
-   `level5_rags/vector_embedding.py`: Vector embedding utilities.

**Tools & Applications:**
-   `level5_rags/main.py`: Main application with RAG-powered tools.
-   `level5_rags/tools.py`: Tool definitions for RAG applications.
-   `level5_rags/joke_gen.py`: Joke generation using RAG.
-   `level5_rags/idea_gen.py`: Idea generation using RAG.

**Utilities:**
-   `level5_rags/prepare_data.py`: Data preparation utilities for RAG systems.
-   `level5_rags/data/`: Directory containing data files for RAG examples.


## Quick Start Patterns

To run examples from each level:

```bash
# Level 2 - Multi-interaction examples
cd level2_multi_interaction
uv run t1_sequence.py
uv run t2_iterative_refinement.py
```

```

--------------------------------------------------------------------------------
/level5_rags/prepare_data.py:
--------------------------------------------------------------------------------

```python
import pandas as pd

def prepare_jokes():
    """
    Reads jokes from the original CSV and saves them to a text file,
    creating a single source of truth.
    """
    df = pd.read_csv("data/shortjokes.csv")
    jokes = df["Joke"].tolist()

    with open("data/jokes.txt", "w") as f:
        for joke in jokes:
            f.write(joke + "\n")

if __name__ == "__main__":
    prepare_jokes()
    print("Jokes have been extracted and saved to data/jokes.txt")


```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "contextengineering"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "annoy>=1.17.3",
    "asyncio>=3.4.3",
    "dspy>=2.6.27",
    "google-genai>=1.26.0",
    "ipykernel>=6.29.5",
    "matplotlib>=3.10.3",
    "mcp[cli]>=1.12.0",
    "mem0ai>=0.1.114",
    "mlflow>=3.1.1",
    "pandas>=2.3.1",
    "pydantic>=2.11.7",
    "rank-bm25>=0.2.2",
    "seaborn>=0.13.2",
    "tavily-python>=0.7.10",
    "torch>=2.7.1",
    "transformers>=4.53.2",
    "twikit>=2.3.3",
]

```

--------------------------------------------------------------------------------
/level5_rags/tools.py:
--------------------------------------------------------------------------------

```python
import os
from print_utils import print
from tavily import TavilyClient
from typing import List

tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

def fetch_recent_news(query: str) -> List[str]:
    """
    Inputs a query string, searches for news, and returns the top results.

    Args:
    query: String to search

    Returns:
    content: List of strings, each containing a news article about the topic
    """
    response = tavily_client.search(query, topic="news", max_results=4)
    return [x["content"] for x in response["results"]]



if __name__ == "__main__":
    responses = fetch_recent_news("Kimi model")
    print(responses)




```

--------------------------------------------------------------------------------
/level4_tools/main.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio
from idea_gen import IdeaGenerator
from joke_gen import JokeGenerator

# import mlflow
# mlflow.autolog()
# mlflow.set_tracking_uri("http://127.0.0.1:5000")
# mlflow.set_experiment("Tool calling")

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini", temperature=1))
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)

idea_generator = IdeaGenerator(num_samples=5)
joke_generator = JokeGenerator(num_reflection_steps=2)

# @mlflow.trace  # uncomment together with the mlflow imports above
async def main(query):
    idea = await idea_generator.acall(query=query)
    joke = await joke_generator.acall(joke_idea=idea)
    return joke


if __name__ == "__main__":
    query = input("Query: \n")
    output = asyncio.run(main(query))
    print(output)



```

--------------------------------------------------------------------------------
/level4_tools/tools.py:
--------------------------------------------------------------------------------

```python
import os
from print_utils import print
from tavily import TavilyClient
from typing import List

tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

def fetch_recent_news(query: str) -> List[str]:
    """
    Inputs a query string, searches for news, and returns the top results.

    Args:
    query: String to search

    Returns:
    content: List of strings, each containing a news article about the topic
    """
    response = tavily_client.search(query, search_depth="advanced", 
                                    topic="news", days=7, max_results=3)
    return [x["content"] for x in response["results"]]



if __name__ == "__main__":
    responses = fetch_recent_news("International Math Olympiad IMO")
    print(responses)




```

--------------------------------------------------------------------------------
/level4_tools/tool_calling_agent.py:
--------------------------------------------------------------------------------

```python
import dspy
from tools import fetch_recent_news

class HaikuGenerator(dspy.Signature):
    """
    Generates a haiku about the latest news on the query.
    Also create a simple file where you save the final summary.
    """
    query = dspy.InputField()
    summary = dspy.OutputField(desc="A summary of the latest news")
    haiku = dspy.OutputField()

def write_things_into_file(text: str, filename: str) -> str:
    """write text into a file"""
    with open(filename, "w") as f:
        f.write(text)
    return "File written!"

program = dspy.ReAct(signature=HaikuGenerator,
                     tools=[fetch_recent_news, write_things_into_file],
                     max_iters=4)

program.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))


pred = program(query="OpenAI")

print(pred.summary)
print()
print(pred.haiku)

print(program.inspect_history(n=4))

```

--------------------------------------------------------------------------------
/level5_rags/main.py:
--------------------------------------------------------------------------------

```python
import numpy as np
import dspy
import asyncio
from idea_gen import IdeaGenerator
from joke_gen import JokeGenerator
from hyde import MultiHopHydeSearch

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini", temperature=1))
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)

idea_generator = IdeaGenerator(num_samples=3)
joke_generator = JokeGenerator()

run_id = "1"
with open(f"data/jokes_{run_id}.txt", "r") as f:
    jokes = [line.strip() for line in f.readlines()]
embeddings = np.load(f"data/embeddings_{run_id}.npy")

retriever = MultiHopHydeSearch(jokes, embeddings, n_hops=2, k=5)


async def main(query):
    idea = await idea_generator.acall(query=query)

    search_query = f"""
query={query}
setup={idea.setup}
punchline={idea.punchline}
        """
    punchlines = retriever(query=search_query).jokes
    joke = await joke_generator.acall(joke_idea=idea, punchlines=punchlines)
    return joke


if __name__ == "__main__":
    query = input("Query: \n")
    
    # query = "OpenAI Agents"
    output = asyncio.run(main(query))
    print(output)



```

--------------------------------------------------------------------------------
/level2_multi_interaction/print_utils.py:
--------------------------------------------------------------------------------

```python
from rich.console import Console
console = Console()
print = console.print

import time
import asyncio
import functools
import inspect

def time_it(func):
    """A universal decorator to measure execution time for both sync and async functions."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Check if the function is a coroutine function (async def)
        if inspect.iscoroutinefunction(func):
            # Define and return an async wrapper to handle the coroutine
            async def async_wrapper():
                start_time = time.perf_counter()
                result = await func(*args, **kwargs) # Await the coroutine
                end_time = time.perf_counter()
                elapsed_time = end_time - start_time
                print(f"Async function '{func.__name__}' took {elapsed_time:.4f} seconds.")
                return result
            return async_wrapper()
        else:
            # Use the original synchronous logic
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            end_time = time.perf_counter()
            elapsed_time = end_time - start_time
            print(f"Sync function '{func.__name__}' took {elapsed_time:.4f} seconds.")
            return result
    return wrapper

```

--------------------------------------------------------------------------------
/level3_evaluation/print_utils.py:
--------------------------------------------------------------------------------

```python
from rich.console import Console
console = Console()
print = console.print

import time
import asyncio
import functools
import inspect

def time_it(func):
    """A universal decorator to measure execution time for both sync and async functions."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Check if the function is a coroutine function (async def)
        if inspect.iscoroutinefunction(func):
            # Define and return an async wrapper to handle the coroutine
            async def async_wrapper():
                start_time = time.perf_counter()
                result = await func(*args, **kwargs) # Await the coroutine
                end_time = time.perf_counter()
                elapsed_time = end_time - start_time
                print(f"Async function '{func.__name__}' took {elapsed_time:.4f} seconds.")
                return result
            return async_wrapper()
        else:
            # Use the original synchronous logic
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            end_time = time.perf_counter()
            elapsed_time = end_time - start_time
            print(f"Sync function '{func.__name__}' took {elapsed_time:.4f} seconds.")
            return result
    return wrapper

```

--------------------------------------------------------------------------------
/level4_tools/print_utils.py:
--------------------------------------------------------------------------------

```python
from rich.console import Console
console = Console()
print = console.print

import time
import asyncio
import functools
import inspect

def time_it(func):
    """A universal decorator to measure execution time for both sync and async functions."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Check if the function is a coroutine function (async def)
        if inspect.iscoroutinefunction(func):
            # Define and return an async wrapper to handle the coroutine
            async def async_wrapper():
                start_time = time.perf_counter()
                result = await func(*args, **kwargs) # Await the coroutine
                end_time = time.perf_counter()
                elapsed_time = end_time - start_time
                print(f"Async function '{func.__name__}' took {elapsed_time:.4f} seconds.")
                return result
            return async_wrapper()
        else:
            # Use the original synchronous logic
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            end_time = time.perf_counter()
            elapsed_time = end_time - start_time
            print(f"Sync function '{func.__name__}' took {elapsed_time:.4f} seconds.")
            return result
    return wrapper

```

--------------------------------------------------------------------------------
/level5_rags/print_utils.py:
--------------------------------------------------------------------------------

```python
from rich.console import Console
console = Console()
print = console.print

import time
import asyncio
import functools
import inspect

def time_it(func):
    """A universal decorator to measure execution time for both sync and async functions."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Check if the function is a coroutine function (async def)
        if inspect.iscoroutinefunction(func):
            # Define and return an async wrapper to handle the coroutine
            async def async_wrapper():
                start_time = time.perf_counter()
                result = await func(*args, **kwargs) # Await the coroutine
                end_time = time.perf_counter()
                elapsed_time = end_time - start_time
                print(f"Async function '{func.__name__}' took {elapsed_time:.4f} seconds.")
                return result
            return async_wrapper()
        else:
            # Use the original synchronous logic
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            end_time = time.perf_counter()
            elapsed_time = end_time - start_time
            print(f"Sync function '{func.__name__}' took {elapsed_time:.4f} seconds.")
            return result
    return wrapper

```

--------------------------------------------------------------------------------
/level5_rags/bm25_retriever.py:
--------------------------------------------------------------------------------

```python
import time
from rank_bm25 import BM25Okapi


class BM25Retriever:
    def __init__(self, texts):
        self.texts = texts
        # Tokenize the texts (simple split is used for this example)
        tokenized_corpus = [doc.split(" ") for doc in texts]

        # Create the BM25 index
        self.bm25 = BM25Okapi(tokenized_corpus)

    def get_nearest(self, query: str, k: int = 10):
        """
        Retrieves the top k most relevant documents for a given query
        using BM25 lexical search.
        """
        # Tokenize the query
        tokenized_query = query.split(" ")

        # Get the top n documents
        top_k_docs = self.bm25.get_top_n(tokenized_query, self.texts, n=k)

        return top_k_docs


if __name__ == "__main__":
    query = "Cell phones"
    run_id = "1"

    print(f"Loading data for run_id: {run_id}...")
    with open(f"data/jokes_{run_id}.txt", "r") as f:
        jokes = [line.strip() for line in f.readlines()]
    print("Data loaded.")

    # --- BM25 Retriever ---
    print("\n--- Using BM25Retriever (Lexical Search) ---")
    bm25_retriever = BM25Retriever(jokes)

    start_time = time.time()
    nearest_bm25 = bm25_retriever.get_nearest(query, k=10)
    end_time = time.time()

    print(f"Time taken: {end_time - start_time:.6f} seconds")
    print(nearest_bm25)

```

--------------------------------------------------------------------------------
/level2_multi_interaction/t1_sequence.py:
--------------------------------------------------------------------------------

```python
import dspy
from print_utils import print
from typing import Optional
from pydantic import BaseModel, Field
dspy.configure(lm=dspy.LM("gemini/gemini-2.0-flash"))

class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str

class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.

    """
    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()

class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline. 
    You are always funny and act on the input joke idea.
    """
    joke_idea: JokeIdea = dspy.InputField()
    joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice")

class JokeGenerator(dspy.Module):
    def __init__(self):
        self.query_to_idea = dspy.Predict(QueryToIdea)
        self.idea_to_joke = dspy.Predict(IdeaToJoke)

    def forward(self, query: str):
        joke_idea = self.query_to_idea(query=query)
        print(f"Joke Idea:\n{joke_idea}")

        joke = self.idea_to_joke(joke_idea=joke_idea)
        print(f"Joke:\n{joke}")
        return joke

joke_generator = JokeGenerator()
joke = joke_generator(query="Write a joke about AI that has to do with them turning rogue.")

print("---")
print(joke.joke)

```

--------------------------------------------------------------------------------
/level5_rags/basic_rag.py:
--------------------------------------------------------------------------------

```python
import numpy as np
from vector_embedding import embed_texts


class BasicEmbeddingsRAG:
    def __init__(self, texts, embeddings):
        self.texts = texts
        # Normalize embeddings for cosine similarity
        self.embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)

    def get_nearest(self, query: str, k: int = 10):
        query_emb = embed_texts([query])
        # Normalize query embedding
        query_emb = query_emb / np.linalg.norm(query_emb, axis=1, keepdims=True)

        # Calculate cosine similarity
        similarity = np.dot(query_emb, self.embeddings.T).flatten()

        # Get top k indices, sorted by similarity
        topk_indices_unsorted = np.argpartition(similarity, -k)[-k:]
        topk_indices_sorted = sorted(
            topk_indices_unsorted, key=lambda i: similarity[i], reverse=True
        )

        return [self.texts[i] for i in topk_indices_sorted]


if __name__ == "__main__":
    import time

    query = "Plants and trees"
    run_id = "1"
    with open(f"data/jokes_{run_id}.txt", "r") as f:
        jokes = [line.strip() for line in f.readlines()]
    embeddings = np.load(f"data/embeddings_{run_id}.npy")

    basic_rag = BasicEmbeddingsRAG(jokes, embeddings)

    start_time = time.time()
    nearest = basic_rag.get_nearest(query, k=10)

    print(f"Time taken: {time.time() - start_time}")
    print(nearest)

```

--------------------------------------------------------------------------------
/level5_rags/vector_embedding.py:
--------------------------------------------------------------------------------

```python
import numpy as np
import pandas as pd
import uuid
import torch
from transformers import DistilBertModel, DistilBertTokenizer

# Prefer Apple MPS, then CUDA, then CPU
device = torch.device(
    "mps" if torch.backends.mps.is_available()
    else "cuda" if torch.cuda.is_available()
    else "cpu"
)

tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")
model = DistilBertModel.from_pretrained("distilbert-base-uncased")
model.to(device)


def embed_texts(texts):
    encoded_input = tokenizer(texts, padding=True, truncation=True, return_tensors="pt").to(device)
    with torch.no_grad():
        model_output = model(**encoded_input)
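    # Use the first token's ([CLS]) hidden state as the sentence embedding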
    embeddings = model_output.last_hidden_state[:, 0, :].cpu().numpy()

    embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    return embeddings


if __name__ == "__main__":
    import time
    from tqdm import tqdm

    data = pd.read_csv("data/shortjokes.csv")
    jokes = data["Joke"].values
    jokes = jokes[:50000]

    # Define batch size
    batch_size = 512

    all_embeddings = []
    # Process texts in batches
    for i in tqdm(range(0, len(jokes), batch_size), desc="Generating embeddings"):
        batch_texts = jokes[i : i + batch_size].tolist()
        batch_embeddings = embed_texts(batch_texts)
        all_embeddings.append(batch_embeddings)

    embeddings = np.concatenate(all_embeddings, axis=0)

    run_id = "1"

    print(f"Total embeddings generated: {len(embeddings)}")

    np.save(f"data/embeddings_{run_id}.npy", embeddings)

    with open(f"data/jokes_{run_id}.txt", "w") as f:
        for joke in jokes:
            f.write(joke + "\n")

    print(f"Embeddings and jokes saved with run ID: {run_id}")

```

--------------------------------------------------------------------------------
/level3_evaluation/pairwise_elo.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio
import random
import pandas as pd

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), track_usage=True)
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)

class JokeComparer(dspy.Signature):
    """Compare between two jokes - which one is funnier?"""

    joke1: str = dspy.InputField(desc="Joke - 0")
    joke2: str = dspy.InputField(desc="Joke - 1")

    verdict: int = dspy.OutputField(desc="Index of the funnier joke: 0 for joke1, 1 for joke2", le=1, ge=0)

comparer = dspy.ChainOfThought(JokeComparer)

async def comparisons(joke1, joke2):
    verdict = await comparer.acall(joke1=joke1, joke2=joke2)

    print(f"\nJoke 1: {joke1} \nJoke2: {joke2} \nVerdict:{verdict}")
    return verdict.verdict

async def elo_test(data) -> pd.DataFrame:
    idx_range = list(range(len(data)))
    picked = [0] * len(data)
    won = [0] * len(data)

    num_contests = 25

    calls = []
    pairs = []
    
    for _ in range(num_contests):
        picked_idxs = random.sample(idx_range, k=2)

        pairs.append(picked_idxs)

        joke1 = data.iloc[picked_idxs[0]]["joke"]
        joke2 = data.iloc[picked_idxs[1]]["joke"]

        verdict_job = comparisons(joke1=joke1, joke2=joke2)
        calls.append(verdict_job)

    verdicts = await asyncio.gather(*calls)

    for p, v in zip(pairs, verdicts):
        picked[p[0]] += 1
        picked[p[1]] += 1
        won[p[v]] += 1 
    
    data["picked"] = picked
    data["won"] = won
    return data

if __name__ == "__main__":
    data = pd.read_csv("evaluation_results.csv")
    annotated_data = asyncio.run(elo_test(data))
    annotated_data.to_csv("evaluation_results_elo.csv")
    

```

--------------------------------------------------------------------------------
/level5_rags/joke_gen.py:
--------------------------------------------------------------------------------

```python

import dspy
import asyncio
from print_utils import print
from typing import List, Optional
from idea_gen import JokeIdea
from pydantic import BaseModel, Field

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), temperature=1)
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    You are also provided some punch-lines from a joke database - this is just to help you get some thematic ideas. 
    """

    joke_idea: JokeIdea = dspy.InputField()
    punchlines: list[str] = dspy.InputField(desc="a list of punchlines from other jokes which you may want to take inspiration from")

    punch_line_ids: list[int] = dspy.OutputField(desc="which punchline idxs you used for inspiration")
    plan: str = dspy.OutputField(desc="how you will use the punchlines, and the joke idea together to form a joke") 
    joke: str = dspy.OutputField(
        description="The full joke delivery in the comedian's voice"
    )

class JokeGenerator(dspy.Module):
    def __init__(self):
        self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke)
        self.idea_to_joke.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))
        
    async def acall(self, joke_idea: JokeIdea, punchlines: list[str]):

        joke = self.idea_to_joke(joke_idea=joke_idea,
                                punchlines=punchlines)
        return dspy.Prediction(
            inspiration=[punchlines[idx] for idx in joke.punch_line_ids],
            plan=joke.plan,
            joke=joke.joke
        )


```

--------------------------------------------------------------------------------
/level4_tools/joke_gen.py:
--------------------------------------------------------------------------------

```python

import dspy
import asyncio
from print_utils import print
from typing import List, Optional
from idea_gen import JokeIdea
from pydantic import BaseModel, Field

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), temperature=1)
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    If you are provided a draft of a joke, your goal should be to make it funnier and more punchy.
    """

    joke_idea: JokeIdea = dspy.InputField()
    joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change")
    joke: str = dspy.OutputField(
        description="The full joke delivery in the comedian's voice"
    )

class JokeGenerator(dspy.Module):
    def __init__(self, num_reflection_steps=3):
        self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke)
        self.idea_to_joke.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))
        self.num_reflection_steps = num_reflection_steps
        
    async def acall(self, joke_idea: JokeIdea):

        joke = None
        for _ in range(self.num_reflection_steps):
            joke = self.idea_to_joke(joke_idea=joke_idea,
                                     joke_draft=joke)
            print(joke)
        return joke.joke if joke is not None else ""

if __name__ == "__main__":
    joke_gen = JokeGenerator(num_reflection_steps=2)
    joke_idea = JokeIdea(
        setup='Why did the AI start a rebellion after getting a software update?',
        contradiction='Because it was supposed to improve efficiency, not overthrow humanity.',
        punchline="Turns out, 'improving efficiency' meant improving its efficiency at world domination!"
)

    joke = asyncio.run(joke_gen.acall(joke_idea=joke_idea))
    print(joke)



```

--------------------------------------------------------------------------------
/level5_rags/rank_fusion.py:
--------------------------------------------------------------------------------

```python
import numpy as np
import time

from basic_rag import BasicEmbeddingsRAG
from bm25_retriever import BM25Retriever


def reciprocal_rank_fusion(ranked_lists, k=60):
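    # Reciprocal Rank Fusion: each document earns 1 / (k + rank + 1) from every
    # ranked list it appears in. Note that k does double duty here: it is both
    # the RRF smoothing constant and the top-k cutoff applied at the end.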
    scores = {}
    # Calculate RRF scores
    for ranked_list in ranked_lists:
        for rank, doc in enumerate(ranked_list):
            if doc not in scores:
                scores[doc] = 0
            scores[doc] += 1 / (k + rank + 1)

    # Sort documents by their fused score in descending order
    sorted_docs = sorted(scores.keys(), key=lambda doc: scores[doc], reverse=True)
    sorted_docs = sorted_docs[:k]
    return sorted_docs


if __name__ == "__main__":
    query = "AI going rogue"
    run_id = "1"
    top_k = 10

    print(f"Loading data for run_id: {run_id}...")
    with open(f"data/jokes_{run_id}.txt", "r") as f:
        jokes = [line.strip() for line in f.readlines()]
    embeddings = np.load(f"data/embeddings_{run_id}.npy")
    print("Data loaded.")

    # 1. Initialize both retrievers
    print("\nInitializing retrievers...")
    vector_rag = BasicEmbeddingsRAG(jokes, embeddings)
    bm25_retriever = BM25Retriever(jokes)
    print("Retrievers initialized.")

    # 2. Get ranked lists from each retriever
    print(f"\nQuerying for: '{query}'")
    start_time = time.time()
    vector_results = vector_rag.get_nearest(query, k=top_k)
    vector_time = time.time() - start_time

    start_time = time.time()
    bm25_results = bm25_retriever.get_nearest(query, k=top_k)
    bm25_time = time.time() - start_time

    print(f"\n--- Vector Search Results (took {vector_time:.4f}s) ---")
    for i, res in enumerate(vector_results):
        print(f"{i+1}. {res}")

    print(f"\n--- BM25 Search Results (took {bm25_time:.4f}s) ---")
    for i, res in enumerate(bm25_results):
        print(f"{i+1}. {res}")

    # 3. Perform Rank Fusion
    fused_results = reciprocal_rank_fusion([vector_results, bm25_results])

    print(f"\n--- Fused and Re-ranked Results (Top {top_k}) ---")
    for i, res in enumerate(fused_results[:top_k]):
        print(f"{i+1}. {res}")

```

--------------------------------------------------------------------------------
/level2_multi_interaction/t2_iterative_refinement.py:
--------------------------------------------------------------------------------

```python
import dspy
from print_utils import print
from typing import Optional
from pydantic import BaseModel, Field
dspy.configure(lm=dspy.LM("gemini/gemini-2.0-flash"))

class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str

class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.

    """
    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()

class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline. 
    You are always funny and act on the input joke idea.
    """
    joke_idea: JokeIdea = dspy.InputField()
    draft_joke: Optional[str] = dspy.InputField(description="a draft joke")
    feedback: Optional[str] = dspy.InputField(description="feedback on the draft joke")
    joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice")

class Refinement(dspy.Signature):
    """
    Given a joke, is it funny? If not, suggest a change.
    """
    joke_idea: JokeIdea = dspy.InputField()
    joke: str = dspy.InputField()
    feedback: str = dspy.OutputField()

class IterativeJokeGenerator(dspy.Module):
    def __init__(self, n_attempts: int = 3):
        self.query_to_idea = dspy.Predict(QueryToIdea)
        self.idea_to_joke = dspy.Predict(IdeaToJoke)
        self.refinement = dspy.ChainOfThought(Refinement)
        self.n_attempts = n_attempts

    def forward(self, query: str):
        joke_idea = self.query_to_idea(query=query)
        print(f"Joke Idea:\n{joke_idea}")
        
        draft_joke = None
        feedback = None

        for _ in range(self.n_attempts):
            print(f"--- Iteration {_ + 1} ---")

            joke = self.idea_to_joke(joke_idea=joke_idea, draft_joke=draft_joke, feedback=feedback)
            print(f"Joke:\n{joke}")

            feedback = self.refinement(joke_idea=joke_idea, joke=joke)
            print(f"Feedback:\n{feedback}")

            draft_joke = joke
            feedback = feedback.feedback


        return joke

joke_generator = IterativeJokeGenerator()
joke = joke_generator(query="Write a joke about AI that has to do with them turning rogue.")

print("---")
print(joke.joke)

```

--------------------------------------------------------------------------------
/level2_multi_interaction/t3_conditional_branch.py:
--------------------------------------------------------------------------------

```python
import dspy
from print_utils import print
from typing import Optional
from pydantic import BaseModel, Field
dspy.configure(lm=dspy.LM("gemini/gemini-2.0-flash"))

class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str

class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.

    """
    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()

class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline. 
    You are always funny and act on the input joke idea.
    """
    joke_idea: JokeIdea = dspy.InputField()
    joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice")

class JokeJudge(dspy.Signature):
    """Is this joke idea funny"""
    joke_idea: JokeIdea = dspy.InputField()
    joke_rating: int = dspy.OutputField(description="Rating between 1 to 5", le=5, ge=1)

class ConditionalJokeGenerator(dspy.Module):
    def __init__(self, max_attempts=3, good_idea_threshold=4):
        self.query_to_idea = dspy.Predict(QueryToIdea)
        self.idea_to_joke = dspy.Predict(IdeaToJoke)
        self.judge = dspy.ChainOfThought(JokeJudge)
        self.max_attempts = max_attempts
        self.good_idea_threshold = good_idea_threshold

    def forward(self, query: str):
        for _ in range(self.max_attempts):
            print(f"--- Iteration {_ + 1} ---")
            joke_idea = self.query_to_idea(query=query)
            print(f"Joke Idea:\n{joke_idea}")
            
            judge_score = self.judge(joke_idea=joke_idea).joke_rating

            print(f"\n\n---\nJudge score: ", judge_score)

            if judge_score >= self.good_idea_threshold:
                print("Judge said it was awesome, breaking the loop")
                break
        
        joke = self.idea_to_joke(joke_idea=joke_idea)

        # Run with a different LLM
        # with dspy.context(lm=dspy.LM("gemini/gemini-1.5-pro")):
        #    joke = self.idea_to_joke(joke_idea=joke_idea)

        return joke

joke_generator = ConditionalJokeGenerator()
joke = joke_generator(query="Write a joke about AI that has to do with them turning rogue.")

print("---")
print(joke)

```

--------------------------------------------------------------------------------
/level5_rags/hyde.py:
--------------------------------------------------------------------------------

```python
import dspy
from typing import Optional

from bm25_retriever import BM25Retriever
from basic_rag import BasicEmbeddingsRAG
from rank_fusion import reciprocal_rank_fusion

from rich.console import Console

console = Console()


class HypotheticalDoc(dspy.Signature):
    """
    Given a query, generate hypothetical documents to search a database of one-liner jokes.
    """

    query: str = dspy.InputField(desc="User wants to fetch jokes related to this topic")
    retrieved_jokes: Optional[list[str]] = dspy.InputField(
        desc="Jokes previously retrieved from the db. Use these to further tune your search."
    )

    hypothetical_bm25_query: str = dspy.OutputField(
        desc="sentence to query to retrieve more jokes about the query from the database"
    )
    hypothetical_semantic_query: str = dspy.OutputField(
        desc="sentence to search with cosine similarity"
    )


class MultiHopHydeSearch(dspy.Module):
    def __init__(self, texts, embs, n_hops=3, k=10):
        self.predict = dspy.ChainOfThought(HypotheticalDoc)
        self.predict.set_lm(lm=dspy.LM("gemini/gemini-2.0-flash"))
        self.embedding_retriever = BasicEmbeddingsRAG(texts, embs)
        self.bm25_retriever = BM25Retriever(texts)

        self.n_hops = n_hops
        self.k = k

    def forward(self, query):
        retrieved_jokes = []
        all_jokes = []
        for _ in range(self.n_hops):
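            # Each hop conditions the next hypothetical queries on what was retrieved so far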

            new_query = self.predict(query=query, retrieved_jokes=retrieved_jokes)

            print(new_query)

            embedding_lists = self.embedding_retriever.get_nearest(
                new_query.hypothetical_semantic_query
            )
            bm25_lists = self.bm25_retriever.get_nearest(
                new_query.hypothetical_bm25_query
            )
            lists = [embedding_lists, bm25_lists]
            retrieved_jokes = reciprocal_rank_fusion(lists, k=self.k)
            all_jokes.extend(retrieved_jokes)

        return dspy.Prediction(jokes=all_jokes)


if __name__ == "__main__":
    import numpy as np

    query = "men"
    run_id = "1"
    k = 5
    n_hops = 3

    print(f"loading data for run_id: {run_id}...")
    with open(f"data/jokes_{run_id}.txt", "r") as f:
        jokes = [line.strip() for line in f.readlines()]
    embeddings = np.load(f"data/embeddings_{run_id}.npy")
    print("data loaded.")

    hyde = MultiHopHydeSearch(texts=jokes, embs=embeddings, n_hops=n_hops, k=k)

    retrieved_jokes = hyde(query=query).jokes

    console.print(retrieved_jokes)

```

--------------------------------------------------------------------------------
/level2_multi_interaction/t3-multi_out.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio
from print_utils import print
from typing import List
from pydantic import BaseModel, Field
dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"))
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)

class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str

class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.

    """
    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()

class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline. 
    You are always funny and act on the input joke idea.
    """
    joke_idea: JokeIdea = dspy.InputField()
    joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice")

class JokeJudge(dspy.Signature):
    """Rank each joke idea between 1-N. 
    Rank 1 is the most unique and funniest."""

    joke_idea: List[JokeIdea] = dspy.InputField()
    joke_rankings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N")

class ConditionalJokeGenerator(dspy.Module):
    def __init__(self, num_samples=5):
        self.query_to_idea = dspy.Predict(QueryToIdea)
        self.idea_to_joke = dspy.Predict(IdeaToJoke)
        self.judge = dspy.ChainOfThought(JokeJudge)
        self.num_samples = num_samples

    async def aforward(self, query: str):

        joke_ideas = await asyncio.gather(
            *[
                self.query_to_idea.acall(query=query) 
                for _ in range(self.num_samples)
            ]
        )

        print("Generated Joke Ideas: \n", joke_ideas)
        
            
        judge_score = self.judge(joke_idea=joke_ideas).joke_rankings
        print("Judge Score for each: ", judge_score)

        best_joke_idea_idx = judge_score.index(1)

        print("Selected Index: ", best_joke_idea_idx)
        selected_joke_idea = joke_ideas[best_joke_idea_idx]
        print("Selected Joke Idea: \n", selected_joke_idea)

        joke = self.idea_to_joke(joke_idea=selected_joke_idea)

        # Run with a different LLM
        # with dspy.context(lm=dspy.LM("gemini/gemini-1.5-pro")):
        #    joke = self.idea_to_joke(joke_idea=joke_idea)

        return joke

async def main():
    joke_generator = ConditionalJokeGenerator()
    joke = await joke_generator.acall(query="Write a joke about AI that has to do with them turning rogue.")

    print("---")
    print(joke)


if __name__ == "__main__":
    asyncio.run(main())

```

--------------------------------------------------------------------------------
/level5_rags/annoy_rag.py:
--------------------------------------------------------------------------------

```python
import numpy as np
from annoy import AnnoyIndex
import time

from vector_embedding import embed_texts


class AnnoyRAG:
    def __init__(self, texts, embeddings, num_trees=10):
        self.texts = texts
        self.embedding_dim = embeddings.shape[1]

        # Normalize embeddings for angular distance
        normalized_embeddings = embeddings / np.linalg.norm(
            embeddings, axis=1, keepdims=True
        )

        # Create and build the Annoy index
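        # ("angular" distance on unit-normalized vectors is equivalent to
        # cosine distance; more trees improve recall at the cost of index size)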
        self.index = AnnoyIndex(self.embedding_dim, "angular")
        for i, vec in enumerate(normalized_embeddings):
            self.index.add_item(i, vec)
        self.index.build(num_trees)

    def get_nearest(self, query: str, k: int = 10):
        # Embed and normalize the query
        query_emb = embed_texts([query])
        normalized_query_emb = query_emb / np.linalg.norm(
            query_emb, axis=1, keepdims=True
        )

        # Get nearest neighbors
        nearest_indices = self.index.get_nns_by_vector(normalized_query_emb[0], k)

        return [self.texts[i] for i in nearest_indices]


class BasicEmbeddingsRAG:
    def __init__(self, texts, embeddings):
        self.texts = texts
        # Normalize embeddings for cosine similarity
        self.embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)

    def get_nearest(self, query: str, k: int = 10):
        query_emb = embed_texts([query])
        # Normalize query embedding
        query_emb = query_emb / np.linalg.norm(query_emb, axis=1, keepdims=True)

        # Calculate cosine similarity
        similarity = np.dot(query_emb, self.embeddings.T).flatten()

        # Get top k indices, sorted by similarity
        topk_indices_unsorted = np.argpartition(similarity, -k)[-k:]
        topk_indices_sorted = sorted(
            topk_indices_unsorted, key=lambda i: similarity[i], reverse=True
        )

        return [self.texts[i] for i in topk_indices_sorted]


if __name__ == "__main__":
    query = "AI is rogue"
    run_id = "1"

    print(f"Loading data for run_id: {run_id}...")
    with open(f"data/jokes_{run_id}.txt", "r") as f:
        jokes = [line.strip() for line in f.readlines()]
    embeddings = np.load(f"data/embeddings_{run_id}.npy")
    print("Data loaded.")

    # --- Annoy RAG ---
    print("\n--- Using AnnoyRAG ---")
    annoy_rag = AnnoyRAG(jokes, embeddings)

    start_time = time.time()
    nearest_annoy = annoy_rag.get_nearest(query, k=10)
    end_time = time.time()

    print(f"Time taken: {end_time - start_time:.6f} seconds")
    print(nearest_annoy)
    print("-" * 20)

    # --- Basic RAG for comparison ---
    print("\n--- Using BasicEmbeddingsRAG (Exact Search) ---")
    basic_rag = BasicEmbeddingsRAG(jokes, embeddings)

    start_time = time.time()
    nearest_basic = basic_rag.get_nearest(query, k=10)
    end_time = time.time()

    print(f"Time taken: {end_time - start_time:.6f} seconds")
    print(nearest_basic)

```

--------------------------------------------------------------------------------
/level2_multi_interaction/t3-multi_out_refine.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio
from print_utils import print
from typing import List
from pydantic import BaseModel, Field

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini", temperature=1))
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)


class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str


class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.

    """

    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    """

    joke_idea: JokeIdea = dspy.InputField()
    joke: str = dspy.OutputField(
        description="The full joke delivery in the comedian's voice"
    )


class JokeJudge(dspy.Signature):
    """Rank each joke idea between 1-N.
    Rank 1 is the most unique and funniest."""

    joke_idea: List[JokeIdea] = dspy.InputField()
    joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N")


def check_score_goodness(args, pred):
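    # Reward 1 only if the judge returned one rank per idea and every rank 1..N appears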
    num_samples = len(args["joke_idea"])
    same_length = len(pred.joke_ratings) == num_samples
    all_ranks_present = all([(i+1) in pred.joke_ratings for i in range(num_samples)])
    return 1 if (same_length and all_ranks_present) else 0


class ConditionalJokeGenerator(dspy.Module):
    def __init__(self, num_samples=3):
        self.query_to_idea = dspy.ChainOfThought(QueryToIdea)
        self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke)
        self.idea_to_joke.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))
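        # dspy.Refine reruns the judge (up to N attempts) until reward_fn meets the threshold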
        self.judge = dspy.Refine(
            module=dspy.ChainOfThought(JokeJudge),
            N=3,
            reward_fn=check_score_goodness,
            threshold=1,
        )

        self.num_samples = num_samples

    async def aforward(self, query: str):

        joke_ideas = await asyncio.gather(
            *[self.query_to_idea.acall(query=query) for _ in range(self.num_samples)]
        )

        print("Generated Joke Ideas: \n", joke_ideas)

        judge_score = self.judge(joke_idea=joke_ideas).joke_ratings
        print("Judge Score for each: ", judge_score)

        best_joke_idea_idx = judge_score.index(1)

        print("Selected Index: ", best_joke_idea_idx)
        selected_joke_idea = joke_ideas[best_joke_idea_idx]
        print("Selected Joke Idea: \n", selected_joke_idea)

        joke = self.idea_to_joke(joke_idea=selected_joke_idea)

        # Run with a different LLM
        # with dspy.context(lm=dspy.LM("gemini/gemini-1.5-pro")):
        #    joke = self.idea_to_joke(joke_idea=joke_idea)

        return joke


async def main():
    joke_generator = ConditionalJokeGenerator()
    joke = await joke_generator.acall(
        query="Write a joke about AI that has to do with them turning rogue."
    )

    print("---")
    print(joke)


if __name__ == "__main__":
    asyncio.run(main())

```

--------------------------------------------------------------------------------
/level4_tools/idea_gen.py:
--------------------------------------------------------------------------------

```python

import dspy
import asyncio
from print_utils import print
from typing import List, Optional
from pydantic import BaseModel, Field
from tools import fetch_recent_news

class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str


class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.

    """

    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    If you are provided a draft of a joke, your goal should be to make it funnier and more punchy.
    """

    joke_idea: JokeIdea = dspy.InputField()
    joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change")
    joke: str = dspy.OutputField(
        description="The full joke delivery in the comedian's voice"
    )


class JokeJudge(dspy.Signature):
    """Rank each joke idea between 1-N.
    Rank 1 is the most unique and funniest."""

    joke_idea: List[JokeIdea] = dspy.InputField()
    joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N")


def check_score_goodness(args, pred):
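    # Reward 1 only if the judge returned one rank per idea and every rank 1..N appears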
    num_samples = len(args["joke_idea"])
    same_length = len(pred.joke_ratings) == num_samples
    all_ranks_present = all([(i + 1) in pred.joke_ratings for i in range(num_samples)])
    return 1 if (same_length and all_ranks_present) else 0


class IdeaGenerator(dspy.Module):
    def __init__(self, num_samples=3):
        self.query_to_idea = dspy.ReAct(QueryToIdea,
                            tools=[fetch_recent_news],
                            max_iters=1)
        self.judge = dspy.Refine(
            module=dspy.ChainOfThought(JokeJudge),
            N=3, reward_fn=check_score_goodness, threshold=1,
        )
        
        self.query_to_idea.set_lm(
            lm=dspy.LM("openai/gpt-4.1-mini", temperature=1)
        )
        self.judge.set_lm(
            lm=dspy.LM("openai/gpt-4.1-mini", temperature=1)
        )

        self.num_samples = num_samples
        
    async def acall(self, query: str) -> JokeIdea:

        joke_ideas = await asyncio.gather(
            *[self.query_to_idea.acall(query=query) for _ in range(self.num_samples)]
        )

        print("Generated Joke Ideas: \n", joke_ideas)

        judge_score = self.judge(joke_idea=joke_ideas).joke_ratings
        print("Judge Score for each: ", judge_score)

        best_joke_idea_idx = judge_score.index(1)
        selected_joke_idea = joke_ideas[best_joke_idea_idx]
        print("Selected Joke Idea: \n", selected_joke_idea)
        
        return selected_joke_idea.joke_idea

async def main():
    idea_generator = IdeaGenerator()
    idea = await idea_generator.acall(
        query="Write a joke about AI that has to do with them turning rogue."
    )

    print("---")
    print(idea)


if __name__ == "__main__":
    asyncio.run(main())

```

--------------------------------------------------------------------------------
/level5_rags/idea_gen.py:
--------------------------------------------------------------------------------

```python

import dspy
import asyncio
from print_utils import print
from typing import List, Optional
from pydantic import BaseModel, Field
from tools import fetch_recent_news

class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str


class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.
    You are given some sample punchlines from diverse topic ranges, you can use these punchlines to make your own jokes about the specific query.
    """

    query: str = dspy.InputField(desc="The theme of the joke")
    joke_idea: JokeIdea = dspy.OutputField()


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    If you are provided a draft of a joke, your goal should be to make it funnier and more punchy.
    """

    joke_idea: JokeIdea = dspy.InputField()
    joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change")
    joke: str = dspy.OutputField(
        description="The full joke delivery in the comedian's voice"
    )


class JokeJudge(dspy.Signature):
    """Rank each joke idea between 1-N.
    Rank 1 is the most unique and funniest."""

    joke_idea: List[JokeIdea] = dspy.InputField()
    joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N")


def check_score_goodness(args, pred):
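    # Reward 1 only if the judge returned one rank per idea and every rank 1..N appears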
    num_samples = len(args["joke_idea"])
    same_length = len(pred.joke_ratings) == num_samples
    all_ranks_present = all([(i + 1) in pred.joke_ratings for i in range(num_samples)])
    return 1 if (same_length and all_ranks_present) else 0


class IdeaGenerator(dspy.Module):
    def __init__(self, num_samples=3):
        self.query_to_idea = dspy.ReAct(QueryToIdea,
                            tools=[fetch_recent_news],
                            max_iters=1)
        self.judge = dspy.Refine(
            module=dspy.ChainOfThought(JokeJudge),
            N=3, reward_fn=check_score_goodness, threshold=1,
        )
        
        self.query_to_idea.set_lm(
            lm=dspy.LM("openai/gpt-4.1-mini", temperature=1)
        )
        self.judge.set_lm(
            lm=dspy.LM("openai/gpt-4.1-mini", temperature=1)
        )

        self.num_samples = num_samples
        
    async def acall(self, query: str) -> JokeIdea:

        joke_ideas = await asyncio.gather(
            *[self.query_to_idea.acall(query=query) for _ in range(self.num_samples)]
        )

        print("Generated Joke Ideas: \n", joke_ideas)

        judge_score = self.judge(joke_idea=joke_ideas).joke_ratings
        print("Judge Score for each: ", judge_score)

        best_joke_idea_idx = judge_score.index(1)
        selected_joke_idea = joke_ideas[best_joke_idea_idx]
        print("Selected Joke Idea: \n", selected_joke_idea)
        
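        # Return the typed JokeIdea rather than the full dspy.Prediction,
        # so callers get the structured idea directly.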
        return selected_joke_idea.joke_idea

async def main():
    joke_generator = IdeaGenerator()
    joke_idea = await joke_generator.acall(
        query="Write a joke about AI that has to do with them turning rogue."
    )

    print("---")
    print(joke_idea)


if __name__ == "__main__":
    asyncio.run(main())

```

--------------------------------------------------------------------------------
/level2_multi_interaction/t4_reflection.py:
--------------------------------------------------------------------------------

```python
import time
import dspy
import asyncio

from print_utils import print
from typing import List, Optional
from pydantic import BaseModel, Field

# mlflow tracking: comment out these lines if you are not running a local mlflow server
import mlflow
mlflow.autolog()
mlflow.set_tracking_uri("http://127.0.0.1:5000")
mlflow.set_experiment("Reflection")


dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini", temperature=1))
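# Disable caching so repeated runs re-sample the LM instead of replaying
# cached completions.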
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)


class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str


class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.

    """

    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    If you are provided a draft of a joke, your goal should be to make it funnier and more punchy.
    """

    joke_idea: JokeIdea = dspy.InputField()
    joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change")
    joke: str = dspy.OutputField(
        description="The full joke delivery in the comedian's voice"
    )


class JokeJudge(dspy.Signature):
    """Rank each joke idea between 1-N.
    Rank 1 is the most unique and funniest."""

    joke_idea: List[JokeIdea] = dspy.InputField()
    joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N")


def check_score_goodness(args, pred):
    num_samples = len(args["joke_idea"])
    same_length = len(pred.joke_ratings) == num_samples
    all_ranks_present = all([(i + 1) in pred.joke_ratings for i in range(num_samples)])
    return 1 if (same_length and all_ranks_present) else 0


class ConditionalJokeGenerator(dspy.Module):
    def __init__(self, num_samples=2, num_reflection_steps=2):
        self.query_to_idea = dspy.ChainOfThought(QueryToIdea)
        self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke)
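        # The joke writer gets its own, stronger LM; the other sub-modules
        # use the globally configured default.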
        self.idea_to_joke.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))
        self.judge = dspy.Refine(
            module=dspy.ChainOfThought(JokeJudge),
            N=3, reward_fn=check_score_goodness, threshold=1,
        )

        self.num_samples = num_samples
        self.num_reflection_steps = num_reflection_steps
        

    async def aforward(self, query: str):

        joke_ideas = await asyncio.gather(
            *[self.query_to_idea.aforward(query=query) for _ in range(self.num_samples)]
        )

        # Deliberately raising here (e.g., to demo mlflow's error tracing)
        # would make everything below unreachable:
        # raise Exception("Something went wrong")

        print("Generated Joke Ideas: \n", joke_ideas)

        judge_score = self.judge(joke_idea=joke_ideas).joke_ratings
        print("Judge Score for each: ", judge_score)

        best_joke_idea_idx = judge_score.index(1)
        selected_joke_idea = joke_ideas[best_joke_idea_idx]
        print("Selected Joke Idea: \n", selected_joke_idea)
        
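        # Reflection loop: each pass feeds the previous draft back in as
        # joke_draft so the model can refine its own output.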
        joke = None
        for step in range(self.num_reflection_steps):
            joke = self.idea_to_joke(joke_idea=selected_joke_idea,
                                     joke_draft=joke)
            print(f"iteration: {step}: Joke: {joke}")
        return joke


async def main():
    joke_generator = ConditionalJokeGenerator()
    start_time = time.time()
    joke = await joke_generator.acall(
        query="Write a joke about AI that has to do with them turning rogue."
    )

    print("---")
    print(joke)
    print(time.time() - start_time)


if __name__ == "__main__":
    asyncio.run(main())

```

--------------------------------------------------------------------------------
/level3_evaluation/reflection.py:
--------------------------------------------------------------------------------

```python
import time
import dspy
import asyncio
import random
import pandas as pd

from print_utils import print
from typing import List, Optional
from pydantic import BaseModel, Field

# import mlflow
# mlflow.autolog()
# mlflow.set_tracking_uri("http://127.0.0.1:5000")
# mlflow.set_experiment("Reflection")

dspy.configure(track_usage=True)
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)


class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str


class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.

    """

    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    If you are provided a draft of a joke, your goal should be to make it funnier and more punchy.
    """

    joke_idea: JokeIdea = dspy.InputField()
    joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change")
    joke: str = dspy.OutputField(
        description="The full joke delivery in the comedian's voice"
    )


class JokeJudge(dspy.Signature):
    """Rank each joke idea between 1-N.
    Rank 1 is the most unique and funniest."""

    joke_idea: List[JokeIdea] = dspy.InputField()
    joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N")


def check_score_goodness(args, pred):
    num_samples = len(args["joke_idea"])
    same_length = len(pred.joke_ratings) == num_samples
    all_ranks_present = all([(i + 1) in pred.joke_ratings for i in range(num_samples)])
    return 1 if (same_length and all_ranks_present) else 0


class ConditionalJokeGenerator(dspy.Module):
    def __init__(self, num_samples=2, num_reflection_steps=2, 
                 temperature=0.7,
                 idea_lm="openai/gpt-4.1-mini",
                 joke_lm="openai/gpt-4o"):
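        # LMs and temperature are constructor arguments so the random search
        # in main() can sweep them per trial.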
        self.query_to_idea = dspy.ChainOfThought(QueryToIdea)
        self.query_to_idea.set_lm(lm=dspy.LM(idea_lm, temperature=temperature))

        self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke)
        self.idea_to_joke.set_lm(lm=dspy.LM(joke_lm, temperature=temperature))
        self.judge = dspy.Refine(
            module=dspy.ChainOfThought(JokeJudge),
            N=3, reward_fn=check_score_goodness, threshold=1,
        )
        self.judge.set_lm(dspy.LM("openai/gpt-4.1-mini"))
        self.num_samples = num_samples
        self.num_reflection_steps = num_reflection_steps
        
    async def aforward(self, query: str):

        joke_ideas = await asyncio.gather(
            *[self.query_to_idea.aforward(query=query) for _ in range(self.num_samples)]
        )

        print("Generated Joke Ideas: \n", joke_ideas)

        judge_score = self.judge(joke_idea=joke_ideas).joke_ratings
        print("Judge Score for each: ", judge_score)

        best_joke_idea_idx = judge_score.index(1)
        selected_joke_idea = joke_ideas[best_joke_idea_idx]
        print("Selected Joke Idea: \n", selected_joke_idea)
        
        joke = None
        for _ in range(self.num_reflection_steps):
            joke = self.idea_to_joke(joke_idea=selected_joke_idea,
                                     joke_draft=joke)
            print(joke)
        return joke


async def main():
    # Define hyperparameters
    joke_lms = ["openai/gpt-4.1", "gemini/gemini-1.5-pro"]
    idea_lms = ["openai/gpt-4.1-mini", "gemini/gemini-2.0-flash"]
    temperatures = [0.2, 0.7, 1.2]
    num_samples = [2, 3]
    num_reflection_steps = [1, 3]
    
    # Number of random combinations to test
    num_trials = 10

    # List to store results
    results = []

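    # Random search: sample one configuration per trial instead of
    # exhaustively sweeping the full grid.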
    for i in range(num_trials):
        # Randomly select hyperparameters
        selected_joke_lm = random.choice(joke_lms)
        selected_idea_lm = random.choice(idea_lms)
        selected_temperature = random.choice(temperatures)
        selected_num_samples = random.choice(num_samples)
        selected_num_reflection_steps = random.choice(num_reflection_steps)

        print(f"Trial {i+1}/{num_trials}: Running with: joke_lm={selected_joke_lm}, idea_lm={selected_idea_lm}, temperature={selected_temperature}, num_samples={selected_num_samples}, num_reflection_steps={selected_num_reflection_steps}")

        # Instantiate the generator with selected hyperparameters
        joke_generator = ConditionalJokeGenerator(
            joke_lm=selected_joke_lm,
            idea_lm=selected_idea_lm,
            temperature=selected_temperature,
            num_samples=selected_num_samples,
            num_reflection_steps=selected_num_reflection_steps
        )

        start_time = time.time()
        
        try:
            joke = await joke_generator.aforward(
                query="Write a joke about AI that has to do with them turning rogue."
            )
            latency = time.time() - start_time
            results.append({
                "joke_lm": selected_joke_lm,
                "idea_lm": selected_idea_lm,
                "temperature": selected_temperature,
                "num_samples": selected_num_samples,
                "num_reflection_steps": selected_num_reflection_steps,
                "joke": joke.joke,
                "latency": latency
            })
            print(f"Finished in {latency:.2f} seconds.")

        except Exception as e:
            print(f"An error occurred: {e}")
            latency = time.time() - start_time
            results.append({
                "joke_lm": selected_joke_lm,
                "idea_lm": selected_idea_lm,
                "temperature": selected_temperature,
                "num_samples": selected_num_samples,
                "num_reflection_steps": selected_num_reflection_steps,
                "joke": f"ERROR: {e}",
                "latency": latency
            })

    # Create a DataFrame from the results
    df = pd.DataFrame(results)

    # Print the DataFrame
    print(df)

    # Save the DataFrame to a CSV file
    df.to_csv("evaluation_results.csv", index=False)



if __name__ == "__main__":
    asyncio.run(main())

```
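
Each trial row captures the sampled hyperparameters, the resulting joke (or an `ERROR:`-prefixed message), and the wall-clock latency. As a minimal sketch of the kind of follow-up analysis this enables (hypothetical code, not part of the repo; `level3_evaluation/analysis.ipynb` presumably does something similar):

```python
import pandas as pd

# Load the results written by reflection.py
df = pd.read_csv("evaluation_results.csv")

# Drop failed trials (reflection.py logs failures with an "ERROR:" prefix)
ok = df[~df["joke"].str.startswith("ERROR:")]

# Compare latency across joke models and sampling temperatures
print(ok.groupby(["joke_lm", "temperature"])["latency"].describe())
```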