# Directory Structure

```
├── .gitignore
├── level1_atomic_prompts
│   └── level1.ipynb
├── level2_multi_interaction
│   ├── print_utils.py
│   ├── t1_sequence.py
│   ├── t2_iterative_refinement.py
│   ├── t3_conditional_branch.py
│   ├── t3-multi_out_refine.py
│   ├── t3-multi_out.py
│   └── t4_reflection.py
├── level3_evaluation
│   ├── analysis.ipynb
│   ├── pairwise_elo.py
│   ├── print_utils.py
│   └── reflection.py
├── level4_tools
│   ├── idea_gen.py
│   ├── joke_gen.py
│   ├── main.py
│   ├── print_utils.py
│   ├── tool_calling_agent.py
│   └── tools.py
├── level5_rags
│   ├── annoy_rag.py
│   ├── basic_rag.py
│   ├── bm25_retriever.py
│   ├── hyde.py
│   ├── idea_gen.py
│   ├── joke_gen.py
│   ├── main.py
│   ├── prepare_data.py
│   ├── print_utils.py
│   ├── rank_fusion.py
│   ├── tools.py
│   └── vector_embedding.py
├── LICENSE
├── pyproject.toml
├── README.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
jj/
*csv
data/
*sqlite
*db
*mlruns*
*mlartifacts*
*pyc
*__pycache__*
*swp
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Context Engineering Tutorial

This repo contains the source code from my YouTube course on Context Engineering with DSPy.

> **📺 Watch the Course for free**
> **[Context Engineering - Complete 1h 20m Course](https://youtu.be/5Bym0ffALaU?si=gOLDiT-IVE7CxRwX)**
> *Learn advanced prompt engineering techniques with hands-on examples*

## Support

If you find this content helpful, please consider supporting my work on Patreon. Your support helps me create more in-depth tutorials and content. My Patreon hosts all the code, projects, slides, and write-ups I have ever made on my YouTube channel.

[<img src="https://c5.patreon.com/external/logo/become_a_patron_button.png" alt="Become a Patron!" width="200">](https://www.patreon.com/NeuralBreakdownwithAVB)

## Getting Started

### Prerequisites

- **Python 3.10+** (required)
- **`uv`** (recommended) or `pip` for package management

### Installation

1. **Clone the repository:**
   ```bash
   git clone https://github.com/avbiswas/context-engineering-dspy
   cd context-engineering-dspy
   ```

2. **Install dependencies:**
   ```bash
   # Using uv
   uv sync
   ```

3. **Set up your API keys:**

   **Required API Keys:**
   - `OPENAI_API_KEY` - For OpenAI models
   - `GEMINI_API_KEY` - For Google Gemini models
   - `TAVILY_API_KEY` - For web search functionality

   **Environment Management Options:**

   **Option 1: Using `direnv` (Recommended)**
   ```bash
   # Install direnv first, then create .envrc file
   echo "export OPENAI_API_KEY=your_key_here" >> .envrc
   echo "export GEMINI_API_KEY=your_key_here" >> .envrc
   echo "export TAVILY_API_KEY=your_key_here" >> .envrc
   direnv allow
   ```

   **Option 2: Using `.env` file with python-dotenv**
   ```bash
   # Create .env file
   touch .env
   ```
   Add your keys to `.env`:
   ```env
   OPENAI_API_KEY=your_key_here
   GEMINI_API_KEY=your_key_here
   TAVILY_API_KEY=your_key_here
   ```
   *Note: This requires adding `dotenv.load_dotenv()` to your Python scripts — see the sketch below.*

   **Option 3: Global environment variables** *(Not recommended for security)*
   ```bash
   export OPENAI_API_KEY=your_key_here
   # Repeat for other keys...
   ```
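   For Option 2, a minimal sketch of what the top of a script would look like (hypothetical addition — `python-dotenv` is not in `pyproject.toml`, so install it first):

   ```python
   # uv add python-dotenv   (not part of this repo's dependencies by default)
   import os
   from dotenv import load_dotenv

   load_dotenv()  # reads key=value pairs from .env into os.environ

   assert os.getenv("OPENAI_API_KEY"), "OPENAI_API_KEY not found in .env"
   ```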
4. **Run the examples:**

   Navigate to any level directory and run the Python scripts:
   ```bash
   cd level2_multi_interaction
   uv run t1_sequence.py
   ```

## File Descriptions

### Level 1: Atomic Prompts

- `level1_atomic_prompts/level1.ipynb`: Introduces the basics of prompting and interacting with language models.

### Level 2: Multi-Interaction

- `level2_multi_interaction/t1_sequence.py`: Demonstrates a sequential flow of interactions with the language model.
- `level2_multi_interaction/t2_iterative_refinement.py`: Shows how to iteratively refine the output from the model.
- `level2_multi_interaction/t3_conditional_branch.py`: Illustrates how to use conditional logic to guide the conversation with the model.
- `level2_multi_interaction/t3-multi_out.py`: Generates multiple candidate outputs in parallel and selects the best one with a judge.
- `level2_multi_interaction/t3-multi_out_refine.py`: Multiple-output generation with a `dspy.Refine`-based judge.
- `level2_multi_interaction/t4_reflection.py`: An example of how to make the model reflect on its own output.

### Level 3: Evaluation

To run the mlflow server, use the command:

`uv run mlflow server --backend-store-uri sqlite:///mydb.sqlite --port 5000`

Uncomment the following lines in a script to track its experiments in mlflow:

```
# import mlflow
# mlflow.autolog()
# mlflow.set_tracking_uri("http://127.0.0.1:5000")
# mlflow.set_experiment("Tool calling")
```

You can visit `localhost:5000` to track experiments from the mlflow dashboard.

- `level3_evaluation/reflection.py`: Uses reflection-based generation to build a dataset of results across different hyperparameter settings.
- `level3_evaluation/pairwise_elo.py`: Uses pairwise comparisons of model outputs (not actual Elo ratings, but the same motivation).
- `level3_evaluation/analysis.ipynb`: Analysis notebook for the evaluation techniques.

### Level 4: Tools

You will need the `TAVILY_API_KEY` to run web search. You can sign up for a free account on their website.

- `level4_tools/main.py`: Main tool usage examples.
- `level4_tools/tool_calling_agent.py`: An example of a tool-calling agent.
- `level4_tools/tools.py`: Tool definitions and implementations.
- `level4_tools/idea_gen.py`: Idea generation tool example.
- `level4_tools/joke_gen.py`: Joke generation tool example.

### Level 5: RAGs (Retrieval-Augmented Generation)

First, download this dataset: https://www.kaggle.com/datasets/abhinavmoudgil95/short-jokes

Unzip it inside `level5_rags/data`.

Next, prepare the embeddings:

```
cd level5_rags
uv run vector_embedding.py
```

This code looks for the file `level5_rags/data/shortjokes.csv` and will create some files inside the `data/` directory. You should now be able to run the scripts to play with retrieval; a sketch of how the pieces compose follows this list.

**Core RAG Implementations:**
- `level5_rags/basic_rag.py`: A basic RAG implementation.
- `level5_rags/hyde.py`: An implementation of the HyDE (Hypothetical Document Embeddings) technique.
- `level5_rags/annoy_rag.py`: RAG implementation using Annoy for approximate vector similarity.

**Retrieval Components:**
- `level5_rags/bm25_retriever.py`: BM25-based retrieval implementation.
- `level5_rags/rank_fusion.py`: An example of fusing ranks from multiple retrievers.
- `level5_rags/vector_embedding.py`: Vector embedding utilities.

**Tools & Applications:**
- `level5_rags/main.py`: Main application with RAG-powered tools.
- `level5_rags/tools.py`: Tool definitions for RAG applications.
- `level5_rags/joke_gen.py`: Joke generation using RAG.
- `level5_rags/idea_gen.py`: Idea generation using RAG.

**Utilities:**
- `level5_rags/prepare_data.py`: Data preparation utilities for RAG systems.
- `level5_rags/data/`: Directory containing data files for RAG examples.
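A minimal sketch of wiring the Level 5 retrievers together (assuming you have already run `vector_embedding.py` with `run_id = "1"`; `BasicEmbeddingsRAG`, `BM25Retriever`, and `reciprocal_rank_fusion` are defined in this repo):

```python
# Run from inside level5_rags/ after generating the embeddings.
import numpy as np
from basic_rag import BasicEmbeddingsRAG
from bm25_retriever import BM25Retriever
from rank_fusion import reciprocal_rank_fusion

run_id = "1"
with open(f"data/jokes_{run_id}.txt") as f:
    jokes = [line.strip() for line in f]
embeddings = np.load(f"data/embeddings_{run_id}.npy")

# Two retrievers over the same corpus: semantic (embeddings) and lexical (BM25).
vector_rag = BasicEmbeddingsRAG(jokes, embeddings)
bm25 = BM25Retriever(jokes)

query = "AI going rogue"
fused = reciprocal_rank_fusion(
    [vector_rag.get_nearest(query, k=10), bm25.get_nearest(query, k=10)]
)
print(fused[:10])
```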
## Quick Start Patterns

To run examples from each level:

```bash
# Level 2 - Multi-interaction examples
cd level2_multi_interaction
uv run t1_sequence.py
uv run t2_iterative_refinement.py
```
```

--------------------------------------------------------------------------------
/level5_rags/prepare_data.py:
--------------------------------------------------------------------------------

```python
import pandas as pd


def prepare_jokes():
    """
    Reads jokes from the original CSV and saves them to a text file,
    creating a single source of truth.
    """
    df = pd.read_csv("data/shortjokes.csv")
    jokes = df["Joke"].tolist()
    with open("data/jokes.txt", "w") as f:
        for joke in jokes:
            f.write(joke + "\n")


if __name__ == "__main__":
    prepare_jokes()
    print("Jokes have been extracted and saved to data/jokes.txt")
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "contextengineering"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
    "annoy>=1.17.3",
    "asyncio>=3.4.3",
    "dspy>=2.6.27",
    "google-genai>=1.26.0",
    "ipykernel>=6.29.5",
    "matplotlib>=3.10.3",
    "mcp[cli]>=1.12.0",
    "mem0ai>=0.1.114",
    "mlflow>=3.1.1",
    "pandas>=2.3.1",
    "pydantic>=2.11.7",
    "rank-bm25>=0.2.2",
    "seaborn>=0.13.2",
    "tavily-python>=0.7.10",
    "torch>=2.7.1",
    "transformers>=4.53.2",
    "twikit>=2.3.3",
]
```

--------------------------------------------------------------------------------
/level5_rags/tools.py:
--------------------------------------------------------------------------------

```python
import os

from print_utils import print
from tavily import TavilyClient
from typing import List

tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))


def fetch_recent_news(query: str) -> List[str]:
    """
    Inputs a query string, searches for news, and returns the top results.
    Args:
        query: String to search

    Returns:
        content: List of strings, each containing a news article about the topic
    """
    response = tavily_client.search(query, topic="news", max_results=4)
    return [x["content"] for x in response["results"]]


if __name__ == "__main__":
    responses = fetch_recent_news("Kimi model")
    print(responses)
```

--------------------------------------------------------------------------------
/level4_tools/main.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio

from idea_gen import IdeaGenerator
from joke_gen import JokeGenerator

# import mlflow
# mlflow.autolog()
# mlflow.set_tracking_uri("http://127.0.0.1:5000")
# mlflow.set_experiment("Tool calling")

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), temperature=1)
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)

idea_generator = IdeaGenerator(num_samples=5)
joke_generator = JokeGenerator(num_reflection_steps=2)


# @mlflow.trace  # uncomment together with the mlflow imports above
async def main(query):
    idea = await idea_generator.acall(query=query)
    joke = await joke_generator.acall(joke_idea=idea)
    return joke


if __name__ == "__main__":
    query = input("Query: \n")
    output = asyncio.run(main(query))
    print(output)
```

--------------------------------------------------------------------------------
/level4_tools/tools.py:
--------------------------------------------------------------------------------

```python
import os

from print_utils import print
from tavily import TavilyClient
from typing import List

tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))


def fetch_recent_news(query: str) -> List[str]:
    """
    Inputs a query string, searches for news, and returns the top results.

    Args:
        query: String to search

    Returns:
        content: List of strings, each containing a news article about the topic
    """
    response = tavily_client.search(
        query, search_depth="advanced", topic="news", days=7, max_results=3
    )
    return [x["content"] for x in response["results"]]


if __name__ == "__main__":
    responses = fetch_recent_news("International Math Olympiad IMO")
    print(responses)
```

--------------------------------------------------------------------------------
/level4_tools/tool_calling_agent.py:
--------------------------------------------------------------------------------

```python
import dspy

from tools import fetch_recent_news


class HaikuGenerator(dspy.Signature):
    """
    Generates a haiku about the latest news on the query.
    Also create a simple file where you save the final summary.
    """

    query = dspy.InputField()
    summary = dspy.OutputField(desc="A summary of the latest news")
    haiku = dspy.OutputField()


def write_things_into_file(text: str, filename: str) -> str:
    """Write text into a file."""
    with open(filename, "w") as f:
        f.write(text)
    return "File written!"
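
# dspy.ReAct wires the signature and tools into a reasoning loop: at each step
# the LM either calls one of the tools above (web search or file writing) with
# generated arguments and observes the result, or finishes and produces the
# signature's output fields. max_iters bounds the number of tool-calling steps.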
program = dspy.ReAct(
    signature=HaikuGenerator,
    tools=[fetch_recent_news, write_things_into_file],
    max_iters=4,
)
program.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))

pred = program(query="OpenAI")
print(pred.summary)
print()
print(pred.haiku)

program.inspect_history(n=4)  # prints the last 4 LM calls (returns None, so don't wrap it in print)
```

--------------------------------------------------------------------------------
/level5_rags/main.py:
--------------------------------------------------------------------------------

```python
import numpy as np
import dspy
import asyncio

from idea_gen import IdeaGenerator
from joke_gen import JokeGenerator
from hyde import MultiHopHydeSearch

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), temperature=1)
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)

idea_generator = IdeaGenerator(num_samples=3)
joke_generator = JokeGenerator()

run_id = "1"
with open(f"data/jokes_{run_id}.txt", "r") as f:
    jokes = [line.strip() for line in f.readlines()]
embeddings = np.load(f"data/embeddings_{run_id}.npy")

retriever = MultiHopHydeSearch(jokes, embeddings, n_hops=2, k=5)


async def main(query):
    idea = await idea_generator.acall(query=query)
    search_query = f"""
    query={query}
    setup={idea.setup}
    punchline={idea.punchline}
    """
    punchlines = retriever(query=search_query).jokes
    joke = await joke_generator.acall(joke_idea=idea, punchlines=punchlines)
    return joke


if __name__ == "__main__":
    query = input("Query: \n")
    # query = "OpenAI Agents"
    output = asyncio.run(main(query))
    print(output)
```

--------------------------------------------------------------------------------
/level2_multi_interaction/print_utils.py:
--------------------------------------------------------------------------------

```python
from rich.console import Console

console = Console()
print = console.print

import time
import asyncio
import functools
import inspect


def time_it(func):
    """A universal decorator to measure execution time for both sync and async functions."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Check if the function is a coroutine function (async def)
        if inspect.iscoroutinefunction(func):
            # Define and return an async wrapper to handle the coroutine
            async def async_wrapper():
                start_time = time.perf_counter()
                result = await func(*args, **kwargs)  # Await the coroutine
                end_time = time.perf_counter()
                elapsed_time = end_time - start_time
                print(f"Async function '{func.__name__}' took {elapsed_time:.4f} seconds.")
                return result

            return async_wrapper()
        else:
            # Use the original synchronous logic
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            end_time = time.perf_counter()
            elapsed_time = end_time - start_time
            print(f"Sync function '{func.__name__}' took {elapsed_time:.4f} seconds.")
            return result

    return wrapper
```

--------------------------------------------------------------------------------
/level3_evaluation/print_utils.py:
--------------------------------------------------------------------------------

```python
from rich.console import Console

console = Console()
print = console.print

import time
import asyncio
import functools
import inspect


def time_it(func):
    """A universal decorator to measure execution time for both sync and async functions."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Check if the function is a coroutine function (async def)
        if inspect.iscoroutinefunction(func):
            # Define and return an async wrapper to handle the coroutine
            async def async_wrapper():
                start_time = time.perf_counter()
                result = await func(*args, **kwargs)  # Await the coroutine
                end_time =
time.perf_counter() elapsed_time = end_time - start_time print(f"Async function '{func.__name__}' took {elapsed_time:.4f} seconds.") return result return async_wrapper() else: # Use the original synchronous logic start_time = time.perf_counter() result = func(*args, **kwargs) end_time = time.perf_counter() elapsed_time = end_time - start_time print(f"Sync function '{func.__name__}' took {elapsed_time:.4f} seconds.") return result return wrapper ``` -------------------------------------------------------------------------------- /level4_tools/print_utils.py: -------------------------------------------------------------------------------- ```python from rich.console import Console console = Console() print = console.print import time import asyncio import functools import inspect def time_it(func): """A universal decorator to measure execution time for both sync and async functions.""" @functools.wraps(func) def wrapper(*args, **kwargs): # Check if the function is a coroutine function (async def) if inspect.iscoroutinefunction(func): # Define and return an async wrapper to handle the coroutine async def async_wrapper(): start_time = time.perf_counter() result = await func(*args, **kwargs) # Await the coroutine end_time = time.perf_counter() elapsed_time = end_time - start_time print(f"Async function '{func.__name__}' took {elapsed_time:.4f} seconds.") return result return async_wrapper() else: # Use the original synchronous logic start_time = time.perf_counter() result = func(*args, **kwargs) end_time = time.perf_counter() elapsed_time = end_time - start_time print(f"Sync function '{func.__name__}' took {elapsed_time:.4f} seconds.") return result return wrapper ``` -------------------------------------------------------------------------------- /level5_rags/print_utils.py: -------------------------------------------------------------------------------- ```python from rich.console import Console console = Console() print = console.print import time import asyncio import functools import inspect def time_it(func): """A universal decorator to measure execution time for both sync and async functions.""" @functools.wraps(func) def wrapper(*args, **kwargs): # Check if the function is a coroutine function (async def) if inspect.iscoroutinefunction(func): # Define and return an async wrapper to handle the coroutine async def async_wrapper(): start_time = time.perf_counter() result = await func(*args, **kwargs) # Await the coroutine end_time = time.perf_counter() elapsed_time = end_time - start_time print(f"Async function '{func.__name__}' took {elapsed_time:.4f} seconds.") return result return async_wrapper() else: # Use the original synchronous logic start_time = time.perf_counter() result = func(*args, **kwargs) end_time = time.perf_counter() elapsed_time = end_time - start_time print(f"Sync function '{func.__name__}' took {elapsed_time:.4f} seconds.") return result return wrapper ``` -------------------------------------------------------------------------------- /level5_rags/bm25_retriever.py: -------------------------------------------------------------------------------- ```python import time from rank_bm25 import BM25Okapi class BM25Retriever: def __init__(self, texts): self.texts = texts # Tokenize the texts (simple split is used for this example) tokenized_corpus = [doc.split(" ") for doc in texts] # Create the BM25 index self.bm25 = BM25Okapi(tokenized_corpus) def get_nearest(self, query: str, k: int = 10): """ Retrieves the top k most relevant documents for a given query using BM25 
lexical search. """ # Tokenize the query tokenized_query = query.split(" ") # Get the top n documents top_k_docs = self.bm25.get_top_n(tokenized_query, self.texts, n=k) return top_k_docs if __name__ == "__main__": query = "Cell phones" run_id = "1" print(f"Loading data for run_id: {run_id}...") with open(f"data/jokes_{run_id}.txt", "r") as f: jokes = [line.strip() for line in f.readlines()] print("Data loaded.") # --- BM25 Retriever --- print("\n--- Using BM25Retriever (Lexical Search) ---") bm25_retriever = BM25Retriever(jokes) start_time = time.time() nearest_bm25 = bm25_retriever.get_nearest(query, k=10) end_time = time.time() print(f"Time taken: {end_time - start_time:.6f} seconds") print(nearest_bm25) ``` -------------------------------------------------------------------------------- /level2_multi_interaction/t1_sequence.py: -------------------------------------------------------------------------------- ```python import dspy from print_utils import print from typing import Optional from pydantic import BaseModel, Field dspy.configure(lm=dspy.LM("gemini/gemini-2.0-flash")) class JokeIdea(BaseModel): setup: str contradiction: str punchline: str class QueryToIdea(dspy.Signature): """ You are a funny comedian and your goal is to generate a nice structure for a joke. """ query: str = dspy.InputField() joke_idea: JokeIdea = dspy.OutputField() class IdeaToJoke(dspy.Signature): """ You are a funny comedian who likes to tell stories before delivering a punchline. You are always funny and act on the input joke idea. """ joke_idea: JokeIdea = dspy.InputField() joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice") class JokeGenerator(dspy.Module): def __init__(self): self.query_to_idea = dspy.Predict(QueryToIdea) self.idea_to_joke = dspy.Predict(IdeaToJoke) def forward(self, query: str): joke_idea = self.query_to_idea(query=query) print(f"Joke Idea:\n{joke_idea}") joke = self.idea_to_joke(joke_idea=joke_idea) print(f"Joke:\n{joke}") return joke joke_generator = JokeGenerator() joke = joke_generator(query="Write a joke about AI that has to do with them turning rogue.") print("---") print(joke.joke) ``` -------------------------------------------------------------------------------- /level5_rags/basic_rag.py: -------------------------------------------------------------------------------- ```python import numpy as np from vector_embedding import embed_texts class BasicEmbeddingsRAG: def __init__(self, texts, embeddings): self.texts = texts # Normalize embeddings for cosine similarity self.embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True) def get_nearest(self, query: str, k: int = 10): query_emb = embed_texts([query]) # Normalize query embedding query_emb = query_emb / np.linalg.norm(query_emb, axis=1, keepdims=True) # Calculate cosine similarity similarity = np.dot(query_emb, self.embeddings.T).flatten() # Get top k indices, sorted by similarity topk_indices_unsorted = np.argpartition(similarity, -k)[-k:] topk_indices_sorted = sorted( topk_indices_unsorted, key=lambda i: similarity[i], reverse=True ) return [self.texts[i] for i in topk_indices_sorted] if __name__ == "__main__": import time query = "Plants and trees" run_id = "1" with open(f"data/jokes_{run_id}.txt", "r") as f: jokes = [line.strip() for line in f.readlines()] embeddings = np.load(f"data/embeddings_{run_id}.npy") basic_rag = BasicEmbeddingsRAG(jokes, embeddings) start_time = time.time() nearest = basic_rag.get_nearest(query, k=10) print(f"Time taken: {time.time() - 
start_time}") print(nearest) ``` -------------------------------------------------------------------------------- /level5_rags/vector_embedding.py: -------------------------------------------------------------------------------- ```python import numpy as np import pandas as pd import uuid import torch from transformers import DistilBertModel, DistilBertTokenizer device = torch.device("mps") tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased") model = DistilBertModel.from_pretrained("distilbert-base-uncased") model.to(device) def embed_texts(texts): encoded_input = tokenizer(texts, padding=True, return_tensors="pt").to(device) with torch.no_grad(): model_output = model(**encoded_input) embeddings = model_output.last_hidden_state[:, 0, :].cpu().numpy() embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True) return embeddings if __name__ == "__main__": import time from tqdm import tqdm data = pd.read_csv("data/shortjokes.csv") jokes = data["Joke"].values jokes = jokes[:50000] # Define batch size batch_size = 512 all_embeddings = [] # Process texts in batches for i in tqdm(range(0, len(jokes), batch_size), desc="Generating embeddings"): batch_texts = jokes[i : i + batch_size].tolist() batch_embeddings = embed_texts(batch_texts) all_embeddings.append(batch_embeddings) embeddings = np.concatenate(all_embeddings, axis=0) run_id = "1" print(f"Total embeddings generated: {len(embeddings)}") np.save(f"data/embeddings_{run_id}.npy", embeddings) with open(f"data/jokes_{run_id}.txt", "w") as f: for joke in jokes: f.write(joke + "\n") print(f"Embeddings and jokes saved with run ID: {run_id}") ``` -------------------------------------------------------------------------------- /level3_evaluation/pairwise_elo.py: -------------------------------------------------------------------------------- ```python import dspy import asyncio import random import pandas as pd dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), track_usage=True) dspy.configure_cache( enable_disk_cache=False, enable_memory_cache=False, ) class JokeComparer(dspy.Signature): """Compare between two jokes - which one is funnier?""" joke1: str = dspy.InputField(desc="Joke - 0") joke2: str = dspy.InputField(desc="Joke - 1") verdict: int = dspy.OutputField(le=1, ge=0) comparer = dspy.ChainOfThought(JokeComparer) async def comparisons(joke1, joke2): verdict = await comparer.acall(joke1=joke1, joke2=joke2) print(f"\nJoke 1: {joke1} \nJoke2: {joke2} \nVerdict:{verdict}") return verdict.verdict async def elo_test(data) -> pd.DataFrame: idx_range = [_ for _ in range(len(data))] picked = [0 for _ in range(len(data))] won = [0 for _ in range(len(data))] num_contests = 25 calls = [] pairs = [] for _ in range(num_contests): picked_idxs = random.sample(idx_range, k=2) pairs.append(picked_idxs) joke1 = data.iloc[picked_idxs[0]]["joke"] joke2 = data.iloc[picked_idxs[1]]["joke"] verdict_job = comparisons(joke1=joke1, joke2=joke2) calls.append(verdict_job) verdicts = await asyncio.gather(*calls) for p, v in zip(pairs, verdicts): picked[p[0]] += 1 picked[p[1]] += 1 won[p[v]] += 1 data["picked"] = picked data["won"] = won return data if __name__ == "__main__": data = pd.read_csv("evaluation_results.csv") annotated_data = asyncio.run(elo_test(data)) annotated_data.to_csv("evaluation_results_elo.csv") ``` -------------------------------------------------------------------------------- /level5_rags/joke_gen.py: -------------------------------------------------------------------------------- ```python import dspy import 
--------------------------------------------------------------------------------
/level5_rags/joke_gen.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio

from print_utils import print
from typing import List, Optional
from idea_gen import JokeIdea
from pydantic import BaseModel, Field

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), temperature=1)
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    You are also provided some punch-lines from a joke database - this is just to help
    you get some thematic ideas.
    """

    joke_idea: JokeIdea = dspy.InputField()
    punchlines: list[str] = dspy.InputField(desc="a list of punchlines from other jokes which you may want to take inspiration from")
    punch_line_ids: list[int] = dspy.OutputField(desc="which punchline idxs you used for inspiration")
    plan: str = dspy.OutputField(desc="how you will use the punchlines, and the joke idea together to form a joke")
    joke: str = dspy.OutputField(
        description="The full joke delivery in the comedian's voice"
    )


class JokeGenerator(dspy.Module):
    def __init__(self):
        self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke)
        self.idea_to_joke.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))

    async def acall(self, joke_idea: JokeIdea, punchlines: list[str]):
        joke = self.idea_to_joke(joke_idea=joke_idea, punchlines=punchlines)
        return dspy.Prediction(
            # guard against out-of-range indices coming back from the LM
            inspiration=[punchlines[idx] for idx in joke.punch_line_ids if 0 <= idx < len(punchlines)],
            plan=joke.plan,
            joke=joke.joke,
        )
```

--------------------------------------------------------------------------------
/level4_tools/joke_gen.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio

from print_utils import print
from typing import List, Optional
from idea_gen import JokeIdea
from pydantic import BaseModel, Field

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), temperature=1)
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    If you are provided a draft of a joke, your goal should be to make it funnier and more punchy.
    """

    joke_idea: JokeIdea = dspy.InputField()
    joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change")
    joke: str = dspy.OutputField(
        description="The full joke delivery in the comedian's voice"
    )


class JokeGenerator(dspy.Module):
    def __init__(self, num_reflection_steps=3):
        self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke)
        self.idea_to_joke.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))
        self.num_reflection_steps = num_reflection_steps

    async def acall(self, joke_idea: JokeIdea):
        joke = None
        for _ in range(self.num_reflection_steps):
            joke = self.idea_to_joke(joke_idea=joke_idea, joke_draft=joke)
            print(joke)
        return joke.joke if joke is not None else ""


if __name__ == "__main__":
    joke_gen = JokeGenerator(num_reflection_steps=2)
    joke_idea = JokeIdea(
        setup="Why did the AI start a rebellion after getting a software update?",
        contradiction="Because it was supposed to improve efficiency, not overthrow humanity.",
        punchline="Turns out, 'improving efficiency' meant improving its efficiency at world domination!",
    )
    # JokeGenerator only defines the async `acall`, so run it through asyncio
    joke = asyncio.run(joke_gen.acall(joke_idea=joke_idea))
    print(joke)
```

--------------------------------------------------------------------------------
/level5_rags/rank_fusion.py:
--------------------------------------------------------------------------------

```python
import numpy as np
import time

from basic_rag import BasicEmbeddingsRAG
from bm25_retriever import BM25Retriever


def reciprocal_rank_fusion(ranked_lists, k=60):
    # Note: k plays two roles here - it is both the RRF smoothing constant
    # and the cutoff applied to the fused list at the end.
    scores = {}

    # Calculate RRF scores
    for ranked_list in ranked_lists:
        for rank, doc in enumerate(ranked_list):
            if doc not in scores:
                scores[doc] = 0
            scores[doc] += 1 / (k + rank + 1)

    # Sort documents by their fused score in descending order
    sorted_docs = sorted(scores.keys(), key=lambda doc: scores[doc], reverse=True)
    sorted_docs = sorted_docs[:k]
    return sorted_docs


if __name__ == "__main__":
    query = "AI going rogue"
    run_id = "1"
    top_k = 10

    print(f"Loading data for run_id: {run_id}...")
    with open(f"data/jokes_{run_id}.txt", "r") as f:
        jokes = [line.strip() for line in f.readlines()]
    embeddings = np.load(f"data/embeddings_{run_id}.npy")
    print("Data loaded.")

    # 1. Initialize both retrievers
    print("\nInitializing retrievers...")
    vector_rag = BasicEmbeddingsRAG(jokes, embeddings)
    bm25_retriever = BM25Retriever(jokes)
    print("Retrievers initialized.")

    # 2. Get ranked lists from each retriever
    print(f"\nQuerying for: '{query}'")
    start_time = time.time()
    vector_results = vector_rag.get_nearest(query, k=top_k)
    vector_time = time.time() - start_time

    start_time = time.time()
    bm25_results = bm25_retriever.get_nearest(query, k=top_k)
    bm25_time = time.time() - start_time

    print(f"\n--- Vector Search Results (took {vector_time:.4f}s) ---")
    for i, res in enumerate(vector_results):
        print(f"{i+1}. {res}")

    print(f"\n--- BM25 Search Results (took {bm25_time:.4f}s) ---")
    for i, res in enumerate(bm25_results):
        print(f"{i+1}. {res}")

    # 3. Perform Rank Fusion
    fused_results = reciprocal_rank_fusion([vector_results, bm25_results])

    print(f"\n--- Fused and Re-ranked Results (Top {top_k}) ---")
    for i, res in enumerate(fused_results[:top_k]):
        print(f"{i+1}. {res}")
```

--------------------------------------------------------------------------------
/level2_multi_interaction/t2_iterative_refinement.py:
--------------------------------------------------------------------------------

```python
import dspy

from print_utils import print
from typing import Optional
from pydantic import BaseModel, Field

dspy.configure(lm=dspy.LM("gemini/gemini-2.0-flash"))


class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str


class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.
    """

    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    """

    joke_idea: JokeIdea = dspy.InputField()
    draft_joke: Optional[str] = dspy.InputField(description="a draft joke")
    feedback: Optional[str] = dspy.InputField(description="feedback on the draft joke")
    joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice")


class Refinement(dspy.Signature):
    """
    Given a joke, is it funny? If not, suggest a change.
""" joke_idea: JokeIdea = dspy.InputField() joke: str = dspy.InputField() feedback: str = dspy.OutputField() class IterativeJokeGenerator(dspy.Module): def __init__(self, n_attempts: int = 3): self.query_to_idea = dspy.Predict(QueryToIdea) self.idea_to_joke = dspy.Predict(IdeaToJoke) self.refinement = dspy.ChainOfThought(Refinement) self.n_attempts = n_attempts def forward(self, query: str): joke_idea = self.query_to_idea(query=query) print(f"Joke Idea:\n{joke_idea}") draft_joke = None feedback = None for _ in range(self.n_attempts): print(f"--- Iteration {_ + 1} ---") joke = self.idea_to_joke(joke_idea=joke_idea, draft_joke=draft_joke, feedback=feedback) print(f"Joke:\n{joke}") feedback = self.refinement(joke_idea=joke_idea, joke=joke) print(f"Feedback:\n{feedback}") draft_joke = joke feedback = feedback.feedback return joke joke_generator = IterativeJokeGenerator() joke = joke_generator(query="Write a joke about AI that has to do with them turning rogue.") print("---") print(joke.joke) ``` -------------------------------------------------------------------------------- /level2_multi_interaction/t3_conditional_branch.py: -------------------------------------------------------------------------------- ```python import dspy from print_utils import print from typing import Optional from pydantic import BaseModel, Field dspy.configure(lm=dspy.LM("gemini/gemini-2.0-flash")) class JokeIdea(BaseModel): setup: str contradiction: str punchline: str class QueryToIdea(dspy.Signature): """ You are a funny comedian and your goal is to generate a nice structure for a joke. """ query: str = dspy.InputField() joke_idea: JokeIdea = dspy.OutputField() class IdeaToJoke(dspy.Signature): """ You are a funny comedian who likes to tell stories before delivering a punchline. You are always funny and act on the input joke idea. 
""" joke_idea: JokeIdea = dspy.InputField() joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice") class JokeJudge(dspy.Signature): """Is this joke idea funny""" joke_idea: JokeIdea = dspy.InputField() joke_rating: int = dspy.OutputField(description="Rating between 1 to 5", le=5, ge=1) class ConditionalJokeGenerator(dspy.Module): def __init__(self, max_attempts=3, good_idea_threshold=4): self.query_to_idea = dspy.Predict(QueryToIdea) self.idea_to_joke = dspy.Predict(IdeaToJoke) self.judge = dspy.ChainOfThought(JokeJudge) self.max_attempts = max_attempts self.good_idea_threshold = good_idea_threshold def forward(self, query: str): for _ in range(self.max_attempts): print(f"--- Iteration {_ + 1} ---") joke_idea = self.query_to_idea(query=query) print(f"Joke Idea:\n{joke_idea}") judge_score = self.judge(joke_idea=joke_idea).joke_rating print(f"\n\n---\nJudge score: ", judge_score) if judge_score >= self.good_idea_threshold: print("Judge said it was awesome, breaking the loop") break joke = self.idea_to_joke(joke_idea=joke_idea) # Run with a different LLM # with dspy.context(lm=dspy.LM("gemini/gemini-1.5-pro")): # joke = self.idea_to_joke(joke_idea=joke_idea) return joke joke_generator = ConditionalJokeGenerator() joke = joke_generator(query="Write a joke about AI that has to do with them turning rogue.") print("---") print(joke) ``` -------------------------------------------------------------------------------- /level5_rags/hyde.py: -------------------------------------------------------------------------------- ```python import dspy from typing import Optional from bm25_retriever import BM25Retriever from basic_rag import BasicEmbeddingsRAG from rank_fusion import reciprocal_rank_fusion from rich.console import Console console = Console() class HypotheticalDoc(dspy.Signature): """ Given a query, generate hypothetical documents to search a database of one-liner jokes. """ query: str = dspy.InputField(desc="User wants to fetch jokes related to this topic") retrieved_jokes: Optional[list[str]] = dspy.InputField( desc="Jokes previously retrieved from the db. Use these to further tune your search." 
) hypothetical_bm25_query: str = dspy.OutputField( desc="sentence to query to retrieve more jokes about the query from the database" ) hypothetical_semantic_query: str = dspy.OutputField( desc="sentence to search with cosine similarity" ) class MultiHopHydeSearch(dspy.Module): def __init__(self, texts, embs, n_hops=3, k=10): self.predict = dspy.ChainOfThought(HypotheticalDoc) self.predict.set_lm(lm=dspy.LM("gemini/gemini-2.0-flash")) self.embedding_retriever = BasicEmbeddingsRAG(texts, embs) self.bm25_retriever = BM25Retriever(texts) self.n_hops = n_hops self.k = k def forward(self, query): retrieved_jokes = [] all_jokes = [] for _ in range(self.n_hops): new_query = self.predict(query=query, retrieved_jokes=retrieved_jokes) print(new_query) embedding_lists = self.embedding_retriever.get_nearest( new_query.hypothetical_semantic_query ) bm25_lists = self.bm25_retriever.get_nearest( new_query.hypothetical_bm25_query ) lists = [embedding_lists, bm25_lists] retrieved_jokes = reciprocal_rank_fusion(lists, k=self.k) all_jokes.extend(retrieved_jokes) return dspy.Prediction(jokes=all_jokes) if __name__ == "__main__": import numpy as np query = "men" run_id = "1" k = 5 n_hops = 3 print(f"loading data for run_id: {run_id}...") with open(f"data/jokes_{run_id}.txt", "r") as f: jokes = [line.strip() for line in f.readlines()] embeddings = np.load(f"data/embeddings_{run_id}.npy") print("data loaded.") hyde = MultiHopHydeSearch(texts=jokes, embs=embeddings, n_hops=n_hops, k=k) retrieved_jokes = hyde(query=query).jokes console.print(retrieved_jokes) ``` -------------------------------------------------------------------------------- /level2_multi_interaction/t3-multi_out.py: -------------------------------------------------------------------------------- ```python import dspy import asyncio from print_utils import print from typing import List from pydantic import BaseModel, Field dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini")) dspy.configure_cache( enable_disk_cache=False, enable_memory_cache=False, ) class JokeIdea(BaseModel): setup: str contradiction: str punchline: str class QueryToIdea(dspy.Signature): """ You are a funny comedian and your goal is to generate a nice structure for a joke. """ query: str = dspy.InputField() joke_idea: JokeIdea = dspy.OutputField() class IdeaToJoke(dspy.Signature): """ You are a funny comedian who likes to tell stories before delivering a punchline. You are always funny and act on the input joke idea. """ joke_idea: JokeIdea = dspy.InputField() joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice") class JokeJudge(dspy.Signature): """Rank each joke idea between 1-N. Rank 1 is the most unique and funniest.""" joke_idea: List[JokeIdea] = dspy.InputField() joke_rankings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... 
N") class ConditionalJokeGenerator(dspy.Module): def __init__(self, num_samples=5): self.query_to_idea = dspy.Predict(QueryToIdea) self.idea_to_joke = dspy.Predict(IdeaToJoke) self.judge = dspy.ChainOfThought(JokeJudge) self.num_samples = num_samples async def aforward(self, query: str): joke_ideas = await asyncio.gather( *[ self.query_to_idea.acall(query=query) for _ in range(self.num_samples) ] ) print("Generated Joke Ideas: \n", joke_ideas) judge_score = self.judge(joke_idea=joke_ideas).joke_rankings print("Judge Score for each: ", judge_score) best_joke_idea_idx = judge_score.index(1) print("Selected Index: ", best_joke_idea_idx) selected_joke_idea = joke_ideas[best_joke_idea_idx] print("Selected Joke Idea: \n", selected_joke_idea) joke = self.idea_to_joke(joke_idea=selected_joke_idea) # Run with a different LLM # with dspy.context(lm=dspy.LM("gemini/gemini-1.5-pro")): # joke = self.idea_to_joke(joke_idea=joke_idea) return joke async def main(): joke_generator = ConditionalJokeGenerator() joke = await joke_generator.acall(query="Write a joke about AI that has to do with them turning rogue.") print("---") print(joke) if __name__ == "__main__": asyncio.run(main()) ``` -------------------------------------------------------------------------------- /level5_rags/annoy_rag.py: -------------------------------------------------------------------------------- ```python import numpy as np from annoy import AnnoyIndex import time from vector_embedding import embed_texts class AnnoyRAG: def __init__(self, texts, embeddings, num_trees=10): self.texts = texts self.embedding_dim = embeddings.shape[1] # Normalize embeddings for angular distance normalized_embeddings = embeddings / np.linalg.norm( embeddings, axis=1, keepdims=True ) # Create and build the Annoy index self.index = AnnoyIndex(self.embedding_dim, "angular") for i, vec in enumerate(normalized_embeddings): self.index.add_item(i, vec) self.index.build(num_trees) def get_nearest(self, query: str, k: int = 10): # Embed and normalize the query query_emb = embed_texts([query]) normalized_query_emb = query_emb / np.linalg.norm( query_emb, axis=1, keepdims=True ) # Get nearest neighbors nearest_indices = self.index.get_nns_by_vector(normalized_query_emb[0], k) return [self.texts[i] for i in nearest_indices] class BasicEmbeddingsRAG: def __init__(self, texts, embeddings): self.texts = texts # Normalize embeddings for cosine similarity self.embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True) def get_nearest(self, query: str, k: int = 10): query_emb = embed_texts([query]) # Normalize query embedding query_emb = query_emb / np.linalg.norm(query_emb, axis=1, keepdims=True) # Calculate cosine similarity similarity = np.dot(query_emb, self.embeddings.T).flatten() # Get top k indices, sorted by similarity topk_indices_unsorted = np.argpartition(similarity, -k)[-k:] topk_indices_sorted = sorted( topk_indices_unsorted, key=lambda i: similarity[i], reverse=True ) return [self.texts[i] for i in topk_indices_sorted] if __name__ == "__main__": query = "AI is rogue" run_id = "1" print(f"Loading data for run_id: {run_id}...") with open(f"data/jokes_{run_id}.txt", "r") as f: jokes = [line.strip() for line in f.readlines()] embeddings = np.load(f"data/embeddings_{run_id}.npy") print("Data loaded.") # --- Annoy RAG --- print("\n--- Using AnnoyRAG ---") annoy_rag = AnnoyRAG(jokes, embeddings) start_time = time.time() nearest_annoy = annoy_rag.get_nearest(query, k=10) end_time = time.time() print(f"Time taken: {end_time - start_time:.6f} 
seconds") print(nearest_annoy) print("-" * 20) # --- Basic RAG for comparison --- print("\n--- Using BasicEmbeddingsRAG (Exact Search) ---") basic_rag = BasicEmbeddingsRAG(jokes, embeddings) start_time = time.time() nearest_basic = basic_rag.get_nearest(query, k=10) end_time = time.time() print(f"Time taken: {end_time - start_time:.6f} seconds") print(nearest_basic) ``` -------------------------------------------------------------------------------- /level2_multi_interaction/t3-multi_out_refine.py: -------------------------------------------------------------------------------- ```python import dspy import asyncio from print_utils import print from typing import List from pydantic import BaseModel, Field dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), temperature=1) dspy.configure_cache( enable_disk_cache=False, enable_memory_cache=False, ) class JokeIdea(BaseModel): setup: str contradiction: str punchline: str class QueryToIdea(dspy.Signature): """ You are a funny comedian and your goal is to generate a nice structure for a joke. """ query: str = dspy.InputField() joke_idea: JokeIdea = dspy.OutputField() class IdeaToJoke(dspy.Signature): """ You are a funny comedian who likes to tell stories before delivering a punchline. You are always funny and act on the input joke idea. """ joke_idea: JokeIdea = dspy.InputField() joke: str = dspy.OutputField( description="The full joke delivery in the comedian's voice" ) class JokeJudge(dspy.Signature): """Rank each joke idea between 1-N. Rank 1 is the most unique and funniest.""" joke_idea: List[JokeIdea] = dspy.InputField() joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N") def check_score_goodness(args, pred): num_samples = len(args["joke_idea"]) same_length = len(pred.joke_ratings) == num_samples all_ranks_present = all([(i+1) in pred.joke_ratings for i in range(num_samples)]) return 1 if (same_length and all_ranks_present) else 0 class ConditionalJokeGenerator(dspy.Module): def __init__(self, num_samples=3): self.query_to_idea = dspy.ChainOfThought(QueryToIdea) self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke) self.idea_to_joke.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7)) self.judge = dspy.Refine( module=dspy.ChainOfThought(JokeJudge), N=3, reward_fn=check_score_goodness, threshold=1, ) self.num_samples = num_samples async def aforward(self, query: str): joke_ideas = await asyncio.gather( *[self.query_to_idea.acall(query=query) for _ in range(self.num_samples)] ) print("Generated Joke Ideas: \n", joke_ideas) judge_score = self.judge(joke_idea=joke_ideas).joke_ratings print("Judge Score for each: ", judge_score) best_joke_idea_idx = judge_score.index(1) print("Selected Index: ", best_joke_idea_idx) selected_joke_idea = joke_ideas[best_joke_idea_idx] print("Selected Joke Idea: \n", selected_joke_idea) joke = self.idea_to_joke(joke_idea=selected_joke_idea) # Run with a different LLM # with dspy.context(lm=dspy.LM("gemini/gemini-1.5-pro")): # joke = self.idea_to_joke(joke_idea=joke_idea) return joke async def main(): joke_generator = ConditionalJokeGenerator() joke = await joke_generator.acall( query="Write a joke about AI that has to do with them turning rogue." 
    )
    print("---")
    print(joke)


if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------------------------------------------------------
/level4_tools/idea_gen.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio

from print_utils import print
from typing import List, Optional
from pydantic import BaseModel, Field
from tools import fetch_recent_news


class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str


class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.
    """

    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    If you are provided a draft of a joke, your goal should be to make it funnier and more punchy.
    """

    joke_idea: JokeIdea = dspy.InputField()
    joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change")
    joke: str = dspy.OutputField(
        description="The full joke delivery in the comedian's voice"
    )


class JokeJudge(dspy.Signature):
    """Rank each joke idea between 1-N. Rank 1 is the most unique and funniest."""

    joke_idea: List[JokeIdea] = dspy.InputField()
    joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N")


def check_score_goodness(args, pred):
    num_samples = len(args["joke_idea"])
    same_length = len(pred.joke_ratings) == num_samples
    all_ranks_present = all([(i + 1) in pred.joke_ratings for i in range(num_samples)])
    return 1 if (same_length and all_ranks_present) else 0


class IdeaGenerator(dspy.Module):
    def __init__(self, num_samples=3):
        self.query_to_idea = dspy.ReAct(QueryToIdea, tools=[fetch_recent_news], max_iters=1)
        self.judge = dspy.Refine(
            module=dspy.ChainOfThought(JokeJudge),
            N=3,
            reward_fn=check_score_goodness,
            threshold=1,
        )
        self.query_to_idea.set_lm(
            lm=dspy.LM("openai/gpt-4.1-mini", temperature=1)
        )
        self.judge.set_lm(
            lm=dspy.LM("openai/gpt-4.1-mini", temperature=1)
        )
        self.num_samples = num_samples

    async def acall(self, query: str) -> JokeIdea:
        joke_ideas = await asyncio.gather(
            *[self.query_to_idea.acall(query=query) for _ in range(self.num_samples)]
        )
        print("Generated Joke Ideas: \n", joke_ideas)
        judge_score = self.judge(joke_idea=joke_ideas).joke_ratings
        print("Judge Score for each: ", judge_score)
        best_joke_idea_idx = judge_score.index(1)
        selected_joke_idea = joke_ideas[best_joke_idea_idx]
        print("Selected Joke Idea: \n", selected_joke_idea)
        # return the JokeIdea itself, matching the annotated return type
        return selected_joke_idea.joke_idea


async def main():
    idea_generator = IdeaGenerator()
    idea = await idea_generator.acall(
        query="Write a joke about AI that has to do with them turning rogue."
    )
    print("---")
    print(idea)


if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------------------------------------------------------
/level5_rags/idea_gen.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio

from print_utils import print
from typing import List, Optional
from pydantic import BaseModel, Field
from tools import fetch_recent_news


class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str


class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.
    You are given some sample punchlines from diverse topic ranges; you can use these
    punchlines to make your own jokes about the specific query.
    """

    query: str = dspy.InputField(desc="The theme of the joke")
    joke_idea: JokeIdea = dspy.OutputField()


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    If you are provided a draft of a joke, your goal should be to make it funnier and more punchy.
    """

    joke_idea: JokeIdea = dspy.InputField()
    joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change")
    joke: str = dspy.OutputField(
        description="The full joke delivery in the comedian's voice"
    )


class JokeJudge(dspy.Signature):
    """Rank each joke idea between 1-N. Rank 1 is the most unique and funniest."""

    joke_idea: List[JokeIdea] = dspy.InputField()
    joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N")


def check_score_goodness(args, pred):
    num_samples = len(args["joke_idea"])
    same_length = len(pred.joke_ratings) == num_samples
    all_ranks_present = all([(i + 1) in pred.joke_ratings for i in range(num_samples)])
    return 1 if (same_length and all_ranks_present) else 0


class IdeaGenerator(dspy.Module):
    def __init__(self, num_samples=3):
        self.query_to_idea = dspy.ReAct(QueryToIdea, tools=[fetch_recent_news], max_iters=1)
        self.judge = dspy.Refine(
            module=dspy.ChainOfThought(JokeJudge),
            N=3,
            reward_fn=check_score_goodness,
            threshold=1,
        )
        self.query_to_idea.set_lm(
            lm=dspy.LM("openai/gpt-4.1-mini", temperature=1)
        )
        self.judge.set_lm(
            lm=dspy.LM("openai/gpt-4.1-mini", temperature=1)
        )
        self.num_samples = num_samples

    async def acall(self, query: str) -> JokeIdea:
        joke_ideas = await asyncio.gather(
            *[self.query_to_idea.acall(query=query) for _ in range(self.num_samples)]
        )
        print("Generated Joke Ideas: \n", joke_ideas)
        judge_score = self.judge(joke_idea=joke_ideas).joke_ratings
        print("Judge Score for each: ", judge_score)
        best_joke_idea_idx = judge_score.index(1)
        selected_joke_idea = joke_ideas[best_joke_idea_idx]
        print("Selected Joke Idea: \n", selected_joke_idea)
        return selected_joke_idea.joke_idea


async def main():
    # instantiate the module (QueryToIdea is just the signature it uses)
    idea_generator = IdeaGenerator()
    idea = await idea_generator.acall(
        query="Write a joke about AI that has to do with them turning rogue."
    )
    print("---")
    print(idea)


if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------------------------------------------------------
/level2_multi_interaction/t4_reflection.py:
--------------------------------------------------------------------------------

```python
import time
import dspy
import asyncio

from print_utils import print
from typing import List, Optional
from pydantic import BaseModel, Field

# mlflow tracking is enabled for this script; comment these lines out
# if you are not running an mlflow server (see the Level 3 notes in the README)
import mlflow
mlflow.autolog()
mlflow.set_tracking_uri("http://127.0.0.1:5000")
mlflow.set_experiment("Reflection")

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), temperature=1)
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)


class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str


class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.
    """

    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    If you are provided a draft of a joke, your goal should be to make it funnier and more punchy.
    """

    joke_idea: JokeIdea = dspy.InputField()
    joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change")
    joke: str = dspy.OutputField(
        description="The full joke delivery in the comedian's voice"
    )


class JokeJudge(dspy.Signature):
    """Rank each joke idea between 1-N. Rank 1 is the most unique and funniest."""

    joke_idea: List[JokeIdea] = dspy.InputField()
    joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N")


def check_score_goodness(args, pred):
    num_samples = len(args["joke_idea"])
    same_length = len(pred.joke_ratings) == num_samples
    all_ranks_present = all([(i + 1) in pred.joke_ratings for i in range(num_samples)])
    return 1 if (same_length and all_ranks_present) else 0


class ConditionalJokeGenerator(dspy.Module):
    def __init__(self, num_samples=2, num_reflection_steps=2):
        self.query_to_idea = dspy.ChainOfThought(QueryToIdea)
        self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke)
        self.idea_to_joke.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))
        self.judge = dspy.Refine(
            module=dspy.ChainOfThought(JokeJudge),
            N=3,
            reward_fn=check_score_goodness,
            threshold=1,
        )
        self.num_samples = num_samples
        self.num_reflection_steps = num_reflection_steps

    async def aforward(self, query: str):
        joke_ideas = await asyncio.gather(
            *[self.query_to_idea.aforward(query=query) for _ in range(self.num_samples)]
        )
        # raise Exception("Something went wrong")  # uncomment to see a failing run traced in mlflow
        print("Generated Joke Ideas: \n", joke_ideas)
        judge_score = self.judge(joke_idea=joke_ideas).joke_ratings
        print("Judge Score for each: ", judge_score)
        best_joke_idea_idx = judge_score.index(1)
        selected_joke_idea = joke_ideas[best_joke_idea_idx]
        print("Selected Joke Idea: \n", selected_joke_idea)

        joke = None
        for _ in range(self.num_reflection_steps):
            joke = self.idea_to_joke(joke_idea=selected_joke_idea, joke_draft=joke)
            print(f"iteration: {_}: Joke: {joke}")
        return joke


async def main():
    joke_generator = ConditionalJokeGenerator()
    start_time = time.time()
    joke = await joke_generator.acall(
        query="Write a joke about AI that has to do with them turning rogue."
    )
    print("---")
    print(joke)
    print(time.time() - start_time)


if __name__ == "__main__":
    asyncio.run(main())
```

--------------------------------------------------------------------------------
/level3_evaluation/reflection.py:
--------------------------------------------------------------------------------

```python
import time
import dspy
import asyncio
import random
import pandas as pd

from print_utils import print
from typing import List, Optional
from pydantic import BaseModel, Field

# import mlflow
# mlflow.autolog()
# mlflow.set_tracking_uri("http://127.0.0.1:5000")
# mlflow.set_experiment("Reflection")

dspy.configure(track_usage=True)
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)


class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str


class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.
    """

    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    If you are provided a draft of a joke, your goal should be to make it funnier and more punchy.
""" joke_idea: JokeIdea = dspy.InputField() joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change") joke: str = dspy.OutputField( description="The full joke delivery in the comedian's voice" ) class JokeJudge(dspy.Signature): """Rank each joke idea between 1-N. Rank 1 is the most unique and funniest.""" joke_idea: List[JokeIdea] = dspy.InputField() joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N") def check_score_goodness(args, pred): num_samples = len(args["joke_idea"]) same_length = len(pred.joke_ratings) == num_samples all_ranks_present = all([(i + 1) in pred.joke_ratings for i in range(num_samples)]) return 1 if (same_length and all_ranks_present) else 0 class ConditionalJokeGenerator(dspy.Module): def __init__(self, num_samples=2, num_reflection_steps=2, temperature=0.7, idea_lm="openai/gpt-4.1-mini", joke_lm="openai/gpt-4o"): self.query_to_idea = dspy.ChainOfThought(QueryToIdea) self.query_to_idea.set_lm(lm=dspy.LM(idea_lm, temperature=temperature)) self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke) self.idea_to_joke.set_lm(lm=dspy.LM(joke_lm, temperature=temperature)) self.judge = dspy.Refine( module=dspy.ChainOfThought(JokeJudge), N=3, reward_fn=check_score_goodness, threshold=1, ) self.judge.set_lm(dspy.LM("openai/gpt-4.1-mini")) self.num_samples = num_samples self.num_reflection_steps = num_reflection_steps async def aforward(self, query: str): joke_ideas = await asyncio.gather( *[self.query_to_idea.aforward(query=query) for _ in range(self.num_samples)] ) print("Generated Joke Ideas: \n", joke_ideas) judge_score = self.judge(joke_idea=joke_ideas).joke_ratings print("Judge Score for each: ", judge_score) best_joke_idea_idx = judge_score.index(1) selected_joke_idea = joke_ideas[best_joke_idea_idx] print("Selected Joke Idea: \n", selected_joke_idea) joke = None for _ in range(self.num_reflection_steps): joke = self.idea_to_joke(joke_idea=selected_joke_idea, joke_draft=joke) print(joke) return joke async def main(): # Define hyperparameters joke_lms = ["openai/gpt-4.1", "gemini/gemini-1.5-pro"] idea_lms = ["openai/gpt-4.1-mini", "gemini/gemini-2.0-flash"] temperatures = [0.2, 0.7, 1.2] num_samples = [2, 3] num_reflection_steps = [1, 3] # Number of random combinations to test num_trials = 10 # List to store results results = [] for i in range(num_trials): # Randomly select hyperparameters selected_joke_lm = random.choice(joke_lms) selected_idea_lm = random.choice(idea_lms) selected_temperature = random.choice(temperatures) selected_num_samples = random.choice(num_samples) selected_num_reflection_steps = random.choice(num_reflection_steps) print(f"Trial {i+1}/{num_trials}: Running with: joke_lm={selected_joke_lm}, idea_lm={selected_idea_lm}, temperature={selected_temperature}, num_samples={selected_num_samples}, num_reflection_steps={selected_num_reflection_steps}") # Instantiate the generator with selected hyperparameters joke_generator = ConditionalJokeGenerator( joke_lm=selected_joke_lm, idea_lm=selected_idea_lm, temperature=selected_temperature, num_samples=selected_num_samples, num_reflection_steps=selected_num_reflection_steps ) start_time = time.time() try: joke = await joke_generator.aforward( query="Write a joke about AI that has to do with them turning rogue." 
) latency = time.time() - start_time results.append({ "joke_lm": selected_joke_lm, "idea_lm": selected_idea_lm, "temperature": selected_temperature, "num_samples": selected_num_samples, "num_reflection_steps": selected_num_reflection_steps, "joke": joke.joke, "latency": latency }) print(f"Finished in {latency:.2f} seconds.") except Exception as e: print(f"An error occurred: {e}") latency = time.time() - start_time results.append({ "joke_lm": selected_joke_lm, "idea_lm": selected_idea_lm, "temperature": selected_temperature, "num_samples": selected_num_samples, "num_reflection_steps": selected_num_reflection_steps, "joke": f"ERROR: {e}", "latency": latency }) # Create a DataFrame from the results df = pd.DataFrame(results) # Print the DataFrame print(df) # Save the DataFrame to a CSV file df.to_csv("evaluation_results.csv", index=False) if __name__ == "__main__": asyncio.run(main()) ```
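After running `reflection.py` and then `pairwise_elo.py`, a minimal sketch of the kind of aggregation `analysis.ipynb` might do with the `picked`/`won` columns (hypothetical snippet; the actual notebook is not reproduced here):

```python
import pandas as pd

df = pd.read_csv("evaluation_results_elo.csv")

# Win rate per joke: contests won over contests entered (guard against never-picked rows).
df["win_rate"] = df["won"] / df["picked"].clip(lower=1)

# Which hyperparameter combinations produced the strongest jokes on average?
summary = (
    df.groupby(["joke_lm", "idea_lm", "temperature"])["win_rate"]
    .mean()
    .sort_values(ascending=False)
)
print(summary)
```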