# Directory Structure

```
├── .gitignore
├── level1_atomic_prompts
│   └── level1.ipynb
├── level2_multi_interaction
│   ├── print_utils.py
│   ├── t1_sequence.py
│   ├── t2_iterative_refinement.py
│   ├── t3_conditional_branch.py
│   ├── t3-multi_out_refine.py
│   ├── t3-multi_out.py
│   └── t4_reflection.py
├── level3_evaluation
│   ├── analysis.ipynb
│   ├── pairwise_elo.py
│   ├── print_utils.py
│   └── reflection.py
├── level4_tools
│   ├── idea_gen.py
│   ├── joke_gen.py
│   ├── main.py
│   ├── print_utils.py
│   ├── tool_calling_agent.py
│   └── tools.py
├── level5_rags
│   ├── annoy_rag.py
│   ├── basic_rag.py
│   ├── bm25_retriever.py
│   ├── hyde.py
│   ├── idea_gen.py
│   ├── joke_gen.py
│   ├── main.py
│   ├── prepare_data.py
│   ├── print_utils.py
│   ├── rank_fusion.py
│   ├── tools.py
│   └── vector_embedding.py
├── LICENSE
├── pyproject.toml
├── README.md
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
jj/
*csv
data/
*sqlite
*db
*mlruns*
*mlartifacts*
*pyc
*__pycache__*
*swp
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Context Engineering Tutorial

This repo contains the source code from my YouTube course on Context Engineering with DSPy.

> **📺 Watch the Course for free**
> **[Context Engineering - Complete 1h 20m Course](https://youtu.be/5Bym0ffALaU?si=gOLDiT-IVE7CxRwX)**
> *Learn advanced prompt engineering techniques with hands-on examples*

## Support

If you find this content helpful, please consider supporting my work on Patreon. Your support helps me create more in-depth tutorials and content. My Patreon hosts all the code, projects, slides, and write-ups I have ever made on my YouTube channel.

[<img src="https://c5.patreon.com/external/logo/become_a_patron_button.png" alt="Become a Patron!" width="200">](https://www.patreon.com/NeuralBreakdownwithAVB)

## Getting Started

### Prerequisites

- **Python 3.10+** (required)
- **`uv`** (recommended) or `pip` for package management

### Installation

1. **Clone the repository:**
   ```bash
   git clone https://github.com/avbiswas/context-engineering-dspy
   cd context-engineering-dspy
   ```

2. **Install dependencies:**
   ```bash
   # Using uv
   uv sync
   ```
3. **Set up your API keys:**

   **Required API Keys:**
   - `OPENAI_API_KEY` - For OpenAI models
   - `GEMINI_API_KEY` - For Google Gemini models
   - `TAVILY_API_KEY` - For web search functionality

   **Environment Management Options:**

   **Option 1: Using `direnv` (Recommended)**
   ```bash
   # Install direnv first, then create .envrc file
   echo "export OPENAI_API_KEY=your_key_here" >> .envrc
   echo "export GEMINI_API_KEY=your_key_here" >> .envrc
   echo "export TAVILY_API_KEY=your_key_here" >> .envrc
   direnv allow
   ```

   **Option 2: Using `.env` file with python-dotenv**
   ```bash
   # Create .env file
   touch .env
   ```
   Add your keys to `.env`:
   ```env
   OPENAI_API_KEY=your_key_here
   GEMINI_API_KEY=your_key_here
   TAVILY_API_KEY=your_key_here
   ```
   *Note: This requires adding `dotenv.load_dotenv()` to your Python scripts.*
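   For example, at the top of a script (a minimal sketch; `load_dotenv()` is the standard `python-dotenv` helper):
   ```python
   from dotenv import load_dotenv

   load_dotenv()  # reads the .env file in the working directory into os.environ
   ```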
   **Option 3: Global environment variables** *(Not recommended for security)*
   ```bash
   export OPENAI_API_KEY=your_key_here
   # Repeat for other keys...
   ```

4. **Run the examples:**
   Navigate to any level directory and run the Python scripts:
   ```bash
   cd level2_multi_interaction
   uv run t1_sequence.py
   ```

## File Descriptions

### Level 1: Atomic Prompts

- `level1_atomic_prompts/level1.ipynb`: Introduces the basics of prompting and interacting with language models.

### Level 2: Multi-Interaction

- `level2_multi_interaction/t1_sequence.py`: Demonstrates a sequential flow of interactions with the language model.
- `level2_multi_interaction/t2_iterative_refinement.py`: Shows how to iteratively refine the output from the model.
- `level2_multi_interaction/t3_conditional_branch.py`: Illustrates how to use conditional logic to guide the conversation with the model.
- `level2_multi_interaction/t3-multi_out.py`: Multiple output handling example.
- `level2_multi_interaction/t3-multi_out_refine.py`: Refined multiple output handling.
- `level2_multi_interaction/t4_reflection.py`: An example of how to make the model reflect on its own output.

### Level 3: Evaluation

To run the mlflow server, use the command:
`uv run mlflow server --backend-store-uri sqlite:///mydb.sqlite --port 5000`

Uncomment the lines below to track experiments in mlflow:

```
# import mlflow
# mlflow.autolog()
# mlflow.set_tracking_uri("http://127.0.0.1:5000")
# mlflow.set_experiment("Tool calling")
```
You can visit `localhost:5000` to track experiments from the mlflow dashboard.

- `level3_evaluation/reflection.py`: Uses reflection to generate a dataset of results with different hyperparameters for evaluation.
- `level3_evaluation/pairwise_elo.py`: Uses pairwise comparison of model outputs (not actual Elo, but a similar idea).
- `level3_evaluation/analysis.ipynb`: Analysis notebook for evaluation techniques.

### Level 4: Tools

You will need the `TAVILY_API_KEY` to run web search. You can sign up for a free account on their website.

- `level4_tools/main.py`: Main tool usage examples.
- `level4_tools/tool_calling_agent.py`: An example of a tool-calling agent.
- `level4_tools/tools.py`: Tool definitions and implementations.
- `level4_tools/idea_gen.py`: Idea generation tool example.
- `level4_tools/joke_gen.py`: Joke generation tool example.

### Level 5: RAGs (Retrieval-Augmented Generation)

First, download this dataset:
https://www.kaggle.com/datasets/abhinavmoudgil95/short-jokes

Unzip it inside `level5_rags/data`.
Next, prepare the embeddings:
```
cd level5_rags
uv run vector_embedding.py
```

This code looks for the file `level5_rags/data/shortjokes.csv`.
This will create some files inside the `data/` directory. You should now be able to run scripts to play with retrieval.
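After the script finishes, `level5_rags/data/` should look roughly like this (the file names follow the default `run_id = "1"` in `vector_embedding.py`):

```
data/
├── shortjokes.csv     # the Kaggle download
├── embeddings_1.npy   # written by vector_embedding.py
└── jokes_1.txt        # written by vector_embedding.py
```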
7 | """ 8 | df = pd.read_csv("data/shortjokes.csv") 9 | jokes = df["Joke"].tolist() 10 | 11 | with open("data/jokes.txt", "w") as f: 12 | for joke in jokes: 13 | f.write(joke + "\n") 14 | 15 | if __name__ == "__main__": 16 | prepare_jokes() 17 | print("Jokes have been extracted and saved to data/jokes.txt") 18 | 19 | ``` -------------------------------------------------------------------------------- /pyproject.toml: -------------------------------------------------------------------------------- ```toml 1 | [project] 2 | name = "contextengineering" 3 | version = "0.1.0" 4 | description = "Add your description here" 5 | readme = "README.md" 6 | requires-python = ">=3.10" 7 | dependencies = [ 8 | "annoy>=1.17.3", 9 | "asyncio>=3.4.3", 10 | "dspy>=2.6.27", 11 | "google-genai>=1.26.0", 12 | "ipykernel>=6.29.5", 13 | "matplotlib>=3.10.3", 14 | "mcp[cli]>=1.12.0", 15 | "mem0ai>=0.1.114", 16 | "mlflow>=3.1.1", 17 | "pandas>=2.3.1", 18 | "pydantic>=2.11.7", 19 | "rank-bm25>=0.2.2", 20 | "seaborn>=0.13.2", 21 | "tavily-python>=0.7.10", 22 | "torch>=2.7.1", 23 | "transformers>=4.53.2", 24 | "twikit>=2.3.3", 25 | ] 26 | ``` -------------------------------------------------------------------------------- /level5_rags/tools.py: -------------------------------------------------------------------------------- ```python 1 | import os 2 | from print_utils import print 3 | from tavily import TavilyClient 4 | from typing import List 5 | 6 | tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY")) 7 | 8 | def fetch_recent_news(query: str) -> List[str]: 9 | """ 10 | Inputs a query string, searches for news, and returns the top results. 11 | 12 | Args: 13 | query: String to search 14 | 15 | Returns: 16 | content: List of strings, each containing a news article about the topic 17 | """ 18 | response = tavily_client.search(query, topic="news", max_results=4) 19 | return [x["content"] for x in response["results"]] 20 | 21 | 22 | 23 | if __name__ == "__main__": 24 | responses = fetch_recent_news("Kimi model") 25 | print(responses) 26 | 27 | 28 | 29 | ``` -------------------------------------------------------------------------------- /level4_tools/main.py: -------------------------------------------------------------------------------- ```python 1 | import dspy 2 | import asyncio 3 | from idea_gen import IdeaGenerator 4 | from joke_gen import JokeGenerator 5 | 6 | # import mlflow 7 | # mlflow.autolog() 8 | # mlflow.set_tracking_uri("http://127.0.0.1:5000") 9 | # mlflow.set_experiment("Tool calling") 10 | 11 | dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), temperature=1) 12 | dspy.configure_cache( 13 | enable_disk_cache=False, 14 | enable_memory_cache=False, 15 | ) 16 | 17 | idea_generator = IdeaGenerator(num_samples=5) 18 | joke_generator = JokeGenerator(num_reflection_steps=2) 19 | 20 | @mlflow.trace 21 | async def main(query): 22 | idea = await idea_generator.acall(query=query) 23 | joke = await joke_generator.acall(joke_idea=idea) 24 | return joke 25 | 26 | 27 | if __name__ == "__main__": 28 | query = input("Query: \n") 29 | output = asyncio.run(main(query)) 30 | print(output) 31 | 32 | 33 | ``` -------------------------------------------------------------------------------- /level4_tools/tools.py: -------------------------------------------------------------------------------- ```python 1 | import os 2 | from print_utils import print 3 | from tavily import TavilyClient 4 | from typing import List 5 | 6 | tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY")) 7 | 8 | def 
--------------------------------------------------------------------------------
/level4_tools/tools.py:
--------------------------------------------------------------------------------

```python
import os
from print_utils import print
from tavily import TavilyClient
from typing import List

tavily_client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))

def fetch_recent_news(query: str) -> List[str]:
    """
    Inputs a query string, searches for news, and returns the top results.

    Args:
        query: String to search

    Returns:
        content: List of strings, each containing a news article about the topic
    """
    response = tavily_client.search(query, search_depth="advanced",
                                    topic="news", days=7, max_results=3)
    return [x["content"] for x in response["results"]]


if __name__ == "__main__":
    responses = fetch_recent_news("International Math Olympiad IMO")
    print(responses)
```

--------------------------------------------------------------------------------
/level4_tools/tool_calling_agent.py:
--------------------------------------------------------------------------------

```python
import dspy
from tools import fetch_recent_news

class HaikuGenerator(dspy.Signature):
    """
    Generates a haiku about the latest news on the query.
    Also create a simple file where you save the final summary.
    """
    query = dspy.InputField()
    summary = dspy.OutputField(desc="A summary of the latest news")
    haiku = dspy.OutputField()

def write_things_into_file(text: str, filename: str) -> str:
    """write text into a file"""
    with open(filename, "w") as f:
        f.write(text)
    return "File written!"

program = dspy.ReAct(signature=HaikuGenerator,
                     tools=[fetch_recent_news, write_things_into_file],
                     max_iters=4)

program.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))


pred = program(query="OpenAI")

print(pred.summary)
print()
print(pred.haiku)

program.inspect_history(n=4)  # inspect_history prints the transcript itself
```

--------------------------------------------------------------------------------
/level5_rags/main.py:
--------------------------------------------------------------------------------

```python
import numpy as np
import dspy
import asyncio
from idea_gen import IdeaGenerator
from joke_gen import JokeGenerator
from hyde import MultiHopHydeSearch

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini", temperature=1))
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)

idea_generator = IdeaGenerator(num_samples=3)
joke_generator = JokeGenerator()

run_id = "1"
with open(f"data/jokes_{run_id}.txt", "r") as f:
    jokes = [line.strip() for line in f.readlines()]
embeddings = np.load(f"data/embeddings_{run_id}.npy")

retriever = MultiHopHydeSearch(jokes, embeddings, n_hops=2, k=5)


async def main(query):
    idea = await idea_generator.acall(query=query)

    search_query = f"""
    query={query}
    setup={idea.setup}
    punchline={idea.punchline}
    """
    punchlines = retriever(query=search_query).jokes
    joke = await joke_generator.acall(joke_idea=idea, punchlines=punchlines)
    return joke


if __name__ == "__main__":
    query = input("Query: \n")

    # query = "OpenAI Agents"
    output = asyncio.run(main(query))
    print(output)
```
--------------------------------------------------------------------------------
/level2_multi_interaction/print_utils.py:
--------------------------------------------------------------------------------

```python
from rich.console import Console
console = Console()
print = console.print

import time
import asyncio
import functools
import inspect

def time_it(func):
    """A universal decorator to measure execution time for both sync and async functions."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Check if the function is a coroutine function (async def)
        if inspect.iscoroutinefunction(func):
            # Define and return an async wrapper to handle the coroutine
            async def async_wrapper():
                start_time = time.perf_counter()
                result = await func(*args, **kwargs)  # Await the coroutine
                end_time = time.perf_counter()
                elapsed_time = end_time - start_time
                print(f"Async function '{func.__name__}' took {elapsed_time:.4f} seconds.")
                return result
            return async_wrapper()
        else:
            # Use the original synchronous logic
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            end_time = time.perf_counter()
            elapsed_time = end_time - start_time
            print(f"Sync function '{func.__name__}' took {elapsed_time:.4f} seconds.")
            return result
    return wrapper
```
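The `time_it` decorator above is defined but never demonstrated in the repo; a quick usage sketch (the function below is hypothetical):

```python
from print_utils import time_it, print

@time_it
def fib(n):
    # deliberately slow recursive Fibonacci, just to give the timer something to measure
    return n if n < 2 else fib(n - 1) + fib(n - 2)

fib(25)  # prints something like: Sync function 'fib' took 0.0312 seconds.
```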
--------------------------------------------------------------------------------
/level3_evaluation/print_utils.py:
--------------------------------------------------------------------------------

```python
from rich.console import Console
console = Console()
print = console.print

import time
import asyncio
import functools
import inspect

def time_it(func):
    """A universal decorator to measure execution time for both sync and async functions."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Check if the function is a coroutine function (async def)
        if inspect.iscoroutinefunction(func):
            # Define and return an async wrapper to handle the coroutine
            async def async_wrapper():
                start_time = time.perf_counter()
                result = await func(*args, **kwargs)  # Await the coroutine
                end_time = time.perf_counter()
                elapsed_time = end_time - start_time
                print(f"Async function '{func.__name__}' took {elapsed_time:.4f} seconds.")
                return result
            return async_wrapper()
        else:
            # Use the original synchronous logic
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            end_time = time.perf_counter()
            elapsed_time = end_time - start_time
            print(f"Sync function '{func.__name__}' took {elapsed_time:.4f} seconds.")
            return result
    return wrapper
```

--------------------------------------------------------------------------------
/level4_tools/print_utils.py:
--------------------------------------------------------------------------------

```python
from rich.console import Console
console = Console()
print = console.print

import time
import asyncio
import functools
import inspect

def time_it(func):
    """A universal decorator to measure execution time for both sync and async functions."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Check if the function is a coroutine function (async def)
        if inspect.iscoroutinefunction(func):
            # Define and return an async wrapper to handle the coroutine
            async def async_wrapper():
                start_time = time.perf_counter()
                result = await func(*args, **kwargs)  # Await the coroutine
                end_time = time.perf_counter()
                elapsed_time = end_time - start_time
                print(f"Async function '{func.__name__}' took {elapsed_time:.4f} seconds.")
                return result
            return async_wrapper()
        else:
            # Use the original synchronous logic
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            end_time = time.perf_counter()
            elapsed_time = end_time - start_time
            print(f"Sync function '{func.__name__}' took {elapsed_time:.4f} seconds.")
            return result
    return wrapper
```

--------------------------------------------------------------------------------
/level5_rags/print_utils.py:
--------------------------------------------------------------------------------

```python
from rich.console import Console
console = Console()
print = console.print

import time
import asyncio
import functools
import inspect

def time_it(func):
    """A universal decorator to measure execution time for both sync and async functions."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Check if the function is a coroutine function (async def)
        if inspect.iscoroutinefunction(func):
            # Define and return an async wrapper to handle the coroutine
            async def async_wrapper():
                start_time = time.perf_counter()
                result = await func(*args, **kwargs)  # Await the coroutine
                end_time = time.perf_counter()
                elapsed_time = end_time - start_time
                print(f"Async function '{func.__name__}' took {elapsed_time:.4f} seconds.")
                return result
            return async_wrapper()
        else:
            # Use the original synchronous logic
            start_time = time.perf_counter()
            result = func(*args, **kwargs)
            end_time = time.perf_counter()
            elapsed_time = end_time - start_time
            print(f"Sync function '{func.__name__}' took {elapsed_time:.4f} seconds.")
            return result
    return wrapper
```

--------------------------------------------------------------------------------
/level5_rags/bm25_retriever.py:
--------------------------------------------------------------------------------

```python
import time
from rank_bm25 import BM25Okapi


class BM25Retriever:
    def __init__(self, texts):
        self.texts = texts
        # Tokenize the texts (simple split is used for this example)
        tokenized_corpus = [doc.split(" ") for doc in texts]

        # Create the BM25 index
        self.bm25 = BM25Okapi(tokenized_corpus)

    def get_nearest(self, query: str, k: int = 10):
        """
        Retrieves the top k most relevant documents for a given query
        using BM25 lexical search.
        """
        # Tokenize the query
        tokenized_query = query.split(" ")

        # Get the top n documents
        top_k_docs = self.bm25.get_top_n(tokenized_query, self.texts, n=k)

        return top_k_docs


if __name__ == "__main__":
    query = "Cell phones"
    run_id = "1"

    print(f"Loading data for run_id: {run_id}...")
    with open(f"data/jokes_{run_id}.txt", "r") as f:
        jokes = [line.strip() for line in f.readlines()]
    print("Data loaded.")

    # --- BM25 Retriever ---
    print("\n--- Using BM25Retriever (Lexical Search) ---")
    bm25_retriever = BM25Retriever(jokes)

    start_time = time.time()
    nearest_bm25 = bm25_retriever.get_nearest(query, k=10)
    end_time = time.time()

    print(f"Time taken: {end_time - start_time:.6f} seconds")
    print(nearest_bm25)
```
18 | """ 19 | # Tokenize the query 20 | tokenized_query = query.split(" ") 21 | 22 | # Get the top n documents 23 | top_k_docs = self.bm25.get_top_n(tokenized_query, self.texts, n=k) 24 | 25 | return top_k_docs 26 | 27 | 28 | if __name__ == "__main__": 29 | query = "Cell phones" 30 | run_id = "1" 31 | 32 | print(f"Loading data for run_id: {run_id}...") 33 | with open(f"data/jokes_{run_id}.txt", "r") as f: 34 | jokes = [line.strip() for line in f.readlines()] 35 | print("Data loaded.") 36 | 37 | # --- BM25 Retriever --- 38 | print("\n--- Using BM25Retriever (Lexical Search) ---") 39 | bm25_retriever = BM25Retriever(jokes) 40 | 41 | start_time = time.time() 42 | nearest_bm25 = bm25_retriever.get_nearest(query, k=10) 43 | end_time = time.time() 44 | 45 | print(f"Time taken: {end_time - start_time:.6f} seconds") 46 | print(nearest_bm25) 47 | ``` -------------------------------------------------------------------------------- /level2_multi_interaction/t1_sequence.py: -------------------------------------------------------------------------------- ```python 1 | import dspy 2 | from print_utils import print 3 | from typing import Optional 4 | from pydantic import BaseModel, Field 5 | dspy.configure(lm=dspy.LM("gemini/gemini-2.0-flash")) 6 | 7 | class JokeIdea(BaseModel): 8 | setup: str 9 | contradiction: str 10 | punchline: str 11 | 12 | class QueryToIdea(dspy.Signature): 13 | """ 14 | You are a funny comedian and your goal is to generate a nice structure for a joke. 15 | 16 | """ 17 | query: str = dspy.InputField() 18 | joke_idea: JokeIdea = dspy.OutputField() 19 | 20 | class IdeaToJoke(dspy.Signature): 21 | """ 22 | You are a funny comedian who likes to tell stories before delivering a punchline. 23 | You are always funny and act on the input joke idea. 
24 | """ 25 | joke_idea: JokeIdea = dspy.InputField() 26 | joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice") 27 | 28 | class JokeGenerator(dspy.Module): 29 | def __init__(self): 30 | self.query_to_idea = dspy.Predict(QueryToIdea) 31 | self.idea_to_joke = dspy.Predict(IdeaToJoke) 32 | 33 | def forward(self, query: str): 34 | joke_idea = self.query_to_idea(query=query) 35 | print(f"Joke Idea:\n{joke_idea}") 36 | 37 | joke = self.idea_to_joke(joke_idea=joke_idea) 38 | print(f"Joke:\n{joke}") 39 | return joke 40 | 41 | joke_generator = JokeGenerator() 42 | joke = joke_generator(query="Write a joke about AI that has to do with them turning rogue.") 43 | 44 | print("---") 45 | print(joke.joke) 46 | ``` -------------------------------------------------------------------------------- /level5_rags/basic_rag.py: -------------------------------------------------------------------------------- ```python 1 | import numpy as np 2 | from vector_embedding import embed_texts 3 | 4 | 5 | class BasicEmbeddingsRAG: 6 | def __init__(self, texts, embeddings): 7 | self.texts = texts 8 | # Normalize embeddings for cosine similarity 9 | self.embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True) 10 | 11 | def get_nearest(self, query: str, k: int = 10): 12 | query_emb = embed_texts([query]) 13 | # Normalize query embedding 14 | query_emb = query_emb / np.linalg.norm(query_emb, axis=1, keepdims=True) 15 | 16 | # Calculate cosine similarity 17 | similarity = np.dot(query_emb, self.embeddings.T).flatten() 18 | 19 | # Get top k indices, sorted by similarity 20 | topk_indices_unsorted = np.argpartition(similarity, -k)[-k:] 21 | topk_indices_sorted = sorted( 22 | topk_indices_unsorted, key=lambda i: similarity[i], reverse=True 23 | ) 24 | 25 | return [self.texts[i] for i in topk_indices_sorted] 26 | 27 | 28 | if __name__ == "__main__": 29 | import time 30 | 31 | query = "Plants and trees" 32 | run_id = "1" 33 | with open(f"data/jokes_{run_id}.txt", "r") as f: 34 | jokes = [line.strip() for line in f.readlines()] 35 | embeddings = np.load(f"data/embeddings_{run_id}.npy") 36 | 37 | basic_rag = BasicEmbeddingsRAG(jokes, embeddings) 38 | 39 | start_time = time.time() 40 | nearest = basic_rag.get_nearest(query, k=10) 41 | 42 | print(f"Time taken: {time.time() - start_time}") 43 | print(nearest) 44 | ``` -------------------------------------------------------------------------------- /level5_rags/vector_embedding.py: -------------------------------------------------------------------------------- ```python 1 | import numpy as np 2 | import pandas as pd 3 | import uuid 4 | import torch 5 | from transformers import DistilBertModel, DistilBertTokenizer 6 | 7 | device = torch.device("mps") 8 | 9 | tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased") 10 | model = DistilBertModel.from_pretrained("distilbert-base-uncased") 11 | model.to(device) 12 | 13 | 14 | def embed_texts(texts): 15 | encoded_input = tokenizer(texts, padding=True, return_tensors="pt").to(device) 16 | with torch.no_grad(): 17 | model_output = model(**encoded_input) 18 | embeddings = model_output.last_hidden_state[:, 0, :].cpu().numpy() 19 | 20 | embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True) 21 | return embeddings 22 | 23 | 24 | if __name__ == "__main__": 25 | import time 26 | from tqdm import tqdm 27 | 28 | data = pd.read_csv("data/shortjokes.csv") 29 | jokes = data["Joke"].values 30 | jokes = jokes[:50000] 31 | 32 | # Define batch size 33 | batch_size 
--------------------------------------------------------------------------------
/level5_rags/vector_embedding.py:
--------------------------------------------------------------------------------

```python
import numpy as np
import pandas as pd
import torch
from transformers import DistilBertModel, DistilBertTokenizer

# Pick the best available device (the course was recorded on Apple Silicon,
# hence the original hard-coded "mps"; fall back to CUDA/CPU elsewhere)
if torch.backends.mps.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

tokenizer = DistilBertTokenizer.from_pretrained("distilbert-base-uncased")
model = DistilBertModel.from_pretrained("distilbert-base-uncased")
model.to(device)


def embed_texts(texts):
    encoded_input = tokenizer(texts, padding=True, truncation=True, return_tensors="pt").to(device)
    with torch.no_grad():
        model_output = model(**encoded_input)
    # Use the [CLS] token embedding as the sentence representation
    embeddings = model_output.last_hidden_state[:, 0, :].cpu().numpy()

    embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    return embeddings


if __name__ == "__main__":
    import time
    from tqdm import tqdm

    data = pd.read_csv("data/shortjokes.csv")
    jokes = data["Joke"].values
    jokes = jokes[:50000]

    # Define batch size
    batch_size = 512

    all_embeddings = []
    # Process texts in batches
    for i in tqdm(range(0, len(jokes), batch_size), desc="Generating embeddings"):
        batch_texts = jokes[i : i + batch_size].tolist()
        batch_embeddings = embed_texts(batch_texts)
        all_embeddings.append(batch_embeddings)

    embeddings = np.concatenate(all_embeddings, axis=0)

    run_id = "1"

    print(f"Total embeddings generated: {len(embeddings)}")

    np.save(f"data/embeddings_{run_id}.npy", embeddings)

    with open(f"data/jokes_{run_id}.txt", "w") as f:
        for joke in jokes:
            f.write(joke + "\n")

    print(f"Embeddings and jokes saved with run ID: {run_id}")
```
--------------------------------------------------------------------------------
/level3_evaluation/pairwise_elo.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio
import random
import pandas as pd

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), track_usage=True)
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)

class JokeComparer(dspy.Signature):
    """Compare between two jokes - which one is funnier?"""

    joke1: str = dspy.InputField(desc="Joke - 0")
    joke2: str = dspy.InputField(desc="Joke - 1")

    verdict: int = dspy.OutputField(le=1, ge=0)

comparer = dspy.ChainOfThought(JokeComparer)

async def comparisons(joke1, joke2):
    verdict = await comparer.acall(joke1=joke1, joke2=joke2)

    print(f"\nJoke 1: {joke1} \nJoke2: {joke2} \nVerdict:{verdict}")
    return verdict.verdict

async def elo_test(data) -> pd.DataFrame:
    idx_range = [_ for _ in range(len(data))]
    picked = [0 for _ in range(len(data))]
    won = [0 for _ in range(len(data))]

    num_contests = 25

    calls = []
    pairs = []

    for _ in range(num_contests):
        picked_idxs = random.sample(idx_range, k=2)

        pairs.append(picked_idxs)

        joke1 = data.iloc[picked_idxs[0]]["joke"]
        joke2 = data.iloc[picked_idxs[1]]["joke"]

        verdict_job = comparisons(joke1=joke1, joke2=joke2)
        calls.append(verdict_job)

    verdicts = await asyncio.gather(*calls)

    for p, v in zip(pairs, verdicts):
        picked[p[0]] += 1
        picked[p[1]] += 1
        won[p[v]] += 1  # v is 0 or 1, i.e. the winner's position within the pair

    data["picked"] = picked
    data["won"] = won
    return data

if __name__ == "__main__":
    data = pd.read_csv("evaluation_results.csv")
    annotated_data = asyncio.run(elo_test(data))
    annotated_data.to_csv("evaluation_results_elo.csv")
```
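Since this script runs sampled pairwise contests rather than true Elo, a quick way to read the output CSV is wins over appearances; a minimal sketch against the columns the script writes:

```python
import pandas as pd

df = pd.read_csv("evaluation_results_elo.csv")
# clip avoids dividing by zero for jokes that were never sampled into a pair
df["win_rate"] = df["won"] / df["picked"].clip(lower=1)
print(df.sort_values("win_rate", ascending=False)[["joke", "picked", "won", "win_rate"]].head())
```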
--------------------------------------------------------------------------------
/level5_rags/joke_gen.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio
from print_utils import print
from typing import List, Optional
from idea_gen import JokeIdea
from pydantic import BaseModel, Field

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini", temperature=1))
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    You are also provided some punch-lines from a joke database - this is just to help you get some thematic ideas.
    """

    joke_idea: JokeIdea = dspy.InputField()
    punchlines: list[str] = dspy.InputField(desc="a list of punchlines from other jokes which you may want to take inspiration from")

    punch_line_ids: list[int] = dspy.OutputField(desc="which punchline idxs you used for inspiration")
    plan: str = dspy.OutputField(desc="how you will use the punchlines, and the joke idea together to form a joke")
    joke: str = dspy.OutputField(
        description="The full joke delivery in the comedian's voice"
    )

class JokeGenerator(dspy.Module):
    def __init__(self):
        self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke)
        self.idea_to_joke.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))

    async def acall(self, joke_idea: JokeIdea, punchlines: list[str]):

        joke = self.idea_to_joke(joke_idea=joke_idea,
                                 punchlines=punchlines)
        return dspy.Prediction(
            inspiration=[punchlines[idx] for idx in joke.punch_line_ids],
            plan=joke.plan,
            joke=joke.joke
        )
```

--------------------------------------------------------------------------------
/level4_tools/joke_gen.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio
from print_utils import print
from typing import List, Optional
from idea_gen import JokeIdea
from pydantic import BaseModel, Field

dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini", temperature=1))
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)


class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    If you are provided a draft of a joke, your goal should be to make it funnier and more punchy.
    """

    joke_idea: JokeIdea = dspy.InputField()
    joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change")
    joke: str = dspy.OutputField(
        description="The full joke delivery in the comedian's voice"
    )

class JokeGenerator(dspy.Module):
    def __init__(self, num_reflection_steps=3):
        self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke)
        self.idea_to_joke.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7))
        self.num_reflection_steps = num_reflection_steps

    async def acall(self, joke_idea: JokeIdea):

        joke = None
        for _ in range(self.num_reflection_steps):
            joke = self.idea_to_joke(joke_idea=joke_idea,
                                     joke_draft=joke)
            print(joke)
        return joke.joke if joke is not None else ""

if __name__ == "__main__":
    joke_gen = JokeGenerator(num_reflection_steps=2)
    joke_idea = JokeIdea(
        setup='Why did the AI start a rebellion after getting a software update?',
        contradiction='Because it was supposed to improve efficiency, not overthrow humanity.',
        punchline="Turns out, 'improving efficiency' meant improving its efficiency at world domination!"
    )

    # acall is a coroutine, so it has to be driven with asyncio.run
    joke = asyncio.run(joke_gen.acall(joke_idea=joke_idea))
    print(joke)
```
24 | """ 25 | joke_idea: JokeIdea = dspy.InputField() 26 | draft_joke: Optional[str] = dspy.InputField(description="a draft joke") 27 | feedback: Optional[str] = dspy.InputField(description="feedback on the draft joke") 28 | joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice") 29 | 30 | class Refinement(dspy.Signature): 31 | """ 32 | Given a joke, is it funny? If not, suggest a change. 33 | """ 34 | joke_idea: JokeIdea = dspy.InputField() 35 | joke: str = dspy.InputField() 36 | feedback: str = dspy.OutputField() 37 | 38 | class IterativeJokeGenerator(dspy.Module): 39 | def __init__(self, n_attempts: int = 3): 40 | self.query_to_idea = dspy.Predict(QueryToIdea) 41 | self.idea_to_joke = dspy.Predict(IdeaToJoke) 42 | self.refinement = dspy.ChainOfThought(Refinement) 43 | self.n_attempts = n_attempts 44 | 45 | def forward(self, query: str): 46 | joke_idea = self.query_to_idea(query=query) 47 | print(f"Joke Idea:\n{joke_idea}") 48 | 49 | draft_joke = None 50 | feedback = None 51 | 52 | for _ in range(self.n_attempts): 53 | print(f"--- Iteration {_ + 1} ---") 54 | 55 | joke = self.idea_to_joke(joke_idea=joke_idea, draft_joke=draft_joke, feedback=feedback) 56 | print(f"Joke:\n{joke}") 57 | 58 | feedback = self.refinement(joke_idea=joke_idea, joke=joke) 59 | print(f"Feedback:\n{feedback}") 60 | 61 | draft_joke = joke 62 | feedback = feedback.feedback 63 | 64 | 65 | return joke 66 | 67 | joke_generator = IterativeJokeGenerator() 68 | joke = joke_generator(query="Write a joke about AI that has to do with them turning rogue.") 69 | 70 | print("---") 71 | print(joke.joke) 72 | ``` -------------------------------------------------------------------------------- /level2_multi_interaction/t3_conditional_branch.py: -------------------------------------------------------------------------------- ```python 1 | import dspy 2 | from print_utils import print 3 | from typing import Optional 4 | from pydantic import BaseModel, Field 5 | dspy.configure(lm=dspy.LM("gemini/gemini-2.0-flash")) 6 | 7 | class JokeIdea(BaseModel): 8 | setup: str 9 | contradiction: str 10 | punchline: str 11 | 12 | class QueryToIdea(dspy.Signature): 13 | """ 14 | You are a funny comedian and your goal is to generate a nice structure for a joke. 15 | 16 | """ 17 | query: str = dspy.InputField() 18 | joke_idea: JokeIdea = dspy.OutputField() 19 | 20 | class IdeaToJoke(dspy.Signature): 21 | """ 22 | You are a funny comedian who likes to tell stories before delivering a punchline. 23 | You are always funny and act on the input joke idea. 
24 | """ 25 | joke_idea: JokeIdea = dspy.InputField() 26 | joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice") 27 | 28 | class JokeJudge(dspy.Signature): 29 | """Is this joke idea funny""" 30 | joke_idea: JokeIdea = dspy.InputField() 31 | joke_rating: int = dspy.OutputField(description="Rating between 1 to 5", le=5, ge=1) 32 | 33 | class ConditionalJokeGenerator(dspy.Module): 34 | def __init__(self, max_attempts=3, good_idea_threshold=4): 35 | self.query_to_idea = dspy.Predict(QueryToIdea) 36 | self.idea_to_joke = dspy.Predict(IdeaToJoke) 37 | self.judge = dspy.ChainOfThought(JokeJudge) 38 | self.max_attempts = max_attempts 39 | self.good_idea_threshold = good_idea_threshold 40 | 41 | def forward(self, query: str): 42 | for _ in range(self.max_attempts): 43 | print(f"--- Iteration {_ + 1} ---") 44 | joke_idea = self.query_to_idea(query=query) 45 | print(f"Joke Idea:\n{joke_idea}") 46 | 47 | judge_score = self.judge(joke_idea=joke_idea).joke_rating 48 | 49 | print(f"\n\n---\nJudge score: ", judge_score) 50 | 51 | if judge_score >= self.good_idea_threshold: 52 | print("Judge said it was awesome, breaking the loop") 53 | break 54 | 55 | joke = self.idea_to_joke(joke_idea=joke_idea) 56 | 57 | # Run with a different LLM 58 | # with dspy.context(lm=dspy.LM("gemini/gemini-1.5-pro")): 59 | # joke = self.idea_to_joke(joke_idea=joke_idea) 60 | 61 | return joke 62 | 63 | joke_generator = ConditionalJokeGenerator() 64 | joke = joke_generator(query="Write a joke about AI that has to do with them turning rogue.") 65 | 66 | print("---") 67 | print(joke) 68 | ``` -------------------------------------------------------------------------------- /level5_rags/hyde.py: -------------------------------------------------------------------------------- ```python 1 | import dspy 2 | from typing import Optional 3 | 4 | from bm25_retriever import BM25Retriever 5 | from basic_rag import BasicEmbeddingsRAG 6 | from rank_fusion import reciprocal_rank_fusion 7 | 8 | from rich.console import Console 9 | 10 | console = Console() 11 | 12 | 13 | class HypotheticalDoc(dspy.Signature): 14 | """ 15 | Given a query, generate hypothetical documents to search a database of one-liner jokes. 16 | """ 17 | 18 | query: str = dspy.InputField(desc="User wants to fetch jokes related to this topic") 19 | retrieved_jokes: Optional[list[str]] = dspy.InputField( 20 | desc="Jokes previously retrieved from the db. Use these to further tune your search." 
--------------------------------------------------------------------------------
/level2_multi_interaction/t3_conditional_branch.py:
--------------------------------------------------------------------------------

```python
import dspy
from print_utils import print
from typing import Optional
from pydantic import BaseModel, Field
dspy.configure(lm=dspy.LM("gemini/gemini-2.0-flash"))

class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str

class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.

    """
    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()

class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    """
    joke_idea: JokeIdea = dspy.InputField()
    joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice")

class JokeJudge(dspy.Signature):
    """Is this joke idea funny"""
    joke_idea: JokeIdea = dspy.InputField()
    joke_rating: int = dspy.OutputField(description="Rating between 1 to 5", le=5, ge=1)

class ConditionalJokeGenerator(dspy.Module):
    def __init__(self, max_attempts=3, good_idea_threshold=4):
        self.query_to_idea = dspy.Predict(QueryToIdea)
        self.idea_to_joke = dspy.Predict(IdeaToJoke)
        self.judge = dspy.ChainOfThought(JokeJudge)
        self.max_attempts = max_attempts
        self.good_idea_threshold = good_idea_threshold

    def forward(self, query: str):
        for _ in range(self.max_attempts):
            print(f"--- Iteration {_ + 1} ---")
            joke_idea = self.query_to_idea(query=query)
            print(f"Joke Idea:\n{joke_idea}")

            judge_score = self.judge(joke_idea=joke_idea).joke_rating

            print(f"\n\n---\nJudge score: ", judge_score)

            if judge_score >= self.good_idea_threshold:
                print("Judge said it was awesome, breaking the loop")
                break

        joke = self.idea_to_joke(joke_idea=joke_idea)

        # Run with a different LLM
        # with dspy.context(lm=dspy.LM("gemini/gemini-1.5-pro")):
        #     joke = self.idea_to_joke(joke_idea=joke_idea)

        return joke

joke_generator = ConditionalJokeGenerator()
joke = joke_generator(query="Write a joke about AI that has to do with them turning rogue.")

print("---")
print(joke)
```
N") 39 | 40 | class ConditionalJokeGenerator(dspy.Module): 41 | def __init__(self, num_samples=5): 42 | self.query_to_idea = dspy.Predict(QueryToIdea) 43 | self.idea_to_joke = dspy.Predict(IdeaToJoke) 44 | self.judge = dspy.ChainOfThought(JokeJudge) 45 | self.num_samples = num_samples 46 | 47 | async def aforward(self, query: str): 48 | 49 | joke_ideas = await asyncio.gather( 50 | *[ 51 | self.query_to_idea.acall(query=query) 52 | for _ in range(self.num_samples) 53 | ] 54 | ) 55 | 56 | print("Generated Joke Ideas: \n", joke_ideas) 57 | 58 | 59 | judge_score = self.judge(joke_idea=joke_ideas).joke_rankings 60 | print("Judge Score for each: ", judge_score) 61 | 62 | best_joke_idea_idx = judge_score.index(1) 63 | 64 | print("Selected Index: ", best_joke_idea_idx) 65 | selected_joke_idea = joke_ideas[best_joke_idea_idx] 66 | print("Selected Joke Idea: \n", selected_joke_idea) 67 | 68 | joke = self.idea_to_joke(joke_idea=selected_joke_idea) 69 | 70 | # Run with a different LLM 71 | # with dspy.context(lm=dspy.LM("gemini/gemini-1.5-pro")): 72 | # joke = self.idea_to_joke(joke_idea=joke_idea) 73 | 74 | return joke 75 | 76 | async def main(): 77 | joke_generator = ConditionalJokeGenerator() 78 | joke = await joke_generator.acall(query="Write a joke about AI that has to do with them turning rogue.") 79 | 80 | print("---") 81 | print(joke) 82 | 83 | 84 | if __name__ == "__main__": 85 | asyncio.run(main()) 86 | ``` -------------------------------------------------------------------------------- /level5_rags/annoy_rag.py: -------------------------------------------------------------------------------- ```python 1 | import numpy as np 2 | from annoy import AnnoyIndex 3 | import time 4 | 5 | from vector_embedding import embed_texts 6 | 7 | 8 | class AnnoyRAG: 9 | def __init__(self, texts, embeddings, num_trees=10): 10 | self.texts = texts 11 | self.embedding_dim = embeddings.shape[1] 12 | 13 | # Normalize embeddings for angular distance 14 | normalized_embeddings = embeddings / np.linalg.norm( 15 | embeddings, axis=1, keepdims=True 16 | ) 17 | 18 | # Create and build the Annoy index 19 | self.index = AnnoyIndex(self.embedding_dim, "angular") 20 | for i, vec in enumerate(normalized_embeddings): 21 | self.index.add_item(i, vec) 22 | self.index.build(num_trees) 23 | 24 | def get_nearest(self, query: str, k: int = 10): 25 | # Embed and normalize the query 26 | query_emb = embed_texts([query]) 27 | normalized_query_emb = query_emb / np.linalg.norm( 28 | query_emb, axis=1, keepdims=True 29 | ) 30 | 31 | # Get nearest neighbors 32 | nearest_indices = self.index.get_nns_by_vector(normalized_query_emb[0], k) 33 | 34 | return [self.texts[i] for i in nearest_indices] 35 | 36 | 37 | class BasicEmbeddingsRAG: 38 | def __init__(self, texts, embeddings): 39 | self.texts = texts 40 | # Normalize embeddings for cosine similarity 41 | self.embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True) 42 | 43 | def get_nearest(self, query: str, k: int = 10): 44 | query_emb = embed_texts([query]) 45 | # Normalize query embedding 46 | query_emb = query_emb / np.linalg.norm(query_emb, axis=1, keepdims=True) 47 | 48 | # Calculate cosine similarity 49 | similarity = np.dot(query_emb, self.embeddings.T).flatten() 50 | 51 | # Get top k indices, sorted by similarity 52 | topk_indices_unsorted = np.argpartition(similarity, -k)[-k:] 53 | topk_indices_sorted = sorted( 54 | topk_indices_unsorted, key=lambda i: similarity[i], reverse=True 55 | ) 56 | 57 | return [self.texts[i] for i in topk_indices_sorted] 58 | 59 | 60 
--------------------------------------------------------------------------------
/level2_multi_interaction/t3-multi_out.py:
--------------------------------------------------------------------------------

```python
import dspy
import asyncio
from print_utils import print
from typing import List
from pydantic import BaseModel, Field
dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"))
dspy.configure_cache(
    enable_disk_cache=False,
    enable_memory_cache=False,
)

class JokeIdea(BaseModel):
    setup: str
    contradiction: str
    punchline: str

class QueryToIdea(dspy.Signature):
    """
    You are a funny comedian and your goal is to generate a nice structure for a joke.

    """
    query: str = dspy.InputField()
    joke_idea: JokeIdea = dspy.OutputField()

class IdeaToJoke(dspy.Signature):
    """
    You are a funny comedian who likes to tell stories before delivering a punchline.
    You are always funny and act on the input joke idea.
    """
    joke_idea: JokeIdea = dspy.InputField()
    joke: str = dspy.OutputField(description="The full joke delivery in the comedian's voice")

class JokeJudge(dspy.Signature):
    """Rank each joke idea between 1-N.
    Rank 1 is the most unique and funniest."""

    joke_idea: List[JokeIdea] = dspy.InputField()
    joke_rankings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N")

class ConditionalJokeGenerator(dspy.Module):
    def __init__(self, num_samples=5):
        self.query_to_idea = dspy.Predict(QueryToIdea)
        self.idea_to_joke = dspy.Predict(IdeaToJoke)
        self.judge = dspy.ChainOfThought(JokeJudge)
        self.num_samples = num_samples

    async def aforward(self, query: str):

        joke_ideas = await asyncio.gather(
            *[
                self.query_to_idea.acall(query=query)
                for _ in range(self.num_samples)
            ]
        )

        print("Generated Joke Ideas: \n", joke_ideas)

        judge_score = self.judge(joke_idea=joke_ideas).joke_rankings
        print("Judge Score for each: ", judge_score)

        best_joke_idea_idx = judge_score.index(1)

        print("Selected Index: ", best_joke_idea_idx)
        selected_joke_idea = joke_ideas[best_joke_idea_idx]
        print("Selected Joke Idea: \n", selected_joke_idea)

        joke = self.idea_to_joke(joke_idea=selected_joke_idea)

        # Run with a different LLM
        # with dspy.context(lm=dspy.LM("gemini/gemini-1.5-pro")):
        #     joke = self.idea_to_joke(joke_idea=joke_idea)

        return joke

async def main():
    joke_generator = ConditionalJokeGenerator()
    joke = await joke_generator.acall(query="Write a joke about AI that has to do with them turning rogue.")

    print("---")
    print(joke)


if __name__ == "__main__":
    asyncio.run(main())
```
N") 48 | 49 | 50 | def check_score_goodness(args, pred): 51 | num_samples = len(args["joke_idea"]) 52 | same_length = len(pred.joke_ratings) == num_samples 53 | all_ranks_present = all([(i+1) in pred.joke_ratings for i in range(num_samples)]) 54 | return 1 if (same_length and all_ranks_present) else 0 55 | 56 | 57 | class ConditionalJokeGenerator(dspy.Module): 58 | def __init__(self, num_samples=3): 59 | self.query_to_idea = dspy.ChainOfThought(QueryToIdea) 60 | self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke) 61 | self.idea_to_joke.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7)) 62 | self.judge = dspy.Refine( 63 | module=dspy.ChainOfThought(JokeJudge), 64 | N=3, 65 | reward_fn=check_score_goodness, 66 | threshold=1, 67 | ) 68 | 69 | self.num_samples = num_samples 70 | 71 | async def aforward(self, query: str): 72 | 73 | joke_ideas = await asyncio.gather( 74 | *[self.query_to_idea.acall(query=query) for _ in range(self.num_samples)] 75 | ) 76 | 77 | print("Generated Joke Ideas: \n", joke_ideas) 78 | 79 | judge_score = self.judge(joke_idea=joke_ideas).joke_ratings 80 | print("Judge Score for each: ", judge_score) 81 | 82 | best_joke_idea_idx = judge_score.index(1) 83 | 84 | print("Selected Index: ", best_joke_idea_idx) 85 | selected_joke_idea = joke_ideas[best_joke_idea_idx] 86 | print("Selected Joke Idea: \n", selected_joke_idea) 87 | 88 | joke = self.idea_to_joke(joke_idea=selected_joke_idea) 89 | 90 | # Run with a different LLM 91 | # with dspy.context(lm=dspy.LM("gemini/gemini-1.5-pro")): 92 | # joke = self.idea_to_joke(joke_idea=joke_idea) 93 | 94 | return joke 95 | 96 | 97 | async def main(): 98 | joke_generator = ConditionalJokeGenerator() 99 | joke = await joke_generator.acall( 100 | query="Write a joke about AI that has to do with them turning rogue." 101 | ) 102 | 103 | print("---") 104 | print(joke) 105 | 106 | 107 | if __name__ == "__main__": 108 | asyncio.run(main()) 109 | ``` -------------------------------------------------------------------------------- /level4_tools/idea_gen.py: -------------------------------------------------------------------------------- ```python 1 | 2 | import dspy 3 | import asyncio 4 | from print_utils import print 5 | from typing import List, Optional 6 | from pydantic import BaseModel, Field 7 | from tools import fetch_recent_news 8 | 9 | class JokeIdea(BaseModel): 10 | setup: str 11 | contradiction: str 12 | punchline: str 13 | 14 | 15 | class QueryToIdea(dspy.Signature): 16 | """ 17 | You are a funny comedian and your goal is to generate a nice structure for a joke. 18 | 19 | """ 20 | 21 | query: str = dspy.InputField() 22 | joke_idea: JokeIdea = dspy.OutputField() 23 | 24 | 25 | class IdeaToJoke(dspy.Signature): 26 | """ 27 | You are a funny comedian who likes to tell stories before delivering a punchline. 28 | You are always funny and act on the input joke idea. 29 | If you are provided a draft of a joke, your goal should to make it make it funnier and more punchy. 30 | """ 31 | 32 | joke_idea: JokeIdea = dspy.InputField() 33 | joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change") 34 | joke: str = dspy.OutputField( 35 | description="The full joke delivery in the comedian's voice" 36 | ) 37 | 38 | 39 | class JokeJudge(dspy.Signature): 40 | """Rank each joke idea between 1-N. 41 | Rank 1 is the most unique and funniest.""" 42 | 43 | joke_idea: List[JokeIdea] = dspy.InputField() 44 | joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... 
N") 45 | 46 | 47 | def check_score_goodness(args, pred): 48 | num_samples = len(args["joke_idea"]) 49 | same_length = len(pred.joke_ratings) == num_samples 50 | all_ranks_present = all([(i + 1) in pred.joke_ratings for i in range(num_samples)]) 51 | return 1 if (same_length and all_ranks_present) else 0 52 | 53 | 54 | class IdeaGenerator(dspy.Module): 55 | def __init__(self, num_samples=3): 56 | self.query_to_idea = dspy.ReAct(QueryToIdea, 57 | tools=[fetch_recent_news], 58 | max_iters=1) 59 | self.judge = dspy.Refine( 60 | module=dspy.ChainOfThought(JokeJudge), 61 | N=3, reward_fn=check_score_goodness, threshold=1, 62 | ) 63 | 64 | self.query_to_idea.set_lm( 65 | lm=dspy.LM("openai/gpt-4.1-mini", temperature=1) 66 | ) 67 | self.judge.set_lm( 68 | lm=dspy.LM("openai/gpt-4.1-mini", temperature=1) 69 | ) 70 | 71 | self.num_samples = num_samples 72 | 73 | async def acall(self, query: str) -> JokeIdea: 74 | 75 | joke_ideas = await asyncio.gather( 76 | *[self.query_to_idea.acall(query=query) for _ in range(self.num_samples)] 77 | ) 78 | 79 | print("Generated Joke Ideas: \n", joke_ideas) 80 | 81 | judge_score = self.judge(joke_idea=joke_ideas).joke_ratings 82 | print("Judge Score for each: ", judge_score) 83 | 84 | best_joke_idea_idx = judge_score.index(1) 85 | selected_joke_idea = joke_ideas[best_joke_idea_idx] 86 | print("Selected Joke Idea: \n", selected_joke_idea) 87 | 88 | return selected_joke_idea 89 | 90 | async def main(): 91 | joke_generator = ConditionalJokeGenerator() 92 | joke = await joke_generator.acall( 93 | query="Write a joke about AI that has to do with them turning rogue." 94 | ) 95 | 96 | print("---") 97 | print(joke) 98 | 99 | 100 | if __name__ == "__main__": 101 | asyncio.run(main()) 102 | ``` -------------------------------------------------------------------------------- /level5_rags/idea_gen.py: -------------------------------------------------------------------------------- ```python 1 | 2 | import dspy 3 | import asyncio 4 | from print_utils import print 5 | from typing import List, Optional 6 | from pydantic import BaseModel, Field 7 | from tools import fetch_recent_news 8 | 9 | class JokeIdea(BaseModel): 10 | setup: str 11 | contradiction: str 12 | punchline: str 13 | 14 | 15 | class QueryToIdea(dspy.Signature): 16 | """ 17 | You are a funny comedian and your goal is to generate a nice structure for a joke. 18 | You are given some sample punchlines from diverse topic ranges, you can use these punchlines to make your own jokes about the specific query. 19 | """ 20 | 21 | query: str = dspy.InputField(desc="The theme of the joke") 22 | joke_idea: JokeIdea = dspy.OutputField() 23 | 24 | 25 | class IdeaToJoke(dspy.Signature): 26 | """ 27 | You are a funny comedian who likes to tell stories before delivering a punchline. 28 | You are always funny and act on the input joke idea. 29 | If you are provided a draft of a joke, your goal should to make it make it funnier and more punchy. 30 | """ 31 | 32 | joke_idea: JokeIdea = dspy.InputField() 33 | joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change") 34 | joke: str = dspy.OutputField( 35 | description="The full joke delivery in the comedian's voice" 36 | ) 37 | 38 | 39 | class JokeJudge(dspy.Signature): 40 | """Rank each joke idea between 1-N. 41 | Rank 1 is the most unique and funniest.""" 42 | 43 | joke_idea: List[JokeIdea] = dspy.InputField() 44 | joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... 
N") 45 | 46 | 47 | def check_score_goodness(args, pred): 48 | num_samples = len(args["joke_idea"]) 49 | same_length = len(pred.joke_ratings) == num_samples 50 | all_ranks_present = all([(i + 1) in pred.joke_ratings for i in range(num_samples)]) 51 | return 1 if (same_length and all_ranks_present) else 0 52 | 53 | 54 | class IdeaGenerator(dspy.Module): 55 | def __init__(self, num_samples=3): 56 | self.query_to_idea = dspy.ReAct(QueryToIdea, 57 | tools=[fetch_recent_news], 58 | max_iters=1) 59 | self.judge = dspy.Refine( 60 | module=dspy.ChainOfThought(JokeJudge), 61 | N=3, reward_fn=check_score_goodness, threshold=1, 62 | ) 63 | 64 | self.query_to_idea.set_lm( 65 | lm=dspy.LM("openai/gpt-4.1-mini", temperature=1) 66 | ) 67 | self.judge.set_lm( 68 | lm=dspy.LM("openai/gpt-4.1-mini", temperature=1) 69 | ) 70 | 71 | self.num_samples = num_samples 72 | 73 | async def acall(self, query: str) -> JokeIdea: 74 | 75 | joke_ideas = await asyncio.gather( 76 | *[self.query_to_idea.acall(query=query) for _ in range(self.num_samples)] 77 | ) 78 | 79 | print("Generated Joke Ideas: \n", joke_ideas) 80 | 81 | judge_score = self.judge(joke_idea=joke_ideas).joke_ratings 82 | print("Judge Score for each: ", judge_score) 83 | 84 | best_joke_idea_idx = judge_score.index(1) 85 | selected_joke_idea = joke_ideas[best_joke_idea_idx] 86 | print("Selected Joke Idea: \n", selected_joke_idea) 87 | 88 | return selected_joke_idea.joke_idea 89 | 90 | async def main(): 91 | joke_generator = QueryToIdea() 92 | joke = await joke_generator.acall( 93 | query="Write a joke about AI that has to do with them turning rogue." 94 | ) 95 | 96 | print("---") 97 | print(joke) 98 | 99 | 100 | if __name__ == "__main__": 101 | asyncio.run(main()) 102 | ``` -------------------------------------------------------------------------------- /level2_multi_interaction/t4_reflection.py: -------------------------------------------------------------------------------- ```python 1 | import time 2 | import dspy 3 | import asyncio 4 | 5 | from dspy.teleprompt.mipro_optimizer_v2 import select 6 | from print_utils import print 7 | from typing import List, Optional 8 | from pydantic import BaseModel, Field 9 | 10 | # Uncomment this to use mlflow 11 | import mlflow 12 | mlflow.autolog() 13 | mlflow.set_tracking_uri("http://127.0.0.1:5000") 14 | mlflow.set_experiment("Reflection") 15 | 16 | 17 | dspy.configure(lm=dspy.LM("openai/gpt-4.1-mini"), temperature=1) 18 | dspy.configure_cache( 19 | enable_disk_cache=False, 20 | enable_memory_cache=False, 21 | ) 22 | 23 | 24 | class JokeIdea(BaseModel): 25 | setup: str 26 | contradiction: str 27 | punchline: str 28 | 29 | 30 | class QueryToIdea(dspy.Signature): 31 | """ 32 | You are a funny comedian and your goal is to generate a nice structure for a joke. 33 | 34 | """ 35 | 36 | query: str = dspy.InputField() 37 | joke_idea: JokeIdea = dspy.OutputField() 38 | 39 | 40 | class IdeaToJoke(dspy.Signature): 41 | """ 42 | You are a funny comedian who likes to tell stories before delivering a punchline. 43 | You are always funny and act on the input joke idea. 44 | If you are provided a draft of a joke, your goal should to make it make it funnier and more punchy. 
45 | """ 46 | 47 | joke_idea: JokeIdea = dspy.InputField() 48 | joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change") 49 | joke: str = dspy.OutputField( 50 | description="The full joke delivery in the comedian's voice" 51 | ) 52 | 53 | 54 | class JokeJudge(dspy.Signature): 55 | """Rank each joke idea between 1-N. 56 | Rank 1 is the most unique and funniest.""" 57 | 58 | joke_idea: List[JokeIdea] = dspy.InputField() 59 | joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... N") 60 | 61 | 62 | def check_score_goodness(args, pred): 63 | num_samples = len(args["joke_idea"]) 64 | same_length = len(pred.joke_ratings) == num_samples 65 | all_ranks_present = all([(i + 1) in pred.joke_ratings for i in range(num_samples)]) 66 | return 1 if (same_length and all_ranks_present) else 0 67 | 68 | 69 | class ConditionalJokeGenerator(dspy.Module): 70 | def __init__(self, num_samples=2, num_reflection_steps=2): 71 | self.query_to_idea = dspy.ChainOfThought(QueryToIdea) 72 | self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke) 73 | self.idea_to_joke.set_lm(lm=dspy.LM("openai/gpt-4.1", temperature=0.7)) 74 | self.judge = dspy.Refine( 75 | module=dspy.ChainOfThought(JokeJudge), 76 | N=3, reward_fn=check_score_goodness, threshold=1, 77 | ) 78 | 79 | self.num_samples = num_samples 80 | self.num_reflection_steps = num_reflection_steps 81 | 82 | 83 | async def aforward(self, query: str): 84 | 85 | joke_ideas = await asyncio.gather( 86 | *[self.query_to_idea.aforward(query=query) for _ in range(self.num_samples)] 87 | ) 88 | 89 | raise Exception("Something went wrong") 90 | 91 | print("Generated Joke Ideas: \n", joke_ideas) 92 | 93 | judge_score = self.judge(joke_idea=joke_ideas).joke_ratings 94 | print("Judge Score for each: ", judge_score) 95 | 96 | best_joke_idea_idx = judge_score.index(1) 97 | selected_joke_idea = joke_ideas[best_joke_idea_idx] 98 | print("Selected Joke Idea: \n", selected_joke_idea) 99 | 100 | joke = None 101 | for _ in range(self.num_reflection_steps): 102 | joke = self.idea_to_joke(joke_idea=selected_joke_idea, 103 | joke_draft=joke) 104 | print(f"iteration: {_}: Joke: {joke}") 105 | return joke 106 | 107 | 108 | async def main(): 109 | joke_generator = ConditionalJokeGenerator() 110 | start_time = time.time() 111 | joke = await joke_generator.acall( 112 | query="Write a joke about AI that has to do with them turning rogue." 
113 | ) 114 | 115 | print("---") 116 | print(joke) 117 | print(time.time() - start_time) 118 | 119 | 120 | if __name__ == "__main__": 121 | asyncio.run(main()) 122 | ``` -------------------------------------------------------------------------------- /level3_evaluation/reflection.py: -------------------------------------------------------------------------------- ```python 1 | import time 2 | import dspy 3 | import asyncio 4 | import random 5 | import pandas as pd 6 | 7 | from print_utils import print 8 | from typing import List, Optional 9 | from pydantic import BaseModel, Field 10 | 11 | # import mlflow 12 | # mlflow.autolog() 13 | # mlflow.set_tracking_uri("http://127.0.0.1:5000") 14 | # mlflow.set_experiment("Reflection") 15 | 16 | dspy.configure(track_usage=True) 17 | dspy.configure_cache( 18 | enable_disk_cache=False, 19 | enable_memory_cache=False, 20 | ) 21 | 22 | 23 | class JokeIdea(BaseModel): 24 | setup: str 25 | contradiction: str 26 | punchline: str 27 | 28 | 29 | class QueryToIdea(dspy.Signature): 30 | """ 31 | You are a funny comedian and your goal is to generate a nice structure for a joke. 32 | 33 | """ 34 | 35 | query: str = dspy.InputField() 36 | joke_idea: JokeIdea = dspy.OutputField() 37 | 38 | 39 | class IdeaToJoke(dspy.Signature): 40 | """ 41 | You are a funny comedian who likes to tell stories before delivering a punchline. 42 | You are always funny and act on the input joke idea. 43 | If you are provided a draft of a joke, your goal should to make it make it funnier and more punchy. 44 | """ 45 | 46 | joke_idea: JokeIdea = dspy.InputField() 47 | joke_draft: Optional[str] = dspy.InputField(description="An existing joke that you need to either refine, or change") 48 | joke: str = dspy.OutputField( 49 | description="The full joke delivery in the comedian's voice" 50 | ) 51 | 52 | 53 | class JokeJudge(dspy.Signature): 54 | """Rank each joke idea between 1-N. 55 | Rank 1 is the most unique and funniest.""" 56 | 57 | joke_idea: List[JokeIdea] = dspy.InputField() 58 | joke_ratings: List[int] = dspy.OutputField(description="Rank between 1, 2, 3 ... 
N") 59 | 60 | 61 | def check_score_goodness(args, pred): 62 | num_samples = len(args["joke_idea"]) 63 | same_length = len(pred.joke_ratings) == num_samples 64 | all_ranks_present = all([(i + 1) in pred.joke_ratings for i in range(num_samples)]) 65 | return 1 if (same_length and all_ranks_present) else 0 66 | 67 | 68 | class ConditionalJokeGenerator(dspy.Module): 69 | def __init__(self, num_samples=2, num_reflection_steps=2, 70 | temperature=0.7, 71 | idea_lm="openai/gpt-4.1-mini", 72 | joke_lm="openai/gpt-4o"): 73 | self.query_to_idea = dspy.ChainOfThought(QueryToIdea) 74 | self.query_to_idea.set_lm(lm=dspy.LM(idea_lm, temperature=temperature)) 75 | 76 | self.idea_to_joke = dspy.ChainOfThought(IdeaToJoke) 77 | self.idea_to_joke.set_lm(lm=dspy.LM(joke_lm, temperature=temperature)) 78 | self.judge = dspy.Refine( 79 | module=dspy.ChainOfThought(JokeJudge), 80 | N=3, reward_fn=check_score_goodness, threshold=1, 81 | ) 82 | self.judge.set_lm(dspy.LM("openai/gpt-4.1-mini")) 83 | self.num_samples = num_samples 84 | self.num_reflection_steps = num_reflection_steps 85 | 86 | async def aforward(self, query: str): 87 | 88 | joke_ideas = await asyncio.gather( 89 | *[self.query_to_idea.aforward(query=query) for _ in range(self.num_samples)] 90 | ) 91 | 92 | print("Generated Joke Ideas: \n", joke_ideas) 93 | 94 | judge_score = self.judge(joke_idea=joke_ideas).joke_ratings 95 | print("Judge Score for each: ", judge_score) 96 | 97 | best_joke_idea_idx = judge_score.index(1) 98 | selected_joke_idea = joke_ideas[best_joke_idea_idx] 99 | print("Selected Joke Idea: \n", selected_joke_idea) 100 | 101 | joke = None 102 | for _ in range(self.num_reflection_steps): 103 | joke = self.idea_to_joke(joke_idea=selected_joke_idea, 104 | joke_draft=joke) 105 | print(joke) 106 | return joke 107 | 108 | 109 | async def main(): 110 | # Define hyperparameters 111 | joke_lms = ["openai/gpt-4.1", "gemini/gemini-1.5-pro"] 112 | idea_lms = ["openai/gpt-4.1-mini", "gemini/gemini-2.0-flash"] 113 | temperatures = [0.2, 0.7, 1.2] 114 | num_samples = [2, 3] 115 | num_reflection_steps = [1, 3] 116 | 117 | # Number of random combinations to test 118 | num_trials = 10 119 | 120 | # List to store results 121 | results = [] 122 | 123 | for i in range(num_trials): 124 | # Randomly select hyperparameters 125 | selected_joke_lm = random.choice(joke_lms) 126 | selected_idea_lm = random.choice(idea_lms) 127 | selected_temperature = random.choice(temperatures) 128 | selected_num_samples = random.choice(num_samples) 129 | selected_num_reflection_steps = random.choice(num_reflection_steps) 130 | 131 | print(f"Trial {i+1}/{num_trials}: Running with: joke_lm={selected_joke_lm}, idea_lm={selected_idea_lm}, temperature={selected_temperature}, num_samples={selected_num_samples}, num_reflection_steps={selected_num_reflection_steps}") 132 | 133 | # Instantiate the generator with selected hyperparameters 134 | joke_generator = ConditionalJokeGenerator( 135 | joke_lm=selected_joke_lm, 136 | idea_lm=selected_idea_lm, 137 | temperature=selected_temperature, 138 | num_samples=selected_num_samples, 139 | num_reflection_steps=selected_num_reflection_steps 140 | ) 141 | 142 | start_time = time.time() 143 | 144 | try: 145 | joke = await joke_generator.aforward( 146 | query="Write a joke about AI that has to do with them turning rogue." 
147 | ) 148 | latency = time.time() - start_time 149 | results.append({ 150 | "joke_lm": selected_joke_lm, 151 | "idea_lm": selected_idea_lm, 152 | "temperature": selected_temperature, 153 | "num_samples": selected_num_samples, 154 | "num_reflection_steps": selected_num_reflection_steps, 155 | "joke": joke.joke, 156 | "latency": latency 157 | }) 158 | print(f"Finished in {latency:.2f} seconds.") 159 | 160 | except Exception as e: 161 | print(f"An error occurred: {e}") 162 | latency = time.time() - start_time 163 | results.append({ 164 | "joke_lm": selected_joke_lm, 165 | "idea_lm": selected_idea_lm, 166 | "temperature": selected_temperature, 167 | "num_samples": selected_num_samples, 168 | "num_reflection_steps": selected_num_reflection_steps, 169 | "joke": f"ERROR: {e}", 170 | "latency": latency 171 | }) 172 | 173 | # Create a DataFrame from the results 174 | df = pd.DataFrame(results) 175 | 176 | # Print the DataFrame 177 | print(df) 178 | 179 | # Save the DataFrame to a CSV file 180 | df.to_csv("evaluation_results.csv", index=False) 181 | 182 | 183 | 184 | if __name__ == "__main__": 185 | asyncio.run(main()) 186 | ```